Skip to content

Commit

Permalink
feat(task): add sghi.task.Task composition support
Browse files Browse the repository at this point in the history
Add support for `sghi.task.Task` instances composition. `Task` instances
can be composed using the `<<` and `>>` operators. See the example
below:

```python
import operator
from functools import partial

from sghi.task import Task, task

add_100: Task[int, int] = task(partial(operator.add, 100))
int_from_str: Task[str, int] = task(int)
int_to_str: Task[int, str] = task(str)
mul_by_10: Task[int, int] = task(partial(operator.mul, 10))

chained: Task[int, str] = add_100 >> mul_by_10 >> int_to_str
composed: Task[str, int] = add_100 << mul_by_10 << int_from_str

assert isinstance(chained, Task)
assert isinstance(composed, Task)

assert chained(50) == "1500"
assert composed("50") == 600
```
  • Loading branch information
kennedykori committed Apr 11, 2024
1 parent 874db3f commit 0d701ef
Show file tree
Hide file tree
Showing 3 changed files with 380 additions and 129 deletions.
2 changes: 2 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@
("py:class", "sghi.dispatch._ST_contra"), # private type annotations
("py:class", "sghi.retry._RT"), # private type annotations
("py:class", "sghi.task._IT"), # private type annotations
("py:class", "sghi.task._IT1"), # private type annotations
("py:class", "sghi.task._OT"), # private type annotations
("py:class", "sghi.task._OT1"), # private type annotations
("py:class", "sghi.utils.checkers._Comparable"), # private type annotations
("py:class", "sghi.utils.checkers._CT"), # private type annotations
("py:class", "sghi.utils.checkers._ST"), # private type annotations
Expand Down
139 changes: 134 additions & 5 deletions src/sghi/task/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
)
from functools import reduce, update_wrapper
from logging import Logger, getLogger
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast, final
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast, final, overload

from typing_extensions import deprecated, override

