Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions packages/client/src/client/stdio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import process from 'node:process';
import type { Stream } from 'node:stream';
import { PassThrough } from 'node:stream';

import type { JSONRPCMessage, Transport } from '@modelcontextprotocol/core';
import { ReadBuffer, SdkError, SdkErrorCode, serializeMessage } from '@modelcontextprotocol/core';
import type { JSONRPCMessage, JSONRPCRequest, Transport } from '@modelcontextprotocol/core';
import { isStatelessProtocolVersion, ReadBuffer, SdkError, SdkErrorCode, serializeMessage, StreamDriver } from '@modelcontextprotocol/core';
import spawn from 'cross-spawn';

export type StdioServerParameters = {
Expand Down Expand Up @@ -95,11 +95,26 @@ export class StdioClientTransport implements Transport {
private _readBuffer: ReadBuffer = new ReadBuffer();
private _serverParams: StdioServerParameters;
private _stderrStream: PassThrough | null = null;
private _protocolVersion?: string;
/* eslint-disable-next-line unicorn/consistent-function-scoping */
private readonly _driver = new StreamDriver(m => this.send(m));

onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;

/**
* Sends one request and returns the server's messages for it. Backed by
* `StreamDriver`; bypasses `Protocol.request()`.
*/
sendAndReceive(request: Omit<JSONRPCRequest, 'jsonrpc' | 'id'>, opts?: { signal?: AbortSignal }): AsyncIterable<JSONRPCMessage> {
return this._driver.sendAndReceive(request, opts);
}

setProtocolVersion(version: string): void {
this._protocolVersion = version;
}

constructor(server: StdioServerParameters) {
this._serverParams = server;
if (server.stderr === 'pipe' || server.stderr === 'overlapped') {
Expand Down Expand Up @@ -195,6 +210,16 @@ export class StdioClientTransport implements Transport {
break;
}

// Default to StreamDriver until setProtocolVersion is called with
// a pre-2026 version. The discover/initialize probe goes via
// sendAndReceive, so the driver claims those.
if (
(this._protocolVersion === undefined || isStatelessProtocolVersion(this._protocolVersion)) &&
this._driver.onMessage(message)
) {
continue;
}

this.onmessage?.(message);
} catch (error) {
this.onerror?.(error as Error);
Expand All @@ -203,6 +228,7 @@ export class StdioClientTransport implements Transport {
}

async close(): Promise<void> {
this._driver.close();
if (this._process) {
const processToClose = this._process;
this._process = undefined;
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ export * from './shared/authUtils.js';
export * from './shared/dispatcher.js';
export * from './shared/metadataUtils.js';
export * from './shared/protocol.js';
export * from './shared/serverStatelessRouter.js';
export * from './shared/stateless.js';
export * from './shared/stdio.js';
export * from './shared/streamDriver.js';
export * from './shared/toolNameValidation.js';
export * from './shared/transport.js';
export * from './shared/uriTemplate.js';
Expand Down
74 changes: 74 additions & 0 deletions packages/core/src/shared/serverStatelessRouter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import type { JSONRPCMessage, JSONRPCRequest, RequestId } from '../types/index.js';
import { isJSONRPCNotification, ProtocolErrorCode } from '../types/index.js';
import { errorResponse } from './dispatcher.js';
import type { ListenContext, StatelessHandlers } from './stateless.js';
import { isStatelessRequest } from './stateless.js';

/**
* Per-message router for pipe-shaped server transports (stdio, in-memory).
* Call once per inbound message. Returns `true` if the message was claimed by
* the stateless path (so the caller should NOT pass it to legacy `onmessage`).
*
* Stateless requests are dispatched via {@linkcode StatelessHandlers};
* `notifications/cancelled` aborts a tracked in-flight request.
*/
export function routeServerStateless(
message: JSONRPCMessage,
handlers: StatelessHandlers,
inflight: Map<RequestId, AbortController>,
write: (m: JSONRPCMessage) => void,
ctx: ListenContext,
onerror?: (e: Error) => void
): boolean {
if (isStatelessRequest(message)) {
const ac = new AbortController();
inflight.set(message.id, ac);
void handleOne(handlers, message, ac, write, ctx)
.catch(error => onerror?.(error instanceof Error ? error : new Error(String(error))))
.finally(() => inflight.delete(message.id));
return true;
}
if (isJSONRPCNotification(message) && message.method === 'notifications/cancelled') {
const requestId = (message.params as { requestId?: RequestId } | undefined)?.requestId;
const ac = requestId === undefined ? undefined : inflight.get(requestId);
if (ac) {
ac.abort();
return true;
}
}
return false;
}

async function handleOne(
handlers: StatelessHandlers,
req: JSONRPCRequest,
ac: AbortController,
write: (m: JSONRPCMessage) => void,
ctx: ListenContext
): Promise<void> {
if (req.method === 'subscriptions/listen') {
let listenStream;
try {
listenStream = handlers.listen(req, ctx);
} catch (error) {
write(
errorResponse(req.id, ProtocolErrorCode.InvalidParams, error instanceof Error ? error.message : 'Invalid listen request')
);
return;
}
const { stream, close } = listenStream;
ac.signal.addEventListener('abort', close, { once: true });
try {
for await (const m of stream) {
if (ac.signal.aborted) break;
write(m);
}
} finally {
ac.signal.removeEventListener('abort', close);
close();
}
} else {
const response = await handlers.dispatch(req, { signal: ac.signal, authInfo: ctx.authInfo, notify: write });
write(response);
}
}
132 changes: 132 additions & 0 deletions packages/core/src/shared/streamDriver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import { isJSONRPCNotification, isJSONRPCResponse } from '../types/guards.js';
import type { JSONRPCMessage, JSONRPCRequest, RequestId } from '../types/index.js';
import { JSONRPC_VERSION, ProtocolErrorCode } from '../types/index.js';
import { AsyncQueue } from '../util/asyncQueue.js';
import { META_KEYS } from './stateless.js';

/**
* Minimal request→response correlator for pipe-shaped client transports
* (stdio, in-memory) under the 2026-06 stateless model. Provides
* `sendAndReceive(request) → AsyncIterable<JSONRPCMessage>` so the Client can
* make stateless calls without going through `Protocol.request()` and its
* `_responseHandlers` map.
*
* The transport feeds every inbound message to {@linkcode onMessage}; the
* driver routes responses by `id` and notifications by `_meta.subscriptionId`
* (which is the originating request's id, per SEP-2575) to the matching
* iterator. Closing/breaking the iterator sends `notifications/cancelled`.
*/
export class StreamDriver {
// Seed in a range Protocol's `_requestMessageId` (which starts at 0) will
// not reach, so a stdio fallback that mixes a discover-probe (StreamDriver)
// with a legacy initialize (Protocol.request) on the same pipe cannot
// collide on id 0.
private _nextId = 0x40_00_00_00;
private readonly _pending = new Map<RequestId, AsyncQueue<JSONRPCMessage>>();

constructor(private readonly _send: (m: JSONRPCMessage) => Promise<void>) {}

/**
* Sends one request and returns an async-iterable of the messages the server
* emits for it: zero or more notifications, then exactly one response (which
* ends the iteration). For `subscriptions/listen`, the iteration continues
* until `break`/`return()` (which sends `notifications/cancelled`).
*
* The request is dispatched and registered in `_pending` immediately, before
* the first `next()`. Callers MUST consume the iterable (`for await` is
* sufficient: it calls `return()` on break/throw); obtaining it and never
* iterating leaks the `_pending` entry until {@linkcode close}.
*/
sendAndReceive(request: Omit<JSONRPCRequest, 'jsonrpc' | 'id'>, opts?: { signal?: AbortSignal }): AsyncIterable<JSONRPCMessage> {
const id = this._nextId++;
const isListen = request.method === 'subscriptions/listen';
const queue = new AsyncQueue<JSONRPCMessage>(256);

const cleanup = () => {
this._pending.delete(id);
this._pending.delete(String(id));
opts?.signal?.removeEventListener('abort', onAbort);
};
const cancel = () => {
if (queue.closed) return;
this._send({ jsonrpc: JSONRPC_VERSION, method: 'notifications/cancelled', params: { requestId: id } }).catch(() => {});
queue.close();
cleanup();
};
const onAbort = () => cancel();
opts?.signal?.addEventListener('abort', onAbort, { once: true });

this._pending.set(id, queue);
// `_meta.subscriptionId` on inbound notifications equals the string form
// of the request id (SEP-2575). Register the same queue under that key
// so {@linkcode onMessage} can route notifications without a second map.
this._pending.set(String(id), queue);

this._send({ jsonrpc: JSONRPC_VERSION, id, ...request }).catch(error => {
// Surface send failure to the iterator instead of hanging forever.
queue.push({
jsonrpc: JSONRPC_VERSION,
id,
error: {
code: ProtocolErrorCode.InternalError,
message: `Transport send failed: ${error instanceof Error ? error.message : String(error)}`
}
});
queue.close();
cleanup();
});

const inner = queue.iterate();
return {
[Symbol.asyncIterator]: () => ({
async next(): Promise<IteratorResult<JSONRPCMessage>> {
const r = await inner.next();
if (r.done) {
cleanup();
} else if (!isListen && isJSONRPCResponse(r.value)) {
// Non-listen: end after the response.
queue.close();
cleanup();
}
return r;
},
async return(): Promise<IteratorResult<JSONRPCMessage>> {
cancel();
return { value: undefined, done: true };
}
})
};
}

/**
* Feeds one inbound message to the driver. The transport calls this for
* every message received while in stateless mode. Returns `true` if the
* message was claimed (routed to a pending iterator).
*/
onMessage(m: JSONRPCMessage): boolean {
if ('id' in m && m.id !== null && m.id !== undefined) {
const q = this._pending.get(m.id);
if (q) {
q.push(m);
return true;
}
}
if (isJSONRPCNotification(m)) {
const sid = (m.params?._meta as Record<string, unknown> | undefined)?.[META_KEYS.subscriptionId];
if (typeof sid === 'string') {
const q = this._pending.get(sid);
if (q) {
q.push(m);
return true;
}
}
}
return false;
}

/** Ends every pending iterator (e.g., on transport close). */
close(): void {
for (const q of this._pending.values()) q.close();
this._pending.clear();
}
}
54 changes: 51 additions & 3 deletions packages/core/src/util/inMemory.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { SdkError, SdkErrorCode } from '../errors/sdkErrors.js';
import { routeServerStateless } from '../shared/serverStatelessRouter.js';
import type { StatelessHandlers } from '../shared/stateless.js';
import { StreamDriver } from '../shared/streamDriver.js';
import type { Transport } from '../shared/transport.js';
import type { AuthInfo, JSONRPCMessage, RequestId } from '../types/index.js';
import type { AuthInfo, JSONRPCMessage, JSONRPCRequest, RequestId } from '../types/index.js';

interface QueuedMessage {
message: JSONRPCMessage;
Expand All @@ -18,11 +21,27 @@ export class InMemoryTransport implements Transport {
private _messageQueue: QueuedMessage[] = [];
private _closed = false;

private _statelessHandlers?: StatelessHandlers;
private readonly _inflight = new Map<RequestId, AbortController>();

/* eslint-disable-next-line unicorn/consistent-function-scoping */
private readonly _driver = new StreamDriver(m => this.send(m));

onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage, extra?: { authInfo?: AuthInfo }) => void;
sessionId?: string;

/** Client-side: backed by `StreamDriver`. */
sendAndReceive(request: Omit<JSONRPCRequest, 'jsonrpc' | 'id'>, opts?: { signal?: AbortSignal }): AsyncIterable<JSONRPCMessage> {
return this._driver.sendAndReceive(request, opts);
}

/** Server-side: installed by `Server.connect()`. */
setStatelessHandlers(h: StatelessHandlers): void {
this._statelessHandlers = h;
}

/**
* Creates a pair of linked in-memory transports that can communicate with each other. One should be passed to a {@linkcode @modelcontextprotocol/client!client/client.Client | Client} and one to a {@linkcode @modelcontextprotocol/server!server/server.Server | Server}.
*/
Expand All @@ -38,13 +57,42 @@ export class InMemoryTransport implements Transport {
// Process any messages that were queued before start was called
while (this._messageQueue.length > 0) {
const queuedMessage = this._messageQueue.shift()!;
this.onmessage?.(queuedMessage.message, queuedMessage.extra);
this._receive(queuedMessage.message, queuedMessage.extra);
}
}

/**
* Receive path. Per-message router (mirrors stdio): server-side stateless
* requests go to {@linkcode StatelessHandlers}; client-side
* {@linkcode StreamDriver} claims responses for pending iterators; unclaimed
* messages fall through to legacy `onmessage`.
*/
private _receive(message: JSONRPCMessage, extra?: { authInfo?: AuthInfo }): void {
if (
this._statelessHandlers &&
routeServerStateless(
message,
this._statelessHandlers,
this._inflight,
m => {
this.send(m).catch(error => this.onerror?.(error as Error));
},
{ authInfo: extra?.authInfo },
e => this.onerror?.(e)
)
) {
return;
}
if (this._driver.onMessage(message)) return;
this.onmessage?.(message, extra);
}

async close(): Promise<void> {
if (this._closed) return;
this._closed = true;
for (const ac of this._inflight.values()) ac.abort();
this._inflight.clear();
this._driver.close();

const other = this._otherTransport;
this._otherTransport = undefined;
Expand All @@ -65,7 +113,7 @@ export class InMemoryTransport implements Transport {
}

if (this._otherTransport.onmessage) {
this._otherTransport.onmessage(message, { authInfo: options?.authInfo });
this._otherTransport._receive(message, { authInfo: options?.authInfo });
} else {
this._otherTransport._messageQueue.push({ message, extra: { authInfo: options?.authInfo } });
}
Expand Down
Loading
Loading