Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,19 @@ import type {
QueueManager,
RunNumberIncrementer,
TraceEventConcern,
TriggerRacepoints,
TriggerRacepointSystem,
TriggerTaskRequest,
TriggerTaskValidator,
} from "../types";
import { ServiceValidationError } from "~/v3/services/common.server";

class NoopTriggerRacepointSystem implements TriggerRacepointSystem {
async waitForRacepoint(options: { racepoint: TriggerRacepoints; id: string }): Promise<void> {
return;
}
}

export class RunEngineTriggerTaskService {
private readonly queueConcern: QueueManager;
private readonly validator: TriggerTaskValidator;
Expand All @@ -52,6 +60,7 @@ export class RunEngineTriggerTaskService {
private readonly engine: RunEngine;
private readonly tracer: Tracer;
private readonly traceEventConcern: TraceEventConcern;
private readonly triggerRacepointSystem: TriggerRacepointSystem;
private readonly metadataMaximumSize: number;

constructor(opts: {
Expand All @@ -65,6 +74,7 @@ export class RunEngineTriggerTaskService {
traceEventConcern: TraceEventConcern;
tracer: Tracer;
metadataMaximumSize: number;
triggerRacepointSystem?: TriggerRacepointSystem;
}) {
this.prisma = opts.prisma;
this.engine = opts.engine;
Expand All @@ -76,6 +86,7 @@ export class RunEngineTriggerTaskService {
this.tracer = opts.tracer;
this.traceEventConcern = opts.traceEventConcern;
this.metadataMaximumSize = opts.metadataMaximumSize;
this.triggerRacepointSystem = opts.triggerRacepointSystem ?? new NoopTriggerRacepointSystem();
}

public async call({
Expand Down Expand Up @@ -196,19 +207,16 @@ export class RunEngineTriggerTaskService {

const { idempotencyKey, idempotencyKeyExpiresAt } = idempotencyKeyConcernResult;

if (idempotencyKey) {
await this.triggerRacepointSystem.waitForRacepoint({
racepoint: "idempotencyKey",
id: idempotencyKey,
});
}

if (!options.skipChecks) {
const queueSizeGuard = await this.queueConcern.validateQueueLimits(environment);

logger.debug("Queue size guard result", {
queueSizeGuard,
environment: {
id: environment.id,
type: environment.type,
organization: environment.organization,
project: environment.project,
},
});

if (!queueSizeGuard.ok) {
throw new ServiceValidationError(
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`
Expand Down
6 changes: 6 additions & 0 deletions apps/webapp/app/runEngine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,9 @@ export interface TraceEventConcern {
callback: (span: TracedEventSpan) => Promise<T>
): Promise<T>;
}

export type TriggerRacepoints = "idempotencyKey";

export interface TriggerRacepointSystem {
waitForRacepoint(options: { racepoint: TriggerRacepoints; id: string }): Promise<void>;
}
251 changes: 250 additions & 1 deletion apps/webapp/test/engine/triggerTask.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ vi.mock("~/services/platform.v3.server", async (importOriginal) => {

import { RunEngine } from "@internal/run-engine";
import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "@internal/run-engine/tests";
import { containerTest } from "@internal/testcontainers";
import { assertNonNullable, containerTest } from "@internal/testcontainers";
import { trace } from "@opentelemetry/api";
import { IOPacket } from "@trigger.dev/core/v3";
import { TaskRun } from "@trigger.dev/database";
Expand All @@ -31,11 +31,15 @@ import {
TagValidationParams,
TracedEventSpan,
TraceEventConcern,
TriggerRacepoints,
TriggerRacepointSystem,
TriggerTaskRequest,
TriggerTaskValidator,
ValidationResult,
} from "~/runEngine/types";
import { RunEngineTriggerTaskService } from "../../app/runEngine/services/triggerTask.server";
import { promiseWithResolvers } from "@trigger.dev/core";
import { setTimeout } from "node:timers/promises";

vi.setConfig({ testTimeout: 30_000 }); // 30 seconds timeout

Expand Down Expand Up @@ -108,6 +112,29 @@ class MockTraceEventConcern implements TraceEventConcern {
}
}

type TriggerRacepoint = { promise: Promise<void>; resolve: (value: void) => void };

class MockTriggerRacepointSystem implements TriggerRacepointSystem {
private racepoints: Record<string, TriggerRacepoint | undefined> = {};

async waitForRacepoint({ id }: { racepoint: TriggerRacepoints; id: string }): Promise<void> {
const racepoint = this.racepoints[id];

if (racepoint) {
return racepoint.promise;
}

return Promise.resolve();
}

registerRacepoint(racepoint: TriggerRacepoints, id: string): TriggerRacepoint {
const { promise, resolve } = promiseWithResolvers<void>();
this.racepoints[id] = { promise, resolve };

return { promise, resolve };
}
}

describe("RunEngineTriggerTaskService", () => {
containerTest("should trigger a task with minimal options", async ({ prisma, redisOptions }) => {
const engine = new RunEngine({
Expand Down Expand Up @@ -312,6 +339,228 @@ describe("RunEngineTriggerTaskService", () => {
await engine.quit();
});

containerTest(
"should handle idempotency keys when the engine throws an RunDuplicateIdempotencyKeyError",
async ({ prisma, redisOptions }) => {
const engine = new RunEngine({
prisma,
worker: {
redis: redisOptions,
workers: 1,
tasksPerWorker: 10,
pollIntervalMs: 100,
},
queue: {
redis: redisOptions,
},
runLock: {
redis: redisOptions,
},
machines: {
defaultMachine: "small-1x",
machines: {
"small-1x": {
name: "small-1x" as const,
cpu: 0.5,
memory: 0.5,
centsPerMs: 0.0001,
},
},
baseCostInCents: 0.0005,
},
tracer: trace.getTracer("test", "0.0.0"),
logLevel: "debug",
});

const parentTask = "parent-task";

const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");

const taskIdentifier = "test-task";

//create background worker
await setupBackgroundWorker(engine, authenticatedEnvironment, [parentTask, taskIdentifier]);

const parentRun1 = await engine.trigger(
{
number: 1,
friendlyId: "run_p1",
environment: authenticatedEnvironment,
taskIdentifier: parentTask,
payload: "{}",
payloadType: "application/json",
context: {},
traceContext: {},
traceId: "t12345",
spanId: "s12345",
queue: `task/${parentTask}`,
isTest: false,
tags: [],
workerQueue: "main",
},
prisma
);

//dequeue parent and create the attempt
await setTimeout(500);
const dequeued = await engine.dequeueFromWorkerQueue({
consumerId: "test_12345",
workerQueue: "main",
});
await engine.startRunAttempt({
runId: parentRun1.id,
snapshotId: dequeued[0].snapshot.id,
});

const parentRun2 = await engine.trigger(
{
number: 2,
friendlyId: "run_p2",
environment: authenticatedEnvironment,
taskIdentifier: parentTask,
payload: "{}",
payloadType: "application/json",
context: {},
traceContext: {},
traceId: "t12346",
spanId: "s12346",
queue: `task/${parentTask}`,
isTest: false,
tags: [],
workerQueue: "main",
},
prisma
);

await setTimeout(500);
const dequeued2 = await engine.dequeueFromWorkerQueue({
consumerId: "test_12345",
workerQueue: "main",
});
await engine.startRunAttempt({
runId: parentRun2.id,
snapshotId: dequeued2[0].snapshot.id,
});

const queuesManager = new DefaultQueueManager(prisma, engine);

const idempotencyKeyConcern = new IdempotencyKeyConcern(
prisma,
engine,
new MockTraceEventConcern()
);

const triggerRacepointSystem = new MockTriggerRacepointSystem();

const triggerTaskService = new RunEngineTriggerTaskService({
engine,
prisma,
runNumberIncrementer: new MockRunNumberIncrementer(),
payloadProcessor: new MockPayloadProcessor(),
queueConcern: queuesManager,
idempotencyKeyConcern,
validator: new MockTriggerTaskValidator(),
traceEventConcern: new MockTraceEventConcern(),
tracer: trace.getTracer("test", "0.0.0"),
metadataMaximumSize: 1024 * 1024 * 1, // 1MB
triggerRacepointSystem,
});

const idempotencyKey = "test-idempotency-key";

const racepoint = triggerRacepointSystem.registerRacepoint("idempotencyKey", idempotencyKey);

const childTriggerPromise1 = triggerTaskService.call({
taskId: taskIdentifier,
environment: authenticatedEnvironment,
body: {
payload: { test: "test" },
options: {
idempotencyKey,
parentRunId: parentRun1.friendlyId,
resumeParentOnCompletion: true,
},
},
});

const childTriggerPromise2 = triggerTaskService.call({
taskId: taskIdentifier,
environment: authenticatedEnvironment,
body: {
payload: { test: "test" },
options: {
idempotencyKey,
parentRunId: parentRun2.friendlyId,
resumeParentOnCompletion: true,
},
},
});

await setTimeout(500);

// Now we can resolve the racepoint
racepoint.resolve();

const result = await childTriggerPromise1;
const result2 = await childTriggerPromise2;

expect(result).toBeDefined();
expect(result?.run.friendlyId).toBeDefined();
expect(result?.run.status).toBe("PENDING");

const run = await prisma.taskRun.findUnique({
where: {
id: result?.run.id,
},
});

expect(run).toBeDefined();
expect(run?.friendlyId).toBe(result?.run.friendlyId);
expect(run?.engine).toBe("V2");
expect(run?.queuedAt).toBeDefined();
expect(run?.queue).toBe(`task/${taskIdentifier}`);

expect(result2).toBeDefined();
expect(result2?.run.friendlyId).toBe(result?.run.friendlyId);

const parent1ExecutionData = await engine.getRunExecutionData({ runId: parentRun1.id });
assertNonNullable(parent1ExecutionData);
expect(parent1ExecutionData.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS");

const parent2ExecutionData = await engine.getRunExecutionData({ runId: parentRun2.id });
assertNonNullable(parent2ExecutionData);
expect(parent2ExecutionData.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS");

const parent1RunWaitpoint = await prisma.taskRunWaitpoint.findFirst({
where: {
taskRunId: parentRun1.id,
},
include: {
waitpoint: true,
},
});

assertNonNullable(parent1RunWaitpoint);
expect(parent1RunWaitpoint.waitpoint.type).toBe("RUN");
expect(parent1RunWaitpoint.waitpoint.completedByTaskRunId).toBe(result?.run.id);

const parent2RunWaitpoint = await prisma.taskRunWaitpoint.findFirst({
where: {
taskRunId: parentRun2.id,
},
include: {
waitpoint: true,
},
});

assertNonNullable(parent2RunWaitpoint);
expect(parent2RunWaitpoint.waitpoint.type).toBe("RUN");
expect(parent2RunWaitpoint.waitpoint.completedByTaskRunId).toBe(result2?.run.id);

await engine.quit();
}
);

containerTest(
"should resolve queue names correctly when locked to version",
async ({ prisma, redisOptions }) => {
Expand Down
Loading
Loading