Skip to content

feat: support protected resource metadata for mcp server #27

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 100 additions & 88 deletions mcpauth/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
from contextvars import ContextVar
import logging
from typing import Any, Callable, List, Literal, Optional, Union

from typing import Callable, List, Literal, Optional, Union
from typing_extensions import deprecated

from .auth.authorization_server_handler import (
AuthorizationServerHandler,
AuthServerModeConfig,
)
from .auth.mcp_auth_handler import MCPAuthHandler
from .auth.resource_server_handler import (
ResourceServerHandler,
ResourceServerModeConfig,
)
from .middleware.create_bearer_auth import BearerAuthConfig
from .types import AuthInfo, VerifyAccessTokenFunction
from .config import AuthServerConfig, ServerMetadataPaths
from .types import AuthInfo, ResourceServerConfig, VerifyAccessTokenFunction
from .config import AuthServerConfig
from .exceptions import MCPAuthAuthServerException, AuthServerExceptionCode
from .utils import validate_server_config
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response, JSONResponse
from starlette.requests import Request
from starlette.routing import Route
from starlette.routing import Router, Route

_context_var_name = "mcp_auth_context"

Expand All @@ -23,41 +29,47 @@ class MCPAuth:
See Also: https://mcp-auth.dev for more information about the library and its usage.
"""

server: AuthServerConfig
"""
The configuration for the remote authorization server.
"""
_handler: MCPAuthHandler

def __init__(
self,
server: AuthServerConfig,
server: Optional[AuthServerConfig] = None,
protected_resources: Optional[
Union[ResourceServerConfig, List[ResourceServerConfig]]
] = None,
context_var: ContextVar[Optional[AuthInfo]] = ContextVar(
_context_var_name, default=None
),
):
"""
:param server: Configuration for the remote authorization server.
:param server: Configuration for the remote authorization server (deprecated).
:param protected_resources: Configuration for one or more protected resource servers.
:param context_var: Context variable to store the `AuthInfo` object for the current request.
By default, it will be created with the name "mcp_auth_context".
"""

result = validate_server_config(server)
if server and protected_resources:
raise MCPAuthAuthServerException(
AuthServerExceptionCode.INVALID_SERVER_CONFIG,
cause={
"error_description": "Either `server` or `protected_resources` must be provided, but not both."
},
)

if not result.is_valid:
logging.error(
"The authorization server configuration is invalid:\n"
f"{result.errors}\n"
if server:
self._handler = AuthorizationServerHandler(AuthServerModeConfig(server))
elif protected_resources:
self._handler = ResourceServerHandler(
ResourceServerModeConfig(protected_resources)
)
else:
raise MCPAuthAuthServerException(
AuthServerExceptionCode.INVALID_SERVER_CONFIG, cause=result
AuthServerExceptionCode.INVALID_SERVER_CONFIG,
cause={
"error_description": "Either `server` or `protected_resources` must be provided."
},
)

if len(result.warnings) > 0:
logging.warning("The authorization server configuration has warnings:\n")
for warning in result.warnings:
logging.warning(f"- {warning}")

self.server = server
self._context_var = context_var

@property
Expand All @@ -72,64 +84,48 @@ def auth_info(self) -> Optional[AuthInfo]:

return self._context_var.get()

def metadata_endpoint(self) -> Callable[[Request], Any]:
@deprecated("Use resource_metadata_router() instead for resource server mode")
def metadata_route(self) -> Route:
"""
Returns a Starlette endpoint function that handles the OAuth 2.0 Authorization Metadata
endpoint (`/.well-known/oauth-authorization-server`) with CORS support.

Example:
```python
from starlette.applications import Starlette
from mcpauth import MCPAuth
from mcpauth.config import ServerMetadataPaths

mcp_auth = MCPAuth(server=your_server_config)
app = Starlette(routes=[
Route(
ServerMetadataPaths.OAUTH.value,
mcp_auth.metadata_endpoint(),
methods=["GET", "OPTIONS"] # Ensure to handle both GET and OPTIONS methods
)
])
```
Returns a router that handles the legacy OAuth 2.0 Authorization Server Metadata endpoint.

