diff --git a/src/lib/undici-fetch.ts b/src/lib/undici-fetch.ts index db112b416..21d46793f 100644 --- a/src/lib/undici-fetch.ts +++ b/src/lib/undici-fetch.ts @@ -1,13 +1,12 @@ /** - * A fetch-compatible adapter backed by undici's HTTP/2 support. + * A fetch-compatible adapter backed by a bounded HTTP/2 transport. * - * undici is the same engine that powers Node's built-in global `fetch`. - * Constructing an `Agent` with `allowH2: true` and passing it as the - * per-request `dispatcher` makes requests negotiate HTTP/2 via ALPN, with - * automatic fallback to HTTP/1.1 when the origin doesn't advertise h2. undici - * returns a standard WHATWG `Response`, so the rest of core.ts — which only - * touches standard Response members (`.status`, `.ok`, `.headers`, `.text()`, - * `.json()`, `.body`, `.arrayBuffer()`, `.blob()`) — is unchanged. + * The HTTPS/H2 path uses node:http2 directly so high-concurrency SDK calls share + * a few TLS sessions with many streams. undici 6.26.0's fetch/request H2 paths + * can still assert-crash under this workload, so undici is kept for the + * WHATWG `Response` implementation and HTTP/1.1 fallback only. The rest of + * core.ts keeps using standard Response members (`.status`, `.ok`, `.headers`, + * `.text()`, `.json()`, `.body`, `.arrayBuffer()`, `.blob()`). * * Unlike the previous got@14 approach, undici is dual CJS/ESM and `require`-able * from this `"type": "commonjs"` package, so there is no dynamic-import hack and @@ -17,65 +16,382 @@ * regeneration; the only generated file touched is the one-line wiring change * in src/_shims/node-runtime.ts. 
*/ -import { Agent, fetch as undiciFetchImpl } from 'undici'; +import { Agent, Headers, Response, request as undiciRequest } from 'undici'; +import diagnosticsChannel from 'node:diagnostics_channel'; +import http2 from 'node:http2'; import { Readable } from 'node:stream'; import { MultipartBody } from '../_shims/MultipartBody'; import { type Fetch } from '../core'; -// One module-scoped dispatcher, reused across requests: this is the HTTP/2 -// transport, with keep-alive. `allowH2` negotiates h2 over TLS via ALPN and -// transparently falls back to HTTP/1.1 when the origin doesn't offer h2. -const h2Dispatcher = new Agent({ - allowH2: true, - keepAliveTimeout: 10 * 60 * 1000, - keepAliveMaxTimeout: 10 * 60 * 1000, +const MAX_H2_SESSIONS = 4; +const MAX_H2_STREAMS_PER_SESSION = 64; +const KEEP_ALIVE_TIMEOUT_MS = 10 * 60 * 1000; + +// HTTP/1.1 fallback for non-HTTPS origins and HTTPS origins that don't +// negotiate h2. The H2 path below uses node:http2 directly because undici 6.26.0 +// can assert-crash under high-concurrency H2 request multiplexing. +const h1Dispatcher = new Agent({ + allowH2: false, + connections: 4, + keepAliveTimeout: KEEP_ALIVE_TIMEOUT_MS, + keepAliveMaxTimeout: KEEP_ALIVE_TIMEOUT_MS, }); -type NormalizedBody = { body: any; isStream: boolean }; +const connectedChannel = diagnosticsChannel.channel('undici:client:connected'); +const pools = new Map(); // Map the body shapes core.ts produces (string | Buffer/ArrayBufferView | -// Node Readable for multipart | null) onto a valid undici BodyInit. A Node -// Readable must become a Web ReadableStream and requires `duplex: 'half'`. -function normalizeBody(body: unknown): NormalizedBody { - if (body == null) return { body: undefined, isStream: false }; - if (typeof body === 'string') return { body, isStream: false }; - if (Buffer.isBuffer(body)) return { body, isStream: false }; +// Node Readable for multipart | null) onto a valid undici.request body. 
/**
 * Maps the request-body shapes core.ts produces (string, Buffer,
 * ArrayBufferView, ArrayBuffer, Node Readable, MultipartBody, null/undefined)
 * onto a value that both `undici.request` and an HTTP/2 stream can send.
 *
 * @param body Raw body taken from the fetch-style init object.
 * @returns `undefined` when there is no body, otherwise the sendable body.
 */
