Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/msgflo-python
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python2

import sys, os, importlib
sys.path.append(os.path.abspath(".."))
Expand Down
2 changes: 1 addition & 1 deletion examples/repeat.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python2

import sys, os, json, logging
sys.path.append(os.path.abspath("."))
Expand Down
31 changes: 22 additions & 9 deletions msgflo/msgflo.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,14 @@ def nack(self, msg):
self._engine.nack_message(msg)


DEFAULT_DISCOVERY_PERIOD=60

# Interface for engine/transport implementations
class Engine(object):
def __init__(self, broker):
def __init__(self, broker, discovery_period=None):
self.broker_url = broker
self.broker_info = urlparse.urlparse(self.broker_url)
self.discovery_period = discovery_period if discovery_period else DEFAULT_DISCOVERY_PERIOD

def done_callback(self, done_cb):
self._done_cb = done_cb
Expand All @@ -91,8 +94,8 @@ def __init__(self, raw):

class AmqpEngine(Engine):

def __init__(self, broker):
Engine.__init__(self, broker)
def __init__(self, broker, discovery_period=None):
Engine.__init__(self, broker, discovery_period=discovery_period)

# Connect to AMQP broker with default connection and authentication
# FIXME: respect self.broker_url
Expand All @@ -108,14 +111,19 @@ def add_participant(self, participant):
self.participant = participant
self.participant._engine = self

self._send_discovery(self._channel, self.participant.definition)

# Create and configure message exchange and queue
for p in self.participant.definition['inports']:
self._setup_queue(self.participant, self._channel, 'in', p)
for p in self.participant.definition['outports']:
self._setup_queue(self.participant, self._channel, 'out', p)

def send_discovery():
while self.participant:
self._send_discovery(self._channel, self.participant.definition)
delay = self.discovery_period/2.2
gevent.sleep(delay) # yields
gevent.Greenlet.spawn(send_discovery)

def run(self):
# Start message pump
self._message_pump_greenlet = gevent.spawn(self._message_pump_greenthread)
Expand Down Expand Up @@ -249,9 +257,7 @@ def _message_pump_greenthread(self):

def _on_connect(self, client, userdata, flags, rc):
print("Connected with result code" + str(rc))
self._send_discovery(self.participant.definition)



# Subscribe to queues for inports
subscriptions = [] # ("topic", QoS)
for port in self.participant.definition['inports']:
Expand All @@ -260,6 +266,13 @@ def _on_connect(self, client, userdata, flags, rc):
subscriptions.append((topic, 0))
self._client.subscribe(subscriptions)

def send_discovery():
while self.participant:
delay = self.discovery_period/2.2
self._send_discovery(self.participant.definition)
gevent.sleep(delay) # yields
gevent.Greenlet.spawn(send_discovery)

def _on_subscribe(self, client, userdata, mid, granted_qos):
logging.debug('subscribed %s' % str(mid))

Expand Down Expand Up @@ -314,7 +327,7 @@ def run(participant, broker=None, done_cb=None):
def main(Participant, role=None):
if not role:
try:
role = sys.argv[0]
role = sys.argv[1]
except IndexError, e:
role = participant.definition.component.tolower()

Expand Down
3 changes: 2 additions & 1 deletion spec/participant.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ chai = require 'chai' unless chai
heterogenous = require '../node_modules/msgflo/spec/heterogenous.coffee'

python = process.env.PYTHON or 'python'
repeatPy = path.join __dirname, '..', 'examples', 'repeat.py'
participants =
'PythonRepeat': [python, path.join __dirname, '..', 'examples', 'repeat.py']
'PythonRepeat': [python, repeatPy, 'repeater']

# Note: most require running an external broker service
transports =
Expand Down