Skip to content

Commit ceb227f

Browse files
author
Joel Collins
committed
Unit tests for core locks
1 parent 118029d commit ceb227f

File tree

1 file changed

+103
-0
lines changed

1 file changed

+103
-0
lines changed

tests/test_core_lock.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
from labthings.core import lock
2+
import pytest
3+
from gevent.hub import getcurrent
4+
5+
# Fixtures
6+
7+
8+
@pytest.fixture
9+
def rlock():
10+
return lock.RLock()
11+
12+
13+
@pytest.fixture
14+
def strictlock():
15+
return lock.StrictLock()
16+
17+
18+
@pytest.fixture
19+
def compositelock():
20+
return lock.CompositeLock([lock.StrictLock(), lock.StrictLock()])
21+
22+
23+
@pytest.fixture(
24+
params=[
25+
lock.RLock(),
26+
lock.StrictLock(),
27+
lock.CompositeLock([lock.StrictLock(), lock.StrictLock()]),
28+
],
29+
ids=["RLock", "StrictLock", "CompositeLock"],
30+
)
31+
def this_lock(request):
32+
return request.param
33+
34+
35+
# RLock
36+
37+
38+
def test_rlock_acquire(this_lock):
39+
# Assert no owner
40+
assert not this_lock.locked()
41+
42+
# Acquire lock
43+
assert this_lock.acquire()
44+
# Assert owner
45+
assert this_lock._is_owned()
46+
47+
# Release lock
48+
this_lock.release()
49+
50+
# Release lock, assert not held
51+
assert not this_lock.locked()
52+
53+
54+
def test_rlock_entry(this_lock):
55+
# Acquire lock
56+
with this_lock:
57+
# Assert owner
58+
assert this_lock._is_owned()
59+
60+
# Release lock, assert no owner
61+
assert not this_lock.locked()
62+
63+
64+
def test_rlock_reentry(this_lock):
65+
# Acquire lock
66+
with this_lock:
67+
# Assert owner
68+
assert this_lock._is_owned()
69+
# Assert acquirable
70+
with this_lock as acquired_return:
71+
assert acquired_return
72+
# Assert still owned
73+
assert this_lock._is_owned()
74+
75+
# Release lock, assert no owner
76+
assert not this_lock.locked()
77+
78+
79+
def test_rlock_block(this_lock):
80+
# Acquire lock
81+
assert this_lock.acquire()
82+
83+
# Override owner to force acquisition failure
84+
this_lock._owner = None
85+
print(this_lock._owner)
86+
# Assert not owner
87+
assert not this_lock._is_owned()
88+
89+
# Assert acquisition fails
90+
assert not this_lock.acquire(blocking=True, timeout=0.01)
91+
92+
# Ensure an unheld lock cannot be released
93+
with pytest.raises(RuntimeError):
94+
this_lock.release()
95+
96+
# Force ownership
97+
this_lock._owner = getcurrent()
98+
99+
# Release lock
100+
this_lock.release()
101+
102+
# Release lock, assert no owner
103+
assert not this_lock._is_owned()

0 commit comments

Comments
 (0)