function normalizeBody(body: unknown): any {
  if (body == null) return undefined;
  if (typeof body === 'string') return body;
  if (Buffer.isBuffer(body)) return body;
  // Unwrap MultipartBody (wraps a Readable in `.body`). core.ts already unwraps
  // it, but handle it defensively.
  if (body instanceof MultipartBody) return normalizeBody((body as MultipartBody).body);
  if (body instanceof Readable) return body;
  // ArrayBufferView (Uint8Array, DataView, typed arrays) is sendable as-is;
  // a bare ArrayBuffer needs a Buffer wrap first.
  if (ArrayBuffer.isView(body)) return body;
  if (body instanceof ArrayBuffer) return Buffer.from(body);
  return String(body);
}

/**
 * Shared implementation behind the two response-header converters: copies a
 * Node-style header bag into a WHATWG `Headers`, appending each item of an
 * array value separately and skipping `undefined` values.
 *
 * @param headers Header bag keyed by header name.
 * @param skipPseudoHeaders When true, names starting with `:` are dropped.
 */
function buildResponseHeaders(headers: Record<string, unknown>, skipPseudoHeaders: boolean): Headers {
  const responseHeaders = new Headers();
  for (const [name, value] of Object.entries(headers)) {
    if (value === undefined) continue;
    if (skipPseudoHeaders && name.startsWith(':')) continue;
    for (const item of Array.isArray(value) ? value : [value]) {
      responseHeaders.append(name, String(item));
    }
  }
  return responseHeaders;
}

/** Converts the header bag returned by `undici.request` into a `Headers` object. */
function toResponseHeaders(headers: Record<string, string | string[] | undefined>): Headers {
  return buildResponseHeaders(headers, false);
}

/** True for the statuses this adapter always builds with a `null` Response body. */
function statusMustNotHaveBody(status: number): boolean {
  return status === 204 || status === 205 || status === 304;
}

/** Builds the error used to reject a request whose AbortSignal fired. */
function abortError(): Error {
  const error = new Error('The operation was aborted');
  error.name = 'AbortError';
  return error;
}

/** Returns `protocol//host` for the URL; this is the key used for the pool map. */
function originFor(url: URL): string {
  return `${url.protocol}//${url.host}`;
}

/** Returns the pathname plus query string, i.e. the value sent as `:path`. */
function pathFor(url: URL): string {
  return `${url.pathname}${url.search}`;
}

// Connection-specific fields that must not be sent on an HTTP/2 request
// (node:http2 throws ERR_HTTP2_INVALID_CONNECTION_HEADERS for them), plus
// `host`, which is replaced by the `:authority` pseudo-header.
const H2_FORBIDDEN_REQUEST_HEADERS = new Set([
  'connection',
  'keep-alive',
  'proxy-connection',
  'transfer-encoding',
  'upgrade',
  'http2-settings',
  'host',
]);

/**
 * Produces the regular (non-pseudo) header map for an HTTP/2 request.
 *
 * Names are lower-cased, null/undefined values are skipped, values are
 * stringified, and connection-specific headers are removed. `te` is only kept
 * when its value is `trailers`, the single value HTTP/2 permits.
 *
 * @param headers A plain object, or any iterable of `[name, value]` pairs
 *   (a `Headers` instance, a `Map`, an array of tuples). When the same name
 *   occurs more than once the last value wins.
 * @returns A new plain object; an empty one when `headers` is undefined.
 */
