### OAuth2 using PKCE workflow
###### https://www.stefaanlippens.net/oauth-code-flow-pkce.html

In [2]:
import base64
import hashlib
import html
import json
import os
import re
import urllib.parse
import requests

In [3]:
# keyCloak setup
provider = "http://localhost:9090/realms/master"
client_id = "ashok-pkce-test"
username = "ashoksharma"
password = "ashok007"
redirect_uri = "http://localhost:9090"

#### Connect to authentication provider
The first phase of the flow is to connect to the OAuth/OpenID Connect provider and authenticate. For a PKCE-enabled flow we need some PKCE ingredients from the start.

#### PKCE code verifier and challenge
We need a code verifier: a sufficiently long random string (RFC 7636 requires 43 to 128 characters) that is only ever used on the client side. We'll use a simple urandom/base64 trick to generate one and then strip everything that is not alphanumeric:

In [4]:
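# 40 random bytes encode to 56 base64 characters (including '=' padding)
code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode('utf-8')
print(code_verifier)
# Drop '-', '_' and the '=' padding so that only alphanumerics remain
code_verifier = re.sub('[^a-zA-Z0-9]+', '', code_verifier)
code_verifier, len(code_verifier)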

3cTw6gJWWkHJKdLTJxotos-oPufawX19Rq1FHJ6TtKaH6NiCMme2CQ==


('3cTw6gJWWkHJKdLTJxotosoPufawX19Rq1FHJ6TtKaH6NiCMme2CQ', 53)
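
As an alternative to the urandom/base64/regex combination, the standard library's `secrets` module can produce a verifier in one call. The sketch below is only an illustration: the helper name `make_code_verifier` and the default of 64 random bytes are assumptions, not part of the original notebook. Unlike the cell above, it keeps `-` and `_`, which RFC 7636 also permits in a verifier.

```python
import secrets

def make_code_verifier(n_bytes: int = 64) -> str:
    """Return a random PKCE code verifier (hypothetical helper)."""
    # token_urlsafe() returns unpadded URL-safe base64, so the result only
    # contains A-Z, a-z, 0-9, '-' and '_'.
    verifier = secrets.token_urlsafe(n_bytes)
    # RFC 7636 requires a verifier length between 43 and 128 characters.
    if not 43 <= len(verifier) <= 128:
        raise ValueError("verifier length outside the 43-128 range")
    return verifier

verifier = make_code_verifier()
print(verifier, len(verifier))
```

With the default of 64 bytes the verifier is 86 characters long; any byte count from 32 to 96 stays inside the allowed range.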

To create the PKCE code challenge, we hash the code verifier with SHA-256 and encode the digest as URL-safe base64 without padding:

In [5]:
code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest()
code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8')
code_challenge = code_challenge.replace('=', '')
code_challenge, len(code_challenge)

('sKCQLdEgxQNppMpsht0qGS3UQ0Kdl097xJdRLrcsobc', 43)
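
The same transformation can be wrapped in a small function and checked against the example pair published in RFC 7636, Appendix B. This is a sketch: the function name `s256_challenge` is made up for illustration, while the verifier and expected challenge strings are the ones from the RFC.

```python
import base64
import hashlib

def s256_challenge(verifier: str) -> str:
    """Compute the S256 code challenge for a verifier (hypothetical helper)."""
    digest = hashlib.sha256(verifier.encode("ascii")).digest()
    # URL-safe base64 with the trailing '=' padding removed
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")

# Example pair from RFC 7636, Appendix B
rfc_verifier = "dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk"
rfc_challenge = "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM"
print(s256_challenge(rfc_verifier) == rfc_challenge)
```

A SHA-256 digest is 32 bytes, so the challenge always has 43 characters, matching the length shown in the cell output above.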

#### Request login page
We now have all the pieces for the initial request, which will give us the login page of the authentication provider. Adding the code challenge and its method (S256) signals to the OAuth provider that we expect the PKCE-based flow.

In [6]:
state = "fooobarbaz"
resp = requests.get(
    url=provider + "/protocol/openid-connect/auth",
    params={
        "response_type": "code",
        "client_id": client_id,
        "scope": "openid",
        "redirect_uri": redirect_uri,
        "state": state,
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
    },
    allow_redirects=False
)
resp.status_code

200
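
To see exactly what is sent, the authorization URL can also be assembled by hand. The sketch below assumes the same Keycloak endpoint path and parameters as the request above; the helper name `build_auth_url` is hypothetical and the challenge value is a placeholder. It also swaps the fixed `state` string for a random one, which is the usual practice when the value is later compared against the one returned in the redirect.

```python
import secrets
import urllib.parse

def build_auth_url(provider, client_id, redirect_uri, code_challenge, state):
    """Assemble the authorization request URL (hypothetical helper)."""
    params = {
        "response_type": "code",
        "client_id": client_id,
        "scope": "openid",
        "redirect_uri": redirect_uri,
        "state": state,
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
    }
    return provider + "/protocol/openid-connect/auth?" + urllib.parse.urlencode(params)

# Random, unguessable state instead of a fixed string
state = secrets.token_urlsafe(16)
auth_url = build_auth_url(
    provider="http://localhost:9090/realms/master",
    client_id="ashok-pkce-test",
    redirect_uri="http://localhost:9090",
    code_challenge="placeholder-challenge",  # placeholder, not a real challenge
    state=state,
)
print(auth_url)
```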

#### Parse login page (response)
Get cookie data from response headers (requires a bit of manipulation).

In [7]:
cookie = resp.headers['Set-Cookie']
cookie = '; '.join(c.split(';')[0] for c in cookie.split(', '))
cookie

'AUTH_SESSION_ID=f353fb10-76a2-4e1e-afb1-0a09805de638; AUTH_SESSION_ID_LEGACY=f353fb10-76a2-4e1e-afb1-0a09805de638; KC_RESTART=eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJjNWIxNDQyYy0wZDFlLTRlMGMtOWUzOS0wOTY0ZDdlN2ZkNTEifQ.eyJjaWQiOiJhc2hvay1wa2NlLXRlc3QiLCJwdHkiOiJvcGVuaWQtY29ubmVjdCIsInJ1cmkiOiJodHRwOi8vbG9jYWxob3N0OjkwOTAiLCJhY3QiOiJBVVRIRU5USUNBVEUiLCJub3RlcyI6eyJzY29wZSI6Im9wZW5pZCIsImlzcyI6Imh0dHA6Ly9sb2NhbGhvc3Q6OTA5MC9yZWFsbXMvbWFzdGVyIiwicmVzcG9uc2VfdHlwZSI6ImNvZGUiLCJjb2RlX2NoYWxsZW5nZV9tZXRob2QiOiJTMjU2IiwicmVkaXJlY3RfdXJpIjoiaHR0cDovL2xvY2FsaG9zdDo5MDkwIiwic3RhdGUiOiJmb29vYmFyYmF6IiwiY29kZV9jaGFsbGVuZ2UiOiJzS0NRTGRFZ3hRTnBwTXBzaHQwcUdTM1VRMEtkbDA5N3hKZFJMcmNzb2JjIn19.EtGEENeti7oW4IZKL-Mwgg7JbC8RwjCCArd0HZLoa-M'
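
Splitting the combined `Set-Cookie` header on `', '` works here, but it can break when a cookie carries an `Expires` attribute, because the date format itself contains a comma followed by a space. A less fragile sketch builds the header from name/value pairs instead. The helper name `cookie_header` is hypothetical; with `requests`, the mapping could come from `resp.cookies.get_dict()`, and a `requests.Session` would handle cookies automatically.

```python
def cookie_header(cookies):
    """Build a Cookie request header from a name -> value mapping."""
    return "; ".join(f"{name}={value}" for name, value in cookies.items())

# With requests this would be called as cookie_header(resp.cookies.get_dict()).
# A plain dict with placeholder values stands in for the cookie jar here.
example_cookies = {"AUTH_SESSION_ID": "abc", "KC_RESTART": "xyz"}
print(cookie_header(example_cookies))
```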

Extract the login URL to post to from the page HTML code. Because the Keycloak login page is straightforward HTML, we can get away with a simple regex.

In [8]:
page = resp.text
# Raw string so that \s reaches the regex engine without an invalid-escape warning
form_action = html.unescape(re.search(r'<form\s+.*?\s+action="(.*?)"', page, re.DOTALL).group(1))
form_action

'http://localhost:9090/realms/master/login-actions/authenticate?session_code=z2_BDKnkIPG2pix9oMbnW6985kqfIvw0lOT9D2VGTDo&execution=f5744b73-b1c2-490d-824e-f02f475027dc&client_id=ashok-pkce-test&tab_id=SWmcChdLCn8'
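
If the regex ever proves too brittle (for example when the attribute order changes), the standard library's `html.parser` can pull out the form action instead. This is a minimal sketch, not what the notebook uses: the class and function names are hypothetical and the sample HTML is made up for the demonstration.

```python
from html.parser import HTMLParser

class FormActionParser(HTMLParser):
    """Remember the action attribute of the first <form> tag."""

    def __init__(self):
        super().__init__()
        self.action = None

    def handle_starttag(self, tag, attrs):
        if tag == "form" and self.action is None:
            # attrs is a list of (name, value) pairs; entity references
            # such as &amp; are already unescaped in the values
            self.action = dict(attrs).get("action")

def find_form_action(page):
    parser = FormActionParser()
    parser.feed(page)
    return parser.action

# Made-up sample page, not actual Keycloak output
sample = '<html><body><form method="post" action="http://localhost:9090/login?a=1&amp;b=2"></form></body></html>'
print(find_form_action(sample))
```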

In [9]:
print(page)

<!DOCTYPE html>
<html class="login-pf">

<head>
    <meta charset="utf-8">
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
    <meta name="robots" content="noindex, nofollow">

            <meta name="viewport" content="width=device-width,initial-scale=1"/>
    <title>Sign in to Keycloak</title>
    <link rel="icon" href="/resources/ahw05/login/keycloak/img/favicon.ico" />
            <link href="/resources/ahw05/common/keycloak/web_modules/@patternfly/react-core/dist/styles/base.css" rel="stylesheet" />
            <link href="/resources/ahw05/common/keycloak/web_modules/@patternfly/react-core/dist/styles/app.css" rel="stylesheet" />
            <link href="/resources/ahw05/common/keycloak/node_modules/patternfly/dist/css/patternfly.min.css" rel="stylesheet" />
            <link href="/resources/ahw05/common/keycloak/node_modules/patternfly/dist/css/patternfly-additions.min.css" rel="stylesheet" />
            <link href="/resources/ahw05/common/keycloak/lib/

#### Do the login (aka authenticate)
Now we post the login form with the username and password from the setup cell, passing along the extracted cookie as well.

In [8]:
resp = requests.post(
    url=form_action, 
    data={
        "username": username,
        "password": password,
    }, 
    headers={"Cookie": cookie},
    allow_redirects=False
)
resp.status_code

302

In [9]:
redirect = resp.headers['Location']
redirect

'http://localhost:9090/realms/master/login-actions/required-action?execution=update_user_locale&client_id=ashok-pkce-test&tab_id=XHjmc2NUyoM'

In [10]:
assert redirect.startswith(redirect_uri)

AssertionError: 

#### Extract authorization code from redirect
After a successful login, the redirect URL carries the authorization code in its "code" query parameter.

In [None]:
query = urllib.parse.urlparse(redirect).query
redirect_params = urllib.parse.parse_qs(query)
redirect_params

In [None]:
auth_code = redirect_params['code'][0]
auth_code
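
A redirect does not always carry a code. A redirect to a Keycloak `login-actions/required-action` page, like the one recorded above, has no `code` parameter, so the lookup would fail with a `KeyError`. The sketch below makes the checks explicit and also compares the returned `state` with the one we sent. The helper name `extract_auth_code` and the example URL are hypothetical.

```python
import urllib.parse

def extract_auth_code(redirect, expected_state):
    """Return the authorization code from a redirect URL or raise."""
    query = urllib.parse.urlparse(redirect).query
    params = urllib.parse.parse_qs(query)
    if "error" in params:
        raise RuntimeError(f"provider returned error: {params['error'][0]}")
    if "code" not in params:
        raise RuntimeError("redirect carries no authorization code")
    # The state must round-trip unchanged, otherwise the response
    # does not belong to the request we started
    if params.get("state", [None])[0] != expected_state:
        raise RuntimeError("state mismatch")
    return params["code"][0]

# Made-up redirect URL with a placeholder code
example_redirect = "http://localhost:9090?state=fooobarbaz&code=abc123"
print(extract_auth_code(example_redirect, "fooobarbaz"))
```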

#### Exchange authorization code for an access token
We can now exchange the authorization code for an access token. In the normal OAuth authorization code flow we would include a static client secret here. Instead, we provide the code verifier, which acts as proof that the initial request was made by us.

In [None]:
resp = requests.post(
    url=provider + "/protocol/openid-connect/token",
    data={
        "grant_type": "authorization_code",
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "code": auth_code,
        "code_verifier": code_verifier,
    },
    allow_redirects=False
)
resp.status_code

In the response we get, among others, the access token and id token:

In [None]:
result = resp.json()
result
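
The token endpoint can also answer with an error, for instance when the code has expired or the verifier does not match the challenge. RFC 6749 describes such responses as JSON with an `error` field and an optional `error_description`. The sketch below checks for that before the tokens are used. The helper name `parse_token_response` is hypothetical and the example bodies are placeholders, not real Keycloak output.

```python
def parse_token_response(status_code, body):
    """Validate a decoded token response and return it, or raise."""
    if status_code != 200 or "error" in body:
        detail = body.get("error_description") or body.get("error") or "unknown error"
        raise RuntimeError(f"token request failed ({status_code}): {detail}")
    # A successful response must at least name the token and its type
    missing = [key for key in ("access_token", "token_type") if key not in body]
    if missing:
        raise RuntimeError(f"token response lacks fields: {missing}")
    return body

# With requests this would be called as parse_token_response(resp.status_code, resp.json()).
tokens = parse_token_response(200, {"access_token": "placeholder", "token_type": "Bearer"})
print(tokens["token_type"])
```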

#### Decode the JWT tokens
The access and ID tokens are JWTs (JSON Web Tokens): three base64url-encoded segments separated by dots. Let's decode the payload segment.

In [None]:
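def _b64_decode(data):
    # JWT segments are URL-safe base64 with the '=' padding stripped,
    # so restore the padding and use the URL-safe decoder
    data += '=' * (-len(data) % 4)
    return base64.urlsafe_b64decode(data).decode('utf-8')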

def jwt_payload_decode(jwt):
    _, payload, _ = jwt.split('.')
    return json.loads(_b64_decode(payload))

In [None]:
jwt_payload_decode(result['access_token'])

In [None]:
jwt_payload_decode(result['id_token'])
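
Decoding alone does not verify anything: the signature segment is ignored, so the claims must not be trusted for security decisions without a proper signature check. As a sketch of what can be read from a token without verification, the example below decodes both header and payload and checks the `exp` claim. All helper names are hypothetical, and the demo token is built locally with an empty signature rather than taken from Keycloak.

```python
import base64
import json
import time

def b64url_decode(segment):
    # Restore the stripped padding before decoding
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))

def b64url_encode(raw):
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")

def jwt_decode_unverified(token):
    """Return (header, claims) WITHOUT checking the signature."""
    header, payload, _signature = token.split(".")
    return json.loads(b64url_decode(header)), json.loads(b64url_decode(payload))

def is_expired(claims, now=None):
    """Compare the exp claim (seconds since the epoch) with the current time."""
    now = time.time() if now is None else now
    return claims["exp"] <= now

# Locally built demo token with an empty signature segment
demo_token = ".".join([
    b64url_encode(json.dumps({"alg": "none"}).encode("utf-8")),
    b64url_encode(json.dumps({"sub": "demo", "exp": 2000}).encode("utf-8")),
    "",
])
demo_header, demo_claims = jwt_decode_unverified(demo_token)
print(demo_header, demo_claims, is_expired(demo_claims, now=1000))
```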