Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prune connections based on stream counts and direction #2521

Merged
merged 2 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 66 additions & 35 deletions packages/libp2p/src/connection-manager/connection-pruner.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { PeerMap } from '@libp2p/peer-collections'
import { safelyCloseConnectionIfUnused } from '@libp2p/utils/close'
import { MAX_CONNECTIONS } from './constants.js'
import type { Libp2pEvents, Logger, ComponentLogger, TypedEventTarget, PeerStore } from '@libp2p/interface'
import type { Libp2pEvents, Logger, ComponentLogger, TypedEventTarget, PeerStore, Connection } from '@libp2p/interface'
import type { ConnectionManager } from '@libp2p/interface-internal'
import type { Multiaddr } from '@multiformats/multiaddr'

Expand Down Expand Up @@ -56,14 +57,13 @@ export class ConnectionPruner {
async maybePruneConnections (): Promise<void> {
const connections = this.connectionManager.getConnections()
const numConnections = connections.length
const toPrune = Math.max(numConnections - this.maxConnections, 0)

this.log('checking max connections limit %d/%d', numConnections, this.maxConnections)

if (numConnections <= this.maxConnections) {
return
}

this.log('max connections limit exceeded %d/%d, pruning %d connection(s)', numConnections, this.maxConnections, toPrune)
const peerValues = new PeerMap<number>()

// work out peer values
Expand All @@ -90,35 +90,10 @@ export class ConnectionPruner {
}
}

// sort by value, lowest to highest
const sortedConnections = connections.sort((a, b) => {
const peerAValue = peerValues.get(a.remotePeer) ?? 0
const peerBValue = peerValues.get(b.remotePeer) ?? 0

if (peerAValue > peerBValue) {
return 1
}

if (peerAValue < peerBValue) {
return -1
}

// if the peers have an equal tag value then we want to close short-lived connections first
const connectionALifespan = a.timeline.open
const connectionBLifespan = b.timeline.open

if (connectionALifespan < connectionBLifespan) {
return 1
}

if (connectionALifespan > connectionBLifespan) {
return -1
}

return 0
})
const sortedConnections = this.sortConnections(connections, peerValues)

// close some connections
const toPrune = Math.max(numConnections - this.maxConnections, 0)
const toClose = []

for (const connection of sortedConnections) {
Expand All @@ -141,15 +116,71 @@ export class ConnectionPruner {
// close connections
await Promise.all(
toClose.map(async connection => {
try {
await connection.close()
} catch (err) {
this.log.error(err)
}
await safelyCloseConnectionIfUnused(connection, {
signal: AbortSignal.timeout(1000)
})
})
)

// despatch prune event
this.events.safeDispatchEvent('connection:prune', { detail: toClose })
}

sortConnections (connections: Connection[], peerValues: PeerMap<number>): Connection[] {
return connections
// sort by connection age, newest to oldest
.sort((a, b) => {
const connectionALifespan = a.timeline.open
const connectionBLifespan = b.timeline.open

if (connectionALifespan < connectionBLifespan) {
return 1
}

if (connectionALifespan > connectionBLifespan) {
return -1
}

return 0
})
// sort by direction, incoming first then outgoing
.sort((a, b) => {
if (a.direction === 'outbound' && b.direction === 'inbound') {
return 1
}

if (a.direction === 'inbound' && b.direction === 'outbound') {
return -1
}

return 0
})
// sort by number of streams, lowest to highest
.sort((a, b) => {
if (a.streams.length > b.streams.length) {
return 1
}

if (a.streams.length < b.streams.length) {
return -1
}

return 0
})
// sort by tag value, lowest to highest
.sort((a, b) => {
const peerAValue = peerValues.get(a.remotePeer) ?? 0
const peerBValue = peerValues.get(b.remotePeer) ?? 0

if (peerAValue > peerBValue) {
return 1
}

if (peerAValue < peerBValue) {
return -1
}

return 0
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import { TypedEventEmitter } from '@libp2p/interface'
import { defaultLogger } from '@libp2p/logger'
import { PeerMap } from '@libp2p/peer-collections'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { expect } from 'aegir/chai'
import { stubInterface } from 'sinon-ts'
import { ConnectionPruner } from '../../src/connection-manager/connection-pruner.js'
import type { Libp2pEvents, PeerStore, Stream, TypedEventTarget, Connection } from '@libp2p/interface'
import type { ConnectionManager } from '@libp2p/interface-internal'
import type { StubbedInstance } from 'sinon-ts'

interface ConnectionPrunerComponents {
connectionManager: StubbedInstance<ConnectionManager>
peerStore: StubbedInstance<PeerStore>
events: TypedEventTarget<Libp2pEvents>
}

describe('connection-pruner', () => {
let pruner: ConnectionPruner
let components: ConnectionPrunerComponents

beforeEach(() => {
components = {
connectionManager: stubInterface<ConnectionManager>(),
peerStore: stubInterface<PeerStore>(),
events: new TypedEventEmitter()
}

pruner = new ConnectionPruner({
...components,
logger: defaultLogger()
}, {})
})

it('should sort connections for pruning, closing connections without streams first unless they are tagged', async () => {
const tagged = ['tagged', 'untagged']
const streams = ['streams', 'no-streams']
const direction = ['inbound', 'outbound']
const age = ['old', 'new']

const connections = []
const peerValues = new PeerMap<number>()

for (const t of tagged) {
for (const s of streams) {
for (const d of direction) {
for (const a of age) {
const connection = stubInterface<Connection>({
id: `${t}-${s}-${d}-${a}`,
remotePeer: await createEd25519PeerId(),
streams: s === 'streams'
? [stubInterface<Stream>()]
: [],
direction: d === 'inbound' ? 'inbound' : 'outbound',
timeline: {
open: a === 'old' ? 0 : (Date.now() - 100)
}
})

// eslint-disable-next-line max-depth
if (t === 'tagged') {
peerValues.set(connection.remotePeer, 100)
}

connections.push(
connection
)
}
}
}
}

// priority is:
// 1. tagged peers
// 2. connections with streams
// 3. outbound connections
maschad marked this conversation as resolved.
Show resolved Hide resolved
// 4. longer-lived connections
expect(pruner.sortConnections(connections.sort((a, b) => Math.random() > 0.5 ? -1 : 1), peerValues).map(conn => conn.id))
.to.deep.equal([
'untagged-no-streams-inbound-new',
'untagged-no-streams-inbound-old',
'untagged-no-streams-outbound-new',
'untagged-no-streams-outbound-old',
'untagged-streams-inbound-new',
'untagged-streams-inbound-old',
'untagged-streams-outbound-new',
'untagged-streams-outbound-old',
'tagged-no-streams-inbound-new',
'tagged-no-streams-inbound-old',
'tagged-no-streams-outbound-new',
'tagged-no-streams-outbound-old',
'tagged-streams-inbound-new',
'tagged-streams-inbound-old',
'tagged-streams-outbound-new',
'tagged-streams-outbound-old'
])
})
})
9 changes: 9 additions & 0 deletions packages/utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
"types": "./src/index.d.ts",
"import": "./dist/src/index.js"
},
"./abort-options": {
"types": "./dist/src/abort-options.d.ts",
"import": "./dist/src/abort-options.js"
},
"./abstract-stream": {
"types": "./dist/src/abstract-stream.d.ts",
"import": "./dist/src/abstract-stream.js"
Expand All @@ -60,6 +64,10 @@
"types": "./dist/src/close-source.d.ts",
"import": "./dist/src/close-source.js"
},
"./close": {
"types": "./dist/src/close.d.ts",
"import": "./dist/src/close.js"
},
"./filters": {
"types": "./dist/src/filters/index.d.ts",
"import": "./dist/src/filters/index.js"
Expand Down Expand Up @@ -139,6 +147,7 @@
"@multiformats/multiaddr-matcher": "^1.2.0",
"@sindresorhus/fnv1a": "^3.1.0",
"@types/murmurhash3js-revisited": "^3.0.3",
"any-signal": "^4.1.1",
"delay": "^6.0.0",
"get-iterator": "^2.0.1",
"is-loopback-addr": "^2.0.2",
Expand Down
20 changes: 20 additions & 0 deletions packages/utils/src/abort-options.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { setMaxListeners } from '@libp2p/interface'
import { anySignal } from 'any-signal'
import type { AbortOptions } from '@libp2p/interface'
import type { ClearableSignal } from 'any-signal'

export function createTimeoutOptions (timeout: number): AbortOptions
export function createTimeoutOptions (timeout: number, ...existingSignals: AbortSignal[]): { signal: ClearableSignal }
export function createTimeoutOptions (timeout: number, ...existingSignals: AbortSignal[]): AbortOptions {
let signal = AbortSignal.timeout(timeout)
setMaxListeners(Infinity, signal)

if (existingSignals.length > 0) {
signal = anySignal([signal, ...existingSignals])
setMaxListeners(Infinity, signal)
}

return {
signal
}
}
65 changes: 65 additions & 0 deletions packages/utils/src/close.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import type { Connection, Stream, AbortOptions } from '@libp2p/interface'

/**
* Close the passed stream, falling back to aborting the stream if closing
* cleanly fails.
*/
export async function safelyCloseStream (stream?: Stream, options?: AbortOptions): Promise<void> {
try {
await stream?.close(options)
} catch (err: any) {
stream?.abort(err)
}
}

/**
* These are speculative protocols that are run automatically on connection open
* so are usually not the reason the connection was opened.
*
* Consequently when requested it should be safe to close connections that only
* have these protocol streams open.
*/
const DEFAULT_CLOSABLE_PROTOCOLS = [
// identify
'/ipfs/id/1.0.0',

// identify-push
'/ipfs/id/push/1.0.0',

// autonat
'/libp2p/autonat/1.0.0',

// dcutr
'/libp2p/dcutr'
]

export interface SafelyCloseConnectionOptions extends AbortOptions {
/**
* Only close the stream if it either has no protocol streams open or only
* ones in this list.
*
* @default ['/ipfs/id/1.0.0']
*/
closableProtocols?: string[]
}

/**
* Close the passed connection if it has no streams, or only closable protocol
* streams, falling back to aborting the connection if closing it cleanly fails.
*/
export async function safelyCloseConnectionIfUnused (connection?: Connection, options?: SafelyCloseConnectionOptions): Promise<void> {
const streamProtocols = connection?.streams?.map(stream => stream.protocol) ?? []
const closableProtocols = options?.closableProtocols ?? DEFAULT_CLOSABLE_PROTOCOLS

// if the connection has protocols not in the closable protocols list, do not
// close the connection
if (streamProtocols.filter(proto => proto != null && !closableProtocols.includes(proto)).length > 0) {
return
}

try {
await connection?.close(options)
} catch (err: any) {
connection?.abort(err)
}
}
Loading
Loading