diff --git a/qw/client.py b/qw/client.py index 2433436..7a543b2 100644 --- a/qw/client.py +++ b/qw/client.py @@ -12,7 +12,6 @@ import jsonpickle import orjson import uvloop -# from dataintegration.utils.parserqs import is_parseable from navconfig.logging import logging from qw.discovery import get_client_discovery from qw.utils import make_signature @@ -47,7 +46,7 @@ def random_worker(workers): try: yield random.choice(workers) except (ValueError, TypeError) as ex: - raise Exception( + raise QWException( "Cannot launch Work on Empty Worker List" ) from ex @@ -57,7 +56,7 @@ def round_robin_worker(workers): try: return next(workers) except (ValueError, TypeError) as ex: - raise Exception( + raise QWException( f"Bad Worker list: {ex}" ) from ex except StopIteration: @@ -408,7 +407,9 @@ async def queue(self, fn: Any, *args, **kwargs): if reader.at_eof(): break except Exception as err: - raise Exception from err + raise QWException( + str(err) + ) from err finally: await self.close(writer) received = cloudpickle.loads(serialized_result) diff --git a/qw/version.py b/qw/version.py index 689f5b6..7715ffb 100644 --- a/qw/version.py +++ b/qw/version.py @@ -5,7 +5,7 @@ __title__ = 'qworker' __description__ = ('QueueWorker is asynchronous Task Queue implementation built on to of Asyncio.' 'Can you spawn distributed workers to run functions inside workers.') -__version__ = '1.6.0' +__version__ = '1.6.1' __author__ = 'Jesus Lara' __author_email__ = 'jesuslarag@gmail.com' __license__ = 'MIT' diff --git a/qw/wrappers/di.py b/qw/wrappers/di.py index eb557a3..7516570 100644 --- a/qw/wrappers/di.py +++ b/qw/wrappers/di.py @@ -4,8 +4,8 @@ """ import logging import multiprocessing as mp -from dataintegration.task import Task -from dataintegration.exceptions import ( +from flowtask.tasks.task import Task +from flowtask.exceptions import ( TaskNotFound, TaskError, TaskFailed diff --git a/qw/wrappers/di_task.py b/qw/wrappers/di_task.py index 00a9707..79c9280 100644 --- a/qw/wrappers/di_task.py +++ b/qw/wrappers/di_task.py @@ -5,8 +5,8 @@ import multiprocessing as mp from navconfig.logging import logging try: - from dataintegration.task import Task - from dataintegration.exceptions import ( + from flowtask.tasks.task import Task + from flowtask.exceptions import ( TaskNotFound, TaskError, TaskFailed