Skip to content

Commit

Permalink
Renaming unpack_message to unserialize and updating docstrings.
Browse files Browse the repository at this point in the history
  • Loading branch information
ellisonbg committed Jul 21, 2011
1 parent 1bc3aac commit efa1f33
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 31 deletions.
14 changes: 7 additions & 7 deletions IPython/parallel/controller/hub.py
Expand Up @@ -485,7 +485,7 @@ def dispatch_query(self, msg):
return
client_id = idents[0]
try:
msg = self.session.unpack_message(msg, content=True)
msg = self.session.unserialize(msg, content=True)
except Exception:
content = error.wrap_exception()
self.log.error("Bad Query Message: %r"%msg, exc_info=True)
Expand Down Expand Up @@ -550,7 +550,7 @@ def save_queue_request(self, idents, msg):
return
queue_id, client_id = idents[:2]
try:
msg = self.session.unpack_message(msg)
msg = self.session.unserialize(msg)
except Exception:
self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
return
Expand Down Expand Up @@ -597,7 +597,7 @@ def save_queue_result(self, idents, msg):

client_id, queue_id = idents[:2]
try:
msg = self.session.unpack_message(msg)
msg = self.session.unserialize(msg)
except Exception:
self.log.error("queue::engine %r sent invalid message to %r: %r"%(
queue_id,client_id, msg), exc_info=True)
Expand Down Expand Up @@ -647,7 +647,7 @@ def save_task_request(self, idents, msg):
client_id = idents[0]

try:
msg = self.session.unpack_message(msg)
msg = self.session.unserialize(msg)
except Exception:
self.log.error("task::client %r sent invalid task message: %r"%(
client_id, msg), exc_info=True)
Expand Down Expand Up @@ -697,7 +697,7 @@ def save_task_result(self, idents, msg):
"""save the result of a completed task."""
client_id = idents[0]
try:
msg = self.session.unpack_message(msg)
msg = self.session.unserialize(msg)
except Exception:
self.log.error("task::invalid task result message send to %r: %r"%(
client_id, msg), exc_info=True)
Expand Down Expand Up @@ -744,7 +744,7 @@ def save_task_result(self, idents, msg):

def save_task_destination(self, idents, msg):
try:
msg = self.session.unpack_message(msg, content=True)
msg = self.session.unserialize(msg, content=True)
except Exception:
self.log.error("task::invalid task tracking message", exc_info=True)
return
Expand Down Expand Up @@ -781,7 +781,7 @@ def save_iopub_message(self, topics, msg):
"""save an iopub message into the db"""
# print (topics)
try:
msg = self.session.unpack_message(msg, content=True)
msg = self.session.unserialize(msg, content=True)
except Exception:
self.log.error("iopub::invalid IOPub message", exc_info=True)
return
Expand Down
6 changes: 3 additions & 3 deletions IPython/parallel/controller/scheduler.py
Expand Up @@ -211,7 +211,7 @@ def dispatch_notification(self, msg):
self.log.warn("task::Invalid Message: %r",msg)
return
try:
msg = self.session.unpack_message(msg)
msg = self.session.unserialize(msg)
except ValueError:
self.log.warn("task::Unauthorized message from: %r"%idents)
return
Expand Down Expand Up @@ -307,7 +307,7 @@ def dispatch_submission(self, raw_msg):
self.notifier_stream.flush()
try:
idents, msg = self.session.feed_identities(raw_msg, copy=False)
msg = self.session.unpack_message(msg, content=False, copy=False)
msg = self.session.unserialize(msg, content=False, copy=False)
except Exception:
self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
return
Expand Down Expand Up @@ -515,7 +515,7 @@ def dispatch_result(self, raw_msg):
"""dispatch method for result replies"""
try:
idents,msg = self.session.feed_identities(raw_msg, copy=False)
msg = self.session.unpack_message(msg, content=False, copy=False)
msg = self.session.unserialize(msg, content=False, copy=False)
engine = idents[0]
try:
idx = self.targets.index(engine)
Expand Down
2 changes: 1 addition & 1 deletion IPython/parallel/engine/engine.py
Expand Up @@ -90,7 +90,7 @@ def complete_registration(self, msg):
loop = self.loop
identity = self.bident
idents,msg = self.session.feed_identities(msg)
msg = Message(self.session.unpack_message(msg))
msg = Message(self.session.unserialize(msg))

