From f9073ecd215e119b7a864e2ad31fe7067322c754 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Wed, 8 Jun 2022 16:20:56 +0100 Subject: [PATCH] fix: connection pruning (#1235) Actually prune connections when we reach the connection limit --- src/connection-manager/index.ts | 82 ++++++++++++++++----------- test/connection-manager/index.spec.ts | 4 +- test/metrics/index.node.ts | 5 +- 3 files changed, 54 insertions(+), 37 deletions(-) diff --git a/src/connection-manager/index.ts b/src/connection-manager/index.ts index 58a9b5eda5..4a78606dc2 100644 --- a/src/connection-manager/index.ts +++ b/src/connection-manager/index.ts @@ -31,7 +31,7 @@ const defaultOptions: Partial = { pollInterval: 2000, autoDialInterval: 10000, movingAverageInterval: 60000, - defaultPeerValue: 1 + defaultPeerValue: 0.5 } const METRICS_COMPONENT = 'connection-manager' @@ -344,7 +344,10 @@ export class DefaultConnectionManager extends EventEmitter('peer:connect', { detail: connection })) } @@ -459,7 +462,7 @@ export class DefaultConnectionManager extends EventEmitter) { const { detail: summary } = evt - this._checkMaxLimit('maxEventLoopDelay', summary.avgMs) + this._checkMaxLimit('maxEventLoopDelay', summary.avgMs, 1) .catch(err => { log.error(err) }) @@ -468,46 +471,59 @@ export class DefaultConnectionManager extends EventEmitter limit) { - log('%s: limit exceeded: %p, %d', this.components.getPeerId(), name, value) - await this._maybeDisconnectOne() + log('%s: limit exceeded: %p, %d, pruning %d connection(s)', this.components.getPeerId(), name, value, toPrune) + await this._maybePruneConnections(toPrune) } } /** - * If we have more connections than our maximum, close a connection - * to the lowest valued peer. + * If we have more connections than our maximum, select some excess connections + * to prune based on peer value */ - async _maybeDisconnectOne () { - if (this.opts.minConnections < this.connections.size) { - const peerValues = Array.from(new Map([...this.peerValues.entries()].sort((a, b) => a[1] - b[1]))) - - log('%p: sorted peer values: %j', this.components.getPeerId(), peerValues) - const disconnectPeer = peerValues[0] - - if (disconnectPeer != null) { - const peerId = disconnectPeer[0] - log('%p: lowest value peer is %s', this.components.getPeerId(), peerId) - log('%p: closing a connection to %j', this.components.getPeerId(), peerId) - - for (const connections of this.connections.values()) { - if (connections[0].remotePeer.toString() === peerId) { - void connections[0].close() - .catch(err => { - log.error(err) - }) - - // TODO: should not need to invoke this manually - this.onDisconnect(new CustomEvent('connectionEnd', { - detail: connections[0] - })) - break - } + async _maybePruneConnections (toPrune: number) { + const connections = this.getConnections() + + if (connections.length <= this.opts.minConnections || toPrune < 1) { + return + } + + const peerValues = Array.from(new Map([...this.peerValues.entries()].sort((a, b) => a[1] - b[1]))) + log.trace('sorted peer values: %j', peerValues) + + const toClose = [] + + for (const [peerId] of peerValues) { + log('too many connections open - closing a connection to %p', peerId) + + for (const connection of connections) { + if (connection.remotePeer.toString() === peerId) { + toClose.push(connection) + } + + if (toClose.length === toPrune) { + break } } } + + // close connections + await Promise.all( + toClose.map(async connection => { + try { + await connection.close() + } catch (err) { + log.error(err) + } + + // TODO: should not need to invoke this manually + this.onDisconnect(new CustomEvent('connectionEnd', { + detail: connection + })) + }) + ) } } diff --git a/test/connection-manager/index.spec.ts b/test/connection-manager/index.spec.ts index 83a520f7b1..7253bcd197 100644 --- a/test/connection-manager/index.spec.ts +++ b/test/connection-manager/index.spec.ts @@ -66,7 +66,7 @@ describe('Connection Manager', () => { await libp2p.start() const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager - const connectionManagerMaybeDisconnectOneSpy = sinon.spy(connectionManager, '_maybeDisconnectOne') + const connectionManagerMaybeDisconnectOneSpy = sinon.spy(connectionManager, '_maybePruneConnections') // Add 1 too many connections const spies = new Map>>() @@ -115,7 +115,7 @@ describe('Connection Manager', () => { await libp2p.start() const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager - const connectionManagerMaybeDisconnectOneSpy = sinon.spy(connectionManager, '_maybeDisconnectOne') + const connectionManagerMaybeDisconnectOneSpy = sinon.spy(connectionManager, '_maybePruneConnections') // Add 1 too many connections const spy = sinon.spy() diff --git a/test/metrics/index.node.ts b/test/metrics/index.node.ts index 83f5783c8f..8a6e7a58f2 100644 --- a/test/metrics/index.node.ts +++ b/test/metrics/index.node.ts @@ -11,6 +11,7 @@ import { createBaseOptions } from '../utils/base-options.js' import type { Libp2pNode } from '../../src/libp2p.js' import type { Libp2pOptions } from '../../src/index.js' import type { DefaultMetrics } from '../../src/metrics/index.js' +import drain from 'it-drain' describe('libp2p.metrics', () => { let libp2p: Libp2pNode @@ -149,7 +150,7 @@ describe('libp2p.metrics', () => { ]) await populateAddressBooks([libp2p, remoteLibp2p]) - void remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => { + await remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => { void pipe(stream, stream) }) @@ -160,7 +161,7 @@ describe('libp2p.metrics', () => { await pipe( [bytes], stream, - async (source) => await toBuffer(source) + drain ) const metrics = libp2p.components.getMetrics()