Skip to content

Commit

Permalink
feat(utils): add a workflow runner utility
Browse files Browse the repository at this point in the history
Add `sghi.etl.commons.utils.run_workflow`, a utility function that
executes ETL Workflows. The function accepts a factory function that
supplies an SGHI ETL `WorkflowDefinition` instance; it invokes that
factory to obtain the `WorkflowDefinition`/workflow and then executes it.
  • Loading branch information
kennedykori committed Jun 4, 2024
1 parent 6274a31 commit 4f26bc9
Show file tree
Hide file tree
Showing 5 changed files with 247 additions and 1 deletion.
4 changes: 4 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
nitpicky = True

nitpick_ignore = [
("py:attr", "sghi.etl.core.WorkflowDefinition.processor_factory"), # docs aren't published yet
("py:attr", "sghi.etl.core.WorkflowDefinition.sink_factory"), # docs aren't published yet
("py:attr", "sghi.etl.core.WorkflowDefinition.source_factory"), # docs aren't published yet
("py:class", "_RDT"), # private type annotations
("py:class", "_PDT"), # private type annotations
("py:class", "Executor"), # sphinx can't find it
Expand Down Expand Up @@ -85,6 +88,7 @@
("py:exc", "ResourceDisposedError"), # docs aren't published yet
("py:exc", "sghi.disposable.ResourceDisposedError"), # docs aren't published yet
("py:func", "sghi.disposable.not_disposed"), # docs aren't published yet
("py:meth", "sghi.etl.core.Source.draw"), # docs aren't published yet
("py:obj", "sghi.etl.commons.processors._PDT"), # private type annotations
("py:obj", "sghi.etl.commons.processors._RDT"), # private type annotations
("py:obj", "sghi.etl.commons.sinks._PDT"), # private type annotations
Expand Down
3 changes: 2 additions & 1 deletion src/sghi/etl/commons/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
)
from .sinks import NullSink, ScatterSink, SplitSink, sink
from .sources import GatherSource, source
from .utils import fail_fast, fail_fast_factory, ignored_failed
from .utils import fail_fast, fail_fast_factory, ignored_failed, run_workflow
from .workflow_definitions import SimpleWorkflowDefinition

__all__ = [
Expand All @@ -28,6 +28,7 @@
"ignored_failed",
"pipe_processors",
"processor",
"run_workflow",
"sink",
"source",
]
2 changes: 2 additions & 0 deletions src/sghi/etl/commons/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
"""Common utilities."""

from .others import run_workflow
from .result_gatherers import fail_fast, fail_fast_factory, ignored_failed

