Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions executorlib/base/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
def submit( # type: ignore
self,
fn: Callable,
/,
*args,
resource_dict: Optional[dict] = None,
**kwargs,
Expand Down Expand Up @@ -178,3 +179,111 @@
"""
with contextlib.suppress(AttributeError, RuntimeError):
self.shutdown(wait=False)


class ExecutorInterface(FutureExecutor):
    """
    Interface class for the executor.

    Thin facade over an internal task scheduler: every public member simply
    delegates to the wrapped ``ExecutorBase`` instance.

    Args:
        executor (ExecutorBase): internal executor
    """

    def __init__(self, executor: ExecutorBase):
        # All scheduling work is delegated to the wrapped task scheduler.
        self._task_scheduler = executor

    @property
    def max_workers(self) -> Optional[int]:
        """
        Get the maximum number of workers of the internal task scheduler.

        Returns:
            Optional[int]: The maximum number of workers, if defined.
        """
        return self._task_scheduler.max_workers

    @max_workers.setter
    def max_workers(self, max_workers: int):
        """Forward the new worker limit to the internal task scheduler."""
        self._task_scheduler.max_workers = max_workers

    @property
    def info(self) -> Optional[dict]:
        """
        Get the information about the executor.

        Returns:
            Optional[dict]: Information about the executor.
        """
        return self._task_scheduler.info

    @property
    def future_queue(self) -> Optional[queue.Queue]:
        """
        Get the future queue.

        Returns:
            queue.Queue: The future queue.
        """
        return self._task_scheduler.future_queue

    def submit(  # type: ignore
        self,
        fn: Callable,
        /,
        *args,
        resource_dict: Optional[dict] = None,
        **kwargs,
    ) -> Future:
        """
        Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Args:
            fn (callable): function to submit for execution
            args: arguments for the submitted function
            kwargs: keyword arguments for the submitted function
            resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
                                  function. Example resource dictionary: {
                                      cores: 1,
                                      threads_per_core: 1,
                                      gpus_per_worker: 0,
                                      oversubscribe: False,
                                      cwd: None,
                                      executor: None,
                                      hostname_localhost: False,
                                  }

        Returns:
            Future: A Future representing the given call.
        """
        # fn is positional-only, so forward it directly as the first positional
        # argument instead of rebuilding the argument list via list concatenation.
        return self._task_scheduler.submit(
            fn, *args, resource_dict=resource_dict, **kwargs
        )

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """
        Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait (bool): If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures (bool): If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        self._task_scheduler.shutdown(wait=wait, cancel_futures=cancel_futures)

    def __len__(self) -> int:
        """
        Get the length of the executor.

        Returns:
            int: The length of the executor.
        """
        return len(self._task_scheduler)

    def __exit__(self, *args, **kwargs) -> None:
        """
        Exit method called when exiting the context manager.
        """
        self._task_scheduler.__exit__(*args, **kwargs)
146 changes: 58 additions & 88 deletions executorlib/interfaces/flux.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import contextlib
from typing import Callable, Optional, Union

from executorlib.base.executor import ExecutorInterface
from executorlib.interactive.blockallocation import BlockAllocationExecutor
from executorlib.interactive.dependency import DependencyExecutor
from executorlib.interactive.onetoone import OneTaskPerProcessExecutor
Expand All @@ -21,7 +22,7 @@
)


