Skip to content

Commit

Permalink
added shutdown helper method to destroy cached pools
Browse files Browse the repository at this point in the history
git-svn-id: svn+ssh://svn.mystic.cacr.caltech.edu/pathos/pathos@1246 8bfda07e-5b16-0410-ab1d-fd04ec2748df
  • Loading branch information
mmckerns committed Sep 26, 2019
1 parent d4fc77a commit 640e84e
Show file tree
Hide file tree
Showing 27 changed files with 78 additions and 22 deletions.
6 changes: 5 additions & 1 deletion examples/async_map.py
Expand Up @@ -80,11 +80,15 @@ def test_ready(pool, f, maxtries, delay):
#from pathos.pools import ProcessPool as Pool
#from pathos.pools import ThreadPool as Pool
from pathos.pools import ParallelPool as Pool
#from pathos.helpers import freeze_support
#from pathos.helpers import freeze_support, shutdown
#freeze_support()

pool = Pool(nodes=4)
test_ready( pool, f, maxtries, delay )

# shutdown
pool.close()
pool.join()
pool.clear()

# EOF
2 changes: 2 additions & 0 deletions examples/mp_class_example.py
Expand Up @@ -45,6 +45,8 @@ def parcompute_example():

r3 = ProcessPool(4).map(dc3.compute, inp_data)
r4 = ThreadPool(4).map(dc4.compute, inp_data)
ProcessPool.__state__.clear()
ThreadPool.__state__.clear()
assert(r4 == r3 == r2)
assert(len(dc3.cache) == 0)
assert(len(dc4.cache) == n_datapoints)
Expand Down
4 changes: 3 additions & 1 deletion examples/nested.py
Expand Up @@ -21,7 +21,7 @@ def f(x,y):


if __name__ == '__main__':
from pathos.helpers import freeze_support
from pathos.helpers import freeze_support, shutdown
freeze_support()

from pathos.pools import ProcessPool, ThreadPool
Expand All @@ -39,4 +39,6 @@ def _f(m, g, x, y):

print(amap(tmap, [sin,cos], [x,x]).get())

shutdown()

# EOF
2 changes: 2 additions & 0 deletions examples/pp_map.py
Expand Up @@ -73,3 +73,5 @@ def busybeaver(x):
print(pool.map(busybeaver, range(10)))
print('Iteration time: %s' % (time.time() - start))

# cleanup
pool.clear()
4 changes: 3 additions & 1 deletion examples/test_mpmap.py
Expand Up @@ -12,7 +12,7 @@ def host(id):


if __name__ == '__main__':
from pathos.helpers import freeze_support
from pathos.helpers import freeze_support, shutdown
freeze_support()

from pathos.pools import ProcessPool as Pool
Expand All @@ -31,4 +31,6 @@ def host(id):
print(pool)
print('\n'.join(res5))

shutdown()

# end of file
2 changes: 2 additions & 0 deletions examples/test_mpmap2.py
Expand Up @@ -36,4 +36,6 @@ def host(id):
print('\n'.join(res9))
print('')

tpool.clear()

# end of file
5 changes: 4 additions & 1 deletion examples/test_mpmap3.py
Expand Up @@ -21,7 +21,7 @@ def inner(addend):


if __name__ == '__main__':
from pathos.helpers import freeze_support
from pathos.helpers import freeze_support, shutdown
freeze_support()

from pathos.pools import ProcessPool as Pool
Expand Down Expand Up @@ -50,4 +50,7 @@ def inner(addend):
print(tpool.map(squ, range(10)))
print('')

# shutdown all cached pools
shutdown()

# end of file
3 changes: 3 additions & 0 deletions examples/test_mpmap_dill.py
Expand Up @@ -90,4 +90,7 @@ def inner(addend):
print('%s' % p2res)
print('')

# shutdown the pool
pool.close()

# end of file
2 changes: 2 additions & 0 deletions examples/test_ppmap.py
Expand Up @@ -29,4 +29,6 @@ def host(id):
print('\n'.join(res5))
print(stats())

pool.clear()

# end of file
2 changes: 2 additions & 0 deletions examples/test_ppmap2.py
Expand Up @@ -26,4 +26,6 @@ def host(id):
print(stats())
print('')

pool.clear()

