Skip to content

Commit

Permalink
Event Hub (#105)
Browse files Browse the repository at this point in the history
This pull request implements EventHub - a central place for all Events in the system
- implemented the base Event class;
- implemented the ObjectRelatedEvent for events related with modification, creation and deletion of objects;
- added a concept of Event Topic;
- EventHub is not yet integrated with the rest of the system.

Closes #11 (Finally)

This is a squashed commit
  • Loading branch information
s-kostyuk committed May 12, 2018
1 parent 06fa5e7 commit 89fb2cb
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 0 deletions.
Empty file added dpl/events/__init__.py
Empty file.
43 changes: 43 additions & 0 deletions dpl/events/build_object_related_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""
This module contains functions for construction of ObjectRelatedEvents based on
on a data sent by ObservableServices
"""
from typing import Optional

from dpl.model.domain_id import TDomainId
from dpl.dtos.base_dto import BaseDto
from dpl.services.observable_service import ObservableService, ServiceEventType
from .topic import iterable_to_topic
from .object_related_event import ObjectRelatedEvent


def build_object_related_event(
source: ObservableService,
object_id: TDomainId, event_type: ServiceEventType,
object_dto: Optional[BaseDto],
*,
target_root_topic: str
) -> ObjectRelatedEvent:
"""
Builds an instance of ObjectRelatedEvent based on data received from
an ObservableService
:param source: source of the event
:param object_id: an identifier of an altered object
:param event_type: enum value, specifies what happened to the object
:param object_dto: a DTO of the altered object or None if it was deleted
:param target_root_topic: a root topic to be used for construction
:return: an instance of ObjectRelatedEvent
"""
assert isinstance(source, ObservableService)

last_topic_part = event_type.name
topic_parts = (target_root_topic, object_id, last_topic_part)
topic = iterable_to_topic(topic_parts)

event = ObjectRelatedEvent(
topic=topic,
object_dto=object_dto
)

return event
46 changes: 46 additions & 0 deletions dpl/events/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""
This module contains a definition of Event - a structure-like class that
carries information about events that happened in the system
"""
import time


class Event(object):
"""
Event class carries information about an event happened in the system.
Base (this) Event class contains only two fields:
- timestamp - when event occurred;
- topic - what the topic (category) of this Event.
All the remaining fields are defined by Event subclasses.
"""
def __init__(self, topic: str):
"""
Constructor. Receives a topic - a hierarchical identifier of a theme,
topic, event type this Event belongs to.
:param topic: a hierarchical topic (category) this Event belongs to
"""
self._timestamp = time.time()
self._topic = topic

@property
def timestamp(self) -> float:
"""
Returns a time moment when this Event was generated
:return: a time moment when this Event was generated in
UNIX time format (floating point number)
"""
return self._timestamp

@property
def topic(self) -> str:
"""
Returns a topic of this Event
:return: a hierarchical topic (category) this Event belongs to
"""
return self._topic
98 changes: 98 additions & 0 deletions dpl/events/event_hub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""
This module contains a definition of EventHub - a central place for processing
of all events in the system
"""
import functools
import warnings
from typing import Type, MutableSet, Callable

from dpl.utils.observer import Observer
from dpl.utils.observable import Observable
from dpl.events.event import Event


def _convert_to_event(source: Observable, *args, **kwargs) -> Event:
"""
A skeleton of a function that converts the specified data received from the
specified event source to an instance of Event
:param source: a source of this event
:param args: positional arguments, an information about event
:param kwargs: keyword arguments, an information about event
:return: an instance of Event
"""
raise NotImplementedError(
"Failed to find an event builder for this source: %s" % source
)


class EventHub(Observer, Observable):
"""
EventHub is a central place for all events in the system. It's responsible
for pre-processing of events, coming from all the services, APIs and other
sources, and their distribution to other subscribers (like services, APIs,
loggers and other interested parties
"""
def __init__(self):
"""
Constructor. Initializes internal variables
"""
self._observers = set() # type: MutableSet[Observer]
self._converter = functools.singledispatch(_convert_to_event)

def update(self, source: Observable, *args, **kwargs) -> None:
"""
A method to be called by event sources if any event was generated.
The source of event is determined by the first parameter of
this method. The number and an exact set of parameters is determined
by the event source. The way of handling such parameters is determined
registered event handlers.
:param source: an object which generated an event
:param args: positional arguments, information about event
:param kwargs: keyword arguments, information about event
:return: None
"""
event = self._converter(source, *args, **kwargs)
self._notify(event)

def subscribe(self, observer: Observer) -> None:
"""
Adds the specified Observer to the list of subscribers
:param observer: an instance of Observer to be added
:return: None
"""
self._observers.add(observer)

def unsubscribe(self, observer: Observer) -> None:
"""
Removes the specified Observer from the list of subscribers
:param observer: an instance of Observer to be deleted
:return: None
"""
self._observers.discard(observer)

def _notify(self, event: Event) -> None:
"""
Sends the specified event to all subscribers of EventHub
:param event: an event to be broadcasted
:return: None
"""
for observer in self._observers:
observer.update(self, event)

def register_handler(self, source_type: Type, handler: Callable) -> None:
"""
Registers a handler method (converter) that will generate Event objects
based on data sent by the specified type of source objects
(Observables)
:param source_type: a type of Observable events from which will be
processed by the specified handler
:param handler: a callable to process events from the specified sources
:return: None
"""
self._converter.register(source_type, handler)
38 changes: 38 additions & 0 deletions dpl/events/object_related_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""
This module contains a definition of ObjectRelatedEvent - a class for events
that are related to some objects (i.e. to their creation, deletion or
modification).
"""
from typing import Optional

from dpl.dtos.base_dto import BaseDto
from .event import Event


class ObjectRelatedEvent(Event):
"""
Contains information about an event that happened with some object
"""
def __init__(self, topic: str, object_dto: Optional[BaseDto]):
"""
Constructor. Receives information about a topic of event (constructed
like ``object_category/object_id/what_changed`` and an object DTO -
representation of the current state of the object.
:param topic: a topic (category) of this Event
:param object_dto: a current state of an object or None if it was
deleted
"""
super().__init__(topic)
self._object_dto = object_dto

@property
def object_dto(self) -> Optional[BaseDto]:
"""
Returns the current DTO (representation) of the object this Event is
related to
:return: the current DTO (representation) of the object this Event is
related to
"""
return self._object_dto
25 changes: 25 additions & 0 deletions dpl/events/topic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""
This module contains methods for working with topics
"""

import typing


def topic_to_list(topic: str) -> typing.List[str]:
"""
Converts the specified topic to the corresponding list
:param topic: a tuple to be converted
:return: a tuple which corresponds to the specified topic
"""
return topic.split('/')


def iterable_to_topic(iterable: typing.Iterable[str]) -> str:
"""
Converts a content of iterable to the topic string
:param iterable: an iterable to be converted
:return: topic as a string
"""
return '/'.join(iterable)

0 comments on commit 89fb2cb

Please sign in to comment.