diff --git a/README.rst b/README.rst index e2955a8..200e1ce 100644 --- a/README.rst +++ b/README.rst @@ -42,6 +42,8 @@ Pros: Python 3.7 PyPy3 3.5+ +.. note:: For python 2.7/3.4/PyPy you can use versions 1.x.x + Decorators: * `ThreadPooled` - native ``concurrent.futures.ThreadPool``. @@ -77,7 +79,7 @@ Mostly it is required decorator: submit function to ThreadPoolExecutor on call. .. note:: - By default, if executor is not configured - it configures with default parameters: ``max_workers=(CPU_COUNT or 1) * 5`` + By default, if executor is not configured - it configures with default parameters: ``max_workers=CPU_COUNT * 5`` .. code-block:: python @@ -219,7 +221,7 @@ Post function to ``gevent.threadpool.ThreadPool``. .. note:: - By default, if executor is not configured - it configures with default parameters: ``max_workers=(CPU_COUNT or 1) * 5`` + By default, if executor is not configured - it configures with default parameters: ``max_workers=CPU_COUNT * 5`` .. note:: diff --git a/doc/source/asynciotask.rst b/doc/source/asynciotask.rst index 3b35531..9c45e58 100644 --- a/doc/source/asynciotask.rst +++ b/doc/source/asynciotask.rst @@ -6,8 +6,6 @@ API: Decorators: `AsyncIOTask`, `asynciotask`. .. py:module:: pooled .. py:currentmodule:: pooled -.. note:: Python 3 only. - .. py:class:: AsyncIOTask Wrap to asyncio.Task. diff --git a/setup.py b/setup.py index f7aaf65..e253e80 100644 --- a/setup.py +++ b/setup.py @@ -54,7 +54,9 @@ def _extension(modpath): requires_optimization = [ _extension('threaded._class_decorator'), _extension('threaded._base_threaded'), + _extension('threaded._asynciotask'), _extension('threaded._threaded'), + _extension('threaded._threadpooled'), _extension('threaded._gthreadpooled'), ] diff --git a/threaded/__init__.py b/threaded/__init__.py index 9e961f5..7c2e4b9 100644 --- a/threaded/__init__.py +++ b/threaded/__init__.py @@ -17,14 +17,10 @@ import typing # noqa # pylint: disable=unused-import # pylint: disable=no-name-in-module -from ._threaded import ( - ThreadPooled, - Threaded, - AsyncIOTask, - threadpooled, - threaded, - asynciotask -) +from ._asynciotask import AsyncIOTask, asynciotask +from ._threaded import Threaded, threaded +from ._threadpooled import ThreadPooled, threadpooled + try: # pragma: no cover from ._gthreadpooled import GThreadPooled, gthreadpooled @@ -45,7 +41,7 @@ 'gthreadpooled' ) -__version__ = '2.0.0' +__version__ = '2.0.1' __author__ = "Alexey Stepanov" __author_email__ = 'penguinolog@gmail.com' __maintainers__ = { diff --git a/threaded/_asynciotask.py b/threaded/_asynciotask.py new file mode 100644 index 0000000..9fc1bbb --- /dev/null +++ b/threaded/_asynciotask.py @@ -0,0 +1,209 @@ +# Copyright 2017-2018 Alexey Stepanov aka penguinolog +## +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""AsyncIOTask implementation.""" + +import asyncio +import functools +import typing + +from . import _class_decorator + +__all__ = ( + 'AsyncIOTask', + 'asynciotask', +) + + +class AsyncIOTask(_class_decorator.BaseDecorator): + """Wrap to asyncio.Task.""" + + __slots__ = ( + '__loop_getter', + '__loop_getter_need_context', + ) + + def __init__( + self, + func: typing.Optional[typing.Callable] = None, + *, + loop_getter: typing.Union[ + typing.Callable[..., asyncio.AbstractEventLoop], + asyncio.AbstractEventLoop + ] = asyncio.get_event_loop, + loop_getter_need_context: bool = False + ) -> None: + """Wrap function in future and return. + + :param func: Function to wrap + :type func: typing.Optional[typing.Callable] + :param loop_getter: Method to get event loop, if wrap in asyncio task + :type loop_getter: typing.Union[ + typing.Callable[..., asyncio.AbstractEventLoop], + asyncio.AbstractEventLoop + ] + :param loop_getter_need_context: Loop getter requires function context + :type loop_getter_need_context: bool + """ + super(AsyncIOTask, self).__init__(func=func) + self.__loop_getter = loop_getter + self.__loop_getter_need_context = loop_getter_need_context + + @property + def loop_getter( + self + ) -> typing.Union[ + typing.Callable[..., asyncio.AbstractEventLoop], + asyncio.AbstractEventLoop + ]: + """Loop getter. + + :rtype: typing.Union[ + typing.Callable[..., asyncio.AbstractEventLoop], + asyncio.AbstractEventLoop + ] + """ + return self.__loop_getter + + @property + def loop_getter_need_context(self) -> bool: + """Loop getter need execution context. + + :rtype: bool + """ + return self.__loop_getter_need_context + + def get_loop( + self, + *args, # type: typing.Tuple + **kwargs # type: typing.Dict + ) -> asyncio.AbstractEventLoop: + """Get event loop in decorator class.""" + if callable(self.loop_getter): + if self.loop_getter_need_context: + return self.loop_getter(*args, **kwargs) # pylint: disable=not-callable + return self.loop_getter() # pylint: disable=not-callable + return self.loop_getter + + def _get_function_wrapper( + self, + func: typing.Callable + ) -> typing.Callable[..., asyncio.Task]: + """Here should be constructed and returned real decorator. + + :param func: Wrapped function + :type func: typing.Callable + :rtype: typing.Callable[..., asyncio.Task] + """ + # pylint: disable=missing-docstring + # noinspection PyMissingOrEmptyDocstring + @functools.wraps(func) + def wrapper( + *args, # type: typing.Tuple + **kwargs # type: typing.Dict + ) -> asyncio.Task: + loop = self.get_loop(*args, **kwargs) # type: ignore + return loop.create_task(func(*args, **kwargs)) + + # pylint: enable=missing-docstring + return wrapper + + def __call__( # pylint: disable=useless-super-delegation + self, + *args: typing.Union[typing.Tuple, typing.Callable], + **kwargs: typing.Dict + ) -> typing.Union[asyncio.Task, typing.Callable[..., asyncio.Task]]: + """Callable instance.""" + return super(AsyncIOTask, self).__call__(*args, **kwargs) # type: ignore + + def __repr__(self) -> str: + """For debug purposes.""" + return ( + "<{cls}(" + "{func!r}, " + "loop_getter={self.loop_getter!r}, " + "loop_getter_need_context={self.loop_getter_need_context!r}, " + ") at 0x{id:X}>".format( + cls=self.__class__.__name__, + func=self._func, + self=self, + id=id(self) + ) + ) # pragma: no cover + + +# pylint: disable=function-redefined, unused-argument +@typing.overload +def asynciotask( + func: None = None, + *, + loop_getter: typing.Union[ + typing.Callable[..., asyncio.AbstractEventLoop], + asyncio.AbstractEventLoop + ] = asyncio.get_event_loop, + loop_getter_need_context: bool = False +) -> AsyncIOTask: + """Overload: no function.""" + pass # pragma: no cover + + +@typing.overload # noqa: F811 +def asynciotask( + func: typing.Callable, + *, + loop_getter: typing.Union[ + typing.Callable[..., asyncio.AbstractEventLoop], + asyncio.AbstractEventLoop + ] = asyncio.get_event_loop, + loop_getter_need_context: bool = False +) -> typing.Callable[..., asyncio.Task]: + """Overload: provided function.""" + pass # pragma: no cover + + +# pylint: enable=unused-argument +def asynciotask( # noqa: F811 + func: typing.Optional[typing.Callable] = None, + *, + loop_getter: typing.Union[ + typing.Callable[..., asyncio.AbstractEventLoop], + asyncio.AbstractEventLoop + ] = asyncio.get_event_loop, + loop_getter_need_context: bool = False +) -> typing.Union[AsyncIOTask, typing.Callable[..., asyncio.Task]]: + """Wrap function in future and return. + + :param func: Function to wrap + :type func: typing.Optional[typing.Callable] + :param loop_getter: Method to get event loop, if wrap in asyncio task + :type loop_getter: typing.Union[ + typing.Callable[..., asyncio.AbstractEventLoop], + asyncio.AbstractEventLoop + ] + :param loop_getter_need_context: Loop getter requires function context + :type loop_getter_need_context: bool + :rtype: typing.Union[AsyncIOTask, typing.Callable[..., asyncio.Task]] + """ + if func is None: + return AsyncIOTask( + func=func, + loop_getter=loop_getter, + loop_getter_need_context=loop_getter_need_context + ) + return AsyncIOTask( # type: ignore + func=None, + loop_getter=loop_getter, + loop_getter_need_context=loop_getter_need_context + )(func) +# pylint: enable=unexpected-keyword-arg, no-value-for-parameter, function-redefined diff --git a/threaded/_base_threaded.py b/threaded/_base_threaded.py index 2c6e400..28e0e33 100644 --- a/threaded/_base_threaded.py +++ b/threaded/_base_threaded.py @@ -15,7 +15,6 @@ """Base classes for ThreadPooled and Threaded.""" import abc -import concurrent.futures import typing from . import _class_decorator @@ -23,8 +22,6 @@ __all__ = ( 'APIPooled', - 'BasePooled', - 'ThreadPoolExecutor', ) @@ -36,6 +33,7 @@ class APIPooled(_class_decorator.BaseDecorator, metaclass=abc.ABCMeta): __executor = None # type: typing.Optional[typing.Any] @classmethod + @abc.abstractmethod def configure( cls: typing.Type['APIPooled'], max_workers: typing.Optional[int] = None, @@ -48,79 +46,13 @@ def configure( raise NotImplementedError() # pragma: no cover @classmethod + @abc.abstractmethod def shutdown(cls: typing.Type['APIPooled']) -> None: """Shutdown executor.""" raise NotImplementedError() # pragma: no cover @property + @abc.abstractmethod def executor(self) -> typing.Any: """Executor instance.""" raise NotImplementedError() # pragma: no cover - - -class BasePooled(APIPooled, metaclass=abc.ABCMeta): # pylint: disable=abstract-method - """Base ThreadPooled class.""" - - __slots__ = () - - __executor = None # type: typing.Optional[ThreadPoolExecutor] - - @classmethod - def configure( - cls: typing.Type['BasePooled'], - max_workers: typing.Optional[int] = None, - ) -> None: - """Pool executor create and configure. - - :param max_workers: Maximum workers - :type max_workers: typing.Optional[int] - """ - if isinstance(cls.__executor, ThreadPoolExecutor): - if cls.__executor.max_workers == max_workers: - return - cls.__executor.shutdown() - - cls.__executor = ThreadPoolExecutor( - max_workers=max_workers, - ) - - @classmethod - def shutdown(cls: typing.Type['BasePooled']) -> None: - """Shutdown executor.""" - if cls.__executor is not None: - cls.__executor.shutdown() - - @property - def executor(self) -> 'ThreadPoolExecutor': - """Executor instance. - - :rtype: ThreadPoolExecutor - """ - if not isinstance(self.__executor, ThreadPoolExecutor) or self.__executor.is_shutdown: - self.configure() - return self.__executor # type: ignore - - -class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor): - """Provide readers for protected attributes. - - Simply extend concurrent.futures.ThreadPoolExecutor. - """ - - __slots__ = () - - @property - def max_workers(self) -> int: - """MaxWorkers. - - :rtype: int - """ - return self._max_workers # type: ignore - - @property - def is_shutdown(self) -> bool: - """Executor shutdown state. - - :rtype: bool - """ - return self._shutdown # type: ignore diff --git a/threaded/_class_decorator.py b/threaded/_class_decorator.py index c961626..272b173 100644 --- a/threaded/_class_decorator.py +++ b/threaded/_class_decorator.py @@ -17,6 +17,7 @@ """Base class for decorators.""" import abc +import asyncio import functools import typing @@ -120,6 +121,24 @@ def __call__( return wrapper(*l_args, **kwargs) return wrapper + @staticmethod + def _await_if_required(target: typing.Callable) -> typing.Callable[..., typing.Any]: + """Await result if coroutine was returned.""" + @functools.wraps(target) + def wrapper( + *args, # type: typing.Tuple + **kwargs # type: typing.Dict + ) -> typing.Any: + """Decorator/wrapper.""" + result = target(*args, **kwargs) + if asyncio.iscoroutine(result): + loop = asyncio.new_event_loop() + result = loop.run_until_complete(result) + loop.close() + return result + + return wrapper + def __repr__(self) -> str: """For debug purposes.""" return "<{cls}({func!r}) at 0x{id:X}>".format( diff --git a/threaded/_gthreadpooled.py b/threaded/_gthreadpooled.py index a4d1de0..bd1c1a2 100644 --- a/threaded/_gthreadpooled.py +++ b/threaded/_gthreadpooled.py @@ -17,7 +17,6 @@ Asyncio is supported """ -import asyncio import functools import os import typing @@ -103,28 +102,16 @@ def _get_function_wrapper( :return: wrapped coroutine or function :rtype: typing.Callable[..., gevent.event.AsyncResult] """ - # pylint: disable=missing-docstring - # noinspection PyMissingOrEmptyDocstring - @functools.wraps(func) - def await_if_required( - *args, # type: typing.Tuple - **kwargs # type: typing.Dict - ) -> typing.Any: - """Decorator/wrapper.""" - result = func(*args, **kwargs) - if asyncio.iscoroutine(result): - loop = asyncio.new_event_loop() - result = loop.run_until_complete(result) - loop.close() - return result + prepared = self._await_if_required(func) + # pylint: disable=missing-docstring # noinspection PyMissingOrEmptyDocstring - @functools.wraps(await_if_required) + @functools.wraps(prepared) def wrapper( *args, # type: typing.Tuple **kwargs # type: typing.Dict ) -> gevent.event.AsyncResult: - return self.executor.spawn(await_if_required, *args, **kwargs) + return self.executor.spawn(prepared, *args, **kwargs) # pylint: enable=missing-docstring return wrapper diff --git a/threaded/_threaded.py b/threaded/_threaded.py index 7de8307..2af054b 100644 --- a/threaded/_threaded.py +++ b/threaded/_threaded.py @@ -12,190 +12,23 @@ # License for the specific language governing permissions and limitations # under the License. -"""Python 3 threaded implementation. +"""Threaded implementation. Asyncio is supported """ -import asyncio -import concurrent.futures import functools import threading import typing -from . import _base_threaded from . import _class_decorator __all__ = ( - 'ThreadPooled', 'Threaded', - 'AsyncIOTask', - 'threadpooled', 'threaded', - 'asynciotask', ) -def _await_if_required(target: typing.Callable) -> typing.Callable[..., typing.Any]: - """Await result if coroutine was returned.""" - @functools.wraps(target) - def wrapper( - *args: typing.Tuple, - **kwargs: typing.Dict - ) -> typing.Any: - """Decorator/wrapper.""" - result = target(*args, **kwargs) - if asyncio.iscoroutine(result): - loop = asyncio.new_event_loop() - result = loop.run_until_complete(result) - loop.close() - return result - - return wrapper - - -class ThreadPooled(_base_threaded.BasePooled): - """Post function to ThreadPoolExecutor.""" - - __slots__ = ( - '__loop_getter', - '__loop_getter_need_context' - ) - - def __init__( - self, - func: typing.Optional[typing.Callable] = None, - *, - loop_getter: typing.Optional[ - typing.Union[ - typing.Callable[..., asyncio.AbstractEventLoop], - asyncio.AbstractEventLoop - ] - ] = None, - loop_getter_need_context: bool = False - ) -> None: - """Wrap function in future and return. - - :param func: function to wrap - :type func: typing.Optional[typing.Callable] - :param loop_getter: Method to get event loop, if wrap in asyncio task - :type loop_getter: typing.Union[ - None, - typing.Callable[..., asyncio.AbstractEventLoop], - asyncio.AbstractEventLoop - ] - :param loop_getter_need_context: Loop getter requires function context - :type loop_getter_need_context: bool - """ - super(ThreadPooled, self).__init__(func=func) - self.__loop_getter = loop_getter - self.__loop_getter_need_context = loop_getter_need_context - - @property - def loop_getter( - self - ) -> typing.Optional[ - typing.Union[ - typing.Callable[..., asyncio.AbstractEventLoop], - asyncio.AbstractEventLoop - ] - ]: - """Loop getter. - - :rtype: typing.Union[ - None, - typing.Callable[..., asyncio.AbstractEventLoop], - asyncio.AbstractEventLoop - ] - """ - return self.__loop_getter - - @property - def loop_getter_need_context(self) -> bool: - """Loop getter need execution context. - - :rtype: bool - """ - return self.__loop_getter_need_context - - def _get_loop( - self, - *args: typing.Tuple, - **kwargs: typing.Dict - ) -> typing.Optional[asyncio.AbstractEventLoop]: - """Get event loop in decorator class.""" - if callable(self.loop_getter): - if self.loop_getter_need_context: - return self.loop_getter(*args, **kwargs) # pylint: disable=not-callable - return self.loop_getter() # pylint: disable=not-callable - return self.loop_getter - - def _get_function_wrapper( - self, - func: typing.Callable - ) -> typing.Callable[..., typing.Union[typing.Awaitable, concurrent.futures.Future]]: - """Here should be constructed and returned real decorator. - - :param func: Wrapped function - :type func: typing.Callable - :return: wrapped coroutine or function - :rtype: typing.Callable[..., typing.Union[typing.Awaitable, concurrent.futures.Future]] - """ - prepared = _await_if_required(func) - - # pylint: disable=missing-docstring - # noinspection PyMissingOrEmptyDocstring - @functools.wraps(prepared) - def wrapper( - *args: typing.Tuple, - **kwargs: typing.Dict - ) -> typing.Union[ - typing.Awaitable, concurrent.futures.Future, - typing.Callable[..., typing.Union[typing.Awaitable, concurrent.futures.Future]] - ]: - loop = self._get_loop(*args, **kwargs) # type: ignore - - if loop is None: - return self.executor.submit(prepared, *args, **kwargs) - - return loop.run_in_executor( - self.executor, - functools.partial( - prepared, - *args, **kwargs - ) - ) - - # pylint: enable=missing-docstring - return wrapper - - def __call__( # pylint: disable=useless-super-delegation - self, - *args: typing.Union[typing.Tuple, typing.Callable], - **kwargs: typing.Dict - ) -> typing.Union[ - concurrent.futures.Future, typing.Awaitable, - typing.Callable[..., typing.Union[typing.Awaitable, concurrent.futures.Future]] - ]: - """Callable instance.""" - return super(ThreadPooled, self).__call__(*args, **kwargs) # type: ignore - - def __repr__(self) -> str: - """For debug purposes.""" - return ( - "<{cls}(" - "{func!r}, " - "loop_getter={self.loop_getter!r}, " - "loop_getter_need_context={self.loop_getter_need_context!r}, " - ") at 0x{id:X}>".format( - cls=self.__class__.__name__, - func=self._func, - self=self, - id=id(self) - ) - ) # pragma: no cover - - class Threaded(_class_decorator.BaseDecorator): """Run function in separate thread.""" @@ -281,7 +114,7 @@ def _get_function_wrapper( :return: wrapped function :rtype: typing.Callable[..., threading.Thread] """ - prepared = _await_if_required(func) + prepared = self._await_if_required(func) name = self.name if name is None: name = 'Threaded: ' + getattr( @@ -320,208 +153,8 @@ def __call__( # pylint: disable=useless-super-delegation return super(Threaded, self).__call__(*args, **kwargs) # type: ignore -class AsyncIOTask(_class_decorator.BaseDecorator): - """Wrap to asyncio.Task.""" - - __slots__ = ( - '__loop_getter', - '__loop_getter_need_context', - ) - - def __init__( - self, - func: typing.Optional[typing.Callable] = None, - *, - loop_getter: typing.Union[ - typing.Callable[..., asyncio.AbstractEventLoop], - asyncio.AbstractEventLoop - ] = asyncio.get_event_loop, - loop_getter_need_context: bool = False - ) -> None: - """Wrap function in future and return. - - :param func: Function to wrap - :type func: typing.Optional[typing.Callable] - :param loop_getter: Method to get event loop, if wrap in asyncio task - :type loop_getter: typing.Union[ - typing.Callable[..., asyncio.AbstractEventLoop], - asyncio.AbstractEventLoop - ] - :param loop_getter_need_context: Loop getter requires function context - :type loop_getter_need_context: bool - """ - super(AsyncIOTask, self).__init__(func=func) - self.__loop_getter = loop_getter - self.__loop_getter_need_context = loop_getter_need_context - - @property - def loop_getter( - self - ) -> typing.Union[ - typing.Callable[..., asyncio.AbstractEventLoop], - asyncio.AbstractEventLoop - ]: - """Loop getter. - - :rtype: typing.Union[ - typing.Callable[..., asyncio.AbstractEventLoop], - asyncio.AbstractEventLoop - ] - """ - return self.__loop_getter - - @property - def loop_getter_need_context(self) -> bool: - """Loop getter need execution context. - - :rtype: bool - """ - return self.__loop_getter_need_context - - def get_loop( - self, - *args, # type: typing.Tuple - **kwargs # type: typing.Dict - ) -> asyncio.AbstractEventLoop: - """Get event loop in decorator class.""" - if callable(self.loop_getter): - if self.loop_getter_need_context: - return self.loop_getter(*args, **kwargs) # pylint: disable=not-callable - return self.loop_getter() # pylint: disable=not-callable - return self.loop_getter - - def _get_function_wrapper( - self, - func: typing.Callable - ) -> typing.Callable[..., asyncio.Task]: - """Here should be constructed and returned real decorator. - - :param func: Wrapped function - :type func: typing.Callable - :rtype: typing.Callable[..., asyncio.Task] - """ - # pylint: disable=missing-docstring - # noinspection PyMissingOrEmptyDocstring - @functools.wraps(func) - def wrapper( - *args, # type: typing.Tuple - **kwargs # type: typing.Dict - ) -> asyncio.Task: - loop = self.get_loop(*args, **kwargs) # type: ignore - return loop.create_task(func(*args, **kwargs)) - - # pylint: enable=missing-docstring - return wrapper - - def __call__( # pylint: disable=useless-super-delegation - self, - *args: typing.Union[typing.Tuple, typing.Callable], - **kwargs: typing.Dict - ) -> typing.Union[asyncio.Task, typing.Callable[..., asyncio.Task]]: - """Callable instance.""" - return super(AsyncIOTask, self).__call__(*args, **kwargs) # type: ignore - - def __repr__(self) -> str: - """For debug purposes.""" - return ( - "<{cls}(" - "{func!r}, " - "loop_getter={self.loop_getter!r}, " - "loop_getter_need_context={self.loop_getter_need_context!r}, " - ") at 0x{id:X}>".format( - cls=self.__class__.__name__, - func=self._func, - self=self, - id=id(self) - ) - ) # pragma: no cover - - # pylint: disable=function-redefined, unused-argument @typing.overload -def threadpooled( - func: typing.Callable, - *, - loop_getter: None = None, - loop_getter_need_context: bool = False -) -> typing.Callable[..., concurrent.futures.Future]: - """Overload: function callable, no loop getter.""" - pass # pragma: no cover - - -@typing.overload # noqa: F811 -def threadpooled( - func: typing.Callable, - *, - loop_getter: typing.Union[ - typing.Callable[..., asyncio.AbstractEventLoop], - asyncio.AbstractEventLoop - ], - loop_getter_need_context: bool = False -) -> typing.Callable[..., asyncio.Task]: - """Overload: function callable, loop getter available.""" - pass # pragma: no cover - - -@typing.overload # noqa: F811 -def threadpooled( - func: None = None, - *, - loop_getter: typing.Union[ - None, - typing.Callable[..., asyncio.AbstractEventLoop], - asyncio.AbstractEventLoop - ] = None, - loop_getter_need_context: bool = False -) -> ThreadPooled: - """Overload: No function.""" - pass # pragma: no cover - - -# pylint: enable=unused-argument -# pylint: disable=unexpected-keyword-arg, no-value-for-parameter -def threadpooled( # noqa: F811 - func: typing.Optional[typing.Callable] = None, - *, - loop_getter: typing.Union[ - None, - typing.Callable[..., asyncio.AbstractEventLoop], - asyncio.AbstractEventLoop - ] = None, - loop_getter_need_context: bool = False -) -> typing.Union[ - ThreadPooled, - typing.Callable[..., typing.Union[concurrent.futures.Future, typing.Awaitable]] -]: - """Post function to ThreadPoolExecutor. - - :param func: function to wrap - :type func: typing.Optional[typing.Callable] - :param loop_getter: Method to get event loop, if wrap in asyncio task - :type loop_getter: typing.Union[ - None, - typing.Callable[..., asyncio.AbstractEventLoop], - asyncio.AbstractEventLoop - ] - :param loop_getter_need_context: Loop getter requires function context - :type loop_getter_need_context: bool - :rtype: typing.Union[ThreadPooled, typing.Callable[..., typing.Union[concurrent.futures.Future, typing.Awaitable]]] - """ - if func is None: - return ThreadPooled( - func=func, - loop_getter=loop_getter, - loop_getter_need_context=loop_getter_need_context - ) - return ThreadPooled( # type: ignore - func=None, - loop_getter=loop_getter, - loop_getter_need_context=loop_getter_need_context - )(func) - - -# pylint: disable=unused-argument -@typing.overload def threaded( name: typing.Callable, daemon: bool = False, @@ -566,69 +199,4 @@ def threaded( # noqa: F811 ) return Threaded(name=name, daemon=daemon, started=started)(func) # type: ignore return Threaded(name=name, daemon=daemon, started=started) - - -# pylint: disable=unused-argument -@typing.overload -def asynciotask( - func: None = None, - *, - loop_getter: typing.Union[ - typing.Callable[..., asyncio.AbstractEventLoop], - asyncio.AbstractEventLoop - ] = asyncio.get_event_loop, - loop_getter_need_context: bool = False -) -> AsyncIOTask: - """Overload: no function.""" - pass # pragma: no cover - - -@typing.overload # noqa: F811 -def asynciotask( - func: typing.Callable, - *, - loop_getter: typing.Union[ - typing.Callable[..., asyncio.AbstractEventLoop], - asyncio.AbstractEventLoop - ] = asyncio.get_event_loop, - loop_getter_need_context: bool = False -) -> typing.Callable[..., asyncio.Task]: - """Overload: provided function.""" - pass # pragma: no cover - - -# pylint: enable=unused-argument -def asynciotask( # noqa: F811 - func: typing.Optional[typing.Callable] = None, - *, - loop_getter: typing.Union[ - typing.Callable[..., asyncio.AbstractEventLoop], - asyncio.AbstractEventLoop - ] = asyncio.get_event_loop, - loop_getter_need_context: bool = False -) -> typing.Union[AsyncIOTask, typing.Callable[..., asyncio.Task]]: - """Wrap function in future and return. - - :param func: Function to wrap - :type func: typing.Optional[typing.Callable] - :param loop_getter: Method to get event loop, if wrap in asyncio task - :type loop_getter: typing.Union[ - typing.Callable[..., asyncio.AbstractEventLoop], - asyncio.AbstractEventLoop - ] - :param loop_getter_need_context: Loop getter requires function context - :type loop_getter_need_context: bool - :rtype: typing.Union[AsyncIOTask, typing.Callable[..., asyncio.Task]] - """ - if func is None: - return AsyncIOTask( - func=func, - loop_getter=loop_getter, - loop_getter_need_context=loop_getter_need_context - ) - return AsyncIOTask( # type: ignore - func=None, - loop_getter=loop_getter, - loop_getter_need_context=loop_getter_need_context - )(func) # pylint: enable=unexpected-keyword-arg, no-value-for-parameter, function-redefined diff --git a/threaded/_threadpooled.py b/threaded/_threadpooled.py new file mode 100644 index 0000000..c748696 --- /dev/null +++ b/threaded/_threadpooled.py @@ -0,0 +1,318 @@ +# Copyright 2017-2018 Alexey Stepanov aka penguinolog +## +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""ThreadPooled implementation. + +Asyncio is supported +""" + +import asyncio +import concurrent.futures +import functools +import typing + +from . import _base_threaded + +__all__ = ( + 'ThreadPooled', + 'threadpooled', +) + + +class ThreadPooled(_base_threaded.APIPooled): + """Post function to ThreadPoolExecutor.""" + + __slots__ = ( + '__loop_getter', + '__loop_getter_need_context' + ) + + __executor = None # type: typing.Optional[ThreadPoolExecutor] + + @classmethod + def configure( + cls: typing.Type['ThreadPooled'], + max_workers: typing.Optional[int] = None, + ) -> None: + """Pool executor create and configure. + + :param max_workers: Maximum workers + :type max_workers: typing.Optional[int] + """ + if isinstance(cls.__executor, ThreadPoolExecutor): + if cls.__executor.max_workers == max_workers: + return + cls.__executor.shutdown() + + cls.__executor = ThreadPoolExecutor( + max_workers=max_workers, + ) + + @classmethod + def shutdown(cls: typing.Type['ThreadPooled']) -> None: + """Shutdown executor.""" + if cls.__executor is not None: + cls.__executor.shutdown() + + @property + def executor(self) -> 'ThreadPoolExecutor': + """Executor instance. + + :rtype: ThreadPoolExecutor + """ + if not isinstance(self.__executor, ThreadPoolExecutor) or self.__executor.is_shutdown: + self.configure() + return self.__executor # type: ignore + + def __init__( + self, + func: typing.Optional[typing.Callable] = None, + *, + loop_getter: typing.Optional[ + typing.Union[ + typing.Callable[..., asyncio.AbstractEventLoop], + asyncio.AbstractEventLoop + ] + ] = None, + loop_getter_need_context: bool = False + ) -> None: + """Wrap function in future and return. + + :param func: function to wrap + :type func: typing.Optional[typing.Callable] + :param loop_getter: Method to get event loop, if wrap in asyncio task + :type loop_getter: typing.Union[ + None, + typing.Callable[..., asyncio.AbstractEventLoop], + asyncio.AbstractEventLoop + ] + :param loop_getter_need_context: Loop getter requires function context + :type loop_getter_need_context: bool + """ + super(ThreadPooled, self).__init__(func=func) + self.__loop_getter = loop_getter + self.__loop_getter_need_context = loop_getter_need_context + + @property + def loop_getter( + self + ) -> typing.Optional[ + typing.Union[ + typing.Callable[..., asyncio.AbstractEventLoop], + asyncio.AbstractEventLoop + ] + ]: + """Loop getter. + + :rtype: typing.Union[ + None, + typing.Callable[..., asyncio.AbstractEventLoop], + asyncio.AbstractEventLoop + ] + """ + return self.__loop_getter + + @property + def loop_getter_need_context(self) -> bool: + """Loop getter need execution context. + + :rtype: bool + """ + return self.__loop_getter_need_context + + def _get_loop( + self, + *args: typing.Tuple, + **kwargs: typing.Dict + ) -> typing.Optional[asyncio.AbstractEventLoop]: + """Get event loop in decorator class.""" + if callable(self.loop_getter): + if self.loop_getter_need_context: + return self.loop_getter(*args, **kwargs) # pylint: disable=not-callable + return self.loop_getter() # pylint: disable=not-callable + return self.loop_getter + + def _get_function_wrapper( + self, + func: typing.Callable + ) -> typing.Callable[..., typing.Union[typing.Awaitable, concurrent.futures.Future]]: + """Here should be constructed and returned real decorator. + + :param func: Wrapped function + :type func: typing.Callable + :return: wrapped coroutine or function + :rtype: typing.Callable[..., typing.Union[typing.Awaitable, concurrent.futures.Future]] + """ + prepared = self._await_if_required(func) + + # pylint: disable=missing-docstring + # noinspection PyMissingOrEmptyDocstring + @functools.wraps(prepared) + def wrapper( + *args: typing.Tuple, + **kwargs: typing.Dict + ) -> typing.Union[ + typing.Awaitable, concurrent.futures.Future, + typing.Callable[..., typing.Union[typing.Awaitable, concurrent.futures.Future]] + ]: + loop = self._get_loop(*args, **kwargs) # type: ignore + + if loop is None: + return self.executor.submit(prepared, *args, **kwargs) + + return loop.run_in_executor( + self.executor, + functools.partial( + prepared, + *args, **kwargs + ) + ) + + # pylint: enable=missing-docstring + return wrapper + + def __call__( # pylint: disable=useless-super-delegation + self, + *args: typing.Union[typing.Tuple, typing.Callable], + **kwargs: typing.Dict + ) -> typing.Union[ + concurrent.futures.Future, typing.Awaitable, + typing.Callable[..., typing.Union[typing.Awaitable, concurrent.futures.Future]] + ]: + """Callable instance.""" + return super(ThreadPooled, self).__call__(*args, **kwargs) # type: ignore + + def __repr__(self) -> str: + """For debug purposes.""" + return ( + "<{cls}(" + "{func!r}, " + "loop_getter={self.loop_getter!r}, " + "loop_getter_need_context={self.loop_getter_need_context!r}, " + ") at 0x{id:X}>".format( + cls=self.__class__.__name__, + func=self._func, + self=self, + id=id(self) + ) + ) # pragma: no cover + + +# pylint: disable=function-redefined, unused-argument +@typing.overload +def threadpooled( + func: typing.Callable, + *, + loop_getter: None = None, + loop_getter_need_context: bool = False +) -> typing.Callable[..., concurrent.futures.Future]: + """Overload: function callable, no loop getter.""" + pass # pragma: no cover + + +@typing.overload # noqa: F811 +def threadpooled( + func: typing.Callable, + *, + loop_getter: typing.Union[ + typing.Callable[..., asyncio.AbstractEventLoop], + asyncio.AbstractEventLoop + ], + loop_getter_need_context: bool = False +) -> typing.Callable[..., asyncio.Task]: + """Overload: function callable, loop getter available.""" + pass # pragma: no cover + + +@typing.overload # noqa: F811 +def threadpooled( + func: None = None, + *, + loop_getter: typing.Union[ + None, + typing.Callable[..., asyncio.AbstractEventLoop], + asyncio.AbstractEventLoop + ] = None, + loop_getter_need_context: bool = False +) -> ThreadPooled: + """Overload: No function.""" + pass # pragma: no cover + + +# pylint: enable=unused-argument +# pylint: disable=unexpected-keyword-arg, no-value-for-parameter +def threadpooled( # noqa: F811 + func: typing.Optional[typing.Callable] = None, + *, + loop_getter: typing.Union[ + None, + typing.Callable[..., asyncio.AbstractEventLoop], + asyncio.AbstractEventLoop + ] = None, + loop_getter_need_context: bool = False +) -> typing.Union[ + ThreadPooled, + typing.Callable[..., typing.Union[concurrent.futures.Future, typing.Awaitable]] +]: + """Post function to ThreadPoolExecutor. + + :param func: function to wrap + :type func: typing.Optional[typing.Callable] + :param loop_getter: Method to get event loop, if wrap in asyncio task + :type loop_getter: typing.Union[ + None, + typing.Callable[..., asyncio.AbstractEventLoop], + asyncio.AbstractEventLoop + ] + :param loop_getter_need_context: Loop getter requires function context + :type loop_getter_need_context: bool + :rtype: typing.Union[ThreadPooled, typing.Callable[..., typing.Union[concurrent.futures.Future, typing.Awaitable]]] + """ + if func is None: + return ThreadPooled( + func=func, + loop_getter=loop_getter, + loop_getter_need_context=loop_getter_need_context + ) + return ThreadPooled( # type: ignore + func=None, + loop_getter=loop_getter, + loop_getter_need_context=loop_getter_need_context + )(func) +# pylint: enable=unexpected-keyword-arg, no-value-for-parameter, function-redefined + + +class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor): + """Provide readers for protected attributes. + + Simply extend concurrent.futures.ThreadPoolExecutor. + """ + + __slots__ = () + + @property + def max_workers(self) -> int: + """MaxWorkers. + + :rtype: int + """ + return self._max_workers # type: ignore + + @property + def is_shutdown(self) -> bool: + """Executor shutdown state. + + :rtype: bool + """ + return self._shutdown # type: ignore