Permalink
Browse files

Parallel kernel/engine startup looks a bit more like pykernel

  • Loading branch information...
1 parent 8473145 commit f6e14610f758f306f7b6fafd04f362d74b0f0059 @minrk minrk committed Nov 4, 2010
Showing with 129 additions and 237 deletions.
  1. +20 −36 IPython/zmq/parallel/engine.py
  2. +109 −201 IPython/zmq/parallel/streamkernel.py
@@ -13,9 +13,12 @@
import zmq
from zmq.eventloop import ioloop, zmqstream
+from IPython.utils.traitlets import HasTraits
+from IPython.utils.localinterfaces import LOCALHOST
+
from streamsession import Message, StreamSession
from client import Client
-import streamkernel as kernel
+from streamkernel import Kernel, make_kernel
import heartmonitor
from entry_point import make_base_argument_parser, connect_logger, parse_url
# import taskthread
@@ -59,45 +62,25 @@ def complete_registration(self, msg):
if msg.content.status == 'ok':
self.session.username = str(msg.content.id)
queue_addr = msg.content.queue
- if queue_addr:
- queue = self.context.socket(zmq.PAIR)
- queue.setsockopt(zmq.IDENTITY, self.ident)
- queue.connect(str(queue_addr))
- self.queue = zmqstream.ZMQStream(queue, self.loop)
-
- control_addr = msg.content.control
- if control_addr:
- control = self.context.socket(zmq.PAIR)
- control.setsockopt(zmq.IDENTITY, self.ident)
- control.connect(str(control_addr))
- self.control = zmqstream.ZMQStream(control, self.loop)
-
+ shell_addrs = [str(queue_addr)]
+ control_addr = str(msg.content.control)
task_addr = msg.content.task
- print (task_addr)
if task_addr:
- # task as stream:
- task = self.context.socket(zmq.PAIR)
- task.setsockopt(zmq.IDENTITY, self.ident)
- task.connect(str(task_addr))
- self.task_stream = zmqstream.ZMQStream(task, self.loop)
- # TaskThread:
- # mon_addr = msg.content.monitor
- # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident)
- # task.connect_in(str(task_addr))
- # task.connect_out(str(mon_addr))
- # self.task_stream = taskthread.QueueStream(*task.queues)
- # task.start()
+ shell_addrs.append(str(task_addr))
- hbs = msg.content.heartbeat
- self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident)
- self.heart.start()
+ hb_addrs = msg.content.heartbeat
# ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
- # placeholder for now:
- pub = self.context.socket(zmq.PUB)
- pub = zmqstream.ZMQStream(pub, self.loop)
- # create and start the kernel
- self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client)
- self.kernel.start()
+
+ # placeholder for no, since pub isn't hooked up:
+ sub = self.context.socket(zmq.SUB)
+ sub = zmqstream.ZMQStream(sub, self.loop)
+ sub.on_recv(lambda *a: None)
+ port = sub.bind_to_random_port("tcp://%s"%LOCALHOST)
+ iopub_addr = "tcp://%s:%i"%(LOCALHOST,12345)
+
+ make_kernel(self.ident, control_addr, shell_addrs, iopub_addr, hb_addrs,
+ client_addr=None, loop=self.loop, context=self.context)
+
else:
# logger.error("Registration Failed: %s"%msg)
raise Exception("Registration Failed: %s"%msg)
@@ -114,6 +97,7 @@ def unregister(self):
def start(self):
print ("registering")
self.register()
+
def main():
Oops, something went wrong.

0 comments on commit f6e1461

Please sign in to comment.