Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ jobs:
run: echo -e "channels:\n - conda-forge\n" > .condarc
- uses: conda-incubator/setup-miniconda@v3
with:
python-version: '3.9'
python-version: '3.10'
miniforge-version: latest
condarc-file: .condarc
environment-file: .ci_support/environment-old.yml
Expand Down
5 changes: 5 additions & 0 deletions executorlib/backend/cache_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import cloudpickle

from executorlib.standalone.error import backend_write_error_file
from executorlib.task_scheduler.file.backend import (
backend_load_file,
backend_write_file,
Expand Down Expand Up @@ -53,6 +54,10 @@ def main() -> None:
output={"error": error},
runtime=time.time() - time_start,
)
backend_write_error_file(
error=error,
apply_dict=apply_dict,
)
else:
if mpi_rank_zero:
backend_write_file(
Expand Down
5 changes: 5 additions & 0 deletions executorlib/backend/interactive_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import cloudpickle
import zmq

from executorlib.standalone.error import backend_write_error_file
from executorlib.standalone.interactive.backend import call_funct, parse_arguments
from executorlib.standalone.interactive.communication import (
interface_connect,
Expand Down Expand Up @@ -82,6 +83,10 @@ def main() -> None:
socket=socket,
result_dict={"error": error},
)
backend_write_error_file(
error=error,
apply_dict=input_dict,
)
else:
# Send output
if mpi_rank_zero:
Expand Down
5 changes: 5 additions & 0 deletions executorlib/backend/interactive_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from os.path import abspath
from typing import Optional

from executorlib.standalone.error import backend_write_error_file
from executorlib.standalone.interactive.backend import call_funct, parse_arguments
from executorlib.standalone.interactive.communication import (
interface_connect,
Expand Down Expand Up @@ -58,6 +59,10 @@ def main(argument_lst: Optional[list[str]] = None):
socket=socket,
result_dict={"error": error},
)
backend_write_error_file(
error=error,
apply_dict=input_dict,
)
else:
# Send output
interface_send(socket=socket, result_dict={"result": output})
Expand Down
10 changes: 10 additions & 0 deletions executorlib/executor/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class FluxJobExecutor(BaseExecutor):
Defaults to None.
- exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing
compute nodes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
Expand Down Expand Up @@ -126,6 +128,8 @@ def __init__(
Defaults to None.
- exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing
compute nodes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions
raised by the Python functions submitted to the Executor.
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
Expand Down Expand Up @@ -229,6 +233,8 @@ class FluxClusterExecutor(BaseExecutor):
- openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
SLURM only) - default False
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Expand Down Expand Up @@ -308,6 +314,8 @@ def __init__(
and SLURM only) - default False
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
only)
- error_log_file (str): Name of the error log file to use for storing exceptions
raised by the Python functions submitted to the Executor.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Expand Down Expand Up @@ -424,6 +432,8 @@ def create_flux_executor(
Defaults to None.
- exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing
compute nodes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
Expand Down
10 changes: 10 additions & 0 deletions executorlib/executor/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class SingleNodeExecutor(BaseExecutor):
- openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
SLURM only) - default False
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -116,6 +118,8 @@ def __init__(
and SLURM only) - default False
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
only)
- error_log_file (str): Name of the error log file to use for storing exceptions
raised by the Python functions submitted to the Executor.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -202,6 +206,8 @@ class TestClusterExecutor(BaseExecutor):
- threads_per_core (int): number of OpenMP threads to be used for each function call
- gpus_per_core (int): number of GPUs per worker - defaults to 0
- cwd (str/None): current working directory where the parallel python task is executed
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -273,6 +279,8 @@ def __init__(
- threads_per_core (int): number of OpenMP threads to be used for each function call
- gpus_per_core (int): number of GPUs per worker - defaults to 0
- cwd (str/None): current working directory where the parallel python task is executed
- error_log_file (str): Name of the error log file to use for storing exceptions
raised by the Python functions submitted to the Executor.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -381,6 +389,8 @@ def create_single_node_executor(
and SLURM only) - default False
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
only)
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down
10 changes: 10 additions & 0 deletions executorlib/executor/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class SlurmClusterExecutor(BaseExecutor):
- openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
SLURM only) - default False
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Expand Down Expand Up @@ -120,6 +122,8 @@ def __init__(
and SLURM only) - default False
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
only)
- error_log_file (str): Name of the error log file to use for storing exceptions
raised by the Python functions submitted to the Executor.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Expand Down Expand Up @@ -226,6 +230,8 @@ class SlurmJobExecutor(BaseExecutor):
Defaults to None.
- exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing
compute nodes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -307,6 +313,8 @@ def __init__(
Defaults to None.
- exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing
compute nodes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions
raised by the Python functions submitted to the Executor.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -408,6 +416,8 @@ def create_slurm_executor(
Defaults to None.
- exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing
compute nodes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down
1 change: 1 addition & 0 deletions executorlib/standalone/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"error": "error",
"runtime": "runtime",
"queue_id": "queue_id",
"error_log_file": "error_log_file",
}


Expand Down
21 changes: 21 additions & 0 deletions executorlib/standalone/error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import traceback


def backend_write_error_file(error: Exception, apply_dict: dict) -> None:
"""
Write an error to a file if specified in the apply_dict.

Args:
error (Exception): The error to be written.
apply_dict (dict): Dictionary containing additional parameters.

Returns:
None
"""
error_log_file = apply_dict.get("error_log_file")
if error_log_file is not None:
with open(error_log_file, "a") as f:
f.write("function: " + str(apply_dict["fn"]) + "\n")
f.write("args: " + str(apply_dict["args"]) + "\n")
f.write("kwargs: " + str(apply_dict["kwargs"]) + "\n")
traceback.print_exception(error, file=f)
5 changes: 5 additions & 0 deletions executorlib/task_scheduler/file/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import time
from typing import Any

from executorlib.standalone.error import backend_write_error_file
from executorlib.task_scheduler.file.hdf import dump, load
from executorlib.task_scheduler.file.shared import FutureItem

Expand Down Expand Up @@ -77,6 +78,10 @@ def backend_execute_task_in_file(file_name: str) -> None:
}
except Exception as error:
result = {"error": error}
backend_write_error_file(
error=error,
apply_dict=apply_dict,
)