# end of file
10 changes: 4 additions & 6 deletions examples2/all_scatter_gather.py
Expand Up @@ -16,7 +16,7 @@
"""

import numpy as np
from pathos.helpers import freeze_support
from pathos.helpers import freeze_support, shutdown
from pathos.pools import ProcessPool
from pathos.pools import ParallelPool
from pathos.pools import ThreadPool
Expand All @@ -43,35 +43,33 @@ def sin2(xi):
x = np.arange(N * nodes, dtype=np.float64)
print("Input: %s\n" % x)


# run sin2 in series, then print to screen
print("Running serial python ...")
y = list(map(sin2, x))
print("Output: %s\n" % np.asarray(y))


if HAS_PYINA:
# map sin2 to the workers, then print to screen
print("Running mpi4py on %d cores..." % nodes)
y = MpiPool(nodes).map(sin2, x)
print("Output: %s\n" % np.asarray(y))


# map sin2 to the workers, then print to screen
print("Running multiprocesing on %d processors..." % nodes)
y = ProcessPool(nodes).map(sin2, x)
print("Output: %s\n" % np.asarray(y))


# map sin2 to the workers, then print to screen
print("Running multiprocesing on %d threads..." % nodes)
y = ThreadPool(nodes).map(sin2, x)
print("Output: %s\n" % np.asarray(y))


# map sin2 to the workers, then print to screen
print("Running parallelpython on %d cpus..." % nodes)
y = ParallelPool(nodes).map(sin2, x)
print("Output: %s\n" % np.asarray(y))

# ensure all pools shutdown
shutdown()

# EOF
9 changes: 4 additions & 5 deletions examples2/all_scatter_gather2.py
Expand Up @@ -16,7 +16,7 @@
"""

import numpy as np
from pathos.helpers import freeze_support
from pathos.helpers import freeze_support, shutdown
from pathos.pools import ProcessPool
from pathos.pools import ParallelPool
from pathos.pools import ThreadPool
Expand Down Expand Up @@ -49,29 +49,28 @@ def sin_diff(x, xp):
y = list(map(sin_diff, x, xp))
print("Output: %s\n" % np.asarray(y))


if HAS_PYINA:
# map sin_diff to the workers, then print to screen
print("Running mpi4py on %d cores..." % nodes)
y = MpiPool(nodes).map(sin_diff, x, xp)
print("Output: %s\n" % np.asarray(y))


# map sin_diff to the workers, then print to screen
print("Running multiprocesing on %d processors..." % nodes)
y = ProcessPool(nodes).map(sin_diff, x, xp)
print("Output: %s\n" % np.asarray(y))


# map sin_diff to the workers, then print to screen
print("Running multiprocesing on %d threads..." % nodes)
y = ThreadPool(nodes).map(sin_diff, x, xp)
print("Output: %s\n" % np.asarray(y))


# map sin_diff to the workers, then print to screen
print("Running parallelpython on %d cpus..." % nodes)
y = ParallelPool(nodes).map(sin_diff, x, xp)
print("Output: %s\n" % np.asarray(y))

# ensure all pools shutdown
shutdown()

# EOF
3 changes: 2 additions & 1 deletion examples2/optimize_cheby_powell_mpmap.py
Expand Up @@ -61,12 +61,13 @@ def powell_chebyshev(x0, *args, **kwds):


if __name__ == '__main__':
from pathos.helpers import freeze_support
from pathos.helpers import freeze_support, shutdown
freeze_support()
target = 'cheby'
print("Function: %s" % target)
print("Solver: %s" % 'fmin_powell')
optimize(powell_chebyshev, mppool, nodes=10, target=target)
shutdown()


# end of file
1 change: 1 addition & 0 deletions pathos/__init__.py
Expand Up @@ -62,6 +62,7 @@ def logger(level=None, handler=None, **kwds):

# tools, utilities, etc
from . import util
from . import helpers

# backward compatibility
python = serial
Expand Down
2 changes: 2 additions & 0 deletions pathos/helpers/__init__.py
Expand Up @@ -63,3 +63,5 @@ def _help_stuff_finish(inqueue, task_handler, size):
from multiprocessing import freeze_support
except ValueError: pass
del HAS_FORK

from pathos.pools import _clear as shutdown
1 change: 1 addition & 0 deletions pathos/multiprocessing.py
Expand Up @@ -210,6 +210,7 @@ def join(self):
# interface
ncpus = property(__get_nodes, __set_nodes)
nodes = property(__get_nodes, __set_nodes)
__state__ = __STATE
pass


