-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
feat(webapp): add skipIfActive trigger option for drop-on-conflict dedup
#3433
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
105 changes: 105 additions & 0 deletions
105
apps/webapp/app/runEngine/concerns/skipIfActive.server.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,105 @@ | ||
| import { Prisma, type PrismaClientOrTransaction, type TaskRun } from "@trigger.dev/database"; | ||
| import { logger } from "~/services/logger.server"; | ||
| import type { TriggerTaskRequest } from "../types"; | ||
|
|
||
| export type SkipIfActiveConcernResult = | ||
| | { wasSkipped: true; run: TaskRun } | ||
| | { wasSkipped: false }; | ||
|
|
||
| /** | ||
| * DB-level `TaskRunStatus` values that represent a run that has not reached | ||
| * a terminal state — i.e. still counts as "active" for dedup purposes. | ||
| * Mirrors the non-final statuses in `TaskRunStatus` (see | ||
| * `internal-packages/database/prisma/schema.prisma`). | ||
| */ | ||
| const ACTIVE_TASK_RUN_STATUSES = [ | ||
| "DELAYED", | ||
| "PENDING", | ||
| "PENDING_VERSION", | ||
| "WAITING_FOR_DEPLOY", | ||
| "DEQUEUED", | ||
| "EXECUTING", | ||
| "WAITING_TO_RESUME", | ||
| "RETRYING_AFTER_FAILURE", | ||
| "PAUSED", | ||
| ] as const; | ||
|
|
||
| /** | ||
| * Implements the `skipIfActive` trigger option. | ||
| * | ||
| * When `body.options.skipIfActive === true` and at least one tag is set, the | ||
| * concern looks for any in-flight TaskRun with: | ||
| * | ||
| * runtimeEnvironmentId = <env> | ||
| * taskIdentifier = <task> | ||
| * status IN (non-terminal) | ||
| * runTags @> <supplied tags> | ||
| * | ||
| * If found, the existing run is returned and the trigger is short-circuited. | ||
| * If not found, the caller proceeds to create a new run as usual. | ||
| * | ||
| * Intended use case: cron-style scanners that poll at a fixed cadence but | ||
| * should drop duplicate triggers while a prior invocation is still running — | ||
| * without generating queue backlog (`concurrencyKey`) or caching successful | ||
| * completions (`idempotencyKey`). | ||
| */ | ||
| export class SkipIfActiveConcern { | ||
| constructor(private readonly prisma: PrismaClientOrTransaction) {} | ||
|
|
||
| async handleTriggerRequest(request: TriggerTaskRequest): Promise<SkipIfActiveConcernResult> { | ||
| if (request.body.options?.skipIfActive !== true) { | ||
| return { wasSkipped: false }; | ||
| } | ||
|
|
||
| const rawTags = request.body.options?.tags; | ||
| const tags = Array.isArray(rawTags) ? rawTags : rawTags ? [rawTags] : []; | ||
|
|
||
| if (tags.length === 0) { | ||
| // `skipIfActive` requires a tag scope — without tags, every run of this | ||
| // task would dedup against every other, which is rarely the intent. | ||
| // Treat as no-op rather than silently matching. | ||
| logger.debug("[SkipIfActiveConcern] skipIfActive=true with no tags — skipping the check", { | ||
| taskIdentifier: request.taskId, | ||
| }); | ||
| return { wasSkipped: false }; | ||
| } | ||
|
|
||
| // `runTags @> ARRAY[...]::text[]` hits the GIN ArrayOps index on | ||
| // `TaskRun.runTags` and AND-matches every supplied tag. Bounded by | ||
| // runtimeEnvironmentId + taskIdentifier + status via existing indexes. | ||
| const statusArray = Prisma.sql`ARRAY[${Prisma.join( | ||
| ACTIVE_TASK_RUN_STATUSES.map((s) => Prisma.sql`${s}`) | ||
| )}]::"TaskRunStatus"[]`; | ||
|
|
||
| const existing = await this.prisma.$queryRaw<Array<{ id: string }>>` | ||
| SELECT id | ||
| FROM "TaskRun" | ||
| WHERE "runtimeEnvironmentId" = ${request.environment.id} | ||
| AND "taskIdentifier" = ${request.taskId} | ||
| AND status = ANY(${statusArray}) | ||
| AND "runTags" @> ${tags}::text[] | ||
| LIMIT 1 | ||
| `; | ||
|
|
||
| if (existing.length === 0) { | ||
| return { wasSkipped: false }; | ||
| } | ||
|
|
||
| const run = await this.prisma.taskRun.findUnique({ where: { id: existing[0].id } }); | ||
| if (!run) { | ||
| // Row disappeared between the existence probe and the fetch (e.g. | ||
| // completed + deleted mid-query). Treat as "no active run" so the | ||
| // caller creates a fresh one instead of failing. | ||
| return { wasSkipped: false }; | ||
| } | ||
|
|
||
| logger.debug("[SkipIfActiveConcern] active run matched, skipping new trigger", { | ||
| runId: run.id, | ||
| taskIdentifier: request.taskId, | ||
| environmentId: request.environment.id, | ||
| tags, | ||
| }); | ||
|
|
||
| return { wasSkipped: true, run }; | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,144 @@ | ||
| import { describe, expect, it, vi } from "vitest"; | ||
|
|
||
| vi.mock("~/services/logger.server", () => ({ | ||
| logger: { | ||
| debug: vi.fn(), | ||
| info: vi.fn(), | ||
| warn: vi.fn(), | ||
| error: vi.fn(), | ||
| }, | ||
| })); | ||
|
|
||
| import { SkipIfActiveConcern } from "../../app/runEngine/concerns/skipIfActive.server"; | ||
| import type { TriggerTaskRequest } from "../../app/runEngine/types"; | ||
|
|
||
| type MockPrisma = { | ||
| $queryRaw: ReturnType<typeof vi.fn>; | ||
| taskRun: { findUnique: ReturnType<typeof vi.fn> }; | ||
| }; | ||
|
|
||
| function buildRequest(overrides: { | ||
| skipIfActive?: boolean; | ||
| tags?: string | string[]; | ||
| taskId?: string; | ||
| environmentId?: string; | ||
| }): TriggerTaskRequest { | ||
| return { | ||
| taskId: overrides.taskId ?? "ezderm-notes-fetch", | ||
| friendlyId: "run_test", | ||
| environment: { | ||
| id: overrides.environmentId ?? "env_123", | ||
| organizationId: "org_1", | ||
| projectId: "proj_1", | ||
| } as TriggerTaskRequest["environment"], | ||
| body: { | ||
| payload: {}, | ||
| options: { | ||
| skipIfActive: overrides.skipIfActive, | ||
| tags: overrides.tags, | ||
| }, | ||
| }, | ||
| options: {}, | ||
| } as TriggerTaskRequest; | ||
| } | ||
|
|
||
| function mockPrisma(initial?: { | ||
| existing?: Array<{ id: string }>; | ||
| run?: { id: string } | null; | ||
| }): MockPrisma { | ||
| return { | ||
| $queryRaw: vi.fn().mockResolvedValue(initial?.existing ?? []), | ||
| taskRun: { findUnique: vi.fn().mockResolvedValue(initial?.run ?? null) }, | ||
| }; | ||
| } | ||
|
|
||
| describe("SkipIfActiveConcern", () => { | ||
| it("returns wasSkipped=false when the flag is not set", async () => { | ||
| const prisma = mockPrisma(); | ||
| const concern = new SkipIfActiveConcern(prisma as never); | ||
|
|
||
| const result = await concern.handleTriggerRequest( | ||
| buildRequest({ skipIfActive: undefined, tags: ["connector:abc"] }) | ||
| ); | ||
|
|
||
| expect(result).toEqual({ wasSkipped: false }); | ||
| expect(prisma.$queryRaw).not.toHaveBeenCalled(); | ||
| }); | ||
|
|
||
| it("returns wasSkipped=false when skipIfActive=false", async () => { | ||
| const prisma = mockPrisma(); | ||
| const concern = new SkipIfActiveConcern(prisma as never); | ||
|
|
||
| const result = await concern.handleTriggerRequest( | ||
| buildRequest({ skipIfActive: false, tags: ["connector:abc"] }) | ||
| ); | ||
|
|
||
| expect(result).toEqual({ wasSkipped: false }); | ||
| expect(prisma.$queryRaw).not.toHaveBeenCalled(); | ||
| }); | ||
|
|
||
| it("returns wasSkipped=false when no tags are supplied (tag scope required)", async () => { | ||
| const prisma = mockPrisma(); | ||
| const concern = new SkipIfActiveConcern(prisma as never); | ||
|
|
||
| const result = await concern.handleTriggerRequest( | ||
| buildRequest({ skipIfActive: true, tags: undefined }) | ||
| ); | ||
|
|
||
| expect(result).toEqual({ wasSkipped: false }); | ||
| expect(prisma.$queryRaw).not.toHaveBeenCalled(); | ||
| }); | ||
|
|
||
| it("returns wasSkipped=false when no active run matches", async () => { | ||
| const prisma = mockPrisma({ existing: [] }); | ||
| const concern = new SkipIfActiveConcern(prisma as never); | ||
|
|
||
| const result = await concern.handleTriggerRequest( | ||
| buildRequest({ skipIfActive: true, tags: ["connector:abc"] }) | ||
| ); | ||
|
|
||
| expect(result).toEqual({ wasSkipped: false }); | ||
| expect(prisma.$queryRaw).toHaveBeenCalledTimes(1); | ||
| expect(prisma.taskRun.findUnique).not.toHaveBeenCalled(); | ||
| }); | ||
|
|
||
| it("returns wasSkipped=true with the existing run when an active run matches all tags", async () => { | ||
| const existingRun = { id: "run_existing", status: "EXECUTING", runTags: ["connector:abc"] }; | ||
| const prisma = mockPrisma({ existing: [{ id: existingRun.id }], run: existingRun }); | ||
| const concern = new SkipIfActiveConcern(prisma as never); | ||
|
|
||
| const result = await concern.handleTriggerRequest( | ||
| buildRequest({ skipIfActive: true, tags: ["connector:abc"] }) | ||
| ); | ||
|
|
||
| expect(result).toMatchObject({ wasSkipped: true, run: existingRun }); | ||
| expect(prisma.$queryRaw).toHaveBeenCalledTimes(1); | ||
| expect(prisma.taskRun.findUnique).toHaveBeenCalledWith({ where: { id: "run_existing" } }); | ||
| }); | ||
|
|
||
| it("normalizes a single tag string into an array", async () => { | ||
| const prisma = mockPrisma({ | ||
| existing: [{ id: "run_x" }], | ||
| run: { id: "run_x", status: "PENDING", runTags: ["connector:abc"] }, | ||
| }); | ||
| const concern = new SkipIfActiveConcern(prisma as never); | ||
|
|
||
| const result = await concern.handleTriggerRequest( | ||
| // @ts-expect-error Zod allows both string and string[] for tags | ||
| buildRequest({ skipIfActive: true, tags: "connector:abc" }) | ||
| ); | ||
|
|
||
| expect(result.wasSkipped).toBe(true); | ||
| }); | ||
|
|
||
| it("recovers gracefully when the row disappears between the probe and the fetch", async () => { | ||
| const prisma = mockPrisma({ existing: [{ id: "run_gone" }], run: null }); | ||
| const concern = new SkipIfActiveConcern(prisma as never); | ||
|
|
||
| const result = await concern.handleTriggerRequest( | ||
| buildRequest({ skipIfActive: true, tags: ["connector:abc"] }) | ||
| ); | ||
|
|
||
| expect(result).toEqual({ wasSkipped: false }); | ||
| }); | ||
| }); | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔴 Test file uses mocks and spies, violating mandatory repository testing rules
The test file
apps/webapp/test/engine/skipIfActiveConcern.test.tsextensively usesvi.mock(line 3),vi.fn()(lines 5–9, 50–51), and hand-rolled mock objects for the Prisma client, directly violating the mandatory testing rules inai/references/tests.md(referenced byAGENTS.md). Those rules explicitly state: "Do not mock anything", "Do not use mocks in tests", "Do not use spies in tests", "Do not use stubs in tests", "Do not use fakes in tests". TheAGENTS.mdfurther states: "Tests should avoid mocks or stubs and use the helpers from@internal/testcontainerswhen Redis or Postgres are needed." The test should usepostgresTestfrom@internal/testcontainersto run against a real Postgres database, similar to howapps/webapp/test/engine/triggerTask.test.tsusescontainerTestfor its core assertions.Prompt for agents
Was this helpful? React with 👍 or 👎 to provide feedback.