Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,6 @@ This is the official Ruby SDK for the Model Context Protocol (MCP), implementing

### Integration patterns

- **Rails controllers**: Use `server.handle_json(request.body.read)` for HTTP endpoints
- **Rails/Rack apps**: Mount `StreamableHTTPTransport` as a Rack app (e.g., `mount transport => "/mcp"`)
- **Command-line tools**: Use `StdioTransport.new(server).open` for CLI applications
- **HTTP services**: Use `StreamableHttpTransport` for web-based servers
- **HTTP services**: Use `StreamableHTTPTransport` for web-based servers
32 changes: 26 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,34 @@ transport = MCP::Server::Transports::StreamableHTTPTransport.new(server, statele

### Usage

#### Rails Controller
#### Rails (mount)

When added to a Rails controller on a route that handles POST requests, your server will be compliant with non-streaming
[Streamable HTTP](https://modelcontextprotocol.io/specification/latest/basic/transports#streamable-http) transport
requests.
`StreamableHTTPTransport` is a Rack app that can be mounted directly in Rails routes:

You can use `StreamableHTTPTransport#handle_request` to handle requests with proper HTTP
status codes (e.g., 202 Accepted for notifications).
```ruby
# config/routes.rb
server = MCP::Server.new(
name: "my_server",
title: "Example Server Display Name",
version: "1.0.0",
instructions: "Use the tools of this server as a last resort",
tools: [SomeTool, AnotherTool],
prompts: [MyPrompt],
)
transport = MCP::Server::Transports::StreamableHTTPTransport.new(server)
server.transport = transport

Rails.application.routes.draw do
mount transport => "/mcp"
end
```

#### Rails (controller)

While the mount approach creates a single server at boot time, the controller approach creates a new server per request.
This allows you to customize tools, prompts, or configuration based on the request (e.g., different tools per route).

`StreamableHTTPTransport#handle_request` returns proper HTTP status codes (e.g., 202 Accepted for notifications):

```ruby
class McpController < ActionController::Base
Expand Down
81 changes: 40 additions & 41 deletions examples/http_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,52 +96,52 @@ def template(args, server_context:)
transport = MCP::Server::Transports::StreamableHTTPTransport.new(server)
server.transport = transport

# Create a logger for MCP-specific logging
mcp_logger = Logger.new($stdout)
mcp_logger.formatter = proc do |_severity, _datetime, _progname, msg|
"[MCP] #{msg}\n"
end
# Rack middleware for MCP-specific request/response logging.
class McpRequestLogger
def initialize(app)
@app = app
@logger = Logger.new($stdout)
@logger.formatter = proc { |_severity, _datetime, _progname, msg| "[MCP] #{msg}\n" }
end

def call(env)
if env["REQUEST_METHOD"] == "POST"
body = env["rack.input"].read
env["rack.input"].rewind

# Create a Rack application with logging
app = proc do |env|
request = Rack::Request.new(env)

# Log MCP-specific details for POST requests
if request.post?
body = request.body.read
request.body.rewind
begin
parsed_body = JSON.parse(body)
mcp_logger.info("Request: #{parsed_body["method"]} (id: #{parsed_body["id"]})")
mcp_logger.debug("Request body: #{JSON.pretty_generate(parsed_body)}")
rescue JSON::ParserError
mcp_logger.warn("Request body (raw): #{body}")
begin
parsed = JSON.parse(body)

@logger.info("Request: #{parsed["method"]} (id: #{parsed["id"]})")
@logger.debug("Request body: #{JSON.pretty_generate(parsed)}")
rescue JSON::ParserError
@logger.warn("Request body (raw): #{body}")
end
end
end

# Handle the request
response = transport.handle_request(request)

# Log the MCP response details
_, _, body = response
if body.is_a?(Array) && !body.empty? && body.first
begin
parsed_response = JSON.parse(body.first)
if parsed_response["error"]
mcp_logger.error("Response error: #{parsed_response["error"]["message"]}")
else
mcp_logger.info("Response: #{parsed_response["result"] ? "success" : "empty"} (id: #{parsed_response["id"]})")
status, headers, response_body = @app.call(env)

if response_body.is_a?(Array) && !response_body.empty? && response_body.first
begin
parsed = JSON.parse(response_body.first)

if parsed["error"]
@logger.error("Response error: #{parsed["error"]["message"]}")
else
@logger.info("Response: #{parsed["result"] ? "success" : "empty"} (id: #{parsed["id"]})")
end
@logger.debug("Response body: #{JSON.pretty_generate(parsed)}")
rescue JSON::ParserError
@logger.warn("Response body (raw): #{response_body}")
end
mcp_logger.debug("Response body: #{JSON.pretty_generate(parsed_response)}")
rescue JSON::ParserError
mcp_logger.warn("Response body (raw): #{body}")
end
end

response
[status, headers, response_body]
end
end

# Wrap the app with Rack middleware
# Build the Rack application with middleware.
# `StreamableHTTPTransport` responds to `call(env)`, so it can be used directly as a Rack app.
rack_app = Rack::Builder.new do
# Enable CORS to allow browser-based MCP clients (e.g., MCP Inspector)
# WARNING: origins("*") allows all origins. Restrict this in production.
Expand All @@ -159,11 +159,10 @@ def template(args, server_context:)

# Use CommonLogger for standard HTTP request logging
use(Rack::CommonLogger, Logger.new($stdout))

# Add other useful middleware
use(Rack::ShowExceptions)
use(McpRequestLogger)

run(app)
run(transport)
end

# Start the server
Expand Down
104 changes: 49 additions & 55 deletions examples/streamable_http_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,70 +65,62 @@ def call(message:, delay: 0)
transport = MCP::Server::Transports::StreamableHTTPTransport.new(server)
server.transport = transport

# Create a logger for MCP request/response logging
mcp_logger = Logger.new($stdout)
mcp_logger.formatter = proc do |_severity, _datetime, _progname, msg|
"[MCP] #{msg}\n"
end
# Rack middleware for MCP request/response and SSE logging.
class McpSseLogger
def initialize(app)
@app = app

@mcp_logger = Logger.new($stdout)
@mcp_logger.formatter = proc { |_severity, _datetime, _progname, msg| "[MCP] #{msg}\n" }

@sse_logger = Logger.new($stdout)
@sse_logger.formatter = proc { |severity, datetime, _progname, msg| "[SSE] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n" }
end

# Create the Rack application
app = proc do |env|
request = Rack::Request.new(env)

# Log request details
if request.post?
body = request.body.read
request.body.rewind
begin
parsed_body = JSON.parse(body)
mcp_logger.info("Request: #{parsed_body["method"]} (id: #{parsed_body["id"]})")

# Log SSE-specific setup
if parsed_body["method"] == "initialize"
sse_logger.info("New client initializing session")
def call(env)
if env["REQUEST_METHOD"] == "POST"
body = env["rack.input"].read
env["rack.input"].rewind

begin
parsed = JSON.parse(body)

@mcp_logger.info("Request: #{parsed["method"]} (id: #{parsed["id"]})")
@sse_logger.info("New client initializing session") if parsed["method"] == "initialize"
rescue JSON::ParserError
@mcp_logger.warn("Invalid JSON in request")
end
rescue JSON::ParserError
mcp_logger.warn("Invalid JSON in request")
elsif env["REQUEST_METHOD"] == "GET"
session_id = env["HTTP_MCP_SESSION_ID"] || Rack::Utils.parse_query(env["QUERY_STRING"])["sessionId"]

@sse_logger.info("SSE connection request for session: #{session_id}")
end
elsif request.get?
session_id = request.env["HTTP_MCP_SESSION_ID"] ||
Rack::Utils.parse_query(request.env["QUERY_STRING"])["sessionId"]
sse_logger.info("SSE connection request for session: #{session_id}")
end

# Handle the request
response = transport.handle_request(request)

# Log response details
status, headers, body = response
if body.is_a?(Array) && !body.empty? && request.post?
begin
parsed_response = JSON.parse(body.first)
if parsed_response["error"]
mcp_logger.error("Response error: #{parsed_response["error"]["message"]}")
elsif parsed_response["accepted"]
# Response was sent via SSE
server.notify_log_message(data: { details: "Response accepted and sent via SSE" }, level: "info")
sse_logger.info("Response sent via SSE stream")
else
mcp_logger.info("Response: success (id: #{parsed_response["id"]})")

# Log session ID for initialization
if headers["Mcp-Session-Id"]
sse_logger.info("Session created: #{headers["Mcp-Session-Id"]}")
status, headers, response_body = @app.call(env)

if response_body.is_a?(Array) && !response_body.empty? && env["REQUEST_METHOD"] == "POST"
begin
parsed = JSON.parse(response_body.first)

if parsed["error"]
@mcp_logger.error("Response error: #{parsed["error"]["message"]}")
else
@mcp_logger.info("Response: success (id: #{parsed["id"]})")
@sse_logger.info("Session created: #{headers["Mcp-Session-Id"]}") if headers["Mcp-Session-Id"]
end
rescue JSON::ParserError
@mcp_logger.warn("Invalid JSON in response")
end
rescue JSON::ParserError
mcp_logger.warn("Invalid JSON in response")
elsif env["REQUEST_METHOD"] == "GET" && status == 200
@sse_logger.info("SSE stream established")
end
elsif request.get? && status == 200
sse_logger.info("SSE stream established")
end

response
[status, headers, response_body]
end
end

# Build the Rack application with middleware
# Build the Rack application with middleware.
# `StreamableHTTPTransport` responds to `call(env)`, so it can be used directly as a Rack app.
rack_app = Rack::Builder.new do
# Enable CORS to allow browser-based MCP clients (e.g., MCP Inspector)
# WARNING: origins("*") allows all origins. Restrict this in production.
Expand All @@ -146,7 +138,9 @@ def call(message:, delay: 0)

use(Rack::CommonLogger, Logger.new($stdout))
use(Rack::ShowExceptions)
run(app)
use(McpSseLogger)

run(transport)
end

# Print usage instructions
Expand Down
14 changes: 14 additions & 0 deletions lib/mcp/server/transports/streamable_http_transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@
require "securerandom"
require_relative "../../transport"

# This file is autoloaded only when `StreamableHTTPTransport` is referenced,
# so the `rack` dependency does not affect `StdioTransport` users.
begin
require "rack"
rescue LoadError
raise LoadError, "The 'rack' gem is required to use the StreamableHTTPTransport. " \
"Add it to your Gemfile: gem 'rack'"
end

module MCP
class Server
module Transports
Expand All @@ -21,6 +30,11 @@ def initialize(server, stateless: false)
REQUIRED_GET_ACCEPT_TYPES = ["text/event-stream"].freeze
STREAM_WRITE_ERRORS = [IOError, Errno::EPIPE, Errno::ECONNRESET].freeze

# Rack app interface. This transport can be mounted as a Rack app.
def call(env)
handle_request(Rack::Request.new(env))
end

def handle_request(request)
case request.env["REQUEST_METHOD"]
when "POST"
Expand Down
79 changes: 79 additions & 0 deletions test/mcp/server/transports/streamable_http_transport_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1247,6 +1247,85 @@ class StreamableHTTPTransportTest < ActiveSupport::TestCase
assert_equal(200, response[0])
end

test "call(env) works as a Rack app for POST requests" do
env = {
"REQUEST_METHOD" => "POST",
"PATH_INFO" => "/",
"rack.input" => StringIO.new({ jsonrpc: "2.0", method: "initialize", id: "init-1" }.to_json),
"CONTENT_TYPE" => "application/json",
"HTTP_ACCEPT" => "application/json, text/event-stream",
}

response = @transport.call(env)
assert_equal 200, response[0]
assert_equal "application/json", response[1]["Content-Type"]

body = JSON.parse(response[2][0])
assert_equal "2.0", body["jsonrpc"]
assert_equal "init-1", body["id"]
end

test "call(env) returns 405 for unsupported HTTP methods" do
env = {
"REQUEST_METHOD" => "PUT",
"PATH_INFO" => "/",
"rack.input" => StringIO.new(""),
}

response = @transport.call(env)
assert_equal 405, response[0]
end

test "call(env) handles GET SSE stream request" do
init_env = {
"REQUEST_METHOD" => "POST",
"PATH_INFO" => "/",
"rack.input" => StringIO.new({ jsonrpc: "2.0", method: "initialize", id: "init" }.to_json),
"CONTENT_TYPE" => "application/json",
"HTTP_ACCEPT" => "application/json, text/event-stream",
}
init_response = @transport.call(init_env)
session_id = init_response[1]["Mcp-Session-Id"]

get_env = {
"REQUEST_METHOD" => "GET",
"PATH_INFO" => "/",
"rack.input" => StringIO.new(""),
"HTTP_ACCEPT" => "text/event-stream",
"HTTP_MCP_SESSION_ID" => session_id,
}

response = @transport.call(get_env)
assert_equal 200, response[0]
assert_equal "text/event-stream", response[1]["Content-Type"]
assert response[2].is_a?(Proc)
end

test "call(env) handles DELETE session request" do
init_env = {
"REQUEST_METHOD" => "POST",
"PATH_INFO" => "/",
"rack.input" => StringIO.new({ jsonrpc: "2.0", method: "initialize", id: "init" }.to_json),
"CONTENT_TYPE" => "application/json",
"HTTP_ACCEPT" => "application/json, text/event-stream",
}
init_response = @transport.call(init_env)
session_id = init_response[1]["Mcp-Session-Id"]

delete_env = {
"REQUEST_METHOD" => "DELETE",
"PATH_INFO" => "/",
"rack.input" => StringIO.new(""),
"HTTP_MCP_SESSION_ID" => session_id,
}

response = @transport.call(delete_env)
assert_equal 200, response[0]

body = JSON.parse(response[2][0])
assert body["success"]
end

private

def create_rack_request(method, path, headers, body = nil)
Expand Down