Skip to content
Browse files

added py4science demos as examples + NetworkX DAG dependencies

  • Loading branch information...
1 parent f1b3eb2 commit b78cec8e3c84dd2812615c2e6e52a9305b8d3ad6 @minrk minrk committed Oct 28, 2010
View
103 examples/demo/dagdeps.py
@@ -0,0 +1,103 @@
+"""Example for generating an arbitrary DAG as a dependency map.
+
+This demo uses networkx to generate the graph.
+
+Authors
+-------
+* MinRK
+"""
+import networkx as nx
+from random import randint, random
+from IPython.zmq.parallel import client as cmod
+
def randomwait():
    """Sleep for a random fraction of a second, then return the wall-clock time.

    The sleep length is ``random.random()`` seconds, i.e. in [0, 1).
    Imports are kept inside the function body (presumably so the function is
    self-contained when shipped to a remote engine -- confirm).
    """
    import random as _random
    import time as _time

    _time.sleep(_random.random())
    return _time.time()
+
+
def random_dag(nodes, edges):
    """Generate a random Directed Acyclic Graph (DAG).

    Parameters
    ----------
    nodes : int
        Number of nodes; they are labelled ``0 .. nodes-1``.
    edges : int
        Number of distinct edges the returned graph must contain.

    Returns
    -------
    networkx.DiGraph
        An acyclic graph with exactly `edges` edges.

    Raises
    ------
    ValueError
        If `edges` is larger than ``nodes*(nodes-1)/2``, the maximum number
        of edges a DAG on `nodes` nodes can have.  Without this check the
        sampling loop below could never terminate.
    """
    max_edges = nodes * (nodes - 1) // 2 if nodes > 1 else 0
    if edges > max_edges:
        raise ValueError(
            "a DAG with %r nodes can have at most %r edges, %r requested"
            % (nodes, max_edges, edges)
        )

    G = nx.DiGraph()
    G.add_nodes_from(range(nodes))

    remaining = edges
    while remaining > 0:
        # pick two distinct endpoints at random
        a = randint(0, nodes - 1)
        b = a
        while b == a:
            b = randint(0, nodes - 1)
        if G.has_edge(a, b):
            # re-adding an existing edge is a no-op; it must not be counted,
            # otherwise the graph ends up with fewer edges than requested
            continue
        G.add_edge(a, b)
        if nx.is_directed_acyclic_graph(G):
            remaining -= 1
        else:
            # we closed a loop!
            G.remove_edge(a, b)
    return G
+
def add_children(G, parent, level, n=2):
    """Recursively attach `n` children to `parent`, `level` generations deep.

    Each child is named by appending its index (``str(i)``) to the parent's
    name, so `parent` must be a string.  With the default ``n=2`` this grows
    a binary tree.  Nothing is added when `level` is 0.
    """
    if level != 0:
        for suffix in map(str, range(n)):
            child = parent + suffix
            G.add_node(child)
            G.add_edge(parent, child)
            # descend one generation below the child just created
            add_children(G, child, level - 1, n)
+
def make_bintree(levels):
    """Build a symmetrical binary tree, `levels` generations deep.

    The root node is named ``'0'``; descendants are named by
    ``add_children``.  Returns the tree as a ``networkx.DiGraph``.
    """
    tree = nx.DiGraph()
    tree.add_node('0')
    add_children(tree, '0', levels, n=2)
    return tree
+
def submit_jobs(client, G, jobs):
    """Submit one job per node of G, honouring G's edges as time dependencies.

    Nodes are visited in topological order, so every predecessor has already
    been submitted (and has a message id) when a node is reached.  The ids of
    the predecessors are passed to ``client.apply`` as ``after``.

    Returns a dict mapping each node to whatever ``client.apply`` returned
    for it.
    """
    submitted = {}
    for node in nx.topological_sort(G):
        parents = []
        for pred in G.predecessors(node):
            parents.append(submitted[pred])
        submitted[node] = client.apply(jobs[node], after=parents)
    return submitted
+
def validate_tree(G, times):
    """Check that every node's time is strictly later than all its parents'.

    `times` maps node -> completion time.  Raises AssertionError naming the
    first offending (node, parent) pair.
    """
    for node in G:
        finished = times[node]
        for parent in G.predecessors(node):
            parent_finished = times[parent]
            assert finished > parent_finished, "%s should have happened after %s"%(node, parent)
+
def main(nodes, edges):
    """Run the whole demo for a random DAG of the given size.

    Generates the graph, submits one ``randomwait`` job per node with the
    graph's edges as dependencies, waits for all of them, and validates that
    the dependency order was enforced.  Finally the graph is drawn with the
    completion time on the x-axis and the in-degree (plus random jitter, just
    for spread) on the y-axis.  All arrows must point at least slightly to
    the right if the graph is valid.

    Returns the tuple ``(G, times, msg_ids)``.
    """
    G = random_dag(nodes, edges)
    # every node runs the same job
    jobs = dict((node, randomwait) for node in G)

    client = cmod.Client('tcp://127.0.0.1:10101')
    msg_ids = submit_jobs(client, G, jobs)
    client.barrier()

    times = {}
    pos = {}
    for node in G:
        finished = client.results[msg_ids[node]]
        times[node] = finished
        pos[node] = (finished, G.in_degree(node) + random())

    validate_tree(G, times)
    nx.draw(G, pos)
    return G, times, msg_ids
+
if __name__ == '__main__':
    # pylab is only needed to display the figure drawn by main()
    import pylab

    # 32 nodes connected by 128 random edges
    main(nodes=32, edges=128)
    pylab.show()
+
View
35 examples/demo/dependencies.py
@@ -0,0 +1,35 @@
+from IPython.zmq.parallel.client import *
+
+client = Client('tcp://127.0.0.1:10101')
+
@require('numpy')
def norm(A):
    """Return the 2-norm of A, computed with ``numpy.linalg.norm(A, 2)``."""
    import numpy.linalg as linalg
    return linalg.norm(A, 2)
+
def checkpid(pid):
    """Return True if the current process id equals `pid`."""
    import os
    current = os.getpid()
    return current == pid
+
def checkhostname(host):
    """Return True if this machine's hostname equals `host`."""
    import socket
    local_name = socket.gethostname()
    return local_name == host
+
def getpid():
    """Return the id of the process this function runs in."""
    import os
    pid = os.getpid()
    return pid
+
# pid of the process behind target 0, fetched synchronously
pid0 = client.apply(getpid, targets=0, block=True)


@depend(checkpid, pid0)
def getpid2():
    """Return the current process id.

    Decorated with ``depend(checkpid, pid0)``; presumably this restricts
    execution to the engine whose pid is `pid0` -- confirm against the
    ``depend`` documentation.
    """
    import os
    pid = os.getpid()
    return pid
+
# view obtained by indexing the client with None, switched to blocking mode
rns = client[None]
rns.block = True

# one call per registered engine id, first without and then with the
# dependency attached to getpid2
pids1 = []
for _ in range(len(client.ids)):
    pids1.append(rns.apply(getpid))

pids2 = []
for _ in range(len(client.ids)):
    pids2.append(rns.apply(getpid2))

print(pids1)
print(pids2)
View
20 examples/demo/loadbalance.py
@@ -0,0 +1,20 @@
+import time
+from IPython.zmq.parallel.client import *
+
def wait(t):
    """Sleep for `t` seconds and return `t`."""
    from time import sleep
    sleep(t)
    return t
+
client = Client('tcp://127.0.0.1:10101')
view = client[None]

tic = time.time()
for step in range(128):
    # job number `step` sleeps for step * 10 ms
    view.apply(wait, 1e-2*step)
    # throttle submission: pause 10 ms between messages (~100 msgs/s)
    time.sleep(1e-2)

# wait for every submitted job, then report the total elapsed time
client.barrier()
toc = time.time()
print(toc - tic)
View
20 examples/demo/map.py
@@ -0,0 +1,20 @@
+from IPython.zmq.parallel.client import *
+
+client = Client('tcp://127.0.0.1:10101')
+
@remote(client, block=True)
def square(a):
    """Return `a` multiplied by itself."""
    squared = a * a
    return squared
+
# square was declared with block=True, so each call waits for its result
squares = list(map(square, range(42)))

