Merged
59 changes: 24 additions & 35 deletions pympipool/scheduler/__init__.py
@@ -71,6 +71,7 @@ def create_executor(
gpus_per_worker (int): number of GPUs per worker - defaults to 0
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and SLURM only) - default False
cwd (str/None): current working directory where the parallel python task is executed
executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an Executor
running on a different compute node within the same allocation. And in principle
@@ -94,56 +95,47 @@ def create_executor(
backend=backend, flux_installed=flux_installed, slurm_installed=slurm_installed
)
check_pmi(backend=backend, pmi=pmi)
executor_kwargs = {
Review comment (Contributor):
Consider using a more robust default for executor_kwargs to avoid potential issues with mutable default arguments.

- executor_kwargs = {
+ executor_kwargs = executor_kwargs or {}

Committable suggestion was skipped due to low confidence.
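The skip is defensible here: at this call site executor_kwargs is a fresh dict literal built on every call to create_executor, not a default argument, so the mutable-default pitfall the comment describes does not apply. A minimal sketch of the distinction, with hypothetical function names:

def fresh_each_call():
    kwargs = {"cores": 1}  # dict literal: a new object on every call
    return kwargs

def shared_default(kwargs: dict = {}):
    return kwargs  # default is evaluated once and shared across calls

assert fresh_each_call() is not fresh_each_call()
assert shared_default() is shared_default()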

"cores": cores_per_worker,
"hostname_localhost": hostname_localhost,
"cwd": cwd,
}
if backend == "flux":
check_oversubscribe(oversubscribe=oversubscribe)
check_command_line_argument_lst(
command_line_argument_lst=command_line_argument_lst
)
executor_kwargs["threads_per_core"] = threads_per_core
executor_kwargs["gpus_per_core"] = int(gpus_per_worker / cores_per_worker)
executor_kwargs["executor"] = executor
executor_kwargs["pmi"] = pmi
if block_allocation:
executor_kwargs["init_function"] = init_function
return PyFluxExecutor(
max_workers=int(max_cores / cores_per_worker),
- cores_per_worker=cores_per_worker,
- threads_per_core=threads_per_core,
- gpus_per_worker=gpus_per_worker,
- init_function=init_function,
- cwd=cwd,
- executor=executor,
- hostname_localhost=hostname_localhost,
- pmi=pmi,
+ executor_kwargs=executor_kwargs,
)
else:
return PyFluxStepExecutor(
max_cores=max_cores,
- cores_per_worker=cores_per_worker,
- threads_per_core=threads_per_core,
- gpus_per_worker=gpus_per_worker,
- cwd=cwd,
- executor=executor,
- hostname_localhost=hostname_localhost,
- pmi=pmi,
+ executor_kwargs=executor_kwargs,
)
elif backend == "slurm":
check_executor(executor=executor)
executor_kwargs["threads_per_core"] = threads_per_core
executor_kwargs["gpus_per_core"] = int(gpus_per_worker / cores_per_worker)
executor_kwargs["command_line_argument_lst"] = command_line_argument_lst
executor_kwargs["oversubscribe"] = oversubscribe
if block_allocation:
executor_kwargs["init_function"] = init_function
return PySlurmExecutor(
max_workers=int(max_cores / cores_per_worker),
- cores_per_worker=cores_per_worker,
- threads_per_core=threads_per_core,
- gpus_per_worker=gpus_per_worker,
- oversubscribe=oversubscribe,
- init_function=init_function,
- cwd=cwd,
- hostname_localhost=hostname_localhost,
+ executor_kwargs=executor_kwargs,
)
else:
return PySlurmStepExecutor(
max_cores=max_cores,
- cores_per_worker=cores_per_worker,
- threads_per_core=threads_per_core,
- gpus_per_worker=gpus_per_worker,
- oversubscribe=oversubscribe,
- cwd=cwd,
- hostname_localhost=hostname_localhost,
+ executor_kwargs=executor_kwargs,
)
else: # backend="local"
check_threads_per_core(threads_per_core=threads_per_core)
@@ -152,18 +144,15 @@ def create_executor(
command_line_argument_lst=command_line_argument_lst
)
check_executor(executor=executor)
executor_kwargs["oversubscribe"] = oversubscribe
if block_allocation:
executor_kwargs["init_function"] = init_function
return PyLocalExecutor(
max_workers=int(max_cores / cores_per_worker),
- cores_per_worker=cores_per_worker,
- init_function=init_function,
- cwd=cwd,
- hostname_localhost=hostname_localhost,
+ executor_kwargs=executor_kwargs,
)
else:
return PyLocalStepExecutor(
max_cores=max_cores,
- cores_per_worker=cores_per_worker,
- cwd=cwd,
- hostname_localhost=hostname_localhost,
+ executor_kwargs=executor_kwargs,
)
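Taken together, the hunks above reduce create_executor to a single pattern: build one executor_kwargs dict, extend it per backend, then hand it to the matching broker or step class. A condensed, hypothetical restatement of that control flow (not the real function: validation checks are omitted, the default values are assumptions, and the slurm import path is a guess; all other names come from this diff):

from pympipool.scheduler.flux import PyFluxExecutor, PyFluxStepExecutor
from pympipool.scheduler.local import PyLocalExecutor, PyLocalStepExecutor
from pympipool.scheduler.slurm import PySlurmExecutor, PySlurmStepExecutor  # assumed path

def create_executor_sketch(
    max_cores=1, cores_per_worker=1, threads_per_core=1, gpus_per_worker=0,
    oversubscribe=False, cwd=None, executor=None, hostname_localhost=False,
    backend="local", block_allocation=False, init_function=None,
    command_line_argument_lst=(), pmi=None,
):
    # shared arguments for every backend
    executor_kwargs = {
        "cores": cores_per_worker,
        "hostname_localhost": hostname_localhost,
        "cwd": cwd,
    }
    if backend == "flux":
        executor_kwargs["threads_per_core"] = threads_per_core
        executor_kwargs["gpus_per_core"] = int(gpus_per_worker / cores_per_worker)
        executor_kwargs["executor"] = executor
        executor_kwargs["pmi"] = pmi
        broker_cls, step_cls = PyFluxExecutor, PyFluxStepExecutor
    elif backend == "slurm":
        executor_kwargs["threads_per_core"] = threads_per_core
        executor_kwargs["gpus_per_core"] = int(gpus_per_worker / cores_per_worker)
        executor_kwargs["command_line_argument_lst"] = command_line_argument_lst
        executor_kwargs["oversubscribe"] = oversubscribe
        broker_cls, step_cls = PySlurmExecutor, PySlurmStepExecutor
    else:  # backend == "local"
        executor_kwargs["oversubscribe"] = oversubscribe
        broker_cls, step_cls = PyLocalExecutor, PyLocalStepExecutor
    if block_allocation:
        # broker variant: a fixed pool of workers, optionally preseeded
        executor_kwargs["init_function"] = init_function
        return broker_cls(
            max_workers=int(max_cores / cores_per_worker),
            executor_kwargs=executor_kwargs,
        )
    # step variant: one process per submitted task, bounded by max_cores
    return step_cls(max_cores=max_cores, executor_kwargs=executor_kwargs)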
83 changes: 12 additions & 71 deletions pympipool/scheduler/flux.py
@@ -22,20 +22,7 @@ class PyFluxExecutor(ExecutorBroker):

Args:
max_workers (int): defines the number workers which can execute functions in parallel
- cores_per_worker (int): number of MPI cores to be used for each function call
- threads_per_core (int): number of OpenMP threads to be used for each function call
- gpus_per_worker (int): number of GPUs per worker - defaults to 0
- init_function (None): optional function to preset arguments for functions which are submitted later
- cwd (str/None): current working directory where the parallel python task is executed
- executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
- pmi (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
- hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
- context of an HPC cluster this essential to be able to communicate to an
- Executor running on a different compute node within the same allocation. And
- in principle any computer should be able to resolve that their own hostname
- points to the same address as localhost. Still MacOS >= 12 seems to disable
- this look up for security reasons. So on MacOS it is required to set this
- option to true
+ executor_kwargs (dict): keyword arguments for the executor

Examples:

@@ -51,7 +38,7 @@ class PyFluxExecutor(ExecutorBroker):
>>> def init_k():
>>> return {"k": 3}
>>>
- >>> with PyFluxExecutor(max_workers=2, init_function=init_k) as p:
+ >>> with PyFluxExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p:
>>> fs = p.submit(calc, 2, j=4)
>>> print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
@@ -61,34 +48,16 @@
def __init__(
self,
max_workers: int = 1,
- cores_per_worker: int = 1,
- threads_per_core: int = 1,
- gpus_per_worker: int = 0,
- init_function: Optional[callable] = None,
- cwd: Optional[str] = None,
- executor: Optional[flux.job.FluxExecutor] = None,
- pmi: Optional[str] = None,
- hostname_localhost: Optional[bool] = False,
+ executor_kwargs: dict = {},
Review comment (Contributor):
Consider using a more robust default for executor_kwargs to avoid potential issues with mutable default arguments.

- def __init__(self, max_workers: int = 1, executor_kwargs: dict = {}):
+ def __init__(self, max_workers: int = 1, executor_kwargs: dict = None):
+     if executor_kwargs is None:
+         executor_kwargs = {}

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change:
- executor_kwargs: dict = {},
+ executor_kwargs: dict = None,
+ if executor_kwargs is None:
+     executor_kwargs = {}

):
super().__init__()
executor_kwargs["future_queue"] = self._future_queue
executor_kwargs["interface_class"] = FluxPythonInterface
self._set_process(
process=[
RaisingThread(
target=execute_parallel_tasks,
- kwargs={
-     # Executor Arguments
-     "future_queue": self._future_queue,
-     "cores": cores_per_worker,
-     "interface_class": FluxPythonInterface,
-     "hostname_localhost": hostname_localhost,
-     "init_function": init_function,
-     # Interface Arguments
-     "threads_per_core": threads_per_core,
-     "gpus_per_core": int(gpus_per_worker / cores_per_worker),
-     "cwd": cwd,
-     "executor": executor,
-     "pmi": pmi,
- },
+ kwargs=executor_kwargs,
)
for _ in range(max_workers)
],
@@ -104,19 +73,7 @@ class PyFluxStepExecutor(ExecutorSteps):

Args:
max_cores (int): defines the number workers which can execute functions in parallel
- cores_per_worker (int): number of MPI cores to be used for each function call
- threads_per_core (int): number of OpenMP threads to be used for each function call
- gpus_per_worker (int): number of GPUs per worker - defaults to 0
- cwd (str/None): current working directory where the parallel python task is executed
- executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
- pmi (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
- hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
- context of an HPC cluster this essential to be able to communicate to an
- Executor running on a different compute node within the same allocation. And
- in principle any computer should be able to resolve that their own hostname
- points to the same address as localhost. Still MacOS >= 12 seems to disable
- this look up for security reasons. So on MacOS it is required to set this
- option to true
+ executor_kwargs (dict): keyword arguments for the executor

Examples:

@@ -140,32 +97,16 @@
def __init__(
self,
max_cores: int = 1,
- cores_per_worker: int = 1,
- threads_per_core: int = 1,
- gpus_per_worker: int = 0,
- cwd: Optional[str] = None,
- executor: Optional[flux.job.FluxExecutor] = None,
- pmi: Optional[str] = None,
- hostname_localhost: Optional[bool] = False,
+ executor_kwargs: dict = {},
Review comment (Contributor):
Ensure that executor_kwargs is safely initialized to avoid issues with mutable default arguments.

- def __init__(self, max_cores: int = 1, executor_kwargs: dict = {}):
+ def __init__(self, max_cores: int = 1, executor_kwargs: dict = None):
+     if executor_kwargs is None:
+         executor_kwargs = {}

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change:
- executor_kwargs: dict = {},
+ def __init__(self, max_cores: int = 1, executor_kwargs: dict = None):
+     if executor_kwargs is None:
+         executor_kwargs = {}

):
super().__init__()
executor_kwargs["future_queue"] = self._future_queue
executor_kwargs["interface_class"] = FluxPythonInterface
executor_kwargs["max_cores"] = max_cores
self._set_process(
RaisingThread(
target=execute_separate_tasks,
- kwargs={
-     # Executor Arguments
-     "future_queue": self._future_queue,
-     "cores": cores_per_worker,
-     "interface_class": FluxPythonInterface,
-     "max_cores": max_cores,
-     "hostname_localhost": hostname_localhost,
-     # Interface Arguments
-     "threads_per_core": threads_per_core,
-     "gpus_per_core": int(gpus_per_worker / cores_per_worker),
-     "cwd": cwd,
-     "executor": executor,
-     "pmi": pmi,
- },
+ kwargs=executor_kwargs,
)
)

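The mutable-default issue flagged in the two review comments above is real in this file, because both constructors mutate executor_kwargs in place (executor_kwargs["future_queue"] = ...): with a shared {} default, every instance created without an explicit dict writes into the same object. A minimal, self-contained sketch of the failure and of the None-sentinel fix the comments suggest (Broker is a hypothetical stand-in, not the real class):

import queue

class Broker:  # hypothetical stand-in for the PyFluxExecutor-style classes
    def __init__(self, executor_kwargs: dict = {}):  # one dict shared by ALL calls
        executor_kwargs["future_queue"] = queue.Queue()  # in-place mutation leaks out
        self._kwargs = executor_kwargs

a, b = Broker(), Broker()
print(a._kwargs is b._kwargs)  # True: both instances share the same dict

class SafeBroker:  # the pattern the review comments suggest
    def __init__(self, executor_kwargs: dict = None):
        if executor_kwargs is None:  # None sentinel: fresh dict per call
            executor_kwargs = {}
        executor_kwargs["future_queue"] = queue.Queue()
        self._kwargs = executor_kwargs

x, y = SafeBroker(), SafeBroker()
print(x._kwargs is not y._kwargs)  # True: each instance owns its dict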
75 changes: 14 additions & 61 deletions pympipool/scheduler/local.py
@@ -1,5 +1,3 @@
- from typing import Optional
-
from pympipool.shared.executorbase import (
execute_parallel_tasks,
execute_separate_tasks,
@@ -12,25 +10,15 @@

class PyLocalExecutor(ExecutorBroker):
"""
- The pympipool.mpi.PyMPIExecutor leverages the message passing interface MPI to distribute python tasks on a
+ The pympipool.mpi.PyLocalExecutor leverages the message passing interface MPI to distribute python tasks on a
workstation. In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.mpi.PyLocalExecutor can be executed
in a serial python process and does not require the python script to be executed with MPI. Consequently, it is
primarily an abstraction of its functionality to improve the usability in particular when used in combination with
Jupyter notebooks.

Args:
max_workers (int): defines the number workers which can execute functions in parallel
- cores_per_worker (int): number of MPI cores to be used for each function call
- oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
- init_function (None): optional function to preset arguments for functions which are submitted later
- cwd (str/None): current working directory where the parallel python task is executed
- hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
- context of an HPC cluster this essential to be able to communicate to an
- Executor running on a different compute node within the same allocation. And
- in principle any computer should be able to resolve that their own hostname
- points to the same address as localhost. Still MacOS >= 12 seems to disable
- this look up for security reasons. So on MacOS it is required to set this
- option to true
+ executor_kwargs (dict): keyword arguments for the executor

Examples:

@@ -46,38 +34,22 @@ class PyLocalExecutor(ExecutorBroker):
>>> def init_k():
>>> return {"k": 3}
>>>
- >>> with PyLocalExecutor(max_workers=2, init_function=init_k) as p:
+ >>> with PyLocalExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p:
>>> fs = p.submit(calc, 2, j=4)
>>> print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]

"""

- def __init__(
-     self,
-     max_workers: int = 1,
-     cores_per_worker: int = 1,
-     oversubscribe: bool = False,
-     init_function: Optional[callable] = None,
-     cwd: Optional[str] = None,
-     hostname_localhost: bool = False,
- ):
+ def __init__(self, max_workers: int = 1, executor_kwargs: dict = {}):
Review comment (Contributor):
Consider using a more robust default for executor_kwargs to avoid potential issues with mutable default arguments.

- def __init__(self, max_workers: int = 1, executor_kwargs: dict = {}):
+ def __init__(self, max_workers: int = 1, executor_kwargs: dict = None):
+     if executor_kwargs is None:
+         executor_kwargs = {}

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change:
- def __init__(self, max_workers: int = 1, executor_kwargs: dict = {}):
+ def __init__(self, max_workers: int = 1, executor_kwargs: dict = None):
+     if executor_kwargs is None:
+         executor_kwargs = {}

super().__init__()
executor_kwargs["future_queue"] = self._future_queue
executor_kwargs["interface_class"] = MpiExecInterface
self._set_process(
process=[
RaisingThread(
target=execute_parallel_tasks,
- kwargs={
-     # Executor Arguments
-     "future_queue": self._future_queue,
-     "cores": cores_per_worker,
-     "interface_class": MpiExecInterface,
-     "hostname_localhost": hostname_localhost,
-     "init_function": init_function,
-     # Interface Arguments
-     "cwd": cwd,
-     "oversubscribe": oversubscribe,
- },
+ kwargs=executor_kwargs,
)
for _ in range(max_workers)
],
@@ -94,16 +66,7 @@ class PyLocalStepExecutor(ExecutorSteps):

Args:
max_cores (int): defines the number cores which can be used in parallel
- cores_per_worker (int): number of MPI cores to be used for each function call
- oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
- cwd (str/None): current working directory where the parallel python task is executed
- hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
- context of an HPC cluster this essential to be able to communicate to an
- Executor running on a different compute node within the same allocation. And
- in principle any computer should be able to resolve that their own hostname
- points to the same address as localhost. Still MacOS >= 12 seems to disable
- this look up for security reasons. So on MacOS it is required to set this
- option to true
+ executor_kwargs (dict): keyword arguments for the executor

Examples:

@@ -116,7 +79,7 @@ class PyLocalStepExecutor(ExecutorSteps):
>>> rank = MPI.COMM_WORLD.Get_rank()
>>> return np.array([i, j, k]), size, rank
>>>
- >>> with PyMPIStepExecutor(max_cores=2) as p:
+ >>> with PyLocalStepExecutor(max_cores=2) as p:
>>> fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2})
>>> print(fs.result())

@@ -127,25 +90,15 @@
def __init__(
self,
max_cores: int = 1,
- cores_per_worker: int = 1,
- oversubscribe: bool = False,
- cwd: Optional[str] = None,
- hostname_localhost: bool = False,
+ executor_kwargs: dict = {},
Review comment (Contributor):
Ensure that executor_kwargs is safely initialized to avoid issues with mutable default arguments.

- def __init__(self, max_cores: int = 1, executor_kwargs: dict = {}):
+ def __init__(self, max_cores: int = 1, executor_kwargs: dict = None):
+     if executor_kwargs is None:
+         executor_kwargs = {}

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change:
- executor_kwargs: dict = {},
+ executor_kwargs: dict = None):
+     if executor_kwargs is None:
+         executor_kwargs = {}

):
super().__init__()
executor_kwargs["future_queue"] = self._future_queue
executor_kwargs["interface_class"] = MpiExecInterface
executor_kwargs["max_cores"] = max_cores
self._set_process(
RaisingThread(
target=execute_separate_tasks,
- kwargs={
-     # Executor Arguments
-     "future_queue": self._future_queue,
-     "cores": cores_per_worker,
-     "interface_class": MpiExecInterface,
-     "max_cores": max_cores,
-     "hostname_localhost": hostname_localhost,
-     # Interface Arguments
-     "cwd": cwd,
-     "oversubscribe": oversubscribe,
- },
+ kwargs=executor_kwargs,
)
)
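A caller-side consequence of this refactor: because each constructor extends executor_kwargs in place (adding future_queue, interface_class, and for the step variants max_cores), a kwargs dict reused across several executors should be passed as a copy. A hedged usage sketch, where the key names follow this diff and the values are illustrative:

from pympipool.scheduler.local import PyLocalExecutor, PyLocalStepExecutor

# One base dict reused for two executors: pass copies so neither constructor
# pollutes the caller's dict or the sibling executor's arguments.
base_kwargs = {"cores": 2, "cwd": "/tmp", "hostname_localhost": True}

broker = PyLocalExecutor(max_workers=2, executor_kwargs=dict(base_kwargs))
stepper = PyLocalStepExecutor(max_cores=4, executor_kwargs=dict(base_kwargs))

print("future_queue" in base_kwargs)  # False: the original dict stays clean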