From be9be1b3933d8291d74bb88628b8d8ff2ebb9c24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Thu, 3 Aug 2023 22:49:47 -0600 Subject: [PATCH 1/5] submit multiple tasks to a broker but execute only one --- tests/test_meta.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/test_meta.py b/tests/test_meta.py index 869f9de3..6bf54051 100644 --- a/tests/test_meta.py +++ b/tests/test_meta.py @@ -65,3 +65,12 @@ def test_meta_executor(self): self.assertEqual(fs_2.result(), 2) self.assertTrue(fs_1.done()) self.assertTrue(fs_2.done()) + + def test_meta_executor_single(self): + with HPCExecutor(max_workers=1) as exe: + fs_1 = exe.submit(calc, 1) + fs_2 = exe.submit(calc, 2) + self.assertEqual(fs_1.result(), 1) + self.assertEqual(fs_2.result(), 2) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) From 3c5e2719bddbef82072caa015a296b9bd4e4aa3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Thu, 3 Aug 2023 23:12:52 -0600 Subject: [PATCH 2/5] Add multi-step test to Executor --- tests/test_worker.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/test_worker.py b/tests/test_worker.py index 63c04dad..1222467f 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -40,6 +40,15 @@ def test_pool_serial(self): self.assertEqual(len(p), 0) self.assertEqual(output.result(), np.array(4)) + def test_executor_multi_submission(self): + with Executor(cores=1) as p: + fs_1 = p.submit(calc, i=2) + fs_2 = p.submit(calc, i=2) + self.assertEqual(fs_1.result(), np.array(4)) + self.assertEqual(fs_2.result(), np.array(4)) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) + def test_shutdown(self): p = Executor(cores=1) fs1 = p.submit(sleep_one, i=2) From 1fa14a48921368571647143f25e970f35247dd16 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 4 Aug 2023 10:44:02 -0600 Subject: [PATCH 3/5] series of fixes --- pympipool/interfaces/fluxbroker.py | 20 ++++------ pympipool/shared/broker.py | 62 +++++++++--------------------- tests/test_meta.py | 14 ++++--- 3 files changed, 34 insertions(+), 62 deletions(-) diff --git a/pympipool/interfaces/fluxbroker.py b/pympipool/interfaces/fluxbroker.py index 4d64c9ee..69663c10 100644 --- a/pympipool/interfaces/fluxbroker.py +++ b/pympipool/interfaces/fluxbroker.py @@ -5,7 +5,6 @@ from time import sleep from pympipool.shared.broker import ( - MetaExecutorFuture, _get_future_done, _execute_task_dict, ) @@ -219,16 +218,13 @@ def _get_executor_list( cwd=None, executor=None, ): - return [ - MetaExecutorFuture( - future=_get_future_done(), - executor=SingleTaskExecutor( - cores=cores_per_worker, - gpus_per_task=int(gpus_per_worker / cores_per_worker), - init_function=init_function, - cwd=cwd, - executor=executor, - ), + return { + _get_future_done(): SingleTaskExecutor( + cores=cores_per_worker, + gpus_per_task=int(gpus_per_worker / cores_per_worker), + init_function=init_function, + cwd=cwd, + executor=executor, ) for _ in range(max_workers) - ] + } diff --git a/pympipool/shared/broker.py b/pympipool/shared/broker.py index ec6f318e..bf7790d8 100644 --- a/pympipool/shared/broker.py +++ b/pympipool/shared/broker.py @@ -5,31 +5,6 @@ from pympipool.interfaces.taskexecutor import Executor -class MetaExecutorFuture(object): - def __init__(self, future, executor): - self.future = future - self.executor = executor - - @property - def _condition(self): - return self.future._condition - - @property - def _state(self): - return self.future._state - - @property - def _waiters(self): - return self.future._waiters - - def done(self): - return self.future.done() - - def submit(self, task_dict): - self.future = task_dict["future"] - self.executor._future_queue.put(task_dict) - - def executor_broker( future_queue, max_workers, @@ -71,12 +46,14 @@ def executor_broker( def _execute_task_dict(task_dict, meta_future_lst): if "fn" in task_dict.keys(): - meta_future = next(as_completed(meta_future_lst)) - meta_future.submit(task_dict=task_dict) + meta_future = next(as_completed(meta_future_lst.keys())) + executor = meta_future_lst.pop(meta_future) + executor._future_queue.put(task_dict) + meta_future_lst[task_dict["future"]] = executor return True elif "shutdown" in task_dict.keys() and task_dict["shutdown"]: - for meta in meta_future_lst: - meta.executor.shutdown(wait=task_dict["wait"]) + for executor in meta_future_lst.values(): + executor.shutdown(wait=task_dict["wait"]) return False else: raise ValueError("Unrecognized Task in task_dict: ", task_dict) @@ -94,23 +71,20 @@ def _get_executor_list( queue_adapter=None, queue_adapter_kwargs=None, ): - return [ - MetaExecutorFuture( - future=_get_future_done(), - executor=Executor( - cores=cores_per_worker, - gpus_per_task=int(gpus_per_worker / cores_per_worker), - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_slurm_backend=enable_slurm_backend, - init_function=init_function, - cwd=cwd, - queue_adapter=queue_adapter, - queue_adapter_kwargs=queue_adapter_kwargs, - ), + return { + _get_future_done(): Executor( + cores=cores_per_worker, + gpus_per_task=int(gpus_per_worker / cores_per_worker), + oversubscribe=oversubscribe, + enable_flux_backend=enable_flux_backend, + enable_slurm_backend=enable_slurm_backend, + init_function=init_function, + cwd=cwd, + queue_adapter=queue_adapter, + queue_adapter_kwargs=queue_adapter_kwargs, ) for _ in range(max_workers) - ] + } def _get_future_done(): diff --git a/tests/test_meta.py b/tests/test_meta.py index 2fd47abb..47e55c71 100644 --- a/tests/test_meta.py +++ b/tests/test_meta.py @@ -31,12 +31,14 @@ def test_get_future_done(self): class TestMetaExecutorFuture(unittest.TestCase): def test_meta_executor_future(self): - meta_future = _get_executor_list(max_workers=1)[0] - self.assertTrue(isinstance(meta_future.future, Future)) - self.assertTrue(isinstance(meta_future.executor, Executor)) - self.assertTrue(meta_future.done()) - self.assertEqual(meta_future, next(as_completed([meta_future]))) - meta_future.submit(task_dict={"shutdown": True, "wait": True, "future": _get_future_done()}) + meta_future = _get_executor_list(max_workers=1) + future_obj = list(meta_future.keys())[0] + executor_obj = list(meta_future.values())[0] + self.assertTrue(isinstance(future_obj, Future)) + self.assertTrue(isinstance(executor_obj, Executor)) + self.assertTrue(future_obj.done()) + self.assertEqual(future_obj, next(as_completed(meta_future.keys()))) + executor_obj.shutdown(wait=True) def test_execute_task_dict(self): meta_future_lst = _get_executor_list(max_workers=1) From 9be04e94a508659ad9603adcd5d898b41346e13e Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 4 Aug 2023 10:51:12 -0600 Subject: [PATCH 4/5] Add additional test --- tests/test_meta.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/test_meta.py b/tests/test_meta.py index 47e55c71..bcc069bb 100644 --- a/tests/test_meta.py +++ b/tests/test_meta.py @@ -54,6 +54,11 @@ def test_execute_task_dict(self): meta_future_lst=meta_future_lst )) + def test_execute_task_dict_error(self): + meta_future_lst = _get_executor_list(max_workers=1) + with self.assertRaises(ValueError): + _execute_task_dict(task_dict={}, meta_future_lst=meta_future_lst) + def test_executor_broker(self): q = Queue() f = Future() From 4b3d594ba09f0f932d706bf13a826eaad6f36e0e Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 4 Aug 2023 10:52:02 -0600 Subject: [PATCH 5/5] shutdown executor after test --- tests/test_meta.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_meta.py b/tests/test_meta.py index bcc069bb..ef72853c 100644 --- a/tests/test_meta.py +++ b/tests/test_meta.py @@ -58,6 +58,7 @@ def test_execute_task_dict_error(self): meta_future_lst = _get_executor_list(max_workers=1) with self.assertRaises(ValueError): _execute_task_dict(task_dict={}, meta_future_lst=meta_future_lst) + list(meta_future_lst.values())[0].shutdown(wait=True) def test_executor_broker(self): q = Queue()