forked from Pylons/pyramid_ldap
-
Notifications
You must be signed in to change notification settings - Fork 1
/
tests.py
299 lines (249 loc) · 10.5 KB
/
tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
import contextlib
import unittest
import sys
from pyramid.compat import (
text_type,
text_,
)
from pyramid import testing
from pyramid.exceptions import ConfigurationError
class Test_includeme(unittest.TestCase):
    """Checks ``pyramid_ldap.includeme`` against a recording config stub."""

    def _callFUT(self, config):
        """Run ``includeme`` on ``config``; nothing is returned."""
        # Imported inside the helper, so the import happens at test run time.
        from pyramid_ldap import includeme
        includeme(config)

    def test_it(self):
        # The stub records directive names in the order they are added.
        expected_directives = [
            'ldap_setup',
            'ldap_set_login_query',
            'ldap_set_groups_query',
        ]
        stub_config = DummyConfig()
        self._callFUT(stub_config)
        self.assertEqual(stub_config.directives, expected_directives)
class Test__ldap_decode(unittest.TestCase):
    """Checks ``pyramid_ldap._ldap_decode`` across several input types."""

    def _callFUT(self, val):
        """Pass ``val`` to ``_ldap_decode`` and hand back what it returns."""
        from pyramid_ldap import _ldap_decode
        decoded = _ldap_decode(val)
        return decoded

    def test_decode_str(self):
        # The result type must be exactly ``text_type``, not a subclass.
        decoded = self._callFUT('abc')
        self.assertEqual(type(decoded), text_type)
        self.assertEqual(decoded, text_('abc'))

    def test_decode_list(self):
        # The list container is kept; items are compared to their text form.
        decoded = self._callFUT(['abc', 'def'])
        self.assertEqual(type(decoded), list)
        for index, raw in enumerate(('abc', 'def')):
            self.assertEqual(decoded[index], text_(raw))

    def test_decode_tuple(self):
        # Same expectations as the list case, with a tuple container.
        decoded = self._callFUT(('abc', 'def'))
        self.assertEqual(type(decoded), tuple)
        for index, raw in enumerate(('abc', 'def')):
            self.assertEqual(decoded[index], text_(raw))

    def test_decode_dict(self):
        import ldap
        # A dict input is expected back as an ``ldap.cidict.cidict``.
        decoded = self._callFUT({'abc': 'def'})
        self.assertTrue(isinstance(decoded, ldap.cidict.cidict))
        self.assertEqual(decoded[text_('abc')], text_('def'))

    def test_decode_nested(self):
        import ldap
        # A list value nested inside the dict is compared item by item.
        expected_values = [text_('def'), text_('jkl')]
        decoded = self._callFUT({'abc': ['def', 'jkl']})
        self.assertTrue(isinstance(decoded, ldap.cidict.cidict))
        self.assertEqual(decoded[text_('abc')], expected_values)

    def test_undecodeable(self):
        # For this byte string the result is expected to still be ``bytes``.
        raw_uid = b'\xdd\xafw:PuUO\x8a#\x17\xaa\xc2\xc7\x8e\xf6'
        decoded = self._callFUT(raw_uid)
        self.assertTrue(isinstance(decoded, bytes))
class Test_groupfinder(unittest.TestCase):
    """Checks ``pyramid_ldap.groupfinder`` using a canned LDAP connector."""

    def _callFUT(self, dn, request):
        """Run ``groupfinder`` for ``dn`` against ``request``."""
        from pyramid_ldap import groupfinder
        return groupfinder(dn, request)

    def _makeRequest(self, group_list):
        """Dummy request whose connector reports ``group_list`` for any dn."""
        request = testing.DummyRequest()
        request.ldap_connector = DummyLDAPConnector('dn', group_list)
        return request

    def test_no_group_list(self):
        # The connector reports None, and None is expected back.
        request = self._makeRequest(None)
        self.assertEqual(self._callFUT('dn', request), None)

    def test_with_group_list(self):
        # Only the first element of each pair is expected in the result.
        request = self._makeRequest([('groupdn', None)])
        self.assertEqual(self._callFUT('dn', request), ['groupdn'])
