diff --git a/invenio_records_resources/config.py b/invenio_records_resources/config.py index 10c0c21e..7aa35a13 100644 --- a/invenio_records_resources/config.py +++ b/invenio_records_resources/config.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright (C) 2020 CERN. +# Copyright (C) 2020-2022 CERN. # Copyright (C) 2020 Northwestern University. # # Invenio-Records-Resources is free software; you can redistribute it and/or @@ -16,3 +16,7 @@ SITE_UI_URL = "https://127.0.0.1:5000" SITE_API_URL = "https://127.0.0.1:5000/api" + +RECORDS_RESOURCES_EVENTS_HANDLERS = {} + +RECORDS_RESOURCES_EVENTS_QUEUE = "events" diff --git a/invenio_records_resources/services/events/__init__.py b/invenio_records_resources/services/events/__init__.py new file mode 100644 index 00000000..9041db44 --- /dev/null +++ b/invenio_records_resources/services/events/__init__.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2022 CERN. +# +# Invenio-Records-Resources is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. + +"""Module for event driven actions support.""" + +from .bus import EventBus +from .events import Event + +__all__ = ( + "Event", + "EventBus" +) diff --git a/invenio_records_resources/services/events/bus.py b/invenio_records_resources/services/events/bus.py new file mode 100644 index 00000000..36b9c62a --- /dev/null +++ b/invenio_records_resources/services/events/bus.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2022 CERN. +# +# Invenio-Records-Resources is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. + +"""Events bus module.""" + +from pickle import dumps, loads + +from flask import current_app +from invenio_queues.proxies import current_queues + + +class EventBus: + """Event bus.""" + + def __init__(self, queue_name=None): + """Constructor.""" + self._queue_name = queue_name or \ + current_app.config["RECORDS_RESOURCES_EVENTS_QUEUE"] + self._queue = None + + for name, queue in current_queues.queues.items(): + if name == self._queue_name: + self._queue = queue + break + + def publish(self, event): + """Publish an event to the bus queue.""" + return self._queue.publish([dumps(event)]) + + def consume(self): + """Consume an event from the bus queue.""" + for event in self._queue.consume(): # consume() returns a generator + yield loads(event) + + def active_consumer(self): + """Returns a consumer that stays open.""" + # TODO: see usage in handlers.py + pass diff --git a/invenio_records_resources/services/events/events.py b/invenio_records_resources/services/events/events.py new file mode 100644 index 00000000..e143c9a0 --- /dev/null +++ b/invenio_records_resources/services/events/events.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2022 CERN. +# +# Invenio-Records-Resources is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. + +"""Events module.""" + +from dataclasses import dataclass +from datetime import datetime +from typing import ClassVar + + +@dataclass +class Event: + """Base event.""" + + created: datetime + type: str + action: str + handling_key: str + + +@dataclass +class RecordEvent(Event): + """Record related events.""" + + recid: str + type: ClassVar[str] = "RECORD" + handling_key: ClassVar[str] = "RECORD" + + +@dataclass +class RecordCreatedEvent(RecordEvent): + """Record related events.""" + + action: ClassVar[str] = "PUBLISHED" + handling_key: ClassVar[str] = f"{RecordEvent.type}.{action}" diff --git a/invenio_records_resources/services/events/handlers.py b/invenio_records_resources/services/events/handlers.py new file mode 100644 index 00000000..4f444722 --- /dev/null +++ b/invenio_records_resources/services/events/handlers.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2022 CERN. +# +# Invenio-Records-Resources is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. + +"""Event handlers module.""" + +from dataclasses import asdict +from datetime import datetime + +from celery import shared_task +from flask import current_app + +from .bus import EventBus + + +def _handlers_for_key(key): + """Returns the handlers for a key.""" + config_handlers = current_app.config["RECORDS_RESOURCES_EVENTS_HANDLERS"] + keys_parts = key.split(".") + + event_handlers = [] + curr_key = "" + for part in keys_parts: + curr_key = f"{curr_key}.{part}" + try: + event_handlers.expand(config_handlers[curr_key]) + except KeyError: + current_app.logger.warning(f"No handler for key {curr_key}") + + return event_handlers + + +def _handle_event(event, handler=None): + """Executes the handlers configured for an event.""" + handlers = _handlers_for_key(event.handling_key) + + for handler in handlers: + func = handler + async_ = True + if isinstance(handler, tuple): + func = handler[0] + async_ = handler[1] + + if async_: + func.delay(**asdict(event)) + else: + func(**asdict(event)) + + # audit logging + current_app.logger.info( + f"{event.type}-{event.action} handled successfully." + ) + + +@shared_task(ignore_result=True) +def handle_events(queue_name=None, max_events=1000, ttl=300): + """Handle events queue. + + :param max_events: maximum number of events to process by the task. + :param ttl: time to live (in seconds) for the task. + """ + bus = EventBus(queue_name) + start = datetime.timestamp(datetime.now()) + end = start + spawn_new = False + with bus.active_consumer() as consumer: + while max_events > 0 and (start + ttl) > end: + spawn_new = False + event = consumer.consume() # blocking + _handle_event(event) # execute all handlers + end = datetime.timestamp(datetime.now()) + spawn_new = True + + if spawn_new: + handle_events.delay( + queue_name=queue_name, max_events=max_events, ttl=ttl + ) diff --git a/invenio_records_resources/services/uow.py b/invenio_records_resources/services/uow.py index 488e73c5..d804dfce 100644 --- a/invenio_records_resources/services/uow.py +++ b/invenio_records_resources/services/uow.py @@ -106,6 +106,8 @@ def on_commit(self, uow): from invenio_db import db +from .events import EventBus + # # Unit of work operations @@ -199,6 +201,21 @@ def on_post_commit(self, uow): self._celery_task.delay(*self._args, **self._kwargs) +class EventOp(Operation): + """A task to send an event. + + All events will be sent after the commit phase. + """ + + def __init__(self, event, *args, **kwargs): + """Constructor.""" + self._event = event + + def on_post_commit(self, uow): + """Publish the event to the bus.""" + uow._event_bus.publish(self._event) + + # # Unit of work context manager # @@ -215,6 +232,7 @@ def __init__(self, session=None): """Initialize unit of work context.""" self._session = session or db.session self._operations = [] + self._event_bus = EventBus() self._dirty = False def __enter__(self): diff --git a/run-tests.sh b/run-tests.sh index 61d6b06a..fc46cdc7 100755 --- a/run-tests.sh +++ b/run-tests.sh @@ -31,7 +31,7 @@ trap cleanup EXIT python -m check_manifest --ignore ".*-requirements.txt" python -m sphinx.cmd.build -qnNW docs docs/_build/html -eval "$(docker-services-cli up --db ${DB:-postgresql} --search ${SEARCH:-elasticsearch} --cache ${CACHE:-redis} --env)" +eval "$(docker-services-cli up --db ${DB:-postgresql} --search ${SEARCH:-elasticsearch} --cache ${CACHE:-redis} --mq ${MQ:-rabbitmq} --env)" python -m pytest $@ tests_exit_code=$? python -m sphinx.cmd.build -qnNW -b doctest docs docs/_build/doctest diff --git a/setup.py b/setup.py index 5eea53b7..a84c052c 100644 --- a/setup.py +++ b/setup.py @@ -67,6 +67,7 @@ "invenio-indexer>=1.2.1", "invenio-jsonschemas>=1.1.3", "invenio-pidstore>=1.2.2", + "invenio-queues>=1.0.0a4", "invenio-records-permissions>=0.13.0,<0.14.0", "invenio-records>=1.6.0", "luqum>=0.11.0", diff --git a/tests/services/events/conftest.py b/tests/services/events/conftest.py new file mode 100644 index 00000000..b3446514 --- /dev/null +++ b/tests/services/events/conftest.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2022 CERN. +# +# Invenio-Records-Resources is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. + +"""Pytest configuration. + +See https://pytest-invenio.readthedocs.io/ for documentation on which test +fixtures are available. +""" + +from datetime import timedelta + +import pytest +from kombu import Exchange + +from invenio_records_resources.services.events.events import \ + RecordCreatedEvent, RecordEvent + + +@pytest.fixture(scope="module") +def app_config(app_config): + """Application configuration.""" + # handlers + app_config["RECORDS_RESOURCES_EVENTS_HANDLERS"] = { + RecordEvent.handling_key: [], + RecordCreatedEvent.handling_key: [ + # (sync_handler_task, True), + # (explicit_asyn_handler_task, False), + # implicit_asyn_handler_task, + ], + } + + # events queue + queue_name = "test-events" + exchange = Exchange( + queue=queue_name, + type="direct", + delivery_mode="persistent", # in-memory and disk + ) + + app_config["RECORDS_RESOURCES_EVENT_QUEUE"] = queue_name + app_config["QUEUES_DEFINITIONS"] = [ + {"name": queue_name, "exchange": exchange} + ] + + # celery config + app_config["CELERY_ACCEPT_CONTENT"] = ["json", "msgpack", "yaml", "pickle"] + app_config["CELERYBEAT_SCHEDULE"] = { + 'event_handling': { + 'task': 'invenio_records_resources.services.events.handle_events', + 'schedule': timedelta(minutes=5), + }, + } + + return app_config diff --git a/tests/services/events/test_bus.py b/tests/services/events/test_bus.py new file mode 100644 index 00000000..c9574d56 --- /dev/null +++ b/tests/services/events/test_bus.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2022 CERN. +# +# Invenio-Records-Resources is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. + +"""Event bus test.""" + +from datetime import datetime +from time import sleep + +from invenio_records_resources.services.events import EventBus +from invenio_records_resources.services.events.events import RecordCreatedEvent + + +def test_bus_publish_consume(app): + bus = EventBus("test-events") + event = RecordCreatedEvent(created=datetime.now(), recid="12345-abcde") + + bus.publish(event) + sleep(10) + consumed_event = bus.consume() + assert event == next(consumed_event)