Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 125 additions & 20 deletions apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ async function waitFor(
}

describe("ProviderCommandReactor", () => {
// Options accepted by `createHarness`. All fields are optional so each test
// opts in to only the behavior it needs.
type HarnessOptions = {
  // Pre-existing base directory to reuse; when omitted the harness creates a
  // fresh temp dir (see `fs.mkdtempSync` in `createHarness`).
  readonly baseDir?: string;
  // Model selection seeded for the thread under test; defaults to codex/gpt-5-codex.
  readonly threadModelSelection?: ModelSelection;
  // Whether the fake provider advertises in-session model switching.
  // NOTE(review): usage not visible in this chunk — confirm against createHarness.
  readonly sessionModelSwitch?: "unsupported" | "in-session";
  // Override for the mocked `startSession`. Receives the raw start input and
  // the default `ProviderSession` the harness would otherwise return, so a
  // test can delay or replace it (e.g. return `Effect.never` to simulate a
  // hung session start). Defaults to `Effect.succeed(session)`.
  readonly startSessionImplementation?: (
    input: unknown,
    session: ProviderSession,
  ) => Effect.Effect<ProviderSession>;
};

let runtime: ManagedRuntime.ManagedRuntime<
OrchestrationEngineService | ProviderCommandReactor,
unknown
Expand Down Expand Up @@ -93,11 +103,7 @@ describe("ProviderCommandReactor", () => {
createdBaseDirs.clear();
});

async function createHarness(input?: {
readonly baseDir?: string;
readonly threadModelSelection?: ModelSelection;
readonly sessionModelSwitch?: "unsupported" | "in-session";
}) {
async function createHarness(input?: HarnessOptions) {
const now = new Date().toISOString();
const baseDir = input?.baseDir ?? fs.mkdtempSync(path.join(os.tmpdir(), "t3code-reactor-"));
createdBaseDirs.add(baseDir);
Expand All @@ -110,37 +116,46 @@ describe("ProviderCommandReactor", () => {
provider: "codex",
model: "gpt-5-codex",
};
const startSession = vi.fn((_: unknown, input: unknown) => {
const startSessionImplementation: NonNullable<HarnessOptions["startSessionImplementation"]> =
input?.startSessionImplementation ??
((_: unknown, session: ProviderSession) => Effect.succeed(session));
const startSession = vi.fn((_: unknown, startInput: unknown) => {
const sessionIndex = nextSessionIndex++;
const resumeCursor =
typeof input === "object" && input !== null && "resumeCursor" in input
? input.resumeCursor
typeof startInput === "object" && startInput !== null && "resumeCursor" in startInput
? startInput.resumeCursor
: undefined;
const threadId =
typeof input === "object" &&
input !== null &&
"threadId" in input &&
typeof input.threadId === "string"
? ThreadId.make(input.threadId)
typeof startInput === "object" &&
startInput !== null &&
"threadId" in startInput &&
typeof startInput.threadId === "string"
? ThreadId.make(startInput.threadId)
: ThreadId.make(`thread-${sessionIndex}`);
const session: ProviderSession = {
provider: modelSelection.provider,
status: "ready" as const,
runtimeMode:
typeof input === "object" &&
input !== null &&
"runtimeMode" in input &&
(input.runtimeMode === "approval-required" || input.runtimeMode === "full-access")
? input.runtimeMode
typeof startInput === "object" &&
startInput !== null &&
"runtimeMode" in startInput &&
(startInput.runtimeMode === "approval-required" ||
startInput.runtimeMode === "full-access")
? startInput.runtimeMode
: "full-access",
...(modelSelection.model !== undefined ? { model: modelSelection.model } : {}),
threadId,
resumeCursor: resumeCursor ?? { opaque: `resume-${sessionIndex}` },
createdAt: now,
updatedAt: now,
};
runtimeSessions.push(session);
return Effect.succeed(session);
return startSessionImplementation(startInput, session).pipe(
Effect.tap((resolvedSession) =>
Effect.sync(() => {
runtimeSessions.push(resolvedSession);
}),
),
);
});
const sendTurn = vi.fn((_: unknown) =>
Effect.succeed({
Expand Down Expand Up @@ -325,6 +340,96 @@ describe("ProviderCommandReactor", () => {
expect(thread?.session?.runtimeMode).toBe("approval-required");
});

it("does not let one hung thread start block another thread", async () => {
  // Harness whose startSession hangs forever (Effect.never) for "thread-1"
  // only; every other thread starts normally.
  const harness = await createHarness({
    startSessionImplementation: (input, session) => {
      const threadId =
        typeof input === "object" &&
        input !== null &&
        "threadId" in input &&
        typeof input.threadId === "string"
          ? input.threadId
          : null;
      return threadId === "thread-1" ? Effect.never : Effect.succeed(session);
    },
  });
  const now = new Date().toISOString();

  // Create the healthy thread ("thread-2") up front.
  await Effect.runPromise(
    harness.engine.dispatch({
      type: "thread.create",
      commandId: CommandId.make("cmd-thread-create-2"),
      threadId: ThreadId.make("thread-2"),
      projectId: asProjectId("project-1"),
      title: "Thread 2",
      modelSelection: {
        provider: "codex",
        model: "gpt-5-codex",
      },
      interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
      runtimeMode: "approval-required",
      branch: null,
      worktreePath: null,
      createdAt: now,
    }),
  );

  // Start a turn on the hung thread FIRST, so any global (non-per-thread)
  // queue would stall behind it.
  await Effect.runPromise(
    harness.engine.dispatch({
      type: "thread.turn.start",
      commandId: CommandId.make("cmd-turn-start-hung-thread-1"),
      threadId: ThreadId.make("thread-1"),
      message: {
        messageId: asMessageId("user-message-hung-thread-1"),
        role: "user",
        text: "thread one hangs on start",
        attachments: [],
      },
      interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
      runtimeMode: "approval-required",
      createdAt: now,
    }),
  );

  // Then start a turn on the healthy thread.
  await Effect.runPromise(
    harness.engine.dispatch({
      type: "thread.turn.start",
      commandId: CommandId.make("cmd-turn-start-hung-thread-2"),
      threadId: ThreadId.make("thread-2"),
      message: {
        messageId: asMessageId("user-message-hung-thread-2"),
        role: "user",
        text: "thread two should still run",
        attachments: [],
      },
      interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
      runtimeMode: "approval-required",
      createdAt: now,
    }),
  );

  // Both session starts must have been attempted, and thread-2's turn must
  // reach the provider despite thread-1's start never completing.
  await waitFor(() => harness.startSession.mock.calls.length >= 2);
  await waitFor(() =>
    harness.sendTurn.mock.calls.some(
      ([payload]) =>
        typeof payload === "object" &&
        payload !== null &&
        "threadId" in payload &&
        payload.threadId === ThreadId.make("thread-2"),
    ),
  );

  // The hung thread's turn must never have been sent: its session never
  // became ready.
  expect(
    harness.sendTurn.mock.calls.some(
      ([payload]) =>
        typeof payload === "object" &&
        payload !== null &&
        "threadId" in payload &&
        payload.threadId === ThreadId.make("thread-1"),
    ),
  ).toBe(false);
});

it("generates a thread title on the first turn", async () => {
const harness = await createHarness();
const now = new Date().toISOString();
Expand Down
25 changes: 22 additions & 3 deletions apps/server/src/orchestration/Layers/ProviderCommandReactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
type TurnId,
} from "@t3tools/contracts";
import { Cache, Cause, Duration, Effect, Equal, Layer, Option, Schema, Stream } from "effect";
import { makeDrainableWorker } from "@t3tools/shared/DrainableWorker";
import { type DrainableWorker, makeDrainableWorker } from "@t3tools/shared/DrainableWorker";

import { resolveThreadWorkspaceCwd } from "../../checkpointing/Utils.ts";
import { GitCore } from "../../git/Services/GitCore.ts";
Expand Down Expand Up @@ -787,7 +787,19 @@ const make = Effect.gen(function* () {
}),
);

const worker = yield* makeDrainableWorker(processDomainEventSafely);
const workersByThreadId = new Map<ThreadId, DrainableWorker<ProviderIntentEvent>>();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this map is ever growing, workers are never torn down?

Copy link
Copy Markdown
Contributor Author

@ashvinnihalani ashvinnihalani Apr 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good point — I have never reached a point where the worker map has grown that much, since I regularly rebuild and clear the thread history. Let me make sure that when we send a session_close id (or something similar) we remove the thread from the map.

Copy link
Copy Markdown
Contributor Author

@ashvinnihalani ashvinnihalani Apr 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this isn't the right abstraction here. I made this because I ran into a very specific use case where I wanted to start a new thread while the existing thread was still starting/hanging. Given that we have a DrainableWorker for the ProviderCommandReactor, ProviderRuntimeIngestion, and CheckpointReactor, it may make more sense to instead make DrainableWorker non-blocking for separate thread events.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly, not sure how much punch the workers are pulling anymore. They did a good job guaranteeing order before, when things were more async. Now we're more bought into the Effect runtime, so I think some of the issues the workers aimed to solve went away — I don't think we need these workers in every place we currently have them anymore.

I have some branches with some performance work that I got sidetracked from finishing. Will see if I can dig those up


// Returns the memoized drainable worker for `threadId`, creating one on first
// use. One worker per thread keeps event ordering within a thread while
// preventing a slow or hung session start on one thread from blocking others.
// NOTE(review): entries are never removed from `workersByThreadId`, so the map
// grows for the lifetime of the reactor — consider tearing workers down when a
// session closes.
const workerForThread = (threadId: ThreadId) =>
  Effect.suspend(() => {
    const cached = workersByThreadId.get(threadId);
    if (cached !== undefined) {
      return Effect.succeed(cached);
    }
    return makeDrainableWorker(processDomainEventSafely).pipe(
      Effect.tap((worker) => Effect.sync(() => workersByThreadId.set(threadId, worker))),
    );
  });

const start: ProviderCommandReactorShape["start"] = Effect.fn("start")(function* () {
const processEvent = Effect.fn("processEvent")(function* (event: OrchestrationEvent) {
Expand All @@ -799,6 +811,7 @@ const make = Effect.gen(function* () {
event.type === "thread.user-input-response-requested" ||
event.type === "thread.session-stop-requested"
) {
const worker = yield* workerForThread(event.payload.threadId);
return yield* worker.enqueue(event);
}
});
Expand All @@ -810,7 +823,13 @@ const make = Effect.gen(function* () {

return {
start,
drain: worker.drain,
drain: Effect.gen(function* () {
const workers = Array.from(workersByThreadId.values());
yield* Effect.forEach(workers, (worker) => worker.drain, {
concurrency: "unbounded",
discard: true,
});
}),
} satisfies ProviderCommandReactorShape;
});

Expand Down
Loading