Skip to content

Commit

Permalink
refactor: refactored decode_token
Browse files Browse the repository at this point in the history
complete refactor of the decode_token method in oid

BREAKING CHANGE: changes signatures significantly
  • Loading branch information
ryshoooo committed Apr 27, 2024
1 parent f0c731b commit 4dc5b0a
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 44 deletions.
6 changes: 3 additions & 3 deletions docs/source/modules/openid_client.rst
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ Decode token

.. code-block:: python
KEYCLOAK_PUBLIC_KEY = "-----BEGIN PUBLIC KEY-----\n" + keycloak_openid.public_key() + "\n-----END PUBLIC KEY-----"
options = {"verify_signature": True, "verify_aud": True, "verify_exp": True}
token_info = keycloak_openid.decode_token(token['access_token'], key=KEYCLOAK_PUBLIC_KEY, options=options)
token_info = keycloak_openid.decode_token(token['access_token'])
# Without validation
token_info = keycloak_openid.decode_token(token['access_token'], validate=False)
Get UMA-permissions by token
Expand Down
39 changes: 22 additions & 17 deletions src/keycloak/keycloak_openid.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def _token_info(self, token, method_token_info, **kwargs):
:type token: str
:param method_token_info: Token info method to use
:type method_token_info: str
:param kwargs: Additional keyword arguments
:param kwargs: Additional keyword arguments passed to the decode_token method
:type kwargs: dict
:returns: Token info
:rtype: dict
Expand Down Expand Up @@ -516,7 +516,7 @@ def introspect(self, token, rpt=None, token_type_hint=None):
data_raw = self.connection.raw_post(URL_INTROSPECT.format(**params_path), data=payload)
return raise_error_from_response(data_raw, KeycloakPostError)

def decode_token(self, token, key, algorithms=["RS256"], **kwargs):
def decode_token(self, token, validate: bool = True, **kwargs):
"""Decode user token.
A JSON Web Key (JWK) is a JavaScript Object Notation (JSON) data
Expand All @@ -530,25 +530,30 @@ def decode_token(self, token, key, algorithms=["RS256"], **kwargs):
:param token: Keycloak token
:type token: str
:param key: Decode key
:type key: str
:param algorithms: Algorithms to use for decoding
:type algorithms: list[str]
:param kwargs: Keyword arguments
:param validate: Determines whether the token should be validated with the public key.
Defaults to True.
:type validate: bool
:param kwargs: Additional keyword arguments for jwcrypto's JWT object
:type kwargs: dict
:returns: Decoded token
:rtype: dict
"""
# To keep the same API, we map the python-jose options to our claims for jwcrypto
# Per the jwcrypto dev, `exp` and `nbf` are always checked
options = kwargs.get("options", {})
check_claims = {}
if options.get("verify_aud") is True:
check_claims["aud"] = self.client_id

k = jwk.JWK.from_pem(key.encode("utf-8"))
full_jwt = jwt.JWT(jwt=token, key=k, algs=algorithms, check_claims=check_claims)
return jwt.json_decode(full_jwt.claims)
if validate:
if "key" not in kwargs:
key = (
"-----BEGIN PUBLIC KEY-----\n"
+ self.public_key()
+ "\n-----END PUBLIC KEY-----"
)
key = jwk.JWK.from_pem(key.encode("utf-8"))
kwargs["key"] = key

full_jwt = jwt.JWT(jwt=token, **kwargs)
return jwt.json_decode(full_jwt.claims)
else:
full_jwt = jwt.JWT(jwt=token, **kwargs)
full_jwt.token.objects["valid"] = True
return json.loads(full_jwt.token.payload.decode("utf-8"))

def load_authorization_config(self, path):
"""Load Keycloak settings (authorization).
Expand Down
36 changes: 12 additions & 24 deletions tests/test_keycloak_openid.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,15 +307,13 @@ def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]):
"""
oid, username, password = oid_with_credentials
token = oid.token(username=username, password=password)
decoded_access_token = oid.decode_token(token=token["access_token"])
decoded_access_token_2 = oid.decode_token(token=token["access_token"], validate=False)
decoded_refresh_token = oid.decode_token(token=token["refresh_token"], validate=False)

assert (
oid.decode_token(
token=token["access_token"],
key="-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----",
options={"verify_aud": False},
)["preferred_username"]
== username
)
assert decoded_access_token == decoded_access_token_2
assert decoded_access_token["preferred_username"] == username, decoded_access_token
assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token


def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]):
Expand Down Expand Up @@ -354,20 +352,17 @@ def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str
oid.load_authorization_config(path="tests/data/authz_settings.json")
assert oid.get_policies(token=token["access_token"]) is None

key = "-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----"
orig_client_id = oid.client_id
oid.client_id = "account"
assert oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) == []
assert oid.get_policies(token=token["access_token"], method_token_info="decode") == []
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
policy.add_role(role="account/view-profile")
oid.authorization.policies["test"] = policy
assert [
str(x)
for x in oid.get_policies(token=token["access_token"], method_token_info="decode", key=key)
str(x) for x in oid.get_policies(token=token["access_token"], method_token_info="decode")
] == ["Policy: test (role)"]
assert [
repr(x)
for x in oid.get_policies(token=token["access_token"], method_token_info="decode", key=key)
repr(x) for x in oid.get_policies(token=token["access_token"], method_token_info="decode")
] == ["<Policy: test (role)>"]
oid.client_id = orig_client_id

Expand All @@ -392,12 +387,9 @@ def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str,
oid.load_authorization_config(path="tests/data/authz_settings.json")
assert oid.get_permissions(token=token["access_token"]) is None

key = "-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----"
orig_client_id = oid.client_id
oid.client_id = "account"
assert (
oid.get_permissions(token=token["access_token"], method_token_info="decode", key=key) == []
)
assert oid.get_permissions(token=token["access_token"], method_token_info="decode") == []
policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS")
policy.add_role(role="account/view-profile")
policy.add_permission(
Expand All @@ -408,15 +400,11 @@ def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str,
oid.authorization.policies["test"] = policy
assert [
str(x)
for x in oid.get_permissions(
token=token["access_token"], method_token_info="decode", key=key
)
for x in oid.get_permissions(token=token["access_token"], method_token_info="decode")
] == ["Permission: test-perm (resource)"]
assert [
repr(x)
for x in oid.get_permissions(
token=token["access_token"], method_token_info="decode", key=key
)
for x in oid.get_permissions(token=token["access_token"], method_token_info="decode")
] == ["<Permission: test-perm (resource)>"]
oid.client_id = orig_client_id

Expand Down

0 comments on commit 4dc5b0a

Please sign in to comment.