Skip to content

Commit

Permalink
Merge pull request #132 from walkIT-nl/131-implementEventPublication
Browse files Browse the repository at this point in the history
#131 Add event publication for NATS, RabbitMQ and Kafka; asyncio support and function composition
  • Loading branch information
walcovanloon committed Jan 30, 2023
2 parents 5528705 + 1235657 commit 56a6303
Show file tree
Hide file tree
Showing 27 changed files with 681 additions and 56 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ jobs:
name: ${{ matrix.config[1] }}
steps:
- name: Checkout
uses: actions/checkout@v2
uses: actions/checkout@v3
- name: Setup
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.config[0] }}
- name: Install dependencies
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/publish-pypi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
steps:
- uses: actions/checkout@main
- name: Set up Python 3.9
uses: actions/setup-python@v1
uses: actions/setup-python@v3
with:
python-version: 3.9
- name: build-distribution
Expand Down
5 changes: 3 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
FROM osimis/orthanc:22.9.0-full

RUN apt-get update && apt-get dist-upgrade -y && apt-get install -y openssl
RUN apt-get update && ACCEPT_EULA=Y apt-get dist-upgrade -y && apt-get install -y openssl

COPY server_cert.cnf .
RUN openssl req -nodes -new -x509 -days 3650 -keyout /etc/ssl/private/server.key -out /etc/ssl/certs/server.pem -config server_cert.cnf
RUN mkdir -p /ssl && cat /etc/ssl/private/server.key /etc/ssl/certs/server.pem > /ssl/keyAndCert.pem

RUN pip3 install httpx # does not get picked up in setup.py
COPY orthanc_ext /python/orthanc_ext
WORKDIR /python
COPY setup.py README.rst HISTORY.rst ./
RUN pip3 install httpx .[nats-event-publisher] # does not get picked up in setup.py
RUN python3 setup.py install
COPY tests/entry_point.py /python/entry_point.py
6 changes: 6 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
History
=======

3.3.0 (2022-01-30)
------------------
* Publish Orthanc change events to Kafka, RabbitMQ and NATS
* Run asyncio functions (coroutines) for concurrent processing of a change event
* Chain functions into a pipeline (composition)

3.2.8 (2021-09-18)
------------------
* get_metadata_of_first_instance_of_series() now propagates http errors if /instances call fails.
Expand Down
27 changes: 22 additions & 5 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ Orthanc Server Extensions
:target: https://github.com/walkIT-nl/orthanc-server-extensions/actions/workflows/main.yml
:alt: Build and test status

A simple Orthanc python plugin based framework to extend Orthanc’s feature set with testable python scripts. It focusses on
integration and orchestration scripts, like study routing, event notifications and audit logging.
A simple Orthanc python plugin based event processing framework to extend Orthanc’s feature set. It focuses on
integration and orchestration, like study routing, event notifications and audit logging.


* Free software: GNU Affero General Public License v3
Expand All @@ -27,8 +27,17 @@ integration and orchestration scripts, like study routing, event notifications a

Features
--------
* easily plug event handling scripts for all Orthanc's `change events`_ -
* chain functions into a pipeline (composition)
* run asyncio functions (coroutines) for concurrent processing of a change event
* run (integration) tests for your Orthanc python scripts
* currently supports handling of `change events`_
* publish events to Kafka, RabbitMQ and NATS

Modules
-------
* auto_retries: retry failed jobs
* auto_forward: forward DICOM to external systems based on python match functions
* anonymization: anonymize DICOM Series using the Orthanc API

Why this library was written
----------------------------
Expand All @@ -39,15 +48,23 @@ With this library, you can start from the unit tests, move to integration tests,
Enable testability: the Orthanc API is provided as a module which is not easy to mock in a clean way.
Orthanc server extensions provide a few simple abstractions that keep functions clean and independently testable.

Improve performance: async functions will be executed concurrently, which is advantageous if the processing is I/O bound.

Httpx was chosen as a base library to access the Orthanc API, rather than orthanc.RestApi*, because it is well known,
developer friendly, and external API access avoids deadlocks in the Python plugin (before this was solved in 3.1).


Running
-------

``docker-compose up --build`` should greet you with 'orthanc started event handled!' message and provides the first boilerplate
to get started.
``entry_point.py`` provides the first boilerplate to get started. Run it by issuing
``docker-compose up --build``; you should be greeted with 'orthanc started event handled!' message, which is also published to

