From f30f2514555f144d9aee3168f9628ecb8fd85fe8 Mon Sep 17 00:00:00 2001 From: Enea Date: Thu, 23 Apr 2026 09:46:11 -0400 Subject: [PATCH] feat(webapp): add `skipIfActive` trigger option for drop-on-conflict dedup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `skipIfActive: boolean` is a new option on `tasks.trigger()` (and batch items). When set, and at least one `tag` is supplied, the webapp checks Postgres for an in-flight (non-terminal) TaskRun with the same `taskIdentifier` + environment + all-of the supplied tags. If one exists, the trigger is a no-op: the existing run is returned with `isCached: true` + new `wasSkipped: true` flag, and no new run is created. Why: current options don't cover the "cron scanner drops duplicates" pattern. - `idempotencyKey` caches successful completions too -> scheduled retriggers blocked until TTL expires. - `concurrencyKey` queues duplicates FIFO -> backlog grows when a sync hangs; dashboard fills with redundant QUEUED rows. - Manual `runs.list()` dedup in user code is expensive (ClickHouse `task_runs_v2 FINAL + hasAny(tags)` under load) and gets reinvented poorly in every downstream project. Implementation follows the `IdempotencyKeyConcern` pattern: - `SkipIfActiveConcern.handleTriggerRequest()` does one indexed SELECT on `"TaskRun"` (GIN `runTags_idx` + composite status/env index). - Invoked in `RunEngineTriggerTaskService` AFTER idempotency (explicit idempotency match wins) but BEFORE run creation (skipped triggers never touch the queue). - Route surfaces `wasSkipped: true` in the response. Additive — older SDK clients ignore unknown field. Scope: - Engine v2 only (the V1 `TriggerTaskServiceV1` path is not touched; new code in v1 is frozen upstream). - Per-item honor in batch trigger via the same per-item concern call. - No schema migration; reuses existing indexes. 
Docs: `docs/skip-if-active.mdx` compares with `idempotencyKey` / `concurrencyKey` and documents the interaction matrix. Tests: unit tests for `SkipIfActiveConcern` covering no-flag, no-tags, no-match, single-match, string-tag normalization, and the row-disappeared-mid-query race. Refs: https://github.com/triggerdotdev/trigger.dev/issues/3428 (related context for cron-scanner load patterns that motivated this option). --- .../routes/api.v1.tasks.$taskId.trigger.ts | 1 + .../runEngine/concerns/skipIfActive.server.ts | 105 +++++++++++++ .../runEngine/services/triggerTask.server.ts | 12 ++ .../app/v3/services/triggerTask.server.ts | 6 + .../test/engine/skipIfActiveConcern.test.ts | 144 ++++++++++++++++++ docs/docs.json | 1 + docs/skip-if-active.mdx | 81 ++++++++++ packages/core/src/v3/schemas/api.ts | 20 +++ 8 files changed, 370 insertions(+) create mode 100644 apps/webapp/app/runEngine/concerns/skipIfActive.server.ts create mode 100644 apps/webapp/test/engine/skipIfActiveConcern.test.ts create mode 100644 docs/skip-if-active.mdx diff --git a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts index 0c188c17768..c673107eeba 100644 --- a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts +++ b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts @@ -135,6 +135,7 @@ const { action, loader } = createActionApiRoute( { id: result.run.friendlyId, isCached: result.isCached, + ...(result.wasSkipped ? 
{ wasSkipped: true } : {}), }, { headers: $responseHeaders, diff --git a/apps/webapp/app/runEngine/concerns/skipIfActive.server.ts b/apps/webapp/app/runEngine/concerns/skipIfActive.server.ts new file mode 100644 index 00000000000..a52ef464ce2 --- /dev/null +++ b/apps/webapp/app/runEngine/concerns/skipIfActive.server.ts @@ -0,0 +1,105 @@ +import { Prisma, type PrismaClientOrTransaction, type TaskRun } from "@trigger.dev/database"; +import { logger } from "~/services/logger.server"; +import type { TriggerTaskRequest } from "../types"; + +export type SkipIfActiveConcernResult = + | { wasSkipped: true; run: TaskRun } + | { wasSkipped: false }; + +/** + * DB-level `TaskRunStatus` values that represent a run that has not reached + * a terminal state — i.e. still counts as "active" for dedup purposes. + * Mirrors the non-final statuses in `TaskRunStatus` (see + * `internal-packages/database/prisma/schema.prisma`). + */ +const ACTIVE_TASK_RUN_STATUSES = [ + "DELAYED", + "PENDING", + "PENDING_VERSION", + "WAITING_FOR_DEPLOY", + "DEQUEUED", + "EXECUTING", + "WAITING_TO_RESUME", + "RETRYING_AFTER_FAILURE", + "PAUSED", +] as const; + +/** + * Implements the `skipIfActive` trigger option. + * + * When `body.options.skipIfActive === true` and at least one tag is set, the + * concern looks for any in-flight TaskRun with: + * + * runtimeEnvironmentId = <request.environment.id> + * taskIdentifier = <request.taskId> + * status IN (non-terminal) + * runTags @> <all supplied tags> + * + * If found, the existing run is returned and the trigger is short-circuited. + * If not found, the caller proceeds to create a new run as usual. + * + * Intended use case: cron-style scanners that poll at a fixed cadence but + * should drop duplicate triggers while a prior invocation is still running — + * without generating queue backlog (`concurrencyKey`) or caching successful + * completions (`idempotencyKey`). 
+ */ +export class SkipIfActiveConcern { + constructor(private readonly prisma: PrismaClientOrTransaction) {} + + async handleTriggerRequest(request: TriggerTaskRequest): Promise<SkipIfActiveConcernResult> { + if (request.body.options?.skipIfActive !== true) { + return { wasSkipped: false }; + } + + const rawTags = request.body.options?.tags; + const tags = Array.isArray(rawTags) ? rawTags : rawTags ? [rawTags] : []; + + if (tags.length === 0) { + // `skipIfActive` requires a tag scope — without tags, every run of this + // task would dedup against every other, which is rarely the intent. + // Treat as no-op rather than silently matching. + logger.debug("[SkipIfActiveConcern] skipIfActive=true with no tags — skipping the check", { + taskIdentifier: request.taskId, + }); + return { wasSkipped: false }; + } + + // `runTags @> ARRAY[...]::text[]` hits the GIN ArrayOps index on + // `TaskRun.runTags` and AND-matches every supplied tag. Bounded by + // runtimeEnvironmentId + taskIdentifier + status via existing indexes. + const statusArray = Prisma.sql`ARRAY[${Prisma.join( + ACTIVE_TASK_RUN_STATUSES.map((s) => Prisma.sql`${s}`) + )}]::"TaskRunStatus"[]`; + + const existing = await this.prisma.$queryRaw<Array<{ id: string }>>` + SELECT id + FROM "TaskRun" + WHERE "runtimeEnvironmentId" = ${request.environment.id} + AND "taskIdentifier" = ${request.taskId} + AND status = ANY(${statusArray}) + AND "runTags" @> ${tags}::text[] + LIMIT 1 + `; + + if (existing.length === 0) { + return { wasSkipped: false }; + } + + const run = await this.prisma.taskRun.findUnique({ where: { id: existing[0].id } }); + if (!run) { + // Row disappeared between the existence probe and the fetch (e.g. + // completed + deleted mid-query). Treat as "no active run" so the + // caller creates a fresh one instead of failing. 
+ return { wasSkipped: false }; + } + + logger.debug("[SkipIfActiveConcern] active run matched, skipping new trigger", { + runId: run.id, + taskIdentifier: request.taskId, + environmentId: request.environment.id, + tags, + }); + + return { wasSkipped: true, run }; + } +} diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index f19404b3ec5..54f635e72d8 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -31,6 +31,7 @@ import type { } from "../../v3/services/triggerTask.server"; import { clampMaxDuration } from "../../v3/utils/maxDuration"; import { IdempotencyKeyConcern } from "../concerns/idempotencyKeys.server"; +import { SkipIfActiveConcern } from "../concerns/skipIfActive.server"; import type { PayloadProcessor, QueueManager, @@ -54,6 +55,7 @@ export class RunEngineTriggerTaskService { private readonly validator: TriggerTaskValidator; private readonly payloadProcessor: PayloadProcessor; private readonly idempotencyKeyConcern: IdempotencyKeyConcern; + private readonly skipIfActiveConcern: SkipIfActiveConcern; private readonly runNumberIncrementer: RunNumberIncrementer; private readonly prisma: PrismaClientOrTransaction; private readonly engine: RunEngine; @@ -69,6 +71,7 @@ export class RunEngineTriggerTaskService { validator: TriggerTaskValidator; payloadProcessor: PayloadProcessor; idempotencyKeyConcern: IdempotencyKeyConcern; + skipIfActiveConcern?: SkipIfActiveConcern; runNumberIncrementer: RunNumberIncrementer; traceEventConcern: TraceEventConcern; tracer: Tracer; @@ -81,6 +84,7 @@ export class RunEngineTriggerTaskService { this.validator = opts.validator; this.payloadProcessor = opts.payloadProcessor; this.idempotencyKeyConcern = opts.idempotencyKeyConcern; + this.skipIfActiveConcern = opts.skipIfActiveConcern ?? 
new SkipIfActiveConcern(opts.prisma); this.runNumberIncrementer = opts.runNumberIncrementer; this.tracer = opts.tracer; this.traceEventConcern = opts.traceEventConcern; @@ -207,6 +211,14 @@ export class RunEngineTriggerTaskService { const { idempotencyKey, idempotencyKeyExpiresAt } = idempotencyKeyConcernResult; + // `skipIfActive` — drop-on-conflict. Runs *after* idempotency so a + // deliberate idempotency cache-hit wins, but *before* run creation so + // we never touch the queue for a skipped trigger. + const skipIfActiveResult = await this.skipIfActiveConcern.handleTriggerRequest(triggerRequest); + if (skipIfActiveResult.wasSkipped) { + return { run: skipIfActiveResult.run, isCached: true, wasSkipped: true }; + } + if (idempotencyKey) { await this.triggerRacepointSystem.waitForRacepoint({ racepoint: "idempotencyKey", diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index f68b23832b8..8b7b440f2e7 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -45,6 +45,12 @@ export class OutOfEntitlementError extends Error { export type TriggerTaskServiceResult = { run: TaskRun; isCached: boolean; + /** + * True when the run returned was matched by the `skipIfActive` option and + * no new run was created. Always false/undefined for idempotency-cached + * runs — `isCached` distinguishes those. 
+ */ + wasSkipped?: boolean; }; export const MAX_ATTEMPTS = 2; diff --git a/apps/webapp/test/engine/skipIfActiveConcern.test.ts b/apps/webapp/test/engine/skipIfActiveConcern.test.ts new file mode 100644 index 00000000000..58eb21292a7 --- /dev/null +++ b/apps/webapp/test/engine/skipIfActiveConcern.test.ts @@ -0,0 +1,144 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/services/logger.server", () => ({ + logger: { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }, +})); + +import { SkipIfActiveConcern } from "../../app/runEngine/concerns/skipIfActive.server"; +import type { TriggerTaskRequest } from "../../app/runEngine/types"; + +type MockPrisma = { + $queryRaw: ReturnType<typeof vi.fn>; + taskRun: { findUnique: ReturnType<typeof vi.fn> }; +}; + +function buildRequest(overrides: { + skipIfActive?: boolean; + tags?: string | string[]; + taskId?: string; + environmentId?: string; +}): TriggerTaskRequest { + return { + taskId: overrides.taskId ?? "ezderm-notes-fetch", + friendlyId: "run_test", + environment: { + id: overrides.environmentId ?? "env_123", + organizationId: "org_1", + projectId: "proj_1", + } as TriggerTaskRequest["environment"], + body: { + payload: {}, + options: { + skipIfActive: overrides.skipIfActive, + tags: overrides.tags, + }, + }, + options: {}, + } as TriggerTaskRequest; +} + +function mockPrisma(initial?: { + existing?: Array<{ id: string }>; + run?: { id: string } | null; +}): MockPrisma { + return { + $queryRaw: vi.fn().mockResolvedValue(initial?.existing ?? []), + taskRun: { findUnique: vi.fn().mockResolvedValue(initial?.run ?? 
null) }, + }; +} + +describe("SkipIfActiveConcern", () => { + it("returns wasSkipped=false when the flag is not set", async () => { + const prisma = mockPrisma(); + const concern = new SkipIfActiveConcern(prisma as never); + + const result = await concern.handleTriggerRequest( + buildRequest({ skipIfActive: undefined, tags: ["connector:abc"] }) + ); + + expect(result).toEqual({ wasSkipped: false }); + expect(prisma.$queryRaw).not.toHaveBeenCalled(); + }); + + it("returns wasSkipped=false when skipIfActive=false", async () => { + const prisma = mockPrisma(); + const concern = new SkipIfActiveConcern(prisma as never); + + const result = await concern.handleTriggerRequest( + buildRequest({ skipIfActive: false, tags: ["connector:abc"] }) + ); + + expect(result).toEqual({ wasSkipped: false }); + expect(prisma.$queryRaw).not.toHaveBeenCalled(); + }); + + it("returns wasSkipped=false when no tags are supplied (tag scope required)", async () => { + const prisma = mockPrisma(); + const concern = new SkipIfActiveConcern(prisma as never); + + const result = await concern.handleTriggerRequest( + buildRequest({ skipIfActive: true, tags: undefined }) + ); + + expect(result).toEqual({ wasSkipped: false }); + expect(prisma.$queryRaw).not.toHaveBeenCalled(); + }); + + it("returns wasSkipped=false when no active run matches", async () => { + const prisma = mockPrisma({ existing: [] }); + const concern = new SkipIfActiveConcern(prisma as never); + + const result = await concern.handleTriggerRequest( + buildRequest({ skipIfActive: true, tags: ["connector:abc"] }) + ); + + expect(result).toEqual({ wasSkipped: false }); + expect(prisma.$queryRaw).toHaveBeenCalledTimes(1); + expect(prisma.taskRun.findUnique).not.toHaveBeenCalled(); + }); + + it("returns wasSkipped=true with the existing run when an active run matches all tags", async () => { + const existingRun = { id: "run_existing", status: "EXECUTING", runTags: ["connector:abc"] }; + const prisma = mockPrisma({ existing: [{ id: 
existingRun.id }], run: existingRun }); + const concern = new SkipIfActiveConcern(prisma as never); + + const result = await concern.handleTriggerRequest( + buildRequest({ skipIfActive: true, tags: ["connector:abc"] }) + ); + + expect(result).toMatchObject({ wasSkipped: true, run: existingRun }); + expect(prisma.$queryRaw).toHaveBeenCalledTimes(1); + expect(prisma.taskRun.findUnique).toHaveBeenCalledWith({ where: { id: "run_existing" } }); + }); + + it("normalizes a single tag string into an array", async () => { + const prisma = mockPrisma({ + existing: [{ id: "run_x" }], + run: { id: "run_x", status: "PENDING", runTags: ["connector:abc"] }, + }); + const concern = new SkipIfActiveConcern(prisma as never); + + const result = await concern.handleTriggerRequest( + // Zod allows both string and string[] for tags; buildRequest accepts either form. + buildRequest({ skipIfActive: true, tags: "connector:abc" }) + ); + + expect(result.wasSkipped).toBe(true); + }); + + it("recovers gracefully when the row disappears between the probe and the fetch", async () => { + const prisma = mockPrisma({ existing: [{ id: "run_gone" }], run: null }); + const concern = new SkipIfActiveConcern(prisma as never); + + const result = await concern.handleTriggerRequest( + buildRequest({ skipIfActive: true, tags: ["connector:abc"] }) + ); + + expect(result).toEqual({ wasSkipped: false }); + }); +}); diff --git a/docs/docs.json b/docs/docs.json index 13b8b7706de..4e47260bb1f 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -70,6 +70,7 @@ "versioning", "machines", "idempotency", + "skip-if-active", "runs/max-duration", "tags", "runs/metadata", diff --git a/docs/skip-if-active.mdx b/docs/skip-if-active.mdx new file mode 100644 index 00000000000..e771dce9a92 --- /dev/null +++ b/docs/skip-if-active.mdx @@ -0,0 +1,81 @@ +--- +title: "Skip-if-active" +description: "Drop a trigger request when a matching run is already in flight — built for cron-style scanners that should dedup without queueing." 
+--- + +## When to use `skipIfActive` + +`skipIfActive` is a trigger option that **drops** the request if an in-flight run with the same `taskIdentifier` and tag set already exists in the same environment. No new run is created, no queue entry is added, and the caller gets back the existing run with `wasSkipped: true`. + +Typical use case: a **cron-style orchestrator** that polls every minute and should start a sync if one isn't already running — but should **not** build up a backlog when the previous sync is still working. + +### `idempotencyKey` vs `concurrencyKey` vs `skipIfActive` + +| Option | Behavior when a matching run exists | Use when | +|------------------|-------------------------------------------------|----------------------------------------------------------| +| `idempotencyKey` | Returns the existing run (including completed!) | Webhook retries, "run this exactly once ever" | +| `concurrencyKey` | Queues the new run FIFO behind the existing one | Every request is real work that must eventually execute | +| `skipIfActive` | Drops the new trigger, returns the in-flight run | Cron polling; duplicate triggers are redundant work | + +`skipIfActive` differs from `idempotencyKey` because it **does not** cache completed runs — once the run reaches a terminal state the dedup window ends automatically. It differs from `concurrencyKey` because it does not queue — excess triggers are discarded, not delayed. + +## Example + +```ts +import { tasks } from "@trigger.dev/sdk/v3"; + +// Orchestrator fires every minute for each connector. +for (const connector of connectors) { + await tasks.trigger( + "ezderm-notes-fetch", + { connectorId: connector.id }, + { + tags: [`connector:${connector.id}`], + skipIfActive: true, + } + ); +} +``` + +If `ezderm-notes-fetch` is already running (PENDING / DEQUEUED / EXECUTING / WAITING_TO_RESUME / …) with `connector:<id>` in its `runTags`, the second call is a no-op. 
+ +## Required: at least one tag + +`skipIfActive` is a no-op without a `tags` scope. Without tags every run of the task would dedup against every other, which is rarely what you want. Supply one or more tags: + +```ts +await tasks.trigger("sync-patient", payload, { + tags: [`patient:${patientId}`], + skipIfActive: true, +}); +``` + +All supplied tags must be present on the existing run (AND semantics, powered by PostgreSQL's `array @> array` operator + the `runTags` GIN index). + +## Response shape + +```ts +// Fresh trigger — no conflict +{ id: "run_abc", isCached: false } + +// Dropped because an in-flight match was found +{ id: "run_existing", isCached: true, wasSkipped: true } +``` + +`isCached: true` is reused for parity with `idempotencyKey`-matched responses. The additional `wasSkipped: true` flag distinguishes drop-on-active from idempotency reuse when the caller needs to tell them apart. + +## Interaction with other options + +| Combined with | Behavior | +|--------------------------|------------------------------------------------------------------------------------------| +| `idempotencyKey` | Idempotency check runs first. A match wins and skipIfActive never fires. | +| `concurrencyKey` | skipIfActive runs before queue admission. A match drops — queue is not touched. | +| `delay` | A skipped trigger never creates a delayed run. The existing in-flight run is returned. | +| `ttl` | Irrelevant for skipped triggers — no new run is created. | +| `batchTrigger` | Applied per item. Some items may be skipped while others enqueue normally. | + +## When *not* to use it + +- Webhooks where the duplicate represents real, distinct work that must be processed. Use `idempotencyKey` to dedup by event id, or `concurrencyKey` to serialize. +- Workflows where losing a trigger is a data-loss bug. skipIfActive is drop-on-conflict — silently discarded triggers are the intended outcome. 
+- Anywhere you want visibility of queued work on the dashboard; skipped triggers leave no trace beyond the returned `wasSkipped: true` response. diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index 2fa9ba224a5..d7069defa70 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -191,6 +191,19 @@ export const TriggerTaskRequestBody = z.object({ delay: z.string().or(z.coerce.date()).optional(), idempotencyKey: z.string().optional(), idempotencyKeyTTL: z.string().optional(), + /** + * When true, the request is a no-op if an in-flight (non-terminal) run + * for the same `taskIdentifier` already exists in the same runtime + * environment AND contains all of the supplied `tags`. The existing run + * is returned with `wasSkipped: true` and no new run is created. + * + * Designed for cron-style scanners that poll repeatedly but should drop + * duplicate work when a previous invocation is still running — unlike + * `idempotencyKey` (which also caches successful completions) and + * unlike `concurrencyKey` (which queues duplicates). Requires at least + * one tag to scope the check. + */ + skipIfActive: z.boolean().optional(), machine: MachinePresetName.optional(), maxAttempts: z.number().int().optional(), maxDuration: z.number().optional(), @@ -212,6 +225,11 @@ export type TriggerTaskRequestBody = z.infer<typeof TriggerTaskRequestBody>; export const TriggerTaskResponse = z.object({ id: z.string(), isCached: z.boolean().optional(), + /** + * When true, the request matched an existing in-flight run via + * `skipIfActive` and no new run was created. 
+ */ + wasSkipped: z.boolean().optional(), }); export type TriggerTaskResponse = z.infer<typeof TriggerTaskResponse>; @@ -233,6 +251,8 @@ export const BatchTriggerTaskItem = z.object({ delay: z.string().or(z.coerce.date()).optional(), idempotencyKey: z.string().optional(), idempotencyKeyTTL: z.string().optional(), + /** See `TriggerTaskRequestBody.options.skipIfActive`. 
*/ + skipIfActive: z.boolean().optional(), lockToVersion: z.string().optional(), machine: MachinePresetName.optional(), maxAttempts: z.number().int().optional(),