Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions app/lib/workflows/__tests__/generateAssistantMessageId.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { describe, it, expect, vi } from "vitest";
import { generateAssistantMessageId } from "@/app/lib/workflows/generateAssistantMessageId";

// Swap only `generateId` from the "ai" package for a deterministic stub;
// every other export is passed through from the real module.
vi.mock("ai", async () => {
  const realAi = await vi.importActual<typeof import("ai")>("ai");
  const generateId = vi.fn(() => "generated-by-mock");
  return { ...realAi, generateId };
});

describe("generateAssistantMessageId", () => {
  // `generateId` from "ai" is mocked at the top of this file to return a
  // fixed value, so the step's output is predictable.
  it("returns the value from ai's generateId()", async () => {
    await expect(generateAssistantMessageId()).resolves.toBe("generated-by-mock");
  });

  it("returns a string", async () => {
    const result = await generateAssistantMessageId();
    expect(typeof result).toBe("string");
    expect(result.length).toBeGreaterThan(0);
  });
});
83 changes: 80 additions & 3 deletions app/lib/workflows/__tests__/runAgentStep.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,28 @@ vi.mock("@ai-sdk/gateway", () => ({
gateway: vi.fn((modelId: string) => ({ modelId, __mock: "gateway" })),
}));

function makeStreamResult(opts?: { metadataCalls?: Array<unknown> }) {
function makeStreamResult(opts?: {
metadataCalls?: Array<unknown>;
onFinishCalls?: Array<unknown>;
emittedResponseMessage?: unknown;
}) {
const calls = opts?.metadataCalls ?? [];
const onFinishCalls = opts?.onFinishCalls ?? [];
return {
toUIMessageStream: vi.fn((streamOpts: { messageMetadata?: unknown }) => {
// Capture the callback so tests can inspect it
toUIMessageStream: vi.fn((streamOpts: { messageMetadata?: unknown; onFinish?: unknown }) => {
// Capture the callbacks so tests can inspect (and invoke) them
calls.push(streamOpts.messageMetadata);
onFinishCalls.push(streamOpts.onFinish);
return (async function* () {
yield { type: "start" };
yield { type: "finish" };
// Mirror the AI SDK contract: onFinish fires after the
// generator yields its last chunk with the assembled message.
if (typeof streamOpts.onFinish === "function" && opts?.emittedResponseMessage) {
(streamOpts.onFinish as (a: { responseMessage: unknown }) => void)({
responseMessage: opts.emittedResponseMessage,
});
}
})();
}),
finishReason: Promise.resolve("stop"),
Expand Down Expand Up @@ -49,6 +62,7 @@ const baseInput = {
agentContext: {
sandbox: { state: { type: "vercel" }, workingDirectory: "/sandbox/mono" },
},
assistantMessageId: "asst-test-id",
};

describe("runAgentStep", () => {
Expand Down Expand Up @@ -174,4 +188,67 @@ describe("runAgentStep", () => {
expect(cb({ part: { type: "text-delta" } })).toBeUndefined();
expect(cb({ part: { type: "start" } })).toBeUndefined();
});

it("wires an onFinish callback into toUIMessageStream", async () => {
  // makeStreamResult records whatever `onFinish` option it receives here.
  const capturedOnFinish: unknown[] = [];
  const streamResult = makeStreamResult({ onFinishCalls: capturedOnFinish });
  vi.mocked(streamText).mockReturnValue(streamResult as never);
  const { stream: writable } = makeWritable();

  await runAgentStep({ ...baseInput, writable } as never);

  expect(capturedOnFinish).toHaveLength(1);
  const [onFinish] = capturedOnFinish;
  expect(typeof onFinish).toBe("function");
});

it("returns the responseMessage captured from onFinish", async () => {
  // The stubbed stream hands this message to `onFinish` after its last chunk.
  const assembled = {
    id: "assistant-msg-1",
    role: "assistant",
    parts: [{ type: "text", text: "Hello" }],
  };
  const streamResult = makeStreamResult({ emittedResponseMessage: assembled });
  vi.mocked(streamText).mockReturnValue(streamResult as never);
  const { stream: writable } = makeWritable();

  const { responseMessage, finishReason } = await runAgentStep({
    ...baseInput,
    writable,
  } as never);

  expect(responseMessage).toEqual(assembled);
  expect(finishReason).toBe("stop");
});

it("returns responseMessage: undefined when onFinish never fires", async () => {
  // Without `emittedResponseMessage` the stub still receives `onFinish`
  // but never calls it.
  vi.mocked(streamText).mockReturnValue(makeStreamResult() as never);
  const { stream: writable } = makeWritable();

  const outcome = await runAgentStep({ ...baseInput, writable } as never);

  expect(outcome.finishReason).toBe("stop");
  expect(outcome.responseMessage).toBeUndefined();
});

it("forwards input.assistantMessageId into toUIMessageStream's generateMessageId", async () => {
  type StreamFactory = (o: unknown) => AsyncGenerator<unknown>;

  const seenIdGenerators: unknown[] = [];
  const streamResult = makeStreamResult();
  // Wrap the stub's toUIMessageStream so the `generateMessageId` option is
  // recorded before delegating to the original stub.
  const innerToUIMessageStream = streamResult.toUIMessageStream as unknown as StreamFactory;
  streamResult.toUIMessageStream = vi.fn((streamOpts: { generateMessageId?: unknown }) => {
    seenIdGenerators.push(streamOpts.generateMessageId);
    return innerToUIMessageStream(streamOpts);
  }) as never;
  vi.mocked(streamText).mockReturnValue(streamResult as never);
  const { stream: writable } = makeWritable();

  const stepInput = {
    ...baseInput,
    writable,
    assistantMessageId: "asst-from-workflow-xyz",
  };
  await runAgentStep(stepInput as never);

  expect(seenIdGenerators).toHaveLength(1);
  const makeId = seenIdGenerators[0] as () => string;
  expect(typeof makeId).toBe("function");
  expect(makeId()).toBe("asst-from-workflow-xyz");
});
});
102 changes: 99 additions & 3 deletions app/lib/workflows/__tests__/runAgentWorkflow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { runAgentWorkflow } from "@/app/lib/workflows/runAgentWorkflow";
import { runAgentStep } from "@/app/lib/workflows/runAgentStep";
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Custom agent: Enforce Clear Code Style and Maintainability Practices

Test file exceeds the 100-line maximum required by the style rule.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At app/lib/workflows/__tests__/runAgentWorkflow.test.ts, line 95:

<comment>Test file exceeds the 100-line maximum required by the style rule.</comment>

<file context>
@@ -74,4 +91,83 @@ describe("runAgentWorkflow", () => {
     expect(closeChatStream).toHaveBeenCalledWith(writableStub);
   });
+
+  it("persists the assistant message when runAgentStep returns one", async () => {
+    const responseMessage = {
+      id: "assistant-msg-xyz",
</file context>

import { clearChatActiveStream } from "@/lib/chat/clearChatActiveStream";
import { closeChatStream } from "@/app/lib/workflows/closeChatStream";
import { generateAssistantMessageId } from "@/app/lib/workflows/generateAssistantMessageId";
import { persistAssistantMessage } from "@/lib/chat/persistAssistantMessage";

vi.mock("@/app/lib/workflows/runAgentStep", () => ({
runAgentStep: vi.fn(),
Expand All @@ -13,6 +15,12 @@ vi.mock("@/lib/chat/clearChatActiveStream", () => ({
// Replace each collaborator module with a bare spy; individual tests set
// return values where they need them.
vi.mock("@/app/lib/workflows/closeChatStream", () => {
  return { closeChatStream: vi.fn() };
});
vi.mock("@/app/lib/workflows/generateAssistantMessageId", () => {
  return { generateAssistantMessageId: vi.fn() };
});
vi.mock("@/lib/chat/persistAssistantMessage", () => {
  return { persistAssistantMessage: vi.fn() };
});
// Captured writable stub so tests can assert closeChatStream got the
// same instance the workflow body holds.
const writableStub = new WritableStream();
Expand All @@ -26,7 +34,10 @@ vi.mock("workflow", () => ({
})),
}));

beforeEach(() => vi.clearAllMocks());
beforeEach(() => {
  // Wipe recorded calls from the previous test.
  vi.clearAllMocks();
  // Give the mocked id step a default resolved value for every test.
  const idStep = vi.mocked(generateAssistantMessageId);
  idStep.mockResolvedValue("asst-fresh-id");
});

const baseInput = {
messages: [{ id: "m1", role: "user", parts: [{ type: "text", text: "hi" }] } as never],
Expand All @@ -40,7 +51,10 @@ const baseInput = {

describe("runAgentWorkflow", () => {
it("clears active_stream_id after a successful run, using the workflow's own runId", async () => {
vi.mocked(runAgentStep).mockResolvedValue({ finishReason: "stop" });
vi.mocked(runAgentStep).mockResolvedValue({
finishReason: "stop",
responseMessage: undefined,
});

await runAgentWorkflow(baseInput);

Expand All @@ -58,7 +72,10 @@ describe("runAgentWorkflow", () => {
});

it("explicitly closes the chat writable after a successful run so SSE ends promptly", async () => {
vi.mocked(runAgentStep).mockResolvedValue({ finishReason: "stop" });
vi.mocked(runAgentStep).mockResolvedValue({
finishReason: "stop",
responseMessage: undefined,
});

await runAgentWorkflow(baseInput);

Expand All @@ -74,4 +91,83 @@ describe("runAgentWorkflow", () => {
expect(closeChatStream).toHaveBeenCalledTimes(1);
expect(closeChatStream).toHaveBeenCalledWith(writableStub);
});

it("persists the assistant message when runAgentStep returns one", async () => {
  const assembled = {
    id: "assistant-msg-xyz",
    role: "assistant",
    parts: [{ type: "text", text: "Hello!" }],
  };
  const stepResult = { finishReason: "stop", responseMessage: assembled as never };
  vi.mocked(runAgentStep).mockResolvedValue(stepResult);

  await runAgentWorkflow(baseInput);

  // Exactly one persist call, made with "chat-1" and the step's message.
  expect(persistAssistantMessage).toHaveBeenCalledTimes(1);
  expect(persistAssistantMessage).toHaveBeenCalledWith("chat-1", assembled);
});

it("does NOT call persistAssistantMessage when runAgentStep returns no responseMessage", async () => {
  // The step finished but produced no assembled message.
  const stepResult = { finishReason: "stop", responseMessage: undefined };
  vi.mocked(runAgentStep).mockResolvedValue(stepResult);

  await runAgentWorkflow(baseInput);

  expect(persistAssistantMessage).not.toHaveBeenCalled();
});

it("does NOT call persistAssistantMessage when runAgentStep throws (no message to persist)", async () => {
  const failure = new Error("model exploded");
  vi.mocked(runAgentStep).mockRejectedValue(failure);

  await expect(runAgentWorkflow(baseInput)).rejects.toThrow("model exploded");

  // The cleanup steps are still expected to run once each after the failure.
  expect(clearChatActiveStream).toHaveBeenCalledTimes(1);
  expect(closeChatStream).toHaveBeenCalledTimes(1);
  // Nothing was produced, so nothing is persisted.
  expect(persistAssistantMessage).not.toHaveBeenCalled();
});

it("generates a fresh assistantMessageId via the step and forwards it to runAgentStep", async () => {
  const stepResult = { finishReason: "stop", responseMessage: undefined };
  vi.mocked(runAgentStep).mockResolvedValue(stepResult);

  await runAgentWorkflow(baseInput);

  // "asst-fresh-id" is the value the mocked id step resolves to in beforeEach.
  const forwardedInput = expect.objectContaining({ assistantMessageId: "asst-fresh-id" });
  expect(generateAssistantMessageId).toHaveBeenCalledTimes(1);
  expect(runAgentStep).toHaveBeenCalledWith(forwardedInput);
});

it("reuses the latest assistant message id when resuming a tool-call turn (no fresh generation)", async () => {
  const stepResult = { finishReason: "stop", responseMessage: undefined };
  vi.mocked(runAgentStep).mockResolvedValue(stepResult);

  // The history ends with an assistant message, i.e. a reply already in progress.
  const userTurn = { id: "m1", role: "user", parts: [{ type: "text", text: "go" }] };
  const inProgressReply = {
    id: "asst-in-progress",
    role: "assistant",
    parts: [{ type: "text", text: "thinking..." }],
  };
  const resumingInput = {
    ...baseInput,
    messages: [userTurn, inProgressReply] as never,
  };

  await runAgentWorkflow(resumingInput);

  // The existing id is forwarded and the id step is never invoked.
  const forwardedInput = expect.objectContaining({ assistantMessageId: "asst-in-progress" });
  expect(generateAssistantMessageId).not.toHaveBeenCalled();
  expect(runAgentStep).toHaveBeenCalledWith(forwardedInput);
});
});
19 changes: 19 additions & 0 deletions app/lib/workflows/generateAssistantMessageId.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { generateId } from "ai";

/**
 * Vercel Workflow step (`"use step"`) that produces a new message id
 * via the AI SDK's `generateId()`.
 *
 * `generateId()` is non-deterministic, so it is not called directly
 * from workflow code: a durable replay would produce a *different*
 * id and diverge from the captured event log. Running it as a step
 * means the result is captured in the event log once and reused on
 * every replay.
 *
 * Mirrors open-agents' `generateId` step in
 * `apps/web/app/workflows/chat.ts`.
 *
 * @returns The newly generated id.
 */
export async function generateAssistantMessageId(): Promise<string> {
  "use step";
  const freshId = generateId();
  return freshId;
}
53 changes: 48 additions & 5 deletions app/lib/workflows/runAgentStep.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,35 @@ export type RunAgentStepInput = {
* is added to `experimental_context` right before each model call.
*/
agentContext: DurableAgentContext;
/**
* Stable id to assign to the assistant message produced by this
* step. Generated once in `runAgentWorkflow` so:
*
* - Every chunk in this step's `toUIMessageStream` carries the
* same id (the AI SDK threads it through).
* - Future multi-step iterations of the agent loop reuse the
* same id so a single conversational reply is one row in
* `chat_messages` rather than fragmenting per tool-call cycle.
* - Resume after tool-call interaction reattaches to the in-
* progress assistant message rather than spawning a new one.
*
* Mirrors open-agents' `runAgentStep(messages, originalMessages,
* messageId, ...)` signature in
* `apps/web/app/workflows/chat.ts`.
*/
assistantMessageId: string;
};

/**
* Value that `runAgentStep` resolves with.
*/
export type RunAgentStepResult = {
/** Finish reason awaited from the model run's result. */
finishReason: string;
/**
* The assembled assistant message captured from
* `toUIMessageStream`'s `onFinish` callback. `undefined` if the
* stream finished without emitting one (e.g. an error path that
* short-circuits before any chunks land). Used by `runAgentWorkflow`
* to persist the final message to `chat_messages`.
*/
responseMessage: UIMessage | undefined;
};

/**
Expand All @@ -38,9 +67,11 @@ export type RunAgentStepInput = {
* of the tool surface ports in a follow-up PR.
*
* @param input - Messages + selected model + writable stream + agent context.
* @returns finishReason from the model run.
* @returns `finishReason` from the model run plus the assembled
* `responseMessage` (when one was emitted) so the caller can
* persist it.
*/
export async function runAgentStep(input: RunAgentStepInput): Promise<{ finishReason: string }> {
export async function runAgentStep(input: RunAgentStepInput): Promise<RunAgentStepResult> {
"use step";

console.log("[runAgentStep] start", {
Expand Down Expand Up @@ -91,6 +122,12 @@ export async function runAgentStep(input: RunAgentStepInput): Promise<{ finishRe
}),
});

// Capture the assembled assistant message via `onFinish` so the
// caller can persist it. Mirrors open-agents' `runAgentStep` which
// stashes `finishedResponseMessage` into a closure-scoped variable
// for the outer workflow body to forward into `persistAssistantMessage`.
let responseMessage: UIMessage | undefined;

// Acquire the writer once and release in `finally` so a thrown chunk
// doesn't leak the lock.
const writer = input.writable.getWriter();
Expand All @@ -100,14 +137,20 @@ export async function runAgentStep(input: RunAgentStepInput): Promise<{ finishRe
// shape so sandbox.recoupable.com sees the same metadata when cut
// over to api's /api/chat/workflow.
const messageMetadata = buildMessageMetadataCallback({ modelId: input.modelId });
for await (const part of result.toUIMessageStream({ messageMetadata })) {
for await (const part of result.toUIMessageStream({
messageMetadata,
generateMessageId: () => input.assistantMessageId,
onFinish: ({ responseMessage: finishedResponseMessage }) => {
responseMessage = finishedResponseMessage;
},
})) {
await writer.write(part);
}
} finally {
writer.releaseLock();
}

const finishReason = await result.finishReason;
console.log("[runAgentStep] finish", { finishReason });
return { finishReason };
console.log("[runAgentStep] finish", { finishReason, hasResponseMessage: !!responseMessage });
return { finishReason, responseMessage };
}
Loading
Loading