Skip to content

Commit

Permalink
fix: use labels to differentiate interfaces for metrics (libp2p#230)
Browse files Browse the repository at this point in the history
Instead of inserting the interface address into the metric name,
use the interface address as a label prefix for the value being reported.

This allows our metric names to be stable even if you don't
know the ip/port combo that will be used ahead of time.

The tradeoff is the label names may change between restarts if
the port number changes, but we have to apply a disambiguator somewhere.

Depends on:

- [ ] libp2p/js-libp2p-prometheus-metrics#6
  • Loading branch information
achingbrain committed Nov 22, 2022
1 parent 6568f81 commit 6c4c316
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 36 deletions.
11 changes: 3 additions & 8 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ export interface TCPComponents {

export interface TCPMetrics {
dialerEvents: CounterGroup
listenerEvents: CounterGroup
}

class TCP implements Transport {
Expand All @@ -76,13 +75,9 @@ class TCP implements Transport {

if (components.metrics != null) {
this.metrics = {
dialerEvents: components.metrics.registerCounterGroup('libp2p_tcp_dialer_errors_total', {
dialerEvents: components.metrics.registerCounterGroup('libp2p_tcp_dialer_events_total', {
label: 'event',
help: 'Total count of TCP dialer errors by error type'
}),
listenerEvents: components.metrics.registerCounterGroup('libp2p_tcp_listener_errors_total', {
label: 'event',
help: 'Total count of TCP listener errors by error type'
help: 'Total count of TCP dialer events by type'
})
}
}
Expand Down Expand Up @@ -115,7 +110,7 @@ class TCP implements Transport {
})
log('new outbound connection %s', maConn.remoteAddr)
const conn = await options.upgrader.upgradeOutbound(maConn)
log('outbound connection %s upgraded', maConn.remoteAddr)
log('outbound connection upgraded %s', maConn.remoteAddr)
return conn
}

Expand Down
60 changes: 35 additions & 25 deletions src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type { MultiaddrConnection, Connection } from '@libp2p/interface-connecti
import type { Upgrader, Listener, ListenerEvents } from '@libp2p/interface-transport'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { TCPCreateListenerOptions } from './index.js'
import type { CounterGroup, Metric, Metrics } from '@libp2p/interface-metrics'
import type { CounterGroup, MetricGroup, Metrics } from '@libp2p/interface-metrics'

const log = logger('libp2p:tcp:listener')

Expand Down Expand Up @@ -39,7 +39,7 @@ const SERVER_STATUS_UP = 1
const SERVER_STATUS_DOWN = 0

export interface TCPListenerMetrics {
status: Metric
status: MetricGroup
errors: CounterGroup
events: CounterGroup
}
Expand All @@ -52,12 +52,14 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
private readonly connections = new Set<MultiaddrConnection>()
private status: Status = { started: false }
private metrics?: TCPListenerMetrics
private addr: string

constructor (private readonly context: Context) {
super()

context.keepAlive = context.keepAlive ?? true

this.addr = 'unknown'
this.server = net.createServer(context, this.onSocket.bind(this))

// https://nodejs.org/api/net.html#servermaxconnections
Expand All @@ -72,49 +74,56 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
if (context.metrics != null) {
// we are listening, register metrics for our port
const address = this.server.address()
let addr: string

if (address == null) {
addr = 'unknown'
this.addr = 'unknown'
} else if (typeof address === 'string') {
// unix socket
addr = address
this.addr = address
} else {
addr = `${address.address}:${address.port}`
this.addr = `${address.address}:${address.port}`
}

context.metrics?.registerMetric(`libp2p_tcp_connections_${addr}_total`, {
context.metrics?.registerMetricGroup('libp2p_tcp_inbound_connections_total', {
label: 'address',
help: 'Current active connections in TCP listener',
calculate: () => {
return this.connections.size
return {
[this.addr]: this.connections.size
}
}
})

this.metrics = {
status: context.metrics.registerMetric(`libp2p_tcp_${addr}_server_status_info`, {
help: 'Current status of the TCP server'
status: context.metrics.registerMetricGroup('libp2p_tcp_listener_status_info', {
label: 'address',
help: 'Current status of the TCP listener socket'
}),
errors: context.metrics.registerCounterGroup(`libp2p_tcp_${addr}_server_errors_total`, {
label: 'error',
help: 'Total count of TCP listener errors by error type'
errors: context.metrics.registerMetricGroup('libp2p_tcp_listener_errors_total', {
label: 'address',
help: 'Total count of TCP listener errors by type'
}),
events: context.metrics.registerCounterGroup(`libp2p_tcp_${addr}_socket_events_total`, {
label: 'event',
help: 'Total count of TCP socket events by event'
events: context.metrics.registerMetricGroup('libp2p_tcp_listener_events_total', {
label: 'address',
help: 'Total count of TCP listener events by type'
})
}

this.metrics?.status.update(SERVER_STATUS_UP)
this.metrics?.status.update({
[this.addr]: SERVER_STATUS_UP
})
}

this.dispatchEvent(new CustomEvent('listening'))
})
.on('error', err => {
this.metrics?.errors.increment({ listen_error: true })
this.metrics?.errors.increment({ [`${this.addr} listen_error`]: true })
this.dispatchEvent(new CustomEvent<Error>('error', { detail: err }))
})
.on('close', () => {
this.metrics?.status.update(SERVER_STATUS_DOWN)
this.metrics?.status.update({
[this.addr]: SERVER_STATUS_DOWN
})
this.dispatchEvent(new CustomEvent('close'))
})
}
Expand All @@ -123,7 +132,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
// Avoid uncaught errors caused by unstable connections
socket.on('error', err => {
log('socket error', err)
this.metrics?.events.increment({ error: true })
this.metrics?.events.increment({ [`${this.addr} error`]: true })
})

let maConn: MultiaddrConnection
Expand All @@ -132,19 +141,20 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
listeningAddr: this.status.started ? this.status.listeningAddr : undefined,
socketInactivityTimeout: this.context.socketInactivityTimeout,
socketCloseTimeout: this.context.socketCloseTimeout,
metrics: this.metrics?.events
metrics: this.metrics?.events,
metricPrefix: `${this.addr} `
})
} catch (err) {
log.error('inbound connection failed', err)
this.metrics?.errors.increment({ inbound_to_connection: true })
this.metrics?.errors.increment({ [`${this.addr} inbound_to_connection`]: true })
return
}

log('new inbound connection %s', maConn.remoteAddr)
try {
this.context.upgrader.upgradeInbound(maConn)
.then((conn) => {
log('inbound connection %s upgraded', maConn.remoteAddr)
log('inbound connection upgraded %s', maConn.remoteAddr)
this.connections.add(maConn)

socket.once('close', () => {
Expand All @@ -159,7 +169,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
})
.catch(async err => {
log.error('inbound connection failed', err)
this.metrics?.errors.increment({ inbound_upgrade: true })
this.metrics?.errors.increment({ [`${this.addr} inbound_upgrade`]: true })

await attemptClose(maConn)
})
Expand All @@ -172,7 +182,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
attemptClose(maConn)
.catch(err => {
log.error('closing inbound connection failed', err)
this.metrics?.errors.increment({ inbound_closing_failed: true })
this.metrics?.errors.increment({ [`${this.addr} inbound_closing_failed`]: true })
})
}
}
Expand Down
8 changes: 5 additions & 3 deletions src/socket-to-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ interface ToConnectionOptions {
socketInactivityTimeout?: number
socketCloseTimeout?: number
metrics?: CounterGroup
metricPrefix?: string
}

/**
Expand All @@ -29,6 +30,7 @@ interface ToConnectionOptions {
*/
export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptions) => {
const metrics = options.metrics
const metricPrefix = options.metricPrefix ?? ''
const inactivityTimeout = options.socketInactivityTimeout ?? SOCKET_TIMEOUT
const closeTimeout = options.socketCloseTimeout ?? CLOSE_TIMEOUT

Expand Down Expand Up @@ -63,7 +65,7 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#socketsettimeouttimeout-callback
socket.setTimeout(inactivityTimeout, () => {
log('%s socket read timeout', lOptsStr)
metrics?.increment({ timeout: true })
metrics?.increment({ [`${metricPrefix}timeout`]: true })

// only destroy with an error if the remote has not sent the FIN message
let err: Error | undefined
Expand All @@ -78,7 +80,7 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio

socket.once('close', () => {
log('%s socket read timeout', lOptsStr)
metrics?.increment({ close: true })
metrics?.increment({ [`${metricPrefix}close`]: true })

// In instances where `close` was not explicitly called,
// such as an iterable stream ending, ensure we have set the close
Expand All @@ -92,7 +94,7 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio
// the remote sent a FIN packet which means no more data will be sent
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-end
log('socket ended', maConn.remoteAddr.toString())
metrics?.increment({ end: true })
metrics?.increment({ [`${metricPrefix}end`]: true })
})

const maConn: MultiaddrConnection = {
Expand Down

0 comments on commit 6c4c316

Please sign in to comment.