diff --git a/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py b/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py index 39c69501d..516203435 100644 --- a/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py +++ b/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py @@ -177,7 +177,6 @@ async def callback_handler() -> tuple[str, str | None]: "redirect_uris": ["http://localhost:3030/callback"], "grant_types": ["authorization_code", "refresh_token"], "response_types": ["code"], - "token_endpoint_auth_method": "client_secret_post", } async def _default_redirect_handler(authorization_url: str) -> None: diff --git a/src/mcp/client/auth.py b/src/mcp/client/auth.py index 376036e8c..e24e85d08 100644 --- a/src/mcp/client/auth.py +++ b/src/mcp/client/auth.py @@ -14,7 +14,7 @@ from collections.abc import AsyncGenerator, Awaitable, Callable from dataclasses import dataclass, field from typing import Protocol -from urllib.parse import urlencode, urljoin, urlparse +from urllib.parse import quote, urlencode, urljoin, urlparse import anyio import httpx @@ -175,6 +175,42 @@ def should_include_resource_param(self, protocol_version: str | None = None) -> # Version format is YYYY-MM-DD, so string comparison works return protocol_version >= "2025-06-18" + def prepare_token_auth( + self, data: dict[str, str], headers: dict[str, str] | None = None + ) -> tuple[dict[str, str], dict[str, str]]: + """Prepare authentication for token requests. + + Args: + data: The form data to send + headers: Optional headers dict to update + + Returns: + Tuple of (updated_data, updated_headers) + """ + if headers is None: + headers = {} + + if not self.client_info: + return data, headers + + auth_method = self.client_info.token_endpoint_auth_method + + if auth_method == "client_secret_basic" and self.client_info.client_secret: + # URL-encode client ID and secret per RFC 6749 Section 2.3.1 + encoded_id = quote(self.client_info.client_id, safe="") + encoded_secret = quote(self.client_info.client_secret, safe="") + credentials = f"{encoded_id}:{encoded_secret}" + encoded_credentials = base64.b64encode(credentials.encode()).decode() + headers["Authorization"] = f"Basic {encoded_credentials}" + # Don't include client_secret in body for basic auth + data = {k: v for k, v in data.items() if k != "client_secret"} + elif auth_method == "client_secret_post" and self.client_info.client_secret: + # Include client_secret in request body + data["client_secret"] = self.client_info.client_secret + # For auth_method == "none", don't add any client_secret + + return data, headers + class OAuthClientProvider(httpx.Auth): """ @@ -291,6 +327,27 @@ async def _register_client(self) -> httpx.Request | None: registration_data = self.context.client_metadata.model_dump(by_alias=True, mode="json", exclude_none=True) + # If token_endpoint_auth_method is None, auto-select based on server support + if self.context.client_metadata.token_endpoint_auth_method is None: + preference_order = ["client_secret_basic", "client_secret_post", "none"] + + if self.context.oauth_metadata and self.context.oauth_metadata.token_endpoint_auth_methods_supported: + supported = self.context.oauth_metadata.token_endpoint_auth_methods_supported + for method in preference_order: + if method in supported: + registration_data["token_endpoint_auth_method"] = method + break + else: + # No compatible methods between client and server + raise OAuthRegistrationError( + f"No compatible authentication methods. " + f"Server supports: {supported}, " + f"Client supports: {preference_order}" + ) + else: + # No server metadata available, use our default preference + registration_data["token_endpoint_auth_method"] = preference_order[0] + return httpx.Request( "POST", registration_url, json=registration_data, headers={"Content-Type": "application/json"} ) @@ -378,12 +435,11 @@ async def _exchange_token(self, auth_code: str, code_verifier: str) -> httpx.Req if self.context.should_include_resource_param(self.context.protocol_version): token_data["resource"] = self.context.get_resource_url() # RFC 8707 - if self.context.client_info.client_secret: - token_data["client_secret"] = self.context.client_info.client_secret + # Prepare authentication based on preferred method + headers = {"Content-Type": "application/x-www-form-urlencoded"} + token_data, headers = self.context.prepare_token_auth(token_data, headers) - return httpx.Request( - "POST", token_url, data=token_data, headers={"Content-Type": "application/x-www-form-urlencoded"} - ) + return httpx.Request("POST", token_url, data=token_data, headers=headers) async def _handle_token_response(self, response: httpx.Response) -> None: """Handle token exchange response.""" @@ -432,12 +488,11 @@ async def _refresh_token(self) -> httpx.Request: if self.context.should_include_resource_param(self.context.protocol_version): refresh_data["resource"] = self.context.get_resource_url() # RFC 8707 - if self.context.client_info.client_secret: - refresh_data["client_secret"] = self.context.client_info.client_secret + # Prepare authentication based on preferred method + headers = {"Content-Type": "application/x-www-form-urlencoded"} + refresh_data, headers = self.context.prepare_token_auth(refresh_data, headers) - return httpx.Request( - "POST", token_url, data=refresh_data, headers={"Content-Type": "application/x-www-form-urlencoded"} - ) + return httpx.Request("POST", token_url, data=refresh_data, headers=headers) async def _handle_refresh_response(self, response: httpx.Response) -> bool: """Handle token refresh response. Returns True if successful.""" diff --git a/src/mcp/server/auth/handlers/register.py b/src/mcp/server/auth/handlers/register.py index 93720340a..4522c481a 100644 --- a/src/mcp/server/auth/handlers/register.py +++ b/src/mcp/server/auth/handlers/register.py @@ -49,6 +49,11 @@ async def handle(self, request: Request) -> Response: ) client_id = str(uuid4()) + + # If auth method is None, default to client_secret_post + if client_metadata.token_endpoint_auth_method is None: + client_metadata.token_endpoint_auth_method = "client_secret_post" + client_secret = None if client_metadata.token_endpoint_auth_method != "none": # cryptographically secure random 32-byte hex string diff --git a/src/mcp/server/auth/handlers/revoke.py b/src/mcp/server/auth/handlers/revoke.py index 478ad7a01..c71795319 100644 --- a/src/mcp/server/auth/handlers/revoke.py +++ b/src/mcp/server/auth/handlers/revoke.py @@ -40,28 +40,25 @@ async def handle(self, request: Request) -> Response: Handler for the OAuth 2.0 Token Revocation endpoint. """ try: - form_data = await request.form() - revocation_request = RevocationRequest.model_validate(dict(form_data)) - except ValidationError as e: + client = await self.client_authenticator.authenticate_request(request) + except AuthenticationError as e: return PydanticJSONResponse( - status_code=400, + status_code=401, content=RevocationErrorResponse( - error="invalid_request", - error_description=stringify_pydantic_error(e), + error="unauthorized_client", + error_description=e.message, ), ) - # Authenticate client try: - client = await self.client_authenticator.authenticate( - revocation_request.client_id, revocation_request.client_secret - ) - except AuthenticationError as e: + form_data = await request.form() + revocation_request = RevocationRequest.model_validate(dict(form_data)) + except ValidationError as e: return PydanticJSONResponse( - status_code=401, + status_code=400, content=RevocationErrorResponse( - error="unauthorized_client", - error_description=e.message, + error="invalid_request", + error_description=stringify_pydantic_error(e), ), ) diff --git a/src/mcp/server/auth/handlers/token.py b/src/mcp/server/auth/handlers/token.py index 4e15e6265..80c58db77 100644 --- a/src/mcp/server/auth/handlers/token.py +++ b/src/mcp/server/auth/handlers/token.py @@ -91,6 +91,22 @@ def response(self, obj: TokenSuccessResponse | TokenErrorResponse): ) async def handle(self, request: Request): + try: + client_info = await self.client_authenticator.authenticate_request(request) + except AuthenticationError as e: + # Authentication failures should return 401 + return PydanticJSONResponse( + content=TokenErrorResponse( + error="unauthorized_client", + error_description=e.message, + ), + status_code=401, + headers={ + "Cache-Control": "no-store", + "Pragma": "no-cache", + }, + ) + try: form_data = await request.form() token_request = TokenRequest.model_validate(dict(form_data)).root @@ -102,19 +118,6 @@ async def handle(self, request: Request): ) ) - try: - client_info = await self.client_authenticator.authenticate( - client_id=token_request.client_id, - client_secret=token_request.client_secret, - ) - except AuthenticationError as e: - return self.response( - TokenErrorResponse( - error="unauthorized_client", - error_description=e.message, - ) - ) - if token_request.grant_type not in client_info.grant_types: return self.response( TokenErrorResponse( diff --git a/src/mcp/server/auth/middleware/client_auth.py b/src/mcp/server/auth/middleware/client_auth.py index d5f473b48..b774cafba 100644 --- a/src/mcp/server/auth/middleware/client_auth.py +++ b/src/mcp/server/auth/middleware/client_auth.py @@ -1,5 +1,11 @@ +import base64 +import binascii +import hmac import time from typing import Any +from urllib.parse import unquote + +from starlette.requests import Request from mcp.server.auth.provider import OAuthAuthorizationServerProvider from mcp.shared.auth import OAuthClientInformationFull @@ -30,19 +36,73 @@ def __init__(self, provider: OAuthAuthorizationServerProvider[Any, Any, Any]): """ self.provider = provider - async def authenticate(self, client_id: str, client_secret: str | None) -> OAuthClientInformationFull: - # Look up client information - client = await self.provider.get_client(client_id) + async def authenticate_request(self, request: Request) -> OAuthClientInformationFull: + """ + Authenticate a client from an HTTP request. + + Extracts client credentials from the appropriate location based on the + client's registered authentication method and validates them. + + Args: + request: The HTTP request containing client credentials + + Returns: + The authenticated client information + + Raises: + AuthenticationError: If authentication fails + """ + form_data = await request.form() + client_id = form_data.get("client_id") + if not client_id: + raise AuthenticationError("Missing client_id") + + client = await self.provider.get_client(str(client_id)) if not client: raise AuthenticationError("Invalid client_id") - # If client from the store expects a secret, validate that the request provides - # that secret + request_client_secret: str | None = None + auth_header = request.headers.get("Authorization", "") + + if client.token_endpoint_auth_method == "client_secret_basic": + if not auth_header.startswith("Basic "): + raise AuthenticationError("Missing or invalid Basic authentication in Authorization header") + + try: + encoded_credentials = auth_header[6:] # Remove "Basic " prefix + decoded = base64.b64decode(encoded_credentials).decode("utf-8") + if ":" not in decoded: + raise ValueError("Invalid Basic auth format") + basic_client_id, request_client_secret = decoded.split(":", 1) + + # URL-decode both parts per RFC 6749 Section 2.3.1 + basic_client_id = unquote(basic_client_id) + request_client_secret = unquote(request_client_secret) + + if basic_client_id != client_id: + raise AuthenticationError("Client ID mismatch in Basic auth") + except (ValueError, UnicodeDecodeError, binascii.Error): + raise AuthenticationError("Invalid Basic authentication header") + + elif client.token_endpoint_auth_method == "client_secret_post": + raw_form_data = form_data.get("client_secret") + # form_data.get() can return a UploadFile or None, so we need to check if it's a string + if isinstance(raw_form_data, str): + request_client_secret = str(raw_form_data) + + elif client.token_endpoint_auth_method == "none": + request_client_secret = None + else: + raise AuthenticationError(f"Unsupported auth method: {client.token_endpoint_auth_method}") + if client.client_secret: - if not client_secret: + if not request_client_secret: raise AuthenticationError("Client secret is required") - if client.client_secret != client_secret: + # hmac.compare_digest requires that both arguments are either bytes or a `str` containing + # only ASCII characters. Since we do not control `request_client_secret`, we encode both + # arguments to bytes. + if not hmac.compare_digest(client.client_secret.encode(), request_client_secret.encode()): raise AuthenticationError("Invalid client_secret") if client.client_secret_expires_at and client.client_secret_expires_at < int(time.time()): diff --git a/src/mcp/server/auth/routes.py b/src/mcp/server/auth/routes.py index bce32df52..e20db5a54 100644 --- a/src/mcp/server/auth/routes.py +++ b/src/mcp/server/auth/routes.py @@ -164,7 +164,7 @@ def build_metadata( response_types_supported=["code"], response_modes_supported=None, grant_types_supported=["authorization_code", "refresh_token"], - token_endpoint_auth_methods_supported=["client_secret_post"], + token_endpoint_auth_methods_supported=["client_secret_post", "client_secret_basic"], token_endpoint_auth_signing_alg_values_supported=None, service_documentation=service_documentation_url, ui_locales_supported=None, @@ -181,7 +181,7 @@ def build_metadata( # Add revocation endpoint if supported if revocation_options.enabled: metadata.revocation_endpoint = AnyHttpUrl(str(issuer_url).rstrip("/") + REVOCATION_PATH) - metadata.revocation_endpoint_auth_methods_supported = ["client_secret_post"] + metadata.revocation_endpoint_auth_methods_supported = ["client_secret_post", "client_secret_basic"] return metadata diff --git a/src/mcp/shared/auth.py b/src/mcp/shared/auth.py index b7f048bba..b3df92fe7 100644 --- a/src/mcp/shared/auth.py +++ b/src/mcp/shared/auth.py @@ -42,10 +42,7 @@ class OAuthClientMetadata(BaseModel): """ redirect_uris: list[AnyUrl] = Field(..., min_length=1) - # token_endpoint_auth_method: this implementation only supports none & - # client_secret_post; - # ie: we do not support client_secret_basic - token_endpoint_auth_method: Literal["none", "client_secret_post"] = "client_secret_post" + token_endpoint_auth_method: Literal["none", "client_secret_post", "client_secret_basic"] | None = None # grant_types: this implementation only supports authorization_code & refresh_token grant_types: list[Literal["authorization_code", "refresh_token"] | str] = [ "authorization_code", diff --git a/tests/client/test_auth.py b/tests/client/test_auth.py index 6e58e496d..23c76f0f2 100644 --- a/tests/client/test_auth.py +++ b/tests/client/test_auth.py @@ -2,16 +2,25 @@ Tests for refactored OAuth client authentication implementation. """ +import base64 +import json import time from unittest import mock +from urllib.parse import unquote import httpx import pytest from inline_snapshot import Is, snapshot from pydantic import AnyHttpUrl, AnyUrl -from mcp.client.auth import OAuthClientProvider, PKCEParameters -from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken, ProtectedResourceMetadata +from mcp.client.auth import OAuthClientProvider, OAuthRegistrationError, PKCEParameters +from mcp.shared.auth import ( + OAuthClientInformationFull, + OAuthClientMetadata, + OAuthMetadata, + OAuthToken, + ProtectedResourceMetadata, +) class MockTokenStorage: @@ -415,6 +424,43 @@ async def test_register_client_skip_if_registered(self, oauth_provider: OAuthCli request = await oauth_provider._register_client() assert request is None + @pytest.mark.anyio + async def test_register_client_none_auth_method_with_server_metadata(self, oauth_provider: OAuthClientProvider): + """Test that token_endpoint_auth_method=None selects from server's supported methods.""" + # Set server metadata with specific supported methods + oauth_provider.context.oauth_metadata = OAuthMetadata( + issuer=AnyHttpUrl("https://auth.example.com"), + authorization_endpoint=AnyHttpUrl("https://auth.example.com/authorize"), + token_endpoint=AnyHttpUrl("https://auth.example.com/token"), + token_endpoint_auth_methods_supported=["client_secret_post"], + ) + # Ensure client_metadata has None for token_endpoint_auth_method + assert oauth_provider.context.client_metadata.token_endpoint_auth_method is None + + request = await oauth_provider._register_client() + assert request is not None + + body = json.loads(request.content) + assert body["token_endpoint_auth_method"] == "client_secret_post" + + @pytest.mark.anyio + async def test_register_client_none_auth_method_no_compatible(self, oauth_provider: OAuthClientProvider): + """Test that registration raises error when no compatible auth methods.""" + # Set server metadata with unsupported methods only + oauth_provider.context.oauth_metadata = OAuthMetadata( + issuer=AnyHttpUrl("https://auth.example.com"), + authorization_endpoint=AnyHttpUrl("https://auth.example.com/authorize"), + token_endpoint=AnyHttpUrl("https://auth.example.com/token"), + token_endpoint_auth_methods_supported=["private_key_jwt", "client_secret_jwt"], + ) + assert oauth_provider.context.client_metadata.token_endpoint_auth_method is None + + with pytest.raises(OAuthRegistrationError) as exc_info: + await oauth_provider._register_client() + + assert "No compatible authentication methods" in str(exc_info.value) + assert "private_key_jwt" in str(exc_info.value) + @pytest.mark.anyio async def test_token_exchange_request(self, oauth_provider: OAuthClientProvider): """Test token exchange request building.""" @@ -423,6 +469,7 @@ async def test_token_exchange_request(self, oauth_provider: OAuthClientProvider) client_id="test_client", client_secret="test_secret", redirect_uris=[AnyUrl("http://localhost:3030/callback")], + token_endpoint_auth_method="client_secret_post", ) request = await oauth_provider._exchange_token("test_auth_code", "test_verifier") @@ -448,6 +495,7 @@ async def test_refresh_token_request(self, oauth_provider: OAuthClientProvider, client_id="test_client", client_secret="test_secret", redirect_uris=[AnyUrl("http://localhost:3030/callback")], + token_endpoint_auth_method="client_secret_post", ) request = await oauth_provider._refresh_token() @@ -463,6 +511,114 @@ async def test_refresh_token_request(self, oauth_provider: OAuthClientProvider, assert "client_id=test_client" in content assert "client_secret=test_secret" in content + @pytest.mark.anyio + async def test_basic_auth_token_exchange(self, oauth_provider: OAuthClientProvider): + """Test token exchange with client_secret_basic authentication.""" + # Set up OAuth metadata to support basic auth + oauth_provider.context.oauth_metadata = OAuthMetadata( + issuer=AnyHttpUrl("https://auth.example.com"), + authorization_endpoint=AnyHttpUrl("https://auth.example.com/authorize"), + token_endpoint=AnyHttpUrl("https://auth.example.com/token"), + token_endpoint_auth_methods_supported=["client_secret_basic", "client_secret_post"], + ) + + client_id_raw = "test@client" # Include special character to test URL encoding + client_secret_raw = "test:secret" # Include colon to test URL encoding + + oauth_provider.context.client_info = OAuthClientInformationFull( + client_id=client_id_raw, + client_secret=client_secret_raw, + redirect_uris=[AnyUrl("http://localhost:3030/callback")], + token_endpoint_auth_method="client_secret_basic", + ) + + request = await oauth_provider._exchange_token("test_auth_code", "test_verifier") + + # Should use basic auth (registered method) + assert "Authorization" in request.headers + assert request.headers["Authorization"].startswith("Basic ") + + # Decode and verify credentials are properly URL-encoded + encoded_creds = request.headers["Authorization"][6:] # Remove "Basic " prefix + decoded = base64.b64decode(encoded_creds).decode() + client_id, client_secret = decoded.split(":", 1) + + # Check URL encoding was applied + assert client_id == "test%40client" # @ should be encoded as %40 + assert client_secret == "test%3Asecret" # : should be encoded as %3A + + # Verify decoded values match original + assert unquote(client_id) == client_id_raw + assert unquote(client_secret) == client_secret_raw + + # client_secret should NOT be in body for basic auth + content = request.content.decode() + assert "client_secret=" not in content + assert "client_id=test%40client" in content # client_id still in body + + @pytest.mark.anyio + async def test_basic_auth_refresh_token(self, oauth_provider: OAuthClientProvider, valid_tokens: OAuthToken): + """Test token refresh with client_secret_basic authentication.""" + oauth_provider.context.current_tokens = valid_tokens + + # Set up OAuth metadata to only support basic auth + oauth_provider.context.oauth_metadata = OAuthMetadata( + issuer=AnyHttpUrl("https://auth.example.com"), + authorization_endpoint=AnyHttpUrl("https://auth.example.com/authorize"), + token_endpoint=AnyHttpUrl("https://auth.example.com/token"), + token_endpoint_auth_methods_supported=["client_secret_basic"], + ) + + client_id = "test_client" + client_secret = "test_secret" + oauth_provider.context.client_info = OAuthClientInformationFull( + client_id=client_id, + client_secret=client_secret, + redirect_uris=[AnyUrl("http://localhost:3030/callback")], + token_endpoint_auth_method="client_secret_basic", + ) + + request = await oauth_provider._refresh_token() + + assert "Authorization" in request.headers + assert request.headers["Authorization"].startswith("Basic ") + + encoded_creds = request.headers["Authorization"][6:] + decoded = base64.b64decode(encoded_creds).decode() + assert decoded == f"{client_id}:{client_secret}" + + # client_secret should NOT be in body + content = request.content.decode() + assert "client_secret=" not in content + + @pytest.mark.anyio + async def test_none_auth_method(self, oauth_provider: OAuthClientProvider): + """Test 'none' authentication method (public client).""" + oauth_provider.context.oauth_metadata = OAuthMetadata( + issuer=AnyHttpUrl("https://auth.example.com"), + authorization_endpoint=AnyHttpUrl("https://auth.example.com/authorize"), + token_endpoint=AnyHttpUrl("https://auth.example.com/token"), + token_endpoint_auth_methods_supported=["none"], + ) + + client_id = "public_client" + oauth_provider.context.client_info = OAuthClientInformationFull( + client_id=client_id, + client_secret=None, # No secret for public client + redirect_uris=[AnyUrl("http://localhost:3030/callback")], + token_endpoint_auth_method="none", + ) + + request = await oauth_provider._exchange_token("test_auth_code", "test_verifier") + + # Should NOT have Authorization header + assert "Authorization" not in request.headers + + # Should NOT have client_secret in body + content = request.content.decode() + assert "client_secret=" not in content + assert "client_id=public_client" in content + class TestProtectedResourceMetadata: """Test protected resource handling.""" @@ -832,10 +988,10 @@ def test_build_metadata( "registration_endpoint": Is(registration_endpoint), "scopes_supported": ["read", "write", "admin"], "grant_types_supported": ["authorization_code", "refresh_token"], - "token_endpoint_auth_methods_supported": ["client_secret_post"], + "token_endpoint_auth_methods_supported": ["client_secret_post", "client_secret_basic"], "service_documentation": Is(service_documentation_url), "revocation_endpoint": Is(revocation_endpoint), - "revocation_endpoint_auth_methods_supported": ["client_secret_post"], + "revocation_endpoint_auth_methods_supported": ["client_secret_post", "client_secret_basic"], "code_challenge_methods_supported": ["S256"], } ) diff --git a/tests/server/fastmcp/auth/test_auth_integration.py b/tests/server/fastmcp/auth/test_auth_integration.py index fa33fbf43..bf24f8e04 100644 --- a/tests/server/fastmcp/auth/test_auth_integration.py +++ b/tests/server/fastmcp/auth/test_auth_integration.py @@ -12,7 +12,7 @@ import httpx import pytest -from pydantic import AnyHttpUrl +from pydantic import AnyHttpUrl, AnyUrl from starlette.applications import Starlette from mcp.server.auth.provider import ( @@ -354,7 +354,7 @@ async def test_metadata_endpoint(self, test_client: httpx.AsyncClient): assert metadata["revocation_endpoint"] == "https://auth.example.com/revoke" assert metadata["response_types_supported"] == ["code"] assert metadata["code_challenge_methods_supported"] == ["S256"] - assert metadata["token_endpoint_auth_methods_supported"] == ["client_secret_post"] + assert metadata["token_endpoint_auth_methods_supported"] == ["client_secret_post", "client_secret_basic"] assert metadata["grant_types_supported"] == [ "authorization_code", "refresh_token", @@ -373,8 +373,8 @@ async def test_token_validation_error(self, test_client: httpx.AsyncClient): }, ) error_response = response.json() - assert error_response["error"] == "invalid_request" - assert "error_description" in error_response # Contains validation error messages + assert error_response["error"] == "unauthorized_client" + assert "error_description" in error_response # Contains error message @pytest.mark.anyio async def test_token_invalid_auth_code( @@ -1010,6 +1010,147 @@ async def test_client_registration_default_response_types( assert "response_types" in data assert data["response_types"] == ["code"] + @pytest.mark.anyio + async def test_client_secret_basic_authentication( + self, test_client: httpx.AsyncClient, mock_oauth_provider: MockOAuthProvider, pkce_challenge: dict[str, str] + ): + """Test that client_secret_basic authentication works correctly.""" + client_metadata = { + "redirect_uris": ["https://client.example.com/callback"], + "client_name": "Basic Auth Client", + "token_endpoint_auth_method": "client_secret_basic", + "grant_types": ["authorization_code", "refresh_token"], + } + + response = await test_client.post("/register", json=client_metadata) + assert response.status_code == 201 + client_info = response.json() + assert client_info["token_endpoint_auth_method"] == "client_secret_basic" + + auth_code = f"code_{int(time.time())}" + mock_oauth_provider.auth_codes[auth_code] = AuthorizationCode( + code=auth_code, + client_id=client_info["client_id"], + code_challenge=pkce_challenge["code_challenge"], + redirect_uri=AnyUrl("https://client.example.com/callback"), + redirect_uri_provided_explicitly=True, + scopes=["read", "write"], + expires_at=time.time() + 600, + ) + + credentials = f"{client_info['client_id']}:{client_info['client_secret']}" + encoded_credentials = base64.b64encode(credentials.encode()).decode() + + response = await test_client.post( + "/token", + headers={"Authorization": f"Basic {encoded_credentials}"}, + data={ + "grant_type": "authorization_code", + "client_id": client_info["client_id"], + "code": auth_code, + "code_verifier": pkce_challenge["code_verifier"], + "redirect_uri": "https://client.example.com/callback", + }, + ) + assert response.status_code == 200 + token_response = response.json() + assert "access_token" in token_response + + @pytest.mark.anyio + async def test_wrong_auth_method_without_valid_credentials_fails( + self, test_client: httpx.AsyncClient, mock_oauth_provider: MockOAuthProvider, pkce_challenge: dict[str, str] + ): + """Test that using the wrong authentication method fails when credentials are missing.""" + client_metadata = { + "redirect_uris": ["https://client.example.com/callback"], + "client_name": "Post Auth Client", + "token_endpoint_auth_method": "client_secret_post", + "grant_types": ["authorization_code", "refresh_token"], + } + + response = await test_client.post("/register", json=client_metadata) + assert response.status_code == 201 + client_info = response.json() + assert client_info["token_endpoint_auth_method"] == "client_secret_post" + + auth_code = f"code_{int(time.time())}" + mock_oauth_provider.auth_codes[auth_code] = AuthorizationCode( + code=auth_code, + client_id=client_info["client_id"], + code_challenge=pkce_challenge["code_challenge"], + redirect_uri=AnyUrl("https://client.example.com/callback"), + redirect_uri_provided_explicitly=True, + scopes=["read", "write"], + expires_at=time.time() + 600, + ) + + # Try to use Basic auth when client_secret_post is registered (without secret in body) + # This should fail because the secret is missing from the expected location + + credentials = f"{client_info['client_id']}:{client_info['client_secret']}" + encoded_credentials = base64.b64encode(credentials.encode()).decode() + + response = await test_client.post( + "/token", + headers={"Authorization": f"Basic {encoded_credentials}"}, + data={ + "grant_type": "authorization_code", + "client_id": client_info["client_id"], + # client_secret NOT in body where it should be + "code": auth_code, + "code_verifier": pkce_challenge["code_verifier"], + "redirect_uri": "https://client.example.com/callback", + }, + ) + assert response.status_code == 401 + error_response = response.json() + assert error_response["error"] == "unauthorized_client" + assert "Client secret is required" in error_response["error_description"] + + @pytest.mark.anyio + async def test_basic_auth_without_header_fails( + self, test_client: httpx.AsyncClient, mock_oauth_provider: MockOAuthProvider, pkce_challenge: dict[str, str] + ): + """Test that omitting Basic auth when client_secret_basic is registered fails.""" + client_metadata = { + "redirect_uris": ["https://client.example.com/callback"], + "client_name": "Basic Auth Client", + "token_endpoint_auth_method": "client_secret_basic", + "grant_types": ["authorization_code", "refresh_token"], + } + + response = await test_client.post("/register", json=client_metadata) + assert response.status_code == 201 + client_info = response.json() + assert client_info["token_endpoint_auth_method"] == "client_secret_basic" + + auth_code = f"code_{int(time.time())}" + mock_oauth_provider.auth_codes[auth_code] = AuthorizationCode( + code=auth_code, + client_id=client_info["client_id"], + code_challenge=pkce_challenge["code_challenge"], + redirect_uri=AnyUrl("https://client.example.com/callback"), + redirect_uri_provided_explicitly=True, + scopes=["read", "write"], + expires_at=time.time() + 600, + ) + + response = await test_client.post( + "/token", + data={ + "grant_type": "authorization_code", + "client_id": client_info["client_id"], + "client_secret": client_info["client_secret"], # Secret in body (ignored) + "code": auth_code, + "code_verifier": pkce_challenge["code_verifier"], + "redirect_uri": "https://client.example.com/callback", + }, + ) + assert response.status_code == 401 + error_response = response.json() + assert error_response["error"] == "unauthorized_client" + assert "Missing or invalid Basic authentication" in error_response["error_description"] + class TestAuthorizeEndpointErrors: """Test error handling in the OAuth authorization endpoint."""