Skip to content

Commit

Permalink
feat(lib): add essential application libraries
Browse files Browse the repository at this point in the history
# TODO: Add a description of each.
  • Loading branch information
kennedykori committed Nov 5, 2023
1 parent bdf8b53 commit dda602d
Show file tree
Hide file tree
Showing 7 changed files with 788 additions and 0 deletions.
Empty file.
17 changes: 17 additions & 0 deletions src/sghi/ml_pipeline/lib/common/__init__.py
@@ -0,0 +1,17 @@
from .data_processors import NoOpDataProcessor, data_processor
from .data_sinks import FanOutDataSink, NullDataSink, data_sink
from .data_sources import FanInDataSource, data_source
from .workflow_builder import WorkflowBuilder
from .workflow_descriptors import WorkflowDescriptor

# Public API of this package. Entries in ``__all__`` MUST be strings naming
# the exported objects -- listing the objects themselves (as the original
# did) breaks ``from sghi.ml_pipeline.lib.common import *`` and any tooling
# that introspects ``__all__``.
__all__ = [
    "FanInDataSource",
    "FanOutDataSink",
    "NoOpDataProcessor",
    "NullDataSink",
    "WorkflowBuilder",
    "WorkflowDescriptor",
    "data_processor",
    "data_sink",
    "data_source",
]
136 changes: 136 additions & 0 deletions src/sghi/ml_pipeline/lib/common/data_processors.py
@@ -0,0 +1,136 @@
from __future__ import annotations

import logging
from collections.abc import Callable
from logging import Logger
from typing import TYPE_CHECKING, Generic, TypeVar

from attrs import define, field, validators

from sghi.disposable import not_disposed
from sghi.ml_pipeline.domain import DataProcessor
from sghi.utils import ensure_not_none, type_fqn

if TYPE_CHECKING:
from typing_extensions import Self

# =============================================================================
# TYPES
# =============================================================================


_PDT = TypeVar("_PDT")
"""Processed Data Type."""

_RDT = TypeVar("_RDT")
"""Raw Data Type."""

# A plain callable mapping raw data to processed data; adapted into a full
# ``DataProcessor`` by the ``data_processor`` decorator defined below.
_DataProcessorCallable = Callable[[_RDT], _PDT]


# =============================================================================
# SPEC IMPLEMENTATIONS
# =============================================================================


@define
class ComposeDataProcessors:  # Aggregate Data Processors
    # Placeholder with no implementation yet. NOTE(review): the trailing
    # comment suggests this will aggregate multiple data processors into
    # one -- confirm intended semantics before relying on this name.
    ...


@define
class PipeDataProcessors:
    # Placeholder with no implementation yet. NOTE(review): the name
    # suggests chaining processors so each one's output feeds the next --
    # confirm intended semantics before relying on this name.
    ...


@define
class NoOpDataProcessor(DataProcessor[_RDT, _RDT], Generic[_RDT]):
    """
    A :class:`DataProcessor` implementation that performs no processing on the
    received data and returns it as is.
    """

    # Flipped to ``True`` once :meth:`dispose` runs; the ``@not_disposed``
    # decorator on the public methods guards against use after disposal.
    _is_disposed: bool = field(default=False, init=False)
    _logger: Logger = field(init=False, repr=False)

    def __attrs_post_init__(self) -> None:
        # Logger named after this concrete class' fully-qualified name.
        self._logger: Logger = logging.getLogger(type_fqn(self.__class__))

    @not_disposed
    def __call__(self, raw_data: _RDT) -> _RDT:
        """Delegate to :meth:`process`."""
        return self.process(raw_data)

    @not_disposed
    def __enter__(self) -> Self:
        # NOTE(review): ``super(DataProcessor, self)`` deliberately skips
        # ``DataProcessor`` in the MRO -- presumably to reach a base
        # ``Disposable`` context-manager implementation. Confirm against
        # the ``sghi.disposable`` API.
        return super(DataProcessor, self).__enter__()

    @property
    def is_disposed(self) -> bool:
        """Return ``True`` if this processor has been disposed."""
        return self._is_disposed

    def dispose(self) -> None:
        """Mark this processor as disposed."""
        self._is_disposed = True
        self._logger.debug("Disposal complete.")

    @not_disposed
    def process(self, raw_data: _RDT) -> _RDT:
        """Return *raw_data* unchanged."""
        self._logger.debug("Skipping data processing. Return raw data as is.")
        return raw_data


