Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

tweak dagdeps for new AsyncResult objects

  • Loading branch information...
commit 9f1a03abe249bf089519eec18dba3867c662621e 1 parent 2d79d3e
@minrk minrk authored
Showing with 10 additions and 7 deletions.
  1. +10 −7 examples/demo/dag/dagdeps.py
View
17 examples/demo/dag/dagdeps.py
@@ -55,11 +55,11 @@ def make_bintree(levels):
def submit_jobs(client, G, jobs):
"""Submit jobs via client where G describes the time dependencies."""
- msg_ids = {}
+ results = {}
for node in nx.topological_sort(G):
- deps = [ msg_ids[n] for n in G.predecessors(node) ]
- msg_ids[node] = client.apply(jobs[node], after=deps)
- return msg_ids
+ deps = [ results[n].msg_ids[0] for n in G.predecessors(node) ]
+ results[node] = client.apply(jobs[node], after=deps)
+ return results
def validate_tree(G, times):
"""Validate that jobs executed after their dependencies."""
@@ -76,6 +76,7 @@ def main(nodes, edges):
in-degree on the y (just for spread). All arrows must
point at least slightly to the right if the graph is valid.
"""
+ print "building DAG"
G = random_dag(nodes, edges)
jobs = {}
msg_ids = {}
@@ -85,11 +86,13 @@ def main(nodes, edges):
jobs[node] = randomwait
client = cmod.Client('tcp://127.0.0.1:10101')
-
- msg_ids = submit_jobs(client, G, jobs)
+ print "submitting tasks"
+ results = submit_jobs(client, G, jobs)
+ print "waiting for results"
client.barrier()
+ print "done"
for node in G:
- times[node] = client.results[msg_ids[node]]
+ times[node] = results[node].get()
pos[node] = (times[node], G.in_degree(node)+random())
validate_tree(G, times)
Please sign in to comment.
Something went wrong with that request. Please try again.