Skip to content

Commit

Permalink
feat!: limit protocol streams per-connection (#1255)
Browse files Browse the repository at this point in the history
* feat: limit protocol streams per-connection

Uses the `maxInboundStreams` and `maxOutboundStreams` of the `registrar.handle`
opts to limit the number of concurrent streams open on each connection
on a per-protocol basis.

Both values default to 1 so some tuning will be necessary to set
appropriate values for some protocols.

* chore: make error codes consistent

* chore: fix up examples
  • Loading branch information
achingbrain committed Jun 17, 2022
1 parent 5371729 commit de30c2c
Show file tree
Hide file tree
Showing 43 changed files with 476 additions and 185 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ test/repo-tests*
logs
*.log

coverage
.coverage
.nyc_output

# Runtime data
Expand Down
8 changes: 6 additions & 2 deletions doc/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ await libp2p.hangUp(remotePeerId)

Sets up [multistream-select routing](https://github.com/multiformats/multistream-select) of protocols to their application handlers. Whenever a stream is opened on one of the provided protocols, the handler will be called. `handle` must be called in order to register a handler and support for a given protocol. This also informs other peers of the protocols you support.

`libp2p.handle(protocols, handler)`
`libp2p.handle(protocols, handler, options)`

In the event of a new handler for the same protocol being added, the first one is discarded.

Expand All @@ -399,6 +399,7 @@ In the event of a new handler for the same protocol being added, the first one i
|------|------|-------------|
| protocols | `Array<string>|string` | protocols to register |
| handler | `function({ connection:*, stream:*, protocol:string })` | handler to call |
| options | `StreamHandlerOptions` | Options including protocol stream limits |


#### Example
Expand All @@ -409,7 +410,10 @@ const handler = ({ connection, stream, protocol }) => {
// use stream or connection according to the needs
}

libp2p.handle('/echo/1.0.0', handler)
libp2p.handle('/echo/1.0.0', handler, {
maxInboundStreams: 5,
maxOutboundStreams: 5
})
```

### unhandle
Expand Down
2 changes: 1 addition & 1 deletion examples/chat/src/dialer.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async function run () {

// Dial to the remote peer (the "listener")
const listenerMa = new Multiaddr(`/ip4/127.0.0.1/tcp/10333/p2p/${idListener.toString()}`)
const { stream } = await nodeDialer.dialProtocol(listenerMa, '/chat/1.0.0')
const stream = await nodeDialer.dialProtocol(listenerMa, '/chat/1.0.0')

console.log('Dialer dialed to listener on protocol: /chat/1.0.0')
console.log('Type a message and see what happens')
Expand Down
2 changes: 1 addition & 1 deletion examples/connection-encryption/1.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const createNode = async () => {
)
})

const { stream } = await node1.dialProtocol(node2.peerId, '/a-protocol')
const stream = await node1.dialProtocol(node2.peerId, '/a-protocol')

await pipe(
[uint8ArrayFromString('This information is sent out encrypted to the other peer')],
Expand Down
8 changes: 4 additions & 4 deletions examples/delegated-routing/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
"libp2p": "../../",
"@libp2p/delegated-content-routing": "^2.0.1",
"@libp2p/delegated-peer-routing": "^2.0.1",
"@libp2p/kad-dht": "^2.0.0",
"@libp2p/mplex": "^2.0.0",
"@libp2p/webrtc-star": "^2.0.0",
"@libp2p/websockets": "^2.0.0",
"@libp2p/kad-dht": "^3.0.0",
"@libp2p/mplex": "^3.0.0",
"@libp2p/webrtc-star": "^2.0.1",
"@libp2p/websockets": "^3.0.0",
"react": "^17.0.2",
"react-dom": "^17.0.2",
"react-scripts": "5.0.0"
Expand Down
2 changes: 1 addition & 1 deletion examples/echo/src/dialer.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async function run() {

// Dial the listener node
console.log('Dialing to peer:', listenerMultiaddr)
const { stream } = await dialerNode.dialProtocol(listenerMultiaddr, '/echo/1.0.0')
const stream = await dialerNode.dialProtocol(listenerMultiaddr, '/echo/1.0.0')

console.log('nodeA dialed to nodeB on protocol: /echo/1.0.0')

Expand Down
6 changes: 3 additions & 3 deletions examples/libp2p-in-the-browser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
"dependencies": {
"@chainsafe/libp2p-noise": "^6.2.0",
"@libp2p/bootstrap": "^2.0.0",
"@libp2p/mplex": "^2.0.0",
"@libp2p/webrtc-star": "^2.0.0",
"@libp2p/websockets": "^2.0.0",
"@libp2p/mplex": "^3.0.0",
"@libp2p/webrtc-star": "^2.0.1",
"@libp2p/websockets": "^3.0.0",
"libp2p": "../../"
},
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"license": "MIT",
"dependencies": {
"@libp2p/pubsub-peer-discovery": "^6.0.0",
"@libp2p/floodsub": "^2.0.0",
"@libp2p/floodsub": "^3.0.0",
"@nodeutils/defaults-deep": "^1.1.0",
"execa": "^2.1.0",
"fs-extra": "^8.1.0",
Expand Down
2 changes: 1 addition & 1 deletion examples/pnet/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ generateKey(otherSwarmKey)
)
})

const { stream } = await node1.dialProtocol(node2.peerId, '/private')
const stream = await node1.dialProtocol(node2.peerId, '/private')

await pipe(
[uint8ArrayFromString('This message is sent on a private network')],
Expand Down
4 changes: 2 additions & 2 deletions examples/protocol-and-stream-muxing/1.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ const createNode = async () => {
})
*/

const { stream } = await node1.dialProtocol(node2.peerId, ['/your-protocol'])
const stream = await node1.dialProtocol(node2.peerId, ['/your-protocol'])
await pipe(
[uint8ArrayFromString('my own protocol, wow!')],
stream
)

/*
const { stream } = node1.dialProtocol(node2.peerId, ['/another-protocol/1.0.0'])
const stream = node1.dialProtocol(node2.peerId, ['/another-protocol/1.0.0'])
await pipe(
['my own protocol, wow!'],
Expand Down
11 changes: 7 additions & 4 deletions examples/protocol-and-stream-muxing/2.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,25 @@ const createNode = async () => {
console.log(`from: ${protocol}, msg: ${uint8ArrayToString(msg)}`)
}
}
)
).finally(() => {
// clean up resources
stream.close()
})
})

const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/a'])
const stream1 = await node1.dialProtocol(node2.peerId, ['/a'])
await pipe(
[uint8ArrayFromString('protocol (a)')],
stream1
)

const { stream: stream2 } = await node1.dialProtocol(node2.peerId, ['/b'])
const stream2 = await node1.dialProtocol(node2.peerId, ['/b'])
await pipe(
[uint8ArrayFromString('protocol (b)')],
stream2
)

const { stream: stream3 } = await node1.dialProtocol(node2.peerId, ['/b'])
const stream3 = await node1.dialProtocol(node2.peerId, ['/b'])
await pipe(
[uint8ArrayFromString('another stream on protocol (b)')],
stream3
Expand Down
4 changes: 2 additions & 2 deletions examples/protocol-and-stream-muxing/3.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ const createNode = async () => {
)
})

const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/node-2'])
const stream1 = await node1.dialProtocol(node2.peerId, ['/node-2'])
await pipe(
[uint8ArrayFromString('from 1 to 2')],
stream1
)

const { stream: stream2 } = await node2.dialProtocol(node1.peerId, ['/node-1'])
const stream2 = await node2.dialProtocol(node1.peerId, ['/node-1'])
await pipe(
[uint8ArrayFromString('from 2 to 1')],
stream2
Expand Down
20 changes: 10 additions & 10 deletions examples/protocol-and-stream-muxing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ node2.handle('/your-protocol', ({ stream }) => {
After the protocol is _handled_, now we can dial to it.

```JavaScript
const { stream } = await node1.dialProtocol(node2.peerId, ['/your-protocol'])
const stream = await node1.dialProtocol(node2.peerId, ['/your-protocol'])

await pipe(
['my own protocol, wow!'],
Expand All @@ -62,7 +62,7 @@ node2.handle('/another-protocol/1.0.1', ({ stream }) => {
)
})
// ...
const { stream } = await node1.dialProtocol(node2.peerId, ['/another-protocol/1.0.0'])
const stream = await node1.dialProtocol(node2.peerId, ['/another-protocol/1.0.0'])

await pipe(
['my own protocol, wow!'],
Expand Down Expand Up @@ -133,19 +133,19 @@ node2.handle(['/a', '/b'], ({ protocol, stream }) => {
)
})

const { stream } = await node1.dialProtocol(node2.peerId, ['/a'])
const stream = await node1.dialProtocol(node2.peerId, ['/a'])
await pipe(
['protocol (a)'],
stream
)

const { stream: stream2 } = await node1.dialProtocol(node2.peerId, ['/b'])
const stream2 = await node1.dialProtocol(node2.peerId, ['/b'])
await pipe(
['protocol (b)'],
stream2
)

const { stream: stream3 } = await node1.dialProtocol(node2.peerId, ['/b'])
const stream3 = await node1.dialProtocol(node2.peerId, ['/b'])
await pipe(
['another stream on protocol (b)'],
stream3
Expand All @@ -167,7 +167,7 @@ There is one last trick on _protocol and stream multiplexing_ that libp2p uses t

With the aid of both mechanisms, we can reuse an incomming connection to dial streams out too, this is specially useful when you are behind tricky NAT, firewalls or if you are running in a browser, where you can't have listening addrs, but you can dial out. By dialing out, you enable other peers to talk with you in Protocols that they want, simply by opening a new multiplexed stream.

You can see this working on example [3.js](./3.js).
You can see this working on example [3.js](./3.js).

As we've seen earlier, we can create our node with this createNode function.
```js
Expand Down Expand Up @@ -229,14 +229,14 @@ node2.handle('/node-2', ({ stream }) => {
})

// Dialing node2 from node1
const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/node-2'])
const stream1 = await node1.dialProtocol(node2.peerId, ['/node-2'])
await pipe(
['from 1 to 2'],
stream1
)

// Dialing node1 from node2
const { stream: stream2 } = await node2.dialProtocol(node1.peerId, ['/node-1'])
const stream2 = await node2.dialProtocol(node1.peerId, ['/node-1'])
await pipe(
['from 2 to 1'],
stream2
Expand All @@ -256,14 +256,14 @@ So, we have successfully set up a bidirectional connection with protocol muxing.
The code below will result into an error as `the dial address is not valid`.
```js
// Dialing from node2 to node1
const { stream: stream2 } = await node2.dialProtocol(node1.peerId, ['/node-1'])
const stream2 = await node2.dialProtocol(node1.peerId, ['/node-1'])
await pipe(
['from 2 to 1'],
stream2
)

// Dialing from node1 to node2
const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/node-2'])
const stream1 = await node1.dialProtocol(node2.peerId, ['/node-2'])
await pipe(
['from 1 to 2'],
stream1
Expand Down
2 changes: 1 addition & 1 deletion examples/transports/2.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ function printAddrs (node, number) {
})

await node1.peerStore.addressBook.set(node2.peerId, node2.getMultiaddrs())
const { stream } = await node1.dialProtocol(node2.peerId, '/print')
const stream = await node1.dialProtocol(node2.peerId, '/print')

await pipe(
['Hello', ' ', 'p2p', ' ', 'world', '!'].map(str => uint8ArrayFromString(str)),
Expand Down
4 changes: 2 additions & 2 deletions examples/transports/3.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ function print ({ stream }) {
await node3.peerStore.addressBook.set(node1.peerId, node1.getMultiaddrs())

// node 1 (TCP) dials to node 2 (TCP+WebSockets)
const { stream } = await node1.dialProtocol(node2.peerId, '/print')
const stream = await node1.dialProtocol(node2.peerId, '/print')
await pipe(
[uint8ArrayFromString('node 1 dialed to node 2 successfully')],
stream
)

// node 2 (TCP+WebSockets) dials to node 2 (WebSockets)
const { stream: stream2 } = await node2.dialProtocol(node3.peerId, '/print')
const stream2 = await node2.dialProtocol(node3.peerId, '/print')
await pipe(
[uint8ArrayFromString('node 2 dialed to node 3 successfully')],
stream2
Expand Down
2 changes: 1 addition & 1 deletion examples/transports/4.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ function print ({ stream }) {
const targetAddr = node1.getMultiaddrs()[0];

// node 2 (Secure WebSockets) dials to node 1 (Secure Websockets)
const { stream } = await node2.dialProtocol(targetAddr, '/print')
const stream = await node2.dialProtocol(targetAddr, '/print')
await pipe(
[uint8ArrayFromString('node 2 dialed to node 1 successfully')],
stream
Expand Down
6 changes: 3 additions & 3 deletions examples/transports/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ Then add,
})

await node1.peerStore.addressBook.set(node2.peerId, node2.multiaddrs)
const { stream } = await node1.dialProtocol(node2.peerId, '/print')
const stream = await node1.dialProtocol(node2.peerId, '/print')

await pipe(
['Hello', ' ', 'p2p', ' ', 'world', '!'],
Expand Down Expand Up @@ -225,14 +225,14 @@ await node2.peerStore.addressBook.set(node3.peerId, node3.multiaddrs)
await node3.peerStore.addressBook.set(node1.peerId, node1.multiaddrs)

// node 1 (TCP) dials to node 2 (TCP+WebSockets)
const { stream } = await node1.dialProtocol(node2.peerId, '/print')
const stream = await node1.dialProtocol(node2.peerId, '/print')
await pipe(
['node 1 dialed to node 2 successfully'],
stream
)

// node 2 (TCP+WebSockets) dials to node 2 (WebSockets)
const { stream: stream2 } = await node2.dialProtocol(node3.peerId, '/print')
const stream2 = await node2.dialProtocol(node3.peerId, '/print')
await pipe(
['node 2 dialed to node 3 successfully'],
stream2
Expand Down
2 changes: 1 addition & 1 deletion examples/webrtc-direct/dialer.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createLibp2p } from 'libp2p'
import { WebRTCDirect } from '@achingbrain/webrtc-direct'
import { WebRTCDirect } from '@libp2p/webrtc-direct'
import { Mplex } from '@libp2p/mplex'
import { Noise } from '@chainsafe/libp2p-noise'
import { Bootstrap } from '@libp2p/bootstrap'
Expand Down
2 changes: 1 addition & 1 deletion examples/webrtc-direct/listener.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createLibp2p } from 'libp2p'
import { WebRTCDirect } from '@achingbrain/webrtc-direct'
import { WebRTCDirect } from '@libp2p/webrtc-direct'
import { Mplex } from '@libp2p/mplex'
import { Noise } from '@chainsafe/libp2p-noise'
import { createFromJSON } from '@libp2p/peer-id-factory'
Expand Down
2 changes: 1 addition & 1 deletion examples/webrtc-direct/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"@libp2p/webrtc-direct": "^2.0.0",
"@chainsafe/libp2p-noise": "^6.2.0",
"@libp2p/bootstrap": "^2.0.0",
"@libp2p/mplex": "^2.0.0",
"@libp2p/mplex": "^3.0.0",
"libp2p": "../../",
"wrtc": "^0.4.7"
},
Expand Down
Loading

0 comments on commit de30c2c

Please sign in to comment.