Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions apps/server/src/provider/Layers/ClaudeAdapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,73 @@ describe("ClaudeAdapterLive", () => {
);
});

it.effect("stopSession does not throw into the SDK prompt consumer", () => {
// The SDK consumes user messages via `for await (... of prompt)`.
// Stopping a session must end that loop cleanly — not throw an error.
//
// FakeClaudeQuery.close() masks this by resolving pending iterators
// before the shutdown propagates. Override it to match real SDK behavior
// where close() does not resolve the prompt consumer.
const query = new FakeClaudeQuery();
(query as { close: () => void }).close = () => {
query.closeCalls += 1;
};

let promptConsumerError: unknown = undefined;

const layer = makeClaudeAdapterLive({
createQuery: (input) => {
// Simulate the SDK consuming the prompt iterable
(async () => {
try {
for await (const _message of input.prompt) {
/* SDK processes user messages */
}
} catch (error) {
promptConsumerError = error;
}
})();
return query;
},
}).pipe(
Layer.provideMerge(ServerConfig.layerTest("/tmp/claude-adapter-test", "/tmp")),
Layer.provideMerge(NodeServices.layer),
);

return Effect.gen(function* () {
const adapter = yield* ClaudeAdapter;

const runtimeEventsFiber = Effect.runFork(
Stream.runForEach(adapter.streamEvents, () => Effect.void),
);

yield* adapter.startSession({
threadId: THREAD_ID,
provider: "claudeAgent",
runtimeMode: "full-access",
});

yield* adapter.stopSession(THREAD_ID);

yield* Effect.yieldNow;
yield* Effect.yieldNow;
yield* Effect.yieldNow;
yield* Effect.promise(() => new Promise((resolve) => setTimeout(resolve, 50)));

runtimeEventsFiber.interruptUnsafe();

assert.equal(
promptConsumerError,
undefined,
`Prompt consumer should not receive a thrown error on session stop, ` +
`but got: "${promptConsumerError instanceof Error ? promptConsumerError.message : String(promptConsumerError)}"`,
);
}).pipe(
Effect.provideService(Random.Random, makeDeterministicRandomService()),
Effect.provide(layer),
);
});

it.effect("forwards Claude task progress summaries for subagent updates", () => {
const harness = makeHarness();
return Effect.gen(function* () {
Expand Down
3 changes: 3 additions & 0 deletions apps/server/src/provider/Layers/ClaudeAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2258,6 +2258,9 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) {
const prompt = Stream.fromQueue(promptQueue).pipe(
Stream.filter((item) => item.type === "message"),
Stream.map((item) => item.message),
Stream.catchCause((cause) =>
Cause.hasInterruptsOnly(cause) ? Stream.empty : Stream.failCause(cause),
),
Stream.toAsyncIterable,
);

Expand Down