From fe748e1186249455192fe28c2e1d7e9dae1bbd10 Mon Sep 17 00:00:00 2001 From: Jon Nordby Date: Wed, 1 Feb 2017 17:08:06 +0100 Subject: [PATCH 1/3] examples: Fix hashbang --- bin/msgflo-python | 2 +- examples/repeat.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/msgflo-python b/bin/msgflo-python index 59cf79b..7d585c7 100755 --- a/bin/msgflo-python +++ b/bin/msgflo-python @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python2 import sys, os, importlib sys.path.append(os.path.abspath("..")) diff --git a/examples/repeat.py b/examples/repeat.py index b6d7eb9..60461b4 100755 --- a/examples/repeat.py +++ b/examples/repeat.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python2 import sys, os, json, logging sys.path.append(os.path.abspath(".")) From b88afe7be5eaf85e231f625835007a34d012729e Mon Sep 17 00:00:00 2001 From: Jon Nordby Date: Wed, 1 Feb 2017 17:08:32 +0100 Subject: [PATCH 2/3] main(): Fix using wrong commandline argument for role --- msgflo/msgflo.py | 2 +- spec/participant.coffee | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/msgflo/msgflo.py b/msgflo/msgflo.py index b983bd3..9d95032 100755 --- a/msgflo/msgflo.py +++ b/msgflo/msgflo.py @@ -314,7 +314,7 @@ def run(participant, broker=None, done_cb=None): def main(Participant, role=None): if not role: try: - role = sys.argv[0] + role = sys.argv[1] except IndexError, e: role = participant.definition.component.tolower() diff --git a/spec/participant.coffee b/spec/participant.coffee index 5f6b6a4..f746cab 100644 --- a/spec/participant.coffee +++ b/spec/participant.coffee @@ -4,8 +4,9 @@ chai = require 'chai' unless chai heterogenous = require '../node_modules/msgflo/spec/heterogenous.coffee' python = process.env.PYTHON or 'python' +repeatPy = path.join __dirname, '..', 'examples', 'repeat.py' participants = - 'PythonRepeat': [python, path.join __dirname, '..', 'examples', 'repeat.py'] + 'PythonRepeat': [python, repeatPy, 'repeater'] # Note: most require running an external broker service transports = From cbd383b1ad1c5fae9d9d6359dfcbc93baee54a3d Mon Sep 17 00:00:00 2001 From: Jon Nordby Date: Wed, 1 Feb 2017 17:10:08 +0100 Subject: [PATCH 3/3] Send discovery message periodically --- msgflo/msgflo.py | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/msgflo/msgflo.py b/msgflo/msgflo.py index 9d95032..6ff67e9 100755 --- a/msgflo/msgflo.py +++ b/msgflo/msgflo.py @@ -63,11 +63,14 @@ def nack(self, msg): self._engine.nack_message(msg) +DEFAULT_DISCOVERY_PERIOD=60 + # Interface for engine/transport implementations class Engine(object): - def __init__(self, broker): + def __init__(self, broker, discovery_period=None): self.broker_url = broker self.broker_info = urlparse.urlparse(self.broker_url) + self.discovery_period = discovery_period if discovery_period else DEFAULT_DISCOVERY_PERIOD def done_callback(self, done_cb): self._done_cb = done_cb @@ -91,8 +94,8 @@ def __init__(self, raw): class AmqpEngine(Engine): - def __init__(self, broker): - Engine.__init__(self, broker) + def __init__(self, broker, discovery_period=None): + Engine.__init__(self, broker, discovery_period=discovery_period) # Connect to AMQP broker with default connection and authentication # FIXME: respect self.broker_url @@ -108,14 +111,19 @@ def add_participant(self, participant): self.participant = participant self.participant._engine = self - self._send_discovery(self._channel, self.participant.definition) - # Create and configure message exchange and queue for p in self.participant.definition['inports']: self._setup_queue(self.participant, self._channel, 'in', p) for p in self.participant.definition['outports']: self._setup_queue(self.participant, self._channel, 'out', p) + def send_discovery(): + while self.participant: + self._send_discovery(self._channel, self.participant.definition) + delay = self.discovery_period/2.2 + gevent.sleep(delay) # yields + gevent.Greenlet.spawn(send_discovery) + def run(self): # Start message pump self._message_pump_greenlet = gevent.spawn(self._message_pump_greenthread) @@ -249,9 +257,7 @@ def _message_pump_greenthread(self): def _on_connect(self, client, userdata, flags, rc): print("Connected with result code" + str(rc)) - self._send_discovery(self.participant.definition) - - + # Subscribe to queues for inports subscriptions = [] # ("topic", QoS) for port in self.participant.definition['inports']: @@ -260,6 +266,13 @@ def _on_connect(self, client, userdata, flags, rc): subscriptions.append((topic, 0)) self._client.subscribe(subscriptions) + def send_discovery(): + while self.participant: + delay = self.discovery_period/2.2 + self._send_discovery(self.participant.definition) + gevent.sleep(delay) # yields + gevent.Greenlet.spawn(send_discovery) + def _on_subscribe(self, client, userdata, mid, granted_qos): logging.debug('subscribed %s' % str(mid))