From a93fd1dc47983a8ea222f00ad49094248d815075 Mon Sep 17 00:00:00 2001 From: Eric P Green Date: Mon, 20 Apr 2026 19:21:00 +0300 Subject: [PATCH 01/11] Migrate SSE consumers onto layered client (PR 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Completes the SSE refactor by moving the three consumers onto the new layered API, extracting pure invalidator modules, wiring the cloud editor's JWT refresh, and dropping the singleton/compat shims from PR 1. Pure extractions (testable, injectable deps): - `runtime-client/invalidation/file-invalidators.ts` — file WRITE/DELETE handling, rill.yaml side effects, throttled `ListFiles` refetch. - `runtime-client/invalidation/resource-invalidators.ts` — per-kind dispatch for Connector, Source/Model, MetricsView, Explore, Canvas, and Component resources plus delete variants. - `web-admin/.../status/logs/log-store.ts` — ProjectLogsPage state machine (monotonic ids, ring buffer, filter-by-level + search). Consumer migrations: - `FileAndResourceWatcher` is a thin per-mount class over `SSEConnection` + `SSESubscriber` + optional `SSELifecycle`. The singleton export is gone; the Svelte component constructs the watcher synchronously so `setContext` runs before descendants read it. - `RuntimeTrafficLights` reads watcher state via the new `watcher-context.ts` key, with a safe fallback so it still renders outside a watcher provider. - `Conversation` uses `SSESubscriber` for typed message/error routing; untagged frames normalize to the "message" decoder per the SSE spec. Transport vs. server errors stay on separate channels. - `ProjectLogsPage` uses `SSEConnection` + typed `SSESubscriber<{ log, error }>` and delegates state to `LogStore`. Call-site updates: - `web-local/src/routes/+layout.svelte` → `lifecycle="aggressive"`. - `web-admin/.../edit/+layout.svelte` → `lifecycle="none"` plus an `onBeforeReconnect` hook that invalidates the branch-scoped `GetProject` query key so the next `runtimeClient.getJwt()` returns a fresh token. Also: - `SSEConnection.handleError` now fires `close` when `retryOnError` is false so one-shot consumers (chat) can settle cleanly on failure. - Deprecation shims removed: `sse-fetch-client.ts` and `sse-connection-manager.ts` at the old paths; auto-close methods and the `autoCloseTimeouts` param stripped from `SSEConnection`. - New specs: `file-invalidators.spec.ts`, `resource-invalidators.spec.ts`, `file-and-resource-watcher.spec.ts`, `RuntimeTrafficLights.spec.ts`, `conversation-streaming.spec.ts`, `log-store.spec.ts`. `web-admin/package.json` gains a `test:unit` script for the latter. --- web-admin/package.json | 1 + .../status/logs/ProjectLogsPage.svelte | 103 +-- .../projects/status/logs/log-store.spec.ts | 86 +++ .../projects/status/logs/log-store.ts | 53 ++ .../[project]/-/edit/+layout.svelte | 19 +- .../chat/core/conversation-streaming.spec.ts | 230 ++++++ .../src/features/chat/core/conversation.ts | 131 ++-- .../FileAndResourceWatcher.svelte | 91 +-- .../RuntimeTrafficLights.spec.ts | 31 + .../RuntimeTrafficLights.svelte | 14 +- .../__fixtures__/SlotPassthrough.svelte | 1 + .../file-and-resource-watcher.spec.ts | 227 ++++++ .../file-and-resource-watcher.ts | 682 +++++------------- .../entity-management/watcher-context.ts | 12 + .../invalidation/file-invalidators.spec.ts | 267 +++++++ .../invalidation/file-invalidators.ts | 132 ++++ .../resource-invalidators.spec.ts | 361 +++++++++ .../invalidation/resource-invalidators.ts | 415 +++++++++++ .../runtime-client/sse-connection-manager.ts | 12 - .../src/runtime-client/sse-fetch-client.ts | 10 - .../src/runtime-client/sse/sse-connection.ts | 67 +- web-local/src/routes/+layout.svelte | 2 +- 22 files changed, 2220 insertions(+), 727 deletions(-) create mode 100644 web-admin/src/features/projects/status/logs/log-store.spec.ts create mode 100644 web-admin/src/features/projects/status/logs/log-store.ts create mode 100644 web-common/src/features/chat/core/conversation-streaming.spec.ts create mode 100644 web-common/src/features/entity-management/RuntimeTrafficLights.spec.ts create mode 100644 web-common/src/features/entity-management/__fixtures__/SlotPassthrough.svelte create mode 100644 web-common/src/features/entity-management/file-and-resource-watcher.spec.ts create mode 100644 web-common/src/features/entity-management/watcher-context.ts create mode 100644 web-common/src/runtime-client/invalidation/file-invalidators.spec.ts create mode 100644 web-common/src/runtime-client/invalidation/file-invalidators.ts create mode 100644 web-common/src/runtime-client/invalidation/resource-invalidators.spec.ts create mode 100644 web-common/src/runtime-client/invalidation/resource-invalidators.ts delete mode 100644 web-common/src/runtime-client/sse-connection-manager.ts delete mode 100644 web-common/src/runtime-client/sse-fetch-client.ts diff --git a/web-admin/package.json b/web-admin/package.json index 9986c1a229a..136ac21d541 100644 --- a/web-admin/package.json +++ b/web-admin/package.json @@ -7,6 +7,7 @@ "build": "npm run generate:sveltekit -w web-common && vite build && cp _redirects build", "preview": "vite preview", "test": "playwright test", + "test:unit": "vitest run", "generate:sveltekit": "svelte-kit sync", "test:setup": "E2E_NO_TEARDOWN=true playwright test --project=setup", "test:dev": "E2E_NO_SETUP_OR_TEARDOWN=true playwright test --project=e2e", diff --git a/web-admin/src/features/projects/status/logs/ProjectLogsPage.svelte b/web-admin/src/features/projects/status/logs/ProjectLogsPage.svelte index 6d79a1b2d10..716650c6c8a 100644 --- a/web-admin/src/features/projects/status/logs/ProjectLogsPage.svelte +++ b/web-admin/src/features/projects/status/logs/ProjectLogsPage.svelte @@ -2,11 +2,16 @@ import { page } from "$app/stores"; import { onMount, onDestroy } from "svelte"; import { - SSEConnectionManager, ConnectionStatus, - } from "@rilldata/web-common/runtime-client/sse-connection-manager"; + SSEConnection, + SSESubscriber, + } from "@rilldata/web-common/runtime-client/sse"; import { useRuntimeClient } from "@rilldata/web-common/runtime-client/v2"; - import { V1LogLevel, type V1Log } from "@rilldata/web-common/runtime-client"; + import { + V1LogLevel, + type V1Log, + type V1WatchLogsResponse, + } from "@rilldata/web-common/runtime-client"; import Search from "@rilldata/web-common/components/search/Search.svelte"; import * as DropdownMenu from "@rilldata/web-common/components/dropdown-menu"; import CaretDownIcon from "@rilldata/web-common/components/icons/CaretDownIcon.svelte"; @@ -16,6 +21,7 @@ parseArrayParam, parseStringParam, } from "@rilldata/web-common/lib/url-filter-sync"; + import { LogStore, type LogEntry } from "./log-store"; const runtimeClient = useRuntimeClient(); @@ -29,9 +35,9 @@ { value: V1LogLevel.LOG_LEVEL_ERROR, label: "Error" }, ]; - type LogEntry = V1Log & { _id: number }; - let nextLogId = 0; - let logs: LogEntry[] = []; + const logStore = new LogStore(MAX_LOGS); + // Bumped on each addLog so Svelte reactivity re-runs filtering. + let logsVersion = 0; let logsContainer: HTMLDivElement; let connectionError: string | null = null; let filterDropdownOpen = false; @@ -57,29 +63,46 @@ filterSync.syncToUrl({ q: searchText, level: selectedLevels }); } - const logsConnection = new SSEConnectionManager({ + const logsConnection = new SSEConnection({ maxRetryAttempts: 5, retryOnError: true, retryOnClose: true, }); + // Typed decoder layer: the subscriber owns JSON.parse and the per-event-type + // routing so the component code stays in UI concerns. + const logsSubscriber = new SSESubscriber<{ + log: V1WatchLogsResponse; + error: { code?: string; message?: string }; + }>(logsConnection, { + log: (data) => JSON.parse(data) as V1WatchLogsResponse, + error: (data) => { + try { + return JSON.parse(data) as { code?: string; message?: string }; + } catch { + return { message: data }; + } + }, + }); + $: connectionStatus = logsConnection.status; $: isConnected = $connectionStatus === ConnectionStatus.OPEN; $: isConnecting = $connectionStatus === ConnectionStatus.CONNECTING; $: isClosed = $connectionStatus === ConnectionStatus.CLOSED; $: hasConnectionError = isClosed && connectionError !== null; - $: filteredLogs = logs.filter((log) => { - const matchesLevel = - selectedLevels.length === 0 || selectedLevels.includes(log.level ?? ""); - const matchesSearch = - !searchText || - (log.message?.toLowerCase().includes(searchText.toLowerCase()) ?? - false) || - (log.jsonPayload?.toLowerCase().includes(searchText.toLowerCase()) ?? - false); - return matchesLevel && matchesSearch; - }); + let filteredLogs: LogEntry[] = []; + let totalLogs = 0; + $: { + // Depend on logsVersion so Svelte re-runs when new logs arrive; the + // reference below keeps the compiler from treating it as unused. + void logsVersion; + filteredLogs = logStore.getFiltered({ + levels: selectedLevels, + search: searchText, + }); + totalLogs = logStore.getAll().length; + } $: selectedLevelLabel = (() => { if (selectedLevels.length === 0) return "All levels"; @@ -105,8 +128,9 @@ const url = `${host}/v1/instances/${instanceId}/sse?events=log&logs_replay=true&logs_replay_limit=${REPLAY_LIMIT}`; unsubs = [ - logsConnection.on("message", handleMessage), - logsConnection.on("error", handleError), + logsSubscriber.on("log", handleLogMessage), + logsSubscriber.on("error", handleServerError), + logsConnection.on("error", handleTransportError), logsConnection.on("open", handleOpen), ]; @@ -117,6 +141,7 @@ onDestroy(() => { unsubs.forEach((fn) => fn()); + logsSubscriber.cleanup(); logsConnection.close(true); }); @@ -124,29 +149,25 @@ return el.scrollHeight - el.scrollTop - el.clientHeight <= threshold; } - function handleMessage(message: { data: string; type?: string }) { - try { - if (message.type && message.type !== "log") return; - - const response = JSON.parse(message.data); - const log = response.log as V1Log; - if (log) { - logs = [...logs, { ...log, _id: nextLogId++ }].slice(-MAX_LOGS); - - if (logsContainer && isNearBottom(logsContainer)) { - requestAnimationFrame(() => { - logsContainer.scrollTop = logsContainer.scrollHeight; - }); - } - } - } catch (e) { - if (import.meta.env.DEV) { - console.warn("Failed to parse log message:", e); - } + function handleLogMessage(response: V1WatchLogsResponse) { + const log = response.log as V1Log | undefined; + if (!log) return; + + logStore.addLog(log); + logsVersion++; + + if (logsContainer && isNearBottom(logsContainer)) { + requestAnimationFrame(() => { + logsContainer.scrollTop = logsContainer.scrollHeight; + }); } } - function handleError(error: Error) { + function handleServerError(payload: { code?: string; message?: string }) { + connectionError = payload.message || payload.code || "Server error"; + } + + function handleTransportError(error: Error) { console.error("Logs SSE error:", error); connectionError = error.message || "Connection failed"; } @@ -300,7 +321,7 @@ Connection failed: {connectionError} - {:else if logs.length === 0} + {:else if totalLogs === 0}
Waiting for logs...
{:else if filteredLogs.length === 0}
No logs match the current filters
diff --git a/web-admin/src/features/projects/status/logs/log-store.spec.ts b/web-admin/src/features/projects/status/logs/log-store.spec.ts new file mode 100644 index 00000000000..0a4cfc64de1 --- /dev/null +++ b/web-admin/src/features/projects/status/logs/log-store.spec.ts @@ -0,0 +1,86 @@ +import { V1LogLevel } from "@rilldata/web-common/runtime-client"; +import { describe, expect, it } from "vitest"; +import { LogStore } from "./log-store"; + +describe("LogStore", () => { + it("addLog assigns a monotonic _id counter", () => { + const store = new LogStore(100); + const a = store.addLog({ message: "a" }); + const b = store.addLog({ message: "b" }); + const c = store.addLog({ message: "c" }); + expect(a._id).toBe(0); + expect(b._id).toBe(1); + expect(c._id).toBe(2); + }); + + it("drops the oldest entry once maxLogs is exceeded (ring buffer)", () => { + const store = new LogStore(3); + store.addLog({ message: "a" }); + store.addLog({ message: "b" }); + store.addLog({ message: "c" }); + store.addLog({ message: "d" }); + const messages = store.getAll().map((e) => e.message); + expect(messages).toEqual(["b", "c", "d"]); + }); + + it("continues to assign fresh _ids even after buffer wraps", () => { + const store = new LogStore(2); + store.addLog({ message: "a" }); + store.addLog({ message: "b" }); + const third = store.addLog({ message: "c" }); + expect(third._id).toBe(2); + expect(store.getAll().map((e) => e._id)).toEqual([1, 2]); + }); + + it("filters by level", () => { + const store = new LogStore(100); + store.addLog({ message: "a", level: V1LogLevel.LOG_LEVEL_INFO }); + store.addLog({ message: "b", level: V1LogLevel.LOG_LEVEL_ERROR }); + store.addLog({ message: "c", level: V1LogLevel.LOG_LEVEL_WARN }); + + const errors = store.getFiltered({ + levels: [V1LogLevel.LOG_LEVEL_ERROR], + }); + expect(errors.map((e) => e.message)).toEqual(["b"]); + }); + + it("search matches message and jsonPayload case-insensitively", () => { + const store = new LogStore(100); + store.addLog({ message: "Fooooo", jsonPayload: "{}" }); + store.addLog({ message: "bar", jsonPayload: '{"reason":"foobar"}' }); + store.addLog({ message: "baz", jsonPayload: "{}" }); + + const hits = store.getFiltered({ search: "FOO" }); + expect(hits.map((e) => e.message)).toEqual(["Fooooo", "bar"]); + }); + + it("combines level and search as an intersection", () => { + const store = new LogStore(100); + store.addLog({ + message: "boot", + level: V1LogLevel.LOG_LEVEL_ERROR, + }); + store.addLog({ + message: "boot", + level: V1LogLevel.LOG_LEVEL_INFO, + }); + store.addLog({ + message: "crash", + level: V1LogLevel.LOG_LEVEL_ERROR, + }); + + const hits = store.getFiltered({ + levels: [V1LogLevel.LOG_LEVEL_ERROR], + search: "boot", + }); + expect(hits.map((e) => e.message)).toEqual(["boot"]); + }); + + it("returns everything when filters are empty", () => { + const store = new LogStore(100); + store.addLog({ message: "a" }); + store.addLog({ message: "b" }); + expect(store.getFiltered({}).map((e) => e.message)).toEqual(["a", "b"]); + expect(store.getFiltered().map((e) => e.message)).toEqual(["a", "b"]); + }); +}); diff --git a/web-admin/src/features/projects/status/logs/log-store.ts b/web-admin/src/features/projects/status/logs/log-store.ts new file mode 100644 index 00000000000..45b308aa07e --- /dev/null +++ b/web-admin/src/features/projects/status/logs/log-store.ts @@ -0,0 +1,53 @@ +import type { V1Log } from "@rilldata/web-common/runtime-client"; + +export type LogEntry = V1Log & { _id: number }; + +export interface LogFilters { + levels?: string[]; + search?: string; +} + +/** + * In-memory log buffer for ProjectLogsPage. Keeps the state machine out of + * the Svelte component so the filtering + ring-buffer behavior can be + * tested without a DOM. + */ +export class LogStore { + private readonly entries: LogEntry[] = []; + private nextId = 0; + + constructor(public readonly maxLogs: number) {} + + public addLog(log: V1Log): LogEntry { + const entry: LogEntry = { ...log, _id: this.nextId++ }; + this.entries.push(entry); + if (this.entries.length > this.maxLogs) { + this.entries.shift(); + } + return entry; + } + + public getAll(): LogEntry[] { + return [...this.entries]; + } + + public getFiltered(filters: LogFilters = {}): LogEntry[] { + const { levels = [], search = "" } = filters; + const needle = search.toLowerCase(); + return this.entries.filter((log) => { + const matchesLevel = + levels.length === 0 || levels.includes(log.level ?? ""); + if (!matchesLevel) return false; + + if (!needle) return true; + return ( + (log.message?.toLowerCase().includes(needle) ?? false) || + (log.jsonPayload?.toLowerCase().includes(needle) ?? false) + ); + }); + } + + public clear(): void { + this.entries.length = 0; + } +} diff --git a/web-admin/src/routes/[organization]/[project]/-/edit/+layout.svelte b/web-admin/src/routes/[organization]/[project]/-/edit/+layout.svelte index 03c58788ca0..08c8a1bfe65 100644 --- a/web-admin/src/routes/[organization]/[project]/-/edit/+layout.svelte +++ b/web-admin/src/routes/[organization]/[project]/-/edit/+layout.svelte @@ -3,9 +3,11 @@ import { createAdminServiceGetCurrentUser, createAdminServiceGetProject, + getAdminServiceGetProjectQueryKey, V1DeploymentStatus, type V1Organization, } from "@rilldata/web-admin/client"; + import { useQueryClient } from "@tanstack/svelte-query"; import { branchPathPrefix, extractBranchFromPath, @@ -121,6 +123,20 @@ $: projectUrl = `/${organization}/${project}`; $: branchUrl = `/${organization}/${project}${branchPathPrefix(branch)}`; + // Invalidating this query refetches a fresh JWT; `runtimeClient.getJwt()` + // reads the updated value on the next call. Branch must be part of the + // key or the invalidation misses the branch-scoped cache entry. + const queryClient = useQueryClient(); + $: onBeforeReconnect = async () => { + await queryClient.invalidateQueries({ + queryKey: getAdminServiceGetProjectQueryKey( + organization, + project, + branch ? { branch } : undefined, + ), + }); + }; + onDestroy(() => { $editorRoutePrefix = ""; }); @@ -172,7 +188,8 @@
diff --git a/web-common/src/features/chat/core/conversation-streaming.spec.ts b/web-common/src/features/chat/core/conversation-streaming.spec.ts new file mode 100644 index 00000000000..bf4078d59f9 --- /dev/null +++ b/web-common/src/features/chat/core/conversation-streaming.spec.ts @@ -0,0 +1,230 @@ +import { queryClient } from "@rilldata/web-common/lib/svelte-query/globalQueryClient"; +import { + getRuntimeServiceGetConversationQueryKey, + type V1GetConversationResponse, +} from "@rilldata/web-common/runtime-client"; +import type { RuntimeClient } from "@rilldata/web-common/runtime-client/v2"; +import { get } from "svelte/store"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +// ============================================================================= +// MOCKS +// ============================================================================= + +class FakeSSEConnection { + private readonly handlers = new Map void>>(); + public url = ""; + public options: unknown; + + public start = vi.fn((url: string, opts: unknown) => { + this.url = url; + this.options = opts; + }); + public close = vi.fn(); + public pause = vi.fn(); + public heartbeat = vi.fn(); + + public on = (event: string, listener: (arg?: unknown) => void) => { + if (!this.handlers.has(event)) this.handlers.set(event, new Set()); + this.handlers.get(event)!.add(listener); + return () => this.handlers.get(event)!.delete(listener); + }; + public once = this.on; + + public fire(event: string, arg?: unknown) { + this.handlers.get(event)?.forEach((h) => h(arg)); + } +} + +class FakeSSESubscriber { + public readonly decoders: Record unknown>; + private readonly handlers = new Map void>>(); + + constructor( + _connection: unknown, + decoders: Record unknown>, + ) { + this.decoders = decoders; + } + + public on = (event: string, listener: (arg?: unknown) => void) => { + if (!this.handlers.has(event)) this.handlers.set(event, new Set()); + this.handlers.get(event)!.add(listener); + return () => this.handlers.get(event)!.delete(listener); + }; + public once = this.on; + public cleanup = vi.fn(); + + /** Simulate a typed event arriving from the transport. */ + public deliver(event: string, payload: unknown) { + this.handlers.get(event)?.forEach((h) => h(payload)); + } +} + +const fakeConnections: FakeSSEConnection[] = []; +const fakeSubscribers: FakeSSESubscriber[] = []; + +vi.mock("@rilldata/web-common/runtime-client/sse", () => { + // Defined inside the factory so vi.mock's hoisting doesn't hit a TDZ when + // resolving this module graph. + class FakeSSEHttpError extends Error { + constructor( + public readonly status: number, + public readonly statusText: string, + ) { + super(`HTTP ${status}: ${statusText}`); + this.name = "SSEHttpError"; + } + } + + return { + SSEConnection: class { + constructor() { + const c = new FakeSSEConnection(); + fakeConnections.push(c); + return c as unknown as object; + } + }, + SSESubscriber: class { + constructor( + connection: unknown, + decoders: Record unknown>, + ) { + const s = new FakeSSESubscriber(connection, decoders); + fakeSubscribers.push(s); + return s as unknown as object; + } + }, + SSEHttpError: FakeSSEHttpError, + ConnectionStatus: { + CONNECTING: "connecting", + OPEN: "open", + PAUSED: "paused", + CLOSED: "closed", + }, + }; +}); + +import { SSEHttpError as MockedSSEHttpError } from "@rilldata/web-common/runtime-client/sse"; + +import { Conversation } from "./conversation"; + +const INSTANCE_ID = "test-instance"; + +const mockRuntimeClient = { + host: "http://localhost:9009", + instanceId: INSTANCE_ID, + getJwt: () => undefined, +} as unknown as RuntimeClient; + +function latestConnection() { + return fakeConnections[fakeConnections.length - 1]; +} +function latestSubscriber() { + return fakeSubscribers[fakeSubscribers.length - 1]; +} + +function getCachedData(conversationId: string) { + return queryClient.getQueryData( + getRuntimeServiceGetConversationQueryKey(INSTANCE_ID, { + conversationId, + }), + ); +} + +describe("Conversation streaming", () => { + beforeEach(() => { + queryClient.clear(); + fakeConnections.length = 0; + fakeSubscribers.length = 0; + }); + + afterEach(() => { + queryClient.clear(); + }); + + it("routes untagged frames through the 'message' decoder and updates the message cache", async () => { + queryClient.setQueryData( + getRuntimeServiceGetConversationQueryKey(INSTANCE_ID, { + conversationId: "conv-1", + }), + { conversation: { id: "conv-1" }, messages: [], isOwner: true }, + ); + + const conversation = new Conversation(mockRuntimeClient, "conv-1"); + conversation.draftMessage.set("hi"); + + let emittedMessage: unknown; + conversation.on("message", (msg) => { + emittedMessage = msg; + }); + + const sendPromise = conversation.sendMessage({}).catch(() => {}); + // Let startStreaming register its listeners. + await Promise.resolve(); + await Promise.resolve(); + + const subscriber = latestSubscriber(); + expect(subscriber).toBeDefined(); + // Untagged frames would route through the "message" decoder — here we + // exercise the decoder directly by delivering the decoded payload. + subscriber.deliver("message", { + conversationId: "conv-1", + message: { + id: "assistant-msg-1", + role: "assistant", + contentData: "hello from the server", + }, + }); + + latestConnection().fire("close"); + await sendPromise; + + expect(emittedMessage).toMatchObject({ id: "assistant-msg-1" }); + + // Message was added to the cache. + const cached = getCachedData("conv-1"); + const cachedIds = (cached?.messages ?? []).map((m) => m.id); + expect(cachedIds).toContain("assistant-msg-1"); + }); + + it("surfaces typed 'error' frames through streamError", async () => { + const conversation = new Conversation(mockRuntimeClient, "conv-1"); + conversation.draftMessage.set("hi"); + + const sendPromise = conversation.sendMessage({}).catch(() => {}); + await Promise.resolve(); + await Promise.resolve(); + + const subscriber = latestSubscriber(); + subscriber.deliver("error", { code: "internal", error: "boom" }); + + latestConnection().fire("close"); + await sendPromise; + + expect(get(conversation.streamError)).toBe("boom"); + }); + + it("surfaces transport errors (SSEHttpError) through streamError", async () => { + const conversation = new Conversation(mockRuntimeClient, "conv-1"); + conversation.draftMessage.set("hi"); + + const sendPromise = conversation.sendMessage({}).catch(() => {}); + await Promise.resolve(); + await Promise.resolve(); + + const conn = latestConnection(); + conn.fire( + "error", + new (MockedSSEHttpError as unknown as new ( + status: number, + statusText: string, + ) => Error)(401, "Unauthorized"), + ); + conn.fire("close"); + await sendPromise; + + const err = get(conversation.streamError); + expect(err).toMatch(/Authentication failed/); + }); +}); diff --git a/web-common/src/features/chat/core/conversation.ts b/web-common/src/features/chat/core/conversation.ts index 37091642871..537b1ae9d6b 100644 --- a/web-common/src/features/chat/core/conversation.ts +++ b/web-common/src/features/chat/core/conversation.ts @@ -13,10 +13,10 @@ import { import type { ConnectError } from "@connectrpc/connect"; import type { RuntimeClient } from "@rilldata/web-common/runtime-client/v2"; import { - SSEFetchClient, + SSEConnection, SSEHttpError, - type SSEMessage, -} from "@rilldata/web-common/runtime-client/sse-fetch-client"; + SSESubscriber, +} from "@rilldata/web-common/runtime-client/sse"; import { createQuery, type CreateQueryResult } from "@tanstack/svelte-query"; import { derived, @@ -47,6 +47,21 @@ type ConversationEvents = { error: string; }; +/** + * Application-level error sent by the server over SSE as `event: error`. + */ +interface ChatServerError { + code?: string; + error?: string; +} + +type ChatEventMap = { + message: V1CompleteStreamingResponse; + error: ChatServerError; +}; + +type ChatSubscriber = SSESubscriber; + /** * Individual conversation state management. * @@ -69,7 +84,8 @@ export class Conversation { ) as typeof this.events.once; // Private state - private sseClient: SSEFetchClient | null = null; + private connection: SSEConnection | null = null; + private subscriber: ChatSubscriber | null = null; private hasReceivedFirstMessage = false; private readonly conversationQuery: CreateQueryResult< V1GetConversationResponse, @@ -297,9 +313,7 @@ export class Conversation { * Cancel the current streaming session (user-initiated) */ public cancelStream(): void { - if (this.sseClient) { - this.sseClient.stop(); - } + this.connection?.close(); this.isStreaming.set(false); this.streamError.set(null); } @@ -308,13 +322,15 @@ export class Conversation { * Clean up all resources when conversation is no longer needed (lifecycle cleanup) */ public cleanup(): void { - // Cancel any active streaming first this.cancelStream(); - // Full resource cleanup - if (this.sseClient) { - this.sseClient.cleanup(); - this.sseClient = null; + if (this.subscriber) { + this.subscriber.cleanup(); + this.subscriber = null; + } + if (this.connection) { + this.connection.close(true); + this.connection = null; } this.events.clearListeners(); @@ -338,8 +354,8 @@ export class Conversation { comment?: string; }; }): Promise { - this.ensureSSEClient(); - this.sseClient!.stop(); + this.ensureSubscriber(); + this.connection!.close(); const baseUrl = `${this.client.host}/v1/instances/${this.instanceId}/ai/complete/stream?stream=messages`; @@ -357,47 +373,69 @@ export class Conversation { ...request.context, }; - await this.sseClient!.start(baseUrl, { + this.connection!.start(baseUrl, { method: "POST", body: requestBody, getJwt: () => this.client.getJwt(), }); + + // Resolve once the stream closes, matching the old await-on-fetch-client + // behavior so callers keep their sequential control flow. + await new Promise((resolve) => { + const offClose = this.connection!.once("close", () => { + offClose(); + resolve(); + }); + }); } /** - * Ensure SSE client is initialized with event handlers. + * Ensure the SSE connection + typed subscriber are initialized. + * + * The admin AI endpoint emits success frames untagged (SSESubscriber + * normalizes `type === undefined` → "message") and failures as + * `event: error`. Both routes through typed decoders here, so this + * method no longer needs to parse JSON itself. */ - private ensureSSEClient(): void { - if (this.sseClient) return; - - this.sseClient = new SSEFetchClient(); + private ensureSubscriber(): void { + if (this.subscriber) return; + + // Chat is a one-shot stream: no retry. SSELifecycle is also not + // attached — the server ends the stream when the response is complete, + // and any cancellation is user-initiated. + this.connection = new SSEConnection(); + + this.subscriber = new SSESubscriber(this.connection, { + message: (data) => JSON.parse(data) as V1CompleteStreamingResponse, + error: (data) => { + try { + return JSON.parse(data) as ChatServerError; + } catch { + return { error: data }; + } + }, + }); - // Set up SSE event handlers - this.sseClient.on("message", (message: SSEMessage) => { - // Mark that we've received data - // Since server always emits user message first (after persisting), - // receiving any message means the server has persisted our message + this.subscriber.on("message", (response) => { + // Receiving any message means the server has persisted our user + // message (it echoes it back first). this.hasReceivedFirstMessage = true; - - // Handle application-level errors sent via SSE - if (message.type === "error") { - this.handleServerError(message.data); - return; - } - - // Handle normal streaming data try { - const response: V1CompleteStreamingResponse = JSON.parse(message.data); this.processStreamingResponse(response); if (response.message) this.events.emit("message", response.message); } catch (error) { - console.error("Failed to parse streaming response:", error); + console.error("Failed to process streaming response:", error); this.streamError.set("Failed to process server response"); } }); - this.sseClient.on("error", (error) => { - // Transport errors only: connection, network, HTTP failures + this.subscriber.on("error", (payload) => { + this.hasReceivedFirstMessage = true; + this.handleServerError(payload); + }); + + this.connection.on("error", (error) => { + // Transport errors: connection, network, HTTP failures. console.error("[SSE] Transport error:", { message: error.message, status: error instanceof SSEHttpError ? error.status : undefined, @@ -407,10 +445,6 @@ export class Conversation { }); this.streamError.set(this.formatTransportError(error)); }); - - this.sseClient.on("close", () => { - // Stream closed - completion handled in caller - }); } // ----- Business Logic Layer: Message Processing ----- @@ -636,15 +670,10 @@ export class Conversation { // ----- Server Errors (Application-level) ----- /** - * Format server error data into user-friendly message + * Format a typed server-side error payload into a user-facing string. */ - private formatServerError(errorData: string): string { - try { - const parsed = JSON.parse(errorData); - return parsed.error || "Server error occurred"; - } catch { - return `Server error: ${errorData}`; - } + private formatServerError(payload: ChatServerError): string { + return payload.error || "Server error occurred"; } /** @@ -654,8 +683,8 @@ export class Conversation { * AFTER the server has already persisted the user's message. No rollback is needed - * the user's message should remain visible in the conversation with an error indicator. */ - private handleServerError(errorData: string): void { - this.streamError.set(this.formatServerError(errorData)); + private handleServerError(payload: ChatServerError): void { + this.streamError.set(this.formatServerError(payload)); } // ----- Transport Errors (Connection-level) ----- diff --git a/web-common/src/features/entity-management/FileAndResourceWatcher.svelte b/web-common/src/features/entity-management/FileAndResourceWatcher.svelte index aee1b3e3589..8aaafaad166 100644 --- a/web-common/src/features/entity-management/FileAndResourceWatcher.svelte +++ b/web-common/src/features/entity-management/FileAndResourceWatcher.svelte @@ -1,73 +1,60 @@ - { - if (!keepAlive) scheduleAutoClose(); - }} - onclick={heartbeat} - onkeydown={heartbeat} - onfocus={heartbeat} -/> - -{#if status === ConnectionStatus.CLOSED} +{#if $status === ConnectionStatus.CLOSED} { + const Stub = (await import("./__fixtures__/SlotPassthrough.svelte")).default; + return { + Root: Stub, + Trigger: Stub, + Content: Stub, + }; +}); +vi.mock( + "@rilldata/web-common/components/tooltip/TooltipContent.svelte", + async () => ({ + default: (await import("./__fixtures__/SlotPassthrough.svelte")).default, + }), +); + +import RuntimeTrafficLights from "./RuntimeTrafficLights.svelte"; + +describe("RuntimeTrafficLights", () => { + it("renders without throwing when no watcher context is provided", () => { + // The component must not throw when mounted outside a + // . Its fallback is a static store set to + // CLOSED, so the tooltip content still renders in the accessible tree. + expect(() => render(RuntimeTrafficLights)).not.toThrow(); + }); +}); diff --git a/web-common/src/features/entity-management/RuntimeTrafficLights.svelte b/web-common/src/features/entity-management/RuntimeTrafficLights.svelte index 4b3369d842e..e97d3e842da 100644 --- a/web-common/src/features/entity-management/RuntimeTrafficLights.svelte +++ b/web-common/src/features/entity-management/RuntimeTrafficLights.svelte @@ -1,10 +1,18 @@