From 538a30324f46bfa69261f699cd7ee34e7de559ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 10:08:03 -0600 Subject: [PATCH] Refactor ExecutorBase --- pympipool/interfaces/base.py | 45 +++++++++++++++++++++ pympipool/interfaces/taskbroker.py | 41 ++----------------- pympipool/interfaces/taskexecutor.py | 54 +------------------------ pympipool/legacy/interfaces/executor.py | 4 +- 4 files changed, 53 insertions(+), 91 deletions(-) create mode 100644 pympipool/interfaces/base.py diff --git a/pympipool/interfaces/base.py b/pympipool/interfaces/base.py new file mode 100644 index 00000000..0a95e895 --- /dev/null +++ b/pympipool/interfaces/base.py @@ -0,0 +1,45 @@ +from concurrent.futures import Executor as FutureExecutor, Future +import queue + +from pympipool.shared.taskexecutor import cancel_items_in_queue + + +class ExecutorBase(FutureExecutor): + def __init__(self): + self._future_queue = queue.Queue() + self._process = None + + def submit(self, fn, *args, **kwargs): + """Submits a callable to be executed with the given arguments. + + Schedules the callable to be executed as fn(*args, **kwargs) and returns + a Future instance representing the execution of the callable. + + Returns: + A Future representing the given call. + """ + f = Future() + self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f}) + return f + + def shutdown(self, wait=True, *, cancel_futures=False): + """Clean-up the resources associated with the Executor. + + It is safe to call this method several times. Otherwise, no other + methods can be called after this one. + + Args: + wait: If True then shutdown will not return until all running + futures have finished executing and the resources used by the + parallel_executors have been reclaimed. + cancel_futures: If True then shutdown will cancel all pending + futures. Futures that are completed or running will not be + cancelled. + """ + if cancel_futures: + cancel_items_in_queue(que=self._future_queue) + self._future_queue.put({"shutdown": True, "wait": wait}) + self._process.join() + + def __len__(self): + return self._future_queue.qsize() diff --git a/pympipool/interfaces/taskbroker.py b/pympipool/interfaces/taskbroker.py index 54904e7d..23b4c3c9 100644 --- a/pympipool/interfaces/taskbroker.py +++ b/pympipool/interfaces/taskbroker.py @@ -1,12 +1,9 @@ -from concurrent.futures import Executor as FutureExecutor, Future -import queue - +from pympipool.interfaces.base import ExecutorBase from pympipool.shared.thread import RaisingThread from pympipool.shared.broker import executor_broker -from pympipool.shared.taskexecutor import cancel_items_in_queue -class HPCExecutor(FutureExecutor): +class HPCExecutor(ExecutorBase): def __init__( self, max_workers, @@ -21,7 +18,7 @@ def __init__( queue_adapter=None, queue_adapter_kwargs=None, ): - self._future_queue = queue.Queue() + super().__init__() self._process = RaisingThread( target=executor_broker, kwargs={ @@ -40,35 +37,3 @@ def __init__( }, ) self._process.start() - - def submit(self, fn, *args, **kwargs): - """Submits a callable to be executed with the given arguments. - - Schedules the callable to be executed as fn(*args, **kwargs) and returns - a Future instance representing the execution of the callable. - - Returns: - A Future representing the given call. - """ - f = Future() - self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f}) - return f - - def shutdown(self, wait=True, *, cancel_futures=False): - """Clean-up the resources associated with the Executor. - - It is safe to call this method several times. Otherwise, no other - methods can be called after this one. - - Args: - wait: If True then shutdown will not return until all running - futures have finished executing and the resources used by the - parallel_executors have been reclaimed. - cancel_futures: If True then shutdown will cancel all pending - futures. Futures that are completed or running will not be - cancelled. - """ - if cancel_futures: - cancel_items_in_queue(que=self._future_queue) - self._future_queue.put({"shutdown": True, "wait": wait}) - self._process.join() diff --git a/pympipool/interfaces/taskexecutor.py b/pympipool/interfaces/taskexecutor.py index 910f502b..5fae9c43 100644 --- a/pympipool/interfaces/taskexecutor.py +++ b/pympipool/interfaces/taskexecutor.py @@ -1,62 +1,11 @@ -from abc import ABC -from concurrent.futures import Executor as FutureExecutor, Future -from queue import Queue - +from pympipool.interfaces.base import ExecutorBase from pympipool.shared.thread import RaisingThread from pympipool.shared.taskexecutor import ( execute_parallel_tasks, cloudpickle_register, - cancel_items_in_queue, ) -class ExecutorBase(FutureExecutor, ABC): - """ - Base class for the Executor and PoolExecutor class defined below. The ExecutorBase class is not intended to be used - alone. Rather it implements the submit(), shutdown() and __len__() function shared between the derived classes. - """ - - def __init__(self): - self._future_queue = Queue() - self._process = None - cloudpickle_register(ind=3) - - def submit(self, fn, *args, **kwargs): - """Submits a callable to be executed with the given arguments. - - Schedules the callable to be executed as fn(*args, **kwargs) and returns - a Future instance representing the execution of the callable. - - Returns: - A Future representing the given call. - """ - f = Future() - self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f}) - return f - - def shutdown(self, wait=True, *, cancel_futures=False): - """Clean-up the resources associated with the Executor. - - It is safe to call this method several times. Otherwise, no other - methods can be called after this one. - - Args: - wait: If True then shutdown will not return until all running - futures have finished executing and the resources used by the - parallel_executors have been reclaimed. - cancel_futures: If True then shutdown will cancel all pending - futures. Futures that are completed or running will not be - cancelled. - """ - if cancel_futures: - cancel_items_in_queue(que=self._future_queue) - self._future_queue.put({"shutdown": True, "wait": wait}) - self._process.join() - - def __len__(self): - return self._future_queue.qsize() - - class Executor(ExecutorBase): """ The pympipool.Executor behaves like the concurrent.futures.Executor but it uses mpi4py to execute parallel tasks. @@ -130,3 +79,4 @@ def __init__( self._future_queue.put( {"init": True, "fn": init_function, "args": (), "kwargs": {}} ) + cloudpickle_register(ind=3) diff --git a/pympipool/legacy/interfaces/executor.py b/pympipool/legacy/interfaces/executor.py index 72281e45..56b2905e 100644 --- a/pympipool/legacy/interfaces/executor.py +++ b/pympipool/legacy/interfaces/executor.py @@ -1,6 +1,7 @@ -from pympipool.interfaces.taskexecutor import ExecutorBase +from pympipool.interfaces.base import ExecutorBase from pympipool.shared.thread import RaisingThread from pympipool.legacy.shared.interface import execute_serial_tasks +from pympipool.shared.taskexecutor import cloudpickle_register class PoolExecutor(ExecutorBase): @@ -68,3 +69,4 @@ def __init__( }, ) self._process.start() + cloudpickle_register(ind=3)