Skip to content

Commit

Permalink
fix: add pending connection limit (#1423)
Browse files Browse the repository at this point in the history
Adds a `maxIncomingPendingConnections` option to the Connection Manager that limits how many connections can be open but not yet upgraded.
  • Loading branch information
achingbrain committed Oct 11, 2022
1 parent 487b942 commit b717beb
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 10 deletions.
9 changes: 8 additions & 1 deletion doc/LIMITS.md
Expand Up @@ -20,6 +20,8 @@ In order to prevent excessive resource consumption by a libp2p node it's importa

It's possible to limit the amount of incoming and outgoing connections a node is able to make. When this limit is reached and an attempt to open a new connection is made, existing connections may be closed to make room for the new connection.

We can also limit the number of connections in a "pending" state. These connections have been opened by a remote peer but peer IDs have yet to be exchanged and/or connection encryption and multiplexing negotiated. Once this limit is hit further connections will be closed unless the remote peer has an address in the [allow list](#allowdeny-lists).

```js
const node = await createLibp2pNode({
connectionManager: {
Expand All @@ -32,7 +34,12 @@ const node = await createLibp2pNode({
* If the number of open connections goes below this number, the node
* will try to connect to nearby peers from the peer store
*/
minConnections: 20
minConnections: 20,

/**
* How many connections can be open but not yet upgraded
*/
maxIncomingPendingConnections: 10
}
})
```
Expand Down
2 changes: 1 addition & 1 deletion examples/package.json
Expand Up @@ -10,7 +10,7 @@
"license": "MIT",
"dependencies": {
"@libp2p/pubsub-peer-discovery": "^6.0.2",
"@libp2p/floodsub": "^4.0.0",
"@libp2p/floodsub": "^4.0.1",
"@nodeutils/defaults-deep": "^1.1.0",
"execa": "^6.1.0",
"fs-extra": "^10.1.0",
Expand Down
6 changes: 3 additions & 3 deletions package.json
Expand Up @@ -113,7 +113,7 @@
"@libp2p/interface-peer-info": "^1.0.3",
"@libp2p/interface-peer-routing": "^1.0.1",
"@libp2p/interface-peer-store": "^1.2.2",
"@libp2p/interface-pubsub": "^2.1.0",
"@libp2p/interface-pubsub": "^3.0.0",
"@libp2p/interface-registrar": "^2.0.3",
"@libp2p/interface-stream-muxer": "^3.0.0",
"@libp2p/interface-transport": "^2.0.0",
Expand Down Expand Up @@ -174,7 +174,7 @@
"@libp2p/bootstrap": "^4.0.0",
"@libp2p/daemon-client": "^3.0.1",
"@libp2p/daemon-server": "^3.0.1",
"@libp2p/floodsub": "^4.0.0",
"@libp2p/floodsub": "^4.0.1",
"@libp2p/interface-compliance-tests": "^3.0.2",
"@libp2p/interface-connection-encrypter-compliance-tests": "^2.0.2",
"@libp2p/interface-mocks": "^6.0.1",
Expand Down Expand Up @@ -211,4 +211,4 @@
"browser": {
"nat-api": false
}
}
}
24 changes: 22 additions & 2 deletions src/connection-manager/index.ts
Expand Up @@ -33,7 +33,8 @@ const defaultOptions: Partial<ConnectionManagerInit> = {
pollInterval: 2000,
autoDialInterval: 10000,
movingAverageInterval: 60000,
inboundConnectionThreshold: 5
inboundConnectionThreshold: 5,
maxIncomingPendingConnections: 10
}

const METRICS_SYSTEM = 'libp2p'
Expand Down Expand Up @@ -152,6 +153,12 @@ export interface ConnectionManagerInit {
* host, reject subsequent connections
*/
inboundConnectionThreshold?: number

/**
* The maximum number of parallel incoming connections allowed that have yet to
* complete the connection upgrade - e.g. choosing connection encryption, muxer, etc
*/
maxIncomingPendingConnections?: number
}

export interface ConnectionManagerEvents {
Expand All @@ -175,6 +182,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
private readonly allow: Multiaddr[]
private readonly deny: Multiaddr[]
private readonly inboundConnectionRateLimiter: RateLimiterMemory
private incomingPendingConnections: number

constructor (init: ConnectionManagerInit) {
super()
Expand Down Expand Up @@ -218,6 +226,8 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
points: this.opts.inboundConnectionThreshold,
duration: 1
})

this.incomingPendingConnections = 0
}

