From 640e84e3b74943d3be63ca849d24480142cf3393 Mon Sep 17 00:00:00 2001 From: mmckerns Date: Thu, 26 Sep 2019 21:26:13 +0000 Subject: [PATCH] added shutdown helper method to destroy cached pools git-svn-id: svn+ssh://svn.mystic.cacr.caltech.edu/pathos/pathos@1246 8bfda07e-5b16-0410-ab1d-fd04ec2748df --- examples/async_map.py | 6 +++++- examples/mp_class_example.py | 2 ++ examples/nested.py | 4 +++- examples/pp_map.py | 2 ++ examples/test_mpmap.py | 4 +++- examples/test_mpmap2.py | 2 ++ examples/test_mpmap3.py | 5 ++++- examples/test_mpmap_dill.py | 3 +++ examples/test_ppmap.py | 2 ++ examples/test_ppmap2.py | 2 ++ examples2/all_scatter_gather.py | 10 ++++------ examples2/all_scatter_gather2.py | 9 ++++----- examples2/optimize_cheby_powell_mpmap.py | 3 ++- pathos/__init__.py | 1 + pathos/helpers/__init__.py | 2 ++ pathos/multiprocessing.py | 1 + pathos/parallel.py | 1 + pathos/pools.py | 20 ++++++++++++++++++++ pathos/serial.py | 1 + pathos/threading.py | 1 + tests/test_decorate.py | 3 ++- tests/test_join.py | 3 ++- tests/test_map.py | 3 ++- tests/test_mp.py | 3 ++- tests/test_pp.py | 1 + tests/test_star.py | 3 ++- tests/test_with.py | 3 ++- 27 files changed, 78 insertions(+), 22 deletions(-) diff --git a/examples/async_map.py b/examples/async_map.py index 38d6aff..612cc81 100644 --- a/examples/async_map.py +++ b/examples/async_map.py @@ -80,11 +80,15 @@ def test_ready(pool, f, maxtries, delay): #from pathos.pools import ProcessPool as Pool #from pathos.pools import ThreadPool as Pool from pathos.pools import ParallelPool as Pool - #from pathos.helpers import freeze_support + #from pathos.helpers import freeze_support, shutdown #freeze_support() pool = Pool(nodes=4) test_ready( pool, f, maxtries, delay ) + # shutdown + pool.close() + pool.join() + pool.clear() # EOF diff --git a/examples/mp_class_example.py b/examples/mp_class_example.py index 9745d02..a0e4aae 100644 --- a/examples/mp_class_example.py +++ b/examples/mp_class_example.py @@ -45,6 +45,8 @@ def parcompute_example(): r3 = ProcessPool(4).map(dc3.compute, inp_data) r4 = ThreadPool(4).map(dc4.compute, inp_data) + ProcessPool.__state__.clear() + ThreadPool.__state__.clear() assert(r4 == r3 == r2) assert(len(dc3.cache) == 0) assert(len(dc4.cache) == n_datapoints) diff --git a/examples/nested.py b/examples/nested.py index e07c782..ed779fa 100644 --- a/examples/nested.py +++ b/examples/nested.py @@ -21,7 +21,7 @@ def f(x,y): if __name__ == '__main__': - from pathos.helpers import freeze_support + from pathos.helpers import freeze_support, shutdown freeze_support() from pathos.pools import ProcessPool, ThreadPool @@ -39,4 +39,6 @@ def _f(m, g, x, y): print(amap(tmap, [sin,cos], [x,x]).get()) + shutdown() +# EOF diff --git a/examples/pp_map.py b/examples/pp_map.py index c3347d7..715aa0d 100755 --- a/examples/pp_map.py +++ b/examples/pp_map.py @@ -73,3 +73,5 @@ def busybeaver(x): print(pool.map(busybeaver, range(10))) print('Iteration time: %s' % (time.time() - start)) + # cleanup + pool.clear() diff --git a/examples/test_mpmap.py b/examples/test_mpmap.py index ca2aa52..a901627 100644 --- a/examples/test_mpmap.py +++ b/examples/test_mpmap.py @@ -12,7 +12,7 @@ def host(id): if __name__ == '__main__': - from pathos.helpers import freeze_support + from pathos.helpers import freeze_support, shutdown freeze_support() from pathos.pools import ProcessPool as Pool @@ -31,4 +31,6 @@ def host(id): print(pool) print('\n'.join(res5)) + shutdown() + # end of file diff --git a/examples/test_mpmap2.py b/examples/test_mpmap2.py index 79bce3b..79a2ad9 100755 --- a/examples/test_mpmap2.py +++ b/examples/test_mpmap2.py @@ -36,4 +36,6 @@ def host(id): print('\n'.join(res9)) print('') + tpool.clear() + # end of file diff --git a/examples/test_mpmap3.py b/examples/test_mpmap3.py index eebb391..ee14ddb 100755 --- a/examples/test_mpmap3.py +++ b/examples/test_mpmap3.py @@ -21,7 +21,7 @@ def inner(addend): if __name__ == '__main__': - from pathos.helpers import freeze_support + from pathos.helpers import freeze_support, shutdown freeze_support() from pathos.pools import ProcessPool as Pool @@ -50,4 +50,7 @@ def inner(addend): print(tpool.map(squ, range(10))) print('') + # shutdown all cached pools + shutdown() + # end of file diff --git a/examples/test_mpmap_dill.py b/examples/test_mpmap_dill.py index b6bde99..bb85ba2 100755 --- a/examples/test_mpmap_dill.py +++ b/examples/test_mpmap_dill.py @@ -90,4 +90,7 @@ def inner(addend): print('%s' % p2res) print('') + # shutdown the pool + pool.close() + # end of file diff --git a/examples/test_ppmap.py b/examples/test_ppmap.py index 41bdd84..61272c7 100755 --- a/examples/test_ppmap.py +++ b/examples/test_ppmap.py @@ -29,4 +29,6 @@ def host(id): print('\n'.join(res5)) print(stats()) +pool.clear() + # end of file diff --git a/examples/test_ppmap2.py b/examples/test_ppmap2.py index f60d1c0..a806d49 100755 --- a/examples/test_ppmap2.py +++ b/examples/test_ppmap2.py @@ -26,4 +26,6 @@ def host(id): print(stats()) print('') +pool.clear() + # end of file diff --git a/examples2/all_scatter_gather.py b/examples2/all_scatter_gather.py index bc37d23..81940ed 100644 --- a/examples2/all_scatter_gather.py +++ b/examples2/all_scatter_gather.py @@ -16,7 +16,7 @@ """ import numpy as np -from pathos.helpers import freeze_support +from pathos.helpers import freeze_support, shutdown from pathos.pools import ProcessPool from pathos.pools import ParallelPool from pathos.pools import ThreadPool @@ -43,35 +43,33 @@ def sin2(xi): x = np.arange(N * nodes, dtype=np.float64) print("Input: %s\n" % x) - # run sin2 in series, then print to screen print("Running serial python ...") y = list(map(sin2, x)) print("Output: %s\n" % np.asarray(y)) - if HAS_PYINA: # map sin2 to the workers, then print to screen print("Running mpi4py on %d cores..." % nodes) y = MpiPool(nodes).map(sin2, x) print("Output: %s\n" % np.asarray(y)) - # map sin2 to the workers, then print to screen print("Running multiprocesing on %d processors..." % nodes) y = ProcessPool(nodes).map(sin2, x) print("Output: %s\n" % np.asarray(y)) - # map sin2 to the workers, then print to screen print("Running multiprocesing on %d threads..." % nodes) y = ThreadPool(nodes).map(sin2, x) print("Output: %s\n" % np.asarray(y)) - # map sin2 to the workers, then print to screen print("Running parallelpython on %d cpus..." % nodes) y = ParallelPool(nodes).map(sin2, x) print("Output: %s\n" % np.asarray(y)) + # ensure all pools shutdown + shutdown() + # EOF diff --git a/examples2/all_scatter_gather2.py b/examples2/all_scatter_gather2.py index 09df787..138e193 100644 --- a/examples2/all_scatter_gather2.py +++ b/examples2/all_scatter_gather2.py @@ -16,7 +16,7 @@ """ import numpy as np -from pathos.helpers import freeze_support +from pathos.helpers import freeze_support, shutdown from pathos.pools import ProcessPool from pathos.pools import ParallelPool from pathos.pools import ThreadPool @@ -49,29 +49,28 @@ def sin_diff(x, xp): y = list(map(sin_diff, x, xp)) print("Output: %s\n" % np.asarray(y)) - if HAS_PYINA: # map sin_diff to the workers, then print to screen print("Running mpi4py on %d cores..." % nodes) y = MpiPool(nodes).map(sin_diff, x, xp) print("Output: %s\n" % np.asarray(y)) - # map sin_diff to the workers, then print to screen print("Running multiprocesing on %d processors..." % nodes) y = ProcessPool(nodes).map(sin_diff, x, xp) print("Output: %s\n" % np.asarray(y)) - # map sin_diff to the workers, then print to screen print("Running multiprocesing on %d threads..." % nodes) y = ThreadPool(nodes).map(sin_diff, x, xp) print("Output: %s\n" % np.asarray(y)) - # map sin_diff to the workers, then print to screen print("Running parallelpython on %d cpus..." % nodes) y = ParallelPool(nodes).map(sin_diff, x, xp) print("Output: %s\n" % np.asarray(y)) + # ensure all pools shutdown + shutdown() + # EOF diff --git a/examples2/optimize_cheby_powell_mpmap.py b/examples2/optimize_cheby_powell_mpmap.py index d04045c..e50ef15 100755 --- a/examples2/optimize_cheby_powell_mpmap.py +++ b/examples2/optimize_cheby_powell_mpmap.py @@ -61,12 +61,13 @@ def powell_chebyshev(x0, *args, **kwds): if __name__ == '__main__': - from pathos.helpers import freeze_support + from pathos.helpers import freeze_support, shutdown freeze_support() target = 'cheby' print("Function: %s" % target) print("Solver: %s" % 'fmin_powell') optimize(powell_chebyshev, mppool, nodes=10, target=target) + shutdown() # end of file diff --git a/pathos/__init__.py b/pathos/__init__.py index fc4d5f5..fc8df13 100644 --- a/pathos/__init__.py +++ b/pathos/__init__.py @@ -62,6 +62,7 @@ def logger(level=None, handler=None, **kwds): # tools, utilities, etc from . import util +from . import helpers # backward compatibility python = serial diff --git a/pathos/helpers/__init__.py b/pathos/helpers/__init__.py index 59c903b..89b1b4d 100644 --- a/pathos/helpers/__init__.py +++ b/pathos/helpers/__init__.py @@ -63,3 +63,5 @@ def _help_stuff_finish(inqueue, task_handler, size): from multiprocessing import freeze_support except ValueError: pass del HAS_FORK + +from pathos.pools import _clear as shutdown diff --git a/pathos/multiprocessing.py b/pathos/multiprocessing.py index afeb264..e32737c 100644 --- a/pathos/multiprocessing.py +++ b/pathos/multiprocessing.py @@ -210,6 +210,7 @@ def join(self): # interface ncpus = property(__get_nodes, __set_nodes) nodes = property(__get_nodes, __set_nodes) + __state__ = __STATE pass diff --git a/pathos/parallel.py b/pathos/parallel.py index 2f8a376..020bfef 100644 --- a/pathos/parallel.py +++ b/pathos/parallel.py @@ -420,6 +420,7 @@ def join(self): ncpus = property(__get_nodes, __set_nodes) nodes = property(__get_nodes, __set_nodes) servers = property(__get_servers, __set_servers) + __state__ = __STATE pass diff --git a/pathos/pools.py b/pathos/pools.py index 513fbac..c397411 100644 --- a/pathos/pools.py +++ b/pathos/pools.py @@ -9,9 +9,29 @@ pools: pools of pathos workers, providing map and pipe constructs """ +def _clear(type=None): + "destroy all cached pools (of the given type)" + pools = (ProcessPool, ThreadPool, ParallelPool, SerialPool) + _pools = (_ProcessPool, _ThreadPool) + #pools += _pools + if type is None: + for pool in pools: + pool.__state__.clear() + elif type in pools: + type.__state__.clear() + elif type in _pools: + msg = "use the close() method to shutdown" + raise NotImplementedError(msg) + else: + msg = "'%s' is not one of the pathos.pools" % type + raise TypeError(msg) + return + + from pathos.helpers import ProcessPool as _ProcessPool from pathos.helpers import ThreadPool as _ThreadPool from pathos.multiprocessing import ProcessPool from pathos.threading import ThreadPool from pathos.parallel import ParallelPool from pathos.serial import SerialPool + diff --git a/pathos/serial.py b/pathos/serial.py index acb744b..0e1978b 100644 --- a/pathos/serial.py +++ b/pathos/serial.py @@ -139,6 +139,7 @@ def clear(self): __get_nodes = __get_nodes__ __set_nodes = __set_nodes__ nodes = property(__get_nodes, __set_nodes) + __state__ = __STATE pass diff --git a/pathos/threading.py b/pathos/threading.py index 6cc2c5a..352d88f 100644 --- a/pathos/threading.py +++ b/pathos/threading.py @@ -207,6 +207,7 @@ def join(self): # interface nthreads = property(__get_nodes, __set_nodes) nodes = property(__get_nodes, __set_nodes) + __state__ = __STATE pass diff --git a/tests/test_decorate.py b/tests/test_decorate.py index b33fb19..4e62971 100644 --- a/tests/test_decorate.py +++ b/tests/test_decorate.py @@ -62,6 +62,7 @@ def test_wrap(): if __name__ == '__main__': - from pathos.helpers import freeze_support + from pathos.helpers import freeze_support, shutdown freeze_support() test_wrap() + shutdown() diff --git a/tests/test_join.py b/tests/test_join.py index 725aa51..39c436e 100644 --- a/tests/test_join.py +++ b/tests/test_join.py @@ -282,8 +282,9 @@ def test_nodes(): if __name__ == '__main__': - from pathos.helpers import freeze_support + from pathos.helpers import freeze_support, shutdown freeze_support() test_basic() test_rename() test_nodes() + shutdown() diff --git a/tests/test_map.py b/tests/test_map.py index d92e8a2..8b5967d 100644 --- a/tests/test_map.py +++ b/tests/test_map.py @@ -81,9 +81,10 @@ def test_threading(): print("CONFIG: items = %s" % items) print("") - from pathos.helpers import freeze_support + from pathos.helpers import freeze_support, shutdown freeze_support() test_serial() test_pp() test_processing() test_threading() + shutdown() diff --git a/tests/test_mp.py b/tests/test_mp.py index 2e5ada5..eb617f5 100644 --- a/tests/test_mp.py +++ b/tests/test_mp.py @@ -29,6 +29,7 @@ def test_mp(): if __name__ == '__main__': - from pathos.helpers import freeze_support + from pathos.helpers import freeze_support, shutdown freeze_support() test_mp() + shutdown() diff --git a/tests/test_pp.py b/tests/test_pp.py index 06b7c65..e6e22fb 100644 --- a/tests/test_pp.py +++ b/tests/test_pp.py @@ -21,6 +21,7 @@ def run_ppmap(obj): p = ParallelPool(2) x = [1,2,3] assert list(map(obj, x)) == p.map(obj, x) + p.clear() def test_pp(): diff --git a/tests/test_star.py b/tests/test_star.py index 76862ae..1f67796 100644 --- a/tests/test_star.py +++ b/tests/test_star.py @@ -188,8 +188,9 @@ def test_pp(): if __name__ == '__main__': - from pathos.helpers import freeze_support + from pathos.helpers import freeze_support, shutdown freeze_support() test_mp() test_tp() test_pp() + shutdown() diff --git a/tests/test_with.py b/tests/test_with.py index 9abd8cb..169d09d 100644 --- a/tests/test_with.py +++ b/tests/test_with.py @@ -66,7 +66,8 @@ def test_with_mp(): if __name__ == '__main__': - from pathos.helpers import freeze_support + from pathos.helpers import freeze_support, shutdown freeze_support() test_with_mp() test_with_pp() + shutdown()