From b21fee7c05dce3dfefed3323da802677db6b81e2 Mon Sep 17 00:00:00 2001 From: Jon Nordby Date: Tue, 21 Nov 2017 13:31:18 +0100 Subject: [PATCH] mqtt: Support multiple participants --- msgflo/msgflo.py | 63 +++++++++++++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 25 deletions(-) diff --git a/msgflo/msgflo.py b/msgflo/msgflo.py index d88c879..dbe326c 100755 --- a/msgflo/msgflo.py +++ b/msgflo/msgflo.py @@ -63,7 +63,7 @@ def __init__(self, d, role): def send(self, outport, outdata): if not self._engine: return - self._engine._send(outport, outdata) + self._engine._send(self, outport, outdata) def process(self, inport, inmsg): raise NotImplementedError('IParticipant.process()') @@ -119,6 +119,8 @@ def __init__(self, broker, discovery_period=None): if isinstance(haigha, Exception): raise haigha + self.participant = None + # Prepare connection to AMQP broker vhost = '/' if self.broker_info.path: @@ -168,8 +170,8 @@ def ack_message(self, msg): def nack_message(self, msg): self._channel.basic.nack(msg.delivery_info["delivery_tag"]) - def _send(self, outport, data): - ports = self.participant.definition['outports'] + def _send(self, participant, outport, data): + ports = participant.definition['outports'] logger.debug("Publishing to message: %s, %s, %s" % (data,outport,ports)) serialized = json.dumps(data) msg = haigha_Message(serialized) @@ -243,6 +245,7 @@ def __init__(self, broker): Engine.__init__(self, broker) self._client = mqtt.Client() + self.participants = [] self.connected = False if self.broker_info.username: @@ -260,16 +263,17 @@ def __init__(self, broker): self._client.connect(host, port, 60) def add_participant(self, participant, iips={}): - self.participant = participant - self.participant._engine = self - self.participant._iips = iips + participant._engine = self + participant._iips = iips + + self.participants.append(participant) def run(self): self._message_pump_greenlet = gevent.spawn(self._message_pump_greenthread) - def _send(self, outport, data): + def _send(self, participant, outport, data): logger.debug('Participant sent on %s' % outport) - ports = self.participant.definition['outports'] + ports = participant.definition['outports'] serialized = json.dumps(data) port = [p for p in ports if outport == p['id']][0] queue = port['queue'] @@ -299,21 +303,24 @@ def _on_connect(self, client, userdata, flags, rc): self.connected = True # Subscribe to queues for inports - subscriptions = [] # ("topic", QoS) - for port in self.participant.definition['inports']: - topic = port['queue'] - logger.debug('subscribing to %s' % topic) - subscriptions.append((topic, 0)) - self._client.subscribe(subscriptions) + for participant in self.participants: + subscriptions = [] # ("topic", QoS) + for port in participant.definition['inports']: + topic = port['queue'] + logger.debug('subscribing to %s' % topic) + subscriptions.append((topic, 0)) + self._client.subscribe(subscriptions) # Deliver IIPs - deliver_iips(self.participant) + for p in self.participants: + deliver_iips(p) # Send discovery messsage def send_discovery(): - while self.participant and self.connected: + while self.connected: delay = self.discovery_period/2.2 - self._send_discovery(self.participant.definition) + for p in self.participants: + self._send_discovery(p.definition) gevent.sleep(delay) # yields gevent.Greenlet.spawn(send_discovery) @@ -336,10 +343,14 @@ def _on_subscribe(self, client, userdata, mid, granted_qos): def _on_message(self, client, userdata, mqtt_msg): logger.debug('got message on %s' % mqtt_msg.topic) + + participant = None port = "" - for inport in self.participant.definition['inports']: - if inport['queue'] == mqtt_msg.topic: - port = inport['id'] + for p in self.participants: + for inport in p.definition['inports']: + if inport['queue'] == mqtt_msg.topic: + port = inport['id'] + participant = p def notify(): msg = Message(mqtt_msg.payload) @@ -354,9 +365,10 @@ def notify(): logger.debug('unknown error %s' % str(e)) logger.debug('Delivering message to %s' % port) - self.participant.process(port, msg) + participant.process(port, msg) - gevent.spawn(notify) + if port and participant: + gevent.spawn(notify) def _send_discovery(self, definition): m = { @@ -369,7 +381,7 @@ def _send_discovery(self, definition): logger.debug('sent discovery message %s' % msg) return -def run(participant, broker=None, done_cb=None, iips={}): +def run(participants, broker=None, done_cb=None, iips={}): if broker is None: broker = os.environ.get('MSGFLO_BROKER', 'amqp://localhost') @@ -384,7 +396,8 @@ def run(participant, broker=None, done_cb=None, iips={}): if done_cb: engine.done_callback(done_cb) - engine.add_participant(participant, iips) + for p in participants: + engine.add_participant(p, iips) engine.run() return engine @@ -413,7 +426,7 @@ def main(Participant, role=None): participant = Participant(config.role) d = participant.definition waiter = gevent.event.AsyncResult() - engine = run(participant, done_cb=waiter.set, iips=config.iips) + engine = run([participant], done_cb=waiter.set, iips=config.iips) anon_url = "%s://%s" % (engine.broker_info.scheme, engine.broker_info.hostname) print("%s(%s) running on %s" % (d['role'], d['component'], anon_url)) sys.stdout.flush()