From 7ac1f0d8fd79b66ccfd8aa3ef10dd09c70092046 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Thu, 2 Jul 2026 06:30:22 -0700 Subject: [PATCH] test(benchmarks): concurrency and interference lanes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three focused lanes measuring the multi-tenant behavior the single-op matrix cannot see, wired as BENCH_ONLY lanes with JSON regression rows: - concurrency-vms: N in {1,4,8} prewarmed VMs running a sustained echo loop. Result: near-ideal isolation — 7.06x aggregate at N=8 (0.88 of ideal), p95 5.5 -> 8.4ms. - interference: busy VM (cpu+fs churn) beside a latency-probe VM. Result: no measurable cross-VM tax on this host (p50 tax ~1.0). - concurrent-processes: N in {1,4,8} guest processes in ONE VM. Result: the per-VM single-threaded servicing ceiling — 2.18x at N=8 (0.27 of ideal), p95 0.09 -> 0.21ms. This is the number the Phase 5 concurrency work moves. Lanes self-assert N=1 agreement with the matrix ballpark and verify their work; note: a persistent-listener TCP variant hits the sync-bridge deferred message queue cap, so the TCP loop is connect/echo/close-shaped (cap noted for follow-up). --- packages/benchmarks/README.md | 3 + packages/benchmarks/run-benchmarks.sh | 6 + .../src/focused/concurrency-common.ts | 476 ++++++++++++++++++ .../src/focused/concurrency-vms.bench.ts | 116 +++++ .../src/focused/concurrent-processes.bench.ts | 104 ++++ .../src/focused/interference.bench.ts | 153 ++++++ 6 files changed, 858 insertions(+) create mode 100644 packages/benchmarks/src/focused/concurrency-common.ts create mode 100644 packages/benchmarks/src/focused/concurrency-vms.bench.ts create mode 100644 packages/benchmarks/src/focused/concurrent-processes.bench.ts create mode 100644 packages/benchmarks/src/focused/interference.bench.ts diff --git a/packages/benchmarks/README.md b/packages/benchmarks/README.md index 02a355b69..0fc81089d 100644 --- a/packages/benchmarks/README.md +++ b/packages/benchmarks/README.md @@ -97,6 +97,9 @@ Focused lanes live under `src/focused/` and preserve the legacy CLI flags, env v - **`dns-lookup-floor`**: warm, repeated, concurrent, and fresh-process DNS lookup rows. Knobs: `BENCH_DNS_LOOKUP_ITERATIONS`, `BENCH_DNS_LOOKUP_WARMUP`, `BENCH_DNS_LOOKUP_ROWS`. - **`net-tcp-event-floor`**: TCP loopback event-floor rows. Knobs: `BENCH_NET_TCP_ITERATIONS`, `BENCH_NET_TCP_WARMUP`, `BENCH_NET_TCP_ROWS`, `BENCH_NET_TCP_POLL_DELAY_MS`, `BENCH_NET_TCP_TRACE`. - **`net-tcp-cadence-trace`**: TCP trace attribution rows with bridge tracing enabled. +- **`concurrency-vms`**: N owned sidecars/VMs concurrently run sustained `tcp_echo_small` loops for a fixed wall window. Knobs: `BENCH_CONCURRENCY_COUNTS` (default `1,4,8`), `BENCH_CONCURRENCY_DURATION_MS` (default `5000`). +- **`interference`**: one busy VM alternates CPU spin and filesystem write churn while a second VM samples `fs_write_small`. Knobs: `BENCH_INTERFERENCE_DURATION_MS` (default `5000`), `BENCH_INTERFERENCE_BUSY_DURATION_MS` (default probe duration plus `1000`). +- **`concurrent-processes`**: one VM runs N concurrent guest Node processes doing sustained `fs_write_small` loops, exposing the per-VM service ceiling. Knobs: `BENCH_PROCESS_COUNTS` (default `1,4,8`), `BENCH_PROCESS_DURATION_MS` (default `5000`). - **`readdir-scaling`**: pure readdir scaling with setup outside the timed loop. Knobs: `BENCH_READDIR_ITERATIONS`, `BENCH_READDIR_WARMUP`, `BENCH_READDIR_ENTRY_COUNTS`, `BENCH_READDIR_MODES`, `BENCH_READDIR_FIXTURES`, `BENCH_READDIR_WORKLOADS`. - **`readdir-probe`**: guarded/probe readdir shapes. - **`mount-readdir`**: host mount-table readdir scaling. Knobs: `BENCH_MOUNT_READDIR_ITERATIONS`, `BENCH_MOUNT_READDIR_WARMUP`, `BENCH_MOUNT_READDIR_COUNTS`, `BENCH_MOUNT_READDIR_ENTRY_COUNT`. diff --git a/packages/benchmarks/run-benchmarks.sh b/packages/benchmarks/run-benchmarks.sh index 5e1d8229e..3468a8171 100755 --- a/packages/benchmarks/run-benchmarks.sh +++ b/packages/benchmarks/run-benchmarks.sh @@ -223,6 +223,12 @@ run_tsx "net-tcp-cadence-trace" \ --net-bridge-trace \ ${BENCH_NET_TCP_TRACE_POLL_DELAY_MS:+--net-poll-delay-ms="${BENCH_NET_TCP_TRACE_POLL_DELAY_MS}"} +run_tsx "concurrency-vms" "$HERE/src/focused/concurrency-vms.bench.ts" + +run_tsx "interference" "$HERE/src/focused/interference.bench.ts" + +run_tsx "concurrent-processes" "$HERE/src/focused/concurrent-processes.bench.ts" + run_tsx "wasm-command-floor" \ "$HERE/src/focused/wasm-command-floor.bench.ts" \ --iterations="${BENCH_WASM_COMMAND_FLOOR_ITERATIONS:-3}" \ diff --git a/packages/benchmarks/src/focused/concurrency-common.ts b/packages/benchmarks/src/focused/concurrency-common.ts new file mode 100644 index 000000000..cf8621303 --- /dev/null +++ b/packages/benchmarks/src/focused/concurrency-common.ts @@ -0,0 +1,476 @@ +import { existsSync, readFileSync } from "node:fs"; +import { join } from "node:path"; +import { fileURLToPath } from "node:url"; +import type { NodeRuntimeResourceSnapshot } from "@secure-exec/core"; +import { round, stats, type Stats } from "../lib/perf-utils.js"; +import { + formatPacificIso, + prewarmBenchVm, + resolveBenchSidecarProvenance, + type BenchVm, + type SidecarBinaryProvenance, +} from "../lib/vm.js"; + +export const DEFAULT_CONCURRENCY_COUNTS = [1, 4, 8] as const; +export const DEFAULT_DURATION_MS = 5_000; +export const DEFAULT_TCP_WARMUP = 5; +export const DEFAULT_FS_WARMUP = 25; +export const FS_WRITE_SMALL_BYTES = 4 * 1024; + +export interface GuestLoopResult { + op: "tcp_echo_small" | "fs_write_small" | "busy_interference"; + durationMs: number; + ops: number; + verifiedOps: number; + samplesMs?: number[]; + latencyMs?: Stats; + rawSampleCount?: number; + payloadBytes?: number; + writeBytes?: number; +} + +export interface ParticipantResult { + index: number; + ops: number; + durationMs: number; + opsPerSec: number; + latencyMs: Stats; + resourceSnapshot?: NodeRuntimeResourceSnapshot; + sidecarVmHwmBytes?: number; + rawSampleCount: number; +} + +export interface ConcurrencyRow { + n: number; + durationMs: number; + aggregateOps: number; + aggregateOpsPerSec: number; + meanParticipantOpsPerSec: number; + minParticipantOpsPerSec: number; + maxParticipantOpsPerSec: number; + meanP50Ms: number; + meanP95Ms: number; + maxP95Ms: number; + scaling: { + idealScaling: 1; + throughputVsN1: number; + measuredOfIdeal: number; + }; + participants: ParticipantResult[]; +} + +export interface RegressionRow { + rowKey: string; + metric: string; + value: number; + unit: string; +} + +const REPO_ROOT = fileURLToPath(new URL("../../../..", import.meta.url)); +const MATRIX_PATH = join(REPO_ROOT, "packages/benchmarks/results/latency-matrix.json"); + +const PREWARM_OP = { + family: "focused", + name: "concurrency-prewarm", + fileLine: "packages/benchmarks/src/focused/concurrency-common.ts", + reproducer: "guest node prewarm for focused concurrency lanes", + program: "async () => {}", +}; + +export function generatedAtPacific(): string { + return formatPacificIso(new Date()); +} + +export function benchmarkProvenance(): { + generatedAt: string; + sidecar: SidecarBinaryProvenance; +} { + return { + generatedAt: generatedAtPacific(), + sidecar: resolveBenchSidecarProvenance(), + }; +} + +export async function prewarmConcurrencyVm(vm: BenchVm): Promise { + await prewarmBenchVm(vm, PREWARM_OP); +} + +export function parseCounts(value: string | undefined): number[] { + if (!value) return [...DEFAULT_CONCURRENCY_COUNTS]; + const counts = value + .split(",") + .map((part) => Number(part.trim())) + .filter((n) => Number.isInteger(n) && n > 0); + if (counts.length === 0) { + throw new Error(`invalid concurrency counts: ${JSON.stringify(value)}`); + } + if (Math.max(...counts) > 8) { + throw new Error(`concurrency count exceeds bounded max 8: ${Math.max(...counts)}`); + } + return counts; +} + +export function envNumber(name: string, fallback: number): number { + const raw = process.env[name]; + if (raw === undefined || raw === "") return fallback; + const value = Number(raw); + if (!Number.isFinite(value) || value <= 0) { + throw new Error(`${name} must be a positive number, got ${JSON.stringify(raw)}`); + } + return value; +} + +export async function writeGuestProgram( + vm: BenchVm, + path: string, + source: string, +): Promise { + await vm.writeFile(path, source); +} + +export async function runGuestJsonProgram( + vm: BenchVm, + path: string, + env: Record, +): Promise { + const result = await vm.spawnNodeCapture(path, env); + if (result.exitCode !== 0) { + throw new Error( + `guest program ${path} exited ${result.exitCode}\nstdout:\n${result.stdout}\nstderr:\n${result.stderr}`, + ); + } + const parsed = JSON.parse(result.stdout) as GuestLoopResult; + if (parsed.ops !== parsed.verifiedOps) { + throw new Error( + `${path} verification mismatch: ops=${parsed.ops} verified=${parsed.verifiedOps}`, + ); + } + const sampleCount = parsed.rawSampleCount ?? parsed.samplesMs?.length ?? 0; + if (sampleCount <= 0) { + throw new Error(`${path} produced no latency samples`); + } + if (parsed.ops <= 0) { + throw new Error(`${path} produced no operations`); + } + return parsed; +} + +export async function participantFromLoop( + index: number, + vm: BenchVm, + loop: GuestLoopResult, +): Promise { + const pid = vm.sidecarPid(); + const sampleCount = loop.rawSampleCount ?? loop.samplesMs?.length ?? 0; + return { + index, + ops: loop.ops, + durationMs: round(loop.durationMs), + opsPerSec: round((loop.ops / loop.durationMs) * 1000, 2), + latencyMs: loop.latencyMs ?? stats(loop.samplesMs ?? []), + resourceSnapshot: await vm.getResourceSnapshot(), + sidecarVmHwmBytes: pid === null ? undefined : readVmHwmBytes(pid), + rawSampleCount: sampleCount, + }; +} + +export function buildConcurrencyRow( + n: number, + durationMs: number, + participants: ParticipantResult[], + baselineOpsPerSec: number, +): ConcurrencyRow { + const aggregateOps = participants.reduce((sum, row) => sum + row.ops, 0); + const aggregateOpsPerSec = round( + participants.reduce((sum, row) => sum + row.opsPerSec, 0), + 2, + ); + const participantOps = participants.map((row) => row.opsPerSec); + const p50s = participants.map((row) => row.latencyMs.p50); + const p95s = participants.map((row) => row.latencyMs.p95); + const throughputVsN1 = round(aggregateOpsPerSec / baselineOpsPerSec, 2); + return { + n, + durationMs, + aggregateOps, + aggregateOpsPerSec, + meanParticipantOpsPerSec: round(mean(participantOps), 2), + minParticipantOpsPerSec: round(Math.min(...participantOps), 2), + maxParticipantOpsPerSec: round(Math.max(...participantOps), 2), + meanP50Ms: round(mean(p50s), 2), + meanP95Ms: round(mean(p95s), 2), + maxP95Ms: round(Math.max(...p95s), 2), + scaling: { + idealScaling: 1, + throughputVsN1, + measuredOfIdeal: round(aggregateOpsPerSec / (baselineOpsPerSec * n), 2), + }, + participants, + }; +} + +export function concurrencyRegressionRows( + benchmark: "concurrency-vms" | "concurrent-processes", + rows: ConcurrencyRow[], +): RegressionRow[] { + return rows.flatMap((row) => [ + { + rowKey: `${benchmark}.n${row.n}.aggregate_ops_per_sec`, + metric: "aggregateOpsPerSec", + value: row.aggregateOpsPerSec, + unit: "ops/s", + }, + { + rowKey: `${benchmark}.n${row.n}.mean_p95_ms`, + metric: "meanP95Ms", + value: row.meanP95Ms, + unit: "ms", + }, + { + rowKey: `${benchmark}.n${row.n}.scaling_of_ideal`, + metric: "measuredOfIdeal", + value: row.scaling.measuredOfIdeal, + unit: "ratio", + }, + ]); +} + +export function assertMatrixBallpark( + op: "fs_write_small" | "tcp_echo_small", + observedP50Ms: number, + options: { multiplier: number; fallbackCeilingMs: number }, +): void { + const matrixP50 = readMatrixGuestP50(op); + const threshold = matrixP50 === undefined + ? options.fallbackCeilingMs + : matrixP50 * options.multiplier; + if (observedP50Ms > threshold) { + throw new Error( + `${op} N=1 p50 ${observedP50Ms}ms exceeds sanity ceiling ${round( + threshold, + 2, + )}ms${matrixP50 === undefined ? " (fallback)" : ` (matrix ${matrixP50}ms x ${options.multiplier})`}`, + ); + } + console.error( + ` sanity ${op}: N=1 p50=${observedP50Ms}ms <= ${round(threshold, 2)}ms${ + matrixP50 === undefined ? " fallback ceiling" : ` (${options.multiplier}x matrix ${matrixP50}ms)` + }`, + ); +} + +export function tcpEchoSmallLoopProgram(): string { + return ` +import net from "node:net"; + +const durationMs = Number(process.env.BENCH_DURATION_MS || ${DEFAULT_DURATION_MS}); +const warmup = Number(process.env.BENCH_WARMUP || ${DEFAULT_TCP_WARMUP}); +const payload = Buffer.from("secure-exec-tcp-echo"); +const samplesMs = []; +const now = () => Number(process.hrtime.bigint()) / 1e6; + +async function once() { + const body = await new Promise((resolve, reject) => { + const server = net.createServer((socket) => { + const serverChunks = []; + let serverReceived = 0; + socket.on("data", (chunk) => { + serverChunks.push(Buffer.from(chunk)); + serverReceived += chunk.length; + if (serverReceived >= payload.length) { + socket.end(Buffer.concat(serverChunks)); + } + }); + socket.on("error", reject); + }); + server.on("error", reject); + server.listen(0, "127.0.0.1", () => { + const address = server.address(); + if (!address || typeof address === "string") { + reject(new Error("tcp echo server did not bind to a TCP port")); + return; + } + const socket = net.connect(address.port, "127.0.0.1"); + const chunks = []; + let received = 0; + socket.on("connect", () => socket.write(payload)); + socket.on("data", (chunk) => { + chunks.push(Buffer.from(chunk)); + received += chunk.length; + }); + socket.on("error", reject); + socket.on("close", () => { + server.close((error) => { + if (error) reject(error); + else resolve(Buffer.concat(chunks)); + }); + }); + }); + }); + if (!Buffer.from(body).equals(payload)) { + throw new Error("bad tcp echo payload: " + Buffer.from(body).toString("hex")); + } +} + +for (let i = 0; i < warmup; i++) await once(); +const start = now(); +const deadline = start + durationMs; +while (now() < deadline) { + const opStart = now(); + await once(); + samplesMs.push(now() - opStart); +} +const elapsed = now() - start; +process.stdout.write(JSON.stringify({ + op: "tcp_echo_small", + durationMs: elapsed, + ops: samplesMs.length, + verifiedOps: samplesMs.length, + payloadBytes: payload.length, + samplesMs, +})); +`; +} + +export function fsWriteSmallLoopProgram(): string { + return ` +import fs from "node:fs"; + +const durationMs = Number(process.env.BENCH_DURATION_MS || ${DEFAULT_DURATION_MS}); +const warmup = Number(process.env.BENCH_WARMUP || ${DEFAULT_FS_WARMUP}); +const processIndex = process.env.BENCH_PROCESS_INDEX || "0"; +const payload = Buffer.alloc(${FS_WRITE_SMALL_BYTES}, 7); +const path = "/tmp/focused-fs-write-small-" + process.pid + "-" + processIndex + ".bin"; +const samplesMs = []; +const now = () => Number(process.hrtime.bigint()) / 1e6; +const round = (n) => Math.round(n * 100) / 100; +function summarize(values) { + const sorted = [...values].sort((a, b) => a - b); + const percentile = (p) => sorted[Math.max(0, Math.ceil((p / 100) * sorted.length) - 1)]; + return { + mean: round(values.reduce((sum, value) => sum + value, 0) / values.length), + p50: round(percentile(50)), + p95: round(percentile(95)), + p99: round(percentile(99)), + min: round(sorted[0]), + max: round(sorted[sorted.length - 1]), + }; +} + +function once(iteration) { + payload[0] = iteration & 255; + fs.writeFileSync(path, payload); +} + +for (let i = 0; i < warmup; i++) once(i); +const start = now(); +const deadline = start + durationMs; +while (now() < deadline) { + const opStart = now(); + once(samplesMs.length); + samplesMs.push(now() - opStart); +} +const stat = fs.statSync(path); +fs.unlinkSync(path); +if (stat.size !== payload.length) { + throw new Error("bad fs write size: " + stat.size); +} +process.stdout.write(JSON.stringify({ + op: "fs_write_small", + durationMs: now() - start, + ops: samplesMs.length, + verifiedOps: samplesMs.length, + writeBytes: payload.length, + latencyMs: summarize(samplesMs), + rawSampleCount: samplesMs.length, +})); +`; +} + +export function busyInterferenceProgram(): string { + return ` +import fs from "node:fs"; + +const durationMs = Number(process.env.BENCH_DURATION_MS || ${DEFAULT_DURATION_MS}); +const payload = Buffer.alloc(${FS_WRITE_SMALL_BYTES}, 3); +const path = "/tmp/focused-busy-interference-" + process.pid + ".bin"; +const samplesMs = []; +const now = () => Number(process.hrtime.bigint()) / 1e6; +const round = (n) => Math.round(n * 100) / 100; +function summarize(values) { + const sorted = [...values].sort((a, b) => a - b); + const percentile = (p) => sorted[Math.max(0, Math.ceil((p / 100) * sorted.length) - 1)]; + return { + mean: round(values.reduce((sum, value) => sum + value, 0) / values.length), + p50: round(percentile(50)), + p95: round(percentile(95)), + p99: round(percentile(99)), + min: round(sorted[0]), + max: round(sorted[sorted.length - 1]), + }; +} + +function spin(ms) { + const end = now() + ms; + let x = 0; + while (now() < end) x = (x + 1) % 1000003; + return x; +} + +const start = now(); +const deadline = start + durationMs; +let writes = 0; +while (now() < deadline) { + spin(5); + const opStart = now(); + for (let i = 0; i < 8; i++) { + payload[0] = (writes + i) & 255; + fs.writeFileSync(path, payload); + } + writes += 8; + samplesMs.push(now() - opStart); +} +const stat = fs.statSync(path); +fs.unlinkSync(path); +if (stat.size !== payload.length) { + throw new Error("bad busy write size: " + stat.size); +} +process.stdout.write(JSON.stringify({ + op: "busy_interference", + durationMs: now() - start, + ops: writes, + verifiedOps: writes, + writeBytes: payload.length, + latencyMs: summarize(samplesMs), + rawSampleCount: samplesMs.length, +})); +`; +} + +function readMatrixGuestP50(op: string): number | undefined { + if (!existsSync(MATRIX_PATH)) return undefined; + try { + const parsed = JSON.parse(readFileSync(MATRIX_PATH, "utf8")) as { + latency?: Array<{ op?: string; layers?: { guest?: { p50?: number } } }>; + }; + const row = parsed.latency?.find((entry) => entry.op === op); + const p50 = row?.layers?.guest?.p50; + return typeof p50 === "number" && Number.isFinite(p50) ? p50 : undefined; + } catch { + return undefined; + } +} + +function readVmHwmBytes(pid: number): number | undefined { + try { + const status = readFileSync(`/proc/${pid}/status`, "utf8"); + const match = status.match(/^VmHWM:\s+(\d+)\s+kB/m); + return match ? Number(match[1]) * 1024 : undefined; + } catch { + return undefined; + } +} + +function mean(values: number[]): number { + return values.reduce((sum, value) => sum + value, 0) / values.length; +} diff --git a/packages/benchmarks/src/focused/concurrency-vms.bench.ts b/packages/benchmarks/src/focused/concurrency-vms.bench.ts new file mode 100644 index 000000000..e1fcad2c2 --- /dev/null +++ b/packages/benchmarks/src/focused/concurrency-vms.bench.ts @@ -0,0 +1,116 @@ +import { createBenchVm, formatSidecarProvenance, type BenchVm } from "../lib/vm.js"; +import { getHardware, printTable } from "../lib/perf-utils.js"; +import { + assertMatrixBallpark, + benchmarkProvenance, + buildConcurrencyRow, + concurrencyRegressionRows, + envNumber, + parseCounts, + participantFromLoop, + prewarmConcurrencyVm, + runGuestJsonProgram, + tcpEchoSmallLoopProgram, + writeGuestProgram, + type ConcurrencyRow, +} from "./concurrency-common.js"; + +const PROGRAM_PATH = "/tmp/focused-concurrency-vms.mjs"; + +async function runRow(n: number, durationMs: number): Promise { + const vms: BenchVm[] = []; + try { + for (let i = 0; i < n; i++) { + const vm = await createBenchVm(); + await prewarmConcurrencyVm(vm); + await writeGuestProgram(vm, PROGRAM_PATH, tcpEchoSmallLoopProgram()); + vms.push(vm); + } + const loops = await Promise.all( + vms.map((vm, index) => + runGuestJsonProgram(vm, PROGRAM_PATH, { + BENCH_DURATION_MS: String(durationMs), + BENCH_PROCESS_INDEX: String(index), + }), + ), + ); + const participants = await Promise.all( + loops.map((loop, index) => participantFromLoop(index, vms[index], loop)), + ); + return buildConcurrencyRow(n, durationMs, participants, 1); + } finally { + await Promise.allSettled(vms.map((vm) => vm.dispose())); + } +} + +async function main(): Promise { + const counts = parseCounts(process.env.BENCH_CONCURRENCY_COUNTS); + const durationMs = envNumber("BENCH_CONCURRENCY_DURATION_MS", 5_000); + const hardware = getHardware(); + const provenance = benchmarkProvenance(); + console.error("=== Concurrency VMs Focused Benchmark ==="); + console.error(`CPU: ${hardware.cpu}`); + console.error(`RAM: ${hardware.ram} | Node: ${hardware.node}`); + console.error(formatSidecarProvenance(provenance.sidecar)); + console.error(`Counts: ${counts.join(",")} | duration: ${durationMs}ms`); + + const rows: ConcurrencyRow[] = []; + let baselineOpsPerSec: number | undefined; + for (const n of counts) { + const provisional = await runRow(n, durationMs); + if (baselineOpsPerSec === undefined) { + baselineOpsPerSec = provisional.aggregateOpsPerSec; + } + const row = buildConcurrencyRow( + n, + durationMs, + provisional.participants, + baselineOpsPerSec, + ); + if (n === 1) { + assertMatrixBallpark("tcp_echo_small", row.meanP50Ms, { + multiplier: 5, + fallbackCeilingMs: 100, + }); + } + rows.push(row); + console.error( + ` N=${n}: ops/s=${row.aggregateOpsPerSec} mean.p95=${row.meanP95Ms}ms scaling=${row.scaling.measuredOfIdeal}x ideal`, + ); + } + + printTable( + ["N", "ops/s", "mean p50", "mean p95", "max p95", "vs N=1", "of ideal"], + rows.map((row) => [ + row.n, + row.aggregateOpsPerSec, + `${row.meanP50Ms}ms`, + `${row.meanP95Ms}ms`, + `${row.maxP95Ms}ms`, + `${row.scaling.throughputVsN1}x`, + `${row.scaling.measuredOfIdeal}x`, + ]), + ); + + console.log( + JSON.stringify( + { + benchmark: "concurrency-vms", + ...provenance, + hardware, + op: "tcp_echo_small", + counts, + durationMs, + rows, + regressionRows: concurrencyRegressionRows("concurrency-vms", rows), + }, + null, + 2, + ), + ); +} + +main().catch((error) => { + console.error(error); + process.exit(1); +}); diff --git a/packages/benchmarks/src/focused/concurrent-processes.bench.ts b/packages/benchmarks/src/focused/concurrent-processes.bench.ts new file mode 100644 index 000000000..08fb42e28 --- /dev/null +++ b/packages/benchmarks/src/focused/concurrent-processes.bench.ts @@ -0,0 +1,104 @@ +import { createBenchVm, formatSidecarProvenance } from "../lib/vm.js"; +import { getHardware, printTable } from "../lib/perf-utils.js"; +import { + assertMatrixBallpark, + benchmarkProvenance, + buildConcurrencyRow, + concurrencyRegressionRows, + envNumber, + fsWriteSmallLoopProgram, + parseCounts, + participantFromLoop, + prewarmConcurrencyVm, + runGuestJsonProgram, + writeGuestProgram, + type ConcurrencyRow, +} from "./concurrency-common.js"; + +const PROGRAM_PATH = "/tmp/focused-concurrent-processes.mjs"; + +async function main(): Promise { + const counts = parseCounts(process.env.BENCH_PROCESS_COUNTS); + const durationMs = envNumber("BENCH_PROCESS_DURATION_MS", 5_000); + const hardware = getHardware(); + const provenance = benchmarkProvenance(); + console.error("=== Concurrent Processes Focused Benchmark ==="); + console.error(`CPU: ${hardware.cpu}`); + console.error(`RAM: ${hardware.ram} | Node: ${hardware.node}`); + console.error(formatSidecarProvenance(provenance.sidecar)); + console.error(`Counts: ${counts.join(",")} | duration: ${durationMs}ms`); + + const vm = await createBenchVm(); + try { + await prewarmConcurrencyVm(vm); + await writeGuestProgram(vm, PROGRAM_PATH, fsWriteSmallLoopProgram()); + const rows: ConcurrencyRow[] = []; + let baselineOpsPerSec: number | undefined; + for (const n of counts) { + const loops = await Promise.all( + Array.from({ length: n }, (_, index) => + runGuestJsonProgram(vm, PROGRAM_PATH, { + BENCH_DURATION_MS: String(durationMs), + BENCH_PROCESS_INDEX: String(index), + }), + ), + ); + const participants = await Promise.all( + loops.map((loop, index) => participantFromLoop(index, vm, loop)), + ); + const provisional = buildConcurrencyRow(n, durationMs, participants, 1); + if (baselineOpsPerSec === undefined) { + baselineOpsPerSec = provisional.aggregateOpsPerSec; + } + const row = buildConcurrencyRow(n, durationMs, participants, baselineOpsPerSec); + if (n === 1) { + assertMatrixBallpark("fs_write_small", row.meanP50Ms, { + multiplier: 5, + fallbackCeilingMs: 5, + }); + } + rows.push(row); + console.error( + ` N=${n}: ops/s=${row.aggregateOpsPerSec} mean.p95=${row.meanP95Ms}ms scaling=${row.scaling.measuredOfIdeal}x ideal`, + ); + } + + printTable( + ["N", "ops/s", "mean p50", "mean p95", "max p95", "vs N=1", "of ideal"], + rows.map((row) => [ + row.n, + row.aggregateOpsPerSec, + `${row.meanP50Ms}ms`, + `${row.meanP95Ms}ms`, + `${row.maxP95Ms}ms`, + `${row.scaling.throughputVsN1}x`, + `${row.scaling.measuredOfIdeal}x`, + ]), + ); + + console.log( + JSON.stringify( + { + benchmark: "concurrent-processes", + ...provenance, + hardware, + op: "fs_write_small", + counts, + durationMs, + rows, + vmResourceSnapshot: await vm.getResourceSnapshot(), + regressionRows: concurrencyRegressionRows("concurrent-processes", rows), + }, + null, + 2, + ), + ); + } finally { + await vm.dispose(); + } +} + +main().catch((error) => { + console.error(error); + process.exit(1); +}); diff --git a/packages/benchmarks/src/focused/interference.bench.ts b/packages/benchmarks/src/focused/interference.bench.ts new file mode 100644 index 000000000..f85d9f628 --- /dev/null +++ b/packages/benchmarks/src/focused/interference.bench.ts @@ -0,0 +1,153 @@ +import { createBenchVm, formatSidecarProvenance, type BenchVm } from "../lib/vm.js"; +import { getHardware, printTable, round, stats, type Stats } from "../lib/perf-utils.js"; +import { + assertMatrixBallpark, + benchmarkProvenance, + busyInterferenceProgram, + envNumber, + fsWriteSmallLoopProgram, + participantFromLoop, + prewarmConcurrencyVm, + runGuestJsonProgram, + writeGuestProgram, + type RegressionRow, +} from "./concurrency-common.js"; + +const PROBE_PROGRAM_PATH = "/tmp/focused-interference-probe.mjs"; +const BUSY_PROGRAM_PATH = "/tmp/focused-interference-busy.mjs"; + +interface ProbeCase { + mode: "idle" | "busy"; + ops: number; + opsPerSec: number; + durationMs: number; + latencyMs: Stats; + rawSampleCount: number; + busyOps?: number; +} + +async function runProbe(vm: BenchVm, durationMs: number, mode: ProbeCase["mode"]): Promise { + const loop = await runGuestJsonProgram(vm, PROBE_PROGRAM_PATH, { + BENCH_DURATION_MS: String(durationMs), + BENCH_PROCESS_INDEX: mode, + }); + const participant = await participantFromLoop(0, vm, loop); + return { + mode, + ops: participant.ops, + opsPerSec: participant.opsPerSec, + durationMs: participant.durationMs, + latencyMs: participant.latencyMs, + rawSampleCount: participant.rawSampleCount, + }; +} + +async function main(): Promise { + const durationMs = envNumber("BENCH_INTERFERENCE_DURATION_MS", 5_000); + const busyDurationMs = envNumber("BENCH_INTERFERENCE_BUSY_DURATION_MS", durationMs + 1_000); + const hardware = getHardware(); + const provenance = benchmarkProvenance(); + console.error("=== Interference Focused Benchmark ==="); + console.error(`CPU: ${hardware.cpu}`); + console.error(`RAM: ${hardware.ram} | Node: ${hardware.node}`); + console.error(formatSidecarProvenance(provenance.sidecar)); + console.error(`Probe duration: ${durationMs}ms | busy duration: ${busyDurationMs}ms`); + + const probeVm = await createBenchVm(); + const busyVm = await createBenchVm(); + try { + await Promise.all([prewarmConcurrencyVm(probeVm), prewarmConcurrencyVm(busyVm)]); + await Promise.all([ + writeGuestProgram(probeVm, PROBE_PROGRAM_PATH, fsWriteSmallLoopProgram()), + writeGuestProgram(busyVm, BUSY_PROGRAM_PATH, busyInterferenceProgram()), + ]); + + const idle = await runProbe(probeVm, durationMs, "idle"); + assertMatrixBallpark("fs_write_small", idle.latencyMs.p50, { + multiplier: 5, + fallbackCeilingMs: 5, + }); + + const busyPromise = runGuestJsonProgram(busyVm, BUSY_PROGRAM_PATH, { + BENCH_DURATION_MS: String(busyDurationMs), + }); + const busy = await runProbe(probeVm, durationMs, "busy"); + const busyLoop = await busyPromise; + busy.busyOps = busyLoop.ops; + + const interferenceTax = { + p50: round(busy.latencyMs.p50 / idle.latencyMs.p50, 2), + p95: round(busy.latencyMs.p95 / idle.latencyMs.p95, 2), + }; + const regressionRows: RegressionRow[] = [ + { + rowKey: "interference.idle.fs_write_small_p50_ms", + metric: "idleP50Ms", + value: idle.latencyMs.p50, + unit: "ms", + }, + { + rowKey: "interference.busy.fs_write_small_p50_ms", + metric: "busyP50Ms", + value: busy.latencyMs.p50, + unit: "ms", + }, + { + rowKey: "interference.fs_write_small_p50_tax", + metric: "interferenceTaxP50", + value: interferenceTax.p50, + unit: "ratio", + }, + { + rowKey: "interference.fs_write_small_p95_tax", + metric: "interferenceTaxP95", + value: interferenceTax.p95, + unit: "ratio", + }, + ]; + + printTable( + ["mode", "ops/s", "p50", "p95", "samples", "busy ops"], + [idle, busy].map((row) => [ + row.mode, + row.opsPerSec, + `${row.latencyMs.p50}ms`, + `${row.latencyMs.p95}ms`, + row.rawSampleCount, + row.busyOps ?? "n/a", + ]), + ); + console.error( + ` interferenceTax: p50=${interferenceTax.p50}x p95=${interferenceTax.p95}x`, + ); + + console.log( + JSON.stringify( + { + benchmark: "interference", + ...provenance, + hardware, + probeOp: "fs_write_small", + busyOp: "cpu_spin_plus_fs_write_churn", + durationMs, + busyDurationMs, + idle, + busy, + interferenceTax, + probeVmResourceSnapshot: await probeVm.getResourceSnapshot(), + busyVmResourceSnapshot: await busyVm.getResourceSnapshot(), + regressionRows, + }, + null, + 2, + ), + ); + } finally { + await Promise.allSettled([probeVm.dispose(), busyVm.dispose()]); + } +} + +main().catch((error) => { + console.error(error); + process.exit(1); +});