Skip to content

Commit

Permalink
Merge pull request #150 from phenobarbital/queue-manager
Browse files Browse the repository at this point in the history
new fixes on QW
  • Loading branch information
phenobarbital authored Jun 6, 2023
2 parents 00c1998 + 2a87412 commit 56f710b
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 6 deletions.
1 change: 1 addition & 0 deletions qw/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def get_worker_list(workers: list):
WORKER_RETRY_INTERVAL = config.getint('WORKER_RETRY_INTERVAL', fallback=10)
WORKER_RETRY_COUNT = config.getint('WORKER_RETRY_COUNT', fallback=3)
WORKER_CONCURRENCY_NUMBER = config.getint('WORKER_CONCURRENCY_NUMBER', fallback=4)
WORKER_TASK_TIMEOUT = config.getint('WORKER_TASK_TIMEOUT', fallback=1)

## Queue Consumed Callback
WORKER_QUEUE_CALLBACK = config.get(
Expand Down
78 changes: 75 additions & 3 deletions qw/executor/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import asyncio
import inspect
from concurrent.futures import ThreadPoolExecutor
import multiprocessing as mp
from navconfig.logging import logging
from notify.providers.telegram import Telegram
from notify.models import Chat
from flowtask.conf import (
EVENT_CHAT_ID,
ENVIRONMENT
)
from ..wrappers import QueueWrapper, FuncWrapper, TaskWrapper
from ..conf import WORKER_CONCURRENCY_NUMBER
from ..conf import WORKER_CONCURRENCY_NUMBER, WORKER_TASK_TIMEOUT

class TaskExecutor:
def __init__(self, task, *args, **kwargs):
Expand All @@ -18,32 +25,97 @@ async def run_task(self):
)
try:
await self.task.create()
result = await self.task.run()
task = asyncio.create_task(self.task.run())
task.add_done_callback(self.task_done)
_, pending = await asyncio.wait(
[task], timeout=WORKER_TASK_TIMEOUT * 60
)
if task in pending:
await self.task_pending(task)
task.cancel()
raise asyncio.TimeoutError(
f"Task {self.task} was cancelled."
)
# get the result:
result = task.result()
except Exception as err: # pylint: disable=W0703
self.logger.error(
f"An Error occurred while running Task {self.task}"
)
result = err
finally:
await self.task.close()
return result

def task_done(self, task, *args, **kwargs):
self.logger.notice(
f"Finalized Task {task}"
)

def get_notify(self):
# TODO: implement other notify connectors:
# defining the Default chat object:
recipient = Chat(**{"chat_id": EVENT_CHAT_ID, "chat_name": "Navigator"})
# send notifications to Telegram bot
tm = Telegram()
return [tm, recipient]

async def task_pending(self, task, *args, **kwargs):
worker = mp.current_process()
message = f"Task {self.task!s} got timeout and was cancelled at {worker.name!s}"
self.logger.error(
message
)
try:
ntf, recipients = self.get_notify()
message = f'⚠️ ::{ENVIRONMENT} - {message!s}.'
args = {
"recipient": [recipients],
"message": message,
"disable_notification": True
}
try:
async with ntf as conn:
result = await conn.send(**args)
return result
except RuntimeError:
pass
except Exception as err:
self.logger.error(
f"Error Sending Task Failure notification: {err}"
)

async def run(self):
result = None
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.get_event_loop()
try:
if isinstance(self.task, (FuncWrapper, QueueWrapper)):
if type(self.task) in (FuncWrapper, QueueWrapper):
self.logger.notice(
f"Running Function: {self.task}"
)
self.task.set_loop(loop)
result = await self.task()
elif isinstance(self.task, TaskWrapper):
self.logger.notice(
f"Running Task: {self.task}"
)
self.task.set_loop(loop)
async with self.semaphore:
result = await self.run_task()
elif (
inspect.isawaitable(self.task) or asyncio.iscoroutinefunction(self.task)
):
self.logger.notice(
f"Running Awaitable Func: {self.task}"
)
result = await self.task()
else:
self.logger.notice(
f"Running Blocking Function: {self.task}"
)
with ThreadPoolExecutor(max_workers=2) as executor:
future = loop.run_in_executor(executor, self.task)
result = await future
Expand Down
4 changes: 3 additions & 1 deletion qw/queues/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ async def queue_handler(self):
try:
executor = TaskExecutor(task)
result = await executor.run()
if type(result) == asyncio.TimeoutError:
raise
if isinstance(result, BaseException):
if task.retries < WORKER_RETRY_COUNT:
if task.retries <= WORKER_RETRY_COUNT:
task.add_retries()
self.logger.warning(
f"Task {task} failed. Retrying. Retry count: {task.retries}"
Expand Down
2 changes: 1 addition & 1 deletion qw/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ async def connection_handler(
# first time: check signature authentication of payload:
if not await self.signature_validation(reader, writer):
return False
self.logger.debug(
self.logger.info(
f"Received Data from {addr!r} to worker {self.name!s} pid: {self._pid}"
)
# after: deserialize Task:
Expand Down
2 changes: 1 addition & 1 deletion qw/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
__description__ = ('QueueWorker is asynchronous Task Queue implementation '
'built on top of Asyncio.'
'Can you spawn distributed workers to run functions inside workers.')
__version__ = '1.8.3'
__version__ = '1.8.4'
__author__ = 'Jesus Lara'
__author_email__ = 'jesuslarag@gmail.com'
__license__ = 'MIT'
Expand Down

0 comments on commit 56f710b

Please sign in to comment.