From 859567ec8b5de03782848b4085290aa85fe119ed Mon Sep 17 00:00:00 2001
From: Kennedy Kori
Date: Sun, 19 May 2024 21:03:48 +0300
Subject: [PATCH] feat(processors): add a `ScatterGatherProcessor`

Add `sghi.etl.commons.processors.ScatterGatherProcessor`, a `Processor`
that applies multiple other processors to the same raw data, and then
returns the aggregated outputs of these processors.
---
 src/sghi/etl/commons/__init__.py              |   2 +
 src/sghi/etl/commons/processors.py            | 220 ++++++++++++++++++-
 .../etl/commons_tests/processors_tests.py     | 185 +++++++++++++++-
 3 files changed, 405 insertions(+), 2 deletions(-)

diff --git a/src/sghi/etl/commons/__init__.py b/src/sghi/etl/commons/__init__.py
index bd79e31..f1fd162 100644
--- a/src/sghi/etl/commons/__init__.py
+++ b/src/sghi/etl/commons/__init__.py
@@ -3,6 +3,7 @@
 from .processors import (
     NOOPProcessor,
     ProcessorPipe,
+    ScatterGatherProcessor,
     SplitGatherProcessor,
     pipe_processors,
     processor,
@@ -18,6 +19,7 @@
     "NullSink",
     "ProcessorPipe",
+    "ScatterGatherProcessor",
     "SimpleWorkflowDefinition",
     "SplitGatherProcessor",
     "fail_fast",
     "fail_fast_factory",
diff --git a/src/sghi/etl/commons/processors.py b/src/sghi/etl/commons/processors.py
index 7afe6e4..35acf8b 100644
--- a/src/sghi/etl/commons/processors.py
+++ b/src/sghi/etl/commons/processors.py
@@ -341,6 +341,223 @@ def do_apply(raw_data: _RDT) -> _PDT:
 pipe_processors = ProcessorPipe
 
 
+@final
+class ScatterGatherProcessor(
+    Processor[_RDT, Sequence[_PDT]], Generic[_RDT, _PDT]
+):
+    """A :class:`Processor` that applies raw data to multiple other processors.
+
+    This ``Processor`` implementation applies multiple other processors
+    (embedded processors) to the same raw data, and then returns the
+    aggregated outputs of these processors. The applications of the embedded
+    processors to the raw data are performed concurrently. A suitable
+    :class:`Executor` can be specified at instantiation to control the
+    concurrent execution of the embedded processors. A result gatherer
+    function can be provided to specify how to handle processing errors
+    and/or which results from the embedded processors to pick. A
+    :class:`retry policy <Retry>` to handle transient processing errors of
+    the embedded processors can also be provided.
+
+    Instances of this class are **NOT SAFE** to retry and **SHOULD NEVER**
+    be retried. However, they do support retrying their embedded processors.
+    This is disabled by default but can be enabled by providing a suitable
+    value to the ``retry_policy_factory`` constructor parameter when
+    creating new instances. When enabled, each embedded processor will be
+    retried individually per the specified retry policy in case it fails.
+
+    Disposing instances of this class also disposes of their embedded
+    processors.
+
+    .. admonition:: Regarding retry safety
+        :class: tip
+
+        Instances of this ``Processor`` are **NOT SAFE** to retry.
+    """
+
+    __slots__ = (
+        "_processors",
+        "_retry_policy_factory",
+        "_executor_factory",
+        "_result_gatherer",
+        "_is_disposed",
+        "_logger",
+        "_exit_stack",
+        "_prepped_processors",
+        "_executor",
+    )
+
+    def __init__(
+        self,
+        processors: Sequence[Processor[_RDT, _PDT]],
+        retry_policy_factory: Callable[[], Retry] = noop_retry,
+        executor_factory: Callable[[], Executor] = ThreadPoolExecutor,
+        result_gatherer: _ResultGatherer[_PDT] = fail_fast,
+    ) -> None:
+        """Create a new ``ScatterGatherProcessor`` of the given properties.
+
+        :param processors: A ``Sequence`` of processors to be applied to the
+            raw data.
+            These processors are also referred to as the embedded
+            processors. The given ``Sequence`` *MUST NOT* be empty.
+        :param retry_policy_factory: A callable that supplies retry policy
+            instance(s) to apply to each embedded processor. This *MUST* be
+            a valid callable object. Defaults to a factory that returns
+            retry policies that do nothing.
+        :param executor_factory: A callable that supplies suitable
+            ``Executor`` instance(s) to use for the concurrent processing.
+            This *MUST* be a valid callable object. Defaults to a factory
+            that returns ``ThreadPoolExecutor`` instances.
+        :param result_gatherer: A function that specifies how to handle
+            processing errors and/or which results from the embedded
+            processors to pick. This *MUST* be a valid callable object.
+            Defaults to a gatherer that fails if applying any of the
+            embedded processors failed, or returns all the processed data
+            otherwise.
+
+        :raise TypeError: If ``processors`` is NOT a ``Sequence``.
+        :raise ValueError: If ``processors`` is empty or if
+            ``retry_policy_factory``, ``executor_factory`` or
+            ``result_gatherer`` are NOT callable objects.
+        """
+        super().__init__()
+        ensure_not_none_nor_empty(
+            value=ensure_instance_of(
+                value=processors,
+                message=(
+                    "'processors' MUST be a collections.abc.Sequence object."
+                ),
+                klass=Sequence,
+            ),
+            message="'processors' MUST NOT be empty.",
+        )
+        self._processors: Sequence[Processor[_RDT, _PDT]] = tuple(processors)
+        self._retry_policy_factory: Callable[[], Retry] = ensure_callable(
+            value=retry_policy_factory,
+            message="'retry_policy_factory' MUST be a callable.",
+        )
+        self._executor_factory: Callable[[], Executor] = ensure_callable(
+            value=executor_factory,
+            message="'executor_factory' MUST be a callable.",
+        )
+        self._result_gatherer: _ResultGatherer[_PDT] = ensure_callable(
+            value=result_gatherer,
+            message="'result_gatherer' MUST be a callable.",
+        )
+        self._is_disposed: bool = False
+        self._logger: Logger = logging.getLogger(type_fqn(self.__class__))
+        self._exit_stack: ExitStack = ExitStack()
+
+        # Prepare embedded processors for execution by ensuring that they
+        # are all disposed of properly once this object is disposed.
+        self._prepped_processors: Sequence[Task[Any, Any]] = tuple(
+            self._processor_to_task(self._exit_stack.push(_processor))
+            for _processor in self._processors
+        )
+        self._executor: ConcurrentExecutor[_RDT, _PDT]
+        self._executor = ConcurrentExecutor(
+            *self._prepped_processors, executor=self._executor_factory()
+        )
+
+    @not_disposed
+    @override
+    def __enter__(self) -> Self:
+        """Return ``self`` upon entering the runtime context.
+
+        .. admonition:: Don't use after dispose
+            :class: error
+
+            Invoking this method on an instance that is disposed (i.e. the
+            :attr:`is_disposed` property on the instance is ``True``) will
+            result in a :exc:`ResourceDisposedError` being raised.
+
+        :return: This instance.
+
+        :raise ResourceDisposedError: If this processor has already been
+            disposed.
+        """
+        return super(Processor, self).__enter__()
+
+    @property
+    @override
+    def is_disposed(self) -> bool:
+        return self._is_disposed
+
+    @not_disposed
+    @override
+    def apply(self, raw_data: _RDT) -> Sequence[_PDT]:
+        """Process the supplied raw data using all embedded processors and
+        return the results.
+
+        This method concurrently applies all the embedded processors to the
+        supplied raw data. It then applies the result-gatherer function
+        assigned to this instance (at creation) to the :class:`Future`
+        objects resulting from the concurrent execution.
+        Each of these ``Future`` objects wraps the result of processing the
+        supplied raw data using an embedded processor contained in this
+        ``ScatterGatherProcessor``, and they preserve the same order.
+
+        The contents of the resulting aggregate, and their ordering, are
+        determined by the provided result-gatherer function. That is, the
+        contents of the returned ``Sequence`` will be the same as those
+        returned by this object's result-gatherer function and in the same
+        order.
+
+        .. admonition:: Don't use after dispose
+            :class: error
+
+            Invoking this method on an instance that is disposed (i.e. the
+            :attr:`is_disposed` property on the instance is ``True``) will
+            result in a :exc:`ResourceDisposedError` being raised.
+
+        :param raw_data: The data to be processed.
+
+        :return: The aggregated results of applying all the embedded
+            processors to the provided raw data.
+
+        :raise ResourceDisposedError: If this processor has already been
+            disposed.
+        """  # noqa: D205
+        self._logger.info(
+            "Forking processing of the received data to all embedded "
+            "processors."
+        )
+        with self._executor as executor:
+            futures = executor.execute(raw_data)
+
+        return tuple(self._result_gatherer(futures))
+
+    @override
+    def dispose(self) -> None:
+        """Release any underlying resources contained by this processor.
+
+        All embedded processors are also disposed. After this method returns
+        successfully, the :attr:`is_disposed` property should return
+        ``True``.
+
+        .. note::
+            Unless otherwise specified, trying to use methods of a
+            ``Disposable`` instance decorated with the
+            :func:`~sghi.disposable.not_disposed` decorator after this
+            method returns should generally be considered a programming
+            error and should result in a
+            :exc:`~sghi.disposable.ResourceDisposedError` being raised.
+
+            This method should be idempotent allowing it to be called more
+            than once; only the first call, however, should have an effect.
+
+        :return: None.
+        """
+        self._is_disposed = True
+        self._exit_stack.close()
+        self._executor.dispose()
+        self._logger.info("Disposal complete.")
+
+    def _processor_to_task(
+        self, p: Processor[_RDT, _PDT]
+    ) -> Task[_RDT, _PDT]:
+        @task
+        def do_apply(raw_data: _RDT) -> _PDT:
+            with p as _p:
+                apply = self._retry_policy_factory().retry(_p.apply)
+                return apply(raw_data)
+
+        return do_apply
+
+
 @final
 class SplitGatherProcessor(
     Processor[Sequence[_RDT], Sequence[_PDT]], Generic[_RDT, _PDT]
 ):
@@ -430,6 +647,7 @@ def __init__(
             ),
             message="'processors' MUST NOT be empty.",
         )