This method is deprecated and will be removed in a future version.
For resource server mode, use `resource_metadata_router()` instead to serve
the Protected Resource Metadata endpoints.
"""
if isinstance(self._handler, ResourceServerHandler):
raise MCPAuthAuthServerException(
AuthServerExceptionCode.INVALID_SERVER_CONFIG,
cause={
"error_description": "`metadata_route` is not available in `resource server` mode. Use `resource_metadata_router()` instead."
},
)

async def endpoint(request: Request) -> Response:
if request.method == "OPTIONS":
response = Response(status_code=204)
else:
server_config = self.server
response = JSONResponse(
server_config.metadata.model_dump(exclude_none=True),
status_code=200,
)
response.headers["Access-Control-Allow-Origin"] = "*"
response.headers["Access-Control-Allow-Methods"] = "GET, OPTIONS"
response.headers["Access-Control-Allow-Headers"] = "*"
return response

return endpoint
oauth_metadata_route = self._handler.create_metadata_route().routes[0]

def metadata_route(self) -> Route:
"""
Returns a Starlette route that handles the OAuth 2.0 Authorization Metadata endpoint
(`/.well-known/oauth-authorization-server`) with CORS support.
if not isinstance(oauth_metadata_route, Route):
raise IndexError(
"No metadata endpoint route was created. Expected the authorization server metadata route to be present."
)

Example:
```python
from starlette.applications import Starlette
from mcpauth import MCPAuth
return oauth_metadata_route

mcp_auth = MCPAuth(server=your_server_config)
app = Starlette(routes=[mcp_auth.metadata_route()])
```
def resource_metadata_router(self) -> Router:
"""
Returns a router that serves the OAuth 2.0 Protected Resource Metadata endpoint
for all configured resources.

return Route(
ServerMetadataPaths.OAUTH.value,
self.metadata_endpoint(),
methods=["GET", "OPTIONS"],
)
This is an alias for `metadata_route` and is the recommended method to use when
in "resource server" mode.
"""
if isinstance(self._handler, AuthorizationServerHandler):
raise MCPAuthAuthServerException(
AuthServerExceptionCode.INVALID_SERVER_CONFIG,
cause={
"error_description": "`resource_metadata_router` is not available in `authorization server` mode."
},
)
return self._handler.create_metadata_route()

