Skip to content

Commit

Permalink
Merge pull request #58 from mokshaproject/stomp-queue-or-bust
Browse files Browse the repository at this point in the history
Apply stomp_queue later.
  • Loading branch information
ralphbean committed Jun 7, 2018
2 parents 14bc50d + 0b19844 commit 45e8955
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions moksha.hub/moksha/hub/stomp/stomp.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,6 @@ def __init__(self, hub, config):
host = self.config.get('stomp_broker')
uri = "%s:%i" % (host, port)

# Sometimes, a stomp consumer may wish to be subscribed to a queue
# which is composed of messages from many different topics. In this
# case, the hub hands dispatching messages to the right consumers.
# This extension is only concerned with the queue, and negotiating that
# with the broker.
stomp_queue = self.config.get('stomp_queue', None)
if stomp_queue:
# Overwrite the declarations of all of our consumers.
self._topics = [stomp_queue]

# A list of addresses over which we emulate failover()
self.addresses = [pair.split(":") for pair in uri.split(',')]
self.address_index = 0
Expand Down Expand Up @@ -118,6 +108,18 @@ def connected(self, server_heartbeat):
else:
log.debug("Skipping heartbeat initialization")

# Sometimes, a stomp consumer may wish to be subscribed to a queue
# which is composed of messages from many different topics. In this
# case, the hub hands dispatching messages to the right consumers.
# This extension is only concerned with the queue, and negotiating that
# with the broker.
stomp_queue = self.config.get('stomp_queue', None)
if stomp_queue and self._topics and self.topics != [stomp_queue]:
log.info('Discarding consumer-specified topics in favor of '
'stomp_queue=%s: %r' % (stomp_queue, self._topics))
# Overwrite the declarations of all of our consumers.
self._topics = [stomp_queue]

for topic in self._topics:
log.info('Subscribing to %s topic' % topic)
self.subscribe(topic, callback=lambda msg: None)
Expand Down

0 comments on commit 45e8955

Please sign in to comment.