Skip to content

Commit

Permalink
Merge pull request #98 from phenobarbital/new-version
Browse files Browse the repository at this point in the history
Worker Functionality
  • Loading branch information
phenobarbital committed Apr 10, 2023
2 parents f3c5f55 + 24cd2a6 commit 67cc241
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 9 deletions.
9 changes: 5 additions & 4 deletions qw/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import jsonpickle
import orjson
import uvloop
# from dataintegration.utils.parserqs import is_parseable
from navconfig.logging import logging
from qw.discovery import get_client_discovery
from qw.utils import make_signature
Expand Down Expand Up @@ -47,7 +46,7 @@ def random_worker(workers):
try:
yield random.choice(workers)
except (ValueError, TypeError) as ex:
raise Exception(
raise QWException(
"Cannot launch Work on Empty Worker List"
) from ex

Expand All @@ -57,7 +56,7 @@ def round_robin_worker(workers):
try:
return next(workers)
except (ValueError, TypeError) as ex:
raise Exception(
raise QWException(
f"Bad Worker list: {ex}"
) from ex
except StopIteration:
Expand Down Expand Up @@ -408,7 +407,9 @@ async def queue(self, fn: Any, *args, **kwargs):
if reader.at_eof():
break
except Exception as err:
raise Exception from err
raise QWException(
str(err)
) from err
finally:
await self.close(writer)
received = cloudpickle.loads(serialized_result)
Expand Down
2 changes: 1 addition & 1 deletion qw/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
__title__ = 'qworker'
__description__ = ('QueueWorker is asynchronous Task Queue implementation built on to of Asyncio.'
'Can you spawn distributed workers to run functions inside workers.')
__version__ = '1.6.0'
__version__ = '1.6.1'
__author__ = 'Jesus Lara'
__author_email__ = 'jesuslarag@gmail.com'
__license__ = 'MIT'
Expand Down
4 changes: 2 additions & 2 deletions qw/wrappers/di.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
"""
import logging
import multiprocessing as mp
from dataintegration.task import Task
from dataintegration.exceptions import (
from flowtask.tasks.task import Task
from flowtask.exceptions import (
TaskNotFound,
TaskError,
TaskFailed
Expand Down
4 changes: 2 additions & 2 deletions qw/wrappers/di_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import multiprocessing as mp
from navconfig.logging import logging
try:
from dataintegration.task import Task
from dataintegration.exceptions import (
from flowtask.tasks.task import Task
from flowtask.exceptions import (
TaskNotFound,
TaskError,
TaskFailed
Expand Down

0 comments on commit 67cc241

Please sign in to comment.