From 1e0ad75c9c2dba30e1d920b34142b053a9f769e1 Mon Sep 17 00:00:00 2001 From: Arpit Gupta Date: Tue, 26 May 2026 04:17:02 +0530 Subject: [PATCH 1/2] feat(chat-workflow): persist assistant message per step Upgrade the success-only persist (#609) to per-step persistence so a stopped or crashed turn keeps the partial reply instead of dropping it. - runAgentStep streams through createUIMessageStream and persists on every onStepFinish/onFinish (toUIMessageStream exposes only onFinish); it still returns the final responseMessage so the credits path (#612) keeps billing from its metadata. - persistAssistantMessage now overwrites the row as it grows (DO UPDATE via the restored upsertChatMessage `update` flag) and bumps last_assistant_message_at/updated_at on every persist, so a partial reply still surfaces as unread. - runAgentWorkflow drops its own persist call (now per-step) and keeps the #612 credit charge. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../workflows/__tests__/runAgentStep.test.ts | 186 ++++++++++++------ .../__tests__/runAgentWorkflow.test.ts | 36 +--- app/lib/workflows/runAgentStep.ts | 79 ++++---- app/lib/workflows/runAgentWorkflow.ts | 26 +-- .../__tests__/persistAssistantMessage.test.ts | 106 ++++------ lib/chat/persistAssistantMessage.ts | 77 +++----- .../__tests__/upsertChatMessage.test.ts | 7 + .../chat_messages/upsertChatMessage.ts | 14 +- 8 files changed, 251 insertions(+), 280 deletions(-) diff --git a/app/lib/workflows/__tests__/runAgentStep.test.ts b/app/lib/workflows/__tests__/runAgentStep.test.ts index cfb101877..d8371afda 100644 --- a/app/lib/workflows/__tests__/runAgentStep.test.ts +++ b/app/lib/workflows/__tests__/runAgentStep.test.ts @@ -1,10 +1,11 @@ import { describe, it, expect, vi, beforeEach } from "vitest"; -import { streamText } from "ai"; +import { streamText, createUIMessageStream } from "ai"; import { runAgentStep } from "@/app/lib/workflows/runAgentStep"; +import { persistAssistantMessage } from "@/lib/chat/persistAssistantMessage"; vi.mock("ai", async () => { const actual = await vi.importActual("ai"); - return { ...actual, streamText: vi.fn() }; + return { ...actual, streamText: vi.fn(), createUIMessageStream: vi.fn() }; }); // Avoid pulling in real gateway / fetch surface. @@ -12,30 +13,38 @@ vi.mock("@ai-sdk/gateway", () => ({ gateway: vi.fn((modelId: string) => ({ modelId, __mock: "gateway" })), })); +vi.mock("@/lib/chat/persistAssistantMessage", () => ({ + persistAssistantMessage: vi.fn(), +})); + +// Captures the options runAgentStep passes to createUIMessageStream so +// tests can drive its onStepFinish / onFinish callbacks directly. +type CreateOpts = { + generateId?: () => string; + onStepFinish?: (e: { responseMessage: unknown }) => unknown; + onFinish?: (e: { responseMessage: unknown }) => unknown; + execute?: (a: { writer: { write: () => void; merge: () => void; onError: undefined } }) => void; +}; +let capturedCreateOpts: CreateOpts; + function makeStreamResult(opts?: { metadataCalls?: Array; - onFinishCalls?: Array; - emittedResponseMessage?: unknown; + generateIdCalls?: Array; }) { const calls = opts?.metadataCalls ?? []; - const onFinishCalls = opts?.onFinishCalls ?? []; + const genCalls = opts?.generateIdCalls ?? []; return { - toUIMessageStream: vi.fn((streamOpts: { messageMetadata?: unknown; onFinish?: unknown }) => { - // Capture the callbacks so tests can inspect (and invoke) them - calls.push(streamOpts.messageMetadata); - onFinishCalls.push(streamOpts.onFinish); - return (async function* () { - yield { type: "start" }; - yield { type: "finish" }; - // Mirror the AI SDK contract: onFinish fires after the - // generator yields its last chunk with the assembled message. - if (typeof streamOpts.onFinish === "function" && opts?.emittedResponseMessage) { - (streamOpts.onFinish as (a: { responseMessage: unknown }) => void)({ - responseMessage: opts.emittedResponseMessage, - }); - } - })(); - }), + toUIMessageStream: vi.fn( + (streamOpts: { messageMetadata?: unknown; generateMessageId?: unknown }) => { + // Capture the callbacks so tests can inspect them. + calls.push(streamOpts.messageMetadata); + genCalls.push(streamOpts.generateMessageId); + return (async function* () { + yield { type: "start" }; + yield { type: "finish" }; + })(); + }, + ), finishReason: Promise.resolve("stop"), }; } @@ -59,6 +68,7 @@ const baseInput = { }, ], modelId: "anthropic/claude-haiku-4.5", + chatId: "chat-1", agentContext: { sandbox: { state: { type: "vercel" }, workingDirectory: "/sandbox/mono" }, }, @@ -68,6 +78,20 @@ const baseInput = { describe("runAgentStep", () => { beforeEach(() => { vi.clearAllMocks(); + // Default: capture the options, run execute (so toUIMessageStream — and + // its messageMetadata callback — is exercised), and return an empty + // stream that closes immediately so pipeTo resolves. + vi.mocked(createUIMessageStream).mockImplementation((opts: never) => { + capturedCreateOpts = opts as CreateOpts; + capturedCreateOpts.execute?.({ + writer: { write: () => {}, merge: () => {}, onError: undefined }, + }); + return new ReadableStream({ + start(controller) { + controller.close(); + }, + }) as never; + }); }); it("wires a messageMetadata callback into toUIMessageStream", async () => { @@ -103,8 +127,7 @@ describe("runAgentStep", () => { }); it("includes cwd from agentContext.sandbox in the system prompt", async () => { - const captured: unknown[] = []; - vi.mocked(streamText).mockReturnValue(makeStreamResult({ metadataCalls: captured }) as never); + vi.mocked(streamText).mockReturnValue(makeStreamResult() as never); const { stream } = makeWritable(); await runAgentStep({ @@ -125,8 +148,7 @@ describe("runAgentStep", () => { }); it("wraps tools with anthropic cacheControl on the last tool before passing to streamText", async () => { - const captured: unknown[] = []; - vi.mocked(streamText).mockReturnValue(makeStreamResult({ metadataCalls: captured }) as never); + vi.mocked(streamText).mockReturnValue(makeStreamResult() as never); const { stream } = makeWritable(); await runAgentStep({ ...baseInput, writable: stream } as never); @@ -148,8 +170,7 @@ describe("runAgentStep", () => { }); it("wires a prepareStep callback that marks the last message with cacheControl", async () => { - const captured: unknown[] = []; - vi.mocked(streamText).mockReturnValue(makeStreamResult({ metadataCalls: captured }) as never); + vi.mocked(streamText).mockReturnValue(makeStreamResult() as never); const { stream } = makeWritable(); await runAgentStep({ ...baseInput, writable: stream } as never); @@ -189,55 +210,49 @@ describe("runAgentStep", () => { expect(cb({ part: { type: "start" } })).toBeUndefined(); }); - it("wires an onFinish callback into toUIMessageStream", async () => { - const onFinishCalls: unknown[] = []; - vi.mocked(streamText).mockReturnValue(makeStreamResult({ onFinishCalls }) as never); + it("persists the assistant message on each step via onStepFinish", async () => { + vi.mocked(streamText).mockReturnValue(makeStreamResult() as never); const { stream } = makeWritable(); await runAgentStep({ ...baseInput, writable: stream } as never); - expect(onFinishCalls).toHaveLength(1); - expect(typeof onFinishCalls[0]).toBe("function"); + const msg = { id: "a1", role: "assistant", parts: [{ type: "text", text: "partial" }] }; + await capturedCreateOpts.onStepFinish?.({ responseMessage: msg }); + + expect(persistAssistantMessage).toHaveBeenCalledWith("chat-1", msg); }); - it("returns the responseMessage captured from onFinish", async () => { - const emittedResponseMessage = { - id: "assistant-msg-1", - role: "assistant", - parts: [{ type: "text", text: "Hello" }], - }; - vi.mocked(streamText).mockReturnValue(makeStreamResult({ emittedResponseMessage }) as never); + it("persists the final assistant message via onFinish", async () => { + vi.mocked(streamText).mockReturnValue(makeStreamResult() as never); const { stream } = makeWritable(); - const result = await runAgentStep({ ...baseInput, writable: stream } as never); + await runAgentStep({ ...baseInput, writable: stream } as never); - expect(result.responseMessage).toEqual(emittedResponseMessage); - expect(result.finishReason).toBe("stop"); + const msg = { id: "a1", role: "assistant", parts: [{ type: "text", text: "done" }] }; + await capturedCreateOpts.onFinish?.({ responseMessage: msg }); + + expect(persistAssistantMessage).toHaveBeenCalledWith("chat-1", msg); }); - it("returns responseMessage: undefined when onFinish never fires", async () => { - // Default makeStreamResult — no emittedResponseMessage, so onFinish is wired but never invoked - vi.mocked(streamText).mockReturnValue(makeStreamResult() as never); + it("forwards assistantMessageId into toUIMessageStream's generateMessageId (stable row id)", async () => { + const generateIdCalls: unknown[] = []; + vi.mocked(streamText).mockReturnValue(makeStreamResult({ generateIdCalls }) as never); const { stream } = makeWritable(); - const result = await runAgentStep({ ...baseInput, writable: stream } as never); + await runAgentStep({ + ...baseInput, + writable: stream, + assistantMessageId: "asst-from-workflow-xyz", + } as never); - expect(result.responseMessage).toBeUndefined(); - expect(result.finishReason).toBe("stop"); + expect(generateIdCalls).toHaveLength(1); + const gen = generateIdCalls[0] as () => string; + expect(typeof gen).toBe("function"); + expect(gen()).toBe("asst-from-workflow-xyz"); }); - it("forwards input.assistantMessageId into toUIMessageStream's generateMessageId", async () => { - const generateMessageIdCalls: unknown[] = []; - const streamResult = makeStreamResult(); - // Spy on the options passed to toUIMessageStream to grab the generateMessageId fn. - const originalToUIMessageStream = streamResult.toUIMessageStream; - streamResult.toUIMessageStream = vi.fn((streamOpts: { generateMessageId?: unknown }) => { - generateMessageIdCalls.push(streamOpts.generateMessageId); - return (originalToUIMessageStream as unknown as (o: unknown) => AsyncGenerator)( - streamOpts, - ); - }) as never; - vi.mocked(streamText).mockReturnValue(streamResult as never); + it("sets a stable generateId on the createUIMessageStream", async () => { + vi.mocked(streamText).mockReturnValue(makeStreamResult() as never); const { stream } = makeWritable(); await runAgentStep({ @@ -246,9 +261,52 @@ describe("runAgentStep", () => { assistantMessageId: "asst-from-workflow-xyz", } as never); - expect(generateMessageIdCalls).toHaveLength(1); - const gen = generateMessageIdCalls[0] as () => string; - expect(typeof gen).toBe("function"); - expect(gen()).toBe("asst-from-workflow-xyz"); + expect(typeof capturedCreateOpts.generateId).toBe("function"); + expect(capturedCreateOpts.generateId!()).toBe("asst-from-workflow-xyz"); + }); + + it("returns the finishReason from the model result", async () => { + vi.mocked(streamText).mockReturnValue(makeStreamResult() as never); + const { stream } = makeWritable(); + + const result = await runAgentStep({ ...baseInput, writable: stream } as never); + + expect(result.finishReason).toBe("stop"); + }); + + it("returns the responseMessage captured from onFinish (so the workflow can charge credits)", async () => { + const emitted = { + id: "asst-test-id", + role: "assistant", + parts: [{ type: "text", text: "Hello" }], + metadata: { totalMessageCost: 0.05 }, + }; + vi.mocked(streamText).mockReturnValue(makeStreamResult() as never); + vi.mocked(createUIMessageStream).mockImplementation((opts: never) => { + const o = opts as CreateOpts; + o.execute?.({ writer: { write: () => {}, merge: () => {}, onError: undefined } }); + // Drive onFinish so runAgentStep captures the final message. + void o.onFinish?.({ responseMessage: emitted }); + return new ReadableStream({ + start(controller) { + controller.close(); + }, + }) as never; + }); + const { stream } = makeWritable(); + + const result = await runAgentStep({ ...baseInput, writable: stream } as never); + + expect(result.responseMessage).toEqual(emitted); + }); + + it("returns responseMessage: undefined when onFinish never fires", async () => { + // Default mock never invokes onFinish. + vi.mocked(streamText).mockReturnValue(makeStreamResult() as never); + const { stream } = makeWritable(); + + const result = await runAgentStep({ ...baseInput, writable: stream } as never); + + expect(result.responseMessage).toBeUndefined(); }); }); diff --git a/app/lib/workflows/__tests__/runAgentWorkflow.test.ts b/app/lib/workflows/__tests__/runAgentWorkflow.test.ts index 19cec3b78..18ca4a589 100644 --- a/app/lib/workflows/__tests__/runAgentWorkflow.test.ts +++ b/app/lib/workflows/__tests__/runAgentWorkflow.test.ts @@ -4,7 +4,6 @@ import { runAgentStep } from "@/app/lib/workflows/runAgentStep"; import { clearChatActiveStream } from "@/lib/chat/clearChatActiveStream"; import { closeChatStream } from "@/app/lib/workflows/closeChatStream"; import { generateAssistantMessageId } from "@/app/lib/workflows/generateAssistantMessageId"; -import { persistAssistantMessage } from "@/lib/chat/persistAssistantMessage"; import { handleChatCredits } from "@/lib/credits/handleChatCredits"; vi.mock("@/app/lib/workflows/runAgentStep", () => ({ @@ -19,9 +18,6 @@ vi.mock("@/app/lib/workflows/closeChatStream", () => ({ vi.mock("@/app/lib/workflows/generateAssistantMessageId", () => ({ generateAssistantMessageId: vi.fn(), })); -vi.mock("@/lib/chat/persistAssistantMessage", () => ({ - persistAssistantMessage: vi.fn(), -})); vi.mock("@/lib/credits/handleChatCredits", () => ({ handleChatCredits: vi.fn(), })); @@ -97,24 +93,7 @@ describe("runAgentWorkflow", () => { expect(closeChatStream).toHaveBeenCalledWith(writableStub); }); - it("persists the assistant message when runAgentStep returns one", async () => { - const responseMessage = { - id: "assistant-msg-xyz", - role: "assistant", - parts: [{ type: "text", text: "Hello!" }], - }; - vi.mocked(runAgentStep).mockResolvedValue({ - finishReason: "stop", - responseMessage: responseMessage as never, - }); - - await runAgentWorkflow(baseInput); - - expect(persistAssistantMessage).toHaveBeenCalledTimes(1); - expect(persistAssistantMessage).toHaveBeenCalledWith("chat-1", responseMessage); - }); - - it("does NOT call persistAssistantMessage when runAgentStep returns no responseMessage", async () => { + it("forwards chatId to runAgentStep so it can persist the assistant message per step", async () => { vi.mocked(runAgentStep).mockResolvedValue({ finishReason: "stop", responseMessage: undefined, @@ -122,18 +101,7 @@ describe("runAgentWorkflow", () => { await runAgentWorkflow(baseInput); - expect(persistAssistantMessage).not.toHaveBeenCalled(); - }); - - it("does NOT call persistAssistantMessage when runAgentStep throws (no message to persist)", async () => { - vi.mocked(runAgentStep).mockRejectedValue(new Error("model exploded")); - - await expect(runAgentWorkflow(baseInput)).rejects.toThrow("model exploded"); - - expect(persistAssistantMessage).not.toHaveBeenCalled(); - // But cleanup steps still run via the try/finally - expect(clearChatActiveStream).toHaveBeenCalledTimes(1); - expect(closeChatStream).toHaveBeenCalledTimes(1); + expect(runAgentStep).toHaveBeenCalledWith(expect.objectContaining({ chatId: "chat-1" })); }); it("generates a fresh assistantMessageId via the step and forwards it to runAgentStep", async () => { diff --git a/app/lib/workflows/runAgentStep.ts b/app/lib/workflows/runAgentStep.ts index 0520bb20a..66f73f58f 100644 --- a/app/lib/workflows/runAgentStep.ts +++ b/app/lib/workflows/runAgentStep.ts @@ -1,4 +1,10 @@ -import { streamText, convertToModelMessages, type UIMessage, type UIMessageChunk } from "ai"; +import { + streamText, + convertToModelMessages, + createUIMessageStream, + type UIMessage, + type UIMessageChunk, +} from "ai"; import { gateway } from "@ai-sdk/gateway"; import { agentCustomInstructions } from "@/lib/chat/agentCustomInstructions"; import { buildAgentSystemPrompt } from "@/lib/chat/buildAgentSystemPrompt"; @@ -8,11 +14,14 @@ import type { AgentContext, DurableAgentContext } from "@/lib/agent/tools/AgentC import { buildMessageMetadataCallback } from "@/lib/agent/messageMetadata/buildMessageMetadataCallback"; import { addCacheControlToTools } from "@/lib/agent/contextManagement/addCacheControlToTools"; import { addCacheControlToMessages } from "@/lib/agent/contextManagement/addCacheControlToMessages"; +import { persistAssistantMessage } from "@/lib/chat/persistAssistantMessage"; export type RunAgentStepInput = { messages: UIMessage[]; modelId: string; writable: WritableStream; + /** Target chat for persisting the assistant message as it streams. */ + chatId: string; /** * The JSON-serializable agent context that survives the durable * workflow input. `runAgentStep` widens it into a full `AgentContext` @@ -44,11 +53,10 @@ export type RunAgentStepInput = { export type RunAgentStepResult = { finishReason: string; /** - * The assembled assistant message captured from - * `toUIMessageStream`'s `onFinish` callback. `undefined` if the - * stream finished without emitting one (e.g. an error path that - * short-circuits before any chunks land). Used by `runAgentWorkflow` - * to persist the final message to `chat_messages`. + * The assembled assistant message captured from the stream's `onFinish`. + * `undefined` if the stream finished without emitting one. Per-step + * persistence happens inside this function; this is returned so + * `runAgentWorkflow` can charge credits from `responseMessage.metadata`. */ responseMessage: UIMessage | undefined; }; @@ -67,9 +75,7 @@ export type RunAgentStepResult = { * of the tool surface ports in a follow-up PR. * * @param input - Messages + selected model + writable stream + agent context. - * @returns `finishReason` from the model run plus the assembled - * `responseMessage` (when one was emitted) so the caller can - * persist it. + * @returns finishReason plus the assembled assistant message. */ export async function runAgentStep(input: RunAgentStepInput): Promise { "use step"; @@ -122,33 +128,38 @@ export async function runAgentStep(input: RunAgentStepInput): Promise({ + generateId: () => input.assistantMessageId, + onStepFinish: ({ responseMessage: stepMessage }) => + persistAssistantMessage(input.chatId, stepMessage), + onFinish: ({ responseMessage: finalMessage }) => { + responseMessage = finalMessage; + return persistAssistantMessage(input.chatId, finalMessage); + }, + execute: ({ writer }) => { + writer.merge( + result.toUIMessageStream({ + messageMetadata, + generateMessageId: () => input.assistantMessageId, + }), + ); + }, + }); - // Acquire the writer once and release in `finally` so a thrown chunk - // doesn't leak the lock. - const writer = input.writable.getWriter(); - try { - // `messageMetadata` emits {modelId, usage, cost} chunks the UI - // renders as model/cost badges. Mirrors open-agents' chat workflow - // shape so sandbox.recoupable.com sees the same metadata when cut - // over to api's /api/chat/workflow. - const messageMetadata = buildMessageMetadataCallback({ modelId: input.modelId }); - for await (const part of result.toUIMessageStream({ - messageMetadata, - generateMessageId: () => input.assistantMessageId, - onFinish: ({ responseMessage: finishedResponseMessage }) => { - responseMessage = finishedResponseMessage; - }, - })) { - await writer.write(part); - } - } finally { - writer.releaseLock(); - } + // preventClose/preventAbort: runAgentWorkflow's finally owns the writable's + // lifecycle (closeChatStream), so don't close or abort it here. + await uiStream.pipeTo(input.writable, { preventClose: true, preventAbort: true }); const finishReason = await result.finishReason; console.log("[runAgentStep] finish", { finishReason, hasResponseMessage: !!responseMessage }); diff --git a/app/lib/workflows/runAgentWorkflow.ts b/app/lib/workflows/runAgentWorkflow.ts index 67eb94b24..10e61d5d2 100644 --- a/app/lib/workflows/runAgentWorkflow.ts +++ b/app/lib/workflows/runAgentWorkflow.ts @@ -4,7 +4,6 @@ import { closeChatStream } from "@/app/lib/workflows/closeChatStream"; import { generateAssistantMessageId } from "@/app/lib/workflows/generateAssistantMessageId"; import { runAgentStep } from "@/app/lib/workflows/runAgentStep"; import { clearChatActiveStream } from "@/lib/chat/clearChatActiveStream"; -import { persistAssistantMessage } from "@/lib/chat/persistAssistantMessage"; import { handleChatCredits } from "@/lib/credits/handleChatCredits"; import type { AgentMessageMetadata } from "@/lib/agent/messageMetadata/AgentMessageMetadata"; import type { DurableAgentContext } from "@/lib/agent/tools/AgentContext"; @@ -83,30 +82,19 @@ export async function runAgentWorkflow(input: RunAgentWorkflowInput): Promise ({ beforeEach(() => { vi.clearAllMocks(); - vi.mocked(updateChat).mockResolvedValue({ - ok: true, - rowsUpdated: 1, - row: null, - }); + vi.mocked(updateChat).mockResolvedValue({ ok: true, rowsUpdated: 1, row: null }); }); const CHAT_ID = "11111111-1111-1111-1111-111111111111"; @@ -31,110 +27,82 @@ function buildAssistantMessage(overrides: Record = {}) { }; } -describe("persistAssistantMessage", () => { - it("upserts the assistant message row with role 'assistant'", async () => { - vi.mocked(upsertChatMessage).mockResolvedValue({ - ok: true, - row: { id: ASSISTANT_ID } as never, - isDuplicate: false, - }); - - await persistAssistantMessage(CHAT_ID, buildAssistantMessage()); - - expect(upsertChatMessage).toHaveBeenCalledWith( - expect.objectContaining({ - id: ASSISTANT_ID, - chat_id: CHAT_ID, - role: "assistant", - }), - ); +function okUpsert() { + vi.mocked(upsertChatMessage).mockResolvedValue({ + ok: true, + row: { id: ASSISTANT_ID } as never, + isDuplicate: false, }); +} - it("touches updated_at on the chat row on a fresh insert", async () => { - vi.mocked(upsertChatMessage).mockResolvedValue({ - ok: true, - row: { id: ASSISTANT_ID } as never, - isDuplicate: false, - }); +describe("persistAssistantMessage", () => { + it("upserts the assistant row with update:true (DO UPDATE — overwrite as it grows)", async () => { + okUpsert(); + const message = buildAssistantMessage(); - await persistAssistantMessage(CHAT_ID, buildAssistantMessage()); + await persistAssistantMessage(CHAT_ID, message as never); - expect(updateChat).toHaveBeenCalledWith( - { id: CHAT_ID }, - expect.objectContaining({ updated_at: expect.any(String) }), + expect(upsertChatMessage).toHaveBeenCalledWith( + { id: ASSISTANT_ID, chat_id: CHAT_ID, role: "assistant", parts: message }, + { update: true }, ); }); - it("bumps last_assistant_message_at on a fresh insert (drives sidebar unread badge)", async () => { - vi.mocked(upsertChatMessage).mockResolvedValue({ - ok: true, - row: { id: ASSISTANT_ID } as never, - isDuplicate: false, - }); + it("bumps updated_at and last_assistant_message_at to the same timestamp on a successful persist", async () => { + okUpsert(); - await persistAssistantMessage(CHAT_ID, buildAssistantMessage()); + await persistAssistantMessage(CHAT_ID, buildAssistantMessage() as never); expect(updateChat).toHaveBeenCalledWith( { id: CHAT_ID }, expect.objectContaining({ + updated_at: expect.any(String), last_assistant_message_at: expect.any(String), }), ); - }); - - it("uses the same timestamp for updated_at and last_assistant_message_at (matches open-agents)", async () => { - vi.mocked(upsertChatMessage).mockResolvedValue({ - ok: true, - row: { id: ASSISTANT_ID } as never, - isDuplicate: false, - }); - - await persistAssistantMessage(CHAT_ID, buildAssistantMessage()); - const updateArgs = vi.mocked(updateChat).mock.calls[0]?.[1] as { updated_at?: string; last_assistant_message_at?: string; }; - expect(updateArgs.updated_at).toBeDefined(); expect(updateArgs.last_assistant_message_at).toBe(updateArgs.updated_at); }); - it("does NOT touch updated_at on duplicate (workflow replay)", async () => { - vi.mocked(upsertChatMessage).mockResolvedValue({ - ok: true, - row: null, - isDuplicate: true, - }); + it("bumps activity on every persist so a partial/stopped reply still surfaces as unread", async () => { + // Under DO UPDATE the upsert returns a row on both insert and update, so + // a per-step partial message must keep marking the chat active — gating + // the bump on "fresh insert only" would lose unread for stopped replies. + okUpsert(); - await persistAssistantMessage(CHAT_ID, buildAssistantMessage()); + await persistAssistantMessage(CHAT_ID, buildAssistantMessage() as never); + await persistAssistantMessage( + CHAT_ID, + buildAssistantMessage({ parts: [{ type: "text", text: "Hello there!" }] }) as never, + ); - expect(updateChat).not.toHaveBeenCalled(); + expect(updateChat).toHaveBeenCalledTimes(2); }); - it("silently no-ops when the message role is not 'assistant' (guard against caller mistakes)", async () => { - await persistAssistantMessage(CHAT_ID, buildAssistantMessage({ role: "user" })); + it("no-ops when the message role is not 'assistant' (guard against caller mistakes)", async () => { + await persistAssistantMessage(CHAT_ID, buildAssistantMessage({ role: "user" }) as never); expect(upsertChatMessage).not.toHaveBeenCalled(); expect(updateChat).not.toHaveBeenCalled(); }); - it("silently no-ops when the upsert reports a DB error (fire-and-forget contract)", async () => { - vi.mocked(upsertChatMessage).mockResolvedValue({ - ok: false, - error: "transient db error", - }); + it("does not bump activity when the upsert reports a DB error (fire-and-forget contract)", async () => { + vi.mocked(upsertChatMessage).mockResolvedValue({ ok: false, error: "transient db error" }); await expect( - persistAssistantMessage(CHAT_ID, buildAssistantMessage()), + persistAssistantMessage(CHAT_ID, buildAssistantMessage() as never), ).resolves.toBeUndefined(); expect(updateChat).not.toHaveBeenCalled(); }); - it("swallows unexpected exceptions (must not bubble up)", async () => { + it("swallows unexpected exceptions (must not bubble up and tear down the stream)", async () => { vi.mocked(upsertChatMessage).mockRejectedValue(new Error("boom")); await expect( - persistAssistantMessage(CHAT_ID, buildAssistantMessage()), + persistAssistantMessage(CHAT_ID, buildAssistantMessage() as never), ).resolves.toBeUndefined(); }); }); diff --git a/lib/chat/persistAssistantMessage.ts b/lib/chat/persistAssistantMessage.ts index 13f93c27b..5217950f9 100644 --- a/lib/chat/persistAssistantMessage.ts +++ b/lib/chat/persistAssistantMessage.ts @@ -1,66 +1,35 @@ +import type { UIMessage } from "ai"; import { upsertChatMessage } from "@/lib/supabase/chat_messages/upsertChatMessage"; import { updateChat } from "@/lib/supabase/chats/updateChat"; /** - * Minimal duck-type shape we read off the assistant message. Both AI - * SDK's `UIMessage` and the in-test fixtures structurally satisfy it. - * Kept intentionally loose because the row we write to - * `chat_messages.parts` is `jsonb` — Supabase persists whatever the - * message looks like. - */ -type AssistantMessage = { - id: string; - role: string; - parts: ReadonlyArray; -}; - -/** - * Fire-and-forget persistence of the final assistant message at the - * end of a chat-workflow run. Mirrors open-agents' - * `persistAssistantMessage` step in - * `apps/web/app/workflows/chat-post-finish.ts` and closes the - * silent-data-loss gap the recoup-api cutover introduced — without - * this call the assistant response is streamed to the client but - * never written to `chat_messages`, so a page refresh after the - * stream completes wipes the message. - * - * Uses `upsertChatMessage(... { onConflict: "id", ignoreDuplicates })` - * so a workflow that's restarted (replay, recovery) doesn't - * double-insert. On a fresh insert we also bump - * `last_assistant_message_at` (drives the sidebar `hasUnread` badge - * in `getChatSummaries` — `lastAssistantMessageAt > lastReadAt`) and - * touch `updated_at` so the sidebar sort surfaces the chat. Matches - * open-agents' `updateChatAssistantActivity` which sets both columns - * to the same timestamp. + * Persist the streaming assistant message, overwriting its row as it grows + * (DO UPDATE on a stable id), and bump the chat's assistant-activity + * timestamps. Called per step from `runAgentStep`, so a stopped or crashed + * turn keeps the partial reply it produced and still surfaces as unread + * (`getChatSummaries` derives `hasUnread` from + * `last_assistant_message_at > last_read_at`); `updated_at` re-sorts the + * chat to the top. Bumped on every successful persist — under DO UPDATE the + * upsert returns a row each time, so an interrupted reply isn't dropped. * - * Title generation lives in `persistLatestUserMessage` (the first - * user message is canonical for chat titles) — this function - * deliberately does NOT update the chat title. - * - * Errors are caught and logged. Contract is "schedule it and forget" - * — never block the workflow or surface failures to the UI. - * - * @param chatId - Target chat row. - * @param message - The assembled assistant message (typically from - * `toUIMessageStream`'s `onFinish.responseMessage`). + * Never throws: the AI SDK awaits stream callbacks un-guarded, so an + * escaping error would tear down the client stream. Errors are logged. */ -export async function persistAssistantMessage( - chatId: string, - message: AssistantMessage, -): Promise { - "use step"; +export async function persistAssistantMessage(chatId: string, message: UIMessage): Promise { try { - if (!message || message.role !== "assistant") return; + if (message?.role !== "assistant") return; - const inserted = await upsertChatMessage({ - id: message.id, - chat_id: chatId, - role: "assistant", - parts: message as never, - }); + const upserted = await upsertChatMessage( + { + id: message.id, + chat_id: chatId, + role: "assistant", + parts: message as never, + }, + { update: true }, + ); - if (!inserted.ok) return; - if (inserted.isDuplicate || inserted.row === null) return; + if (!upserted.ok) return; const activityAt = new Date().toISOString(); await updateChat( diff --git a/lib/supabase/chat_messages/__tests__/upsertChatMessage.test.ts b/lib/supabase/chat_messages/__tests__/upsertChatMessage.test.ts index 0ea559058..cd770f7c6 100644 --- a/lib/supabase/chat_messages/__tests__/upsertChatMessage.test.ts +++ b/lib/supabase/chat_messages/__tests__/upsertChatMessage.test.ts @@ -38,6 +38,13 @@ describe("upsertChatMessage", () => { expect(result).toEqual({ ok: true, row: null, isDuplicate: true }); }); + it("passes ignoreDuplicates:false when update:true (DO UPDATE — overwrite as it grows)", async () => { + maybeSingleChain.mockResolvedValue({ data, error: null }); + const result = await upsertChatMessage(data, { update: true }); + expect(result).toEqual({ ok: true, row: data, isDuplicate: false }); + expect(upsertChain).toHaveBeenCalledWith(data, { onConflict: "id", ignoreDuplicates: false }); + }); + it("returns ok:false with error on Supabase failure (distinct from duplicate)", async () => { maybeSingleChain.mockResolvedValue({ data: null, error: { message: "down" } }); const result = await upsertChatMessage(data); diff --git a/lib/supabase/chat_messages/upsertChatMessage.ts b/lib/supabase/chat_messages/upsertChatMessage.ts index d98b9b343..82e029e89 100644 --- a/lib/supabase/chat_messages/upsertChatMessage.ts +++ b/lib/supabase/chat_messages/upsertChatMessage.ts @@ -13,18 +13,20 @@ export type UpsertChatMessageResult = | { ok: false; error: string }; /** - * Insert-or-skip a single chat message row. Wraps Supabase upsert with - * `ignoreDuplicates: true` on the `id` primary key, but returns a - * discriminated result so callers can tell "duplicate skipped" apart from - * "DB error" — the previous helper returned `null` for both, which made - * callers silently swallow operational failures. + * Upsert a single chat message on the `id` primary key. `update: false` + * (default) → DO NOTHING (write-once, e.g. the user message); `update: true` + * → DO UPDATE (overwrite, e.g. the assistant message as it grows per step). + * Returns a discriminated result so callers can tell "duplicate skipped" + * apart from "DB error" — the previous helper returned `null` for both, + * which made callers silently swallow operational failures. */ export async function upsertChatMessage( data: TablesInsert<"chat_messages">, + { update = false }: { update?: boolean } = {}, ): Promise { const { data: row, error } = await supabase .from("chat_messages") - .upsert(data, { onConflict: "id", ignoreDuplicates: true }) + .upsert(data, { onConflict: "id", ignoreDuplicates: !update }) .select() .maybeSingle(); From b01ec03a5dcdbf5755cf053c55906d5920b6a3d6 Mon Sep 17 00:00:00 2001 From: Sweets Sweetman Date: Mon, 25 May 2026 19:28:13 -0500 Subject: [PATCH 2/2] refactor(workflow): spread input into runAgentStep (KISS) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per review feedback on PR #603 — drop the manual field-by-field destructure and forward the workflow input object directly. Co-Authored-By: Claude Opus 4.7 (1M context) --- app/lib/workflows/runAgentWorkflow.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/app/lib/workflows/runAgentWorkflow.ts b/app/lib/workflows/runAgentWorkflow.ts index 10e61d5d2..981d0cf2a 100644 --- a/app/lib/workflows/runAgentWorkflow.ts +++ b/app/lib/workflows/runAgentWorkflow.ts @@ -79,11 +79,8 @@ export async function runAgentWorkflow(input: RunAgentWorkflowInput): Promise