Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/batch-stream-phase2-retry-idempotency.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Stop spurious `BatchTriggerError` failures when a fast-completing `batchTrigger`/`batchTriggerAndWait` raced the stream finalisation - the API now treats these as successes instead of 422s.
101 changes: 90 additions & 11 deletions apps/webapp/app/runEngine/services/streamBatchItems.server.ts
Comment thread
matt-aitken marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,55 @@ import {
} from "@trigger.dev/core/v3";
import { BatchId } from "@trigger.dev/core/v3/isomorphic";
import type { BatchItem, RunEngine } from "@internal/run-engine";
import type { BatchTaskRunStatus } from "@trigger.dev/database";
import { prisma, type PrismaClientOrTransaction } from "~/db.server";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
import { BatchPayloadProcessor } from "../concerns/batchPayloads.server";

/**
* Phase 2 retry idempotency check (TRI-9944).
*
* Returns true when the batch is in a state that means the Phase 2 stream's
* job has already been done — every item has a TaskRun record (real or
* pre-failed) for the customer to monitor. A retry, or the original call
* racing against a fast-completing BatchQueue, should return sealed:true
* in these states so the SDK stops retrying.
*
* Three "work is done" shapes:
* - status moved out of PENDING into PROCESSING/COMPLETED/PARTIAL_FAILED
* (PROCESSING via our seal, COMPLETED via tryCompleteBatch, PARTIAL_FAILED
* via the V2 batchCompletionCallback).
* - status stuck at PENDING but `sealed=true`: another concurrent
* streamBatchItems call sealed the batch and then the callback's
* happy-path branch reset status to PENDING ("all runs created").
* - status stuck at PENDING with `sealed=false` but `processingCompletedAt`
* set: the cleanup-race. BatchQueue rushed through all items, callback
* fired (setting processingCompletedAt), cleanup deleted the Redis
* metadata — all before our service got the chance to seal. The work
* is done; the discriminator is processingCompletedAt which is set
* exclusively by the V2 completion callback.
*
* ABORTED is excluded — it means ZERO TaskRun records were created (every
* per-item attempt failed AND the pre-failed-TaskRun fallback also failed,
* or queue-overload on every item). The customer has nothing to monitor
* at the run level, so the trigger call must throw to give their retry/
* error handling a chance to create a fresh batch.
*/
export function isIdempotentRetrySuccess(
status: BatchTaskRunStatus | null | undefined,
sealed: boolean | null | undefined,
processingCompletedAt: Date | null | undefined
): boolean {
return (
status === "PROCESSING" ||
status === "COMPLETED" ||
status === "PARTIAL_FAILED" ||
(status === "PENDING" && (sealed === true || processingCompletedAt != null))
);
}

export type StreamBatchItemsServiceOptions = {
maxItemBytes: number;
};
Expand Down Expand Up @@ -93,20 +136,35 @@ export class StreamBatchItemsService extends WithRunEngine {
runCount: true,
sealed: true,
batchVersion: true,
processingCompletedAt: true,
},
});

if (!batch) {
throw new ServiceValidationError(`Batch ${batchFriendlyId} not found`);
}

if (batch.sealed) {
throw new ServiceValidationError(
`Batch ${batchFriendlyId} is already sealed and cannot accept more items`
);
if (isIdempotentRetrySuccess(batch.status, batch.sealed, batch.processingCompletedAt)) {
logger.info("Batch already sealed/completed - treating Phase 2 retry as success", {
batchId: batchFriendlyId,
batchSealed: batch.sealed,
batchStatus: batch.status,
processingCompletedAt: batch.processingCompletedAt,
});

return {
id: batchFriendlyId,
itemsAccepted: 0,
itemsDeduplicated: 0,
sealed: true,
runCount: batch.runCount,
};
}

if (batch.status !== "PENDING") {
// ABORTED or any other unexpected non-PENDING state — surface as an error.
// For ABORTED specifically, throwing is required so the customer's
// batchTrigger() retries (a new batch) can recreate the runs.
throw new ServiceValidationError(
`Batch ${batchFriendlyId} is not in PENDING status (current: ${batch.status})`
);
Expand Down Expand Up @@ -220,17 +278,24 @@ export class StreamBatchItemsService extends WithRunEngine {
// COMPLETED (sealed by the BatchQueue completion path before we got here).
const currentBatch = await this._prisma.batchTaskRun.findFirst({
where: { id: batchId },
select: { sealed: true, status: true },
select: { sealed: true, status: true, processingCompletedAt: true },
});

if (currentBatch?.sealed || currentBatch?.status === "COMPLETED") {
if (
isIdempotentRetrySuccess(
currentBatch?.status,
currentBatch?.sealed,
currentBatch?.processingCompletedAt
)
) {
logger.info("Batch already sealed before count check (fast completion)", {
batchId: batchFriendlyId,
itemsAccepted,
itemsDeduplicated,
enqueuedCount,
expectedCount: batch.runCount,
batchStatus: currentBatch.status,
batchStatus: currentBatch?.status,
processingCompletedAt: currentBatch?.processingCompletedAt,
});

return {
Expand All @@ -242,6 +307,15 @@ export class StreamBatchItemsService extends WithRunEngine {
};
}

if (currentBatch?.status === "ABORTED") {
// Zero TaskRuns exist — the count-mismatch sealed:false semantics
// ("retry with missing items") would mislead the SDK. Throw so the
// customer's batchTrigger() retry creates a fresh batch.
throw new ServiceValidationError(
`Batch ${batchFriendlyId} is not in PENDING status (current: ABORTED)`
);
}

logger.warn("Batch item count mismatch", {
batchId: batchFriendlyId,
expected: batch.runCount,
Expand Down Expand Up @@ -300,20 +374,25 @@ export class StreamBatchItemsService extends WithRunEngine {
friendlyId: true,
status: true,
sealed: true,
processingCompletedAt: true,
},
});

if (
(currentBatch?.sealed && currentBatch.status === "PROCESSING") ||
currentBatch?.status === "COMPLETED"
isIdempotentRetrySuccess(
currentBatch?.status,
currentBatch?.sealed,
currentBatch?.processingCompletedAt
)
) {
logger.info("Batch already sealed/completed by concurrent path", {
batchId: batchFriendlyId,
itemsAccepted,
itemsDeduplicated,
envId: environment.id,
batchStatus: currentBatch.status,
batchSealed: currentBatch.sealed,
batchStatus: currentBatch?.status,
batchSealed: currentBatch?.sealed,
processingCompletedAt: currentBatch?.processingCompletedAt,
});

span.setAttribute("itemsAccepted", itemsAccepted);
Expand Down
Loading