Skip to content

Commit

Permalink
Merge pull request #141 from phenobarbital/bugfix-task-execution
Browse files Browse the repository at this point in the history
fixed execution queued and directly
  • Loading branch information
phenobarbital authored Jun 2, 2023
2 parents 57bf445 + 09eb68c commit c6f5c7d
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 7 deletions.
14 changes: 8 additions & 6 deletions qw/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def run_process(self, fn, loop):
fn.set_loop(loop)
try:
result = loop.run_until_complete(
self.run_function(fn, loop)
self.run_function(fn)
)
print("run_process completed normally")
return result
Expand Down Expand Up @@ -271,13 +271,11 @@ async def task_handler(self, q: asyncio.Queue):
f'Running Queued Task: {task!s}'
)
try:
# fn = partial(self.run_function, task)
fn = loop.create_task(self.run_task(task))
result = await fn
# with ThreadPoolExecutor(max_workers=2) as pool:
# future = loop.run_in_executor(pool, fn)
# result = await future
print(f'RESULT > {result}')
except (RuntimeError) as err:
raise QWException(
f"Error: {err}"
Expand Down Expand Up @@ -481,9 +479,13 @@ async def handle_queue_wrapper(
else:
try:
# executed and send result to client
loop = asyncio.get_running_loop()
task.id = uid
fn = partial(self.run_process, task)
return await self._loop.run_in_executor(self._executor, fn)
task.set_loop(loop)
fn = loop.create_task(
self.run_function(task)
)
return await fn
except Exception as err: # pylint: disable=W0703
try:
result = cloudpickle.dumps(err)
Expand Down Expand Up @@ -530,7 +532,7 @@ async def connection_handler(
return False
elif callable(task):
result = await self.run_function(
task, self._loop
task
)
else:
# put work in Queue:
Expand Down
2 changes: 1 addition & 1 deletion qw/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
__description__ = ('QueueWorker is asynchronous Task Queue implementation '
'built on top of Asyncio.'
'Can you spawn distributed workers to run functions inside workers.')
__version__ = '1.7.7'
__version__ = '1.7.8'
__author__ = 'Jesus Lara'
__author_email__ = 'jesuslarag@gmail.com'
__license__ = 'MIT'
Expand Down
1 change: 1 addition & 0 deletions qw/wrappers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def __init__(self, *args, **kwargs):
self._id = uuid.uuid4()
self.args = args
self.kwargs = kwargs
self.loop = None

@property
def queued(self):
Expand Down

0 comments on commit c6f5c7d

Please sign in to comment.