diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 0203ba0c76..1b598e592b 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -37,11 +37,19 @@ import type { QueueManager, RunNumberIncrementer, TraceEventConcern, + TriggerRacepoints, + TriggerRacepointSystem, TriggerTaskRequest, TriggerTaskValidator, } from "../types"; import { ServiceValidationError } from "~/v3/services/common.server"; +class NoopTriggerRacepointSystem implements TriggerRacepointSystem { + async waitForRacepoint(options: { racepoint: TriggerRacepoints; id: string }): Promise { + return; + } +} + export class RunEngineTriggerTaskService { private readonly queueConcern: QueueManager; private readonly validator: TriggerTaskValidator; @@ -52,6 +60,7 @@ export class RunEngineTriggerTaskService { private readonly engine: RunEngine; private readonly tracer: Tracer; private readonly traceEventConcern: TraceEventConcern; + private readonly triggerRacepointSystem: TriggerRacepointSystem; private readonly metadataMaximumSize: number; constructor(opts: { @@ -65,6 +74,7 @@ export class RunEngineTriggerTaskService { traceEventConcern: TraceEventConcern; tracer: Tracer; metadataMaximumSize: number; + triggerRacepointSystem?: TriggerRacepointSystem; }) { this.prisma = opts.prisma; this.engine = opts.engine; @@ -76,6 +86,7 @@ export class RunEngineTriggerTaskService { this.tracer = opts.tracer; this.traceEventConcern = opts.traceEventConcern; this.metadataMaximumSize = opts.metadataMaximumSize; + this.triggerRacepointSystem = opts.triggerRacepointSystem ?? new NoopTriggerRacepointSystem(); } public async call({ @@ -196,19 +207,16 @@ export class RunEngineTriggerTaskService { const { idempotencyKey, idempotencyKeyExpiresAt } = idempotencyKeyConcernResult; + if (idempotencyKey) { + await this.triggerRacepointSystem.waitForRacepoint({ + racepoint: "idempotencyKey", + id: idempotencyKey, + }); + } + if (!options.skipChecks) { const queueSizeGuard = await this.queueConcern.validateQueueLimits(environment); - logger.debug("Queue size guard result", { - queueSizeGuard, - environment: { - id: environment.id, - type: environment.type, - organization: environment.organization, - project: environment.project, - }, - }); - if (!queueSizeGuard.ok) { throw new ServiceValidationError( `Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}` diff --git a/apps/webapp/app/runEngine/types.ts b/apps/webapp/app/runEngine/types.ts index b1aa8b7715..10dcbd7a3d 100644 --- a/apps/webapp/app/runEngine/types.ts +++ b/apps/webapp/app/runEngine/types.ts @@ -156,3 +156,9 @@ export interface TraceEventConcern { callback: (span: TracedEventSpan) => Promise ): Promise; } + +export type TriggerRacepoints = "idempotencyKey"; + +export interface TriggerRacepointSystem { + waitForRacepoint(options: { racepoint: TriggerRacepoints; id: string }): Promise; +} diff --git a/apps/webapp/test/engine/triggerTask.test.ts b/apps/webapp/test/engine/triggerTask.test.ts index ad16be44e7..e21e0dbb2e 100644 --- a/apps/webapp/test/engine/triggerTask.test.ts +++ b/apps/webapp/test/engine/triggerTask.test.ts @@ -16,7 +16,7 @@ vi.mock("~/services/platform.v3.server", async (importOriginal) => { import { RunEngine } from "@internal/run-engine"; import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "@internal/run-engine/tests"; -import { containerTest } from "@internal/testcontainers"; +import { assertNonNullable, containerTest } from "@internal/testcontainers"; import { trace } from "@opentelemetry/api"; import { IOPacket } from "@trigger.dev/core/v3"; import { TaskRun } from "@trigger.dev/database"; @@ -31,11 +31,15 @@ import { TagValidationParams, TracedEventSpan, TraceEventConcern, + TriggerRacepoints, + TriggerRacepointSystem, TriggerTaskRequest, TriggerTaskValidator, ValidationResult, } from "~/runEngine/types"; import { RunEngineTriggerTaskService } from "../../app/runEngine/services/triggerTask.server"; +import { promiseWithResolvers } from "@trigger.dev/core"; +import { setTimeout } from "node:timers/promises"; vi.setConfig({ testTimeout: 30_000 }); // 30 seconds timeout @@ -108,6 +112,29 @@ class MockTraceEventConcern implements TraceEventConcern { } } +type TriggerRacepoint = { promise: Promise; resolve: (value: void) => void }; + +class MockTriggerRacepointSystem implements TriggerRacepointSystem { + private racepoints: Record = {}; + + async waitForRacepoint({ id }: { racepoint: TriggerRacepoints; id: string }): Promise { + const racepoint = this.racepoints[id]; + + if (racepoint) { + return racepoint.promise; + } + + return Promise.resolve(); + } + + registerRacepoint(racepoint: TriggerRacepoints, id: string): TriggerRacepoint { + const { promise, resolve } = promiseWithResolvers(); + this.racepoints[id] = { promise, resolve }; + + return { promise, resolve }; + } +} + describe("RunEngineTriggerTaskService", () => { containerTest("should trigger a task with minimal options", async ({ prisma, redisOptions }) => { const engine = new RunEngine({ @@ -312,6 +339,228 @@ describe("RunEngineTriggerTaskService", () => { await engine.quit(); }); + containerTest( + "should handle idempotency keys when the engine throws an RunDuplicateIdempotencyKeyError", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + logLevel: "debug", + }); + + const parentTask = "parent-task"; + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const taskIdentifier = "test-task"; + + //create background worker + await setupBackgroundWorker(engine, authenticatedEnvironment, [parentTask, taskIdentifier]); + + const parentRun1 = await engine.trigger( + { + number: 1, + friendlyId: "run_p1", + environment: authenticatedEnvironment, + taskIdentifier: parentTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + queue: `task/${parentTask}`, + isTest: false, + tags: [], + workerQueue: "main", + }, + prisma + ); + + //dequeue parent and create the attempt + await setTimeout(500); + const dequeued = await engine.dequeueFromWorkerQueue({ + consumerId: "test_12345", + workerQueue: "main", + }); + await engine.startRunAttempt({ + runId: parentRun1.id, + snapshotId: dequeued[0].snapshot.id, + }); + + const parentRun2 = await engine.trigger( + { + number: 2, + friendlyId: "run_p2", + environment: authenticatedEnvironment, + taskIdentifier: parentTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12346", + spanId: "s12346", + queue: `task/${parentTask}`, + isTest: false, + tags: [], + workerQueue: "main", + }, + prisma + ); + + await setTimeout(500); + const dequeued2 = await engine.dequeueFromWorkerQueue({ + consumerId: "test_12345", + workerQueue: "main", + }); + await engine.startRunAttempt({ + runId: parentRun2.id, + snapshotId: dequeued2[0].snapshot.id, + }); + + const queuesManager = new DefaultQueueManager(prisma, engine); + + const idempotencyKeyConcern = new IdempotencyKeyConcern( + prisma, + engine, + new MockTraceEventConcern() + ); + + const triggerRacepointSystem = new MockTriggerRacepointSystem(); + + const triggerTaskService = new RunEngineTriggerTaskService({ + engine, + prisma, + runNumberIncrementer: new MockRunNumberIncrementer(), + payloadProcessor: new MockPayloadProcessor(), + queueConcern: queuesManager, + idempotencyKeyConcern, + validator: new MockTriggerTaskValidator(), + traceEventConcern: new MockTraceEventConcern(), + tracer: trace.getTracer("test", "0.0.0"), + metadataMaximumSize: 1024 * 1024 * 1, // 1MB + triggerRacepointSystem, + }); + + const idempotencyKey = "test-idempotency-key"; + + const racepoint = triggerRacepointSystem.registerRacepoint("idempotencyKey", idempotencyKey); + + const childTriggerPromise1 = triggerTaskService.call({ + taskId: taskIdentifier, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + idempotencyKey, + parentRunId: parentRun1.friendlyId, + resumeParentOnCompletion: true, + }, + }, + }); + + const childTriggerPromise2 = triggerTaskService.call({ + taskId: taskIdentifier, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + idempotencyKey, + parentRunId: parentRun2.friendlyId, + resumeParentOnCompletion: true, + }, + }, + }); + + await setTimeout(500); + + // Now we can resolve the racepoint + racepoint.resolve(); + + const result = await childTriggerPromise1; + const result2 = await childTriggerPromise2; + + expect(result).toBeDefined(); + expect(result?.run.friendlyId).toBeDefined(); + expect(result?.run.status).toBe("PENDING"); + + const run = await prisma.taskRun.findUnique({ + where: { + id: result?.run.id, + }, + }); + + expect(run).toBeDefined(); + expect(run?.friendlyId).toBe(result?.run.friendlyId); + expect(run?.engine).toBe("V2"); + expect(run?.queuedAt).toBeDefined(); + expect(run?.queue).toBe(`task/${taskIdentifier}`); + + expect(result2).toBeDefined(); + expect(result2?.run.friendlyId).toBe(result?.run.friendlyId); + + const parent1ExecutionData = await engine.getRunExecutionData({ runId: parentRun1.id }); + assertNonNullable(parent1ExecutionData); + expect(parent1ExecutionData.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS"); + + const parent2ExecutionData = await engine.getRunExecutionData({ runId: parentRun2.id }); + assertNonNullable(parent2ExecutionData); + expect(parent2ExecutionData.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS"); + + const parent1RunWaitpoint = await prisma.taskRunWaitpoint.findFirst({ + where: { + taskRunId: parentRun1.id, + }, + include: { + waitpoint: true, + }, + }); + + assertNonNullable(parent1RunWaitpoint); + expect(parent1RunWaitpoint.waitpoint.type).toBe("RUN"); + expect(parent1RunWaitpoint.waitpoint.completedByTaskRunId).toBe(result?.run.id); + + const parent2RunWaitpoint = await prisma.taskRunWaitpoint.findFirst({ + where: { + taskRunId: parentRun2.id, + }, + include: { + waitpoint: true, + }, + }); + + assertNonNullable(parent2RunWaitpoint); + expect(parent2RunWaitpoint.waitpoint.type).toBe("RUN"); + expect(parent2RunWaitpoint.waitpoint.completedByTaskRunId).toBe(result2?.run.id); + + await engine.quit(); + } + ); + containerTest( "should resolve queue names correctly when locked to version", async ({ prisma, redisOptions }) => { diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 8f1aedbebe..f9b3061e91 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -381,11 +381,15 @@ export class RunEngine { const status = delayUntil ? "DELAYED" : "PENDING"; //create run - let taskRun: TaskRun; + let taskRun: TaskRun & { associatedWaitpoint: Waitpoint | null }; + const taskRunId = RunId.fromFriendlyId(friendlyId); try { taskRun = await prisma.taskRun.create({ + include: { + associatedWaitpoint: true, + }, data: { - id: RunId.fromFriendlyId(friendlyId), + id: taskRunId, engine: "V2", status, number, @@ -459,6 +463,12 @@ export class RunEngine { runnerId, }, }, + associatedWaitpoint: { + create: this.waitpointSystem.buildRunAssociatedWaitpoint({ + projectId: environment.project.id, + environmentId: environment.id, + }), + }, }, }); } catch (error) { @@ -492,23 +502,13 @@ export class RunEngine { span.setAttribute("runId", taskRun.id); - //create associated waitpoint (this completes when the run completes) - const associatedWaitpoint = await this.waitpointSystem.createRunAssociatedWaitpoint( - prisma, - { - projectId: environment.project.id, - environmentId: environment.id, - completedByTaskRunId: taskRun.id, - } - ); - //triggerAndWait or batchTriggerAndWait - if (resumeParentOnCompletion && parentTaskRunId) { + if (resumeParentOnCompletion && parentTaskRunId && taskRun.associatedWaitpoint) { //this will block the parent run from continuing until this waitpoint is completed (and removed) await this.waitpointSystem.blockRunWithWaitpoint({ runId: parentTaskRunId, - waitpoints: associatedWaitpoint.id, - projectId: associatedWaitpoint.projectId, + waitpoints: taskRun.associatedWaitpoint.id, + projectId: taskRun.associatedWaitpoint.projectId, organizationId: environment.organization.id, batch, workerId, diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index 4e23934b73..74062ed17e 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -738,25 +738,21 @@ export class WaitpointSystem { }); // end of runlock } - public async createRunAssociatedWaitpoint( - tx: PrismaClientOrTransaction, - { + public buildRunAssociatedWaitpoint({ + projectId, + environmentId, + }: { + projectId: string; + environmentId: string; + }) { + return { + ...WaitpointId.generate(), + type: "RUN" as const, + status: "PENDING" as const, + idempotencyKey: nanoid(24), + userProvidedIdempotencyKey: false, projectId, environmentId, - completedByTaskRunId, - }: { projectId: string; environmentId: string; completedByTaskRunId: string } - ) { - return tx.waitpoint.create({ - data: { - ...WaitpointId.generate(), - type: "RUN", - status: "PENDING", - idempotencyKey: nanoid(24), - userProvidedIdempotencyKey: false, - projectId, - environmentId, - completedByTaskRunId, - }, - }); + }; } } diff --git a/internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts b/internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts index 101314e86d..b874d3f4ca 100644 --- a/internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts +++ b/internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts @@ -4,6 +4,7 @@ import { expect } from "vitest"; import { RunEngine } from "../index.js"; import { setTimeout } from "node:timers/promises"; import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js"; +import { RunDuplicateIdempotencyKeyError } from "../errors.js"; vi.setConfig({ testTimeout: 60_000 }); @@ -453,4 +454,164 @@ describe("RunEngine triggerAndWait", () => { } } ); + containerTest( + "triggerAndWait two parent runs triggering the same child run with the same idempotency key", + async ({ prisma, redisOptions }) => { + //create environment + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const parentTask = "parent-task"; + const childTask = "child-task"; + const idempotencyKey = "a-key"; + + //create background worker + await setupBackgroundWorker(engine, authenticatedEnvironment, [parentTask, childTask]); + + //trigger the run + const parentRun1 = await engine.trigger( + { + number: 1, + friendlyId: "run_p1234", + environment: authenticatedEnvironment, + taskIdentifier: parentTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + queue: `task/${parentTask}`, + isTest: false, + tags: [], + workerQueue: "main", + }, + prisma + ); + + //dequeue parent and create the attempt + await setTimeout(500); + const dequeued = await engine.dequeueFromWorkerQueue({ + consumerId: "test_12345", + workerQueue: "main", + }); + await engine.startRunAttempt({ + runId: parentRun1.id, + snapshotId: dequeued[0].snapshot.id, + }); + + const parentRun2 = await engine.trigger( + { + number: 2, + friendlyId: "run_p12345", + environment: authenticatedEnvironment, + taskIdentifier: parentTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12346", + spanId: "s12346", + queue: `task/${parentTask}`, + isTest: false, + tags: [], + workerQueue: "main", + }, + prisma + ); + + await setTimeout(500); + const dequeued2 = await engine.dequeueFromWorkerQueue({ + consumerId: "test_12345", + workerQueue: "main", + }); + await engine.startRunAttempt({ + runId: parentRun2.id, + snapshotId: dequeued2[0].snapshot.id, + }); + + await engine.trigger( + { + number: 1, + friendlyId: "run_c1234", + environment: authenticatedEnvironment, + taskIdentifier: childTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + queue: `task/${childTask}`, + isTest: false, + tags: [], + resumeParentOnCompletion: true, + parentTaskRunId: parentRun1.id, + workerQueue: "main", + idempotencyKey, + }, + prisma + ); + + // This should throw a RunDuplicateIdempotencyKeyError + await expect( + engine.trigger( + { + number: 2, + friendlyId: "run_c12345", + environment: authenticatedEnvironment, + taskIdentifier: childTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t123455", + spanId: "s123455", + queue: `task/${childTask}`, + isTest: false, + tags: [], + resumeParentOnCompletion: true, + parentTaskRunId: parentRun2.id, + workerQueue: "main", + idempotencyKey, + }, + prisma + ) + ).rejects.toThrow(RunDuplicateIdempotencyKeyError); + } finally { + await engine.quit(); + } + } + ); });