From c20f6982666265f449ffb8925c751638ed490917 Mon Sep 17 00:00:00 2001 From: Matteo Cafasso Date: Mon, 20 Nov 2023 22:15:12 +0200 Subject: [PATCH] issue #112, asynchronous: add type hints to decorator functions Signed-off-by: Matteo Cafasso --- pebble/asynchronous/process.py | 54 +++++++++++++++++++++++++--------- pebble/asynchronous/thread.py | 18 ++++++++---- 2 files changed, 52 insertions(+), 20 deletions(-) diff --git a/pebble/asynchronous/process.py b/pebble/asynchronous/process.py index 5aec601..1c6003e 100644 --- a/pebble/asynchronous/process.py +++ b/pebble/asynchronous/process.py @@ -24,6 +24,7 @@ from itertools import count from functools import wraps +from typing import Any, Callable from concurrent.futures import TimeoutError from pebble.common import ProcessExpired @@ -31,7 +32,7 @@ from pebble.common import launch_process, stop_process, SLEEP_UNIT -def process(*args, **kwargs): +def process(*args, **kwargs) -> Callable: """Runs the decorated function in a concurrent process, taking care of the result and error management. @@ -75,7 +76,13 @@ def decorating_function(function): return decorating_function -def _process_wrapper(function, timeout, name, daemon, mp_context): +def _process_wrapper( + function: Callable, + timeout: float, + name: str, + daemon: bool, + mp_context: multiprocessing.context.BaseContext +) -> Callable: if isinstance(function, types.FunctionType): _register_function(function) @@ -85,7 +92,7 @@ def _process_wrapper(function, timeout, name, daemon, mp_context): start_method = 'spawn' if os.name == 'nt' else 'fork' @wraps(function) - def wrapper(*args, **kwargs): + def wrapper(*args, **kwargs) -> asyncio.Future: loop = _get_asyncio_loop() future = loop.create_future() reader, writer = mp_context.Pipe(duplex=False) @@ -109,7 +116,12 @@ def wrapper(*args, **kwargs): return wrapper -async def _worker_handler(future, worker, pipe, timeout): +async def _worker_handler( + future: asyncio.Future, + worker: multiprocessing.Process, + pipe: multiprocessing.Pipe, + timeout: float +): """Worker lifecycle manager. Waits for the worker to be perform its task, @@ -130,7 +142,11 @@ async def _worker_handler(future, worker, pipe, timeout): future.set_result(result) -async def _get_result(future, pipe, timeout): +async def _get_result( + future: asyncio.Future, + pipe: multiprocessing.Pipe, + timeout: float +) -> Any: """Waits for result and handles communication errors.""" counter = count(step=SLEEP_UNIT) @@ -138,10 +154,10 @@ async def _get_result(future, pipe, timeout): while not pipe.poll(): if timeout is not None and next(counter) >= timeout: return TimeoutError('Task Timeout', timeout) - elif future.cancelled(): + if future.cancelled(): return asyncio.CancelledError() - else: - await asyncio.sleep(SLEEP_UNIT) + + await asyncio.sleep(SLEEP_UNIT) return pipe.recv() except (EOFError, OSError): @@ -150,7 +166,12 @@ async def _get_result(future, pipe, timeout): return error -def _function_handler(function, args, kwargs, pipe): +def _function_handler( + function: Callable, + args: list, + kwargs: dict, + pipe: multiprocessing.Pipe +): """Runs the actual function in separate process and returns its result.""" signal.signal(signal.SIGINT, signal.SIG_IGN) @@ -162,7 +183,12 @@ def _function_handler(function, args, kwargs, pipe): send_result(writer, result) -def _validate_parameters(timeout, name, daemon, mp_context): +def _validate_parameters( + timeout: float, + name: str, + daemon: bool, + mp_context: multiprocessing.context.BaseContext +): if timeout is not None and not isinstance(timeout, (int, float)): raise TypeError('Timeout expected to be None or integer or float') if name is not None and not isinstance(name, str): @@ -174,7 +200,7 @@ def _validate_parameters(timeout, name, daemon, mp_context): raise TypeError('Context expected to be None or multiprocessing.context') -def _get_asyncio_loop(): +def _get_asyncio_loop() -> asyncio.BaseEventLoop: """Backwards compatible loop getter.""" try: return asyncio.get_running_loop() @@ -189,13 +215,13 @@ def _get_asyncio_loop(): _registered_functions = {} -def _register_function(function): +def _register_function(function: Callable) -> Callable: _registered_functions[function.__qualname__] = function return function -def _trampoline(name, module, *args, **kwargs): +def _trampoline(name: str, module: Any, *args, **kwargs) -> Any: """Trampoline function for decorators. Lookups the function between the registered ones; @@ -207,7 +233,7 @@ def _trampoline(name, module, *args, **kwargs): return function(*args, **kwargs) -def _function_lookup(name, module): +def _function_lookup(name: str, module: Any) -> Callable: """Searches the function between the registered ones. If not found, it imports the module forcing its registration. diff --git a/pebble/asynchronous/thread.py b/pebble/asynchronous/thread.py index e158cb0..23a45ab 100644 --- a/pebble/asynchronous/thread.py +++ b/pebble/asynchronous/thread.py @@ -16,13 +16,14 @@ import asyncio +from typing import Callable from functools import wraps from traceback import format_exc from pebble.common import launch_thread -def thread(*args, **kwargs): +def thread(*args, **kwargs) -> Callable: """Runs the decorated function within a concurrent thread, taking care of the result and error management. @@ -56,9 +57,9 @@ def decorating_function(function): return decorating_function -def _thread_wrapper(function, name, daemon): +def _thread_wrapper(function: Callable, name: str, daemon: bool) -> Callable: @wraps(function) - def wrapper(*args, **kwargs): + def wrapper(*args, **kwargs) -> asyncio.Future: loop = _get_asyncio_loop() future = loop.create_future() @@ -69,7 +70,12 @@ def wrapper(*args, **kwargs): return wrapper -def _function_handler(function, args, kwargs, future): +def _function_handler( + function: Callable, + args: list, + kwargs: dict, + future: asyncio.Future +): """Runs the actual function in separate thread and returns its result.""" loop = future.get_loop() @@ -82,14 +88,14 @@ def _function_handler(function, args, kwargs, future): loop.call_soon_threadsafe(future.set_result, result) -def _validate_parameters(name, daemon): +def _validate_parameters(name: str, daemon: bool): if name is not None and not isinstance(name, str): raise TypeError('Name expected to be None or string') if daemon is not None and not isinstance(daemon, bool): raise TypeError('Daemon expected to be None or bool') -def _get_asyncio_loop(): +def _get_asyncio_loop() -> asyncio.BaseEventLoop: """Backwards compatible loop getter.""" try: return asyncio.get_running_loop()