Skip to content

Commit

Permalink
Enabled EventHub in mainline codebase
Browse files Browse the repository at this point in the history
  • Loading branch information
s-kostyuk committed May 16, 2018
1 parent e40bdc6 commit c0c24ba
Showing 1 changed file with 43 additions and 0 deletions.
43 changes: 43 additions & 0 deletions dpl/core/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import logging
import argparse
import functools

# Include 3rd-party modules
from sqlalchemy import create_engine
Expand Down Expand Up @@ -35,6 +36,9 @@

from dpl.utils.simple_interceptor import SimpleInterceptor

from dpl.events.event_hub import EventHub
from dpl.events.build_object_related_event import build_object_related_event

from dpl.api.rest_api.things_subapp import build_things_subapp
from dpl.api.rest_api.placements_subapp import build_placements_subapp
from dpl.api.http_api_provider import HttpApiProvider
Expand Down Expand Up @@ -157,6 +161,13 @@ def __init__(self):

api_context_data = {'auth_context': self._auth_context}

self._event_hub = EventHub()
self._setup_event_hub(self._event_hub)

self._user_service_raw.subscribe(self._event_hub)
self._placement_service_raw.subscribe(self._event_hub)
self._thing_service_raw.subscribe(self._event_hub)

self._rest_api_things = build_things_subapp(
thing_service=self._thing_service,
additional_data=api_context_data
Expand Down Expand Up @@ -188,6 +199,38 @@ def __init__(self):
if 'local_announce' in self._apis_config['enabled_apis']:
self._initialize_local_announcement()

@staticmethod
def _setup_event_hub(event_hub: EventHub) -> None:
"""
Sets up EventHub to handle notifications from ObservableServices
:param event_hub: an instance of EventHub to be set up
:return: None
"""
handler_users = functools.partial(
build_object_related_event, target_root_topic='users'
)

handler_placements = functools.partial(
build_object_related_event, target_root_topic='placements'
)

handler_things = functools.partial(
build_object_related_event, target_root_topic='things'
)

event_hub.register_handler(
source_type=UserService, handler=handler_users
)

event_hub.register_handler(
source_type=PlacementService, handler=handler_placements
)

event_hub.register_handler(
source_type=ThingService, handler=handler_things
)

def _initialize_local_announcement(self):
"""
Performs an import of local_announce module an initializes the
Expand Down

0 comments on commit c0c24ba

Please sign in to comment.