diff --git a/.server-changes/batch-stream-phase2-retry-idempotency.md b/.server-changes/batch-stream-phase2-retry-idempotency.md new file mode 100644 index 00000000000..a14adcadb1f --- /dev/null +++ b/.server-changes/batch-stream-phase2-retry-idempotency.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Stop spurious `BatchTriggerError` failures when a fast-completing `batchTrigger`/`batchTriggerAndWait` raced the stream finalisation - the API now treats these as successes instead of 422s. diff --git a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts index 79c84eb540d..81f244655d2 100644 --- a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts +++ b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts @@ -4,12 +4,55 @@ import { } from "@trigger.dev/core/v3"; import { BatchId } from "@trigger.dev/core/v3/isomorphic"; import type { BatchItem, RunEngine } from "@internal/run-engine"; +import type { BatchTaskRunStatus } from "@trigger.dev/database"; import { prisma, type PrismaClientOrTransaction } from "~/db.server"; import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server"; import { BatchPayloadProcessor } from "../concerns/batchPayloads.server"; +/** + * Phase 2 retry idempotency check (TRI-9944). + * + * Returns true when the batch is in a state that means the Phase 2 stream's + * job has already been done — every item has a TaskRun record (real or + * pre-failed) for the customer to monitor. A retry, or the original call + * racing against a fast-completing BatchQueue, should return sealed:true + * in these states so the SDK stops retrying. + * + * Three "work is done" shapes: + * - status moved out of PENDING into PROCESSING/COMPLETED/PARTIAL_FAILED + * (PROCESSING via our seal, COMPLETED via tryCompleteBatch, PARTIAL_FAILED + * via the V2 batchCompletionCallback). + * - status stuck at PENDING but `sealed=true`: another concurrent + * streamBatchItems call sealed the batch and then the callback's + * happy-path branch reset status to PENDING ("all runs created"). + * - status stuck at PENDING with `sealed=false` but `processingCompletedAt` + * set: the cleanup-race. BatchQueue rushed through all items, callback + * fired (setting processingCompletedAt), cleanup deleted the Redis + * metadata — all before our service got the chance to seal. The work + * is done; the discriminator is processingCompletedAt which is set + * exclusively by the V2 completion callback. + * + * ABORTED is excluded — it means ZERO TaskRun records were created (every + * per-item attempt failed AND the pre-failed-TaskRun fallback also failed, + * or queue-overload on every item). The customer has nothing to monitor + * at the run level, so the trigger call must throw to give their retry/ + * error handling a chance to create a fresh batch. + */ +export function isIdempotentRetrySuccess( + status: BatchTaskRunStatus | null | undefined, + sealed: boolean | null | undefined, + processingCompletedAt: Date | null | undefined +): boolean { + return ( + status === "PROCESSING" || + status === "COMPLETED" || + status === "PARTIAL_FAILED" || + (status === "PENDING" && (sealed === true || processingCompletedAt != null)) + ); +} + export type StreamBatchItemsServiceOptions = { maxItemBytes: number; }; @@ -93,6 +136,7 @@ export class StreamBatchItemsService extends WithRunEngine { runCount: true, sealed: true, batchVersion: true, + processingCompletedAt: true, }, }); @@ -100,13 +144,27 @@ export class StreamBatchItemsService extends WithRunEngine { throw new ServiceValidationError(`Batch ${batchFriendlyId} not found`); } - if (batch.sealed) { - throw new ServiceValidationError( - `Batch ${batchFriendlyId} is already sealed and cannot accept more items` - ); + if (isIdempotentRetrySuccess(batch.status, batch.sealed, batch.processingCompletedAt)) { + logger.info("Batch already sealed/completed - treating Phase 2 retry as success", { + batchId: batchFriendlyId, + batchSealed: batch.sealed, + batchStatus: batch.status, + processingCompletedAt: batch.processingCompletedAt, + }); + + return { + id: batchFriendlyId, + itemsAccepted: 0, + itemsDeduplicated: 0, + sealed: true, + runCount: batch.runCount, + }; } if (batch.status !== "PENDING") { + // ABORTED or any other unexpected non-PENDING state — surface as an error. + // For ABORTED specifically, throwing is required so the customer's + // batchTrigger() retries (a new batch) can recreate the runs. throw new ServiceValidationError( `Batch ${batchFriendlyId} is not in PENDING status (current: ${batch.status})` ); @@ -220,17 +278,24 @@ export class StreamBatchItemsService extends WithRunEngine { // COMPLETED (sealed by the BatchQueue completion path before we got here). const currentBatch = await this._prisma.batchTaskRun.findFirst({ where: { id: batchId }, - select: { sealed: true, status: true }, + select: { sealed: true, status: true, processingCompletedAt: true }, }); - if (currentBatch?.sealed || currentBatch?.status === "COMPLETED") { + if ( + isIdempotentRetrySuccess( + currentBatch?.status, + currentBatch?.sealed, + currentBatch?.processingCompletedAt + ) + ) { logger.info("Batch already sealed before count check (fast completion)", { batchId: batchFriendlyId, itemsAccepted, itemsDeduplicated, enqueuedCount, expectedCount: batch.runCount, - batchStatus: currentBatch.status, + batchStatus: currentBatch?.status, + processingCompletedAt: currentBatch?.processingCompletedAt, }); return { @@ -242,6 +307,15 @@ export class StreamBatchItemsService extends WithRunEngine { }; } + if (currentBatch?.status === "ABORTED") { + // Zero TaskRuns exist — the count-mismatch sealed:false semantics + // ("retry with missing items") would mislead the SDK. Throw so the + // customer's batchTrigger() retry creates a fresh batch. + throw new ServiceValidationError( + `Batch ${batchFriendlyId} is not in PENDING status (current: ABORTED)` + ); + } + logger.warn("Batch item count mismatch", { batchId: batchFriendlyId, expected: batch.runCount, @@ -300,20 +374,25 @@ export class StreamBatchItemsService extends WithRunEngine { friendlyId: true, status: true, sealed: true, + processingCompletedAt: true, }, }); if ( - (currentBatch?.sealed && currentBatch.status === "PROCESSING") || - currentBatch?.status === "COMPLETED" + isIdempotentRetrySuccess( + currentBatch?.status, + currentBatch?.sealed, + currentBatch?.processingCompletedAt + ) ) { logger.info("Batch already sealed/completed by concurrent path", { batchId: batchFriendlyId, itemsAccepted, itemsDeduplicated, envId: environment.id, - batchStatus: currentBatch.status, - batchSealed: currentBatch.sealed, + batchStatus: currentBatch?.status, + batchSealed: currentBatch?.sealed, + processingCompletedAt: currentBatch?.processingCompletedAt, }); span.setAttribute("itemsAccepted", itemsAccepted); diff --git a/apps/webapp/test/engine/streamBatchItems.test.ts b/apps/webapp/test/engine/streamBatchItems.test.ts index 6e5f2264b1f..f5348d71b98 100644 --- a/apps/webapp/test/engine/streamBatchItems.test.ts +++ b/apps/webapp/test/engine/streamBatchItems.test.ts @@ -40,8 +40,9 @@ describe("StreamBatchItemsService", () => { environmentId: string, options: { runCount: number; - status?: "PENDING" | "PROCESSING" | "COMPLETED" | "ABORTED"; + status?: "PENDING" | "PROCESSING" | "COMPLETED" | "PARTIAL_FAILED" | "ABORTED"; sealed?: boolean; + processingCompletedAt?: Date | null; } ) { const { id, friendlyId } = BatchId.generate(); @@ -57,6 +58,7 @@ describe("StreamBatchItemsService", () => { runIds: [], batchVersion: "runengine:v2", sealed: options.sealed ?? false, + processingCompletedAt: options.processingCompletedAt ?? null, }, }); @@ -174,7 +176,7 @@ describe("StreamBatchItemsService", () => { ); containerTest( - "should handle race condition when batch already sealed by another request", + "should return sealed=true when batch is already sealed and PROCESSING (Phase 2 retry idempotency)", async ({ prisma, redisOptions }) => { const engine = new RunEngine({ prisma, @@ -211,14 +213,717 @@ describe("StreamBatchItemsService", () => { const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); - // Create a batch that is already sealed and PROCESSING (simulating another request won the race) + // Simulate the SDK retrying Phase 2 after the original request succeeded: + // the original request already sealed the batch and moved it to PROCESSING. const batch = await createBatch(prisma, authenticatedEnvironment.id, { runCount: 2, status: "PROCESSING", sealed: true, }); - // Initialize the batch in Redis with full count + const service = new StreamBatchItemsService({ + prisma, + engine, + }); + + const result = await service.call( + authenticatedEnvironment, + batch.friendlyId, + itemsToAsyncIterable([]), + { + maxItemBytes: 1024 * 1024, + } + ); + + // The retry should be treated as success — the original request already + // enqueued every item, so the SDK should stop retrying. + expect(result.sealed).toBe(true); + expect(result.id).toBe(batch.friendlyId); + expect(result.itemsAccepted).toBe(0); + expect(result.itemsDeduplicated).toBe(0); + expect(result.runCount).toBe(2); + + await engine.quit(); + } + ); + + containerTest( + "should return sealed=true when batch is COMPLETED before Phase 2 retry arrives (TRI-9944)", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + disabled: true, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + batchQueue: { + redis: redisOptions, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + // The customer-reported scenario: single-item batch where the original + // Phase 2 request succeeded server-side, the run executed fast, the batch + // flipped to COMPLETED, then the lost-response SDK retry hits us. + // Note: tryCompleteBatch sets status=COMPLETED but does NOT set sealed=true. + const batch = await createBatch(prisma, authenticatedEnvironment.id, { + runCount: 1, + status: "COMPLETED", + sealed: false, + }); + + const service = new StreamBatchItemsService({ + prisma, + engine, + }); + + const result = await service.call( + authenticatedEnvironment, + batch.friendlyId, + itemsToAsyncIterable([]), + { + maxItemBytes: 1024 * 1024, + } + ); + + expect(result.sealed).toBe(true); + expect(result.id).toBe(batch.friendlyId); + expect(result.itemsAccepted).toBe(0); + expect(result.itemsDeduplicated).toBe(0); + + await engine.quit(); + } + ); + + containerTest( + "should throw when batch is in ABORTED status", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + disabled: true, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + batchQueue: { + redis: redisOptions, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const batch = await createBatch(prisma, authenticatedEnvironment.id, { + runCount: 2, + status: "ABORTED", + sealed: false, + }); + + const service = new StreamBatchItemsService({ + prisma, + engine, + }); + + await expect( + service.call(authenticatedEnvironment, batch.friendlyId, itemsToAsyncIterable([]), { + maxItemBytes: 1024 * 1024, + }) + ).rejects.toThrow(ServiceValidationError); + + await engine.quit(); + } + ); + + containerTest( + "should throw when batch is sealed but ABORTED (callback aborted post-seal must surface as error)", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + disabled: true, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + batchQueue: { + redis: redisOptions, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + // V2 batch completion callback sets status=ABORTED (failedRunCount > 0 && + // successfulRunCount === 0) without touching sealed=true that the seal + // step previously set. The Phase 2 retry must NOT mask this terminal + // failure as success — every run failed. + const batch = await createBatch(prisma, authenticatedEnvironment.id, { + runCount: 2, + status: "ABORTED", + sealed: true, + }); + + const service = new StreamBatchItemsService({ + prisma, + engine, + }); + + await expect( + service.call(authenticatedEnvironment, batch.friendlyId, itemsToAsyncIterable([]), { + maxItemBytes: 1024 * 1024, + }) + ).rejects.toThrow(ServiceValidationError); + + await engine.quit(); + } + ); + + containerTest( + "should return sealed=true when batch is PARTIAL_FAILED (Phase 2 retry idempotency)", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + disabled: true, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + batchQueue: { + redis: redisOptions, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + // V2 completion callback sets PARTIAL_FAILED when some run-creation + // attempts failed but at least one succeeded. The Phase 2 stream itself + // did its job (items were enqueued and processed), so a retry should + // see this as terminal success — the per-item failures are visible on + // the individual run records. + const batch = await createBatch(prisma, authenticatedEnvironment.id, { + runCount: 2, + status: "PARTIAL_FAILED", + sealed: false, + }); + + const service = new StreamBatchItemsService({ + prisma, + engine, + }); + + const result = await service.call( + authenticatedEnvironment, + batch.friendlyId, + itemsToAsyncIterable([]), + { + maxItemBytes: 1024 * 1024, + } + ); + + expect(result.sealed).toBe(true); + expect(result.id).toBe(batch.friendlyId); + expect(result.itemsAccepted).toBe(0); + expect(result.itemsDeduplicated).toBe(0); + + await engine.quit(); + } + ); + + containerTest( + "should return sealed=true when batch is PARTIAL_FAILED by callback before seal attempt", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + disabled: true, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + batchQueue: { + redis: redisOptions, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const batch = await createBatch(prisma, authenticatedEnvironment.id, { + runCount: 2, + status: "PENDING", + sealed: false, + }); + + await engine.initializeBatch({ + batchId: batch.id, + friendlyId: batch.friendlyId, + environmentId: authenticatedEnvironment.id, + environmentType: authenticatedEnvironment.type, + organizationId: authenticatedEnvironment.organizationId, + projectId: authenticatedEnvironment.projectId, + runCount: 2, + processingConcurrency: 10, + }); + + await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 0, { + task: "test-task", + payload: JSON.stringify({ data: "item1" }), + payloadType: "application/json", + }); + await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 1, { + task: "test-task", + payload: JSON.stringify({ data: "item2" }), + payloadType: "application/json", + }); + + // Simulate the race where V2's batchCompletionCallback runs between + // getEnqueuedCount and the seal updateMany — some runs failed to create + // but at least one succeeded, so callback sets status=PARTIAL_FAILED + // without setting sealed=true. + const racingPrisma = { + ...prisma, + batchTaskRun: { + ...prisma.batchTaskRun, + findFirst: prisma.batchTaskRun.findFirst.bind(prisma.batchTaskRun), + updateMany: async () => { + await prisma.batchTaskRun.update({ + where: { id: batch.id }, + data: { + status: "PARTIAL_FAILED", + }, + }); + return { count: 0 }; + }, + findUnique: prisma.batchTaskRun.findUnique.bind(prisma.batchTaskRun), + }, + } as unknown as PrismaClient; + + const service = new StreamBatchItemsService({ + prisma: racingPrisma, + engine, + }); + + const result = await service.call( + authenticatedEnvironment, + batch.friendlyId, + itemsToAsyncIterable([]), + { + maxItemBytes: 1024 * 1024, + } + ); + + expect(result.sealed).toBe(true); + expect(result.id).toBe(batch.friendlyId); + + const updatedBatch = await prisma.batchTaskRun.findUnique({ + where: { id: batch.id }, + }); + + expect(updatedBatch?.status).toBe("PARTIAL_FAILED"); + expect(updatedBatch?.sealed).toBe(false); + + await engine.quit(); + } + ); + + containerTest( + "should return sealed=true when concurrent request already sealed the batch during seal attempt", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + disabled: true, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + batchQueue: { + redis: redisOptions, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + // Create a batch in PENDING state + const batch = await createBatch(prisma, authenticatedEnvironment.id, { + runCount: 2, + status: "PENDING", + sealed: false, + }); + + // Initialize the batch in Redis + await engine.initializeBatch({ + batchId: batch.id, + friendlyId: batch.friendlyId, + environmentId: authenticatedEnvironment.id, + environmentType: authenticatedEnvironment.type, + organizationId: authenticatedEnvironment.organizationId, + projectId: authenticatedEnvironment.projectId, + runCount: 2, + processingConcurrency: 10, + }); + + // Enqueue items + await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 0, { + task: "test-task", + payload: JSON.stringify({ data: "item1" }), + payloadType: "application/json", + }); + await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 1, { + task: "test-task", + payload: JSON.stringify({ data: "item2" }), + payloadType: "application/json", + }); + + // Create a custom prisma client that simulates a race condition: + // When updateMany is called on batchTaskRun, it returns count: 0 (as if another request beat us) + // but the subsequent findUnique shows the batch is sealed and PROCESSING + const racingPrisma = { + ...prisma, + batchTaskRun: { + ...prisma.batchTaskRun, + findFirst: prisma.batchTaskRun.findFirst.bind(prisma.batchTaskRun), + updateMany: async () => { + // Simulate another request winning the race - seal the batch first + await prisma.batchTaskRun.update({ + where: { id: batch.id }, + data: { + sealed: true, + sealedAt: new Date(), + status: "PROCESSING", + processingStartedAt: new Date(), + }, + }); + // Return 0 as if the conditional update failed + return { count: 0 }; + }, + findUnique: prisma.batchTaskRun.findUnique.bind(prisma.batchTaskRun), + }, + } as unknown as PrismaClient; + + const service = new StreamBatchItemsService({ + prisma: racingPrisma, + engine, + }); + + // Call the service - it should detect the race and return success since batch is sealed + const result = await service.call( + authenticatedEnvironment, + batch.friendlyId, + itemsToAsyncIterable([]), + { + maxItemBytes: 1024 * 1024, + } + ); + + // Should return sealed=true because the batch was sealed (by the "other" request) + expect(result.sealed).toBe(true); + expect(result.id).toBe(batch.friendlyId); + + // Verify the batch is sealed in the database + const updatedBatch = await prisma.batchTaskRun.findUnique({ + where: { id: batch.id }, + }); + + expect(updatedBatch?.sealed).toBe(true); + expect(updatedBatch?.status).toBe("PROCESSING"); + + await engine.quit(); + } + ); + + containerTest( + "should return sealed=true when batch is COMPLETED by BatchQueue before seal attempt", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + disabled: true, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + batchQueue: { + redis: redisOptions, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + // Create a batch in PENDING state + const batch = await createBatch(prisma, authenticatedEnvironment.id, { + runCount: 2, + status: "PENDING", + sealed: false, + }); + + // Initialize the batch in Redis + await engine.initializeBatch({ + batchId: batch.id, + friendlyId: batch.friendlyId, + environmentId: authenticatedEnvironment.id, + environmentType: authenticatedEnvironment.type, + organizationId: authenticatedEnvironment.organizationId, + projectId: authenticatedEnvironment.projectId, + runCount: 2, + processingConcurrency: 10, + }); + + // Enqueue items - the enqueued count check passes but the seal updateMany + // will race with tryCompleteBatch moving status to COMPLETED. + await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 0, { + task: "test-task", + payload: JSON.stringify({ data: "item1" }), + payloadType: "application/json", + }); + await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 1, { + task: "test-task", + payload: JSON.stringify({ data: "item2" }), + payloadType: "application/json", + }); + + // Simulate the race where BatchQueue's completionCallback runs + // tryCompleteBatch between getEnqueuedCount and the seal updateMany. + // tryCompleteBatch sets status=COMPLETED but NOT sealed=true. + const racingPrisma = { + ...prisma, + batchTaskRun: { + ...prisma.batchTaskRun, + findFirst: prisma.batchTaskRun.findFirst.bind(prisma.batchTaskRun), + updateMany: async () => { + await prisma.batchTaskRun.update({ + where: { id: batch.id }, + data: { + status: "COMPLETED", + }, + }); + // The conditional updateMany(where: status="PENDING") would now fail + return { count: 0 }; + }, + findUnique: prisma.batchTaskRun.findUnique.bind(prisma.batchTaskRun), + }, + } as unknown as PrismaClient; + + const service = new StreamBatchItemsService({ + prisma: racingPrisma, + engine, + }); + + const result = await service.call( + authenticatedEnvironment, + batch.friendlyId, + itemsToAsyncIterable([]), + { + maxItemBytes: 1024 * 1024, + } + ); + + // The endpoint should accept the COMPLETED state as a success case so the + // SDK does not retry a batch whose child runs have already finished. + expect(result.sealed).toBe(true); + expect(result.id).toBe(batch.friendlyId); + + const updatedBatch = await prisma.batchTaskRun.findUnique({ + where: { id: batch.id }, + }); + + expect(updatedBatch?.status).toBe("COMPLETED"); + // sealed stays false because the BatchQueue completion path does not set + // it - that's fine, the batch is terminal. + expect(updatedBatch?.sealed).toBe(false); + + await engine.quit(); + } + ); + + containerTest( + "should throw error when race condition leaves batch in unexpected state", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + disabled: true, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + batchQueue: { + redis: redisOptions, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + // Create a batch in PENDING state + const batch = await createBatch(prisma, authenticatedEnvironment.id, { + runCount: 2, + status: "PENDING", + sealed: false, + }); + + // Initialize the batch in Redis await engine.initializeBatch({ batchId: batch.id, friendlyId: batch.friendlyId, @@ -230,7 +935,7 @@ describe("StreamBatchItemsService", () => { processingConcurrency: 10, }); - // Enqueue items directly + // Enqueue items await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 0, { task: "test-task", payload: JSON.stringify({ data: "item1" }), @@ -242,24 +947,47 @@ describe("StreamBatchItemsService", () => { payloadType: "application/json", }); + // Create a custom prisma client that simulates a race condition where + // the batch ends up in an unexpected state (ABORTED instead of PROCESSING) + const racingPrisma = { + ...prisma, + batchTaskRun: { + ...prisma.batchTaskRun, + findFirst: prisma.batchTaskRun.findFirst.bind(prisma.batchTaskRun), + updateMany: async () => { + // Simulate the batch being aborted by another process + await prisma.batchTaskRun.update({ + where: { id: batch.id }, + data: { + sealed: true, + status: "ABORTED", + }, + }); + // Return 0 as if the conditional update failed + return { count: 0 }; + }, + findUnique: prisma.batchTaskRun.findUnique.bind(prisma.batchTaskRun), + }, + } as unknown as PrismaClient; + const service = new StreamBatchItemsService({ - prisma, + prisma: racingPrisma, engine, }); - // This should fail because the batch is already sealed + // Call the service - it should throw because the batch is in an unexpected state await expect( service.call(authenticatedEnvironment, batch.friendlyId, itemsToAsyncIterable([]), { maxItemBytes: 1024 * 1024, }) - ).rejects.toThrow(ServiceValidationError); + ).rejects.toThrow(/unexpected state/); await engine.quit(); } ); containerTest( - "should return sealed=true when concurrent request already sealed the batch during seal attempt", + "should return sealed=false when item count does not match", async ({ prisma, redisOptions }) => { const engine = new RunEngine({ prisma, @@ -296,9 +1024,9 @@ describe("StreamBatchItemsService", () => { const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); - // Create a batch in PENDING state + // Create a batch expecting 3 items const batch = await createBatch(prisma, authenticatedEnvironment.id, { - runCount: 2, + runCount: 3, status: "PENDING", sealed: false, }); @@ -311,11 +1039,11 @@ describe("StreamBatchItemsService", () => { environmentType: authenticatedEnvironment.type, organizationId: authenticatedEnvironment.organizationId, projectId: authenticatedEnvironment.projectId, - runCount: 2, + runCount: 3, processingConcurrency: 10, }); - // Enqueue items + // Only enqueue 2 items (1 short of expected) await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 0, { task: "test-task", payload: JSON.stringify({ data: "item1" }), @@ -327,38 +1055,11 @@ describe("StreamBatchItemsService", () => { payloadType: "application/json", }); - // Create a custom prisma client that simulates a race condition: - // When updateMany is called on batchTaskRun, it returns count: 0 (as if another request beat us) - // but the subsequent findUnique shows the batch is sealed and PROCESSING - const racingPrisma = { - ...prisma, - batchTaskRun: { - ...prisma.batchTaskRun, - findFirst: prisma.batchTaskRun.findFirst.bind(prisma.batchTaskRun), - updateMany: async () => { - // Simulate another request winning the race - seal the batch first - await prisma.batchTaskRun.update({ - where: { id: batch.id }, - data: { - sealed: true, - sealedAt: new Date(), - status: "PROCESSING", - processingStartedAt: new Date(), - }, - }); - // Return 0 as if the conditional update failed - return { count: 0 }; - }, - findUnique: prisma.batchTaskRun.findUnique.bind(prisma.batchTaskRun), - }, - } as unknown as PrismaClient; - const service = new StreamBatchItemsService({ - prisma: racingPrisma, + prisma, engine, }); - // Call the service - it should detect the race and return success since batch is sealed const result = await service.call( authenticatedEnvironment, batch.friendlyId, @@ -368,24 +1069,25 @@ describe("StreamBatchItemsService", () => { } ); - // Should return sealed=true because the batch was sealed (by the "other" request) - expect(result.sealed).toBe(true); - expect(result.id).toBe(batch.friendlyId); + // Should return sealed=false because item count doesn't match + expect(result.sealed).toBe(false); + expect(result.enqueuedCount).toBe(2); + expect(result.expectedCount).toBe(3); - // Verify the batch is sealed in the database + // Verify the batch is NOT sealed in the database const updatedBatch = await prisma.batchTaskRun.findUnique({ where: { id: batch.id }, }); - expect(updatedBatch?.sealed).toBe(true); - expect(updatedBatch?.status).toBe("PROCESSING"); + expect(updatedBatch?.sealed).toBe(false); + expect(updatedBatch?.status).toBe("PENDING"); await engine.quit(); } ); containerTest( - "should return sealed=true when batch is COMPLETED by BatchQueue before seal attempt", + "should return sealed=true when seal-failed race produces sealed=true + PENDING (post-callback all-created)", async ({ prisma, redisOptions }) => { const engine = new RunEngine({ prisma, @@ -422,14 +1124,12 @@ describe("StreamBatchItemsService", () => { const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); - // Create a batch in PENDING state const batch = await createBatch(prisma, authenticatedEnvironment.id, { runCount: 2, status: "PENDING", sealed: false, }); - // Initialize the batch in Redis await engine.initializeBatch({ batchId: batch.id, friendlyId: batch.friendlyId, @@ -441,8 +1141,6 @@ describe("StreamBatchItemsService", () => { processingConcurrency: 10, }); - // Enqueue items - the enqueued count check passes but the seal updateMany - // will race with tryCompleteBatch moving status to COMPLETED. await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 0, { task: "test-task", payload: JSON.stringify({ data: "item1" }), @@ -454,9 +1152,12 @@ describe("StreamBatchItemsService", () => { payloadType: "application/json", }); - // Simulate the race where BatchQueue's completionCallback runs - // tryCompleteBatch between getEnqueuedCount and the seal updateMany. - // tryCompleteBatch sets status=COMPLETED but NOT sealed=true. + // Simulate the race where a concurrent path seals the batch (sealed=true, + // PROCESSING), then the V2 batchCompletionCallback fires with all runs + // created successfully and resets status to PENDING (sealed stays true). + // Our seal updateMany then fails the conditional (sealed=false no longer + // matches), and the re-query sees sealed=true + PENDING — a perfectly + // valid post-callback state that the SDK retry should treat as success. const racingPrisma = { ...prisma, batchTaskRun: { @@ -466,10 +1167,13 @@ describe("StreamBatchItemsService", () => { await prisma.batchTaskRun.update({ where: { id: batch.id }, data: { - status: "COMPLETED", + sealed: true, + sealedAt: new Date(), + // Intentionally leave status as PENDING — that's exactly what + // the V2 batchCompletionCallback does after all runs are + // created (status PROCESSING → PENDING). }, }); - // The conditional updateMany(where: status="PENDING") would now fail return { count: 0 }; }, findUnique: prisma.batchTaskRun.findUnique.bind(prisma.batchTaskRun), @@ -490,8 +1194,6 @@ describe("StreamBatchItemsService", () => { } ); - // The endpoint should accept the COMPLETED state as a success case so the - // SDK does not retry a batch whose child runs have already finished. expect(result.sealed).toBe(true); expect(result.id).toBe(batch.friendlyId); @@ -499,17 +1201,15 @@ describe("StreamBatchItemsService", () => { where: { id: batch.id }, }); - expect(updatedBatch?.status).toBe("COMPLETED"); - // sealed stays false because the BatchQueue completion path does not set - // it - that's fine, the batch is terminal. - expect(updatedBatch?.sealed).toBe(false); + expect(updatedBatch?.sealed).toBe(true); + expect(updatedBatch?.status).toBe("PENDING"); await engine.quit(); } ); containerTest( - "should throw error when race condition leaves batch in unexpected state", + "should throw when count-mismatch race produces sealed=true + ABORTED (no TaskRuns created)", async ({ prisma, redisOptions }) => { const engine = new RunEngine({ prisma, @@ -546,14 +1246,12 @@ describe("StreamBatchItemsService", () => { const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); - // Create a batch in PENDING state const batch = await createBatch(prisma, authenticatedEnvironment.id, { - runCount: 2, + runCount: 3, status: "PENDING", sealed: false, }); - // Initialize the batch in Redis await engine.initializeBatch({ batchId: batch.id, friendlyId: batch.friendlyId, @@ -561,11 +1259,17 @@ describe("StreamBatchItemsService", () => { environmentType: authenticatedEnvironment.type, organizationId: authenticatedEnvironment.organizationId, projectId: authenticatedEnvironment.projectId, - runCount: 2, + runCount: 3, processingConcurrency: 10, }); - // Enqueue items + // Only enqueue 2 items so the post-loop count check trips into the + // mismatch handler. The race we're simulating: between our pre-loop + // findFirst and the count-mismatch re-query, a concurrent path sealed + // the batch, runs were attempted, every run-creation failed AND the + // pre-failed-TaskRun fallback also failed → callback sets ABORTED. + // The customer has zero TaskRun records to monitor, so the retry must + // throw rather than silently succeed. await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 0, { task: "test-task", payload: JSON.stringify({ data: "item1" }), @@ -577,25 +1281,31 @@ describe("StreamBatchItemsService", () => { payloadType: "application/json", }); - // Create a custom prisma client that simulates a race condition where - // the batch ends up in an unexpected state (ABORTED instead of PROCESSING) + // Override findFirst to flip the batch to sealed=true + ABORTED on the + // re-query that happens INSIDE the count-mismatch branch. The first + // findFirst (pre-loop) must still see PENDING + sealed=false so we + // pass through and reach the count-mismatch branch. + let findFirstCallCount = 0; const racingPrisma = { ...prisma, batchTaskRun: { ...prisma.batchTaskRun, - findFirst: prisma.batchTaskRun.findFirst.bind(prisma.batchTaskRun), - updateMany: async () => { - // Simulate the batch being aborted by another process - await prisma.batchTaskRun.update({ - where: { id: batch.id }, - data: { - sealed: true, - status: "ABORTED", - }, - }); - // Return 0 as if the conditional update failed - return { count: 0 }; + findFirst: async (args: Parameters[0]) => { + findFirstCallCount++; + if (findFirstCallCount >= 2) { + await prisma.batchTaskRun.update({ + where: { id: batch.id }, + data: { + sealed: true, + sealedAt: new Date(), + status: "ABORTED", + completedAt: new Date(), + }, + }); + } + return prisma.batchTaskRun.findFirst.call(prisma.batchTaskRun, args); }, + updateMany: prisma.batchTaskRun.updateMany.bind(prisma.batchTaskRun), findUnique: prisma.batchTaskRun.findUnique.bind(prisma.batchTaskRun), }, } as unknown as PrismaClient; @@ -605,19 +1315,18 @@ describe("StreamBatchItemsService", () => { engine, }); - // Call the service - it should throw because the batch is in an unexpected state await expect( service.call(authenticatedEnvironment, batch.friendlyId, itemsToAsyncIterable([]), { maxItemBytes: 1024 * 1024, }) - ).rejects.toThrow(/unexpected state/); + ).rejects.toThrow(ServiceValidationError); await engine.quit(); } ); containerTest( - "should return sealed=false when item count does not match", + "should return sealed=true when batch is sealed=false + PENDING + processingCompletedAt set (pre-loop post-callback)", async ({ prisma, redisOptions }) => { const engine = new RunEngine({ prisma, @@ -654,14 +1363,93 @@ describe("StreamBatchItemsService", () => { const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); - // Create a batch expecting 3 items + // The V2 batchCompletionCallback set processingCompletedAt without + // touching sealed (sealed gets set by streamBatchItems, not the callback). + // Status stays PENDING because every run was created successfully (the + // callback's "all created, waiting for completion" branch). A Phase 2 + // retry arriving in this state must treat it as success — every item + // has a TaskRun record for the customer to monitor. const batch = await createBatch(prisma, authenticatedEnvironment.id, { - runCount: 3, + runCount: 4, + status: "PENDING", + sealed: false, + processingCompletedAt: new Date(), + }); + + const service = new StreamBatchItemsService({ + prisma, + engine, + }); + + const result = await service.call( + authenticatedEnvironment, + batch.friendlyId, + itemsToAsyncIterable([]), + { + maxItemBytes: 1024 * 1024, + } + ); + + expect(result.sealed).toBe(true); + expect(result.id).toBe(batch.friendlyId); + expect(result.itemsAccepted).toBe(0); + expect(result.itemsDeduplicated).toBe(0); + + await engine.quit(); + } + ); + + containerTest( + "should return sealed=true on count-mismatch when callback fired before our getEnqueuedCount (cleanup race — customer scenario)", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + disabled: true, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + batchQueue: { + redis: redisOptions, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + // The customer's exact case: 4-item batch, BatchQueue rushes through + // all items before our service finishes its loop, callback fires + // (setting processingCompletedAt; status stays PENDING because all + // runs created cleanly), cleanup deletes Redis state. Our service + // hits the count-mismatch branch because getBatchEnqueuedCount returns + // 0 (cleaned). The re-query sees sealed=false + PENDING but + // processingCompletedAt is set — the work is done. + const batch = await createBatch(prisma, authenticatedEnvironment.id, { + runCount: 4, status: "PENDING", sealed: false, }); - // Initialize the batch in Redis await engine.initializeBatch({ batchId: batch.id, friendlyId: batch.friendlyId, @@ -669,24 +1457,45 @@ describe("StreamBatchItemsService", () => { environmentType: authenticatedEnvironment.type, organizationId: authenticatedEnvironment.organizationId, projectId: authenticatedEnvironment.projectId, - runCount: 3, + runCount: 4, processingConcurrency: 10, }); - // Only enqueue 2 items (1 short of expected) - await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 0, { - task: "test-task", - payload: JSON.stringify({ data: "item1" }), - payloadType: "application/json", - }); - await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 1, { - task: "test-task", - payload: JSON.stringify({ data: "item2" }), - payloadType: "application/json", - }); + // Force the count-mismatch branch by leaving Redis at 0 items vs + // runCount=4. The pre-loop must see "initial" state (so it passes + // through to the loop), and the count-mismatch re-query must see + // "post-callback" state. Use a findFirst counter to flip the DB + // between those two reads, exactly matching the production timing + // where the callback fires while our loop is running. + let findFirstCallCount = 0; + const racingPrisma = { + ...prisma, + batchTaskRun: { + ...prisma.batchTaskRun, + findFirst: async (args: Parameters[0]) => { + findFirstCallCount++; + if (findFirstCallCount === 2) { + // The post-loop count-mismatch re-query: BatchQueue completed + // all items and the callback fired in the window before this + // read. Status stays PENDING (all runs created OK) but + // processingCompletedAt is now set. + await prisma.batchTaskRun.update({ + where: { id: batch.id }, + data: { + processingCompletedAt: new Date(), + successfulRunCount: 4, + }, + }); + } + return prisma.batchTaskRun.findFirst.call(prisma.batchTaskRun, args); + }, + updateMany: prisma.batchTaskRun.updateMany.bind(prisma.batchTaskRun), + findUnique: prisma.batchTaskRun.findUnique.bind(prisma.batchTaskRun), + }, + } as unknown as PrismaClient; const service = new StreamBatchItemsService({ - prisma, + prisma: racingPrisma, engine, }); @@ -699,18 +1508,13 @@ describe("StreamBatchItemsService", () => { } ); - // Should return sealed=false because item count doesn't match - expect(result.sealed).toBe(false); - expect(result.enqueuedCount).toBe(2); - expect(result.expectedCount).toBe(3); - - // Verify the batch is NOT sealed in the database - const updatedBatch = await prisma.batchTaskRun.findUnique({ - where: { id: batch.id }, - }); - - expect(updatedBatch?.sealed).toBe(false); - expect(updatedBatch?.status).toBe("PENDING"); + // The retry must be treated as success — every item's TaskRun was + // created by the original Phase 2 call. Returning sealed:false here + // (the previous behavior) made the SDK retry the stream against a + // cleaned-up batch, which then 5xx'd, exhausted SDK retries, and + // surfaced as BatchTriggerError despite all runs succeeding. + expect(result.sealed).toBe(true); + expect(result.id).toBe(batch.friendlyId); await engine.quit(); }