# Playing with OpenID Connect flows against a Keycloak setup

This is a Python3 notebook that illustrates OpenID Connect flows, using a local Keycloak instance as OpenID provider.

In [1]:
import base64
import html
import json
import logging
import os
import uuid

import requests

In [2]:
# Log at INFO level so the info messages emitted by the "keycloak-admin" logger are shown.
logging.basicConfig(level=logging.INFO)

## Setup of Keycloak instance

We need a test/development Keycloak instance.
For example, spin up a local Keycloak instance with Docker as follows:

    docker run --rm -it -p 9090:8080  \
        -e KEYCLOAK_USER=admin -e KEYCLOAK_PASSWORD=admin  \
        jboss/keycloak:7.0.0

In [3]:
# Connection settings for the Keycloak instance.
# The defaults match the Docker command shown above; set the corresponding
# environment variables to point the notebook at another instance (or other
# credentials) without editing the code.
keycloak_base_url = os.environ.get("KEYCLOAK_BASE_URL", "http://localhost:9090/auth")
admin_username = os.environ.get("KEYCLOAK_USER", "admin")
admin_password = os.environ.get("KEYCLOAK_PASSWORD", "admin")

### Keycloak Admin API

To be able to create clients and users through the Keycloak admin API, we first have to obtain an admin access token through OpenID, which we then have to use as a bearer token for all other admin API requests. Let's wrap this stuff in a class.

In [4]:
class KeycloakAdmin:
    """Small helper around the Keycloak admin REST API of the `master` realm.

    On construction an admin access token is requested with the password
    grant (client `admin-cli`). The token is installed as `Authorization`
    bearer header on a `requests.Session` that all other methods use.
    """

    def __init__(self, base_url: str, username: str, password: str):
        """Obtain an admin access token and prepare the admin API session.

        :param base_url: Keycloak base URL (the part before `/realms/...`)
        :param username: admin user name
        :param password: admin password
        :raises requests.HTTPError: if the token request fails
        """
        r = requests.post(
            base_url + '/realms/master/protocol/openid-connect/token',
            data={
                "username": username,
                "password": password,
                "grant_type": "password",
                "client_id": "admin-cli"
            })
        r.raise_for_status()

        self.session = requests.Session()
        self.session.headers["Authorization"] = "Bearer " + r.json()["access_token"]
        # Build the admin URL from the constructor argument, not from a notebook
        # global, so the admin API targets the same instance as the token request.
        self.admin_base_url = base_url + '/admin/realms/master'
        self.log = logging.getLogger("keycloak-admin")

    def create_client(self, options: dict = None, prefix: str = "myclient-") -> str:
        """Create a client with a randomly suffixed id.

        :param options: extra client settings merged into the request body
            (applied after the generated "id", so they can override it);
            `None` means no extra settings
        :param prefix: prefix of the generated client id
        :return: the generated client id
        :raises requests.HTTPError: if the admin API request fails
        """
        client_id = prefix + uuid.uuid4().hex[:8]
        data = {"id": client_id}
        # `options` defaults to None: only merge when settings were actually given.
        if options:
            data.update(options)
        self.log.info("Creating client with settings {s!r}".format(s=data))
        r = self.session.post(self.admin_base_url + '/clients', json=data)
        r.raise_for_status()
        return client_id

    def get_client_secret(self, client_id) -> str:
        """Fetch the secret of the given client.

        Note that the raw response (which contains the secret) is logged
        at INFO level.

        :param client_id: id of the client, as returned by `create_client`
        :return: the "value" field of the client-secret response
        :raises requests.HTTPError: if the admin API request fails
        """
        r = self.session.get(self.admin_base_url + '/clients/{c}/client-secret'.format(c=client_id))
        r.raise_for_status()
        self.log.info("Client secret response: {r!r}".format(r=r.text))
        client_secret = r.json()["value"]
        return client_secret

    def create_user(self, prefix: str = "John-", password: str = "j0hn"):
        """Create an enabled user with a non-temporary password.

        :param prefix: prefix of the generated user name
        :param password: password to set for the user
        :return: tuple `(username, password)`
        :raises requests.HTTPError: if the admin API request fails
        """
        username = prefix + uuid.uuid4().hex[:8]

        r = self.session.post(
            self.admin_base_url + '/users',
            json={
                "username": username,
                "credentials": [
                    {"type": "password", "value": password, "temporary": False},
                ],
                "enabled": True,
            }
        )
        r.raise_for_status()
        return username, password


