Skip to content

Standardized messages#11

Closed
t3dotgg wants to merge 2 commits into
mainfrom
theo/standardize-messages
Closed

Standardized messages#11
t3dotgg wants to merge 2 commits into
mainfrom
theo/standardize-messages

Conversation

@t3dotgg

@t3dotgg t3dotgg commented Feb 10, 2026

Copy link
Copy Markdown
Member

Summary by CodeRabbit

  • New Features

    • Introduced provider stream with snapshot/replay frames and per-connection filtering; added openStream() and closeStream().
  • Documentation

    • Added unified Provider WebSocket API docs describing canonical stream frames, normalization, replay semantics, and sync policy.
  • Refactor

    • Switched app from event-based to stream-frame model; renamed respondToRequest → respondToApproval and updated UI/flow to use approval-oriented fields.
  • Tests

    • Added comprehensive tests for normalization, stream store, subscriptions, server wiring, and client stream lifecycle.

Open with Devin

@coderabbitai

coderabbitai Bot commented Feb 10, 2026

Copy link
Copy Markdown

Walkthrough

Replaces raw provider event pushes with a canonical provider stream: adds event normalization, a bounded replay store, a subscription manager for WebSocket streams, new stream/frame contracts and schemas, renames request->approval APIs, and updates server and web clients to consume frames instead of raw events.

Changes

Cohort / File(s) Summary
Docs & Contracts
AGENTS.md, README.md, docs/provider-ws-api.md, packages/contracts/src/index.ts, packages/contracts/src/ws.ts
Documented canonical provider WS API, renamed WebSocket channel to providers.stream, added provider stream module export, and introduced new WS methods (openStream/closeStream) and renamed respondToRequest→respondToApproval.
Stream Schemas
packages/contracts/src/providerStream.ts, packages/contracts/src/provider.test.ts, packages/contracts/src/providerStream.test.ts
Added comprehensive provider stream schemas/types (ProviderCoreEvent, ProviderStreamFrame, ProviderSnapshot, ProvidersOpenStream*), and tests validating frame and core-event shapes.
Provider Raw/Event Renames
packages/contracts/src/provider.ts, packages/contracts/src/ipc.ts, packages/contracts/src/provider.test.ts
Renamed ProviderEvent→ProviderRawEvent, providerEvent schemas→raw variants, requestId→approvalId, and updated IPC surface to expose respondToApproval and streaming APIs.
Server: Normalization & Store
apps/server/src/providerEventNormalizer.ts, apps/server/src/providerEventNormalizer.test.ts, apps/server/src/providerStreamStore.ts, apps/server/src/providerStreamStore.test.ts
New normalizer that maps raw provider events to canonical ProviderCoreEvent, and a ProviderStreamStore that sequences, snapshots, prunes, and serves replay/gap frames; tests added.
Server: Subscription & WS Integration
apps/server/src/providerStreamSubscriptionManager.ts, apps/server/src/providerStreamSubscriptionManager.test.ts, apps/server/src/wsServer.ts, apps/server/src/wsServer.test.ts
New subscription manager to open/close streams, filter frames per-socket, manage backpressure, and publish frames; wsServer wired to normalize, append, and publish frames instead of broadcasting raw events.
Server: Manager/API Changes
apps/server/src/providerManager.ts, apps/server/src/providerManager.test.ts, apps/server/src/codexAppServerManager.ts
Switched internal/public event types to ProviderRawEvent, replaced respondToRequest→respondToApproval (approvalId), and updated emission and pending-approval handling across manager code and tests.
Web: Native API & Transport
apps/web/src/wsNativeApi.ts, apps/web/src/wsNativeApi.test.ts, apps/web/src/wsTransport.ts
Replaced onEvent with onStream, added openStream/closeStream APIs and stream listener plumbing, added connection-state hooks in transport, and tests mocking stream lifecycle.
Web: Store, Session, UI
apps/web/src/store.ts, apps/web/src/store.test.ts, apps/web/src/session-logic.ts, apps/web/src/session-logic.test.ts, apps/web/src/types.ts, apps/web/src/App.tsx, apps/web/src/components/ChatView.tsx
Frame-driven client changes: introduced ThreadEvent (seq/at/event), APPLY_STREAM_FRAME action, lastProviderSeq state, refactored evolveSession/deriveWorkLogEntries/applyEventToMessages to use ProviderCoreEvent and ThreadEvent, and updated UI to use approvalId/approvalKind and activeAssistantMessageRef.
Misc Tests & Additions
apps/server/src/providerStreamStore.test.ts, other new tests across server/web/packages`
Multiple new/updated tests exercising normalization, store replay/eviction, subscription behavior, frame application, and web client stream handling.

Sequence Diagram(s)

sequenceDiagram
    participant Provider as Provider
    participant Codex as CodexAppServer
    participant Normalizer as EventNormalizer
    participant Store as StreamStore
    participant SubMgr as SubscriptionManager
    participant WS as WebSocketClient

    Provider->>Codex: send ProviderRawEvent
    Codex->>Normalizer: normalize(raw, currentSession)
    Normalizer-->>Codex: ProviderCoreEvent[]
    Codex->>Store: appendEvent(coreEvent)
    Store-->>Codex: ProviderStreamFrame
    Codex->>SubMgr: publish(frame)
    SubMgr->>SubMgr: filter & apply backpressure
    SubMgr->>WS: push frame on "providers.stream"
    WS->>Client: client processes snapshot/replay/event/gap
Loading
sequenceDiagram
    participant WSClient as Client
    participant WSServer as WS Server
    participant SubMgr as SubscriptionManager
    participant Store as StreamStore

    WSClient->>WSServer: providers.openStream(input:{afterSeq?, sessionIds?, ...})
    WSServer->>SubMgr: openStream(ws, input)
    SubMgr->>Store: if snapshot needed -> buildSnapshotFrame()
    Store-->>SubMgr: snapshot frame
    SubMgr->>WSClient: push snapshot frame (mode: snapshot)
    alt afterSeq valid
        SubMgr->>Store: selectReplay(afterSeq)
        Store-->>SubMgr: events
        SubMgr->>WSClient: push replay/event frames (mode: replay)
    else cursor too old
        SubMgr->>WSClient: push gap frame (reason: cursor_too_old)
        SubMgr->>WSClient: then push snapshot_resync
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (2 warnings)
Check name Status Explanation Resolution
Title check ⚠️ Warning The PR title 'Standardized messages' is vague and does not clearly convey the primary change. The actual changeset involves a major architectural refactor: transitioning from event-based to frame-based streaming with approval/normalization/replay logic. Update the title to reflect the core change, e.g., 'Implement canonical provider stream with frame-based architecture and normalization pipeline' or 'Refactor provider events to stream-based architecture with approval/replay support'.
Docstring Coverage ⚠️ Warning Docstring coverage is 4.41% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (1 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch theo/standardize-messages

Tip

Issue Planner is now in beta. Read the docs and try it out! Share your feedback on Discord.


Comment @coderabbitai help to get the list of available commands and usage tips.

Comment thread apps/web/src/store.ts
if (frame.kind === "snapshot") {
return {
...state,
threads: applySnapshotToThreads(state.threads, frame.data, frame.seq),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium

src/store.ts:466 When a snapshot arrives, applySnapshotToThread filters out locally-streaming messages and only adds back those in snapshot.activeMessages. If a message completed server-side between the last delta and the snapshot, it disappears from both sets. Consider also keeping messages whose IDs aren't in activeMessageIds regardless of their streaming flag, or fetching completed messages from the snapshot.

🚀 Want me to fix this? Reply ex: "fix it for me".

🤖 Prompt for AI
In file apps/web/src/store.ts around line 466:

When a snapshot arrives, `applySnapshotToThread` filters out locally-streaming messages and only adds back those in `snapshot.activeMessages`. If a message completed server-side between the last delta and the snapshot, it disappears from both sets. Consider also keeping messages whose IDs aren't in `activeMessageIds` regardless of their `streaming` flag, or fetching completed messages from the snapshot.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@apps/server/src/wsServer.test.ts`:
- Around line 216-262: The test currently reads exactly two frames after sending
WS_METHODS.providersOpenStream which can miss the snapshot or response if extra
frames arrive; change the logic in the test (around the waitForMessage calls and
variables sawSnapshotPush/response) to loop calling waitForMessage until both a
push with channel WS_CHANNELS.providerStream and data.kind === "snapshot" is
seen and a WsResponse matching the sent id is observed (store messages or update
sawSnapshotPush and response as you iterate), but enforce a hard cap (max
iterations or timeout) to avoid hangs and fail the test if the cap is reached.
🧹 Nitpick comments (3)
apps/web/src/components/ChatView.tsx (1)

