Add terminate_tasks_in_cache() function to kill remaining tasks #739
Conversation
Walkthrough

A new function, `terminate_tasks_in_cache()`, is introduced to delete all jobs recorded in a cache directory from the queuing system.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant executorlib
    participant queue_spawner
    participant FileSystem
    participant QueueSystem
    User->>executorlib: import terminate_tasks_in_cache
    executorlib->>queue_spawner: import terminate_tasks_in_cache (if available)
    User->>queue_spawner: terminate_tasks_in_cache(cache_directory)
    queue_spawner->>FileSystem: Recursively scan for *_i.h5 files
    loop For each found file
        queue_spawner->>queue_spawner: get_queue_id(file)
        queue_spawner->>QueueSystem: terminate_with_pysqa(queue_id, config_directory, backend)
    end
```
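Pieced together from the review hunks quoted later in this thread, the function body looks roughly like this — a sketch, not the verbatim implementation. `terminate_with_pysqa` is assumed to be the existing helper defined alongside in `queue_spawner.py`, and `get_queue_id` the one in `hdf.py`:

```python
import os
from typing import Optional

from executorlib.task_scheduler.file.hdf import get_queue_id


def terminate_tasks_in_cache(
    cache_directory: str,
    config_directory: Optional[str] = None,
    backend: Optional[str] = None,
):
    # Recursively collect the *_i.h5 task input files written to the cache
    hdf5_file_lst = []
    for root, _, files in os.walk(cache_directory):
        hdf5_file_lst += [os.path.join(root, f) for f in files if f.endswith("_i.h5")]
    # Look up each task's queue id and ask pysqa to delete the job
    for f in hdf5_file_lst:
        terminate_with_pysqa(
            queue_id=get_queue_id(f),
            config_directory=config_directory,
            backend=backend,
        )
```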
Actionable comments posted: 2
🧹 Nitpick comments (2)
executorlib/task_scheduler/file/queue_spawner.py (2)
130-131: Fix unused loop variable.

The static analysis tool correctly identifies that the `folder` variable is not used within the loop body.

```diff
- for root, folder, files in os.walk(cache_directory):
+ for root, _, files in os.walk(cache_directory):
```
131-131: Consider using a more robust file filtering approach.

The current string slicing approach for filtering files ending with "_i.h5" works but could be improved for better readability and maintainability.

```diff
- hdf5_file_lst += [os.path.join(root, f) for f in files if f[-5:] == "_i.h5"]
+ hdf5_file_lst += [os.path.join(root, f) for f in files if f.endswith("_i.h5")]
```
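As a side note, `str.endswith` also accepts a tuple of suffixes, which keeps the filter readable if more cache-file patterns ever need matching — a hypothetical extension, not part of this PR:

```python
# Hypothetical: match both input (_i.h5) and output (_o.h5) cache files in one pass
hdf5_file_lst += [
    os.path.join(root, f) for f in files if f.endswith(("_i.h5", "_o.h5"))
]
```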
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `executorlib/__init__.py` (1 hunks)
- `executorlib/task_scheduler/file/queue_spawner.py` (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
executorlib/task_scheduler/file/queue_spawner.py (1)
executorlib/task_scheduler/file/hdf.py (1)
`get_queue_id` (98-112)
executorlib/__init__.py (1)
executorlib/task_scheduler/file/queue_spawner.py (1)
`terminate_tasks_in_cache` (116-138)
🪛 Ruff (0.12.2)
executorlib/task_scheduler/file/queue_spawner.py
130-130: Loop control variable `folder` not used within loop body

Rename unused `folder` to `_folder`
(B007)
🔇 Additional comments (1)
executorlib/__init__.py (1)
39-44: Well-implemented conditional import pattern.

The conditional import follows Python best practices for optional functionality. The try-except block gracefully handles import failures, and the function is only exposed in the public API when successfully imported.
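A minimal sketch of what such a conditional export can look like — hedged, since the review references lines 39-44 of `executorlib/__init__.py` without quoting them:

```python
# executorlib/__init__.py -- sketch of the optional-import pattern described above
try:
    from executorlib.task_scheduler.file.queue_spawner import terminate_tasks_in_cache

    __all__ += ["terminate_tasks_in_cache"]  # expose only when the import succeeded
except ImportError:
    # pysqa/h5py are optional dependencies; silently skip the export without them
    pass
```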
```python
def terminate_tasks_in_cache(
    cache_directory: str,
    config_directory: Optional[str] = None,
    backend: Optional[str] = None,
):
    """
    Delete all jobs stored in the cache directory from the queuing system

    Args:
        cache_directory (str): The directory to store cache files.
        config_directory (str, optional): path to the config directory.
        backend (str, optional): name of the backend used to spawn tasks.
    """
```
🛠️ Refactor suggestion
Add input validation for cache_directory parameter.
The function doesn't validate that the `cache_directory` exists or is accessible, which could lead to unclear error messages.
```diff
 def terminate_tasks_in_cache(
     cache_directory: str,
     config_directory: Optional[str] = None,
     backend: Optional[str] = None,
 ):
     """
     Delete all jobs stored in the cache directory from the queuing system

     Args:
         cache_directory (str): The directory to store cache files.
         config_directory (str, optional): path to the config directory.
         backend (str, optional): name of the backend used to spawn tasks.
     """
+    if not os.path.exists(cache_directory):
+        raise ValueError(f"Cache directory does not exist: {cache_directory}")
+    if not os.path.isdir(cache_directory):
+        raise ValueError(f"Cache directory is not a directory: {cache_directory}")
```

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In executorlib/task_scheduler/file/queue_spawner.py around lines 116 to 128, the
function terminate_tasks_in_cache lacks validation for the cache_directory
parameter. Add input validation to check if cache_directory exists and is
accessible before proceeding. If the directory does not exist or is not
accessible, raise a clear and descriptive exception to prevent unclear errors
later in the function.
```python
for f in hdf5_file_lst:
    terminate_with_pysqa(
        queue_id=get_queue_id(f),
        config_directory=config_directory,
        backend=backend,
    )
```
🛠️ Refactor suggestion
Add error handling for individual task termination failures.
The function doesn't handle cases where `get_queue_id` returns None or where `terminate_with_pysqa` might fail for individual files. This could cause the entire operation to fail when only some tasks have issues.
```diff
 for f in hdf5_file_lst:
-    terminate_with_pysqa(
-        queue_id=get_queue_id(f),
-        config_directory=config_directory,
-        backend=backend,
-    )
+    try:
+        queue_id = get_queue_id(f)
+        if queue_id is not None:
+            terminate_with_pysqa(
+                queue_id=queue_id,
+                config_directory=config_directory,
+                backend=backend,
+            )
+    except Exception:
+        # Continue processing other files even if one fails
+        pass
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
for f in hdf5_file_lst:
    try:
        queue_id = get_queue_id(f)
        if queue_id is not None:
            terminate_with_pysqa(
                queue_id=queue_id,
                config_directory=config_directory,
                backend=backend,
            )
    except Exception:
        # Continue processing other files even if one fails
        pass
```
🤖 Prompt for AI Agents
In executorlib/task_scheduler/file/queue_spawner.py around lines 133 to 138, add
error handling inside the loop to manage cases where get_queue_id returns None
or terminate_with_pysqa raises an exception. For each file, check if
get_queue_id returns a valid ID before calling terminate_with_pysqa, and wrap
the termination call in a try-except block to catch and log errors without
stopping the entire loop.
Codecov Report

Attention: Patch coverage is incomplete — 11 of the 13 changed lines are hit and 2 are missed (see the diff below).

Additional details and impacted files

```
@@            Coverage Diff             @@
##             main     #739      +/-   ##
==========================================
- Coverage   96.96%   96.85%   -0.12%
==========================================
  Files          31       31
  Lines        1385     1398      +13
==========================================
+ Hits         1343     1354      +11
- Misses         42       44       +2
```

☔ View full report in Codecov by Sentry.
Actionable comments posted: 1
🧹 Nitpick comments (1)
tests/test_fluxclusterexecutor.py (1)
96-102: Good test setup, but consider adding verification of internal calls.

The test properly sets up the cache directory with a test HDF5 file containing queue_id data, which matches the expected workflow. However, consider enhancing the test to verify that the termination actually occurs:
```diff
+    @unittest.mock.patch('executorlib.task_scheduler.file.queue_spawner.terminate_with_pysqa')
+    def test_terminate_tasks_in_cache(self, mock_terminate):
         file = os.path.join("executorlib_cache", "test_i.h5")
         dump(file_name=file, data_dict={"queue_id": 1})
-        self.assertIsNone(terminate_tasks_in_cache(
+
+        result = terminate_tasks_in_cache(
             cache_directory="executorlib_cache",
             backend="flux",
-        ))
+        )
+
+        self.assertIsNone(result)
+        mock_terminate.assert_called_once_with(
+            queue_id=1,
+            config_directory=None,
+            backend="flux"
+        )
```

This would verify that `terminate_with_pysqa` is called with the correct `queue_id` extracted from the cache file.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `executorlib/task_scheduler/file/queue_spawner.py` (3 hunks)
- `tests/test_fluxclusterexecutor.py` (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- executorlib/task_scheduler/file/queue_spawner.py
🧰 Additional context used
🧬 Code Graph Analysis (1)
tests/test_fluxclusterexecutor.py (2)
executorlib/task_scheduler/file/queue_spawner.py (2)
`terminate_with_pysqa` (94-115), `terminate_tasks_in_cache` (118-142)

executorlib/task_scheduler/file/hdf.py (1)
`dump` (11-28)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
- GitHub Check: notebooks_integration
🔇 Additional comments (1)
tests/test_fluxclusterexecutor.py (1)
12-12: LGTM: Import statement is correctly placed.

The import statement is properly structured within the try-except block to handle missing dependencies gracefully.
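A sketch of how such a guarded test import typically looks — the actual contents of line 12 are not quoted in this review, so the skip-flag name and import list are assumptions:

```python
# tests/test_fluxclusterexecutor.py -- assumed shape of the import guard
try:
    from executorlib.task_scheduler.file.queue_spawner import (
        terminate_tasks_in_cache,
        terminate_with_pysqa,
    )

    skip_flux_test = False
except ImportError:
    skip_flux_test = True  # pysqa/flux not installed: skip the dependent tests
```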
```python
def test_terminate_with_pysqa(self):
    self.assertIsNone(terminate_with_pysqa(queue_id=1, backend="flux"))
```
🛠️ Refactor suggestion
Enhance test coverage for better validation.
The current test only verifies that the function executes without errors, but doesn't validate the actual termination behavior. Consider:
- Mock the QueueAdapter: Use mocking to verify that `get_status_of_job` and `delete_job` are called appropriately
- Test edge cases: Test scenarios where the job doesn't exist or is already finished
- Parameterize backends: Test both "slurm" and "flux" backends if possible
Example improvement:
```diff
+    @unittest.mock.patch('executorlib.task_scheduler.file.queue_spawner.QueueAdapter')
+    def test_terminate_with_pysqa(self, mock_qa_class):
+        mock_qa = mock_qa_class.return_value
+        mock_qa.get_status_of_job.return_value = "running"
+
+        result = terminate_with_pysqa(queue_id=1, backend="flux")
+
+        self.assertIsNone(result)
+        mock_qa.get_status_of_job.assert_called_once_with(process_id=1)
+        mock_qa.delete_job.assert_called_once_with(process_id=1)
-    def test_terminate_with_pysqa(self):
-        self.assertIsNone(terminate_with_pysqa(queue_id=1, backend="flux"))
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
@unittest.mock.patch('executorlib.task_scheduler.file.queue_spawner.QueueAdapter')
def test_terminate_with_pysqa(self, mock_qa_class):
    mock_qa = mock_qa_class.return_value
    mock_qa.get_status_of_job.return_value = "running"

    result = terminate_with_pysqa(queue_id=1, backend="flux")

    self.assertIsNone(result)
    mock_qa.get_status_of_job.assert_called_once_with(process_id=1)
    mock_qa.delete_job.assert_called_once_with(process_id=1)
```
🤖 Prompt for AI Agents
In tests/test_fluxclusterexecutor.py around lines 62 to 63, the test for
terminate_with_pysqa only checks for no errors but does not verify the
function's behavior. Enhance the test by mocking the QueueAdapter to assert that
get_status_of_job and delete_job are called correctly. Add tests for edge cases
such as when the job does not exist or is already finished. Also, parameterize
the test to cover both "slurm" and "flux" backends to ensure broader coverage.
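A hedged sketch of the backend parameterization the prompt asks for, reusing the `QueueAdapter` patch target from the suggestion above together with `unittest`'s built-in subtests (the test-method name is hypothetical):

```python
import unittest.mock


@unittest.mock.patch("executorlib.task_scheduler.file.queue_spawner.QueueAdapter")
def test_terminate_with_pysqa_backends(self, mock_qa_class):
    mock_qa_class.return_value.get_status_of_job.return_value = "running"
    # Cover both queuing-system backends mentioned in the review
    for backend in ("slurm", "flux"):
        with self.subTest(backend=backend):
            self.assertIsNone(terminate_with_pysqa(queue_id=1, backend=backend))
```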