Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions app/lib/workflows/__tests__/runAgentWorkflow.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { runAgentWorkflow } from "@/app/lib/workflows/runAgentWorkflow";
import { runAgentStep } from "@/app/lib/workflows/runAgentStep";
import { clearChatActiveStream } from "@/lib/chat/clearChatActiveStream";

vi.mock("@/app/lib/workflows/runAgentStep", () => ({
runAgentStep: vi.fn(),
}));
vi.mock("@/lib/chat/clearChatActiveStream", () => ({
clearChatActiveStream: vi.fn(),
}));
vi.mock("workflow", () => ({
getWritable: vi.fn(() => new WritableStream()),
getWorkflowMetadata: vi.fn(() => ({
workflowRunId: "wrun_from_metadata",
workflowName: "runAgentWorkflow",
workflowStartedAt: new Date(),
url: "https://example.invalid/workflow",
})),
}));

beforeEach(() => vi.clearAllMocks());

const baseInput = {
messages: [{ id: "m1", role: "user", parts: [{ type: "text", text: "hi" }] } as never],
chatId: "chat-1",
sessionId: "session-1",
modelId: "anthropic/claude-haiku-4.5",
agentContext: {
sandbox: { state: { type: "vercel" }, workingDirectory: "/sandbox/mono" },
} as never,
};

describe("runAgentWorkflow", () => {
it("clears active_stream_id after a successful run, using the workflow's own runId", async () => {
vi.mocked(runAgentStep).mockResolvedValue({ finishReason: "stop" });

await runAgentWorkflow(baseInput);

expect(clearChatActiveStream).toHaveBeenCalledTimes(1);
expect(clearChatActiveStream).toHaveBeenCalledWith("chat-1", "wrun_from_metadata");
});

it("clears active_stream_id even when runAgentStep throws (try/finally guarantee)", async () => {
vi.mocked(runAgentStep).mockRejectedValue(new Error("model exploded"));

await expect(runAgentWorkflow(baseInput)).rejects.toThrow("model exploded");

expect(clearChatActiveStream).toHaveBeenCalledTimes(1);
expect(clearChatActiveStream).toHaveBeenCalledWith("chat-1", "wrun_from_metadata");
});
});
30 changes: 22 additions & 8 deletions app/lib/workflows/runAgentWorkflow.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { getWritable } from "workflow";
import { getWorkflowMetadata, getWritable } from "workflow";
import type { UIMessage, UIMessageChunk } from "ai";
import { runAgentStep } from "@/app/lib/workflows/runAgentStep";
import { clearChatActiveStream } from "@/lib/chat/clearChatActiveStream";
import type { DurableAgentContext } from "@/lib/agent/tools/AgentContext";

export type RunAgentWorkflowInput = {
Expand Down Expand Up @@ -35,19 +36,32 @@ export type RunAgentWorkflowInput = {
export async function runAgentWorkflow(input: RunAgentWorkflowInput): Promise<void> {
"use workflow";

const { workflowRunId } = getWorkflowMetadata();

console.log("[runAgentWorkflow] start", {
chatId: input.chatId,
sessionId: input.sessionId,
modelId: input.modelId,
workflowRunId,
});

const writable = getWritable<UIMessageChunk>();
const result = await runAgentStep({
messages: input.messages,
modelId: input.modelId,
writable,
agentContext: input.agentContext,
});

console.log("[runAgentWorkflow] finish", { finishReason: result.finishReason });
try {
const result = await runAgentStep({
messages: input.messages,
modelId: input.modelId,
writable,
agentContext: input.agentContext,
});
console.log("[runAgentWorkflow] finish", { finishReason: result.finishReason });
} finally {
// Clear `chats.active_stream_id` (CAS-gated on this run's id) so the
// client's "is this chat still streaming?" probe flips back to false
// and the AI SDK can release `chat.status` from `submitted`. Runs
// inside the workflow body (vs. an after() callback in the request
// handler) so it fires immediately on the same workflow tick — no
// polling lag. Mirrors open-agents' chat workflow.
await clearChatActiveStream(input.chatId, workflowRunId);
}
}
74 changes: 74 additions & 0 deletions lib/chat/__tests__/clearChatActiveStream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { clearChatActiveStream } from "@/lib/chat/clearChatActiveStream";
import { compareAndSetChatActiveStreamId } from "@/lib/chat/compareAndSetChatActiveStreamId";

vi.mock("@/lib/chat/compareAndSetChatActiveStreamId", () => ({
compareAndSetChatActiveStreamId: vi.fn(),
}));

beforeEach(() => {
vi.clearAllMocks();
vi.useFakeTimers({ shouldAdvanceTime: true });
});

const CHAT_ID = "chat-1";
const RUN_ID = "wrun_test";

describe("clearChatActiveStream", () => {
it("CAS-clears active_stream_id back to null on the happy path", async () => {
vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValue({
ok: true,
claimed: true,
});

await clearChatActiveStream(CHAT_ID, RUN_ID);

expect(compareAndSetChatActiveStreamId).toHaveBeenCalledTimes(1);
expect(compareAndSetChatActiveStreamId).toHaveBeenCalledWith(CHAT_ID, RUN_ID, null);
});

it("returns without throwing when the race is lost (a newer run owns the slot)", async () => {
vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValue({
ok: true,
claimed: false,
});

await expect(clearChatActiveStream(CHAT_ID, RUN_ID)).resolves.toBeUndefined();
expect(compareAndSetChatActiveStreamId).toHaveBeenCalledTimes(1);
});

it("retries up to 3 times on transient DB errors and stops once a CAS succeeds", async () => {
vi.mocked(compareAndSetChatActiveStreamId)
.mockResolvedValueOnce({ ok: false, error: "transient 1" })
.mockResolvedValueOnce({ ok: true, claimed: true });

await clearChatActiveStream(CHAT_ID, RUN_ID);

expect(compareAndSetChatActiveStreamId).toHaveBeenCalledTimes(2);
});

it("gives up after 3 failed CAS attempts and logs (does not throw)", async () => {
vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValue({
ok: false,
error: "persistent",
});
const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {});

await expect(clearChatActiveStream(CHAT_ID, RUN_ID)).resolves.toBeUndefined();

expect(compareAndSetChatActiveStreamId).toHaveBeenCalledTimes(3);
expect(consoleSpy).toHaveBeenCalled();
consoleSpy.mockRestore();
});

it("retries on thrown exceptions and gives up after 3 attempts", async () => {
vi.mocked(compareAndSetChatActiveStreamId).mockRejectedValue(new Error("boom"));
const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {});

await expect(clearChatActiveStream(CHAT_ID, RUN_ID)).resolves.toBeUndefined();

expect(compareAndSetChatActiveStreamId).toHaveBeenCalledTimes(3);
expect(consoleSpy).toHaveBeenCalled();
consoleSpy.mockRestore();
});
});
64 changes: 64 additions & 0 deletions lib/chat/clearChatActiveStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { compareAndSetChatActiveStreamId } from "@/lib/chat/compareAndSetChatActiveStreamId";
import { delay } from "@/lib/time/delay";

