Commit 8796581

Merge pull request #2069 from minrk/betterserial

start improving serialization in parallel code

* newserialized is now totally unused
* never double-pickle anything
* canning in pickleutil is extensible via can_map dict
* Session has configurable zero-copy threshold
fperez committed Jul 26, 2012
2 parents 884e2d5 + 3abd3ab commit 8796581
Showing 9 changed files with 434 additions and 177 deletions.
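
The "extensible via can_map dict" bullet refers to the module-level can_map and uncan_map dictionaries added to IPython/utils/pickleutil.py below: can() walks can_map and hands matching objects to the registered canner, and uncan() dispatches back through uncan_map. A minimal sketch of registering a custom canner against the API in this diff; MyType and CannedMyType are hypothetical names used only for illustration:

    from IPython.utils.pickleutil import CannedObject, can, uncan, can_map

    class MyType(object):
        """A toy user type we want to route through the canning layer."""
        def __init__(self, payload):
            self.payload = payload

    class CannedMyType(CannedObject):
        """Canner: capture only the payload, rebuild MyType when uncanned."""
        def __init__(self, obj):
            self.payload = obj.payload
            self.buffers = []          # no zero-copy buffers for this type

        def get_object(self, g=None):
            return MyType(self.payload)

    # registering the type routes can() to the custom canner; uncanning is
    # already covered by the CannedObject entry in uncan_map
    can_map[MyType] = CannedMyType

    canned = can(MyType(42))
    assert isinstance(canned, CannedMyType)
    assert uncan(canned).payload == 42
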
5 changes: 4 additions & 1 deletion IPython/parallel/client/client.py
@@ -1209,7 +1209,10 @@ def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, t
if not isinstance(metadata, dict):
raise TypeError("metadata must be dict, not %s"%type(metadata))

bufs = util.pack_apply_message(f,args,kwargs)
bufs = util.pack_apply_message(f, args, kwargs,
buffer_threshold=self.session.buffer_threshold,
item_threshold=self.session.item_threshold,
)

msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
metadata=metadata, track=track)
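
The two new keyword arguments are read from the session, which this commit gives configurable zero-copy thresholds (see the commit message above). A sketch of the same call made directly, assuming buffer_threshold and item_threshold are plain attributes on Session as the diff implies; the numeric values are illustrative, not IPython defaults:

    from IPython.zmq.session import Session
    from IPython.zmq.serialize import pack_apply_message, unpack_apply_message

    session = Session()
    # assumed trait names, taken from the client code above
    session.buffer_threshold = 64 * 1024   # buffers above this many bytes go zero-copy
    session.item_threshold = 64            # presumed cap on per-item canning of containers

    def echo(x):
        return x

    bufs = pack_apply_message(echo, (range(10),), {},
        buffer_threshold=session.buffer_threshold,
        item_threshold=session.item_threshold,
    )

    # inverse helper imported alongside pack_apply_message in util.py;
    # the (f, args, kwargs) return convention is assumed here
    f, args, kwargs = unpack_apply_message(bufs)
    assert f(*args, **kwargs) == range(10)
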
4 changes: 2 additions & 2 deletions IPython/parallel/tests/test_view.py
@@ -239,7 +239,7 @@ def test_scatter_gather_numpy(self):
from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
view = self.client[:]
a = numpy.arange(64)
view.scatter('a', a)
view.scatter('a', a, block=True)
b = view.gather('a', block=True)
assert_array_equal(b, a)

@@ -325,7 +325,7 @@ def test_map_iterable(self):
r = view.map_sync(lambda x:x, arr)
self.assertEqual(r, list(arr))

def test_scatterGatherNonblocking(self):
def test_scatter_gather_nonblocking(self):
data = range(16)
view = self.client[:]
view.scatter('a', data, block=False)
6 changes: 0 additions & 6 deletions IPython/parallel/util.py
@@ -43,17 +43,11 @@

# IPython imports
from IPython.config.application import Application
from IPython.utils import py3compat
from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
from IPython.utils.newserialized import serialize, unserialize
from IPython.zmq.log import EnginePUBHandler
from IPython.zmq.serialize import (
unserialize_object, serialize_object, pack_apply_message, unpack_apply_message
)

if py3compat.PY3:
buffer = memoryview

#-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
160 changes: 123 additions & 37 deletions IPython/utils/pickleutil.py
@@ -16,10 +16,28 @@
#-------------------------------------------------------------------------------

import copy
import logging
import sys
from types import FunctionType

try:
import cPickle as pickle
except ImportError:
import pickle

try:
import numpy
except:
numpy = None

import codeutil
import py3compat
from importstring import import_item

from IPython.config import Application

if py3compat.PY3:
buffer = memoryview

#-------------------------------------------------------------------------------
# Classes
@@ -32,45 +32,50 @@ def __init__(self, obj, keys=[]):
self.obj = copy.copy(obj)
for key in keys:
setattr(self.obj, key, can(getattr(obj, key)))

self.buffers = []


def getObject(self, g=None):
def get_object(self, g=None):
if g is None:
g = globals()
g = {}
for key in self.keys:
setattr(self.obj, key, uncan(getattr(self.obj, key), g))
return self.obj


class Reference(CannedObject):
"""object for wrapping a remote reference by name."""
def __init__(self, name):
if not isinstance(name, basestring):
raise TypeError("illegal name: %r"%name)
self.name = name
self.buffers = []

def __repr__(self):
return "<Reference: %r>"%self.name

def getObject(self, g=None):
def get_object(self, g=None):
if g is None:
g = globals()
g = {}

return eval(self.name, g)


class CannedFunction(CannedObject):

def __init__(self, f):
self._checkType(f)
self._check_type(f)
self.code = f.func_code
self.defaults = f.func_defaults
self.module = f.__module__ or '__main__'
self.__name__ = f.__name__
self.buffers = []

def _checkType(self, obj):
def _check_type(self, obj):
assert isinstance(obj, FunctionType), "Not a function type"

def getObject(self, g=None):
def get_object(self, g=None):
# try to load function back into its module:
if not self.module.startswith('__'):
try:
@@ -81,30 +103,73 @@ def getObject(self, g=None):
g = sys.modules[self.module].__dict__

if g is None:
g = globals()
g = {}
newFunc = FunctionType(self.code, g, self.__name__, self.defaults)
return newFunc


class CannedArray(CannedObject):
def __init__(self, obj):
self.shape = obj.shape
self.dtype = obj.dtype.descr if obj.dtype.fields else obj.dtype.str
if sum(obj.shape) == 0:
# just pickle it
self.buffers = [pickle.dumps(obj, -1)]
else:
# ensure contiguous
obj = numpy.ascontiguousarray(obj, dtype=None)
self.buffers = [buffer(obj)]

def get_object(self, g=None):
data = self.buffers[0]
if sum(self.shape) == 0:
# no shape, we just pickled it
return pickle.loads(data)
else:
return numpy.frombuffer(data, dtype=self.dtype).reshape(self.shape)


class CannedBytes(CannedObject):
wrap = bytes
def __init__(self, obj):
self.buffers = [obj]

def get_object(self, g=None):
data = self.buffers[0]
return self.wrap(data)

class CannedBuffer(CannedBytes):
wrap = buffer

#-------------------------------------------------------------------------------
# Functions
#-------------------------------------------------------------------------------

def can(obj):
# import here to prevent module-level circular imports
from IPython.parallel import dependent
if isinstance(obj, dependent):
keys = ('f','df')
return CannedObject(obj, keys=keys)
elif isinstance(obj, FunctionType):
return CannedFunction(obj)
elif isinstance(obj,dict):
return canDict(obj)
elif isinstance(obj, (list,tuple)):
return canSequence(obj)
def _error(*args, **kwargs):
if Application.initialized():
logger = Application.instance().log
else:
return obj
logger = logging.getLogger()
if not logger.handlers:
logging.basicConfig()
logger.error(*args, **kwargs)

def canDict(obj):
def can(obj):
"""prepare an object for pickling"""
for cls,canner in can_map.iteritems():
if isinstance(cls, basestring):
try:
cls = import_item(cls)
except Exception:
_error("cannning class not importable: %r", cls, exc_info=True)
cls = None
continue
if isinstance(obj, cls):
return canner(obj)
return obj

def can_dict(obj):
"""can the *values* of a dict"""
if isinstance(obj, dict):
newobj = {}
for k, v in obj.iteritems():
@@ -113,24 +178,29 @@ def canDict(obj):
else:
return obj

def canSequence(obj):
def can_sequence(obj):
"""can the elements of a sequence"""
if isinstance(obj, (list, tuple)):
t = type(obj)
return t([can(i) for i in obj])
else:
return obj

def uncan(obj, g=None):
if isinstance(obj, CannedObject):
return obj.getObject(g)
elif isinstance(obj,dict):
return uncanDict(obj, g)
elif isinstance(obj, (list,tuple)):
return uncanSequence(obj, g)
else:
return obj

def uncanDict(obj, g=None):
"""invert canning"""
for cls,uncanner in uncan_map.iteritems():
if isinstance(cls, basestring):
try:
cls = import_item(cls)
except Exception:
_error("uncanning class not importable: %r", cls, exc_info=True)
cls = None
continue
if isinstance(obj, cls):
return uncanner(obj, g)
return obj

def uncan_dict(obj, g=None):
if isinstance(obj, dict):
newobj = {}
for k, v in obj.iteritems():
@@ -139,13 +209,29 @@ def uncanDict(obj, g=None):
else:
return obj

def uncanSequence(obj, g=None):
def uncan_sequence(obj, g=None):
if isinstance(obj, (list, tuple)):
t = type(obj)
return t([uncan(i,g) for i in obj])
else:
return obj


def rebindFunctionGlobals(f, glbls):
return FunctionType(f.func_code, glbls)
#-------------------------------------------------------------------------------
# API dictionary
#-------------------------------------------------------------------------------

# These dicts can be extended for custom serialization of new objects

can_map = {
'IPython.parallel.dependent' : lambda obj: CannedObject(obj, keys=('f','df')),
'numpy.ndarray' : CannedArray,
FunctionType : CannedFunction,
bytes : CannedBytes,
buffer : CannedBuffer,
}

uncan_map = {
CannedObject : lambda obj, g: obj.get_object(g),
}
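
The CannedArray entry registered above is what carries the "never double-pickle" bullet over to numpy: a nonempty array is reduced to shape/dtype metadata plus one raw contiguous buffer, and rebuilt with numpy.frombuffer on the way back, so the data itself never passes through pickle. A small round-trip sketch against the API in this diff (requires numpy):

    import numpy
    from IPython.utils.pickleutil import can, uncan, CannedArray

    a = numpy.arange(10, dtype=numpy.float64)

    canned = can(a)                    # dispatched through can_map to CannedArray
    assert isinstance(canned, CannedArray)
    assert len(canned.buffers) == 1    # one raw buffer, no pickle of the data

    b = uncan(canned)                  # numpy.frombuffer(...).reshape(...)
    assert b.dtype == a.dtype and (b == a).all()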

7 changes: 5 additions & 2 deletions IPython/zmq/ipkernel.py
@@ -570,8 +570,11 @@ def apply_request(self, stream, ident, parent):
for key in ns.iterkeys():
working.pop(key)

packed_result,buf = serialize_object(result)
result_buf = [packed_result]+buf
result_buf = serialize_object(result,
buffer_threshold=self.session.buffer_threshold,
item_threshold=self.session.item_threshold,
)

except:
# invoke IPython traceback formatting
shell.showtraceback()
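
With the new signature, serialize_object returns the complete buffer list itself, so the old [packed_result] + buf concatenation goes away and the result is not pickled a second time. A sketch of the same pattern outside the kernel, with illustrative threshold values; the two-value return of unserialize_object (object plus any leftover buffers) is an assumption based on its use alongside serialize_object in this PR:

    from IPython.zmq.serialize import serialize_object, unserialize_object

    result = {'status': 'ok', 'data': range(1000)}

    result_buf = serialize_object(result,
        buffer_threshold=64 * 1024,    # bytes above this travel as raw buffers
        item_threshold=64,             # presumed cap on per-item canning of containers
    )

    obj, remaining = unserialize_object(result_buf)   # return convention assumed
    assert obj == result
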
(diffs for the remaining 4 changed files were not loaded)
