From d67085d652f99ea6389cbd3e627f31afba9b7341 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 22 May 2026 08:50:14 -0700 Subject: [PATCH 1/9] =?UTF-8?q?improvement(mcp):=20post-merge=20hardening?= =?UTF-8?q?=20=E2=80=94=20protocol=20negotiation=20+=20distributed=20OAuth?= =?UTF-8?q?=20lock=20+=20typed=20error=20dispatch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Inbound MCP server now negotiates protocolVersion per MCP 2025-06-18: echoes the client's requested version when supported, falls back to our latest. Previously hardcoded to the oldest spec version (2024-11-05). - withMcpOauthRefreshLock now takes a Postgres advisory transaction lock in addition to the in-process Promise chain, so concurrent processes (multi-task ECS) serialize on a per-OAuth-row basis. Previously a refresh race across processes could rotate a token under another process's feet and force re-auth. - categorizeError dispatches on McpOauthAuthorizationRequiredError / UnauthorizedError / McpConnectionError first, only falling back to substring matching for SDK / third-party errors. Adds 502 for connection failures and 503 for cooldown. Tests cover all four typed cases. - discoverTools no longer pretends to handle deferred-side-effect rejections via a dead allSettled().catch() — each side-effect already self-logs; we just swallow per-promise to silence unhandled-rejection warnings. --- .../api/mcp/serve/[serverId]/route.test.ts | 45 +++++++++++++++++++ .../sim/app/api/mcp/serve/[serverId]/route.ts | 18 +++++++- apps/sim/lib/mcp/oauth/storage.ts | 30 +++++++++---- apps/sim/lib/mcp/service.ts | 6 +-- apps/sim/lib/mcp/utils.test.ts | 28 ++++++++++++ apps/sim/lib/mcp/utils.ts | 25 ++++++++--- 6 files changed, 133 insertions(+), 19 deletions(-) diff --git a/apps/sim/app/api/mcp/serve/[serverId]/route.test.ts b/apps/sim/app/api/mcp/serve/[serverId]/route.test.ts index bd9b10d4d3d..01ee610f6d8 100644 --- a/apps/sim/app/api/mcp/serve/[serverId]/route.test.ts +++ b/apps/sim/app/api/mcp/serve/[serverId]/route.test.ts @@ -197,4 +197,49 @@ describe('MCP Serve Route', () => { expect(headers['X-API-Key']).toBeUndefined() expect(mockGenerateInternalToken).toHaveBeenCalledWith('user-1') }) + + describe('initialize protocol version negotiation', () => { + async function callInitialize(protocolVersion?: string) { + dbChainMockFns.limit.mockResolvedValueOnce([ + { + id: 'server-1', + name: 'Public Server', + workspaceId: 'ws-1', + isPublic: true, + createdBy: 'owner-1', + }, + ]) + const params: Record = { + capabilities: {}, + clientInfo: { name: 'test', version: '1.0.0' }, + } + if (protocolVersion !== undefined) params.protocolVersion = protocolVersion + const req = new NextRequest('http://localhost:3000/api/mcp/serve/server-1', { + method: 'POST', + body: JSON.stringify({ jsonrpc: '2.0', id: 1, method: 'initialize', params }), + }) + const res = await POST(req, { params: Promise.resolve({ serverId: 'server-1' }) }) + return res.json() as Promise<{ result: { protocolVersion: string } }> + } + + it('echoes a supported client protocolVersion (2025-06-18)', async () => { + const body = await callInitialize('2025-06-18') + expect(body.result.protocolVersion).toBe('2025-06-18') + }) + + it('echoes a supported client protocolVersion (2024-11-05)', async () => { + const body = await callInitialize('2024-11-05') + expect(body.result.protocolVersion).toBe('2024-11-05') + }) + + it('falls back to latest when client requests unknown version', async () => { + const body = await callInitialize('2099-01-01') + expect(body.result.protocolVersion).toBe('2025-06-18') + }) + + it('falls back to latest when client omits protocolVersion', async () => { + const body = await callInitialize(undefined) + expect(body.result.protocolVersion).toBe('2025-06-18') + }) + }) }) diff --git a/apps/sim/app/api/mcp/serve/[serverId]/route.ts b/apps/sim/app/api/mcp/serve/[serverId]/route.ts index 702c9a57cf4..61767f8143f 100644 --- a/apps/sim/app/api/mcp/serve/[serverId]/route.ts +++ b/apps/sim/app/api/mcp/serve/[serverId]/route.ts @@ -36,6 +36,22 @@ import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' const logger = createLogger('WorkflowMcpServeAPI') +// Newest first. We echo the client's version when we support it (per +// MCP 2025-06-18 lifecycle spec); otherwise fall back to our latest. +const SUPPORTED_PROTOCOL_VERSIONS = ['2025-06-18', '2025-03-26', '2024-11-05'] as const +const LATEST_PROTOCOL_VERSION = SUPPORTED_PROTOCOL_VERSIONS[0] + +function negotiateProtocolVersion(rpcParams: unknown): string { + const requested = + rpcParams && typeof rpcParams === 'object' && 'protocolVersion' in rpcParams + ? (rpcParams as { protocolVersion?: unknown }).protocolVersion + : undefined + if (typeof requested === 'string' && SUPPORTED_PROTOCOL_VERSIONS.includes(requested as never)) { + return requested + } + return LATEST_PROTOCOL_VERSION +} + export const dynamic = 'force-dynamic' interface RouteParams { @@ -214,7 +230,7 @@ export const POST = withRouteHandler( switch (method) { case 'initialize': { const result: InitializeResult = { - protocolVersion: '2024-11-05', + protocolVersion: negotiateProtocolVersion(rpcParams), capabilities: { tools: {} }, serverInfo: { name: server.name, version: '1.0.0' }, } diff --git a/apps/sim/lib/mcp/oauth/storage.ts b/apps/sim/lib/mcp/oauth/storage.ts index ee6ae0143ff..84531646688 100644 --- a/apps/sim/lib/mcp/oauth/storage.ts +++ b/apps/sim/lib/mcp/oauth/storage.ts @@ -8,7 +8,7 @@ import { mcpServerOauth } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { toError } from '@sim/utils/errors' import { generateId } from '@sim/utils/id' -import { and, eq, gt } from 'drizzle-orm' +import { and, eq, gt, sql } from 'drizzle-orm' import { decryptSecret, encryptSecret } from '@/lib/core/security/encryption' const logger = createLogger('McpOauthStorage') @@ -227,19 +227,31 @@ export async function clearState(rowId: string): Promise { } /** - * Per-process serialization for an OAuth row. Refresh tokens rotate (RFC 6749 §6, - * MCP §2.3.3), so two concurrent refreshes against the same row would race and one - * would receive `invalid_grant`, wiping the credentials. We serialize SDK calls - * that may trigger a refresh on a per-row basis. + * Serialize OAuth row access across all callers, in-process AND across + * processes. Refresh tokens rotate (RFC 6749 §6, MCP §2.3.3), so two concurrent + * refreshes against the same row would race and one would receive + * `invalid_grant`, wiping credentials. + * + * Two-tier locking: + * 1) In-process Promise chain — cheap, avoids DB roundtrips when the same + * Node process holds concurrent callers. + * 2) Postgres advisory transaction lock — blocks across processes; released + * automatically when the transaction ends. */ const refreshLocks = new Map>() export async function withMcpOauthRefreshLock(rowId: string, fn: () => Promise): Promise { const prev = refreshLocks.get(rowId) ?? Promise.resolve() - // Wait for the predecessor to settle (success or failure), discard its - // value/error, then run fn. Each caller awaits its own fn's outcome — errors - // do not propagate across callers in the chain. - const next = prev.catch(() => undefined).then(() => fn()) + const next = prev + .catch(() => undefined) + .then(() => + db.transaction(async (tx) => { + await tx.execute( + sql`SELECT pg_advisory_xact_lock(hashtextextended(${`mcp_oauth_refresh:${rowId}`}, 0))` + ) + return fn() + }) + ) refreshLocks.set(rowId, next) const cleanup = () => { if (refreshLocks.get(rowId) === next) refreshLocks.delete(rowId) diff --git a/apps/sim/lib/mcp/service.ts b/apps/sim/lib/mcp/service.ts index 113998c61aa..90b931c2298 100644 --- a/apps/sim/lib/mcp/service.ts +++ b/apps/sim/lib/mcp/service.ts @@ -601,9 +601,9 @@ class McpService { // Await cache writes so a follow-up discoverTools sees consistent state. await Promise.allSettled(cacheWrites) - Promise.allSettled(deferredSideEffects).catch((err) => { - logger.error(`[${requestId}] Error in deferred discovery work:`, err) - }) + // Each deferred side-effect self-logs failures, so we just mark the + // promises as handled to avoid unhandled-rejection warnings. + for (const p of deferredSideEffects) p.catch(() => {}) if (mcpConnectionManager) { for (const conn of liveConnections) { diff --git a/apps/sim/lib/mcp/utils.test.ts b/apps/sim/lib/mcp/utils.test.ts index 29aded1358e..30990f62d4a 100644 --- a/apps/sim/lib/mcp/utils.test.ts +++ b/apps/sim/lib/mcp/utils.test.ts @@ -1,5 +1,7 @@ +import { UnauthorizedError } from '@modelcontextprotocol/sdk/client/auth.js' import { describe, expect, it } from 'vitest' import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' +import { McpConnectionError, McpOauthAuthorizationRequiredError } from '@/lib/mcp/types' import { categorizeError, createMcpToolId, @@ -304,6 +306,32 @@ describe('categorizeError', () => { expect(result.message).toBe('Unknown error occurred') }) + it.concurrent('returns 401 for McpOauthAuthorizationRequiredError via instanceof', () => { + const error = new McpOauthAuthorizationRequiredError('mcp-a', 'A') + const result = categorizeError(error) + expect(result.status).toBe(401) + expect(result.message).toBe('Authentication required') + }) + + it.concurrent('returns 401 for SDK UnauthorizedError via instanceof', () => { + const error = new UnauthorizedError('token expired') + const result = categorizeError(error) + expect(result.status).toBe(401) + }) + + it.concurrent('returns 503 for McpConnectionError with cooldown message', () => { + const error = new McpConnectionError('Server in cooldown — try again shortly.', 'mcp-a') + const result = categorizeError(error) + expect(result.status).toBe(503) + }) + + it.concurrent('returns 502 for other McpConnectionError', () => { + const error = new McpConnectionError('connect ECONNREFUSED', 'mcp-a') + const result = categorizeError(error) + expect(result.status).toBe(502) + expect(result.message).toBe('Connection failed') + }) + it.concurrent('returns 500 for null', () => { const result = categorizeError(null) expect(result.status).toBe(500) diff --git a/apps/sim/lib/mcp/utils.ts b/apps/sim/lib/mcp/utils.ts index 6364cafb111..e5c2f9db22f 100644 --- a/apps/sim/lib/mcp/utils.ts +++ b/apps/sim/lib/mcp/utils.ts @@ -1,6 +1,11 @@ +import { UnauthorizedError } from '@modelcontextprotocol/sdk/client/auth.js' import { NextResponse } from 'next/server' import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' -import type { McpApiResponse } from '@/lib/mcp/types' +import { + type McpApiResponse, + McpConnectionError, + McpOauthAuthorizationRequiredError, +} from '@/lib/mcp/types' import { isMcpTool, MCP } from '@/executor/constants' export const MCP_CONSTANTS = { @@ -137,28 +142,36 @@ export function categorizeError(error: unknown): { message: string; status: numb return { message: 'Unknown error occurred', status: 500 } } + // Typed dispatch first — our own classes carry definitive intent. + if (error instanceof McpOauthAuthorizationRequiredError || error instanceof UnauthorizedError) { + return { message: 'Authentication required', status: 401 } + } + if (error instanceof McpConnectionError) { + if (error.message.toLowerCase().includes('cooldown')) { + return { message: 'Server temporarily unavailable', status: 503 } + } + return { message: 'Connection failed', status: 502 } + } + + // Fall back to substring matching for SDK / third-party errors we don't + // own a typed class for. const msg = error.message.toLowerCase() if (msg.includes('timeout')) { return { message: 'Request timed out', status: 408 } } - if (msg.includes('cooldown')) { return { message: 'Server temporarily unavailable', status: 503 } } - if (msg.includes('not found') || msg.includes('not accessible')) { return { message: 'Resource not found', status: 404 } } - if (msg.includes('authentication') || msg.includes('unauthorized')) { return { message: 'Authentication required', status: 401 } } - if (msg.includes('invalid') || msg.includes('missing required') || msg.includes('validation')) { return { message: 'Invalid request parameters', status: 400 } } - return { message: 'Internal server error', status: 500 } } From 13c2a6db1eae508372f8cd9dc489ff6f74702a15 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 22 May 2026 09:00:26 -0700 Subject: [PATCH 2/9] fix(mcp): bugbot review on hardening PR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace `as never` cast in negotiateProtocolVersion with `as readonly string[]` on the array — preserves TypeScript narrowing on the comparison while satisfying the readonly-tuple `.includes()` constraint properly. - Document the pg_advisory_xact_lock tradeoff: session-level locks (`pg_advisory_lock`) would release the connection earlier, but PgBouncer transaction-pooling mode breaks them. xact_lock is the correct choice for Sim's deployment; if pool pressure becomes real, the comment notes the Redlock escape hatch. --- apps/sim/app/api/mcp/serve/[serverId]/route.ts | 5 ++++- apps/sim/lib/mcp/oauth/storage.ts | 8 ++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/apps/sim/app/api/mcp/serve/[serverId]/route.ts b/apps/sim/app/api/mcp/serve/[serverId]/route.ts index 61767f8143f..dcb206ef3f0 100644 --- a/apps/sim/app/api/mcp/serve/[serverId]/route.ts +++ b/apps/sim/app/api/mcp/serve/[serverId]/route.ts @@ -46,7 +46,10 @@ function negotiateProtocolVersion(rpcParams: unknown): string { rpcParams && typeof rpcParams === 'object' && 'protocolVersion' in rpcParams ? (rpcParams as { protocolVersion?: unknown }).protocolVersion : undefined - if (typeof requested === 'string' && SUPPORTED_PROTOCOL_VERSIONS.includes(requested as never)) { + if ( + typeof requested === 'string' && + (SUPPORTED_PROTOCOL_VERSIONS as readonly string[]).includes(requested) + ) { return requested } return LATEST_PROTOCOL_VERSION diff --git a/apps/sim/lib/mcp/oauth/storage.ts b/apps/sim/lib/mcp/oauth/storage.ts index 84531646688..d110bd63f71 100644 --- a/apps/sim/lib/mcp/oauth/storage.ts +++ b/apps/sim/lib/mcp/oauth/storage.ts @@ -237,6 +237,14 @@ export async function clearState(rowId: string): Promise { * Node process holds concurrent callers. * 2) Postgres advisory transaction lock — blocks across processes; released * automatically when the transaction ends. + * + * Tradeoff: the connection is held for the duration of `fn()`, which includes + * the SDK's OAuth HTTP refresh. Session-level locks (`pg_advisory_lock`) would + * release the connection earlier, but they don't survive PgBouncer transaction + * pooling — they're scoped to the underlying physical connection, which can be + * swapped between statements. `pg_advisory_xact_lock` is the correct choice + * here. If pool pressure becomes a real concern at scale, swap this for a + * Redis-based distributed lock (Redlock) that doesn't pin a DB connection. */ const refreshLocks = new Map>() From d01506bf625a915370a0c182f7b46ce212139668 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 22 May 2026 09:07:40 -0700 Subject: [PATCH 3/9] chore(mcp): trim verbose comments + reuse SDK Tool type in McpTool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - McpTool now extends `Pick` from @modelcontextprotocol/sdk so name/description fields stay in sync with the SDK; serverId/serverName remain Sim-specific additions. - Drop file-header restatements ("MCP Types - for connecting to external MCP servers"), one-line wrapper docstrings ("Get connection status"), and narrative comment blocks that just restate visible code. - Keep only comments that document non-obvious "why" — OAuth refresh-lock tradeoff, in-flight dedup key composition, SDK Tool.inputSchema typing, preregistered-client semantics, postMessage handshake contract. --- apps/sim/hooks/queries/mcp.ts | 24 ++------- apps/sim/lib/mcp/client.ts | 52 +------------------ apps/sim/lib/mcp/oauth/provider.ts | 5 +- apps/sim/lib/mcp/service.ts | 40 +-------------- apps/sim/lib/mcp/types.ts | 80 ++++-------------------------- 5 files changed, 16 insertions(+), 185 deletions(-) diff --git a/apps/sim/hooks/queries/mcp.ts b/apps/sim/hooks/queries/mcp.ts index 9f483a4fef7..b87ec642ee0 100644 --- a/apps/sim/hooks/queries/mcp.ts +++ b/apps/sim/hooks/queries/mcp.ts @@ -57,9 +57,7 @@ export const mcpKeys = { export type { McpServer } -/** - * Input for creating/updating an MCP server (distinct from McpServerConfig in types.ts) - */ +/** Wire shape for create/update; distinct from runtime McpServerConfig. */ export interface McpServerInput { name: string transport: McpTransport @@ -265,11 +263,7 @@ export function useCreateMcpServer() { }) } -/** - * Result of `useStartMcpOauth`. When `popup` is set, the caller should wait - * for it to close (or for the `mcp-oauth` postMessage) before clearing any - * "connecting" UI state. - */ +/** On `redirect`, the caller must wait for `popup.closed` or the `mcp-oauth` postMessage. */ export type StartMcpOauthMutationResult = | { status: 'redirect'; popup: Window } | { status: 'already_authorized' } @@ -464,13 +458,7 @@ const sseConnections: Map = ((globalThis as Record)[SSE_KEY] as Map) ?? ((globalThis as Record)[SSE_KEY] = new Map()) -/** - * Subscribe to MCP tool-change SSE events for a workspace. - * On each `tools_changed` event, invalidates the relevant React Query caches - * so the UI refreshes automatically. - * - * Invalidates both external MCP server keys and workflow MCP server keys. - */ +/** Subscribes to `tools_changed` SSE events and invalidates the affected query keys. */ export function useMcpToolsEvents(workspaceId: string) { const queryClient = useQueryClient() @@ -598,17 +586,11 @@ export function useMcpServerTest() { } } -/** - * Fetch allowed MCP domains (admin-configured allowlist) - */ async function fetchAllowedMcpDomains(signal?: AbortSignal): Promise { const data = await requestJson(getAllowedMcpDomainsContract, { signal }) return data.allowedMcpDomains ?? null } -/** - * Hook to fetch allowed MCP domains - */ export function useAllowedMcpDomains() { return useQuery({ queryKey: mcpKeys.allowedDomains(), diff --git a/apps/sim/lib/mcp/client.ts b/apps/sim/lib/mcp/client.ts index 9f3c36d00a5..925aa840df9 100644 --- a/apps/sim/lib/mcp/client.ts +++ b/apps/sim/lib/mcp/client.ts @@ -1,13 +1,3 @@ -/** - * MCP (Model Context Protocol) Client - * - * Implements the client side of MCP protocol with support for: - * - Streamable HTTP transport (MCP 2025-06-18) - * - Tool execution and discovery - * - Session management and protocol version negotiation - * - Custom security/consent layer - */ - import { UnauthorizedError } from '@modelcontextprotocol/sdk/client/auth.js' import { Client } from '@modelcontextprotocol/sdk/client/index.js' import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js' @@ -94,11 +84,6 @@ export class McpClient { ) } - /** - * Initialize connection to MCP server. - * If an `onToolsChanged` callback was provided, registers a notification handler - * for `notifications/tools/list_changed` after connecting. - */ async connect(): Promise { logger.info(`Connecting to MCP server: ${this.config.name} (${this.config.transport})`) @@ -135,9 +120,6 @@ export class McpClient { } } - /** - * Disconnect from MCP server - */ async disconnect(): Promise { logger.info(`Disconnecting from MCP server: ${this.config.name}`) @@ -152,16 +134,10 @@ export class McpClient { logger.info(`Disconnected from MCP server: ${this.config.name}`) } - /** - * Get current connection status - */ getStatus(): McpConnectionStatus { return { ...this.connectionStatus } } - /** - * List all available tools from the server - */ async listTools(): Promise { if (!this.isConnected) { throw new McpConnectionError('Not connected to server', this.config.name) @@ -190,9 +166,6 @@ export class McpClient { } } - /** - * Execute a tool on the MCP server - */ async callTool(toolCall: McpToolCall): Promise { if (!this.isConnected) { throw new McpConnectionError('Not connected to server', this.config.name) @@ -237,10 +210,6 @@ export class McpClient { } } - /** - * Ping the server to check if it's still alive and responsive - * Per MCP spec: servers should respond to ping requests - */ async ping(): Promise<{ _meta?: Record }> { if (!this.isConnected) { throw new McpConnectionError('Not connected to server', this.config.name) @@ -257,18 +226,11 @@ export class McpClient { } } - /** - * Check if the server declared `capabilities.tools.listChanged: true` during initialization. - */ hasListChangedCapability(): boolean { return !!this.client.getServerCapabilities()?.tools?.listChanged } - /** - * Register a callback to be invoked when the underlying transport closes. - * Used by the connection manager for reconnection logic. - * Chains with the SDK's internal onclose handler so it still performs its cleanup. - */ + /** Chains with the SDK's internal onclose handler so its cleanup still runs. */ onClose(callback: () => void): void { const existingHandler = this.transport.onclose this.transport.onclose = () => { @@ -277,16 +239,10 @@ export class McpClient { } } - /** - * Get server configuration - */ getConfig(): McpServerConfig { return { ...this.config } } - /** - * Get version information for this client - */ static getVersionInfo(): McpVersionInfo { return { supported: [...McpClient.SUPPORTED_VERSIONS], @@ -294,9 +250,6 @@ export class McpClient { } } - /** - * Get the negotiated protocol version for this connection - */ getNegotiatedVersion(): string | undefined { const serverVersion = this.client.getServerVersion() return typeof serverVersion === 'string' ? serverVersion : undefined @@ -306,9 +259,6 @@ export class McpClient { return this.transport.sessionId } - /** - * Request user consent for tool execution - */ async requestConsent(consentRequest: McpConsentRequest): Promise { if (!this.securityPolicy.requireConsent) { return { granted: true, auditId: `audit-${Date.now()}` } diff --git a/apps/sim/lib/mcp/oauth/provider.ts b/apps/sim/lib/mcp/oauth/provider.ts index 9b130e06453..86a2c72de42 100644 --- a/apps/sim/lib/mcp/oauth/provider.ts +++ b/apps/sim/lib/mcp/oauth/provider.ts @@ -41,10 +41,7 @@ export interface PreregisteredClient { interface SimMcpOauthProviderInit { row: McpOauthRow scope?: string - /** - * Optional user-supplied client credentials. When provided, the SDK skips - * Dynamic Client Registration and uses these for the auth/token exchange. - */ + /** When set, the SDK skips Dynamic Client Registration and uses these credentials directly. */ preregistered?: PreregisteredClient } diff --git a/apps/sim/lib/mcp/service.ts b/apps/sim/lib/mcp/service.ts index 90b931c2298..adbef7b0ae2 100644 --- a/apps/sim/lib/mcp/service.ts +++ b/apps/sim/lib/mcp/service.ts @@ -1,7 +1,3 @@ -/** - * MCP Service - Clean stateless service for MCP operations - */ - import { UnauthorizedError } from '@modelcontextprotocol/sdk/client/auth.js' import { StreamableHTTPError } from '@modelcontextprotocol/sdk/client/streamableHttp.js' import { db } from '@sim/db' @@ -100,19 +96,12 @@ class McpService { } } - /** - * Dispose of the service and cleanup resources - */ dispose(): void { this.unsubscribeConnectionManager?.() this.cacheAdapter.dispose() logger.info('MCP Service disposed') } - /** - * Resolve environment variables in server config. - * Uses shared utility with strict mode (throws on missing vars). - */ private async resolveConfigEnvVars( config: McpServerConfig, userId: string, @@ -126,9 +115,6 @@ class McpService { return { config: resolvedConfig, resolvedIP } } - /** - * Get server configuration from database - */ private async getServerConfig( serverId: string, workspaceId: string @@ -171,9 +157,6 @@ class McpService { } } - /** - * Get all enabled servers for a workspace - */ private async getWorkspaceServers(workspaceId: string): Promise { const whereConditions = [ eq(mcpServers.workspaceId, workspaceId), @@ -205,9 +188,6 @@ class McpService { .filter((config) => isMcpDomainAllowed(config.url)) } - /** - * Create and connect to an MCP client - */ private async createClient( config: McpServerConfig, resolvedIP: string | null, @@ -262,10 +242,6 @@ class McpService { }) } - /** - * Execute a tool on a specific server with retry logic for session errors. - * Retries once on session-related errors (400, 404, session ID issues). - */ async executeTool( userId: string, serverId: string, @@ -320,12 +296,7 @@ class McpService { throw new Error(`Failed to execute tool ${toolCall.name} after ${maxRetries} attempts`) } - /** - * Detects an expired or unknown `Mcp-Session-Id` so the caller can retry. - * Per MCP spec, the server returns HTTP 404 for an unknown session id and - * may return 400 when the session header is malformed; the SDK surfaces - * both as `StreamableHTTPError` with a typed numeric `code` field. - */ + /** MCP spec: server returns 404 for unknown session id, 400 for malformed header. */ private isSessionError(error: unknown): boolean { if (error instanceof StreamableHTTPError) { return error.code === 404 || error.code === 400 @@ -333,9 +304,6 @@ class McpService { return false } - /** - * Update server connection status after discovery attempt - */ private async updateServerStatus( serverId: string, workspaceId: string, @@ -448,9 +416,6 @@ class McpService { } } - /** - * Discover tools from all workspace servers - */ async discoverTools( userId: string, workspaceId: string, @@ -744,9 +709,6 @@ class McpService { throw new Error(`Failed to discover tools from server ${serverId} after ${maxRetries} attempts`) } - /** - * Get server summaries for a user - */ async getServerSummaries(userId: string, workspaceId: string): Promise { const requestId = generateRequestId() diff --git a/apps/sim/lib/mcp/types.ts b/apps/sim/lib/mcp/types.ts index 1cb4ad3e782..04ebad6a4be 100644 --- a/apps/sim/lib/mcp/types.ts +++ b/apps/sim/lib/mcp/types.ts @@ -1,15 +1,8 @@ -/** - * MCP Types - for connecting to external MCP servers - */ +import type { Tool } from '@modelcontextprotocol/sdk/types.js' export type McpTransport = 'streamable-http' -/** - * Auth mode for an outbound MCP server connection. - * - `none` — server requires no auth. - * - `headers` — static header map (legacy / API-token / bearer). - * - `oauth` — OAuth 2.1 + PKCE via the SDK's authProvider, persisted per workspace server. - */ +/** `oauth` uses the SDK's authProvider; `headers` is a static map; `none` is unauthenticated. */ export type McpAuthType = 'none' | 'headers' | 'oauth' export interface McpServerStatusConfig { @@ -24,10 +17,7 @@ export interface McpServerConfig { transport: McpTransport url?: string authType?: McpAuthType - /** - * Required when `authType === 'oauth'` — identifies whose stored tokens - * to use when establishing the connection. Omit for header / none auth. - */ + /** Required for `authType === 'oauth'` — selects whose stored tokens to use. */ userId?: string workspaceId?: string headers?: Record @@ -72,10 +62,6 @@ export interface McpSecurityPolicy { auditLevel: 'none' | 'basic' | 'detailed' } -/** - * JSON Schema property definition for tool parameters. - * Follows JSON Schema specification with description support. - */ export interface McpToolSchemaProperty { type?: string | string[] description?: string @@ -87,10 +73,7 @@ export interface McpToolSchemaProperty { [key: string]: unknown } -/** - * JSON Schema for tool input parameters. - * Aligns with MCP SDK's Tool.inputSchema structure. - */ +/** Typed view of the SDK's `Tool.inputSchema` (which is `Record`). */ export interface McpToolSchema { type: 'object' properties?: Record @@ -99,13 +82,8 @@ export interface McpToolSchema { [key: string]: unknown } -/** - * MCP Tool with server context. - * Extends the SDK's Tool type with app-specific server tracking. - */ -export interface McpTool { - name: string - description?: string +/** SDK `Tool` plus the server context Sim tracks. */ +export interface McpTool extends Pick { inputSchema: McpToolSchema serverId: string serverName: string @@ -151,12 +129,7 @@ export class McpConnectionError extends McpError { } } -/** - * Thrown when an OAuth-protected MCP server is reachable but the current - * user has not yet authorized Sim. This is a benign "pending" state, not a - * connection failure — callers should surface a re-auth prompt rather than - * marking the server as errored. - */ +/** Benign "needs re-auth" state — distinct from a connection failure. */ export class McpOauthAuthorizationRequiredError extends McpError { constructor( public readonly serverId: string, @@ -180,38 +153,18 @@ export interface McpServerSummary { error?: string } -/** - * Callback invoked when an MCP server sends a `notifications/tools/list_changed` notification. - */ export type McpToolsChangedCallback = (serverId: string) => void -/** - * Options for creating an McpClient with notification support. - */ export interface McpClientOptions { config: McpServerConfig securityPolicy?: McpSecurityPolicy onToolsChanged?: McpToolsChangedCallback - /** - * Pre-resolved IP address to pin all transport HTTP connections to. When - * set, the SDK transport uses a custom fetch backed by an undici Agent with - * a fixed DNS lookup, preventing DNS-rebinding (TOCTOU) attacks between - * URL validation and connection. Should be supplied by callers that have - * just validated the URL via `validateMcpServerSsrf`. - */ + /** Pre-resolved IP pinned via undici to prevent DNS-rebinding between URL validation and connection. */ resolvedIP?: string - /** - * SDK-compatible OAuth client provider. When provided, the underlying - * StreamableHTTPClientTransport delegates token discovery, refresh, and - * 401 recovery to it. Should be supplied for `authType === 'oauth'` - * server configs. - */ + /** SDK provider for OAuth token discovery, refresh, and 401 recovery. Required for `authType === 'oauth'`. */ authProvider?: import('@modelcontextprotocol/sdk/client/auth.js').OAuthClientProvider } -/** - * Event emitted by the connection manager when a server's tools change. - */ export interface ToolsChangedEvent { serverId: string serverName: string @@ -219,9 +172,6 @@ export interface ToolsChangedEvent { timestamp: number } -/** - * State of a managed persistent connection. - */ export interface ManagedConnectionState { serverId: string serverName: string @@ -233,9 +183,6 @@ export interface ManagedConnectionState { lastActivity: number } -/** - * Event emitted when workflow CRUD modifies a workflow MCP server's tools. - */ export interface WorkflowToolsChangedEvent { serverId: string workspaceId: string @@ -253,10 +200,7 @@ export interface McpToolDiscoveryResponse { byServer: Record } -/** - * MCP tool reference stored in workflow blocks (for validation). - * Minimal version used for comparing against discovered tools. - */ +/** Minimal MCP tool reference stored in workflow blocks for schema validation. */ export interface StoredMcpToolReference { serverId: string serverUrl?: string @@ -264,10 +208,6 @@ export interface StoredMcpToolReference { schema?: McpToolSchema } -/** - * Full stored MCP tool with workflow context (for API responses). - * Extended version that includes which workflow the tool is used in. - */ export interface StoredMcpTool extends StoredMcpToolReference { workflowId: string workflowName: string From cdb3e14874b2867f92e10b244231bee7df80d253 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 22 May 2026 09:15:26 -0700 Subject: [PATCH 4/9] improvement(mcp): swap PG advisory lock for Redis mutex on OAuth refresh MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit withMcpOauthRefreshLock now uses `coalesceLocally` + Redis acquireLock/ releaseLock with polling — the same primitives backing regular OAuth refresh (`app/api/auth/oauth/utils.ts`). No more pinning a Postgres connection for the duration of the SDK's OAuth HTTP refresh. - In-process dedup: shared promise via `coalesceLocally`. - Cross-process: Redis SET NX EX mutex; followers poll until the leader releases (30s max wait, 100ms poll), then acquire and run fn(). - Each MCP caller still constructs its own client (semantics preserved). - Falls open when Redis is unavailable — same behavior as the regular OAuth refresh code path. --- apps/sim/lib/mcp/oauth/storage.ts | 85 ++++++++++++++++++++----------- 1 file changed, 54 insertions(+), 31 deletions(-) diff --git a/apps/sim/lib/mcp/oauth/storage.ts b/apps/sim/lib/mcp/oauth/storage.ts index d110bd63f71..9a9b1759731 100644 --- a/apps/sim/lib/mcp/oauth/storage.ts +++ b/apps/sim/lib/mcp/oauth/storage.ts @@ -7,8 +7,11 @@ import { db } from '@sim/db' import { mcpServerOauth } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { toError } from '@sim/utils/errors' -import { generateId } from '@sim/utils/id' -import { and, eq, gt, sql } from 'drizzle-orm' +import { sleep } from '@sim/utils/helpers' +import { generateId, generateShortId } from '@sim/utils/id' +import { and, eq, gt } from 'drizzle-orm' +import { coalesceLocally } from '@/lib/concurrency/singleflight' +import { acquireLock, releaseLock } from '@/lib/core/config/redis' import { decryptSecret, encryptSecret } from '@/lib/core/security/encryption' const logger = createLogger('McpOauthStorage') @@ -232,38 +235,58 @@ export async function clearState(rowId: string): Promise { * refreshes against the same row would race and one would receive * `invalid_grant`, wiping credentials. * - * Two-tier locking: - * 1) In-process Promise chain — cheap, avoids DB roundtrips when the same - * Node process holds concurrent callers. - * 2) Postgres advisory transaction lock — blocks across processes; released - * automatically when the transaction ends. + * Two-tier coordination, matching the regular OAuth refresh pattern + * (`app/api/auth/oauth/utils.ts`): + * 1) `coalesceLocally` — in-process dedup; concurrent same-process callers + * share a single inflight promise. + * 2) Redis distributed lock (`acquireLock` / `releaseLock`) — cross-process + * mutex. Followers poll until the leader releases, then acquire and run + * their own `fn()` (each MCP caller needs its own client connection). * - * Tradeoff: the connection is held for the duration of `fn()`, which includes - * the SDK's OAuth HTTP refresh. Session-level locks (`pg_advisory_lock`) would - * release the connection earlier, but they don't survive PgBouncer transaction - * pooling — they're scoped to the underlying physical connection, which can be - * swapped between statements. `pg_advisory_xact_lock` is the correct choice - * here. If pool pressure becomes a real concern at scale, swap this for a - * Redis-based distributed lock (Redlock) that doesn't pin a DB connection. + * Falls open if Redis is unavailable — `acquireLock` no-ops, all callers run + * `fn()` uncoordinated. The in-process layer still serializes within a + * process; cross-process races become possible but rare in practice. */ -const refreshLocks = new Map>() +const REFRESH_LOCK_TTL_SEC = 30 +const REFRESH_POLL_INTERVAL_MS = 100 +const REFRESH_MAX_WAIT_MS = 30_000 export async function withMcpOauthRefreshLock(rowId: string, fn: () => Promise): Promise { - const prev = refreshLocks.get(rowId) ?? Promise.resolve() - const next = prev - .catch(() => undefined) - .then(() => - db.transaction(async (tx) => { - await tx.execute( - sql`SELECT pg_advisory_xact_lock(hashtextextended(${`mcp_oauth_refresh:${rowId}`}, 0))` - ) + const lockKey = `mcp:oauth:refresh:${rowId}` + return coalesceLocally(lockKey, async () => { + const ownerToken = generateShortId() + const deadline = Date.now() + REFRESH_MAX_WAIT_MS + + while (true) { + let acquired = false + try { + acquired = await acquireLock(lockKey, ownerToken, REFRESH_LOCK_TTL_SEC) + } catch (error) { + logger.warn('Redis unavailable, running OAuth flow uncoordinated', { + rowId, + error: toError(error).message, + }) return fn() - }) - ) - refreshLocks.set(rowId, next) - const cleanup = () => { - if (refreshLocks.get(rowId) === next) refreshLocks.delete(rowId) - } - next.then(cleanup, cleanup) - return next + } + + if (acquired) { + try { + return await fn() + } finally { + await releaseLock(lockKey, ownerToken).catch((error) => { + logger.warn('Refresh lock release failed (will expire via TTL)', { + rowId, + error: toError(error).message, + }) + }) + } + } + + if (Date.now() >= deadline) { + logger.warn('Refresh lock wait timed out, running uncoordinated', { rowId }) + return fn() + } + await sleep(REFRESH_POLL_INTERVAL_MS) + } + }) } From e6c4ec2b725a49cc094978ea623b2d028e36172e Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 22 May 2026 09:47:51 -0700 Subject: [PATCH 5/9] improvement(mcp): use SDK protocol versions + pool pinned undici agents + cover OAuth lock - McpClient.SUPPORTED_VERSIONS removed; getVersionInfo() and the inbound serve route both import LATEST_PROTOCOL_VERSION / SUPPORTED_PROTOCOL_VERSIONS directly from @modelcontextprotocol/sdk/types.js. New protocol revisions ship automatically with SDK upgrades. - pinned-fetch now caches undici Agents in a module-level LRU keyed by resolvedIP (max 64). Back-to-back MCP calls to the same server reuse the keep-alive connection pool instead of opening fresh TCP + TLS each time. - New integration tests for withMcpOauthRefreshLock covering: in-process dedup via coalesceLocally, cross-process serialization via Redis mutex, fall-open on Redis unavailable, lock release on throw, release-failure swallow, per-row key isolation. --- .../api/mcp/serve/[serverId]/route.test.ts | 10 +- .../sim/app/api/mcp/serve/[serverId]/route.ts | 12 +-- apps/sim/lib/mcp/client.ts | 12 +-- apps/sim/lib/mcp/oauth/storage.test.ts | 96 ++++++++++++++++++- apps/sim/lib/mcp/pinned-fetch.test.ts | 27 +++++- apps/sim/lib/mcp/pinned-fetch.ts | 58 +++++++---- 6 files changed, 173 insertions(+), 42 deletions(-) diff --git a/apps/sim/app/api/mcp/serve/[serverId]/route.test.ts b/apps/sim/app/api/mcp/serve/[serverId]/route.test.ts index 01ee610f6d8..cd9c5523231 100644 --- a/apps/sim/app/api/mcp/serve/[serverId]/route.test.ts +++ b/apps/sim/app/api/mcp/serve/[serverId]/route.test.ts @@ -232,14 +232,16 @@ describe('MCP Serve Route', () => { expect(body.result.protocolVersion).toBe('2024-11-05') }) - it('falls back to latest when client requests unknown version', async () => { + it('falls back to SDK latest when client requests unknown version', async () => { + const { LATEST_PROTOCOL_VERSION } = await import('@modelcontextprotocol/sdk/types.js') const body = await callInitialize('2099-01-01') - expect(body.result.protocolVersion).toBe('2025-06-18') + expect(body.result.protocolVersion).toBe(LATEST_PROTOCOL_VERSION) }) - it('falls back to latest when client omits protocolVersion', async () => { + it('falls back to SDK latest when client omits protocolVersion', async () => { + const { LATEST_PROTOCOL_VERSION } = await import('@modelcontextprotocol/sdk/types.js') const body = await callInitialize(undefined) - expect(body.result.protocolVersion).toBe('2025-06-18') + expect(body.result.protocolVersion).toBe(LATEST_PROTOCOL_VERSION) }) }) }) diff --git a/apps/sim/app/api/mcp/serve/[serverId]/route.ts b/apps/sim/app/api/mcp/serve/[serverId]/route.ts index dcb206ef3f0..d876dcd0ef2 100644 --- a/apps/sim/app/api/mcp/serve/[serverId]/route.ts +++ b/apps/sim/app/api/mcp/serve/[serverId]/route.ts @@ -11,8 +11,10 @@ import { type JSONRPCError, type JSONRPCMessage, type JSONRPCResultResponse, + LATEST_PROTOCOL_VERSION, type ListToolsResult, type RequestId, + SUPPORTED_PROTOCOL_VERSIONS, type Tool, } from '@modelcontextprotocol/sdk/types.js' import { db } from '@sim/db' @@ -36,20 +38,12 @@ import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' const logger = createLogger('WorkflowMcpServeAPI') -// Newest first. We echo the client's version when we support it (per -// MCP 2025-06-18 lifecycle spec); otherwise fall back to our latest. -const SUPPORTED_PROTOCOL_VERSIONS = ['2025-06-18', '2025-03-26', '2024-11-05'] as const -const LATEST_PROTOCOL_VERSION = SUPPORTED_PROTOCOL_VERSIONS[0] - function negotiateProtocolVersion(rpcParams: unknown): string { const requested = rpcParams && typeof rpcParams === 'object' && 'protocolVersion' in rpcParams ? (rpcParams as { protocolVersion?: unknown }).protocolVersion : undefined - if ( - typeof requested === 'string' && - (SUPPORTED_PROTOCOL_VERSIONS as readonly string[]).includes(requested) - ) { + if (typeof requested === 'string' && SUPPORTED_PROTOCOL_VERSIONS.includes(requested)) { return requested } return LATEST_PROTOCOL_VERSION diff --git a/apps/sim/lib/mcp/client.ts b/apps/sim/lib/mcp/client.ts index 925aa840df9..7fcf7351a12 100644 --- a/apps/sim/lib/mcp/client.ts +++ b/apps/sim/lib/mcp/client.ts @@ -2,7 +2,9 @@ import { UnauthorizedError } from '@modelcontextprotocol/sdk/client/auth.js' import { Client } from '@modelcontextprotocol/sdk/client/index.js' import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js' import { + LATEST_PROTOCOL_VERSION, type ListToolsResult, + SUPPORTED_PROTOCOL_VERSIONS, type Tool, ToolListChangedNotificationSchema, } from '@modelcontextprotocol/sdk/types.js' @@ -40,12 +42,6 @@ export class McpClient { private authProvider?: McpClientOptions['authProvider'] private isConnected = false - private static readonly SUPPORTED_VERSIONS = [ - '2025-06-18', // Latest stable with elicitation and OAuth 2.1 - '2025-03-26', // Streamable HTTP support - '2024-11-05', // Initial stable release - ] - constructor(options: McpClientOptions) { this.config = options.config this.securityPolicy = options.securityPolicy ?? { @@ -245,8 +241,8 @@ export class McpClient { static getVersionInfo(): McpVersionInfo { return { - supported: [...McpClient.SUPPORTED_VERSIONS], - preferred: McpClient.SUPPORTED_VERSIONS[0], + supported: [...SUPPORTED_PROTOCOL_VERSIONS], + preferred: LATEST_PROTOCOL_VERSION, } } diff --git a/apps/sim/lib/mcp/oauth/storage.test.ts b/apps/sim/lib/mcp/oauth/storage.test.ts index 95c7ae853c0..2f81fef55f8 100644 --- a/apps/sim/lib/mcp/oauth/storage.test.ts +++ b/apps/sim/lib/mcp/oauth/storage.test.ts @@ -11,11 +11,25 @@ import { } from '@sim/testing' import { beforeEach, describe, expect, it, vi } from 'vitest' +const { mockAcquireLock, mockReleaseLock } = vi.hoisted(() => ({ + mockAcquireLock: vi.fn(), + mockReleaseLock: vi.fn(), +})) + vi.mock('@sim/db', () => dbChainMock) vi.mock('@sim/db/schema', () => schemaMock) vi.mock('@/lib/core/security/encryption', () => encryptionMock) +vi.mock('@/lib/core/config/redis', () => ({ + acquireLock: mockAcquireLock, + releaseLock: mockReleaseLock, +})) -import { getOrCreateOauthRow, loadOauthRow, setOauthRowUser } from './storage' +import { + getOrCreateOauthRow, + loadOauthRow, + setOauthRowUser, + withMcpOauthRefreshLock, +} from './storage' describe('MCP OAuth storage', () => { beforeEach(() => { @@ -92,3 +106,83 @@ describe('MCP OAuth storage', () => { ) }) }) + +describe('withMcpOauthRefreshLock', () => { + beforeEach(() => { + vi.clearAllMocks() + mockAcquireLock.mockReset() + mockReleaseLock.mockReset() + mockReleaseLock.mockResolvedValue(true) + }) + + it('coalesces concurrent in-process callers onto a single fn execution', async () => { + mockAcquireLock.mockResolvedValue(true) + const fn = vi.fn(async () => 'tokens') + + const results = await Promise.all([ + withMcpOauthRefreshLock('row-coalesce', fn), + withMcpOauthRefreshLock('row-coalesce', fn), + withMcpOauthRefreshLock('row-coalesce', fn), + ]) + + expect(results).toEqual(['tokens', 'tokens', 'tokens']) + expect(fn).toHaveBeenCalledTimes(1) + expect(mockAcquireLock).toHaveBeenCalledTimes(1) + expect(mockReleaseLock).toHaveBeenCalledTimes(1) + }) + + it('serializes cross-process callers: follower polls until leader releases', async () => { + // First acquire fails (another process holds it), second succeeds. + mockAcquireLock.mockResolvedValueOnce(false).mockResolvedValueOnce(true) + const fn = vi.fn(async () => 'fresh') + + const result = await withMcpOauthRefreshLock('row-mutex', fn) + + expect(result).toBe('fresh') + expect(mockAcquireLock).toHaveBeenCalledTimes(2) + expect(fn).toHaveBeenCalledTimes(1) + }) + + it('falls open when Redis is unavailable on acquire', async () => { + mockAcquireLock.mockRejectedValueOnce(new Error('Redis connection refused')) + const fn = vi.fn(async () => 'uncoordinated') + + const result = await withMcpOauthRefreshLock('row-redis-down', fn) + + expect(result).toBe('uncoordinated') + expect(fn).toHaveBeenCalledTimes(1) + expect(mockReleaseLock).not.toHaveBeenCalled() + }) + + it('releases the lock even when fn throws', async () => { + mockAcquireLock.mockResolvedValue(true) + const fn = vi.fn(async () => { + throw new Error('refresh failed') + }) + + await expect(withMcpOauthRefreshLock('row-throws', fn)).rejects.toThrow('refresh failed') + + expect(mockReleaseLock).toHaveBeenCalledTimes(1) + }) + + it('does not surface releaseLock failures to the caller', async () => { + mockAcquireLock.mockResolvedValue(true) + mockReleaseLock.mockRejectedValueOnce(new Error('release failed')) + const fn = vi.fn(async () => 'value') + + const result = await withMcpOauthRefreshLock('row-release-fail', fn) + expect(result).toBe('value') + }) + + it('uses per-row lock keys so different rows do not serialize', async () => { + mockAcquireLock.mockResolvedValue(true) + const fn = vi.fn(async () => 'ok') + + await Promise.all([withMcpOauthRefreshLock('row-a', fn), withMcpOauthRefreshLock('row-b', fn)]) + + expect(mockAcquireLock).toHaveBeenCalledTimes(2) + const keys = mockAcquireLock.mock.calls.map((c) => c[0]) + expect(keys).toContain('mcp:oauth:refresh:row-a') + expect(keys).toContain('mcp:oauth:refresh:row-b') + }) +}) diff --git a/apps/sim/lib/mcp/pinned-fetch.test.ts b/apps/sim/lib/mcp/pinned-fetch.test.ts index 3237ae4fe44..229845aa36f 100644 --- a/apps/sim/lib/mcp/pinned-fetch.test.ts +++ b/apps/sim/lib/mcp/pinned-fetch.test.ts @@ -25,12 +25,13 @@ vi.mock('@/lib/core/security/input-validation.server', () => ({ createPinnedLookup: mockCreatePinnedLookup, })) -import { createMcpPinnedFetch } from '@/lib/mcp/pinned-fetch' +import { __resetPinnedAgentsForTests, createMcpPinnedFetch } from '@/lib/mcp/pinned-fetch' describe('createMcpPinnedFetch', () => { beforeEach(() => { vi.clearAllMocks() capturedAgentOptions.length = 0 + __resetPinnedAgentsForTests() mockCreatePinnedLookup.mockReturnValue('pinned-lookup-fn') mockUndiciFetch.mockResolvedValue(new Response('ok')) }) @@ -73,7 +74,7 @@ describe('createMcpPinnedFetch', () => { expect(init.dispatcher).toBeInstanceOf(mockAgent) }) - it('reuses the same dispatcher across calls (one Agent per fetch instance)', async () => { + it('reuses the same dispatcher across calls within a fetch instance', async () => { const fetchLike = createMcpPinnedFetch('203.0.113.10') await fetchLike('https://example.com/a') await fetchLike('https://example.com/b') @@ -82,4 +83,26 @@ describe('createMcpPinnedFetch', () => { const d2 = (mockUndiciFetch.mock.calls[1][1] as { dispatcher: unknown }).dispatcher expect(d1).toBe(d2) }) + + it('pools agents by resolvedIP across createMcpPinnedFetch calls', async () => { + const a = createMcpPinnedFetch('203.0.113.10') + const b = createMcpPinnedFetch('203.0.113.10') + await a('https://example.com/a') + await b('https://example.com/b') + expect(capturedAgentOptions).toHaveLength(1) + const d1 = (mockUndiciFetch.mock.calls[0][1] as { dispatcher: unknown }).dispatcher + const d2 = (mockUndiciFetch.mock.calls[1][1] as { dispatcher: unknown }).dispatcher + expect(d1).toBe(d2) + }) + + it('creates separate agents for different resolved IPs', async () => { + const a = createMcpPinnedFetch('203.0.113.10') + const b = createMcpPinnedFetch('198.51.100.20') + await a('https://example.com/a') + await b('https://example.com/b') + expect(capturedAgentOptions).toHaveLength(2) + const d1 = (mockUndiciFetch.mock.calls[0][1] as { dispatcher: unknown }).dispatcher + const d2 = (mockUndiciFetch.mock.calls[1][1] as { dispatcher: unknown }).dispatcher + expect(d1).not.toBe(d2) + }) }) diff --git a/apps/sim/lib/mcp/pinned-fetch.ts b/apps/sim/lib/mcp/pinned-fetch.ts index 798de5710e6..40254e8d6dd 100644 --- a/apps/sim/lib/mcp/pinned-fetch.ts +++ b/apps/sim/lib/mcp/pinned-fetch.ts @@ -3,29 +3,51 @@ import { Agent, type RequestInit as UndiciRequestInit, fetch as undiciFetch } fr import { createPinnedLookup } from '@/lib/core/security/input-validation.server' /** - * Creates a FetchLike that pins all outbound HTTP connections to a pre-resolved - * IP address. Used by the MCP transport to prevent DNS-rebinding (TOCTOU) - * attacks: validation performs DNS once and confirms the IP is allowed; this - * fetch then forces every subsequent request (initial POST, SSE GET, redirects) - * to use that same IP, regardless of what the hostname now resolves to. + * Pins outbound HTTP connections to a pre-resolved IP to prevent DNS-rebinding + * between URL validation and connection. Hostname is preserved so TLS SNI and + * the Host header still match the certificate. * - * Uses undici's `fetch` directly so the `dispatcher` option is part of the - * real type contract — not a cast that would silently break if a future - * runtime swapped out the implementation. - * - * The original hostname is preserved on the request so TLS SNI and the Host - * header continue to match the certificate. + * Agents are pooled by `resolvedIP` so back-to-back calls to the same server + * reuse the same keep-alive connection pool instead of opening a fresh TCP + + * TLS connection per McpClient instance. */ +const MAX_POOLED_AGENTS = 64 +const pinnedAgents = new Map() + +function getPinnedAgent(resolvedIP: string): Agent { + const existing = pinnedAgents.get(resolvedIP) + if (existing) { + // LRU touch — re-insert to mark as most recently used. + pinnedAgents.delete(resolvedIP) + pinnedAgents.set(resolvedIP, existing) + return existing + } + if (pinnedAgents.size >= MAX_POOLED_AGENTS) { + const oldestKey = pinnedAgents.keys().next().value + if (oldestKey !== undefined) { + const oldest = pinnedAgents.get(oldestKey) + pinnedAgents.delete(oldestKey) + oldest?.close().catch(() => {}) + } + } + const agent = new Agent({ connect: { lookup: createPinnedLookup(resolvedIP) } }) + pinnedAgents.set(resolvedIP, agent) + return agent +} + +export function __resetPinnedAgentsForTests(): void { + for (const agent of pinnedAgents.values()) { + try { + void agent.close?.() + } catch {} + } + pinnedAgents.clear() +} + export function createMcpPinnedFetch(resolvedIP: string): FetchLike { - const dispatcher = new Agent({ - connect: { lookup: createPinnedLookup(resolvedIP) }, - }) + const dispatcher = getPinnedAgent(resolvedIP) return (async (url, init) => { - // DOM `RequestInit` and undici's `RequestInit` are structurally compatible - // at runtime (Node's global fetch IS undici) but differ in TS types. - // Cast the init through unknown to bridge the typing without losing the - // critical `dispatcher` typing on the call itself. const undiciInit: UndiciRequestInit = { // double-cast-allowed: DOM RequestInit and undici RequestInit are structurally compatible at runtime (Node's global fetch IS undici) but the TS types differ ...(init as unknown as UndiciRequestInit), From c5069c02cb448e7b2ee5d40966d77e9986330e1f Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 22 May 2026 09:56:08 -0700 Subject: [PATCH 6/9] fix(mcp): serialize OAuth refresh callers; do not share McpClient MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit withMcpOauthRefreshLock previously wrapped fn() in coalesceLocally, which returns the SAME promise (and the same resolved value) to all in-process callers. fn() returns a stateful McpClient — sharing it meant whichever caller finished first would disconnect the client while another was still mid-call, leaving in-flight RPC on a closed connection. Swap coalesceLocally for a per-row Promise chain: each caller waits for the previous to settle, then runs its OWN fn() (gets its own client). Cross-process Redis mutex semantics unchanged. The "shareable scalar" assumption that makes coalesceLocally correct for regular OAuth refresh (returns an access token string) does not hold for MCP, where each caller needs an independent connection. --- apps/sim/lib/mcp/oauth/storage.test.ts | 28 +++++--- apps/sim/lib/mcp/oauth/storage.ts | 94 +++++++++++++++----------- 2 files changed, 73 insertions(+), 49 deletions(-) diff --git a/apps/sim/lib/mcp/oauth/storage.test.ts b/apps/sim/lib/mcp/oauth/storage.test.ts index 2f81fef55f8..ec3c32a4ee4 100644 --- a/apps/sim/lib/mcp/oauth/storage.test.ts +++ b/apps/sim/lib/mcp/oauth/storage.test.ts @@ -115,20 +115,32 @@ describe('withMcpOauthRefreshLock', () => { mockReleaseLock.mockResolvedValue(true) }) - it('coalesces concurrent in-process callers onto a single fn execution', async () => { + it('serializes concurrent in-process callers, each running its own fn()', async () => { mockAcquireLock.mockResolvedValue(true) - const fn = vi.fn(async () => 'tokens') + let active = 0 + let maxActive = 0 + const fn = vi.fn(async () => { + active++ + maxActive = Math.max(maxActive, active) + await new Promise((r) => setTimeout(r, 1)) + active-- + return 'tokens' + }) const results = await Promise.all([ - withMcpOauthRefreshLock('row-coalesce', fn), - withMcpOauthRefreshLock('row-coalesce', fn), - withMcpOauthRefreshLock('row-coalesce', fn), + withMcpOauthRefreshLock('row-serial', fn), + withMcpOauthRefreshLock('row-serial', fn), + withMcpOauthRefreshLock('row-serial', fn), ]) expect(results).toEqual(['tokens', 'tokens', 'tokens']) - expect(fn).toHaveBeenCalledTimes(1) - expect(mockAcquireLock).toHaveBeenCalledTimes(1) - expect(mockReleaseLock).toHaveBeenCalledTimes(1) + // Each caller gets its own fn() invocation — critical because fn() returns + // a stateful McpClient that can't be shared across consumers. + expect(fn).toHaveBeenCalledTimes(3) + // But never two at the same time within a process. + expect(maxActive).toBe(1) + expect(mockAcquireLock).toHaveBeenCalledTimes(3) + expect(mockReleaseLock).toHaveBeenCalledTimes(3) }) it('serializes cross-process callers: follower polls until leader releases', async () => { diff --git a/apps/sim/lib/mcp/oauth/storage.ts b/apps/sim/lib/mcp/oauth/storage.ts index 9a9b1759731..d86164f5258 100644 --- a/apps/sim/lib/mcp/oauth/storage.ts +++ b/apps/sim/lib/mcp/oauth/storage.ts @@ -10,7 +10,6 @@ import { toError } from '@sim/utils/errors' import { sleep } from '@sim/utils/helpers' import { generateId, generateShortId } from '@sim/utils/id' import { and, eq, gt } from 'drizzle-orm' -import { coalesceLocally } from '@/lib/concurrency/singleflight' import { acquireLock, releaseLock } from '@/lib/core/config/redis' import { decryptSecret, encryptSecret } from '@/lib/core/security/encryption' @@ -235,58 +234,71 @@ export async function clearState(rowId: string): Promise { * refreshes against the same row would race and one would receive * `invalid_grant`, wiping credentials. * - * Two-tier coordination, matching the regular OAuth refresh pattern - * (`app/api/auth/oauth/utils.ts`): - * 1) `coalesceLocally` — in-process dedup; concurrent same-process callers - * share a single inflight promise. - * 2) Redis distributed lock (`acquireLock` / `releaseLock`) — cross-process - * mutex. Followers poll until the leader releases, then acquire and run - * their own `fn()` (each MCP caller needs its own client connection). + * Two-tier serialization (each caller runs its OWN `fn()` — callers consume + * `McpClient` instances that can't be shared, unlike a scalar access token): + * 1) In-process: per-row Promise chain. Concurrent callers queue; each + * runs `fn()` after the previous settles. + * 2) Cross-process: Redis mutex (`acquireLock` / `releaseLock`). Followers + * poll until the holder releases, then acquire and run `fn()`. * - * Falls open if Redis is unavailable — `acquireLock` no-ops, all callers run - * `fn()` uncoordinated. The in-process layer still serializes within a - * process; cross-process races become possible but rare in practice. + * Falls open if Redis is unavailable — `acquireLock` no-ops, but in-process + * serialization still holds within a single Node process. */ const REFRESH_LOCK_TTL_SEC = 30 const REFRESH_POLL_INTERVAL_MS = 100 const REFRESH_MAX_WAIT_MS = 30_000 +const inflightChains = new Map>() + export async function withMcpOauthRefreshLock(rowId: string, fn: () => Promise): Promise { const lockKey = `mcp:oauth:refresh:${rowId}` - return coalesceLocally(lockKey, async () => { - const ownerToken = generateShortId() - const deadline = Date.now() + REFRESH_MAX_WAIT_MS + const prev = inflightChains.get(lockKey) ?? Promise.resolve() + const next = prev.catch(() => undefined).then(() => runWithRedisMutex(lockKey, rowId, fn)) + inflightChains.set(lockKey, next) + const cleanup = () => { + if (inflightChains.get(lockKey) === next) inflightChains.delete(lockKey) + } + next.then(cleanup, cleanup) + return next as Promise +} - while (true) { - let acquired = false - try { - acquired = await acquireLock(lockKey, ownerToken, REFRESH_LOCK_TTL_SEC) - } catch (error) { - logger.warn('Redis unavailable, running OAuth flow uncoordinated', { - rowId, - error: toError(error).message, - }) - return fn() - } +async function runWithRedisMutex( + lockKey: string, + rowId: string, + fn: () => Promise +): Promise { + const ownerToken = generateShortId() + const deadline = Date.now() + REFRESH_MAX_WAIT_MS - if (acquired) { - try { - return await fn() - } finally { - await releaseLock(lockKey, ownerToken).catch((error) => { - logger.warn('Refresh lock release failed (will expire via TTL)', { - rowId, - error: toError(error).message, - }) + while (true) { + let acquired = false + try { + acquired = await acquireLock(lockKey, ownerToken, REFRESH_LOCK_TTL_SEC) + } catch (error) { + logger.warn('Redis unavailable, running OAuth flow uncoordinated', { + rowId, + error: toError(error).message, + }) + return fn() + } + + if (acquired) { + try { + return await fn() + } finally { + await releaseLock(lockKey, ownerToken).catch((error) => { + logger.warn('Refresh lock release failed (will expire via TTL)', { + rowId, + error: toError(error).message, }) - } + }) } + } - if (Date.now() >= deadline) { - logger.warn('Refresh lock wait timed out, running uncoordinated', { rowId }) - return fn() - } - await sleep(REFRESH_POLL_INTERVAL_MS) + if (Date.now() >= deadline) { + logger.warn('Refresh lock wait timed out, running uncoordinated', { rowId }) + return fn() } - }) + await sleep(REFRESH_POLL_INTERVAL_MS) + } } From 6d67cb746c5b40fdcbb95c6d5dca2055302cabe7 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 22 May 2026 10:03:49 -0700 Subject: [PATCH 7/9] =?UTF-8?q?fix(mcp):=20bugbot=20=E2=80=94=20TTL=20watc?= =?UTF-8?q?hdog=20on=20OAuth=20lock=20+=20don't=20close=20evicted=20pinned?= =?UTF-8?q?=20agents?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Redis refresh lock now uses a 15s TTL with a watchdog that extends every 5s while fn() runs. Long-running OAuth refreshes no longer lose the lock mid-flight and let another process race the same refresh. - Pinned-agent LRU eviction no longer calls `agent.close()`. Existing `createMcpPinnedFetch` closures hold the dispatcher reference and were using a closed Agent after eviction. We drop from the cache and let GC release the dispatcher once the last closure dies; undici closes idle keep-alive connections via its own internal timeout. - New tests: watchdog extends while fn() runs and stops once it settles; evicted agents are not closed and captured closures still work. --- apps/sim/lib/mcp/oauth/storage.test.ts | 39 +++++++++++++++++++++++++- apps/sim/lib/mcp/oauth/storage.ts | 20 ++++++++++--- apps/sim/lib/mcp/pinned-fetch.test.ts | 27 +++++++++++++++--- apps/sim/lib/mcp/pinned-fetch.ts | 16 ++++------- 4 files changed, 83 insertions(+), 19 deletions(-) diff --git a/apps/sim/lib/mcp/oauth/storage.test.ts b/apps/sim/lib/mcp/oauth/storage.test.ts index ec3c32a4ee4..afd5fc28e3c 100644 --- a/apps/sim/lib/mcp/oauth/storage.test.ts +++ b/apps/sim/lib/mcp/oauth/storage.test.ts @@ -11,9 +11,10 @@ import { } from '@sim/testing' import { beforeEach, describe, expect, it, vi } from 'vitest' -const { mockAcquireLock, mockReleaseLock } = vi.hoisted(() => ({ +const { mockAcquireLock, mockReleaseLock, mockExtendLock } = vi.hoisted(() => ({ mockAcquireLock: vi.fn(), mockReleaseLock: vi.fn(), + mockExtendLock: vi.fn(), })) vi.mock('@sim/db', () => dbChainMock) @@ -22,6 +23,7 @@ vi.mock('@/lib/core/security/encryption', () => encryptionMock) vi.mock('@/lib/core/config/redis', () => ({ acquireLock: mockAcquireLock, releaseLock: mockReleaseLock, + extendLock: mockExtendLock, })) import { @@ -112,7 +114,9 @@ describe('withMcpOauthRefreshLock', () => { vi.clearAllMocks() mockAcquireLock.mockReset() mockReleaseLock.mockReset() + mockExtendLock.mockReset() mockReleaseLock.mockResolvedValue(true) + mockExtendLock.mockResolvedValue(true) }) it('serializes concurrent in-process callers, each running its own fn()', async () => { @@ -197,4 +201,37 @@ describe('withMcpOauthRefreshLock', () => { expect(keys).toContain('mcp:oauth:refresh:row-a') expect(keys).toContain('mcp:oauth:refresh:row-b') }) + + it('extends the lock TTL while fn() is running so long refreshes do not lose the lock', async () => { + vi.useFakeTimers() + try { + mockAcquireLock.mockResolvedValue(true) + let resolveFn: (v: string) => void + const fn = vi.fn( + () => + new Promise((resolve) => { + resolveFn = resolve + }) + ) + + const pending = withMcpOauthRefreshLock('row-watchdog', fn) + + // Advance time past two extend intervals (5s + 5s = 10s). + await vi.advanceTimersByTimeAsync(11_000) + expect(mockExtendLock.mock.calls.length).toBeGreaterThanOrEqual(2) + for (const call of mockExtendLock.mock.calls) { + expect(call[0]).toBe('mcp:oauth:refresh:row-watchdog') + } + + resolveFn!('done') + await expect(pending).resolves.toBe('done') + + // Watchdog must stop once fn() settles — no more extend calls. + const extendCallsAtFinish = mockExtendLock.mock.calls.length + await vi.advanceTimersByTimeAsync(20_000) + expect(mockExtendLock.mock.calls.length).toBe(extendCallsAtFinish) + } finally { + vi.useRealTimers() + } + }) }) diff --git a/apps/sim/lib/mcp/oauth/storage.ts b/apps/sim/lib/mcp/oauth/storage.ts index d86164f5258..acd10f7d414 100644 --- a/apps/sim/lib/mcp/oauth/storage.ts +++ b/apps/sim/lib/mcp/oauth/storage.ts @@ -10,7 +10,7 @@ import { toError } from '@sim/utils/errors' import { sleep } from '@sim/utils/helpers' import { generateId, generateShortId } from '@sim/utils/id' import { and, eq, gt } from 'drizzle-orm' -import { acquireLock, releaseLock } from '@/lib/core/config/redis' +import { acquireLock, extendLock, releaseLock } from '@/lib/core/config/redis' import { decryptSecret, encryptSecret } from '@/lib/core/security/encryption' const logger = createLogger('McpOauthStorage') @@ -238,13 +238,16 @@ export async function clearState(rowId: string): Promise { * `McpClient` instances that can't be shared, unlike a scalar access token): * 1) In-process: per-row Promise chain. Concurrent callers queue; each * runs `fn()` after the previous settles. - * 2) Cross-process: Redis mutex (`acquireLock` / `releaseLock`). Followers - * poll until the holder releases, then acquire and run `fn()`. + * 2) Cross-process: Redis mutex (`acquireLock` / `releaseLock`) with a TTL + * watchdog that periodically extends the lock while `fn()` runs, so + * long-running refreshes don't drop the lock and let another process + * race onto the same refresh. * * Falls open if Redis is unavailable — `acquireLock` no-ops, but in-process * serialization still holds within a single Node process. */ -const REFRESH_LOCK_TTL_SEC = 30 +const REFRESH_LOCK_TTL_SEC = 15 +const REFRESH_LOCK_EXTEND_INTERVAL_MS = 5_000 const REFRESH_POLL_INTERVAL_MS = 100 const REFRESH_MAX_WAIT_MS = 30_000 @@ -283,9 +286,18 @@ async function runWithRedisMutex( } if (acquired) { + const watchdog = setInterval(() => { + extendLock(lockKey, ownerToken, REFRESH_LOCK_TTL_SEC).catch((error) => { + logger.warn('Refresh lock extend failed', { + rowId, + error: toError(error).message, + }) + }) + }, REFRESH_LOCK_EXTEND_INTERVAL_MS) try { return await fn() } finally { + clearInterval(watchdog) await releaseLock(lockKey, ownerToken).catch((error) => { logger.warn('Refresh lock release failed (will expire via TTL)', { rowId, diff --git a/apps/sim/lib/mcp/pinned-fetch.test.ts b/apps/sim/lib/mcp/pinned-fetch.test.ts index 229845aa36f..8a4c27be0df 100644 --- a/apps/sim/lib/mcp/pinned-fetch.test.ts +++ b/apps/sim/lib/mcp/pinned-fetch.test.ts @@ -3,22 +3,27 @@ */ import { beforeEach, describe, expect, it, vi } from 'vitest' -const { mockAgent, mockCreatePinnedLookup, mockUndiciFetch, capturedAgentOptions } = vi.hoisted( - () => { +const { mockAgent, mockCreatePinnedLookup, mockUndiciFetch, capturedAgentOptions, agentCloses } = + vi.hoisted(() => { const capturedAgentOptions: unknown[] = [] + const agentCloses: unknown[] = [] class MockAgent { constructor(options: unknown) { capturedAgentOptions.push(options) } + close() { + agentCloses.push(this) + return Promise.resolve() + } } return { mockAgent: MockAgent, mockCreatePinnedLookup: vi.fn(), mockUndiciFetch: vi.fn(), capturedAgentOptions, + agentCloses, } - } -) + }) vi.mock('undici', () => ({ Agent: mockAgent, fetch: mockUndiciFetch })) vi.mock('@/lib/core/security/input-validation.server', () => ({ @@ -31,6 +36,7 @@ describe('createMcpPinnedFetch', () => { beforeEach(() => { vi.clearAllMocks() capturedAgentOptions.length = 0 + agentCloses.length = 0 __resetPinnedAgentsForTests() mockCreatePinnedLookup.mockReturnValue('pinned-lookup-fn') mockUndiciFetch.mockResolvedValue(new Response('ok')) @@ -105,4 +111,17 @@ describe('createMcpPinnedFetch', () => { const d2 = (mockUndiciFetch.mock.calls[1][1] as { dispatcher: unknown }).dispatcher expect(d1).not.toBe(d2) }) + + it('does not close evicted agents — captured closures keep working', async () => { + // Build an early closure whose agent will get evicted by later IPs. + const earlyClient = createMcpPinnedFetch('10.0.0.1') + // Fill the cache past its 64-entry limit so the early entry is evicted. + for (let i = 0; i < 64; i++) createMcpPinnedFetch(`10.1.${Math.floor(i / 256)}.${i % 256}`) + + // Eviction must NOT have closed any agents. + expect(agentCloses).toHaveLength(0) + // The early closure's captured dispatcher is still callable. + await earlyClient('https://example.com/still-works') + expect(mockUndiciFetch).toHaveBeenCalledTimes(1) + }) }) diff --git a/apps/sim/lib/mcp/pinned-fetch.ts b/apps/sim/lib/mcp/pinned-fetch.ts index 40254e8d6dd..236518d13ec 100644 --- a/apps/sim/lib/mcp/pinned-fetch.ts +++ b/apps/sim/lib/mcp/pinned-fetch.ts @@ -23,12 +23,13 @@ function getPinnedAgent(resolvedIP: string): Agent { return existing } if (pinnedAgents.size >= MAX_POOLED_AGENTS) { + // Drop the oldest entry WITHOUT closing it — existing `createMcpPinnedFetch` + // closures may still hold a reference and have in-flight requests. The + // dispatcher is GC'd (and its sockets cleaned up) when the last closure + // releases it; undici closes idle keep-alive connections after its own + // timeout (default 4s). const oldestKey = pinnedAgents.keys().next().value - if (oldestKey !== undefined) { - const oldest = pinnedAgents.get(oldestKey) - pinnedAgents.delete(oldestKey) - oldest?.close().catch(() => {}) - } + if (oldestKey !== undefined) pinnedAgents.delete(oldestKey) } const agent = new Agent({ connect: { lookup: createPinnedLookup(resolvedIP) } }) pinnedAgents.set(resolvedIP, agent) @@ -36,11 +37,6 @@ function getPinnedAgent(resolvedIP: string): Agent { } export function __resetPinnedAgentsForTests(): void { - for (const agent of pinnedAgents.values()) { - try { - void agent.close?.() - } catch {} - } pinnedAgents.clear() } From 4c05f6307f4b8c09f7c2bd761a59cb581f0326e7 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 22 May 2026 10:10:17 -0700 Subject: [PATCH 8/9] fix(mcp): throw instead of falling open when refresh lock wait exceeds deadline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the Redis refresh lock can't be acquired within REFRESH_MAX_WAIT_MS the previous code ran fn() uncoordinated — but another process can still be holding the lock (watchdog-extended) and refreshing the same OAuth row, recreating the exact race the lock prevents. Throw on deadline. The caller can retry; the Redis-down branch remains the only path that runs fn() uncoordinated (no coordination is possible there). --- apps/sim/lib/mcp/oauth/storage.test.ts | 19 +++++++++++++++++++ apps/sim/lib/mcp/oauth/storage.ts | 10 ++++++++-- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/apps/sim/lib/mcp/oauth/storage.test.ts b/apps/sim/lib/mcp/oauth/storage.test.ts index afd5fc28e3c..61455b36135 100644 --- a/apps/sim/lib/mcp/oauth/storage.test.ts +++ b/apps/sim/lib/mcp/oauth/storage.test.ts @@ -202,6 +202,25 @@ describe('withMcpOauthRefreshLock', () => { expect(keys).toContain('mcp:oauth:refresh:row-b') }) + it('throws when the lock is held longer than the max wait (does not race)', async () => { + vi.useFakeTimers() + try { + // Acquire always fails — another process holds the lock with watchdog extension. + mockAcquireLock.mockResolvedValue(false) + const fn = vi.fn(async () => 'should-not-run') + + const pending = withMcpOauthRefreshLock('row-deadline', fn) + // Attach the rejection expectation before draining so Vitest doesn't see + // an unhandled rejection while timers advance. + const assertion = expect(pending).rejects.toThrow(/held longer than/) + await vi.advanceTimersByTimeAsync(31_000) + await assertion + expect(fn).not.toHaveBeenCalled() + } finally { + vi.useRealTimers() + } + }) + it('extends the lock TTL while fn() is running so long refreshes do not lose the lock', async () => { vi.useFakeTimers() try { diff --git a/apps/sim/lib/mcp/oauth/storage.ts b/apps/sim/lib/mcp/oauth/storage.ts index acd10f7d414..aca0fbf5ec6 100644 --- a/apps/sim/lib/mcp/oauth/storage.ts +++ b/apps/sim/lib/mcp/oauth/storage.ts @@ -308,8 +308,14 @@ async function runWithRedisMutex( } if (Date.now() >= deadline) { - logger.warn('Refresh lock wait timed out, running uncoordinated', { rowId }) - return fn() + // Lock still held by another process AND its watchdog is keeping it + // alive — falling open would let us refresh concurrently and race the + // rotating refresh token. Throw and let the caller decide whether to + // retry; the Redis-down path remains the only branch that runs `fn()` + // uncoordinated (no coordination available there). + throw new Error( + `MCP OAuth refresh lock for ${rowId} held longer than ${REFRESH_MAX_WAIT_MS}ms` + ) } await sleep(REFRESH_POLL_INTERVAL_MS) } From 12144cfa496d1d0c994d8f0dca76fd57b5346598 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 22 May 2026 10:18:52 -0700 Subject: [PATCH 9/9] docs(mcp): restore TSDoc that documented intent on exported types/methods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Earlier comment-trim pass went too far on a few exports — restored the TSDoc that explained non-obvious "why" decisions: - SimMcpOauthProviderInit.preregistered: when set, the SDK skips DCR. - McpServerConfig.userId: required for OAuth; selects which user's stored tokens to use. - McpOauthAuthorizationRequiredError: benign pending state vs failure. - McpToolsChangedCallback / McpClientOptions: notification semantics, DNS-rebinding pinning rationale, OAuth provider contract. - StoredMcpToolReference / StoredMcpTool: minimal vs extended use. - McpClient.connect: documents listChanged handler registration. - McpService.executeTool: documents session-error retry behavior. Pure-restatement comments ("Disconnect from MCP server") stay trimmed. --- apps/sim/lib/mcp/client.ts | 5 ++++ apps/sim/lib/mcp/oauth/provider.ts | 5 +++- apps/sim/lib/mcp/service.ts | 4 +++ apps/sim/lib/mcp/types.ts | 42 ++++++++++++++++++++++++++---- 4 files changed, 50 insertions(+), 6 deletions(-) diff --git a/apps/sim/lib/mcp/client.ts b/apps/sim/lib/mcp/client.ts index 7fcf7351a12..ca2b26724fa 100644 --- a/apps/sim/lib/mcp/client.ts +++ b/apps/sim/lib/mcp/client.ts @@ -80,6 +80,11 @@ export class McpClient { ) } + /** + * Initialize connection to MCP server. + * If an `onToolsChanged` callback was provided, registers a notification handler + * for `notifications/tools/list_changed` after connecting. + */ async connect(): Promise { logger.info(`Connecting to MCP server: ${this.config.name} (${this.config.transport})`) diff --git a/apps/sim/lib/mcp/oauth/provider.ts b/apps/sim/lib/mcp/oauth/provider.ts index 86a2c72de42..9b130e06453 100644 --- a/apps/sim/lib/mcp/oauth/provider.ts +++ b/apps/sim/lib/mcp/oauth/provider.ts @@ -41,7 +41,10 @@ export interface PreregisteredClient { interface SimMcpOauthProviderInit { row: McpOauthRow scope?: string - /** When set, the SDK skips Dynamic Client Registration and uses these credentials directly. */ + /** + * Optional user-supplied client credentials. When provided, the SDK skips + * Dynamic Client Registration and uses these for the auth/token exchange. + */ preregistered?: PreregisteredClient } diff --git a/apps/sim/lib/mcp/service.ts b/apps/sim/lib/mcp/service.ts index adbef7b0ae2..4ef53382483 100644 --- a/apps/sim/lib/mcp/service.ts +++ b/apps/sim/lib/mcp/service.ts @@ -242,6 +242,10 @@ class McpService { }) } + /** + * Execute a tool on a specific server with retry logic for session errors. + * Retries once on session-related errors (400, 404, session ID issues). + */ async executeTool( userId: string, serverId: string, diff --git a/apps/sim/lib/mcp/types.ts b/apps/sim/lib/mcp/types.ts index 04ebad6a4be..be506fd5b07 100644 --- a/apps/sim/lib/mcp/types.ts +++ b/apps/sim/lib/mcp/types.ts @@ -17,7 +17,10 @@ export interface McpServerConfig { transport: McpTransport url?: string authType?: McpAuthType - /** Required for `authType === 'oauth'` — selects whose stored tokens to use. */ + /** + * Required when `authType === 'oauth'` — identifies whose stored tokens + * to use when establishing the connection. Omit for header / none auth. + */ userId?: string workspaceId?: string headers?: Record @@ -129,7 +132,12 @@ export class McpConnectionError extends McpError { } } -/** Benign "needs re-auth" state — distinct from a connection failure. */ +/** + * Thrown when an OAuth-protected MCP server is reachable but the current + * user has not yet authorized Sim. This is a benign "pending" state, not a + * connection failure — callers should surface a re-auth prompt rather than + * marking the server as errored. + */ export class McpOauthAuthorizationRequiredError extends McpError { constructor( public readonly serverId: string, @@ -153,15 +161,32 @@ export interface McpServerSummary { error?: string } +/** + * Callback invoked when an MCP server sends a `notifications/tools/list_changed` notification. + */ export type McpToolsChangedCallback = (serverId: string) => void +/** + * Options for creating an McpClient with notification support. + */ export interface McpClientOptions { config: McpServerConfig securityPolicy?: McpSecurityPolicy onToolsChanged?: McpToolsChangedCallback - /** Pre-resolved IP pinned via undici to prevent DNS-rebinding between URL validation and connection. */ + /** + * Pre-resolved IP address to pin all transport HTTP connections to. When + * set, the SDK transport uses a custom fetch backed by an undici Agent with + * a fixed DNS lookup, preventing DNS-rebinding (TOCTOU) attacks between + * URL validation and connection. Should be supplied by callers that have + * just validated the URL via `validateMcpServerSsrf`. + */ resolvedIP?: string - /** SDK provider for OAuth token discovery, refresh, and 401 recovery. Required for `authType === 'oauth'`. */ + /** + * SDK-compatible OAuth client provider. When provided, the underlying + * StreamableHTTPClientTransport delegates token discovery, refresh, and + * 401 recovery to it. Should be supplied for `authType === 'oauth'` + * server configs. + */ authProvider?: import('@modelcontextprotocol/sdk/client/auth.js').OAuthClientProvider } @@ -200,7 +225,10 @@ export interface McpToolDiscoveryResponse { byServer: Record } -/** Minimal MCP tool reference stored in workflow blocks for schema validation. */ +/** + * MCP tool reference stored in workflow blocks (for validation). + * Minimal version used for comparing against discovered tools. + */ export interface StoredMcpToolReference { serverId: string serverUrl?: string @@ -208,6 +236,10 @@ export interface StoredMcpToolReference { schema?: McpToolSchema } +/** + * Full stored MCP tool with workflow context (for API responses). + * Extended version that includes which workflow the tool is used in. + */ export interface StoredMcpTool extends StoredMcpToolReference { workflowId: string workflowName: string