Skip to content

Commit c633618

Browse files
committed
fix(webapp): make createDeploymentBackgroundWorker idempotent
CLI retries after a severed connection were colliding on the BackgroundWorker `(project, env, version)` unique index and burning the retry budget on 500s, leaving deployments to surface as "Indexing timed out" 8 minutes later. Look up an existing row on entry and return it on contentHash match (or 409 on drift); guard the final BUILDING → DEPLOYING transition with an updateMany so a sibling failure handler can't be silently overwritten.
1 parent 5083d16 commit c633618

6 files changed

Lines changed: 300 additions & 50 deletions
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Make `POST /api/v1/deployments/:deploymentId/background-workers` idempotent (for sequential requests only) so client-side retries no longer collide on the `BackgroundWorker` `(project, env, version)` unique index. Helps make deployments more resilient against the class of indexing failures that surfaces in the dashboard as "Indexing timed out", e.g. during transient database issues.

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

Lines changed: 26 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -34,31 +34,7 @@ import { tryCatch } from "@trigger.dev/core/v3";
3434
import { engine } from "../runEngine.server";
3535
import { scheduleEngine } from "../scheduleEngine.server";
3636

37-
/**
38-
* Strip BackgroundWorkerMetadata down to the slice that's actually read after
39-
* storage. Everything else is duplicated to dedicated columns/tables
40-
* (BackgroundWorker.{contentHash,cliVersion,sdkVersion,runtime,runtimeVersion},
41-
* BackgroundWorkerTask, BackgroundWorkerFile, TaskQueue, Prompt). Today the
42-
* only post-write reader is changeCurrentDeployment.server.ts, which feeds
43-
* tasks[].schedule into syncDeclarativeSchedules. packageVersion, contentHash,
44-
* and tasks[].filePath are kept solely to satisfy BackgroundWorkerMetadata's
45-
* required fields when the column is parsed back.
46-
*/
47-
export function stripBackgroundWorkerMetadataForStorage(
48-
metadata: BackgroundWorkerMetadata
49-
): Prisma.InputJsonValue {
50-
return {
51-
packageVersion: metadata.packageVersion,
52-
contentHash: metadata.contentHash,
53-
tasks: metadata.tasks
54-
.filter((t) => t.schedule)
55-
.map((t) => ({
56-
id: t.id,
57-
filePath: t.filePath,
58-
schedule: t.schedule,
59-
})),
60-
};
61-
}
37+
export { stripBackgroundWorkerMetadataForStorage } from "./stripBackgroundWorkerMetadataForStorage.server";
6238

