
propagate iopub to clients

1 parent 84a3424, commit 8554e339e08c0dc794daa7cf102f205e7f15ac10, @minrk committed Feb 1, 2011
4 IPython/zmq/displayhook.py
@@ -4,6 +4,8 @@
class DisplayHook(object):
+ topic=None
+
def __init__(self, session, pub_socket):
self.session = session
self.pub_socket = pub_socket
@@ -15,7 +17,7 @@ def __call__(self, obj):
__builtin__._ = obj
msg = self.session.send(self.pub_socket, u'pyout', {u'data':repr(obj)},
- parent=self.parent_header)
+ parent=self.parent_header, ident=self.topic)
def set_parent(self, parent):
self.parent_header = extract_header(parent)
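
This small change is the heart of the commit: DisplayHook gains a class-level topic attribute, and the pyout message is published with that topic as its ident, so the leading frame on the iopub PUB socket becomes a prefix that SUB sockets can filter on. A rough sketch of the underlying zmq prefix filtering with plain pyzmq (the address and the engine.0.pyout topic are illustrative, not taken from this commit):

import time
import zmq

ctx = zmq.Context.instance()

# Publisher: the leading frame of a multipart message acts as the topic,
# which is what passing ident=self.topic to session.send amounts to.
pub = ctx.socket(zmq.PUB)
port = pub.bind_to_random_port("tcp://127.0.0.1")

# Subscriber: filter on a topic prefix; b'' would accept everything.
sub = ctx.socket(zmq.SUB)
sub.connect("tcp://127.0.0.1:%i" % port)
sub.setsockopt(zmq.SUBSCRIBE, b"engine.0.pyout")
time.sleep(0.2)  # give the slow-joining SUB time to connect before publishing

pub.send_multipart([b"engine.0.pyout", b"42"])       # delivered
pub.send_multipart([b"engine.1.pyout", b"ignored"])  # filtered out by the SUB
print(sub.recv_multipart())                          # [b'engine.0.pyout', b'42']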
9 IPython/zmq/iostream.py
@@ -23,7 +23,8 @@ class OutStream(object):
# The time interval between automatic flushes, in seconds.
flush_interval = 0.05
-
+ topic=None
+
def __init__(self, session, pub_socket, name):
self.session = session
self.pub_socket = pub_socket
@@ -49,10 +50,10 @@ def flush(self):
enc = sys.stdin.encoding or sys.getdefaultencoding()
data = data.decode(enc, 'replace')
content = {u'name':self.name, u'data':data}
- msg = self.session.send(self.pub_socket, u'stream',
- content=content,
- parent=self.parent_header)
+ msg = self.session.send(self.pub_socket, u'stream', content=content,
+ parent=self.parent_header, ident=self.topic)
logger.debug(msg)
+
self._buffer.close()
self._new_buffer()
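
OutStream, the replacement for sys.stdout/sys.stderr inside the engine, gets the same treatment: each flush publishes the buffered text as a 'stream' message under the stream's topic. A toy illustration of the buffer-then-flush pattern (the class and the publish callback are invented for the example; this is not the real OutStream):

import time

class BufferedStream(object):
    """Collect writes and hand them off in batches, like OutStream.flush."""
    flush_interval = 0.05  # seconds, mirroring the value in iostream.py

    def __init__(self, name, publish):
        self.name = name        # 'stdout' or 'stderr'
        self.publish = publish  # callable(content) -- stand-in for session.send
        self._buffer = []
        self._last_flush = time.time()

    def write(self, s):
        self._buffer.append(s)
        if time.time() - self._last_flush > self.flush_interval:
            self.flush()

    def flush(self):
        if self._buffer:
            self.publish({'name': self.name, 'data': ''.join(self._buffer)})
            self._buffer = []
        self._last_flush = time.time()

out = BufferedStream('stdout', publish=print)
out.write("hello ")
out.write("world\n")
out.flush()  # -> {'name': 'stdout', 'data': 'hello world\n'}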
4 IPython/zmq/parallel/asyncresult.py
@@ -137,11 +137,13 @@ def metadata(self):
return self._metadata
@property
- @check_ready
def result_dict(self):
"""result property as a dict."""
return self.get_dict(0)
+ def __dict__(self):
+ return self.get_dict(0)
+
#-------------------------------------
# dict-access
#-------------------------------------
84 IPython/zmq/parallel/client.py
@@ -103,6 +103,32 @@ def __getitem__(self, key):
raise res
return res
+class Metadata(dict):
+ """Subclass of dict for initializing metadata values."""
+ def __init__(self, *args, **kwargs):
+ dict.__init__(self)
+ md = {'msg_id' : None,
+ 'submitted' : None,
+ 'started' : None,
+ 'completed' : None,
+ 'received' : None,
+ 'engine_uuid' : None,
+ 'engine_id' : None,
+ 'follow' : None,
+ 'after' : None,
+ 'status' : None,
+
+ 'pyin' : None,
+ 'pyout' : None,
+ 'pyerr' : None,
+ 'stdout' : '',
+ 'stderr' : '',
+ }
+ self.update(md)
+ self.update(dict(*args, **kwargs))
+
+
+
class Client(object):
"""A semi-synchronous client to the IPython ZMQ controller
@@ -196,6 +222,7 @@ class Client(object):
_registration_socket=None
_query_socket=None
_control_socket=None
+ _iopub_socket=None
_notification_socket=None
_mux_socket=None
_task_socket=None
@@ -325,6 +352,11 @@ def connect_socket(s, addr):
self._control_socket = self.context.socket(zmq.PAIR)
self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
connect_socket(self._control_socket, content.control)
+ if content.iopub:
+ self._iopub_socket = self.context.socket(zmq.SUB)
+ self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
+ self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
+ connect_socket(self._iopub_socket, content.iopub)
self._update_engines(dict(content.engines))
else:
@@ -350,8 +382,8 @@ def _unregister_engine(self, msg):
if eid in self._ids:
self._ids.remove(eid)
self._engines.pop(eid)
- #
- def _build_metadata(self, header, parent, content):
+
+ def _extract_metadata(self, header, parent, content):
md = {'msg_id' : parent['msg_id'],
'submitted' : datetime.strptime(parent['date'], ss.ISO8601),
'started' : datetime.strptime(header['started'], ss.ISO8601),
@@ -361,8 +393,8 @@ def _build_metadata(self, header, parent, content):
'engine_id' : self._engines.get(header['engine'], None),
'follow' : parent['follow'],
'after' : parent['after'],
- 'status' : content['status']
- }
+ 'status' : content['status'],
+ }
return md
def _handle_execute_reply(self, msg):
@@ -390,8 +422,12 @@ def _handle_apply_reply(self, msg):
content = msg['content']
header = msg['header']
- self.metadata[msg_id] = self._build_metadata(header, parent, content)
+ # construct metadata:
+ md = self.metadata.setdefault(msg_id, Metadata())
+ md.update(self._extract_metadata(header, parent, content))
+ self.metadata[msg_id] = md
+ # construct result:
if content['status'] == 'ok':
self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
elif content['status'] == 'aborted':
@@ -448,6 +484,37 @@ def _flush_control(self, sock):
pprint(msg)
msg = self.session.recv(sock, mode=zmq.NOBLOCK)
+ def _flush_iopub(self, sock):
+ """Flush replies from the iopub channel waiting
+ in the ZMQ queue.
+ """
+ msg = self.session.recv(sock, mode=zmq.NOBLOCK)
+ while msg is not None:
+ if self.debug:
+ pprint(msg)
+ msg = msg[-1]
+ parent = msg['parent_header']
+ msg_id = parent['msg_id']
+ content = msg['content']
+ header = msg['header']
+ msg_type = msg['msg_type']
+
+ # init metadata:
+ md = self.metadata.setdefault(msg_id, Metadata())
+
+ if msg_type == 'stream':
+ name = content['name']
+ s = md[name] or ''
+ md[name] = s + content['data']
+ elif msg_type == 'pyerr':
+ md.update({'pyerr' : ss.unwrap_exception(content)})
+ else:
+ md.update({msg_type : content['data']})
+
+ self.metadata[msg_id] = md
+
+ msg = self.session.recv(sock, mode=zmq.NOBLOCK)
+
#--------------------------------------------------------------------------
# getitem
#--------------------------------------------------------------------------
@@ -501,6 +568,8 @@ def spin(self):
self._flush_results(self._task_socket)
if self._control_socket:
self._flush_control(self._control_socket)
+ if self._iopub_socket:
+ self._flush_iopub(self._iopub_socket)
def barrier(self, msg_ids=None, timeout=-1):
"""waits on one or more `msg_ids`, for up to `timeout` seconds.
@@ -966,10 +1035,13 @@ def get_results(self, msg_ids, status_only=False):
parent = rec['header']
header = rec['result_header']
rcontent = rec['result_content']
+ iodict = rec['io']
if isinstance(rcontent, str):
rcontent = self.session.unpack(rcontent)
- self.metadata[msg_id] = self._build_metadata(header, parent, rcontent)
+ md = self.metadata.setdefault(msg_id, Metadata())
+ md.update(self._extract_metadata(header, parent, rcontent))
+ md.update(iodict)
if rcontent['status'] == 'ok':
res,buffers = ss.unserialize_object(buffers)
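
On the client side, Metadata is simply a dict pre-populated with every expected key, and _flush_iopub drains the SUB socket without blocking, folding each message into the metadata entry for its parent request: stream output is concatenated onto stdout/stderr, pyin/pyout keep the reported data, and pyerr is unwrapped into an exception record. A compressed sketch of that accumulation, with the socket loop replaced by a plain list of illustrative messages:

class Metadata(dict):
    """dict with the iopub-related keys pre-initialized (trimmed to the io keys)."""
    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        self.update({'pyin': None, 'pyout': None, 'pyerr': None,
                     'stdout': '', 'stderr': ''})
        self.update(dict(*args, **kwargs))

metadata = {}
fake_iopub = [  # stand-ins for what session.recv would return from the SUB socket
    {'parent_header': {'msg_id': 'abc'}, 'msg_type': 'stream',
     'content': {'name': 'stdout', 'data': 'hello '}},
    {'parent_header': {'msg_id': 'abc'}, 'msg_type': 'stream',
     'content': {'name': 'stdout', 'data': 'world\n'}},
    {'parent_header': {'msg_id': 'abc'}, 'msg_type': 'pyout',
     'content': {'data': '42'}},
]

for msg in fake_iopub:
    md = metadata.setdefault(msg['parent_header']['msg_id'], Metadata())
    if msg['msg_type'] == 'stream':
        name = msg['content']['name']
        md[name] = (md[name] or '') + msg['content']['data']
    else:
        md[msg['msg_type']] = msg['content']['data']

print(metadata['abc']['stdout'])  # hello world
print(metadata['abc']['pyout'])   # 42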
37 IPython/zmq/parallel/controller.py
@@ -58,8 +58,8 @@ def make_argument_parser():
help='set the PUB socket for registration notification [default: random]')
parser.add_argument('--hb', type=str, metavar='PORTS',
help='set the 2 ports for heartbeats [default: random]')
- parser.add_argument('--ping', type=int, default=3000,
- help='set the heartbeat period in ms [default: 3000]')
+ parser.add_argument('--ping', type=int, default=100,
+ help='set the heartbeat period in ms [default: 100]')
parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
help='set the SUB port for queue monitoring [default: random]')
parser.add_argument('--mux', type=str, metavar='PORTS',
@@ -68,11 +68,15 @@ def make_argument_parser():
help='set the XREP/XREQ ports for the task queue [default: random]')
parser.add_argument('--control', type=str, metavar='PORTS',
help='set the XREP ports for the control queue [default: random]')
- parser.add_argument('--scheduler', type=str, default='pure',
+ parser.add_argument('--iopub', type=str, metavar='PORTS',
+ help='set the PUB/SUB ports for the iopub relay [default: random]')
+ parser.add_argument('--scheduler', type=str, default='lru',
choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
- help='select the task scheduler [default: pure ZMQ]')
+ help='select the task scheduler [default: Python LRU]')
parser.add_argument('--mongodb', action='store_true',
help='Use MongoDB task storage [default: in-memory]')
+ parser.add_argument('--session', type=str, default=None,
+ help='Manually specify the session id.')
return parser
@@ -95,6 +99,11 @@ def main(argv=None):
else:
mux = None
random_ports += 2
+ if args.iopub:
+ iopub = split_ports(args.iopub, 2)
+ else:
+ iopub = None
+ random_ports += 2
if args.task:
task = split_ports(args.task, 2)
else:
@@ -139,7 +148,8 @@ def main(argv=None):
if args.execkey and not os.path.isfile(args.execkey):
generate_exec_key(args.execkey)
- thesession = session.StreamSession(username=args.ident or "controller", keyfile=args.execkey)
+ thesession = session.StreamSession(username=args.ident or "controller",
+ keyfile=args.execkey, session=args.session)
### build and launch the queues ###
@@ -151,6 +161,19 @@ def main(argv=None):
ports = select_random_ports(random_ports)
children = []
+
+ # IOPub relay (in a Process)
+ if not iopub:
+ iopub = (ports.pop(),ports.pop())
+ q = ProcessMonitoredQueue(zmq.SUB, zmq.PUB, zmq.PUB, 'iopub', 'N/A')
+ q.bind_in(iface%iopub[1])
+ q.bind_out(iface%iopub[0])
+ q.setsockopt_in(zmq.SUBSCRIBE, '')
+ q.connect_mon(iface%monport)
+ q.daemon=True
+ q.start()
+ children.append(q.launcher)
+
# Multiplexer Queue (in a Process)
if not mux:
mux = (ports.pop(),ports.pop())
@@ -204,6 +227,7 @@ def main(argv=None):
'queue': iface%mux[1],
'heartbeat': (iface%hb[0], iface%hb[1]),
'task' : iface%task[1],
+ 'iopub' : iface%iopub[1],
'monitor' : iface%monport,
}
@@ -212,8 +236,11 @@ def main(argv=None):
'query': iface%cport,
'queue': iface%mux[0],
'task' : iface%task[0],
+ 'iopub' : iface%iopub[0],
'notification': iface%nport
}
+
+ # register relay of signals to the children
signal_children(children)
hub = Hub(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs)
dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
17 IPython/zmq/parallel/engine.py
@@ -26,7 +26,7 @@
def printer(*msg):
- pprint(msg)
+ pprint(msg, stream=sys.__stdout__)
class Engine(object):
"""IPython engine"""
@@ -69,19 +69,12 @@ def complete_registration(self, msg):
shell_addrs = [str(queue_addr)]
control_addr = str(msg.content.control)
task_addr = msg.content.task
+ iopub_addr = msg.content.iopub
if task_addr:
shell_addrs.append(str(task_addr))
hb_addrs = msg.content.heartbeat
# ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
-
- # placeholder for no, since pub isn't hooked up:
- sub = self.context.socket(zmq.SUB)
- sub = zmqstream.ZMQStream(sub, self.loop)
- sub.on_recv(lambda *a: None)
- port = sub.bind_to_random_port("tcp://%s"%LOCALHOST)
- iopub_addr = "tcp://%s:%i"%(LOCALHOST,12345)
-
k = make_kernel(self.ident, control_addr, shell_addrs, iopub_addr,
hb_addrs, client_addr=None, loop=self.loop,
context=self.context, key=self.session.key)[-1]
@@ -96,15 +89,15 @@ def complete_registration(self, msg):
# logger.info("engine::completed registration with id %s"%self.session.username)
- print (msg)
+ print (msg,file=sys.__stdout__)
def unregister(self):
self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
time.sleep(1)
sys.exit(0)
def start(self):
- print ("registering")
+ print ("registering",file=sys.__stdout__)
self.register()
@@ -128,7 +121,7 @@ def main(argv=None, user_ns=None):
connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
reg_conn = iface % args.regport
- print (reg_conn)
+ print (reg_conn, file=sys.__stdout__)
print ("Starting the engine...", file=sys.__stderr__)
reg = ctx.socket(zmq.PAIR)
7 IPython/zmq/parallel/entry_point.py
@@ -91,7 +91,7 @@ def make_base_argument_parser():
help='set the XREP port for registration [default: 10101]')
parser.add_argument('--logport', type=int, metavar='PORT', default=20202,
help='set the PUB port for logging [default: 10201]')
- parser.add_argument('--loglevel', type=int, metavar='LEVEL', default=logging.DEBUG,
+ parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.DEBUG,
help='set the log level [default: DEBUG]')
parser.add_argument('--ident', type=str,
help='set the ZMQ identity [default: random]')
@@ -107,6 +107,11 @@ def make_base_argument_parser():
def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
+ try:
+ loglevel = int(loglevel)
+ except ValueError:
+ if isinstance(loglevel, str):
+ loglevel = getattr(logging, loglevel)
lsock = context.socket(zmq.PUB)
lsock.connect(iface)
handler = handlers.PUBHandler(lsock)
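
--loglevel previously only accepted an integer; it is now declared as a string so that either a number or a level name works, and connect_logger coerces it: try int() first, then fall back to looking the name up on the logging module. The same coercion pulled out on its own:

import logging

def coerce_loglevel(loglevel):
    """Accept an int level, a numeric string, or a name like 'DEBUG'."""
    try:
        return int(loglevel)
    except ValueError:
        return getattr(logging, loglevel)

print(coerce_loglevel('DEBUG'))  # 10
print(coerce_loglevel('20'))     # 20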
82 IPython/zmq/parallel/hub.py
@@ -41,6 +41,10 @@
def _passer(*args, **kwargs):
return
+def _printer(*args, **kwargs):
+ print (args)
+ print (kwargs)
+
def init_record(msg):
"""return an empty TaskRecord dict, with all keys initialized with None."""
header = msg['header']
@@ -58,7 +62,12 @@ def init_record(msg):
'result_header' : None,
'result_content' : None,
'result_buffers' : None,
- 'queue' : None
+ 'queue' : None,
+ 'pyin' : None,
+ 'pyout': None,
+ 'pyerr': None,
+ 'stdout': '',
+ 'stderr': '',
}
@@ -181,19 +190,20 @@ def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifi
# register our callbacks
self.registrar.on_recv(self.dispatch_register_request)
self.clientele.on_recv(self.dispatch_client_msg)
- self.queue.on_recv(self.dispatch_queue_traffic)
+ self.queue.on_recv(self.dispatch_monitor_traffic)
if heartbeat is not None:
heartbeat.add_heart_failure_handler(self.handle_heart_failure)
heartbeat.add_new_heart_handler(self.handle_new_heart)
- self.queue_handlers = { 'in' : self.save_queue_request,
+ self.monitor_handlers = { 'in' : self.save_queue_request,
'out': self.save_queue_result,
'intask': self.save_task_request,
'outtask': self.save_task_result,
'tracktask': self.save_task_destination,
'incontrol': _passer,
'outcontrol': _passer,
+ 'iopub': self.save_iopub_message,
}
self.client_handlers = {'queue_request': self.queue_status,
@@ -262,7 +272,7 @@ def _validate_client_msg(self, msg):
try:
msg = self.session.unpack_message(msg[1:], content=True)
except:
- logger.error("client::Invalid Message %s"%msg)
+ logger.error("client::Invalid Message %s"%msg, exc_info=True)
return False
msg_type = msg.get('msg_type', None)
@@ -299,19 +309,20 @@ def dispatch_register_request(self, msg):
else:
handler(idents, msg)
- def dispatch_queue_traffic(self, msg):
- """all ME and Task queue messages come through here"""
- logger.debug("queue traffic: %s"%msg[:2])
+ def dispatch_monitor_traffic(self, msg):
+ """all ME and Task queue messages come through here, as well as
+ IOPub traffic."""
+ logger.debug("monitor traffic: %s"%msg[:2])
switch = msg[0]
idents, msg = self.session.feed_identities(msg[1:])
if not idents:
- logger.error("Bad Queue Message: %s"%msg)
+ logger.error("Bad Monitor Message: %s"%msg)
return
- handler = self.queue_handlers.get(switch, None)
+ handler = self.monitor_handlers.get(switch, None)
if handler is not None:
handler(idents, msg)
else:
- logger.error("Invalid message topic: %s"%switch)
+ logger.error("Invalid monitor topic: %s"%switch)
def dispatch_client_msg(self, msg):
@@ -486,7 +497,7 @@ def save_task_result(self, idents, msg):
msg = self.session.unpack_message(msg, content=False)
except:
logger.error("task::invalid task result message send to %r: %s"%(
- client_id, msg))
+ client_id, msg), exc_info=True)
raise
return
@@ -532,7 +543,7 @@ def save_task_destination(self, idents, msg):
try:
msg = self.session.unpack_message(msg, content=True)
except:
- logger.error("task::invalid task tracking message")
+ logger.error("task::invalid task tracking message", exc_info=True)
return
content = msg['content']
print (content)
@@ -557,6 +568,43 @@ def mia_task_request(self, idents, msg):
# self.session.send('mia_reply', content=content, idents=client_id)
+ #--------------------- IOPub Traffic ------------------------------
+
+ def save_iopub_message(self, topics, msg):
+ """save an iopub message into the db"""
+ print (topics)
+ try:
+ msg = self.session.unpack_message(msg, content=True)
+ except:
+ logger.error("iopub::invalid IOPub message", exc_info=True)
+ return
+
+ parent = msg['parent_header']
+ msg_id = parent['msg_id']
+ msg_type = msg['msg_type']
+ content = msg['content']
+
+ # ensure msg_id is in db
+ try:
+ rec = self.db.get_record(msg_id)
+ except:
+ logger.error("iopub::IOPub message has invalid parent", exc_info=True)
+ return
+ # stream
+ d = {}
+ if msg_type == 'stream':
+ name = content['name']
+ s = rec[name] or ''
+ d[name] = s + content['data']
+
+ elif msg_type == 'pyerr':
+ d['pyerr'] = content
+ else:
+ d[msg_type] = content['data']
+
+ self.db.update_record(msg_id, d)
+
+
#-------------------------------------------------------------------------
# Registration requests
@@ -579,7 +627,7 @@ def register_engine(self, reg, msg):
try:
queue = content['queue']
except KeyError:
- logger.error("registration::queue not specified")
+ logger.error("registration::queue not specified", exc_info=True)
return
heart = content.get('heartbeat', None)
"""register a new engine, and create the socket(s) necessary"""
@@ -639,7 +687,7 @@ def unregister_engine(self, ident, msg):
try:
eid = msg['content']['id']
except:
- logger.error("registration::bad engine id for unregistration: %s"%ident)
+ logger.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
return
logger.info("registration::unregister_engine(%s)"%eid)
content=dict(id=eid, queue=self.engines[eid].queue)
@@ -662,7 +710,7 @@ def finish_registration(self, heart):
try:
(eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
except KeyError:
- logger.error("registration::tried to finish nonexistant registration")
+ logger.error("registration::tried to finish nonexistant registration", exc_info=True)
return
logger.info("registration::finished registering engine %i:%r"%(eid,queue))
if purge is not None:
@@ -820,9 +868,13 @@ def get_results(self, client_id, msg):
completed.append(msg_id)
if not statusonly:
rec = records[msg_id]
+ io_dict = {}
+ for key in 'pyin pyout pyerr stdout stderr'.split():
+ io_dict[key] = rec[key]
content[msg_id] = { 'result_content': rec['result_content'],
'header': rec['header'],
'result_header' : rec['result_header'],
+ 'io' : io_dict,
}
buffers.extend(map(str, rec['result_buffers']))
else:
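
The Hub does the mirror-image bookkeeping: save_iopub_message matches each iopub message to its parent task record in the database, appending stream text to the record's stdout/stderr fields, and get_results later repackages those fields into an 'io' dict alongside the result. The repackaging step in isolation, with rec standing in for one stored task record:

rec = {'pyin': 'print("hello")', 'pyout': None, 'pyerr': None,
       'stdout': 'hello\n', 'stderr': '',
       'result_content': {'status': 'ok'}}

io_dict = {}
for key in 'pyin pyout pyerr stdout stderr'.split():
    io_dict[key] = rec[key]

content = {'result_content': rec['result_content'], 'io': io_dict}
print(content['io']['stdout'])  # hello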
6 IPython/zmq/parallel/scheduler.py
@@ -30,9 +30,9 @@
@decorator
def logged(f,self,*args,**kwargs):
- print ("#--------------------")
- print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
- print ("#--")
+ # print ("#--------------------")
+ # print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
+ # print ("#--")
return f(self,*args, **kwargs)
#----------------------------------------------------------------------
40 IPython/zmq/parallel/streamkernel.py
@@ -28,6 +28,9 @@
from IPython.utils.traitlets import HasTraits, Instance, List
from IPython.zmq.completer import KernelCompleter
from IPython.zmq.log import logger # a Logger object
+from IPython.zmq.iostream import OutStream
+from IPython.zmq.displayhook import DisplayHook
+
from streamsession import StreamSession, Message, extract_header, serialize_object,\
unpack_apply_message, ISO8601, wrap_exception
@@ -36,7 +39,7 @@
from client import Client
def printer(*args):
- pprint(args)
+ pprint(args, stream=sys.__stdout__)
#-----------------------------------------------------------------------------
# Main kernel class
@@ -59,6 +62,7 @@ class Kernel(HasTraits):
def __init__(self, **kwargs):
super(Kernel, self).__init__(**kwargs)
self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
+ self.prefix = 'engine.%s'%self.identity
self.user_ns = {}
self.history = []
self.compiler = CommandCompiler()
@@ -212,18 +216,22 @@ def execute_request(self, stream, ident, parent):
return
# pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
# self.iopub_stream.send(pyin_msg)
- self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
+ self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
+ ident=self.identity+'.pyin')
started = datetime.now().strftime(ISO8601)
try:
comp_code = self.compiler(code, '<zmq-kernel>')
# allow for not overriding displayhook
if hasattr(sys.displayhook, 'set_parent'):
sys.displayhook.set_parent(parent)
+ sys.stdout.set_parent(parent)
+ sys.stderr.set_parent(parent)
exec comp_code in self.user_ns, self.user_ns
except:
exc_content = self._wrap_exception('execute')
# exc_msg = self.session.msg(u'pyerr', exc_content, parent)
- self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
+ self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
+ ident=self.identity+'.pyerr')
reply_content = exc_content
else:
reply_content = {'status' : 'ok'}
@@ -247,7 +255,7 @@ def complete(self, msg):
return self.completer.complete(msg.content.line, msg.content.text)
def apply_request(self, stream, ident, parent):
- print (parent)
+ # print (parent)
try:
content = parent[u'content']
bufs = parent[u'buffers']
@@ -266,6 +274,8 @@ def apply_request(self, stream, ident, parent):
# allow for not overriding displayhook
if hasattr(sys.displayhook, 'set_parent'):
sys.displayhook.set_parent(parent)
+ sys.stdout.set_parent(parent)
+ sys.stderr.set_parent(parent)
# exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
if bound:
working = self.user_ns
@@ -305,7 +315,8 @@ def apply_request(self, stream, ident, parent):
except:
exc_content = self._wrap_exception('apply')
# exc_msg = self.session.msg(u'pyerr', exc_content, parent)
- self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
+ self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
+ ident=self.identity+'.pyerr')
reply_content = exc_content
result_buf = []
@@ -318,7 +329,7 @@ def apply_request(self, stream, ident, parent):
# self.reply_socket.send_json(reply_msg)
reply_msg = self.session.send(stream, u'apply_reply', reply_content,
parent=parent, ident=ident,buffers=result_buf, subheader=sub)
- print(Message(reply_msg), file=sys.__stdout__)
+ # print(Message(reply_msg), file=sys.__stdout__)
# if reply_msg['content']['status'] == u'error':
# self.abort_queues()
@@ -364,7 +375,7 @@ def dispatcher(msg):
if self.iopub_stream:
self.iopub_stream.on_err(printer)
- self.iopub_stream.on_send(printer)
+ # self.iopub_stream.on_send(printer)
#### while True mode:
# while True:
@@ -388,7 +399,9 @@ def dispatcher(msg):
# time.sleep(1e-3)
def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
- client_addr=None, loop=None, context=None, key=None):
+ client_addr=None, loop=None, context=None, key=None,
+ out_stream_factory=OutStream, display_hook_factory=DisplayHook):
+
# create loop, context, and session:
if loop is None:
loop = ioloop.IOLoop.instance()
@@ -417,6 +430,17 @@ def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
iopub_stream.setsockopt(zmq.IDENTITY, identity)
iopub_stream.connect(iopub_addr)
+ # Redirect input streams and set a display hook.
+ if out_stream_factory:
+ sys.stdout = out_stream_factory(session, iopub_stream, u'stdout')
+ sys.stdout.topic = identity+'.stdout'
+ sys.stderr = out_stream_factory(session, iopub_stream, u'stderr')
+ sys.stderr.topic = identity+'.stderr'
+ if display_hook_factory:
+ sys.displayhook = display_hook_factory(session, iopub_stream)
+ sys.displayhook.topic = identity+'.pyout'
+
+
# launch heartbeat
heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
heart.start()
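
make_kernel is where it all gets wired up on the engine: sys.stdout, sys.stderr, and sys.displayhook are replaced with iopub-publishing versions, each tagged with a topic derived from the kernel's zmq identity, so every message a client sees on the iopub channel says which engine produced it. The naming scheme, spelled out with an illustrative identity value:

identity = 'engine-0'  # stands in for the kernel's zmq IDENTITY

topics = {
    'stdout': identity + '.stdout',  # OutStream wrapping sys.stdout
    'stderr': identity + '.stderr',  # OutStream wrapping sys.stderr
    'pyout':  identity + '.pyout',   # DisplayHook results
    'pyin':   identity + '.pyin',    # echoed code, sent from execute_request
    'pyerr':  identity + '.pyerr',   # tracebacks from execute/apply requests
}

# a client interested in only one engine's stdout can subscribe narrowly, e.g.:
#   sub.setsockopt(zmq.SUBSCRIBE, topics['stdout'].encode())
print(topics['pyout'])  # engine-0.pyout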
15 IPython/zmq/parallel/streamsession.py
@@ -336,18 +336,17 @@ def check_key(self, msg_or_header):
return header.get('key', None) == self.key
- def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None):
+ def send(self, stream, msg_or_type, content=None, buffers=None, parent=None, subheader=None, ident=None):
"""Build and send a message via stream or socket.
Parameters
----------
stream : zmq.Socket or ZMQStream
the socket-like object used to send the data
- msg_type : str or Message/dict
- Normally, msg_type will be
-
-
+ msg_or_type : str or Message/dict
+ Normally, msg_or_type will be a msg_type unless a message is being sent more
+ than once.
Returns
-------
@@ -356,13 +355,13 @@ def send(self, stream, msg_type, content=None, buffers=None, parent=None, subhea
the nice wrapped dict-like object containing the headers
"""
- if isinstance(msg_type, (Message, dict)):
+ if isinstance(msg_or_type, (Message, dict)):
# we got a Message, not a msg_type
# don't build a new Message
- msg = msg_type
+ msg = msg_or_type
content = msg['content']
else:
- msg = self.msg(msg_type, content, parent, subheader)
+ msg = self.msg(msg_or_type, content, parent, subheader)
buffers = [] if buffers is None else buffers
to_send = []
if isinstance(ident, list):
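
Finally, StreamSession.send is loosened so its second argument may be either a msg_type string (the normal case: a new message is built) or an already-constructed Message/dict, which lets the same message be relayed onward without rebuilding it. A toy version of just that dispatch, with a list standing in for the zmq stream:

def send(stream, msg_or_type, content=None):
    """Mimic the dispatch at the top of StreamSession.send."""
    if isinstance(msg_or_type, dict):
        msg = msg_or_type  # already-built message: reuse it as-is
    else:
        msg = {'msg_type': msg_or_type, 'content': content}  # build a new one
    stream.append(msg)     # stand-in for the actual zmq send
    return msg

wire = []
first = send(wire, 'stream', {'name': 'stdout', 'data': 'hi\n'})  # build and send
send(wire, first)                                                 # relay it unchanged
print(len(wire), wire[0] is wire[1])  # 2 True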
