Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions apps/server/src/orchestration/Layers/CheckpointReactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,9 @@ const make = Effect.gen(function* () {
}),
);

const worker = yield* makeDrainableWorker(processInputSafely);
const worker = yield* makeDrainableWorker((_threadId: ThreadId, input: ReactorInput) =>
processInputSafely(input),
);

const start: CheckpointReactorShape["start"] = Effect.fn("start")(function* () {
yield* Effect.forkScoped(
Expand All @@ -805,7 +807,7 @@ const make = Effect.gen(function* () {
) {
return Effect.void;
}
return worker.enqueue({ source: "domain", event });
return worker.enqueue(event.payload.threadId, { source: "domain", event });
}),
);

Expand All @@ -814,7 +816,7 @@ const make = Effect.gen(function* () {
if (event.type !== "turn.started" && event.type !== "turn.completed") {
return Effect.void;
}
return worker.enqueue({ source: "runtime", event });
return worker.enqueue(event.threadId, { source: "runtime", event });
}),
);
});
Expand Down
145 changes: 125 additions & 20 deletions apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ async function waitFor(
}

describe("ProviderCommandReactor", () => {
type HarnessOptions = {
readonly baseDir?: string;
readonly threadModelSelection?: ModelSelection;
readonly sessionModelSwitch?: "unsupported" | "in-session";
readonly startSessionImplementation?: (
input: unknown,
session: ProviderSession,
) => Effect.Effect<ProviderSession>;
};

let runtime: ManagedRuntime.ManagedRuntime<
OrchestrationEngineService | ProviderCommandReactor,
unknown
Expand Down Expand Up @@ -93,11 +103,7 @@ describe("ProviderCommandReactor", () => {
createdBaseDirs.clear();
});

async function createHarness(input?: {
readonly baseDir?: string;
readonly threadModelSelection?: ModelSelection;
readonly sessionModelSwitch?: "unsupported" | "in-session";
}) {
async function createHarness(input?: HarnessOptions) {
const now = new Date().toISOString();
const baseDir = input?.baseDir ?? fs.mkdtempSync(path.join(os.tmpdir(), "t3code-reactor-"));
createdBaseDirs.add(baseDir);
Expand All @@ -110,37 +116,46 @@ describe("ProviderCommandReactor", () => {
provider: "codex",
model: "gpt-5-codex",
};
const startSession = vi.fn((_: unknown, input: unknown) => {
const startSessionImplementation: NonNullable<HarnessOptions["startSessionImplementation"]> =
input?.startSessionImplementation ??
((_: unknown, session: ProviderSession) => Effect.succeed(session));
const startSession = vi.fn((_: unknown, startInput: unknown) => {
const sessionIndex = nextSessionIndex++;
const resumeCursor =
typeof input === "object" && input !== null && "resumeCursor" in input
? input.resumeCursor
typeof startInput === "object" && startInput !== null && "resumeCursor" in startInput
? startInput.resumeCursor
: undefined;
const threadId =
typeof input === "object" &&
input !== null &&
"threadId" in input &&
typeof input.threadId === "string"
? ThreadId.make(input.threadId)
typeof startInput === "object" &&
startInput !== null &&
"threadId" in startInput &&
typeof startInput.threadId === "string"
? ThreadId.make(startInput.threadId)
: ThreadId.make(`thread-${sessionIndex}`);
const session: ProviderSession = {
provider: modelSelection.provider,
status: "ready" as const,
runtimeMode:
typeof input === "object" &&
input !== null &&
"runtimeMode" in input &&
(input.runtimeMode === "approval-required" || input.runtimeMode === "full-access")
? input.runtimeMode
typeof startInput === "object" &&
startInput !== null &&
"runtimeMode" in startInput &&
(startInput.runtimeMode === "approval-required" ||
startInput.runtimeMode === "full-access")
? startInput.runtimeMode
: "full-access",
...(modelSelection.model !== undefined ? { model: modelSelection.model } : {}),
threadId,
resumeCursor: resumeCursor ?? { opaque: `resume-${sessionIndex}` },
createdAt: now,
updatedAt: now,
};
runtimeSessions.push(session);
return Effect.succeed(session);
return startSessionImplementation(startInput, session).pipe(
Effect.tap((resolvedSession) =>
Effect.sync(() => {
runtimeSessions.push(resolvedSession);
}),
),
);
});
const sendTurn = vi.fn((_: unknown) =>
Effect.succeed({
Expand Down Expand Up @@ -325,6 +340,96 @@ describe("ProviderCommandReactor", () => {
expect(thread?.session?.runtimeMode).toBe("approval-required");
});

it("does not let one hung thread start block another thread", async () => {
const harness = await createHarness({
startSessionImplementation: (input, session) => {
const threadId =
typeof input === "object" &&
input !== null &&
"threadId" in input &&
typeof input.threadId === "string"
? input.threadId
: null;
return threadId === "thread-1" ? Effect.never : Effect.succeed(session);
},
});
const now = new Date().toISOString();

await Effect.runPromise(
harness.engine.dispatch({
type: "thread.create",
commandId: CommandId.make("cmd-thread-create-2"),
threadId: ThreadId.make("thread-2"),
projectId: asProjectId("project-1"),
title: "Thread 2",
modelSelection: {
provider: "codex",
model: "gpt-5-codex",
},
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "approval-required",
branch: null,
worktreePath: null,
createdAt: now,
}),
);

await Effect.runPromise(
harness.engine.dispatch({
type: "thread.turn.start",
commandId: CommandId.make("cmd-turn-start-hung-thread-1"),
threadId: ThreadId.make("thread-1"),
message: {
messageId: asMessageId("user-message-hung-thread-1"),
role: "user",
text: "thread one hangs on start",
attachments: [],
},
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "approval-required",
createdAt: now,
}),
);

await Effect.runPromise(
harness.engine.dispatch({
type: "thread.turn.start",
commandId: CommandId.make("cmd-turn-start-hung-thread-2"),
threadId: ThreadId.make("thread-2"),
message: {
messageId: asMessageId("user-message-hung-thread-2"),
role: "user",
text: "thread two should still run",
attachments: [],
},
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "approval-required",
createdAt: now,
}),
);

await waitFor(() => harness.startSession.mock.calls.length >= 2);
await waitFor(() =>
harness.sendTurn.mock.calls.some(
([payload]) =>
typeof payload === "object" &&
payload !== null &&
"threadId" in payload &&
payload.threadId === ThreadId.make("thread-2"),
),
);

expect(
harness.sendTurn.mock.calls.some(
([payload]) =>
typeof payload === "object" &&
payload !== null &&
"threadId" in payload &&
payload.threadId === ThreadId.make("thread-1"),
),
).toBe(false);
});

it("generates a thread title on the first turn", async () => {
const harness = await createHarness();
const now = new Date().toISOString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,9 @@ const make = Effect.gen(function* () {
}),
);

const worker = yield* makeDrainableWorker(processDomainEventSafely);
const worker = yield* makeDrainableWorker((_threadId: ThreadId, event: ProviderIntentEvent) =>
processDomainEventSafely(event),
);

const start: ProviderCommandReactorShape["start"] = Effect.fn("start")(function* () {
const processEvent = Effect.fn("processEvent")(function* (event: OrchestrationEvent) {
Expand All @@ -799,7 +801,7 @@ const make = Effect.gen(function* () {
event.type === "thread.user-input-response-requested" ||
event.type === "thread.session-stop-requested"
) {
return yield* worker.enqueue(event);
return yield* worker.enqueue(event.payload.threadId, event);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1240,20 +1240,22 @@ const make = Effect.fn("make")(function* () {
}),
);

const worker = yield* makeDrainableWorker(processInputSafely);
const worker = yield* makeDrainableWorker((_threadId: ThreadId, input: RuntimeIngestionInput) =>
processInputSafely(input),
);

const start: ProviderRuntimeIngestionShape["start"] = Effect.fn("start")(function* () {
yield* Effect.forkScoped(
Stream.runForEach(providerService.streamEvents, (event) =>
worker.enqueue({ source: "runtime", event }),
worker.enqueue(event.threadId, { source: "runtime", event }),
),
);
yield* Effect.forkScoped(
Stream.runForEach(orchestrationEngine.streamDomainEvents, (event) => {
if (event.type !== "thread.turn-start-requested") {
return Effect.void;
}
return worker.enqueue({ source: "domain", event });
return worker.enqueue(event.payload.threadId, { source: "domain", event });
}),
);
});
Expand Down
45 changes: 40 additions & 5 deletions packages/shared/src/DrainableWorker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ describe("makeDrainableWorker", () => {
const secondStarted = yield* Deferred.make<void>();
const releaseSecond = yield* Deferred.make<void>();

const worker = yield* makeDrainableWorker((item: string) =>
const worker = yield* makeDrainableWorker((key: string, item: string) =>
Effect.gen(function* () {
if (item === "first") {
yield* Deferred.succeed(firstStarted, undefined).pipe(Effect.orDie);
Expand All @@ -26,11 +26,11 @@ describe("makeDrainableWorker", () => {
yield* Deferred.await(releaseSecond);
}

processed.push(item);
processed.push(`${key}:${item}`);
}),
);

yield* worker.enqueue("first");
yield* worker.enqueue("thread-1", "first");
yield* Deferred.await(firstStarted);

const drained = yield* Deferred.make<void>();
Expand All @@ -40,7 +40,7 @@ describe("makeDrainableWorker", () => {
),
);

yield* worker.enqueue("second");
yield* worker.enqueue("thread-1", "second");
yield* Deferred.succeed(releaseFirst, undefined);
yield* Deferred.await(secondStarted);

Expand All @@ -49,7 +49,42 @@ describe("makeDrainableWorker", () => {
yield* Deferred.succeed(releaseSecond, undefined);
yield* Deferred.await(drained);

expect(processed).toEqual(["first", "second"]);
expect(processed).toEqual(["thread-1:first", "thread-1:second"]);
}),
),
);

it.live("does not let one blocked key stop another key from processing", () =>
Effect.scoped(
Effect.gen(function* () {
const releaseFirst = yield* Deferred.make<void>();
const secondProcessed = yield* Deferred.make<void>();
const processed: string[] = [];

const worker = yield* makeDrainableWorker((key: string, item: string) =>
Effect.gen(function* () {
if (key === "thread-1") {
yield* Deferred.await(releaseFirst);
}

processed.push(`${key}:${item}`);

if (key === "thread-2") {
yield* Deferred.succeed(secondProcessed, undefined).pipe(Effect.orDie);
}
}),
);

yield* worker.enqueue("thread-1", "blocked");
yield* worker.enqueue("thread-2", "ready");
yield* Deferred.await(secondProcessed);

expect(processed).toEqual(["thread-2:ready"]);

yield* Deferred.succeed(releaseFirst, undefined);
yield* worker.drain;

expect(processed).toEqual(["thread-2:ready", "thread-1:blocked"]);
}),
),
);
Expand Down
Loading
Loading