function filterH2RequestHeaders(
  headers: Record<string, unknown> | Iterable<[string, unknown]> | undefined,
): Record<string, string> {
  const filtered: Record<string, string> = {};
  if (!headers) return filtered;

  // Object.entries() on a Headers instance yields nothing, so iterate
  // iterables directly instead of treating them as plain objects.
  const entries: Iterable<[string, unknown]> =
    typeof (headers as any)[Symbol.iterator] === 'function' ?
      (headers as Iterable<[string, unknown]>)
    : Object.entries(headers as Record<string, unknown>);

  for (const [rawName, rawValue] of entries) {
    if (rawValue == null) continue;
    const name = String(rawName).toLowerCase();
    if (H2_FORBIDDEN_REQUEST_HEADERS.has(name)) continue;

    const value = String(rawValue);
    if (name === 'te') {
      // node:http2 compares against the exact string 'trailers'.
      if (value.trim().toLowerCase() === 'trailers') filtered[name] = 'trailers';
      continue;
    }
    filtered[name] = value;
  }
  return filtered;
}

/** Converts HTTP/2 response headers into a `Headers` object, dropping pseudo-headers. */
function toH2ResponseHeaders(headers: http2.IncomingHttpHeaders): Headers {
  return buildResponseHeaders(headers, true);
}

/** Returns the pool for the URL's origin, creating and registering it on first use. */
function getPool(url: URL): H2Pool {
  const origin = originFor(url);
  let pool = pools.get(origin);
  if (!pool) {
    pool = new H2Pool(origin);
    pools.set(origin, pool);
  }
  return pool;
}

/** Book-keeping for one pooled HTTP/2 session. */
type H2SessionEntry = {
  session: http2.ClientHttp2Session;
  /** Number of stream slots currently reserved on this session. */
  activeStreams: number;
  /** Settles once the session has connected or failed to connect. */
  ready: Promise<void>;
  /** ALPN result read from the socket on `connect`; undefined until then. */
  alpnProtocol: string | false | undefined;
  /** Set once the session must not be given any new streams. */
  closed: boolean;
  /** Pending idle-close timer, if the session currently has no streams. */
  idleTimer: NodeJS.Timeout | undefined;
};

// How long an origin that negotiated something other than h2 is sent straight
// to the HTTP/1.1 fallback before HTTP/2 is attempted again.
const H2_RENEGOTIATE_AFTER_MS = 60 * 1000;

/** Builds the error `undiciFetch` recognizes as "fall back to HTTP/1.1". */
function h2NotNegotiatedError(alpnProtocol: string | false | undefined): Error {
  return new Error(`HTTP/2 was not negotiated; ALPN=${String(alpnProtocol || 'none')}`);
}

/**
 * A bounded pool of HTTP/2 sessions for a single origin: at most
 * MAX_H2_SESSIONS sessions with MAX_H2_STREAMS_PER_SESSION reserved streams
 * each. Requests beyond that capacity queue until a slot frees up.
 */
class H2Pool {
  private sessions: H2SessionEntry[] = [];
  private waiters: Array<(entry: H2SessionEntry) => void> = [];
  // Negative ALPN cache, so an HTTP/1.1-only origin does not cost a wasted
  // TLS handshake on every request.
  private h2UnavailableUntil = 0;
  private lastAlpnProtocol: string | false | undefined = undefined;

  constructor(private readonly origin: string) {}

