Skip to content

Commit

Permalink
feat(sinks): add a NullSink
Browse files Browse the repository at this point in the history
Add `sghi.etl.commons.sinks.NullSink`, a `Sink` that discards all the
data it receives. This is mostly useful as a placeholder or where
consumption of processed data is not required.
  • Loading branch information
kennedykori committed Apr 15, 2024
1 parent e6f08e5 commit 226e640
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 2 deletions.
3 changes: 2 additions & 1 deletion src/sghi/etl/commons/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
"""Collection of utilities for working with SGHI ETL Workflows."""

from .processors import NOOPProcessor, processor
from .sinks import sink
from .sinks import NullSink, sink
from .sources import source
from .utils import fail_fast, fail_fast_factory, ignored_failed

__all__ = [
"NOOPProcessor",
"NullSink",
"fail_fast",
"fail_fast_factory",
"ignored_failed",
Expand Down
75 changes: 75 additions & 0 deletions src/sghi/etl/commons/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,80 @@ def sink(f: Callable[[_PDT], None]) -> Sink[_PDT]:
# =============================================================================


@final
class NullSink(Sink[_PDT], Generic[_PDT]):
"""A :class:`Sink` that discards all the data it receives.
Like to ``dev/null`` on Unix, instances of this ``Sink`` discard all data
drained to them but report the drain operation as successful. This is
mostly useful as a placeholder or where further consumption of processed
data is not required.
.. admonition:: Regarding retry safety
:class: tip
Instances of this ``Sink`` are idempotent and thus inherently safe to
retry.
"""

__slots__ = ("_is_disposed", "_logger")

def __init__(self) -> None:
"""Create a new ``NullSink`` instance."""
super().__init__()
self._is_disposed: bool = False
self._logger: Logger = logging.getLogger(type_fqn(self.__class__))

@not_disposed
@override
def __enter__(self) -> Self:
"""Return ``self`` upon entering the runtime context.
.. admonition:: Don't use after dispose
:class: error
Invoking this method on an instance that is disposed(i.e. the
:attr:`is_disposed` property on the instance is ``True``) will
result in a :exc:`ResourceDisposedError` being raised.
:return: This instance.
:raise ResourceDisposedError: If this sink has already been disposed.
"""
return super(Sink, self).__enter__()

@property
@override
def is_disposed(self) -> bool:
return self._is_disposed

@not_disposed
@override
def drain(self, processed_data: _PDT) -> None:
"""Discard all the received data.
.. admonition:: Don't use after dispose
:class: error
Invoking this method on an instance that is disposed(i.e. the
:attr:`is_disposed` property on the instance is ``True``) will
result in a :exc:`ResourceDisposedError` being raised.
:param processed_data: The processed data to consume/drain.
:return: None.
:raise ResourceDisposedError: If this sink has already been disposed.
"""
self._logger.info("Discarding all received data.")
# Do nothing with the received data.

@override
def dispose(self) -> None:
self._is_disposed = True
self._logger.info("Disposal complete.")


@final
class _SinkOfCallable(Sink[_PDT], Generic[_PDT]):
__slots__ = ("_delegate_to", "_is_disposed", "_logger")
Expand Down Expand Up @@ -152,5 +226,6 @@ def dispose(self) -> None:


__all__ = [
"NullSink",
"sink",
]
70 changes: 69 additions & 1 deletion test/sghi/etl/commons_tests/sinks_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
from __future__ import annotations

from typing import TYPE_CHECKING
from unittest import TestCase

import pytest

from sghi.disposable import ResourceDisposedError
from sghi.etl.commons import sink
from sghi.etl.commons import NullSink, sink
from sghi.etl.core import Sink

if TYPE_CHECKING:
Expand Down Expand Up @@ -98,3 +99,70 @@ def save_ints(values: Iterable[int]) -> None:

with pytest.raises(ResourceDisposedError):
save_ints.__enter__()


class TestNullSink(TestCase):
"""Tests for the :class:`sghi.etl.commons.NullSInk` class."""

def test_dispose_has_the_intended_side_effects(self) -> None:
"""Calling :meth:`NullSink.dispose` should result in the
:attr:`NullSink.is_disposed` property being set to ``True``.
"""
instance = NullSink()
instance.dispose()

assert instance.is_disposed

def test_multiple_dispose_invocations_is_okay(self) -> None:
"""Calling :meth:`NullSink.dispose` should be okay.
No errors should be raised and the object should remain disposed.
"""
instance = NullSink()

for _ in range(10):
try:
instance.dispose()
except Exception as exc: # noqa: BLE001
fail_reason: str = (
"Calling 'NullSink.dispose()' multiple times should be "
f"okay. But the following error was raised: {exc!s}"
)
pytest.fail(fail_reason)

assert instance.is_disposed

def test_usage_as_a_context_manager_behaves_as_expected(self) -> None:
""":class:`NullSink` instances are valid context managers and should
behave correctly when used as so.
"""
processed_data: list[str] = [
"some",
"very",
"important",
"processed",
"data",
]
with NullSink() as _sink:
_sink.drain(processed_data)

assert _sink.is_disposed

def test_usage_when_is_disposed_fails(self) -> None:
"""Invoking "resource-aware" methods of a disposed instance should
result in an :exc:`ResourceDisposedError` being raised.
Specifically, invoking the following two methods on a disposed instance
should fail:
- :meth:`NullSink.__enter__`
- :meth:`NullSink.apply`
"""
instance = NullSink()
instance.dispose()

with pytest.raises(ResourceDisposedError):
instance.drain("some processed data.")

with pytest.raises(ResourceDisposedError):
instance.__enter__()

0 comments on commit 226e640

Please sign in to comment.