Skip to content

Commit

Permalink
poc: message bus implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Pablo Panero committed Feb 4, 2022
1 parent 76274aa commit f1335f5
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 0 deletions.
2 changes: 2 additions & 0 deletions invenio_records_resources/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@
SITE_UI_URL = "https://127.0.0.1:5000"

SITE_API_URL = "https://127.0.0.1:5000/api"

RECORDS_RESOURCES_EVENT_HANDLERS = {}
19 changes: 19 additions & 0 deletions invenio_records_resources/services/events/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Module for event driven actions support."""

from .bus import EventBus
from .events import Event
from .handlers import EventHandler

__all__ = (
"Event",
"EventHandler",
"EventBus"
)
34 changes: 34 additions & 0 deletions invenio_records_resources/services/events/bus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Message bus module."""

from flask import current_app


class EventBus:
"""Event bus."""

def __init__(self, handlers, queue):
"""Constructor."""
self._handlers = handlers
self._queue = queue

def publish(self, event):
"""Publish an event to the bus."""
return self._queue.publish(event)

def handle_events(self, uow):
"""Handle a list of events."""
for event in self._queue.consume():
try:
handlers = self._handlers[event]
for handler in handlers:
handler.handle(event, uow)
except KeyError:
current_app.logger.error(f"No handler for event {event}")
29 changes: 29 additions & 0 deletions invenio_records_resources/services/events/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Events module."""

from dataclasses import dataclass
from datetime import datetime


@dataclass
class Event:
"""Base event."""

created: datetime


@dataclass
class RecordEvent(Event):
"""Record related events."""

recid: str
# FIXME: should be an enum (created, deleted, published)
# or splitted in many events
action: str
20 changes: 20 additions & 0 deletions invenio_records_resources/services/events/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Event handlers module."""

from abc import ABC, abstractmethod


class EventHandler(ABC):
"""Abstract event handler class."""

@abstractmethod
def handle(self, event, uow):
"""Handle an event."""
pass
43 changes: 43 additions & 0 deletions invenio_records_resources/services/events/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Event queue module."""

from abc import ABC, abstractmethod
from queue import Empty, SimpleQueue


class Queue(ABC):
"""Base queue."""

@abstractmethod
def publish(self, event):
"""Publish and event to the queue."""

@abstractmethod
def consume(self):
"""Consume an event from the queue."""


class MemoryQueue(Queue):
"""In memory queue."""

def __init__(self):
"""Constructor."""
self._events = SimpleQueue()

def publish(self, event):
"""Publish and event to the queue."""
return self._events.put(event)

def consume(self):
"""Consume an event from the queue."""
try:
yield self._events.get(block=False)
except Empty:
yield None
17 changes: 17 additions & 0 deletions invenio_records_resources/services/uow.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,12 @@ def on_commit(self, uow):

from functools import wraps

from flask import current_app
from invenio_db import db

from .events import EventBus
from .events.queue import MemoryQueue


#
# Unit of work operations
Expand Down Expand Up @@ -215,6 +219,10 @@ def __init__(self, session=None):
"""Initialize unit of work context."""
self._session = session or db.session
self._operations = []
self._event_bus = EventBus(
handlers=current_app.config["RECORDS_RESOURCES_EVENT_HANDLERS"],
queue=MemoryQueue()
)
self._dirty = False

def __enter__(self):
Expand Down Expand Up @@ -267,6 +275,14 @@ def register(self, op):
# Append to list of operations.
self._operations.append(op)

def add_event(self, event):
"""Adds an event."""
self._event_bus.publish(event)

def handle_events(self):
"""Triggers the handling of all stored events."""
self._event_bus.handle_events(uow=self)


def unit_of_work(**kwargs):
"""Decorator to auto-inject a unit of work if not provided.
Expand All @@ -291,6 +307,7 @@ def inner(self, *args, **kwargs):
kwargs['uow'] = uow
res = f(self, *args, **kwargs)
uow.commit()
uow.handle_events()
return res
else:
return f(self, *args, **kwargs)
Expand Down

0 comments on commit f1335f5

Please sign in to comment.