Skip to content
This repository has been archived by the owner on Aug 1, 2019. It is now read-only.

Commit

Permalink
Added robust message publisher and subscriber
Browse files Browse the repository at this point in the history
AMQP message broadcasting did not reconnect when, for some reason,
the server disconnected (ex: restart). This change builds on the
ConnectionService class to make it a generic, self-healing
connection manager that may be extended. The publisher and
subscriber logic was then built on top of ConnectionService to
take advantage of connection management.

- New self-healing, lazy initializing connection service.
- Publisher extends connection service.
- Subscriber extends connection service.
- ResourceHook was moved into notifications as the NotificationHook.
- Configuration options for explicit exchange and queue naming added.

Change-Id: Ib57c56a38574a0c70db9066625aef75ff8891c93
  • Loading branch information
krotscheck committed Sep 17, 2014
1 parent 8d86f7e commit 6df6a60
Show file tree
Hide file tree
Showing 9 changed files with 416 additions and 156 deletions.
9 changes: 9 additions & 0 deletions etc/storyboard.conf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ lock_path = $state_path/lock
# The virtual host within which our queues and exchanges live.
# rabbit_virtual_host = /

# Application name that binds to rabbit.
# rabbit_application_name=storyboard

# The name of the topic exchange to which storyboard will broadcast its events.
# rabbit_exchange_name=storyboard

# The name of the queue that will be created for API events.
# rabbit_event_queue_name=storyboard_events

[database]
# This line MUST be changed to actually run storyboard
# Example:
Expand Down
6 changes: 2 additions & 4 deletions storyboard/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
from storyboard.api.auth.token_storage import impls as storage_impls
from storyboard.api.auth.token_storage import storage
from storyboard.api import config as api_config
from storyboard.api.middleware import resource_hook
from storyboard.api.middleware import token_middleware
from storyboard.api.middleware import user_id_hook
from storyboard.api.v1.search import impls as search_engine_impls
from storyboard.api.v1.search import search_engine
from storyboard.notifications import connection_service
from storyboard.notifications.notification_hook import NotificationHook
from storyboard.openstack.common.gettextutils import _ # noqa
from storyboard.openstack.common import log

Expand Down Expand Up @@ -84,8 +83,7 @@ def setup_app(pecan_config=None):

# Setup notifier
if CONF.enable_notifications:
connection_service.initialize()
hooks.append(resource_hook.ResourceHook())
hooks.append(NotificationHook())

app = pecan.make_app(
pecan_config.app.root,
Expand Down
31 changes: 0 additions & 31 deletions storyboard/api/middleware/resource_hook.py

This file was deleted.

2 changes: 1 addition & 1 deletion storyboard/db/api/timeline_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def event_create(values):
"resource": "timeline_events",
"event_id": new_event.id
}
publish(payload, "timeline_events")
publish("timeline_events", payload)

return new_event

Expand Down
9 changes: 9 additions & 0 deletions storyboard/notifications/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@
CONF = cfg.CONF

