Skip to content

Commit

Permalink
Start ZMQ transport driver
Browse files Browse the repository at this point in the history
  • Loading branch information
Matthew Harris committed Sep 21, 2017
1 parent b368ecc commit 35f55b3
Show file tree
Hide file tree
Showing 18 changed files with 400 additions and 53 deletions.
10 changes: 10 additions & 0 deletions eventify/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,13 @@ def load_config(self):
logger.error('configuration file is required for eventify')
logger.error('unable to load configuration for service')
raise EventifyConfigError('Configuration is required! Missing: %s' % self.config_file)

@staticmethod
def check_event_loop():
"""
Check if event loop is closed and
create a new event loop
"""
loop = asyncio.get_event_loop()
if loop.is_closed():
asyncio.set_event_loop(asyncio.new_event_loop())
64 changes: 64 additions & 0 deletions eventify/drivers/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""
Base Driver Module
"""
import asyncio

import asyncpg

from autobahn.wamp.types import SubscribeOptions, PublishOptions

from eventify.persist.constants import EVENT_DB_HOST, \
EVENT_DB_USER, \
EVENT_DB_PASS, \
EVENT_DB_NAME
from eventify.util import objdict


class BaseComponent(object):
"""
Base class for driver components
"""

async def onConnect(self):
"""
Configure the component
"""
# Add extra attribute
# This allows for following crossbar/autobahn spec
# without changing legacy configuration
if not hasattr(self.config, 'extra'):
original_config = {'config': self.config}
self.config = objdict(self.config)
setattr(self.config, 'extra', original_config)
self.config.extra['handlers'] = self.handlers

# setup transport host
self.transport_host = self.config.extra['config']['transport_host']

# subscription setup
self.subscribe_options = SubscribeOptions(**self.config.extra['config']['sub_options'])
self.replay_events = self.config.extra['config']['replay_events']

# publishing setup
self.publish_topic = self.config.extra['config']['publish_topic']['topic']
self.publish_options = PublishOptions(**self.config.extra['config']['pub_options'])

# setup callback
self.handlers = self.config.extra['handlers']

# optional subscribed topics from config.json
self.subscribed_topics = self.config.extra['config']['subscribed_topics']

# put name on session
self.name = self.config.extra['config']['name']

# setup db pool - optionally
if self.config.extra['config']['pub_options']['retain'] is True:
self.pool = await asyncpg.create_pool(
user=EVENT_DB_USER,
password=EVENT_DB_PASS,
host=EVENT_DB_HOST,
database=EVENT_DB_NAME
)

await asyncio.sleep(0)
57 changes: 7 additions & 50 deletions eventify/drivers/crossbar.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,61 +15,29 @@

from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
from autobahn.wamp.exception import TransportLost
from autobahn.wamp.types import SubscribeOptions, PublishOptions

from eventify import Eventify
from eventify.drivers.base import BaseComponent
from eventify.persist import persist_event
from eventify.persist.constants import EVENT_DB_HOST, EVENT_DB_USER, EVENT_DB_PASS, \
EVENT_DB_NAME
from eventify import Eventify


txaio.use_asyncio()


class Component(ApplicationSession):
class Component(BaseComponent, ApplicationSession):
"""
Handle subscribing to topics
"""
log = logging.getLogger("eventify.drivers.crossbar")


async def onConnect(self):
"""
Configure the component
Inherited from BaseComponent
"""
# setup transport host
self.transport_host = self.config.extra['config']['transport_host']

# subscription setup
self.subscribe_options = SubscribeOptions(**self.config.extra['config']['sub_options'])
self.replay_events = self.config.extra['config']['replay_events']

# publishing setup
self.publish_topic = self.config.extra['config']['publish_topic']['topic']
self.publish_options = PublishOptions(**self.config.extra['config']['pub_options'])

# setup callback
self.handlers = self.config.extra['handlers']

# optional subscribed topics from config.json
self.subscribed_topics = self.config.extra['config']['subscribed_topics']

# put name on session
self.name = self.config.extra['config']['name']

# setup db pool - optionally
if self.config.extra['config']['pub_options']['retain'] is True:
self.pool = await asyncpg.create_pool(
user=EVENT_DB_USER,
password=EVENT_DB_PASS,
host=EVENT_DB_HOST,
database=EVENT_DB_NAME
)

# Check for replay option
# if self.replay_events:
# await replay_events(self)

# join topic
super(BaseComponent, self).onConnect()
self.log.info("connected")
self.join(self.config.realm)

Expand Down Expand Up @@ -236,17 +204,6 @@ def setup_runner(self):
return runner


@staticmethod
def check_event_loop():
"""
Check if event loop is closed and
create a new event loop
"""
loop = asyncio.get_event_loop()
if loop.is_closed():
asyncio.set_event_loop(asyncio.new_event_loop())


def check_transport_host(self):
"""
Check if crossbar port is open
Expand Down Expand Up @@ -304,7 +261,7 @@ def start(self, start_loop=True):
Start a producer/consumer service
"""
txaio.start_logging(level='error')
txaio.start_logging()
runner = self.setup_runner()
if start_loop:
try:
Expand Down
157 changes: 157 additions & 0 deletions eventify/drivers/zeromq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
"""
Crossbar Driver Module
"""
from __future__ import print_function

import asyncio
import logging
import socket
import sys
import time
import traceback

from asyncio import BaseProtocol

import txaio
import zmq
import zmq.asyncio

from eventify import Eventify
from eventify.drivers.base import BaseComponent
from eventify.persist import persist_event
from eventify.persist.constants import EVENT_DB_HOST, EVENT_DB_USER, EVENT_DB_PASS, \
EVENT_DB_NAME


txaio.use_asyncio()
ctx = zmq.asyncio.Context.instance()


class Component(BaseComponent):
"""
Handle subscribing to topics
"""
log = logging.getLogger("eventify.drivers.zeromq")

def __init__(self, config, handlers):
self.config = config
self.handlers = handlers


def run(self):
"""
start component
"""
loop = asyncio.get_event_loop()
if loop.is_closed():
asyncio.set_event_loop(asyncio.new_event_loop())
loop = asyncio.get_event_loop()

txaio.start_logging()
loop.run_until_complete(self.onConnect())


async def onConnect(self):
"""
Inherited from BaseComponent
"""
await super().onConnect()
self.log.info("connected")
await self.onJoin()


async def emit_event(self, event):
"""
Publish an event
:param event: Event object
"""
self.log.info("publishing event on %s", self.publish_topic)
if self.config.extra['config']['pub_options']['retain']:
try:
await persist_event(
self.publish_topic,
event,
self.pool
)
except SystemError as error:
self.log.error(error)
return

await asyncio.sleep(1)
#self.publish(
# self.publish_topic,
# event.__dict__,
# options=self.publish_options
#)


async def onJoin(self):
self.log.info("connected to zmq")

for handler in self.handlers:
# initialize handler
handler_instance = handler()
handler_instance.set_session(self)

if hasattr(handler_instance, 'init'):
await handler_instance.init()

if hasattr(handler_instance, 'on_event'):
self.log.debug("subscribing to topic %s", handler_instance.subscribe_topic)

# Used with base handler defined subscribe_topic
if handler_instance.subscribe_topic is not None:
session = ctx.socket(zmq.SUB)
session.connect(self.transport_host)
session.subscribe(handler_instance.subscribe_topic)
self.log.debug("subscribed to topic: %s", handler_instance.subscribe_topic)
while True:
msg = await session.recv_multipart()
await handler_instance.on_event(msg)
else:
# Used with config.json defined topics
if self.subscribed_topics is not None:
session = ctx.socket(zmq.SUB)
session.connect(self.transport_host)
for topic in self.subscribed_topics:
session.subscribe(topic)
self.log.info("subscribed to topic: %s", topic)
while True:
msg = await session.recv_multipart()
self.log.info('got msg %s', msg)
await handler_instance.on_event(msg)

if hasattr(handler_instance, 'worker'):
while True:
try:
await handler_instance.worker()
except Exception as error:
self.log.error("Operation failed. %s", error)
traceback.print_exc(file=sys.stdout)
continue


class Service(Eventify):
"""
Create zeromq service
"""

def check_transport_host(self):
"""
Check if zeromq socket is available
on transport host
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(('events-server', 8080))
if result == 0:
logging.info('port 8080 on zmq is open!')
return True
return False


def start(self):
"""
Start a producer/consumer service
"""
component = Component(self.config, self.handlers)
component.run()
7 changes: 7 additions & 0 deletions eventify/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""
General Utilities and helpers
"""

class objdict(object):
def __init__(self, d):
self.__dict__ = d
3 changes: 0 additions & 3 deletions example/Dockerfile

This file was deleted.

File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
7 changes: 7 additions & 0 deletions example/zeromq/Dockerfile-zmq-server
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FROM zeromq/zeromq

ADD zmq_server.py /zmq_server.py

EXPOSE 5570

CMD ["python3", "/zmq_server.py"]
25 changes: 25 additions & 0 deletions example/zeromq/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"_comment": "service configuration",
"name": "gce-vm-collector",
"image": "gce/vm",
"driver": "zeromq",
"transport_host": "tcp://events-server:5570",
"pub_options": {
"acknowledge": true,
"retain": false,
"exclude_me": true
},
"publish_topic": {
"topic": "gce-vm",
"timeout": 20,
"reply_in": 0
},
"subscribed_topics": [
"gce-vm"
],
"sub_options": {
"get_retained": false
},
"replay_events": false,
"replay_type": "event_store"
}

0 comments on commit 35f55b3

Please sign in to comment.