diff --git a/docs/conf.py b/docs/conf.py index 0dbffa3..7b7b7f3 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -60,23 +60,12 @@ ("py:class", "_DT"), # type annotation only available when type checking ("py:class", "_P"), # type annotation only available when type checking ("py:class", "_RT"), # type annotation only available when type checking + ("py:class", "Chain[Any]"), # Used as type annotation. Only available when type checking ("py:class", "TracebackType"), # Used as type annotation. Only available when type checking ("py:class", "concurrent.futures._base.Executor"), # sphinx can't find it ("py:class", "concurrent.futures._base.Future"), # sphinx can't find it - ("py:class", "sghi.disposable.decorators._D"), # private type annotations - ("py:class", "sghi.disposable.decorators._DE"), # private type annotations - ("py:class", "sghi.disposable.decorators._P"), # private type annotations - ("py:class", "sghi.disposable.decorators._R"), # private type annotations - ("py:class", "sghi.disposable.decorators.not_disposed._D"), # private type annotations - ("py:class", "sghi.disposable.decorators.not_disposed._DE"), # private type annotations - ("py:class", "sghi.disposable.decorators.not_disposed._P"), # private type annotations - ("py:class", "sghi.disposable.decorators.not_disposed._R"), # private type annotations - ("py:class", "sghi.task.task._IT"), # private type annotations - ("py:class", "sghi.task.task._OT"), # private type annotations - ("py:class", "sghi.task.common._IT"), # private type annotations - ("py:class", "sghi.task.common._OT"), # private type annotations - ("py:class", "sghi.task.concurrent._IT"), # private type annotations - ("py:class", "sghi.task.concurrent._OT"), # private type annotations + ("py:class", "sghi.task._IT"), # private type annotations + ("py:class", "sghi.task._OT"), # private type annotations ("py:class", "sghi.utils.checkers._Comparable"), # private type annotations ("py:class", "sghi.utils.checkers._ST"), # private type annotations ("py:class", "sghi.utils.checkers._T"), # private type annotations @@ -84,12 +73,8 @@ ("py:class", "sghi.typing._CT_contra"), # private type annotations ("py:obj", "sghi.disposable.decorators.not_disposed._P"), # private type annotations ("py:obj", "sghi.disposable.decorators.not_disposed._R"), # private type annotations - ("py:obj", "sghi.task.task._IT"), # private type annotations - ("py:obj", "sghi.task.task._OT"), # private type annotations - ("py:obj", "sghi.task.common._IT"), # private type annotations - ("py:obj", "sghi.task.common._OT"), # private type annotations - ("py:obj", "sghi.task.concurrent._IT"), # private type annotations - ("py:obj", "sghi.task.concurrent._OT"), # private type annotations + ("py:obj", "sghi.task._IT"), # private type annotations + ("py:obj", "sghi.task._OT"), # private type annotations ("py:obj", "sghi.typing._CT_contra"), # private type annotations ] diff --git a/docs/index.rst b/docs/index.rst index 172ddc9..2cc2732 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -43,6 +43,7 @@ API Reference sghi.app sghi.disposable sghi.exceptions + sghi.task sghi.typing sghi.utils diff --git a/src/sghi/disposable/__init__.py b/src/sghi/disposable/__init__.py index c5d0451..d65d812 100644 --- a/src/sghi/disposable/__init__.py +++ b/src/sghi/disposable/__init__.py @@ -106,10 +106,11 @@ def dispose(self) -> None: .. note:: Unless otherwise specified, trying to use methods of a - ``Disposable`` instance decorated with the :func:`not_disposed` - decorator after this method returns should generally be considered - a programming error and should result in a - :exc:`ResourceDisposedError` being raised. + ``Disposable`` instance decorated with the + :func:`~sghi.disposable.not_disposed` decorator after this method + returns should generally be considered a programming error and + should result in a :exc:`~sghi.disposable.ResourceDisposedError` + being raised. This method should be idempotent allowing it to be called more than once; only the first call, however, should have an effect. diff --git a/src/sghi/task/__init__.py b/src/sghi/task/__init__.py new file mode 100644 index 0000000..9418aaf --- /dev/null +++ b/src/sghi/task/__init__.py @@ -0,0 +1,548 @@ +""" +``Task`` interface definition together with its common implementations. +""" +from __future__ import annotations + +import warnings +from abc import ABCMeta, abstractmethod +from collections.abc import Callable, Iterable, MutableSequence, Sequence +from concurrent.futures import ( + ALL_COMPLETED, + Executor, + Future, + ThreadPoolExecutor, + wait, +) +from functools import reduce +from logging import Logger, getLogger +from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast + +from ..disposable import Disposable, ResourceDisposedError +from ..disposable import not_disposed as _nd_factory +from ..utils import ensure_not_none, ensure_not_none_nor_empty, type_fqn + +if TYPE_CHECKING: + from typing_extensions import Self + +# ============================================================================= +# TYPES +# ============================================================================= + + +_IT = TypeVar("_IT") +_OT = TypeVar("_OT") + + +# ============================================================================= +# HELPERS +# ============================================================================= + + +def _callables_to_tasks_as_necessary( + tasks: Sequence[Task[_IT, _OT] | Callable[[_IT], _OT]], +) -> Sequence[Task[_IT, _OT]]: + """Convert callables to :class:`Task` instances if necessary. + + The given callables should accept a single parameter of type ``IT`` and + return a value of type ``OT``. Instances of ``Tasks`` in the given + ``Sequence`` will be returned as is. + + :param tasks: A ``Sequence`` of ``Task`` instances or callables. + + :return: A ``Sequence`` of ``Task`` instances. + + :raises ValueError: If `tasks` is ``None``. + """ + ensure_not_none(tasks, "'tasks' MUST not be None.") + return tuple( + task if isinstance(task, Task) else Task.of_callable(task) + for task in tasks + ) + + +# ============================================================================= +# EXCEPTIONS +# ============================================================================= + + +class ConcurrentExecutorDisposedError(ResourceDisposedError): + """ + Indicates that an erroneous usage of a disposed :class:`ConcurrentExecutor` + was made. + """ + + def __init__(self, message: str | None = "ConcurrentExecutor disposed."): + """ + Initialize a ``ConcurrentExecutorDisposedError`` with an optional + message. + + :param message: An optional message for the exception. + """ + super().__init__(message=message) + + +# ============================================================================= +# TASK INTERFACE +# ============================================================================= + + +class Task(Generic[_IT, _OT], metaclass=ABCMeta): + """A job or action to perform. + + An interface that describes a job or action to be performed. The interface + defines a single method :meth:`execute`, that accepts a single input value + and returns a result. A `Task` instance can also be used as a callable, the + actual call is delegated to the ``execute`` method. + """ + + __slots__ = () + + def __call__(self, an_input: _IT) -> _OT: + """Perform a computation given an input and return a result. + + Call the ``Task`` as a callable. Delegate actual call to + :meth:`execute`. + + :param an_input: An input to the tasks. + + :return: The result of the computation. + """ + return self.execute(an_input) + + @abstractmethod + def execute(self, an_input: _IT) -> _OT: + """Perform a computation given an input and return a result. + + :param an_input: An input to the task. + + :return: The result of the computation. + """ + ... + + @staticmethod + def of_callable(source_callable: Callable[[_IT], _OT]) -> Task[_IT, _OT]: + """Create a :class:`Task` instance from a callable. + + .. note:: + + The given callable *MUST* have at *MOST*, one required argument. + + :param source_callable: The callable function to wrap as a ``Task``. + This *MUST* not be ``None``. + + :return: A ``Task`` instance. + + :raises ValueError: If `source_callable` is ``None``. + """ + return _OfCallable(source_callable=source_callable) + + +# ============================================================================= +# COMMON IMPLEMENTATIONS +# ============================================================================= + + +class Chain(Task[Callable[[_IT], Any], "Chain[Any]"], Generic[_IT]): + """ + A :class:`Task` that wraps a value and applies a transformation (or series + of transformations) on the value. + + The output of each transformation is wrapped in a new ``Chain`` instance, + facilitating the chaining together of multiple transformations. This + capability can be employed to compose complex transformations from smaller + ones. + + The wrapped value can be retrieved using the :attr:`value` property. + """ + + __slots__ = ("_value",) + + def __init__(self, value: _IT) -> None: + """Initialize a new :class:`Chain` instance that wraps the given value. + + :param value: The value to wrap and apply transformations to. + """ + super().__init__() + self._value: _IT = value + + def __add__(self, __an_input: Callable[[_IT], _OT], /) -> Chain[_OT]: + return self.execute(an_input=__an_input) + + @property + def value(self) -> _IT: + """Return the wrapped value. + + :return: The wrapped value. + """ + return self._value + + def execute(self, an_input: Callable[[_IT], _OT]) -> Chain[_OT]: + """ + Perform the given transformation on the wrapped value and wrap the + result in a new ``Chain`` instance. + + :param an_input: A callable defining a transformation to be applied to + the wrapped value. This MUST not be ``None``. + + :return: A new ``Chain`` instance that wraps the result of the given + transformation. + + :raises ValueError: If the given transformation is ``None``. + """ + bind: Callable[[_IT], _OT] + bind = ensure_not_none(an_input, "'an_input' MUST not be None.") + return Chain(bind(self._value)) + + +class Consume(Task[_IT, _IT], Generic[_IT]): + """A :class:`Task` that applies an action to it's inputs. + + This ``Task`` wraps a callable and applies it to its input. It returns + its input value as is on execution and is better suited for + operations with side effects. + """ + + __slots__ = ("_accept",) + + def __init__(self, accept: Callable[[_IT], Any]) -> None: + """ + Initialize a new :class:`Consume` instance that applies the given + action to it's inputs. + + :param accept: A callable to apply to this task's inputs. This MUST not + be None. + + :raises ValueError: If the given callable is ``None``. + """ + super().__init__() + ensure_not_none(accept, "'accept' MUST not be None.") + self._accept: Callable[[_IT], Any] = accept + + def __add__(self, __an_input: Callable[[_IT], Any], /) -> Consume[_IT]: + return self.and_then(accept=__an_input) + + def and_then(self, accept: Callable[[_IT], Any]) -> Consume[_IT]: + """Compose this :class:`Consume` action with the provided action. + + This creates a new ``Consume`` instance that performs both this task's + action and the provided action. + + :param accept: The action to compose with this task's action. This + MUST be not None. + + :return: A new ``Consume`` instance that performs both actions. + + :raises ValueError: If the given callable is ``None``. + """ + ensure_not_none(accept, "'accept' MUST not be None.") + + def _compose_accept(an_input: _IT) -> None: + self._accept(an_input) + accept(an_input) + + return Consume(accept=_compose_accept) + + def execute(self, an_input: _IT) -> _IT: + self._accept(an_input) + return an_input + + +class Pipe(Task[_IT, _OT], Generic[_IT, _OT]): + """A :class:`Task` that pipes its input through a ``Sequence`` of tasks. + + This ``Task`` applies a series of tasks to its input, passing the output of + one ``Task`` as the input to the next. + """ + + __slots__ = ("_tasks",) + + def __init__(self, *tasks: Task[Any, Any] | Callable[[Any], Any]): + """Create a new :class:`Pipe` instance of the given tasks. + + :param tasks: A ``Sequence`` of the tasks to apply this pipe's inputs + to. This MUST not be empty. + + :raises ValueError: If no tasks were specified, i.e. ``tasks`` is + empty. + """ + super().__init__() + ensure_not_none_nor_empty(tasks, "'tasks' MUST not be None or empty.") + self._tasks: Sequence[Task[Any, Any]] + self._tasks = _callables_to_tasks_as_necessary(tasks) + + @property + def tasks(self) -> Sequence[Task[Any, Any]]: + """The ``Sequence`` of :class:`tasks ` that comprise this pipe. + + :return: The tasks that comprise this pipe. + """ + return self._tasks + + def execute(self, an_input: _IT) -> _OT: + """ + Apply the given input to the :class:`tasks ` that comprise this + pipe, sequentially. + + The output of each task becomes the input of the next one. The result + of the final ``Task`` is the output of this *pipe* operation. + + :param an_input: The input to start with. + + :return: The final output after applying all the tasks. + """ + _acc: Any + _tsk: Task[Any, Any] + return cast( + _OT, + reduce( + lambda _acc, _tsk: _tsk(_acc), + self.tasks[1:], + self.tasks[0](an_input), + ), + ) + + +chain = Chain + +consume = Consume + +pipe = Pipe + + +# ============================================================================= +# CONCURRENT EXECUTOR +# ============================================================================= + + +not_disposed = _nd_factory(exc_factory=ConcurrentExecutorDisposedError) + + +class ConcurrentExecutor( + Task[_IT, Iterable[Future[_OT]]], + Disposable, + Generic[_IT, _OT], +): + """ + A :class:`Task` that concurrently executes multiple `tasks` with a shared + input. + + The output of these tasks is an :class:`Iterable` of + :external+python:py:class:`futures` + representing the execution of the given tasks. If the + ``wait_for_completion`` constructor parameter is set to ``True``, the + default, the `execute` method will block until all tasks have completed. + + .. important:: + + When ``wait_for_completion`` is set to ``False``, instances of this + class should not be used as context managers. This is because the + underlying ``Executor`` will be shutdown immediately once ``execute`` + returns. + + .. tip:: + + By default, this tasks uses a + :external+python:py:class:`~concurrent.futures.ThreadPoolExecutor` + to run the tasks concurrently. This is suitable for short `I/O-bound` + tasks. However, for compute-intensive tasks, consider using a + :external+python:py:class:`~concurrent.futures.ProcessPoolExecutor` by + passing it through the ``executor`` constructor parameter during + initialization. + + For more details, see the official Python + :doc:`concurrency docs `. + """ + + __slots__ = ( + "_tasks", + "_wait_for_completion", + "_executor", + "_is_disposed", + "_logger", + ) + + def __init__( + self, + *tasks: Task[_IT, _OT] | Callable[[_IT], _OT], + wait_for_completion: bool = True, + executor: Executor | None = None, + ): + """ + Initialize a new `ConcurrentExecutor` instance with the given + properties. + + :param tasks: The tasks to be executed concurrently. This MUST not be + ``None`` or empty. + :param wait_for_completion: Whether ``execute`` should block and wait + for all the given tasks to complete execution. Defaults to + ``True``. + :param executor: The :class:`executor ` + instance to use when executing the tasks. If not provided, a + ``ThreadPoolExecutor`` is used. + + :raises ValueError: If tasks is ``None`` or empty. + """ + super().__init__() + ensure_not_none_nor_empty(tasks, "'tasks' MUST not be None or empty.") + self._tasks: Sequence[Task[_IT, _OT]] + self._tasks = _callables_to_tasks_as_necessary(tasks) + self._wait_for_completion: bool = wait_for_completion + self._executor: Executor = executor or ThreadPoolExecutor() + self._is_disposed: bool = False + self._logger: Logger = getLogger(type_fqn(self.__class__)) + + @not_disposed + def __enter__(self) -> Self: + super().__enter__() + if not self._wait_for_completion: + msg: str = ( + "Using instances of this class as a context manager when " + "'wait_for_completion' is set to 'False' is discouraged." + ) + warnings.warn(message=msg, stacklevel=3) + return self + + @property + def is_disposed(self) -> bool: + return self._is_disposed + + @property + def tasks(self) -> Sequence[Task[_IT, _OT]]: + """ + Get the sequence of :class:`tasks` that will be executed + concurrently. + + :return: The sequence of tasks. + """ + return self._tasks + + def dispose(self) -> None: + """ + Shutdown the underlying :class:`~concurrent.futures.Executor` powering + this ``Task``. + + After this method returns successfully, the :attr:`is_disposed` + property will always return ``True``. + + .. important:: + + When ``wait_for_completion`` is set to ``True``, this method will + wait for all scheduled tasks to complete before returning. If set + to ``False``, this is not guaranteed, which might result in some + tasks not being executed or being canceled. + + .. note:: + Unless otherwise specified, trying to use methods of this class + decorated with the :func:`~sghi.disposable.not_disposed` decorator + after this method returns is considered a programming error and + will result in a :exc:`ConcurrentExecutorDisposedError` being + raised. + + This method is idempotent allowing it to be called more than once; + only the first call, however, has an effect. + + :return: None. + """ + self._executor.shutdown(wait=self._wait_for_completion) + self._is_disposed = True + + @not_disposed + def execute(self, an_input: _IT) -> Iterable[Future[_OT]]: + """Execute the tasks concurrently with the given input. + + .. note:: + + If the ``wait_for_completion`` property is ``True``, this method + will block until all tasks finish execution. If set to ``False``, + all tasks will be scheduled for concurrent execution, after which + this method will return immediately, regardless of whether all + tasks have completed execution. + + :param an_input: The shared input to pass to each task. + + :return: An ``Iterable`` of futures representing the execution of each + task. + + :raises ConcurrentExecutorDisposedError: If this executor is already + disposed. + """ + futures: Iterable[Future[_OT]] = reduce( + lambda _partial, _tsk: self._accumulate( + _partial, + self._executor.submit(self._do_execute_task, _tsk, an_input), + ), + self.tasks, + [], + ) + if self._wait_for_completion: + wait(futures, return_when=ALL_COMPLETED) + return futures + + @staticmethod + def _accumulate( + partial_results: MutableSequence[Future[_OT]], + task_output: Future[_OT], + ) -> MutableSequence[Future[_OT]]: + partial_results.append(task_output) + return partial_results + + def _do_execute_task(self, task: Task[_IT, _OT], an_input: _IT) -> _OT: + """Execute an individual :class:`Task` with the provided input. + + :param task: The ``Task`` to execute. + :param an_input: The input to pass to the ``Task``. + + :return: The result of the tasks's execution. + + :raises Exception: If the tasks execution encounters an exception. + """ + try: + result: _OT = task(an_input) + except Exception as exp: + self._logger.error( + "Error while executing tasks of type='%s'.", + type_fqn(type(task)), + exc_info=exp, + ) + raise exp + return result + + +execute_concurrently = ConcurrentExecutor + + +# ============================================================================= +# FROM CALLABLE +# ============================================================================= + + +class _OfCallable(Task[_IT, _OT]): + __slots__ = ("_source_callable",) + + def __init__(self, source_callable: Callable[[_IT], _OT]): + super().__init__() + ensure_not_none(source_callable, "'source_callable' MUST not be None.") + self._source_callable: Callable[[_IT], _OT] + self._source_callable = source_callable + + def execute(self, an_input: _IT) -> _OT: + return self._source_callable(an_input) + + +# ============================================================================= +# MODULE EXPORTS +# ============================================================================= + + +__all__ = [ + "Chain", + "ConcurrentExecutor", + "ConcurrentExecutorDisposedError", + "Consume", + "Pipe", + "Task", + "chain", + "consume", + "execute_concurrently", + "pipe", +] diff --git a/test/sghi/task_tests.py b/test/sghi/task_tests.py new file mode 100644 index 0000000..d448c41 --- /dev/null +++ b/test/sghi/task_tests.py @@ -0,0 +1,462 @@ +import operator +import time +from collections.abc import Sequence +from concurrent.futures import wait +from functools import partial +from typing import TYPE_CHECKING +from unittest import TestCase + +import pytest + +from sghi.task import ( + ConcurrentExecutor, + ConcurrentExecutorDisposedError, + Task, + chain, + consume, + pipe, +) +from sghi.utils import ensure_greater_than, future_succeeded + +if TYPE_CHECKING: + from collections.abc import Callable + + +class TestConsume(TestCase): + """Tests for the :class:`consume` ``Task``.""" + + def test_and_then_method_returns_expected_value(self) -> None: + """ + :meth:`consume.and_then` method should return a new instance of + :class:`consume` that composes both the action of the current + ``consume`` instance and the new action. + """ + + collection1: list[int] = [] + collection2: set[int] = set() + + collector: consume[int] + collector = consume(collection1.append).and_then(collection2.add) + + assert isinstance(collector, consume) + + collector.execute(10) + assert len(collection1) == len(collection2) == 1 + + collector.execute(20) + assert len(collection1) == len(collection2) == 2 + + collector.execute(30) + assert len(collection1) == len(collection2) == 3 + + def test_instantiation_with_a_none_input_fails(self) -> None: + """ + :class:`consume` constructor should raise ``ValueError`` when given a + ``None`` input. + """ + with pytest.raises(ValueError, match="MUST not be None.") as exc_info: + consume(accept=None) # type: ignore + + assert exc_info.value.args[0] == "'accept' MUST not be None." + + def test_execute_performs_the_expected_side_effects(self) -> None: + """ + :meth:`consume.execute` method should apply it's input to the wrapped + action only, i.e., it should perform it's intended side effects only. + """ + results: list[int] = [] + collector: consume[int] = consume(accept=results.append) + + assert collector.execute(10) == 10 + assert len(results) == 1 + assert results[0] == 10 + + assert collector.execute(20) == 20 + assert len(results) == 2 + assert results[0] == 10 + assert results[1] == 20 + + value: int = 30 + assert collector.execute(value) == value + assert len(results) == 3 + assert results[0] == 10 + assert results[1] == 20 + assert results[2] == value + assert value == 30 # value should not have changed + + def test_different_and_then_method_invocation_styles_return_same_value(self) -> None: # noqa: E501 + """ + :meth:`consume.execute` should return the same value regardless of how + it was invoked. + """ + + collection1: list[int] = [] + collection2: set[int] = set() + collection3: list[int] = [] + collection4: set[int] = set() + + # Style 1, explicit invocation + collector1: consume[int] + collector1 = consume(collection1.append).and_then(collection2.add) + # Style 2, using the plus operator + collector2: consume[int] + collector2 = consume(collection3.append) + collection4.add + + assert isinstance(collector1, consume) + assert isinstance(collector2, consume) + + collector1(10) + collector2(10) + assert len(collection1) == len(collection2) == 1 + assert len(collection3) == len(collection4) == 1 + + collector1(20) + collector1(30) + collector2(20) + collector2(30) + assert len(collection1) == len(collection2) == 3 + assert len(collection3) == len(collection4) == 3 + + def test_different_execute_invocation_styles_return_same_value(self) -> None: # noqa: E501 + """ + :meth:`consume.execute` should return the same value regardless of how + it was invoked. + """ + + results1: list[int] = [] + collector1: consume[int] = consume(accept=results1.append) + results2: list[int] = [] + collector2: consume[int] = consume(accept=results2.append) + + # Style 1, explicit invocation + collector1.execute(10) + # Style 2, invoke as callable + collector2(10) + + assert len(results1) == len(results2) == 1 + assert results1[0] == results2[0] == 10 + + # Style 1, explicit invocation + collector1.execute(20) + # Style 2, invoke as callable + collector2(20) + + assert len(results1) == len(results2) == 2 + assert results1[0] == results2[0] == 10 + assert results1[1] == results2[1] == 20 + + +class TestChain(TestCase): + """Tests for the :class:`chain` ``Task``.""" + + def setUp(self) -> None: + super().setUp() + self._add_30 = partial(operator.add, 30) + self._multiply_by_2 = partial(operator.mul, 2) + self._chain_of_10: chain = chain(10) + + def test_execute_fails_on_none_input(self) -> None: + """ + :meth:`chain.execute` method should raise a :exc:`ValueError` when + invoked with a ``None`` argument. + """ + + with pytest.raises(ValueError, match="MUST not be None.") as exc_info: + self._chain_of_10(None) # type: ignore + + assert exc_info.value.args[0] == "'an_input' MUST not be None." + + def test_execute_return_value(self) -> None: + """ + :meth:`chain.execute` method should return a new :class:`chain` + instance with the new computed value. + """ + + instance: chain[int] + instance = self._chain_of_10\ + .execute(self._multiply_by_2)\ + .execute(self._add_30) + + assert isinstance(self._chain_of_10.execute(self._add_30), chain) + assert self._chain_of_10.execute(self._multiply_by_2).value == 20 + assert self._chain_of_10.execute(self._add_30).value == 40 + assert instance.value == 50 + + def test_different_execute_invocation_styles_return_same_value(self) -> None: # noqa: E501 + """ + :meth:`chain.execute` should return the same value regardless of how + it was invoked. + """ + + instance1: chain[int] + instance2: chain[int] + instance3: chain[int] + + # Style 1, explicit invocation + instance1 = self._chain_of_10\ + .execute(self._multiply_by_2)\ + .execute(self._add_30) + # Style 2, invoke as callable + instance2 = self._chain_of_10(self._multiply_by_2)(self._add_30) + # Style 3, using the plus operator + instance3 = self._chain_of_10 + self._multiply_by_2 + self._add_30 + + assert instance1.value == instance2.value == instance3.value == 50 + + def test_value_property_return_value(self) -> None: + """ + :attr:`chain.value` should return the wrapped value of the current + chain instance. + """ + + assert self._chain_of_10.value == 10 + assert self._chain_of_10(self._multiply_by_2).value == 20 + assert self._chain_of_10(self._add_30).value == 40 + + +class TestConcurrentExecutor(TestCase): + """Tests for the :class:`ConcurrentExecutor` ``Task``.""" + + def setUp(self) -> None: + super().setUp() + self._longer_io_tasks: Sequence[Task[float, float]] = tuple( + Task.of_callable(self._do_longer_io_bound_task) + for _ in range(3) + ) + self._short_io_tasks: Sequence[Task[float, float]] = tuple( + Task.of_callable(self._do_io_bound_task) + for _ in range(5) + ) + self._blocking_executor: ConcurrentExecutor[float, float] + self._blocking_executor = ConcurrentExecutor( + *self._longer_io_tasks, *self._short_io_tasks, + ) + self._non_blocking_executor: ConcurrentExecutor[float, float] + self._non_blocking_executor = ConcurrentExecutor( + *self._longer_io_tasks, *self._short_io_tasks, + wait_for_completion=False, + ) + + def tearDown(self) -> None: + super().tearDown() + self._blocking_executor.dispose() + self._non_blocking_executor.dispose() + + def test_enter_context_on_a_disposed_executor_fails(self) -> None: + """ + :meth:`ConcurrentExecutor.__enter__` should raise + :exc:`ConcurrentExecutorDisposedError` when invoked on a disposed + instance. + """ + self._blocking_executor.dispose() + self._non_blocking_executor.dispose() + + ced = ConcurrentExecutorDisposedError + err_msg: str = "ConcurrentExecutor disposed." + with pytest.raises(ced) as exc_info1: # noqa: SIM117 + with self._blocking_executor: + ... + + with pytest.raises(ced) as exc_info2: # noqa: SIM117 + with self._non_blocking_executor: + ... + + assert exc_info1.value.message == err_msg + assert exc_info2.value.message == err_msg + + def test_enter_context_on_non_blocking_mode_warns(self) -> None: + """ + :meth:`ConcurrentExecutor.__enter__` should warn the user. This is + most likely erroneous API usage. + """ + + with pytest.warns(UserWarning, match="is discouraged"): # noqa: SIM117 + with self._non_blocking_executor: + ... + + def test_failing_wrapped_tasks_errors_are_re_raised(self) -> None: + """ + Errors raised by the :meth:`ConcurrentExecutor.execute` should be + propagated as is the returned futures. + """ + tasks = tuple(self._do_failing_io_bound_task for _ in range(3)) + with ConcurrentExecutor(*tasks) as executor: + futures = tuple(executor(2)) + for exc in map(operator.methodcaller("exception"), futures): + assert isinstance(exc, ZeroDivisionError) + + def test_execute_invocation_on_a_disposed_executor_fails(self) -> None: + """ + :meth:`ConcurrentExecutor.execute` should raise a + :exc:`ConcurrentExecutorDisposedError` when invoked on a disposed + instance. + """ + self._blocking_executor.dispose() + self._non_blocking_executor.dispose() + + err_msg: str = "ConcurrentExecutor disposed." + with pytest.raises(ConcurrentExecutorDisposedError) as exc_info1: + self._blocking_executor(10) + + with pytest.raises(ConcurrentExecutorDisposedError) as exc_info2: + self._non_blocking_executor(30) + + assert exc_info1.value.message == err_msg + assert exc_info2.value.message == err_msg + + def test_execute_return_value_in_blocking_mode(self) -> None: + """ + :meth:`ConcurrentExecutor.execute` method should return completed + futures. + """ + with self._blocking_executor as executor: + futures = tuple(executor(3)) + + # All futures should be complete by the time execute returns. + assert all(map(future_succeeded, futures)) + assert all( + v == 3.0 or v == 9.0 + for v in map(operator.methodcaller("result", 1), futures) + ) + assert len(futures) == ( + len(self._longer_io_tasks) + len(self._short_io_tasks) + ) + futures[0].result() + + def test_execute_return_value_in_non_blocking_mode(self) -> None: + """ + :meth:`ConcurrentExecutor.execute` method should return immediately + after scheduling tasks to be executed concurrently. This means that + some/all tasks will not have completed yet once the method returns. + """ + futures = tuple(self._non_blocking_executor(2)) + + # FIXME: Find a better test, this might fail due to a race condition. + assert any( + # Not completed. That is, check that some future has not yet + # completed. + map(operator.not_, map(operator.methodcaller("done"), futures)), + ) + wait(futures) + assert all( + v == 2.0 or v == 6.0 + for v in map(operator.methodcaller("result", 1), futures) + ) + assert len(futures) == ( + len(self._longer_io_tasks) + len(self._short_io_tasks) + ) + + def test_dispose_side_effects(self) -> None: + """ + :meth:`ConcurrentExecutor.dispose` should result in the + :attr:`ConcurrentExecutor.is_disposed` property returning ``True``. + """ + + assert not self._blocking_executor.is_disposed + assert not self._non_blocking_executor.is_disposed + + self._blocking_executor.dispose() + self._non_blocking_executor.dispose() + + assert self._blocking_executor.is_disposed + assert self._non_blocking_executor.is_disposed + + @staticmethod + def _do_failing_io_bound_task(task_input: float) -> float: + tce = TestConcurrentExecutor + return tce._do_io_bound_task(task_input) / 0 + + @staticmethod + def _do_io_bound_task(task_input: float) -> float: + ensure_greater_than(task_input, 0, message="expected task_input > 0") + time.sleep(task_input) + return task_input + + @staticmethod + def _do_longer_io_bound_task(task_input: float) -> float: + tce = TestConcurrentExecutor + return tce._do_io_bound_task(task_input * 3) + + +class TestPipe(TestCase): + """Tests for the :class:`pipe` ``Task``.""" + + def setUp(self) -> None: + super().setUp() + self._add_100: Callable[[int], int] = partial(operator.add, 100) + self._multiply_by_10: Callable[[int], int] = partial(operator.mul, 10) + self._instance: pipe[int, str] = pipe( + self._add_100, + self._add_100, + self._add_100, + self._add_100, + Task.of_callable(self._add_100), + Task.of_callable(self._multiply_by_10), + str, + ) + + def test_execute_return_value(self) -> None: + """ + :meth:`pipe.execute` should return the value of applying it's input + value to all it's tasks sequentially. + """ + + assert self._instance(0) == "5000" + assert self._instance(500) == "10000" + assert self._instance(-500) == "0" + + def test_pipe_instantiation_with_empty_tasks_fails(self) -> None: + """ + Instantiation of :class:`pipe` with no tasks should raise + ``ValueError``. + """ + with pytest.raises(ValueError, match="MUST not be None or empty."): + pipe() + + def test_tasks_property_return_value(self) -> None: + """ + :attr:`pipe.tasks` should return the ``Sequence`` of tasks that + comprise the ``pipe``. + """ + + assert len(self._instance.tasks) == 7 + assert isinstance(self._instance.tasks, Sequence) + assert isinstance(self._instance.tasks[0], Task) + + def test_tasks_property_return_value_has_tasks_only(self) -> None: + """ + :attr:`pipe.tasks` should return the ``Sequence`` of ``Task`` instances + that comprise the ``pipe``. This should be ``Task`` instances + regardless of whether the original callable was a ``Task``. + """ + for task in self._instance.tasks: + assert isinstance(task, Task) + + +class TestTask(TestCase): + """Tests of the :class:`Task` interface default method implementations.""" + + def test_of_callable_fails_on_none_input_value(self) -> None: + """ + :meth:`Task.of_callable` should raise a :exc:`ValueError` when given a + ``None`` callable as input. + """ + with pytest.raises(ValueError, match="MUST not be None.") as exc_info: + Task.of_callable(source_callable=None) # type: ignore + + assert exc_info.value.args[0] == "'source_callable' MUST not be None." + + def test_of_callable_method_returns_expected_value(self) -> None: + """ + :meth:`Task.of_callable` should return a new ``Task`` instance wrapping + the given callable. + """ + + add_100: Callable[[int], int] = partial(operator.add, 100) + multiply_by_10: Callable[[int], int] = partial(operator.mul, 10) + + task1: Task[int, int] = Task.of_callable(add_100) + task2: Task[int, int] = Task.of_callable(multiply_by_10) + + assert add_100(-100) == task1(-100) == 0 + assert multiply_by_10(100) == task2(100) == 1000