From ba56c6466232ad4aa5025e2db084c5c9ccd4e5d0 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Fri, 15 Jul 2022 16:36:31 +0000 Subject: [PATCH] fix: add timeout for circuit relay (#1294) Make sure we don't potentially wait forever during incoming circuit relay handshakes. Adds a timeout option to the hop config to control how long we will wait. --- src/circuit/circuit/hop.ts | 19 +++-- src/circuit/circuit/stop.ts | 10 ++- src/circuit/index.ts | 13 +--- src/circuit/transport.ts | 137 +++++++++++++++++++++--------------- src/config.ts | 3 +- src/fetch/index.ts | 17 +++-- src/identify/index.ts | 17 +++-- src/index.ts | 1 + src/libp2p.ts | 2 +- src/ping/index.ts | 18 +++-- test/relay/relay.node.ts | 34 +++++++++ 11 files changed, 170 insertions(+), 101 deletions(-) diff --git a/src/circuit/circuit/hop.ts b/src/circuit/circuit/hop.ts index d8bbe03ed6..a7fd38850e 100644 --- a/src/circuit/circuit/hop.ts +++ b/src/circuit/circuit/hop.ts @@ -12,6 +12,7 @@ import { peerIdFromBytes } from '@libp2p/peer-id' import type { Duplex } from 'it-stream-types' import type { Circuit } from '../transport.js' import type { ConnectionManager } from '@libp2p/interface-connection-manager' +import type { AbortOptions } from '@libp2p/interfaces' const log = logger('libp2p:circuit:hop') @@ -118,7 +119,7 @@ export async function handleHop (hopRequest: HopRequest) { ) } -export interface HopConfig { +export interface HopConfig extends AbortOptions { connection: Connection request: CircuitPB } @@ -130,11 +131,14 @@ export interface HopConfig { export async function hop (options: HopConfig): Promise> { const { connection, - request + request, + signal } = options // Create a new stream to the relay - const stream = await connection.newStream(RELAY_CODEC) + const stream = await connection.newStream(RELAY_CODEC, { + signal + }) // Send the HOP request const streamHandler = new StreamHandler({ stream }) streamHandler.write(request) @@ -156,7 +160,7 @@ export async function hop (options: HopConfig): Promise> { throw errCode(new Error(`HOP request failed with code "${response.code ?? 'unknown'}"`), Errors.ERR_HOP_REQUEST_FAILED) } -export interface CanHopOptions { +export interface CanHopOptions extends AbortOptions { connection: Connection } @@ -165,11 +169,14 @@ export interface CanHopOptions { */ export async function canHop (options: CanHopOptions) { const { - connection + connection, + signal } = options // Create a new stream to the relay - const stream = await connection.newStream(RELAY_CODEC) + const stream = await connection.newStream(RELAY_CODEC, { + signal + }) // Send the HOP request const streamHandler = new StreamHandler({ stream }) diff --git a/src/circuit/circuit/stop.ts b/src/circuit/circuit/stop.ts index 75c97f66f1..1ad8f8e0f1 100644 --- a/src/circuit/circuit/stop.ts +++ b/src/circuit/circuit/stop.ts @@ -5,6 +5,7 @@ import { StreamHandler } from './stream-handler.js' import { validateAddrs } from './utils.js' import type { Connection } from '@libp2p/interface-connection' import type { Duplex } from 'it-stream-types' +import type { AbortOptions } from '@libp2p/interfaces' const log = logger('libp2p:circuit:stop') @@ -42,7 +43,7 @@ export function handleStop (options: HandleStopOptions): Duplex | un return streamHandler.rest() } -export interface StopOptions { +export interface StopOptions extends AbortOptions { connection: Connection request: CircuitPB } @@ -53,10 +54,13 @@ export interface StopOptions { export async function stop (options: StopOptions) { const { connection, - request + request, + signal } = options - const stream = await connection.newStream([RELAY_CODEC]) + const stream = await connection.newStream(RELAY_CODEC, { + signal + }) log('starting stop request to %p', connection.remotePeer) const streamHandler = new StreamHandler({ stream }) diff --git a/src/circuit/index.ts b/src/circuit/index.ts index b8219b57e8..ed6aebbcf6 100644 --- a/src/circuit/index.ts +++ b/src/circuit/index.ts @@ -13,6 +13,7 @@ import { import type { AddressSorter } from '@libp2p/interface-peer-store' import type { Startable } from '@libp2p/interfaces/startable' import type { Components } from '@libp2p/components' +import type { RelayConfig } from '../index.js' const log = logger('libp2p:relay') @@ -22,11 +23,6 @@ export interface RelayAdvertiseConfig { ttl?: number } -export interface HopConfig { - enabled?: boolean - active?: boolean -} - export interface AutoRelayConfig { enabled?: boolean @@ -36,13 +32,8 @@ export interface AutoRelayConfig { maxListeners: number } -export interface RelayInit { +export interface RelayInit extends RelayConfig { addressSorter?: AddressSorter - maxListeners?: number - onError?: (error: Error, msg?: string) => void - hop: HopConfig - advertise: RelayAdvertiseConfig - autoRelay: AutoRelayConfig } export class Relay implements Startable { diff --git a/src/circuit/transport.ts b/src/circuit/transport.ts index a4d4894cc3..295ef81410 100644 --- a/src/circuit/transport.ts +++ b/src/circuit/transport.ts @@ -17,12 +17,20 @@ import type { AbortOptions } from '@libp2p/interfaces' import type { IncomingStreamData } from '@libp2p/interface-registrar' import type { Listener, Transport, CreateListenerOptions, ConnectionHandler } from '@libp2p/interface-transport' import type { Connection } from '@libp2p/interface-connection' +import type { RelayConfig } from '../index.js' +import { abortableDuplex } from 'abortable-iterator' +import { TimeoutController } from 'timeout-abort-controller' const log = logger('libp2p:circuit') export class Circuit implements Transport, Initializable { private handler?: ConnectionHandler private components: Components = new Components() + private readonly _init: RelayConfig + + constructor (init: RelayConfig) { + this._init = init + } init (components: Components): void { this.components = components @@ -54,49 +62,20 @@ export class Circuit implements Transport, Initializable { async _onProtocol (data: IncomingStreamData) { const { connection, stream } = data - const streamHandler = new StreamHandler({ stream }) - const request = await streamHandler.read() - - if (request == null) { - log('request was invalid, could not read from stream') - streamHandler.write({ - type: CircuitPB.Type.STATUS, - code: CircuitPB.Status.MALFORMED_MESSAGE - }) - streamHandler.close() - return - } + const controller = new TimeoutController(this._init.hop.timeout) - let virtualConnection + try { + const source = abortableDuplex(stream, controller.signal) + const streamHandler = new StreamHandler({ + stream: { + ...stream, + ...source + } + }) + const request = await streamHandler.read() - switch (request.type) { - case CircuitPB.Type.CAN_HOP: { - log('received CAN_HOP request from %p', connection.remotePeer) - await handleCanHop({ circuit: this, connection, streamHandler }) - break - } - case CircuitPB.Type.HOP: { - log('received HOP request from %p', connection.remotePeer) - virtualConnection = await handleHop({ - connection, - request, - streamHandler, - circuit: this, - connectionManager: this.components.getConnectionManager() - }) - break - } - case CircuitPB.Type.STOP: { - log('received STOP request from %p', connection.remotePeer) - virtualConnection = await handleStop({ - connection, - request, - streamHandler - }) - break - } - default: { - log('Request of type %s not supported', request.type) + if (request == null) { + log('request was invalid, could not read from stream') streamHandler.write({ type: CircuitPB.Type.STATUS, code: CircuitPB.Status.MALFORMED_MESSAGE @@ -104,27 +83,68 @@ export class Circuit implements Transport, Initializable { streamHandler.close() return } - } - if (virtualConnection != null) { - // @ts-expect-error dst peer will not be undefined - const remoteAddr = new Multiaddr(request.dstPeer.addrs[0]) - // @ts-expect-error dst peer will not be undefined - const localAddr = new Multiaddr(request.srcPeer.addrs[0]) - const maConn = streamToMaConnection({ - stream: virtualConnection, - remoteAddr, - localAddr - }) - const type = request.type === CircuitPB.Type.HOP ? 'relay' : 'inbound' - log('new %s connection %s', type, maConn.remoteAddr) + let virtualConnection - const conn = await this.components.getUpgrader().upgradeInbound(maConn) - log('%s connection %s upgraded', type, maConn.remoteAddr) + switch (request.type) { + case CircuitPB.Type.CAN_HOP: { + log('received CAN_HOP request from %p', connection.remotePeer) + await handleCanHop({ circuit: this, connection, streamHandler }) + break + } + case CircuitPB.Type.HOP: { + log('received HOP request from %p', connection.remotePeer) + virtualConnection = await handleHop({ + connection, + request, + streamHandler, + circuit: this, + connectionManager: this.components.getConnectionManager() + }) + break + } + case CircuitPB.Type.STOP: { + log('received STOP request from %p', connection.remotePeer) + virtualConnection = await handleStop({ + connection, + request, + streamHandler + }) + break + } + default: { + log('Request of type %s not supported', request.type) + streamHandler.write({ + type: CircuitPB.Type.STATUS, + code: CircuitPB.Status.MALFORMED_MESSAGE + }) + streamHandler.close() + return + } + } + + if (virtualConnection != null) { + // @ts-expect-error dst peer will not be undefined + const remoteAddr = new Multiaddr(request.dstPeer.addrs[0]) + // @ts-expect-error dst peer will not be undefined + const localAddr = new Multiaddr(request.srcPeer.addrs[0]) + const maConn = streamToMaConnection({ + stream: virtualConnection, + remoteAddr, + localAddr + }) + const type = request.type === CircuitPB.Type.HOP ? 'relay' : 'inbound' + log('new %s connection %s', type, maConn.remoteAddr) + + const conn = await this.components.getUpgrader().upgradeInbound(maConn) + log('%s connection %s upgraded', type, maConn.remoteAddr) - if (this.handler != null) { - this.handler(conn) + if (this.handler != null) { + this.handler(conn) + } } + } finally { + controller.clear() } } @@ -160,6 +180,7 @@ export class Circuit implements Transport, Initializable { try { const virtualConnection = await hop({ + ...options, connection: relayConnection, request: { type: CircuitPB.Type.HOP, diff --git a/src/config.ts b/src/config.ts index 6e88a67456..a3acf51a2c 100644 --- a/src/config.ts +++ b/src/config.ts @@ -69,7 +69,8 @@ const DefaultConfig: Partial = { }, hop: { enabled: false, - active: false + active: false, + timeout: 30000 }, autoRelay: { enabled: false, diff --git a/src/fetch/index.ts b/src/fetch/index.ts index 9add8e8f46..327d5049df 100644 --- a/src/fetch/index.ts +++ b/src/fetch/index.ts @@ -93,6 +93,7 @@ export class FetchService implements Startable { const connection = await this.components.getConnectionManager().openConnection(peer, options) let timeoutController let signal = options.signal + let stream: Stream | undefined // create a timeout if no abort signal passed if (signal == null) { @@ -100,14 +101,14 @@ export class FetchService implements Startable { signal = timeoutController.signal } - const stream = await connection.newStream([this.protocol], { - signal - }) + try { + stream = await connection.newStream([this.protocol], { + signal + }) - // make stream abortable - const source = abortableDuplex(stream, signal) + // make stream abortable + const source = abortableDuplex(stream, signal) - try { const result = await pipe( [FetchRequest.encode({ identifier: key })], lp.encode(), @@ -146,7 +147,9 @@ export class FetchService implements Startable { timeoutController.clear() } - stream.close() + if (stream != null) { + stream.close() + } } } diff --git a/src/identify/index.ts b/src/identify/index.ts index 8a871f1982..268ab057ab 100644 --- a/src/identify/index.ts +++ b/src/identify/index.ts @@ -228,6 +228,7 @@ export class IdentifyService implements Startable { async _identify (connection: Connection, options: AbortOptions = {}): Promise { let timeoutController let signal = options.signal + let stream: Stream | undefined // create a timeout if no abort signal passed if (signal == null) { @@ -235,14 +236,14 @@ export class IdentifyService implements Startable { signal = timeoutController.signal } - const stream = await connection.newStream([this.identifyProtocolStr], { - signal - }) + try { + stream = await connection.newStream([this.identifyProtocolStr], { + signal + }) - // make stream abortable - const source = abortableDuplex(stream, signal) + // make stream abortable + const source = abortableDuplex(stream, signal) - try { const data = await pipe( [], source, @@ -266,7 +267,9 @@ export class IdentifyService implements Startable { timeoutController.clear() } - stream.close() + if (stream != null) { + stream.close() + } } } diff --git a/src/index.ts b/src/index.ts index b7aa62878e..d5c3830709 100644 --- a/src/index.ts +++ b/src/index.ts @@ -50,6 +50,7 @@ export interface MetricsConfig { export interface HopConfig { enabled?: boolean active?: boolean + timeout: number } export interface RelayConfig { diff --git a/src/libp2p.ts b/src/libp2p.ts index d4ae584b88..0a5a435df5 100644 --- a/src/libp2p.ts +++ b/src/libp2p.ts @@ -218,7 +218,7 @@ export class Libp2pNode extends EventEmitter implements Libp2p { }))) if (init.relay.enabled) { - this.components.getTransportManager().add(this.configureComponent(new Circuit())) + this.components.getTransportManager().add(this.configureComponent(new Circuit(init.relay))) this.configureComponent(new Relay(this.components, { addressSorter: init.connectionManager.addressSorter, diff --git a/src/ping/index.ts b/src/ping/index.ts index 1fc1567a4e..98c249093a 100644 --- a/src/ping/index.ts +++ b/src/ping/index.ts @@ -13,6 +13,7 @@ import type { Components } from '@libp2p/components' import type { AbortOptions } from '@libp2p/interfaces' import { abortableDuplex } from 'abortable-iterator' import { TimeoutController } from 'timeout-abort-controller' +import type { Stream } from '@libp2p/interface-connection' const log = logger('libp2p:ping') @@ -83,6 +84,7 @@ export class PingService implements Startable { const connection = await this.components.getConnectionManager().openConnection(peer, options) let timeoutController let signal = options.signal + let stream: Stream | undefined // create a timeout if no abort signal passed if (signal == null) { @@ -90,14 +92,14 @@ export class PingService implements Startable { signal = timeoutController.signal } - const stream = await connection.newStream([this.protocol], { - signal - }) + try { + stream = await connection.newStream([this.protocol], { + signal + }) - // make stream abortable - const source = abortableDuplex(stream, signal) + // make stream abortable + const source = abortableDuplex(stream, signal) - try { const result = await pipe( [data], source, @@ -115,7 +117,9 @@ export class PingService implements Startable { timeoutController.clear() } - stream.close() + if (stream != null) { + stream.close() + } } } } diff --git a/test/relay/relay.node.ts b/test/relay/relay.node.ts index 60e1bf5f60..4249f01b96 100644 --- a/test/relay/relay.node.ts +++ b/test/relay/relay.node.ts @@ -13,6 +13,7 @@ import { RELAY_CODEC } from '../../src/circuit/multicodec.js' import { StreamHandler } from '../../src/circuit/circuit/stream-handler.js' import { CircuitRelay } from '../../src/circuit/pb/index.js' import { createNodeOptions, createRelayOptions } from './utils.js' +import delay from 'delay' describe('Dialing (via relay, TCP)', () => { let srcLibp2p: Libp2pNode @@ -170,4 +171,37 @@ describe('Dialing (via relay, TCP)', () => { expect(dstToRelayConn).to.have.lengthOf(1) expect(dstToRelayConn).to.have.nested.property('[0].stat.status', 'OPEN') }) + + it('should time out when establishing a relay connection', async () => { + await relayLibp2p.stop() + relayLibp2p = await createNode({ + config: createRelayOptions({ + relay: { + autoRelay: { + enabled: false + }, + hop: { + // very short timeout + timeout: 10 + } + } + }) + }) + + const relayAddr = relayLibp2p.components.getTransportManager().getAddrs()[0] + const dialAddr = relayAddr.encapsulate(`/p2p/${relayLibp2p.peerId.toString()}`) + + const connection = await srcLibp2p.dial(dialAddr) + const stream = await connection.newStream('/libp2p/circuit/relay/0.1.0') + + await stream.sink(async function * () { + // delay for longer than the timeout + await delay(1000) + yield Uint8Array.from([0]) + }()) + + // because we timed out, the remote should have reset the stream + await expect(all(stream.source)).to.eventually.be.rejected + .with.property('code', 'ERR_MPLEX_STREAM_RESET') + }) })