Skip to content

Commit

Permalink
Merge branch 'master' into thedrow-patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
skion committed May 22, 2018
2 parents 230e463 + a306b12 commit b7b850a
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 5 deletions.
72 changes: 69 additions & 3 deletions tests/oauth2/rfc6749/clients/test_base.py
Expand Up @@ -4,7 +4,7 @@
import datetime

from oauthlib import common
from oauthlib.oauth2 import Client, InsecureTransportError
from oauthlib.oauth2 import Client, InsecureTransportError, TokenExpiredError
from oauthlib.oauth2.rfc6749 import utils
from oauthlib.oauth2.rfc6749.clients import AUTH_HEADER, BODY, URI_QUERY

Expand Down Expand Up @@ -51,10 +51,26 @@ def test_add_bearer_token(self):
self.assertFormBodyEqual(body, self.body)
self.assertEqual(headers, self.bearer_header)

# Non-HTTPS
insecure_uri = 'http://example.com/path?query=world'
client = Client(self.client_id, access_token=self.access_token, token_type="Bearer")
self.assertRaises(InsecureTransportError, client.add_token, insecure_uri,
body=self.body,
headers=self.headers)

# Missing access token
client = Client(self.client_id)
self.assertRaises(ValueError, client.add_token, self.uri)

# Expired token
expired = 523549800
expired_token = {
'expires_at': expired,
}
client = Client(self.client_id, token=expired_token, access_token=self.access_token, token_type="Bearer")
self.assertRaises(TokenExpiredError, client.add_token, self.uri,
body=self.body, headers=self.headers)

# The default token placement, bearer in auth header
client = Client(self.client_id, access_token=self.access_token)
uri, headers, body = client.add_token(self.uri, body=self.body,
Expand Down Expand Up @@ -150,8 +166,26 @@ def test_add_mac_token(self):
self.assertEqual(uri, self.uri)
self.assertEqual(body, self.body)
self.assertEqual(headers, self.mac_00_header)
# Non-HTTPS
insecure_uri = 'http://example.com/path?query=world'
self.assertRaises(InsecureTransportError, client.add_token, insecure_uri,
body=self.body,
headers=self.headers,
issue_time=datetime.datetime.now())
# Expired Token
expired = 523549800
expired_token = {
'expires_at': expired,
}
client = Client(self.client_id, token=expired_token, token_type="MAC",
access_token=self.access_token, mac_key=self.mac_key,
mac_algorithm="hmac-sha-1")
self.assertRaises(TokenExpiredError, client.add_token, self.uri,
body=self.body,
headers=self.headers,
issue_time=datetime.datetime.now())

# Add the Authorization header (draft 00)
# Add the Authorization header (draft 01)
client = Client(self.client_id, token_type="MAC",
access_token=self.access_token, mac_key=self.mac_key,
mac_algorithm="hmac-sha-1")
Expand All @@ -160,7 +194,24 @@ def test_add_mac_token(self):
self.assertEqual(uri, self.uri)
self.assertEqual(body, self.body)
self.assertEqual(headers, self.mac_01_header)

# Non-HTTPS
insecure_uri = 'http://example.com/path?query=world'
self.assertRaises(InsecureTransportError, client.add_token, insecure_uri,
body=self.body,
headers=self.headers,
draft=1)
# Expired Token
expired = 523549800
expired_token = {
'expires_at': expired,
}
client = Client(self.client_id, token=expired_token, token_type="MAC",
access_token=self.access_token, mac_key=self.mac_key,
mac_algorithm="hmac-sha-1")
self.assertRaises(TokenExpiredError, client.add_token, self.uri,
body=self.body,
headers=self.headers,
draft=1)

def test_revocation_request(self):
client = Client(self.client_id)
Expand Down Expand Up @@ -208,6 +259,21 @@ def test_prepare_authorization_request(self):
# NotImplementedError
self.assertRaises(NotImplementedError, client.prepare_authorization_request, auth_url)

def test_prepare_token_request(self):
redirect_url = 'https://example.com/callback/'
scopes = 'read'
token_url = 'https://example.com/token/'
state = 'fake_state'

client = Client(self.client_id, scope=scopes, state=state)

# Non-HTTPS
self.assertRaises(InsecureTransportError,
client.prepare_token_request, 'http://example.com/token/')

# NotImplementedError
self.assertRaises(NotImplementedError, client.prepare_token_request, token_url)

def test_prepare_refresh_token_request(self):
client = Client(self.client_id)

Expand Down
70 changes: 68 additions & 2 deletions tests/oauth2/rfc6749/clients/test_service_application.py
Expand Up @@ -89,15 +89,81 @@ def test_request_body(self, t):
audience=self.audience,
body=self.body)
r = Request('https://a.b', body=body)
self.assertEqual(r.isnot, 'empty')
self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type)
self.assertEqual(r.isnot, 'empty')
self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type)

claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256'])

self.assertEqual(claim['iss'], self.issuer)
# audience verification is handled during decode now
self.assertEqual(claim['sub'], self.subject)
self.assertEqual(claim['iat'], int(t.return_value))
self.assertNotIn('nbf', claim)
self.assertNotIn('jti', claim)

# Missing issuer parameter
self.assertRaises(ValueError, client.prepare_request_body,
issuer=None, subject=self.subject, audience=self.audience, body=self.body)

# Missing subject parameter
self.assertRaises(ValueError, client.prepare_request_body,
issuer=self.issuer, subject=None, audience=self.audience, body=self.body)

# Missing audience parameter
self.assertRaises(ValueError, client.prepare_request_body,
issuer=self.issuer, subject=self.subject, audience=None, body=self.body)

# Optional kwargs
not_before = time() - 3600
jwt_id = '8zd15df4s35f43sd'
body = client.prepare_request_body(issuer=self.issuer,
subject=self.subject,
audience=self.audience,
body=self.body,
not_before=not_before,
jwt_id=jwt_id)

r = Request('https://a.b', body=body)
self.assertEqual(r.isnot, 'empty')
self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type)

claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256'])

self.assertEqual(claim['iss'], self.issuer)
# audience verification is handled during decode now
self.assertEqual(claim['sub'], self.subject)
self.assertEqual(claim['iat'], int(t.return_value))
self.assertEqual(claim['nbf'], not_before)
self.assertEqual(claim['jti'], jwt_id)

@patch('time.time')
def test_request_body_no_initial_private_key(self, t):
t.return_value = time()
self.token['expires_at'] = self.token['expires_in'] + t.return_value

client = ServiceApplicationClient(
self.client_id, private_key=None)

# Basic with private key provided
body = client.prepare_request_body(issuer=self.issuer,
subject=self.subject,
audience=self.audience,
body=self.body,
private_key=self.private_key)
r = Request('https://a.b', body=body)
self.assertEqual(r.isnot, 'empty')
self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type)

claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256'])

self.assertEqual(claim['iss'], self.issuer)
# audience verification is handled during decode now
self.assertEqual(claim['sub'], self.subject)
self.assertEqual(claim['iat'], int(t.return_value))

# No private key provided
self.assertRaises(ValueError, client.prepare_request_body,
issuer=self.issuer, subject=self.subject, audience=self.audience, body=self.body)

@patch('time.time')
def test_parse_token_response(self, t):
Expand Down

0 comments on commit b7b850a

Please sign in to comment.