Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions executorlib/task_scheduler/file/queue_spawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@

def execute_with_pysqa(
command: list,
cache_directory: str,
task_dependent_lst: Optional[list[int]] = None,
file_name: Optional[str] = None,
resource_dict: Optional[dict] = None,
config_directory: Optional[str] = None,
backend: Optional[str] = None,
cache_directory: Optional[str] = None,
) -> Optional[int]:
"""
Execute a command by submitting it to the queuing system

Args:
command (list): The command to be executed.
cache_directory (str): The directory to store the HDF5 files.
task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to [].
file_name (str): Name of the HDF5 file which contains the Python function
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function.
Expand All @@ -30,7 +31,6 @@ def execute_with_pysqa(
}
config_directory (str, optional): path to the config directory.
backend (str, optional): name of the backend used to spawn tasks.
cache_directory (str): The directory to store the HDF5 files.

Returns:
int: queuing system ID
Expand All @@ -50,7 +50,9 @@ def execute_with_pysqa(
if "cwd" in resource_dict and resource_dict["cwd"] is not None:
cwd = resource_dict["cwd"]
else:
cwd = cache_directory
folder = command[-1].split("_i.h5")[0]
cwd = os.path.join(cache_directory, folder)
os.makedirs(cwd, exist_ok=True)
Comment on lines +53 to +55
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add input validation and improve robustness of folder name extraction.

The current implementation has several potential issues:

  1. Missing input validation: No check if command is empty, which would cause IndexError on command[-1]
  2. Brittle string parsing: The code assumes command[-1] contains "_i.h5", but if it doesn't, the entire string becomes the folder name
  3. Path security: The folder name comes from user input and could contain path traversal characters like "../"

Consider this more robust implementation:

-            folder = command[-1].split("_i.h5")[0]
-            cwd = os.path.join(cache_directory, folder)
-            os.makedirs(cwd, exist_ok=True)
+            if not command:
+                raise ValueError("Command list cannot be empty")
+            
+            # Extract folder name safely
+            last_arg = command[-1]
+            if "_i.h5" in last_arg:
+                folder = last_arg.split("_i.h5")[0]
+            else:
+                # Fallback: use basename without extension
+                folder = os.path.splitext(os.path.basename(last_arg))[0]
+            
+            # Sanitize folder name to prevent path traversal
+            folder = os.path.basename(folder)  # Remove any path components
+            if not folder or folder in (".", ".."):
+                folder = "default_job"
+            
+            cwd = os.path.join(cache_directory, folder)
+            os.makedirs(cwd, exist_ok=True)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
folder = command[-1].split("_i.h5")[0]
cwd = os.path.join(cache_directory, folder)
os.makedirs(cwd, exist_ok=True)
if not command:
raise ValueError("Command list cannot be empty")
# Extract folder name safely
last_arg = command[-1]
if "_i.h5" in last_arg:
folder = last_arg.split("_i.h5")[0]
else:
# Fallback: use basename without extension
folder = os.path.splitext(os.path.basename(last_arg))[0]
# Sanitize folder name to prevent path traversal
folder = os.path.basename(folder) # Remove any path components
if not folder or folder in (".", ".."):
folder = "default_job"
cwd = os.path.join(cache_directory, folder)
os.makedirs(cwd, exist_ok=True)
🤖 Prompt for AI Agents
In executorlib/task_scheduler/file/queue_spawner.py around lines 53 to 55, add
validation to ensure the command list is not empty before accessing command[-1]
to prevent IndexError. Check if the last element contains the expected "_i.h5"
substring before splitting; if not, handle it gracefully or raise an error.
Sanitize the extracted folder name to remove or neutralize any path traversal
characters like "../" to avoid security risks. Use a safe method to construct
the folder path and then create the directory with os.makedirs as before.

submit_kwargs = {
"command": " ".join(command),
"dependency_list": [str(qid) for qid in task_dependent_lst],
Expand Down
14 changes: 14 additions & 0 deletions tests/test_fluxclusterexecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,20 @@ def test_executor(self):
fs1 = exe.submit(mpi_funct, 1)
self.assertFalse(fs1.done())
self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
self.assertEqual(len(os.listdir("executorlib_cache")), 4)
self.assertTrue(fs1.done())

def test_executor_no_cwd(self):
with FluxClusterExecutor(
resource_dict={"cores": 2},
block_allocation=False,
cache_directory="executorlib_cache",
) as exe:
cloudpickle_register(ind=1)
fs1 = exe.submit(mpi_funct, 1)
self.assertFalse(fs1.done())
self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
self.assertEqual(len(os.listdir("executorlib_cache")), 2)
self.assertTrue(fs1.done())

def tearDown(self):
Expand Down
Loading