-
Notifications
You must be signed in to change notification settings - Fork 11
/
futures.py
113 lines (78 loc) · 2.94 KB
/
futures.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# encoding: utf-8
import atexit
import threading
import weakref
import math
try:
import queue
except ImportError:
import Queue as queue
try:
from concurrent import futures
except ImportError:
raise ImportError("You must install the futures package to use the dynamic scaling thread pool.")
__all__ = ['ScalingPoolExecutor']
log = __import__('logging').getLogger(__name__)
def thread_worker(executor, jobs, timeout, maximum):
    """Run queued work items until told to stop, starved, or exhausted.

    This is the entry point of each pool thread.

    executor -- zero-argument callable returning the owning executor, or
                ``None`` once it no longer exists (called like a weak
                reference).
    jobs     -- queue of work items; each item exposes ``run()``.  A ``None``
                item is a wake-up sentinel rather than work.
    timeout  -- seconds to block waiting for an item before giving up.
    maximum  -- iteration budget; the loop body runs at most ``maximum + 1``
                times, and sentinel items count against that budget.

    Whatever the reason for leaving the loop, the current thread is removed
    from the executor's ``_threads`` set if the executor still exists.
    """
    remaining = maximum + 1

    try:
        while remaining:
            remaining -= 1

            try:
                work = jobs.get(True, timeout)
            except queue.Empty:
                # Nothing arrived within the timeout: let this thread die off.
                log.debug("Worker death from starvation.")
                break

            if work is not None:
                work.run()
                continue

            # Sentinel received: stop only if the executor is gone or is
            # shutting down, otherwise go back to waiting for work.
            runner = executor()

            if runner is None or runner._shutdown:
                log.debug("Worker instructed to shut down.")
                break

            # Do not keep a strong reference to the executor while blocked.
            del runner

        else:
            # The loop ran out of iterations without hitting a break.
            log.debug("Worker death from exhaustion.")

    except:
        # Last-resort boundary for the thread: record anything that escaped.
        log.critical("Unhandled exception in worker.", exc_info=True)

    runner = executor()

    if runner:
        runner._threads.discard(threading.current_thread())
class ScalingPoolExecutor(futures.ThreadPoolExecutor):
    """Thread pool executor whose thread count follows the queue depth.

    Threads are spawned on demand, aiming for one thread per ``divisor``
    queued jobs and never more than ``workers`` threads.  Each thread runs
    ``thread_worker`` and removes itself from the pool when it stops.
    """

    def __init__(self, workers, divisor, timeout):
        """Prepare an empty pool.

        workers -- upper bound on the number of threads.
        divisor -- number of queued jobs that justifies one thread; also
                   handed to each worker as its job limit.
        timeout -- seconds a worker waits on an empty queue before exiting.
        """
        # Initialise the base class so every attribute its own methods rely
        # on (``_max_workers``, ``_threads``, ``_shutdown``, ``_shutdown_lock``
        # and, on newer Pythons, ``_broken`` used by ``submit``) exists.
        futures.ThreadPoolExecutor.__init__(self, workers)

        self.divisor = divisor
        self.timeout = timeout

        # Always use a plain Queue: sizing the pool needs ``qsize()`` and the
        # workers need a blocking ``get`` with a timeout.
        self._work_queue = queue.Queue()

        # Guards additions to ``_threads`` made while spawning.
        self._management_lock = threading.Lock()

        atexit.register(self._atexit)

    def shutdown(self, wait=True):
        """Flag the pool as shut down and wake every worker.

        wait -- when true, block until the threads known at that point have
                finished.
        """
        with self._shutdown_lock:
            self._shutdown = True

            # One sentinel per registered thread so each blocked worker wakes
            # up and notices the shutdown flag.
            for _ in range(len(self._threads)):
                self._work_queue.put(None)

        if wait:
            # Iterate over a snapshot: workers remove themselves from the set
            # as they exit.
            for thread in list(self._threads):
                thread.join()

    def _atexit(self):
        """Interpreter-exit hook: shut down and wait for the workers."""
        self.shutdown(True)

    def _spawn(self):
        """Start one daemon worker thread and register it with the pool."""
        worker = threading.Thread(
            target=thread_worker,
            # Positional order must match
            # thread_worker(executor, jobs, timeout, maximum).  The worker
            # only receives a weak reference to this executor.
            args=(weakref.ref(self), self._work_queue, self.timeout, self.divisor),
        )
        worker.daemon = True

        # Register before starting: a worker that exits immediately discards
        # itself from the set, which must not happen before it was added.
        with self._management_lock:
            self._threads.add(worker)

        try:
            worker.start()
        except Exception:
            # The thread never ran, so it must not be counted as a worker.
            with self._management_lock:
                self._threads.discard(worker)
            raise

    def _adjust_thread_count(self):
        """Spawn as many threads as are missing to reach the optimum."""
        # Read the optimum once; it depends on the live queue size and could
        # differ between two reads.
        tospawn = int(self._optimum_workers - len(self._threads))

        if tospawn <= 0:
            return

        log.debug("Spawning %d thread%s.", tospawn, "" if tospawn == 1 else "s")

        for _ in range(tospawn):
            self._spawn()

    @property
    def _optimum_workers(self):
        """Thread count suited to the current queue size, capped at the maximum."""
        wanted = int(math.ceil(self._work_queue.qsize() / float(self.divisor)))
        return min(self._max_workers, wanted)