From dc9636c68bf97c417f1929054081abfc7fa57688 Mon Sep 17 00:00:00 2001
From: Kennedy Kori
Date: Tue, 20 Sep 2022 21:11:37 +0300
Subject: [PATCH] feat(lib): implement concurrent executor task

Add a proper implementation for the
`app.lib.tasks.concurrent.ConcurrentExecutor` task. This will allow the app
to run multiple tasks concurrently.
---
 app/core/__init__.py               |   3 +-
 app/lib/tasks/__init__.py          |   3 +-
 app/lib/tasks/concurrent.py        | 170 ++++++++++++++++++++++----
 app/use_cases/fetch_metadata.py    |  57 ++++++---
 app/use_cases/run_extraction.py    |  23 +++-
 app/use_cases/upload_extracts.py   |  47 ++++++--
 tests/lib/tasks/test_concurrent.py | 185 +++++++++++++++++++++++++++++
 7 files changed, 428 insertions(+), 60 deletions(-)
 create mode 100644 tests/lib/tasks/test_concurrent.py

diff --git a/app/core/__init__.py b/app/core/__init__.py
index f24a3e0..9a911dd 100644
--- a/app/core/__init__.py
+++ b/app/core/__init__.py
@@ -12,7 +12,7 @@
     TransportClosedError,
     TransportError,
 )
-from .mixins import InitFromMapping, ToMapping, ToTask
+from .mixins import Disposable, InitFromMapping, ToMapping, ToTask
 from .task import Task
 from .transport import Transport, TransportOptions
 
@@ -20,6 +20,7 @@
     "AbstractDomainObject",
     "DataSource",
     "DataSourceType",
+    "Disposable",
     "ExtractMetadata",
     "IDRClientException",
     "IdentifiableDomainObject",
diff --git a/app/lib/tasks/__init__.py b/app/lib/tasks/__init__.py
index 6d32ca4..09706af 100644
--- a/app/lib/tasks/__init__.py
+++ b/app/lib/tasks/__init__.py
@@ -1,5 +1,5 @@
 from .common import Chainable, Consumer, Pipeline
-from .concurrent import ConcurrentExecutor
+from .concurrent import ConcurrentExecutor, completed_successfully
 from .pandas import ChunkDataFrame
 from .sql import SimpleSQLSelect, SQLTask
 
@@ -11,4 +11,5 @@
     "Pipeline",
     "SQLTask",
     "SimpleSQLSelect",
+    "completed_successfully",
 ]
diff --git a/app/lib/tasks/concurrent.py b/app/lib/tasks/concurrent.py
index f70411f..73d2dd2 100644
--- a/app/lib/tasks/concurrent.py
+++ b/app/lib/tasks/concurrent.py
@@ -1,16 +1,17 @@
+import sys
+from concurrent.futures import Executor, Future, ThreadPoolExecutor
 from functools import reduce
+from logging import getLogger
 from typing import (
-    Any,
     Callable,
     Generic,
-    List,
+    MutableSequence,
     Optional,
     Sequence,
     TypeVar,
-    cast,
 )
 
-from app.core import Task
+from app.core import Disposable, IDRClientException, Task
 
 # =============================================================================
 # TYPES
 # =============================================================================
@@ -21,46 +22,167 @@
 _RT = TypeVar("_RT")
 
 
+if sys.version_info >= (3, 9):  # pragma: no branch
+    Accumulator = Callable[
+        [MutableSequence[Future[_RT]], Future[_RT]],
+        MutableSequence[Future[_RT]],
+    ]
+else:  # pragma: no branch
+    Accumulator = Callable[
+        [MutableSequence[Future], Future], MutableSequence[Future]
+    ]
+
+
+# =============================================================================
+# CONSTANTS
+# =============================================================================
+
+
+_LOGGER = getLogger(__name__)
+
+
+# =============================================================================
+# HELPERS
+# =============================================================================
+
+
+def completed_successfully(future: Future) -> bool:
+    """
+    Checks if a :class:`future <concurrent.futures.Future>` completed
+    successfully, returning ``True`` if so and ``False`` otherwise. In this
+    context, a *future* is considered to have completed successfully if it
+    wasn't cancelled and no exception was raised by its callee.
+
+    :param future: A future instance to check for successful completion.
+
+    :return: ``True`` if the future completed successfully, ``False``
+        otherwise.
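+
+    A minimal illustrative example (the ``lambda`` here is just a stand-in
+    for any callable submitted to an executor):
+
+    >>> from concurrent.futures import ThreadPoolExecutor
+    >>> with ThreadPoolExecutor(max_workers=1) as pool:
+    ...     future = pool.submit(lambda: 42)
+    >>> completed_successfully(future)
+    True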
+    """
+    return bool(
+        future.done() and not future.cancelled() and future.exception() is None
+    )
+
+
+class ConcurrentExecutorDisposedError(IDRClientException):
+    """
+    An exception indicating an erroneous attempt to use a disposed
+    :class:`concurrent executor <ConcurrentExecutor>`.
+    """
+
+    def __init__(
+        self, message: Optional[str] = "ConcurrentExecutor disposed."
+    ):
+        super().__init__(message=message)
+
 
 # =============================================================================
 # TASK
 # =============================================================================
 
 
-class ConcurrentExecutor(Generic[_IN, _RT], Task[_IN, _RT]):
+class ConcurrentExecutor(
+    Generic[_IN, _RT], Task[_IN, MutableSequence["Future[_RT]"]], Disposable
+):
+    """
+    A :class:`task <Task>` that takes multiple tasks with a common input and
+    executes them concurrently. The output of the task is a
+    ``MutableSequence`` of :class:`futures <concurrent.futures.Future>`
+    representing the execution of each of those tasks.
+
+    .. note::
+        By default, this task uses a :class:`ThreadPoolExecutor` to run the
+        tasks concurrently, which is well suited for `I/O-bound` tasks but
+        poorly suited for computation-intensive tasks. To work with the
+        latter kind of tasks, a :class:`ProcessPoolExecutor` is recommended
+        instead; it can be passed to this class's constructor using the
+        `executor` parameter.
+
+        For more details, see the official Python
+        `threading docs <threading_docs>`_.
+
+    .. _threading_docs: https://docs.python.org/3/library/threading.html
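+
+    A minimal usage sketch (``AddOne`` is a hypothetical task, shown purely
+    for illustration):
+
+    >>> from app.core import Task
+    >>> class AddOne(Task[int, int]):
+    ...     def execute(self, an_input: int) -> int:
+    ...         return an_input + 1
+    >>> with ConcurrentExecutor(AddOne(), AddOne()) as executor:
+    ...     futures = executor(5)
+    ...     results = [f.result() for f in futures]
+    >>> results
+    [6, 6]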
+    """
+
     def __init__(
         self,
-        *tasks: Task[Any, Any],
-        accumulator: Optional[Callable[[_RT, Any], _RT]] = None,
-        initial_value: _RT = cast(_RT, list()),
+        *tasks: Task[_IN, _RT],
+        accumulator: Optional[Accumulator] = None,
+        initial_value: Optional[MutableSequence["Future[_RT]"]] = None,
+        executor: Optional[Executor] = None,
     ):
-        self._tasks: Sequence[Task[Any, Any]] = tuple(tasks)  # TODO: queue??
-        self._accumulator: Callable[[_RT, Any], _RT] = (
+        """
+        Initialize a new `ConcurrentExecutor` instance with the given
+        arguments.
+
+        :param tasks: The tasks to be executed concurrently.
+        :param accumulator: An optional callable to collect the results
+            (futures) of executing the given tasks concurrently. If one is
+            not provided, then a default is used that simply appends each
+            result to a ``MutableSequence``.
+        :param initial_value: An optional ``MutableSequence`` instance to
+            house the results (futures) of executing the tasks concurrently.
+            Defaults to an empty list when one isn't given.
+        :param executor: The :class:`executor <concurrent.futures.Executor>`
+            instance to use when executing the tasks. A ``ThreadPoolExecutor``
+            is used by default when one isn't provided.
+        """
+        self._tasks: Sequence[Task[_IN, _RT]] = tuple(tasks)
+        self._accumulator: Accumulator = (
             accumulator or self._default_accumulator
         )
-        self._initial_value: _RT = initial_value
+        self._initial_value: MutableSequence["Future[_RT]"]
+        self._initial_value = initial_value or list()
+        self._executor: Executor = executor or ThreadPoolExecutor()
+        self._is_disposed: bool = False
+
+    def __enter__(self) -> "ConcurrentExecutor":
+        self._ensure_not_disposed()
+        return self
+
+    @property
+    def is_disposed(self) -> bool:
+        return self._is_disposed
 
     @property
-    def tasks(self) -> Sequence[Task[Any, Any]]:
+    def tasks(self) -> Sequence[Task[_IN, _RT]]:
         return self._tasks
 
-    def execute(self, an_input: _IN) -> _RT:
-        # TODO: Add a proper implementation that executes the tasks
-        # concurrently.
+    def dispose(self) -> None:
+        self._executor.shutdown(wait=True)
+        self._is_disposed = True
+
+    def execute(self, an_input: _IN) -> MutableSequence["Future[_RT]"]:
+        self._ensure_not_disposed()
         return reduce(
             lambda _partial, _tsk: self._accumulator(
-                _partial, _tsk.execute(an_input)
+                _partial,
+                self._executor.submit(self._do_execute_task, _tsk, an_input),
             ),
             self.tasks,
             self._initial_value,
         )
 
+    def _ensure_not_disposed(self) -> None:
+        if self._is_disposed:
+            raise ConcurrentExecutorDisposedError()
+
     @staticmethod
-    def _default_accumulator(partial_result: _RT, task_output: Any) -> _RT:
-        _partial: List[Any] = list(
-            # An assumption is made here that the default initial value will
-            # always be a sequence of some kind.
-            cast(Sequence, partial_result)
-        )
-        _partial.append(task_output)
-        return cast(_RT, _partial)
+    def _default_accumulator(
+        partial_results: MutableSequence["Future[_RT]"],
+        task_output: "Future[_RT]",
+    ) -> MutableSequence["Future[_RT]"]:
+        partial_results.append(task_output)
+        return partial_results
+
+    @staticmethod
+    def _do_execute_task(task: Task[_IN, _RT], an_input: _IN) -> _RT:
+        try:
+            result: _RT = task.execute(an_input)
+        except Exception as exp:
+            _LOGGER.error(
+                'Error while executing task of type="%s.%s".',
+                task.__module__,
+                task.__class__.__name__,
+                exc_info=exp,
+            )
+            raise exp
+        return result
diff --git a/app/use_cases/fetch_metadata.py b/app/use_cases/fetch_metadata.py
index eda8c0d..f5f9403 100644
--- a/app/use_cases/fetch_metadata.py
+++ b/app/use_cases/fetch_metadata.py
@@ -1,3 +1,4 @@
+from concurrent.futures import Future, as_completed
 from itertools import chain
 from logging import getLogger
 from typing import Iterable, Mapping, Sequence
@@ -9,7 +10,7 @@
     Task,
     Transport,
 )
-from app.lib import ConcurrentExecutor
+from app.lib import ConcurrentExecutor, completed_successfully
 
 # =============================================================================
 # CONSTANTS
 # =============================================================================
@@ -24,7 +25,7 @@
 # =============================================================================
 
 
-class DoFetchDataSources(Task[DataSourceType, Sequence[DataSource]]):
+class DoFetchDataSources(Task[Transport, Sequence[DataSource]]):
     """Fetches all the data sources of a given data source type."""
 
     def __init__(self, data_source_type: DataSourceType):
@@ -45,7 +46,7 @@
         return data_sources
 
 
-class DoFetchExtractMetadata(Task[DataSource, Sequence[ExtractMetadata]]):
+class DoFetchExtractMetadata(Task[Transport, Sequence[ExtractMetadata]]):
     """Fetch all the extract metadata of a given data source."""
 
     def __init__(self, data_source: DataSource):
@@ -86,14 +87,26 @@
         self, an_input: Sequence[DataSourceType]
     ) -> Sequence[DataSource]:
         _LOGGER.info("Fetching data sources.")
-        executor: ConcurrentExecutor[
-            Transport, Sequence[Sequence[DataSource]]
-        ] = ConcurrentExecutor(
-            *self._data_source_types_to_tasks(an_input), initial_value=list()
+        executor: ConcurrentExecutor[Transport, Sequence[DataSource]]
+        executor = ConcurrentExecutor(
+            *self._data_source_types_to_tasks(an_input)
+        )
+        with executor:
+            futures: Sequence[Future[Sequence[DataSource]]]
+            futures = executor(self._transport)  # noqa
+        # Focus on completed tasks and ignore the ones that failed.
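+        # ``as_completed`` yields each future as soon as it finishes; the
+        # ``filter`` below then drops futures that were cancelled or that
+        # raised, and ``map``/``chain`` unpack and flatten the successful
+        # results.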
+        completed_futures = as_completed(futures)
+        return tuple(
+            chain.from_iterable(
+                map(
+                    lambda _f: _f.result(),
+                    filter(
+                        lambda _f: completed_successfully(_f),
+                        completed_futures,
+                    ),
+                )
+            )
         )
-        data_sources: Sequence[Sequence[DataSource]]
-        data_sources = executor(self._transport)  # noqa
-        return tuple(chain.from_iterable(data_sources))
 
     @staticmethod
     def _data_source_types_to_tasks(
@@ -121,14 +134,24 @@
         self, an_input: Sequence[DataSource]
     ) -> Sequence[ExtractMetadata]:
         _LOGGER.info("Fetching extract metadata.")
-        executor: ConcurrentExecutor[
-            Transport, Sequence[Sequence[ExtractMetadata]]
-        ] = ConcurrentExecutor(
-            *self._data_sources_to_tasks(an_input), initial_value=list()
+        executor: ConcurrentExecutor[Transport, Sequence[ExtractMetadata]]
+        executor = ConcurrentExecutor(*self._data_sources_to_tasks(an_input))
+        with executor:
+            futures: Sequence[Future[Sequence[ExtractMetadata]]]
+            futures = executor(self._transport)  # noqa
+        # Focus on completed tasks and ignore the ones that failed.
+        completed_futures = as_completed(futures)
+        return tuple(
+            chain.from_iterable(
+                map(
+                    lambda _f: _f.result(),
+                    filter(
+                        lambda _f: completed_successfully(_f),
+                        completed_futures,
+                    ),
+                )
+            )
         )
-        extracts: Sequence[Sequence[ExtractMetadata]]
-        extracts = executor(self._transport)  # noqa
-        return tuple(chain.from_iterable(extracts))
 
     @staticmethod
     def _data_sources_to_tasks(
diff --git a/app/use_cases/run_extraction.py b/app/use_cases/run_extraction.py
index c121d1b..37fa693 100644
--- a/app/use_cases/run_extraction.py
+++ b/app/use_cases/run_extraction.py
@@ -1,9 +1,10 @@
+from concurrent.futures import Future, as_completed
 from itertools import chain, groupby
 from logging import getLogger
 from typing import Any, Sequence, Tuple
 
 from app.core import DataSource, ExtractMetadata, Task
-from app.lib import ConcurrentExecutor
+from app.lib import ConcurrentExecutor, completed_successfully
 
 from .types import RunExtractionResult
 
@@ -106,14 +107,24 @@
             'Running extraction for data source="%s"', str(data_source)
         )
         with data_source:
-            executor: ConcurrentExecutor[
-                DataSource, Sequence[RunExtractionResult]
-            ]
+            executor: ConcurrentExecutor[DataSource, RunExtractionResult]
             executor = ConcurrentExecutor(
                 *(DoExtract(_extract) for _extract in extracts),
-                initial_value=list(),
             )
-            return executor(data_source)  # noqa
+            with executor:
+                futures: Sequence[Future[RunExtractionResult]]
+                futures = executor(data_source)  # noqa
+            # Focus on completed tasks and ignore the ones that failed.
+            completed_futures = as_completed(futures)
+            return tuple(
+                map(
+                    lambda _f: _f.result(),
+                    filter(
+                        lambda _f: completed_successfully(_f),
+                        completed_futures,
+                    ),
+                )
+            )
             # TODO: Add more tasks here to post process extraction results.
             #  E.g., handle
diff --git a/app/use_cases/upload_extracts.py b/app/use_cases/upload_extracts.py
index 9747901..cc5d600 100644
--- a/app/use_cases/upload_extracts.py
+++ b/app/use_cases/upload_extracts.py
@@ -1,3 +1,4 @@
+from concurrent.futures import Future, as_completed, wait
 from logging import getLogger
 from typing import Any, Iterable, Sequence, Tuple, Type
 
@@ -11,7 +12,7 @@
     UploadChunk,
     UploadMetadata,
 )
-from app.lib import ConcurrentExecutor, Consumer
+from app.lib import ConcurrentExecutor, Consumer, completed_successfully
 
 from .types import RunExtractionResult, UploadExtractResult
 
@@ -111,13 +112,23 @@
         self, an_input: Sequence[RunExtractionResult]
     ) -> Sequence[_PostedUpload]:
         _LOGGER.info("Posting uploads.")
-        executor: ConcurrentExecutor[Transport, Sequence[_PostedUpload]]
+        executor: ConcurrentExecutor[Transport, _PostedUpload]
         executor = ConcurrentExecutor(
-            *self._extraction_results_to_tasks(an_input), initial_value=list()
+            *self._extraction_results_to_tasks(an_input)
+        )
+        with executor:
+            futures: Sequence[Future[_PostedUpload]]
+            futures = executor(self._transport)  # noqa
+        # Focus on completed tasks and ignore the ones that failed.
+        completed_futures = as_completed(futures)
+        return tuple(
+            map(
+                lambda _f: _f.result(),
+                filter(
+                    lambda _f: completed_successfully(_f), completed_futures
+                ),
+            )
         )
-        uploads: Sequence[_PostedUpload]
-        uploads = executor(self._transport)  # noqa
-        return uploads
 
     @staticmethod
     def _extraction_results_to_tasks(
@@ -174,17 +185,28 @@
     def _post_upload_chunks(
         upload: UploadMetadata, chunks: Sequence[bytes], transport: Transport
     ) -> UploadExtractResult:
-        executor: ConcurrentExecutor[Transport, Sequence[UploadChunk]]
+        executor: ConcurrentExecutor[Transport, UploadChunk]
         executor = ConcurrentExecutor(
             *(
                 DoPostChunk(
                     upload=upload, chunk_index=_index, chunk_content=_chunk
                 )
                 for _index, _chunk in enumerate(chunks)
-            ),
-            initial_value=list(),
+            )
+        )
+        with executor:
+            futures: Sequence[Future[UploadChunk]]
+            futures = executor(transport)  # noqa
+        # Focus on completed tasks and ignore the ones that failed.
+        completed_futures = as_completed(futures)
+        uploaded_chunks: Sequence[UploadChunk] = tuple(
+            map(
+                lambda _f: _f.result(),
+                filter(
+                    lambda _f: completed_successfully(_f), completed_futures
+                ),
+            )
         )
-        uploaded_chunks: Sequence[UploadChunk] = executor(transport)  # noqa
         return upload, uploaded_chunks
 
 
@@ -204,4 +226,7 @@
                 for _posted_upload in posted_uploads
             )
         )
-        executor(an_input=self._transport)  # noqa
+        with executor:
+            futures: Sequence[Future[Any]]
+            futures = executor(an_input=self._transport)  # noqa
+            # Block until all the scheduled tasks finish. Note that a bare
+            # ``as_completed(futures)`` call would not do this: it returns a
+            # lazy iterator and waits for nothing unless consumed.
+            wait(futures)
diff --git a/tests/lib/tasks/test_concurrent.py b/tests/lib/tasks/test_concurrent.py
new file mode 100644
index 0000000..1354d53
--- /dev/null
+++ b/tests/lib/tasks/test_concurrent.py
@@ -0,0 +1,185 @@
+from concurrent.futures import Executor, ThreadPoolExecutor, wait
+from typing import Sequence
+from unittest import TestCase
+
+import pytest
+
+from app.core import Task
+from app.lib import ConcurrentExecutor, completed_successfully
+from app.lib.tasks.concurrent import ConcurrentExecutorDisposedError
+
+# =============================================================================
+# HELPERS
+# =============================================================================
+
+
+class _AddOne(Task[int, int]):
+    """
+    A simple task that takes an integer and returns the sum of the integer
+    and 1.
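+
+    For example (illustrative only):
+
+    >>> _AddOne().execute(41)
+    42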
+ """ + + def execute(self, an_input: int) -> int: + return an_input + 1 + + +class _DivideByZero(Task[int, int]): + """ + A simple task that always fails because it divides it's input by zero. + """ + + def execute(self, an_input: int) -> int: + return int(an_input / 0) + + +class _IntToString(Task[int, str]): + """ + A simple task that takes an integer and returns the integer's string + representation. + """ + + def execute(self, an_input: int) -> str: + return str(an_input) + + +# ============================================================================= +# TEST CASES +# ============================================================================= + + +class TestConcurrentExecutor(TestCase): + """Tests for the ``ConcurrentExecutor`` class.""" + + def setUp(self) -> None: + super().setUp() + self._tasks: Sequence[_AddOne] = tuple(_AddOne() for _ in range(10)) + self._instance: ConcurrentExecutor[int, int] = ConcurrentExecutor( + *self._tasks + ) + + def tearDown(self) -> None: + super().tearDown() + self._instance.dispose() + + def test_dispose_method_idempotency(self) -> None: + """ + Assert that the ``dispose`` method can be called multiple times without + failing. + """ + # Call the dispose method 10 times + for _ in range(10): + self._instance.dispose() + + assert self._instance.is_disposed + + def test_dispose_method_side_effects(self) -> None: + """ + Assert the ``dispose``method shutdowns the embedded + `:class:executor ` instance and marks the + instance as disposed. + """ + executor_service: Executor = ThreadPoolExecutor() + instance: ConcurrentExecutor[int, str] = ConcurrentExecutor( + _IntToString(), _IntToString(), executor=executor_service + ) + instance.dispose() + + assert instance.is_disposed + # A shutdown executor service should raise a RunTimeError on attempted + # usage. + with pytest.raises(RuntimeError): + executor_service.submit(_AddOne().execute, 1) + + def test_execute_method_return_value(self) -> None: + """ + Assert that the execute method returns the expected value. + """ + results1 = self._instance.execute(0) + results2 = ConcurrentExecutor( + _DivideByZero(), _DivideByZero() + ).execute(1) + result3 = ConcurrentExecutor( + *(_IntToString() for _ in range(10)) + ).execute(10) + + for result in results1: + assert result.result() == 1 + assert result.done() + for result in results2: + assert isinstance(result.exception(), ZeroDivisionError) + assert result.done() + for result in result3: + assert result.result() == "10" + assert result.done() + + def test_is_dispose_property_return_value(self) -> None: + """Assert the ``is_disposed`` property returns the expected result.""" + instance: ConcurrentExecutor[int, str] = ConcurrentExecutor( + _IntToString(), _IntToString() + ) + instance.dispose() + + assert instance.is_disposed + assert not self._instance.is_disposed + + def test_tasks_return_value(self) -> None: + """ + Assert that the ``tasks`` property returns the expected result. The + property should return a sequence of the same tasks given to the + instance during initialization. + """ + self.assertTupleEqual(tuple(self._tasks), tuple(self._instance.tasks)) + + def test_using_a_disposed_executor_raises_expected_errors(self) -> None: + """ + Assert that using a disposed concurrent executor instance results in + ``ConcurrentExecutorDisposedError`` being raised. 
+ """ + self._instance.dispose() + with pytest.raises(ConcurrentExecutorDisposedError): + self._instance.execute(10) + + +class TestConcurrentModule(TestCase): + """Tests for the ``app.lib.concurrent`` module globals.""" + + def setUp(self) -> None: + super().setUp() + self._erroneous_tasks: Sequence[_DivideByZero] = tuple( + _DivideByZero() for _ in range(10) + ) + self._valid_tasks: Sequence[_AddOne] = tuple( + _AddOne() for _ in range(15) + ) + + def test_completed_successfully_function_return_value(self) -> None: + """ + Assert that the ``completed_successfully`` function returns the + expected value. + """ + + with ConcurrentExecutor(*self._erroneous_tasks) as c: + results1 = c.execute(10) + wait(results1) + with ConcurrentExecutor(*self._valid_tasks) as c: + results2 = c.execute(1) + wait(results2) + with ConcurrentExecutor( + *(tuple(self._erroneous_tasks) + tuple(self._valid_tasks)) + ) as c: + results3 = c.execute(1) + wait(results3) + + for result in results1: + assert not completed_successfully(result) + for result in results2: + assert completed_successfully(result) + assert len(tuple(filter(completed_successfully, results3))) == 15 + assert ( + len( + tuple( + filter(lambda _r: not completed_successfully(_r), results3) + ) + ) + == 10 + )