diff --git a/apps/server/src/orchestration/Layers/CheckpointReactor.ts b/apps/server/src/orchestration/Layers/CheckpointReactor.ts index 0b1b203ba2..a9bcf05f18 100644 --- a/apps/server/src/orchestration/Layers/CheckpointReactor.ts +++ b/apps/server/src/orchestration/Layers/CheckpointReactor.ts @@ -792,7 +792,9 @@ const make = Effect.gen(function* () { }), ); - const worker = yield* makeDrainableWorker(processInputSafely); + const worker = yield* makeDrainableWorker((_threadId: ThreadId, input: ReactorInput) => + processInputSafely(input), + ); const start: CheckpointReactorShape["start"] = Effect.fn("start")(function* () { yield* Effect.forkScoped( @@ -805,7 +807,7 @@ const make = Effect.gen(function* () { ) { return Effect.void; } - return worker.enqueue({ source: "domain", event }); + return worker.enqueue(event.payload.threadId, { source: "domain", event }); }), ); @@ -814,7 +816,7 @@ const make = Effect.gen(function* () { if (event.type !== "turn.started" && event.type !== "turn.completed") { return Effect.void; } - return worker.enqueue({ source: "runtime", event }); + return worker.enqueue(event.threadId, { source: "runtime", event }); }), ); }); diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts index ded64fb9e6..f58d628623 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts @@ -66,6 +66,16 @@ async function waitFor( } describe("ProviderCommandReactor", () => { + type HarnessOptions = { + readonly baseDir?: string; + readonly threadModelSelection?: ModelSelection; + readonly sessionModelSwitch?: "unsupported" | "in-session"; + readonly startSessionImplementation?: ( + input: unknown, + session: ProviderSession, + ) => Effect.Effect; + }; + let runtime: ManagedRuntime.ManagedRuntime< OrchestrationEngineService | ProviderCommandReactor, unknown @@ -93,11 +103,7 @@ describe("ProviderCommandReactor", () => { createdBaseDirs.clear(); }); - async function createHarness(input?: { - readonly baseDir?: string; - readonly threadModelSelection?: ModelSelection; - readonly sessionModelSwitch?: "unsupported" | "in-session"; - }) { + async function createHarness(input?: HarnessOptions) { const now = new Date().toISOString(); const baseDir = input?.baseDir ?? fs.mkdtempSync(path.join(os.tmpdir(), "t3code-reactor-")); createdBaseDirs.add(baseDir); @@ -110,28 +116,32 @@ describe("ProviderCommandReactor", () => { provider: "codex", model: "gpt-5-codex", }; - const startSession = vi.fn((_: unknown, input: unknown) => { + const startSessionImplementation: NonNullable = + input?.startSessionImplementation ?? + ((_: unknown, session: ProviderSession) => Effect.succeed(session)); + const startSession = vi.fn((_: unknown, startInput: unknown) => { const sessionIndex = nextSessionIndex++; const resumeCursor = - typeof input === "object" && input !== null && "resumeCursor" in input - ? input.resumeCursor + typeof startInput === "object" && startInput !== null && "resumeCursor" in startInput + ? startInput.resumeCursor : undefined; const threadId = - typeof input === "object" && - input !== null && - "threadId" in input && - typeof input.threadId === "string" - ? ThreadId.make(input.threadId) + typeof startInput === "object" && + startInput !== null && + "threadId" in startInput && + typeof startInput.threadId === "string" + ? ThreadId.make(startInput.threadId) : ThreadId.make(`thread-${sessionIndex}`); const session: ProviderSession = { provider: modelSelection.provider, status: "ready" as const, runtimeMode: - typeof input === "object" && - input !== null && - "runtimeMode" in input && - (input.runtimeMode === "approval-required" || input.runtimeMode === "full-access") - ? input.runtimeMode + typeof startInput === "object" && + startInput !== null && + "runtimeMode" in startInput && + (startInput.runtimeMode === "approval-required" || + startInput.runtimeMode === "full-access") + ? startInput.runtimeMode : "full-access", ...(modelSelection.model !== undefined ? { model: modelSelection.model } : {}), threadId, @@ -139,8 +149,13 @@ describe("ProviderCommandReactor", () => { createdAt: now, updatedAt: now, }; - runtimeSessions.push(session); - return Effect.succeed(session); + return startSessionImplementation(startInput, session).pipe( + Effect.tap((resolvedSession) => + Effect.sync(() => { + runtimeSessions.push(resolvedSession); + }), + ), + ); }); const sendTurn = vi.fn((_: unknown) => Effect.succeed({ @@ -325,6 +340,96 @@ describe("ProviderCommandReactor", () => { expect(thread?.session?.runtimeMode).toBe("approval-required"); }); + it("does not let one hung thread start block another thread", async () => { + const harness = await createHarness({ + startSessionImplementation: (input, session) => { + const threadId = + typeof input === "object" && + input !== null && + "threadId" in input && + typeof input.threadId === "string" + ? input.threadId + : null; + return threadId === "thread-1" ? Effect.never : Effect.succeed(session); + }, + }); + const now = new Date().toISOString(); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.create", + commandId: CommandId.make("cmd-thread-create-2"), + threadId: ThreadId.make("thread-2"), + projectId: asProjectId("project-1"), + title: "Thread 2", + modelSelection: { + provider: "codex", + model: "gpt-5-codex", + }, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "approval-required", + branch: null, + worktreePath: null, + createdAt: now, + }), + ); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.turn.start", + commandId: CommandId.make("cmd-turn-start-hung-thread-1"), + threadId: ThreadId.make("thread-1"), + message: { + messageId: asMessageId("user-message-hung-thread-1"), + role: "user", + text: "thread one hangs on start", + attachments: [], + }, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "approval-required", + createdAt: now, + }), + ); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.turn.start", + commandId: CommandId.make("cmd-turn-start-hung-thread-2"), + threadId: ThreadId.make("thread-2"), + message: { + messageId: asMessageId("user-message-hung-thread-2"), + role: "user", + text: "thread two should still run", + attachments: [], + }, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "approval-required", + createdAt: now, + }), + ); + + await waitFor(() => harness.startSession.mock.calls.length >= 2); + await waitFor(() => + harness.sendTurn.mock.calls.some( + ([payload]) => + typeof payload === "object" && + payload !== null && + "threadId" in payload && + payload.threadId === ThreadId.make("thread-2"), + ), + ); + + expect( + harness.sendTurn.mock.calls.some( + ([payload]) => + typeof payload === "object" && + payload !== null && + "threadId" in payload && + payload.threadId === ThreadId.make("thread-1"), + ), + ).toBe(false); + }); + it("generates a thread title on the first turn", async () => { const harness = await createHarness(); const now = new Date().toISOString(); diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts index a1a69f0efa..ec535e1118 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts @@ -787,7 +787,9 @@ const make = Effect.gen(function* () { }), ); - const worker = yield* makeDrainableWorker(processDomainEventSafely); + const worker = yield* makeDrainableWorker((_threadId: ThreadId, event: ProviderIntentEvent) => + processDomainEventSafely(event), + ); const start: ProviderCommandReactorShape["start"] = Effect.fn("start")(function* () { const processEvent = Effect.fn("processEvent")(function* (event: OrchestrationEvent) { @@ -799,7 +801,7 @@ const make = Effect.gen(function* () { event.type === "thread.user-input-response-requested" || event.type === "thread.session-stop-requested" ) { - return yield* worker.enqueue(event); + return yield* worker.enqueue(event.payload.threadId, event); } }); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index c1241241cc..ec3f7aaddb 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -1240,12 +1240,14 @@ const make = Effect.fn("make")(function* () { }), ); - const worker = yield* makeDrainableWorker(processInputSafely); + const worker = yield* makeDrainableWorker((_threadId: ThreadId, input: RuntimeIngestionInput) => + processInputSafely(input), + ); const start: ProviderRuntimeIngestionShape["start"] = Effect.fn("start")(function* () { yield* Effect.forkScoped( Stream.runForEach(providerService.streamEvents, (event) => - worker.enqueue({ source: "runtime", event }), + worker.enqueue(event.threadId, { source: "runtime", event }), ), ); yield* Effect.forkScoped( @@ -1253,7 +1255,7 @@ const make = Effect.fn("make")(function* () { if (event.type !== "thread.turn-start-requested") { return Effect.void; } - return worker.enqueue({ source: "domain", event }); + return worker.enqueue(event.payload.threadId, { source: "domain", event }); }), ); }); diff --git a/packages/shared/src/DrainableWorker.test.ts b/packages/shared/src/DrainableWorker.test.ts index 1d7a3a83c7..2fea8b3bc4 100644 --- a/packages/shared/src/DrainableWorker.test.ts +++ b/packages/shared/src/DrainableWorker.test.ts @@ -14,7 +14,7 @@ describe("makeDrainableWorker", () => { const secondStarted = yield* Deferred.make(); const releaseSecond = yield* Deferred.make(); - const worker = yield* makeDrainableWorker((item: string) => + const worker = yield* makeDrainableWorker((key: string, item: string) => Effect.gen(function* () { if (item === "first") { yield* Deferred.succeed(firstStarted, undefined).pipe(Effect.orDie); @@ -26,11 +26,11 @@ describe("makeDrainableWorker", () => { yield* Deferred.await(releaseSecond); } - processed.push(item); + processed.push(`${key}:${item}`); }), ); - yield* worker.enqueue("first"); + yield* worker.enqueue("thread-1", "first"); yield* Deferred.await(firstStarted); const drained = yield* Deferred.make(); @@ -40,7 +40,7 @@ describe("makeDrainableWorker", () => { ), ); - yield* worker.enqueue("second"); + yield* worker.enqueue("thread-1", "second"); yield* Deferred.succeed(releaseFirst, undefined); yield* Deferred.await(secondStarted); @@ -49,7 +49,42 @@ describe("makeDrainableWorker", () => { yield* Deferred.succeed(releaseSecond, undefined); yield* Deferred.await(drained); - expect(processed).toEqual(["first", "second"]); + expect(processed).toEqual(["thread-1:first", "thread-1:second"]); + }), + ), + ); + + it.live("does not let one blocked key stop another key from processing", () => + Effect.scoped( + Effect.gen(function* () { + const releaseFirst = yield* Deferred.make(); + const secondProcessed = yield* Deferred.make(); + const processed: string[] = []; + + const worker = yield* makeDrainableWorker((key: string, item: string) => + Effect.gen(function* () { + if (key === "thread-1") { + yield* Deferred.await(releaseFirst); + } + + processed.push(`${key}:${item}`); + + if (key === "thread-2") { + yield* Deferred.succeed(secondProcessed, undefined).pipe(Effect.orDie); + } + }), + ); + + yield* worker.enqueue("thread-1", "blocked"); + yield* worker.enqueue("thread-2", "ready"); + yield* Deferred.await(secondProcessed); + + expect(processed).toEqual(["thread-2:ready"]); + + yield* Deferred.succeed(releaseFirst, undefined); + yield* worker.drain; + + expect(processed).toEqual(["thread-2:ready", "thread-1:blocked"]); }), ), ); diff --git a/packages/shared/src/DrainableWorker.ts b/packages/shared/src/DrainableWorker.ts index 55483f33e8..b96e5dc631 100644 --- a/packages/shared/src/DrainableWorker.ts +++ b/packages/shared/src/DrainableWorker.ts @@ -1,68 +1,119 @@ /** - * DrainableWorker - A queue-based worker that exposes a `drain()` effect. + * DrainableWorker - A keyed queue-based worker with a global `drain()` effect. * - * Wraps the common `Queue.unbounded` + `Effect.forever` pattern and adds - * a signal that resolves when the queue is empty **and** the current item - * has finished processing. This lets tests replace timing-sensitive - * `Effect.sleep` calls with deterministic `drain()`. + * Each key is processed serially, while different keys may run concurrently. + * This allows thread-scoped work to avoid head-of-line blocking across + * unrelated keys while still preserving in-order processing per key. + * + * We intentionally do not implement `drainKey()`: there is no current + * production use case for per-key draining, and carrying that API would add + * state and semantics we do not need. * * @module DrainableWorker */ -import type { Scope } from "effect"; -import { Effect, TxQueue, TxRef } from "effect"; - -export interface DrainableWorker { - /** - * Enqueue a work item and track it for `drain()`. - * - * This wraps `Queue.offer` so drain state is updated atomically with the - * enqueue path instead of inferring it from queue internals. - */ - readonly enqueue: (item: A) => Effect.Effect; +import { Effect, Exit, Scope, TxRef } from "effect"; - /** - * Resolves when the queue is empty and the worker is idle (not processing). - */ +export interface DrainableWorker { + readonly enqueue: (key: K, item: A) => Effect.Effect; readonly drain: Effect.Effect; } -/** - * Create a drainable worker that processes items from an unbounded queue. - * - * The worker is forked into the current scope and will be interrupted when - * the scope closes. A finalizer shuts down the queue. - * - * @param process - The effect to run for each queued item. - * @returns A `DrainableWorker` with `queue` and `drain`. - */ -export const makeDrainableWorker = ( - process: (item: A) => Effect.Effect, -): Effect.Effect, never, Scope.Scope | R> => +interface DrainableWorkerState { + readonly queuedByKey: Map>; + readonly activeKeys: Set; + readonly totalOutstanding: number; +} + +export const makeDrainableWorker = ( + process: (key: K, item: A) => Effect.Effect, +): Effect.Effect, never, Scope.Scope | R> => Effect.gen(function* () { - const queue = yield* Effect.acquireRelease(TxQueue.unbounded(), TxQueue.shutdown); - const outstanding = yield* TxRef.make(0); + const context = yield* Effect.context(); + const workerScope = yield* Scope.make("sequential"); + yield* Effect.addFinalizer(() => Scope.close(workerScope, Exit.void)); + + const stateRef = yield* TxRef.make>({ + queuedByKey: new Map(), + activeKeys: new Set(), + totalOutstanding: 0, + }); + + const takeNext = (key: K) => + TxRef.modify(stateRef, (state) => { + const queued = state.queuedByKey.get(key); + if (queued === undefined || queued.length === 0) { + const queuedByKey = new Map(state.queuedByKey); + queuedByKey.delete(key); + const activeKeys = new Set(state.activeKeys); + activeKeys.delete(key); + return [null, { ...state, queuedByKey, activeKeys }] as const; + } + + const queuedByKey = new Map(state.queuedByKey); + if (queued.length === 1) { + queuedByKey.delete(key); + } else { + queuedByKey.set(key, queued.slice(1)); + } + return [queued[0] ?? null, { ...state, queuedByKey }] as const; + }).pipe(Effect.tx); - yield* TxQueue.take(queue).pipe( - Effect.tap((a) => - Effect.ensuring( - process(a), - TxRef.update(outstanding, (n) => n - 1), + const completeOne = TxRef.update(stateRef, (state) => ({ + ...state, + totalOutstanding: Math.max(0, state.totalOutstanding - 1), + })).pipe(Effect.tx); + + const runKey = (key: K): Effect.Effect => + takeNext(key).pipe( + Effect.flatMap((item) => + item === null + ? Effect.void + : process(key, item).pipe(Effect.ensuring(completeOne), Effect.andThen(runKey(key))), ), - ), - Effect.forever, - Effect.forkScoped, - ); + ); - const drain: DrainableWorker["drain"] = TxRef.get(outstanding).pipe( - Effect.tap((n) => (n > 0 ? Effect.txRetry : Effect.void)), - Effect.tx, - ); + const enqueue: DrainableWorker["enqueue"] = (key, item) => + TxRef.modify(stateRef, (state) => { + const queuedByKey = new Map(state.queuedByKey); + queuedByKey.set(key, [...(queuedByKey.get(key) ?? []), item]); + + if (state.activeKeys.has(key)) { + return [ + false, + { + ...state, + queuedByKey, + totalOutstanding: state.totalOutstanding + 1, + }, + ] as const; + } - const enqueue = (element: A): Effect.Effect => - TxQueue.offer(queue, element).pipe( - Effect.tap(() => TxRef.update(outstanding, (n) => n + 1)), + const activeKeys = new Set(state.activeKeys); + activeKeys.add(key); + + return [ + true, + { + ...state, + queuedByKey, + activeKeys, + totalOutstanding: state.totalOutstanding + 1, + }, + ] as const; + }).pipe( Effect.tx, + Effect.flatMap((shouldStart) => + shouldStart + ? runKey(key).pipe(Effect.provide(context), Effect.forkIn(workerScope), Effect.asVoid) + : Effect.void, + ), ); - return { enqueue, drain } satisfies DrainableWorker; + const drain: DrainableWorker["drain"] = TxRef.get(stateRef).pipe( + Effect.tap((state) => (state.totalOutstanding > 0 ? Effect.txRetry : Effect.void)), + Effect.asVoid, + Effect.tx, + ); + + return { enqueue, drain } satisfies DrainableWorker; });