Skip to content

Fix connection closure handling in Protocol and InMemoryTransport #833

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions src/inMemory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,59 @@ describe("InMemoryTransport", () => {
await serverTransport.start();
expect(receivedMessage).toEqual(message);
});

test("should handle double close idempotently", async () => {
let clientCloseCount = 0;
let serverCloseCount = 0;

clientTransport.onclose = () => {
clientCloseCount++;
};

serverTransport.onclose = () => {
serverCloseCount++;
};

await clientTransport.close();
await clientTransport.close(); // Second close should be idempotent

expect(clientCloseCount).toBe(1);
expect(serverCloseCount).toBe(1);
});

test("should handle concurrent close from both sides", async () => {
let clientCloseCount = 0;
let serverCloseCount = 0;

clientTransport.onclose = () => {
clientCloseCount++;
};

serverTransport.onclose = () => {
serverCloseCount++;
};

// Close both sides concurrently
await Promise.all([
clientTransport.close(),
serverTransport.close()
]);

expect(clientCloseCount).toBe(1);
expect(serverCloseCount).toBe(1);
});

test("should reject send after close from either side", async () => {
await serverTransport.close();

// Both sides should reject sends
await expect(
clientTransport.send({ jsonrpc: "2.0", method: "test", id: 1 })
).rejects.toThrow("Not connected");

await expect(
serverTransport.send({ jsonrpc: "2.0", method: "test", id: 2 })
).rejects.toThrow("Not connected");
});

});
18 changes: 14 additions & 4 deletions src/inMemory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ interface QueuedMessage {
export class InMemoryTransport implements Transport {
private _otherTransport?: InMemoryTransport;
private _messageQueue: QueuedMessage[] = [];
private _isClosed = false;
private _closePromise?: Promise<void>;

onclose?: () => void;
onerror?: (error: Error) => void;
Expand All @@ -39,10 +41,18 @@ export class InMemoryTransport implements Transport {
}

async close(): Promise<void> {
const other = this._otherTransport;
this._otherTransport = undefined;
await other?.close();
this.onclose?.();
if (this._isClosed) return this._closePromise ?? Promise.resolve();

this._isClosed = true;
this._closePromise = (async () => {
const peer = this._otherTransport;
this._otherTransport = undefined; // Prevent infinite recursion

this.onclose?.();
await peer?.close();
})();

return this._closePromise;
}

/**
Expand Down
12 changes: 12 additions & 0 deletions src/shared/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -334,12 +334,24 @@ export abstract class Protocol<
this._progressHandlers.clear();
this._pendingDebouncedNotifications.clear();
this._transport = undefined;

// Abort all active request handlers
const requestHandlerAbortControllers = this._requestHandlerAbortControllers;
this._requestHandlerAbortControllers = new Map();

this.onclose?.();

const error = new McpError(ErrorCode.ConnectionClosed, "Connection closed");

// Reject all pending response handlers (for outgoing requests)
for (const handler of responseHandlers.values()) {
handler(error);
}

// Abort all active request handlers (for incoming requests being processed)
for (const abortController of requestHandlerAbortControllers.values()) {
abortController.abort(error);
}
}

private _onerror(error: Error): void {
Expand Down