# but that blocked between each result, not exactly useful
square.block = False
# materialise the ids: they are used twice below (barrier, then lookup)
msg_ids = list(map(square, range(42)))
# submitted very fast
# wait for them to be done:
# (the original passed the undefined name `msg_id`, which raised NameError)
client.barrier(msg_ids)
squares2 = list(map(client.results.get, msg_ids))
print(squares == squares2)
# True
View
44 examples/demo/noncopying.py
@@ -0,0 +1,44 @@
+"""non-copying sends"""
+import zmq
+import numpy
+
n = 10
iface = 'inproc://pub'

ctx = zmq.Context()

# publisher side
p = ctx.socket(zmq.PUB)
p.bind(iface)


def _subscriber():
    """Return a SUB socket connected to `iface` and subscribed to everything."""
    sock = ctx.socket(zmq.SUB)
    sock.connect(iface)
    sock.setsockopt(zmq.SUBSCRIBE, '')
    return sock


# connect 2 subs
s1 = _subscriber()
s2 = _subscriber()

A = numpy.random.random((1024,1024))

# send the array without copying it
p.send(A, copy=False)

# subscriber 1: receive without copying and wrap the message buffer directly
msg1 = s1.recv(copy=False)
B1 = numpy.frombuffer(msg1.buffer, dtype=A.dtype)
B1 = B1.reshape(A.shape)

# subscriber 2: receive the same way, but go through msg.bytes, which copies
msg2 = s2.recv(copy=False)
B2 = numpy.frombuffer(buffer(msg2.bytes), dtype=A.dtype)
B2 = B2.reshape(A.shape)

print((B1 == B2).all())
print((B1 == A).all())
A[0][0] += 10
print("~")
# after changing A in-place, B1 changes too, proving non-copying sends
print((B1 == A).all())
# but B2 is fixed, since it called the msg.bytes attr, which copies
print((B1 == B2).all())
+
+
+
+
View
22 examples/demo/remotefunction.py
@@ -0,0 +1,22 @@
+from IPython.zmq.parallel.client import *
+
+client = Client('tcp://127.0.0.1:10101')
+
@remote(client, bound=True)
def getkey(name):
    """Look up `name` in globals(); return None when it is not defined."""
    namespace = globals()
    return namespace.get(name)
+
@remote(client, bound=True, targets='all')
def setpids():
    """Store the current process id in globals() under the key 'pid'."""
    import os
    globals().update(pid=os.getpid())
+
# store each target's pid in its globals
setpids()
# read it back with getkey's initial settings
getkey('pid')

# same lookup, now restricted to targets 1 and 2
getkey.targets = [1, 2]
getkey('pid')

# with bound switched off the lookup is expected to come back as None
# (presumably globals() then no longer refers to the engine namespace -- confirm)
getkey.bound = False
getkey('pid') is None
+
View
89 examples/demo/throughput.py
@@ -0,0 +1,89 @@
+import time
+import numpy as np
+from IPython.zmq.parallel import client as clientmod
+
# Benchmark grids, log-spaced in base 2 and truncated to integers.
# nlist / nlist2: 2**2 .. 2**9 and 2**2 .. 2**8
nlist = list(map(int, np.logspace(2, 9, 16, base=2)))
nlist2 = list(map(int, np.logspace(2, 8, 15, base=2)))
# tlist: 2**7 .. 2**22
tlist = list(map(int, np.logspace(7, 22, 16, base=2)))
nt = 16
def wait(t=0):
    """Sleep for `t` seconds (default 0); returns nothing."""
    from time import sleep
    sleep(t)
+
def echo(s=''):
    """Return the argument unchanged (default: the empty string)."""
    result = s
    return result
+
def time_throughput(nmessages, t=0, f=wait):
    """Time the submission and completion of `nmessages` calls of ``f(t)``.

    When `f` is ``echo`` the argument sent is a random numpy array of
    ``t/8`` elements instead of `t` itself (presumably so the payload is
    about `t` bytes -- confirm).

    Returns ``(submit_time, total_time)`` in seconds: the time until all
    messages were submitted, and the time until ``client.barrier()``
    returned, both measured from the same start.
    """
    client = clientmod.Client('tcp://127.0.0.1:10101')
    view = client[None]

    arg = t
    if f is echo:
        arg = np.random.random(t/8)

    # do one ping before starting timing
    view.apply_sync(echo, '')
    client.spin()

    start = time.time()
    for _ in range(nmessages):
        view.apply(f, arg)
    submitted = time.time()
    client.barrier()
    finished = time.time()
    return submitted - start, finished - start
