Skip to content

Commit

Permalink
#135 introduce executor abstraction with two implementations
Browse files Browse the repository at this point in the history
 * SequentialHybridExecutor - the current implementation
 * AsyncOnlyExecutor - an experimental executor that executes handlers selected by events as they come in
  • Loading branch information
walcovanloon committed Jun 20, 2023
1 parent ae56052 commit 564e925
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 28 deletions.
46 changes: 18 additions & 28 deletions orthanc_ext/event_dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import asyncio
import inspect
import json
import logging
from dataclasses import dataclass

from orthanc_ext.executor_utilities import SequentialHybridExecutor
from orthanc_ext.http_utilities import create_internal_client, get_rest_api_base_url, \
get_certificate, ClientType
from orthanc_ext.logging_configurator import python_logging
Expand All @@ -15,7 +15,8 @@ def register_event_handlers(
orthanc_module,
sync_client,
async_client=None,
logging_configuration=python_logging):
logging_configuration=python_logging,
handler_executor=SequentialHybridExecutor):
logging_configuration(orthanc_module)

@dataclass
Expand All @@ -39,43 +40,32 @@ def create_type_index(orthanc_type):

event_handlers = {k: ensure_iterable(v) for k, v in event_handlers.items()}

executor = handler_executor(sync_client, async_client)

def unhandled_event_logger(event, _):
logging.debug(f'no handler registered for {event_types[event.change_type]}')

async def on_change_async(async_handlers):
return_values = await asyncio.gather(*async_handlers, return_exceptions=True)

for index, return_value in enumerate(return_values):
if isinstance(return_value, BaseException):
logging.exception(
'execution of %s failed; %s', async_handlers[index], repr(return_value))

return return_values

def get_validated_async_client(async_client):
if async_client is None:
raise ValueError('a configured async_client is required when using async handlers')
return async_client

def OnChange(change_type, resource_type, resource_id):
event = ChangeEvent(change_type, resource_type, resource_id)
handlers = event_handlers.get(change_type, [unhandled_event_logger])

return_values = [
handler(event, sync_client)
for handler in handlers
if not inspect.iscoroutinefunction(handler)
]
async_handlers = [
handler(event, get_validated_async_client(async_client))
for handler in handlers
if inspect.iscoroutinefunction(handler)
]
sync_handlers = get_sync_handlers(handlers)
async_handlers = get_async_handlers(handlers)

return return_values + asyncio.run(on_change_async(async_handlers))
return executor.invoke_all(event, sync_handlers, async_handlers)

orthanc_module.RegisterOnChangeCallback(OnChange)

return executor


def get_async_handlers(handlers):
return [handler for handler in handlers if inspect.iscoroutinefunction(handler)]


def get_sync_handlers(handlers):
return [handler for handler in handlers if not inspect.iscoroutinefunction(handler)]


def create_session(orthanc, client_type=ClientType.SYNC):
config = json.loads(orthanc.GetConfiguration())
Expand Down
103 changes: 103 additions & 0 deletions orthanc_ext/executor_utilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import asyncio
import logging
from threading import Thread


class AsyncOnlyExecutor:
"""
Delegates sync handlers to an executor, mimicking async execution.
Executes async handlers in an event loop that runs in a separate thread.
For optimal performance, use of only async handlers is preferable.
This executor needs to be started and stopped.
"""

def __init__(self, sync_client, async_client):
self.sync_client = sync_client
self.async_client = async_client
self.loop = asyncio.new_event_loop()

self.tasks = set() # make sure tasks are not garbage collected

def run_loop(loop):
asyncio.set_event_loop(self.loop)
try:
loop.run_forever()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()

self.thread = Thread(target=run_loop, args=(self.loop, ), daemon=True)

def invoke_all(self, event, sync_handlers, async_handlers):
tasks = [
asyncio.run_coroutine_threadsafe(
on_change_async(inject_with_event_http_client(
[handler], event, self.async_client)), self.loop) for handler in async_handlers
]

tasks.append(
self.loop.create_task(
asyncio.to_thread(
inject_with_event_http_client, sync_handlers, event, self.sync_client),
name=f'sync_handlers{sync_handlers}'))

self.tasks.update(tasks)
for task in tasks:
task.add_done_callback(self.tasks.discard)

return tasks

def start(self):
self.thread.start()

def stop(self):
if self.tasks:
logging.warning(
'about to stop event loop with %i task(s) pending: %s', len(self.tasks), self.tasks)
pending = asyncio.all_tasks(self.loop)
for task in pending:
task.cancel()

