diff --git a/pympipool/external_interfaces/meta.py b/pympipool/external_interfaces/meta.py index f90014da..8d294e54 100644 --- a/pympipool/external_interfaces/meta.py +++ b/pympipool/external_interfaces/meta.py @@ -132,9 +132,10 @@ def _executor_broker( except queue.Empty: sleep(sleep_interval) else: - if not _execute_task_dict( - task_dict=task_dict, meta_future_lst=meta_future_lst - ): + if _execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst): + future_queue.task_done() + else: + future_queue.task_done() break diff --git a/pympipool/shared_functions/external_interfaces.py b/pympipool/shared_functions/external_interfaces.py index 3014ea2d..53055b1b 100644 --- a/pympipool/shared_functions/external_interfaces.py +++ b/pympipool/shared_functions/external_interfaces.py @@ -22,6 +22,7 @@ def cancel_items_in_queue(que): item = que.get_nowait() if isinstance(item, dict) and "future" in item.keys(): item["future"].cancel() + que.task_done() except queue.Empty: break @@ -272,6 +273,7 @@ def _execute_parallel_tasks_loop(interface, future_queue): task_dict = future_queue.get() if "shutdown" in task_dict.keys() and task_dict["shutdown"]: interface.shutdown(wait=task_dict["wait"]) + future_queue.task_done() break elif "fn" in task_dict.keys() and "future" in task_dict.keys(): f = task_dict.pop("future") @@ -279,10 +281,15 @@ def _execute_parallel_tasks_loop(interface, future_queue): try: f.set_result(interface.send_and_receive_dict(input_dict=task_dict)) except Exception as thread_exeception: + interface.shutdown(wait=True) + future_queue.task_done() f.set_exception(exception=thread_exeception) raise thread_exeception + else: + future_queue.task_done() elif "fn" in task_dict.keys() and "init" in task_dict.keys(): interface.send_dict(input_dict=task_dict) + future_queue.task_done() def _execute_serial_tasks_loop( @@ -300,11 +307,13 @@ def _execute_serial_tasks_loop( for k, v in done_dict.items(): if k in future_dict.keys() and not future_dict[k].cancelled(): future_dict.pop(k).set_result(v) + future_queue.task_done() break elif "fn" in task_dict.keys() and "future" in task_dict.keys(): f = task_dict.pop("future") future_hash = interface.send_and_receive_dict(input_dict=task_dict) future_dict[future_hash] = f + future_queue.task_done() _update_future_dict( interface=interface, future_dict=future_dict, sleep_interval=sleep_interval ) diff --git a/tests/test_meta.py b/tests/test_meta.py index 8f605e60..4836dc85 100644 --- a/tests/test_meta.py +++ b/tests/test_meta.py @@ -52,6 +52,7 @@ def test_executor_broker(self): _executor_broker(future_queue=q, max_workers=1) self.assertTrue(f.done()) self.assertEqual(f.result(), 1) + q.join() class TestMetaExecutor(unittest.TestCase): diff --git a/tests/test_multitask.py b/tests/test_multitask.py index 536485a4..95b5d83b 100644 --- a/tests/test_multitask.py +++ b/tests/test_multitask.py @@ -53,6 +53,7 @@ def test_execute_task(self): enable_flux_backend=False ) self.assertEqual(f.result(), np.array(4)) + q.join() def test_pool_cancel(self): with PoolExecutor(max_workers=2, sleep_interval=0) as p: @@ -85,6 +86,7 @@ def test_cancel_task(self): ) self.assertTrue(fs1.done()) self.assertTrue(fs1.cancelled()) + q.join() def test_waiting(self): exe = PoolExecutor(max_workers=2) diff --git a/tests/test_queue.py b/tests/test_queue.py index 48af3266..bdd36acd 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -19,3 +19,4 @@ def test_cancel_items_in_queue(self): self.assertTrue(fs2.done()) with self.assertRaises(CancelledError): self.assertTrue(fs2.result()) + q.join() diff --git a/tests/test_worker.py b/tests/test_worker.py index b849300c..0fbe8ef5 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -88,7 +88,6 @@ def test_execute_task_failed_no_argument(self): f = Future() q = Queue() q.put({"fn": calc, 'args': (), "kwargs": {}, "future": f}) - q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) with self.assertRaises(TypeError): execute_parallel_tasks( @@ -97,12 +96,12 @@ def test_execute_task_failed_no_argument(self): oversubscribe=False, enable_flux_backend=False ) + q.join() def test_execute_task_failed_wrong_argument(self): f = Future() q = Queue() q.put({"fn": calc, 'args': (), "kwargs": {"j": 4}, "future": f}) - q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) with self.assertRaises(TypeError): execute_parallel_tasks( @@ -111,6 +110,7 @@ def test_execute_task_failed_wrong_argument(self): oversubscribe=False, enable_flux_backend=False ) + q.join() def test_execute_task(self): f = Future() @@ -125,6 +125,7 @@ def test_execute_task(self): enable_flux_backend=False ) self.assertEqual(f.result(), np.array(4)) + q.join() def test_execute_task_parallel(self): f = Future() @@ -139,3 +140,4 @@ def test_execute_task_parallel(self): enable_flux_backend=False ) self.assertEqual(f.result(), [np.array(4), np.array(4)]) + q.join() diff --git a/tests/test_worker_memory.py b/tests/test_worker_memory.py index 185d94e5..3780daa1 100644 --- a/tests/test_worker_memory.py +++ b/tests/test_worker_memory.py @@ -43,3 +43,4 @@ def test_execute_task(self): enable_flux_backend=False ) self.assertEqual(f.result(), np.array([5])) + q.join()