@define
class _DataProcessorOfCallable(DataProcessor[_RDT, _PDT]):
    """
    A :class:`DataProcessor` implementation that adapts a plain callable.

    Processing is delegated to the wrapped callable; see the
    :func:`data_processor` decorator for the public entry point.
    """

    # The adapted callable. Exposed to ``__init__`` as ``callable`` by attrs.
    _callable: _DataProcessorCallable[_RDT, _PDT] = field(
        validator=validators.is_callable(),
    )
    # Flipped to ``True`` once :meth:`dispose` runs; the ``@not_disposed``
    # decorator on the public methods guards against use after disposal.
    _is_disposed: bool = field(default=False, init=False)
    _logger: Logger = field(init=False, repr=False)

    def __attrs_post_init__(self) -> None:
        # Logger named after this concrete class' fully-qualified name.
        self._logger: Logger = logging.getLogger(type_fqn(self.__class__))

    @not_disposed
    def __call__(self, raw_data: _RDT) -> _PDT:
        """Delegate to :meth:`process`."""
        return self.process(raw_data)

    @not_disposed
    def __enter__(self) -> Self:
        # NOTE(review): ``super(DataProcessor, self)`` deliberately skips
        # ``DataProcessor`` in the MRO -- presumably to reach a base
        # ``Disposable`` context-manager implementation. Confirm against
        # the ``sghi.disposable`` API.
        return super(DataProcessor, self).__enter__()

    @property
    def is_disposed(self) -> bool:
        """Return ``True`` if this processor has been disposed."""
        return self._is_disposed

    def dispose(self) -> None:
        """Mark this processor as disposed."""
        self._is_disposed = True
        self._logger.debug("Disposal complete.")

    # Consistency fix: every sibling implementation (``NoOpDataProcessor``
    # and the data sinks in this library) guards its work method with
    # ``@not_disposed``; the original omitted it here, allowing use after
    # disposal.
    @not_disposed
    def process(self, raw_data: _RDT) -> _PDT:
        """Process *raw_data* by invoking the wrapped callable on it."""
        self._logger.debug(
            "Processing data using '%s'.",
            type_fqn(self._callable),
        )
        return self._callable(raw_data)


# =============================================================================
# DECORATORS
# =============================================================================


def data_processor(f: Callable[[_RDT], _PDT]) -> DataProcessor[_RDT, _PDT]:
    """Mark a ``Callable`` as a :class:`DataProcessor`.

    :param f: The callable to be decorated. The callable *MUST* have at
        *MOST* one required argument (the raw data to be processed).

    :return: A ``DataProcessor`` instance.

    :raise ValueError: If ``f`` is ``None``.
    """
    ensure_not_none(f, "'f' MUST not be None.")
    # The inner ``wrapper`` function in the original added nothing (it was
    # defined and immediately applied to ``f``); construct the adapter
    # directly instead.
    return _DataProcessorOfCallable(callable=f)  # pyright: ignore
173 changes: 173 additions & 0 deletions src/sghi/ml_pipeline/lib/common/data_sinks.py
@@ -0,0 +1,173 @@
from __future__ import annotations

import logging
from collections.abc import Callable, Sequence
from contextlib import ExitStack
from logging import Logger
from typing import TYPE_CHECKING, Any, Generic, TypeVar

from attrs import define, field, validators

from sghi.disposable import not_disposed
from sghi.ml_pipeline.domain import DataSink
from sghi.task import ConcurrentExecutor
from sghi.utils import ensure_not_none, type_fqn

