From deaa29bd19c6923fabff4be344ebe0670c05d12c Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 27 May 2026 14:31:48 -0700 Subject: [PATCH 1/5] fix(webapp): treat Phase 2 batch-stream retries as idempotent (TRI-9944) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the SDK created a batch and then streamed its items (Phase 2 of the 2-phase batch API), a lost response would trigger the SDK's network-retry path. For small, fast-completing batches the original request had already enqueued every item, sealed the batch, and the runs flipped the batch to PROCESSING or COMPLETED by the time the retry arrived. The retry then failed the pre-loop check at streamBatchItems.server.ts:109 with a 422 — surfacing a customer-visible BatchTriggerError for a batch whose runs had actually succeeded. StreamBatchItemsService.call now returns the standard sealed:true success response (itemsAccepted: 0, itemsDeduplicated: 0, runCount: batch.runCount) when the batch is already sealed or in PROCESSING/COMPLETED, matching the idempotency already applied at the two post-loop race-condition branches in the same file (lines 226 and 306). ABORTED and other unexpected non-PENDING states still throw. Tests: - Rewrote the existing "already sealed" race test from expecting a throw to expecting sealed:true (Phase 2 retry idempotency). - Added a COMPLETED-pre-loop test mirroring the exact customer scenario (single-item batch, status=COMPLETED, sealed=false — tryCompleteBatch sets status without setting sealed). - Added a negative ABORTED test to lock in that terminal-failure states still surface as errors. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../batch-stream-phase2-retry-idempotency.md | 24 +++ .../services/streamBatchItems.server.ts | 24 ++- .../test/engine/streamBatchItems.test.ts | 161 +++++++++++++++--- 3 files changed, 183 insertions(+), 26 deletions(-) create mode 100644 .server-changes/batch-stream-phase2-retry-idempotency.md diff --git a/.server-changes/batch-stream-phase2-retry-idempotency.md b/.server-changes/batch-stream-phase2-retry-idempotency.md new file mode 100644 index 00000000000..77394ea0e73 --- /dev/null +++ b/.server-changes/batch-stream-phase2-retry-idempotency.md @@ -0,0 +1,24 @@ +--- +area: webapp +type: fix +--- + +Treat Phase 2 batch-stream retries as idempotent when the batch has +already been sealed or moved past `PENDING` (TRI-9944). + +When the SDK created a batch and then streamed its items (Phase 2 of +the 2-phase batch API), a lost response would trigger the SDK's +network-retry path. For small, fast-completing batches the original +request had already enqueued every item, sealed the batch, and let the +runs flip the batch to `PROCESSING` or even `COMPLETED` by the time the +retry arrived. The retry then failed the pre-loop check at +`apps/webapp/app/runEngine/services/streamBatchItems.server.ts:109` +with a 422 — surfacing a customer-visible `BatchTriggerError` for a +batch whose runs had actually succeeded. + +`StreamBatchItemsService.call` now returns the standard `sealed: true` +success response (with `itemsAccepted: 0`, `itemsDeduplicated: 0`, +`runCount: batch.runCount`) when the batch is already sealed or in +`PROCESSING`/`COMPLETED`, matching the idempotency already applied at +the two post-loop race-condition branches in the same file. +`ABORTED` and other unexpected non-`PENDING` states still throw. diff --git a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts index 79c84eb540d..c9be860697b 100644 --- a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts +++ b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts @@ -100,13 +100,29 @@ export class StreamBatchItemsService extends WithRunEngine { throw new ServiceValidationError(`Batch ${batchFriendlyId} not found`); } - if (batch.sealed) { - throw new ServiceValidationError( - `Batch ${batchFriendlyId} is already sealed and cannot accept more items` - ); + // Phase 2 retry idempotency (TRI-9944): if the batch is already sealed + // or has moved past PENDING into PROCESSING/COMPLETED, this is a retry + // of a request whose response was lost — the original successful request + // already enqueued every item and sealed the batch. Returning sealed:true + // makes the SDK stop retrying instead of throwing a customer-visible 422. + if (batch.sealed || batch.status === "PROCESSING" || batch.status === "COMPLETED") { + logger.info("Batch already sealed/completed - treating Phase 2 retry as success", { + batchId: batchFriendlyId, + batchSealed: batch.sealed, + batchStatus: batch.status, + }); + + return { + id: batchFriendlyId, + itemsAccepted: 0, + itemsDeduplicated: 0, + sealed: true, + runCount: batch.runCount, + }; } if (batch.status !== "PENDING") { + // ABORTED or any other unexpected non-PENDING state — surface as an error. throw new ServiceValidationError( `Batch ${batchFriendlyId} is not in PENDING status (current: ${batch.status})` ); diff --git a/apps/webapp/test/engine/streamBatchItems.test.ts b/apps/webapp/test/engine/streamBatchItems.test.ts index 6e5f2264b1f..0dcdc0141ac 100644 --- a/apps/webapp/test/engine/streamBatchItems.test.ts +++ b/apps/webapp/test/engine/streamBatchItems.test.ts @@ -174,7 +174,7 @@ describe("StreamBatchItemsService", () => { ); containerTest( - "should handle race condition when batch already sealed by another request", + "should return sealed=true when batch is already sealed and PROCESSING (Phase 2 retry idempotency)", async ({ prisma, redisOptions }) => { const engine = new RunEngine({ prisma, @@ -211,35 +211,153 @@ describe("StreamBatchItemsService", () => { const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); - // Create a batch that is already sealed and PROCESSING (simulating another request won the race) + // Simulate the SDK retrying Phase 2 after the original request succeeded: + // the original request already sealed the batch and moved it to PROCESSING. const batch = await createBatch(prisma, authenticatedEnvironment.id, { runCount: 2, status: "PROCESSING", sealed: true, }); - // Initialize the batch in Redis with full count - await engine.initializeBatch({ - batchId: batch.id, - friendlyId: batch.friendlyId, - environmentId: authenticatedEnvironment.id, - environmentType: authenticatedEnvironment.type, - organizationId: authenticatedEnvironment.organizationId, - projectId: authenticatedEnvironment.projectId, - runCount: 2, - processingConcurrency: 10, + const service = new StreamBatchItemsService({ + prisma, + engine, }); - // Enqueue items directly - await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 0, { - task: "test-task", - payload: JSON.stringify({ data: "item1" }), - payloadType: "application/json", + const result = await service.call( + authenticatedEnvironment, + batch.friendlyId, + itemsToAsyncIterable([]), + { + maxItemBytes: 1024 * 1024, + } + ); + + // The retry should be treated as success — the original request already + // enqueued every item, so the SDK should stop retrying. + expect(result.sealed).toBe(true); + expect(result.id).toBe(batch.friendlyId); + expect(result.itemsAccepted).toBe(0); + expect(result.itemsDeduplicated).toBe(0); + expect(result.runCount).toBe(2); + + await engine.quit(); + } + ); + + containerTest( + "should return sealed=true when batch is COMPLETED before Phase 2 retry arrives (TRI-9944)", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + disabled: true, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + batchQueue: { + redis: redisOptions, + }, + tracer: trace.getTracer("test", "0.0.0"), }); - await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 1, { - task: "test-task", - payload: JSON.stringify({ data: "item2" }), - payloadType: "application/json", + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + // The customer-reported scenario: single-item batch where the original + // Phase 2 request succeeded server-side, the run executed fast, the batch + // flipped to COMPLETED, then the lost-response SDK retry hits us. + // Note: tryCompleteBatch sets status=COMPLETED but does NOT set sealed=true. + const batch = await createBatch(prisma, authenticatedEnvironment.id, { + runCount: 1, + status: "COMPLETED", + sealed: false, + }); + + const service = new StreamBatchItemsService({ + prisma, + engine, + }); + + const result = await service.call( + authenticatedEnvironment, + batch.friendlyId, + itemsToAsyncIterable([]), + { + maxItemBytes: 1024 * 1024, + } + ); + + expect(result.sealed).toBe(true); + expect(result.id).toBe(batch.friendlyId); + expect(result.itemsAccepted).toBe(0); + expect(result.itemsDeduplicated).toBe(0); + + await engine.quit(); + } + ); + + containerTest( + "should throw when batch is in ABORTED status", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + disabled: true, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + batchQueue: { + redis: redisOptions, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const batch = await createBatch(prisma, authenticatedEnvironment.id, { + runCount: 2, + status: "ABORTED", + sealed: false, }); const service = new StreamBatchItemsService({ @@ -247,7 +365,6 @@ describe("StreamBatchItemsService", () => { engine, }); - // This should fail because the batch is already sealed await expect( service.call(authenticatedEnvironment, batch.friendlyId, itemsToAsyncIterable([]), { maxItemBytes: 1024 * 1024, From e8caa1dfd5fd1308d0c7c939f869908d7af47e66 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 27 May 2026 14:50:00 -0700 Subject: [PATCH 2/5] fix(webapp): tighten Phase 2 idempotency check; cover PARTIAL_FAILED race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses two PR review findings: CodeRabbit: sealed=true + ABORTED would silently succeed under the previous `if (batch.sealed || ...)` check. V2's batch completion callback can set status=ABORTED (failedRunCount > 0 && successfulRunCount === 0) on a batch that streamBatchItems already sealed — leaving sealed=true alongside a terminally-failed batch. A Phase 2 retry of such a batch must surface the error, not mask it. Devin: PARTIAL_FAILED (failedRunCount > 0 with at least one success) is a real V2 completion-callback status, but neither the pre-loop check nor the post-loop race handlers (lines 226 and 306) accepted it as success. A retry whose original stream succeeded would either 422 at the pre-loop or hit "unexpected state" at the post-loop seal- failed branch. Changes: - Pre-loop: replace the broad `sealed || PROCESSING || COMPLETED` check with an `isIdempotentRetrySuccess` boolean that admits PROCESSING, COMPLETED, PARTIAL_FAILED, or (sealed && PENDING) — ABORTED falls through to the throw. - Post-loop count-mismatch (line 226 region): add PARTIAL_FAILED to the success short-circuit alongside sealed and COMPLETED. - Post-loop seal-failed (line 306 region): add PARTIAL_FAILED to the success short-circuit alongside (sealed && PROCESSING) and COMPLETED. Tests (TDD red-then-green): - New: pre-loop sealed=true + ABORTED → throws (CodeRabbit's case). - New: pre-loop PARTIAL_FAILED → returns sealed:true. - New: post-loop seal-failed race with PARTIAL_FAILED → returns sealed:true (uses the same racingPrisma pattern as the existing COMPLETED race test). All 34 tests in streamBatchItems.test.ts pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../services/streamBatchItems.server.ts | 35 ++- .../test/engine/streamBatchItems.test.ts | 251 ++++++++++++++++++ 2 files changed, 278 insertions(+), 8 deletions(-) diff --git a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts index c9be860697b..a483bb1e43e 100644 --- a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts +++ b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts @@ -100,12 +100,26 @@ export class StreamBatchItemsService extends WithRunEngine { throw new ServiceValidationError(`Batch ${batchFriendlyId} not found`); } - // Phase 2 retry idempotency (TRI-9944): if the batch is already sealed - // or has moved past PENDING into PROCESSING/COMPLETED, this is a retry - // of a request whose response was lost — the original successful request - // already enqueued every item and sealed the batch. Returning sealed:true - // makes the SDK stop retrying instead of throwing a customer-visible 422. - if (batch.sealed || batch.status === "PROCESSING" || batch.status === "COMPLETED") { + // Phase 2 retry idempotency (TRI-9944): a successful original request + // sealed the batch (sealed=true, status=PROCESSING) and the V2 batch + // completion callback can then independently update status to: + // - PENDING (all runs created — sealed stays true) + // - PARTIAL_FAILED (some run creations failed — sealed stays true/false) + // - COMPLETED (set by tryCompleteBatch after every run reaches a final + // state — sealed is NOT set by this path) + // For all of these the Phase 2 stream did its job, so a retry should + // return sealed:true and the SDK stops retrying. + // + // ABORTED is explicitly excluded — it means every run-creation attempt + // failed and the batch is terminally broken; surface that as an error + // rather than masking it as success. + const isIdempotentRetrySuccess = + batch.status === "PROCESSING" || + batch.status === "COMPLETED" || + batch.status === "PARTIAL_FAILED" || + (batch.sealed && batch.status === "PENDING"); + + if (isIdempotentRetrySuccess) { logger.info("Batch already sealed/completed - treating Phase 2 retry as success", { batchId: batchFriendlyId, batchSealed: batch.sealed, @@ -239,7 +253,11 @@ export class StreamBatchItemsService extends WithRunEngine { select: { sealed: true, status: true }, }); - if (currentBatch?.sealed || currentBatch?.status === "COMPLETED") { + if ( + currentBatch?.sealed || + currentBatch?.status === "COMPLETED" || + currentBatch?.status === "PARTIAL_FAILED" + ) { logger.info("Batch already sealed before count check (fast completion)", { batchId: batchFriendlyId, itemsAccepted, @@ -321,7 +339,8 @@ export class StreamBatchItemsService extends WithRunEngine { if ( (currentBatch?.sealed && currentBatch.status === "PROCESSING") || - currentBatch?.status === "COMPLETED" + currentBatch?.status === "COMPLETED" || + currentBatch?.status === "PARTIAL_FAILED" ) { logger.info("Batch already sealed/completed by concurrent path", { batchId: batchFriendlyId, diff --git a/apps/webapp/test/engine/streamBatchItems.test.ts b/apps/webapp/test/engine/streamBatchItems.test.ts index 0dcdc0141ac..3dc9c0163fb 100644 --- a/apps/webapp/test/engine/streamBatchItems.test.ts +++ b/apps/webapp/test/engine/streamBatchItems.test.ts @@ -375,6 +375,257 @@ describe("StreamBatchItemsService", () => { } ); + containerTest( + "should throw when batch is sealed but ABORTED (callback aborted post-seal must surface as error)", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + disabled: true, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + batchQueue: { + redis: redisOptions, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + // V2 batch completion callback sets status=ABORTED (failedRunCount > 0 && + // successfulRunCount === 0) without touching sealed=true that the seal + // step previously set. The Phase 2 retry must NOT mask this terminal + // failure as success — every run failed. + const batch = await createBatch(prisma, authenticatedEnvironment.id, { + runCount: 2, + status: "ABORTED", + sealed: true, + }); + + const service = new StreamBatchItemsService({ + prisma, + engine, + }); + + await expect( + service.call(authenticatedEnvironment, batch.friendlyId, itemsToAsyncIterable([]), { + maxItemBytes: 1024 * 1024, + }) + ).rejects.toThrow(ServiceValidationError); + + await engine.quit(); + } + ); + + containerTest( + "should return sealed=true when batch is PARTIAL_FAILED (Phase 2 retry idempotency)", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + disabled: true, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + batchQueue: { + redis: redisOptions, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + // V2 completion callback sets PARTIAL_FAILED when some run-creation + // attempts failed but at least one succeeded. The Phase 2 stream itself + // did its job (items were enqueued and processed), so a retry should + // see this as terminal success — the per-item failures are visible on + // the individual run records. + const batch = await createBatch(prisma, authenticatedEnvironment.id, { + runCount: 2, + status: "PARTIAL_FAILED", + sealed: false, + }); + + const service = new StreamBatchItemsService({ + prisma, + engine, + }); + + const result = await service.call( + authenticatedEnvironment, + batch.friendlyId, + itemsToAsyncIterable([]), + { + maxItemBytes: 1024 * 1024, + } + ); + + expect(result.sealed).toBe(true); + expect(result.id).toBe(batch.friendlyId); + expect(result.itemsAccepted).toBe(0); + expect(result.itemsDeduplicated).toBe(0); + + await engine.quit(); + } + ); + + containerTest( + "should return sealed=true when batch is PARTIAL_FAILED by callback before seal attempt", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + disabled: true, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + batchQueue: { + redis: redisOptions, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const batch = await createBatch(prisma, authenticatedEnvironment.id, { + runCount: 2, + status: "PENDING", + sealed: false, + }); + + await engine.initializeBatch({ + batchId: batch.id, + friendlyId: batch.friendlyId, + environmentId: authenticatedEnvironment.id, + environmentType: authenticatedEnvironment.type, + organizationId: authenticatedEnvironment.organizationId, + projectId: authenticatedEnvironment.projectId, + runCount: 2, + processingConcurrency: 10, + }); + + await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 0, { + task: "test-task", + payload: JSON.stringify({ data: "item1" }), + payloadType: "application/json", + }); + await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 1, { + task: "test-task", + payload: JSON.stringify({ data: "item2" }), + payloadType: "application/json", + }); + + // Simulate the race where V2's batchCompletionCallback runs between + // getEnqueuedCount and the seal updateMany — some runs failed to create + // but at least one succeeded, so callback sets status=PARTIAL_FAILED + // without setting sealed=true. + const racingPrisma = { + ...prisma, + batchTaskRun: { + ...prisma.batchTaskRun, + findFirst: prisma.batchTaskRun.findFirst.bind(prisma.batchTaskRun), + updateMany: async () => { + await prisma.batchTaskRun.update({ + where: { id: batch.id }, + data: { + status: "PARTIAL_FAILED", + }, + }); + return { count: 0 }; + }, + findUnique: prisma.batchTaskRun.findUnique.bind(prisma.batchTaskRun), + }, + } as unknown as PrismaClient; + + const service = new StreamBatchItemsService({ + prisma: racingPrisma, + engine, + }); + + const result = await service.call( + authenticatedEnvironment, + batch.friendlyId, + itemsToAsyncIterable([]), + { + maxItemBytes: 1024 * 1024, + } + ); + + expect(result.sealed).toBe(true); + expect(result.id).toBe(batch.friendlyId); + + const updatedBatch = await prisma.batchTaskRun.findUnique({ + where: { id: batch.id }, + }); + + expect(updatedBatch?.status).toBe("PARTIAL_FAILED"); + expect(updatedBatch?.sealed).toBe(false); + + await engine.quit(); + } + ); + containerTest( "should return sealed=true when concurrent request already sealed the batch during seal attempt", async ({ prisma, redisOptions }) => { From 5a25abf2cd61760feb11b99c7f252d5fb751008d Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 27 May 2026 15:18:56 -0700 Subject: [PATCH 3/5] refactor(webapp): unify Phase 2 retry idempotency check across all 3 branches MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After settling the operational contract — ABORTED throws because zero TaskRun records exist for the customer to monitor; every other terminal state returns sealed:true because TaskRun records exist (some may be in failed state, but per-run signals reach the customer through run monitoring) — three inconsistencies remained between the pre-loop check and the two post-loop race handlers: 1. Seal-failed branch threw "unexpected state" on sealed=true + PENDING, which is the legitimate post-callback "all runs created" state (V2 batchCompletionCallback resets PROCESSING → PENDING and leaves sealed=true). Pre-loop and count-mismatch both accept this state. 2. Count-mismatch branch admitted sealed=true + ABORTED via the bare `currentBatch?.sealed` clause, returning sealed:true. Pre-loop throws on this state. The count-mismatch outcome would silently hide a batch where zero TaskRuns were created. 3. Count-mismatch branch's fall-through return (sealed:false) implies "retry with missing items", which is wrong for ABORTED — a fresh batch is needed. Extracted the per-status policy into an exported helper: isIdempotentRetrySuccess(status, sealed) returns true for PROCESSING, COMPLETED, PARTIAL_FAILED, or (sealed && PENDING). ABORTED is excluded so the customer's batchTrigger() retry fires. All three branches now call the same helper. The count-mismatch branch additionally throws explicitly on ABORTED before falling through to the sealed:false return. Tests (TDD red-then-green): - New: seal-failed race with sealed=true + PENDING returns sealed:true (was throwing "unexpected state"). Uses racingPrisma to set the exact post-callback shape during the seal updateMany. - New: count-mismatch race with sealed=true + ABORTED throws ServiceValidationError (was returning sealed:false). Uses a call-counter on findFirst to flip the batch state between the pre-loop read and the re-query. All 36 tests in streamBatchItems.test.ts pass; webapp typecheck clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../services/streamBatchItems.server.ts | 83 +++--- .../test/engine/streamBatchItems.test.ts | 239 ++++++++++++++++++ 2 files changed, 289 insertions(+), 33 deletions(-) diff --git a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts index a483bb1e43e..c97b05ab135 100644 --- a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts +++ b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts @@ -4,12 +4,45 @@ import { } from "@trigger.dev/core/v3"; import { BatchId } from "@trigger.dev/core/v3/isomorphic"; import type { BatchItem, RunEngine } from "@internal/run-engine"; +import type { BatchTaskRunStatus } from "@trigger.dev/database"; import { prisma, type PrismaClientOrTransaction } from "~/db.server"; import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server"; import { BatchPayloadProcessor } from "../concerns/batchPayloads.server"; +/** + * Phase 2 retry idempotency check (TRI-9944). + * + * Returns true when the batch is in a state that means the Phase 2 stream's + * job has already been done by an earlier (or concurrent) request — every + * item is enqueued, runs have been or are being created, and at least one + * TaskRun record exists for the customer to monitor. A retry should return + * sealed:true in these states so the SDK stops retrying. + * + * - PROCESSING / sealed=true + PENDING: original sealed; runs are executing + * (PENDING after callback "all runs created") or about to. + * - COMPLETED: every run reached a terminal state (tryCompleteBatch). + * - PARTIAL_FAILED: at least one TaskRun record exists; per-run failures + * are visible at the run level. + * + * ABORTED is intentionally excluded — it means ZERO TaskRun records were + * created (every per-item attempt failed AND the pre-failed-TaskRun fallback + * also failed). The customer has nothing to monitor at the run level, so + * the trigger call must throw to give their retry/error handling a chance. + */ +export function isIdempotentRetrySuccess( + status: BatchTaskRunStatus | null | undefined, + sealed: boolean | null | undefined +): boolean { + return ( + status === "PROCESSING" || + status === "COMPLETED" || + status === "PARTIAL_FAILED" || + (sealed === true && status === "PENDING") + ); +} + export type StreamBatchItemsServiceOptions = { maxItemBytes: number; }; @@ -100,26 +133,7 @@ export class StreamBatchItemsService extends WithRunEngine { throw new ServiceValidationError(`Batch ${batchFriendlyId} not found`); } - // Phase 2 retry idempotency (TRI-9944): a successful original request - // sealed the batch (sealed=true, status=PROCESSING) and the V2 batch - // completion callback can then independently update status to: - // - PENDING (all runs created — sealed stays true) - // - PARTIAL_FAILED (some run creations failed — sealed stays true/false) - // - COMPLETED (set by tryCompleteBatch after every run reaches a final - // state — sealed is NOT set by this path) - // For all of these the Phase 2 stream did its job, so a retry should - // return sealed:true and the SDK stops retrying. - // - // ABORTED is explicitly excluded — it means every run-creation attempt - // failed and the batch is terminally broken; surface that as an error - // rather than masking it as success. - const isIdempotentRetrySuccess = - batch.status === "PROCESSING" || - batch.status === "COMPLETED" || - batch.status === "PARTIAL_FAILED" || - (batch.sealed && batch.status === "PENDING"); - - if (isIdempotentRetrySuccess) { + if (isIdempotentRetrySuccess(batch.status, batch.sealed)) { logger.info("Batch already sealed/completed - treating Phase 2 retry as success", { batchId: batchFriendlyId, batchSealed: batch.sealed, @@ -137,6 +151,8 @@ export class StreamBatchItemsService extends WithRunEngine { if (batch.status !== "PENDING") { // ABORTED or any other unexpected non-PENDING state — surface as an error. + // For ABORTED specifically, throwing is required so the customer's + // batchTrigger() retries (a new batch) can recreate the runs. throw new ServiceValidationError( `Batch ${batchFriendlyId} is not in PENDING status (current: ${batch.status})` ); @@ -253,18 +269,14 @@ export class StreamBatchItemsService extends WithRunEngine { select: { sealed: true, status: true }, }); - if ( - currentBatch?.sealed || - currentBatch?.status === "COMPLETED" || - currentBatch?.status === "PARTIAL_FAILED" - ) { + if (isIdempotentRetrySuccess(currentBatch?.status, currentBatch?.sealed)) { logger.info("Batch already sealed before count check (fast completion)", { batchId: batchFriendlyId, itemsAccepted, itemsDeduplicated, enqueuedCount, expectedCount: batch.runCount, - batchStatus: currentBatch.status, + batchStatus: currentBatch?.status, }); return { @@ -276,6 +288,15 @@ export class StreamBatchItemsService extends WithRunEngine { }; } + if (currentBatch?.status === "ABORTED") { + // Zero TaskRuns exist — the count-mismatch sealed:false semantics + // ("retry with missing items") would mislead the SDK. Throw so the + // customer's batchTrigger() retry creates a fresh batch. + throw new ServiceValidationError( + `Batch ${batchFriendlyId} is not in PENDING status (current: ABORTED)` + ); + } + logger.warn("Batch item count mismatch", { batchId: batchFriendlyId, expected: batch.runCount, @@ -337,18 +358,14 @@ export class StreamBatchItemsService extends WithRunEngine { }, }); - if ( - (currentBatch?.sealed && currentBatch.status === "PROCESSING") || - currentBatch?.status === "COMPLETED" || - currentBatch?.status === "PARTIAL_FAILED" - ) { + if (isIdempotentRetrySuccess(currentBatch?.status, currentBatch?.sealed)) { logger.info("Batch already sealed/completed by concurrent path", { batchId: batchFriendlyId, itemsAccepted, itemsDeduplicated, envId: environment.id, - batchStatus: currentBatch.status, - batchSealed: currentBatch.sealed, + batchStatus: currentBatch?.status, + batchSealed: currentBatch?.sealed, }); span.setAttribute("itemsAccepted", itemsAccepted); diff --git a/apps/webapp/test/engine/streamBatchItems.test.ts b/apps/webapp/test/engine/streamBatchItems.test.ts index 3dc9c0163fb..09de73baf6f 100644 --- a/apps/webapp/test/engine/streamBatchItems.test.ts +++ b/apps/webapp/test/engine/streamBatchItems.test.ts @@ -1083,6 +1083,245 @@ describe("StreamBatchItemsService", () => { await engine.quit(); } ); + + containerTest( + "should return sealed=true when seal-failed race produces sealed=true + PENDING (post-callback all-created)", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + disabled: true, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + batchQueue: { + redis: redisOptions, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const batch = await createBatch(prisma, authenticatedEnvironment.id, { + runCount: 2, + status: "PENDING", + sealed: false, + }); + + await engine.initializeBatch({ + batchId: batch.id, + friendlyId: batch.friendlyId, + environmentId: authenticatedEnvironment.id, + environmentType: authenticatedEnvironment.type, + organizationId: authenticatedEnvironment.organizationId, + projectId: authenticatedEnvironment.projectId, + runCount: 2, + processingConcurrency: 10, + }); + + await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 0, { + task: "test-task", + payload: JSON.stringify({ data: "item1" }), + payloadType: "application/json", + }); + await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 1, { + task: "test-task", + payload: JSON.stringify({ data: "item2" }), + payloadType: "application/json", + }); + + // Simulate the race where a concurrent path seals the batch (sealed=true, + // PROCESSING), then the V2 batchCompletionCallback fires with all runs + // created successfully and resets status to PENDING (sealed stays true). + // Our seal updateMany then fails the conditional (sealed=false no longer + // matches), and the re-query sees sealed=true + PENDING — a perfectly + // valid post-callback state that the SDK retry should treat as success. + const racingPrisma = { + ...prisma, + batchTaskRun: { + ...prisma.batchTaskRun, + findFirst: prisma.batchTaskRun.findFirst.bind(prisma.batchTaskRun), + updateMany: async () => { + await prisma.batchTaskRun.update({ + where: { id: batch.id }, + data: { + sealed: true, + sealedAt: new Date(), + // Intentionally leave status as PENDING — that's exactly what + // the V2 batchCompletionCallback does after all runs are + // created (status PROCESSING → PENDING). + }, + }); + return { count: 0 }; + }, + findUnique: prisma.batchTaskRun.findUnique.bind(prisma.batchTaskRun), + }, + } as unknown as PrismaClient; + + const service = new StreamBatchItemsService({ + prisma: racingPrisma, + engine, + }); + + const result = await service.call( + authenticatedEnvironment, + batch.friendlyId, + itemsToAsyncIterable([]), + { + maxItemBytes: 1024 * 1024, + } + ); + + expect(result.sealed).toBe(true); + expect(result.id).toBe(batch.friendlyId); + + const updatedBatch = await prisma.batchTaskRun.findUnique({ + where: { id: batch.id }, + }); + + expect(updatedBatch?.sealed).toBe(true); + expect(updatedBatch?.status).toBe("PENDING"); + + await engine.quit(); + } + ); + + containerTest( + "should throw when count-mismatch race produces sealed=true + ABORTED (no TaskRuns created)", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + disabled: true, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + batchQueue: { + redis: redisOptions, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const batch = await createBatch(prisma, authenticatedEnvironment.id, { + runCount: 3, + status: "PENDING", + sealed: false, + }); + + await engine.initializeBatch({ + batchId: batch.id, + friendlyId: batch.friendlyId, + environmentId: authenticatedEnvironment.id, + environmentType: authenticatedEnvironment.type, + organizationId: authenticatedEnvironment.organizationId, + projectId: authenticatedEnvironment.projectId, + runCount: 3, + processingConcurrency: 10, + }); + + // Only enqueue 2 items so the post-loop count check trips into the + // mismatch handler. The race we're simulating: between our pre-loop + // findFirst and the count-mismatch re-query, a concurrent path sealed + // the batch, runs were attempted, every run-creation failed AND the + // pre-failed-TaskRun fallback also failed → callback sets ABORTED. + // The customer has zero TaskRun records to monitor, so the retry must + // throw rather than silently succeed. + await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 0, { + task: "test-task", + payload: JSON.stringify({ data: "item1" }), + payloadType: "application/json", + }); + await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 1, { + task: "test-task", + payload: JSON.stringify({ data: "item2" }), + payloadType: "application/json", + }); + + // Override findFirst to flip the batch to sealed=true + ABORTED on the + // re-query that happens INSIDE the count-mismatch branch. The first + // findFirst (pre-loop) must still see PENDING + sealed=false so we + // pass through and reach the count-mismatch branch. + let findFirstCallCount = 0; + const racingPrisma = { + ...prisma, + batchTaskRun: { + ...prisma.batchTaskRun, + findFirst: async (args: Parameters[0]) => { + findFirstCallCount++; + if (findFirstCallCount >= 2) { + await prisma.batchTaskRun.update({ + where: { id: batch.id }, + data: { + sealed: true, + sealedAt: new Date(), + status: "ABORTED", + completedAt: new Date(), + }, + }); + } + return prisma.batchTaskRun.findFirst.call(prisma.batchTaskRun, args); + }, + updateMany: prisma.batchTaskRun.updateMany.bind(prisma.batchTaskRun), + findUnique: prisma.batchTaskRun.findUnique.bind(prisma.batchTaskRun), + }, + } as unknown as PrismaClient; + + const service = new StreamBatchItemsService({ + prisma: racingPrisma, + engine, + }); + + await expect( + service.call(authenticatedEnvironment, batch.friendlyId, itemsToAsyncIterable([]), { + maxItemBytes: 1024 * 1024, + }) + ).rejects.toThrow(ServiceValidationError); + + await engine.quit(); + } + ); }); describe("createNdjsonParserStream", () => { From f28f53f9a6b65b5c121866d00ee94732315c635b Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 27 May 2026 15:48:20 -0700 Subject: [PATCH 4/5] fix(webapp): handle cleanup-race in Phase 2 stream (4-item batchTriggerAndWait reports) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Another customer hit a related mode of the same Phase 2 stream class of bug: parent batchTriggerAndWait throwing BatchTriggerError despite every child run completing successfully. Pattern: 10 occurrences over 2 days, all 4-item batches, parents fail ~45s after the 200 — exactly five SDK stream-retry attempts exhausting. Trace of the failure mode against the existing code: 1. SDK POST /items sends 4 items. Server enqueues all 4. 2. BatchQueue rushes through them (independent items, fast). All 4 TaskRuns created. 3. batchCompletionCallback fires — sets processingCompletedAt = now(), successfulRunCount = 4, runIds. Status stays PENDING (the callback's "all created" happy path). sealed stays false (callback never touches it). 4. cleanup() runs, deletes the Redis batch metadata. 5. Our service's getBatchEnqueuedCount returns 0. Count-mismatch branch: 0 != 4. 6. Re-query: status=PENDING, sealed=false. Neither (sealed && PENDING) nor any of PROCESSING/COMPLETED/PARTIAL_FAILED matched → fell through to the sealed:false "client should stream more items" return. Server: 200 + sealed:false (matches the customer's "first POST returned 200, 8.1s"). 7. SDK retries the stream. engine.enqueueBatchItem at batch-queue/ index.ts:346 throws `Batch not found or not initialized` because cleanup deleted the metadata. Five retries exhaust → SDK throws BatchTriggerError (~45s after the 200). The discriminator that distinguishes "callback fired, work is done" from "client should stream more items" is processingCompletedAt: it's written exclusively by the V2 batchCompletionCallback (verified by grep across the run-engine and webapp). Nothing else touches it. Extended isIdempotentRetrySuccess to take processingCompletedAt as a third argument: (status === PENDING) && (sealed === true || processingCompletedAt != null) now means "callback fired, every item has a TaskRun, return sealed:true". The same helper is used by all three branches (pre-loop, count-mismatch, seal-failed) so the contract stays uniform. All three findFirst selects add `processingCompletedAt`. ABORTED still excluded everywhere. Test helper createBatch now accepts PARTIAL_FAILED (per CodeRabbit's adjacent nit on the previous commit) and processingCompletedAt. Tests (TDD red-then-green): - New: pre-loop with sealed=false + PENDING + processingCompletedAt set → returns sealed:true. Exercises the path a Phase 2 retry would hit if it arrived after the original count-mismatch returned sealed:false. - New: count-mismatch race with the customer's exact shape (sealed=false + PENDING + processingCompletedAt flipped between pre-loop read and re-query) → returns sealed:true. Uses the findFirst-counter racing pattern to reproduce the production timing. All 38 tests in streamBatchItems.test.ts pass; webapp typecheck clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../batch-stream-phase2-retry-idempotency.md | 59 ++++-- .../services/streamBatchItems.server.ts | 65 ++++-- .../test/engine/streamBatchItems.test.ts | 199 +++++++++++++++++- 3 files changed, 286 insertions(+), 37 deletions(-) diff --git a/.server-changes/batch-stream-phase2-retry-idempotency.md b/.server-changes/batch-stream-phase2-retry-idempotency.md index 77394ea0e73..1a3672c5ead 100644 --- a/.server-changes/batch-stream-phase2-retry-idempotency.md +++ b/.server-changes/batch-stream-phase2-retry-idempotency.md @@ -3,22 +3,47 @@ area: webapp type: fix --- -Treat Phase 2 batch-stream retries as idempotent when the batch has -already been sealed or moved past `PENDING` (TRI-9944). +Tighten Phase 2 batch-stream idempotency across all three branches of +`StreamBatchItemsService.call` so a successful original request, or a +retry of one, returns `sealed:true` instead of the customer-visible +422/`sealed:false` responses that surfaced as `BatchTriggerError` in +production. -When the SDK created a batch and then streamed its items (Phase 2 of -the 2-phase batch API), a lost response would trigger the SDK's -network-retry path. For small, fast-completing batches the original -request had already enqueued every item, sealed the batch, and let the -runs flip the batch to `PROCESSING` or even `COMPLETED` by the time the -retry arrived. The retry then failed the pre-loop check at -`apps/webapp/app/runEngine/services/streamBatchItems.server.ts:109` -with a 422 — surfacing a customer-visible `BatchTriggerError` for a -batch whose runs had actually succeeded. +Three related modes are now handled uniformly: -`StreamBatchItemsService.call` now returns the standard `sealed: true` -success response (with `itemsAccepted: 0`, `itemsDeduplicated: 0`, -`runCount: batch.runCount`) when the batch is already sealed or in -`PROCESSING`/`COMPLETED`, matching the idempotency already applied at -the two post-loop race-condition branches in the same file. -`ABORTED` and other unexpected non-`PENDING` states still throw. +1. **TRI-9944 (lost-response retry)**: SDK retries Phase 2 after a + network blip ate the response. The original sealed the batch; the + retry hits a non-`PENDING` status and the pre-loop check threw 422. + +2. **Sealed-with-callback PENDING**: the V2 `batchCompletionCallback` + resets status from `PROCESSING` back to `PENDING` when every run + was created cleanly, without touching `sealed`. The seal-failed + race branch threw "unexpected state" on this perfectly legitimate + state. + +3. **Cleanup-race (customer 4-item batchTriggerAndWait reports)**: + BatchQueue rushes through every item before the loop finishes its + seal step, the callback fires (setting `processingCompletedAt`), + `cleanup()` deletes the Redis metadata, then the service's + `getBatchEnqueuedCount` returns 0 ≠ `runCount`. The count-mismatch + branch returned `sealed:false` because `sealed=false + PENDING` + wasn't distinguishable from "client should stream more items". The + SDK then retried the stream against the cleaned-up batch, the + engine threw `Batch not found or not initialized`, retries + exhausted, customer saw `BatchTriggerError` despite every child run + completing successfully. + +The pre-loop check, the count-mismatch handler, and the seal-failed +handler now all call a single `isIdempotentRetrySuccess(status, sealed, +processingCompletedAt)` helper. `processingCompletedAt` is the +discriminator that fixes mode (3) — it's set exclusively by the V2 +completion callback, so `(status=PENDING) && (sealed || processingCompletedAt +!= null)` cleanly separates "callback fired, work is done" from "client +should stream more items". + +`ABORTED` (zero TaskRun records — every run-creation attempt failed +*and* the pre-failed-TaskRun fallback also failed) is explicitly +excluded from the idempotent-success path in all three branches: the +customer has nothing to monitor at the run level, so the trigger call +must throw to give their `batchTrigger()` retry the chance to create a +fresh batch. diff --git a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts index c97b05ab135..81f244655d2 100644 --- a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts +++ b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts @@ -15,31 +15,41 @@ import { BatchPayloadProcessor } from "../concerns/batchPayloads.server"; * Phase 2 retry idempotency check (TRI-9944). * * Returns true when the batch is in a state that means the Phase 2 stream's - * job has already been done by an earlier (or concurrent) request — every - * item is enqueued, runs have been or are being created, and at least one - * TaskRun record exists for the customer to monitor. A retry should return - * sealed:true in these states so the SDK stops retrying. + * job has already been done — every item has a TaskRun record (real or + * pre-failed) for the customer to monitor. A retry, or the original call + * racing against a fast-completing BatchQueue, should return sealed:true + * in these states so the SDK stops retrying. * - * - PROCESSING / sealed=true + PENDING: original sealed; runs are executing - * (PENDING after callback "all runs created") or about to. - * - COMPLETED: every run reached a terminal state (tryCompleteBatch). - * - PARTIAL_FAILED: at least one TaskRun record exists; per-run failures - * are visible at the run level. + * Three "work is done" shapes: + * - status moved out of PENDING into PROCESSING/COMPLETED/PARTIAL_FAILED + * (PROCESSING via our seal, COMPLETED via tryCompleteBatch, PARTIAL_FAILED + * via the V2 batchCompletionCallback). + * - status stuck at PENDING but `sealed=true`: another concurrent + * streamBatchItems call sealed the batch and then the callback's + * happy-path branch reset status to PENDING ("all runs created"). + * - status stuck at PENDING with `sealed=false` but `processingCompletedAt` + * set: the cleanup-race. BatchQueue rushed through all items, callback + * fired (setting processingCompletedAt), cleanup deleted the Redis + * metadata — all before our service got the chance to seal. The work + * is done; the discriminator is processingCompletedAt which is set + * exclusively by the V2 completion callback. * - * ABORTED is intentionally excluded — it means ZERO TaskRun records were - * created (every per-item attempt failed AND the pre-failed-TaskRun fallback - * also failed). The customer has nothing to monitor at the run level, so - * the trigger call must throw to give their retry/error handling a chance. + * ABORTED is excluded — it means ZERO TaskRun records were created (every + * per-item attempt failed AND the pre-failed-TaskRun fallback also failed, + * or queue-overload on every item). The customer has nothing to monitor + * at the run level, so the trigger call must throw to give their retry/ + * error handling a chance to create a fresh batch. */ export function isIdempotentRetrySuccess( status: BatchTaskRunStatus | null | undefined, - sealed: boolean | null | undefined + sealed: boolean | null | undefined, + processingCompletedAt: Date | null | undefined ): boolean { return ( status === "PROCESSING" || status === "COMPLETED" || status === "PARTIAL_FAILED" || - (sealed === true && status === "PENDING") + (status === "PENDING" && (sealed === true || processingCompletedAt != null)) ); } @@ -126,6 +136,7 @@ export class StreamBatchItemsService extends WithRunEngine { runCount: true, sealed: true, batchVersion: true, + processingCompletedAt: true, }, }); @@ -133,11 +144,12 @@ export class StreamBatchItemsService extends WithRunEngine { throw new ServiceValidationError(`Batch ${batchFriendlyId} not found`); } - if (isIdempotentRetrySuccess(batch.status, batch.sealed)) { + if (isIdempotentRetrySuccess(batch.status, batch.sealed, batch.processingCompletedAt)) { logger.info("Batch already sealed/completed - treating Phase 2 retry as success", { batchId: batchFriendlyId, batchSealed: batch.sealed, batchStatus: batch.status, + processingCompletedAt: batch.processingCompletedAt, }); return { @@ -266,10 +278,16 @@ export class StreamBatchItemsService extends WithRunEngine { // COMPLETED (sealed by the BatchQueue completion path before we got here). const currentBatch = await this._prisma.batchTaskRun.findFirst({ where: { id: batchId }, - select: { sealed: true, status: true }, + select: { sealed: true, status: true, processingCompletedAt: true }, }); - if (isIdempotentRetrySuccess(currentBatch?.status, currentBatch?.sealed)) { + if ( + isIdempotentRetrySuccess( + currentBatch?.status, + currentBatch?.sealed, + currentBatch?.processingCompletedAt + ) + ) { logger.info("Batch already sealed before count check (fast completion)", { batchId: batchFriendlyId, itemsAccepted, @@ -277,6 +295,7 @@ export class StreamBatchItemsService extends WithRunEngine { enqueuedCount, expectedCount: batch.runCount, batchStatus: currentBatch?.status, + processingCompletedAt: currentBatch?.processingCompletedAt, }); return { @@ -355,10 +374,17 @@ export class StreamBatchItemsService extends WithRunEngine { friendlyId: true, status: true, sealed: true, + processingCompletedAt: true, }, }); - if (isIdempotentRetrySuccess(currentBatch?.status, currentBatch?.sealed)) { + if ( + isIdempotentRetrySuccess( + currentBatch?.status, + currentBatch?.sealed, + currentBatch?.processingCompletedAt + ) + ) { logger.info("Batch already sealed/completed by concurrent path", { batchId: batchFriendlyId, itemsAccepted, @@ -366,6 +392,7 @@ export class StreamBatchItemsService extends WithRunEngine { envId: environment.id, batchStatus: currentBatch?.status, batchSealed: currentBatch?.sealed, + processingCompletedAt: currentBatch?.processingCompletedAt, }); span.setAttribute("itemsAccepted", itemsAccepted); diff --git a/apps/webapp/test/engine/streamBatchItems.test.ts b/apps/webapp/test/engine/streamBatchItems.test.ts index 09de73baf6f..f5348d71b98 100644 --- a/apps/webapp/test/engine/streamBatchItems.test.ts +++ b/apps/webapp/test/engine/streamBatchItems.test.ts @@ -40,8 +40,9 @@ describe("StreamBatchItemsService", () => { environmentId: string, options: { runCount: number; - status?: "PENDING" | "PROCESSING" | "COMPLETED" | "ABORTED"; + status?: "PENDING" | "PROCESSING" | "COMPLETED" | "PARTIAL_FAILED" | "ABORTED"; sealed?: boolean; + processingCompletedAt?: Date | null; } ) { const { id, friendlyId } = BatchId.generate(); @@ -57,6 +58,7 @@ describe("StreamBatchItemsService", () => { runIds: [], batchVersion: "runengine:v2", sealed: options.sealed ?? false, + processingCompletedAt: options.processingCompletedAt ?? null, }, }); @@ -1322,6 +1324,201 @@ describe("StreamBatchItemsService", () => { await engine.quit(); } ); + + containerTest( + "should return sealed=true when batch is sealed=false + PENDING + processingCompletedAt set (pre-loop post-callback)", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + disabled: true, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + batchQueue: { + redis: redisOptions, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + // The V2 batchCompletionCallback set processingCompletedAt without + // touching sealed (sealed gets set by streamBatchItems, not the callback). + // Status stays PENDING because every run was created successfully (the + // callback's "all created, waiting for completion" branch). A Phase 2 + // retry arriving in this state must treat it as success — every item + // has a TaskRun record for the customer to monitor. + const batch = await createBatch(prisma, authenticatedEnvironment.id, { + runCount: 4, + status: "PENDING", + sealed: false, + processingCompletedAt: new Date(), + }); + + const service = new StreamBatchItemsService({ + prisma, + engine, + }); + + const result = await service.call( + authenticatedEnvironment, + batch.friendlyId, + itemsToAsyncIterable([]), + { + maxItemBytes: 1024 * 1024, + } + ); + + expect(result.sealed).toBe(true); + expect(result.id).toBe(batch.friendlyId); + expect(result.itemsAccepted).toBe(0); + expect(result.itemsDeduplicated).toBe(0); + + await engine.quit(); + } + ); + + containerTest( + "should return sealed=true on count-mismatch when callback fired before our getEnqueuedCount (cleanup race — customer scenario)", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + disabled: true, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + batchQueue: { + redis: redisOptions, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + // The customer's exact case: 4-item batch, BatchQueue rushes through + // all items before our service finishes its loop, callback fires + // (setting processingCompletedAt; status stays PENDING because all + // runs created cleanly), cleanup deletes Redis state. Our service + // hits the count-mismatch branch because getBatchEnqueuedCount returns + // 0 (cleaned). The re-query sees sealed=false + PENDING but + // processingCompletedAt is set — the work is done. + const batch = await createBatch(prisma, authenticatedEnvironment.id, { + runCount: 4, + status: "PENDING", + sealed: false, + }); + + await engine.initializeBatch({ + batchId: batch.id, + friendlyId: batch.friendlyId, + environmentId: authenticatedEnvironment.id, + environmentType: authenticatedEnvironment.type, + organizationId: authenticatedEnvironment.organizationId, + projectId: authenticatedEnvironment.projectId, + runCount: 4, + processingConcurrency: 10, + }); + + // Force the count-mismatch branch by leaving Redis at 0 items vs + // runCount=4. The pre-loop must see "initial" state (so it passes + // through to the loop), and the count-mismatch re-query must see + // "post-callback" state. Use a findFirst counter to flip the DB + // between those two reads, exactly matching the production timing + // where the callback fires while our loop is running. + let findFirstCallCount = 0; + const racingPrisma = { + ...prisma, + batchTaskRun: { + ...prisma.batchTaskRun, + findFirst: async (args: Parameters[0]) => { + findFirstCallCount++; + if (findFirstCallCount === 2) { + // The post-loop count-mismatch re-query: BatchQueue completed + // all items and the callback fired in the window before this + // read. Status stays PENDING (all runs created OK) but + // processingCompletedAt is now set. + await prisma.batchTaskRun.update({ + where: { id: batch.id }, + data: { + processingCompletedAt: new Date(), + successfulRunCount: 4, + }, + }); + } + return prisma.batchTaskRun.findFirst.call(prisma.batchTaskRun, args); + }, + updateMany: prisma.batchTaskRun.updateMany.bind(prisma.batchTaskRun), + findUnique: prisma.batchTaskRun.findUnique.bind(prisma.batchTaskRun), + }, + } as unknown as PrismaClient; + + const service = new StreamBatchItemsService({ + prisma: racingPrisma, + engine, + }); + + const result = await service.call( + authenticatedEnvironment, + batch.friendlyId, + itemsToAsyncIterable([]), + { + maxItemBytes: 1024 * 1024, + } + ); + + // The retry must be treated as success — every item's TaskRun was + // created by the original Phase 2 call. Returning sealed:false here + // (the previous behavior) made the SDK retry the stream against a + // cleaned-up batch, which then 5xx'd, exhausted SDK retries, and + // surfaced as BatchTriggerError despite all runs succeeding. + expect(result.sealed).toBe(true); + expect(result.id).toBe(batch.friendlyId); + + await engine.quit(); + } + ); }); describe("createNdjsonParserStream", () => { From d4e8af9d4a005678a4c1b86385db89ef957cde1e Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 28 May 2026 10:17:56 +0100 Subject: [PATCH 5/5] chore: trim .server-changes file to release-note length --- .../batch-stream-phase2-retry-idempotency.md | 45 +------------------ 1 file changed, 1 insertion(+), 44 deletions(-) diff --git a/.server-changes/batch-stream-phase2-retry-idempotency.md b/.server-changes/batch-stream-phase2-retry-idempotency.md index 1a3672c5ead..a14adcadb1f 100644 --- a/.server-changes/batch-stream-phase2-retry-idempotency.md +++ b/.server-changes/batch-stream-phase2-retry-idempotency.md @@ -3,47 +3,4 @@ area: webapp type: fix --- -Tighten Phase 2 batch-stream idempotency across all three branches of -`StreamBatchItemsService.call` so a successful original request, or a -retry of one, returns `sealed:true` instead of the customer-visible -422/`sealed:false` responses that surfaced as `BatchTriggerError` in -production. - -Three related modes are now handled uniformly: - -1. **TRI-9944 (lost-response retry)**: SDK retries Phase 2 after a - network blip ate the response. The original sealed the batch; the - retry hits a non-`PENDING` status and the pre-loop check threw 422. - -2. **Sealed-with-callback PENDING**: the V2 `batchCompletionCallback` - resets status from `PROCESSING` back to `PENDING` when every run - was created cleanly, without touching `sealed`. The seal-failed - race branch threw "unexpected state" on this perfectly legitimate - state. - -3. **Cleanup-race (customer 4-item batchTriggerAndWait reports)**: - BatchQueue rushes through every item before the loop finishes its - seal step, the callback fires (setting `processingCompletedAt`), - `cleanup()` deletes the Redis metadata, then the service's - `getBatchEnqueuedCount` returns 0 ≠ `runCount`. The count-mismatch - branch returned `sealed:false` because `sealed=false + PENDING` - wasn't distinguishable from "client should stream more items". The - SDK then retried the stream against the cleaned-up batch, the - engine threw `Batch not found or not initialized`, retries - exhausted, customer saw `BatchTriggerError` despite every child run - completing successfully. - -The pre-loop check, the count-mismatch handler, and the seal-failed -handler now all call a single `isIdempotentRetrySuccess(status, sealed, -processingCompletedAt)` helper. `processingCompletedAt` is the -discriminator that fixes mode (3) — it's set exclusively by the V2 -completion callback, so `(status=PENDING) && (sealed || processingCompletedAt -!= null)` cleanly separates "callback fired, work is done" from "client -should stream more items". - -`ABORTED` (zero TaskRun records — every run-creation attempt failed -*and* the pre-failed-TaskRun fallback also failed) is explicitly -excluded from the idempotent-success path in all three branches: the -customer has nothing to monitor at the run level, so the trigger call -must throw to give their `batchTrigger()` retry the chance to create a -fresh batch. +Stop spurious `BatchTriggerError` failures when a fast-completing `batchTrigger`/`batchTriggerAndWait` raced the stream finalisation - the API now treats these as successes instead of 422s.