Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 38 additions & 25 deletions msgflo/msgflo.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def __init__(self, d, role):
def send(self, outport, outdata):
if not self._engine:
return
self._engine._send(outport, outdata)
self._engine._send(self, outport, outdata)

def process(self, inport, inmsg):
raise NotImplementedError('IParticipant.process()')
Expand Down Expand Up @@ -119,6 +119,8 @@ def __init__(self, broker, discovery_period=None):
if isinstance(haigha, Exception):
raise haigha

self.participant = None

# Prepare connection to AMQP broker
vhost = '/'
if self.broker_info.path:
Expand Down Expand Up @@ -168,8 +170,8 @@ def ack_message(self, msg):
def nack_message(self, msg):
self._channel.basic.nack(msg.delivery_info["delivery_tag"])

def _send(self, outport, data):
ports = self.participant.definition['outports']
def _send(self, participant, outport, data):
ports = participant.definition['outports']
logger.debug("Publishing to message: %s, %s, %s" % (data,outport,ports))
serialized = json.dumps(data)
msg = haigha_Message(serialized)
Expand Down Expand Up @@ -243,6 +245,7 @@ def __init__(self, broker):
Engine.__init__(self, broker)

self._client = mqtt.Client()
self.participants = []
self.connected = False

if self.broker_info.username:
Expand All @@ -260,16 +263,17 @@ def __init__(self, broker):
self._client.connect(host, port, 60)

def add_participant(self, participant, iips={}):
self.participant = participant
self.participant._engine = self
self.participant._iips = iips
participant._engine = self
participant._iips = iips

self.participants.append(participant)

def run(self):
self._message_pump_greenlet = gevent.spawn(self._message_pump_greenthread)

def _send(self, outport, data):
def _send(self, participant, outport, data):
logger.debug('Participant sent on %s' % outport)
ports = self.participant.definition['outports']
ports = participant.definition['outports']
serialized = json.dumps(data)
port = [p for p in ports if outport == p['id']][0]
queue = port['queue']
Expand Down Expand Up @@ -299,21 +303,24 @@ def _on_connect(self, client, userdata, flags, rc):
self.connected = True

# Subscribe to queues for inports
subscriptions = [] # ("topic", QoS)
for port in self.participant.definition['inports']:
topic = port['queue']
logger.debug('subscribing to %s' % topic)
subscriptions.append((topic, 0))
self._client.subscribe(subscriptions)
for participant in self.participants:
subscriptions = [] # ("topic", QoS)
for port in participant.definition['inports']:
topic = port['queue']
logger.debug('subscribing to %s' % topic)
subscriptions.append((topic, 0))
self._client.subscribe(subscriptions)

# Deliver IIPs
deliver_iips(self.participant)
for p in self.participants:
deliver_iips(p)

# Send discovery messsage
def send_discovery():
while self.participant and self.connected:
while self.connected:
delay = self.discovery_period/2.2
self._send_discovery(self.participant.definition)
for p in self.participants:
self._send_discovery(p.definition)
gevent.sleep(delay) # yields
gevent.Greenlet.spawn(send_discovery)

Expand All @@ -336,10 +343,14 @@ def _on_subscribe(self, client, userdata, mid, granted_qos):

def _on_message(self, client, userdata, mqtt_msg):
logger.debug('got message on %s' % mqtt_msg.topic)

participant = None
port = ""
for inport in self.participant.definition['inports']:
if inport['queue'] == mqtt_msg.topic:
port = inport['id']
for p in self.participants:
for inport in p.definition['inports']:
if inport['queue'] == mqtt_msg.topic:
port = inport['id']
participant = p

def notify():
msg = Message(mqtt_msg.payload)
Expand All @@ -354,9 +365,10 @@ def notify():
logger.debug('unknown error %s' % str(e))

logger.debug('Delivering message to %s' % port)
self.participant.process(port, msg)
participant.process(port, msg)

gevent.spawn(notify)
if port and participant:
gevent.spawn(notify)

def _send_discovery(self, definition):
m = {
Expand All @@ -369,7 +381,7 @@ def _send_discovery(self, definition):
logger.debug('sent discovery message %s' % msg)
return

def run(participant, broker=None, done_cb=None, iips={}):
def run(participants, broker=None, done_cb=None, iips={}):
if broker is None:
broker = os.environ.get('MSGFLO_BROKER', 'amqp://localhost')

Expand All @@ -384,7 +396,8 @@ def run(participant, broker=None, done_cb=None, iips={}):

if done_cb:
engine.done_callback(done_cb)
engine.add_participant(participant, iips)
for p in participants:
engine.add_participant(p, iips)
engine.run()

return engine
Expand Down Expand Up @@ -413,7 +426,7 @@ def main(Participant, role=None):
participant = Participant(config.role)
d = participant.definition
waiter = gevent.event.AsyncResult()
engine = run(participant, done_cb=waiter.set, iips=config.iips)
engine = run([participant], done_cb=waiter.set, iips=config.iips)
anon_url = "%s://%s" % (engine.broker_info.scheme, engine.broker_info.hostname)
print("%s(%s) running on %s" % (d['role'], d['component'], anon_url))
sys.stdout.flush()
Expand Down