Skip to content

Commit

Permalink
feat(processors): add a NOOP Processor
Browse files Browse the repository at this point in the history
Add a new `sghi.etl.core.Processor` implementation, `NOOPProcessor`.
This processor simply returns any data it receives without any
modifications. It can be used as a placeholder or for situations where
data transformation is not needed.
  • Loading branch information
kennedykori committed Apr 14, 2024
1 parent 6bee92c commit 7ad1deb
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 1 deletion.
11 changes: 11 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,19 @@
nitpicky = True

nitpick_ignore = [
("py:class", "TracebackType"), # Used as type annotation. Only available when type checking
("py:class", "concurrent.futures._base.Future"), # sphinx can't find it
("py:class", "sghi.etl.commons.processors._RDT"), # private type annotations
("py:class", "sghi.etl.commons.processors._PDT"), # private type annotations
("py:class", "sghi.etl.commons.utils.result_gatherers._T"), # private type annotations
("py:class", "sghi.etl.core._RDT"), # private type annotations
("py:class", "sghi.etl.core._PDT"), # private type annotations
("py:class", "sghi.etl.core.Processor"), # docs aren't published yet
("py:exc", "ResourceDisposedError"), # docs aren't published yet
("py:exc", "sghi.disposable.ResourceDisposedError"), # docs aren't published yet
("py:func", "sghi.disposable.not_disposed"), # docs aren't published yet
("py:obj", "sghi.etl.commons.processors._PDT"), # private type annotations
("py:obj", "sghi.etl.commons.processors._RDT"), # private type annotations
]

templates_path = ["templates"]
Expand Down
12 changes: 11 additions & 1 deletion src/sghi/etl/commons/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,11 @@
"""Collection of utilities for working with SGHI ETL Worflows."""
"""Collection of utilities for working with SGHI ETL Workflows."""

from .processors import NOOPProcessor
from .utils import fail_fast, fail_fast_factory, ignored_failed

__all__ = [
"NOOPProcessor",
"fail_fast",
"fail_fast_factory",
"ignored_failed",
]
117 changes: 117 additions & 0 deletions src/sghi/etl/commons/processors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""Common :class:`~sghi.etl.core.Processor` implementations."""

from __future__ import annotations

import logging
from logging import Logger
from typing import Generic, Self, TypeVar, final

from typing_extensions import override

from sghi.disposable import not_disposed
from sghi.etl.core import Processor
from sghi.utils import type_fqn

# =============================================================================
# TYPES
# =============================================================================


_PDT = TypeVar("_PDT") # noqa: PYI018
""""Type variable representing the data type after processing."""

_RDT = TypeVar("_RDT")
"""Type variable representing the raw data type."""


# =============================================================================
# PROCESSOR IMPLEMENTATIONS
# =============================================================================


@final
class NOOPProcessor(Processor[_RDT, _RDT], Generic[_RDT]):
"""A ``Processor`` that simply returns the received raw data as it is.
This :class:`~sghi.etl.core.Processor` subclass is a "no-operation" (NOOP)
processor. When the :meth:`apply` method is invoked on its instances, it
returns the received raw data (without any modifications) as processed data
to its caller. This can be useful as a placeholder processor or for
situations where data transformation is not needed.
.. admonition:: Regarding retry safety
:class: tip
Instances of this ``Processor`` are idempotent and thus inherently safe
to retry.
"""

__slots__ = ("_is_disposed", "_logger")

def __init__(self) -> None:
"""Create a ``NOOPProcessor`` instance."""
self._is_disposed: bool = False
self._logger: Logger = logging.getLogger(type_fqn(self.__class__))

@not_disposed
@override
def __enter__(self) -> Self:
"""Return ``self`` upon entering the runtime context.
.. admonition:: Don't use after dispose
:class: error
Invoking this method on an instance that is disposed(i.e. the
:attr:`is_disposed` property on the instance is ``True``) will
result in a :exc:`ResourceDisposedError` being raised.
:return: This instance.
:raise ResourceDisposedError: If this processor has already been
disposed.
"""
return super(Processor, self).__enter__()

