diff --git a/doc/LIMITS.md b/doc/LIMITS.md index 6cbd20b32a..f1e34c5d2f 100644 --- a/doc/LIMITS.md +++ b/doc/LIMITS.md @@ -20,6 +20,8 @@ In order to prevent excessive resource consumption by a libp2p node it's importa It's possible to limit the amount of incoming and outgoing connections a node is able to make. When this limit is reached and an attempt to open a new connection is made, existing connections may be closed to make room for the new connection. +We can also limit the number of connections in a "pending" state. These connections have been opened by a remote peer but peer IDs have yet to be exchanged and/or connection encryption and multiplexing negotiated. Once this limit is hit further connections will be closed unless the remote peer has an address in the [allow list](#allowdeny-lists). + ```js const node = await createLibp2pNode({ connectionManager: { @@ -32,7 +34,12 @@ const node = await createLibp2pNode({ * If the number of open connections goes below this number, the node * will try to connect to nearby peers from the peer store */ - minConnections: 20 + minConnections: 20, + + /** + * How many connections can be open but not yet upgraded + */ + maxIncomingPendingConnections: 10 } }) ``` diff --git a/examples/package.json b/examples/package.json index a43e2a9df1..4c17cf07fa 100644 --- a/examples/package.json +++ b/examples/package.json @@ -10,7 +10,7 @@ "license": "MIT", "dependencies": { "@libp2p/pubsub-peer-discovery": "^6.0.2", - "@libp2p/floodsub": "^4.0.0", + "@libp2p/floodsub": "^4.0.1", "@nodeutils/defaults-deep": "^1.1.0", "execa": "^6.1.0", "fs-extra": "^10.1.0", diff --git a/package.json b/package.json index 2ad2304a60..35b705beaa 100644 --- a/package.json +++ b/package.json @@ -113,7 +113,7 @@ "@libp2p/interface-peer-info": "^1.0.3", "@libp2p/interface-peer-routing": "^1.0.1", "@libp2p/interface-peer-store": "^1.2.2", - "@libp2p/interface-pubsub": "^2.1.0", + "@libp2p/interface-pubsub": "^3.0.0", "@libp2p/interface-registrar": "^2.0.3", "@libp2p/interface-stream-muxer": "^3.0.0", "@libp2p/interface-transport": "^2.0.0", @@ -174,7 +174,7 @@ "@libp2p/bootstrap": "^4.0.0", "@libp2p/daemon-client": "^3.0.1", "@libp2p/daemon-server": "^3.0.1", - "@libp2p/floodsub": "^4.0.0", + "@libp2p/floodsub": "^4.0.1", "@libp2p/interface-compliance-tests": "^3.0.2", "@libp2p/interface-connection-encrypter-compliance-tests": "^2.0.2", "@libp2p/interface-mocks": "^6.0.1", @@ -211,4 +211,4 @@ "browser": { "nat-api": false } -} \ No newline at end of file +} diff --git a/src/connection-manager/index.ts b/src/connection-manager/index.ts index 7717cb4241..9223c7d695 100644 --- a/src/connection-manager/index.ts +++ b/src/connection-manager/index.ts @@ -33,7 +33,8 @@ const defaultOptions: Partial = { pollInterval: 2000, autoDialInterval: 10000, movingAverageInterval: 60000, - inboundConnectionThreshold: 5 + inboundConnectionThreshold: 5, + maxIncomingPendingConnections: 10 } const METRICS_SYSTEM = 'libp2p' @@ -152,6 +153,12 @@ export interface ConnectionManagerInit { * host, reject subsequent connections */ inboundConnectionThreshold?: number + + /** + * The maximum number of parallel incoming connections allowed that have yet to + * complete the connection upgrade - e.g. choosing connection encryption, muxer, etc + */ + maxIncomingPendingConnections?: number } export interface ConnectionManagerEvents { @@ -175,6 +182,7 @@ export class DefaultConnectionManager extends EventEmitter implements PubSub { + public topicValidators = new Map() + isStarted (): boolean { return false } diff --git a/src/upgrader.ts b/src/upgrader.ts index 6f2ba74c84..c23070cd31 100644 --- a/src/upgrader.ts +++ b/src/upgrader.ts @@ -126,7 +126,6 @@ export class DefaultUpgrader extends EventEmitter implements Upg const accept = await this.components.getConnectionManager().acceptIncomingConnection(maConn) if (!accept) { - await maConn.close() throw errCode(new Error('connection denied'), codes.ERR_CONNECTION_DENIED) } @@ -201,7 +200,6 @@ export class DefaultUpgrader extends EventEmitter implements Upg } } catch (err: any) { log.error('Failed to upgrade inbound connection', err) - await maConn.close(err) throw err } @@ -228,6 +226,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg remotePeer }) } finally { + this.components.getConnectionManager().afterUpgradeInbound() timeoutController.clear() } } diff --git a/test/connection-manager/index.spec.ts b/test/connection-manager/index.spec.ts index 79a557f3d3..86b6ba8480 100644 --- a/test/connection-manager/index.spec.ts +++ b/test/connection-manager/index.spec.ts @@ -304,4 +304,47 @@ describe('Connection Manager', () => { await expect(connectionManager.acceptIncomingConnection(maConn)) .to.eventually.be.true() }) + + it('should limit the number of inbound pending connections', async () => { + const connectionManager = new DefaultConnectionManager({ + ...defaultOptions, + maxIncomingPendingConnections: 1 + }) + + const dialer = stubInterface() + dialer.dial.resolves(stubInterface()) + + const components = new Components({ + dialer + }) + + // set mocks + connectionManager.init(components) + + // start the upgrade + const maConn1 = mockMultiaddrConnection({ + source: [], + sink: async () => {} + }, await createEd25519PeerId()) + + await expect(connectionManager.acceptIncomingConnection(maConn1)) + .to.eventually.be.true() + + // start the upgrade + const maConn2 = mockMultiaddrConnection({ + source: [], + sink: async () => {} + }, await createEd25519PeerId()) + + // should be false because we have not completed the upgrade of maConn1 + await expect(connectionManager.acceptIncomingConnection(maConn2)) + .to.eventually.be.false() + + // finish the maConn1 pending upgrade + connectionManager.afterUpgradeInbound() + + // should be true because we have now completed the upgrade of maConn1 + await expect(connectionManager.acceptIncomingConnection(maConn2)) + .to.eventually.be.true() + }) })