diff --git a/app/api/chat/[chatId]/stream/route.ts b/app/api/chat/[chatId]/stream/route.ts new file mode 100644 index 000000000..9754cfc97 --- /dev/null +++ b/app/api/chat/[chatId]/stream/route.ts @@ -0,0 +1,41 @@ +import type { NextRequest } from "next/server"; +import { NextResponse } from "next/server"; +import { getCorsHeaders } from "@/lib/networking/getCorsHeaders"; +import { handleChatStreamResume } from "@/lib/chat/handleChatStreamResume"; + +export const maxDuration = 800; + +/** + * OPTIONS handler for CORS preflight requests. + * + * @returns A NextResponse with CORS headers. + */ +export async function OPTIONS() { + return new NextResponse(null, { + status: 200, + headers: getCorsHeaders(), + }); +} + +/** + * GET /api/chat/[chatId]/stream + * + * Resume/reconnect endpoint for the durable chat-workflow agent loop. Returns + * the live UIMessage stream when a run is still in flight, or 204 when there + * is nothing to resume. This is the only resume path — POST /api/chat/workflow + * never resumes. + * + * Authentication: x-api-key or Authorization bearer (caller must own the chat). + * + * @param request - The incoming request. + * @param context - Next.js route context. + * @param context.params - Async route params containing the chat id. + * @returns A streaming Response (200), an empty 204, or an error response. + */ +export async function GET( + request: NextRequest, + context: { params: Promise<{ chatId: string }> }, +): Promise { + const { chatId } = await context.params; + return handleChatStreamResume(request, chatId); +} diff --git a/lib/chat/__tests__/handleChatStreamResume.test.ts b/lib/chat/__tests__/handleChatStreamResume.test.ts new file mode 100644 index 000000000..1b99f1a96 --- /dev/null +++ b/lib/chat/__tests__/handleChatStreamResume.test.ts @@ -0,0 +1,80 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { NextRequest, NextResponse } from "next/server"; + +import { handleChatStreamResume } from "@/lib/chat/handleChatStreamResume"; +import { validateGetChatStreamRequest } from "@/lib/chat/validateGetChatStreamRequest"; +import { reconcileExistingActiveStream } from "@/lib/chat/reconcileExistingActiveStream"; + +vi.mock("@/lib/chat/validateGetChatStreamRequest", () => ({ + validateGetChatStreamRequest: vi.fn(), +})); +vi.mock("@/lib/chat/reconcileExistingActiveStream", () => ({ + reconcileExistingActiveStream: vi.fn(), +})); +vi.mock("@/lib/networking/getCorsHeaders", () => ({ + getCorsHeaders: vi.fn(() => ({ "Access-Control-Allow-Origin": "*" })), +})); + +const ACCOUNT_ID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"; +const CHAT_ID = "11111111-1111-1111-1111-111111111111"; + +function makeRequest(): NextRequest { + return new NextRequest(`http://localhost/api/chat/${CHAT_ID}/stream`, { + method: "GET", + headers: { "x-api-key": "test-key" }, + }); +} + +function mockValidatedChat(activeStreamId: string | null) { + vi.mocked(validateGetChatStreamRequest).mockResolvedValue({ + chat: { id: CHAT_ID, active_stream_id: activeStreamId } as never, + accountId: ACCOUNT_ID, + }); +} + +beforeEach(() => vi.clearAllMocks()); + +describe("handleChatStreamResume", () => { + it("passes through the validator's error response", async () => { + vi.mocked(validateGetChatStreamRequest).mockResolvedValue( + NextResponse.json({ status: "error", error: "Forbidden" }, { status: 403 }), + ); + const res = await handleChatStreamResume(makeRequest(), CHAT_ID); + expect(res.status).toBe(403); + expect(reconcileExistingActiveStream).not.toHaveBeenCalled(); + }); + + it("returns 204 when the chat has no active stream id", async () => { + mockValidatedChat(null); + const res = await handleChatStreamResume(makeRequest(), CHAT_ID); + expect(res.status).toBe(204); + expect(reconcileExistingActiveStream).not.toHaveBeenCalled(); + }); + + it("returns 200 with the live stream and x-workflow-run-id when resumable", async () => { + mockValidatedChat("wrun_live"); + vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ + action: "resume", + runId: "wrun_live", + stream: new ReadableStream(), + }); + const res = await handleChatStreamResume(makeRequest(), CHAT_ID); + expect(res.status).toBe(200); + expect(res.headers.get("content-type") ?? "").toMatch(/text\/event-stream/); + expect(res.headers.get("x-workflow-run-id")).toBe("wrun_live"); + }); + + it("returns 204 when the run is terminal (reconcile=ready, stale id cleared)", async () => { + mockValidatedChat("wrun_done"); + vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ action: "ready" }); + const res = await handleChatStreamResume(makeRequest(), CHAT_ID); + expect(res.status).toBe(204); + }); + + it("returns 204 when the probe is indeterminate (reconcile=conflict)", async () => { + mockValidatedChat("wrun_unknown"); + vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ action: "conflict" }); + const res = await handleChatStreamResume(makeRequest(), CHAT_ID); + expect(res.status).toBe(204); + }); +}); diff --git a/lib/chat/__tests__/handleChatWorkflowStream.test.ts b/lib/chat/__tests__/handleChatWorkflowStream.test.ts index 1af062c8b..f42c7a19e 100644 --- a/lib/chat/__tests__/handleChatWorkflowStream.test.ts +++ b/lib/chat/__tests__/handleChatWorkflowStream.test.ts @@ -8,7 +8,7 @@ import { selectChats } from "@/lib/supabase/chats/selectChats"; import { isSandboxActive } from "@/lib/sandbox/isSandboxActive"; import { updateSession } from "@/lib/supabase/sessions/updateSession"; import { compareAndSetChatActiveStreamId } from "@/lib/chat/compareAndSetChatActiveStreamId"; -import { maybeResumeChatStream } from "@/lib/chat/maybeResumeChatStream"; +import { reconcileExistingActiveStream } from "@/lib/chat/reconcileExistingActiveStream"; import { persistLatestUserMessage } from "@/lib/chat/persistLatestUserMessage"; import { start, getRun } from "workflow/api"; @@ -23,8 +23,8 @@ vi.mock("@/lib/supabase/sessions/updateSession", () => ({ updateSession: vi.fn() vi.mock("@/lib/sandbox/buildActiveLifecycleUpdate", () => ({ buildActiveLifecycleUpdate: vi.fn(() => ({})), })); -vi.mock("@/lib/chat/maybeResumeChatStream", () => ({ - maybeResumeChatStream: vi.fn(), +vi.mock("@/lib/chat/reconcileExistingActiveStream", () => ({ + reconcileExistingActiveStream: vi.fn(), })); vi.mock("@/lib/chat/persistLatestUserMessage", () => ({ persistLatestUserMessage: vi.fn(), @@ -109,8 +109,9 @@ function mockStartedRun(runId = "wrun_test_run_1") { beforeEach(() => { vi.clearAllMocks(); - // Default: maybeResumeChatStream returns null (no resume / no active stream) - vi.mocked(maybeResumeChatStream).mockResolvedValue(null); + // Default: a stale/terminal active stream that self-heals to "ready" so the + // POST proceeds. Only invoked when the chat has an active_stream_id. + vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ action: "ready" }); }); describe("handleChatWorkflowStream", () => { @@ -166,31 +167,41 @@ describe("handleChatWorkflowStream", () => { }); }); - describe("resume / conflict via maybeResumeChatStream", () => { + describe("active stream handling (start-only; never resumes)", () => { beforeEach(() => { mockValidated(); mockSessionOwnedActive(); mockChatOwned({ active_stream_id: "wrun_existing" }); }); - it("returns the resume response when maybeResumeChatStream yields one", async () => { - const resumeResponse = new Response("ok", { - status: 200, - headers: { "x-workflow-run-id": "wrun_existing" }, + it("returns 409 without starting when a run is live (reconcile=resume)", async () => { + vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ + action: "resume", + runId: "wrun_existing", + stream: new ReadableStream(), }); - vi.mocked(maybeResumeChatStream).mockResolvedValue(resumeResponse); const res = await handleChatWorkflowStream(makeRequest()); - expect(res.headers.get("x-workflow-run-id")).toBe("wrun_existing"); + expect(res.status).toBe(409); expect(start).not.toHaveBeenCalled(); }); - it("returns the conflict response when maybeResumeChatStream yields 409", async () => { - const conflict = NextResponse.json({ status: "error", error: "conflict" }, { status: 409 }); - vi.mocked(maybeResumeChatStream).mockResolvedValue(conflict); + it("returns 503 (retryable) without starting when reconcile is indeterminate (conflict)", async () => { + vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ action: "conflict" }); const res = await handleChatWorkflowStream(makeRequest()); - expect(res.status).toBe(409); + expect(res.status).toBe(503); expect(start).not.toHaveBeenCalled(); }); + + it("starts a fresh run when a terminal stale id self-heals (reconcile=ready)", async () => { + vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ action: "ready" }); + vi.mocked(compareAndSetChatActiveStreamId) + .mockResolvedValueOnce({ ok: true, claimed: true }) + .mockResolvedValueOnce({ ok: true, claimed: true }); + mockStartedRun(); + const res = await handleChatWorkflowStream(makeRequest()); + expect(res.status).toBe(200); + expect(start).toHaveBeenCalled(); + }); }); describe("placeholder CAS before start", () => { diff --git a/lib/chat/__tests__/maybeResumeChatStream.test.ts b/lib/chat/__tests__/maybeResumeChatStream.test.ts deleted file mode 100644 index 999c29d24..000000000 --- a/lib/chat/__tests__/maybeResumeChatStream.test.ts +++ /dev/null @@ -1,46 +0,0 @@ -import { describe, it, expect, vi, beforeEach } from "vitest"; -import { maybeResumeChatStream } from "@/lib/chat/maybeResumeChatStream"; -import { reconcileExistingActiveStream } from "@/lib/chat/reconcileExistingActiveStream"; - -vi.mock("@/lib/chat/reconcileExistingActiveStream", () => ({ - reconcileExistingActiveStream: vi.fn(), -})); -vi.mock("@/lib/networking/getCorsHeaders", () => ({ - getCorsHeaders: vi.fn(() => ({ "Access-Control-Allow-Origin": "*" })), -})); - -beforeEach(() => vi.clearAllMocks()); - -describe("maybeResumeChatStream", () => { - it("returns null when there is no active_stream_id", async () => { - const res = await maybeResumeChatStream("chat-1", null); - expect(res).toBeNull(); - expect(reconcileExistingActiveStream).not.toHaveBeenCalled(); - }); - - it("returns null when reconcile says action=ready", async () => { - vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ action: "ready" }); - const res = await maybeResumeChatStream("chat-1", "wrun_dead"); - expect(res).toBeNull(); - }); - - it("returns a 200 SSE response with x-workflow-run-id on resume", async () => { - const stream = new ReadableStream(); - vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ - action: "resume", - runId: "wrun_live", - stream, - }); - const res = await maybeResumeChatStream("chat-1", "wrun_live"); - expect(res).not.toBeNull(); - expect(res!.status).toBe(200); - expect(res!.headers.get("x-workflow-run-id")).toBe("wrun_live"); - expect(res!.headers.get("content-type") ?? "").toMatch(/text\/event-stream/); - }); - - it("returns a 409 on conflict", async () => { - vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ action: "conflict" }); - const res = await maybeResumeChatStream("chat-1", "wrun_x"); - expect(res!.status).toBe(409); - }); -}); diff --git a/lib/chat/__tests__/validateGetChatStreamRequest.test.ts b/lib/chat/__tests__/validateGetChatStreamRequest.test.ts new file mode 100644 index 000000000..65fb5fae3 --- /dev/null +++ b/lib/chat/__tests__/validateGetChatStreamRequest.test.ts @@ -0,0 +1,112 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { NextRequest, NextResponse } from "next/server"; + +import { validateGetChatStreamRequest } from "@/lib/chat/validateGetChatStreamRequest"; +import { validateAuthContext } from "@/lib/auth/validateAuthContext"; +import { selectChats } from "@/lib/supabase/chats/selectChats"; +import { selectSessions } from "@/lib/supabase/sessions/selectSessions"; + +vi.mock("@/lib/auth/validateAuthContext", () => ({ validateAuthContext: vi.fn() })); +vi.mock("@/lib/supabase/chats/selectChats", () => ({ selectChats: vi.fn() })); +vi.mock("@/lib/supabase/sessions/selectSessions", () => ({ selectSessions: vi.fn() })); +vi.mock("@/lib/networking/getCorsHeaders", () => ({ + getCorsHeaders: vi.fn(() => ({ "Access-Control-Allow-Origin": "*" })), +})); + +const ACCOUNT_ID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"; +const OTHER_ACCOUNT_ID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"; +const SESSION_ID = "22222222-2222-4222-8222-222222222222"; +const CHAT_ID = "11111111-1111-4111-8111-111111111111"; + +function makeRequest(): NextRequest { + return new NextRequest(`http://localhost/api/chat/${CHAT_ID}/stream`, { + method: "GET", + headers: { "x-api-key": "test-key" }, + }); +} + +function mockAuthed() { + vi.mocked(validateAuthContext).mockResolvedValue({ + accountId: ACCOUNT_ID, + orgId: null, + authToken: "test-key", + }); +} + +beforeEach(() => vi.clearAllMocks()); + +describe("validateGetChatStreamRequest", () => { + it("returns 400 when chatId is empty", async () => { + const res = await validateGetChatStreamRequest(makeRequest(), ""); + expect(res).toBeInstanceOf(NextResponse); + expect((res as NextResponse).status).toBe(400); + }); + + it("returns 400 when chatId is not a valid UUID", async () => { + const res = await validateGetChatStreamRequest(makeRequest(), "not-a-uuid"); + expect((res as NextResponse).status).toBe(400); + }); + + it("passes through the auth error response", async () => { + vi.mocked(validateAuthContext).mockResolvedValue( + NextResponse.json({ status: "error", error: "Unauthorized" }, { status: 401 }), + ); + const res = await validateGetChatStreamRequest(makeRequest(), CHAT_ID); + expect((res as NextResponse).status).toBe(401); + }); + + it("returns 404 when the chat does not exist", async () => { + mockAuthed(); + vi.mocked(selectChats).mockResolvedValue([]); + const res = await validateGetChatStreamRequest(makeRequest(), CHAT_ID); + expect((res as NextResponse).status).toBe(404); + }); + + it("returns 500 when the chat lookup errors (selectChats null)", async () => { + mockAuthed(); + vi.mocked(selectChats).mockResolvedValue(null); + const res = await validateGetChatStreamRequest(makeRequest(), CHAT_ID); + expect((res as NextResponse).status).toBe(500); + }); + + it("returns 500 when the session lookup errors", async () => { + mockAuthed(); + vi.mocked(selectChats).mockResolvedValue([{ id: CHAT_ID, session_id: SESSION_ID } as never]); + vi.mocked(selectSessions).mockResolvedValue(null); + const res = await validateGetChatStreamRequest(makeRequest(), CHAT_ID); + expect((res as NextResponse).status).toBe(500); + }); + + it("returns 404 when the session does not exist", async () => { + mockAuthed(); + vi.mocked(selectChats).mockResolvedValue([{ id: CHAT_ID, session_id: SESSION_ID } as never]); + vi.mocked(selectSessions).mockResolvedValue([]); + const res = await validateGetChatStreamRequest(makeRequest(), CHAT_ID); + expect((res as NextResponse).status).toBe(404); + }); + + it("returns 403 when the caller does not own the session", async () => { + mockAuthed(); + vi.mocked(selectChats).mockResolvedValue([{ id: CHAT_ID, session_id: SESSION_ID } as never]); + vi.mocked(selectSessions).mockResolvedValue([ + { id: SESSION_ID, account_id: OTHER_ACCOUNT_ID } as never, + ]); + const res = await validateGetChatStreamRequest(makeRequest(), CHAT_ID); + expect((res as NextResponse).status).toBe(403); + }); + + it("returns the chat row when the caller owns it", async () => { + mockAuthed(); + vi.mocked(selectChats).mockResolvedValue([ + { id: CHAT_ID, session_id: SESSION_ID, active_stream_id: "wrun_x" } as never, + ]); + vi.mocked(selectSessions).mockResolvedValue([ + { id: SESSION_ID, account_id: ACCOUNT_ID } as never, + ]); + const res = await validateGetChatStreamRequest(makeRequest(), CHAT_ID); + expect(res).not.toBeInstanceOf(NextResponse); + if (res instanceof NextResponse) return; + expect(res.chat.id).toBe(CHAT_ID); + expect(res.accountId).toBe(ACCOUNT_ID); + }); +}); diff --git a/lib/chat/handleChatStreamResume.ts b/lib/chat/handleChatStreamResume.ts new file mode 100644 index 000000000..f52e52978 --- /dev/null +++ b/lib/chat/handleChatStreamResume.ts @@ -0,0 +1,49 @@ +import type { NextRequest } from "next/server"; +import { NextResponse } from "next/server"; +import { createUIMessageStreamResponse, type UIMessageChunk } from "ai"; +import { validateGetChatStreamRequest } from "@/lib/chat/validateGetChatStreamRequest"; +import { reconcileExistingActiveStream } from "@/lib/chat/reconcileExistingActiveStream"; +import { getCorsHeaders } from "@/lib/networking/getCorsHeaders"; + +/** + * Handles GET /api/chat/[chatId]/stream — the resume/reconnect path. + * + * Reattaches the client to an in-flight workflow run for the chat. This is + * the ONLY way to resume; POST /api/chat/workflow never resumes. + * + * - No active stream id on the chat → 204 (nothing to resume). + * - Run still running/pending → 200 with the live UIMessage stream. + * - Run terminally done → stale id is cleared, 204. + * - Indeterminate probe (workflow API error) → 204 WITHOUT clearing the id, + * so a later request can resolve it rather than us falsely declaring the + * run dead. + * + * 204 is the contract the AI SDK's reconnect expects: it maps an empty + * response to "no active stream" and settles the chat to `ready`. + * + * @param request - The incoming GET request (auth headers only). + * @param chatId - Chat identifier from the route params. + * @returns A streaming 200 Response, a 204 Response, or a NextResponse error. + */ +/** 204 response signalling "no active stream to resume". */ +const noResumeResponse = (): Response => + new Response(null, { status: 204, headers: getCorsHeaders() }); + +export async function handleChatStreamResume( + request: NextRequest, + chatId: string, +): Promise { + const validated = await validateGetChatStreamRequest(request, chatId); + if (validated instanceof NextResponse) return validated; + + const { chat } = validated; + if (!chat.active_stream_id) return noResumeResponse(); + + const reconciled = await reconcileExistingActiveStream(chatId, chat.active_stream_id); + if (reconciled.action !== "resume") return noResumeResponse(); + + return createUIMessageStreamResponse({ + stream: reconciled.stream as ReadableStream, + headers: { ...getCorsHeaders(), "x-workflow-run-id": reconciled.runId }, + }); +} diff --git a/lib/chat/handleChatWorkflowStream.ts b/lib/chat/handleChatWorkflowStream.ts index 0e7af2f3e..18a453546 100644 --- a/lib/chat/handleChatWorkflowStream.ts +++ b/lib/chat/handleChatWorkflowStream.ts @@ -2,7 +2,7 @@ import { NextRequest, NextResponse } from "next/server"; import { createUIMessageStreamResponse, type UIMessageChunk } from "ai"; import { start, getRun } from "workflow/api"; import { validateChatWorkflow } from "@/lib/chat/validateChatWorkflow"; -import { maybeResumeChatStream } from "@/lib/chat/maybeResumeChatStream"; +import { reconcileExistingActiveStream } from "@/lib/chat/reconcileExistingActiveStream"; import { selectSessions } from "@/lib/supabase/sessions/selectSessions"; import { selectChats } from "@/lib/supabase/chats/selectChats"; import { compareAndSetChatActiveStreamId } from "@/lib/chat/compareAndSetChatActiveStreamId"; @@ -31,8 +31,10 @@ const DEFAULT_MODEL_ID = "anthropic/claude-haiku-4.5"; * * 1. Validate auth + body (validateChatWorkflow). * 2. Verify session + chat ownership; ensure the session has an active sandbox. - * 3. If a workflow is already running for this chat, resume / 409 via - * maybeResumeChatStream (extracted for OCP). + * 3. Gate on any existing run — resume is GET-only (GET /api/chat/[chatId]/stream), + * never on POST. A genuinely live run → 409 (reconnect via GET); an + * indeterminate probe → 503 (retry the POST shortly); a terminally-done + * run's stale id is cleared here so the next POST can start fresh. * 4. **Claim `chats.active_stream_id` BEFORE starting the workflow** using * a `pending-` placeholder CAS. Closes the race window where two * concurrent requests could both call `start()` and bill the model @@ -67,14 +69,35 @@ export async function handleChatWorkflowStream(request: NextRequest): Promise { - if (!activeStreamId) return null; - - const reconciled = await reconcileExistingActiveStream(chatId, activeStreamId); - - if (reconciled.action === "resume") { - return createUIMessageStreamResponse({ - stream: reconciled.stream as ReadableStream, - headers: { ...getCorsHeaders(), "x-workflow-run-id": reconciled.runId }, - }); - } - - if (reconciled.action === "conflict") { - return errorResponse("Another workflow is already running for this chat", 409); - } - - return null; // action: "ready" — caller starts a new workflow. -} diff --git a/lib/chat/validateGetChatStreamRequest.ts b/lib/chat/validateGetChatStreamRequest.ts new file mode 100644 index 000000000..8588a1b17 --- /dev/null +++ b/lib/chat/validateGetChatStreamRequest.ts @@ -0,0 +1,53 @@ +import type { NextRequest } from "next/server"; +import { NextResponse } from "next/server"; +import { z } from "zod"; +import { validateAuthContext } from "@/lib/auth/validateAuthContext"; +import { selectChats } from "@/lib/supabase/chats/selectChats"; +import { selectSessions } from "@/lib/supabase/sessions/selectSessions"; +import { errorResponse } from "@/lib/networking/errorResponse"; + +const chatIdSchema = z.string().uuid("chatId must be a valid UUID"); + +export type GetChatStreamRequest = { + chat: NonNullable>>[number]; + accountId: string; +}; + +/** + * Validates a GET /api/chat/[chatId]/stream request: authenticates the caller + * (x-api-key or Authorization bearer) and verifies they own the chat via its + * parent session. Returns a NextResponse error short-circuit (401/403/404/500) + * or the resolved chat row. + * + * Resume reattaches to a durable workflow run, so no sandbox-active check is + * needed here — the run lives in workflow infra, not the sandbox. + * + * @param request - The incoming GET request (auth headers only; no body). + * @param chatId - Chat identifier from the route params. + * @returns A NextResponse error or the validated, owned chat row. + */ +export async function validateGetChatStreamRequest( + request: NextRequest, + chatId: string, +): Promise { + const parsedChatId = chatIdSchema.safeParse(chatId); + if (!parsedChatId.success) { + return errorResponse(parsedChatId.error.issues[0]?.message ?? "Invalid chatId", 400); + } + + const auth = await validateAuthContext(request); + if (auth instanceof NextResponse) return auth; + + const chats = await selectChats({ id: parsedChatId.data }); + if (chats === null) return errorResponse("Internal server error", 500); + const chat = chats[0]; + if (!chat) return errorResponse("Chat not found", 404); + + const sessions = await selectSessions({ id: chat.session_id }); + if (sessions === null) return errorResponse("Internal server error", 500); + const session = sessions[0]; + if (!session) return errorResponse("Chat not found", 404); + if (session.account_id !== auth.accountId) return errorResponse("Forbidden", 403); + + return { chat, accountId: auth.accountId }; +} diff --git a/lib/sandbox/hasActiveStreamForSession.ts b/lib/sandbox/hasActiveStreamForSession.ts index 3529a5f3d..b8979f43c 100644 --- a/lib/sandbox/hasActiveStreamForSession.ts +++ b/lib/sandbox/hasActiveStreamForSession.ts @@ -11,5 +11,8 @@ import { selectChats } from "@/lib/supabase/chats/selectChats"; */ export async function hasActiveStreamForSession(sessionId: string): Promise { const chats = await selectChats({ sessionId }); + // On DB error we can't confirm the chats are idle; assume a stream may be + // active and defer hibernation rather than risk pausing mid-stream. + if (chats === null) return true; return chats.some(chat => chat.active_stream_id !== null); } diff --git a/lib/sessions/chats/createSessionChatHandler.ts b/lib/sessions/chats/createSessionChatHandler.ts index 868691246..7ef883d22 100644 --- a/lib/sessions/chats/createSessionChatHandler.ts +++ b/lib/sessions/chats/createSessionChatHandler.ts @@ -34,7 +34,14 @@ export async function createSessionChatHandler( const requestedChatId = body.id ?? null; if (requestedChatId) { - const existing = (await selectChats({ id: requestedChatId }))[0] ?? null; + const existingRows = await selectChats({ id: requestedChatId }); + if (existingRows === null) { + return NextResponse.json( + { status: "error", error: "Internal server error" }, + { status: 500, headers: getCorsHeaders() }, + ); + } + const existing = existingRows[0] ?? null; if (existing) { if (existing.session_id !== sessionId) { return NextResponse.json( diff --git a/lib/sessions/chats/getChatSummaries.ts b/lib/sessions/chats/getChatSummaries.ts index bddd1963a..572d6ad02 100644 --- a/lib/sessions/chats/getChatSummaries.ts +++ b/lib/sessions/chats/getChatSummaries.ts @@ -30,6 +30,10 @@ export async function getChatSummaries({ accountId: string; }): Promise { const chats = await selectChats({ sessionId }); + // Surface DB failures rather than masking them as an empty chat list. + if (chats === null) { + throw new Error("[getChatSummaries] failed to load chats"); + } if (chats.length === 0) { return []; } diff --git a/lib/supabase/chats/selectChats.ts b/lib/supabase/chats/selectChats.ts index e36c0454c..6e36bd0b5 100644 --- a/lib/supabase/chats/selectChats.ts +++ b/lib/supabase/chats/selectChats.ts @@ -11,12 +11,16 @@ interface SelectChatsFilter { /** * General-purpose `chats` reader. Pass any combination of filters to * narrow the result set; an unset filter is ignored. Mirrors the - * `selectSessions` pattern. Returns [] on Supabase error after logging. + * `selectSessions` pattern: returns `null` on Supabase error (so callers + * can distinguish a backend failure from a legitimately empty result), + * and `[]` when the query succeeds but matches no rows. * * @param filter - Optional filters narrowing the query. - * @returns Matching chat rows, or [] on error / no match. + * @returns Matching chat rows, `[]` on no match, or `null` on DB error. */ -export async function selectChats(filter: SelectChatsFilter = {}): Promise[]> { +export async function selectChats( + filter: SelectChatsFilter = {}, +): Promise[] | null> { let query = supabase.from("chats").select("*"); if (filter.id) query = query.eq("id", filter.id); if (filter.sessionId) query = query.eq("session_id", filter.sessionId); @@ -24,7 +28,7 @@ export async function selectChats(filter: SelectChatsFilter = {}): Promise