forked from jupyterhub/oauthenticator
/
mocks.py
270 lines (231 loc) · 8.53 KB
/
mocks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
"""Mocking utilities for testing"""
import json
import os
import re
import uuid
from io import BytesIO
from unittest.mock import Mock
from urllib.parse import parse_qs, urlparse
import pytest
from tornado import web
from tornado.httpclient import HTTPResponse
from tornado.httputil import HTTPServerRequest
from tornado.log import app_log
from tornado.simple_httpclient import SimpleAsyncHTTPClient
# Type object of compiled regular-expression patterns (what re.compile returns).
RegExpType = type(re.compile('.'))
class MockAsyncHTTPClient(SimpleAsyncHTTPClient):
    """An AsyncHTTPClient stand-in that answers requests for registered hosts.

    Use .add_host to register the hosts whose requests should be mocked;
    requests to any other host are logged and passed on to the parent
    SimpleAsyncHTTPClient implementation.
    """

    def initialize(self, *args, **kwargs):
        super().initialize(*args, **kwargs)
        # host name -> list of (path spec, handler) pairs, filled by add_host
        self.hosts = {}

    def add_host(self, host, paths):
        """Register mock handlers for every request made to one host.

        Args:
            host (str): the host to mock (e.g. 'api.github.com')
            paths (list[(str|regex, callable)]): pairs of a path (or a
                compiled regexp matched against the path) and the callable
                that answers requests for it. The first matching pair wins.

        Each handler is called with the request as its only argument
        and may return:

        - None (turned into a 404 response)
        - int (empty response with this status code)
        - str, bytes for raw response content (status=200)
        - list, dict for JSON response (status=200)
        - HTTPResponse (passed unmodified)

        Example::

            client.add_host('api.github.com', [
                ('/user', lambda request: {'login': 'name'})
            ])
        """
        self.hosts[host] = paths

    def fetch_impl(self, request, response_callback):
        parsed = urlparse(request.url)
        hostname = parsed.hostname
        if hostname not in self.hosts:
            # not a mocked host: let the real client handle it
            app_log.warning(f"Not mocking request to {request.url}")
            return super().fetch_impl(request, response_callback)

        # find the first registered path spec that matches the request path
        reply = None
        for spec, handler in self.hosts[hostname]:
            if isinstance(spec, str):
                matched = spec == parsed.path
            else:
                matched = spec.match(parsed.path)
            if matched:
                reply = handler(request)
                break

        # normalize whatever the handler returned into an HTTPResponse
        if reply is None:
            reply = HTTPResponse(request=request, code=404, reason=request.url)
        elif isinstance(reply, int):
            reply = HTTPResponse(request=request, code=reply)
        else:
            payload = None
            extra = {}
            if isinstance(reply, bytes):
                payload = reply
            elif isinstance(reply, str):
                payload = reply.encode('utf8')
            elif isinstance(reply, (dict, list)):
                payload = json.dumps(reply).encode('utf8')
                extra['headers'] = {'Content-Type': 'application/json'}
            # any other type (e.g. an HTTPResponse) is delivered as-is
            if payload is not None:
                reply = HTTPResponse(
                    request=request,
                    code=200,
                    buffer=BytesIO(payload),
                    **extra,
                )
        response_callback(reply)