122-124: Consider useTransition for approval-response pending state.
respondingApprovalIds manually tracks async pending state; per repo guidance, consider moving this flow to useTransition/startTransition (even if you still keep a per-approval map for UI granularity).
Based on learnings: "Use useTransition for async actions instead of manually managing pending state. Replace manual pending state management with startTransition callback pattern."

Also applies to: 485-512, 612-675

apps/server/src/providerEventNormalizer.ts (1)

45-58: mapApprovalDecision defaults unknown decisions to "cancel".

This is a reasonable defensive default, but consider logging unknown decision values for debugging purposes during development, as silently mapping unexpected values could mask provider API changes.

apps/web/src/store.ts (1)

138-184: Consider consolidating eventThreadId with a type-narrowing approach.

The function has repetitive structure checking each event type individually. While correct and explicit, this could be simplified using a common pattern if most event types share a threadId property.

♻️ Optional: Simplified extraction
 function eventThreadId(event: ProviderCoreEvent): string | undefined {
   if (event.type === "session.updated") {
     return event.session.threadId;
   }
-
-  if (event.type === "turn.started") {
-    return event.threadId;
-  }
-
-  if (event.type === "turn.completed") {
-    return event.threadId;
-  }
-  // ... etc
+  
+  // Most event types have a direct threadId property
+  if ("threadId" in event) {
+    return event.threadId;
+  }
 
   return undefined;
 }

Comment on lines +216 to +262
it("opens provider stream with snapshot and returns stream metadata", async () => {
server = createTestServer({ cwd: "/test" });
await server.start();
const addr = server.httpServer.address();
const port = typeof addr === "object" && addr !== null ? addr.port : 0;

const ws = await connectWs(port);
connections.push(ws);

// Consume welcome
await waitForMessage(ws);

const id = crypto.randomUUID();
ws.send(
JSON.stringify({
id,
method: WS_METHODS.providersOpenStream,
params: {},
}),
);

const first = (await waitForMessage(ws)) as Record<string, unknown>;
const second = (await waitForMessage(ws)) as Record<string, unknown>;
const messages = [first, second];

let response: WsResponse | null = null;
const sawSnapshotPush = messages.some(
(message) =>
message.type === "push" &&
message.channel === WS_CHANNELS.providerStream &&
typeof message.data === "object" &&
message.data !== null &&
(message.data as { kind?: string }).kind === "snapshot",
);

for (const message of messages) {
if (message.id === id) {
response = message as WsResponse;
}
}

expect(sawSnapshotPush).toBe(true);
expect(response).not.toBeNull();
expect(response?.error).toBeUndefined();
const result = response?.result as { mode?: string };
expect(result?.mode).toBe("snapshot");
});

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Make the open-stream test resilient to extra frames.