Expand Down
1 change: 1 addition & 0 deletions pathos/parallel.py
Expand Up @@ -420,6 +420,7 @@ def join(self):
ncpus = property(__get_nodes, __set_nodes)
nodes = property(__get_nodes, __set_nodes)
servers = property(__get_servers, __set_servers)
__state__ = __STATE
pass


Expand Down
20 changes: 20 additions & 0 deletions pathos/pools.py
Expand Up @@ -9,9 +9,29 @@
pools: pools of pathos workers, providing map and pipe constructs
"""

def _clear(type=None):
"destroy all cached pools (of the given type)"
pools = (ProcessPool, ThreadPool, ParallelPool, SerialPool)
_pools = (_ProcessPool, _ThreadPool)
#pools += _pools
if type is None:
for pool in pools:
pool.__state__.clear()
elif type in pools:
type.__state__.clear()
elif type in _pools:
msg = "use the close() method to shutdown"
raise NotImplementedError(msg)
else:
msg = "'%s' is not one of the pathos.pools" % type
raise TypeError(msg)
return


from pathos.helpers import ProcessPool as _ProcessPool
from pathos.helpers import ThreadPool as _ThreadPool
from pathos.multiprocessing import ProcessPool
from pathos.threading import ThreadPool
from pathos.parallel import ParallelPool
from pathos.serial import SerialPool

1 change: 1 addition & 0 deletions pathos/serial.py
Expand Up @@ -139,6 +139,7 @@ def clear(self):
__get_nodes = __get_nodes__
__set_nodes = __set_nodes__
nodes = property(__get_nodes, __set_nodes)
__state__ = __STATE
pass


Expand Down
1 change: 1 addition & 0 deletions pathos/threading.py
Expand Up @@ -207,6 +207,7 @@ def join(self):
# interface
nthreads = property(__get_nodes, __set_nodes)
nodes = property(__get_nodes, __set_nodes)
__state__ = __STATE
pass


Expand Down
3 changes: 2 additions & 1 deletion tests/test_decorate.py
Expand Up @@ -62,6 +62,7 @@ def test_wrap():


if __name__ == '__main__':
from pathos.helpers import freeze_support
from pathos.helpers import freeze_support, shutdown
freeze_support()
test_wrap()
shutdown()
3 changes: 2 additions & 1 deletion tests/test_join.py
Expand Up @@ -282,8 +282,9 @@ def test_nodes():


if __name__ == '__main__':
from pathos.helpers import freeze_support
from pathos.helpers import freeze_support, shutdown
freeze_support()
test_basic()
test_rename()
test_nodes()
shutdown()
3 changes: 2 additions & 1 deletion tests/test_map.py
Expand Up @@ -81,9 +81,10 @@ def test_threading():
print("CONFIG: items = %s" % items)
print("")

from pathos.helpers import freeze_support
from pathos.helpers import freeze_support, shutdown
freeze_support()
test_serial()
test_pp()
test_processing()
test_threading()
shutdown()
3 changes: 2 additions & 1 deletion tests/test_mp.py
Expand Up @@ -29,6 +29,7 @@ def test_mp():


if __name__ == '__main__':
from pathos.helpers import freeze_support
from pathos.helpers import freeze_support, shutdown
freeze_support()
test_mp()
shutdown()
1 change: 1 addition & 0 deletions tests/test_pp.py
Expand Up @@ -21,6 +21,7 @@ def run_ppmap(obj):
p = ParallelPool(2)
x = [1,2,3]
assert list(map(obj, x)) == p.map(obj, x)
p.clear()


def test_pp():
Expand Down
3 changes: 2 additions & 1 deletion tests/test_star.py
Expand Up @@ -188,8 +188,9 @@ def test_pp():


if __name__ == '__main__':
from pathos.helpers import freeze_support
from pathos.helpers import freeze_support, shutdown
freeze_support()
test_mp()
test_tp()
test_pp()
shutdown()
3 changes: 2 additions & 1 deletion tests/test_with.py
Expand Up @@ -66,7 +66,8 @@ def test_with_mp():


if __name__ == '__main__':
from pathos.helpers import freeze_support
from pathos.helpers import freeze_support, shutdown
freeze_support()
test_with_mp()
test_with_pp()
shutdown()

0 comments on commit 640e84e

Please sign in to comment.