Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/auth/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@

from fastapi import Request

from constants import DEFAULT_USER_NAME, DEFAULT_USER_UID, NO_USER_TOKEN

UserID = str
UserName = str
Token = str

AuthTuple = tuple[UserID, UserName, Token]

NO_AUTH_TUPLE: AuthTuple = (DEFAULT_USER_UID, DEFAULT_USER_NAME, NO_USER_TOKEN)


class AuthInterface(ABC): # pylint: disable=too-few-public-methods
"""Base class for all authentication method implementations."""
Expand Down
8 changes: 5 additions & 3 deletions src/auth/jwk_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from constants import (
DEFAULT_VIRTUAL_PATH,
)
from auth.interface import AuthInterface
from auth.interface import NO_AUTH_TUPLE, AuthInterface, AuthTuple
from auth.utils import extract_user_token
from models.config import JwkConfiguration

Expand Down Expand Up @@ -120,10 +120,12 @@ def __init__(
self.virtual_path: str = virtual_path
self.config: JwkConfiguration = config

async def __call__(self, request: Request) -> tuple[str, str, str]:
async def __call__(self, request: Request) -> AuthTuple:
"""Authenticate the JWT in the headers against the keys from the JWK url."""
user_token = extract_user_token(request.headers)
if not request.headers.get("Authorization"):
return NO_AUTH_TUPLE

user_token = extract_user_token(request.headers)
jwk_set = await get_jwk_set(str(self.config.url))

try:
Expand Down
5 changes: 5 additions & 0 deletions src/authorization/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from auth.interface import AuthTuple
from models.config import JwtRoleRule, AccessRule, JsonPathOperator, Action
import constants

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -71,6 +72,10 @@ def evaluate_role_rules(rule: JwtRoleRule, jwt_claims: dict[str, Any]) -> UserRo
def _get_claims(auth: AuthTuple) -> dict[str, Any]:
"""Get the JWT claims from the auth tuple."""
_, _, token = auth
if token == constants.NO_USER_TOKEN:
# No claims for guests
return {}

jwt_claims = json.loads(token)

if not jwt_claims:
Expand Down
9 changes: 5 additions & 4 deletions tests/unit/auth/test_jwk_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from authlib.jose import JsonWebKey, JsonWebToken

from auth.jwk_token import JwkTokenAuthDependency, _jwk_cache
from constants import DEFAULT_USER_NAME, DEFAULT_USER_UID, NO_USER_TOKEN
from models.config import JwkConfiguration, JwtConfiguration

TEST_USER_ID = "test-user-123"
Expand Down Expand Up @@ -267,11 +268,11 @@ async def test_no_auth_header(

dependency = JwkTokenAuthDependency(default_jwk_configuration)

with pytest.raises(HTTPException) as exc_info:
await dependency(no_token_request)
user_id, username, token_claims = await dependency(no_token_request)

assert exc_info.value.status_code == 400
assert exc_info.value.detail == "No Authorization header found"
assert user_id == DEFAULT_USER_UID
assert username == DEFAULT_USER_NAME
assert token_claims == NO_USER_TOKEN


async def test_no_bearer(
Expand Down