The test only captures two messages after providersOpenStream. If the server emits an extra frame (e.g., replay/gap/debug), the response might fall outside those two and the test can flake or miss the assertion. Consider looping until both the snapshot push and matching response are observed (with a cap to avoid hangs).

🔧 Suggested fix
-    const first = (await waitForMessage(ws)) as Record<string, unknown>;
-    const second = (await waitForMessage(ws)) as Record<string, unknown>;
-    const messages = [first, second];
-
-    let response: WsResponse | null = null;
-    const sawSnapshotPush = messages.some(
-      (message) =>
-        message.type === "push" &&
-        message.channel === WS_CHANNELS.providerStream &&
-        typeof message.data === "object" &&
-        message.data !== null &&
-        (message.data as { kind?: string }).kind === "snapshot",
-    );
-
-    for (const message of messages) {
-      if (message.id === id) {
-        response = message as WsResponse;
-      }
-    }
+    const messages: Record<string, unknown>[] = [];
+    let response: WsResponse | null = null;
+    let sawSnapshotPush = false;
+
+    while (messages.length < 5 && (!response || !sawSnapshotPush)) {
+      const message = (await waitForMessage(ws)) as Record<string, unknown>;
+      messages.push(message);
+      if (
+        message.type === "push" &&
+        message.channel === WS_CHANNELS.providerStream &&
+        typeof message.data === "object" &&
+        message.data !== null &&
+        (message.data as { kind?: string }).kind === "snapshot"
+      ) {
+        sawSnapshotPush = true;
+      }
+      if (message.id === id) {
+        response = message as WsResponse;
+      }
+    }

Based on learnings: For integration tests: Prefer deterministic inputs and explicit state checks; avoid relying on logs or timing assumptions. Keep fixtures cohesive and reuse identities across sequential tests. Synchronize on durable workflow status or snapshot state. Assert on final state and side effects. Use stable helper scripts like SHELL_SETUP_SCRIPT for runtime setup. Clean up external resources in finally blocks.

🤖 Prompt for AI Agents
In `@apps/server/src/wsServer.test.ts` around lines 216 - 262, The test currently
reads exactly two frames after sending WS_METHODS.providersOpenStream which can
miss the snapshot or response if extra frames arrive; change the logic in the
test (around the waitForMessage calls and variables sawSnapshotPush/response) to
loop calling waitForMessage until both a push with channel
WS_CHANNELS.providerStream and data.kind === "snapshot" is seen and a WsResponse
matching the sent id is observed (store messages or update sawSnapshotPush and
response as you iterate), but enforce a hard cap (max iterations or timeout) to
avoid hangs and fail the test if the cap is reached.

@macroscopeapp

macroscopeapp Bot commented Feb 12, 2026

Copy link
Copy Markdown
Contributor

Standardize provider messaging by replacing legacy providers.onEvent with sequenced providers.onStream frames and updating web reducer to apply APPLY_STREAM_FRAME with lastProviderSeq monotonic gating across apps/web, apps/server, and contracts

Move to a single canonical provider stream with snapshot/event/gap framing, update client reducer and session/message logic to process ProviderCoreEvent from ProviderStreamFrame, add connection-aware open/close lifecycle in createWsNativeApi, and implement server-side normalization, replay, snapshots, and subscription management. Key entry points include store.ts, wsNativeApi.ts, and wsServer.ts.

📍Where to Start

Start with the stream reducer and frame handling in APPLY_STREAM_FRAME within store.ts, then review server stream framing and subscriptions in providerStreamStore.ts and providerStreamSubscriptionManager.ts.


📊 Macroscope summarized 58c10bb. 19 files reviewed, 61 issues evaluated, 2 issues filtered, 5 comments posted. View details

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
apps/web/src/store.ts (1)

80-98: ⚠️ Potential issue | 🟠 Major

Persisted lastProviderSeq is discarded.
Line 97 resets the cursor to 0 even when a persisted value exists, which breaks resume/dedupe across reloads. Preserve the hydrated value when present (default to 0 only when missing).

🔧 Proposed fix
-    return { ...hydrated, diffOpen: false, lastProviderSeq: 0 };
+    return {
+      ...hydrated,
+      diffOpen: false,
+      lastProviderSeq: hydrated.lastProviderSeq ?? 0,
+    };
🤖 Fix all issues with AI agents
In `@apps/server/src/providerEventNormalizer.ts`:
- Around line 84-97: The mapApprovalDecision function currently coerces timeout
decisions to "cancel"; update it to explicitly return "timed_out" for timeout
inputs by adding a branch that maps the external timeout value(s) (e.g.,
"timedOut" and optionally "timed_out") to the internal "timed_out" return value;
modify the switch in mapApprovalDecision to include case "timedOut": return
"timed_out"; (and add case "timed_out": return "timed_out" if your input source
may already use that format) so timeouts are handled explicitly instead of
falling through to the default "cancel".

In `@apps/web/src/store.ts`:
- Around line 288-362: The current applySnapshotToThread clears a thread when
its session is missing from ProviderSnapshot (making partial snapshots
destructive); change this to detect partial snapshots in applySnapshotToThreads
by collecting snapshot.sessions' sessionIds and, if any existing
thread.session.sessionId is missing, treat the snapshot as partial and skip
applying it (return threads unchanged) or pass a flag into applySnapshotToThread
so it returns the original thread when snapshotSession is undefined; update
applySnapshotToThread to only reset/clear a thread when snapshot is
known-complete (snapshotSession exists) and/or log/debug the partial-snapshot
case so callers know filtering occurred (refer to applySnapshotToThreads,
applySnapshotToThread, ProviderSnapshot, and session.sessionId).

