Skip to content

Commit

Permalink
Merge 26a69a5 into e0795d5
Browse files Browse the repository at this point in the history
  • Loading branch information
xychu committed Aug 11, 2015
2 parents e0795d5 + 26a69a5 commit 2ec207b
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 18 deletions.
10 changes: 4 additions & 6 deletions pdc/apps/compose/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ def _link_compose_to_integrated_product(request, compose, variant):

def _add_compose_create_msg(request, compose_obj):
"""
Add compose create message to request.messagings.
Add compose create message to request._messagings.
"""
msg = {'action': 'create',
'compose_id': compose_obj.compose_id,
'compose_date': compose_obj.compose_date.isoformat(),
'compose_type': compose_obj.compose_type.name,
'compose_respin': compose_obj.compose_respin}
request.messagings.append(('.compose', json.dumps(msg)))
request._request._messagings.append(('.compose', json.dumps(msg)))


@transaction.atomic
Expand All @@ -95,8 +95,7 @@ def compose__import_rpms(request, release_id, composeinfo, rpm_manifest):
compose_label=ci.compose.label or None,
acceptance_testing=acceptance_status,
)

if created and hasattr(request, 'messagings'):
if created and hasattr(request._request, '_messagings'):
# add message
_add_compose_create_msg(request, compose_obj)

Expand Down Expand Up @@ -177,8 +176,7 @@ def compose__import_images(request, release_id, composeinfo, image_manifest):
compose_respin=ci.compose.respin,
compose_label=ci.compose.label or None,
)

if created and hasattr(request, 'messagings'):
if created and hasattr(request._request, '_messagings'):
# add message
_add_compose_create_msg(request, compose_obj)

Expand Down
13 changes: 7 additions & 6 deletions pdc/apps/compose/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,12 +578,13 @@ def update(self, request, *args, **kwargs):
json.dumps({'linked_releases': old_data['linked_releases']}),
json.dumps({'linked_releases': response.data['linked_releases']}))
# Add message
if hasattr(request, 'messagings'):
request.messagings.append(('.compose',
json.dumps({'action': 'update',
'compose_id': self.object.compose_id,
'from': old_data,
'to': response.data})))
if hasattr(request._request, '_messagings'):
request._request._messagings.append(
('.compose',
json.dumps({'action': 'update',
'compose_id': self.object.compose_id,
'from': old_data,
'to': response.data})))
return response

def perform_update(self, serializer):
Expand Down
5 changes: 4 additions & 1 deletion pdc/apps/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
#
from django.conf import settings

from .messaging import DummyMessenger, KombuMessenger, FedmsgMessenger, ProtonMessenger
from .messaging import (DummyMessenger, KombuMessenger, FedmsgMessenger,
ProtonMessenger, StompMessenger)


# init messenger
Expand All @@ -15,5 +16,7 @@
messenger = FedmsgMessenger()
elif settings.MESSAGE_BUS['MLP'] == 'proton':
messenger = ProtonMessenger()
elif settings.MESSAGE_BUS['MLP'] == 'stomp':
messenger = StompMessenger()
else:
messenger = DummyMessenger()
37 changes: 37 additions & 0 deletions pdc/apps/utils/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
#
from django.conf import settings

import logging

logger = logging.getLogger(__name__)


class DummyMessenger(object):
def __init__(self):
Expand Down Expand Up @@ -49,3 +53,36 @@ def send_message(self, topic, msg):
self.message.body = msg
self.messenger.put(self.message)
self.messenger.send()


class StompMessenger(object):
def __init__(self):
import stomp
self.connection = stomp.Connection(
host_and_ports=settings.MESSAGE_BUS['HOST_AND_PORTS'],
use_ssl=True,
ssl_key_file=settings.MESSAGE_BUS['KEY_FILE'],
ssl_cert_file=settings.MESSAGE_BUS['CERT_FILE'],
)
self.connected = self.do_connect()

def do_connect(self):
try:
self.connection.start()
self.connection.connect()
except Exception, e:
logger.warn("StompMessenger connection exception(%s): %s." % (type(e), e))
return False
return True

def send_message(self, topic, msg):
address = '/topic/' + settings.MESSAGE_BUS['TOPIC'] + str(topic)
if self.connected or self.do_connect():
try:
self.connection.send(body=msg, destination=address,
headers={'persistent': 'true'},
auto='true')
except Exception, e:
logger.warn("Send Message exception(%s): %s." % (type(e), e))
else:
logger.warn("Send Message exception: Failed to Connect to Messaging Server.")
10 changes: 5 additions & 5 deletions pdc/apps/utils/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@
class MessagingMiddleware(object):
"""
Create a messaging list for each request. It is accessible via
`request.messagings`. If the request ends sucessfully, the messager
`request._messagings`. If the request ends sucessfully, the messager
could send all the messages in it.
"""

def process_request(self, request):
request.messagings = []
request._messagings = []

def process_response(self, request, response):
if not getattr(response, 'exception', 0) and response.status_code < 400:
if hasattr(request, 'messagings'):
for topic, msg in request.messagings:
if hasattr(request, '_messagings'):
for topic, msg in request._messagings:
messenger.send_message(topic=topic, msg=msg)
request.messagings = None
request._messagings = None
return response
12 changes: 12 additions & 0 deletions pdc/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@
MESSAGE_BUS = {
# MLP: Messaging Library Package
# e.g. `fedmsg` for fedmsg or `kombu` for AMQP and other transports that `kombu` supports.
# `stomp` for STOMP supports.
'MLP': '',

# # `fedmsg` config example:
Expand All @@ -246,6 +247,17 @@
# 'cert_reqs': ssl.CERT_REQUIRED,
# }
# }
#
# # `stomp` config items:
# 'MLP': 'stomp',
# 'HOST_AND_PORTS': [
# ('stomp.example1.com', 61613),
# ('stomp.example2.com', 61613),
# ('stomp.example3.com', 61613),
# ],
# 'TOPIC': 'pdc',
# 'CERT_FILE': '',
# 'KEY_FILE': '',
}

LOGGING = {
Expand Down

0 comments on commit 2ec207b

Please sign in to comment.