-
Notifications
You must be signed in to change notification settings - Fork 45
/
test_e2e.py
307 lines (282 loc) · 13.1 KB
/
test_e2e.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
from base64 import urlsafe_b64decode
import hmac
import json
import jwt
import random
import string
import time
import tokenlib
import unittest
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
from fxa.core import Client
from fxa.oauth import Client as OAuthClient
from fxa.tests.utils import TestEmailAccount
from hashlib import sha256
from tokenserver.test_support import TestCase
# Hosts of the FxA account server and the FxA OAuth server that the
# end-to-end tests create their account and tokens against.
FXA_ACCOUNT_STAGE_HOST = "https://api-accounts.stage.mozaws.net"
FXA_OAUTH_STAGE_HOST = "https://oauth.stage.mozaws.net"

# Audience passed when requesting a BrowserID identity assertion.
BROWSERID_AUDIENCE = "https://token.stage.mozaws.net"

# This is the client ID used for Firefox Desktop. The FxA team confirmed that
# this is the proper client ID to be using for these integration tests.
CLIENT_ID = "5882386c6d801776"

# OAuth scope requested when authorizing the token used by the tests.
SCOPE = "https://identity.mozilla.com/apps/oldsync"

# Value the tests expect in the `duration` field of a token response.
DEFAULT_TOKEN_DURATION = 3600

