Skip to content

Commit

Permalink
feat(sinks): add a sink decorator
Browse files Browse the repository at this point in the history
Add `sghi.etl.commons.sinks.sink`, a decorator that marks a callable as
a `Sink`. The decorated callable *MUST* accept at least one argument but
have at *MOST* one required argument (the processed data to drain/consume).
  • Loading branch information
kennedykori committed Apr 14, 2024
1 parent 9e5d6c6 commit d3925b9
Show file tree
Hide file tree
Showing 5 changed files with 263 additions and 1 deletion.
4 changes: 4 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,26 @@

# Cross-reference targets that Sphinx should not warn about when they cannot
# be resolved. Each entry is a ``(role, target)`` pair; the trailing comment on
# each entry records why the target is unresolvable.
nitpick_ignore = [
    ("py:class", "Processor"),  # docs aren't published yet
    ("py:class", "Sink"),  # docs aren't published yet
    ("py:class", "Source"),  # docs aren't published yet
    ("py:class", "TracebackType"),  # Used as type annotation. Only available when type checking
    ("py:class", "concurrent.futures._base.Future"),  # sphinx can't find it
    ("py:class", "sghi.etl.commons.processors._RDT"),  # private type annotations
    ("py:class", "sghi.etl.commons.processors._PDT"),  # private type annotations
    ("py:class", "sghi.etl.commons.sinks._PDT"),  # private type annotations
    ("py:class", "sghi.etl.commons.sources._RDT"),  # private type annotations
    ("py:class", "sghi.etl.commons.utils.result_gatherers._T"),  # private type annotations
    ("py:class", "sghi.etl.core._RDT"),  # private type annotations
    ("py:class", "sghi.etl.core._PDT"),  # private type annotations
    ("py:class", "sghi.etl.core.Processor"),  # docs aren't published yet
    ("py:class", "sghi.etl.core.Sink"),  # docs aren't published yet
    ("py:class", "sghi.etl.core.Source"),  # docs aren't published yet
    ("py:exc", "ResourceDisposedError"),  # docs aren't published yet
    ("py:exc", "sghi.disposable.ResourceDisposedError"),  # docs aren't published yet
    ("py:func", "sghi.disposable.not_disposed"),  # docs aren't published yet
    ("py:obj", "sghi.etl.commons.processors._PDT"),  # private type annotations
    ("py:obj", "sghi.etl.commons.processors._RDT"),  # private type annotations
    ("py:obj", "sghi.etl.commons.sinks._PDT"),  # private type annotations
    ("py:obj", "sghi.etl.commons.sources._RDT"),  # private type annotations
]

