Skip to content


Subversion checkout URL

You can clone with
Download ZIP


Zmqstream #31

9 commits merged into from

2 participants


ZMQStream now provides complete socket interface, excluding recv methods and direct socket properties.

all send_*
all bind*/connect
all get/setsockopt

socket_type (should this be included?)

Also included: socket.close() inside stream.close() is now a DelayedCallback, preventing it from blocking iostream's logging, at least for the error that caused stream.close() to be called.


This looks great, I would no ahead and merge.

This issue was closed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
Showing with 51 additions and 8 deletions.
  1. +51 −8 zmq/eventloop/
59 zmq/eventloop/
@@ -16,8 +16,10 @@
"""A utility class to send to and recv from a non-blocking socket."""
import logging
-import time
import zmq
+from zmq.core.socket import json, pickle
import ioloop
from queue import Queue
@@ -29,18 +31,18 @@ class ZMQStream(object):
For use with zmq.eventloop.ioloop
- There are 3 main methods:
+ There are 4 main methods:
register a callback to be run every time the socket has something to receive
register a callback to be run every time you call send
register a callback to be run every time there is an error
- send(msg, callback=None)
+ send(self, msg, flags=0, copy=False, callback=None):
perform a send that will trigger the callback
if callback is passed, on_send is also called
- There is also send_multipart()
+ There are also send_multipart(), send_json, send_pyobj
Two other methods for deactivating the callbacks:
@@ -51,6 +53,13 @@ class ZMQStream(object):
turn off the error callback
All of which simply call on_<evt>(None).
+ The entire socket interface, excluding direct recv methods, is also
+ provided, primarily through direct-linking the methods.
+ e.g.
+ >>> stream.bind is stream.socket.bind
+ True
@@ -73,9 +82,13 @@ def __init__(self, socket, io_loop=None):
# shortcircuit some socket methods
self.bind = self.socket.bind
+ self.bind_to_random_port = self.socket.bind_to_random_port
self.connect = self.socket.connect
self.setsockopt = self.socket.setsockopt
self.getsockopt = self.socket.getsockopt
+ self.setsockopt_unicode = self.socket.setsockopt_unicode
+ self.getsockopt_unicode = self.socket.getsockopt_unicode
def stop_on_recv(self):
"""Disable callback and automatic receiving."""
@@ -184,15 +197,42 @@ def send_multipart(self, msg, flags=0, copy=False, callback=None):
# noop callback
self.on_send(lambda *args: None)
+ def send_unicode(self, u, flags=0, encoding='utf-8', callback=None):
+ """Send a unicode message with an encoding.
+ See zmq.socket.send_unicode for details.
+ """
+ if not isinstance(u, basestring):
+ raise TypeError("unicode/str objects only")
+ return self.send(u.encode(encoding), flags=flags, callback=callback)
+ def send_json(self, obj, flags=0, callback=None):
+ """Send json-serialized version of an object.
+ See zmq.socket.send_json for details.
+ """
+ if json is None:
+ raise ImportError('cjson, json or simplejson library is required.')
+ else:
+ msg = json.dumps(obj)
+ return self.send(msg, flags=flags, callback=callback)
+ def send_pyobj(self, obj, flags=0, protocol=-1, callback=None):
+ """Send a Python object as a message using pickle to serialize.
+ See zmq.socket.send_json for details.
+ """
+ msg = pickle.dumps(obj, protocol)
+ return self.send(msg, flags, callback=callback)
def set_close_callback(self, callback):
"""Call the given callback when the stream is closed."""
self._close_callback = callback
def close(self):
"""Close this stream."""
if self.socket is not None:
- self.socket.close()
+ dc = ioloop.DelayedCallback(self.socket.close, 100, self.io_loop)
+ dc.start()
self.socket = None
if self._close_callback:
@@ -271,10 +311,13 @@ def _handle_send(self):
msg = self._send_queue.get()
- queue = self.socket.send_multipart(*msg)
+ try:
+ status = self.socket.send_multipart(*msg)
+ except zmq.ZMQError, e:
+ status = e
if self._send_callback:
callback = self._send_callback
- self._run_callback(callback, msg, queue)
+ self._run_callback(callback, msg, status)
# unregister from event loop:
if not self.sending():
Something went wrong with that request. Please try again.