Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 103 additions & 3 deletions packages/nuxi/src/dev/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import type { IncomingMessage, RequestListener, ServerResponse } from 'node:http'
import { request as httpRequest } from 'node:http'
import { Readable } from 'node:stream'
import { pipeline } from 'node:stream/promises'
import { NodeRequest } from 'srvx/node'
import { isWindows } from 'std-env'
import { Agent } from 'undici'

interface DevAddress {
Expand Down Expand Up @@ -32,6 +36,88 @@ function fetchSocketOptions(socketPath: string) {
}
}

interface NodeHttpResponse {
status: number
statusText: string
headers: Headers
body: Readable
}
/**
* fetch using native Node.js http.request for Windows named pipes
* this bypasses undici's Web Streams which have buffering issues on Windows
*/
function fetchWithNodeHttp(socketPath: string, url: URL, init?: RequestInit & { duplex?: string }): Promise<NodeHttpResponse> {
return new Promise((resolve, reject) => {
const headers: Record<string, string> = {}
if (init?.headers) {
if (init.headers instanceof Headers) {
for (const [key, value] of init.headers.entries()) {
headers[key] = value
}
}
else if (Array.isArray(init.headers)) {
for (const [key, value] of init.headers) {
headers[key] = value
}
}
else {
Object.assign(headers, init.headers)
}
}

const req = httpRequest({
socketPath,
path: url.pathname + url.search,
method: init?.method || 'GET',
headers,
}, (res) => {
const responseHeaders = new Headers()
for (const [key, value] of Object.entries(res.headers)) {
if (value !== undefined) {
responseHeaders.set(key, Array.isArray(value) ? value.join(', ') : value)
}
}

resolve({
status: res.statusCode || 200,
statusText: res.statusMessage || 'OK',
headers: responseHeaders,
body: res,
})
})

req.on('error', reject)

if (init?.body) {
if (typeof init.body === 'string') {
req.write(init.body)
}
else if (init.body instanceof ReadableStream) {
const reader = init.body.getReader()
const pump = async () => {
try {
while (true) {
const { done, value } = await reader.read()
if (done) {
break
}
req.write(value)
}
req.end()
}
catch (err) {
req.destroy(err as Error)
}
}
pump()
return
}
}

req.end()
})
}

/**
* Fetch to a specific address (socket or network)
* Based on Nitro's fetchAddress implementation
Expand All @@ -40,7 +126,7 @@ function fetchAddress(
addr: DevAddress,
input: string | URL | Request,
inputInit?: RequestInit,
): Promise<Response> {
): Promise<Response | NodeHttpResponse> {
let url: URL
let init: (RequestInit & { duplex?: string }) | undefined

Expand All @@ -64,6 +150,11 @@ function fetchAddress(
...init,
}

if (addr.socketPath && isWindows) {
url.protocol = 'http:'
return fetchWithNodeHttp(addr.socketPath, url, init)
}

if (addr.socketPath) {
url.protocol = 'http:'
return fetch(url, {
Expand All @@ -80,7 +171,7 @@ function fetchAddress(
/**
* Send Web API Response to Node.js ServerResponse
*/
async function sendWebResponse(res: ServerResponse, webResponse: Response): Promise<void> {
async function sendWebResponse(res: ServerResponse, webResponse: Response | NodeHttpResponse): Promise<void> {
// Set status
res.statusCode = webResponse.status
res.statusMessage = webResponse.statusText
Expand All @@ -92,14 +183,23 @@ async function sendWebResponse(res: ServerResponse, webResponse: Response): Prom

// Stream body
if (webResponse.body) {
// handle node readable stream (from Windows named pipe fetch)
if (webResponse.body instanceof Readable) {
await pipeline(webResponse.body, res, { end: true })
return
}

const reader = webResponse.body.getReader()
try {
while (true) {
const { done, value } = await reader.read()
if (done) {
break
}
res.write(value)
// backpressure
if (!res.write(value)) {
await new Promise<void>(resolve => res.once('drain', resolve))
}
}
}
finally {
Expand Down