Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(libp2p): add autodial retry threshold config option #1943

Merged
merged 7 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions packages/libp2p/src/connection-manager/auto-dial.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { logger } from '@libp2p/logger'
import { PeerMap, PeerSet } from '@libp2p/peer-collections'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { PeerJobQueue } from '../utils/peer-job-queue.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_INTERVAL, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, MIN_CONNECTIONS } from './constants.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_INTERVAL, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PEER_RETRY_THRESHOLD, AUTO_DIAL_PRIORITY, LAST_DIAL_FAILURE_KEY, MIN_CONNECTIONS } from './constants.js'
import type { Libp2pEvents } from '@libp2p/interface'
import type { EventEmitter } from '@libp2p/interface/events'
import type { PeerStore } from '@libp2p/interface/peer-store'
Expand All @@ -16,6 +17,7 @@ interface AutoDialInit {
autoDialConcurrency?: number
autoDialPriority?: number
autoDialInterval?: number
autoDialPeerRetryThreshold?: number
}

interface AutoDialComponents {
Expand All @@ -29,7 +31,8 @@ const defaultOptions = {
maxQueueLength: AUTO_DIAL_MAX_QUEUE_LENGTH,
autoDialConcurrency: AUTO_DIAL_CONCURRENCY,
autoDialPriority: AUTO_DIAL_PRIORITY,
autoDialInterval: AUTO_DIAL_INTERVAL
autoDialInterval: AUTO_DIAL_INTERVAL,
autoDialPeerRetryThreshold: AUTO_DIAL_PEER_RETRY_THRESHOLD
}

export class AutoDial implements Startable {
Expand All @@ -40,6 +43,7 @@ export class AutoDial implements Startable {
private readonly autoDialPriority: number
private readonly autoDialIntervalMs: number
private readonly autoDialMaxQueueLength: number
private readonly autoDialPeerRetryThresholdMs: number
private autoDialInterval?: ReturnType<typeof setInterval>
private started: boolean
private running: boolean
Expand All @@ -56,6 +60,7 @@ export class AutoDial implements Startable {
this.autoDialPriority = init.autoDialPriority ?? defaultOptions.autoDialPriority
this.autoDialIntervalMs = init.autoDialInterval ?? defaultOptions.autoDialInterval
this.autoDialMaxQueueLength = init.maxQueueLength ?? defaultOptions.maxQueueLength
this.autoDialPeerRetryThresholdMs = init.autoDialPeerRetryThreshold ?? defaultOptions.autoDialPeerRetryThreshold
this.started = false
this.running = false
this.queue = new PeerJobQueue({
Expand Down Expand Up @@ -207,9 +212,26 @@ export class AutoDial implements Startable {
return 0
})

log('selected %d/%d peers to dial', sortedPeers.length, peers.length)
const peersThatHaveNotFailed = sortedPeers.filter(peer => {
const lastDialFailure = peer.metadata.get(LAST_DIAL_FAILURE_KEY)

for (const peer of sortedPeers) {
if (lastDialFailure == null) {
return true
}

const lastDialFailureTimestamp = parseInt(uint8ArrayToString(lastDialFailure))

if (isNaN(lastDialFailureTimestamp)) {
return true
}

// only dial if the time since the last failure is above the retry threshold
return Date.now() - lastDialFailureTimestamp > this.autoDialPeerRetryThresholdMs
})

log('selected %d/%d peers to dial', peersThatHaveNotFailed.length, peers.length)

for (const peer of peersThatHaveNotFailed) {
this.queue.add(async () => {
const numConnections = this.connectionManager.getConnectionsMap().size

Expand Down
15 changes: 15 additions & 0 deletions packages/libp2p/src/connection-manager/constants.defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ export const AUTO_DIAL_PRIORITY = 0
*/
export const AUTO_DIAL_MAX_QUEUE_LENGTH = 100

/**
* @see https://libp2p.github.io/js-libp2p/interfaces/libp2p.index.unknown.ConnectionManagerInit.html#autoDialPeerRetryThreshold
*/
export const AUTO_DIAL_PEER_RETRY_THRESHOLD = 1000 * 60

/**
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#inboundConnectionThreshold
*/
Expand All @@ -42,3 +47,13 @@ export const INBOUND_CONNECTION_THRESHOLD = 5
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxIncomingPendingConnections
*/
export const MAX_INCOMING_PENDING_CONNECTIONS = 10

/**
* Store as part of the peer store metadata for a given peer, the value for this
* key is a timestamp of the last time a dial attempted failed with the relevant
* peer stored as a string.
*
* Used to insure we do not endlessly try to auto dial peers we have recently
* failed to dial.
*/
export const LAST_DIAL_FAILURE_KEY = 'last-dial-failure'
19 changes: 17 additions & 2 deletions packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers'
import { type ClearableSignal, anySignal } from 'any-signal'
import pDefer from 'p-defer'
import PQueue from 'p-queue'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { codes } from '../errors.js'
import { getPeerAddress } from '../get-peer.js'
import {
DIAL_TIMEOUT,
MAX_PARALLEL_DIALS_PER_PEER,
MAX_PARALLEL_DIALS,
MAX_PEER_ADDRS_TO_DIAL
MAX_PEER_ADDRS_TO_DIAL,
LAST_DIAL_FAILURE_KEY
} from './constants.js'
import { combineSignals, resolveMultiaddrs } from './utils.js'
import type { AddressSorter, AbortOptions, PendingDial } from '@libp2p/interface'
Expand Down Expand Up @@ -230,9 +232,22 @@ export class DialQueue {
// clean up abort signals/controllers
signal.clear()
})
.catch(err => {
.catch(async err => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if i'm not mistaken, this dial-queue is also used for direct dials, should we discern between an auto-dial triggered dial vs a manually/user requested dial?

log.error('dial failed to %s', pendingDial.multiaddrs.map(ma => ma.toString()).join(', '), err)

if (peerId != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (peerId != null) {
if (peerId != null && priority < someVal) {

to address manual dials? or do we want a specific dial option to flag whether we will update the LAST_DIAL_FAILURE_KEY for a peer that we only use with auto-dialer?

// record the last failed dial
try {
await this.peerStore.patch(peerId, {
maschad marked this conversation as resolved.
Show resolved Hide resolved
metadata: {
[LAST_DIAL_FAILURE_KEY]: uint8ArrayFromString(Date.now().toString())
}
})
} catch (err: any) {
log.error('could not update last dial failure key for %p', peerId, err)
}
}

// Error is a timeout
if (signal.aborted) {
const error = new CodeError(err.message, codes.ERR_TIMEOUT)
Expand Down
6 changes: 6 additions & 0 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ export interface ConnectionManagerInit {
*/
autoDialMaxQueueLength?: number

/**
* When we've failed to dial a peer, do not autodial them again within this
* number of ms. (default: 1 minute)
*/
autoDialPeerRetryThreshold?: number

/**
* Sort the known addresses of a peer before trying to dial, By default public
* addresses will be dialled before private (e.g. loopback or LAN) addresses.
Expand Down
67 changes: 67 additions & 0 deletions packages/libp2p/test/connection-manager/auto-dial.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import delay from 'delay'
import pWaitFor from 'p-wait-for'
import Sinon from 'sinon'
import { stubInterface } from 'sinon-ts'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { AutoDial } from '../../src/connection-manager/auto-dial.js'
import { LAST_DIAL_FAILURE_KEY } from '../../src/connection-manager/constants.js'
import { matchPeerId } from '../fixtures/match-peer-id.js'
import type { Libp2pEvents } from '@libp2p/interface'
import type { Connection } from '@libp2p/interface/connection'
Expand Down Expand Up @@ -224,4 +226,69 @@ describe('auto-dial', () => {
// should only have queried peer store once
expect(peerStoreAllSpy.callCount).to.equal(1)
})

it('should not re-dial peers we have recently failed to dial', async () => {
const peerWithAddress: Peer = {
id: await createEd25519PeerId(),
protocols: [],
addresses: [{
multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4001'),
isCertified: true
}],
metadata: new Map(),
tags: new Map()
}
const undialablePeer: Peer = {
id: await createEd25519PeerId(),
protocols: [],
addresses: [{
multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4002'),
isCertified: true
}],
// we failed to dial them recently
metadata: new Map([[LAST_DIAL_FAILURE_KEY, uint8ArrayFromString(`${Date.now() - 10}`)]]),
tags: new Map()
}

await peerStore.save(peerWithAddress.id, peerWithAddress)
await peerStore.save(undialablePeer.id, undialablePeer)

const connectionManager = stubInterface<ConnectionManager>({
getConnectionsMap: new PeerMap(),
getDialQueue: []
})

autoDialler = new AutoDial({
peerStore,
connectionManager,
events
}, {
minConnections: 10,
autoDialPeerRetryThreshold: 2000
})
autoDialler.start()

void autoDialler.autoDial()

await pWaitFor(() => {
return connectionManager.openConnection.callCount === 1
})

expect(connectionManager.openConnection.callCount).to.equal(1)
expect(connectionManager.openConnection.calledWith(matchPeerId(peerWithAddress.id))).to.be.true()
expect(connectionManager.openConnection.calledWith(matchPeerId(undialablePeer.id))).to.be.false()

// pass the retry threshold
await delay(2000)

// autodial again
void autoDialler.autoDial()

await pWaitFor(() => {
return connectionManager.openConnection.callCount === 3
})

// should have retried the unreachable peer
expect(connectionManager.openConnection.calledWith(matchPeerId(undialablePeer.id))).to.be.true()
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add another test to ensure that the undialable peer is eventually dialled once the threshold has expired.

})
13 changes: 13 additions & 0 deletions packages/libp2p/test/connection-manager/direct.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { pEvent } from 'p-event'
import sinon from 'sinon'
import { stubInterface } from 'sinon-ts'
import { defaultComponents, type Components } from '../../src/components.js'
import { LAST_DIAL_FAILURE_KEY } from '../../src/connection-manager/constants.js'
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { codes as ErrorCodes } from '../../src/errors.js'
import { type IdentifyService, identifyService } from '../../src/identify/index.js'
Expand Down Expand Up @@ -104,6 +105,18 @@ describe('dialing (direct, WebSockets)', () => {
.and.to.have.nested.property('.code', ErrorCodes.ERR_NO_VALID_ADDRESSES)
})

it('should mark a peer as having recently failed to connect', async () => {
connectionManager = new DefaultConnectionManager(localComponents)
await connectionManager.start()

await expect(connectionManager.openConnection(multiaddr(`/ip4/127.0.0.1/tcp/12984/ws/p2p/${remoteComponents.peerId.toString()}`)))
.to.eventually.be.rejected()

const peer = await localComponents.peerStore.get(remoteComponents.peerId)

expect(peer.metadata.has(LAST_DIAL_FAILURE_KEY)).to.be.true()
})

it('should be able to connect to a given peer', async () => {
connectionManager = new DefaultConnectionManager(localComponents)
await connectionManager.start()
Expand Down
Loading