Permalink
Browse files

tweaks related to docs + add activate() for magics

  • Loading branch information...
1 parent cd84656 commit 5172055435f369d0c0daf1cdf637bfba14381a21 @minrk minrk committed Jan 27, 2011
Showing with 112 additions and 29 deletions.
  1. +56 −22 IPython/zmq/parallel/client.py
  2. +56 −7 IPython/zmq/parallel/view.py
@@ -10,8 +10,6 @@
# Imports
#-----------------------------------------------------------------------------
-from __future__ import print_function
-
import os
import time
from getpass import getpass
@@ -57,7 +55,7 @@ def _clear():
"""helper method for implementing `client.clear` via `client.apply`"""
globals().clear()
-def execute(code):
+def _execute(code):
"""helper method for implementing `client.execute` via `client.apply`"""
exec code in globals()
@@ -79,8 +77,10 @@ def defaultblock(f, self, *args, **kwargs):
block = self.block if block is None else block
saveblock = self.block
self.block = block
- ret = f(self, *args, **kwargs)
- self.block = saveblock
+ try:
+ ret = f(self, *args, **kwargs)
+ finally:
+ self.block = saveblock
return ret
@@ -198,13 +198,15 @@ class Client(object):
results = None
history = None
debug = False
+ targets = None
def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
sshserver=None, sshkey=None, password=None, paramiko=None,
exec_key=None,):
if context is None:
context = zmq.Context()
self.context = context
+ self.targets = 'all'
self._addr = addr
self._ssh = bool(sshserver or sshkey or password)
if self._ssh and sshserver is None:
@@ -478,7 +480,7 @@ def barrier(self, msg_ids=None, timeout=-1):
Parameters
----------
- msg_ids : int, str, or list of ints and/or strs
+ msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
ints are indices to self.history
strs are msg_ids
default: wait on all outstanding messages
@@ -495,13 +497,18 @@ def barrier(self, msg_ids=None, timeout=-1):
if msg_ids is None:
theids = self.outstanding
else:
- if isinstance(msg_ids, (int, str)):
+ if isinstance(msg_ids, (int, str, AsyncResult)):
msg_ids = [msg_ids]
theids = set()
for msg_id in msg_ids:
if isinstance(msg_id, int):
msg_id = self.history[msg_id]
+ elif isinstance(msg_id, AsyncResult):
+ map(theids.add, msg_id._msg_ids)
+ continue
theids.add(msg_id)
+ if not theids.intersection(self.outstanding):
+ return True
self.spin()
while theids.intersection(self.outstanding):
if timeout >= 0 and ( time.time()-tic ) > timeout:
@@ -607,7 +614,7 @@ def execute(self, code, targets='all', block=None):
whether or not to wait until done to return
default: self.block
"""
- result = self.apply(execute, (code,), targets=targets, block=block, bound=True)
+ result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
return result
def run(self, code, block=None):
@@ -796,11 +803,11 @@ def map(self, f, *sequences):
return pf.map(*sequences)
def parallel(self, bound=True, targets='all', block=True):
- """Decorator for making a ParallelFunction"""
+ """Decorator for making a ParallelFunction."""
return parallel(self, bound=bound, targets=targets, block=block)
def remote(self, bound=True, targets='all', block=True):
- """Decorator for making a RemoteFunction"""
+ """Decorator for making a RemoteFunction."""
return remote(self, bound=bound, targets=targets, block=block)
#--------------------------------------------------------------------------
@@ -816,7 +823,7 @@ def push(self, ns, targets='all', block=None):
return result
@defaultblock
- def pull(self, keys, targets='all', block=True):
+ def pull(self, keys, targets='all', block=None):
"""Pull objects from `target`'s namespace by `keys`"""
if isinstance(keys, str):
pass
@@ -827,45 +834,43 @@ def pull(self, keys, targets='all', block=True):
result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
return result
- @defaultblock
def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
"""
Partition a Python sequence and send the partitions to a set of engines.
"""
+ block = block if block is not None else self.block
targets = self._build_targets(targets)[-1]
mapObject = Map.dists[dist]()
nparts = len(targets)
msg_ids = []
for index, engineid in enumerate(targets):
partition = mapObject.getPartition(seq, index, nparts)
if flatten and len(partition) == 1:
- mid = self.push({key: partition[0]}, targets=engineid, block=False)
+ r = self.push({key: partition[0]}, targets=engineid, block=False)
else:
- mid = self.push({key: partition}, targets=engineid, block=False)
- msg_ids.append(mid)
+ r = self.push({key: partition}, targets=engineid, block=False)
+ msg_ids.extend(r._msg_ids)
r = AsyncResult(self, msg_ids)
if block:
- r.wait()
- return
+ return r.get()
else:
return r
- @defaultblock
- def gather(self, key, dist='b', targets='all', block=True):
+ def gather(self, key, dist='b', targets='all', block=None):
"""
Gather a partitioned sequence on a set of engines as a single local seq.
"""
+ block = block if block is not None else self.block
targets = self._build_targets(targets)[-1]
mapObject = Map.dists[dist]()
msg_ids = []
for index, engineid in enumerate(targets):
- msg_ids.append(self.pull(key, targets=engineid,block=False))
+ msg_ids.extend(self.pull(key, targets=engineid,block=False)._msg_ids)
r = AsyncMapResult(self, msg_ids, mapObject)
if block:
- r.wait()
- return r.result
+ return r.get()
else:
return r
@@ -980,6 +985,35 @@ def purge_results(self, msg_ids=[], targets=[]):
if content['status'] != 'ok':
raise ss.unwrap_exception(content)
+ #----------------------------------------
+ # activate for %px,%autopx magics
+ #----------------------------------------
+ def activate(self):
+ """Make this `View` active for parallel magic commands.
+
+ IPython has a magic command syntax to work with `MultiEngineClient` objects.
+ In a given IPython session there is a single active one. While
+ there can be many `Views` created and used by the user,
+ there is only one active one. The active `View` is used whenever
+ the magic commands %px and %autopx are used.
+
+ The activate() method is called on a given `View` to make it
+ active. Once this has been done, the magic commands can be used.
+ """
+
+ try:
+ # This is injected into __builtins__.
+ ip = get_ipython()
+ except NameError:
+ print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
+ else:
+ pmagic = ip.plugin_manager.get_plugin('parallelmagic')
+ if pmagic is not None:
+ pmagic.active_multiengine_client = self
+ else:
+ print "You must first load the parallelmagic extension " \
+ "by doing '%load_ext parallelmagic'"
+
class AsynClient(Client):
"""An Asynchronous client, using the Tornado Event Loop.
!!!unfinished!!!"""
@@ -11,7 +11,7 @@
#-----------------------------------------------------------------------------
from IPython.external.decorator import decorator
-from IPython.zmq.parallel.remotefunction import ParallelFunction
+from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
#-----------------------------------------------------------------------------
# Decorators
@@ -22,8 +22,10 @@ def myblock(f, self, *args, **kwargs):
"""override client.block with self.block during a call"""
block = self.client.block
self.client.block = self.block
- ret = f(self, *args, **kwargs)
- self.client.block = block
+ try:
+ ret = f(self, *args, **kwargs)
+ finally:
+ self.client.block = block
return ret
@decorator
@@ -65,7 +67,6 @@ class View(object):
Don't use this class, use subclasses.
"""
_targets = None
- _ntargets = None
block=None
bound=None
history=None
@@ -75,7 +76,7 @@ def __init__(self, client, targets=None):
self._targets = targets
self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
self.block = client.block
- self.bound=True
+ self.bound=False
self.history = []
self.outstanding = set()
self.results = {}
@@ -92,7 +93,8 @@ def targets(self):
@targets.setter
def targets(self, value):
- raise AttributeError("Cannot set my targets argument after construction!")
+ self._targets = value
+ # raise AttributeError("Cannot set my targets argument after construction!")
@sync_results
def spin(self):
@@ -185,6 +187,10 @@ def map(self, f, *sequences):
bound=True, targets=targets)
return pf.map(*sequences)
+ def parallel(self, bound=True, block=True):
+ """Decorator for making a ParallelFunction"""
+ return parallel(self.client, bound=bound, targets=self.targets, block=block)
+
def abort(self, msg_ids=None, block=None):
"""Abort jobs on my engines.
@@ -202,11 +208,12 @@ def queue_status(self, verbose=False):
"""Fetch the Queue status of my engines"""
return self.client.queue_status(targets=self.targets, verbose=verbose)
- def purge_results(self, msg_ids=[],targets=[]):
+ def purge_results(self, msg_ids=[], targets=[]):
"""Instruct the controller to forget specific results."""
if targets is None or targets == 'all':
targets = self.targets
return self.client.purge_results(msg_ids=msg_ids, targets=targets)
+
class DirectView(View):
@@ -219,8 +226,16 @@ class DirectView(View):
>>> dv_even = client[::2]
>>> dv_some = client[1:3]
+ This object provides dictionary access
+
"""
+ @sync_results
+ @save_ids
+ def execute(self, code, block=True):
+ """execute some code on my targets."""
+ return self.client.execute(code, block=self.block, targets=self.targets)
+
def update(self, ns):
"""update remote namespace with dict `ns`"""
return self.client.push(ns, targets=self.targets, block=self.block)
@@ -234,6 +249,8 @@ def get(self, key_s):
# block = block if block is not None else self.block
return self.client.pull(key_s, block=True, targets=self.targets)
+ @sync_results
+ @save_ids
def pull(self, key_s, block=True):
"""get object(s) by `key_s` from remote namespace
will return one object if it is a key.
@@ -252,6 +269,8 @@ def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
return self.client.scatter(key, seq, dist=dist, flatten=flatten,
targets=targets, block=block)
+ @sync_results
+ @save_ids
def gather(self, key, dist='b', targets=None, block=True):
"""
Gather a partitioned sequence on a set of engines as a single local seq.
@@ -278,6 +297,36 @@ def kill(self, block=True):
block = block if block is not None else self.block
return self.client.kill(targets=self.targets, block=block)
+ #----------------------------------------
+ # activate for %px,%autopx magics
+ #----------------------------------------
+ def activate(self):
+ """Make this `View` active for parallel magic commands.
+
+ IPython has a magic command syntax to work with `MultiEngineClient` objects.
+ In a given IPython session there is a single active one. While
+ there can be many `Views` created and used by the user,
+ there is only one active one. The active `View` is used whenever
+ the magic commands %px and %autopx are used.
+
+ The activate() method is called on a given `View` to make it
+ active. Once this has been done, the magic commands can be used.
+ """
+
+ try:
+ # This is injected into __builtins__.
+ ip = get_ipython()
+ except NameError:
+ print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
+ else:
+ pmagic = ip.plugin_manager.get_plugin('parallelmagic')
+ if pmagic is not None:
+ pmagic.active_multiengine_client = self
+ else:
+ print "You must first load the parallelmagic extension " \
+ "by doing '%load_ext parallelmagic'"
+
+
class LoadBalancedView(View):
"""An engine-agnostic View that only executes via the Task queue.

0 comments on commit 5172055

Please sign in to comment.