Developing
----------

Write your event handling scripts and register them in ``event_dispatcher.register_event_handlers()``. Examples,
including the use of async functions and function composition (pipeline), can be found in ``tests/test_event_dispatcher.py``.


Credits
Expand Down
7 changes: 7 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
version: "3"
services:
nats:
image: nats
command:
- '-js'
orthanc:
image: local/orthanc
build: .
Expand All @@ -15,6 +19,9 @@ services:
ORTHANC__REGISTERED_USERS: |
{"demo": "demo"}
NATS_URL: nats://nats
depends_on:
- nats
ports:
- "127.0.0.1:4242:4242"
- "127.0.0.1:8042:8042"
2 changes: 1 addition & 1 deletion orthanc_ext/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

__author__ = """WalkIT"""
__email__ = 'code@walkit.nl'
__version__ = '3.2.12'
__version__ = '3.3.0'
48 changes: 39 additions & 9 deletions orthanc_ext/event_dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
import asyncio
import inspect
import json
import logging
from dataclasses import dataclass

from orthanc_ext.http_utilities import create_internal_client, get_rest_api_base_url, \
get_certificate
get_certificate, ClientType
from orthanc_ext.logging_configurator import python_logging
from orthanc_ext.python_utilities import ensure_iterable, create_reverse_type_dict


def register_event_handlers(
event_handlers, orthanc_module, requests_session, logging_configuration=python_logging):
event_handlers,
orthanc_module,
sync_client,
async_client=None,
logging_configuration=python_logging):
logging_configuration(orthanc_module)

@dataclass
Expand All @@ -36,19 +42,43 @@ def create_type_index(orthanc_type):
def unhandled_event_logger(event, _):
logging.debug(f'no handler registered for {event_types[event.change_type]}')

async def on_change_async(async_handlers):
return_values = await asyncio.gather(*async_handlers, return_exceptions=True)

for index, return_value in enumerate(return_values):
if isinstance(return_value, BaseException):
logging.exception(
'execution of %s failed; %s', async_handlers[index], repr(return_value))

return return_values

def get_validated_async_client(async_client):
if async_client is None:
raise ValueError('a configured async_client is required when using async handlers')
return async_client

def OnChange(change_type, resource_type, resource_id):
event = ChangeEvent(change_type, resource_type, resource_id)
handlers = event_handlers.get(change_type, [unhandled_event_logger])
return_values = []
for handler in handlers:
event = ChangeEvent(change_type, resource_type, resource_id)
return_values.append(handler(event, requests_session))
return return_values

return_values = [
handler(event, sync_client)
for handler in handlers
if not inspect.iscoroutinefunction(handler)
]
async_handlers = [
handler(event, get_validated_async_client(async_client))
for handler in handlers
if inspect.iscoroutinefunction(handler)
]

return return_values + asyncio.run(on_change_async(async_handlers))

orthanc_module.RegisterOnChangeCallback(OnChange)


def create_session(orthanc):
def create_session(orthanc, client_type=ClientType.SYNC):
config = json.loads(orthanc.GetConfiguration())
return create_internal_client(
get_rest_api_base_url(config), orthanc.GenerateRestApiAuthorizationToken(),
get_certificate(config))
get_certificate(config), client_type)
14 changes: 12 additions & 2 deletions orthanc_ext/http_utilities.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from enum import Enum
from typing import Union

import httpx
Expand All @@ -13,8 +14,17 @@ def get_certificate(config):
return False if not config.get('SslEnabled', False) else config.get('SslCertificate', False)


