Skip to content

Commit

Permalink
enable id kwd for pools with non-fixed default
Browse files Browse the repository at this point in the history
  • Loading branch information
mmckerns committed Jun 11, 2021
1 parent 186bbe6 commit 691ada8
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 10 deletions.
2 changes: 1 addition & 1 deletion pathos/abstract_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def __init__(self, *args, **kwds):
"""
object.__init__(self)#, *args, **kwds)
self.__init(*args, **kwds)
self._id = None
self._id = kwds.get('id', None)
return
def __enter__(self):
return self
Expand Down
4 changes: 3 additions & 1 deletion pathos/multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ def __init__(self, *args, **kwds):
self.__nodes = kwds.get('ncpus', cpu_count())

# Create an identifier for the pool
self._id = 'pool'
self._id = kwds.get('id', None) #'pool'
if self._id is None:
self._id = self.__nodes

# Create a new server if one isn't already initialized
self._serve()
Expand Down
5 changes: 4 additions & 1 deletion pathos/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,10 @@ def __init__(self, *args, **kwds):
#from _ppserver_config import ppservers as servers # config file

# Create an identifier for the pool
self._id = 'server'
self._id = kwds.get('id', None) #'server'
if self._id is None:
_nodes = str(ncpus) if type(ncpus) is int else '*'
self._id = '@'.join([_nodes, '+'.join(sorted(servers))])

#XXX: throws 'socket.error' when starting > 1 server with autodetect
# Create a new server if one isn't already initialized
Expand Down
4 changes: 3 additions & 1 deletion pathos/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ def __init__(self, *args, **kwds):
self.__nodes = kwds.get('nthreads', cpu_count())

# Create an identifier for the pool
self._id = 'threads'
self._id = kwds.get('id', None) #'threads'
if self._id is None:
self._id = self.__nodes

# Create a new server if one isn't already initialized
self._serve()
Expand Down
21 changes: 15 additions & 6 deletions tests/test_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ def check_basic(pool, state):


def check_nodes(pool, state):
tag = 'fixed' if pool._id == 'fixed' else None
new_pool = type(pool)

nodes = cpu_count()
Expand All @@ -162,7 +163,7 @@ def check_nodes(pool, state):
pool.close()

# doesn't create a new pool... IS IT BETTER IF IT DOES?
pool = new_pool()
pool = new_pool(id=tag)
try:
pool.map(squared, range(2))
except PoolClosedError:
Expand All @@ -174,7 +175,7 @@ def check_nodes(pool, state):
def nnodes(pool):
return getattr(pool, '_'+new_pool.__name__+'__nodes')
old_nodes = nnodes(pool)
pool = new_pool(nodes=half)
pool = new_pool(nodes=half, id=tag)
new_nodes = nnodes(pool)
if isinstance(pool, ParallelPool):
print('SKIPPING: new_pool check for ParallelPool')#FIXME
Expand All @@ -191,16 +192,21 @@ def nnodes(pool):
else:
raise AssertionError

# creates a new pool (nodes are different)
pool = new_pool()
# return to old number of nodes
if tag is None:
pool.clear() # clear 'half' pool
pool = new_pool(id=tag)
pool.restart() # restart old pool
else: # creates a new pool (update nodes)
pool = new_pool(id=tag)
if isinstance(pool, ParallelPool):
print('SKIPPING: new_pool check for ParallelPool')#FIXME
else:
res = pool.map(squared, range(2))
assert res == [0, 1]
pool.close()
# doesn't create a new pool... IS IT BETTER IF IT DOES?
pool = new_pool()
pool = new_pool(id=tag)
try:
pool.map(squared, range(2))
except PoolClosedError:
Expand All @@ -211,7 +217,7 @@ def nnodes(pool):
assert len(state) == 1
pool.clear()
assert len(state) == 0
pool = new_pool()
pool = new_pool(id=tag)
res = pool.map(squared, range(2))
assert res == [0, 1]
assert len(state) == 1
Expand Down Expand Up @@ -276,6 +282,9 @@ def test_rename():
check_rename(ParallelPool(), pstate)

def test_nodes():
check_nodes(ThreadPool(id='fixed'), tstate)
check_nodes(ProcessPool(id='fixed'), mstate)
check_nodes(ParallelPool(id='fixed'), pstate)
check_nodes(ThreadPool(), tstate)
check_nodes(ProcessPool(), mstate)
check_nodes(ParallelPool(), pstate)
Expand Down

0 comments on commit 691ada8

Please sign in to comment.