diff --git a/app/lib/workflows/__tests__/runAgentWorkflow.test.ts b/app/lib/workflows/__tests__/runAgentWorkflow.test.ts new file mode 100644 index 000000000..92f72594d --- /dev/null +++ b/app/lib/workflows/__tests__/runAgentWorkflow.test.ts @@ -0,0 +1,52 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { runAgentWorkflow } from "@/app/lib/workflows/runAgentWorkflow"; +import { runAgentStep } from "@/app/lib/workflows/runAgentStep"; +import { clearChatActiveStream } from "@/lib/chat/clearChatActiveStream"; + +vi.mock("@/app/lib/workflows/runAgentStep", () => ({ + runAgentStep: vi.fn(), +})); +vi.mock("@/lib/chat/clearChatActiveStream", () => ({ + clearChatActiveStream: vi.fn(), +})); +vi.mock("workflow", () => ({ + getWritable: vi.fn(() => new WritableStream()), + getWorkflowMetadata: vi.fn(() => ({ + workflowRunId: "wrun_from_metadata", + workflowName: "runAgentWorkflow", + workflowStartedAt: new Date(), + url: "https://example.invalid/workflow", + })), +})); + +beforeEach(() => vi.clearAllMocks()); + +const baseInput = { + messages: [{ id: "m1", role: "user", parts: [{ type: "text", text: "hi" }] } as never], + chatId: "chat-1", + sessionId: "session-1", + modelId: "anthropic/claude-haiku-4.5", + agentContext: { + sandbox: { state: { type: "vercel" }, workingDirectory: "/sandbox/mono" }, + } as never, +}; + +describe("runAgentWorkflow", () => { + it("clears active_stream_id after a successful run, using the workflow's own runId", async () => { + vi.mocked(runAgentStep).mockResolvedValue({ finishReason: "stop" }); + + await runAgentWorkflow(baseInput); + + expect(clearChatActiveStream).toHaveBeenCalledTimes(1); + expect(clearChatActiveStream).toHaveBeenCalledWith("chat-1", "wrun_from_metadata"); + }); + + it("clears active_stream_id even when runAgentStep throws (try/finally guarantee)", async () => { + vi.mocked(runAgentStep).mockRejectedValue(new Error("model exploded")); + + await expect(runAgentWorkflow(baseInput)).rejects.toThrow("model exploded"); + + expect(clearChatActiveStream).toHaveBeenCalledTimes(1); + expect(clearChatActiveStream).toHaveBeenCalledWith("chat-1", "wrun_from_metadata"); + }); +}); diff --git a/app/lib/workflows/runAgentWorkflow.ts b/app/lib/workflows/runAgentWorkflow.ts index 3a0965342..07dce5483 100644 --- a/app/lib/workflows/runAgentWorkflow.ts +++ b/app/lib/workflows/runAgentWorkflow.ts @@ -1,6 +1,7 @@ -import { getWritable } from "workflow"; +import { getWorkflowMetadata, getWritable } from "workflow"; import type { UIMessage, UIMessageChunk } from "ai"; import { runAgentStep } from "@/app/lib/workflows/runAgentStep"; +import { clearChatActiveStream } from "@/lib/chat/clearChatActiveStream"; import type { DurableAgentContext } from "@/lib/agent/tools/AgentContext"; export type RunAgentWorkflowInput = { @@ -35,19 +36,32 @@ export type RunAgentWorkflowInput = { export async function runAgentWorkflow(input: RunAgentWorkflowInput): Promise { "use workflow"; + const { workflowRunId } = getWorkflowMetadata(); + console.log("[runAgentWorkflow] start", { chatId: input.chatId, sessionId: input.sessionId, modelId: input.modelId, + workflowRunId, }); const writable = getWritable(); - const result = await runAgentStep({ - messages: input.messages, - modelId: input.modelId, - writable, - agentContext: input.agentContext, - }); - console.log("[runAgentWorkflow] finish", { finishReason: result.finishReason }); + try { + const result = await runAgentStep({ + messages: input.messages, + modelId: input.modelId, + writable, + agentContext: input.agentContext, + }); + console.log("[runAgentWorkflow] finish", { finishReason: result.finishReason }); + } finally { + // Clear `chats.active_stream_id` (CAS-gated on this run's id) so the + // client's "is this chat still streaming?" probe flips back to false + // and the AI SDK can release `chat.status` from `submitted`. Runs + // inside the workflow body (vs. an after() callback in the request + // handler) so it fires immediately on the same workflow tick — no + // polling lag. Mirrors open-agents' chat workflow. + await clearChatActiveStream(input.chatId, workflowRunId); + } } diff --git a/lib/chat/__tests__/clearChatActiveStream.test.ts b/lib/chat/__tests__/clearChatActiveStream.test.ts new file mode 100644 index 000000000..03ad3f4e6 --- /dev/null +++ b/lib/chat/__tests__/clearChatActiveStream.test.ts @@ -0,0 +1,74 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { clearChatActiveStream } from "@/lib/chat/clearChatActiveStream"; +import { compareAndSetChatActiveStreamId } from "@/lib/chat/compareAndSetChatActiveStreamId"; + +vi.mock("@/lib/chat/compareAndSetChatActiveStreamId", () => ({ + compareAndSetChatActiveStreamId: vi.fn(), +})); + +beforeEach(() => { + vi.clearAllMocks(); + vi.useFakeTimers({ shouldAdvanceTime: true }); +}); + +const CHAT_ID = "chat-1"; +const RUN_ID = "wrun_test"; + +describe("clearChatActiveStream", () => { + it("CAS-clears active_stream_id back to null on the happy path", async () => { + vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValue({ + ok: true, + claimed: true, + }); + + await clearChatActiveStream(CHAT_ID, RUN_ID); + + expect(compareAndSetChatActiveStreamId).toHaveBeenCalledTimes(1); + expect(compareAndSetChatActiveStreamId).toHaveBeenCalledWith(CHAT_ID, RUN_ID, null); + }); + + it("returns without throwing when the race is lost (a newer run owns the slot)", async () => { + vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValue({ + ok: true, + claimed: false, + }); + + await expect(clearChatActiveStream(CHAT_ID, RUN_ID)).resolves.toBeUndefined(); + expect(compareAndSetChatActiveStreamId).toHaveBeenCalledTimes(1); + }); + + it("retries up to 3 times on transient DB errors and stops once a CAS succeeds", async () => { + vi.mocked(compareAndSetChatActiveStreamId) + .mockResolvedValueOnce({ ok: false, error: "transient 1" }) + .mockResolvedValueOnce({ ok: true, claimed: true }); + + await clearChatActiveStream(CHAT_ID, RUN_ID); + + expect(compareAndSetChatActiveStreamId).toHaveBeenCalledTimes(2); + }); + + it("gives up after 3 failed CAS attempts and logs (does not throw)", async () => { + vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValue({ + ok: false, + error: "persistent", + }); + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + + await expect(clearChatActiveStream(CHAT_ID, RUN_ID)).resolves.toBeUndefined(); + + expect(compareAndSetChatActiveStreamId).toHaveBeenCalledTimes(3); + expect(consoleSpy).toHaveBeenCalled(); + consoleSpy.mockRestore(); + }); + + it("retries on thrown exceptions and gives up after 3 attempts", async () => { + vi.mocked(compareAndSetChatActiveStreamId).mockRejectedValue(new Error("boom")); + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + + await expect(clearChatActiveStream(CHAT_ID, RUN_ID)).resolves.toBeUndefined(); + + expect(compareAndSetChatActiveStreamId).toHaveBeenCalledTimes(3); + expect(consoleSpy).toHaveBeenCalled(); + consoleSpy.mockRestore(); + }); +}); diff --git a/lib/chat/clearChatActiveStream.ts b/lib/chat/clearChatActiveStream.ts new file mode 100644 index 000000000..d45fd1fe5 --- /dev/null +++ b/lib/chat/clearChatActiveStream.ts @@ -0,0 +1,64 @@ +import { compareAndSetChatActiveStreamId } from "@/lib/chat/compareAndSetChatActiveStreamId"; +import { delay } from "@/lib/time/delay"; + +const MAX_ATTEMPTS = 3; +// 50ms picked to match open-agents' `ACTIVE_STREAM_CLEAR_RETRY_DELAY_MS` +// — a transient Supabase blip costs at most ~150ms total before the +// third attempt, keeping the cleanup tail well under the human- +// perceptible ~250ms threshold. +const RETRY_DELAY_MS = 50; + +/** + * Vercel Workflow `"use step"` that CAS-clears `chats.active_stream_id` + * back to null **only if** it still holds this workflow run's id. + * + * Designed to be called from the end of `runAgentWorkflow`'s body so it + * fires the moment the durable run finishes — no `after()` / polling + * lag. Mirrors open-agents' `clearActiveStream` step in + * `app/workflows/chat-post-finish.ts`. + * + * Why CAS instead of unconditional UPDATE: if a newer run has already + * claimed the slot (e.g. the user submitted a follow-up while this + * run was draining cleanup), the newer run's id is preserved. + * + * Retries up to 3 times with a short delay so a transient Supabase + * failure here doesn't leave the chat permanently stuck as + * "isStreaming: true". Final-attempt failures are logged but never + * thrown — the workflow has already done its real work; we don't want + * a cleanup hiccup to mark the run as failed. + * + * @param chatId - Target chat row. + * @param workflowRunId - The current run's id (from + * `getWorkflowMetadata().workflowRunId`). + */ +export async function clearChatActiveStream(chatId: string, workflowRunId: string): Promise { + "use step"; + + for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) { + try { + const result = await compareAndSetChatActiveStreamId(chatId, workflowRunId, null); + if (result.ok === false) { + if (attempt === MAX_ATTEMPTS) { + console.error( + `[clearChatActiveStream] CAS error chatId=${chatId} runId=${workflowRunId}: ${result.error}`, + ); + return; + } + await delay(RETRY_DELAY_MS); + continue; + } + // result.ok === true. result.claimed === false means the race was lost + // (a newer run owns the slot) — nothing to do, just return. + return; + } catch (error) { + if (attempt === MAX_ATTEMPTS) { + console.error( + `[clearChatActiveStream] unhandled error chatId=${chatId} runId=${workflowRunId}:`, + error, + ); + return; + } + await delay(RETRY_DELAY_MS); + } + } +} diff --git a/lib/time/__tests__/delay.test.ts b/lib/time/__tests__/delay.test.ts new file mode 100644 index 000000000..a4682b73e --- /dev/null +++ b/lib/time/__tests__/delay.test.ts @@ -0,0 +1,40 @@ +import { describe, it, expect, vi } from "vitest"; +import { delay } from "@/lib/time/delay"; + +describe("delay", () => { + it("resolves after the specified delay (within tolerance)", async () => { + const start = Date.now(); + await delay(50); + const elapsed = Date.now() - start; + // setTimeout fires "no earlier than" the requested duration; allow generous + // headroom for CI jitter rather than asserting an exact value. + expect(elapsed).toBeGreaterThanOrEqual(45); + expect(elapsed).toBeLessThan(500); + }); + + it("resolves on the next tick when given 0", async () => { + let resolved = false; + const promise = delay(0).then(() => { + resolved = true; + }); + // Synchronously, the timer hasn't fired yet. + expect(resolved).toBe(false); + await promise; + expect(resolved).toBe(true); + }); + + it("uses fake timers correctly (callers can drive it deterministically)", async () => { + vi.useFakeTimers(); + let resolved = false; + const promise = delay(100).then(() => { + resolved = true; + }); + expect(resolved).toBe(false); + await vi.advanceTimersByTimeAsync(99); + expect(resolved).toBe(false); + await vi.advanceTimersByTimeAsync(1); + await promise; + expect(resolved).toBe(true); + vi.useRealTimers(); + }); +}); diff --git a/lib/time/delay.ts b/lib/time/delay.ts new file mode 100644 index 000000000..bd26c20f5 --- /dev/null +++ b/lib/time/delay.ts @@ -0,0 +1,18 @@ +/** + * Promise wrapper around `setTimeout` for use inside retry loops and + * other non-workflow-body waits. + * + * NOT a substitute for `workflow.sleep()` — that one creates durable + * timer events in the workflow event log and is the correct tool + * inside `"use workflow"` bodies. `delay()` is for ordinary async + * code (including `"use step"` functions, which run as regular + * non-replayable code). + * + * @param ms - Duration in milliseconds. Negative or 0 resolves on the + * next microtask tick (same behavior as `setTimeout(_, 0)`). + */ +export function delay(ms: number): Promise { + return new Promise(resolve => { + setTimeout(resolve, ms); + }); +}