def setup_oauth_mock(
    client,
    host,
    access_token_path,
    user_path=None,
    token_type='Bearer',
    token_request_style='post',
):
    """Set up the mock client for OAuth.

    Generates and registers the handlers common to OAuthenticators:

    - create the access token (POST access_token_path)
    - get the user info (GET user_path), only when user_path is given

    and adds a method for creating a new mock handler to pass to .authenticate():

        client.handler_for_user(user)

    where user is the user-model to be returned by the user request.

    Two dicts are also attached to the client: ``client.oauth_codes``
    (pending code -> user model) and ``client.access_tokens``
    (issued token -> user model).

    Args:
        client: the mock HTTP client; must provide ``add_host(host, paths)``.
        host (str or iterable of str): the host(s) to mock (e.g. api.github.com)
        access_token_path (str): The path for the access token request (e.g. /access_token)
        user_path (str): The path for requesting the user model (e.g. /user).
            May only be omitted when token_request_style is 'jwt'.
        token_type (str): the token_type field for the provider
        token_request_style (str): 'json' reads the code from a JSON request
            body; any other value reads it from the URL query or, when the
            URL has no query, from the form-encoded body. 'jwt' additionally
            copies ``user['id_token']`` into the token reply.

    Raises:
        TypeError: if user_path is None and token_request_style is not 'jwt'.
    """
    if user_path is None and token_request_style != "jwt":
        raise TypeError("user_path is required unless token_request_style is jwt")

    client.oauth_codes = oauth_codes = {}
    client.access_tokens = access_tokens = {}

    def access_token(request):
        """Handler for access token endpoint

        Checks code and allocates a new token.
        Replies with JSON model for the token.
        """
        assert request.method == 'POST', request.method
        if token_request_style == 'json':
            body = request.body.decode('utf8')
            try:
                body = json.loads(body)
            except ValueError:
                return HTTPResponse(
                    request=request,
                    code=400,
                    reason="Body not JSON: %r" % body,
                )
            # mirror the form-encoded branch: a missing code is a 400,
            # not an uncaught KeyError/TypeError inside the mock
            if not isinstance(body, dict) or 'code' not in body:
                return HTTPResponse(
                    request=request,
                    code=400,
                    reason=f"No code in access token request: url={request.url}, body={request.body}",
                )
            code = body['code']
        else:
            query = urlparse(request.url).query
            if not query:
                query = request.body.decode('utf8')
            query = parse_qs(query)
            if 'code' not in query:
                return HTTPResponse(
                    request=request,
                    code=400,
                    reason=f"No code in access token request: url={request.url}, body={request.body}",
                )
            code = query['code'][0]
        if code not in oauth_codes:
            return HTTPResponse(
                request=request, code=403, reason=f"No such code: {code}"
            )

        # consume code, allocate token
        token = uuid.uuid4().hex
        user = oauth_codes.pop(code)
        access_tokens[token] = user
        model = {
            'access_token': token,
            'token_type': token_type,
        }
        if token_request_style == 'jwt':
            model['id_token'] = user['id_token']
        return model

    def get_user(request):
        """Handler for the user endpoint

        Looks up the access token (Authorization header, or access_token
        query parameter) and replies with the user model it was issued for.
        """
        assert request.method == 'GET', request.method
        auth_header = request.headers.get('Authorization')
        if auth_header:
            # expected form: "<scheme> <token>"
            parts = auth_header.split(None, 1)
            if len(parts) != 2:
                return HTTPResponse(
                    request=request,
                    code=403,
                    reason='Malformed Authorization header',
                )
            token = parts[1]
        else:
            query = parse_qs(urlparse(request.url).query)
            if 'access_token' not in query:
                return HTTPResponse(
                    request=request,
                    code=403,
                    reason='Missing Authorization header',
                )
            token = query['access_token'][0]
        if token not in access_tokens:
            return HTTPResponse(
                request=request,
                code=403,
                reason='No such access token: %r' % token,
            )
        return access_tokens[token]

    hosts = [host] if isinstance(host, str) else host
    paths = [(access_token_path, access_token)]
    # Only register the user endpoint when there is one: a None path spec
    # would be treated as a regex by the client and fail on .match().
    if user_path is not None:
        paths.append((user_path, get_user))
    for mocked_host in hosts:
        # each host gets its own list, as before
        client.add_host(mocked_host, list(paths))

    def handler_for_user(user):
        """Return a new mock RequestHandler

        user should be the JSONable model that will ultimately be returned
        from the get_user request.
        """
        code = uuid.uuid4().hex
        oauth_codes[code] = user
        handler = Mock(spec=web.RequestHandler)
        handler.find_user = Mock(return_value=None)
        handler.get_argument = Mock(return_value=code)
        handler.request = HTTPServerRequest(
            method="GET", uri=f"https://hub.example.com?code={code}"
        )
        handler.hub = Mock(server=Mock(base_url='/hub/'), base_url='/hub/')
        return handler

    client.handler_for_user = handler_for_user
def mock_handler(Handler, uri='https://hub.example.com', method='GET', **settings):
    """Build a Handler instance wired to a throwaway mock application.

    The application is created with a mock hub (base_url '/hub/'), a random
    cookie secret, a mock db whose rollback() returns None, plus any extra
    **settings. The request is an HTTPServerRequest for the given method
    and uri with a mock connection.
    """
    hub = Mock(
        base_url='/hub/',
        server=Mock(base_url='/hub/'),
    )
    db = Mock(rollback=Mock(return_value=None))
    app = web.Application(
        hub=hub,
        cookie_secret=os.urandom(32),
        db=db,
        **settings,
    )
    req = HTTPServerRequest(method=method, uri=uri, connection=Mock())
    instance = Handler(application=app, request=req)
    # start with no output transforms on the handler
    instance._transforms = []
    return instance
async def no_code_test(authenticator):
    """Run a test to exercise no code in the request

    Builds a mock RequestHandler whose get_argument always returns None,
    awaits ``authenticator.authenticate(handler)`` and asserts that it
    raises tornado's web.HTTPError with status code 400.

    Args:
        authenticator: object whose ``authenticate(handler)`` coroutine is
            exercised.
    """
    handler = Mock(spec=web.RequestHandler)
    handler.get_argument = Mock(return_value=None)
    with pytest.raises(web.HTTPError) as exc:
        # the return value is irrelevant: the call is expected to raise
        await authenticator.authenticate(handler)
    # checked after the with-block, once pytest.raises has captured the error
    assert exc.value.status_code == 400