Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

always use Session.send #250

Merged
2 commits merged into from

2 participants

@minrk
Owner

This commit removes all explicit calls to socket.send/socket.recv, and replaces them with session.send/recv. This allows for later expansion of the send/recv protocols, and eases eventual migration to StreamSession object developed in newparallel.

Some adjustments were made to the Session to accomodate this:

  • Session.send can take a Message or dict to allow for multiple sending of the same message
  • Session.send returns a dict instead of wrapping it in a Message
  • Session.recv always returns a tuple of length two: (ident,msg), where ident is None if there was no identity prefix, and msg is None if EAGAIN was raised (indicating no message).

Everything else should function the same.

@minrk minrk all sends/recvs now via Session.send/recv.
This allows changes to protocols/patterns
to happen in a single location.
8c0b397
@ellisonbg
Owner

session.py

  • In send, rename msg_type to msg_or_type to emphasize it can be either.
  • Add docstrings to the send, recv and msg methods, that clarify what the methods take and return.
  • Remove the comment on L98.
  • Are you using the Message class in parallel? Should be get away from it? What do your recommend? I like that recv returns a raw dict. I think session should deal primarily with dicts at its core.

displayhook.py

  • Where was the parent being set before near L17? Was this a bug?

Other than that, this looks good. Why don't you fix these things and go
ahead and merge.

@minrk
Owner

session.py

  • docstrings updated, comment removed.
  • No, I approximately never use the Message class in parallel. I use it once or twice in the Engine (not kernel) and client for attr-access, but nowhere internal.

displayhook.py

The parent setting isn't different at all. Note that L18 is unchanged, and L17 doesn't close its parentheses.

@damianavila damianavila referenced this pull request from a commit
Commit has since been removed from the repository and is no longer available.
This issue was closed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jan 22, 2011
  1. @minrk

    all sends/recvs now via Session.send/recv.

    minrk authored
    This allows changes to protocols/patterns
    to happen in a single location.
  2. @minrk

    add docstrings, per review.

    minrk authored
