Browse files

split pendingresult and remotefunction into own files, add view.map.

  • Loading branch information...
1 parent 15b7567 commit 44e6eb1e9892a8c5f4e4f50fa31bc02d384df32d @minrk minrk committed Jan 24, 2011
View
172 IPython/zmq/parallel/client.py
@@ -29,6 +29,8 @@
from dependency import Dependency, depend, require
import error
import map as Map
+from pendingresult import PendingResult,PendingMapResult
+from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
#--------------------------------------------------------------------------
# helpers for implementing old MEC API via client.apply
@@ -81,167 +83,6 @@ def defaultblock(f, self, *args, **kwargs):
self.block = saveblock
return ret
-def remote(client, bound=False, block=None, targets=None):
- """Turn a function into a remote function.
-
- This method can be used for map:
-
- >>> @remote(client,block=True)
- def func(a)
- """
- def remote_function(f):
- return RemoteFunction(client, f, bound, block, targets)
- return remote_function
-
-def parallel(client, dist='b', bound=False, block=None, targets='all'):
- """Turn a function into a parallel remote function.
-
- This method can be used for map:
-
- >>> @parallel(client,block=True)
- def func(a)
- """
- def parallel_function(f):
- return ParallelFunction(client, f, dist, bound, block, targets)
- return parallel_function
-
-#--------------------------------------------------------------------------
-# Classes
-#--------------------------------------------------------------------------
-
-class RemoteFunction(object):
- """Turn an existing function into a remote function.
-
- Parameters
- ----------
-
- client : Client instance
- The client to be used to connect to engines
- f : callable
- The function to be wrapped into a remote function
- bound : bool [default: False]
- Whether the affect the remote namespace when called
- block : bool [default: None]
- Whether to wait for results or not. The default behavior is
- to use the current `block` attribute of `client`
- targets : valid target list [default: all]
- The targets on which to execute.
- """
-
- client = None # the remote connection
- func = None # the wrapped function
- block = None # whether to block
- bound = None # whether to affect the namespace
- targets = None # where to execute
-
- def __init__(self, client, f, bound=False, block=None, targets=None):
- self.client = client
- self.func = f
- self.block=block
- self.bound=bound
- self.targets=targets
-
- def __call__(self, *args, **kwargs):
- return self.client.apply(self.func, args=args, kwargs=kwargs,
- block=self.block, targets=self.targets, bound=self.bound)
-
-
-class ParallelFunction(RemoteFunction):
- """Class for mapping a function to sequences."""
- def __init__(self, client, f, dist='b', bound=False, block=None, targets='all'):
- super(ParallelFunction, self).__init__(client,f,bound,block,targets)
- mapClass = Map.dists[dist]
- self.mapObject = mapClass()
-
- def __call__(self, *sequences):
- len_0 = len(sequences[0])
- for s in sequences:
- if len(s)!=len_0:
- raise ValueError('all sequences must have equal length')
-
- if self.targets is None:
- # load-balanced:
- engines = [None]*len_0
- else:
- # multiplexed:
- engines = self.client._build_targets(self.targets)[-1]
-
- nparts = len(engines)
- msg_ids = []
- for index, engineid in enumerate(engines):
- args = []
- for seq in sequences:
- args.append(self.mapObject.getPartition(seq, index, nparts))
- mid = self.client.apply(self.func, args=args, block=False,
- bound=self.bound,
- targets=engineid)
- msg_ids.append(mid)
-
- if self.block:
- dg = PendingMapResult(self.client, msg_ids, self.mapObject)
- dg.wait()
- return dg.result
- else:
- return dg
-
-
-class PendingResult(object):
- """Class for representing results of non-blocking calls."""
- def __init__(self, client, msg_ids):
- self.client = client
- self.msg_ids = msg_ids
- self._result = None
- self.done = False
-
- def __repr__(self):
- if self.done:
- return "<%s: finished>"%(self.__class__.__name__)
- else:
- return "<%s: %r>"%(self.__class__.__name__,self.msg_ids)
-
- @property
- def result(self):
- if self._result is not None:
- return self._result
- if not self.done:
- self.wait(0)
- if self.done:
- results = map(self.client.results.get, self.msg_ids)
- results = error.collect_exceptions(results, 'get_result')
- self._result = self.reconstruct_result(results)
- return self._result
- else:
- raise error.ResultNotCompleted
-
- def reconstruct_result(self, res):
- """
- Override me in subclasses for turning a list of results
- into the expected form.
- """
- if len(res) == 1:
- return res[0]
- else:
- return res
-
- def wait(self, timout=-1):
- self.done = self.client.barrier(self.msg_ids)
- return self.done
-
-class PendingMapResult(PendingResult):
- """Class for representing results of non-blocking gathers.
-
- This will properly reconstruct the gather.
- """
-
- def __init__(self, client, msg_ids, mapObject):
- self.mapObject = mapObject
- PendingResult.__init__(self, client, msg_ids)
-
- def reconstruct_result(self, res):
- """Perform the gather on the actual results."""
- return self.mapObject.joinPartitions(res)
-
-
class AbortedTask(object):
"""A basic wrapper object describing an aborted task."""
@@ -944,10 +785,11 @@ def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
result[target] = self.results[mid]
return error.collect_exceptions(result, f.__name__)
- @defaultblock
- def map(self, f, sequences, targets=None, block=None, bound=False):
- pf = ParallelFunction(self,f,block=block,bound=bound,targets=targets)
- return pf(*sequences)
def map(self, f, *sequences):
    """Parallel version of builtin `map`, using all our engines.

    Wraps `f` in a `ParallelFunction` bound to this client (using the
    client's current `block` setting, ``bound=True`` and
    ``targets='all'``) and returns the result of its `map` method
    applied to `sequences`.
    """
    parallel_f = ParallelFunction(self, f, bound=True, block=self.block,
                                  targets='all')
    return parallel_f.map(*sequences)
