Skip to content

Commit

Permalink
fix: prune connections based on stream counts and direction (#2521)
Browse files Browse the repository at this point in the history
Changes how we sort connections to prune to be more than just tags and age.

Now we'll sort by tags, the number of open streams, direction and age.

This should choose idle connections without streams to close over those that are in use.
  • Loading branch information
achingbrain committed May 7, 2024
1 parent a11e135 commit 8e36fc5
Show file tree
Hide file tree
Showing 6 changed files with 423 additions and 35 deletions.
101 changes: 66 additions & 35 deletions packages/libp2p/src/connection-manager/connection-pruner.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { PeerMap } from '@libp2p/peer-collections'
import { safelyCloseConnectionIfUnused } from '@libp2p/utils/close'
import { MAX_CONNECTIONS } from './constants.js'
import type { Libp2pEvents, Logger, ComponentLogger, TypedEventTarget, PeerStore } from '@libp2p/interface'
import type { Libp2pEvents, Logger, ComponentLogger, TypedEventTarget, PeerStore, Connection } from '@libp2p/interface'
import type { ConnectionManager } from '@libp2p/interface-internal'
import type { Multiaddr } from '@multiformats/multiaddr'

Expand Down Expand Up @@ -56,14 +57,13 @@ export class ConnectionPruner {
async maybePruneConnections (): Promise<void> {
const connections = this.connectionManager.getConnections()
const numConnections = connections.length
const toPrune = Math.max(numConnections - this.maxConnections, 0)

this.log('checking max connections limit %d/%d', numConnections, this.maxConnections)

if (numConnections <= this.maxConnections) {
return
}

this.log('max connections limit exceeded %d/%d, pruning %d connection(s)', numConnections, this.maxConnections, toPrune)
const peerValues = new PeerMap<number>()

// work out peer values
Expand All @@ -90,35 +90,10 @@ export class ConnectionPruner {
}
}

// sort by value, lowest to highest
const sortedConnections = connections.sort((a, b) => {
const peerAValue = peerValues.get(a.remotePeer) ?? 0
const peerBValue = peerValues.get(b.remotePeer) ?? 0

if (peerAValue > peerBValue) {
return 1
}

if (peerAValue < peerBValue) {
return -1
}

// if the peers have an equal tag value then we want to close short-lived connections first
const connectionALifespan = a.timeline.open
const connectionBLifespan = b.timeline.open

if (connectionALifespan < connectionBLifespan) {
return 1
}

if (connectionALifespan > connectionBLifespan) {
return -1
}

return 0
})
const sortedConnections = this.sortConnections(connections, peerValues)

// close some connections
const toPrune = Math.max(numConnections - this.maxConnections, 0)
const toClose = []

for (const connection of sortedConnections) {
Expand All @@ -141,15 +116,71 @@ export class ConnectionPruner {
// close connections
await Promise.all(
toClose.map(async connection => {
try {
await connection.close()
} catch (err) {
this.log.error(err)
}
await safelyCloseConnectionIfUnused(connection, {
signal: AbortSignal.timeout(1000)
})
})
)

// despatch prune event
this.events.safeDispatchEvent('connection:prune', { detail: toClose })
}

sortConnections (connections: Connection[], peerValues: PeerMap<number>): Connection[] {
return connections
// sort by connection age, newest to oldest
.sort((a, b) => {
const connectionALifespan = a.timeline.open
const connectionBLifespan = b.timeline.open

if (connectionALifespan < connectionBLifespan) {
return 1
}

if (connectionALifespan > connectionBLifespan) {
return -1
}

return 0
})
// sort by direction, incoming first then outgoing
.sort((a, b) => {
if (a.direction === 'outbound' && b.direction === 'inbound') {
return 1
}

if (a.direction === 'inbound' && b.direction === 'outbound') {
return -1
}

return 0
})
// sort by number of streams, lowest to highest
.sort((a, b) => {
if (a.streams.length > b.streams.length) {
return 1
}

if (a.streams.length < b.streams.length) {
return -1
}

return 0
})
// sort by tag value, lowest to highest
.sort((a, b) => {
const peerAValue = peerValues.get(a.remotePeer) ?? 0
const peerBValue = peerValues.get(b.remotePeer) ?? 0

if (peerAValue > peerBValue) {
return 1
}

if (peerAValue < peerBValue) {
return -1
}

return 0
})
}
}
98 changes: 98 additions & 0 deletions packages/libp2p/test/connection-manager/connection-pruner.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import { TypedEventEmitter } from '@libp2p/interface'
import { defaultLogger } from '@libp2p/logger'
import { PeerMap } from '@libp2p/peer-collections'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { expect } from 'aegir/chai'
import { stubInterface } from 'sinon-ts'
import { ConnectionPruner } from '../../src/connection-manager/connection-pruner.js'
import type { Libp2pEvents, PeerStore, Stream, TypedEventTarget, Connection } from '@libp2p/interface'
import type { ConnectionManager } from '@libp2p/interface-internal'
import type { StubbedInstance } from 'sinon-ts'

interface ConnectionPrunerComponents {
connectionManager: StubbedInstance<ConnectionManager>
peerStore: StubbedInstance<PeerStore>
events: TypedEventTarget<Libp2pEvents>
}

describe('connection-pruner', () => {
let pruner: ConnectionPruner
let components: ConnectionPrunerComponents

beforeEach(() => {
components = {
connectionManager: stubInterface<ConnectionManager>(),
peerStore: stubInterface<PeerStore>(),
events: new TypedEventEmitter()
}

pruner = new ConnectionPruner({
...components,
logger: defaultLogger()
}, {})
})

it('should sort connections for pruning, closing connections without streams first unless they are tagged', async () => {
const tagged = ['tagged', 'untagged']
const streams = ['streams', 'no-streams']
const direction = ['inbound', 'outbound']
const age = ['old', 'new']

const connections = []
const peerValues = new PeerMap<number>()

for (const t of tagged) {
for (const s of streams) {
for (const d of direction) {
for (const a of age) {
const connection = stubInterface<Connection>({
id: `${t}-${s}-${d}-${a}`,
remotePeer: await createEd25519PeerId(),
streams: s === 'streams'
? [stubInterface<Stream>()]
: [],
direction: d === 'inbound' ? 'inbound' : 'outbound',
timeline: {
open: a === 'old' ? 0 : (Date.now() - 100)
}
})

// eslint-disable-next-line max-depth
if (t === 'tagged') {
peerValues.set(connection.remotePeer, 100)
}

connections.push(
connection
)
}
}
}
}

// priority is:
// 1. tagged peers
// 2. connections with streams
// 3. outbound connections
// 4. longer-lived connections
expect(pruner.sortConnections(connections.sort((a, b) => Math.random() > 0.5 ? -1 : 1), peerValues).map(conn => conn.id))
.to.deep.equal([
'untagged-no-streams-inbound-new',
'untagged-no-streams-inbound-old',
'untagged-no-streams-outbound-new',
'untagged-no-streams-outbound-old',
'untagged-streams-inbound-new',
'untagged-streams-inbound-old',
'untagged-streams-outbound-new',
'untagged-streams-outbound-old',
'tagged-no-streams-inbound-new',
'tagged-no-streams-inbound-old',
'tagged-no-streams-outbound-new',
'tagged-no-streams-outbound-old',
'tagged-streams-inbound-new',
'tagged-streams-inbound-old',
'tagged-streams-outbound-new',
'tagged-streams-outbound-old'
])
})
})
9 changes: 9 additions & 0 deletions packages/utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
"types": "./src/index.d.ts",
"import": "./dist/src/index.js"
},
"./abort-options": {
"types": "./dist/src/abort-options.d.ts",
"import": "./dist/src/abort-options.js"
},
"./abstract-stream": {
"types": "./dist/src/abstract-stream.d.ts",
"import": "./dist/src/abstract-stream.js"
Expand All @@ -60,6 +64,10 @@
"types": "./dist/src/close-source.d.ts",
"import": "./dist/src/close-source.js"
},
"./close": {
"types": "./dist/src/close.d.ts",
"import": "./dist/src/close.js"
},
"./filters": {
"types": "./dist/src/filters/index.d.ts",
"import": "./dist/src/filters/index.js"
Expand Down Expand Up @@ -139,6 +147,7 @@
"@multiformats/multiaddr-matcher": "^1.2.0",
"@sindresorhus/fnv1a": "^3.1.0",
"@types/murmurhash3js-revisited": "^3.0.3",
"any-signal": "^4.1.1",
"delay": "^6.0.0",
"get-iterator": "^2.0.1",
"is-loopback-addr": "^2.0.2",
Expand Down
20 changes: 20 additions & 0 deletions packages/utils/src/abort-options.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { setMaxListeners } from '@libp2p/interface'
import { anySignal } from 'any-signal'
import type { AbortOptions } from '@libp2p/interface'
import type { ClearableSignal } from 'any-signal'

export function createTimeoutOptions (timeout: number): AbortOptions
export function createTimeoutOptions (timeout: number, ...existingSignals: AbortSignal[]): { signal: ClearableSignal }
export function createTimeoutOptions (timeout: number, ...existingSignals: AbortSignal[]): AbortOptions {
let signal = AbortSignal.timeout(timeout)
setMaxListeners(Infinity, signal)

if (existingSignals.length > 0) {
signal = anySignal([signal, ...existingSignals])
setMaxListeners(Infinity, signal)
}

return {
signal
}
}
65 changes: 65 additions & 0 deletions packages/utils/src/close.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import type { Connection, Stream, AbortOptions } from '@libp2p/interface'

/**
* Close the passed stream, falling back to aborting the stream if closing
* cleanly fails.
*/
export async function safelyCloseStream (stream?: Stream, options?: AbortOptions): Promise<void> {
try {
await stream?.close(options)
} catch (err: any) {
stream?.abort(err)
}
}

/**
* These are speculative protocols that are run automatically on connection open
* so are usually not the reason the connection was opened.
*
* Consequently when requested it should be safe to close connections that only
* have these protocol streams open.
*/
const DEFAULT_CLOSABLE_PROTOCOLS = [
// identify
'/ipfs/id/1.0.0',

// identify-push
'/ipfs/id/push/1.0.0',

// autonat
'/libp2p/autonat/1.0.0',

// dcutr
'/libp2p/dcutr'
]

export interface SafelyCloseConnectionOptions extends AbortOptions {
/**
* Only close the stream if it either has no protocol streams open or only
* ones in this list.
*
* @default ['/ipfs/id/1.0.0']
*/
closableProtocols?: string[]
}

/**
* Close the passed connection if it has no streams, or only closable protocol
* streams, falling back to aborting the connection if closing it cleanly fails.
*/
export async function safelyCloseConnectionIfUnused (connection?: Connection, options?: SafelyCloseConnectionOptions): Promise<void> {
const streamProtocols = connection?.streams?.map(stream => stream.protocol) ?? []
const closableProtocols = options?.closableProtocols ?? DEFAULT_CLOSABLE_PROTOCOLS

// if the connection has protocols not in the closable protocols list, do not
// close the connection
if (streamProtocols.filter(proto => proto != null && !closableProtocols.includes(proto)).length > 0) {
return
}

try {
await connection?.close(options)
} catch (err: any) {
connection?.abort(err)
}
}
Loading

0 comments on commit 8e36fc5

Please sign in to comment.