Skip to content
This repository

use pyzmq tools where appropriate #1030

Merged
merged 2 commits into from over 2 years ago

2 participants

Min RK Fernando Perez
Min RK
Owner

ZMQStream is the right object to use for event-driven handling of messages, but instead we had a duplication of half of it in KernelManager.

This removes most of the duplicate code, in favor of using ZMQStream.

also use the pyzmq install() function for using the pyzmq eventloop with tornado, instead of copying its contents into notebookapp.

Min RK use pyzmq tools where appropriate
ZMQStream is the right object to use for event-driven handling of messages, but instead we chose to rewrite half of it in KernelManager.

This removes most of the duplicate code, in favor of using ZMQStream.

also use the pyzmq install() function for using pyzmq with tornado, instead of manually pasting its contents in notebook app.
4b34862
Fernando Perez
Owner

Mmh, with this I can't start the notebook at all. Does it mean it changes the minimum tornado version required?

longs[junk]> ipnb
Traceback (most recent call last):
  File "/home/fperez/usr/bin/ipython", line 7, in <module>
    launch_new_instance()
  File "/home/fperez/usr/lib/python2.6/site-packages/IPython/frontend/terminal/ipapp.py", line 392, in launch_new_instance
    app.initialize()
  File "<string>", line 2, in initialize
  File "/home/fperez/usr/lib/python2.6/site-packages/IPython/config/application.py", line 84, in catch_config_error
    return method(app, *args, **kwargs)
  File "/home/fperez/usr/lib/python2.6/site-packages/IPython/frontend/terminal/ipapp.py", line 292, in initialize
    super(TerminalIPythonApp, self).initialize(argv)
  File "<string>", line 2, in initialize
  File "/home/fperez/usr/lib/python2.6/site-packages/IPython/config/application.py", line 84, in catch_config_error
    return method(app, *args, **kwargs)
  File "/home/fperez/usr/lib/python2.6/site-packages/IPython/core/application.py", line 325, in initialize
    self.parse_command_line(argv)
  File "/home/fperez/usr/lib/python2.6/site-packages/IPython/frontend/terminal/ipapp.py", line 287, in parse_command_line
    return super(TerminalIPythonApp, self).parse_command_line(argv)
  File "<string>", line 2, in parse_command_line
  File "/home/fperez/usr/lib/python2.6/site-packages/IPython/config/application.py", line 84, in catch_config_error
    return method(app, *args, **kwargs)
  File "/home/fperez/usr/lib/python2.6/site-packages/IPython/config/application.py", line 413, in parse_command_line
    return self.initialize_subcommand(subc, subargv)
  File "<string>", line 2, in initialize_subcommand
  File "/home/fperez/usr/lib/python2.6/site-packages/IPython/config/application.py", line 84, in catch_config_error
    return method(app, *args, **kwargs)
  File "/home/fperez/usr/lib/python2.6/site-packages/IPython/config/application.py", line 349, in initialize_subcommand
    subapp = import_item(subapp)
  File "/home/fperez/usr/lib/python2.6/site-packages/IPython/utils/importstring.py", line 40, in import_item
    module = __import__(package,fromlist=[obj])
  File "/home/fperez/usr/lib/python2.6/site-packages/IPython/frontend/html/notebook/notebookapp.py", line 35, in <module>
    ioloop.install()
AttributeError: 'module' object has no attribute 'install'
Min RK
Owner

Sorry, the install() function is new in pyzmq-2.1.7 (from May), so I presume yours is older than that. We currently depend on 2.1.4 (except on Windows), so it should now fallback if it isn't defined.

The parallel code on Windows already depends on fixes in pyzmq-2.1.7, so we could consider updating the baseline.

Fernando Perez
Owner

I'd suggest not forcing unix users to update to > 2.1.4 if we don't really need it (in my case I can do it easily, but someone could be in a situation where they have for some reason a harder time), but emitting a warning if you have to use the fallback. That way we prod people to upgrade to avoid the warning. What do you think of that approach?

Min RK
Owner

Since it's just this one function that we need, which is literally two lines of code, I'd leave out the warning. There are very few changes of any significance to people not using pyzmq directly, after 2.1.4. Most of the changes to pyzmq since then are user-level sugar, or related to libzmq-3.x compatibility.

Fernando Perez
Owner

Got it. In that case, merging now. I tested it and everything seemed OK, the code looks clean, and I'm glad to see this kind of cleanup. Thanks!

