Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
nickmbailey committed Apr 4, 2012
0 parents commit 8d02a5c
Show file tree
Hide file tree
Showing 23 changed files with 1,698 additions and 0 deletions.
22 changes: 22 additions & 0 deletions LICENSE
@@ -0,0 +1,22 @@
Copyright (c) 2008 Michael Carter

Permission is hereby granted, free of charge, to any person
obtaining a copy of this software and associated documentation
files (the "Software"), to deal in the Software without
restriction, including without limitation the rights to use,
copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following
conditions:

The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
17 changes: 17 additions & 0 deletions PKG-INFO
@@ -0,0 +1,17 @@
Metadata-Version: 1.0
Name: morbid
Version: 0.8.7.3
Summary: A Twisted-based publish/subscribe messaging server that uses the STOMP protocol
Home-page: UNKNOWN
Author: Michael Carter
Author-email: CarterMichael@gmail.com
License: MIT License
Description: UNKNOWN
Platform: UNKNOWN
Classifier: Development Status :: 4 - Beta
Classifier: Environment :: Console
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python
Classifier: Topic :: Software Development :: Libraries :: Python Modules
18 changes: 18 additions & 0 deletions README
@@ -0,0 +1,18 @@
Morbid is a Twisted-based publish/subscribe messaging server that uses the
STOMP protocol. To use Morbid, run python setup.py install; then type morbid.
For additional help, use morbid -h.

It is named Morbid because it is a sin against humanity. Whoever writes new MQs
without good reason should be shot. Morbid exists only to provide basic
publish/subscribe functionality to orbited users that don't know how to
download and start a real message queue. For some reason, no one likes the
idea of doing anything in a reasonable, coherent way with more than one
process, ever.