class Test_get_ldap_connector(unittest.TestCase):
    """Checks ``pyramid_ldap.get_ldap_connector`` on dummy requests."""

    def _callFUT(self, request):
        """Return whatever ``get_ldap_connector`` gives for ``request``."""
        from pyramid_ldap import get_ldap_connector
        return get_ldap_connector(request)

    def test_no_connector(self):
        # A request with no ``ldap_connector`` set must be rejected.
        bare_request = testing.DummyRequest()
        self.assertRaises(ConfigurationError, self._callFUT, bare_request)

    def test_with_connector(self):
        # ``True`` stands in for a connector; it should come back unchanged.
        request = testing.DummyRequest()
        request.ldap_connector = True
        self.assertEqual(self._callFUT(request), True)
class Test_ldap_setup(unittest.TestCase):
    """Checks the ``pyramid_ldap.ldap_setup`` directive with default options."""

    def _callFUT(self, config, uri, **kw):
        """Invoke ``ldap_setup`` with ``uri`` plus any keyword overrides."""
        from pyramid_ldap import ldap_setup
        return ldap_setup(config, uri, **kw)

    def test_it_defaults(self):
        from pyramid_ldap import Connector
        stub_config = DummyConfig()
        self._callFUT(stub_config, 'ldap://')

        # The stub config records the request property that was registered.
        self.assertEqual(stub_config.prop_name, 'ldap_connector')
        self.assertEqual(stub_config.prop_reify, True)

        # Calling the recorded property with a request should build a
        # Connector instance.
        request = testing.DummyRequest()
        connector = stub_config.prop(request)
        self.assertEqual(connector.__class__, Connector)
class Test_ldap_set_groups_query(unittest.TestCase):
    """Checks the ``ldap_set_groups_query`` directive with default options."""

    def _callFUT(self, config, base_dn, filter_tmpl, **kw):
        """Invoke ``ldap_set_groups_query`` with the given arguments."""
        from pyramid_ldap import ldap_set_groups_query
        return ldap_set_groups_query(config, base_dn, filter_tmpl, **kw)

    def test_it_defaults(self):
        import ldap
        stub_config = DummyConfig()
        self._callFUT(stub_config, 'dn', 'tmpl')

        # Positional values are stored as given; the scope is expected to
        # be SUBTREE and the cache period 0 when neither is passed.
        registry = stub_config.registry
        self.assertEqual(registry.ldap_groups_query.base_dn, 'dn')
        self.assertEqual(registry.ldap_groups_query.filter_tmpl, 'tmpl')
        self.assertEqual(
            registry.ldap_groups_query.scope, ldap.SCOPE_SUBTREE)
        self.assertEqual(registry.ldap_groups_query.cache_period, 0)
class Test_ldap_set_login_query(unittest.TestCase):
    """Checks the ``ldap_set_login_query`` directive with default options."""

    def _callFUT(self, config, base_dn, filter_tmpl, **kw):
        """Invoke ``ldap_set_login_query`` with the given arguments."""
        from pyramid_ldap import ldap_set_login_query
        return ldap_set_login_query(config, base_dn, filter_tmpl, **kw)

    def test_it_defaults(self):
        import ldap
        stub_config = DummyConfig()
        self._callFUT(stub_config, 'dn', 'tmpl')

        # Positional values are stored as given; the scope is expected to
        # be ONELEVEL and the cache period 0 when neither is passed.
        registry = stub_config.registry
        self.assertEqual(registry.ldap_login_query.base_dn, 'dn')
        self.assertEqual(registry.ldap_login_query.filter_tmpl, 'tmpl')
        self.assertEqual(
            registry.ldap_login_query.scope, ldap.SCOPE_ONELEVEL)
        self.assertEqual(registry.ldap_login_query.cache_period, 0)
