Skip to content

Commit

Permalink
Merge pull request #2246 from minrk/canning_depth
Browse files Browse the repository at this point in the history
serialize individual args/kwargs rather than the containers

This allows specials to be handled inside containers as args/kwargs

This is less efficient than before (though still more efficient than 0.13) when there is a very large number of simple arguments. In practice the number of arguments is usually modest, and small containers of arrays/References are a likely case (most readily demonstrated by their use in map).

The relevant behaviors are tested.

closes #2239
  • Loading branch information
minrk committed Aug 9, 2012
2 parents 346cac5 + 2fc7d23 commit 75c66c8
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 22 deletions.
48 changes: 48 additions & 0 deletions IPython/parallel/tests/test_view.py
Expand Up @@ -627,4 +627,52 @@ def test_data_pub(self):
self.assertTrue(all(isinstance(d, dict) for d in ar.data))
ar.get(5)
self.assertEqual(ar.data, [dict(i=4)] * len(ar))

def test_can_list_arg(self):
    """a Reference inside a list passed as a positional arg is canned"""
    last_view = self.client[-1]
    last_view['a'] = 128
    ref_a = pmod.Reference('a')
    # identity function: the result is whatever the engine received for x
    result = last_view.apply_async(lambda x: x, [ref_a]).get(5)
    self.assertEqual(result, [128])

def test_can_dict_arg(self):
    """a Reference stored as a dict value in a positional arg is canned"""
    last_view = self.client[-1]
    last_view['a'] = 128
    ref_a = pmod.Reference('a')
    # identity function: the result is whatever the engine received for x
    result = last_view.apply_async(lambda x: x, dict(foo=ref_a)).get(5)
    self.assertEqual(result, dict(foo=128))

def test_can_list_kwarg(self):
    """a Reference inside a list passed as a keyword arg is canned"""
    last_view = self.client[-1]
    last_view['a'] = 128
    ref_a = pmod.Reference('a')
    # x defaults to 5, so getting [128] back shows the kwarg was delivered
    result = last_view.apply_async(lambda x=5: x, x=[ref_a]).get(5)
    self.assertEqual(result, [128])

def test_can_dict_kwarg(self):
    """kwargs in dicts are canned"""
    view = self.client[-1]
    view['a'] = 128
    rA = pmod.Reference('a')
    # Pass the dict by keyword (x=...): passing it positionally would only
    # repeat test_can_dict_arg and leave the kwarg path untested.
    ar = view.apply_async(lambda x=5: x, x=dict(foo=rA))
    r = ar.get(5)
    self.assertEqual(r, dict(foo=128))

def test_map_ref(self):
    """view.map works with references"""
    all_view = self.client[:]
    engine_ids = sorted(self.client.ids)
    # distribute the sorted ids under the name 'rank' (flatten=True)
    all_view.scatter('rank', engine_ids, flatten=True)
    rank_ref = pmod.Reference('rank')

    # one Reference per element of the view
    refs = [rank_ref] * len(all_view)
    doubled = all_view.map_async(lambda x: x*2, refs).get(5)
    expected = [engine_id*2 for engine_id in engine_ids]
    self.assertEqual(doubled, expected)


68 changes: 46 additions & 22 deletions IPython/zmq/serialize.py
Expand Up @@ -32,6 +32,7 @@

