Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions pympipool/scheduler/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import os
import shutil
from typing import Optional
from pympipool.scheduler.local import (
PyLocalExecutor,
PyLocalStepExecutor,
from pympipool.scheduler.universal import (
UniversalExecutor,
UniversalStepExecutor,
)
from pympipool.scheduler.slurm import (
PySlurmExecutor,
PySlurmStepExecutor,
from pympipool.scheduler.interface import (
MpiExecInterface,
SLURM_COMMAND,
SrunInterface,
)
from pympipool.shared.interface import SLURM_COMMAND
from pympipool.shared.inputcheck import (
check_command_line_argument_lst,
check_gpus_per_worker,
Expand All @@ -23,10 +23,7 @@
)

try: # The PyFluxExecutor requires flux-core to be installed.
from pympipool.scheduler.flux import (
PyFluxExecutor,
PyFluxStepExecutor,
)
from pympipool.scheduler.flux import FluxPythonInterface

flux_installed = "FLUX_URI" in os.environ
except ImportError:
Expand Down Expand Up @@ -111,14 +108,16 @@ def create_executor(
executor_kwargs["pmi"] = pmi
if block_allocation:
executor_kwargs["init_function"] = init_function
return PyFluxExecutor(
return UniversalExecutor(
max_workers=int(max_cores / cores_per_worker),
executor_kwargs=executor_kwargs,
interface_class=FluxPythonInterface,
)
else:
return PyFluxStepExecutor(
return UniversalStepExecutor(
max_cores=max_cores,
executor_kwargs=executor_kwargs,
interface_class=FluxPythonInterface,
)
elif backend == "slurm":
check_executor(executor=executor)
Expand All @@ -128,14 +127,16 @@ def create_executor(
executor_kwargs["oversubscribe"] = oversubscribe
if block_allocation:
executor_kwargs["init_function"] = init_function
return PySlurmExecutor(
return UniversalExecutor(
max_workers=int(max_cores / cores_per_worker),
executor_kwargs=executor_kwargs,
interface_class=SrunInterface,
)
else:
return PySlurmStepExecutor(
return UniversalStepExecutor(
max_cores=max_cores,
executor_kwargs=executor_kwargs,
interface_class=SrunInterface,
)
else: # backend="local"
check_threads_per_core(threads_per_core=threads_per_core)
Expand All @@ -147,12 +148,14 @@ def create_executor(
executor_kwargs["oversubscribe"] = oversubscribe
if block_allocation:
executor_kwargs["init_function"] = init_function
return PyLocalExecutor(
return UniversalExecutor(
max_workers=int(max_cores / cores_per_worker),
executor_kwargs=executor_kwargs,
interface_class=MpiExecInterface,
)
else:
return PyLocalStepExecutor(
return UniversalStepExecutor(
max_cores=max_cores,
executor_kwargs=executor_kwargs,
interface_class=MpiExecInterface,
)
107 changes: 1 addition & 106 deletions pympipool/scheduler/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,112 +3,7 @@

import flux.job

from pympipool.shared.executorbase import (
execute_parallel_tasks,
execute_separate_tasks,
ExecutorBroker,
ExecutorSteps,
)
from pympipool.shared.interface import BaseInterface
from pympipool.shared.thread import RaisingThread


class PyFluxExecutor(ExecutorBroker):
"""
The pympipool.flux.PyFluxExecutor leverages the flux framework to distribute python tasks within a queuing system
allocation. In analogy to the pympipool.slurm.PySlurmExecutur it provides the option to specify the number of
threads per worker as well as the number of GPUs per worker in addition to specifying the number of cores per
worker.

Args:
max_workers (int): defines the number workers which can execute functions in parallel
executor_kwargs (dict): keyword arguments for the executor

Examples:

>>> import numpy as np
>>> from pympipool.scheduler.flux import PyFluxExecutor
>>>
>>> def calc(i, j, k):
>>> from mpi4py import MPI
>>> size = MPI.COMM_WORLD.Get_size()
>>> rank = MPI.COMM_WORLD.Get_rank()
>>> return np.array([i, j, k]), size, rank
>>>
>>> def init_k():
>>> return {"k": 3}
>>>
>>> with PyFluxExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p:
>>> fs = p.submit(calc, 2, j=4)
>>> print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]

"""

def __init__(
self,
max_workers: int = 1,
executor_kwargs: dict = {},
):
super().__init__()
executor_kwargs["future_queue"] = self._future_queue
executor_kwargs["interface_class"] = FluxPythonInterface
self._set_process(
process=[
RaisingThread(
target=execute_parallel_tasks,
kwargs=executor_kwargs,
)
for _ in range(max_workers)
],
)


class PyFluxStepExecutor(ExecutorSteps):
"""
The pympipool.flux.PyFluxStepExecutor leverages the flux framework to distribute python tasks within a queuing
system allocation. In analogy to the pympipool.slurm.PySlurmExecutur it provides the option to specify the number
of threads per worker as well as the number of GPUs per worker in addition to specifying the number of cores per
worker.

Args:
max_cores (int): defines the number workers which can execute functions in parallel
executor_kwargs (dict): keyword arguments for the executor

Examples:

>>> import numpy as np
>>> from pympipool.scheduler.flux import PyFluxStepExecutor
>>>
>>> def calc(i, j, k):
>>> from mpi4py import MPI
>>> size = MPI.COMM_WORLD.Get_size()
>>> rank = MPI.COMM_WORLD.Get_rank()
>>> return np.array([i, j, k]), size, rank
>>>
>>> with PyFluxStepExecutor(max_cores=2) as p:
>>> fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2})
>>> print(fs.result())

[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]

"""

def __init__(
self,
max_cores: int = 1,
executor_kwargs: dict = {},
):
super().__init__()
executor_kwargs["future_queue"] = self._future_queue
executor_kwargs["interface_class"] = FluxPythonInterface
executor_kwargs["max_cores"] = max_cores
self._set_process(
RaisingThread(
target=execute_separate_tasks,
kwargs=executor_kwargs,
)
)
from pympipool.scheduler.interface import BaseInterface


class FluxPythonInterface(BaseInterface):
Expand Down
File renamed without changes.
104 changes: 0 additions & 104 deletions pympipool/scheduler/local.py

This file was deleted.

Loading