from ..disposable import Disposable, ResourceDisposedError
from ..disposable import not_disposed as _nd_factory
from ..utils import (
ensure_callable,
ensure_instance_of,
ensure_not_none,
ensure_not_none_nor_empty,
type_fqn,
Expand All @@ -36,7 +37,9 @@


_IT = TypeVar("_IT")
_IT1 = TypeVar("_IT1")
_OT = TypeVar("_OT")
_OT1 = TypeVar("_OT1")


# =============================================================================
Expand Down Expand Up @@ -114,9 +117,13 @@ class Task(Generic[_IT, _OT], metaclass=ABCMeta):
"""A job or action to perform.
An interface that describes a job or action to be performed. The interface
defines a single method :meth:`execute`, that accepts a single input value
and returns a result. A `Task` instance can also be used as a callable, the
actual call is delegated to the ``execute`` method.
defines a single abstract method :meth:`execute`, that accepts a single
input value and returns a result. A `Task` instance can also be used as a
callable, the actual call is delegated to the ``execute`` method.
``Task`` instances can be composed using the ``<<`` and ``>>`` operators.
The resulting compositions have the same effect as the :meth:`compose` and
:meth:`and_then` methods respectively.
"""

__slots__ = ()
Expand All @@ -133,6 +140,54 @@ def __call__(self, an_input: _IT) -> _OT:
"""
return self.execute(an_input)

def __lshift__(self, __before: Task[_IT1, _IT]) -> Task[_IT1, _OT]:
"""Compose two :class:`tasks<sghi.task.Task>` together.
This operator creates a new task that performs the computation of the
right operand (``__before``) before the computation of the left operand
(``self``). In other words, the output of the right operand becomes
the input to the left operand.
.. versionadded:: 1.4
:param __before: The task to be executed prior to ``self``. This MUST
be a ``Task`` instance.
:return: A new ``Task`` that combines the computations of ``__before``
and ``self`` in that order.
.. seealso:: :meth:`~sghi.task.Task.compose`.
"""
return (
self.compose(__before)
if isinstance(__before, Task)
else NotImplemented
)

def __rshift__(self, __after: Task[_OT, _OT1], /) -> Task[_IT, _OT1]:
"""Chain two :class:`tasks<sghi.task.Task>` together.
This operator creates a new task that performs the computation of the
right operand (``__after``) after the computation of the left operand
(``self``). In other words, the output of left becomes the input to
``__after``.
.. versionadded:: 1.4
:param __after: The task to chain to ``self``. This MUST be a ``Task``
instance.
:return: A new ``Task`` that combines the computations of ``self`` and
``__after`` in that order.
.. seealso:: :meth:`~sghi.task.Task.and_then`.
"""
return (
self.and_then(__after)
if isinstance(__after, Task)
else NotImplemented
)

@abstractmethod
def execute(self, an_input: _IT) -> _OT:
"""Perform a computation given an input and return a result.
Expand All @@ -143,6 +198,70 @@ def execute(self, an_input: _IT) -> _OT:
"""
...

def and_then(self, after: Task[_OT, _OT1]) -> Task[_IT, _OT1]:
"""Chain two :class:`tasks<sghi.task.Task>` together.
Return a new ``Task`` that performs the computation of the given
task after the computation of this task. The returned task first
:meth:`applies<execute>` this task to its input, and then applies the
``after`` task to the result. That is, the out output of
``self.execute()`` becomes the input to ``after.execute()``.
.. versionadded:: 1.4
:param after: The task to be executed following this one. This MUST be
a ``Task`` instance.
:return: A new ``Task`` that combines the computations of this task
and the given one in that order.
:raise ValueError: If ``after`` is NOT a ``Task`` instance.
.. seealso:: :meth:`~sghi.task.Task.compose`.
"""
ensure_instance_of(
value=after,
klass=Task,
message="'after' MUST be an 'sghi.task.Task' instance.",
)

def _do_chain(an_input: _IT) -> _OT1:
return after.execute(self.execute(an_input))

return _OfCallable(_do_chain)

def compose(self, before: Task[_IT1, _IT]) -> Task[_IT1, _OT]:
"""Compose two :class:`tasks<sghi.task.Task>` together.
Return a new ``Task`` that performs the computation of the given
task before the computation of this task. The returned task first
:meth:`applies<execute>` the ``after`` task to its input, and then
applies this task to the result. That is, the output of
``before.execute()`` becomes the input to ``self.execute()``.
.. versionadded:: 1.4
:param before: The task to be executed prior to this one. This MUST be
a ``Task`` instance.
:return: A new ``Task`` that combines the computations of the given
task and this one in that order.
:raise ValueError: If ``before`` is NOT a ``Task`` instance.
.. seealso:: :meth:`~sghi.task.Task.and_then`.
"""
ensure_instance_of(
value=before,
klass=Task,
message="'before' MUST be an 'sghi.task.Task' instance.",
)

def _do_compose(an_input: _IT1) -> _OT:
return self.execute(before.execute(an_input))

return _OfCallable(_do_compose)

@staticmethod
def of_callable(source_callable: Callable[[_IT], _OT]) -> Task[_IT, _OT]:
"""Create a :class:`~sghi.task.Task` instance from a callable.
Expand All @@ -160,7 +279,7 @@ def of_callable(source_callable: Callable[[_IT], _OT]) -> Task[_IT, _OT]:
:raises ValueError: If ``source_callable`` is NOT a callable.
.. seealso:: :func:`~sghi.task.task` decorator.
.. seealso:: :func:`@task<sghi.task.task>` decorator.
"""
# FIXME: rename 'source_callable' to 'target_callable' instead.
return _OfCallable(source_callable=source_callable)
Expand Down Expand Up @@ -256,8 +375,15 @@ def __init__(self, accept: Callable[[_IT], Any]) -> None:
def __add__(self, __an_input: Callable[[_IT], Any], /) -> Consume[_IT]: # type: ignore[reportDeprecated]
return self.and_then(accept=__an_input) # type: ignore[reportDeprecated]

@overload
def and_then(self, after: Task[_OT, _OT1]) -> Task[_IT, _OT1]: ...

@deprecated("To be removed in v2.x")
@overload
def and_then(self, accept: Callable[[_IT], Any]) -> Consume[_IT]: # type: ignore[reportDeprecated]
...

def and_then(self, accept): # type: ignore[reportDeprecated]
"""Compose this :class:`Consume` action with the provided action.
This creates a new ``Consume`` instance that performs both this task's
Expand All @@ -273,6 +399,9 @@ def and_then(self, accept: Callable[[_IT], Any]) -> Consume[_IT]: # type: ignor
:raises ValueError: If ``accept`` is NOT a callable.
"""
if isinstance(accept, Task):
return super().and_then(accept)

ensure_callable(accept, "'accept' MUST be a callable.")

def _compose_accept(an_input: _IT) -> None:
Expand Down
Loading

0 comments on commit 0d701ef

Please sign in to comment.