From 17d04196870ec868b7102935b11aa2a99cee98ec Mon Sep 17 00:00:00 2001 From: Omer Tuchfeld Date: Tue, 2 Sep 2025 11:49:01 +0200 Subject: [PATCH] Regex operator for JWT role extraction When we originally added the JWT role extraction operators, we made some basic ones which thought would be enough, but we found that there's too much complexity and variance in the various fields we find in Red Hat SSO tokens, so we realized we're going to need a regex operator to handle more complex matching scenarios This commit simply adds a new `MATCH` operator to the `JsonPathOperator` enum, and implements the logic to handle it in the `JwtRolesResolver`. `_evaluate_operator` had to be modified to accept the entire `JwtRoleRule` so that it can access the pre-compiled regex pattern for better performance. --- src/authorization/resolvers.py | 27 +++-- src/models/config.py | 28 ++++- tests/unit/authorization/test_resolvers.py | 117 +++++++++++++++++++++ 3 files changed, 160 insertions(+), 12 deletions(-) diff --git a/src/authorization/resolvers.py b/src/authorization/resolvers.py index 0b6fde13..8e330275 100644 --- a/src/authorization/resolvers.py +++ b/src/authorization/resolvers.py @@ -72,10 +72,8 @@ def evaluate_role_rules(rule: JwtRoleRule, jwt_claims: dict[str, Any]) -> UserRo return ( set(rule.roles) if JwtRolesResolver._evaluate_operator( - rule.negate, + rule, [match.value for match in parse(rule.jsonpath).find(jwt_claims)], - rule.operator, - rule.value, ) else set() ) @@ -99,19 +97,26 @@ def _get_claims(auth: AuthTuple) -> dict[str, Any]: @staticmethod def _evaluate_operator( - negate: bool, match: Any, operator: JsonPathOperator, value: Any + rule: JwtRoleRule, match: Any ) -> bool: # pylint: disable=too-many-branches - """Evaluate an operator against a match and value.""" + """Evaluate an operator against a match and rule.""" result = False - match operator: + match rule.operator: case JsonPathOperator.EQUALS: - result = match == value + result = match == rule.value case JsonPathOperator.CONTAINS: - result = value in match + result = rule.value in match case JsonPathOperator.IN: - result = match in value - - if negate: + result = match in rule.value + case JsonPathOperator.MATCH: + # Use the pre-compiled regex pattern for better performance + if rule.compiled_regex is not None: + result = any( + isinstance(item, str) and bool(rule.compiled_regex.search(item)) + for item in match + ) + + if rule.negate: result = not result return result diff --git a/src/models/config.py b/src/models/config.py index 9166eb4b..7e59dbea 100644 --- a/src/models/config.py +++ b/src/models/config.py @@ -1,8 +1,10 @@ """Model with service configuration.""" from pathlib import Path -from typing import Optional, Any +from typing import Optional, Any, Pattern from enum import Enum +from functools import cached_property +import re import jsonpath_ng from jsonpath_ng.exceptions import JSONPathError @@ -233,6 +235,7 @@ class JsonPathOperator(str, Enum): EQUALS = "equals" CONTAINS = "contains" IN = "in" + MATCH = "match" class JwtRoleRule(ConfigurationBase): @@ -272,6 +275,29 @@ def check_roles(self) -> Self: return self + @model_validator(mode="after") + def check_regex_pattern(self) -> Self: + """Verify that regex patterns are valid for MATCH operator.""" + if self.operator == JsonPathOperator.MATCH: + if not isinstance(self.value, str): + raise ValueError( + f"MATCH operator requires a string pattern, {type(self.value).__name__}" + ) + try: + re.compile(self.value) + except re.error as e: + raise ValueError( + f"Invalid regex pattern for MATCH operator: {self.value}: {e}" + ) from e + return self + + @cached_property + def compiled_regex(self) -> Optional[Pattern[str]]: + """Return compiled regex pattern for MATCH operator, None otherwise.""" + if self.operator == JsonPathOperator.MATCH and isinstance(self.value, str): + return re.compile(self.value) + return None + class Action(str, Enum): """Available actions in the system.""" diff --git a/tests/unit/authorization/test_resolvers.py b/tests/unit/authorization/test_resolvers.py index 790edc4f..f8b9720a 100644 --- a/tests/unit/authorization/test_resolvers.py +++ b/tests/unit/authorization/test_resolvers.py @@ -2,6 +2,9 @@ import json import base64 +import re + +import pytest from authorization.resolvers import JwtRolesResolver, GenericAccessResolver from models.config import JwtRoleRule, AccessRule, JsonPathOperator, Action @@ -75,6 +78,120 @@ async def test_resolve_roles_no_match(self): roles = await jwt_resolver.resolve_roles(auth) assert len(roles) == 0 + async def test_resolve_roles_match_operator_email_domain(self): + """Test role extraction using MATCH operator with email domain regex.""" + role_rules = [ + JwtRoleRule( + jsonpath="$.email", + operator=JsonPathOperator.MATCH, + value=r"@redhat\.com$", + roles=["redhat_employee"], + ) + ] + jwt_resolver = JwtRolesResolver(role_rules) + + jwt_claims = { + "exp": 1754489339, + "iat": 1754488439, + "sub": "f:123:employee@redhat.com", + "email": "employee@redhat.com", + } + + auth = ("user", "token", claims_to_token(jwt_claims)) + roles = await jwt_resolver.resolve_roles(auth) + assert "redhat_employee" in roles + + async def test_resolve_roles_match_operator_no_match(self): + """Test role extraction using MATCH operator with no match.""" + role_rules = [ + JwtRoleRule( + jsonpath="$.email", + operator=JsonPathOperator.MATCH, + value=r"@redhat\.com$", + roles=["redhat_employee"], + ) + ] + jwt_resolver = JwtRolesResolver(role_rules) + + jwt_claims = { + "exp": 1754489339, + "iat": 1754488439, + "sub": "f:123:user@example.com", + "email": "user@example.com", + } + + auth = ("user", "token", claims_to_token(jwt_claims)) + roles = await jwt_resolver.resolve_roles(auth) + assert len(roles) == 0 + + async def test_resolve_roles_match_operator_invalid_regex(self): + """Test that invalid regex patterns are rejected at rule creation time.""" + with pytest.raises( + ValueError, match="Invalid regex pattern for MATCH operator" + ): + JwtRoleRule( + jsonpath="$.email", + operator=JsonPathOperator.MATCH, + value="[invalid regex(", # Invalid regex pattern + roles=["test_role"], + ) + + async def test_resolve_roles_match_operator_non_string_pattern(self): + """Test that non-string regex patterns are rejected at rule creation time.""" + with pytest.raises( + ValueError, match="MATCH operator requires a string pattern" + ): + JwtRoleRule( + jsonpath="$.user_id", + operator=JsonPathOperator.MATCH, + value=123, # Non-string pattern + roles=["test_role"], + ) + + async def test_resolve_roles_match_operator_non_string_value(self): + """Test role extraction using MATCH operator with non-string match value.""" + role_rules = [ + JwtRoleRule( + jsonpath="$.user_id", + operator=JsonPathOperator.MATCH, + value=r"\d+", # Number pattern + roles=["numeric_user"], + ) + ] + jwt_resolver = JwtRolesResolver(role_rules) + + jwt_claims = { + "exp": 1754489339, + "iat": 1754488439, + "user_id": 12345, # Non-string value + } + + auth = ("user", "token", claims_to_token(jwt_claims)) + roles = await jwt_resolver.resolve_roles(auth) + assert len(roles) == 0 # Non-string values don't match regex + + async def test_compiled_regex_property(self): + """Test that compiled regex pattern is properly created for MATCH operator.""" + # Test MATCH operator creates compiled regex + match_rule = JwtRoleRule( + jsonpath="$.email", + operator=JsonPathOperator.MATCH, + value=r"@example\.com$", + roles=["example_user"], + ) + assert match_rule.compiled_regex is not None + assert isinstance(match_rule.compiled_regex, re.Pattern) + assert match_rule.compiled_regex.pattern == r"@example\.com$" + + # Test non-MATCH operator returns None + equals_rule = JwtRoleRule( + jsonpath="$.email", + operator=JsonPathOperator.EQUALS, + value="test@example.com", + roles=["example_user"], + ) + assert equals_rule.compiled_regex is None + class TestGenericAccessResolver: """Test cases for GenericAccessResolver."""