Skip to content

Commit

Permalink
Merge pull request #136 from walkIT-nl/135-improve-asyncio-implementa…
Browse files Browse the repository at this point in the history
…tion

135 improve asyncio implementation
  • Loading branch information
walcovanloon committed Jun 20, 2023
2 parents bef463d + f2ee104 commit 3afdd2b
Show file tree
Hide file tree
Showing 13 changed files with 259 additions and 55 deletions.
1 change: 0 additions & 1 deletion .github/workflows/main.yml
Expand Up @@ -12,7 +12,6 @@ jobs:
- ["3.11", "py311"]
- ["3.10", "py310"]
- ["3.9", "py39"]
- ["3.8", "py38"]
runs-on: ubuntu-latest
name: ${{ matrix.config[1] }}
steps:
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Expand Up @@ -12,6 +12,8 @@ services:
ORTHANC__SSL_ENABLED: "true"
ORTHANC__SSL_CERTIFICATE: "/ssl/keyAndCert.pem"

ORTHANC__OVERWRITE_INSTANCES: "true"

ORTHANC__PYTHON_VERBOSE: "false"
ORTHANC__HTTP_PORT: "8042"
ORTHANC__PYTHON_SCRIPT: "/python/entry_point.py"
Expand Down
2 changes: 1 addition & 1 deletion orthanc_ext/__init__.py
Expand Up @@ -2,4 +2,4 @@

__author__ = """WalkIT"""
__email__ = 'code@walkit.nl'
__version__ = '3.3.0'
__version__ = '3.4.0'
46 changes: 18 additions & 28 deletions orthanc_ext/event_dispatcher.py
@@ -1,9 +1,9 @@
import asyncio
import inspect
import json
import logging
from dataclasses import dataclass

from orthanc_ext.executor_utilities import SequentialHybridExecutor
from orthanc_ext.http_utilities import create_internal_client, get_rest_api_base_url, \
get_certificate, ClientType
from orthanc_ext.logging_configurator import python_logging
Expand All @@ -15,7 +15,8 @@ def register_event_handlers(
orthanc_module,
sync_client,
async_client=None,
logging_configuration=python_logging):
logging_configuration=python_logging,
handler_executor=SequentialHybridExecutor):
logging_configuration(orthanc_module)

@dataclass
Expand All @@ -39,43 +40,32 @@ def create_type_index(orthanc_type):

event_handlers = {k: ensure_iterable(v) for k, v in event_handlers.items()}

executor = handler_executor(sync_client, async_client)

def unhandled_event_logger(event, _):
logging.debug(f'no handler registered for {event_types[event.change_type]}')

async def on_change_async(async_handlers):
return_values = await asyncio.gather(*async_handlers, return_exceptions=True)

for index, return_value in enumerate(return_values):
if isinstance(return_value, BaseException):
logging.exception(
'execution of %s failed; %s', async_handlers[index], repr(return_value))

return return_values

def get_validated_async_client(async_client):
if async_client is None:
raise ValueError('a configured async_client is required when using async handlers')
return async_client

def OnChange(change_type, resource_type, resource_id):
event = ChangeEvent(change_type, resource_type, resource_id)
handlers = event_handlers.get(change_type, [unhandled_event_logger])

return_values = [
handler(event, sync_client)
for handler in handlers
if not inspect.iscoroutinefunction(handler)
]
async_handlers = [
handler(event, get_validated_async_client(async_client))
for handler in handlers
if inspect.iscoroutinefunction(handler)
]
sync_handlers = get_sync_handlers(handlers)
async_handlers = get_async_handlers(handlers)

return return_values + asyncio.run(on_change_async(async_handlers))
return executor.invoke_all(event, sync_handlers, async_handlers)

orthanc_module.RegisterOnChangeCallback(OnChange)

return executor


def get_async_handlers(handlers):
return [handler for handler in handlers if inspect.iscoroutinefunction(handler)]