class FluxJobExecutor:
class FluxJobExecutor(ExecutorInterface):
"""
The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or
preferable the flux framework for distributing python functions within a given resource allocation. In contrast to
Expand Down Expand Up @@ -104,27 +105,6 @@ def __init__(
refresh_rate: float = 0.01,
plot_dependency_graph: bool = False,
plot_dependency_graph_filename: Optional[str] = None,
):
# Use __new__() instead of __init__(). This function is only implemented to enable auto-completion.
pass

def __new__(
cls,
max_workers: Optional[int] = None,
cache_directory: Optional[str] = None,
max_cores: Optional[int] = None,
resource_dict: Optional[dict] = None,
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[Callable] = None,
disable_dependencies: bool = False,
refresh_rate: float = 0.01,
plot_dependency_graph: bool = False,
plot_dependency_graph_filename: Optional[str] = None,
):
"""
Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor,
Expand Down Expand Up @@ -186,7 +166,31 @@ def __new__(
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
)
if not disable_dependencies:
return DependencyExecutor(
super().__init__(
executor=DependencyExecutor(
executor=create_flux_executor(
max_workers=max_workers,
cache_directory=cache_directory,
max_cores=max_cores,
resource_dict=resource_dict,
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
flux_log_files=flux_log_files,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
),
max_cores=max_cores,
refresh_rate=refresh_rate,
plot_dependency_graph=plot_dependency_graph,
plot_dependency_graph_filename=plot_dependency_graph_filename,
)
)
Comment on lines +169 to +189
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Copy resource_dict to maintain functional purity

Same mutability concern as raised for SingleNodeExecutor. Copy the user-supplied mapping before modification to protect callers from hidden side-effects.

else:
check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph)
check_refresh_rate(refresh_rate=refresh_rate)
super().__init__(
executor=create_flux_executor(
max_workers=max_workers,
cache_directory=cache_directory,
Expand All @@ -199,31 +203,11 @@ def __new__(
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
),
max_cores=max_cores,
refresh_rate=refresh_rate,
plot_dependency_graph=plot_dependency_graph,
plot_dependency_graph_filename=plot_dependency_graph_filename,
)
else:
check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph)
check_refresh_rate(refresh_rate=refresh_rate)
return create_flux_executor(
max_workers=max_workers,
cache_directory=cache_directory,
max_cores=max_cores,
resource_dict=resource_dict,
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
flux_log_files=flux_log_files,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
)
)


class FluxClusterExecutor:
class FluxClusterExecutor(ExecutorInterface):
"""
The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or
preferable the flux framework for distributing python functions within a given resource allocation. In contrast to
Expand Down Expand Up @@ -299,24 +283,6 @@ def __init__(
refresh_rate: float = 0.01,
plot_dependency_graph: bool = False,
plot_dependency_graph_filename: Optional[str] = None,
):
# Use __new__() instead of __init__(). This function is only implemented to enable auto-completion.
pass

def __new__(
cls,
max_workers: Optional[int] = None,
cache_directory: Optional[str] = None,
max_cores: Optional[int] = None,
resource_dict: Optional[dict] = None,
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[Callable] = None,
disable_dependencies: bool = False,
refresh_rate: float = 0.01,
plot_dependency_graph: bool = False,
plot_dependency_graph_filename: Optional[str] = None,
):
"""
Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor,
Expand Down Expand Up @@ -377,41 +343,45 @@ def __new__(
if not plot_dependency_graph:
from executorlib.cache.executor import create_file_executor

return create_file_executor(
max_workers=max_workers,
backend="flux_submission",
max_cores=max_cores,
cache_directory=cache_directory,
resource_dict=resource_dict,
flux_executor=None,
flux_executor_pmi_mode=None,
flux_executor_nesting=False,
flux_log_files=False,
pysqa_config_directory=pysqa_config_directory,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
disable_dependencies=disable_dependencies,
)
else:
return DependencyExecutor(
executor=create_flux_executor(
super().__init__(
executor=create_file_executor(
max_workers=max_workers,
cache_directory=cache_directory,
backend="flux_submission",
max_cores=max_cores,
cache_directory=cache_directory,
resource_dict=resource_dict,
flux_executor=None,
flux_executor_pmi_mode=None,
flux_executor_nesting=False,
flux_log_files=False,
pysqa_config_directory=pysqa_config_directory,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
),
max_cores=max_cores,
refresh_rate=refresh_rate,
plot_dependency_graph=plot_dependency_graph,
plot_dependency_graph_filename=plot_dependency_graph_filename,
disable_dependencies=disable_dependencies,
)
)
else:
super().__init__(
executor=DependencyExecutor(
executor=create_flux_executor(
max_workers=max_workers,
cache_directory=cache_directory,
max_cores=max_cores,
resource_dict=resource_dict,
flux_executor=None,
flux_executor_pmi_mode=None,
flux_executor_nesting=False,
flux_log_files=False,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
),
max_cores=max_cores,
refresh_rate=refresh_rate,
plot_dependency_graph=plot_dependency_graph,
plot_dependency_graph_filename=plot_dependency_graph_filename,
)
)


Expand Down
Loading
Loading