improve multiprocessing test cases (#57)
svenkreiss committed May 1, 2017
1 parent 4595a7c commit 75b7f26
Showing 1 changed file with 106 additions and 109 deletions.
215 changes: 106 additions & 109 deletions tests/test_multiprocessing.py
@@ -1,4 +1,4 @@
from __future__ import print_function
from __future__ import division, print_function

import cloudpickle
from concurrent import futures
@@ -8,160 +8,160 @@
import os
import pickle
import pprint
from pysparkling import Context
from random import random, choice
import pysparkling
import random
import time
import timeit
import unittest


def test_multiprocessing():
p = multiprocessing.Pool(4)
c = Context(pool=p, serializer=cloudpickle.dumps,
deserializer=pickle.loads)
my_rdd = c.parallelize([1, 3, 4])
r = my_rdd.map(lambda x: x ** 2).collect()
print(r)
assert 16 in r
class Processor(object):
"""This modifies lines but also keeps track whether it was executed."""
def __init__(self):
self.executed = False

def indent_line(self, line):
self.executed = True
return '--- {}'.format(line)
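# Processor records whether indent_line ever ran, which lets the tests
# below distinguish merely composing a pipeline from actually executing it.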

def test_concurrent():
with futures.ThreadPoolExecutor(4) as p:
my_rdd = Context(pool=p).parallelize([1, 3, 4])
r = my_rdd.map(math.sqrt).collect()
print(r)
assert 2 in r

class LazyTestInjection(object):
def lazy_execution_test(self):
r = self.sc.textFile('tests/test_multiprocessing.py')

def test_first_mp():
p = multiprocessing.Pool(4)
c = Context(pool=p, serializer=cloudpickle.dumps,
deserializer=pickle.loads)
my_rdd = c.parallelize([1, 2, 2, 4, 1, 3, 5, 9], 3)
print(my_rdd.first())
assert my_rdd.first() == 1
processor = Processor()

r = r.map(processor.indent_line)
self.assertFalse(processor.executed)
r = r.map(processor.indent_line).cache()
self.assertFalse(processor.executed)
r = r.map(processor.indent_line)
r.collect()
self.assertTrue(processor.executed)
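# What the assertions above pin down (illustrative sketch, not part of the
# original file, assuming only the parallelize/map/collect API used here):
# map() merely records a transformation; nothing executes until an action
# such as collect() pulls data through, e.g.:
#     calls = []
#     rdd = sc.parallelize([1, 2, 3]).map(lambda x: calls.append(x) or x)
#     assert calls == []    # pipeline built, nothing ran yet
#     rdd.collect()
#     assert calls != []    # the action triggered the map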

def test_lazy_execution():

class I(object):
def __init__(self):
self.executed = False
class Multiprocessing(unittest.TestCase):
def setUp(self):
pool = multiprocessing.Pool(4)
self.sc = pysparkling.Context(pool=pool,
serializer=cloudpickle.dumps,
deserializer=pickle.loads)
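# cloudpickle.dumps is paired with pickle.loads here because cloudpickle can
# serialize lambdas and other locally defined functions that the standard
# pickle module rejects, while its output is an ordinary pickle stream that
# plain pickle.loads can read back.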

def indent_line(self, l):
# global indent_was_executed
self.executed = True
return '--- ' + l
def test_basic(self):
my_rdd = self.sc.parallelize([1, 3, 4])
r = my_rdd.map(lambda x: x ** 2).collect()
self.assertIn(16, r)

r = Context().textFile('tests/test_multiprocessing.py')
i = I()
def test_first(self):
my_rdd = self.sc.parallelize([1, 2, 2, 4, 1, 3, 5, 9], 3)
self.assertEqual(my_rdd.first(), 1)
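# With the data split into 3 partitions, first() doubles as a cheap check
# that partitioned evaluation works through the pool; it likely only has to
# realize the first partition.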

r = r.map(i.indent_line)
exec_before_collect = i.executed
# at this point, no map() or foreach() should have been executed
r = r.map(i.indent_line).cache()
print(r.collect())
r = r.map(i.indent_line)
r.collect()
exec_after_collect = i.executed
print((exec_before_collect, exec_after_collect))
assert not exec_before_collect and exec_after_collect

class NotParallel(unittest.TestCase, LazyTestInjection):
"""Test cases in the spirit of the parallel test cases for reference."""

def test_lazy_execution_threadpool():
def indent_line(l):
return '--- ' + l
def setUp(self):
self.sc = pysparkling.Context()
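# No pool and no serializers: everything runs in-process, providing the
# reference behavior for the pooled variants below.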

with futures.ThreadPoolExecutor(4) as p:
r = Context(pool=p).textFile('tests/test_multiprocessing.py')
r = r.map(indent_line).cache()
r.collect()
r = r.map(indent_line)
r = r.collect()
# ThreadPool is not lazy although it returns generators.
print(r)
assert '--- --- from pysparkling import Context' in r


def test_lazy_execution_processpool():
def indent_line(l):
return '--- ' + l

with futures.ProcessPoolExecutor(4) as p:
r = Context(
pool=p,
serializer=cloudpickle.dumps,
deserializer=pickle.loads,
).textFile('tests/test_multiprocessing.py') # .take(10)
print(r.collect())
r = r.map(indent_line)
print(r.collect())
r = r.cache()
print(r.collect())
r = r.map(indent_line)
r = r.collect()
# ProcessPool is not lazy although it returns generators.
print(r)
assert '--- --- from pysparkling import Context' in r


def test_processpool_distributed_cache():
with futures.ProcessPoolExecutor(4) as p:
r = Context(
pool=p,
serializer=cloudpickle.dumps,
deserializer=pickle.loads,
).parallelize(range(3), 3)

class ThreadPool(unittest.TestCase, LazyTestInjection):
def setUp(self):
self.pool = futures.ThreadPoolExecutor(4)
self.sc = pysparkling.Context(pool=self.pool)

def tearDown(self):
self.pool.shutdown()

def test_basic(self):
r = self.sc.parallelize([1, 3, 4]).map(math.sqrt).collect()
self.assertIn(2, r)
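# Threads share the parent process's memory, so no serializer/deserializer
# pair is needed, and LazyTestInjection's executed flag set in a worker
# thread is visible to the test, which is why the mixin works here.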


class ProcessPool(unittest.TestCase): # cannot work here: LazyTestInjection):
def setUp(self):
self.pool = futures.ProcessPoolExecutor(4)
self.sc = pysparkling.Context(pool=self.pool,
serializer=cloudpickle.dumps,
deserializer=pickle.loads)
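# LazyTestInjection is deliberately left out (see the class line above),
# presumably because with a process pool processor.indent_line runs in
# worker processes: the executed flag would be set on the workers' copies
# of the Processor, never on the parent's instance that the assertions
# inspect.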

def tearDown(self):
self.pool.shutdown()

def test_basic(self):
r = self.sc.parallelize([1, 3, 4]).map(math.sqrt).collect()
self.assertIn(2, r)

def test_cache(self):
r = self.sc.parallelize(range(3), 3)
r = r.map(lambda _: time.sleep(0.1)).cache()
r.collect()

time_start = time.time()
print(r.collect())
time_end = time.time()
assert time_end - time_start < 0.3
start = time.time()
r.collect()
self.assertLess(time.time() - start, 0.1)
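# The first collect() evaluates the 0.1 s sleeps across the pool; cache()
# retains the partition results, so the second collect() is served from
# memory and must finish in well under 0.1 s.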


class ProcessPoolIdlePerformance(unittest.TestCase):
"""Idle performance tests.
The "load" on these tests are sleeps.
"""

def runtime(self, n=10, processes=1):
start = time.time()
with futures.ProcessPoolExecutor(processes) as pool:
sc = pysparkling.Context(pool=pool,
serializer=cloudpickle.dumps,
deserializer=pickle.loads)
rdd = sc.parallelize(range(n), 10)
rdd.map(lambda _: time.sleep(0.1)).collect()
return time.time() - start

def test_basic(self):
t1 = self.runtime(processes=1)
t10 = self.runtime(processes=10)
self.assertLess(t10, t1 / 2.0)
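# With n=10 tasks of 0.1 s each over 10 partitions: one worker runs them
# serially (~1.0 s), ten workers run them concurrently (~0.1 s plus
# overhead), so t10 should comfortably land under half of t1.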


# pickle-able map function
def map1(ft):
return [choice(ft[1].split()) for _ in range(1000)]
return [random.choice(ft[1].split()) for _ in range(1000)]
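# Defined at module level so the standard pickle protocol can serialize it
# by qualified name; the commented-out map2 inside test_performance below
# is nested and would not survive plain pickle, because locally defined
# functions have no importable path.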


def map_pi(n):
return sum((
1 for x in (random() ** 2 + random() ** 2 for _ in range(n))
1 for x in (random.random() ** 2 + random.random() ** 2
for _ in range(n))
if x < 1.0
))
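# Illustrative sketch (not part of the original file): map_pi counts samples
# falling inside the unit quarter circle, whose area is pi/4, so scaling the
# count by 4/n yields a Monte Carlo estimate of pi.
def demo_pi_estimate(n=100000):  # hypothetical helper, for illustration only
    return 4.0 * map_pi(n) / n   # typically within a few hundredths of 3.14159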


@unittest.skipIf(os.getenv('PERFORMANCE', False) is False,
"no PERFORMANCE=1 env")
'PERFORMANCE env variable not set')
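# The skip condition is `is False`, which only holds while the variable is
# unset; setting PERFORMANCE to any value (e.g. PERFORMANCE=1) enables the
# benchmark.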
def test_performance():
# not pickle-able map function
# def map2(ft):
# return [random.choice(ft[1].split()) for _ in range(1000)]

def create_context(n_processes=0):
if not n_processes:
return Context()
return pysparkling.Context()

p = futures.ProcessPoolExecutor(n_processes)
return Context(
pool=p,
serializer=cloudpickle.dumps,
# serializer=pickle.dumps,
deserializer=pickle.loads,
)
pool = futures.ProcessPoolExecutor(n_processes)
return pysparkling.Context(pool=pool,
serializer=cloudpickle.dumps,
# serializer=pickle.dumps,
deserializer=pickle.loads)
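# n_processes == 0 yields the plain serial Context, so the benchmark can
# include a no-pool baseline alongside the process-pool timings.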

def test(n_processes):
c = create_context(n_processes)
sc = create_context(n_processes)
t = timeit.Timer(
# lambda: c.wholeTextFiles('tests/*.py').map(map1).collect()
lambda: c.parallelize(
lambda: sc.parallelize(
[1000 for _ in range(100)],
100,
).map(map_pi).collect()
).timeit(number=10)
return (t, c._stats)
return (t, sc._stats)

print('starting processing')
n_cpu = multiprocessing.cpu_count()
@@ -179,9 +179,6 @@ def test(n_processes):
for n, v in test_results.items()
})

# running on two cores takes less than 70% of the time running on one
assert test_results[2][0] / test_results[1][0] < 0.7

return (n_cpu, test_results)


