diff --git a/.github/workflows/unittest-mpich.yml b/.github/workflows/unittest-mpich.yml
index c8312a9c..e73ced83 100644
--- a/.github/workflows/unittest-mpich.yml
+++ b/.github/workflows/unittest-mpich.yml
@@ -59,4 +59,4 @@ jobs:
     - name: Test
       shell: bash -l {0}
       timeout-minutes: 30
-      run: python -m unittest discover tests
+      run: for f in tests/test_*.py; do python -m unittest $f; done
diff --git a/.github/workflows/unittest-openmpi.yml b/.github/workflows/unittest-openmpi.yml
index 8d377c89..16ebbae0 100644
--- a/.github/workflows/unittest-openmpi.yml
+++ b/.github/workflows/unittest-openmpi.yml
@@ -59,7 +59,7 @@ jobs:
     - name: Test
       shell: bash -l {0}
       timeout-minutes: 30
-      run: coverage run --omit pympipool/_version.py -m unittest discover tests
+      run: for f in tests/test_*.py; do coverage run --append --omit pympipool/_version.py -m unittest $f; done
       env:
         OMPI_MCA_plm: 'isolated'
         OMPI_MCA_rmaps_base_oversubscribe: 'yes'
diff --git a/.github/workflows/unittest-win.yml b/.github/workflows/unittest-win.yml
index 8ea85d67..d80489b2 100644
--- a/.github/workflows/unittest-win.yml
+++ b/.github/workflows/unittest-win.yml
@@ -34,4 +34,4 @@ jobs:
     - name: Test
       shell: bash -l {0}
       timeout-minutes: 30
-      run: python -m unittest discover tests
+      run: for f in tests/test_*.py; do python -m unittest $f; done
diff --git a/pympipool/__init__.py b/pympipool/__init__.py
index fa9a3568..5fc2398c 100644
--- a/pympipool/__init__.py
+++ b/pympipool/__init__.py
@@ -6,6 +6,7 @@
     receive_instruction,
 )
 from pympipool.external_interfaces.executor import Executor, PoolExecutor
+from pympipool.external_interfaces.meta import MetaExecutor
 from pympipool.external_interfaces.pool import Pool, MPISpawnPool
 from pympipool.external_interfaces.thread import RaisingThread
 from pympipool.shared_functions.external_interfaces import cancel_items_in_queue
