-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
_threadworker.py
65 lines (50 loc) · 1.72 KB
/
_threadworker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# -*- test-case-name: twisted.threads.test.test_threadworker -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Implementation of an L{IWorker} based on native threads and queues.
"""
from zope.interface import implementer
from ._ithreads import IWorker
from ._convenience import Quit
# Unique sentinel: ThreadWorker.quit puts this object on the worker's queue,
# and the worker loop stops pulling tasks once it gets it back from the queue.
_stop = object()
@implementer(IWorker)
class ThreadWorker(object):
    """
    An L{IWorker} implemented based on a thread and a queue.

    Tasks passed to L{ThreadWorker.do} are put on the queue; a loop running on
    the thread takes them off the queue and calls them one at a time.
    """

    def __init__(self, createThread, createQueue):
        """
        Create the queue, then create and start the thread that services it.

        @param createThread: create a L{threading.Thread} to perform work on.
        @type createThread: 1-argument callable taking a 0-argument callable
            and returning a L{threading.Thread}

        @param createQueue: Create an object like a L{Queue.Queue}, with C{put}
            and C{get} methods.
        @type createQueue: 0-argument callable returning a L{Queue.Queue}
        """
        self._q = createQueue()
        # Set up all instance state before the thread is started, so the
        # worker is never running while only partially constructed.
        self._hasQuit = Quit()

        def work():
            # Call C{get} repeatedly, running each task it returns, until the
            # C{_stop} sentinel comes out of the queue.  An exception raised
            # by a task is not caught here, so it ends this loop.
            for task in iter(self._q.get, _stop):
                task()

        self._thread = createThread(work)
        self._thread.start()

    def do(self, task):
        """
        Perform the given task on a thread.

        @param task: the function to call on a thread.
        @type task: 0-argument callable
        """
        # C{check} is expected to raise if L{quit} has already been called
        # (see L{Quit}), so nothing is enqueued after the worker has quit.
        self._hasQuit.check()
        self._q.put(task)

    def quit(self):
        """
        Reject all future work and stop the thread started by C{__init__}.

        After enqueueing the stop sentinel this waits for the thread to
        finish by calling its C{join} method.
        """
        # Reject all future work.  Set this _before_ enqueueing _stop, so
        # that no work is ever enqueued _after_ _stop.
        self._hasQuit.set()
        self._q.put(_stop)
        try:
            self._thread.join()
        except RuntimeError:
            # Deliberately best-effort.  For a L{threading.Thread}, C{join}
            # raises RuntimeError when called from the thread being joined
            # (for example, when a task running on this worker calls
            # C{quit}).  The sentinel is already on the queue, so there is
            # nothing further to do here.
            pass