Skip to content

Commit

Permalink
fix: add timeout for incoming connections and build-in protocols (#1292)
Browse files Browse the repository at this point in the history
Ensure that we don't wait forever for upgrading an inbound connection
to occur.

Note that transports should return an AbortableSource when passed an
AbortSignal so outbound connections to not need the same fix.

Also adds default timeouts for the ping, fetch, and identify protocols.
  • Loading branch information
achingbrain authored Jul 14, 2022
1 parent b1b9139 commit 750ed9c
Show file tree
Hide file tree
Showing 20 changed files with 265 additions and 142 deletions.
4 changes: 2 additions & 2 deletions examples/webrtc-direct/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ export async function test () {
selector => {
const text = document.querySelector(selector).innerText
return text.includes('libp2p id is') &&
text.includes('Found peer') &&
text.includes('Connected to')
text.includes('Found peer 12D3KooWCuo3MdXfMgaqpLC5Houi1TRoFqgK9aoxok4NK5udMu8m') &&
text.includes('Connected to 12D3KooWCuo3MdXfMgaqpLC5Houi1TRoFqgK9aoxok4NK5udMu8m')
},
'#output',
{ timeout: 10000 }
Expand Down
10 changes: 7 additions & 3 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const DefaultConfig: Partial<Libp2pInit> = {
maxParallelDials: Constants.MAX_PARALLEL_DIALS,
maxDialsPerPeer: Constants.MAX_PER_PEER_DIALS,
dialTimeout: Constants.DIAL_TIMEOUT,
inboundUpgradeTimeout: Constants.INBOUND_UPGRADE_TIMEOUT,
resolvers: {
dnsaddr: dnsaddrResolver
},
Expand Down Expand Up @@ -79,7 +80,8 @@ const DefaultConfig: Partial<Libp2pInit> = {
host: {
agentVersion: AGENT_VERSION
},
timeout: 30000,
// https://github.com/libp2p/go-libp2p/blob/8d2e54e1637041d5cf4fac1e531287560bd1f4ac/p2p/protocol/identify/id.go#L48
timeout: 60000,
maxInboundStreams: 1,
maxOutboundStreams: 1,
maxPushIncomingStreams: 1,
Expand All @@ -88,12 +90,14 @@ const DefaultConfig: Partial<Libp2pInit> = {
ping: {
protocolPrefix: 'ipfs',
maxInboundStreams: 1,
maxOutboundStreams: 1
maxOutboundStreams: 1,
timeout: 10000
},
fetch: {
protocolPrefix: 'libp2p',
maxInboundStreams: 1,
maxOutboundStreams: 1
maxOutboundStreams: 1,
timeout: 10000
}
}

Expand Down
20 changes: 10 additions & 10 deletions src/connection-manager/dialer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ export class Dialer implements Startable, Initializable {

log('creating dial target for %p', id)

const dialTarget = await this._createCancellableDialTarget(id)
const dialTarget = await this._createCancellableDialTarget(id, options)

if (dialTarget.addrs.length === 0) {
throw errCode(new Error('The dial request has no valid addresses'), codes.ERR_NO_VALID_ADDRESSES)
Expand Down Expand Up @@ -207,7 +207,7 @@ export class Dialer implements Startable, Initializable {
* The dial to the first address that is successfully able to upgrade a connection
* will be used.
*/
async _createCancellableDialTarget (peer: PeerId): Promise<DialTarget> {
async _createCancellableDialTarget (peer: PeerId, options: AbortOptions): Promise<DialTarget> {
// Make dial target promise cancellable
const id = `${(parseInt(String(Math.random() * 1e9), 10)).toString()}${Date.now()}`
const cancellablePromise = new Promise<DialTarget>((resolve, reject) => {
Expand All @@ -216,7 +216,7 @@ export class Dialer implements Startable, Initializable {

try {
const dialTarget = await Promise.race([
this._createDialTarget(peer),
this._createDialTarget(peer, options),
cancellablePromise
])

Expand All @@ -232,7 +232,7 @@ export class Dialer implements Startable, Initializable {
* If a multiaddr is received it should be the first address attempted.
* Multiaddrs not supported by the available transports will be filtered out.
*/
async _createDialTarget (peer: PeerId): Promise<DialTarget> {
async _createDialTarget (peer: PeerId, options: AbortOptions): Promise<DialTarget> {
const knownAddrs = await pipe(
await this.components.getPeerStore().addressBook.get(peer),
(source) => filter(source, async (address) => {
Expand All @@ -253,7 +253,7 @@ export class Dialer implements Startable, Initializable {

const addrs: Multiaddr[] = []
for (const a of knownAddrs) {
const resolvedAddrs = await this._resolve(a)
const resolvedAddrs = await this._resolve(a, options)
resolvedAddrs.forEach(ra => addrs.push(ra))
}

Expand Down Expand Up @@ -341,7 +341,7 @@ export class Dialer implements Startable, Initializable {
/**
* Resolve multiaddr recursively
*/
async _resolve (ma: Multiaddr): Promise<Multiaddr[]> {
async _resolve (ma: Multiaddr, options: AbortOptions): Promise<Multiaddr[]> {
// TODO: recursive logic should live in multiaddr once dns4/dns6 support is in place
// Now only supporting resolve for dnsaddr
const resolvableProto = ma.protoNames().includes('dnsaddr')
Expand All @@ -351,9 +351,9 @@ export class Dialer implements Startable, Initializable {
return [ma]
}

const resolvedMultiaddrs = await this._resolveRecord(ma)
const resolvedMultiaddrs = await this._resolveRecord(ma, options)
const recursiveMultiaddrs = await Promise.all(resolvedMultiaddrs.map(async (nm) => {
return await this._resolve(nm)
return await this._resolve(nm, options)
}))

const addrs = recursiveMultiaddrs.flat()
Expand All @@ -368,10 +368,10 @@ export class Dialer implements Startable, Initializable {
/**
* Resolve a given multiaddr. If this fails, an empty array will be returned
*/
async _resolveRecord (ma: Multiaddr): Promise<Multiaddr[]> {
async _resolveRecord (ma: Multiaddr, options: AbortOptions): Promise<Multiaddr[]> {
try {
ma = new Multiaddr(ma.toString()) // Use current multiaddr module
const multiaddrs = await ma.resolve()
const multiaddrs = await ma.resolve(options)
return multiaddrs
} catch (err) {
log.error(`multiaddr ${ma.toString()} could not be resolved`, err)
Expand Down
60 changes: 41 additions & 19 deletions src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,17 @@ export interface ConnectionManagerInit {
maxAddrsToDial?: number

/**
* How long a dial attempt is allowed to take
* How long a dial attempt is allowed to take, including DNS resolution
* of the multiaddr, opening a socket and upgrading it to a Connection.
*/
dialTimeout?: number

/**
* When a new inbound connection is opened, the upgrade process (e.g. protect,
* encrypt, multiplex etc) must complete within this number of ms.
*/
inboundUpgradeTimeout: number

/**
* Number of max concurrent dials per peer
*/
Expand Down Expand Up @@ -146,6 +153,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
private readonly latencyMonitor: LatencyMonitor
private readonly startupReconnectTimeout: number
private connectOnStartupController?: TimeoutController
private readonly dialTimeout: number

constructor (init: ConnectionManagerInit) {
super()
Expand Down Expand Up @@ -182,6 +190,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
this.onDisconnect = this.onDisconnect.bind(this)

this.startupReconnectTimeout = init.startupReconnectTimeout ?? STARTUP_RECONNECT_TIMEOUT
this.dialTimeout = init.dialTimeout ?? 30000
}

init (components: Components): void {
Expand Down Expand Up @@ -486,7 +495,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
return conns
}

async openConnection (peerId: PeerId, options?: AbortOptions): Promise<Connection> {
async openConnection (peerId: PeerId, options: AbortOptions = {}): Promise<Connection> {
log('dial to %p', peerId)
const existingConnections = this.getConnections(peerId)

Expand All @@ -496,30 +505,43 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
return existingConnections[0]
}

const connection = await this.dialer.dial(peerId, options)
let peerConnections = this.connections.get(peerId.toString())
let timeoutController: TimeoutController | undefined

if (peerConnections == null) {
peerConnections = []
this.connections.set(peerId.toString(), peerConnections)
if (options?.signal == null) {
timeoutController = new TimeoutController(this.dialTimeout)
options.signal = timeoutController.signal
}

// we get notified of connections via the Upgrader emitting "connection"
// events, double check we aren't already tracking this connection before
// storing it
let trackedConnection = false
try {
const connection = await this.dialer.dial(peerId, options)
let peerConnections = this.connections.get(peerId.toString())

for (const conn of peerConnections) {
if (conn.id === connection.id) {
trackedConnection = true
if (peerConnections == null) {
peerConnections = []
this.connections.set(peerId.toString(), peerConnections)
}
}

if (!trackedConnection) {
peerConnections.push(connection)
}
// we get notified of connections via the Upgrader emitting "connection"
// events, double check we aren't already tracking this connection before
// storing it
let trackedConnection = false

return connection
for (const conn of peerConnections) {
if (conn.id === connection.id) {
trackedConnection = true
}
}

if (!trackedConnection) {
peerConnections.push(connection)
}

return connection
} finally {
if (timeoutController != null) {
timeoutController.clear()
}
}
}

async closeConnections (peerId: PeerId): Promise<void> {
Expand Down
5 changes: 5 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
*/
export const DIAL_TIMEOUT = 30e3

/**
* How long in ms an inbound connection upgrade is allowed to take
*/
export const INBOUND_UPGRADE_TIMEOUT = 30e3

/**
* Maximum allowed concurrent dials
*/
Expand Down
29 changes: 23 additions & 6 deletions src/fetch/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,22 @@ import type { Stream } from '@libp2p/interface-connection'
import type { IncomingStreamData } from '@libp2p/interface-registrar'
import type { Components } from '@libp2p/components'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Duplex } from 'it-stream-types'
import { abortableDuplex } from 'abortable-iterator'
import { pipe } from 'it-pipe'
import first from 'it-first'
import { TimeoutController } from 'timeout-abort-controller'

const log = logger('libp2p:fetch')

export interface FetchServiceInit {
protocolPrefix: string
maxInboundStreams: number
maxOutboundStreams: number

/**
* How long we should wait for a remote peer to send any data
*/
timeout: number
}

export interface HandleMessageOptions {
Expand Down Expand Up @@ -86,14 +91,22 @@ export class FetchService implements Startable {
log('dialing %s to %p', this.protocol, peer)

const connection = await this.components.getConnectionManager().openConnection(peer, options)
const stream = await connection.newStream([this.protocol], options)
let source: Duplex<Uint8Array> = stream
let timeoutController
let signal = options.signal

// make stream abortable if AbortSignal passed
if (options.signal != null) {
source = abortableDuplex(stream, options.signal)
// create a timeout if no abort signal passed
if (signal == null) {
timeoutController = new TimeoutController(this.init.timeout)
signal = timeoutController.signal
}

const stream = await connection.newStream([this.protocol], {
signal
})

// make stream abortable
const source = abortableDuplex(stream, signal)

try {
const result = await pipe(
[FetchRequest.encode({ identifier: key })],
Expand Down Expand Up @@ -129,6 +142,10 @@ export class FetchService implements Startable {

return result ?? null
} finally {
if (timeoutController != null) {
timeoutController.clear()
}

stream.close()
}
}
Expand Down
23 changes: 11 additions & 12 deletions src/identify/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ import type { Duplex } from 'it-stream-types'

const log = logger('libp2p:identify')

// https://github.com/libp2p/go-libp2p/blob/8d2e54e1637041d5cf4fac1e531287560bd1f4ac/p2p/protocol/identify/id.go#L48
const IDENTIFY_TIMEOUT = 60000

// https://github.com/libp2p/go-libp2p/blob/8d2e54e1637041d5cf4fac1e531287560bd1f4ac/p2p/protocol/identify/id.go#L52
const MAX_IDENTIFY_MESSAGE_SIZE = 1024 * 8

Expand All @@ -54,7 +51,7 @@ export interface IdentifyServiceInit {
/**
* How long we should wait for a remote peer to send their identify response
*/
timeout?: number
timeout: number

/**
* Identify responses larger than this in bytes will be rejected (default: 8192)
Expand Down Expand Up @@ -167,7 +164,7 @@ export class IdentifyService implements Startable {
const protocols = await this.components.getPeerStore().protoBook.get(this.components.getPeerId())

const pushes = connections.map(async connection => {
const timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT)
const timeoutController = new TimeoutController(this.init.timeout)
let stream: Stream | undefined

try {
Expand Down Expand Up @@ -229,19 +226,21 @@ export class IdentifyService implements Startable {
}

async _identify (connection: Connection, options: AbortOptions = {}): Promise<Identify> {
const stream = await connection.newStream([this.identifyProtocolStr], options)
let source: Duplex<Uint8Array> = stream
let timeoutController
let signal = options.signal

// create a timeout if no abort signal passed
if (signal == null) {
timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT)
timeoutController = new TimeoutController(this.init.timeout)
signal = timeoutController.signal
}

// make stream abortable if AbortSignal passed
source = abortableDuplex(stream, signal)
const stream = await connection.newStream([this.identifyProtocolStr], {
signal
})

// make stream abortable
const source = abortableDuplex(stream, signal)

try {
const data = await pipe(
Expand Down Expand Up @@ -370,7 +369,7 @@ export class IdentifyService implements Startable {
*/
async _handleIdentify (data: IncomingStreamData) {
const { connection, stream } = data
const timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT)
const timeoutController = new TimeoutController(this.init.timeout)

try {
const publicKey = this.components.getPeerId().publicKey ?? new Uint8Array(0)
Expand Down Expand Up @@ -421,7 +420,7 @@ export class IdentifyService implements Startable {
*/
async _handlePush (data: IncomingStreamData) {
const { connection, stream } = data
const timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT)
const timeoutController = new TimeoutController(this.init.timeout)

let message: Identify | undefined
try {
Expand Down
3 changes: 2 additions & 1 deletion src/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
// Set up the Upgrader
this.components.setUpgrader(new DefaultUpgrader(this.components, {
connectionEncryption: (init.connectionEncryption ?? []).map(component => this.configureComponent(component)),
muxers: (init.streamMuxers ?? []).map(component => this.configureComponent(component))
muxers: (init.streamMuxers ?? []).map(component => this.configureComponent(component)),
inboundUpgradeTimeout: init.connectionManager.inboundUpgradeTimeout
}))

// Create the Connection Manager
Expand Down
Loading

0 comments on commit 750ed9c

Please sign in to comment.