Skip to content

Commit

Permalink
fix: #253 add chunksize to mp maps
Browse files Browse the repository at this point in the history
  • Loading branch information
mmckerns committed Dec 19, 2022
1 parent 5e6ac7e commit d7e9e0b
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 14 deletions.
1 change: 1 addition & 0 deletions pathos/abstract_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def __init__(self, *args, **kwds):
def __enter__(self):
return self
def __exit__(self, *args):
#self.clear()
return
def __init(self, *args, **kwds):
"""default filter for __init__ inputs
Expand Down
8 changes: 4 additions & 4 deletions pathos/multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,22 +132,22 @@ def _clear(self): #XXX: should be STATE method; use id
def map(self, f, *args, **kwds):
AbstractWorkerPool._AbstractWorkerPool__map(self, f, *args, **kwds)
_pool = self._serve()
return _pool.map(star(f), zip(*args)) # chunksize
return _pool.map(star(f), zip(*args), **kwds)
map.__doc__ = AbstractWorkerPool.map.__doc__
def imap(self, f, *args, **kwds):
AbstractWorkerPool._AbstractWorkerPool__imap(self, f, *args, **kwds)
_pool = self._serve()
return _pool.imap(star(f), zip(*args)) # chunksize
return _pool.imap(star(f), zip(*args), **kwds)
imap.__doc__ = AbstractWorkerPool.imap.__doc__
def uimap(self, f, *args, **kwds):
AbstractWorkerPool._AbstractWorkerPool__imap(self, f, *args, **kwds)
_pool = self._serve()
return _pool.imap_unordered(star(f), zip(*args)) # chunksize
return _pool.imap_unordered(star(f), zip(*args), **kwds)
uimap.__doc__ = AbstractWorkerPool.uimap.__doc__
def amap(self, f, *args, **kwds): # register a callback ?
AbstractWorkerPool._AbstractWorkerPool__map(self, f, *args, **kwds)
_pool = self._serve()
return _pool.map_async(star(f), zip(*args)) # chunksize
return _pool.map_async(star(f), zip(*args), **kwds)
amap.__doc__ = AbstractWorkerPool.amap.__doc__
########################################################################
# PIPES
Expand Down
8 changes: 4 additions & 4 deletions pathos/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def _clear(self): #XXX: should be STATE method; use id
clear = _clear
def map(self, f, *args, **kwds):
AbstractWorkerPool._AbstractWorkerPool__map(self, f, *args, **kwds)
return list(self.imap(f, *args))
return list(self.imap(f, *args)) # chunksize
map.__doc__ = AbstractWorkerPool.map.__doc__
def imap(self, f, *args, **kwds):
AbstractWorkerPool._AbstractWorkerPool__imap(self, f, *args, **kwds)
Expand All @@ -236,7 +236,7 @@ def submit(*argz):
except pp.DestroyedServerError:
self._is_alive(None)
# submit all jobs, then collect results as they become available
return (subproc() for subproc in list(builtins.map(submit, *args)))
return (subproc() for subproc in list(builtins.map(submit, *args))) # chunksize
imap.__doc__ = AbstractWorkerPool.imap.__doc__
def uimap(self, f, *args, **kwds):
AbstractWorkerPool._AbstractWorkerPool__imap(self, f, *args, **kwds)
Expand All @@ -260,7 +260,7 @@ def imap_unordered(it):
# *subprocess* # alternately, loop in a subprocess
return #raise StopIteration
# submit all jobs, then collect results as they become available
return imap_unordered(builtins.map(submit, *args))
return imap_unordered(builtins.map(submit, *args)) # chunksize
uimap.__doc__ = AbstractWorkerPool.uimap.__doc__
def amap(self, f, *args, **kwds):
AbstractWorkerPool._AbstractWorkerPool__map(self, f, *args, **kwds)
Expand Down Expand Up @@ -289,7 +289,7 @@ def submit(*argz):
if not nodes: nodes = 1
# try to quickly find a small chunksize that gives good results
maxsize = 2**62 #XXX: HOPEFULLY, this will never be reached...
chunksize = 1
chunksize = 1 # chunksize
while chunksize < maxsize:
chunksize, extra = divmod(length, nodes * elem_size)
if override: break # the user *wants* to override this loop
Expand Down
4 changes: 2 additions & 2 deletions pathos/serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ class SerialPool(AbstractWorkerPool):
def map(self, f, *args, **kwds):
#AbstractWorkerPool._AbstractWorkerPool__map(self, f, *args, **kwds)
if self._exiting: self._is_alive()
return _map(f, *args)#, **kwds)
return _map(f, *args)#, **kwds) # chunksize
map.__doc__ = AbstractWorkerPool.map.__doc__
def imap(self, f, *args, **kwds):
#AbstractWorkerPool._AbstractWorkerPool__imap(self, f, *args, **kwds)
if self._exiting: self._is_alive()
return _imap(f, *args)#, **kwds)
return _imap(f, *args)#, **kwds) # chunksize
imap.__doc__ = AbstractWorkerPool.imap.__doc__
########################################################################
# PIPES
Expand Down
84 changes: 84 additions & 0 deletions pathos/tests/test_mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,92 @@ def test_mp():
assert result == _result


def test_chunksize():
# instantiate and configure the worker pool
from pathos.pools import ProcessPool, _ProcessPool, ThreadPool
from pathos.helpers.mp_helper import starargs as star
pool = _ProcessPool(4)
ppool = ProcessPool(4)
tpool = ThreadPool(4)

# do a blocking map on the chosen function
result1 = pool.map(star(pow), zip([1,2,3,4],[5,6,7,8]), 1)
assert result1 == ppool.map(pow, [1,2,3,4], [5,6,7,8], chunksize=1)
assert result1 == tpool.map(pow, [1,2,3,4], [5,6,7,8], chunksize=1)
result0 = pool.map(star(pow), zip([1,2,3,4],[5,6,7,8]), 0)
assert result0 == ppool.map(pow, [1,2,3,4], [5,6,7,8], chunksize=0)
assert result0 == tpool.map(pow, [1,2,3,4], [5,6,7,8], chunksize=0)

# do an asynchronous map, then get the results
result1 = pool.map_async(star(pow), zip([1,2,3,4],[5,6,7,8]), 1).get()
assert result1 == ppool.amap(pow, [1,2,3,4], [5,6,7,8], chunksize=1).get()
assert result1 == tpool.amap(pow, [1,2,3,4], [5,6,7,8], chunksize=1).get()
result0 = pool.map_async(star(pow), zip([1,2,3,4],[5,6,7,8]), 0).get()
assert result0 == ppool.amap(pow, [1,2,3,4], [5,6,7,8], chunksize=0).get()
assert result0 == tpool.amap(pow, [1,2,3,4], [5,6,7,8], chunksize=0).get()

# do a non-blocking map, then extract the result from the iterator
result1 = list(pool.imap(star(pow), zip([1,2,3,4],[5,6,7,8]), 1))
assert result1 == list(ppool.imap(pow, [1,2,3,4], [5,6,7,8], chunksize=1))
assert result1 == list(tpool.imap(pow, [1,2,3,4], [5,6,7,8], chunksize=1))
try:
list(pool.imap(star(pow), zip([1,2,3,4],[5,6,7,8]), 0))
error = AssertionError
except Exception:
import sys
error = sys.exc_info()[0]
try:
list(ppool.imap(pow, [1,2,3,4], [5,6,7,8], chunksize=0))
assert False
except error:
pass
except Exception:
import sys
e = sys.exc_info()[1]
raise AssertionError(str(e))
try:
list(tpool.imap(pow, [1,2,3,4], [5,6,7,8], chunksize=0))
assert False
except error:
pass
except Exception:
import sys
e = sys.exc_info()[1]
raise AssertionError(str(e))

# do a non-blocking map, then extract the result from the iterator
res1 = sorted(pool.imap_unordered(star(pow), zip([1,2,3,4],[5,6,7,8]), 1))
assert res1 == sorted(ppool.uimap(pow, [1,2,3,4], [5,6,7,8], chunksize=1))
assert res1 == sorted(tpool.uimap(pow, [1,2,3,4], [5,6,7,8], chunksize=1))
try:
sorted(pool.imap_unordered(star(pow), zip([1,2,3,4],[5,6,7,8]), 0))
error = AssertionError
except Exception:
import sys
error = sys.exc_info()[0]
try:
sorted(ppool.uimap(pow, [1,2,3,4], [5,6,7,8], chunksize=0))
assert False
except error:
pass
except Exception:
import sys
e = sys.exc_info()[1]
raise AssertionError(str(e))
try:
sorted(tpool.uimap(pow, [1,2,3,4], [5,6,7,8], chunksize=0))
assert False
except error:
pass
except Exception:
import sys
e = sys.exc_info()[1]
raise AssertionError(str(e))


if __name__ == '__main__':
from pathos.helpers import freeze_support, shutdown
freeze_support()
test_mp()
test_chunksize()
shutdown()
8 changes: 4 additions & 4 deletions pathos/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,22 +129,22 @@ def _clear(self): #XXX: should be STATE method; use id
def map(self, f, *args, **kwds):
AbstractWorkerPool._AbstractWorkerPool__map(self, f, *args, **kwds)
_pool = self._serve()
return _pool.map(star(f), zip(*args)) # chunksize
return _pool.map(star(f), zip(*args), **kwds)
map.__doc__ = AbstractWorkerPool.map.__doc__
def imap(self, f, *args, **kwds):
AbstractWorkerPool._AbstractWorkerPool__imap(self, f, *args, **kwds)
_pool = self._serve()
return _pool.imap(star(f), zip(*args)) # chunksize
return _pool.imap(star(f), zip(*args), **kwds)
imap.__doc__ = AbstractWorkerPool.imap.__doc__
def uimap(self, f, *args, **kwds):
AbstractWorkerPool._AbstractWorkerPool__imap(self, f, *args, **kwds)
_pool = self._serve()
return _pool.imap_unordered(star(f), zip(*args)) # chunksize
return _pool.imap_unordered(star(f), zip(*args), **kwds)
uimap.__doc__ = AbstractWorkerPool.uimap.__doc__
def amap(self, f, *args, **kwds): # register a callback ?
AbstractWorkerPool._AbstractWorkerPool__map(self, f, *args, **kwds)
_pool = self._serve()
return _pool.map_async(star(f), zip(*args)) # chunksize
return _pool.map_async(star(f), zip(*args), **kwds)
amap.__doc__ = AbstractWorkerPool.amap.__doc__
########################################################################
# PIPES
Expand Down

0 comments on commit d7e9e0b

Please sign in to comment.