Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/big-ants-act.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"trigger.dev": patch
"@trigger.dev/core": patch
---

Move max duration handling into the parent process
11 changes: 11 additions & 0 deletions packages/cli-v3/src/entryPoints/dev-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,17 @@ usage.setGlobalUsageManager(devUsageManager);
const usageTimeoutManager = new UsageTimeoutManager(devUsageManager);
timeout.setGlobalManager(usageTimeoutManager);

// When the timeout manager reports that max duration was exceeded, log it and
// forward the limit and the elapsed time over IPC as MAX_DURATION_EXCEEDED.
timeout.registerListener(async (maxDurationInSeconds, elapsedTimeInSeconds) => {
  const timestamp = new Date().toISOString();
  log(
    `[${timestamp}] Max duration exceeded: ${maxDurationInSeconds}s, elapsed: ${elapsedTimeInSeconds}s`
  );

  const payload = { maxDurationInSeconds, elapsedTimeInSeconds };
  await zodIpc.send("MAX_DURATION_EXCEEDED", payload);
});

const standardResourceCatalog = new StandardResourceCatalog();
resourceCatalog.setGlobalResourceCatalog(standardResourceCatalog);

Expand Down
11 changes: 11 additions & 0 deletions packages/cli-v3/src/entryPoints/managed-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,17 @@ function initializeUsageManager({
usage.setGlobalUsageManager(prodUsageManager);
timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager));

// When the timeout manager reports that max duration was exceeded, log it and
// forward the limit and the elapsed time over IPC as MAX_DURATION_EXCEEDED.
timeout.registerListener(async (maxDurationInSeconds, elapsedTimeInSeconds) => {
  const timestamp = new Date().toISOString();
  console.log(
    `[${timestamp}] Max duration exceeded: ${maxDurationInSeconds}s, elapsed: ${elapsedTimeInSeconds}s`
  );

  const payload = { maxDurationInSeconds, elapsedTimeInSeconds };
  await zodIpc.send("MAX_DURATION_EXCEEDED", payload);
});

return prodUsageManager;
}

Expand Down
48 changes: 47 additions & 1 deletion packages/cli-v3/src/executions/taskRunProcess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
CleanupProcessError,
internalErrorFromUnexpectedExit,
GracefulExitTimeoutError,
MaxDurationExceededError,
UnexpectedExitError,
SuspendedProcessError,
} from "@trigger.dev/core/v3/errors";
Expand Down Expand Up @@ -74,6 +75,8 @@ export class TaskRunProcess {
private _isBeingKilled: boolean = false;
private _isBeingCancelled: boolean = false;
private _isBeingSuspended: boolean = false;
private _isMaxDurationExceeded: boolean = false;
private _maxDurationInfo?: { maxDurationInSeconds: number; elapsedTimeInSeconds: number };
private _stderr: Array<string> = [];

public onTaskRunHeartbeat: Evt<string> = new Evt();
Expand Down Expand Up @@ -209,6 +212,23 @@ export class TaskRunProcess {
SET_SUSPENDABLE: async (message) => {
this.onSetSuspendable.post(message);
},
// The child reported that the run went past its max duration: record the
// reported figures, then ask the child process to terminate gracefully.
MAX_DURATION_EXCEEDED: async (message) => {
  const { maxDurationInSeconds, elapsedTimeInSeconds } = message;

  logger.debug("max duration exceeded, gracefully terminating child process", {
    maxDurationInSeconds,
    elapsedTimeInSeconds,
    pid: this.pid,
  });

  // Kept on the instance so the exit handler can reject the attempt with a
  // MaxDurationExceededError carrying these values.
  this._maxDurationInfo = { maxDurationInSeconds, elapsedTimeInSeconds };
  this._isMaxDurationExceeded = true;

  // Graceful termination, passing the configured timeout.
  await this.#gracefullyTerminate(this.options.gracefulTerminationTimeoutInMs);
},
},
});

