Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ Pros:
Python 3.7
PyPy3 3.5+

.. note:: For python 2.7/3.4/PyPy you can use versions 1.x.x

Decorators:

* `ThreadPooled` - native ``concurrent.futures.ThreadPool``.
Expand Down Expand Up @@ -77,7 +79,7 @@ Mostly it is required decorator: submit function to ThreadPoolExecutor on call.

.. note::

By default, if executor is not configured - it configures with default parameters: ``max_workers=(CPU_COUNT or 1) * 5``
By default, if executor is not configured - it configures with default parameters: ``max_workers=CPU_COUNT * 5``

.. code-block:: python

Expand Down Expand Up @@ -219,7 +221,7 @@ Post function to ``gevent.threadpool.ThreadPool``.

.. note::

By default, if executor is not configured - it configures with default parameters: ``max_workers=(CPU_COUNT or 1) * 5``
By default, if executor is not configured - it configures with default parameters: ``max_workers=CPU_COUNT * 5``

.. note::

Expand Down
2 changes: 0 additions & 2 deletions doc/source/asynciotask.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ API: Decorators: `AsyncIOTask`, `asynciotask`.
.. py:module:: pooled
.. py:currentmodule:: pooled

.. note:: Python 3 only.

.. py:class:: AsyncIOTask

Wrap to asyncio.Task.
Expand Down
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ def _extension(modpath):
requires_optimization = [
_extension('threaded._class_decorator'),
_extension('threaded._base_threaded'),
_extension('threaded._asynciotask'),
_extension('threaded._threaded'),
_extension('threaded._threadpooled'),
_extension('threaded._gthreadpooled'),
]

Expand Down
14 changes: 5 additions & 9 deletions threaded/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@
import typing # noqa # pylint: disable=unused-import

# pylint: disable=no-name-in-module
from ._threaded import (
ThreadPooled,
Threaded,
AsyncIOTask,
threadpooled,
threaded,
asynciotask
)
from ._asynciotask import AsyncIOTask, asynciotask
from ._threaded import Threaded, threaded
from ._threadpooled import ThreadPooled, threadpooled


try: # pragma: no cover
from ._gthreadpooled import GThreadPooled, gthreadpooled
Expand All @@ -45,7 +41,7 @@
'gthreadpooled'
)

