Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .aegir.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
/** @type {import('aegir').PartialOptions} */

/** @type {import('aegir/types').PartialOptions} */
export default {
build: {
bundlesizeMax: '18kB'
},
dependencyCheck: {
ignore: [
'undici' // required by http-cookie-agent
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,13 @@
"release": "aegir release"
},
"dependencies": {
"@achingbrain/http-parser-js": "^0.5.8",
"@libp2p/crypto": "^5.0.6",
"@libp2p/interface": "^2.2.0",
"@libp2p/interface-internal": "^2.0.10",
"@libp2p/peer-id": "^5.0.7",
"@multiformats/multiaddr": "^12.3.0",
"@multiformats/multiaddr-to-uri": "^11.0.0",
"@perseveranza-pets/milo": "^0.2.1",
"http-cookie-agent": "^6.0.7",
"p-defer": "^4.0.1",
"tough-cookie": "^5.0.0",
Expand Down
160 changes: 54 additions & 106 deletions src/fetch/index.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
/* eslint-disable max-depth */
/* eslint-disable complexity */

import { HTTPParser } from '@achingbrain/http-parser-js'
import { multiaddr, protocols } from '@multiformats/multiaddr'
import { multiaddrToUri } from '@multiformats/multiaddr-to-uri'
// @ts-expect-error missing types
import { milo } from '@perseveranza-pets/milo/index-with-wasm.js'
import defer from 'p-defer'
import { Uint8ArrayList, isUint8ArrayList } from 'uint8arraylist'
import { type Uint8ArrayList } from 'uint8arraylist'

interface Fetch { (req: Request): Promise<Response> }

const METHOD_GET = 1

function getStringMethod (method: number): string {
if (method === 1) {
return 'GET'
}

return 'UNKNOWN'
}

interface Duplex<TSource, TSink = TSource, RSink = Promise<void>> {
source: AsyncIterable<TSource> | Iterable<TSource>
sink(source: AsyncIterable<TSink> | Iterable<TSink>): RSink
Expand Down Expand Up @@ -85,8 +95,6 @@ export async function handleRequestViaDuplex (s: Duplex<Uint8Array | Uint8ArrayL
await writeResponseToDuplex(s, resp)
}

const BUFFER_SIZE = 16 << 10

/**
* Exported for testing.
*
Expand All @@ -100,85 +108,37 @@ export function readHTTPMsg (expectRequest: boolean, r: Duplex<Uint8Array | Uint
return [
msgPromise.promise,
(async () => {
const unconsumedChunks = new Uint8ArrayList()

const textDecoder = new TextDecoder()
const ptr = milo.alloc(BUFFER_SIZE)

const parser = milo.create()
// Simplifies implementation at the cost of storing data twice
milo.setManageUnconsumed(parser, true)

const bodyStreamControllerPromise = defer <ReadableStreamController<Uint8Array>>()
const body = new ReadableStream<Uint8Array>({
async start (controller) {
bodyStreamControllerPromise.resolve(controller)
}
})
const bodyStreamController = await bodyStreamControllerPromise.promise

// Response
let status = ''
let reason = ''

// Requests
let url = ''
let method = ''

const body = new TransformStream()
const writer = body.writable.getWriter()
let messageComplete = false
let fulfilledMsgPromise = false

milo.setOnStatus(parser, (_: unknown, from: number, size: number) => {
status = textDecoder.decode(unconsumedChunks.subarray(from, from + size))
})
milo.setOnReason(parser, (_: unknown, from: number, size: number) => {
reason = textDecoder.decode(unconsumedChunks.subarray(from, from + size))
})
milo.setOnUrl(parser, (_: unknown, from: number, size: number) => {
url = textDecoder.decode(unconsumedChunks.subarray(from, from + size))
})
milo.setOnMethod(parser, (_: unknown, from: number, size: number) => {
method = textDecoder.decode(unconsumedChunks.subarray(from, from + size))
})
const parser = new HTTPParser(expectRequest ? 'REQUEST' : 'RESPONSE')
parser[HTTPParser.kOnHeadersComplete] = (info) => {
fulfilledMsgPromise = true

milo.setOnRequest(parser, () => {
if (!expectRequest) {
msgPromise.reject(new Error('Received request instead of response'))
fulfilledMsgPromise = true
}
})
milo.setOnResponse(parser, () => {
if (expectRequest) {
msgPromise.reject(new Error('Received response instead of request'))
fulfilledMsgPromise = true
// Handle the headers
const headers = new Headers()

for (let i = 0; i < info.headers.length; i += 2) {
headers.set(info.headers[i], info.headers[i + 1])
}
})

// Handle the headers
const headers = new Headers()
let lastHeaderName: string = ''
let reqBody: ReadableStream | null = body.readable

milo.setOnHeaderName(parser, (_: unknown, from: number, size: number) => {
lastHeaderName = textDecoder.decode(unconsumedChunks.subarray(from, from + size))
})
milo.setOnHeaderValue(parser, (_: unknown, from: number, size: number) => {
const headerVal = textDecoder.decode(unconsumedChunks.subarray(from, from + size))
headers.set(lastHeaderName, headerVal)
})
milo.setOnHeaders(parser, (_: unknown, from: number, size: number) => {
// Headers are parsed. We can return the response
try {
if (expectRequest) {
let reqBody: ReadableStream<Uint8Array> | null = body
if (method === 'GET') {
if (info.method === METHOD_GET) {
reqBody = null
}

const urlWithHost = `https://${headers.get('Host') ?? 'unknown_host._libp2p'}${url}`
const urlWithHost = `https://${headers.get('Host') ?? 'unknown_host._libp2p'}${info.url}`
detectBrokenRequestBody().then(async (broken) => {
let req: Request
if (!broken) {
req = new Request(urlWithHost, {
method,
method: getStringMethod(info.method),
body: reqBody,
headers,
// @ts-expect-error this is required by NodeJS despite being the only reasonable option https://fetch.spec.whatwg.org/#requestinit
Expand All @@ -187,7 +147,7 @@ export function readHTTPMsg (expectRequest: boolean, r: Duplex<Uint8Array | Uint
} else {
if (reqBody === null) {
req = new Request(urlWithHost, {
method,
method: getStringMethod(info.method),
headers
})
} else {
Expand All @@ -211,7 +171,7 @@ export function readHTTPMsg (expectRequest: boolean, r: Duplex<Uint8Array | Uint
offset += parts[i].byteLength
}
req = new Request(urlWithHost, {
method,
method: getStringMethod(info.method),
body,
headers
})
Expand All @@ -223,63 +183,51 @@ export function readHTTPMsg (expectRequest: boolean, r: Duplex<Uint8Array | Uint
msgPromise.reject(err)
})
} else {
let respBody: ReadableStream<Uint8Array> | null = body
if (status === '204') {
let respBody: ReadableStream<Uint8Array> | null = body.readable
if (info.statusCode === 204) {
respBody = null
}
const resp = new Response(respBody, {
headers,
status: parseInt(status),
statusText: reason
status: info.statusCode,
statusText: info.statusMessage
})
msgPromise.resolve(resp)
fulfilledMsgPromise = true
}
} catch (error) {
msgPromise.reject(error)
}
})

// Handle the body
milo.setOnData(parser, (_: unknown, from: number, size: number) => {
const c: Uint8Array = unconsumedChunks.subarray(from, from + size)
// @ts-expect-error Unclear why this fails typecheck. TODO debug
bodyStreamController.enqueue(c)
})
milo.setOnError(parser, () => {
bodyStreamController.error(new Error('Error parsing HTTP message'))
})

let messageComplete = false
milo.setOnMessageComplete(parser, () => {
bodyStreamController.close()
}
parser[HTTPParser.kOnBody] = (buf) => {
writer.write(buf)
.catch((err: Error) => {
msgPromise.reject(err)
})
}
parser[HTTPParser.kOnMessageComplete] = () => {
messageComplete = true
})
writer.close()
.catch((err: Error) => {
msgPromise.reject(err)
})
}

// Consume data
for await (let chunks of r.source) {
if (!isUint8ArrayList(chunks)) {
chunks = new Uint8ArrayList(chunks)
}
for (const chunk of chunks) {
unconsumedChunks.append(chunk)
const buffer = new Uint8Array(milo.memory.buffer, ptr, BUFFER_SIZE)
buffer.set(chunk, 0)
const consumed = milo.parse(parser, ptr, chunk.length)
unconsumedChunks.consume(consumed)
}
for await (const chunks of r.source) {
const chunk = chunks.subarray()
parser.execute(chunk)
}
milo.finish(parser)

parser.finish()

if (!messageComplete) {
bodyStreamController.error(new Error('Incomplete HTTP message'))
await writer.abort(new Error('Incomplete HTTP message'))

if (!fulfilledMsgPromise) {
msgPromise.reject(new Error('Incomplete HTTP message'))
}
}

milo.destroy(parser)
milo.dealloc(ptr, BUFFER_SIZE)
})()
]
}
Expand Down
Loading