feat(processors): add a ScatterGatherProcessor
Add `sghi.etl.commons.processors.ScatterGatherProcessor`, a `Processor`
that applies multiple other processors to the same raw data, and then
returns the aggregated outputs of these processors.
kennedykori committed May 19, 2024
1 parent d8a8c5d commit 859567e
Showing 3 changed files with 405 additions and 2 deletions.
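
For orientation, here is a minimal usage sketch of the new processor. It is not part of the commit: it assumes that the `processor` decorator exported by `sghi.etl.commons` turns a plain callable into a `Processor`, that the default fail-fast gatherer preserves the embedded processors' order, and the embedded processors `double` and `triple` are illustrative only.

from sghi.etl.commons import ScatterGatherProcessor, processor

@processor
def double(value: int) -> int:
    # Illustrative embedded processor: double the input.
    return value * 2

@processor
def triple(value: int) -> int:
    # Illustrative embedded processor: triple the input.
    return value * 3

# Apply both embedded processors to the same raw data concurrently and
# gather their outputs into a single sequence.
with ScatterGatherProcessor([double, triple]) as sg:
    results = sg.apply(10)

print(results)  # With the default fail-fast gatherer: (20, 30)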
2 changes: 2 additions & 0 deletions src/sghi/etl/commons/__init__.py
@@ -3,6 +3,7 @@
from .processors import (
NOOPProcessor,
ProcessorPipe,
ScatterGatherProcessor,
SplitGatherProcessor,
pipe_processors,
processor,
@@ -18,6 +19,7 @@
"NullSink",
"ProcessorPipe",
"SimpleWorkflowDefinition",
"ScatterGatherProcessor",
"SplitGatherProcessor",
"fail_fast",
"fail_fast_factory",
220 changes: 219 additions & 1 deletion src/sghi/etl/commons/processors.py
@@ -341,6 +341,223 @@ def do_apply(raw_data: _RDT) -> _PDT:
pipe_processors = ProcessorPipe


@final
class ScatterGatherProcessor(
Processor[_RDT, Sequence[_PDT]], Generic[_RDT, _PDT]
):
"""A :class:`Processor` that applies raw data to multiple other processors.
This ``Processor`` implementation applies multiple other processors
(embedded processors) to the same raw data, and then returns the aggregated
outputs of these processors. The applications of the embedded processors to
the raw data are performed concurrently. A suitable :class:`Executor` can
be specified at instantiation to control the concurrent execution of the
embedded processors. A result gatherer function can be provided to specify
how to handle processing errors and/or which results from the embedded
processors to pick. A :class:`retry policy<Retry>` to handle transient
processing errors of the embedded processors can also be provided.
Instances of this class are **NOT SAFE** to retry and **SHOULD NEVER** be
retried. However, they do support retrying their embedded processors. This
is disabled by default but can be enabled by providing a suitable value to
the ``retry_policy_factory`` constructor parameter when creating new
instances. When enabled, each embedded processor will be retried
individually per the specified retry policy in case it fails.
Disposing instances of this class also disposes of their embedded
processors.
.. admonition:: Regarding retry safety
:class: tip
Instances of this ``Processor`` are **NOT SAFE** to retry.
"""

__slots__ = (
"_processors",
"_retry_policy_factory",
"_executor_factory",
"_result_gatherer",
"_is_disposed",
"_logger",
"_exit_stack",
"_prepped_processors",
"_executor",
)

def __init__(
self,
processors: Sequence[Processor[_RDT, _PDT]],
retry_policy_factory: Callable[[], Retry] = noop_retry,
executor_factory: Callable[[], Executor] = ThreadPoolExecutor,
result_gatherer: _ResultGatherer[_PDT] = fail_fast,
) -> None:
"""Create a new ``ScatterGatherProcessor`` of the given properties.
:param processors: A ``Sequence`` of processors to be applied to the
raw data. These processors are also referred to as the embedded
processors. The given ``Sequence`` *MUST NOT* be empty.
:param retry_policy_factory: A callable that supplies retry policy
instance(s) to apply to each embedded processor. This *MUST* be a
valid callable object. Defaults to a factory that returns retry
policies that do nothing.
:param executor_factory: A callable that suppliers suitable
``Executor`` instance(s) to use for the concurrent processing. This
*MUST* be a valid callable object. Defaults to a factory that
returns ``ThreadPoolExecutor`` instances.
:param result_gatherer: A function that specifies how to handle
processing errors and/or which results from the embedded processors
to pick. This *MUST* be a valid callable object. Defaults to a
gatherer that fails if applying any of the embedded processors
failed, or returns all the processed data otherwise.
:raise TypeError: If ``processors`` is NOT a ``Sequence``.
:raise ValueError: If ``processors`` is empty or if
``retry_policy_factory``, ``executor_factory`` and
``result_gatherer`` are NOT callable objects.
"""
super().__init__()
ensure_not_none_nor_empty(
value=ensure_instance_of(
value=processors,
message=(
"'processors' MUST be a collections.abc.Sequence object."
),
klass=Sequence,
),
message="'processors' MUST NOT be empty.",
)
self._processors: Sequence[Processor[_RDT, _PDT]] = tuple(processors)
self._retry_policy_factory: Callable[[], Retry] = ensure_callable(
value=retry_policy_factory,
message="'retry_policy_factory' MUST be a callable.",
)
self._executor_factory: Callable[[], Executor] = ensure_callable(
value=executor_factory,
message="'executor_factory' MUST be a callable.",
)
self._result_gatherer: _ResultGatherer[_PDT] = ensure_callable(
value=result_gatherer,
message="'result_gatherer' MUST be a callable.",
)
self._is_disposed: bool = False
self._logger: Logger = logging.getLogger(type_fqn(self.__class__))
self._exit_stack: ExitStack = ExitStack()

# Prepare embedded processors for execution by ensuring that they are
# all disposed of properly once this object is disposed.
self._prepped_processors: Sequence[Task[Any, Any]] = tuple(
self._processor_to_task(self._exit_stack.push(_processor))
for _processor in self._processors
)
self._executor: ConcurrentExecutor[_RDT, _PDT]
self._executor = ConcurrentExecutor(
*self._prepped_processors, executor=self._executor_factory()
)

@not_disposed
@override
def __enter__(self) -> Self:
"""Return ``self`` upon entering the runtime context.
.. admonition:: Don't use after dispose
:class: error
Invoking this method on an instance that is disposed(i.e. the
:attr:`is_disposed` property on the instance is ``True``) will
result in a :exc:`ResourceDisposedError` being raised.
:return: This instance.
:raise ResourceDisposedError: If this processor has already been
disposed.
"""
return super(Processor, self).__enter__()

@property
@override
def is_disposed(self) -> bool:
return self._is_disposed

@not_disposed
@override
def apply(self, raw_data: _RDT) -> Sequence[_PDT]:
"""Process the supplied raw data using all embedded processors and
return the results.
This method concurrently applies all the embedded processors to the
supplied raw data. It then applies the results-gatherer function
assigned to this instance (at creation) to the :class:`Future` objects
resulting from the concurrent execution. Each of these ``Future``
objects wraps the result of processing the supplied raw data using an
embedded processor contained in this ``ScatterGatherProcessor``, and
they preserve the same order.
The contents of the resulting aggregate, and their ordering, are
determined by the provided result-gatherer function. That is, the
contents of the returned ``Sequence``, will be the same as those
returned by this object's result-gatherer function and in the same
order.
.. admonition:: Don't use after dispose
:class: error
Invoking this method on an instance that is disposed(i.e. the
:attr:`is_disposed` property on the instance is ``True``) will
result in a :exc:`ResourceDisposedError` being raised.
:param raw_data: The data to be processed.
:return: The aggregated results of applying all the embedded processors
to the provided raw data.
:raise ResourceDisposedError: If this processor has already been
disposed.
""" # noqa: D205
self._logger.info(
"Forking processing of the received data to all embedded "
"processors."
)
with self._executor as executor:
futures = executor.execute(raw_data)

return tuple(self._result_gatherer(futures))

@override
def dispose(self) -> None:
"""Release any underlying resources contained by this processor.
All embedded processors are also disposed. After this method returns
successfully, the :attr:`is_disposed` property should return ``True``.
.. note::
Unless otherwise specified, trying to use methods of a
``Disposable`` instance decorated with the
:func:`~sghi.disposable.not_disposed` decorator after this method
returns should generally be considered a programming error and
should result in a :exc:`~sghi.disposable.ResourceDisposedError`
being raised.
This method should be idempotent allowing it to be called more
than once; only the first call, however, should have an effect.
:return: None.
"""
self._is_disposed = True
self._exit_stack.close()
self._executor.dispose()
self._logger.info("Disposal complete.")

def _processor_to_task(self, p: Processor[_RDT, _PDT]) -> Task[_RDT, _PDT]:
@task
def do_apply(raw_data: _RDT) -> _PDT:
with p as _p:
apply = self._retry_policy_factory().retry(_p.apply)
return apply(raw_data)

return do_apply


@final
class SplitGatherProcessor(
Processor[Sequence[_RDT], Sequence[_PDT]], Generic[_RDT, _PDT]
@@ -430,6 +647,7 @@ def __init__(
),
message="'processors' MUST NOT be empty.",
)
self._processors: Sequence[Processor[_RDT, _PDT]] = tuple(processors)
self._retry_policy_factory: Callable[[], Retry] = ensure_callable(
value=retry_policy_factory,
message="'retry_policy_factory' MUST be a callable.",
@@ -442,7 +660,6 @@ def __init__(
value=result_gatherer,
message="'result_gatherer' MUST be a callable.",
)
self._processors: Sequence[Processor[_RDT, _PDT]] = tuple(processors)
self._is_disposed: bool = False
self._logger: Logger = logging.getLogger(type_fqn(self.__class__))
self._exit_stack: ExitStack = ExitStack()
@@ -661,6 +878,7 @@ def dispose(self) -> None:
__all__ = [
"NOOPProcessor",
"ProcessorPipe",
"ScatterGatherProcessor",
"SplitGatherProcessor",
"pipe_processors",
"processor",
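
The constructor parameters can also be swapped out for custom behaviour. The sketch below is an illustration rather than part of the commit: it assumes the result gatherer is handed completed `Future` objects exposing the standard `result()`/`exception()` API, that a failing embedded processor's error is captured in its future rather than propagated, and the names `ignore_failures`, `reliable`, and `flaky` are hypothetical.

from collections.abc import Iterable, Sequence
from concurrent.futures import Future, ThreadPoolExecutor

from sghi.etl.commons import ScatterGatherProcessor, processor

def ignore_failures(futures: Sequence[Future[int]]) -> Iterable[int]:
    # Hypothetical gatherer: keep only the results of the embedded
    # processors that succeeded, dropping any that raised.
    return tuple(f.result() for f in futures if f.exception() is None)

@processor
def reliable(value: int) -> int:
    return value + 1

@processor
def flaky(value: int) -> int:
    raise RuntimeError("simulated transient failure")

sg = ScatterGatherProcessor(
    [reliable, flaky],
    executor_factory=lambda: ThreadPoolExecutor(max_workers=2),
    result_gatherer=ignore_failures,
)
try:
    print(sg.apply(41))  # (42,) if the failed processor's error is captured
finally:
    sg.dispose()

The package's own `fail_fast` (the default) and, presumably, `fail_fast_factory` cover the common cases; the point of the sketch is only that any callable of the right shape can be plugged in.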