if msg.content.status == 'ok':
self.id = int(msg.content.id)
Expand Down
4 changes: 2 additions & 2 deletions IPython/parallel/engine/kernelstarter.py
Expand Up @@ -40,7 +40,7 @@ def start(self):
def dispatch_request(self, raw_msg):
idents, msg = self.session.feed_identities()
try:
msg = self.session.unpack_message(msg, content=False)
msg = self.session.unserialize(msg, content=False)
except:
print ("bad msg: %s"%msg)

Expand All @@ -54,7 +54,7 @@ def dispatch_request(self, raw_msg):
def dispatch_reply(self, raw_msg):
idents, msg = self.session.feed_identities()
try:
msg = self.session.unpack_message(msg, content=False)
msg = self.session.unserialize(msg, content=False)
except:
print ("bad msg: %s"%msg)

Expand Down
4 changes: 2 additions & 2 deletions IPython/parallel/engine/streamkernel.py
Expand Up @@ -195,7 +195,7 @@ def shutdown_request(self, stream, ident, parent):
def dispatch_control(self, msg):
idents,msg = self.session.feed_identities(msg, copy=False)
try:
msg = self.session.unpack_message(msg, content=True, copy=False)
msg = self.session.unserialize(msg, content=True, copy=False)
except:
self.log.error("Invalid Message", exc_info=True)
return
Expand Down Expand Up @@ -373,7 +373,7 @@ def dispatch_queue(self, stream, msg):
self.control_stream.flush()
idents,msg = self.session.feed_identities(msg, copy=False)
try:
msg = self.session.unpack_message(msg, content=True, copy=False)
msg = self.session.unserialize(msg, content=True, copy=False)
except:
self.log.error("Invalid Message", exc_info=True)
return
Expand Down
47 changes: 31 additions & 16 deletions IPython/zmq/session.py
Expand Up @@ -383,6 +383,10 @@ def sign(self, msg_list):
def serialize(self, msg, ident=None):
"""Serialize the message components to bytes.
This is roughly the inverse of unserialize. The serialize/unserialize
methods work with full message lists, whereas pack/unpack work with
the individual message parts in the message list.
Parameters
----------
msg : dict or Message
Expand Down Expand Up @@ -576,7 +580,7 @@ def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
# invalid large messages can cause very expensive string comparisons
idents, msg_list = self.feed_identities(msg_list, copy)
try:
return idents, self.unpack_message(msg_list, content=content, copy=copy)
return idents, self.unserialize(msg_list, content=content, copy=copy)
except Exception as e:
print (idents, msg_list)
# TODO: handle it
Expand All @@ -598,10 +602,12 @@ def feed_identities(self, msg_list, copy=True):
Returns
-------
(idents,msg_list) : two lists
idents will always be a list of bytes - the indentity prefix
msg_list will be a list of bytes or Messages, unchanged from input
msg_list should be unpackable via self.unpack_message at this point.
(idents, msg_list) : two lists
idents will always be a list of bytes, each of which is a ZMQ
identity. msg_list will be a list of bytes or zmq.Messages of the
form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
should be unpackable/unserializable via self.unserialize at this
point.
"""
if copy:
idx = msg_list.index(DELIM)
Expand All @@ -617,21 +623,30 @@ def feed_identities(self, msg_list, copy=True):
idents, msg_list = msg_list[:idx], msg_list[idx+1:]
return [m.bytes for m in idents], msg_list

def unpack_message(self, msg_list, content=True, copy=True):
"""Return a message object from the format
sent by self.send.
def unserialize(self, msg_list, content=True, copy=True):
"""Unserialize a msg_list to a nested message dict.
This is roughly the inverse of serialize. The serialize/unserialize
methods work with full message lists, whereas pack/unpack work with
the individual message parts in the message list.
Parameters:
-----------
msg_list : list of bytes or Message objects
The list of message parts of the form [HMAC,p_header,p_parent,
p_content,buffer1,buffer2,...].
content : bool (True)
whether to unpack the content dict (True),
or leave it serialized (False)
Whether to unpack the content dict (True), or leave it packed
(False).
copy : bool (True)
whether to return the bytes (True),
or the non-copying Message object in each place (False)
Whether to return the bytes (True), or the non-copying Message
object in each place (False).
Returns
-------
msg : dict
The nested message dict with top-level keys [header, parent_header,
content, buffers].
"""
minlen = 4
message = {}
Expand Down

0 comments on commit efa1f33

Please sign in to comment.