6339
export class CreateBackgroundWorkerService extends BaseService {
6440
private readonly _taskMetaCache: TaskMetadataCache;
@@ -356,8 +332,13 @@ async function createWorkerTask(
356332
prisma: PrismaClientOrTransaction,
357333
tasksToBackgroundFiles?: Map<string, string>
358334
): Promise<TaskMetadataEntry | null> {
335+
// Hoisted so the P2002 catch branch can return the same entry shape.
336+
let queue: TaskQueue | undefined;
337+
let resolvedTriggerSource: "SCHEDULED" | "AGENT" | "STANDARD" | undefined;
338+
let resolvedTtl: string | null | undefined;
339+
359340
try {
360-
let queue = queues.find((queue) => queue.name === task.queue?.name);
341+
queue = queues.find((queue) => queue.name === task.queue?.name);
361342

362343
if (!queue) {
363344
// Create a TaskQueue
@@ -374,14 +355,14 @@ async function createWorkerTask(
374355
);
375356
}
376357

377-
const resolvedTriggerSource =
358+
resolvedTriggerSource =
378359
task.triggerSource === "schedule"
379360
? ("SCHEDULED" as const)
380361
: task.triggerSource === "agent"
381362
? ("AGENT" as const)
382363
: ("STANDARD" as const);
383364

384-
const resolvedTtl =
365+
resolvedTtl =
385366
typeof task.ttl === "number" ? stringifyDuration(task.ttl) ?? null : task.ttl ?? null;
386367

387368
await prisma.backgroundWorkerTask.create({
@@ -418,10 +399,26 @@ async function createWorkerTask(
418399
if (error instanceof Prisma.PrismaClientKnownRequestError) {
419400
// The error code for unique constraint violation in Prisma is P2002
420401
if (error.code === "P2002") {
421-
logger.warn("Task already exists", {
402+
// Retry landing after the first attempt's row was already written.
403+
const existing = await prisma.backgroundWorkerTask.findFirst({
404+
where: { workerId: worker.id, slug: task.id },
405+
select: { id: true },
406+
});
407+
408+
logger.warn("Attempted to recreate background worker task", {
422409
task,
423410
worker,
424411
});
412+
413+
if (existing && queue && resolvedTriggerSource && resolvedTtl !== undefined) {
414+
return {
415+
slug: task.id,
416+
ttl: resolvedTtl,
417+
triggerSource: resolvedTriggerSource,
418+
queueId: queue.id,
419+
queueName: queue.name,
420+
};
421+
}
425422
} else {
426423
logger.error("Prisma Error creating background worker task", {
427424
error: {

apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
import { CreateBackgroundWorkerRequestBody, logger, tryCatch } from "@trigger.dev/core/v3";
2-
import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic";
3-
import type { BackgroundWorker, PrismaClientOrTransaction, WorkerDeployment } from "@trigger.dev/database";
2+
import type {
3+
BackgroundWorker,
4+
PrismaClientOrTransaction,
5+
WorkerDeployment,
6+
} from "@trigger.dev/database";
47
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
58
import { type TaskMetadataCache } from "~/services/taskMetadataCache.server";
69
import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server";
710
import { BaseService, ServiceValidationError } from "./baseService.server";
811
import {
912
createBackgroundFiles,
1013
createWorkerResources,
11-
stripBackgroundWorkerMetadataForStorage,
1214
syncDeclarativeSchedules,
1315
} from "./createBackgroundWorker.server";
16+
import { findOrCreateBackgroundWorker } from "./createDeploymentBackgroundWorkerV4/findOrCreateBackgroundWorker.server";
1417
import { TimeoutDeploymentService } from "./timeoutDeployment.server";
1518
import { env } from "~/env.server";
1619

@@ -50,6 +53,11 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService {
5053
});
5154

5255
if (!deployment) {
56+
logger.warn("createDeploymentBackgroundWorker: deployment not found", {
57+
deploymentId,
58+
environmentId: environment.id,
59+
projectId: environment.projectId,
60+
});
5361
return;
5462
}
5563

@@ -70,25 +78,21 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService {
7078
}
7179

7280
if (deployment.status !== "BUILDING") {
81+
logger.warn("createDeploymentBackgroundWorker: deployment not in BUILDING state", {
82+
deploymentId,
83+
deploymentStatus: deployment.status,
84+
environmentId: environment.id,
85+
projectId: environment.projectId,
86+
});
7387
return;
7488
}
7589

76-
const backgroundWorker = await this._prisma.backgroundWorker.create({
77-
data: {
78-
...BackgroundWorkerId.generate(),
79-
version: deployment.version,
80-
runtimeEnvironmentId: environment.id,
81-
projectId: environment.projectId,
82-
metadata: stripBackgroundWorkerMetadataForStorage(body.metadata),
83-
contentHash: body.metadata.contentHash,
84-
cliVersion: body.metadata.cliPackageVersion,
85-
sdkVersion: body.metadata.packageVersion,
86-
supportsLazyAttempts: body.supportsLazyAttempts,
87-
engine: body.engine,
88-
runtime: body.metadata.runtime,
89-
runtimeVersion: body.metadata.runtimeVersion,
90-
},
91-
});
90+
const backgroundWorker = await findOrCreateBackgroundWorker(
91+
environment,
92+
deployment,
93+
body,
94+
this._prisma
95+
);
9296

9397
//upgrade the project to engine "V2" if it's not already
9498
if (environment.project.engine === "V1" && body.engine === "V2") {
@@ -188,10 +192,11 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService {
188192
throw serviceError;
189193
}
190194

191-
// Link the deployment with the background worker
192-
await this._prisma.workerDeployment.update({
195+
// Guarded BUILDING → DEPLOYING transition. `updateMany` for optimistic concurrency control
196+
const { count: updatedCount } = await this._prisma.workerDeployment.updateMany({
193197
where: {
194198
id: deployment.id,
199+
status: "BUILDING",
195200
},
196201
data: {
197202
status: "DEPLOYING",
@@ -203,6 +208,18 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService {
203208
},
204209
});
205210

211+
if (updatedCount === 0) {
212+
logger.warn(
213+
"createDeploymentBackgroundWorker: deployment no longer in BUILDING state, skipping DEPLOYING transition",
214+
{
215+
deploymentId,
216+
environmentId: environment.id,
217+
projectId: environment.projectId,
218+
}
219+
);
220+
return backgroundWorker;
221+
}
222+
206223
await TimeoutDeploymentService.enqueue(
207224
deployment.id,
208225
"DEPLOYING",
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import type { CreateBackgroundWorkerRequestBody } from "@trigger.dev/core/v3";
2+
import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic";
3+
import type {
4+
BackgroundWorker,
5+
PrismaClientOrTransaction,
6+
WorkerDeployment,
7+
} from "@trigger.dev/database";
8+
import { prisma as defaultPrisma } from "~/db.server";
9+
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
10+
import { ServiceValidationError } from "../common.server";
11+
import { stripBackgroundWorkerMetadataForStorage } from "../stripBackgroundWorkerMetadataForStorage.server";
12+
13+
/**
14+
* Idempotent on `(project, environment, version)` for sequential calls, not concurrent calls.
15+
*/
16+
export async function findOrCreateBackgroundWorker(
17+
environment: AuthenticatedEnvironment,
18+
deployment: WorkerDeployment,
19+
body: CreateBackgroundWorkerRequestBody,
20+
prisma: PrismaClientOrTransaction = defaultPrisma
21+
): Promise<BackgroundWorker> {
22+
const existing = await prisma.backgroundWorker.findFirst({
23+
where: {
24+
projectId: environment.projectId,
25+
runtimeEnvironmentId: environment.id,
26+
version: deployment.version,
27+
},
28+
});
29+
30+
if (existing && existing.contentHash === body.metadata.contentHash) {
31+
return existing;
32+
}
33+
34+
if (existing) {
35+
throw new ServiceValidationError(
36+
"A background worker for this deployment version already exists with a different content hash",
37+
409
38+
);
39+
}
40+
41+
return prisma.backgroundWorker.create({
42+
data: {
43+
...BackgroundWorkerId.generate(),
44+
version: deployment.version,
45+
runtimeEnvironmentId: environment.id,
46+
projectId: environment.projectId,
47+
metadata: stripBackgroundWorkerMetadataForStorage(body.metadata),
48+
contentHash: body.metadata.contentHash,
49+
cliVersion: body.metadata.cliPackageVersion,
50+
sdkVersion: body.metadata.packageVersion,
51+
supportsLazyAttempts: body.supportsLazyAttempts,
52+
engine: body.engine,
53+
runtime: body.metadata.runtime,
54+
runtimeVersion: body.metadata.runtimeVersion,
55+
},
56+
});
57+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { BackgroundWorkerMetadata } from "@trigger.dev/core/v3";
2+
import { Prisma } from "@trigger.dev/database";
3+
4+
/**
 * Reduces BackgroundWorkerMetadata to the subset that is read back after it
 * has been stored.
 *
 * The remaining fields already live in dedicated columns/tables
 * (BackgroundWorker.{contentHash,cliVersion,sdkVersion,runtime,runtimeVersion},
 * BackgroundWorkerTask, BackgroundWorkerFile, TaskQueue, Prompt). The only
 * post-write reader today is changeCurrentDeployment.server.ts, which passes
 * tasks[].schedule to syncDeclarativeSchedules. packageVersion, contentHash
 * and tasks[].filePath are retained only so the stored value still satisfies
 * BackgroundWorkerMetadata's required fields when the column is parsed again.
 *
 * @param metadata - Full worker metadata as received.
 * @returns A JSON value with `packageVersion`, `contentHash`, and one
 *   `{ id, filePath, schedule }` entry per task that has a truthy `schedule`.
 */
export function stripBackgroundWorkerMetadataForStorage(
  metadata: BackgroundWorkerMetadata
): Prisma.InputJsonValue {
  const { packageVersion, contentHash, tasks } = metadata;

  // Tasks without a schedule are dropped entirely; the rest keep three fields.
  const scheduledTasks = tasks
    .filter((task) => task.schedule)
    .map(({ id, filePath, schedule }) => ({ id, filePath, schedule }));

  return { packageVersion, contentHash, tasks: scheduledTasks };
}

0 commit comments

Comments
 (0)