Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

remove @default_block from client, in favor of clearer implementation

also remove dead _execute/_push/_pull methods from client after transition to View as primary API provider
  • Loading branch information...
commit b191e9a348d73d3541cd38587c9da85d58a4d6de 1 parent 4312fde
@minrk authored
Showing with 7 additions and 67 deletions.
  1. +7 −67 IPython/parallel/client/client.py
View
74 IPython/parallel/client/client.py
@@ -47,19 +47,6 @@ def spin_first(f, self, *args, **kwargs):
self.spin()
return f(self, *args, **kwargs)
-@decorator
-def default_block(f, self, *args, **kwargs):
- """Default to self.block; preserve self.block."""
- block = kwargs.get('block',None)
- block = self.block if block is None else block
- saveblock = self.block
- self.block = block
- try:
- ret = f(self, *args, **kwargs)
- finally:
- self.block = saveblock
- return ret
-
#--------------------------------------------------------------------------
# Classes
@@ -788,14 +775,14 @@ def wait(self, jobs=None, timeout=-1):
#--------------------------------------------------------------------------
@spin_first
- @default_block
def clear(self, targets=None, block=None):
"""Clear the namespace in target(s)."""
+ block = self.block if block is None else block
targets = self._build_targets(targets)[0]
for t in targets:
self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
error = False
- if self.block:
+ if block:
self._flush_ignored_control()
for i in range(len(targets)):
idents,msg = self.session.recv(self._control_socket,0)
@@ -810,7 +797,6 @@ def clear(self, targets=None, block=None):
@spin_first
- @default_block
def abort(self, jobs=None, targets=None, block=None):
"""Abort specific jobs from the execution queues of target(s).
@@ -825,6 +811,7 @@ def abort(self, jobs=None, targets=None, block=None):
"""
+ block = self.block if block is None else block
targets = self._build_targets(targets)[0]
msg_ids = []
if isinstance(jobs, (basestring,AsyncResult)):
@@ -842,7 +829,7 @@ def abort(self, jobs=None, targets=None, block=None):
self.session.send(self._control_socket, 'abort_request',
content=content, ident=t)
error = False
- if self.block:
+ if block:
self._flush_ignored_control()
for i in range(len(targets)):
idents,msg = self.session.recv(self._control_socket,0)
@@ -856,9 +843,9 @@ def abort(self, jobs=None, targets=None, block=None):
raise error
@spin_first
- @default_block
def shutdown(self, targets=None, restart=False, hub=False, block=None):
"""Terminates one or more engine processes, optionally including the hub."""
+ block = self.block if block is None else block
if hub:
targets = 'all'
targets = self._build_targets(targets)[0]
@@ -890,29 +877,9 @@ def shutdown(self, targets=None, restart=False, hub=False, block=None):
raise error
#--------------------------------------------------------------------------
- # Execution methods
+ # Execution related methods
#--------------------------------------------------------------------------
- @default_block
- def _execute(self, code, targets='all', block=None):
- """Executes `code` on `targets` in blocking or nonblocking manner.
-
- ``execute`` is always `bound` (affects engine namespace)
-
- Parameters
- ----------
-
- code : str
- the code string to be executed
- targets : int/str/list of ints/strs
- the engines on which to execute
- default : all
- block : bool
- whether or not to wait until done to return
- default: self.block
- """
- return self[targets].execute(code, block=block)
-
def _maybe_raise(self, result):
"""wrapper for maybe raising an exception if apply failed."""
if isinstance(result, error.RemoteError):
@@ -1008,38 +975,10 @@ def direct_view(self, targets='all'):
return DirectView(client=self, socket=self._mux_socket, targets=targets)
#--------------------------------------------------------------------------
- # Data movement (TO BE REMOVED)
- #--------------------------------------------------------------------------
-
- @default_block
- def _push(self, ns, targets='all', block=None, track=False):
- """Push the contents of `ns` into the namespace on `target`"""
- if not isinstance(ns, dict):
- raise TypeError("Must be a dict, not %s"%type(ns))
- result = self.apply(util._push, kwargs=ns, targets=targets, block=block, bound=True, balanced=False, track=track)
- if not block:
- return result
-
- @default_block
- def _pull(self, keys, targets='all', block=None):
- """Pull objects from `target`'s namespace by `keys`"""
- if isinstance(keys, basestring):
- pass
- elif isinstance(keys, (list,tuple,set)):
- for key in keys:
- if not isinstance(key, basestring):
- raise TypeError("keys must be str, not type %r"%type(key))
- else:
- raise TypeError("keys must be strs, not %r"%keys)
- result = self.apply(util._pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
- return result
-
- #--------------------------------------------------------------------------
# Query methods
#--------------------------------------------------------------------------
@spin_first
- @default_block
def get_result(self, indices_or_msg_ids=None, block=None):
"""Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
@@ -1077,6 +1016,7 @@ def get_result(self, indices_or_msg_ids=None, block=None):
A subclass of AsyncResult that retrieves results from the Hub
"""
+ block = self.block if block is None else block
if indices_or_msg_ids is None:
indices_or_msg_ids = -1

0 comments on commit b191e9a

Please sign in to comment.
Something went wrong with that request. Please try again.