Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/poor-files-taste.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"trigger.dev": patch
"@trigger.dev/core": patch
---

Fix SIGTERM handling during warm start long poll
13 changes: 10 additions & 3 deletions packages/cli-v3/src/entryPoints/managed/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,18 @@ export class ManagedRunController {
message: "Received SIGTERM, stopping worker",
});

// Disable warm starts
// Disable warm starts - prevents new warm start requests
this.warmStartEnabled = false;

// ..now we wait for any active runs to finish
// SIGKILL will handle the rest, nothing to do here
// Abort any ongoing warm start long poll - immediately stops waiting for next run
// This prevents the scenario where:
// 1. SIGTERM kills a prepared child process
// 2. Warm start poll returns a new run
// 3. Controller tries to use the dead child process
this.warmStartClient?.abort();

// Now we wait for any active runs to finish gracefully
// SIGKILL will handle forced termination after termination grace period
});
}

Expand Down
108 changes: 77 additions & 31 deletions packages/core/src/v3/workers/warmStartClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export class WarmStartClient {
private readonly logger = new SimpleStructuredLogger("warm-start-client");
private readonly apiUrl: URL;
private backoff = new ExponentialBackoff("FullJitter");
private abortController: AbortController | null = null;

private get connectUrl() {
return new URL("/connect", this.apiUrl);
Expand All @@ -30,6 +31,30 @@ export class WarmStartClient {
this.apiUrl = opts.apiUrl;
}

abort() {
if (!this.abortController) {
this.logger.warn("Abort called but no abort controller exists");
return;
}

this.abortController.abort();
this.abortController = null;
}

private async withAbort<T>(fn: (signal: AbortSignal) => Promise<T>): Promise<T> {
if (this.abortController) {
throw new Error("A warm start is already in progress");
}

this.abortController = new AbortController();

try {
return await fn(this.abortController.signal);
} finally {
this.abortController = null;
}
}

async connect(): Promise<ApiResult<WarmStartConnectResponse>> {
return wrapZodFetch(
WarmStartConnectResponse,
Expand Down Expand Up @@ -61,39 +86,42 @@ export class WarmStartClient {
connectionTimeoutMs: number;
keepaliveMs: number;
}): Promise<DequeuedMessage | null> {
const res = await this.longPoll<unknown>(
this.warmStartUrl.href,
{
method: "GET",
headers: {
"x-trigger-workload-controller-id": this.opts.controllerId,
"x-trigger-deployment-id": this.opts.deploymentId,
"x-trigger-deployment-version": this.opts.deploymentVersion,
"x-trigger-machine-cpu": this.opts.machineCpu,
"x-trigger-machine-memory": this.opts.machineMemory,
"x-trigger-worker-instance-name": workerInstanceName,
return this.withAbort(async (abortSignal) => {
const res = await this.longPoll<unknown>(
this.warmStartUrl.href,
{
method: "GET",
headers: {
"x-trigger-workload-controller-id": this.opts.controllerId,
"x-trigger-deployment-id": this.opts.deploymentId,
"x-trigger-deployment-version": this.opts.deploymentVersion,
"x-trigger-machine-cpu": this.opts.machineCpu,
"x-trigger-machine-memory": this.opts.machineMemory,
"x-trigger-worker-instance-name": workerInstanceName,
},
},
},
{
timeoutMs: connectionTimeoutMs,
totalDurationMs: keepaliveMs,
{
timeoutMs: connectionTimeoutMs,
totalDurationMs: keepaliveMs,
abortSignal,
}
);

if (!res.ok) {
this.logger.error("warmStart: failed", {
error: res.error,
connectionTimeoutMs,
keepaliveMs,
});
return null;
}
);

if (!res.ok) {
this.logger.error("warmStart: failed", {
error: res.error,
connectionTimeoutMs,
keepaliveMs,
});
return null;
}

const nextRun = DequeuedMessage.parse(res.data);
const nextRun = DequeuedMessage.parse(res.data);

this.logger.debug("warmStart: got next run", { nextRun });
this.logger.debug("warmStart: got next run", { nextRun });

return nextRun;
return nextRun;
});
}

private async longPoll<T = any>(
Expand All @@ -102,9 +130,11 @@ export class WarmStartClient {
{
timeoutMs,
totalDurationMs,
abortSignal,
}: {
timeoutMs: number;
totalDurationMs: number;
abortSignal: AbortSignal;
}
): Promise<
| {
Expand All @@ -123,11 +153,20 @@ export class WarmStartClient {
let retries = 0;

while (Date.now() < endTime) {
if (abortSignal.aborted) {
return {
ok: false,
error: "Aborted - abort signal triggered before fetch",
};
}

try {
const controller = new AbortController();
const signal = controller.signal;
const timeoutController = new AbortController();
const timeoutId = setTimeout(() => timeoutController.abort(), timeoutMs);

const timeoutId = setTimeout(() => controller.abort(), timeoutMs);
// Create compound signal that aborts on either timeout or parent abort
const signals = [timeoutController.signal, abortSignal];
const signal = AbortSignal.any(signals);

const response = await fetch(url, { ...requestInit, signal });

Expand All @@ -148,6 +187,13 @@ export class WarmStartClient {
}
} catch (error) {
if (error instanceof Error && error.name === "AbortError") {
// Check if this was a parent abort or just a timeout
if (abortSignal.aborted) {
return {
ok: false,
error: "Aborted - abort signal triggered during fetch",
};
}
this.logger.log("Long poll request timed out, retrying...");
continue;
} else {
Expand Down