/
test_challenges.py
163 lines (146 loc) · 6.2 KB
/
test_challenges.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
"""
Tests for `txacme.challenges`.
"""
from acme import challenges
from acme.jose import b64encode
from hypothesis import strategies as s
from hypothesis import example, given
from testtools import TestCase
from testtools.matchers import Contains, Equals, Is, MatchesPredicate, Not
from zope.interface.verify import verifyObject
from txacme.challenges import _MergingMappingProxy, TLSSNI01Responder
from txacme.interfaces import IResponder
from txacme.test.test_client import RSA_KEY_512, RSA_KEY_512_RAW
class ResponderTests(TestCase):
    """
    `.TLSSNI01Responder` is a responder for tls-sni-01 challenges that works
    with txsni.
    """
    def test_interface(self):
        """
        The `.IResponder` interface is correctly implemented.
        """
        verifyObject(IResponder, TLSSNI01Responder())

    @example(token=b'BWYcfxzmOha7-7LoxziqPZIUr99BCz3BfbN9kzSFnrU')
    @given(token=s.binary(min_size=32, max_size=32).map(b64encode))
    def test_stop_responding_already_stopped(self, token):
        """
        Calling ``stop_responding`` when we are not responding for a server
        name does nothing.
        """
        challenge = challenges.TLSSNI01(token=token)
        response = challenge.response(RSA_KEY_512)
        responder = TLSSNI01Responder()
        # Nothing was started, so this must simply not raise.
        responder.stop_responding(response)

    @example(token=b'BWYcfxzmOha7-7LoxziqPZIUr99BCz3BfbN9kzSFnrU')
    @given(token=s.binary(min_size=32, max_size=32).map(b64encode))
    def test_start_responding(self, token):
        """
        Calling ``start_responding`` makes an appropriate entry appear in the
        host map, and ``stop_responding`` removes it again.
        """
        ckey = RSA_KEY_512_RAW
        challenge = challenges.TLSSNI01(token=token)
        response = challenge.response(RSA_KEY_512)
        server_name = response.z_domain.decode('ascii')
        # Entries are looked up below by the UTF-8 encoded name, so the
        # containment checks must use the same bytes key.  On Python 3 a text
        # key never compares equal to a bytes key, which would make a check
        # against the text name pass regardless of the map's contents.
        host_key = server_name.encode('utf-8')
        host_map = {}
        responder = TLSSNI01Responder()
        # Pin the generated key so the test does not depend on key generation.
        responder._generate_private_key = lambda key_type: ckey
        wrapped_host_map = responder.wrap_host_map(host_map)

        self.assertThat(wrapped_host_map, Not(Contains(host_key)))
        responder.start_responding(response)
        self.assertThat(
            wrapped_host_map.get(host_key).certificate,
            MatchesPredicate(response.verify_cert, '%r does not verify'))

        # Starting twice before stopping doesn't break things
        responder.start_responding(response)
        self.assertThat(
            wrapped_host_map.get(host_key).certificate,
            MatchesPredicate(response.verify_cert, '%r does not verify'))

        responder.stop_responding(response)
        self.assertThat(wrapped_host_map, Not(Contains(host_key)))
class MergingProxyTests(TestCase):
    """
    `._MergingMappingProxy` merges two mappings together.
    """
    @example(underlay={}, overlay={}, key=u'foo')
    @given(underlay=s.dictionaries(s.text(), s.builds(object)),
           overlay=s.dictionaries(s.text(), s.builds(object)),
           key=s.text())
    def test_get_overlay(self, underlay, overlay, key):
        """
        Getting a key that only exists in the overlay returns the value from
        the overlay.
        """
        # Force the precondition: key present in the overlay only.
        underlay.pop(key, None)
        overlay[key] = object()
        proxy = _MergingMappingProxy(
            overlay=overlay, underlay=underlay)
        self.assertThat(proxy[key], Is(overlay[key]))

    @example(underlay={}, overlay={}, key=u'foo')
    @given(underlay=s.dictionaries(s.text(), s.builds(object)),
           overlay=s.dictionaries(s.text(), s.builds(object)),
           key=s.text())
    def test_get_underlay(self, underlay, overlay, key):
        """
        Getting a key that only exists in the underlay returns the value from
        the underlay.
        """
        # Force the precondition: key present in the underlay only.
        underlay[key] = object()
        overlay.pop(key, None)
        proxy = _MergingMappingProxy(
            overlay=overlay, underlay=underlay)
        self.assertThat(proxy[key], Is(underlay[key]))

    @example(underlay={}, overlay={}, key=u'foo')
    @given(underlay=s.dictionaries(s.text(), s.builds(object)),
           overlay=s.dictionaries(s.text(), s.builds(object)),
           key=s.text())
    def test_get_both(self, underlay, overlay, key):
        """
        Getting a key that exists in both the underlay and the overlay returns
        the value from the overlay.
        """
        # Two distinct objects, so identity tells us which mapping won.
        underlay[key] = object()
        overlay[key] = object()
        proxy = _MergingMappingProxy(
            overlay=overlay, underlay=underlay)
        self.assertThat(proxy[key], Not(Is(underlay[key])))
        self.assertThat(proxy[key], Is(overlay[key]))

    @example(underlay={u'foo': object(), u'bar': object()},
             overlay={u'bar': object(), u'baz': object()})
    @given(underlay=s.dictionaries(s.text(), s.builds(object)),
           overlay=s.dictionaries(s.text(), s.builds(object)))
    def test_len(self, underlay, overlay):
        """
        ``__len__`` of the proxy does not count duplicates.
        """
        proxy = _MergingMappingProxy(
            overlay=overlay, underlay=underlay)
        # The reported length must agree with the number of keys iterated.
        self.assertThat(len(proxy), Equals(len(list(proxy))))

    @example(underlay={u'foo': object(), u'bar': object()},
             overlay={u'bar': object(), u'baz': object()})
    @given(underlay=s.dictionaries(s.text(), s.builds(object)),
           overlay=s.dictionaries(s.text(), s.builds(object)))
    def test_iter(self, underlay, overlay):
        """
        ``__iter__`` of the proxy does not produce duplicate keys.
        """
        proxy = _MergingMappingProxy(
            overlay=overlay, underlay=underlay)
        # sorted() accepts any iterable, so no intermediate list is needed.
        keys = sorted(proxy)
        # De-duplicating must not change the sorted key sequence.
        self.assertThat(keys, Equals(sorted(set(keys))))

    @example(underlay={u'foo': object()}, overlay={}, key=u'foo')
    @example(underlay={}, overlay={}, key=u'bar')
    @given(underlay=s.dictionaries(s.text(), s.builds(object)),
           overlay=s.dictionaries(s.text(), s.builds(object)),
           key=s.text())
    def test_contains(self, underlay, overlay, key):
        """
        The mapping only contains a key if it can be gotten.
        """
        proxy = _MergingMappingProxy(
            overlay=overlay, underlay=underlay)
        # Values are always ``object()`` instances here, never ``None``, so a
        # ``None`` result from ``get`` means the key is absent.
        self.assertThat(
            key in proxy,
            Equals(proxy.get(key) is not None))
# Export every test case class defined in this module.
__all__ = ['ResponderTests', 'MergingProxyTests']