Skip to content

Commit

Permalink
Merge pull request #179 from phenobarbital/queue-manager
Browse files Browse the repository at this point in the history
deal with queue full
  • Loading branch information
phenobarbital authored Jul 5, 2023
2 parents 2ef717f + 7e16f8c commit ef15682
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 8 deletions.
3 changes: 3 additions & 0 deletions qw/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,8 @@ async def queue(self, fn: Any, *args, use_wrapper: bool = True, **kwargs):
await self.close(writer)
received = cloudpickle.loads(serialized_result)
# we dont need the result, return true
if isinstance(received, (QWException, asyncio.QueueFull)):
raise
serialized_result = {
"status": "Queued",
"task": f"{func!r}",
Expand All @@ -477,6 +479,7 @@ async def queue(self, fn: Any, *args, use_wrapper: bool = True, **kwargs):
self.logger.exception(
f'Error Serializing Task: {err!s}'
)
raise QWException(err)

async def publish(self, fn: Any, *args, use_wrapper: bool = True, **kwargs):
"""Publish a function into a Pub/Sub Channel.
Expand Down
28 changes: 21 additions & 7 deletions qw/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,18 @@ async def discard_task(self, message: str, writer: asyncio.StreamWriter):
)
return False

async def queue_full(self, message: str, writer: asyncio.StreamWriter):
exc = asyncio.QueueFull(
message
)
self.logger.error(message)
result = cloudpickle.dumps(exc)
await self.closing_writer(
writer,
result
)
return False

async def signature_validation(
self,
reader: asyncio.StreamReader,
Expand Down Expand Up @@ -514,13 +526,15 @@ async def handle_queue_wrapper(
)
result = f'Task {task!s} with id {uid} was queued.'.encode('utf-8')
return await self.return_result(writer, result, task, uid)
except asyncio.QueueFull:
return await self.discard_task(
f"Queue in {self.name!s} is Full, discarding Task {task!r}"
except asyncio.QueueFull as ex:
return await self.queue_full(
message=f"Queue in {self.name!s} is Full, discarding Task {task!r}",
writer=writer
)
except asyncio.TimeoutError:
return await self.discard_task(
f"Task {task!r} in {self.name!s} discarded due Timeout"
f"Task {task!r} in {self.name!s} discarded due Timeout",
writer=writer
)
else:
result = None
Expand Down Expand Up @@ -585,9 +599,9 @@ async def connection_handler(
await self.queue.put(task, id=task_uuid)
result = f'Task {task!s} was Queued.'.encode('utf-8')
return await self.return_result(writer, result, task, task_uuid)
except asyncio.QueueFull:
return await self.discard_task(
message=f'Task {task!s} was discarded, queue full',
except asyncio.QueueFull as exc:
return await self.queue_full(
message=f'Queue Full, Task {task!s} was discarded',
writer=writer
)
except Exception as exc:
Expand Down
2 changes: 1 addition & 1 deletion qw/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
__description__ = ('QueueWorker is asynchronous Task Queue implementation '
'built on top of Asyncio.'
'Can you spawn distributed workers to run functions inside workers.')
__version__ = '1.9.9'
__version__ = '1.9.10'
__author__ = 'Jesus Lara'
__author_email__ = 'jesuslarag@gmail.com'
__license__ = 'MIT'
Expand Down

0 comments on commit ef15682

Please sign in to comment.