  /**
   * Sends one request over a pooled session.
   *
   * @throws An Error whose message starts with `HTTP/2 was not negotiated`
   *   when the origin did not select h2; the caller uses it to fall back.
   */
  async request(url: URL, init: any): Promise<Response> {
    if (Date.now() < this.h2UnavailableUntil) throw h2NotNegotiatedError(this.lastAlpnProtocol);

    const entry = await this.acquire();
    try {
      await entry.ready;
    } catch (error) {
      this.release(entry);
      throw error;
    }

    if (entry.alpnProtocol !== 'h2') {
      this.lastAlpnProtocol = entry.alpnProtocol;
      this.h2UnavailableUntil = Date.now() + H2_RENEGOTIATE_AFTER_MS;
      entry.closed = true;
      this.release(entry);
      entry.session.close();
      throw h2NotNegotiatedError(entry.alpnProtocol);
    }

    return this.dispatch(entry, url, init);
  }

  /** Whether the entry may still be given new streams. */
  private isUsable(entry: H2SessionEntry): boolean {
    return !entry.closed && !entry.session.closed && !entry.session.destroyed;
  }

  /**
   * Reserves one stream slot, reusing a session with spare capacity or
   * opening a new one while under the session cap.
   *
   * @returns The reserved entry, or undefined when the pool is saturated.
   */
  private tryReserve(): H2SessionEntry | undefined {
    let entry = this.sessions.find(
      (candidate) => this.isUsable(candidate) && candidate.activeStreams < MAX_H2_STREAMS_PER_SESSION,
    );
    if (!entry) {
      const usableCount = this.sessions.filter((candidate) => this.isUsable(candidate)).length;
      if (usableCount >= MAX_H2_SESSIONS) return undefined;
      entry = this.createSession();
    }

    if (entry.idleTimer) {
      clearTimeout(entry.idleTimer);
      entry.idleTimer = undefined;
    }
    entry.session.ref?.();
    entry.activeStreams++;
    return entry;
  }

  /** Resolves with a reserved entry, queueing while the pool is saturated. */
  private acquire(): Promise<H2SessionEntry> {
    const entry = this.tryReserve();
    if (entry) return Promise.resolve(entry);

    return new Promise((resolve) => {
      this.waiters.push(resolve);
    });
  }

  /** Opens a new session, wires its lifecycle events, and registers it. */
  private createSession(): H2SessionEntry {
    const session = http2.connect(this.origin, {
      ALPNProtocols: ['h2', 'http/1.1'],
    });

    const entry: H2SessionEntry = {
      session,
      activeStreams: 0,
      ready: Promise.resolve(),
      alpnProtocol: undefined,
      closed: false,
      idleTimer: undefined,
    };

    entry.ready = new Promise<void>((resolve, reject) => {
      session.once('connect', () => {
        entry.alpnProtocol = (session.socket as any).alpnProtocol;
        connectedChannel.publish({ socket: session.socket });
        resolve();
      });
      session.once('error', reject);
      // A session can close before connecting without emitting 'error';
      // without this the promise would never settle. It is a no-op once
      // the promise has already settled.
      session.once('close', () => reject(new Error('HTTP/2 session closed before it connected')));
    });
    // request() attaches its own handler; this one only prevents an
    // unhandled-rejection report if the session fails while nobody is awaiting.
    entry.ready.catch(() => {});

    session.once('close', () => {
      entry.closed = true;
      if (entry.idleTimer) clearTimeout(entry.idleTimer);
      this.sessions = this.sessions.filter((item) => item !== entry);
      this.drainWaiters();
    });

    session.on('error', () => {
      entry.closed = true;
    });

    // After GOAWAY the session refuses new streams even though 'close' only
    // fires once in-flight streams finish, so retire it from the pool now.
    session.once('goaway', () => {
      entry.closed = true;
      this.drainWaiters();
    });

    this.sessions.push(entry);
    return entry;
  }