diff --git a/pympipool/external_interfaces/meta.py b/pympipool/external_interfaces/meta.py
new file mode 100644
index 00000000..f90014da
--- /dev/null
+++ b/pympipool/external_interfaces/meta.py
@@ -0,0 +1,188 @@
+from concurrent.futures import as_completed, Executor as FutureExecutor, Future
+import queue
+from threading import Thread
+from time import sleep
+
+from pympipool.external_interfaces.executor import Executor
+from pympipool.shared_functions.external_interfaces import cancel_items_in_queue
+
+
+class MetaExecutorFuture(object):
+    def __init__(self, future, executor):  # pair a Future with the Executor serving it
+        self.future = future
+        self.executor = executor
+
+    @property
+    def _condition(self):  # delegate Future internals so as_completed() accepts this wrapper
+        return self.future._condition
+
+    @property
+    def _state(self):
+        return self.future._state
+
+    @property
+    def _waiters(self):
+        return self.future._waiters
+
+    def done(self):
+        return self.future.done()
+
+    def submit(self, task_dict):
+        self.future = task_dict["future"]  # worker counts as busy until this future completes
+        self.executor._future_queue.put(task_dict)
+
+
+class MetaExecutor(FutureExecutor):
+    def __init__(
+        self,
+        max_workers,
+        cores_per_worker=1,
+        gpus_per_worker=0,
+        oversubscribe=False,
+        enable_flux_backend=False,
+        enable_slurm_backend=False,
+        init_function=None,
+        cwd=None,
+        sleep_interval=0.1,
+        queue_adapter=None,
+        queue_adapter_kwargs=None,
+    ):
+        self._future_queue = queue.Queue()  # shared task queue consumed by the broker thread
+        self._process = Thread(
+            target=_executor_broker,
+            kwargs={
+                "future_queue": self._future_queue,
+                "max_workers": max_workers,
+                "cores_per_worker": cores_per_worker,
+                "gpus_per_worker": gpus_per_worker,
+                "oversubscribe": oversubscribe,
+                "enable_flux_backend": enable_flux_backend,
+                "enable_slurm_backend": enable_slurm_backend,
+                "init_function": init_function,
+                "cwd": cwd,
+                "sleep_interval": sleep_interval,
+                "queue_adapter": queue_adapter,
+                "queue_adapter_kwargs": queue_adapter_kwargs,
+            },
+        )
+        self._process.start()
+
+    def submit(self, fn, *args, **kwargs):
+        """Submits a callable to be executed with the given arguments.
+
+        Schedules the callable to be executed as fn(*args, **kwargs) and returns
+        a Future instance representing the execution of the callable.
+
+        Returns:
+            A Future representing the given call.
+        """
+        f = Future()
+        self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f})
+        return f
+
+    def shutdown(self, wait=True, *, cancel_futures=False):
+        """Clean-up the resources associated with the Executor.
+
+        It is safe to call this method several times. Otherwise, no other
+        methods can be called after this one.
+
+        Args:
+            wait: If True then shutdown will not return until all running
+                futures have finished executing and the resources used by the
+                parallel_executors have been reclaimed.
+            cancel_futures: If True then shutdown will cancel all pending
+                futures. Futures that are completed or running will not be
+                cancelled.
+        """
+        if cancel_futures:
+            cancel_items_in_queue(que=self._future_queue)
+        self._future_queue.put({"shutdown": True, "wait": wait})  # poison pill stops the broker
+        self._process.join()
+
+
+def _executor_broker(
+    future_queue,
+    max_workers,
+    cores_per_worker=1,
+    gpus_per_worker=0,
+    oversubscribe=False,
+    enable_flux_backend=False,
+    enable_slurm_backend=False,
+    init_function=None,
+    cwd=None,
+    sleep_interval=0.1,
+    queue_adapter=None,
+    queue_adapter_kwargs=None,
+):
+    meta_future_lst = _get_executor_list(
+        max_workers=max_workers,
+        cores_per_worker=cores_per_worker,
+        gpus_per_worker=gpus_per_worker,
+        oversubscribe=oversubscribe,
+        enable_flux_backend=enable_flux_backend,
+        enable_slurm_backend=enable_slurm_backend,
+        init_function=init_function,
+        cwd=cwd,
+        queue_adapter=queue_adapter,
+        queue_adapter_kwargs=queue_adapter_kwargs,
+    )
+    while True:
+        try:
+            task_dict = future_queue.get_nowait()
+        except queue.Empty:
+            sleep(sleep_interval)  # no task waiting - throttle the polling loop
+        else:
+            if not _execute_task_dict(
+                task_dict=task_dict, meta_future_lst=meta_future_lst
+            ):
+                break
+
+
+def _execute_task_dict(task_dict, meta_future_lst):
+    if "fn" in task_dict.keys():
+        meta_future = next(as_completed(meta_future_lst))  # first idle worker
+        meta_future.submit(task_dict=task_dict)
+        return True
+    elif "shutdown" in task_dict.keys() and task_dict["shutdown"]:
+        for meta in meta_future_lst:
+            meta.executor.shutdown(wait=task_dict["wait"])
+        return False
+    else:
+        raise ValueError(f"Unrecognized Task in task_dict: {task_dict}")
+
+
+def _get_executor_list(
+    max_workers,
+    cores_per_worker=1,
+    gpus_per_worker=0,
+    oversubscribe=False,
+    enable_flux_backend=False,
+    enable_slurm_backend=False,
+    init_function=None,
+    cwd=None,
+    queue_adapter=None,
+    queue_adapter_kwargs=None,
+):
+    return [
+        MetaExecutorFuture(
+            future=_get_future_done(),
+            executor=Executor(
+                cores=cores_per_worker,
+                gpus_per_task=gpus_per_worker // cores_per_worker,  # exact integer GPUs per rank
+                oversubscribe=oversubscribe,
+                enable_flux_backend=enable_flux_backend,
+                enable_slurm_backend=enable_slurm_backend,
+                init_function=init_function,
+                cwd=cwd,
+                queue_adapter=queue_adapter,
+                queue_adapter_kwargs=queue_adapter_kwargs,
+            ),
+        )
+        for _ in range(max_workers)
+    ]
+
+
+def _get_future_done():
+    f = Future()
+    f.set_result(True)  # pre-completed, so a fresh worker is immediately selectable
+    return f
diff --git a/tests/test_meta.py b/tests/test_meta.py
new file mode 100644
index 00000000..8f605e60
--- /dev/null
+++ b/tests/test_meta.py
@@ -0,0 +1,65 @@
+from concurrent.futures import as_completed, Future, Executor
+from queue import Queue
+import unittest
+from pympipool.external_interfaces.meta import (
+    _executor_broker,
+    _execute_task_dict,
+    _get_future_done,
+    _get_executor_list,
+    MetaExecutor,
+)
+
+
+def calc(i):
+    return i
+
+
+class TestFutureCreation(unittest.TestCase):
+    def test_get_future_done(self):
+        f = _get_future_done()
+        self.assertTrue(isinstance(f, Future))
+        self.assertTrue(f.done())
+
+
+class TestMetaExecutorFuture(unittest.TestCase):
+    def test_meta_executor_future(self):
+        meta_future = _get_executor_list(max_workers=1)[0]
+        self.assertTrue(isinstance(meta_future.future, Future))
+        self.assertTrue(isinstance(meta_future.executor, Executor))
+        self.assertTrue(meta_future.done())
+        self.assertEqual(meta_future, next(as_completed([meta_future])))
+        meta_future.submit(task_dict={"shutdown": True, "wait": True, "future": _get_future_done()})
+
+    def test_execute_task_dict(self):
+        meta_future_lst = _get_executor_list(max_workers=1)
+        f = Future()
+        self.assertTrue(_execute_task_dict(
+            task_dict={"fn": calc, "args": (1,), "kwargs": {}, "future": f},
+            meta_future_lst=meta_future_lst
+        ))
+        self.assertEqual(f.result(), 1)
+        self.assertTrue(f.done())
+        self.assertFalse(_execute_task_dict(
+            task_dict={"shutdown": True, "wait": True},
+            meta_future_lst=meta_future_lst
+        ))
+
+    def test_executor_broker(self):
+        q = Queue()
+        f = Future()
+        q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f})
+        q.put({"shutdown": True, "wait": True})
+        _executor_broker(future_queue=q, max_workers=1)
+        self.assertTrue(f.done())
+        self.assertEqual(f.result(), 1)
+
+
+class TestMetaExecutor(unittest.TestCase):
+    def test_meta_executor(self):
+        with MetaExecutor(max_workers=2) as exe:
+            fs_1 = exe.submit(calc, 1)
+            fs_2 = exe.submit(calc, 2)
+            self.assertEqual(fs_1.result(), 1)
+            self.assertEqual(fs_2.result(), 2)
+            self.assertTrue(fs_1.done())
+            self.assertTrue(fs_2.done())