diff --git a/src/authorization/resolvers.py b/src/authorization/resolvers.py index 0b6fde13..8e330275 100644 --- a/src/authorization/resolvers.py +++ b/src/authorization/resolvers.py @@ -72,10 +72,8 @@ def evaluate_role_rules(rule: JwtRoleRule, jwt_claims: dict[str, Any]) -> UserRo return ( set(rule.roles) if JwtRolesResolver._evaluate_operator( - rule.negate, + rule, [match.value for match in parse(rule.jsonpath).find(jwt_claims)], - rule.operator, - rule.value, ) else set() ) @@ -99,19 +97,26 @@ def _get_claims(auth: AuthTuple) -> dict[str, Any]: @staticmethod def _evaluate_operator( - negate: bool, match: Any, operator: JsonPathOperator, value: Any + rule: JwtRoleRule, match: Any ) -> bool: # pylint: disable=too-many-branches - """Evaluate an operator against a match and value.""" + """Evaluate an operator against a match and rule.""" result = False - match operator: + match rule.operator: case JsonPathOperator.EQUALS: - result = match == value + result = match == rule.value case JsonPathOperator.CONTAINS: - result = value in match + result = rule.value in match case JsonPathOperator.IN: - result = match in value - - if negate: + result = match in rule.value + case JsonPathOperator.MATCH: + # Use the pre-compiled regex pattern for better performance + if rule.compiled_regex is not None: + result = any( + isinstance(item, str) and bool(rule.compiled_regex.search(item)) + for item in match + ) + + if rule.negate: result = not result return result diff --git a/src/models/config.py b/src/models/config.py index 9166eb4b..7e59dbea 100644 --- a/src/models/config.py +++ b/src/models/config.py @@ -1,8 +1,10 @@ """Model with service configuration.""" from pathlib import Path -from typing import Optional, Any +from typing import Optional, Any, Pattern from enum import Enum +from functools import cached_property +import re import jsonpath_ng from jsonpath_ng.exceptions import JSONPathError @@ -233,6 +235,7 @@ class JsonPathOperator(str, Enum): EQUALS = "equals" CONTAINS = "contains" IN = "in" + MATCH = "match" class JwtRoleRule(ConfigurationBase): @@ -272,6 +275,29 @@ def check_roles(self) -> Self: return self + @model_validator(mode="after") + def check_regex_pattern(self) -> Self: + """Verify that regex patterns are valid for MATCH operator.""" + if self.operator == JsonPathOperator.MATCH: + if not isinstance(self.value, str): + raise ValueError( + f"MATCH operator requires a string pattern, {type(self.value).__name__}" + ) + try: + re.compile(self.value) + except re.error as e: + raise ValueError( + f"Invalid regex pattern for MATCH operator: {self.value}: {e}" + ) from e + return self + + @cached_property + def compiled_regex(self) -> Optional[Pattern[str]]: + """Return compiled regex pattern for MATCH operator, None otherwise.""" + if self.operator == JsonPathOperator.MATCH and isinstance(self.value, str): + return re.compile(self.value) + return None + class Action(str, Enum): """Available actions in the system.""" diff --git a/tests/unit/authorization/test_resolvers.py b/tests/unit/authorization/test_resolvers.py index 790edc4f..f8b9720a 100644 --- a/tests/unit/authorization/test_resolvers.py +++ b/tests/unit/authorization/test_resolvers.py @@ -2,6 +2,9 @@ import json import base64 +import re + +import pytest from authorization.resolvers import JwtRolesResolver, GenericAccessResolver from models.config import JwtRoleRule, AccessRule, JsonPathOperator, Action @@ -75,6 +78,120 @@ async def test_resolve_roles_no_match(self): roles = await jwt_resolver.resolve_roles(auth) assert len(roles) == 0 + async def test_resolve_roles_match_operator_email_domain(self): + """Test role extraction using MATCH operator with email domain regex.""" + role_rules = [ + JwtRoleRule( + jsonpath="$.email", + operator=JsonPathOperator.MATCH, + value=r"@redhat\.com$", + roles=["redhat_employee"], + ) + ] + jwt_resolver = JwtRolesResolver(role_rules) + + jwt_claims = { + "exp": 1754489339, + "iat": 1754488439, + "sub": "f:123:employee@redhat.com", + "email": "employee@redhat.com", + } + + auth = ("user", "token", claims_to_token(jwt_claims)) + roles = await jwt_resolver.resolve_roles(auth) + assert "redhat_employee" in roles + + async def test_resolve_roles_match_operator_no_match(self): + """Test role extraction using MATCH operator with no match.""" + role_rules = [ + JwtRoleRule( + jsonpath="$.email", + operator=JsonPathOperator.MATCH, + value=r"@redhat\.com$", + roles=["redhat_employee"], + ) + ] + jwt_resolver = JwtRolesResolver(role_rules) + + jwt_claims = { + "exp": 1754489339, + "iat": 1754488439, + "sub": "f:123:user@example.com", + "email": "user@example.com", + } + + auth = ("user", "token", claims_to_token(jwt_claims)) + roles = await jwt_resolver.resolve_roles(auth) + assert len(roles) == 0 + + async def test_resolve_roles_match_operator_invalid_regex(self): + """Test that invalid regex patterns are rejected at rule creation time.""" + with pytest.raises( + ValueError, match="Invalid regex pattern for MATCH operator" + ): + JwtRoleRule( + jsonpath="$.email", + operator=JsonPathOperator.MATCH, + value="[invalid regex(", # Invalid regex pattern + roles=["test_role"], + ) + + async def test_resolve_roles_match_operator_non_string_pattern(self): + """Test that non-string regex patterns are rejected at rule creation time.""" + with pytest.raises( + ValueError, match="MATCH operator requires a string pattern" + ): + JwtRoleRule( + jsonpath="$.user_id", + operator=JsonPathOperator.MATCH, + value=123, # Non-string pattern + roles=["test_role"], + ) + + async def test_resolve_roles_match_operator_non_string_value(self): + """Test role extraction using MATCH operator with non-string match value.""" + role_rules = [ + JwtRoleRule( + jsonpath="$.user_id", + operator=JsonPathOperator.MATCH, + value=r"\d+", # Number pattern + roles=["numeric_user"], + ) + ] + jwt_resolver = JwtRolesResolver(role_rules) + + jwt_claims = { + "exp": 1754489339, + "iat": 1754488439, + "user_id": 12345, # Non-string value + } + + auth = ("user", "token", claims_to_token(jwt_claims)) + roles = await jwt_resolver.resolve_roles(auth) + assert len(roles) == 0 # Non-string values don't match regex + + async def test_compiled_regex_property(self): + """Test that compiled regex pattern is properly created for MATCH operator.""" + # Test MATCH operator creates compiled regex + match_rule = JwtRoleRule( + jsonpath="$.email", + operator=JsonPathOperator.MATCH, + value=r"@example\.com$", + roles=["example_user"], + ) + assert match_rule.compiled_regex is not None + assert isinstance(match_rule.compiled_regex, re.Pattern) + assert match_rule.compiled_regex.pattern == r"@example\.com$" + + # Test non-MATCH operator returns None + equals_rule = JwtRoleRule( + jsonpath="$.email", + operator=JsonPathOperator.EQUALS, + value="test@example.com", + roles=["example_user"], + ) + assert equals_rule.compiled_regex is None + class TestGenericAccessResolver: """Test cases for GenericAccessResolver."""