+        self._processors: Sequence[Processor[_RDT, _PDT]] = tuple(processors)
         self._retry_policy_factory: Callable[[], Retry] = ensure_callable(
             value=retry_policy_factory,
             message="'retry_policy_factory' MUST be a callable.",
@@ -442,7 +660,6 @@ def __init__(
             value=result_gatherer,
             message="'result_gatherer' MUST be a callable.",
         )
-        self._processors: Sequence[Processor[_RDT, _PDT]] = tuple(processors)
         self._is_disposed: bool = False
         self._logger: Logger = logging.getLogger(type_fqn(self.__class__))
         self._exit_stack: ExitStack = ExitStack()
@@ -661,6 +878,7 @@ def dispose(self) -> None:
 __all__ = [
     "NOOPProcessor",
     "ProcessorPipe",
+    "ScatterGatherProcessor",
     "SplitGatherProcessor",
     "pipe_processors",
     "processor",
diff --git a/test/sghi/etl/commons_tests/processors_tests.py b/test/sghi/etl/commons_tests/processors_tests.py
index fbcd637..609bbfb 100644
--- a/test/sghi/etl/commons_tests/processors_tests.py
+++ b/test/sghi/etl/commons_tests/processors_tests.py
@@ -15,6 +15,7 @@
 from sghi.etl.commons import (
     NOOPProcessor,
     ProcessorPipe,
+    ScatterGatherProcessor,
     SplitGatherProcessor,
     processor,
 )
@@ -321,6 +322,188 @@ def test_usage_when_is_disposed_fails(self) -> None:
         self._instance.__enter__()
 
 
+class TestScatterGatherProcessor(TestCase):
+    """Tests for the :class:`sghi.etl.commons.ScatterGatherProcessor` class."""
+
+    @override
+    def setUp(self) -> None:
+        super().setUp()
+
+        @processor
+        def add_100(value: int) -> int:
+            return value + 100
+
+        @processor
+        def mul_by_10(value: int) -> int:
+            time.sleep(0.5)  # simulate IO blocking
+            return value * 10
+
+        @processor
+        def sub_50(value: int) -> int:
+            return value - 50
+
+        self._embedded_processors: Sequence[Processor] = [
+            add_100,
+            mul_by_10,
+            sub_50,
+        ]
+        self._instance: Processor[int, Sequence[int]]
+        self._instance = ScatterGatherProcessor(self._embedded_processors)
+
+    @override
+    def tearDown(self) -> None:
+        super().tearDown()
+        self._instance.dispose()
+
+    def test_apply_returns_the_expected_value(self) -> None:
+        """:meth:`ScatterGatherProcessor.apply` should return the result of
+        aggregating the outputs of applying the given input to all embedded
+        processors contained by the ``ScatterGatherProcessor``.
+        """
+        assert tuple(self._instance.apply(-45)) == (55, -450, -95)
+
+    def test_dispose_has_the_intended_side_effects(self) -> None:
+        """Calling :meth:`ScatterGatherProcessor.dispose` should result in
+        the :attr:`ScatterGatherProcessor.is_disposed` property being set to
+        ``True``.
+
+        Each embedded ``Processor`` should also be disposed.
+        """
+        self._instance.dispose()
+
+        assert self._instance.is_disposed
+        for _processor in self._embedded_processors:
+            assert _processor.is_disposed
+
+    def test_instantiation_fails_on_an_empty_processors_arg(self) -> None:
+        """Instantiating a :class:`ScatterGatherProcessor` with an empty
+        ``processors`` argument should raise a :exc:`ValueError`.
+        """
+        with pytest.raises(ValueError, match="NOT be empty") as exp_info:
+            ScatterGatherProcessor(processors=[])
+
+        assert exp_info.value.args[0] == "'processors' MUST NOT be empty."
+
+    def test_instantiation_fails_on_non_callable_executor_factory_arg(
+        self,
+    ) -> None:
+        """Instantiating a :class:`ScatterGatherProcessor` with a
+        non-callable value for the ``executor_factory`` argument should
+        raise a :exc:`ValueError`.
+        """
+        with pytest.raises(ValueError, match="MUST be a callable") as exp_info:
+            ScatterGatherProcessor(
+                processors=self._embedded_processors,
+                executor_factory=None,  # type: ignore
+            )
+
+        assert (
+            exp_info.value.args[0] == "'executor_factory' MUST be a callable."
+        )
+
+    def test_instantiation_fails_on_non_callable_result_gatherer_arg(
+        self,
+    ) -> None:
+        """Instantiating a :class:`ScatterGatherProcessor` with a
+        non-callable value for the ``result_gatherer`` argument should raise
+        a :exc:`ValueError`.
+        """
+        with pytest.raises(ValueError, match="MUST be a callable") as exp_info:
+            ScatterGatherProcessor(
+                processors=self._embedded_processors,
+                result_gatherer=None,  # type: ignore
+            )
+
+        assert (
+            exp_info.value.args[0] == "'result_gatherer' MUST be a callable."
+        )
+
+    def test_instantiation_fails_on_non_callable_retry_policy_factory_arg(
+        self,
+    ) -> None:
+        """Instantiating a :class:`ScatterGatherProcessor` with a
+        non-callable value for the ``retry_policy_factory`` argument should
+        raise a :exc:`ValueError`.
+        """
+        with pytest.raises(ValueError, match="MUST be a callable") as exp_info:
+            ScatterGatherProcessor(
+                processors=self._embedded_processors,
+                retry_policy_factory=None,  # type: ignore
+            )
+
+        assert (
+            exp_info.value.args[0]
+            == "'retry_policy_factory' MUST be a callable."
+        )
+
+    def test_instantiation_fails_on_non_sequence_processors_arg(self) -> None:
+        """Instantiating a :class:`ScatterGatherProcessor` with a
+        non-``Sequence`` ``processors`` argument should raise a
+        :exc:`TypeError`.
+        """
+        values = (None, 67, self._embedded_processors[0])
+        for value in values:
+            with pytest.raises(TypeError, match="Sequence") as exp_info:
+                ScatterGatherProcessor(processors=value)  # type: ignore
+
+            assert (
+                exp_info.value.args[0]
+                == "'processors' MUST be a collections.abc.Sequence object."
+            )
+
+    def test_multiple_dispose_invocations_is_okay(self) -> None:
+        """Calling :meth:`ScatterGatherProcessor.dispose` multiple times
+        should be okay.
+
+        No errors should be raised and the object should remain disposed.
+        """
+        for _ in range(10):
+            try:
+                self._instance.dispose()
+            except Exception as exc:  # noqa: BLE001
+                fail_reason: str = (
+                    "Calling 'ScatterGatherProcessor.dispose()' multiple "
+                    "times should be okay. But the following error was "
+                    f"raised: {exc!s}"
+                )
+                pytest.fail(fail_reason)
+
+            assert self._instance.is_disposed
+            for _processor in self._embedded_processors:
+                assert _processor.is_disposed
+
+    def test_usage_as_a_context_manager_behaves_as_expected(self) -> None:
+        """:class:`ScatterGatherProcessor` instances are valid context
+        managers and should behave correctly when used as such.
+        """
+        with self._instance:
+            result = self._instance.apply(50)
+            assert isinstance(result, Sequence)
+            assert len(result) == len(self._embedded_processors)
+            assert tuple(result) == (150, 500, 0)
+
+        assert self._instance.is_disposed
+        for _processor in self._embedded_processors:
+            assert _processor.is_disposed
+
+    def test_usage_when_is_disposed_fails(self) -> None:
+        """Invoking "resource-aware" methods of a disposed instance should
+        result in a :exc:`ResourceDisposedError` being raised.
+
+        Specifically, invoking the following two methods on a disposed
+        instance should fail:
+
+        - :meth:`ScatterGatherProcessor.__enter__`
+        - :meth:`ScatterGatherProcessor.apply`
+        """
+        self._instance.dispose()
+
+        with pytest.raises(ResourceDisposedError):
+            self._instance.apply(100)
+
+        with pytest.raises(ResourceDisposedError):
+            self._instance.__enter__()
+
+
 class TestSplitGatherProcessor(TestCase):
     """Tests for the :class:`sghi.etl.commons.SplitGatherProcessor` class."""
 
@@ -492,7 +675,7 @@ def test_multiple_dispose_invocations_is_okay(self) -> None:
                 self._instance.dispose()
             except Exception as exc:  # noqa: BLE001
                 fail_reason: str = (
-                    "Calling 'SplitGatherProcessors.dispose()' multiple times "
+                    "Calling 'SplitGatherProcessor.dispose()' multiple times "
                     "should be okay. But the following error was raised: "
                     f"{exc!s}"
                 )