From 1ca3cb30c674403fbd99d71f156a267141bb2b9a Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 27 Jul 2023 11:15:32 -0600 Subject: [PATCH 1/4] join queue at the end of each test --- tests/test_meta.py | 1 + tests/test_multitask.py | 2 ++ tests/test_queue.py | 1 + tests/test_worker.py | 4 ++++ tests/test_worker_memory.py | 1 + 5 files changed, 9 insertions(+) diff --git a/tests/test_meta.py b/tests/test_meta.py index 8f605e60..4836dc85 100644 --- a/tests/test_meta.py +++ b/tests/test_meta.py @@ -52,6 +52,7 @@ def test_executor_broker(self): _executor_broker(future_queue=q, max_workers=1) self.assertTrue(f.done()) self.assertEqual(f.result(), 1) + q.join() class TestMetaExecutor(unittest.TestCase): diff --git a/tests/test_multitask.py b/tests/test_multitask.py index 536485a4..95b5d83b 100644 --- a/tests/test_multitask.py +++ b/tests/test_multitask.py @@ -53,6 +53,7 @@ def test_execute_task(self): enable_flux_backend=False ) self.assertEqual(f.result(), np.array(4)) + q.join() def test_pool_cancel(self): with PoolExecutor(max_workers=2, sleep_interval=0) as p: @@ -85,6 +86,7 @@ def test_cancel_task(self): ) self.assertTrue(fs1.done()) self.assertTrue(fs1.cancelled()) + q.join() def test_waiting(self): exe = PoolExecutor(max_workers=2) diff --git a/tests/test_queue.py b/tests/test_queue.py index 48af3266..bdd36acd 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -19,3 +19,4 @@ def test_cancel_items_in_queue(self): self.assertTrue(fs2.done()) with self.assertRaises(CancelledError): self.assertTrue(fs2.result()) + q.join() diff --git a/tests/test_worker.py b/tests/test_worker.py index b849300c..bcc57caa 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -97,6 +97,7 @@ def test_execute_task_failed_no_argument(self): oversubscribe=False, enable_flux_backend=False ) + q.join() def test_execute_task_failed_wrong_argument(self): f = Future() @@ -111,6 +112,7 @@ def test_execute_task_failed_wrong_argument(self): oversubscribe=False, enable_flux_backend=False ) + q.join() def test_execute_task(self): f = Future() @@ -125,6 +127,7 @@ def test_execute_task(self): enable_flux_backend=False ) self.assertEqual(f.result(), np.array(4)) + q.join() def test_execute_task_parallel(self): f = Future() @@ -139,3 +142,4 @@ def test_execute_task_parallel(self): enable_flux_backend=False ) self.assertEqual(f.result(), [np.array(4), np.array(4)]) + q.join() diff --git a/tests/test_worker_memory.py b/tests/test_worker_memory.py index 185d94e5..3780daa1 100644 --- a/tests/test_worker_memory.py +++ b/tests/test_worker_memory.py @@ -43,3 +43,4 @@ def test_execute_task(self): enable_flux_backend=False ) self.assertEqual(f.result(), np.array([5])) + q.join() From c9e5d344ccbddf70279d3494984969b6088fc491 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 27 Jul 2023 11:34:36 -0600 Subject: [PATCH 2/4] Mark tasks as done when completed --- pympipool/external_interfaces/meta.py | 5 ++++- pympipool/shared_functions/external_interfaces.py | 7 +++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/pympipool/external_interfaces/meta.py b/pympipool/external_interfaces/meta.py index f90014da..22400b40 100644 --- a/pympipool/external_interfaces/meta.py +++ b/pympipool/external_interfaces/meta.py @@ -132,9 +132,12 @@ def _executor_broker( except queue.Empty: sleep(sleep_interval) else: - if not _execute_task_dict( + if _execute_task_dict( task_dict=task_dict, meta_future_lst=meta_future_lst ): + future_queue.task_done() + else: + future_queue.task_done() break diff --git a/pympipool/shared_functions/external_interfaces.py b/pympipool/shared_functions/external_interfaces.py index 3014ea2d..fe4422a8 100644 --- a/pympipool/shared_functions/external_interfaces.py +++ b/pympipool/shared_functions/external_interfaces.py @@ -272,6 +272,7 @@ def _execute_parallel_tasks_loop(interface, future_queue): task_dict = future_queue.get() if "shutdown" in task_dict.keys() and task_dict["shutdown"]: interface.shutdown(wait=task_dict["wait"]) + future_queue.task_done() break elif "fn" in task_dict.keys() and "future" in task_dict.keys(): f = task_dict.pop("future") @@ -279,10 +280,14 @@ def _execute_parallel_tasks_loop(interface, future_queue): try: f.set_result(interface.send_and_receive_dict(input_dict=task_dict)) except Exception as thread_exeception: + future_queue.task_done() f.set_exception(exception=thread_exeception) raise thread_exeception + else: + future_queue.task_done() elif "fn" in task_dict.keys() and "init" in task_dict.keys(): interface.send_dict(input_dict=task_dict) + future_queue.task_done() def _execute_serial_tasks_loop( @@ -300,11 +305,13 @@ def _execute_serial_tasks_loop( for k, v in done_dict.items(): if k in future_dict.keys() and not future_dict[k].cancelled(): future_dict.pop(k).set_result(v) + future_queue.task_done() break elif "fn" in task_dict.keys() and "future" in task_dict.keys(): f = task_dict.pop("future") future_hash = interface.send_and_receive_dict(input_dict=task_dict) future_dict[future_hash] = f + future_queue.task_done() _update_future_dict( interface=interface, future_dict=future_dict, sleep_interval=sleep_interval ) From b8d67113a0beddcaa4c7b9ff0d169609baa5b9b3 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 27 Jul 2023 11:48:16 -0600 Subject: [PATCH 3/4] Fixes --- pympipool/shared_functions/external_interfaces.py | 2 ++ tests/test_worker.py | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pympipool/shared_functions/external_interfaces.py b/pympipool/shared_functions/external_interfaces.py index fe4422a8..53055b1b 100644 --- a/pympipool/shared_functions/external_interfaces.py +++ b/pympipool/shared_functions/external_interfaces.py @@ -22,6 +22,7 @@ def cancel_items_in_queue(que): item = que.get_nowait() if isinstance(item, dict) and "future" in item.keys(): item["future"].cancel() + que.task_done() except queue.Empty: break @@ -280,6 +281,7 @@ def _execute_parallel_tasks_loop(interface, future_queue): try: f.set_result(interface.send_and_receive_dict(input_dict=task_dict)) except Exception as thread_exeception: + interface.shutdown(wait=True) future_queue.task_done() f.set_exception(exception=thread_exeception) raise thread_exeception diff --git a/tests/test_worker.py b/tests/test_worker.py index bcc57caa..0fbe8ef5 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -88,7 +88,6 @@ def test_execute_task_failed_no_argument(self): f = Future() q = Queue() q.put({"fn": calc, 'args': (), "kwargs": {}, "future": f}) - q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) with self.assertRaises(TypeError): execute_parallel_tasks( @@ -103,7 +102,6 @@ def test_execute_task_failed_wrong_argument(self): f = Future() q = Queue() q.put({"fn": calc, 'args': (), "kwargs": {"j": 4}, "future": f}) - q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) with self.assertRaises(TypeError): execute_parallel_tasks( From 1dc4b8f97113d21a91804ea68b91c7b3e07f5146 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 27 Jul 2023 11:48:40 -0600 Subject: [PATCH 4/4] black formatting --- pympipool/external_interfaces/meta.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pympipool/external_interfaces/meta.py b/pympipool/external_interfaces/meta.py index 22400b40..8d294e54 100644 --- a/pympipool/external_interfaces/meta.py +++ b/pympipool/external_interfaces/meta.py @@ -132,9 +132,7 @@ def _executor_broker( except queue.Empty: sleep(sleep_interval) else: - if _execute_task_dict( - task_dict=task_dict, meta_future_lst=meta_future_lst - ): + if _execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst): future_queue.task_done() else: future_queue.task_done()