Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 0 additions & 35 deletions .github/workflows/check-macos-latest.yml

This file was deleted.

2 changes: 1 addition & 1 deletion .github/workflows/unittest-mpich.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
strategy:
matrix:
include:
- operating-system: macos-11
- operating-system: macos-latest
python-version: '3.12'
label: osx-64-py-3-12-mpich
prefix: /Users/runner/miniconda3/envs/my-env
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/unittest-openmpi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
strategy:
matrix:
include:
- operating-system: macos-11
- operating-system: macos-latest
python-version: '3.12'
label: osx-64-py-3-12-openmpi
prefix: /Users/runner/miniconda3/envs/my-env
Expand Down
5 changes: 5 additions & 0 deletions pympipool/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def __init__(
cwd=None,
sleep_interval=0.1,
executor=None,
hostname_localhost=False,
):
# Use __new__() instead of __init__(). This function is only implemented to enable auto-completion.
pass
Expand All @@ -87,6 +88,7 @@ def __new__(
cwd=None,
sleep_interval=0.1,
executor=None,
hostname_localhost=False,
):
"""
Instead of returning a pympipool.Executor object this function returns either a pympipool.mpi.PyMPIExecutor,
Expand Down Expand Up @@ -120,6 +122,7 @@ def __new__(
init_function=init_function,
cwd=cwd,
sleep_interval=sleep_interval,
hostname_localhost=hostname_localhost,
)
elif slurm_installed:
return PySlurmExecutor(
Expand All @@ -128,6 +131,7 @@ def __new__(
init_function=init_function,
cwd=cwd,
sleep_interval=sleep_interval,
hostname_localhost=hostname_localhost,
)
else:
if threads_per_core != 1:
Expand All @@ -150,4 +154,5 @@ def __new__(
init_function=init_function,
cwd=cwd,
sleep_interval=sleep_interval,
hostname_localhost=hostname_localhost,
)
7 changes: 7 additions & 0 deletions pympipool/flux/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class PyFluxExecutor(ExecutorBase):
cwd (str/None): current working directory where the parallel python task is executed
sleep_interval (float): synchronization interval - default 0.1
executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
hostname_localhost (boolean): use localhost as hostname to establish the zmq connection

Examples:

Expand Down Expand Up @@ -60,6 +61,7 @@ def __init__(
cwd=None,
sleep_interval=0.1,
executor=None,
hostname_localhost=False,
):
super().__init__()
self._process = RaisingThread(
Expand All @@ -69,6 +71,7 @@ def __init__(
"future_queue": self._future_queue,
"max_workers": max_workers,
"sleep_interval": sleep_interval,
"hostname_localhost": hostname_localhost,
"executor_class": PyFluxSingleTaskExecutor,
# Executor Arguments
"cores": cores_per_worker,
Expand All @@ -93,6 +96,8 @@ class PyFluxSingleTaskExecutor(ExecutorBase):
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
hostname_localhost (boolean): use localhost as hostname to establish the zmq connection

"""

