Skip to content

Commit

Permalink
feat(processors): add a ProcessorPipe
Browse files Browse the repository at this point in the history
Add `sghi.etl.commons.processors.ProcessorPipe`, a `Processor` that
pipes the raw data applied to it through a series of other `Processor`
instances, passing the output of one `Processor` as the input to the
next.
  • Loading branch information
kennedykori committed Apr 15, 2024
1 parent 2794f6a commit 704a302
Show file tree
Hide file tree
Showing 5 changed files with 311 additions and 7 deletions.
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
("py:class", "sghi.etl.core.Processor"), # docs aren't published yet
("py:class", "sghi.etl.core.Sink"), # docs aren't published yet
("py:class", "sghi.etl.core.Source"), # docs aren't published yet
("py:class", "sghi.retry.Retry"), # docs aren't published yet
("py:exc", "ResourceDisposedError"), # docs aren't published yet
("py:exc", "sghi.disposable.ResourceDisposedError"), # docs aren't published yet
("py:func", "sghi.disposable.not_disposed"), # docs aren't published yet
Expand Down
9 changes: 8 additions & 1 deletion src/sghi/etl/commons/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
"""Collection of utilities for working with SGHI ETL Workflows."""

from .processors import NOOPProcessor, processor
from .processors import (
NOOPProcessor,
ProcessorPipe,
pipe_processors,
processor,
)
from .sinks import NullSink, sink
from .sources import source
from .utils import fail_fast, fail_fast_factory, ignored_failed

