Skip to content

Commit

Permalink
feat: add core interfaces
Browse files Browse the repository at this point in the history
Add the core interfaces that constitute an SGHI ETL workflow. Those are:

- `sghi.miniETL.core.Source`
- `sghi.miniETL.core.Processor`
- `sghi.miniETL.core.Sink`
- `sghi.miniETL.core.WorkflowDefinition`
  • Loading branch information
kennedykori committed Mar 11, 2024
1 parent 78051a3 commit ce305e4
Show file tree
Hide file tree
Showing 5 changed files with 447 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ repos:
- --py311-plus

- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: v0.1.11
rev: v0.3.2
hooks:
- id: ruff
args:
Expand Down
9 changes: 8 additions & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import os
import sys
from datetime import datetime

sys.path.insert(0, os.path.abspath("src"))

Expand All @@ -17,7 +18,7 @@
# -----------------------------------------------------------------------------

author = "Savannah Global Health Institute"
copyright = "2023, Savannah Global Health Institute"
copyright = f"{datetime.today().year}, Savannah Global Health Institute"
project = "sghi-miniETL"


Expand Down Expand Up @@ -55,10 +56,16 @@
nitpicky = True

nitpick_ignore = [
("py:class", "_PDT"), # private type annotations
("py:class", "_RDT"), # private type annotations
("py:class", "TracebackType"), # Used as type annotation. Only available when type checking
("py:class", "sghi.disposable.Disposable"), # docs aren't published yet
("py:class", "sghi.miniETL.core._PDT"), # private type annotations
("py:class", "sghi.miniETL.core._RDT"), # private type annotations
("py:exc", "sghi.disposable.ResourceDisposedError"), # docs aren't published yet
("py:func", "sghi.disposable.not_disposed"), # docs aren't published yet
("py:obj", "sghi.miniETL.core._PDT"), # private type annotations
("py:obj", "sghi.miniETL.core._RDT"), # private type annotations
]

templates_path = ["templates"]
Expand Down
30 changes: 16 additions & 14 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ test = [
"pytest-forked~=1.6.0",
"pytest-sugar~=0.9.7",
"pytest-xdist~=3.5.0",
"ruff~=0.1.11",
"ruff~=0.3.2",
"tox~=4.11.4",
"tox-gh-actions~=3.2.0",
]
Expand Down Expand Up @@ -186,6 +186,17 @@ exclude = [
"node_modules",
"venv",
]
line-length = 79
src = ["src", "test"]
target-version = "py311"

[tool.ruff.format]
docstring-code-format = true
indent-style = "space"
quote-style = "double"
skip-magic-trailing-comma = false

