-
Notifications
You must be signed in to change notification settings - Fork 3
Worker for overflow queue #763
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
# Conflicts: # executorlib/standalone/scheduler.py # executorlib/task_scheduler/file/queue_spawner.py
# Conflicts: # executorlib/standalone/command.py # executorlib/task_scheduler/file/shared.py # executorlib/task_scheduler/interactive/slurmspawner.py # tests/test_standalone_command.py
WalkthroughAdds a PySQA-backed block-allocation pathway. Flux and Slurm executors now conditionally create a block-allocation scheduler via create_pysqa_block_allocation_scheduler when block_allocation=True; otherwise they use the file-based executor. Introduces PysqaSpawner and a factory to build BlockAllocationTaskScheduler. Tests added for Flux block allocation, PysqaSpawner lifecycle/commands, and Slurm init gating. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant U as User code
participant Exec as Flux/Slurm Executor
participant FFactory as create_file_executor
participant BFactory as create_pysqa_block_allocation_scheduler
participant Sched as BlockAllocationTaskScheduler
participant Sp as PysqaSpawner
U->>Exec: __init__(block_allocation, resource_dict, ...)
alt block_allocation == True
Exec->>BFactory: build scheduler (backend="flux" or "slurm", config, resources)
BFactory->>Sp: construct PysqaSpawner(config/resources)
BFactory->>Sched: BlockAllocationTaskScheduler(spawner=Sp, max_workers)
Exec-->>U: executor with block-allocation scheduler
else
Exec->>FFactory: create_file_executor(...)
Exec-->>U: executor with file-based scheduler
end
sequenceDiagram
autonumber
participant Sched as BlockAllocationTaskScheduler
participant Sp as PysqaSpawner
participant QA as QueueAdapter
participant Q as Queue (Slurm/Flux)
participant W as Worker process
Sched->>Sp: bootup(command_lst, stop_function?)
Sp->>QA: submit(cores, cwd, cmd)
QA->>Q: enqueue job
Sp->>Sp: _check_process_helper()
Q-->>QA: status (PENDING/RUNNING/ERROR)
QA-->>Sp: status
alt status == RUNNING
Sp-->>Sched: True (started)
else status == ERROR
Sp->>QA: terminate()
Sp-->>Sched: False
end
loop polling
Sched->>Sp: poll()
Sp->>QA: status()
QA-->>Sp: RUNNING/PENDING/DONE
Sp-->>Sched: bool
end
Sched->>Sp: shutdown(wait=True)
Sp->>QA: terminate()
Sp-->>Sched: complete
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
for more information, see https://pre-commit.ci
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #763 +/- ##
==========================================
+ Coverage 97.84% 98.12% +0.27%
==========================================
Files 32 33 +1
Lines 1530 1649 +119
==========================================
+ Hits 1497 1618 +121
+ Misses 33 31 -2 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
for more information, see https://pre-commit.ci
for more information, see https://pre-commit.ci
for more information, see https://pre-commit.ci
# Conflicts: # executorlib/standalone/interactive/communication.py # executorlib/standalone/interactive/spawner.py # executorlib/task_scheduler/interactive/blockallocation.py # executorlib/task_scheduler/interactive/shared.py # executorlib/task_scheduler/interactive/spawner_flux.py
for more information, see https://pre-commit.ci
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Nitpick comments (11)
executorlib/executor/slurm.py (1)
189-211: Non-block path: init_function will raise in create_file_executor; consider an early, clearer error.Passing init_function to create_file_executor triggers a ValueError (“not available with the pysqa based backend”). A short pre-check here with a tailored message would be more direct for users.
super().__init__( executor=create_file_executor( max_workers=max_workers, backend="slurm", max_cores=max_cores, cache_directory=cache_directory, resource_dict=resource_dict, pmi_mode=pmi_mode, flux_executor=None, flux_executor_nesting=False, flux_log_files=False, pysqa_config_directory=pysqa_config_directory, hostname_localhost=hostname_localhost, - block_allocation=block_allocation, - init_function=init_function, + block_allocation=block_allocation, + init_function=init_function, # will raise downstream disable_dependencies=disable_dependencies, ) )Alternatively (preferred), add an explicit guard just before the call:
if init_function is not None: raise ValueError("init_function is unsupported with the pysqa file backend; use block_allocation=True.")executorlib/executor/flux.py (1)
381-401: Non-block path mirrors existing behavior; minor polish only.Looks fine. If you adopt the earlier pre-check for init_function in Slurm, consider mirroring it here for consistency.
tests/test_fluxclusterexecutor.py (4)
64-78: Avoid brittle assertions on cache directory size.
len(os.listdir("executorlib_cache"))is environment- and implementation-dependent. Assert minimum expectations or patterns instead.Apply this diff:
- self.assertEqual(len(os.listdir("executorlib_cache")), 2) + # block-allocation creates at least 2 artifacts (task + input) + self.assertGreaterEqual(len(os.listdir("executorlib_cache")), 2)
79-97: Same here: relax directory-size assertion for robustness.Use a lower bound rather than an exact count.
- self.assertEqual(len(os.listdir("executorlib_cache")), 4) + # two tasks -> expect at least four artifacts + self.assertGreaterEqual(len(os.listdir("executorlib_cache")), 4)
28-31: Speed up test without reducing signal.A full second sleep slows CI. 100–200 ms is typically enough to exercise scheduling here.
- sleep(1) + sleep(0.2)
171-190: Prefer public APIs over private members in tests.The test reaches into
_processand_check_process_helper, which is fragile. Assert viapoll()and observable behavior instead.- process_id = interface_flux._process - interface_flux.shutdown(wait=True) - interface_flux._process = process_id - self.assertFalse(interface_flux.poll()) - self.assertFalse(interface_flux._check_process_helper(command_lst=["sleep", "1"])) + interface_flux.shutdown(wait=True) + self.assertFalse(interface_flux.poll())tests/test_standalone_interactive_backend.py (1)
138-140: Make the failure assertion backend-agnostic.On some systems SLURM binaries exist but submission fails, raising
subprocess.CalledProcessErrorinstead ofFileNotFoundError.- with self.assertRaises(FileNotFoundError): + import subprocess + with self.assertRaises((FileNotFoundError, subprocess.CalledProcessError)): interface_slurm.bootup(command_lst=["sleep", "1"])executorlib/task_scheduler/interactive/spawner_pysqa.py (4)
129-138: Add actionable error messages for Flux constraints.Bare
ValueError()hinders debugging.- if self._num_nodes is not None: - raise ValueError() - if self._threads_per_core > 1: - raise ValueError() - if self._gpus_per_core > 0: - raise ValueError() - if self._exclusive: - raise ValueError() - if self._openmpi_oversubscribe: - raise ValueError() + if self._num_nodes is not None: + raise ValueError("Flux backend: num_nodes is not supported when cores>1") + if self._threads_per_core > 1: + raise ValueError("Flux backend: threads_per_core>1 is not supported") + if self._gpus_per_core > 0: + raise ValueError("Flux backend: gpus_per_core>0 is not supported") + if self._exclusive: + raise ValueError("Flux backend: exclusive allocation is not supported") + if self._openmpi_oversubscribe: + raise ValueError("Flux backend: oversubscribe is not supported")
140-142: Fix misleading error message for multi-core without backend.Backend cannot be None when
cores > 1.- raise ValueError( - f"backend should be None, slurm or flux, not {self._backend}" - ) + raise ValueError( + f"When cores>1, backend must be 'slurm' or 'flux'; got {self._backend!r}" + )
147-160: Silence unused-arg warning forwaitor implement waiting.Currently unused; keep interface but declare intent.
def shutdown(self, wait: bool = True): """ Method to shutdown the subprocess interface. @@ - if self._process is not None: + # 'wait' is intentionally unused for queue-backed jobs + if self._process is not None:Alternatively:
- def shutdown(self, wait: bool = True): + def shutdown(self, wait: bool = True): # noqa: ARG002
90-98: Consider a bootup timeout.An endless loop can hang indefinitely without a
stop_function. Expose astartup_timeoutor retry/backoff.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
executorlib/executor/flux.py(1 hunks)executorlib/executor/slurm.py(1 hunks)executorlib/task_scheduler/interactive/spawner_pysqa.py(1 hunks)tests/test_fluxclusterexecutor.py(5 hunks)tests/test_slurmclusterexecutor.py(2 hunks)tests/test_standalone_interactive_backend.py(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (6)
tests/test_slurmclusterexecutor.py (1)
executorlib/executor/slurm.py (1)
SlurmClusterExecutor(22-229)
executorlib/task_scheduler/interactive/spawner_pysqa.py (4)
executorlib/standalone/inputcheck.py (1)
validate_number_of_cores(176-200)executorlib/standalone/interactive/spawner.py (1)
BaseSpawner(9-64)executorlib/standalone/scheduler.py (2)
pysqa_execute_command(32-65)terminate_with_pysqa(8-29)executorlib/task_scheduler/interactive/blockallocation.py (1)
BlockAllocationTaskScheduler(29-197)
tests/test_fluxclusterexecutor.py (1)
executorlib/task_scheduler/interactive/spawner_pysqa.py (2)
PysqaSpawner(16-208)_check_process_helper(190-205)
executorlib/executor/slurm.py (2)
executorlib/task_scheduler/interactive/spawner_pysqa.py (1)
create_pysqa_block_allocation_scheduler(211-248)executorlib/task_scheduler/file/task_scheduler.py (1)
create_file_executor(85-131)
tests/test_standalone_interactive_backend.py (1)
executorlib/task_scheduler/interactive/spawner_pysqa.py (3)
PysqaSpawner(16-208)create_pysqa_block_allocation_scheduler(211-248)_check_process_helper(190-205)
executorlib/executor/flux.py (6)
executorlib/task_scheduler/interactive/spawner_pysqa.py (1)
create_pysqa_block_allocation_scheduler(211-248)executorlib/task_scheduler/interactive/blockallocation.py (2)
max_workers(95-96)max_workers(99-121)executorlib/task_scheduler/interactive/dependency.py (2)
max_workers(87-95)max_workers(98-111)executorlib/executor/base.py (2)
max_workers(26-27)max_workers(30-31)executorlib/task_scheduler/base.py (2)
max_workers(36-37)max_workers(40-41)executorlib/task_scheduler/file/task_scheduler.py (1)
create_file_executor(85-131)
🪛 Ruff (0.12.2)
executorlib/task_scheduler/interactive/spawner_pysqa.py
140-142: Avoid specifying long messages outside the exception class
(TRY003)
147-147: Unused method argument: wait
(ARG002)
178-178: Probable use of insecure hash functions in hashlib: md5
(S324)
198-200: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: notebooks_integration
🔇 Additional comments (2)
executorlib/executor/slurm.py (1)
169-187: Block-allocation via PySQA path looks correct; verify default cache_directory semantics.create_pysqa_block_allocation_scheduler forces cache_directory to os.path.abspath(".") when None, which may diverge from the documented default "executorlib_cache". Please confirm intended behavior across Slurm/Flux so users don’t see different defaults depending on block_allocation.
executorlib/executor/flux.py (1)
361-379: Flux block-allocation via PySQA path is consistent with Slurm; re-check default cache_directory.Same note as for Slurm: create_pysqa_block_allocation_scheduler defaults to "." when cache_directory is None. Ensure this matches documented defaults and user expectations.
| class PysqaSpawner(BaseSpawner): | ||
| def __init__( | ||
| self, | ||
| cwd: Optional[str] = None, | ||
| cores: int = 1, | ||
| threads_per_core: int = 1, | ||
| gpus_per_core: int = 0, | ||
| num_nodes: Optional[int] = None, | ||
| exclusive: bool = False, | ||
| openmpi_oversubscribe: bool = False, | ||
| slurm_cmd_args: Optional[list[str]] = None, | ||
| pmi_mode: Optional[str] = None, | ||
| config_directory: Optional[str] = None, | ||
| backend: Optional[str] = None, | ||
| **kwargs, | ||
| ): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Plumb cache_directory and worker_id into the spawner; don’t forward them to PySQA.
Ensures the worker dir logic can use these values and avoids leaking non-PySQA keys to submit_job.
class PysqaSpawner(BaseSpawner):
def __init__(
self,
cwd: Optional[str] = None,
cores: int = 1,
threads_per_core: int = 1,
gpus_per_core: int = 0,
num_nodes: Optional[int] = None,
exclusive: bool = False,
openmpi_oversubscribe: bool = False,
slurm_cmd_args: Optional[list[str]] = None,
pmi_mode: Optional[str] = None,
config_directory: Optional[str] = None,
backend: Optional[str] = None,
+ cache_directory: Optional[str] = None,
**kwargs,
):
@@
- self._pysqa_submission_kwargs = kwargs
+ # Internal/non-PySQA fields
+ self._worker_id = kwargs.pop("worker_id", None)
+ # Accept via explicit arg or kwargs for BC
+ self._cache_directory = cache_directory or kwargs.pop("cache_directory", None)
+ # Remaining kwargs are forwarded to PySQA submit_job
+ self._pysqa_submission_kwargs = kwargs📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| class PysqaSpawner(BaseSpawner): | |
| def __init__( | |
| self, | |
| cwd: Optional[str] = None, | |
| cores: int = 1, | |
| threads_per_core: int = 1, | |
| gpus_per_core: int = 0, | |
| num_nodes: Optional[int] = None, | |
| exclusive: bool = False, | |
| openmpi_oversubscribe: bool = False, | |
| slurm_cmd_args: Optional[list[str]] = None, | |
| pmi_mode: Optional[str] = None, | |
| config_directory: Optional[str] = None, | |
| backend: Optional[str] = None, | |
| **kwargs, | |
| ): | |
| class PysqaSpawner(BaseSpawner): | |
| def __init__( | |
| self, | |
| cwd: Optional[str] = None, | |
| cores: int = 1, | |
| threads_per_core: int = 1, | |
| gpus_per_core: int = 0, | |
| num_nodes: Optional[int] = None, | |
| exclusive: bool = False, | |
| openmpi_oversubscribe: bool = False, | |
| slurm_cmd_args: Optional[list[str]] = None, | |
| pmi_mode: Optional[str] = None, | |
| config_directory: Optional[str] = None, | |
| backend: Optional[str] = None, | |
| cache_directory: Optional[str] = None, | |
| **kwargs, | |
| ): | |
| # … (other initialization, e.g. self.cwd = cwd, etc.) | |
| # Internal/non-PySQA fields | |
| self._worker_id = kwargs.pop("worker_id", None) | |
| # Accept via explicit arg or kwargs for BC | |
| self._cache_directory = cache_directory or kwargs.pop("cache_directory", None) | |
| # Remaining kwargs are forwarded to PySQA submit_job | |
| self._pysqa_submission_kwargs = kwargs |
🤖 Prompt for AI Agents
In executorlib/task_scheduler/interactive/spawner_pysqa.py around lines 16 to
31, the PysqaSpawner __init__ must accept and store cache_directory and
worker_id for use by the worker directory logic and must not forward these
non-PySQA keys to PySQA submit_job; add optional parameters cache_directory:
Optional[str] = None and worker_id: Optional[str] = None to the constructor
signature, assign them to self.cache_directory and self.worker_id, and ensure
any call site that forwards kwargs to PySQA strips these keys from the kwargs
(or builds a new dict without them) before calling submit_job so only valid
PySQA arguments are passed.
| def _start_process_helper( | ||
| self, command_lst: list[str], queue_adapter: QueueAdapter | ||
| ) -> int: | ||
| hash = hashlib.md5(str(self).encode()).hexdigest() | ||
| if self._cwd is not None: | ||
| working_directory = os.path.join(self._cwd, hash) | ||
| else: | ||
| working_directory = os.path.abspath(hash) | ||
| return queue_adapter.submit_job( | ||
| command=" ".join(self.generate_command(command_lst=command_lst)), | ||
| working_directory=working_directory, | ||
| cores=int(self._cores * self._threads_per_core), | ||
| **self._pysqa_submission_kwargs, | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Stop hashing into cwd; derive working_directory from cache_directory + worker_id (addresses open TODO).
- Avoids insecure md5 and shadowing built-in
hash. - Produces stable per-worker directories under the cache dir as requested in the PR checklist.
- def _start_process_helper(
- self, command_lst: list[str], queue_adapter: QueueAdapter
- ) -> int:
- hash = hashlib.md5(str(self).encode()).hexdigest()
- if self._cwd is not None:
- working_directory = os.path.join(self._cwd, hash)
- else:
- working_directory = os.path.abspath(hash)
+ def _start_process_helper(
+ self, command_lst: list[str], queue_adapter: QueueAdapter
+ ) -> int:
+ # Prefer cache_directory + worker_id; fall back to a short blake2 hash
+ base_dir = (
+ self._cache_directory
+ or (self._cwd if self._cwd is not None else os.path.abspath("."))
+ )
+ if getattr(self, "_worker_id", None) is not None:
+ worker_dir = f"worker_{self._worker_id}"
+ else:
+ # stable-ish fallback without using insecure md5
+ worker_dir = "worker_" + hashlib.blake2s(
+ repr(self).encode(), digest_size=8
+ ).hexdigest()
+ working_directory = os.path.join(base_dir, worker_dir)
+ os.makedirs(working_directory, exist_ok=True)
return queue_adapter.submit_job(
command=" ".join(self.generate_command(command_lst=command_lst)),
working_directory=working_directory,
cores=int(self._cores * self._threads_per_core),
**self._pysqa_submission_kwargs,
)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def _start_process_helper( | |
| self, command_lst: list[str], queue_adapter: QueueAdapter | |
| ) -> int: | |
| hash = hashlib.md5(str(self).encode()).hexdigest() | |
| if self._cwd is not None: | |
| working_directory = os.path.join(self._cwd, hash) | |
| else: | |
| working_directory = os.path.abspath(hash) | |
| return queue_adapter.submit_job( | |
| command=" ".join(self.generate_command(command_lst=command_lst)), | |
| working_directory=working_directory, | |
| cores=int(self._cores * self._threads_per_core), | |
| **self._pysqa_submission_kwargs, | |
| ) | |
| def _start_process_helper( | |
| self, command_lst: list[str], queue_adapter: QueueAdapter | |
| ) -> int: | |
| # Prefer cache_directory + worker_id; fall back to a short blake2 hash | |
| base_dir = ( | |
| self._cache_directory | |
| or (self._cwd if self._cwd is not None else os.path.abspath(".")) | |
| ) | |
| if getattr(self, "_worker_id", None) is not None: | |
| worker_dir = f"worker_{self._worker_id}" | |
| else: | |
| # stable-ish fallback without using insecure md5 | |
| worker_dir = "worker_" + hashlib.blake2s( | |
| repr(self).encode(), digest_size=8 | |
| ).hexdigest() | |
| working_directory = os.path.join(base_dir, worker_dir) | |
| os.makedirs(working_directory, exist_ok=True) | |
| return queue_adapter.submit_job( | |
| command=" ".join(self.generate_command(command_lst=command_lst)), | |
| working_directory=working_directory, | |
| cores=int(self._cores * self._threads_per_core), | |
| **self._pysqa_submission_kwargs, | |
| ) |
🧰 Tools
🪛 Ruff (0.12.2)
178-178: Probable use of insecure hash functions in hashlib: md5
(S324)
🤖 Prompt for AI Agents
In executorlib/task_scheduler/interactive/spawner_pysqa.py around lines 175-188,
replace the current MD5-based working_directory logic (which shadows the
built-in name `hash` and hashes self) with a stable per-worker path derived from
the configured cache directory and the worker id: build working_directory =
os.path.join(self._cache_directory, str(self._worker_id)) (or use the existing
cache dir attribute name if different), ensure the cache directory exists
(os.makedirs(..., exist_ok=True)), avoid using hashlib/md5 and do not shadow
built-ins, and keep the rest of the queue_adapter.submit_job call unchanged so
each worker uses a persistent, predictable subdirectory under the cache dir.
| try: | ||
| import pysqa | ||
|
|
||
| skip_pysqa_test = False | ||
| except ImportError: | ||
| skip_pysqa_test = True | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good: explicit PySQA gating flag. Also gate earlier tests that rely on PySQA.
The TestCacheExecutorPysqa class (Lines 50-54) relies on the file backend, which needs PySQA. Add skip_pysqa_test to that skipIf to avoid ImportError when PySQA isn’t installed.
Proposed change outside this hunk (around Line 50):
@unittest.skipIf(
skip_slurm_test or skip_mpi4py_test or skip_h5py_test or skip_pysqa_test,
"h5py or mpi4py or SLURM or pysqa are not installed; tests skipped.",
)🤖 Prompt for AI Agents
In tests/test_slurmclusterexecutor.py around lines 50 to 54, the
TestCacheExecutorPysqa class skip decorator needs to include the earlier defined
skip_pysqa_test flag so tests that depend on PySQA are skipped when it’s not
installed; update the @unittest.skipIf condition to add skip_pysqa_test to the
OR list and adjust the skip message to mention pysqa (e.g., "h5py or mpi4py or
SLURM or pysqa are not installed; tests skipped.").
| @unittest.skipIf(skip_pysqa_test, "pysqa is not installed, so the pysqa tests are skipped.") | ||
| class TestSlurmClusterInit(unittest.TestCase): | ||
| def test_slurm_cluster_block_allocation(self): | ||
| with self.assertRaises(ValueError): | ||
| SlurmClusterExecutor(block_allocation=True) | ||
|
|
||
| def test_slurm_cluster_file(self): | ||
| self.assertTrue(SlurmClusterExecutor(block_allocation=False)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
Fix test: block_allocation no longer raises ValueError when PySQA is available.
The code now instantiates a PySQA-backed BlockAllocationTaskScheduler; expecting ValueError is incorrect and will fail when PySQA is installed. Assert successful construction instead. Also, make the “file” test assert type rather than truthiness.
@unittest.skipIf(skip_pysqa_test, "pysqa is not installed, so the pysqa tests are skipped.")
class TestSlurmClusterInit(unittest.TestCase):
def test_slurm_cluster_block_allocation(self):
- with self.assertRaises(ValueError):
- SlurmClusterExecutor(block_allocation=True)
+ exe = SlurmClusterExecutor(block_allocation=True)
+ self.assertIsInstance(exe, SlurmClusterExecutor)
def test_slurm_cluster_file(self):
- self.assertTrue(SlurmClusterExecutor(block_allocation=False))
+ exe = SlurmClusterExecutor(block_allocation=False)
+ self.assertIsInstance(exe, SlurmClusterExecutor)If you specifically want to test behavior when PySQA is missing, add a separate test guarded by skipUnless(skip_pysqa_test) and assert ImportError on construction (since the import happens before any ValueError).
🏁 Script executed:
#!/bin/bash
set -e
# Locate the SlurmClusterExecutor class definition
rg -nP --type=py 'class\s+SlurmClusterExecutor' -C5
# Inspect its __init__ method for block_allocation handling
rg -nP --type=py 'def\s+__init__\s*\(.*block_allocation' -C5
# Search for all occurrences of block_allocation within this class
rg -nP --type=py --context 5 'block_allocation' --no-headingLength of output: 925
🏁 Script executed:
#!/bin/bash
set -e
# Show the beginning of the SlurmClusterExecutor implementation
sed -n '1,200p' executorlib/executor/slurm.py
# Find any __init__ definitions in slurm.py
rg -nP --type=py 'def\s+__init__' -n executorlib/executor/slurm.py -C3
# Search for block_allocation occurrences across the repository
rg -nP --type=py 'block_allocation' -n -C3Length of output: 93539
Update tests in tests/test_slurmclusterexecutor.py to reflect PySQA-backed behavior.
- In test_slurm_cluster_block_allocation, replace the ValueError assertion with:
exe = SlurmClusterExecutor(block_allocation=True) self.assertIsInstance(exe, SlurmClusterExecutor)
- In test_slurm_cluster_file, replace
assertTrue(...)with:exe = SlurmClusterExecutor(block_allocation=False) self.assertIsInstance(exe, SlurmClusterExecutor)
- Add a new test guarded by
@unittest.skipUnless(skip_pysqa_test, ...)that verifiesSlurmClusterExecutor(block_allocation=True)raisesImportErrorwhen PySQA is unavailable.
🤖 Prompt for AI Agents
tests/test_slurmclusterexecutor.py lines 120-127: update the tests to reflect
PySQA-backed behavior by replacing the ValueError expectation with creating the
executor and asserting its type (exe =
SlurmClusterExecutor(block_allocation=True); self.assertIsInstance(exe,
SlurmClusterExecutor)), replace the assertTrue call with the same pattern for
block_allocation=False, and add a new test function decorated with
@unittest.skipUnless(skip_pysqa_test, "pysqa is not installed, so the pysqa
tests are skipped.") that asserts SlurmClusterExecutor(block_allocation=True)
raises ImportError when PySQA is unavailable.
A different formulation would be
block_allocationfor theSlurmClusterExecutorand theFluxClusterExecutor.Open Tasks:
Summary by CodeRabbit