Skip to content

Commit

Permalink
feat(sources): add a source decorator (#5)
Browse files Browse the repository at this point in the history
Add `sghi.etl.commons.sources.source`, a decorator that marks a callable
as a `Source`. The decorated callable *MUST NOT* have any required
arguments but *MUST* return a value (the drawn data).
  • Loading branch information
kennedykori committed Apr 14, 2024
1 parent 9f3b039 commit b42328e
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 0 deletions.
4 changes: 4 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,23 @@

# Cross-reference targets that Sphinx should not warn about when it cannot
# resolve them. Each entry is a ``(role, target)`` pair; the trailing comment
# on each entry records why the target is unresolvable.
nitpick_ignore = [
    ("py:class", "Processor"),  # docs aren't published yet
    ("py:class", "Source"),  # docs aren't published yet
    ("py:class", "TracebackType"),  # Used as type annotation. Only available when type checking
    ("py:class", "concurrent.futures._base.Future"),  # sphinx can't find it
    ("py:class", "sghi.etl.commons.processors._RDT"),  # private type annotations
    ("py:class", "sghi.etl.commons.processors._PDT"),  # private type annotations
    ("py:class", "sghi.etl.commons.sources._RDT"),  # private type annotations
    ("py:class", "sghi.etl.commons.utils.result_gatherers._T"),  # private type annotations
    ("py:class", "sghi.etl.core._RDT"),  # private type annotations
    ("py:class", "sghi.etl.core._PDT"),  # private type annotations
    ("py:class", "sghi.etl.core.Processor"),  # docs aren't published yet
    ("py:class", "sghi.etl.core.Source"),  # docs aren't published yet
    ("py:exc", "ResourceDisposedError"),  # docs aren't published yet
    ("py:exc", "sghi.disposable.ResourceDisposedError"),  # docs aren't published yet
    ("py:func", "sghi.disposable.not_disposed"),  # docs aren't published yet
    ("py:obj", "sghi.etl.commons.processors._PDT"),  # private type annotations
    ("py:obj", "sghi.etl.commons.processors._RDT"),  # private type annotations
    ("py:obj", "sghi.etl.commons.sources._RDT"),  # private type annotations
]

# Sphinx ``templates_path`` setting: where to look for extra templates.
templates_path = ["templates"]
Expand Down
2 changes: 2 additions & 0 deletions src/sghi/etl/commons/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Collection of utilities for working with SGHI ETL Workflows."""

from .processors import NOOPProcessor, processor
from .sources import source
from .utils import fail_fast, fail_fast_factory, ignored_failed

__all__ = [
Expand All @@ -9,4 +10,5 @@
"fail_fast_factory",
"ignored_failed",
"processor",
"source",
]
155 changes: 155 additions & 0 deletions src/sghi/etl/commons/sources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
"""Common :class:`~sghi.etl.core.Source` implementations."""

from __future__ import annotations

import logging
from collections.abc import Callable
from functools import update_wrapper
from logging import Logger
from typing import Final, Generic, Self, TypeVar, final

from typing_extensions import override

from sghi.disposable import not_disposed
from sghi.etl.core import Source
from sghi.utils import ensure_callable, type_fqn

# =============================================================================
# TYPES
# =============================================================================


# Type variable standing for whatever a source's wrapped callable returns.
_RDT = TypeVar("_RDT")
"""Raw Data Type."""

# Shape of the callables that can back a source: called with no arguments,
# returns the raw data.
_SourceCallable = Callable[[], _RDT]


# =============================================================================
# CONSTANTS
# =============================================================================


# Prefix of the logger names used by callable-backed sources; the wrapped
# callable's fully qualified name is appended to it in parentheses.
_OF_CALLABLE_LOGGER_PREFIX: Final[str] = f"{__name__}.@source"


# =============================================================================
# DECORATORS
# =============================================================================


def source(f: Callable[[], _RDT]) -> Source[_RDT]:
    """Turn a no-argument ``Callable`` into a :class:`Source`.

    The given callable is validated and then wrapped in a ``Source``
    implementation whose ``draw`` method delegates to it. When applied as a
    decorator, the decorated name is therefore bound to a ``Source``
    instance, and invoking it has the same effect as invoking ``draw`` on
    that instance.

    .. important::

        The decorated callable *MUST NOT* have any required arguments but
        *MUST* return a value (the drawn data).

    .. note::

        The returned objects are real ``Source`` instances and can be
        disposed. Using one after it has been disposed results in a
        :exc:`ResourceDisposedError` being raised.

    .. admonition:: Regarding retry safety
        :class: tip

        The resulting ``Source`` is safe to retry if and only if, the
        decorated callable is safe to retry.

    :param f: The callable to wrap. It *MUST NOT* have any required
        arguments but *MUST* return a value (the drawn data).

    :return: A ``Source`` instance that delegates to ``f``.

    :raise ValueError: If the given value is NOT a ``Callable``.
    """
    # Validate here so that the error message speaks about the decorator's
    # own argument.
    ensure_callable(value=f, message="A callable object is required.")

    wrapped: Source[_RDT] = _SourceOfCallable(delegate_to=f)
    return wrapped


# =============================================================================
# SOURCE IMPLEMENTATIONS
# =============================================================================


@final
class _SourceOfCallable(Source[_RDT], Generic[_RDT]):
    """A :class:`Source` that draws its data by calling a wrapped callable."""

    __slots__ = ("_delegate_to", "_is_disposed", "_logger")

    def __init__(self, delegate_to: _SourceCallable[_RDT]) -> None:
        """Wrap the given callable.

        :param delegate_to: The callable that ``draw`` will invoke. It is
            called without arguments.

        :raise ValueError: If ``delegate_to`` is NOT a callable.
        """
        super().__init__()
        ensure_callable(
            delegate_to,
            message="'delegate_to' MUST be a callable object.",
        )
        self._is_disposed: bool = False
        self._delegate_to: _SourceCallable[_RDT] = delegate_to
        # One logger per wrapped callable, named after the callable.
        logger_name: str = (
            f"{_OF_CALLABLE_LOGGER_PREFIX}({type_fqn(self._delegate_to)})"
        )
        self._logger: Logger = logging.getLogger(logger_name)
        # Copy the callable's metadata (name, docstring, ...) onto this
        # instance.
        update_wrapper(self, self._delegate_to)

    @not_disposed
    @override
    def __enter__(self) -> Self:
        """Enter the runtime context of this source.

        .. admonition:: Don't use after dispose
            :class: error

            Calling this method on a disposed instance (i.e. one whose
            :attr:`is_disposed` property is ``True``) will result in a
            :exc:`ResourceDisposedError` being raised.

        :return: This instance.

        :raise ResourceDisposedError: If this source has already been
            disposed.
        """
        # NOTE(review): the lookup starts *after* ``Source`` in the MRO, so
        # any ``__enter__`` defined on ``Source`` itself is bypassed.
        # Presumably intentional -- confirm against ``sghi.etl.core``.
        return super(Source, self).__enter__()

    @property
    @override
    def is_disposed(self) -> bool:
        """Whether :meth:`dispose` has been called on this instance."""
        return self._is_disposed

    @not_disposed
    @override
    def draw(self) -> _RDT:
        """Call the wrapped callable and return what it returns.

        .. admonition:: Don't use after dispose
            :class: error

            Calling this method on a disposed instance (i.e. one whose
            :attr:`is_disposed` property is ``True``) will result in a
            :exc:`ResourceDisposedError` being raised.

        :return: The drawn, raw data as returned by the wrapped callable.

        :raise ResourceDisposedError: If this source has already been
            disposed.
        """
        delegate = self._delegate_to
        self._logger.info("Delegating to '%s'.", type_fqn(delegate))
        return delegate()

    @override
    def dispose(self) -> None:
        """Mark this source as disposed and log that disposal is done."""
        self._is_disposed = True
        self._logger.info("Disposal complete.")


# =============================================================================
# MODULE EXPORTS
# =============================================================================


# Only the ``source`` decorator is exported; the other module-level names are
# underscore-prefixed and private.
__all__ = [
    "source",
]
95 changes: 95 additions & 0 deletions test/sghi/etl/commons_tests/sources_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# ruff: noqa: D205
"""Tests for the :mod:`sghi.etl.commons.sources` module."""

from __future__ import annotations

from typing import TYPE_CHECKING

import pytest

from sghi.disposable import ResourceDisposedError
from sghi.etl.commons import source
from sghi.etl.core import Source

if TYPE_CHECKING:
from collections.abc import Iterable


def test_source_decorator_delegates_to_the_wrapped_callable() -> None:
    """Invoking a :func:`source` result should yield the callable's data."""

    def supply_ints(count: int = 4) -> Iterable[int]:
        yield from range(count)

    ints_source: Source[Iterable[int]] = source(supply_ints)

    # Draw through the source first, then call the plain callable directly.
    drawn = list(ints_source())
    expected = list(supply_ints())

    assert drawn == expected
    assert expected == [0, 1, 2, 3]


def test_source_decorator_fails_on_non_callable_input_value() -> None:
    """:func:`source` should reject a non-callable value with a
    :exc:`ValueError`.
    """
    expected_message = "A callable object is required."

    with pytest.raises(ValueError, match="callable object") as raised:
        source("Not a function")  # type: ignore

    assert raised.value.args[0] == expected_message


def test_source_decorator_fails_on_a_none_input_value() -> None:
    """:func:`source` should reject ``None`` with a :exc:`ValueError`."""
    expected_message = "A callable object is required."

    with pytest.raises(ValueError, match="callable object") as raised:
        source(None)  # type: ignore

    assert raised.value.args[0] == expected_message


def test_source_decorator_returns_expected_value() -> None:
    """:func:`source` should produce ``Source`` instances, both when used as
    a decorator and when called directly.
    """

    @source
    def supply_ints(count: int = 5) -> Iterable[int]:
        yield from range(count)

    empty_string_supplier: Source[str] = source(str)

    for candidate in (supply_ints, empty_string_supplier):
        assert isinstance(candidate, Source)


def test_source_decorated_value_usage_as_a_context_manager() -> None:
    """A :func:`source` result should work as a context manager: usable
    inside the ``with`` block and disposed once the block exits.
    """

    def supply_ints(count: int = 5) -> Iterable[int]:
        yield from range(count)

    with source(supply_ints) as int_supplier:
        # Materialise the data while the source is still usable.
        drawn: tuple[int, ...] = tuple(int_supplier())

    expected: tuple[int, ...] = (0, 1, 2, 3, 4)
    assert drawn == expected
    assert int_supplier.is_disposed


def test_source_decorated_value_usage_when_is_disposed_fails() -> None:
    """Every way of using a disposed :func:`source` result should raise
    :exc:`ResourceDisposedError`.
    """

    @source
    def supply_ints(count: int = 5) -> Iterable[int]:
        yield from range(count)

    supply_ints.dispose()

    # Deferred so that each attribute lookup and call happens inside its own
    # ``pytest.raises`` block.
    usages = (
        lambda: supply_ints(),
        lambda: supply_ints.draw(),
        lambda: supply_ints.__enter__(),
    )
    for use in usages:
        with pytest.raises(ResourceDisposedError):
            use()

0 comments on commit b42328e

Please sign in to comment.