def __init__(
Expand All @@ -103,6 +108,7 @@ def __init__(
init_function=None,
cwd=None,
executor=None,
hostname_localhost=False,
):
super().__init__()
self._process = RaisingThread(
Expand All @@ -112,6 +118,7 @@ def __init__(
"future_queue": self._future_queue,
"cores": cores,
"interface_class": FluxPythonInterface,
"hostname_localhost": hostname_localhost,
# Interface Arguments
"threads_per_core": threads_per_core,
"gpus_per_core": gpus_per_task,
Expand Down
6 changes: 6 additions & 0 deletions pympipool/mpi/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class PyMPIExecutor(ExecutorBase):
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
sleep_interval (float): synchronization interval - default 0.1
hostname_localhost (boolean): use localhost as hostname to establish the zmq connection

Examples:

Expand Down Expand Up @@ -53,6 +54,7 @@ def __init__(
init_function=None,
cwd=None,
sleep_interval=0.1,
hostname_localhost=False,
):
super().__init__()
self._process = RaisingThread(
Expand All @@ -63,6 +65,7 @@ def __init__(
"max_workers": max_workers,
"sleep_interval": sleep_interval,
"executor_class": PyMPISingleTaskExecutor,
"hostname_localhost": hostname_localhost,
# Executor Arguments
"cores": cores_per_worker,
"oversubscribe": oversubscribe,
Expand All @@ -82,6 +85,7 @@ class PyMPISingleTaskExecutor(ExecutorBase):
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
hostname_localhost (boolean): use localhost as hostname to establish the zmq connection

"""

Expand All @@ -91,6 +95,7 @@ def __init__(
oversubscribe=False,
init_function=None,
cwd=None,
hostname_localhost=False,
):
super().__init__()
self._process = RaisingThread(
Expand All @@ -103,6 +108,7 @@ def __init__(
# Interface Arguments
"cwd": cwd,
"oversubscribe": oversubscribe,
"hostname_localhost": hostname_localhost,
},
)
self._process.start()
Expand Down
10 changes: 6 additions & 4 deletions pympipool/shared/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,13 @@ def __del__(self):
def interface_bootup(
command_lst,
connections,
hostname_localhost=False,
):
command_lst += [
"--host",
gethostname(),
]
if not hostname_localhost:
command_lst += [
"--host",
gethostname(),
]
interface = SocketInterface(interface=connections)
command_lst += [
"--zmqport",
Expand Down
3 changes: 3 additions & 0 deletions pympipool/shared/executorbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ def execute_parallel_tasks(
future_queue,
cores,
interface_class,
hostname_localhost=False,
**kwargs,
):
"""
Expand All @@ -128,11 +129,13 @@ def execute_parallel_tasks(
future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
cores (int): defines the total number of MPI ranks to use
interface_class:
hostname_localhost (boolean): use localhost as hostname to establish the zmq connection
"""
execute_parallel_tasks_loop(
interface=interface_bootup(
command_lst=_get_backend_path(cores=cores),
connections=interface_class(cores=cores, **kwargs),
hostname_localhost=hostname_localhost,
),
future_queue=future_queue,
)
Expand Down
6 changes: 6 additions & 0 deletions pympipool/slurm/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class PySlurmExecutor(ExecutorBase):
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
sleep_interval (float): synchronization interval - default 0.1
hostname_localhost (boolean): use localhost as hostname to establish the zmq connection

Examples:

Expand Down Expand Up @@ -56,6 +57,7 @@ def __init__(
init_function=None,
cwd=None,
sleep_interval=0.1,
hostname_localhost=False,
):
super().__init__()
self._process = RaisingThread(
Expand All @@ -65,6 +67,7 @@ def __init__(
"future_queue": self._future_queue,
"max_workers": max_workers,
"sleep_interval": sleep_interval,
"hostname_localhost": hostname_localhost,
"executor_class": PySlurmSingleTaskExecutor,
# Executor Arguments
"cores": cores_per_worker,
Expand All @@ -89,6 +92,7 @@ class PySlurmSingleTaskExecutor(ExecutorBase):
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
hostname_localhost (boolean): use localhost as hostname to establish the zmq connection

"""

Expand All @@ -100,6 +104,7 @@ def __init__(
oversubscribe=False,
init_function=None,
cwd=None,
hostname_localhost=False,
):
super().__init__()
self._process = RaisingThread(
Expand All @@ -114,6 +119,7 @@ def __init__(
"gpus_per_core": gpus_per_task,
"cwd": cwd,
"oversubscribe": oversubscribe,
"hostname_localhost": hostname_localhost,
},
)
self._process.start()
Expand Down
8 changes: 4 additions & 4 deletions tests/test_future.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def calc(i):

class TestFuture(unittest.TestCase):
def test_pool_serial(self):
with PyMPISingleTaskExecutor(cores=1) as p:
with PyMPISingleTaskExecutor(cores=1, hostname_localhost=True) as p:
output = p.submit(calc, i=2)
self.assertTrue(isinstance(output, Future))
self.assertFalse(output.done())
Expand All @@ -20,7 +20,7 @@ def test_pool_serial(self):
self.assertEqual(output.result(), np.array(4))

def test_pool_serial_multi_core(self):
with PyMPISingleTaskExecutor(cores=2) as p:
with PyMPISingleTaskExecutor(cores=2, hostname_localhost=True) as p:
output = p.submit(calc, i=2)
self.assertTrue(isinstance(output, Future))
self.assertFalse(output.done())
Expand Down Expand Up @@ -48,7 +48,7 @@ def callback(future):
def submit():
# Executor only exists in this scope and can get garbage collected after
# this function is exits
future = PyMPISingleTaskExecutor().submit(slow_callable)
future = PyMPISingleTaskExecutor(hostname_localhost=True).submit(slow_callable)
future.add_done_callback(callback)
return future

Expand Down Expand Up @@ -84,7 +84,7 @@ def __init__(self):
def run(self):
self.running = True

future = PyMPISingleTaskExecutor().submit(self.return_42)
future = PyMPISingleTaskExecutor(hostname_localhost=True).submit(self.return_42)
future.add_done_callback(self.finished)

return future
Expand Down
15 changes: 9 additions & 6 deletions tests/test_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def test_meta_executor_future(self):
meta_future = _get_executor_dict(
max_workers=1,
executor_class=PyMPISingleTaskExecutor,
hostname_localhost=True,
)
future_obj = list(meta_future.keys())[0]
executor_obj = list(meta_future.values())[0]
Expand All @@ -47,6 +48,7 @@ def test_execute_task_dict(self):
meta_future_lst = _get_executor_dict(
max_workers=1,
executor_class=PyMPISingleTaskExecutor,
hostname_localhost=True,
)
f = Future()
self.assertTrue(
Expand All @@ -68,6 +70,7 @@ def test_execute_task_dict_error(self):
meta_future_lst = _get_executor_dict(
max_workers=1,
executor_class=PyMPISingleTaskExecutor,
hostname_localhost=True,
)
with self.assertRaises(ValueError):
execute_task_dict(task_dict={}, meta_future_lst=meta_future_lst)
Expand All @@ -78,15 +81,15 @@ def test_executor_broker(self):
f = Future()
q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f})
q.put({"shutdown": True, "wait": True})
executor_broker(future_queue=q, max_workers=1, executor_class=PyMPISingleTaskExecutor)
executor_broker(future_queue=q, max_workers=1, executor_class=PyMPISingleTaskExecutor, hostname_localhost=True)
self.assertTrue(f.done())
self.assertEqual(f.result(), 1)
q.join()


class TestMetaExecutor(unittest.TestCase):
def test_meta_executor_serial(self):
with PyMPIExecutor(max_workers=2) as exe:
with PyMPIExecutor(max_workers=2, hostname_localhost=True) as exe:
fs_1 = exe.submit(calc, 1)
fs_2 = exe.submit(calc, 2)
self.assertEqual(fs_1.result(), 1)
Expand All @@ -95,7 +98,7 @@ def test_meta_executor_serial(self):
self.assertTrue(fs_2.done())

def test_meta_executor_single(self):
with PyMPIExecutor(max_workers=1) as exe:
with PyMPIExecutor(max_workers=1, hostname_localhost=True) as exe:
fs_1 = exe.submit(calc, 1)
fs_2 = exe.submit(calc, 2)
self.assertEqual(fs_1.result(), 1)
Expand All @@ -104,13 +107,13 @@ def test_meta_executor_single(self):
self.assertTrue(fs_2.done())

def test_meta_executor_parallel(self):
with PyMPIExecutor(max_workers=1, cores_per_worker=2) as exe:
with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as exe:
fs_1 = exe.submit(mpi_funct, 1)
self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)])
self.assertTrue(fs_1.done())

def test_errors(self):
with self.assertRaises(TypeError):
PyMPIExecutor(max_workers=1, cores_per_worker=1, threads_per_core=2)
PyMPIExecutor(max_workers=1, cores_per_worker=1, threads_per_core=2, hostname_localhost=True)
with self.assertRaises(TypeError):
PyMPIExecutor(max_workers=1, cores_per_worker=1, gpus_per_worker=1)
PyMPIExecutor(max_workers=1, cores_per_worker=1, gpus_per_worker=1, hostname_localhost=True)
Loading