From f1335f5233c45d52d6d38cb8825c23f6d2c9d51a Mon Sep 17 00:00:00 2001 From: Pablo Panero Date: Thu, 3 Feb 2022 15:14:46 +0100 Subject: [PATCH] poc: message bus implementation --- invenio_records_resources/config.py | 2 + .../services/events/__init__.py | 19 ++++++++ .../services/events/bus.py | 34 +++++++++++++++ .../services/events/events.py | 29 +++++++++++++ .../services/events/handlers.py | 20 +++++++++ .../services/events/queue.py | 43 +++++++++++++++++++ invenio_records_resources/services/uow.py | 17 ++++++++ 7 files changed, 164 insertions(+) create mode 100644 invenio_records_resources/services/events/__init__.py create mode 100644 invenio_records_resources/services/events/bus.py create mode 100644 invenio_records_resources/services/events/events.py create mode 100644 invenio_records_resources/services/events/handlers.py create mode 100644 invenio_records_resources/services/events/queue.py diff --git a/invenio_records_resources/config.py b/invenio_records_resources/config.py index 10c0c21e..8b74f6be 100644 --- a/invenio_records_resources/config.py +++ b/invenio_records_resources/config.py @@ -16,3 +16,5 @@ SITE_UI_URL = "https://127.0.0.1:5000" SITE_API_URL = "https://127.0.0.1:5000/api" + +RECORDS_RESOURCES_EVENT_HANDLERS = {} diff --git a/invenio_records_resources/services/events/__init__.py b/invenio_records_resources/services/events/__init__.py new file mode 100644 index 00000000..0b0a2278 --- /dev/null +++ b/invenio_records_resources/services/events/__init__.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2022 CERN. +# +# Invenio-Records-Resources is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. + +"""Module for event driven actions support.""" + +from .bus import EventBus +from .events import Event +from .handlers import EventHandler + +__all__ = ( + "Event", + "EventHandler", + "EventBus" +) diff --git a/invenio_records_resources/services/events/bus.py b/invenio_records_resources/services/events/bus.py new file mode 100644 index 00000000..491eae3c --- /dev/null +++ b/invenio_records_resources/services/events/bus.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2022 CERN. +# +# Invenio-Records-Resources is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. + +"""Message bus module.""" + +from flask import current_app + + +class EventBus: + """Event bus.""" + + def __init__(self, handlers, queue): + """Constructor.""" + self._handlers = handlers + self._queue = queue + + def publish(self, event): + """Publish an event to the bus.""" + return self._queue.publish(event) + + def handle_events(self, uow): + """Handle a list of events.""" + for event in self._queue.consume(): + try: + handlers = self._handlers[event] + for handler in handlers: + handler.handle(event, uow) + except KeyError: + current_app.logger.error(f"No handler for event {event}") diff --git a/invenio_records_resources/services/events/events.py b/invenio_records_resources/services/events/events.py new file mode 100644 index 00000000..a6b3c77d --- /dev/null +++ b/invenio_records_resources/services/events/events.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2022 CERN. +# +# Invenio-Records-Resources is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. + +"""Events module.""" + +from dataclasses import dataclass +from datetime import datetime + + +@dataclass +class Event: + """Base event.""" + + created: datetime + + +@dataclass +class RecordEvent(Event): + """Record related events.""" + + recid: str + # FIXME: should be an enum (created, deleted, published) + # or splitted in many events + action: str diff --git a/invenio_records_resources/services/events/handlers.py b/invenio_records_resources/services/events/handlers.py new file mode 100644 index 00000000..9e3143a3 --- /dev/null +++ b/invenio_records_resources/services/events/handlers.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2022 CERN. +# +# Invenio-Records-Resources is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. + +"""Event handlers module.""" + +from abc import ABC, abstractmethod + + +class EventHandler(ABC): + """Abstract event handler class.""" + + @abstractmethod + def handle(self, event, uow): + """Handle an event.""" + pass diff --git a/invenio_records_resources/services/events/queue.py b/invenio_records_resources/services/events/queue.py new file mode 100644 index 00000000..a0dafdbb --- /dev/null +++ b/invenio_records_resources/services/events/queue.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2022 CERN. +# +# Invenio-Records-Resources is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. + +"""Event queue module.""" + +from abc import ABC, abstractmethod +from queue import Empty, SimpleQueue + + +class Queue(ABC): + """Base queue.""" + + @abstractmethod + def publish(self, event): + """Publish and event to the queue.""" + + @abstractmethod + def consume(self): + """Consume an event from the queue.""" + + +class MemoryQueue(Queue): + """In memory queue.""" + + def __init__(self): + """Constructor.""" + self._events = SimpleQueue() + + def publish(self, event): + """Publish and event to the queue.""" + return self._events.put(event) + + def consume(self): + """Consume an event from the queue.""" + try: + yield self._events.get(block=False) + except Empty: + yield None diff --git a/invenio_records_resources/services/uow.py b/invenio_records_resources/services/uow.py index 488e73c5..d62dee79 100644 --- a/invenio_records_resources/services/uow.py +++ b/invenio_records_resources/services/uow.py @@ -104,8 +104,12 @@ def on_commit(self, uow): from functools import wraps +from flask import current_app from invenio_db import db +from .events import EventBus +from .events.queue import MemoryQueue + # # Unit of work operations @@ -215,6 +219,10 @@ def __init__(self, session=None): """Initialize unit of work context.""" self._session = session or db.session self._operations = [] + self._event_bus = EventBus( + handlers=current_app.config["RECORDS_RESOURCES_EVENT_HANDLERS"], + queue=MemoryQueue() + ) self._dirty = False def __enter__(self): @@ -267,6 +275,14 @@ def register(self, op): # Append to list of operations. self._operations.append(op) + def add_event(self, event): + """Adds an event.""" + self._event_bus.publish(event) + + def handle_events(self): + """Triggers the handling of all stored events.""" + self._event_bus.handle_events(uow=self) + def unit_of_work(**kwargs): """Decorator to auto-inject a unit of work if not provided. @@ -291,6 +307,7 @@ def inner(self, *args, **kwargs): kwargs['uow'] = uow res = f(self, *args, **kwargs) uow.commit() + uow.handle_events() return res else: return f(self, *args, **kwargs)