feat(sinks): add a SplitSink
Add `sghi.etl.commons.sinks.SplitSink`, a `Sink` that splits aggregated
processed data into its constituent data parts, then drains each data
part to its corresponding embedded sink concurrently.
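
For reviewers, a minimal usage sketch of the new sink (not part of this
commit). It assumes the ``sink`` decorator exported by this package wraps a
callable into a ``Sink``; the two decorated callables are hypothetical:

from sghi.etl.commons import SplitSink, sink

# Hypothetical example sinks; any `Sink[str]` implementations would do.
@sink
def save_to_db(record: str) -> None:
    print(f"db: {record}")

@sink
def save_to_file(record: str) -> None:
    print(f"file: {record}")

# The drained value must be a Sequence whose size equals the number of
# embedded sinks: part 0 goes to `save_to_db`, part 1 to `save_to_file`.
with SplitSink(sinks=[save_to_db, save_to_file]) as split_sink:
    split_sink.drain(["record-1", "record-2"])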
kennedykori committed May 25, 2024
1 parent 63f22a7 commit b4c99d7
Showing 3 changed files with 455 additions and 5 deletions.
3 changes: 2 additions & 1 deletion src/sghi/etl/commons/__init__.py
@@ -8,7 +8,7 @@
pipe_processors,
processor,
)
-from .sinks import NullSink, sink
from .sinks import NullSink, SplitSink, sink
from .sources import GatherSource, source
from .utils import fail_fast, fail_fast_factory, ignored_failed
from .workflow_definitions import SimpleWorkflowDefinition
@@ -21,6 +21,7 @@
"SimpleWorkflowDefinition",
"ScatterGatherProcessor",
"SplitGatherProcessor",
"SplitSink",
"fail_fast",
"fail_fast_factory",
"ignored_failed",
251 changes: 249 additions & 2 deletions src/sghi/etl/commons/sinks.py
@@ -3,7 +3,9 @@
from __future__ import annotations

import logging
-from collections.abc import Callable
from collections.abc import Callable, Iterable, Sequence
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from contextlib import ExitStack
from functools import update_wrapper
from logging import Logger
from typing import Final, Generic, Self, TypeVar, final
@@ -12,7 +14,17 @@

from sghi.disposable import not_disposed
from sghi.etl.core import Sink
-from sghi.utils import ensure_callable, type_fqn
from sghi.retry import Retry, noop_retry
from sghi.task import ConcurrentExecutor, Task, task
from sghi.utils import (
ensure_callable,
ensure_instance_of,
ensure_not_none_nor_empty,
ensure_predicate,
type_fqn,
)

from .utils import fail_fast

# =============================================================================
# TYPES
@@ -22,6 +34,10 @@
_PDT = TypeVar("_PDT")
"""Type variable representing the data type after processing."""

_T = TypeVar("_T")

_ResultGatherer = Callable[[Iterable[Future[_T]]], Iterable[_T]]
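"""A callable that gathers the results of ``Future`` objects produced by a
concurrent drain, determining how draining errors are surfaced."""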

_SinkCallable = Callable[[_PDT], None]


Expand Down Expand Up @@ -153,6 +169,236 @@ def dispose(self) -> None:
self._logger.info("Disposal complete.")


@final
class SplitSink(Sink[Sequence[_PDT]], Generic[_PDT]):
"""A :class:`Sink` that splits processed data and drains the split data to
multiple sinks.
This ``Sink`` implementation takes aggregated processed data, splits it
into its constituent data parts, and then drains each data part to each
embedded sink concurrently. That is, each data part is mapped to each
embedded sink before executing all the embedded sinks concurrently. As
such, the supplied processed data's size must equal the number of embedded
sinks contained by a sink of this type. A result gatherer function can be
provided to specify how to handle errors while draining. A :class:`retry
policy<Retry>` to handle transient draining errors to the embedded sinks
can also be provided. A suitable :class:`Executor` can be specified at
instantiation to control the concurrent draining to the embedded sinks.
Instances of this class are **NOT SAFE** to retry and **SHOULD NEVER** be
retried. However, they do support retrying their embedded sinks. This is
disabled by default but can be enabled by providing a suitable value to
the ``retry_policy_factory`` constructor parameter when creating new
instances. When enabled, each embedded sink will be retried individually
per the specified retry policy in case it fails.
Disposing instances of this class also disposes of their embedded sinks.
.. admonition:: Regarding retry safety
:class: tip
Instances of this ``Sink`` are **NOT SAFE** to retry.
""" # noqa: D205

__slots__ = (
"_sinks",
"_retry_policy_factory",
"_executor_factory",
"_result_gatherer",
"_is_disposed",
"_logger",
"_exit_stack",
"_prepped_sinks",
"_executor",
)

def __init__(
self,
sinks: Sequence[Sink[_PDT]],
retry_policy_factory: Callable[[], Retry] = noop_retry,
executor_factory: Callable[[], Executor] = ThreadPoolExecutor,
result_gatherer: _ResultGatherer[None] = fail_fast,
) -> None:
"""Create a new ``SplitSink`` of the given properties.
:param sinks: A ``Sequence`` of sinks to drain each processed data part
to. These sinks are also referred to as the embedded sinks. The
given ``Sequence`` *MUST NOT* be empty.
:param retry_policy_factory: A callable that supplies retry policy
instance(s) to apply to each embedded sink. This *MUST* be a valid
callable object. Defaults to a factory that returns retry policies
that do nothing.
:param executor_factory: A callable that suppliers suitable
``Executor`` instance(s) to use for the concurrent draining. This
*MUST* be a valid callable object. Defaults to a factory that
returns ``ThreadPoolExecutor`` instances.
:param result_gatherer: A function that specifies how to handle
draining errors. This *MUST* be a valid callable object. Defaults
to a gatherer that fails if draining to any of the embedded sinks
failed, or returns silently otherwise.
:raise TypeError: If ``sinks`` is NOT a ``Sequence``.
:raise ValueError: If ``sinks`` is empty or if
``retry_policy_factory``, ``executor_factory`` and
``result_gatherer`` are NOT callable objects.
"""
super().__init__()
ensure_not_none_nor_empty(
value=ensure_instance_of(
value=sinks,
message="'sinks' MUST be a collections.abc.Sequence object.",
klass=Sequence,
),
message="'sinks' MUST NOT be empty.",
)
self._sinks: Sequence[Sink[_PDT]] = tuple(sinks)
self._retry_policy_factory: Callable[[], Retry] = ensure_callable(
value=retry_policy_factory,
message="'retry_policy_factory' MUST be a callable.",
)
self._executor_factory: Callable[[], Executor] = ensure_callable(
value=executor_factory,
message="'executor_factory' MUST be a callable.",
)
self._result_gatherer: _ResultGatherer[None] = ensure_callable(
value=result_gatherer,
message="'result_gatherer' MUST be a callable.",
)
self._is_disposed: bool = False
self._logger: Logger = logging.getLogger(type_fqn(self.__class__))
self._exit_stack: ExitStack = ExitStack()

# Prepare embedded sinks for execution by ensuring that they are all
# disposed of properly once this object is disposed.
self._prepped_sinks: Sequence[Task[Sequence[_PDT], None]] = tuple(
self._sink_to_task(self._exit_stack.push(_sink), _i)
for _i, _sink in enumerate(self._sinks)
)
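        # Each prepped sink is a `Task` that receives the full aggregated
        # input and drains only its own data part; the executor below runs
        # all of these tasks concurrently.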
self._executor: ConcurrentExecutor[Sequence[_PDT], None]
self._executor = ConcurrentExecutor(
*self._prepped_sinks, executor=self._executor_factory()
)

@not_disposed
@override
def __enter__(self) -> Self:
"""Return ``self`` upon entering the runtime context.
.. admonition:: Don't use after dispose
:class: error
Invoking this method on an instance that is disposed(i.e. the
:attr:`is_disposed` property on the instance is ``True``) will
result in a :exc:`ResourceDisposedError` being raised.
:return: This instance.
:raise ResourceDisposedError: If this sink has already been disposed.
"""
return super(Sink, self).__enter__()

@property
@override
def is_disposed(self) -> bool:
return self._is_disposed

@not_disposed
@override
def drain(self, processed_data: Sequence[_PDT]) -> None:
"""Split the supplied processed data and consume each data part.
This method decomposes the provided aggregated processed data into its
constituent data parts and maps each data part to an embedded sink;
before executing all the embedded sinks concurrently. It then applies
the result-gatherer function assigned to this instance (at creation) to
the :class:`Future` objects resulting from the concurrent execution.
Each of these ``Future`` objects wraps the result of draining each data
part to an embedded sink contained in this ``SplitSink``, and they
preserve the same order.
.. admonition:: Don't use after dispose
:class: error
Invoking this method on an instance that is disposed(i.e. the
:attr:`is_disposed` property on the instance is ``True``) will
result in a :exc:`ResourceDisposedError` being raised.
:param processed_data: The aggregated processed data to consume. This
*MUST* be a ``Sequence`` of processed data values whose size *MUST
EQUAL* the number of embedded sinks contained by this
``SplitSink``.
:return: None.
:raise ResourceDisposedError: If this sink has already been disposed.
:raise TypeError: If ``processed_data`` *IS NOT* a ``Sequence``.
:raise ValueError: If the size of ``processed_data`` *DOES NOT EQUAL*
the number of embedded sinks in this ``SplitSink``.
"""
ensure_instance_of(
value=processed_data,
klass=Sequence,
message=(
"'processed_data' MUST be a collections.abc.Sequence object."
),
)
ensure_predicate(
test=len(processed_data) == len(self._sinks),
message=(
f"Expected 'processed_data' to be of size {len(self._sinks)} "
f"but got size {len(processed_data)} instead."
),
exc_factory=ValueError,
)
self._logger.info(
"Splitting aggregated processed data and draining each data part "
"to all available sinks."
)
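
        # Every prepped sink task receives the same aggregated input and
        # extracts its own data part; the resulting futures preserve the
        # order of the embedded sinks.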

with self._executor as executor:
futures = executor.execute(processed_data)

self._result_gatherer(futures)

@override
def dispose(self) -> None:
"""Release any underlying resources contained by this sink.
All embedded sinks are also disposed. After this method returns
successfully, the :attr:`is_disposed` property should return ``True``.
.. note::
Unless otherwise specified, trying to use methods of a
``Disposable`` instance decorated with the
:func:`~sghi.disposable.not_disposed` decorator after this method
returns should generally be considered a programming error and
should result in a :exc:`~sghi.disposable.ResourceDisposedError`
being raised.
This method should be idempotent allowing it to be called more
than once; only the first call, however, should have an effect.
:return: None.
"""
self._is_disposed = True
self._exit_stack.close()
self._executor.dispose()
self._logger.info("Disposal complete.")

def _sink_to_task(
self,
s: Sink[_PDT],
i: int,
) -> Task[Sequence[_PDT], None]:
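        # Bind the i-th embedded sink to a task that drains only the i-th
        # data part, wrapping the sink's `drain` in a freshly created retry
        # policy.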
@task
def do_drain(processed_data: Sequence[_PDT]) -> None:
with s as _s:
drain = self._retry_policy_factory().retry(_s.drain)
return drain(processed_data[i])

return do_drain


@final
class _SinkOfCallable(Sink[_PDT], Generic[_PDT]):
__slots__ = ("_delegate_to", "_is_disposed", "_logger")
@@ -227,5 +473,6 @@ def dispose(self) -> None:

__all__ = [
"NullSink",
"SplitSink",
"sink",
]
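
As a closing note on failure handling: the ``result_gatherer`` and
``executor_factory`` parameters make error handling and concurrency
pluggable. A hedged sketch, assuming the ``ignored_failed`` gatherer
exported by this package skips failed futures rather than failing fast, and
reusing the hypothetical sinks from the earlier sketch:

from concurrent.futures import ThreadPoolExecutor

from sghi.etl.commons import SplitSink, ignored_failed

# A SplitSink that tolerates failures from individual embedded sinks
# instead of failing fast (the default), draining on at most 2 threads.
lenient_sink: SplitSink[str] = SplitSink(
    sinks=[save_to_db, save_to_file],  # hypothetical sinks from earlier
    executor_factory=lambda: ThreadPoolExecutor(max_workers=2),
    result_gatherer=ignored_failed,
)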