#--------------------------------------------------------------------------
# Data movement
View
75 IPython/zmq/parallel/pendingresult.py
@@ -0,0 +1,75 @@
+"""PendingResult objects for the client"""
+#-----------------------------------------------------------------------------
+# Copyright (C) 2010 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+
+import error
+
+#-----------------------------------------------------------------------------
+# Classes
+#-----------------------------------------------------------------------------
+
class PendingResult(object):
    """Class for representing results of non-blocking calls.

    Parameters
    ----------

    client : Client instance
        The client that submitted the requests.  Its `barrier` method is
        used to wait for them and its `results` mapping to fetch them.
    msg_ids : list
        The ids of the messages whose results this object stands for.
    """
    def __init__(self, client, msg_ids):
        self.client = client
        self.msg_ids = msg_ids
        self._result = None     # cache of the reconstructed result
        self.done = False       # last value returned by client.barrier

    def __repr__(self):
        if self.done:
            return "<%s: finished>"%(self.__class__.__name__)
        else:
            return "<%s: %r>"%(self.__class__.__name__,self.msg_ids)

    @property
    def result(self):
        """The reconstructed result of the call(s).

        Raises `error.ResultNotCompleted` if the results are not done
        after a zero-timeout `wait`.
        """
        if self._result is not None:
            return self._result
        if not self.done:
            # zero timeout: this is meant as a poll, not a blocking wait
            self.wait(0)
        if self.done:
            # build a real list: reconstruct_result calls len() on it
            results = [self.client.results.get(mid) for mid in self.msg_ids]
            results = error.collect_exceptions(results, 'get_result')
            self._result = self.reconstruct_result(results)
            return self._result
        else:
            raise error.ResultNotCompleted

    def reconstruct_result(self, res):
        """
        Override me in subclasses for turning a list of results
        into the expected form.

        The default unwraps a single-element list and returns any other
        list unchanged.
        """
        if len(res) == 1:
            return res[0]
        else:
            return res

    def wait(self, timout=-1):
        """Wait on our messages via `client.barrier` and record the outcome.

        `timout` (spelling kept so existing keyword callers keep working)
        is forwarded to `client.barrier`; previously it was dropped, so
        the zero-timeout poll in `result` could not take effect.

        Returns the value reported by `client.barrier`, which is also
        stored in `self.done`.
        """
        # NOTE(review): assumes Client.barrier takes the timeout as its
        # second positional argument -- confirm against client.py.
        self.done = self.client.barrier(self.msg_ids, timout)
        return self.done
