Skip to content

Commit

Permalink
Merge pull request #152 from phenobarbital/queue-manager
Browse files Browse the repository at this point in the history
Updated version of Queue Manager
  • Loading branch information
phenobarbital authored Jun 7, 2023
2 parents 984c2b6 + d448de7 commit 9efc088
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 11 deletions.
2 changes: 1 addition & 1 deletion qw/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def get_worker_list(workers: list):
RESOURCE_THRESHOLD = config.getint('RESOURCE_THRESHOLD', fallback=90)
CHECK_RESOURCE_USAGE = config.getboolean('CHECK_RESOURCE_USAGE', fallback=True)
WORKER_RETRY_INTERVAL = config.getint('WORKER_RETRY_INTERVAL', fallback=10)
WORKER_RETRY_COUNT = config.getint('WORKER_RETRY_COUNT', fallback=3)
WORKER_RETRY_COUNT = config.getint('WORKER_RETRY_COUNT', fallback=2)
WORKER_CONCURRENCY_NUMBER = config.getint('WORKER_CONCURRENCY_NUMBER', fallback=4)
WORKER_TASK_TIMEOUT = config.getint('WORKER_TASK_TIMEOUT', fallback=30)

Expand Down
6 changes: 4 additions & 2 deletions qw/executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async def run_task(self):
result = task.result()
except Exception as err: # pylint: disable=W0703
self.logger.error(
f"An Error occurred while running Task {self.task}"
f"An Error occurred while running Task {self.task}: {err}"
)
result = err
finally:
Expand All @@ -55,7 +55,9 @@ def task_done(self, task, *args, **kwargs):
def get_notify(self):
# TODO: implement other notify connectors:
# defining the Default chat object:
recipient = Chat(**{"chat_id": EVENT_CHAT_ID, "chat_name": "Navigator"})
recipient = Chat(
**{"chat_id": EVENT_CHAT_ID, "chat_name": "Navigator"}
)
# send notifications to Telegram bot
tm = Telegram()
return [tm, recipient]
Expand Down
15 changes: 11 additions & 4 deletions qw/queues/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
from collections.abc import Awaitable, Callable
import importlib
from navconfig.logging import logging
from flowtask.exceptions import (
DataNotFound,
FileNotFound
)
from qw.exceptions import QWException
from ..conf import (
WORKER_QUEUE_SIZE,
Expand All @@ -14,6 +18,7 @@
from ..executor import TaskExecutor
from ..wrappers.base import QueueWrapper


class QueueManager:
"""base Class for all Queue Managers in Queue Worker.
"""
Expand Down Expand Up @@ -94,8 +99,8 @@ async def put(self, task: QueueWrapper, id: str):
task (QueueWrapper): an instance of QueueWrapper
"""
try:
# await self.queue.put(task)
_ = asyncio.create_task(self.queue.put(task))
await self.queue.put(task)
await asyncio.sleep(.1)
self.logger.info(
f'Task {task!s} with id {id} was queued at {int(time.time())}'
)
Expand Down Expand Up @@ -137,8 +142,10 @@ async def queue_handler(self):
result = await executor.run()
if type(result) == asyncio.TimeoutError:
raise
if isinstance(result, BaseException):
if task.retries <= WORKER_RETRY_COUNT:
elif type(result) in (DataNotFound, FileNotFound):
raise
elif isinstance(result, BaseException):
if task.retries < WORKER_RETRY_COUNT:
task.add_retries()
self.logger.warning(
f"Task {task} failed. Retrying. Retry count: {task.retries}"
Expand Down
2 changes: 1 addition & 1 deletion qw/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
__description__ = ('QueueWorker is asynchronous Task Queue implementation '
'built on top of Asyncio.'
'Can you spawn distributed workers to run functions inside workers.')
__version__ = '1.8.5'
__version__ = '1.8.6'
__author__ = 'Jesus Lara'
__author_email__ = 'jesuslarag@gmail.com'
__license__ = 'MIT'
Expand Down
15 changes: 12 additions & 3 deletions qw/wrappers/di_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
try:
from flowtask.tasks.task import Task
from flowtask.exceptions import (
TaskException,
TaskNotFound,
TaskError,
TaskFailed
Expand Down Expand Up @@ -47,6 +48,9 @@ def __init__(self, program, task, *args, task_id: str = None, **kwargs):
def task_id(self):
return f'{self.id!s}'

def task_obj(self):
return self._task

def __repr__(self):
return f'Task(task={self.task}, program={self.program}, debug={self._debug})'

Expand Down Expand Up @@ -117,6 +121,8 @@ async def run(self):
)
except Exception as err:
logging.error(str(err), exc_info=True)
if isinstance(err, TaskException):
raise
raise TaskFailed(
f"{err}"
) from err
Expand All @@ -127,9 +133,12 @@ async def run(self):
result = await task.run()
except Exception as err:
logging.exception(err, stack_info=False)
raise TaskFailed(
f"{err}"
) from err
if isinstance(err, TaskException):
raise
else:
raise TaskFailed(
f"{err}"
) from err
return result

async def close(self):
Expand Down

0 comments on commit 9efc088

Please sign in to comment.