
reflect revised apply_bound pattern

1 parent 62f8971 commit e5c376134e47f621dbb4bc79d0fccff1ef8f9cb2 @minrk committed Mar 19, 2011
IPython/utils/newserialized.py (4 changed lines)
@@ -102,7 +102,7 @@ def __init__(self, unSerialized):
self.typeDescriptor = 'ndarray'
self.metadata = {'shape':self.obj.shape,
'dtype':self.obj.dtype.str}
- elif isinstance(self.obj, bytes):
+ elif isinstance(self.obj, str):
self.typeDescriptor = 'bytes'
self.metadata = {}
elif isinstance(self.obj, buffer):
@@ -148,7 +148,7 @@ def getObject(self):
typeDescriptor = self.serialized.getTypeDescriptor()
if globals().has_key('numpy') and typeDescriptor == 'ndarray':
buf = self.serialized.getData()
- if isinstance(buf, (buffer,bytes)):
+ if isinstance(buf, (str, buffer)):
result = numpy.frombuffer(buf, dtype = self.serialized.metadata['dtype'])
else:
# memoryview
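
Under Python 2, `bytes` is just an alias for `str`, so this swap is mostly cosmetic; the point of the check is that `numpy.frombuffer` accepts both `str` and `buffer` objects. A minimal sketch of the guarded path, assuming numpy is available and Python 2 semantics:

    import numpy

    raw = numpy.arange(4, dtype='int64').tostring()   # a str in Python 2
    for buf in (raw, buffer(raw)):
        a = numpy.frombuffer(buf, dtype='int64')
        assert (a == numpy.arange(4)).all()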
IPython/utils/pickleutil.py (3 changed lines)
@@ -66,6 +66,7 @@ class CannedFunction(CannedObject):
def __init__(self, f):
self._checkType(f)
self.code = f.func_code
+ self.defaults = f.func_defaults
self.__name__ = f.__name__
def _checkType(self, obj):
@@ -74,7 +75,7 @@ def _checkType(self, obj):
def getObject(self, g=None):
if g is None:
g = globals()
- newFunc = FunctionType(self.code, g)
+ newFunc = FunctionType(self.code, g, self.__name__, self.defaults)
return newFunc
#-------------------------------------------------------------------------------
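
The extra `func_defaults` capture above matters because `FunctionType` drops default arguments unless they are passed back explicitly when the function is rebuilt. A minimal Python 2 sketch of just that reconstruction step (not the actual CannedFunction class):

    from types import FunctionType

    def echo(a=10):
        return a

    code, defaults = echo.func_code, echo.func_defaults

    bare = FunctionType(code, globals())                           # defaults lost
    full = FunctionType(code, globals(), echo.__name__, defaults)  # defaults preserved

    assert full() == 10
    try:
        bare()   # TypeError: echo() takes exactly 1 argument (0 given)
    except TypeError:
        pass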
IPython/zmq/parallel/client.py (44 changed lines)
@@ -45,30 +45,29 @@
# helpers for implementing old MEC API via client.apply
#--------------------------------------------------------------------------
-def _push(ns):
+def _push(user_ns, **ns):
"""helper method for implementing `client.push` via `client.apply`"""
- globals().update(ns)
+ user_ns.update(ns)
-def _pull(keys):
+def _pull(user_ns, keys):
"""helper method for implementing `client.pull` via `client.apply`"""
- g = globals()
if isinstance(keys, (list,tuple, set)):
for key in keys:
- if not g.has_key(key):
+ if not user_ns.has_key(key):
raise NameError("name '%s' is not defined"%key)
- return map(g.get, keys)
+ return map(user_ns.get, keys)
else:
- if not g.has_key(keys):
+ if not user_ns.has_key(keys):
raise NameError("name '%s' is not defined"%keys)
- return g.get(keys)
+ return user_ns.get(keys)
-def _clear():
+def _clear(user_ns):
"""helper method for implementing `client.clear` via `client.apply`"""
- globals().clear()
+ user_ns.clear()
-def _execute(code):
+def _execute(user_ns, code):
"""helper method for implementing `client.execute` via `client.apply`"""
- exec code in globals()
+ exec code in user_ns
#--------------------------------------------------------------------------
@@ -946,7 +945,7 @@ def _build_dependency(self, dep):
return list(Dependency(dep))
@defaultblock
- def apply(self, f, args=None, kwargs=None, bound=True, block=None,
+ def apply(self, f, args=None, kwargs=None, bound=False, block=None,
targets=None, balanced=None,
after=None, follow=None, timeout=None,
track=False):
@@ -963,9 +962,8 @@ def apply(self, f, args=None, kwargs=None, bound=True, block=None,
The positional arguments passed to `f`
kwargs : dict
The keyword arguments passed to `f`
- bound : bool (default: True)
- Whether to execute in the Engine(s) namespace, or in a clean
- namespace not affecting the engine.
+ bound : bool (default: False)
+ Whether to pass the Engine(s) Namespace as the first argument to `f`.
block : bool (default: self.block)
Whether to wait for the result, or return immediately.
False:
@@ -1171,12 +1169,12 @@ def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None,
#--------------------------------------------------------------------------
@defaultblock
- def remote(self, bound=True, block=None, targets=None, balanced=None):
+ def remote(self, bound=False, block=None, targets=None, balanced=None):
"""Decorator for making a RemoteFunction"""
return remote(self, bound=bound, targets=targets, block=block, balanced=balanced)
@defaultblock
- def parallel(self, dist='b', bound=True, block=None, targets=None, balanced=None):
+ def parallel(self, dist='b', bound=False, block=None, targets=None, balanced=None):
"""Decorator for making a ParallelFunction"""
return parallel(self, bound=bound, targets=targets, block=block, balanced=balanced)
@@ -1249,19 +1247,21 @@ def push(self, ns, targets='all', block=None, track=False):
"""Push the contents of `ns` into the namespace on `target`"""
if not isinstance(ns, dict):
raise TypeError("Must be a dict, not %s"%type(ns))
- result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False, track=track)
+ result = self.apply(_push, kwargs=ns, targets=targets, block=block, bound=True, balanced=False, track=track)
if not block:
return result
@defaultblock
def pull(self, keys, targets='all', block=None):
"""Pull objects from `target`'s namespace by `keys`"""
- if isinstance(keys, str):
+ if isinstance(keys, basestring):
pass
elif isinstance(keys, (list,tuple,set)):
for key in keys:
- if not isinstance(key, str):
- raise TypeError
+ if not isinstance(key, basestring):
+ raise TypeError("keys must be str, not type %r"%type(key))
+ else:
+ raise TypeError("keys must be strs, not %r"%keys)
result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
return result
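
The common thread in these helpers is the revised calling convention: instead of mutating `globals()` on the engine, each helper now receives the engine namespace explicitly as its first argument, which is what `bound=True` arranges. A rough standalone sketch of that convention, with a plain dict standing in for the engine namespace:

    def _push(user_ns, **ns):
        user_ns.update(ns)

    def _pull(user_ns, keys):
        if isinstance(keys, (list, tuple, set)):
            return map(user_ns.get, keys)
        return user_ns.get(keys)

    engine_ns = {}
    _push(engine_ns, a=1, b=2)           # what client.push({'a': 1, 'b': 2}) sends
    print _pull(engine_ns, ('a', 'b'))   # -> [1, 2]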
IPython/zmq/parallel/remotefunction.py (4 changed lines)
@@ -22,7 +22,7 @@
#-----------------------------------------------------------------------------
@testdec.skip_doctest
-def remote(client, bound=True, block=None, targets=None, balanced=None):
+def remote(client, bound=False, block=None, targets=None, balanced=None):
"""Turn a function into a remote function.
This method can be used for map:
@@ -37,7 +37,7 @@ def remote_function(f):
return remote_function
@testdec.skip_doctest
-def parallel(client, dist='b', bound=True, block=None, targets='all', balanced=None):
+def parallel(client, dist='b', bound=False, block=None, targets='all', balanced=None):
"""Turn a function into a parallel remote function.
This method can be used for map:
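
With `bound` now defaulting to False, decorated functions behave like plain remote calls unless the namespace is explicitly requested. A hedged usage sketch through the View decorator changed later in this commit (assumes a running cluster with the `iptest` profile used by the test suite):

    from IPython.zmq.parallel import client

    rc = client.Client(profile='iptest')
    v = rc[-1]

    @v.remote(block=True)
    def getpid():
        import os
        return os.getpid()

    getpid()   # executes on the view's engine(s) rather than locally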
IPython/zmq/parallel/streamkernel.py (45 changed lines)
@@ -34,7 +34,7 @@
from .error import wrap_exception
from .factory import SessionFactory
from .streamsession import StreamSession
-from .util import serialize_object, unpack_apply_message, ISO8601
+from .util import serialize_object, unpack_apply_message, ISO8601, Namespace
def printer(*args):
pprint(args, stream=sys.__stdout__)
@@ -305,35 +305,38 @@ def apply_request(self, stream, ident, parent):
sys.stdout.set_parent(parent)
sys.stderr.set_parent(parent)
# exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
- if bound:
- working = self.user_ns
- suffix = str(msg_id).replace("-","")
- prefix = "_"
-
- else:
- working = dict()
- suffix = prefix = "_" # prevent keyword collisions with lambda
+ working = self.user_ns
+ # suffix =
+ prefix = "_"+str(msg_id).replace("-","")+"_"
+ # if bound:
+ #
+ # else:
+ # working = dict()
+ # suffix = prefix = "_" # prevent keyword collisions with lambda
f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
+ if bound:
+ bound_ns = Namespace(working)
+ args = [bound_ns]+list(args)
# if f.fun
fname = getattr(f, '__name__', 'f')
- fname = prefix+fname.strip('<>')+suffix
- argname = prefix+"args"+suffix
- kwargname = prefix+"kwargs"+suffix
- resultname = prefix+"result"+suffix
+ fname = prefix+"f"
+ argname = prefix+"args"
+ kwargname = prefix+"kwargs"
+ resultname = prefix+"result"
- ns = { fname : f, argname : args, kwargname : kwargs }
+ ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
# print ns
working.update(ns)
code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
- exec code in working, working
- result = working.get(resultname)
- # clear the namespace
- if bound:
+ try:
+ exec code in working,working
+ result = working.get(resultname)
+ finally:
for key in ns.iterkeys():
- self.user_ns.pop(key)
- else:
- del working
+ working.pop(key)
+ if bound:
+ working.update(bound_ns)
packed_result,buf = serialize_object(result)
result_buf = [packed_result]+buf
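
The net effect of this hunk: `apply` always executes in the engine's namespace, temporaries are keyed by a msg_id-derived prefix and always cleaned up, and `bound=True` only adds the Namespace as a leading argument. A condensed, self-contained Python 2 sketch of that pattern (serialization and error propagation elided; `working` stands in for `self.user_ns`):

    from types import FunctionType

    working = {'b': 5}                               # engine namespace
    prefix = "_" + "abc123".replace("-", "") + "_"   # msg_id-derived prefix

    # the unpacked function gets `working` as its globals, as pickleutil arranges
    f = FunctionType((lambda x: b * x).func_code, working)

    fname, argname, kwargname, resultname = [prefix + s for s in
                                             ('f', 'args', 'kwargs', 'result')]
    ns = {fname: f, argname: (3,), kwargname: {}, resultname: None}
    working.update(ns)
    code = "%s=%s(*%s,**%s)" % (resultname, fname, argname, kwargname)
    try:
        exec code in working, working
        result = working.get(resultname)
    finally:
        for key in ns:                    # temporaries are always removed
            working.pop(key)

    print result                          # -> 15; only 'b' remains in working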
IPython/zmq/parallel/tests/__init__.py (10 changed lines)
@@ -4,8 +4,7 @@
import time
from subprocess import Popen, PIPE, STDOUT
-from IPython.zmq.parallel.ipcluster import launch_process
-from IPython.zmq.parallel.entry_point import select_random_ports
+from IPython.zmq.parallel import client
processes = []
blackhole = tempfile.TemporaryFile()
@@ -17,7 +16,10 @@ def setup():
processes.append(cp)
time.sleep(.5)
add_engine()
- time.sleep(2)
+ c = client.Client(profile='iptest')
+ while not c.ids:
+ time.sleep(.1)
+ c.spin()
def add_engine(profile='iptest'):
ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', '40'], stdout=blackhole, stderr=STDOUT)
@@ -42,5 +44,5 @@ def teardown():
print 'killing'
p.kill()
except:
- print "couldn't shutdown process: ",p
+ print "couldn't shutdown process: ", p
IPython/zmq/parallel/tests/clienttest.py (4 changed lines)
@@ -91,8 +91,8 @@ def setUp(self):
def tearDown(self):
self.client.close()
BaseZMQTestCase.tearDown(self)
- # [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ]
- # [ e.wait() for e in self.engines ]
+ # [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ]
+ # [ e.wait() for e in self.engines ]
# while len(self.client.ids) > self.base_engine_count:
# time.sleep(.1)
# del self.engines
IPython/zmq/parallel/tests/test_client.py (16 changed lines)
@@ -165,6 +165,17 @@ def geta():
v.execute('b=f()')
self.assertEquals(v['b'], 5)
+ def test_push_function_defaults(self):
+ """test that pushed functions preserve default args"""
+ def echo(a=10):
+ return a
+ self.add_engines(1)
+ v = self.client[-1]
+ v.block=True
+ v['f'] = echo
+ v.execute('b=f()')
+ self.assertEquals(v['b'], 10)
+
def test_get_result(self):
"""test getting results from the Hub."""
c = clientmod.Client(profile='iptest')
@@ -195,7 +206,7 @@ def test_run_newline(self):
""")
v = self.client[-1]
v.run(tmpfile, block=True)
- self.assertEquals(v.apply_sync_bound(lambda : g()), 5)
+ self.assertEquals(v.apply_sync(lambda : g()), 5)
def test_apply_tracked(self):
"""test tracking for apply"""
@@ -245,8 +256,7 @@ def test_remote_reference(self):
v = self.client[-1]
v['a'] = 123
ra = clientmod.Reference('a')
- b = v.apply_sync_bound(lambda x: x, ra)
+ b = v.apply_sync(lambda x: x, ra)
self.assertEquals(b, 123)
- self.assertRaisesRemote(NameError, v.apply_sync, lambda x: x, ra)
IPython/zmq/parallel/util.py (30 changed lines)
@@ -15,6 +15,23 @@
ISO8601="%Y-%m-%dT%H:%M:%S.%f"
+class Namespace(dict):
+ """Subclass of dict for attribute access to keys."""
+
+ def __getattr__(self, key):
+ """getattr aliased to getitem"""
+ if key in self.iterkeys():
+ return self[key]
+ else:
+ raise NameError(key)
+
+ def __setattr__(self, key, value):
+ """setattr aliased to setitem, with strict"""
+ if hasattr(dict, key):
+ raise KeyError("Cannot override dict keys %r"%key)
+ self[key] = value
+
+
class ReverseDict(dict):
"""simple double-keyed subset of dict methods."""
@@ -264,7 +281,18 @@ def unpack_apply_message(bufs, g=None, copy=True):
for k in sorted(skwargs.iterkeys()):
sa = skwargs[k]
if sa.data is None:
- sa.data = bufs.pop(0)
+ m = bufs.pop(0)
+ if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
+ if copy:
+ sa.data = buffer(m)
+ else:
+ sa.data = m.buffer
+ else:
+ if copy:
+ sa.data = m
+ else:
+ sa.data = m.bytes
+
kwargs[k] = uncan(unserialize(sa), g)
return f,args,kwargs
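
Bound calls now receive one of these `Namespace` objects wrapping the engine namespace, so remote code can read and write keys as attributes. A small usage sketch (the import path matches this commit's layout; the values are illustrative):

    from IPython.zmq.parallel.util import Namespace

    ns = Namespace({'b': 5})
    ns.a = ns.b * 2              # keys read and write as attributes
    print ns['a'], ns.a          # -> 10 10

    def scale(remote_ns, factor):
        """what a bound function sees: the engine namespace comes first"""
        remote_ns.b = remote_ns.b * factor
        return remote_ns.b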
IPython/zmq/parallel/view.py (50 changed lines)
@@ -74,14 +74,15 @@ class View(HasTraits):
"""
block=Bool(False)
bound=Bool(False)
+ track=Bool(False)
history=List()
outstanding = Set()
results = Dict()
client = Instance('IPython.zmq.parallel.client.Client')
_ntargets = Int(1)
_balanced = Bool(False)
- _default_names = List(['block', 'bound'])
+ _default_names = List(['block', 'bound', 'track'])
_targets = Any()
def __init__(self, client=None, targets=None):
@@ -139,7 +140,12 @@ def set_flags(self, **kwargs):
block : bool
whether to wait for results
bound : bool
- whether to use the client's namespace
+ whether to pass the client's Namespace as the first argument
+ to functions called via `apply`.
+ track : bool
+ whether to create a MessageTracker so the user can
+ know when it is safe to edit arrays and buffers again
+ after a non-copying send.
"""
for key in kwargs:
if key not in self._default_names:
@@ -161,10 +167,11 @@ def spin(self):
def apply(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) on remote engines, returning the result.
- This method does not involve the engine's namespace.
+ This method sets all of `client.apply`'s keyword arguments via this
+ View's attributes.
if self.block is False:
- returns msg_id
+ returns AsyncResult
else:
returns actual result of f(*args, **kwargs)
"""
@@ -174,9 +181,7 @@ def apply(self, f, *args, **kwargs):
def apply_async(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) on remote engines in a nonblocking manner.
- This method does not involve the engine's namespace.
-
- returns msg_id
+ returns AsyncResult
"""
d = self._defaults('block', 'bound')
return self.client.apply(f,args,kwargs, block=False, bound=False, **d)
@@ -187,11 +192,9 @@ def apply_sync(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) on remote engines in a blocking manner,
returning the result.
- This method does not involve the engine's namespace.
-
returns: actual result of f(*args, **kwargs)
"""
- d = self._defaults('block', 'bound')
+ d = self._defaults('block', 'bound', 'track')
return self.client.apply(f,args,kwargs, block=True, bound=False, **d)
# @sync_results
@@ -216,9 +219,9 @@ def apply_async_bound(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) bound to engine namespace(s)
in a nonblocking manner.
- returns: msg_id
+ The first argument to `f` will be the Engine's Namespace
- This method has access to the targets' namespace via globals()
+ returns: AsyncResult
"""
d = self._defaults('block', 'bound')
@@ -229,9 +232,9 @@ def apply_async_bound(self, f, *args, **kwargs):
def apply_sync_bound(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
- returns: actual result of f(*args, **kwargs)
+ The first argument to `f` will be the Engine's Namespace
- This method has access to the targets' namespace via globals()
+ returns: actual result of f(*args, **kwargs)
"""
d = self._defaults('block', 'bound')
@@ -323,11 +326,11 @@ def imap(self, f, *sequences, **kwargs):
# Decorators
#-------------------------------------------------------------------
- def remote(self, bound=True, block=True):
+ def remote(self, bound=False, block=True):
"""Decorator for making a RemoteFunction"""
return remote(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
- def parallel(self, dist='b', bound=True, block=None):
+ def parallel(self, dist='b', bound=False, block=None):
"""Decorator for making a ParallelFunction"""
block = self.block if block is None else block
return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
@@ -378,7 +381,7 @@ def map(self, f, *sequences, **kwargs):
block : bool
whether to wait for the result or not [default self.block]
bound : bool
- whether to have access to the engines' namespaces [default self.bound]
+ whether to pass the client's Namespace as the first argument to `f`
Returns
-------
@@ -572,7 +575,12 @@ def set_flags(self, **kwargs):
block : bool
whether to wait for results
bound : bool
- whether to use the engine's namespace
+ whether to pass the client's Namespace as the first argument
+ to functions called via `apply`.
+ track : bool
+ whether to create a MessageTracker so the user can
+ know when it is safe to edit arrays and buffers again
+ after a non-copying send.
follow : Dependency, list, msg_id, AsyncResult
the location dependencies of tasks
after : Dependency, list, msg_id, AsyncResult
@@ -621,7 +629,11 @@ def map(self, f, *sequences, **kwargs):
block : bool
whether to wait for the result or not [default self.block]
bound : bool
- whether to use the engine's namespace [default self.bound]
+ whether to pass the client's Namespace as the first argument to `f`
+ track : bool
+ whether to create a MessageTracker so the user can
+ know when it is safe to edit arrays and buffers again
+ after a non-copying send.
chunk_size : int
how many elements should be in each task [default 1]
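
Putting the revised flags together, a hedged end-to-end sketch against a running cluster (assumes the `iptest` profile used by the test suite; engine-side values are illustrative):

    from IPython.zmq.parallel import client

    rc = client.Client(profile='iptest')
    v = rc[-1]
    v.block = True

    v['b'] = 5                                    # push into the engine namespace

    # unbound (the new default): the engine namespace is the function's globals
    print v.apply_sync(lambda x: b + x, 10)       # -> 15

    # bound: the engine's Namespace is prepended as the first argument
    print v.apply_sync_bound(lambda ns, x: ns.b + x, 10)   # -> 15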
docs/examples/newparallel/interengine/interengine.py (6 changed lines)
@@ -10,12 +10,12 @@
view.execute('com = EngineCommunicator()')
# gather the connection information into a dict
-ar = view.apply_async_bound(lambda : com.info)
+ar = view.apply_async(lambda : com.info)
peers = ar.get_dict()
# this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators
# connect the engines to each other:
-view.apply_sync_bound(lambda pdict: com.connect(pdict), peers)
+view.apply_sync(lambda pdict: com.connect(pdict), peers)
# now all the engines are connected, and we can communicate between them:
@@ -34,7 +34,7 @@ def _send(targets, m_name):
msg = globals()[m_name]
return com.send(targets, msg)
- client[sender].apply_async_bound(_send, targets, msg_name)
+ client[sender].apply_async(_send, targets, msg_name)
return client[targets].execute('%s=com.recv()'%dest_name, block=None)
docs/examples/newparallel/parallelpi.py (2 changed lines)
@@ -42,7 +42,7 @@
# Run 10m digits on 1 engine
t1 = clock()
-freqs10m = c[id0].apply_sync_bound(compute_two_digit_freqs, files[0])
+freqs10m = c[id0].apply_sync(compute_two_digit_freqs, files[0])
t2 = clock()
digits_per_second1 = 10.0e6/(t2-t1)
print "Digits per second (1 core, 10m digits): ", digits_per_second1
docs/source/parallelz/parallel_multiengine.txt (47 changed lines)
@@ -68,7 +68,7 @@ constructed via list-access to the client:
.. seealso::
- For more information, see the in-depth explanation of :ref:`Views <parallel_view>`.
+ For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
Quick and easy parallelism
@@ -232,8 +232,8 @@ blocks until the engines are done executing the command:
In [5]: dview['b'] = 10
- In [6]: dview.apply_bound(lambda x: a+b+x, 27)
- Out[6]: [42, 42, 42, 42]%exit
+ In [6]: dview.apply_sync(lambda x: a+b+x, 27)
+ Out[6]: [42, 42, 42, 42]
Python commands can be executed on specific engines by calling execute using the ``targets``
keyword argument in :meth:`client.execute`, or creating a :class:`DirectView` instance by
@@ -265,39 +265,30 @@ Bound and unbound execution
The previous example also shows one of the most important things about the IPython
engines: they have a persistent user namespaces. The :meth:`apply` method can
-be run in either a bound or unbound manner:
+be run in either a bound or unbound manner.
+
+When applying a function in a `bound` manner, the first argument to that function
+will be the Engine's namespace, which is a :class:`Namespace` object, a dictionary
+also providing attribute-access to keys.
+
+In all (unbound and bound) execution, the engine's namespace serves as the ``globals()`` in which the function runs:
.. sourcecode:: ipython
In [9]: dview['b'] = 5 # assign b to 5 everywhere
In [10]: v0 = rc[0]
- In [12]: v0.apply_sync_bound(lambda : b)
- Out[12]: 5
+ # multiply b*2 inplace
+ In [12]: v0.apply_sync_bound(lambda ns: ns.update(b=ns.b*2))
- In [13]: v0.apply_sync(lambda : b)
- ---------------------------------------------------------------------------
- RemoteError Traceback (most recent call last)
- /home/you/<ipython-input-34-21a468eb10f0> in <module>()
- ----> 1 v0.apply(lambda : b)
- ...
- RemoteError: NameError(global name 'b' is not defined)
- Traceback (most recent call last):
- File "/Users/minrk/dev/ip/mine/IPython/zmq/parallel/streamkernel.py", line 294, in apply_request
- exec code in working, working
- File "<string>", line 1, in <module>
- File "<ipython-input-34-21a468eb10f0>", line 1, in <lambda>
- NameError: global name 'b' is not defined
-
-
-Specifically, `bound=True` specifies that the engine's namespace is to be used
-as the `globals` when the function is called, and `bound=False` specifies that
-the engine's namespace is not to be used (hence, 'b' is undefined during unbound
-execution, since the function is called in an empty namespace). Unbound execution is
-often useful for large numbers of atomic tasks, which prevents bloating the engine's
-memory, while bound execution lets you build on your previous work.
+ # b is still available in globals during unbound execution
+ In [13]: v0.apply_sync(lambda a: a*b, 3)
+ Out[13]: 30
+`bound=True` specifies that the engine's namespace is to be passed as the first argument when
+the function is called; the default `bound=False` skips that extra argument, but the engine's
+namespace is still available as the ``globals()`` when the function is called.
Non-blocking execution
----------------------
@@ -469,7 +460,7 @@ specifying the index of the result to be requested. It is simply a shortcut to t
.. sourcecode:: ipython
- In [29]: dv.apply_async_bound(lambda : ev)
+ In [29]: dv.apply_async(lambda : ev)
In [30]: %result
Out[30]: [ [ 1.28167017 0.14197338],
docs/source/parallelz/parallel_task.txt (15 changed lines)
@@ -298,6 +298,8 @@ The basic cases that are checked:
This analysis has not been proven to be rigorous, so it is likely possible for tasks
to become impossible to run in obscure situations, so a timeout may be a good choice.
+.. _parallel_schedulers:
+
Schedulers
==========
@@ -309,18 +311,25 @@ of a controller config object.
The built-in routing schemes:
+To select one of these schemes, simply do::
+
+    $ ipcontrollerz --scheme <schemename>
+
+for instance::
+
+    $ ipcontrollerz --scheme lru
+
lru: Least Recently Used
Always assign work to the least-recently-used engine. A close relative of
round-robin, it will be fair with respect to the number of tasks, agnostic
with respect to runtime of each task.
plainrandom: Plain Random
+
Randomly picks an engine on which to run.
twobin: Two-Bin Random
- **Depends on numpy**
+ **Requires numpy**
Pick two engines at random, and use the LRU of the two. This is known to be better
than plain random in many cases, but requires a small amount of computation.
@@ -333,7 +342,7 @@ leastload: Least Load
weighted: Weighted Two-Bin Random
- **Depends on numpy**
+ **Requires numpy**
Pick two engines at random using the number of outstanding tasks as inverse weights,
and use the one with the lower load.
@@ -360,7 +369,7 @@ Disabled features when using the ZMQ Scheduler:
allows graceful handling of Engines coming and going. There is no way to know
where ZeroMQ messages have gone, so there is no way to know what tasks are on which
engine until they *finish*. This makes recovery from engine shutdown very difficult.
-
+
.. note::