# Public API of the ``sghi.etl.commons.utils`` package; names re-exported
# from the submodules above.
__all__ = [
    "fail_fast",
    "fail_fast_factory",
    "ignored_failed",
    "run_workflow",
]
108 changes: 108 additions & 0 deletions src/sghi/etl/commons/utils/others.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""Other useful utilities."""

from __future__ import annotations

import logging
from logging import Logger
from typing import TYPE_CHECKING, Final, TypeVar

from sghi.utils import ensure_callable

if TYPE_CHECKING:
from collections.abc import Callable

from sghi.etl.core import WorkflowDefinition

# =============================================================================
# TYPES
# =============================================================================


_PDT = TypeVar("_PDT")
"""Type variable representing the data type after processing."""

_RDT = TypeVar("_RDT")
"""Type variable representing the raw data type."""


# =============================================================================
# CONSTANTS
# =============================================================================


# Module-level logger named after this module, following the standard
# ``logging.getLogger(__name__)`` convention so records can be filtered
# through the usual logger hierarchy.
_LOGGER: Final[Logger] = logging.getLogger(name=__name__)


# =============================================================================
# UTILITIES
# =============================================================================


def run_workflow(wf: Callable[[], WorkflowDefinition[_RDT, _PDT]]) -> None:
    """Execute an ETL :class:`Workflow<WorkflowDefinition>`.

    .. tip::

        In the context of this function, **"ETL Workflow"** or the shorter
        version **"Workflow"**, refers to an instance of the
        :class:`WorkflowDefinition` class that is being executed or about to
        be executed.

    This function accepts a factory function that supplies an ETL
    ``WorkflowDefinition`` instance; it then invokes the function to get the
    ``WorkflowDefinition``/workflow and then executes it. The execution of the
    workflow proceeds as follows:

    - The callable returned by the
      :attr:`~sghi.etl.core.WorkflowDefinition.source_factory` property of
      the supplied ``WorkflowDefinition`` is used to get the
      :class:`~sghi.etl.core.Source` associated with the workflow. The
      :meth:`~sghi.etl.core.Source.draw` method of this ``Source`` is then
      invoked to get the raw data to process.
    - The callable returned by the
      :attr:`~sghi.etl.core.WorkflowDefinition.processor_factory` property
      of the supplied ``WorkflowDefinition`` is invoked to get the
      :class:`~sghi.etl.core.Processor` associated with the workflow. This
      ``Processor`` is then applied to the raw data retrieved from the
      ``Source`` in the previous step to obtain processed data.
    - The callable returned by the
      :attr:`~sghi.etl.core.WorkflowDefinition.sink_factory` property of
      the supplied ``WorkflowDefinition`` is invoked to get the
      :class:`~sghi.etl.core.Sink` associated with the workflow. The
      processed data from the previous step is drained into this ``Sink``.
    - The ``Source``, ``Processor`` and ``Sink`` created in the previous
      steps are disposed of. Note that this disposal also happens if an
      error occurs during the workflow execution.

    .. note::

        The above is a general description of how the workflow execution
        occurs. The actual implementation may vary slightly from this
        description.

    If an exception is raised during the workflow execution, all the workflow's
    components (source, processor, sink) are disposed of followed by the
    propagation of the error to the caller. If the supplied value **IS NOT** a
    valid callable object, a :exc:`ValueError` is raised.

    :param wf: A factory function that supplies the ``WorkflowDefinition``
        instance to be executed. This function is only invoked once. The given
        value *MUST* be a valid callable object, and it *MUST NOT* have any
        required arguments.

    :return: None.

    :raise ValueError: If ``wf`` is NOT a callable object.
    """
    ensure_callable(wf, message="'wf' MUST be a valid callable object.")

    wd: WorkflowDefinition = wf()
    _LOGGER.info("[%s:%s] Starting workflow execution ...", wd.id, wd.name)
    # The workflow components are context managers; exiting the ``with``
    # block disposes of all three, even when an error is raised mid-run.
    with (
        wd.source_factory() as source,
        wd.processor_factory() as processor,
        wd.sink_factory() as sink,
    ):
        sink.drain(processor.apply(source.draw()))

    _LOGGER.info("[%s:%s] Workflow execution complete.", wd.id, wd.name)
131 changes: 131 additions & 0 deletions test/sghi/etl/commons_tests/utils_tests/others_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# ruff: noqa: D205
"""Tests for the ``sghi.etl.commons.utils.others`` module."""

from __future__ import annotations

from typing import TYPE_CHECKING

import pytest

from sghi.etl.commons import (
NOOPProcessor,
NullSink,
ProcessorPipe,
SimpleWorkflowDefinition,
processor,
run_workflow,
sink,
source,
)

if TYPE_CHECKING:
from collections.abc import Callable, Iterable, MutableSequence

from sghi.etl.core import WorkflowDefinition

# =============================================================================
# HELPERS
# =============================================================================


def _workflow_factory_generator(
    repository: MutableSequence[str],
    start: int = 0,
    stop: int = 5,
    step: int = 1,
) -> Callable[[], WorkflowDefinition[Iterable[int], Iterable[str]]]:
    """Build a factory producing a simple, fully-wired test workflow.

    The resulting workflow draws ints from ``range(start, stop, step)``,
    adds 100 to each, converts them to strings, and appends the results
    to ``repository``.
    """

    @source
    def supply_ints() -> Iterable[int]:
        yield from range(start, stop, step)

    @processor
    def add_100(values: Iterable[int]) -> Iterable[int]:
        yield from (value + 100 for value in values)

    @processor
    def ints_as_strings(ints: Iterable[int]) -> Iterable[str]:
        for number in ints:
            yield str(number)

    @sink
    def save_strings_to_repo(strings: Iterable[str]) -> None:
        repository.extend(strings)

    def _create_workflow() -> WorkflowDefinition[Iterable[int], Iterable[str]]:
        return SimpleWorkflowDefinition(
            id="test_workflow",
            name="Test Workflow",
            source_factory=lambda: supply_ints,
            processor_factory=lambda: ProcessorPipe(
                [add_100, ints_as_strings],
            ),
            sink_factory=lambda: save_strings_to_repo,
        )

    return _create_workflow


# =============================================================================
# TEST CASES
# =============================================================================


def test_run_workflow_fails_on_non_callable_input() -> None:
    """:func:`sghi.etl.commons.utils.run_workflow` should raise a
    :exc:`ValueError` when given a non-callable input value.
    """
    wf = _workflow_factory_generator([])
    # Neither ``None`` nor a ``WorkflowDefinition`` instance is callable.
    non_callables = (None, wf())
    for non_callable in non_callables:
        with pytest.raises(ValueError, match="callable object.") as exp_info:
            run_workflow(wf=non_callable)  # type: ignore

        expected_msg: str = "'wf' MUST be a valid callable object."
        assert exp_info.value.args[0] == expected_msg


def test_run_workflow_side_effects_on_failed_execution() -> None:
    """:func:`sghi.etl.commons.utils.run_workflow` should dispose all the
    workflow components (source, processor and sink) if an error occurs during
    execution.
    """

    @source
    def failing_source() -> str:
        _err_msg: str = "Oops, something failed."
        raise RuntimeError(_err_msg)

    _processor = NOOPProcessor()
    _sink = NullSink()

    def create_failing_workflow() -> WorkflowDefinition[str, str]:
        return SimpleWorkflowDefinition(
            id="failing_workflow",
            name="Failing Workflow",
            source_factory=lambda: failing_source,
            processor_factory=lambda: _processor,
            sink_factory=lambda: _sink,
        )

    with pytest.raises(RuntimeError, match="Oops, something failed."):
        run_workflow(wf=create_failing_workflow)

    # Despite the failure, every component must have been disposed of.
    for component in (failing_source, _processor, _sink):
        assert component.is_disposed


def test_run_workflow_side_effects_on_successful_execution() -> None:
    """:func:`sghi.etl.commons.utils.run_workflow` should execute an ETL
    Workflow when given a factory function that returns the workflow.
    """
    # Two independent repositories verify that each workflow run drains
    # into its own sink.
    repository1: list[str] = []
    repository2: list[str] = []
    wf1 = _workflow_factory_generator(repository1)
    wf2 = _workflow_factory_generator(repository2, 10, 60, 10)

    run_workflow(wf1)
    run_workflow(wf2)

    assert repository1 == ["100", "101", "102", "103", "104"]
    assert repository2 == ["110", "120", "130", "140", "150"]

0 comments on commit 4f26bc9

Please sign in to comment.