From 307c3627970706e8cc2ac4577ed7514cb37e42e0 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Sun, 3 May 2026 15:46:19 -0700 Subject: [PATCH] test(kitchen-sink): add sqlite memory soak harness --- .agent/notes/sqlite-memory-soak-issues.md | 508 +++++ examples/kitchen-sink/frontend/page-data.ts | 36 + examples/kitchen-sink/package.json | 2 + .../scripts/proc-metrics-report.ts | 711 +++++++ .../scripts/sqlite-memory-soak.ts | 1865 +++++++++++++++++ .../actors/testing/sqlite-memory-pressure.ts | 334 +++ examples/kitchen-sink/src/index.ts | 2 + 7 files changed, 3458 insertions(+) create mode 100644 .agent/notes/sqlite-memory-soak-issues.md create mode 100644 examples/kitchen-sink/scripts/proc-metrics-report.ts create mode 100644 examples/kitchen-sink/scripts/sqlite-memory-soak.ts create mode 100644 examples/kitchen-sink/src/actors/testing/sqlite-memory-pressure.ts diff --git a/.agent/notes/sqlite-memory-soak-issues.md b/.agent/notes/sqlite-memory-soak-issues.md new file mode 100644 index 0000000000..739bb94841 --- /dev/null +++ b/.agent/notes/sqlite-memory-soak-issues.md @@ -0,0 +1,508 @@ +# SQLite Memory Soak Issues + +Date: 2026-05-03 + +This note captures the current known issues from the kitchen-sink SQLite memory soak and release spike work. The goal of the soak was to verify Rivet Actor correctness under SQLite churn and prove whether memory is reclaimed after actors sleep. + +## Current conclusion + +The lower-concurrency runs show that kitchen-sink memory can be reclaimed after actors sleep. The release 200-concurrency runs are not valid leak proofs because SQLite correctness failed before a clean drain. + +The highest-priority issue is the release no-reset spike producing `database disk image is malformed` during SQLite startup/migration. The second issue is the reset-enabled spike failing on fresh actors with many missing-database preload/read errors. Those are correctness issues first; memory conclusions from those runs are secondary. + +## Harness context + +The current harness lives in `examples/kitchen-sink/scripts/sqlite-memory-soak.ts`. + +Important behavior: + +- It spawns the local engine and kitchen-sink serverless process so the harness can sample exact PIDs. +- It samples harness, engine, and kitchen RSS from `/proc`. +- It samples CPU, IO, fd count, and smaps-derived memory details when available. +- It calls the kitchen `/debug/memory` endpoint for JS heap, external memory, native estimate, GC hooks, and active actor diagnostics. +- It supports fixed-timer churn so load is time-based instead of throughput-based. +- It can force an actor to sleep after `--churn-sleep-after-ms`, then spawn another actor to keep target concurrency. +- It supports spike mode with `--spike-min-concurrency`, `--spike-max-concurrency`, and `--spike-period-ms`. +- It supports an idle baseline before workload with `--pre-workload-wait-ms`. + +Reports are rendered by `examples/kitchen-sink/scripts/proc-metrics-report.ts` to `~/tmp/proc-metrics//index.html`. + +The charts currently include RSS, sampled process details, sampled kitchen debug memory, and Envoy-reported active actors as an overlay. Actor wake vertical lines were removed. + +## Release build context + +Release artifacts were built with: + +```bash +cargo build --release -p rivet-engine +pnpm --filter @rivetkit/rivetkit-napi build:force:release +``` + +Release artifact checks: + +- Engine binary: `target/release/rivet-engine`, about 81 MiB, not stripped. 
- NAPI module: `rivetkit-typescript/packages/rivetkit-napi/rivetkit-napi.linux-x64-gnu.node`, matching `target/release/librivetkit_napi.so`, about 16 MiB, not stripped.

Earlier spike runs did not use release builds on both sides. The 10 to 50 spike used a debug engine and debug NAPI.

## Workload shape

The actor is `examples/kitchen-sink/src/actors/testing/sqlite-memory-pressure.ts`.

Current important behavior:

- `onMigrate` creates the workload tables and index.
- `runCycle` inserts many rows, stores blob-like payloads, scans data, and returns integrity/storage metrics.
- `releaseStorage` no longer deletes rows or runs `VACUUM`; that code is commented out per the current focus.
- `reset` still deletes from `pressure_cycles`, deletes from `pressure_rows`, and runs `VACUUM`.
- `onSleep` increments `c.state.sleepCount` and logs a structured JSON line with `kind: "sqlite_memory_pressure_on_sleep"`.

The original delete/`VACUUM` workload was useful for forcing page churn and cache pressure, but it confused the memory question because it intentionally shrank the database before sleep. For the current concern, actor sleep should release local memory regardless of database contents because the SQLite database is remote.

## Run artifacts

### 2 minute fixed-concurrency run

Run ID: `proc-metrics-2m-c10-20260503-133508`

Report: `/home/nathan/tmp/proc-metrics/proc-metrics-2m-c10-20260503-133508/index.html`

Result:

- Manually stopped around 155s.
- Max actor index: 372.
- Wakes: 373.
- Verified sleeps: 368.
- Envoy active actors: min 3, max 13, final 12.
- Engine RSS: 149.4 MiB start, 317.3 MiB max, 315.2 MiB final.
- Kitchen RSS: 260.1 MiB start, 445.4 MiB max, 264.6 MiB final.

Interpretation:

- Kitchen RSS dropped back near baseline while actors were still cycling.
- Active actors were roughly stable around 10.
- JS heap was roughly flat around 31 MiB.
- External memory was roughly flat around 5 MiB.
- Native estimate dropped from roughly 384 MiB to 300 MiB to 206 MiB.
- smaps anonymous/private dirty memory dropped from roughly 340 MiB to 256 MiB to 167 MiB.
- `MALLOC_ARENA_MAX=2` and `MALLOC_TRIM_THRESHOLD_=131072` likely helped RSS return in large chunks.

This is good evidence that kitchen memory can be reclaimed after actors sleep. It is not enough by itself to prove there is no leak.

### 5 minute 10 to 50 spike

Run ID: `20260503-135136-sqlite-spike-5m-c10-50`

Report: `/home/nathan/tmp/proc-metrics/20260503-135136-sqlite-spike-5m-c10-50/index.html`

Build mode:

- Engine: debug, `/home/nathan/r7/target/debug/rivet-engine`.
- NAPI: debug, matched `target/debug/librivetkit_napi.so`, about 330 MiB with debug info.

Result:

- Completed successfully.
- Run errors: 0.
- SQLite cycles: 1854.
- Actor wakes: 1147.
- Verified sleeps: 1147.
- Harness active concurrency hit 50.
- Envoy active actors peaked at 56.
- Envoy active actors final: 0.
- Kitchen RSS: 249.4 MiB start, 573.0 MiB max, 342.6 MiB final.
- Kitchen post-churn: 426.1 MiB start, 334.6 MiB min, 342.6 MiB final.
- Engine RSS: 146.9 MiB start, 355.1 MiB max, 347.9 MiB final.

Interpretation:

- Kitchen reclaimed substantially after churn.
- Kitchen finished about 93 MiB above start after 60s of post-churn wait.
- Engine stayed near peak.
- This was a useful signal, but not a full no-leak proof because the run used debug builds and the post-churn window was short.
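For reference, the spike schedules in the runs below ramp the target concurrency between the min and max flags over `--spike-period-ms`. A minimal model of that schedule as a triangle wave; the helper name is hypothetical, not the harness code itself:

```ts
// Hypothetical model of spike-mode target concurrency: ramp linearly from
// min to max over the first half of each period, then back down.
function spikeTargetConcurrency(
	elapsedMs: number,
	minConcurrency: number,
	maxConcurrency: number,
	periodMs: number,
): number {
	const phase = (elapsedMs % periodMs) / periodMs; // 0..1 inside one period
	const ramp = phase < 0.5 ? phase * 2 : (1 - phase) * 2; // triangle wave
	return Math.round(minConcurrency + (maxConcurrency - minConcurrency) * ramp);
}

// Example: with --spike-min-concurrency 10, --spike-max-concurrency 200, and
// --spike-period-ms 60000, elapsed 15s gives a target of about 105 actors.
```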
+ +### Release 10 to 200 spike with reset enabled + +Run ID: `20260503-142448-sqlite-spike-release-5m-c10-200` + +Report: `/home/nathan/tmp/proc-metrics/20260503-142448-sqlite-spike-release-5m-c10-200/index.html` + +Config: + +- Release engine through `RIVET_ENGINE_BINARY=/home/nathan/r7/target/release/rivet-engine`. +- Release NAPI. +- 60s idle baseline. +- 5m target spike from 10 to 200 to 10 concurrent actors. +- 60s post-churn requested. +- Reset enabled. +- Cleanup disabled. +- Forced GC sampling enabled. + +Result: + +- Failed before completion around 113s elapsed. +- Failure happened in `handle.reset()` before the actor's main SQL cycle. +- Client surfaced only `RivetError: An internal error occurred`. +- Engine logs contained many `sqlite get_pages request failed` and `sqlite database was not found in this bucket branch` messages. + +Counts: + +- Samples: 112. +- Final elapsed: 113050ms. +- Max target: 200. +- Max harness concurrency: 197. +- Next actor index: 486. +- Cycles: 763. +- Wakes: 491. +- Sleeps: 491. +- Errors: 1. +- Engine RSS: 89.7 MiB start, 385.9 MiB max, 381.0 MiB final. +- Kitchen RSS: 235.3 MiB start, 1068.1 MiB max, 352.8 MiB final. +- Idle baseline engine: 89.7 MiB to 92.9 MiB. +- Idle baseline kitchen: 235.3 MiB to 236.7 MiB. +- Cycle latency: p50 1411.8ms, p95 4303.4ms, p99 4957.7ms, max 5052.5ms. + +Interpretation: + +- This is not a valid memory-leak result because the run failed early. +- The failing operation was redundant reset on fresh actor IDs. +- The likely issue is in the reset/open/preload/missing-database path under high concurrency. +- Missing fresh DB state should not create a large error storm or poison actor startup. +- Exact root cause was not proven. + +### Release 10 to 200 spike with no reset + +Run ID: `20260503-142821-sqlite-spike-release-5m-c10-200-no-reset` + +Report: `/home/nathan/tmp/proc-metrics/20260503-142821-sqlite-spike-release-5m-c10-200-no-reset/index.html` + +Config: + +- Release engine. +- Release NAPI. +- 60s idle baseline. +- 5m target spike from 10 to 200 to 10 concurrent actors. +- 60s post-churn requested. +- `--no-reset`. +- Cleanup disabled. +- Forced GC sampling enabled. + +Partial progress: + +- Around 70s: 0 errors, 187 cycles, 99 sleeps, Envoy active 62, engine RSS 279.7 MiB, kitchen RSS 356.0 MiB. +- Around 146s: max target/harness 199, max Envoy active 192, 1059 cycles, 902 sleeps, no surfaced run errors, engine RSS 390.3 MiB, kitchen RSS 602.8 MiB, kitchen max 880.3 MiB. +- Around 280s: max Envoy active 204, 2375 cycles, 2164 sleeps, no surfaced run errors, engine RSS 412.7 MiB, kitchen RSS 717.1 MiB, kitchen max 1090.0 MiB. +- After scheduling ended, drain got stuck with Envoy active around 15 and about 10 drivers waiting for sleep completion. + +Final partial stats: + +- Completed: false. +- Manually stopped: true. +- Surfaced harness run errors: 0. +- Samples: 520. +- Final elapsed: 532497ms. +- Max target: 199. +- Max harness concurrency: 199. +- Max Envoy active actors: 204. +- Next actor index: 2265. +- Cycles: 2469. +- Sleeps: 2255. +- Engine RSS: 90.0 MiB start, 415.8 MiB max, 396.9 MiB final. +- Kitchen RSS: 237.0 MiB start, 1090.0 MiB max, 512.2 MiB final. +- Idle engine: 90.0 MiB start, 94.2 MiB final. +- Idle kitchen: 237.0 MiB start, 233.1 MiB final. +- Active engine: 94.2 MiB start, 415.8 MiB max, 396.9 MiB final. +- Active kitchen: 233.1 MiB start, 1090.0 MiB max, 512.2 MiB final. +- Cycle latency: p50 8182.9ms, p95 19027.5ms, p99 26105.4ms, max 34667.8ms. 
+ +Important log errors: + +- `sqlite batch atomic probe failed`. +- `database disk image is malformed`. +- `failed to verify sqlite batch atomic writes`. +- `encoded structured bridge error`. +- `actor run handler failed`. +- `actor start failed`. +- `Cannot read properties of undefined (reading 'sleepCount')`. +- Many `actor_ready_timeout` errors for specific actor keys. +- Many `sqlite get_pages request failed`. +- Many `sqlite database was not found in this bucket branch`. + +Approximate error counts from logs: + +- `database disk image is malformed`: 30. +- `actor_ready_timeout`: 402. +- `Cannot read properties of undefined`: 14. +- `sqlite get_pages request failed`: 2269. +- `encoded structured bridge error`: 5. + +The batch atomic probe SQL that failed: + +```sql +BEGIN IMMEDIATE; +CREATE TABLE IF NOT EXISTS __rivet_batch_probe(x INTEGER); +INSERT INTO __rivet_batch_probe VALUES(1); +DELETE FROM __rivet_batch_probe; +DROP TABLE IF EXISTS __rivet_batch_probe; +COMMIT; +``` + +Interpretation: + +- This is the strongest correctness failure found so far. +- It happened in actor startup/migration, not just during steady `runCycle`. +- It points at a VFS/depot/page-cache/commit consistency problem under high churn. +- The run did not produce a valid leak conclusion because actors failed and the harness got stuck draining. + +### Release 10 to 100 spike with no reset + +Run ID: `20260503-145252-sqlite-spike-release-5m-c10-100-no-reset` + +Report: `/home/nathan/tmp/proc-metrics/20260503-145252-sqlite-spike-release-5m-c10-100-no-reset/index.html` + +Config: + +- Release engine. +- Release NAPI. +- 60s idle baseline. +- 5m target spike from 10 to 100 to 10 concurrent actors. +- 60s post-churn requested. +- `--no-reset`. +- Cleanup disabled. +- Forced GC sampling enabled. + +Result: + +- Did not complete cleanly. +- Failed at about 178s elapsed, after reaching target 100. +- Failure: `timed out waiting for actor sleeping log for 1474y8ra7z2o4835yezyh7zbc0bl00`. +- No `database disk image is malformed` errors were found. +- No `sqlite batch atomic probe failed` errors were found. +- No `Cannot read properties` errors were found. +- No `encoded structured bridge error` errors were found. + +Counts: + +- Samples: 174. +- Cycles: 1172. +- Actor API sleep calls: 953. +- Verified sleeps: 944. +- Max target concurrency: 100. +- Max harness active concurrency: 100. +- Max Envoy active actors sampled: 91. +- Final Envoy active actors sampled: 10. +- Engine RSS: 88.5 MiB start, 361.2 MiB max, 354.2 MiB final. +- Kitchen RSS: 228.8 MiB start, 596.3 MiB max, 328.3 MiB final. +- Kitchen JS heap used: 28.9 MiB start, 34.0 MiB max, 31.8 MiB final. +- Kitchen external memory: 5.0 MiB start, 5.1 MiB max, 5.0 MiB final. +- Kitchen native non-V8 resident estimate: 168.9 MiB start, 526.1 MiB max, 291.7 MiB final. +- Cycle latency: p50 3549.5ms, p95 18823.9ms, p99 29156.4ms, max 37399.0ms. + +Error counts: + +- `sqlite get_pages request failed`: 955. +- `sqlite database was not found in this bucket branch`: 955. +- `actor_ready_timeout`: 44. +- `timed out waiting for actor sleeping log`: 1. +- `database disk image is malformed`: 0. +- `sqlite batch atomic probe failed`: 0. +- `Cannot read properties`: 0. +- `encoded structured bridge error`: 0. + +Interpretation: + +- At max concurrency 100, the malformed database failure did not reproduce. +- The missing-database preload/read storm still reproduced without reset. 
+- Actor ready timeouts appeared under load and likely contributed to the harness failing to observe one sleep log. +- Kitchen memory again dropped substantially from peak before failure, and the growth was mostly native non-V8 memory. +- This is still not a valid leak proof because the run failed before clean drain and post-churn measurement. + +## Issue 1: SQLite malformed database during batch atomic probe + +Severity: high. + +Evidence: + +- Release no-reset 10 to 200 spike logged `database disk image is malformed`. +- The error happened while verifying SQLite batch atomic writes. +- The failing path was in actor startup/migration, with `onMigrate` in the stack. +- The same run then produced actor startup failures and repeated ready timeouts. + +Current theory: + +- The VFS or depot read/write path is returning inconsistent SQLite page state under high actor churn. +- Likely areas are batch atomic write handling, page cache invalidation, commit staging/finalization, preload hydration, or read-after-write visibility. +- This is not proven yet. + +What would prove it: + +- A focused repro that fails without the full harness. +- Extra VFS logging around database ID, generation, branch, page count, dirty page commit, preload hints, and batch atomic probe lifecycle. +- A direct VFS/depot stress test that repeatedly opens, migrates, writes, sleeps, and reopens the same shape of actor databases under bounded concurrency. + +## Issue 2: Missing-database storm during reset on fresh actors + +Severity: high, but probably lower than malformed DB. + +Evidence: + +- Release reset-enabled 10 to 200 spike failed in `handle.reset()`. +- Actor IDs were fresh, so reset was redundant. +- Engine logs had many `sqlite get_pages request failed` and `sqlite database was not found in this bucket branch`. +- Client surfaced only a generic internal error. + +Current theory: + +- The fresh DB/open/reset/preload path treats an expected empty or not-yet-created database as a hard error somewhere. +- Under high concurrency this becomes an error storm and can fail actor startup. +- Reset also does delete plus `VACUUM`, making it a much heavier startup operation than needed for fresh actor IDs. + +What would prove it: + +- Reproduce with a smaller harness that calls only `reset` on fresh actors. +- Log whether the missing database is from preload, `get_pages`, reset, `VACUUM`, or initial open. +- Distinguish expected missing fresh database from an actually corrupted or lost database branch. + +## Issue 3: Actor ready timeout and stuck drain after startup failure + +Severity: medium-high. + +Evidence: + +- The release no-reset spike got stuck after the scheduling window ended. +- Envoy still reported active actors. +- Logs repeated `actor_ready_timeout` for specific actors. +- The harness had no surfaced `run_error` even while the engine logs showed startup failures. + +Current theory: + +- Actor startup failures can leave the client or harness waiting for sleep/drain forever. +- The engine/client retry path may repeatedly try actors that are already poisoned by startup failure. +- The harness should have a bounded drain grace period so soak failures become explicit instead of hanging. + +What would prove it: + +- Add per-actor lifecycle event logging for create, ready, cycle start, cycle done, force sleep request, sleep observed, and startup failure. +- Make the harness emit an error if drain exceeds a configured grace period. +- Verify whether actor-ready retries stop once the underlying startup error is terminal. 
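The grace-period idea from Issue 3 can be a small bounded wait around the drain loop. A minimal sketch, assuming the harness can count drivers still waiting on sleep confirmation; the names are hypothetical:

```ts
// Hypothetical bounded-drain guard: instead of waiting forever for sleep
// confirmations after the scheduling window ends, fail the run once a
// configured grace period is exceeded.
async function drainWithGrace(
	pendingSleeps: () => number,
	graceMs: number,
	pollMs = 1000,
): Promise<void> {
	const deadline = Date.now() + graceMs;
	while (pendingSleeps() > 0) {
		if (Date.now() > deadline) {
			throw new Error(
				`drain exceeded ${graceMs}ms grace period; ${pendingSleeps()} actors never confirmed sleep`,
			);
		}
		await new Promise((resolveWait) => setTimeout(resolveWait, pollMs));
	}
}
```

With this in place, a stuck drain like the release no-reset run surfaces as an explicit harness error instead of a manual stop.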
## Issue 4: `onSleep` received undefined state after failed startup

Severity: medium.

Evidence:

- Logs showed `Cannot read properties of undefined (reading 'sleepCount')` in `sqlite-memory-pressure.ts`.
- This happened after startup/migration failures.
- The `onSleep` hook expects `c.state` to be initialized.

Current theory:

- This is likely secondary fallout from failed startup.
- Either user lifecycle hooks are being called in a state where actor state was never initialized, or the test actor should defensively tolerate failed-start cleanup.

What would prove it:

- Reproduce a forced `onMigrate` failure and observe whether `onSleep` is called with undefined state.
- Inspect core/NAPI lifecycle cleanup to decide whether `onSleep` is valid after failed startup.

## Memory observations

Known good signals:

- Kitchen memory drops substantially after actors sleep in lower-concurrency and 10 to 50 spike runs.
- In the 2 minute fixed-concurrency run, kitchen RSS returned from 445.4 MiB max to 264.6 MiB final.
- smaps showed native anonymous/private dirty memory dropping, while JS heap stayed roughly flat.
- The memory source in kitchen appears mostly native, not JS heap.

Known concerning signals:

- Engine RSS did not drop much in the successful 10 to 50 spike: 355.1 MiB max, 347.9 MiB final.
- The release 10 to 200 partial no-reset run had engine RSS at 396.9 MiB final and kitchen RSS at 512.2 MiB final, but this is not a clean memory result because correctness failed and the run was manually stopped.
- The release 10 to 50 and 10 to 200 questions need another clean run after the correctness issues are isolated or avoided.

Known release baselines:

- Release engine idle baseline was about 90 to 94 MiB.
- Release kitchen idle baseline was about 233 to 237 MiB.

## Serverless and payload-size context

The kitchen sink runs serverless by default in this path.

The start payload size concern is that serverless actor start carries the startup data needed to hydrate the actor, including SQLite preload data. If body-size limits are too low, actor start can fail or silently create misleading runtime behavior. The production checklist was updated to verify that the serverless request start body size has generous headroom for SQLite preload data.

The default engine preload size should be treated as the floor for sizing the serverless start body limit. Configure a generous margin above the maximum default SQLite page preload payload, not a tight limit equal to the current default.

## Things that are not proven yet

- We have not proven there are no remaining memory leaks.
- We have not proven whether engine RSS staying high is a leak, allocator behavior, cache retention, or intentionally retained engine state.
- We have not proven the exact root cause of `database disk image is malformed`.
- We have not proven whether `sqlite database was not found in this bucket branch` is expected missing-fresh-DB state being over-reported or a real storage lookup bug.
- We have not proven whether `onSleep` with undefined state is a core lifecycle bug or just a test actor assumption exposed by failed startup.

## Recommended next steps

1. Focus first on `database disk image is malformed` from the release no-reset spike.
2. Reduce the repro to the smallest concurrency that still fails, likely trying max concurrency 100, 150, then 200.
3. Add structured VFS/depot logs for batch atomic probe, page reads, page writes, branch/generation, and commit finalize.
4.
Build a focused direct VFS/depot stress test for repeated open, migrate, write, sleep, and reopen. +5. Separately isolate fresh-actor `reset` failures by running only reset on fresh actors. +6. Add a harness drain timeout so stuck actor-ready retries become an explicit failure. +7. Add a clean post-fix release soak: 60s idle baseline, 5m spike, at least 2 to 5m post-drain, concurrency 10 to 200. +8. Only use clean completed runs for leak conclusions. + +## Useful commands + +Release build: + +```bash +cargo build --release -p rivet-engine +pnpm --filter @rivetkit/rivetkit-napi build:force:release +``` + +Release reset-enabled spike command shape: + +```bash +RIVET_ENGINE_BINARY=/home/nathan/r7/target/release/rivet-engine \ +pnpm --filter kitchen-sink memory-soak -- \ + --endpoint http://127.0.0.1:6634 \ + --seed 20260503-142448-sqlite-spike-release-5m-c10-200 \ + --actors 20000 \ + --duration-ms 300000 \ + --cycle-interval-ms 1000 \ + --churn-sleep-after-ms 2000 \ + --spike-min-concurrency 10 \ + --spike-max-concurrency 200 \ + --spike-period-ms 60000 \ + --sample-interval-ms 1000 \ + --pre-workload-wait-ms 60000 \ + --post-churn-wait-ms 60000 \ + --post-cleanup-wait-ms 10000 \ + --no-cleanup \ + --force-gc-samples +``` + +Release no-reset spike command shape: + +```bash +RIVET_ENGINE_BINARY=/home/nathan/r7/target/release/rivet-engine \ +pnpm --filter kitchen-sink memory-soak -- \ + --endpoint http://127.0.0.1:6634 \ + --seed 20260503-142821-sqlite-spike-release-5m-c10-200-no-reset \ + --actors 20000 \ + --duration-ms 300000 \ + --cycle-interval-ms 1000 \ + --churn-sleep-after-ms 2000 \ + --spike-min-concurrency 10 \ + --spike-max-concurrency 200 \ + --spike-period-ms 60000 \ + --sample-interval-ms 1000 \ + --pre-workload-wait-ms 60000 \ + --post-churn-wait-ms 60000 \ + --post-cleanup-wait-ms 10000 \ + --no-reset \ + --no-cleanup \ + --force-gc-samples +``` diff --git a/examples/kitchen-sink/frontend/page-data.ts b/examples/kitchen-sink/frontend/page-data.ts index eb4b32d356..a5b9c3fc2f 100644 --- a/examples/kitchen-sink/frontend/page-data.ts +++ b/examples/kitchen-sink/frontend/page-data.ts @@ -173,6 +173,15 @@ export const parallelismTest = actor({ c.broadcast("sqliteCountChanged", { count }); }, }, +});`, + sqliteMemoryPressure: `const actor = client.sqliteMemoryPressure.getOrCreate(["memory"]); +await actor.runCycle({ + seed: "local", + cycle: 0, + insertRows: 128, + rowBytes: 16384, + deleteRows: 64, + retainRows: 1024, });`, }; @@ -371,6 +380,24 @@ export const ACTION_TEMPLATES: Record = { { label: "Increment SQLite", action: "incrementSqlite", args: [] }, { label: "Get SQLite Count", action: "getSqliteCount", args: [] }, ], + sqliteMemoryPressure: [ + { + label: "Run Cycle", + action: "runCycle", + args: [ + { + seed: "ui", + cycle: 0, + insertRows: 64, + rowBytes: 8192, + deleteRows: 32, + retainRows: 512, + }, + ], + }, + { label: "Stats", action: "stats", args: [] }, + { label: "Reset", action: "reset", args: [] }, + ], }; export const PAGE_GROUPS: PageGroup[] = [ @@ -1255,6 +1282,15 @@ export const PAGE_GROUPS: PageGroup[] = [ T -->|action| A A -->|result| T`, }, + { + id: "sqlite-memory-pressure", + title: "SQLite Memory Pressure", + description: + "Drive bounded SQLite write, scan, update, and delete churn for local memory-soak runs.", + docs: [], + actors: ["sqliteMemoryPressure"], + snippet: SNIPPETS.sqliteMemoryPressure, + }, ], }, ]; diff --git a/examples/kitchen-sink/package.json b/examples/kitchen-sink/package.json index 40a16bb14b..24a55ab07a 100644 --- 
a/examples/kitchen-sink/package.json +++ b/examples/kitchen-sink/package.json @@ -10,6 +10,8 @@ "build": "vite build", "test": "node --import tsx --test tests/*.test.ts", "start": "node --import @rivetkit/sql-loader --import tsx src/server.ts", + "memory-soak": "tsx scripts/sqlite-memory-soak.ts", + "proc-metrics": "tsx scripts/proc-metrics-report.ts", "smoke:raw-websocket-serverless": "tsx scripts/raw-websocket-serverless-smoke.ts", "benchmark": "tsx scripts/benchmark.ts", "db:generate": "find src/actors -name drizzle.config.ts -exec drizzle-kit generate --config {} \\;" diff --git a/examples/kitchen-sink/scripts/proc-metrics-report.ts b/examples/kitchen-sink/scripts/proc-metrics-report.ts new file mode 100644 index 0000000000..d4881c5a50 --- /dev/null +++ b/examples/kitchen-sink/scripts/proc-metrics-report.ts @@ -0,0 +1,711 @@ +#!/usr/bin/env -S pnpm exec tsx + +import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs"; +import { homedir } from "node:os"; +import { basename, dirname, join, resolve } from "node:path"; +import { fileURLToPath } from "node:url"; + +type Json = Record; + +interface Trace { + name: string; + x: number[]; + y: Array; + mode?: "lines" | "markers" | "lines+markers"; + type?: "scatter"; + yaxis?: "y" | "y2" | "y3"; + line?: { + color?: string; + dash?: string; + shape?: "linear" | "hv"; + width?: number; + }; + marker?: { + color?: string; + size?: number; + }; +} + +interface Chart { + id: string; + title: string; + yTitle: string; + traces: Trace[]; + y2Title?: string; + y3Title?: string; +} + +const BYTES_PER_MIB = 1024 * 1024; +const DEFAULT_OUTPUT_ROOT = join(homedir(), "tmp/proc-metrics"); +const REPO_ROOT = fileURLToPath(new URL("../../..", import.meta.url)); + +function usage(exitCode = 1): never { + console.error(`Usage: + pnpm --filter kitchen-sink proc-metrics -- [--out-dir ] + +Examples: + pnpm --filter kitchen-sink proc-metrics -- .agent/benchmarks/sqlite-memory-soak/no-delete-sleep-5m-10c/events.jsonl +`); + process.exit(exitCode); +} + +function readFlag(argv: string[], name: string): string | undefined { + const index = argv.indexOf(name); + if (index === -1) return undefined; + const value = argv[index + 1]; + if (!value || value.startsWith("--")) usage(); + return value; +} + +function sanitizeRunId(value: string): string { + return value.replace(/[^a-zA-Z0-9_.-]/g, "_"); +} + +function resolveInputPath(inputPath: string): string { + const direct = resolve(inputPath); + if (existsSync(direct)) return direct; + const repoRelative = resolve(REPO_ROOT, inputPath); + if (existsSync(repoRelative)) return repoRelative; + return direct; +} + +function numberAt(value: unknown): number | null { + return typeof value === "number" && Number.isFinite(value) ? value : null; +} + +function get(obj: unknown, path: string[]): unknown { + let current = obj; + for (const part of path) { + if (typeof current !== "object" || current === null) return undefined; + current = (current as Json)[part]; + } + return current; +} + +function bytesToMiB(value: unknown): number | null { + const n = numberAt(value); + return n === null ? null : n / BYTES_PER_MIB; +} + +function pagesToMiB(pages: unknown, pageSize: unknown): number | null { + const pageCount = numberAt(pages); + const size = numberAt(pageSize) ?? 4096; + return pageCount === null ? 
null : (pageCount * size) / BYTES_PER_MIB; +} + +function eventTime(event: Json, firstTimestampMs: number): number { + const elapsedMs = numberAt(event.elapsedMs); + if (elapsedMs !== null) return elapsedMs / 1000; + const timestamp = typeof event.timestamp === "string" ? Date.parse(event.timestamp) : NaN; + if (Number.isFinite(timestamp)) return (timestamp - firstTimestampMs) / 1000; + return 0; +} + +function makeTrace( + name: string, + events: Json[], + firstTimestampMs: number, + read: (event: Json) => number | null, +): Trace { + return { + name, + x: events.map((event) => eventTime(event, firstTimestampMs)), + y: events.map(read), + mode: "lines", + type: "scatter", + }; +} + +function nonEmpty(trace: Trace): boolean { + return trace.y.some((value) => value !== null); +} + +function rateTrace( + name: string, + events: Json[], + firstTimestampMs: number, + readCumulative: (event: Json) => number | null, + scale: number, +): Trace { + const x: number[] = []; + const y: Array = []; + let prevT: number | null = null; + let prevValue: number | null = null; + for (const event of events) { + const value = readCumulative(event); + const t = eventTime(event, firstTimestampMs); + if (value === null || prevValue === null || prevT === null || t <= prevT) { + prevT = t; + prevValue = value; + continue; + } + x.push(t); + y.push(((value - prevValue) / (t - prevT)) * scale); + prevT = t; + prevValue = value; + } + return { name, x, y, mode: "lines", type: "scatter" }; +} + +function countSeries( + name: string, + events: Json[], + firstTimestampMs: number, + match: (event: Json) => boolean, +): Trace { + const x: number[] = []; + const y: number[] = []; + let count = 0; + for (const event of events) { + if (!match(event)) continue; + count++; + x.push(eventTime(event, firstTimestampMs)); + y.push(count); + } + return { name, x, y, mode: "lines", type: "scatter" }; +} + +function activeActorTrace(events: Json[], firstTimestampMs: number): Trace { + const seen = new Set(); + const points: Array<{ t: number; delta: number }> = []; + for (const event of events) { + const actorIndex = numberAt(event.actorIndex); + if (actorIndex === null) continue; + if ((event.kind === "cycle" || event.kind === "actor_reset") && !seen.has(actorIndex)) { + seen.add(actorIndex); + points.push({ t: eventTime(event, firstTimestampMs), delta: 1 }); + } + if (event.kind === "actor_sleep_verified") { + points.push({ t: eventTime(event, firstTimestampMs), delta: -1 }); + } + } + points.sort((a, b) => a.t - b.t); + let active = 0; + return { + name: "active actor estimate", + x: points.map((point) => point.t), + y: points.map((point) => { + active += point.delta; + return active; + }), + mode: "lines", + type: "scatter", + }; +} + +function envoyActiveActorTrace(samples: Json[], firstTimestampMs: number): Trace { + return { + name: "envoy active actors", + x: samples.map((event) => eventTime(event, firstTimestampMs)), + y: samples.map((event) => + numberAt( + get(event, [ + "kitchenSinkBreakdown", + "registry", + "envoyActiveActorCount", + ]), + ), + ), + mode: "lines", + type: "scatter", + line: { color: "#1f7a4d", width: 2, shape: "hv" }, + }; +} + +function actorWakeTimes(events: Json[], firstTimestampMs: number): number[] { + const seen = new Set(); + const times: number[] = []; + for (const event of events) { + const actorIndex = numberAt(event.actorIndex); + if (actorIndex === null || seen.has(actorIndex)) continue; + if ( + event.kind === "actor_wake" || + event.kind === "actor_reset" || + event.kind === "cycle" + ) { 
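+			// Record the first wake, reset, or cycle event per actor index as
+			// that actor's wake time; the seen set above skips later events.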
+			seen.add(actorIndex);
+			times.push(eventTime(event, firstTimestampMs));
+		}
+	}
+	return times.sort((a, b) => a - b);
+}
+
+function addAlignmentOverlays(
+	charts: Chart[],
+	activeActors: Trace,
+): Chart[] {
+	return charts.map((chart) => {
+		if (chart.id === "actors") {
+			return chart;
+		}
+		const activeAxis: "y2" | "y3" = chart.y2Title ? "y3" : "y2";
+		const activeTrace = {
+			...activeActors,
+			yaxis: activeAxis,
+			line: {
+				...activeActors.line,
+				dash: "dot",
+				width: 1.5,
+			},
+		};
+		return {
+			...chart,
+			y2Title: chart.y2Title ?? "active actors",
+			y3Title: chart.y2Title ? "active actors" : chart.y3Title,
+			traces: [...chart.traces, activeTrace],
+		};
+	});
+}
+
+function rollingThroughputTrace(
+	name: string,
+	events: Json[],
+	firstTimestampMs: number,
+	match: (event: Json) => boolean,
+	windowSeconds: number,
+): Trace {
+	const times = events
+		.filter(match)
+		.map((event) => eventTime(event, firstTimestampMs))
+		.sort((a, b) => a - b);
+	const x: number[] = [];
+	const y: number[] = [];
+	let start = 0;
+	for (let end = 0; end < times.length; end += 1) {
+		while (times[start] !== undefined && times[end] - times[start] > windowSeconds) {
+			start++;
+		}
+		x.push(times[end]);
+		y.push((end - start + 1) / windowSeconds);
+	}
+	return { name, x, y, mode: "lines", type: "scatter" };
+}
+
+function quantile(values: number[], q: number): number | null {
+	if (values.length === 0) return null;
+	const sorted = [...values].sort((a, b) => a - b);
+	return sorted[Math.min(sorted.length - 1, Math.floor(q * sorted.length))] ?? null;
+}
+
+function formatMiB(value: number | null): string {
+	return value === null ? "n/a" : `${value.toFixed(1)} MiB`;
+}
+
+function htmlEscape(value: string): string {
+	return value
+		.replaceAll("&", "&amp;")
+		.replaceAll("<", "&lt;")
+		.replaceAll(">", "&gt;")
+		.replaceAll('"', "&quot;");
+}
+
+function jsonForHtml(value: unknown): string {
+	return JSON.stringify(value).replaceAll("<", "\\u003c");
+}
+
+function buildHtml(runId: string, inputPath: string, charts: Chart[], summary: Json): string {
+	return `<!doctype html>
+<html lang="en">
+<head>
+	<meta charset="utf-8" />
+	<title>Proc Metrics ${htmlEscape(runId)}</title>
+	<script src="https://cdn.plot.ly/plotly-2.32.0.min.js"></script>
+</head>
+<body>
+	<main>
+		<h1>Process Metrics: ${htmlEscape(runId)}</h1>
+		<p>source: ${htmlEscape(inputPath)}</p>
+		<pre>${htmlEscape(JSON.stringify(summary, null, 2))}</pre>
+		<p>CPU and I/O charts render when the soak events include cumulative /proc samples. Older runs may only contain memory, actor, and VFS charts.</p>
+		${charts.map((chart) => `<div id="${chart.id}" class="chart"></div>`).join("\n\t\t")}
+ + + +`; +} + +function main(): void { + const cliArgs = process.argv.slice(2).filter((arg) => arg !== "--"); + const inputPathArg = cliArgs[0]; + if (!inputPathArg || inputPathArg.startsWith("--")) usage(); + const inputPath = resolveInputPath(inputPathArg); + const outDirArg = readFlag(cliArgs.slice(1), "--out-dir"); + const events = readFileSync(inputPath, "utf8") + .trim() + .split("\n") + .filter(Boolean) + .map((line) => JSON.parse(line) as Json); + + if (events.length === 0) throw new Error(`no events in ${inputPath}`); + const runStart = events.find((event) => event.kind === "run_start"); + const samples = events.filter((event) => event.kind === "memory_sample"); + const cycles = events.filter((event) => event.kind === "cycle"); + const sleeps = events.filter((event) => event.kind === "actor_sleep_verified"); + const runId = sanitizeRunId( + (typeof runStart?.runId === "string" && runStart.runId) || + (typeof samples[0]?.runId === "string" && samples[0].runId) || + basename(dirname(inputPath)), + ); + const firstTimestampMs = Date.parse( + (typeof events[0]?.timestamp === "string" && events[0].timestamp) || + (typeof samples[0]?.timestamp === "string" && samples[0].timestamp) || + new Date().toISOString(), + ); + const estimatedActiveActors = activeActorTrace(events, firstTimestampMs); + const reportedActiveActors = envoyActiveActorTrace(samples, firstTimestampMs); + const activeActorOverlay = nonEmpty(reportedActiveActors) + ? reportedActiveActors + : estimatedActiveActors; + const wakeTimes = actorWakeTimes(events, firstTimestampMs); + + const memoryChart: Chart = { + id: "memory-rss", + title: "Process RSS", + yTitle: "MiB", + traces: [ + makeTrace("harness RSS", samples, firstTimestampMs, (event) => + bytesToMiB(get(event, ["harness", "rssBytes"])), + ), + makeTrace("engine RSS", samples, firstTimestampMs, (event) => + bytesToMiB(get(event, ["engine", "rssBytes"])), + ), + makeTrace("kitchen RSS", samples, firstTimestampMs, (event) => + bytesToMiB(get(event, ["kitchenSink", "rssBytes"])), + ), + ].filter(nonEmpty), + }; + const pssChart: Chart = { + id: "memory-pss-anon", + title: "PSS And Anonymous Memory", + yTitle: "MiB", + traces: [ + makeTrace("engine PSS", samples, firstTimestampMs, (event) => + bytesToMiB(get(event, ["engine", "smapsRollup", "Pss"])), + ), + makeTrace("engine anon PSS", samples, firstTimestampMs, (event) => + bytesToMiB(get(event, ["engine", "smapsRollup", "Pss_Anon"])), + ), + makeTrace("kitchen PSS", samples, firstTimestampMs, (event) => + bytesToMiB(get(event, ["kitchenSink", "smapsRollup", "Pss"])), + ), + makeTrace("kitchen anon PSS", samples, firstTimestampMs, (event) => + bytesToMiB(get(event, ["kitchenSink", "smapsRollup", "Pss_Anon"])), + ), + ].filter(nonEmpty), + }; + const kitchenChart: Chart = { + id: "kitchen-v8-native", + title: "Kitchen-Sink V8 vs Native Estimate", + yTitle: "MiB", + traces: [ + makeTrace("JS heap used", samples, firstTimestampMs, (event) => + bytesToMiB( + get(event, [ + "kitchenSinkBreakdown", + "estimates", + "jsHeapUsedBytes", + ]), + ), + ), + makeTrace("JS heap resident", samples, firstTimestampMs, (event) => + bytesToMiB( + get(event, [ + "kitchenSinkBreakdown", + "estimates", + "jsHeapResidentBytes", + ]), + ), + ), + makeTrace("V8 external", samples, firstTimestampMs, (event) => + bytesToMiB( + get(event, [ + "kitchenSinkBreakdown", + "estimates", + "v8ExternalBytes", + ]), + ), + ), + makeTrace("native non-V8 estimate", samples, firstTimestampMs, (event) => + bytesToMiB( + get(event, [ + "kitchenSinkBreakdown", 
+ "estimates", + "nativeNonV8ResidentEstimateBytes", + ]), + ), + ), + ].filter(nonEmpty), + }; + const cpuChart: Chart = { + id: "cpu", + title: "CPU Utilization From /proc", + yTitle: "% of one core", + traces: [ + rateTrace( + "harness CPU", + samples, + firstTimestampMs, + (event) => numberAt(get(event, ["harness", "cpuTotalSeconds"])), + 100, + ), + rateTrace( + "engine CPU", + samples, + firstTimestampMs, + (event) => numberAt(get(event, ["engine", "cpuTotalSeconds"])), + 100, + ), + rateTrace( + "kitchen CPU", + samples, + firstTimestampMs, + (event) => numberAt(get(event, ["kitchenSink", "cpuTotalSeconds"])), + 100, + ), + ].filter(nonEmpty), + }; + const ioChart: Chart = { + id: "io", + title: "Process I/O Throughput", + yTitle: "MiB/s", + traces: [ + rateTrace( + "engine read", + samples, + firstTimestampMs, + (event) => bytesToMiB(get(event, ["engine", "io", "readBytes"])), + 1, + ), + rateTrace( + "engine write", + samples, + firstTimestampMs, + (event) => bytesToMiB(get(event, ["engine", "io", "writeBytes"])), + 1, + ), + rateTrace( + "kitchen read", + samples, + firstTimestampMs, + (event) => + bytesToMiB(get(event, ["kitchenSink", "io", "readBytes"])), + 1, + ), + rateTrace( + "kitchen write", + samples, + firstTimestampMs, + (event) => + bytesToMiB(get(event, ["kitchenSink", "io", "writeBytes"])), + 1, + ), + ].filter(nonEmpty), + }; + const threadsFdsChart: Chart = { + id: "threads-fds", + title: "Threads And File Descriptors", + yTitle: "count", + traces: [ + makeTrace("engine threads", samples, firstTimestampMs, (event) => + numberAt(get(event, ["engine", "threads"])), + ), + makeTrace("engine fds", samples, firstTimestampMs, (event) => + numberAt(get(event, ["engine", "openFds"])), + ), + makeTrace("kitchen threads", samples, firstTimestampMs, (event) => + numberAt(get(event, ["kitchenSink", "threads"])), + ), + makeTrace("kitchen fds", samples, firstTimestampMs, (event) => + numberAt(get(event, ["kitchenSink", "openFds"])), + ), + ].filter(nonEmpty), + }; + const actorChart: Chart = { + id: "actors", + title: "Actor Churn", + yTitle: "count", + traces: [ + activeActorOverlay, + countSeries("actors slept", events, firstTimestampMs, (event) => + event.kind === "actor_sleep_verified", + ), + countSeries("cycles completed", events, firstTimestampMs, (event) => + event.kind === "cycle", + ), + ].filter(nonEmpty), + }; + const cycleChart: Chart = { + id: "cycle-throughput-latency", + title: "Cycle Throughput And Latency", + yTitle: "cycles/s", + y2Title: "ms", + traces: [ + rollingThroughputTrace( + "cycle throughput, 10s window", + events, + firstTimestampMs, + (event) => event.kind === "cycle", + 10, + ), + { + name: "cycle latency", + x: cycles.map((event) => eventTime(event, firstTimestampMs)), + y: cycles.map((event) => numberAt(event.durationMs)), + mode: "markers" as const, + type: "scatter" as const, + yaxis: "y2" as const, + }, + ].filter(nonEmpty), + }; + const vfsChart: Chart = { + id: "sqlite-vfs", + title: "SQLite VFS Per-Cycle Metrics", + yTitle: "MiB", + y2Title: "pages / entries", + traces: [ + makeTrace("page cache", cycles, firstTimestampMs, (event) => + pagesToMiB( + get(event, [ + "result", + "storage", + "vfs", + "pageCacheWeightedSize", + ]), + get(event, ["result", "storage", "page_size"]), + ), + ), + makeTrace("db size", cycles, firstTimestampMs, (event) => + pagesToMiB( + get(event, ["result", "storage", "vfs", "dbSizePages"]), + get(event, ["result", "storage", "page_size"]), + ), + ), + { + ...makeTrace("cache entries", cycles, firstTimestampMs, 
(event) => + numberAt( + get(event, [ + "result", + "storage", + "vfs", + "pageCacheEntries", + ]), + ), + ), + yaxis: "y2" as const, + }, + { + ...makeTrace("dirty pages", cycles, firstTimestampMs, (event) => + numberAt( + get(event, [ + "result", + "storage", + "vfs", + "writeBufferDirtyPages", + ]), + ), + ), + yaxis: "y2" as const, + }, + ].filter(nonEmpty), + }; + + const charts = addAlignmentOverlays([ + memoryChart, + pssChart, + kitchenChart, + cpuChart, + ioChart, + threadsFdsChart, + actorChart, + cycleChart, + vfsChart, + ].filter((chart) => chart.traces.length > 0), activeActorOverlay); + + const engineRss = samples + .map((event) => bytesToMiB(get(event, ["engine", "rssBytes"]))) + .filter((value): value is number => value !== null); + const kitchenRss = samples + .map((event) => bytesToMiB(get(event, ["kitchenSink", "rssBytes"]))) + .filter((value): value is number => value !== null); + const cycleLatencies = cycles + .map((event) => numberAt(event.durationMs)) + .filter((value): value is number => value !== null); + const summary: Json = { + "samples": samples.length, + "cycles": cycles.length, + "actors slept": sleeps.length, + "actor wakes": wakeTimes.length, + "active actor source": nonEmpty(reportedActiveActors) ? "envoy" : "harness estimate", + "engine RSS max": formatMiB(engineRss.length ? Math.max(...engineRss) : null), + "engine RSS final": formatMiB(engineRss.at(-1) ?? null), + "kitchen RSS max": formatMiB(kitchenRss.length ? Math.max(...kitchenRss) : null), + "kitchen RSS final": formatMiB(kitchenRss.at(-1) ?? null), + "cycle latency p95": `${ + quantile(cycleLatencies, 0.95)?.toFixed(1) ?? "n/a" + } ms`, + }; + + const outputDir = resolve(outDirArg ?? join(DEFAULT_OUTPUT_ROOT, runId)); + mkdirSync(outputDir, { recursive: true }); + const html = buildHtml(runId, inputPath, charts, summary); + const htmlPath = join(outputDir, "index.html"); + const jsonPath = join(outputDir, "charts.json"); + writeFileSync(htmlPath, html); + writeFileSync(jsonPath, JSON.stringify({ runId, inputPath, summary, charts }, null, 2)); + console.log(`wrote ${htmlPath}`); + console.log(`wrote ${jsonPath}`); +} + +main(); diff --git a/examples/kitchen-sink/scripts/sqlite-memory-soak.ts b/examples/kitchen-sink/scripts/sqlite-memory-soak.ts new file mode 100644 index 0000000000..2748fde94e --- /dev/null +++ b/examples/kitchen-sink/scripts/sqlite-memory-soak.ts @@ -0,0 +1,1865 @@ +#!/usr/bin/env -S pnpm exec tsx + +import { spawn, type ChildProcess } from "node:child_process"; +import { + appendFileSync, + existsSync, + mkdirSync, + mkdtempSync, + readFileSync, + readdirSync, + rmSync, + statSync, + writeFileSync, +} from "node:fs"; +import { createServer } from "node:net"; +import { tmpdir } from "node:os"; +import { join, resolve } from "node:path"; +import { fileURLToPath } from "node:url"; +import { createClient } from "rivetkit/client"; +import type { registry } from "../src/index.ts"; + +const REPO_ROOT = fileURLToPath(new URL("../../..", import.meta.url)); +const EXAMPLE_DIR = fileURLToPath(new URL("..", import.meta.url)); +const REPO_ENGINE_BINARY = fileURLToPath( + new URL("../../../target/debug/rivet-engine", import.meta.url), +); +const DEFAULT_ENGINE_PORT = 6520; +const DEFAULT_OUTPUT_DIR = ".agent/benchmarks/sqlite-memory-soak"; +const SQLITE_PAGE_SIZE_BYTES = 4096; +const CLOCK_TICKS_PER_SECOND = 100; + +interface Args { + endpoint: string; + serverPort: number; + seed: string; + actors: number; + cycles: number; + durationMs: number; + cycleIntervalMs: number; + 
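+	// Pacing and load shape: actorStartIntervalMs spaces actor cold starts,
+	// concurrency fixes the driver count, and the spike* fields (active when
+	// spikeMinConcurrency > 0) ramp the target between min and max over
+	// spikePeriodMs.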
actorStartIntervalMs: number; + concurrency: number; + spikeMinConcurrency: number; + spikeMaxConcurrency: number; + spikePeriodMs: number; + insertRows: number; + rowBytes: number; + scanRows: number; + sampleIntervalMs: number; + wakeEvery: number; + wakeDelayMs: number; + churnSleepAfterMs: number; + sleepLogTimeoutMs: number; + preWorkloadWaitMs: number; + postChurnWaitMs: number; + postCleanupWaitMs: number; + requestLifespanSeconds: number; + serverlessMaxStartPayloadBytes: number; + outputDir: string; + metricsToken: string; + reset: boolean; + cleanup: boolean; + forceGcSamples: boolean; + keepStorage: boolean; +} + +interface ManagedChild { + child: ChildProcess; + label: string; + logPath: string; + logs: string[]; +} + +interface LocalEngine extends ManagedChild { + dbRoot: string; +} + +interface MemorySample { + kind: "memory_sample"; + runId: string; + elapsedMs: number; + timestamp: string; + harness: ProcMemory; + engine: ProcMemory; + kitchenSink: ProcMemory; + kitchenSinkBreakdown: unknown; +} + +interface ProcMemory { + pid: number | null; + alive: boolean; + rssBytes: number | null; + hwmRssBytes: number | null; + vmSizeBytes: number | null; + threads: number | null; + procState?: string | null; + cpuUserSeconds?: number | null; + cpuSystemSeconds?: number | null; + cpuTotalSeconds?: number | null; + openFds?: number | null; + io?: ProcIo; + smapsRollup?: Record; + error?: string; +} + +interface ProcIo { + readBytes: number | null; + writeBytes: number | null; + syscr: number | null; + syscw: number | null; +} + +function kitchenSinkPidFromBreakdown(breakdown: unknown): number | undefined { + if (typeof breakdown !== "object" || breakdown === null) return undefined; + const pid = (breakdown as { pid?: unknown }).pid; + return typeof pid === "number" && Number.isInteger(pid) ? pid : undefined; +} + +function usage(exitCode = 1): never { + console.error(`Usage: + pnpm --filter kitchen-sink memory-soak [options] + +Options: + --endpoint Engine endpoint. Default: http://127.0.0.1:6520. + --server-port Kitchen-sink HTTP port. Default: open port. + --seed Actor key seed. Default: generated. + --actors Actor instances. Default: 4. + --cycles Max cycles per actor. Default: 20, or unbounded with --duration-ms. + --duration-ms Stop scheduling cycles after this duration. Default: 0. + --cycle-interval-ms Fixed interval per actor cycle. Default: 1000. + --actor-start-interval-ms + Fixed interval between actor cold starts. Default: 1000. + --concurrency Concurrent actor drivers. Default: 4. + --spike-min-concurrency If >0, enable spike mode with this minimum target concurrency. Default: 0. + --spike-max-concurrency Spike mode maximum target concurrency. Default: --concurrency. + --spike-period-ms Full up/down spike period. Default: 60000. + --insert-rows Rows inserted per cycle. Default: 128. + --row-bytes randomblob bytes per inserted row. Default: 16384. + --scan-rows Rows scanned per cycle. Default: 512. + --sample-interval-ms Memory sample interval. Default: 1000. + --wake-every Sleep each actor every N cycles. Default: 0. + --wake-delay-ms Delay after sleep. Default: 1000. + --churn-sleep-after-ms If >0, sleep each actor through the engine API after this many ms, then spawn another. Default: 0. + --sleep-log-timeout-ms Timeout waiting for the actor sleeping log after API sleep. Default: 10000. + --pre-workload-wait-ms Sample idle engine and kitchen-sink before creating actors. Default: 0. + --post-churn-wait-ms Sample after churn completes and before cleanup wakes actors. 
Default: 0. + --post-cleanup-wait-ms Final sample window after cleanup. Default: 5000. + --request-lifespan-seconds + Serverless request lifespan. Default: scheduled work plus startup and slow-tail margin. + --serverless-max-start-payload-bytes + Local /api/rivet/start body limit. Default: 8388608. + --output-dir Output directory. Default: ${DEFAULT_OUTPUT_DIR}. + --metrics-token Engine metrics token. Default: dev-metrics. + --no-reset Reuse actor DBs instead of resetting first. + --no-cleanup Leave actor DB contents after run. + --force-gc-samples Request /debug/memory?gc=1 on each sample. + --keep-storage Keep the temp engine storage directory. + +The harness refuses port 6420 by default so it does not collide with the normal local engine.`); + process.exit(exitCode); +} + +function readFlag(argv: string[], name: string): string | undefined { + const prefix = `${name}=`; + const inline = argv.find((arg) => arg.startsWith(prefix)); + if (inline) return inline.slice(prefix.length); + const index = argv.indexOf(name); + if (index >= 0) return argv[index + 1]; + return undefined; +} + +function readNumber( + argv: string[], + flag: string, + envName: string, + defaultValue: number, +): number { + const raw = readFlag(argv, flag) ?? process.env[envName]; + if (raw === undefined) return defaultValue; + const value = Number.parseInt(raw, 10); + if (!Number.isFinite(value) || value < 0) { + throw new Error(`invalid ${flag}: ${raw}`); + } + return value; +} + +function timestampRunPrefix(date = new Date()): string { + const pad = (value: number) => value.toString().padStart(2, "0"); + return [ + date.getFullYear(), + pad(date.getMonth() + 1), + pad(date.getDate()), + "-", + pad(date.getHours()), + pad(date.getMinutes()), + pad(date.getSeconds()), + ].join(""); +} + +function scheduledWorkMs(args: Args): number { + if (args.durationMs > 0) return args.durationMs; + return args.cycles * args.cycleIntervalMs; +} + +function defaultRequestLifespanSeconds(args: Args): number { + const resetBudgetMs = args.reset ? args.actorStartIntervalMs * args.actors : 0; + const slowTailBudgetMs = Math.max(5 * 60_000, args.cycleIntervalMs * 2); + return Math.max( + 300, + Math.ceil( + (scheduledWorkMs(args) + + args.preWorkloadWaitMs + + args.postCleanupWaitMs + + args.postChurnWaitMs + + resetBudgetMs + + slowTailBudgetMs) / + 1000, + ), + ); +} + +function parseArgs(argv: string[]): Args { + if (argv.includes("--help") || argv.includes("-h")) usage(0); + + const endpoint = + readFlag(argv, "--endpoint") ?? + process.env.SQLITE_MEMORY_SOAK_ENDPOINT ?? + `http://127.0.0.1:${DEFAULT_ENGINE_PORT}`; + const endpointUrl = new URL(endpoint); + if (endpointUrl.port === "6420") { + throw new Error("sqlite-memory-soak must not run the engine on port 6420"); + } + + const args: Args = { + endpoint, + serverPort: readNumber(argv, "--server-port", "SQLITE_MEMORY_SOAK_SERVER_PORT", 0), + seed: + readFlag(argv, "--seed") ?? + process.env.SQLITE_MEMORY_SOAK_SEED ?? 
+ `${timestampRunPrefix()}-sqlite-memory-soak`, + actors: readNumber(argv, "--actors", "SQLITE_MEMORY_SOAK_ACTORS", 4), + cycles: readNumber(argv, "--cycles", "SQLITE_MEMORY_SOAK_CYCLES", 20), + durationMs: readNumber( + argv, + "--duration-ms", + "SQLITE_MEMORY_SOAK_DURATION_MS", + 0, + ), + cycleIntervalMs: readNumber( + argv, + "--cycle-interval-ms", + "SQLITE_MEMORY_SOAK_CYCLE_INTERVAL_MS", + 1000, + ), + actorStartIntervalMs: readNumber( + argv, + "--actor-start-interval-ms", + "SQLITE_MEMORY_SOAK_ACTOR_START_INTERVAL_MS", + 1000, + ), + concurrency: readNumber( + argv, + "--concurrency", + "SQLITE_MEMORY_SOAK_CONCURRENCY", + 4, + ), + spikeMinConcurrency: readNumber( + argv, + "--spike-min-concurrency", + "SQLITE_MEMORY_SOAK_SPIKE_MIN_CONCURRENCY", + 0, + ), + spikeMaxConcurrency: readNumber( + argv, + "--spike-max-concurrency", + "SQLITE_MEMORY_SOAK_SPIKE_MAX_CONCURRENCY", + 0, + ), + spikePeriodMs: readNumber( + argv, + "--spike-period-ms", + "SQLITE_MEMORY_SOAK_SPIKE_PERIOD_MS", + 60_000, + ), + insertRows: readNumber( + argv, + "--insert-rows", + "SQLITE_MEMORY_SOAK_INSERT_ROWS", + 128, + ), + rowBytes: readNumber( + argv, + "--row-bytes", + "SQLITE_MEMORY_SOAK_ROW_BYTES", + 16 * 1024, + ), + scanRows: readNumber( + argv, + "--scan-rows", + "SQLITE_MEMORY_SOAK_SCAN_ROWS", + 512, + ), + sampleIntervalMs: readNumber( + argv, + "--sample-interval-ms", + "SQLITE_MEMORY_SOAK_SAMPLE_INTERVAL_MS", + 1000, + ), + wakeEvery: readNumber( + argv, + "--wake-every", + "SQLITE_MEMORY_SOAK_WAKE_EVERY", + 0, + ), + wakeDelayMs: readNumber( + argv, + "--wake-delay-ms", + "SQLITE_MEMORY_SOAK_WAKE_DELAY_MS", + 1000, + ), + churnSleepAfterMs: readNumber( + argv, + "--churn-sleep-after-ms", + "SQLITE_MEMORY_SOAK_CHURN_SLEEP_AFTER_MS", + 0, + ), + sleepLogTimeoutMs: readNumber( + argv, + "--sleep-log-timeout-ms", + "SQLITE_MEMORY_SOAK_SLEEP_LOG_TIMEOUT_MS", + 10_000, + ), + preWorkloadWaitMs: readNumber( + argv, + "--pre-workload-wait-ms", + "SQLITE_MEMORY_SOAK_PRE_WORKLOAD_WAIT_MS", + 0, + ), + postChurnWaitMs: readNumber( + argv, + "--post-churn-wait-ms", + "SQLITE_MEMORY_SOAK_POST_CHURN_WAIT_MS", + 0, + ), + postCleanupWaitMs: readNumber( + argv, + "--post-cleanup-wait-ms", + "SQLITE_MEMORY_SOAK_POST_CLEANUP_WAIT_MS", + 5000, + ), + requestLifespanSeconds: 0, + serverlessMaxStartPayloadBytes: readNumber( + argv, + "--serverless-max-start-payload-bytes", + "SQLITE_MEMORY_SOAK_SERVERLESS_MAX_START_PAYLOAD_BYTES", + 16 * 1024 * 1024, + ), + outputDir: + readFlag(argv, "--output-dir") ?? + process.env.SQLITE_MEMORY_SOAK_OUTPUT_DIR ?? + DEFAULT_OUTPUT_DIR, + metricsToken: + readFlag(argv, "--metrics-token") ?? + process.env.SQLITE_MEMORY_SOAK_METRICS_TOKEN ?? 
+ "dev-metrics", + reset: !argv.includes("--no-reset"), + cleanup: !argv.includes("--no-cleanup"), + forceGcSamples: argv.includes("--force-gc-samples"), + keepStorage: argv.includes("--keep-storage"), + }; + if ( + args.durationMs > 0 && + readFlag(argv, "--cycles") === undefined && + process.env.SQLITE_MEMORY_SOAK_CYCLES === undefined + ) { + args.cycles = Number.MAX_SAFE_INTEGER; + } + args.requestLifespanSeconds = readNumber( + argv, + "--request-lifespan-seconds", + "SQLITE_MEMORY_SOAK_REQUEST_LIFESPAN_SECONDS", + defaultRequestLifespanSeconds(args), + ); + + for (const [name, value] of [ + ["--actors", args.actors], + ["--cycles", args.cycles], + ["--cycle-interval-ms", args.cycleIntervalMs], + ["--actor-start-interval-ms", args.actorStartIntervalMs], + ["--concurrency", args.concurrency], + [ + "--spike-max-concurrency", + args.spikeMaxConcurrency > 0 + ? args.spikeMaxConcurrency + : args.spikeMinConcurrency > 0 + ? args.concurrency + : 1, + ], + ["--spike-period-ms", args.spikePeriodMs], + ["--insert-rows", args.insertRows], + ["--row-bytes", args.rowBytes], + ["--scan-rows", args.scanRows], + ["--sample-interval-ms", args.sampleIntervalMs], + ["--request-lifespan-seconds", args.requestLifespanSeconds], + ["--sleep-log-timeout-ms", args.sleepLogTimeoutMs], + [ + "--serverless-max-start-payload-bytes", + args.serverlessMaxStartPayloadBytes, + ], + ] as const) { + if (value < 1) throw new Error(`${name} must be >= 1`); + } + if (args.churnSleepAfterMs < 0) { + throw new Error("--churn-sleep-after-ms must be >= 0"); + } + if (args.preWorkloadWaitMs < 0) { + throw new Error("--pre-workload-wait-ms must be >= 0"); + } + if (args.spikeMinConcurrency > 0) { + if (args.spikeMaxConcurrency === 0) { + args.spikeMaxConcurrency = args.concurrency; + } + if (args.spikeMaxConcurrency < args.spikeMinConcurrency) { + throw new Error( + "--spike-max-concurrency must be >= --spike-min-concurrency", + ); + } + } + + return args; +} + +function sleep(ms: number): Promise { + return new Promise((resolveSleep) => setTimeout(resolveSleep, ms)); +} + +function resolveEngineBinary(): string { + if (process.env.RIVET_ENGINE_BINARY) return process.env.RIVET_ENGINE_BINARY; + if (existsSync(REPO_ENGINE_BINARY)) return REPO_ENGINE_BINARY; + throw new Error( + `No local rivet-engine binary found. Build one with cargo build -p rivet-engine or set RIVET_ENGINE_BINARY.`, + ); +} + +async function findOpenPort(): Promise { + return new Promise((resolvePort, reject) => { + const server = createServer(); + server.on("error", reject); + server.listen(0, "127.0.0.1", () => { + const address = server.address(); + if (address === null || typeof address === "string") { + server.close(() => reject(new Error("failed to allocate open port"))); + return; + } + const port = address.port; + server.close(() => resolvePort(port)); + }); + }); +} + +async function waitForHttpOk( + url: string, + label: string, + child: ChildProcess, + logs: string[], + timeoutMs = 20_000, +): Promise { + const deadline = Date.now() + timeoutMs; + let lastError: unknown; + + while (Date.now() < deadline) { + if (child.exitCode !== null) { + throw new Error(`${label} exited before ready:\n${logs.join("")}`); + } + + try { + const response = await fetch(url); + if (response.ok) return; + lastError = new Error(`${label} returned ${response.status}`); + } catch (err) { + lastError = err; + } + + await sleep(100); + } + + throw lastError instanceof Error + ? 
+function attachLogs(
+	child: ChildProcess,
+	label: string,
+	logPath: string,
+	logs: string[],
+) {
+	const append = (chunk: Buffer) => {
+		const text = chunk.toString();
+		logs.push(text);
+		if (logs.length > 200) logs.splice(0, logs.length - 200);
+		appendFileSync(logPath, text);
+	};
+	child.stdout?.on("data", append);
+	child.stderr?.on("data", append);
+	child.once("exit", (code, signal) => {
+		append(
+			Buffer.from(
+				JSON.stringify({
+					kind: "child_exit",
+					label,
+					code,
+					signal,
+					timestamp: new Date().toISOString(),
+				}) + "\n",
+			),
+		);
+	});
+}
+
+async function startEngine(args: Args, runDir: string): Promise<LocalEngine> {
+	const endpointUrl = new URL(args.endpoint);
+	const guardHost = endpointUrl.hostname || "127.0.0.1";
+	const guardPort = Number.parseInt(endpointUrl.port, 10);
+	if (!Number.isFinite(guardPort) || guardPort <= 0) {
+		throw new Error(`endpoint must include a numeric port: ${args.endpoint}`);
+	}
+
+	const dbRoot = mkdtempSync(join(tmpdir(), "sqlite-memory-soak-engine-"));
+	const configPath = join(runDir, "engine.config.json");
+	const logPath = join(runDir, "engine.log");
+	const logs: string[] = [];
+	writeFileSync(
+		configPath,
+		`${JSON.stringify(
+			{
+				topology: {
+					datacenter_label: 1,
+					datacenters: {
+						default: {
+							datacenter_label: 1,
+							is_leader: true,
+							public_url: `${args.endpoint.replace(/\/$/, "")}/`,
+							peer_url: `http://${guardHost}:${guardPort + 1}/`,
+							proxy_url: null,
+							valid_hosts: null,
+						},
+					},
+				},
+			},
+			null,
+			2,
+		)}\n`,
+	);
+	const env: NodeJS.ProcessEnv = {
+		...process.env,
+		RIVET__GUARD__HOST: guardHost,
+		RIVET__GUARD__PORT: guardPort.toString(),
+		RIVET__API_PEER__HOST: guardHost,
+		RIVET__API_PEER__PORT: (guardPort + 1).toString(),
+		RIVET__METRICS__HOST: guardHost,
+		RIVET__METRICS__PORT: (guardPort + 10).toString(),
+		RIVET__FILE_SYSTEM__PATH: join(dbRoot, "db"),
+		_RIVET_METRICS_TOKEN: args.metricsToken,
+		MALLOC_ARENA_MAX: process.env.MALLOC_ARENA_MAX ?? "2",
+		MALLOC_TRIM_THRESHOLD_: process.env.MALLOC_TRIM_THRESHOLD_ ?? "131072",
+	};
+	const child = spawn(resolveEngineBinary(), ["start", "--config", configPath], {
+		env,
+		stdio: ["ignore", "pipe", "pipe"],
+	});
+	attachLogs(child, "engine", logPath, logs);
+
+	try {
+		await waitForHttpOk(
+			`${args.endpoint.replace(/\/$/, "")}/health`,
+			"rivet-engine",
+			child,
+			logs,
+		);
+		return { child, label: "engine", logPath, logs, dbRoot };
+	} catch (err) {
+		await stopChild({ child, label: "engine", logPath, logs });
+		rmSync(dbRoot, { recursive: true, force: true });
+		throw err;
+	}
+}
+
+async function startKitchenSinkServer(
+	args: Args,
+	runDir: string,
+	serverPort: number,
+): Promise<ManagedChild> {
+	const logPath = join(runDir, "kitchen-sink.log");
+	const logs: string[] = [];
+	const serverlessUrl = `http://127.0.0.1:${serverPort}/api/rivet`;
+	const env: NodeJS.ProcessEnv = {
+		...process.env,
+		PORT: serverPort.toString(),
+		RIVET_ENDPOINT: args.endpoint,
+		RIVET_TOKEN: process.env.RIVET_TOKEN ?? "dev",
+		RIVET_NAMESPACE: process.env.RIVET_NAMESPACE ?? "default",
+		RIVET_POOL: process.env.RIVET_POOL ?? "default",
+		RIVET_SERVERLESS_URL: serverlessUrl,
+		RIVET_SERVERLESS_REQUEST_LIFESPAN:
+			args.requestLifespanSeconds.toString(),
+		RIVET_SERVERLESS_DRAIN_GRACE_PERIOD:
+			process.env.RIVET_SERVERLESS_DRAIN_GRACE_PERIOD ?? "5",
+		RIVET_SERVERLESS_MAX_START_PAYLOAD_BYTES:
+			args.serverlessMaxStartPayloadBytes.toString(),
+		SQLITE_MEMORY_SOAK_DIAGNOSTICS: "1",
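+		// Same glibc malloc tuning as the engine env above: fewer arenas and a
+		// low trim threshold so freed native memory is returned to the OS; the
+		// soak notes credit this for RSS dropping back in large chunks.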
"2", + MALLOC_TRIM_THRESHOLD_: process.env.MALLOC_TRIM_THRESHOLD_ ?? "131072", + }; + delete env.RIVET_RUN_ENGINE; + + const nodeArgs = [ + ...(args.forceGcSamples ? ["--expose-gc"] : []), + "--import", + "@rivetkit/sql-loader", + "--import", + "tsx", + "src/server.ts", + ]; + const command = + process.env.SQLITE_MEMORY_SOAK_STRACE === "1" ? "strace" : process.execPath; + const commandArgs = + process.env.SQLITE_MEMORY_SOAK_STRACE === "1" + ? [ + "-ff", + "-tt", + "-e", + "trace=none", + "-e", + "signal=all", + "-o", + join(runDir, "kitchen-sink.strace"), + process.execPath, + ...nodeArgs, + ] + : nodeArgs; + const child = spawn(command, commandArgs, { + cwd: EXAMPLE_DIR, + env, + stdio: ["ignore", "pipe", "pipe"], + }); + attachLogs(child, "kitchen-sink", logPath, logs); + + try { + await waitForHttpOk( + `http://127.0.0.1:${serverPort}/debug/memory`, + "kitchen-sink", + child, + logs, + ); + await waitForHttpOk( + `${serverlessUrl}/metadata`, + "kitchen-sink metadata", + child, + logs, + ); + await configureServerlessRunner(args, serverlessUrl); + return { child, label: "kitchen-sink", logPath, logs }; + } catch (err) { + await stopChild({ child, label: "kitchen-sink", logPath, logs }); + throw err; + } +} + +async function configureServerlessRunner( + args: Args, + serverlessUrl: string, +): Promise { + const base = args.endpoint.replace(/\/$/, ""); + const namespace = process.env.RIVET_NAMESPACE ?? "default"; + const token = process.env.RIVET_TOKEN ?? "dev"; + const poolName = process.env.RIVET_POOL ?? "default"; + const datacentersResponse = await fetch(`${base}/datacenters?namespace=${namespace}`, { + headers: { Authorization: `Bearer ${token}` }, + }); + if (!datacentersResponse.ok) { + throw new Error( + `failed to list local datacenters: ${datacentersResponse.status} ${await datacentersResponse.text()}`, + ); + } + + const datacentersBody = (await datacentersResponse.json()) as { + datacenters: Array<{ name: string }>; + }; + const datacenter = datacentersBody.datacenters[0]?.name; + if (!datacenter) throw new Error("local engine returned no datacenters"); + + const response = await fetch( + `${base}/runner-configs/${poolName}?namespace=${namespace}`, + { + method: "PUT", + headers: { + Authorization: `Bearer ${token}`, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + datacenters: { + [datacenter]: { + serverless: { + url: serverlessUrl, + headers: { + "x-rivet-token": token, + }, + request_lifespan: args.requestLifespanSeconds, + drain_grace_period: 5, + metadata_poll_interval: 1000, + max_runners: 100_000, + min_runners: 0, + runners_margin: 0, + slots_per_runner: 1, + }, + metadata: { + source: "kitchen-sink", + workload: "sqlite-memory-soak", + }, + drain_on_version_upgrade: true, + }, + }, + }), + }, + ); + if (!response.ok) { + throw new Error( + `failed to configure local serverless runner: ${response.status} ${await response.text()}`, + ); + } +} + +async function stopChild(managed: ManagedChild | undefined): Promise { + if (!managed) return; + const { child, label, logPath, logs } = managed; + if (child.exitCode !== null) return; + + const event = + JSON.stringify({ + kind: "harness_stop_child", + label, + pid: child.pid, + timestamp: new Date().toISOString(), + stack: new Error().stack, + }) + "\n"; + logs.push(event); + appendFileSync(logPath, event); + child.kill("SIGTERM"); + await Promise.race([ + new Promise((resolveExit) => child.once("exit", () => resolveExit())), + sleep(5_000), + ]); + if (child.exitCode === null) child.kill("SIGKILL"); +} + 
+async function stopEngine(
+	engine: LocalEngine | undefined,
+	keepStorage: boolean,
+): Promise<void> {
+	if (!engine) return;
+	await stopChild(engine);
+	if (!keepStorage) rmSync(engine.dbRoot, { recursive: true, force: true });
+}
+
+function parseKbLine(text: string, name: string): number | null {
+	const match = new RegExp(`^${name}:\\s+(\\d+)\\s+kB$`, "m").exec(text);
+	if (!match) return null;
+	return Number.parseInt(match[1]!, 10) * 1024;
+}
+
+function parseNumberLine(text: string, name: string): number | null {
+	const match = new RegExp(`^${name}:\\s+(\\d+)$`, "m").exec(text);
+	if (!match) return null;
+	return Number.parseInt(match[1]!, 10);
+}
+
+function readSmapsRollup(pid: number): Record<string, number> | undefined {
+	try {
+		const text = readFileSync(`/proc/${pid}/smaps_rollup`, "utf8");
+		const result: Record<string, number> = {};
+		for (const line of text.split("\n")) {
+			const match = /^([A-Za-z_]+):\s+(\d+)\s+kB$/.exec(line);
+			if (match) result[match[1]!] = Number.parseInt(match[2]!, 10) * 1024;
+		}
+		return result;
+	} catch {
+		return undefined;
+	}
+}
+
+function parseProcStat(pid: number): {
+	procState: string | null;
+	cpuUserSeconds: number | null;
+	cpuSystemSeconds: number | null;
+	cpuTotalSeconds: number | null;
+	threads: number | null;
+} {
+	try {
+		const text = readFileSync(`/proc/${pid}/stat`, "utf8").trim();
+		const closeParen = text.lastIndexOf(")");
+		if (closeParen === -1) throw new Error("missing comm terminator");
+		const fields = text.slice(closeParen + 2).split(" ");
+		const utime = Number.parseInt(fields[11] ?? "", 10);
+		const stime = Number.parseInt(fields[12] ?? "", 10);
+		const threads = Number.parseInt(fields[17] ?? "", 10);
+		const cpuUserSeconds = Number.isFinite(utime)
+			? utime / CLOCK_TICKS_PER_SECOND
+			: null;
+		const cpuSystemSeconds = Number.isFinite(stime)
+			? stime / CLOCK_TICKS_PER_SECOND
+			: null;
+		return {
+			procState: fields[0] ?? null,
+			cpuUserSeconds,
+			cpuSystemSeconds,
+			cpuTotalSeconds:
+				cpuUserSeconds !== null && cpuSystemSeconds !== null
+					? cpuUserSeconds + cpuSystemSeconds
+					: null,
+			threads: Number.isFinite(threads) ? threads : null,
+		};
+	} catch {
+		return {
+			procState: null,
+			cpuUserSeconds: null,
+			cpuSystemSeconds: null,
+			cpuTotalSeconds: null,
+			threads: null,
+		};
+	}
+}
+
+function parseProcIo(pid: number): ProcIo | undefined {
+	try {
+		const text = readFileSync(`/proc/${pid}/io`, "utf8");
+		const field = (name: string) => {
+			const match = new RegExp(`^${name}:\\s+(\\d+)$`, "m").exec(text);
+			return match ? Number.parseInt(match[1]!, 10) : null;
+		};
+		return {
+			readBytes: field("read_bytes"),
+			writeBytes: field("write_bytes"),
+			syscr: field("syscr"),
+			syscw: field("syscw"),
+		};
+	} catch {
+		return undefined;
+	}
+}
+
+function countOpenFds(pid: number): number | null {
+	try {
+		return readdirSync(`/proc/${pid}/fd`).length;
+	} catch {
+		return null;
+	}
+}
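+
+// Every /proc reader above degrades to null/undefined instead of throwing, so
+// one unreadable file (for example a process racing to exit) never aborts the
+// sampling loop.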
parseNumberLine(status, "Threads"), + procState: stat.procState, + cpuUserSeconds: stat.cpuUserSeconds, + cpuSystemSeconds: stat.cpuSystemSeconds, + cpuTotalSeconds: stat.cpuTotalSeconds, + openFds: countOpenFds(pid), + io: parseProcIo(pid), + smapsRollup: readSmapsRollup(pid), + }; + } catch (err) { + return { + pid, + alive: false, + rssBytes: null, + hwmRssBytes: null, + vmSizeBytes: null, + threads: null, + error: err instanceof Error ? err.message : String(err), + }; + } +} + +async function fetchKitchenSinkBreakdown( + serverPort: number, + forceGc: boolean, +): Promise { + try { + const url = new URL(`http://127.0.0.1:${serverPort}/debug/memory`); + if (forceGc) url.searchParams.set("gc", "1"); + const response = await fetch(url); + if (!response.ok) { + return { error: `status ${response.status}`, body: await response.text() }; + } + return await response.json(); + } catch (err) { + return { error: err instanceof Error ? err.message : String(err) }; + } +} + +async function captureKitchenSinkHeapSnapshot( + serverPort: number, + jsonlPath: string, + path: string, + label: string, +) { + const url = new URL(`http://127.0.0.1:${serverPort}/debug/heap-snapshot`); + url.searchParams.set("path", path); + const response = await fetch(url, { method: "POST" }); + const body = await response.text(); + writeEvent(jsonlPath, { + kind: "heap_snapshot", + label, + path, + status: response.status, + body, + timestamp: new Date().toISOString(), + }); + if (!response.ok) { + throw new Error(`failed to capture heap snapshot ${label}: ${response.status} ${body}`); + } +} + +function writeEvent(jsonlPath: string, event: unknown) { + appendFileSync(jsonlPath, `${JSON.stringify(event)}\n`); +} + +function logOffset(logPath: string): number { + try { + return statSync(logPath).size; + } catch { + return 0; + } +} + +function readLogSince(logPath: string, offset: number): string { + const text = readFileSync(logPath, "utf8"); + return text.slice(Math.min(offset, text.length)); +} + +async function waitForActorSleepLog( + server: ManagedChild, + actorId: string, + offset: number, + timeoutMs: number, +): Promise<{ matched: string }> { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (server.child.exitCode !== null) { + throw new Error( + `kitchen-sink exited before actor sleep log was observed for ${actorId}`, + ); + } + + const text = readLogSince(server.logPath, offset); + const matched = text + .split("\n") + .find( + (line) => + line.includes(actorId) && + (line.includes("sqlite_memory_pressure_on_sleep") || + line.includes("actor sleeping")), + ); + if (matched) return { matched }; + + await sleep(100); + } + + throw new Error(`timed out waiting for actor sleeping log for ${actorId}`); +} + +async function forceActorSleepViaApi( + args: Args, + actorId: string, +): Promise { + const namespace = process.env.RIVET_NAMESPACE ?? "default"; + const token = process.env.RIVET_TOKEN ?? 
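+
+// Forced sleep is a two-step handshake: POST the engine sleep endpoint below,
+// then tail the kitchen-sink log from the recorded offset until the actor's
+// structured on-sleep line (or the generic "actor sleeping" line) appears.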
"dev"; + const response = await fetch( + `${args.endpoint.replace(/\/$/, "")}/actors/${encodeURIComponent(actorId)}/sleep?namespace=${encodeURIComponent(namespace)}`, + { + method: "POST", + headers: { + Authorization: `Bearer ${token}`, + "Content-Type": "application/json", + }, + body: "{}", + }, + ); + const bodyText = await response.text(); + if (!response.ok) { + throw new Error( + `failed to force actor sleep: ${response.status} ${bodyText}`, + ); + } + if (!bodyText) return null; + try { + return JSON.parse(bodyText) as unknown; + } catch { + return bodyText; + } +} + +async function captureSample( + args: Args, + runId: string, + startedAt: number, + jsonlPath: string, + engine: LocalEngine, + server: ManagedChild, + serverPort: number, + samples: MemorySample[], +): Promise { + const kitchenSinkBreakdown = await fetchKitchenSinkBreakdown( + serverPort, + args.forceGcSamples, + ); + const sample: MemorySample = { + kind: "memory_sample", + runId, + elapsedMs: Date.now() - startedAt, + timestamp: new Date().toISOString(), + harness: readProcMemory(process.pid), + engine: readProcMemory(engine.child.pid), + kitchenSink: readProcMemory( + kitchenSinkPidFromBreakdown(kitchenSinkBreakdown) ?? server.child.pid, + ), + kitchenSinkBreakdown, + }; + samples.push(sample); + writeEvent(jsonlPath, sample); +} + +async function sampleLoop( + args: Args, + runId: string, + startedAt: number, + jsonlPath: string, + engine: LocalEngine, + server: ManagedChild, + serverPort: number, + samples: MemorySample[], + shouldStop: () => boolean, +): Promise { + while (!shouldStop()) { + await captureSample( + args, + runId, + startedAt, + jsonlPath, + engine, + server, + serverPort, + samples, + ); + await sleep(args.sampleIntervalMs); + } +} + +function assertCycle(result: { + integrityCheck: string; + activeRows: number; + activeBytes: number; +}) { + if (result.integrityCheck !== "ok") { + throw new Error(`sqlite integrity check failed: ${result.integrityCheck}`); + } + if (result.activeRows < 0 || result.activeBytes < 0) { + throw new Error(`invalid actor stats: ${JSON.stringify(result)}`); + } +} + +async function runActorDriver( + args: Args, + client: ReturnType>, + actorIndex: number, + workloadStartedAt: number, + jsonlPath: string, +): Promise { + const key = ["sqlite-memory-soak", args.seed, String(actorIndex)]; + const handle = client.sqliteMemoryPressure.getOrCreate(key); + const actorOffsetMs = Math.floor( + (args.cycleIntervalMs * actorIndex) / args.actors, + ); + const stopAt = + args.durationMs > 0 ? 
+async function runActorDriver(
+	args: Args,
+	client: ReturnType<typeof createClient<typeof registry>>,
+	actorIndex: number,
+	workloadStartedAt: number,
+	jsonlPath: string,
+): Promise<void> {
+	const key = ["sqlite-memory-soak", args.seed, String(actorIndex)];
+	const handle = client.sqliteMemoryPressure.getOrCreate(key);
+	const actorOffsetMs = Math.floor(
+		(args.cycleIntervalMs * actorIndex) / args.actors,
+	);
+	const stopAt =
+		args.durationMs > 0
+			? workloadStartedAt + args.durationMs
+			: Number.POSITIVE_INFINITY;
+	let wroteActorWake = false;
+
+	for (let cycle = 0; cycle < args.cycles; cycle += 1) {
+		const scheduledAt =
+			workloadStartedAt + actorOffsetMs + cycle * args.cycleIntervalMs;
+		if (scheduledAt >= stopAt) break;
+
+		const waitMs = scheduledAt - Date.now();
+		if (waitMs > 0) {
+			await sleep(waitMs);
+		}
+		if (Date.now() >= stopAt) {
+			writeEvent(jsonlPath, {
+				kind: "cycle_skipped_after_duration",
+				actorIndex,
+				key,
+				cycle,
+				scheduledAt: new Date(scheduledAt).toISOString(),
+				timestamp: new Date().toISOString(),
+			});
+			break;
+		}
+
+		const lateMs = Math.max(0, Date.now() - scheduledAt);
+		const startedAt = performance.now();
+		const result = await handle.runCycle({
+			seed: `${args.seed}:${actorIndex}`,
+			cycle,
+			insertRows: args.insertRows,
+			rowBytes: args.rowBytes,
+			scanRows: args.scanRows,
+		});
+		assertCycle(result);
+		const durationMs = performance.now() - startedAt;
+		if (!wroteActorWake) {
+			wroteActorWake = true;
+			writeEvent(jsonlPath, {
+				kind: "actor_wake",
+				actorIndex,
+				key,
+				source: "first_cycle",
+				timestamp: new Date().toISOString(),
+			});
+		}
+		writeEvent(jsonlPath, {
+			kind: "cycle",
+			actorIndex,
+			key,
+			cycle,
+			scheduledAt: new Date(scheduledAt).toISOString(),
+			lateMs,
+			durationMs,
+			result,
+			timestamp: new Date().toISOString(),
+		});
+		console.log(
+			`cycle ok actor=${actorIndex} cycle=${cycle} rows=${result.activeRows} bytes=${result.activeBytes} pages=${result.storage.page_count} ms=${durationMs.toFixed(1)}`,
+		);
+
+		if (args.wakeEvery > 0 && (cycle + 1) % args.wakeEvery === 0) {
+			await handle.goToSleep();
+			await sleep(args.wakeDelayMs);
+			const stats = await handle.stats();
+			if (stats.integrityCheck !== "ok") {
+				throw new Error(
+					`sqlite integrity check failed after wake: ${stats.integrityCheck}`,
+				);
+			}
+			writeEvent(jsonlPath, {
+				kind: "wake",
+				actorIndex,
+				key,
+				cycle,
+				stats,
+				timestamp: new Date().toISOString(),
+			});
+		}
+	}
+}
+
+async function runChurnActorDriver(
+	args: Args,
+	client: ReturnType<typeof createClient<typeof registry>>,
+	server: ManagedChild,
+	actorIndex: number,
+	jsonlPath: string,
+): Promise<void> {
+	const key = ["sqlite-memory-soak", args.seed, String(actorIndex)];
+	const handle = client.sqliteMemoryPressure.getOrCreate(key);
+	if (args.reset) {
+		const resetStartedAt = performance.now();
+		const reset = await handle.reset();
+		writeEvent(jsonlPath, {
+			kind: "actor_wake",
+			actorIndex,
+			key,
+			source: "reset",
+			timestamp: new Date().toISOString(),
+		});
+		writeEvent(jsonlPath, {
+			kind: "actor_reset",
+			actorIndex,
+			key,
+			durationMs: performance.now() - resetStartedAt,
+			reset,
+			timestamp: new Date().toISOString(),
+		});
+	}
+
+	const actorStartedAt = Date.now();
+	const sleepAt = actorStartedAt + args.churnSleepAfterMs;
+	const stopAt =
+		args.durationMs > 0
+			? actorStartedAt + args.durationMs
+			: Number.POSITIVE_INFINITY;
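+
+	// Run cycles on the fixed timer until whichever comes first: the per-actor
+	// sleep deadline (churnSleepAfterMs) or the overall run deadline.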
+	for (let cycle = 0; cycle < args.cycles; cycle += 1) {
+		const scheduledAt = actorStartedAt + cycle * args.cycleIntervalMs;
+		if (scheduledAt >= sleepAt || scheduledAt >= stopAt) break;
+
+		const waitMs = scheduledAt - Date.now();
+		if (waitMs > 0) await sleep(waitMs);
+		if (Date.now() >= sleepAt || Date.now() >= stopAt) break;
+
+		const lateMs = Math.max(0, Date.now() - scheduledAt);
+		const startedAt = performance.now();
+		const result = await handle.runCycle({
+			seed: `${args.seed}:${actorIndex}`,
+			cycle,
+			insertRows: args.insertRows,
+			rowBytes: args.rowBytes,
+			scanRows: args.scanRows,
+		});
+		assertCycle(result);
+		const durationMs = performance.now() - startedAt;
+		writeEvent(jsonlPath, {
+			kind: "cycle",
+			actorIndex,
+			key,
+			cycle,
+			scheduledAt: new Date(scheduledAt).toISOString(),
+			lateMs,
+			durationMs,
+			result,
+			timestamp: new Date().toISOString(),
+		});
+		console.log(
+			`cycle ok actor=${actorIndex} cycle=${cycle} rows=${result.activeRows} bytes=${result.activeBytes} pages=${result.storage.page_count} ms=${durationMs.toFixed(1)}`,
+		);
+	}
+
+	const remainingMs = sleepAt - Date.now();
+	if (remainingMs > 0) await sleep(remainingMs);
+
+	const actorId = await handle.resolve();
+	const logStart = logOffset(server.logPath);
+	const sleepStartedAt = performance.now();
+	const response = await forceActorSleepViaApi(args, actorId);
+	writeEvent(jsonlPath, {
+		kind: "actor_api_sleep",
+		actorIndex,
+		key,
+		actorId,
+		durationMs: performance.now() - sleepStartedAt,
+		response,
+		timestamp: new Date().toISOString(),
+	});
+	const verified = await waitForActorSleepLog(
+		server,
+		actorId,
+		logStart,
+		args.sleepLogTimeoutMs,
+	);
+	writeEvent(jsonlPath, {
+		kind: "actor_sleep_verified",
+		actorIndex,
+		key,
+		actorId,
+		log: verified.matched,
+		timestamp: new Date().toISOString(),
+	});
+	console.log(`actor sleep verified actor=${actorIndex} actor_id=${actorId}`);
+}
+
+async function resetActorOnSchedule(
+	args: Args,
+	client: ReturnType<typeof createClient<typeof registry>>,
+	actorIndex: number,
+	startupStartedAt: number,
+	jsonlPath: string,
+): Promise<void> {
+	const scheduledAt = startupStartedAt + actorIndex * args.actorStartIntervalMs;
+	const waitMs = scheduledAt - Date.now();
+	if (waitMs > 0) {
+		await sleep(waitMs);
+	}
+
+	const key = ["sqlite-memory-soak", args.seed, String(actorIndex)];
+	const handle = client.sqliteMemoryPressure.getOrCreate(key);
+	const resetStartedAt = performance.now();
+	const reset = await handle.reset();
+	writeEvent(jsonlPath, {
+		kind: "actor_reset",
+		actorIndex,
+		key,
+		scheduledAt: new Date(scheduledAt).toISOString(),
+		lateMs: Math.max(0, Date.now() - scheduledAt),
+		durationMs: performance.now() - resetStartedAt,
+		reset,
+		timestamp: new Date().toISOString(),
+	});
+}
+
+async function resetActors(
+	args: Args,
+	client: ReturnType<typeof createClient<typeof registry>>,
+	jsonlPath: string,
+): Promise<void> {
+	if (!args.reset) return;
+
+	let nextActor = 0;
+	const startupStartedAt = Date.now();
+
+	async function worker(workerId: number) {
+		for (;;) {
+			const actorIndex = nextActor;
+			nextActor += 1;
+			if (actorIndex >= args.actors) return;
+
+			console.log(`actor reset start worker=${workerId} actor=${actorIndex}`);
+			await resetActorOnSchedule(
+				args,
+				client,
+				actorIndex,
+				startupStartedAt,
+				jsonlPath,
+			);
+		}
+	}
+
+	await Promise.all(
+		Array.from(
+			{ length: Math.min(args.concurrency, args.actors) },
+			(_, workerId) => worker(workerId),
+		),
+	);
+}
+
+function spikeModeEnabled(args: Args): boolean {
+	return args.spikeMinConcurrency > 0;
+}
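+
+// Spike mode shapes concurrency as a triangle wave: phase sweeps 0..1 over
+// spikePeriodMs, the wave rises to 1 at the half period and falls back, and
+// the target is min + (max - min) * wave rounded to the nearest actor.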
+function targetConcurrencyForElapsed(args: Args, elapsedMs: number): number {
+	if (!spikeModeEnabled(args)) return args.concurrency;
+
+	const min = args.spikeMinConcurrency;
+	const max = args.spikeMaxConcurrency;
+	if (min === max) return min;
+
+	const phase = (elapsedMs % args.spikePeriodMs) / args.spikePeriodMs;
+	const wave = phase < 0.5 ? phase * 2 : (1 - phase) * 2;
+	return Math.round(min + (max - min) * wave);
+}
+
+async function runWithSpikeConcurrency(
+	args: Args,
+	client: ReturnType<typeof createClient<typeof registry>>,
+	server: ManagedChild,
+	jsonlPath: string,
+): Promise<void> {
+	if (args.durationMs <= 0) {
+		throw new Error("--duration-ms is required with spike concurrency");
+	}
+
+	let nextActor = 0;
+	let active = 0;
+	let completed = 0;
+	let failed = false;
+	const errors: unknown[] = [];
+	const startedAt = Date.now();
+	const stopSchedulingAt = startedAt + args.durationMs;
+	const workers = new Set<Promise<void>>();
+
+	function spawnActor(actorIndex: number) {
+		active += 1;
+		let worker: Promise<void>;
+		worker = runChurnActorDriver(
+			args,
+			client,
+			server,
+			actorIndex,
+			jsonlPath,
+		)
+			.catch((err) => {
+				failed = true;
+				errors.push(err);
+			})
+			.finally(() => {
+				active -= 1;
+				completed += 1;
+				workers.delete(worker);
+			});
+		workers.add(worker);
+	}
+
+	while (Date.now() < stopSchedulingAt && nextActor < args.actors && !failed) {
+		const elapsedMs = Date.now() - startedAt;
+		const target = targetConcurrencyForElapsed(args, elapsedMs);
+		writeEvent(jsonlPath, {
+			kind: "concurrency_target",
+			elapsedMs,
+			target,
+			active,
+			completed,
+			nextActor,
+			timestamp: new Date().toISOString(),
+		});
+
+		while (active < target && nextActor < args.actors) {
+			const actorIndex = nextActor;
+			nextActor += 1;
+			console.log(
+				`actor spike start actor=${actorIndex} active=${active + 1} target=${target}`,
+			);
+			spawnActor(actorIndex);
+		}
+
+		await sleep(Math.min(250, Math.max(25, args.cycleIntervalMs)));
+	}
+
+	await Promise.allSettled(workers);
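+
+	// Workers swallow their own failures into `errors`, so allSettled is only
+	// a join; rethrow the first failure once everything has drained.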
+	if (errors.length > 0) {
+		const first = errors[0];
+		throw first instanceof Error ? first : new Error(String(first));
+	}
+	if (nextActor >= args.actors) {
+		throw new Error(
+			`ran out of actors before spike duration completed: actors=${args.actors}`,
+		);
+	}
+}
+
+async function runWithActorConcurrency(
+	args: Args,
+	client: ReturnType<typeof createClient<typeof registry>>,
+	server: ManagedChild,
+	jsonlPath: string,
+): Promise<void> {
+	let nextActor = 0;
+	if (args.churnSleepAfterMs > 0) {
+		if (spikeModeEnabled(args)) {
+			await runWithSpikeConcurrency(args, client, server, jsonlPath);
+			return;
+		}
+
+		async function churnWorker(workerId: number) {
+			for (;;) {
+				const actorIndex = nextActor;
+				nextActor += 1;
+				if (actorIndex >= args.actors) return;
+
+				console.log(`actor churn start worker=${workerId} actor=${actorIndex}`);
+				await runChurnActorDriver(
+					args,
+					client,
+					server,
+					actorIndex,
+					jsonlPath,
+				);
+			}
+		}
+
+		await Promise.all(
+			Array.from(
+				{ length: Math.min(args.concurrency, args.actors) },
+				(_, workerId) => churnWorker(workerId),
+			),
+		);
+		return;
+	}
+
+	await resetActors(args, client, jsonlPath);
+
+	const workloadStartedAt = Date.now() + args.cycleIntervalMs;
+
+	async function worker(workerId: number) {
+		for (;;) {
+			const actorIndex = nextActor;
+			nextActor += 1;
+			if (actorIndex >= args.actors) return;
+
+			console.log(`actor driver start worker=${workerId} actor=${actorIndex}`);
+			await runActorDriver(
+				args,
+				client,
+				actorIndex,
+				workloadStartedAt,
+				jsonlPath,
+			);
+		}
+	}
+
+	await Promise.all(
+		Array.from(
+			{ length: Math.min(args.concurrency, args.actors) },
+			(_, workerId) => worker(workerId),
+		),
+	);
+}
+
+async function cleanupActors(
+	args: Args,
+	client: ReturnType<typeof createClient<typeof registry>>,
+	jsonlPath: string,
+): Promise<void> {
+	if (!args.cleanup) return;
+
+	await Promise.all(
+		Array.from({ length: args.actors }, async (_, actorIndex) => {
+			const key = ["sqlite-memory-soak", args.seed, String(actorIndex)];
+			const handle = client.sqliteMemoryPressure.getOrCreate(key);
+			const reset = await handle.reset();
+			const sleep = await handle.goToSleep();
+			writeEvent(jsonlPath, {
+				kind: "actor_cleanup",
+				actorIndex,
+				key,
+				reset,
+				sleep,
+				timestamp: new Date().toISOString(),
+			});
+		}),
+	);
+}
+
+function bytesToMiB(bytes: number | null): string {
+	if (bytes === null) return "n/a";
+	return `${(bytes / 1024 / 1024).toFixed(1)} MiB`;
+}
+
+function summarizeProcess(
+	label: string,
+	samples: MemorySample[],
+	select: (sample: MemorySample) => ProcMemory,
+): string {
+	const rssValues = samples
+		.map((sample) => select(sample).rssBytes)
+		.filter((value): value is number => typeof value === "number");
+	if (rssValues.length === 0) return `${label}: no samples`;
+
+	const first = rssValues[0]!;
+	const final = rssValues[rssValues.length - 1]!;
+	const max = Math.max(...rssValues);
+	return `${label}: start=${bytesToMiB(first)} max=${bytesToMiB(max)} final=${bytesToMiB(final)} delta=${bytesToMiB(final - first)}`;
+}
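+
+// Post-run summaries: the kitchen estimates come from the in-memory samples,
+// while the VFS and actor-sleep summaries below re-read events.jsonl.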
+function summarizeKitchenBreakdown(samples: MemorySample[]): string | undefined {
+	const breakdowns = samples
+		.map((sample) => sample.kitchenSinkBreakdown)
+		.filter((value): value is { estimates?: Record<string, number | null> } => {
+			return typeof value === "object" && value !== null && "estimates" in value;
+		});
+	if (breakdowns.length === 0) return undefined;
+
+	const first = breakdowns[0]!.estimates ?? {};
+	const final = breakdowns[breakdowns.length - 1]!.estimates ?? {};
+	return [
+		"kitchen estimates:",
+		`jsHeapUsed ${bytesToMiB(first.jsHeapUsedBytes ?? null)} -> ${bytesToMiB(final.jsHeapUsedBytes ?? null)}`,
+		`v8External ${bytesToMiB(first.v8ExternalBytes ?? null)} -> ${bytesToMiB(final.v8ExternalBytes ?? null)}`,
+		`nativeNonV8 ${bytesToMiB(first.nativeNonV8ResidentEstimateBytes ?? null)} -> ${bytesToMiB(final.nativeNonV8ResidentEstimateBytes ?? null)}`,
+	].join(" ");
+}
+
+function summarizeCycleVfs(jsonlPath: string): string | undefined {
+	let count = 0;
+	let final:
+		| {
+				actorIndex: number;
+				cycle: number;
+				result?: {
+					storage?: {
+						page_count?: number;
+						page_size?: number;
+						vfs?: {
+							pageCacheEntries?: number;
+							pageCacheWeightedSize?: number;
+							pageCacheCapacityPages?: number;
+							writeBufferDirtyPages?: number;
+							dbSizePages?: number;
+						};
+					};
+				};
+		  }
+		| undefined;
+	let totalCacheEntries = 0;
+	let totalCacheBytes = 0;
+	let totalDbPages = 0;
+	for (const line of readFileSync(jsonlPath, "utf8").split("\n")) {
+		if (!line) continue;
+		const event = JSON.parse(line) as {
+			kind?: string;
+			actorIndex?: number;
+			cycle?: number;
+			result?: {
+				storage?: {
+					page_count?: number;
+					page_size?: number;
+					vfs?: {
+						pageCacheEntries?: number;
+						pageCacheWeightedSize?: number;
+						pageCacheCapacityPages?: number;
+						writeBufferDirtyPages?: number;
+						dbSizePages?: number;
+					};
+				};
+			};
+		};
+		if (event.kind !== "cycle") continue;
+		const vfs = event.result?.storage?.vfs;
+		if (!vfs || typeof vfs.pageCacheEntries !== "number") continue;
+		count++;
+		final = event as typeof final;
+		totalCacheEntries += vfs.pageCacheEntries;
+		totalCacheBytes +=
+			(vfs.pageCacheWeightedSize ?? 0) *
+			(event.result?.storage?.page_size ?? SQLITE_PAGE_SIZE_BYTES);
+		totalDbPages += vfs.dbSizePages ?? event.result?.storage?.page_count ?? 0;
+	}
+	if (!final || count === 0) return undefined;
+	const avgCacheBytes = totalCacheBytes / count;
+	const avgCacheEntries = totalCacheEntries / count;
+	const avgDbBytes = (totalDbPages / count) * SQLITE_PAGE_SIZE_BYTES;
+	const finalVfs = final.result?.storage?.vfs;
+	return [
+		"vfs:",
+		`samples=${count}`,
+		`avg_cache=${bytesToMiB(avgCacheBytes)}`,
+		`avg_entries=${avgCacheEntries.toFixed(0)}`,
+		`avg_db=${bytesToMiB(avgDbBytes)}`,
+		`final_actor=${final.actorIndex}`,
+		`final_cycle=${final.cycle}`,
+		`final_cache=${bytesToMiB(
+			finalVfs?.pageCacheWeightedSize === undefined
+				? null
+				: finalVfs.pageCacheWeightedSize *
+						(final.result?.storage?.page_size ?? SQLITE_PAGE_SIZE_BYTES),
+		)}`,
+		`final_entries=${finalVfs?.pageCacheEntries ?? "n/a"}`,
+	].join(" ");
+}
+
+function summarizeActorSleeps(jsonlPath: string): string | undefined {
+	let requested = 0;
+	let verified = 0;
+	for (const line of readFileSync(jsonlPath, "utf8").split("\n")) {
+		if (!line) continue;
+		const event = JSON.parse(line) as { kind?: string };
+		if (event.kind === "actor_api_sleep") requested++;
+		if (event.kind === "actor_sleep_verified") verified++;
+	}
+	if (requested === 0 && verified === 0) return undefined;
+	return `actor sleeps: api_requested=${requested} log_verified=${verified}`;
+}
+
+async function main(): Promise<void> {
+	const args = parseArgs(process.argv.slice(2));
+	const serverPort = args.serverPort > 0 ? args.serverPort : await findOpenPort();
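+	// Everything for a run lands under outputDir/<runId>: child logs, engine
+	// config, heap snapshots, and the events.jsonl consumed by
+	// scripts/proc-metrics-report.ts.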
+	const runId = args.seed.replace(/[^a-zA-Z0-9_.-]/g, "_");
+	const outputRoot = resolve(REPO_ROOT, args.outputDir);
+	const runDir = join(outputRoot, runId);
+	const jsonlPath = join(runDir, "events.jsonl");
+	mkdirSync(runDir, { recursive: true });
+
+	let engine: LocalEngine | undefined;
+	let server: ManagedChild | undefined;
+	let stopSampling = false;
+	const samples: MemorySample[] = [];
+	const startedAt = Date.now();
+
+	writeEvent(jsonlPath, {
+		kind: "run_start",
+		runId,
+		args: { ...args, serverPort },
+		timestamp: new Date().toISOString(),
+	});
+
+	try {
+		console.log("SQLite memory soak");
+		console.log(`run_id=${runId}`);
+		console.log(`endpoint=${args.endpoint}`);
+		console.log(`server_port=${serverPort}`);
+		console.log(
+			`request_lifespan_seconds=${args.requestLifespanSeconds}`,
+		);
+		console.log(`output=${jsonlPath}`);
+
+		engine = await startEngine(args, runDir);
+		console.log(`engine pid=${engine.child.pid} log=${engine.logPath}`);
+
+		server = await startKitchenSinkServer(args, runDir, serverPort);
+		console.log(`kitchen_sink pid=${server.child.pid} log=${server.logPath}`);
+
+		await captureSample(
+			args,
+			runId,
+			startedAt,
+			jsonlPath,
+			engine,
+			server,
+			serverPort,
+			samples,
+		);
+		if (process.env.SQLITE_MEMORY_SOAK_HEAP_SNAPSHOTS === "1") {
+			await captureKitchenSinkHeapSnapshot(
+				serverPort,
+				jsonlPath,
+				join(runDir, "kitchen-sink-start.heapsnapshot"),
+				"start",
+			);
+		}
+		const sampler = sampleLoop(
+			args,
+			runId,
+			startedAt,
+			jsonlPath,
+			engine,
+			server,
+			serverPort,
+			samples,
+			() => stopSampling,
+		);
+
+		const client = createClient<typeof registry>({
+			endpoint: args.endpoint,
+			namespace: process.env.RIVET_NAMESPACE ?? "default",
+			token: process.env.RIVET_TOKEN ?? "dev",
+			poolName: process.env.RIVET_POOL ??
"default", + }); + + if (args.preWorkloadWaitMs > 0) { + writeEvent(jsonlPath, { + kind: "pre_workload_wait_start", + durationMs: args.preWorkloadWaitMs, + timestamp: new Date().toISOString(), + }); + await sleep(args.preWorkloadWaitMs); + await captureSample( + args, + runId, + startedAt, + jsonlPath, + engine, + server, + serverPort, + samples, + ); + writeEvent(jsonlPath, { + kind: "pre_workload_wait_end", + durationMs: args.preWorkloadWaitMs, + timestamp: new Date().toISOString(), + }); + } + + await runWithActorConcurrency(args, client, server, jsonlPath); + await captureSample( + args, + runId, + startedAt, + jsonlPath, + engine, + server, + serverPort, + samples, + ); + if (args.postChurnWaitMs > 0) { + writeEvent(jsonlPath, { + kind: "post_churn_wait_start", + durationMs: args.postChurnWaitMs, + timestamp: new Date().toISOString(), + }); + await sleep(args.postChurnWaitMs); + await captureSample( + args, + runId, + startedAt, + jsonlPath, + engine, + server, + serverPort, + samples, + ); + writeEvent(jsonlPath, { + kind: "post_churn_wait_end", + durationMs: args.postChurnWaitMs, + timestamp: new Date().toISOString(), + }); + } + if (process.env.SQLITE_MEMORY_SOAK_HEAP_SNAPSHOTS === "1") { + await captureKitchenSinkHeapSnapshot( + serverPort, + jsonlPath, + join(runDir, "kitchen-sink-final.heapsnapshot"), + "final", + ); + } + await cleanupActors(args, client, jsonlPath); + if (args.cleanup && args.postCleanupWaitMs > 0) { + await sleep(args.postCleanupWaitMs); + } + await captureSample( + args, + runId, + startedAt, + jsonlPath, + engine, + server, + serverPort, + samples, + ); + + stopSampling = true; + await sampler; + + writeEvent(jsonlPath, { + kind: "run_complete", + runId, + durationMs: Date.now() - startedAt, + timestamp: new Date().toISOString(), + }); + + console.log("summary"); + console.log(summarizeProcess("harness", samples, (sample) => sample.harness)); + console.log(summarizeProcess("engine", samples, (sample) => sample.engine)); + console.log( + summarizeProcess("kitchen-sink", samples, (sample) => sample.kitchenSink), + ); + const kitchenSummary = summarizeKitchenBreakdown(samples); + if (kitchenSummary) console.log(kitchenSummary); + const vfsSummary = summarizeCycleVfs(jsonlPath); + if (vfsSummary) console.log(vfsSummary); + const sleepSummary = summarizeActorSleeps(jsonlPath); + if (sleepSummary) console.log(sleepSummary); + console.log(`events=${jsonlPath}`); + } catch (err) { + writeEvent(jsonlPath, { + kind: "run_error", + runId, + error: err instanceof Error ? err.stack ?? err.message : String(err), + timestamp: new Date().toISOString(), + }); + throw err; + } finally { + stopSampling = true; + await stopChild(server); + await stopEngine(engine, args.keepStorage); + } +} + +main().catch((err) => { + console.error(err instanceof Error ? err.stack ?? 
err.message : err);
+	process.exit(1);
+});
diff --git a/examples/kitchen-sink/src/actors/testing/sqlite-memory-pressure.ts b/examples/kitchen-sink/src/actors/testing/sqlite-memory-pressure.ts
new file mode 100644
index 0000000000..2279dafa88
--- /dev/null
+++ b/examples/kitchen-sink/src/actors/testing/sqlite-memory-pressure.ts
@@ -0,0 +1,334 @@
+import { actor } from "rivetkit";
+import type { SqliteNativeMetrics } from "rivetkit/db";
+import { db } from "rivetkit/db";
+
+interface RunCycleInput {
+	seed: string;
+	cycle: number;
+	insertRows?: number;
+	rowBytes?: number;
+	deleteRows?: number;
+	retainRows?: number;
+	scanRows?: number;
+}
+
+interface CountRow {
+	count: number;
+}
+
+interface StorageRow {
+	page_count: number;
+	freelist_count: number;
+	page_size: number;
+	vfs: SqliteNativeMetrics | null;
+}
+
+const DEFAULT_INSERT_ROWS = 128;
+const DEFAULT_ROW_BYTES = 16 * 1024;
+// const DEFAULT_DELETE_ROWS = 64;
+// const DEFAULT_RETAIN_ROWS = 1024;
+const DEFAULT_SCAN_ROWS = 512;
+const INSERT_BATCH_ROWS = 32;
+
+function finiteInt(value: number | undefined, fallback: number): number {
+	if (value === undefined) return fallback;
+	if (!Number.isFinite(value) || value < 0) {
+		throw new Error(`expected a non-negative finite number, got ${value}`);
+	}
+	return Math.floor(value);
+}
+
+function copyNativeMetrics(
+	metrics: SqliteNativeMetrics | null | undefined,
+): SqliteNativeMetrics | null {
+	if (!metrics) return null;
+	const raw = metrics as unknown as Record<string, unknown>;
+	const numberField = (camel: string, snake: string) =>
+		Number(raw[camel] ?? raw[snake] ?? 0);
+	return {
+		requestBuildNs: numberField("requestBuildNs", "request_build_ns"),
+		serializeNs: numberField("serializeNs", "serialize_ns"),
+		transportNs: numberField("transportNs", "transport_ns"),
+		stateUpdateNs: numberField("stateUpdateNs", "state_update_ns"),
+		totalNs: numberField("totalNs", "total_ns"),
+		commitCount: numberField("commitCount", "commit_count"),
+		pageCacheEntries: numberField("pageCacheEntries", "page_cache_entries"),
+		pageCacheWeightedSize: numberField(
+			"pageCacheWeightedSize",
+			"page_cache_weighted_size",
+		),
+		pageCacheCapacityPages: numberField(
+			"pageCacheCapacityPages",
+			"page_cache_capacity_pages",
+		),
+		writeBufferDirtyPages: numberField(
+			"writeBufferDirtyPages",
+			"write_buffer_dirty_pages",
+		),
+		dbSizePages: numberField("dbSizePages", "db_size_pages"),
+	};
+}
+
+async function queryOne<T>(
+	database: { execute: (sql: string, ...args: unknown[]) => Promise<unknown[]> },
+	sql: string,
+	...args: unknown[]
+): Promise<T> {
+	const rows = await database.execute(sql, ...args);
+	if (!rows[0]) throw new Error(`query returned no rows: ${sql}`);
+	return rows[0] as T;
+}
+
+async function storageStats(database: {
+	execute: (sql: string, ...args: unknown[]) => Promise<unknown[]>;
+	nativeMetrics?: () =>
+		| SqliteNativeMetrics
+		| Promise<SqliteNativeMetrics | null>
+		| null;
+}): Promise<StorageRow> {
+	const [pageCount, freelistCount, pageSize] = await Promise.all([
+		queryOne<{ page_count: number }>(database, "PRAGMA page_count"),
+		queryOne<{ freelist_count: number }>(database, "PRAGMA freelist_count"),
+		queryOne<{ page_size: number }>(database, "PRAGMA page_size"),
+	]);
+
+	const nativeMetrics = await database.nativeMetrics?.();
+	const copiedMetrics = copyNativeMetrics(nativeMetrics);
+
+	return {
+		page_count: pageCount.page_count,
+		freelist_count: freelistCount.freelist_count,
+		page_size: pageSize.page_size,
+		vfs: copiedMetrics,
+	};
+}
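+
+// Soak-test actor: runCycle grows the remote database, reset deletes and
+// VACUUMs it, and goToSleep lets the harness force reclamation. releaseStorage
+// is currently a read-only no-op so the database stays large across sleeps
+// (see the soak notes).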
+export const sqliteMemoryPressure = actor({
+	options: {
+		actionTimeout: 300_000,
+	},
+	state: {
+		sleepCount: 0,
+	},
+	db: db({
+		onMigrate: async (database) => {
+			await database.execute(`
+				CREATE TABLE IF NOT EXISTS pressure_rows (
+					id INTEGER PRIMARY KEY AUTOINCREMENT,
+					seed TEXT NOT NULL,
+					cycle INTEGER NOT NULL,
+					bucket INTEGER NOT NULL,
+					payload BLOB NOT NULL,
+					touched_count INTEGER NOT NULL DEFAULT 0,
+					created_at INTEGER NOT NULL
+				)
+			`);
+			await database.execute(
+				"CREATE INDEX IF NOT EXISTS idx_pressure_rows_seed_cycle ON pressure_rows(seed, cycle)",
+			);
+			await database.execute(
+				"CREATE INDEX IF NOT EXISTS idx_pressure_rows_bucket ON pressure_rows(bucket)",
+			);
+			await database.execute(`
+				CREATE TABLE IF NOT EXISTS pressure_cycles (
+					cycle INTEGER PRIMARY KEY,
+					seed TEXT NOT NULL,
+					inserted_rows INTEGER NOT NULL,
+					deleted_rows INTEGER NOT NULL,
+					active_rows INTEGER NOT NULL,
+					active_bytes INTEGER NOT NULL,
+					duration_ms REAL NOT NULL,
+					created_at INTEGER NOT NULL
+				)
+			`);
+		},
+	}),
+	onSleep: (c) => {
+		c.state.sleepCount += 1;
+		console.log(
+			JSON.stringify({
+				kind: "sqlite_memory_pressure_on_sleep",
+				actorId: c.actorId,
+				sleepCount: c.state.sleepCount,
+				timestamp: new Date().toISOString(),
+			}),
+		);
+	},
+	actions: {
+		reset: async (c) => {
+			await c.db.execute("DELETE FROM pressure_cycles");
+			await c.db.execute("DELETE FROM pressure_rows");
+			await c.db.execute("VACUUM");
+			return {
+				ok: true,
+				storage: await storageStats(c.db),
+			};
+		},
+
+		goToSleep: (c) => {
+			c.sleep();
+			return { ok: true };
+		},
+
+		releaseStorage: async (c) => {
+			const before = await storageStats(c.db);
+			// Keep the remote DB large for the sleep reclamation soak.
+			// await c.db.execute("DELETE FROM pressure_cycles");
+			// await c.db.execute("DELETE FROM pressure_rows");
+			// await c.db.execute("VACUUM");
+			return {
+				ok: true,
+				before,
+				after: await storageStats(c.db),
+			};
+		},
+
+		stats: async (c) => {
+			const rowStats = await queryOne<{
+				active_rows: number;
+				active_bytes: number | null;
+				touched_sum: number | null;
+			}>(
+				c.db,
+				"SELECT COUNT(*) AS active_rows, COALESCE(SUM(length(payload)), 0) AS active_bytes, COALESCE(SUM(touched_count), 0) AS touched_sum FROM pressure_rows",
+			);
+			const cycles = await queryOne<CountRow>(
+				c.db,
+				"SELECT COUNT(*) AS count FROM pressure_cycles",
+			);
+			const integrity = await queryOne<{ integrity_check: string }>(
+				c.db,
+				"PRAGMA integrity_check",
+			);
+
+			return {
+				activeRows: rowStats.active_rows,
+				activeBytes: rowStats.active_bytes ?? 0,
+				touchedCount: rowStats.touched_sum ?? 0,
+				cycles: cycles.count,
+				integrityCheck: integrity.integrity_check,
+				storage: await storageStats(c.db),
+			};
+		},
+
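+		// runCycle batches inserts INSERT_BATCH_ROWS at a time and generates
+		// payloads with randomblob(?) inside SQLite, so row bytes are produced
+		// remotely instead of being shipped through the action payload.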
+		runCycle: async (c, input: RunCycleInput) => {
+			const startedAt = performance.now();
+			const insertRows = finiteInt(input.insertRows, DEFAULT_INSERT_ROWS);
+			const rowBytes = finiteInt(input.rowBytes, DEFAULT_ROW_BYTES);
+			// const deleteRows = finiteInt(input.deleteRows, DEFAULT_DELETE_ROWS);
+			// const retainRows = Math.max(
+			// 	1,
+			// 	finiteInt(input.retainRows, DEFAULT_RETAIN_ROWS),
+			// );
+			const scanRows = Math.max(1, finiteInt(input.scanRows, DEFAULT_SCAN_ROWS));
+			const now = Date.now();
+			let insertedRows = 0;
+
+			await c.db.execute("BEGIN");
+			try {
+				while (insertedRows < insertRows) {
+					const batchRows = Math.min(
+						INSERT_BATCH_ROWS,
+						insertRows - insertedRows,
+					);
+					const placeholders: string[] = [];
+					const args: unknown[] = [];
+
+					for (let i = 0; i < batchRows; i += 1) {
+						const rowIndex = insertedRows + i;
+						placeholders.push("(?, ?, ?, randomblob(?), 0, ?)");
+						args.push(
+							input.seed,
+							input.cycle,
+							(input.cycle + rowIndex) % 32,
+							rowBytes,
+							now + rowIndex,
+						);
+					}
+
+					await c.db.execute(
+						`INSERT INTO pressure_rows (seed, cycle, bucket, payload, touched_count, created_at) VALUES ${placeholders.join(", ")}`,
+						...args,
+					);
+					insertedRows += batchRows;
+				}
+				await c.db.execute("COMMIT");
+			} catch (err) {
+				await c.db.execute("ROLLBACK").catch(() => undefined);
+				throw err;
+			}
+
+			const scan = await c.db.execute(
+				"SELECT id, length(payload) AS payload_bytes FROM pressure_rows ORDER BY id DESC LIMIT ?",
+				scanRows,
+			);
+			const bucketAgg = await c.db.execute(
+				"SELECT bucket, COUNT(*) AS rows, SUM(length(payload)) AS bytes FROM pressure_rows WHERE bucket BETWEEN ? AND ? GROUP BY bucket ORDER BY bucket",
+				input.cycle % 16,
+				(input.cycle % 16) + 15,
+			);
+			await c.db.execute(
+				"UPDATE pressure_rows SET touched_count = touched_count + 1 WHERE id IN (SELECT id FROM pressure_rows ORDER BY id DESC LIMIT ?)",
+				Math.min(scanRows, insertRows),
+			);
+
+			let deletedRows = 0;
+			// const beforeDelete = await queryOne<CountRow>(
+			// 	c.db,
+			// 	"SELECT COUNT(*) AS count FROM pressure_rows",
+			// );
+			// const overRetainRows = Math.max(0, beforeDelete.count - retainRows);
+			// const deleteLimit = Math.max(deleteRows, overRetainRows);
+			// if (deleteLimit > 0) {
+			// 	await c.db.execute(
+			// 		"DELETE FROM pressure_rows WHERE id IN (SELECT id FROM pressure_rows ORDER BY id ASC LIMIT ?)",
+			// 		deleteLimit,
+			// 	);
+			// 	const afterDelete = await queryOne<CountRow>(
+			// 		c.db,
+			// 		"SELECT changes() AS count",
+			// 	);
+			// 	deletedRows = afterDelete.count;
+			// }
+
+			const rowStats = await queryOne<{
+				active_rows: number;
+				active_bytes: number | null;
+			}>(
+				c.db,
+				"SELECT COUNT(*) AS active_rows, COALESCE(SUM(length(payload)), 0) AS active_bytes FROM pressure_rows",
+			);
+			const integrity = await queryOne<{ integrity_check: string }>(
+				c.db,
+				"PRAGMA integrity_check",
+			);
+			const durationMs = performance.now() - startedAt;
+
+			await c.db.execute(
+				"INSERT OR REPLACE INTO pressure_cycles (cycle, seed, inserted_rows, deleted_rows, active_rows, active_bytes, duration_ms, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
+				input.cycle,
+				input.seed,
+				insertedRows,
+				deletedRows,
+				rowStats.active_rows,
+				rowStats.active_bytes ?? 0,
+				durationMs,
+				now,
+			);
+
+			return {
+				seed: input.seed,
+				cycle: input.cycle,
+				insertedRows,
+				deletedRows,
+				activeRows: rowStats.active_rows,
+				activeBytes: rowStats.active_bytes ??
0, + scannedRows: scan.length, + bucketsRead: bucketAgg.length, + integrityCheck: integrity.integrity_check, + storage: await storageStats(c.db), + durationMs, + }; + }, + }, +}); diff --git a/examples/kitchen-sink/src/index.ts b/examples/kitchen-sink/src/index.ts index 34c21aa063..96f908d8fc 100644 --- a/examples/kitchen-sink/src/index.ts +++ b/examples/kitchen-sink/src/index.ts @@ -120,6 +120,7 @@ import { testSqliteBench } from "./actors/testing/test-sqlite-bench.ts"; import { sqliteColdStartBench } from "./actors/testing/sqlite-cold-start-bench.ts"; import { sqliteRealworldBench } from "./actors/testing/sqlite-realworld-bench.ts"; import { rawSqliteFuzzer } from "./actors/testing/raw-sqlite-fuzzer.ts"; +import { sqliteMemoryPressure } from "./actors/testing/sqlite-memory-pressure.ts"; // AI import { aiAgent } from "./actors/ai/ai-agent.ts"; @@ -259,6 +260,7 @@ export const registry = setup({ sqliteColdStartBench, sqliteRealworldBench, rawSqliteFuzzer, + sqliteMemoryPressure, // AI aiAgent, },