Skip to content

Commit

Permalink
dependency tweaks + dependency/scheduler docs
Browse files Browse the repository at this point in the history
  • Loading branch information
minrk committed Apr 8, 2011
1 parent 487a2d0 commit 75d9c51
Show file tree
Hide file tree
Showing 19 changed files with 589 additions and 135 deletions.
20 changes: 8 additions & 12 deletions IPython/zmq/parallel/client.py
Expand Up @@ -863,14 +863,9 @@ def _build_dependency(self, dep):
return dep.msg_ids
elif dep is None:
return []
elif isinstance(dep, set):
return list(dep)
elif isinstance(dep, (list,dict)):
return dep
elif isinstance(dep, str):
return [dep]
else:
raise TypeError("Dependency may be: set,list,dict,Dependency or AsyncResult, not %r"%type(dep))
# pass to Dependency constructor
return list(Dependency(dep))

def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
after=None, follow=None, timeout=None):
Expand Down Expand Up @@ -921,9 +916,11 @@ def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
This job will only be run on an engine where this dependency
is met.
timeout : float or None
timeout : float/int or None
Only for load-balanced execution (targets=None)
Specify an amount of time (in seconds)
Specify an amount of time (in seconds) for the scheduler to
wait for dependencies to be met before failing with a
DependencyTimeout.
Returns
-------
Expand All @@ -950,9 +947,6 @@ def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
if not isinstance(kwargs, dict):
raise TypeError("kwargs must be dict, not %s"%type(kwargs))

after = self._build_dependency(after)
follow = self._build_dependency(follow)

options = dict(bound=bound, block=block)

if targets is None:
Expand Down Expand Up @@ -984,6 +978,8 @@ def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
warnings.warn(msg, RuntimeWarning)


after = self._build_dependency(after)
follow = self._build_dependency(follow)
subheader = dict(after=after, follow=follow, timeout=timeout)
bufs = ss.pack_apply_message(f,args,kwargs)
content = dict(bound=bound)
Expand Down
49 changes: 24 additions & 25 deletions IPython/zmq/parallel/dependency.py
Expand Up @@ -2,13 +2,7 @@

from IPython.external.decorator import decorator
from error import UnmetDependency


# flags
ALL = 1 << 0
ANY = 1 << 1
HERE = 1 << 2
ANYWHERE = 1 << 3
from asyncresult import AsyncResult


class depend(object):
Expand Down Expand Up @@ -59,53 +53,58 @@ class Dependency(set):
Subclassed from set()."""

mode='all'
all=True
success_only=True

def __init__(self, dependencies=[], mode='all', success_only=True):
def __init__(self, dependencies=[], all=True, success_only=True):
if isinstance(dependencies, dict):
# load from dict
mode = dependencies.get('mode', mode)
all = dependencies.get('all', True)
success_only = dependencies.get('success_only', success_only)
dependencies = dependencies.get('dependencies', [])
set.__init__(self, dependencies)
self.mode = mode.lower()
ids = []
if isinstance(dependencies, AsyncResult):
ids.extend(AsyncResult.msg_ids)
else:
for d in dependencies:
if isinstance(d, basestring):
ids.append(d)
elif isinstance(d, AsyncResult):
ids.extend(d.msg_ids)
else:
raise TypeError("invalid dependency type: %r"%type(d))
set.__init__(self, ids)
self.all = all
self.success_only=success_only
if self.mode not in ('any', 'all'):
raise NotImplementedError("Only any|all supported, not %r"%mode)

def check(self, completed, failed=None):
if failed is not None and not self.success_only:
completed = completed.union(failed)
if len(self) == 0:
return True
if self.mode == 'all':
if self.all:
return self.issubset(completed)
elif self.mode == 'any':
return not self.isdisjoint(completed)
else:
raise NotImplementedError("Only any|all supported, not %r"%mode)
return not self.isdisjoint(completed)

def unreachable(self, failed):
if len(self) == 0 or len(failed) == 0 or not self.success_only:
return False
print self, self.success_only, self.mode, failed
if self.mode == 'all':
# print self, self.success_only, self.all, failed
if self.all:
return not self.isdisjoint(failed)
elif self.mode == 'any':
return self.issubset(failed)
else:
raise NotImplementedError("Only any|all supported, not %r"%mode)
return self.issubset(failed)


def as_dict(self):
"""Represent this dependency as a dict. For json compatibility."""
return dict(
dependencies=list(self),
mode=self.mode,
all=self.all,
success_only=self.success_only,
)


__all__ = ['depend', 'require', 'Dependency']
__all__ = ['depend', 'require', 'dependent', 'Dependency']

5 changes: 4 additions & 1 deletion IPython/zmq/parallel/error.py
Expand Up @@ -154,7 +154,10 @@ class UnmetDependency(KernelError):
class ImpossibleDependency(UnmetDependency):
pass

class DependencyTimeout(UnmetDependency):
class DependencyTimeout(ImpossibleDependency):
pass

class InvalidDependency(ImpossibleDependency):
pass

class RemoteError(KernelError):
Expand Down
2 changes: 1 addition & 1 deletion IPython/zmq/parallel/hub.py
Expand Up @@ -100,7 +100,7 @@ class HubFactory(RegistrationFactory):
"""The Configurable for setting up a Hub."""

# name of a scheduler scheme
scheme = Str('lru', config=True)
scheme = Str('leastload', config=True)

# port-pairs for monitoredqueues:
hb = Instance(list, config=True)
Expand Down
25 changes: 19 additions & 6 deletions IPython/zmq/parallel/ipclusterapp.py
Expand Up @@ -20,7 +20,9 @@
import os
import signal
import logging
import errno

import zmq
from zmq.eventloop import ioloop

from IPython.external.argparse import ArgumentParser, SUPPRESS
Expand Down Expand Up @@ -385,7 +387,8 @@ def start_launchers(self, controller=True):
# observing of engine stopping is inconsistent. Some launchers
# might trigger on a single engine stopping, other wait until
# all stop. TODO: think more about how to handle this.

else:
self.controller_launcher = None

el_class = import_item(config.Global.engine_launcher)
self.engine_launcher = el_class(
Expand Down Expand Up @@ -427,7 +430,7 @@ def start_engines(self, r=None):

def stop_controller(self, r=None):
# self.log.info("In stop_controller")
if self.controller_launcher.running:
if self.controller_launcher and self.controller_launcher.running:
return self.controller_launcher.stop()

def stop_engines(self, r=None):
Expand Down Expand Up @@ -516,8 +519,13 @@ def start_app_start(self):
self.write_pid_file()
try:
self.loop.start()
except:
self.log.info("stopping...")
except KeyboardInterrupt:
pass
except zmq.ZMQError as e:
if e.errno == errno.EINTR:
pass
else:
raise
self.remove_pid_file()

def start_app_engines(self):
Expand All @@ -539,8 +547,13 @@ def start_app_engines(self):
# self.write_pid_file()
try:
self.loop.start()
except:
self.log.fatal("stopping...")
except KeyboardInterrupt:
pass
except zmq.ZMQError as e:
if e.errno == errno.EINTR:
pass
else:
raise
# self.remove_pid_file()

def start_app_stop(self):
Expand Down

0 comments on commit 75d9c51

Please sign in to comment.