def get_sync_handlers(handlers):
return [handler for handler in handlers if not inspect.iscoroutinefunction(handler)]


def create_session(orthanc, client_type=ClientType.SYNC):
config = json.loads(orthanc.GetConfiguration())
Expand Down
103 changes: 103 additions & 0 deletions orthanc_ext/executor_utilities.py
@@ -0,0 +1,103 @@
import asyncio
import logging
from threading import Thread


class AsyncOnlyExecutor:
"""
Delegates sync handlers to an executor, mimicking async execution.
Executes async handlers in an event loop that runs in a separate thread.
For optimal performance, use of only async handlers is preferable.
This executor needs to be started and stopped.
"""

def __init__(self, sync_client, async_client):
self.sync_client = sync_client
self.async_client = async_client
self.loop = asyncio.new_event_loop()

self.tasks = set() # make sure tasks are not garbage collected

def run_loop(loop):
asyncio.set_event_loop(self.loop)
try:
loop.run_forever()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()

self.thread = Thread(target=run_loop, args=(self.loop, ), daemon=True)

def invoke_all(self, event, sync_handlers, async_handlers):
tasks = [
asyncio.run_coroutine_threadsafe(
on_change_async(inject_with_event_http_client(
[handler], event, self.async_client)), self.loop) for handler in async_handlers
]

tasks.append(
self.loop.create_task(
asyncio.to_thread(
inject_with_event_http_client, sync_handlers, event, self.sync_client),
name=f'sync_handlers{sync_handlers}'))

self.tasks.update(tasks)
for task in tasks:
task.add_done_callback(self.tasks.discard)

return tasks

def start(self):
self.thread.start()

def stop(self):
if self.tasks:
logging.warning(
'about to stop event loop with %i task(s) pending: %s', len(self.tasks), self.tasks)
pending = asyncio.all_tasks(self.loop)
for task in pending:
task.cancel()

asyncio.run_coroutine_threadsafe(stop_event_loop_in_thread(self.loop), self.loop)
self.thread.join()


async def stop_event_loop_in_thread(loop):
logging.info('stopping event loop')
loop.stop()


def inject_with_event_http_client(handlers, event, client):
return [handler(event, client) for handler in handlers]


class SequentialHybridExecutor:
"""Blocking event executor that handles both sync and async handlers,
returning the gathered results in a list.
It waits for all async handlers to have completed per received event.
"""

def __init__(self, sync_client, async_client):
self.sync_client = sync_client
self.async_client = async_client

def invoke_all(self, event, sync_handlers, async_handlers):
return inject_with_event_http_client(sync_handlers, event, self.sync_client) + asyncio.run(
on_change_async(
inject_with_event_http_client(async_handlers, event, self.async_client)))


async def on_change_async(async_handlers):
return_values = await asyncio.gather(*async_handlers, return_exceptions=True)

for index, return_value in enumerate(return_values):
if isinstance(return_value, BaseException):
logging.exception(
'execution of coroutine \'%s\' failed with exception %s',
async_handlers[index].__name__,
repr(return_value),
exc_info=(return_value.__class__, return_value, return_value.__traceback__))

return return_values
1 change: 1 addition & 0 deletions requirements_dev.in
Expand Up @@ -13,6 +13,7 @@ pytest
dockercontext
pytest-asyncio
pytest-html
pytest-cov

