Skip to content

Commit

Permalink
fix: add timeout for circuit relay (#1294)
Browse files Browse the repository at this point in the history
Make sure we don't potentially wait forever during incoming circuit relay handshakes.

Adds a timeout option to the hop config to control how long we will wait.
  • Loading branch information
achingbrain committed Jul 15, 2022
1 parent 0bb1b80 commit ba56c64
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 101 deletions.
19 changes: 13 additions & 6 deletions src/circuit/circuit/hop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { peerIdFromBytes } from '@libp2p/peer-id'
import type { Duplex } from 'it-stream-types'
import type { Circuit } from '../transport.js'
import type { ConnectionManager } from '@libp2p/interface-connection-manager'
import type { AbortOptions } from '@libp2p/interfaces'

const log = logger('libp2p:circuit:hop')

Expand Down Expand Up @@ -118,7 +119,7 @@ export async function handleHop (hopRequest: HopRequest) {
)
}

export interface HopConfig {
export interface HopConfig extends AbortOptions {
connection: Connection
request: CircuitPB
}
Expand All @@ -130,11 +131,14 @@ export interface HopConfig {
export async function hop (options: HopConfig): Promise<Duplex<Uint8Array>> {
const {
connection,
request
request,
signal
} = options

// Create a new stream to the relay
const stream = await connection.newStream(RELAY_CODEC)
const stream = await connection.newStream(RELAY_CODEC, {
signal
})
// Send the HOP request
const streamHandler = new StreamHandler({ stream })
streamHandler.write(request)
Expand All @@ -156,7 +160,7 @@ export async function hop (options: HopConfig): Promise<Duplex<Uint8Array>> {
throw errCode(new Error(`HOP request failed with code "${response.code ?? 'unknown'}"`), Errors.ERR_HOP_REQUEST_FAILED)
}

export interface CanHopOptions {
export interface CanHopOptions extends AbortOptions {
connection: Connection
}

Expand All @@ -165,11 +169,14 @@ export interface CanHopOptions {
*/
export async function canHop (options: CanHopOptions) {
const {
connection
connection,
signal
} = options

// Create a new stream to the relay
const stream = await connection.newStream(RELAY_CODEC)
const stream = await connection.newStream(RELAY_CODEC, {
signal
})

// Send the HOP request
const streamHandler = new StreamHandler({ stream })
Expand Down
10 changes: 7 additions & 3 deletions src/circuit/circuit/stop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { StreamHandler } from './stream-handler.js'
import { validateAddrs } from './utils.js'
import type { Connection } from '@libp2p/interface-connection'
import type { Duplex } from 'it-stream-types'
import type { AbortOptions } from '@libp2p/interfaces'

const log = logger('libp2p:circuit:stop')

Expand Down Expand Up @@ -42,7 +43,7 @@ export function handleStop (options: HandleStopOptions): Duplex<Uint8Array> | un
return streamHandler.rest()
}

export interface StopOptions {
export interface StopOptions extends AbortOptions {
connection: Connection
request: CircuitPB
}
Expand All @@ -53,10 +54,13 @@ export interface StopOptions {
export async function stop (options: StopOptions) {
const {
connection,
request
request,
signal
} = options

const stream = await connection.newStream([RELAY_CODEC])
const stream = await connection.newStream(RELAY_CODEC, {
signal
})
log('starting stop request to %p', connection.remotePeer)
const streamHandler = new StreamHandler({ stream })

Expand Down
13 changes: 2 additions & 11 deletions src/circuit/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
import type { AddressSorter } from '@libp2p/interface-peer-store'
import type { Startable } from '@libp2p/interfaces/startable'
import type { Components } from '@libp2p/components'
import type { RelayConfig } from '../index.js'

const log = logger('libp2p:relay')

Expand All @@ -22,11 +23,6 @@ export interface RelayAdvertiseConfig {
ttl?: number
}

export interface HopConfig {
enabled?: boolean
active?: boolean
}

export interface AutoRelayConfig {
enabled?: boolean

Expand All @@ -36,13 +32,8 @@ export interface AutoRelayConfig {
maxListeners: number
}

export interface RelayInit {
export interface RelayInit extends RelayConfig {
addressSorter?: AddressSorter
maxListeners?: number
onError?: (error: Error, msg?: string) => void
hop: HopConfig
advertise: RelayAdvertiseConfig
autoRelay: AutoRelayConfig
}

export class Relay implements Startable {
Expand Down
137 changes: 79 additions & 58 deletions src/circuit/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,20 @@ import type { AbortOptions } from '@libp2p/interfaces'
import type { IncomingStreamData } from '@libp2p/interface-registrar'
import type { Listener, Transport, CreateListenerOptions, ConnectionHandler } from '@libp2p/interface-transport'
import type { Connection } from '@libp2p/interface-connection'
import type { RelayConfig } from '../index.js'
import { abortableDuplex } from 'abortable-iterator'
import { TimeoutController } from 'timeout-abort-controller'

const log = logger('libp2p:circuit')

export class Circuit implements Transport, Initializable {
private handler?: ConnectionHandler
private components: Components = new Components()
private readonly _init: RelayConfig

constructor (init: RelayConfig) {
this._init = init
}

init (components: Components): void {
this.components = components
Expand Down Expand Up @@ -54,77 +62,89 @@ export class Circuit implements Transport, Initializable {

async _onProtocol (data: IncomingStreamData) {
const { connection, stream } = data
const streamHandler = new StreamHandler({ stream })
const request = await streamHandler.read()

if (request == null) {
log('request was invalid, could not read from stream')
streamHandler.write({
type: CircuitPB.Type.STATUS,
code: CircuitPB.Status.MALFORMED_MESSAGE
})
streamHandler.close()
return
}
const controller = new TimeoutController(this._init.hop.timeout)

let virtualConnection
try {
const source = abortableDuplex(stream, controller.signal)
const streamHandler = new StreamHandler({
stream: {
...stream,
...source
}
})
const request = await streamHandler.read()

switch (request.type) {
case CircuitPB.Type.CAN_HOP: {
log('received CAN_HOP request from %p', connection.remotePeer)
await handleCanHop({ circuit: this, connection, streamHandler })
break
}
case CircuitPB.Type.HOP: {
log('received HOP request from %p', connection.remotePeer)
virtualConnection = await handleHop({
connection,
request,
streamHandler,
circuit: this,
connectionManager: this.components.getConnectionManager()
})
break
}
case CircuitPB.Type.STOP: {
log('received STOP request from %p', connection.remotePeer)
virtualConnection = await handleStop({
connection,
request,
streamHandler
})
break
}
default: {
log('Request of type %s not supported', request.type)
if (request == null) {
log('request was invalid, could not read from stream')
streamHandler.write({
type: CircuitPB.Type.STATUS,
code: CircuitPB.Status.MALFORMED_MESSAGE
})
streamHandler.close()
return
}
}

if (virtualConnection != null) {
// @ts-expect-error dst peer will not be undefined
const remoteAddr = new Multiaddr(request.dstPeer.addrs[0])
// @ts-expect-error dst peer will not be undefined
const localAddr = new Multiaddr(request.srcPeer.addrs[0])
const maConn = streamToMaConnection({
stream: virtualConnection,
remoteAddr,
localAddr
})
const type = request.type === CircuitPB.Type.HOP ? 'relay' : 'inbound'
log('new %s connection %s', type, maConn.remoteAddr)
let virtualConnection

const conn = await this.components.getUpgrader().upgradeInbound(maConn)
log('%s connection %s upgraded', type, maConn.remoteAddr)
switch (request.type) {
case CircuitPB.Type.CAN_HOP: {
log('received CAN_HOP request from %p', connection.remotePeer)
await handleCanHop({ circuit: this, connection, streamHandler })
break
}
case CircuitPB.Type.HOP: {
log('received HOP request from %p', connection.remotePeer)
virtualConnection = await handleHop({
connection,
request,
streamHandler,
circuit: this,
connectionManager: this.components.getConnectionManager()
})
break
}
case CircuitPB.Type.STOP: {
log('received STOP request from %p', connection.remotePeer)
virtualConnection = await handleStop({
connection,
request,
streamHandler
})
break
}
default: {
log('Request of type %s not supported', request.type)
streamHandler.write({
type: CircuitPB.Type.STATUS,
code: CircuitPB.Status.MALFORMED_MESSAGE
})
streamHandler.close()
return
}
}

if (virtualConnection != null) {
// @ts-expect-error dst peer will not be undefined
const remoteAddr = new Multiaddr(request.dstPeer.addrs[0])
// @ts-expect-error dst peer will not be undefined
const localAddr = new Multiaddr(request.srcPeer.addrs[0])
const maConn = streamToMaConnection({
stream: virtualConnection,
remoteAddr,
localAddr
})
const type = request.type === CircuitPB.Type.HOP ? 'relay' : 'inbound'
log('new %s connection %s', type, maConn.remoteAddr)

const conn = await this.components.getUpgrader().upgradeInbound(maConn)
log('%s connection %s upgraded', type, maConn.remoteAddr)

if (this.handler != null) {
this.handler(conn)
if (this.handler != null) {
this.handler(conn)
}
}
} finally {
controller.clear()
}
}

Expand Down Expand Up @@ -160,6 +180,7 @@ export class Circuit implements Transport, Initializable {

try {
const virtualConnection = await hop({
...options,
connection: relayConnection,
request: {
type: CircuitPB.Type.HOP,
Expand Down
3 changes: 2 additions & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ const DefaultConfig: Partial<Libp2pInit> = {
},
hop: {
enabled: false,
active: false
active: false,
timeout: 30000
},
autoRelay: {
enabled: false,
Expand Down
17 changes: 10 additions & 7 deletions src/fetch/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,22 @@ export class FetchService implements Startable {
const connection = await this.components.getConnectionManager().openConnection(peer, options)
let timeoutController
let signal = options.signal
let stream: Stream | undefined

// create a timeout if no abort signal passed
if (signal == null) {
timeoutController = new TimeoutController(this.init.timeout)
signal = timeoutController.signal
}

const stream = await connection.newStream([this.protocol], {
signal
})
try {
stream = await connection.newStream([this.protocol], {
signal
})

// make stream abortable
const source = abortableDuplex(stream, signal)
// make stream abortable
const source = abortableDuplex(stream, signal)

try {
const result = await pipe(
[FetchRequest.encode({ identifier: key })],
lp.encode(),
Expand Down Expand Up @@ -146,7 +147,9 @@ export class FetchService implements Startable {
timeoutController.clear()
}

stream.close()
if (stream != null) {
stream.close()
}
}
}

Expand Down
17 changes: 10 additions & 7 deletions src/identify/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,21 +228,22 @@ export class IdentifyService implements Startable {
async _identify (connection: Connection, options: AbortOptions = {}): Promise<Identify> {
let timeoutController
let signal = options.signal
let stream: Stream | undefined

// create a timeout if no abort signal passed
if (signal == null) {
timeoutController = new TimeoutController(this.init.timeout)
signal = timeoutController.signal
}

const stream = await connection.newStream([this.identifyProtocolStr], {
signal
})
try {
stream = await connection.newStream([this.identifyProtocolStr], {
signal
})

// make stream abortable
const source = abortableDuplex(stream, signal)
// make stream abortable
const source = abortableDuplex(stream, signal)

try {
const data = await pipe(
[],
source,
Expand All @@ -266,7 +267,9 @@ export class IdentifyService implements Startable {
timeoutController.clear()
}

stream.close()
if (stream != null) {
stream.close()
}
}
}

Expand Down
Loading

0 comments on commit ba56c64

Please sign in to comment.