# And while we're at it, a small helper to look inside JWTs.
def jwt_decode(token: str):
    """Poor man's JWT decoding (no signature verification).

    :param token: compact JWT of the form `header.payload.signature`
    :return: tuple `(header, payload)` of the two decoded JSON objects
    :raises ValueError: if the token does not consist of exactly three
        dot-separated parts, or a part is not valid base64/JSON
    """

    def _decode(data: str) -> dict:
        # JWT segments are base64url encoded ('-' and '_' instead of '+' and '/')
        # with the trailing '=' padding stripped: restore only the padding that
        # is actually missing and decode with the URL-safe alphabet.
        padded = data + '=' * (-len(data) % 4)
        # The JSON is UTF-8 encoded, claims may contain non-ASCII text.
        decoded = base64.urlsafe_b64decode(padded).decode('utf-8')
        return json.loads(decoded)

    header, payload, _signature = token.split('.')
    return _decode(header), _decode(payload)

In [5]:
# Log in as admin once; this helper is reused for all client and user setup below.
keycloak_admin = KeycloakAdmin(
    base_url=keycloak_base_url,
    username=admin_username,
    password=admin_password,
)

## General OpenID

To better see what is going on at the HTTP level when doing OpenID Connect requests, we'll add a `requests` hook that prints a bit of request and response info.

In [6]:
from IPython.display import HTML, display

def _show_request_info(r, *args, **kwargs):
    """`requests` response hook: render a short summary of the request as HTML.

    Shows method, URL, body, the request headers whose names are not among
    the `requests` default headers, and the response status code.

    :param r: the response object passed to the hook
    """
    req = r.request
    default_headers = requests.utils.default_headers()
    headers = {k: v for k, v in req.headers.items() if k not in default_headers}
    # URL, body and header values are arbitrary text: escape them so characters
    # like '<' or '&' are shown literally instead of being interpreted as markup.
    display(HTML('''<div style="padding: 1ex; border: 1px solid #ddf; background-color: #eef;">
        Did request: <code>{m} {u}</code> with <ul>
        <li>body <code>{b}</code></li>
        <li>headers <code>{h}</code></li>
        <li>&rArr; response {r}</li>
        </ul></div>'''.format(
        m=html.escape(str(req.method)), u=html.escape(str(req.url)),
        b=html.escape(repr(req.body)), h=html.escape(repr(headers)),
        r=r.status_code
    )))

# Session used for the OpenID Connect requests below; every response is passed to the hook.
session = requests.Session()
session.hooks['response'].append(_show_request_info)

And while we're at it, define some additional small utilities.

In [7]:
def jwt_decode(token: str):
    """Poor man's JWT decoding (no signature verification).

    :param token: compact JWT of the form `header.payload.signature`
    :return: tuple `(header, payload)` of the two decoded JSON objects
    :raises ValueError: if the token does not consist of exactly three
        dot-separated parts, or a part is not valid base64/JSON
    """

    def _decode(data: str) -> dict:
        # JWT segments are base64url encoded ('-' and '_' instead of '+' and '/')
        # with the trailing '=' padding stripped: restore only the padding that
        # is actually missing and decode with the URL-safe alphabet.
        padded = data + '=' * (-len(data) % 4)
        # The JSON is UTF-8 encoded, claims may contain non-ASCII text.
        decoded = base64.urlsafe_b64decode(padded).decode('utf-8')
        return json.loads(decoded)

    header, payload, _signature = token.split('.')
    return _decode(header), _decode(payload)

Get provider info from configuration document.

In [8]:
# Fetch the OpenID Connect configuration document of the master realm.
discovery_response = session.get(
    keycloak_base_url + '/realms/master/.well-known/openid-configuration'
)
# Fail right here on an HTTP error (like the other request cells do) instead of
# continuing with an error document and hitting a confusing KeyError later.
discovery_response.raise_for_status()
provider_info = discovery_response.json()
provider_info.keys()

dict_keys(['issuer', 'authorization_endpoint', 'token_endpoint', 'token_introspection_endpoint', 'userinfo_endpoint', 'end_session_endpoint', 'jwks_uri', 'check_session_iframe', 'grant_types_supported', 'response_types_supported', 'subject_types_supported', 'id_token_signing_alg_values_supported', 'id_token_encryption_alg_values_supported', 'id_token_encryption_enc_values_supported', 'userinfo_signing_alg_values_supported', 'request_object_signing_alg_values_supported', 'response_modes_supported', 'registration_endpoint', 'token_endpoint_auth_methods_supported', 'token_endpoint_auth_signing_alg_values_supported', 'claims_supported', 'claim_types_supported', 'claims_parameter_supported', 'scopes_supported', 'request_parameter_supported', 'request_uri_parameter_supported', 'code_challenge_methods_supported', 'tls_client_certificate_bound_access_tokens', 'introspection_endpoint'])

Get the token endpoint URL.

In [9]:
# Token endpoint URL as listed in the provider configuration document fetched above.
token_endpoint = provider_info["token_endpoint"]
token_endpoint

'http://localhost:9090/auth/realms/master/protocol/openid-connect/token'

