diff --git a/examples/serving-websites-from-web-browsers/index.js b/examples/serving-websites-from-web-browsers/index.js index 6cec1a7..a69171a 100644 --- a/examples/serving-websites-from-web-browsers/index.js +++ b/examples/serving-websites-from-web-browsers/index.js @@ -1,7 +1,6 @@ import { noise } from '@chainsafe/libp2p-noise' import { yamux } from '@chainsafe/libp2p-yamux' import { circuitRelayTransport } from '@libp2p/circuit-relay-v2' -import { devToolsMetrics } from '@libp2p/devtools-metrics' import { http } from '@libp2p/http' import { nodeServer } from '@libp2p/http-server' import { createServer } from '@libp2p/http-server/node' @@ -91,8 +90,7 @@ const libp2p = await createLibp2p({ server: nodeServer(server) }), ping: ping() - }, - metrics: devToolsMetrics() + } }) // update listening addresses diff --git a/examples/serving-websites-from-web-browsers/package.json b/examples/serving-websites-from-web-browsers/package.json index 124b6fb..9482683 100644 --- a/examples/serving-websites-from-web-browsers/package.json +++ b/examples/serving-websites-from-web-browsers/package.json @@ -23,7 +23,6 @@ "@chainsafe/libp2p-noise": "^16.1.3", "@chainsafe/libp2p-yamux": "^7.0.1", "@libp2p/circuit-relay-v2": "^3.2.14", - "@libp2p/devtools-metrics": "^1.2.17", "@libp2p/http": "^1.0.0", "@libp2p/http-server": "^1.0.0", "@libp2p/identify": "^3.0.32", diff --git a/packages/http-utils/src/stream-to-socket.ts b/packages/http-utils/src/stream-to-socket.ts index 972fa9f..0fbf5ee 100644 --- a/packages/http-utils/src/stream-to-socket.ts +++ b/packages/http-utils/src/stream-to-socket.ts @@ -1,115 +1,123 @@ import { Duplex } from 'node:stream' import { byteStream } from 'it-byte-stream' import type { Connection, Stream } from '@libp2p/interface' +import type { ByteStream } from 'it-byte-stream' import type { Socket, SocketConnectOpts, AddressInfo, SocketReadyState } from 'node:net' const MAX_TIMEOUT = 2_147_483_647 -class Libp2pSocket extends Duplex { +export class Libp2pSocket extends Duplex { public readonly autoSelectFamilyAttemptedAddresses = [] public readonly connecting = false public readonly pending = false - public readonly remoteAddress: string + public remoteAddress: string public bytesRead: number public bytesWritten: number public timeout = MAX_TIMEOUT public allowHalfOpen: boolean - private readonly stream: Stream + #stream?: Stream + #bytes?: ByteStream - constructor (stream: Stream, connection: Connection) { - const bytes = byteStream(stream) + constructor () { + super() - super({ - write: (chunk, encoding, cb) => { - this.stream.log('write %d bytes', chunk.byteLength) + this.bytesRead = 0 + this.bytesWritten = 0 + this.allowHalfOpen = true + this.remoteAddress = '' + } - this.bytesWritten += chunk.byteLength - bytes.write(chunk) - .then(() => { - cb() - }, err => { - cb(err) + setStream (stream: Stream, connection: Connection): void { + this.#bytes = byteStream(stream) + this.#stream = stream + this.remoteAddress = connection.remoteAddr.toString() + } + + _write (chunk: Uint8Array, encoding: string, cb: (err?: Error) => void): void { + this.#stream?.log('write %d bytes', chunk.byteLength) + + this.bytesWritten += chunk.byteLength + this.#bytes?.write(chunk) + .then(() => { + cb() + }, err => { + cb(err) + }) + } + + _read (size: number): void { + this.#stream?.log('asked to read %d bytes', size) + + void Promise.resolve().then(async () => { + try { + while (true) { + const chunk = await this.#bytes?.read({ + signal: AbortSignal.timeout(this.timeout) }) - }, - read: (size) => { - this.stream.log('asked to read %d bytes', size) - - void Promise.resolve().then(async () => { - try { - while (true) { - const chunk = await bytes.read({ - signal: AbortSignal.timeout(this.timeout) - }) - - if (chunk == null) { - this.stream.log('socket readable end closed') - this.push(null) - return - } - - this.bytesRead += chunk.byteLength - - this.stream.log('socket read %d bytes', chunk.byteLength) - const more = this.push(chunk.subarray()) - - if (!more) { - break - } - } - } catch (err: any) { - this.destroy(err) + + if (chunk == null) { + this.#stream?.log('socket readable end closed') + this.push(null) + return } - }) - }, - destroy: (err, cb) => { - this.stream.log('destroy with %d bytes buffered - %e', this.bufferSize, err) - if (err != null) { - bytes.unwrap().abort(err) - cb() - } else { - bytes.unwrap().close() - .then(() => { - cb() - }) - .catch(err => { - stream.abort(err) - cb(err) - }) - } - }, - final: (cb) => { - this.stream.log('final') + this.bytesRead += chunk.byteLength - bytes.unwrap().closeWrite() - .then(() => { - cb() - }) - .catch(err => { - bytes.unwrap().abort(err) - cb(err) - }) + this.#stream?.log('socket read %d bytes', chunk.byteLength) + const more = this.push(chunk.subarray()) + + if (!more) { + break + } + } + } catch (err: any) { + this.destroy(err) } }) + } - this.stream = stream - this.remoteAddress = connection.remoteAddr.toString() - this.bytesRead = 0 - this.bytesWritten = 0 - this.allowHalfOpen = true + _destroy (err: Error, cb: (err?: Error) => void): void { + this.#stream?.log('destroy with %d bytes buffered - %e', this.bufferSize, err) + + if (err != null) { + this.#bytes?.unwrap().abort(err) + cb() + } else { + this.#bytes?.unwrap().close() + .then(() => { + cb() + }) + .catch(err => { + this.#stream?.abort(err) + cb(err) + }) + } + } + + _final (cb: (err?: Error) => void): void { + this.#stream?.log('final') + + this.#bytes?.unwrap().closeWrite() + .then(() => { + cb() + }) + .catch(err => { + this.#bytes?.unwrap().abort(err) + cb(err) + }) } public get readyState (): SocketReadyState { - if (this.stream.status === 'closed') { + if (this.#stream?.status === 'closed') { return 'closed' } - if (this.stream.writeStatus === 'closed' || this.stream.writeStatus === 'closing') { + if (this.#stream?.writeStatus === 'closed' || this.#stream?.writeStatus === 'closing') { return 'readOnly' } - if (this.stream.readStatus === 'closed' || this.stream.readStatus === 'closing') { + if (this.#stream?.readStatus === 'closed' || this.#stream?.readStatus === 'closing') { return 'writeOnly' } @@ -121,7 +129,7 @@ class Libp2pSocket extends Duplex { } destroySoon (): void { - this.stream.log('destroySoon with %d bytes buffered', this.bufferSize) + this.#stream?.log('destroySoon with %d bytes buffered', this.bufferSize) this.destroy() } @@ -130,24 +138,24 @@ class Libp2pSocket extends Duplex { connect (port: number, connectionListener?: () => void): this connect (path: string, connectionListener?: () => void): this connect (...args: any[]): this { - this.stream.log('connect %o', args) + this.#stream?.log('connect %o', args) return this } setEncoding (encoding?: BufferEncoding): this { - this.stream.log('setEncoding %s', encoding) + this.#stream?.log('setEncoding %s', encoding) return this } resetAndDestroy (): this { - this.stream.log('resetAndDestroy') - this.stream.abort(new Error('Libp2pSocket.resetAndDestroy')) + this.#stream?.log('resetAndDestroy') + this.#stream?.abort(new Error('Libp2pSocket.resetAndDestroy')) return this } setTimeout (timeout: number, callback?: () => void): this { - this.stream.log('setTimeout %d', timeout) + this.#stream?.log('setTimeout %d', timeout) if (callback != null) { this.addListener('timeout', callback) @@ -159,31 +167,31 @@ class Libp2pSocket extends Duplex { } setNoDelay (noDelay?: boolean): this { - this.stream.log('setNoDelay %b', noDelay) + this.#stream?.log('setNoDelay %b', noDelay) return this } setKeepAlive (enable?: boolean, initialDelay?: number): this { - this.stream.log('setKeepAlive %b %d', enable, initialDelay) + this.#stream?.log('setKeepAlive %b %d', enable, initialDelay) return this } address (): AddressInfo | Record { - this.stream.log('address') + this.#stream?.log('address') return {} } unref (): this { - this.stream.log('unref') + this.#stream?.log('unref') return this } ref (): this { - this.stream.log('ref') + this.#stream?.log('ref') return this } @@ -196,5 +204,8 @@ class Libp2pSocket extends Duplex { } export function streamToSocket (stream: Stream, connection: Connection): Socket { - return new Libp2pSocket(stream, connection) + const socket = new Libp2pSocket() + socket.setStream(stream, connection) + + return socket } diff --git a/packages/http/src/http.ts b/packages/http/src/http.ts index 778fc42..c675f1a 100644 --- a/packages/http/src/http.ts +++ b/packages/http/src/http.ts @@ -1,5 +1,5 @@ import { Agent as NodeAgent } from 'node:http' -import { streamToSocket, toResource } from '@libp2p/http-utils' +import { Libp2pSocket, toResource } from '@libp2p/http-utils' import { isPeerId } from '@libp2p/interface' import { Agent as UndiciAgent } from 'undici' import { HTTP_PROTOCOL } from './constants.js' @@ -10,15 +10,27 @@ import type { ConnectionManager, Registrar } from '@libp2p/interface-internal' import type { Multiaddr } from '@multiformats/multiaddr' import type { Agent, AgentOptions } from 'node:http' import type { Socket, TcpNetConnectOpts } from 'node:net' +import type { Duplex } from 'node:stream' import type { Dispatcher } from 'undici' export type { HTTPComponents } from './http.browser.js' -async function createConnection (connectionManager: ConnectionManager, peer: PeerId | Multiaddr | Multiaddr[], options?: AbortOptions): Promise { - const connection = await connectionManager.openConnection(peer, options) - const stream = await connection.newStream(HTTP_PROTOCOL, options) +function createConnection (connectionManager: ConnectionManager, peer: PeerId | Multiaddr | Multiaddr[], options?: AbortOptions): Socket { + const socket = new Libp2pSocket() - return streamToSocket(stream, connection) + Promise.resolve() + .then(async () => { + const connection = await connectionManager.openConnection(peer, options) + const stream = await connection.newStream(HTTP_PROTOCOL, options) + + socket.setStream(stream, connection) + socket.emit('connect') + }) + .catch(err => { + socket.emit('error', err) + }) + + return socket } interface HTTPDispatcherComponents { @@ -34,15 +46,24 @@ export class Libp2pDispatcher extends UndiciAgent { super({ ...init, connect: (options, cb) => { - createConnection(components.connectionManager, init.peer, { + const socket = createConnection(components.connectionManager, init.peer, { // @ts-expect-error types are wonky signal: options.timeout != null ? AbortSignal.timeout(options.timeout) : undefined }) - .then(socket => { - cb(null, socket) - }, err => { - cb(err, null) - }) + + const onConnect = (): void => { + socket.removeListener('error', onError) + socket.removeListener('connect', onConnect) + cb(null, socket) + } + const onError = (err: Error): void => { + socket.removeListener('error', onError) + socket.removeListener('connect', onConnect) + cb(err, null) + } + + socket.addListener('connect', onConnect) + socket.addListener('error', onError) } }) } @@ -67,14 +88,24 @@ class Libp2pAgent extends NodeAgent { this.peer = init.peer } - // @ts-expect-error types are wrong - createConnection (options: TcpNetConnectOpts, cb: (err?: Error, socket?: Socket) => void): void { - createConnection(this.components.connectionManager, this.peer, options) - .then(socket => { - cb(undefined, socket) - }, err => { - cb(err) - }) + createConnection (options: TcpNetConnectOpts, cb: (err: Error | null, socket: Duplex) => void): Duplex { + const socket = createConnection(this.components.connectionManager, this.peer, options) + + const onConnect = (): void => { + socket.removeListener('error', onError) + socket.removeListener('connect', onConnect) + cb(null, socket) + } + const onError = (err: Error): void => { + socket.removeListener('error', onError) + socket.removeListener('connect', onConnect) + cb(err, socket) + } + + socket.addListener('connect', onConnect) + socket.addListener('error', onError) + + return socket } } @@ -89,7 +120,6 @@ export class HTTP extends HTTPBrowser implements HTTPInterface { return new NodeAgent(options) } - // @ts-expect-error types are wrong return new Libp2pAgent(this.components, { ...options, peer