Skip to content

Commit

Permalink
fix: use keep-alive tag to reconnect to peers on startup (#1278)
Browse files Browse the repository at this point in the history
Instead of trying to connect to every peer in the peer store when
we start a node, only connect to the peers that have been marked
with a `keep-alive` tag.
  • Loading branch information
achingbrain committed Jun 28, 2022
1 parent b1b2b21 commit 2836acc
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 57 deletions.
1 change: 1 addition & 0 deletions doc/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,7 @@ Dialing in libp2p can be configured to limit the rate of dialing, and how long d
| dialTimeout | `number` | Second dial timeout per peer in ms. |
| resolvers | `object` | Dial [Resolvers](https://github.com/multiformats/js-multiaddr/blob/master/src/resolvers/index.js) for resolving multiaddrs |
| addressSorter | `(Array<Address>) => Array<Address>` | Sort the known addresses of a peer before trying to dial. |
| startupReconnectTimeout | `number` | When a node is restarted, we try to connect to any peers marked with the `keep-alive` tag up until to this timeout in ms is reached (default: 60000) |

The below configuration example shows how the dialer should be configured, with the current defaults:

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
"@libp2p/interface-peer-id": "^1.0.2",
"@libp2p/interface-peer-info": "^1.0.1",
"@libp2p/interface-peer-routing": "^1.0.0",
"@libp2p/interface-peer-store": "^1.0.0",
"@libp2p/interface-peer-store": "^1.2.0",
"@libp2p/interface-pubsub": "^1.0.3",
"@libp2p/interface-registrar": "^2.0.0",
"@libp2p/interface-stream-muxer": "^1.0.1",
Expand Down
47 changes: 47 additions & 0 deletions src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import { Dialer } from './dialer/index.js'
import type { AddressSorter } from '@libp2p/interface-peer-store'
import type { Resolver } from '@multiformats/multiaddr'
import { PeerMap } from '@libp2p/peer-collections'
import { TimeoutController } from 'timeout-abort-controller'
import { KEEP_ALIVE } from '@libp2p/interface-peer-store/tags'

const log = logger('libp2p:connection-manager')

Expand All @@ -36,6 +38,7 @@ const defaultOptions: Partial<ConnectionManagerInit> = {

const METRICS_COMPONENT = 'connection-manager'
const METRICS_PEER_CONNECTIONS = 'peer-connections'
const STARTUP_RECONNECT_TIMEOUT = 60000

export interface ConnectionManagerInit {
/**
Expand Down Expand Up @@ -118,6 +121,12 @@ export interface ConnectionManagerInit {
* Multiaddr resolvers to use when dialing
*/
resolvers?: Record<string, Resolver>

/**
* On startup we try to dial any peer that has previously been
* tagged with KEEP_ALIVE up to this timeout in ms. (default: 60000)
*/
startupReconnectTimeout?: number
}

export interface ConnectionManagerEvents {
Expand All @@ -136,6 +145,8 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
private started: boolean
private timer?: ReturnType<retimer>
private readonly latencyMonitor: LatencyMonitor
private readonly startupReconnectTimeout: number
private connectOnStartupController?: TimeoutController

constructor (init: ConnectionManagerInit) {
super()
Expand Down Expand Up @@ -174,6 +185,8 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven

this.onConnect = this.onConnect.bind(this)
this.onDisconnect = this.onDisconnect.bind(this)

this.startupReconnectTimeout = init.startupReconnectTimeout ?? STARTUP_RECONNECT_TIMEOUT
}

init (components: Components): void {
Expand Down Expand Up @@ -208,9 +221,43 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
async afterStart () {
this.components.getUpgrader().addEventListener('connection', this.onConnect)
this.components.getUpgrader().addEventListener('connectionEnd', this.onDisconnect)

// re-connect to any peers with the KEEP_ALIVE tag
void Promise.resolve()
.then(async () => {
const keepAlivePeers: PeerId[] = []

for (const peer of await this.components.getPeerStore().all()) {
const tags = await this.components.getPeerStore().getTags(peer.id)
const hasKeepAlive = tags.filter(tag => tag.name === KEEP_ALIVE).length > 0

if (hasKeepAlive) {
keepAlivePeers.push(peer.id)
}
}

this.connectOnStartupController?.clear()
this.connectOnStartupController = new TimeoutController(this.startupReconnectTimeout)

await Promise.all(
keepAlivePeers.map(async peer => {
await this.openConnection(peer, {
signal: this.connectOnStartupController?.signal
})
.catch(err => {
log.error(err)
})
})
)
})
.finally(() => {
this.connectOnStartupController?.clear()
})
}

async beforeStop () {
// if we are still dialing KEEP_ALIVE peers, abort those dials
this.connectOnStartupController?.abort()
this.components.getUpgrader().removeEventListener('connection', this.onConnect)
this.components.getUpgrader().removeEventListener('connectionEnd', this.onDisconnect)
}
Expand Down
12 changes: 0 additions & 12 deletions src/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -309,18 +309,6 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
)

log('libp2p has started')

// Once we start, emit any peers we may have already discovered
// TODO: this should be removed, as we already discovered these peers in the past
await this.components.getPeerStore().forEach(peer => {
this.dispatchEvent(new CustomEvent<PeerInfo>('peer:discovery', {
detail: {
id: peer.id,
multiaddrs: peer.addresses.map(addr => addr.multiaddr),
protocols: peer.protocols
}
}))
})
} catch (err: any) {
log.error('An error occurred starting libp2p', err)
await this.stop()
Expand Down
27 changes: 26 additions & 1 deletion test/connection-manager/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type { DefaultConnectionManager } from '../../src/connection-manager/inde
import { mockConnection, mockDuplex, mockMultiaddrConnection } from '@libp2p/interface-mocks'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { CustomEvent } from '@libp2p/interfaces/events'
import { KEEP_ALIVE } from '@libp2p/interface-peer-store/tags'

describe('Connection Manager', () => {
let libp2p: Libp2pNode
Expand Down Expand Up @@ -74,7 +75,7 @@ describe('Connection Manager', () => {
const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), await createEd25519PeerId()))
const spy = sinon.spy(connection, 'close')

const value = Math.round(Math.random() * 100)
const value = i * 10
spies.set(value, spy)
await libp2p.peerStore.tagPeer(connection.remotePeer, 'test-tag', {
value
Expand Down Expand Up @@ -141,4 +142,28 @@ describe('Connection Manager', () => {
started: false
})).to.eventually.rejected('maxConnections must be greater')
})

it('should reconnect to important peers on startup', async () => {
const peerId = await createEd25519PeerId()

libp2p = await createNode({
config: createBaseOptions(),
started: false
})

const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
const connectionManagerOpenConnectionSpy = sinon.spy(connectionManager, 'openConnection')

await libp2p.start()

expect(connectionManagerOpenConnectionSpy.called).to.be.false('Attempted to connect to peers')

await libp2p.peerStore.tagPeer(peerId, KEEP_ALIVE)

await libp2p.stop()
await libp2p.start()

expect(connectionManagerOpenConnectionSpy.called).to.be.true('Did not attempt to connect to important peer')
expect(connectionManagerOpenConnectionSpy.getCall(0).args[0].toString()).to.equal(peerId.toString(), 'Attempted to connect to the wrong peer')
})
})
4 changes: 4 additions & 0 deletions test/fetch/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import { CustomEvent } from '@libp2p/interfaces/events'
import { TimeoutController } from 'timeout-abort-controller'
import delay from 'delay'
import { pipe } from 'it-pipe'
import { PersistentPeerStore } from '@libp2p/peer-store'
import { MemoryDatastore } from 'datastore-core'

const defaultInit: FetchServiceInit = {
protocolPrefix: 'ipfs',
Expand All @@ -27,6 +29,8 @@ async function createComponents (index: number) {
peerId,
registrar: mockRegistrar(),
upgrader: mockUpgrader(),
peerStore: new PersistentPeerStore(),
datastore: new MemoryDatastore(),
connectionManager: new DefaultConnectionManager({
minConnections: 50,
maxConnections: 1000,
Expand Down
45 changes: 2 additions & 43 deletions test/peer-discovery/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,19 @@

import { expect } from 'aegir/chai'
import sinon from 'sinon'
import defer from 'p-defer'
import { Multiaddr } from '@multiformats/multiaddr'
import { createBaseOptions } from '../utils/base-options.browser.js'
import { createPeerId } from '../utils/creators/peer.js'
import { isPeerId, PeerId } from '@libp2p/interface-peer-id'
import type { PeerId } from '@libp2p/interface-peer-id'
import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js'
import { mockConnection, mockDuplex, mockMultiaddrConnection } from '@libp2p/interface-mocks'
import type { Startable } from '@libp2p/interfaces/startable'

describe('peer discovery', () => {
describe('basic functions', () => {
let peerId: PeerId
let remotePeerId: PeerId
let libp2p: Libp2pNode

before(async () => {
[peerId, remotePeerId] = await Promise.all([
createPeerId(),
createPeerId()
])
peerId = await createPeerId()
})

afterEach(async () => {
Expand All @@ -32,40 +25,6 @@ describe('peer discovery', () => {
sinon.reset()
})

it('should dial known peers on startup below the minConnections watermark', async () => {
libp2p = await createLibp2pNode(createBaseOptions({
peerId,
connectionManager: {
minConnections: 2
}
}))

await libp2p.peerStore.addressBook.set(remotePeerId, [new Multiaddr('/ip4/165.1.1.1/tcp/80')])

const deferred = defer()
sinon.stub(libp2p.components.getConnectionManager(), 'openConnection').callsFake(async (id) => {
if (!isPeerId(id)) {
throw new Error('Tried to dial something that was not a peer ID')
}

if (!remotePeerId.equals(id)) {
throw new Error('Tried to dial wrong peer ID')
}

deferred.resolve()
return mockConnection(mockMultiaddrConnection(mockDuplex(), id))
})

const spy = sinon.spy()
libp2p.addEventListener('peer:discovery', spy)

await libp2p.start()
await deferred.promise

expect(spy.calledOnce).to.equal(true)
expect(spy.getCall(0).args[0].detail.id.toString()).to.equal(remotePeerId.toString())
})

it('should stop discovery on libp2p start/stop', async () => {
let started = 0
let stopped = 0
Expand Down
4 changes: 4 additions & 0 deletions test/ping/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import { CustomEvent } from '@libp2p/interfaces/events'
import { TimeoutController } from 'timeout-abort-controller'
import delay from 'delay'
import { pipe } from 'it-pipe'
import { PersistentPeerStore } from '@libp2p/peer-store'
import { MemoryDatastore } from 'datastore-core'

const defaultInit: PingServiceInit = {
protocolPrefix: 'ipfs',
Expand All @@ -27,6 +29,8 @@ async function createComponents (index: number) {
peerId,
registrar: mockRegistrar(),
upgrader: mockUpgrader(),
peerStore: new PersistentPeerStore(),
datastore: new MemoryDatastore(),
connectionManager: new DefaultConnectionManager({
minConnections: 50,
maxConnections: 1000,
Expand Down
3 changes: 3 additions & 0 deletions test/utils/base-options.browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ export function createBaseOptions (overrides?: Libp2pOptions): Libp2pOptions {
hop: {
enabled: false
}
},
nat: {
enabled: false
}
}

Expand Down

0 comments on commit 2836acc

Please sign in to comment.