  /**
   * Opens a stream on the reserved entry and resolves with a `Response` as
   * soon as response headers arrive. The slot is released exactly once, when
   * the stream ends, closes, or errors, or when the request is rejected.
   */
  private dispatch(entry: H2SessionEntry, url: URL, init: any): Promise<Response> {
    return new Promise<Response>((resolve, reject) => {
      if (init.signal?.aborted) {
        this.release(entry);
        reject(abortError());
        return;
      }

      let body: any;
      let stream: http2.ClientHttp2Stream;
      try {
        body = normalizeBody(init.body);
        const requestHeaders: http2.OutgoingHttpHeaders = {
          ...filterH2RequestHeaders(init.headers),
          [http2.constants.HTTP2_HEADER_METHOD]: init.method ?? 'GET',
          [http2.constants.HTTP2_HEADER_SCHEME]: url.protocol.slice(0, -1),
          [http2.constants.HTTP2_HEADER_AUTHORITY]: url.host,
          [http2.constants.HTTP2_HEADER_PATH]: pathFor(url),
        };
        stream = entry.session.request(requestHeaders, { endStream: body === undefined });
      } catch (error) {
        // session.request() throws synchronously on a closed or destroyed
        // session or on invalid headers; give the slot back before failing.
        this.release(entry);
        reject(error);
        return;
      }

      let settled = false;
      let released = false;

      const releaseOnce = () => {
        if (released) return;
        released = true;
        this.release(entry);
      };

      const rejectOnce = (error: unknown) => {
        if (settled) return;
        settled = true;
        releaseOnce();
        reject(error);
      };

      const onAbort = () => {
        stream.close(http2.constants.NGHTTP2_CANCEL);
        rejectOnce(abortError());
      };

      init.signal?.addEventListener('abort', onAbort, { once: true });

      stream.once('response', (headers) => {
        if (settled) return;

        let response: Response;
        try {
          const status = Number(headers[http2.constants.HTTP2_HEADER_STATUS] ?? 0);
          const responseBody = statusMustNotHaveBody(status) || init.method === 'HEAD' ? null : stream;
          if (responseBody === null) stream.resume();

          response = new Response(responseBody as any, {
            status,
            headers: toH2ResponseHeaders(headers),
          });
          Object.defineProperty(response, 'url', { value: String(url) });
        } catch (error) {
          // The Response constructor rejects statuses outside 200-599; a
          // throw inside this listener would otherwise be an uncaught exception.
          stream.close(http2.constants.NGHTTP2_CANCEL);
          rejectOnce(error);
          return;
        }

        settled = true;
        stream.once('end', releaseOnce);
        stream.once('close', releaseOnce);
        stream.once('error', releaseOnce);
        resolve(response);
      });

      stream.once('error', (error) => {
        rejectOnce(error);
      });

      stream.once('close', () => {
        init.signal?.removeEventListener('abort', onAbort);
        if (!settled) rejectOnce(new Error('HTTP/2 stream closed before response headers'));
      });

      if (body instanceof Readable) {
        body.once('error', (error) => stream.destroy(error));
        body.pipe(stream);
      } else if (body !== undefined) {
        stream.end(body);
      }
    });
  }

  /**
   * Returns one stream slot, hands freed capacity to queued requests, and
   * arms the idle-close timer when the session is left with no streams.
   */
  private release(entry: H2SessionEntry): void {
    entry.activeStreams = Math.max(0, entry.activeStreams - 1);
    this.drainWaiters();

    if (entry.activeStreams === 0 && this.isUsable(entry) && !entry.idleTimer) {
      entry.session.unref?.();
      entry.idleTimer = setTimeout(() => {
        entry.idleTimer = undefined;
        // Retire the entry before closing so it cannot be handed out
        // between close() and the 'close' event.
        entry.closed = true;
        entry.session.close();
      }, KEEP_ALIVE_TIMEOUT_MS);
      entry.idleTimer.unref?.();
    }
  }

  /**
   * Serves queued requests for as long as a slot can be reserved. Because
   * reservation may open a new session, waiters are not stranded when every
   * existing session has closed.
   */
  private drainWaiters(): void {
    while (this.waiters.length > 0) {
      const entry = this.tryReserve();
      if (!entry) return;
      this.waiters.shift()?.(entry);
    }
  }
}

