Skip to content

Commit

Permalink
chore(minor): deprecate Processor.process in favor of apply
Browse files Browse the repository at this point in the history
Add a new method, `apply`, to the `sghi.etl.core.Processor` interface.
This will have the same role as the `process` method currently has on
the same interface. The verb _apply_ best describes how a `Processor`
is used. That is, you _draw_ raw data from a `Source`, _apply_ a
`Processor` to the drawn data to get clean data, and then you _drain_
the clean data to a `Sink`. Therefore, the `process` method now
delegates to the new `apply` method and is deprecated.
  • Loading branch information
kennedykori committed Apr 2, 2024
1 parent e1f39a0 commit 5d0f269
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 15 deletions.
29 changes: 24 additions & 5 deletions src/sghi/etl/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, Generic, TypeVar

from typing_extensions import deprecated

from sghi.disposable import Disposable

if TYPE_CHECKING:
Expand Down Expand Up @@ -71,11 +73,11 @@ class Processor(Disposable, Generic[_RDT, _PDT], metaclass=ABCMeta):
This class defines a blueprint for processing raw data and converting it
into processed data ready for further consumption downstream. Subclasses
implementing this interface should override the :meth:`process` method to
implementing this interface should override the :meth:`apply` method to
specify how the data processing occurs.
In a typical ETL workflow, the `Transform` phase is functionally equivalent
to the ``process`` method of this class. Accordingly, a ``Processor`` is
to the ``apply`` method of this class. Accordingly, a ``Processor`` is
thus executed immediately after the :class:`~sghi.etl.core.Source`
finishes in an SGHI ETL workflow. The raw data obtained from the ``Source``
is taken as input. The output of the ``Processor`` is then passed to a
Expand All @@ -93,17 +95,17 @@ def __call__(self, raw_data: _RDT) -> _PDT:
"""Transform raw data into processed, clean data and return it.
Call this ``Processor`` as a callable. Delegate actual call to
:meth:`process`.
:meth:`apply`.
:param raw_data: The unprocessed data drawn from a `Source`.
:return: The processed, cleaned data that is ready for further
consumption downstream.
"""
return self.process(raw_data)
return self.apply(raw_data)

@abstractmethod
def process(self, raw_data: _RDT) -> _PDT:
def apply(self, raw_data: _RDT) -> _PDT:
"""Transform raw data into processed, clean data and return it.
:param raw_data: The unprocessed data drawn from a `Source`.
Expand All @@ -113,6 +115,23 @@ def process(self, raw_data: _RDT) -> _PDT:
"""
...

@deprecated('Use "apply" instead. Will be removed in 2.0', stacklevel=1)
def process(self, raw_data: _RDT) -> _PDT:
    """Transform raw data into processed data by delegating to ``apply``.

    .. warning::

        This method is deprecated and is kept only for backward
        compatibility. Its deprecation message states that it will be
        removed in 2.0. Use the :meth:`apply` method instead; this
        method does nothing more than forward the call to it.

    :param raw_data: The unprocessed data drawn from a `Source`.

    :return: Whatever :meth:`apply` returns for the given ``raw_data``,
        i.e. the processed data that is ready for further consumption
        downstream.
    """
    # No logic of its own: `apply` is the single source of truth.
    return self.apply(raw_data)


class Sink(Disposable, Generic[_PDT], metaclass=ABCMeta):
"""An entity that consumes processed data.
Expand Down
52 changes: 42 additions & 10 deletions test/sghi/etl/core_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dataclasses import dataclass, field
from unittest import TestCase

import pytest
from typing_extensions import override

from sghi.disposable import not_disposed
Expand Down Expand Up @@ -46,7 +47,7 @@ class IntsToStrings(Processor[Iterable[int], Iterable[str]]):

@not_disposed
@override
def process(self, raw_data: Iterable[int]) -> Iterable[str]:
def apply(self, raw_data: Iterable[int]) -> Iterable[str]:
yield from map(str, raw_data)

@property
Expand Down Expand Up @@ -101,8 +102,7 @@ def test_invoking_source_as_a_callable_returns_expected_value(
In short, ensure that invoking a ``Source`` instance as a callable
delegates the actual call to :meth:`~sghi.etl.core.Source.draw`.
""" # noqa: D202, D205

""" # noqa: D205
instance1: IntsSupplier
instance2: IntsSupplier
max_ints: int = 4
Expand Down Expand Up @@ -138,22 +138,55 @@ def test_invoking_processor_as_a_callable_returns_expected_value(
value when invoked as a callable.
In short, ensure that invoking a ``Processor`` instance as a callable
delegates the actual call to
:meth:`~sghi.etl.core.Processor.process`.
""" # noqa: D202, D205

delegates the actual call to :meth:`~sghi.etl.core.Processor.apply`.
""" # noqa: D205
raw_values = tuple(self._source())

instance1: IntsToStrings
instance2: IntsToStrings

with IntsToStrings() as instance1, IntsToStrings() as instance2:
assert (
tuple(instance1.process(raw_values))
tuple(instance1.apply(raw_values))
== tuple(instance2(raw_values))
== ("0", "1", "2", "3", "4")
)

def test_invoking_the_process_method_returns_expected_value(self) -> None:
    """Ensure ``process`` returns the same expected value as ``apply``.

    That is, invoking the deprecated
    :meth:`~sghi.etl.core.Processor.process` method of a ``Processor``
    should return the same value as invoking the
    :meth:`~sghi.etl.core.Processor.apply` method of an equivalent
    ``Processor``.
    """
    expected: tuple[str, ...] = ("0", "1", "2", "3", "4")
    raw_values = tuple(self._source())

    applier: IntsToStrings
    processor: IntsToStrings

    with IntsToStrings() as applier, IntsToStrings() as processor:
        # Materialize both results up front so each one can be compared
        # against the other and against the expected values.
        applied = tuple(applier.apply(raw_values))
        processed = tuple(processor.process(raw_values))  # type: ignore

        assert applied == processed
        assert processed == expected

def test_invoking_the_process_method_raises_a_deprecation_waring(
    self,
) -> None:
    """:meth:`~sghi.etl.core.Processor.process` is deprecated for removal.

    Ensure that invoking the ``process`` method raises a
    ``DeprecationWarning`` whose message points clients to ``apply``.
    """
    raw_values = tuple(self._source())

    instance: IntsToStrings
    expect_deprecation = pytest.warns(
        DeprecationWarning,
        match='Use "apply" instead',
    )

    # Manage the processor with a ``with`` block, as the sibling tests do,
    # instead of a trailing ``dispose()`` call. That way the processor is
    # still cleaned up when the expected warning is missing and
    # ``pytest.warns`` fails the test.
    with IntsToStrings() as instance, expect_deprecation:
        # The warning is emitted when ``process`` is called; the returned
        # value does not need to be consumed.
        instance.process(raw_values)  # type: ignore


class TestSink(TestCase):
Tests for the :class:`sghi.etl.core.Sink` interface.
Expand All @@ -180,8 +213,7 @@ def test_invoking_sink_as_a_callable_returns_expected_value(self) -> None:
In short, ensure that invoking a ``Sink`` instance as a callable
delegates the actual call to :meth:`~sghi.etl.core.Sink.drain`.
""" # noqa: D202, D205

""" # noqa: D205
processed_data: tuple[str, ...]
processed_data = tuple(self._processor(self._source()))

Expand Down

0 comments on commit 5d0f269

Please sign in to comment.