Skip to content

Commit

Permalink
Merge pull request #497 from requests/pkce
Browse files Browse the repository at this point in the history
Add PKCE support with oauthlib 3.2.0
  • Loading branch information
JonathanHuot committed Feb 27, 2024
2 parents 424adf0 + 596beb5 commit 39fe529
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 1 deletion.
2 changes: 1 addition & 1 deletion HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ v1.4.0 (TBD)
- Add support for Python 3.8-3.12
- Remove support of Python 2.x, <3.7
- Migrated to Github Action

- Add PKCE support

v1.3.1 (21 January 2022)
++++++++++++++++++++++++
Expand Down
1 change: 1 addition & 0 deletions docs/examples/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Examples
github
google
linkedin
native_spa_pkce_auth0
outlook
spotify
tumblr
Expand Down
20 changes: 20 additions & 0 deletions docs/examples/native_spa_pkce_auth0.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@

client_id = 'your_client_id'

authorization_base_url = "https://dev-foobar.eu.auth0.com/authorize"
token_url = "https://dev-foobar.eu.auth0.com/oauth/token"
scope = ["openid"]

from requests_oauthlib import OAuth2Session
redirect_uri = 'http://localhost:8080/callback'

session = OAuth2Session(client_id, scope=scope, redirect_uri=redirect_uri, pkce="S256")
authorization_url, state = session.authorization_url(authorization_base_url,access_type="offline")

print("Please go here and authorize:")
print(authorization_url)

redirect_response = input('Paste the full redirect URL here: ')

token = session.fetch_token(token_url, authorization_response=redirect_response, include_client_id=True)
print(token)
12 changes: 12 additions & 0 deletions docs/examples/native_spa_pkce_auth0.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
Native or SPA Tutorial with PKCE in Auth0
=========================================

Setup a new web project in the Auth0 Dashboard, (application type: Native application or Single Page Web Application)_

Note this sample is valid for any Identity Providers supporting OAuth2.0 Authorization Code with PKCE.

When you have obtained a ``client_id``, and registered
a callback URL then you can try out the command line interactive example below.

.. literalinclude:: native_spa_pkce_auth0.py
:language: python
20 changes: 20 additions & 0 deletions requests_oauthlib/oauth2_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def __init__(
token=None,
state=None,
token_updater=None,
pkce=None,
**kwargs
):
"""Construct a new OAuth 2 client session.
Expand All @@ -72,6 +73,7 @@ def __init__(
set a TokenUpdated warning will be raised when a token
has been refreshed. This warning will carry the token
in its token argument.
:param pkce: Set "S256" or "plain" to enable PKCE. Default is disabled.
:param kwargs: Arguments to pass to the Session constructor.
"""
super(OAuth2Session, self).__init__(**kwargs)
Expand All @@ -84,6 +86,10 @@ def __init__(
self.auto_refresh_url = auto_refresh_url
self.auto_refresh_kwargs = auto_refresh_kwargs or {}
self.token_updater = token_updater
self._pkce = pkce

if self._pkce not in ["S256", "plain", None]:
raise AttributeError("Wrong value for {}(.., pkce={})".format(self.__class__, self._pkce))

# Ensure that requests doesn't do any automatic auth. See #278.
# The default behavior can be re-enabled by setting auth to None.
Expand Down Expand Up @@ -177,6 +183,13 @@ def authorization_url(self, url, state=None, **kwargs):
:return: authorization_url, state
"""
state = state or self.new_state()
if self._pkce:
self._code_verifier = self._client.create_code_verifier(43)
kwargs["code_challenge_method"] = self._pkce
kwargs["code_challenge"] = self._client.create_code_challenge(
code_verifier=self._code_verifier,
code_challenge_method=self._pkce
)
return (
self._client.prepare_request_uri(
url,
Expand Down Expand Up @@ -268,6 +281,13 @@ def fetch_token(
"Please supply either code or " "authorization_response parameters."
)

if self._pkce:
if self._code_verifier is None:
raise ValueError(
"Code verifier is not found, authorization URL must be generated before"
)
kwargs["code_verifier"] = self._code_verifier

# Earlier versions of this library build an HTTPBasicAuth header out of
# `username` and `password`. The RFC states, however these attributes
# must be in the request body and not the header.
Expand Down
31 changes: 31 additions & 0 deletions tests/test_oauth2_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,27 @@ def test_authorization_url(self):
self.assertIn(self.client_id, auth_url)
self.assertIn("response_type=token", auth_url)

def test_pkce_authorization_url(self):
url = "https://example.com/authorize?foo=bar"

web = WebApplicationClient(self.client_id)
s = OAuth2Session(client=web, pkce="S256")
auth_url, state = s.authorization_url(url)
self.assertIn(state, auth_url)
self.assertIn(self.client_id, auth_url)
self.assertIn("response_type=code", auth_url)
self.assertIn("code_challenge=", auth_url)
self.assertIn("code_challenge_method=S256", auth_url)

mobile = MobileApplicationClient(self.client_id)
s = OAuth2Session(client=mobile, pkce="S256")
auth_url, state = s.authorization_url(url)
self.assertIn(state, auth_url)
self.assertIn(self.client_id, auth_url)
self.assertIn("response_type=token", auth_url)
self.assertIn("code_challenge=", auth_url)
self.assertIn("code_challenge_method=S256", auth_url)

@mock.patch("time.time", new=lambda: fake_time)
def test_refresh_token_request(self):
self.expired_token = dict(self.token)
Expand Down Expand Up @@ -424,6 +445,16 @@ def test_web_app_fetch_token(self):
authorization_response="https://i.b/no-state?code=abc",
)

@mock.patch("time.time", new=lambda: fake_time)
def test_pkce_web_app_fetch_token(self):
url = "https://example.com/token"

web = WebApplicationClient(self.client_id, code=CODE)
sess = OAuth2Session(client=web, token=self.token, pkce="S256")
sess.send = fake_token(self.token)
sess._code_verifier = "foobar"
self.assertEqual(sess.fetch_token(url), self.token)

def test_client_id_proxy(self):
sess = OAuth2Session("test-id")
self.assertEqual(sess.client_id, "test-id")
Expand Down

0 comments on commit 39fe529

Please sign in to comment.