diff --git a/app/lib/workflows/__tests__/generateAssistantMessageId.test.ts b/app/lib/workflows/__tests__/generateAssistantMessageId.test.ts new file mode 100644 index 000000000..c83cebfb1 --- /dev/null +++ b/app/lib/workflows/__tests__/generateAssistantMessageId.test.ts @@ -0,0 +1,20 @@ +import { describe, it, expect, vi } from "vitest"; +import { generateAssistantMessageId } from "@/app/lib/workflows/generateAssistantMessageId"; + +vi.mock("ai", async () => { + const actual = await vi.importActual("ai"); + return { ...actual, generateId: vi.fn(() => "generated-by-mock") }; +}); + +describe("generateAssistantMessageId", () => { + it("returns the value from ai's generateId()", async () => { + const id = await generateAssistantMessageId(); + expect(id).toBe("generated-by-mock"); + }); + + it("returns a string", async () => { + const id = await generateAssistantMessageId(); + expect(typeof id).toBe("string"); + expect(id.length).toBeGreaterThan(0); + }); +}); diff --git a/app/lib/workflows/__tests__/runAgentStep.test.ts b/app/lib/workflows/__tests__/runAgentStep.test.ts index b2e90475b..cfb101877 100644 --- a/app/lib/workflows/__tests__/runAgentStep.test.ts +++ b/app/lib/workflows/__tests__/runAgentStep.test.ts @@ -12,15 +12,28 @@ vi.mock("@ai-sdk/gateway", () => ({ gateway: vi.fn((modelId: string) => ({ modelId, __mock: "gateway" })), })); -function makeStreamResult(opts?: { metadataCalls?: Array }) { +function makeStreamResult(opts?: { + metadataCalls?: Array; + onFinishCalls?: Array; + emittedResponseMessage?: unknown; +}) { const calls = opts?.metadataCalls ?? []; + const onFinishCalls = opts?.onFinishCalls ?? []; return { - toUIMessageStream: vi.fn((streamOpts: { messageMetadata?: unknown }) => { - // Capture the callback so tests can inspect it + toUIMessageStream: vi.fn((streamOpts: { messageMetadata?: unknown; onFinish?: unknown }) => { + // Capture the callbacks so tests can inspect (and invoke) them calls.push(streamOpts.messageMetadata); + onFinishCalls.push(streamOpts.onFinish); return (async function* () { yield { type: "start" }; yield { type: "finish" }; + // Mirror the AI SDK contract: onFinish fires after the + // generator yields its last chunk with the assembled message. + if (typeof streamOpts.onFinish === "function" && opts?.emittedResponseMessage) { + (streamOpts.onFinish as (a: { responseMessage: unknown }) => void)({ + responseMessage: opts.emittedResponseMessage, + }); + } })(); }), finishReason: Promise.resolve("stop"), @@ -49,6 +62,7 @@ const baseInput = { agentContext: { sandbox: { state: { type: "vercel" }, workingDirectory: "/sandbox/mono" }, }, + assistantMessageId: "asst-test-id", }; describe("runAgentStep", () => { @@ -174,4 +188,67 @@ describe("runAgentStep", () => { expect(cb({ part: { type: "text-delta" } })).toBeUndefined(); expect(cb({ part: { type: "start" } })).toBeUndefined(); }); + + it("wires an onFinish callback into toUIMessageStream", async () => { + const onFinishCalls: unknown[] = []; + vi.mocked(streamText).mockReturnValue(makeStreamResult({ onFinishCalls }) as never); + const { stream } = makeWritable(); + + await runAgentStep({ ...baseInput, writable: stream } as never); + + expect(onFinishCalls).toHaveLength(1); + expect(typeof onFinishCalls[0]).toBe("function"); + }); + + it("returns the responseMessage captured from onFinish", async () => { + const emittedResponseMessage = { + id: "assistant-msg-1", + role: "assistant", + parts: [{ type: "text", text: "Hello" }], + }; + vi.mocked(streamText).mockReturnValue(makeStreamResult({ emittedResponseMessage }) as never); + const { stream } = makeWritable(); + + const result = await runAgentStep({ ...baseInput, writable: stream } as never); + + expect(result.responseMessage).toEqual(emittedResponseMessage); + expect(result.finishReason).toBe("stop"); + }); + + it("returns responseMessage: undefined when onFinish never fires", async () => { + // Default makeStreamResult — no emittedResponseMessage, so onFinish is wired but never invoked + vi.mocked(streamText).mockReturnValue(makeStreamResult() as never); + const { stream } = makeWritable(); + + const result = await runAgentStep({ ...baseInput, writable: stream } as never); + + expect(result.responseMessage).toBeUndefined(); + expect(result.finishReason).toBe("stop"); + }); + + it("forwards input.assistantMessageId into toUIMessageStream's generateMessageId", async () => { + const generateMessageIdCalls: unknown[] = []; + const streamResult = makeStreamResult(); + // Spy on the options passed to toUIMessageStream to grab the generateMessageId fn. + const originalToUIMessageStream = streamResult.toUIMessageStream; + streamResult.toUIMessageStream = vi.fn((streamOpts: { generateMessageId?: unknown }) => { + generateMessageIdCalls.push(streamOpts.generateMessageId); + return (originalToUIMessageStream as unknown as (o: unknown) => AsyncGenerator)( + streamOpts, + ); + }) as never; + vi.mocked(streamText).mockReturnValue(streamResult as never); + const { stream } = makeWritable(); + + await runAgentStep({ + ...baseInput, + writable: stream, + assistantMessageId: "asst-from-workflow-xyz", + } as never); + + expect(generateMessageIdCalls).toHaveLength(1); + const gen = generateMessageIdCalls[0] as () => string; + expect(typeof gen).toBe("function"); + expect(gen()).toBe("asst-from-workflow-xyz"); + }); }); diff --git a/app/lib/workflows/__tests__/runAgentWorkflow.test.ts b/app/lib/workflows/__tests__/runAgentWorkflow.test.ts index d061abbce..3e59ffc2d 100644 --- a/app/lib/workflows/__tests__/runAgentWorkflow.test.ts +++ b/app/lib/workflows/__tests__/runAgentWorkflow.test.ts @@ -3,6 +3,8 @@ import { runAgentWorkflow } from "@/app/lib/workflows/runAgentWorkflow"; import { runAgentStep } from "@/app/lib/workflows/runAgentStep"; import { clearChatActiveStream } from "@/lib/chat/clearChatActiveStream"; import { closeChatStream } from "@/app/lib/workflows/closeChatStream"; +import { generateAssistantMessageId } from "@/app/lib/workflows/generateAssistantMessageId"; +import { persistAssistantMessage } from "@/lib/chat/persistAssistantMessage"; vi.mock("@/app/lib/workflows/runAgentStep", () => ({ runAgentStep: vi.fn(), @@ -13,6 +15,12 @@ vi.mock("@/lib/chat/clearChatActiveStream", () => ({ vi.mock("@/app/lib/workflows/closeChatStream", () => ({ closeChatStream: vi.fn(), })); +vi.mock("@/app/lib/workflows/generateAssistantMessageId", () => ({ + generateAssistantMessageId: vi.fn(), +})); +vi.mock("@/lib/chat/persistAssistantMessage", () => ({ + persistAssistantMessage: vi.fn(), +})); // Captured writable stub so tests can assert closeChatStream got the // same instance the workflow body holds. const writableStub = new WritableStream(); @@ -26,7 +34,10 @@ vi.mock("workflow", () => ({ })), })); -beforeEach(() => vi.clearAllMocks()); +beforeEach(() => { + vi.clearAllMocks(); + vi.mocked(generateAssistantMessageId).mockResolvedValue("asst-fresh-id"); +}); const baseInput = { messages: [{ id: "m1", role: "user", parts: [{ type: "text", text: "hi" }] } as never], @@ -40,7 +51,10 @@ const baseInput = { describe("runAgentWorkflow", () => { it("clears active_stream_id after a successful run, using the workflow's own runId", async () => { - vi.mocked(runAgentStep).mockResolvedValue({ finishReason: "stop" }); + vi.mocked(runAgentStep).mockResolvedValue({ + finishReason: "stop", + responseMessage: undefined, + }); await runAgentWorkflow(baseInput); @@ -58,7 +72,10 @@ describe("runAgentWorkflow", () => { }); it("explicitly closes the chat writable after a successful run so SSE ends promptly", async () => { - vi.mocked(runAgentStep).mockResolvedValue({ finishReason: "stop" }); + vi.mocked(runAgentStep).mockResolvedValue({ + finishReason: "stop", + responseMessage: undefined, + }); await runAgentWorkflow(baseInput); @@ -74,4 +91,83 @@ describe("runAgentWorkflow", () => { expect(closeChatStream).toHaveBeenCalledTimes(1); expect(closeChatStream).toHaveBeenCalledWith(writableStub); }); + + it("persists the assistant message when runAgentStep returns one", async () => { + const responseMessage = { + id: "assistant-msg-xyz", + role: "assistant", + parts: [{ type: "text", text: "Hello!" }], + }; + vi.mocked(runAgentStep).mockResolvedValue({ + finishReason: "stop", + responseMessage: responseMessage as never, + }); + + await runAgentWorkflow(baseInput); + + expect(persistAssistantMessage).toHaveBeenCalledTimes(1); + expect(persistAssistantMessage).toHaveBeenCalledWith("chat-1", responseMessage); + }); + + it("does NOT call persistAssistantMessage when runAgentStep returns no responseMessage", async () => { + vi.mocked(runAgentStep).mockResolvedValue({ + finishReason: "stop", + responseMessage: undefined, + }); + + await runAgentWorkflow(baseInput); + + expect(persistAssistantMessage).not.toHaveBeenCalled(); + }); + + it("does NOT call persistAssistantMessage when runAgentStep throws (no message to persist)", async () => { + vi.mocked(runAgentStep).mockRejectedValue(new Error("model exploded")); + + await expect(runAgentWorkflow(baseInput)).rejects.toThrow("model exploded"); + + expect(persistAssistantMessage).not.toHaveBeenCalled(); + // But cleanup steps still run via the try/finally + expect(clearChatActiveStream).toHaveBeenCalledTimes(1); + expect(closeChatStream).toHaveBeenCalledTimes(1); + }); + + it("generates a fresh assistantMessageId via the step and forwards it to runAgentStep", async () => { + vi.mocked(runAgentStep).mockResolvedValue({ + finishReason: "stop", + responseMessage: undefined, + }); + + await runAgentWorkflow(baseInput); + + expect(generateAssistantMessageId).toHaveBeenCalledTimes(1); + expect(runAgentStep).toHaveBeenCalledWith( + expect.objectContaining({ assistantMessageId: "asst-fresh-id" }), + ); + }); + + it("reuses the latest assistant message id when resuming a tool-call turn (no fresh generation)", async () => { + vi.mocked(runAgentStep).mockResolvedValue({ + finishReason: "stop", + responseMessage: undefined, + }); + + const resumingInput = { + ...baseInput, + messages: [ + { id: "m1", role: "user", parts: [{ type: "text", text: "go" }] }, + { + id: "asst-in-progress", + role: "assistant", + parts: [{ type: "text", text: "thinking..." }], + }, + ] as never, + }; + + await runAgentWorkflow(resumingInput); + + expect(generateAssistantMessageId).not.toHaveBeenCalled(); + expect(runAgentStep).toHaveBeenCalledWith( + expect.objectContaining({ assistantMessageId: "asst-in-progress" }), + ); + }); }); diff --git a/app/lib/workflows/generateAssistantMessageId.ts b/app/lib/workflows/generateAssistantMessageId.ts new file mode 100644 index 000000000..d33d0ce65 --- /dev/null +++ b/app/lib/workflows/generateAssistantMessageId.ts @@ -0,0 +1,19 @@ +import { generateId } from "ai"; + +/** + * Vercel Workflow `"use step"` that returns a fresh message id. + * + * Wrapped as a step rather than inlined into the workflow body + * because `generateId()` is non-deterministic — calling it directly + * from workflow code would break the durable-replay contract (a + * replay would generate a *different* id and diverge from the + * captured event log). As a step, the result is captured in the + * event log once and reused on every replay. + * + * Mirrors open-agents' `generateId` step in + * `apps/web/app/workflows/chat.ts`. + */ +export async function generateAssistantMessageId(): Promise { + "use step"; + return generateId(); +} diff --git a/app/lib/workflows/runAgentStep.ts b/app/lib/workflows/runAgentStep.ts index 7ed847d5d..0520bb20a 100644 --- a/app/lib/workflows/runAgentStep.ts +++ b/app/lib/workflows/runAgentStep.ts @@ -22,6 +22,35 @@ export type RunAgentStepInput = { * is added to `experimental_context` right before each model call. */ agentContext: DurableAgentContext; + /** + * Stable id to assign to the assistant message produced by this + * step. Generated once in `runAgentWorkflow` so: + * + * - Every chunk in this step's `toUIMessageStream` carries the + * same id (the AI SDK threads it through). + * - Future multi-step iterations of the agent loop reuse the + * same id so a single conversational reply is one row in + * `chat_messages` rather than fragmenting per tool-call cycle. + * - Resume after tool-call interaction reattaches to the in- + * progress assistant message rather than spawning a new one. + * + * Mirrors open-agents' `runAgentStep(messages, originalMessages, + * messageId, ...)` signature in + * `apps/web/app/workflows/chat.ts`. + */ + assistantMessageId: string; +}; + +export type RunAgentStepResult = { + finishReason: string; + /** + * The assembled assistant message captured from + * `toUIMessageStream`'s `onFinish` callback. `undefined` if the + * stream finished without emitting one (e.g. an error path that + * short-circuits before any chunks land). Used by `runAgentWorkflow` + * to persist the final message to `chat_messages`. + */ + responseMessage: UIMessage | undefined; }; /** @@ -38,9 +67,11 @@ export type RunAgentStepInput = { * of the tool surface ports in a follow-up PR. * * @param input - Messages + selected model + writable stream + agent context. - * @returns finishReason from the model run. + * @returns `finishReason` from the model run plus the assembled + * `responseMessage` (when one was emitted) so the caller can + * persist it. */ -export async function runAgentStep(input: RunAgentStepInput): Promise<{ finishReason: string }> { +export async function runAgentStep(input: RunAgentStepInput): Promise { "use step"; console.log("[runAgentStep] start", { @@ -91,6 +122,12 @@ export async function runAgentStep(input: RunAgentStepInput): Promise<{ finishRe }), }); + // Capture the assembled assistant message via `onFinish` so the + // caller can persist it. Mirrors open-agents' `runAgentStep` which + // stashes `finishedResponseMessage` into a closure-scoped variable + // for the outer workflow body to forward into `persistAssistantMessage`. + let responseMessage: UIMessage | undefined; + // Acquire the writer once and release in `finally` so a thrown chunk // doesn't leak the lock. const writer = input.writable.getWriter(); @@ -100,7 +137,13 @@ export async function runAgentStep(input: RunAgentStepInput): Promise<{ finishRe // shape so sandbox.recoupable.com sees the same metadata when cut // over to api's /api/chat/workflow. const messageMetadata = buildMessageMetadataCallback({ modelId: input.modelId }); - for await (const part of result.toUIMessageStream({ messageMetadata })) { + for await (const part of result.toUIMessageStream({ + messageMetadata, + generateMessageId: () => input.assistantMessageId, + onFinish: ({ responseMessage: finishedResponseMessage }) => { + responseMessage = finishedResponseMessage; + }, + })) { await writer.write(part); } } finally { @@ -108,6 +151,6 @@ export async function runAgentStep(input: RunAgentStepInput): Promise<{ finishRe } const finishReason = await result.finishReason; - console.log("[runAgentStep] finish", { finishReason }); - return { finishReason }; + console.log("[runAgentStep] finish", { finishReason, hasResponseMessage: !!responseMessage }); + return { finishReason, responseMessage }; } diff --git a/app/lib/workflows/runAgentWorkflow.ts b/app/lib/workflows/runAgentWorkflow.ts index c9e5fdcd9..e4e628e96 100644 --- a/app/lib/workflows/runAgentWorkflow.ts +++ b/app/lib/workflows/runAgentWorkflow.ts @@ -1,8 +1,10 @@ import { getWorkflowMetadata, getWritable } from "workflow"; import type { UIMessage, UIMessageChunk } from "ai"; import { closeChatStream } from "@/app/lib/workflows/closeChatStream"; +import { generateAssistantMessageId } from "@/app/lib/workflows/generateAssistantMessageId"; import { runAgentStep } from "@/app/lib/workflows/runAgentStep"; import { clearChatActiveStream } from "@/lib/chat/clearChatActiveStream"; +import { persistAssistantMessage } from "@/lib/chat/persistAssistantMessage"; import type { DurableAgentContext } from "@/lib/agent/tools/AgentContext"; export type RunAgentWorkflowInput = { @@ -48,14 +50,39 @@ export async function runAgentWorkflow(input: RunAgentWorkflowInput): Promise(); + // Pick or generate a stable id for the assistant message. If the + // last message in the conversation is already an assistant message + // (we're resuming an in-progress turn after a tool-call interaction) + // reuse its id so chunks append to the same `chat_messages` row. + // Otherwise generate a fresh id once via a `"use step"` so the + // value is durable across workflow replays. Mirrors open-agents' + // pattern in `apps/web/app/workflows/chat.ts` where the id is + // generated in the workflow body and threaded into every + // `runAgentStep` call. + const latestMessage = input.messages.at(-1); + const assistantMessageId = + latestMessage?.role === "assistant" ? latestMessage.id : await generateAssistantMessageId(); + try { const result = await runAgentStep({ messages: input.messages, modelId: input.modelId, writable, agentContext: input.agentContext, + assistantMessageId, }); console.log("[runAgentWorkflow] finish", { finishReason: result.finishReason }); + + // Persist the final assistant message to `chat_messages` so a page + // refresh after the stream completes still shows the reply. Without + // this, the recoup-api cutover silently drops assistant responses — + // they stream to the client over SSE but never land in the DB. + // `persistAssistantMessage` is fire-and-forget by contract; it + // swallows its own errors so a transient DB failure here doesn't + // mark the workflow run failed. + if (result.responseMessage) { + await persistAssistantMessage(input.chatId, result.responseMessage); + } } finally { // Run two cleanup steps in parallel: // 1) `clearChatActiveStream` — CAS-gated DB clear of the chat's diff --git a/lib/chat/__tests__/persistAssistantMessage.test.ts b/lib/chat/__tests__/persistAssistantMessage.test.ts new file mode 100644 index 000000000..fc6def35e --- /dev/null +++ b/lib/chat/__tests__/persistAssistantMessage.test.ts @@ -0,0 +1,140 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { persistAssistantMessage } from "@/lib/chat/persistAssistantMessage"; +import { upsertChatMessage } from "@/lib/supabase/chat_messages/upsertChatMessage"; +import { updateChat } from "@/lib/supabase/chats/updateChat"; + +vi.mock("@/lib/supabase/chat_messages/upsertChatMessage", () => ({ + upsertChatMessage: vi.fn(), +})); +vi.mock("@/lib/supabase/chats/updateChat", () => ({ + updateChat: vi.fn(), +})); + +beforeEach(() => { + vi.clearAllMocks(); + vi.mocked(updateChat).mockResolvedValue({ + ok: true, + rowsUpdated: 1, + row: null, + }); +}); + +const CHAT_ID = "11111111-1111-1111-1111-111111111111"; +const ASSISTANT_ID = "msg_abc"; + +function buildAssistantMessage(overrides: Record = {}) { + return { + id: ASSISTANT_ID, + role: "assistant", + parts: [{ type: "text", text: "Hello!" }], + ...overrides, + }; +} + +describe("persistAssistantMessage", () => { + it("upserts the assistant message row with role 'assistant'", async () => { + vi.mocked(upsertChatMessage).mockResolvedValue({ + ok: true, + row: { id: ASSISTANT_ID } as never, + isDuplicate: false, + }); + + await persistAssistantMessage(CHAT_ID, buildAssistantMessage()); + + expect(upsertChatMessage).toHaveBeenCalledWith( + expect.objectContaining({ + id: ASSISTANT_ID, + chat_id: CHAT_ID, + role: "assistant", + }), + ); + }); + + it("touches updated_at on the chat row on a fresh insert", async () => { + vi.mocked(upsertChatMessage).mockResolvedValue({ + ok: true, + row: { id: ASSISTANT_ID } as never, + isDuplicate: false, + }); + + await persistAssistantMessage(CHAT_ID, buildAssistantMessage()); + + expect(updateChat).toHaveBeenCalledWith( + { id: CHAT_ID }, + expect.objectContaining({ updated_at: expect.any(String) }), + ); + }); + + it("bumps last_assistant_message_at on a fresh insert (drives sidebar unread badge)", async () => { + vi.mocked(upsertChatMessage).mockResolvedValue({ + ok: true, + row: { id: ASSISTANT_ID } as never, + isDuplicate: false, + }); + + await persistAssistantMessage(CHAT_ID, buildAssistantMessage()); + + expect(updateChat).toHaveBeenCalledWith( + { id: CHAT_ID }, + expect.objectContaining({ + last_assistant_message_at: expect.any(String), + }), + ); + }); + + it("uses the same timestamp for updated_at and last_assistant_message_at (matches open-agents)", async () => { + vi.mocked(upsertChatMessage).mockResolvedValue({ + ok: true, + row: { id: ASSISTANT_ID } as never, + isDuplicate: false, + }); + + await persistAssistantMessage(CHAT_ID, buildAssistantMessage()); + + const updateArgs = vi.mocked(updateChat).mock.calls[0]?.[1] as { + updated_at?: string; + last_assistant_message_at?: string; + }; + expect(updateArgs.updated_at).toBeDefined(); + expect(updateArgs.last_assistant_message_at).toBe(updateArgs.updated_at); + }); + + it("does NOT touch updated_at on duplicate (workflow replay)", async () => { + vi.mocked(upsertChatMessage).mockResolvedValue({ + ok: true, + row: null, + isDuplicate: true, + }); + + await persistAssistantMessage(CHAT_ID, buildAssistantMessage()); + + expect(updateChat).not.toHaveBeenCalled(); + }); + + it("silently no-ops when the message role is not 'assistant' (guard against caller mistakes)", async () => { + await persistAssistantMessage(CHAT_ID, buildAssistantMessage({ role: "user" })); + + expect(upsertChatMessage).not.toHaveBeenCalled(); + expect(updateChat).not.toHaveBeenCalled(); + }); + + it("silently no-ops when the upsert reports a DB error (fire-and-forget contract)", async () => { + vi.mocked(upsertChatMessage).mockResolvedValue({ + ok: false, + error: "transient db error", + }); + + await expect( + persistAssistantMessage(CHAT_ID, buildAssistantMessage()), + ).resolves.toBeUndefined(); + expect(updateChat).not.toHaveBeenCalled(); + }); + + it("swallows unexpected exceptions (must not bubble up)", async () => { + vi.mocked(upsertChatMessage).mockRejectedValue(new Error("boom")); + + await expect( + persistAssistantMessage(CHAT_ID, buildAssistantMessage()), + ).resolves.toBeUndefined(); + }); +}); diff --git a/lib/chat/persistAssistantMessage.ts b/lib/chat/persistAssistantMessage.ts new file mode 100644 index 000000000..13f93c27b --- /dev/null +++ b/lib/chat/persistAssistantMessage.ts @@ -0,0 +1,73 @@ +import { upsertChatMessage } from "@/lib/supabase/chat_messages/upsertChatMessage"; +import { updateChat } from "@/lib/supabase/chats/updateChat"; + +/** + * Minimal duck-type shape we read off the assistant message. Both AI + * SDK's `UIMessage` and the in-test fixtures structurally satisfy it. + * Kept intentionally loose because the row we write to + * `chat_messages.parts` is `jsonb` — Supabase persists whatever the + * message looks like. + */ +type AssistantMessage = { + id: string; + role: string; + parts: ReadonlyArray; +}; + +/** + * Fire-and-forget persistence of the final assistant message at the + * end of a chat-workflow run. Mirrors open-agents' + * `persistAssistantMessage` step in + * `apps/web/app/workflows/chat-post-finish.ts` and closes the + * silent-data-loss gap the recoup-api cutover introduced — without + * this call the assistant response is streamed to the client but + * never written to `chat_messages`, so a page refresh after the + * stream completes wipes the message. + * + * Uses `upsertChatMessage(... { onConflict: "id", ignoreDuplicates })` + * so a workflow that's restarted (replay, recovery) doesn't + * double-insert. On a fresh insert we also bump + * `last_assistant_message_at` (drives the sidebar `hasUnread` badge + * in `getChatSummaries` — `lastAssistantMessageAt > lastReadAt`) and + * touch `updated_at` so the sidebar sort surfaces the chat. Matches + * open-agents' `updateChatAssistantActivity` which sets both columns + * to the same timestamp. + * + * Title generation lives in `persistLatestUserMessage` (the first + * user message is canonical for chat titles) — this function + * deliberately does NOT update the chat title. + * + * Errors are caught and logged. Contract is "schedule it and forget" + * — never block the workflow or surface failures to the UI. + * + * @param chatId - Target chat row. + * @param message - The assembled assistant message (typically from + * `toUIMessageStream`'s `onFinish.responseMessage`). + */ +export async function persistAssistantMessage( + chatId: string, + message: AssistantMessage, +): Promise { + "use step"; + try { + if (!message || message.role !== "assistant") return; + + const inserted = await upsertChatMessage({ + id: message.id, + chat_id: chatId, + role: "assistant", + parts: message as never, + }); + + if (!inserted.ok) return; + if (inserted.isDuplicate || inserted.row === null) return; + + const activityAt = new Date().toISOString(); + await updateChat( + { id: chatId }, + { updated_at: activityAt, last_assistant_message_at: activityAt }, + ); + } catch (error) { + console.error("[persistAssistantMessage] error:", error); + } +}