In `@apps/web/src/wsNativeApi.test.ts`:
- Around line 114-155: The test is emitting on WS_CHANNELS.providerStream (which
maps to providers.stream) but the app expects provider events on the
providers.event channel; change the emit call in the test from
WS_CHANNELS.providerStream to the correct channel constant (e.g.
WS_CHANNELS.providersEvent or WS_CHANNELS['providers.event']) in the
transportState.latest?.emitPush invocation and any other test usages, ensuring
the provider event payload remains unchanged and the test now pushes to the
providers.event channel that the web consumers use.

In `@apps/web/src/wsNativeApi.ts`:
- Around line 210-243: The closeStream implementation can leave the stream open
if streamOpenInFlight is pending; modify closeStream (and its use of
pendingCloseAfterOpen/streamOpen flags) to, when streamOpenInFlight exists,
await that promise before calling
transport.request(WS_METHODS.providersCloseStream), then set streamOpen = false
and pendingCloseAfterOpen = false after the close completes (and still
catch/ignore errors from transport.request). Ensure you reference and update the
same symbols: closeStream, streamOpenInFlight, streamOpen,
pendingCloseAfterOpen, openStream, and transport.request so the close always
happens after any in-flight open resolves and the flags are reset.
- Around line 155-178: Update the WebSocket subscription to use the providers
event channel instead of the stream channel: replace the use of
WS_CHANNELS.providerStream in the transport.subscribe call with the providers
event channel (WS_CHANNELS.providersEvent or the literal "providers.event" as
defined in your WS_CHANNELS), leaving the rest of the handler
(providerStreamFrameSchema parsing, lastAppliedSeq/streamOpening logic, and
dispatch to providerStreamListeners) unchanged so the web app consumes provider
events on the correct channel.
🧹 Nitpick comments (1)
docs/provider-ws-api.md (1)

313-321: Reduce repeated “Update” sentence starts in the checklist.
LanguageTool flagged successive sentences starting with “Update”; varying wording improves readability.

✍️ Possible rewording
-3. Update server normalizer/store/subscriptions as needed.
-4. Update client stream/reducer logic as needed.
-5. Update tests in contracts/server/web.
+3. Adjust server normalizer/store/subscriptions as needed.
+4. Revise client stream/reducer logic as needed.
+5. Refresh tests in contracts/server/web.