Fernando Perez fperez merged commit aa2337b into from November 22, 2011
Fernando Perez fperez closed this November 22, 2011
Fernando Perez fperez referenced this pull request from a commit January 10, 2012
Commit has since been removed from the repository and is no longer available.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Showing 2 unique commits by 1 author.

Nov 22, 2011
Min RK use pyzmq tools where appropriate
ZMQStream is the right object to use for event-driven handling of messages, but instead we chose to rewrite half of it in KernelManager.

This removes most of the duplicate code, in favor of using ZMQStream.

also use the pyzmq install() function for using pyzmq with tornado, instead of manually pasting its contents in notebook app.
4b34862
Min RK don't assume ioloop.install is defined (pyzmq < 2.1.7) aaf8939
This page is out of date. Refresh to see the latest.
9  IPython/frontend/html/notebook/notebookapp.py
@@ -32,8 +32,13 @@
32 32
 # Install the pyzmq ioloop. This has to be done before anything else from
33 33
 # tornado is imported.
34 34
 from zmq.eventloop import ioloop
35  
-import tornado.ioloop
36  
-tornado.ioloop.IOLoop = ioloop.IOLoop
  35
+# FIXME: ioloop.install is new in pyzmq-2.1.7, so remove this conditional
  36
+# when pyzmq dependency is updated beyond that.
  37
+if hasattr(ioloop, 'install'):
  38
+    ioloop.install()
  39
+else:
  40
+    import tornado.ioloop
  41
+    tornado.ioloop.IOLoop = ioloop.IOLoop
37 42
 
38 43
 from tornado import httpserver
39 44
 from tornado import web
163  IPython/zmq/kernelmanager.py
@@ -18,7 +18,6 @@
18 18
 # Standard library imports.
19 19
 import errno
20 20
 import json
21  
-from Queue import Queue, Empty
22 21
 from subprocess import Popen
23 22
 import os
24 23
 import signal
@@ -28,8 +27,7 @@
28 27
 
29 28
 # System library imports.
30 29
 import zmq
31  
-from zmq import POLLIN, POLLOUT, POLLERR
32  
-from zmq.eventloop import ioloop
  30
+from zmq.eventloop import ioloop, zmqstream
33 31
 
34 32
 # Local imports.
35 33
 from IPython.config.loader import Config
@@ -88,7 +86,7 @@ class ZMQSocketChannel(Thread):
88 86
     session = None
89 87
     socket = None
90 88
     ioloop = None
91  
-    iostate = None
  89
+    stream = None
92 90
     _address = None
93 91
 
94 92
     def __init__(self, context, session, address):
@@ -144,37 +142,28 @@ def address(self):
144 142
         """
145 143
         return self._address
146 144
 
147  
-    def add_io_state(self, state):
148  
-        """Add IO state to the eventloop.
149  
-
  145
+    def _queue_send(self, msg):
  146
+        """Queue a message to be sent from the IOLoop's thread.
  147
+        
150 148
         Parameters
151 149
         ----------
152  
-        state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
153  
-            The IO state flag to set.
154  
-
155  
-        This is thread safe as it uses the thread safe IOLoop.add_callback.
  150
+        msg : message to send
  151
+        
  152
+        This is threadsafe, as it uses IOLoop.add_callback to give the loop's
  153
+        thread control of the action.
156 154
         """
157  
-        def add_io_state_callback():
158  
-            if not self.iostate & state:
159  
-                self.iostate = self.iostate | state
160  
-                self.ioloop.update_handler(self.socket, self.iostate)
161  
-        self.ioloop.add_callback(add_io_state_callback)
162  
-
163  
-    def drop_io_state(self, state):
164  
-        """Drop IO state from the eventloop.
165  
-
166  
-        Parameters
167  
-        ----------
168  
-        state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
169  
-            The IO state flag to set.
  155
+        def thread_send():
  156
+            self.session.send(self.stream, msg)
  157
+        self.ioloop.add_callback(thread_send)
170 158
 
171  
-        This is thread safe as it uses the thread safe IOLoop.add_callback.
  159
+    def _handle_recv(self, msg):
  160
+        """callback for stream.on_recv
  161
+        
  162
