Skip to content

Commit

Permalink
poc: event bus implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Pablo Panero committed Feb 10, 2022
1 parent 76274aa commit 60fba01
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 3 deletions.
6 changes: 5 additions & 1 deletion invenio_records_resources/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2020 CERN.
# Copyright (C) 2020-2022 CERN.
# Copyright (C) 2020 Northwestern University.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
Expand All @@ -16,3 +16,7 @@
SITE_UI_URL = "https://127.0.0.1:5000"

SITE_API_URL = "https://127.0.0.1:5000/api"

RECORDS_RESOURCES_EVENTS_HANDLERS = {}

RECORDS_RESOURCES_EVENTS_QUEUE = "events"
19 changes: 19 additions & 0 deletions invenio_records_resources/services/events/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Module for event driven actions support."""

from .bus import EventBus
from .events import Event
from .handlers import EventHandler

__all__ = (
"Event",
"EventHandler",
"EventBus"
)
34 changes: 34 additions & 0 deletions invenio_records_resources/services/events/bus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Events bus module."""

from flask import current_app
from invenio_queues.proxies import current_queues

class EventBus:
"""Event bus."""

def __init__(self, queue_name=None):
"""Constructor."""
self._queue_name = queue_name or \
current_app.config["RECORDS_RESOURCES_EVENTS_QUEUE"]
self._queue = None

for queue in current_queues.queues:
if queue.routing_key == self._queue_name:
self._queue = queue
break

def publish(self, event):
"""Publish an event to the bus queue."""
return self._queue.publish([event])

def consume(self):
"""Consume an event from the bus queue."""
yield self._queue.consume()
30 changes: 30 additions & 0 deletions invenio_records_resources/services/events/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Events module."""

from dataclasses import dataclass


@dataclass
class Event:
"""Base event."""

created: str


@dataclass
class RecordEvent(Event):
"""Record related events."""

recid: str


@dataclass
class RecordPublishedEvent(RecordEvent):
"""Record published event."""
20 changes: 20 additions & 0 deletions invenio_records_resources/services/events/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Event handlers module."""

from abc import ABC, abstractmethod


class EventHandler(ABC):
"""Abstract event handler class."""

@abstractmethod
def handle(self, event):
"""Handle an event."""
pass
9 changes: 8 additions & 1 deletion invenio_records_resources/services/records/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
from ...config import lt_es7
from ..base import LinksTemplate, Service
from ..errors import RevisionIdMismatchError
from ..uow import RecordCommitOp, RecordDeleteOp, unit_of_work
from ..events.events import RecordPublishedEvent
from ..uow import EventOp, RecordCommitOp, RecordDeleteOp, unit_of_work
from .schema import ServiceSchemaWrapper


Expand Down Expand Up @@ -262,6 +263,12 @@ def _create(self, record_cls, identity, data, raise_errors=True, uow=None):
# Persist record (DB and index)
uow.register(RecordCommitOp(record, self.indexer))

# Add events
uow.register(EventOp(RecordPublishedEvent(
created=str(record.created),
recid=str(record.pid.pid_value),
)))

return self.result_item(
self,
identity,
Expand Down
16 changes: 16 additions & 0 deletions invenio_records_resources/services/uow.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ def on_commit(self, uow):

from invenio_db import db

from .events import EventBus


#
# Unit of work operations
Expand Down Expand Up @@ -199,6 +201,19 @@ def on_post_commit(self, uow):
self._celery_task.delay(*self._args, **self._kwargs)


class EventOp(Operation):
"""A task to send an event.
All events will be sent after the commit phase.
"""

def __init__(self, event, *args, **kwargs):
self._event = event

def on_post_commit(self, uow):
uow._event_bus.publish(self._event)


#
# Unit of work context manager
#
Expand All @@ -215,6 +230,7 @@ def __init__(self, session=None):
"""Initialize unit of work context."""
self._session = session or db.session
self._operations = []
self._event_bus = EventBus()
self._dirty = False

def __enter__(self):
Expand Down
2 changes: 1 addition & 1 deletion run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ trap cleanup EXIT

python -m check_manifest --ignore ".*-requirements.txt"
python -m sphinx.cmd.build -qnNW docs docs/_build/html
eval "$(docker-services-cli up --db ${DB:-postgresql} --search ${SEARCH:-elasticsearch} --cache ${CACHE:-redis} --env)"
eval "$(docker-services-cli up --db ${DB:-postgresql} --search ${SEARCH:-elasticsearch} --cache ${CACHE:-redis} --mq ${MQ:-rabbitmq} --env)"
python -m pytest $@
tests_exit_code=$?
python -m sphinx.cmd.build -qnNW -b doctest docs docs/_build/doctest
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
"invenio-indexer>=1.2.1",
"invenio-jsonschemas>=1.1.3",
"invenio-pidstore>=1.2.2",
"invenio-queues>=1.0.0a4",
"invenio-records-permissions>=0.13.0,<0.14.0",
"invenio-records>=1.6.0",
"luqum>=0.11.0",
Expand Down
34 changes: 34 additions & 0 deletions tests/services/events/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Pytest configuration.
See https://pytest-invenio.readthedocs.io/ for documentation on which test
fixtures are available.
"""

import pytest
from kombu import Exchange


@pytest.fixture(scope="module")
def app_config(app_config):

app_config["RECORDS_RESOURCES_EVENT_QUEUE"] = "test-events"

exchange = Exchange(
queue="test-events",
type="direct",
delivery_mode="persistent", # in-memory and disk
)

app_config["QUEUES_DEFINITIONS"] = [
{"name": "notifications", "exchange": exchange}
]

return app_config
25 changes: 25 additions & 0 deletions tests/services/events/test_bus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Event bus test."""

from invenio_records_resources.services.events import EventBus
from invenio_records_resources.services.events.events import \
RecordPublishedEvent


def test_bus_publish_consume(app):
bus = EventBus("events")
event = RecordPublishedEvent(
created="2022-02-10",
recid="12345-abcde",
)

bus.publish(event)
consumed_event = bus.consume()
assert event == consumed_event

0 comments on commit 60fba01

Please sign in to comment.