Skip to content

Commit

Permalink
feat: add node.js/electron support for webrtc transport (#1905)
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain committed Jul 31, 2023
1 parent fdd8082 commit 72e81dc
Show file tree
Hide file tree
Showing 17 changed files with 834 additions and 230 deletions.
3 changes: 0 additions & 3 deletions packages/transport-webrtc/.aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
/** @type {import('aegir').PartialOptions} */
export default {
build: {
config: {
platform: 'node'
},
bundlesizeMax: '117KB'
},
test: {
Expand Down
7 changes: 6 additions & 1 deletion packages/transport-webrtc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
"scripts": {
"generate": "protons src/private-to-private/pb/message.proto src/pb/message.proto",
"build": "aegir build",
"test": "aegir test -t browser",
"test": "aegir test -t node -t browser -t electron-main",
"test:node": "aegir test -t node --cov",
"test:chrome": "aegir test -t browser --cov",
"test:firefox": "aegir test -t browser -- --browser firefox",
"lint": "aegir lint",
Expand All @@ -61,6 +62,7 @@
"it-to-buffer": "^4.0.2",
"multiformats": "^12.0.1",
"multihashes": "^4.0.3",
"node-datachannel": "^0.4.3",
"p-defer": "^4.0.0",
"p-event": "^6.0.0",
"protons-runtime": "^5.0.0",
Expand All @@ -82,5 +84,8 @@
"protons": "^7.0.2",
"sinon": "^15.1.2",
"sinon-ts": "^1.0.0"
},
"browser": {
"./dist/src/webrtc/index.js": "./dist/src/webrtc/index.browser.js"
}
}
222 changes: 120 additions & 102 deletions packages/transport-webrtc/src/private-to-private/handler.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { CodeError } from '@libp2p/interface/errors'
import { logger } from '@libp2p/logger'
import { abortableDuplex } from 'abortable-iterator'
import { pbStream } from 'it-protobuf-stream'
import pDefer, { type DeferredPromise } from 'p-defer'
import { DataChannelMuxerFactory } from '../muxer.js'
import { RTCPeerConnection, RTCSessionDescription } from '../webrtc/index.js'
import { Message } from './pb/message.js'
import { readCandidatesUntilConnected, resolveOnConnected } from './util.js'
import type { DataChannelOpts } from '../stream.js'
Expand All @@ -20,66 +22,75 @@ export async function handleIncomingStream ({ rtcConfiguration, dataChannelOptio
const signal = AbortSignal.timeout(DEFAULT_TIMEOUT)
const stream = pbStream(abortableDuplex(rawStream, signal)).pb(Message)
const pc = new RTCPeerConnection(rtcConfiguration)
const muxerFactory = new DataChannelMuxerFactory({ peerConnection: pc, dataChannelOptions })
const connectedPromise: DeferredPromise<void> = pDefer()
const answerSentPromise: DeferredPromise<void> = pDefer()

signal.onabort = () => { connectedPromise.reject() }
// candidate callbacks
pc.onicecandidate = ({ candidate }) => {
answerSentPromise.promise.then(
async () => {
await stream.write({
type: Message.Type.ICE_CANDIDATE,
data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : ''
})
},
(err) => {
log.error('cannot set candidate since sending answer failed', err)
}
)
}

resolveOnConnected(pc, connectedPromise)
try {
const muxerFactory = new DataChannelMuxerFactory({ peerConnection: pc, dataChannelOptions })
const connectedPromise: DeferredPromise<void> = pDefer()
const answerSentPromise: DeferredPromise<void> = pDefer()

signal.onabort = () => {
connectedPromise.reject(new CodeError('Timed out while trying to connect', 'ERR_TIMEOUT'))
}
// candidate callbacks
pc.onicecandidate = ({ candidate }) => {
answerSentPromise.promise.then(
async () => {
await stream.write({
type: Message.Type.ICE_CANDIDATE,
data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : ''
})
},
(err) => {
log.error('cannot set candidate since sending answer failed', err)
connectedPromise.reject(err)
}
)
}

resolveOnConnected(pc, connectedPromise)

// read an SDP offer
const pbOffer = await stream.read()
if (pbOffer.type !== Message.Type.SDP_OFFER) {
throw new Error(`expected message type SDP_OFFER, received: ${pbOffer.type ?? 'undefined'} `)
}
const offer = new RTCSessionDescription({
type: 'offer',
sdp: pbOffer.data
})

await pc.setRemoteDescription(offer).catch(err => {
log.error('could not execute setRemoteDescription', err)
throw new Error('Failed to set remoteDescription')
})

// create and write an SDP answer
const answer = await pc.createAnswer().catch(err => {
log.error('could not execute createAnswer', err)
answerSentPromise.reject(err)
throw new Error('Failed to create answer')
})
// write the answer to the remote
await stream.write({ type: Message.Type.SDP_ANSWER, data: answer.sdp })

await pc.setLocalDescription(answer).catch(err => {
log.error('could not execute setLocalDescription', err)
answerSentPromise.reject(err)
throw new Error('Failed to set localDescription')
})

answerSentPromise.resolve()

// wait until candidates are connected
await readCandidatesUntilConnected(connectedPromise, pc, stream)

// read an SDP offer
const pbOffer = await stream.read()
if (pbOffer.type !== Message.Type.SDP_OFFER) {
throw new Error(`expected message type SDP_OFFER, received: ${pbOffer.type ?? 'undefined'} `)
const remoteAddress = parseRemoteAddress(pc.currentRemoteDescription?.sdp ?? '')

return { pc, muxerFactory, remoteAddress }
} catch (err) {
pc.close()
throw err
}
const offer = new RTCSessionDescription({
type: 'offer',
sdp: pbOffer.data
})

await pc.setRemoteDescription(offer).catch(err => {
log.error('could not execute setRemoteDescription', err)
throw new Error('Failed to set remoteDescription')
})

// create and write an SDP answer
const answer = await pc.createAnswer().catch(err => {
log.error('could not execute createAnswer', err)
answerSentPromise.reject(err)
throw new Error('Failed to create answer')
})
// write the answer to the remote
await stream.write({ type: Message.Type.SDP_ANSWER, data: answer.sdp })

await pc.setLocalDescription(answer).catch(err => {
log.error('could not execute setLocalDescription', err)
answerSentPromise.reject(err)
throw new Error('Failed to set localDescription')
})

answerSentPromise.resolve()

// wait until candidates are connected
await readCandidatesUntilConnected(connectedPromise, pc, stream)

const remoteAddress = parseRemoteAddress(pc.currentRemoteDescription?.sdp ?? '')

return { pc, muxerFactory, remoteAddress }
}

export interface ConnectOptions {
Expand All @@ -93,56 +104,63 @@ export async function initiateConnection ({ rtcConfiguration, dataChannelOptions
const stream = pbStream(abortableDuplex(rawStream, signal)).pb(Message)
// setup peer connection
const pc = new RTCPeerConnection(rtcConfiguration)
const muxerFactory = new DataChannelMuxerFactory({ peerConnection: pc, dataChannelOptions })

const connectedPromise: DeferredPromise<void> = pDefer()
resolveOnConnected(pc, connectedPromise)

// reject the connectedPromise if the signal aborts
signal.onabort = connectedPromise.reject
// we create the channel so that the peerconnection has a component for which
// to collect candidates. The label is not relevant to connection initiation
// but can be useful for debugging
const channel = pc.createDataChannel('init')
// setup callback to write ICE candidates to the remote
// peer
pc.onicecandidate = ({ candidate }) => {
void stream.write({
type: Message.Type.ICE_CANDIDATE,
data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : ''
})
.catch(err => {
log.error('error sending ICE candidate', err)

try {
const muxerFactory = new DataChannelMuxerFactory({ peerConnection: pc, dataChannelOptions })

const connectedPromise: DeferredPromise<void> = pDefer()
resolveOnConnected(pc, connectedPromise)

// reject the connectedPromise if the signal aborts
signal.onabort = connectedPromise.reject
// we create the channel so that the peerconnection has a component for which
// to collect candidates. The label is not relevant to connection initiation
// but can be useful for debugging
const channel = pc.createDataChannel('init')
// setup callback to write ICE candidates to the remote
// peer
pc.onicecandidate = ({ candidate }) => {
void stream.write({
type: Message.Type.ICE_CANDIDATE,
data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : ''
})
}
// create an offer
const offerSdp = await pc.createOffer()
// write the offer to the stream
await stream.write({ type: Message.Type.SDP_OFFER, data: offerSdp.sdp })
// set offer as local description
await pc.setLocalDescription(offerSdp).catch(err => {
log.error('could not execute setLocalDescription', err)
throw new Error('Failed to set localDescription')
})

// read answer
const answerMessage = await stream.read()
if (answerMessage.type !== Message.Type.SDP_ANSWER) {
throw new Error('remote should send an SDP answer')
}
.catch(err => {
log.error('error sending ICE candidate', err)
})
}

// create an offer
const offerSdp = await pc.createOffer()
// write the offer to the stream
await stream.write({ type: Message.Type.SDP_OFFER, data: offerSdp.sdp })
// set offer as local description
await pc.setLocalDescription(offerSdp).catch(err => {
log.error('could not execute setLocalDescription', err)
throw new Error('Failed to set localDescription')
})

const answerSdp = new RTCSessionDescription({ type: 'answer', sdp: answerMessage.data })
await pc.setRemoteDescription(answerSdp).catch(err => {
log.error('could not execute setRemoteDescription', err)
throw new Error('Failed to set remoteDescription')
})
// read answer
const answerMessage = await stream.read()
if (answerMessage.type !== Message.Type.SDP_ANSWER) {
throw new Error('remote should send an SDP answer')
}

await readCandidatesUntilConnected(connectedPromise, pc, stream)
channel.close()
const answerSdp = new RTCSessionDescription({ type: 'answer', sdp: answerMessage.data })
await pc.setRemoteDescription(answerSdp).catch(err => {
log.error('could not execute setRemoteDescription', err)
throw new Error('Failed to set remoteDescription')
})

const remoteAddress = parseRemoteAddress(pc.currentRemoteDescription?.sdp ?? '')
await readCandidatesUntilConnected(connectedPromise, pc, stream)
channel.close()

return { pc, muxerFactory, remoteAddress }
const remoteAddress = parseRemoteAddress(pc.currentRemoteDescription?.sdp ?? '')

return { pc, muxerFactory, remoteAddress }
} catch (err) {
pc.close()
throw err
}
}

function parseRemoteAddress (sdp: string): string {
Expand Down
2 changes: 2 additions & 0 deletions packages/transport-webrtc/src/private-to-private/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { peerIdFromString } from '@libp2p/peer-id'
import { multiaddr, type Multiaddr, protocols } from '@multiformats/multiaddr'
import { codes } from '../error.js'
import { WebRTCMultiaddrConnection } from '../maconn.js'
import { cleanup } from '../webrtc/index.js'
import { initiateConnection, handleIncomingStream } from './handler.js'
import { WebRTCPeerListener } from './listener.js'
import type { DataChannelOpts } from '../stream.js'
Expand Down Expand Up @@ -57,6 +58,7 @@ export class WebRTCTransport implements Transport, Startable {

async stop (): Promise<void> {
await this.components.registrar.unhandle(SIGNALING_PROTO_ID)
cleanup()
this._started = false
}

Expand Down
1 change: 1 addition & 0 deletions packages/transport-webrtc/src/private-to-private/util.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { logger } from '@libp2p/logger'
import { isFirefox } from '../util.js'
import { RTCIceCandidate } from '../webrtc/index.js'
import { Message } from './pb/message.js'
import type { DeferredPromise } from 'p-defer'

Expand Down

0 comments on commit 72e81dc

Please sign in to comment.