Skip to content

Commit

Permalink
add inter-engine communication example
Browse files Browse the repository at this point in the history
  • Loading branch information
minrk committed Apr 8, 2011
1 parent 681a891 commit 864a845
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 0 deletions.
77 changes: 77 additions & 0 deletions docs/examples/newparallel/interengine/communicator.py
@@ -0,0 +1,77 @@
import socket

import uuid
import zmq

from IPython.zmq.parallel.util import disambiguate_url

class EngineCommunicator(object):

def __init__(self, interface='tcp://*', identity=None):
self._ctx = zmq.Context()
self.socket = self._ctx.socket(zmq.XREP)
self.pub = self._ctx.socket(zmq.PUB)
self.sub = self._ctx.socket(zmq.SUB)

# configure sockets
self.identity = identity or bytes(uuid.uuid4())
print self.identity
self.socket.setsockopt(zmq.IDENTITY, self.identity)
self.sub.setsockopt(zmq.SUBSCRIBE, b'')

# bind to ports
port = self.socket.bind_to_random_port(interface)
pub_port = self.pub.bind_to_random_port(interface)
self.url = interface+":%i"%port
self.pub_url = interface+":%i"%pub_port
# guess first public IP from socket
self.location = socket.gethostbyname_ex(socket.gethostname())[-1][0]
self.peers = {}

def __del__(self):
self.socket.close()
self.pub.close()
self.sub.close()
self._ctx.term()

@property
def info(self):
"""return the connection info for this object's sockets."""
return (self.identity, self.url, self.pub_url, self.location)

def connect(self, peers):
"""connect to peers. `peers` will be a dict of 4-tuples, keyed by name.
{peer : (ident, addr, pub_addr, location)}
where peer is the name, ident is the XREP identity, addr,pub_addr are the
"""
for peer, (ident, url, pub_url, location) in peers.items():
self.peers[peer] = ident
if ident != self.identity:
self.sub.connect(disambiguate_url(pub_url, location))
if ident > self.identity:
# prevent duplicate xrep, by only connecting
# engines to engines with higher IDENTITY
# a doubly-connected pair will crash
self.socket.connect(disambiguate_url(url, location))

def send(self, peers, msg, flags=0, copy=True):
if not isinstance(peers, list):
peers = [peers]
if not isinstance(msg, list):
msg = [msg]
for p in peers:
ident = self.peers[p]
self.socket.send_multipart([ident]+msg, flags=flags, copy=copy)

def recv(self, flags=0, copy=True):
return self.socket.recv_multipart(flags=flags, copy=copy)[1:]

def publish(self, msg, flags=0, copy=True):
if not isinstance(msg, list):
msg = [msg]
self.pub.send_multipart(msg, copy=copy)

def consume(self, flags=0, copy=True):
return self.sub.recv_multipart(flags=flags, copy=copy)


43 changes: 43 additions & 0 deletions docs/examples/newparallel/interengine/interengine.py
@@ -0,0 +1,43 @@
import sys

from IPython.zmq.parallel import client


rc = client.Client()
rc.block=True
view = rc[:]
view.run('communicator.py')
view.execute('com = EngineCommunicator()')

# gather the connection information into a dict
ar = view.apply_async_bound(lambda : com.info)
peers = ar.get_dict()
# this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators

# connect the engines to each other:
view.apply_sync_bound(lambda pdict: com.connect(pdict), peers)

# now all the engines are connected, and we can communicate between them:

def broadcast(client, sender, msg_name, dest_name=None, block=None):
"""broadcast a message from one engine to all others."""
dest_name = msg_name if dest_name is None else dest_name
client[sender].execute('com.publish(%s)'%msg_name, block=None)
targets = client.ids
targets.remove(sender)
return client[targets].execute('%s=com.consume()'%dest_name, block=None)

def send(client, sender, targets, msg_name, dest_name=None, block=None):
"""send a message from one to one-or-more engines."""
dest_name = msg_name if dest_name is None else dest_name
def _send(targets, m_name):
msg = globals()[m_name]
return com.send(targets, msg)

client[sender].apply_async_bound(_send, targets, msg_name)

return client[targets].execute('%s=com.recv()'%dest_name, block=None)




0 comments on commit 864a845

Please sign in to comment.