From efb0172faa23c3610ffe1c3cb490e5462a041fb2 Mon Sep 17 00:00:00 2001 From: "sweetman.eth" Date: Mon, 25 May 2026 13:49:36 -0500 Subject: [PATCH] fix(chat-workflow): persist the assistant message after a successful run (#609) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(chat-workflow): persist the assistant message after a successful run Closes the silent-data-loss gap that the open-agents → recoup-api cutover introduced: the chat workflow streamed the final assistant message to the client over SSE but never wrote it to `chat_messages`, so a page refresh after a successful exchange wiped the reply. Changes: - New `lib/chat/persistAssistantMessage.ts` step (mirrors open-agents' `app/workflows/chat-post-finish.ts` helper of the same name). Fire-and-forget upsert + chat `updated_at` touch on fresh inserts; idempotent on workflow replay; never throws. - `runAgentStep` now wires an `onFinish` callback into `toUIMessageStream` to capture the assembled assistant message, and returns it alongside `finishReason` as part of the new `RunAgentStepResult` type. - `runAgentWorkflow` calls `persistAssistantMessage(chatId, responseMessage)` after a successful `runAgentStep` (in the try block, BEFORE the existing `clearChatActiveStream` + `closeChatStream` finally). On throw, no message is persisted (nothing was generated); cleanup still runs. Tests: - `persistAssistantMessage.test.ts` — 6 cases (insert + touch, duplicate skip, wrong-role guard, DB-error swallow, exception swallow, role assertion). - `runAgentStep.test.ts` — 3 new cases (onFinish wired, captured responseMessage returned, undefined when onFinish never fires). - `runAgentWorkflow.test.ts` — 3 new cases (persist called on success, not called when responseMessage undefined, not called on throw while cleanup still runs). Full suite: 3159 → 3171 passing. 
Tracking: #605 (Tier 1, item 1) Co-Authored-By: Claude Opus 4.7 (1M context) * fix(chat-workflow): loosen AssistantMessage type to accept UIMessage The over-strict `& Record` intersection on the outer shape required an index signature that `UIMessage` from `ai` doesn't carry, so wiring runAgentStep's UIMessage return into persistAssistantMessage failed the Vercel build with TS2345. Switched to a minimal duck-typed shape (id/role/parts) — matches both UIMessage and the in-test fixtures structurally. The `chat_messages.parts` column is jsonb so persistence doesn't care about the part subtypes. Co-Authored-By: Claude Opus 4.7 (1M context) * fix(chat-workflow): mark persistAssistantMessage as a "use step" Vercel Workflow blocks `fetch()` in workflow-body code; the Supabase JS client uses fetch under the hood, so `upsertChatMessage` inside `persistAssistantMessage` failed at runtime with: Global "fetch" is unavailable in workflow functions. Use the "fetch" step function from "workflow" to make HTTP requests. `"use step"` directive moves the function into step-context where fetch is legal. Mirrors open-agents' `persistAssistantMessage` step in `app/workflows/chat-post-finish.ts` (which carries the same directive). Caught via runtime log inspection on the PR preview before merge. Co-Authored-By: Claude Opus 4.7 (1M context) * debug(chat-workflow): TEMP diagnostic logs in persistAssistantMessage Hard-refresh of a chat that ran on PR #609's preview showed the assistant message NOT in chat_messages — meaning silence in the existing error log is NOT the same as "row was written." Adding explicit logs at entry, after upsert, and after updateChat so the runtime tail can disambiguate: - "skip: not assistant role" branch - upsert result shape (ok / isDuplicate / rowPresent) - "persisted + touched chat" success line Will be reverted before merge. 
Co-Authored-By: Claude Opus 4.7 (1M context) * fix(chat-workflow): pass generateMessageId to toUIMessageStream Diagnostic logs revealed every assistant message was arriving at persistAssistantMessage with `messageId: ''` — the AI SDK's default when `generateMessageId` isn't provided. Supabase's `chat_messages.id` PK then treated every workflow run after the first as a duplicate (`onConflict: "id", ignoreDuplicates: true` → isDuplicate: true, rowPresent: false) so no assistant row landed. Generating a stable id once per `runAgentStep` invocation via `generateId()` from `ai`, then plumbing it into `result.toUIMessageStream({ generateMessageId: () => ... })` so: - the streamed chunks carry the id (existing wire format), - `onFinish.responseMessage.id` carries the id, - `persistAssistantMessage` sees a real id and the upsert lands a fresh row. Co-Authored-By: Claude Opus 4.7 (1M context) * refactor(chat-workflow): move assistantMessageId generation to workflow body Match open-agents' structural pattern instead of generating the id inline inside runAgentStep. Rationale (which I should have applied the first time, per review feedback): 1. **Multi-step support** — when the Tier 2 outer loop lands, each runAgentStep call needs the SAME assistantMessageId so chunks accumulate under one chat_messages row instead of fragmenting per tool-call iteration. Generating inside the step gives every call a fresh id; generating in the workflow body and threading through makes the upgrade path one-line. 2. **Resume-after-tool-call** — open-agents reuses the latest message's id when `latestMessage.role === "assistant"` so the in-progress assistant turn re-attaches instead of starting a new row. Ported now to avoid a future surprise. 3. **Determinism** — `generateId()` is non-deterministic; the workflow body's WDK constraint forbids that. Wrapping it in a `"use step"` (`generateAssistantMessageId.ts`) makes the value durable across workflow replays. 
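The id-selection rule in points 1 and 2 can be sketched roughly as follows. This is a simplified, synchronous illustration: `ChatMessage` and `pickAssistantMessageId` are hypothetical names, and the `generate` callback stands in for the awaited `generateAssistantMessageId` step.

```typescript
// Hypothetical, simplified message shape; the real workflow uses `UIMessage` from `ai`.
type ChatMessage = { id: string; role: "user" | "assistant" | "system" };

// `generate` stands in for the `generateAssistantMessageId` step. It is only
// invoked when there is no in-progress assistant turn to re-attach to.
function pickAssistantMessageId(
  messages: ReadonlyArray<ChatMessage>,
  generate: () => string,
): string {
  const latest = messages[messages.length - 1];
  return latest?.role === "assistant" ? latest.id : generate();
}

// Fresh turn: the last message is from the user, so a new id is generated.
const freshId = pickAssistantMessageId(
  [{ id: "m1", role: "user" }],
  () => "asst-fresh-id",
);

// Resumed turn: the last message is already an assistant message, so its id is reused.
const resumedId = pickAssistantMessageId(
  [
    { id: "m1", role: "user" },
    { id: "asst-in-progress", role: "assistant" },
  ],
  () => "asst-fresh-id",
);
```

Passing the generator in as a callback keeps the selection logic pure, so it can be exercised without the workflow runtime.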
Changes: - New `app/lib/workflows/generateAssistantMessageId.ts` step (mirrors open-agents' local `generateId` step in `apps/web/app/workflows/chat.ts`). - `RunAgentStepInput` gains `assistantMessageId: string`. The inline `generateId()` call is removed. - `runAgentWorkflow` reads `latestMessage`; reuses its id when it's an assistant message, otherwise awaits the step. Threads the result into `runAgentStep`. - Tests: 2 new for the step, 1 new for runAgentStep forwarding, 2 new for the resume-aware branch in runAgentWorkflow. Co-Authored-By: Claude Opus 4.7 (1M context) * chore(chat-workflow): revert temp diagnostic logs in persistAssistantMessage Logs served their purpose — surfaced the empty-messageId bug (fixed in 8974a37e by threading a workflow-generated id through toUIMessageStream's generateMessageId). UI verification on the PR preview confirmed the assistant row now persists. Reverting the debug logs so production runtime stays quiet. Co-Authored-By: Claude Opus 4.7 (1M context) * fix(chat-workflow): bump last_assistant_message_at on persist (unread badge parity) Match open-agents' `updateChatAssistantActivity` which sets BOTH `updated_at` and `last_assistant_message_at` to the same timestamp. The recoup-api sidebar's `hasUnread` badge is computed in `lib/sessions/chats/getChatSummaries.ts` as `lastAssistantMessageAt > lastReadAt`, mirroring open-agents' identical query in `apps/web/lib/db/sessions.ts:201`. Without this column bump, an assistant message persisted by the workflow streams to the client, lands in `chat_messages`, but never lights up the unread badge for any other tabs/devices the user has open. The column already exists in `api`'s `chats` schema and `updateChat` already accepts it via `ChatMutableFields` — this is purely a "we forgot to set it" fix. 
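A minimal sketch of why the column bump matters for the badge, assuming ISO-8601 timestamp strings. `ChatActivity`, `hasUnread` and `buildActivityUpdate` are illustrative names rather than the actual `getChatSummaries` code, and the null handling is an assumption.

```typescript
// Hypothetical summary fields; the real query lives in `getChatSummaries`.
type ChatActivity = {
  lastAssistantMessageAt: string | null;
  lastReadAt: string | null;
};

// Mirrors the `lastAssistantMessageAt > lastReadAt` comparison described above.
// Treating a missing assistant timestamp as "nothing unread" is an assumption.
function hasUnread(chat: ChatActivity): boolean {
  if (chat.lastAssistantMessageAt === null) return false;
  if (chat.lastReadAt === null) return true;
  return Date.parse(chat.lastAssistantMessageAt) > Date.parse(chat.lastReadAt);
}

// Both columns receive the same timestamp, as in the fix.
function buildActivityUpdate(now: Date): {
  updated_at: string;
  last_assistant_message_at: string;
} {
  const activityAt = now.toISOString();
  return { updated_at: activityAt, last_assistant_message_at: activityAt };
}

const lastReadAt = "2026-05-25T18:00:00.000Z";
const activityUpdate = buildActivityUpdate(new Date("2026-05-25T18:05:00.000Z"));

// Without the bump the column stays null and the badge never lights up.
const unreadBefore = hasUnread({ lastAssistantMessageAt: null, lastReadAt });
// With the bump the assistant timestamp is newer than the last read.
const unreadAfter = hasUnread({
  lastAssistantMessageAt: activityUpdate.last_assistant_message_at,
  lastReadAt,
});
```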
Added two new unit tests: - bumps `last_assistant_message_at` on fresh insert - uses the same timestamp for both columns Co-Authored-By: Claude Opus 4.7 (1M context) * chore: format-fix workflow files (prettier --write) Resolves the format/lint CI failures on df312dbc — purely whitespace collapsing per the repo's prettier config (no behavior change). Co-Authored-By: Claude Opus 4.7 (1M context) --------- Co-authored-by: Claude Opus 4.7 (1M context) --- .../generateAssistantMessageId.test.ts | 20 +++ .../workflows/__tests__/runAgentStep.test.ts | 83 ++++++++++- .../__tests__/runAgentWorkflow.test.ts | 102 ++++++++++++- .../workflows/generateAssistantMessageId.ts | 19 +++ app/lib/workflows/runAgentStep.ts | 53 ++++++- app/lib/workflows/runAgentWorkflow.ts | 27 ++++ .../__tests__/persistAssistantMessage.test.ts | 140 ++++++++++++++++++ lib/chat/persistAssistantMessage.ts | 73 +++++++++ 8 files changed, 506 insertions(+), 11 deletions(-) create mode 100644 app/lib/workflows/__tests__/generateAssistantMessageId.test.ts create mode 100644 app/lib/workflows/generateAssistantMessageId.ts create mode 100644 lib/chat/__tests__/persistAssistantMessage.test.ts create mode 100644 lib/chat/persistAssistantMessage.ts diff --git a/app/lib/workflows/__tests__/generateAssistantMessageId.test.ts b/app/lib/workflows/__tests__/generateAssistantMessageId.test.ts new file mode 100644 index 000000000..c83cebfb1 --- /dev/null +++ b/app/lib/workflows/__tests__/generateAssistantMessageId.test.ts @@ -0,0 +1,20 @@ +import { describe, it, expect, vi } from "vitest"; +import { generateAssistantMessageId } from "@/app/lib/workflows/generateAssistantMessageId"; + +vi.mock("ai", async () => { + const actual = await vi.importActual("ai"); + return { ...actual, generateId: vi.fn(() => "generated-by-mock") }; +}); + +describe("generateAssistantMessageId", () => { + it("returns the value from ai's generateId()", async () => { + const id = await generateAssistantMessageId(); + 
expect(id).toBe("generated-by-mock"); + }); + + it("returns a string", async () => { + const id = await generateAssistantMessageId(); + expect(typeof id).toBe("string"); + expect(id.length).toBeGreaterThan(0); + }); +}); diff --git a/app/lib/workflows/__tests__/runAgentStep.test.ts b/app/lib/workflows/__tests__/runAgentStep.test.ts index b2e90475b..cfb101877 100644 --- a/app/lib/workflows/__tests__/runAgentStep.test.ts +++ b/app/lib/workflows/__tests__/runAgentStep.test.ts @@ -12,15 +12,28 @@ vi.mock("@ai-sdk/gateway", () => ({ gateway: vi.fn((modelId: string) => ({ modelId, __mock: "gateway" })), })); -function makeStreamResult(opts?: { metadataCalls?: Array }) { +function makeStreamResult(opts?: { + metadataCalls?: Array; + onFinishCalls?: Array; + emittedResponseMessage?: unknown; +}) { const calls = opts?.metadataCalls ?? []; + const onFinishCalls = opts?.onFinishCalls ?? []; return { - toUIMessageStream: vi.fn((streamOpts: { messageMetadata?: unknown }) => { - // Capture the callback so tests can inspect it + toUIMessageStream: vi.fn((streamOpts: { messageMetadata?: unknown; onFinish?: unknown }) => { + // Capture the callbacks so tests can inspect (and invoke) them calls.push(streamOpts.messageMetadata); + onFinishCalls.push(streamOpts.onFinish); return (async function* () { yield { type: "start" }; yield { type: "finish" }; + // Mirror the AI SDK contract: onFinish fires after the + // generator yields its last chunk with the assembled message. 
+ if (typeof streamOpts.onFinish === "function" && opts?.emittedResponseMessage) { + (streamOpts.onFinish as (a: { responseMessage: unknown }) => void)({ + responseMessage: opts.emittedResponseMessage, + }); + } })(); }), finishReason: Promise.resolve("stop"), @@ -49,6 +62,7 @@ const baseInput = { agentContext: { sandbox: { state: { type: "vercel" }, workingDirectory: "/sandbox/mono" }, }, + assistantMessageId: "asst-test-id", }; describe("runAgentStep", () => { @@ -174,4 +188,67 @@ describe("runAgentStep", () => { expect(cb({ part: { type: "text-delta" } })).toBeUndefined(); expect(cb({ part: { type: "start" } })).toBeUndefined(); }); + + it("wires an onFinish callback into toUIMessageStream", async () => { + const onFinishCalls: unknown[] = []; + vi.mocked(streamText).mockReturnValue(makeStreamResult({ onFinishCalls }) as never); + const { stream } = makeWritable(); + + await runAgentStep({ ...baseInput, writable: stream } as never); + + expect(onFinishCalls).toHaveLength(1); + expect(typeof onFinishCalls[0]).toBe("function"); + }); + + it("returns the responseMessage captured from onFinish", async () => { + const emittedResponseMessage = { + id: "assistant-msg-1", + role: "assistant", + parts: [{ type: "text", text: "Hello" }], + }; + vi.mocked(streamText).mockReturnValue(makeStreamResult({ emittedResponseMessage }) as never); + const { stream } = makeWritable(); + + const result = await runAgentStep({ ...baseInput, writable: stream } as never); + + expect(result.responseMessage).toEqual(emittedResponseMessage); + expect(result.finishReason).toBe("stop"); + }); + + it("returns responseMessage: undefined when onFinish never fires", async () => { + // Default makeStreamResult — no emittedResponseMessage, so onFinish is wired but never invoked + vi.mocked(streamText).mockReturnValue(makeStreamResult() as never); + const { stream } = makeWritable(); + + const result = await runAgentStep({ ...baseInput, writable: stream } as never); + + 
expect(result.responseMessage).toBeUndefined(); + expect(result.finishReason).toBe("stop"); + }); + + it("forwards input.assistantMessageId into toUIMessageStream's generateMessageId", async () => { + const generateMessageIdCalls: unknown[] = []; + const streamResult = makeStreamResult(); + // Spy on the options passed to toUIMessageStream to grab the generateMessageId fn. + const originalToUIMessageStream = streamResult.toUIMessageStream; + streamResult.toUIMessageStream = vi.fn((streamOpts: { generateMessageId?: unknown }) => { + generateMessageIdCalls.push(streamOpts.generateMessageId); + return (originalToUIMessageStream as unknown as (o: unknown) => AsyncGenerator)( + streamOpts, + ); + }) as never; + vi.mocked(streamText).mockReturnValue(streamResult as never); + const { stream } = makeWritable(); + + await runAgentStep({ + ...baseInput, + writable: stream, + assistantMessageId: "asst-from-workflow-xyz", + } as never); + + expect(generateMessageIdCalls).toHaveLength(1); + const gen = generateMessageIdCalls[0] as () => string; + expect(typeof gen).toBe("function"); + expect(gen()).toBe("asst-from-workflow-xyz"); + }); }); diff --git a/app/lib/workflows/__tests__/runAgentWorkflow.test.ts b/app/lib/workflows/__tests__/runAgentWorkflow.test.ts index d061abbce..3e59ffc2d 100644 --- a/app/lib/workflows/__tests__/runAgentWorkflow.test.ts +++ b/app/lib/workflows/__tests__/runAgentWorkflow.test.ts @@ -3,6 +3,8 @@ import { runAgentWorkflow } from "@/app/lib/workflows/runAgentWorkflow"; import { runAgentStep } from "@/app/lib/workflows/runAgentStep"; import { clearChatActiveStream } from "@/lib/chat/clearChatActiveStream"; import { closeChatStream } from "@/app/lib/workflows/closeChatStream"; +import { generateAssistantMessageId } from "@/app/lib/workflows/generateAssistantMessageId"; +import { persistAssistantMessage } from "@/lib/chat/persistAssistantMessage"; vi.mock("@/app/lib/workflows/runAgentStep", () => ({ runAgentStep: vi.fn(), @@ -13,6 +15,12 @@ 
vi.mock("@/lib/chat/clearChatActiveStream", () => ({ vi.mock("@/app/lib/workflows/closeChatStream", () => ({ closeChatStream: vi.fn(), })); +vi.mock("@/app/lib/workflows/generateAssistantMessageId", () => ({ + generateAssistantMessageId: vi.fn(), +})); +vi.mock("@/lib/chat/persistAssistantMessage", () => ({ + persistAssistantMessage: vi.fn(), +})); // Captured writable stub so tests can assert closeChatStream got the // same instance the workflow body holds. const writableStub = new WritableStream(); @@ -26,7 +34,10 @@ vi.mock("workflow", () => ({ })), })); -beforeEach(() => vi.clearAllMocks()); +beforeEach(() => { + vi.clearAllMocks(); + vi.mocked(generateAssistantMessageId).mockResolvedValue("asst-fresh-id"); +}); const baseInput = { messages: [{ id: "m1", role: "user", parts: [{ type: "text", text: "hi" }] } as never], @@ -40,7 +51,10 @@ const baseInput = { describe("runAgentWorkflow", () => { it("clears active_stream_id after a successful run, using the workflow's own runId", async () => { - vi.mocked(runAgentStep).mockResolvedValue({ finishReason: "stop" }); + vi.mocked(runAgentStep).mockResolvedValue({ + finishReason: "stop", + responseMessage: undefined, + }); await runAgentWorkflow(baseInput); @@ -58,7 +72,10 @@ describe("runAgentWorkflow", () => { }); it("explicitly closes the chat writable after a successful run so SSE ends promptly", async () => { - vi.mocked(runAgentStep).mockResolvedValue({ finishReason: "stop" }); + vi.mocked(runAgentStep).mockResolvedValue({ + finishReason: "stop", + responseMessage: undefined, + }); await runAgentWorkflow(baseInput); @@ -74,4 +91,83 @@ describe("runAgentWorkflow", () => { expect(closeChatStream).toHaveBeenCalledTimes(1); expect(closeChatStream).toHaveBeenCalledWith(writableStub); }); + + it("persists the assistant message when runAgentStep returns one", async () => { + const responseMessage = { + id: "assistant-msg-xyz", + role: "assistant", + parts: [{ type: "text", text: "Hello!" 
}], + }; + vi.mocked(runAgentStep).mockResolvedValue({ + finishReason: "stop", + responseMessage: responseMessage as never, + }); + + await runAgentWorkflow(baseInput); + + expect(persistAssistantMessage).toHaveBeenCalledTimes(1); + expect(persistAssistantMessage).toHaveBeenCalledWith("chat-1", responseMessage); + }); + + it("does NOT call persistAssistantMessage when runAgentStep returns no responseMessage", async () => { + vi.mocked(runAgentStep).mockResolvedValue({ + finishReason: "stop", + responseMessage: undefined, + }); + + await runAgentWorkflow(baseInput); + + expect(persistAssistantMessage).not.toHaveBeenCalled(); + }); + + it("does NOT call persistAssistantMessage when runAgentStep throws (no message to persist)", async () => { + vi.mocked(runAgentStep).mockRejectedValue(new Error("model exploded")); + + await expect(runAgentWorkflow(baseInput)).rejects.toThrow("model exploded"); + + expect(persistAssistantMessage).not.toHaveBeenCalled(); + // But cleanup steps still run via the try/finally + expect(clearChatActiveStream).toHaveBeenCalledTimes(1); + expect(closeChatStream).toHaveBeenCalledTimes(1); + }); + + it("generates a fresh assistantMessageId via the step and forwards it to runAgentStep", async () => { + vi.mocked(runAgentStep).mockResolvedValue({ + finishReason: "stop", + responseMessage: undefined, + }); + + await runAgentWorkflow(baseInput); + + expect(generateAssistantMessageId).toHaveBeenCalledTimes(1); + expect(runAgentStep).toHaveBeenCalledWith( + expect.objectContaining({ assistantMessageId: "asst-fresh-id" }), + ); + }); + + it("reuses the latest assistant message id when resuming a tool-call turn (no fresh generation)", async () => { + vi.mocked(runAgentStep).mockResolvedValue({ + finishReason: "stop", + responseMessage: undefined, + }); + + const resumingInput = { + ...baseInput, + messages: [ + { id: "m1", role: "user", parts: [{ type: "text", text: "go" }] }, + { + id: "asst-in-progress", + role: "assistant", + parts: [{ type: "text", 
text: "thinking..." }], + }, + ] as never, + }; + + await runAgentWorkflow(resumingInput); + + expect(generateAssistantMessageId).not.toHaveBeenCalled(); + expect(runAgentStep).toHaveBeenCalledWith( + expect.objectContaining({ assistantMessageId: "asst-in-progress" }), + ); + }); }); diff --git a/app/lib/workflows/generateAssistantMessageId.ts b/app/lib/workflows/generateAssistantMessageId.ts new file mode 100644 index 000000000..d33d0ce65 --- /dev/null +++ b/app/lib/workflows/generateAssistantMessageId.ts @@ -0,0 +1,19 @@ +import { generateId } from "ai"; + +/** + * Vercel Workflow `"use step"` that returns a fresh message id. + * + * Wrapped as a step rather than inlined into the workflow body + * because `generateId()` is non-deterministic — calling it directly + * from workflow code would break the durable-replay contract (a + * replay would generate a *different* id and diverge from the + * captured event log). As a step, the result is captured in the + * event log once and reused on every replay. + * + * Mirrors open-agents' `generateId` step in + * `apps/web/app/workflows/chat.ts`. + */ +export async function generateAssistantMessageId(): Promise { + "use step"; + return generateId(); +} diff --git a/app/lib/workflows/runAgentStep.ts b/app/lib/workflows/runAgentStep.ts index 7ed847d5d..0520bb20a 100644 --- a/app/lib/workflows/runAgentStep.ts +++ b/app/lib/workflows/runAgentStep.ts @@ -22,6 +22,35 @@ export type RunAgentStepInput = { * is added to `experimental_context` right before each model call. */ agentContext: DurableAgentContext; + /** + * Stable id to assign to the assistant message produced by this + * step. Generated once in `runAgentWorkflow` so: + * + * - Every chunk in this step's `toUIMessageStream` carries the + * same id (the AI SDK threads it through). + * - Future multi-step iterations of the agent loop reuse the + * same id so a single conversational reply is one row in + * `chat_messages` rather than fragmenting per tool-call cycle. 
+ * - Resume after tool-call interaction reattaches to the in- + * progress assistant message rather than spawning a new one. + * + * Mirrors open-agents' `runAgentStep(messages, originalMessages, + * messageId, ...)` signature in + * `apps/web/app/workflows/chat.ts`. + */ + assistantMessageId: string; +}; + +export type RunAgentStepResult = { + finishReason: string; + /** + * The assembled assistant message captured from + * `toUIMessageStream`'s `onFinish` callback. `undefined` if the + * stream finished without emitting one (e.g. an error path that + * short-circuits before any chunks land). Used by `runAgentWorkflow` + * to persist the final message to `chat_messages`. + */ + responseMessage: UIMessage | undefined; }; /** @@ -38,9 +67,11 @@ export type RunAgentStepInput = { * of the tool surface ports in a follow-up PR. * * @param input - Messages + selected model + writable stream + agent context. - * @returns finishReason from the model run. + * @returns `finishReason` from the model run plus the assembled + * `responseMessage` (when one was emitted) so the caller can + * persist it. */ -export async function runAgentStep(input: RunAgentStepInput): Promise<{ finishReason: string }> { +export async function runAgentStep(input: RunAgentStepInput): Promise<RunAgentStepResult> { "use step"; console.log("[runAgentStep] start", { @@ -91,6 +122,12 @@ export async function runAgentStep(input: RunAgentStepInput): Promise<{ finishRe }), }); + // Capture the assembled assistant message via `onFinish` so the + // caller can persist it. Mirrors open-agents' `runAgentStep` which + // stashes `finishedResponseMessage` into a closure-scoped variable + // for the outer workflow body to forward into `persistAssistantMessage`. + let responseMessage: UIMessage | undefined; + // Acquire the writer once and release in `finally` so a thrown chunk // doesn't leak the lock. 
const writer = input.writable.getWriter(); @@ -100,7 +137,13 @@ export async function runAgentStep(input: RunAgentStepInput): Promise<{ finishRe // shape so sandbox.recoupable.com sees the same metadata when cut // over to api's /api/chat/workflow. const messageMetadata = buildMessageMetadataCallback({ modelId: input.modelId }); - for await (const part of result.toUIMessageStream({ messageMetadata })) { + for await (const part of result.toUIMessageStream({ + messageMetadata, + generateMessageId: () => input.assistantMessageId, + onFinish: ({ responseMessage: finishedResponseMessage }) => { + responseMessage = finishedResponseMessage; + }, + })) { await writer.write(part); } } finally { @@ -108,6 +151,6 @@ export async function runAgentStep(input: RunAgentStepInput): Promise<{ finishRe } const finishReason = await result.finishReason; - console.log("[runAgentStep] finish", { finishReason }); - return { finishReason }; + console.log("[runAgentStep] finish", { finishReason, hasResponseMessage: !!responseMessage }); + return { finishReason, responseMessage }; } diff --git a/app/lib/workflows/runAgentWorkflow.ts b/app/lib/workflows/runAgentWorkflow.ts index c9e5fdcd9..e4e628e96 100644 --- a/app/lib/workflows/runAgentWorkflow.ts +++ b/app/lib/workflows/runAgentWorkflow.ts @@ -1,8 +1,10 @@ import { getWorkflowMetadata, getWritable } from "workflow"; import type { UIMessage, UIMessageChunk } from "ai"; import { closeChatStream } from "@/app/lib/workflows/closeChatStream"; +import { generateAssistantMessageId } from "@/app/lib/workflows/generateAssistantMessageId"; import { runAgentStep } from "@/app/lib/workflows/runAgentStep"; import { clearChatActiveStream } from "@/lib/chat/clearChatActiveStream"; +import { persistAssistantMessage } from "@/lib/chat/persistAssistantMessage"; import type { DurableAgentContext } from "@/lib/agent/tools/AgentContext"; export type RunAgentWorkflowInput = { @@ -48,14 +50,39 @@ export async function runAgentWorkflow(input: 
RunAgentWorkflowInput): Promise(); + // Pick or generate a stable id for the assistant message. If the + // last message in the conversation is already an assistant message + // (we're resuming an in-progress turn after a tool-call interaction) + // reuse its id so chunks append to the same `chat_messages` row. + // Otherwise generate a fresh id once via a `"use step"` so the + // value is durable across workflow replays. Mirrors open-agents' + // pattern in `apps/web/app/workflows/chat.ts` where the id is + // generated in the workflow body and threaded into every + // `runAgentStep` call. + const latestMessage = input.messages.at(-1); + const assistantMessageId = + latestMessage?.role === "assistant" ? latestMessage.id : await generateAssistantMessageId(); + try { const result = await runAgentStep({ messages: input.messages, modelId: input.modelId, writable, agentContext: input.agentContext, + assistantMessageId, }); console.log("[runAgentWorkflow] finish", { finishReason: result.finishReason }); + + // Persist the final assistant message to `chat_messages` so a page + // refresh after the stream completes still shows the reply. Without + // this, the recoup-api cutover silently drops assistant responses — + // they stream to the client over SSE but never land in the DB. + // `persistAssistantMessage` is fire-and-forget by contract; it + // swallows its own errors so a transient DB failure here doesn't + // mark the workflow run failed. 
+ if (result.responseMessage) { + await persistAssistantMessage(input.chatId, result.responseMessage); + } } finally { // Run two cleanup steps in parallel: // 1) `clearChatActiveStream` — CAS-gated DB clear of the chat's diff --git a/lib/chat/__tests__/persistAssistantMessage.test.ts b/lib/chat/__tests__/persistAssistantMessage.test.ts new file mode 100644 index 000000000..fc6def35e --- /dev/null +++ b/lib/chat/__tests__/persistAssistantMessage.test.ts @@ -0,0 +1,140 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { persistAssistantMessage } from "@/lib/chat/persistAssistantMessage"; +import { upsertChatMessage } from "@/lib/supabase/chat_messages/upsertChatMessage"; +import { updateChat } from "@/lib/supabase/chats/updateChat"; + +vi.mock("@/lib/supabase/chat_messages/upsertChatMessage", () => ({ + upsertChatMessage: vi.fn(), +})); +vi.mock("@/lib/supabase/chats/updateChat", () => ({ + updateChat: vi.fn(), +})); + +beforeEach(() => { + vi.clearAllMocks(); + vi.mocked(updateChat).mockResolvedValue({ + ok: true, + rowsUpdated: 1, + row: null, + }); +}); + +const CHAT_ID = "11111111-1111-1111-1111-111111111111"; +const ASSISTANT_ID = "msg_abc"; + +function buildAssistantMessage(overrides: Record = {}) { + return { + id: ASSISTANT_ID, + role: "assistant", + parts: [{ type: "text", text: "Hello!" 
}], + ...overrides, + }; +} + +describe("persistAssistantMessage", () => { + it("upserts the assistant message row with role 'assistant'", async () => { + vi.mocked(upsertChatMessage).mockResolvedValue({ + ok: true, + row: { id: ASSISTANT_ID } as never, + isDuplicate: false, + }); + + await persistAssistantMessage(CHAT_ID, buildAssistantMessage()); + + expect(upsertChatMessage).toHaveBeenCalledWith( + expect.objectContaining({ + id: ASSISTANT_ID, + chat_id: CHAT_ID, + role: "assistant", + }), + ); + }); + + it("touches updated_at on the chat row on a fresh insert", async () => { + vi.mocked(upsertChatMessage).mockResolvedValue({ + ok: true, + row: { id: ASSISTANT_ID } as never, + isDuplicate: false, + }); + + await persistAssistantMessage(CHAT_ID, buildAssistantMessage()); + + expect(updateChat).toHaveBeenCalledWith( + { id: CHAT_ID }, + expect.objectContaining({ updated_at: expect.any(String) }), + ); + }); + + it("bumps last_assistant_message_at on a fresh insert (drives sidebar unread badge)", async () => { + vi.mocked(upsertChatMessage).mockResolvedValue({ + ok: true, + row: { id: ASSISTANT_ID } as never, + isDuplicate: false, + }); + + await persistAssistantMessage(CHAT_ID, buildAssistantMessage()); + + expect(updateChat).toHaveBeenCalledWith( + { id: CHAT_ID }, + expect.objectContaining({ + last_assistant_message_at: expect.any(String), + }), + ); + }); + + it("uses the same timestamp for updated_at and last_assistant_message_at (matches open-agents)", async () => { + vi.mocked(upsertChatMessage).mockResolvedValue({ + ok: true, + row: { id: ASSISTANT_ID } as never, + isDuplicate: false, + }); + + await persistAssistantMessage(CHAT_ID, buildAssistantMessage()); + + const updateArgs = vi.mocked(updateChat).mock.calls[0]?.[1] as { + updated_at?: string; + last_assistant_message_at?: string; + }; + expect(updateArgs.updated_at).toBeDefined(); + expect(updateArgs.last_assistant_message_at).toBe(updateArgs.updated_at); + }); + + it("does NOT touch updated_at on 
duplicate (workflow replay)", async () => { + vi.mocked(upsertChatMessage).mockResolvedValue({ + ok: true, + row: null, + isDuplicate: true, + }); + + await persistAssistantMessage(CHAT_ID, buildAssistantMessage()); + + expect(updateChat).not.toHaveBeenCalled(); + }); + + it("silently no-ops when the message role is not 'assistant' (guard against caller mistakes)", async () => { + await persistAssistantMessage(CHAT_ID, buildAssistantMessage({ role: "user" })); + + expect(upsertChatMessage).not.toHaveBeenCalled(); + expect(updateChat).not.toHaveBeenCalled(); + }); + + it("silently no-ops when the upsert reports a DB error (fire-and-forget contract)", async () => { + vi.mocked(upsertChatMessage).mockResolvedValue({ + ok: false, + error: "transient db error", + }); + + await expect( + persistAssistantMessage(CHAT_ID, buildAssistantMessage()), + ).resolves.toBeUndefined(); + expect(updateChat).not.toHaveBeenCalled(); + }); + + it("swallows unexpected exceptions (must not bubble up)", async () => { + vi.mocked(upsertChatMessage).mockRejectedValue(new Error("boom")); + + await expect( + persistAssistantMessage(CHAT_ID, buildAssistantMessage()), + ).resolves.toBeUndefined(); + }); +}); diff --git a/lib/chat/persistAssistantMessage.ts b/lib/chat/persistAssistantMessage.ts new file mode 100644 index 000000000..13f93c27b --- /dev/null +++ b/lib/chat/persistAssistantMessage.ts @@ -0,0 +1,73 @@ +import { upsertChatMessage } from "@/lib/supabase/chat_messages/upsertChatMessage"; +import { updateChat } from "@/lib/supabase/chats/updateChat"; + +/** + * Minimal duck-type shape we read off the assistant message. Both AI + * SDK's `UIMessage` and the in-test fixtures structurally satisfy it. + * Kept intentionally loose because the row we write to + * `chat_messages.parts` is `jsonb` — Supabase persists whatever the + * message looks like. 
+ */ +type AssistantMessage = { + id: string; + role: string; + parts: ReadonlyArray; +}; + +/** + * Fire-and-forget persistence of the final assistant message at the + * end of a chat-workflow run. Mirrors open-agents' + * `persistAssistantMessage` step in + * `apps/web/app/workflows/chat-post-finish.ts` and closes the + * silent-data-loss gap the recoup-api cutover introduced — without + * this call the assistant response is streamed to the client but + * never written to `chat_messages`, so a page refresh after the + * stream completes wipes the message. + * + * Uses `upsertChatMessage(... { onConflict: "id", ignoreDuplicates })` + * so a workflow that's restarted (replay, recovery) doesn't + * double-insert. On a fresh insert we also bump + * `last_assistant_message_at` (drives the sidebar `hasUnread` badge + * in `getChatSummaries` — `lastAssistantMessageAt > lastReadAt`) and + * touch `updated_at` so the sidebar sort surfaces the chat. Matches + * open-agents' `updateChatAssistantActivity` which sets both columns + * to the same timestamp. + * + * Title generation lives in `persistLatestUserMessage` (the first + * user message is canonical for chat titles) — this function + * deliberately does NOT update the chat title. + * + * Errors are caught and logged. Contract is "schedule it and forget" + * — never block the workflow or surface failures to the UI. + * + * @param chatId - Target chat row. + * @param message - The assembled assistant message (typically from + * `toUIMessageStream`'s `onFinish.responseMessage`). 
+ */ +export async function persistAssistantMessage( + chatId: string, + message: AssistantMessage, +): Promise<void> { + "use step"; + try { + if (!message || message.role !== "assistant") return; + + const inserted = await upsertChatMessage({ + id: message.id, + chat_id: chatId, + role: "assistant", + parts: message as never, + }); + + if (!inserted.ok) return; + if (inserted.isDuplicate || inserted.row === null) return; + + const activityAt = new Date().toISOString(); + await updateChat( + { id: chatId }, + { updated_at: activityAt, last_assistant_message_at: activityAt }, + ); + } catch (error) { + console.error("[persistAssistantMessage] error:", error); + } +}
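
For reviewers: the duplicate-skip behavior that `persistAssistantMessage` relies on, and the empty-id failure mode described in the commit message, can be reproduced with an in-memory stand-in. This is a sketch only. `MessageRow`, `UpsertOutcome` and `upsertIgnoringDuplicates` are hypothetical names, and the `Map` merely imitates a primary key on `id` with `ignoreDuplicates` semantics; it is not the Supabase client.

```typescript
// Hypothetical row and result shapes, loosely modeled on the fields used above.
type MessageRow = { id: string; chat_id: string; role: string };
type UpsertOutcome = { isDuplicate: boolean; row: MessageRow | null };

// In-memory imitation of an insert that skips rows whose primary key already exists.
function upsertIgnoringDuplicates(
  table: Map<string, MessageRow>,
  row: MessageRow,
): UpsertOutcome {
  if (table.has(row.id)) return { isDuplicate: true, row: null };
  table.set(row.id, row);
  return { isDuplicate: false, row };
}

// Failure mode: every message arrives with id "", so only the first one lands.
const emptyIdTable = new Map<string, MessageRow>();
const emptyFirst = upsertIgnoringDuplicates(emptyIdTable, {
  id: "",
  chat_id: "chat-1",
  role: "assistant",
});
const emptySecond = upsertIgnoringDuplicates(emptyIdTable, {
  id: "",
  chat_id: "chat-2",
  role: "assistant",
});

// Fixed behavior: distinct ids land as separate rows, and a replay of the
// same id is skipped instead of double-inserted.
const stableTable = new Map<string, MessageRow>();
const runA = upsertIgnoringDuplicates(stableTable, {
  id: "asst-a",
  chat_id: "chat-1",
  role: "assistant",
});
const runB = upsertIgnoringDuplicates(stableTable, {
  id: "asst-b",
  chat_id: "chat-1",
  role: "assistant",
});
const replayA = upsertIgnoringDuplicates(stableTable, {
  id: "asst-a",
  chat_id: "chat-1",
  role: "assistant",
});
```

The same check that makes replays idempotent is what silently dropped rows while ids were empty, which is why the function treats `isDuplicate` as a quiet no-op rather than an error.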