Proposing a Distributed Remote Transport Abstraction #828

@DavidParks8

Distributed Session Routing Design Specification

As enterprises scale out MCP servers, they inevitably run into problems with stateful features, which require session-affinity-based routing. This document specifies a design for distributed MCP server deployments over the Streamable HTTP transport that removes the need for session affinity and makes horizontal scale-out more effective. The design introduces abstractions that route messages correctly across multiple server instances while preserving the MCP protocol's requirements for SSE stream affinity and request-response correlation.

Background

The MCP Streamable HTTP transport specification has specific routing requirements in distributed systems:

  1. SSE Stream Affinity: When a client opens an SSE stream via a GET request to instance A, all server-initiated messages (notifications, requests) for that session must be sent through instance A's stream.

  2. Request-Response Correlation: When instance A sends a request to the client (e.g., sampling, roots, elicitation), the client's response may arrive via POST to instance B. Instance B must route the response back to instance A where the request originated.

  3. Session State: Session state created during initialization must be accessible across instances, though the actual MCP server instance may live on one specific server.

Design Goals

  1. Backward Compatible: Single-instance deployments should have zero overhead
  2. Clean Separation: Routing concerns separated from core transport logic
  3. Testable: Easy to mock and unit test
  4. Flexible: Support the potential for multiple future distributed architectures (Orleans, Redis, Service Bus, etc.)
  5. Opt-in: Complexity only when needed
  6. Spec Compliant: Ensure proper message routing per MCP specification

Architecture

Core Abstraction: ISessionRouter

The primary abstraction is the ISessionRouter interface, which encapsulates all routing decisions and execution. Callers provide callbacks for local message handling, and the router internally decides whether to execute them locally or forward messages to remote instances.

Why Callbacks?

The callback-based design provides:

  • Location transparency: Callers don't need to know about routing decisions
  • Serialization safety: Callbacks are never serialized, only executed locally
  • Flexibility: Supports both local and distributed implementations seamlessly
  • Testability: Easy to verify routing behavior by inspecting callback invocations

Public Interfaces

ISessionRouter

namespace ModelContextProtocol.AspNetCore.Routing;

/// <summary>
/// Provides routing capabilities for MCP sessions in distributed deployments.
/// Handles both routing decisions and message delivery internally.
/// </summary>
/// <remarks>
/// <para>
/// In single-instance deployments, implementations should simply execute all callbacks immediately.
/// In distributed deployments, implementations must:
/// <list type="bullet">
///   <item>Track which instance owns each session</item>
///   <item>Track which instance has an active GET/SSE stream for each session</item>
///   <item>Track which instance originated each outgoing request to the client</item>
///   <item>Route messages to the appropriate instance based on message type and context</item>
/// </list>
/// </para>
/// <para>
/// The router uses callbacks to execute message handling on the correct instance. Callbacks are
/// always executed locally and should never be serialized or transmitted between instances.
/// </para>
/// </remarks>
public interface ISessionRouter
{
    /// <summary>
    /// Registers that this instance owns the session and is handling the initialize request.
    /// </summary>
    /// <param name="sessionId">The MCP session ID.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>A task representing the registration operation.</returns>
    /// <remarks>
    /// This is called after the client sends an initialize request and before the server responds.
    /// The session owner is responsible for maintaining the MCP server instance and session state.
    /// </remarks>
    ValueTask RegisterSessionAsync(string sessionId, CancellationToken cancellationToken = default);

    /// <summary>
    /// Registers that this instance has an active SSE stream (GET request) for the session.
    /// </summary>
    /// <param name="sessionId">The MCP session ID.</param>
    /// <param name="streamId">A unique identifier for this specific GET stream.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>A disposable that unregisters the stream when disposed.</returns>
    /// <remarks>
    /// <para>
    /// This is called when a client opens a GET request to listen for server-initiated messages.
    /// The MCP specification requires servers to send each message on only one stream.
    /// This design tracks a single active GET stream per session, so only one instance
    /// holds the active stream for a given session at a time.
    /// </para>
    /// <para>
    /// The returned <see cref="IAsyncDisposable"/> should be disposed when the GET request completes
    /// or the SSE stream is closed, to indicate the stream is no longer active.
    /// </para>
    /// </remarks>
    ValueTask<IAsyncDisposable> RegisterActiveStreamAsync(
        string sessionId, 
        string streamId, 
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Registers an outgoing request to the client, tracking where it originated for response routing.
    /// </summary>
    /// <param name="sessionId">The MCP session ID.</param>
    /// <param name="requestId">The JSON-RPC request ID.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>A disposable that unregisters the request when disposed (on response or timeout).</returns>
    /// <remarks>
    /// <para>
    /// This is called before sending a request from the server to the client (e.g., sampling/createMessage,
    /// roots/list, elicitation/create). The instance that sends the request must be able to receive
    /// the response, even if the response arrives at a different instance via POST.
    /// </para>
    /// <para>
    /// The returned <see cref="IAsyncDisposable"/> should be disposed when the response is received
    /// or the request times out, to clean up tracking state.
    /// </para>
    /// </remarks>
    ValueTask<IAsyncDisposable> RegisterOutgoingRequestAsync(
        string sessionId, 
        RequestId requestId, 
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Routes and sends an outgoing message (server-to-client) to the appropriate instance.
    /// </summary>
    /// <param name="sessionId">The MCP session ID.</param>
    /// <param name="message">The message to route.</param>
    /// <param name="localSender">
    /// A callback to send the message locally if this instance should handle it.
    /// This callback will only be invoked if the router determines the message should be handled locally.
    /// </param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>A task representing the routing and delivery operation.</returns>
    /// <remarks>
    /// <para>
    /// Routing logic:
    /// <list type="bullet">
    ///   <item>
    ///     <term>JSON-RPC Responses</term>
    ///     <description>Route to the instance that sent the original request (tracked by <see cref="RegisterOutgoingRequestAsync"/>)</description>
    ///   </item>
    ///   <item>
    ///     <term>JSON-RPC Requests and Notifications</term>
    ///     <description>Route to the instance with an active GET/SSE stream (tracked by <see cref="RegisterActiveStreamAsync"/>)</description>
    ///   </item>
    /// </list>
    /// </para>
    /// <para>
    /// If the message should be handled locally, the router invokes the <paramref name="localSender"/> callback.
    /// If the message should be handled remotely, the router forwards it to the appropriate instance using
    /// implementation-specific mechanisms (HTTP, gRPC, message bus, Orleans grain observers, etc.).
    /// </para>
    /// <para>
    /// The <paramref name="localSender"/> callback should never be serialized or transmitted. It represents
    /// the local transport's send operation and is only meaningful on the current instance.
    /// </para>
    /// </remarks>
    ValueTask RouteOutgoingMessageAsync(
        string sessionId, 
        JsonRpcMessage message, 
        Func<JsonRpcMessage, CancellationToken, ValueTask> localSender,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Routes and handles an incoming message (client-to-server) from a POST request.
    /// </summary>
    /// <param name="sessionId">The MCP session ID.</param>
    /// <param name="message">The incoming message from the client.</param>
    /// <param name="localHandler">
    /// A callback to handle the message locally if this instance should process it.
    /// This callback will only be invoked if the router determines the message should be handled locally.
    /// </param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>A task representing the routing and handling operation.</returns>
    /// <remarks>
    /// <para>
    /// Routing logic:
    /// <list type="bullet">
    ///   <item>
    ///     <term>JSON-RPC Responses</term>
    ///     <description>Route to the instance that sent the original request (tracked by <see cref="RegisterOutgoingRequestAsync"/>)</description>
    ///   </item>
    ///   <item>
    ///     <term>JSON-RPC Requests and Notifications</term>
    ///     <description>Route to the instance that owns the session (tracked by <see cref="RegisterSessionAsync"/>)</description>
    ///   </item>
    /// </list>
    /// </para>
    /// <para>
    /// If the message should be handled locally, the router invokes the <paramref name="localHandler"/> callback.
    /// If the message should be handled remotely, the router forwards it to the appropriate instance using
    /// implementation-specific mechanisms.
    /// </para>
    /// <para>
    /// The <paramref name="localHandler"/> callback should never be serialized or transmitted. It represents
    /// the local session's message processing and is only meaningful on the current instance.
    /// </para>
    /// </remarks>
    ValueTask RouteIncomingMessageAsync(
        string sessionId, 
        JsonRpcMessage message, 
        Func<JsonRpcMessage, CancellationToken, ValueTask> localHandler,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Unregisters a session when it's disposed or terminated.
    /// </summary>
    /// <param name="sessionId">The MCP session ID.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>A task representing the unregistration operation.</returns>
    /// <remarks>
    /// This is called when a session is disposed (via DELETE request or idle timeout).
    /// Implementations should clean up all tracking state for the session.
    /// </remarks>
    ValueTask UnregisterSessionAsync(string sessionId, CancellationToken cancellationToken = default);
}

Default Implementation

LocalSessionRouter

A default implementation for single-instance deployments that adds no routing overhead:

namespace ModelContextProtocol.AspNetCore.Routing;

/// <summary>
/// Default implementation of <see cref="ISessionRouter"/> for single-instance deployments.
/// All messages are handled locally without any routing overhead.
/// </summary>
/// <remarks>
/// This implementation is used when no distributed routing is configured. It immediately
/// executes all callbacks locally and performs no tracking or forwarding.
/// </remarks>
internal sealed class LocalSessionRouter : ISessionRouter
{
    /// <inheritdoc/>
    public ValueTask RegisterSessionAsync(string sessionId, CancellationToken cancellationToken = default)
    {
        // No-op: Single instance doesn't need to track session ownership
        return ValueTask.CompletedTask;
    }

    /// <inheritdoc/>
    public ValueTask<IAsyncDisposable> RegisterActiveStreamAsync(
        string sessionId, 
        string streamId, 
        CancellationToken cancellationToken = default)
    {
        // No-op: Single instance doesn't need to track active streams
        return ValueTask.FromResult<IAsyncDisposable>(EmptyAsyncDisposable.Instance);
    }

    /// <inheritdoc/>
    public ValueTask<IAsyncDisposable> RegisterOutgoingRequestAsync(
        string sessionId, 
        RequestId requestId, 
        CancellationToken cancellationToken = default)
    {
        // No-op: Single instance doesn't need to track request origins
        return ValueTask.FromResult<IAsyncDisposable>(EmptyAsyncDisposable.Instance);
    }

    /// <inheritdoc/>
    public async ValueTask RouteOutgoingMessageAsync(
        string sessionId, 
        JsonRpcMessage message, 
        Func<JsonRpcMessage, CancellationToken, ValueTask> localSender,
        CancellationToken cancellationToken = default)
    {
        // Always handle locally in single-instance mode
        await localSender(message, cancellationToken).ConfigureAwait(false);
    }

    /// <inheritdoc/>
    public async ValueTask RouteIncomingMessageAsync(
        string sessionId, 
        JsonRpcMessage message, 
        Func<JsonRpcMessage, CancellationToken, ValueTask> localHandler,
        CancellationToken cancellationToken = default)
    {
        // Always handle locally in single-instance mode
        await localHandler(message, cancellationToken).ConfigureAwait(false);
    }

    /// <inheritdoc/>
    public ValueTask UnregisterSessionAsync(string sessionId, CancellationToken cancellationToken = default)
    {
        // No-op: No tracking to clean up
        return ValueTask.CompletedTask;
    }

    private sealed class EmptyAsyncDisposable : IAsyncDisposable
    {
        public static readonly EmptyAsyncDisposable Instance = new();
        private EmptyAsyncDisposable() { }
        public ValueTask DisposeAsync() => ValueTask.CompletedTask;
    }
}

Configuration

Dependency Injection Registration

The ISessionRouter is resolved through dependency injection, allowing for flexible configuration and testability:

// Program.cs or Startup.cs

// Default: Single-instance mode (no routing overhead)
builder.Services.AddSingleton<ISessionRouter, LocalSessionRouter>();

// Or: Custom distributed implementation
builder.Services.AddSingleton<ISessionRouter, OrleansSessionRouter>();

// Or: Conditional based on environment
if (builder.Environment.IsProduction())
{
    // Distributed mode for production
    builder.Services.AddSingleton<ISessionRouter, DistributedSessionRouter>();
}
else
{
    // Local mode for development
    builder.Services.AddSingleton<ISessionRouter, LocalSessionRouter>();
}

// Configure MCP server
builder.Services.AddMcp(options =>
{
    // ... existing MCP configuration ...
});

Service Lifetime

ISessionRouter should be registered as a singleton:

  • Single instance per application
  • Maintains consistency across all sessions
  • Efficient for state tracking and forwarding
  • Thread-safe implementations required

Default Registration

If no ISessionRouter is registered, the framework will use LocalSessionRouter by default:

// In McpEndpointRouteBuilderExtensions or service registration
services.TryAddSingleton<ISessionRouter, LocalSessionRouter>();

Integration Points

StreamableHttpServerTransport

The transport layer receives ISessionRouter via constructor injection and uses it for outgoing messages:

public sealed class StreamableHttpServerTransport : ITransport
{
    private readonly ISessionRouter _sessionRouter;
    
    public StreamableHttpServerTransport(ISessionRouter sessionRouter)
    {
        _sessionRouter = sessionRouter;
    }

    public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken cancellationToken = default)
    {
        Throw.IfNull(message);

        if (Stateless)
        {
            throw new InvalidOperationException(
                "Unsolicited server to client messages are not supported in stateless mode.");
        }

        if (SessionId is not null)
        {
            // Router handles decision and execution
            await _sessionRouter.RouteOutgoingMessageAsync(
                SessionId, 
                message,
                localSender: (msg, ct) => _sseWriter.SendMessageAsync(msg, ct),
                cancellationToken).ConfigureAwait(false);
        }
        else
        {
            // No session ID, send directly (initialization phase)
            await _sseWriter.SendMessageAsync(message, cancellationToken).ConfigureAwait(false);
        }
    }
}

Note: The transport is typically created per-session. The ISessionRouter is injected when creating the transport instance.

StreamableHttpHandler

The HTTP handler receives ISessionRouter via constructor injection and uses it for routing both incoming and outgoing messages:

internal sealed class StreamableHttpHandler
{
    private readonly ISessionRouter _sessionRouter;
    // ... other dependencies ...
    
    public StreamableHttpHandler(
        ISessionRouter sessionRouter,
        // ... other parameters ...
    )
    {
        _sessionRouter = sessionRouter;
        // ...
    }
    
    // ... rest of class ...
}

POST Request Handling

public async Task HandlePostRequestAsync(HttpContext context)
{
    // ... existing validation ...
    
    var session = await GetOrCreateSessionAsync(context);
    if (session is null)
    {
        return;
    }

    await using var _ = await session.AcquireReferenceAsync(context.RequestAborted);

    var message = await ReadJsonRpcMessageAsync(context);
    if (message is null)
    {
        await WriteJsonRpcErrorAsync(context,
            "Bad Request: The POST body did not contain a valid JSON-RPC message.",
            StatusCodes.Status400BadRequest);
        return;
    }

    // Router handles decision and execution
    await _sessionRouter.RouteIncomingMessageAsync(
        session.Id,
        message,
        localHandler: async (msg, ct) =>
        {
            InitializeSseResponse(context);
            var wroteResponse = await session.Transport.HandlePostRequest(msg, context.Response.Body, ct);
            if (!wroteResponse)
            {
                context.Response.Headers.ContentType = (string?)null;
                context.Response.StatusCode = StatusCodes.Status202Accepted;
            }
        },
        context.RequestAborted);
}

GET Request Handling

public async Task HandleGetRequestAsync(HttpContext context)
{
    // ... existing validation ...
    
    var sessionId = context.Request.Headers[McpSessionIdHeaderName].ToString();
    var session = await GetSessionAsync(context, sessionId);
    if (session is null)
    {
        return;
    }

    if (!session.TryStartGetRequest())
    {
        await WriteJsonRpcErrorAsync(context,
            "Bad Request: This server does not support multiple GET requests.",
            StatusCodes.Status400BadRequest);
        return;
    }

    await using var _ = await session.AcquireReferenceAsync(context.RequestAborted);
    
    var streamId = Guid.NewGuid().ToString();
    var streamRegistration = await _sessionRouter.RegisterActiveStreamAsync(
        sessionId, 
        streamId, 
        context.RequestAborted);
    
    try
    {
        InitializeSseResponse(context);
        await context.Response.Body.FlushAsync(context.RequestAborted);
        await session.Transport.HandleGetRequest(context.Response.Body, context.RequestAborted);
    }
    finally
    {
        await streamRegistration.DisposeAsync();
    }
}

Session Registration

private async ValueTask<StreamableHttpSession> StartNewSessionAsync(HttpContext context)
{
    string sessionId = MakeNewSessionId();
    // ... create transport and session ...
    
    await _sessionRouter.RegisterSessionAsync(sessionId, context.RequestAborted);
    
    return session;
}

Session Cleanup

// In StreamableHttpSession.DisposeAsync or session manager
public async ValueTask DisposeAsync()
{
    // ... existing disposal logic ...
    
    await _sessionRouter.UnregisterSessionAsync(sessionId);
}

Request Tracking

For tracking outgoing requests, integration is needed in the message sending pipeline:

// In McpServer or transport layer when sending a request to the client
private async Task<JsonRpcResponse> SendClientRequestAsync(JsonRpcRequest request, CancellationToken cancellationToken)
{
    IAsyncDisposable? requestRegistration = null;
    
    if (_sessionRouter is not null && SessionId is not null)
    {
        requestRegistration = await _sessionRouter.RegisterOutgoingRequestAsync(
            SessionId, 
            request.Id, 
            cancellationToken);
    }
    
    try
    {
        await _transport.SendMessageAsync(request, cancellationToken);
        
        // Wait for response...
        var response = await WaitForResponseAsync(request.Id, cancellationToken);
        
        return response;
    }
    finally
    {
        if (requestRegistration is not null)
        {
            await requestRegistration.DisposeAsync();
        }
    }
}

Distributed Implementation Guidelines

Implementations of ISessionRouter for distributed scenarios must provide:

1. State Storage

Track the following state (using Redis, database, Orleans grains, etc.):

  • Session ownership: sessionId → instanceId
  • Active streams: sessionId → (streamId, instanceId)
  • Pending requests: (sessionId, requestId) → instanceId
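The three mappings above could be held in a small registry type. What follows is a minimal in-memory sketch, mainly useful for tests or for simulating several instances in one process. The class name InMemorySessionLocationRegistry and most of its method names are assumptions made for illustration (only GetRequestOriginAsync and GetActiveStreamInstanceAsync appear in the example structure further down), and request IDs are simplified to strings instead of the SDK's RequestId type. A real deployment would back this with a shared store such as Redis or Orleans grains.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical in-memory registry; not shared across processes.
public sealed class InMemorySessionLocationRegistry
{
    // sessionId -> instanceId
    private readonly ConcurrentDictionary<string, string> _sessionOwners = new();
    // sessionId -> (streamId, instanceId)
    private readonly ConcurrentDictionary<string, (string StreamId, string InstanceId)> _activeStreams = new();
    // (sessionId, requestId) -> instanceId
    private readonly ConcurrentDictionary<(string SessionId, string RequestId), string> _requestOrigins = new();

    public ValueTask SetSessionOwnerAsync(string sessionId, string instanceId, CancellationToken ct = default)
    {
        _sessionOwners[sessionId] = instanceId;
        return ValueTask.CompletedTask;
    }

    public ValueTask<string?> GetSessionOwnerAsync(string sessionId, CancellationToken ct = default)
        => ValueTask.FromResult<string?>(
            _sessionOwners.TryGetValue(sessionId, out var owner) ? owner : null);

    public ValueTask SetActiveStreamAsync(string sessionId, string streamId, string instanceId, CancellationToken ct = default)
    {
        _activeStreams[sessionId] = (streamId, instanceId);
        return ValueTask.CompletedTask;
    }

    // Removes the entry only if it still matches this stream, so that a late
    // disposal of an old stream cannot clear a newer registration.
    public ValueTask ClearActiveStreamAsync(string sessionId, string streamId, string instanceId, CancellationToken ct = default)
    {
        _activeStreams.TryRemove(
            new KeyValuePair<string, (string StreamId, string InstanceId)>(sessionId, (streamId, instanceId)));
        return ValueTask.CompletedTask;
    }

    public ValueTask<string?> GetActiveStreamInstanceAsync(string sessionId, CancellationToken ct = default)
        => ValueTask.FromResult<string?>(
            _activeStreams.TryGetValue(sessionId, out var stream) ? stream.InstanceId : null);

    public ValueTask SetRequestOriginAsync(string sessionId, string requestId, string instanceId, CancellationToken ct = default)
    {
        _requestOrigins[(sessionId, requestId)] = instanceId;
        return ValueTask.CompletedTask;
    }

    public ValueTask<string?> GetRequestOriginAsync(string sessionId, string requestId, CancellationToken ct = default)
        => ValueTask.FromResult<string?>(
            _requestOrigins.TryGetValue((sessionId, requestId), out var origin) ? origin : null);

    // Drops every entry that belongs to the session.
    public ValueTask RemoveSessionAsync(string sessionId, CancellationToken ct = default)
    {
        _sessionOwners.TryRemove(sessionId, out _);
        _activeStreams.TryRemove(sessionId, out _);
        foreach (var key in _requestOrigins.Keys)
        {
            if (key.SessionId == sessionId)
            {
                _requestOrigins.TryRemove(key, out _);
            }
        }
        return ValueTask.CompletedTask;
    }
}
```

The conditional removal in ClearActiveStreamAsync is a design choice of this sketch: when a client reconnects quickly, the old stream's cleanup may run after the new stream has registered, and an unconditional remove would leave the session with no recorded stream.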

2. Message Forwarding

Forward messages between instances using one of:

  • HTTP: POST messages to target instance's internal API
  • gRPC: Efficient binary protocol for inter-instance communication
  • Message Bus: Azure Service Bus, RabbitMQ, etc.
  • Orleans Observers: Grain observer callbacks (for Orleans deployments)

3. Instance Identity

Each instance must have a unique identifier:

  • Machine name + process ID
  • Kubernetes pod name
  • Auto-generated GUID
  • Orleans silo address
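One way to combine these options is to prefer an explicitly configured identifier and fall back to a generated one. The sketch below assumes a hypothetical environment variable named MCP_INSTANCE_ID (for example, populated from the Kubernetes pod name); neither that variable nor the InstanceIdentity helper is part of the proposal.

```csharp
#nullable enable
using System;

// Hypothetical helper for choosing an instance identifier.
public static class InstanceIdentity
{
    // getEnvironmentVariable is injectable so that the logic can be unit tested.
    public static string Resolve(Func<string, string?>? getEnvironmentVariable = null)
    {
        getEnvironmentVariable ??= name => Environment.GetEnvironmentVariable(name);

        // Prefer an explicitly configured ID. The variable name is an assumption.
        var configured = getEnvironmentVariable("MCP_INSTANCE_ID");
        if (!string.IsNullOrWhiteSpace(configured))
        {
            return configured;
        }

        // Fall back to machine name + process ID, with a short random suffix so
        // that a restarted process that reuses a PID still gets a distinct ID.
        var suffix = Guid.NewGuid().ToString("N").Substring(0, 8);
        return $"{Environment.MachineName}-{Environment.ProcessId}-{suffix}";
    }
}
```

Resolving the identifier once at startup and keeping it for the lifetime of the process keeps registry entries consistent; a restarted instance then appears as a new identity, and entries pointing at the old one can be treated as stale.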

4. Failure Handling

Handle scenarios where:

  • Target instance is no longer available (route to current owner)
  • No active stream exists (queue or drop message)
  • State store is temporarily unavailable (fall back to local handling)
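The scenarios above can be separated from the forwarding mechanism by first computing a routing decision. The following is a rough sketch under stated assumptions: RouteDecision, FailureAwareRouting, and the isAlive liveness probe are hypothetical names, and how a caller reacts to NoTarget (queue or drop) or TargetUnavailable (re-resolve the current owner) is left open. Note that it reports a missing target explicitly, whereas the example structure below simply handles such messages locally.

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;

public enum RouteDecision { Local, Remote, NoTarget, TargetUnavailable }

// Hypothetical helper that maps lookup results and failures to a decision.
public static class FailureAwareRouting
{
    // lookupTarget: resolves the target instance ID from the state store;
    //               it may throw if the store is unavailable or return null.
    // isAlive:      hypothetical liveness probe for a remote instance.
    public static async ValueTask<(RouteDecision Decision, string? Target)> DecideAsync(
        string currentInstanceId,
        Func<CancellationToken, ValueTask<string?>> lookupTarget,
        Func<string, bool> isAlive,
        CancellationToken cancellationToken = default)
    {
        string? target;
        try
        {
            target = await lookupTarget(cancellationToken).ConfigureAwait(false);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // State store temporarily unavailable: fall back to local handling.
            return (RouteDecision.Local, currentInstanceId);
        }

        if (target is null)
        {
            // Nothing recorded (for example, no active stream): caller queues or drops.
            return (RouteDecision.NoTarget, null);
        }

        if (target == currentInstanceId)
        {
            return (RouteDecision.Local, currentInstanceId);
        }

        if (!isAlive(target))
        {
            // Recorded instance is gone: caller should re-resolve the current owner.
            return (RouteDecision.TargetUnavailable, target);
        }

        return (RouteDecision.Remote, target);
    }
}
```

Cancellation is deliberately not treated as a store failure, so a cancelled request propagates OperationCanceledException instead of being handled locally.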

Example: Distributed Implementation Structure

namespace ModelContextProtocol.AspNetCore.Routing;

internal sealed class DistributedSessionRouter : ISessionRouter
{
    private readonly ISessionLocationRegistry _registry; // State storage abstraction
    private readonly IMessageForwarder _forwarder;       // Inter-instance communication
    private readonly string _currentInstanceId;

    public async ValueTask RouteOutgoingMessageAsync(
        string sessionId, 
        JsonRpcMessage message, 
        Func<JsonRpcMessage, CancellationToken, ValueTask> localSender,
        CancellationToken cancellationToken = default)
    {
        string? targetInstance = null;

        // Determine routing based on message type
        if (message is JsonRpcResponse or JsonRpcError)
        {
            // Responses go to request originator
            var requestId = ((JsonRpcMessageWithId)message).Id;
            targetInstance = await _registry.GetRequestOriginAsync(sessionId, requestId, cancellationToken);
        }
        else
        {
            // Requests/notifications go to instance with active stream
            targetInstance = await _registry.GetActiveStreamInstanceAsync(sessionId, cancellationToken);
        }

        if (targetInstance == _currentInstanceId || targetInstance is null)
        {
            // Handle locally
            await localSender(message, cancellationToken);
        }
        else
        {
            // Forward to remote instance
            await _forwarder.ForwardMessageAsync(targetInstance, sessionId, message, cancellationToken);
        }
    }

    // ... other methods follow similar pattern ...
}

Testing Strategy

Unit Tests

  1. LocalSessionRouter: Verify all callbacks are executed immediately
  2. Routing Logic: Mock registry and forwarder to verify routing decisions
  3. State Tracking: Verify registration/unregistration flows
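As an illustration of the first item, a test can pass a recording callback and assert on what it received. The sketch below is self-contained, so it uses stand-ins: FakeMessage replaces the SDK's JsonRpcMessage, IRoutesMessages is a cut-down copy of one ISessionRouter method, and LocalOnlyRouter mirrors the behavior of LocalSessionRouter. All of these names are assumptions for illustration only.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for the SDK's JsonRpcMessage so that the sketch compiles on its own.
public sealed record FakeMessage(string Method);

// Cut-down copy of the incoming-routing method, using the stand-in message type.
public interface IRoutesMessages
{
    ValueTask RouteIncomingMessageAsync(
        string sessionId,
        FakeMessage message,
        Func<FakeMessage, CancellationToken, ValueTask> localHandler,
        CancellationToken cancellationToken = default);
}

// Mirrors LocalSessionRouter: always invokes the callback on the current instance.
public sealed class LocalOnlyRouter : IRoutesMessages
{
    public async ValueTask RouteIncomingMessageAsync(
        string sessionId,
        FakeMessage message,
        Func<FakeMessage, CancellationToken, ValueTask> localHandler,
        CancellationToken cancellationToken = default)
        => await localHandler(message, cancellationToken).ConfigureAwait(false);
}

// Records each callback invocation so that a test can assert on it.
public sealed class RecordingHandler
{
    public List<FakeMessage> Received { get; } = new();

    public ValueTask HandleAsync(FakeMessage message, CancellationToken cancellationToken)
    {
        Received.Add(message);
        return ValueTask.CompletedTask;
    }
}
```

A test would route one message through LocalOnlyRouter with handler.HandleAsync as the callback and check that Received contains exactly that message. The same RecordingHandler can be reused against a distributed router with a mocked registry, where an empty Received list indicates that the message was forwarded instead of handled locally.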

Integration Tests

  1. Single Instance: Verify no behavior change from non-routed version
  2. Multi-Instance: Simulate multiple instances with shared state
  3. Failure Scenarios: Test missing streams, disconnected instances

Performance Tests

  1. Overhead: Measure added latency for LocalSessionRouter
  2. Throughput: Test message throughput with distributed router
  3. State Operations: Measure registry operation performance

Migration Path

Phase 1: Interface Introduction

  • Add ISessionRouter interface
  • Implement LocalSessionRouter
  • Wire up integration points with null-checks

Phase 2: Testing

  • Add comprehensive unit tests
  • Add integration tests with mock distributed router

Non-Goals

  • Last-Event-ID Support: Session resumption via Last-Event-ID header is explicitly not supported
  • Cross-Session Routing: Each session is independent; no routing between different sessions
  • Client Load Balancing: Client chooses instance via load balancer; this design handles server-side routing only

Alternative Designs Considered

  1. Return Routing Decision: Rejected because it exposes internal routing logic to callers and is not part of the MCP spec
  2. Separate Forwarder Interface: Made internal because it's an implementation detail
  3. Direct Transport References: Rejected because transports aren't serializable across instances
