/
test_thread_util.py
212 lines (166 loc) · 6.78 KB
/
test_thread_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# Copyright 2012 10gen, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test the thread_util module."""
import gc
import sys
import threading
import time
import unittest
from functools import partial
sys.path[0:0] = [""]
from nose.plugins.skip import SkipTest
from pymongo import thread_util
if thread_util.have_greenlet:
import greenlet
from test.utils import looplet, RendezvousThread
class TestIdent(unittest.TestCase):
    """Ensure thread_util.Ident works for threads and greenlets.

    This has gotten intricate from refactoring: we have classes, Watched and
    Unwatched, that implement the logic for the two child threads /
    greenlets. For the greenlet case it's easy to ensure the two children are
    alive at once, so we run the Watched and Unwatched logic directly. For
    the thread case we mix in the RendezvousThread class so we're sure both
    children are alive when they call Ident.get().

    1. Store main thread's / greenlet's id
    2. Start 2 child threads / greenlets
    3. Store their values for Ident.get()
    4. Children reach rendezvous point
    5. Children call Ident.watch()
    6. One of the children calls Ident.unwatch()
    7. Children terminate
    8. Assert that children got different ids from each other and from main,
       and assert watched child's callback was executed, and that unwatched
       child's callback was not
    """

    def _test_ident(self, use_greenlets):
        """Run the scenario described in the class docstring.

        :Parameters:
          - `use_greenlets`: forwarded to thread_util.create_ident(). When
            true the two children are greenlets driven by looplet();
            otherwise they are RendezvousThread subclasses.
        """
        ident = thread_util.create_ident(use_greenlets)

        # Every distinct id returned by ident.get(), seeded with main's.
        ids = set([ident.get()])
        # One-element list: the id of the child that calls unwatch().
        unwatched_id = []
        done = set([ident.get()])  # Start with main thread's / greenlet's id
        # Ids whose watch() callback has fired.
        died = set()

        class Watched(object):
            """Child logic: record own id, then register a death callback."""

            def __init__(self, ident):
                # Deliberately NOT stored as `self._ident`. In the thread
                # case this class is mixed into RendezvousThread and this
                # __init__ runs first; CPython 3's threading.Thread keeps
                # its own private `_ident` attribute (reset in
                # Thread.__init__, reassigned when the thread starts), which
                # would overwrite the Ident object stored here.
                # NOTE(review): assumes RendezvousThread derives from
                # threading.Thread - confirm in test.utils.
                self._my_ident = ident

            def before_rendezvous(self):
                self.my_id = self._my_ident.get()
                ids.add(self.my_id)

            def after_rendezvous(self):
                assert not self._my_ident.watching()
                # The callback adds this child's id to `died`.
                self._my_ident.watch(lambda ref: died.add(self.my_id))
                assert self._my_ident.watching()
                done.add(self.my_id)

        class Unwatched(Watched):
            """Child logic: same as Watched, then cancel the callback."""

            def before_rendezvous(self):
                Watched.before_rendezvous(self)
                unwatched_id.append(self.my_id)

            def after_rendezvous(self):
                Watched.after_rendezvous(self)
                self._my_ident.unwatch()
                assert not self._my_ident.watching()

        if use_greenlets:
            class WatchedGreenlet(Watched):
                def run(self):
                    self.before_rendezvous()
                    self.after_rendezvous()

            class UnwatchedGreenlet(Unwatched):
                def run(self):
                    self.before_rendezvous()
                    self.after_rendezvous()

            t_watched = greenlet.greenlet(WatchedGreenlet(ident).run)
            t_unwatched = greenlet.greenlet(UnwatchedGreenlet(ident).run)
            looplet([t_watched, t_unwatched])
        else:
            class WatchedThread(Watched, RendezvousThread):
                def __init__(self, ident, state):
                    Watched.__init__(self, ident)
                    RendezvousThread.__init__(self, state)

            class UnwatchedThread(Unwatched, RendezvousThread):
                def __init__(self, ident, state):
                    Unwatched.__init__(self, ident)
                    RendezvousThread.__init__(self, state)

            # Shared state sized for the two children started below.
            state = RendezvousThread.create_shared_state(2)
            t_watched = WatchedThread(ident, state)
            t_watched.start()

            t_unwatched = UnwatchedThread(ident, state)
            t_unwatched.start()

            RendezvousThread.wait_for_rendezvous(state)
            RendezvousThread.resume_after_rendezvous(state)

            t_watched.join()
            t_unwatched.join()

            self.assertTrue(t_watched.passed)
            self.assertTrue(t_unwatched.passed)

        # Remove references, let weakref callbacks run
        del t_watched
        del t_unwatched

        # Accessing the thread-local triggers cleanup in Python <= 2.6
        # http://bugs.python.org/issue1868
        ident.get()
        self.assertEqual(3, len(ids))
        self.assertEqual(3, len(done))

        # Make sure thread is really gone: poll for up to 10 seconds, forcing
        # a garbage collection after each one-second sleep, until a watch
        # callback has fired.
        slept = 0
        while not died and slept < 10:
            time.sleep(1)
            gc.collect()
            slept += 1

        # Exactly one callback ran, and it was not the unwatched child's.
        self.assertEqual(1, len(died))
        self.assertFalse(unwatched_id[0] in died)

    def test_thread_ident(self):
        """Run the Ident scenario with threads."""
        self._test_ident(False)

    def test_greenlet_ident(self):
        """Run the Ident scenario with greenlets, skipping if unavailable."""
        if not thread_util.have_greenlet:
            raise SkipTest('greenlet not installed')

        self._test_ident(True)
