diff --git a/IPython/zmq/parallel/client.py b/IPython/zmq/parallel/client.py index a6c5e847e80..fb082e846ef 100644 --- a/IPython/zmq/parallel/client.py +++ b/IPython/zmq/parallel/client.py @@ -863,14 +863,9 @@ def _build_dependency(self, dep): return dep.msg_ids elif dep is None: return [] - elif isinstance(dep, set): - return list(dep) - elif isinstance(dep, (list,dict)): - return dep - elif isinstance(dep, str): - return [dep] else: - raise TypeError("Dependency may be: set,list,dict,Dependency or AsyncResult, not %r"%type(dep)) + # pass to Dependency constructor + return list(Dependency(dep)) def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None, after=None, follow=None, timeout=None): @@ -921,9 +916,11 @@ def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None, This job will only be run on an engine where this dependency is met. - timeout : float or None + timeout : float/int or None Only for load-balanced execution (targets=None) - Specify an amount of time (in seconds) + Specify an amount of time (in seconds) for the scheduler to + wait for dependencies to be met before failing with a + DependencyTimeout. Returns ------- @@ -950,9 +947,6 @@ def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None, if not isinstance(kwargs, dict): raise TypeError("kwargs must be dict, not %s"%type(kwargs)) - after = self._build_dependency(after) - follow = self._build_dependency(follow) - options = dict(bound=bound, block=block) if targets is None: @@ -984,6 +978,8 @@ def _apply_balanced(self, f, args, kwargs, bound=True, block=None, warnings.warn(msg, RuntimeWarning) + after = self._build_dependency(after) + follow = self._build_dependency(follow) subheader = dict(after=after, follow=follow, timeout=timeout) bufs = ss.pack_apply_message(f,args,kwargs) content = dict(bound=bound) diff --git a/IPython/zmq/parallel/dependency.py b/IPython/zmq/parallel/dependency.py index 7f78097f935..0a511cc8f69 100644 --- a/IPython/zmq/parallel/dependency.py +++ b/IPython/zmq/parallel/dependency.py @@ -2,13 +2,7 @@ from IPython.external.decorator import decorator from error import UnmetDependency - - -# flags -ALL = 1 << 0 -ANY = 1 << 1 -HERE = 1 << 2 -ANYWHERE = 1 << 3 +from asyncresult import AsyncResult class depend(object): @@ -59,53 +53,58 @@ class Dependency(set): Subclassed from set().""" - mode='all' + all=True success_only=True - def __init__(self, dependencies=[], mode='all', success_only=True): + def __init__(self, dependencies=[], all=True, success_only=True): if isinstance(dependencies, dict): # load from dict - mode = dependencies.get('mode', mode) + all = dependencies.get('all', True) success_only = dependencies.get('success_only', success_only) dependencies = dependencies.get('dependencies', []) - set.__init__(self, dependencies) - self.mode = mode.lower() + ids = [] + if isinstance(dependencies, AsyncResult): + ids.extend(dependencies.msg_ids) + else: + for d in dependencies: + if isinstance(d, basestring): + ids.append(d) + elif isinstance(d, AsyncResult): + ids.extend(d.msg_ids) + else: + raise TypeError("invalid dependency type: %r"%type(d)) + set.__init__(self, ids) + self.all = all self.success_only=success_only - if self.mode not in ('any', 'all'): - raise NotImplementedError("Only any|all supported, not %r"%mode) def check(self, completed, failed=None): if failed is not None and not self.success_only: completed = completed.union(failed) if len(self) == 0: return True - if self.mode == 'all': + if self.all: return 
self.issubset(completed) - elif self.mode == 'any': - return not self.isdisjoint(completed) else: - raise NotImplementedError("Only any|all supported, not %r"%mode) + return not self.isdisjoint(completed) def unreachable(self, failed): if len(self) == 0 or len(failed) == 0 or not self.success_only: return False - print self, self.success_only, self.mode, failed - if self.mode == 'all': + # print self, self.success_only, self.all, failed + if self.all: return not self.isdisjoint(failed) - elif self.mode == 'any': - return self.issubset(failed) else: - raise NotImplementedError("Only any|all supported, not %r"%mode) + return self.issubset(failed) def as_dict(self): """Represent this dependency as a dict. For json compatibility.""" return dict( dependencies=list(self), - mode=self.mode, + all=self.all, success_only=self.success_only, ) -__all__ = ['depend', 'require', 'Dependency'] +__all__ = ['depend', 'require', 'dependent', 'Dependency'] diff --git a/IPython/zmq/parallel/error.py b/IPython/zmq/parallel/error.py index a52b512fc73..0cf4058f213 100644 --- a/IPython/zmq/parallel/error.py +++ b/IPython/zmq/parallel/error.py @@ -154,7 +154,10 @@ class UnmetDependency(KernelError): class ImpossibleDependency(UnmetDependency): pass -class DependencyTimeout(UnmetDependency): +class DependencyTimeout(ImpossibleDependency): + pass + +class InvalidDependency(ImpossibleDependency): pass class RemoteError(KernelError): diff --git a/IPython/zmq/parallel/hub.py b/IPython/zmq/parallel/hub.py index 3c9b31c8b12..a2637070580 100755 --- a/IPython/zmq/parallel/hub.py +++ b/IPython/zmq/parallel/hub.py @@ -100,7 +100,7 @@ class HubFactory(RegistrationFactory): """The Configurable for setting up a Hub.""" # name of a scheduler scheme - scheme = Str('lru', config=True) + scheme = Str('leastload', config=True) # port-pairs for monitoredqueues: hb = Instance(list, config=True) diff --git a/IPython/zmq/parallel/ipclusterapp.py b/IPython/zmq/parallel/ipclusterapp.py index b4fb995cf19..cbeed954a08 100755 --- a/IPython/zmq/parallel/ipclusterapp.py +++ b/IPython/zmq/parallel/ipclusterapp.py @@ -20,7 +20,9 @@ import os import signal import logging +import errno +import zmq from zmq.eventloop import ioloop from IPython.external.argparse import ArgumentParser, SUPPRESS @@ -385,7 +387,8 @@ def start_launchers(self, controller=True): # observing of engine stopping is inconsistent. Some launchers # might trigger on a single engine stopping, other wait until # all stop. TODO: think more about how to handle this. 
- + else: + self.controller_launcher = None el_class = import_item(config.Global.engine_launcher) self.engine_launcher = el_class( @@ -427,7 +430,7 @@ def start_engines(self, r=None): def stop_controller(self, r=None): # self.log.info("In stop_controller") - if self.controller_launcher.running: + if self.controller_launcher and self.controller_launcher.running: return self.controller_launcher.stop() def stop_engines(self, r=None): @@ -516,8 +519,13 @@ def start_app_start(self): self.write_pid_file() try: self.loop.start() - except: - self.log.info("stopping...") + except KeyboardInterrupt: + pass + except zmq.ZMQError as e: + if e.errno == errno.EINTR: + pass + else: + raise self.remove_pid_file() def start_app_engines(self): @@ -539,8 +547,13 @@ def start_app_engines(self): # self.write_pid_file() try: self.loop.start() - except: - self.log.fatal("stopping...") + except KeyboardInterrupt: + pass + except zmq.ZMQError as e: + if e.errno == errno.EINTR: + pass + else: + raise # self.remove_pid_file() def start_app_stop(self): diff --git a/IPython/zmq/parallel/scheduler.py b/IPython/zmq/parallel/scheduler.py index ae2f851129f..4e0430a39b6 100644 --- a/IPython/zmq/parallel/scheduler.py +++ b/IPython/zmq/parallel/scheduler.py @@ -127,7 +127,7 @@ class TaskScheduler(SessionFactory): mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream # internals: - dependencies = Dict() # dict by msg_id of [ msg_ids that depend on key ] + graph = Dict() # dict by msg_id of [ msg_ids that depend on key ] depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow) pending = Dict() # dict by engine_uuid of submitted tasks completed = Dict() # dict by engine_uuid of completed tasks @@ -139,6 +139,7 @@ class TaskScheduler(SessionFactory): all_completed = Set() # set of all completed tasks all_failed = Set() # set of all failed tasks all_done = Set() # set of all finished tasks=union(completed,failed) + all_ids = Set() # set of all submitted task IDs blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback') @@ -239,7 +240,7 @@ def handle_stranded_tasks(self, engine): msg = self.session.send(self.client_stream, 'apply_reply', content, parent=parent, ident=idents) self.session.send(self.mon_stream, msg, ident=['outtask']+idents) - self.update_dependencies(msg_id) + self.update_graph(msg_id) #----------------------------------------------------------------------- @@ -252,20 +253,21 @@ def dispatch_submission(self, raw_msg): self.notifier_stream.flush() try: idents, msg = self.session.feed_identities(raw_msg, copy=False) - except Exception as e: - self.log.error("task::Invaid msg: %s"%msg) + msg = self.session.unpack_message(msg, content=False, copy=False) + except: + self.log.error("task::Invaid task: %s"%raw_msg, exc_info=True) return # send to monitor self.mon_stream.send_multipart(['intask']+raw_msg, copy=False) - msg = self.session.unpack_message(msg, content=False, copy=False) header = msg['header'] msg_id = header['msg_id'] + self.all_ids.add(msg_id) # time dependencies after = Dependency(header.get('after', [])) - if after.mode == 'all': + if after.all: after.difference_update(self.all_completed) if not after.success_only: after.difference_update(self.all_failed) @@ -276,10 +278,16 @@ def dispatch_submission(self, raw_msg): # location dependencies follow = Dependency(header.get('follow', [])) - # check if unreachable: - if after.unreachable(self.all_failed) or 
follow.unreachable(self.all_failed): - self.depending[msg_id] = [raw_msg,MET,MET,None] - return self.fail_unreachable(msg_id) + + for dep in after,follow: + # check valid: + if msg_id in dep or dep.difference(self.all_ids): + self.depending[msg_id] = [raw_msg,MET,MET,None] + return self.fail_unreachable(msg_id, error.InvalidDependency) + # check if unreachable: + if dep.unreachable(self.all_failed): + self.depending[msg_id] = [raw_msg,MET,MET,None] + return self.fail_unreachable(msg_id) # turn timeouts into datetime objects: timeout = header.get('timeout', None) @@ -288,7 +296,7 @@ def dispatch_submission(self, raw_msg): if after.check(self.all_completed, self.all_failed): # time deps already met, try to run - if not self.maybe_run(msg_id, raw_msg, follow): + if not self.maybe_run(msg_id, raw_msg, follow, timeout): # can't run yet self.save_unmet(msg_id, raw_msg, after, follow, timeout) else: @@ -306,25 +314,23 @@ def audit_timeouts(self): self.fail_unreachable(msg_id, timeout=True) @logged - def fail_unreachable(self, msg_id, timeout=False): + def fail_unreachable(self, msg_id, why=error.ImpossibleDependency): """a message has become unreachable""" if msg_id not in self.depending: self.log.error("msg %r already failed!"%msg_id) return raw_msg, after, follow, timeout = self.depending.pop(msg_id) for mid in follow.union(after): - if mid in self.dependencies: - self.dependencies[mid].remove(msg_id) + if mid in self.graph: + self.graph[mid].remove(msg_id) # FIXME: unpacking a message I've already unpacked, but didn't save: idents,msg = self.session.feed_identities(raw_msg, copy=False) msg = self.session.unpack_message(msg, copy=False, content=False) header = msg['header'] - impossible = error.DependencyTimeout if timeout else error.ImpossibleDependency - try: - raise impossible() + raise why() except: content = ss.wrap_exception() @@ -335,10 +341,10 @@ def fail_unreachable(self, msg_id, timeout=False): parent=header, ident=idents) self.session.send(self.mon_stream, msg, ident=['outtask']+idents) - self.update_dependencies(msg_id, success=False) + self.update_graph(msg_id, success=False) @logged - def maybe_run(self, msg_id, raw_msg, follow=None): + def maybe_run(self, msg_id, raw_msg, follow=None, timeout=None): """check location dependencies, and run if they are met.""" if follow: @@ -349,8 +355,7 @@ def can_run(idx): indices = filter(can_run, range(len(self.targets))) if not indices: - # TODO evaluate unmeetable follow dependencies - if follow.mode == 'all': + if follow.all: dests = set() relevant = self.all_completed if follow.success_only else self.all_done for m in follow.intersection(relevant): @@ -363,7 +368,7 @@ def can_run(idx): else: indices = None - self.submit_task(msg_id, raw_msg, indices) + self.submit_task(msg_id, raw_msg, follow, timeout, indices) return True @logged @@ -372,12 +377,12 @@ def save_unmet(self, msg_id, raw_msg, after, follow, timeout): self.depending[msg_id] = [raw_msg,after,follow,timeout] # track the ids in follow or after, but not those already finished for dep_id in after.union(follow).difference(self.all_done): - if dep_id not in self.dependencies: - self.dependencies[dep_id] = set() - self.dependencies[dep_id].add(msg_id) + if dep_id not in self.graph: + self.graph[dep_id] = set() + self.graph[dep_id].add(msg_id) @logged - def submit_task(self, msg_id, raw_msg, follow=None, indices=None): + def submit_task(self, msg_id, raw_msg, follow, timeout, indices=None): """Submit a task to any of a subset of our targets.""" if indices: loads = [self.loads[i] for i 
in indices] @@ -391,7 +396,7 @@ def submit_task(self, msg_id, raw_msg, follow=None, indices=None): self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False) self.engine_stream.send_multipart(raw_msg, copy=False) self.add_job(idx) - self.pending[target][msg_id] = (raw_msg, follow) + self.pending[target][msg_id] = (raw_msg, follow, timeout) content = dict(msg_id=msg_id, engine_id=target) self.session.send(self.mon_stream, 'task_destination', content=content, ident=['tracktask',self.session.session]) @@ -403,10 +408,11 @@ def submit_task(self, msg_id, raw_msg, follow=None, indices=None): def dispatch_result(self, raw_msg): try: idents,msg = self.session.feed_identities(raw_msg, copy=False) - except Exception as e: - self.log.error("task::Invaid result: %s"%msg) + msg = self.session.unpack_message(msg, content=False, copy=False) + except: + self.log.error("task::Invaid result: %s"%raw_msg, exc_info=True) return - msg = self.session.unpack_message(msg, content=False, copy=False) + header = msg['header'] if header.get('dependencies_met', True): success = (header['status'] == 'ok') @@ -438,7 +444,7 @@ def handle_result(self, idents, parent, raw_msg, success=True): self.all_done.add(msg_id) self.destinations[msg_id] = engine - self.update_dependencies(msg_id, success) + self.update_graph(msg_id, success) @logged def handle_unmet_dependency(self, idents, parent): @@ -448,30 +454,30 @@ def handle_unmet_dependency(self, idents, parent): self.blacklist[msg_id] = set() self.blacklist[msg_id].add(engine) raw_msg,follow,timeout = self.pending[engine].pop(msg_id) - if not self.maybe_run(msg_id, raw_msg, follow): + if not self.maybe_run(msg_id, raw_msg, follow, timeout): # resubmit failed, put it back in our dependency tree self.save_unmet(msg_id, raw_msg, MET, follow, timeout) pass @logged - def update_dependencies(self, dep_id, success=True): + def update_graph(self, dep_id, success=True): """dep_id just finished. 
Update our dependency table and submit any jobs that just became runable.""" # print ("\n\n***********") # pprint (dep_id) - # pprint (self.dependencies) + # pprint (self.graph) # pprint (self.depending) # pprint (self.all_completed) # pprint (self.all_failed) # print ("\n\n***********\n\n") - if dep_id not in self.dependencies: + if dep_id not in self.graph: return - jobs = self.dependencies.pop(dep_id) + jobs = self.graph.pop(dep_id) for msg_id in jobs: raw_msg, after, follow, timeout = self.depending[msg_id] # if dep_id in after: - # if after.mode == 'all' and (success or not after.success_only): + # if after.all and (success or not after.success_only): # after.remove(dep_id) if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed): @@ -479,12 +485,12 @@ def update_dependencies(self, dep_id, success=True): elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run self.depending[msg_id][1] = MET - if self.maybe_run(msg_id, raw_msg, follow): + if self.maybe_run(msg_id, raw_msg, follow, timeout): self.depending.pop(msg_id) for mid in follow.union(after): - if mid in self.dependencies: - self.dependencies[mid].remove(msg_id) + if mid in self.graph: + self.graph[mid].remove(msg_id) #---------------------------------------------------------------------- # methods to be overridden by subclasses @@ -506,7 +512,8 @@ def finish_job(self, idx): -def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'): +def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ', + log_addr=None, loglevel=logging.DEBUG, scheme='lru'): from zmq.eventloop import ioloop from zmq.eventloop.zmqstream import ZMQStream diff --git a/IPython/zmq/parallel/view.py b/IPython/zmq/parallel/view.py index 3249dd27b76..a67c4da688a 100644 --- a/IPython/zmq/parallel/view.py +++ b/IPython/zmq/parallel/view.py @@ -228,7 +228,12 @@ class DirectView(View): >>> dv_even = client[::2] >>> dv_some = client[1:3] - This object provides dictionary access + This object provides dictionary access to engine namespaces: + + # push a=5: + >>> dv['a'] = 5 + # pull 'foo': + >>> dv['foo'] """ diff --git a/docs/examples/newparallel/demo/dag/dagdeps.py b/docs/examples/newparallel/dagdeps.py similarity index 84% rename from docs/examples/newparallel/demo/dag/dagdeps.py rename to docs/examples/newparallel/dagdeps.py index c3b6837f987..ee2bcc8fa71 100644 --- a/docs/examples/newparallel/demo/dag/dagdeps.py +++ b/docs/examples/newparallel/dagdeps.py @@ -57,7 +57,7 @@ def submit_jobs(client, G, jobs): """Submit jobs via client where G describes the time dependencies.""" results = {} for node in nx.topological_sort(G): - deps = [ results[n].msg_ids[0] for n in G.predecessors(node) ] + deps = [ results[n] for n in G.predecessors(node) ] results[node] = client.apply(jobs[node], after=deps) return results @@ -77,30 +77,34 @@ def main(nodes, edges): point at least slightly to the right if the graph is valid. 
""" from matplotlib.dates import date2num + from matplotlib.cm import gist_rainbow print "building DAG" G = random_dag(nodes, edges) jobs = {} pos = {} + colors = {} for node in G: jobs[node] = randomwait client = cmod.Client() - print "submitting tasks" + print "submitting %i tasks with %i dependencies"%(nodes,edges) results = submit_jobs(client, G, jobs) print "waiting for results" client.barrier() print "done" for node in G: - # times[node] = results[node].get() - t = date2num(results[node].metadata.started) - pos[node] = (t, G.in_degree(node)+random()) - + md = results[node].metadata + start = date2num(md.started) + runtime = date2num(md.completed) - start + pos[node] = (start, runtime) + colors[node] = md.engine_id validate_tree(G, results) - nx.draw(G, pos) + nx.draw(G, pos, node_list = colors.keys(), node_color=colors.values(), cmap=gist_rainbow) return G,results if __name__ == '__main__': import pylab - main(32,128) + # main(5,10) + main(32,96) pylab.show() \ No newline at end of file diff --git a/docs/source/parallelz/dag_dependencies.txt b/docs/source/parallelz/dag_dependencies.txt new file mode 100644 index 00000000000..b1816aefd64 --- /dev/null +++ b/docs/source/parallelz/dag_dependencies.txt @@ -0,0 +1,172 @@ +.. _dag_dependencies: + +================ +DAG Dependencies +================ + +Often, parallel workflow is described in terms of a `Directed Acyclic Graph +`_ or DAG. A popular library +for working with Graphs is NetworkX_. Here, we will walk through a demo mapping +a nx DAG to task dependencies. + +The full script that runs this demo can be found in +:file:`docs/examples/newparallel/dagdeps.py`. + +Why are DAGs good for task dependencies? +---------------------------------------- + +The 'G' in DAG is 'Graph'. A Graph is a collection of **nodes** and **edges** that connect +the nodes. For our purposes, each node would be a task, and each edge would be a +dependency. The 'D' in DAG stands for 'Directed'. This means that each edge has a +direction associated with it. So we can interpret the edge (a,b) as meaning that b depends +on a, whereas the edge (b,a) would mean a depends on b. The 'A' is 'Acyclic', meaning that +there must not be any closed loops in the graph. This is important for dependencies, +because if a loop were closed, then a task could ultimately depend on itself, and never be +able to run. If your workflow can be described as a DAG, then it is impossible for your +dependencies to cause a deadlock. + +A Sample DAG +------------ + +Here, we have a very simple 5-node DAG: + +.. figure:: simpledag.* + +With NetworkX, an arrow is just a fattened bit on the edge. Here, we can see that task 0 +depends on nothing, and can run immediately. 1 and 2 depend on 0; 3 depends on + 1 and 2; and 4 depends only on 1. + +A possible sequence of events for this workflow: + +0. Task 0 can run right away +1. 0 finishes, so 1,2 can start +2. 1 finishes, 3 is still waiting on 2, but 4 can start right away +3. 2 finishes, and 3 can finally start + + +Further, taking failures into account, assuming all dependencies are run with the default +`success_only=True`, the following cases would occur for each node's failure: + +0. fails: all other tasks fail as Impossible +1. 2 can still succeed, but 3,4 are unreachable +2. 3 becomes unreachable, but 4 is unaffected +3. and 4. are terminal, and can have no effect on other nodes + +The code to generate the simple DAG: + +.. 
sourcecode:: python + + import networkx as nx + + G = nx.DiGraph() + + # add 5 nodes, labeled 0-4: + map(G.add_node, range(5)) + # 1,2 depend on 0: + G.add_edge(0,1) + G.add_edge(0,2) + # 3 depends on 1,2 + G.add_edge(1,3) + G.add_edge(2,3) + # 4 depends on 1 + G.add_edge(1,4) + + # now draw the graph: + pos = { 0 : (0,0), 1 : (1,1), 2 : (-1,1), + 3 : (0,2), 4 : (2,2)} + nx.draw(G, pos, edge_color='r') + + +For demonstration purposes, we have a function that generates a random DAG with a given +number of nodes and edges. + +.. literalinclude:: ../../examples/newparallel/dagdeps.py + :language: python + :lines: 20-36 + +So first, we start with a graph of 32 nodes, with 128 edges: + +.. sourcecode:: ipython + + In [2]: G = random_dag(32,128) + +Now, we need to build our dict of jobs corresponding to the nodes on the graph: + +.. sourcecode:: ipython + + In [3]: jobs = {} + + # in reality, each job would presumably be different + # randomwait is just a function that sleeps for a random interval + In [4]: for node in G: + ...: jobs[node] = randomwait + +Once we have a dict of jobs matching the nodes on the graph, we can start submitting jobs, +and linking up the dependencies. Since we don't know a job's msg_id until it is submitted, +which is necessary for building dependencies, it is critical that we don't submit any jobs +before the jobs they depend on. Fortunately, NetworkX provides a +:meth:`topological_sort` method which ensures exactly this. It presents an iterable that +guarantees that when you arrive at a node, you have already visited all the nodes +on which it depends: + +.. sourcecode:: ipython + + In [5]: c = client.Client() + + In [6]: results = {} + + In [7]: for node in nx.topological_sort(G): + ...: # get list of AsyncResult objects from nodes + ...: # leading into this one as dependencies + ...: deps = [ results[n] for n in G.predecessors(node) ] + ...: # submit and store AsyncResult object + ...: results[node] = c.apply(jobs[node], after=deps, block=False) + +Now that we have submitted all the jobs, we can wait for the results: + +.. sourcecode:: ipython + + In [8]: [ r.get() for r in results.values() ] + +Now, at least we know that all the jobs ran and did not fail (``r.get()`` would have +raised an error if a task failed). But we don't know that the ordering was properly +respected. For this, we can use the :attr:`metadata` attribute of each AsyncResult. + +These objects store a variety of metadata about each task, including various timestamps. +We can validate that the dependencies were respected by checking that each task was +started after all of its predecessors were completed: + +.. literalinclude:: ../../examples/newparallel/dagdeps.py + :language: python + :lines: 64-70 + +We can also validate the graph visually. By drawing the graph with each node's x-position +as its start time, all arrows must be pointing to the right if the order was respected. +The y-position is each task's runtime, so long-running tasks +will be at the top, and quick tasks will be at the bottom. + +.. 
sourcecode:: ipython + + In [10]: from matplotlib.dates import date2num + + In [11]: from matplotlib.cm import gist_rainbow + + In [12]: pos = {}; colors = {} + + In [12]: for node in G: + ...: md = results[node].metadata + ...: start = date2num(md.started) + ...: runtime = date2num(md.completed) - start + ...: pos[node] = (start, runtime) + ...: colors[node] = md.engine_id + + In [13]: nx.draw(G, pos, node_list=colors.keys(), node_color=colors.values(), + ...: cmap=gist_rainbow) + +.. figure:: dagdeps.* + + Time started on x, runtime on y, and color-coded by engine-id (in this case there + were four engines). + + +.. _NetworkX: http://networkx.lanl.gov/ diff --git a/docs/source/parallelz/dagdeps.pdf b/docs/source/parallelz/dagdeps.pdf new file mode 100644 index 00000000000..33bdc9d1825 Binary files /dev/null and b/docs/source/parallelz/dagdeps.pdf differ diff --git a/docs/source/parallelz/dagdeps.png b/docs/source/parallelz/dagdeps.png new file mode 100644 index 00000000000..a821aaebcff Binary files /dev/null and b/docs/source/parallelz/dagdeps.png differ diff --git a/docs/source/parallelz/index.txt b/docs/source/parallelz/index.txt index 2ec540ae76a..5f9aa940e07 100644 --- a/docs/source/parallelz/index.txt +++ b/docs/source/parallelz/index.txt @@ -15,5 +15,6 @@ Using IPython for parallel computing (ZMQ) parallel_security.txt parallel_winhpc.txt parallel_demos.txt + dag_dependencies.txt diff --git a/docs/source/parallelz/parallel_demos.txt b/docs/source/parallelz/parallel_demos.txt index ee1a32975ce..c1a872a6461 100644 --- a/docs/source/parallelz/parallel_demos.txt +++ b/docs/source/parallelz/parallel_demos.txt @@ -13,14 +13,7 @@ Matplotlib package. IPython can be started in this mode by typing:: ipython --pylab -at the system command line. If this prints an error message, you will -need to install the default profiles from within IPython by doing, - -.. sourcecode:: ipython - - In [1]: %install_profiles - -and then restarting IPython. +at the system command line. 150 million digits of pi ======================== diff --git a/docs/source/parallelz/parallel_multiengine.txt b/docs/source/parallelz/parallel_multiengine.txt index 37a19ddf03a..48c5039893c 100644 --- a/docs/source/parallelz/parallel_multiengine.txt +++ b/docs/source/parallelz/parallel_multiengine.txt @@ -132,11 +132,11 @@ The main method for doing remote execution (in fact, all methods that communicate with the engines are built on top of it), is :meth:`Client.apply`. Ideally, :meth:`apply` would have the signature ``apply(f,*args,**kwargs)``, which would call ``f(*args,**kwargs)`` remotely. However, since :class:`Clients` -require some more options, they cannot reasonably provide this interface. +require some more options, they cannot easily provide this interface. 
Instead, they provide the signature:: - c.apply(f, args=None, kwargs=None, bound=True, block=None, - targets=None, after=None, follow=None) + c.apply(f, args=None, kwargs=None, bound=True, block=None, targets=None, + after=None, follow=None, timeout=None) In order to provide the nicer interface, we have :class:`View` classes, which wrap :meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine @@ -184,7 +184,7 @@ blocks until the engines are done executing the command: In [5]: dview['b'] = 10 In [6]: dview.apply_bound(lambda x: a+b+x, 27) - Out[6]: [42,42,42,42] + Out[6]: [42, 42, 42, 42] Python commands can be executed on specific engines by calling execute using the ``targets`` keyword argument, or creating a :class:`DirectView` instance @@ -197,7 +197,7 @@ by index-access to the client: In [7]: rc[1::2].execute('c=a-b') # shorthand for rc.execute('c=a-b',targets=[1,3]) In [8]: rc[:]['c'] # shorthand for rc.pull('c',targets='all') - Out[8]: [15,-5,15,-5] + Out[8]: [15, -5, 15, -5] .. note:: @@ -258,7 +258,7 @@ time through its :meth:`get` method. .. Note:: - The :class:`AsyncResult` object provides the exact same interface as + The :class:`AsyncResult` object provides a superset of the interface in :py:class:`multiprocessing.pool.AsyncResult`. See the `official Python documentation `_ for more. @@ -270,15 +270,12 @@ local Python/IPython session: .. sourcecode:: ipython # define our function - In [35]: def wait(t): - ....: import time - ....: tic = time.time() - ....: time.sleep(t) - ....: return time.time()-tic + In [6]: def wait(t): + ...: import time + ...: tic = time.time() + ...: time.sleep(t) + ...: return time.time()-tic - # In blocking mode - In [6]: rc.apply('import time') - # In non-blocking mode In [7]: pr = rc[:].apply_async(wait, 2) @@ -316,8 +313,8 @@ local Python/IPython session: Often, it is desirable to wait until a set of :class:`AsyncResult` objects are done. For this, there is a the method :meth:`barrier`. This method takes a -tuple of :class:`AsyncResult` objects (or `msg_ids`) and blocks until all of the associated -results are ready: +tuple of :class:`AsyncResult` objects (or `msg_ids`) and blocks until all of the +associated results are ready: .. sourcecode:: ipython @@ -329,7 +326,7 @@ results are ready: # Wait until all of them are done In [74]: rc.barrier(pr_list) - # Then, their results are ready using get_result or the r attribute + # Then, their results are ready using get() or the `.r` attribute In [75]: pr_list[0].get() Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752] diff --git a/docs/source/parallelz/parallel_security.txt b/docs/source/parallelz/parallel_security.txt index 042edd9cd8c..a588db29dc9 100644 --- a/docs/source/parallelz/parallel_security.txt +++ b/docs/source/parallelz/parallel_security.txt @@ -320,4 +320,5 @@ channel is established. .. [RFC5246] - +.. [OpenSSH] +.. [Paramiko] diff --git a/docs/source/parallelz/parallel_task.txt b/docs/source/parallelz/parallel_task.txt index b188e8967c8..99213ffc90c 100644 --- a/docs/source/parallelz/parallel_task.txt +++ b/docs/source/parallelz/parallel_task.txt @@ -4,13 +4,13 @@ The IPython task interface ========================== -The task interface to the controller presents the engines as a fault tolerant, +The task interface to the cluster presents the engines as a fault tolerant, dynamic load-balanced system of workers. Unlike the multiengine interface, in -the task interface, the user have no direct access to individual engines. 
By -allowing the IPython scheduler to assign work, this interface is both simpler -and more powerful. +the task interface, the user has no direct access to individual engines. By +allowing the IPython scheduler to assign work, this interface is simultaneously +simpler and more powerful. -Best of all the user can use both of these interfaces running at the same time +Best of all, the user can use both of these interfaces running at the same time to take advantage of their respective strengths. When the user can break up the user's work into segments that do not depend on previous execution, the task interface is ideal. But it also has more power and flexibility, allowing @@ -97,11 +97,275 @@ that turns any Python function into a parallel function: In [10]: @lview.parallel() ....: def f(x): ....: return 10.0*x**4 - ....: + ....: In [11]: f.map(range(32)) # this is done in parallel Out[11]: [0.0,10.0,160.0,...] +Dependencies +============ + +Often, pure atomic load-balancing is too primitive for your work. In these cases, you +may want to associate some kind of `Dependency` that describes when, where, or whether +a task can be run. In IPython, we provide two types of dependencies: +`Functional Dependencies`_ and `Graph Dependencies`_ + +.. note:: + + It is important to note that the pure ZeroMQ scheduler does not support dependencies, + and you will see errors or warnings if you try to use dependencies with the pure + scheduler. + +Functional Dependencies +----------------------- + +Functional dependencies are used to determine whether a given engine is capable of running +a particular task. This is implemented via a special :class:`Exception` class, +:class:`UnmetDependency`, found in `IPython.zmq.parallel.error`. Its use is very simple: +if a task fails with an UnmetDependency exception, then the scheduler, instead of relaying +the error up to the client like any other error, catches the error, and submits the task +to a different engine. This will repeat indefinitely, and a task will never be submitted +to a given engine a second time. + +You can manually raise the :class:`UnmetDependency` yourself, but IPython has provided +some decorators for facilitating this behavior. + +There are two decorators and a class used for functional dependencies: + +.. sourcecode:: ipython + + In [9]: from IPython.zmq.parallel.dependency import depend, require, dependent + +@require +******** + +The simplest sort of dependency is requiring that a Python module is available. The +``@require`` decorator lets you define a function that will only run on engines where names +you specify are importable: + +.. sourcecode:: ipython + + In [10]: @require('numpy', 'zmq') + ...: def myfunc(): + ...: import numpy,zmq + ...: return dostuff() + +Now, any time you apply :func:`myfunc`, the task will only run on a machine that has +numpy and pyzmq available. + +@depend +******* + +The ``@depend`` decorator lets you decorate any function with any *other* function to +evaluate the dependency. The dependency function will be called at the start of the task, +and if it returns ``False``, then the dependency will be considered unmet, and the task +will be assigned to another engine. If the dependency returns *anything other than +``False``*, the rest of the task will continue. + +.. 
sourcecode:: ipython + + In [10]: def platform_specific(plat): + ...: import sys + ...: return sys.platform == plat + + In [11]: @depend(platform_specific, 'darwin') + ...: def mactask(): + ...: do_mac_stuff() + + In [12]: @depend(platform_specific, 'nt') + ...: def wintask(): + ...: do_windows_stuff() + +In this case, any time you apply ``mactask``, it will only run on an OSX machine. +``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)`` +signature. + +dependents +********** + +You don't have to use the decorators on your tasks. If, for instance, you want +to run tasks with a single function but varying dependencies, you can directly construct +the :class:`dependent` object that the decorators use: + +.. sourcecode:: ipython + + In [13]: def mytask(*args): + ...: dostuff() + + In [14]: mactask = dependent(mytask, platform_specific, 'darwin') + # this is the same as decorating the declaration of mytask with @depend + # but you can do it again: + + In [15]: wintask = dependent(mytask, platform_specific, 'nt') + + # in general: + In [16]: t = dependent(f, g, *dargs, **dkwargs) + + # is equivalent to: + In [17]: @depend(g, *dargs, **dkwargs) + ...: def t(a,b,c): + ...: # contents of f + +Graph Dependencies +------------------ + +Sometimes you want to restrict the time and/or location to run a given task as a function +of the time and/or location of other tasks. This is implemented via a subclass of +:class:`set`, called a :class:`Dependency`. A Dependency is just a set of `msg_ids` +corresponding to tasks, and a few attributes to guide how to decide when the Dependency +has been met. + +The switches we provide for interpreting whether a given dependency set has been met: + +any|all + Whether the dependency is considered met if *any* of the dependencies are done, or + only after *all* of them have finished. This is set by a Dependency's :attr:`all` + boolean attribute, which defaults to ``True``. + +success_only + Whether to consider only tasks that did not raise an error as being fulfilled. + Sometimes you want to run a task after another, but only if that task succeeded. In + this case, ``success_only`` should be ``True``. However sometimes you may not care + whether the task succeeds, and always want the second task to run, in which case + you should use `success_only=False`. The default behavior is to only use successes. + +There are other switches for interpretation that are made at the *task* level. These are +specified via keyword arguments to the client's :meth:`apply` method. + +after,follow + You may want to run a task *after* a given set of dependencies have been run and/or + run it *where* another set of dependencies are met. To support this, every task has an + `after` dependency to restrict time, and a `follow` dependency to restrict + destination. + +timeout + You may also want to set a time-limit for how long the scheduler should wait for a + task's dependencies to be met. This is done via a `timeout`, which defaults to 0, which + indicates that the task should never time out. If the timeout is reached, and the + scheduler still hasn't been able to assign the task to an engine, the task will fail + with a :class:`DependencyTimeout`. + +.. note:: + + Dependencies only work within the task scheduler. You cannot instruct a load-balanced + task to run after a job submitted via the MUX interface. + +The simplest form of Dependencies is with `all=True,success_only=True`. 
In these cases, +you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the +`follow` and `after` keywords to :meth:`client.apply`: + +.. sourcecode:: ipython + + In [14]: client.block=False + + In [15]: ar = client.apply(f, args, kwargs, targets=None) + + In [16]: ar2 = client.apply(f2, targets=None) + + In [17]: ar3 = client.apply(f3, after=[ar,ar2]) + + In [18]: ar4 = client.apply(f3, follow=[ar], timeout=2.5) + + +.. seealso:: + + Some parallel workloads can be described as a `Directed Acyclic Graph + `_, or DAG. See :ref:`DAG + Dependencies ` for an example demonstrating how to map a NetworkX DAG + onto task dependencies. + + + +Impossible Dependencies +*********************** + +The schedulers do perform some analysis on graph dependencies to determine whether they +are impossible to meet. If the scheduler does discover that a dependency cannot be +met, then the task will fail with an :class:`ImpossibleDependency` error. This way, if the +scheduler realizes that a task can never be run, it won't sit indefinitely in the +scheduler clogging the pipeline. + +The basic cases that are checked: + +* depending on nonexistent messages +* `follow` dependencies were run on more than one machine and `all=True` +* any dependencies failed and `all=True,success_only=True` +* all dependencies failed and `all=False,success_only=True` + +.. warning:: + + This analysis has not been proven to be rigorous, so it is still possible for tasks + to become impossible to run in obscure situations; a timeout may be a good safeguard. + +Schedulers +========== + +There are a variety of valid ways to determine where jobs should be assigned in a +load-balancing situation. In IPython, we support several standard schemes, and +even make it easy to define your own. The scheme can be selected via the ``--scheme`` +argument to :command:`ipcontrollerz`, or in the :attr:`HubFactory.scheme` attribute +of a controller config object. + +The built-in routing schemes: + +lru: Least Recently Used + + Always assign work to the least-recently-used engine. A close relative of + round-robin, it will be fair with respect to the number of tasks, agnostic + with respect to runtime of each task. + +plainrandom: Plain Random + Randomly picks an engine on which to run. + +twobin: Two-Bin Random + + **Depends on numpy** + + Pick two engines at random, and use the LRU of the two. This is known to be better + than plain random in many cases, but requires a small amount of computation. + +leastload: Least Load + + **This is the default scheme** + + Always assign tasks to the engine with the fewest outstanding tasks (LRU breaks ties). + +weighted: Weighted Two-Bin Random + + **Depends on numpy** + + Pick two engines at random using the number of outstanding tasks as inverse weights, + and use the one with the lower load. + + +Pure ZMQ Scheduler +------------------ + +For maximum throughput, the 'pure' scheme is not Python at all, but a C-level +:class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``XREQ`` socket to perform all +load-balancing. This scheduler does not support any of the advanced features of the Python +:class:`.Scheduler`. + +Disabled features when using the ZMQ Scheduler: + +* Engine unregistration + Task farming will be disabled if an engine unregisters. + Further, if an engine is unregistered during computation, the scheduler may not recover. +* Dependencies + Since there is no Python logic inside the Scheduler, routing decisions cannot be made + based on message content. 
+* Early destination notification + The Python schedulers know which engine gets which task, and notify the Hub. This + allows graceful handling of Engines coming and going. There is no way to know + where ZeroMQ messages have gone, so there is no way to know what tasks are on which + engine until they *finish*. This makes recovery from engine shutdown very difficult. + + +.. note:: + + TODO: performance comparisons + + More details ============ @@ -125,8 +389,7 @@ The following is an overview of how to use these classes together: tasks, or use the :meth:`AsyncResult.get` method of the results to wait for and then receive the results. -We are in the process of developing more detailed information about the task -interface. For now, the docstrings of the :meth:`Client.apply`, -and :func:`depend` methods should be consulted. +.. seealso:: + A demo of :ref:`DAG Dependencies ` with NetworkX and IPython. diff --git a/docs/source/parallelz/parallel_winhpc.txt b/docs/source/parallelz/parallel_winhpc.txt index 04772627cb5..eedd8c64c8a 100644 --- a/docs/source/parallelz/parallel_winhpc.txt +++ b/docs/source/parallelz/parallel_winhpc.txt @@ -123,7 +123,7 @@ opening a Windows Command Prompt and typing ``ipython``. This will start IPython's interactive shell and you should see something like the following screenshot: -.. image:: ipython_shell.* +.. image:: ../parallel/ipython_shell.* Starting an IPython cluster =========================== @@ -171,7 +171,7 @@ You should see a number of messages printed to the screen, ending with "IPython cluster: started". The result should look something like the following screenshot: -.. image:: ipclusterz_start.* +.. image:: ../parallel/ipcluster_start.* At this point, the controller and two engines are running on your local host. This configuration is useful for testing and for situations where you want to @@ -213,7 +213,7 @@ The output of this command is shown in the screenshot below. Notice how :command:`ipclusterz` prints out the location of the newly created cluster directory. -.. image:: ipclusterz_create.* +.. image:: ../parallel/ipcluster_create.* Configuring a cluster profile ----------------------------- @@ -282,7 +282,7 @@ must be run again to regenerate the XML job description files. The following screenshot shows what the HPC Job Manager interface looks like with a running IPython cluster. -.. image:: hpc_job_manager.* +.. image:: ../parallel/hpc_job_manager.* Performing a simple interactive parallel computation ==================================================== @@ -333,5 +333,5 @@ The :meth:`map` method has the same signature as Python's builtin :func:`map` function, but runs the calculation in parallel. More involved examples of using :class:`MultiEngineClient` are provided in the examples that follow. -.. image:: mec_simple.* +.. image:: ../parallel/mec_simple.* diff --git a/docs/source/parallelz/simpledag.pdf b/docs/source/parallelz/simpledag.pdf new file mode 100644 index 00000000000..5aadf0dfe27 Binary files /dev/null and b/docs/source/parallelz/simpledag.pdf differ diff --git a/docs/source/parallelz/simpledag.png b/docs/source/parallelz/simpledag.png new file mode 100644 index 00000000000..907c2becad0 Binary files /dev/null and b/docs/source/parallelz/simpledag.png differ
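A minimal usage sketch of the `after`/`follow`/`timeout` keywords documented in the new ``parallel_task.txt`` section above. This is illustrative only and not part of the patch: it assumes a running cluster started with ``ipclusterz``, and the task functions ``setup``, ``compute``, and ``cleanup`` are hypothetical placeholders.

.. sourcecode:: python

    # sketch: chaining load-balanced tasks with dependency keywords
    from IPython.zmq.parallel import client as cmod

    rc = cmod.Client()

    def setup():
        return 'setup done'

    def compute():
        return 'computed'

    def cleanup():
        return 'cleaned up'

    # two independent tasks on the load-balanced scheduler (targets=None)
    ar1 = rc.apply(setup, targets=None, block=False)
    ar2 = rc.apply(compute, targets=None, block=False)

    # run only after both finish successfully
    # (all=True and success_only=True are the defaults)
    ar3 = rc.apply(cleanup, targets=None, block=False, after=[ar1, ar2])

    # run on an engine where ar1 ran, but fail with DependencyTimeout
    # if the scheduler cannot place it within 2.5 seconds
    ar4 = rc.apply(cleanup, targets=None, block=False, follow=[ar1], timeout=2.5)

    # wait for the dependent tasks and collect their results
    rc.barrier([ar3, ar4])
    print ar3.get(), ar4.get()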