if TYPE_CHECKING:
from typing_extensions import Self

# =============================================================================
# TYPES
# =============================================================================


_PDT = TypeVar("_PDT")
"""Processed Data Type."""

# A plain callable that consumes processed data; adapted into a full
# ``DataSink`` by the ``data_sink`` decorator defined below.
_DataSinkCallable = Callable[[_PDT], None]


# =============================================================================
# SPEC IMPLEMENTATIONS
# =============================================================================


@define
class FanOutDataSink(DataSink[_PDT], Generic[_PDT]):  # CompositeDataSink
    """One-To-Many Multiplexer.

    A :class:`DataSink` that forwards each piece of processed data it
    receives to every one of its constituent data sinks, draining them via a
    :class:`ConcurrentExecutor`.
    """

    # The constituent sinks; at least one is required. Converted to a tuple
    # so later mutation of the caller's sequence cannot affect this sink.
    _data_sinks: Sequence[DataSink[_PDT]] = field(
        converter=tuple,  # Make a copy
        repr=False,
        validator=[
            validators.min_len(1),
            validators.deep_iterable(
                member_validator=validators.instance_of(DataSink),
                iterable_validator=validators.instance_of(Sequence),
            ),
        ],
    )
    # Flipped to ``True`` once :meth:`dispose` runs; the ``@not_disposed``
    # decorator on the public methods guards against use after disposal.
    _is_disposed: bool = field(default=False, init=False)
    _logger: Logger = field(init=False, repr=False)
    _executor: ConcurrentExecutor[_PDT, None] = field(init=False, repr=False)
    _exit_stack: ExitStack = field(factory=ExitStack, init=False, repr=False)

    def __attrs_post_init__(self) -> None:
        self._logger: Logger = logging.getLogger(type_fqn(self.__class__))
        # Each constituent sink becomes one task of the executor.
        self._executor = ConcurrentExecutor(*self._data_sinks)

    # Consistency fix: annotate with ``_PDT`` to match ``drain`` -- this
    # class is generic in ``_PDT``, unlike ``NullDataSink`` whose ``Any``
    # annotation is deliberate.
    @not_disposed
    def __call__(self, processed_data: _PDT) -> None:
        """Delegate to :meth:`drain`."""
        return self.drain(processed_data)

    @not_disposed
    def __enter__(self) -> Self:
        # NOTE(review): ``super(DataSink, self)`` deliberately skips
        # ``DataSink`` in the MRO -- presumably to reach a base
        # ``Disposable`` context-manager implementation. Confirm against
        # the ``sghi.disposable`` API.
        return super(DataSink, self).__enter__()

    @property
    def is_disposed(self) -> bool:
        """Return ``True`` if this sink has been disposed."""
        return self._is_disposed

    def dispose(self) -> None:
        """Dispose this sink, its exit stack and its executor."""
        self._is_disposed = True
        self._exit_stack.close()  # ExitStack.close() is idempotent
        self._executor.dispose()
        self._logger.debug("Disposal complete.")

    # Consistency fix: every sibling sink guards ``drain`` with
    # ``@not_disposed``; the original omitted it here, allowing draining
    # after disposal.
    @not_disposed
    def drain(self, processed_data: _PDT) -> None:  # TODO: Add error handling
        """Forward *processed_data* to every constituent sink."""
        self._logger.debug("Multiplexing processed data to consumers.")
        # NOTE(review): entering ``self._exit_stack`` as a context manager
        # closes it when this first drain completes, so anything registered
        # on it later would be torn down prematurely on repeated drains --
        # confirm this is intentional.
        with self._exit_stack:
            self._executor(an_input=processed_data)