class TestConnector(unittest.TestCase):
    """Checks ``pyramid_ldap.Connector`` using stub registry and manager."""

    def _makeOne(self, registry, manager):
        """Construct a ``Connector`` from ``registry`` and ``manager``."""
        from pyramid_ldap import Connector
        return Connector(registry, manager)

    def _makeRegistry(self, attr_name, search):
        """Return a ``Dummy`` registry carrying ``search`` as ``attr_name``."""
        registry = Dummy()
        setattr(registry, attr_name, search)
        return registry

    def test_authenticate_no_ldap_login_query(self):
        # No registry at all: authenticate must raise ConfigurationError.
        connector = self._makeOne(None, DummyManager())
        self.assertRaises(
            ConfigurationError, connector.authenticate, None, None)

    def test_authenticate_search_returns_non_one_result(self):
        # The login search yields an empty result list.
        registry = self._makeRegistry('ldap_login_query', DummySearch([]))
        connector = self._makeOne(registry, DummyManager())
        self.assertEqual(connector.authenticate(None, None), None)

    def test_authenticate_search_returns_one_result(self):
        # The single search result is expected back as-is.
        registry = self._makeRegistry(
            'ldap_login_query', DummySearch([('a', 'b')]))
        connector = self._makeOne(registry, DummyManager())
        self.assertEqual(connector.authenticate(None, None), ('a', 'b'))

    def test_authenticate_search_bind_raises(self):
        import ldap
        # The first connection use exits cleanly; the second raises
        # LDAPError on exit.
        manager = DummyManager([None, ldap.LDAPError])
        registry = self._makeRegistry(
            'ldap_login_query', DummySearch([('a', 'b')]))
        connector = self._makeOne(registry, manager)
        self.assertEqual(connector.authenticate(None, None), None)

    def test_user_groups_no_ldap_groups_query(self):
        # No registry at all: user_groups must raise ConfigurationError.
        connector = self._makeOne(None, DummyManager())
        self.assertRaises(ConfigurationError, connector.user_groups, None)

    def test_user_groups_search_returns_result(self):
        # The search result list is expected back unchanged.
        registry = self._makeRegistry(
            'ldap_groups_query', DummySearch([('a', 'b')]))
        connector = self._makeOne(registry, DummyManager())
        self.assertEqual(connector.user_groups(None), [('a', 'b')])

    def test_user_groups_execute_raises(self):
        import ldap
        # The search stub raises LDAPError from execute(); None is expected.
        failing_search = DummySearch([('a', 'b')], ldap.LDAPError)
        registry = self._makeRegistry('ldap_groups_query', failing_search)
        connector = self._makeOne(registry, DummyManager())
        self.assertEqual(connector.user_groups(None), None)
class Test_LDAPQuery(unittest.TestCase):
    """Checks ``pyramid_ldap._LDAPQuery`` caching and execution.

    ``sys.maxsize`` is used as the "very large timeslice" marker because
    ``sys.maxint`` does not exist on Python 3, whereas ``sys.maxsize`` is
    available on both Python 2.6+ and Python 3.
    """

    def _makeOne(self, base_dn, filter_tmpl, scope, cache_period):
        """Build an ``_LDAPQuery`` from the four positional settings."""
        from pyramid_ldap import _LDAPQuery
        return _LDAPQuery(base_dn, filter_tmpl, scope, cache_period)

    def test_query_cache_no_rollover(self):
        inst = self._makeOne(None, None, None, 1)
        # Presumably a huge last_timeslice keeps the query from deciding
        # that the cache period rolled over -- confirm against _LDAPQuery.
        inst.last_timeslice = sys.maxsize
        inst.cache['foo'] = 'bar'
        self.assertEqual(inst.query_cache('foo'), 'bar')

    def test_query_cache_with_rollover(self):
        inst = self._makeOne(None, None, None, 1)
        inst.cache['foo'] = 'bar'
        # last_timeslice is left untouched here; the lookup is expected to
        # miss, empty the cache and move last_timeslice off zero.
        self.assertEqual(inst.query_cache('foo'), None)
        self.assertEqual(inst.cache, {})
        self.assertNotEqual(inst.last_timeslice, 0)

    def test_execute_no_cache_period(self):
        # Both templates are '%(login)s'; with login='foo' the connection
        # is expected to be called with ('foo', None, 'foo').
        inst = self._makeOne('%(login)s', '%(login)s', None, 0)
        conn = DummyConnection('abc')
        result = inst.execute(conn, login='foo')
        self.assertEqual(result, 'abc')
        self.assertEqual(conn.arg, ('foo', None, 'foo'))

    def test_execute_with_cache_period_miss(self):
        # Caching is enabled but nothing is cached yet, so the connection
        # result is expected back.
        inst = self._makeOne('%(login)s', '%(login)s', None, 1)
        conn = DummyConnection('abc')
        result = inst.execute(conn, login='foo')
        self.assertEqual(result, 'abc')
        self.assertEqual(conn.arg, ('foo', None, 'foo'))

    def test_execute_with_cache_period_hit(self):
        inst = self._makeOne('%(login)s', '%(login)s', None, 1)
        # Same "very large timeslice" marker as in the no-rollover test.
        inst.last_timeslice = sys.maxsize
        # Pre-seed the cache under the key the test expects execute() to use.
        inst.cache[('foo', None, 'foo')] = 'def'
        conn = DummyConnection('abc')
        result = inst.execute(conn, login='foo')
        # The cached value is expected, not the connection's 'abc'.
        self.assertEqual(result, 'def')
