From baf01fa6ef9797b0d76db73ba88f74a107e3ac23 Mon Sep 17 00:00:00 2001 From: Arpit Gupta Date: Fri, 22 May 2026 05:53:31 +0530 Subject: [PATCH 1/2] feat(api): split chat-workflow resume into GET /api/chat/[chatId]/stream POST /api/chat/workflow is now start-only: a genuinely live run returns 409 instead of silently resuming and dropping the incoming messages. A terminal stale active_stream_id still self-heals so the next POST can start fresh. Resume moves to a dedicated GET /api/chat/[chatId]/stream that returns the live UIMessage stream or 204 when there is nothing to resume. Co-Authored-By: Claude Opus 4.7 (1M context) --- app/api/chat/[chatId]/stream/route.ts | 41 +++++++ .../__tests__/handleChatStreamResume.test.ts | 80 ++++++++++++++ .../handleChatWorkflowStream.test.ts | 41 ++++--- .../__tests__/maybeResumeChatStream.test.ts | 46 -------- .../validateGetChatStreamRequest.test.ts | 100 ++++++++++++++++++ lib/chat/handleChatStreamResume.ts | 49 +++++++++ lib/chat/handleChatWorkflowStream.ts | 24 +++-- lib/chat/maybeResumeChatStream.ts | 40 ------- lib/chat/validateGetChatStreamRequest.ts | 46 ++++++++ 9 files changed, 360 insertions(+), 107 deletions(-) create mode 100644 app/api/chat/[chatId]/stream/route.ts create mode 100644 lib/chat/__tests__/handleChatStreamResume.test.ts delete mode 100644 lib/chat/__tests__/maybeResumeChatStream.test.ts create mode 100644 lib/chat/__tests__/validateGetChatStreamRequest.test.ts create mode 100644 lib/chat/handleChatStreamResume.ts delete mode 100644 lib/chat/maybeResumeChatStream.ts create mode 100644 lib/chat/validateGetChatStreamRequest.ts diff --git a/app/api/chat/[chatId]/stream/route.ts b/app/api/chat/[chatId]/stream/route.ts new file mode 100644 index 000000000..9754cfc97 --- /dev/null +++ b/app/api/chat/[chatId]/stream/route.ts @@ -0,0 +1,41 @@ +import type { NextRequest } from "next/server"; +import { NextResponse } from "next/server"; +import { getCorsHeaders } from "@/lib/networking/getCorsHeaders"; +import { handleChatStreamResume } from "@/lib/chat/handleChatStreamResume"; + +export const maxDuration = 800; + +/** + * OPTIONS handler for CORS preflight requests. + * + * @returns A NextResponse with CORS headers. + */ +export async function OPTIONS() { + return new NextResponse(null, { + status: 200, + headers: getCorsHeaders(), + }); +} + +/** + * GET /api/chat/[chatId]/stream + * + * Resume/reconnect endpoint for the durable chat-workflow agent loop. Returns + * the live UIMessage stream when a run is still in flight, or 204 when there + * is nothing to resume. This is the only resume path — POST /api/chat/workflow + * never resumes. + * + * Authentication: x-api-key or Authorization bearer (caller must own the chat). + * + * @param request - The incoming request. + * @param context - Next.js route context. + * @param context.params - Async route params containing the chat id. + * @returns A streaming Response (200), an empty 204, or an error response. + */ +export async function GET( + request: NextRequest, + context: { params: Promise<{ chatId: string }> }, +): Promise { + const { chatId } = await context.params; + return handleChatStreamResume(request, chatId); +} diff --git a/lib/chat/__tests__/handleChatStreamResume.test.ts b/lib/chat/__tests__/handleChatStreamResume.test.ts new file mode 100644 index 000000000..1b99f1a96 --- /dev/null +++ b/lib/chat/__tests__/handleChatStreamResume.test.ts @@ -0,0 +1,80 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { NextRequest, NextResponse } from "next/server"; + +import { handleChatStreamResume } from "@/lib/chat/handleChatStreamResume"; +import { validateGetChatStreamRequest } from "@/lib/chat/validateGetChatStreamRequest"; +import { reconcileExistingActiveStream } from "@/lib/chat/reconcileExistingActiveStream"; + +vi.mock("@/lib/chat/validateGetChatStreamRequest", () => ({ + validateGetChatStreamRequest: vi.fn(), +})); +vi.mock("@/lib/chat/reconcileExistingActiveStream", () => ({ + reconcileExistingActiveStream: vi.fn(), +})); +vi.mock("@/lib/networking/getCorsHeaders", () => ({ + getCorsHeaders: vi.fn(() => ({ "Access-Control-Allow-Origin": "*" })), +})); + +const ACCOUNT_ID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"; +const CHAT_ID = "11111111-1111-1111-1111-111111111111"; + +function makeRequest(): NextRequest { + return new NextRequest(`http://localhost/api/chat/${CHAT_ID}/stream`, { + method: "GET", + headers: { "x-api-key": "test-key" }, + }); +} + +function mockValidatedChat(activeStreamId: string | null) { + vi.mocked(validateGetChatStreamRequest).mockResolvedValue({ + chat: { id: CHAT_ID, active_stream_id: activeStreamId } as never, + accountId: ACCOUNT_ID, + }); +} + +beforeEach(() => vi.clearAllMocks()); + +describe("handleChatStreamResume", () => { + it("passes through the validator's error response", async () => { + vi.mocked(validateGetChatStreamRequest).mockResolvedValue( + NextResponse.json({ status: "error", error: "Forbidden" }, { status: 403 }), + ); + const res = await handleChatStreamResume(makeRequest(), CHAT_ID); + expect(res.status).toBe(403); + expect(reconcileExistingActiveStream).not.toHaveBeenCalled(); + }); + + it("returns 204 when the chat has no active stream id", async () => { + mockValidatedChat(null); + const res = await handleChatStreamResume(makeRequest(), CHAT_ID); + expect(res.status).toBe(204); + expect(reconcileExistingActiveStream).not.toHaveBeenCalled(); + }); + + it("returns 200 with the live stream and x-workflow-run-id when resumable", async () => { + mockValidatedChat("wrun_live"); + vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ + action: "resume", + runId: "wrun_live", + stream: new ReadableStream(), + }); + const res = await handleChatStreamResume(makeRequest(), CHAT_ID); + expect(res.status).toBe(200); + expect(res.headers.get("content-type") ?? "").toMatch(/text\/event-stream/); + expect(res.headers.get("x-workflow-run-id")).toBe("wrun_live"); + }); + + it("returns 204 when the run is terminal (reconcile=ready, stale id cleared)", async () => { + mockValidatedChat("wrun_done"); + vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ action: "ready" }); + const res = await handleChatStreamResume(makeRequest(), CHAT_ID); + expect(res.status).toBe(204); + }); + + it("returns 204 when the probe is indeterminate (reconcile=conflict)", async () => { + mockValidatedChat("wrun_unknown"); + vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ action: "conflict" }); + const res = await handleChatStreamResume(makeRequest(), CHAT_ID); + expect(res.status).toBe(204); + }); +}); diff --git a/lib/chat/__tests__/handleChatWorkflowStream.test.ts b/lib/chat/__tests__/handleChatWorkflowStream.test.ts index 702edb918..a4ae78656 100644 --- a/lib/chat/__tests__/handleChatWorkflowStream.test.ts +++ b/lib/chat/__tests__/handleChatWorkflowStream.test.ts @@ -8,7 +8,7 @@ import { selectChats } from "@/lib/supabase/chats/selectChats"; import { isSandboxActive } from "@/lib/sandbox/isSandboxActive"; import { updateSession } from "@/lib/supabase/sessions/updateSession"; import { compareAndSetChatActiveStreamId } from "@/lib/chat/compareAndSetChatActiveStreamId"; -import { maybeResumeChatStream } from "@/lib/chat/maybeResumeChatStream"; +import { reconcileExistingActiveStream } from "@/lib/chat/reconcileExistingActiveStream"; import { persistLatestUserMessage } from "@/lib/chat/persistLatestUserMessage"; import { start, getRun } from "workflow/api"; @@ -23,8 +23,8 @@ vi.mock("@/lib/supabase/sessions/updateSession", () => ({ updateSession: vi.fn() vi.mock("@/lib/sandbox/buildActiveLifecycleUpdate", () => ({ buildActiveLifecycleUpdate: vi.fn(() => ({})), })); -vi.mock("@/lib/chat/maybeResumeChatStream", () => ({ - maybeResumeChatStream: vi.fn(), +vi.mock("@/lib/chat/reconcileExistingActiveStream", () => ({ + reconcileExistingActiveStream: vi.fn(), })); vi.mock("@/lib/chat/persistLatestUserMessage", () => ({ persistLatestUserMessage: vi.fn(), @@ -109,8 +109,9 @@ function mockStartedRun(runId = "wrun_test_run_1") { beforeEach(() => { vi.clearAllMocks(); - // Default: maybeResumeChatStream returns null (no resume / no active stream) - vi.mocked(maybeResumeChatStream).mockResolvedValue(null); + // Default: a stale/terminal active stream that self-heals to "ready" so the + // POST proceeds. Only invoked when the chat has an active_stream_id. + vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ action: "ready" }); }); describe("handleChatWorkflowStream", () => { @@ -166,31 +167,41 @@ describe("handleChatWorkflowStream", () => { }); }); - describe("resume / conflict via maybeResumeChatStream", () => { + describe("active stream handling (start-only; never resumes)", () => { beforeEach(() => { mockValidated(); mockSessionOwnedActive(); mockChatOwned({ active_stream_id: "wrun_existing" }); }); - it("returns the resume response when maybeResumeChatStream yields one", async () => { - const resumeResponse = new Response("ok", { - status: 200, - headers: { "x-workflow-run-id": "wrun_existing" }, + it("returns 409 without starting when a run is live (reconcile=resume)", async () => { + vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ + action: "resume", + runId: "wrun_existing", + stream: new ReadableStream(), }); - vi.mocked(maybeResumeChatStream).mockResolvedValue(resumeResponse); const res = await handleChatWorkflowStream(makeRequest()); - expect(res.headers.get("x-workflow-run-id")).toBe("wrun_existing"); + expect(res.status).toBe(409); expect(start).not.toHaveBeenCalled(); }); - it("returns the conflict response when maybeResumeChatStream yields 409", async () => { - const conflict = NextResponse.json({ status: "error", error: "conflict" }, { status: 409 }); - vi.mocked(maybeResumeChatStream).mockResolvedValue(conflict); + it("returns 409 without starting when reconcile is indeterminate (conflict)", async () => { + vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ action: "conflict" }); const res = await handleChatWorkflowStream(makeRequest()); expect(res.status).toBe(409); expect(start).not.toHaveBeenCalled(); }); + + it("starts a fresh run when a terminal stale id self-heals (reconcile=ready)", async () => { + vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ action: "ready" }); + vi.mocked(compareAndSetChatActiveStreamId) + .mockResolvedValueOnce({ ok: true, claimed: true }) + .mockResolvedValueOnce({ ok: true, claimed: true }); + mockStartedRun(); + const res = await handleChatWorkflowStream(makeRequest()); + expect(res.status).toBe(200); + expect(start).toHaveBeenCalled(); + }); }); describe("placeholder CAS before start", () => { diff --git a/lib/chat/__tests__/maybeResumeChatStream.test.ts b/lib/chat/__tests__/maybeResumeChatStream.test.ts deleted file mode 100644 index 999c29d24..000000000 --- a/lib/chat/__tests__/maybeResumeChatStream.test.ts +++ /dev/null @@ -1,46 +0,0 @@ -import { describe, it, expect, vi, beforeEach } from "vitest"; -import { maybeResumeChatStream } from "@/lib/chat/maybeResumeChatStream"; -import { reconcileExistingActiveStream } from "@/lib/chat/reconcileExistingActiveStream"; - -vi.mock("@/lib/chat/reconcileExistingActiveStream", () => ({ - reconcileExistingActiveStream: vi.fn(), -})); -vi.mock("@/lib/networking/getCorsHeaders", () => ({ - getCorsHeaders: vi.fn(() => ({ "Access-Control-Allow-Origin": "*" })), -})); - -beforeEach(() => vi.clearAllMocks()); - -describe("maybeResumeChatStream", () => { - it("returns null when there is no active_stream_id", async () => { - const res = await maybeResumeChatStream("chat-1", null); - expect(res).toBeNull(); - expect(reconcileExistingActiveStream).not.toHaveBeenCalled(); - }); - - it("returns null when reconcile says action=ready", async () => { - vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ action: "ready" }); - const res = await maybeResumeChatStream("chat-1", "wrun_dead"); - expect(res).toBeNull(); - }); - - it("returns a 200 SSE response with x-workflow-run-id on resume", async () => { - const stream = new ReadableStream(); - vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ - action: "resume", - runId: "wrun_live", - stream, - }); - const res = await maybeResumeChatStream("chat-1", "wrun_live"); - expect(res).not.toBeNull(); - expect(res!.status).toBe(200); - expect(res!.headers.get("x-workflow-run-id")).toBe("wrun_live"); - expect(res!.headers.get("content-type") ?? "").toMatch(/text\/event-stream/); - }); - - it("returns a 409 on conflict", async () => { - vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ action: "conflict" }); - const res = await maybeResumeChatStream("chat-1", "wrun_x"); - expect(res!.status).toBe(409); - }); -}); diff --git a/lib/chat/__tests__/validateGetChatStreamRequest.test.ts b/lib/chat/__tests__/validateGetChatStreamRequest.test.ts new file mode 100644 index 000000000..7b915d9c7 --- /dev/null +++ b/lib/chat/__tests__/validateGetChatStreamRequest.test.ts @@ -0,0 +1,100 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { NextRequest, NextResponse } from "next/server"; + +import { validateGetChatStreamRequest } from "@/lib/chat/validateGetChatStreamRequest"; +import { validateAuthContext } from "@/lib/auth/validateAuthContext"; +import { selectChats } from "@/lib/supabase/chats/selectChats"; +import { selectSessions } from "@/lib/supabase/sessions/selectSessions"; + +vi.mock("@/lib/auth/validateAuthContext", () => ({ validateAuthContext: vi.fn() })); +vi.mock("@/lib/supabase/chats/selectChats", () => ({ selectChats: vi.fn() })); +vi.mock("@/lib/supabase/sessions/selectSessions", () => ({ selectSessions: vi.fn() })); +vi.mock("@/lib/networking/getCorsHeaders", () => ({ + getCorsHeaders: vi.fn(() => ({ "Access-Control-Allow-Origin": "*" })), +})); + +const ACCOUNT_ID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"; +const OTHER_ACCOUNT_ID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"; +const SESSION_ID = "22222222-2222-2222-2222-222222222222"; +const CHAT_ID = "11111111-1111-1111-1111-111111111111"; + +function makeRequest(): NextRequest { + return new NextRequest(`http://localhost/api/chat/${CHAT_ID}/stream`, { + method: "GET", + headers: { "x-api-key": "test-key" }, + }); +} + +function mockAuthed() { + vi.mocked(validateAuthContext).mockResolvedValue({ + accountId: ACCOUNT_ID, + orgId: null, + authToken: "test-key", + }); +} + +beforeEach(() => vi.clearAllMocks()); + +describe("validateGetChatStreamRequest", () => { + it("returns 400 when chatId is empty", async () => { + const res = await validateGetChatStreamRequest(makeRequest(), ""); + expect(res).toBeInstanceOf(NextResponse); + expect((res as NextResponse).status).toBe(400); + }); + + it("passes through the auth error response", async () => { + vi.mocked(validateAuthContext).mockResolvedValue( + NextResponse.json({ status: "error", error: "Unauthorized" }, { status: 401 }), + ); + const res = await validateGetChatStreamRequest(makeRequest(), CHAT_ID); + expect((res as NextResponse).status).toBe(401); + }); + + it("returns 404 when the chat does not exist", async () => { + mockAuthed(); + vi.mocked(selectChats).mockResolvedValue([]); + const res = await validateGetChatStreamRequest(makeRequest(), CHAT_ID); + expect((res as NextResponse).status).toBe(404); + }); + + it("returns 500 when the session lookup errors", async () => { + mockAuthed(); + vi.mocked(selectChats).mockResolvedValue([{ id: CHAT_ID, session_id: SESSION_ID } as never]); + vi.mocked(selectSessions).mockResolvedValue(null); + const res = await validateGetChatStreamRequest(makeRequest(), CHAT_ID); + expect((res as NextResponse).status).toBe(500); + }); + + it("returns 404 when the session does not exist", async () => { + mockAuthed(); + vi.mocked(selectChats).mockResolvedValue([{ id: CHAT_ID, session_id: SESSION_ID } as never]); + vi.mocked(selectSessions).mockResolvedValue([]); + const res = await validateGetChatStreamRequest(makeRequest(), CHAT_ID); + expect((res as NextResponse).status).toBe(404); + }); + + it("returns 403 when the caller does not own the session", async () => { + mockAuthed(); + vi.mocked(selectChats).mockResolvedValue([{ id: CHAT_ID, session_id: SESSION_ID } as never]); + vi.mocked(selectSessions).mockResolvedValue([ + { id: SESSION_ID, account_id: OTHER_ACCOUNT_ID } as never, + ]); + const res = await validateGetChatStreamRequest(makeRequest(), CHAT_ID); + expect((res as NextResponse).status).toBe(403); + }); + + it("returns the chat row when the caller owns it", async () => { + mockAuthed(); + vi.mocked(selectChats).mockResolvedValue([ + { id: CHAT_ID, session_id: SESSION_ID, active_stream_id: "wrun_x" } as never, + ]); + vi.mocked(selectSessions).mockResolvedValue([ + { id: SESSION_ID, account_id: ACCOUNT_ID } as never, + ]); + const res = await validateGetChatStreamRequest(makeRequest(), CHAT_ID); + expect(res).not.toBeInstanceOf(NextResponse); + if (res instanceof NextResponse) return; + expect(res.chat.id).toBe(CHAT_ID); + expect(res.accountId).toBe(ACCOUNT_ID); + }); +}); diff --git a/lib/chat/handleChatStreamResume.ts b/lib/chat/handleChatStreamResume.ts new file mode 100644 index 000000000..b5028559c --- /dev/null +++ b/lib/chat/handleChatStreamResume.ts @@ -0,0 +1,49 @@ +import type { NextRequest } from "next/server"; +import { NextResponse } from "next/server"; +import { createUIMessageStreamResponse, type UIMessageChunk } from "ai"; +import { validateGetChatStreamRequest } from "@/lib/chat/validateGetChatStreamRequest"; +import { reconcileExistingActiveStream } from "@/lib/chat/reconcileExistingActiveStream"; +import { getCorsHeaders } from "@/lib/networking/getCorsHeaders"; + +/** + * Handles GET /api/chat/[chatId]/stream — the resume/reconnect path. + * + * Reattaches the client to an in-flight workflow run for the chat. This is + * the ONLY way to resume; POST /api/chat/workflow never resumes. + * + * - No active stream id on the chat → 204 (nothing to resume). + * - Run still running/pending → 200 with the live UIMessage stream. + * - Run terminally done → stale id is cleared, 204. + * - Indeterminate probe (workflow API error) → 204 WITHOUT clearing the id, + * so a later request can resolve it rather than us falsely declaring the + * run dead. + * + * 204 is the contract the AI SDK's reconnect expects: it maps an empty + * response to "no active stream" and settles the chat to `ready`. + * + * @param request - The incoming GET request (auth headers only). + * @param chatId - Chat identifier from the route params. + * @returns A streaming 200 Response, a 204 Response, or a NextResponse error. + */ +export async function handleChatStreamResume( + request: NextRequest, + chatId: string, +): Promise { + const validated = await validateGetChatStreamRequest(request, chatId); + if (validated instanceof NextResponse) return validated; + + const { chat } = validated; + if (!chat.active_stream_id) { + return new Response(null, { status: 204, headers: getCorsHeaders() }); + } + + const reconciled = await reconcileExistingActiveStream(chatId, chat.active_stream_id); + if (reconciled.action !== "resume") { + return new Response(null, { status: 204, headers: getCorsHeaders() }); + } + + return createUIMessageStreamResponse({ + stream: reconciled.stream as ReadableStream, + headers: { ...getCorsHeaders(), "x-workflow-run-id": reconciled.runId }, + }); +} diff --git a/lib/chat/handleChatWorkflowStream.ts b/lib/chat/handleChatWorkflowStream.ts index 818c70f8c..17d81bc93 100644 --- a/lib/chat/handleChatWorkflowStream.ts +++ b/lib/chat/handleChatWorkflowStream.ts @@ -2,7 +2,7 @@ import { NextRequest, NextResponse } from "next/server"; import { createUIMessageStreamResponse, type UIMessageChunk } from "ai"; import { start, getRun } from "workflow/api"; import { validateChatWorkflow } from "@/lib/chat/validateChatWorkflow"; -import { maybeResumeChatStream } from "@/lib/chat/maybeResumeChatStream"; +import { reconcileExistingActiveStream } from "@/lib/chat/reconcileExistingActiveStream"; import { selectSessions } from "@/lib/supabase/sessions/selectSessions"; import { selectChats } from "@/lib/supabase/chats/selectChats"; import { compareAndSetChatActiveStreamId } from "@/lib/chat/compareAndSetChatActiveStreamId"; @@ -30,8 +30,10 @@ const DEFAULT_MODEL_ID = "anthropic/claude-haiku-4.5"; * * 1. Validate auth + body (validateChatWorkflow). * 2. Verify session + chat ownership; ensure the session has an active sandbox. - * 3. If a workflow is already running for this chat, resume / 409 via - * maybeResumeChatStream (extracted for OCP). + * 3. If a workflow is already running for this chat, reject with 409 — + * resume is GET-only (GET /api/chat/[chatId]/stream), never on POST. + * A terminally-done run's stale id is cleared here so the next POST can + * start fresh. * 4. **Claim `chats.active_stream_id` BEFORE starting the workflow** using * a `pending-` placeholder CAS. Closes the race window where two * concurrent requests could both call `start()` and bill the model @@ -71,9 +73,19 @@ export async function handleChatWorkflowStream(request: NextRequest): Promise { - if (!activeStreamId) return null; - - const reconciled = await reconcileExistingActiveStream(chatId, activeStreamId); - - if (reconciled.action === "resume") { - return createUIMessageStreamResponse({ - stream: reconciled.stream as ReadableStream, - headers: { ...getCorsHeaders(), "x-workflow-run-id": reconciled.runId }, - }); - } - - if (reconciled.action === "conflict") { - return errorResponse("Another workflow is already running for this chat", 409); - } - - return null; // action: "ready" — caller starts a new workflow. -} diff --git a/lib/chat/validateGetChatStreamRequest.ts b/lib/chat/validateGetChatStreamRequest.ts new file mode 100644 index 000000000..81aebd3ec --- /dev/null +++ b/lib/chat/validateGetChatStreamRequest.ts @@ -0,0 +1,46 @@ +import type { NextRequest } from "next/server"; +import { NextResponse } from "next/server"; +import { validateAuthContext } from "@/lib/auth/validateAuthContext"; +import { selectChats } from "@/lib/supabase/chats/selectChats"; +import { selectSessions } from "@/lib/supabase/sessions/selectSessions"; +import { errorResponse } from "@/lib/networking/errorResponse"; + +export type GetChatStreamRequest = { + chat: Awaited>[number]; + accountId: string; +}; + +/** + * Validates a GET /api/chat/[chatId]/stream request: authenticates the caller + * (x-api-key or Authorization bearer) and verifies they own the chat via its + * parent session. Returns a NextResponse error short-circuit (401/403/404/500) + * or the resolved chat row. + * + * Resume reattaches to a durable workflow run, so no sandbox-active check is + * needed here — the run lives in workflow infra, not the sandbox. + * + * @param request - The incoming GET request (auth headers only; no body). + * @param chatId - Chat identifier from the route params. + * @returns A NextResponse error or the validated, owned chat row. + */ +export async function validateGetChatStreamRequest( + request: NextRequest, + chatId: string, +): Promise { + if (!chatId) return errorResponse("chatId is required", 400); + + const auth = await validateAuthContext(request); + if (auth instanceof NextResponse) return auth; + + const chats = await selectChats({ id: chatId }); + const chat = chats[0]; + if (!chat) return errorResponse("Chat not found", 404); + + const sessions = await selectSessions({ id: chat.session_id }); + if (sessions === null) return errorResponse("Internal server error", 500); + const session = sessions[0]; + if (!session) return errorResponse("Chat not found", 404); + if (session.account_id !== auth.accountId) return errorResponse("Forbidden", 403); + + return { chat, accountId: auth.accountId }; +} From 1eea16d6c81d8963396b784c051aa2e34d274226 Mon Sep 17 00:00:00 2001 From: Arpit Gupta Date: Sat, 23 May 2026 05:02:15 +0530 Subject: [PATCH 2/2] =?UTF-8?q?fix(chat):=20address=20review=20=E2=80=94?= =?UTF-8?q?=20retryable=20503=20on=20indeterminate=20stream,=20UUID=20+=20?= =?UTF-8?q?DB-error=20handling?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - POST /api/chat/workflow: distinguish a live run (409, reconnect via GET) from an indeterminate reconcile (503, retry POST) instead of 409 for both — avoids a dead-end where GET returns 204 and the message is lost. - selectChats now returns null on DB error (matching selectSessions) instead of swallowing into []; all callers handle null (validators/handlers → 500, hasActiveStreamForSession defers hibernation, getChatSummaries surfaces). - validateGetChatStreamRequest validates chatId as a UUID (400) and maps a selectChats DB error to 500 rather than masking it as 404. - Extract noResumeResponse() helper to de-dupe the 204 branch. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../handleChatWorkflowStream.test.ts | 4 +-- .../validateGetChatStreamRequest.test.ts | 16 ++++++++-- lib/chat/handleChatStreamResume.ts | 12 ++++---- lib/chat/handleChatWorkflowStream.ts | 29 +++++++++++++------ lib/chat/validateGetChatStreamRequest.ts | 13 +++++++-- lib/sandbox/hasActiveStreamForSession.ts | 3 ++ .../chats/createSessionChatHandler.ts | 9 +++++- lib/sessions/chats/getChatSummaries.ts | 4 +++ lib/supabase/chats/selectChats.ts | 12 +++++--- 9 files changed, 75 insertions(+), 27 deletions(-) diff --git a/lib/chat/__tests__/handleChatWorkflowStream.test.ts b/lib/chat/__tests__/handleChatWorkflowStream.test.ts index a4ae78656..57c6c8a18 100644 --- a/lib/chat/__tests__/handleChatWorkflowStream.test.ts +++ b/lib/chat/__tests__/handleChatWorkflowStream.test.ts @@ -185,10 +185,10 @@ describe("handleChatWorkflowStream", () => { expect(start).not.toHaveBeenCalled(); }); - it("returns 409 without starting when reconcile is indeterminate (conflict)", async () => { + it("returns 503 (retryable) without starting when reconcile is indeterminate (conflict)", async () => { vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ action: "conflict" }); const res = await handleChatWorkflowStream(makeRequest()); - expect(res.status).toBe(409); + expect(res.status).toBe(503); expect(start).not.toHaveBeenCalled(); }); diff --git a/lib/chat/__tests__/validateGetChatStreamRequest.test.ts b/lib/chat/__tests__/validateGetChatStreamRequest.test.ts index 7b915d9c7..65fb5fae3 100644 --- a/lib/chat/__tests__/validateGetChatStreamRequest.test.ts +++ b/lib/chat/__tests__/validateGetChatStreamRequest.test.ts @@ -15,8 +15,8 @@ vi.mock("@/lib/networking/getCorsHeaders", () => ({ const ACCOUNT_ID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"; const OTHER_ACCOUNT_ID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"; -const SESSION_ID = "22222222-2222-2222-2222-222222222222"; -const CHAT_ID = "11111111-1111-1111-1111-111111111111"; +const SESSION_ID = "22222222-2222-4222-8222-222222222222"; +const CHAT_ID = "11111111-1111-4111-8111-111111111111"; function makeRequest(): NextRequest { return new NextRequest(`http://localhost/api/chat/${CHAT_ID}/stream`, { @@ -42,6 +42,11 @@ describe("validateGetChatStreamRequest", () => { expect((res as NextResponse).status).toBe(400); }); + it("returns 400 when chatId is not a valid UUID", async () => { + const res = await validateGetChatStreamRequest(makeRequest(), "not-a-uuid"); + expect((res as NextResponse).status).toBe(400); + }); + it("passes through the auth error response", async () => { vi.mocked(validateAuthContext).mockResolvedValue( NextResponse.json({ status: "error", error: "Unauthorized" }, { status: 401 }), @@ -57,6 +62,13 @@ describe("validateGetChatStreamRequest", () => { expect((res as NextResponse).status).toBe(404); }); + it("returns 500 when the chat lookup errors (selectChats null)", async () => { + mockAuthed(); + vi.mocked(selectChats).mockResolvedValue(null); + const res = await validateGetChatStreamRequest(makeRequest(), CHAT_ID); + expect((res as NextResponse).status).toBe(500); + }); + it("returns 500 when the session lookup errors", async () => { mockAuthed(); vi.mocked(selectChats).mockResolvedValue([{ id: CHAT_ID, session_id: SESSION_ID } as never]); diff --git a/lib/chat/handleChatStreamResume.ts b/lib/chat/handleChatStreamResume.ts index b5028559c..f52e52978 100644 --- a/lib/chat/handleChatStreamResume.ts +++ b/lib/chat/handleChatStreamResume.ts @@ -25,6 +25,10 @@ import { getCorsHeaders } from "@/lib/networking/getCorsHeaders"; * @param chatId - Chat identifier from the route params. * @returns A streaming 200 Response, a 204 Response, or a NextResponse error. */ +/** 204 response signalling "no active stream to resume". */ +const noResumeResponse = (): Response => + new Response(null, { status: 204, headers: getCorsHeaders() }); + export async function handleChatStreamResume( request: NextRequest, chatId: string, @@ -33,14 +37,10 @@ export async function handleChatStreamResume( if (validated instanceof NextResponse) return validated; const { chat } = validated; - if (!chat.active_stream_id) { - return new Response(null, { status: 204, headers: getCorsHeaders() }); - } + if (!chat.active_stream_id) return noResumeResponse(); const reconciled = await reconcileExistingActiveStream(chatId, chat.active_stream_id); - if (reconciled.action !== "resume") { - return new Response(null, { status: 204, headers: getCorsHeaders() }); - } + if (reconciled.action !== "resume") return noResumeResponse(); return createUIMessageStreamResponse({ stream: reconciled.stream as ReadableStream, diff --git a/lib/chat/handleChatWorkflowStream.ts b/lib/chat/handleChatWorkflowStream.ts index 17d81bc93..6fd339efd 100644 --- a/lib/chat/handleChatWorkflowStream.ts +++ b/lib/chat/handleChatWorkflowStream.ts @@ -30,10 +30,10 @@ const DEFAULT_MODEL_ID = "anthropic/claude-haiku-4.5"; * * 1. Validate auth + body (validateChatWorkflow). * 2. Verify session + chat ownership; ensure the session has an active sandbox. - * 3. If a workflow is already running for this chat, reject with 409 — - * resume is GET-only (GET /api/chat/[chatId]/stream), never on POST. - * A terminally-done run's stale id is cleared here so the next POST can - * start fresh. + * 3. Gate on any existing run — resume is GET-only (GET /api/chat/[chatId]/stream), + * never on POST. A genuinely live run → 409 (reconnect via GET); an + * indeterminate probe → 503 (retry the POST shortly); a terminally-done + * run's stale id is cleared here so the next POST can start fresh. * 4. **Claim `chats.active_stream_id` BEFORE starting the workflow** using * a `pending-` placeholder CAS. Closes the race window where two * concurrent requests could both call `start()` and bill the model @@ -68,23 +68,34 @@ export async function handleChatWorkflowStream(request: NextRequest): Promise>[number]; + chat: NonNullable>>[number]; accountId: string; }; @@ -27,12 +30,16 @@ export async function validateGetChatStreamRequest( request: NextRequest, chatId: string, ): Promise { - if (!chatId) return errorResponse("chatId is required", 400); + const parsedChatId = chatIdSchema.safeParse(chatId); + if (!parsedChatId.success) { + return errorResponse(parsedChatId.error.issues[0]?.message ?? "Invalid chatId", 400); + } const auth = await validateAuthContext(request); if (auth instanceof NextResponse) return auth; - const chats = await selectChats({ id: chatId }); + const chats = await selectChats({ id: parsedChatId.data }); + if (chats === null) return errorResponse("Internal server error", 500); const chat = chats[0]; if (!chat) return errorResponse("Chat not found", 404); diff --git a/lib/sandbox/hasActiveStreamForSession.ts b/lib/sandbox/hasActiveStreamForSession.ts index 3529a5f3d..b8979f43c 100644 --- a/lib/sandbox/hasActiveStreamForSession.ts +++ b/lib/sandbox/hasActiveStreamForSession.ts @@ -11,5 +11,8 @@ import { selectChats } from "@/lib/supabase/chats/selectChats"; */ export async function hasActiveStreamForSession(sessionId: string): Promise { const chats = await selectChats({ sessionId }); + // On DB error we can't confirm the chats are idle; assume a stream may be + // active and defer hibernation rather than risk pausing mid-stream. + if (chats === null) return true; return chats.some(chat => chat.active_stream_id !== null); } diff --git a/lib/sessions/chats/createSessionChatHandler.ts b/lib/sessions/chats/createSessionChatHandler.ts index 868691246..7ef883d22 100644 --- a/lib/sessions/chats/createSessionChatHandler.ts +++ b/lib/sessions/chats/createSessionChatHandler.ts @@ -34,7 +34,14 @@ export async function createSessionChatHandler( const requestedChatId = body.id ?? null; if (requestedChatId) { - const existing = (await selectChats({ id: requestedChatId }))[0] ?? null; + const existingRows = await selectChats({ id: requestedChatId }); + if (existingRows === null) { + return NextResponse.json( + { status: "error", error: "Internal server error" }, + { status: 500, headers: getCorsHeaders() }, + ); + } + const existing = existingRows[0] ?? null; if (existing) { if (existing.session_id !== sessionId) { return NextResponse.json( diff --git a/lib/sessions/chats/getChatSummaries.ts b/lib/sessions/chats/getChatSummaries.ts index bddd1963a..572d6ad02 100644 --- a/lib/sessions/chats/getChatSummaries.ts +++ b/lib/sessions/chats/getChatSummaries.ts @@ -30,6 +30,10 @@ export async function getChatSummaries({ accountId: string; }): Promise { const chats = await selectChats({ sessionId }); + // Surface DB failures rather than masking them as an empty chat list. + if (chats === null) { + throw new Error("[getChatSummaries] failed to load chats"); + } if (chats.length === 0) { return []; } diff --git a/lib/supabase/chats/selectChats.ts b/lib/supabase/chats/selectChats.ts index e36c0454c..6e36bd0b5 100644 --- a/lib/supabase/chats/selectChats.ts +++ b/lib/supabase/chats/selectChats.ts @@ -11,12 +11,16 @@ interface SelectChatsFilter { /** * General-purpose `chats` reader. Pass any combination of filters to * narrow the result set; an unset filter is ignored. Mirrors the - * `selectSessions` pattern. Returns [] on Supabase error after logging. + * `selectSessions` pattern: returns `null` on Supabase error (so callers + * can distinguish a backend failure from a legitimately empty result), + * and `[]` when the query succeeds but matches no rows. * * @param filter - Optional filters narrowing the query. - * @returns Matching chat rows, or [] on error / no match. + * @returns Matching chat rows, `[]` on no match, or `null` on DB error. */ -export async function selectChats(filter: SelectChatsFilter = {}): Promise[]> { +export async function selectChats( + filter: SelectChatsFilter = {}, +): Promise[] | null> { let query = supabase.from("chats").select("*"); if (filter.id) query = query.eq("id", filter.id); if (filter.sessionId) query = query.eq("session_id", filter.sessionId); @@ -24,7 +28,7 @@ export async function selectChats(filter: SelectChatsFilter = {}): Promise