-
Notifications
You must be signed in to change notification settings - Fork 1
/
_gthreadpooled.py
136 lines (110 loc) · 4.78 KB
/
_gthreadpooled.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# Copyright 2017-2018 Alexey Stepanov aka penguinolog
##
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Python 2 threaded implementation.
Uses backport of concurrent.futures.
"""
from __future__ import absolute_import
import typing # noqa # pylint: disable=unused-import
import gevent.event # type: ignore # noqa # pylint: disable=unused-import
import gevent.threadpool # type: ignore # noqa # pylint: disable=unused-import
import six
from . import _base_threaded
__all__ = (
'GThreadPooled',
'gthreadpooled',
)
class GThreadPooled(_base_threaded.APIPooled):
"""Post function to gevent.threadpool.ThreadPool."""
__slots__ = ()
__executor = None # type: typing.Optional[gevent.threadpool.ThreadPool]
@classmethod
def configure( # pylint: disable=arguments-differ
cls, # type: typing.Type[GThreadPooled]
max_workers=None, # type: typing.Optional[int]
hub=None # type: typing.Optional[gevent.hub.Hub]
): # type: (...) -> None
"""Pool executor create and configure.
:param max_workers: Maximum workers
:type max_workers: typing.Optional[int]
:param hub: Event-loop hub
:type hub: typing.Optional[gevent.hub.Hub]
"""
if max_workers is None:
max_workers = _base_threaded.cpu_count() * 5
if isinstance(cls.__executor, gevent.threadpool.ThreadPool):
if hub is None or hub == cls.__executor.hub:
if max_workers == cls.__executor.maxsize:
return # Nothing to change)
cls.__executor.maxsize = max_workers # We can use internals
return
# Hub change. Very special case.
cls.__executor.kill() # pragma: no cover
cls.__executor = gevent.threadpool.ThreadPool(
maxsize=max_workers,
hub=hub
)
@classmethod
def shutdown(cls): # type: (typing.Type[GThreadPooled]) -> None
"""Shutdown executor.
Due to not implemented method, set maxsize to 0 (do not accept new).
"""
if cls.__executor is not None:
cls.__executor.kill()
@property
def executor(self): # type: () -> gevent.threadpool.ThreadPool
"""Executor instance.
:rtype: gevent.threadpool.ThreadPool
"""
if not isinstance(self.__executor, gevent.threadpool.ThreadPool):
self.configure()
return self.__executor
def _get_function_wrapper(
self,
func # type: typing.Callable
): # type: (...) -> typing.Callable[..., gevent.event.AsyncResult]
"""Here should be constructed and returned real decorator.
:param func: Wrapped function
:type func: typing.Callable
:return: wrapped function
:rtype: typing.Callable[..., gevent.event.AsyncResult]
"""
# noinspection PyMissingOrEmptyDocstring
@six.wraps(func)
def wrapper( # pylint: disable=missing-docstring
*args, # type: typing.Any
**kwargs # type: typing.Any
): # type: (...) -> gevent.event.AsyncResult
return self.executor.spawn(func, *args, **kwargs)
return wrapper
def __call__( # pylint: disable=useless-super-delegation
self,
*args, # type: typing.Union[typing.Callable, typing.Any]
**kwargs # type: typing.Any
): # type: (...) -> typing.Union[gevent.event.AsyncResult, typing.Callable[..., gevent.event.AsyncResult]]
"""Callable instance."""
return super(GThreadPooled, self).__call__(*args, **kwargs)
# pylint: disable=unexpected-keyword-arg, no-value-for-parameter
def gthreadpooled(
func=None # type: typing.Optional[typing.Callable]
): # type: (...) -> typing.Union[GThreadPooled, typing.Callable[..., gevent.event.AsyncResult]]
"""Post function to gevent.threadpool.ThreadPool.
:param func: function to wrap
:type func: typing.Optional[typing.Callable]
:return: GThreadPooled instance, if called as function or argumented decorator, else callable wrapper
:rtype: typing.Union[GThreadPooled, typing.Callable[..., gevent.event.AsyncResult]]
"""
if func is None:
return GThreadPooled(func=func)
return GThreadPooled(func=None)(func)
# pylint: enable=unexpected-keyword-arg, no-value-for-parameter