Merged

100 commits
a72c2b0
Move scheduler to standalone
jan-janssen Jul 27, 2025
cbd8aa6
Merge remote-tracking branch 'refs/remotes/origin/main' into worker
jan-janssen Jul 27, 2025
2ad819e
fix subprocess spawner docstring
jan-janssen Jul 27, 2025
fc5f199
file executor fix parallel execution
jan-janssen Jul 27, 2025
293adc3
add command tests
jan-janssen Jul 27, 2025
07e8409
move slurm command to standalone
jan-janssen Jul 27, 2025
1220868
implement spawner for pysqa
jan-janssen Jul 27, 2025
7dbf1ed
Merge remote-tracking branch 'origin/main' into worker
jan-janssen Jul 27, 2025
62c4c91
transfer changes
jan-janssen Jul 27, 2025
7786585
Format black
pyiron-runner Jul 27, 2025
36b0b47
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 27, 2025
90ac694
Merge branch 'main' into worker
jan-janssen Jul 28, 2025
e5ddb24
Merge commit '30f1c5d2e522973dda9f286f75d8a3f6e0ac4cba' into worker
jan-janssen Aug 19, 2025
1cc7044
block_allocation
jan-janssen Aug 19, 2025
b42dd06
Merge commit '83653a350296f9b184842a19492c8980adc8402d' into worker
jan-janssen Aug 19, 2025
02f0ce7
Format black
pyiron-runner Aug 19, 2025
2804562
fix type hint
jan-janssen Aug 19, 2025
6fb86f7
implement additional options for SLURM
jan-janssen Aug 19, 2025
ff10b0d
Format black
pyiron-runner Aug 19, 2025
38e0220
fixes
jan-janssen Aug 19, 2025
7138878
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 19, 2025
88d623b
Merge commit 'b3cf4c8985316f853cfc6c90adff5feff337f7f7' into worker
jan-janssen Aug 19, 2025
a9c4c68
add test for flux block allocation
jan-janssen Aug 19, 2025
0e60b28
fixes
jan-janssen Aug 19, 2025
faf4c50
more fixes
jan-janssen Aug 19, 2025
4bd0001
fixes
jan-janssen Aug 19, 2025
cf1cfe9
handle different types
jan-janssen Aug 19, 2025
3887d16
fixes
jan-janssen Aug 19, 2025
a519306
Merge commit '6c9de68e47a560da86a429046817c89635f28234' into worker
jan-janssen Aug 24, 2025
b3ab3a2
Add print commands
jan-janssen Aug 24, 2025
3936620
Format black
pyiron-runner Aug 24, 2025
a9f4eea
hash for worker directory
jan-janssen Aug 24, 2025
6fb2dec
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 24, 2025
ef7f5bf
Update test_fluxclusterexecutor.py
jan-janssen Aug 24, 2025
199d3d8
fixes
jan-janssen Aug 24, 2025
18e2b01
fix test
jan-janssen Aug 24, 2025
15b69d2
only receive jobs when worker is running
jan-janssen Aug 29, 2025
fc7a382
fix job resubmission
jan-janssen Aug 29, 2025
7a3b191
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 29, 2025
4033bf3
fix type hints
jan-janssen Aug 29, 2025
ed922cd
Merge branch 'main' into worker
jan-janssen Aug 29, 2025
1c3e263
restart workers after they were killed
jan-janssen Aug 29, 2025
cea4ca1
Format black
pyiron-runner Aug 29, 2025
35a9372
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 29, 2025
17f1c3a
type fixes
jan-janssen Aug 29, 2025
acd91fe
helper function
jan-janssen Aug 29, 2025
337fa45
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 29, 2025
19e4cbf
introduce restart function
jan-janssen Aug 29, 2025
9053074
fix spelling
jan-janssen Aug 29, 2025
495fcb4
Merge commit '45d504864fabdf07fa631ee004e64bc89df07c20' into worker
jan-janssen Aug 30, 2025
5362c73
shutdown on del
jan-janssen Aug 30, 2025
1b4baa9
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 30, 2025
0855ee9
type fixes
jan-janssen Aug 30, 2025
8cb53ca
Introduce stop function (#791)
jan-janssen Aug 30, 2025
03c0772
Merge branch 'main' into worker
jan-janssen Aug 30, 2025
3593ed9
Merge remote-tracking branch 'origin/main' into worker
jan-janssen Aug 31, 2025
79842e6
merge changes
jan-janssen Aug 31, 2025
8551eda
fix docstring
jan-janssen Aug 31, 2025
a969dd9
fixes
jan-janssen Aug 31, 2025
928465b
fix types
jan-janssen Aug 31, 2025
4eed8e3
Merge remote-tracking branch 'origin/main' into worker
jan-janssen Aug 31, 2025
3e36a5a
consistent naming scheme
jan-janssen Aug 31, 2025
eb134dc
Merge remote-tracking branch 'origin/main' into worker
jan-janssen Aug 31, 2025
ad6ca17
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 31, 2025
f1d0aff
remove duplicated task_done() call
jan-janssen Aug 31, 2025
27bad49
Merge remote-tracking branch 'origin/worker' into worker
jan-janssen Aug 31, 2025
88d0cd6
fixes
jan-janssen Aug 31, 2025
68b1408
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 31, 2025
1458079
cancel items in queue
jan-janssen Aug 31, 2025
e2f01fe
Merge branch 'main' into worker
jan-janssen Aug 31, 2025
095385f
fixes
jan-janssen Aug 31, 2025
ae7ac00
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 31, 2025
78aa031
Merge remote-tracking branch 'origin/main' into worker
jan-janssen Aug 31, 2025
c7f9eaf
fix return
jan-janssen Aug 31, 2025
d4babd8
fix duplicated arguments
jan-janssen Aug 31, 2025
72da39d
resort
jan-janssen Aug 31, 2025
60e2dee
remove unused statement
jan-janssen Aug 31, 2025
dbf3e65
rename variable
jan-janssen Aug 31, 2025
6b73ab7
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 31, 2025
fd9b630
Update shared.py
jan-janssen Aug 31, 2025
b60d3a2
Update blockallocation.py
jan-janssen Aug 31, 2025
c27713f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 31, 2025
d66d78d
Merge remote-tracking branch 'origin/main' into worker
jan-janssen Sep 8, 2025
a647574
Add docstrings
jan-janssen Sep 8, 2025
4e097b6
test for generate_command()
jan-janssen Sep 8, 2025
1c0148a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 8, 2025
62d4898
Add more tests
jan-janssen Sep 8, 2025
dba3b48
smaller tests
jan-janssen Sep 8, 2025
ad4e45c
submit a big job
jan-janssen Sep 8, 2025
6248561
extend tests
jan-janssen Sep 8, 2025
1e2d21c
no command
jan-janssen Sep 8, 2025
10db91c
remove error test
jan-janssen Sep 8, 2025
8daa42c
extend tests
jan-janssen Sep 8, 2025
77ae767
change error name
jan-janssen Sep 8, 2025
54c5a23
check more errors
jan-janssen Sep 8, 2025
676b4ec
clean up
jan-janssen Sep 8, 2025
9b497d2
extend tests
jan-janssen Sep 8, 2025
6c624bd
more tests
jan-janssen Sep 8, 2025
6c02845
validate initialization
jan-janssen Sep 8, 2025
5f7b676
fix test
jan-janssen Sep 8, 2025
60 changes: 40 additions & 20 deletions executorlib/executor/flux.py
@@ -358,28 +358,48 @@ def __init__(
         if not plot_dependency_graph:
             import pysqa  # noqa
 
-            from executorlib.task_scheduler.file.task_scheduler import (
-                create_file_executor,
-            )
+            if block_allocation:
+                from executorlib.task_scheduler.interactive.spawner_pysqa import (
+                    create_pysqa_block_allocation_scheduler,
+                )
 
-            super().__init__(
-                executor=create_file_executor(
-                    max_workers=max_workers,
-                    backend="flux",
-                    max_cores=max_cores,
-                    cache_directory=cache_directory,
-                    resource_dict=resource_dict,
-                    flux_executor=None,
-                    pmi_mode=pmi_mode,
-                    flux_executor_nesting=False,
-                    flux_log_files=False,
-                    pysqa_config_directory=pysqa_config_directory,
-                    hostname_localhost=hostname_localhost,
-                    block_allocation=block_allocation,
-                    init_function=init_function,
-                    disable_dependencies=disable_dependencies,
+                super().__init__(
+                    executor=create_pysqa_block_allocation_scheduler(
+                        max_cores=max_cores,
+                        cache_directory=cache_directory,
+                        hostname_localhost=hostname_localhost,
+                        log_obj_size=log_obj_size,
+                        pmi_mode=pmi_mode,
+                        init_function=init_function,
+                        max_workers=max_workers,
+                        resource_dict=resource_dict,
+                        pysqa_config_directory=pysqa_config_directory,
+                        backend="flux",
+                    )
+                )
+            else:
+                from executorlib.task_scheduler.file.task_scheduler import (
+                    create_file_executor,
+                )
+
+                super().__init__(
+                    executor=create_file_executor(
+                        max_workers=max_workers,
+                        backend="flux",
+                        max_cores=max_cores,
+                        cache_directory=cache_directory,
+                        resource_dict=resource_dict,
+                        flux_executor=None,
+                        pmi_mode=pmi_mode,
+                        flux_executor_nesting=False,
+                        flux_log_files=False,
+                        pysqa_config_directory=pysqa_config_directory,
+                        hostname_localhost=hostname_localhost,
+                        block_allocation=block_allocation,
+                        init_function=init_function,
+                        disable_dependencies=disable_dependencies,
+                    )
                 )
-            )
         else:
             super().__init__(
                 executor=DependencyTaskScheduler(
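With this change, a FluxClusterExecutor created with block_allocation=True is backed by the new pysqa block-allocation scheduler instead of the file executor. A minimal usage sketch, assuming the top-level FluxClusterExecutor export and illustrative parameter values:

from executorlib import FluxClusterExecutor

# Hedged sketch: each worker is submitted once through pysqa/flux and then
# reused for many tasks (block allocation), rather than one queue
# submission per task.
with FluxClusterExecutor(cache_directory="./cache", block_allocation=True) as exe:
    future = exe.submit(sum, [1, 2, 3])
    print(future.result())  # 6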
61 changes: 41 additions & 20 deletions executorlib/executor/slurm.py
@@ -166,28 +166,49 @@ def __init__(
         if not plot_dependency_graph:
             import pysqa  # noqa
 
-            from executorlib.task_scheduler.file.task_scheduler import (
-                create_file_executor,
-            )
+            if block_allocation:
+                from executorlib.task_scheduler.interactive.spawner_pysqa import (
+                    create_pysqa_block_allocation_scheduler,
+                )
 
-            super().__init__(
-                executor=create_file_executor(
-                    max_workers=max_workers,
-                    backend="slurm",
-                    max_cores=max_cores,
-                    cache_directory=cache_directory,
-                    resource_dict=resource_dict,
-                    pmi_mode=pmi_mode,
-                    flux_executor=None,
-                    flux_executor_nesting=False,
-                    flux_log_files=False,
-                    pysqa_config_directory=pysqa_config_directory,
-                    hostname_localhost=hostname_localhost,
-                    block_allocation=block_allocation,
-                    init_function=init_function,
-                    disable_dependencies=disable_dependencies,
+                super().__init__(
+                    executor=create_pysqa_block_allocation_scheduler(
+                        max_cores=max_cores,
+                        cache_directory=cache_directory,
+                        hostname_localhost=hostname_localhost,
+                        log_obj_size=log_obj_size,
+                        pmi_mode=pmi_mode,
+                        init_function=init_function,
+                        max_workers=max_workers,
+                        resource_dict=resource_dict,
+                        pysqa_config_directory=pysqa_config_directory,
+                        backend="slurm",
+                    ),
+                )
+
+            else:
+                from executorlib.task_scheduler.file.task_scheduler import (
+                    create_file_executor,
+                )
+
+                super().__init__(
+                    executor=create_file_executor(
+                        max_workers=max_workers,
+                        backend="slurm",
+                        max_cores=max_cores,
+                        cache_directory=cache_directory,
+                        resource_dict=resource_dict,
+                        pmi_mode=pmi_mode,
+                        flux_executor=None,
+                        flux_executor_nesting=False,
+                        flux_log_files=False,
+                        pysqa_config_directory=pysqa_config_directory,
+                        hostname_localhost=hostname_localhost,
+                        block_allocation=block_allocation,
+                        init_function=init_function,
+                        disable_dependencies=disable_dependencies,
+                    )
                 )
-            )
         else:
             super().__init__(
                 executor=DependencyTaskScheduler(
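The SLURM branch mirrors the flux one apart from backend="slurm". A hedged sketch combining block allocation with a per-worker resource request (values illustrative, assuming the top-level SlurmClusterExecutor export):

from executorlib import SlurmClusterExecutor

# Hedged sketch: every worker requests four cores and stays alive across
# submissions; generate_command() in the new spawner prepends the matching
# srun call.
with SlurmClusterExecutor(
    resource_dict={"cores": 4},
    block_allocation=True,
) as exe:
    print(exe.submit(sum, range(10)).result())  # 45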
248 changes: 248 additions & 0 deletions executorlib/task_scheduler/interactive/spawner_pysqa.py
@@ -0,0 +1,248 @@
import hashlib
import os
from time import sleep
from typing import Callable, Optional

from pysqa import QueueAdapter

from executorlib.standalone.inputcheck import validate_number_of_cores
from executorlib.standalone.interactive.spawner import BaseSpawner
from executorlib.standalone.scheduler import pysqa_execute_command, terminate_with_pysqa
from executorlib.task_scheduler.interactive.blockallocation import (
BlockAllocationTaskScheduler,
)


class PysqaSpawner(BaseSpawner):
    def __init__(
        self,
        cwd: Optional[str] = None,
        cores: int = 1,
        threads_per_core: int = 1,
        gpus_per_core: int = 0,
        num_nodes: Optional[int] = None,
        exclusive: bool = False,
        openmpi_oversubscribe: bool = False,
        slurm_cmd_args: Optional[list[str]] = None,
        pmi_mode: Optional[str] = None,
        config_directory: Optional[str] = None,
        backend: Optional[str] = None,
        **kwargs,
    ):
Comment on lines +16 to +31
🛠️ Refactor suggestion

Plumb cache_directory and worker_id into the spawner; don’t forward them to PySQA.

Ensures the worker dir logic can use these values and avoids leaking non-PySQA keys to submit_job.

 class PysqaSpawner(BaseSpawner):
     def __init__(
         self,
         cwd: Optional[str] = None,
         cores: int = 1,
         threads_per_core: int = 1,
         gpus_per_core: int = 0,
         num_nodes: Optional[int] = None,
         exclusive: bool = False,
         openmpi_oversubscribe: bool = False,
         slurm_cmd_args: Optional[list[str]] = None,
         pmi_mode: Optional[str] = None,
         config_directory: Optional[str] = None,
         backend: Optional[str] = None,
+        cache_directory: Optional[str] = None,
         **kwargs,
     ):
@@
-        self._pysqa_submission_kwargs = kwargs
+        # Internal/non-PySQA fields
+        self._worker_id = kwargs.pop("worker_id", None)
+        # Accept via explicit arg or kwargs for BC
+        self._cache_directory = cache_directory or kwargs.pop("cache_directory", None)
+        # Remaining kwargs are forwarded to PySQA submit_job
+        self._pysqa_submission_kwargs = kwargs

"""
Subprocess interface implementation.

Args:
cwd (str, optional): The current working directory. Defaults to None.
cores (int): The number of cores to use. Defaults to 1.
threads_per_core (int): The number of threads per core. Defaults to 1.
gpus_per_core (int): number of GPUs per worker - defaults to 0
num_nodes (int, optional): The number of compute nodes to use for executing the task. Defaults to None.
exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute notes. Defaults
to False.
openmpi_oversubscribe (bool): Whether to oversubscribe the cores. Defaults to False.
slurm_cmd_args (list, optional): Additional command line arguments for the srun call (SLURM only)
pmi_mode (str, optional): PMI interface to use (OpenMPI v5 requires pmix) default is None
config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
backend (str): name of the backend used to spawn tasks.
"""
super().__init__(
cwd=cwd,
cores=cores,
openmpi_oversubscribe=openmpi_oversubscribe,
)
self._threads_per_core = threads_per_core
self._gpus_per_core = gpus_per_core
self._num_nodes = num_nodes
self._exclusive = exclusive
self._slurm_cmd_args = slurm_cmd_args
self._pmi_mode = pmi_mode
self._config_directory = config_directory
self._backend = backend
self._pysqa_submission_kwargs = kwargs
self._process: Optional[int] = None
self._queue_adapter: Optional[QueueAdapter] = None

    def bootup(
        self,
        command_lst: list[str],
        stop_function: Optional[Callable] = None,
    ) -> bool:
        """
        Method to start the queuing-system interface.

        Args:
            command_lst (list[str]): The command list to execute.
            stop_function (Callable): Function to stop the interface.

        Returns:
            bool: Whether the interface was successfully started.
        """
        self._queue_adapter = QueueAdapter(
            directory=self._config_directory,
            queue_type=self._backend,
            execute_command=pysqa_execute_command,
        )
        self._process = self._start_process_helper(
            command_lst=command_lst,
            queue_adapter=self._queue_adapter,
        )
        while True:
            if self._check_process_helper(command_lst=command_lst):
                return True
            elif stop_function is not None and stop_function():
                self.shutdown(wait=True)
                return False
            else:
                sleep(1)  # Wait for the process to start
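bootup() blocks in the loop above until the queuing system reports the worker as running, polling once per second; the optional stop_function lets the caller abort a pending submission. A hedged sketch of that hook, with a hypothetical worker command and assuming a pysqa queue configuration is available:

import threading

# Hedged sketch: abort bootup when an external shutdown flag is set.
shutdown_flag = threading.Event()  # set elsewhere by the scheduler

spawner = PysqaSpawner(cores=2, backend="slurm")
started = spawner.bootup(
    command_lst=["python", "worker.py"],  # hypothetical worker command
    stop_function=shutdown_flag.is_set,  # checked between status polls
)
if not started:
    print("submission cancelled before the worker came up")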

    def generate_command(self, command_lst: list[str]) -> list[str]:
        """
        Method to generate the command list.

        Args:
            command_lst (list[str]): The command list.

        Returns:
            list[str]: The generated command list.
        """
        if self._cores > 1 and self._backend == "slurm":
            command_prepend = ["srun", "-n", str(self._cores)]
            if self._pmi_mode is not None:
                command_prepend += ["--mpi=" + self._pmi_mode]
            if self._num_nodes is not None:
                command_prepend += ["-N", str(self._num_nodes)]
            if self._threads_per_core > 1:
                command_prepend += ["--cpus-per-task=" + str(self._threads_per_core)]
            if self._gpus_per_core > 0:
                command_prepend += ["--gpus-per-task=" + str(self._gpus_per_core)]
            if self._exclusive:
                command_prepend += ["--exact"]
            if self._openmpi_oversubscribe:
                command_prepend += ["--oversubscribe"]
            if self._slurm_cmd_args is not None and len(self._slurm_cmd_args) > 0:
                command_prepend += self._slurm_cmd_args
        elif self._cores > 1 and self._backend == "flux":
            command_prepend = ["flux", "run", "-n", str(self._cores)]
            if self._pmi_mode is not None:
                command_prepend += ["-o", "pmi=" + self._pmi_mode]
            if self._num_nodes is not None:
                raise ValueError("num_nodes is not supported for the flux backend")
            if self._threads_per_core > 1:
                raise ValueError("threads_per_core is not supported for the flux backend")
            if self._gpus_per_core > 0:
                raise ValueError("gpus_per_core is not supported for the flux backend")
            if self._exclusive:
                raise ValueError("exclusive is not supported for the flux backend")
            if self._openmpi_oversubscribe:
                raise ValueError("openmpi_oversubscribe is not supported for the flux backend")
        elif self._cores > 1:
            raise ValueError(
                f"backend should be None, slurm or flux, not {self._backend}"
            )
        else:
            command_prepend = []
        return command_prepend + command_lst
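For illustration, a SLURM-backed spawner with more than one core prepends the srun call built above; this sketch simply traces that code path with illustrative arguments:

# Hedged sketch: expected output of generate_command() for a SLURM spawner.
spawner = PysqaSpawner(cores=4, backend="slurm", pmi_mode="pmix", num_nodes=1)
print(spawner.generate_command(command_lst=["python", "worker.py"]))
# ['srun', '-n', '4', '--mpi=pmix', '-N', '1', 'python', 'worker.py']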

    def shutdown(self, wait: bool = True):
        """
        Method to shut down the interface.

        Args:
            wait (bool, optional): Whether to wait for the interface to shutdown. Defaults to True.
        """
        if self._process is not None:
            terminate_with_pysqa(
                queue_id=self._process,
                config_directory=self._config_directory,
                backend=self._backend,
            )
            self._process = None

    def poll(self) -> bool:
        """
        Method to check if the interface is running.

        Returns:
            bool: True if the interface is running, False otherwise.
        """
        if self._process is not None and self._queue_adapter is not None:
            status = self._queue_adapter.get_status_of_job(process_id=self._process)
            return status in ["running", "pending"]
        else:
            return False

    def _start_process_helper(
        self, command_lst: list[str], queue_adapter: QueueAdapter
    ) -> int:
        hash = hashlib.md5(str(self).encode()).hexdigest()
        if self._cwd is not None:
            working_directory = os.path.join(self._cwd, hash)
        else:
            working_directory = os.path.abspath(hash)
        return queue_adapter.submit_job(
            command=" ".join(self.generate_command(command_lst=command_lst)),
            working_directory=working_directory,
            cores=int(self._cores * self._threads_per_core),
            **self._pysqa_submission_kwargs,
        )
Comment on lines +175 to +188
🛠️ Refactor suggestion

Stop hashing into cwd; derive working_directory from cache_directory + worker_id (addresses open TODO).

  • Avoids insecure md5 and shadowing built-in hash.
  • Produces stable per-worker directories under the cache dir as requested in the PR checklist.
-    def _start_process_helper(
-        self, command_lst: list[str], queue_adapter: QueueAdapter
-    ) -> int:
-        hash = hashlib.md5(str(self).encode()).hexdigest()
-        if self._cwd is not None:
-            working_directory = os.path.join(self._cwd, hash)
-        else:
-            working_directory = os.path.abspath(hash)
+    def _start_process_helper(
+        self, command_lst: list[str], queue_adapter: QueueAdapter
+    ) -> int:
+        # Prefer cache_directory + worker_id; fall back to a short blake2 hash
+        base_dir = (
+            self._cache_directory
+            or (self._cwd if self._cwd is not None else os.path.abspath("."))
+        )
+        if getattr(self, "_worker_id", None) is not None:
+            worker_dir = f"worker_{self._worker_id}"
+        else:
+            # stable-ish fallback without using insecure md5
+            worker_dir = "worker_" + hashlib.blake2s(
+                repr(self).encode(), digest_size=8
+            ).hexdigest()
+        working_directory = os.path.join(base_dir, worker_dir)
+        os.makedirs(working_directory, exist_ok=True)
         return queue_adapter.submit_job(
             command=" ".join(self.generate_command(command_lst=command_lst)),
             working_directory=working_directory,
             cores=int(self._cores * self._threads_per_core),
             **self._pysqa_submission_kwargs,
         )
🧰 Tools
🪛 Ruff (0.12.2)
178-178: Probable use of insecure hash functions in hashlib: md5 (S324)


    def _check_process_helper(self, command_lst: list[str]) -> bool:
        if self._queue_adapter is not None:
            status = self._queue_adapter.get_status_of_job(process_id=self._process)
        else:
            status = None
        if status == "running":
            return True
        elif status is None:
            raise RuntimeError(
                f"Failed to start the process with command: {command_lst}"
            )
        elif status == "error":
            self._process = self._start_process_helper(
                command_lst=command_lst, queue_adapter=self._queue_adapter
            )
        return False

    def __del__(self):
        self.shutdown(wait=True)


def create_pysqa_block_allocation_scheduler(
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    hostname_localhost: Optional[bool] = None,
    log_obj_size: bool = False,
    pmi_mode: Optional[str] = None,
    init_function: Optional[Callable] = None,
    max_workers: Optional[int] = None,
    resource_dict: Optional[dict] = None,
    pysqa_config_directory: Optional[str] = None,
    backend: Optional[str] = None,
) -> BlockAllocationTaskScheduler:
    if resource_dict is None:
        resource_dict = {}
    cores_per_worker = resource_dict.get("cores", 1)
    if "cwd" in resource_dict and resource_dict["cwd"] is not None:
        resource_dict["cwd"] = os.path.abspath(resource_dict["cwd"])
    if cache_directory is not None:
        resource_dict["cache_directory"] = os.path.abspath(cache_directory)
    else:
        resource_dict["cache_directory"] = os.path.abspath(".")
    resource_dict["hostname_localhost"] = hostname_localhost
    resource_dict["log_obj_size"] = log_obj_size
    resource_dict["pmi_mode"] = pmi_mode
    resource_dict["init_function"] = init_function
    resource_dict["config_directory"] = pysqa_config_directory
    resource_dict["backend"] = backend
    max_workers = validate_number_of_cores(
        max_cores=max_cores,
        max_workers=max_workers,
        cores_per_worker=cores_per_worker,
        set_local_cores=False,
    )
    return BlockAllocationTaskScheduler(
        max_workers=max_workers,
        executor_kwargs=resource_dict,
        spawner=PysqaSpawner,
    )
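A hedged sketch of how the factory is wired up, with an illustrative queue configuration path; the returned BlockAllocationTaskScheduler spawns each of its workers through PysqaSpawner:

# Hedged sketch: build a block-allocation scheduler backed by pysqa/SLURM.
scheduler = create_pysqa_block_allocation_scheduler(
    max_workers=2,
    resource_dict={"cores": 4},
    cache_directory="./cache",
    pysqa_config_directory="~/.queues",  # illustrative path
    backend="slurm",
)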