/**
 * HTTP/1.1 path: performs the request with `undici.request` on the shared
 * `h1Dispatcher` and wraps the result in a `Response`.
 */
async function undiciFallbackFetch(url: URL, init: any): Promise<Response> {
  const result = await undiciRequest(url as any, {
    ...init,
    body: normalizeBody(init.body),
    dispatcher: h1Dispatcher,
  });

  const responseBody =
    statusMustNotHaveBody(result.statusCode) || init.method === 'HEAD' ? null : result.body;
  // Drain the unused body before handing back a body-less Response.
  if (responseBody === null) await result.body.dump();

  const response = new Response(responseBody, {
    status: result.statusCode,
    headers: toResponseHeaders(result.headers),
  });
  Object.defineProperty(response, 'url', { value: String(url) });
  return response;
}

/**
 * fetch-compatible entry point. Non-HTTPS URLs go straight to the HTTP/1.1
 * fallback; HTTPS URLs use the per-origin HTTP/2 pool and fall back to
 * HTTP/1.1 only when the origin does not negotiate h2.
 */
export const undiciFetch: Fetch = async (url, init) => {
  // core.ts injects a node-fetch-style `agent` in RequestInit; undici uses a
  // `dispatcher` instead, so drop `agent`. Drop fetch-only fields that
  // undici.request doesn't understand, and map redirects onto maxRedirections.
  const {
    agent: _ignoredAgent,
    body: rawBody,
    duplex: _ignoredDuplex,
    redirect,
    signal,
    ...rest
  } = (init ?? {}) as any;

  const requestURL = new URL(String(url));
  const requestInit = {
    ...rest,
    body: rawBody,
    // Only the undici.request fallback reads maxRedirections; the node:http2
    // path does not follow redirects.
    maxRedirections: redirect === 'manual' || redirect === 'error' ? 0 : 20,
    // core.ts passes a standard web AbortSignal (from `new AbortController()`).
    signal: signal ?? undefined,
  };

  if (requestURL.protocol !== 'https:') return undiciFallbackFetch(requestURL, requestInit);

  try {
    return (await getPool(requestURL).request(requestURL, requestInit)) as any;
  } catch (error) {
    if (error instanceof Error && /^HTTP\/2 was not negotiated/.test(error.message)) {
      return undiciFallbackFetch(requestURL, requestInit);
    }
    throw error;
  }
};

