diff --git a/packages/nuxi/src/dev/fetch.ts b/packages/nuxi/src/dev/fetch.ts index ba5a380fa..acfd1e824 100644 --- a/packages/nuxi/src/dev/fetch.ts +++ b/packages/nuxi/src/dev/fetch.ts @@ -1,5 +1,9 @@ import type { IncomingMessage, RequestListener, ServerResponse } from 'node:http' +import { request as httpRequest } from 'node:http' +import { Readable } from 'node:stream' +import { pipeline } from 'node:stream/promises' import { NodeRequest } from 'srvx/node' +import { isWindows } from 'std-env' import { Agent } from 'undici' interface DevAddress { @@ -32,6 +36,88 @@ function fetchSocketOptions(socketPath: string) { } } +interface NodeHttpResponse { + status: number + statusText: string + headers: Headers + body: Readable +} +/** + * fetch using native Node.js http.request for Windows named pipes + * this bypasses undici's Web Streams which have buffering issues on Windows + */ +function fetchWithNodeHttp(socketPath: string, url: URL, init?: RequestInit & { duplex?: string }): Promise { + return new Promise((resolve, reject) => { + const headers: Record = {} + if (init?.headers) { + if (init.headers instanceof Headers) { + for (const [key, value] of init.headers.entries()) { + headers[key] = value + } + } + else if (Array.isArray(init.headers)) { + for (const [key, value] of init.headers) { + headers[key] = value + } + } + else { + Object.assign(headers, init.headers) + } + } + + const req = httpRequest({ + socketPath, + path: url.pathname + url.search, + method: init?.method || 'GET', + headers, + }, (res) => { + const responseHeaders = new Headers() + for (const [key, value] of Object.entries(res.headers)) { + if (value !== undefined) { + responseHeaders.set(key, Array.isArray(value) ? value.join(', ') : value) + } + } + + resolve({ + status: res.statusCode || 200, + statusText: res.statusMessage || 'OK', + headers: responseHeaders, + body: res, + }) + }) + + req.on('error', reject) + + if (init?.body) { + if (typeof init.body === 'string') { + req.write(init.body) + } + else if (init.body instanceof ReadableStream) { + const reader = init.body.getReader() + const pump = async () => { + try { + while (true) { + const { done, value } = await reader.read() + if (done) { + break + } + req.write(value) + } + req.end() + } + catch (err) { + req.destroy(err as Error) + } + } + pump() + return + } + } + + req.end() + }) +} + /** * Fetch to a specific address (socket or network) * Based on Nitro's fetchAddress implementation @@ -40,7 +126,7 @@ function fetchAddress( addr: DevAddress, input: string | URL | Request, inputInit?: RequestInit, -): Promise { +): Promise { let url: URL let init: (RequestInit & { duplex?: string }) | undefined @@ -64,6 +150,11 @@ function fetchAddress( ...init, } + if (addr.socketPath && isWindows) { + url.protocol = 'http:' + return fetchWithNodeHttp(addr.socketPath, url, init) + } + if (addr.socketPath) { url.protocol = 'http:' return fetch(url, { @@ -80,7 +171,7 @@ function fetchAddress( /** * Send Web API Response to Node.js ServerResponse */ -async function sendWebResponse(res: ServerResponse, webResponse: Response): Promise { +async function sendWebResponse(res: ServerResponse, webResponse: Response | NodeHttpResponse): Promise { // Set status res.statusCode = webResponse.status res.statusMessage = webResponse.statusText @@ -92,6 +183,12 @@ async function sendWebResponse(res: ServerResponse, webResponse: Response): Prom // Stream body if (webResponse.body) { + // handle node readable stream (from Windows named pipe fetch) + if (webResponse.body instanceof Readable) { + await pipeline(webResponse.body, res, { end: true }) + return + } + const reader = webResponse.body.getReader() try { while (true) { @@ -99,7 +196,10 @@ async function sendWebResponse(res: ServerResponse, webResponse: Response): Prom if (done) { break } - res.write(value) + // backpressure + if (!res.write(value)) { + await new Promise(resolve => res.once('drain', resolve)) + } } } finally {