[tool.ruff.lint]
ignore = [
"ANN002",
"ANN003",
Expand All @@ -198,7 +209,6 @@ ignore = [
"ISC001",
"S101",
]
line-length = 79
select = [
"A", # flake8-builtins
"ANN", # flake8-annotations
Expand Down Expand Up @@ -231,23 +241,15 @@ select = [
"W", # pycodestyle Warning
"YTT", # flake8-2020
]
src = ["src", "test"]
target-version = "py311"

[tool.ruff.flake8-quotes]
[tool.ruff.lint.flake8-quotes]
inline-quotes = "double"
docstring-quotes = "double"

[tool.ruff.format]
docstring-code-format = true
indent-style = "space"
quote-style = "double"
skip-magic-trailing-comma = false

[tool.ruff.isort]
[tool.ruff.lint.isort]
known-first-party = ["src", "test"]

[tool.ruff.mccabe]
[tool.ruff.lint.mccabe]
max-complexity = 10

[tool.setuptools]
Expand Down Expand Up @@ -279,7 +281,7 @@ legacy_tox_ini = """
[testenv]
commands =
ruff .
ruff check .
ruff format --check .
pyright .
coverage erase
Expand Down
245 changes: 245 additions & 0 deletions src/sghi/miniETL/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
"""The API specification for components of the SGHI ETL workflow."""

from __future__ import annotations

from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, Generic, TypeVar

from sghi.disposable import Disposable

if TYPE_CHECKING:
from collections.abc import Callable

# =============================================================================
# TYPES
# =============================================================================


_PDT = TypeVar("_PDT")
"""Processed Data Type."""

_RDT = TypeVar("_RDT")
"""Raw Data Type."""


# =============================================================================
# BASE INTERFACES
# =============================================================================


class Source(Disposable, Generic[_RDT], metaclass=ABCMeta):
"""An entity that contains or provides data of interest.
This class defines the interface of a data source, which is a provider of
raw data. Subclasses implementing this interface should override the
:meth:`draw` method to specify how the data is obtained.
In a typical ETL workflow, the `Extract` phase corresponds to the ``draw``
method of this class. Consequently, a ``Source`` forms the initial step of
an SGHI ETL workflow and is thus executed first. The obtained data is then
passed to a :class:`~sghi.miniETL.core.Processor` for further processing.
.. tip::
This class implements the :class:`~sghi.disposable.Disposable`
interface allowing for easy resource(s) management and clean up.
"""

__slots__ = ()

def __call__(self) -> _RDT:
"""Obtain raw data from this :class:`data source<Source>`.
Call this ``Source`` instance as a callable. Delegate actual call
to :meth:`draw`.
:return: The raw data from this `Source`.
"""
return self.draw()

@abstractmethod
def draw(self) -> _RDT:
"""Obtain raw data from this :class:`data source<Source>`.
:return: The raw data from this `Source`.
"""
...


class Processor(Disposable, Generic[_RDT, _PDT], metaclass=ABCMeta):
"""The post-extraction transformation(s)/ops performed on raw data.
This class defines a blueprint for processing raw data and converting it
into processed data ready for further consumption downstream. Subclasses
implementing this interface should override the :meth:`process` method to
specify how the data processing occurs.
In a typical ETL workflow, the `Transform` phase is functionally equivalent
to the ``process`` method of this class. Accordingly, a ``Processor`` is
thus executed immediately after the :class:`~sghi.miniETL.core.Source`
finishes in an SGHI ETL workflow. The raw data obtained from the ``Source``
is taken as input. The output of the ``Processor`` is then passed to a
:class:`~sghi.miniETL.core.Sink` for storage or transmission.
.. tip::
This class implements the :class:`~sghi.disposable.Disposable`
interface allowing for easy resource(s) management and clean up.
"""

__slots__ = ()

def __call__(self, raw_data: _RDT) -> _PDT:
"""Transform raw data into processed, clean data and return it.
Call this ``Processor`` as a callable. Delegate actual call to
:meth:`process`.
:param raw_data: The unprocessed data drawn from a `Source`.
:return: The processed, cleaned data that is ready for further
consumption downstream.
"""
return self.process(raw_data)

@abstractmethod
def process(self, raw_data: _RDT) -> _PDT:
"""Transform raw data into processed, clean data and return it.
:param raw_data: The unprocessed data drawn from a `Source`.
:return: The processed, cleaned data that is ready for further
consumption downstream.
"""
...


class Sink(Disposable, Generic[_PDT], metaclass=ABCMeta):
"""An entity that consumes processed data.
This interface represents entities that consume processed data(the output
of a :meth:`data process operation<sghi.miniETL.core.Processor.process>`)
and the final step of an SGHI ETL workflow. Subclasses implementing this
interface should override the :meth:`drain` method to specify how the
processed data is consumed.
In a typical ETL workflow, the `Load` phase is equivalent to the ``drain``
method of this class. Therefore, a ``Sink`` is the last step of an SGHI ETL
workflow and is executed next after the associated :meth:`Processor`
completes. The Sink takes as input the output of the ``Processor``.
.. tip::
This class implements the :class:`~sghi.disposable.Disposable`
interface allowing for easy resource(s) management and clean up.
"""

__slots__ = ()

def __call__(self, processed_data: _PDT) -> None:
"""Consume processed data.
Call this ``Sink`` as a callable. Delegate actual call to
:meth:`drain`.
:param processed_data: The processed data to be consumed.
:return: None.
"""
return self.drain(processed_data)

@abstractmethod
def drain(self, processed_data: _PDT) -> None:
"""Consume processed data.
:param processed_data: The processed data to be consumed.
:return: None.
"""
...


class WorkflowDefinition(Generic[_RDT, _PDT], metaclass=ABCMeta):
"""An object that defines the components of an SGHI ETL Workflow.
A ``WorkflowDefinition`` serves to assemble all the essential parts of an
ETL Workflow and also serves as the primary unit of execution within an
SGHI ETL process.
"""

__slots__ = ()

@property
@abstractmethod
def id(self) -> str:
"""The unique identifier of this workflow.
This can be used to select a workflow to execute or operate on.
:return: The unique identifier of this workflow.
"""
...

@property
@abstractmethod
def name(self) -> str:
"""The name of this workflow.
This is a short, human-friendly "identifier" for this workflow. Unlike
the :attr:`id`, there is no requirement that this should be unique
across different workflows. However, workflow authors should strive to
make the name unique also.
:return: The name of this workflow.
"""
...

@property
@abstractmethod
def description(self) -> str | None:
"""The description of this workflow, if available.
:return: The description of this workflow or ``None`` if not available.
"""
...

@property
@abstractmethod
def source_factory(self) -> Callable[[], Source[_RDT]]:
"""The factory that creates this workflow's :class:`Source`.
This factory function is invoked early on during the execution of the
workflow to get the ``Source`` associated with this workflow.
:return: A factory function that, when invoked, returns the
``Source`` instance associated with this workflow.
"""
...

@property
@abstractmethod
def processor_factory(
self,
) -> Callable[[], Processor[_RDT, _PDT]]:
"""The factory that created this workflow's :class:`Processor`.
This factory function is invoked early on during the execution of the
workflow to get the ``Processor`` associated with this workflow.
:return: A factory function that, when invoked, returns the
``Processor`` instance associated with this workflow.
"""
...

@property
@abstractmethod
def sink_factory(self) -> Callable[[], Sink[_PDT]]:
"""The factory that created this workflow's :class:`Sink`.
This factory function is invoked early on during the execution of the
workflow to get the ``Sink`` associated with this workflow.
:return: A factory function that, when invoked, returns the
``Sink`` instance associated with this workflow.
"""
...
Loading

0 comments on commit ce305e4

Please sign in to comment.