diff --git a/executorlib/cache/queue_spawner.py b/executorlib/cache/queue_spawner.py index 86f93a22..e7c1eeb8 100644 --- a/executorlib/cache/queue_spawner.py +++ b/executorlib/cache/queue_spawner.py @@ -68,7 +68,9 @@ def execute_with_pysqa( if k in resource_dict: del resource_dict[k] if "job_name" not in resource_dict: - resource_dict["job_name"] = "pysqa" + resource_dict["job_name"] = os.path.basename( + os.path.dirname(os.path.abspath(cwd)) + ) submit_kwargs.update(resource_dict) queue_id = qa.submit_job(**submit_kwargs) dump(file_name=file_name, data_dict={"queue_id": queue_id}) diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py index d7eea67c..7e15764c 100644 --- a/executorlib/cache/shared.py +++ b/executorlib/cache/shared.py @@ -106,8 +106,13 @@ def execute_tasks_h5( resource_dict=task_resource_dict, ) if task_key not in memory_dict: - if task_key + ".h5out" not in os.listdir(cache_directory): - file_name = os.path.join(cache_directory, task_key + ".h5in") + if not ( + task_key in os.listdir(cache_directory) + and "cache.h5out" + in os.listdir(os.path.join(cache_directory, task_key)) + ): + os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) + file_name = os.path.join(cache_directory, task_key, "cache.h5in") dump(file_name=file_name, data_dict=data_dict) if not disable_dependencies: task_dependent_lst = [ @@ -131,10 +136,10 @@ def execute_tasks_h5( resource_dict=task_resource_dict, config_directory=pysqa_config_directory, backend=backend, - cache_directory=cache_directory, + cache_directory=os.path.join(cache_directory, task_key), ) file_name_dict[task_key] = os.path.join( - cache_directory, task_key + ".h5out" + cache_directory, task_key, "cache.h5out" ) memory_dict[task_key] = task_dict["future"] future_queue.task_done() @@ -190,7 +195,7 @@ def _check_task_output( Future: The updated future object. """ - file_name = os.path.join(cache_directory, task_key + ".h5out") + file_name = os.path.join(cache_directory, task_key, "cache.h5out") if not os.path.exists(file_name): return future_obj exec_flag, result = get_output(file_name=file_name) diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index 1c39aaad..86fa62dd 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -148,9 +148,12 @@ def _execute_task_with_cache( fn_kwargs=task_dict["kwargs"], resource_dict=task_dict.get("resource_dict", {}), ) - os.makedirs(cache_directory, exist_ok=True) - file_name = os.path.join(cache_directory, task_key + ".h5out") - if task_key + ".h5out" not in os.listdir(cache_directory): + os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) + file_name = os.path.join(cache_directory, task_key, "cache.h5out") + if not ( + task_key in os.listdir(cache_directory) + and "cache.h5out" in os.listdir(os.path.join(cache_directory, task_key)) + ): f = task_dict.pop("future") if f.set_running_or_notify_cancel(): try: diff --git a/executorlib/standalone/hdf.py b/executorlib/standalone/hdf.py index 9fb1b744..aa765555 100644 --- a/executorlib/standalone/hdf.py +++ b/executorlib/standalone/hdf.py @@ -104,13 +104,16 @@ def get_queue_id(file_name: Optional[str]) -> Optional[int]: def get_cache_data(cache_directory: str) -> list[dict]: file_lst = [] - for file_name in os.listdir(cache_directory): - with h5py.File(os.path.join(cache_directory, file_name), "r") as hdf: - file_content_dict = { - key: cloudpickle.loads(np.void(hdf["/" + key])) - for key in group_dict.values() - if key in hdf - } - file_content_dict["filename"] = file_name - file_lst.append(file_content_dict) + for task_key in os.listdir(cache_directory): + file_name = os.path.join(cache_directory, task_key, "cache.h5out") + os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) + if os.path.exists(file_name): + with h5py.File(file_name, "r") as hdf: + file_content_dict = { + key: cloudpickle.loads(np.void(hdf["/" + key])) + for key in group_dict.values() + if key in hdf + } + file_content_dict["filename"] = file_name + file_lst.append(file_content_dict) return file_lst diff --git a/tests/test_cache_shared.py b/tests/test_cache_shared.py index 76c62dfb..46cbb718 100644 --- a/tests/test_cache_shared.py +++ b/tests/test_cache_shared.py @@ -31,7 +31,8 @@ def test_execute_function_mixed(self): fn_args=[1], fn_kwargs={"b": 2}, ) - file_name = os.path.join(cache_directory, task_key + ".h5in") + file_name = os.path.join(cache_directory, task_key, "cache.h5in") + os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) dump(file_name=file_name, data_dict=data_dict) backend_execute_task_in_file(file_name=file_name) future_obj = Future() @@ -41,11 +42,11 @@ def test_execute_function_mixed(self): self.assertTrue(future_obj.done()) self.assertEqual(future_obj.result(), 3) self.assertTrue( - get_runtime(file_name=os.path.join(cache_directory, task_key + ".h5out")) + get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out")) > 0.0 ) future_file_obj = FutureItem( - file_name=os.path.join(cache_directory, task_key + ".h5out") + file_name=os.path.join(cache_directory, task_key, "cache.h5out") ) self.assertTrue(future_file_obj.done()) self.assertEqual(future_file_obj.result(), 3) @@ -58,7 +59,8 @@ def test_execute_function_args(self): fn_args=[1, 2], fn_kwargs={}, ) - file_name = os.path.join(cache_directory, task_key + ".h5in") + file_name = os.path.join(cache_directory, task_key, "cache.h5in") + os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) dump(file_name=file_name, data_dict=data_dict) backend_execute_task_in_file(file_name=file_name) future_obj = Future() @@ -68,11 +70,11 @@ def test_execute_function_args(self): self.assertTrue(future_obj.done()) self.assertEqual(future_obj.result(), 3) self.assertTrue( - get_runtime(file_name=os.path.join(cache_directory, task_key + ".h5out")) + get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out")) > 0.0 ) future_file_obj = FutureItem( - file_name=os.path.join(cache_directory, task_key + ".h5out") + file_name=os.path.join(cache_directory, task_key, "cache.h5out") ) self.assertTrue(future_file_obj.done()) self.assertEqual(future_file_obj.result(), 3) @@ -85,7 +87,8 @@ def test_execute_function_kwargs(self): fn_args=[], fn_kwargs={"a": 1, "b": 2}, ) - file_name = os.path.join(cache_directory, task_key + ".h5in") + file_name = os.path.join(cache_directory, task_key, "cache.h5in") + os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) dump(file_name=file_name, data_dict=data_dict) backend_execute_task_in_file(file_name=file_name) future_obj = Future() @@ -95,11 +98,11 @@ def test_execute_function_kwargs(self): self.assertTrue(future_obj.done()) self.assertEqual(future_obj.result(), 3) self.assertTrue( - get_runtime(file_name=os.path.join(cache_directory, task_key + ".h5out")) + get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out")) > 0.0 ) future_file_obj = FutureItem( - file_name=os.path.join(cache_directory, task_key + ".h5out") + file_name=os.path.join(cache_directory, task_key, "cache.h5out") ) self.assertTrue(future_file_obj.done()) self.assertEqual(future_file_obj.result(), 3)