diff --git a/.changeset/big-ants-act.md b/.changeset/big-ants-act.md new file mode 100644 index 0000000000..653c97ed44 --- /dev/null +++ b/.changeset/big-ants-act.md @@ -0,0 +1,6 @@ +--- +"trigger.dev": patch +"@trigger.dev/core": patch +--- + +Move max duration handling into the parent process diff --git a/packages/cli-v3/src/entryPoints/dev-run-worker.ts b/packages/cli-v3/src/entryPoints/dev-run-worker.ts index 9239f2b2bd..e02d9f8e44 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-worker.ts @@ -128,6 +128,17 @@ usage.setGlobalUsageManager(devUsageManager); const usageTimeoutManager = new UsageTimeoutManager(devUsageManager); timeout.setGlobalManager(usageTimeoutManager); +// Register listener to send IPC message when max duration is exceeded +timeout.registerListener(async (maxDurationInSeconds, elapsedTimeInSeconds) => { + log( + `[${new Date().toISOString()}] Max duration exceeded: ${maxDurationInSeconds}s, elapsed: ${elapsedTimeInSeconds}s` + ); + await zodIpc.send("MAX_DURATION_EXCEEDED", { + maxDurationInSeconds, + elapsedTimeInSeconds, + }); +}); + const standardResourceCatalog = new StandardResourceCatalog(); resourceCatalog.setGlobalResourceCatalog(standardResourceCatalog); diff --git a/packages/cli-v3/src/entryPoints/managed-run-worker.ts b/packages/cli-v3/src/entryPoints/managed-run-worker.ts index a9c593d720..09138fb82a 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-worker.ts @@ -726,6 +726,17 @@ function initializeUsageManager({ usage.setGlobalUsageManager(prodUsageManager); timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager)); + // Register listener to send IPC message when max duration is exceeded + timeout.registerListener(async (maxDurationInSeconds, elapsedTimeInSeconds) => { + console.log( + `[${new Date().toISOString()}] Max duration exceeded: ${maxDurationInSeconds}s, elapsed: ${elapsedTimeInSeconds}s` + ); + await zodIpc.send("MAX_DURATION_EXCEEDED", { + maxDurationInSeconds, + elapsedTimeInSeconds, + }); + }); + return prodUsageManager; } diff --git a/packages/cli-v3/src/executions/taskRunProcess.ts b/packages/cli-v3/src/executions/taskRunProcess.ts index be971294aa..6be1488aee 100644 --- a/packages/cli-v3/src/executions/taskRunProcess.ts +++ b/packages/cli-v3/src/executions/taskRunProcess.ts @@ -30,6 +30,7 @@ import { CleanupProcessError, internalErrorFromUnexpectedExit, GracefulExitTimeoutError, + MaxDurationExceededError, UnexpectedExitError, SuspendedProcessError, } from "@trigger.dev/core/v3/errors"; @@ -74,6 +75,8 @@ export class TaskRunProcess { private _isBeingKilled: boolean = false; private _isBeingCancelled: boolean = false; private _isBeingSuspended: boolean = false; + private _isMaxDurationExceeded: boolean = false; + private _maxDurationInfo?: { maxDurationInSeconds: number; elapsedTimeInSeconds: number }; private _stderr: Array = []; public onTaskRunHeartbeat: Evt = new Evt(); @@ -209,6 +212,23 @@ export class TaskRunProcess { SET_SUSPENDABLE: async (message) => { this.onSetSuspendable.post(message); }, + MAX_DURATION_EXCEEDED: async (message) => { + logger.debug("max duration exceeded, gracefully terminating child process", { + maxDurationInSeconds: message.maxDurationInSeconds, + elapsedTimeInSeconds: message.elapsedTimeInSeconds, + pid: this.pid, + }); + + // Set flag and store duration info for error reporting in #handleExit + this._isMaxDurationExceeded = true; + this._maxDurationInfo = { + maxDurationInSeconds: message.maxDurationInSeconds, + elapsedTimeInSeconds: message.elapsedTimeInSeconds, + }; + + // Use the same graceful termination approach as cancel + await this.#gracefullyTerminate(this.options.gracefulTerminationTimeoutInMs); + }, }, }); @@ -319,7 +339,25 @@ export class TaskRunProcess { const { rejecter } = attemptPromise; - if (this._isBeingCancelled) { + if (this._isMaxDurationExceeded) { + if (!this._maxDurationInfo) { + rejecter( + new UnexpectedExitError( + code ?? -1, + signal, + "MaxDuration flag set but duration info missing" + ) + ); + continue; + } + + rejecter( + new MaxDurationExceededError( + this._maxDurationInfo.maxDurationInSeconds, + this._maxDurationInfo.elapsedTimeInSeconds + ) + ); + } else if (this._isBeingCancelled) { rejecter(new CancelledProcessError()); } else if (this._gracefulExitTimeoutElapsed) { // Order matters, this has to be before the graceful exit timeout @@ -477,6 +515,14 @@ export class TaskRunProcess { }; } + if (error instanceof MaxDurationExceededError) { + return { + type: "INTERNAL_ERROR", + code: TaskRunErrorCodes.MAX_DURATION_EXCEEDED, + message: error.message, + }; + } + if (error instanceof CleanupProcessError) { return { type: "INTERNAL_ERROR", diff --git a/packages/core/src/v3/errors.ts b/packages/core/src/v3/errors.ts index 077289a659..fd03bf445f 100644 --- a/packages/core/src/v3/errors.ts +++ b/packages/core/src/v3/errors.ts @@ -557,6 +557,17 @@ export class GracefulExitTimeoutError extends Error { } } +export class MaxDurationExceededError extends Error { + constructor( + public readonly maxDurationInSeconds: number, + public readonly elapsedTimeInSeconds: number + ) { + super(`Run exceeded maximum compute time (maxDuration) of ${maxDurationInSeconds} seconds`); + + this.name = "MaxDurationExceededError"; + } +} + type ErrorLink = { name: string; href: string; diff --git a/packages/core/src/v3/schemas/messages.ts b/packages/core/src/v3/schemas/messages.ts index ebe3dc39b3..c635e57445 100644 --- a/packages/core/src/v3/schemas/messages.ts +++ b/packages/core/src/v3/schemas/messages.ts @@ -189,6 +189,13 @@ export const ExecutorToWorkerMessageCatalog = { suspendable: z.boolean(), }), }, + MAX_DURATION_EXCEEDED: { + message: z.object({ + version: z.literal("v1").default("v1"), + maxDurationInSeconds: z.number(), + elapsedTimeInSeconds: z.number(), + }), + }, }; export const WorkerToExecutorMessageCatalog = { diff --git a/packages/core/src/v3/timeout/api.ts b/packages/core/src/v3/timeout/api.ts index 63ddb48db3..dd211bc42c 100644 --- a/packages/core/src/v3/timeout/api.ts +++ b/packages/core/src/v3/timeout/api.ts @@ -47,6 +47,13 @@ export class TimeoutAPI implements TimeoutManager { this.disable(); } + public registerListener(listener: (timeoutInSeconds: number, elapsedTimeInSeconds: number) => void | Promise) { + const manager = this.#getManager(); + if (manager.registerListener) { + manager.registerListener(listener); + } + } + #getManager(): TimeoutManager { return getGlobal(API_NAME) ?? NOOP_TIMEOUT_MANAGER; } diff --git a/packages/core/src/v3/timeout/types.ts b/packages/core/src/v3/timeout/types.ts index 0ea0b8fe34..6978689676 100644 --- a/packages/core/src/v3/timeout/types.ts +++ b/packages/core/src/v3/timeout/types.ts @@ -2,6 +2,7 @@ export interface TimeoutManager { abortAfterTimeout: (timeoutInSeconds?: number) => AbortController; signal?: AbortSignal; reset: () => void; + registerListener?: (listener: (timeoutInSeconds: number, elapsedTimeInSeconds: number) => void | Promise) => void; } export class TaskRunExceededMaxDuration extends Error { diff --git a/packages/core/src/v3/timeout/usageTimeoutManager.ts b/packages/core/src/v3/timeout/usageTimeoutManager.ts index ec14ffe6cc..3a6a196dce 100644 --- a/packages/core/src/v3/timeout/usageTimeoutManager.ts +++ b/packages/core/src/v3/timeout/usageTimeoutManager.ts @@ -5,11 +5,21 @@ export class UsageTimeoutManager implements TimeoutManager { private _abortController: AbortController; private _abortSignal: AbortSignal | undefined; private _intervalId: NodeJS.Timeout | undefined; + private _listener?: ( + timeoutInSeconds: number, + elapsedTimeInSeconds: number + ) => void | Promise; constructor(private readonly usageManager: UsageManager) { this._abortController = new AbortController(); } + registerListener( + listener: (timeoutInSeconds: number, elapsedTimeInSeconds: number) => void | Promise + ): void { + this._listener = listener; + } + get signal(): AbortSignal | undefined { return this._abortSignal; } @@ -42,8 +52,15 @@ export class UsageTimeoutManager implements TimeoutManager { if (sample.cpuTime > timeoutInSeconds * 1000) { clearInterval(this._intervalId); + const elapsedTimeInSeconds = sample.cpuTime / 1000; + + // Call the listener if registered + if (this._listener) { + void this._listener(timeoutInSeconds, elapsedTimeInSeconds); + } + this._abortController.abort( - new TaskRunExceededMaxDuration(timeoutInSeconds, sample.cpuTime / 1000) + new TaskRunExceededMaxDuration(timeoutInSeconds, elapsedTimeInSeconds) ); } } diff --git a/packages/core/src/v3/workers/taskExecutor.ts b/packages/core/src/v3/workers/taskExecutor.ts index 6de4c77a5f..ca724744a5 100644 --- a/packages/core/src/v3/workers/taskExecutor.ts +++ b/packages/core/src/v3/workers/taskExecutor.ts @@ -3,7 +3,6 @@ import { promiseWithResolvers } from "../../utils.js"; import { ApiError, RateLimitError } from "../apiClient/errors.js"; import { ConsoleInterceptor } from "../consoleInterceptor.js"; import { - InternalError, isCompleteTaskWithOutput, isInternalError, parseError, @@ -419,29 +418,13 @@ export class TaskExecutor { throw new Error("Task does not have a run function"); } - // Create a promise that rejects when the signal aborts - const abortPromise = new Promise((_, reject) => { - signal.addEventListener("abort", () => { - if (typeof signal.reason === "string" && signal.reason.includes("cancel")) { - return; - } - - const maxDuration = ctx.run.maxDuration; - reject( - new InternalError({ - code: TaskRunErrorCodes.MAX_DURATION_EXCEEDED, - message: `Run exceeded maximum compute time (maxDuration) of ${maxDuration} seconds`, - }) - ); - }); - }); - return runTimelineMetrics.measureMetric("trigger.dev/execution", "run", async () => { return await this._tracer.startActiveSpan( "run()", async (span) => { - // Race between the run function and the abort promise - return await Promise.race([runFn(payload, { ctx, init, signal }), abortPromise]); + // maxDuration is now enforced by killing the process, not by Promise.race + // The signal is still passed to runFn for cancellation and other abort conditions + return await runFn(payload, { ctx, init, signal }); }, { attributes: { [SemanticInternalAttributes.STYLE_ICON]: "task-fn-run" }, diff --git a/packages/core/test/taskExecutor.test.ts b/packages/core/test/taskExecutor.test.ts index 229a952fff..0bb4b51bf8 100644 --- a/packages/core/test/taskExecutor.test.ts +++ b/packages/core/test/taskExecutor.test.ts @@ -1459,93 +1459,6 @@ describe("TaskExecutor", () => { }); }); - test("should handle max duration abort signal and call hooks in correct order", async () => { - const executionOrder: string[] = []; - const maxDurationSeconds = 1000; - - // Create an abort controller that we'll trigger manually - const controller = new AbortController(); - - // Register global init hook - lifecycleHooks.registerGlobalInitHook({ - id: "test-init", - fn: async () => { - executionOrder.push("init"); - return { - foo: "bar", - }; - }, - }); - - // Register failure hook - lifecycleHooks.registerGlobalFailureHook({ - id: "global-failure", - fn: async ({ error }) => { - executionOrder.push("failure"); - expect((error as Error).message).toBe( - `Run exceeded maximum compute time (maxDuration) of ${maxDurationSeconds} seconds` - ); - }, - }); - - // Register complete hook - lifecycleHooks.registerGlobalCompleteHook({ - id: "global-complete", - fn: async ({ result }) => { - executionOrder.push("complete"); - expect(result.ok).toBe(false); - }, - }); - - // Register cleanup hook - lifecycleHooks.registerGlobalCleanupHook({ - id: "global-cleanup", - fn: async () => { - executionOrder.push("cleanup"); - }, - }); - - const task = { - id: "test-task", - fns: { - run: async (payload: any, params: RunFnParams) => { - executionOrder.push("run-start"); - - // Create a promise that never resolves - await new Promise((resolve) => { - // Trigger abort after a small delay - setTimeout(() => { - controller.abort(); - }, 10); - }); - - // This should never be reached - executionOrder.push("run-end"); - }, - }, - }; - - const result = await executeTask(task, { test: "data" }, controller.signal); - - // Verify hooks were called in correct order - expect(executionOrder).toEqual(["init", "run-start", "failure", "complete", "cleanup"]); - - // Verify the error result - expect(result).toEqual({ - result: { - ok: false, - id: "test-run-id", - error: { - type: "INTERNAL_ERROR", - code: TaskRunErrorCodes.MAX_DURATION_EXCEEDED, - message: "Run exceeded maximum compute time (maxDuration) of 1000 seconds", - stackTrace: expect.any(String), - }, - skippedRetrying: false, - }, - }); - }); - test("should call onWait and onResume hooks in correct order with proper data", async () => { const executionOrder: string[] = []; const waitData = { type: "task", runId: "test-run-id" } as const; diff --git a/packages/rsc/src/package.json b/packages/rsc/src/package.json deleted file mode 100644 index 3dbc1ca591..0000000000 --- a/packages/rsc/src/package.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "type": "module" -}