class DummyLDAPConnector(object):
    """Connector stub that answers every group lookup with a fixed value."""

    def __init__(self, dn, group_list):
        """Store ``dn`` and the canned ``group_list`` on the instance."""
        self.group_list = group_list
        self.dn = dn

    def user_groups(self, dn):
        """Return the canned group list; the ``dn`` argument is not used."""
        canned = self.group_list
        return canned
class Dummy(object):
    """Blank object that tests can hang arbitrary attributes on."""

    def __init__(self, *arg, **kw):
        """Accept any positional or keyword arguments and discard them."""
        return None
class DummyConfig(object):
    """Configurator stub that records what it is asked to register."""

    # Class used wherever an ``introspectable`` factory is requested.
    introspectable = Dummy

    def __init__(self):
        """Start with no recorded directives and a blank registry."""
        self.directives = []
        self.registry = Dummy()

    def add_directive(self, name, fn):
        """Record the directive ``name``; ``fn`` is not kept."""
        self.directives.append(name)

    def set_request_property(self, prop, name, reify=False):
        """Remember the property callable, its name and the reify flag."""
        self.prop = prop
        self.prop_name = name
        self.prop_reify = reify

    def action(self, discriminator, callable, introspectables=()):
        """Run ``callable`` immediately when one is supplied."""
        if not callable:
            return
        callable()
class DummyManager(object):
    """Connection-manager stub whose ``connection()`` can raise on exit.

    ``with_errors`` is a sequence consumed one entry per ``connection()``
    use: ``None`` lets that use exit cleanly, any other entry is raised
    once the ``with`` body has finished.
    """

    def __init__(self, with_errors=()):
        # Copy into a private list: the queue is consumed with pop(0),
        # which tuples (including non-empty ones matching the default's
        # type) do not have, and popping the caller's own list would
        # silently mutate it.
        self.with_errors = list(with_errors or ())

    @contextlib.contextmanager
    def connection(self, username=None, password=None):
        """Yield this manager as the connection, then replay one error.

        ``username`` and ``password`` are accepted but not used.

        Raises whatever non-``None`` entry is next in ``with_errors``.
        """
        yield self
        # Only reached when the with-body itself finished without raising.
        if self.with_errors:
            error = self.with_errors.pop(0)
            if error is not None:
                raise error
class DummySearch(object):
    """Query stub: ``execute`` returns a canned result or raises."""

    def __init__(self, result, exc=None, search_after_bind=False):
        """Store the canned result, optional exception and bind flag."""
        self.search_after_bind = search_after_bind
        self.exc = exc
        self.result = result

    def execute(self, conn, **kw):
        """Raise ``exc`` if one was given; else record ``kw`` and return.

        ``conn`` is not used. The keyword arguments are saved on ``self.kw``
        only when no exception is raised.
        """
        failure = self.exc
        if failure is not None:
            raise failure
        self.kw = kw
        return self.result
class DummyConnection(object):
    """LDAP connection stub that records search arguments."""

    def __init__(self, result):
        """Keep ``result`` as the value every search hands back."""
        self.result = result

    def search_s(self, *arg):
        """Record the positional arguments and return the canned result."""
        self.arg = arg
        canned = self.result
        return canned

    def search_ext_s(self, *arg, **kw):
        """Like ``search_s``, but honours an optional ``sizelimit`` keyword.

        Raises ``ldap.SIZELIMIT_EXCEEDED`` when a truthy ``sizelimit`` is
        smaller than the length of the canned result.
        """
        import ldap
        limit = kw.get('sizelimit', 0)
        self.arg = arg
        # A falsy limit means "no limit"; len() is then never evaluated.
        if not limit or len(self.result) <= limit:
            return self.result
        raise ldap.SIZELIMIT_EXCEEDED