From 5a88c77139cab40a969a0e3151f17f8b5b0830a0 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Mon, 13 Dec 2021 12:04:36 +0000 Subject: [PATCH 1/4] feat: allow per-component metrics to be collected Implements the idea from #1060 - allows us to get some insight into what's happening in a libp2p node out side of just bandwidth stats. --- src/connection-manager/index.js | 13 ++++++++++++ src/dialer/index.js | 18 ++++++++++++++++- src/index.js | 13 +++++++++--- src/metrics/index.js | 26 ++++++++++++++++-------- test/metrics/index.spec.js | 36 +++++++++++++++++++++------------ 5 files changed, 81 insertions(+), 25 deletions(-) diff --git a/src/connection-manager/index.js b/src/connection-manager/index.js index 84711272b9..1b908e1633 100644 --- a/src/connection-manager/index.js +++ b/src/connection-manager/index.js @@ -32,6 +32,10 @@ const defaultOptions = { defaultPeerValue: 1 } +const METRICS_COMPONENT = 'connection-manager' +const METRICS_PEER_CONNECTIONS = 'peer-connections' +const METRICS_ALL_CONNECTIONS = 'all-connections' + /** * @typedef {import('../')} Libp2p * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection @@ -50,6 +54,7 @@ const defaultOptions = { * @property {number} [defaultPeerValue = 1] - The value of the peer. * @property {boolean} [autoDial = true] - Should preemptively guarantee connections are above the low watermark. * @property {number} [autoDialInterval = 10000] - How often, in milliseconds, it should preemptively guarantee connections are above the low watermark. + * @property {import('../metrics')} [metrics] */ /** @@ -95,6 +100,7 @@ class ConnectionManager extends EventEmitter { this._started = false this._timer = null this._checkMetrics = this._checkMetrics.bind(this) + this._metrics = this._options.metrics this._latencyMonitor = new LatencyMonitor({ latencyCheckIntervalMs: this._options.pollInterval, @@ -160,6 +166,8 @@ class ConnectionManager extends EventEmitter { await Promise.all(tasks) this.connections.clear() + this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, 0) + this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, 0) } /** @@ -215,6 +223,8 @@ class ConnectionManager extends EventEmitter { storedConn.push(connection) } else { this.connections.set(peerIdStr, [connection]) + this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, this.connections.size) + this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, this.size) } this._libp2p.peerStore.keyBook.set(peerId, peerId.pubKey) @@ -244,6 +254,9 @@ class ConnectionManager extends EventEmitter { this._peerValues.delete(connection.remotePeer.toB58String()) this.emit('peer:disconnect', connection) } + + this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, this.connections.size) + this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, this.size) } /** diff --git a/src/dialer/index.js b/src/dialer/index.js index b4dd87c26d..e6251be88e 100644 --- a/src/dialer/index.js +++ b/src/dialer/index.js @@ -22,6 +22,10 @@ const { MAX_ADDRS_TO_DIAL } = require('../constants') +const METRICS_COMPONENT = 'dialler' +const METRICS_PENDING_DIALS = 'pending-dials' +const METRICS_PENDING_DIAL_TARGETS = 'pending-dials-targers' + /** * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection * @typedef {import('peer-id')} PeerId @@ -44,6 +48,7 @@ const { * @property {number} [maxDialsPerPeer = MAX_PER_PEER_DIALS] - Number of max concurrent dials per peer. * @property {number} [dialTimeout = DIAL_TIMEOUT] - How long a dial attempt is allowed to take. * @property {Record} [resolvers = {}] - multiaddr resolvers to use when dialing + * @property {import('../metrics')} [metrics] * * @typedef DialTarget * @property {string} id @@ -69,7 +74,8 @@ class Dialer { maxAddrsToDial = MAX_ADDRS_TO_DIAL, dialTimeout = DIAL_TIMEOUT, maxDialsPerPeer = MAX_PER_PEER_DIALS, - resolvers = {} + resolvers = {}, + metrics }) { this.transportManager = transportManager this.peerStore = peerStore @@ -81,6 +87,7 @@ class Dialer { this.tokens = [...new Array(maxParallelDials)].map((_, index) => index) this._pendingDials = new Map() this._pendingDialTargets = new Map() + this._metrics = metrics for (const [key, value] of Object.entries(resolvers)) { Multiaddr.resolvers.set(key, value) @@ -104,6 +111,9 @@ class Dialer { pendingTarget.reject(new AbortError('Dialer was destroyed')) } this._pendingDialTargets.clear() + + this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_PENDING_DIALS, 0) + this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_PENDING_DIAL_TARGETS, 0) } /** @@ -153,6 +163,7 @@ class Dialer { const id = `${(parseInt(String(Math.random() * 1e9), 10)).toString() + Date.now()}` const cancellablePromise = new Promise((resolve, reject) => { this._pendingDialTargets.set(id, { resolve, reject }) + this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_PENDING_DIAL_TARGETS, this._pendingDialTargets.size) }) try { @@ -164,6 +175,7 @@ class Dialer { return dialTarget } finally { this._pendingDialTargets.delete(id) + this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_PENDING_DIAL_TARGETS, this._pendingDialTargets.size) } } @@ -252,9 +264,13 @@ class Dialer { destroy: () => { timeoutController.clear() this._pendingDials.delete(dialTarget.id) + this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_PENDING_DIALS, this._pendingDials.size) } } this._pendingDials.set(dialTarget.id, pendingDial) + + this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_PENDING_DIALS, this._pendingDials.size) + return pendingDial } diff --git a/src/index.js b/src/index.js index 5e890479ea..dfcf80faf4 100644 --- a/src/index.js +++ b/src/index.js @@ -197,10 +197,16 @@ class Libp2p extends EventEmitter { // Create Metrics if (this._options.metrics.enabled) { - this.metrics = new Metrics({ - ...this._options.metrics, - connectionManager: this.connectionManager + const metrics = new Metrics({ + ...this._options.metrics }) + + // listen for peer disconnect events + this.connectionManager.on('peer:disconnect', (connection) => { + metrics.onPeerDisconnected(connection.remotePeer) + }) + + this.metrics = metrics } // Create keychain @@ -262,6 +268,7 @@ class Libp2p extends EventEmitter { this.dialer = new Dialer({ transportManager: this.transportManager, peerStore: this.peerStore, + metrics: this.metrics, ...this._options.dialer }) diff --git a/src/metrics/index.js b/src/metrics/index.js index 8d94861d81..8aa674d742 100644 --- a/src/metrics/index.js +++ b/src/metrics/index.js @@ -24,9 +24,6 @@ const directionToEvent = { */ /** - * @typedef MetricsProperties - * @property {import('../connection-manager')} connectionManager - * * @typedef MetricsOptions * @property {number} [computeThrottleMaxQueueSize = defaultOptions.computeThrottleMaxQueueSize] * @property {number} [computeThrottleTimeout = defaultOptions.computeThrottleTimeout] @@ -37,7 +34,7 @@ const directionToEvent = { class Metrics { /** * @class - * @param {MetricsProperties & MetricsOptions} options + * @param {MetricsOptions} options */ constructor (options) { this._options = mergeOptions(defaultOptions, options) @@ -47,10 +44,7 @@ class Metrics { this._oldPeers = oldPeerLRU(this._options.maxOldPeersRetention) this._running = false this._onMessage = this._onMessage.bind(this) - this._connectionManager = options.connectionManager - this._connectionManager.on('peer:disconnect', (connection) => { - this.onPeerDisconnected(connection.remotePeer) - }) + this._componentMetrics = new Map() } /** @@ -94,6 +88,22 @@ class Metrics { return Array.from(this._peerStats.keys()) } + /** + * @returns {Map} + */ + getComponentMetrics () { + return this._componentMetrics + } + + updateMetric (component, metric, value) { + if (!this._componentMetrics.has(component)) { + this._componentMetrics.set(component, new Map()) + } + + const map = this._componentMetrics.get(component) + map.set(metric, value) + } + /** * Returns the `Stats` object for the given `PeerId` whether it * is a live peer, or in the disconnected peer LRU cache. diff --git a/test/metrics/index.spec.js b/test/metrics/index.spec.js index 5f401e8ac6..1f50d2dfc4 100644 --- a/test/metrics/index.spec.js +++ b/test/metrics/index.spec.js @@ -3,9 +3,6 @@ const { expect } = require('aegir/utils/chai') const sinon = require('sinon') - -const { EventEmitter } = require('events') - const { randomBytes } = require('libp2p-crypto') const duplexPair = require('it-pair/duplex') const pipe = require('it-pipe') @@ -34,8 +31,7 @@ describe('Metrics', () => { const [local, remote] = duplexPair() const metrics = new Metrics({ computeThrottleMaxQueueSize: 1, // compute after every message - movingAverageIntervals: [10, 100, 1000], - connectionManager: new EventEmitter() + movingAverageIntervals: [10, 100, 1000] }) metrics.trackStream({ @@ -70,8 +66,7 @@ describe('Metrics', () => { const [local, remote] = duplexPair() const metrics = new Metrics({ computeThrottleMaxQueueSize: 1, // compute after every message - movingAverageIntervals: [10, 100, 1000], - connectionManager: new EventEmitter() + movingAverageIntervals: [10, 100, 1000] }) metrics.trackStream({ @@ -119,8 +114,7 @@ describe('Metrics', () => { const [local2, remote2] = duplexPair() const metrics = new Metrics({ computeThrottleMaxQueueSize: 1, // compute after every message - movingAverageIntervals: [10, 100, 1000], - connectionManager: new EventEmitter() + movingAverageIntervals: [10, 100, 1000] }) const protocol = '/echo/1.0.0' metrics.start() @@ -175,8 +169,7 @@ describe('Metrics', () => { const [local, remote] = duplexPair() const metrics = new Metrics({ computeThrottleMaxQueueSize: 1, // compute after every message - movingAverageIntervals: [10, 100, 1000], - connectionManager: new EventEmitter() + movingAverageIntervals: [10, 100, 1000] }) metrics.start() @@ -231,8 +224,7 @@ describe('Metrics', () => { })) const metrics = new Metrics({ - maxOldPeersRetention: 5, // Only keep track of 5 - connectionManager: new EventEmitter() + maxOldPeersRetention: 5 // Only keep track of 5 }) // Clone so trackedPeers isn't modified @@ -262,4 +254,22 @@ describe('Metrics', () => { expect(spy).to.have.property('callCount', 1) } }) + + it('should allow components to track metrics', () => { + const metrics = new Metrics({ + maxOldPeersRetention: 5 // Only keep track of 5 + }) + + expect(metrics.getComponentMetrics()).to.be.empty() + + const component = 'my-component' + const metric = 'some-metric' + const value = 1 + + metrics.updateMetric(component, metric, value) + + expect(metrics.getComponentMetrics()).to.have.lengthOf(1) + expect(metrics.getComponentMetrics().get(component)).to.have.lengthOf(1) + expect(metrics.getComponentMetrics().get(component).get(metric)).to.equal(value) + }) }) From 1bb1cb57d887754024254822824df2854768dd66 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Mon, 13 Dec 2021 12:41:07 +0000 Subject: [PATCH 2/4] chore: try updated action --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index e9ecbb6043..c96240d6f6 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -19,7 +19,7 @@ jobs: - run: npx aegir lint - run: npx aegir build - run: npx aegir dep-check - - uses: ipfs/aegir/actions/bundle-size@v32.1.0 + - uses: ipfs/aegir/actions/bundle-size@fix/use-node-16-for-bundle-size name: size with: github_token: ${{ secrets.GITHUB_TOKEN }} From eb3d21f765232b2fd8764a84d0c2795fa1e0380d Mon Sep 17 00:00:00 2001 From: achingbrain Date: Mon, 13 Dec 2021 12:52:12 +0000 Subject: [PATCH 3/4] chore: change method name --- .github/workflows/main.yml | 2 +- src/connection-manager/index.js | 12 ++++++------ src/dialer/index.js | 12 ++++++------ src/metrics/index.js | 2 +- test/metrics/index.spec.js | 2 +- 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index c96240d6f6..a82a8cd798 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -19,7 +19,7 @@ jobs: - run: npx aegir lint - run: npx aegir build - run: npx aegir dep-check - - uses: ipfs/aegir/actions/bundle-size@fix/use-node-16-for-bundle-size + - uses: ipfs/aegir/actions/bundle-size name: size with: github_token: ${{ secrets.GITHUB_TOKEN }} diff --git a/src/connection-manager/index.js b/src/connection-manager/index.js index 1b908e1633..c1f35b3661 100644 --- a/src/connection-manager/index.js +++ b/src/connection-manager/index.js @@ -166,8 +166,8 @@ class ConnectionManager extends EventEmitter { await Promise.all(tasks) this.connections.clear() - this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, 0) - this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, 0) + this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, 0) + this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, 0) } /** @@ -223,8 +223,8 @@ class ConnectionManager extends EventEmitter { storedConn.push(connection) } else { this.connections.set(peerIdStr, [connection]) - this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, this.connections.size) - this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, this.size) + this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, this.connections.size) + this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, this.size) } this._libp2p.peerStore.keyBook.set(peerId, peerId.pubKey) @@ -255,8 +255,8 @@ class ConnectionManager extends EventEmitter { this.emit('peer:disconnect', connection) } - this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, this.connections.size) - this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, this.size) + this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, this.connections.size) + this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, this.size) } /** diff --git a/src/dialer/index.js b/src/dialer/index.js index e6251be88e..e119d10e5c 100644 --- a/src/dialer/index.js +++ b/src/dialer/index.js @@ -112,8 +112,8 @@ class Dialer { } this._pendingDialTargets.clear() - this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_PENDING_DIALS, 0) - this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_PENDING_DIAL_TARGETS, 0) + this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIALS, 0) + this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIAL_TARGETS, 0) } /** @@ -163,7 +163,7 @@ class Dialer { const id = `${(parseInt(String(Math.random() * 1e9), 10)).toString() + Date.now()}` const cancellablePromise = new Promise((resolve, reject) => { this._pendingDialTargets.set(id, { resolve, reject }) - this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_PENDING_DIAL_TARGETS, this._pendingDialTargets.size) + this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIAL_TARGETS, this._pendingDialTargets.size) }) try { @@ -175,7 +175,7 @@ class Dialer { return dialTarget } finally { this._pendingDialTargets.delete(id) - this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_PENDING_DIAL_TARGETS, this._pendingDialTargets.size) + this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIAL_TARGETS, this._pendingDialTargets.size) } } @@ -264,12 +264,12 @@ class Dialer { destroy: () => { timeoutController.clear() this._pendingDials.delete(dialTarget.id) - this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_PENDING_DIALS, this._pendingDials.size) + this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIALS, this._pendingDials.size) } } this._pendingDials.set(dialTarget.id, pendingDial) - this._metrics && this._metrics.updateMetric(METRICS_COMPONENT, METRICS_PENDING_DIALS, this._pendingDials.size) + this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIALS, this._pendingDials.size) return pendingDial } diff --git a/src/metrics/index.js b/src/metrics/index.js index 8aa674d742..5ea3f96af8 100644 --- a/src/metrics/index.js +++ b/src/metrics/index.js @@ -95,7 +95,7 @@ class Metrics { return this._componentMetrics } - updateMetric (component, metric, value) { + updateComponentMetric (component, metric, value) { if (!this._componentMetrics.has(component)) { this._componentMetrics.set(component, new Map()) } diff --git a/test/metrics/index.spec.js b/test/metrics/index.spec.js index 1f50d2dfc4..ed17c57ef8 100644 --- a/test/metrics/index.spec.js +++ b/test/metrics/index.spec.js @@ -266,7 +266,7 @@ describe('Metrics', () => { const metric = 'some-metric' const value = 1 - metrics.updateMetric(component, metric, value) + metrics.updateComponentMetric(component, metric, value) expect(metrics.getComponentMetrics()).to.have.lengthOf(1) expect(metrics.getComponentMetrics().get(component)).to.have.lengthOf(1) From 490ac6108a13b48de73373ed76bf879d758a1b7b Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 15 Dec 2021 07:08:36 +0000 Subject: [PATCH 4/4] chore: pr comments --- src/connection-manager/index.js | 17 +++++++++-------- src/index.js | 5 ----- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/connection-manager/index.js b/src/connection-manager/index.js index c1f35b3661..8467e26343 100644 --- a/src/connection-manager/index.js +++ b/src/connection-manager/index.js @@ -54,7 +54,6 @@ const METRICS_ALL_CONNECTIONS = 'all-connections' * @property {number} [defaultPeerValue = 1] - The value of the peer. * @property {boolean} [autoDial = true] - Should preemptively guarantee connections are above the low watermark. * @property {number} [autoDialInterval = 10000] - How often, in milliseconds, it should preemptively guarantee connections are above the low watermark. - * @property {import('../metrics')} [metrics] */ /** @@ -100,7 +99,6 @@ class ConnectionManager extends EventEmitter { this._started = false this._timer = null this._checkMetrics = this._checkMetrics.bind(this) - this._metrics = this._options.metrics this._latencyMonitor = new LatencyMonitor({ latencyCheckIntervalMs: this._options.pollInterval, @@ -166,8 +164,8 @@ class ConnectionManager extends EventEmitter { await Promise.all(tasks) this.connections.clear() - this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, 0) - this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, 0) + this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, 0) + this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, 0) } /** @@ -219,12 +217,13 @@ class ConnectionManager extends EventEmitter { const storedConn = this.connections.get(peerIdStr) this.emit('peer:connect', connection) + if (storedConn) { storedConn.push(connection) } else { this.connections.set(peerIdStr, [connection]) - this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, this.connections.size) - this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, this.size) + this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, this.connections.size) + this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, this.size) } this._libp2p.peerStore.keyBook.set(peerId, peerId.pubKey) @@ -253,10 +252,12 @@ class ConnectionManager extends EventEmitter { this.connections.delete(peerId) this._peerValues.delete(connection.remotePeer.toB58String()) this.emit('peer:disconnect', connection) + + this._libp2p.metrics && this._libp2p.metrics.onPeerDisconnected(connection.remotePeer) } - this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, this.connections.size) - this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, this.size) + this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, this.connections.size) + this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, this.size) } /** diff --git a/src/index.js b/src/index.js index dfcf80faf4..b9857e37d2 100644 --- a/src/index.js +++ b/src/index.js @@ -201,11 +201,6 @@ class Libp2p extends EventEmitter { ...this._options.metrics }) - // listen for peer disconnect events - this.connectionManager.on('peer:disconnect', (connection) => { - metrics.onPeerDisconnected(connection.remotePeer) - }) - this.metrics = metrics }