Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/mcp/server/streamable_http_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,29 @@ async def handle_request(self, scope: Scope, receive: Receive, send: Send) -> No

async def _handle_stateless_request(self, scope: Scope, receive: Receive, send: Send) -> None:
"""Process request in stateless mode - creating a new transport for each request."""
# In stateless mode, only POST is meaningful. GET (SSE stream) and DELETE
# (session termination) both require session state that stateless mode does
# not maintain, so reject them with 405 before creating a transport.
request = Request(scope, receive)
if request.method in ("GET", "DELETE"):
logger.debug(f"Stateless mode: rejecting {request.method} with 405")
error_response = JSONRPCError(
jsonrpc="2.0",
id=None,
error=ErrorData(
code=INVALID_REQUEST,
message=(f"Method Not Allowed: {request.method} is not supported in stateless mode"),
),
)
response = Response(
content=error_response.model_dump_json(by_alias=True, exclude_unset=True),
status_code=HTTPStatus.METHOD_NOT_ALLOWED,
headers={"Allow": "POST"},
media_type="application/json",
)
await response(scope, receive, send)
return

logger.debug("Stateless mode: Creating new transport for this request")
# No session ID needed in stateless mode
http_transport = StreamableHTTPServerTransport(
Expand Down
113 changes: 113 additions & 0 deletions tests/server/test_streamable_http_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,3 +413,116 @@ def test_session_idle_timeout_rejects_non_positive():
def test_session_idle_timeout_rejects_stateless():
with pytest.raises(RuntimeError, match="not supported in stateless"):
StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=30, stateless=True)


async def _collect_stateless_response(
method: str,
) -> tuple[Message | None, bytes]:
"""Send a request of the given method to a stateless manager and return
(response.start message, response body)."""
app = Server("test-stateless-method")
manager = StreamableHTTPSessionManager(app=app, stateless=True)

sent_messages: list[Message] = []
response_body = b""

async def mock_send(message: Message):
nonlocal response_body
sent_messages.append(message)
if message["type"] == "http.response.body":
response_body += message.get("body", b"")

scope = {
"type": "http",
"method": method,
"path": "/mcp",
"headers": [
(b"content-type", b"application/json"),
(b"accept", b"application/json, text/event-stream"),
],
}

async def mock_receive(): # pragma: no cover
return {"type": "http.request", "body": b"", "more_body": False}

async with manager.run():
await manager.handle_request(scope, mock_receive, mock_send)

response_start = next(
(msg for msg in sent_messages if msg["type"] == "http.response.start"),
None,
)
return response_start, response_body


@pytest.mark.anyio
async def test_stateless_get_returns_405():
"""GET requests return 405 in stateless mode since SSE streams require session state."""
response_start, response_body = await _collect_stateless_response("GET")

assert response_start is not None
assert response_start["status"] == 405

headers = {name.decode().lower(): value.decode() for name, value in response_start.get("headers", [])}
assert headers.get("allow") == "POST"

error_data = json.loads(response_body)
assert error_data["jsonrpc"] == "2.0"
assert error_data["id"] is None
assert error_data["error"]["code"] == INVALID_REQUEST
assert "GET" in error_data["error"]["message"]
assert "stateless" in error_data["error"]["message"].lower()


@pytest.mark.anyio
async def test_stateless_delete_returns_405():
"""DELETE requests return 405 in stateless mode since there is no session to terminate."""
response_start, response_body = await _collect_stateless_response("DELETE")

assert response_start is not None
assert response_start["status"] == 405

headers = {name.decode().lower(): value.decode() for name, value in response_start.get("headers", [])}
assert headers.get("allow") == "POST"

error_data = json.loads(response_body)
assert error_data["jsonrpc"] == "2.0"
assert error_data["id"] is None
assert error_data["error"]["code"] == INVALID_REQUEST
assert "DELETE" in error_data["error"]["message"]


@pytest.mark.anyio
async def test_stateless_get_does_not_create_transport():
"""A GET in stateless mode should short-circuit without spinning up a transport."""
app = Server("test-stateless-no-transport")
manager = StreamableHTTPSessionManager(app=app, stateless=True)

created_transports: list[StreamableHTTPServerTransport] = []
original_constructor = StreamableHTTPServerTransport

def track_transport(*args: Any, **kwargs: Any) -> StreamableHTTPServerTransport:
transport = original_constructor(*args, **kwargs) # pragma: no cover
created_transports.append(transport) # pragma: no cover
return transport # pragma: no cover

with patch.object(streamable_http_manager, "StreamableHTTPServerTransport", side_effect=track_transport):
async with manager.run():
sent_messages: list[Message] = []

async def mock_send(message: Message):
sent_messages.append(message)

scope = {
"type": "http",
"method": "GET",
"path": "/mcp",
"headers": [(b"accept", b"text/event-stream")],
}

async def mock_receive(): # pragma: no cover
return {"type": "http.request", "body": b"", "more_body": False}

await manager.handle_request(scope, mock_receive, mock_send)

assert created_transports == [], "Stateless GET must not create a transport"
Loading