Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated version of Queue Manager #152

Merged
merged 1 commit into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion qw/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def get_worker_list(workers: list):
RESOURCE_THRESHOLD = config.getint('RESOURCE_THRESHOLD', fallback=90)
CHECK_RESOURCE_USAGE = config.getboolean('CHECK_RESOURCE_USAGE', fallback=True)
WORKER_RETRY_INTERVAL = config.getint('WORKER_RETRY_INTERVAL', fallback=10)
WORKER_RETRY_COUNT = config.getint('WORKER_RETRY_COUNT', fallback=3)
WORKER_RETRY_COUNT = config.getint('WORKER_RETRY_COUNT', fallback=2)
WORKER_CONCURRENCY_NUMBER = config.getint('WORKER_CONCURRENCY_NUMBER', fallback=4)
WORKER_TASK_TIMEOUT = config.getint('WORKER_TASK_TIMEOUT', fallback=30)

Expand Down
6 changes: 4 additions & 2 deletions qw/executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async def run_task(self):
result = task.result()
except Exception as err: # pylint: disable=W0703
self.logger.error(
f"An Error occurred while running Task {self.task}"
f"An Error occurred while running Task {self.task}: {err}"
)
result = err
finally:
Expand All @@ -55,7 +55,9 @@ def task_done(self, task, *args, **kwargs):
def get_notify(self):
# TODO: implement other notify connectors:
# defining the Default chat object:
recipient = Chat(**{"chat_id": EVENT_CHAT_ID, "chat_name": "Navigator"})
recipient = Chat(
**{"chat_id": EVENT_CHAT_ID, "chat_name": "Navigator"}
)
# send notifications to Telegram bot
tm = Telegram()
return [tm, recipient]
Expand Down
15 changes: 11 additions & 4 deletions qw/queues/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
from collections.abc import Awaitable, Callable
import importlib
from navconfig.logging import logging
from flowtask.exceptions import (
DataNotFound,
FileNotFound
)
from qw.exceptions import QWException
from ..conf import (
WORKER_QUEUE_SIZE,
Expand All @@ -14,6 +18,7 @@
from ..executor import TaskExecutor
from ..wrappers.base import QueueWrapper


class QueueManager:
"""base Class for all Queue Managers in Queue Worker.
"""
Expand Down Expand Up @@ -94,8 +99,8 @@ async def put(self, task: QueueWrapper, id: str):
task (QueueWrapper): an instance of QueueWrapper
"""
try:
# await self.queue.put(task)
_ = asyncio.create_task(self.queue.put(task))
await self.queue.put(task)
await asyncio.sleep(.1)
self.logger.info(
f'Task {task!s} with id {id} was queued at {int(time.time())}'
)
Expand Down Expand Up @@ -137,8 +142,10 @@ async def queue_handler(self):
result = await executor.run()
if type(result) == asyncio.TimeoutError:
raise
if isinstance(result, BaseException):
if task.retries <= WORKER_RETRY_COUNT:
elif type(result) in (DataNotFound, FileNotFound):
raise
elif isinstance(result, BaseException):
if task.retries < WORKER_RETRY_COUNT:
task.add_retries()
self.logger.warning(
f"Task {task} failed. Retrying. Retry count: {task.retries}"
Expand Down
2 changes: 1 addition & 1 deletion qw/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
__description__ = ('QueueWorker is asynchronous Task Queue implementation '
'built on top of Asyncio.'
'Can you spawn distributed workers to run functions inside workers.')
__version__ = '1.8.5'
__version__ = '1.8.6'
__author__ = 'Jesus Lara'
__author_email__ = 'jesuslarag@gmail.com'
__license__ = 'MIT'
Expand Down
15 changes: 12 additions & 3 deletions qw/wrappers/di_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
try:
from flowtask.tasks.task import Task
from flowtask.exceptions import (
TaskException,
TaskNotFound,
TaskError,
TaskFailed
Expand Down Expand Up @@ -47,6 +48,9 @@ def __init__(self, program, task, *args, task_id: str = None, **kwargs):
def task_id(self):
return f'{self.id!s}'

def task_obj(self):
return self._task

def __repr__(self):
return f'Task(task={self.task}, program={self.program}, debug={self._debug})'

Expand Down Expand Up @@ -117,6 +121,8 @@ async def run(self):
)
except Exception as err:
logging.error(str(err), exc_info=True)
if isinstance(err, TaskException):
raise
raise TaskFailed(
f"{err}"
) from err
Expand All @@ -127,9 +133,12 @@ async def run(self):
result = await task.run()
except Exception as err:
logging.exception(err, stack_info=False)
raise TaskFailed(
f"{err}"
) from err
if isinstance(err, TaskException):
raise
else:
raise TaskFailed(
f"{err}"
) from err
return result

async def close(self):
Expand Down