def create_internal_client(base_url, token='', cert: Union[str, bool] = False) -> httpx.Client:
return httpx.Client(
class ClientType(Enum):
SYNC = httpx.Client
ASYNC = httpx.AsyncClient


def create_internal_client(
base_url,
token='',
cert: Union[str, bool] = False,
client_type: ClientType = ClientType.SYNC) -> httpx.Client:
return client_type.value(
base_url=base_url,
timeout=httpx.Timeout(300, connect=30),
verify=cert,
Expand Down
16 changes: 16 additions & 0 deletions orthanc_ext/python_utilities.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@
from typing import Iterable


def pipeline(*functions):

class Pipeline:

def __call__(self, evt, *args):
arg = evt
for step in functions:
arg = step(arg, *args)
return arg

def __repr__(self):
return f'pipeline({functions})'

return Pipeline()


def ensure_iterable(v):
return v if isinstance(v, Iterable) else [v]

Expand Down
13 changes: 9 additions & 4 deletions orthanc_ext/scripts/auto_retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@ def calculate_delay(job, first_retry=ONE_MINUTE):
return min(max(first_retry, elapsed.seconds * 2), ONE_DAY)


def resubmit_job(client, job_id, delay):
def resubmit_job(client, job_id):
resp = client.post(f'/jobs/{job_id}/resubmit')
resp.raise_for_status()
logging.info(f'resubmitted job "{job_id}"')


def handle_failed_forwarding_job(first_retry=ONE_MINUTE, job_types=RETRYABLE_JOBTYPES):
def python_timer_runner(job_id, delay, client):
timer = threading.Timer(interval=delay, function=resubmit_job, args=[client, job_id])
timer.start()


def handle_failed_forwarding_job(
first_retry=ONE_MINUTE, job_types=RETRYABLE_JOBTYPES, job_runner=python_timer_runner):

def handle_failed_forwarding_job(event, client):
job_id = event.resource_id
Expand All @@ -37,7 +43,6 @@ def handle_failed_forwarding_job(event, client):
return
delay = calculate_delay(job, first_retry)
logging.debug(f'resubmitting job "{job_id}" after {delay} seconds')
timer = threading.Timer(interval=delay, function=resubmit_job, args=[client, job_id, delay])
timer.start()
return job_runner(job_id, delay, client)

return handle_failed_forwarding_job
20 changes: 20 additions & 0 deletions orthanc_ext/scripts/event_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import dataclasses

from cloudevents.conversion import to_structured, from_http
from cloudevents.http import CloudEvent


def create_valid_orthanc_cloud_event(evt):
return CloudEvent.create({
'type': 'orthanc-server-extensions.change-event',
'source': 'https://orthanc-server-identifer'
},
data=dataclasses.asdict(evt))


def convert_change_event_to_message(evt) -> tuple:
return to_structured(create_valid_orthanc_cloud_event(evt))


def convert_message_to_change_event(headers: dict, data: bytes):
return from_http(CloudEvent, headers, data=data)
31 changes: 31 additions & 0 deletions orthanc_ext/scripts/kafka_event_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from dataclasses import dataclass

from aiokafka import AIOKafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic

from orthanc_ext.scripts.event_publisher import convert_change_event_to_message


@dataclass
class KafkaConfig:
bootstrap_server: str
topic: str = 'orthanc-events'


async def publish_to_kafka(kafka_config: KafkaConfig, evt, _):
producer = AIOKafkaProducer(
security_protocol='PLAINTEXT', bootstrap_servers=kafka_config.bootstrap_server)
await producer.start()
try:
_, event = convert_change_event_to_message(evt)
await producer.send_and_wait(kafka_config.topic, event)

finally:
await producer.stop()


def create_stream(kafka_config: KafkaConfig, *_):
admin_client = KafkaAdminClient(bootstrap_servers=kafka_config.bootstrap_server)
admin_client.create_topics(
new_topics=[NewTopic(name=kafka_config.topic, num_partitions=1, replication_factor=1)],
validate_only=False)
31 changes: 31 additions & 0 deletions orthanc_ext/scripts/nats_event_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from dataclasses import dataclass

import nats

from orthanc_ext.scripts.event_publisher import convert_change_event_to_message


@dataclass
class NatsConfig:
url: str
stream_name: str = 'orthanc-events'
subject = 'onchange'


async def create_stream(nats_config: NatsConfig, *_):
nc = await nats.connect(nats_config.url)
try:
js = nc.jetstream()
await js.add_stream(name=nats_config.stream_name, subjects=[nats_config.subject])
finally:
await nc.close()


async def publish_to_nats(nats_config: NatsConfig, evt, *_):
nc = await nats.connect(nats_config.url)
try:
js = nc.jetstream()
_, message = convert_change_event_to_message(evt)
return await js.publish(nats_config.subject, message, stream=nats_config.stream_name)
finally:
await nc.close()

0 comments on commit 56a6303

Please sign in to comment.