__version__ = '2.0.0'
__version__ = '2.0.1'
__author__ = "Alexey Stepanov"
__author_email__ = 'penguinolog@gmail.com'
__maintainers__ = {
Expand Down
209 changes: 209 additions & 0 deletions threaded/_asynciotask.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
# Copyright 2017-2018 Alexey Stepanov aka penguinolog
##
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""AsyncIOTask implementation."""

import asyncio
import functools
import typing

from . import _class_decorator

__all__ = (
'AsyncIOTask',
'asynciotask',
)


class AsyncIOTask(_class_decorator.BaseDecorator):
"""Wrap to asyncio.Task."""

__slots__ = (
'__loop_getter',
'__loop_getter_need_context',
)

def __init__(
self,
func: typing.Optional[typing.Callable] = None,
*,
loop_getter: typing.Union[
typing.Callable[..., asyncio.AbstractEventLoop],
asyncio.AbstractEventLoop
] = asyncio.get_event_loop,
loop_getter_need_context: bool = False
) -> None:
"""Wrap function in future and return.

:param func: Function to wrap
:type func: typing.Optional[typing.Callable]
:param loop_getter: Method to get event loop, if wrap in asyncio task
:type loop_getter: typing.Union[
typing.Callable[..., asyncio.AbstractEventLoop],
asyncio.AbstractEventLoop
]
:param loop_getter_need_context: Loop getter requires function context
:type loop_getter_need_context: bool
"""
super(AsyncIOTask, self).__init__(func=func)
self.__loop_getter = loop_getter
self.__loop_getter_need_context = loop_getter_need_context

@property
def loop_getter(
self
) -> typing.Union[
typing.Callable[..., asyncio.AbstractEventLoop],
asyncio.AbstractEventLoop
]:
"""Loop getter.

:rtype: typing.Union[
typing.Callable[..., asyncio.AbstractEventLoop],
asyncio.AbstractEventLoop
]
"""
return self.__loop_getter

@property
def loop_getter_need_context(self) -> bool:
"""Loop getter need execution context.

:rtype: bool
"""
return self.__loop_getter_need_context

def get_loop(
self,
*args, # type: typing.Tuple
**kwargs # type: typing.Dict
) -> asyncio.AbstractEventLoop:
"""Get event loop in decorator class."""
if callable(self.loop_getter):
if self.loop_getter_need_context:
return self.loop_getter(*args, **kwargs) # pylint: disable=not-callable
return self.loop_getter() # pylint: disable=not-callable
return self.loop_getter

def _get_function_wrapper(
self,
func: typing.Callable
) -> typing.Callable[..., asyncio.Task]:
"""Here should be constructed and returned real decorator.

:param func: Wrapped function
:type func: typing.Callable
:rtype: typing.Callable[..., asyncio.Task]
"""
# pylint: disable=missing-docstring
# noinspection PyMissingOrEmptyDocstring
@functools.wraps(func)
def wrapper(
*args, # type: typing.Tuple
**kwargs # type: typing.Dict
) -> asyncio.Task:
loop = self.get_loop(*args, **kwargs) # type: ignore
return loop.create_task(func(*args, **kwargs))

# pylint: enable=missing-docstring
return wrapper

def __call__( # pylint: disable=useless-super-delegation
self,
*args: typing.Union[typing.Tuple, typing.Callable],
**kwargs: typing.Dict
) -> typing.Union[asyncio.Task, typing.Callable[..., asyncio.Task]]:
"""Callable instance."""
return super(AsyncIOTask, self).__call__(*args, **kwargs) # type: ignore

def __repr__(self) -> str:
"""For debug purposes."""
return (
"<{cls}("
"{func!r}, "
"loop_getter={self.loop_getter!r}, "
"loop_getter_need_context={self.loop_getter_need_context!r}, "
") at 0x{id:X}>".format(
cls=self.__class__.__name__,
func=self._func,
self=self,
id=id(self)
)
) # pragma: no cover


# pylint: disable=function-redefined, unused-argument
@typing.overload
def asynciotask(
func: None = None,
*,
loop_getter: typing.Union[
typing.Callable[..., asyncio.AbstractEventLoop],
asyncio.AbstractEventLoop
] = asyncio.get_event_loop,
loop_getter_need_context: bool = False
) -> AsyncIOTask:
"""Overload: no function."""
pass # pragma: no cover


@typing.overload # noqa: F811
def asynciotask(
func: typing.Callable,
*,
loop_getter: typing.Union[
typing.Callable[..., asyncio.AbstractEventLoop],
asyncio.AbstractEventLoop
] = asyncio.get_event_loop,
loop_getter_need_context: bool = False
) -> typing.Callable[..., asyncio.Task]:
"""Overload: provided function."""
pass # pragma: no cover


# pylint: enable=unused-argument
def asynciotask( # noqa: F811
func: typing.Optional[typing.Callable] = None,
*,
loop_getter: typing.Union[
typing.Callable[..., asyncio.AbstractEventLoop],
asyncio.AbstractEventLoop
] = asyncio.get_event_loop,
loop_getter_need_context: bool = False
) -> typing.Union[AsyncIOTask, typing.Callable[..., asyncio.Task]]:
"""Wrap function in future and return.

:param func: Function to wrap
:type func: typing.Optional[typing.Callable]
:param loop_getter: Method to get event loop, if wrap in asyncio task
:type loop_getter: typing.Union[
typing.Callable[..., asyncio.AbstractEventLoop],
asyncio.AbstractEventLoop
]
:param loop_getter_need_context: Loop getter requires function context
:type loop_getter_need_context: bool
:rtype: typing.Union[AsyncIOTask, typing.Callable[..., asyncio.Task]]
"""
if func is None:
return AsyncIOTask(
func=func,
loop_getter=loop_getter,
loop_getter_need_context=loop_getter_need_context
)
return AsyncIOTask( # type: ignore
func=None,
loop_getter=loop_getter,
loop_getter_need_context=loop_getter_need_context
)(func)
# pylint: enable=unexpected-keyword-arg, no-value-for-parameter, function-redefined
74 changes: 3 additions & 71 deletions threaded/_base_threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@
"""Base classes for ThreadPooled and Threaded."""

import abc
import concurrent.futures
import typing

from . import _class_decorator


__all__ = (
'APIPooled',
'BasePooled',
'ThreadPoolExecutor',
)


Expand All @@ -36,6 +33,7 @@ class APIPooled(_class_decorator.BaseDecorator, metaclass=abc.ABCMeta):
__executor = None # type: typing.Optional[typing.Any]

@classmethod
@abc.abstractmethod
def configure(
cls: typing.Type['APIPooled'],
max_workers: typing.Optional[int] = None,
Expand All @@ -48,79 +46,13 @@ def configure(
raise NotImplementedError() # pragma: no cover

@classmethod
@abc.abstractmethod
def shutdown(cls: typing.Type['APIPooled']) -> None:
"""Shutdown executor."""
raise NotImplementedError() # pragma: no cover

@property
@abc.abstractmethod
def executor(self) -> typing.Any:
"""Executor instance."""
raise NotImplementedError() # pragma: no cover


class BasePooled(APIPooled, metaclass=abc.ABCMeta): # pylint: disable=abstract-method
"""Base ThreadPooled class."""

__slots__ = ()

__executor = None # type: typing.Optional[ThreadPoolExecutor]

@classmethod
def configure(
cls: typing.Type['BasePooled'],
max_workers: typing.Optional[int] = None,
) -> None:
"""Pool executor create and configure.

:param max_workers: Maximum workers
:type max_workers: typing.Optional[int]
"""
if isinstance(cls.__executor, ThreadPoolExecutor):
if cls.__executor.max_workers == max_workers:
return
cls.__executor.shutdown()

cls.__executor = ThreadPoolExecutor(
max_workers=max_workers,
)

@classmethod
def shutdown(cls: typing.Type['BasePooled']) -> None:
"""Shutdown executor."""
if cls.__executor is not None:
cls.__executor.shutdown()

@property
def executor(self) -> 'ThreadPoolExecutor':
"""Executor instance.

:rtype: ThreadPoolExecutor
"""
if not isinstance(self.__executor, ThreadPoolExecutor) or self.__executor.is_shutdown:
self.configure()
return self.__executor # type: ignore


class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
"""Provide readers for protected attributes.

Simply extend concurrent.futures.ThreadPoolExecutor.
"""

__slots__ = ()

@property
def max_workers(self) -> int:
"""MaxWorkers.

:rtype: int
"""
return self._max_workers # type: ignore

@property
def is_shutdown(self) -> bool:
"""Executor shutdown state.

:rtype: bool
"""
return self._shutdown # type: ignore
Loading