diff --git a/README.md b/README.md index d2fb9194a..e2ef5a7ca 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,7 @@ - [Advanced Usage](#advanced-usage) - [Low-Level Server](#low-level-server) - [Structured Output Support](#structured-output-support) + - [Pagination (Advanced)](#pagination-advanced) - [Writing MCP Clients](#writing-mcp-clients) - [Client Display Utilities](#client-display-utilities) - [OAuth Authentication for Clients](#oauth-authentication-for-clients) @@ -1737,6 +1738,116 @@ Tools can return data in three ways: When an `outputSchema` is defined, the server automatically validates the structured output against the schema. This ensures type safety and helps catch errors early. +### Pagination (Advanced) + +For servers that need to handle large datasets, the low-level server provides paginated versions of list operations. This is an optional optimization - most servers won't need pagination unless they're dealing with hundreds or thousands of items. + +#### Server-side Implementation + + +```python +""" +Example of implementing pagination with MCP server decorators. +""" + +from pydantic import AnyUrl + +import mcp.types as types +from mcp.server.lowlevel import Server + +# Initialize the server +server = Server("paginated-server") + +# Sample data to paginate +ITEMS = [f"Item {i}" for i in range(1, 101)] # 100 items + + +@server.list_resources() +async def list_resources_paginated(request: types.ListResourcesRequest) -> types.ListResourcesResult: + """List resources with pagination support.""" + page_size = 10 + + # Extract cursor from request params + cursor = request.params.cursor if request.params is not None else None + + # Parse cursor to get offset + start = 0 if cursor is None else int(cursor) + end = start + page_size + + # Get page of resources + page_items = [ + types.Resource(uri=AnyUrl(f"resource://items/{item}"), name=item, description=f"Description for {item}") + for item in ITEMS[start:end] + ] + + # Determine next cursor + next_cursor = str(end) if end < len(ITEMS) else None + + return types.ListResourcesResult(resources=page_items, nextCursor=next_cursor) +``` + +_Full example: [examples/snippets/servers/pagination_example.py](https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/snippets/servers/pagination_example.py)_ + + +#### Client-side Consumption + + +```python +""" +Example of consuming paginated MCP endpoints from a client. +""" + +import asyncio + +from mcp.client.session import ClientSession +from mcp.client.stdio import StdioServerParameters, stdio_client +from mcp.types import Resource + + +async def list_all_resources() -> None: + """Fetch all resources using pagination.""" + async with stdio_client(StdioServerParameters(command="uv", args=["run", "mcp-simple-pagination"])) as ( + read, + write, + ): + async with ClientSession(read, write) as session: + await session.initialize() + + all_resources: list[Resource] = [] + cursor = None + + while True: + # Fetch a page of resources + result = await session.list_resources(cursor=cursor) + all_resources.extend(result.resources) + + print(f"Fetched {len(result.resources)} resources") + + # Check if there are more pages + if result.nextCursor: + cursor = result.nextCursor + else: + break + + print(f"Total resources: {len(all_resources)}") + + +if __name__ == "__main__": + asyncio.run(list_all_resources()) +``` + +_Full example: [examples/snippets/clients/pagination_client.py](https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/snippets/clients/pagination_client.py)_ + + +#### Key Points + +- **Cursors are opaque strings** - the server defines the format (numeric offsets, timestamps, etc.) +- **Return `nextCursor=None`** when there are no more pages +- **Backward compatible** - clients that don't support pagination will still work (they'll just get the first page) +- **Flexible page sizes** - Each endpoint can define its own page size based on data characteristics + +See the [simple-pagination example](examples/servers/simple-pagination) for a complete implementation. + ### Writing MCP Clients The SDK provides a high-level client interface for connecting to MCP servers using various [transports](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports): diff --git a/examples/servers/simple-pagination/README.md b/examples/servers/simple-pagination/README.md new file mode 100644 index 000000000..e732b8efb --- /dev/null +++ b/examples/servers/simple-pagination/README.md @@ -0,0 +1,77 @@ +# MCP Simple Pagination + +A simple MCP server demonstrating pagination for tools, resources, and prompts using cursor-based pagination. + +## Usage + +Start the server using either stdio (default) or SSE transport: + +```bash +# Using stdio transport (default) +uv run mcp-simple-pagination + +# Using SSE transport on custom port +uv run mcp-simple-pagination --transport sse --port 8000 +``` + +The server exposes: + +- 25 tools (paginated, 5 per page) +- 30 resources (paginated, 10 per page) +- 20 prompts (paginated, 7 per page) + +Each paginated list returns a `nextCursor` when more pages are available. Use this cursor in subsequent requests to retrieve the next page. + +## Example + +Using the MCP client, you can retrieve paginated items like this using the STDIO transport: + +```python +import asyncio +from mcp.client.session import ClientSession +from mcp.client.stdio import StdioServerParameters, stdio_client + + +async def main(): + async with stdio_client( + StdioServerParameters(command="uv", args=["run", "mcp-simple-pagination"]) + ) as (read, write): + async with ClientSession(read, write) as session: + await session.initialize() + + # Get first page of tools + tools_page1 = await session.list_tools() + print(f"First page: {len(tools_page1.tools)} tools") + print(f"Next cursor: {tools_page1.nextCursor}") + + # Get second page using cursor + if tools_page1.nextCursor: + tools_page2 = await session.list_tools(cursor=tools_page1.nextCursor) + print(f"Second page: {len(tools_page2.tools)} tools") + + # Similarly for resources + resources_page1 = await session.list_resources() + print(f"First page: {len(resources_page1.resources)} resources") + + # And for prompts + prompts_page1 = await session.list_prompts() + print(f"First page: {len(prompts_page1.prompts)} prompts") + + +asyncio.run(main()) +``` + +## Pagination Details + +The server uses simple numeric indices as cursors for demonstration purposes. In production scenarios, you might use: + +- Database offsets or row IDs +- Timestamps for time-based pagination +- Opaque tokens encoding pagination state + +The pagination implementation demonstrates: + +- Handling `None` cursor for the first page +- Returning `nextCursor` when more data exists +- Gracefully handling invalid cursors +- Different page sizes for different resource types diff --git a/examples/servers/simple-pagination/mcp_simple_pagination/__init__.py b/examples/servers/simple-pagination/mcp_simple_pagination/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/servers/simple-pagination/mcp_simple_pagination/__main__.py b/examples/servers/simple-pagination/mcp_simple_pagination/__main__.py new file mode 100644 index 000000000..e7ef16530 --- /dev/null +++ b/examples/servers/simple-pagination/mcp_simple_pagination/__main__.py @@ -0,0 +1,5 @@ +import sys + +from .server import main + +sys.exit(main()) # type: ignore[call-arg] diff --git a/examples/servers/simple-pagination/mcp_simple_pagination/server.py b/examples/servers/simple-pagination/mcp_simple_pagination/server.py new file mode 100644 index 000000000..360cbc3cf --- /dev/null +++ b/examples/servers/simple-pagination/mcp_simple_pagination/server.py @@ -0,0 +1,228 @@ +""" +Simple MCP server demonstrating pagination for tools, resources, and prompts. + +This example shows how to use the paginated decorators to handle large lists +of items that need to be split across multiple pages. +""" + +from typing import Any + +import anyio +import click +import mcp.types as types +from mcp.server.lowlevel import Server +from pydantic import AnyUrl +from starlette.requests import Request + +# Sample data - in real scenarios, this might come from a database +SAMPLE_TOOLS = [ + types.Tool( + name=f"tool_{i}", + title=f"Tool {i}", + description=f"This is sample tool number {i}", + inputSchema={"type": "object", "properties": {"input": {"type": "string"}}}, + ) + for i in range(1, 26) # 25 tools total +] + +SAMPLE_RESOURCES = [ + types.Resource( + uri=AnyUrl(f"file:///path/to/resource_{i}.txt"), + name=f"resource_{i}", + description=f"This is sample resource number {i}", + ) + for i in range(1, 31) # 30 resources total +] + +SAMPLE_PROMPTS = [ + types.Prompt( + name=f"prompt_{i}", + description=f"This is sample prompt number {i}", + arguments=[ + types.PromptArgument(name="arg1", description="First argument", required=True), + ], + ) + for i in range(1, 21) # 20 prompts total +] + + +@click.command() +@click.option("--port", default=8000, help="Port to listen on for SSE") +@click.option( + "--transport", + type=click.Choice(["stdio", "sse"]), + default="stdio", + help="Transport type", +) +def main(port: int, transport: str) -> int: + app = Server("mcp-simple-pagination") + + # Paginated list_tools - returns 5 tools per page + @app.list_tools() + async def list_tools_paginated(request: types.ListToolsRequest) -> types.ListToolsResult: + page_size = 5 + + cursor = request.params.cursor if request.params is not None else None + if cursor is None: + # First page + start_idx = 0 + else: + # Parse cursor to get the start index + try: + start_idx = int(cursor) + except (ValueError, TypeError): + # Invalid cursor, return empty + return types.ListToolsResult(tools=[], nextCursor=None) + + # Get the page of tools + page_tools = SAMPLE_TOOLS[start_idx : start_idx + page_size] + + # Determine if there are more pages + next_cursor = None + if start_idx + page_size < len(SAMPLE_TOOLS): + next_cursor = str(start_idx + page_size) + + return types.ListToolsResult(tools=page_tools, nextCursor=next_cursor) + + # Paginated list_resources - returns 10 resources per page + @app.list_resources() + async def list_resources_paginated( + request: types.ListResourcesRequest, + ) -> types.ListResourcesResult: + page_size = 10 + + cursor = request.params.cursor if request.params is not None else None + if cursor is None: + # First page + start_idx = 0 + else: + # Parse cursor to get the start index + try: + start_idx = int(cursor) + except (ValueError, TypeError): + # Invalid cursor, return empty + return types.ListResourcesResult(resources=[], nextCursor=None) + + # Get the page of resources + page_resources = SAMPLE_RESOURCES[start_idx : start_idx + page_size] + + # Determine if there are more pages + next_cursor = None + if start_idx + page_size < len(SAMPLE_RESOURCES): + next_cursor = str(start_idx + page_size) + + return types.ListResourcesResult(resources=page_resources, nextCursor=next_cursor) + + # Paginated list_prompts - returns 7 prompts per page + @app.list_prompts() + async def list_prompts_paginated( + request: types.ListPromptsRequest, + ) -> types.ListPromptsResult: + page_size = 7 + + cursor = request.params.cursor if request.params is not None else None + if cursor is None: + # First page + start_idx = 0 + else: + # Parse cursor to get the start index + try: + start_idx = int(cursor) + except (ValueError, TypeError): + # Invalid cursor, return empty + return types.ListPromptsResult(prompts=[], nextCursor=None) + + # Get the page of prompts + page_prompts = SAMPLE_PROMPTS[start_idx : start_idx + page_size] + + # Determine if there are more pages + next_cursor = None + if start_idx + page_size < len(SAMPLE_PROMPTS): + next_cursor = str(start_idx + page_size) + + return types.ListPromptsResult(prompts=page_prompts, nextCursor=next_cursor) + + # Implement call_tool handler + @app.call_tool() + async def call_tool(name: str, arguments: dict[str, Any]) -> list[types.ContentBlock]: + # Find the tool in our sample data + tool = next((t for t in SAMPLE_TOOLS if t.name == name), None) + if not tool: + raise ValueError(f"Unknown tool: {name}") + + # Simple mock response + return [ + types.TextContent( + type="text", + text=f"Called tool '{name}' with arguments: {arguments}", + ) + ] + + # Implement read_resource handler + @app.read_resource() + async def read_resource(uri: AnyUrl) -> str: + # Find the resource in our sample data + resource = next((r for r in SAMPLE_RESOURCES if r.uri == uri), None) + if not resource: + raise ValueError(f"Unknown resource: {uri}") + + # Return a simple string - the decorator will convert it to TextResourceContents + return f"Content of {resource.name}: This is sample content for the resource." + + # Implement get_prompt handler + @app.get_prompt() + async def get_prompt(name: str, arguments: dict[str, str] | None) -> types.GetPromptResult: + # Find the prompt in our sample data + prompt = next((p for p in SAMPLE_PROMPTS if p.name == name), None) + if not prompt: + raise ValueError(f"Unknown prompt: {name}") + + # Simple mock response + message_text = f"This is the prompt '{name}'" + if arguments: + message_text += f" with arguments: {arguments}" + + return types.GetPromptResult( + description=prompt.description, + messages=[ + types.PromptMessage( + role="user", + content=types.TextContent(type="text", text=message_text), + ) + ], + ) + + if transport == "sse": + from mcp.server.sse import SseServerTransport + from starlette.applications import Starlette + from starlette.responses import Response + from starlette.routing import Mount, Route + + sse = SseServerTransport("/messages/") + + async def handle_sse(request: Request): + async with sse.connect_sse(request.scope, request.receive, request._send) as streams: # type: ignore[reportPrivateUsage] + await app.run(streams[0], streams[1], app.create_initialization_options()) + return Response() + + starlette_app = Starlette( + debug=True, + routes=[ + Route("/sse", endpoint=handle_sse, methods=["GET"]), + Mount("/messages/", app=sse.handle_post_message), + ], + ) + + import uvicorn + + uvicorn.run(starlette_app, host="127.0.0.1", port=port) + else: + from mcp.server.stdio import stdio_server + + async def arun(): + async with stdio_server() as streams: + await app.run(streams[0], streams[1], app.create_initialization_options()) + + anyio.run(arun) + + return 0 diff --git a/examples/servers/simple-pagination/pyproject.toml b/examples/servers/simple-pagination/pyproject.toml new file mode 100644 index 000000000..0c60cf73c --- /dev/null +++ b/examples/servers/simple-pagination/pyproject.toml @@ -0,0 +1,47 @@ +[project] +name = "mcp-simple-pagination" +version = "0.1.0" +description = "A simple MCP server demonstrating pagination for tools, resources, and prompts" +readme = "README.md" +requires-python = ">=3.10" +authors = [{ name = "Anthropic, PBC." }] +maintainers = [ + { name = "David Soria Parra", email = "davidsp@anthropic.com" }, + { name = "Justin Spahr-Summers", email = "justin@anthropic.com" }, +] +keywords = ["mcp", "llm", "automation", "pagination", "cursor"] +license = { text = "MIT" } +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", +] +dependencies = ["anyio>=4.5", "click>=8.2.0", "httpx>=0.27", "mcp"] + +[project.scripts] +mcp-simple-pagination = "mcp_simple_pagination.server:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["mcp_simple_pagination"] + +[tool.pyright] +include = ["mcp_simple_pagination"] +venvPath = "." +venv = ".venv" + +[tool.ruff.lint] +select = ["E", "F", "I"] +ignore = [] + +[tool.ruff] +line-length = 120 +target-version = "py310" + +[tool.uv] +dev-dependencies = ["pyright>=1.1.378", "pytest>=8.3.3", "ruff>=0.6.9"] \ No newline at end of file diff --git a/examples/snippets/clients/pagination_client.py b/examples/snippets/clients/pagination_client.py new file mode 100644 index 000000000..4df1aec60 --- /dev/null +++ b/examples/snippets/clients/pagination_client.py @@ -0,0 +1,41 @@ +""" +Example of consuming paginated MCP endpoints from a client. +""" + +import asyncio + +from mcp.client.session import ClientSession +from mcp.client.stdio import StdioServerParameters, stdio_client +from mcp.types import Resource + + +async def list_all_resources() -> None: + """Fetch all resources using pagination.""" + async with stdio_client(StdioServerParameters(command="uv", args=["run", "mcp-simple-pagination"])) as ( + read, + write, + ): + async with ClientSession(read, write) as session: + await session.initialize() + + all_resources: list[Resource] = [] + cursor = None + + while True: + # Fetch a page of resources + result = await session.list_resources(cursor=cursor) + all_resources.extend(result.resources) + + print(f"Fetched {len(result.resources)} resources") + + # Check if there are more pages + if result.nextCursor: + cursor = result.nextCursor + else: + break + + print(f"Total resources: {len(all_resources)}") + + +if __name__ == "__main__": + asyncio.run(list_all_resources()) diff --git a/examples/snippets/servers/pagination_example.py b/examples/snippets/servers/pagination_example.py new file mode 100644 index 000000000..70c3b3492 --- /dev/null +++ b/examples/snippets/servers/pagination_example.py @@ -0,0 +1,38 @@ +""" +Example of implementing pagination with MCP server decorators. +""" + +from pydantic import AnyUrl + +import mcp.types as types +from mcp.server.lowlevel import Server + +# Initialize the server +server = Server("paginated-server") + +# Sample data to paginate +ITEMS = [f"Item {i}" for i in range(1, 101)] # 100 items + + +@server.list_resources() +async def list_resources_paginated(request: types.ListResourcesRequest) -> types.ListResourcesResult: + """List resources with pagination support.""" + page_size = 10 + + # Extract cursor from request params + cursor = request.params.cursor if request.params is not None else None + + # Parse cursor to get offset + start = 0 if cursor is None else int(cursor) + end = start + page_size + + # Get page of resources + page_items = [ + types.Resource(uri=AnyUrl(f"resource://items/{item}"), name=item, description=f"Description for {item}") + for item in ITEMS[start:end] + ] + + # Determine next cursor + next_cursor = str(end) if end < len(ITEMS) else None + + return types.ListResourcesResult(resources=page_items, nextCursor=next_cursor) diff --git a/src/mcp/server/lowlevel/func_inspection.py b/src/mcp/server/lowlevel/func_inspection.py new file mode 100644 index 000000000..f5a745db2 --- /dev/null +++ b/src/mcp/server/lowlevel/func_inspection.py @@ -0,0 +1,54 @@ +import inspect +from collections.abc import Callable +from typing import Any, TypeVar, get_type_hints + +T = TypeVar("T") +R = TypeVar("R") + + +def create_call_wrapper(func: Callable[..., R], request_type: type[T]) -> Callable[[T], R]: + """ + Create a wrapper function that knows how to call func with the request object. + + Returns a wrapper function that takes the request and calls func appropriately. + + The wrapper handles three calling patterns: + 1. Positional-only parameter typed as request_type (no default): func(req) + 2. Positional/keyword parameter typed as request_type (no default): func(**{param_name: req}) + 3. No request parameter or parameter with default: func() + """ + try: + sig = inspect.signature(func) + type_hints = get_type_hints(func) + except (ValueError, TypeError, NameError): + return lambda _: func() + + # Check for positional-only parameter typed as request_type + for param_name, param in sig.parameters.items(): + if param.kind == inspect.Parameter.POSITIONAL_ONLY: + param_type = type_hints.get(param_name) + if param_type == request_type: + # Check if it has a default - if so, treat as old style + if param.default is not inspect.Parameter.empty: + return lambda _: func() + # Found positional-only parameter with correct type and no default + return lambda req: func(req) + + # Check for any positional/keyword parameter typed as request_type + for param_name, param in sig.parameters.items(): + if param.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY): + param_type = type_hints.get(param_name) + if param_type == request_type: + # Check if it has a default - if so, treat as old style + if param.default is not inspect.Parameter.empty: + return lambda _: func() + + # Found keyword parameter with correct type and no default + # Need to capture param_name in closure properly + def make_keyword_wrapper(name: str) -> Callable[[Any], Any]: + return lambda req: func(**{name: req}) + + return make_keyword_wrapper(param_name) + + # No request parameter found - use old style + return lambda _: func() diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index 3076e283e..3448424bc 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -11,7 +11,7 @@ 2. Define request handlers using decorators: @server.list_prompts() - async def handle_list_prompts() -> list[types.Prompt]: + async def handle_list_prompts(request: types.ListPromptsRequest) -> types.ListPromptsResult: # Implementation @server.get_prompt() @@ -21,7 +21,7 @@ async def handle_get_prompt( # Implementation @server.list_tools() - async def handle_list_tools() -> list[types.Tool]: + async def handle_list_tools(request: types.ListToolsRequest) -> types.ListToolsResult: # Implementation @server.call_tool() @@ -82,6 +82,7 @@ async def main(): from typing_extensions import TypeVar import mcp.types as types +from mcp.server.lowlevel.func_inspection import create_call_wrapper from mcp.server.lowlevel.helper_types import ReadResourceContents from mcp.server.models import InitializationOptions from mcp.server.session import ServerSession @@ -229,12 +230,22 @@ def request_context( return request_ctx.get() def list_prompts(self): - def decorator(func: Callable[[], Awaitable[list[types.Prompt]]]): + def decorator( + func: Callable[[], Awaitable[list[types.Prompt]]] + | Callable[[types.ListPromptsRequest], Awaitable[types.ListPromptsResult]], + ): logger.debug("Registering handler for PromptListRequest") - async def handler(_: Any): - prompts = await func() - return types.ServerResult(types.ListPromptsResult(prompts=prompts)) + wrapper = create_call_wrapper(func, types.ListPromptsRequest) + + async def handler(req: types.ListPromptsRequest): + result = await wrapper(req) + # Handle both old style (list[Prompt]) and new style (ListPromptsResult) + if isinstance(result, types.ListPromptsResult): + return types.ServerResult(result) + else: + # Old style returns list[Prompt] + return types.ServerResult(types.ListPromptsResult(prompts=result)) self.request_handlers[types.ListPromptsRequest] = handler return func @@ -257,12 +268,22 @@ async def handler(req: types.GetPromptRequest): return decorator def list_resources(self): - def decorator(func: Callable[[], Awaitable[list[types.Resource]]]): + def decorator( + func: Callable[[], Awaitable[list[types.Resource]]] + | Callable[[types.ListResourcesRequest], Awaitable[types.ListResourcesResult]], + ): logger.debug("Registering handler for ListResourcesRequest") - async def handler(_: Any): - resources = await func() - return types.ServerResult(types.ListResourcesResult(resources=resources)) + wrapper = create_call_wrapper(func, types.ListResourcesRequest) + + async def handler(req: types.ListResourcesRequest): + result = await wrapper(req) + # Handle both old style (list[Resource]) and new style (ListResourcesResult) + if isinstance(result, types.ListResourcesResult): + return types.ServerResult(result) + else: + # Old style returns list[Resource] + return types.ServerResult(types.ListResourcesResult(resources=result)) self.request_handlers[types.ListResourcesRequest] = handler return func @@ -380,16 +401,30 @@ async def handler(req: types.UnsubscribeRequest): return decorator def list_tools(self): - def decorator(func: Callable[[], Awaitable[list[types.Tool]]]): + def decorator( + func: Callable[[], Awaitable[list[types.Tool]]] + | Callable[[types.ListToolsRequest], Awaitable[types.ListToolsResult]], + ): logger.debug("Registering handler for ListToolsRequest") - async def handler(_: Any): - tools = await func() - # Refresh the tool cache - self._tool_cache.clear() - for tool in tools: - self._tool_cache[tool.name] = tool - return types.ServerResult(types.ListToolsResult(tools=tools)) + wrapper = create_call_wrapper(func, types.ListToolsRequest) + + async def handler(req: types.ListToolsRequest): + result = await wrapper(req) + + # Handle both old style (list[Tool]) and new style (ListToolsResult) + if isinstance(result, types.ListToolsResult): + # Refresh the tool cache with returned tools + for tool in result.tools: + self._tool_cache[tool.name] = tool + return types.ServerResult(result) + else: + # Old style returns list[Tool] + # Clear and refresh the entire tool cache + self._tool_cache.clear() + for tool in result: + self._tool_cache[tool.name] = tool + return types.ServerResult(types.ListToolsResult(tools=result)) self.request_handlers[types.ListToolsRequest] = handler return func diff --git a/tests/server/lowlevel/__init__.py b/tests/server/lowlevel/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/server/lowlevel/test_func_inspection.py b/tests/server/lowlevel/test_func_inspection.py new file mode 100644 index 000000000..556fede4a --- /dev/null +++ b/tests/server/lowlevel/test_func_inspection.py @@ -0,0 +1,292 @@ +"""Unit tests for func_inspection module. + +Tests the create_call_wrapper function which determines how to call handler functions +with different parameter signatures and type hints. +""" + +from typing import Any, Generic, TypeVar + +import pytest + +from mcp.server.lowlevel.func_inspection import create_call_wrapper +from mcp.types import ListPromptsRequest, ListResourcesRequest, ListToolsRequest, PaginatedRequestParams + +T = TypeVar("T") + + +@pytest.mark.anyio +async def test_no_params_returns_deprecated_wrapper() -> None: + """Test: def foo() - should call without request.""" + called_without_request = False + + async def handler() -> list[str]: + nonlocal called_without_request + called_without_request = True + return ["test"] + + wrapper = create_call_wrapper(handler, ListPromptsRequest) + + # Wrapper should call handler without passing request + request = ListPromptsRequest(method="prompts/list", params=None) + result = await wrapper(request) + assert called_without_request is True + assert result == ["test"] + + +@pytest.mark.anyio +async def test_param_with_default_returns_deprecated_wrapper() -> None: + """Test: def foo(thing: int = 1) - should call without request.""" + called_without_request = False + + async def handler(thing: int = 1) -> list[str]: + nonlocal called_without_request + called_without_request = True + return [f"test-{thing}"] + + wrapper = create_call_wrapper(handler, ListPromptsRequest) + + # Wrapper should call handler without passing request (uses default value) + request = ListPromptsRequest(method="prompts/list", params=None) + result = await wrapper(request) + assert called_without_request is True + assert result == ["test-1"] + + +@pytest.mark.anyio +async def test_typed_request_param_passes_request() -> None: + """Test: def foo(req: ListPromptsRequest) - should pass request through.""" + received_request = None + + async def handler(req: ListPromptsRequest) -> list[str]: + nonlocal received_request + received_request = req + return ["test"] + + wrapper = create_call_wrapper(handler, ListPromptsRequest) + + # Wrapper should pass request to handler + request = ListPromptsRequest(method="prompts/list", params=PaginatedRequestParams(cursor="test-cursor")) + await wrapper(request) + + assert received_request is not None + assert received_request is request + params = getattr(received_request, "params", None) + assert params is not None + assert params.cursor == "test-cursor" + + +@pytest.mark.anyio +async def test_typed_request_with_default_param_passes_request() -> None: + """Test: def foo(req: ListPromptsRequest, thing: int = 1) - should pass request through.""" + received_request = None + received_thing = None + + async def handler(req: ListPromptsRequest, thing: int = 1) -> list[str]: + nonlocal received_request, received_thing + received_request = req + received_thing = thing + return ["test"] + + wrapper = create_call_wrapper(handler, ListPromptsRequest) + + # Wrapper should pass request to handler + request = ListPromptsRequest(method="prompts/list", params=None) + await wrapper(request) + + assert received_request is request + assert received_thing == 1 # default value + + +@pytest.mark.anyio +async def test_optional_typed_request_with_default_none_is_deprecated() -> None: + """Test: def foo(thing: int = 1, req: ListPromptsRequest | None = None) - old style.""" + called_without_request = False + + async def handler(thing: int = 1, req: ListPromptsRequest | None = None) -> list[str]: + nonlocal called_without_request + called_without_request = True + return ["test"] + + wrapper = create_call_wrapper(handler, ListPromptsRequest) + + # Wrapper should call handler without passing request + request = ListPromptsRequest(method="prompts/list", params=None) + result = await wrapper(request) + assert called_without_request is True + assert result == ["test"] + + +@pytest.mark.anyio +async def test_untyped_request_param_is_deprecated() -> None: + """Test: def foo(req) - should call without request.""" + called = False + + async def handler(req): # type: ignore[no-untyped-def] # pyright: ignore[reportMissingParameterType] + nonlocal called + called = True + return ["test"] + + wrapper = create_call_wrapper(handler, ListPromptsRequest) # pyright: ignore[reportUnknownArgumentType] + + # Wrapper should call handler without passing request, which will fail because req is required + request = ListPromptsRequest(method="prompts/list", params=None) + # This will raise TypeError because handler expects 'req' but wrapper doesn't provide it + with pytest.raises(TypeError, match="missing 1 required positional argument"): + await wrapper(request) + + +@pytest.mark.anyio +async def test_any_typed_request_param_is_deprecated() -> None: + """Test: def foo(req: Any) - should call without request.""" + + async def handler(req: Any) -> list[str]: + return ["test"] + + wrapper = create_call_wrapper(handler, ListPromptsRequest) + + # Wrapper should call handler without passing request, which will fail because req is required + request = ListPromptsRequest(method="prompts/list", params=None) + # This will raise TypeError because handler expects 'req' but wrapper doesn't provide it + with pytest.raises(TypeError, match="missing 1 required positional argument"): + await wrapper(request) + + +@pytest.mark.anyio +async def test_generic_typed_request_param_is_deprecated() -> None: + """Test: def foo(req: Generic[T]) - should call without request.""" + + async def handler(req: Generic[T]) -> list[str]: # pyright: ignore[reportGeneralTypeIssues] + return ["test"] + + wrapper = create_call_wrapper(handler, ListPromptsRequest) + + # Wrapper should call handler without passing request, which will fail because req is required + request = ListPromptsRequest(method="prompts/list", params=None) + # This will raise TypeError because handler expects 'req' but wrapper doesn't provide it + with pytest.raises(TypeError, match="missing 1 required positional argument"): + await wrapper(request) + + +@pytest.mark.anyio +async def test_wrong_typed_request_param_is_deprecated() -> None: + """Test: def foo(req: str) - should call without request.""" + + async def handler(req: str) -> list[str]: + return ["test"] + + wrapper = create_call_wrapper(handler, ListPromptsRequest) + + # Wrapper should call handler without passing request, which will fail because req is required + request = ListPromptsRequest(method="prompts/list", params=None) + # This will raise TypeError because handler expects 'req' but wrapper doesn't provide it + with pytest.raises(TypeError, match="missing 1 required positional argument"): + await wrapper(request) + + +@pytest.mark.anyio +async def test_required_param_before_typed_request_attempts_to_pass() -> None: + """Test: def foo(thing: int, req: ListPromptsRequest) - attempts to pass request (will fail at runtime).""" + received_request = None + + async def handler(thing: int, req: ListPromptsRequest) -> list[str]: + nonlocal received_request + received_request = req + return ["test"] + + wrapper = create_call_wrapper(handler, ListPromptsRequest) + + # Wrapper will attempt to pass request, but it will fail at runtime + # because 'thing' is required and has no default + request = ListPromptsRequest(method="prompts/list", params=None) + + # This will raise TypeError because 'thing' is missing + with pytest.raises(TypeError, match="missing 1 required positional argument: 'thing'"): + await wrapper(request) + + +@pytest.mark.anyio +async def test_positional_only_param_with_correct_type() -> None: + """Test: def foo(req: ListPromptsRequest, /) - should pass request through.""" + received_request = None + + async def handler(req: ListPromptsRequest, /) -> list[str]: + nonlocal received_request + received_request = req + return ["test"] + + wrapper = create_call_wrapper(handler, ListPromptsRequest) + + # Wrapper should pass request to handler + request = ListPromptsRequest(method="prompts/list", params=None) + await wrapper(request) + + assert received_request is request + + +@pytest.mark.anyio +async def test_keyword_only_param_with_correct_type() -> None: + """Test: def foo(*, req: ListPromptsRequest) - should pass request through.""" + received_request = None + + async def handler(*, req: ListPromptsRequest) -> list[str]: + nonlocal received_request + received_request = req + return ["test"] + + wrapper = create_call_wrapper(handler, ListPromptsRequest) + + # Wrapper should pass request to handler with keyword argument + request = ListPromptsRequest(method="prompts/list", params=None) + await wrapper(request) + + assert received_request is request + + +@pytest.mark.anyio +async def test_different_request_types() -> None: + """Test that wrapper works with different request types.""" + # Test with ListResourcesRequest + received_request = None + + async def handler(req: ListResourcesRequest) -> list[str]: + nonlocal received_request + received_request = req + return ["test"] + + wrapper = create_call_wrapper(handler, ListResourcesRequest) + + request = ListResourcesRequest(method="resources/list", params=None) + await wrapper(request) + + assert received_request is request + + # Test with ListToolsRequest + received_request = None + + async def handler2(req: ListToolsRequest) -> list[str]: + nonlocal received_request + received_request = req + return ["test"] + + wrapper2 = create_call_wrapper(handler2, ListToolsRequest) + + request2 = ListToolsRequest(method="tools/list", params=None) + await wrapper2(request2) + + assert received_request is request2 + + +@pytest.mark.anyio +async def test_mixed_params_with_typed_request() -> None: + """Test: def foo(a: str, req: ListPromptsRequest, b: int = 5) - attempts to pass request.""" + + async def handler(a: str, req: ListPromptsRequest, b: int = 5) -> list[str]: + return ["test"] + + wrapper = create_call_wrapper(handler, ListPromptsRequest) + + # Will fail at runtime due to missing 'a' + request = ListPromptsRequest(method="prompts/list", params=None) + + with pytest.raises(TypeError, match="missing 1 required positional argument: 'a'"): + await wrapper(request) diff --git a/tests/server/lowlevel/test_server_listing.py b/tests/server/lowlevel/test_server_listing.py new file mode 100644 index 000000000..23ac7e451 --- /dev/null +++ b/tests/server/lowlevel/test_server_listing.py @@ -0,0 +1,182 @@ +"""Basic tests for list_prompts, list_resources, and list_tools decorators without pagination.""" + +import warnings + +import pytest +from pydantic import AnyUrl + +from mcp.server import Server +from mcp.types import ( + ListPromptsRequest, + ListPromptsResult, + ListResourcesRequest, + ListResourcesResult, + ListToolsRequest, + ListToolsResult, + Prompt, + Resource, + ServerResult, + Tool, +) + + +@pytest.mark.anyio +async def test_list_prompts_basic() -> None: + """Test basic prompt listing without pagination.""" + server = Server("test") + + test_prompts = [ + Prompt(name="prompt1", description="First prompt"), + Prompt(name="prompt2", description="Second prompt"), + ] + + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + + @server.list_prompts() + async def handle_list_prompts() -> list[Prompt]: + return test_prompts + + handler = server.request_handlers[ListPromptsRequest] + request = ListPromptsRequest(method="prompts/list", params=None) + result = await handler(request) + + assert isinstance(result, ServerResult) + assert isinstance(result.root, ListPromptsResult) + assert result.root.prompts == test_prompts + + +@pytest.mark.anyio +async def test_list_resources_basic() -> None: + """Test basic resource listing without pagination.""" + server = Server("test") + + test_resources = [ + Resource(uri=AnyUrl("file:///test1.txt"), name="Test 1"), + Resource(uri=AnyUrl("file:///test2.txt"), name="Test 2"), + ] + + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + + @server.list_resources() + async def handle_list_resources() -> list[Resource]: + return test_resources + + handler = server.request_handlers[ListResourcesRequest] + request = ListResourcesRequest(method="resources/list", params=None) + result = await handler(request) + + assert isinstance(result, ServerResult) + assert isinstance(result.root, ListResourcesResult) + assert result.root.resources == test_resources + + +@pytest.mark.anyio +async def test_list_tools_basic() -> None: + """Test basic tool listing without pagination.""" + server = Server("test") + + test_tools = [ + Tool( + name="tool1", + description="First tool", + inputSchema={ + "type": "object", + "properties": { + "message": {"type": "string"}, + }, + "required": ["message"], + }, + ), + Tool( + name="tool2", + description="Second tool", + inputSchema={ + "type": "object", + "properties": { + "count": {"type": "number"}, + "enabled": {"type": "boolean"}, + }, + "required": ["count"], + }, + ), + ] + + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + + @server.list_tools() + async def handle_list_tools() -> list[Tool]: + return test_tools + + handler = server.request_handlers[ListToolsRequest] + request = ListToolsRequest(method="tools/list", params=None) + result = await handler(request) + + assert isinstance(result, ServerResult) + assert isinstance(result.root, ListToolsResult) + assert result.root.tools == test_tools + + +@pytest.mark.anyio +async def test_list_prompts_empty() -> None: + """Test listing with empty results.""" + server = Server("test") + + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + + @server.list_prompts() + async def handle_list_prompts() -> list[Prompt]: + return [] + + handler = server.request_handlers[ListPromptsRequest] + request = ListPromptsRequest(method="prompts/list", params=None) + result = await handler(request) + + assert isinstance(result, ServerResult) + assert isinstance(result.root, ListPromptsResult) + assert result.root.prompts == [] + + +@pytest.mark.anyio +async def test_list_resources_empty() -> None: + """Test listing with empty results.""" + server = Server("test") + + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + + @server.list_resources() + async def handle_list_resources() -> list[Resource]: + return [] + + handler = server.request_handlers[ListResourcesRequest] + request = ListResourcesRequest(method="resources/list", params=None) + result = await handler(request) + + assert isinstance(result, ServerResult) + assert isinstance(result.root, ListResourcesResult) + assert result.root.resources == [] + + +@pytest.mark.anyio +async def test_list_tools_empty() -> None: + """Test listing with empty results.""" + server = Server("test") + + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + + @server.list_tools() + async def handle_list_tools() -> list[Tool]: + return [] + + handler = server.request_handlers[ListToolsRequest] + request = ListToolsRequest(method="tools/list", params=None) + result = await handler(request) + + assert isinstance(result, ServerResult) + assert isinstance(result.root, ListToolsResult) + assert result.root.tools == [] diff --git a/tests/server/lowlevel/test_server_pagination.py b/tests/server/lowlevel/test_server_pagination.py new file mode 100644 index 000000000..8d64dd525 --- /dev/null +++ b/tests/server/lowlevel/test_server_pagination.py @@ -0,0 +1,111 @@ +import pytest + +from mcp.server import Server +from mcp.types import ( + ListPromptsRequest, + ListPromptsResult, + ListResourcesRequest, + ListResourcesResult, + ListToolsRequest, + ListToolsResult, + PaginatedRequestParams, + ServerResult, +) + + +@pytest.mark.anyio +async def test_list_prompts_pagination() -> None: + server = Server("test") + test_cursor = "test-cursor-123" + + # Track what request was received + received_request: ListPromptsRequest | None = None + + @server.list_prompts() + async def handle_list_prompts(request: ListPromptsRequest) -> ListPromptsResult: + nonlocal received_request + received_request = request + return ListPromptsResult(prompts=[], nextCursor="next") + + handler = server.request_handlers[ListPromptsRequest] + + # Test: No cursor provided -> handler receives request with None params + request = ListPromptsRequest(method="prompts/list", params=None) + result = await handler(request) + assert received_request is not None + assert received_request.params is None + assert isinstance(result, ServerResult) + + # Test: Cursor provided -> handler receives request with cursor in params + request_with_cursor = ListPromptsRequest(method="prompts/list", params=PaginatedRequestParams(cursor=test_cursor)) + result2 = await handler(request_with_cursor) + assert received_request is not None + assert received_request.params is not None + assert received_request.params.cursor == test_cursor + assert isinstance(result2, ServerResult) + + +@pytest.mark.anyio +async def test_list_resources_pagination() -> None: + server = Server("test") + test_cursor = "resource-cursor-456" + + # Track what request was received + received_request: ListResourcesRequest | None = None + + @server.list_resources() + async def handle_list_resources(request: ListResourcesRequest) -> ListResourcesResult: + nonlocal received_request + received_request = request + return ListResourcesResult(resources=[], nextCursor="next") + + handler = server.request_handlers[ListResourcesRequest] + + # Test: No cursor provided -> handler receives request with None params + request = ListResourcesRequest(method="resources/list", params=None) + result = await handler(request) + assert received_request is not None + assert received_request.params is None + assert isinstance(result, ServerResult) + + # Test: Cursor provided -> handler receives request with cursor in params + request_with_cursor = ListResourcesRequest( + method="resources/list", params=PaginatedRequestParams(cursor=test_cursor) + ) + result2 = await handler(request_with_cursor) + assert received_request is not None + assert received_request.params is not None + assert received_request.params.cursor == test_cursor + assert isinstance(result2, ServerResult) + + +@pytest.mark.anyio +async def test_list_tools_pagination() -> None: + server = Server("test") + test_cursor = "tools-cursor-789" + + # Track what request was received + received_request: ListToolsRequest | None = None + + @server.list_tools() + async def handle_list_tools(request: ListToolsRequest) -> ListToolsResult: + nonlocal received_request + received_request = request + return ListToolsResult(tools=[], nextCursor="next") + + handler = server.request_handlers[ListToolsRequest] + + # Test: No cursor provided -> handler receives request with None params + request = ListToolsRequest(method="tools/list", params=None) + result = await handler(request) + assert received_request is not None + assert received_request.params is None + assert isinstance(result, ServerResult) + + # Test: Cursor provided -> handler receives request with cursor in params + request_with_cursor = ListToolsRequest(method="tools/list", params=PaginatedRequestParams(cursor=test_cursor)) + result2 = await handler(request_with_cursor) + assert received_request is not None + assert received_request.params is not None + assert received_request.params.cursor == test_cursor + assert isinstance(result2, ServerResult) diff --git a/uv.lock b/uv.lock index 7979f9aab..68abdcc4f 100644 --- a/uv.lock +++ b/uv.lock @@ -1,11 +1,12 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.10" [manifest] members = [ "mcp", "mcp-simple-auth", + "mcp-simple-pagination", "mcp-simple-prompt", "mcp-simple-resource", "mcp-simple-streamablehttp", @@ -730,6 +731,39 @@ dev = [ { name = "ruff", specifier = ">=0.8.5" }, ] +[[package]] +name = "mcp-simple-pagination" +version = "0.1.0" +source = { editable = "examples/servers/simple-pagination" } +dependencies = [ + { name = "anyio" }, + { name = "click" }, + { name = "httpx" }, + { name = "mcp" }, +] + +[package.dev-dependencies] +dev = [ + { name = "pyright" }, + { name = "pytest" }, + { name = "ruff" }, +] + +[package.metadata] +requires-dist = [ + { name = "anyio", specifier = ">=4.5" }, + { name = "click", specifier = ">=8.2.0" }, + { name = "httpx", specifier = ">=0.27" }, + { name = "mcp", editable = "." }, +] + +[package.metadata.requires-dev] +dev = [ + { name = "pyright", specifier = ">=1.1.378" }, + { name = "pytest", specifier = ">=8.3.3" }, + { name = "ruff", specifier = ">=0.6.9" }, +] + [[package]] name = "mcp-simple-prompt" version = "0.1.0"