Comment on lines +84 to +97
function mapApprovalDecision(decision: string): "accept" | "accept_for_session" | "decline" | "cancel" | "timed_out" {
switch (decision) {
case "accept":
return "accept";
case "acceptForSession":
return "accept_for_session";
case "decline":
return "decline";
case "cancel":
return "cancel";
default:
return "cancel";
}
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Handle timed-out approval decisions explicitly.
Line 84: mapApprovalDecision never returns timed_out, so timeout decisions will be coerced to cancel.

🔧 Proposed fix
 function mapApprovalDecision(decision: string): "accept" | "accept_for_session" | "decline" | "cancel" | "timed_out" {
   switch (decision) {
     case "accept":
       return "accept";
     case "acceptForSession":
       return "accept_for_session";
     case "decline":
       return "decline";
     case "cancel":
       return "cancel";
+    case "timed_out":
+    case "timedOut":
+      return "timed_out";
     default:
       return "cancel";
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
function mapApprovalDecision(decision: string): "accept" | "accept_for_session" | "decline" | "cancel" | "timed_out" {
switch (decision) {
case "accept":
return "accept";
case "acceptForSession":
return "accept_for_session";
case "decline":
return "decline";
case "cancel":
return "cancel";
default:
return "cancel";
}
}
function mapApprovalDecision(decision: string): "accept" | "accept_for_session" | "decline" | "cancel" | "timed_out" {
switch (decision) {
case "accept":
return "accept";
case "acceptForSession":
return "accept_for_session";
case "decline":
return "decline";
case "cancel":
return "cancel";
case "timed_out":
case "timedOut":
return "timed_out";
default:
return "cancel";
}
}
🤖 Prompt for AI Agents
In `@apps/server/src/providerEventNormalizer.ts` around lines 84 - 97, The
mapApprovalDecision function currently coerces timeout decisions to "cancel";
update it to explicitly return "timed_out" for timeout inputs by adding a branch
that maps the external timeout value(s) (e.g., "timedOut" and optionally
"timed_out") to the internal "timed_out" return value; modify the switch in
mapApprovalDecision to include case "timedOut": return "timed_out"; (and add
case "timed_out": return "timed_out" if your input source may already use that
format) so timeouts are handled explicitly instead of falling through to the
default "cancel".

Comment thread apps/web/src/store.ts
Comment on lines +288 to +362
function applySnapshotToThread(
thread: Thread,
snapshot: ProviderSnapshot,
seq: number,
): Thread {
if (!thread.session) {
return thread;
}

const sessionId = thread.session.sessionId;
const snapshotSession = snapshot.sessions.find((session) => session.sessionId === sessionId);
if (!snapshotSession) {
return {
...thread,
session: null,
messages: thread.messages.map((message) => ({
...message,
streaming: false,
})),
events: [],
error: null,
latestTurnId: undefined,
latestTurnStartedAt: undefined,
latestTurnCompletedAt: undefined,
latestTurnDurationMs: undefined,
};
}

const activeTurns = snapshot.activeTurns
.filter((turn) => turn.sessionId === sessionId)
.toSorted((left, right) => Date.parse(right.startedAt) - Date.parse(left.startedAt));
const activeTurn = activeTurns[0];

const activeMessages = snapshot.activeMessages
.filter((message) => message.sessionId === sessionId)
.toSorted((left, right) => Date.parse(left.startedAt) - Date.parse(right.startedAt));
const activeMessageIds = new Set(activeMessages.map((message) => message.messageId));

const keptMessages = thread.messages
.filter((message) => !message.streaming)
.filter((message) => !activeMessageIds.has(message.id));

const hydratedMessages = activeMessages.map((message) => ({
id: message.messageId,
role: "assistant" as const,
text: message.text,
createdAt: message.startedAt,
streaming: true,
}));

const pendingApprovals = snapshot.pendingApprovals.filter(
(approval) => approval.sessionId === sessionId,
);

return {
...thread,
codexThreadId: snapshotSession.threadId ?? thread.codexThreadId,
session: snapshotSession,
messages: [...keptMessages, ...hydratedMessages],
events: approvalEventsFromSnapshot(pendingApprovals, seq),
error: snapshotSession.lastError ?? (snapshotSession.status === "error" ? thread.error : null),
latestTurnId: activeTurn?.turnId,
latestTurnStartedAt: activeTurn?.startedAt,
latestTurnCompletedAt: undefined,
latestTurnDurationMs: undefined,
};
}

function applySnapshotToThreads(
threads: Thread[],
snapshot: ProviderSnapshot,
seq: number,
): Thread[] {
return threads.map((thread) => applySnapshotToThread(thread, snapshot, seq));
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for openStream calls and sessionIds usage
echo "=== Searching for openStream calls ==="
rg -n "openStream" apps/web/src --type ts --type tsx -A 3 -B 1

echo "=== Searching for providers.openStream calls ==="
rg -n "providers\.openStream" apps/web/src --type ts --type tsx -A 3 -B 1

echo "=== Searching for sessionIds filtering ==="
rg -n "sessionIds" apps/web/src --type ts --type tsx -A 2 -B 1

Repository: pingdotgg/codething-mvp

Length of output: 298


🏁 Script executed:

#!/bin/bash
# Look for the ProviderSnapshot type definition to understand its structure
echo "=== Searching for ProviderSnapshot definition ==="
rg -n "type ProviderSnapshot|interface ProviderSnapshot" --type ts --type tsx -A 10

echo "=== Search in packages/contracts for shared types ==="
fd "contracts" . -type d

Repository: pingdotgg/codething-mvp

Length of output: 386


🏁 Script executed:

#!/bin/bash
# Look for the actual openStream implementation or API documentation
echo "=== Searching for openStream definition ==="
rg -n "openStream.*=" apps/ --type ts --type tsx -A 5

echo "=== Search for useSnapshot or snapshot-related hooks ==="
rg -n "useSnapshot|getSnapshot|applySnapshot" apps/web/src --type ts --type tsx -A 2

Repository: pingdotgg/codething-mvp

Length of output: 240


🏁 Script executed:

#!/bin/bash
# Search for openStream calls without tsx type
echo "=== Searching for openStream in apps/web/src ==="
rg -n "openStream" apps/web/src --glob "*.ts" --glob "*.tsx" -A 3 -B 1

echo ""
echo "=== Searching for providers.openStream ==="
rg -n "providers\.openStream" apps/web/src --glob "*.ts" --glob "*.tsx" -A 3 -B 1

echo ""
echo "=== Searching for sessionIds parameter ==="
rg -n "sessionIds" apps/web/src --glob "*.ts" --glob "*.tsx" -A 2 -B 1

Repository: pingdotgg/codething-mvp

Length of output: 2309


🏁 Script executed:

#!/bin/bash
# Find ProviderSnapshot type and understand structure
echo "=== Searching for ProviderSnapshot definition ==="
rg -n "ProviderSnapshot" --glob "*.ts" --glob "*.tsx" -A 5 | head -50

echo ""
echo "=== Search in packages/contracts ==="
find . -path "*/packages/contracts/*" -name "*.ts" 2>/dev/null | head -10

Repository: pingdotgg/codething-mvp

Length of output: 3438


🏁 Script executed:

#!/bin/bash
# Look at the store.ts file to understand the full context
echo "=== Reading store.ts to understand snapshot subscription ==="
wc -l apps/web/src/store.ts

echo ""
echo "=== Search for stream subscription/listener setup in store.ts ==="
rg -n "subscribe|listener|on\(" apps/web/src/store.ts -B 2 -A 2

Repository: pingdotgg/codething-mvp

Length of output: 411


🏁 Script executed:

#!/bin/bash
# Search for actual openStream calls to see how it's invoked
echo "=== Searching for openStream() calls with arguments ==="
rg -n "openStream\(" apps/web/src --glob "*.ts" --glob "*.tsx" -B 5 -A 5

echo ""
echo "=== Look for getSnapshot calls to understand snapshot behavior ==="
rg -n "getSnapshot" apps/server/src --glob "*.ts" -B 2 -A 5

Repository: pingdotgg/codething-mvp

Length of output: 6123


🏁 Script executed:

#!/bin/bash
# Examine the full context of snapshot application in store.ts
echo "=== Reading relevant section of store.ts to see where applySnapshotToThread is called ==="
sed -n '200,400p' apps/web/src/store.ts | cat -n

Repository: pingdotgg/codething-mvp

Length of output: 7438


🏁 Script executed:

#!/bin/bash
# Look at the providerStreamStore getSnapshot implementation more closely
echo "=== Reading getSnapshot implementation in providerStreamStore.ts ==="
sed -n '255,280p' apps/server/src/providerStreamStore.ts | cat -n

echo ""
echo "=== Search for isFromSessionFilter definition ==="
rg -n "isFromSessionFilter" apps/server/src --glob "*.ts" -B 2 -A 5

Repository: pingdotgg/codething-mvp

Length of output: 2616


🏁 Script executed:

#!/bin/bash
# Check if openStream is ever called with parameters
echo "=== Check how openStream is called in wsNativeApi ==="
sed -n '70,130p' apps/web/src/wsNativeApi.ts | cat -n

echo ""
echo "=== Search for any listener setups that might filter by sessionIds ==="
rg -n "providerStreamListeners|addProviderStreamListener" apps/web/src --glob "*.ts" --glob "*.tsx" -B 3 -A 3

Repository: pingdotgg/codething-mvp

Length of output: 5076


🏁 Script executed:

#!/bin/bash
# Look at the complete flow of snapshot handling in store reducer
echo "=== Search for where applySnapshotToThreads is called ==="
rg -n "applySnapshotToThreads" apps/web/src --glob "*.ts" --glob "*.tsx" -B 5 -A 5

echo ""
echo "=== Check the PROVIDER_SNAPSHOT action in reducer ==="
rg -n "PROVIDER_SNAPSHOT" apps/web/src/store.ts -B 2 -A 15

Repository: pingdotgg/codething-mvp

Length of output: 1306


🏁 Script executed:

#!/bin/bash
# Verify the test case mentioned in the search results
echo "=== Check the test for filtered snapshots ==="
sed -n '145,165p' apps/server/src/providerStreamStore.test.ts | cat -n

Repository: pingdotgg/codething-mvp

Length of output: 869


🏁 Script executed:

#!/bin/bash
# Verify the complete flow: check if there's any mechanism to set sessionIds in streamConfig
echo "=== Check where streamConfig is set or modified ==="
rg -n "streamConfig" apps/web/src/wsNativeApi.ts -B 2 -A 2

echo ""
echo "=== Look for any place that might pass sessionIds to openStream ==="
rg -n "openStream\(" apps/web/src --glob "*.ts" --glob "*.tsx" -B 10 | grep -A 10 "sessionIds"

Repository: pingdotgg/codething-mvp

Length of output: 772


🏁 Script executed:

#!/bin/bash
# Check if there's any defensive logic or safeguard for partial snapshots
echo "=== Look for any warnings or guards about partial snapshots ==="
rg -n "partial|filter|incomplete" apps/web/src/store.ts -i -B 2 -A 2

echo ""
echo "=== Check how ProviderStreamFrame is typed ==="
rg -n "ProviderStreamFrame" packages/contracts/src/providerStream.ts -A 10 | head -40

Repository: pingdotgg/codething-mvp

Length of output: 1841


🏁 Script executed:

#!/bin/bash
# Check the type definition for ProvidersOpenStreamInput to see what filtering options exist
echo "=== Check ProvidersOpenStreamInput schema ==="
rg -n "providersOpenStreamInputSchema" packages/contracts/src/providerStream.ts -A 15

echo ""
echo "=== Check ProvidersOpenStreamResult to understand what the client receives ==="
rg -n "providersOpenStreamResultSchema" packages/contracts/src/providerStream.ts -A 10

Repository: pingdotgg/codething-mvp

Length of output: 1777


🏁 Script executed:

#!/bin/bash
# Check if there are any comments or documentation about snapshot expectations
echo "=== Look for comments about snapshot behavior ==="
rg -n "snapshot|Snapshot" apps/web/src/store.ts -B 1 -A 1 | grep -i "comment\|//\|\/\*" | head -20

echo ""
echo "=== Check the full context where applySnapshotToThreads is called ==="
sed -n '460,475p' apps/web/src/store.ts | cat -n

Repository: pingdotgg/codething-mvp

Length of output: 823


Add a safeguard or document that snapshots must include all sessions.

The snapshot application assumes snapshots are complete. While openStream is currently called without sessionIds filtering, the API supports it (ProvidersOpenStreamInput line 247), and server-side filtering is implemented (providerStreamStore.ts lines 260–263). If openStream were called with sessionIds, the snapshot would be partial, and line 298 would silently clear threads whose sessions aren't in the filtered snapshot.

Either document the invariant that snapshots always include all sessions, or add a guard in applySnapshotToThreads to detect and handle partial snapshots defensively.

🤖 Prompt for AI Agents
In `@apps/web/src/store.ts` around lines 288 - 362, The current
applySnapshotToThread clears a thread when its session is missing from
ProviderSnapshot (making partial snapshots destructive); change this to detect
partial snapshots in applySnapshotToThreads by collecting snapshot.sessions'
sessionIds and, if any existing thread.session.sessionId is missing, treat the
snapshot as partial and skip applying it (return threads unchanged) or pass a
flag into applySnapshotToThread so it returns the original thread when
snapshotSession is undefined; update applySnapshotToThread to only reset/clear a
thread when snapshot is known-complete (snapshotSession exists) and/or log/debug
the partial-snapshot case so callers know filtering occurred (refer to
applySnapshotToThreads, applySnapshotToThread, ProviderSnapshot, and
session.sessionId).

Comment on lines +114 to +155
it("does not close the stream when open is in flight and a listener re-subscribes", async () => {
const openDeferred = createDeferred<ProvidersOpenStreamResult>();
transportState.openDeferredQueue.push(openDeferred);

const { createWsNativeApi } = await import("./wsNativeApi");
const api = createWsNativeApi();

const frames: unknown[] = [];
const unsubscribeA = api.providers.onStream((frame) => {
frames.push(frame);
});

expect(transportState.openRequests).toHaveLength(1);

unsubscribeA();
expect(transportState.closeRequests).toHaveLength(0);

const unsubscribeB = api.providers.onStream((frame) => {
frames.push(frame);
});
expect(transportState.openRequests).toHaveLength(1);

openDeferred.resolve({
mode: "snapshot",
currentSeq: 10,
oldestSeq: 1,
replayedCount: 0,
});
await flushMicrotasks();

expect(transportState.closeRequests).toHaveLength(0);

transportState.latest?.emitPush(WS_CHANNELS.providerStream, {
kind: "event",
seq: 11,
at: "2026-02-10T08:30:00.000Z",
data: {
type: "error",
code: "runtime/error",
message: "boom",
},
});

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Provider stream channel should be providers.event.
This test emits on WS_CHANNELS.providerStream, which maps to providers.stream, but the web app must consume provider events on providers.event. Please align the contracts/channel constant and update the test emit channel accordingly.

As per coding guidelines, apps/web/src/**/*.{ts,tsx}: Web app must consume provider event streams via WebSocket push on channel providers.event.

🤖 Prompt for AI Agents
In `@apps/web/src/wsNativeApi.test.ts` around lines 114 - 155, The test is
emitting on WS_CHANNELS.providerStream (which maps to providers.stream) but the
app expects provider events on the providers.event channel; change the emit call
in the test from WS_CHANNELS.providerStream to the correct channel constant
(e.g. WS_CHANNELS.providersEvent or WS_CHANNELS['providers.event']) in the
transportState.latest?.emitPush invocation and any other test usages, ensuring
the provider event payload remains unchanged and the test now pushes to the
providers.event channel that the web consumers use.

Comment on lines +155 to +178
transport.subscribe(WS_CHANNELS.providerStream, (data) => {
const parsed = providerStreamFrameSchema.safeParse(data);
if (!parsed.success) {
return;
}

const frame = parsed.data;

if (frame.kind === "snapshot" && streamOpening) {
lastAppliedSeq = frame.seq;
} else if (frame.seq <= lastAppliedSeq) {
return;
} else {
lastAppliedSeq = frame.seq;
}

for (const listener of providerStreamListeners) {
try {
listener(frame);
} catch {
// Swallow listener errors
}
}
});

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Provider stream channel should be providers.event.
This subscribes to WS_CHANNELS.providerStream (providers.stream), but the web app is required to consume provider events on providers.event. Please align the contracts/channel constant and update this subscription accordingly.

As per coding guidelines, apps/web/src/**/*.{ts,tsx}: Web app must consume provider event streams via WebSocket push on channel providers.event.

🤖 Prompt for AI Agents
In `@apps/web/src/wsNativeApi.ts` around lines 155 - 178, Update the WebSocket
subscription to use the providers event channel instead of the stream channel:
replace the use of WS_CHANNELS.providerStream in the transport.subscribe call
with the providers event channel (WS_CHANNELS.providersEvent or the literal
"providers.event" as defined in your WS_CHANNELS), leaving the rest of the
handler (providerStreamFrameSchema parsing, lastAppliedSeq/streamOpening logic,
and dispatch to providerStreamListeners) unchanged so the web app consumes
provider events on the correct channel.

Comment on lines 210 to +243
startSession: (input) => transport.request(WS_METHODS.providersStartSession, input),
sendTurn: (input) => transport.request(WS_METHODS.providersSendTurn, input),
interruptTurn: (input) => transport.request(WS_METHODS.providersInterruptTurn, input),
respondToRequest: (input) => transport.request(WS_METHODS.providersRespondToRequest, input),
respondToApproval: (input) =>
transport.request(WS_METHODS.providersRespondToApproval, input),
stopSession: (input) => transport.request(WS_METHODS.providersStopSession, input),
listSessions: () => transport.request(WS_METHODS.providersListSessions),
onEvent: (callback) =>
transport.subscribe(WS_CHANNELS.providerEvent, callback as (data: unknown) => void),
openStream: (input) => openStream(input),
closeStream: async () => {
pendingCloseAfterOpen = false;
streamOpen = false;
await transport.request(WS_METHODS.providersCloseStream);
},
onStream: (callback) => {
providerStreamListeners.add(callback);
pendingCloseAfterOpen = false;
ensureStreamOpen();

return () => {
providerStreamListeners.delete(callback);
if (providerStreamListeners.size === 0) {
if (streamOpenInFlight) {
pendingCloseAfterOpen = true;
return;
}

streamOpen = false;
pendingCloseAfterOpen = false;
void transport.request(WS_METHODS.providersCloseStream).catch(() => {
// Ignore close errors while transport reconnects.
});
}
};
},

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

closeStream can leave the stream open if an open is in flight.
If closeStream is called while streamOpenInFlight is pending, the open resolves afterward and can re-mark the stream as open, leaving it active without a subsequent close. Await the open before closing and reset streamOpen afterward.

🛠️ Proposed fix
       closeStream: async () => {
-        pendingCloseAfterOpen = false;
-        streamOpen = false;
-        await transport.request(WS_METHODS.providersCloseStream);
+        if (streamOpenInFlight) {
+          try {
+            await streamOpenInFlight;
+          } catch {
+            // Ignore open failures; still proceed to close.
+          }
+        }
+        pendingCloseAfterOpen = false;
+        streamOpen = false;
+        await transport.request(WS_METHODS.providersCloseStream);
       },
🤖 Prompt for AI Agents
In `@apps/web/src/wsNativeApi.ts` around lines 210 - 243, The closeStream
implementation can leave the stream open if streamOpenInFlight is pending;
modify closeStream (and its use of pendingCloseAfterOpen/streamOpen flags) to,
when streamOpenInFlight exists, await that promise before calling
transport.request(WS_METHODS.providersCloseStream), then set streamOpen = false
and pendingCloseAfterOpen = false after the close completes (and still
catch/ignore errors from transport.request). Ensure you reference and update the
same symbols: closeStream, streamOpenInFlight, streamOpen,
pendingCloseAfterOpen, openStream, and transport.request so the close always
happens after any in-flight open resolves and the flags are reset.

Comment on lines 120 to 124
ws.addEventListener("close", () => {
this.ws = null;
this.emitConnectionState("closed");
this.scheduleReconnect();
});

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

src/wsTransport.ts:120 "closed" can be delivered twice and can trigger re-entrant dispose() from listeners. Suggest clearing connectionListeners at the start of dispose() (before emitting) so "closed" isn’t double-delivered and re-entrant dispose() becomes a no-op.

Suggested change
ws.addEventListener("close", () => {
this.ws = null;
this.emitConnectionState("closed");
this.scheduleReconnect();
});
ws.addEventListener("close", () => {
if (this.disposed) return;
this.ws = null;
this.emitConnectionState("closed");
this.scheduleReconnect();
});

🚀 Want me to fix this? Reply ex: "fix it for me".

🤖 Prompt for AI
In file apps/web/src/wsTransport.ts around lines 120-124:

`"closed"` can be delivered twice and can trigger re-entrant `dispose()` from listeners. Suggest clearing `connectionListeners` at the start of `dispose()` (before emitting) so `"closed"` isn’t double-delivered and re-entrant `dispose()` becomes a no-op.

return true;
}

if (nowMs - subscription.overHighWaterSinceMs < BACKPRESSURE_HIGH_WATER_DURATION_MS) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium

src/providerStreamSubscriptionManager.ts:282 The time-based backpressure grace period doesn't protect against synchronous bursts (like the replay loop in openStream). Since Date.now() barely advances during a tight loop, the buffer can grow unbounded. Consider adding an immediate buffer size check that returns false when bufferedAmount exceeds a stricter threshold (e.g., 2x the high-water mark), regardless of elapsed time.

🚀 Want me to fix this? Reply ex: "fix it for me".

🤖 Prompt for AI
In file apps/server/src/providerStreamSubscriptionManager.ts around line 282:

The time-based backpressure grace period doesn't protect against synchronous bursts (like the replay loop in `openStream`). Since `Date.now()` barely advances during a tight loop, the buffer can grow unbounded. Consider adding an immediate buffer size check that returns `false` when `bufferedAmount` exceeds a stricter threshold (e.g., 2x the high-water mark), regardless of elapsed time.

Comment on lines +163 to +165
if (frame.kind === "snapshot" && streamOpening) {
lastAppliedSeq = frame.seq;
} else if (frame.seq <= lastAppliedSeq) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium

src/wsNativeApi.ts:163 Race condition: if the snapshot frame arrives after openStream resolves, streamOpening is already false and frame.seq <= lastAppliedSeq causes the snapshot to be discarded. Consider checking frame.kind === "snapshot" independently of streamOpening, or updating lastAppliedSeq only after the snapshot is processed.

-    if (frame.kind === "snapshot" && streamOpening) {
+    if (frame.kind === "snapshot") {
       lastAppliedSeq = frame.seq;
     } else if (frame.seq <= lastAppliedSeq) {

🚀 Want me to fix this? Reply ex: "fix it for me".

🤖 Prompt for AI
In file apps/web/src/wsNativeApi.ts around lines 163-165:

Race condition: if the snapshot frame arrives after `openStream` resolves, `streamOpening` is already `false` and `frame.seq <= lastAppliedSeq` causes the snapshot to be discarded. Consider checking `frame.kind === "snapshot"` independently of `streamOpening`, or updating `lastAppliedSeq` only after the snapshot is processed.

Comment on lines +324 to +325
this.removeSessionState(event.session.sessionId);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium

src/providerStreamStore.ts:324 Closed sessions are never removed from sessionsById, causing unbounded memory growth. Consider deleting the session from the map after calling removeSessionState.

Suggested change
this.removeSessionState(event.session.sessionId);
}
this.removeSessionState(event.session.sessionId);
this.sessionsById.delete(event.session.sessionId);

🚀 Want me to fix this? Reply ex: "fix it for me".

🤖 Prompt for AI
In file apps/server/src/providerStreamStore.ts around lines 324-325:

Closed sessions are never removed from `sessionsById`, causing unbounded memory growth. Consider deleting the session from the map after calling `removeSessionState`.

oldestSeq,
reason: "cursor_too_old",
};
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium

src/providerStreamStore.ts:225 When replayLog is empty but currentSeq > 0, oldestSeq equals currentSeq. A client with afterSeq = currentSeq - 1 passes validation but gets empty events with ok: true. Consider returning cursor_too_old when the log is empty and afterSeq < currentSeq.

-    }
+    }
+
+    if (this.replayLog.length === 0 && afterSeq < currentSeq) {
+      return {
+        ok: false,
+        currentSeq,
+        oldestSeq,
+        reason: "cursor_too_old",
+      };
+    }

🚀 Want me to fix this? Reply ex: "fix it for me".

🤖 Prompt for AI
In file apps/server/src/providerStreamStore.ts around line 225:

When `replayLog` is empty but `currentSeq > 0`, `oldestSeq` equals `currentSeq`. A client with `afterSeq = currentSeq - 1` passes validation but gets empty events with `ok: true`. Consider returning `cursor_too_old` when the log is empty and `afterSeq < currentSeq`.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants