[POC]: event bus implementation #300

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
6 changes: 5 additions & 1 deletion invenio_records_resources/config.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
-# Copyright (C) 2020 CERN.
+# Copyright (C) 2020-2022 CERN.
# Copyright (C) 2020 Northwestern University.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
@@ -16,3 +16,7 @@
SITE_UI_URL = "https://127.0.0.1:5000"

SITE_API_URL = "https://127.0.0.1:5000/api"

RECORDS_RESOURCES_EVENTS_HANDLERS = {}

RECORDS_RESOURCES_EVENTS_QUEUE = "events"
17 changes: 17 additions & 0 deletions invenio_records_resources/services/events/__init__.py
@@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Module for event driven actions support."""

from .bus import EventBus
from .events import Event

__all__ = (
    "Event",
    "EventBus",
)
43 changes: 43 additions & 0 deletions invenio_records_resources/services/events/bus.py
@@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Events bus module."""

from pickle import dumps, loads

from flask import current_app
from invenio_queues.proxies import current_queues


class EventBus:
    """Event bus."""

    def __init__(self, queue_name=None):
        """Constructor."""
        self._queue_name = queue_name or \
            current_app.config["RECORDS_RESOURCES_EVENTS_QUEUE"]
        self._queue = None

        for name, queue in current_queues.queues.items():
            if name == self._queue_name:
                self._queue = queue
                break

Review comment (Contributor): Will the queue always be found, or can it exit the for loop and stay None?
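A minimal way to address this concern, as an editor's sketch rather than part of the PR, is to fail fast when no queue matches; it assumes `current_queues.queues` behaves like a dict, and the helper name is hypothetical:

```python
from invenio_queues.proxies import current_queues


def _lookup_queue(queue_name):
    """Hypothetical helper (not in the PR): resolve a queue or fail fast."""
    queue = current_queues.queues.get(queue_name)
    if queue is None:
        # Avoid a latent AttributeError later in publish()/consume().
        raise ValueError(f"Queue '{queue_name}' is not registered.")
    return queue
```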

    def publish(self, event):
        """Publish an event to the bus queue."""
        return self._queue.publish([dumps(event)])

Review comment (Contributor): I would make the serialization object configurable rather than hardcode pickle. Celery does the same. We might want to use simple JSON or msgpack instead.

    def consume(self):
        """Consume an event from the bus queue."""
        for event in self._queue.consume():  # consume() returns a generator
            yield loads(event)

    def active_consumer(self):
        """Return a consumer that stays open."""
        # TODO: see usage in handlers.py
        pass
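Building on the serialization comment above, a rough sketch of what a pluggable serializer could look like; the class and parameter names are hypothetical, and it assumes events are dataclasses with JSON-compatible fields (the JSON round trip yields plain dicts, not Event instances):

```python
import json
from dataclasses import asdict


class JSONEventSerializer:
    """Hypothetical JSON serializer; any dumps/loads pair would fit."""

    @staticmethod
    def dumps(event):
        # default=str renders datetimes as strings.
        return json.dumps(asdict(event), default=str)

    @staticmethod
    def loads(data):
        return json.loads(data)


class ConfigurableEventBus:
    """Sketch of an EventBus variant with an injected serializer."""

    def __init__(self, queue, serializer=JSONEventSerializer):
        self._queue = queue
        self._serializer = serializer

    def publish(self, event):
        return self._queue.publish([self._serializer.dumps(event)])

    def consume(self):
        for message in self._queue.consume():
            yield self._serializer.loads(message)
```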
40 changes: 40 additions & 0 deletions invenio_records_resources/services/events/events.py
@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Events module."""

from dataclasses import dataclass
from datetime import datetime
from typing import ClassVar


@dataclass
class Event:
    """Base event."""

    created: datetime
    type: str
    action: str
    handling_key: str


@dataclass
class RecordEvent(Event):
    """Record related events."""

    recid: str
    type: ClassVar[str] = "RECORD"
    handling_key: ClassVar[str] = "RECORD"


@dataclass
class RecordCreatedEvent(RecordEvent):
    """Record created event."""

    action: ClassVar[str] = "PUBLISHED"
    handling_key: ClassVar[str] = f"{RecordEvent.type}.{action}"

Review comment (Contributor): If RecordCreatedEvent is always for published, I am not sure the action field is useful. Is the handling_key necessary? Isn't the class name/import enough? Unless there could be multiple event classes with the same handling_key.
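To make the ClassVar layering concrete: type, action and handling_key become class-level constants on the subclasses, so only created and recid remain as constructor arguments:

```python
from datetime import datetime

from invenio_records_resources.services.events.events import RecordCreatedEvent

event = RecordCreatedEvent(created=datetime.now(), recid="12345-abcde")

assert event.type == "RECORD"
assert event.action == "PUBLISHED"
assert event.handling_key == "RECORD.PUBLISHED"  # RecordEvent.type + action
```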
81 changes: 81 additions & 0 deletions invenio_records_resources/services/events/handlers.py
@@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Event handlers module."""

from dataclasses import asdict
from datetime import datetime

from celery import shared_task
from flask import current_app

from .bus import EventBus


def _handlers_for_key(key):
    """Return the handlers configured for a key."""
    config_handlers = current_app.config["RECORDS_RESOURCES_EVENTS_HANDLERS"]
    key_parts = key.split(".")

    event_handlers = []
    curr_key = ""
    for part in key_parts:
        curr_key = part if not curr_key else f"{curr_key}.{part}"
        try:
            event_handlers.extend(config_handlers[curr_key])
        except KeyError:
            current_app.logger.warning(f"No handler for key {curr_key}")

    return event_handlers
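Concretely, a handling key such as "RECORD.PUBLISHED" is looked up level by level, so handlers registered under the "RECORD" prefix run for every record event. A small illustration with hypothetical handler names:

```python
# Hypothetical handlers, for illustration only.
def log_any_record_event(**event):
    print("record event:", event)


def notify_on_publish(**event):
    print("record published:", event)


RECORDS_RESOURCES_EVENTS_HANDLERS = {
    "RECORD": [log_any_record_event],         # matches the first key level
    "RECORD.PUBLISHED": [notify_on_publish],  # matches the full key
}

# _handlers_for_key("RECORD.PUBLISHED") would return:
#     [log_any_record_event, notify_on_publish]
```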


def _handle_event(event):
    """Execute the handlers configured for an event."""
    handlers = _handlers_for_key(event.handling_key)

    for handler in handlers:
        func = handler
        async_ = True
        if isinstance(handler, tuple):
            func = handler[0]
            async_ = handler[1]

        if async_:
            func.delay(**asdict(event))
        else:
            func(**asdict(event))

    # audit logging
    current_app.logger.info(
        f"{event.type}-{event.action} handled successfully."
    )

Review comment (Contributor), on the delay call: We may want to pass more params to the delay func: wouldn't it be better to always have a sync func to handle the event, and have it eventually spawn an async task?

    def handler(...):
        my_async_handler.delay(....)

Review comment (Contributor), on the audit logging: This is not really audit logging. I would suggest creating a dedicated logger for the events so it can be configured ad hoc.
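For reference, this is what handler entries matching the (func, async_) convention above could look like; the task and function names are hypothetical:

```python
from celery import shared_task


@shared_task
def send_notification(**event):
    """Hypothetical async handler; _handle_event dispatches it via .delay()."""


def update_statistics(**event):
    """Hypothetical sync handler; _handle_event calls it inline."""


RECORDS_RESOURCES_EVENTS_HANDLERS = {
    "RECORD.PUBLISHED": [
        (send_notification, True),   # explicitly async
        (update_statistics, False),  # explicitly sync
        send_notification,           # bare handler, treated as async
    ],
}
```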


@shared_task(ignore_result=True)
def handle_events(queue_name=None, max_events=1000, ttl=300):
    """Handle events queue.

    :param queue_name: name of the queue to consume from.
    :param max_events: maximum number of events to process by the task.
    :param ttl: time to live (in seconds) for the task.
    """
    bus = EventBus(queue_name)
    start = datetime.timestamp(datetime.now())
    end = start
    remaining = max_events  # decremented per event so the bound takes effect
    spawn_new = False
    with bus.active_consumer() as consumer:
        while remaining > 0 and (start + ttl) > end:
            spawn_new = False
            event = consumer.consume()  # blocking
            _handle_event(event)  # execute all handlers
            remaining -= 1
            end = datetime.timestamp(datetime.now())
            spawn_new = True

    if spawn_new:
        handle_events.delay(
            queue_name=queue_name, max_events=max_events, ttl=ttl
        )

Review comment (Member, author), on the active consumer: The idea here is to have one open consumer for the duration of the task. We could potentially consume(), but it might become 1000 consumers; even if they are light objects, I think it is safe to open an active one here.

Review comment (Contributor): Is there a specific need for this?

Review comment (Member, author ppanero, Feb 11, 2022), on the loop condition: I did not find any configuration that would allow killing a consumer after a determined amount of events or elapsed time (in a similar fashion to uWSGI), so I have implemented it a bit roughly for PoC purposes.

Review comment (Member, author), on the respawn: Meaning:
- If the worker died while consuming events, spawn a new task, since there are events to be consumed and it is better not to wait.
- Otherwise, it died while waiting for events, in which case it is better to wait until celery beat spawns it again.
18 changes: 18 additions & 0 deletions invenio_records_resources/services/uow.py
@@ -106,6 +106,8 @@ def on_commit(self, uow):

from invenio_db import db

from .events import EventBus


#
# Unit of work operations
@@ -199,6 +201,21 @@ def on_post_commit(self, uow):
        self._celery_task.delay(*self._args, **self._kwargs)


class EventOp(Operation):
    """A task to send an event.

    All events will be sent after the commit phase.
    """

    def __init__(self, event, *args, **kwargs):
        """Constructor."""
        self._event = event

    def on_post_commit(self, uow):
        """Publish the event to the bus."""
        uow._event_bus.publish(self._event)


#
# Unit of work context manager
#
@@ -215,6 +232,7 @@ def __init__(self, session=None):
"""Initialize unit of work context."""
self._session = session or db.session
self._operations = []
self._event_bus = EventBus()
self._dirty = False

def __enter__(self):
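For context, a usage sketch of the new operation, assuming the existing UnitOfWork.register mechanism used by the other operations (the event instance mirrors the one in the tests):

```python
from datetime import datetime

from invenio_records_resources.services.events.events import RecordCreatedEvent
from invenio_records_resources.services.uow import EventOp, UnitOfWork

with UnitOfWork() as uow:
    # ... register the usual record/DB operations here ...
    uow.register(EventOp(RecordCreatedEvent(created=datetime.now(),
                                            recid="12345-abcde")))
    uow.commit()  # EventOp.on_post_commit publishes to the bus
```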
2 changes: 1 addition & 1 deletion run-tests.sh
Original file line number Diff line number Diff line change
@@ -31,7 +31,7 @@ trap cleanup EXIT

python -m check_manifest --ignore ".*-requirements.txt"
python -m sphinx.cmd.build -qnNW docs docs/_build/html
eval "$(docker-services-cli up --db ${DB:-postgresql} --search ${SEARCH:-elasticsearch} --cache ${CACHE:-redis} --env)"
eval "$(docker-services-cli up --db ${DB:-postgresql} --search ${SEARCH:-elasticsearch} --cache ${CACHE:-redis} --mq ${MQ:-rabbitmq} --env)"
python -m pytest $@
tests_exit_code=$?
python -m sphinx.cmd.build -qnNW -b doctest docs docs/_build/doctest
1 change: 1 addition & 0 deletions setup.py
@@ -67,6 +67,7 @@
"invenio-indexer>=1.2.1",
"invenio-jsonschemas>=1.1.3",
"invenio-pidstore>=1.2.2",
"invenio-queues>=1.0.0a4",
"invenio-records-permissions>=0.13.0,<0.14.0",
"invenio-records>=1.6.0",
"luqum>=0.11.0",
59 changes: 59 additions & 0 deletions tests/services/events/conftest.py
@@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Pytest configuration.

See https://pytest-invenio.readthedocs.io/ for documentation on which test
fixtures are available.
"""

from datetime import timedelta

import pytest
from kombu import Exchange

from invenio_records_resources.services.events.events import \
RecordCreatedEvent, RecordEvent


@pytest.fixture(scope="module")
def app_config(app_config):
"""Application configuration."""
# handlers
app_config["RECORDS_RESOURCES_EVENTS_HANDLERS"] = {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of this configuration would go to invenio-events and be overwritten in invenio-app-rdm. It is here for demo purposes.

RecordEvent.handling_key: [],
RecordCreatedEvent.handling_key: [
# (sync_handler_task, True),
# (explicit_asyn_handler_task, False),
# implicit_asyn_handler_task,
],
}

    # events queue
    queue_name = "test-events"
    exchange = Exchange(
        name=queue_name,
        type="direct",
        delivery_mode="persistent",  # messages kept in memory and on disk
    )

    app_config["RECORDS_RESOURCES_EVENTS_QUEUE"] = queue_name
    app_config["QUEUES_DEFINITIONS"] = [
        {"name": queue_name, "exchange": exchange}
    ]

    # celery config
    app_config["CELERY_ACCEPT_CONTENT"] = ["json", "msgpack", "yaml", "pickle"]

Review comment (Member, author): @slint the only way I found was to globally accept pickle... which might be a problem...

    app_config["CELERYBEAT_SCHEDULE"] = {
        "event_handling": {
            "task": (
                "invenio_records_resources.services.events"
                ".handlers.handle_events"
            ),
            "schedule": timedelta(minutes=5),
        },
    }

    return app_config
25 changes: 25 additions & 0 deletions tests/services/events/test_bus.py
@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Event bus test."""

from datetime import datetime
from time import sleep

from invenio_records_resources.services.events import EventBus
from invenio_records_resources.services.events.events import RecordCreatedEvent


def test_bus_publish_consume(app):
    """Test publishing and consuming an event."""
    bus = EventBus("test-events")
    event = RecordCreatedEvent(created=datetime.now(), recid="12345-abcde")

    bus.publish(event)
    sleep(10)

Review comment (Member, author): Sometimes fails, sometimes doesn't. I think we are falling once again into the issues we had with ES (fixed now with .index.refresh()).

    consumed_event = bus.consume()
    assert event == next(consumed_event)
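Regarding the flakiness noted above, one option (an editor's sketch, not in the PR; it assumes consume() returns once the currently queued messages are drained, per the "consume() returns a generator" comment) is to replace the fixed sleep with a bounded polling loop:

```python
import time


def consume_one(bus, timeout=10, interval=0.5):
    """Hypothetical helper: poll the bus until an event arrives or time out."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        for event in bus.consume():
            return event
        time.sleep(interval)
    raise TimeoutError("no event received from the bus")


# In the test: assert event == consume_one(bus)
```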