@define
class NullDataSink(DataSink[Any]):
    """
    A :class:`DataSink` implementation that discards the processed data it
    receives.
    """

    # Flipped to ``True`` once :meth:`dispose` runs; the ``@not_disposed``
    # decorator on the public methods guards against use after disposal.
    _is_disposed: bool = field(default=False, init=False)
    _logger: Logger = field(init=False, repr=False)

    def __attrs_post_init__(self) -> None:
        # Logger named after this concrete class' fully-qualified name.
        self._logger: Logger = logging.getLogger(type_fqn(self.__class__))

    @not_disposed
    def __call__(self, processed_data: Any) -> None:  # noqa: ANN401
        """Delegate to :meth:`drain`."""
        return self.drain(processed_data)

    @not_disposed
    def __enter__(self) -> Self:
        # NOTE(review): ``super(DataSink, self)`` deliberately skips
        # ``DataSink`` in the MRO -- presumably to reach a base
        # ``Disposable`` context-manager implementation. Confirm against
        # the ``sghi.disposable`` API.
        return super(DataSink, self).__enter__()

    @property
    def is_disposed(self) -> bool:
        """Return ``True`` if this sink has been disposed."""
        return self._is_disposed

    def dispose(self) -> None:
        """Mark this sink as disposed."""
        self._is_disposed = True
        self._logger.debug("Disposal complete.")

    @not_disposed
    def drain(self, processed_data: Any) -> None:  # noqa: ANN401
        """Discard *processed_data* without acting on it."""
        self._logger.debug("Discarding processed data.")


@define
class _DataSinkOfCallable(DataSink[_PDT]):
    """
    A :class:`DataSink` implementation that adapts a plain callable.

    Draining is delegated to the wrapped callable; see the ``data_sink``
    decorator for the public entry point.
    """

    # The adapted callable. Exposed to ``__init__`` as ``callable`` by attrs.
    _callable: _DataSinkCallable[_PDT] = field(
        validator=validators.is_callable(),
    )
    # Flipped to ``True`` once :meth:`dispose` runs; the ``@not_disposed``
    # decorator on the public methods guards against use after disposal.
    _is_disposed: bool = field(default=False, init=False)
    _logger: Logger = field(init=False, repr=False)

    def __attrs_post_init__(self) -> None:
        # Logger named after this concrete class' fully-qualified name.
        self._logger: Logger = logging.getLogger(type_fqn(self.__class__))

    @not_disposed
    def __call__(self, processed_data: Any) -> None:  # noqa: ANN401
        """Delegate to :meth:`drain`."""
        return self.drain(processed_data)

    @not_disposed
    def __enter__(self) -> Self:
        # NOTE(review): ``super(DataSink, self)`` deliberately skips
        # ``DataSink`` in the MRO -- presumably to reach a base
        # ``Disposable`` context-manager implementation. Confirm against
        # the ``sghi.disposable`` API.
        return super(DataSink, self).__enter__()

    @property
    def is_disposed(self) -> bool:
        """Return ``True`` if this sink has been disposed."""
        return self._is_disposed

    def dispose(self) -> None:
        """Mark this sink as disposed."""
        self._is_disposed = True
        self._logger.debug("Disposal complete.")

    @not_disposed
    def drain(self, processed_data: Any) -> None:  # noqa: ANN401
        """Drain *processed_data* by invoking the wrapped callable on it."""
        self._logger.debug(
            "Draining data to '%s'.",
            type_fqn(self._callable),
        )
        self._callable(processed_data)


# =============================================================================
# DECORATORS
# =============================================================================


def data_sink(f: Callable[[_PDT], None]) -> DataSink[_PDT]:
    """Mark a ``Callable`` as a :class:`DataSink`.

    :param f: The callable to be decorated. The callable *MUST* have at
        *MOST* one required argument (the processed data to consume).

    :return: A ``DataSink`` instance.

    :raise ValueError: If ``f`` is ``None``.
    """
    ensure_not_none(f, "'f' MUST not be None.")
    # The inner ``wrapper`` function in the original added nothing (it was
    # defined and immediately applied to ``f``); construct the adapter
    # directly instead.
    return _DataSinkOfCallable(callable=f)  # pyright: ignore

0 comments on commit dda602d

Please sign in to comment.