NOTIFICATION_OPTS = [
cfg.StrOpt("rabbit_exchange_name", default="storyboard",
help="The name of the topic exchange which storyboard will "
"use to broadcast its events."),
cfg.StrOpt("rabbit_event_queue_name", default="storyboard_events",
help="The name of the queue that will be created for "
"API events."),
cfg.StrOpt("rabbit_application_name", default="storyboard",
help="The rabbit application identifier for storyboard's "
"connection."),
cfg.StrOpt("rabbit_host", default="localhost",
help="Host of the rabbitmq server."),
cfg.StrOpt("rabbit_login_method", default="AMQPLAIN",
Expand Down
138 changes: 114 additions & 24 deletions storyboard/notifications/connection_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,50 +13,140 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from threading import Timer

import pika

from oslo.config import cfg

from storyboard.notifications.conf import NOTIFICATION_OPTS
from storyboard.openstack.common import log

CONF = cfg.CONF
CONN = None

CONF = cfg.CONF
LOG = log.getLogger(__name__)


class ConnectionService:
class ConnectionService(object):
"""A generic amqp connection agent that handles unexpected
interactions with RabbitMQ such as channel and connection closures,
by reconnecting on failure.
"""

def __init__(self, conf):
self.credentials = pika.PlainCredentials(
"""Setup the connection instance based on our configuration.
:param conf A configuration object.
"""
self._connection = None
self._channel = None
self._open = False
self.started = False
self._timer = None
self._closing = False
self._open_hooks = set()
self._exchange_name = conf.rabbit_exchange_name
self._application_id = conf.rabbit_application_name
self._properties = pika.BasicProperties(
app_id='storyboard', content_type='application/json')
self._connection_credentials = pika.PlainCredentials(
conf.rabbit_userid,
conf.rabbit_password)

self.connection = pika.BlockingConnection(pika.ConnectionParameters(
self._connection_parameters = pika.ConnectionParameters(
conf.rabbit_host,
conf.rabbit_port,
conf.rabbit_virtual_host,
self.credentials))
self._connection_credentials)

def _connect(self):
"""This method connects to RabbitMQ, establishes a channel, declares
the storyboard exchange if it doesn't yet exist, and executes any
post-connection hooks that an extending class may have registered.
"""

# If the closing flag is set, just exit.
if self._closing:
return

# If a timer is set, kill it.
if self._timer:
LOG.debug('Clearing timer...')
self._timer.cancel()
self._timer = None

# Create the connection
LOG.info('Connecting to %s', self._connection_parameters.host)
self._connection = pika.BlockingConnection(self._connection_parameters)

# Create a channel
LOG.debug('Creating a new channel')
self._channel = self._connection.channel()
self._channel.confirm_delivery()

# Declare the exchange
LOG.debug('Declaring exchange %s', self._exchange_name)
self._channel.exchange_declare(exchange=self._exchange_name,
exchange_type='topic',
durable=True,
auto_delete=False)

# Set the open flag and execute any connection hooks.
self._open = True
self._execute_open_hooks()

def _reconnect(self):
"""Reconnect to rabbit.
"""

# Sanity check - if we're closing, do nothing.
if self._closing:
return

# If a timer is already there, assume it's doing its thing...
if self._timer:
return
LOG.debug('Scheduling reconnect in 5 seconds...')
self._timer = Timer(5, self._connect)
self._timer.start()

def create_exchange(self, channel, exchange, type):
self.exchange = exchange
self.type = type
self.channel = channel
self.channel.exchange_declare(exchange=self.exchange,
type=self.type, durable=True)
def _close(self):
"""This method closes the connection to RabbitMQ."""
LOG.info('Closing connection')
self._open = False
if self._channel:
self._channel.close()
self._channel = None
if self._connection:
self._connection.close()
self._connection = None
self._closing = False
LOG.debug('Connection Closed')

def close_connection(self):
self.connection.close()
def _execute_open_hooks(self):
"""Executes all hooks that have been registered to run on open.
"""
for hook in self._open_hooks:
hook()

def start(self):
"""Start the publisher, opening a connection to RabbitMQ. This method
must be explicitly invoked, otherwise any messages will simply be
cached for later broadcast.
"""

def initialize():
# Initialize the AMQP event publisher.
global CONN
CONF.register_opts(NOTIFICATION_OPTS, "notifications")
CONN = ConnectionService(CONF.notifications)
# Create the connection.
self.started = True
self._closing = False
self._connect()

def stop(self):
"""Stop the publisher by closing the channel and the connection.
"""
self.started = False
self._closing = True
self._close()

def get_connection():
global CONN
return CONN
def add_open_hook(self, hook):
"""Add a method that will be executed whenever a connection is
established.
"""
self._open_hooks.add(hook)
82 changes: 82 additions & 0 deletions storyboard/notifications/notification_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import re

from pecan import hooks

from storyboard.notifications.publisher import publish


class NotificationHook(hooks.PecanHook):
def __init__(self):
super(NotificationHook, self).__init__()

def after(self, state):
# Ignore get methods, we only care about changes.
if state.request.method == 'GET':
return

request = state.request
req_method = request.method
req_user_id = request.current_user_id
req_path = request.path
req_resource_grp = self._parse(req_path)

if not req_resource_grp:
return

resource = req_resource_grp[0]

if req_resource_grp[1]:
resource_id = req_resource_grp[1]
else:
# When a resource is created..
response_str = state.response.body
response = json.loads(response_str)
if response:
resource_id = response.get('id')
else:
resource_id = None

# when adding/removing projects to project_groups..
if req_resource_grp[3]:
sub_resource_id = req_resource_grp[3]
payload = {
"user_id": req_user_id,
"method": req_method,
"resource": resource,
"resource_id": resource_id,
"sub_resource_id": sub_resource_id
}

else:
payload = {
"user_id": req_user_id,
"method": req_method,
"resource": resource,
"resource_id": resource_id
}

publish(resource, payload)

def _parse(self, s):
url_pattern = re.match("^\/v1\/([a-z_]+)\/?([0-9]+)?"
"\/?([a-z]+)?\/?([0-9]+)?$", s)
if url_pattern and url_pattern.groups()[0] != "openid":
return url_pattern.groups()
else:
return
Loading

0 comments on commit 6df6a60

Please sign in to comment.