Skip to content

Commit

Permalink
fix: MaxListenersExceeded warning (#1297)
Browse files Browse the repository at this point in the history
Where we create signals that are passed down the stack, increase the max listeners to prevent warnings in the console.
  • Loading branch information
achingbrain committed Jul 17, 2022
1 parent ba56c64 commit 627b8bf
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 1 deletion.
6 changes: 6 additions & 0 deletions src/circuit/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import type { Connection } from '@libp2p/interface-connection'
import type { RelayConfig } from '../index.js'
import { abortableDuplex } from 'abortable-iterator'
import { TimeoutController } from 'timeout-abort-controller'
import { setMaxListeners } from 'events'

const log = logger('libp2p:circuit')

Expand Down Expand Up @@ -64,6 +65,11 @@ export class Circuit implements Transport, Initializable {
const { connection, stream } = data
const controller = new TimeoutController(this._init.hop.timeout)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, controller.signal)
} catch {}

try {
const source = abortableDuplex(stream, controller.signal)
const streamHandler = new StreamHandler({
Expand Down
6 changes: 6 additions & 0 deletions src/connection-manager/dialer/auto-dialer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { PeerInfo } from '@libp2p/interface-peer-info'
import { logger } from '@libp2p/logger'
import type { Components } from '@libp2p/components'
import { TimeoutController } from 'timeout-abort-controller'
import { setMaxListeners } from 'events'

const log = logger('libp2p:dialer:auto-dialer')

Expand Down Expand Up @@ -44,6 +45,11 @@ export class AutoDialer {

const controller = new TimeoutController(this.dialTimeout)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, controller.signal)
} catch {}

void this.components.getConnectionManager().openConnection(peer.id, {
signal: controller.signal
})
Expand Down
10 changes: 10 additions & 0 deletions src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
this.connectOnStartupController?.clear()
this.connectOnStartupController = new TimeoutController(this.startupReconnectTimeout)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, this.connectOnStartupController.signal)
} catch {}

await Promise.all(
keepAlivePeers.map(async peer => {
await this.openConnection(peer, {
Expand Down Expand Up @@ -510,6 +515,11 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
if (options?.signal == null) {
timeoutController = new TimeoutController(this.dialTimeout)
options.signal = timeoutController.signal

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, timeoutController.signal)
} catch {}
}

try {
Expand Down
6 changes: 6 additions & 0 deletions src/fetch/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { abortableDuplex } from 'abortable-iterator'
import { pipe } from 'it-pipe'
import first from 'it-first'
import { TimeoutController } from 'timeout-abort-controller'
import { setMaxListeners } from 'events'

const log = logger('libp2p:fetch')

Expand Down Expand Up @@ -99,6 +100,11 @@ export class FetchService implements Startable {
if (signal == null) {
timeoutController = new TimeoutController(this.init.timeout)
signal = timeoutController.signal

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, timeoutController.signal)
} catch {}
}

try {
Expand Down
23 changes: 22 additions & 1 deletion src/identify/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { TimeoutController } from 'timeout-abort-controller'
import type { AbortOptions } from '@libp2p/interfaces'
import { abortableDuplex } from 'abortable-iterator'
import type { Duplex } from 'it-stream-types'
import { setMaxListeners } from 'events'

const log = logger('libp2p:identify')

Expand Down Expand Up @@ -164,8 +165,13 @@ export class IdentifyService implements Startable {
const protocols = await this.components.getPeerStore().protoBook.get(this.components.getPeerId())

const pushes = connections.map(async connection => {
const timeoutController = new TimeoutController(this.init.timeout)
let stream: Stream | undefined
const timeoutController = new TimeoutController(this.init.timeout)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, timeoutController.signal)
} catch {}

try {
stream = await connection.newStream([this.identifyPushProtocolStr], {
Expand Down Expand Up @@ -234,6 +240,11 @@ export class IdentifyService implements Startable {
if (signal == null) {
timeoutController = new TimeoutController(this.init.timeout)
signal = timeoutController.signal

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, timeoutController.signal)
} catch {}
}

try {
Expand Down Expand Up @@ -374,6 +385,11 @@ export class IdentifyService implements Startable {
const { connection, stream } = data
const timeoutController = new TimeoutController(this.init.timeout)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, timeoutController.signal)
} catch {}

try {
const publicKey = this.components.getPeerId().publicKey ?? new Uint8Array(0)
const peerData = await this.components.getPeerStore().get(this.components.getPeerId())
Expand Down Expand Up @@ -425,6 +441,11 @@ export class IdentifyService implements Startable {
const { connection, stream } = data
const timeoutController = new TimeoutController(this.init.timeout)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, timeoutController.signal)
} catch {}

let message: Identify | undefined
try {
// make stream abortable
Expand Down
6 changes: 6 additions & 0 deletions src/ping/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import type { AbortOptions } from '@libp2p/interfaces'
import { abortableDuplex } from 'abortable-iterator'
import { TimeoutController } from 'timeout-abort-controller'
import type { Stream } from '@libp2p/interface-connection'
import { setMaxListeners } from 'events'

const log = logger('libp2p:ping')

Expand Down Expand Up @@ -90,6 +91,11 @@ export class PingService implements Startable {
if (signal == null) {
timeoutController = new TimeoutController(this.init.timeout)
signal = timeoutController.signal

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, timeoutController.signal)
} catch {}
}

try {
Expand Down
11 changes: 11 additions & 0 deletions src/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import type { Registrar } from '@libp2p/interface-registrar'
import { DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS } from './registrar.js'
import { TimeoutController } from 'timeout-abort-controller'
import { abortableDuplex } from 'abortable-iterator'
import { setMaxListeners } from 'events'

const log = logger('libp2p:upgrader')

Expand Down Expand Up @@ -133,6 +134,11 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg

const timeoutController = new TimeoutController(this.inboundUpgradeTimeout)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, timeoutController.signal)
} catch {}

try {
const abortableStream = abortableDuplex(maConn, timeoutController.signal)
maConn.source = abortableStream.source
Expand Down Expand Up @@ -407,6 +413,11 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg

controller = new TimeoutController(30000)
options.signal = controller.signal

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, controller.signal)
} catch {}
}

let { stream, protocol } = await mss.select(protocols, options)
Expand Down

0 comments on commit 627b8bf

Please sign in to comment.