Skip to content

Commit

Permalink
move apply serialization into zmq.serialize
Browse files Browse the repository at this point in the history
  • Loading branch information
minrk committed May 13, 2012
1 parent d362026 commit ed57daf
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 135 deletions.
137 changes: 3 additions & 134 deletions IPython/parallel/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
from IPython.utils.newserialized import serialize, unserialize
from IPython.zmq.log import EnginePUBHandler
from IPython.zmq.serialize import (
unserialize_object, serialize_object, pack_apply_message, unpack_apply_message
)

if py3compat.PY3:
buffer = memoryview
Expand Down Expand Up @@ -222,140 +225,6 @@ def disambiguate_url(url, location=None):

return "%s://%s:%s"%(proto,ip,port)

def serialize_object(obj, threshold=64e-6):
"""Serialize an object into a list of sendable buffers.
Parameters
----------
obj : object
The object to be serialized
threshold : float
The threshold for not double-pickling the content.
Returns
-------
('pmd', [bufs]) :
where pmd is the pickled metadata wrapper,
bufs is a list of data buffers
"""
databuffers = []
if isinstance(obj, (list, tuple)):
clist = canSequence(obj)
slist = map(serialize, clist)
for s in slist:
if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
databuffers.append(s.getData())
s.data = None
return pickle.dumps(slist,-1), databuffers
elif isinstance(obj, dict):
sobj = {}
for k in sorted(obj.iterkeys()):
s = serialize(can(obj[k]))
if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
databuffers.append(s.getData())
s.data = None
sobj[k] = s
return pickle.dumps(sobj,-1),databuffers
else:
s = serialize(can(obj))
if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
databuffers.append(s.getData())
s.data = None
return pickle.dumps(s,-1),databuffers


def unserialize_object(bufs):
"""reconstruct an object serialized by serialize_object from data buffers."""
bufs = list(bufs)
sobj = pickle.loads(bufs.pop(0))
if isinstance(sobj, (list, tuple)):
for s in sobj:
if s.data is None:
s.data = bufs.pop(0)
return uncanSequence(map(unserialize, sobj)), bufs
elif isinstance(sobj, dict):
newobj = {}
for k in sorted(sobj.iterkeys()):
s = sobj[k]
if s.data is None:
s.data = bufs.pop(0)
newobj[k] = uncan(unserialize(s))
return newobj, bufs
else:
if sobj.data is None:
sobj.data = bufs.pop(0)
return uncan(unserialize(sobj)), bufs

def pack_apply_message(f, args, kwargs, threshold=64e-6):
"""pack up a function, args, and kwargs to be sent over the wire
as a series of buffers. Any object whose data is larger than `threshold`
will not have their data copied (currently only numpy arrays support zero-copy)"""
msg = [pickle.dumps(can(f),-1)]
databuffers = [] # for large objects
sargs, bufs = serialize_object(args,threshold)
msg.append(sargs)
databuffers.extend(bufs)
skwargs, bufs = serialize_object(kwargs,threshold)
msg.append(skwargs)
databuffers.extend(bufs)
msg.extend(databuffers)
return msg

def unpack_apply_message(bufs, g=None, copy=True):
"""unpack f,args,kwargs from buffers packed by pack_apply_message()
Returns: original f,args,kwargs"""
bufs = list(bufs) # allow us to pop
assert len(bufs) >= 3, "not enough buffers!"
if not copy:
for i in range(3):
bufs[i] = bufs[i].bytes
cf = pickle.loads(bufs.pop(0))
sargs = list(pickle.loads(bufs.pop(0)))
skwargs = dict(pickle.loads(bufs.pop(0)))
# print sargs, skwargs
f = uncan(cf, g)
for sa in sargs:
if sa.data is None:
m = bufs.pop(0)
if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
# always use a buffer, until memoryviews get sorted out
sa.data = buffer(m)
# disable memoryview support
# if copy:
# sa.data = buffer(m)
# else:
# sa.data = m.buffer
else:
if copy:
sa.data = m
else:
sa.data = m.bytes

args = uncanSequence(map(unserialize, sargs), g)
kwargs = {}
for k in sorted(skwargs.iterkeys()):
sa = skwargs[k]
if sa.data is None:
m = bufs.pop(0)
if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
# always use a buffer, until memoryviews get sorted out
sa.data = buffer(m)
# disable memoryview support
# if copy:
# sa.data = buffer(m)
# else:
# sa.data = m.buffer
else:
if copy:
sa.data = m
else:
sa.data = m.bytes

kwargs[k] = uncan(unserialize(sa), g)

return f,args,kwargs

#--------------------------------------------------------------------------
# helpers for implementing old MEC API via view.apply
Expand Down
6 changes: 5 additions & 1 deletion IPython/zmq/ipkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import time
import traceback
import logging
import uuid

from datetime import datetime
from signal import (
signal, default_int_handler, SIGINT, SIG_IGN
)
Expand All @@ -47,6 +50,7 @@

