diff --git a/.agent/notes/driver-test-progress.md b/.agent/notes/driver-test-progress.md index 01fe79b81c..3108b239af 100644 --- a/.agent/notes/driver-test-progress.md +++ b/.agent/notes/driver-test-progress.md @@ -1,7 +1,7 @@ # Driver Test Suite Progress Started: 2026-05-01 -Config: registry (static), client type (http), encoding (bare) +Config: registry (static), encoding (bare), runtime (native only) Scope: DB driver tests only ## DB Tests @@ -14,50 +14,24 @@ Scope: DB driver tests only - [x] actor-db-init-order | Actor DB Init Order ## Log -<<<<<<< HEAD - 2026-04-26T14:06:57-07:00 manager-driver: PASS - - 2026-04-26T14:07:27-07:00 actor-conn: PASS - - 2026-04-26T14:07:37-07:00 actor-conn-state: PASS - - 2026-04-26T14:07:42-07:00 conn-error-serialization: PASS - - 2026-04-26T14:08:14-07:00 actor-destroy: PASS - - 2026-04-26T14:08:19-07:00 request-access: PASS - - 2026-04-26T14:08:31-07:00 actor-handle: PASS - - 2026-04-26T14:08:31-07:00 action-features: PASS - - 2026-04-26T14:08:46-07:00 access-control: PASS - - 2026-04-26T14:08:51-07:00 actor-vars: PASS - - 2026-04-26T14:08:58-07:00 actor-metadata: PASS - - 2026-04-26T14:08:59-07:00 actor-onstatechange: PASS - - 2026-04-26T14:10:59-07:00 actor-db: FAIL (exit 124) - - 2026-04-26T14:12:00-07:00 runner: stale suite-description filters found for action-features, actor-onstatechange, actor-db, gateway-query-url, and likely other renamed suites; switching to per-file bare filter. - - 2026-04-26T14:12:54-07:00 action-features: PASS (bare file filter) - - 2026-04-26T14:12:59-07:00 actor-onstatechange: PASS (bare file filter) - - 2026-04-26T14:17:33-07:00 actor-db: FAIL (exit 1, bare file filter) - -- 2026-05-02T00:19:36-07:00 actor-conn-state: FAIL - new onConnect send regression timed out before sender wiring fix. - -- 2026-05-02T00:24:56-07:00 actor-conn-state: PASS (static/bare file filter, 9 tests). - -- 2026-05-02T02:26:38-07:00 actor-conn-state: PASS (static/bare file filter with c.conns onConnect send, 9 tests). 
- -- 2026-05-02T02:55:45-07:00 actor-conn-state: PASS (static/bare file filter with explicit pre-await onConnect subscription regression, 9 tests). -======= - 2026-05-01 12:45:05 PDT actor-db: FAIL - 4 failures in static/bare run. First failing test reproduced standalone: `persists across sleep and wake cycles` returned count 0 instead of 1 after sleep/wake. - 2026-05-01 13:02:09 PDT actor-db: PASS (13 passed, 26 skipped, 25.4s). Fixed VFS persisted page-1 bootstrap, hot-only sparse page reads, and actor2 serverful reallocate transition ordering. - 2026-05-01 13:02:30 PDT actor-db-raw: PASS (5 passed, 10 skipped, 4.5s). @@ -74,4 +48,12 @@ Scope: DB driver tests only - 2026-05-01 14:27:04 PDT actor-db-stress rerun: PASS (3 passed, 28.7s). - 2026-05-01 14:28:00 PDT actor-db-init-order rerun: PASS (6 passed, 12 skipped, 7.9s). - 2026-05-01 14:28:04 PDT DB TESTS RERUN COMPLETE - 6/6 DB file groups passed for static/bare. ->>>>>>> 62f797206 (feat(sqlite): pitr & forking) +- 2026-05-02T02:55:45-07:00 actor-conn-state: PASS (static/bare file filter with explicit pre-await onConnect subscription regression, 9 tests). +- 2026-05-03 18:13 PDT actor-sleep-db rerun [native]: PASS (26 passed, 208 skipped, 62.6s). +- 2026-05-03 18:25 PDT DB TESTS RERUN STARTED [native only]. +- 2026-05-03 18:25 PDT actor-db rerun [native]: PASS (13 passed, 104 skipped, 14.0s). +- 2026-05-03 18:26 PDT actor-db-raw rerun [native]: PASS (5 passed, 40 skipped, 6.7s). +- 2026-05-03 18:27 PDT actor-db-pragma-migration rerun [native]: PASS (4 passed, 32 skipped, 4.4s). +- 2026-05-03 18:28 PDT actor-db-stress rerun [native]: PASS (5 passed, 40 skipped, 25.2s). +- 2026-05-03 18:29 PDT actor-db-init-order rerun [native]: PASS (6 passed, 48 skipped, 6.6s). +- 2026-05-03 18:29 PDT DB TESTS RERUN COMPLETE [native only] - 6/6 DB file groups passed. 
diff --git a/.claude/scheduled_tasks.lock b/.claude/scheduled_tasks.lock deleted file mode 100644 index 744e53fe1f..0000000000 --- a/.claude/scheduled_tasks.lock +++ /dev/null @@ -1 +0,0 @@ -{"sessionId":"c093af1a-f110-4b76-b744-ee93ecc131c6","pid":3823846,"procStart":"15021522","acquiredAt":1777764221363} \ No newline at end of file diff --git a/examples/kitchen-sink/frontend/App.tsx b/examples/kitchen-sink/frontend/App.tsx index 95cd5f957d..cc449043c8 100644 --- a/examples/kitchen-sink/frontend/App.tsx +++ b/examples/kitchen-sink/frontend/App.tsx @@ -929,6 +929,7 @@ type AgenticHandle = { expectedRequests: number; expectedTotalRows: number; totalRows: number; + rows: AgenticEntry[]; unexpectedRequestIds: string[]; requests: AgenticVerification[]; ok: boolean; @@ -1133,14 +1134,15 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { seconds: number; received: number[]; } | null>(null); + const [queuedRequests, setQueuedRequests] = useState([]); const [expectedRequests, setExpectedRequests] = useState([]); const [lastVerification, setLastVerification] = useState("No requests yet."); const [lastHistory, setLastHistory] = useState("No history loaded yet."); const [lastBypass, setLastBypass] = useState("No bypass requests yet."); const [lastHealth, setLastHealth] = useState("No health checks yet."); const [isConnecting, setIsConnecting] = useState(false); - const [isRunningInference, setIsRunningInference] = useState(false); const [isCheckingHealth, setIsCheckingHealth] = useState(false); + const [isVerifying, setIsVerifying] = useState(false); const [stats, setStats] = useState({ requests: 0, expectedRows: 0, @@ -1164,6 +1166,7 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { const handleRef = useRef(null); const socketRef = useRef(null); const expectedRequestsRef = useRef([]); + const pendingRequestsRef = useRef([]); const activeRequestRef = useRef(null); const reconnectTimerRef = useRef | null>(null); const progressTimerRef = useRef 
| null>(null); @@ -1242,17 +1245,19 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { socketRef.current = null; handleRef.current = null; expectedRequestsRef.current = []; + pendingRequestsRef.current = []; activeRequestRef.current = null; setKey(randomAgenticKey()); setActorId(""); setConnectionStatus("idle"); setCurrentRequest(null); + setQueuedRequests([]); setExpectedRequests([]); - setIsRunningInference(false); setLastVerification("No requests yet."); setLastHistory("No history loaded yet."); setLastBypass("No bypass requests yet."); setLastHealth("No health checks yet."); + setIsVerifying(false); setStats({ requests: 0, expectedRows: 0, @@ -1312,21 +1317,41 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { const verifyAll = useCallback(async () => { const handle = handleRef.current; if (!handle) return; - const result = await handle.verifyAll(expectedRequestsRef.current); - if (!result.ok) { - markValidationError(`aggregate verification failed: ${formatJson(result)}`); - return; + setIsVerifying(true); + try { + const expectedState = { + completed: expectedRequestsRef.current, + active: activeRequestRef.current + ? { + requestId: activeRequestRef.current.requestId, + seconds: activeRequestRef.current.seconds, + received: activeRequestRef.current.received, + } + : null, + queued: pendingRequestsRef.current, + }; + const result = await handle.verifyAll(expectedRequestsRef.current); + setStats((prev) => ({ + ...prev, + actualRows: result.totalRows, + expectedRows: result.expectedTotalRows, + validationErrors: result.ok + ? prev.validationErrors + : prev.validationErrors + 1, + })); + setLastVerification(formatJson({ expectedState, actorRows: result })); + addLog( + result.ok ? "ok" : "error", + `manual verify ${result.ok ? 
"ok" : "failed"} expectedRows=${result.expectedTotalRows} actorRows=${result.totalRows} unexpected=${result.unexpectedRequestIds.length}`, + ); + requestHistory(); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + markValidationError(`manual verification failed: ${message}`); + } finally { + setIsVerifying(false); } - setStats((prev) => ({ - ...prev, - actualRows: result.totalRows, - expectedRows: result.expectedTotalRows, - })); - addLog( - "ok", - `verified all requests=${result.expectedRequests} rows=${result.totalRows}`, - ); - }, [addLog, markValidationError]); + }, [addLog, markValidationError, requestHistory]); const handleHistory = useCallback((message: AgenticHistory) => { const validation = validateAgenticRows( @@ -1354,6 +1379,44 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { } }, [addLog]); + const handleStarted = useCallback( + (message: Extract) => { + const [nextRequest, ...remainingRequests] = pendingRequestsRef.current; + if (!nextRequest || nextRequest.requestId !== message.requestId) { + markValidationError( + `unexpected start for ${message.requestId.slice(0, 8)}`, + ); + } + if (activeRequestRef.current) { + markValidationError( + `started ${message.requestId.slice(0, 8)} while another inference is active`, + ); + } + + pendingRequestsRef.current = remainingRequests; + setQueuedRequests(remainingRequests); + activeRequestRef.current = { + requestId: message.requestId, + seconds: message.seconds, + expectedIdx: 1, + received: [], + lastProgressAt: performance.now(), + startedAt: performance.now(), + }; + setCurrentRequest({ + requestId: message.requestId, + seconds: message.seconds, + received: [], + }); + addLog( + "ok", + `started ${message.requestId.slice(0, 8)} seconds=${message.seconds}`, + ); + scheduleProgressTimeout(); + }, + [addLog, markValidationError, scheduleProgressTimeout], + ); + const handleProgress = useCallback((message: Extract) => { const active = 
activeRequestRef.current; if (!active || active.requestId !== message.requestId) { @@ -1385,7 +1448,6 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { const handleDone = useCallback(async (message: Extract) => { const active = activeRequestRef.current; clearProgressTimer(); - setIsRunningInference(false); activeRequestRef.current = null; if (!active || active.requestId !== message.requestId) { @@ -1403,20 +1465,6 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { return; } - const handle = handleRef.current; - if (handle) { - const explicit = await handle.verify(active.requestId, active.seconds); - const explicitOk = - explicit.count === active.seconds && - explicit.indexes.every((idx, offset) => idx === offset + 1); - if (!explicitOk) { - markValidationError( - `action verification failed: ${formatJson(explicit)}`, - ); - return; - } - } - const completed = { requestId: active.requestId, seconds: active.seconds, @@ -1435,9 +1483,8 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { "ok", `done ${active.requestId.slice(0, 8)} rows=${active.seconds}`, ); - await verifyAll(); requestHistory(); - }, [addLog, clearProgressTimer, markValidationError, requestHistory, verifyAll]); + }, [addLog, clearProgressTimer, markValidationError, requestHistory]); const onSocketMessage = useCallback((event: MessageEvent) => { if (typeof event.data !== "string") return; @@ -1459,7 +1506,7 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { return; } if (message.type === "started") { - addLog("ok", `started ${message.requestId.slice(0, 8)} seconds=${message.seconds}`); + handleStarted(message); return; } if (message.type === "progress") { @@ -1473,7 +1520,7 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { if (message.type === "error") { markValidationError(`actor error: ${message.message}`); } - }, [addLog, handleDone, handleHistory, handleProgress, markValidationError]); + }, [addLog, handleDone, 
handleHistory, handleProgress, handleStarted, markValidationError]); const connect = useCallback(async (countReconnect = false) => { if (isConnecting) return; @@ -1570,26 +1617,17 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { addLog("error", "main websocket is not connected"); return; } - if (activeRequestRef.current) { - addLog("warn", "inference already active"); - return; - } const safeSeconds = Math.max(1, Math.floor(seconds)); const requestId = crypto.randomUUID(); - activeRequestRef.current = { - requestId, - seconds: safeSeconds, - expectedIdx: 1, - received: [], - lastProgressAt: performance.now(), - startedAt: performance.now(), - }; - setCurrentRequest({ requestId, seconds: safeSeconds, received: [] }); - setIsRunningInference(true); + const queuedRequest = { requestId, seconds: safeSeconds }; + pendingRequestsRef.current = [...pendingRequestsRef.current, queuedRequest]; + setQueuedRequests(pendingRequestsRef.current); socket.send(JSON.stringify({ type: "infer", requestId, seconds: safeSeconds })); - addLog("info", `infer ${requestId.slice(0, 8)} seconds=${safeSeconds}`); - scheduleProgressTimeout(); - }, [addLog, scheduleProgressTimeout, seconds]); + addLog( + "info", + `queued infer ${requestId.slice(0, 8)} seconds=${safeSeconds} queue=${pendingRequestsRef.current.length}`, + ); + }, [addLog, seconds]); const forceSleep = useCallback(async () => { if (!actorId) { @@ -1892,6 +1930,20 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { }, [clearProgressTimer]); const currentIndexes = currentRequest?.received ?? []; + const expectedStateText = formatJson({ + completed: expectedRequests.map((request) => ({ + requestId: request.requestId, + seconds: request.seconds, + })), + active: currentRequest + ? { + requestId: currentRequest.requestId, + seconds: currentRequest.seconds, + received: currentRequest.received, + } + : null, + queued: queuedRequests, + }); const invariantStatus = stats.validationErrors === 0 ? 
"pass" : "fail"; @@ -1996,14 +2048,22 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { +
{currentRequest ? ( @@ -2030,6 +2090,11 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { ) : (
No active inference.
)} + {queuedRequests.length > 0 && ( +
+ Queued: {queuedRequests.map((request) => request.requestId.slice(0, 8)).join(", ")} +
+ )}
@@ -2115,6 +2180,7 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { +
{expectedStateText}
{lastVerification}
{lastHistory}
diff --git a/examples/kitchen-sink/package.json b/examples/kitchen-sink/package.json index e55478a15c..acb2ac47df 100644 --- a/examples/kitchen-sink/package.json +++ b/examples/kitchen-sink/package.json @@ -15,6 +15,7 @@ "memory-soak": "tsx scripts/sqlite-memory-soak.ts", "proc-metrics": "tsx scripts/proc-metrics-report.ts", "smoke:raw-websocket-serverless": "tsx scripts/raw-websocket-serverless-smoke.ts", + "fuzz:sleep-close": "tsx scripts/sleep-close-fuzz.ts", "mock-agentic-loop": "tsx scripts/mock-agentic-loop.ts", "benchmark": "tsx scripts/benchmark.ts", "db:generate": "find src/actors -name drizzle.config.ts -exec drizzle-kit generate --config {} \\;" diff --git a/examples/kitchen-sink/scripts/sleep-close-fuzz.ts b/examples/kitchen-sink/scripts/sleep-close-fuzz.ts new file mode 100644 index 0000000000..5195b0ce7e --- /dev/null +++ b/examples/kitchen-sink/scripts/sleep-close-fuzz.ts @@ -0,0 +1,368 @@ +// Fuzz test for the force-sleep → non-hibernatable WS close path. +// +// For each parallel worker, repeats: +// 1. getOrCreate a unique sleepCloseFuzz actor +// 2. Open a raw WebSocket and wait for `welcome` +// 3. Wait `WAIT_BEFORE_SLEEP_MS` (a few seconds of normal operation) +// 4. POST {endpoint}/actors/{id}/sleep +// 5. Measure how long until the client close event fires +// 6. Flag a leak if the close does not arrive within `LEAK_THRESHOLD_MS` +// +// Usage: +// RIVET_ENDPOINT=http://127.0.0.1:6420 \ +// FUZZ_PARALLELISM=20 \ +// FUZZ_DURATION_MS=120000 \ +// pnpm --filter kitchen-sink fuzz:sleep-close + +import { createClient } from "rivetkit/client"; +import type { registry } from "../src/index.ts"; + +const ENDPOINT = process.env.RIVET_ENDPOINT ?? "http://127.0.0.1:6420"; +const NAMESPACE = + process.env.FUZZ_NAMESPACE ?? process.env.RIVET_NAMESPACE ?? "default"; +const TOKEN = process.env.FUZZ_TOKEN ?? process.env.RIVET_TOKEN ?? "dev"; +const PARALLELISM = Number(process.env.FUZZ_PARALLELISM ?? 
"10"); +const DURATION_MS = Number(process.env.FUZZ_DURATION_MS ?? "60000"); +const WAIT_BEFORE_SLEEP_MS = Number(process.env.FUZZ_WAIT_BEFORE_SLEEP_MS ?? "2000"); +const WAIT_BEFORE_SLEEP_JITTER_MS = Number( + process.env.FUZZ_WAIT_BEFORE_SLEEP_JITTER_MS ?? "3000", +); +const LEAK_THRESHOLD_MS = Number(process.env.FUZZ_LEAK_THRESHOLD_MS ?? "30000"); +const KEY_PREFIX = process.env.FUZZ_KEY_PREFIX ?? `sleep-close-fuzz-${Date.now()}`; +const STAGGER_MS = Number(process.env.FUZZ_STAGGER_MS ?? "100"); +const VERBOSE = process.env.FUZZ_VERBOSE === "1"; + +interface IterationResult { + workerIndex: number; + iteration: number; + actorId: string; + openMs: number; + sleepPostMs: number; + closeMs: number | null; // null if leaked + closeCode?: number; + closeReason?: string; + leaked: boolean; + error?: string; +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function formatError(error: unknown): string { + if (error instanceof Error) return `${error.name}: ${error.message}`; + return String(error); +} + +function appendPath(endpoint: string, path: string): URL { + const url = new URL(endpoint); + const prefix = url.pathname.replace(/\/$/, ""); + url.pathname = `${prefix}${path}`; + url.search = ""; + url.hash = ""; + return url; +} + +function buildSleepUrl(actorId: string): string { + const url = appendPath( + ENDPOINT, + `/actors/${encodeURIComponent(actorId)}/sleep`, + ); + url.searchParams.set("namespace", NAMESPACE); + return url.toString(); +} + +function buildWebSocketUrl(actorId: string): string { + const tokenSegment = TOKEN ? `@${encodeURIComponent(TOKEN)}` : ""; + const url = appendPath( + ENDPOINT, + `/gateway/${encodeURIComponent(actorId)}${tokenSegment}/websocket`, + ); + url.protocol = url.protocol === "https:" ? 
"wss:" : "ws:"; + return url.toString(); +} + +async function waitForOpen(ws: WebSocket, timeoutMs: number): Promise { + if (ws.readyState === WebSocket.OPEN) return; + await new Promise((resolve, reject) => { + const t = setTimeout(() => reject(new Error("ws open timeout")), timeoutMs); + ws.addEventListener( + "open", + () => { + clearTimeout(t); + resolve(); + }, + { once: true }, + ); + ws.addEventListener( + "error", + () => { + clearTimeout(t); + reject(new Error("websocket error")); + }, + { once: true }, + ); + ws.addEventListener( + "close", + (event) => { + clearTimeout(t); + reject( + new Error( + `websocket closed before open code=${event.code} reason=${event.reason}`, + ), + ); + }, + { once: true }, + ); + }); +} + +async function runIteration( + workerIndex: number, + iteration: number, +): Promise { + const key = `${KEY_PREFIX}-w${workerIndex}-i${iteration}`; + const client = createClient({ + endpoint: ENDPOINT, + namespace: NAMESPACE, + token: TOKEN, + }); + + const handle = client.sleepCloseFuzz.getOrCreate([key]); + const t0 = Date.now(); + const actorId = await handle.resolve(); + + const wsUrl = buildWebSocketUrl(actorId); + const ws = new WebSocket(wsUrl, ["rivet", "rivet_encoding.json"]); + + let closeCode: number | undefined; + let closeReason: string | undefined; + let closeAt = 0; + const closePromise = new Promise((resolve) => { + ws.addEventListener( + "close", + (event) => { + closeCode = event.code; + closeReason = event.reason; + closeAt = Date.now(); + resolve(); + }, + { once: true }, + ); + }); + + try { + await waitForOpen(ws, 15_000); + } catch (error) { + try { + ws.close(); + } catch {} + return { + workerIndex, + iteration, + actorId, + openMs: Date.now() - t0, + sleepPostMs: 0, + closeMs: null, + leaked: false, + error: `open: ${formatError(error)}`, + }; + } + + const openMs = Date.now() - t0; + + // Run for a few seconds with random jitter so sleep timing varies across iterations. 
+ const wait = + WAIT_BEFORE_SLEEP_MS + Math.floor(Math.random() * WAIT_BEFORE_SLEEP_JITTER_MS); + await sleep(wait); + + const tSleepStart = Date.now(); + let sleepPostStatus = 0; + try { + const response = await fetch(buildSleepUrl(actorId), { + method: "POST", + headers: { + Authorization: TOKEN ? `Bearer ${TOKEN}` : "", + "content-type": "application/json", + }, + body: "{}", + }); + sleepPostStatus = response.status; + if (!response.ok) { + const body = await response.text(); + throw new Error(`sleep POST status=${response.status} body=${body}`); + } + } catch (error) { + try { + ws.close(); + } catch {} + return { + workerIndex, + iteration, + actorId, + openMs, + sleepPostMs: Date.now() - tSleepStart, + closeMs: null, + leaked: false, + error: `sleep-post: ${formatError(error)} status=${sleepPostStatus}`, + }; + } + const sleepPostMs = Date.now() - tSleepStart; + + // Race: close event vs leak threshold + const leakTimeout = sleep(LEAK_THRESHOLD_MS).then(() => "timeout" as const); + const result = await Promise.race([ + closePromise.then(() => "closed" as const), + leakTimeout, + ]); + + if (result === "timeout") { + // Leak: close did not arrive in time. Force-close client side and flag it. + const stillOpen = ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING; + try { + ws.close(4000, "fuzz leak forced close"); + } catch {} + return { + workerIndex, + iteration, + actorId, + openMs, + sleepPostMs, + closeMs: null, + leaked: stillOpen, + error: stillOpen ? 
undefined : "race lost but ws already closed", + }; + } + + const closeMs = closeAt - tSleepStart; + return { + workerIndex, + iteration, + actorId, + openMs, + sleepPostMs, + closeMs, + closeCode, + closeReason, + leaked: false, + }; +} + +async function runWorker( + workerIndex: number, + stopAt: number, + results: IterationResult[], +): Promise { + if (workerIndex * STAGGER_MS > 0) { + await sleep(workerIndex * STAGGER_MS); + } + let iteration = 0; + while (Date.now() < stopAt) { + iteration += 1; + try { + const result = await runIteration(workerIndex, iteration); + results.push(result); + if (result.leaked) { + console.error( + `[LEAK] worker=${workerIndex} iter=${iteration} actorId=${result.actorId} openMs=${result.openMs} sleepPostMs=${result.sleepPostMs} did NOT close within ${LEAK_THRESHOLD_MS}ms`, + ); + } else if (result.error) { + console.warn( + `[err] worker=${workerIndex} iter=${iteration} actorId=${result.actorId} ${result.error}`, + ); + } else if (VERBOSE) { + console.log( + `[ok] worker=${workerIndex} iter=${iteration} actorId=${result.actorId} closeMs=${result.closeMs} code=${result.closeCode} reason=${result.closeReason}`, + ); + } + } catch (error) { + console.error( + `[fatal-iter] worker=${workerIndex} iter=${iteration} ${formatError(error)}`, + ); + results.push({ + workerIndex, + iteration, + actorId: "", + openMs: 0, + sleepPostMs: 0, + closeMs: null, + leaked: false, + error: formatError(error), + }); + } + } +} + +function summarize(results: IterationResult[]) { + const total = results.length; + const leaks = results.filter((r) => r.leaked); + const errors = results.filter((r) => r.error && !r.leaked); + const ok = results.filter((r) => !r.leaked && !r.error); + const closeMs = ok.map((r) => r.closeMs ?? 
0).sort((a, b) => a - b); + + function pct(p: number): number { + if (closeMs.length === 0) return 0; + const i = Math.min(closeMs.length - 1, Math.floor((p / 100) * closeMs.length)); + return closeMs[i]; + } + + const avg = + closeMs.length > 0 + ? closeMs.reduce((s, x) => s + x, 0) / closeMs.length + : 0; + + console.log(`\n[summary] ===========================`); + console.log(` total iterations: ${total}`); + console.log(` ok: ${ok.length}`); + console.log(` errors: ${errors.length}`); + console.log(` LEAKS: ${leaks.length}`); + if (ok.length > 0) { + console.log( + ` closeMs avg=${avg.toFixed(0)} p50=${pct(50)} p95=${pct(95)} p99=${pct(99)} max=${closeMs[closeMs.length - 1]}`, + ); + } + if (leaks.length > 0) { + console.log(`\n[leaks] -------------------------`); + for (const leak of leaks) { + console.log( + ` worker=${leak.workerIndex} iter=${leak.iteration} actorId=${leak.actorId} openMs=${leak.openMs} sleepPostMs=${leak.sleepPostMs}`, + ); + } + } + if (errors.length > 0 && VERBOSE) { + console.log(`\n[errors] ------------------------`); + const byMsg = new Map(); + for (const e of errors) { + const msg = e.error ?? ""; + byMsg.set(msg, (byMsg.get(msg) ?? 
0) + 1); + } + for (const [msg, count] of byMsg) { + console.log(` x${count} ${msg}`); + } + } +} + +async function main() { + if (!Number.isInteger(PARALLELISM) || PARALLELISM < 1) { + throw new Error("FUZZ_PARALLELISM must be a positive integer"); + } + + console.log( + `[fuzz] endpoint=${ENDPOINT} namespace=${NAMESPACE} parallelism=${PARALLELISM} durationMs=${DURATION_MS} waitBeforeSleepMs=${WAIT_BEFORE_SLEEP_MS}+jitter${WAIT_BEFORE_SLEEP_JITTER_MS} leakThresholdMs=${LEAK_THRESHOLD_MS} keyPrefix=${KEY_PREFIX}`, + ); + + const results: IterationResult[] = []; + const stopAt = Date.now() + DURATION_MS; + await Promise.all( + Array.from({ length: PARALLELISM }, (_, i) => runWorker(i, stopAt, results)), + ); + summarize(results); + + const leaks = results.filter((r) => r.leaked).length; + process.exit(leaks > 0 ? 1 : 0); +} + +main().catch((error) => { + console.error(`[fatal] ${formatError(error)}`); + process.exit(1); +}); diff --git a/examples/kitchen-sink/src/actors/testing/mock-agentic-loop.ts b/examples/kitchen-sink/src/actors/testing/mock-agentic-loop.ts index 1e7f00c3c0..d431912054 100644 --- a/examples/kitchen-sink/src/actors/testing/mock-agentic-loop.ts +++ b/examples/kitchen-sink/src/actors/testing/mock-agentic-loop.ts @@ -253,6 +253,7 @@ function verifyAllRows(rows: EntryRow[], expectedRequests: ExpectedRequest[]) { expectedRequests: expectedRequests.length, expectedTotalRows, totalRows: rows.length, + rows, unexpectedRequestIds, requests, ok, @@ -505,10 +506,6 @@ export const mockAgenticLoop = actor({ } if (type === "infer") { - if (activeInference !== undefined) { - throw new Error("inference already active"); - } - const requestId = stringValue(message.requestId, "requestId"); const seconds = positiveInteger(message.seconds, "seconds"); await recordDebugEvent(c, { @@ -519,11 +516,16 @@ export const mockAgenticLoop = actor({ seconds, }, }); - const inference = runInference(requestId, seconds).finally(() => { - activeInference = undefined; - }); + const 
previousInference = activeInference; + const inference = (async () => { + await previousInference?.catch(() => undefined); + await runInference(requestId, seconds); + })(); activeInference = inference; await c.keepAwake(inference); + if (activeInference === inference) { + activeInference = undefined; + } return; } diff --git a/examples/kitchen-sink/src/actors/testing/sleep-close-fuzz.ts b/examples/kitchen-sink/src/actors/testing/sleep-close-fuzz.ts new file mode 100644 index 0000000000..2d8aa819ca --- /dev/null +++ b/examples/kitchen-sink/src/actors/testing/sleep-close-fuzz.ts @@ -0,0 +1,61 @@ +import { actor, type RivetMessageEvent, type UniversalWebSocket } from "rivetkit"; + +// Minimal non-hibernatable WebSocket actor for fuzz-testing the +// force-sleep → gateway close path. Keeps state intentionally tiny so the +// only thing being exercised is the close lifecycle, not user code. +export const sleepCloseFuzz = actor({ + options: { + canHibernateWebSocket: false, + }, + state: { + connectionCount: 0, + messageCount: 0, + }, + onWebSocket(c, websocket: UniversalWebSocket) { + c.state.connectionCount += 1; + const connectionId = crypto.randomUUID(); + + websocket.send( + JSON.stringify({ + type: "welcome", + connectionId, + connectionCount: c.state.connectionCount, + }), + ); + + const interval = setInterval(() => { + if (websocket.readyState !== 1) return; + websocket.send( + JSON.stringify({ + type: "tick", + connectionId, + timestamp: Date.now(), + }), + ); + }, 500); + + websocket.addEventListener("message", (event: RivetMessageEvent) => { + c.state.messageCount += 1; + websocket.send( + JSON.stringify({ + type: "echo", + connectionId, + received: event.data, + }), + ); + }); + + websocket.addEventListener("close", () => { + clearInterval(interval); + c.state.connectionCount -= 1; + }); + }, + actions: { + getStats(c) { + return { + connectionCount: c.state.connectionCount, + messageCount: c.state.messageCount, + }; + }, + }, +}); diff --git 
a/examples/kitchen-sink/src/index.ts b/examples/kitchen-sink/src/index.ts index c3268e9035..a4d5f2fb04 100644 --- a/examples/kitchen-sink/src/index.ts +++ b/examples/kitchen-sink/src/index.ts @@ -122,6 +122,7 @@ import { sqliteRealworldBench } from "./actors/testing/sqlite-realworld-bench.ts import { rawSqliteFuzzer } from "./actors/testing/raw-sqlite-fuzzer.ts"; import { sqliteMemoryPressure } from "./actors/testing/sqlite-memory-pressure.ts"; import { mockAgenticLoop } from "./actors/testing/mock-agentic-loop.ts"; +import { sleepCloseFuzz } from "./actors/testing/sleep-close-fuzz.ts"; // AI import { aiAgent } from "./actors/ai/ai-agent.ts"; @@ -277,6 +278,7 @@ export const registry = setup({ rawSqliteFuzzer, sqliteMemoryPressure, mockAgenticLoop, + sleepCloseFuzz, // AI aiAgent, }, diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/actors/sleepAbortListenerVarsActor.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/actors/sleepAbortListenerVarsActor.ts new file mode 100644 index 0000000000..fd9e1105ee --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/actors/sleepAbortListenerVarsActor.ts @@ -0,0 +1,3 @@ +import { sleepAbortListenerVarsActor } from "../sleep"; + +export default sleepAbortListenerVarsActor; diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry-static.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry-static.ts index 6a940f1c42..df4f7c22d0 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry-static.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry-static.ts @@ -111,6 +111,7 @@ import { sleepRawWsSendOnSleep, sleepRawWsDelayedSendOnSleep, sleepWithWaitUntilInOnWake, + sleepAbortListenerVarsActor, } from "./sleep"; import { sleepWithDb, @@ -207,6 +208,7 @@ export const registry = setup({ sleepRawWsSendOnSleep, sleepRawWsDelayedSendOnSleep, 
// Observation recorded by an abort listener about what `c.vars` looked
// like at the moment the listener fired:
// - "unset":     slot reserved in onWake, listener never fired (or its
//                state write failed)
// - "object":    `c.vars` was a real object and could be mutated
// - "undefined": `c.vars` read back as undefined/null (the bug)
// - "error:...": touching `c.vars` threw; message captured for triage
type AbortVarsObservation =
	| "unset"
	| "object"
	| "undefined"
	| `error:${string}`;

// Repro fixture for the user-reported crash where an abort listener
// registered in `onWake` touches `c.vars` during shutdown and observes a
// cleared vars bag (`Cannot set properties of undefined`). Each wake
// records, in a per-wake slot, what the listener saw when it fired; the
// driver test then asserts every aborted wake saw a real object.
export const sleepAbortListenerVarsActor = actor({
	state: {
		startCount: 0,
		sleepCount: 0,
		// Per-wake observations keyed by `startCount` (1-indexed). Each
		// wake registers its own abort listener that records what
		// `c.vars` looked like when the listener fired. The user-reported
		// crash typically lands on a *later* wake/sleep cycle once the
		// runtime has been bounced through reset/cleanup at least once.
		abortVarsSeenPerWake: [] as AbortVarsObservation[],
	},
	createVars: () => ({
		isStopping: false,
	}),
	onWake: (c) => {
		c.state.startCount += 1;
		const wakeIndex = c.state.startCount;
		// Mirrors the user's pattern: touch `c.vars` immediately on wake.
		c.vars.isStopping = c.aborted;

		// Reserve a slot for this wake's observation.
		const observations = [...c.state.abortVarsSeenPerWake];
		observations[wakeIndex - 1] = "unset";
		c.state.abortVarsSeenPerWake = observations;

		if (c.aborted) {
			return;
		}

		c.abortSignal.addEventListener(
			"abort",
			() => {
				let observation: AbortVarsObservation;
				try {
					// Cast widens `c.vars` so the undefined/null case is
					// representable; at runtime the getter may return a
					// freshly-created empty bag after cleanup has run.
					const vars = c.vars as
						| { isStopping: boolean }
						| undefined;
					if (vars === undefined || vars === null) {
						observation = "undefined";
					} else {
						vars.isStopping = true;
						observation = "object";
					}
				} catch (error) {
					observation = `error:${
						error instanceof Error
							? error.message
							: String(error)
					}`;
				}
				try {
					const next = [...c.state.abortVarsSeenPerWake];
					next[wakeIndex - 1] = observation;
					c.state.abortVarsSeenPerWake = next;
				} catch {
					// State write may itself fail if the bag was cleared.
					// Swallow so the test still reads `getStatus` cleanly.
				}
			},
			{ once: true },
		);
	},
	onSleep: async (c) => {
		c.state.sleepCount += 1;
		// Drain a few macrotasks so the cleanup that runs in this handler's
		// `finally` (cleanupNativeSleepRuntimeState) has the best chance of
		// landing before any other shutdown TSF (notably the abort signal)
		// is processed on the JS side. This makes the abort-listener race
		// observable in the test.
		for (let i = 0; i < 5; i += 1) {
			await new Promise((resolve) => setTimeout(resolve, 0));
		}
	},
	onWebSocket: (c, websocket) => {
		// Lets the driver test confirm the socket is live before it
		// triggers a sleep cycle.
		websocket.send(JSON.stringify({ type: "connected" }));
	},
	actions: {
		// Explicitly puts the actor to sleep so the test can drive
		// wake/sleep cycles deterministically instead of waiting for the
		// sleep timeout.
		triggerSleep: (c) => {
			c.sleep();
		},
		// Snapshot of all counters plus the per-wake observations; the
		// test's assertions read exclusively through this action.
		getStatus: (c) => {
			return {
				startCount: c.state.startCount,
				sleepCount: c.state.sleepCount,
				abortVarsSeenPerWake: c.state.abortVarsSeenPerWake,
			};
		},
	},
	options: {
		sleepTimeout: SLEEP_TIMEOUT,
	},
});
	test("c.vars survives in abort listener captured by onWake", async (c) => {
		// Real timers are required: the race depends on genuine event-loop
		// ordering between the abort TSF and onSleep's async cleanup, which
		// fake timers would serialize away.
		const { client } = await setupDriverTest(c, {
			...driverTestConfig,
			useRealTimers: true,
		});

		const sleepActor =
			client.sleepAbortListenerVarsActor.getOrCreate();

		// Baseline after the first wake: one start, no sleeps yet, and the
		// first wake's observation slot reserved but not yet written.
		{
			const status = await sleepActor.getStatus();
			expect(status.startCount).toBe(1);
			expect(status.sleepCount).toBe(0);
			expect(status.abortVarsSeenPerWake).toEqual(["unset"]);
		}

		// Drive the actor through several wake/sleep cycles. The user
		// reported the crash can surface on the *second* abort, after
		// the actor has already gone through one full
		// reset_runtime_state + cleanup cycle on a previous wake.
		for (let cycle = 0; cycle < 3; cycle += 1) {
			const ws = await connectRawWebSocketWithRetry(sleepActor);
			await sleepActor.triggerSleep();
			// Give the sleep/abort shutdown sequence time to fully settle
			// before starting the next cycle.
			await new Promise((resolve) => setTimeout(resolve, 1500));
			try {
				ws.close();
			} catch {
				// WebSocket may already be closed.
			}
		}

		const status = await sleepActor.getStatus();
		expect(status.sleepCount).toBe(3);
		// getStatus itself wakes the actor again, so at least one wake
		// beyond the three sleep cycles is expected.
		expect(status.startCount).toBeGreaterThanOrEqual(4);

		// Every abort listener registered in onWake must have observed
		// `c.vars` as a real object. If the race condition fires on
		// any cycle, that wake's slot reads `"undefined"` (or an error
		// string) and the assertion fails — pinning the bug. The last
		// slot is skipped: it belongs to the wake that is still running
		// and was never aborted, so it legitimately stays "unset".
		for (let i = 0; i < status.abortVarsSeenPerWake.length - 1; i += 1) {
			// Wrap in an object so a failure message names the offending
			// wake index, not just the mismatched string.
			expect({
				wake: i + 1,
				observation: status.abortVarsSeenPerWake[i],
			}).toEqual({ wake: i + 1, observation: "object" });
		}
	}, 60_000);