export default undiciFetch;
req.on('data', (chunk) => chunks.push(Buffer.from(chunk))); + req.on('end', () => { + const body = Buffer.concat(chunks).toString('utf8'); + requests.push({ method: req.method ?? '', url: req.url ?? '', body }); + + if (req.url === '/json') { + res.writeHead(200, { 'content-type': 'application/json' }); + res.end(JSON.stringify({ ok: true })); + return; + } + + if (req.url === '/error') { + res.writeHead(401, { 'content-type': 'application/json' }); + res.end(JSON.stringify({ error: { message: 'invalid token' } })); + return; + } + + if (req.url === '/sse') { + res.writeHead(200, { 'content-type': 'text/event-stream' }); + res.write('data: {"value":1}\n\n'); + res.end(); + return; + } + + res.writeHead(200, { 'content-type': 'application/json' }); + res.end(JSON.stringify({ body })); + }); + }); + + server.listen(0, '127.0.0.1', () => { + const address = server.address(); + if (address && typeof address === 'object') { + baseURL = `http://127.0.0.1:${address.port}`; + } + resolve(); + }); + }), + ); + + afterAll( + () => + new Promise((resolve, reject) => { + server.close((error) => (error ? 
reject(error) : resolve())); + }), + ); + + beforeEach(() => { + requests.length = 0; + }); + + test('parses a JSON response through SDK core', async () => { + const client = new Runloop({ + bearerToken: 'test-token', + baseURL, + maxRetries: 0, + http2: true, + }); + + await expect(client.get('/json')).resolves.toEqual({ ok: true }); + }); + + test('rejects error responses with a readable body through SDK core', async () => { + const client = new Runloop({ + bearerToken: 'test-token', + baseURL, + maxRetries: 0, + http2: true, + }); + + await expect(client.get('/error')).rejects.toMatchObject({ + status: 401, + message: expect.stringMatching(/invalid token/i), + }); + }); + + test('keeps streamed SSE responses consumable', async () => { + const response = await undiciFetch(`${baseURL}/sse`); + const stream = Stream.fromSSEResponse<{ value: number }>(response as any, new AbortController()); + + const events = []; + for await (const event of stream) { + events.push(event); + } + + expect(events).toEqual([{ value: 1 }]); + }); + + test.each([ + ['string', 'hello'], + ['buffer', Buffer.from('hello')], + ['array buffer', new TextEncoder().encode('hello').buffer], + ['data view', new DataView(new TextEncoder().encode('hello').buffer)], + ['typed array', new Uint8Array(Buffer.from('hello'))], + ['readable', Readable.from(['hello'])], + ['multipart body', new MultipartBody(Readable.from(['hello']))], + ])('normalizes %s request bodies', async (_label, body) => { + const response = await undiciFetch(`${baseURL}/echo`, { + method: 'POST', + body: body as any, + agent: { shouldBeIgnored: true } as any, + } as any); + + await expect(response.json()).resolves.toEqual({ body: 'hello' }); + }); +}); diff --git a/tests/smoketests/scripts/devbox-startup-benchmark.mjs b/tests/smoketests/scripts/devbox-startup-benchmark.mjs index 6364fe94d..31d656e19 100644 --- a/tests/smoketests/scripts/devbox-startup-benchmark.mjs +++ b/tests/smoketests/scripts/devbox-startup-benchmark.mjs @@ -8,6 
+8,7 @@ */ import fs from 'node:fs/promises'; import path from 'node:path'; +import diagnostics_channel from 'node:diagnostics_channel'; import { createRequire } from 'node:module'; import { performance } from 'node:perf_hooks'; @@ -29,7 +30,7 @@ const COMMAND_TIMEOUT_MS = 2 * 60 * 1000; const SHUTDOWN_TIMEOUT_MS = 2 * 60 * 1000; const apiKey = process.env.RUNLOOP_API_KEY; -const baseURL = process.env.RUNLOOP_BASE_URL; +const baseURL = "https://api.runloop.ai"; const devboxCount = parsePositiveInteger(process.env.RUNLOOP_E2E_DEVBOX_COUNT, DEFAULT_DEVBOX_COUNT); const resultsPath = process.env.RUNLOOP_E2E_RESULTS_PATH ?? defaultResultsPath(); @@ -81,6 +82,7 @@ const hasWorkflowFailures = passResults.some((pass) => pass.workflowFailureCount process.exit(hasWorkflowFailures ? 1 : 0); async function runTransportPass(transport) { + const diagnostics = createUndiciConnectionDiagnostics(); const client = new Runloop({ bearerToken: apiKey, baseURL, @@ -91,10 +93,17 @@ async function runTransportPass(transport) { console.log(`\nStarting ${transport.name} pass with ${devboxCount} concurrent devboxes`); const wallStart = performance.now(); - const settled = await Promise.allSettled( - Array.from({ length: devboxCount }, (_, index) => runDevboxWorkflow(client, transport.name, index)), - ); - const wallTimeMs = performance.now() - wallStart; + diagnostics.start(); + let settled; + let wallTimeMs; + try { + settled = await Promise.allSettled( + Array.from({ length: devboxCount }, (_, index) => runDevboxWorkflow(client, transport.name, index)), + ); + wallTimeMs = performance.now() - wallStart; + } finally { + diagnostics.stop(); + } const records = settled.map((result, index) => { if (result.status === 'fulfilled') return result.value; @@ -115,6 +124,7 @@ async function runTransportPass(transport) { const workflowFailureCount = records.filter((record) => record.error).length; const shutdownFailureCount = records.filter((record) => record.shutdownStatus === 'failed').length; 
const startupStats = summarizeDurations(records.map((record) => record.startupDurationMs)); + const connectionDiagnostics = diagnostics.summary(); const summary = { transport: transport.name, requested: devboxCount, @@ -123,6 +133,7 @@ async function runTransportPass(transport) { shutdownFailureCount, startup: startupStats, wallTimeMs, + connectionDiagnostics, }; printPassSummary(summary); @@ -136,6 +147,7 @@ async function runTransportPass(transport) { shutdownFailureCount, startupStats, wallTimeMs, + connectionDiagnostics, summary, records, }; @@ -244,8 +256,27 @@ function printPassSummary(summary) { maxMs: round(summary.startup.max), avgMs: round(summary.startup.avg), wallTimeMs: round(summary.wallTimeMs), + undiciConnections: summary.connectionDiagnostics.connectionCount, + alpnH2: summary.connectionDiagnostics.alpnCounts.h2 ?? 0, + alpnHttp1: summary.connectionDiagnostics.alpnCounts['http/1.1'] ?? 0, + h2Fallbacks: summary.connectionDiagnostics.h2FallbackCount, + uniqueLocalPorts: summary.connectionDiagnostics.uniqueLocalPorts.length, }, ]); + + if (summary.connectionDiagnostics.connectionCount > 0) { + console.log(`${summary.transport} undici connection diagnostics`); + console.table([ + { + connections: summary.connectionDiagnostics.connectionCount, + alpn: JSON.stringify(summary.connectionDiagnostics.alpnCounts), + h2: summary.connectionDiagnostics.h2ConnectionCount, + h1Fallbacks: summary.connectionDiagnostics.h2FallbackCount, + uniqueLocalPorts: summary.connectionDiagnostics.uniqueLocalPorts.length, + localPorts: summary.connectionDiagnostics.uniqueLocalPorts.join(', '), + }, + ]); + } } function printComparison(results) { @@ -319,6 +350,56 @@ function defaultResultsPath() { return path.join('tmp', `devbox-startup-benchmark-${timestamp}.json`); } +function createUndiciConnectionDiagnostics() { + const events = []; + const onConnected = (message) => { + const socket = message?.socket; + const rawAlpn = socket?.alpnProtocol; + const alpnProtocol = + typeof 
rawAlpn === 'string' && rawAlpn.length > 0 ? rawAlpn + : rawAlpn === false ? 'http/1.1' + : 'unknown'; + + events.push({ + alpnProtocol, + localPort: socket?.localPort ?? null, + remoteAddress: socket?.remoteAddress ?? null, + remotePort: socket?.remotePort ?? null, + encrypted: Boolean(socket?.encrypted), + }); + }; + + return { + start() { + diagnostics_channel.subscribe('undici:client:connected', onConnected); + }, + stop() { + diagnostics_channel.unsubscribe('undici:client:connected', onConnected); + }, + summary() { + const alpnCounts = {}; + const localPorts = new Set(); + for (const event of events) { + alpnCounts[event.alpnProtocol] = (alpnCounts[event.alpnProtocol] ?? 0) + 1; + if (event.localPort != null) localPorts.add(event.localPort); + } + + const h2ConnectionCount = alpnCounts.h2 ?? 0; + const http1ConnectionCount = alpnCounts['http/1.1'] ?? 0; + + return { + connectionCount: events.length, + alpnCounts, + h2ConnectionCount, + http1ConnectionCount, + h2FallbackCount: events.length - h2ConnectionCount, + uniqueLocalPorts: [...localPorts].sort((a, b) => a - b), + events, + }; + }, + }; +} + function serializeError(error) { if (!error) return null; return {