Skip to content

Commit

Permalink
fix: time out DHT network requests separately from query (#2524)
Browse files Browse the repository at this point in the history
Apply a per-request timeout to each network request in a DHT query.

To avoid a "one size fits all" timeout, the timeout is adaptive: it increases or decreases based on the average success/failure times observed during the previous time interval (configurable, default 5s).
  • Loading branch information
achingbrain committed May 7, 2024
1 parent d9366f9 commit bfa7660
Show file tree
Hide file tree
Showing 8 changed files with 395 additions and 11 deletions.
6 changes: 6 additions & 0 deletions packages/kad-dht/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ import { removePrivateAddressesMapper, removePublicAddressesMapper, passthroughM
import type { ProvidersInit } from './providers.js'
import type { Libp2pEvents, ComponentLogger, TypedEventTarget, Metrics, PeerId, PeerInfo, PeerStore, RoutingOptions } from '@libp2p/interface'
import type { AddressManager, ConnectionManager, Registrar } from '@libp2p/interface-internal'
import type { AdaptiveTimeoutInit } from '@libp2p/utils/src/adaptive-timeout.js'
import type { Datastore } from 'interface-datastore'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent } from 'progress-events'
Expand Down Expand Up @@ -405,6 +406,11 @@ export interface KadDHTInit {
* with this filter.
*/
peerInfoMapper?(peer: PeerInfo): PeerInfo

/**
* Dynamic network timeout settings for sending messages to peers
*/
networkDialTimeout?: Omit<AdaptiveTimeoutInit, 'metricsName' | 'metrics'>
}

export interface KadDHTComponents {
Expand Down
47 changes: 38 additions & 9 deletions packages/kad-dht/src/network.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { TypedEventEmitter } from '@libp2p/interface'
import { Libp2pRecord } from '@libp2p/record'
import { AdaptiveTimeout, type AdaptiveTimeoutInit } from '@libp2p/utils/adaptive-timeout'
import { pbStream } from 'it-protobuf-stream'
import { CodeError } from 'protons-runtime'
import { Message } from './message/dht.js'
Expand All @@ -16,6 +17,7 @@ import type { AbortOptions, Logger, Stream, PeerId, PeerInfo, Startable, Routing
/**
 * Options used to configure the DHT `Network` component.
 */
export interface NetworkInit {
  /**
   * The protocol name used when opening streams to remote peers
   */
  protocol: string

  /**
   * Prefix used for the component logger name and, with `:` replaced by `_`,
   * for the name of the message send time metric
   */
  logPrefix: string

  /**
   * Settings for the adaptive per-request timeout.
   *
   * `metricName` and `metrics` are omitted because `Network` always supplies
   * them itself, so any caller-provided values would be overwritten. The
   * omitted key must be spelled `metricName` to match `AdaptiveTimeoutInit`;
   * a key that does not exist on the type makes `Omit` a silent no-op.
   */
  timeout?: Omit<AdaptiveTimeoutInit, 'metricName' | 'metrics'>
}

interface NetworkEvents {
Expand All @@ -30,6 +32,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
private readonly protocol: string
private running: boolean
private readonly components: KadDHTComponents
private readonly timeout: AdaptiveTimeout

/**
* Create a new network
Expand All @@ -42,6 +45,11 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
this.log = components.logger.forComponent(`${init.logPrefix}:network`)
this.running = false
this.protocol = protocol
this.timeout = new AdaptiveTimeout({
...(init.timeout ?? {}),
metrics: components.metrics,
metricName: `${init.logPrefix.replaceAll(':', '_')}_network_message_send_times_milliseconds`
})
}

/**
Expand Down Expand Up @@ -88,13 +96,24 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
yield sendQueryEvent({ to, type }, options)

let stream: Stream | undefined
const signal = this.timeout.getTimeoutSignal(options)

options = {
...options,
signal
}

try {
const connection = await this.components.connectionManager.openConnection(to, options)
const stream = await connection.newStream(this.protocol, options)

stream = await connection.newStream(this.protocol, options)
const response = await this._writeReadMessage(stream, msg, options)

stream.close(options)
.catch(err => {
this.log.error('error closing stream to %p', to, err)
stream?.abort(err)
})

yield peerResponseEvent({
from: to,
messageType: response.type,
Expand All @@ -103,12 +122,11 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
record: response.record == null ? undefined : Libp2pRecord.deserialize(response.record)
}, options)
} catch (err: any) {
stream?.abort(err)
this.log.error('could not send %s to %p', msg.type, to, err)
yield queryErrorEvent({ from: to, error: err }, options)
} finally {
if (stream != null) {
await stream.close()
}
this.timeout.cleanUp(signal)
}
}

Expand All @@ -131,20 +149,31 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
yield sendQueryEvent({ to, type }, options)

let stream: Stream | undefined
const signal = this.timeout.getTimeoutSignal(options)

options = {
...options,
signal
}

try {
const connection = await this.components.connectionManager.openConnection(to, options)
const stream = await connection.newStream(this.protocol, options)
stream = await connection.newStream(this.protocol, options)

await this._writeMessage(stream, msg, options)

stream.close(options)
.catch(err => {
this.log.error('error closing stream to %p', to, err)
stream?.abort(err)
})

yield peerResponseEvent({ from: to, messageType: type }, options)
} catch (err: any) {
stream?.abort(err)
yield queryErrorEvent({ from: to, error: err }, options)
} finally {
if (stream != null) {
await stream.close()
}
this.timeout.cleanUp(signal)
}
}

Expand Down
8 changes: 6 additions & 2 deletions packages/kad-dht/test/libp2p-routing.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@ const PROTOCOL = '/test/dht/1.0.0'

function createStreams (peerId: PeerId, components: StubbedKadDHTComponents): { connection: Connection, incomingStream: Stream } {
const duplex = duplexPair<any>()
const outgoingStream = stubInterface<Stream>()
const outgoingStream = stubInterface<Stream>({
close: async () => {}
})
outgoingStream.source = duplex[0].source
outgoingStream.sink.callsFake(async source => duplex[0].sink(source))

const incomingStream = stubInterface<Stream>()
const incomingStream = stubInterface<Stream>({
close: async () => {}
})
incomingStream.source = duplex[1].source
incomingStream.sink.callsFake(async source => duplex[1].sink(source))

Expand Down
8 changes: 8 additions & 0 deletions packages/utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@
"types": "./dist/src/address-sort.d.ts",
"import": "./dist/src/address-sort.js"
},
"./adaptive-timeout": {
"types": "./dist/src/adaptive-timeout.d.ts",
"import": "./dist/src/adaptive-timeout.js"
},
"./array-equals": {
"types": "./dist/src/array-equals.d.ts",
"import": "./dist/src/array-equals.js"
Expand All @@ -80,6 +84,10 @@
"types": "./dist/src/is-promise.d.ts",
"import": "./dist/src/is-promise.js"
},
"./moving-average": {
"types": "./dist/src/moving-average.d.ts",
"import": "./dist/src/moving-average.js"
},
"./multiaddr/is-loopback": {
"types": "./dist/src/multiaddr/is-loopback.d.ts",
"import": "./dist/src/multiaddr/is-loopback.js"
Expand Down
94 changes: 94 additions & 0 deletions packages/utils/src/adaptive-timeout.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { setMaxListeners } from '@libp2p/interface'
import { anySignal, type ClearableSignal } from 'any-signal'
import { MovingAverage } from './moving-average.js'
import type { MetricGroup, Metrics } from '@libp2p/interface'

/**
 * Default factor applied to the moving average of recent request durations
 * to obtain the timeout for the next request
 */
export const DEFAULT_TIMEOUT_MULTIPLIER = 1.2

/**
 * Default factor applied to the duration of an aborted request before it is
 * folded into the moving average used for the next timeout
 */
export const DEFAULT_FAILURE_MULTIPLIER = 2

/**
 * Default lower bound for the computed timeout, in milliseconds
 */
export const DEFAULT_MIN_TIMEOUT = 2_000

/**
 * The signal returned by `AdaptiveTimeout.getTimeoutSignal`. It is the
 * combined `any-signal` signal with extra bookkeeping fields attached so that
 * `AdaptiveTimeout.cleanUp` can work out how long the operation took.
 */
export interface AdaptiveTimeoutSignal extends ClearableSignal {
/**
 * `Date.now()` captured when the signal was created; `cleanUp` subtracts it
 * from the current time to obtain the elapsed duration
 */
start: number
/**
 * The timeout value that was passed to `AbortSignal.timeout` for this signal
 */
timeout: number
}

/**
 * Constructor options for `AdaptiveTimeout`. Every field is optional.
 */
export interface AdaptiveTimeoutInit {
/**
 * Name of the metric group to register; no metric group is registered when
 * this is not set
 */
metricName?: string
/**
 * Metrics implementation used to register the metric group named by
 * `metricName`
 */
metrics?: Metrics
/**
 * Timespan passed to each of the internal moving averages, defaults to 5000
 */
interval?: number
/**
 * NOTE(review): this option is not read by the `AdaptiveTimeout` constructor
 * visible in this file - confirm whether it is meant to seed the moving
 * average or should be removed
 */
initialValue?: number
/**
 * Factor the moving average is multiplied by to obtain the next timeout,
 * defaults to `DEFAULT_TIMEOUT_MULTIPLIER`
 */
timeoutMultiplier?: number
/**
 * Factor the elapsed time of an aborted operation is multiplied by before it
 * is added to the moving average, defaults to `DEFAULT_FAILURE_MULTIPLIER`
 */
failureMultiplier?: number
/**
 * Lower bound for the computed timeout, defaults to `DEFAULT_MIN_TIMEOUT`
 */
minTimeout?: number
}

/**
 * Per-call options accepted by `AdaptiveTimeout.getTimeoutSignal`.
 */
export interface GetTimeoutSignalOptions {
/**
 * When set, used instead of the instance-wide `timeoutMultiplier` for this
 * call only
 */
timeoutFactor?: number
/**
 * An optional signal that is combined with the timeout signal, so the
 * returned signal aborts when either of them aborts
 */
signal?: AbortSignal
}

/**
 * Produces abort signals whose timeout adapts to how long recent operations
 * took.
 *
 * Three moving averages are maintained: one of successful durations, one of
 * aborted durations (both only reported as metrics) and `next`, which receives
 * successful durations as-is and aborted durations scaled by
 * `failureMultiplier`. The timeout handed out by `getTimeoutSignal` is derived
 * from `next`.
 *
 * Typical use is to call `getTimeoutSignal` before an operation and `cleanUp`
 * with the same signal once the operation has finished or failed.
 */
export class AdaptiveTimeout {
  private readonly success: MovingAverage
  private readonly failure: MovingAverage
  private readonly next: MovingAverage
  private readonly metric?: MetricGroup
  private readonly timeoutMultiplier: number
  private readonly failureMultiplier: number
  private readonly minTimeout: number

  /**
   * @param init - optional settings, see `AdaptiveTimeoutInit`
   */
  constructor (init: AdaptiveTimeoutInit = {}) {
    // all three averages share the same timespan
    const interval = init.interval ?? 5000

    this.success = new MovingAverage(interval)
    this.failure = new MovingAverage(interval)
    this.next = new MovingAverage(interval)

    this.timeoutMultiplier = init.timeoutMultiplier ?? DEFAULT_TIMEOUT_MULTIPLIER
    this.failureMultiplier = init.failureMultiplier ?? DEFAULT_FAILURE_MULTIPLIER
    this.minTimeout = init.minTimeout ?? DEFAULT_MIN_TIMEOUT

    // NOTE(review): init.initialValue is declared on the init type but is not
    // used here - confirm whether it should seed the `next` average

    // metrics are only reported when a name for the group was supplied
    const { metricName, metrics } = init

    if (metricName != null) {
      this.metric = metrics?.registerMetricGroup(metricName)
    }
  }

  /**
   * Create a signal that aborts when either the adaptive timeout elapses or
   * the optional `options.signal` aborts.
   *
   * @param options - optional per-call multiplier override and parent signal
   * @returns the combined signal, tagged with its creation time and timeout
   */
  getTimeoutSignal (options: GetTimeoutSignalOptions = {}): AdaptiveTimeoutSignal {
    // scale the current average (successes plus penalised failures) and
    // never go below the configured minimum
    const multiplier = options.timeoutFactor ?? this.timeoutMultiplier
    const scaled = Math.round(this.next.movingAverage * multiplier)
    const timeout = Math.max(scaled, this.minTimeout)

    const timer = AbortSignal.timeout(timeout)
    const combined = anySignal([options.signal, timer]) as AdaptiveTimeoutSignal
    setMaxListeners(Infinity, combined, timer)

    // bookkeeping read back by cleanUp
    combined.start = Date.now()
    combined.timeout = timeout

    return combined
  }

  /**
   * Record how long the operation guarded by `signal` took and update the
   * moving averages and metrics accordingly. An aborted signal is counted as
   * a failure, anything else as a success.
   *
   * NOTE(review): this does not call `signal.clear()`, so the listener that
   * any-signal registered on the parent signal appears to remain attached
   * until that signal is collected - confirm whether callers are expected to
   * clear the signal themselves.
   *
   * @param signal - a signal previously returned by `getTimeoutSignal`
   */
  cleanUp (signal: AdaptiveTimeoutSignal): void {
    const elapsed = Date.now() - signal.start

    if (!signal.aborted) {
      this.success.push(elapsed)
      this.next.push(elapsed)
      this.metric?.update({
        successMovingAverage: this.success.movingAverage,
        successDeviation: this.success.deviation,
        successForecast: this.success.forecast,
        successVariance: this.success.variance,
        success: elapsed
      })

      return
    }

    // penalise failures so the next timeout grows
    this.failure.push(elapsed)
    this.next.push(elapsed * this.failureMultiplier)
    this.metric?.update({
      failureMovingAverage: this.failure.movingAverage,
      failureDeviation: this.failure.deviation,
      failureForecast: this.failure.forecast,
      failureVariance: this.failure.variance,
      failure: elapsed
    })
  }
}
45 changes: 45 additions & 0 deletions packages/utils/src/moving-average.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Implements exponential moving average. Ported from `moving-average`.
*
* @see https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
* @see https://www.npmjs.com/package/moving-average
*/
export class MovingAverage {
  /** Current exponentially weighted average of the pushed values */
  public movingAverage = 0
  /** Exponentially weighted variance of the pushed values */
  public variance = 0
  /** Square root of `variance` */
  public deviation = 0
  /** The moving average extrapolated by one more weighted step */
  public forecast = 0
  private readonly timespan: number
  private previousTime?: number

  /**
   * @param timespan - divisor for the time elapsed between samples; the larger
   * it is, the less weight a new sample receives
   */
  constructor (timespan: number) {
    this.timespan = timespan
  }

  /**
   * Weight given to a sample taken at time `t` when the previous sample was
   * taken at time `pt`: `1 - e^(-(t - pt) / timespan)`.
   */
  alpha (t: number, pt: number): number {
    const elapsed = t - pt

    return 1 - Math.exp(-elapsed / this.timespan)
  }

  /**
   * Add a sample.
   *
   * The very first sample becomes the moving average directly and leaves
   * variance, deviation and forecast untouched; later samples are blended in
   * according to `alpha`.
   *
   * @param value - the sample value
   * @param time - when the sample was taken, defaults to `Date.now()`
   */
  push (value: number, time: number = Date.now()): void {
    const last = this.previousTime

    if (last == null) {
      // nothing to blend with yet
      this.movingAverage = value
      this.previousTime = time

      return
    }

    const weight = this.alpha(time, last)
    // distance of the new sample from the average *before* it is updated
    const delta = value - this.movingAverage
    const step = weight * delta

    this.movingAverage = weight * value + (1 - weight) * this.movingAverage
    this.variance = (1 - weight) * (this.variance + delta * step)
    this.deviation = Math.sqrt(this.variance)
    this.forecast = this.movingAverage + weight * delta
    this.previousTime = time
  }
}

0 comments on commit bfa7660

Please sign in to comment.