diff --git a/docs/source/modules/openid_client.rst b/docs/source/modules/openid_client.rst index 265bf55..fc61697 100644 --- a/docs/source/modules/openid_client.rst +++ b/docs/source/modules/openid_client.rst @@ -116,9 +116,9 @@ Decode token .. code-block:: python - KEYCLOAK_PUBLIC_KEY = "-----BEGIN PUBLIC KEY-----\n" + keycloak_openid.public_key() + "\n-----END PUBLIC KEY-----" - options = {"verify_signature": True, "verify_aud": True, "verify_exp": True} - token_info = keycloak_openid.decode_token(token['access_token'], key=KEYCLOAK_PUBLIC_KEY, options=options) + token_info = keycloak_openid.decode_token(token['access_token']) + # Without validation + token_info = keycloak_openid.decode_token(token['access_token'], validate=False) Get UMA-permissions by token diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index 8fdbfd9..0c14709 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -212,7 +212,7 @@ def _token_info(self, token, method_token_info, **kwargs): :type token: str :param method_token_info: Token info method to use :type method_token_info: str - :param kwargs: Additional keyword arguments + :param kwargs: Additional keyword arguments passed to the decode_token method :type kwargs: dict :returns: Token info :rtype: dict @@ -516,7 +516,7 @@ def introspect(self, token, rpt=None, token_type_hint=None): data_raw = self.connection.raw_post(URL_INTROSPECT.format(**params_path), data=payload) return raise_error_from_response(data_raw, KeycloakPostError) - def decode_token(self, token, key, algorithms=["RS256"], **kwargs): + def decode_token(self, token, validate: bool = True, **kwargs): """Decode user token. A JSON Web Key (JWK) is a JavaScript Object Notation (JSON) data @@ -530,25 +530,30 @@ def decode_token(self, token, key, algorithms=["RS256"], **kwargs): :param token: Keycloak token :type token: str - :param key: Decode key - :type key: str - :param algorithms: Algorithms to use for decoding - :type algorithms: list[str] - :param kwargs: Keyword arguments + :param validate: Determines whether the token should be validated with the public key. + Defaults to True. + :type validate: bool + :param kwargs: Additional keyword arguments for jwcrypto's JWT object :type kwargs: dict :returns: Decoded token :rtype: dict """ - # To keep the same API, we map the python-jose options to our claims for jwcrypto - # Per the jwcrypto dev, `exp` and `nbf` are always checked - options = kwargs.get("options", {}) - check_claims = {} - if options.get("verify_aud") is True: - check_claims["aud"] = self.client_id - - k = jwk.JWK.from_pem(key.encode("utf-8")) - full_jwt = jwt.JWT(jwt=token, key=k, algs=algorithms, check_claims=check_claims) - return jwt.json_decode(full_jwt.claims) + if validate: + if "key" not in kwargs: + key = ( + "-----BEGIN PUBLIC KEY-----\n" + + self.public_key() + + "\n-----END PUBLIC KEY-----" + ) + key = jwk.JWK.from_pem(key.encode("utf-8")) + kwargs["key"] = key + + full_jwt = jwt.JWT(jwt=token, **kwargs) + return jwt.json_decode(full_jwt.claims) + else: + full_jwt = jwt.JWT(jwt=token, **kwargs) + full_jwt.token.objects["valid"] = True + return json.loads(full_jwt.token.payload.decode("utf-8")) def load_authorization_config(self, path): """Load Keycloak settings (authorization). diff --git a/tests/test_keycloak_openid.py b/tests/test_keycloak_openid.py index a41f454..dd3067a 100644 --- a/tests/test_keycloak_openid.py +++ b/tests/test_keycloak_openid.py @@ -307,15 +307,13 @@ def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): """ oid, username, password = oid_with_credentials token = oid.token(username=username, password=password) + decoded_access_token = oid.decode_token(token=token["access_token"]) + decoded_access_token_2 = oid.decode_token(token=token["access_token"], validate=False) + decoded_refresh_token = oid.decode_token(token=token["refresh_token"], validate=False) - assert ( - oid.decode_token( - token=token["access_token"], - key="-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----", - options={"verify_aud": False}, - )["preferred_username"] - == username - ) + assert decoded_access_token == decoded_access_token_2 + assert decoded_access_token["preferred_username"] == username, decoded_access_token + assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): @@ -354,20 +352,17 @@ def test_get_policies(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str oid.load_authorization_config(path="tests/data/authz_settings.json") assert oid.get_policies(token=token["access_token"]) is None - key = "-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----" orig_client_id = oid.client_id oid.client_id = "account" - assert oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) == [] + assert oid.get_policies(token=token["access_token"], method_token_info="decode") == [] policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS") policy.add_role(role="account/view-profile") oid.authorization.policies["test"] = policy assert [ - str(x) - for x in oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) + str(x) for x in oid.get_policies(token=token["access_token"], method_token_info="decode") ] == ["Policy: test (role)"] assert [ - repr(x) - for x in oid.get_policies(token=token["access_token"], method_token_info="decode", key=key) + repr(x) for x in oid.get_policies(token=token["access_token"], method_token_info="decode") ] == [""] oid.client_id = orig_client_id @@ -392,12 +387,9 @@ def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, oid.load_authorization_config(path="tests/data/authz_settings.json") assert oid.get_permissions(token=token["access_token"]) is None - key = "-----BEGIN PUBLIC KEY-----\n" + oid.public_key() + "\n-----END PUBLIC KEY-----" orig_client_id = oid.client_id oid.client_id = "account" - assert ( - oid.get_permissions(token=token["access_token"], method_token_info="decode", key=key) == [] - ) + assert oid.get_permissions(token=token["access_token"], method_token_info="decode") == [] policy = Policy(name="test", type="role", logic="POSITIVE", decision_strategy="UNANIMOUS") policy.add_role(role="account/view-profile") policy.add_permission( @@ -408,15 +400,11 @@ def test_get_permissions(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, oid.authorization.policies["test"] = policy assert [ str(x) - for x in oid.get_permissions( - token=token["access_token"], method_token_info="decode", key=key - ) + for x in oid.get_permissions(token=token["access_token"], method_token_info="decode") ] == ["Permission: test-perm (resource)"] assert [ repr(x) - for x in oid.get_permissions( - token=token["access_token"], method_token_info="decode", key=key - ) + for x in oid.get_permissions(token=token["access_token"], method_token_info="decode") ] == [""] oid.client_id = orig_client_id