From 0912d7dddd45caf9bad41f42477a7a5d4d6e7215 Mon Sep 17 00:00:00 2001 From: Koichi ITO Date: Thu, 12 Mar 2026 01:05:13 +0900 Subject: [PATCH] Make `StreamableHTTPTransport` a Rack application Add `call(env)` to `StreamableHTTPTransport`, making it a Rack application that works with `mount`, `run`, and Rack middleware. Refactor examples to use Rack middleware classes for MCP logging instead of proc wrappers, demonstrating idiomatic Rack composition with the new `run(transport)` pattern. Update README.md with mount and controller integration patterns. Closes #59, #60 ## How Has This Been Tested? Added tests for `call(env)` as a Rack app. All tests pass. ## Breaking Change No breaking changes. All existing APIs are preserved: - `StreamableHTTPTransport.new(server)` continues to work as before. - `handle_request(request)` is unchanged. The new `call(env)` is a purely additive public method. --- AGENTS.md | 4 +- README.md | 32 +++++- examples/http_server.rb | 81 +++++++------- examples/streamable_http_server.rb | 104 +++++++++--------- .../transports/streamable_http_transport.rb | 14 +++ .../streamable_http_transport_test.rb | 79 +++++++++++++ 6 files changed, 210 insertions(+), 104 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 6b269d72..53a98c2d 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -102,6 +102,6 @@ This is the official Ruby SDK for the Model Context Protocol (MCP), implementing ### Integration patterns -- **Rails controllers**: Use `server.handle_json(request.body.read)` for HTTP endpoints +- **Rails/Rack apps**: Mount `StreamableHTTPTransport` as a Rack app (e.g., `mount transport => "/mcp"`) - **Command-line tools**: Use `StdioTransport.new(server).open` for CLI applications -- **HTTP services**: Use `StreamableHttpTransport` for web-based servers +- **HTTP services**: Use `StreamableHTTPTransport` for web-based servers diff --git a/README.md b/README.md index 2914c4bc..1e9d68fe 100644 --- a/README.md +++ b/README.md @@ -301,14 +301,34 @@ transport = MCP::Server::Transports::StreamableHTTPTransport.new(server, statele ### Usage -#### Rails Controller +#### Rails (mount) -When added to a Rails controller on a route that handles POST requests, your server will be compliant with non-streaming -[Streamable HTTP](https://modelcontextprotocol.io/specification/latest/basic/transports#streamable-http) transport -requests. +`StreamableHTTPTransport` is a Rack app that can be mounted directly in Rails routes: -You can use `StreamableHTTPTransport#handle_request` to handle requests with proper HTTP -status codes (e.g., 202 Accepted for notifications). +```ruby +# config/routes.rb +server = MCP::Server.new( + name: "my_server", + title: "Example Server Display Name", + version: "1.0.0", + instructions: "Use the tools of this server as a last resort", + tools: [SomeTool, AnotherTool], + prompts: [MyPrompt], +) +transport = MCP::Server::Transports::StreamableHTTPTransport.new(server) +server.transport = transport + +Rails.application.routes.draw do + mount transport => "/mcp" +end +``` + +#### Rails (controller) + +While the mount approach creates a single server at boot time, the controller approach creates a new server per request. +This allows you to customize tools, prompts, or configuration based on the request (e.g., different tools per route). + +`StreamableHTTPTransport#handle_request` returns proper HTTP status codes (e.g., 202 Accepted for notifications): ```ruby class McpController < ActionController::Base diff --git a/examples/http_server.rb b/examples/http_server.rb index 7c0e760c..8f732bf8 100644 --- a/examples/http_server.rb +++ b/examples/http_server.rb @@ -96,52 +96,52 @@ def template(args, server_context:) transport = MCP::Server::Transports::StreamableHTTPTransport.new(server) server.transport = transport -# Create a logger for MCP-specific logging -mcp_logger = Logger.new($stdout) -mcp_logger.formatter = proc do |_severity, _datetime, _progname, msg| - "[MCP] #{msg}\n" -end +# Rack middleware for MCP-specific request/response logging. +class McpRequestLogger + def initialize(app) + @app = app + @logger = Logger.new($stdout) + @logger.formatter = proc { |_severity, _datetime, _progname, msg| "[MCP] #{msg}\n" } + end + + def call(env) + if env["REQUEST_METHOD"] == "POST" + body = env["rack.input"].read + env["rack.input"].rewind -# Create a Rack application with logging -app = proc do |env| - request = Rack::Request.new(env) - - # Log MCP-specific details for POST requests - if request.post? - body = request.body.read - request.body.rewind - begin - parsed_body = JSON.parse(body) - mcp_logger.info("Request: #{parsed_body["method"]} (id: #{parsed_body["id"]})") - mcp_logger.debug("Request body: #{JSON.pretty_generate(parsed_body)}") - rescue JSON::ParserError - mcp_logger.warn("Request body (raw): #{body}") + begin + parsed = JSON.parse(body) + + @logger.info("Request: #{parsed["method"]} (id: #{parsed["id"]})") + @logger.debug("Request body: #{JSON.pretty_generate(parsed)}") + rescue JSON::ParserError + @logger.warn("Request body (raw): #{body}") + end end - end - # Handle the request - response = transport.handle_request(request) - - # Log the MCP response details - _, _, body = response - if body.is_a?(Array) && !body.empty? && body.first - begin - parsed_response = JSON.parse(body.first) - if parsed_response["error"] - mcp_logger.error("Response error: #{parsed_response["error"]["message"]}") - else - mcp_logger.info("Response: #{parsed_response["result"] ? "success" : "empty"} (id: #{parsed_response["id"]})") + status, headers, response_body = @app.call(env) + + if response_body.is_a?(Array) && !response_body.empty? && response_body.first + begin + parsed = JSON.parse(response_body.first) + + if parsed["error"] + @logger.error("Response error: #{parsed["error"]["message"]}") + else + @logger.info("Response: #{parsed["result"] ? "success" : "empty"} (id: #{parsed["id"]})") + end + @logger.debug("Response body: #{JSON.pretty_generate(parsed)}") + rescue JSON::ParserError + @logger.warn("Response body (raw): #{response_body}") end - mcp_logger.debug("Response body: #{JSON.pretty_generate(parsed_response)}") - rescue JSON::ParserError - mcp_logger.warn("Response body (raw): #{body}") end - end - response + [status, headers, response_body] + end end -# Wrap the app with Rack middleware +# Build the Rack application with middleware. +# `StreamableHTTPTransport` responds to `call(env)`, so it can be used directly as a Rack app. rack_app = Rack::Builder.new do # Enable CORS to allow browser-based MCP clients (e.g., MCP Inspector) # WARNING: origins("*") allows all origins. Restrict this in production. @@ -159,11 +159,10 @@ def template(args, server_context:) # Use CommonLogger for standard HTTP request logging use(Rack::CommonLogger, Logger.new($stdout)) - - # Add other useful middleware use(Rack::ShowExceptions) + use(McpRequestLogger) - run(app) + run(transport) end # Start the server diff --git a/examples/streamable_http_server.rb b/examples/streamable_http_server.rb index a7857f63..dac130a4 100644 --- a/examples/streamable_http_server.rb +++ b/examples/streamable_http_server.rb @@ -65,70 +65,62 @@ def call(message:, delay: 0) transport = MCP::Server::Transports::StreamableHTTPTransport.new(server) server.transport = transport -# Create a logger for MCP request/response logging -mcp_logger = Logger.new($stdout) -mcp_logger.formatter = proc do |_severity, _datetime, _progname, msg| - "[MCP] #{msg}\n" -end +# Rack middleware for MCP request/response and SSE logging. +class McpSseLogger + def initialize(app) + @app = app + + @mcp_logger = Logger.new($stdout) + @mcp_logger.formatter = proc { |_severity, _datetime, _progname, msg| "[MCP] #{msg}\n" } + + @sse_logger = Logger.new($stdout) + @sse_logger.formatter = proc { |severity, datetime, _progname, msg| "[SSE] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n" } + end -# Create the Rack application -app = proc do |env| - request = Rack::Request.new(env) - - # Log request details - if request.post? - body = request.body.read - request.body.rewind - begin - parsed_body = JSON.parse(body) - mcp_logger.info("Request: #{parsed_body["method"]} (id: #{parsed_body["id"]})") - - # Log SSE-specific setup - if parsed_body["method"] == "initialize" - sse_logger.info("New client initializing session") + def call(env) + if env["REQUEST_METHOD"] == "POST" + body = env["rack.input"].read + env["rack.input"].rewind + + begin + parsed = JSON.parse(body) + + @mcp_logger.info("Request: #{parsed["method"]} (id: #{parsed["id"]})") + @sse_logger.info("New client initializing session") if parsed["method"] == "initialize" + rescue JSON::ParserError + @mcp_logger.warn("Invalid JSON in request") end - rescue JSON::ParserError - mcp_logger.warn("Invalid JSON in request") + elsif env["REQUEST_METHOD"] == "GET" + session_id = env["HTTP_MCP_SESSION_ID"] || Rack::Utils.parse_query(env["QUERY_STRING"])["sessionId"] + + @sse_logger.info("SSE connection request for session: #{session_id}") end - elsif request.get? - session_id = request.env["HTTP_MCP_SESSION_ID"] || - Rack::Utils.parse_query(request.env["QUERY_STRING"])["sessionId"] - sse_logger.info("SSE connection request for session: #{session_id}") - end - # Handle the request - response = transport.handle_request(request) - - # Log response details - status, headers, body = response - if body.is_a?(Array) && !body.empty? && request.post? - begin - parsed_response = JSON.parse(body.first) - if parsed_response["error"] - mcp_logger.error("Response error: #{parsed_response["error"]["message"]}") - elsif parsed_response["accepted"] - # Response was sent via SSE - server.notify_log_message(data: { details: "Response accepted and sent via SSE" }, level: "info") - sse_logger.info("Response sent via SSE stream") - else - mcp_logger.info("Response: success (id: #{parsed_response["id"]})") - - # Log session ID for initialization - if headers["Mcp-Session-Id"] - sse_logger.info("Session created: #{headers["Mcp-Session-Id"]}") + status, headers, response_body = @app.call(env) + + if response_body.is_a?(Array) && !response_body.empty? && env["REQUEST_METHOD"] == "POST" + begin + parsed = JSON.parse(response_body.first) + + if parsed["error"] + @mcp_logger.error("Response error: #{parsed["error"]["message"]}") + else + @mcp_logger.info("Response: success (id: #{parsed["id"]})") + @sse_logger.info("Session created: #{headers["Mcp-Session-Id"]}") if headers["Mcp-Session-Id"] end + rescue JSON::ParserError + @mcp_logger.warn("Invalid JSON in response") end - rescue JSON::ParserError - mcp_logger.warn("Invalid JSON in response") + elsif env["REQUEST_METHOD"] == "GET" && status == 200 + @sse_logger.info("SSE stream established") end - elsif request.get? && status == 200 - sse_logger.info("SSE stream established") - end - response + [status, headers, response_body] + end end -# Build the Rack application with middleware +# Build the Rack application with middleware. +# `StreamableHTTPTransport` responds to `call(env)`, so it can be used directly as a Rack app. rack_app = Rack::Builder.new do # Enable CORS to allow browser-based MCP clients (e.g., MCP Inspector) # WARNING: origins("*") allows all origins. Restrict this in production. @@ -146,7 +138,9 @@ def call(message:, delay: 0) use(Rack::CommonLogger, Logger.new($stdout)) use(Rack::ShowExceptions) - run(app) + use(McpSseLogger) + + run(transport) end # Print usage instructions diff --git a/lib/mcp/server/transports/streamable_http_transport.rb b/lib/mcp/server/transports/streamable_http_transport.rb index be5404a5..5aa65690 100644 --- a/lib/mcp/server/transports/streamable_http_transport.rb +++ b/lib/mcp/server/transports/streamable_http_transport.rb @@ -4,6 +4,15 @@ require "securerandom" require_relative "../../transport" +# This file is autoloaded only when `StreamableHTTPTransport` is referenced, +# so the `rack` dependency does not affect `StdioTransport` users. +begin + require "rack" +rescue LoadError + raise LoadError, "The 'rack' gem is required to use the StreamableHTTPTransport. " \ + "Add it to your Gemfile: gem 'rack'" +end + module MCP class Server module Transports @@ -21,6 +30,11 @@ def initialize(server, stateless: false) REQUIRED_GET_ACCEPT_TYPES = ["text/event-stream"].freeze STREAM_WRITE_ERRORS = [IOError, Errno::EPIPE, Errno::ECONNRESET].freeze + # Rack app interface. This transport can be mounted as a Rack app. + def call(env) + handle_request(Rack::Request.new(env)) + end + def handle_request(request) case request.env["REQUEST_METHOD"] when "POST" diff --git a/test/mcp/server/transports/streamable_http_transport_test.rb b/test/mcp/server/transports/streamable_http_transport_test.rb index abeb2e57..ee57bea5 100644 --- a/test/mcp/server/transports/streamable_http_transport_test.rb +++ b/test/mcp/server/transports/streamable_http_transport_test.rb @@ -1247,6 +1247,85 @@ class StreamableHTTPTransportTest < ActiveSupport::TestCase assert_equal(200, response[0]) end + test "call(env) works as a Rack app for POST requests" do + env = { + "REQUEST_METHOD" => "POST", + "PATH_INFO" => "/", + "rack.input" => StringIO.new({ jsonrpc: "2.0", method: "initialize", id: "init-1" }.to_json), + "CONTENT_TYPE" => "application/json", + "HTTP_ACCEPT" => "application/json, text/event-stream", + } + + response = @transport.call(env) + assert_equal 200, response[0] + assert_equal "application/json", response[1]["Content-Type"] + + body = JSON.parse(response[2][0]) + assert_equal "2.0", body["jsonrpc"] + assert_equal "init-1", body["id"] + end + + test "call(env) returns 405 for unsupported HTTP methods" do + env = { + "REQUEST_METHOD" => "PUT", + "PATH_INFO" => "/", + "rack.input" => StringIO.new(""), + } + + response = @transport.call(env) + assert_equal 405, response[0] + end + + test "call(env) handles GET SSE stream request" do + init_env = { + "REQUEST_METHOD" => "POST", + "PATH_INFO" => "/", + "rack.input" => StringIO.new({ jsonrpc: "2.0", method: "initialize", id: "init" }.to_json), + "CONTENT_TYPE" => "application/json", + "HTTP_ACCEPT" => "application/json, text/event-stream", + } + init_response = @transport.call(init_env) + session_id = init_response[1]["Mcp-Session-Id"] + + get_env = { + "REQUEST_METHOD" => "GET", + "PATH_INFO" => "/", + "rack.input" => StringIO.new(""), + "HTTP_ACCEPT" => "text/event-stream", + "HTTP_MCP_SESSION_ID" => session_id, + } + + response = @transport.call(get_env) + assert_equal 200, response[0] + assert_equal "text/event-stream", response[1]["Content-Type"] + assert response[2].is_a?(Proc) + end + + test "call(env) handles DELETE session request" do + init_env = { + "REQUEST_METHOD" => "POST", + "PATH_INFO" => "/", + "rack.input" => StringIO.new({ jsonrpc: "2.0", method: "initialize", id: "init" }.to_json), + "CONTENT_TYPE" => "application/json", + "HTTP_ACCEPT" => "application/json, text/event-stream", + } + init_response = @transport.call(init_env) + session_id = init_response[1]["Mcp-Session-Id"] + + delete_env = { + "REQUEST_METHOD" => "DELETE", + "PATH_INFO" => "/", + "rack.input" => StringIO.new(""), + "HTTP_MCP_SESSION_ID" => session_id, + } + + response = @transport.call(delete_env) + assert_equal 200, response[0] + + body = JSON.parse(response[2][0]) + assert body["success"] + end + private def create_rack_request(method, path, headers, body = nil)