Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/msgflo-python
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def load_module_file(filepath):
def main():
prog, args = sys.argv[0], sys.argv[1:]
try:
modulepath, role = args
modulepath, role = args[0:2]
except ValueError, e:
sys.stderr.write("Usage: msgflo-python MODULE.py ROLE\n")
return 1
Expand Down
57 changes: 45 additions & 12 deletions msgflo/msgflo.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import sys, os, json, random, urlparse
sys.path.append(os.path.abspath("."))

from optparse import OptionParser

import logging
logging.basicConfig()
log_level = os.environ.get('MSGFLO_PYTHON_LOGLEVEL')
Expand Down Expand Up @@ -75,7 +77,7 @@ def __init__(self, broker, discovery_period=None):
def done_callback(self, done_cb):
self._done_cb = done_cb

def add_participant(self, participant):
def add_participant(self, participant, iips={}):
raise NotImplementedError

def ack_message(self, msg):
Expand All @@ -92,6 +94,13 @@ def __init__(self, raw):
self.data = raw
self.json = None

def deliver_iips(participant):
iips = participant._iips
for port, data in iips.items():
msg = Message(data)
msg.json = data
participant.process(port, msg)

class AmqpEngine(Engine):

def __init__(self, broker, discovery_period=None):
Expand All @@ -107,16 +116,21 @@ def __init__(self, broker, discovery_period=None):
self._channel = self._conn.channel()
self._channel.add_close_listener(self._channel_closed_cb)

def add_participant(self, participant):
def add_participant(self, participant, iips={}):
self.participant = participant
self.participant._engine = self
self.participant._iips = iips

# Create and configure message exchange and queue
for p in self.participant.definition['inports']:
self._setup_queue(self.participant, self._channel, 'in', p)
for p in self.participant.definition['outports']:
self._setup_queue(self.participant, self._channel, 'out', p)

# Deliver IIPs
deliver_iips(self.participant)

# Send discovery message
def send_discovery():
while self.participant:
self._send_discovery(self._channel, self.participant.definition)
Expand Down Expand Up @@ -222,9 +236,10 @@ def __init__(self, broker):
port = 1883
self._client.connect(host, port, 60)

def add_participant(self, participant):
def add_participant(self, participant, iips={}):
self.participant = participant
self.participant._engine = self
self.participant._iips = iips

def run(self):
self._message_pump_greenlet = gevent.spawn(self._message_pump_greenthread)
Expand Down Expand Up @@ -266,6 +281,10 @@ def _on_connect(self, client, userdata, flags, rc):
subscriptions.append((topic, 0))
self._client.subscribe(subscriptions)

# Deliver IIPs
deliver_iips(self.participant)

# Send discovery messsage
def send_discovery():
while self.participant:
delay = self.discovery_period/2.2
Expand Down Expand Up @@ -304,7 +323,7 @@ def _send_discovery(self, definition):
logger.debug('sent discovery message %s' % msg)
return

def run(participant, broker=None, done_cb=None):
def run(participant, broker=None, done_cb=None, iips={}):
if broker is None:
broker = os.environ.get('MSGFLO_BROKER', 'amqp://localhost')

Expand All @@ -319,22 +338,36 @@ def run(participant, broker=None, done_cb=None):

if done_cb:
engine.done_callback(done_cb)
engine.add_participant(participant)
engine.add_participant(participant, iips)
engine.run()

return engine

def parse(argv, defaults={}):
parser = OptionParser(usage="%prog [options] role")
parser.add_option("-i", "--iips", dest="iips", default='{}',
help="Data as initial information packets", metavar='{"inportA": "inA-data", ...}')

(options, args) = parser.parse_args(argv)
for k, v in defaults.items():
if k not in options:
options[k] = v
options.iips = json.loads(options.iips)
options.role = args[0]

return [options, parser]

def main(Participant, role=None):
if not role:
try:
role = sys.argv[1]
except IndexError, e:
role = participant.definition.component.tolower()
[config, parser] = parse(sys.argv)
if role:
config.role = role
if not config.role:
parser.error("role not specified")

participant = Participant(role)
participant = Participant(config.role)
d = participant.definition
waiter = gevent.event.AsyncResult()
engine = run(participant, done_cb=waiter.set)
engine = run(participant, done_cb=waiter.set, iips=config.iips)
anon_url = "%s://%s" % (engine.broker_info.scheme, engine.broker_info.hostname)
print "%s(%s) running on %s" % (d['role'], d['component'], anon_url)
sys.stdout.flush()
Expand Down