Skip to content
Browse files

added preliminary tests for zmq.parallel

  • Loading branch information...
1 parent 03b00f7 commit 39eab52f53f11eec8661c6d51669840938bf4ca1 @minrk minrk committed Jan 28, 2011
View
51 IPython/zmq/parallel/tests/__init__.py
@@ -0,0 +1,51 @@
+"""toplevel setup/teardown for prallel tests."""
+import time
+
+from IPython.zmq.parallel.ipcluster import launch_process
+from IPython.zmq.parallel.entry_point import select_random_ports
+# from multiprocessing import Process
+
+cluster_logs = dict(
+ regport=0,
+ processes = [],
+)
+
+def setup():
+ p = select_random_ports(1)[0]
+ cluster_logs['regport']=p
+ cp = launch_process('controller',('--scheduler lru --ping 100 --regport %i'%p).split())
+ # cp.start()
+ cluster_logs['processes'].append(cp)
+ add_engine(p)
+ time.sleep(2)
+
+def add_engine(port=None):
+ if port is None:
+ port = cluster_logs['regport']
+ ep = launch_process('engine', ['--regport',str(port)])
+ # ep.start()
+ cluster_logs['processes'].append(ep)
+ return ep
+
+def teardown():
+ time.sleep(1)
+ processes = cluster_logs['processes']
+ while processes:
+ p = processes.pop()
+ if p.poll() is None:
+ try:
+ print 'terminating'
+ p.terminate()
+ except Exception, e:
+ print e
+ pass
+ if p.poll() is None:
+ time.sleep(.25)
+ if p.poll() is None:
+ try:
+ print 'killing'
+ p.kill()
+ except:
+ print "couldn't shutdown process: ",p
+
+
View
55 IPython/zmq/parallel/tests/clienttest.py
@@ -0,0 +1,55 @@
+import time
+from signal import SIGINT
+from multiprocessing import Process
+
+from zmq.tests import BaseZMQTestCase
+
+from IPython.zmq.parallel.ipcluster import launch_process
+from IPython.zmq.parallel.entry_point import select_random_ports
+from IPython.zmq.parallel.client import Client
+from IPython.zmq.parallel.tests import cluster_logs,add_engine
+
+
+class ClusterTestCase(BaseZMQTestCase):
+
+ def add_engines(self, n=1):
+ """add multiple engines to our cluster"""
+ for i in range(n):
+ self.engines.append(add_engine())
+
+ def wait_on_engines(self):
+ """wait for our engines to connect."""
+ while len(self.client.ids) < len(self.engines)+self.base_engine_count:
+ time.sleep(0.1)
+
+ def start_cluster(self, n=1):
+ """start a cluster"""
+ raise NotImplementedError("Don't use this anymore")
+ rport = select_random_ports(1)[0]
+ args = [ '--regport', str(rport), '--ip', '127.0.0.1' ]
+ cp = launch_process('controller', args)
+ eps = [ launch_process('engine', args+['--ident', 'engine-%i'%i]) for i in range(n) ]
+ return rport, args, cp, eps
+
+ def connect_client(self, port=None):
+ """connect a client with my Context, and track its sockets for cleanup"""
+ if port is None:
+ port = cluster_logs['regport']
+ c = Client('tcp://127.0.0.1:%i'%port,context=self.context)
+ for name in filter(lambda n:n.endswith('socket'), dir(c)):
+ self.sockets.append(getattr(c, name))
+ return c
+
+ def setUp(self):
+ BaseZMQTestCase.setUp(self)
+ self.client = self.connect_client()
+ self.base_engine_count=len(self.client.ids)
+ self.engines=[]
+
+ def tearDown(self):
+ [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ]
+ # while len(self.client.ids) > self.base_engine_count:
+ # time.sleep(.1)
+ del self.engines
+ BaseZMQTestCase.tearDown(self)
+
View
42 IPython/zmq/parallel/tests/test_client.py
@@ -0,0 +1,42 @@
+import time
+
+from IPython.zmq.parallel.view import LoadBalancedView, DirectView
+
+from clienttest import ClusterTestCase
+
+class TestClient(ClusterTestCase):
+
+ def test_ids(self):
+ self.assertEquals(len(self.client.ids), 1)
+ self.add_engines(3)
+ self.wait_on_engines()
+ self.assertEquals(self.client.ids, set(range(4)))
+
+ def test_segfault(self):
+ def segfault():
+ import ctypes
+ ctypes.memset(-1,0,1)
+ self.client[0].apply(segfault)
+ while 0 in self.client.ids:
+ time.sleep(.01)
+ self.client.spin()
+
+ def test_view_indexing(self):
+ self.add_engines(7)
+ self.wait_on_engines()
+ targets = self.client._build_targets('all')[-1]
+ v = self.client[:]
+ self.assertEquals(v.targets, targets)
+ v =self.client[2]
+ self.assertEquals(v.targets, 2)
+ v =self.client[1,2]
+ self.assertEquals(v.targets, [1,2])
+ v =self.client[::2]
+ self.assertEquals(v.targets, targets[::2])
+ v =self.client[1::3]
+ self.assertEquals(v.targets, targets[1::3])
+ v =self.client[:-3]
+ self.assertEquals(v.targets, targets[:-3])
+ v =self.client[None]
+ self.assert_(isinstance(v, LoadBalancedView))
+

0 comments on commit 39eab52

Please sign in to comment.
Something went wrong with that request. Please try again.