Skip to content

Commit

Permalink
feat!: use tag values to choose which connections to close (#1276)
Browse files Browse the repository at this point in the history
Uses peer tag values to select low-value connections to prune when we have too many connections open.

BREAKING CHANGE: `connectionManager.peerValue` has been removed, use `peerStore.tagPeer` instead
  • Loading branch information
achingbrain committed Jun 27, 2022
1 parent ceb44f9 commit b1b2b21
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 92 deletions.
105 changes: 78 additions & 27 deletions doc/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
* [`peerStore.delete`](#peerstoredelete)
* [`peerStore.get`](#peerstoreget)
* [`peerStore.peers`](#peerstorepeers)
* [`peerStore.tagPeer`](#peerstoretagpeer)
* [`peerStore.unTagPeer`](#peerstoreuntagpeer)
* [`peerStore.getTags`](#peerstoregettags)
* [`pubsub.getSubscribers`](#pubsubgetsubscribers)
* [`pubsub.getTopics`](#pubsubgettopics)
* [`pubsub.publish`](#pubsubpublish)
Expand All @@ -56,7 +59,6 @@
* [`pubsub.topicValidators.set`](#pubsubtopicvalidatorsset)
* [`pubsub.topicValidators.delete`](#pubsubtopicvalidatorsdelete)
* [`connectionManager.get`](#connectionmanagerget)
* [`connectionManager.setPeerValue`](#connectionmanagersetpeervalue)
* [`connectionManager.size`](#connectionmanagersize)
* [`keychain.createKey`](#keychaincreatekey)
* [`keychain.renameKey`](#keychainrenamekey)
Expand Down Expand Up @@ -1399,6 +1401,81 @@ for (let [peerIdString, peer] of peerStore.peers.entries()) {
}
```

### peerStore.tagPeer

Tags a peer with the specified tag and optional value/expiry time

`peerStore.tagPeer(peerId, tag, options)`

#### Parameters

| Name | Type | Description |
|------|------|-------------|
| peerId | `PeerId` | The peer to tag |
| tag | `string` | The name of the tag to add |
| options | `{ value?: number, ttl?: number }` | An optional value (1-100) and an optional ttl after which the tag will expire (ms) |

#### Returns

| Type | Description |
|------|-------------|
| `Promise<void>` | Promise resolves once the tag is stored |

#### Example

```js
await peerStore.tagPeer(peerId, 'my-tag', { value: 100, ttl: Date.now() + 60000 })
```

### peerStore.unTagPeer

Remove the tag from the specified peer

`peerStore.unTagPeer(peerId, tag)`

#### Parameters

| Name | Type | Description |
|------|------|-------------|
| peerId | `PeerId` | The peer to untag |
| tag | `string` | The name of the tag to remove |

#### Returns

| Type | Description |
|------|-------------|
| `Promise<void>` | Promise resolves once the tag has been removed |

#### Example

```js
await peerStore.unTagPeer(peerId, 'my-tag')
```

### peerStore.getTags

Remove the tag from the specified peer

`peerStore.getTags(peerId)`

#### Parameters

| Name | Type | Description |
|------|------|-------------|
| peerId | `PeerId` | The peer to get the tags for |

#### Returns

| Type | Description |
|------|-------------|
| `Promise<Array<{ name: string, value: number }>>` | The promise resolves to the list of tags for the passed peer |

#### Example

```js
await peerStore.getTags(peerId)
```

### pubsub.getSubscribers

Gets a list of the peer-ids that are subscribed to one topic.
Expand Down Expand Up @@ -1672,32 +1749,6 @@ Get a connection with a given peer, if it exists.
libp2p.connectionManager.get(peerId)
```

### connectionManager.setPeerValue

Enables users to change the value of certain peers in a range of 0 to 1. Peers with the lowest values will have their Connections pruned first, if any Connection Manager limits are exceeded. See [./CONFIGURATION.md#configuring-connection-manager](./CONFIGURATION.md#configuring-connection-manager) for details on how to configure these limits.

`libp2p.connectionManager.setPeerValue(peerId, value)`

#### Parameters

| Name | Type | Description |
|------|------|-------------|
| peerId | [`PeerId`][peer-id] | The peer to set the value for |
| value | `number` | The value of the peer from 0 to 1 |

#### Returns

| Type | Description |
|------|-------------|
| `void` | |

#### Example

```js
libp2p.connectionManager.setPeerValue(highPriorityPeerId, 1)
libp2p.connectionManager.setPeerValue(lowPriorityPeerId, 0)
```

### connectionManager.size

Getter for obtaining the current number of open connections.
Expand Down
1 change: 0 additions & 1 deletion doc/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,6 @@ const node = await createLibp2p({
maxConnections: Infinity,
minConnections: 0,
pollInterval: 2000,
defaultPeerValue: 1,
// The below values will only be taken into account when Metrics are enabled
maxData: Infinity,
maxSentData: Infinity,
Expand Down
1 change: 0 additions & 1 deletion doc/CONNECTION_MANAGER.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,3 @@ The following is a list of available options for setting limits for the Connecti
- `maxEventLoopDelay`: sets the maximum event loop delay (measured in milliseconds) this node is willing to endure before it starts disconnecting peers. Defaults to `Infinity`.
- `pollInterval`: sets the poll interval (in milliseconds) for assessing the current state and determining if this peer needs to force a disconnect. Defaults to `2000` (2 seconds).
- `movingAverageInterval`: the interval used to calculate moving averages (in milliseconds). Defaults to `60000` (1 minute). This must be an available interval configured in `Metrics`
- `defaultPeerValue`: number between 0 and 1. Defaults to 1.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
"generate:proto:fetch": "protons ./src/fetch/pb/proto.proto",
"generate:proto:identify": "protons ./src/identify/pb/message.proto",
"generate:proto:plaintext": "protons ./src/insecure/pb/proto.proto",
"generate:proto:tags": "protons ./src/connection-manager/tags/tags.proto",
"test": "aegir test",
"test:node": "aegir test -t node -f \"./dist/test/**/*.{node,spec}.js\" --cov",
"test:chrome": "aegir test -t browser -f \"./dist/test/**/*.spec.js\" --cov",
Expand Down
92 changes: 42 additions & 50 deletions src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import * as STATUS from '@libp2p/interface-connection/status'
import { Dialer } from './dialer/index.js'
import type { AddressSorter } from '@libp2p/interface-peer-store'
import type { Resolver } from '@multiformats/multiaddr'
import { PeerMap } from '@libp2p/peer-collections'

const log = logger('libp2p:connection-manager')

Expand All @@ -30,13 +31,11 @@ const defaultOptions: Partial<ConnectionManagerInit> = {
maxEventLoopDelay: Infinity,
pollInterval: 2000,
autoDialInterval: 10000,
movingAverageInterval: 60000,
defaultPeerValue: 0.5
movingAverageInterval: 60000
}

const METRICS_COMPONENT = 'connection-manager'
const METRICS_PEER_CONNECTIONS = 'peer-connections'
const METRICS_PEER_VALUES = 'peer-values'

export interface ConnectionManagerInit {
/**
Expand Down Expand Up @@ -79,11 +78,6 @@ export interface ConnectionManagerInit {
*/
movingAverageInterval?: number

/**
* The value of the peer
*/
defaultPeerValue?: number

/**
* If true, try to connect to all discovered peers up to the connection manager limit
*/
Expand Down Expand Up @@ -138,7 +132,6 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
public readonly dialer: Dialer
private components = new Components()
private readonly opts: Required<ConnectionManagerInit>
private readonly peerValues: Map<string, number>
private readonly connections: Map<string, Connection[]>
private started: boolean
private timer?: ReturnType<retimer>
Expand All @@ -155,17 +148,6 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven

log('options: %o', this.opts)

/**
* Map of peer identifiers to their peer value for pruning connections.
*
* @type {Map<string, number>}
*/
this.peerValues = trackedMap({
component: METRICS_COMPONENT,
metric: METRICS_PEER_VALUES,
metrics: this.components.getMetrics()
})

/**
* Map of connections per peer
*/
Expand Down Expand Up @@ -271,18 +253,6 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
this.connections.clear()
}

/**
* Sets the value of the given peer. Peers with lower values
* will be disconnected first.
*/
setPeerValue (peerId: PeerId, value: number) {
if (value < 0 || value > 1) {
throw new Error('value should be a number between 0 and 1')
}

this.peerValues.set(peerId.toString(), value)
}

/**
* Checks the libp2p metrics to determine if any values have exceeded
* the configured maximums.
Expand Down Expand Up @@ -340,10 +310,6 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
await this.components.getPeerStore().keyBook.set(peerId, peerId.publicKey)
}

if (!this.peerValues.has(peerIdStr)) {
this.peerValues.set(peerIdStr, this.opts.defaultPeerValue)
}

const numConnections = this.getConnections().length
const toPrune = numConnections - this.opts.maxConnections

Expand All @@ -370,7 +336,6 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
this.connections.set(peerId, storedConn)
} else if (storedConn != null) {
this.connections.delete(peerId)
this.peerValues.delete(connection.remotePeer.toString())
this.dispatchEvent(new CustomEvent<Connection>('peer:disconnect', { detail: connection }))

this.components.getMetrics()?.onPeerDisconnected(connection.remotePeer)
Expand Down Expand Up @@ -475,7 +440,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
const limit = this.opts[name]
log.trace('checking limit of %s. current value: %d of %d', name, value, limit)
if (value > limit) {
log('%s: limit exceeded: %p, %d, pruning %d connection(s)', this.components.getPeerId(), name, value, toPrune)
log('%s: limit exceeded: %p, %d/%d, pruning %d connection(s)', this.components.getPeerId(), name, value, limit, toPrune)
await this._maybePruneConnections(toPrune)
}
}
Expand All @@ -491,22 +456,49 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
return
}

const peerValues = Array.from(new Map([...this.peerValues.entries()].sort((a, b) => a[1] - b[1])))
log.trace('sorted peer values: %j', peerValues)
const peerValues = new PeerMap<number>()

const toClose = []
// work out peer values
for (const connection of connections) {
const remotePeer = connection.remotePeer

for (const [peerId] of peerValues) {
log('too many connections open - closing a connection to %p', peerId)
if (peerValues.has(remotePeer)) {
continue
}

for (const connection of connections) {
if (connection.remotePeer.toString() === peerId) {
toClose.push(connection)
}
const tags = await this.components.getPeerStore().getTags(remotePeer)

if (toClose.length === toPrune) {
break
}
// sum all tag values
peerValues.set(remotePeer, tags.reduce((acc, curr) => {
return acc + curr.value
}, 0))
}

// sort by value, lowest to highest
const sortedConnections = connections.sort((a, b) => {
const peerAValue = peerValues.get(a.remotePeer) ?? 0
const peerBValue = peerValues.get(b.remotePeer) ?? 0

if (peerAValue > peerBValue) {
return 1
}

if (peerAValue < peerBValue) {
return -1
}

return 0
})

// close some connections
const toClose = []

for (const connection of sortedConnections) {
log('too many connections open - closing a connection to %p', connection.remotePeer)
toClose.push(connection)

if (toClose.length === toPrune) {
break
}
}

Expand Down
27 changes: 14 additions & 13 deletions test/connection-manager/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ describe('Connection Manager', () => {
expect(libp2p.components.getMetrics()).to.exist()
})

it('should close lowest value peer connection when the maximum has been reached', async () => {
it('should close connections with low tag values first', async () => {
const max = 5
libp2p = await createNode({
config: createBaseOptions({
Expand All @@ -67,20 +67,21 @@ describe('Connection Manager', () => {

const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
const connectionManagerMaybeDisconnectOneSpy = sinon.spy(connectionManager, '_maybePruneConnections')

// Add 1 too many connections
const spies = new Map<number, sinon.SinonSpy<[], Promise<void>>>()
await Promise.all([...new Array(max + 1)].map(async (_, index) => {

// Add 1 connection too many
for (let i = 0; i < max + 1; i++) {
const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), await createEd25519PeerId()))
const spy = sinon.spy(connection, 'close')
// The connections have the same remote id, give them random ones
// so that we can verify the correct connection was closed
// sinon.stub(connection.remotePeer, 'toString').returns(index)
const value = Math.random()

const value = Math.round(Math.random() * 100)
spies.set(value, spy)
connectionManager.setPeerValue(connection.remotePeer, value)
await libp2p.peerStore.tagPeer(connection.remotePeer, 'test-tag', {
value
})

await connectionManager._onConnect(new CustomEvent('connection', { detail: connection }))
}))
}

// get the lowest value
const lowest = Array.from(spies.keys()).sort((a, b) => {
Expand All @@ -100,7 +101,7 @@ describe('Connection Manager', () => {
expect(lowestSpy).to.have.property('callCount', 1)
})

it('should close connection when the maximum has been reached even without peer values', async () => {
it('should close connection when the maximum has been reached even without tags', async () => {
const max = 5
libp2p = await createNode({
config: createBaseOptions({
Expand All @@ -119,11 +120,11 @@ describe('Connection Manager', () => {

// Add 1 too many connections
const spy = sinon.spy()
await Promise.all([...new Array(max + 1)].map(async () => {
for (let i = 0; i < max + 1; i++) {
const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), await createEd25519PeerId()))
sinon.stub(connection, 'close').callsFake(async () => spy()) // eslint-disable-line
await connectionManager._onConnect(new CustomEvent('connection', { detail: connection }))
}))
}

expect(connectionManagerMaybeDisconnectOneSpy.callCount).to.equal(1)
expect(spy).to.have.property('callCount', 1)
Expand Down

0 comments on commit b1b2b21

Please sign in to comment.