init (components: Components): void {
Expand Down Expand Up @@ -719,9 +729,17 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
})

if (allowConnection) {
this.incomingPendingConnections++

return true
}

// check pending connections
if (this.incomingPendingConnections === this.opts.maxIncomingPendingConnections) {
log('connection from %s refused - incomingPendingConnections exceeded by peer %s', maConn.remoteAddr)
return false
}

if (maConn.remoteAddr.isThinWaistAddress()) {
const host = maConn.remoteAddr.nodeAddress().address

Expand All @@ -734,6 +752,8 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
}

if (this.getConnections().length < this.opts.maxConnections) {
this.incomingPendingConnections++

return true
}

Expand All @@ -742,6 +762,6 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
}

afterUpgradeInbound () {

this.incomingPendingConnections--
}
}
4 changes: 3 additions & 1 deletion src/pubsub/dummy-pubsub.ts
@@ -1,10 +1,12 @@
import { EventEmitter } from '@libp2p/interfaces/events'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PublishResult, PubSub, PubSubEvents, StrictNoSign, StrictSign } from '@libp2p/interface-pubsub'
import type { PublishResult, PubSub, PubSubEvents, StrictNoSign, StrictSign, TopicValidatorFn } from '@libp2p/interface-pubsub'
import errCode from 'err-code'
import { messages, codes } from '../errors.js'

export class DummyPubSub extends EventEmitter<PubSubEvents> implements PubSub {
public topicValidators = new Map<string, TopicValidatorFn>()

isStarted (): boolean {
return false
}
Expand Down
3 changes: 1 addition & 2 deletions src/upgrader.ts
Expand Up @@ -126,7 +126,6 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
const accept = await this.components.getConnectionManager().acceptIncomingConnection(maConn)

if (!accept) {
await maConn.close()
throw errCode(new Error('connection denied'), codes.ERR_CONNECTION_DENIED)
}

Expand Down Expand Up @@ -201,7 +200,6 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
}
} catch (err: any) {
log.error('Failed to upgrade inbound connection', err)
await maConn.close(err)
throw err
}

Expand All @@ -228,6 +226,7 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
remotePeer
})
} finally {
this.components.getConnectionManager().afterUpgradeInbound()
timeoutController.clear()
}
}
Expand Down
43 changes: 43 additions & 0 deletions test/connection-manager/index.spec.ts
Expand Up @@ -304,4 +304,47 @@ describe('Connection Manager', () => {
await expect(connectionManager.acceptIncomingConnection(maConn))
.to.eventually.be.true()
})

it('should limit the number of inbound pending connections', async () => {
const connectionManager = new DefaultConnectionManager({
...defaultOptions,
maxIncomingPendingConnections: 1
})

const dialer = stubInterface<Dialer>()
dialer.dial.resolves(stubInterface<Connection>())

const components = new Components({
dialer
})

// set mocks
connectionManager.init(components)

// start the upgrade
const maConn1 = mockMultiaddrConnection({
source: [],
sink: async () => {}
}, await createEd25519PeerId())

await expect(connectionManager.acceptIncomingConnection(maConn1))
.to.eventually.be.true()

// start the upgrade
const maConn2 = mockMultiaddrConnection({
source: [],
sink: async () => {}
}, await createEd25519PeerId())

// should be false because we have not completed the upgrade of maConn1
await expect(connectionManager.acceptIncomingConnection(maConn2))
.to.eventually.be.false()

// finish the maConn1 pending upgrade
connectionManager.afterUpgradeInbound()

// should be true because we have now completed the upgrade of maConn1
await expect(connectionManager.acceptIncomingConnection(maConn2))
.to.eventually.be.true()
})
})

0 comments on commit b717beb

Please sign in to comment.