diff --git a/qw/server.py b/qw/server.py index a5caa0d..f7f87ba 100644 --- a/qw/server.py +++ b/qw/server.py @@ -209,7 +209,7 @@ def run_process(self, fn, loop): fn.set_loop(loop) try: result = loop.run_until_complete( - self.run_function(fn, loop) + self.run_function(fn) ) print("run_process completed normally") return result @@ -271,13 +271,11 @@ async def task_handler(self, q: asyncio.Queue): f'Running Queued Task: {task!s}' ) try: - # fn = partial(self.run_function, task) fn = loop.create_task(self.run_task(task)) result = await fn # with ThreadPoolExecutor(max_workers=2) as pool: # future = loop.run_in_executor(pool, fn) # result = await future - print(f'RESULT > {result}') except (RuntimeError) as err: raise QWException( f"Error: {err}" @@ -481,9 +479,13 @@ async def handle_queue_wrapper( else: try: # executed and send result to client + loop = asyncio.get_running_loop() task.id = uid - fn = partial(self.run_process, task) - return await self._loop.run_in_executor(self._executor, fn) + task.set_loop(loop) + fn = loop.create_task( + self.run_function(task) + ) + return await fn except Exception as err: # pylint: disable=W0703 try: result = cloudpickle.dumps(err) @@ -530,7 +532,7 @@ async def connection_handler( return False elif callable(task): result = await self.run_function( - task, self._loop + task ) else: # put work in Queue: diff --git a/qw/version.py b/qw/version.py index 3fe7f41..8a16c6f 100644 --- a/qw/version.py +++ b/qw/version.py @@ -6,7 +6,7 @@ __description__ = ('QueueWorker is asynchronous Task Queue implementation ' 'built on top of Asyncio.' 'Can you spawn distributed workers to run functions inside workers.') -__version__ = '1.7.7' +__version__ = '1.7.8' __author__ = 'Jesus Lara' __author_email__ = 'jesuslarag@gmail.com' __license__ = 'MIT' diff --git a/qw/wrappers/base.py b/qw/wrappers/base.py index 02e5c30..fbd60fa 100644 --- a/qw/wrappers/base.py +++ b/qw/wrappers/base.py @@ -18,6 +18,7 @@ def __init__(self, *args, **kwargs): self._id = uuid.uuid4() self.args = args self.kwargs = kwargs + self.loop = None @property def queued(self):