Skip to content

Commit

Permalink
chore(sources): delay embedded sources disposal (#24)
Browse files Browse the repository at this point in the history
Refactor `sghi.etl.commons.sources.GatherSource` to delay the disposal
of embedded sources. All embedded sources will be disposed of when the
`GatherSource` is disposed of. This will support streaming sources that
may need to remain "live" even after their `draw()` method returns.
  • Loading branch information
kennedykori committed Jun 6, 2024
1 parent 7807ca2 commit e5e3ce4
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 11 deletions.
12 changes: 6 additions & 6 deletions src/sghi/etl/commons/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class GatherSource(Source[Sequence[_RDT]], Generic[_RDT]):
Disposing instances of this class also disposes of their embedded sources.
.. admonition:: Regarding retry safety
:class: tip
:class: caution
Instances of this ``Source`` are **NOT SAFE** to retry.
"""
Expand Down Expand Up @@ -256,8 +256,8 @@ def draw(self) -> Sequence[_RDT]:
"""
self._logger.info("Aggregating data from all available sources.")

with self._executor as executor:
futures = executor.execute(None)
executor = self._executor.__enter__()
futures = executor.execute(None)

return tuple(self._result_gatherer(futures))

Expand Down Expand Up @@ -289,9 +289,9 @@ def dispose(self) -> None:
def _source_to_task(self, s: Source[_RDT]) -> Supplier[_RDT]:
@supplier
def do_draw() -> _RDT:
with s as _s:
draw = self._retry_policy_factory().retry(_s.draw)
return draw()
_s = s.__enter__()
draw = self._retry_policy_factory().retry(_s.draw)
return draw()

# noinspection PyTypeChecker
return do_draw
Expand Down
45 changes: 40 additions & 5 deletions test/sghi/etl/commons_tests/sources_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,52 @@
from __future__ import annotations

import time
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any
from collections.abc import Iterable, Sequence
from typing import Any
from unittest import TestCase

import pytest
from typing_extensions import override

from sghi.disposable import ResourceDisposedError
from sghi.disposable import ResourceDisposedError, not_disposed
from sghi.etl.commons import GatherSource, source
from sghi.etl.core import Source

if TYPE_CHECKING:
from collections.abc import Iterable
# =============================================================================
# HELPERS
# =============================================================================


class _StreamingSource(Source[Iterable[int]]):
def __init__(self) -> None:
self._yielded: int = 0
self._is_disposed: bool = False

@property
@override
def is_disposed(self) -> bool:
return self._is_disposed

@not_disposed
@override
def draw(self) -> Iterable[int]:
for _ in range(3):
yield from self._do_yield()
self._yielded = 0

@override
def dispose(self) -> None:
self._is_disposed = True

@not_disposed
def _do_yield(self) -> Iterable[int]:
yield from range(self._yielded, self._yielded + 4)
self._yielded += 4


# =============================================================================
# TESTS
# =============================================================================


def test_source_decorator_delegates_to_the_wrapped_callable() -> None:
Expand Down Expand Up @@ -126,6 +159,7 @@ def supply_ints_slowly(
get_greeting,
supply_ints,
supply_ints_slowly,
_StreamingSource(),
]
self._instance: Source[Sequence[Any]] = GatherSource(
sources=self._embedded_sources,
Expand Down Expand Up @@ -158,6 +192,7 @@ def test_draw_returns_the_expected_value(self) -> None:
assert result[0] == "Hello, World!"
assert tuple(result[1]) == (0, 1, 2, 3, 4)
assert tuple(result[2]) == (0, 1, 2, 3, 4)
assert tuple(result[3]) == (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)

def test_instantiation_fails_on_an_empty_sources_arg(self) -> None:
"""Instantiating a :class:`GatherSource` with an empty ``sources``
Expand Down

0 comments on commit e5e3ce4

Please sign in to comment.