Skip to content

Commit

Permalink
chore(task): improve sghi.task module typings (#33)
Browse files Browse the repository at this point in the history
Mark all `Task` implementations as final and mark all overridden
methods as so.
  • Loading branch information
kennedykori committed Apr 9, 2024
1 parent fdaff2e commit f5ce628
Showing 1 changed file with 17 additions and 2 deletions.
19 changes: 17 additions & 2 deletions src/sghi/task/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
)
from functools import reduce
from logging import Logger, getLogger
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast, final

from typing_extensions import override

from ..disposable import Disposable, ResourceDisposedError
from ..disposable import not_disposed as _nd_factory
Expand Down Expand Up @@ -161,6 +163,7 @@ def of_callable(source_callable: Callable[[_IT], _OT]) -> Task[_IT, _OT]:
# =============================================================================


@final
class Chain(Task[Callable[[_IT], Any], "Chain[Any]"], Generic[_IT]):
"""
A :class:`Task` that wraps a value and applies a transformation (or series
Expand Down Expand Up @@ -195,6 +198,7 @@ def value(self) -> _IT:
"""
return self._value

@override
def execute(self, an_input: Callable[[_IT], _OT]) -> Chain[_OT]:
"""Perform the given transformation on the wrapped value and wrap the
result in a new ``Chain`` instance.
Expand All @@ -212,6 +216,7 @@ def execute(self, an_input: Callable[[_IT], _OT]) -> Chain[_OT]:
return Chain(bind(self._value))


@final
class Consume(Task[_IT, _IT], Generic[_IT]):
"""A :class:`Task` that applies an action to it's inputs.
Expand Down Expand Up @@ -259,11 +264,13 @@ def _compose_accept(an_input: _IT) -> None:

return Consume(accept=_compose_accept)

@override
def execute(self, an_input: _IT) -> _IT:
self._accept(an_input)
return an_input


@final
class Pipe(Task[_IT, _OT], Generic[_IT, _OT]):
"""A :class:`Task` that pipes its input through a ``Sequence`` of tasks.
Expand Down Expand Up @@ -295,6 +302,7 @@ def tasks(self) -> Sequence[Task[Any, Any]]:
"""
return self._tasks

@override
def execute(self, an_input: _IT) -> _OT:
"""
Apply the given input to the :class:`tasks <Task>` that comprise this
Expand Down Expand Up @@ -334,6 +342,7 @@ def execute(self, an_input: _IT) -> _OT:
not_disposed = _nd_factory(exc_factory=ConcurrentExecutorDisposedError)


@final
class ConcurrentExecutor(
Task[_IT, Iterable[Future[_OT]]],
Disposable,
Expand Down Expand Up @@ -384,7 +393,7 @@ def __init__(
wait_for_completion: bool = True,
executor: Executor | None = None,
):
"""Initialize a new `ConcurrentExecutor` instance with the given
"""Initialize a new ``ConcurrentExecutor`` instance with the given
properties.
:param tasks: The tasks to be executed concurrently. This MUST not be
Expand All @@ -408,6 +417,7 @@ def __init__(
self._logger: Logger = getLogger(type_fqn(self.__class__))

@not_disposed
@override
def __enter__(self) -> Self:
super().__enter__()
if not self._wait_for_completion:
Expand All @@ -419,6 +429,7 @@ def __enter__(self) -> Self:
return self

@property
@override
def is_disposed(self) -> bool:
return self._is_disposed

Expand All @@ -432,6 +443,7 @@ def tasks(self) -> Sequence[Task[_IT, _OT]]:
"""
return self._tasks

@override
def dispose(self) -> None:
"""
Shutdown the underlying :class:`~concurrent.futures.Executor` powering
Expand Down Expand Up @@ -463,6 +475,7 @@ def dispose(self) -> None:
self._is_disposed = True

@not_disposed
@override
def execute(self, an_input: _IT) -> Iterable[Future[_OT]]:
"""Execute the tasks concurrently with the given input.
Expand Down Expand Up @@ -532,6 +545,7 @@ def _do_execute_task(self, task: Task[_IT, _OT], an_input: _IT) -> _OT:
# =============================================================================


@final
class _OfCallable(Task[_IT, _OT]):
__slots__ = ("_source_callable",)

Expand All @@ -541,6 +555,7 @@ def __init__(self, source_callable: Callable[[_IT], _OT]):
self._source_callable: Callable[[_IT], _OT]
self._source_callable = source_callable

@override
def execute(self, an_input: _IT) -> _OT:
return self._source_callable(an_input)

Expand Down

0 comments on commit f5ce628

Please sign in to comment.