+
def time_twisted(nmessages, t=0, f=wait):
    """Time `nmessages` tasks through the ``IPython.kernel`` task client.

    Counterpart of ``time_throughput`` with the same return value:
    ``(submit_time, total_time)`` in seconds.

    Only two workloads are supported, selected by identity of `f`:

    * ``wait`` -- each task runs ``time.sleep(t)``
    * ``echo`` -- each task pushes a random numpy array of ``t/8`` elements
      and pulls it back

    Raises
    ------
    ValueError
        If `f` is neither ``wait`` nor ``echo``.  (The original used a bare
        ``raise`` here, which has no active exception to re-raise and so
        failed with an unrelated error.)
    """
    from IPython.kernel import client as kc
    client = kc.TaskClient()
    if f is wait:
        s = "import time; time.sleep(%f)"%t
        task = kc.StringTask(s)
    elif f is echo:
        t = np.random.random(t/8)
        s = "s=t"
        task = kc.StringTask(s, push=dict(t=t), pull=['s'])
    else:
        raise ValueError("time_twisted only supports f=wait or f=echo, got %r" % (f,))
    # do one ping before starting timing
    client.barrier(client.run(task))
    tic = time.time()
    tids = [client.run(task) for _ in range(nmessages)]
    lap = time.time()
    client.barrier(tids)
    toc = time.time()
    return lap-tic, toc-tic
+
def do_runs(nlist,t=0,f=wait, trials=2, runner=time_throughput):
    """Measure message rates for each message count in `nlist`.

    For every ``n`` in `nlist`, ``runner(n, t, f)`` is called `trials` times
    (with a 0.25 s pause before each call) and its two timings are averaged.
    Each row of the result holds ``n`` divided by the two mean timings, and
    is printed next to ``n`` as it is computed.

    Returns an array of shape ``(len(nlist), 2)``.
    """
    rates = np.zeros((len(nlist), 2))
    for row, n in enumerate(nlist):
        submit_total = 0
        done_total = 0
        for _ in range(trials):
            time.sleep(.25)
            timings = runner(n, t, f)
            submit_total += timings[0]
            done_total += timings[1]
        submit_total /= trials
        done_total /= trials
        # store the mean timings first so they are cast to the array's dtype,
        # then turn them into messages per second
        rates[row] = (submit_total, done_total)
        rates[row] = n/rates[row]
        print("%s %s" % (n, rates[row]))
    return rates
+
def do_echo(n,tlist=[0],f=echo, trials=2, runner=time_throughput):
    """Measure message rates for `n` messages at each size in `tlist`.

    For every ``t`` in `tlist`, ``runner(n, t, f)`` is called `trials` times
    (with a 0.25 s pause before each call) and its two timings are averaged.
    Each row of the result holds ``n`` divided by the two mean timings, and
    is printed next to ``t`` as it is computed.

    Returns an array of shape ``(len(tlist), 2)``.
    """
    rates = np.zeros((len(tlist), 2))
    for row, size in enumerate(tlist):
        submit_total = 0
        done_total = 0
        for _ in range(trials):
            time.sleep(.25)
            timings = runner(n, size, f)
            submit_total += timings[0]
            done_total += timings[1]
        submit_total /= trials
        done_total /= trials
        # store the mean timings first so they are cast to the array's dtype,
        # then turn them into messages per second
        rates[row] = (submit_total, done_total)
        rates[row] = n/rates[row]
        print("%s %s" % (size, rates[row]))
    return rates
+
def start_cluster(n, scheduler):
    """Placeholder: currently does nothing and returns None."""
    return None
+
View
15 examples/demo/views.py
@@ -0,0 +1,15 @@
+from IPython.zmq.parallel.client import *
+
client = Client('tcp://127.0.0.1:10101')

# give every engine a variable `ids` holding the square of its own id
for engine_id in client.ids:
    client.push(dict(ids=engine_id*engine_id), targets=engine_id)

# view on engine 0: item assignment and lookup operate on its variables
rns = client[0]
rns['a'] = 5

print(rns['a'])

# view obtained by slicing the client with [:]
remotes = client[:]

print(remotes['ids'])

0 comments on commit b78cec8

Please sign in to comment.
Something went wrong with that request. Please try again.