From cf3ae5c88ea43e280860b67fbb3ef5347a41db21 Mon Sep 17 00:00:00 2001 From: Viicos <65306057+Viicos@users.noreply.github.com> Date: Sat, 18 Oct 2025 17:46:53 +0200 Subject: [PATCH 1/2] Refactor `func_metadata()` implementation --- pyproject.toml | 2 + .../server/fastmcp/utilities/func_metadata.py | 233 ++++++++---------- uv.lock | 4 + 3 files changed, 108 insertions(+), 131 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 21013e79e..9d4b29385 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,8 @@ dependencies = [ "uvicorn>=0.31.1; sys_platform != 'emscripten'", "jsonschema>=4.20.0", "pywin32>=310; sys_platform == 'win32'", + "typing-extensions>=4.9.0", + "typing-inspection>=0.4.1", ] [project.optional-dependencies] diff --git a/src/mcp/server/fastmcp/utilities/func_metadata.py b/src/mcp/server/fastmcp/utilities/func_metadata.py index 873b1ae19..50f014737 100644 --- a/src/mcp/server/fastmcp/utilities/func_metadata.py +++ b/src/mcp/server/fastmcp/utilities/func_metadata.py @@ -1,10 +1,9 @@ import inspect import json -import types from collections.abc import Awaitable, Callable, Sequence from itertools import chain from types import GenericAlias -from typing import Annotated, Any, ForwardRef, Union, cast, get_args, get_origin, get_type_hints +from typing import Annotated, Any, cast, get_args, get_origin, get_type_hints import pydantic_core from pydantic import ( @@ -15,10 +14,10 @@ WithJsonSchema, create_model, ) -from pydantic._internal._typing_extra import eval_type_backport from pydantic.fields import FieldInfo from pydantic.json_schema import GenerateJsonSchema, JsonSchemaWarningKind -from pydantic_core import PydanticUndefined +from typing_extensions import is_typeddict +from typing_inspection.introspection import UNKNOWN, AnnotationSource, inspect_annotation, is_union_origin from mcp.server.fastmcp.exceptions import InvalidSignature from mcp.server.fastmcp.utilities.logging import get_logger @@ -212,56 +211,47 @@ def func_metadata( - output_model: A pydantic model for the return type if output is structured - output_conversion: Records how function output should be converted before returning. """ - sig = _get_typed_signature(func) + try: + sig = inspect.signature(func, eval_str=True) + except NameError as e: + # This raise could perhaps be skipped, and we (FastMCP) just call + # model_rebuild right before using it 🤷 + raise InvalidSignature(f"Unable to evaluate type annotations for callable {func.__name__!r}") from e params = sig.parameters dynamic_pydantic_model_params: dict[str, Any] = {} - globalns = getattr(func, "__globals__", {}) for param in params.values(): if param.name.startswith("_"): raise InvalidSignature(f"Parameter {param.name} of {func.__name__} cannot start with '_'") if param.name in skip_names: continue - annotation = param.annotation - - # `x: None` / `x: None = None` - if annotation is None: - annotation = Annotated[ - None, - Field(default=param.default if param.default is not inspect.Parameter.empty else PydanticUndefined), - ] - - # Untyped field - if annotation is inspect.Parameter.empty: - annotation = Annotated[ - Any, - Field(), - # 🤷 - WithJsonSchema({"title": param.name, "type": "string"}), - ] - - field_info = FieldInfo.from_annotated_attribute( - _get_typed_annotation(annotation, globalns), - param.default if param.default is not inspect.Parameter.empty else PydanticUndefined, - ) + annotation = param.annotation if param.annotation is not inspect.Parameter.empty else Any + field_name = param.name + field_kwargs: dict[str, Any] = {} + field_metadata: list[Any] = [] + + if param.annotation is inspect.Parameter.empty: + field_metadata.append(WithJsonSchema({"title": param.name, "type": "string"})) # Check if the parameter name conflicts with BaseModel attributes # This is necessary because Pydantic warns about shadowing parent attributes - if hasattr(BaseModel, param.name) and callable(getattr(BaseModel, param.name)): + if hasattr(BaseModel, field_name) and callable(getattr(BaseModel, field_name)): # Use an alias to avoid the shadowing warning - field_info.alias = param.name - field_info.validation_alias = param.name - field_info.serialization_alias = param.name - # Use a prefixed internal name - internal_name = f"field_{param.name}" - dynamic_pydantic_model_params[internal_name] = (field_info.annotation, field_info) + field_kwargs["alias"] = field_name + # Use a prefixed field name + field_name = f"field_{field_name}" + + if param.default is not inspect.Parameter.empty: + dynamic_pydantic_model_params[field_name] = ( + Annotated[(annotation, *field_metadata, Field(**field_kwargs))], + param.default, + ) else: - dynamic_pydantic_model_params[param.name] = (field_info.annotation, field_info) - continue + dynamic_pydantic_model_params[field_name] = Annotated[(annotation, *field_metadata, Field(**field_kwargs))] arguments_model = create_model( f"{func.__name__}Arguments", - **dynamic_pydantic_model_params, __base__=ArgModelBase, + **dynamic_pydantic_model_params, ) if structured_output is False: @@ -272,14 +262,15 @@ def func_metadata( if sig.return_annotation is inspect.Parameter.empty and structured_output is True: raise InvalidSignature(f"Function {func.__name__}: return annotation required for structured output") - output_info = FieldInfo.from_annotation(_get_typed_annotation(sig.return_annotation, globalns)) - annotation = output_info.annotation + inspected_return_ann = inspect_annotation(sig.return_annotation, annotation_source=AnnotationSource.FUNCTION) + return_type_expr = inspected_return_ann.type + if return_type_expr is UNKNOWN and structured_output is True: + # `return_type_expr` is `UNKNOWN` when a bare type qualifier is used (unlikely to happen as a return annotation + # because it doesn't make any sense, but technically possible). + raise InvalidSignature(f"Function {func.__name__}: return annotation required for structured output") - # Reject CallToolResult in Union types (including Optional) - # Handle both typing.Union (Union[X, Y]) and types.UnionType (X | Y) - origin = get_origin(annotation) - if origin is Union or origin is types.UnionType: - args = get_args(annotation) + if is_union_origin(get_origin(return_type_expr)): + args = get_args(return_type_expr) # Check if CallToolResult appears in the union (excluding None for Optional check) if any(isinstance(arg, type) and issubclass(arg, CallToolResult) for arg in args if arg is not type(None)): raise InvalidSignature( @@ -287,20 +278,34 @@ def func_metadata( "To return empty results, use: CallToolResult(content=[])" ) + original_annotation: Any # if the typehint is CallToolResult, the user either intends to return without validation # or they provided validation as Annotated metadata - if isinstance(annotation, type) and issubclass(annotation, CallToolResult): - if output_info.metadata: - annotation = output_info.metadata[0] + if isinstance(return_type_expr, type) and issubclass(return_type_expr, CallToolResult): + if inspected_return_ann.metadata: + return_type_expr = inspected_return_ann.metadata[0] + if len(inspected_return_ann.metadata) >= 2: + # Reconstruct the original annotation, by preserving the remaining metadata, + # i.e. from `Annotated[CallToolResult, ReturnType, Gt(1)]` to + # `Annotated[ReturnType, Gt(1)]`: + original_annotation = Annotated[(return_type_expr, *inspected_return_ann.metadata[1:])] + else: + # We only had `Annotated[CallToolResult, ReturnType]`, treat the original annotation + # as beging `ReturnType`: + original_annotation = return_type_expr else: return FuncMetadata(arg_model=arguments_model) + else: + original_annotation = sig.return_annotation - output_model, output_schema, wrap_output = _try_create_model_and_schema(annotation, func.__name__, output_info) + output_model, output_schema, wrap_output = _try_create_model_and_schema( + original_annotation, return_type_expr, func.__name__ + ) if output_model is None and structured_output is True: # Model creation failed or produced warnings - no structured output raise InvalidSignature( - f"Function {func.__name__}: return type {annotation} is not serializable for structured output" + f"Function {func.__name__}: return type {return_type_expr} is not serializable for structured output" ) return FuncMetadata( @@ -312,10 +317,18 @@ def func_metadata( def _try_create_model_and_schema( - annotation: Any, func_name: str, field_info: FieldInfo + original_annotation: Any, + type_expr: Any, + func_name: str, ) -> tuple[type[BaseModel] | None, dict[str, Any] | None, bool]: """Try to create a model and schema for the given annotation without warnings. + Args: + original_annotation: The original return annotation (may be wrapped in `Annotated`). + type_expr: The underlying type expression derived from the return annotation + (`Annotated` and type qualifiers were stripped). + func_name: The name of the function. + Returns: tuple of (model or None, schema or None, wrap_output) Model and schema are None if warnings occur or creation fails. @@ -325,43 +338,45 @@ def _try_create_model_and_schema( wrap_output = False # First handle special case: None - if annotation is None: - model = _create_wrapped_model(func_name, annotation, field_info) + if type_expr is None: + model = _create_wrapped_model(func_name, original_annotation) wrap_output = True # Handle GenericAlias types (list[str], dict[str, int], Union[str, int], etc.) - elif isinstance(annotation, GenericAlias): - origin = get_origin(annotation) + elif isinstance(type_expr, GenericAlias): + origin = get_origin(type_expr) # Special case: dict with string keys can use RootModel if origin is dict: - args = get_args(annotation) + args = get_args(type_expr) if len(args) == 2 and args[0] is str: - model = _create_dict_model(func_name, annotation) + # TODO: should we use the original annotation? We are loosing any potential `Annotated` + # metadata for Pydantic here: + model = _create_dict_model(func_name, type_expr) else: # dict with non-str keys needs wrapping - model = _create_wrapped_model(func_name, annotation, field_info) + model = _create_wrapped_model(func_name, original_annotation) wrap_output = True else: # All other generic types need wrapping (list, tuple, Union, Optional, etc.) - model = _create_wrapped_model(func_name, annotation, field_info) + model = _create_wrapped_model(func_name, original_annotation) wrap_output = True # Handle regular type objects - elif isinstance(annotation, type): - type_annotation: type[Any] = cast(type[Any], annotation) + elif isinstance(type_expr, type): + type_annotation = cast(type[Any], type_expr) # Case 1: BaseModel subclasses (can be used directly) - if issubclass(annotation, BaseModel): - model = annotation + if issubclass(type_annotation, BaseModel): + model = type_annotation - # Case 2: TypedDict (special dict subclass with __annotations__) - elif hasattr(type_annotation, "__annotations__") and issubclass(annotation, dict): + # Case 2: TypedDicts: + elif is_typeddict(type_annotation): model = _create_model_from_typeddict(type_annotation) # Case 3: Primitive types that need wrapping - elif annotation in (str, int, float, bool, bytes, type(None)): - model = _create_wrapped_model(func_name, annotation, field_info) + elif type_annotation in (str, int, float, bool, bytes, type(None)): + model = _create_wrapped_model(func_name, original_annotation) wrap_output = True # Case 4: Other class types (dataclasses, regular classes with annotations) @@ -369,14 +384,14 @@ def _try_create_model_and_schema( type_hints = get_type_hints(type_annotation) if type_hints: # Classes with type hints can be converted to Pydantic models - model = _create_model_from_class(type_annotation) + model = _create_model_from_class(type_annotation, type_hints) # Classes without type hints are not serializable - model remains None # Handle any other types not covered above else: # This includes typing constructs that aren't GenericAlias in Python 3.10 # (e.g., Union, Optional in some Python versions) - model = _create_wrapped_model(func_name, annotation, field_info) + model = _create_wrapped_model(func_name, original_annotation) wrap_output = True if model: @@ -390,7 +405,7 @@ def _try_create_model_and_schema( # ValueError: When there are issues with the type definition (including our custom warnings) # SchemaError: When Pydantic can't build a schema # ValidationError: When validation fails - logger.info(f"Cannot create schema for type {annotation} in {func_name}: {type(e).__name__}: {e}") + logger.info(f"Cannot create schema for type {type_expr} in {func_name}: {type(e).__name__}: {e}") return None, None, False return model, schema, wrap_output @@ -398,7 +413,10 @@ def _try_create_model_and_schema( return None, None, False -def _create_model_from_class(cls: type[Any]) -> type[BaseModel]: +_no_default = object() + + +def _create_model_from_class(cls: type[Any], type_hints: dict[str, Any]) -> type[BaseModel]: """Create a Pydantic model from an ordinary class. The created model will: @@ -406,24 +424,20 @@ def _create_model_from_class(cls: type[Any]) -> type[BaseModel]: - Have fields with the same names and types as the class's fields - Include all fields whose type does not include None in the set of required fields - Precondition: cls must have type hints (i.e., get_type_hints(cls) is non-empty) + Precondition: cls must have type hints (i.e., `type_hints` is non-empty) """ - type_hints = get_type_hints(cls) - model_fields: dict[str, Any] = {} for field_name, field_type in type_hints.items(): if field_name.startswith("_"): continue - default = getattr(cls, field_name, PydanticUndefined) - field_info = FieldInfo.from_annotated_attribute(field_type, default) - model_fields[field_name] = (field_info.annotation, field_info) - - # Create a base class with the config - class BaseWithConfig(BaseModel): - model_config = ConfigDict(from_attributes=True) + default = getattr(cls, field_name, _no_default) + if default is _no_default: + model_fields[field_name] = field_type + else: + model_fields[field_name] = (field_type, default) - return create_model(cls.__name__, **model_fields, __base__=BaseWithConfig) + return create_model(cls.__name__, __config__=ConfigDict(from_attributes=True), **model_fields) def _create_model_from_typeddict(td_type: type[Any]) -> type[BaseModel]: @@ -436,31 +450,25 @@ def _create_model_from_typeddict(td_type: type[Any]) -> type[BaseModel]: model_fields: dict[str, Any] = {} for field_name, field_type in type_hints.items(): - field_info = FieldInfo.from_annotation(field_type) - if field_name not in required_keys: # For optional TypedDict fields, set default=None # This makes them not required in the Pydantic model # The model should use exclude_unset=True when dumping to get TypedDict semantics - field_info.default = None - - model_fields[field_name] = (field_info.annotation, field_info) + model_fields[field_name] = (field_type, None) + else: + model_fields[field_name] = field_type - return create_model(td_type.__name__, **model_fields, __base__=BaseModel) + return create_model(td_type.__name__, **model_fields) -def _create_wrapped_model(func_name: str, annotation: Any, field_info: FieldInfo) -> type[BaseModel]: +def _create_wrapped_model(func_name: str, annotation: Any) -> type[BaseModel]: """Create a model that wraps a type in a 'result' field. This is used for primitive types, generic types like list/dict, etc. """ model_name = f"{func_name}Output" - # Pydantic needs type(None) instead of None for the type annotation - if annotation is None: - annotation = type(None) - - return create_model(model_name, result=(annotation, field_info), __base__=BaseModel) + return create_model(model_name, result=annotation) def _create_dict_model(func_name: str, dict_annotation: Any) -> type[BaseModel]: @@ -476,43 +484,6 @@ class DictModel(RootModel[dict_annotation]): return DictModel -def _get_typed_annotation(annotation: Any, globalns: dict[str, Any]) -> Any: - def try_eval_type(value: Any, globalns: dict[str, Any], localns: dict[str, Any]) -> tuple[Any, bool]: - try: - return eval_type_backport(value, globalns, localns), True - except NameError: - return value, False - - if isinstance(annotation, str): - annotation = ForwardRef(annotation) - annotation, status = try_eval_type(annotation, globalns, globalns) - - # This check and raise could perhaps be skipped, and we (FastMCP) just call - # model_rebuild right before using it 🤷 - if status is False: - raise InvalidSignature(f"Unable to evaluate type annotation {annotation}") - - return annotation - - -def _get_typed_signature(call: Callable[..., Any]) -> inspect.Signature: - """Get function signature while evaluating forward references""" - signature = inspect.signature(call) - globalns = getattr(call, "__globals__", {}) - typed_params = [ - inspect.Parameter( - name=param.name, - kind=param.kind, - default=param.default, - annotation=_get_typed_annotation(param.annotation, globalns), - ) - for param in signature.parameters.values() - ] - typed_return = _get_typed_annotation(signature.return_annotation, globalns) - typed_signature = inspect.Signature(typed_params, return_annotation=typed_return) - return typed_signature - - def _convert_to_content( result: Any, ) -> Sequence[ContentBlock]: diff --git a/uv.lock b/uv.lock index 6c6b13a6e..cb27d659b 100644 --- a/uv.lock +++ b/uv.lock @@ -618,6 +618,8 @@ dependencies = [ { name = "pywin32", marker = "sys_platform == 'win32'" }, { name = "sse-starlette" }, { name = "starlette" }, + { name = "typing-extensions" }, + { name = "typing-inspection" }, { name = "uvicorn", marker = "sys_platform != 'emscripten'" }, ] @@ -668,6 +670,8 @@ requires-dist = [ { name = "sse-starlette", specifier = ">=1.6.1" }, { name = "starlette", specifier = ">=0.27" }, { name = "typer", marker = "extra == 'cli'", specifier = ">=0.16.0" }, + { name = "typing-extensions", specifier = ">=4.9.0" }, + { name = "typing-inspection", specifier = ">=0.4.1" }, { name = "uvicorn", marker = "sys_platform != 'emscripten'", specifier = ">=0.31.1" }, { name = "websockets", marker = "extra == 'ws'", specifier = ">=15.0.1" }, ] From ee570a7e8aaaa15efde3f129a454ab198c7257a3 Mon Sep 17 00:00:00 2001 From: Viicos <65306057+Viicos@users.noreply.github.com> Date: Fri, 24 Oct 2025 11:22:19 +0200 Subject: [PATCH 2/2] Feedback --- .../server/fastmcp/utilities/func_metadata.py | 16 +++++++++----- tests/server/fastmcp/test_func_metadata.py | 22 ++++++++++++++++++- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/src/mcp/server/fastmcp/utilities/func_metadata.py b/src/mcp/server/fastmcp/utilities/func_metadata.py index 50f014737..5e066116d 100644 --- a/src/mcp/server/fastmcp/utilities/func_metadata.py +++ b/src/mcp/server/fastmcp/utilities/func_metadata.py @@ -17,7 +17,7 @@ from pydantic.fields import FieldInfo from pydantic.json_schema import GenerateJsonSchema, JsonSchemaWarningKind from typing_extensions import is_typeddict -from typing_inspection.introspection import UNKNOWN, AnnotationSource, inspect_annotation, is_union_origin +from typing_inspection.introspection import UNKNOWN, ForbiddenQualifier, AnnotationSource, inspect_annotation, is_union_origin from mcp.server.fastmcp.exceptions import InvalidSignature from mcp.server.fastmcp.utilities.logging import get_logger @@ -262,12 +262,16 @@ def func_metadata( if sig.return_annotation is inspect.Parameter.empty and structured_output is True: raise InvalidSignature(f"Function {func.__name__}: return annotation required for structured output") - inspected_return_ann = inspect_annotation(sig.return_annotation, annotation_source=AnnotationSource.FUNCTION) + try: + inspected_return_ann = inspect_annotation(sig.return_annotation, annotation_source=AnnotationSource.FUNCTION) + except ForbiddenQualifier as e: + raise InvalidSignature(f"Function {func.__name__}: return annotation contains an invalid type qualifier") from e + return_type_expr = inspected_return_ann.type - if return_type_expr is UNKNOWN and structured_output is True: - # `return_type_expr` is `UNKNOWN` when a bare type qualifier is used (unlikely to happen as a return annotation - # because it doesn't make any sense, but technically possible). - raise InvalidSignature(f"Function {func.__name__}: return annotation required for structured output") + + # `AnnotationSource.FUNCTION` allows no type qualifier to be used, so `return_type_expr` is guaranteed to *not* be + # unknown (i.e. a bare `Final`). + assert return_type_expr is not UNKNOWN if is_union_origin(get_origin(return_type_expr)): args = get_args(return_type_expr) diff --git a/tests/server/fastmcp/test_func_metadata.py b/tests/server/fastmcp/test_func_metadata.py index 793dfc324..b2ee42943 100644 --- a/tests/server/fastmcp/test_func_metadata.py +++ b/tests/server/fastmcp/test_func_metadata.py @@ -5,7 +5,7 @@ # pyright: reportUnknownLambdaType=false from collections.abc import Callable from dataclasses import dataclass -from typing import Annotated, Any, TypedDict +from typing import Annotated, Any, TypedDict, Final import annotated_types import pytest @@ -1182,3 +1182,23 @@ def func_with_reserved_json( assert result["json"] == {"nested": "data"} assert result["model_dump"] == [1, 2, 3] assert result["normal"] == "plain string" + + +def test_disallowed_type_qualifier(): + from mcp.server.fastmcp.exceptions import InvalidSignature + + def func_disallowed_qualifier() -> Final[int]: # type: ignore + pass + + with pytest.raises(InvalidSignature) as exc_info: + func_metadata(func_disallowed_qualifier) + assert "return annotation contains an invalid type qualifier" in str(exc_info.value) + + +def test_preserves_pydantic_metadata(): + def func_with_metadata() -> Annotated[int, Field(gt=1)]: ... + + meta = func_metadata(func_with_metadata) + + assert meta.output_schema is not None + assert meta.output_schema["properties"]["result"] == {"exclusiveMinimum": 1, "title": "Result", "type": "integer"}