Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(processors): add a processor decorator #4

Merged
merged 1 commit into from
Apr 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
nitpicky = True

nitpick_ignore = [
("py:class", "Processor"), # docs aren't published yet
("py:class", "TracebackType"), # Used as type annotation. Only available when type checking
("py:class", "concurrent.futures._base.Future"), # sphinx can't find it
("py:class", "sghi.etl.commons.processors._RDT"), # private type annotations
Expand Down
3 changes: 2 additions & 1 deletion src/sghi/etl/commons/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
"""Collection of utilities for working with SGHI ETL Workflows."""

from .processors import NOOPProcessor
from .processors import NOOPProcessor, processor
from .utils import fail_fast, fail_fast_factory, ignored_failed

__all__ = [
"NOOPProcessor",
"fail_fast",
"fail_fast_factory",
"ignored_failed",
"processor",
]
129 changes: 126 additions & 3 deletions src/sghi/etl/commons/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,80 @@
from __future__ import annotations

import logging
from collections.abc import Callable
from functools import update_wrapper
from logging import Logger
from typing import Generic, Self, TypeVar, final
from typing import Final, Generic, Self, TypeVar, final

from typing_extensions import override

from sghi.disposable import not_disposed
from sghi.etl.core import Processor
from sghi.utils import type_fqn
from sghi.utils import ensure_callable, type_fqn

# =============================================================================
# TYPES
# =============================================================================


_PDT = TypeVar("_PDT") # noqa: PYI018
_PDT = TypeVar("_PDT")
"""Type variable representing the data type after processing."""

_RDT = TypeVar("_RDT")
"""Type variable representing the raw data type."""

_ProcessorCallable = Callable[[_RDT], _PDT]


# =============================================================================
# CONSTANTS
# =============================================================================


_OF_CALLABLE_LOGGER_PREFIX: Final[str] = f"{__name__}.@processor"


# =============================================================================
# DECORATORS
# =============================================================================


def processor(f: Callable[[_RDT], _PDT]) -> Processor[_RDT, _PDT]:
    """Convert a ``Callable`` into a :class:`Processor`.

    The given callable is wrapped in a ``Processor`` instance. When used as a
    decorator, invoking the decorated callable has the same effect as
    invoking the ``apply`` method of the resulting ``Processor`` instance.

    .. important::

        The callable *MUST* accept at least one argument but have at *MOST*
        one required argument.

    .. note::

        The returned values are true ``Processor`` instances that can be
        disposed. Once disposed, any attempt to invoke these instances will
        result in a :exc:`ResourceDisposedError` being raised.

    .. admonition:: Regarding retry safety
        :class: tip

        The resulting ``Processor`` is safe to retry if and only if the
        wrapped callable is safe to retry.

    :param f: The callable to wrap. It *MUST* have at *MOST* one required
        argument (the raw data to be processed).

    :return: A ``Processor`` instance that delegates to ``f``.

    :raise ValueError: If ``f`` is NOT a ``Callable``.
    """
    # Validate here so that the error message refers to the decorator's own
    # argument rather than to the wrapper's 'delegate_to' parameter.
    ensure_callable(f, message="A callable object is required.")

    wrapped: Processor[_RDT, _PDT] = _ProcessorOfCallable(delegate_to=f)
    return wrapped


# =============================================================================
# PROCESSOR IMPLEMENTATIONS
Expand Down Expand Up @@ -107,11 +161,80 @@ def dispose(self) -> None:
self._logger.info("Disposal complete.")


class _ProcessorOfCallable(Processor[_RDT, _PDT], Generic[_RDT, _PDT]):
    """A :class:`Processor` that delegates processing to a wrapped callable.

    Instances track their own disposal state; ``apply`` and ``__enter__``
    are guarded by ``not_disposed``.
    """

    __slots__ = ("_delegate_to", "_is_disposed", "_logger")

    def __init__(self, delegate_to: _ProcessorCallable[_RDT, _PDT]) -> None:
        """Initialize a new processor that wraps the given callable.

        :param delegate_to: The callable that ``apply`` forwards the raw
            data to. It is validated with ``ensure_callable``.
        """
        super().__init__()
        ensure_callable(
            value=delegate_to,
            message="'delegate_to' MUST be a callable object.",
        )
        # The logger name embeds whatever ``type_fqn`` returns for the
        # wrapped callable, so each wrapped callable gets its own logger.
        logger_name: str = (
            f"{_OF_CALLABLE_LOGGER_PREFIX}({type_fqn(delegate_to)})"
        )
        self._delegate_to: _ProcessorCallable[_RDT, _PDT] = delegate_to
        self._is_disposed: bool = False
        self._logger: Logger = logging.getLogger(logger_name)
        # Copy the wrapped callable's metadata (name, docstring, etc.) onto
        # this instance, the way a function decorator would.
        update_wrapper(self, delegate_to)

    @not_disposed
    @override
    def __enter__(self) -> Self:
        """Enter the runtime context and return this instance.

        .. admonition:: Don't use after dispose
            :class: error

            Invoking this method on an instance that is disposed (i.e. the
            :attr:`is_disposed` property on the instance is ``True``) will
            result in a :exc:`ResourceDisposedError` being raised.

        :return: This instance.

        :raise ResourceDisposedError: If this processor has already been
            disposed.
        """
        # ``super(Processor, self)`` starts the method lookup *after*
        # ``Processor`` in the MRO.
        # NOTE(review): presumably this is deliberate, to skip
        # ``Processor.__enter__`` - confirm.
        parent_enter = super(Processor, self).__enter__
        return parent_enter()

    @property
    @override
    def is_disposed(self) -> bool:
        """Return ``True`` if ``dispose`` has been called on this instance."""
        return self._is_disposed

    @not_disposed
    @override
    def apply(self, raw_data: _RDT) -> _PDT:
        """Process the given raw data using the wrapped callable.

        .. admonition:: Don't use after dispose
            :class: error

            Invoking this method on an instance that is disposed (i.e. the
            :attr:`is_disposed` property on the instance is ``True``) will
            result in a :exc:`ResourceDisposedError` being raised.

        :param raw_data: The data to be processed.

        :return: The processed data as returned by the wrapped callable.

        :raise ResourceDisposedError: If this processor has already been
            disposed.
        """
        delegate = self._delegate_to
        self._logger.info("Delegating to '%s'.", type_fqn(delegate))
        return delegate(raw_data)

    @override
    def dispose(self) -> None:
        """Mark this processor as disposed and log the disposal."""
        self._is_disposed = True
        self._logger.info("Disposal complete.")


# =============================================================================
# MODULE EXPORTS
# =============================================================================


__all__ = [
"NOOPProcessor",
"processor",
]
104 changes: 100 additions & 4 deletions test/sghi/etl/commons_tests/processors_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,103 @@
import pytest

from sghi.disposable import ResourceDisposedError
from sghi.etl.commons import NOOPProcessor
from sghi.etl.commons import NOOPProcessor, processor
from sghi.etl.core import Processor
from sghi.task import task


def test_processor_decorator_delegates_to_the_wrapped_callable() -> None:
    """Invoking a :func:`processor` wrapped callable should yield the same
    result as invoking the wrapped callable directly.
    """

    def add_100(value: int) -> int:
        return value + 100

    int_to_str: Processor[int, str] = processor(str)
    add_100_processor: Processor[int, int] = processor(add_100)

    # Each case pairs an input with the literal value expected from both
    # the processor and the callable it wraps.
    for value, expected_str in ((3, "3"), (10, "10")):
        assert int_to_str(value) == str(value) == expected_str

    for value, expected_int in ((10, 110), (-10, 90)):
        assert add_100_processor(value) == add_100(value) == expected_int


def test_processor_decorator_fails_on_non_callable_input_value() -> None:
    """:func:`processor` should raise a :exc:`ValueError` when given a
    non-callable value.
    """
    not_a_callable = "Not a function"

    with pytest.raises(ValueError, match="callable object") as exc_info:
        processor(not_a_callable)  # type: ignore

    error_message = exc_info.value.args[0]
    assert error_message == "A callable object is required."


def test_processor_decorator_fails_on_a_none_input_value() -> None:
    """:func:`processor` should raise a :exc:`ValueError` when given
    ``None`` instead of a callable.
    """
    with pytest.raises(ValueError, match="callable object") as exc_info:
        processor(None)  # type: ignore

    error_message = exc_info.value.args[0]
    assert error_message == "A callable object is required."


def test_processor_decorator_returns_expected_value() -> None:
    """:func:`processor` should return a ``Processor`` instance, both when
    applied as a decorator and when called as a plain function.
    """

    # Decorator form.
    @processor
    def int_to_str(value: int) -> str:
        return str(value)

    # Plain function-call form.
    def add_100(value: int) -> int:
        return value + 100

    add_100_processor: Processor[int, int] = processor(add_100)

    for candidate in (int_to_str, add_100_processor):
        assert isinstance(candidate, Processor)


def test_processor_decorated_value_usage_as_a_context_manager() -> None:
    """A :func:`processor` wrapped callable is a valid context manager: it
    is usable inside the ``with`` block and is disposed once the block exits.
    """

    @task
    def add_100(value: int) -> int:
        return value + 100

    @task
    def int_to_str(value: int) -> str:
        return str(value)

    # Compose the two tasks first, then wrap the composition.
    pipeline = add_100 >> int_to_str

    with processor(pipeline) as _processor:
        result: str = _processor(10)
        assert result == "110"

    assert _processor.is_disposed


def test_processor_decorated_value_usage_when_is_disposed_fails() -> None:
    """Calling, applying or entering a :func:`processor` wrapped callable
    after it has been disposed should raise :exc:`ResourceDisposedError`.
    """

    @processor
    def int_to_str(value: int) -> str:
        return str(value)

    int_to_str.dispose()

    # Every "resource-aware" entry point, exercised in the same order:
    # direct call, ``apply`` and ``__enter__``.
    guarded_usages = (
        lambda: int_to_str(10),
        lambda: int_to_str.apply(10),
        lambda: int_to_str.__enter__(),
    )
    for use_disposed in guarded_usages:
        with pytest.raises(ResourceDisposedError):
            use_disposed()


class TestNOOPProcessor(TestCase):
Expand Down Expand Up @@ -65,11 +161,11 @@ def test_usage_as_a_context_manager_behaves_as_expected(self) -> None:
should behave correctly when used as so.
"""
raw_data: list[str] = ["some", "very", "important", "raw", "data"]
with NOOPProcessor() as processor:
clean_data = processor.apply(raw_data)
with NOOPProcessor() as _processor:
clean_data = _processor.apply(raw_data)
assert clean_data is raw_data

assert processor.is_disposed
assert _processor.is_disposed

def test_usage_when_is_disposed_fails(self) -> None:
"""Invoking "resource-aware" methods of a disposed instance should
Expand Down
Loading