39 changes: 7 additions & 32 deletions pympipool/flux/fluxbroker.py
@@ -1,7 +1,6 @@
 from pympipool.shared.executorbase import (
     ExecutorBase,
     executor_broker,
-    get_executor_dict,
 )
 from pympipool.shared.thread import RaisingThread
 from pympipool.flux.fluxtask import PyFluxSingleTaskExecutor
@@ -33,44 +32,20 @@ def __init__(
     ):
         super().__init__()
         self._process = RaisingThread(
-            target=_flux_executor_broker,
+            target=executor_broker,
             kwargs={
+                # Broker Arguments
                 "future_queue": self._future_queue,
                 "max_workers": max_workers,
-                "cores_per_worker": cores_per_worker,
+                "sleep_interval": sleep_interval,
+                "executor_class": PyFluxSingleTaskExecutor,
+                # Executor Arguments
+                "cores": cores_per_worker,
                 "threads_per_core": threads_per_core,
-                "gpus_per_worker": gpus_per_worker,
+                "gpus_per_task": int(gpus_per_worker / cores_per_worker),
                 "init_function": init_function,
                 "cwd": cwd,
-                "sleep_interval": sleep_interval,
                 "executor": executor,
             },
         )
         self._process.start()
-
-
-def _flux_executor_broker(
-    future_queue,
-    max_workers,
-    cores_per_worker=1,
-    threads_per_core=1,
-    gpus_per_worker=0,
-    init_function=None,
-    cwd=None,
-    sleep_interval=0.1,
-    executor=None,
-):
-    executor_broker(
-        future_queue=future_queue,
-        meta_future_lst=get_executor_dict(
-            max_workers=max_workers,
-            executor_class=PyFluxSingleTaskExecutor,
-            cores=cores_per_worker,
-            threads_per_core=threads_per_core,
-            gpus_per_task=int(gpus_per_worker / cores_per_worker),
-            init_function=init_function,
-            cwd=cwd,
-            executor=executor,
-        ),
-        sleep_interval=sleep_interval,
-    )
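
The key pattern in this hunk is that the broker-level arguments (future_queue, max_workers, sleep_interval, executor_class) are consumed by executor_broker itself, while the remaining entries of the kwargs dict travel through **kwargs to each single-task executor. A minimal standalone sketch of that split, using demo stand-ins rather than the pympipool classes:

# Standalone sketch (demo names, not the pympipool implementation): the broker
# consumes its own arguments and forwards the remaining **kwargs unchanged to
# every single-task executor it creates.
import queue


class DemoSingleTaskExecutor:
    # stand-in for PyFluxSingleTaskExecutor / PyMPISingleTaskExecutor
    def __init__(self, **kwargs):
        self.kwargs = kwargs


def demo_executor_broker(future_queue, max_workers, executor_class, sleep_interval=0.1, **kwargs):
    # mirrors the call above: one executor per worker, built from the leftover kwargs
    return [executor_class(**kwargs) for _ in range(max_workers)]


workers = demo_executor_broker(
    future_queue=queue.Queue(),
    max_workers=2,
    executor_class=DemoSingleTaskExecutor,
    sleep_interval=0.1,
    cores=2,              # executor argument, passed through **kwargs
    threads_per_core=1,   # executor argument, passed through **kwargs
)
print(len(workers), workers[0].kwargs)  # 2 {'cores': 2, 'threads_per_core': 1}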
46 changes: 6 additions & 40 deletions pympipool/flux/fluxtask.py
@@ -5,11 +5,9 @@
 from pympipool.shared.executorbase import (
     cloudpickle_register,
     ExecutorBase,
-    execute_parallel_tasks_loop,
-    get_backend_path,
+    execute_parallel_tasks,
 )
 from pympipool.shared.interface import BaseInterface
-from pympipool.shared.communication import interface_bootup
 from pympipool.shared.thread import RaisingThread
 
 
@@ -61,12 +59,15 @@ def __init__(
     ):
         super().__init__()
         self._process = RaisingThread(
-            target=_flux_execute_parallel_tasks,
+            target=execute_parallel_tasks,
             kwargs={
+                # Executor Arguments
                 "future_queue": self._future_queue,
                 "cores": cores,
+                "interface_class": FluxPythonInterface,
+                # Interface Arguments
                 "threads_per_core": threads_per_core,
-                "gpus_per_task": gpus_per_task,
+                "gpus_per_core": gpus_per_task,
                 "cwd": cwd,
                 "executor": executor,
             },
@@ -129,38 +130,3 @@ def shutdown(self, wait=True):
 
     def poll(self):
         return self._future is not None and not self._future.done()
-
-
-def _flux_execute_parallel_tasks(
-    future_queue,
-    cores,
-    threads_per_core=1,
-    gpus_per_task=0,
-    cwd=None,
-    executor=None,
-):
-    """
-    Execute a single tasks in parallel using the message passing interface (MPI).
-
-    Args:
-        future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
-        cores (int): defines the total number of MPI ranks to use
-        threads_per_core (int): number of OpenMP threads to be used for each function call
-        gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
-        cwd (str/None): current working directory where the parallel python task is executed
-        executor (flux.job.FluxExecutor/None): flux executor to submit tasks to - optional
-    """
-    execute_parallel_tasks_loop(
-        interface=interface_bootup(
-            command_lst=get_backend_path(cores=cores),
-            connections=FluxPythonInterface(
-                cwd=cwd,
-                cores=cores,
-                threads_per_core=threads_per_core,
-                gpus_per_core=gpus_per_task,
-                oversubscribe=False,
-                executor=executor,
-            ),
-        ),
-        future_queue=future_queue,
-    )
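
One detail worth calling out in this file: the kwargs key changes from "gpus_per_task" to "gpus_per_core" because the dictionary is now forwarded verbatim to the interface class, so the keys must match that class's own parameter names. A minimal sketch of the constraint (demo class, not the pympipool code):

# Demo only: keys forwarded via **kwargs must match the interface's parameter names.
class DemoInterface:
    def __init__(self, cores=1, threads_per_core=1, gpus_per_core=0, cwd=None, executor=None):
        self.gpus_per_core = gpus_per_core


DemoInterface(cores=2, gpus_per_core=0)          # works: key matches the parameter
try:
    DemoInterface(cores=2, gpus_per_task=0)      # old key would now be rejected
except TypeError as err:
    print(err)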
41 changes: 7 additions & 34 deletions pympipool/mpi/mpibroker.py
@@ -1,7 +1,6 @@
 from pympipool.shared.executorbase import (
     ExecutorBase,
     executor_broker,
-    get_executor_dict,
 )
 from pympipool.shared.thread import RaisingThread
 from pympipool.mpi.mpitask import PyMPISingleTaskExecutor
@@ -46,47 +45,21 @@ def __init__(
                 + "to manage GPUs use the SLURM queuing system enable_slurm_backend=True ."
             )
         self._process = RaisingThread(
-            target=_mpi_executor_broker,
+            target=executor_broker,
             kwargs={
+                # Broker Arguments
                 "future_queue": self._future_queue,
                 "max_workers": max_workers,
-                "cores_per_worker": cores_per_worker,
+                "sleep_interval": sleep_interval,
+                "executor_class": PyMPISingleTaskExecutor,
+                # Executor Arguments
+                "cores": cores_per_worker,
                 "threads_per_core": threads_per_core,
-                "gpus_per_worker": gpus_per_worker,
+                "gpus_per_task": int(gpus_per_worker / cores_per_worker),
                 "oversubscribe": oversubscribe,
                 "init_function": init_function,
                 "cwd": cwd,
-                "sleep_interval": sleep_interval,
                 "enable_slurm_backend": enable_slurm_backend,
             },
         )
         self._process.start()
-
-
-def _mpi_executor_broker(
-    future_queue,
-    max_workers,
-    cores_per_worker=1,
-    threads_per_core=1,
-    gpus_per_worker=0,
-    oversubscribe=False,
-    init_function=None,
-    cwd=None,
-    sleep_interval=0.1,
-    enable_slurm_backend=False,
-):
-    executor_broker(
-        future_queue=future_queue,
-        meta_future_lst=get_executor_dict(
-            max_workers=max_workers,
-            executor_class=PyMPISingleTaskExecutor,
-            cores=cores_per_worker,
-            threads_per_core=threads_per_core,
-            gpus_per_task=int(gpus_per_worker / cores_per_worker),
-            oversubscribe=oversubscribe,
-            init_function=init_function,
-            cwd=cwd,
-            enable_slurm_backend=enable_slurm_backend,
-        ),
-        sleep_interval=sleep_interval,
-    )
54 changes: 9 additions & 45 deletions pympipool/mpi/mpitask.py
@@ -1,11 +1,9 @@
 from pympipool.shared.executorbase import (
     cloudpickle_register,
-    execute_parallel_tasks_loop,
+    execute_parallel_tasks,
     ExecutorBase,
-    get_backend_path,
 )
 from pympipool.shared.thread import RaisingThread
-from pympipool.shared.communication import interface_bootup
 from pympipool.shared.interface import MpiExecInterface, SlurmSubprocessInterface
 
 
@@ -59,12 +57,15 @@ def __init__(
     ):
         super().__init__()
         self._process = RaisingThread(
-            target=_mpi_execute_parallel_tasks,
+            target=execute_parallel_tasks,
             kwargs={
+                # Executor Arguments
                 "future_queue": self._future_queue,
                 "cores": cores,
+                "interface_class": get_interface,
+                # Interface Arguments
                 "threads_per_core": threads_per_core,
-                "gpus_per_task": gpus_per_task,
+                "gpus_per_core": gpus_per_task,
                 "cwd": cwd,
                 "oversubscribe": oversubscribe,
                 "enable_slurm_backend": enable_slurm_backend,
@@ -78,47 +79,10 @@ def __init__(
         cloudpickle_register(ind=3)
 
 
-def _mpi_execute_parallel_tasks(
-    future_queue,
-    cores,
-    threads_per_core=1,
-    gpus_per_task=0,
-    cwd=None,
-    oversubscribe=False,
-    enable_slurm_backend=False,
-):
-    """
-    Execute a single tasks in parallel using the message passing interface (MPI).
-
-    Args:
-        future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
-        cores (int): defines the total number of MPI ranks to use
-        threads_per_core (int): number of OpenMP threads to be used for each function call
-        gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
-        cwd (str/None): current working directory where the parallel python task is executed
-        oversubscribe (bool): enable of disable the oversubscribe feature of OpenMPI - defaults to False
-        enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False
-    """
-    execute_parallel_tasks_loop(
-        interface=interface_bootup(
-            command_lst=get_backend_path(cores=cores),
-            connections=get_interface(
-                cores=cores,
-                threads_per_core=threads_per_core,
-                gpus_per_task=gpus_per_task,
-                cwd=cwd,
-                oversubscribe=oversubscribe,
-                enable_slurm_backend=enable_slurm_backend,
-            ),
-        ),
-        future_queue=future_queue,
-    )
-
-
 def get_interface(
     cores=1,
     threads_per_core=1,
-    gpus_per_task=0,
+    gpus_per_core=0,
     cwd=None,
     oversubscribe=False,
     enable_slurm_backend=False,
@@ -128,14 +92,14 @@ def get_interface(
             cwd=cwd,
             cores=cores,
             threads_per_core=threads_per_core,
-            gpus_per_core=gpus_per_task,
+            gpus_per_core=gpus_per_core,
             oversubscribe=oversubscribe,
         )
     else:
         return SlurmSubprocessInterface(
             cwd=cwd,
             cores=cores,
             threads_per_core=threads_per_core,
-            gpus_per_core=gpus_per_task,
+            gpus_per_core=gpus_per_core,
             oversubscribe=oversubscribe,
         )
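
A hedged usage sketch of the renamed keyword (assumes pympipool at this revision is importable); after the rename, callers of get_interface pass gpus_per_core instead of gpus_per_task:

# Hedged usage sketch: the keyword now matches the interface parameter name.
from pympipool.mpi.mpitask import get_interface

interface = get_interface(
    cores=2,
    threads_per_core=1,
    gpus_per_core=0,              # renamed keyword, forwarded unchanged
    oversubscribe=False,
    enable_slurm_backend=False,   # False -> MpiExecInterface, True -> SlurmSubprocessInterface
)
print(type(interface).__name__)   # MpiExecInterface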
50 changes: 41 additions & 9 deletions pympipool/shared/executorbase.py
@@ -11,6 +11,8 @@
 
 import cloudpickle
 
+from pympipool.shared.communication import interface_bootup
+
 
 class ExecutorBase(FutureExecutor):
     def __init__(self):
@@ -97,6 +99,29 @@ def cloudpickle_register(ind=2):
         pass
 
 
+def execute_parallel_tasks(
+    future_queue,
+    cores,
+    interface_class,
+    **kwargs,
+):
+    """
+    Execute a single tasks in parallel using the message passing interface (MPI).
+
+    Args:
+        future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
+        cores (int): defines the total number of MPI ranks to use
+        interface_class:
+    """
+    execute_parallel_tasks_loop(
+        interface=interface_bootup(
+            command_lst=_get_backend_path(cores=cores),
+            connections=interface_class(cores=cores, **kwargs),
+        ),
+        future_queue=future_queue,
+    )
+
+
 def execute_parallel_tasks_loop(interface, future_queue):
     while True:
         task_dict = future_queue.get()
@@ -123,9 +148,16 @@ def execute_parallel_tasks_loop(interface, future_queue):
 
 def executor_broker(
     future_queue,
-    meta_future_lst,
+    max_workers,
+    executor_class,
     sleep_interval=0.1,
+    **kwargs,
 ):
+    meta_future_lst = _get_executor_dict(
+        max_workers=max_workers,
+        executor_class=executor_class,
+        **kwargs,
+    )
     while True:
         try:
             task_dict = future_queue.get_nowait()
@@ -154,11 +186,7 @@ def execute_task_dict(task_dict, meta_future_lst):
         raise ValueError("Unrecognized Task in task_dict: ", task_dict)
 
 
-def _get_command_path(executable):
-    return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable))
-
-
-def get_backend_path(cores):
+def _get_backend_path(cores):
     command_lst = [sys.executable]
     if cores > 1:
         command_lst += [_get_command_path(executable="mpiexec.py")]
@@ -167,11 +195,15 @@ def get_backend_path(cores):
     return command_lst
 
 
-def get_executor_dict(max_workers, executor_class, **kwargs):
-    return {get_future_done(): executor_class(**kwargs) for _ in range(max_workers)}
+def _get_command_path(executable):
+    return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable))
+
+
+def _get_executor_dict(max_workers, executor_class, **kwargs):
+    return {_get_future_done(): executor_class(**kwargs) for _ in range(max_workers)}
 
 
-def get_future_done():
+def _get_future_done():
     f = Future()
     f.set_result(True)
     return f
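
For readers unfamiliar with the broker internals, the two renamed private helpers implement a simple pattern: each worker executor is keyed by an already-completed Future, which the broker can use as a "worker is ready" marker when handing out tasks (the dispatch logic itself sits outside this diff). A standalone sketch of that pattern with demo names, not the pympipool implementation:

# Demo of the {done_future: executor} pattern behind _get_executor_dict / _get_future_done.
from concurrent.futures import Future


class DemoExecutor:
    def __init__(self, **kwargs):
        self.kwargs = kwargs


def demo_get_future_done():
    # a Future that is already resolved marks the worker as immediately available
    f = Future()
    f.set_result(True)
    return f


def demo_get_executor_dict(max_workers, executor_class, **kwargs):
    return {demo_get_future_done(): executor_class(**kwargs) for _ in range(max_workers)}


meta_future_lst = demo_get_executor_dict(max_workers=3, executor_class=DemoExecutor, cores=1)
print(all(f.done() for f in meta_future_lst))  # True: every worker starts out ready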