Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,58 @@
fetching the authorization URL and extracting the auth code from the redirect.

Usage:
python -m mcp_conformance_auth_client <server-url>
python -m mcp_conformance_auth_client <scenario> <server-url>

Environment Variables:
MCP_CONFORMANCE_CONTEXT - JSON object containing test credentials:
{
"client_id": "...",
"client_secret": "...", # For client_secret_basic flow
"private_key_pem": "...", # For private_key_jwt flow
"signing_algorithm": "ES256" # Optional, defaults to ES256
}

Scenarios:
auth/* - Authorization code flow scenarios (default behavior)
auth/client-credentials-jwt - Client credentials with JWT authentication (SEP-1046)
auth/client-credentials-basic - Client credentials with client_secret_basic
"""

import asyncio
import json
import logging
import os
import sys
from datetime import timedelta
from urllib.parse import ParseResult, parse_qs, urlparse

import httpx
from mcp import ClientSession
from mcp.client.auth import OAuthClientProvider, TokenStorage
from mcp.client.auth.extensions.client_credentials import (
ClientCredentialsOAuthProvider,
PrivateKeyJWTOAuthProvider,
SignedJWTParameters,
)
from mcp.client.streamable_http import streamablehttp_client
from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
from pydantic import AnyUrl


def get_conformance_context() -> dict:
"""Load conformance test context from MCP_CONFORMANCE_CONTEXT environment variable."""
context_json = os.environ.get("MCP_CONFORMANCE_CONTEXT")
if not context_json:
raise RuntimeError(
"MCP_CONFORMANCE_CONTEXT environment variable not set. "
"Expected JSON with client_id, client_secret, and/or private_key_pem."
)
try:
return json.loads(context_json)
except json.JSONDecodeError as e:
raise RuntimeError(f"Failed to parse MCP_CONFORMANCE_CONTEXT as JSON: {e}") from e


# Set up logging to stderr (stdout is for conformance test output)
logging.basicConfig(
level=logging.DEBUG,
Expand Down Expand Up @@ -111,17 +147,17 @@ async def handle_callback(self) -> tuple[str, str | None]:
return auth_code, state


async def run_client(server_url: str) -> None:
async def run_authorization_code_client(server_url: str) -> None:
"""
Run the conformance test client against the given server URL.
Run the conformance test client with authorization code flow.

This function:
1. Connects to the MCP server with OAuth authentication
1. Connects to the MCP server with OAuth authorization code flow
2. Initializes the session
3. Lists available tools
4. Calls a test tool
"""
logger.debug(f"Starting conformance auth client for {server_url}")
logger.debug(f"Starting conformance auth client (authorization_code) for {server_url}")

# Create callback handler that will automatically fetch auth codes
callback_handler = ConformanceOAuthCallbackHandler()
Expand All @@ -140,6 +176,89 @@ async def run_client(server_url: str) -> None:
callback_handler=callback_handler.handle_callback,
)

await _run_session(server_url, oauth_auth)


async def run_client_credentials_jwt_client(server_url: str) -> None:
"""
Run the conformance test client with client credentials flow using private_key_jwt (SEP-1046).

This function:
1. Connects to the MCP server with OAuth client_credentials grant
2. Uses private_key_jwt authentication with credentials from MCP_CONFORMANCE_CONTEXT
3. Initializes the session
4. Lists available tools
5. Calls a test tool
"""
logger.debug(f"Starting conformance auth client (client_credentials_jwt) for {server_url}")

# Load credentials from environment
context = get_conformance_context()
client_id = context.get("client_id")
private_key_pem = context.get("private_key_pem")
signing_algorithm = context.get("signing_algorithm", "ES256")

if not client_id:
raise RuntimeError("MCP_CONFORMANCE_CONTEXT missing 'client_id'")
if not private_key_pem:
raise RuntimeError("MCP_CONFORMANCE_CONTEXT missing 'private_key_pem'")

# Create JWT parameters for SDK-signed assertions
jwt_params = SignedJWTParameters(
issuer=client_id,
subject=client_id,
signing_algorithm=signing_algorithm,
signing_key=private_key_pem,
)

# Create OAuth provider for client_credentials with private_key_jwt
oauth_auth = PrivateKeyJWTOAuthProvider(
server_url=server_url,
storage=InMemoryTokenStorage(),
client_id=client_id,
assertion_provider=jwt_params.create_assertion_provider(),
)

await _run_session(server_url, oauth_auth)


async def run_client_credentials_basic_client(server_url: str) -> None:
"""
Run the conformance test client with client credentials flow using client_secret_basic.

This function:
1. Connects to the MCP server with OAuth client_credentials grant
2. Uses client_secret_basic authentication with credentials from MCP_CONFORMANCE_CONTEXT
3. Initializes the session
4. Lists available tools
5. Calls a test tool
"""
logger.debug(f"Starting conformance auth client (client_credentials_basic) for {server_url}")

# Load credentials from environment
context = get_conformance_context()
client_id = context.get("client_id")
client_secret = context.get("client_secret")

if not client_id:
raise RuntimeError("MCP_CONFORMANCE_CONTEXT missing 'client_id'")
if not client_secret:
raise RuntimeError("MCP_CONFORMANCE_CONTEXT missing 'client_secret'")

# Create OAuth provider for client_credentials with client_secret_basic
oauth_auth = ClientCredentialsOAuthProvider(
server_url=server_url,
storage=InMemoryTokenStorage(),
client_id=client_id,
client_secret=client_secret,
token_endpoint_auth_method="client_secret_basic",
)

await _run_session(server_url, oauth_auth)


async def _run_session(server_url: str, oauth_auth: OAuthClientProvider) -> None:
"""Common session logic for all OAuth flows."""
# Connect using streamable HTTP transport with OAuth
async with streamablehttp_client(
url=server_url,
Expand Down Expand Up @@ -168,14 +287,26 @@ async def run_client(server_url: str) -> None:

def main() -> None:
"""Main entry point for the conformance auth client."""
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <server-url>", file=sys.stderr)
if len(sys.argv) != 3:
print(f"Usage: {sys.argv[0]} <scenario> <server-url>", file=sys.stderr)
print("", file=sys.stderr)
print("Scenarios:", file=sys.stderr)
print(" auth/* - Authorization code flow (default)", file=sys.stderr)
print(" auth/client-credentials-jwt - Client credentials with JWT auth (SEP-1046)", file=sys.stderr)
print(" auth/client-credentials-basic - Client credentials with client_secret_basic", file=sys.stderr)
sys.exit(1)

server_url = sys.argv[1]
scenario = sys.argv[1]
server_url = sys.argv[2]

try:
asyncio.run(run_client(server_url))
if scenario == "auth/client-credentials-jwt":
asyncio.run(run_client_credentials_jwt_client(server_url))
elif scenario == "auth/client-credentials-basic":
asyncio.run(run_client_credentials_basic_client(server_url))
else:
# Default to authorization code flow for all other auth/* scenarios
asyncio.run(run_authorization_code_client(server_url))
except Exception:
logger.exception("Client failed")
sys.exit(1)
Expand Down
Loading