# IPython imports
from IPython.utils import py3compat
from IPython.utils.data import flatten
from IPython.utils.pickleutil import (
can, uncan, can_sequence, uncan_sequence, CannedObject
)
Expand Down Expand Up @@ -123,7 +124,11 @@ def unserialize_object(buffers, g=None):
(newobj, bufs) : unpacked object, and the list of remaining unused buffers.
"""
bufs = list(buffers)
canned = pickle.loads(bufs.pop(0))
pobj = bufs.pop(0)
if not isinstance(pobj, bytes):
# a zmq message
pobj = bytes(pobj)
canned = pickle.loads(pobj)
if isinstance(canned, (list, tuple)) and len(canned) < MAX_ITEMS:
for c in canned:
_restore_buffers(c, bufs)
Expand All @@ -143,38 +148,57 @@ def unserialize_object(buffers, g=None):
def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
    """pack up a function, args, and kwargs to be sent over the wire

    Each element of args/kwargs will be canned for special treatment,
    but inspection will not go any deeper than that.

    Any object whose data is larger than `threshold` will not have their data copied
    (only numpy arrays and bytes/buffers support zero-copy)

    Message will be a list of bytes/buffers of the format:

    [ cf, pinfo, <arg_bufs>, <kwarg_bufs> ]

    With length at least two + len(args) + len(kwargs)

    Parameters
    ----------
    f : the function to send; it is canned and pickled as the first buffer.
    args : sequence of positional arguments, each serialized separately.
    kwargs : dict of keyword arguments, each value serialized separately.
    buffer_threshold, item_threshold : passed through unchanged to
        serialize_object() for every element.

    Returns
    -------
    msg : list of buffers in the format described above.
    """
    # Serialize each argument on its own, not the args tuple as a whole,
    # so special objects inside an argument are handled by serialize_object.
    arg_bufs = flatten(serialize_object(arg, buffer_threshold, item_threshold) for arg in args)

    # Keys travel in the info header; sorting fixes the order in which the
    # value buffers are emitted so the receiver can pair them back up.
    kw_keys = sorted(kwargs.keys())
    kwarg_bufs = flatten(serialize_object(kwargs[key], buffer_threshold, item_threshold) for key in kw_keys)

    # narg_bufs tells the receiver where arg buffers end and kwarg buffers begin.
    info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys)

    msg = [pickle.dumps(can(f),-1)]
    msg.append(pickle.dumps(info, -1))
    msg.extend(arg_bufs)
    msg.extend(kwarg_bufs)

    return msg

def unpack_apply_message(bufs, g=None, copy=True):
    """unpack f,args,kwargs from buffers packed by pack_apply_message()

    Expects the layout [ cf, pinfo, <arg_bufs>, <kwarg_bufs> ], where pinfo
    is a pickled dict with the keys 'nargs', 'narg_bufs' and 'kw_keys'.

    Parameters
    ----------
    bufs : iterable of buffers; it is copied into a list, so the caller's
        sequence is not modified.
    g : namespace passed to uncan()/unserialize_object().
    copy : if False, the first two buffers are replaced by their `.bytes`
        attribute before unpickling (presumably zmq message frames -- confirm
        against the caller).

    Returns: original f,args,kwargs

    Raises AssertionError if fewer than two buffers are given, or if any
    arg/kwarg buffers are left unconsumed.
    """
    # NOTE: pickle.loads executes arbitrary code; bufs must come from a
    # trusted peer.
    bufs = list(bufs) # allow us to pop
    assert len(bufs) >= 2, "not enough buffers!"
    if not copy:
        # only the two header buffers (function, info) are unpickled here
        for i in range(2):
            bufs[i] = bufs[i].bytes
    f = uncan(pickle.loads(bufs.pop(0)), g)
    info = pickle.loads(bufs.pop(0))
    # everything after the header is arg buffers followed by kwarg buffers
    narg_bufs = info['narg_bufs']
    arg_bufs, kwarg_bufs = bufs[:narg_bufs], bufs[narg_bufs:]

    args = []
    for _ in range(info['nargs']):
        # each call consumes one argument's buffers and returns the remainder
        arg, arg_bufs = unserialize_object(arg_bufs, g)
        args.append(arg)
    args = tuple(args)
    assert not arg_bufs, "Shouldn't be any arg bufs left over"

    kwargs = {}
    # kw_keys is in the same order the value buffers were packed
    for key in info['kw_keys']:
        kwarg, kwarg_bufs = unserialize_object(kwarg_bufs, g)
        kwargs[key] = kwarg
    assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"

    return f,args,kwargs

0 comments on commit 75c66c8

Please sign in to comment.