Permalink
Browse files

use new stream.flush()

  • Loading branch information...
1 parent c369179 commit 48fd5d13b5807ae30f9ad85c2ad153dac1117a58 @minrk minrk committed Oct 14, 2010
Showing with 12 additions and 12 deletions.
  1. +12 −12 IPython/zmq/parallel/streamkernel.py
@@ -138,7 +138,7 @@ def __init__(self, session, control_stream, reply_stream, pub_stream,
task_stream=None, client=None):
self.session = session
self.control_stream = control_stream
- self.control_socket = control_stream.socket
+ # self.control_socket = control_stream.socket
self.reply_stream = reply_stream
self.task_stream = task_stream
self.pub_stream = pub_stream
@@ -235,16 +235,16 @@ def dispatch_control(self, msg):
else:
handler(self.control_stream, idents, msg)
- def flush_control(self):
- while any(zmq.select([self.control_socket],[],[],1e-4)):
- try:
- msg = self.control_socket.recv_multipart(zmq.NOBLOCK, copy=False)
- except zmq.ZMQError, e:
- if e.errno != zmq.EAGAIN:
- raise e
- return
- else:
- self.dispatch_control(msg)
+ # def flush_control(self):
+ # while any(zmq.select([self.control_socket],[],[],1e-4)):
+ # try:
+ # msg = self.control_socket.recv_multipart(zmq.NOBLOCK, copy=False)
+ # except zmq.ZMQError, e:
+ # if e.errno != zmq.EAGAIN:
+ # raise e
+ # return
+ # else:
+ # self.dispatch_control(msg)
#-------------------- queue helpers ------------------------------
@@ -404,7 +404,7 @@ def apply_request(self, stream, ident, parent):
self.abort_queues()
def dispatch_queue(self, stream, msg):
- self.flush_control()
+ self.control_stream.flush()
idents,msg = self.session.feed_identities(msg, copy=False)
msg = self.session.unpack_message(msg, content=True, copy=False)

0 comments on commit 48fd5d1

Please sign in to comment.