diff --git a/src/authorization/resolvers.py b/src/authorization/resolvers.py index 7531b169..13e2e86b 100644 --- a/src/authorization/resolvers.py +++ b/src/authorization/resolvers.py @@ -87,12 +87,6 @@ def _get_claims(auth: AuthTuple) -> dict[str, Any]: return {} jwt_claims = unsafe_get_claims(token) - - if not jwt_claims: - raise RoleResolutionError( - "Invalid authentication token: no JWT claims found" - ) - return jwt_claims @staticmethod diff --git a/tests/unit/authorization/test_middleware.py b/tests/unit/authorization/test_middleware.py new file mode 100644 index 00000000..272ccfdd --- /dev/null +++ b/tests/unit/authorization/test_middleware.py @@ -0,0 +1,269 @@ +"""Unit tests for the authorization middleware.""" + +import pytest +from fastapi import HTTPException, status +from starlette.requests import Request + +from authorization.middleware import ( + get_authorization_resolvers, + _perform_authorization_check, + authorize, +) +from authorization.resolvers import ( + NoopRolesResolver, + NoopAccessResolver, + JwtRolesResolver, + GenericAccessResolver, +) +from models.config import Action, JwtRoleRule, AccessRule, JsonPathOperator +import constants + + +@pytest.fixture(name="dummy_auth_tuple") +def fixture_dummy_auth_tuple(): + """Standard auth tuple for testing.""" + return ("user_id", "username", False, "mock_token") + + +class TestGetAuthorizationResolvers: + """Test cases for the get_authorization_resolvers function.""" + + @pytest.fixture + def mock_configuration(self, mocker): + """Mock configuration object.""" + config = mocker.MagicMock() + config.authorization_configuration.access_rules = [] + config.authentication_configuration.jwk_configuration.jwt_configuration.role_rules = ( + [] + ) + return config + + @pytest.fixture + def sample_access_rule(self): + """Sample access rule for testing.""" + return AccessRule(role="test", actions=[Action.QUERY]) + + @pytest.fixture + def sample_role_rule(self): + """Sample role rule for testing.""" + return JwtRoleRule( + jsonpath="$.test", + operator=JsonPathOperator.EQUALS, + value="test", + roles=["test"], + ) + + @pytest.mark.parametrize( + "auth_module,expected_types", + [ + (constants.AUTH_MOD_NOOP, (NoopRolesResolver, NoopAccessResolver)), + (constants.AUTH_MOD_K8S, (NoopRolesResolver, NoopAccessResolver)), + ( + constants.AUTH_MOD_NOOP_WITH_TOKEN, + (NoopRolesResolver, NoopAccessResolver), + ), + ], + ) + def test_noop_auth_modules( + self, mocker, mock_configuration, auth_module, expected_types + ): + """Test resolver selection for noop-style authentication modules.""" + mock_configuration.authentication_configuration.module = auth_module + mocker.patch("authorization.middleware.configuration", mock_configuration) + + roles_resolver, access_resolver = get_authorization_resolvers() + + assert isinstance(roles_resolver, expected_types[0]) + assert isinstance(access_resolver, expected_types[1]) + + @pytest.mark.parametrize( + "empty_rules", ["role_rules", "access_rules", "both_rules"] + ) + def test_jwk_token_with_empty_rules( + self, + mocker, + mock_configuration, + sample_access_rule, + sample_role_rule, + empty_rules, + ): # pylint: disable=too-many-arguments,too-many-positional-arguments + """Test JWK token auth falls back to noop when rules are missing.""" + get_authorization_resolvers.cache_clear() + + mock_configuration.authentication_configuration.module = ( + constants.AUTH_MOD_JWK_TOKEN + ) + + # Create a real rule for the non-empty case + if empty_rules == "role_rules": + mock_configuration.authorization_configuration.access_rules = [ + sample_access_rule + ] + elif empty_rules == "access_rules": + jwt_config = ( + mock_configuration.authentication_configuration.jwk_configuration.jwt_configuration + ) + jwt_config.role_rules = [sample_role_rule] + elif empty_rules == "both_rules": + # For "both_rules", both lists remain empty (default in fixture) + pass + + mocker.patch("authorization.middleware.configuration", mock_configuration) + + roles_resolver, access_resolver = get_authorization_resolvers() + assert isinstance(roles_resolver, NoopRolesResolver) + assert isinstance(access_resolver, NoopAccessResolver) + + def test_jwk_token_with_rules( + self, mocker, mock_configuration, sample_access_rule, sample_role_rule + ): + """Test JWK token auth with configured rules returns proper resolvers.""" + get_authorization_resolvers.cache_clear() + + mock_configuration.authentication_configuration.module = ( + constants.AUTH_MOD_JWK_TOKEN + ) + mock_configuration.authorization_configuration.access_rules = [ + sample_access_rule + ] + jwt_config = ( + mock_configuration.authentication_configuration.jwk_configuration.jwt_configuration + ) + jwt_config.role_rules = [sample_role_rule] + mocker.patch("authorization.middleware.configuration", mock_configuration) + + roles_resolver, access_resolver = get_authorization_resolvers() + assert isinstance(roles_resolver, JwtRolesResolver) + assert isinstance(access_resolver, GenericAccessResolver) + + def test_unknown_auth_module(self, mocker, mock_configuration): + """Test unknown authentication module raises HTTPException.""" + # Clear the cache to avoid cached results + get_authorization_resolvers.cache_clear() + + mock_configuration.authentication_configuration.module = "unknown" + mocker.patch("authorization.middleware.configuration", mock_configuration) + + with pytest.raises(HTTPException) as exc_info: + get_authorization_resolvers() + + assert exc_info.value.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + + +class TestPerformAuthorizationCheck: + """Test cases for _perform_authorization_check function.""" + + @pytest.fixture + def mock_resolvers(self, mocker): + """Mock role and access resolvers.""" + role_resolver = mocker.AsyncMock() + access_resolver = mocker.MagicMock() + role_resolver.resolve_roles.return_value = {"employee"} + access_resolver.check_access.return_value = True + access_resolver.get_actions.return_value = {Action.QUERY} + return role_resolver, access_resolver + + async def test_missing_auth_kwarg(self): + """Test KeyError when auth dependency is missing.""" + with pytest.raises(HTTPException) as exc_info: + await _perform_authorization_check(Action.QUERY, (), {}) + + assert exc_info.value.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + + async def test_access_denied(self, mocker, dummy_auth_tuple, mock_resolvers): + """Test HTTPException when access is denied.""" + role_resolver, access_resolver = mock_resolvers + access_resolver.check_access.return_value = False # Override to deny access + + mocker.patch( + "authorization.middleware.get_authorization_resolvers", + return_value=(role_resolver, access_resolver), + ) + + with pytest.raises(HTTPException) as exc_info: + await _perform_authorization_check( + Action.ADMIN, (), {"auth": dummy_auth_tuple} + ) + + assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN + assert ( + "Insufficient permissions for action: Action.ADMIN" in exc_info.value.detail + ) + + @pytest.mark.parametrize("request_location", ["kwargs", "args", "none"]) + async def test_request_state_handling( + self, mocker, dummy_auth_tuple, mock_resolvers, request_location + ): + """Test that authorized_actions are set on request state when present.""" + mocker.patch( + "authorization.middleware.get_authorization_resolvers", + return_value=mock_resolvers, + ) + + mock_request = mocker.MagicMock(spec=Request) + mock_request.state = mocker.MagicMock() + + kwargs = {"auth": dummy_auth_tuple} + args = () + + if request_location == "kwargs": + kwargs["request"] = mock_request + elif request_location == "args": + args = (mock_request,) + + await _perform_authorization_check(Action.QUERY, args, kwargs) + + if request_location != "none": + assert mock_request.state.authorized_actions == {Action.QUERY} + + async def test_everyone_role_added(self, mocker, dummy_auth_tuple, mock_resolvers): + """Test that everyone (*) role is always added to user roles.""" + role_resolver, access_resolver = mock_resolvers + mocker.patch( + "authorization.middleware.get_authorization_resolvers", + return_value=(role_resolver, access_resolver), + ) + + await _perform_authorization_check(Action.QUERY, (), {"auth": dummy_auth_tuple}) + + # Verify check_access was called with both user roles and everyone role + access_resolver.check_access.assert_called_once_with( + Action.QUERY, {"employee", "*"} + ) + + +class TestAuthorizeDecorator: + """Test cases for authorize decorator.""" + + async def test_decorator_success(self, mocker, dummy_auth_tuple): + """Test successful authorization through decorator.""" + + @authorize(Action.QUERY) + async def mock_endpoint(**_): + return "success" + + mocker.patch( + "authorization.middleware._perform_authorization_check", return_value=None + ) + + result = await mock_endpoint(auth=dummy_auth_tuple) + assert result == "success" + + async def test_decorator_failure(self, mocker, dummy_auth_tuple): + """Test authorization failure through decorator.""" + + @authorize(Action.ADMIN) + async def mock_endpoint(**_): + return "success" + + mocker.patch( + "authorization.middleware._perform_authorization_check", + side_effect=HTTPException( + status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions" + ), + ) + + with pytest.raises(HTTPException) as exc_info: + await mock_endpoint(auth=dummy_auth_tuple) + + assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN diff --git a/tests/unit/authorization/test_resolvers.py b/tests/unit/authorization/test_resolvers.py index d0dbf36e..138d4ca4 100644 --- a/tests/unit/authorization/test_resolvers.py +++ b/tests/unit/authorization/test_resolvers.py @@ -3,11 +3,13 @@ import json import base64 import re +from contextlib import nullcontext as does_not_raise import pytest from authorization.resolvers import JwtRolesResolver, GenericAccessResolver from models.config import JwtRoleRule, AccessRule, JsonPathOperator, Action +import constants def claims_to_token(claims: dict) -> str: @@ -21,25 +23,38 @@ def claims_to_token(claims: dict) -> str: return f"foo_header.{b64_encoded_claims}.foo_signature" +def claims_to_auth_tuple(claims: dict) -> tuple: + """Convert JWT claims dictionary to an auth tuple.""" + return ("user", "token", False, claims_to_token(claims)) + + class TestJwtRolesResolver: """Test cases for JwtRolesResolver.""" - async def test_resolve_roles_redhat_employee(self): - """Test role extraction for RedHat employee JWT.""" - role_rules = [ - JwtRoleRule( - jsonpath="$.realm_access.roles[*]", - operator=JsonPathOperator.CONTAINS, - value="redhat:employees", - roles=["employee"], - ) - ] - jwt_resolver = JwtRolesResolver(role_rules) + @pytest.fixture + async def employee_role_rule(self): + """Role rule for RedHat employees.""" + return JwtRoleRule( + jsonpath="$.realm_access.roles[*]", + operator=JsonPathOperator.CONTAINS, + value="redhat:employees", + roles=["employee"], + ) - jwt_claims = { + @pytest.fixture + async def employee_resolver(self, employee_role_rule): + """JwtRolesResolver with a rule for RedHat employees.""" + return JwtRolesResolver([employee_role_rule]) + + @pytest.fixture + async def employee_claims(self): + """JWT claims for a RedHat employee.""" + return { + "foo": "bar", "exp": 1754489339, "iat": 1754488439, "sub": "f:123:employee@redhat.com", + "email": "employee@redhat.com", "realm_access": { "roles": [ "uma_authorization", @@ -49,80 +64,122 @@ async def test_resolve_roles_redhat_employee(self): }, } - # Mock auth tuple with JWT claims as third element - auth = ("user", "token", False, claims_to_token(jwt_claims)) - roles = await jwt_resolver.resolve_roles(auth) - assert "employee" in roles - - async def test_resolve_roles_no_match(self): - """Test role extraction when no rules match.""" - role_rules = [ - JwtRoleRule( - jsonpath="$.realm_access.roles[*]", - operator=JsonPathOperator.CONTAINS, - value="redhat:employees", - roles=["employee"], - ) - ] - jwt_resolver = JwtRolesResolver(role_rules) - - jwt_claims = { + @pytest.fixture + async def non_employee_claims(self): + """JWT claims for a non-RedHat employee.""" + return { "exp": 1754489339, "iat": 1754488439, "sub": "f:123:user@example.com", "realm_access": {"roles": ["uma_authorization", "default-roles-example"]}, } - # Mock auth tuple with JWT claims as third element - auth = ("user", "token", False, claims_to_token(jwt_claims)) - roles = await jwt_resolver.resolve_roles(auth) - assert len(roles) == 0 + async def test_resolve_roles_redhat_employee( + self, employee_resolver, employee_claims + ): + """Test role extraction for RedHat employee JWT.""" + assert "employee" in await employee_resolver.resolve_roles( + claims_to_auth_tuple(employee_claims) + ) - async def test_resolve_roles_match_operator_email_domain(self): - """Test role extraction using MATCH operator with email domain regex.""" - role_rules = [ - JwtRoleRule( - jsonpath="$.email", - operator=JsonPathOperator.MATCH, - value=r"@redhat\.com$", - roles=["redhat_employee"], + async def test_resolve_roles_no_match(self, employee_resolver, non_employee_claims): + """Test no roles extracted for non-RedHat employee JWT.""" + assert ( + len( + await employee_resolver.resolve_roles( + claims_to_auth_tuple(non_employee_claims) + ) ) - ] - jwt_resolver = JwtRolesResolver(role_rules) + == 0 + ) - jwt_claims = { - "exp": 1754489339, - "iat": 1754488439, - "sub": "f:123:employee@redhat.com", - "email": "employee@redhat.com", - } + async def test_negate_operator(self, employee_role_rule, non_employee_claims): + """Test role extraction with negated operator.""" + negated_rule = employee_role_rule + negated_rule.negate = True - auth = ("user", "token", False, claims_to_token(jwt_claims)) - roles = await jwt_resolver.resolve_roles(auth) - assert "redhat_employee" in roles + resolver = JwtRolesResolver([negated_rule]) - async def test_resolve_roles_match_operator_no_match(self): - """Test role extraction using MATCH operator with no match.""" - role_rules = [ - JwtRoleRule( - jsonpath="$.email", - operator=JsonPathOperator.MATCH, - value=r"@redhat\.com$", - roles=["redhat_employee"], - ) - ] - jwt_resolver = JwtRolesResolver(role_rules) + assert "employee" in await resolver.resolve_roles( + claims_to_auth_tuple(non_employee_claims) + ) - jwt_claims = { - "exp": 1754489339, - "iat": 1754488439, - "sub": "f:123:user@example.com", - "email": "user@example.com", - } + @pytest.fixture + async def email_rule_resolver(self): + """JwtRolesResolver with a rule for email domain.""" + return JwtRolesResolver( + [ + JwtRoleRule( + jsonpath="$.email", + operator=JsonPathOperator.MATCH, + value=r"@redhat\.com$", + roles=["redhat_employee"], + ) + ] + ) - auth = ("user", "token", False, claims_to_token(jwt_claims)) - roles = await jwt_resolver.resolve_roles(auth) - assert len(roles) == 0 + @pytest.fixture + async def equals_rule_resolver(self): + """JwtRolesResolver with a rule for exact email match.""" + return JwtRolesResolver( + [ + JwtRoleRule( + jsonpath="$.foo", + operator=JsonPathOperator.EQUALS, + value=["bar"], + roles=["foobar"], + ) + ] + ) + + async def test_resolve_roles_equals_operator( + self, equals_rule_resolver, employee_claims + ): + """Test role extraction using EQUALS operator.""" + assert "foobar" in await equals_rule_resolver.resolve_roles( + claims_to_auth_tuple(employee_claims) + ) + + @pytest.fixture + async def in_rule_resolver(self): + """JwtRolesResolver with a rule for IN operator.""" + return JwtRolesResolver( + [ + JwtRoleRule( + jsonpath="$.foo", + operator=JsonPathOperator.IN, + value=[["bar"], ["baz"]], + roles=["in_role"], + ) + ] + ) + + async def test_resolve_roles_in_operator(self, in_rule_resolver, employee_claims): + """Test role extraction using IN operator.""" + assert "in_role" in await in_rule_resolver.resolve_roles( + claims_to_auth_tuple(employee_claims) + ) + + async def test_resolve_roles_match_operator_email_domain( + self, email_rule_resolver, employee_claims + ): + """Test role extraction using MATCH operator with email domain regex.""" + assert "redhat_employee" in await email_rule_resolver.resolve_roles( + claims_to_auth_tuple(employee_claims) + ) + + async def test_resolve_roles_match_operator_no_match( + self, email_rule_resolver, non_employee_claims + ): + """Test role extraction using MATCH operator with no match.""" + assert ( + len( + await email_rule_resolver.resolve_roles( + claims_to_auth_tuple(non_employee_claims) + ) + ) + == 0 + ) async def test_resolve_roles_match_operator_invalid_regex(self): """Test that invalid regex patterns are rejected at rule creation time.""" @@ -192,10 +249,37 @@ async def test_compiled_regex_property(self): ) assert equals_rule.compiled_regex is None + async def test_resolve_roles_with_no_user_token(self, employee_resolver): + """Test NO_USER_TOKEN returns empty claims.""" + guest_tuple = ( + "user", + "username", + False, + constants.NO_USER_TOKEN, + ) + + with does_not_raise(): + # We don't truly care about the absence of roles, + # just that no exception is raised + assert len(await employee_resolver.resolve_roles(guest_tuple)) == 0 + class TestGenericAccessResolver: """Test cases for GenericAccessResolver.""" + @pytest.fixture + def admin_access_rules(self): + """Access rules with admin role for testing.""" + return [AccessRule(role="superuser", actions=[Action.ADMIN])] + + @pytest.fixture + def multi_role_access_rules(self): + """Access rules with multiple roles for testing.""" + return [ + AccessRule(role="user", actions=[Action.QUERY, Action.GET_MODELS]), + AccessRule(role="moderator", actions=[Action.FEEDBACK]), + ] + async def test_check_access_with_valid_role(self): """Test access check with valid role.""" access_rules = [ @@ -230,3 +314,29 @@ async def test_check_access_with_no_roles(self): has_access = resolver.check_access(Action.QUERY, set()) assert has_access is False + + def test_admin_action_with_other_actions_raises_error(self): + """Test admin action with others raises ValueError.""" + with pytest.raises(ValueError): + GenericAccessResolver( + [AccessRule(role="superuser", actions=[Action.ADMIN, Action.QUERY])] + ) + + def test_admin_role_allows_all_actions(self, admin_access_rules): + """Test admin action allows all actions via recursive check.""" + resolver = GenericAccessResolver(admin_access_rules) + assert resolver.check_access(Action.QUERY, {"superuser"}) is True + + def test_admin_get_actions_excludes_admin_action(self, admin_access_rules): + """Test get actions on a role with admin returns everything except ADMIN.""" + resolver = GenericAccessResolver(admin_access_rules) + actions = resolver.get_actions({"superuser"}) + assert Action.ADMIN not in actions + assert Action.QUERY in actions + assert len(actions) == len(set(Action)) - 1 + + def test_get_actions_for_regular_users(self, multi_role_access_rules): + """Test non-admin user gets only their specific actions.""" + resolver = GenericAccessResolver(multi_role_access_rules) + actions = resolver.get_actions({"user", "moderator"}) + assert actions == {Action.QUERY, Action.GET_MODELS, Action.FEEDBACK}