def bearer_auth_middleware(
self,
Expand All @@ -138,6 +134,7 @@ def bearer_auth_middleware(
required_scopes: Optional[List[str]] = None,
show_error_details: bool = False,
leeway: float = 60,
resource: Optional[str] = None,
) -> type[BaseHTTPMiddleware]:
"""
Creates a middleware that handles bearer token authentication.
Expand All @@ -150,38 +147,53 @@ def bearer_auth_middleware(
Defaults to `False`.
:param leeway: Optional leeway in seconds for JWT verification (`jwt.decode`). Defaults to
`60`. Not used if a custom function is provided.
:param resource: The identifier of the protected resource. Required when using `protected_resources`.
:return: A middleware class that can be used in a Starlette or FastAPI application.
"""
from .middleware.create_bearer_auth import create_bearer_auth

metadata = self.server.metadata
if isinstance(mode_or_verify, str) and mode_or_verify == "jwt":
from .utils import create_verify_jwt
issuer: Union[str, Callable[[str], None]]

resource_for_verifier: str

if not metadata.jwks_uri:
if isinstance(self._handler, ResourceServerHandler):
if not resource:
raise MCPAuthAuthServerException(
AuthServerExceptionCode.MISSING_JWKS_URI
AuthServerExceptionCode.INVALID_SERVER_CONFIG,
cause={
"error_description": "A `resource` must be specified in the `bearer_auth_middleware` configuration when using a `protected_resources` configuration."
},
)
resource_for_verifier = resource
else: # AuthorizationServerHandler
# In the deprecated `authorization server` mode, `getTokenVerifier` does not utilize the
# `resource` parameter. Passing an empty string `''` is a straightforward approach that
# avoids over-engineering a solution for a legacy path.
resource_for_verifier = ""

verify = create_verify_jwt(
metadata.jwks_uri,
leeway=leeway,
if isinstance(mode_or_verify, str) and mode_or_verify == "jwt":
token_verifier = self._handler.get_token_verifier(
resource=resource_for_verifier
)
verify = token_verifier.create_verify_jwt_function(leeway=leeway)
issuer = token_verifier.validate_jwt_issuer
elif callable(mode_or_verify):
verify = mode_or_verify
# For custom verify functions, issuer validation should be handled by the custom logic
issuer = lambda _: None # No-op function that accepts any issuer
else:
raise ValueError(
"mode_or_verify must be 'jwt' or a callable function that verifies tokens."
)

from .middleware.create_bearer_auth import create_bearer_auth

return create_bearer_auth(
verify,
config=BearerAuthConfig(
issuer=metadata.issuer,
issuer=issuer,
audience=audience,
required_scopes=required_scopes,
show_error_details=show_error_details,
resource=resource,
),
context_var=self._context_var,
)
93 changes: 93 additions & 0 deletions mcpauth/auth/authorization_server_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import logging
from typing import Any, Callable

from starlette.routing import Route, Router
from starlette.requests import Request
from starlette.responses import Response, JSONResponse

from ..config import AuthServerConfig, ServerMetadataPaths
from ..exceptions import AuthServerExceptionCode, MCPAuthAuthServerException
from ..utils import validate_server_config
from .mcp_auth_handler import MCPAuthHandler
from .token_verifier import TokenVerifier


class AuthServerModeConfig:
"""
Configuration for the legacy, MCP-server-as-authorization-server mode.
"""

def __init__(self, server: AuthServerConfig):
self.server = server


class AuthorizationServerHandler(MCPAuthHandler):
"""
Handles the authentication logic for the legacy `server` mode.
"""

def __init__(self, config: AuthServerModeConfig):
logging.warning(
"The authorization server mode is deprecated. Please use resource server mode instead."
)

result = validate_server_config(config.server)

if not result.is_valid:
logging.error(
"The authorization server configuration is invalid:\n"
f"{result.errors}\n"
)
raise MCPAuthAuthServerException(
AuthServerExceptionCode.INVALID_SERVER_CONFIG, cause=result
)

if len(result.warnings) > 0:
logging.warning("The authorization server configuration has warnings:\n")
for warning in result.warnings:
logging.warning(f"- {warning}")

self.server = config.server
self.token_verifier = TokenVerifier([config.server])

def create_metadata_route(self) -> Router:
"""
Returns a Starlette route that handles the OAuth 2.0 Authorization Metadata endpoint
(`/.well-known/oauth-authorization-server`) with CORS support.
"""
routes = [
Route(
ServerMetadataPaths.OAUTH.value,
self._create_metadata_endpoint(),
methods=["GET", "OPTIONS"],
)
]
return Router(routes=routes)

def _create_metadata_endpoint(self) -> Callable[[Request], Any]:
"""
Returns a Starlette endpoint function that handles the OAuth 2.0 Authorization Metadata
endpoint (`/.well-known/oauth-authorization-server`) with CORS support.
"""

def endpoint(request: Request) -> Response:
if request.method == "OPTIONS":
response = Response(status_code=204)
else:
response = JSONResponse(
self.server.metadata.model_dump(exclude_none=True),
status_code=200,
)
response.headers["Access-Control-Allow-Origin"] = "*"
response.headers["Access-Control-Allow-Methods"] = "GET, OPTIONS"
response.headers["Access-Control-Allow-Headers"] = "*"
return response

return endpoint

def get_token_verifier(self, resource: str) -> TokenVerifier:
"""
This is a dummy implementation that ignores the resource, as there is only
one `TokenVerifier` in the authorization server mode.
"""
return self.token_verifier
28 changes: 28 additions & 0 deletions mcpauth/auth/mcp_auth_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from abc import ABC, abstractmethod

from starlette.routing import Router

from .token_verifier import TokenVerifier


class MCPAuthHandler(ABC):
"""
Defines the contract for a handler that manages the logic for a specific MCPAuth configuration.
This allows for clean separation of logic between legacy and modern configurations.
"""

@abstractmethod
def create_metadata_route(self) -> Router:
"""
Returns a router for serving either the legacy OAuth 2.0 Authorization Server Metadata or
the OAuth 2.0 Protected Resource Metadata, depending on the configuration.
"""
... # pragma: no cover

@abstractmethod
def get_token_verifier(self, resource: str) -> TokenVerifier:
"""
Resolves the appropriate TokenVerifier based on the provided resource.
:param resource: The resource identifier for verifier lookup.
"""
... # pragma: no cover
Loading