+        unpacks message, and calls handlers with it.
172 163
         """
173  
-        def drop_io_state_callback():
174  
-            if self.iostate & state:
175  
-                self.iostate = self.iostate & (~state)
176  
-                self.ioloop.update_handler(self.socket, self.iostate)
177  
-        self.ioloop.add_callback(drop_io_state_callback)
  164
+        ident,smsg = self.session.feed_identities(msg)
  165
+        self.call_handlers(self.session.unserialize(smsg))
  166
+    
178 167
 
179 168
 
180 169
 class ShellSocketChannel(ZMQSocketChannel):
@@ -187,7 +176,6 @@ class ShellSocketChannel(ZMQSocketChannel):
187 176
 
188 177
     def __init__(self, context, session, address):
189 178
         super(ShellSocketChannel, self).__init__(context, session, address)
190  
-        self.command_queue = Queue()
191 179
         self.ioloop = ioloop.IOLoop()
192 180
 
193 181
     def run(self):
@@ -195,9 +183,8 @@ def run(self):
195 183
         self.socket = self.context.socket(zmq.DEALER)
196 184
         self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
197 185
         self.socket.connect('tcp://%s:%i' % self.address)
198  
-        self.iostate = POLLERR|POLLIN
199  
-        self.ioloop.add_handler(self.socket, self._handle_events,
200  
-                                self.iostate)
  186
+        self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
  187
+        self.stream.on_recv(self._handle_recv)
201 188
         self._run_loop()
202 189
 
203 190
     def stop(self):
@@ -268,7 +255,7 @@ def execute(self, code, silent=False,
268 255
                        allow_stdin=allow_stdin,
269 256
                        )
270 257
         msg = self.session.msg('execute_request', content)
271  
-        self._queue_request(msg)
  258
+        self._queue_send(msg)
272 259
         return msg['header']['msg_id']
273 260
 
274 261
     def complete(self, text, line, cursor_pos, block=None):
@@ -293,7 +280,7 @@ def complete(self, text, line, cursor_pos, block=None):
293 280
         """
294 281
         content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
295 282
         msg = self.session.msg('complete_request', content)
296  
-        self._queue_request(msg)
  283
+        self._queue_send(msg)
297 284
         return msg['header']['msg_id']
298 285
 
299 286
     def object_info(self, oname):
