diff --git a/bin/msgflo-python b/bin/msgflo-python index 7d585c7..c27d573 100755 --- a/bin/msgflo-python +++ b/bin/msgflo-python @@ -25,7 +25,7 @@ def load_module_file(filepath): def main(): prog, args = sys.argv[0], sys.argv[1:] try: - modulepath, role = args + modulepath, role = args[0:2] except ValueError, e: sys.stderr.write("Usage: msgflo-python MODULE.py ROLE\n") return 1 diff --git a/msgflo/msgflo.py b/msgflo/msgflo.py index d105af6..9c69416 100755 --- a/msgflo/msgflo.py +++ b/msgflo/msgflo.py @@ -3,6 +3,8 @@ import sys, os, json, random, urlparse sys.path.append(os.path.abspath(".")) +from optparse import OptionParser + import logging logging.basicConfig() log_level = os.environ.get('MSGFLO_PYTHON_LOGLEVEL') @@ -75,7 +77,7 @@ def __init__(self, broker, discovery_period=None): def done_callback(self, done_cb): self._done_cb = done_cb - def add_participant(self, participant): + def add_participant(self, participant, iips={}): raise NotImplementedError def ack_message(self, msg): @@ -92,6 +94,13 @@ def __init__(self, raw): self.data = raw self.json = None +def deliver_iips(participant): + iips = participant._iips + for port, data in iips.items(): + msg = Message(data) + msg.json = data + participant.process(port, msg) + class AmqpEngine(Engine): def __init__(self, broker, discovery_period=None): @@ -107,9 +116,10 @@ def __init__(self, broker, discovery_period=None): self._channel = self._conn.channel() self._channel.add_close_listener(self._channel_closed_cb) - def add_participant(self, participant): + def add_participant(self, participant, iips={}): self.participant = participant self.participant._engine = self + self.participant._iips = iips # Create and configure message exchange and queue for p in self.participant.definition['inports']: @@ -117,6 +127,10 @@ def add_participant(self, participant): for p in self.participant.definition['outports']: self._setup_queue(self.participant, self._channel, 'out', p) + # Deliver IIPs + deliver_iips(self.participant) + + # Send discovery message def send_discovery(): while self.participant: self._send_discovery(self._channel, self.participant.definition) @@ -222,9 +236,10 @@ def __init__(self, broker): port = 1883 self._client.connect(host, port, 60) - def add_participant(self, participant): + def add_participant(self, participant, iips={}): self.participant = participant self.participant._engine = self + self.participant._iips = iips def run(self): self._message_pump_greenlet = gevent.spawn(self._message_pump_greenthread) @@ -266,6 +281,10 @@ def _on_connect(self, client, userdata, flags, rc): subscriptions.append((topic, 0)) self._client.subscribe(subscriptions) + # Deliver IIPs + deliver_iips(self.participant) + + # Send discovery messsage def send_discovery(): while self.participant: delay = self.discovery_period/2.2 @@ -304,7 +323,7 @@ def _send_discovery(self, definition): logger.debug('sent discovery message %s' % msg) return -def run(participant, broker=None, done_cb=None): +def run(participant, broker=None, done_cb=None, iips={}): if broker is None: broker = os.environ.get('MSGFLO_BROKER', 'amqp://localhost') @@ -319,22 +338,36 @@ def run(participant, broker=None, done_cb=None): if done_cb: engine.done_callback(done_cb) - engine.add_participant(participant) + engine.add_participant(participant, iips) engine.run() return engine +def parse(argv, defaults={}): + parser = OptionParser(usage="%prog [options] role") + parser.add_option("-i", "--iips", dest="iips", default='{}', + help="Data as initial information packets", metavar='{"inportA": "inA-data", ...}') + + (options, args) = parser.parse_args(argv) + for k, v in defaults.items(): + if k not in options: + options[k] = v + options.iips = json.loads(options.iips) + options.role = args[0] + + return [options, parser] + def main(Participant, role=None): - if not role: - try: - role = sys.argv[1] - except IndexError, e: - role = participant.definition.component.tolower() + [config, parser] = parse(sys.argv) + if role: + config.role = role + if not config.role: + parser.error("role not specified") - participant = Participant(role) + participant = Participant(config.role) d = participant.definition waiter = gevent.event.AsyncResult() - engine = run(participant, done_cb=waiter.set) + engine = run(participant, done_cb=waiter.set, iips=config.iips) anon_url = "%s://%s" % (engine.broker_info.scheme, engine.broker_info.hostname) print "%s(%s) running on %s" % (d['role'], d['component'], anon_url) sys.stdout.flush()