Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions src/authorization/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,8 @@ def evaluate_role_rules(rule: JwtRoleRule, jwt_claims: dict[str, Any]) -> UserRo
return (
set(rule.roles)
if JwtRolesResolver._evaluate_operator(
rule.negate,
rule,
[match.value for match in parse(rule.jsonpath).find(jwt_claims)],
rule.operator,
rule.value,
)
else set()
)
Expand All @@ -99,19 +97,26 @@ def _get_claims(auth: AuthTuple) -> dict[str, Any]:

@staticmethod
def _evaluate_operator(
negate: bool, match: Any, operator: JsonPathOperator, value: Any
rule: JwtRoleRule, match: Any
) -> bool: # pylint: disable=too-many-branches
"""Evaluate an operator against a match and value."""
"""Evaluate an operator against a match and rule."""
result = False
match operator:
match rule.operator:
case JsonPathOperator.EQUALS:
result = match == value
result = match == rule.value
case JsonPathOperator.CONTAINS:
result = value in match
result = rule.value in match
case JsonPathOperator.IN:
result = match in value

if negate:
result = match in rule.value
case JsonPathOperator.MATCH:
# Use the pre-compiled regex pattern for better performance
if rule.compiled_regex is not None:
result = any(
isinstance(item, str) and bool(rule.compiled_regex.search(item))
for item in match
)

if rule.negate:
result = not result

return result
Expand Down
28 changes: 27 additions & 1 deletion src/models/config.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""Model with service configuration."""

from pathlib import Path
from typing import Optional, Any
from typing import Optional, Any, Pattern
from enum import Enum
from functools import cached_property
import re

import jsonpath_ng
from jsonpath_ng.exceptions import JSONPathError
Expand Down Expand Up @@ -233,6 +235,7 @@ class JsonPathOperator(str, Enum):
EQUALS = "equals"
CONTAINS = "contains"
IN = "in"
MATCH = "match"


class JwtRoleRule(ConfigurationBase):
Expand Down Expand Up @@ -272,6 +275,29 @@ def check_roles(self) -> Self:

return self

@model_validator(mode="after")
def check_regex_pattern(self) -> Self:
"""Verify that regex patterns are valid for MATCH operator."""
if self.operator == JsonPathOperator.MATCH:
if not isinstance(self.value, str):
raise ValueError(
f"MATCH operator requires a string pattern, {type(self.value).__name__}"
)
try:
re.compile(self.value)
except re.error as e:
raise ValueError(
f"Invalid regex pattern for MATCH operator: {self.value}: {e}"
) from e
return self

@cached_property
def compiled_regex(self) -> Optional[Pattern[str]]:
"""Return compiled regex pattern for MATCH operator, None otherwise."""
if self.operator == JsonPathOperator.MATCH and isinstance(self.value, str):
return re.compile(self.value)
return None


class Action(str, Enum):
"""Available actions in the system."""
Expand Down
117 changes: 117 additions & 0 deletions tests/unit/authorization/test_resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import json
import base64
import re

import pytest

from authorization.resolvers import JwtRolesResolver, GenericAccessResolver
from models.config import JwtRoleRule, AccessRule, JsonPathOperator, Action
Expand Down Expand Up @@ -75,6 +78,120 @@ async def test_resolve_roles_no_match(self):
roles = await jwt_resolver.resolve_roles(auth)
assert len(roles) == 0

async def test_resolve_roles_match_operator_email_domain(self):
"""Test role extraction using MATCH operator with email domain regex."""
role_rules = [
JwtRoleRule(
jsonpath="$.email",
operator=JsonPathOperator.MATCH,
value=r"@redhat\.com$",
roles=["redhat_employee"],
)
]
jwt_resolver = JwtRolesResolver(role_rules)

jwt_claims = {
"exp": 1754489339,
"iat": 1754488439,
"sub": "f:123:employee@redhat.com",
"email": "employee@redhat.com",
}

auth = ("user", "token", claims_to_token(jwt_claims))
roles = await jwt_resolver.resolve_roles(auth)
assert "redhat_employee" in roles

async def test_resolve_roles_match_operator_no_match(self):
"""Test role extraction using MATCH operator with no match."""
role_rules = [
JwtRoleRule(
jsonpath="$.email",
operator=JsonPathOperator.MATCH,
value=r"@redhat\.com$",
roles=["redhat_employee"],
)
]
jwt_resolver = JwtRolesResolver(role_rules)

jwt_claims = {
"exp": 1754489339,
"iat": 1754488439,
"sub": "f:123:user@example.com",
"email": "user@example.com",
}

auth = ("user", "token", claims_to_token(jwt_claims))
roles = await jwt_resolver.resolve_roles(auth)
assert len(roles) == 0

async def test_resolve_roles_match_operator_invalid_regex(self):
"""Test that invalid regex patterns are rejected at rule creation time."""
with pytest.raises(
ValueError, match="Invalid regex pattern for MATCH operator"
):
JwtRoleRule(
jsonpath="$.email",
operator=JsonPathOperator.MATCH,
value="[invalid regex(", # Invalid regex pattern
roles=["test_role"],
)

async def test_resolve_roles_match_operator_non_string_pattern(self):
"""Test that non-string regex patterns are rejected at rule creation time."""
with pytest.raises(
ValueError, match="MATCH operator requires a string pattern"
):
JwtRoleRule(
jsonpath="$.user_id",
operator=JsonPathOperator.MATCH,
value=123, # Non-string pattern
roles=["test_role"],
)

async def test_resolve_roles_match_operator_non_string_value(self):
"""Test role extraction using MATCH operator with non-string match value."""
role_rules = [
JwtRoleRule(
jsonpath="$.user_id",
operator=JsonPathOperator.MATCH,
value=r"\d+", # Number pattern
roles=["numeric_user"],
)
]
jwt_resolver = JwtRolesResolver(role_rules)

jwt_claims = {
"exp": 1754489339,
"iat": 1754488439,
"user_id": 12345, # Non-string value
}

auth = ("user", "token", claims_to_token(jwt_claims))
roles = await jwt_resolver.resolve_roles(auth)
assert len(roles) == 0 # Non-string values don't match regex

async def test_compiled_regex_property(self):
"""Test that compiled regex pattern is properly created for MATCH operator."""
# Test MATCH operator creates compiled regex
match_rule = JwtRoleRule(
jsonpath="$.email",
operator=JsonPathOperator.MATCH,
value=r"@example\.com$",
roles=["example_user"],
)
assert match_rule.compiled_regex is not None
assert isinstance(match_rule.compiled_regex, re.Pattern)
assert match_rule.compiled_regex.pattern == r"@example\.com$"

# Test non-MATCH operator returns None
equals_rule = JwtRoleRule(
jsonpath="$.email",
operator=JsonPathOperator.EQUALS,
value="test@example.com",
roles=["example_user"],
)
assert equals_rule.compiled_regex is None


class TestGenericAccessResolver:
"""Test cases for GenericAccessResolver."""
Expand Down