asyncio.run_coroutine_threadsafe(stop_event_loop_in_thread(self.loop), self.loop)
self.thread.join()


async def stop_event_loop_in_thread(loop):
logging.info('stopping event loop')
loop.stop()


def inject_with_event_http_client(handlers, event, client):
return [handler(event, client) for handler in handlers]


class SequentialHybridExecutor:
"""Blocking event executor that handles both sync and async handlers,
returning the gathered results in a list.
It waits for all async handlers to have completed per received event.
"""

def __init__(self, sync_client, async_client):
self.sync_client = sync_client
self.async_client = async_client

def invoke_all(self, event, sync_handlers, async_handlers):
return inject_with_event_http_client(sync_handlers, event, self.sync_client) + asyncio.run(
on_change_async(
inject_with_event_http_client(async_handlers, event, self.async_client)))


async def on_change_async(async_handlers):
return_values = await asyncio.gather(*async_handlers, return_exceptions=True)

for index, return_value in enumerate(return_values):
if isinstance(return_value, BaseException):
logging.exception(
'execution of coroutine \'%s\' failed with exception %s',
async_handlers[index].__name__,
repr(return_value),
exc_info=(return_value.__class__, return_value, return_value.__traceback__))

return return_values
89 changes: 89 additions & 0 deletions tests/test_executor_utilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import asyncio
import time
from functools import partial

from orthanc_ext.executor_utilities import SequentialHybridExecutor, AsyncOnlyExecutor
from tests.test_event_dispatcher import ChangeEvent


def test_SequentialHybridExecutor_should_invoke_both_sync_async_handlers_and_return_the_result():
s_client = object()
a_client = object
dispatcher = SequentialHybridExecutor(s_client, a_client)
change_event = ChangeEvent()
start = time.perf_counter()
assert dispatcher.invoke_all(
change_event, [lambda event, client: (42, event, client)],
[partial(long_async_func, 43)]) == [(42, change_event, s_client),
(43, change_event, a_client)]
end = time.perf_counter()
assert end - start > 0.5, 'invoke_all should wait for async function completion'


async def long_async_func(ret_val, event, client):
await asyncio.sleep(0.5)
return ret_val, event, client


def long_sync_func(ret_val, event, client):
time.sleep(0.5)
return ret_val, event, client


def test_AsyncOnlyExecutor_shall_handle_events_as_they_come_in():
a_client = object()
s_client = object()
dispatcher = AsyncOnlyExecutor(s_client, a_client)
dispatcher.start()
change_event1 = ChangeEvent()
change_event2 = ChangeEvent()
change_event2.resource_id = 'resource-uuid2'

start = time.perf_counter()

task1, _ = dispatcher.invoke_all(change_event1, [], [partial(long_async_func, 42)])
sync_task1, = dispatcher.invoke_all(change_event2, [partial(long_sync_func, 40)], [])
end = time.perf_counter()

task2, _ = dispatcher.invoke_all(change_event2, [], [partial(long_async_func, 43)])

assert task1.result() == [(42, change_event1, a_client)]
assert task2.result() == [(43, change_event2, a_client)]
while not sync_task1.done():
time.sleep(0.1)
assert sync_task1.result() == [(40, change_event2, s_client)]

assert end - start < 0.01, 'invoke_all should never block'


def test_AsyncOnlyExecutor_shall_report_exceptions_with_traceback(caplog):
a_client = object()
dispatcher = AsyncOnlyExecutor(None, a_client)
dispatcher.start()
change_event1 = ChangeEvent()
change_event2 = ChangeEvent()
change_event2.resource_id = 'resource-uuid2'

async def failing_func(ex, *_):
raise ex

ex = Exception('failed')
task1, sync_task_empty = dispatcher.invoke_all(change_event1, [], [partial(failing_func, ex)])
assert task1.result() == [ex]
assert sync_task_empty.result() == []

assert "execution of coroutine 'failing_func' failed with exception" in caplog.records[
0].message
assert "Exception('failed')" in caplog.records[0].message


def test_AsyncOnlyExecutor_shall_report_pending_tasks_on_stop(caplog):
a_client = object()
dispatcher = AsyncOnlyExecutor(None, a_client)
dispatcher.start()
sync_task1, = dispatcher.invoke_all(ChangeEvent(), [partial(long_sync_func, 40)], [])
dispatcher.stop()
assert sync_task1.cancelled()
assert 'about to stop event loop with 1 task(s) pending: ' \
"{<Task pending name='sync_handlers[" in caplog.records[0].message
assert not dispatcher.thread.is_alive()

0 comments on commit 564e925

Please sign in to comment.