Expand Down
2 changes: 2 additions & 0 deletions src/sghi/etl/commons/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Collection of utilities for working with SGHI ETL Workflows."""

from .processors import NOOPProcessor, processor
from .sinks import sink
from .sources import source
from .utils import fail_fast, fail_fast_factory, ignored_failed

Expand All @@ -10,5 +11,6 @@
"fail_fast_factory",
"ignored_failed",
"processor",
"sink",
"source",
]
156 changes: 156 additions & 0 deletions src/sghi/etl/commons/sinks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
"""Common :class:`~sghi.etl.core.Sink` implementations."""

from __future__ import annotations

import logging
from collections.abc import Callable
from functools import update_wrapper
from logging import Logger
from typing import Final, Generic, Self, TypeVar, final

from typing_extensions import override

from sghi.disposable import not_disposed
from sghi.etl.core import Sink
from sghi.utils import ensure_callable, type_fqn

# =============================================================================
# TYPES
# =============================================================================


_PDT = TypeVar("_PDT")
"""Type variable representing the data type after processing."""

# A callable that accepts the processed data and returns nothing. This is the
# shape of callable that the ``Sink`` implementation in this module wraps.
_SinkCallable = Callable[[_PDT], None]


# =============================================================================
# CONSTANTS
# =============================================================================


# Prefix of the logger name used by the ``Sink`` instances that wrap a
# callable. The ``type_fqn`` of the wrapped callable is appended to it, in
# parentheses, when each instance creates its logger.
_OF_CALLABLE_LOGGER_PREFIX: Final[str] = f"{__name__}.@sink"


# =============================================================================
# DECORATORS
# =============================================================================


def sink(f: Callable[[_PDT], None]) -> Sink[_PDT]:
    """Turn a ``Callable`` into a :class:`Sink`.

    The given callable is wrapped in a ``Sink`` instance and that instance is
    what gets returned. Consequently, when ``sink`` is applied as a
    decorator, calling the decorated name has the same effect as calling the
    ``drain`` method of the resulting ``Sink``.

    .. important::

        The decorated callable *MUST* accept at least one argument but have
        at *MOST* one required argument (the processed data to
        drain/consume).

    .. note::

        The returned values are real ``Sink`` instances and can therefore be
        disposed. Any attempt to invoke an instance after it has been
        disposed results in a :exc:`ResourceDisposedError` being raised.

    .. admonition:: Regarding retry safety
        :class: tip

        The returned ``Sink`` is safe to retry if and only if the decorated
        callable is itself safe to retry.

    :param f: The callable to wrap. It *MUST* have at *MOST* one required
        argument (the processed data to drain/consume).

    :return: A ``Sink`` instance that delegates to ``f``.

    :raise ValueError: If ``f`` is NOT a ``Callable``.
    """
    # Reject non-callables up front, using the decorator's own error message.
    ensure_callable(value=f, message="A callable object is required.")

    wrapped: Sink[_PDT] = _SourceOfCallable(delegate_to=f)
    return wrapped


# =============================================================================
# SINK IMPLEMENTATIONS
# =============================================================================


@final
class _SinkOfCallable(Sink[_PDT], Generic[_PDT]):
    """A :class:`Sink` that delegates draining to a wrapped callable.

    This is the implementation returned by the :func:`sink` decorator. Each
    instance tracks its own disposal state; once disposed, both ``drain`` and
    ``__enter__`` raise a :exc:`ResourceDisposedError`.
    """

    __slots__ = ("_delegate_to", "_is_disposed", "_logger")

    def __init__(self, delegate_to: _SinkCallable[_PDT]) -> None:
        """Initialize a new sink that wraps the given callable.

        :param delegate_to: The callable that will consume the processed
            data handed to this sink.

        :raise ValueError: If ``delegate_to`` is NOT a callable object.
        """
        super().__init__()
        ensure_callable(
            value=delegate_to,
            message="'delegate_to' MUST be a callable object.",
        )
        self._delegate_to: _SinkCallable[_PDT] = delegate_to
        self._is_disposed: bool = False
        self._logger: Logger = logging.getLogger(
            f"{_OF_CALLABLE_LOGGER_PREFIX}({type_fqn(self._delegate_to)})"
        )
        # Copy the wrapped callable's metadata (name, docstring, etc.) onto
        # this instance so that it presents itself like the callable it wraps.
        update_wrapper(self, self._delegate_to)

    @not_disposed
    @override
    def __enter__(self) -> Self:
        """Return ``self`` upon entering the runtime context.

        .. admonition:: Don't use after dispose
            :class: error

            Invoking this method on an instance that is disposed (i.e. the
            :attr:`is_disposed` property on the instance is ``True``) will
            result in a :exc:`ResourceDisposedError` being raised.

        :return: This instance.

        :raise ResourceDisposedError: If this sink has already been disposed.
        """
        # NOTE(review): ``super(Sink, self)`` starts the lookup *after*
        # ``Sink`` in the MRO, so any ``Sink.__enter__`` is bypassed here.
        # This looks deliberate; confirm before simplifying it to ``super()``.
        return super(Sink, self).__enter__()

    @property
    @override
    def is_disposed(self) -> bool:
        """Whether :meth:`dispose` has been invoked on this instance."""
        return self._is_disposed

    @not_disposed
    @override
    def drain(self, processed_data: _PDT) -> None:
        """Delegate consumption of the processed data to the wrapped callable.

        .. admonition:: Don't use after dispose
            :class: error

            Invoking this method on an instance that is disposed (i.e. the
            :attr:`is_disposed` property on the instance is ``True``) will
            result in a :exc:`ResourceDisposedError` being raised.

        :param processed_data: The processed data to consume/drain.

        :return: None.

        :raise ResourceDisposedError: If this sink has already been disposed.
        """
        self._logger.info("Delegating to '%s'.", type_fqn(self._delegate_to))
        self._delegate_to(processed_data)

    @override
    def dispose(self) -> None:
        """Mark this sink as disposed and log that disposal is complete."""
        self._is_disposed = True
        self._logger.info("Disposal complete.")


# The class was originally named ``_SourceOfCallable``, a misleading name for
# a ``Sink`` implementation. This alias keeps existing references to the
# former name (such as the one in the ``sink`` decorator) working.
_SourceOfCallable = _SinkOfCallable


# =============================================================================
# MODULE EXPORTS
# =============================================================================


# Only the decorator is exported; the ``Sink`` implementation class defined in
# this module is private.
__all__ = [
    "sink",
]
100 changes: 100 additions & 0 deletions test/sghi/etl/commons_tests/sinks_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# ruff: noqa: D205
"""Tests for the :mod:`sghi.etl.commons.sinks` module."""

from __future__ import annotations

from typing import TYPE_CHECKING

import pytest

from sghi.disposable import ResourceDisposedError
from sghi.etl.commons import sink
from sghi.etl.core import Sink

if TYPE_CHECKING:
from collections.abc import Iterable, MutableSequence


def test_sink_decorator_delegates_to_the_wrapped_callable() -> None:
    """Invoking a :func:`sink` result should call the wrapped callable."""
    collected: MutableSequence[int] = []

    def collect(values: Iterable[int]) -> None:
        collected.extend(values)

    consumer: Sink[Iterable[int]] = sink(collect)
    consumer(range(5))

    expected = [0, 1, 2, 3, 4]
    assert collected == expected


def test_sink_decorator_fails_on_non_callable_input_value() -> None:
    """:func:`sink` should raise a :exc:`ValueError` when given a
    non-callable value.
    """
    expected_message = "A callable object is required."

    with pytest.raises(ValueError, match="callable object") as exc_info:
        sink("Not a function")  # type: ignore

    failure = exc_info.value
    assert failure.args[0] == expected_message


def test_sink_decorator_fails_on_a_none_input_value() -> None:
    """:func:`sink` should reject ``None`` by raising a :exc:`ValueError`
    that carries the decorator's error message.
    """
    expected_message = "A callable object is required."

    with pytest.raises(ValueError, match="callable object") as exc_info:
        sink(None)  # type: ignore

    failure = exc_info.value
    assert failure.args[0] == expected_message


def test_sink_decorator_returns_expected_value() -> None:
    """:func:`sink` should return a ``Sink`` instance, whether it is applied
    with decorator syntax or called directly.
    """
    store: MutableSequence[int] = []

    @sink
    def save_ints(values: Iterable[int]) -> None:
        store.extend(values)

    print_all: Sink[str] = sink(print)

    for candidate in (save_ints, print_all):
        assert isinstance(candidate, Sink)


def test_sink_decorated_value_usage_as_a_context_manager() -> None:
    """A :func:`sink` result should work as a context manager: it drains
    inside the ``with`` block and is disposed once the block exits.
    """
    collected: MutableSequence[int] = []

    def collect(values: Iterable[int]) -> None:
        collected.extend(values)

    managed = sink(collect)
    with managed as ints_consumer:
        ints_consumer(range(5))

    expected = [0, 1, 2, 3, 4]
    assert collected == expected
    # Checked on the object returned by ``__enter__``, as the test always has.
    assert ints_consumer.is_disposed


def test_sink_decorated_value_usage_when_is_disposed_fails() -> None:
    """Every way of using a disposed :func:`sink` result should raise
    :exc:`ResourceDisposedError`.
    """
    collected: MutableSequence[int] = []

    @sink
    def save_ints(values: Iterable[int]) -> None:
        collected.extend(values)

    save_ints.dispose()

    # Checked in order: a direct call, an explicit ``drain`` and entering the
    # runtime context.
    attempts = (
        lambda: save_ints(range(5)),
        lambda: save_ints.drain(range(5)),
        lambda: save_ints.__enter__(),
    )
    for attempt in attempts:
        with pytest.raises(ResourceDisposedError):
            attempt()
2 changes: 1 addition & 1 deletion test/sghi/etl/commons_tests/sources_tests.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# ruff: noqa: D205
"""Tests for the :module:`sghi.etl.commons.processors` module."""
"""Tests for the :mod:`sghi.etl.commons.sources` module."""

from __future__ import annotations

Expand Down

0 comments on commit d3925b9

Please sign in to comment.