@property
@override
def is_disposed(self) -> bool:
return self._is_disposed

@not_disposed
@override
def apply(self, raw_data: _RDT) -> _RDT:
"""Apply the processing logic (NOOP in this case).
Return the received raw data without any modifications to the caller.
.. admonition:: Don't use after dispose
:class: error
Invoking this method on an instance that is disposed(i.e. the
:attr:`is_disposed` property on the instance is ``True``) will
result in a :exc:`ResourceDisposedError` being raised.
:param raw_data: The data to be processed.
:return: The raw data itself (without any modifications).
:raise ResourceDisposedError: If this processor has already been
disposed.
"""
self._logger.info("Skipping data processing. Return raw data as is.")
return raw_data

@override
def dispose(self) -> None:
self._is_disposed = True
self._logger.info("Disposal complete.")


# =============================================================================
# MODULE EXPORTS
# =============================================================================


__all__ = [
"NOOPProcessor",
]
91 changes: 91 additions & 0 deletions test/sghi/etl/commons_tests/processors_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# ruff: noqa: D205
"""Tests for the :module:`sghi.etl.commons.processors` module."""

from __future__ import annotations

from unittest import TestCase

import pytest

from sghi.disposable import ResourceDisposedError
from sghi.etl.commons import NOOPProcessor


class TestNOOPProcessor(TestCase):
"""Tests for the :class:`sghi.etl.commons.NOOPProcessor` class."""

def test_apply_returns_the_expected_value(self) -> None:
""":meth:`NOOPProcessor.apply` should return its argument without any
modifications.
"""
raw_data1: list[str] = ["some", "very", "important", "raw", "data"]
raw_data2: str = "some very important raw data"
raw_data3: int = 37
raw_data4: str | None = None

instance = NOOPProcessor()

assert instance.apply(raw_data1) is raw_data1
assert instance.apply(raw_data2) is raw_data2
assert instance.apply(raw_data3) == raw_data3
assert instance.apply(raw_data4) is None

instance.dispose()

def test_dispose_has_the_intended_side_effects(self) -> None:
"""Calling :meth:`NOOPProcessor.dispose` should result in the
:attr:`NOOPProcessor.is_disposed` property being set to ``True``.
"""
instance = NOOPProcessor()
instance.dispose()

assert instance.is_disposed

def test_multiple_dispose_invocations_is_okay(self) -> None:
"""Calling :meth:`NOOPProcessor.dispose` should be okay.
No errors should be raised and the object should remain disposed.
"""
instance = NOOPProcessor()

for _ in range(10):
try:
instance.dispose()
except Exception as exc: # noqa: BLE001
fail_reason: str = (
"Calling 'NOOPProcessor.dispose()' multiple times should "
f"be okay. But the following error was raised: {exc!s}"
)
pytest.fail(fail_reason)

assert instance.is_disposed

def test_usage_as_a_context_manager_behaves_as_expected(self) -> None:
""":class:`NOOPProcessor` instances are valid context managers and
should behave correctly when used as so.
"""
raw_data: list[str] = ["some", "very", "important", "raw", "data"]
with NOOPProcessor() as processor:
clean_data = processor.apply(raw_data)
assert clean_data is raw_data

assert processor.is_disposed

def test_usage_when_is_disposed_fails(self) -> None:
"""Invoking "resource-aware" methods of a disposed instance should
result in an :exc:`ResourceDisposedError` being raised.
Specifically, invoking the following two methods on a disposed instance
should fail:
- :meth:`NOOPProcessor.__enter__`
- :meth:`NOOPProcessor.apply`
"""
instance = NOOPProcessor()
instance.dispose()

with pytest.raises(ResourceDisposedError):
instance.apply("some raw data.")

with pytest.raises(ResourceDisposedError):
instance.__enter__()

0 comments on commit 7ad1deb

Please sign in to comment.