From 226e6401206d30b1ca2a20e95286daed96b86eac Mon Sep 17 00:00:00 2001 From: Kennedy Kori Date: Mon, 15 Apr 2024 11:37:53 +0300 Subject: [PATCH] feat(sinks): add a `NullSink` Add `sghi.etl.commons.sinks.NullSink`, a `Sink` that discards all the data it receives. This is mostly useful as a placeholder or where consumption of processed data is not required. --- src/sghi/etl/commons/__init__.py | 3 +- src/sghi/etl/commons/sinks.py | 75 ++++++++++++++++++++++ test/sghi/etl/commons_tests/sinks_tests.py | 70 +++++++++++++++++++- 3 files changed, 146 insertions(+), 2 deletions(-) diff --git a/src/sghi/etl/commons/__init__.py b/src/sghi/etl/commons/__init__.py index 1e2e842..494eac8 100644 --- a/src/sghi/etl/commons/__init__.py +++ b/src/sghi/etl/commons/__init__.py @@ -1,12 +1,13 @@ """Collection of utilities for working with SGHI ETL Workflows.""" from .processors import NOOPProcessor, processor -from .sinks import sink +from .sinks import NullSink, sink from .sources import source from .utils import fail_fast, fail_fast_factory, ignored_failed __all__ = [ "NOOPProcessor", + "NullSink", "fail_fast", "fail_fast_factory", "ignored_failed", diff --git a/src/sghi/etl/commons/sinks.py b/src/sghi/etl/commons/sinks.py index 3651954..d883d3a 100644 --- a/src/sghi/etl/commons/sinks.py +++ b/src/sghi/etl/commons/sinks.py @@ -79,6 +79,80 @@ def sink(f: Callable[[_PDT], None]) -> Sink[_PDT]: # ============================================================================= +@final +class NullSink(Sink[_PDT], Generic[_PDT]): + """A :class:`Sink` that discards all the data it receives. + + Like to ``dev/null`` on Unix, instances of this ``Sink`` discard all data + drained to them but report the drain operation as successful. This is + mostly useful as a placeholder or where further consumption of processed + data is not required. + + .. admonition:: Regarding retry safety + :class: tip + + Instances of this ``Sink`` are idempotent and thus inherently safe to + retry. + """ + + __slots__ = ("_is_disposed", "_logger") + + def __init__(self) -> None: + """Create a new ``NullSink`` instance.""" + super().__init__() + self._is_disposed: bool = False + self._logger: Logger = logging.getLogger(type_fqn(self.__class__)) + + @not_disposed + @override + def __enter__(self) -> Self: + """Return ``self`` upon entering the runtime context. + + .. admonition:: Don't use after dispose + :class: error + + Invoking this method on an instance that is disposed(i.e. the + :attr:`is_disposed` property on the instance is ``True``) will + result in a :exc:`ResourceDisposedError` being raised. + + :return: This instance. + + :raise ResourceDisposedError: If this sink has already been disposed. + """ + return super(Sink, self).__enter__() + + @property + @override + def is_disposed(self) -> bool: + return self._is_disposed + + @not_disposed + @override + def drain(self, processed_data: _PDT) -> None: + """Discard all the received data. + + .. admonition:: Don't use after dispose + :class: error + + Invoking this method on an instance that is disposed(i.e. the + :attr:`is_disposed` property on the instance is ``True``) will + result in a :exc:`ResourceDisposedError` being raised. + + :param processed_data: The processed data to consume/drain. + + :return: None. + + :raise ResourceDisposedError: If this sink has already been disposed. + """ + self._logger.info("Discarding all received data.") + # Do nothing with the received data. + + @override + def dispose(self) -> None: + self._is_disposed = True + self._logger.info("Disposal complete.") + + @final class _SinkOfCallable(Sink[_PDT], Generic[_PDT]): __slots__ = ("_delegate_to", "_is_disposed", "_logger") @@ -152,5 +226,6 @@ def dispose(self) -> None: __all__ = [ + "NullSink", "sink", ] diff --git a/test/sghi/etl/commons_tests/sinks_tests.py b/test/sghi/etl/commons_tests/sinks_tests.py index 069bb4b..1a5d22f 100644 --- a/test/sghi/etl/commons_tests/sinks_tests.py +++ b/test/sghi/etl/commons_tests/sinks_tests.py @@ -4,11 +4,12 @@ from __future__ import annotations from typing import TYPE_CHECKING +from unittest import TestCase import pytest from sghi.disposable import ResourceDisposedError -from sghi.etl.commons import sink +from sghi.etl.commons import NullSink, sink from sghi.etl.core import Sink if TYPE_CHECKING: @@ -98,3 +99,70 @@ def save_ints(values: Iterable[int]) -> None: with pytest.raises(ResourceDisposedError): save_ints.__enter__() + + +class TestNullSink(TestCase): + """Tests for the :class:`sghi.etl.commons.NullSInk` class.""" + + def test_dispose_has_the_intended_side_effects(self) -> None: + """Calling :meth:`NullSink.dispose` should result in the + :attr:`NullSink.is_disposed` property being set to ``True``. + """ + instance = NullSink() + instance.dispose() + + assert instance.is_disposed + + def test_multiple_dispose_invocations_is_okay(self) -> None: + """Calling :meth:`NullSink.dispose` should be okay. + + No errors should be raised and the object should remain disposed. + """ + instance = NullSink() + + for _ in range(10): + try: + instance.dispose() + except Exception as exc: # noqa: BLE001 + fail_reason: str = ( + "Calling 'NullSink.dispose()' multiple times should be " + f"okay. But the following error was raised: {exc!s}" + ) + pytest.fail(fail_reason) + + assert instance.is_disposed + + def test_usage_as_a_context_manager_behaves_as_expected(self) -> None: + """:class:`NullSink` instances are valid context managers and should + behave correctly when used as so. + """ + processed_data: list[str] = [ + "some", + "very", + "important", + "processed", + "data", + ] + with NullSink() as _sink: + _sink.drain(processed_data) + + assert _sink.is_disposed + + def test_usage_when_is_disposed_fails(self) -> None: + """Invoking "resource-aware" methods of a disposed instance should + result in an :exc:`ResourceDisposedError` being raised. + + Specifically, invoking the following two methods on a disposed instance + should fail: + + - :meth:`NullSink.__enter__` + - :meth:`NullSink.apply` + """ + instance = NullSink() + instance.dispose() + + with pytest.raises(ResourceDisposedError): + instance.drain("some processed data.") + + with pytest.raises(ResourceDisposedError): + instance.__enter__()