Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new fixes on QW #150

Merged
merged 1 commit into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions qw/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def get_worker_list(workers: list):
WORKER_RETRY_INTERVAL = config.getint('WORKER_RETRY_INTERVAL', fallback=10)
WORKER_RETRY_COUNT = config.getint('WORKER_RETRY_COUNT', fallback=3)
WORKER_CONCURRENCY_NUMBER = config.getint('WORKER_CONCURRENCY_NUMBER', fallback=4)
WORKER_TASK_TIMEOUT = config.getint('WORKER_TASK_TIMEOUT', fallback=1)

## Queue Consumed Callback
WORKER_QUEUE_CALLBACK = config.get(
Expand Down
78 changes: 75 additions & 3 deletions qw/executor/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import asyncio
import inspect
from concurrent.futures import ThreadPoolExecutor
import multiprocessing as mp
from navconfig.logging import logging
from notify.providers.telegram import Telegram
from notify.models import Chat
from flowtask.conf import (
EVENT_CHAT_ID,
ENVIRONMENT
)
from ..wrappers import QueueWrapper, FuncWrapper, TaskWrapper
from ..conf import WORKER_CONCURRENCY_NUMBER
from ..conf import WORKER_CONCURRENCY_NUMBER, WORKER_TASK_TIMEOUT

class TaskExecutor:
def __init__(self, task, *args, **kwargs):
Expand All @@ -18,32 +25,97 @@ async def run_task(self):
)
try:
await self.task.create()
result = await self.task.run()
task = asyncio.create_task(self.task.run())
task.add_done_callback(self.task_done)
_, pending = await asyncio.wait(
[task], timeout=WORKER_TASK_TIMEOUT * 60
)
if task in pending:
await self.task_pending(task)
task.cancel()
raise asyncio.TimeoutError(
f"Task {self.task} was cancelled."
)
# get the result:
result = task.result()
except Exception as err: # pylint: disable=W0703
self.logger.error(
f"An Error occurred while running Task {self.task}"
)
result = err
finally:
await self.task.close()
return result

def task_done(self, task, *args, **kwargs):
self.logger.notice(
f"Finalized Task {task}"
)

def get_notify(self):
# TODO: implement other notify connectors:
# defining the Default chat object:
recipient = Chat(**{"chat_id": EVENT_CHAT_ID, "chat_name": "Navigator"})
# send notifications to Telegram bot
tm = Telegram()
return [tm, recipient]

async def task_pending(self, task, *args, **kwargs):
worker = mp.current_process()
message = f"Task {self.task!s} got timeout and was cancelled at {worker.name!s}"
self.logger.error(
message
)
try:
ntf, recipients = self.get_notify()
message = f'⚠️ ::{ENVIRONMENT} - {message!s}.'
args = {
"recipient": [recipients],
"message": message,
"disable_notification": True
}
try:
async with ntf as conn:
result = await conn.send(**args)
return result
except RuntimeError:
pass
except Exception as err:
self.logger.error(
f"Error Sending Task Failure notification: {err}"
)

async def run(self):
result = None
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.get_event_loop()
try:
if isinstance(self.task, (FuncWrapper, QueueWrapper)):
if type(self.task) in (FuncWrapper, QueueWrapper):
self.logger.notice(
f"Running Function: {self.task}"
)
self.task.set_loop(loop)
result = await self.task()
elif isinstance(self.task, TaskWrapper):
self.logger.notice(
f"Running Task: {self.task}"
)
self.task.set_loop(loop)
async with self.semaphore:
result = await self.run_task()
elif (
inspect.isawaitable(self.task) or asyncio.iscoroutinefunction(self.task)
):
self.logger.notice(
f"Running Awaitable Func: {self.task}"
)
result = await self.task()
else:
self.logger.notice(
f"Running Blocking Function: {self.task}"
)
with ThreadPoolExecutor(max_workers=2) as executor:
future = loop.run_in_executor(executor, self.task)
result = await future
Expand Down
4 changes: 3 additions & 1 deletion qw/queues/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ async def queue_handler(self):
try:
executor = TaskExecutor(task)
result = await executor.run()
if type(result) == asyncio.TimeoutError:
raise
if isinstance(result, BaseException):
if task.retries < WORKER_RETRY_COUNT:
if task.retries <= WORKER_RETRY_COUNT:
task.add_retries()
self.logger.warning(
f"Task {task} failed. Retrying. Retry count: {task.retries}"
Expand Down
2 changes: 1 addition & 1 deletion qw/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ async def connection_handler(
# first time: check signature authentication of payload:
if not await self.signature_validation(reader, writer):
return False
self.logger.debug(
self.logger.info(
f"Received Data from {addr!r} to worker {self.name!s} pid: {self._pid}"
)
# after: deserialize Task:
Expand Down
2 changes: 1 addition & 1 deletion qw/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
__description__ = ('QueueWorker is asynchronous Task Queue implementation '
'built on top of Asyncio.'
'Can you spawn distributed workers to run functions inside workers.')
__version__ = '1.8.3'
__version__ = '1.8.4'
__author__ = 'Jesus Lara'
__author_email__ = 'jesuslarag@gmail.com'
__license__ = 'MIT'
Expand Down