diff --git a/docs/conf.py b/docs/conf.py index 67e0a1b..4a96a83 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -58,11 +58,14 @@ nitpick_ignore = [ ("py:class", "_RDT"), # private type annotations ("py:class", "_PDT"), # private type annotations + ("py:class", "Executor"), # sphinx can't find it ("py:class", "Processor"), # docs aren't published yet + ("py:class", "Retry"), # docs aren't published yet ("py:class", "Sink"), # docs aren't published yet ("py:class", "Source"), # docs aren't published yet ("py:class", "WorkflowDefinition"), # docs aren't published yet ("py:class", "TracebackType"), # Used as type annotation. Only available when type checking + ("py:class", "concurrent.futures._base.Executor"), # sphinx can't find it ("py:class", "concurrent.futures._base.Future"), # sphinx can't find it ("py:class", "sghi.etl.commons.processors._RDT"), # private type annotations ("py:class", "sghi.etl.commons.processors._PDT"), # private type annotations diff --git a/src/sghi/etl/commons/__init__.py b/src/sghi/etl/commons/__init__.py index d1489d1..f6a9f00 100644 --- a/src/sghi/etl/commons/__init__.py +++ b/src/sghi/etl/commons/__init__.py @@ -7,11 +7,12 @@ processor, ) from .sinks import NullSink, sink -from .sources import source +from .sources import GatherSource, source from .utils import fail_fast, fail_fast_factory, ignored_failed from .workflow_definitions import SimpleWorkflowDefinition __all__ = [ + "GatherSource", "NOOPProcessor", "NullSink", "ProcessorPipe", diff --git a/src/sghi/etl/commons/sources.py b/src/sghi/etl/commons/sources.py index de685dc..85a94b1 100644 --- a/src/sghi/etl/commons/sources.py +++ b/src/sghi/etl/commons/sources.py @@ -3,7 +3,9 @@ from __future__ import annotations import logging -from collections.abc import Callable +from collections.abc import Callable, Iterable, Sequence +from concurrent.futures import Executor, Future, ThreadPoolExecutor +from contextlib import ExitStack from functools import update_wrapper from logging import 
Logger from typing import Final, Generic, Self, TypeVar, final @@ -12,7 +14,16 @@ from sghi.disposable import not_disposed from sghi.etl.core import Source -from sghi.utils import ensure_callable, type_fqn +from sghi.retry import Retry, noop_retry +from sghi.task import ConcurrentExecutor, Supplier, supplier +from sghi.utils import ( + ensure_callable, + ensure_instance_of, + ensure_not_none_nor_empty, + type_fqn, +) + +from .utils import fail_fast # ============================================================================= # TYPES @@ -22,6 +33,10 @@ _RDT = TypeVar("_RDT") """Raw Data Type.""" +_T = TypeVar("_T") + +_ResultGatherer = Callable[[Iterable[Future[_T]]], Iterable[_T]] + _SourceCallable = Callable[[], _RDT] @@ -80,6 +95,201 @@ def source(f: Callable[[], _RDT]) -> Source[_RDT]: # ============================================================================= +@final +class GatherSource(Source[Sequence[_RDT]], Generic[_RDT]): + """A :class:`Source` that aggregates raw data from multiple sources. + + This ``Source`` implementation asynchronously draws data from multiple + other sources (embedded sources) and returns the aggregated results. A + result gatherer function can be provided to specify how to handle draw + errors and/or which results from the embedded sources to pick. A + :class:`retry policy<Retry>` to handle transient draw errors of the + embedded sources can also be provided. A suitable :class:`Executor` can be + specified at instantiation to control the asynchronous draw from the + embedded sources. + + Instances of this class are **NOT SAFE** to retry and **SHOULD NEVER** be + retried. However, they do support retrying their embedded sources. This + is disabled by default but can be enabled by providing a suitable value to + the ``retry_policy_factory`` constructor parameter when creating new + instances. When enabled, each embedded source will be retried individually + per the specified retry policy in case it fails. 
+ + Disposing instances of this class also disposes of their embedded sources. + + .. admonition:: Regarding retry safety + :class: tip + + Instances of this ``Source`` are **NOT SAFE** to retry. + """ + + __slots__ = ( + "_sources", + "_retry_policy_factory", + "_executor_factory", + "_result_gatherer", + "_is_disposed", + "_logger", + "_exit_stack", + "_prepped_sources", + "_executor", + ) + + def __init__( + self, + sources: Sequence[Source[_RDT]], + retry_policy_factory: Callable[[], Retry] = noop_retry, + executor_factory: Callable[[], Executor] = ThreadPoolExecutor, + result_gatherer: _ResultGatherer[_RDT] = fail_fast, + ) -> None: + """Create a new ``GatherSource`` instance with the given properties. + + :param sources: A ``Sequence`` of sources to asynchronously draw data + from. These sources are also referred to as the embedded sources. + The given ``Sequence`` *MUST NOT* be empty. + :param retry_policy_factory: A callable that supplies retry policy + instance(s) to apply to each embedded source. This MUST be a valid + callable object. Defaults to a factory that returns retry policies + that do nothing. + :param executor_factory: A callable that supplies suitable + ``Executor`` instance(s) to use for the asynchronous draws. This + MUST be a valid callable object. Defaults to a factory that returns + ``ThreadPoolExecutor`` instances. + :param result_gatherer: A function that specifies how to handle draw + errors and/or which results from the embedded sources to pick. This + MUST be a valid callable object. Defaults to a gatherer that fails + if drawing from any of the embedded sources failed, or returns all + the drawn data otherwise. + + :raise TypeError: If ``sources`` is NOT a ``Sequence``. + :raise ValueError: If ``sources`` is empty or if + ``retry_policy_factory``, ``executor_factory`` and + ``result_gatherer`` are NOT callable objects. 
+ """ + super().__init__() + ensure_not_none_nor_empty( + value=ensure_instance_of( + value=sources, + message="'sources' MUST be a collections.abc.Sequence object.", + klass=Sequence, + ), + message="'sources' MUST NOT be empty.", + ) + self._sources: Sequence[Source[_RDT]] = tuple(sources) + self._retry_policy_factory: Callable[[], Retry] = ensure_callable( + value=retry_policy_factory, + message="'retry_policy_factory' MUST be a callable.", + ) + self._executor_factory: Callable[[], Executor] = ensure_callable( + value=executor_factory, + message="'executor_factory' MUST be a callable.", + ) + self._result_gatherer: _ResultGatherer[_RDT] = ensure_callable( + value=result_gatherer, + message="'result_gatherer' MUST be a callable.", + ) + self._is_disposed: bool = False + self._logger: Logger = logging.getLogger(type_fqn(self.__class__)) + self._exit_stack: ExitStack = ExitStack() + + # Prepare embedded sources for execution by ensuring that they are all + # disposed of properly once this object is disposed. + self._prepped_sources: Sequence[Supplier[_RDT]] = tuple( + self._source_to_task(self._exit_stack.push(_source)) + for _source in self._sources + ) + self._executor: ConcurrentExecutor[None, _RDT] = ConcurrentExecutor( + *self._prepped_sources, executor=self._executor_factory() + ) + + @not_disposed + @override + def __enter__(self) -> Self: + """Return ``self`` upon entering the runtime context. + + .. admonition:: Don't use after dispose + :class: error + + Invoking this method on an instance that is disposed(i.e. the + :attr:`is_disposed` property on the instance is ``True``) will + result in a :exc:`ResourceDisposedError` being raised. + + :return: This instance. + + :raise ResourceDisposedError: If this source has already been disposed. 
+ """ + return super(Source, self).__enter__() + + @property + @override + def is_disposed(self) -> bool: + return self._is_disposed + + @not_disposed + @override + def draw(self) -> Sequence[_RDT]: + """Draw data from embedded sources and return the aggregated results. + + The contents of the resulting aggregate, and their ordering, are + determined by the result-gatherer function provided at this object's + instantiation. That is, the contents of the returned ``Sequence``, will + be the same as those returned by this object's result-gatherer function + and in the same order. + + .. admonition:: Don't use after dispose + :class: error + + Invoking this method on an instance that is disposed(i.e. the + :attr:`is_disposed` property on the instance is ``True``) will + result in a :exc:`ResourceDisposedError` being raised. + + :return: The aggregated results of drawing from each embedded source. + + :raise ResourceDisposedError: If this source has already been disposed. + """ + self._logger.info("Aggregating data from all available sources.") + + with self._executor as executor: + futures = executor.execute(None) + + return tuple(self._result_gatherer(futures)) + + @override + def dispose(self) -> None: + """Release any underlying resources contained by this source. + + All embedded sources are also disposed. After this method returns + successfully, the :attr:`is_disposed` property should return ``True``. + + .. note:: + Unless otherwise specified, trying to use methods of a + ``Disposable`` instance decorated with the + :func:`~sghi.disposable.not_disposed` decorator after this method + returns should generally be considered a programming error and + should result in a :exc:`~sghi.disposable.ResourceDisposedError` + being raised. + + This method should be idempotent allowing it to be called more + than once; only the first call, however, should have an effect. + + :return: None. 
+ """ + self._is_disposed = True + self._exit_stack.close() + self._executor.dispose() + self._logger.info("Disposal complete.") + + def _source_to_task(self, s: Source[_RDT]) -> Supplier[_RDT]: + @supplier + def do_draw() -> _RDT: + with s as _s: + draw = self._retry_policy_factory().retry(_s.draw) + return draw() + + # noinspection PyTypeChecker + return do_draw + + @final class _SourceOfCallable(Source[_RDT], Generic[_RDT]): __slots__ = ("_delegate_to", "_is_disposed", "_logger") @@ -151,5 +361,6 @@ def dispose(self) -> None: __all__ = [ + "GatherSource", "source", ] diff --git a/test/sghi/etl/commons_tests/sources_tests.py b/test/sghi/etl/commons_tests/sources_tests.py index 8de7ee6..d77da8b 100644 --- a/test/sghi/etl/commons_tests/sources_tests.py +++ b/test/sghi/etl/commons_tests/sources_tests.py @@ -3,12 +3,16 @@ from __future__ import annotations -from typing import TYPE_CHECKING +import time +from collections.abc import Sequence +from typing import TYPE_CHECKING, Any +from unittest import TestCase import pytest +from typing_extensions import override from sghi.disposable import ResourceDisposedError -from sghi.etl.commons import source +from sghi.etl.commons import GatherSource, source from sghi.etl.core import Source if TYPE_CHECKING: @@ -93,3 +97,186 @@ def supply_ints(count: int = 5) -> Iterable[int]: with pytest.raises(ResourceDisposedError): supply_ints.__enter__() + + +class TestGatherSource(TestCase): + """Tests for the :class:`sghi.etl.commons.GatherSource` class.""" + + @override + def setUp(self) -> None: + super().setUp() + + @source + def get_greeting() -> str: + return "Hello, World!" 
+ + @source + def supply_ints(count: int = 5) -> Iterable[int]: + yield from range(count) + + @source + def supply_ints_slowly( + count: int = 5, delay: float = 0.5 + ) -> Iterable[int]: + for i in range(count): + time.sleep(delay) + yield i + + self._embedded_sources: Sequence[Source[Any]] = [ + get_greeting, + supply_ints, + supply_ints_slowly, + ] + self._instance: Source[Sequence[Any]] = GatherSource( + sources=self._embedded_sources, + ) + + @override + def tearDown(self) -> None: + super().tearDown() + self._instance.dispose() + + def test_dispose_has_the_intended_side_effects(self) -> None: + """Calling :meth:`GatherSource.dispose` should result in the + :attr:`GatherSource.is_disposed` property being set to ``True``. + + Each embedded ``Source`` should also be disposed. + """ + self._instance.dispose() + + assert self._instance.is_disposed + for _source in self._embedded_sources: + assert _source.is_disposed + + def test_draw_returns_the_expected_value(self) -> None: + """:meth:`GatherSource.draw` should return the aggregated raw data + after drawing from each embedded source. + """ + result = self._instance.draw() + assert isinstance(result, Sequence) + assert len(result) == len(self._embedded_sources) + assert result[0] == "Hello, World!" + assert tuple(result[1]) == (0, 1, 2, 3, 4) + assert tuple(result[2]) == (0, 1, 2, 3, 4) + + def test_instantiation_fails_on_an_empty_sources_arg(self) -> None: + """Instantiating a :class:`GatherSource` with an empty ``sources`` + argument should raise a :exc:`ValueError`. + """ + with pytest.raises(ValueError, match="NOT be empty") as exp_info: + GatherSource(sources=[]) + + assert exp_info.value.args[0] == "'sources' MUST NOT be empty." + + def test_instantiation_fails_on_non_callable_executor_factory_arg( + self, + ) -> None: + """Instantiating a :class:`GatherSource` with a non-callable value for + the ``executor_factory`` argument should raise a :exc:`ValueError`. 
+ """ + with pytest.raises(ValueError, match="MUST be a callable") as exp_info: + GatherSource( + sources=self._embedded_sources, + executor_factory=None, # type: ignore + ) + + assert ( + exp_info.value.args[0] == "'executor_factory' MUST be a callable." + ) + + def test_instantiation_fails_on_non_callable_result_gatherer_arg( + self, + ) -> None: + """Instantiating a :class:`GatherSource` with a non-callable value for + the ``result_gatherer`` argument should raise a :exc:`ValueError`. + """ + with pytest.raises(ValueError, match="MUST be a callable") as exp_info: + GatherSource( + sources=self._embedded_sources, + result_gatherer=None, # type: ignore + ) + + assert ( + exp_info.value.args[0] == "'result_gatherer' MUST be a callable." + ) + + def test_instantiation_fails_on_non_callable_retry_policy_factory_arg( + self, + ) -> None: + """Instantiating a :class:`GatherSource` with a non-callable value for + the ``retry_policy_factory`` argument should raise a :exc:`ValueError`. + """ + with pytest.raises(ValueError, match="MUST be a callable") as exp_info: + GatherSource( + sources=self._embedded_sources, + retry_policy_factory=None, # type: ignore + ) + + assert ( + exp_info.value.args[0] + == "'retry_policy_factory' MUST be a callable." + ) + + def test_instantiation_fails_on_non_sequence_sources_arg(self) -> None: + """Instantiating a :class:`GatherSource` with a non ``Sequence`` + ``sources`` argument should raise a :exc:`TypeError`. + """ + values = (None, 67, self._embedded_sources[0]) + for value in values: + with pytest.raises(TypeError, match="Sequence") as exp_info: + GatherSource(sources=value) # type: ignore + + assert ( + exp_info.value.args[0] + == "'sources' MUST be a collections.abc.Sequence object." + ) + + def test_multiple_dispose_invocations_is_okay(self) -> None: + """Calling :meth:`GatherSource.dispose` multiple times should be okay. + + No errors should be raised and the object should remain disposed. 
+ """ + for _ in range(10): + try: + self._instance.dispose() + except Exception as exc: # noqa: BLE001 + fail_reason: str = ( + "Calling 'GatherSource.dispose()' multiple times should " + f"be okay. But the following error was raised: {exc!s}" + ) + pytest.fail(fail_reason) + + assert self._instance.is_disposed + for _source in self._embedded_sources: + assert _source.is_disposed + + def test_usage_as_a_context_manager_behaves_as_expected(self) -> None: + """:class:`GatherSource` instances are valid context managers and + should behave correctly when used as so. + """ + with self._instance: + result = self._instance.draw() + assert isinstance(result, Sequence) + assert len(result) == len(self._embedded_sources) + + assert self._instance.is_disposed + for _source in self._embedded_sources: + assert _source.is_disposed + + def test_usage_when_is_disposed_fails(self) -> None: + """Invoking "resource-aware" methods of a disposed instance should + result in an :exc:`ResourceDisposedError` being raised. + + Specifically, invoking the following two methods on a disposed instance + should fail: + + - :meth:`GatherSource.__enter__` + - :meth:`GatherSource.draw` + """ + self._instance.dispose() + + with pytest.raises(ResourceDisposedError): + self._instance.draw() + + with pytest.raises(ResourceDisposedError): + self._instance.__enter__()