Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
fe918e2
commit 224a420
Showing
3 changed files
with
90 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
from .core import OAuth1 | ||
from .oauth1_session import OAuth1Session | ||
from .oauth2_auth import OAuth2 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
from __future__ import unicode_literals | ||
from oauthlib.oauth2 import WebApplicationClient, InsecureTransportError | ||
|
||
|
||
class OAuth2(object): | ||
"""Adds proof of authorization (OAuth2 token) to the request.""" | ||
|
||
def __init__(self, client_id=None, client=None, token=None): | ||
"""Construct a new OAuth 2 authorization object. | ||
:param client_id: Client id obtained during registration | ||
:param client: oauthlib.oauth2.Client to be used. Default is | ||
WebApplicationClient which is useful for any | ||
hosted application but not mobile or desktop. | ||
:param token: Token dictionary, must include access_token | ||
and token_type. | ||
""" | ||
self._client = client or WebApplicationClient(client_id, token=token) | ||
if token: | ||
for k, v in token.items(): | ||
setattr(self._client, k, v) | ||
|
||
def __call__(self, r): | ||
"""Append an OAuth 2 token to the request. | ||
Note that currently HTTPS is required for all requests. There may be | ||
a token type that allows for plain HTTP in the future and then this | ||
should be updated to allow plain HTTP on a white list basis. | ||
""" | ||
if not r.url.startswith('https://'): | ||
raise InsecureTransportError() | ||
r.url, r.headers, r.body = self._client.add_token(r.url, | ||
http_method=r.method, body=r.body, headers=r.headers) | ||
return r |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
from __future__ import unicode_literals | ||
import unittest | ||
|
||
from oauthlib.oauth2 import WebApplicationClient, MobileApplicationClient | ||
from oauthlib.oauth2 import LegacyApplicationClient, BackendApplicationClient | ||
from requests import Request | ||
from requests_oauthlib import OAuth2 | ||
|
||
|
||
class OAuth2AuthTest(unittest.TestCase): | ||
|
||
def setUp(self): | ||
self.token = { | ||
'token_type': 'Bearer', | ||
'access_token': 'asdfoiw37850234lkjsdfsdf', | ||
'expires_in': '3600' | ||
} | ||
self.client_id = 'foo' | ||
self.clients = [ | ||
WebApplicationClient(self.client_id), | ||
MobileApplicationClient(self.client_id), | ||
LegacyApplicationClient(self.client_id), | ||
BackendApplicationClient(self.client_id), | ||
] | ||
|
||
def test_add_token_to_url(self): | ||
url = 'https://example.com/resource?foo=bar' | ||
new_url = url + '&access_token=' + self.token['access_token'] | ||
for client in self.clients: | ||
client.default_token_placement = 'query' | ||
auth = OAuth2(client=client, token=self.token) | ||
r = Request('GET', url, auth=auth).prepare() | ||
self.assertEqual(r.url, new_url) | ||
|
||
def test_add_token_to_headers(self): | ||
token = 'Bearer ' + self.token['access_token'] | ||
for client in self.clients: | ||
auth = OAuth2(client=client, token=self.token) | ||
r = Request('GET', 'https://i.b', auth=auth).prepare() | ||
self.assertEqual(r.headers['Authorization'], token) | ||
|
||
def test_add_token_to_body(self): | ||
body = 'foo=bar' | ||
new_body = body + '&access_token=' + self.token['access_token'] | ||
for client in self.clients: | ||
client.default_token_placement = 'body' | ||
auth = OAuth2(client=client, token=self.token) | ||
r = Request('GET', 'https://i.b', data=body, auth=auth).prepare() | ||
self.assertEqual(r.body, new_body) | ||
|
||
def test_add_nonexisting_token(self): | ||
for client in self.clients: | ||
auth = OAuth2(client=client) | ||
r = Request('GET', 'https://i.b', auth=auth) | ||
self.assertRaises(ValueError, r.prepare) |