Skip to content
15 changes: 13 additions & 2 deletions examples/clients/simple-auth-client/mcp_simple_auth_client/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,15 @@ def get_state(self):
class SimpleAuthClient:
"""Simple MCP client with auth support."""

def __init__(self, server_url: str, transport_type: str = "streamable-http"):
def __init__(
self,
server_url: str,
transport_type: str = "streamable-http",
client_metadata_url: str | None = None,
):
self.server_url = server_url
self.transport_type = transport_type
self.client_metadata_url = client_metadata_url
self.session: ClientSession | None = None

async def connect(self):
Expand Down Expand Up @@ -185,12 +191,14 @@ async def _default_redirect_handler(authorization_url: str) -> None:
webbrowser.open(authorization_url)

# Create OAuth authentication handler using the new interface
# Use client_metadata_url to enable CIMD when the server supports it
oauth_auth = OAuthClientProvider(
server_url=self.server_url,
client_metadata=OAuthClientMetadata.model_validate(client_metadata_dict),
storage=InMemoryTokenStorage(),
redirect_handler=_default_redirect_handler,
callback_handler=callback_handler,
client_metadata_url=self.client_metadata_url,
)

# Create transport with auth handler based on transport type
Expand Down Expand Up @@ -334,6 +342,7 @@ async def main():
# Most MCP streamable HTTP servers use /mcp as the endpoint
server_url = os.getenv("MCP_SERVER_PORT", 8000)
transport_type = os.getenv("MCP_TRANSPORT_TYPE", "streamable-http")
client_metadata_url = os.getenv("MCP_CLIENT_METADATA_URL")
server_url = (
f"http://localhost:{server_url}/mcp"
if transport_type == "streamable-http"
Expand All @@ -343,9 +352,11 @@ async def main():
print("🚀 Simple MCP Auth Client")
print(f"Connecting to: {server_url}")
print(f"Transport type: {transport_type}")
if client_metadata_url:
print(f"Client metadata URL: {client_metadata_url}")

# Start connection flow - OAuth will be handled automatically
client = SimpleAuthClient(server_url, transport_type)
client = SimpleAuthClient(server_url, transport_type, client_metadata_url)
await client.connect()


Expand Down
64 changes: 53 additions & 11 deletions src/mcp/client/auth/oauth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from mcp.client.auth.utils import (
build_oauth_authorization_server_metadata_discovery_urls,
build_protected_resource_metadata_discovery_urls,
create_client_info_from_metadata_url,
create_client_registration_request,
create_oauth_metadata_request,
extract_field_from_www_auth,
Expand All @@ -33,6 +34,8 @@
handle_protected_resource_response,
handle_registration_response,
handle_token_response_scopes,
is_valid_client_metadata_url,
should_use_client_metadata_url,
)
from mcp.client.streamable_http import MCP_PROTOCOL_VERSION
from mcp.shared.auth import (
Expand Down Expand Up @@ -96,6 +99,7 @@ class OAuthContext:
redirect_handler: Callable[[str], Awaitable[None]] | None
callback_handler: Callable[[], Awaitable[tuple[str, str | None]]] | None
timeout: float = 300.0
client_metadata_url: str | None = None

# Discovered metadata
protected_resource_metadata: ProtectedResourceMetadata | None = None
Expand Down Expand Up @@ -226,15 +230,40 @@ def __init__(
redirect_handler: Callable[[str], Awaitable[None]] | None = None,
callback_handler: Callable[[], Awaitable[tuple[str, str | None]]] | None = None,
timeout: float = 300.0,
client_metadata_url: str | None = None,
):
"""Initialize OAuth2 authentication."""
"""Initialize OAuth2 authentication.
Args:
server_url: The MCP server URL.
client_metadata: OAuth client metadata for registration.
storage: Token storage implementation.
redirect_handler: Handler for authorization redirects.
callback_handler: Handler for authorization callbacks.
timeout: Timeout for the OAuth flow.
client_metadata_url: URL-based client ID. When provided and the server
advertises client_id_metadata_document_supported=true, this URL will be
used as the client_id instead of performing dynamic client registration.
Must be a valid HTTPS URL with a non-root pathname.
Raises:
ValueError: If client_metadata_url is provided but not a valid HTTPS URL
with a non-root pathname.
"""
# Validate client_metadata_url if provided
if client_metadata_url is not None and not is_valid_client_metadata_url(client_metadata_url):
raise ValueError(
f"client_metadata_url must be a valid HTTPS URL with a non-root pathname, got: {client_metadata_url}"
)

self.context = OAuthContext(
server_url=server_url,
client_metadata=client_metadata,
storage=storage,
redirect_handler=redirect_handler,
callback_handler=callback_handler,
timeout=timeout,
client_metadata_url=client_metadata_url,
)
self._initialized = False

Expand Down Expand Up @@ -566,17 +595,30 @@ async def async_auth_flow(self, request: httpx.Request) -> AsyncGenerator[httpx.
self.context.oauth_metadata,
)

# Step 4: Register client if needed
registration_request = create_client_registration_request(
self.context.oauth_metadata,
self.context.client_metadata,
self.context.get_authorization_base_url(self.context.server_url),
)
# Step 4: Register client or use URL-based client ID (CIMD)
if not self.context.client_info:
registration_response = yield registration_request
client_information = await handle_registration_response(registration_response)
self.context.client_info = client_information
await self.context.storage.set_client_info(client_information)
if should_use_client_metadata_url(
self.context.oauth_metadata, self.context.client_metadata_url
):
# Use URL-based client ID (CIMD)
logger.debug(f"Using URL-based client ID (CIMD): {self.context.client_metadata_url}")
client_information = create_client_info_from_metadata_url(
self.context.client_metadata_url, # type: ignore[arg-type]
redirect_uris=self.context.client_metadata.redirect_uris,
)
self.context.client_info = client_information
await self.context.storage.set_client_info(client_information)
else:
# Fallback to Dynamic Client Registration
registration_request = create_client_registration_request(
self.context.oauth_metadata,
self.context.client_metadata,
self.context.get_authorization_base_url(self.context.server_url),
)
registration_response = yield registration_request
client_information = await handle_registration_response(registration_response)
self.context.client_info = client_information
await self.context.storage.set_client_info(client_information)

# Step 5: Perform authorization and complete token exchange
token_response = yield await self._perform_authorization()
Expand Down
71 changes: 70 additions & 1 deletion src/mcp/client/auth/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from urllib.parse import urljoin, urlparse

from httpx import Request, Response
from pydantic import ValidationError
from pydantic import AnyUrl, ValidationError

from mcp.client.auth import OAuthRegistrationError, OAuthTokenError
from mcp.client.streamable_http import MCP_PROTOCOL_VERSION
Expand Down Expand Up @@ -243,6 +243,75 @@ async def handle_registration_response(response: Response) -> OAuthClientInforma
raise OAuthRegistrationError(f"Invalid registration response: {e}")


def is_valid_client_metadata_url(url: str | None) -> bool:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

open to a different break down of helper functions here, this seemed like a reasonably flexible combo.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems appropriate to me

"""Validate that a URL is suitable for use as a client_id (CIMD).
The URL must be HTTPS with a non-root pathname.
Args:
url: The URL to validate
Returns:
True if the URL is a valid HTTPS URL with a non-root pathname
"""
if not url:
return False
try:
parsed = urlparse(url)
return parsed.scheme == "https" and parsed.path not in ("", "/")
except Exception:
return False


def should_use_client_metadata_url(
oauth_metadata: OAuthMetadata | None,
client_metadata_url: str | None,
) -> bool:
"""Determine if URL-based client ID (CIMD) should be used instead of DCR.
URL-based client IDs should be used when:
1. The server advertises client_id_metadata_document_supported=true
2. The client has a valid client_metadata_url configured
Args:
oauth_metadata: OAuth authorization server metadata
client_metadata_url: URL-based client ID (already validated)
Returns:
True if CIMD should be used, False if DCR should be used
"""
if not client_metadata_url:
return False

if not oauth_metadata:
return False

return oauth_metadata.client_id_metadata_document_supported is True


def create_client_info_from_metadata_url(
client_metadata_url: str, redirect_uris: list[AnyUrl] | None = None
) -> OAuthClientInformationFull:
"""Create client information using a URL-based client ID (CIMD).
When using URL-based client IDs, the URL itself becomes the client_id
and no client_secret is used (token_endpoint_auth_method="none").
Args:
client_metadata_url: The URL to use as the client_id
redirect_uris: The redirect URIs from the client metadata (passed through for
compatibility with OAuthClientInformationFull which inherits from OAuthClientMetadata)
Returns:
OAuthClientInformationFull with the URL as client_id
"""
return OAuthClientInformationFull(
client_id=client_metadata_url,
token_endpoint_auth_method="none",
redirect_uris=redirect_uris,
)


async def handle_token_response_scopes(
response: Response,
) -> OAuthToken:
Expand Down
Loading