Skip to content

Commit

Permalink
refactor: remove use of timeout-abort-controller (#1708)
Browse files Browse the repository at this point in the history
  • Loading branch information
maschad committed May 1, 2023
1 parent 53b1645 commit 2a9617b
Show file tree
Hide file tree
Showing 13 changed files with 61 additions and 93 deletions.
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@
"protons-runtime": "^5.0.0",
"rate-limiter-flexible": "^2.3.11",
"set-delayed-interval": "^1.0.0",
"timeout-abort-controller": "^3.0.0",
"uint8arraylist": "^2.3.2",
"uint8arrays": "^4.0.2",
"wherearewe": "^2.0.0",
Expand Down
27 changes: 13 additions & 14 deletions src/autonat/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import map from 'it-map'
import parallel from 'it-parallel'
import { pipe } from 'it-pipe'
import isPrivateIp from 'private-ip'
import { TimeoutController } from 'timeout-abort-controller'
import {
PROTOCOL
} from './constants.js'
import { Message } from './pb/index.js'
import { anySignal } from 'any-signal'

const log = logger('libp2p:autonat')

Expand Down Expand Up @@ -132,20 +132,20 @@ export class AutonatService implements Startable {
* Handle an incoming autonat request
*/
async handleIncomingAutonatStream (data: IncomingStreamData): Promise<void> {
const controller = new TimeoutController(this._init.timeout)
const signal = anySignal([AbortSignal.timeout(this._init.timeout)])

// this controller may be used while dialing lots of peers so prevent MaxListenersExceededWarning
// appearing in the console
try {
// fails on node < 15.4
setMaxListeners?.(Infinity, controller.signal)
setMaxListeners?.(Infinity, signal)
} catch {}

const ourHosts = this.components.addressManager.getAddresses()
.map(ma => ma.toOptions().host)

try {
const source = abortableDuplex(data.stream, controller.signal)
const source = abortableDuplex(data.stream, signal)
const self = this

await pipe(
Expand Down Expand Up @@ -319,7 +319,7 @@ export class AutonatService implements Startable {

try {
connection = await self.components.connectionManager.openConnection(multiaddr, {
signal: controller.signal
signal
})

if (!connection.remoteAddr.equals(multiaddr)) {
Expand Down Expand Up @@ -362,8 +362,8 @@ export class AutonatService implements Startable {
// can't tell the remote when a dial timed out..
data.stream
)
} finally {
controller.clear()
} catch (err) {
log.error(err)
}
}

Expand Down Expand Up @@ -401,13 +401,13 @@ export class AutonatService implements Startable {
return
}

const controller = new TimeoutController(this._init.timeout)
const signal = AbortSignal.timeout(this._init.timeout)

// this controller may be used while dialing lots of peers so prevent MaxListenersExceededWarning
// appearing in the console
try {
// fails on node < 15.4
setMaxListeners?.(Infinity, controller.signal)
setMaxListeners?.(Infinity, signal)
} catch {}

const self = this
Expand Down Expand Up @@ -436,13 +436,13 @@ export class AutonatService implements Startable {
log('Asking %p to verify multiaddr', peer.id)

const connection = await self.components.connectionManager.openConnection(peer.id, {
signal: controller.signal
signal
})

const stream = await connection.newStream(PROTOCOL, {
signal: controller.signal
signal
})
const source = abortableDuplex(stream, controller.signal)
const source = abortableDuplex(stream, signal)

const buf = await pipe(
[request],
Expand Down Expand Up @@ -493,7 +493,7 @@ export class AutonatService implements Startable {
}

for await (const dialResponse of parallel(map(this.components.peerRouting.getClosestPeers(randomCid, {
signal: controller.signal
signal
}), (peer) => async () => await verifyAddress(peer)), {
concurrency: REQUIRED_SUCCESSFUL_DIALS
})) {
Expand Down Expand Up @@ -557,7 +557,6 @@ export class AutonatService implements Startable {
}
}
} finally {
controller.clear()
this.verifyAddressTimeout = setTimeout(this._verifyExternalAddresses, this.refreshInterval)
}
}
Expand Down
26 changes: 10 additions & 16 deletions src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { logger } from '@libp2p/logger'
import { AbortError, CodeError } from '@libp2p/interfaces/errors'
import { Multiaddr, Resolver, resolvers } from '@multiformats/multiaddr'
import { TimeoutController } from 'timeout-abort-controller'
import { publicAddressesFirst } from '@libp2p/utils/address-sort'
import { codes } from '../errors.js'
import {
Expand All @@ -22,7 +21,7 @@ import PQueue from 'p-queue'
import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers'
import { combineSignals, resolveMultiaddrs } from './utils.js'
import pDefer from 'p-defer'
import type { ClearableSignal } from 'any-signal'
import { ClearableSignal, anySignal } from 'any-signal'
import type { AddressSorter } from '@libp2p/interface-libp2p'

const log = logger('libp2p:connection-manager:dial-queue')
Expand Down Expand Up @@ -176,7 +175,7 @@ export class DialQueue {

// create abort conditions - need to do this before `calculateMultiaddrs` as we may be about to
// resolve a dns addr which can time out
const { timeoutController, signal } = this.createDialAbortControllers(options.signal)
const signal = this.createDialAbortControllers(options.signal)
let addrsToDial: Address[]

try {
Expand All @@ -187,7 +186,6 @@ export class DialQueue {
})
} catch (err) {
signal.clear()
timeoutController.clear()
throw err
}

Expand All @@ -210,7 +208,6 @@ export class DialQueue {
if (existingDial != null) {
log('joining existing dial target for %p', peerId)
signal.clear()
timeoutController.clear()
return await existingDial.promise
}

Expand All @@ -233,13 +230,12 @@ export class DialQueue {

// clean up abort signals/controllers
signal.clear()
timeoutController.clear()
})
.catch(err => {
log.error('dial failed to %s', addrsToDial.map(({ multiaddr }) => multiaddr.toString()).join(', '), err)

// Error is a timeout
if (timeoutController.signal.aborted) {
if (signal.aborted) {
const error = new CodeError(err.message, codes.ERR_TIMEOUT)
throw error
}
Expand All @@ -253,18 +249,16 @@ export class DialQueue {
return await pendingDial.promise
}

private createDialAbortControllers (userSignal?: AbortSignal): { timeoutController: TimeoutController, signal: ClearableSignal } {
// ensure we throw if the dial takes longer than the dial timeout
const timeoutController = new TimeoutController(this.dialTimeout)

private createDialAbortControllers (userSignal?: AbortSignal): ClearableSignal {
// let any signal abort the dial
const signal = combineSignals(
timeoutController.signal,
this.shutDownController.signal,
userSignal
const signal = anySignal(
[AbortSignal.timeout(this.dialTimeout),
this.shutDownController.signal,
userSignal
]
)

return { timeoutController, signal }
return signal
}

private async calculateMultiaddrs (peerId?: PeerId, addrs: Address[] = [], options: DialOptions = {}): Promise<Address[]> {
Expand Down
11 changes: 2 additions & 9 deletions src/fetch/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import type { AbortOptions } from '@libp2p/interfaces'
import { abortableDuplex } from 'abortable-iterator'
import { pipe } from 'it-pipe'
import first from 'it-first'
import { TimeoutController } from 'timeout-abort-controller'
import { setMaxListeners } from 'events'
import { fromString as uint8arrayFromString } from 'uint8arrays/from-string'
import { toString as uint8arrayToString } from 'uint8arrays/to-string'
Expand Down Expand Up @@ -139,19 +138,17 @@ class DefaultFetchService implements Startable, FetchService {
log('dialing %s to %p', this.protocol, peer)

const connection = await this.components.connectionManager.openConnection(peer, options)
let timeoutController
let signal = options.signal
let stream: Stream | undefined

// create a timeout if no abort signal passed
if (signal == null) {
log('using default timeout of %d ms', this.init.timeout)
timeoutController = new TimeoutController(this.init.timeout ?? DEFAULT_TIMEOUT)
signal = timeoutController.signal
signal = AbortSignal.timeout(this.init.timeout ?? DEFAULT_TIMEOUT)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, timeoutController.signal)
setMaxListeners?.(Infinity, signal)
} catch {}
}

Expand Down Expand Up @@ -203,10 +200,6 @@ class DefaultFetchService implements Startable, FetchService {

return result ?? null
} finally {
if (timeoutController != null) {
timeoutController.clear()
}

if (stream != null) {
stream.close()
}
Expand Down
31 changes: 12 additions & 19 deletions src/identify/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import type { IncomingStreamData, Registrar } from '@libp2p/interface-registrar'
import type { Connection, Stream } from '@libp2p/interface-connection'
import type { Startable } from '@libp2p/interfaces/startable'
import { peerIdFromKeys } from '@libp2p/peer-id'
import { TimeoutController } from 'timeout-abort-controller'
import type { AbortOptions } from '@libp2p/interfaces'
import { abortableDuplex } from 'abortable-iterator'
import { setMaxListeners } from 'events'
import type { ConnectionManager } from '@libp2p/interface-connection-manager'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { Peer, PeerStore } from '@libp2p/interface-peer-store'
import type { AddressManager } from '@libp2p/interface-address-manager'
import { anySignal } from 'any-signal'
import type { EventEmitter } from '@libp2p/interfaces/events'
import type { Libp2pEvents } from '@libp2p/interface-libp2p'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
Expand Down Expand Up @@ -233,20 +233,21 @@ class DefaultIdentifyService implements Startable, IdentifyService {

const pushes = connections.map(async connection => {
let stream: Stream | undefined
const timeoutController = new TimeoutController(this.timeout)

const signal = AbortSignal.timeout(this.timeout)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, timeoutController.signal)
setMaxListeners?.(Infinity, signal)
} catch {}

try {
stream = await connection.newStream([this.identifyPushProtocolStr], {
signal: timeoutController.signal
signal
})

// make stream abortable
const source = abortableDuplex(stream, timeoutController.signal)
const source = abortableDuplex(stream, signal)

await source.sink(pipe(
[Identify.encode({
Expand All @@ -265,8 +266,6 @@ class DefaultIdentifyService implements Startable, IdentifyService {
if (stream != null) {
stream.close()
}

timeoutController.clear()
}
})

Expand Down Expand Up @@ -306,18 +305,16 @@ class DefaultIdentifyService implements Startable, IdentifyService {
}

async _identify (connection: Connection, options: AbortOptions = {}): Promise<Identify> {
let timeoutController
let signal = options.signal
let stream: Stream | undefined

// create a timeout if no abort signal passed
if (signal == null) {
timeoutController = new TimeoutController(this.timeout)
signal = timeoutController.signal
signal = anySignal([AbortSignal.timeout(this.timeout), options.signal])

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, timeoutController.signal)
setMaxListeners?.(Infinity, signal)
} catch {}
}

Expand Down Expand Up @@ -348,10 +345,6 @@ class DefaultIdentifyService implements Startable, IdentifyService {
throw new CodeError(String(err), codes.ERR_INVALID_MESSAGE)
}
} finally {
if (timeoutController != null) {
timeoutController.clear()
}

if (stream != null) {
stream.close()
}
Expand Down Expand Up @@ -401,11 +394,12 @@ class DefaultIdentifyService implements Startable, IdentifyService {
*/
async _handleIdentify (data: IncomingStreamData): Promise<void> {
const { connection, stream } = data
const timeoutController = new TimeoutController(this.timeout)

const signal = AbortSignal.timeout(this.timeout)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, timeoutController.signal)
setMaxListeners?.(Infinity, signal)
} catch {}

try {
Expand Down Expand Up @@ -435,15 +429,14 @@ class DefaultIdentifyService implements Startable, IdentifyService {
})

// make stream abortable
const source = abortableDuplex(stream, timeoutController.signal)
const source = abortableDuplex(stream, signal)

const msgWithLenPrefix = pipe([message], (source) => lp.encode(source))
await source.sink(msgWithLenPrefix)
} catch (err: any) {
log.error('could not respond to identify request', err)
} finally {
stream.close()
timeoutController.clear()
}
}

Expand Down
Loading

0 comments on commit 2a9617b

Please sign in to comment.