6 changes: 3 additions & 3 deletions executorlib/__init__.py
@@ -1,10 +1,10 @@
from executorlib._version import get_versions as _get_versions
from executorlib.interfaces.flux import (
from executorlib.executor.flux import (
FluxClusterExecutor,
FluxJobExecutor,
)
from executorlib.interfaces.single import SingleNodeExecutor
from executorlib.interfaces.slurm import (
from executorlib.executor.single import SingleNodeExecutor
from executorlib.executor.slurm import (
SlurmClusterExecutor,
SlurmJobExecutor,
)
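For downstream code, only the deep module paths move; the package-level names stay importable. A small before/after sketch (assuming the names above remain re-exported from `executorlib/__init__.py`, as this hunk suggests):

```python
# Pre-refactor deep imports -- these module paths are removed by this PR's renames:
# from executorlib.interfaces.flux import FluxJobExecutor
# from executorlib.interfaces.single import SingleNodeExecutor

# Post-refactor deep imports introduced here:
from executorlib.executor.flux import FluxJobExecutor
from executorlib.executor.single import SingleNodeExecutor

# Unchanged for most users: the package-level re-exports shown above.
from executorlib import FluxJobExecutor, SingleNodeExecutor, SlurmJobExecutor
```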
5 changes: 4 additions & 1 deletion executorlib/backend/cache_parallel.py
@@ -4,7 +4,10 @@

import cloudpickle

from executorlib.cache.backend import backend_load_file, backend_write_file
from executorlib.task_scheduler.file.backend import (
backend_load_file,
backend_write_file,
)


def main() -> None:
2 changes: 1 addition & 1 deletion executorlib/backend/cache_serial.py
@@ -1,6 +1,6 @@
import sys

from executorlib.cache.backend import backend_execute_task_in_file
from executorlib.task_scheduler.file.backend import backend_execute_task_in_file

if __name__ == "__main__":
backend_execute_task_in_file(file_name=sys.argv[1])
File renamed without changes.
118 changes: 118 additions & 0 deletions executorlib/executor/base.py
@@ -0,0 +1,118 @@
import queue
from concurrent.futures import (
Executor as FutureExecutor,
)
from concurrent.futures import (
Future,
)
from typing import Callable, Optional

from executorlib.task_scheduler.base import TaskSchedulerBase


class ExecutorBase(FutureExecutor):
"""
Interface class for the executor.

Args:
executor (TaskSchedulerBase): internal executor
"""

def __init__(self, executor: TaskSchedulerBase):
self._task_scheduler = executor

@property
def max_workers(self) -> Optional[int]:
return self._task_scheduler.max_workers

@max_workers.setter
def max_workers(self, max_workers: int):
self._task_scheduler.max_workers = max_workers

@property
def info(self) -> Optional[dict]:
"""
Get the information about the executor.

Returns:
Optional[dict]: Information about the executor.
"""
return self._task_scheduler.info

@property
def future_queue(self) -> Optional[queue.Queue]:
"""
Get the future queue.

Returns:
queue.Queue: The future queue.
"""
return self._task_scheduler.future_queue

def submit( # type: ignore
self,
fn: Callable,
/,
*args,
resource_dict: Optional[dict] = None,
**kwargs,
) -> Future:
"""
Submits a callable to be executed with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.

Args:
fn (callable): function to submit for execution
args: arguments for the submitted function
kwargs: keyword arguments for the submitted function
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
function. Example resource dictionary: {
cores: 1,
threads_per_core: 1,
gpus_per_worker: 0,
oversubscribe: False,
cwd: None,
executor: None,
hostname_localhost: False,
}

Returns:
Future: A Future representing the given call.
"""
return self._task_scheduler.submit(
*([fn] + list(args)), resource_dict=resource_dict, **kwargs
)

def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
"""
Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other
methods can be called after this one.

Args:
wait (bool): If True then shutdown will not return until all running
futures have finished executing and the resources used by the
parallel_executors have been reclaimed.
cancel_futures (bool): If True then shutdown will cancel all pending
futures. Futures that are completed or running will not be
cancelled.
"""
self._task_scheduler.shutdown(wait=wait, cancel_futures=cancel_futures)

def __len__(self) -> int:
"""
Get the length of the executor.

Returns:
int: The length of the executor.
"""
return len(self._task_scheduler)

def __exit__(self, *args, **kwargs) -> None:
"""
Exit method called when exiting the context manager.
"""
self._task_scheduler.__exit__(*args, **kwargs)
Comment on lines +102 to +118
⚠️ Potential issue

Missing __enter__ breaks context-manager usage

ExecutorBase implements __exit__ but not __enter__.
Any subclass used with a with-statement (see the docstrings for the Slurm executors) will fail.

Add a trivial pass-through implementation:

+    def __enter__(self):
+        # Delegate to the underlying task-scheduler’s context behaviour if present
+        if hasattr(self._task_scheduler, "__enter__"):
+            self._task_scheduler.__enter__()
+        return self
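For illustration, a minimal usage sketch of the context-manager pattern the executor docstrings rely on (hypothetical example, not part of the PR; it assumes `SingleNodeExecutor` as re-exported by the `__init__.py` change above). Entry goes through `__enter__`, whether that is the default inherited from `concurrent.futures.Executor` or the explicit pass-through suggested here; leaving the block calls `ExecutorBase.__exit__`, which forwards to the task scheduler:

```python
from executorlib import SingleNodeExecutor

def calc(i):
    return i ** 2

# Entering the block resolves __enter__ on the executor; exiting calls
# ExecutorBase.__exit__, which delegates clean-up to the underlying
# task scheduler.
with SingleNodeExecutor(max_workers=1) as exe:
    future = exe.submit(calc, 3)
    print(future.result())  # -> 9
```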
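And a short hedged sketch of the `submit()` / `resource_dict` signature documented in `ExecutorBase.submit` above; the resource keys follow the docstring example, and `SingleNodeExecutor` again stands in for a concrete subclass:

```python
from executorlib import SingleNodeExecutor

def add(i, j):
    return i + j

exe = SingleNodeExecutor(max_workers=1)
try:
    # resource_dict is forwarded by ExecutorBase.submit to the task scheduler;
    # the keys mirror the docstring example (cores, threads_per_core, ...).
    future = exe.submit(add, 1, j=2, resource_dict={"cores": 1})
    print(future.result())  # -> 3
finally:
    # ExecutorBase.shutdown(wait=True): block until running futures finish
    # and reclaim the worker resources.
    exe.shutdown(wait=True)
```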

34 changes: 19 additions & 15 deletions executorlib/interfaces/flux.py → executorlib/executor/flux.py
@@ -1,10 +1,7 @@
import contextlib
from typing import Callable, Optional, Union

from executorlib.base.executor import ExecutorInterface
from executorlib.interactive.blockallocation import BlockAllocationExecutor
from executorlib.interactive.dependency import DependencyExecutor
from executorlib.interactive.onetoone import OneTaskPerProcessExecutor
from executorlib.executor.base import ExecutorBase
from executorlib.standalone.inputcheck import (
check_command_line_argument_lst,
check_init_function,
@@ -14,15 +11,20 @@
check_refresh_rate,
validate_number_of_cores,
)
from executorlib.task_scheduler.interactive.blockallocation import (
BlockAllocationTaskScheduler,
)
from executorlib.task_scheduler.interactive.dependency import DependencyTaskScheduler
from executorlib.task_scheduler.interactive.onetoone import OneProcessTaskScheduler

with contextlib.suppress(ImportError):
from executorlib.interactive.fluxspawner import (
from executorlib.task_scheduler.interactive.fluxspawner import (
FluxPythonSpawner,
validate_max_workers,
)


class FluxJobExecutor(ExecutorInterface):
class FluxJobExecutor(ExecutorBase):
"""
The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or
preferable the flux framework for distributing python functions within a given resource allocation. In contrast to
@@ -70,7 +72,7 @@ class FluxJobExecutor(ExecutorInterface):
Examples:
```
>>> import numpy as np
>>> from executorlib.interfaces.flux import FluxJobExecutor
>>> from executorlib.executor.flux import FluxJobExecutor
>>>
>>> def calc(i, j, k):
>>> from mpi4py import MPI
@@ -167,7 +169,7 @@ def __init__(
)
if not disable_dependencies:
super().__init__(
executor=DependencyExecutor(
executor=DependencyTaskScheduler(
executor=create_flux_executor(
max_workers=max_workers,
cache_directory=cache_directory,
@@ -207,7 +209,7 @@
)


class FluxClusterExecutor(ExecutorInterface):
class FluxClusterExecutor(ExecutorBase):
"""
The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or
preferable the flux framework for distributing python functions within a given resource allocation. In contrast to
@@ -251,7 +253,7 @@ class FluxClusterExecutor(ExecutorInterface):
Examples:
```
>>> import numpy as np
>>> from executorlib.interfaces.flux import FluxClusterExecutor
>>> from executorlib.executor.flux import FluxClusterExecutor
>>>
>>> def calc(i, j, k):
>>> from mpi4py import MPI
Expand Down Expand Up @@ -341,7 +343,9 @@ def __init__(
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
)
if not plot_dependency_graph:
from executorlib.cache.executor import create_file_executor
from executorlib.task_scheduler.file.task_scheduler import (
create_file_executor,
)

super().__init__(
executor=create_file_executor(
@@ -363,7 +367,7 @@
)
else:
super().__init__(
executor=DependencyExecutor(
executor=DependencyTaskScheduler(
executor=create_flux_executor(
max_workers=max_workers,
cache_directory=cache_directory,
@@ -397,7 +401,7 @@ def create_flux_executor(
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[Callable] = None,
) -> Union[OneTaskPerProcessExecutor, BlockAllocationExecutor]:
) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]:
"""
Create a flux executor

@@ -468,13 +472,13 @@
cores=cores_per_worker,
threads_per_core=resource_dict.get("threads_per_core", 1),
)
return BlockAllocationExecutor(
return BlockAllocationTaskScheduler(
max_workers=max_workers,
executor_kwargs=resource_dict,
spawner=FluxPythonSpawner,
)
else:
return OneTaskPerProcessExecutor(
return OneProcessTaskScheduler(
max_cores=max_cores,
max_workers=max_workers,
executor_kwargs=resource_dict,
executorlib/interfaces/single.py → executorlib/executor/single.py
@@ -1,9 +1,6 @@
from typing import Callable, Optional, Union

from executorlib.base.executor import ExecutorInterface
from executorlib.interactive.blockallocation import BlockAllocationExecutor
from executorlib.interactive.dependency import DependencyExecutor
from executorlib.interactive.onetoone import OneTaskPerProcessExecutor
from executorlib.executor.base import ExecutorBase
from executorlib.standalone.inputcheck import (
check_command_line_argument_lst,
check_gpus_per_worker,
@@ -13,9 +10,14 @@
validate_number_of_cores,
)
from executorlib.standalone.interactive.spawner import MpiExecSpawner
from executorlib.task_scheduler.interactive.blockallocation import (
BlockAllocationTaskScheduler,
)
from executorlib.task_scheduler.interactive.dependency import DependencyTaskScheduler
from executorlib.task_scheduler.interactive.onetoone import OneProcessTaskScheduler


class SingleNodeExecutor(ExecutorInterface):
class SingleNodeExecutor(ExecutorBase):
"""
The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or
preferable the flux framework for distributing python functions within a given resource allocation. In contrast to
@@ -58,7 +60,7 @@ class SingleNodeExecutor(ExecutorInterface):
Examples:
```
>>> import numpy as np
>>> from executorlib.interfaces.single import SingleNodeExecutor
>>> from executorlib.executor.single import SingleNodeExecutor
>>>
>>> def calc(i, j, k):
>>> from mpi4py import MPI
@@ -147,7 +149,7 @@ def __init__(
)
if not disable_dependencies:
super().__init__(
executor=DependencyExecutor(
executor=DependencyTaskScheduler(
executor=create_single_node_executor(
max_workers=max_workers,
cache_directory=cache_directory,
@@ -187,7 +189,7 @@ def create_single_node_executor(
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[Callable] = None,
) -> Union[OneTaskPerProcessExecutor, BlockAllocationExecutor]:
) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]:
"""
Create a single node executor

@@ -241,7 +243,7 @@ def create_single_node_executor(
del resource_dict["slurm_cmd_args"]
if block_allocation:
resource_dict["init_function"] = init_function
return BlockAllocationExecutor(
return BlockAllocationTaskScheduler(
max_workers=validate_number_of_cores(
max_cores=max_cores,
max_workers=max_workers,
@@ -252,7 +254,7 @@
spawner=MpiExecSpawner,
)
else:
return OneTaskPerProcessExecutor(
return OneProcessTaskScheduler(
max_cores=max_cores,
max_workers=max_workers,
executor_kwargs=resource_dict,