diff --git a/apps/web/src/orchestrationRecovery.test.ts b/apps/web/src/orchestrationRecovery.test.ts index fbdbea4ee3..b04622cdbd 100644 --- a/apps/web/src/orchestrationRecovery.test.ts +++ b/apps/web/src/orchestrationRecovery.test.ts @@ -1,6 +1,9 @@ import { describe, expect, it } from "vitest"; -import { createOrchestrationRecoveryCoordinator } from "./orchestrationRecovery"; +import { + createOrchestrationRecoveryCoordinator, + deriveReplayRetryDecision, +} from "./orchestrationRecovery"; describe("createOrchestrationRecoveryCoordinator", () => { it("defers live events until bootstrap completes and then requests replay", () => { @@ -59,10 +62,13 @@ describe("createOrchestrationRecoveryCoordinator", () => { coordinator.classifyDomainEvent(7); coordinator.markEventBatchApplied([{ sequence: 4 }, { sequence: 5 }, { sequence: 6 }]); - expect(coordinator.completeReplayRecovery()).toBe(true); + expect(coordinator.completeReplayRecovery()).toEqual({ + replayMadeProgress: true, + shouldReplay: true, + }); }); - it("does not immediately replay again when replay returns no new events", () => { + it("retries replay when no progress was made but higher live sequences were observed", () => { const coordinator = createOrchestrationRecoveryCoordinator(); coordinator.beginSnapshotRecovery("bootstrap"); @@ -70,7 +76,10 @@ describe("createOrchestrationRecoveryCoordinator", () => { coordinator.classifyDomainEvent(5); coordinator.beginReplayRecovery("sequence-gap"); - expect(coordinator.completeReplayRecovery()).toBe(false); + expect(coordinator.completeReplayRecovery()).toEqual({ + replayMadeProgress: false, + shouldReplay: true, + }); expect(coordinator.getState()).toMatchObject({ latestSequence: 3, highestObservedSequence: 5, @@ -79,6 +88,19 @@ describe("createOrchestrationRecoveryCoordinator", () => { }); }); + it("does not request another replay when a replay made no progress and nothing newer was observed", () => { + const coordinator = 
createOrchestrationRecoveryCoordinator(); + + coordinator.beginSnapshotRecovery("bootstrap"); + coordinator.completeSnapshotRecovery(3); + coordinator.beginReplayRecovery("sequence-gap"); + + expect(coordinator.completeReplayRecovery()).toEqual({ + replayMadeProgress: false, + shouldReplay: false, + }); + }); + it("marks replay failure as unbootstrapped so snapshot fallback is recovery-only", () => { const coordinator = createOrchestrationRecoveryCoordinator(); @@ -131,3 +153,154 @@ describe("createOrchestrationRecoveryCoordinator", () => { }); }); }); + +describe("deriveReplayRetryDecision", () => { + it("retries immediately when replay made progress", () => { + expect( + deriveReplayRetryDecision({ + previousTracker: { + attempts: 2, + latestSequence: 3, + highestObservedSequence: 5, + }, + completion: { + replayMadeProgress: true, + shouldReplay: true, + }, + recoveryState: { + latestSequence: 5, + highestObservedSequence: 5, + }, + baseDelayMs: 100, + maxNoProgressRetries: 3, + }), + ).toEqual({ + shouldRetry: true, + delayMs: 0, + tracker: null, + }); + }); + + it("caps no-progress retries for the same frontier", () => { + const first = deriveReplayRetryDecision({ + previousTracker: null, + completion: { + replayMadeProgress: false, + shouldReplay: true, + }, + recoveryState: { + latestSequence: 3, + highestObservedSequence: 5, + }, + baseDelayMs: 100, + maxNoProgressRetries: 3, + }); + + const second = deriveReplayRetryDecision({ + previousTracker: first.tracker, + completion: { + replayMadeProgress: false, + shouldReplay: true, + }, + recoveryState: { + latestSequence: 3, + highestObservedSequence: 5, + }, + baseDelayMs: 100, + maxNoProgressRetries: 3, + }); + + const third = deriveReplayRetryDecision({ + previousTracker: second.tracker, + completion: { + replayMadeProgress: false, + shouldReplay: true, + }, + recoveryState: { + latestSequence: 3, + highestObservedSequence: 5, + }, + baseDelayMs: 100, + maxNoProgressRetries: 3, + }); + + const fourth = 
deriveReplayRetryDecision({ + previousTracker: third.tracker, + completion: { + replayMadeProgress: false, + shouldReplay: true, + }, + recoveryState: { + latestSequence: 3, + highestObservedSequence: 5, + }, + baseDelayMs: 100, + maxNoProgressRetries: 3, + }); + + expect(first).toEqual({ + shouldRetry: true, + delayMs: 100, + tracker: { + attempts: 1, + latestSequence: 3, + highestObservedSequence: 5, + }, + }); + expect(second).toEqual({ + shouldRetry: true, + delayMs: 200, + tracker: { + attempts: 2, + latestSequence: 3, + highestObservedSequence: 5, + }, + }); + expect(third).toEqual({ + shouldRetry: true, + delayMs: 400, + tracker: { + attempts: 3, + latestSequence: 3, + highestObservedSequence: 5, + }, + }); + expect(fourth).toEqual({ + shouldRetry: false, + delayMs: 0, + tracker: null, + }); + }); + + it("resets the retry budget when the replay frontier changes", () => { + const exhausted = { + attempts: 3, + latestSequence: 3, + highestObservedSequence: 5, + }; + + expect( + deriveReplayRetryDecision({ + previousTracker: exhausted, + completion: { + replayMadeProgress: false, + shouldReplay: true, + }, + recoveryState: { + latestSequence: 3, + highestObservedSequence: 6, + }, + baseDelayMs: 100, + maxNoProgressRetries: 3, + }), + ).toEqual({ + shouldRetry: true, + delayMs: 100, + tracker: { + attempts: 1, + latestSequence: 3, + highestObservedSequence: 6, + }, + }); + }); +}); diff --git a/apps/web/src/orchestrationRecovery.ts b/apps/web/src/orchestrationRecovery.ts index 5af48f85b9..270461bd29 100644 --- a/apps/web/src/orchestrationRecovery.ts +++ b/apps/web/src/orchestrationRecovery.ts @@ -13,8 +13,74 @@ export interface OrchestrationRecoveryState { inFlight: OrchestrationRecoveryPhase | null; } +export interface ReplayRecoveryCompletion { + replayMadeProgress: boolean; + shouldReplay: boolean; +} + +export interface ReplayRetryTracker { + attempts: number; + latestSequence: number; + highestObservedSequence: number; +} + +export interface 
ReplayRetryDecision { + shouldRetry: boolean; + delayMs: number; + tracker: ReplayRetryTracker | null; +} + type SequencedEvent = Readonly<{ sequence: number }>; +export function deriveReplayRetryDecision(input: { + previousTracker: ReplayRetryTracker | null; + completion: ReplayRecoveryCompletion; + recoveryState: Pick<OrchestrationRecoveryState, "latestSequence" | "highestObservedSequence">; + baseDelayMs: number; + maxNoProgressRetries: number; +}): ReplayRetryDecision { + if (!input.completion.shouldReplay) { + return { + shouldRetry: false, + delayMs: 0, + tracker: null, + }; + } + + if (input.completion.replayMadeProgress) { + return { + shouldRetry: true, + delayMs: 0, + tracker: null, + }; + } + + const previousTracker = input.previousTracker; + const sameFrontier = + previousTracker !== null && + previousTracker.latestSequence === input.recoveryState.latestSequence && + previousTracker.highestObservedSequence === input.recoveryState.highestObservedSequence; + + const attempts = sameFrontier && previousTracker !== null ? previousTracker.attempts + 1 : 1; + if (attempts > input.maxNoProgressRetries) { + return { + shouldRetry: false, + delayMs: 0, + tracker: null, + }; + } + + return { + shouldRetry: true, + delayMs: input.baseDelayMs * 2 ** (attempts - 1), + tracker: { + attempts, + latestSequence: input.recoveryState.latestSequence, + highestObservedSequence: input.recoveryState.highestObservedSequence, + }, + }; +} + export function createOrchestrationRecoveryCoordinator() { let state: OrchestrationRecoveryState = { latestSequence: 0, @@ -120,16 +186,16 @@ export function createOrchestrationRecoveryCoordinator() { return true; }, - completeReplayRecovery(): boolean { + completeReplayRecovery(): ReplayRecoveryCompletion { const replayMadeProgress = replayStartSequence !== null && state.latestSequence > replayStartSequence; replayStartSequence = null; state.inFlight = null; - if (!replayMadeProgress) { - state.pendingReplay = false; - return false; - } - return resolveReplayNeedAfterRecovery().shouldReplay; + const
replayResolution = resolveReplayNeedAfterRecovery(); + return { + replayMadeProgress, + shouldReplay: replayResolution.shouldReplay, + }; }, failReplayRecovery(): void { diff --git a/apps/web/src/routes/__root.tsx b/apps/web/src/routes/__root.tsx index 3377e4bb44..c96044fa10 100644 --- a/apps/web/src/routes/__root.tsx +++ b/apps/web/src/routes/__root.tsx @@ -43,6 +43,7 @@ import { projectQueryKeys } from "../lib/projectReactQuery"; import { collectActiveTerminalThreadIds } from "../lib/terminalStateCleanup"; import { deriveOrchestrationBatchEffects } from "../orchestrationEventEffects"; import { createOrchestrationRecoveryCoordinator } from "../orchestrationRecovery"; +import { deriveReplayRetryDecision } from "../orchestrationRecovery"; import { getWsRpcClient } from "~/wsRpcClient"; export const Route = createRootRouteWithContext<{ @@ -189,6 +190,9 @@ function coalesceOrchestrationUiEvents( return coalesced; } +const REPLAY_RECOVERY_RETRY_DELAY_MS = 100; +const MAX_NO_PROGRESS_REPLAY_RETRIES = 3; + function ServerStateBootstrap() { useEffect(() => startServerStateSync(getWsRpcClient().server), []); @@ -309,6 +313,7 @@ function EventRouter() { let disposed = false; disposedRef.current = false; const recovery = createOrchestrationRecoveryCoordinator(); + let replayRetryTracker: import("../orchestrationRecovery").ReplayRetryTracker | null = null; let needsProviderInvalidation = false; const pendingDomainEvents: OrchestrationEvent[] = []; let flushPendingDomainEventsScheduled = false; @@ -435,13 +440,42 @@ function EventRouter() { applyEventBatch(events); } } catch { + replayRetryTracker = null; recovery.failReplayRecovery(); void fallbackToSnapshotRecovery(); return; } - if (!disposed && recovery.completeReplayRecovery()) { - void recoverFromSequenceGap(); + if (!disposed) { + const replayCompletion = recovery.completeReplayRecovery(); + const retryDecision = deriveReplayRetryDecision({ + previousTracker: replayRetryTracker, + completion: replayCompletion, + 
recoveryState: recovery.getState(), + baseDelayMs: REPLAY_RECOVERY_RETRY_DELAY_MS, + maxNoProgressRetries: MAX_NO_PROGRESS_REPLAY_RETRIES, + }); + replayRetryTracker = retryDecision.tracker; + + if (retryDecision.shouldRetry) { + if (retryDecision.delayMs > 0) { + await new Promise((resolve) => { + setTimeout(resolve, retryDecision.delayMs); + }); + if (disposed) { + return; + } + } + void recoverFromSequenceGap(); + } else if (replayCompletion.shouldReplay && import.meta.env.MODE !== "test") { + console.warn( + "[orchestration-recovery]", + "Stopping replay recovery after no-progress retries.", + { + state: recovery.getState(), + }, + ); + } } };