httpx
pre-commit
Expand Down
42 changes: 25 additions & 17 deletions requirements_dev.txt
Expand Up @@ -29,8 +29,10 @@ charset-normalizer==3.1.0
# via requests
colorama==0.4.6
# via tox
coverage==7.2.7
# via -r requirements_dev.in
coverage[toml]==7.2.7
# via
# -r requirements_dev.in
# pytest-cov
distlib==0.3.6
# via virtualenv
docker==6.1.3
Expand All @@ -45,7 +47,7 @@ exceptiongroup==1.1.1
# via
# anyio
# pytest
filelock==3.12.0
filelock==3.12.2
# via
# tox
# virtualenv
Expand All @@ -68,19 +70,20 @@ idna==3.4
# requests
imagesize==1.4.1
# via sphinx
importlib-metadata==6.6.0
importlib-metadata==6.7.0
# via
# keyring
# twine
# yapf
iniconfig==2.0.0
# via pytest
jaraco-classes==3.2.3
# via keyring
jinja2==3.1.2
# via sphinx
keyring==23.13.1
keyring==24.0.0
# via twine
markdown-it-py==2.2.0
markdown-it-py==3.0.0
# via rich
markupsafe==2.1.3
# via jinja2
Expand All @@ -105,15 +108,16 @@ packaging==23.1
# tox
pkginfo==1.9.6
# via twine
platformdirs==3.5.1
platformdirs==3.6.0
# via
# tox
# virtualenv
# yapf
pluggy==1.0.0
# via
# pytest
# tox
pre-commit==3.3.2
pre-commit==3.3.3
# via -r requirements_dev.in
py==1.11.0
# via pytest-html
Expand All @@ -126,23 +130,26 @@ pygments==2.15.1
# readme-renderer
# rich
# sphinx
pyproject-api==1.5.1
pyproject-api==1.5.2
# via tox
pytest==7.3.1
pytest==7.3.2
# via
# -r requirements_dev.in
# pytest-asyncio
# pytest-cov
# pytest-html
# pytest-metadata
pytest-asyncio==0.21.0
# via -r requirements_dev.in
pytest-cov==4.1.0
# via -r requirements_dev.in
pytest-html==3.2.0
# via -r requirements_dev.in
pytest-metadata==3.0.0
# via pytest-html
pyyaml==6.0
# via pre-commit
readme-renderer==37.3
readme-renderer==40.0
# via twine
requests==2.31.0
# via
Expand All @@ -156,7 +163,7 @@ respx==0.20.1
# via -r requirements_dev.in
rfc3986==2.0.0
# via twine
rich==13.4.1
rich==13.4.2
# via twine
six==1.16.0
# via bleach
Expand All @@ -183,35 +190,36 @@ sphinxcontrib-serializinghtml==1.1.5
# via sphinx
tomli==2.0.1
# via
# coverage
# mypy
# pyproject-api
# pytest
# tox
# yapf
tox==4.6.0
tox==4.6.2
# via -r requirements_dev.in
twine==4.0.2
# via -r requirements_dev.in
typing-extensions==4.6.3
# via mypy
urllib3==2.0.2
urllib3==2.0.3
# via
# docker
# requests
# twine
virtualenv==20.23.0
virtualenv==20.23.1
# via
# pre-commit
# tox
watchdog==3.0.0
# via -r requirements_dev.in
webencodings==0.5.1
# via bleach
websocket-client==1.5.2
websocket-client==1.6.0
# via docker
wheel==0.40.0
# via -r requirements_dev.in
yapf==0.33.0
yapf==0.40.0
# via -r requirements_dev.in
zipp==3.15.0
# via importlib-metadata
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 3.3.0
current_version = 3.4.0
commit = True
tag = True

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -51,6 +51,6 @@
test_suite='tests',
tests_require=test_requirements,
url='https://github.com/walkIT-nl/orthanc-server-extensions',
version='3.3.0',
version='3.4.0',
zip_safe=False,
)
5 changes: 4 additions & 1 deletion tests/test_anonymization.py
Expand Up @@ -22,7 +22,10 @@ def test_anonymization_shall_leverage_orthanc_builtin_functionality(caplog):
})
anonymize_series(client, '1.2.3')
assert store.called
assert caplog.messages == ['Anonymized "/series/1.2.3" to "/series/1.2.4"']
assert caplog.messages == [
'HTTP Request: POST https://localhost:8042/series/1.2.3/anonymize "HTTP/1.1 200 OK"',
'Anonymized "/series/1.2.3" to "/series/1.2.4"'
]


@respx.mock
Expand Down

0 comments on commit 3afdd2b

Please sign in to comment.