From eb3f2e683b779fb01c92c1027d11521b8f5a384b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 9 Mar 2025 17:24:20 +0100 Subject: [PATCH 1/8] Create cache directory --- executorlib/cache/shared.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py index d7eea67c..3996fb20 100644 --- a/executorlib/cache/shared.py +++ b/executorlib/cache/shared.py @@ -106,8 +106,9 @@ def execute_tasks_h5( resource_dict=task_resource_dict, ) if task_key not in memory_dict: - if task_key + ".h5out" not in os.listdir(cache_directory): - file_name = os.path.join(cache_directory, task_key + ".h5in") + if task_key not in os.listdir(cache_directory) and "cache.h5out" not in os.listdir(os.path.join(cache_directory, task_key)): + os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) + file_name = os.path.join(cache_directory, task_key, "cache.h5in") dump(file_name=file_name, data_dict=data_dict) if not disable_dependencies: task_dependent_lst = [ @@ -131,10 +132,10 @@ def execute_tasks_h5( resource_dict=task_resource_dict, config_directory=pysqa_config_directory, backend=backend, - cache_directory=cache_directory, + cache_directory=os.path.join(cache_directory, task_key), ) file_name_dict[task_key] = os.path.join( - cache_directory, task_key + ".h5out" + cache_directory, task_key, "cache.h5out" ) memory_dict[task_key] = task_dict["future"] future_queue.task_done() From 34b40b11289331dcae823c3d32aabc0b4536f48d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 9 Mar 2025 16:24:47 +0000 Subject: [PATCH 2/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/cache/shared.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py index 3996fb20..39b0002f 100644 --- a/executorlib/cache/shared.py +++ b/executorlib/cache/shared.py @@ -106,9 +106,13 @@ def execute_tasks_h5( resource_dict=task_resource_dict, ) if task_key not in memory_dict: - if task_key not in os.listdir(cache_directory) and "cache.h5out" not in os.listdir(os.path.join(cache_directory, task_key)): + if task_key not in os.listdir( + cache_directory + ) and "cache.h5out" not in os.listdir( + os.path.join(cache_directory, task_key) + ): os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) - file_name = os.path.join(cache_directory, task_key, "cache.h5in") + file_name = os.path.join(cache_directory, task_key, "cache.h5in") dump(file_name=file_name, data_dict=data_dict) if not disable_dependencies: task_dependent_lst = [ From 71d2023acc73381654b4395f7694b7fd956227a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 9 Mar 2025 17:43:18 +0100 Subject: [PATCH 3/8] fixes --- executorlib/cache/shared.py | 9 ++++----- executorlib/standalone/hdf.py | 5 +++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py index 39b0002f..a9bbae64 100644 --- a/executorlib/cache/shared.py +++ b/executorlib/cache/shared.py @@ -106,10 +106,9 @@ def execute_tasks_h5( resource_dict=task_resource_dict, ) if task_key not in memory_dict: - if task_key not in os.listdir( - cache_directory - ) and "cache.h5out" not in os.listdir( - os.path.join(cache_directory, task_key) + if not ( + task_key in os.listdir(cache_directory) and + "cache.h5out" in os.listdir(os.path.join(cache_directory, task_key)) ): os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) file_name = os.path.join(cache_directory, task_key, "cache.h5in") @@ -195,7 +194,7 @@ def _check_task_output( Future: The updated future object. """ - file_name = os.path.join(cache_directory, task_key + ".h5out") + file_name = os.path.join(cache_directory, task_key, "cache.h5out") if not os.path.exists(file_name): return future_obj exec_flag, result = get_output(file_name=file_name) diff --git a/executorlib/standalone/hdf.py b/executorlib/standalone/hdf.py index 9fb1b744..024c2ef7 100644 --- a/executorlib/standalone/hdf.py +++ b/executorlib/standalone/hdf.py @@ -104,8 +104,9 @@ def get_queue_id(file_name: Optional[str]) -> Optional[int]: def get_cache_data(cache_directory: str) -> list[dict]: file_lst = [] - for file_name in os.listdir(cache_directory): - with h5py.File(os.path.join(cache_directory, file_name), "r") as hdf: + for task_key in os.listdir(cache_directory): + file_name = os.path.join(cache_directory, task_key, "cache.h5in") + with h5py.File(file_name, "r") as hdf: file_content_dict = { key: cloudpickle.loads(np.void(hdf["/" + key])) for key in group_dict.values() From eceb2a62d1027a45be55856fe41e6203921dd140 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 9 Mar 2025 16:43:30 +0000 Subject: [PATCH 4/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/cache/shared.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py index a9bbae64..7e15764c 100644 --- a/executorlib/cache/shared.py +++ b/executorlib/cache/shared.py @@ -107,8 +107,9 @@ def execute_tasks_h5( ) if task_key not in memory_dict: if not ( - task_key in os.listdir(cache_directory) and - "cache.h5out" in os.listdir(os.path.join(cache_directory, task_key)) + task_key in os.listdir(cache_directory) + and "cache.h5out" + in os.listdir(os.path.join(cache_directory, task_key)) ): os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) file_name = os.path.join(cache_directory, task_key, "cache.h5in") From 5e80e91af4f97b7fad588504cf9631d3d5724f06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 9 Mar 2025 17:58:33 +0100 Subject: [PATCH 5/8] more fixes --- executorlib/interactive/shared.py | 6 +++--- executorlib/standalone/hdf.py | 20 +++++++++++--------- tests/test_cache_shared.py | 21 ++++++++++++--------- 3 files changed, 26 insertions(+), 21 deletions(-) diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index 1c39aaad..789afd75 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -148,9 +148,9 @@ def _execute_task_with_cache( fn_kwargs=task_dict["kwargs"], resource_dict=task_dict.get("resource_dict", {}), ) - os.makedirs(cache_directory, exist_ok=True) - file_name = os.path.join(cache_directory, task_key + ".h5out") - if task_key + ".h5out" not in os.listdir(cache_directory): + os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) + file_name = os.path.join(cache_directory, task_key, "cache.h5out") + if not (task_key in os.listdir(cache_directory) and "cache.h5out" in os.listdir(os.path.join(cache_directory, task_key))): f = task_dict.pop("future") if f.set_running_or_notify_cancel(): try: diff --git a/executorlib/standalone/hdf.py b/executorlib/standalone/hdf.py index 024c2ef7..aa765555 100644 --- a/executorlib/standalone/hdf.py +++ b/executorlib/standalone/hdf.py @@ -105,13 +105,15 @@ def get_queue_id(file_name: Optional[str]) -> Optional[int]: def get_cache_data(cache_directory: str) -> list[dict]: file_lst = [] for task_key in os.listdir(cache_directory): - file_name = os.path.join(cache_directory, task_key, "cache.h5in") - with h5py.File(file_name, "r") as hdf: - file_content_dict = { - key: cloudpickle.loads(np.void(hdf["/" + key])) - for key in group_dict.values() - if key in hdf - } - file_content_dict["filename"] = file_name - file_lst.append(file_content_dict) + file_name = os.path.join(cache_directory, task_key, "cache.h5out") + os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) + if os.path.exists(file_name): + with h5py.File(file_name, "r") as hdf: + file_content_dict = { + key: cloudpickle.loads(np.void(hdf["/" + key])) + for key in group_dict.values() + if key in hdf + } + file_content_dict["filename"] = file_name + file_lst.append(file_content_dict) return file_lst diff --git a/tests/test_cache_shared.py b/tests/test_cache_shared.py index 76c62dfb..46cbb718 100644 --- a/tests/test_cache_shared.py +++ b/tests/test_cache_shared.py @@ -31,7 +31,8 @@ def test_execute_function_mixed(self): fn_args=[1], fn_kwargs={"b": 2}, ) - file_name = os.path.join(cache_directory, task_key + ".h5in") + file_name = os.path.join(cache_directory, task_key, "cache.h5in") + os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) dump(file_name=file_name, data_dict=data_dict) backend_execute_task_in_file(file_name=file_name) future_obj = Future() @@ -41,11 +42,11 @@ def test_execute_function_mixed(self): self.assertTrue(future_obj.done()) self.assertEqual(future_obj.result(), 3) self.assertTrue( - get_runtime(file_name=os.path.join(cache_directory, task_key + ".h5out")) + get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out")) > 0.0 ) future_file_obj = FutureItem( - file_name=os.path.join(cache_directory, task_key + ".h5out") + file_name=os.path.join(cache_directory, task_key, "cache.h5out") ) self.assertTrue(future_file_obj.done()) self.assertEqual(future_file_obj.result(), 3) @@ -58,7 +59,8 @@ def test_execute_function_args(self): fn_args=[1, 2], fn_kwargs={}, ) - file_name = os.path.join(cache_directory, task_key + ".h5in") + file_name = os.path.join(cache_directory, task_key, "cache.h5in") + os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) dump(file_name=file_name, data_dict=data_dict) backend_execute_task_in_file(file_name=file_name) future_obj = Future() @@ -68,11 +70,11 @@ def test_execute_function_args(self): self.assertTrue(future_obj.done()) self.assertEqual(future_obj.result(), 3) self.assertTrue( - get_runtime(file_name=os.path.join(cache_directory, task_key + ".h5out")) + get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out")) > 0.0 ) future_file_obj = FutureItem( - file_name=os.path.join(cache_directory, task_key + ".h5out") + file_name=os.path.join(cache_directory, task_key, "cache.h5out") ) self.assertTrue(future_file_obj.done()) self.assertEqual(future_file_obj.result(), 3) @@ -85,7 +87,8 @@ def test_execute_function_kwargs(self): fn_args=[], fn_kwargs={"a": 1, "b": 2}, ) - file_name = os.path.join(cache_directory, task_key + ".h5in") + file_name = os.path.join(cache_directory, task_key, "cache.h5in") + os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) dump(file_name=file_name, data_dict=data_dict) backend_execute_task_in_file(file_name=file_name) future_obj = Future() @@ -95,11 +98,11 @@ def test_execute_function_kwargs(self): self.assertTrue(future_obj.done()) self.assertEqual(future_obj.result(), 3) self.assertTrue( - get_runtime(file_name=os.path.join(cache_directory, task_key + ".h5out")) + get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out")) > 0.0 ) future_file_obj = FutureItem( - file_name=os.path.join(cache_directory, task_key + ".h5out") + file_name=os.path.join(cache_directory, task_key, "cache.h5out") ) self.assertTrue(future_file_obj.done()) self.assertEqual(future_file_obj.result(), 3) From e3598cee33319f5021a0a859b89852ac4acaafba Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 9 Mar 2025 16:58:43 +0000 Subject: [PATCH 6/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/interactive/shared.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index 789afd75..86fa62dd 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -150,7 +150,10 @@ def _execute_task_with_cache( ) os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True) file_name = os.path.join(cache_directory, task_key, "cache.h5out") - if not (task_key in os.listdir(cache_directory) and "cache.h5out" in os.listdir(os.path.join(cache_directory, task_key))): + if not ( + task_key in os.listdir(cache_directory) + and "cache.h5out" in os.listdir(os.path.join(cache_directory, task_key)) + ): f = task_dict.pop("future") if f.set_running_or_notify_cancel(): try: From 7ae04ec575a092acae3dfe524c37a6095bce9310 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 9 Mar 2025 18:24:05 +0100 Subject: [PATCH 7/8] set job name based on file name --- executorlib/cache/queue_spawner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/cache/queue_spawner.py b/executorlib/cache/queue_spawner.py index 86f93a22..088e0f6c 100644 --- a/executorlib/cache/queue_spawner.py +++ b/executorlib/cache/queue_spawner.py @@ -68,7 +68,7 @@ def execute_with_pysqa( if k in resource_dict: del resource_dict[k] if "job_name" not in resource_dict: - resource_dict["job_name"] = "pysqa" + resource_dict["job_name"] = os.path.basename(os.path.dirname(os.path.abspath(cwd))) submit_kwargs.update(resource_dict) queue_id = qa.submit_job(**submit_kwargs) dump(file_name=file_name, data_dict={"queue_id": queue_id}) From 11955c018beb3e10b7d4d5032fcf1e721b93a06d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 9 Mar 2025 17:24:17 +0000 Subject: [PATCH 8/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/cache/queue_spawner.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/executorlib/cache/queue_spawner.py b/executorlib/cache/queue_spawner.py index 088e0f6c..e7c1eeb8 100644 --- a/executorlib/cache/queue_spawner.py +++ b/executorlib/cache/queue_spawner.py @@ -68,7 +68,9 @@ def execute_with_pysqa( if k in resource_dict: del resource_dict[k] if "job_name" not in resource_dict: - resource_dict["job_name"] = os.path.basename(os.path.dirname(os.path.abspath(cwd))) + resource_dict["job_name"] = os.path.basename( + os.path.dirname(os.path.abspath(cwd)) + ) submit_kwargs.update(resource_dict) queue_id = qa.submit_job(**submit_kwargs) dump(file_name=file_name, data_dict={"queue_id": queue_id})