Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(task): assorted fixes for sghi.task module #34

Merged
merged 1 commit into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 59 additions & 40 deletions src/sghi/task/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
ThreadPoolExecutor,
wait,
)
from functools import reduce
from functools import reduce, update_wrapper
from logging import Logger, getLogger
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast, final

Expand All @@ -21,9 +21,9 @@
from ..disposable import Disposable, ResourceDisposedError
from ..disposable import not_disposed as _nd_factory
from ..utils import (
ensure_callable,
ensure_not_none,
ensure_not_none_nor_empty,
ensure_predicate,
type_fqn,
)

Expand Down Expand Up @@ -61,23 +61,26 @@ def _callables_to_tasks_as_necessary(
"""
ensure_not_none(tasks, "'tasks' MUST not be None.")
return tuple(
task if isinstance(task, Task) else Task.of_callable(task)
for task in tasks
task if isinstance(task, Task) else _OfCallable(task) for task in tasks
)


def task(f: Callable[[_IT], _OT]) -> Task[_IT, _OT]:
"""Mark/Decorate a ``Callable`` object as a :class:`Task`.
"""Mark/Decorate a callable object as a :class:`~sghi.task.Task`.

:param f: The callable object to be decorated. The callable *MUST* have at
*MOST* one required argument.
.. important::

The decorated callable *MUST* accept at least one argument but have
at *MOST* one required argument.

:return: A ``Task`` instance that wraps the given ``Callable`` object.
:param f: The callable object to be decorated. The callable *MUST* accept
at least one argument but have at *MOST* one required argument.

:raise ValueError: If the given value is ``None`` or not a ``Callable``.
:return: A ``Task`` instance that wraps the given callable object.

:raise ValueError: If the given value is NOT a callable.
"""
ensure_not_none(f, message="The given callable MUST not be None.")
ensure_predicate(callable(f), message="A callable object is required.")
ensure_callable(f, message="A callable object is required.")

return _OfCallable(source_callable=f)

Expand Down Expand Up @@ -144,17 +147,22 @@ def execute(self, an_input: _IT) -> _OT:
def of_callable(source_callable: Callable[[_IT], _OT]) -> Task[_IT, _OT]:
"""Create a :class:`~sghi.task.Task` instance from a callable.

.. note::
.. important::

The given callable *MUST* have at *MOST*, one required argument.
The given callable *MUST* accept at least one argument but have
at *MOST* one required argument.

:param source_callable: The callable function to wrap as a ``Task``.
This *MUST* not be ``None``.
This *MUST* accept at least one argument but have at *MOST* one
required argument.

:return: A ``Task`` instance.

:raises ValueError: If `source_callable` is ``None``.
:raises ValueError: If ``source_callable`` is NOT a callable.

.. seealso:: :func:`~sghi.task.task` decorator.
"""
# FIXME: rename 'source_callable' to 'target_callable' instead.
return _OfCallable(source_callable=source_callable)


Expand Down Expand Up @@ -204,15 +212,15 @@ def execute(self, an_input: Callable[[_IT], _OT]) -> Chain[_OT]:
result in a new ``Chain`` instance.

:param an_input: A callable defining a transformation to be applied to
the wrapped value. This MUST not be ``None``.
the wrapped value.

:return: A new ``Chain`` instance that wraps the result of the given
transformation.

:raises ValueError: If the given transformation is ``None``.
:raises ValueError: If the given transformation is NOT a callable.
"""
bind: Callable[[_IT], _OT]
bind = ensure_not_none(an_input, "'an_input' MUST not be None.")
bind = ensure_callable(an_input, "'an_input' MUST be a callable.")
return Chain(bind(self._value))


Expand All @@ -234,10 +242,10 @@ def __init__(self, accept: Callable[[_IT], Any]) -> None:
:param accept: A callable to apply to this task's inputs. This MUST not
be None.

:raises ValueError: If the given callable is ``None``.
:raises ValueError: If the given callable is NOT a callable.
"""
super().__init__()
ensure_not_none(accept, "'accept' MUST not be None.")
ensure_callable(accept, "'accept' MUST be a callable.")
self._accept: Callable[[_IT], Any] = accept

def __add__(self, __an_input: Callable[[_IT], Any], /) -> Consume[_IT]:
Expand All @@ -250,13 +258,13 @@ def and_then(self, accept: Callable[[_IT], Any]) -> Consume[_IT]:
action and the provided action.

:param accept: The action to compose with this task's action. This
MUST be not None.
MUST be a callable object.

:return: A new ``Consume`` instance that performs both actions.

:raises ValueError: If the given callable is ``None``.
:raises ValueError: If ``accept`` is NOT a callable.
"""
ensure_not_none(accept, "'accept' MUST not be None.")
ensure_callable(accept, "'accept' MUST be a callable.")

def _compose_accept(an_input: _IT) -> None:
self._accept(an_input)
Expand All @@ -283,8 +291,10 @@ class Pipe(Task[_IT, _OT], Generic[_IT, _OT]):
def __init__(self, *tasks: Task[Any, Any] | Callable[[Any], Any]):
"""Create a new :class:`Pipe` instance of the given tasks.

:param tasks: A ``Sequence`` of the tasks to apply this pipe's inputs
to. This MUST not be empty.
:param tasks: A ``Sequence`` of the tasks or callables to apply this
pipe's inputs to. This MUST not be empty. If callables are given,
they MUST accept at least one argument but have at MOST one
required argument.

:raises ValueError: If no tasks were specified, i.e. ``tasks`` is
empty.
Expand Down Expand Up @@ -320,9 +330,9 @@ def execute(self, an_input: _IT) -> _OT:
return cast(
_OT,
reduce(
lambda _acc, _tsk: _tsk(_acc),
lambda _acc, _tsk: _tsk.execute(_acc),
self.tasks[1:],
self.tasks[0](an_input),
self.tasks[0].execute(an_input),
),
)

Expand Down Expand Up @@ -356,18 +366,21 @@ class ConcurrentExecutor(
:external+python:py:class:`futures<concurrent.futures.Future>`
representing the execution of the given tasks. If the
``wait_for_completion`` constructor parameter is set to ``True``, the
default, the `execute` method will block until all tasks have completed.
default, the :meth:`execute` method will block until all tasks have
completed.

.. important::

When ``wait_for_completion`` is set to ``False``, instances of this
class should not be used as context managers. This is because the
underlying ``Executor`` will be shutdown immediately once ``execute``
returns.
underlying ``Executor`` will be shutdown immediately on exit of the
``with`` block. This will happen regardless of the completion status
of the embedded tasks, leading to cancellations of those tasks, which
might not be the intended behaviour.

.. tip::

By default, this tasks uses a
By default, instances of this class use a
:external+python:py:class:`~concurrent.futures.ThreadPoolExecutor`
to run the tasks concurrently. This is suitable for short `I/O-bound`
tasks. However, for compute-intensive tasks, consider using a
Expand Down Expand Up @@ -396,8 +409,10 @@ def __init__(
"""Initialize a new ``ConcurrentExecutor`` instance with the given
properties.

:param tasks: The tasks to be executed concurrently. This MUST not be
``None`` or empty.
:param tasks: The tasks or callables to be executed concurrently. This
MUST not be ``None`` or empty. If callables are given, they MUST
accept at least one argument but have at MOST one required
argument.
:param wait_for_completion: Whether ``execute`` should block and wait
for all the given tasks to complete execution. Defaults to
``True``.
Expand Down Expand Up @@ -509,11 +524,11 @@ def execute(self, an_input: _IT) -> Iterable[Future[_OT]]:

@staticmethod
def _accumulate(
partial_results: MutableSequence[Future[_OT]],
task_output: Future[_OT],
scheduled_tasks: MutableSequence[Future[_OT]],
new_submission: Future[_OT],
) -> MutableSequence[Future[_OT]]:
partial_results.append(task_output)
return partial_results
scheduled_tasks.append(new_submission)
return scheduled_tasks

def _do_execute_task(self, task: Task[_IT, _OT], an_input: _IT) -> _OT:
"""Execute an individual :class:`Task` with the provided input.
Expand All @@ -526,7 +541,7 @@ def _do_execute_task(self, task: Task[_IT, _OT], an_input: _IT) -> _OT:
:raises Exception: If the tasks execution encounters an exception.
"""
try:
result: _OT = task(an_input)
result: _OT = task.execute(an_input)
except Exception as exp:
self._logger.error(
"Error while executing tasks of type='%s'.",
Expand All @@ -547,13 +562,17 @@ def _do_execute_task(self, task: Task[_IT, _OT], an_input: _IT) -> _OT:

@final
class _OfCallable(Task[_IT, _OT]):
__slots__ = ("_source_callable",)
__slots__ = ("_source_callable", "__dict__")

def __init__(self, source_callable: Callable[[_IT], _OT]):
super().__init__()
ensure_not_none(source_callable, "'source_callable' MUST not be None.")
ensure_callable(
source_callable,
message="'source_callable' MUST be a callable object.",
)
self._source_callable: Callable[[_IT], _OT]
self._source_callable = source_callable
update_wrapper(self, self._source_callable)

@override
def execute(self, an_input: _IT) -> _OT:
Expand Down
19 changes: 11 additions & 8 deletions test/sghi/task_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ def test_task_decorator_fails_on_a_none_input_value() -> None:
:func:`task` should raise a :exc:`ValueError` when given a ``None`` value.
"""

with pytest.raises(ValueError, match="MUST not be None.") as exc_info:
with pytest.raises(ValueError, match="callable object") as exc_info:
task(None) # type: ignore

assert exc_info.value.args[0] == "The given callable MUST not be None."
assert exc_info.value.args[0] == "A callable object is required."


def test_task_decorator_returns_correct_value() -> None:
Expand Down Expand Up @@ -109,10 +109,10 @@ def test_instantiation_with_a_none_input_fails(self) -> None:
:class:`consume` constructor should raise ``ValueError`` when given a
``None`` input.
"""
with pytest.raises(ValueError, match="MUST not be None.") as exc_info:
with pytest.raises(ValueError, match="MUST be a callable") as exc_info:
consume(accept=None) # type: ignore

assert exc_info.value.args[0] == "'accept' MUST not be None."
assert exc_info.value.args[0] == "'accept' MUST be a callable."

def test_execute_performs_the_expected_side_effects(self) -> None:
"""
Expand Down Expand Up @@ -217,10 +217,10 @@ def test_execute_fails_on_none_input(self) -> None:
:meth:`chain.execute` method should raise a :exc:`ValueError` when
invoked with a ``None`` argument.
"""
with pytest.raises(ValueError, match="MUST not be None.") as exc_info:
with pytest.raises(ValueError, match="MUST be a callable") as exc_info:
self._chain_of_10(None) # type: ignore

assert exc_info.value.args[0] == "'an_input' MUST not be None."
assert exc_info.value.args[0] == "'an_input' MUST be a callable."

def test_execute_return_value(self) -> None:
"""
Expand Down Expand Up @@ -492,10 +492,13 @@ def test_of_callable_fails_on_none_input_value(self) -> None:
:meth:`Task.of_callable` should raise a :exc:`ValueError` when given a
``None`` callable as input.
"""
with pytest.raises(ValueError, match="MUST not be None.") as exc_info:
with pytest.raises(ValueError, match="MUST be a callable") as exc_info:
Task.of_callable(source_callable=None) # type: ignore

assert exc_info.value.args[0] == "'source_callable' MUST not be None."
assert (
exc_info.value.args[0]
== "'source_callable' MUST be a callable object."
)

def test_of_callable_method_returns_expected_value(self) -> None:
"""
Expand Down