diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts index ded64fb9e6..f58d628623 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts @@ -66,6 +66,16 @@ async function waitFor( } describe("ProviderCommandReactor", () => { + type HarnessOptions = { + readonly baseDir?: string; + readonly threadModelSelection?: ModelSelection; + readonly sessionModelSwitch?: "unsupported" | "in-session"; + readonly startSessionImplementation?: ( + input: unknown, + session: ProviderSession, + ) => Effect.Effect; + }; + let runtime: ManagedRuntime.ManagedRuntime< OrchestrationEngineService | ProviderCommandReactor, unknown @@ -93,11 +103,7 @@ describe("ProviderCommandReactor", () => { createdBaseDirs.clear(); }); - async function createHarness(input?: { - readonly baseDir?: string; - readonly threadModelSelection?: ModelSelection; - readonly sessionModelSwitch?: "unsupported" | "in-session"; - }) { + async function createHarness(input?: HarnessOptions) { const now = new Date().toISOString(); const baseDir = input?.baseDir ?? fs.mkdtempSync(path.join(os.tmpdir(), "t3code-reactor-")); createdBaseDirs.add(baseDir); @@ -110,28 +116,32 @@ describe("ProviderCommandReactor", () => { provider: "codex", model: "gpt-5-codex", }; - const startSession = vi.fn((_: unknown, input: unknown) => { + const startSessionImplementation: NonNullable = + input?.startSessionImplementation ?? + ((_: unknown, session: ProviderSession) => Effect.succeed(session)); + const startSession = vi.fn((_: unknown, startInput: unknown) => { const sessionIndex = nextSessionIndex++; const resumeCursor = - typeof input === "object" && input !== null && "resumeCursor" in input - ? input.resumeCursor + typeof startInput === "object" && startInput !== null && "resumeCursor" in startInput + ? startInput.resumeCursor : undefined; const threadId = - typeof input === "object" && - input !== null && - "threadId" in input && - typeof input.threadId === "string" - ? ThreadId.make(input.threadId) + typeof startInput === "object" && + startInput !== null && + "threadId" in startInput && + typeof startInput.threadId === "string" + ? ThreadId.make(startInput.threadId) : ThreadId.make(`thread-${sessionIndex}`); const session: ProviderSession = { provider: modelSelection.provider, status: "ready" as const, runtimeMode: - typeof input === "object" && - input !== null && - "runtimeMode" in input && - (input.runtimeMode === "approval-required" || input.runtimeMode === "full-access") - ? input.runtimeMode + typeof startInput === "object" && + startInput !== null && + "runtimeMode" in startInput && + (startInput.runtimeMode === "approval-required" || + startInput.runtimeMode === "full-access") + ? startInput.runtimeMode : "full-access", ...(modelSelection.model !== undefined ? { model: modelSelection.model } : {}), threadId, @@ -139,8 +149,13 @@ describe("ProviderCommandReactor", () => { createdAt: now, updatedAt: now, }; - runtimeSessions.push(session); - return Effect.succeed(session); + return startSessionImplementation(startInput, session).pipe( + Effect.tap((resolvedSession) => + Effect.sync(() => { + runtimeSessions.push(resolvedSession); + }), + ), + ); }); const sendTurn = vi.fn((_: unknown) => Effect.succeed({ @@ -325,6 +340,96 @@ describe("ProviderCommandReactor", () => { expect(thread?.session?.runtimeMode).toBe("approval-required"); }); + it("does not let one hung thread start block another thread", async () => { + const harness = await createHarness({ + startSessionImplementation: (input, session) => { + const threadId = + typeof input === "object" && + input !== null && + "threadId" in input && + typeof input.threadId === "string" + ? input.threadId + : null; + return threadId === "thread-1" ? Effect.never : Effect.succeed(session); + }, + }); + const now = new Date().toISOString(); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.create", + commandId: CommandId.make("cmd-thread-create-2"), + threadId: ThreadId.make("thread-2"), + projectId: asProjectId("project-1"), + title: "Thread 2", + modelSelection: { + provider: "codex", + model: "gpt-5-codex", + }, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "approval-required", + branch: null, + worktreePath: null, + createdAt: now, + }), + ); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.turn.start", + commandId: CommandId.make("cmd-turn-start-hung-thread-1"), + threadId: ThreadId.make("thread-1"), + message: { + messageId: asMessageId("user-message-hung-thread-1"), + role: "user", + text: "thread one hangs on start", + attachments: [], + }, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "approval-required", + createdAt: now, + }), + ); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.turn.start", + commandId: CommandId.make("cmd-turn-start-hung-thread-2"), + threadId: ThreadId.make("thread-2"), + message: { + messageId: asMessageId("user-message-hung-thread-2"), + role: "user", + text: "thread two should still run", + attachments: [], + }, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "approval-required", + createdAt: now, + }), + ); + + await waitFor(() => harness.startSession.mock.calls.length >= 2); + await waitFor(() => + harness.sendTurn.mock.calls.some( + ([payload]) => + typeof payload === "object" && + payload !== null && + "threadId" in payload && + payload.threadId === ThreadId.make("thread-2"), + ), + ); + + expect( + harness.sendTurn.mock.calls.some( + ([payload]) => + typeof payload === "object" && + payload !== null && + "threadId" in payload && + payload.threadId === ThreadId.make("thread-1"), + ), + ).toBe(false); + }); + it("generates a thread title on the first turn", async () => { const harness = await createHarness(); const now = new Date().toISOString(); diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts index a1a69f0efa..bd2f824b86 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts @@ -12,7 +12,7 @@ import { type TurnId, } from "@t3tools/contracts"; import { Cache, Cause, Duration, Effect, Equal, Layer, Option, Schema, Stream } from "effect"; -import { makeDrainableWorker } from "@t3tools/shared/DrainableWorker"; +import { type DrainableWorker, makeDrainableWorker } from "@t3tools/shared/DrainableWorker"; import { resolveThreadWorkspaceCwd } from "../../checkpointing/Utils.ts"; import { GitCore } from "../../git/Services/GitCore.ts"; @@ -787,7 +787,19 @@ const make = Effect.gen(function* () { }), ); - const worker = yield* makeDrainableWorker(processDomainEventSafely); + const workersByThreadId = new Map>(); + + const workerForThread = (threadId: ThreadId) => + Effect.gen(function* () { + const existing = workersByThreadId.get(threadId); + if (existing) { + return existing; + } + + const created = yield* makeDrainableWorker(processDomainEventSafely); + workersByThreadId.set(threadId, created); + return created; + }); const start: ProviderCommandReactorShape["start"] = Effect.fn("start")(function* () { const processEvent = Effect.fn("processEvent")(function* (event: OrchestrationEvent) { @@ -799,6 +811,7 @@ const make = Effect.gen(function* () { event.type === "thread.user-input-response-requested" || event.type === "thread.session-stop-requested" ) { + const worker = yield* workerForThread(event.payload.threadId); return yield* worker.enqueue(event); } }); @@ -810,7 +823,13 @@ const make = Effect.gen(function* () { return { start, - drain: worker.drain, + drain: Effect.gen(function* () { + const workers = Array.from(workersByThreadId.values()); + yield* Effect.forEach(workers, (worker) => worker.drain, { + concurrency: "unbounded", + discard: true, + }); + }), } satisfies ProviderCommandReactorShape; });