@@ -310,7 +297,7 @@ def object_info(self, oname):
310 297
         """
311 298
         content = dict(oname=oname)
312 299
         msg = self.session.msg('object_info_request', content)
313  
-        self._queue_request(msg)
  300
+        self._queue_send(msg)
314 301
         return msg['header']['msg_id']
315 302
 
316 303
     def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
@@ -348,7 +335,7 @@ def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
348 335
         content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
349 336
                                                                     **kwargs)
350 337
         msg = self.session.msg('history_request', content)
351  
-        self._queue_request(msg)
  338
+        self._queue_send(msg)
352 339
         return msg['header']['msg_id']
353 340
 
354 341
     def shutdown(self, restart=False):
@@ -365,38 +352,9 @@ def shutdown(self, restart=False):
365 352
         # Send quit message to kernel. Once we implement kernel-side setattr,
366 353
         # this should probably be done that way, but for now this will do.
367 354
         msg = self.session.msg('shutdown_request', {'restart':restart})
368  
-        self._queue_request(msg)
  355
+        self._queue_send(msg)
369 356
         return msg['header']['msg_id']
370 357
 
371  
-    def _handle_events(self, socket, events):
372  
-        if events & POLLERR:
373  
-            self._handle_err()
374  
-        if events & POLLOUT:
375  
-            self._handle_send()
376  
-        if events & POLLIN:
377  
-            self._handle_recv()
378  
-
379  
-    def _handle_recv(self):
380  
-        ident,msg = self.session.recv(self.socket, 0)
381  
-        self.call_handlers(msg)
382  
-
383  
-    def _handle_send(self):
384  
-        try:
385  
-            msg = self.command_queue.get(False)
386  
-        except Empty:
387  
-            pass
388  
-        else:
389  
-            self.session.send(self.socket,msg)
390  
-        if self.command_queue.empty():
391  
-            self.drop_io_state(POLLOUT)
392  
-
393  
-    def _handle_err(self):
394  
-        # We don't want to let this go silently, so eventually we should log.
395  
-        raise zmq.ZMQError()
396  
-
397  
-    def _queue_request(self, msg):
398  
-        self.command_queue.put(msg)
399  
-        self.add_io_state(POLLOUT)
400 358
 
401 359
 
402 360
 class SubSocketChannel(ZMQSocketChannel):
@@ -413,9 +371,8 @@ def run(self):
413 371
         self.socket.setsockopt(zmq.SUBSCRIBE,b'')
414 372
         self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
415 373
         self.socket.connect('tcp://%s:%i' % self.address)
416  
-        self.iostate = POLLIN|POLLERR
417  
-        self.ioloop.add_handler(self.socket, self._handle_events,
418  
-                                self.iostate)
  374
+        self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
  375
+        self.stream.on_recv(self._handle_recv)
419 376
         self._run_loop()
420 377
 
421 378
     def stop(self):
@@ -456,33 +413,9 @@ def flush(self, timeout=1.0):
456 413
             while not self._flushed and time.time() < stop_time:
457 414
                 time.sleep(0.01)
458 415
 
459  
-    def _handle_events(self, socket, events):
460  
-        # Turn on and off POLLOUT depending on if we have made a request
461  
-        if events & POLLERR:
462  
-            self._handle_err()
463  
-        if events & POLLIN:
464  
-            self._handle_recv()
465  
-
466  
-    def _handle_err(self):
467  
-        # We don't want to let this go silently, so eventually we should log.
468  
-        raise zmq.ZMQError()
469  
-
470  
-    def _handle_recv(self):
471  
-        # Get all of the messages we can
472  
-        while True:
473  
-            try:
474  
-                ident,msg = self.session.recv(self.socket)
475  
-            except zmq.ZMQError:
476  
-                # Check the errno?
477  
-                # Will this trigger POLLERR?
478  
-                break
479  
-            else:
480  
-                if msg is None:
481  
-                    break
482  
-                self.call_handlers(msg)
483  
-
484 416
     def _flush(self):
485 417
         """Callback for :method:`self.flush`."""
  418
+        self.stream.flush()
486 419
         self._flushed = True
487 420
 
488 421
 
@@ -494,16 +427,14 @@ class StdInSocketChannel(ZMQSocketChannel):
494 427
     def __init__(self, context, session, address):
495 428
         super(StdInSocketChannel, self).__init__(context, session, address)
496 429
         self.ioloop = ioloop.IOLoop()
497  
-        self.msg_queue = Queue()
498 430
 
499 431
     def run(self):
500 432
         """The thread's main activity.  Call start() instead."""
501 433
         self.socket = self.context.socket(zmq.DEALER)
502 434
         self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
503 435
         self.socket.connect('tcp://%s:%i' % self.address)
504  
-        self.iostate = POLLERR|POLLIN
505  
-        self.ioloop.add_handler(self.socket, self._handle_events,
506  
-                                self.iostate)
  436
+        self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
  437
+        self.stream.on_recv(self._handle_recv)
507 438
         self._run_loop()
508 439
 
509 440
     def stop(self):
@@ -524,37 +455,7 @@ def input(self, string):
524 455
         """Send a string of raw input to the kernel."""
525 456
         content = dict(value=string)
526 457
         msg = self.session.msg('input_reply', content)
527  
-        self._queue_reply(msg)
528  
-
529  
-    def _handle_events(self, socket, events):
530  
-        if events & POLLERR:
531  
-            self._handle_err()
532  
-        if events & POLLOUT:
533  
-            self._handle_send()
534  
-        if events & POLLIN:
535  
-            self._handle_recv()
536  
-
537  
-    def _handle_recv(self):
538  
-        ident,msg = self.session.recv(self.socket, 0)
539  
-        self.call_handlers(msg)
540  
-
541  
-    def _handle_send(self):
542  
-        try:
543  
-            msg = self.msg_queue.get(False)
544  
-        except Empty:
545  
-            pass
546  
-        else:
547  
-            self.session.send(self.socket,msg)
548  
-        if self.msg_queue.empty():
549  
-            self.drop_io_state(POLLOUT)
550  
-
551  
-    def _handle_err(self):
552  
-        # We don't want to let this go silently, so eventually we should log.
553  
-        raise zmq.ZMQError()
554  
-
555  
-    def _queue_reply(self, msg):
556  
-        self.msg_queue.put(msg)
557  
-        self.add_io_state(POLLOUT)
  458
+        self._queue_send(msg)
558 459
 
559 460
 
560 461
 class HBSocketChannel(ZMQSocketChannel):
Commit_comment_tip

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.