class TestCounter(unittest.TestCase):
    """Exercise thread_util.Counter from ten concurrent threads or greenlets."""

    def _test_counter(self, use_greenlets):
        """Check inc() / dec() / get() in the caller and in ten workers.

        :Parameters:
          - `use_greenlets`: forwarded to thread_util.Counter(). When true
            the workers are greenlets driven by looplet(); otherwise they
            are threading.Thread instances.
        """
        counter = thread_util.Counter(use_greenlets)
        finished = set()

        def expect_pinned_at_zero():
            # Decrementing a counter that reads zero must leave it at zero;
            # check that twice in a row.
            for _ in (1, 2):
                self.assertEqual(0, counter.dec())
                self.assertEqual(0, counter.get())

        expect_pinned_at_zero()

        def climb_and_descend(n):
            # Count up from 0 to n, checking the value before and after each
            # increment as seen by this worker...
            for level in xrange(n):
                self.assertEqual(level, counter.get())
                self.assertEqual(level + 1, counter.inc())

            # ...then back down from n to 0.
            for level in xrange(n, 0, -1):
                self.assertEqual(level, counter.get())
                self.assertEqual(level - 1, counter.dec())

            self.assertEqual(0, counter.get())

            # Extra decrements have no effect
            expect_pinned_at_zero()

            # Reached only if every assertion above passed in this worker.
            finished.add(n)

        jobs = [partial(climb_and_descend, n) for n in xrange(10)]

        if not use_greenlets:
            workers = []
            for job in jobs:
                workers.append(threading.Thread(target=job))

            for worker in workers:
                worker.start()

            for worker in workers:
                worker.join()
        else:
            looplet([greenlet.greenlet(job) for job in jobs])

        # All ten workers must have run to completion.
        self.assertEqual(10, len(finished))

    def test_thread_counter(self):
        """Run the Counter checks with threads."""
        self._test_counter(False)

    def test_greenlet_counter(self):
        """Run the Counter checks with greenlets, skipping if unavailable."""
        if not thread_util.have_greenlet:
            raise SkipTest('greenlet not installed')

        self._test_counter(True)
# Run this module's tests with the unittest runner when executed as a script.
if __name__ == "__main__":
    unittest.main()