Expand Down Expand Up @@ -319,7 +339,25 @@ export class TaskRunProcess {

const { rejecter } = attemptPromise;

if (this._isBeingCancelled) {
if (this._isMaxDurationExceeded) {
if (!this._maxDurationInfo) {
rejecter(
new UnexpectedExitError(
code ?? -1,
signal,
"MaxDuration flag set but duration info missing"
)
);
continue;
}

rejecter(
new MaxDurationExceededError(
this._maxDurationInfo.maxDurationInSeconds,
this._maxDurationInfo.elapsedTimeInSeconds
)
);
} else if (this._isBeingCancelled) {
rejecter(new CancelledProcessError());
} else if (this._gracefulExitTimeoutElapsed) {
// Order matters, this has to be before the graceful exit timeout
Expand Down Expand Up @@ -477,6 +515,14 @@ export class TaskRunProcess {
};
}

if (error instanceof MaxDurationExceededError) {
return {
type: "INTERNAL_ERROR",
code: TaskRunErrorCodes.MAX_DURATION_EXCEEDED,
message: error.message,
};
}

if (error instanceof CleanupProcessError) {
return {
type: "INTERNAL_ERROR",
Expand Down
11 changes: 11 additions & 0 deletions packages/core/src/v3/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,17 @@ export class GracefulExitTimeoutError extends Error {
}
}

/**
 * Error describing a run that used more compute time than its `maxDuration`.
 *
 * The message reports only the configured limit; the measured elapsed time is
 * available on the instance as `elapsedTimeInSeconds`.
 */
export class MaxDurationExceededError extends Error {
  /** The configured limit, in seconds. */
  public readonly maxDurationInSeconds: number;
  /** The elapsed time reported when the limit was exceeded, in seconds. */
  public readonly elapsedTimeInSeconds: number;

  constructor(maxDurationInSeconds: number, elapsedTimeInSeconds: number) {
    super(`Run exceeded maximum compute time (maxDuration) of ${maxDurationInSeconds} seconds`);

    this.maxDurationInSeconds = maxDurationInSeconds;
    this.elapsedTimeInSeconds = elapsedTimeInSeconds;
    this.name = "MaxDurationExceededError";
  }
}

type ErrorLink = {
name: string;
href: string;
Expand Down
7 changes: 7 additions & 0 deletions packages/core/src/v3/schemas/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,13 @@ export const ExecutorToWorkerMessageCatalog = {
suspendable: z.boolean(),
}),
},
// Message reporting that a run exceeded its max duration. Carries the
// configured limit and the elapsed time, both in seconds; `version` defaults
// to "v1" when the sender omits it.
MAX_DURATION_EXCEEDED: {
message: z.object({
version: z.literal("v1").default("v1"),
maxDurationInSeconds: z.number(),
elapsedTimeInSeconds: z.number(),
}),
},
};

export const WorkerToExecutorMessageCatalog = {
Expand Down
7 changes: 7 additions & 0 deletions packages/core/src/v3/timeout/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ export class TimeoutAPI implements TimeoutManager {
this.disable();
}

/**
 * Forwards a timeout listener to the current global timeout manager.
 *
 * `registerListener` is optional on a manager; when the current manager does
 * not provide it, the listener is dropped without error.
 *
 * @param listener - Callback given the timeout and the elapsed time, both in seconds.
 */
public registerListener(
  listener: (timeoutInSeconds: number, elapsedTimeInSeconds: number) => void | Promise<void>
) {
  this.#getManager().registerListener?.(listener);
}

#getManager(): TimeoutManager {
return getGlobal(API_NAME) ?? NOOP_TIMEOUT_MANAGER;
}
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/v3/timeout/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ export interface TimeoutManager {
abortAfterTimeout: (timeoutInSeconds?: number) => AbortController;
signal?: AbortSignal;
reset: () => void;
registerListener?: (listener: (timeoutInSeconds: number, elapsedTimeInSeconds: number) => void | Promise<void>) => void;
}

export class TaskRunExceededMaxDuration extends Error {
Expand Down
19 changes: 18 additions & 1 deletion packages/core/src/v3/timeout/usageTimeoutManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,21 @@ export class UsageTimeoutManager implements TimeoutManager {
private _abortController: AbortController;
private _abortSignal: AbortSignal | undefined;
private _intervalId: NodeJS.Timeout | undefined;
private _listener?: (
timeoutInSeconds: number,
elapsedTimeInSeconds: number
) => void | Promise<void>;

/**
 * Creates the manager with a fresh AbortController.
 *
 * @param usageManager - Kept as a private readonly property of the instance.
 */
constructor(private readonly usageManager: UsageManager) {
this._abortController = new AbortController();
}

/**
 * Stores the callback to run when the sampled CPU time exceeds the timeout.
 *
 * Only one listener is held: registering again replaces the previous one.
 * The callback is started just before the abort controller is aborted and is
 * not awaited.
 *
 * @param listener - Receives the timeout and the elapsed time, both in seconds.
 */
registerListener(
listener: (timeoutInSeconds: number, elapsedTimeInSeconds: number) => void | Promise<void>
): void {
this._listener = listener;
}

get signal(): AbortSignal | undefined {
return this._abortSignal;
}
Expand Down Expand Up @@ -42,8 +52,15 @@ export class UsageTimeoutManager implements TimeoutManager {
if (sample.cpuTime > timeoutInSeconds * 1000) {
clearInterval(this._intervalId);

const elapsedTimeInSeconds = sample.cpuTime / 1000;

// Call the listener if registered
if (this._listener) {
void this._listener(timeoutInSeconds, elapsedTimeInSeconds);
}

this._abortController.abort(
new TaskRunExceededMaxDuration(timeoutInSeconds, sample.cpuTime / 1000)
new TaskRunExceededMaxDuration(timeoutInSeconds, elapsedTimeInSeconds)
);
}
}
Expand Down
23 changes: 3 additions & 20 deletions packages/core/src/v3/workers/taskExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { promiseWithResolvers } from "../../utils.js";
import { ApiError, RateLimitError } from "../apiClient/errors.js";
import { ConsoleInterceptor } from "../consoleInterceptor.js";
import {
InternalError,
isCompleteTaskWithOutput,
isInternalError,
parseError,
Expand Down Expand Up @@ -419,29 +418,13 @@ export class TaskExecutor {
throw new Error("Task does not have a run function");
}

// Create a promise that rejects when the signal aborts
const abortPromise = new Promise((_, reject) => {
signal.addEventListener("abort", () => {
if (typeof signal.reason === "string" && signal.reason.includes("cancel")) {
return;
}

const maxDuration = ctx.run.maxDuration;
reject(
new InternalError({
code: TaskRunErrorCodes.MAX_DURATION_EXCEEDED,
message: `Run exceeded maximum compute time (maxDuration) of ${maxDuration} seconds`,
})
);
});
});

return runTimelineMetrics.measureMetric("trigger.dev/execution", "run", async () => {
return await this._tracer.startActiveSpan(
"run()",
async (span) => {
// Race between the run function and the abort promise
return await Promise.race([runFn(payload, { ctx, init, signal }), abortPromise]);
// maxDuration is now enforced by killing the process, not by Promise.race
// The signal is still passed to runFn for cancellation and other abort conditions
return await runFn(payload, { ctx, init, signal });
},
{
attributes: { [SemanticInternalAttributes.STYLE_ICON]: "task-fn-run" },
Expand Down
87 changes: 0 additions & 87 deletions packages/core/test/taskExecutor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1459,93 +1459,6 @@ describe("TaskExecutor", () => {
});
});

test("should handle max duration abort signal and call hooks in correct order", async () => {
const executionOrder: string[] = [];
const maxDurationSeconds = 1000;

// Create an abort controller that we'll trigger manually
const controller = new AbortController();

// Register global init hook
lifecycleHooks.registerGlobalInitHook({
id: "test-init",
fn: async () => {
executionOrder.push("init");
return {
foo: "bar",
};
},
});

// Register failure hook
lifecycleHooks.registerGlobalFailureHook({
id: "global-failure",
fn: async ({ error }) => {
executionOrder.push("failure");
expect((error as Error).message).toBe(
`Run exceeded maximum compute time (maxDuration) of ${maxDurationSeconds} seconds`
);
},
});

// Register complete hook
lifecycleHooks.registerGlobalCompleteHook({
id: "global-complete",
fn: async ({ result }) => {
executionOrder.push("complete");
expect(result.ok).toBe(false);
},
});

// Register cleanup hook
lifecycleHooks.registerGlobalCleanupHook({
id: "global-cleanup",
fn: async () => {
executionOrder.push("cleanup");
},
});

const task = {
id: "test-task",
fns: {
run: async (payload: any, params: RunFnParams<any>) => {
executionOrder.push("run-start");

// Create a promise that never resolves
await new Promise((resolve) => {
// Trigger abort after a small delay
setTimeout(() => {
controller.abort();
}, 10);
});

// This should never be reached
executionOrder.push("run-end");
},
},
};

const result = await executeTask(task, { test: "data" }, controller.signal);

// Verify hooks were called in correct order
expect(executionOrder).toEqual(["init", "run-start", "failure", "complete", "cleanup"]);

// Verify the error result
expect(result).toEqual({
result: {
ok: false,
id: "test-run-id",
error: {
type: "INTERNAL_ERROR",
code: TaskRunErrorCodes.MAX_DURATION_EXCEEDED,
message: "Run exceeded maximum compute time (maxDuration) of 1000 seconds",
stackTrace: expect.any(String),
},
skippedRetrying: false,
},
});
});

test("should call onWait and onResume hooks in correct order with proper data", async () => {
const executionOrder: string[] = [];
const waitData = { type: "task", runId: "test-run-id" } as const;
Expand Down
3 changes: 0 additions & 3 deletions packages/rsc/src/package.json

This file was deleted.