From f439d9b589a0a6544b61aca3736e920943ce38b5 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Thu, 11 Aug 2022 13:21:04 +0100 Subject: [PATCH] deps!: update all deps to support no-copy operations (#1335) Updates all deps needed to support passing lists of byte arrays where they have been created from multiple input buffers. When reading multiplexed data, all messages arrive in length-prefixed buffers, which means the first few bytes tell the consumer how many bytes long next chunk will be. One length prefixed chunk can be delivered in several payloads from the underlying network transport. The first payload can also include the length prefix and some or all of the data, so we stitch these together in a `Uint8ArrayList` to avoid having to concatenate `Uint8Array`s together. Previously once we'd received enough bytes to satisfy the length prefix we'd concatenate the bytes together, but this is a potentially expensive operation where transports have small message sizes so instead just pass the `Uint8ArrayList` to the consumer and let them decide wether to concatenate or not as some consumers will be smart enough to operate on lists of `Uint8Array`s instead of always requiring a contiguous block of memory. BREAKING CHANGE: Streams are now `Duplex` --- .aegir.js | 4 +- examples/connection-encryption/1.js | 2 +- examples/delegated-routing/package.json | 6 +- examples/echo/src/dialer.js | 2 +- examples/libp2p-in-the-browser/package.json | 4 +- examples/pnet/index.js | 2 +- examples/protocol-and-stream-muxing/1.js | 2 +- examples/protocol-and-stream-muxing/2.js | 2 +- examples/protocol-and-stream-muxing/3.js | 4 +- examples/transports/2.js | 5 + examples/transports/3.js | 2 +- examples/transports/4.js | 2 +- examples/webrtc-direct/package.json | 4 +- package.json | 74 +++++----- src/circuit/README.md | 4 +- src/circuit/circuit/hop.ts | 8 +- src/circuit/circuit/stop.ts | 3 +- src/circuit/circuit/stream-handler.ts | 4 +- src/circuit/pb/index.ts | 149 +++++++++++++++++--- src/circuit/transport.ts | 6 +- src/fetch/pb/proto.ts | 121 ++++++++++++++-- src/identify/index.ts | 7 +- src/identify/pb/message.ts | 123 ++++++++++++++-- src/insecure/index.ts | 9 +- src/insecure/pb/proto.ts | 124 ++++++++++++++-- src/metrics/index.ts | 11 +- src/ping/index.ts | 2 +- src/pnet/index.ts | 2 + src/upgrader.ts | 34 ++--- test/configuration/utils.ts | 9 +- test/core/consume-peer-record.spec.ts | 4 +- test/core/encryption.spec.ts | 4 +- test/core/get-public-key.spec.ts | 4 +- test/core/listening.node.ts | 4 +- test/dialing/direct.node.ts | 22 +-- test/dialing/direct.spec.ts | 20 +-- test/fetch/fetch.node.ts | 4 +- test/metrics/index.node.ts | 3 +- test/transports/transport-manager.spec.ts | 8 +- test/upgrading/upgrader.spec.ts | 22 +-- 40 files changed, 626 insertions(+), 200 deletions(-) diff --git a/.aegir.js b/.aegir.js index 66bccedf2f..c0a745f8e1 100644 --- a/.aegir.js +++ b/.aegir.js @@ -1,6 +1,6 @@ import { WebSockets } from '@libp2p/websockets' import { Mplex } from '@libp2p/mplex' -import { NOISE } from '@chainsafe/libp2p-noise' +import { Noise } from '@chainsafe/libp2p-noise' import { pipe } from 'it-pipe' import { createFromJSON } from '@libp2p/peer-id-factory' @@ -31,7 +31,7 @@ export default { new Mplex() ], connectionEncryption: [ - NOISE, + new Noise(), new Plaintext() ], relay: { diff --git a/examples/connection-encryption/1.js b/examples/connection-encryption/1.js index 0e774fcb99..4f03ff6d57 100644 --- a/examples/connection-encryption/1.js +++ b/examples/connection-encryption/1.js @@ -34,7 +34,7 @@ const createNode = async () => { stream, async function (source) { for await (const msg of source) { - console.log(uint8ArrayToString(msg)) + console.log(uint8ArrayToString(msg.subarray())) } } ) diff --git a/examples/delegated-routing/package.json b/examples/delegated-routing/package.json index 04ff5e32e3..8c7d430290 100644 --- a/examples/delegated-routing/package.json +++ b/examples/delegated-routing/package.json @@ -3,13 +3,13 @@ "version": "0.1.0", "private": true, "dependencies": { - "@chainsafe/libp2p-noise": "^7.0.1", - "ipfs-core": "^0.14.1", + "@chainsafe/libp2p-noise": "^8.0.0", + "ipfs-core": "^0.15.4", "libp2p": "../../", "@libp2p/delegated-content-routing": "^2.0.1", "@libp2p/delegated-peer-routing": "^2.0.1", "@libp2p/kad-dht": "^3.0.0", - "@libp2p/mplex": "^4.0.2", + "@libp2p/mplex": "^5.0.0", "@libp2p/webrtc-star": "^3.0.0", "@libp2p/websockets": "^3.0.0", "react": "^17.0.2", diff --git a/examples/echo/src/dialer.js b/examples/echo/src/dialer.js index 435a9ea220..0982e69efe 100644 --- a/examples/echo/src/dialer.js +++ b/examples/echo/src/dialer.js @@ -51,7 +51,7 @@ async function run() { // For each chunk of data for await (const data of source) { // Output the data - console.log('received echo:', uint8ArrayToString(data)) + console.log('received echo:', uint8ArrayToString(data.subarray())) } } ) diff --git a/examples/libp2p-in-the-browser/package.json b/examples/libp2p-in-the-browser/package.json index 57e4fe1a68..f53ae096a1 100644 --- a/examples/libp2p-in-the-browser/package.json +++ b/examples/libp2p-in-the-browser/package.json @@ -9,9 +9,9 @@ }, "license": "ISC", "dependencies": { - "@chainsafe/libp2p-noise": "^7.0.1", + "@chainsafe/libp2p-noise": "^8.0.0", "@libp2p/bootstrap": "^2.0.0", - "@libp2p/mplex": "^4.0.2", + "@libp2p/mplex": "^5.0.0", "@libp2p/webrtc-star": "^3.0.0", "@libp2p/websockets": "^3.0.0", "libp2p": "../../" diff --git a/examples/pnet/index.js b/examples/pnet/index.js index fa8268df84..6d3adf0a1d 100644 --- a/examples/pnet/index.js +++ b/examples/pnet/index.js @@ -37,7 +37,7 @@ generateKey(otherSwarmKey) stream, async function (source) { for await (const msg of source) { - console.log(uint8ArrayToString(msg)) + console.log(uint8ArrayToString(msg.subarray())) } } ) diff --git a/examples/protocol-and-stream-muxing/1.js b/examples/protocol-and-stream-muxing/1.js index f1ab2f20df..024aaaab80 100644 --- a/examples/protocol-and-stream-muxing/1.js +++ b/examples/protocol-and-stream-muxing/1.js @@ -36,7 +36,7 @@ const createNode = async () => { stream, async function (source) { for await (const msg of source) { - console.log(uint8ArrayToString(msg)) + console.log(uint8ArrayToString(msg.subarray())) } } ) diff --git a/examples/protocol-and-stream-muxing/2.js b/examples/protocol-and-stream-muxing/2.js index 2605938d20..0b7a332bf6 100644 --- a/examples/protocol-and-stream-muxing/2.js +++ b/examples/protocol-and-stream-muxing/2.js @@ -35,7 +35,7 @@ const createNode = async () => { stream, async function (source) { for await (const msg of source) { - console.log(`from: ${protocol}, msg: ${uint8ArrayToString(msg)}`) + console.log(`from: ${protocol}, msg: ${uint8ArrayToString(msg.subarray())}`) } } ).finally(() => { diff --git a/examples/protocol-and-stream-muxing/3.js b/examples/protocol-and-stream-muxing/3.js index af63bdea8e..84de6b1b25 100644 --- a/examples/protocol-and-stream-muxing/3.js +++ b/examples/protocol-and-stream-muxing/3.js @@ -37,7 +37,7 @@ const createNode = async () => { stream, async function (source) { for await (const msg of source) { - console.log(uint8ArrayToString(msg)) + console.log(uint8ArrayToString(msg.subarray())) } } ) @@ -48,7 +48,7 @@ const createNode = async () => { stream, async function (source) { for await (const msg of source) { - console.log(uint8ArrayToString(msg)) + console.log(uint8ArrayToString(msg.subarray())) } } ) diff --git a/examples/transports/2.js b/examples/transports/2.js index 9dee88780c..2808c82049 100644 --- a/examples/transports/2.js +++ b/examples/transports/2.js @@ -42,6 +42,11 @@ function printAddrs (node, number) { node2.handle('/print', async ({ stream }) => { const result = await pipe( stream, + async function * (source) { + for await (const list of source) { + yield list.subarray() + } + }, toBuffer ) console.log(uint8ArrayToString(result)) diff --git a/examples/transports/3.js b/examples/transports/3.js index 26abf36e98..980570c7ef 100644 --- a/examples/transports/3.js +++ b/examples/transports/3.js @@ -37,7 +37,7 @@ function print ({ stream }) { stream, async function (source) { for await (const msg of source) { - console.log(uint8ArrayToString(msg)) + console.log(uint8ArrayToString(msg.subarray())) } } ) diff --git a/examples/transports/4.js b/examples/transports/4.js index 0e13c569a2..ad35181e75 100644 --- a/examples/transports/4.js +++ b/examples/transports/4.js @@ -57,7 +57,7 @@ function print ({ stream }) { stream, async function (source) { for await (const msg of source) { - console.log(uint8ArrayToString(msg)) + console.log(uint8ArrayToString(msg.subarray())) } } ) diff --git a/examples/webrtc-direct/package.json b/examples/webrtc-direct/package.json index 664488dd28..d5be295f4a 100644 --- a/examples/webrtc-direct/package.json +++ b/examples/webrtc-direct/package.json @@ -10,9 +10,9 @@ "license": "ISC", "dependencies": { "@libp2p/webrtc-direct": "^2.0.0", - "@chainsafe/libp2p-noise": "^7.0.3", + "@chainsafe/libp2p-noise": "^8.0.0", "@libp2p/bootstrap": "^2.0.0", - "@libp2p/mplex": "^4.0.3", + "@libp2p/mplex": "^5.0.0", "libp2p": "../../", "wrtc": "^0.4.7" }, diff --git a/package.json b/package.json index a8071c1ec5..1e31ab23b3 100644 --- a/package.json +++ b/package.json @@ -98,36 +98,36 @@ }, "dependencies": { "@achingbrain/nat-port-mapper": "^1.0.3", - "@libp2p/components": "^2.0.1", - "@libp2p/connection": "^4.0.0", - "@libp2p/crypto": "^1.0.0", - "@libp2p/interface-address-manager": "^1.0.1", - "@libp2p/interface-connection": "^2.0.0", - "@libp2p/interface-connection-encrypter": "^1.0.2", - "@libp2p/interface-content-routing": "^1.0.1", - "@libp2p/interface-dht": "^1.0.0", - "@libp2p/interface-metrics": "^2.0.0", - "@libp2p/interface-peer-discovery": "^1.0.0", - "@libp2p/interface-peer-id": "^1.0.2", - "@libp2p/interface-peer-info": "^1.0.1", - "@libp2p/interface-peer-routing": "^1.0.0", - "@libp2p/interface-peer-store": "^1.2.0", - "@libp2p/interface-pubsub": "^2.0.0", - "@libp2p/interface-registrar": "^2.0.0", - "@libp2p/interface-stream-muxer": "^2.0.1", - "@libp2p/interface-transport": "^1.0.0", - "@libp2p/interfaces": "^3.0.2", + "@libp2p/components": "^2.0.3", + "@libp2p/connection": "^4.0.1", + "@libp2p/crypto": "^1.0.3", + "@libp2p/interface-address-manager": "^1.0.2", + "@libp2p/interface-connection": "^3.0.1", + "@libp2p/interface-connection-encrypter": "^2.0.1", + "@libp2p/interface-content-routing": "^1.0.2", + "@libp2p/interface-dht": "^1.0.1", + "@libp2p/interface-metrics": "^3.0.0", + "@libp2p/interface-peer-discovery": "^1.0.1", + "@libp2p/interface-peer-id": "^1.0.4", + "@libp2p/interface-peer-info": "^1.0.2", + "@libp2p/interface-peer-routing": "^1.0.1", + "@libp2p/interface-peer-store": "^1.2.1", + "@libp2p/interface-pubsub": "^2.0.1", + "@libp2p/interface-registrar": "^2.0.3", + "@libp2p/interface-stream-muxer": "^2.0.2", + "@libp2p/interface-transport": "^1.0.3", + "@libp2p/interfaces": "^3.0.3", "@libp2p/logger": "^2.0.0", - "@libp2p/multistream-select": "^2.0.1", + "@libp2p/multistream-select": "^3.0.0", "@libp2p/peer-collections": "^2.0.0", - "@libp2p/peer-id": "^1.1.10", - "@libp2p/peer-id-factory": "^1.0.9", - "@libp2p/peer-record": "^4.0.0", - "@libp2p/peer-store": "^3.0.0", + "@libp2p/peer-id": "^1.1.15", + "@libp2p/peer-id-factory": "^1.0.18", + "@libp2p/peer-record": "^4.0.1", + "@libp2p/peer-store": "^3.1.2", "@libp2p/tracked-map": "^2.0.1", - "@libp2p/utils": "^3.0.0", + "@libp2p/utils": "^3.0.1", "@multiformats/mafmt": "^11.0.2", - "@multiformats/multiaddr": "^10.1.8", + "@multiformats/multiaddr": "^10.3.3", "abortable-iterator": "^4.0.2", "any-signal": "^3.0.0", "datastore-core": "^7.0.0", @@ -140,7 +140,7 @@ "it-filter": "^1.0.3", "it-first": "^1.0.6", "it-foreach": "^0.1.1", - "it-handshake": "^4.0.0", + "it-handshake": "^4.1.2", "it-length-prefixed": "^8.0.2", "it-map": "^1.0.6", "it-merge": "^1.0.3", @@ -156,31 +156,31 @@ "p-retry": "^5.0.0", "p-settle": "^5.0.0", "private-ip": "^2.3.3", - "protons-runtime": "^2.0.2", + "protons-runtime": "^3.0.1", "retimer": "^3.0.0", "sanitize-filename": "^1.6.3", "set-delayed-interval": "^1.0.0", "timeout-abort-controller": "^3.0.0", - "uint8arraylist": "^2.0.0", + "uint8arraylist": "^2.3.2", "uint8arrays": "^3.0.0", "wherearewe": "^1.0.0", "xsalsa20": "^1.1.0" }, "devDependencies": { - "@chainsafe/libp2p-noise": "^7.0.2", + "@chainsafe/libp2p-noise": "^8.0.0", "@libp2p/bootstrap": "^2.0.0", - "@libp2p/daemon-client": "^2.0.0", - "@libp2p/daemon-server": "^2.0.0", + "@libp2p/daemon-client": "^2.0.4", + "@libp2p/daemon-server": "^2.0.4", "@libp2p/delegated-content-routing": "^2.0.1", "@libp2p/delegated-peer-routing": "^2.0.1", "@libp2p/floodsub": "^3.0.0", "@libp2p/interface-compliance-tests": "^3.0.1", - "@libp2p/interface-connection-encrypter-compliance-tests": "^1.0.0", - "@libp2p/interface-mocks": "^3.0.1", + "@libp2p/interface-connection-encrypter-compliance-tests": "^2.0.1", + "@libp2p/interface-mocks": "^4.0.1", "@libp2p/interop": "^2.0.0", - "@libp2p/kad-dht": "^3.0.0", + "@libp2p/kad-dht": "^3.0.1", "@libp2p/mdns": "^3.0.0", - "@libp2p/mplex": "^4.0.2", + "@libp2p/mplex": "^5.0.0", "@libp2p/pubsub": "^3.0.1", "@libp2p/tcp": "^3.0.0", "@libp2p/topology": "^3.0.0", @@ -205,7 +205,7 @@ "p-event": "^5.0.1", "p-times": "^4.0.0", "p-wait-for": "^5.0.0", - "protons": "^4.0.1", + "protons": "^5.0.0", "rimraf": "^3.0.2", "sinon": "^14.0.0", "ts-sinon": "^2.0.2" diff --git a/src/circuit/README.md b/src/circuit/README.md index cbf1dd1dc4..330df42a12 100644 --- a/src/circuit/README.md +++ b/src/circuit/README.md @@ -41,7 +41,7 @@ import { Multiaddr } from '@multiformats/multiaddr' import Libp2p from 'libp2p' import { TCP } from '@libp2p/tcp' import { Mplex } from '@libp2p/mplex' -import { NOISE } from '@chainsafe/libp2p-noise' +import { Noise } from '@chainsafe/libp2p-noise' const relayAddr = ... @@ -56,7 +56,7 @@ const node = await createLibp2p({ new Mplex() ], connectionEncryption: [ - NOISE + new Noise() ] }, config: { diff --git a/src/circuit/circuit/hop.ts b/src/circuit/circuit/hop.ts index a7fd38850e..1ec63e0caf 100644 --- a/src/circuit/circuit/hop.ts +++ b/src/circuit/circuit/hop.ts @@ -13,6 +13,7 @@ import type { Duplex } from 'it-stream-types' import type { Circuit } from '../transport.js' import type { ConnectionManager } from '@libp2p/interface-connection-manager' import type { AbortOptions } from '@libp2p/interfaces' +import type { Uint8ArrayList } from 'uint8arraylist' const log = logger('libp2p:circuit:hop') @@ -24,7 +25,7 @@ export interface HopRequest { connectionManager: ConnectionManager } -export async function handleHop (hopRequest: HopRequest) { +export async function handleHop (hopRequest: HopRequest): Promise { const { connection, request, @@ -84,7 +85,7 @@ export async function handleHop (hopRequest: HopRequest) { srcPeer: request.srcPeer } - let destinationStream: Duplex + let destinationStream: Duplex try { log('performing STOP request') const result = await stop({ @@ -128,7 +129,7 @@ export interface HopConfig extends AbortOptions { * Performs a HOP request to a relay peer, to request a connection to another * peer. A new, virtual, connection will be created between the two via the relay. */ -export async function hop (options: HopConfig): Promise> { +export async function hop (options: HopConfig): Promise> { const { connection, request, @@ -151,6 +152,7 @@ export async function hop (options: HopConfig): Promise> { if (response.code === CircuitPB.Status.SUCCESS) { log('hop request was successful') + return streamHandler.rest() } diff --git a/src/circuit/circuit/stop.ts b/src/circuit/circuit/stop.ts index 1ad8f8e0f1..2e27d010fe 100644 --- a/src/circuit/circuit/stop.ts +++ b/src/circuit/circuit/stop.ts @@ -6,6 +6,7 @@ import { validateAddrs } from './utils.js' import type { Connection } from '@libp2p/interface-connection' import type { Duplex } from 'it-stream-types' import type { AbortOptions } from '@libp2p/interfaces' +import type { Uint8ArrayList } from 'uint8arraylist' const log = logger('libp2p:circuit:stop') @@ -18,7 +19,7 @@ export interface HandleStopOptions { /** * Handles incoming STOP requests */ -export function handleStop (options: HandleStopOptions): Duplex | undefined { +export function handleStop (options: HandleStopOptions): Duplex | undefined { const { connection, request, diff --git a/src/circuit/circuit/stream-handler.ts b/src/circuit/circuit/stream-handler.ts index 0935d5d689..0638733f4e 100644 --- a/src/circuit/circuit/stream-handler.ts +++ b/src/circuit/circuit/stream-handler.ts @@ -22,7 +22,7 @@ export interface StreamHandlerOptions { export class StreamHandler { private readonly stream: Stream - private readonly shake: Handshake + private readonly shake: Handshake private readonly decoder: Source constructor (options: StreamHandlerOptions) { @@ -56,7 +56,7 @@ export class StreamHandler { */ write (msg: CircuitRelay) { log('write message type %s', msg.type) - this.shake.write(lp.encode.single(CircuitRelay.encode(msg)).slice()) + this.shake.write(lp.encode.single(CircuitRelay.encode(msg))) } /** diff --git a/src/circuit/pb/index.ts b/src/circuit/pb/index.ts index ec47246aab..44b0127cec 100644 --- a/src/circuit/pb/index.ts +++ b/src/circuit/pb/index.ts @@ -1,9 +1,9 @@ /* eslint-disable import/export */ /* eslint-disable @typescript-eslint/no-namespace */ -import { enumeration, encodeMessage, decodeMessage, message, bytes } from 'protons-runtime' -import type { Codec } from 'protons-runtime' +import { enumeration, encodeMessage, decodeMessage, message } from 'protons-runtime' import type { Uint8ArrayList } from 'uint8arraylist' +import type { Codec } from 'protons-runtime' export interface CircuitRelay { type?: CircuitRelay.Type @@ -53,7 +53,7 @@ export namespace CircuitRelay { export namespace Status { export const codec = () => { - return enumeration(__StatusValues) + return enumeration(__StatusValues) } } @@ -73,7 +73,7 @@ export namespace CircuitRelay { export namespace Type { export const codec = () => { - return enumeration(__TypeValues) + return enumeration(__TypeValues) } } @@ -83,14 +83,74 @@ export namespace CircuitRelay { } export namespace Peer { + let _codec: Codec + export const codec = (): Codec => { - return message({ - 1: { name: 'id', codec: bytes }, - 2: { name: 'addrs', codec: bytes, repeats: true } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.id != null) { + writer.uint32(10) + writer.bytes(obj.id) + } else { + throw new Error('Protocol error: required field "id" was not found in object') + } + + if (obj.addrs != null) { + for (const value of obj.addrs) { + writer.uint32(18) + writer.bytes(value) + } + } else { + throw new Error('Protocol error: required field "addrs" was not found in object') + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.id = reader.bytes() + break + case 2: + obj.addrs = obj.addrs ?? [] + obj.addrs.push(reader.bytes()) + break + default: + reader.skipType(tag & 7) + break + } + } + + obj.addrs = obj.addrs ?? [] + + if (obj.id == null) { + throw new Error('Protocol error: value for required field "id" was not found in protobuf') + } + + if (obj.addrs == null) { + throw new Error('Protocol error: value for required field "addrs" was not found in protobuf') + } + + return obj + }) + } + + return _codec } - export const encode = (obj: Peer): Uint8ArrayList => { + export const encode = (obj: Peer): Uint8Array => { return encodeMessage(obj, Peer.codec()) } @@ -99,16 +159,73 @@ export namespace CircuitRelay { } } + let _codec: Codec + export const codec = (): Codec => { - return message({ - 1: { name: 'type', codec: CircuitRelay.Type.codec(), optional: true }, - 2: { name: 'srcPeer', codec: CircuitRelay.Peer.codec(), optional: true }, - 3: { name: 'dstPeer', codec: CircuitRelay.Peer.codec(), optional: true }, - 4: { name: 'code', codec: CircuitRelay.Status.codec(), optional: true } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.type != null) { + writer.uint32(8) + CircuitRelay.Type.codec().encode(obj.type, writer) + } + + if (obj.srcPeer != null) { + writer.uint32(18) + CircuitRelay.Peer.codec().encode(obj.srcPeer, writer) + } + + if (obj.dstPeer != null) { + writer.uint32(26) + CircuitRelay.Peer.codec().encode(obj.dstPeer, writer) + } + + if (obj.code != null) { + writer.uint32(32) + CircuitRelay.Status.codec().encode(obj.code, writer) + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.type = CircuitRelay.Type.codec().decode(reader) + break + case 2: + obj.srcPeer = CircuitRelay.Peer.codec().decode(reader, reader.uint32()) + break + case 3: + obj.dstPeer = CircuitRelay.Peer.codec().decode(reader, reader.uint32()) + break + case 4: + obj.code = CircuitRelay.Status.codec().decode(reader) + break + default: + reader.skipType(tag & 7) + break + } + } + + return obj + }) + } + + return _codec } - export const encode = (obj: CircuitRelay): Uint8ArrayList => { + export const encode = (obj: CircuitRelay): Uint8Array => { return encodeMessage(obj, CircuitRelay.codec()) } diff --git a/src/circuit/transport.ts b/src/circuit/transport.ts index 5c87bd2e3a..eaa0f4a988 100644 --- a/src/circuit/transport.ts +++ b/src/circuit/transport.ts @@ -21,6 +21,8 @@ import type { RelayConfig } from '../index.js' import { abortableDuplex } from 'abortable-iterator' import { TimeoutController } from 'timeout-abort-controller' import { setMaxListeners } from 'events' +import type { Uint8ArrayList } from 'uint8arraylist' +import type { Duplex } from 'it-stream-types' const log = logger('libp2p:circuit') @@ -90,7 +92,7 @@ export class Circuit implements Transport, Initializable { return } - let virtualConnection + let virtualConnection: Duplex | undefined switch (request.type) { case CircuitPB.Type.CAN_HOP: { @@ -100,7 +102,7 @@ export class Circuit implements Transport, Initializable { } case CircuitPB.Type.HOP: { log('received HOP request from %p', connection.remotePeer) - virtualConnection = await handleHop({ + await handleHop({ connection, request, streamHandler, diff --git a/src/fetch/pb/proto.ts b/src/fetch/pb/proto.ts index 608559358b..d4997ea4fc 100644 --- a/src/fetch/pb/proto.ts +++ b/src/fetch/pb/proto.ts @@ -1,22 +1,64 @@ /* eslint-disable import/export */ /* eslint-disable @typescript-eslint/no-namespace */ -import { encodeMessage, decodeMessage, message, string, enumeration, bytes } from 'protons-runtime' -import type { Codec } from 'protons-runtime' +import { encodeMessage, decodeMessage, message, enumeration } from 'protons-runtime' import type { Uint8ArrayList } from 'uint8arraylist' +import type { Codec } from 'protons-runtime' export interface FetchRequest { identifier: string } export namespace FetchRequest { + let _codec: Codec + export const codec = (): Codec => { - return message({ - 1: { name: 'identifier', codec: string } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.identifier != null) { + writer.uint32(10) + writer.string(obj.identifier) + } else { + throw new Error('Protocol error: required field "identifier" was not found in object') + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.identifier = reader.string() + break + default: + reader.skipType(tag & 7) + break + } + } + + if (obj.identifier == null) { + throw new Error('Protocol error: value for required field "identifier" was not found in protobuf') + } + + return obj + }) + } + + return _codec } - export const encode = (obj: FetchRequest): Uint8ArrayList => { + export const encode = (obj: FetchRequest): Uint8Array => { return encodeMessage(obj, FetchRequest.codec()) } @@ -45,18 +87,73 @@ export namespace FetchResponse { export namespace StatusCode { export const codec = () => { - return enumeration(__StatusCodeValues) + return enumeration(__StatusCodeValues) } } + let _codec: Codec + export const codec = (): Codec => { - return message({ - 1: { name: 'status', codec: FetchResponse.StatusCode.codec() }, - 2: { name: 'data', codec: bytes } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.status != null) { + writer.uint32(8) + FetchResponse.StatusCode.codec().encode(obj.status, writer) + } else { + throw new Error('Protocol error: required field "status" was not found in object') + } + + if (obj.data != null) { + writer.uint32(18) + writer.bytes(obj.data) + } else { + throw new Error('Protocol error: required field "data" was not found in object') + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.status = FetchResponse.StatusCode.codec().decode(reader) + break + case 2: + obj.data = reader.bytes() + break + default: + reader.skipType(tag & 7) + break + } + } + + if (obj.status == null) { + throw new Error('Protocol error: value for required field "status" was not found in protobuf') + } + + if (obj.data == null) { + throw new Error('Protocol error: value for required field "data" was not found in protobuf') + } + + return obj + }) + } + + return _codec } - export const encode = (obj: FetchResponse): Uint8ArrayList => { + export const encode = (obj: FetchResponse): Uint8Array => { return encodeMessage(obj, FetchResponse.codec()) } diff --git a/src/identify/index.ts b/src/identify/index.ts index ba82aa8178..a0f051423b 100644 --- a/src/identify/index.ts +++ b/src/identify/index.ts @@ -26,7 +26,6 @@ import type { Components } from '@libp2p/components' import { TimeoutController } from 'timeout-abort-controller' import type { AbortOptions } from '@libp2p/interfaces' import { abortableDuplex } from 'abortable-iterator' -import type { Duplex } from 'it-stream-types' import { setMaxListeners } from 'events' const log = logger('libp2p:identify') @@ -179,7 +178,7 @@ export class IdentifyService implements Startable { }) // make stream abortable - const source: Duplex = abortableDuplex(stream, timeoutController.signal) + const source = abortableDuplex(stream, timeoutController.signal) await pipe( [Identify.encode({ @@ -418,7 +417,7 @@ export class IdentifyService implements Startable { }) // make stream abortable - const source: Duplex = abortableDuplex(stream, timeoutController.signal) + const source = abortableDuplex(stream, timeoutController.signal) await pipe( [message], @@ -449,7 +448,7 @@ export class IdentifyService implements Startable { let message: Identify | undefined try { // make stream abortable - const source: Duplex = abortableDuplex(stream, timeoutController.signal) + const source = abortableDuplex(stream, timeoutController.signal) const data = await pipe( [], diff --git a/src/identify/pb/message.ts b/src/identify/pb/message.ts index 1f9072d677..882b5b67a2 100644 --- a/src/identify/pb/message.ts +++ b/src/identify/pb/message.ts @@ -1,9 +1,9 @@ /* eslint-disable import/export */ /* eslint-disable @typescript-eslint/no-namespace */ -import { encodeMessage, decodeMessage, message, string, bytes } from 'protons-runtime' -import type { Codec } from 'protons-runtime' +import { encodeMessage, decodeMessage, message } from 'protons-runtime' import type { Uint8ArrayList } from 'uint8arraylist' +import type { Codec } from 'protons-runtime' export interface Identify { protocolVersion?: string @@ -16,19 +16,118 @@ export interface Identify { } export namespace Identify { + let _codec: Codec + export const codec = (): Codec => { - return message({ - 5: { name: 'protocolVersion', codec: string, optional: true }, - 6: { name: 'agentVersion', codec: string, optional: true }, - 1: { name: 'publicKey', codec: bytes, optional: true }, - 2: { name: 'listenAddrs', codec: bytes, repeats: true }, - 4: { name: 'observedAddr', codec: bytes, optional: true }, - 3: { name: 'protocols', codec: string, repeats: true }, - 8: { name: 'signedPeerRecord', codec: bytes, optional: true } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.protocolVersion != null) { + writer.uint32(42) + writer.string(obj.protocolVersion) + } + + if (obj.agentVersion != null) { + writer.uint32(50) + writer.string(obj.agentVersion) + } + + if (obj.publicKey != null) { + writer.uint32(10) + writer.bytes(obj.publicKey) + } + + if (obj.listenAddrs != null) { + for (const value of obj.listenAddrs) { + writer.uint32(18) + writer.bytes(value) + } + } else { + throw new Error('Protocol error: required field "listenAddrs" was not found in object') + } + + if (obj.observedAddr != null) { + writer.uint32(34) + writer.bytes(obj.observedAddr) + } + + if (obj.protocols != null) { + for (const value of obj.protocols) { + writer.uint32(26) + writer.string(value) + } + } else { + throw new Error('Protocol error: required field "protocols" was not found in object') + } + + if (obj.signedPeerRecord != null) { + writer.uint32(66) + writer.bytes(obj.signedPeerRecord) + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 5: + obj.protocolVersion = reader.string() + break + case 6: + obj.agentVersion = reader.string() + break + case 1: + obj.publicKey = reader.bytes() + break + case 2: + obj.listenAddrs = obj.listenAddrs ?? [] + obj.listenAddrs.push(reader.bytes()) + break + case 4: + obj.observedAddr = reader.bytes() + break + case 3: + obj.protocols = obj.protocols ?? [] + obj.protocols.push(reader.string()) + break + case 8: + obj.signedPeerRecord = reader.bytes() + break + default: + reader.skipType(tag & 7) + break + } + } + + obj.listenAddrs = obj.listenAddrs ?? [] + obj.protocols = obj.protocols ?? [] + + if (obj.listenAddrs == null) { + throw new Error('Protocol error: value for required field "listenAddrs" was not found in protobuf') + } + + if (obj.protocols == null) { + throw new Error('Protocol error: value for required field "protocols" was not found in protobuf') + } + + return obj + }) + } + + return _codec } - export const encode = (obj: Identify): Uint8ArrayList => { + export const encode = (obj: Identify): Uint8Array => { return encodeMessage(obj, Identify.codec()) } diff --git a/src/insecure/index.ts b/src/insecure/index.ts index 3ffd6618be..d773415666 100644 --- a/src/insecure/index.ts +++ b/src/insecure/index.ts @@ -7,6 +7,7 @@ import type { PeerId } from '@libp2p/interface-peer-id' import { peerIdFromBytes, peerIdFromKeys } from '@libp2p/peer-id' import type { ConnectionEncrypter, SecuredConnection } from '@libp2p/interface-connection-encrypter' import type { Duplex } from 'it-stream-types' +import map from 'it-map' const log = logger('libp2p:plaintext') const PROTOCOL = '/plaintext/2.0.0' @@ -47,7 +48,7 @@ async function encrypt (localId: PeerId, conn: Duplex, remoteId?: Pe // Get the Exchange message // @ts-expect-error needs to be generator const response = (await lp.decode.fromReader(shake.reader).next()).value - const id = Exchange.decode(response.slice()) + const id = Exchange.decode(response) log('read pubkey exchange from peer %p', remoteId) let peerId @@ -81,8 +82,12 @@ async function encrypt (localId: PeerId, conn: Duplex, remoteId?: Pe log('plaintext key exchange completed successfully with peer %p', peerId) shake.rest() + return { - conn: shake.stream, + conn: { + sink: shake.stream.sink, + source: map(shake.stream.source, (buf) => buf.subarray()) + }, remotePeer: peerId, remoteEarlyData: new Uint8Array() } diff --git a/src/insecure/pb/proto.ts b/src/insecure/pb/proto.ts index ec84391110..1ff2468f5c 100644 --- a/src/insecure/pb/proto.ts +++ b/src/insecure/pb/proto.ts @@ -1,9 +1,9 @@ /* eslint-disable import/export */ /* eslint-disable @typescript-eslint/no-namespace */ -import { encodeMessage, decodeMessage, message, bytes, enumeration } from 'protons-runtime' -import type { Codec } from 'protons-runtime' +import { encodeMessage, decodeMessage, message, enumeration } from 'protons-runtime' import type { Uint8ArrayList } from 'uint8arraylist' +import type { Codec } from 'protons-runtime' export interface Exchange { id?: Uint8Array @@ -11,14 +11,57 @@ export interface Exchange { } export namespace Exchange { + let _codec: Codec + export const codec = (): Codec => { - return message({ - 1: { name: 'id', codec: bytes, optional: true }, - 2: { name: 'pubkey', codec: PublicKey.codec(), optional: true } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.id != null) { + writer.uint32(10) + writer.bytes(obj.id) + } + + if (obj.pubkey != null) { + writer.uint32(18) + PublicKey.codec().encode(obj.pubkey, writer) + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.id = reader.bytes() + break + case 2: + obj.pubkey = PublicKey.codec().decode(reader, reader.uint32()) + break + default: + reader.skipType(tag & 7) + break + } + } + + return obj + }) + } + + return _codec } - export const encode = (obj: Exchange): Uint8ArrayList => { + export const encode = (obj: Exchange): Uint8Array => { return encodeMessage(obj, Exchange.codec()) } @@ -43,7 +86,7 @@ enum __KeyTypeValues { export namespace KeyType { export const codec = () => { - return enumeration(__KeyTypeValues) + return enumeration(__KeyTypeValues) } } export interface PublicKey { @@ -52,14 +95,69 @@ export interface PublicKey { } export namespace PublicKey { + let _codec: Codec + export const codec = (): Codec => { - return message({ - 1: { name: 'Type', codec: KeyType.codec() }, - 2: { name: 'Data', codec: bytes } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.Type != null) { + writer.uint32(8) + KeyType.codec().encode(obj.Type, writer) + } else { + throw new Error('Protocol error: required field "Type" was not found in object') + } + + if (obj.Data != null) { + writer.uint32(18) + writer.bytes(obj.Data) + } else { + throw new Error('Protocol error: required field "Data" was not found in object') + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.Type = KeyType.codec().decode(reader) + break + case 2: + obj.Data = reader.bytes() + break + default: + reader.skipType(tag & 7) + break + } + } + + if (obj.Type == null) { + throw new Error('Protocol error: value for required field "Type" was not found in protobuf') + } + + if (obj.Data == null) { + throw new Error('Protocol error: value for required field "Data" was not found in protobuf') + } + + return obj + }) + } + + return _codec } - export const encode = (obj: PublicKey): Uint8ArrayList => { + export const encode = (obj: PublicKey): Uint8Array => { return encodeMessage(obj, PublicKey.codec()) } diff --git a/src/metrics/index.ts b/src/metrics/index.ts index 6c4429bb2f..80f1c5fa1d 100644 --- a/src/metrics/index.ts +++ b/src/metrics/index.ts @@ -6,7 +6,6 @@ import { DefaultStats, StatsInit } from './stats.js' import type { ComponentMetricsUpdate, Metrics, Stats, TrackedMetric, TrackStreamOptions } from '@libp2p/interface-metrics' import type { PeerId } from '@libp2p/interface-peer-id' import type { Startable } from '@libp2p/interfaces/startable' -import type { Duplex } from 'it-stream-types' const initialCounters: ['dataReceived', 'dataSent'] = [ 'dataReceived', @@ -263,11 +262,11 @@ export class DefaultMetrics implements Metrics, Startable { * When the `PeerId` is known, `Metrics.updatePlaceholder` should be called * with the placeholder string returned from here, and the known `PeerId`. */ - trackStream > (opts: TrackStreamOptions): T { + trackStream (opts: TrackStreamOptions): void { const { stream, remotePeer, protocol } = opts if (!this.running) { - return stream + return } const source = stream.source @@ -275,7 +274,7 @@ export class DefaultMetrics implements Metrics, Startable { remotePeer, protocol, direction: 'in', - dataLength: chunk.length + dataLength: chunk.byteLength })) const sink = stream.sink @@ -287,14 +286,12 @@ export class DefaultMetrics implements Metrics, Startable { remotePeer, protocol, direction: 'out', - dataLength: chunk.length + dataLength: chunk.byteLength }) }), sink ) } - - return stream } } diff --git a/src/ping/index.ts b/src/ping/index.ts index 8eb83fb121..e5f7386509 100644 --- a/src/ping/index.ts +++ b/src/ping/index.ts @@ -113,7 +113,7 @@ export class PingService implements Startable { ) const end = Date.now() - if (result == null || !uint8ArrayEquals(data, result)) { + if (result == null || !uint8ArrayEquals(data, result.subarray())) { throw errCode(new Error('Received wrong ping ack'), codes.ERR_WRONG_PING_ACK) } diff --git a/src/pnet/index.ts b/src/pnet/index.ts index 2a14d52dfb..e1333083d7 100644 --- a/src/pnet/index.ts +++ b/src/pnet/index.ts @@ -13,6 +13,7 @@ import { import { handshake } from 'it-handshake' import { NONCE_LENGTH } from './key-generator.js' import type { ConnectionProtector, MultiaddrConnection } from '@libp2p/interface-connection' +import map from 'it-map' const log = logger('libp2p:pnet') @@ -83,6 +84,7 @@ export class PreSharedKeyConnectionProtector implements ConnectionProtector { // Encrypt all outbound traffic createBoxStream(localNonce, this.psk), shake.stream, + (source) => map(source, (buf) => buf.subarray()), // Decrypt all inbound traffic createUnboxStream(remoteNonce, this.psk), external diff --git a/src/upgrader.ts b/src/upgrader.ts index 1c5919fe1a..8d314bb135 100644 --- a/src/upgrader.ts +++ b/src/upgrader.ts @@ -1,6 +1,6 @@ import { logger } from '@libp2p/logger' import errCode from 'err-code' -import { Dialer, Listener } from '@libp2p/multistream-select' +import * as mss from '@libp2p/multistream-select' import { pipe } from 'it-pipe' // @ts-expect-error mutable-proxy does not export types import mutableProxy from 'mutable-proxy' @@ -152,7 +152,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg ({ setTarget: setPeer, proxy: proxyPeer } = mutableProxy()) const idString = `${(Math.random() * 1e9).toString(36)}${Date.now()}` setPeer({ toString: () => idString }) - maConn = metrics.trackStream({ stream: maConn, remotePeer: proxyPeer }) + metrics.trackStream({ stream: maConn, remotePeer: proxyPeer }) } log('starting the inbound connection upgrade') @@ -253,7 +253,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg ({ setTarget: setPeer, proxy: proxyPeer } = mutableProxy()) const idString = `${(Math.random() * 1e9).toString(36)}${Date.now()}` setPeer({ toB58String: () => idString }) - maConn = metrics.trackStream({ stream: maConn, remotePeer: proxyPeer }) + metrics.trackStream({ stream: maConn, remotePeer: proxyPeer }) } log('Starting the outbound connection upgrade') @@ -351,9 +351,8 @@ export class DefaultUpgrader extends EventEmitter implements Upg void Promise.resolve() .then(async () => { - const mss = new Listener(muxedStream) const protocols = this.components.getRegistrar().getProtocols() - const { stream, protocol } = await mss.handle(protocols) + const { stream, protocol } = await mss.handle(muxedStream, protocols) log('%s: incoming stream opened on %s', direction, protocol) const metrics = this.components.getMetrics() @@ -405,7 +404,6 @@ export class DefaultUpgrader extends EventEmitter implements Upg log('%s: starting new stream on %s', direction, protocols) const muxedStream = muxer.newStream() - const mss = new Dialer(muxedStream) const metrics = this.components.getMetrics() let controller: TimeoutController | undefined @@ -422,10 +420,10 @@ export class DefaultUpgrader extends EventEmitter implements Upg } catch {} } - let { stream, protocol } = await mss.select(protocols, options) + const { stream, protocol } = await mss.select(muxedStream, protocols, options) if (metrics != null) { - stream = metrics.trackStream({ stream, remotePeer, protocol }) + metrics.trackStream({ stream, remotePeer, protocol }) } const outgoingLimit = findOutgoingStreamLimit(protocol, this.components.getRegistrar()) @@ -545,12 +543,13 @@ export class DefaultUpgrader extends EventEmitter implements Upg * Attempts to encrypt the incoming `connection` with the provided `cryptos` */ async _encryptInbound (connection: Duplex): Promise { - const mss = new Listener(connection) const protocols = Array.from(this.connectionEncryption.keys()) log('handling inbound crypto protocol selection', protocols) try { - const { stream, protocol } = await mss.handle(protocols) + const { stream, protocol } = await mss.handle(connection, protocols, { + writeBytes: true + }) const encrypter = this.connectionEncryption.get(protocol) if (encrypter == null) { @@ -573,12 +572,13 @@ export class DefaultUpgrader extends EventEmitter implements Upg * The first `ConnectionEncrypter` module to succeed will be used */ async _encryptOutbound (connection: MultiaddrConnection, remotePeerId: PeerId): Promise { - const mss = new Dialer(connection) const protocols = Array.from(this.connectionEncryption.keys()) log('selecting outbound crypto protocol', protocols) try { - const { stream, protocol } = await mss.select(protocols) + const { stream, protocol } = await mss.select(connection, protocols, { + writeBytes: true + }) const encrypter = this.connectionEncryption.get(protocol) if (encrypter == null) { @@ -601,11 +601,12 @@ export class DefaultUpgrader extends EventEmitter implements Upg * muxer will be used for all future streams on the connection. */ async _multiplexOutbound (connection: MultiaddrConnection, muxers: Map): Promise<{ stream: Duplex, muxerFactory?: StreamMuxerFactory}> { - const dialer = new Dialer(connection) const protocols = Array.from(muxers.keys()) log('outbound selecting muxer %s', protocols) try { - const { stream, protocol } = await dialer.select(protocols) + const { stream, protocol } = await mss.select(connection, protocols, { + writeBytes: true + }) log('%s selected as muxer protocol', protocol) const muxerFactory = muxers.get(protocol) return { stream, muxerFactory } @@ -620,11 +621,12 @@ export class DefaultUpgrader extends EventEmitter implements Upg * selected muxer will be used for all future streams on the connection. */ async _multiplexInbound (connection: MultiaddrConnection, muxers: Map): Promise<{ stream: Duplex, muxerFactory?: StreamMuxerFactory}> { - const listener = new Listener(connection) const protocols = Array.from(muxers.keys()) log('inbound handling muxers %s', protocols) try { - const { stream, protocol } = await listener.handle(protocols) + const { stream, protocol } = await mss.handle(connection, protocols, { + writeBytes: true + }) const muxerFactory = muxers.get(protocol) return { stream, muxerFactory } } catch (err: any) { diff --git a/test/configuration/utils.ts b/test/configuration/utils.ts index bc587e5cd9..a34385c9e3 100644 --- a/test/configuration/utils.ts +++ b/test/configuration/utils.ts @@ -10,7 +10,6 @@ import type { Libp2pInit, Libp2pOptions } from '../../src/index.js' import type { PeerId } from '@libp2p/interface-peer-id' import * as cborg from 'cborg' import { peerIdFromString } from '@libp2p/peer-id' -import { Uint8ArrayList } from 'uint8arraylist' const relayAddr = MULTIADDRS_WEBSOCKETS[0] @@ -33,16 +32,16 @@ class MockPubSub extends PubSubBaseProtocol { return cborg.decode(bytes) } - encodeRpc (rpc: PubSubRPC): Uint8ArrayList { - return new Uint8ArrayList(cborg.encode(rpc)) + encodeRpc (rpc: PubSubRPC): Uint8Array { + return cborg.encode(rpc) } decodeMessage (bytes: Uint8Array): PubSubRPCMessage { return cborg.decode(bytes) } - encodeMessage (rpc: PubSubRPCMessage): Uint8ArrayList { - return new Uint8ArrayList(cborg.encode(rpc)) + encodeMessage (rpc: PubSubRPCMessage): Uint8Array { + return cborg.encode(rpc) } async publishMessage (from: PeerId, message: Message): Promise { diff --git a/test/core/consume-peer-record.spec.ts b/test/core/consume-peer-record.spec.ts index 70dad598dd..0f7a126ce0 100644 --- a/test/core/consume-peer-record.spec.ts +++ b/test/core/consume-peer-record.spec.ts @@ -1,7 +1,7 @@ /* eslint-env mocha */ import { WebSockets } from '@libp2p/websockets' -import { NOISE } from '@chainsafe/libp2p-noise' +import { Plaintext } from '../../src/insecure/index.js' import { createPeerId } from '../utils/creators/peer.js' import { Multiaddr } from '@multiformats/multiaddr' import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js' @@ -18,7 +18,7 @@ describe('Consume peer record', () => { new WebSockets() ], connectionEncryption: [ - NOISE + new Plaintext() ] } libp2p = await createLibp2pNode(config) diff --git a/test/core/encryption.spec.ts b/test/core/encryption.spec.ts index 54d4d4c8c2..e8f52b80f8 100644 --- a/test/core/encryption.spec.ts +++ b/test/core/encryption.spec.ts @@ -2,7 +2,7 @@ import { expect } from 'aegir/chai' import { WebSockets } from '@libp2p/websockets' -import { NOISE } from '@chainsafe/libp2p-noise' +import { Plaintext } from '../../src/insecure/index.js' import { createLibp2p, Libp2pOptions } from '../../src/index.js' import { codes as ErrorCodes } from '../../src/errors.js' import { createPeerId } from '../utils/creators/peer.js' @@ -46,7 +46,7 @@ describe('Connection encryption configuration', () => { new WebSockets() ], connectionEncryption: [ - NOISE + new Plaintext() ] } await createLibp2p(config) diff --git a/test/core/get-public-key.spec.ts b/test/core/get-public-key.spec.ts index c72f695dd4..041d9c4971 100644 --- a/test/core/get-public-key.spec.ts +++ b/test/core/get-public-key.spec.ts @@ -2,7 +2,7 @@ import { expect } from 'aegir/chai' import { WebSockets } from '@libp2p/websockets' -import { NOISE } from '@chainsafe/libp2p-noise' +import { Plaintext } from '../../src/insecure/index.js' import { createPeerId } from '../utils/creators/peer.js' import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js' import type { Libp2pOptions } from '../../src/index.js' @@ -20,7 +20,7 @@ describe('getPublicKey', () => { new WebSockets() ], connectionEncryption: [ - NOISE + new Plaintext() ], dht: new KadDHT() } diff --git a/test/core/listening.node.ts b/test/core/listening.node.ts index fdb7e66246..41eeddf84b 100644 --- a/test/core/listening.node.ts +++ b/test/core/listening.node.ts @@ -2,7 +2,7 @@ import { expect } from 'aegir/chai' import { TCP } from '@libp2p/tcp' -import { NOISE } from '@chainsafe/libp2p-noise' +import { Plaintext } from '../../src/insecure/index.js' import { createPeerId } from '../utils/creators/peer.js' import type { PeerId } from '@libp2p/interface-peer-id' import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js' @@ -31,7 +31,7 @@ describe('Listening', () => { new TCP() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) diff --git a/test/dialing/direct.node.ts b/test/dialing/direct.node.ts index 19091bf3f4..d4965adba0 100644 --- a/test/dialing/direct.node.ts +++ b/test/dialing/direct.node.ts @@ -4,7 +4,7 @@ import { expect } from 'aegir/chai' import sinon from 'sinon' import { TCP } from '@libp2p/tcp' import { Mplex } from '@libp2p/mplex' -import { NOISE } from '@chainsafe/libp2p-noise' +import { Plaintext } from '../../src/insecure/index.js' import { Multiaddr } from '@multiformats/multiaddr' import delay from 'delay' @@ -245,7 +245,7 @@ describe('libp2p.dialer (direct, TCP)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) await remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => { @@ -278,7 +278,7 @@ describe('libp2p.dialer (direct, TCP)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -298,7 +298,7 @@ describe('libp2p.dialer (direct, TCP)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -325,7 +325,7 @@ describe('libp2p.dialer (direct, TCP)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -354,7 +354,7 @@ describe('libp2p.dialer (direct, TCP)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -418,7 +418,7 @@ describe('libp2p.dialer (direct, TCP)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -444,7 +444,7 @@ describe('libp2p.dialer (direct, TCP)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -467,7 +467,7 @@ describe('libp2p.dialer (direct, TCP)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ], connectionProtector: new PreSharedKeyConnectionProtector({ psk: swarmKeyBuffer @@ -505,7 +505,7 @@ describe('libp2p.dialer (direct, TCP)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -541,7 +541,7 @@ describe('libp2p.dialer (direct, TCP)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) diff --git a/test/dialing/direct.spec.ts b/test/dialing/direct.spec.ts index 846800a92e..f20ad79c49 100644 --- a/test/dialing/direct.spec.ts +++ b/test/dialing/direct.spec.ts @@ -7,7 +7,7 @@ import delay from 'delay' import { WebSockets } from '@libp2p/websockets' import * as filters from '@libp2p/websockets/filters' import { Mplex } from '@libp2p/mplex' -import { NOISE } from '@chainsafe/libp2p-noise' +import { Plaintext } from '../../src/insecure/index.js' import { Multiaddr } from '@multiformats/multiaddr' import { AbortError } from '@libp2p/interfaces/errors' import { MemoryDatastore } from 'datastore-core/memory' @@ -360,7 +360,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -385,7 +385,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ], connectionManager: { maxParallelDials: 10, @@ -416,7 +416,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -450,7 +450,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -490,7 +490,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -518,7 +518,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -539,7 +539,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -576,7 +576,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -602,7 +602,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) diff --git a/test/fetch/fetch.node.ts b/test/fetch/fetch.node.ts index 32a089d96a..82a27d2aee 100644 --- a/test/fetch/fetch.node.ts +++ b/test/fetch/fetch.node.ts @@ -4,7 +4,7 @@ import { expect } from 'aegir/chai' import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js' import { TCP } from '@libp2p/tcp' import { Mplex } from '@libp2p/mplex' -import { NOISE } from '@chainsafe/libp2p-noise' +import { Plaintext } from '../../src/insecure/index.js' import { createPeerId } from '../utils/creators/peer.js' import { codes } from '../../src/errors.js' import type { PeerId } from '@libp2p/interface-peer-id' @@ -22,7 +22,7 @@ async function createNode (peerId: PeerId) { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) } diff --git a/test/metrics/index.node.ts b/test/metrics/index.node.ts index 89296b1e7b..380989eb24 100644 --- a/test/metrics/index.node.ts +++ b/test/metrics/index.node.ts @@ -13,6 +13,7 @@ import type { Libp2pOptions } from '../../src/index.js' import type { DefaultMetrics } from '../../src/metrics/index.js' import pWaitFor from 'p-wait-for' import drain from 'it-drain' +import map from 'it-map' describe('libp2p.metrics', () => { let libp2p: Libp2pNode @@ -97,7 +98,7 @@ describe('libp2p.metrics', () => { const result = await pipe( [bytes], stream, - async (source) => await toBuffer(source) + async (source) => await toBuffer(map(source, (list) => list.subarray())) ) // Flush the call stack diff --git a/test/transports/transport-manager.spec.ts b/test/transports/transport-manager.spec.ts index 8ca53aa96e..348ef695d6 100644 --- a/test/transports/transport-manager.spec.ts +++ b/test/transports/transport-manager.spec.ts @@ -5,7 +5,7 @@ import sinon from 'sinon' import { Multiaddr } from '@multiformats/multiaddr' import { WebSockets } from '@libp2p/websockets' import * as filters from '@libp2p/websockets/filters' -import { NOISE } from '@chainsafe/libp2p-noise' +import { Plaintext } from '../../src/insecure/index.js' import { DefaultAddressManager } from '../../src/address-manager/index.js' import { DefaultTransportManager, FaultTolerance } from '../../src/transport-manager.js' import { mockUpgrader } from '@libp2p/interface-mocks' @@ -109,7 +109,7 @@ describe('libp2p.transportManager (dial only)', () => { listen: ['/ip4/127.0.0.1/tcp/0'] }, transports: [new WebSockets()], - connectionEncryption: [NOISE] + connectionEncryption: [new Plaintext()] }) await expect(libp2p.start()).to.eventually.be.rejected @@ -129,7 +129,7 @@ describe('libp2p.transportManager (dial only)', () => { new WebSockets() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -149,7 +149,7 @@ describe('libp2p.transportManager (dial only)', () => { new WebSockets() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) diff --git a/test/upgrading/upgrader.spec.ts b/test/upgrading/upgrader.spec.ts index 9a8bf95e74..48ca372e2c 100644 --- a/test/upgrading/upgrader.spec.ts +++ b/test/upgrading/upgrader.spec.ts @@ -8,7 +8,6 @@ import { pipe } from 'it-pipe' import all from 'it-all' import pSettle from 'p-settle' import { WebSockets } from '@libp2p/websockets' -import { NOISE } from '@chainsafe/libp2p-noise' import { PreSharedKeyConnectionProtector } from '../../src/pnet/index.js' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import swarmKey from '../fixtures/swarm.key.js' @@ -30,6 +29,7 @@ import { pEvent } from 'p-event' import { TimeoutController } from 'timeout-abort-controller' import delay from 'delay' import drain from 'it-drain' +import { Uint8ArrayList } from 'uint8arraylist' const addrs = [ new Multiaddr('/ip4/127.0.0.1/tcp/0'), @@ -409,7 +409,7 @@ describe('Upgrader', () => { source: (async function * () { // longer than the timeout await delay(1000) - yield new Uint8Array() + yield new Uint8ArrayList() }()), sink: drain }) @@ -479,7 +479,7 @@ describe('libp2p.upgrader', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ], connectionProtector: new PreSharedKeyConnectionProtector({ psk: uint8ArrayFromString(swarmKey) @@ -501,7 +501,7 @@ describe('libp2p.upgrader', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) await libp2p.start() @@ -517,7 +517,7 @@ describe('libp2p.upgrader', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) await remoteLibp2p.start() @@ -548,7 +548,7 @@ describe('libp2p.upgrader', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) await libp2p.start() @@ -562,7 +562,7 @@ describe('libp2p.upgrader', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) await remoteLibp2p.start() @@ -607,7 +607,7 @@ describe('libp2p.upgrader', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) await libp2p.start() @@ -621,7 +621,7 @@ describe('libp2p.upgrader', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) await remoteLibp2p.start() @@ -669,7 +669,7 @@ describe('libp2p.upgrader', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) await libp2p.start() @@ -683,7 +683,7 @@ describe('libp2p.upgrader', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) await remoteLibp2p.start()