From de30c2cec79d1e9d758cbcddc11d315b17843343 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Fri, 17 Jun 2022 14:46:31 +0100 Subject: [PATCH] feat!: limit protocol streams per-connection (#1255) * feat: limit protocol streams per-connection Uses the `maxInboundStreams` and `maxOutboundStreams` of the `registrar.handle` opts to limit the number of concurrent streams open on each connection on a per-protocol basis. Both values default to 1 so some tuning will be necessary to set appropriate values for some protocols. * chore: make error codes consistent * chore: fix up examples --- .gitignore | 2 +- doc/API.md | 8 +- examples/chat/src/dialer.js | 2 +- examples/connection-encryption/1.js | 2 +- examples/delegated-routing/package.json | 8 +- examples/echo/src/dialer.js | 2 +- examples/libp2p-in-the-browser/package.json | 6 +- examples/package.json | 2 +- examples/pnet/index.js | 2 +- examples/protocol-and-stream-muxing/1.js | 4 +- examples/protocol-and-stream-muxing/2.js | 11 +- examples/protocol-and-stream-muxing/3.js | 4 +- examples/protocol-and-stream-muxing/README.md | 20 +-- examples/transports/2.js | 2 +- examples/transports/3.js | 4 +- examples/transports/4.js | 2 +- examples/transports/README.md | 6 +- examples/webrtc-direct/dialer.js | 2 +- examples/webrtc-direct/listener.js | 2 +- examples/webrtc-direct/package.json | 2 +- package.json | 32 ++-- src/circuit/circuit/hop.ts | 4 +- src/circuit/circuit/stop.ts | 2 +- src/config.ts | 14 +- src/errors.ts | 5 +- src/fetch/index.ts | 133 ++++++++++------ src/identify/index.ts | 17 ++- src/index.ts | 8 +- src/libp2p.ts | 6 +- src/ping/index.ts | 11 +- src/registrar.ts | 12 +- src/upgrader.ts | 88 +++++++++-- test/content-routing/dht/operation.node.ts | 4 +- test/dialing/direct.node.ts | 14 +- test/dialing/direct.spec.ts | 4 +- test/fetch/index.spec.ts | 12 +- test/identify/index.spec.ts | 17 ++- test/identify/push.spec.ts | 14 +- test/metrics/index.node.ts | 4 +- test/ping/index.spec.ts | 12 +- test/ping/ping.node.ts | 7 +- test/relay/relay.node.ts | 4 +- test/upgrading/upgrader.spec.ts | 144 +++++++++++++++++- 43 files changed, 476 insertions(+), 185 deletions(-) diff --git a/.gitignore b/.gitignore index 69f5439f9c..7e4d369a6d 100644 --- a/.gitignore +++ b/.gitignore @@ -10,7 +10,7 @@ test/repo-tests* logs *.log -coverage +.coverage .nyc_output # Runtime data diff --git a/doc/API.md b/doc/API.md index d8ba8fd83e..c26a8c71f2 100644 --- a/doc/API.md +++ b/doc/API.md @@ -389,7 +389,7 @@ await libp2p.hangUp(remotePeerId) Sets up [multistream-select routing](https://github.com/multiformats/multistream-select) of protocols to their application handlers. Whenever a stream is opened on one of the provided protocols, the handler will be called. `handle` must be called in order to register a handler and support for a given protocol. This also informs other peers of the protocols you support. -`libp2p.handle(protocols, handler)` +`libp2p.handle(protocols, handler, options)` In the event of a new handler for the same protocol being added, the first one is discarded. @@ -399,6 +399,7 @@ In the event of a new handler for the same protocol being added, the first one i |------|------|-------------| | protocols | `Array|string` | protocols to register | | handler | `function({ connection:*, stream:*, protocol:string })` | handler to call | +| options | `StreamHandlerOptions` | Options including protocol stream limits | #### Example @@ -409,7 +410,10 @@ const handler = ({ connection, stream, protocol }) => { // use stream or connection according to the needs } -libp2p.handle('/echo/1.0.0', handler) +libp2p.handle('/echo/1.0.0', handler, { + maxInboundStreams: 5, + maxOutboundStreams: 5 +}) ``` ### unhandle diff --git a/examples/chat/src/dialer.js b/examples/chat/src/dialer.js index e9d59262e5..8f37e7551b 100644 --- a/examples/chat/src/dialer.js +++ b/examples/chat/src/dialer.js @@ -32,7 +32,7 @@ async function run () { // Dial to the remote peer (the "listener") const listenerMa = new Multiaddr(`/ip4/127.0.0.1/tcp/10333/p2p/${idListener.toString()}`) - const { stream } = await nodeDialer.dialProtocol(listenerMa, '/chat/1.0.0') + const stream = await nodeDialer.dialProtocol(listenerMa, '/chat/1.0.0') console.log('Dialer dialed to listener on protocol: /chat/1.0.0') console.log('Type a message and see what happens') diff --git a/examples/connection-encryption/1.js b/examples/connection-encryption/1.js index eafb4ff7e7..0e774fcb99 100644 --- a/examples/connection-encryption/1.js +++ b/examples/connection-encryption/1.js @@ -40,7 +40,7 @@ const createNode = async () => { ) }) - const { stream } = await node1.dialProtocol(node2.peerId, '/a-protocol') + const stream = await node1.dialProtocol(node2.peerId, '/a-protocol') await pipe( [uint8ArrayFromString('This information is sent out encrypted to the other peer')], diff --git a/examples/delegated-routing/package.json b/examples/delegated-routing/package.json index 399cc0afbb..3892af938f 100644 --- a/examples/delegated-routing/package.json +++ b/examples/delegated-routing/package.json @@ -8,10 +8,10 @@ "libp2p": "../../", "@libp2p/delegated-content-routing": "^2.0.1", "@libp2p/delegated-peer-routing": "^2.0.1", - "@libp2p/kad-dht": "^2.0.0", - "@libp2p/mplex": "^2.0.0", - "@libp2p/webrtc-star": "^2.0.0", - "@libp2p/websockets": "^2.0.0", + "@libp2p/kad-dht": "^3.0.0", + "@libp2p/mplex": "^3.0.0", + "@libp2p/webrtc-star": "^2.0.1", + "@libp2p/websockets": "^3.0.0", "react": "^17.0.2", "react-dom": "^17.0.2", "react-scripts": "5.0.0" diff --git a/examples/echo/src/dialer.js b/examples/echo/src/dialer.js index 74a6508798..435a9ea220 100644 --- a/examples/echo/src/dialer.js +++ b/examples/echo/src/dialer.js @@ -37,7 +37,7 @@ async function run() { // Dial the listener node console.log('Dialing to peer:', listenerMultiaddr) - const { stream } = await dialerNode.dialProtocol(listenerMultiaddr, '/echo/1.0.0') + const stream = await dialerNode.dialProtocol(listenerMultiaddr, '/echo/1.0.0') console.log('nodeA dialed to nodeB on protocol: /echo/1.0.0') diff --git a/examples/libp2p-in-the-browser/package.json b/examples/libp2p-in-the-browser/package.json index 6dbc839662..4736108cae 100644 --- a/examples/libp2p-in-the-browser/package.json +++ b/examples/libp2p-in-the-browser/package.json @@ -11,9 +11,9 @@ "dependencies": { "@chainsafe/libp2p-noise": "^6.2.0", "@libp2p/bootstrap": "^2.0.0", - "@libp2p/mplex": "^2.0.0", - "@libp2p/webrtc-star": "^2.0.0", - "@libp2p/websockets": "^2.0.0", + "@libp2p/mplex": "^3.0.0", + "@libp2p/webrtc-star": "^2.0.1", + "@libp2p/websockets": "^3.0.0", "libp2p": "../../" }, "devDependencies": { diff --git a/examples/package.json b/examples/package.json index 70984696c5..a699ca30c0 100644 --- a/examples/package.json +++ b/examples/package.json @@ -10,7 +10,7 @@ "license": "MIT", "dependencies": { "@libp2p/pubsub-peer-discovery": "^6.0.0", - "@libp2p/floodsub": "^2.0.0", + "@libp2p/floodsub": "^3.0.0", "@nodeutils/defaults-deep": "^1.1.0", "execa": "^2.1.0", "fs-extra": "^8.1.0", diff --git a/examples/pnet/index.js b/examples/pnet/index.js index 59adbaa88f..fa8268df84 100644 --- a/examples/pnet/index.js +++ b/examples/pnet/index.js @@ -43,7 +43,7 @@ generateKey(otherSwarmKey) ) }) - const { stream } = await node1.dialProtocol(node2.peerId, '/private') + const stream = await node1.dialProtocol(node2.peerId, '/private') await pipe( [uint8ArrayFromString('This message is sent on a private network')], diff --git a/examples/protocol-and-stream-muxing/1.js b/examples/protocol-and-stream-muxing/1.js index 364bffdbc7..f1ab2f20df 100644 --- a/examples/protocol-and-stream-muxing/1.js +++ b/examples/protocol-and-stream-muxing/1.js @@ -60,14 +60,14 @@ const createNode = async () => { }) */ - const { stream } = await node1.dialProtocol(node2.peerId, ['/your-protocol']) + const stream = await node1.dialProtocol(node2.peerId, ['/your-protocol']) await pipe( [uint8ArrayFromString('my own protocol, wow!')], stream ) /* - const { stream } = node1.dialProtocol(node2.peerId, ['/another-protocol/1.0.0']) + const stream = node1.dialProtocol(node2.peerId, ['/another-protocol/1.0.0']) await pipe( ['my own protocol, wow!'], diff --git a/examples/protocol-and-stream-muxing/2.js b/examples/protocol-and-stream-muxing/2.js index 44b0b1ebb3..2605938d20 100644 --- a/examples/protocol-and-stream-muxing/2.js +++ b/examples/protocol-and-stream-muxing/2.js @@ -38,22 +38,25 @@ const createNode = async () => { console.log(`from: ${protocol}, msg: ${uint8ArrayToString(msg)}`) } } - ) + ).finally(() => { + // clean up resources + stream.close() + }) }) - const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/a']) + const stream1 = await node1.dialProtocol(node2.peerId, ['/a']) await pipe( [uint8ArrayFromString('protocol (a)')], stream1 ) - const { stream: stream2 } = await node1.dialProtocol(node2.peerId, ['/b']) + const stream2 = await node1.dialProtocol(node2.peerId, ['/b']) await pipe( [uint8ArrayFromString('protocol (b)')], stream2 ) - const { stream: stream3 } = await node1.dialProtocol(node2.peerId, ['/b']) + const stream3 = await node1.dialProtocol(node2.peerId, ['/b']) await pipe( [uint8ArrayFromString('another stream on protocol (b)')], stream3 diff --git a/examples/protocol-and-stream-muxing/3.js b/examples/protocol-and-stream-muxing/3.js index a368743509..af63bdea8e 100644 --- a/examples/protocol-and-stream-muxing/3.js +++ b/examples/protocol-and-stream-muxing/3.js @@ -54,13 +54,13 @@ const createNode = async () => { ) }) - const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/node-2']) + const stream1 = await node1.dialProtocol(node2.peerId, ['/node-2']) await pipe( [uint8ArrayFromString('from 1 to 2')], stream1 ) - const { stream: stream2 } = await node2.dialProtocol(node1.peerId, ['/node-1']) + const stream2 = await node2.dialProtocol(node1.peerId, ['/node-1']) await pipe( [uint8ArrayFromString('from 2 to 1')], stream2 diff --git a/examples/protocol-and-stream-muxing/README.md b/examples/protocol-and-stream-muxing/README.md index cb34a65dba..3e76b3966b 100644 --- a/examples/protocol-and-stream-muxing/README.md +++ b/examples/protocol-and-stream-muxing/README.md @@ -40,7 +40,7 @@ node2.handle('/your-protocol', ({ stream }) => { After the protocol is _handled_, now we can dial to it. ```JavaScript -const { stream } = await node1.dialProtocol(node2.peerId, ['/your-protocol']) +const stream = await node1.dialProtocol(node2.peerId, ['/your-protocol']) await pipe( ['my own protocol, wow!'], @@ -62,7 +62,7 @@ node2.handle('/another-protocol/1.0.1', ({ stream }) => { ) }) // ... -const { stream } = await node1.dialProtocol(node2.peerId, ['/another-protocol/1.0.0']) +const stream = await node1.dialProtocol(node2.peerId, ['/another-protocol/1.0.0']) await pipe( ['my own protocol, wow!'], @@ -133,19 +133,19 @@ node2.handle(['/a', '/b'], ({ protocol, stream }) => { ) }) -const { stream } = await node1.dialProtocol(node2.peerId, ['/a']) +const stream = await node1.dialProtocol(node2.peerId, ['/a']) await pipe( ['protocol (a)'], stream ) -const { stream: stream2 } = await node1.dialProtocol(node2.peerId, ['/b']) +const stream2 = await node1.dialProtocol(node2.peerId, ['/b']) await pipe( ['protocol (b)'], stream2 ) -const { stream: stream3 } = await node1.dialProtocol(node2.peerId, ['/b']) +const stream3 = await node1.dialProtocol(node2.peerId, ['/b']) await pipe( ['another stream on protocol (b)'], stream3 @@ -167,7 +167,7 @@ There is one last trick on _protocol and stream multiplexing_ that libp2p uses t With the aid of both mechanisms, we can reuse an incomming connection to dial streams out too, this is specially useful when you are behind tricky NAT, firewalls or if you are running in a browser, where you can't have listening addrs, but you can dial out. By dialing out, you enable other peers to talk with you in Protocols that they want, simply by opening a new multiplexed stream. -You can see this working on example [3.js](./3.js). +You can see this working on example [3.js](./3.js). As we've seen earlier, we can create our node with this createNode function. ```js @@ -229,14 +229,14 @@ node2.handle('/node-2', ({ stream }) => { }) // Dialing node2 from node1 -const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/node-2']) +const stream1 = await node1.dialProtocol(node2.peerId, ['/node-2']) await pipe( ['from 1 to 2'], stream1 ) // Dialing node1 from node2 -const { stream: stream2 } = await node2.dialProtocol(node1.peerId, ['/node-1']) +const stream2 = await node2.dialProtocol(node1.peerId, ['/node-1']) await pipe( ['from 2 to 1'], stream2 @@ -256,14 +256,14 @@ So, we have successfully set up a bidirectional connection with protocol muxing. The code below will result into an error as `the dial address is not valid`. ```js // Dialing from node2 to node1 -const { stream: stream2 } = await node2.dialProtocol(node1.peerId, ['/node-1']) +const stream2 = await node2.dialProtocol(node1.peerId, ['/node-1']) await pipe( ['from 2 to 1'], stream2 ) // Dialing from node1 to node2 -const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/node-2']) +const stream1 = await node1.dialProtocol(node2.peerId, ['/node-2']) await pipe( ['from 1 to 2'], stream1 diff --git a/examples/transports/2.js b/examples/transports/2.js index d157da1191..9dee88780c 100644 --- a/examples/transports/2.js +++ b/examples/transports/2.js @@ -48,7 +48,7 @@ function printAddrs (node, number) { }) await node1.peerStore.addressBook.set(node2.peerId, node2.getMultiaddrs()) - const { stream } = await node1.dialProtocol(node2.peerId, '/print') + const stream = await node1.dialProtocol(node2.peerId, '/print') await pipe( ['Hello', ' ', 'p2p', ' ', 'world', '!'].map(str => uint8ArrayFromString(str)), diff --git a/examples/transports/3.js b/examples/transports/3.js index b1a233f867..0bc9fa7097 100644 --- a/examples/transports/3.js +++ b/examples/transports/3.js @@ -63,14 +63,14 @@ function print ({ stream }) { await node3.peerStore.addressBook.set(node1.peerId, node1.getMultiaddrs()) // node 1 (TCP) dials to node 2 (TCP+WebSockets) - const { stream } = await node1.dialProtocol(node2.peerId, '/print') + const stream = await node1.dialProtocol(node2.peerId, '/print') await pipe( [uint8ArrayFromString('node 1 dialed to node 2 successfully')], stream ) // node 2 (TCP+WebSockets) dials to node 2 (WebSockets) - const { stream: stream2 } = await node2.dialProtocol(node3.peerId, '/print') + const stream2 = await node2.dialProtocol(node3.peerId, '/print') await pipe( [uint8ArrayFromString('node 2 dialed to node 3 successfully')], stream2 diff --git a/examples/transports/4.js b/examples/transports/4.js index 389217aaf4..0e13c569a2 100644 --- a/examples/transports/4.js +++ b/examples/transports/4.js @@ -78,7 +78,7 @@ function print ({ stream }) { const targetAddr = node1.getMultiaddrs()[0]; // node 2 (Secure WebSockets) dials to node 1 (Secure Websockets) - const { stream } = await node2.dialProtocol(targetAddr, '/print') + const stream = await node2.dialProtocol(targetAddr, '/print') await pipe( [uint8ArrayFromString('node 2 dialed to node 1 successfully')], stream diff --git a/examples/transports/README.md b/examples/transports/README.md index 1d3f5d4fd8..ca951834b9 100644 --- a/examples/transports/README.md +++ b/examples/transports/README.md @@ -139,7 +139,7 @@ Then add, }) await node1.peerStore.addressBook.set(node2.peerId, node2.multiaddrs) - const { stream } = await node1.dialProtocol(node2.peerId, '/print') + const stream = await node1.dialProtocol(node2.peerId, '/print') await pipe( ['Hello', ' ', 'p2p', ' ', 'world', '!'], @@ -225,14 +225,14 @@ await node2.peerStore.addressBook.set(node3.peerId, node3.multiaddrs) await node3.peerStore.addressBook.set(node1.peerId, node1.multiaddrs) // node 1 (TCP) dials to node 2 (TCP+WebSockets) -const { stream } = await node1.dialProtocol(node2.peerId, '/print') +const stream = await node1.dialProtocol(node2.peerId, '/print') await pipe( ['node 1 dialed to node 2 successfully'], stream ) // node 2 (TCP+WebSockets) dials to node 2 (WebSockets) -const { stream: stream2 } = await node2.dialProtocol(node3.peerId, '/print') +const stream2 = await node2.dialProtocol(node3.peerId, '/print') await pipe( ['node 2 dialed to node 3 successfully'], stream2 diff --git a/examples/webrtc-direct/dialer.js b/examples/webrtc-direct/dialer.js index d2357dfa8c..1663054cfb 100644 --- a/examples/webrtc-direct/dialer.js +++ b/examples/webrtc-direct/dialer.js @@ -1,5 +1,5 @@ import { createLibp2p } from 'libp2p' -import { WebRTCDirect } from '@achingbrain/webrtc-direct' +import { WebRTCDirect } from '@libp2p/webrtc-direct' import { Mplex } from '@libp2p/mplex' import { Noise } from '@chainsafe/libp2p-noise' import { Bootstrap } from '@libp2p/bootstrap' diff --git a/examples/webrtc-direct/listener.js b/examples/webrtc-direct/listener.js index e27cb66b61..8a822d0b4e 100644 --- a/examples/webrtc-direct/listener.js +++ b/examples/webrtc-direct/listener.js @@ -1,5 +1,5 @@ import { createLibp2p } from 'libp2p' -import { WebRTCDirect } from '@achingbrain/webrtc-direct' +import { WebRTCDirect } from '@libp2p/webrtc-direct' import { Mplex } from '@libp2p/mplex' import { Noise } from '@chainsafe/libp2p-noise' import { createFromJSON } from '@libp2p/peer-id-factory' diff --git a/examples/webrtc-direct/package.json b/examples/webrtc-direct/package.json index c1d34ffee2..57d33ee0a5 100644 --- a/examples/webrtc-direct/package.json +++ b/examples/webrtc-direct/package.json @@ -12,7 +12,7 @@ "@libp2p/webrtc-direct": "^2.0.0", "@chainsafe/libp2p-noise": "^6.2.0", "@libp2p/bootstrap": "^2.0.0", - "@libp2p/mplex": "^2.0.0", + "@libp2p/mplex": "^3.0.0", "libp2p": "../../", "wrtc": "^0.4.7" }, diff --git a/package.json b/package.json index 997908acdc..37e572e957 100644 --- a/package.json +++ b/package.json @@ -97,11 +97,11 @@ }, "dependencies": { "@achingbrain/nat-port-mapper": "^1.0.3", - "@libp2p/components": "^1.0.0", - "@libp2p/connection": "^3.0.0", + "@libp2p/components": "^2.0.0", + "@libp2p/connection": "^4.0.0", "@libp2p/crypto": "^1.0.0", "@libp2p/interface-address-manager": "^1.0.1", - "@libp2p/interface-connection": "^1.0.1", + "@libp2p/interface-connection": "^2.0.0", "@libp2p/interface-connection-encrypter": "^1.0.2", "@libp2p/interface-content-routing": "^1.0.1", "@libp2p/interface-dht": "^1.0.0", @@ -111,18 +111,18 @@ "@libp2p/interface-peer-info": "^1.0.1", "@libp2p/interface-peer-routing": "^1.0.0", "@libp2p/interface-peer-store": "^1.0.0", - "@libp2p/interface-pubsub": "^1.0.1", - "@libp2p/interface-registrar": "^1.0.0", + "@libp2p/interface-pubsub": "^1.0.3", + "@libp2p/interface-registrar": "^2.0.0", "@libp2p/interface-stream-muxer": "^1.0.1", "@libp2p/interface-transport": "^1.0.0", "@libp2p/interfaces": "^3.0.2", "@libp2p/logger": "^2.0.0", - "@libp2p/multistream-select": "^2.0.0", + "@libp2p/multistream-select": "^2.0.1", "@libp2p/peer-collections": "^1.0.2", "@libp2p/peer-id": "^1.1.10", "@libp2p/peer-id-factory": "^1.0.9", "@libp2p/peer-record": "^2.0.0", - "@libp2p/peer-store": "^2.0.0", + "@libp2p/peer-store": "^3.0.0", "@libp2p/tracked-map": "^1.0.5", "@libp2p/utils": "^2.0.0", "@multiformats/mafmt": "^11.0.2", @@ -171,24 +171,24 @@ "@libp2p/daemon-server": "^2.0.0", "@libp2p/delegated-content-routing": "^2.0.0", "@libp2p/delegated-peer-routing": "^2.0.0", - "@libp2p/floodsub": "^2.0.0", + "@libp2p/floodsub": "^3.0.0", "@libp2p/interface-compliance-tests": "^3.0.1", "@libp2p/interface-connection-encrypter-compliance-tests": "^1.0.0", - "@libp2p/interface-mocks": "^1.0.1", + "@libp2p/interface-mocks": "^2.0.0", "@libp2p/interop": "^2.0.0", - "@libp2p/kad-dht": "^2.0.0", + "@libp2p/kad-dht": "^3.0.0", "@libp2p/mdns": "^2.0.0", - "@libp2p/mplex": "^2.0.0", - "@libp2p/pubsub": "^2.0.0", - "@libp2p/tcp": "^2.0.0", - "@libp2p/topology": "^2.0.0", + "@libp2p/mplex": "^3.0.0", + "@libp2p/pubsub": "^3.0.1", + "@libp2p/tcp": "^3.0.0", + "@libp2p/topology": "^3.0.0", "@libp2p/webrtc-star": "^2.0.0", - "@libp2p/websockets": "^2.0.0", + "@libp2p/websockets": "^3.0.0", "@types/node-forge": "^1.0.0", "@types/p-fifo": "^1.0.0", "@types/varint": "^6.0.0", "@types/xsalsa20": "^1.1.0", - "aegir": "^37.0.9", + "aegir": "^37.3.0", "cborg": "^1.8.1", "delay": "^5.0.0", "execa": "^6.1.0", diff --git a/src/circuit/circuit/hop.ts b/src/circuit/circuit/hop.ts index 4f5474d150..d8bbe03ed6 100644 --- a/src/circuit/circuit/hop.ts +++ b/src/circuit/circuit/hop.ts @@ -134,7 +134,7 @@ export async function hop (options: HopConfig): Promise> { } = options // Create a new stream to the relay - const { stream } = await connection.newStream(RELAY_CODEC) + const stream = await connection.newStream(RELAY_CODEC) // Send the HOP request const streamHandler = new StreamHandler({ stream }) streamHandler.write(request) @@ -169,7 +169,7 @@ export async function canHop (options: CanHopOptions) { } = options // Create a new stream to the relay - const { stream } = await connection.newStream(RELAY_CODEC) + const stream = await connection.newStream(RELAY_CODEC) // Send the HOP request const streamHandler = new StreamHandler({ stream }) diff --git a/src/circuit/circuit/stop.ts b/src/circuit/circuit/stop.ts index c953ce56bb..75c97f66f1 100644 --- a/src/circuit/circuit/stop.ts +++ b/src/circuit/circuit/stop.ts @@ -56,7 +56,7 @@ export async function stop (options: StopOptions) { request } = options - const { stream } = await connection.newStream([RELAY_CODEC]) + const stream = await connection.newStream([RELAY_CODEC]) log('starting stop request to %p', connection.remotePeer) const streamHandler = new StreamHandler({ stream }) diff --git a/src/config.ts b/src/config.ts index 432d7e9a2b..f261914213 100644 --- a/src/config.ts +++ b/src/config.ts @@ -79,13 +79,21 @@ const DefaultConfig: Partial = { host: { agentVersion: AGENT_VERSION }, - timeout: 30000 + timeout: 30000, + maxInboundStreams: 1, + maxOutboundStreams: 1, + maxPushIncomingStreams: 1, + maxPushOutgoingStreams: 1 }, ping: { - protocolPrefix: 'ipfs' + protocolPrefix: 'ipfs', + maxInboundStreams: 1, + maxOutboundStreams: 1 }, fetch: { - protocolPrefix: 'libp2p' + protocolPrefix: 'libp2p', + maxInboundStreams: 1, + maxOutboundStreams: 1 } } diff --git a/src/errors.ts b/src/errors.ts index ec02ed6002..50e2441d1a 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -70,5 +70,8 @@ export enum codes { ERR_NOT_IMPLEMENTED = 'ERR_NOT_IMPLEMENTED', ERR_WRONG_PING_ACK = 'ERR_WRONG_PING_ACK', ERR_INVALID_RECORD = 'ERR_INVALID_RECORD', - ERR_ALREADY_SUCCEEDED = 'ERR_ALREADY_SUCCEEDED' + ERR_ALREADY_SUCCEEDED = 'ERR_ALREADY_SUCCEEDED', + ERR_NO_HANDLER_FOR_PROTOCOL = 'ERR_NO_HANDLER_FOR_PROTOCOL', + ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS = 'ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS', + ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS = 'ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS' } diff --git a/src/fetch/index.ts b/src/fetch/index.ts index 218b182fe5..04d7efc817 100644 --- a/src/fetch/index.ts +++ b/src/fetch/index.ts @@ -3,7 +3,6 @@ import errCode from 'err-code' import { codes } from '../errors.js' import * as lp from 'it-length-prefixed' import { FetchRequest, FetchResponse } from './pb/proto.js' -import { handshake } from 'it-handshake' import { PROTOCOL_NAME, PROTOCOL_VERSION } from './constants.js' import type { PeerId } from '@libp2p/interface-peer-id' import type { Startable } from '@libp2p/interfaces/startable' @@ -13,11 +12,15 @@ import type { Components } from '@libp2p/components' import type { AbortOptions } from '@libp2p/interfaces' import type { Duplex } from 'it-stream-types' import { abortableDuplex } from 'abortable-iterator' +import { pipe } from 'it-pipe' +import first from 'it-first' const log = logger('libp2p:fetch') export interface FetchServiceInit { protocolPrefix: string + maxInboundStreams: number + maxOutboundStreams: number } export interface HandleMessageOptions { @@ -40,6 +43,7 @@ export class FetchService implements Startable { private readonly components: Components private readonly lookupFunctions: Map private started: boolean + private readonly init: FetchServiceInit constructor (components: Components, init: FetchServiceInit) { this.started = false @@ -47,13 +51,21 @@ export class FetchService implements Startable { this.protocol = `/${init.protocolPrefix ?? 'libp2p'}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}` this.lookupFunctions = new Map() // Maps key prefix to value lookup function this.handleMessage = this.handleMessage.bind(this) + this.init = init } async start () { await this.components.getRegistrar().handle(this.protocol, (data) => { - void this.handleMessage(data).catch(err => { - log.error(err) - }) + void this.handleMessage(data) + .catch(err => { + log.error(err) + }) + .finally(() => { + data.stream.close() + }) + }, { + maxInboundStreams: this.init.maxInboundStreams, + maxOutboundStreams: this.init.maxOutboundStreams }) this.started = true } @@ -74,7 +86,7 @@ export class FetchService implements Startable { log('dialing %s to %p', this.protocol, peer) const connection = await this.components.getConnectionManager().openConnection(peer, options) - const { stream } = await connection.newStream([this.protocol], options) + const stream = await connection.newStream([this.protocol], options) let source: Duplex = stream // make stream abortable if AbortSignal passed @@ -82,28 +94,42 @@ export class FetchService implements Startable { source = abortableDuplex(stream, options.signal) } - const shake = handshake(source) - - // send message - shake.write(lp.encode.single(FetchRequest.encode({ identifier: key })).slice()) - - // read response - // @ts-expect-error fromReader returns a Source which has no .next method - const response = FetchResponse.decode((await lp.decode.fromReader(shake.reader).next()).value.slice()) - switch (response.status) { - case (FetchResponse.StatusCode.OK): { - return response.data - } - case (FetchResponse.StatusCode.NOT_FOUND): { - return null - } - case (FetchResponse.StatusCode.ERROR): { - const errmsg = (new TextDecoder()).decode(response.data) - throw errCode(new Error('Error in fetch protocol response: ' + errmsg), codes.ERR_INVALID_PARAMETERS) - } - default: { - throw errCode(new Error('Unknown response status'), codes.ERR_INVALID_MESSAGE) - } + try { + const result = await pipe( + [FetchRequest.encode({ identifier: key })], + lp.encode(), + source, + lp.decode(), + async function (source) { + const buf = await first(source) + + if (buf == null) { + throw errCode(new Error('No data received'), codes.ERR_INVALID_MESSAGE) + } + + const response = FetchResponse.decode(buf) + + switch (response.status) { + case (FetchResponse.StatusCode.OK): { + return response.data + } + case (FetchResponse.StatusCode.NOT_FOUND): { + return null + } + case (FetchResponse.StatusCode.ERROR): { + const errmsg = (new TextDecoder()).decode(response.data) + throw errCode(new Error('Error in fetch protocol response: ' + errmsg), codes.ERR_INVALID_PARAMETERS) + } + default: { + throw errCode(new Error('Unknown response status'), codes.ERR_INVALID_MESSAGE) + } + } + } + ) + + return result ?? null + } finally { + stream.close() } } @@ -114,25 +140,40 @@ export class FetchService implements Startable { */ async handleMessage (data: IncomingStreamData) { const { stream } = data - const shake = handshake(stream) - // @ts-expect-error fromReader returns a Source which has no .next method - const request = FetchRequest.decode((await lp.decode.fromReader(shake.reader).next()).value.slice()) - - let response: FetchResponse - const lookup = this._getLookupFunction(request.identifier) - if (lookup != null) { - const data = await lookup(request.identifier) - if (data != null) { - response = { status: FetchResponse.StatusCode.OK, data } - } else { - response = { status: FetchResponse.StatusCode.NOT_FOUND, data: new Uint8Array(0) } - } - } else { - const errmsg = (new TextEncoder()).encode('No lookup function registered for key: ' + request.identifier) - response = { status: FetchResponse.StatusCode.ERROR, data: errmsg } - } - - shake.write(lp.encode.single(FetchResponse.encode(response)).slice()) + const self = this + + await pipe( + stream, + lp.decode(), + async function * (source) { + const buf = await first(source) + + if (buf == null) { + throw errCode(new Error('No data received'), codes.ERR_INVALID_MESSAGE) + } + + // for await (const buf of source) { + const request = FetchRequest.decode(buf) + + let response: FetchResponse + const lookup = self._getLookupFunction(request.identifier) + if (lookup != null) { + const data = await lookup(request.identifier) + if (data != null) { + response = { status: FetchResponse.StatusCode.OK, data } + } else { + response = { status: FetchResponse.StatusCode.NOT_FOUND, data: new Uint8Array(0) } + } + } else { + const errmsg = (new TextEncoder()).encode('No lookup function registered for key: ' + request.identifier) + response = { status: FetchResponse.StatusCode.ERROR, data: errmsg } + } + + yield FetchResponse.encode(response) + }, + lp.encode(), + stream + ) } /** diff --git a/src/identify/index.ts b/src/identify/index.ts index b3fe0e9e33..e921308db5 100644 --- a/src/identify/index.ts +++ b/src/identify/index.ts @@ -60,6 +60,12 @@ export interface IdentifyServiceInit { * Identify responses larger than this in bytes will be rejected (default: 8192) */ maxIdentifyMessageSize?: number + + maxInboundStreams: number + maxOutboundStreams: number + + maxPushIncomingStreams: number + maxPushOutgoingStreams: number } export class IdentifyService implements Startable { @@ -129,11 +135,17 @@ export class IdentifyService implements Startable { void this._handleIdentify(data).catch(err => { log.error(err) }) + }, { + maxInboundStreams: this.init.maxInboundStreams, + maxOutboundStreams: this.init.maxOutboundStreams }) await this.components.getRegistrar().handle(this.identifyPushProtocolStr, (data) => { void this._handlePush(data).catch(err => { log.error(err) }) + }, { + maxInboundStreams: this.init.maxPushIncomingStreams, + maxOutboundStreams: this.init.maxPushOutgoingStreams }) this.started = true @@ -159,10 +171,9 @@ export class IdentifyService implements Startable { let stream: Stream | undefined try { - const data = await connection.newStream([this.identifyPushProtocolStr], { + stream = await connection.newStream([this.identifyPushProtocolStr], { signal: timeoutController.signal }) - stream = data.stream // make stream abortable const source: Duplex = abortableDuplex(stream, timeoutController.signal) @@ -218,7 +229,7 @@ export class IdentifyService implements Startable { } async _identify (connection: Connection, options: AbortOptions = {}): Promise { - const { stream } = await connection.newStream([this.identifyProtocolStr], options) + const stream = await connection.newStream([this.identifyProtocolStr], options) let source: Duplex = stream let timeoutController let signal = options.signal diff --git a/src/index.ts b/src/index.ts index 62a300e64d..b7aa62878e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -11,14 +11,14 @@ import type { PeerStore, PeerStoreInit } from '@libp2p/interface-peer-store' import type { PeerId } from '@libp2p/interface-peer-id' import type { AutoRelayConfig, RelayAdvertiseConfig } from './circuit/index.js' import type { PeerDiscovery } from '@libp2p/interface-peer-discovery' -import type { Connection, ConnectionGater, ConnectionProtector, ProtocolStream } from '@libp2p/interface-connection' +import type { Connection, ConnectionGater, ConnectionProtector, Stream } from '@libp2p/interface-connection' import type { Transport } from '@libp2p/interface-transport' import type { StreamMuxerFactory } from '@libp2p/interface-stream-muxer' import type { ConnectionEncrypter } from '@libp2p/interface-connection-encrypter' import type { PeerRouting } from '@libp2p/interface-peer-routing' import type { ContentRouting } from '@libp2p/interface-content-routing' import type { PubSub } from '@libp2p/interface-pubsub' -import type { Registrar, StreamHandler } from '@libp2p/interface-registrar' +import type { Registrar, StreamHandler, StreamHandlerOptions } from '@libp2p/interface-registrar' import type { ConnectionManager } from '@libp2p/interface-connection-manager' import type { Metrics, MetricsInit } from '@libp2p/interface-metrics' import type { PeerInfo } from '@libp2p/interface-peer-info' @@ -177,7 +177,7 @@ export interface Libp2p extends Startable, EventEmitter { * If successful, the known metadata of the peer will be added to the nodes `peerStore`, * and the `MuxedStream` will be returned together with the successful negotiated protocol. */ - dialProtocol: (peer: PeerId | Multiaddr, protocols: string | string[], options?: AbortOptions) => Promise + dialProtocol: (peer: PeerId | Multiaddr, protocols: string | string[], options?: AbortOptions) => Promise /** * Disconnects all connections to the given `peer` @@ -187,7 +187,7 @@ export interface Libp2p extends Startable, EventEmitter { /** * Registers the `handler` for each protocol */ - handle: (protocol: string | string[], handler: StreamHandler) => Promise + handle: (protocol: string | string[], handler: StreamHandler, options?: StreamHandlerOptions) => Promise /** * Removes the handler for each protocol. The protocol diff --git a/src/libp2p.ts b/src/libp2p.ts index b7ff6b4a0f..32676c48d4 100644 --- a/src/libp2p.ts +++ b/src/libp2p.ts @@ -33,7 +33,7 @@ import type { Connection } from '@libp2p/interface-connection' import type { PeerRouting } from '@libp2p/interface-peer-routing' import type { ContentRouting } from '@libp2p/interface-content-routing' import type { PubSub } from '@libp2p/interface-pubsub' -import type { Registrar, StreamHandler } from '@libp2p/interface-registrar' +import type { Registrar, StreamHandler, StreamHandlerOptions } from '@libp2p/interface-registrar' import type { ConnectionManager } from '@libp2p/interface-connection-manager' import type { PeerInfo } from '@libp2p/interface-peer-info' import type { Libp2p, Libp2pEvents, Libp2pInit, Libp2pOptions } from './index.js' @@ -490,14 +490,14 @@ export class Libp2pNode extends EventEmitter implements Libp2p { return await this.pingService.ping(id, options) } - async handle (protocols: string | string[], handler: StreamHandler): Promise { + async handle (protocols: string | string[], handler: StreamHandler, options?: StreamHandlerOptions): Promise { if (!Array.isArray(protocols)) { protocols = [protocols] } await Promise.all( protocols.map(async protocol => { - await this.components.getRegistrar().handle(protocol, handler) + await this.components.getRegistrar().handle(protocol, handler, options) }) ) } diff --git a/src/ping/index.ts b/src/ping/index.ts index c7766ed339..f755bd51bc 100644 --- a/src/ping/index.ts +++ b/src/ping/index.ts @@ -18,21 +18,28 @@ const log = logger('libp2p:ping') export interface PingServiceInit { protocolPrefix: string + maxInboundStreams: number + maxOutboundStreams: number } export class PingService implements Startable { public readonly protocol: string private readonly components: Components private started: boolean + private readonly init: PingServiceInit constructor (components: Components, init: PingServiceInit) { this.components = components this.started = false this.protocol = `/${init.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}` + this.init = init } async start () { - await this.components.getRegistrar().handle(this.protocol, this.handleMessage) + await this.components.getRegistrar().handle(this.protocol, this.handleMessage, { + maxInboundStreams: this.init.maxInboundStreams, + maxOutboundStreams: this.init.maxOutboundStreams + }) this.started = true } @@ -67,7 +74,7 @@ export class PingService implements Startable { log('dialing %s to %p', this.protocol, peer) const connection = await this.components.getConnectionManager().openConnection(peer, options) - const { stream } = await connection.newStream([this.protocol], options) + const stream = await connection.newStream([this.protocol], options) const start = Date.now() const data = randomBytes(PING_LENGTH) diff --git a/src/registrar.ts b/src/registrar.ts index 5cc8a7e710..d9304d2574 100644 --- a/src/registrar.ts +++ b/src/registrar.ts @@ -10,8 +10,8 @@ import type { Components } from '@libp2p/components' const log = logger('libp2p:registrar') -const DEFAULT_MAX_INCOMING_STREAMS = 1 -const DEFAULT_MAX_OUTGOING_STREAMS = 1 +export const DEFAULT_MAX_INBOUND_STREAMS = 1 +export const DEFAULT_MAX_OUTBOUND_STREAMS = 1 /** * Responsible for notifying registered protocols of events in the network. @@ -46,7 +46,7 @@ export class DefaultRegistrar implements Registrar { const handler = this.handlers.get(protocol) if (handler == null) { - throw new Error(`No handler registered for protocol ${protocol}`) + throw errCode(new Error(`No handler registered for protocol ${protocol}`), codes.ERR_NO_HANDLER_FOR_PROTOCOL) } return handler @@ -72,9 +72,9 @@ export class DefaultRegistrar implements Registrar { throw errCode(new Error(`Handler already registered for protocol ${protocol}`), codes.ERR_PROTOCOL_HANDLER_ALREADY_REGISTERED) } - const options = merge({ - maxIncomingStreams: DEFAULT_MAX_INCOMING_STREAMS, - maxOutgoingStreams: DEFAULT_MAX_OUTGOING_STREAMS + const options = merge.bind({ ignoreUndefined: true })({ + maxInboundStreams: DEFAULT_MAX_INBOUND_STREAMS, + maxOutboundStreams: DEFAULT_MAX_OUTBOUND_STREAMS }, opts) this.handlers.set(protocol, { diff --git a/src/upgrader.ts b/src/upgrader.ts index 961fd10da6..cb344dff6e 100644 --- a/src/upgrader.ts +++ b/src/upgrader.ts @@ -8,7 +8,7 @@ import { codes } from './errors.js' import { createConnection } from '@libp2p/connection' import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events' import { peerIdFromString } from '@libp2p/peer-id' -import type { MultiaddrConnection, Connection, ProtocolStream, Stream } from '@libp2p/interface-connection' +import type { MultiaddrConnection, Connection, Stream } from '@libp2p/interface-connection' import type { ConnectionEncrypter, SecuredConnection } from '@libp2p/interface-connection-encrypter' import type { StreamMuxer, StreamMuxerFactory } from '@libp2p/interface-stream-muxer' import type { PeerId } from '@libp2p/interface-peer-id' @@ -16,6 +16,8 @@ import type { Upgrader, UpgraderEvents } from '@libp2p/interface-transport' import type { Duplex } from 'it-stream-types' import { Components, isInitializable } from '@libp2p/components' import type { AbortOptions } from '@libp2p/interfaces' +import type { Registrar } from '@libp2p/interface-registrar' +import { DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS } from './registrar.js' const log = logger('libp2p:upgrader') @@ -43,6 +45,46 @@ export interface UpgraderInit { muxers: StreamMuxerFactory[] } +function findIncomingStreamLimit (protocol: string, registrar: Registrar) { + try { + const { options } = registrar.getHandler(protocol) + + return options.maxInboundStreams + } catch (err: any) { + if (err.code !== codes.ERR_NO_HANDLER_FOR_PROTOCOL) { + throw err + } + } + + return DEFAULT_MAX_INBOUND_STREAMS +} + +function findOutgoingStreamLimit (protocol: string, registrar: Registrar) { + try { + const { options } = registrar.getHandler(protocol) + + return options.maxOutboundStreams + } catch (err: any) { + if (err.code !== codes.ERR_NO_HANDLER_FOR_PROTOCOL) { + throw err + } + } + + return DEFAULT_MAX_OUTBOUND_STREAMS +} + +function countStreams (protocol: string, direction: 'inbound' | 'outbound', connection: Connection) { + let streamCount = 0 + + connection.streams.forEach(stream => { + if (stream.stat.direction === direction && stream.stat.protocol === protocol) { + streamCount++ + } + }) + + return streamCount +} + export class DefaultUpgrader extends EventEmitter implements Upgrader { private readonly components: Components private readonly connectionEncryption: Map @@ -267,7 +309,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg } = opts let muxer: StreamMuxer | undefined - let newStream: ((multicodecs: string[], options?: AbortOptions) => Promise) | undefined + let newStream: ((multicodecs: string[], options?: AbortOptions) => Promise) | undefined let connection: Connection // eslint-disable-line prefer-const if (muxerFactory != null) { @@ -296,13 +338,22 @@ export class DefaultUpgrader extends EventEmitter implements Upg return } - connection.addStream(muxedStream, { protocol }) + const incomingLimit = findIncomingStreamLimit(protocol, this.components.getRegistrar()) + const streamCount = countStreams(protocol, 'inbound', connection) + + if (streamCount === incomingLimit) { + throw errCode(new Error('Too many incoming protocol streams'), codes.ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS) + } + + muxedStream.stat.protocol = protocol + + connection.addStream(muxedStream) this._onStream({ connection, stream: { ...muxedStream, ...stream }, protocol }) }) .catch(err => { log.error(err) - if (muxedStream.timeline.close == null) { + if (muxedStream.stat.timeline.close == null) { muxedStream.close() } }) @@ -317,7 +368,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg muxer.init(this.components) } - newStream = async (protocols: string[], options: AbortOptions = {}): Promise => { + newStream = async (protocols: string[], options: AbortOptions = {}): Promise => { if (muxer == null) { throw errCode(new Error('Stream is not multiplexed'), codes.ERR_MUXER_UNAVAILABLE) } @@ -334,11 +385,27 @@ export class DefaultUpgrader extends EventEmitter implements Upg stream = metrics.trackStream({ stream, remotePeer, protocol }) } - return { stream: { ...muxedStream, ...stream }, protocol } + const outgoingLimit = findOutgoingStreamLimit(protocol, this.components.getRegistrar()) + const streamCount = countStreams(protocol, 'outbound', connection) + + if (streamCount === outgoingLimit) { + throw errCode(new Error('Too many outgoing protocol streams'), codes.ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS) + } + + muxedStream.stat.protocol = protocol + + return { + ...muxedStream, + ...stream, + stat: { + ...muxedStream.stat, + protocol + } + } } catch (err: any) { log.error('could not create new stream', err) - if (muxedStream.timeline.close == null) { + if (muxedStream.stat.timeline.close == null) { muxedStream.close() } @@ -402,9 +469,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg await maConn.close() // Ensure remaining streams are closed if (muxer != null) { - await Promise.all(muxer.streams.map(async stream => { - await stream.close() - })) + muxer.streams.forEach(s => s.close()) } } }) @@ -422,7 +487,8 @@ export class DefaultUpgrader extends EventEmitter implements Upg _onStream (opts: OnStreamOptions): void { const { connection, stream, protocol } = opts const { handler } = this.components.getRegistrar().getHandler(protocol) - handler({ connection, stream, protocol }) + + handler({ connection, stream }) } /** diff --git a/test/content-routing/dht/operation.node.ts b/test/content-routing/dht/operation.node.ts index 412ae7659f..82b03e6784 100644 --- a/test/content-routing/dht/operation.node.ts +++ b/test/content-routing/dht/operation.node.ts @@ -73,9 +73,9 @@ describe('DHT subsystem operates correctly', () => { }) it('should get notified of connected peers on dial', async () => { - const connection = await libp2p.dialProtocol(remAddr, subsystemMulticodecs) + const stream = await libp2p.dialProtocol(remAddr, subsystemMulticodecs) - expect(connection).to.exist() + expect(stream).to.exist() return await Promise.all([ pWaitFor(() => libp2p.dht.lan.routingTable.size === 1), diff --git a/test/dialing/direct.node.ts b/test/dialing/direct.node.ts index c5e3b4f5dd..428aba065a 100644 --- a/test/dialing/direct.node.ts +++ b/test/dialing/direct.node.ts @@ -307,9 +307,9 @@ describe('libp2p.dialer (direct, TCP)', () => { const connection = await libp2p.dial(remoteAddr) expect(connection).to.exist() - const { stream, protocol } = await connection.newStream(['/echo/1.0.0']) + const stream = await connection.newStream(['/echo/1.0.0']) expect(stream).to.exist() - expect(protocol).to.equal('/echo/1.0.0') + expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0') expect(dialerDialSpy.callCount).to.be.greaterThan(0) await connection.close() }) @@ -336,9 +336,9 @@ describe('libp2p.dialer (direct, TCP)', () => { const connection = await libp2p.dial(remotePeerId) expect(connection).to.exist() - const { stream, protocol } = await connection.newStream('/echo/1.0.0') + const stream = await connection.newStream('/echo/1.0.0') expect(stream).to.exist() - expect(protocol).to.equal('/echo/1.0.0') + expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0') await connection.close() expect(dialerDialSpy.callCount).to.be.greaterThan(0) }) @@ -377,7 +377,7 @@ describe('libp2p.dialer (direct, TCP)', () => { const connection = await libp2p.dial(remotePeerId) // Create local to remote streams - const { stream } = await connection.newStream('/echo/1.0.0') + const stream = await connection.newStream('/echo/1.0.0') await connection.newStream('/stream-count/3') await libp2p.dialProtocol(remoteLibp2p.peerId, '/stream-count/4') @@ -487,9 +487,9 @@ describe('libp2p.dialer (direct, TCP)', () => { const connection = await libp2p.dial(remoteAddr) expect(connection).to.exist() - const { stream, protocol } = await connection.newStream('/echo/1.0.0') + const stream = await connection.newStream('/echo/1.0.0') expect(stream).to.exist() - expect(protocol).to.equal('/echo/1.0.0') + expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0') await connection.close() expect(protectorProtectSpy.callCount).to.equal(1) }) diff --git a/test/dialing/direct.spec.ts b/test/dialing/direct.spec.ts index e4112c97cc..1af0e7c6a5 100644 --- a/test/dialing/direct.spec.ts +++ b/test/dialing/direct.spec.ts @@ -427,9 +427,9 @@ describe('libp2p.dialer (direct, WebSockets)', () => { const connection = await libp2p.dial(MULTIADDRS_WEBSOCKETS[0]) expect(connection).to.exist() - const { stream, protocol } = await connection.newStream('/echo/1.0.0') + const stream = await connection.newStream('/echo/1.0.0') expect(stream).to.exist() - expect(protocol).to.equal('/echo/1.0.0') + expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0') await connection.close() expect(dialerDialSpy.callCount).to.be.at.least(1) expect(addressBookAddSpy.callCount).to.be.at.least(1) diff --git a/test/fetch/index.spec.ts b/test/fetch/index.spec.ts index f776d77727..eadf4f2991 100644 --- a/test/fetch/index.spec.ts +++ b/test/fetch/index.spec.ts @@ -2,7 +2,7 @@ import { expect } from 'aegir/chai' import sinon from 'sinon' -import { FetchService } from '../../src/fetch/index.js' +import { FetchService, FetchServiceInit } from '../../src/fetch/index.js' import Peers from '../fixtures/peers.js' import { mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-mocks' import { createFromJSON } from '@libp2p/peer-id-factory' @@ -14,8 +14,10 @@ import { TimeoutController } from 'timeout-abort-controller' import delay from 'delay' import { pipe } from 'it-pipe' -const defaultInit = { - protocolPrefix: 'ipfs' +const defaultInit: FetchServiceInit = { + protocolPrefix: 'ipfs', + maxInboundStreams: 1, + maxOutboundStreams: 1 } async function createComponents (index: number) { @@ -127,7 +129,7 @@ describe('fetch', () => { // should have closed stream expect(newStreamSpy).to.have.property('callCount', 1) - const { stream } = await newStreamSpy.getCall(0).returnValue - expect(stream).to.have.nested.property('timeline.close') + const stream = await newStreamSpy.getCall(0).returnValue + expect(stream).to.have.nested.property('stat.timeline.close') }) }) diff --git a/test/identify/index.spec.ts b/test/identify/index.spec.ts index b59bbe0796..3d6623a0f0 100644 --- a/test/identify/index.spec.ts +++ b/test/identify/index.spec.ts @@ -6,7 +6,7 @@ import sinon from 'sinon' import { Multiaddr } from '@multiformats/multiaddr' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { codes } from '../../src/errors.js' -import { IdentifyService, Message } from '../../src/identify/index.js' +import { IdentifyService, IdentifyServiceInit, Message } from '../../src/identify/index.js' import Peers from '../fixtures/peers.js' import { PersistentPeerStore } from '@libp2p/peer-store' import { DefaultAddressManager } from '../../src/address-manager/index.js' @@ -32,11 +32,15 @@ import pDefer from 'p-defer' const listenMaddrs = [new Multiaddr('/ip4/127.0.0.1/tcp/15002/ws')] -const defaultInit = { +const defaultInit: IdentifyServiceInit = { protocolPrefix: 'ipfs', host: { agentVersion: 'v1.0.0' - } + }, + maxInboundStreams: 1, + maxOutboundStreams: 1, + maxPushIncomingStreams: 1, + maxPushOutgoingStreams: 1 } const protocols = [MULTICODEC_IDENTIFY, MULTICODEC_IDENTIFY_PUSH] @@ -130,6 +134,7 @@ describe('identify', () => { it('should be able to identify another peer with no certified peer records support', async () => { const agentVersion = 'js-libp2p/5.0.0' const localIdentify = new IdentifyService(localComponents, { + ...defaultInit, protocolPrefix: 'ipfs', host: { agentVersion: agentVersion @@ -137,6 +142,7 @@ describe('identify', () => { }) await start(localIdentify) const remoteIdentify = new IdentifyService(remoteComponents, { + ...defaultInit, protocolPrefix: 'ipfs', host: { agentVersion: agentVersion @@ -209,6 +215,7 @@ describe('identify', () => { it('should store own host data and protocol version into metadataBook on start', async () => { const agentVersion = 'js-project/1.0.0' const localIdentify = new IdentifyService(localComponents, { + ...defaultInit, protocolPrefix: 'ipfs', host: { agentVersion @@ -270,8 +277,8 @@ describe('identify', () => { // should have closed stream expect(newStreamSpy).to.have.property('callCount', 1) - const { stream } = await newStreamSpy.getCall(0).returnValue - expect(stream).to.have.nested.property('timeline.close') + const stream = await newStreamSpy.getCall(0).returnValue + expect(stream).to.have.nested.property('stat.timeline.close') }) it('should limit incoming identify message sizes', async () => { diff --git a/test/identify/push.spec.ts b/test/identify/push.spec.ts index e9b5a7dfbd..9dbf95c8ba 100644 --- a/test/identify/push.spec.ts +++ b/test/identify/push.spec.ts @@ -3,7 +3,7 @@ import { expect } from 'aegir/chai' import sinon from 'sinon' import { Multiaddr } from '@multiformats/multiaddr' -import { IdentifyService } from '../../src/identify/index.js' +import { IdentifyService, IdentifyServiceInit } from '../../src/identify/index.js' import Peers from '../fixtures/peers.js' import { PersistentPeerStore } from '@libp2p/peer-store' import { DefaultAddressManager } from '../../src/address-manager/index.js' @@ -27,11 +27,15 @@ import { start, stop } from '@libp2p/interfaces/startable' const listenMaddrs = [new Multiaddr('/ip4/127.0.0.1/tcp/15002/ws')] -const defaultInit = { +const defaultInit: IdentifyServiceInit = { protocolPrefix: 'ipfs', host: { agentVersion: 'v1.0.0' - } + }, + maxInboundStreams: 1, + maxOutboundStreams: 1, + maxPushIncomingStreams: 1, + maxPushOutgoingStreams: 1 } const protocols = [MULTICODEC_IDENTIFY, MULTICODEC_IDENTIFY_PUSH] @@ -213,8 +217,8 @@ describe('identify (push)', () => { // should have closed stream expect(newStreamSpy).to.have.property('callCount', 1) - const { stream } = await newStreamSpy.getCall(0).returnValue - expect(stream).to.have.nested.property('timeline.close') + const stream = await newStreamSpy.getCall(0).returnValue + expect(stream).to.have.nested.property('stat.timeline.close') // method should have returned before the remote handler completes as we timed // out so we ignore the return value diff --git a/test/metrics/index.node.ts b/test/metrics/index.node.ts index 1be7558d2f..89296b1e7b 100644 --- a/test/metrics/index.node.ts +++ b/test/metrics/index.node.ts @@ -91,7 +91,7 @@ describe('libp2p.metrics', () => { }) const connection = await libp2p.dial(remoteLibp2p.peerId) - const { stream } = await connection.newStream('/echo/1.0.0') + const stream = await connection.newStream('/echo/1.0.0') const bytes = randomBytes(512) const result = await pipe( @@ -156,7 +156,7 @@ describe('libp2p.metrics', () => { }) const connection = await libp2p.dial(remoteLibp2p.peerId) - const { stream } = await connection.newStream('/echo/1.0.0') + const stream = await connection.newStream('/echo/1.0.0') const bytes = randomBytes(512) await pipe( diff --git a/test/ping/index.spec.ts b/test/ping/index.spec.ts index b8be7e6425..1acf352eec 100644 --- a/test/ping/index.spec.ts +++ b/test/ping/index.spec.ts @@ -2,7 +2,7 @@ import { expect } from 'aegir/chai' import sinon from 'sinon' -import { PingService } from '../../src/ping/index.js' +import { PingService, PingServiceInit } from '../../src/ping/index.js' import Peers from '../fixtures/peers.js' import { mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-mocks' import { createFromJSON } from '@libp2p/peer-id-factory' @@ -14,8 +14,10 @@ import { TimeoutController } from 'timeout-abort-controller' import delay from 'delay' import { pipe } from 'it-pipe' -const defaultInit = { - protocolPrefix: 'ipfs' +const defaultInit: PingServiceInit = { + protocolPrefix: 'ipfs', + maxInboundStreams: 1, + maxOutboundStreams: 1 } async function createComponents (index: number) { @@ -116,7 +118,7 @@ describe('ping', () => { // should have closed stream expect(newStreamSpy).to.have.property('callCount', 1) - const { stream } = await newStreamSpy.getCall(0).returnValue - expect(stream).to.have.nested.property('timeline.close') + const stream = await newStreamSpy.getCall(0).returnValue + expect(stream).to.have.nested.property('stat.timeline.close') }) }) diff --git a/test/ping/ping.node.ts b/test/ping/ping.node.ts index 8b64ffba60..5ee6a13a38 100644 --- a/test/ping/ping.node.ts +++ b/test/ping/ping.node.ts @@ -1,7 +1,6 @@ /* eslint-env mocha */ import { expect } from 'aegir/chai' -import pTimes from 'p-times' import { pipe } from 'it-pipe' import { createNode, populateAddressBooks } from '../utils/creators/peer.js' import { createBaseOptions } from '../utils/base-options.js' @@ -41,7 +40,11 @@ describe('ping', () => { }) it('ping several times for getting an average', async () => { - const latencies = await pTimes(5, async () => await nodes[1].ping(nodes[0].peerId)) + const latencies = [] + + for (let i = 0; i < 5; i++) { + latencies.push(await nodes[1].ping(nodes[0].peerId)) + } const averageLatency = latencies.reduce((p, c) => p + c, 0) / latencies.length expect(averageLatency).to.be.a('Number') diff --git a/test/relay/relay.node.ts b/test/relay/relay.node.ts index 8b33ff07bb..60e1bf5f60 100644 --- a/test/relay/relay.node.ts +++ b/test/relay/relay.node.ts @@ -80,7 +80,7 @@ describe('Dialing (via relay, TCP)', () => { expect(connection.remotePeer.toBytes()).to.eql(dstLibp2p.peerId.toBytes()) expect(connection.remoteAddr).to.eql(dialAddr) - const { stream: echoStream } = await connection.newStream('/echo/1.0.0') + const echoStream = await connection.newStream('/echo/1.0.0') const input = uint8ArrayFromString('hello') const [output] = await pipe( @@ -156,7 +156,7 @@ describe('Dialing (via relay, TCP)', () => { // send an invalid relay message from the relay to the destination peer const connections = relayLibp2p.getConnections(dstLibp2p.peerId) - const { stream } = await connections[0].newStream(RELAY_CODEC) + const stream = await connections[0].newStream(RELAY_CODEC) const streamHandler = new StreamHandler({ stream }) streamHandler.write({ type: CircuitRelay.Type.STATUS diff --git a/test/upgrading/upgrader.spec.ts b/test/upgrading/upgrader.spec.ts index 216343d513..ac7806bac3 100644 --- a/test/upgrading/upgrader.spec.ts +++ b/test/upgrading/upgrader.spec.ts @@ -85,9 +85,15 @@ describe('Upgrader', () => { await localComponents.getRegistrar().handle('/echo/1.0.0', ({ stream }) => { void pipe(stream, stream) + }, { + maxInboundStreams: 10, + maxOutboundStreams: 10 }) await remoteComponents.getRegistrar().handle('/echo/1.0.0', ({ stream }) => { void pipe(stream, stream) + }, { + maxInboundStreams: 10, + maxOutboundStreams: 10 }) }) @@ -105,8 +111,8 @@ describe('Upgrader', () => { expect(connections).to.have.length(2) - const { stream, protocol } = await connections[0].newStream('/echo/1.0.0') - expect(protocol).to.equal('/echo/1.0.0') + const stream = await connections[0].newStream('/echo/1.0.0') + expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0') const hello = uint8ArrayFromString('hello there!') const result = await pipe( @@ -175,8 +181,8 @@ describe('Upgrader', () => { expect(connections).to.have.length(2) - const { stream, protocol } = await connections[0].newStream('/echo/1.0.0') - expect(protocol).to.equal('/echo/1.0.0') + const stream = await connections[0].newStream('/echo/1.0.0') + expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0') const hello = uint8ArrayFromString('hello there!') const result = await pipe( @@ -515,11 +521,11 @@ describe('libp2p.upgrader', () => { ]) const remoteLibp2pUpgraderOnStreamSpy = sinon.spy(remoteLibp2p.components.getUpgrader() as DefaultUpgrader, '_onStream') - const { stream } = await localConnection.newStream(['/echo/1.0.0']) - expect(stream).to.include.keys(['id', 'close', 'reset', 'timeline']) + const stream = await localConnection.newStream(['/echo/1.0.0']) + expect(stream).to.include.keys(['id', 'close', 'reset', 'stat']) const [arg0] = remoteLibp2pUpgraderOnStreamSpy.getCall(0).args - expect(arg0.stream).to.include.keys(['id', 'close', 'reset', 'timeline']) + expect(arg0.stream).to.include.keys(['id', 'close', 'reset', 'stat']) }) it('should emit connect and disconnect events', async () => { @@ -579,4 +585,128 @@ describe('libp2p.upgrader', () => { // @ts-expect-error detail is only on CustomEvent type expect(remotePeer.equals(event.detail.remotePeer)).to.equal(true) }) + + it('should limit the number of incoming streams that can be opened using a protocol', async () => { + const protocol = '/a-test-protocol/1.0.0' + const remotePeer = peers[1] + libp2p = await createLibp2pNode({ + peerId: peers[0], + transports: [ + new WebSockets() + ], + streamMuxers: [ + new Mplex() + ], + connectionEncryption: [ + NOISE + ] + }) + await libp2p.start() + + remoteLibp2p = await createLibp2pNode({ + peerId: remotePeer, + transports: [ + new WebSockets() + ], + streamMuxers: [ + new Mplex() + ], + connectionEncryption: [ + NOISE + ] + }) + await remoteLibp2p.start() + + const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) + + const [localToRemote] = await Promise.all([ + libp2p.components.getUpgrader().upgradeOutbound(outbound), + remoteLibp2p.components.getUpgrader().upgradeInbound(inbound) + ]) + + let streamCount = 0 + + await libp2p.handle(protocol, (data) => {}, { + maxInboundStreams: 10, + maxOutboundStreams: 10 + }) + + await remoteLibp2p.handle(protocol, (data) => { + streamCount++ + }, { + maxInboundStreams: 1, + maxOutboundStreams: 1 + }) + + expect(streamCount).to.equal(0) + + await localToRemote.newStream(protocol) + + expect(streamCount).to.equal(1) + + await expect(localToRemote.newStream(protocol)).to.eventually.be.rejected() + .with.property('code', 'ERR_UNDER_READ') + }) + + it('should limit the number of outgoing streams that can be opened using a protocol', async () => { + const protocol = '/a-test-protocol/1.0.0' + const remotePeer = peers[1] + libp2p = await createLibp2pNode({ + peerId: peers[0], + transports: [ + new WebSockets() + ], + streamMuxers: [ + new Mplex() + ], + connectionEncryption: [ + NOISE + ] + }) + await libp2p.start() + + remoteLibp2p = await createLibp2pNode({ + peerId: remotePeer, + transports: [ + new WebSockets() + ], + streamMuxers: [ + new Mplex() + ], + connectionEncryption: [ + NOISE + ] + }) + await remoteLibp2p.start() + + const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) + + const [localToRemote] = await Promise.all([ + libp2p.components.getUpgrader().upgradeOutbound(outbound), + remoteLibp2p.components.getUpgrader().upgradeInbound(inbound) + ]) + + let streamCount = 0 + + await libp2p.handle(protocol, (data) => {}, { + maxInboundStreams: 1, + maxOutboundStreams: 1 + }) + + await remoteLibp2p.handle(protocol, (data) => { + streamCount++ + }, { + maxInboundStreams: 10, + maxOutboundStreams: 10 + }) + + expect(streamCount).to.equal(0) + + await localToRemote.newStream(protocol) + + expect(streamCount).to.equal(1) + + await expect(localToRemote.newStream(protocol)).to.eventually.be.rejected() + .with.property('code', codes.ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS) + }) })