const MAX_ATTEMPTS = 3;
// 50ms picked to match open-agents' `ACTIVE_STREAM_CLEAR_RETRY_DELAY_MS`
// — a transient Supabase blip costs at most ~150ms total before the
// third attempt, keeping the cleanup tail well under the human-
// perceptible ~250ms threshold.
const RETRY_DELAY_MS = 50;

/**
* Vercel Workflow `"use step"` that CAS-clears `chats.active_stream_id`
* back to null **only if** it still holds this workflow run's id.
*
* Designed to be called from the end of `runAgentWorkflow`'s body so it
* fires the moment the durable run finishes — no `after()` / polling
* lag. Mirrors open-agents' `clearActiveStream` step in
* `app/workflows/chat-post-finish.ts`.
*
* Why CAS instead of unconditional UPDATE: if a newer run has already
* claimed the slot (e.g. the user submitted a follow-up while this
* run was draining cleanup), the newer run's id is preserved.
*
* Retries up to 3 times with a short delay so a transient Supabase
* failure here doesn't leave the chat permanently stuck as
* "isStreaming: true". Final-attempt failures are logged but never
* thrown — the workflow has already done its real work; we don't want
* a cleanup hiccup to mark the run as failed.
*
* @param chatId - Target chat row.
* @param workflowRunId - The current run's id (from
* `getWorkflowMetadata().workflowRunId`).
*/
export async function clearChatActiveStream(chatId: string, workflowRunId: string): Promise<void> {
"use step";

for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
try {
const result = await compareAndSetChatActiveStreamId(chatId, workflowRunId, null);
if (result.ok === false) {
if (attempt === MAX_ATTEMPTS) {
console.error(
`[clearChatActiveStream] CAS error chatId=${chatId} runId=${workflowRunId}: ${result.error}`,
);
return;
}
await delay(RETRY_DELAY_MS);
continue;
}
// result.ok === true. result.claimed === false means the race was lost
// (a newer run owns the slot) — nothing to do, just return.
return;
} catch (error) {
if (attempt === MAX_ATTEMPTS) {
console.error(
`[clearChatActiveStream] unhandled error chatId=${chatId} runId=${workflowRunId}:`,
error,
);
return;
}
await delay(RETRY_DELAY_MS);
}
}
}
40 changes: 40 additions & 0 deletions lib/time/__tests__/delay.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { describe, it, expect, vi } from "vitest";
import { delay } from "@/lib/time/delay";

describe("delay", () => {
it("resolves after the specified delay (within tolerance)", async () => {
const start = Date.now();
await delay(50);
const elapsed = Date.now() - start;
// setTimeout fires "no earlier than" the requested duration; allow generous
// headroom for CI jitter rather than asserting an exact value.
expect(elapsed).toBeGreaterThanOrEqual(45);
expect(elapsed).toBeLessThan(500);
});

it("resolves on the next tick when given 0", async () => {
let resolved = false;
const promise = delay(0).then(() => {
resolved = true;
});
// Synchronously, the timer hasn't fired yet.
expect(resolved).toBe(false);
await promise;
expect(resolved).toBe(true);
});

it("uses fake timers correctly (callers can drive it deterministically)", async () => {
vi.useFakeTimers();
let resolved = false;
const promise = delay(100).then(() => {
resolved = true;
});
expect(resolved).toBe(false);
await vi.advanceTimersByTimeAsync(99);
expect(resolved).toBe(false);
await vi.advanceTimersByTimeAsync(1);
await promise;
expect(resolved).toBe(true);
vi.useRealTimers();
});
});
18 changes: 18 additions & 0 deletions lib/time/delay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Promise wrapper around `setTimeout` for use inside retry loops and
* other non-workflow-body waits.
*
* NOT a substitute for `workflow.sleep()` — that one creates durable
* timer events in the workflow event log and is the correct tool
* inside `"use workflow"` bodies. `delay()` is for ordinary async
* code (including `"use step"` functions, which run as regular
* non-replayable code).
*
* @param ms - Duration in milliseconds. Negative or 0 resolves on the
* next microtask tick (same behavior as `setTimeout(_, 0)`).
*/
export function delay(ms: number): Promise<void> {
return new Promise(resolve => {
setTimeout(resolve, ms);
});
}
Loading