If you want a real message queue that speaks STOMP (or some almost-STOMP) then
check out RabbitMQ (www.rabbitmq.com) or ActiveMQ (http://activemq.apache.org/)

If you want to extend this server in some way for some reason, then feel free;
just email the Orbited mailing list (orbited-users@groups.google.com) with your
patches. If this happens a lot, I'll create a mailing list and make the
repoistory public.
4 changes: 4 additions & 0 deletions morbid/__init__.py
@@ -0,0 +1,4 @@
__version__ = "0.8.7.1"
from morbid import StompFactory, StompProtocol, get_stomp_factory, main
import messagequeue
import mqsecurity
8 changes: 8 additions & 0 deletions morbid/error.py
@@ -0,0 +1,8 @@
def error(msg, reactor_running=True):
print "MorbidQ Error:",msg
if reactor_running:
from twisted.internet import reactor
reactor.stop()
else:
import sys
sys.exit(-1)
177 changes: 177 additions & 0 deletions morbid/messagequeue.py
@@ -0,0 +1,177 @@
class QueueError(Exception):

def __init__(self, error_code):
self.code = error_code

def __str__(self):
return repr(self.code)


class MessageQueueRoot(object):
def __init__(self, name):
self.name = name
self.subscribers = []
self.id = 0

def subscribe(self, proto):
#print "mqr - subscribe - proto =", proto
if proto not in self.subscribers:
self.subscribers.append(proto)

def unsubscribe(self, proto):
if proto in self.subscribers:
self.subscribers.remove(proto)

def empty(self):
return not bool(self.subscribers)

def prep_headers(self, headers):
self.id += 1
id = self.name + '_' + str(self.id)
headers.update({'destination': self.name, 'message-id': id})
return headers

def prep_message(self, message):
#print "message to prep", message
if isinstance(message, tuple):
headers, body = message
else:
headers, body = {}, message
message = (self.prep_headers(headers), body)
return message


class Queue(MessageQueueRoot):
"""
A queue sends a message to any one listener
"""

def __init__(self, name):
MessageQueueRoot.__init__(self, name)
self.messages = []

def subscribe(self, proto):
MessageQueueRoot.subscribe(self, proto)
while self.messages:
message = self.messages.pop(0)
proto.send(self.prep_message(message))

def send(self, message):
""" Calls the send function for each attached protocol """
if not self.subscribers:
self.messages.append(message)
else:
target = self.subscribers.pop(0)
target.send(self.prep_message(message))
self.subscribers.append(target)

def empty(self):
return not bool(self.subscribers or self.messages)


class Topic(MessageQueueRoot):
def send(self, message):
""" Calls the send function for each attached protocol """
for proto in self.subscribers:
proto.send(self.prep_message(message))


class Pubsub(object):
"""
Sending to a pubsub queue replicates the message to all subordinate queues.
Each pubsub queue contains the list of message queues that should
receive the message.
Clients cannot subscribe to the pubsub queue itself, they must subscribe
to a subordinate queue.
"""
def __init__(self, name):
self.baseQueue = Queue(name)
self.child_queues = []

def add_child(self, queue_ref):
if queue_ref not in self.child_queues:
self.child_queues.append(queue_ref)

def remove_child(self, queue_ref):
if queue_ref in self.child_queues:
self.child_queues.remove(queue_ref)

def subscribe(self, proto):
raise QueueError("FAIL")

def send(self, message):
""" Calls the send function for each child queue """
for q in self.child_queues:
q.send(message)

def empty(self):
return not bool(len(self.child_queues) > 0)


class MessageQueueManager(object):

def __init__(self):
self.message_queues = {}

def set_queue_rights(self, queue_rights):
self.queue_rights = queue_rights

def test_queue_rights(self, groups, qname, rights):
if rights not in self.queue_rights(groups, qname):
error = "FAIL"+rights.upper()
raise QueueError(error)

def create_queue(self, proto, dest_name):
if self.message_queues.has_key(dest_name):
return
self.test_queue_rights(proto.get_groups(), dest_name, 'c')
if dest_name.startswith('/queue/'):
dest = Queue(dest_name)
elif dest_name.startswith('/pubsub/'):
dest = Pubsub(dest_name)
else: dest = Topic(dest_name)
self.message_queues[dest_name] = dest

def subscribe_queue(self, proto, dest_name):
self.create_queue(proto, dest_name)
self.test_queue_rights(proto.get_groups(), dest_name, 'r')
self.message_queues[dest_name].subscribe(proto)

def send_message(self, proto, dest_name, message):
#print "mqm - send - P, D, M", proto, dest_name, message
self.create_queue(proto, dest_name)
self.test_queue_rights(proto.get_groups(), dest_name, 'w')
if not isinstance(message, tuple):
message = ({}, message)
self.message_queues[dest_name].send(message)

def leave_queue(self, proto, dest_name):
if self.message_queues.has_key(dest_name):
self.message_queues[dest_name].unsubscribe(proto)

def destroy_queue(self, proto, dest_name):
self.create_queue(proto, dest_name)
#self.test_queue_rights(proto.get_groups(), dest_name, 'c')
# Go through all the pubsub queues
self.remove_pubsub_child(self.message_queues[dest_name])
del self.message_queues[dest_name]

def remove_pubsub_child(self, queue_ref):
[mq.remove_child(queue_ref) for mq in message_queues
if isinstance(mq, Pubsub)]

def add_pubsub_child(self, proto, dest_name, child_queue):
self.create_queue(proto, dest_name)
self.test_queue_rights(proto.get_groups(), child_queue, 'w')
self.create_queue(proto, child_queue)
self.message_queues[dest_name].add_child(self.message_queues[child_queue])

def get_list_of_queues(self):
aRoomList = self.message_queues.keys()
aRoomList.sort()
return aRoomList

def unsubscribe_all_queues(self, proto):
for q_name in self.message_queues:
self.message_queues[q_name].unsubscribe(proto)

0 comments on commit 8d02a5c

Please sign in to comment.