From 178403d2f688dd53b586c42ccff2e524b91efecd Mon Sep 17 00:00:00 2001 From: Kennedy Kori Date: Thu, 6 Jun 2024 21:37:40 +0300 Subject: [PATCH] chore(sources): delay embedded sources disposal Refactor `sghi.etl.commons.sources.GatherSource` to delay the disposal of embedded sources. All embedded sources will be disposed of when the `GatherSource` is disposed of. This will support streaming sources that may need to remain "live" even after their `draw()` method returns. --- src/sghi/etl/commons/sources.py | 12 +++--- test/sghi/etl/commons_tests/sources_tests.py | 45 +++++++++++++++++--- 2 files changed, 46 insertions(+), 11 deletions(-) diff --git a/src/sghi/etl/commons/sources.py b/src/sghi/etl/commons/sources.py index 7ca0c14..ea2a2d9 100644 --- a/src/sghi/etl/commons/sources.py +++ b/src/sghi/etl/commons/sources.py @@ -118,7 +118,7 @@ class GatherSource(Source[Sequence[_RDT]], Generic[_RDT]): Disposing instances of this class also disposes of their embedded sources. .. admonition:: Regarding retry safety - :class: tip + :class: caution Instances of this ``Source`` are **NOT SAFE** to retry. """ @@ -256,8 +256,8 @@ def draw(self) -> Sequence[_RDT]: """ self._logger.info("Aggregating data from all available sources.") - with self._executor as executor: - futures = executor.execute(None) + executor = self._executor.__enter__() + futures = executor.execute(None) return tuple(self._result_gatherer(futures)) @@ -289,9 +289,9 @@ def dispose(self) -> None: def _source_to_task(self, s: Source[_RDT]) -> Supplier[_RDT]: @supplier def do_draw() -> _RDT: - with s as _s: - draw = self._retry_policy_factory().retry(_s.draw) - return draw() + _s = s.__enter__() + draw = self._retry_policy_factory().retry(_s.draw) + return draw() # noinspection PyTypeChecker return do_draw diff --git a/test/sghi/etl/commons_tests/sources_tests.py b/test/sghi/etl/commons_tests/sources_tests.py index d77da8b..300ccf3 100644 --- a/test/sghi/etl/commons_tests/sources_tests.py +++ b/test/sghi/etl/commons_tests/sources_tests.py @@ -4,19 +4,52 @@ from __future__ import annotations import time -from collections.abc import Sequence -from typing import TYPE_CHECKING, Any +from collections.abc import Iterable, Sequence +from typing import Any from unittest import TestCase import pytest from typing_extensions import override -from sghi.disposable import ResourceDisposedError +from sghi.disposable import ResourceDisposedError, not_disposed from sghi.etl.commons import GatherSource, source from sghi.etl.core import Source -if TYPE_CHECKING: - from collections.abc import Iterable +# ============================================================================= +# HELPERS +# ============================================================================= + + +class _StreamingSource(Source[Iterable[int]]): + def __init__(self) -> None: + self._yielded: int = 0 + self._is_disposed: bool = False + + @property + @override + def is_disposed(self) -> bool: + return self._is_disposed + + @not_disposed + @override + def draw(self) -> Iterable[int]: + for _ in range(3): + yield from self._do_yield() + self._yielded = 0 + + @override + def dispose(self) -> None: + self._is_disposed = True + + @not_disposed + def _do_yield(self) -> Iterable[int]: + yield from range(self._yielded, self._yielded + 4) + self._yielded += 4 + + +# ============================================================================= +# TESTS +# ============================================================================= def test_source_decorator_delegates_to_the_wrapped_callable() -> None: @@ -126,6 +159,7 @@ def supply_ints_slowly( get_greeting, supply_ints, supply_ints_slowly, + _StreamingSource(), ] self._instance: Source[Sequence[Any]] = GatherSource( sources=self._embedded_sources, @@ -158,6 +192,7 @@ def test_draw_returns_the_expected_value(self) -> None: assert result[0] == "Hello, World!" assert tuple(result[1]) == (0, 1, 2, 3, 4) assert tuple(result[2]) == (0, 1, 2, 3, 4) + assert tuple(result[3]) == (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11) def test_instantiation_fails_on_an_empty_sources_arg(self) -> None: """Instantiating a :class:`GatherSource` with an empty ``sources``