
Add configurable assistant delivery mode with buffered-by-default ingestion#98

Merged
juliusmarminge merged 11 commits into main from codething/9d0ddddb
Feb 27, 2026

Conversation


@juliusmarminge juliusmarminge commented Feb 27, 2026

Summary

  • Add assistantDeliveryMode support through orchestration contracts and turn-start flow.
  • Update provider runtime ingestion to track delivery mode per thread/turn/message and buffer assistant deltas by default.
  • Flush buffered assistant text on message.completed/turn finalization, while preserving live delta behavior for streaming mode.
  • Add server/web wiring for settings and chat UI to request streaming delivery mode when selected.
  • Expand tests for ingestion behavior, including buffered default and explicit streaming mode paths.
  • Remove obsolete planning doc .plans/17-claude-code.md.

Testing

  • Added/updated unit tests:
  • apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts (buffered default + streaming mode behavior)
  • apps/server/src/orchestration/decider.projectScripts.test.ts
  • apps/server/src/wsServer.test.ts
  • Lint: Not run
  • Full test suite: Not run

Note

Medium Risk
Changes the turn-start contract and runtime ingestion behavior for assistant messages, which can affect message ordering/visibility and memory usage during active turns. Risk is moderated by added test coverage for buffered, streaming, and oversized-spill paths.

Overview
Introduces AssistantDeliveryMode (buffered|streaming) in orchestration contracts and propagates it through `thread.turn.start` → `thread.turn-start-requested`, defaulting to buffered.

Updates ProviderRuntimeIngestion to buffer assistant message.delta text and only emit it on message.completed/turn.completed, with a 24k-char safety spill to cap memory; streaming mode continues emitting live deltas. Web UI adds a persisted setting (enableAssistantStreaming) and sends assistantDeliveryMode when starting a turn; server/websocket tests and ingestion tests are expanded accordingly.
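The buffer-and-spill behavior described above can be sketched in plain TypeScript. This is an illustrative sketch only; the real implementation uses Effect's Cache, and the names `appendDelta`/`takeBuffered` are hypothetical, not identifiers from ProviderRuntimeIngestion.ts. Only the 24,000-char threshold comes from the PR description.

```typescript
// Hypothetical sketch of buffered-by-default delta accumulation with a spill cap.
const SPILL_THRESHOLD = 24_000; // chars; the 24k safety cap described above

type MessageId = string;
const buffers = new Map<MessageId, string[]>();
const lengths = new Map<MessageId, number>();

// Append a delta; if the buffer crosses the threshold, flush ("spill") the
// accumulated text so memory stays bounded during long turns.
function appendDelta(id: MessageId, delta: string): string | null {
  const chunks = buffers.get(id) ?? [];
  chunks.push(delta);
  buffers.set(id, chunks);
  const total = (lengths.get(id) ?? 0) + delta.length;
  lengths.set(id, total);
  if (total >= SPILL_THRESHOLD) {
    const spilled = chunks.join("");
    buffers.set(id, []);
    lengths.set(id, 0);
    return spilled; // caller emits this as a live delta event
  }
  return null; // still buffering
}

// On message.completed / turn finalization, drain the remainder and emit once.
function takeBuffered(id: MessageId): string {
  const text = (buffers.get(id) ?? []).join("");
  buffers.delete(id);
  lengths.delete(id);
  return text;
}
```

In streaming mode none of this applies: each delta is dispatched as it arrives.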

Written by Cursor Bugbot for commit 581d533.

Note

Adds a configurable assistant delivery mode and buffers assistant deltas by default, with a 24,000-char spill threshold in `ProviderRuntimeIngestion.make` (ProviderRuntimeIngestion.ts).

Introduces AssistantDeliveryMode ('buffered' | 'streaming'), defaulting to buffered; buffers assistant deltas per MessageId, spilling at 24,000 chars; flushes on message.completed/turn.completed; sets the delivery mode from thread.turn-start-requested domain events; and wires a UI setting to send assistantDeliveryMode on thread.turn.start.

📍Where to Start

Start at ProviderRuntimeIngestion.start and processRuntimeEvent in ProviderRuntimeIngestion.ts, then review decider.decideOrchestrationCommand in decider.ts for how assistantDeliveryMode is emitted.

Macroscope summarized 581d533.

Summary by CodeRabbit

  • New Features

    • Settings: "Responses" toggle to enable assistant streaming (persisted; default off).
    • Chat UI respects the setting and requests streaming (incremental) or buffered (complete) assistant delivery.
    • Backend/protocol now support both delivery modes, defaulting to buffered.
  • Bug Fixes

    • Settings toggle was inadvertently added in two places on the Settings page.
  • Tests

    • Added tests covering buffered and streaming assistant response flows.


coderabbitai bot commented Feb 27, 2026

Warning

Rate limit exceeded

@juliusmarminge has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 2 minutes and 3 seconds before requesting another review.


📥 Commits

Reviewing files that changed from the base of the PR and between 0f3e5ea and 581d533.

📒 Files selected for processing (10)
  • TODO.md
  • apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts
  • apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts
  • apps/server/src/orchestration/decider.projectScripts.test.ts
  • apps/server/src/orchestration/decider.ts
  • apps/server/src/wsServer.test.ts
  • apps/web/src/appSettings.ts
  • apps/web/src/components/ChatView.tsx
  • apps/web/src/routes/_chat.settings.tsx
  • packages/contracts/src/orchestration.ts

Walkthrough

Adds assistant delivery modes ("buffered" default, "streaming") end-to-end: new contract types, decider payload fields, UI toggle, ingestion-layer delivery-mode state with per-message buffering/spill and unified domain/runtime input queuing, plus tests for buffering vs streaming (some duplicated test blocks).

Changes

| Cohort / File(s) | Summary |
|---|---|
| **Ingestion & Runtime Flow**<br>`apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts` | Introduces delivery-mode state (per-thread/turn/message), per-message buffered text and spill tracking, append/take/clear helpers, a finalizeAssistantMessage flow, a unified input queue for domain/runtime events, domain-driven delivery-mode switching, and enhanced error/context logging. |
| **Ingestion Tests**<br>`apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts` | Adds tests for default buffered behavior and for streaming when a turn requests it; imports MessageId and defines an asMessageId helper; the diff contains a duplicated insertion of the same buffering/streaming test blocks. |
| **Decider / Event Payloads**<br>`apps/server/src/orchestration/decider.ts`, `apps/server/src/orchestration/decider.projectScripts.test.ts` | Adds DEFAULT_ASSISTANT_DELIVERY_MODE = "buffered"; thread/turn-start events now carry assistantDeliveryMode (defaulted). Assistant completion events now derive text from command.text ?? "". A test adds an assertion for assistantDeliveryMode === "buffered". |
| **Web UI & Settings**<br>`apps/web/src/appSettings.ts`, `apps/web/src/components/ChatView.tsx`, `apps/web/src/routes/_chat.settings.tsx` | Adds an enableAssistantStreaming setting (default false). ChatView reads the setting to set assistantDeliveryMode on turn.start. The Settings page gains a "Responses" toggle bound to the new setting; the toggle block appears twice in the file (duplicated). |
| **Contracts / Types**<br>`packages/contracts/src/orchestration.ts` | Adds the AssistantDeliveryMode type (`"buffered" \| "streaming"`). |
| **Integration Test Update**<br>`apps/server/src/wsServer.test.ts` | Updates the runtime thread start test payload to include assistantDeliveryMode: "streaming". |
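The contract and defaulting behavior summarized above can be sketched as follows. The field names mirror the PR description, but this is an illustrative shape, not the exact source of `packages/contracts/src/orchestration.ts`; `TurnStartPayload` and `resolveDeliveryMode` are hypothetical names.

```typescript
// Sketch of the new contract type and its server-side default.
type AssistantDeliveryMode = "buffered" | "streaming";
const DEFAULT_ASSISTANT_DELIVERY_MODE: AssistantDeliveryMode = "buffered";

interface TurnStartPayload {
  threadId: string;
  // Optional on the wire; the decider defaults it when absent.
  assistantDeliveryMode?: AssistantDeliveryMode;
}

// Clients that omit the field (e.g., older UIs) get buffered delivery.
function resolveDeliveryMode(payload: TurnStartPayload): AssistantDeliveryMode {
  return payload.assistantDeliveryMode ?? DEFAULT_ASSISTANT_DELIVERY_MODE;
}
```

Making the field optional with a server-side default keeps the turn-start contract backward compatible while letting the web UI opt into streaming.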

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant ChatView
    participant IngestionLayer
    participant MessageBuffer
    participant EventQueue

    Client->>ChatView: start turn (user action)
    ChatView->>IngestionLayer: thread.turn.start (assistantDeliveryMode)
    IngestionLayer->>IngestionLayer: cache delivery mode for turn/message

    alt deliveryMode == "streaming"
        loop assistant delta
            IngestionLayer->>EventQueue: dispatch assistant.delta immediately
            EventQueue->>Client: stream delta
        end
        IngestionLayer->>EventQueue: thread.message.completed (streaming)
        EventQueue->>Client: completion
    else deliveryMode == "buffered"
        loop assistant delta
            IngestionLayer->>MessageBuffer: appendBufferedAssistantText (by messageId)
        end
        IngestionLayer->>IngestionLayer: finalizeAssistantMessage -> flush/spill if needed
        IngestionLayer->>EventQueue: thread.message.completed with buffered text
        EventQueue->>Client: send full message
    end
```

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00%, which is insufficient; the required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title accurately summarizes the main changes: introducing a configurable assistant delivery mode with buffered-as-default ingestion, which aligns with the core objectives of adding the AssistantDeliveryMode type, propagating it through contracts, implementing buffering logic in ProviderRuntimeIngestion, and providing UI controls for streaming mode. |
| Description check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |




@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (3)
apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts (1)

292-294: Replace fixed sleeps with deterministic synchronization.

Line 292 and Line 338 rely on timing delays, which can make these tests flaky. Prefer waiting on durable state/events (e.g., specific orchestration events or snapshot predicates) before asserting behavior.

♻️ Suggested pattern
-    await Effect.runPromise(Effect.sleep("30 millis"));
+    await waitForOrchestrationEvent(harness.engine, (event) =>
+      event.type === "thread.turn-start-requested" &&
+      event.payload.threadId === ThreadId.makeUnsafe("thread-1") &&
+      event.payload.assistantDeliveryMode === "streaming",
+    );

Based on learnings: For integration tests, prefer deterministic inputs and explicit state checks; avoid relying on timing assumptions.

Also applies to: 338-339

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts` around
lines 292 - 294, Replace the brittle fixed-duration waits that call
Effect.runPromise(Effect.sleep(...)) with deterministic synchronization that
polls or awaits a durable condition: instead of sleeping before calling
harness.engine.getReadModel() and looking up ThreadId.makeUnsafe("thread-1"),
implement an explicit waitFor predicate that repeatedly fetches
harness.engine.getReadModel() (or listens for the orchestration event/snapshot)
until the expected thread entry exists (or a timeout/failure occurs); do the
same for the other sleep at the later assertion. Use the existing
harness.engine.getReadModel and ThreadId.makeUnsafe utilities as the condition
source so tests only proceed once the durable state/event is observed.
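A minimal deterministic wait helper of the kind suggested above could look like this. It is a sketch in plain TypeScript, not the Effect-based `waitForOrchestrationEvent` from the suggestion: it polls a predicate with a bounded deadline instead of relying on a fixed sleep.

```typescript
// Poll a condition until it holds or a deadline passes; replaces fixed sleeps
// so tests proceed as soon as the durable state is observed.
async function waitFor(
  predicate: () => boolean,
  { timeoutMs = 1_000, intervalMs = 5 } = {},
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (!predicate()) {
    if (Date.now() > deadline) {
      throw new Error("waitFor: condition not met before timeout");
    }
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}
```

In the tests above, the predicate would fetch `harness.engine.getReadModel()` and check for the expected thread entry or event, so passing runs finish as fast as the state settles and only genuinely broken runs hit the timeout.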
apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts (2)

180-184: Avoid quadratic string growth while buffering deltas.

Line 286 repeatedly concatenates the whole accumulated string (${text}${delta}), which gets expensive for long responses. Buffer chunks and join once at completion.

♻️ Proposed refactor
-  const bufferedAssistantTextByMessageId = yield* Cache.make<MessageId, string>({
+  const bufferedAssistantTextByMessageId = yield* Cache.make<MessageId, Array<string>>({
     capacity: BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_CACHE_CAPACITY,
     timeToLive: BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_TTL,
-    lookup: () => Effect.succeed(""),
+    lookup: () => Effect.succeed([]),
   });

   const appendBufferedAssistantText = (messageId: MessageId, delta: string) =>
     Cache.getOption(bufferedAssistantTextByMessageId, messageId).pipe(
       Effect.flatMap((existingText) =>
         Cache.set(
           bufferedAssistantTextByMessageId,
           messageId,
           Option.match(existingText, {
-            onNone: () => delta,
-            onSome: (text) => `${text}${delta}`,
+            onNone: () => [delta],
+            onSome: (chunks) => {
+              chunks.push(delta);
+              return chunks;
+            },
           }),
         ),
       ),
     );

   const takeBufferedAssistantText = (messageId: MessageId) =>
     Cache.getOption(bufferedAssistantTextByMessageId, messageId).pipe(
       Effect.flatMap((existingText) =>
         Cache.invalidate(bufferedAssistantTextByMessageId, messageId).pipe(
-          Effect.as(Option.getOrElse(existingText, () => "")),
+          Effect.as(
+            Option.match(existingText, {
+              onNone: () => "",
+              onSome: (chunks) => chunks.join(""),
+            }),
+          ),
         ),
       ),
     );

Also applies to: 278-287, 292-299

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts` around
lines 180 - 184, The current cache stores accumulated assistant text as a single
string and updates by repeatedly concatenating (`${text}${delta}`), causing
quadratic growth; change bufferedAssistantTextByMessageId to store an array of
string chunks (e.g., initialize lookup to an empty array) and, when receiving
deltas, push the delta into the array instead of concatenating; when you need
the full text (finalization or read), join the array once into a string. Update
all places that read/write bufferedAssistantTextByMessageId accordingly (push on
update, and use .join('') when converting to final text).

25-25: Centralize the default delivery mode to avoid drift.

Line 25 hardcodes "buffered" locally. This default is now part of cross-layer behavior; keeping local literals increases the risk of silent divergence between orchestration, decider, and web settings over time.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts` at line 25,
Replace the local hardcoded default ("buffered") by importing and using the
centralized constant instead of defining DEFAULT_ASSISTANT_DELIVERY_MODE here;
remove the local const DEFAULT_ASSISTANT_DELIVERY_MODE and import the shared
default delivery-mode constant (the project-wide default used by decider/web)
and use it where AssistantDeliveryMode is required, making sure the imported
symbol's type matches AssistantDeliveryMode and updating any references to
DEFAULT_ASSISTANT_DELIVERY_MODE to use the shared constant.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5ec8c9f and 592391a.

📒 Files selected for processing (9)
  • apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts
  • apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts
  • apps/server/src/orchestration/decider.projectScripts.test.ts
  • apps/server/src/orchestration/decider.ts
  • apps/server/src/wsServer.test.ts
  • apps/web/src/appSettings.ts
  • apps/web/src/components/ChatView.tsx
  • apps/web/src/routes/_chat.settings.tsx
  • packages/contracts/src/orchestration.ts

Comment thread apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts Outdated

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts`:
- Around line 491-501: The code currently resolves and persists message-level
assistant delivery mode immediately (via resolveAssistantDeliveryModeForMessage
and rememberAssistantDeliveryModeForMessage) which allows out-of-order event
processing to persist "buffered" before a later thread-level
turn-start-requested declares streaming; instead, change the flow to defer
committing the message-level mode and any Buffered actions (e.g.,
appendBufferedAssistantText with event.delta) until a turn-level mode is
observed or a safe timeout elapses: detect the presence of a
turn-start-requested for the same thread/turn first (or atomically wait for
resolveAssistantDeliveryModeForTurn), then resolve and persist the final mode
and only then call rememberAssistantDeliveryModeForMessage and
appendBufferedAssistantText; apply the same pattern to the other usages of
resolveAssistantDeliveryModeForMessage/rememberAssistantDeliveryModeForMessage
in the file to ensure turn-start-requested precedence and avoid silent
downgrades.
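The ordering concern above (a delta arriving before its turn's `thread.turn-start-requested` gets silently committed as "buffered") can be sketched like this. All names here are hypothetical and the real fix may differ; the point is the precedence rule: stage deltas until the turn-level mode is known, rather than defaulting early.

```typescript
// Sketch: defer committing a message-level delivery mode until the turn-level
// mode has been observed; hold deltas in the meantime.
type Mode = "buffered" | "streaming";

const turnMode = new Map<string, Mode>();      // turnId -> declared mode
const committedMode = new Map<string, Mode>(); // messageId -> resolved mode
const staged = new Map<string, string[]>();    // deltas held while unresolved

function onTurnStartRequested(turnId: string, mode: Mode): void {
  turnMode.set(turnId, mode);
}

// Returns the resolved mode, or null when the delta was staged because the
// turn-level mode is not yet known (avoiding a silent "buffered" downgrade).
function onAssistantDelta(turnId: string, messageId: string, delta: string): Mode | null {
  const existing = committedMode.get(messageId);
  if (existing !== undefined) return existing;
  const declared = turnMode.get(turnId);
  if (declared === undefined) {
    const held = staged.get(messageId) ?? [];
    held.push(delta);
    staged.set(messageId, held);
    return null;
  }
  committedMode.set(messageId, declared);
  return declared;
}

// Once the mode resolves, staged deltas are drained and replayed in order.
function drainStaged(messageId: string): string[] {
  const held = staged.get(messageId) ?? [];
  staged.delete(messageId);
  return held;
}
```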


📥 Commits

Reviewing files that changed from the base of the PR and between 592391a and 3a9b8c8.

📒 Files selected for processing (9)
  • apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts
  • apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts
  • apps/server/src/orchestration/decider.projectScripts.test.ts
  • apps/server/src/orchestration/decider.ts
  • apps/server/src/wsServer.test.ts
  • apps/web/src/appSettings.ts
  • apps/web/src/components/ChatView.tsx
  • apps/web/src/routes/_chat.settings.tsx
  • packages/contracts/src/orchestration.ts
🚧 Files skipped from review as they are similar to previous changes (2)
  • apps/web/src/appSettings.ts
  • apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts

Comment thread apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts Outdated
Comment thread apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts Outdated

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts`:
- Around line 161-163: assistantDeliveryModeRef is a single, global Ref
(initialized with DEFAULT_ASSISTANT_DELIVERY_MODE) which can be mutated by
interleaved threads and cause cross-thread contamination (e.g., a
turn-start-requested in thread A changing mode to "streaming" affects
message.delta handling in thread B expecting "buffered"). Fix by scoping
delivery mode per thread: replace or augment assistantDeliveryModeRef with a map
keyed by ThreadId (or create a per-thread Ref obtained/created at thread start)
and update all accesses (where turn-start-requested sets the mode and where
message.delta reads it) to use the ThreadId-keyed entry; alternatively, if
single-threaded usage is guaranteed, add a clear comment near
assistantDeliveryModeRef and DEFAULT_ASSISTANT_DELIVERY_MODE documenting the
single-thread assumption.


📥 Commits

Reviewing files that changed from the base of the PR and between 3a9b8c8 and 8c95335.

📒 Files selected for processing (3)
  • apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts
  • apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts
  • apps/server/src/orchestration/decider.projectScripts.test.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts

Comment thread apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts
Cache.getOption(bufferedAssistantTextByMessageId, messageId).pipe(
Effect.flatMap((existingText) =>
Cache.invalidate(bufferedAssistantTextByMessageId, messageId).pipe(
Effect.as(Option.getOrElse(existingText, () => "")),


🟢 Low Layers/ProviderRuntimeIngestion.ts:262

If bufferedAssistantTextByMessageId expires between buffering and finalization, takeBufferedAssistantText returns "" and the completion event dispatches empty text—silently losing the message. Consider logging a warning when existingText is None but the message was expected to have content, or using a longer TTL / different data structure.


Evidence trail:
apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts lines 29-30 (TTL definition: 120 minutes), lines 171-175 (cache creation with TTL), lines 258-265 (`takeBufferedAssistantText` using `Cache.getOption` and `Option.getOrElse` returning empty string on None), lines 300-326 (`finalizeAssistantMessage` dispatching completion with potentially empty text from expired cache entry, no logging when existingText is None)
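One way to surface the silent-loss case flagged above is to warn when the buffer entry is missing even though deltas were recorded. This is a hypothetical sketch using plain Maps; the real code uses Effect's Cache with a TTL, and `recordDelta`/`takeBufferedText` are illustrative names.

```typescript
// Track which messages ever buffered text so an expired/evicted entry at
// finalization can be distinguished from a genuinely empty message.
const buffered = new Map<string, string>();
const sawDeltas = new Set<string>();

function recordDelta(messageId: string, delta: string): void {
  sawDeltas.add(messageId);
  buffered.set(messageId, (buffered.get(messageId) ?? "") + delta);
}

function takeBufferedText(
  messageId: string,
  warn: (msg: string) => void = console.warn,
): string {
  const text = buffered.get(messageId);
  buffered.delete(messageId);
  if (text === undefined && sawDeltas.has(messageId)) {
    // Entry expired (or was evicted) between buffering and finalization:
    // log instead of silently dispatching an empty completion.
    warn(`buffered text missing for message ${messageId}; emitting empty text`);
  }
  sawDeltas.delete(messageId);
  return text ?? "";
}
```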


@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Global delivery mode ref shared across all threads
    • Replaced the single global Ref<AssistantDeliveryMode> with a Ref<Map<ThreadId, AssistantDeliveryMode>> so each thread's delivery mode is stored and looked up independently, preventing concurrent threads from overwriting each other's mode.

Preview (cb6f052d23)
diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts
--- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts
+++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts
@@ -158,8 +158,8 @@
   const orchestrationEngine = yield* OrchestrationEngineService;
   const providerService = yield* ProviderService;
 
-  const assistantDeliveryModeRef = yield* Ref.make<AssistantDeliveryMode>(
-    DEFAULT_ASSISTANT_DELIVERY_MODE,
+  const assistantDeliveryModeByThreadRef = yield* Ref.make<Map<ThreadId, AssistantDeliveryMode>>(
+    new Map(),
   );
 
   const turnMessageIdsByTurnKey = yield* Cache.make<string, Set<MessageId>>({
@@ -367,7 +367,8 @@
         event.type === "turn.started" ||
         event.type === "turn.completed"
       ) {
-        const activeTurnId = event.type === "turn.started" ? (toTurnId(event.turnId) ?? null) : null;
+        const activeTurnId =
+          event.type === "turn.started" ? (toTurnId(event.turnId) ?? null) : null;
         const providerThreadIdFromEvent =
           event.type === "thread.started"
             ? ProviderThreadId.makeUnsafe(event.threadId)
@@ -418,7 +419,9 @@
           yield* rememberAssistantMessageId(event.sessionId, turnId, assistantMessageId);
         }
 
-        const assistantDeliveryMode = yield* Ref.get(assistantDeliveryModeRef);
+        const deliveryModeMap = yield* Ref.get(assistantDeliveryModeByThreadRef);
+        const assistantDeliveryMode =
+          deliveryModeMap.get(thread.id) ?? DEFAULT_ASSISTANT_DELIVERY_MODE;
         if (assistantDeliveryMode === "buffered") {
           const spillChunk = yield* appendBufferedAssistantText(assistantMessageId, event.delta);
           if (spillChunk.length > 0) {
@@ -533,10 +536,14 @@
     });
 
   const processDomainEvent = (event: TurnStartRequestedDomainEvent) =>
-    Ref.set(
-      assistantDeliveryModeRef,
-      event.payload.assistantDeliveryMode ?? DEFAULT_ASSISTANT_DELIVERY_MODE,
-    );
+    Ref.update(assistantDeliveryModeByThreadRef, (map) => {
+      const next = new Map(map);
+      next.set(
+        event.payload.threadId,
+        event.payload.assistantDeliveryMode ?? DEFAULT_ASSISTANT_DELIVERY_MODE,
+      );
+      return next;
+    });
 
   const processInput = (input: RuntimeIngestionInput) =>
     input.source === "runtime" ? processRuntimeEvent(input.event) : processDomainEvent(input.event);

Comment thread apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts (1)

160-162: ⚠️ Potential issue | 🟠 Major

Scope delivery mode per thread/turn instead of using a single global Ref.

At Line 160 and Line 531, a thread.turn-start-requested from one thread mutates global state that Line 417 then applies to all runtime deltas. Under interleaving activity, this can misroute buffered vs streaming behavior across threads.

Proposed fix (thread-scoped mode map)
- const assistantDeliveryModeRef = yield* Ref.make<AssistantDeliveryMode>(
-   DEFAULT_ASSISTANT_DELIVERY_MODE,
- );
+ const assistantDeliveryModeByThreadIdRef = yield* Ref.make(
+   new Map<ThreadId, AssistantDeliveryMode>(),
+ );
+
+ const getAssistantDeliveryModeForThread = (threadId: ThreadId) =>
+   Ref.get(assistantDeliveryModeByThreadIdRef).pipe(
+     Effect.map((map) => map.get(threadId) ?? DEFAULT_ASSISTANT_DELIVERY_MODE),
+   );
+
+ const setAssistantDeliveryModeForThread = (
+   threadId: ThreadId,
+   mode: AssistantDeliveryMode,
+ ) =>
+   Ref.update(assistantDeliveryModeByThreadIdRef, (map) => {
+     const next = new Map(map);
+     next.set(threadId, mode);
+     return next;
+   });

- const assistantDeliveryMode = yield* Ref.get(assistantDeliveryModeRef);
+ const assistantDeliveryMode = yield* getAssistantDeliveryModeForThread(thread.id);

- const processDomainEvent = (event: TurnStartRequestedDomainEvent) =>
-   Ref.set(
-     assistantDeliveryModeRef,
-     event.payload.assistantDeliveryMode ?? DEFAULT_ASSISTANT_DELIVERY_MODE,
-   );
+ const processDomainEvent = (event: TurnStartRequestedDomainEvent) =>
+   setAssistantDeliveryModeForThread(
+     event.threadId,
+     event.payload.assistantDeliveryMode ?? DEFAULT_ASSISTANT_DELIVERY_MODE,
+   );

Based on learnings: Maintain predictable behavior under load and during failures (session restarts, reconnects, partial streams).

Also applies to: 417-419, 531-535

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts` around
lines 160 - 162, The global Ref assistantDeliveryModeRef (created via Ref.make
with DEFAULT_ASSISTANT_DELIVERY_MODE) is being mutated by the
thread.turn-start-requested handler and then applied to all runtime deltas,
causing cross-thread delivery-mode leakage; change this to a thread-scoped map
(e.g., Map<threadId, AssistantDeliveryMode>) stored in a Ref or similar
per-session structure so each thread/turn reads and updates its own entry.
Update the thread.turn-start-requested handler to set the mode only for the
specific threadId, ensure the runtime-delta application logic (the code that
currently reads assistantDeliveryModeRef when building or sending deltas) looks
up the delivery mode by threadId and falls back to
DEFAULT_ASSISTANT_DELIVERY_MODE if absent, and add cleanup on thread end to
remove the threadId entry. Ensure all references to assistantDeliveryModeRef are
replaced with thread-scoped lookups to avoid global mutation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts`:
- Around line 556-565: Replace the unbounded queue with a bounded one to apply
backpressure: change Queue.unbounded<RuntimeIngestionInput>() to
Queue.bounded<RuntimeIngestionInput>(N) (choose an appropriate N like 1000 or
config value) for inputQueue, keep the same finalizer
(Queue.shutdown(inputQueue)), and leave the consumer using
Queue.take(inputQueue). Ensure the producer side (where Queue.offer(inputQueue,
{ source: "runtime", event })) relies on the bounded semantics (it will suspend
when full) or explicitly handle backpressure/drops by wrapping the offer in a
timed or fallback effect if desired; reference symbols: Queue.bounded,
inputQueue, Queue.shutdown, Queue.take, processInputSafely, Queue.offer,
providerService.streamEvents.



📥 Commits

Reviewing files that changed from the base of the PR and between 8c95335 and 0f3e5ea.

📒 Files selected for processing (1)
  • apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts

Comment thread apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts
juliusmarminge and others added 11 commits February 26, 2026 20:40
- add `AssistantDeliveryMode` contracts and carry mode through turn-start events
- default assistant responses to buffered mode and preserve buffered text until completion
- support streaming mode end-to-end with ingestion/decider updates and test coverage
- add a Settings toggle to enable assistant streaming in the chat UI
Use conditional spread for turnId to avoid passing undefined
explicitly when the target type expects an optional property.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Replace per-thread/turn/message delivery-mode caches with a single `Ref`
- Always complete assistant messages with buffered text from message state
- Remove obsolete delivery-mode cleanup and resolution paths
- cap buffered assistant text memory and spill older chunks as delta events
- finalize buffered messages by emitting retained remainder before completion when spilled
- add regression coverage for oversized buffered deltas and tighten type-safe test assertions
- Change provider runtime ingestion to flush the entire buffered assistant text once it crosses the max size
- Remove partial-retention logic and its keep-chars constant to simplify memory safety behavior
- Remove spilled-state cache tracking for buffered assistant messages
- Emit any remaining buffered text as a finalize delta on completion
- Keep assistant completion payload text empty in buffered mode
- Update wording to say buffered text is flushed as an assistant delta
- Keeps intent clear: memory is capped by emitting the current buffer
- Drop optional `text` from `ThreadMessageAssistantCompleteCommand`
- Stop passing text on assistant-complete ingestion and decider paths
- Ensure completion events no longer overwrite message content
@juliusmarminge juliusmarminge merged commit 4d6e8e1 into main Feb 27, 2026
3 checks passed