# Client credentials flow

Create a client in Keycloak with settings that enable the Client Credentials Grant. We'll also need the client's secret.

In [10]:
# Client setting used here to allow the Client Credentials Grant.
cc_client_options = {"serviceAccountsEnabled": True}
cc_client = keycloak_admin.create_client(options=cc_client_options)

# The client has to present its secret in the token request.
cc_client_secret = keycloak_admin.get_client_secret(cc_client)

(cc_client, cc_client_secret)

INFO:keycloak-admin:Creating client with settings {'id': 'myclient-0d6770ed', 'serviceAccountsEnabled': True}
INFO:keycloak-admin:Client secret response: '{"type":"secret","value":"007203eb-9e26-46d1-a4b7-183ee00f22c5"}'


('myclient-0d6770ed', '007203eb-9e26-46d1-a4b7-183ee00f22c5')

Do `client_credentials` token request.

In [11]:
# Token request form data: the client identifies itself with its id and secret.
cc_token_request = {
    "grant_type": "client_credentials",
    "client_id": cc_client,
    "client_secret": cc_client_secret,
}
r = session.post(token_endpoint, data=cc_token_request)
r.raise_for_status()
r.json()

{'access_token': 'eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJXVEdTMXJyN1pTY2RWU05DV1d5M0tiVUJXNFVaUDc2ZFZ1V1l4RTdYSnYwIn0.eyJqdGkiOiJkY2I5ODE1Zi1kNTU0LTQ1ZjMtODE5MS00YTIwNTBkNGI2MjEiLCJleHAiOjE1NzE2NTcyMjQsIm5iZiI6MCwiaWF0IjoxNTcxNjU3MTY0LCJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjkwOTAvYXV0aC9yZWFsbXMvbWFzdGVyIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6IjJhNjY4ZDQ4LTAzNzctNDdiOS05MzBjLWQ4MmE1YWU4YjYyZCIsInR5cCI6IkJlYXJlciIsImF6cCI6Im15Y2xpZW50LTBkNjc3MGVkIiwiYXV0aF90aW1lIjowLCJzZXNzaW9uX3N0YXRlIjoiMjc3Y2JjMTgtMmYzZS00ZDg1LTkwZmItM2VjOWE5MmNjZDlmIiwiYWNyIjoiMSIsInJlYWxtX2FjY2VzcyI6eyJyb2xlcyI6WyJvZmZsaW5lX2FjY2VzcyIsInVtYV9hdXRob3JpemF0aW9uIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsiYWNjb3VudCI6eyJyb2xlcyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtzIiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJlbWFpbCBwcm9maWxlIiwiY2xpZW50SWQiOiJteWNsaWVudC0wZDY3NzBlZCIsImNsaWVudEhvc3QiOiIxNzIuMTcuMC4xIiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJzZXJ2aWNlLWFjY291bnQtbXljbGllbnQtMGQ2NzcwZWQiLCJjbGllbnRBZGRyZX

## JWT inspection

Extract access token and inspect it (assuming it is a JWT token).

In [12]:
# Take the access token from the token response of the previous cell
# and show its decoded header and payload.
access_token = r.json()["access_token"]
jwt_decode(access_token)

({'alg': 'RS256',
  'typ': 'JWT',
  'kid': 'WTGS1rr7ZScdVSNCWWy3KbUBW4UZP76dVuWYxE7XJv0'},
 {'jti': 'dcb9815f-d554-45f3-8191-4a2050d4b621',
  'exp': 1571657224,
  'nbf': 0,
  'iat': 1571657164,
  'iss': 'http://localhost:9090/auth/realms/master',
  'aud': 'account',
  'sub': '2a668d48-0377-47b9-930c-d82a5ae8b62d',
  'typ': 'Bearer',
  'azp': 'myclient-0d6770ed',
  'auth_time': 0,
  'session_state': '277cbc18-2f3e-4d85-90fb-3ec9a92ccd9f',
  'acr': '1',
  'realm_access': {'roles': ['offline_access', 'uma_authorization']},
  'resource_access': {'account': {'roles': ['manage-account',
     'manage-account-links',
     'view-profile']}},
  'scope': 'email profile',
  'clientId': 'myclient-0d6770ed',
  'clientHost': '172.17.0.1',
  'email_verified': False,
  'preferred_username': 'service-account-myclient-0d6770ed',
  'clientAddress': '172.17.0.1',
  'email': 'service-account-myclient-0d6770ed@placeholder.org'})

## Query `userinfo`

Check the access token against the `userinfo` endpoint.

In [13]:
# Send the access token as bearer token in the Authorization header.
userinfo_headers = {"Authorization": "Bearer {t}".format(t=access_token)}
r = session.get(provider_info["userinfo_endpoint"], headers=userinfo_headers)
r.raise_for_status()
r.json()

{'sub': '2a668d48-0377-47b9-930c-d82a5ae8b62d',
 'email_verified': False,
 'preferred_username': 'service-account-myclient-0d6770ed',
 'email': 'service-account-myclient-0d6770ed@placeholder.org'}

# Resource Owner Password flow

Create a client that allows resource owner password flow.

In [14]:
# Client settings used for the resource owner password flow:
# a public client with direct access grants enabled.
pwd_client_options = {
    "publicClient": True,
    "directAccessGrantsEnabled": True,
}
pwd_client = keycloak_admin.create_client(pwd_client_options)
pwd_client

INFO:keycloak-admin:Creating client with settings {'id': 'myclient-ab474185', 'publicClient': True, 'directAccessGrantsEnabled': True}


'myclient-ab474185'

In [15]:
# Create a test user (randomly suffixed user name, default password) to log in with below.
user, password = keycloak_admin.create_user()

In [16]:
# Token request form data: user credentials plus the id of the client;
# no client secret is sent.
pwd_token_request = {
    "grant_type": "password",
    "username": user,
    "password": password,
    "client_id": pwd_client,
}
r = session.post(token_endpoint, data=pwd_token_request)
r.raise_for_status()
r.json()

{'access_token': 'eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJXVEdTMXJyN1pTY2RWU05DV1d5M0tiVUJXNFVaUDc2ZFZ1V1l4RTdYSnYwIn0.eyJqdGkiOiJkNDE5MTQ4Zi1kMjA4LTQzMTYtOWNlOS02MDdhZTJhMzVjMjYiLCJleHAiOjE1NzE2NTcyMjQsIm5iZiI6MCwiaWF0IjoxNTcxNjU3MTY0LCJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjkwOTAvYXV0aC9yZWFsbXMvbWFzdGVyIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6ImZmNzdkNDhkLTRlYzEtNDJhMC1iYjdmLTAyODFkYzQ2NDBkMiIsInR5cCI6IkJlYXJlciIsImF6cCI6Im15Y2xpZW50LWFiNDc0MTg1IiwiYXV0aF90aW1lIjowLCJzZXNzaW9uX3N0YXRlIjoiMmI5NjVkOWYtYzFkZC00ZWEzLTk2MzEtYzc2NTEyNDZiNzMxIiwiYWNyIjoiMSIsInJlYWxtX2FjY2VzcyI6eyJyb2xlcyI6WyJvZmZsaW5lX2FjY2VzcyIsInVtYV9hdXRob3JpemF0aW9uIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsiYWNjb3VudCI6eyJyb2xlcyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtzIiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJlbWFpbCBwcm9maWxlIiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJqb2huLWJmOTk1MDg2In0.Q8u-hMwdAhtfKHRiL7Y_1dPgErN_-LY0QBGRNxNSYi3Bdb43Y7zd8vY2k8C-t6fToor_lETKSizJ-kL_Ad1JTdmdKo3fl7QGCrZ_Xx4qNSl8tbT5rovyah

## JWT inspection

Extract access token and inspect it (assuming it is a JWT token).

In [17]:
# Take the access token from the token response of the previous cell
# and show its decoded header and payload.
access_token = r.json()["access_token"]
jwt_decode(access_token)

({'alg': 'RS256',
  'typ': 'JWT',
  'kid': 'WTGS1rr7ZScdVSNCWWy3KbUBW4UZP76dVuWYxE7XJv0'},
 {'jti': 'd419148f-d208-4316-9ce9-607ae2a35c26',
  'exp': 1571657224,
  'nbf': 0,
  'iat': 1571657164,
  'iss': 'http://localhost:9090/auth/realms/master',
  'aud': 'account',
  'sub': 'ff77d48d-4ec1-42a0-bb7f-0281dc4640d2',
  'typ': 'Bearer',
  'azp': 'myclient-ab474185',
  'auth_time': 0,
  'session_state': '2b965d9f-c1dd-4ea3-9631-c7651246b731',
  'acr': '1',
  'realm_access': {'roles': ['offline_access', 'uma_authorization']},
  'resource_access': {'account': {'roles': ['manage-account',
     'manage-account-links',
     'view-profile']}},
  'scope': 'email profile',
  'email_verified': False,
  'preferred_username': 'john-bf995086'})

## Query `userinfo`

Check the access token against the `userinfo` endpoint.

In [18]:
# Send the access token as bearer token in the Authorization header.
userinfo_headers = {"Authorization": "Bearer {t}".format(t=access_token)}
r = session.get(provider_info["userinfo_endpoint"], headers=userinfo_headers)
r.raise_for_status()
r.json()

{'sub': 'ff77d48d-4ec1-42a0-bb7f-0281dc4640d2',
 'email_verified': False,
 'preferred_username': 'john-bf995086'}