backend_write_file(
file_name=file_name,
Expand Down
4 changes: 4 additions & 0 deletions executorlib/task_scheduler/file/hdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ def load(file_name: str) -> dict:
data_dict["kwargs"] = cloudpickle.loads(np.void(hdf["/input_kwargs"]))
else:
data_dict["kwargs"] = {}
if "error_log_file" in hdf:
data_dict["error_log_file"] = cloudpickle.loads(
np.void(hdf["/error_log_file"])
)
return data_dict


Expand Down
2 changes: 2 additions & 0 deletions executorlib/task_scheduler/file/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,15 @@ def execute_tasks_h5(
)
cache_key = task_resource_dict.pop("cache_key", None)
cache_directory = os.path.abspath(task_resource_dict.pop("cache_directory"))
error_log_file = task_resource_dict.pop("error_log_file", None)
task_key, data_dict = serialize_funct_h5(
fn=task_dict["fn"],
fn_args=task_args,
fn_kwargs=task_kwargs,
resource_dict=task_resource_dict,
cache_key=cache_key,
)
data_dict["error_log_file"] = error_log_file
if task_key not in memory_dict:
if os.path.join(
cache_directory, task_key + "_o.h5"
Expand Down
3 changes: 3 additions & 0 deletions executorlib/task_scheduler/interactive/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def execute_tasks(
cache_key: Optional[str] = None,
queue_join_on_shutdown: bool = True,
log_obj_size: bool = False,
error_log_file: Optional[str] = None,
**kwargs,
) -> None:
"""
Expand Down Expand Up @@ -70,6 +71,8 @@ def execute_tasks(
future_queue.join()
break
elif "fn" in task_dict and "future" in task_dict:
if error_log_file is not None:
task_dict["error_log_file"] = error_log_file
if cache_directory is None:
_execute_task_without_cache(
interface=interface, task_dict=task_dict, future_queue=future_queue
Expand Down
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ classifiers = [
"License :: OSI Approved :: BSD License",
"Intended Audience :: Science/Research",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
Expand Down
Loading
Loading