# Alphabet and length used to generate the throwaway account password.
PASSWORD_LENGTH = 32
PASSWORD_CHARACTERS = "".join(
    (string.ascii_letters, string.punctuation, string.digits)
)
class TestE2e(TestCase, unittest.TestCase):
    """End-to-end Tokenserver tests backed by a real FxA account.

    ``setUpClass`` creates one FxA account through ``FXA_ACCOUNT_STAGE_HOST``
    and obtains an OAuth token and a BrowserID assertion for it; every test
    in the class shares them. ``tearDownClass`` destroys the account again.
    """

    # Upper bound, in seconds, on how long setUpClass waits for the FxA
    # verification email. The value is a generous guess; tune as needed.
    VERIFY_EMAIL_TIMEOUT = 120
    # Pause, in seconds, between two polls of the test mailbox.
    VERIFY_EMAIL_POLL_INTERVAL = 0.5

    def setUp(self):
        super().setUp()

    def tearDown(self):
        super().tearDown()

    @classmethod
    def setUpClass(cls):
        """Create and verify the shared FxA account and its credentials.

        Raises:
            RuntimeError: if no email carrying a verification code arrives
                within ``VERIFY_EMAIL_TIMEOUT`` seconds.
        """
        # Create an ephemeral email account to use to create an FxA account
        cls.acct = TestEmailAccount()
        cls.client = Client(FXA_ACCOUNT_STAGE_HOST)
        cls.oauth_client = OAuthClient(CLIENT_ID, None,
                                       server_url=FXA_OAUTH_STAGE_HOST)
        cls.fxa_password = cls._generate_password()
        # Create an FxA account for these end-to-end tests
        cls.session = cls.client.create_account(cls.acct.email,
                                                password=cls.fxa_password)
        # Wait for the verification email from FxA and verify the code
        cls.session.verify_email_code(cls._wait_for_verify_code())
        # Create an OAuth token to be used for the end-to-end tests
        cls.oauth_token = cls.oauth_client.authorize_token(cls.session, SCOPE)
        cls.browserid_assertion = \
            cls.session.get_identity_assertion(BROWSERID_AUDIENCE)

    @classmethod
    def tearDownClass(cls):
        """Empty the test mailbox and destroy the shared FxA account."""
        try:
            cls.acct.clear()
        finally:
            # Destroy the account even if clearing the mailbox failed, so a
            # mailbox error does not leave the account behind.
            cls.client.destroy_account(cls.acct.email, cls.fxa_password)

    @classmethod
    def _wait_for_verify_code(cls):
        """Poll the test mailbox until a verification code shows up.

        Returns:
            The value of the ``x-verify-code`` header of the first message
            that carries one.

        Raises:
            RuntimeError: if no such message is seen within
                ``VERIFY_EMAIL_TIMEOUT`` seconds.
        """
        deadline = time.monotonic() + cls.VERIFY_EMAIL_TIMEOUT
        while True:
            for message in cls.acct.messages:
                if 'x-verify-code' in message['headers']:
                    return message['headers']['x-verify-code']
            # Keep polling while the mailbox only holds unrelated messages,
            # but never wait indefinitely.
            if time.monotonic() >= deadline:
                raise RuntimeError(
                    'No FxA verification email received within %s seconds'
                    % cls.VERIFY_EMAIL_TIMEOUT)
            time.sleep(cls.VERIFY_EMAIL_POLL_INTERVAL)
            cls.acct.fetch()

    @staticmethod
    def _generate_password():
        """Return a random password of ``PASSWORD_LENGTH`` characters."""
        # SystemRandom draws from the OS entropy source; the password
        # protects a real account, so avoid the default seeded generator.
        rng = random.SystemRandom()
        return ''.join(rng.choice(PASSWORD_CHARACTERS)
                       for _ in range(PASSWORD_LENGTH))

    def _get_oauth_token_with_bad_scope(self):
        """Return an OAuth token for the shared session with a bogus scope."""
        bad_scope = 'bad_scope'
        return self.oauth_client.authorize_token(self.session, bad_scope)

    def _get_browserid_assertion_with_bad_audience(self):
        """Return a BrowserID assertion issued for an unexpected audience."""
        bad_audience = 'badaudience.com'
        return self.session.get_identity_assertion(bad_audience)

    def _get_bad_token(self):
        """Return an RS256 JWT signed by a throwaway RSA key.

        The key is generated on the spot, so the signature cannot match any
        key the server under test already knows about.
        """
        key = rsa.generate_private_key(backend=default_backend(),
                                       public_exponent=65537,
                                       key_size=2048)
        key_format = serialization.PrivateFormat.TraditionalOpenSSL
        encryption = serialization.NoEncryption()
        pem = key.private_bytes(encoding=serialization.Encoding.PEM,
                                format=key_format,
                                encryption_algorithm=encryption)
        private_key = pem.decode('utf-8')
        claims = {
            'sub': 'fake sub',
            'iat': 12345,
            'exp': 12345,
        }
        return jwt.encode(claims, private_key, algorithm='RS256')

    def _extract_keys_changed_at_from_assertion(self, assertion):
        """Return the ``fxa-keysChangedAt`` claim found in *assertion*.

        The second-to-last ``~``-separated segment of the assertion is
        decoded as a JWT without verifying its signature.
        """
        token = assertion.split('~')[-2]
        claims = jwt.decode(token, options={"verify_signature": False})
        return claims['fxa-keysChangedAt']

    @classmethod
    def _change_password(cls):
        """Change the shared account's password to a newly generated one."""
        new_password = cls._generate_password()
        cls.session.change_password(cls.fxa_password, new_password)
        cls.fxa_password = new_password

    # Adapted from the original Tokenserver:
    # https://github.com/mozilla-services/tokenserver/blob/master/tokenserver/util.py#L24
    def _fxa_metrics_hash(self, value):
        """Return the hex HMAC-SHA256 of *value* keyed with the metrics secret.

        The key is ``self.FXA_METRICS_HASH_SECRET``; both key and value are
        encoded as UTF-8 before hashing.
        """
        key = self.FXA_METRICS_HASH_SECRET.encode('utf-8')
        return hmac.new(key, value.encode('utf-8'), sha256).hexdigest()

    @staticmethod
    def _error_body(description, status):
        """Build the JSON error body the tests expect from a 401 response."""
        return {
            'errors': [
                {
                    'description': description,
                    'location': 'body',
                    'name': ''
                }
            ],
            'status': status
        }

    def _assert_valid_token_response(self, res, expected_fxa_kid):
        """Check the parts of a token response common to all auth schemes.

        Args:
            res: response of a successful ``GET /1.0/sync/1.5`` request.
            expected_fxa_kid: value the ``fxa_kid`` field of the parsed
                token must have.

        Returns:
            The decoded JSON payload embedded in ``res.json['id']``, so the
            caller can make scheme-specific assertions on it.
        """
        fxa_uid = self.session.uid
        # Retrieve the user from the database
        user = self._get_user(res.json['uid'])
        # First, let's verify that the token we received is valid. To do this,
        # we can unpack the hawk header ID into the payload and its signature
        # and then construct a tokenlib token to compute the signature
        # ourselves. To obtain a matching signature, we use the same secret as
        # is used by Tokenserver.
        raw = urlsafe_b64decode(res.json['id'])
        # The trailing 32 bytes are treated as the signature, the rest as
        # the JSON payload.
        payload = raw[:-32]
        signature = raw[-32:]
        payload_dict = json.loads(payload.decode('utf-8'))
        signing_secret = self.TOKEN_SIGNING_SECRET
        expected_token = tokenlib.make_token(payload_dict,
                                             secret=signing_secret)
        expected_signature = urlsafe_b64decode(expected_token)[-32:]
        # Using the #compare_digest method here is not strictly necessary, as
        # this is not a security-sensitive situation, but it's good practice
        self.assertTrue(hmac.compare_digest(expected_signature, signature))
        # Check that the given key is a secret derived from the hawk ID
        expected_secret = tokenlib.get_derived_secret(res.json['id'],
                                                      secret=signing_secret)
        self.assertEqual(res.json['key'], expected_secret)
        # Check to make sure the remainder of the fields are valid
        self.assertEqual(res.json['uid'], user['uid'])
        self.assertEqual(res.json['api_endpoint'],
                         '%s/1.5/%s' % (self.NODE_URL, user['uid']))
        self.assertEqual(res.json['duration'], DEFAULT_TOKEN_DURATION)
        self.assertEqual(res.json['hashalg'], 'sha256')
        self.assertEqual(res.json['hashed_fxa_uid'],
                         self._fxa_metrics_hash(fxa_uid)[:32])
        self.assertEqual(res.json['node_type'], 'spanner')
        token = self.unsafelyParseToken(res.json['id'])
        self.assertIn('hashed_device_id', token)
        self.assertEqual(token["uid"], res.json["uid"])
        self.assertEqual(token["fxa_uid"], fxa_uid)
        self.assertEqual(token["fxa_kid"], expected_fxa_kid)
        self.assertNotEqual(token["hashed_fxa_uid"], token["fxa_uid"])
        self.assertEqual(token["hashed_fxa_uid"], res.json["hashed_fxa_uid"])
        return payload_dict

    def test_unauthorized_oauth_error_status(self):
        """Bad OAuth credentials are rejected with the expected 401 bodies."""
        # Totally busted auth -> generic error.
        headers = {
            'Authorization': 'Unsupported-Auth-Scheme IHACKYOU',
            'X-KeyID': '1234-qqo'
        }
        res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
        self.assertEqual(res.json, self._error_body('Unsupported', 'error'))

        invalid_credentials = self._error_body('Unauthorized',
                                               'invalid-credentials')
        # Bad token -> 'invalid-credentials'
        token = self._get_bad_token()
        headers = {
            'Authorization': 'Bearer %s' % token,
            'X-KeyID': '1234-qqo'
        }
        res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
        self.assertEqual(res.json, invalid_credentials)

        # Untrusted scopes -> 'invalid-credentials'
        token = self._get_oauth_token_with_bad_scope()
        headers = {
            'Authorization': 'Bearer %s' % token,
            'X-KeyID': '1234-qqo'
        }
        res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
        self.assertEqual(res.json, invalid_credentials)

    def test_unauthorized_browserid_error_status(self):
        """Bad BrowserID credentials are rejected with the expected body."""
        invalid_credentials = self._error_body('Unauthorized',
                                               'invalid-credentials')
        # Bad assertion -> 'invalid-credentials'
        assertion = self._get_bad_token()
        headers = {
            'Authorization': 'BrowserID %s' % assertion,
            'X-Client-State': 'aaaa',
        }
        res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
        self.assertEqual(res.json, invalid_credentials)

        # Bad audience -> 'invalid-credentials'
        assertion = self._get_browserid_assertion_with_bad_audience()
        headers = {
            'Authorization': 'BrowserID %s' % assertion,
            'X-Client-State': 'aaaa',
        }
        res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
        self.assertEqual(res.json, invalid_credentials)

    def test_valid_oauth_request(self):
        """A valid OAuth token yields a well-formed, correctly signed token."""
        headers = {
            'Authorization': 'Bearer %s' % self.oauth_token,
            'X-KeyID': '1234-qqo'
        }
        # Send a valid request, allocating a new user
        res = self.app.get('/1.0/sync/1.5', headers=headers)
        payload_dict = self._assert_valid_token_response(
            res, expected_fxa_kid="0000000001234-qqo")
        # The `id` payload should include a field indicating the origin of the
        # token
        self.assertEqual(payload_dict['tokenserver_origin'], 'rust')
        # The response should have an X-Timestamp header that contains the
        # number of seconds since the UNIX epoch
        self.assertIn('X-Timestamp', res.headers)
        # int() raises ValueError when the header is not an integer; that
        # conversion, not the assertion around it, is the effective check.
        self.assertIsNotNone(int(res.headers['X-Timestamp']))

    def test_valid_browserid_request(self):
        """A valid BrowserID assertion yields a well-formed signed token."""
        assertion = self.browserid_assertion
        headers = {
            'Authorization': 'BrowserID %s' % assertion,
            'X-Client-State': 'aaaa'
        }
        # Send a valid request, allocating a new user
        res = self.app.get('/1.0/sync/1.5', headers=headers)
        keys_changed_at = \
            self._extract_keys_changed_at_from_assertion(assertion)
        self._assert_valid_token_response(
            res, expected_fxa_kid="%s-qqo" % str(keys_changed_at))