This page is out of date. Refresh to see the latest.
View
3  IPython/zmq/completer.py
@@ -54,7 +54,8 @@ def request_completion(self, text):
# Give the kernel up to 0.5s to respond
for i in range(5):
- rep = self.session.recv(self.socket)
+ ident,rep = self.session.recv(self.socket)
+ rep = Message(rep)
if rep is not None and rep.msg_type == 'complete_reply':
matches = rep.content.matches
break
View
3  IPython/zmq/displayhook.py
@@ -14,9 +14,8 @@ def __call__(self, obj):
return
__builtin__._ = obj
- msg = self.session.msg(u'pyout', {u'data':repr(obj)},
+ msg = self.session.send(self.pub_socket, u'pyout', {u'data':repr(obj)},
parent=self.parent_header)
- self.pub_socket.send_json(msg)
def set_parent(self, parent):
self.parent_header = extract_header(parent)
View
13 IPython/zmq/frontend.py
@@ -92,10 +92,10 @@ def handle_output(self, omsg):
def recv_output(self):
while True:
- omsg = self.session.recv(self.sub_socket)
- if omsg is None:
+ ident,msg = self.session.recv(self.sub_socket)
+ if msg is None:
break
- self.handle_output(omsg)
+ self.handle_output(Message(msg))
def handle_reply(self, rep):
# Handle any side effects on output channels
@@ -114,9 +114,10 @@ def handle_reply(self, rep):
print >> sys.stderr, ab
def recv_reply(self):
- rep = self.session.recv(self.request_socket)
- self.handle_reply(rep)
- return rep
+ ident,rep = self.session.recv(self.request_socket)
+ mrep = Message(rep)
+ self.handle_reply(mrep)
+ return mrep
def runcode(self, code):
# We can't pickle code objects, so fetch the actual source
View
3  IPython/zmq/iostream.py
@@ -37,10 +37,9 @@ def flush(self):
data = self._buffer.getvalue()
if data:
content = {u'name':self.name, u'data':data}
- msg = self.session.msg(u'stream', content=content,
+ msg = self.session.send(self.pub_socket, u'stream', content=content,
parent=self.parent_header)
io.raw_print(msg)
- self.pub_socket.send_json(msg)
self._buffer.close()
self._new_buffer()
View
53 IPython/zmq/ipkernel.py
@@ -106,17 +106,14 @@ def __init__(self, **kwargs):
def do_one_iteration(self):
"""Do one iteration of the kernel's evaluation loop.
"""
- try:
- ident = self.reply_socket.recv(zmq.NOBLOCK)
- except zmq.ZMQError, e:
- if e.errno == zmq.EAGAIN:
- return
- else:
- raise
+ ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK)
+ if msg is None:
+ return
+
# This assert will raise in versions of zeromq 2.0.7 and lesser.
# We now require 2.0.8 or above, so we can uncomment for safety.
- assert self.reply_socket.rcvmore(), "Missing message part."
- msg = self.reply_socket.recv_json()
+ # print(ident,msg, file=sys.__stdout__)
+ assert ident is not None, "Missing message part."
# Print some info about this message and leave a '--->' marker, so it's
# easier to trace visually the message chain when debugging. Each
@@ -169,17 +166,15 @@ def record_ports(self, xrep_port, pub_port, req_port, hb_port):
def _publish_pyin(self, code, parent):
"""Publish the code request on the pyin stream."""
- pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
- self.pub_socket.send_json(pyin_msg)
+ pyin_msg = self.session.send(self.pub_socket, u'pyin',{u'code':code}, parent=parent)
def execute_request(self, ident, parent):
- status_msg = self.session.msg(
+ status_msg = self.session.send(self.pub_socket,
u'status',
{u'execution_state':u'busy'},
parent=parent
)
- self.pub_socket.send_json(status_msg)
try:
content = parent[u'content']
@@ -264,7 +259,7 @@ def execute_request(self, ident, parent):
shell.payload_manager.clear_payload()
# Send the reply.
- reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
+ reply_msg = self.session.send(self.reply_socket, u'execute_reply', reply_content, parent, ident=ident)
io.raw_print(reply_msg)
# Flush output before sending the reply.
@@ -276,17 +271,14 @@ def execute_request(self, ident, parent):
if self._execute_sleep:
time.sleep(self._execute_sleep)
- self.reply_socket.send(ident, zmq.SNDMORE)
- self.reply_socket.send_json(reply_msg)
if reply_msg['content']['status'] == u'error':
self._abort_queue()
- status_msg = self.session.msg(
+ status_msg = self.session.send(self.pub_socket,
u'status',
{u'execution_state':u'idle'},
parent=parent
)
- self.pub_socket.send_json(status_msg)
def complete_request(self, ident, parent):
txt, matches = self._complete(parent)
@@ -335,22 +327,18 @@ def shutdown_request(self, ident, parent):
def _abort_queue(self):
while True:
- try:
- ident = self.reply_socket.recv(zmq.NOBLOCK)
- except zmq.ZMQError, e:
- if e.errno == zmq.EAGAIN:
- break
+ ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK)
+ if msg is None:
+ break
else:
- assert self.reply_socket.rcvmore(), \
+ assert ident is not None, \
"Unexpected missing message part."
- msg = self.reply_socket.recv_json()
io.raw_print("Aborting:\n", Message(msg))
msg_type = msg['msg_type']
reply_type = msg_type.split('_')[0] + '_reply'
- reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
+ reply_msg = self.session.send(self.reply_socket, reply_type,
+ {'status' : 'aborted'}, msg, ident=ident)
io.raw_print(reply_msg)
- self.reply_socket.send(ident,zmq.SNDMORE)
- self.reply_socket.send_json(reply_msg)
# We need to wait a bit for requests to come in. This can probably
# be set shorter for true asynchronous clients.
time.sleep(0.1)
@@ -362,11 +350,10 @@ def _raw_input(self, prompt, ident, parent):
# Send the input request.
content = dict(prompt=prompt)
- msg = self.session.msg(u'input_request', content, parent)
- self.req_socket.send_json(msg)
+ msg = self.session.send(self.req_socket, u'input_request', content, parent)
# Await a response.
- reply = self.req_socket.recv_json()
+ ident, reply = self.session.recv(self.req_socket, 0)
try:
value = reply['content']['value']
except:
@@ -423,8 +410,8 @@ def _at_shutdown(self):
"""
# io.rprint("Kernel at_shutdown") # dbg
if self._shutdown_message is not None:
- self.reply_socket.send_json(self._shutdown_message)
- self.pub_socket.send_json(self._shutdown_message)
+ self.session.send(self.reply_socket, self._shutdown_message)
+ self.session.send(self.pub_socket, self._shutdown_message)
io.raw_print(self._shutdown_message)
# A very short sleep to give zmq time to flush its message buffers
# before Python truly shuts down.
View
18 IPython/zmq/kernelmanager.py
@@ -33,7 +33,7 @@
from IPython.utils import io
from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
-from session import Session
+from session import Session, Message
#-----------------------------------------------------------------------------
# Constants and exceptions
@@ -330,7 +330,7 @@ def _handle_events(self, socket, events):
self._handle_recv()
def _handle_recv(self):
- msg = self.socket.recv_json()
+ ident,msg = self.session.recv(self.socket, 0)
self.call_handlers(msg)
def _handle_send(self):
@@ -339,7 +339,7 @@ def _handle_send(self):
except Empty:
pass
else:
- self.socket.send_json(msg)
+ self.session.send(self.socket,msg)
if self.command_queue.empty():
self.drop_io_state(POLLOUT)
@@ -424,12 +424,14 @@ def _handle_recv(self):
# Get all of the messages we can
while True:
try:
- msg = self.socket.recv_json(zmq.NOBLOCK)
+ ident,msg = self.session.recv(self.socket)
except zmq.ZMQError:
# Check the errno?
# Will this trigger POLLERR?
break
else:
+ if msg is None:
+ break
self.call_handlers(msg)
def _flush(self):
@@ -486,7 +488,7 @@ def _handle_events(self, socket, events):
self._handle_recv()
def _handle_recv(self):
- msg = self.socket.recv_json()
+ ident,msg = self.session.recv(self.socket, 0)
self.call_handlers(msg)
def _handle_send(self):
@@ -495,7 +497,7 @@ def _handle_send(self):
except Empty:
pass
else:
- self.socket.send_json(msg)
+ self.session.send(self.socket,msg)
if self.msg_queue.empty():
self.drop_io_state(POLLOUT)
@@ -546,7 +548,7 @@ def run(self):
request_time = time.time()
try:
#io.rprint('Ping from HB channel') # dbg
- self.socket.send_json('ping')
+ self.socket.send(b'ping')
except zmq.ZMQError, e:
#io.rprint('*** HB Error:', e) # dbg
if e.errno == zmq.EFSM:
@@ -558,7 +560,7 @@ def run(self):
else:
while True:
try:
- self.socket.recv_json(zmq.NOBLOCK)
+ self.socket.recv(zmq.NOBLOCK)
except zmq.ZMQError, e:
#io.rprint('*** HB Error 2:', e) # dbg
if e.errno == zmq.EAGAIN:
View
29 IPython/zmq/pykernel.py
@@ -69,9 +69,8 @@ def start(self):
""" Start the kernel main loop.
"""
while True:
- ident = self.reply_socket.recv()
- assert self.reply_socket.rcvmore(), "Missing message part."
- msg = self.reply_socket.recv_json()
+ ident,msg = self.session.recv(self.reply_socket,0)
+ assert ident is not None, "Missing message part."
omsg = Message(msg)
print>>sys.__stdout__
print>>sys.__stdout__, omsg
@@ -105,8 +104,7 @@ def execute_request(self, ident, parent):
print>>sys.__stderr__, "Got bad msg: "
print>>sys.__stderr__, Message(parent)
return
- pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
- self.pub_socket.send_json(pyin_msg)
+ pyin_msg = self.session.send(self.pub_socket, u'pyin',{u'code':code}, parent=parent)
try:
comp_code = self.compiler(code, '<zmq-kernel>')
@@ -131,8 +129,7 @@ def execute_request(self, ident, parent):
u'ename' : unicode(etype.__name__),
u'evalue' : unicode(evalue)
}
- exc_msg = self.session.msg(u'pyerr', exc_content, parent)
- self.pub_socket.send_json(exc_msg)
+ exc_msg = self.session.send(self.pub_socket, u'pyerr', exc_content, parent)
reply_content = exc_content
else:
reply_content = { 'status' : 'ok', 'payload' : {} }
@@ -142,10 +139,8 @@ def execute_request(self, ident, parent):
sys.stdout.flush()
# Send the reply.
- reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
+ reply_msg = self.session.send(self.reply_socket, u'execute_reply', reply_content, parent, ident=ident)
print>>sys.__stdout__, Message(reply_msg)
- self.reply_socket.send(ident, zmq.SNDMORE)
- self.reply_socket.send_json(reply_msg)
if reply_msg['content']['status'] == u'error':
self._abort_queue()
@@ -180,21 +175,18 @@ def shutdown_request(self, ident, parent):
def _abort_queue(self):
while True:
try:
- ident = self.reply_socket.recv(zmq.NOBLOCK)
+ ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK)
except zmq.ZMQError, e:
if e.errno == zmq.EAGAIN:
break
else:
- assert self.reply_socket.rcvmore(), "Missing message part."
- msg = self.reply_socket.recv_json()
+ assert ident is not None, "Missing message part."
print>>sys.__stdout__, "Aborting:"
print>>sys.__stdout__, Message(msg)
msg_type = msg['msg_type']
reply_type = msg_type.split('_')[0] + '_reply'
- reply_msg = self.session.msg(reply_type, {'status':'aborted'}, msg)
+ reply_msg = self.session.send(self.reply_socket, reply_type, {'status':'aborted'}, msg, ident=ident)
print>>sys.__stdout__, Message(reply_msg)
- self.reply_socket.send(ident,zmq.SNDMORE)
- self.reply_socket.send_json(reply_msg)
# We need to wait a bit for requests to come in. This can probably
# be set shorter for true asynchronous clients.
time.sleep(0.1)
@@ -206,11 +198,10 @@ def _raw_input(self, prompt, ident, parent):
# Send the input request.
content = dict(prompt=prompt)
- msg = self.session.msg(u'input_request', content, parent)
- self.req_socket.send_json(msg)
+ msg = self.session.send(self.req_socket, u'input_request', content, parent)
# Await a response.
- reply = self.req_socket.recv_json()
+ ident,reply = self.session.recv(self.req_socket, 0)
try:
value = reply['content']['value']
except:
View
78 IPython/zmq/session.py
@@ -4,6 +4,8 @@
import zmq
+from zmq.utils import jsonapi as json
+
class Message(object):
"""A simple message object that maps dict keys to attributes.
@@ -78,6 +80,10 @@ def msg_header(self):
return h
def msg(self, msg_type, content=None, parent=None):
+ """Construct a standard-form message, with a given type, content, and parent.
+
+ NOT to be called directly.
+ """
msg = {}
msg['header'] = self.msg_header()
msg['parent_header'] = {} if parent is None else extract_header(parent)
@@ -85,25 +91,81 @@ def msg(self, msg_type, content=None, parent=None):
msg['content'] = {} if content is None else content
return msg
- def send(self, socket, msg_type, content=None, parent=None, ident=None):
- msg = self.msg(msg_type, content, parent)
+ def send(self, socket, msg_or_type, content=None, parent=None, ident=None):
+ """send a message via a socket, using a uniform message pattern.
+
+ Parameters
+ ----------
+ socket : zmq.Socket
+ The socket on which to send.
+ msg_or_type : Message/dict or str
+ if str : then a new message will be constructed from content,parent
+ if Message/dict : then content and parent are ignored, and the message
+ is sent. This is only for use when sending a Message for a second time.
+ content : dict, optional
+ The contents of the message
+ parent : dict, optional
+ The parent header, or parent message, of this message
+ ident : bytes, optional
+ The zmq.IDENTITY prefix of the destination.
+ Only for use on certain socket types.
+
+ Returns
+ -------
+ msg : dict
+ The message, as constructed by self.msg(msg_type,content,parent)
+ """
+ if isinstance(msg_type, (Message, dict)):
+ msg = dict(msg_type)
+ else:
+ msg = self.msg(msg_type, content, parent)
if ident is not None:
socket.send(ident, zmq.SNDMORE)
socket.send_json(msg)
- omsg = Message(msg)
- return omsg
-
+ return msg
+
def recv(self, socket, mode=zmq.NOBLOCK):
+ """recv a message on a socket.
+
+ Receive an optionally identity-prefixed message, as sent via session.send().
+
+ Parameters
+ ----------
+
+ socket : zmq.Socket
+ The socket on which to recv a message.
+ mode : int, optional
+ the mode flag passed to socket.recv
+ default: zmq.NOBLOCK
+
+ Returns
+ -------
+ (ident,msg) : tuple
+ always length 2. If no message received, then return is (None,None)
+ ident : bytes or None
+ the identity prefix is there was one, None otherwise.
+ msg : dict or None
+ The actual message. If mode==zmq.NOBLOCK and no message was waiting,
+ it will be None.
+ """
try:
- msg = socket.recv_json(mode)
+ msg = socket.recv_multipart(mode)
except zmq.ZMQError, e:
if e.errno == zmq.EAGAIN:
# We can convert EAGAIN to None as we know in this case
# recv_json won't return None.
- return None
+ return None,None
else:
raise
- return Message(msg)
+ if len(msg) == 1:
+ ident=None
+ msg = msg[0]
+ elif len(msg) == 2:
+ ident, msg = msg
+ else:
+ raise ValueError("Got message with length > 2, which is invalid")
+
+ return ident, json.loads(msg)
def test_msg2obj():
am = dict(x=1)
View
5 IPython/zmq/zmqshell.py
@@ -71,7 +71,7 @@ def write_result_repr(self, result_repr, extra_formats):
def finish_displayhook(self):
"""Finish up all displayhook activities."""
- self.pub_socket.send_json(self.msg)
+ self.session.send(self.pub_socket, self.msg)
self.msg = None
@@ -126,10 +126,9 @@ def _showtraceback(self, etype, evalue, stb):
}
dh = self.displayhook
- exc_msg = dh.session.msg(u'pyerr', exc_content, dh.parent_header)
# Send exception info over pub socket for other clients than the caller
# to pick up
- dh.pub_socket.send_json(exc_msg)
+ exc_msg = dh.session.send(dh.pub_socket, u'pyerr', exc_content, dh.parent_header)
# FIXME - Hack: store exception info in shell object. Right now, the
# caller is reading this info after the fact, we need to fix this logic
Something went wrong with that request. Please try again.