From 7dca3844395b377c0ac25f46afb42359bd163dbe Mon Sep 17 00:00:00 2001 From: Tony Deng Date: Fri, 29 May 2026 13:11:07 -0700 Subject: [PATCH 1/5] scope out undici-fetch alternative with configurable pool for connections --- src/lib/undici-fetch.ts | 99 +++++++++----- tests/lib/undici-fetch.test.ts | 121 ++++++++++++++++++ .../scripts/devbox-startup-benchmark.mjs | 91 ++++++++++++- 3 files changed, 272 insertions(+), 39 deletions(-) create mode 100644 tests/lib/undici-fetch.test.ts diff --git a/src/lib/undici-fetch.ts b/src/lib/undici-fetch.ts index db112b416..04ceed0a9 100644 --- a/src/lib/undici-fetch.ts +++ b/src/lib/undici-fetch.ts @@ -4,10 +4,14 @@ * undici is the same engine that powers Node's built-in global `fetch`. * Constructing an `Agent` with `allowH2: true` and passing it as the * per-request `dispatcher` makes requests negotiate HTTP/2 via ALPN, with - * automatic fallback to HTTP/1.1 when the origin doesn't advertise h2. undici - * returns a standard WHATWG `Response`, so the rest of core.ts — which only - * touches standard Response members (`.status`, `.ok`, `.headers`, `.text()`, - * `.json()`, `.body`, `.arrayBuffer()`, `.blob()`) — is unchanged. + * automatic fallback to HTTP/1.1 when the origin doesn't advertise h2. + * + * This adapter intentionally uses `undici.request` rather than `undici.fetch`. + * In undici 6.x, the fetch path opens too many H2 sessions under high + * concurrency. `request` respects the bounded dispatcher pool, then we wrap its + * output in Undici's WHATWG `Response` so core.ts can keep using standard + * Response members (`.status`, `.ok`, `.headers`, `.text()`, `.json()`, + * `.body`, `.arrayBuffer()`, `.blob()`). * * Unlike the previous got@14 approach, undici is dual CJS/ESM and `require`-able * from this `"type": "commonjs"` package, so there is no dynamic-import hack and @@ -17,7 +21,7 @@ * regeneration; the only generated file touched is the one-line wiring change * in src/_shims/node-runtime.ts. */ -import { Agent, fetch as undiciFetchImpl } from 'undici'; +import { Agent, Headers, Response, request as undiciRequest } from 'undici'; import { Readable } from 'node:stream'; import { MultipartBody } from '../_shims/MultipartBody'; import { type Fetch } from '../core'; @@ -27,55 +31,82 @@ import { type Fetch } from '../core'; // transparently falls back to HTTP/1.1 when the origin doesn't offer h2. const h2Dispatcher = new Agent({ allowH2: true, + connections: 4, + maxConcurrentStreams: 64, + pipelining: 64, keepAliveTimeout: 10 * 60 * 1000, keepAliveMaxTimeout: 10 * 60 * 1000, }); -type NormalizedBody = { body: any; isStream: boolean }; - // Map the body shapes core.ts produces (string | Buffer/ArrayBufferView | -// Node Readable for multipart | null) onto a valid undici BodyInit. A Node -// Readable must become a Web ReadableStream and requires `duplex: 'half'`. -function normalizeBody(body: unknown): NormalizedBody { - if (body == null) return { body: undefined, isStream: false }; - if (typeof body === 'string') return { body, isStream: false }; - if (Buffer.isBuffer(body)) return { body, isStream: false }; +// Node Readable for multipart | null) onto a valid undici.request body. +function normalizeBody(body: unknown): any { + if (body == null) return undefined; + if (typeof body === 'string') return body; + if (Buffer.isBuffer(body)) return body; // Unwrap MultipartBody (wraps a Readable in `.body`). core.ts already unwraps // it, but handle it defensively. if (body instanceof MultipartBody) return normalizeBody((body as MultipartBody).body); - if (body instanceof Readable) { - return { body: Readable.toWeb(body) as any, isStream: true }; - } + if (body instanceof Readable) return body; // ArrayBufferView (Uint8Array, DataView, typed arrays) and ArrayBuffer are - // valid BodyInit as-is / after a Buffer wrap. - if (ArrayBuffer.isView(body)) return { body, isStream: false }; - if (body instanceof ArrayBuffer) return { body: Buffer.from(body), isStream: false }; - return { body: String(body), isStream: false }; + // valid undici bodies as-is / after a Buffer wrap. + if (ArrayBuffer.isView(body)) return body; + if (body instanceof ArrayBuffer) return Buffer.from(body); + return String(body); +} + +function toResponseHeaders(headers: Record): Headers { + const responseHeaders = new Headers(); + for (const [name, value] of Object.entries(headers)) { + if (value === undefined) continue; + if (Array.isArray(value)) { + for (const item of value) responseHeaders.append(name, item); + } else { + responseHeaders.append(name, value); + } + } + return responseHeaders; +} + +function statusMustNotHaveBody(status: number): boolean { + return status === 204 || status === 205 || status === 304; } export const undiciFetch: Fetch = async (url, init) => { // core.ts injects a node-fetch-style `agent` in RequestInit; undici uses a - // `dispatcher` instead, so drop `agent`. Pull `signal` and `body` out to - // normalize them; pass everything else (method, headers, redirect, …) through. - const { agent: _ignoredAgent, body: rawBody, signal, ...rest } = (init ?? {}) as any; - - const { body, isStream } = normalizeBody(rawBody); + // `dispatcher` instead, so drop `agent`. Drop fetch-only fields that + // undici.request doesn't understand, and map redirects onto maxRedirections. + const { + agent: _ignoredAgent, + body: rawBody, + duplex: _ignoredDuplex, + redirect, + signal, + ...rest + } = (init ?? {}) as any; - const undiciInit: any = { + const result = await undiciRequest(url as any, { ...rest, - body, + body: normalizeBody(rawBody), + maxRedirections: redirect === 'manual' || redirect === 'error' ? 0 : 20, // core.ts passes a standard web AbortSignal (from `new AbortController()`), // which undici accepts directly. signal: signal ?? undefined, dispatcher: h2Dispatcher, - }; - // A streamed request body requires the half-duplex hint or undici throws. - if (isStream) undiciInit.duplex = 'half'; + }); + + const responseBody = statusMustNotHaveBody(result.statusCode) ? null : result.body; + if (responseBody === null) await result.body.dump(); + + const response = new Response(responseBody, { + status: result.statusCode, + headers: toResponseHeaders(result.headers), + }); + Object.defineProperty(response, 'url', { value: String(url) }); - // undici returns a genuine WHATWG Response. The SDK is typed against the - // node-fetch Response, so cast through `any` (the prior got adapter did the - // same); at runtime core.ts only uses standard Response members. - return (await undiciFetchImpl(url as any, undiciInit)) as any; + // The SDK is typed against the node-fetch Response, so cast through `any`; + // at runtime core.ts only uses standard Response members. + return response as any; }; export default undiciFetch; diff --git a/tests/lib/undici-fetch.test.ts b/tests/lib/undici-fetch.test.ts new file mode 100644 index 000000000..97c6885ca --- /dev/null +++ b/tests/lib/undici-fetch.test.ts @@ -0,0 +1,121 @@ +import http from 'node:http'; +import { Readable } from 'node:stream'; +import { Runloop } from '@runloop/api-client'; +import { Stream } from '@runloop/api-client/streaming'; +import { undiciFetch } from '@runloop/api-client/lib/undici-fetch'; +import { MultipartBody } from '@runloop/api-client/_shims/MultipartBody'; + +describe('undiciFetch', () => { + let server: http.Server; + let baseURL: string; + const requests: Array<{ method: string; url: string; body: string }> = []; + + beforeAll( + () => + new Promise((resolve) => { + server = http.createServer((req, res) => { + const chunks: Buffer[] = []; + req.on('data', (chunk) => chunks.push(Buffer.from(chunk))); + req.on('end', () => { + const body = Buffer.concat(chunks).toString('utf8'); + requests.push({ method: req.method ?? '', url: req.url ?? '', body }); + + if (req.url === '/json') { + res.writeHead(200, { 'content-type': 'application/json' }); + res.end(JSON.stringify({ ok: true })); + return; + } + + if (req.url === '/error') { + res.writeHead(401, { 'content-type': 'application/json' }); + res.end(JSON.stringify({ error: { message: 'invalid token' } })); + return; + } + + if (req.url === '/sse') { + res.writeHead(200, { 'content-type': 'text/event-stream' }); + res.write('data: {"value":1}\n\n'); + res.end(); + return; + } + + res.writeHead(200, { 'content-type': 'application/json' }); + res.end(JSON.stringify({ body })); + }); + + server.listen(0, '127.0.0.1', () => { + const address = server.address(); + if (address && typeof address === 'object') { + baseURL = `http://127.0.0.1:${address.port}`; + } + resolve(); + }); + }); + }), + ); + + afterAll( + () => + new Promise((resolve, reject) => { + server.close((error) => (error ? reject(error) : resolve())); + }), + ); + + beforeEach(() => { + requests.length = 0; + }); + + test('parses a JSON response through SDK core', async () => { + const client = new Runloop({ + bearerToken: 'test-token', + baseURL, + maxRetries: 0, + http2: true, + }); + + await expect(client.get('/json')).resolves.toEqual({ ok: true }); + }); + + test('rejects error responses with a readable body through SDK core', async () => { + const client = new Runloop({ + bearerToken: 'test-token', + baseURL, + maxRetries: 0, + http2: true, + }); + + await expect(client.get('/error')).rejects.toMatchObject({ + status: 401, + message: expect.stringMatching(/invalid token/i), + }); + }); + + test('keeps streamed SSE responses consumable', async () => { + const response = await undiciFetch(`${baseURL}/sse`); + const stream = Stream.fromSSEResponse<{ value: number }>(response as any, new AbortController()); + + const events = []; + for await (const event of stream) { + events.push(event); + } + + expect(events).toEqual([{ value: 1 }]); + }); + + test.each([ + ['string', 'hello'], + ['buffer', Buffer.from('hello')], + ['array buffer', new TextEncoder().encode('hello').buffer], + ['typed array', new Uint8Array(Buffer.from('hello'))], + ['readable', Readable.from(['hello'])], + ['multipart body', new MultipartBody(Readable.from(['hello']))], + ])('normalizes %s request bodies', async (_label, body) => { + const response = await undiciFetch(`${baseURL}/echo`, { + method: 'POST', + body: body as any, + agent: { shouldBeIgnored: true } as any, + } as any); + + await expect(response.json()).resolves.toEqual({ body: 'hello' }); + }); +}); diff --git a/tests/smoketests/scripts/devbox-startup-benchmark.mjs b/tests/smoketests/scripts/devbox-startup-benchmark.mjs index 6364fe94d..31d656e19 100644 --- a/tests/smoketests/scripts/devbox-startup-benchmark.mjs +++ b/tests/smoketests/scripts/devbox-startup-benchmark.mjs @@ -8,6 +8,7 @@ */ import fs from 'node:fs/promises'; import path from 'node:path'; +import diagnostics_channel from 'node:diagnostics_channel'; import { createRequire } from 'node:module'; import { performance } from 'node:perf_hooks'; @@ -29,7 +30,7 @@ const COMMAND_TIMEOUT_MS = 2 * 60 * 1000; const SHUTDOWN_TIMEOUT_MS = 2 * 60 * 1000; const apiKey = process.env.RUNLOOP_API_KEY; -const baseURL = process.env.RUNLOOP_BASE_URL; +const baseURL = "https://api.runloop.ai"; const devboxCount = parsePositiveInteger(process.env.RUNLOOP_E2E_DEVBOX_COUNT, DEFAULT_DEVBOX_COUNT); const resultsPath = process.env.RUNLOOP_E2E_RESULTS_PATH ?? defaultResultsPath(); @@ -81,6 +82,7 @@ const hasWorkflowFailures = passResults.some((pass) => pass.workflowFailureCount process.exit(hasWorkflowFailures ? 1 : 0); async function runTransportPass(transport) { + const diagnostics = createUndiciConnectionDiagnostics(); const client = new Runloop({ bearerToken: apiKey, baseURL, @@ -91,10 +93,17 @@ async function runTransportPass(transport) { console.log(`\nStarting ${transport.name} pass with ${devboxCount} concurrent devboxes`); const wallStart = performance.now(); - const settled = await Promise.allSettled( - Array.from({ length: devboxCount }, (_, index) => runDevboxWorkflow(client, transport.name, index)), - ); - const wallTimeMs = performance.now() - wallStart; + diagnostics.start(); + let settled; + let wallTimeMs; + try { + settled = await Promise.allSettled( + Array.from({ length: devboxCount }, (_, index) => runDevboxWorkflow(client, transport.name, index)), + ); + wallTimeMs = performance.now() - wallStart; + } finally { + diagnostics.stop(); + } const records = settled.map((result, index) => { if (result.status === 'fulfilled') return result.value; @@ -115,6 +124,7 @@ async function runTransportPass(transport) { const workflowFailureCount = records.filter((record) => record.error).length; const shutdownFailureCount = records.filter((record) => record.shutdownStatus === 'failed').length; const startupStats = summarizeDurations(records.map((record) => record.startupDurationMs)); + const connectionDiagnostics = diagnostics.summary(); const summary = { transport: transport.name, requested: devboxCount, @@ -123,6 +133,7 @@ async function runTransportPass(transport) { shutdownFailureCount, startup: startupStats, wallTimeMs, + connectionDiagnostics, }; printPassSummary(summary); @@ -136,6 +147,7 @@ async function runTransportPass(transport) { shutdownFailureCount, startupStats, wallTimeMs, + connectionDiagnostics, summary, records, }; @@ -244,8 +256,27 @@ function printPassSummary(summary) { maxMs: round(summary.startup.max), avgMs: round(summary.startup.avg), wallTimeMs: round(summary.wallTimeMs), + undiciConnections: summary.connectionDiagnostics.connectionCount, + alpnH2: summary.connectionDiagnostics.alpnCounts.h2 ?? 0, + alpnHttp1: summary.connectionDiagnostics.alpnCounts['http/1.1'] ?? 0, + h2Fallbacks: summary.connectionDiagnostics.h2FallbackCount, + uniqueLocalPorts: summary.connectionDiagnostics.uniqueLocalPorts.length, }, ]); + + if (summary.connectionDiagnostics.connectionCount > 0) { + console.log(`${summary.transport} undici connection diagnostics`); + console.table([ + { + connections: summary.connectionDiagnostics.connectionCount, + alpn: JSON.stringify(summary.connectionDiagnostics.alpnCounts), + h2: summary.connectionDiagnostics.h2ConnectionCount, + h1Fallbacks: summary.connectionDiagnostics.h2FallbackCount, + uniqueLocalPorts: summary.connectionDiagnostics.uniqueLocalPorts.length, + localPorts: summary.connectionDiagnostics.uniqueLocalPorts.join(', '), + }, + ]); + } } function printComparison(results) { @@ -319,6 +350,56 @@ function defaultResultsPath() { return path.join('tmp', `devbox-startup-benchmark-${timestamp}.json`); } +function createUndiciConnectionDiagnostics() { + const events = []; + const onConnected = (message) => { + const socket = message?.socket; + const rawAlpn = socket?.alpnProtocol; + const alpnProtocol = + typeof rawAlpn === 'string' && rawAlpn.length > 0 ? rawAlpn + : rawAlpn === false ? 'http/1.1' + : 'unknown'; + + events.push({ + alpnProtocol, + localPort: socket?.localPort ?? null, + remoteAddress: socket?.remoteAddress ?? null, + remotePort: socket?.remotePort ?? null, + encrypted: Boolean(socket?.encrypted), + }); + }; + + return { + start() { + diagnostics_channel.subscribe('undici:client:connected', onConnected); + }, + stop() { + diagnostics_channel.unsubscribe('undici:client:connected', onConnected); + }, + summary() { + const alpnCounts = {}; + const localPorts = new Set(); + for (const event of events) { + alpnCounts[event.alpnProtocol] = (alpnCounts[event.alpnProtocol] ?? 0) + 1; + if (event.localPort != null) localPorts.add(event.localPort); + } + + const h2ConnectionCount = alpnCounts.h2 ?? 0; + const http1ConnectionCount = alpnCounts['http/1.1'] ?? 0; + + return { + connectionCount: events.length, + alpnCounts, + h2ConnectionCount, + http1ConnectionCount, + h2FallbackCount: events.length - h2ConnectionCount, + uniqueLocalPorts: [...localPorts].sort((a, b) => a - b), + events, + }; + }, + }; +} + function serializeError(error) { if (!error) return null; return { From 03c03a5e4bd741c3572a20642b50b85b5f86429f Mon Sep 17 00:00:00 2001 From: Tony Deng Date: Fri, 29 May 2026 13:22:52 -0700 Subject: [PATCH 2/5] update --- src/lib/undici-fetch.ts | 358 +++++++++++++++++++++++++++++---- tests/lib/undici-fetch.test.ts | 15 +- 2 files changed, 329 insertions(+), 44 deletions(-) diff --git a/src/lib/undici-fetch.ts b/src/lib/undici-fetch.ts index 04ceed0a9..81ee28bbf 100644 --- a/src/lib/undici-fetch.ts +++ b/src/lib/undici-fetch.ts @@ -1,17 +1,12 @@ /** - * A fetch-compatible adapter backed by undici's HTTP/2 support. + * A fetch-compatible adapter backed by a bounded HTTP/2 transport. * - * undici is the same engine that powers Node's built-in global `fetch`. - * Constructing an `Agent` with `allowH2: true` and passing it as the - * per-request `dispatcher` makes requests negotiate HTTP/2 via ALPN, with - * automatic fallback to HTTP/1.1 when the origin doesn't advertise h2. - * - * This adapter intentionally uses `undici.request` rather than `undici.fetch`. - * In undici 6.x, the fetch path opens too many H2 sessions under high - * concurrency. `request` respects the bounded dispatcher pool, then we wrap its - * output in Undici's WHATWG `Response` so core.ts can keep using standard - * Response members (`.status`, `.ok`, `.headers`, `.text()`, `.json()`, - * `.body`, `.arrayBuffer()`, `.blob()`). + * The HTTPS/H2 path uses node:http2 directly so high-concurrency SDK calls share + * a few TLS sessions with many streams. undici 6.26.0's fetch/request H2 paths + * can still assert-crash under this workload, so undici is kept for the + * WHATWG `Response` implementation and HTTP/1.1 fallback only. The rest of + * core.ts keeps using standard Response members (`.status`, `.ok`, `.headers`, + * `.text()`, `.json()`, `.body`, `.arrayBuffer()`, `.blob()`). * * Unlike the previous got@14 approach, undici is dual CJS/ESM and `require`-able * from this `"type": "commonjs"` package, so there is no dynamic-import hack and @@ -22,22 +17,29 @@ * in src/_shims/node-runtime.ts. */ import { Agent, Headers, Response, request as undiciRequest } from 'undici'; +import diagnosticsChannel from 'node:diagnostics_channel'; +import http2 from 'node:http2'; import { Readable } from 'node:stream'; import { MultipartBody } from '../_shims/MultipartBody'; import { type Fetch } from '../core'; -// One module-scoped dispatcher, reused across requests: this is the HTTP/2 -// transport, with keep-alive. `allowH2` negotiates h2 over TLS via ALPN and -// transparently falls back to HTTP/1.1 when the origin doesn't offer h2. -const h2Dispatcher = new Agent({ - allowH2: true, +const MAX_H2_SESSIONS = 4; +const MAX_H2_STREAMS_PER_SESSION = 64; +const KEEP_ALIVE_TIMEOUT_MS = 10 * 60 * 1000; + +// HTTP/1.1 fallback for non-HTTPS origins and HTTPS origins that don't +// negotiate h2. The H2 path below uses node:http2 directly because undici 6.26.0 +// can assert-crash under high-concurrency H2 request multiplexing. +const h1Dispatcher = new Agent({ + allowH2: false, connections: 4, - maxConcurrentStreams: 64, - pipelining: 64, - keepAliveTimeout: 10 * 60 * 1000, - keepAliveMaxTimeout: 10 * 60 * 1000, + keepAliveTimeout: KEEP_ALIVE_TIMEOUT_MS, + keepAliveMaxTimeout: KEEP_ALIVE_TIMEOUT_MS, }); +const connectedChannel = diagnosticsChannel.channel('undici:client:connected'); +const pools = new Map(); + // Map the body shapes core.ts produces (string | Buffer/ArrayBufferView | // Node Readable for multipart | null) onto a valid undici.request body. function normalizeBody(body: unknown): any { @@ -72,6 +74,291 @@ function statusMustNotHaveBody(status: number): boolean { return status === 204 || status === 205 || status === 304; } +function abortError(): Error { + const error = new Error('The operation was aborted'); + error.name = 'AbortError'; + return error; +} + +function originFor(url: URL): string { + return `${url.protocol}//${url.host}`; +} + +function pathFor(url: URL): string { + return `${url.pathname}${url.search}`; +} + +function filterH2RequestHeaders(headers: Record | undefined): Record { + const filtered: Record = {}; + if (!headers) return filtered; + + for (const [rawName, rawValue] of Object.entries(headers)) { + if (rawValue == null) continue; + const name = rawName.toLowerCase(); + if ( + name === 'connection' || + name === 'keep-alive' || + name === 'proxy-connection' || + name === 'transfer-encoding' || + name === 'upgrade' || + name === 'host' + ) { + continue; + } + filtered[name] = String(rawValue); + } + return filtered; +} + +function toH2ResponseHeaders(headers: http2.IncomingHttpHeaders): Headers { + const responseHeaders = new Headers(); + for (const [name, value] of Object.entries(headers)) { + if (name.startsWith(':') || value === undefined) continue; + if (Array.isArray(value)) { + for (const item of value) responseHeaders.append(name, String(item)); + } else { + responseHeaders.append(name, String(value)); + } + } + return responseHeaders; +} + +function getPool(url: URL): H2Pool { + const origin = originFor(url); + let pool = pools.get(origin); + if (!pool) { + pool = new H2Pool(origin); + pools.set(origin, pool); + } + return pool; +} + +type H2SessionEntry = { + session: http2.ClientHttp2Session; + activeStreams: number; + ready: Promise; + alpnProtocol: string | false | undefined; + closed: boolean; + idleTimer: NodeJS.Timeout | undefined; +}; + +class H2Pool { + private sessions: H2SessionEntry[] = []; + private waiters: Array<(entry: H2SessionEntry) => void> = []; + + constructor(private readonly origin: string) {} + + async request(url: URL, init: any): Promise { + const entry = await this.acquire(); + try { + await entry.ready; + } catch (error) { + this.release(entry); + throw error; + } + + if (entry.alpnProtocol !== 'h2') { + this.release(entry); + entry.session.close(); + throw new Error(`HTTP/2 was not negotiated; ALPN=${String(entry.alpnProtocol || 'none')}`); + } + + return this.dispatch(entry, url, init); + } + + private acquire(): Promise { + const existing = this.sessions.find( + (entry) => !entry.closed && entry.activeStreams < MAX_H2_STREAMS_PER_SESSION, + ); + if (existing) { + if (existing.idleTimer) { + clearTimeout(existing.idleTimer); + existing.idleTimer = undefined; + } + existing.session.ref?.(); + existing.activeStreams++; + return Promise.resolve(existing); + } + + if (this.sessions.filter((entry) => !entry.closed).length < MAX_H2_SESSIONS) { + const created = this.createSession(); + created.session.ref?.(); + created.activeStreams++; + return Promise.resolve(created); + } + + return new Promise((resolve) => { + this.waiters.push(resolve); + }); + } + + private createSession(): H2SessionEntry { + const session = http2.connect(this.origin, { + ALPNProtocols: ['h2', 'http/1.1'], + }); + + const entry: H2SessionEntry = { + session, + activeStreams: 0, + ready: Promise.resolve(), + alpnProtocol: undefined, + closed: false, + idleTimer: undefined, + }; + + entry.ready = new Promise((resolve, reject) => { + session.once('connect', () => { + entry.alpnProtocol = (session.socket as any).alpnProtocol; + connectedChannel.publish({ socket: session.socket }); + resolve(); + }); + session.once('error', reject); + }); + + session.once('close', () => { + entry.closed = true; + if (entry.idleTimer) clearTimeout(entry.idleTimer); + this.sessions = this.sessions.filter((item) => item !== entry); + this.drainWaiters(); + }); + + session.on('error', () => { + entry.closed = true; + }); + + this.sessions.push(entry); + return entry; + } + + private dispatch(entry: H2SessionEntry, url: URL, init: any): Promise { + return new Promise((resolve, reject) => { + if (init.signal?.aborted) { + this.release(entry); + reject(abortError()); + return; + } + + const body = normalizeBody(init.body); + const requestHeaders: http2.OutgoingHttpHeaders = { + ...filterH2RequestHeaders(init.headers), + [http2.constants.HTTP2_HEADER_METHOD]: init.method ?? 'GET', + [http2.constants.HTTP2_HEADER_SCHEME]: url.protocol.slice(0, -1), + [http2.constants.HTTP2_HEADER_AUTHORITY]: url.host, + [http2.constants.HTTP2_HEADER_PATH]: pathFor(url), + }; + + const stream = entry.session.request(requestHeaders, { endStream: body === undefined }); + let settled = false; + let released = false; + + const releaseOnce = () => { + if (released) return; + released = true; + this.release(entry); + }; + + const rejectOnce = (error: Error) => { + if (settled) return; + settled = true; + releaseOnce(); + reject(error); + }; + + const onAbort = () => { + stream.close(http2.constants.NGHTTP2_CANCEL); + rejectOnce(abortError()); + }; + + init.signal?.addEventListener('abort', onAbort, { once: true }); + + stream.once('response', (headers) => { + if (settled) return; + settled = true; + + const status = Number(headers[http2.constants.HTTP2_HEADER_STATUS] ?? 0); + const responseBody = statusMustNotHaveBody(status) || init.method === 'HEAD' ? null : stream; + if (responseBody === null) stream.resume(); + + const response = new Response(responseBody as any, { + status, + headers: toH2ResponseHeaders(headers), + }); + Object.defineProperty(response, 'url', { value: String(url) }); + + stream.once('end', releaseOnce); + stream.once('close', releaseOnce); + stream.once('error', releaseOnce); + resolve(response); + }); + + stream.once('error', (error) => { + rejectOnce(error); + }); + + stream.once('close', () => { + init.signal?.removeEventListener('abort', onAbort); + if (!settled) rejectOnce(new Error('HTTP/2 stream closed before response headers')); + }); + + if (body instanceof Readable) { + body.once('error', (error) => stream.destroy(error)); + body.pipe(stream); + } else if (body !== undefined) { + stream.end(body); + } + }); + } + + private release(entry: H2SessionEntry) { + entry.activeStreams = Math.max(0, entry.activeStreams - 1); + this.drainWaiters(); + + if (entry.activeStreams === 0 && !entry.closed && !entry.idleTimer) { + entry.session.unref?.(); + entry.idleTimer = setTimeout(() => { + entry.session.close(); + }, KEEP_ALIVE_TIMEOUT_MS); + entry.idleTimer.unref?.(); + } + } + + private drainWaiters() { + while (this.waiters.length > 0) { + const entry = this.sessions.find( + (candidate) => !candidate.closed && candidate.activeStreams < MAX_H2_STREAMS_PER_SESSION, + ); + if (!entry) return; + + if (entry.idleTimer) { + clearTimeout(entry.idleTimer); + entry.idleTimer = undefined; + } + entry.session.ref?.(); + entry.activeStreams++; + const resolve = this.waiters.shift(); + resolve?.(entry); + } + } +} + +async function undiciFallbackFetch(url: URL, init: any): Promise { + const result = await undiciRequest(url as any, { + ...init, + body: normalizeBody(init.body), + dispatcher: h1Dispatcher, + }); + + const responseBody = statusMustNotHaveBody(result.statusCode) || init.method === 'HEAD' ? null : result.body; + if (responseBody === null) await result.body.dump(); + + const response = new Response(responseBody, { + status: result.statusCode, + headers: toResponseHeaders(result.headers), + }); + Object.defineProperty(response, 'url', { value: String(url) }); + return response; +} + export const undiciFetch: Fetch = async (url, init) => { // core.ts injects a node-fetch-style `agent` in RequestInit; undici uses a // `dispatcher` instead, so drop `agent`. Drop fetch-only fields that @@ -85,28 +372,25 @@ export const undiciFetch: Fetch = async (url, init) => { ...rest } = (init ?? {}) as any; - const result = await undiciRequest(url as any, { + const requestURL = new URL(String(url)); + const requestInit = { ...rest, - body: normalizeBody(rawBody), + body: rawBody, maxRedirections: redirect === 'manual' || redirect === 'error' ? 0 : 20, // core.ts passes a standard web AbortSignal (from `new AbortController()`), - // which undici accepts directly. signal: signal ?? undefined, - dispatcher: h2Dispatcher, - }); + }; - const responseBody = statusMustNotHaveBody(result.statusCode) ? null : result.body; - if (responseBody === null) await result.body.dump(); - - const response = new Response(responseBody, { - status: result.statusCode, - headers: toResponseHeaders(result.headers), - }); - Object.defineProperty(response, 'url', { value: String(url) }); + if (requestURL.protocol !== 'https:') return undiciFallbackFetch(requestURL, requestInit); - // The SDK is typed against the node-fetch Response, so cast through `any`; - // at runtime core.ts only uses standard Response members. - return response as any; + try { + return (await getPool(requestURL).request(requestURL, requestInit)) as any; + } catch (error) { + if (error instanceof Error && /^HTTP\/2 was not negotiated/.test(error.message)) { + return undiciFallbackFetch(requestURL, requestInit); + } + throw error; + } }; export default undiciFetch; diff --git a/tests/lib/undici-fetch.test.ts b/tests/lib/undici-fetch.test.ts index 97c6885ca..6cb7b5a34 100644 --- a/tests/lib/undici-fetch.test.ts +++ b/tests/lib/undici-fetch.test.ts @@ -42,14 +42,14 @@ describe('undiciFetch', () => { res.writeHead(200, { 'content-type': 'application/json' }); res.end(JSON.stringify({ body })); }); + }); - server.listen(0, '127.0.0.1', () => { - const address = server.address(); - if (address && typeof address === 'object') { - baseURL = `http://127.0.0.1:${address.port}`; - } - resolve(); - }); + server.listen(0, '127.0.0.1', () => { + const address = server.address(); + if (address && typeof address === 'object') { + baseURL = `http://127.0.0.1:${address.port}`; + } + resolve(); }); }), ); @@ -106,6 +106,7 @@ describe('undiciFetch', () => { ['string', 'hello'], ['buffer', Buffer.from('hello')], ['array buffer', new TextEncoder().encode('hello').buffer], + ['data view', new DataView(new TextEncoder().encode('hello').buffer)], ['typed array', new Uint8Array(Buffer.from('hello'))], ['readable', Readable.from(['hello'])], ['multipart body', new MultipartBody(Readable.from(['hello']))], From 6de22fa9ad43a844b417c880c1c4105c7c1f0e44 Mon Sep 17 00:00:00 2001 From: Tony Deng Date: Fri, 29 May 2026 13:29:22 -0700 Subject: [PATCH 3/5] Push health tes --- package.json | 1 + .../scripts/health-endpoint-benchmark.mjs | 343 ++++++++++++++++++ 2 files changed, 344 insertions(+) create mode 100644 tests/smoketests/scripts/health-endpoint-benchmark.mjs diff --git a/package.json b/package.json index 4024abed4..cf151ab2a 100644 --- a/package.json +++ b/package.json @@ -20,6 +20,7 @@ "test": "./scripts/test", "test:smoke": "RUN_SMOKETESTS=1 jest --verbose tests/smoketests", "test:e2e:devbox-startup": "yarn build && node tests/smoketests/scripts/devbox-startup-benchmark.mjs", + "test:e2e:health": "yarn build && node tests/smoketests/scripts/health-endpoint-benchmark.mjs", "test:examples": "RUN_SMOKETESTS=1 jest --verbose tests/smoketests/examples/examples.test.ts", "test:objects": "RUN_SMOKETESTS=1 jest --config jest.config.objects.js --verbose", "test:objects-coverage": "RUN_SMOKETESTS=1 jest --verbose --config jest.config.objects.js --coverage --coverageReporters=text --coverageReporters=json-summary", diff --git a/tests/smoketests/scripts/health-endpoint-benchmark.mjs b/tests/smoketests/scripts/health-endpoint-benchmark.mjs new file mode 100644 index 000000000..951ec46a0 --- /dev/null +++ b/tests/smoketests/scripts/health-endpoint-benchmark.mjs @@ -0,0 +1,343 @@ +/** + * Health endpoint benchmark for comparing the SDK HTTP/1.1 and HTTP/2 transports. + * + * Runs OUTSIDE the normal smoke suite. Build first, then run explicitly: + * + * RUNLOOP_API_KEY=... [RUNLOOP_BASE_URL=...] [RUNLOOP_E2E_HEALTH_REQUEST_COUNT=10000] yarn test:e2e:health + */ +import fs from 'node:fs/promises'; +import path from 'node:path'; +import diagnostics_channel from 'node:diagnostics_channel'; +import { createRequire } from 'node:module'; +import { performance } from 'node:perf_hooks'; + +const require = createRequire(import.meta.url); +const distPath = new URL('../../../dist/index.js', import.meta.url).pathname; + +let Runloop; +try { + ({ Runloop } = require(distPath)); +} catch (error) { + console.error(`Failed to load built SDK from ${distPath}. Run \`yarn build\` first.`); + console.error(`${error?.constructor?.name ?? 'Error'}: ${error?.message ?? error}`); + process.exit(2); +} + +const DEFAULT_REQUEST_COUNT = 10_000; + +const apiKey = process.env.RUNLOOP_API_KEY_DEV; +const baseURL = process.env.RUNLOOP_BASE_URL ?? 'https://api.runloop.pro'; +const requestCount = parsePositiveInteger( + process.env.RUNLOOP_E2E_HEALTH_REQUEST_COUNT ?? process.env.RUNLOOP_E2E_DEVBOX_COUNT, + DEFAULT_REQUEST_COUNT, +); +const resultsPath = process.env.RUNLOOP_E2E_RESULTS_PATH ?? defaultResultsPath(); + +if (!apiKey) { + console.error('RUNLOOP_API_KEY is required'); + process.exit(2); +} + +if (requestCount === undefined) { + console.error('RUNLOOP_E2E_HEALTH_REQUEST_COUNT must be a positive integer'); + process.exit(2); +} + +const transports = [ + { name: 'http1', http2: false }, + { name: 'http2', http2: true }, +]; + +const startedAt = new Date().toISOString(); +const passResults = []; + +for (const transport of transports) { + passResults.push(await runTransportPass(transport)); +} + +const endedAt = new Date().toISOString(); +const artifact = { + benchmark: 'health-endpoint-http1-vs-http2', + startedAt, + endedAt, + config: { + requestCount, + baseURL, + endpoint: '/health', + }, + summary: passResults.map(({ records, summary, ...pass }) => pass), + passes: passResults, +}; + +await fs.mkdir(path.dirname(resultsPath), { recursive: true }); +await fs.writeFile(resultsPath, `${JSON.stringify(artifact, null, 2)}\n`, 'utf8'); + +printComparison(passResults); +console.log(`\nWrote results artifact: ${resultsPath}`); + +const hasFailures = passResults.some((pass) => pass.failureCount > 0); +process.exit(hasFailures ? 1 : 0); + +async function runTransportPass(transport) { + const diagnostics = createUndiciConnectionDiagnostics(); + const client = new Runloop({ + bearerToken: apiKey, + baseURL, + timeout: 30_000, + maxRetries: 0, + http2: transport.http2, + }); + + console.log(`\nStarting ${transport.name} pass with ${requestCount} concurrent health checks`); + const wallStart = performance.now(); + diagnostics.start(); + let settled; + let wallTimeMs; + try { + settled = await Promise.allSettled( + Array.from({ length: requestCount }, (_, index) => pingHealthEndpoint(client, transport.name, index)), + ); + wallTimeMs = performance.now() - wallStart; + } finally { + diagnostics.stop(); + } + + const records = settled.map((result, index) => { + if (result.status === 'fulfilled') return result.value; + return { + transport: transport.name, + index, + healthDurationMs: null, + status: null, + contentType: null, + bodySample: null, + error: serializeError(result.reason), + }; + }); + + const failureCount = records.filter((record) => record.error).length; + const healthStats = summarizeDurations(records.map((record) => record.healthDurationMs)); + const connectionDiagnostics = diagnostics.summary(); + const summary = { + transport: transport.name, + requested: requestCount, + successCount: records.length - failureCount, + failureCount, + health: healthStats, + wallTimeMs, + connectionDiagnostics, + }; + + printPassSummary(summary); + + return { + transport: transport.name, + http2: transport.http2, + requested: requestCount, + successCount: summary.successCount, + failureCount, + healthStats, + wallTimeMs, + connectionDiagnostics, + summary, + records, + }; +} + +async function pingHealthEndpoint(client, transport, index) { + const healthStart = performance.now(); + const record = { + transport, + index, + healthDurationMs: null, + status: null, + contentType: null, + bodySample: null, + error: null, + }; + + try { + const response = await client.get('/health').asResponse(); + record.healthDurationMs = performance.now() - healthStart; + record.status = response.status; + record.contentType = response.headers.get('content-type'); + + const body = await response.text(); + record.bodySample = body.length > 200 ? `${body.slice(0, 200)}...` : body; + } catch (error) { + record.healthDurationMs = performance.now() - healthStart; + record.error = serializeError(error); + } + + return record; +} + +function printPassSummary(summary) { + console.log(`\n${summary.transport} summary`); + console.table([ + { + transport: summary.transport, + requested: summary.requested, + successes: summary.successCount, + failures: summary.failureCount, + minMs: round(summary.health.min), + p50Ms: round(summary.health.p50), + p90Ms: round(summary.health.p90), + p95Ms: round(summary.health.p95), + p99Ms: round(summary.health.p99), + maxMs: round(summary.health.max), + avgMs: round(summary.health.avg), + wallTimeMs: round(summary.wallTimeMs), + undiciConnections: summary.connectionDiagnostics.connectionCount, + alpnH2: summary.connectionDiagnostics.alpnCounts.h2 ?? 0, + alpnHttp1: summary.connectionDiagnostics.alpnCounts['http/1.1'] ?? 0, + h2Fallbacks: summary.connectionDiagnostics.h2FallbackCount, + uniqueLocalPorts: summary.connectionDiagnostics.uniqueLocalPorts.length, + }, + ]); + + if (summary.connectionDiagnostics.connectionCount > 0) { + console.log(`${summary.transport} undici connection diagnostics`); + console.table([ + { + connections: summary.connectionDiagnostics.connectionCount, + alpn: JSON.stringify(summary.connectionDiagnostics.alpnCounts), + h2: summary.connectionDiagnostics.h2ConnectionCount, + h1Fallbacks: summary.connectionDiagnostics.h2FallbackCount, + uniqueLocalPorts: summary.connectionDiagnostics.uniqueLocalPorts.length, + localPorts: summary.connectionDiagnostics.uniqueLocalPorts.join(', '), + }, + ]); + } +} + +function printComparison(results) { + const [http1, http2] = results; + const metrics = [ + ['health p50 ms', http1.healthStats.p50, http2.healthStats.p50], + ['health p90 ms', http1.healthStats.p90, http2.healthStats.p90], + ['health p95 ms', http1.healthStats.p95, http2.healthStats.p95], + ['health p99 ms', http1.healthStats.p99, http2.healthStats.p99], + ['wall time ms', http1.wallTimeMs, http2.wallTimeMs], + ]; + + console.log('\nHTTP/1.1 vs HTTP/2 comparison'); + console.table( + metrics.map(([metric, http1Value, http2Value]) => ({ + metric, + http1: round(http1Value), + http2: round(http2Value), + deltaHttp2MinusHttp1: http1Value == null || http2Value == null ? null : round(http2Value - http1Value), + })), + ); +} + +function summarizeDurations(values) { + const sorted = values.filter((value) => Number.isFinite(value)).sort((a, b) => a - b); + if (sorted.length === 0) { + return { + count: 0, + min: null, + p50: null, + p90: null, + p95: null, + p99: null, + max: null, + avg: null, + }; + } + + const sum = sorted.reduce((total, value) => total + value, 0); + return { + count: sorted.length, + min: sorted[0], + p50: percentile(sorted, 50), + p90: percentile(sorted, 90), + p95: percentile(sorted, 95), + p99: percentile(sorted, 99), + max: sorted[sorted.length - 1], + avg: sum / sorted.length, + }; +} + +function percentile(sortedValues, percentileValue) { + if (sortedValues.length === 0) return null; + const index = Math.ceil((percentileValue / 100) * sortedValues.length) - 1; + return sortedValues[Math.min(sortedValues.length - 1, Math.max(0, index))]; +} + +function parsePositiveInteger(value, fallback) { + if (value === undefined || value === '') return fallback; + if (!/^\d+$/.test(value)) return undefined; + const parsed = Number(value); + return parsed > 0 && Number.isSafeInteger(parsed) ? parsed : undefined; +} + +function defaultResultsPath() { + const timestamp = new Date().toISOString().replace(/[:.]/g, '-'); + return path.join('tmp', `health-endpoint-benchmark-${timestamp}.json`); +} + +function createUndiciConnectionDiagnostics() { + const events = []; + const onConnected = (message) => { + const socket = message?.socket; + const rawAlpn = socket?.alpnProtocol; + const alpnProtocol = + typeof rawAlpn === 'string' && rawAlpn.length > 0 ? rawAlpn + : rawAlpn === false ? 'http/1.1' + : 'unknown'; + + events.push({ + alpnProtocol, + localPort: socket?.localPort ?? null, + remoteAddress: socket?.remoteAddress ?? null, + remotePort: socket?.remotePort ?? null, + encrypted: Boolean(socket?.encrypted), + }); + }; + + return { + start() { + diagnostics_channel.subscribe('undici:client:connected', onConnected); + }, + stop() { + diagnostics_channel.unsubscribe('undici:client:connected', onConnected); + }, + summary() { + const alpnCounts = {}; + const localPorts = new Set(); + for (const event of events) { + alpnCounts[event.alpnProtocol] = (alpnCounts[event.alpnProtocol] ?? 0) + 1; + if (event.localPort != null) localPorts.add(event.localPort); + } + + const h2ConnectionCount = alpnCounts.h2 ?? 0; + const http1ConnectionCount = alpnCounts['http/1.1'] ?? 0; + + return { + connectionCount: events.length, + alpnCounts, + h2ConnectionCount, + http1ConnectionCount, + h2FallbackCount: events.length - h2ConnectionCount, + uniqueLocalPorts: [...localPorts].sort((a, b) => a - b), + events, + }; + }, + }; +} + +function serializeError(error) { + if (!error) return null; + return { + name: error.name ?? error.constructor?.name ?? 'Error', + message: error.message ?? String(error), + status: error.status ?? null, + stack: error.stack ?? null, + }; +} + +function round(value) { + return value == null ? null : Math.round(value); +} From 3f2f5c0176c2bd331e08cee09f8eebb7a2150f56 Mon Sep 17 00:00:00 2001 From: Tony Deng Date: Fri, 29 May 2026 14:03:44 -0700 Subject: [PATCH 4/5] Revert "Push health tes" This reverts commit 6de22fa9ad43a844b417c880c1c4105c7c1f0e44. --- package.json | 1 - .../scripts/health-endpoint-benchmark.mjs | 343 ------------------ 2 files changed, 344 deletions(-) delete mode 100644 tests/smoketests/scripts/health-endpoint-benchmark.mjs diff --git a/package.json b/package.json index cf151ab2a..4024abed4 100644 --- a/package.json +++ b/package.json @@ -20,7 +20,6 @@ "test": "./scripts/test", "test:smoke": "RUN_SMOKETESTS=1 jest --verbose tests/smoketests", "test:e2e:devbox-startup": "yarn build && node tests/smoketests/scripts/devbox-startup-benchmark.mjs", - "test:e2e:health": "yarn build && node tests/smoketests/scripts/health-endpoint-benchmark.mjs", "test:examples": "RUN_SMOKETESTS=1 jest --verbose tests/smoketests/examples/examples.test.ts", "test:objects": "RUN_SMOKETESTS=1 jest --config jest.config.objects.js --verbose", "test:objects-coverage": "RUN_SMOKETESTS=1 jest --verbose --config jest.config.objects.js --coverage --coverageReporters=text --coverageReporters=json-summary", diff --git a/tests/smoketests/scripts/health-endpoint-benchmark.mjs b/tests/smoketests/scripts/health-endpoint-benchmark.mjs deleted file mode 100644 index 951ec46a0..000000000 --- a/tests/smoketests/scripts/health-endpoint-benchmark.mjs +++ /dev/null @@ -1,343 +0,0 @@ -/** - * Health endpoint benchmark for comparing the SDK HTTP/1.1 and HTTP/2 transports. - * - * Runs OUTSIDE the normal smoke suite. Build first, then run explicitly: - * - * RUNLOOP_API_KEY=... [RUNLOOP_BASE_URL=...] [RUNLOOP_E2E_HEALTH_REQUEST_COUNT=10000] yarn test:e2e:health - */ -import fs from 'node:fs/promises'; -import path from 'node:path'; -import diagnostics_channel from 'node:diagnostics_channel'; -import { createRequire } from 'node:module'; -import { performance } from 'node:perf_hooks'; - -const require = createRequire(import.meta.url); -const distPath = new URL('../../../dist/index.js', import.meta.url).pathname; - -let Runloop; -try { - ({ Runloop } = require(distPath)); -} catch (error) { - console.error(`Failed to load built SDK from ${distPath}. Run \`yarn build\` first.`); - console.error(`${error?.constructor?.name ?? 'Error'}: ${error?.message ?? error}`); - process.exit(2); -} - -const DEFAULT_REQUEST_COUNT = 10_000; - -const apiKey = process.env.RUNLOOP_API_KEY_DEV; -const baseURL = process.env.RUNLOOP_BASE_URL ?? 'https://api.runloop.pro'; -const requestCount = parsePositiveInteger( - process.env.RUNLOOP_E2E_HEALTH_REQUEST_COUNT ?? process.env.RUNLOOP_E2E_DEVBOX_COUNT, - DEFAULT_REQUEST_COUNT, -); -const resultsPath = process.env.RUNLOOP_E2E_RESULTS_PATH ?? defaultResultsPath(); - -if (!apiKey) { - console.error('RUNLOOP_API_KEY is required'); - process.exit(2); -} - -if (requestCount === undefined) { - console.error('RUNLOOP_E2E_HEALTH_REQUEST_COUNT must be a positive integer'); - process.exit(2); -} - -const transports = [ - { name: 'http1', http2: false }, - { name: 'http2', http2: true }, -]; - -const startedAt = new Date().toISOString(); -const passResults = []; - -for (const transport of transports) { - passResults.push(await runTransportPass(transport)); -} - -const endedAt = new Date().toISOString(); -const artifact = { - benchmark: 'health-endpoint-http1-vs-http2', - startedAt, - endedAt, - config: { - requestCount, - baseURL, - endpoint: '/health', - }, - summary: passResults.map(({ records, summary, ...pass }) => pass), - passes: passResults, -}; - -await fs.mkdir(path.dirname(resultsPath), { recursive: true }); -await fs.writeFile(resultsPath, `${JSON.stringify(artifact, null, 2)}\n`, 'utf8'); - -printComparison(passResults); -console.log(`\nWrote results artifact: ${resultsPath}`); - -const hasFailures = passResults.some((pass) => pass.failureCount > 0); -process.exit(hasFailures ? 1 : 0); - -async function runTransportPass(transport) { - const diagnostics = createUndiciConnectionDiagnostics(); - const client = new Runloop({ - bearerToken: apiKey, - baseURL, - timeout: 30_000, - maxRetries: 0, - http2: transport.http2, - }); - - console.log(`\nStarting ${transport.name} pass with ${requestCount} concurrent health checks`); - const wallStart = performance.now(); - diagnostics.start(); - let settled; - let wallTimeMs; - try { - settled = await Promise.allSettled( - Array.from({ length: requestCount }, (_, index) => pingHealthEndpoint(client, transport.name, index)), - ); - wallTimeMs = performance.now() - wallStart; - } finally { - diagnostics.stop(); - } - - const records = settled.map((result, index) => { - if (result.status === 'fulfilled') return result.value; - return { - transport: transport.name, - index, - healthDurationMs: null, - status: null, - contentType: null, - bodySample: null, - error: serializeError(result.reason), - }; - }); - - const failureCount = records.filter((record) => record.error).length; - const healthStats = summarizeDurations(records.map((record) => record.healthDurationMs)); - const connectionDiagnostics = diagnostics.summary(); - const summary = { - transport: transport.name, - requested: requestCount, - successCount: records.length - failureCount, - failureCount, - health: healthStats, - wallTimeMs, - connectionDiagnostics, - }; - - printPassSummary(summary); - - return { - transport: transport.name, - http2: transport.http2, - requested: requestCount, - successCount: summary.successCount, - failureCount, - healthStats, - wallTimeMs, - connectionDiagnostics, - summary, - records, - }; -} - -async function pingHealthEndpoint(client, transport, index) { - const healthStart = performance.now(); - const record = { - transport, - index, - healthDurationMs: null, - status: null, - contentType: null, - bodySample: null, - error: null, - }; - - try { - const response = await client.get('/health').asResponse(); - record.healthDurationMs = performance.now() - healthStart; - record.status = response.status; - record.contentType = response.headers.get('content-type'); - - const body = await response.text(); - record.bodySample = body.length > 200 ? `${body.slice(0, 200)}...` : body; - } catch (error) { - record.healthDurationMs = performance.now() - healthStart; - record.error = serializeError(error); - } - - return record; -} - -function printPassSummary(summary) { - console.log(`\n${summary.transport} summary`); - console.table([ - { - transport: summary.transport, - requested: summary.requested, - successes: summary.successCount, - failures: summary.failureCount, - minMs: round(summary.health.min), - p50Ms: round(summary.health.p50), - p90Ms: round(summary.health.p90), - p95Ms: round(summary.health.p95), - p99Ms: round(summary.health.p99), - maxMs: round(summary.health.max), - avgMs: round(summary.health.avg), - wallTimeMs: round(summary.wallTimeMs), - undiciConnections: summary.connectionDiagnostics.connectionCount, - alpnH2: summary.connectionDiagnostics.alpnCounts.h2 ?? 0, - alpnHttp1: summary.connectionDiagnostics.alpnCounts['http/1.1'] ?? 0, - h2Fallbacks: summary.connectionDiagnostics.h2FallbackCount, - uniqueLocalPorts: summary.connectionDiagnostics.uniqueLocalPorts.length, - }, - ]); - - if (summary.connectionDiagnostics.connectionCount > 0) { - console.log(`${summary.transport} undici connection diagnostics`); - console.table([ - { - connections: summary.connectionDiagnostics.connectionCount, - alpn: JSON.stringify(summary.connectionDiagnostics.alpnCounts), - h2: summary.connectionDiagnostics.h2ConnectionCount, - h1Fallbacks: summary.connectionDiagnostics.h2FallbackCount, - uniqueLocalPorts: summary.connectionDiagnostics.uniqueLocalPorts.length, - localPorts: summary.connectionDiagnostics.uniqueLocalPorts.join(', '), - }, - ]); - } -} - -function printComparison(results) { - const [http1, http2] = results; - const metrics = [ - ['health p50 ms', http1.healthStats.p50, http2.healthStats.p50], - ['health p90 ms', http1.healthStats.p90, http2.healthStats.p90], - ['health p95 ms', http1.healthStats.p95, http2.healthStats.p95], - ['health p99 ms', http1.healthStats.p99, http2.healthStats.p99], - ['wall time ms', http1.wallTimeMs, http2.wallTimeMs], - ]; - - console.log('\nHTTP/1.1 vs HTTP/2 comparison'); - console.table( - metrics.map(([metric, http1Value, http2Value]) => ({ - metric, - http1: round(http1Value), - http2: round(http2Value), - deltaHttp2MinusHttp1: http1Value == null || http2Value == null ? null : round(http2Value - http1Value), - })), - ); -} - -function summarizeDurations(values) { - const sorted = values.filter((value) => Number.isFinite(value)).sort((a, b) => a - b); - if (sorted.length === 0) { - return { - count: 0, - min: null, - p50: null, - p90: null, - p95: null, - p99: null, - max: null, - avg: null, - }; - } - - const sum = sorted.reduce((total, value) => total + value, 0); - return { - count: sorted.length, - min: sorted[0], - p50: percentile(sorted, 50), - p90: percentile(sorted, 90), - p95: percentile(sorted, 95), - p99: percentile(sorted, 99), - max: sorted[sorted.length - 1], - avg: sum / sorted.length, - }; -} - -function percentile(sortedValues, percentileValue) { - if (sortedValues.length === 0) return null; - const index = Math.ceil((percentileValue / 100) * sortedValues.length) - 1; - return sortedValues[Math.min(sortedValues.length - 1, Math.max(0, index))]; -} - -function parsePositiveInteger(value, fallback) { - if (value === undefined || value === '') return fallback; - if (!/^\d+$/.test(value)) return undefined; - const parsed = Number(value); - return parsed > 0 && Number.isSafeInteger(parsed) ? parsed : undefined; -} - -function defaultResultsPath() { - const timestamp = new Date().toISOString().replace(/[:.]/g, '-'); - return path.join('tmp', `health-endpoint-benchmark-${timestamp}.json`); -} - -function createUndiciConnectionDiagnostics() { - const events = []; - const onConnected = (message) => { - const socket = message?.socket; - const rawAlpn = socket?.alpnProtocol; - const alpnProtocol = - typeof rawAlpn === 'string' && rawAlpn.length > 0 ? rawAlpn - : rawAlpn === false ? 'http/1.1' - : 'unknown'; - - events.push({ - alpnProtocol, - localPort: socket?.localPort ?? null, - remoteAddress: socket?.remoteAddress ?? null, - remotePort: socket?.remotePort ?? null, - encrypted: Boolean(socket?.encrypted), - }); - }; - - return { - start() { - diagnostics_channel.subscribe('undici:client:connected', onConnected); - }, - stop() { - diagnostics_channel.unsubscribe('undici:client:connected', onConnected); - }, - summary() { - const alpnCounts = {}; - const localPorts = new Set(); - for (const event of events) { - alpnCounts[event.alpnProtocol] = (alpnCounts[event.alpnProtocol] ?? 0) + 1; - if (event.localPort != null) localPorts.add(event.localPort); - } - - const h2ConnectionCount = alpnCounts.h2 ?? 0; - const http1ConnectionCount = alpnCounts['http/1.1'] ?? 0; - - return { - connectionCount: events.length, - alpnCounts, - h2ConnectionCount, - http1ConnectionCount, - h2FallbackCount: events.length - h2ConnectionCount, - uniqueLocalPorts: [...localPorts].sort((a, b) => a - b), - events, - }; - }, - }; -} - -function serializeError(error) { - if (!error) return null; - return { - name: error.name ?? error.constructor?.name ?? 'Error', - message: error.message ?? String(error), - status: error.status ?? null, - stack: error.stack ?? null, - }; -} - -function round(value) { - return value == null ? null : Math.round(value); -} From 67b980eda4e01a73f4d2d3ee75d981fb97edbb10 Mon Sep 17 00:00:00 2001 From: Tony Deng Date: Fri, 29 May 2026 14:15:00 -0700 Subject: [PATCH 5/5] Format undici pool fallback --- src/lib/undici-fetch.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/lib/undici-fetch.ts b/src/lib/undici-fetch.ts index 81ee28bbf..21d46793f 100644 --- a/src/lib/undici-fetch.ts +++ b/src/lib/undici-fetch.ts @@ -348,7 +348,8 @@ async function undiciFallbackFetch(url: URL, init: any): Promise { dispatcher: h1Dispatcher, }); - const responseBody = statusMustNotHaveBody(result.statusCode) || init.method === 'HEAD' ? null : result.body; + const responseBody = + statusMustNotHaveBody(result.statusCode) || init.method === 'HEAD' ? null : result.body; if (responseBody === null) await result.body.dump(); const response = new Response(responseBody, {