Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inability for automatic token refresh when using grant_type=password #532

Open
mabuelatta-cohere opened this issue Mar 4, 2023 · 0 comments

Comments

@mabuelatta-cohere
Copy link

mabuelatta-cohere commented Mar 4, 2023

Describe the bug

Trying to authenticate and generate requests using an OAuth2Session using username/password authentication. I'm able to generate an initial token to be used in my session, however, I was trying to utilize the functionality to automatically refresh tokens by passing in the token_endpoint to automatically as stated in the documentation.

When my token would expire it would not refresh the token automatically and lead to an InvalidToken error. After looking into this further I tried to invoke the authorization to refresh and generate a new token by passing in the grant_type=password as one of the recognized metadata that gets passed into the client. However noticed that this will not properly resolve the problem as the ensure_active_token is used to automatically refresh tokens:

def ensure_active_token(self, token):
if not token.is_expired():
return True
refresh_token = token.get('refresh_token')
url = self.metadata.get('token_endpoint')
if refresh_token and url:
self.refresh_token(url, refresh_token=refresh_token)
return True
elif self.metadata.get('grant_type') == 'client_credentials':
access_token = token['access_token']
new_token = self.fetch_token(url, grant_type='client_credentials')
if self.update_token:
self.update_token(new_token, access_token=access_token)
return True

is neglecting to pass in the rest of the metadata from the session (like grant_type) to refresh_token here

def refresh_token(self, url, refresh_token=None, body='',
auth=None, headers=None, **kwargs):
"""Fetch a new access token using a refresh token.
:param url: Refresh Token endpoint, must be HTTPS.
:param refresh_token: The refresh_token to use.
:param body: Optional application/x-www-form-urlencoded body to add the
include in the token request. Prefer kwargs over body.
:param auth: An auth tuple or method as accepted by requests.
:param headers: Dict to default request headers with.
:return: A :class:`OAuth2Token` object (a dict too).
"""
session_kwargs = self._extract_session_request_params(kwargs)
refresh_token = refresh_token or self.token.get('refresh_token')
if 'scope' not in kwargs and self.scope:
kwargs['scope'] = self.scope
body = prepare_token_request(
'refresh_token', body,
refresh_token=refresh_token, **kwargs
)
if headers is None:
headers = DEFAULT_HEADERS.copy()
for hook in self.compliance_hook['refresh_token_request']:
url, headers, body = hook(url, headers, body)
if auth is None:
auth = self.client_auth(self.token_endpoint_auth_method)
return self._refresh_token(
url, refresh_token=refresh_token, body=body, headers=headers,
auth=auth, **session_kwargs)

that only uses passed-in params to construct the body to generate a new token through prepare_token_request here:

body = prepare_token_request(
'refresh_token', body,
refresh_token=refresh_token, **kwargs
)

Error Stacks

File 
"/Users/mamdouh.abuelatta/.local/share/virtualenvs/optic-dr-ETdtnlpf/lib/python3.8/site-packages/authlib/integrations/requests_client/oauth2_session.py
", line 109, in request
    return super(OAuth2Session, self).request(
  File 
"/Users/mamdouh.abuelatta/.local/share/virtualenvs/optic-dr-ETdtnlpf/lib/python3.8/site-packages/requests/sessions.py
", line 528, in request
    prep = self.prepare_request(req)
  File 
"/Users/mamdouh.abuelatta/.local/share/virtualenvs/optic-dr-ETdtnlpf/lib/python3.8/site-packages/requests/sessions.py
", line 456, in prepare_request
    p.prepare(
  File 
"/Users/mamdouh.abuelatta/.local/share/virtualenvs/optic-dr-ETdtnlpf/lib/python3.8/site-packages/requests/models.py
", line 320, in prepare
    self.prepare_auth(auth, url)
  File 
"/Users/mamdouh.abuelatta/.local/share/virtualenvs/optic-dr-ETdtnlpf/lib/python3.8/site-packages/requests/models.py
", line 556, in prepare_auth
    r = auth(self)
  File 
"/Users/mamdouh.abuelatta/.local/share/virtualenvs/optic-dr-ETdtnlpf/lib/python3.8/site-packages/authlib/integrations/requests_client/oauth2_session.py
", line 24, in __call__
    self.ensure_active_token()
  File 
"/Users/mamdouh.abuelatta/.local/share/virtualenvs/optic-dr-ETdtnlpf/lib/python3.8/site-packages/authlib/integrations/requests_client/oauth2_session.py
", line 21, in ensure_active_token
    raise InvalidTokenError()
authlib.integrations.base_client.errors.InvalidTokenError: token_invalid: 

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

To Reproduce

A minimal example to reproduce the behavior:

  1. Initialize OAuth2Session with a valid token_endpoint and grant_type=password
  2. fetch_token with username/password
  3. Wait for(or manipulate token expiry) token to be in an expired state
  4. Use client.request to initiate a request

Expected behavior

Ability to pass grant_type=password and allow for automatic refresh token when the initial token has expired.

Environment:

  • OS: macOS Big Sur 11.7.3
  • Python Version: 3.8
  • Authlib Version: 1.2.0

Proposed Solution

A. Pass in the rest of the metadata from the session (like grant_type) to refresh_token here:

def refresh_token(self, url, refresh_token=None, body='',
auth=None, headers=None, **kwargs):
"""Fetch a new access token using a refresh token.
:param url: Refresh Token endpoint, must be HTTPS.
:param refresh_token: The refresh_token to use.
:param body: Optional application/x-www-form-urlencoded body to add the
include in the token request. Prefer kwargs over body.
:param auth: An auth tuple or method as accepted by requests.
:param headers: Dict to default request headers with.
:return: A :class:`OAuth2Token` object (a dict too).
"""
session_kwargs = self._extract_session_request_params(kwargs)
refresh_token = refresh_token or self.token.get('refresh_token')
if 'scope' not in kwargs and self.scope:
kwargs['scope'] = self.scope
body = prepare_token_request(
'refresh_token', body,
refresh_token=refresh_token, **kwargs
)
if headers is None:
headers = DEFAULT_HEADERS.copy()
for hook in self.compliance_hook['refresh_token_request']:
url, headers, body = hook(url, headers, body)
if auth is None:
auth = self.client_auth(self.token_endpoint_auth_method)
return self._refresh_token(
url, refresh_token=refresh_token, body=body, headers=headers,
auth=auth, **session_kwargs)

that will be passed-down as a params to construct the body to generate a new token through prepare_token_request here:

body = prepare_token_request(
'refresh_token', body,
refresh_token=refresh_token, **kwargs
)

B. Pass in combined params (method/session) when constructing the body to generate a new token through prepare_token_request here:

body = prepare_token_request(
'refresh_token', body,
refresh_token=refresh_token, **kwargs
)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants