diff --git a/executorlib/task_scheduler/file/queue_spawner.py b/executorlib/task_scheduler/file/queue_spawner.py index ac6ecc5b..ec3c34f1 100644 --- a/executorlib/task_scheduler/file/queue_spawner.py +++ b/executorlib/task_scheduler/file/queue_spawner.py @@ -10,18 +10,19 @@ def execute_with_pysqa( command: list, + cache_directory: str, task_dependent_lst: Optional[list[int]] = None, file_name: Optional[str] = None, resource_dict: Optional[dict] = None, config_directory: Optional[str] = None, backend: Optional[str] = None, - cache_directory: Optional[str] = None, ) -> Optional[int]: """ Execute a command by submitting it to the queuing system Args: command (list): The command to be executed. + cache_directory (str): The directory to store the HDF5 files. task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to []. file_name (str): Name of the HDF5 file which contains the Python function resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function. @@ -30,7 +31,6 @@ def execute_with_pysqa( } config_directory (str, optional): path to the config directory. backend (str, optional): name of the backend used to spawn tasks. - cache_directory (str): The directory to store the HDF5 files. Returns: int: queuing system ID @@ -50,7 +50,9 @@ def execute_with_pysqa( if "cwd" in resource_dict and resource_dict["cwd"] is not None: cwd = resource_dict["cwd"] else: - cwd = cache_directory + folder = command[-1].split("_i.h5")[0] + cwd = os.path.join(cache_directory, folder) + os.makedirs(cwd, exist_ok=True) submit_kwargs = { "command": " ".join(command), "dependency_list": [str(qid) for qid in task_dependent_lst], diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py index dab9985e..695f0709 100644 --- a/tests/test_fluxclusterexecutor.py +++ b/tests/test_fluxclusterexecutor.py @@ -41,6 +41,20 @@ def test_executor(self): fs1 = exe.submit(mpi_funct, 1) self.assertFalse(fs1.done()) self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertEqual(len(os.listdir("executorlib_cache")), 4) + self.assertTrue(fs1.done()) + + def test_executor_no_cwd(self): + with FluxClusterExecutor( + resource_dict={"cores": 2}, + block_allocation=False, + cache_directory="executorlib_cache", + ) as exe: + cloudpickle_register(ind=1) + fs1 = exe.submit(mpi_funct, 1) + self.assertFalse(fs1.done()) + self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertEqual(len(os.listdir("executorlib_cache")), 2) self.assertTrue(fs1.done()) def tearDown(self):