from entry_point import base_launch_kernel
from kernelapp import KernelApp, kernel_flags, kernel_aliases
from serialize import serialize_object, unpack_apply_message
from session import Session, Message
from zmqshell import ZMQInteractiveShell

Expand Down Expand Up @@ -540,7 +544,7 @@ def apply_request(self, socket, ident, parent):
exc_content = self._wrap_exception('apply')
# exc_msg = self.session.msg(u'pyerr', exc_content, parent)
self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent,
ident=asbytes('%s.pyerr'%self.prefix))
ident=py3compat.str_to_bytes('%s.pyerr'%self.prefix))
reply_content = exc_content
result_buf = []

Expand Down
179 changes: 179 additions & 0 deletions IPython/zmq/serialize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
"""serialization utilities for apply messages
Authors:
* Min RK
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

# Standard library imports
import logging
import os
import re
import socket
import sys

try:
import cPickle
pickle = cPickle
except:
cPickle = None
import pickle


# IPython imports
from IPython.utils import py3compat
from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
from IPython.utils.newserialized import serialize, unserialize

if py3compat.PY3:
buffer = memoryview

#-----------------------------------------------------------------------------
# Serialization Functions
#-----------------------------------------------------------------------------

def serialize_object(obj, threshold=64e-6):
"""Serialize an object into a list of sendable buffers.
Parameters
----------
obj : object
The object to be serialized
threshold : float
The threshold for not double-pickling the content.
Returns
-------
('pmd', [bufs]) :
where pmd is the pickled metadata wrapper,
bufs is a list of data buffers
"""
databuffers = []
if isinstance(obj, (list, tuple)):
clist = canSequence(obj)
slist = map(serialize, clist)
for s in slist:
if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
databuffers.append(s.getData())
s.data = None
return pickle.dumps(slist,-1), databuffers
elif isinstance(obj, dict):
sobj = {}
for k in sorted(obj.iterkeys()):
s = serialize(can(obj[k]))
if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
databuffers.append(s.getData())
s.data = None
sobj[k] = s
return pickle.dumps(sobj,-1),databuffers
else:
s = serialize(can(obj))
if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
databuffers.append(s.getData())
s.data = None
return pickle.dumps(s,-1),databuffers


def unserialize_object(bufs):
"""reconstruct an object serialized by serialize_object from data buffers."""
bufs = list(bufs)
sobj = pickle.loads(bufs.pop(0))
if isinstance(sobj, (list, tuple)):
for s in sobj:
if s.data is None:
s.data = bufs.pop(0)
return uncanSequence(map(unserialize, sobj)), bufs
elif isinstance(sobj, dict):
newobj = {}
for k in sorted(sobj.iterkeys()):
s = sobj[k]
if s.data is None:
s.data = bufs.pop(0)
newobj[k] = uncan(unserialize(s))
return newobj, bufs
else:
if sobj.data is None:
sobj.data = bufs.pop(0)
return uncan(unserialize(sobj)), bufs

def pack_apply_message(f, args, kwargs, threshold=64e-6):
"""pack up a function, args, and kwargs to be sent over the wire
as a series of buffers. Any object whose data is larger than `threshold`
will not have their data copied (currently only numpy arrays support zero-copy)"""
msg = [pickle.dumps(can(f),-1)]
databuffers = [] # for large objects
sargs, bufs = serialize_object(args,threshold)
msg.append(sargs)
databuffers.extend(bufs)
skwargs, bufs = serialize_object(kwargs,threshold)
msg.append(skwargs)
databuffers.extend(bufs)
msg.extend(databuffers)
return msg

def unpack_apply_message(bufs, g=None, copy=True):
"""unpack f,args,kwargs from buffers packed by pack_apply_message()
Returns: original f,args,kwargs"""
bufs = list(bufs) # allow us to pop
assert len(bufs) >= 3, "not enough buffers!"
if not copy:
for i in range(3):
bufs[i] = bufs[i].bytes
cf = pickle.loads(bufs.pop(0))
sargs = list(pickle.loads(bufs.pop(0)))
skwargs = dict(pickle.loads(bufs.pop(0)))
# print sargs, skwargs
f = uncan(cf, g)
for sa in sargs:
if sa.data is None:
m = bufs.pop(0)
if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
# always use a buffer, until memoryviews get sorted out
sa.data = buffer(m)
# disable memoryview support
# if copy:
# sa.data = buffer(m)
# else:
# sa.data = m.buffer
else:
if copy:
sa.data = m
else:
sa.data = m.bytes

args = uncanSequence(map(unserialize, sargs), g)
kwargs = {}
for k in sorted(skwargs.iterkeys()):
sa = skwargs[k]
if sa.data is None:
m = bufs.pop(0)
if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
# always use a buffer, until memoryviews get sorted out
sa.data = buffer(m)
# disable memoryview support
# if copy:
# sa.data = buffer(m)
# else:
# sa.data = m.buffer
else:
if copy:
sa.data = m
else:
sa.data = m.bytes

kwargs[k] = uncan(unserialize(sa), g)

return f,args,kwargs

0 comments on commit ed57daf

Please sign in to comment.