+
class PendingMapResult(PendingResult):
    """Pending result of a non-blocking map/gather.

    The partial results are recombined with the `joinPartitions` method
    of the map object that was used to split the input.
    """

    def __init__(self, client, msg_ids, mapObject):
        # store the map object first, then run the base initialisation
        self.mapObject = mapObject
        super(PendingMapResult, self).__init__(client, msg_ids)

    def reconstruct_result(self, res):
        """Join the per-partition results into a single result."""
        joined = self.mapObject.joinPartitions(res)
        return joined
+
+
View
145 IPython/zmq/parallel/remotefunction.py
@@ -0,0 +1,145 @@
+"""Remote Functions and decorators for the client."""
+#-----------------------------------------------------------------------------
+# Copyright (C) 2010 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+
+import map as Map
+from pendingresult import PendingMapResult
+
+#-----------------------------------------------------------------------------
+# Decorators
+#-----------------------------------------------------------------------------
+
def remote(client, bound=False, block=None, targets=None):
    """Turn a function into a remote function.

    Returns a decorator that wraps the decorated function in a
    `RemoteFunction` built with the given client and options:

    >>> @remote(client, block=True)
    ... def func(a):
    ...     pass
    """
    def remote_function(f):
        wrapped = RemoteFunction(client, f, bound, block, targets)
        return wrapped
    return remote_function
+
def parallel(client, dist='b', bound=False, block=None, targets='all'):
    """Turn a function into a parallel remote function.

    Returns a decorator that wraps the decorated function in a
    `ParallelFunction` built with the given client and options.
    This method can be used for map:

    >>> @parallel(client, block=True)
    ... def func(a):
    ...     pass
    """
    def parallel_function(f):
        wrapped = ParallelFunction(client, f, dist, bound, block, targets)
        return wrapped
    return parallel_function
+
+#--------------------------------------------------------------------------
+# Classes
+#--------------------------------------------------------------------------
+
class RemoteFunction(object):
    """Callable wrapper that runs an existing function through a client.

    Calling the instance forwards the call to ``client.apply`` together
    with the stored `block`, `targets` and `bound` options.

    Parameters
    ----------

    client : Client instance
        The client to be used to connect to engines
    f : callable
        The function to be wrapped into a remote function
    bound : bool [default: False]
        Whether to affect the remote namespace when called
    block : bool [default: None]
        Whether to wait for results or not.  The default behavior is
        to use the current `block` attribute of `client`
    targets : valid target list [default: None]
        The targets on which to execute; passed through to
        ``client.apply`` unchanged.
    """

    client = None # the remote connection
    func = None # the wrapped function
    block = None # whether to block
    bound = None # whether to affect the namespace
    targets = None # where to execute

    def __init__(self, client, f, bound=False, block=None, targets=None):
        self.client, self.func = client, f
        self.block, self.bound, self.targets = block, bound, targets

    def __call__(self, *args, **kwargs):
        """Apply the wrapped function remotely with the given arguments."""
        options = dict(block=self.block, targets=self.targets,
                       bound=self.bound)
        return self.client.apply(self.func, args=args, kwargs=kwargs,
                                 **options)
