diff --git a/src/sghi/task/__init__.py b/src/sghi/task/__init__.py index 619f7ab..cd4fdb2 100644 --- a/src/sghi/task/__init__.py +++ b/src/sghi/task/__init__.py @@ -14,7 +14,9 @@ ) from functools import reduce from logging import Logger, getLogger -from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast +from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast, final + +from typing_extensions import override from ..disposable import Disposable, ResourceDisposedError from ..disposable import not_disposed as _nd_factory @@ -161,6 +163,7 @@ def of_callable(source_callable: Callable[[_IT], _OT]) -> Task[_IT, _OT]: # ============================================================================= +@final class Chain(Task[Callable[[_IT], Any], "Chain[Any]"], Generic[_IT]): """ A :class:`Task` that wraps a value and applies a transformation (or series @@ -195,6 +198,7 @@ def value(self) -> _IT: """ return self._value + @override def execute(self, an_input: Callable[[_IT], _OT]) -> Chain[_OT]: """Perform the given transformation on the wrapped value and wrap the result in a new ``Chain`` instance. @@ -212,6 +216,7 @@ def execute(self, an_input: Callable[[_IT], _OT]) -> Chain[_OT]: return Chain(bind(self._value)) +@final class Consume(Task[_IT, _IT], Generic[_IT]): """A :class:`Task` that applies an action to it's inputs. @@ -259,11 +264,13 @@ def _compose_accept(an_input: _IT) -> None: return Consume(accept=_compose_accept) + @override def execute(self, an_input: _IT) -> _IT: self._accept(an_input) return an_input +@final class Pipe(Task[_IT, _OT], Generic[_IT, _OT]): """A :class:`Task` that pipes its input through a ``Sequence`` of tasks. @@ -295,6 +302,7 @@ def tasks(self) -> Sequence[Task[Any, Any]]: """ return self._tasks + @override def execute(self, an_input: _IT) -> _OT: """ Apply the given input to the :class:`tasks ` that comprise this @@ -334,6 +342,7 @@ def execute(self, an_input: _IT) -> _OT: not_disposed = _nd_factory(exc_factory=ConcurrentExecutorDisposedError) +@final class ConcurrentExecutor( Task[_IT, Iterable[Future[_OT]]], Disposable, @@ -384,7 +393,7 @@ def __init__( wait_for_completion: bool = True, executor: Executor | None = None, ): - """Initialize a new `ConcurrentExecutor` instance with the given + """Initialize a new ``ConcurrentExecutor`` instance with the given properties. :param tasks: The tasks to be executed concurrently. This MUST not be @@ -408,6 +417,7 @@ def __init__( self._logger: Logger = getLogger(type_fqn(self.__class__)) @not_disposed + @override def __enter__(self) -> Self: super().__enter__() if not self._wait_for_completion: @@ -419,6 +429,7 @@ def __enter__(self) -> Self: return self @property + @override def is_disposed(self) -> bool: return self._is_disposed @@ -432,6 +443,7 @@ def tasks(self) -> Sequence[Task[_IT, _OT]]: """ return self._tasks + @override def dispose(self) -> None: """ Shutdown the underlying :class:`~concurrent.futures.Executor` powering @@ -463,6 +475,7 @@ def dispose(self) -> None: self._is_disposed = True @not_disposed + @override def execute(self, an_input: _IT) -> Iterable[Future[_OT]]: """Execute the tasks concurrently with the given input. @@ -532,6 +545,7 @@ def _do_execute_task(self, task: Task[_IT, _OT], an_input: _IT) -> _OT: # ============================================================================= +@final class _OfCallable(Task[_IT, _OT]): __slots__ = ("_source_callable",) @@ -541,6 +555,7 @@ def __init__(self, source_callable: Callable[[_IT], _OT]): self._source_callable: Callable[[_IT], _OT] self._source_callable = source_callable + @override def execute(self, an_input: _IT) -> _OT: return self._source_callable(an_input)