diff --git a/apps/docs/content/docs/en/execution/costs.mdx b/apps/docs/content/docs/en/execution/costs.mdx index 5d88091b13..21f2302df8 100644 --- a/apps/docs/content/docs/en/execution/costs.mdx +++ b/apps/docs/content/docs/en/execution/costs.mdx @@ -213,6 +213,25 @@ Different subscription plans have different usage limits: | **Team** | $40/seat (pooled, adjustable) | 300 sync, 2,500 async | | **Enterprise** | Custom | Custom | +## Execution Time Limits + +Workflows have maximum execution time limits based on your subscription plan: + +| Plan | Sync Execution | Async Execution | +|------|----------------|-----------------| +| **Free** | 5 minutes | 10 minutes | +| **Pro** | 60 minutes | 90 minutes | +| **Team** | 60 minutes | 90 minutes | +| **Enterprise** | 60 minutes | 90 minutes | + +**Sync executions** run immediately and return results directly. These are triggered via the API with `async: false` (default) or through the UI. +**Async executions** (triggered via API with `async: true`, webhooks, or schedules) run in the background. Async time limits are up to 2x the sync limit, capped at 90 minutes. + + + + If a workflow exceeds its time limit, it will be terminated and marked as failed with a timeout error. Design long-running workflows to use async execution or break them into smaller workflows. + + ## Billing Model Sim uses a **base subscription + overage** billing model: diff --git a/apps/sim/app/(landing)/components/landing-pricing/landing-pricing.tsx b/apps/sim/app/(landing)/components/landing-pricing/landing-pricing.tsx index e6aa6d6f84..e931667222 100644 --- a/apps/sim/app/(landing)/components/landing-pricing/landing-pricing.tsx +++ b/apps/sim/app/(landing)/components/landing-pricing/landing-pricing.tsx @@ -11,7 +11,7 @@ import { Database, DollarSign, HardDrive, - Workflow, + Timer, } from 'lucide-react' import { useRouter } from 'next/navigation' import { cn } from '@/lib/core/utils/cn' @@ -44,7 +44,7 @@ interface PricingTier { const FREE_PLAN_FEATURES: PricingFeature[] = [ { icon: DollarSign, text: '$20 usage limit' }, { icon: HardDrive, text: '5GB file storage' }, - { icon: Workflow, text: 'Public template access' }, + { icon: Timer, text: '5 min execution limit' }, { icon: Database, text: 'Limited log retention' }, { icon: Code2, text: 'CLI/SDK Access' }, ] diff --git a/apps/sim/app/api/cron/cleanup-stale-executions/route.ts b/apps/sim/app/api/cron/cleanup-stale-executions/route.ts index 2b0859143f..a7bb1aa350 100644 --- a/apps/sim/app/api/cron/cleanup-stale-executions/route.ts +++ b/apps/sim/app/api/cron/cleanup-stale-executions/route.ts @@ -4,10 +4,12 @@ import { createLogger } from '@sim/logger' import { and, eq, lt, sql } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { verifyCronAuth } from '@/lib/auth/internal' +import { getMaxExecutionTimeout } from '@/lib/core/execution-limits' const logger = createLogger('CleanupStaleExecutions') -const STALE_THRESHOLD_MINUTES = 30 +const STALE_THRESHOLD_MS = getMaxExecutionTimeout() + 5 * 60 * 1000 +const STALE_THRESHOLD_MINUTES = Math.ceil(STALE_THRESHOLD_MS / 60000) const MAX_INT32 = 2_147_483_647 export async function GET(request: NextRequest) { diff --git a/apps/sim/app/api/mcp/serve/[serverId]/route.ts b/apps/sim/app/api/mcp/serve/[serverId]/route.ts index 2359d90199..9d9f917bd6 100644 --- a/apps/sim/app/api/mcp/serve/[serverId]/route.ts +++ b/apps/sim/app/api/mcp/serve/[serverId]/route.ts @@ -21,6 +21,7 @@ import { and, eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { checkHybridAuth } from '@/lib/auth/hybrid' import { generateInternalToken } from '@/lib/auth/internal' +import { getMaxExecutionTimeout } from '@/lib/core/execution-limits' import { getBaseUrl } from '@/lib/core/utils/urls' const logger = createLogger('WorkflowMcpServeAPI') @@ -264,7 +265,7 @@ async function handleToolsCall( method: 'POST', headers, body: JSON.stringify({ input: params.arguments || {}, triggerType: 'mcp' }), - signal: AbortSignal.timeout(600000), // 10 minute timeout + signal: AbortSignal.timeout(getMaxExecutionTimeout()), }) const executeResult = await response.json() diff --git a/apps/sim/app/api/mcp/tools/execute/route.ts b/apps/sim/app/api/mcp/tools/execute/route.ts index fe0736ba14..4229cebbfd 100644 --- a/apps/sim/app/api/mcp/tools/execute/route.ts +++ b/apps/sim/app/api/mcp/tools/execute/route.ts @@ -1,5 +1,8 @@ import { createLogger } from '@sim/logger' import type { NextRequest } from 'next/server' +import { getHighestPrioritySubscription } from '@/lib/billing/core/plan' +import { getExecutionTimeout } from '@/lib/core/execution-limits' +import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types' import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware' import { mcpService } from '@/lib/mcp/service' import type { McpTool, McpToolCall, McpToolResult } from '@/lib/mcp/types' @@ -7,7 +10,6 @@ import { categorizeError, createMcpErrorResponse, createMcpSuccessResponse, - MCP_CONSTANTS, validateStringParam, } from '@/lib/mcp/utils' @@ -171,13 +173,16 @@ export const POST = withMcpAuth('read')( arguments: args, } + const userSubscription = await getHighestPrioritySubscription(userId) + const executionTimeout = getExecutionTimeout( + userSubscription?.plan as SubscriptionPlan | undefined, + 'sync' + ) + const result = await Promise.race([ mcpService.executeTool(userId, serverId, toolCall, workspaceId), new Promise((_, reject) => - setTimeout( - () => reject(new Error('Tool execution timeout')), - MCP_CONSTANTS.EXECUTION_TIMEOUT - ) + setTimeout(() => reject(new Error('Tool execution timeout')), executionTimeout) ), ]) diff --git a/apps/sim/app/api/tools/dropbox/upload/route.ts b/apps/sim/app/api/tools/dropbox/upload/route.ts index 629a1dfbd8..bdf06a5c63 100644 --- a/apps/sim/app/api/tools/dropbox/upload/route.ts +++ b/apps/sim/app/api/tools/dropbox/upload/route.ts @@ -3,6 +3,7 @@ import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { checkInternalAuth } from '@/lib/auth/hybrid' import { generateRequestId } from '@/lib/core/utils/request' +import { httpHeaderSafeJson } from '@/lib/core/utils/validation' import { FileInputSchema } from '@/lib/uploads/utils/file-schemas' import { processFilesToUserFiles, type RawFileInput } from '@/lib/uploads/utils/file-utils' import { downloadFileFromStorage } from '@/lib/uploads/utils/file-utils.server' @@ -11,16 +12,6 @@ export const dynamic = 'force-dynamic' const logger = createLogger('DropboxUploadAPI') -/** - * Escapes non-ASCII characters in JSON string for HTTP header safety. - * Dropbox API requires characters 0x7F and all non-ASCII to be escaped as \uXXXX. - */ -function httpHeaderSafeJson(value: object): string { - return JSON.stringify(value).replace(/[\u007f-\uffff]/g, (c) => { - return '\\u' + ('0000' + c.charCodeAt(0).toString(16)).slice(-4) - }) -} - const DropboxUploadSchema = z.object({ accessToken: z.string().min(1, 'Access token is required'), path: z.string().min(1, 'Destination path is required'), diff --git a/apps/sim/app/api/tools/stt/route.ts b/apps/sim/app/api/tools/stt/route.ts index 1317d8453d..bb5de0310d 100644 --- a/apps/sim/app/api/tools/stt/route.ts +++ b/apps/sim/app/api/tools/stt/route.ts @@ -2,6 +2,7 @@ import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' import { extractAudioFromVideo, isVideoFile } from '@/lib/audio/extractor' import { checkInternalAuth } from '@/lib/auth/hybrid' +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' import { secureFetchWithPinnedIP, validateUrlWithDNS, @@ -636,7 +637,8 @@ async function transcribeWithAssemblyAI( let transcript: any let attempts = 0 - const maxAttempts = 60 // 5 minutes with 5-second intervals + const pollIntervalMs = 5000 + const maxAttempts = Math.ceil(DEFAULT_EXECUTION_TIMEOUT_MS / pollIntervalMs) while (attempts < maxAttempts) { const statusResponse = await fetch(`https://api.assemblyai.com/v2/transcript/${id}`, { diff --git a/apps/sim/app/api/tools/textract/parse/route.ts b/apps/sim/app/api/tools/textract/parse/route.ts index ad19aeb955..ca1e4a540c 100644 --- a/apps/sim/app/api/tools/textract/parse/route.ts +++ b/apps/sim/app/api/tools/textract/parse/route.ts @@ -3,6 +3,7 @@ import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { checkInternalAuth } from '@/lib/auth/hybrid' +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' import { validateAwsRegion, validateS3BucketName } from '@/lib/core/security/input-validation' import { secureFetchWithPinnedIP, @@ -226,8 +227,8 @@ async function pollForJobCompletion( useAnalyzeDocument: boolean, requestId: string ): Promise> { - const pollIntervalMs = 5000 // 5 seconds between polls - const maxPollTimeMs = 180000 // 3 minutes maximum polling time + const pollIntervalMs = 5000 + const maxPollTimeMs = DEFAULT_EXECUTION_TIMEOUT_MS const maxAttempts = Math.ceil(maxPollTimeMs / pollIntervalMs) const getTarget = useAnalyzeDocument diff --git a/apps/sim/app/api/tools/tts/route.ts b/apps/sim/app/api/tools/tts/route.ts index bc7bbe7387..f8e1065540 100644 --- a/apps/sim/app/api/tools/tts/route.ts +++ b/apps/sim/app/api/tools/tts/route.ts @@ -2,6 +2,7 @@ import { createLogger } from '@sim/logger' import type { NextRequest } from 'next/server' import { NextResponse } from 'next/server' import { checkInternalAuth } from '@/lib/auth/hybrid' +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' import { validateAlphanumericId } from '@/lib/core/security/input-validation' import { getBaseUrl } from '@/lib/core/utils/urls' import { StorageService } from '@/lib/uploads' @@ -60,7 +61,7 @@ export async function POST(request: NextRequest) { text, model_id: modelId, }), - signal: AbortSignal.timeout(60000), + signal: AbortSignal.timeout(DEFAULT_EXECUTION_TIMEOUT_MS), }) if (!response.ok) { diff --git a/apps/sim/app/api/tools/video/route.ts b/apps/sim/app/api/tools/video/route.ts index 375042e931..dd131cad31 100644 --- a/apps/sim/app/api/tools/video/route.ts +++ b/apps/sim/app/api/tools/video/route.ts @@ -1,6 +1,7 @@ import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' import { checkInternalAuth } from '@/lib/auth/hybrid' +import { getMaxExecutionTimeout } from '@/lib/core/execution-limits' import { downloadFileFromStorage } from '@/lib/uploads/utils/file-utils.server' import type { UserFile } from '@/executor/types' import type { VideoRequestBody } from '@/tools/video/types' @@ -326,11 +327,12 @@ async function generateWithRunway( logger.info(`[${requestId}] Runway task created: ${taskId}`) - const maxAttempts = 120 // 10 minutes with 5-second intervals + const pollIntervalMs = 5000 + const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs) let attempts = 0 while (attempts < maxAttempts) { - await sleep(5000) // Poll every 5 seconds + await sleep(pollIntervalMs) const statusResponse = await fetch(`https://api.dev.runwayml.com/v1/tasks/${taskId}`, { headers: { @@ -370,7 +372,7 @@ async function generateWithRunway( attempts++ } - throw new Error('Runway generation timed out after 10 minutes') + throw new Error('Runway generation timed out') } async function generateWithVeo( @@ -429,11 +431,12 @@ async function generateWithVeo( logger.info(`[${requestId}] Veo operation created: ${operationName}`) - const maxAttempts = 60 // 5 minutes with 5-second intervals + const pollIntervalMs = 5000 + const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs) let attempts = 0 while (attempts < maxAttempts) { - await sleep(5000) + await sleep(pollIntervalMs) const statusResponse = await fetch( `https://generativelanguage.googleapis.com/v1beta/${operationName}`, @@ -485,7 +488,7 @@ async function generateWithVeo( attempts++ } - throw new Error('Veo generation timed out after 5 minutes') + throw new Error('Veo generation timed out') } async function generateWithLuma( @@ -541,11 +544,12 @@ async function generateWithLuma( logger.info(`[${requestId}] Luma generation created: ${generationId}`) - const maxAttempts = 120 // 10 minutes + const pollIntervalMs = 5000 + const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs) let attempts = 0 while (attempts < maxAttempts) { - await sleep(5000) + await sleep(pollIntervalMs) const statusResponse = await fetch( `https://api.lumalabs.ai/dream-machine/v1/generations/${generationId}`, @@ -592,7 +596,7 @@ async function generateWithLuma( attempts++ } - throw new Error('Luma generation timed out after 10 minutes') + throw new Error('Luma generation timed out') } async function generateWithMiniMax( @@ -658,14 +662,13 @@ async function generateWithMiniMax( logger.info(`[${requestId}] MiniMax task created: ${taskId}`) - // Poll for completion (6-10 minutes typical) - const maxAttempts = 120 // 10 minutes with 5-second intervals + const pollIntervalMs = 5000 + const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs) let attempts = 0 while (attempts < maxAttempts) { - await sleep(5000) + await sleep(pollIntervalMs) - // Query task status const statusResponse = await fetch( `https://api.minimax.io/v1/query/video_generation?task_id=${taskId}`, { @@ -743,7 +746,7 @@ async function generateWithMiniMax( attempts++ } - throw new Error('MiniMax generation timed out after 10 minutes') + throw new Error('MiniMax generation timed out') } // Helper function to strip subpaths from Fal.ai model IDs for status/result endpoints @@ -861,11 +864,12 @@ async function generateWithFalAI( // Get base model ID (without subpath) for status and result endpoints const baseModelId = getBaseModelId(falModelId) - const maxAttempts = 96 // 8 minutes with 5-second intervals + const pollIntervalMs = 5000 + const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs) let attempts = 0 while (attempts < maxAttempts) { - await sleep(5000) + await sleep(pollIntervalMs) const statusResponse = await fetch( `https://queue.fal.run/${baseModelId}/requests/${requestIdFal}/status`, @@ -938,7 +942,7 @@ async function generateWithFalAI( attempts++ } - throw new Error('Fal.ai generation timed out after 8 minutes') + throw new Error('Fal.ai generation timed out') } function getVideoDimensions( diff --git a/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts b/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts index dd59f33380..0d1dc5cb8c 100644 --- a/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts @@ -3,11 +3,13 @@ import { type NextRequest, NextResponse } from 'next/server' import { v4 as uuidv4 } from 'uuid' import { z } from 'zod' import { checkHybridAuth } from '@/lib/auth/hybrid' +import { getTimeoutErrorMessage, isTimeoutError } from '@/lib/core/execution-limits' import { generateRequestId } from '@/lib/core/utils/request' import { SSE_HEADERS } from '@/lib/core/utils/sse' import { markExecutionCancelled } from '@/lib/execution/cancellation' import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' +import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' import { createSSECallbacks } from '@/lib/workflows/executor/execution-events' import { ExecutionSnapshot } from '@/executor/execution/snapshot' @@ -116,6 +118,16 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId) const abortController = new AbortController() let isStreamClosed = false + let isTimedOut = false + + const syncTimeout = preprocessResult.executionTimeout?.sync + let timeoutId: NodeJS.Timeout | undefined + if (syncTimeout) { + timeoutId = setTimeout(() => { + isTimedOut = true + abortController.abort() + }, syncTimeout) + } const stream = new ReadableStream({ async start(controller) { @@ -167,13 +179,33 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: }) if (result.status === 'cancelled') { - sendEvent({ - type: 'execution:cancelled', - timestamp: new Date().toISOString(), - executionId, - workflowId, - data: { duration: result.metadata?.duration || 0 }, - }) + if (isTimedOut && syncTimeout) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, syncTimeout) + logger.info(`[${requestId}] Run-from-block execution timed out`, { + timeoutMs: syncTimeout, + }) + + await loggingSession.markAsFailed(timeoutErrorMessage) + + sendEvent({ + type: 'execution:error', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + error: timeoutErrorMessage, + duration: result.metadata?.duration || 0, + }, + }) + } else { + sendEvent({ + type: 'execution:cancelled', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { duration: result.metadata?.duration || 0 }, + }) + } } else { sendEvent({ type: 'execution:completed', @@ -190,10 +222,27 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: }) } } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : 'Unknown error' - logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`) + const isTimeout = isTimeoutError(error) || isTimedOut + const errorMessage = isTimeout + ? getTimeoutErrorMessage(error, syncTimeout) + : error instanceof Error + ? error.message + : 'Unknown error' + + logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`, { + isTimeout, + }) const executionResult = hasExecutionResult(error) ? error.executionResult : undefined + const { traceSpans, totalDuration } = executionResult + ? buildTraceSpans(executionResult) + : { traceSpans: [], totalDuration: 0 } + + await loggingSession.safeCompleteWithError({ + totalDurationMs: totalDuration || executionResult?.metadata?.duration, + error: { message: errorMessage }, + traceSpans, + }) sendEvent({ type: 'execution:error', @@ -206,6 +255,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: }, }) } finally { + if (timeoutId) clearTimeout(timeoutId) if (!isStreamClosed) { try { controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n')) @@ -216,6 +266,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: }, cancel() { isStreamClosed = true + if (timeoutId) clearTimeout(timeoutId) abortController.abort() markExecutionCancelled(executionId).catch(() => {}) }, diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 2ef6b02709..05de03b090 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -5,6 +5,11 @@ import { validate as uuidValidate, v4 as uuidv4 } from 'uuid' import { z } from 'zod' import { checkHybridAuth } from '@/lib/auth/hybrid' import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags' +import { + createTimeoutAbortController, + getTimeoutErrorMessage, + isTimeoutError, +} from '@/lib/core/execution-limits' import { generateRequestId } from '@/lib/core/utils/request' import { SSE_HEADERS } from '@/lib/core/utils/sse' import { getBaseUrl } from '@/lib/core/utils/urls' @@ -12,6 +17,7 @@ import { markExecutionCancelled } from '@/lib/execution/cancellation' import { processInputFileFields } from '@/lib/execution/files' import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' +import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' import { cleanupExecutionBase64Cache, hydrateUserFilesWithBase64, @@ -120,10 +126,6 @@ type AsyncExecutionParams = { triggerType: CoreTriggerType } -/** - * Handles async workflow execution by queueing a background job. - * Returns immediately with a 202 Accepted response containing the job ID. - */ async function handleAsyncExecution(params: AsyncExecutionParams): Promise { const { requestId, workflowId, userId, input, triggerType } = params @@ -405,6 +407,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: if (!enableSSE) { logger.info(`[${requestId}] Using non-SSE execution (direct JSON response)`) + const timeoutController = createTimeoutAbortController( + preprocessResult.executionTimeout?.sync + ) + try { const metadata: ExecutionMetadata = { requestId, @@ -438,8 +444,37 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: includeFileBase64, base64MaxBytes, stopAfterBlockId, + abortSignal: timeoutController.signal, }) + if ( + result.status === 'cancelled' && + timeoutController.isTimedOut() && + timeoutController.timeoutMs + ) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs) + logger.info(`[${requestId}] Non-SSE execution timed out`, { + timeoutMs: timeoutController.timeoutMs, + }) + await loggingSession.markAsFailed(timeoutErrorMessage) + + return NextResponse.json( + { + success: false, + output: result.output, + error: timeoutErrorMessage, + metadata: result.metadata + ? { + duration: result.metadata.duration, + startTime: result.metadata.startTime, + endTime: result.metadata.endTime, + } + : undefined, + }, + { status: 408 } + ) + } + const outputWithBase64 = includeFileBase64 ? ((await hydrateUserFilesWithBase64(result.output, { requestId, @@ -450,9 +485,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const resultWithBase64 = { ...result, output: outputWithBase64 } - // Cleanup base64 cache for this execution - await cleanupExecutionBase64Cache(executionId) - const hasResponseBlock = workflowHasResponseBlock(resultWithBase64) if (hasResponseBlock) { return createHttpResponseFromBlock(resultWithBase64) @@ -474,10 +506,17 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: return NextResponse.json(filteredResult) } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : 'Unknown error' + logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`) const executionResult = hasExecutionResult(error) ? error.executionResult : undefined + await loggingSession.safeCompleteWithError({ + totalDurationMs: executionResult?.metadata?.duration, + error: { message: errorMessage }, + traceSpans: executionResult?.logs as any, + }) + return NextResponse.json( { success: false, @@ -493,6 +532,15 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: }, { status: 500 } ) + } finally { + timeoutController.cleanup() + if (executionId) { + try { + await cleanupExecutionBase64Cache(executionId) + } catch (error) { + logger.error(`[${requestId}] Failed to cleanup base64 cache`, { error }) + } + } } } @@ -506,7 +554,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: cachedWorkflowData?.blocks || {} ) const streamVariables = cachedWorkflowData?.variables ?? (workflow as any).variables - const stream = await createStreamingResponse({ requestId, workflow: { @@ -524,6 +571,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: workflowTriggerType: triggerType === 'chat' ? 'chat' : 'api', includeFileBase64, base64MaxBytes, + timeoutMs: preprocessResult.executionTimeout?.sync, }, executionId, }) @@ -535,7 +583,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: } const encoder = new TextEncoder() - const abortController = new AbortController() + const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync) let isStreamClosed = false const stream = new ReadableStream({ @@ -731,7 +779,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: onStream, }, loggingSession, - abortSignal: abortController.signal, + abortSignal: timeoutController.signal, includeFileBase64, base64MaxBytes, stopAfterBlockId, @@ -767,16 +815,37 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: } if (result.status === 'cancelled') { - logger.info(`[${requestId}] Workflow execution was cancelled`) - sendEvent({ - type: 'execution:cancelled', - timestamp: new Date().toISOString(), - executionId, - workflowId, - data: { - duration: result.metadata?.duration || 0, - }, - }) + if (timeoutController.isTimedOut() && timeoutController.timeoutMs) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs) + logger.info(`[${requestId}] Workflow execution timed out`, { + timeoutMs: timeoutController.timeoutMs, + }) + + await loggingSession.markAsFailed(timeoutErrorMessage) + + sendEvent({ + type: 'execution:error', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + error: timeoutErrorMessage, + duration: result.metadata?.duration || 0, + }, + }) + } else { + logger.info(`[${requestId}] Workflow execution was cancelled`) + + sendEvent({ + type: 'execution:cancelled', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + duration: result.metadata?.duration || 0, + }, + }) + } return } @@ -799,14 +868,26 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: endTime: result.metadata?.endTime || new Date().toISOString(), }, }) - - // Cleanup base64 cache for this execution - await cleanupExecutionBase64Cache(executionId) } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : 'Unknown error' - logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`) + const isTimeout = isTimeoutError(error) || timeoutController.isTimedOut() + const errorMessage = isTimeout + ? getTimeoutErrorMessage(error, timeoutController.timeoutMs) + : error instanceof Error + ? error.message + : 'Unknown error' + + logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`, { isTimeout }) const executionResult = hasExecutionResult(error) ? error.executionResult : undefined + const { traceSpans, totalDuration } = executionResult + ? buildTraceSpans(executionResult) + : { traceSpans: [], totalDuration: 0 } + + await loggingSession.safeCompleteWithError({ + totalDurationMs: totalDuration || executionResult?.metadata?.duration, + error: { message: errorMessage }, + traceSpans, + }) sendEvent({ type: 'execution:error', @@ -819,20 +900,23 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: }, }) } finally { + timeoutController.cleanup() + if (executionId) { + await cleanupExecutionBase64Cache(executionId) + } if (!isStreamClosed) { try { controller.enqueue(encoder.encode('data: [DONE]\n\n')) controller.close() - } catch { - // Stream already closed - nothing to do - } + } catch {} } } }, cancel() { isStreamClosed = true + timeoutController.cleanup() logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`) - abortController.abort() + timeoutController.abort() markExecutionCancelled(executionId).catch(() => {}) }, }) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils.ts index d54ccbfdf2..861708926f 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils.ts @@ -1,5 +1,5 @@ import type React from 'react' -import { RepeatIcon, SplitIcon } from 'lucide-react' +import { AlertTriangleIcon, BanIcon, RepeatIcon, SplitIcon, XCircleIcon } from 'lucide-react' import { getBlock } from '@/blocks' import { TERMINAL_BLOCK_COLUMN_WIDTH } from '@/stores/constants' import type { ConsoleEntry } from '@/stores/terminal' @@ -12,6 +12,15 @@ const SUBFLOW_COLORS = { parallel: '#FEE12B', } as const +/** + * Special block type colors for errors and system messages + */ +const SPECIAL_BLOCK_COLORS = { + error: '#ef4444', + validation: '#f59e0b', + cancelled: '#6b7280', +} as const + /** * Retrieves the icon component for a given block type */ @@ -32,6 +41,18 @@ export function getBlockIcon( return SplitIcon } + if (blockType === 'error') { + return XCircleIcon + } + + if (blockType === 'validation') { + return AlertTriangleIcon + } + + if (blockType === 'cancelled') { + return BanIcon + } + return null } @@ -50,6 +71,16 @@ export function getBlockColor(blockType: string): string { if (blockType === 'parallel') { return SUBFLOW_COLORS.parallel } + // Special block types for errors and system messages + if (blockType === 'error') { + return SPECIAL_BLOCK_COLORS.error + } + if (blockType === 'validation') { + return SPECIAL_BLOCK_COLORS.validation + } + if (blockType === 'cancelled') { + return SPECIAL_BLOCK_COLORS.cancelled + } return '#6b7280' } diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts index a1da75f364..1b514dccd8 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts @@ -4,6 +4,11 @@ import { useQueryClient } from '@tanstack/react-query' import { v4 as uuidv4 } from 'uuid' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' import { processStreamingBlockLogs } from '@/lib/tokenization' +import type { + BlockCompletedData, + BlockErrorData, + BlockStartedData, +} from '@/lib/workflows/executor/execution-events' import { extractTriggerMockPayload, selectBestTrigger, @@ -17,7 +22,13 @@ import { import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow' import { getBlock } from '@/blocks' import type { SerializableExecutionState } from '@/executor/execution/types' -import type { BlockLog, BlockState, ExecutionResult, StreamingExecution } from '@/executor/types' +import type { + BlockLog, + BlockState, + ExecutionResult, + NormalizedBlockOutput, + StreamingExecution, +} from '@/executor/types' import { hasExecutionResult } from '@/executor/utils/errors' import { coerceValue } from '@/executor/utils/start-block' import { subscriptionKeys } from '@/hooks/queries/subscription' @@ -27,7 +38,7 @@ import { useExecutionStore } from '@/stores/execution' import { useNotificationStore } from '@/stores/notifications' import { useVariablesStore } from '@/stores/panel' import { useEnvironmentStore } from '@/stores/settings/environment' -import { type ConsoleEntry, useTerminalConsoleStore } from '@/stores/terminal' +import { useTerminalConsoleStore } from '@/stores/terminal' import { useWorkflowDiffStore } from '@/stores/workflow-diff' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' import { mergeSubblockState } from '@/stores/workflows/utils' @@ -41,6 +52,19 @@ interface DebugValidationResult { error?: string } +interface BlockEventHandlerConfig { + workflowId?: string + executionId?: string + workflowEdges: Array<{ id: string; target: string }> + activeBlocksSet: Set + accumulatedBlockLogs: BlockLog[] + accumulatedBlockStates: Map + executedBlockIds: Set + consoleMode: 'update' | 'add' + includeStartConsoleEntry: boolean + onBlockCompleteCallback?: (blockId: string, output: unknown) => Promise +} + const WORKFLOW_EXECUTION_FAILURE_MESSAGE = 'Workflow execution failed' function isRecord(value: unknown): value is Record { @@ -149,6 +173,340 @@ export function useWorkflowExecution() { setActiveBlocks, ]) + /** + * Builds timing fields for execution-level console entries. + */ + const buildExecutionTiming = useCallback((durationMs?: number) => { + const normalizedDuration = durationMs || 0 + return { + durationMs: normalizedDuration, + startedAt: new Date(Date.now() - normalizedDuration).toISOString(), + endedAt: new Date().toISOString(), + } + }, []) + + /** + * Adds an execution-level error entry to the console when appropriate. + */ + const addExecutionErrorConsoleEntry = useCallback( + (params: { + workflowId?: string + executionId?: string + error?: string + durationMs?: number + blockLogs: BlockLog[] + isPreExecutionError?: boolean + }) => { + if (!params.workflowId) return + + const hasBlockError = params.blockLogs.some((log) => log.error) + const isPreExecutionError = params.isPreExecutionError ?? false + if (!isPreExecutionError && hasBlockError) { + return + } + + const errorMessage = params.error || 'Execution failed' + const isTimeout = errorMessage.toLowerCase().includes('timed out') + const timing = buildExecutionTiming(params.durationMs) + + addConsole({ + input: {}, + output: {}, + success: false, + error: errorMessage, + durationMs: timing.durationMs, + startedAt: timing.startedAt, + executionOrder: isPreExecutionError ? 0 : Number.MAX_SAFE_INTEGER, + endedAt: timing.endedAt, + workflowId: params.workflowId, + blockId: isPreExecutionError + ? 'validation' + : isTimeout + ? 'timeout-error' + : 'execution-error', + executionId: params.executionId, + blockName: isPreExecutionError + ? 'Workflow Validation' + : isTimeout + ? 'Timeout Error' + : 'Execution Error', + blockType: isPreExecutionError ? 'validation' : 'error', + }) + }, + [addConsole, buildExecutionTiming] + ) + + /** + * Adds an execution-level cancellation entry to the console. + */ + const addExecutionCancelledConsoleEntry = useCallback( + (params: { workflowId?: string; executionId?: string; durationMs?: number }) => { + if (!params.workflowId) return + + const timing = buildExecutionTiming(params.durationMs) + addConsole({ + input: {}, + output: {}, + success: false, + error: 'Execution was cancelled', + durationMs: timing.durationMs, + startedAt: timing.startedAt, + executionOrder: Number.MAX_SAFE_INTEGER, + endedAt: timing.endedAt, + workflowId: params.workflowId, + blockId: 'cancelled', + executionId: params.executionId, + blockName: 'Execution Cancelled', + blockType: 'cancelled', + }) + }, + [addConsole, buildExecutionTiming] + ) + + /** + * Handles workflow-level execution errors for console output. + */ + const handleExecutionErrorConsole = useCallback( + (params: { + workflowId?: string + executionId?: string + error?: string + durationMs?: number + blockLogs: BlockLog[] + isPreExecutionError?: boolean + }) => { + if (params.workflowId) { + cancelRunningEntries(params.workflowId) + } + addExecutionErrorConsoleEntry(params) + }, + [addExecutionErrorConsoleEntry, cancelRunningEntries] + ) + + /** + * Handles workflow-level execution cancellations for console output. + */ + const handleExecutionCancelledConsole = useCallback( + (params: { workflowId?: string; executionId?: string; durationMs?: number }) => { + if (params.workflowId) { + cancelRunningEntries(params.workflowId) + } + addExecutionCancelledConsoleEntry(params) + }, + [addExecutionCancelledConsoleEntry, cancelRunningEntries] + ) + + const buildBlockEventHandlers = useCallback( + (config: BlockEventHandlerConfig) => { + const { + workflowId, + executionId, + workflowEdges, + activeBlocksSet, + accumulatedBlockLogs, + accumulatedBlockStates, + executedBlockIds, + consoleMode, + includeStartConsoleEntry, + onBlockCompleteCallback, + } = config + + const updateActiveBlocks = (blockId: string, isActive: boolean) => { + if (isActive) { + activeBlocksSet.add(blockId) + } else { + activeBlocksSet.delete(blockId) + } + setActiveBlocks(new Set(activeBlocksSet)) + } + + const markIncomingEdges = (blockId: string) => { + const incomingEdges = workflowEdges.filter((edge) => edge.target === blockId) + incomingEdges.forEach((edge) => { + setEdgeRunStatus(edge.id, 'success') + }) + } + + const isContainerBlockType = (blockType?: string) => { + return blockType === 'loop' || blockType === 'parallel' + } + + const createBlockLogEntry = ( + data: BlockCompletedData | BlockErrorData, + options: { success: boolean; output?: unknown; error?: string } + ): BlockLog => ({ + blockId: data.blockId, + blockName: data.blockName || 'Unknown Block', + blockType: data.blockType || 'unknown', + input: data.input || {}, + output: options.output ?? {}, + success: options.success, + error: options.error, + durationMs: data.durationMs, + startedAt: data.startedAt, + executionOrder: data.executionOrder, + endedAt: data.endedAt, + }) + + const addConsoleEntry = (data: BlockCompletedData, output: NormalizedBlockOutput) => { + if (!workflowId) return + addConsole({ + input: data.input || {}, + output, + success: true, + durationMs: data.durationMs, + startedAt: data.startedAt, + executionOrder: data.executionOrder, + endedAt: data.endedAt, + workflowId, + blockId: data.blockId, + executionId, + blockName: data.blockName || 'Unknown Block', + blockType: data.blockType || 'unknown', + iterationCurrent: data.iterationCurrent, + iterationTotal: data.iterationTotal, + iterationType: data.iterationType, + }) + } + + const addConsoleErrorEntry = (data: BlockErrorData) => { + if (!workflowId) return + addConsole({ + input: data.input || {}, + output: {}, + success: false, + error: data.error, + durationMs: data.durationMs, + startedAt: data.startedAt, + executionOrder: data.executionOrder, + endedAt: data.endedAt, + workflowId, + blockId: data.blockId, + executionId, + blockName: data.blockName || 'Unknown Block', + blockType: data.blockType || 'unknown', + iterationCurrent: data.iterationCurrent, + iterationTotal: data.iterationTotal, + iterationType: data.iterationType, + }) + } + + const updateConsoleEntry = (data: BlockCompletedData) => { + updateConsole( + data.blockId, + { + input: data.input || {}, + replaceOutput: data.output, + success: true, + durationMs: data.durationMs, + startedAt: data.startedAt, + endedAt: data.endedAt, + isRunning: false, + iterationCurrent: data.iterationCurrent, + iterationTotal: data.iterationTotal, + iterationType: data.iterationType, + }, + executionId + ) + } + + const updateConsoleErrorEntry = (data: BlockErrorData) => { + updateConsole( + data.blockId, + { + input: data.input || {}, + replaceOutput: {}, + success: false, + error: data.error, + durationMs: data.durationMs, + startedAt: data.startedAt, + endedAt: data.endedAt, + isRunning: false, + iterationCurrent: data.iterationCurrent, + iterationTotal: data.iterationTotal, + iterationType: data.iterationType, + }, + executionId + ) + } + + const onBlockStarted = (data: BlockStartedData) => { + updateActiveBlocks(data.blockId, true) + markIncomingEdges(data.blockId) + + if (!includeStartConsoleEntry || !workflowId) return + + const startedAt = new Date().toISOString() + addConsole({ + input: {}, + output: undefined, + success: undefined, + durationMs: undefined, + startedAt, + executionOrder: data.executionOrder, + endedAt: undefined, + workflowId, + blockId: data.blockId, + executionId, + blockName: data.blockName || 'Unknown Block', + blockType: data.blockType || 'unknown', + isRunning: true, + iterationCurrent: data.iterationCurrent, + iterationTotal: data.iterationTotal, + iterationType: data.iterationType, + }) + } + + const onBlockCompleted = (data: BlockCompletedData) => { + updateActiveBlocks(data.blockId, false) + setBlockRunStatus(data.blockId, 'success') + + executedBlockIds.add(data.blockId) + accumulatedBlockStates.set(data.blockId, { + output: data.output, + executed: true, + executionTime: data.durationMs, + }) + + if (isContainerBlockType(data.blockType)) { + return + } + + accumulatedBlockLogs.push(createBlockLogEntry(data, { success: true, output: data.output })) + + if (consoleMode === 'update') { + updateConsoleEntry(data) + } else { + addConsoleEntry(data, data.output as NormalizedBlockOutput) + } + + if (onBlockCompleteCallback) { + onBlockCompleteCallback(data.blockId, data.output).catch((error) => { + logger.error('Error in onBlockComplete callback:', error) + }) + } + } + + const onBlockError = (data: BlockErrorData) => { + updateActiveBlocks(data.blockId, false) + setBlockRunStatus(data.blockId, 'error') + + accumulatedBlockLogs.push( + createBlockLogEntry(data, { success: false, output: {}, error: data.error }) + ) + + if (consoleMode === 'update') { + updateConsoleErrorEntry(data) + } else { + addConsoleErrorEntry(data) + } + } + + return { onBlockStarted, onBlockCompleted, onBlockError } + }, + [addConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus, updateConsole] + ) + /** * Checks if debug session is complete based on execution result */ @@ -789,7 +1147,12 @@ export function useWorkflowExecution() { const startBlock = TriggerUtils.findStartBlock(filteredStates, 'chat') if (!startBlock) { - throw new Error(TriggerUtils.getTriggerValidationMessage('chat', 'missing')) + throw new WorkflowValidationError( + TriggerUtils.getTriggerValidationMessage('chat', 'missing'), + 'validation', + 'validation', + 'Workflow Validation' + ) } startBlockId = startBlock.blockId @@ -800,7 +1163,12 @@ export function useWorkflowExecution() { }) if (candidates.length === 0) { - const error = new Error('Workflow requires at least one trigger block to execute') + const error = new WorkflowValidationError( + 'Workflow requires at least one trigger block to execute', + 'validation', + 'validation', + 'Workflow Validation' + ) logger.error('No trigger blocks found for manual run', { allBlockTypes: Object.values(filteredStates).map((b) => b.type), }) @@ -813,7 +1181,12 @@ export function useWorkflowExecution() { (candidate) => candidate.path === StartBlockPath.SPLIT_API ) if (apiCandidates.length > 1) { - const error = new Error('Multiple API Trigger blocks found. Keep only one.') + const error = new WorkflowValidationError( + 'Multiple API Trigger blocks found. Keep only one.', + 'validation', + 'validation', + 'Workflow Validation' + ) logger.error('Multiple API triggers found') setIsExecuting(false) throw error @@ -833,7 +1206,12 @@ export function useWorkflowExecution() { const outgoingConnections = workflowEdges.filter((edge) => edge.source === startBlockId) if (outgoingConnections.length === 0) { const triggerName = selectedTrigger.name || selectedTrigger.type - const error = new Error(`${triggerName} must be connected to other blocks to execute`) + const error = new WorkflowValidationError( + `${triggerName} must be connected to other blocks to execute`, + 'validation', + 'validation', + 'Workflow Validation' + ) logger.error('Trigger has no outgoing connections', { triggerName, startBlockId }) setIsExecuting(false) throw error @@ -859,7 +1237,12 @@ export function useWorkflowExecution() { // If we don't have a valid startBlockId at this point, throw an error if (!startBlockId) { - const error = new Error('No valid trigger block found to start execution') + const error = new WorkflowValidationError( + 'No valid trigger block found to start execution', + 'validation', + 'validation', + 'Workflow Validation' + ) logger.error('No startBlockId found after trigger search') setIsExecuting(false) throw error @@ -892,6 +1275,19 @@ export function useWorkflowExecution() { // Execute the workflow try { + const blockHandlers = buildBlockEventHandlers({ + workflowId: activeWorkflowId, + executionId, + workflowEdges, + activeBlocksSet, + accumulatedBlockLogs, + accumulatedBlockStates, + executedBlockIds, + consoleMode: 'update', + includeStartConsoleEntry: true, + onBlockCompleteCallback: onBlockComplete, + }) + await executionStream.execute({ workflowId: activeWorkflowId, input: finalWorkflowInput, @@ -914,145 +1310,9 @@ export function useWorkflowExecution() { logger.info('Server execution started:', data) }, - onBlockStarted: (data) => { - activeBlocksSet.add(data.blockId) - // Create a new Set to trigger React re-render - setActiveBlocks(new Set(activeBlocksSet)) - - // Track edges that led to this block as soon as execution starts - const incomingEdges = workflowEdges.filter((edge) => edge.target === data.blockId) - incomingEdges.forEach((edge) => { - setEdgeRunStatus(edge.id, 'success') - }) - - // Add entry to terminal immediately with isRunning=true - // Use server-provided executionOrder to ensure correct sort order - const startedAt = new Date().toISOString() - addConsole({ - input: {}, - output: undefined, - success: undefined, - durationMs: undefined, - startedAt, - executionOrder: data.executionOrder, - endedAt: undefined, - workflowId: activeWorkflowId, - blockId: data.blockId, - executionId, - blockName: data.blockName || 'Unknown Block', - blockType: data.blockType || 'unknown', - isRunning: true, - // Pass through iteration context for subflow grouping - iterationCurrent: data.iterationCurrent, - iterationTotal: data.iterationTotal, - iterationType: data.iterationType, - }) - }, - - onBlockCompleted: (data) => { - activeBlocksSet.delete(data.blockId) - setActiveBlocks(new Set(activeBlocksSet)) - setBlockRunStatus(data.blockId, 'success') - - executedBlockIds.add(data.blockId) - accumulatedBlockStates.set(data.blockId, { - output: data.output, - executed: true, - executionTime: data.durationMs, - }) - - const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel' - if (isContainerBlock) return - - const startedAt = data.startedAt - const endedAt = data.endedAt - - accumulatedBlockLogs.push({ - blockId: data.blockId, - blockName: data.blockName || 'Unknown Block', - blockType: data.blockType || 'unknown', - input: data.input || {}, - output: data.output, - success: true, - durationMs: data.durationMs, - startedAt, - executionOrder: data.executionOrder, - endedAt, - }) - - // Update existing console entry (created in onBlockStarted) with completion data - updateConsole( - data.blockId, - { - input: data.input || {}, - replaceOutput: data.output, - success: true, - durationMs: data.durationMs, - startedAt, - endedAt, - isRunning: false, - // Pass through iteration context for subflow grouping - iterationCurrent: data.iterationCurrent, - iterationTotal: data.iterationTotal, - iterationType: data.iterationType, - }, - executionId - ) - - // Call onBlockComplete callback if provided - if (onBlockComplete) { - onBlockComplete(data.blockId, data.output).catch((error) => { - logger.error('Error in onBlockComplete callback:', error) - }) - } - }, - - onBlockError: (data) => { - activeBlocksSet.delete(data.blockId) - // Create a new Set to trigger React re-render - setActiveBlocks(new Set(activeBlocksSet)) - - // Track failed block execution in run path - setBlockRunStatus(data.blockId, 'error') - - const startedAt = data.startedAt - const endedAt = data.endedAt - - // Accumulate block error log for the execution result - accumulatedBlockLogs.push({ - blockId: data.blockId, - blockName: data.blockName || 'Unknown Block', - blockType: data.blockType || 'unknown', - input: data.input || {}, - output: {}, - success: false, - error: data.error, - durationMs: data.durationMs, - startedAt, - executionOrder: data.executionOrder, - endedAt, - }) - - // Update existing console entry (created in onBlockStarted) with error data - updateConsole( - data.blockId, - { - input: data.input || {}, - replaceOutput: {}, - success: false, - error: data.error, - durationMs: data.durationMs, - startedAt, - endedAt, - isRunning: false, - // Pass through iteration context for subflow grouping - iterationCurrent: data.iterationCurrent, - iterationTotal: data.iterationTotal, - iterationType: data.iterationType, - }, - executionId - ) - }, + onBlockStarted: blockHandlers.onBlockStarted, + onBlockCompleted: blockHandlers.onBlockCompleted, + onBlockError: blockHandlers.onBlockError, onStreamChunk: (data) => { const existing = streamedContent.get(data.blockId) || '' @@ -1157,39 +1417,23 @@ export function useWorkflowExecution() { logs: accumulatedBlockLogs, } - // Only add workflow-level error if no blocks have executed yet - // This catches pre-execution errors (validation, serialization, etc.) - // Block execution errors are already logged via onBlockError callback - const { entries } = useTerminalConsoleStore.getState() - const existingLogs = entries.filter( - (log: ConsoleEntry) => log.executionId === executionId - ) - - if (existingLogs.length === 0) { - // No blocks executed yet - this is a pre-execution error - // Use 0 for executionOrder so validation errors appear first - addConsole({ - input: {}, - output: {}, - success: false, - error: data.error, - durationMs: data.duration || 0, - startedAt: new Date(Date.now() - (data.duration || 0)).toISOString(), - executionOrder: 0, - endedAt: new Date().toISOString(), - workflowId: activeWorkflowId, - blockId: 'validation', - executionId, - blockName: 'Workflow Validation', - blockType: 'validation', - }) - } + const isPreExecutionError = accumulatedBlockLogs.length === 0 + handleExecutionErrorConsole({ + workflowId: activeWorkflowId, + executionId, + error: data.error, + durationMs: data.duration, + blockLogs: accumulatedBlockLogs, + isPreExecutionError, + }) }, - onExecutionCancelled: () => { - if (activeWorkflowId) { - cancelRunningEntries(activeWorkflowId) - } + onExecutionCancelled: (data) => { + handleExecutionCancelledConsole({ + workflowId: activeWorkflowId, + executionId, + durationMs: data?.duration, + }) }, }, }) @@ -1585,115 +1829,27 @@ export function useWorkflowExecution() { const activeBlocksSet = new Set() try { + const blockHandlers = buildBlockEventHandlers({ + workflowId, + executionId, + workflowEdges, + activeBlocksSet, + accumulatedBlockLogs, + accumulatedBlockStates, + executedBlockIds, + consoleMode: 'add', + includeStartConsoleEntry: false, + }) + await executionStream.executeFromBlock({ workflowId, startBlockId: blockId, sourceSnapshot: effectiveSnapshot, input: workflowInput, callbacks: { - onBlockStarted: (data) => { - activeBlocksSet.add(data.blockId) - setActiveBlocks(new Set(activeBlocksSet)) - - const incomingEdges = workflowEdges.filter((edge) => edge.target === data.blockId) - incomingEdges.forEach((edge) => { - setEdgeRunStatus(edge.id, 'success') - }) - }, - - onBlockCompleted: (data) => { - activeBlocksSet.delete(data.blockId) - setActiveBlocks(new Set(activeBlocksSet)) - - setBlockRunStatus(data.blockId, 'success') - - executedBlockIds.add(data.blockId) - accumulatedBlockStates.set(data.blockId, { - output: data.output, - executed: true, - executionTime: data.durationMs, - }) - - const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel' - if (isContainerBlock) return - - const startedAt = data.startedAt - const endedAt = data.endedAt - - accumulatedBlockLogs.push({ - blockId: data.blockId, - blockName: data.blockName || 'Unknown Block', - blockType: data.blockType || 'unknown', - input: data.input || {}, - output: data.output, - success: true, - durationMs: data.durationMs, - startedAt, - executionOrder: data.executionOrder, - endedAt, - }) - - addConsole({ - input: data.input || {}, - output: data.output, - success: true, - durationMs: data.durationMs, - startedAt, - executionOrder: data.executionOrder, - endedAt, - workflowId, - blockId: data.blockId, - executionId, - blockName: data.blockName || 'Unknown Block', - blockType: data.blockType || 'unknown', - iterationCurrent: data.iterationCurrent, - iterationTotal: data.iterationTotal, - iterationType: data.iterationType, - }) - }, - - onBlockError: (data) => { - activeBlocksSet.delete(data.blockId) - setActiveBlocks(new Set(activeBlocksSet)) - - setBlockRunStatus(data.blockId, 'error') - - const startedAt = data.startedAt - const endedAt = data.endedAt - - accumulatedBlockLogs.push({ - blockId: data.blockId, - blockName: data.blockName || 'Unknown Block', - blockType: data.blockType || 'unknown', - input: data.input || {}, - output: {}, - success: false, - error: data.error, - executionOrder: data.executionOrder, - durationMs: data.durationMs, - startedAt, - endedAt, - }) - - addConsole({ - input: data.input || {}, - output: {}, - success: false, - error: data.error, - durationMs: data.durationMs, - startedAt, - executionOrder: data.executionOrder, - endedAt, - workflowId, - blockId: data.blockId, - executionId, - blockName: data.blockName, - blockType: data.blockType, - iterationCurrent: data.iterationCurrent, - iterationTotal: data.iterationTotal, - iterationType: data.iterationType, - }) - }, + onBlockStarted: blockHandlers.onBlockStarted, + onBlockCompleted: blockHandlers.onBlockCompleted, + onBlockError: blockHandlers.onBlockError, onExecutionCompleted: (data) => { if (data.success) { @@ -1736,17 +1892,23 @@ export function useWorkflowExecution() { 'Workflow was modified. Run the workflow again to enable running from block.', workflowId, }) - } else { - addNotification({ - level: 'error', - message: data.error || 'Run from block failed', - workflowId, - }) } + + handleExecutionErrorConsole({ + workflowId, + executionId, + error: data.error, + durationMs: data.duration, + blockLogs: accumulatedBlockLogs, + }) }, - onExecutionCancelled: () => { - cancelRunningEntries(workflowId) + onExecutionCancelled: (data) => { + handleExecutionCancelledConsole({ + workflowId, + executionId, + durationMs: data?.duration, + }) }, }, }) @@ -1768,8 +1930,9 @@ export function useWorkflowExecution() { setBlockRunStatus, setEdgeRunStatus, addNotification, - addConsole, - cancelRunningEntries, + buildBlockEventHandlers, + handleExecutionErrorConsole, + handleExecutionCancelledConsole, executionStream, ] ) diff --git a/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components/settings-modal/components/subscription/plan-configs.ts b/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components/settings-modal/components/subscription/plan-configs.ts index 3438d72b24..8fadacc0c4 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components/settings-modal/components/subscription/plan-configs.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components/settings-modal/components/subscription/plan-configs.ts @@ -1,11 +1,11 @@ import { - Building2, Clock, Database, HardDrive, HeadphonesIcon, Server, ShieldCheck, + Timer, Users, Zap, } from 'lucide-react' @@ -15,8 +15,8 @@ import type { PlanFeature } from '@/app/workspace/[workspaceId]/w/components/sid export const PRO_PLAN_FEATURES: PlanFeature[] = [ { icon: Zap, text: '150 runs per minute (sync)' }, { icon: Clock, text: '1,000 runs per minute (async)' }, + { icon: Timer, text: '60 min sync execution limit' }, { icon: HardDrive, text: '50GB file storage' }, - { icon: Building2, text: 'Unlimited workspaces' }, { icon: Users, text: 'Unlimited invites' }, { icon: Database, text: 'Unlimited log retention' }, ] @@ -24,8 +24,8 @@ export const PRO_PLAN_FEATURES: PlanFeature[] = [ export const TEAM_PLAN_FEATURES: PlanFeature[] = [ { icon: Zap, text: '300 runs per minute (sync)' }, { icon: Clock, text: '2,500 runs per minute (async)' }, + { icon: Timer, text: '60 min sync execution limit' }, { icon: HardDrive, text: '500GB file storage (pooled)' }, - { icon: Building2, text: 'Unlimited workspaces' }, { icon: Users, text: 'Unlimited invites' }, { icon: Database, text: 'Unlimited log retention' }, { icon: SlackMonoIcon, text: 'Dedicated Slack channel' }, diff --git a/apps/sim/background/schedule-execution.ts b/apps/sim/background/schedule-execution.ts index c52676b8df..54fb95420d 100644 --- a/apps/sim/background/schedule-execution.ts +++ b/apps/sim/background/schedule-execution.ts @@ -4,6 +4,7 @@ import { task } from '@trigger.dev/sdk' import { Cron } from 'croner' import { eq } from 'drizzle-orm' import { v4 as uuidv4 } from 'uuid' +import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits' import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' @@ -120,6 +121,7 @@ async function runWorkflowExecution({ loggingSession, requestId, executionId, + asyncTimeout, }: { payload: ScheduleExecutionPayload workflowRecord: WorkflowRecord @@ -127,6 +129,7 @@ async function runWorkflowExecution({ loggingSession: LoggingSession requestId: string executionId: string + asyncTimeout?: number }): Promise { try { logger.debug(`[${requestId}] Loading deployed workflow ${payload.workflowId}`) @@ -181,15 +184,33 @@ async function runWorkflowExecution({ [] ) - const executionResult = await executeWorkflowCore({ - snapshot, - callbacks: {}, - loggingSession, - includeFileBase64: true, - base64MaxBytes: undefined, - }) + const timeoutController = createTimeoutAbortController(asyncTimeout) + + let executionResult + try { + executionResult = await executeWorkflowCore({ + snapshot, + callbacks: {}, + loggingSession, + includeFileBase64: true, + base64MaxBytes: undefined, + abortSignal: timeoutController.signal, + }) + } finally { + timeoutController.cleanup() + } - if (executionResult.status === 'paused') { + if ( + executionResult.status === 'cancelled' && + timeoutController.isTimedOut() && + timeoutController.timeoutMs + ) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs) + logger.info(`[${requestId}] Scheduled workflow execution timed out`, { + timeoutMs: timeoutController.timeoutMs, + }) + await loggingSession.markAsFailed(timeoutErrorMessage) + } else if (executionResult.status === 'paused') { if (!executionResult.snapshotSeed) { logger.error(`[${requestId}] Missing snapshot seed for paused execution`, { executionId, @@ -453,6 +474,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) { loggingSession, requestId, executionId, + asyncTimeout: preprocessResult.executionTimeout?.async, }) if (executionResult.status === 'skip') { diff --git a/apps/sim/background/webhook-execution.ts b/apps/sim/background/webhook-execution.ts index 40f60971b7..c8abb1b396 100644 --- a/apps/sim/background/webhook-execution.ts +++ b/apps/sim/background/webhook-execution.ts @@ -4,7 +4,14 @@ import { createLogger } from '@sim/logger' import { task } from '@trigger.dev/sdk' import { eq } from 'drizzle-orm' import { v4 as uuidv4 } from 'uuid' +import { getHighestPrioritySubscription } from '@/lib/billing' +import { + createTimeoutAbortController, + getExecutionTimeout, + getTimeoutErrorMessage, +} from '@/lib/core/execution-limits' import { IdempotencyService, webhookIdempotency } from '@/lib/core/idempotency' +import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types' import { processExecutionFiles } from '@/lib/execution/files' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' @@ -134,7 +141,13 @@ async function executeWebhookJobInternal( requestId ) - // Track deploymentVersionId at function scope so it's available in catch block + const userSubscription = await getHighestPrioritySubscription(payload.userId) + const asyncTimeout = getExecutionTimeout( + userSubscription?.plan as SubscriptionPlan | undefined, + 'async' + ) + const timeoutController = createTimeoutAbortController(asyncTimeout) + let deploymentVersionId: string | undefined try { @@ -241,11 +254,22 @@ async function executeWebhookJobInternal( snapshot, callbacks: {}, loggingSession, - includeFileBase64: true, // Enable base64 hydration - base64MaxBytes: undefined, // Use default limit + includeFileBase64: true, + base64MaxBytes: undefined, + abortSignal: timeoutController.signal, }) - if (executionResult.status === 'paused') { + if ( + executionResult.status === 'cancelled' && + timeoutController.isTimedOut() && + timeoutController.timeoutMs + ) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs) + logger.info(`[${requestId}] Airtable webhook execution timed out`, { + timeoutMs: timeoutController.timeoutMs, + }) + await loggingSession.markAsFailed(timeoutErrorMessage) + } else if (executionResult.status === 'paused') { if (!executionResult.snapshotSeed) { logger.error(`[${requestId}] Missing snapshot seed for paused execution`, { executionId, @@ -497,9 +521,20 @@ async function executeWebhookJobInternal( callbacks: {}, loggingSession, includeFileBase64: true, + abortSignal: timeoutController.signal, }) - if (executionResult.status === 'paused') { + if ( + executionResult.status === 'cancelled' && + timeoutController.isTimedOut() && + timeoutController.timeoutMs + ) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs) + logger.info(`[${requestId}] Webhook execution timed out`, { + timeoutMs: timeoutController.timeoutMs, + }) + await loggingSession.markAsFailed(timeoutErrorMessage) + } else if (executionResult.status === 'paused') { if (!executionResult.snapshotSeed) { logger.error(`[${requestId}] Missing snapshot seed for paused execution`, { executionId, @@ -601,6 +636,8 @@ async function executeWebhookJobInternal( } throw error + } finally { + timeoutController.cleanup() } } diff --git a/apps/sim/background/workflow-execution.ts b/apps/sim/background/workflow-execution.ts index 99e83d54cc..818480fb62 100644 --- a/apps/sim/background/workflow-execution.ts +++ b/apps/sim/background/workflow-execution.ts @@ -1,6 +1,7 @@ import { createLogger } from '@sim/logger' import { task } from '@trigger.dev/sdk' import { v4 as uuidv4 } from 'uuid' +import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits' import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' @@ -103,15 +104,33 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) { [] ) - const result = await executeWorkflowCore({ - snapshot, - callbacks: {}, - loggingSession, - includeFileBase64: true, - base64MaxBytes: undefined, - }) + const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.async) + + let result + try { + result = await executeWorkflowCore({ + snapshot, + callbacks: {}, + loggingSession, + includeFileBase64: true, + base64MaxBytes: undefined, + abortSignal: timeoutController.signal, + }) + } finally { + timeoutController.cleanup() + } - if (result.status === 'paused') { + if ( + result.status === 'cancelled' && + timeoutController.isTimedOut() && + timeoutController.timeoutMs + ) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs) + logger.info(`[${requestId}] Workflow execution timed out`, { + timeoutMs: timeoutController.timeoutMs, + }) + await loggingSession.markAsFailed(timeoutErrorMessage) + } else if (result.status === 'paused') { if (!result.snapshotSeed) { logger.error(`[${requestId}] Missing snapshot seed for paused execution`, { executionId, diff --git a/apps/sim/executor/constants.ts b/apps/sim/executor/constants.ts index 8cfbaed582..b5f97dd471 100644 --- a/apps/sim/executor/constants.ts +++ b/apps/sim/executor/constants.ts @@ -1,3 +1,4 @@ +import { getMaxExecutionTimeout } from '@/lib/core/execution-limits' import type { LoopType, ParallelType } from '@/lib/workflows/types' /** @@ -187,8 +188,12 @@ export const HTTP = { export const AGENT = { DEFAULT_MODEL: 'claude-sonnet-4-5', - DEFAULT_FUNCTION_TIMEOUT: 600000, - REQUEST_TIMEOUT: 600000, + get DEFAULT_FUNCTION_TIMEOUT() { + return getMaxExecutionTimeout() + }, + get REQUEST_TIMEOUT() { + return getMaxExecutionTimeout() + }, CUSTOM_TOOL_PREFIX: 'custom_', } as const diff --git a/apps/sim/executor/execution/engine.ts b/apps/sim/executor/execution/engine.ts index 9bff038ee1..47afd8b032 100644 --- a/apps/sim/executor/execution/engine.ts +++ b/apps/sim/executor/execution/engine.ts @@ -162,6 +162,8 @@ export class ExecutionEngine { } } + this.finalizeIncompleteLogs() + const errorMessage = normalizeError(error) logger.error('Execution failed', { error: errorMessage }) diff --git a/apps/sim/executor/handlers/wait/wait-handler.ts b/apps/sim/executor/handlers/wait/wait-handler.ts index 5d62509f07..09ce055ec5 100644 --- a/apps/sim/executor/handlers/wait/wait-handler.ts +++ b/apps/sim/executor/handlers/wait/wait-handler.ts @@ -14,7 +14,7 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise = const { signal, executionId } = options const useRedis = isRedisCancellationEnabled() && !!executionId - if (!useRedis && signal?.aborted) { + if (signal?.aborted) { return false } @@ -27,7 +27,7 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise = const cleanup = () => { if (mainTimeoutId) clearTimeout(mainTimeoutId) if (checkIntervalId) clearInterval(checkIntervalId) - if (!useRedis && signal) signal.removeEventListener('abort', onAbort) + if (signal) signal.removeEventListener('abort', onAbort) } const onAbort = () => { @@ -37,6 +37,10 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise = resolve(false) } + if (signal) { + signal.addEventListener('abort', onAbort, { once: true }) + } + if (useRedis) { checkIntervalId = setInterval(async () => { if (resolved) return @@ -49,8 +53,6 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise = } } catch {} }, CANCELLATION_CHECK_INTERVAL_MS) - } else if (signal) { - signal.addEventListener('abort', onAbort, { once: true }) } mainTimeoutId = setTimeout(() => { diff --git a/apps/sim/executor/handlers/workflow/workflow-handler.ts b/apps/sim/executor/handlers/workflow/workflow-handler.ts index 1c780ccb0e..d32353f434 100644 --- a/apps/sim/executor/handlers/workflow/workflow-handler.ts +++ b/apps/sim/executor/handlers/workflow/workflow-handler.ts @@ -126,6 +126,7 @@ export class WorkflowBlockHandler implements BlockHandler { workspaceId: ctx.workspaceId, userId: ctx.userId, executionId: ctx.executionId, + abortSignal: ctx.abortSignal, }, }) diff --git a/apps/sim/executor/utils/subflow-utils.ts b/apps/sim/executor/utils/subflow-utils.ts index c4eb23f380..8c92f82a45 100644 --- a/apps/sim/executor/utils/subflow-utils.ts +++ b/apps/sim/executor/utils/subflow-utils.ts @@ -229,6 +229,10 @@ export function addSubflowErrorLog( } ctx.blockLogs.push(blockLog) + if (contextExtensions?.onBlockStart) { + contextExtensions.onBlockStart(blockId, blockName, blockType, execOrder) + } + if (contextExtensions?.onBlockComplete) { contextExtensions.onBlockComplete(blockId, blockName, blockType, { input: inputData, diff --git a/apps/sim/lib/a2a/constants.ts b/apps/sim/lib/a2a/constants.ts index 9027151857..d55dec2f52 100644 --- a/apps/sim/lib/a2a/constants.ts +++ b/apps/sim/lib/a2a/constants.ts @@ -1,7 +1,10 @@ export { AGENT_CARD_PATH } from '@a2a-js/sdk' + +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' + export const A2A_PROTOCOL_VERSION = '0.3.0' -export const A2A_DEFAULT_TIMEOUT = 300000 +export const A2A_DEFAULT_TIMEOUT = DEFAULT_EXECUTION_TIMEOUT_MS /** * Maximum number of messages stored per task in the database. diff --git a/apps/sim/lib/copilot/tools/client/base-tool.ts b/apps/sim/lib/copilot/tools/client/base-tool.ts index 75b02bfe21..8d7d396f9c 100644 --- a/apps/sim/lib/copilot/tools/client/base-tool.ts +++ b/apps/sim/lib/copilot/tools/client/base-tool.ts @@ -5,10 +5,8 @@ import type { ToolUIConfig } from './ui-config' const baseToolLogger = createLogger('BaseClientTool') -/** Default timeout for tool execution (5 minutes) */ -const DEFAULT_TOOL_TIMEOUT_MS = 2 * 60 * 1000 +const DEFAULT_TOOL_TIMEOUT_MS = 5 * 60 * 1000 -/** Timeout for tools that run workflows (10 minutes) */ export const WORKFLOW_EXECUTION_TIMEOUT_MS = 10 * 60 * 1000 // Client tool call states used by the new runtime diff --git a/apps/sim/lib/core/config/env.ts b/apps/sim/lib/core/config/env.ts index 6bd9df299e..60dfb73b39 100644 --- a/apps/sim/lib/core/config/env.ts +++ b/apps/sim/lib/core/config/env.ts @@ -170,6 +170,11 @@ export const env = createEnv({ RATE_LIMIT_ENTERPRISE_SYNC: z.string().optional().default('600'), // Enterprise tier sync API executions per minute RATE_LIMIT_ENTERPRISE_ASYNC: z.string().optional().default('5000'), // Enterprise tier async API executions per minute + EXECUTION_TIMEOUT_FREE: z.string().optional().default('300'), + EXECUTION_TIMEOUT_PRO: z.string().optional().default('3600'), + EXECUTION_TIMEOUT_TEAM: z.string().optional().default('3600'), + EXECUTION_TIMEOUT_ENTERPRISE: z.string().optional().default('3600'), + // Knowledge Base Processing Configuration - Shared across all processing methods KB_CONFIG_MAX_DURATION: z.number().optional().default(600), // Max processing duration in seconds (10 minutes) KB_CONFIG_MAX_ATTEMPTS: z.number().optional().default(3), // Max retry attempts diff --git a/apps/sim/lib/core/execution-limits/index.ts b/apps/sim/lib/core/execution-limits/index.ts new file mode 100644 index 0000000000..c9f6f047dc --- /dev/null +++ b/apps/sim/lib/core/execution-limits/index.ts @@ -0,0 +1 @@ +export * from './types' diff --git a/apps/sim/lib/core/execution-limits/types.ts b/apps/sim/lib/core/execution-limits/types.ts new file mode 100644 index 0000000000..06df5966f2 --- /dev/null +++ b/apps/sim/lib/core/execution-limits/types.ts @@ -0,0 +1,134 @@ +import { env } from '@/lib/core/config/env' +import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types' + +interface ExecutionTimeoutConfig { + sync: number + async: number +} + +const DEFAULT_SYNC_TIMEOUTS_SECONDS = { + free: 300, + pro: 3600, + team: 3600, + enterprise: 3600, +} as const + +const ASYNC_MULTIPLIER = 2 +const MAX_ASYNC_TIMEOUT_SECONDS = 5400 + +function getSyncTimeoutForPlan(plan: SubscriptionPlan): number { + const envVarMap: Record = { + free: env.EXECUTION_TIMEOUT_FREE, + pro: env.EXECUTION_TIMEOUT_PRO, + team: env.EXECUTION_TIMEOUT_TEAM, + enterprise: env.EXECUTION_TIMEOUT_ENTERPRISE, + } + return (Number.parseInt(envVarMap[plan] || '') || DEFAULT_SYNC_TIMEOUTS_SECONDS[plan]) * 1000 +} + +function getAsyncTimeoutForPlan(plan: SubscriptionPlan): number { + const syncMs = getSyncTimeoutForPlan(plan) + const asyncMs = syncMs * ASYNC_MULTIPLIER + const maxAsyncMs = MAX_ASYNC_TIMEOUT_SECONDS * 1000 + return Math.min(asyncMs, maxAsyncMs) +} + +const EXECUTION_TIMEOUTS: Record = { + free: { + sync: getSyncTimeoutForPlan('free'), + async: getAsyncTimeoutForPlan('free'), + }, + pro: { + sync: getSyncTimeoutForPlan('pro'), + async: getAsyncTimeoutForPlan('pro'), + }, + team: { + sync: getSyncTimeoutForPlan('team'), + async: getAsyncTimeoutForPlan('team'), + }, + enterprise: { + sync: getSyncTimeoutForPlan('enterprise'), + async: getAsyncTimeoutForPlan('enterprise'), + }, +} + +export function getExecutionTimeout( + plan: SubscriptionPlan | undefined, + type: 'sync' | 'async' = 'sync' +): number { + return EXECUTION_TIMEOUTS[plan || 'free'][type] +} + +export function getMaxExecutionTimeout(): number { + return EXECUTION_TIMEOUTS.enterprise.async +} + +export const DEFAULT_EXECUTION_TIMEOUT_MS = EXECUTION_TIMEOUTS.free.sync + +export function isTimeoutError(error: unknown): boolean { + if (!error) return false + + if (error instanceof Error) { + return error.name === 'TimeoutError' + } + + if (typeof error === 'object' && 'name' in error) { + return (error as { name: string }).name === 'TimeoutError' + } + + return false +} + +export function getTimeoutErrorMessage(error: unknown, timeoutMs?: number): string { + if (timeoutMs) { + const timeoutSeconds = Math.floor(timeoutMs / 1000) + const timeoutMinutes = Math.floor(timeoutSeconds / 60) + const displayTime = + timeoutMinutes > 0 + ? `${timeoutMinutes} minute${timeoutMinutes > 1 ? 's' : ''}` + : `${timeoutSeconds} seconds` + return `Execution timed out after ${displayTime}` + } + + return 'Execution timed out' +} + +/** + * Helper to create an AbortController with timeout handling. + * Centralizes the timeout abort pattern used across execution paths. + */ +export interface TimeoutAbortController { + /** The AbortSignal to pass to execution functions */ + signal: AbortSignal + /** Returns true if the abort was triggered by timeout (not user cancellation) */ + isTimedOut: () => boolean + /** Cleanup function - call in finally block to clear the timeout */ + cleanup: () => void + /** Manually abort the execution (for user cancellation) */ + abort: () => void + /** The timeout duration in milliseconds (undefined if no timeout) */ + timeoutMs: number | undefined +} + +export function createTimeoutAbortController(timeoutMs?: number): TimeoutAbortController { + const abortController = new AbortController() + let isTimedOut = false + let timeoutId: NodeJS.Timeout | undefined + + if (timeoutMs) { + timeoutId = setTimeout(() => { + isTimedOut = true + abortController.abort() + }, timeoutMs) + } + + return { + signal: abortController.signal, + isTimedOut: () => isTimedOut, + cleanup: () => { + if (timeoutId) clearTimeout(timeoutId) + }, + abort: () => abortController.abort(), + timeoutMs, + } +} diff --git a/apps/sim/lib/core/idempotency/service.ts b/apps/sim/lib/core/idempotency/service.ts index 7a7239f9a0..b8fae55c03 100644 --- a/apps/sim/lib/core/idempotency/service.ts +++ b/apps/sim/lib/core/idempotency/service.ts @@ -4,6 +4,7 @@ import { idempotencyKey } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { eq } from 'drizzle-orm' import { getRedisClient } from '@/lib/core/config/redis' +import { getMaxExecutionTimeout } from '@/lib/core/execution-limits' import { getStorageMethod, type StorageMethod } from '@/lib/core/storage' import { extractProviderIdentifierFromBody } from '@/lib/webhooks/provider-utils' @@ -36,9 +37,9 @@ export interface AtomicClaimResult { storageMethod: StorageMethod } -const DEFAULT_TTL = 60 * 60 * 24 * 7 // 7 days +const DEFAULT_TTL = 60 * 60 * 24 * 7 const REDIS_KEY_PREFIX = 'idempotency:' -const MAX_WAIT_TIME_MS = 300000 // 5 minutes max wait +const MAX_WAIT_TIME_MS = getMaxExecutionTimeout() const POLL_INTERVAL_MS = 1000 /** diff --git a/apps/sim/lib/core/utils/validation.ts b/apps/sim/lib/core/utils/validation.ts index 0945b30de5..5fcfc4d357 100644 --- a/apps/sim/lib/core/utils/validation.ts +++ b/apps/sim/lib/core/utils/validation.ts @@ -50,3 +50,13 @@ export function getInvalidCharacters(name: string): string[] { const invalidChars = name.match(/[^a-zA-Z0-9_\s]/g) return invalidChars ? [...new Set(invalidChars)] : [] } + +/** + * Escapes non-ASCII characters in JSON string for HTTP header safety. + * Dropbox API requires characters 0x7F and all non-ASCII to be escaped as \uXXXX. + */ +export function httpHeaderSafeJson(value: object): string { + return JSON.stringify(value).replace(/[\u007f-\uffff]/g, (c) => { + return `\\u${(`0000${c.charCodeAt(0).toString(16)}`).slice(-4)}` + }) +} diff --git a/apps/sim/lib/execution/constants.ts b/apps/sim/lib/execution/constants.ts index bf095770b9..faca9679b4 100644 --- a/apps/sim/lib/execution/constants.ts +++ b/apps/sim/lib/execution/constants.ts @@ -1,7 +1,3 @@ -/** - * Execution timeout constants - * - * DEFAULT_EXECUTION_TIMEOUT_MS: The default timeout for executing user code (10 minutes) - */ +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' -export const DEFAULT_EXECUTION_TIMEOUT_MS = 600000 // 10 minutes (600 seconds) +export { DEFAULT_EXECUTION_TIMEOUT_MS } diff --git a/apps/sim/lib/execution/preprocessing.ts b/apps/sim/lib/execution/preprocessing.ts index 5bbb834e80..9a0236fd16 100644 --- a/apps/sim/lib/execution/preprocessing.ts +++ b/apps/sim/lib/execution/preprocessing.ts @@ -4,7 +4,9 @@ import { createLogger } from '@sim/logger' import { eq } from 'drizzle-orm' import { checkServerSideUsageLimits } from '@/lib/billing/calculations/usage-monitor' import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' +import { getExecutionTimeout } from '@/lib/core/execution-limits' import { RateLimiter } from '@/lib/core/rate-limiter/rate-limiter' +import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils' import type { CoreTriggerType } from '@/stores/logs/filters/types' @@ -133,10 +135,10 @@ export interface PreprocessExecutionResult { success: boolean error?: { message: string - statusCode: number // HTTP status code (401, 402, 403, 404, 429, 500) - logCreated: boolean // Whether error was logged to execution_logs + statusCode: number + logCreated: boolean } - actorUserId?: string // The user ID that will be billed + actorUserId?: string workflowRecord?: WorkflowRecord userSubscription?: SubscriptionInfo | null rateLimitInfo?: { @@ -144,6 +146,10 @@ export interface PreprocessExecutionResult { remaining: number resetAt: Date } + executionTimeout?: { + sync: number + async: number + } } type WorkflowRecord = typeof workflow.$inferSelect @@ -484,12 +490,17 @@ export async function preprocessExecution( triggerType, }) + const plan = userSubscription?.plan as SubscriptionPlan | undefined return { success: true, actorUserId, workflowRecord, userSubscription, rateLimitInfo, + executionTimeout: { + sync: getExecutionTimeout(plan, 'sync'), + async: getExecutionTimeout(plan, 'async'), + }, } } diff --git a/apps/sim/lib/logs/execution/logger.ts b/apps/sim/lib/logs/execution/logger.ts index 055b8fe268..0fc47fa73b 100644 --- a/apps/sim/lib/logs/execution/logger.ts +++ b/apps/sim/lib/logs/execution/logger.ts @@ -261,10 +261,14 @@ export class ExecutionLogger implements IExecutionLoggerService { models: costSummary.models, } - const totalDuration = + const rawDurationMs = isResume && existingLog?.startedAt ? new Date(endedAt).getTime() - new Date(existingLog.startedAt).getTime() : totalDurationMs + const totalDuration = + typeof rawDurationMs === 'number' && Number.isFinite(rawDurationMs) + ? Math.max(0, Math.round(rawDurationMs)) + : 0 const [updatedLog] = await db .update(workflowExecutionLogs) diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index fd3ae55bab..be15156869 100644 --- a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -776,11 +776,16 @@ export class LoggingSession { await db .update(workflowExecutionLogs) .set({ + level: 'error', status: 'failed', executionData: sql`jsonb_set( - COALESCE(execution_data, '{}'::jsonb), - ARRAY['error'], - to_jsonb(${message}::text) + jsonb_set( + COALESCE(execution_data, '{}'::jsonb), + ARRAY['error'], + to_jsonb(${message}::text) + ), + ARRAY['finalOutput'], + jsonb_build_object('error', ${message}::text) )`, }) .where(eq(workflowExecutionLogs.executionId, executionId)) diff --git a/apps/sim/lib/mcp/client.ts b/apps/sim/lib/mcp/client.ts index 5fdb4adac6..b65e9a1453 100644 --- a/apps/sim/lib/mcp/client.ts +++ b/apps/sim/lib/mcp/client.ts @@ -12,6 +12,7 @@ import { Client } from '@modelcontextprotocol/sdk/client/index.js' import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js' import type { ListToolsResult, Tool } from '@modelcontextprotocol/sdk/types.js' import { createLogger } from '@sim/logger' +import { getMaxExecutionTimeout } from '@/lib/core/execution-limits' import { McpConnectionError, type McpConnectionStatus, @@ -202,7 +203,7 @@ export class McpClient { const sdkResult = await this.client.callTool( { name: toolCall.name, arguments: toolCall.arguments }, undefined, - { timeout: 600000 } // 10 minutes - override SDK's 60s default + { timeout: getMaxExecutionTimeout() } ) return sdkResult as McpToolResult diff --git a/apps/sim/lib/mcp/shared.ts b/apps/sim/lib/mcp/shared.ts index 3f3bdae664..eaecff1f0e 100644 --- a/apps/sim/lib/mcp/shared.ts +++ b/apps/sim/lib/mcp/shared.ts @@ -32,9 +32,11 @@ export function sanitizeHeaders( /** * Client-safe MCP constants + * Note: CLIENT_TIMEOUT should match DEFAULT_EXECUTION_TIMEOUT_MS from @/lib/core/execution-limits + * (5 minutes = 300 seconds for free tier). Keep in sync if that value changes. */ export const MCP_CLIENT_CONSTANTS = { - CLIENT_TIMEOUT: 600000, + CLIENT_TIMEOUT: 5 * 60 * 1000, // 5 minutes - matches DEFAULT_EXECUTION_TIMEOUT_MS MAX_RETRIES: 3, RECONNECT_DELAY: 1000, } as const diff --git a/apps/sim/lib/mcp/utils.test.ts b/apps/sim/lib/mcp/utils.test.ts index 1b0cabf98c..65d11a7427 100644 --- a/apps/sim/lib/mcp/utils.test.ts +++ b/apps/sim/lib/mcp/utils.test.ts @@ -1,4 +1,5 @@ import { describe, expect, it } from 'vitest' +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' import { categorizeError, createMcpToolId, @@ -81,8 +82,8 @@ describe('generateMcpServerId', () => { }) describe('MCP_CONSTANTS', () => { - it.concurrent('has correct execution timeout (10 minutes)', () => { - expect(MCP_CONSTANTS.EXECUTION_TIMEOUT).toBe(600000) + it.concurrent('has correct execution timeout', () => { + expect(MCP_CONSTANTS.EXECUTION_TIMEOUT).toBe(DEFAULT_EXECUTION_TIMEOUT_MS) }) it.concurrent('has correct cache timeout (5 minutes)', () => { @@ -107,8 +108,8 @@ describe('MCP_CONSTANTS', () => { }) describe('MCP_CLIENT_CONSTANTS', () => { - it.concurrent('has correct client timeout (10 minutes)', () => { - expect(MCP_CLIENT_CONSTANTS.CLIENT_TIMEOUT).toBe(600000) + it.concurrent('has correct client timeout', () => { + expect(MCP_CLIENT_CONSTANTS.CLIENT_TIMEOUT).toBe(DEFAULT_EXECUTION_TIMEOUT_MS) }) it.concurrent('has correct auto refresh interval (5 minutes)', () => { diff --git a/apps/sim/lib/mcp/utils.ts b/apps/sim/lib/mcp/utils.ts index ab28a66ee2..3e6af0551c 100644 --- a/apps/sim/lib/mcp/utils.ts +++ b/apps/sim/lib/mcp/utils.ts @@ -1,12 +1,10 @@ import { NextResponse } from 'next/server' +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' import type { McpApiResponse } from '@/lib/mcp/types' import { isMcpTool, MCP } from '@/executor/constants' -/** - * MCP-specific constants - */ export const MCP_CONSTANTS = { - EXECUTION_TIMEOUT: 600000, + EXECUTION_TIMEOUT: DEFAULT_EXECUTION_TIMEOUT_MS, CACHE_TIMEOUT: 5 * 60 * 1000, DEFAULT_RETRIES: 3, DEFAULT_CONNECTION_TIMEOUT: 30000, @@ -45,11 +43,8 @@ export function sanitizeHeaders( ) } -/** - * Client-safe MCP constants - */ export const MCP_CLIENT_CONSTANTS = { - CLIENT_TIMEOUT: 600000, + CLIENT_TIMEOUT: DEFAULT_EXECUTION_TIMEOUT_MS, AUTO_REFRESH_INTERVAL: 5 * 60 * 1000, } as const diff --git a/apps/sim/lib/workflows/executor/execute-workflow.ts b/apps/sim/lib/workflows/executor/execute-workflow.ts index 1ed65c1192..8edce5526e 100644 --- a/apps/sim/lib/workflows/executor/execute-workflow.ts +++ b/apps/sim/lib/workflows/executor/execute-workflow.ts @@ -19,6 +19,7 @@ export interface ExecuteWorkflowOptions { skipLoggingComplete?: boolean includeFileBase64?: boolean base64MaxBytes?: number + abortSignal?: AbortSignal } export interface WorkflowInfo { @@ -82,6 +83,7 @@ export async function executeWorkflow( loggingSession, includeFileBase64: streamConfig?.includeFileBase64, base64MaxBytes: streamConfig?.base64MaxBytes, + abortSignal: streamConfig?.abortSignal, }) if (result.status === 'paused') { diff --git a/apps/sim/lib/workflows/executor/execution-events.ts b/apps/sim/lib/workflows/executor/execution-events.ts index ba36f97871..bc5c316c28 100644 --- a/apps/sim/lib/workflows/executor/execution-events.ts +++ b/apps/sim/lib/workflows/executor/execution-events.ts @@ -58,9 +58,6 @@ export interface ExecutionErrorEvent extends BaseExecutionEvent { } } -/** - * Execution cancelled event - */ export interface ExecutionCancelledEvent extends BaseExecutionEvent { type: 'execution:cancelled' workflowId: string @@ -167,9 +164,6 @@ export type ExecutionEvent = | StreamChunkEvent | StreamDoneEvent -/** - * Extracted data types for use in callbacks - */ export type ExecutionStartedData = ExecutionStartedEvent['data'] export type ExecutionCompletedData = ExecutionCompletedEvent['data'] export type ExecutionErrorData = ExecutionErrorEvent['data'] diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts index 479ead99ac..ee176ad9b6 100644 --- a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts @@ -4,6 +4,7 @@ import { pausedExecutions, resumeQueue, workflowExecutionLogs } from '@sim/db/sc import { createLogger } from '@sim/logger' import { and, asc, desc, eq, inArray, lt, type SQL, sql } from 'drizzle-orm' import type { Edge } from 'reactflow' +import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits' import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' @@ -771,14 +772,39 @@ export class PauseResumeManager { actorUserId: metadata.userId, }) - return await executeWorkflowCore({ - snapshot: resumeSnapshot, - callbacks: {}, - loggingSession, - skipLogCreation: true, // Reuse existing log entry - includeFileBase64: true, // Enable base64 hydration - base64MaxBytes: undefined, // Use default limit - }) + const timeoutController = createTimeoutAbortController( + preprocessingResult.executionTimeout?.async + ) + + let result: ExecutionResult + try { + result = await executeWorkflowCore({ + snapshot: resumeSnapshot, + callbacks: {}, + loggingSession, + skipLogCreation: true, // Reuse existing log entry + includeFileBase64: true, // Enable base64 hydration + base64MaxBytes: undefined, // Use default limit + abortSignal: timeoutController.signal, + }) + } finally { + timeoutController.cleanup() + } + + if ( + result.status === 'cancelled' && + timeoutController.isTimedOut() && + timeoutController.timeoutMs + ) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs) + logger.info('Resume execution timed out', { + resumeExecutionId, + timeoutMs: timeoutController.timeoutMs, + }) + await loggingSession.markAsFailed(timeoutErrorMessage) + } + + return result } private static async markResumeCompleted(args: { diff --git a/apps/sim/lib/workflows/streaming/streaming.ts b/apps/sim/lib/workflows/streaming/streaming.ts index 88e7a584d5..0bb84b66bf 100644 --- a/apps/sim/lib/workflows/streaming/streaming.ts +++ b/apps/sim/lib/workflows/streaming/streaming.ts @@ -1,4 +1,5 @@ import { createLogger } from '@sim/logger' +import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits' import { extractBlockIdFromOutputId, extractPathFromOutputId, @@ -32,6 +33,7 @@ export interface StreamingConfig { workflowTriggerType?: 'api' | 'chat' includeFileBase64?: boolean base64MaxBytes?: number + timeoutMs?: number } export interface StreamingResponseOptions { @@ -169,6 +171,7 @@ export async function createStreamingResponse( options: StreamingResponseOptions ): Promise { const { requestId, workflow, input, executingUserId, streamConfig, executionId } = options + const timeoutController = createTimeoutAbortController(streamConfig.timeoutMs) return new ReadableStream({ async start(controller) { @@ -284,6 +287,7 @@ export async function createStreamingResponse( skipLoggingComplete: true, includeFileBase64: streamConfig.includeFileBase64, base64MaxBytes: streamConfig.base64MaxBytes, + abortSignal: timeoutController.signal, }, executionId ) @@ -293,18 +297,34 @@ export async function createStreamingResponse( processStreamingBlockLogs(result.logs, state.streamedContent) } - await completeLoggingSession(result) + if ( + result.status === 'cancelled' && + timeoutController.isTimedOut() && + timeoutController.timeoutMs + ) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs) + logger.info(`[${requestId}] Streaming execution timed out`, { + timeoutMs: timeoutController.timeoutMs, + }) + if (result._streamingMetadata?.loggingSession) { + await result._streamingMetadata.loggingSession.markAsFailed(timeoutErrorMessage) + } + controller.enqueue(encodeSSE({ event: 'error', error: timeoutErrorMessage })) + } else { + await completeLoggingSession(result) + + const minimalResult = await buildMinimalResult( + result, + streamConfig.selectedOutputs, + state.streamedContent, + requestId, + streamConfig.includeFileBase64 ?? true, + streamConfig.base64MaxBytes + ) - const minimalResult = await buildMinimalResult( - result, - streamConfig.selectedOutputs, - state.streamedContent, - requestId, - streamConfig.includeFileBase64 ?? true, - streamConfig.base64MaxBytes - ) + controller.enqueue(encodeSSE({ event: 'final', data: minimalResult })) + } - controller.enqueue(encodeSSE({ event: 'final', data: minimalResult })) controller.enqueue(encodeSSE('[DONE]')) if (executionId) { @@ -323,6 +343,20 @@ export async function createStreamingResponse( } controller.close() + } finally { + timeoutController.cleanup() + } + }, + async cancel(reason) { + logger.info(`[${requestId}] Streaming response cancelled`, { reason }) + timeoutController.abort() + timeoutController.cleanup() + if (executionId) { + try { + await cleanupExecutionBase64Cache(executionId) + } catch (error) { + logger.error(`[${requestId}] Failed to cleanup base64 cache`, { error }) + } } }, }) diff --git a/apps/sim/tools/apify/run_actor_async.ts b/apps/sim/tools/apify/run_actor_async.ts index 0eeef731dc..ca600e11d2 100644 --- a/apps/sim/tools/apify/run_actor_async.ts +++ b/apps/sim/tools/apify/run_actor_async.ts @@ -1,8 +1,9 @@ +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' import type { RunActorParams, RunActorResult } from '@/tools/apify/types' import type { ToolConfig } from '@/tools/types' -const POLL_INTERVAL_MS = 5000 // 5 seconds between polls -const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time +const POLL_INTERVAL_MS = 5000 +const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS export const apifyRunActorAsyncTool: ToolConfig = { id: 'apify_run_actor_async', diff --git a/apps/sim/tools/browser_use/run_task.ts b/apps/sim/tools/browser_use/run_task.ts index 9dbeeb5b6f..9422282a00 100644 --- a/apps/sim/tools/browser_use/run_task.ts +++ b/apps/sim/tools/browser_use/run_task.ts @@ -1,11 +1,12 @@ import { createLogger } from '@sim/logger' +import { getMaxExecutionTimeout } from '@/lib/core/execution-limits' import type { BrowserUseRunTaskParams, BrowserUseRunTaskResponse } from '@/tools/browser_use/types' import type { ToolConfig, ToolResponse } from '@/tools/types' const logger = createLogger('BrowserUseTool') const POLL_INTERVAL_MS = 5000 -const MAX_POLL_TIME_MS = 600000 // 10 minutes +const MAX_POLL_TIME_MS = getMaxExecutionTimeout() const MAX_CONSECUTIVE_ERRORS = 3 async function createSessionWithProfile( diff --git a/apps/sim/tools/dropbox/download.ts b/apps/sim/tools/dropbox/download.ts index 8adf286e6f..487f705e9e 100644 --- a/apps/sim/tools/dropbox/download.ts +++ b/apps/sim/tools/dropbox/download.ts @@ -1,16 +1,7 @@ +import { httpHeaderSafeJson } from '@/lib/core/utils/validation' import type { DropboxDownloadParams, DropboxDownloadResponse } from '@/tools/dropbox/types' import type { ToolConfig } from '@/tools/types' -/** - * Escapes non-ASCII characters in JSON string for HTTP header safety. - * Dropbox API requires characters 0x7F and all non-ASCII to be escaped as \uXXXX. - */ -function httpHeaderSafeJson(value: object): string { - return JSON.stringify(value).replace(/[\u007f-\uffff]/g, (c) => { - return '\\u' + ('0000' + c.charCodeAt(0).toString(16)).slice(-4) - }) -} - export const dropboxDownloadTool: ToolConfig = { id: 'dropbox_download', name: 'Dropbox Download File', diff --git a/apps/sim/tools/exa/research.ts b/apps/sim/tools/exa/research.ts index 95f08cd074..8af21576f7 100644 --- a/apps/sim/tools/exa/research.ts +++ b/apps/sim/tools/exa/research.ts @@ -1,11 +1,12 @@ import { createLogger } from '@sim/logger' +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' import type { ExaResearchParams, ExaResearchResponse } from '@/tools/exa/types' import type { ToolConfig } from '@/tools/types' const logger = createLogger('ExaResearchTool') -const POLL_INTERVAL_MS = 5000 // 5 seconds between polls -const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time +const POLL_INTERVAL_MS = 5000 +const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS export const researchTool: ToolConfig = { id: 'exa_research', diff --git a/apps/sim/tools/firecrawl/agent.ts b/apps/sim/tools/firecrawl/agent.ts index 9b5c2e6914..bd4b4f41e4 100644 --- a/apps/sim/tools/firecrawl/agent.ts +++ b/apps/sim/tools/firecrawl/agent.ts @@ -1,11 +1,12 @@ import { createLogger } from '@sim/logger' +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' import type { AgentParams, AgentResponse } from '@/tools/firecrawl/types' import type { ToolConfig } from '@/tools/types' const logger = createLogger('FirecrawlAgentTool') -const POLL_INTERVAL_MS = 5000 // 5 seconds between polls -const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time +const POLL_INTERVAL_MS = 5000 +const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS export const agentTool: ToolConfig = { id: 'firecrawl_agent', diff --git a/apps/sim/tools/firecrawl/crawl.ts b/apps/sim/tools/firecrawl/crawl.ts index d490994ff2..04886632d7 100644 --- a/apps/sim/tools/firecrawl/crawl.ts +++ b/apps/sim/tools/firecrawl/crawl.ts @@ -1,12 +1,13 @@ import { createLogger } from '@sim/logger' +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' import type { FirecrawlCrawlParams, FirecrawlCrawlResponse } from '@/tools/firecrawl/types' import { CRAWLED_PAGE_OUTPUT_PROPERTIES } from '@/tools/firecrawl/types' import type { ToolConfig } from '@/tools/types' const logger = createLogger('FirecrawlCrawlTool') -const POLL_INTERVAL_MS = 5000 // 5 seconds between polls -const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time +const POLL_INTERVAL_MS = 5000 +const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS export const crawlTool: ToolConfig = { id: 'firecrawl_crawl', diff --git a/apps/sim/tools/firecrawl/extract.ts b/apps/sim/tools/firecrawl/extract.ts index 86d76d5024..a82aa3eb24 100644 --- a/apps/sim/tools/firecrawl/extract.ts +++ b/apps/sim/tools/firecrawl/extract.ts @@ -1,11 +1,12 @@ import { createLogger } from '@sim/logger' +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' import type { ExtractParams, ExtractResponse } from '@/tools/firecrawl/types' import type { ToolConfig } from '@/tools/types' const logger = createLogger('FirecrawlExtractTool') -const POLL_INTERVAL_MS = 5000 // 5 seconds between polls -const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time +const POLL_INTERVAL_MS = 5000 +const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS export const extractTool: ToolConfig = { id: 'firecrawl_extract', diff --git a/apps/sim/tools/index.ts b/apps/sim/tools/index.ts index f497a5cf0e..77e9b4d765 100644 --- a/apps/sim/tools/index.ts +++ b/apps/sim/tools/index.ts @@ -1,5 +1,6 @@ import { createLogger } from '@sim/logger' import { generateInternalToken } from '@/lib/auth/internal' +import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits' import { secureFetchWithPinnedIP, validateUrlWithDNS, @@ -628,9 +629,8 @@ async function executeToolRequest( let response: Response if (isInternalRoute) { - // Set up AbortController for timeout support on internal routes const controller = new AbortController() - const timeout = requestParams.timeout || 300000 + const timeout = requestParams.timeout || DEFAULT_EXECUTION_TIMEOUT_MS const timeoutId = setTimeout(() => controller.abort(), timeout) try { diff --git a/apps/sim/tools/utils.ts b/apps/sim/tools/utils.ts index 1cfc4b42f5..e5364e415b 100644 --- a/apps/sim/tools/utils.ts +++ b/apps/sim/tools/utils.ts @@ -1,4 +1,5 @@ import { createLogger } from '@sim/logger' +import { getMaxExecutionTimeout } from '@/lib/core/execution-limits' import { getBaseUrl } from '@/lib/core/utils/urls' import { AGENT, isCustomTool } from '@/executor/constants' import { getCustomTool } from '@/hooks/queries/custom-tools' @@ -122,9 +123,7 @@ export function formatRequestParams(tool: ToolConfig, params: Record