Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions pympipool/interfaces/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from concurrent.futures import Executor as FutureExecutor, Future
import queue

from pympipool.shared.taskexecutor import cancel_items_in_queue


class ExecutorBase(FutureExecutor):
def __init__(self):
self._future_queue = queue.Queue()
self._process = None

def submit(self, fn, *args, **kwargs):
"""Submits a callable to be executed with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.

Returns:
A Future representing the given call.
"""
f = Future()
self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f})
return f

def shutdown(self, wait=True, *, cancel_futures=False):
"""Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other
methods can be called after this one.

Args:
wait: If True then shutdown will not return until all running
futures have finished executing and the resources used by the
parallel_executors have been reclaimed.
cancel_futures: If True then shutdown will cancel all pending
futures. Futures that are completed or running will not be
cancelled.
"""
if cancel_futures:
cancel_items_in_queue(que=self._future_queue)
self._future_queue.put({"shutdown": True, "wait": wait})
self._process.join()

def __len__(self):
return self._future_queue.qsize()
41 changes: 3 additions & 38 deletions pympipool/interfaces/taskbroker.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
from concurrent.futures import Executor as FutureExecutor, Future
import queue

from pympipool.interfaces.base import ExecutorBase
from pympipool.shared.thread import RaisingThread
from pympipool.shared.broker import executor_broker
from pympipool.shared.taskexecutor import cancel_items_in_queue


class HPCExecutor(FutureExecutor):
class HPCExecutor(ExecutorBase):
def __init__(
self,
max_workers,
Expand All @@ -21,7 +18,7 @@ def __init__(
queue_adapter=None,
queue_adapter_kwargs=None,
):
self._future_queue = queue.Queue()
super().__init__()
self._process = RaisingThread(
target=executor_broker,
kwargs={
Expand All @@ -40,35 +37,3 @@ def __init__(
},
)
self._process.start()

def submit(self, fn, *args, **kwargs):
"""Submits a callable to be executed with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.

Returns:
A Future representing the given call.
"""
f = Future()
self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f})
return f

def shutdown(self, wait=True, *, cancel_futures=False):
"""Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other
methods can be called after this one.

Args:
wait: If True then shutdown will not return until all running
futures have finished executing and the resources used by the
parallel_executors have been reclaimed.
cancel_futures: If True then shutdown will cancel all pending
futures. Futures that are completed or running will not be
cancelled.
"""
if cancel_futures:
cancel_items_in_queue(que=self._future_queue)
self._future_queue.put({"shutdown": True, "wait": wait})
self._process.join()
54 changes: 2 additions & 52 deletions pympipool/interfaces/taskexecutor.py
Original file line number Diff line number Diff line change
@@ -1,62 +1,11 @@
from abc import ABC
from concurrent.futures import Executor as FutureExecutor, Future
from queue import Queue

from pympipool.interfaces.base import ExecutorBase
from pympipool.shared.thread import RaisingThread
from pympipool.shared.taskexecutor import (
execute_parallel_tasks,
cloudpickle_register,
cancel_items_in_queue,
)


class ExecutorBase(FutureExecutor, ABC):
"""
Base class for the Executor and PoolExecutor class defined below. The ExecutorBase class is not intended to be used
alone. Rather it implements the submit(), shutdown() and __len__() function shared between the derived classes.
"""

def __init__(self):
self._future_queue = Queue()
self._process = None
cloudpickle_register(ind=3)

def submit(self, fn, *args, **kwargs):
"""Submits a callable to be executed with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.

Returns:
A Future representing the given call.
"""
f = Future()
self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f})
return f

def shutdown(self, wait=True, *, cancel_futures=False):
"""Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other
methods can be called after this one.

Args:
wait: If True then shutdown will not return until all running
futures have finished executing and the resources used by the
parallel_executors have been reclaimed.
cancel_futures: If True then shutdown will cancel all pending
futures. Futures that are completed or running will not be
cancelled.
"""
if cancel_futures:
cancel_items_in_queue(que=self._future_queue)
self._future_queue.put({"shutdown": True, "wait": wait})
self._process.join()

def __len__(self):
return self._future_queue.qsize()


class Executor(ExecutorBase):
"""
The pympipool.Executor behaves like the concurrent.futures.Executor but it uses mpi4py to execute parallel tasks.
Expand Down Expand Up @@ -130,3 +79,4 @@ def __init__(
self._future_queue.put(
{"init": True, "fn": init_function, "args": (), "kwargs": {}}
)
cloudpickle_register(ind=3)
4 changes: 3 additions & 1 deletion pympipool/legacy/interfaces/executor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pympipool.interfaces.taskexecutor import ExecutorBase
from pympipool.interfaces.base import ExecutorBase
from pympipool.shared.thread import RaisingThread
from pympipool.legacy.shared.interface import execute_serial_tasks
from pympipool.shared.taskexecutor import cloudpickle_register


class PoolExecutor(ExecutorBase):
Expand Down Expand Up @@ -68,3 +69,4 @@ def __init__(
},
)
self._process.start()
cloudpickle_register(ind=3)