Skip to content

Commit

Permalink
fix: connection pruning (#1235)
Browse files Browse the repository at this point in the history
Actually prune connections when we reach the connection limit
  • Loading branch information
achingbrain committed Jun 8, 2022
1 parent eee256d commit f9073ec
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 37 deletions.
82 changes: 49 additions & 33 deletions src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const defaultOptions: Partial<ConnectionManagerInit> = {
pollInterval: 2000,
autoDialInterval: 10000,
movingAverageInterval: 60000,
defaultPeerValue: 1
defaultPeerValue: 0.5
}

const METRICS_COMPONENT = 'connection-manager'
Expand Down Expand Up @@ -344,7 +344,10 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
this.peerValues.set(peerIdStr, this.opts.defaultPeerValue)
}

await this._checkMaxLimit('maxConnections', this.getConnections().length)
const numConnections = this.getConnections().length
const toPrune = numConnections - this.opts.maxConnections

await this._checkMaxLimit('maxConnections', numConnections, toPrune)
this.dispatchEvent(new CustomEvent<Connection>('peer:connect', { detail: connection }))
}

Expand Down Expand Up @@ -459,7 +462,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
_onLatencyMeasure (evt: CustomEvent<SummaryObject>) {
const { detail: summary } = evt

this._checkMaxLimit('maxEventLoopDelay', summary.avgMs)
this._checkMaxLimit('maxEventLoopDelay', summary.avgMs, 1)
.catch(err => {
log.error(err)
})
Expand All @@ -468,46 +471,59 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
/**
* If the `value` of `name` has exceeded its limit, maybe close a connection
*/
async _checkMaxLimit (name: keyof ConnectionManagerInit, value: number) {
async _checkMaxLimit (name: keyof ConnectionManagerInit, value: number, toPrune: number = 1) {
const limit = this.opts[name]
log.trace('checking limit of %s. current value: %d of %d', name, value, limit)
if (value > limit) {
log('%s: limit exceeded: %p, %d', this.components.getPeerId(), name, value)
await this._maybeDisconnectOne()
log('%s: limit exceeded: %p, %d, pruning %d connection(s)', this.components.getPeerId(), name, value, toPrune)
await this._maybePruneConnections(toPrune)
}
}

/**
* If we have more connections than our maximum, close a connection
* to the lowest valued peer.
* If we have more connections than our maximum, select some excess connections
* to prune based on peer value
*/
async _maybeDisconnectOne () {
if (this.opts.minConnections < this.connections.size) {
const peerValues = Array.from(new Map([...this.peerValues.entries()].sort((a, b) => a[1] - b[1])))

log('%p: sorted peer values: %j', this.components.getPeerId(), peerValues)
const disconnectPeer = peerValues[0]

if (disconnectPeer != null) {
const peerId = disconnectPeer[0]
log('%p: lowest value peer is %s', this.components.getPeerId(), peerId)
log('%p: closing a connection to %j', this.components.getPeerId(), peerId)

for (const connections of this.connections.values()) {
if (connections[0].remotePeer.toString() === peerId) {
void connections[0].close()
.catch(err => {
log.error(err)
})

// TODO: should not need to invoke this manually
this.onDisconnect(new CustomEvent<Connection>('connectionEnd', {
detail: connections[0]
}))
break
}
async _maybePruneConnections (toPrune: number) {
const connections = this.getConnections()

if (connections.length <= this.opts.minConnections || toPrune < 1) {
return
}

const peerValues = Array.from(new Map([...this.peerValues.entries()].sort((a, b) => a[1] - b[1])))
log.trace('sorted peer values: %j', peerValues)

const toClose = []

for (const [peerId] of peerValues) {
log('too many connections open - closing a connection to %p', peerId)

for (const connection of connections) {
if (connection.remotePeer.toString() === peerId) {
toClose.push(connection)
}

if (toClose.length === toPrune) {
break
}
}
}

// close connections
await Promise.all(
toClose.map(async connection => {
try {
await connection.close()
} catch (err) {
log.error(err)
}

// TODO: should not need to invoke this manually
this.onDisconnect(new CustomEvent<Connection>('connectionEnd', {
detail: connection
}))
})
)
}
}
4 changes: 2 additions & 2 deletions test/connection-manager/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ describe('Connection Manager', () => {
await libp2p.start()

const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
const connectionManagerMaybeDisconnectOneSpy = sinon.spy(connectionManager, '_maybeDisconnectOne')
const connectionManagerMaybeDisconnectOneSpy = sinon.spy(connectionManager, '_maybePruneConnections')

// Add 1 too many connections
const spies = new Map<number, sinon.SinonSpy<[], Promise<void>>>()
Expand Down Expand Up @@ -115,7 +115,7 @@ describe('Connection Manager', () => {
await libp2p.start()

const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
const connectionManagerMaybeDisconnectOneSpy = sinon.spy(connectionManager, '_maybeDisconnectOne')
const connectionManagerMaybeDisconnectOneSpy = sinon.spy(connectionManager, '_maybePruneConnections')

// Add 1 too many connections
const spy = sinon.spy()
Expand Down
5 changes: 3 additions & 2 deletions test/metrics/index.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { createBaseOptions } from '../utils/base-options.js'
import type { Libp2pNode } from '../../src/libp2p.js'
import type { Libp2pOptions } from '../../src/index.js'
import type { DefaultMetrics } from '../../src/metrics/index.js'
import drain from 'it-drain'

describe('libp2p.metrics', () => {
let libp2p: Libp2pNode
Expand Down Expand Up @@ -149,7 +150,7 @@ describe('libp2p.metrics', () => {
])
await populateAddressBooks([libp2p, remoteLibp2p])

void remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => {
await remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => {
void pipe(stream, stream)
})

Expand All @@ -160,7 +161,7 @@ describe('libp2p.metrics', () => {
await pipe(
[bytes],
stream,
async (source) => await toBuffer(source)
drain
)

const metrics = libp2p.components.getMetrics()
Expand Down

0 comments on commit f9073ec

Please sign in to comment.