__all__ = [
"NOOPProcessor",
"NullSink",
"ProcessorPipe",
"fail_fast",
"fail_fast_factory",
"ignored_failed",
"pipe_processors",
"processor",
"sink",
"source",
Expand Down
175 changes: 172 additions & 3 deletions src/sghi/etl/commons/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@
from __future__ import annotations

import logging
from collections.abc import Callable
from collections.abc import Callable, Sequence
from contextlib import ExitStack
from functools import update_wrapper
from logging import Logger
from typing import Final, Generic, Self, TypeVar, final
from typing import Any, Final, Generic, Self, TypeVar, final

from typing_extensions import override

from sghi.disposable import not_disposed
from sghi.etl.core import Processor
from sghi.utils import ensure_callable, type_fqn
from sghi.retry import Retry, noop_retry
from sghi.task import Task, pipe, task
from sghi.utils import ensure_callable, ensure_not_none_nor_empty, type_fqn

# =============================================================================
# TYPES
Expand Down Expand Up @@ -161,6 +164,170 @@ def dispose(self) -> None:
self._logger.info("Disposal complete.")


@final
class ProcessorPipe(Processor[_RDT, _PDT], Generic[_RDT, _PDT]):
"""A :class:`Processor` that pipes raw data to other embedded processors.
This ``Processor`` pipes the raw data applied to it through a series of
other ``Processor`` instances, passing the output of one ``Processor`` as
the input to the next. If an unhandled error occurs in one of the embedded
processors, the entire pipeline fails and propagates the error to the
caller.
Instances of this class are **NOT SAFE** to retry and **SHOULD NEVER** be
retried. However, they do support retrying their embedded processors. This
is disabled by default but can be enabled by providing a suitable value to
the ``retry_policy_factory`` constructor parameter when creating new
instances. When enabled, each embedded processor will be retried
individually per the specified retry policy in case it fails.
Disposing instances of this class also disposes of their embedded
processors.
.. admonition:: Regarding retry safety
:class: tip
Instances of this ``Processor`` are **NOT SAFE** to retry.
"""

__slots__ = (
"_processors",
"_retry_policy_factory",
"_is_disposed",
"_logger",
"_exit_stack",
"_prepped_processors",
)

def __init__(
self,
processors: Sequence[Processor[Any, Any]],
retry_policy_factory: Callable[[], Retry] = noop_retry,
) -> None:
"""Create a new ``ProcessorPipe`` instance with the given properties.
:param processors: A ``Sequence`` of processors to pipe the raw data
applied to this processor. This *MUST NOT* be empty.
:param retry_policy_factory: A function that supplies retry policy
instance(s) to apply to each embedded processor. This MUST be a
callable object. Defaults to a factory that returns retry policies
that do nothing.
:raise ValueError: If ``processors`` is ``None`` or empty, or if
``retry_policy_factory`` is NOT a callable object.
"""
super().__init__()
ensure_not_none_nor_empty(
value=processors,
message="'processors' MUST NOT be None or empty.",
)
self._processors: Sequence[Processor[Any, Any]]
self._processors = tuple(processors)
self._retry_policy_factory: Callable[[], Retry] = ensure_callable(
value=retry_policy_factory,
message="'retry_policy_factory' MUST be a callable.",
)
self._is_disposed: bool = False
self._logger: Logger = logging.getLogger(type_fqn(self.__class__))
self._exit_stack: ExitStack = ExitStack()

# Prepare embedded processors for execution by ensuring that they are
# all disposed of properly once this object is disposed.
self._prepped_processors: Sequence[Task[Any, Any]] = tuple(
self._processor_to_task(self._exit_stack.push(_processor))
for _processor in self._processors
)

@not_disposed
@override
def __enter__(self) -> Self:
"""Return ``self`` upon entering the runtime context.
.. admonition:: Don't use after dispose
:class: error
Invoking this method on an instance that is disposed(i.e. the
:attr:`is_disposed` property on the instance is ``True``) will
result in a :exc:`ResourceDisposedError` being raised.
:return: This instance.
:raise ResourceDisposedError: If this processor has already been
disposed.
"""
return super(Processor, self).__enter__()

@property
@override
def is_disposed(self) -> bool:
return self._is_disposed

@not_disposed
@override
def apply(self, raw_data: _RDT) -> _PDT:
"""Pipe the given raw data through all the embedded processors.
The output of each embedded ``Processor`` becomes the input to the next
one. The result of the final ``Processor`` is the output of this apply
operation. If an unhandled error occurs in one of the embedded
processors, the entire operation fails and propagates the error to the
caller.
.. admonition:: Don't use after dispose
:class: error
Invoking this method on an instance that is disposed(i.e. the
:attr:`is_disposed` property on the instance is ``True``) will
result in a :exc:`ResourceDisposedError` being raised.
:param raw_data: The data to be processed.
:return: The processed data after being piped through the embedded
processors.
:raise ResourceDisposedError: If this processor has already been
disposed.
"""
self._logger.info("Piping received data through all processors.")
return pipe(*self._prepped_processors).execute(raw_data)

@override
def dispose(self) -> None:
"""Release any underlying resources contained by this processor.
All embedded processors are also disposed. After this method returns
successfully, the :attr:`is_disposed` property should return ``True``.
.. note::
Unless otherwise specified, trying to use methods of a
``Disposable`` instance decorated with the
:func:`~sghi.disposable.not_disposed` decorator after this method
returns should generally be considered a programming error and
should result in a :exc:`~sghi.disposable.ResourceDisposedError`
being raised.
This method should be idempotent allowing it to be called more
than once; only the first call, however, should have an effect.
:return: None.
"""
self._is_disposed = True
self._exit_stack.close()
self._logger.info("Disposal complete.")

def _processor_to_task(self, p: Processor[_RDT, _PDT]) -> Task[_RDT, _PDT]:
@task
def do_apply(raw_data: _RDT) -> _PDT:
with p as _p:
apply = self._retry_policy_factory().retry(_p.apply)
return apply(raw_data)

return do_apply


pipe_processors = ProcessorPipe


@final
class _ProcessorOfCallable(Processor[_RDT, _PDT], Generic[_RDT, _PDT]):
__slots__ = ("_delegate_to", "_is_disposed", "_logger")
Expand Down Expand Up @@ -237,5 +404,7 @@ def dispose(self) -> None:

__all__ = [
"NOOPProcessor",
"ProcessorPipe",
"pipe_processors",
"processor",
]
131 changes: 129 additions & 2 deletions test/sghi/etl/commons_tests/processors_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,20 @@

from __future__ import annotations

from typing import TYPE_CHECKING
from unittest import TestCase

import pytest
from typing_extensions import override

from sghi.disposable import ResourceDisposedError
from sghi.etl.commons import NOOPProcessor, processor
from sghi.etl.commons import NOOPProcessor, ProcessorPipe, processor
from sghi.etl.core import Processor
from sghi.task import task

if TYPE_CHECKING:
from collections.abc import Iterable, Sequence


def test_processor_decorator_delegates_to_the_wrapped_callable() -> None:
""":func:`processor` should delegate to the wrapped callable when
Expand Down Expand Up @@ -138,7 +143,7 @@ def test_dispose_has_the_intended_side_effects(self) -> None:
assert instance.is_disposed

def test_multiple_dispose_invocations_is_okay(self) -> None:
"""Calling :meth:`NOOPProcessor.dispose` should be okay.
"""Calling :meth:`NOOPProcessor.dispose` multiple times should be okay.
No errors should be raised and the object should remain disposed.
"""
Expand Down Expand Up @@ -185,3 +190,125 @@ def test_usage_when_is_disposed_fails(self) -> None:

with pytest.raises(ResourceDisposedError):
instance.__enter__()


class TestProcessorPipe(TestCase):
"""Tests for the :class:`sghi.etl.commons.ProcessorPipe` class."""

@override
def setUp(self) -> None:
super().setUp()

@processor
def add_65(ints: Iterable[int]) -> Iterable[int]:
yield from (v + 65 for v in ints)

@processor
def ints_to_chars(ints: Iterable[int]) -> Iterable[str]:
yield from map(chr, ints)

@processor
def join_chars(values: Iterable[str]) -> str:
return "".join(list(values))

self._embedded_processors: Sequence[Processor] = [
add_65,
ints_to_chars,
join_chars,
]
self._instance: Processor[Iterable[int], str] = ProcessorPipe(
processors=self._embedded_processors,
)

@override
def tearDown(self) -> None:
super().tearDown()
self._instance.dispose()

def test_apply_returns_the_expected_value(self) -> None:
""":meth:`ProcessorPipe.apply` should return the result after applying
the given raw data through its embedded processors.
"""
assert self._instance.apply(range(10)) == "ABCDEFGHIJ"

def test_instantiation_fails_on_none_processors_argument(self) -> None:
"""Instantiating a :class:`ProcessorPipe` with a ``None``
``processors`` argument should raise a :exc:`ValueError`.
"""
with pytest.raises(ValueError, match="None or empty") as exp_info:
ProcessorPipe(processors=None) # type: ignore

assert (
exp_info.value.args[0] == "'processors' MUST NOT be None or empty."
)

def test_instantiation_fails_on_an_empty_processors_argument(self) -> None:
"""Instantiating a :class:`ProcessorPipe` with an empty
``processors`` argument should raise a :exc:`ValueError`.
"""
with pytest.raises(ValueError, match="None or empty") as exp_info:
ProcessorPipe(processors=[])

assert (
exp_info.value.args[0] == "'processors' MUST NOT be None or empty."
)

def test_dispose_has_the_intended_side_effects(self) -> None:
"""Calling :meth:`ProcessorPipe.dispose` should result in the
:attr:`ProcessorPipe.is_disposed` property being set to ``True``.
Each embedded ``Processor`` should also be disposed.
"""
self._instance.dispose()

assert self._instance.is_disposed
for _processor in self._embedded_processors:
assert _processor.is_disposed

def test_multiple_dispose_invocations_is_okay(self) -> None:
"""Calling :meth:`ProcessorPipe.dispose` multiple times should be okay.
No errors should be raised and the object should remain disposed.
"""
for _ in range(10):
try:
self._instance.dispose()
except Exception as exc: # noqa: BLE001
fail_reason: str = (
"Calling 'ProcessorPipe.dispose()' multiple times should "
f"be okay. But the following error was raised: {exc!s}"
)
pytest.fail(fail_reason)

assert self._instance.is_disposed
for _processor in self._embedded_processors:
assert _processor.is_disposed

def test_usage_as_a_context_manager_behaves_as_expected(self) -> None:
""":class:`ProcessorPipe` instances are valid context managers and
should behave correctly when used as so.
"""
with self._instance:
assert self._instance.apply(range(5, 10)) == "FGHIJ"

assert self._instance.is_disposed
for _processor in self._embedded_processors:
assert _processor.is_disposed

def test_usage_when_is_disposed_fails(self) -> None:
"""Invoking "resource-aware" methods of a disposed instance should
result in an :exc:`ResourceDisposedError` being raised.
Specifically, invoking the following two methods on a disposed instance
should fail:
- :meth:`ProcessorPipe.__enter__`
- :meth:`ProcessorPipe.apply`
"""
self._instance.dispose()

with pytest.raises(ResourceDisposedError):
self._instance.apply(range(5))

with pytest.raises(ResourceDisposedError):
self._instance.__enter__()
Loading

0 comments on commit 704a302

Please sign in to comment.