
some initial tests for newparallel

minrk committed Feb 26, 2011
1 parent 154798b commit e0abb03577399784091c63f66cfc57fb73a48c08
@@ -1,35 +1,30 @@
-"""toplevel setup/teardown for prallel tests."""
+"""toplevel setup/teardown for parallel tests."""
+
import time
+from subprocess import Popen, PIPE
from IPython.zmq.parallel.ipcluster import launch_process
from IPython.zmq.parallel.entry_point import select_random_ports
-# from multiprocessing import Process
-cluster_logs = dict(
- regport=0,
- processes = [],
-)
+processes = []
+
+# nose setup/teardown
def setup():
- p = select_random_ports(1)[0]
- cluster_logs['regport']=p
- cp = launch_process('controller',('--scheduler lru --ping 100 --regport %i'%p).split())
- # cp.start()
- cluster_logs['processes'].append(cp)
- add_engine(p)
- time.sleep(2)
+ cp = Popen('ipcontrollerz --profile iptest -r --log-level 40'.split(), stdout=PIPE, stdin=PIPE, stderr=PIPE)
+ processes.append(cp)
+ time.sleep(.5)
+ add_engine()
+ time.sleep(3)
-def add_engine(port=None):
- if port is None:
- port = cluster_logs['regport']
- ep = launch_process('engine', ['--regport',str(port)])
+def add_engine(profile='iptest'):
+ ep = Popen(['ipenginez', '--profile', profile, '--log-level', '40'], stdout=PIPE, stdin=PIPE, stderr=PIPE)
# ep.start()
- cluster_logs['processes'].append(ep)
+ processes.append(ep)
return ep
def teardown():
time.sleep(1)
- processes = cluster_logs['processes']
while processes:
p = processes.pop()
if p.poll() is None:
@@ -48,4 +43,3 @@ def teardown():
except:
print "couldn't shutdown process: ",p
-
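The setup()/teardown() pair above works because nose treats functions with those names in a test package's __init__.py as package-level fixtures, run once around all tests in the package. A minimal, self-contained sketch of that pattern (the sleep subprocess is only a stand-in for ipcontrollerz/ipenginez, not part of this commit):

    import subprocess
    import time

    processes = []  # module-level registry, as in the diff above

    def setup():
        # nose calls this once, before any test in the package runs
        p = subprocess.Popen(['sleep', '60'])  # stand-in for a controller/engine process
        processes.append(p)
        time.sleep(0.5)

    def teardown():
        # nose calls this once, after the last test; terminate anything still alive
        while processes:
            p = processes.pop()
            if p.poll() is None:
                p.terminate()
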
@@ -2,40 +2,70 @@
from signal import SIGINT
from multiprocessing import Process
+from nose import SkipTest
+
from zmq.tests import BaseZMQTestCase
+from IPython.external.decorator import decorator
+
from IPython.zmq.parallel.ipcluster import launch_process
from IPython.zmq.parallel.entry_point import select_random_ports
from IPython.zmq.parallel.client import Client
-from IPython.zmq.parallel.tests import cluster_logs,add_engine
+from IPython.zmq.parallel.tests import processes,add_engine
+
+# simple tasks for use in apply tests
+
+def segfault():
+ """"""
+ import ctypes
+ ctypes.memset(-1,0,1)
+
+def wait(n):
+ """sleep for a time"""
+ import time
+ time.sleep(n)
+ return n
+
+def raiser(eclass):
+ """raise an exception"""
+ raise eclass()
+
+# test decorator for skipping tests when libraries are unavailable
+def skip_without(*names):
+ """skip a test if some names are not importable"""
+ @decorator
+ def skip_without_names(f, *args, **kwargs):
+ """decorator to skip tests in the absence of numpy."""
+ for name in names:
+ try:
+ __import__(name)
+ except ImportError:
+ raise SkipTest
+ return f(*args, **kwargs)
+ return skip_without_names
class ClusterTestCase(BaseZMQTestCase):
- def add_engines(self, n=1):
+ def add_engines(self, n=1, block=True):
"""add multiple engines to our cluster"""
for i in range(n):
self.engines.append(add_engine())
+ if block:
+ self.wait_on_engines()
- def wait_on_engines(self):
+ def wait_on_engines(self, timeout=5):
"""wait for our engines to connect."""
- while len(self.client.ids) < len(self.engines)+self.base_engine_count:
+ n = len(self.engines)+self.base_engine_count
+ tic = time.time()
+ while time.time()-tic < timeout and len(self.client.ids) < n:
time.sleep(0.1)
+
+ assert len(self.client.ids) >= n, "waiting for engines timed out"
- def start_cluster(self, n=1):
- """start a cluster"""
- raise NotImplementedError("Don't use this anymore")
- rport = select_random_ports(1)[0]
- args = [ '--regport', str(rport), '--ip', '127.0.0.1' ]
- cp = launch_process('controller', args)
- eps = [ launch_process('engine', args+['--ident', 'engine-%i'%i]) for i in range(n) ]
- return rport, args, cp, eps
-
- def connect_client(self, port=None):
+ def connect_client(self):
"""connect a client with my Context, and track its sockets for cleanup"""
- if port is None:
- port = cluster_logs['regport']
- c = Client('tcp://127.0.0.1:%i'%port,context=self.context)
+ c = Client(profile='iptest',context=self.context)
for name in filter(lambda n:n.endswith('socket'), dir(c)):
self.sockets.append(getattr(c, name))
return c
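
A brief usage sketch for the skip_without decorator defined above; the test class and body here are hypothetical (the commit's real usage appears in test_newserialized.py below):

    class TestOptionalDeps(ClusterTestCase):

        @skip_without('numpy')
        def test_numpy_sum(self):
            # runs only when numpy is importable; otherwise nose records a skip
            import numpy
            a = numpy.arange(10)
            self.assertEquals(a.sum(), 45)
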
@@ -1,42 +1,117 @@
import time
+import nose.tools as nt
+
+from IPython.zmq.parallel.asyncresult import AsyncResult
from IPython.zmq.parallel.view import LoadBalancedView, DirectView
-from clienttest import ClusterTestCase
+from clienttest import ClusterTestCase, segfault
class TestClient(ClusterTestCase):
def test_ids(self):
self.assertEquals(len(self.client.ids), 1)
self.add_engines(3)
- self.wait_on_engines()
- self.assertEquals(self.client.ids, set(range(4)))
+ self.assertEquals(len(self.client.ids), 4)
def test_segfault(self):
- def segfault():
- import ctypes
- ctypes.memset(-1,0,1)
- self.client[0].apply(segfault)
- while 0 in self.client.ids:
+ self.add_engines(1)
+ eid = self.client.ids[-1]
+ self.client[eid].apply(segfault)
+ while eid in self.client.ids:
time.sleep(.01)
self.client.spin()
def test_view_indexing(self):
- self.add_engines(7)
- self.wait_on_engines()
+ self.add_engines(4)
targets = self.client._build_targets('all')[-1]
v = self.client[:]
self.assertEquals(v.targets, targets)
- v =self.client[2]
- self.assertEquals(v.targets, 2)
- v =self.client[1,2]
- self.assertEquals(v.targets, [1,2])
- v =self.client[::2]
+ t = self.client.ids[2]
+ v = self.client[t]
+ self.assert_(isinstance(v, DirectView))
+ self.assertEquals(v.targets, t)
+ t = self.client.ids[2:4]
+ v = self.client[t]
+ self.assert_(isinstance(v, DirectView))
+ self.assertEquals(v.targets, t)
+ v = self.client[::2]
+ self.assert_(isinstance(v, DirectView))
self.assertEquals(v.targets, targets[::2])
- v =self.client[1::3]
+ v = self.client[1::3]
+ self.assert_(isinstance(v, DirectView))
self.assertEquals(v.targets, targets[1::3])
- v =self.client[:-3]
+ v = self.client[:-3]
+ self.assert_(isinstance(v, DirectView))
self.assertEquals(v.targets, targets[:-3])
- v =self.client[None]
- self.assert_(isinstance(v, LoadBalancedView))
+ nt.assert_raises(TypeError, lambda : self.client[None])
+
+ def test_view_cache(self):
+ """test blocking and non-blocking behavior"""
+ v = self.client[:2]
+ v2 = self.client[:2]
+ self.assertTrue(v is v2)
+ v = self.client.view()
+ v2 = self.client.view(balanced=True)
+ self.assertTrue(v is v2)
+
+ def test_targets(self):
+ """test various valid targets arguments"""
+ pass
+
+ def test_clear(self):
+ """test clear behavior"""
+ # self.add_engines(4)
+ # self.client.push()
+
+ def test_push_pull(self):
+ data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
+ self.add_engines(4)
+ push = self.client.push
+ pull = self.client.pull
+ self.client.block=True
+ nengines = len(self.client)
+ push({'data':data}, targets=0)
+ d = pull('data', targets=0)
+ self.assertEquals(d, data)
+ push({'data':data})
+ d = pull('data')
+ self.assertEquals(d, nengines*[data])
+ ar = push({'data':data}, block=False)
+ self.assertTrue(isinstance(ar, AsyncResult))
+ r = ar.get()
+ ar = pull('data', block=False)
+ self.assertTrue(isinstance(ar, AsyncResult))
+ r = ar.get()
+ self.assertEquals(r, nengines*[data])
+ push(dict(a=10,b=20))
+ r = pull(('a','b'))
+ self.assertEquals(r, nengines*[[10,20]])
+
+ def test_push_pull_function(self):
+ def testf(x):
+ return 2.0*x
+
+ self.add_engines(4)
+ self.client.block=True
+ push = self.client.push
+ pull = self.client.pull
+ execute = self.client.execute
+ push({'testf':testf}, targets=0)
+ r = pull('testf', targets=0)
+ self.assertEqual(r(1.0), testf(1.0))
+ execute('r = testf(10)', targets=0)
+ r = pull('r', targets=0)
+ self.assertEquals(r, testf(10))
+ ar = push({'testf':testf}, block=False)
+ ar.get()
+ ar = pull('testf', block=False)
+ rlist = ar.get()
+ for r in rlist:
+ self.assertEqual(r(1.0), testf(1.0))
+ execute("def g(x): return x*x", targets=0)
+ r = pull(('testf','g'),targets=0)
+ self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
+
+
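A condensed sketch of the blocking/non-blocking pattern that test_push_pull exercises above, assuming a running 'iptest' cluster such as the one started by the package fixtures:

    from IPython.zmq.parallel.client import Client
    from IPython.zmq.parallel.asyncresult import AsyncResult

    client = Client(profile='iptest')
    client.block = True                      # blocking calls return results directly

    ar = client.push({'x': 1}, block=False)  # non-blocking: returns an AsyncResult
    assert isinstance(ar, AsyncResult)
    ar.get()                                 # wait for the push to complete
    print client.pull('x')                   # one value per engine when blocking
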
@@ -1,4 +1,89 @@
+"""test serialization with newserialized"""
from unittest import TestCase
-# from zmq.tests import BaseZMQTest
+import nose.tools as nt
+
+from IPython.testing.parametric import parametric
+from IPython.utils import newserialized as ns
+from IPython.utils.pickleutil import can, uncan, CannedObject, CannedFunction
+from IPython.zmq.parallel.tests.clienttest import skip_without
+
+
+class CanningTestCase(TestCase):
+ def test_canning(self):
+ d = dict(a=5,b=6)
+ cd = can(d)
+ nt.assert_true(isinstance(cd, dict))
+
+ def test_canned_function(self):
+ f = lambda : 7
+ cf = can(f)
+ nt.assert_true(isinstance(cf, CannedFunction))
+
+ @parametric
+ def test_can_roundtrip(cls):
+ objs = [
+ dict(),
+ set(),
+ list(),
+ ['a',1,['a',1],u'e'],
+ ]
+ return map(cls.run_roundtrip, objs)
+
+ @classmethod
+ def run_roundtrip(cls, obj):
+ o = uncan(can(obj))
+ nt.assert_equals(obj, o)
+
+ def test_serialized_interfaces(self):
+
+ us = {'a':10, 'b':range(10)}
+ s = ns.serialize(us)
+ uus = ns.unserialize(s)
+ nt.assert_true(isinstance(s, ns.SerializeIt))
+ nt.assert_equals(uus, us)
+
+ def test_pickle_serialized(self):
+ obj = {'a':1.45345, 'b':'asdfsdf', 'c':10000L}
+ original = ns.UnSerialized(obj)
+ originalSer = ns.SerializeIt(original)
+ firstData = originalSer.getData()
+ firstTD = originalSer.getTypeDescriptor()
+ firstMD = originalSer.getMetadata()
+ nt.assert_equals(firstTD, 'pickle')
+ nt.assert_equals(firstMD, {})
+ unSerialized = ns.UnSerializeIt(originalSer)
+ secondObj = unSerialized.getObject()
+ for k, v in secondObj.iteritems():
+ nt.assert_equals(obj[k], v)
+ secondSer = ns.SerializeIt(ns.UnSerialized(secondObj))
+ nt.assert_equals(firstData, secondSer.getData())
+ nt.assert_equals(firstTD, secondSer.getTypeDescriptor() )
+ nt.assert_equals(firstMD, secondSer.getMetadata())
+
+ @skip_without('numpy')
+ def test_ndarray_serialized(self):
+ import numpy
+ a = numpy.linspace(0.0, 1.0, 1000)
+ unSer1 = ns.UnSerialized(a)
+ ser1 = ns.SerializeIt(unSer1)
+ td = ser1.getTypeDescriptor()
+ nt.assert_equals(td, 'ndarray')
+ md = ser1.getMetadata()
+ nt.assert_equals(md['shape'], a.shape)
+ nt.assert_equals(md['dtype'], a.dtype.str)
+ buff = ser1.getData()
+ nt.assert_equals(buff, numpy.getbuffer(a))
+ s = ns.Serialized(buff, td, md)
+ final = ns.unserialize(s)
+ nt.assert_equals(numpy.getbuffer(a), numpy.getbuffer(final))
+ nt.assert_true((a==final).all())
+ nt.assert_equals(a.dtype.str, final.dtype.str)
+ nt.assert_equals(a.shape, final.shape)
+ # test non-copying:
+ a[2] = 1e9
+ nt.assert_true((a==final).all())
+
+
+