+
+
class ParallelFunction(RemoteFunction):
    """Class for mapping a function to sequences.

    The input sequences are split into partitions by a map object
    (looked up in `Map.dists` by the `dist` key), one request is
    submitted per non-empty partition, and the partial results are
    rejoined by a `PendingMapResult`.
    """
    def __init__(self, client, f, dist='b', bound=False, block=None, targets='all'):
        super(ParallelFunction, self).__init__(client,f,bound,block,targets)
        mapClass = Map.dists[dist]
        self.mapObject = mapClass()

    def __call__(self, *sequences):
        """Partition `sequences` and submit one request per partition.

        Returns the joined result if `self.block` is true, otherwise the
        `PendingMapResult` tracking the submitted requests.

        Raises ValueError if the sequences do not all have the same
        length.
        """
        len_0 = len(sequences[0])
        for s in sequences:
            if len(s)!=len_0:
                raise ValueError('all sequences must have equal length')

        if self.targets is None:
            # load-balanced: one partition per element
            engines = [None]*len_0
        elif isinstance(self.targets, int):
            # load-balanced: an int is the number of partitions
            engines = [None]*self.targets
        else:
            # multiplexed: one partition per resolved target
            engines = self.client._build_targets(self.targets)[-1]

        nparts = len(engines)
        msg_ids = []
        for index, engineid in enumerate(engines):
            args = []
            for seq in sequences:
                part = self.mapObject.getPartition(seq, index, nparts)
                if part:
                    args.append(part)
            if not args:
                # nothing to send for this partition
                continue

            if hasattr(self, '_map'):
                # called via self.map(): ship builtin map so the function
                # is applied element-wise to the partition remotely
                f = map
                args = [self.func]+args
            else:
                f = self.func
            mid = self.client.apply(f, args=args, block=False,
                        bound=self.bound,
                        targets=engineid).msg_ids[0]
            msg_ids.append(mid)

        r = PendingMapResult(self.client, msg_ids, self.mapObject)
        if self.block:
            r.wait()
            return r.result
        else:
            return r

    def map(self, *sequences):
        """call a function on each element of a sequence remotely."""
        self._map = True
        try:
            # the flag must not outlive this call: if __call__ raises and
            # it stays set, later plain calls would silently act as map
            ret = self.__call__(*sequences)
        finally:
            del self._map
        return ret
+
View
7 IPython/zmq/parallel/streamkernel.py
@@ -277,7 +277,12 @@ def apply_request(self, stream, ident, parent):
suffix = prefix = "_" # prevent keyword collisions with lambda
f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
# if f.fun
- fname = prefix+f.func_name.strip('<>')+suffix
+ if hasattr(f, 'func_name'):
+ fname = f.func_name
+ else:
+ fname = f.__name__
+
+ fname = prefix+fname.strip('<>')+suffix
argname = prefix+"args"+suffix
kwargname = prefix+"kwargs"+suffix
resultname = prefix+"result"+suffix
View
15 IPython/zmq/parallel/view.py
@@ -11,6 +11,7 @@
#-----------------------------------------------------------------------------
from IPython.external.decorator import decorator
+from IPython.zmq.parallel.remotefunction import ParallelFunction
#-----------------------------------------------------------------------------
# Decorators
@@ -28,8 +29,10 @@ def myblock(f, self, *args, **kwargs):
@decorator
def save_ids(f, self, *args, **kwargs):
    """Keep our history and outstanding attributes up to date after a method call.

    Every msg_id appended to `self.client.history` while `f` runs is
    added to this object's `history` and `outstanding`.  The return
    value of `f` is passed through unchanged.
    """
    n_previous = len(self.client.history)
    ret = f(self, *args, **kwargs)
    # Slice from the old length rather than with a negative count:
    # history[-0:] is the *entire* list, so a call that submitted nothing
    # would otherwise re-register every past message.
    msg_ids = self.client.history[n_previous:]
    self.history.extend(msg_ids)
    for msg_id in msg_ids:
        self.outstanding.add(msg_id)
    return ret
@@ -172,6 +175,16 @@ def apply_sync_bound(self, f, *args, **kwargs):
"""
return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
@spin_after
@save_ids
def map(self, f, *sequences):
    """Parallel version of builtin `map`, using this view's engines.

    Wraps `f` in a `ParallelFunction` on this view's client, using the
    view's `block` setting and targets with ``bound=True``, and returns
    the result of its `map` method applied to `sequences`.
    """
    # Always start from the view's targets; previously `targets` was only
    # assigned in the int case, so any other target spec raised
    # UnboundLocalError.
    targets = self.targets
    if isinstance(targets, int):
        # ParallelFunction reads a bare int as a partition count, so wrap
        # a single engine id in a list to address that engine instead.
        targets = [targets]
    pf = ParallelFunction(self.client, f, block=self.block,
                    bound=True, targets=targets)
    return pf.map(*sequences)
+
def abort(self, msg_ids=None, block=None):
"""Abort jobs on my engines.

0 comments on commit 44e6eb1

Please sign in to comment.