From 72e81dc1ab66fe0bbcafe3261ec20e2a28aaad5f Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Mon, 31 Jul 2023 16:50:53 +0100 Subject: [PATCH] feat: add node.js/electron support for webrtc transport (#1905) --- packages/transport-webrtc/.aegir.js | 3 - packages/transport-webrtc/package.json | 7 +- .../src/private-to-private/handler.ts | 222 +++++++------ .../src/private-to-private/transport.ts | 2 + .../src/private-to-private/util.ts | 1 + .../src/private-to-public/transport.ts | 212 ++++++------ .../src/webrtc/index.browser.ts | 4 + packages/transport-webrtc/src/webrtc/index.ts | 12 + .../src/webrtc/rtc-data-channel.ts | 140 ++++++++ .../transport-webrtc/src/webrtc/rtc-events.ts | 19 ++ .../src/webrtc/rtc-ice-candidate.ts | 50 +++ .../src/webrtc/rtc-peer-connection.ts | 306 ++++++++++++++++++ .../src/webrtc/rtc-session-description.ts | 19 ++ .../transport-webrtc/test/listener.spec.ts | 2 +- .../test/maconn.browser.spec.ts | 1 + .../test/peer.browser.spec.ts | 36 ++- .../test/stream.browser.spec.ts | 28 +- 17 files changed, 834 insertions(+), 230 deletions(-) create mode 100644 packages/transport-webrtc/src/webrtc/index.browser.ts create mode 100644 packages/transport-webrtc/src/webrtc/index.ts create mode 100644 packages/transport-webrtc/src/webrtc/rtc-data-channel.ts create mode 100644 packages/transport-webrtc/src/webrtc/rtc-events.ts create mode 100644 packages/transport-webrtc/src/webrtc/rtc-ice-candidate.ts create mode 100644 packages/transport-webrtc/src/webrtc/rtc-peer-connection.ts create mode 100644 packages/transport-webrtc/src/webrtc/rtc-session-description.ts diff --git a/packages/transport-webrtc/.aegir.js b/packages/transport-webrtc/.aegir.js index f19d27456f..491576df87 100644 --- a/packages/transport-webrtc/.aegir.js +++ b/packages/transport-webrtc/.aegir.js @@ -2,9 +2,6 @@ /** @type {import('aegir').PartialOptions} */ export default { build: { - config: { - platform: 'node' - }, bundlesizeMax: '117KB' }, test: { diff --git a/packages/transport-webrtc/package.json b/packages/transport-webrtc/package.json index c0fe67af9f..cf9a0dbeb4 100644 --- a/packages/transport-webrtc/package.json +++ b/packages/transport-webrtc/package.json @@ -35,7 +35,8 @@ "scripts": { "generate": "protons src/private-to-private/pb/message.proto src/pb/message.proto", "build": "aegir build", - "test": "aegir test -t browser", + "test": "aegir test -t node -t browser -t electron-main", + "test:node": "aegir test -t node --cov", "test:chrome": "aegir test -t browser --cov", "test:firefox": "aegir test -t browser -- --browser firefox", "lint": "aegir lint", @@ -61,6 +62,7 @@ "it-to-buffer": "^4.0.2", "multiformats": "^12.0.1", "multihashes": "^4.0.3", + "node-datachannel": "^0.4.3", "p-defer": "^4.0.0", "p-event": "^6.0.0", "protons-runtime": "^5.0.0", @@ -82,5 +84,8 @@ "protons": "^7.0.2", "sinon": "^15.1.2", "sinon-ts": "^1.0.0" + }, + "browser": { + "./dist/src/webrtc/index.js": "./dist/src/webrtc/index.browser.js" } } diff --git a/packages/transport-webrtc/src/private-to-private/handler.ts b/packages/transport-webrtc/src/private-to-private/handler.ts index 2b0e09e19d..0f4f055b33 100644 --- a/packages/transport-webrtc/src/private-to-private/handler.ts +++ b/packages/transport-webrtc/src/private-to-private/handler.ts @@ -1,8 +1,10 @@ +import { CodeError } from '@libp2p/interface/errors' import { logger } from '@libp2p/logger' import { abortableDuplex } from 'abortable-iterator' import { pbStream } from 'it-protobuf-stream' import pDefer, { type DeferredPromise } from 'p-defer' import { DataChannelMuxerFactory } from '../muxer.js' +import { RTCPeerConnection, RTCSessionDescription } from '../webrtc/index.js' import { Message } from './pb/message.js' import { readCandidatesUntilConnected, resolveOnConnected } from './util.js' import type { DataChannelOpts } from '../stream.js' @@ -20,66 +22,75 @@ export async function handleIncomingStream ({ rtcConfiguration, dataChannelOptio const signal = AbortSignal.timeout(DEFAULT_TIMEOUT) const stream = pbStream(abortableDuplex(rawStream, signal)).pb(Message) const pc = new RTCPeerConnection(rtcConfiguration) - const muxerFactory = new DataChannelMuxerFactory({ peerConnection: pc, dataChannelOptions }) - const connectedPromise: DeferredPromise = pDefer() - const answerSentPromise: DeferredPromise = pDefer() - - signal.onabort = () => { connectedPromise.reject() } - // candidate callbacks - pc.onicecandidate = ({ candidate }) => { - answerSentPromise.promise.then( - async () => { - await stream.write({ - type: Message.Type.ICE_CANDIDATE, - data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : '' - }) - }, - (err) => { - log.error('cannot set candidate since sending answer failed', err) - } - ) - } - resolveOnConnected(pc, connectedPromise) + try { + const muxerFactory = new DataChannelMuxerFactory({ peerConnection: pc, dataChannelOptions }) + const connectedPromise: DeferredPromise = pDefer() + const answerSentPromise: DeferredPromise = pDefer() + + signal.onabort = () => { + connectedPromise.reject(new CodeError('Timed out while trying to connect', 'ERR_TIMEOUT')) + } + // candidate callbacks + pc.onicecandidate = ({ candidate }) => { + answerSentPromise.promise.then( + async () => { + await stream.write({ + type: Message.Type.ICE_CANDIDATE, + data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : '' + }) + }, + (err) => { + log.error('cannot set candidate since sending answer failed', err) + connectedPromise.reject(err) + } + ) + } + + resolveOnConnected(pc, connectedPromise) + + // read an SDP offer + const pbOffer = await stream.read() + if (pbOffer.type !== Message.Type.SDP_OFFER) { + throw new Error(`expected message type SDP_OFFER, received: ${pbOffer.type ?? 'undefined'} `) + } + const offer = new RTCSessionDescription({ + type: 'offer', + sdp: pbOffer.data + }) + + await pc.setRemoteDescription(offer).catch(err => { + log.error('could not execute setRemoteDescription', err) + throw new Error('Failed to set remoteDescription') + }) + + // create and write an SDP answer + const answer = await pc.createAnswer().catch(err => { + log.error('could not execute createAnswer', err) + answerSentPromise.reject(err) + throw new Error('Failed to create answer') + }) + // write the answer to the remote + await stream.write({ type: Message.Type.SDP_ANSWER, data: answer.sdp }) + + await pc.setLocalDescription(answer).catch(err => { + log.error('could not execute setLocalDescription', err) + answerSentPromise.reject(err) + throw new Error('Failed to set localDescription') + }) + + answerSentPromise.resolve() + + // wait until candidates are connected + await readCandidatesUntilConnected(connectedPromise, pc, stream) - // read an SDP offer - const pbOffer = await stream.read() - if (pbOffer.type !== Message.Type.SDP_OFFER) { - throw new Error(`expected message type SDP_OFFER, received: ${pbOffer.type ?? 'undefined'} `) + const remoteAddress = parseRemoteAddress(pc.currentRemoteDescription?.sdp ?? '') + + return { pc, muxerFactory, remoteAddress } + } catch (err) { + pc.close() + throw err } - const offer = new RTCSessionDescription({ - type: 'offer', - sdp: pbOffer.data - }) - - await pc.setRemoteDescription(offer).catch(err => { - log.error('could not execute setRemoteDescription', err) - throw new Error('Failed to set remoteDescription') - }) - - // create and write an SDP answer - const answer = await pc.createAnswer().catch(err => { - log.error('could not execute createAnswer', err) - answerSentPromise.reject(err) - throw new Error('Failed to create answer') - }) - // write the answer to the remote - await stream.write({ type: Message.Type.SDP_ANSWER, data: answer.sdp }) - - await pc.setLocalDescription(answer).catch(err => { - log.error('could not execute setLocalDescription', err) - answerSentPromise.reject(err) - throw new Error('Failed to set localDescription') - }) - - answerSentPromise.resolve() - - // wait until candidates are connected - await readCandidatesUntilConnected(connectedPromise, pc, stream) - - const remoteAddress = parseRemoteAddress(pc.currentRemoteDescription?.sdp ?? '') - - return { pc, muxerFactory, remoteAddress } } export interface ConnectOptions { @@ -93,56 +104,63 @@ export async function initiateConnection ({ rtcConfiguration, dataChannelOptions const stream = pbStream(abortableDuplex(rawStream, signal)).pb(Message) // setup peer connection const pc = new RTCPeerConnection(rtcConfiguration) - const muxerFactory = new DataChannelMuxerFactory({ peerConnection: pc, dataChannelOptions }) - - const connectedPromise: DeferredPromise = pDefer() - resolveOnConnected(pc, connectedPromise) - - // reject the connectedPromise if the signal aborts - signal.onabort = connectedPromise.reject - // we create the channel so that the peerconnection has a component for which - // to collect candidates. The label is not relevant to connection initiation - // but can be useful for debugging - const channel = pc.createDataChannel('init') - // setup callback to write ICE candidates to the remote - // peer - pc.onicecandidate = ({ candidate }) => { - void stream.write({ - type: Message.Type.ICE_CANDIDATE, - data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : '' - }) - .catch(err => { - log.error('error sending ICE candidate', err) + + try { + const muxerFactory = new DataChannelMuxerFactory({ peerConnection: pc, dataChannelOptions }) + + const connectedPromise: DeferredPromise = pDefer() + resolveOnConnected(pc, connectedPromise) + + // reject the connectedPromise if the signal aborts + signal.onabort = connectedPromise.reject + // we create the channel so that the peerconnection has a component for which + // to collect candidates. The label is not relevant to connection initiation + // but can be useful for debugging + const channel = pc.createDataChannel('init') + // setup callback to write ICE candidates to the remote + // peer + pc.onicecandidate = ({ candidate }) => { + void stream.write({ + type: Message.Type.ICE_CANDIDATE, + data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : '' }) - } - // create an offer - const offerSdp = await pc.createOffer() - // write the offer to the stream - await stream.write({ type: Message.Type.SDP_OFFER, data: offerSdp.sdp }) - // set offer as local description - await pc.setLocalDescription(offerSdp).catch(err => { - log.error('could not execute setLocalDescription', err) - throw new Error('Failed to set localDescription') - }) - - // read answer - const answerMessage = await stream.read() - if (answerMessage.type !== Message.Type.SDP_ANSWER) { - throw new Error('remote should send an SDP answer') - } + .catch(err => { + log.error('error sending ICE candidate', err) + }) + } + + // create an offer + const offerSdp = await pc.createOffer() + // write the offer to the stream + await stream.write({ type: Message.Type.SDP_OFFER, data: offerSdp.sdp }) + // set offer as local description + await pc.setLocalDescription(offerSdp).catch(err => { + log.error('could not execute setLocalDescription', err) + throw new Error('Failed to set localDescription') + }) - const answerSdp = new RTCSessionDescription({ type: 'answer', sdp: answerMessage.data }) - await pc.setRemoteDescription(answerSdp).catch(err => { - log.error('could not execute setRemoteDescription', err) - throw new Error('Failed to set remoteDescription') - }) + // read answer + const answerMessage = await stream.read() + if (answerMessage.type !== Message.Type.SDP_ANSWER) { + throw new Error('remote should send an SDP answer') + } - await readCandidatesUntilConnected(connectedPromise, pc, stream) - channel.close() + const answerSdp = new RTCSessionDescription({ type: 'answer', sdp: answerMessage.data }) + await pc.setRemoteDescription(answerSdp).catch(err => { + log.error('could not execute setRemoteDescription', err) + throw new Error('Failed to set remoteDescription') + }) - const remoteAddress = parseRemoteAddress(pc.currentRemoteDescription?.sdp ?? '') + await readCandidatesUntilConnected(connectedPromise, pc, stream) + channel.close() - return { pc, muxerFactory, remoteAddress } + const remoteAddress = parseRemoteAddress(pc.currentRemoteDescription?.sdp ?? '') + + return { pc, muxerFactory, remoteAddress } + } catch (err) { + pc.close() + throw err + } } function parseRemoteAddress (sdp: string): string { diff --git a/packages/transport-webrtc/src/private-to-private/transport.ts b/packages/transport-webrtc/src/private-to-private/transport.ts index e9d2f83bdb..7f93ac13e8 100644 --- a/packages/transport-webrtc/src/private-to-private/transport.ts +++ b/packages/transport-webrtc/src/private-to-private/transport.ts @@ -5,6 +5,7 @@ import { peerIdFromString } from '@libp2p/peer-id' import { multiaddr, type Multiaddr, protocols } from '@multiformats/multiaddr' import { codes } from '../error.js' import { WebRTCMultiaddrConnection } from '../maconn.js' +import { cleanup } from '../webrtc/index.js' import { initiateConnection, handleIncomingStream } from './handler.js' import { WebRTCPeerListener } from './listener.js' import type { DataChannelOpts } from '../stream.js' @@ -57,6 +58,7 @@ export class WebRTCTransport implements Transport, Startable { async stop (): Promise { await this.components.registrar.unhandle(SIGNALING_PROTO_ID) + cleanup() this._started = false } diff --git a/packages/transport-webrtc/src/private-to-private/util.ts b/packages/transport-webrtc/src/private-to-private/util.ts index e1b669778c..6d2b97898d 100644 --- a/packages/transport-webrtc/src/private-to-private/util.ts +++ b/packages/transport-webrtc/src/private-to-private/util.ts @@ -1,5 +1,6 @@ import { logger } from '@libp2p/logger' import { isFirefox } from '../util.js' +import { RTCIceCandidate } from '../webrtc/index.js' import { Message } from './pb/message.js' import type { DeferredPromise } from 'p-defer' diff --git a/packages/transport-webrtc/src/private-to-public/transport.ts b/packages/transport-webrtc/src/private-to-public/transport.ts index c65f823387..f824197378 100644 --- a/packages/transport-webrtc/src/private-to-public/transport.ts +++ b/packages/transport-webrtc/src/private-to-public/transport.ts @@ -11,6 +11,7 @@ import { WebRTCMultiaddrConnection } from '../maconn.js' import { DataChannelMuxerFactory } from '../muxer.js' import { createStream } from '../stream.js' import { isFirefox } from '../util.js' +import { RTCPeerConnection } from '../webrtc/index.js' import * as sdp from './sdp.js' import { genUfrag } from './util.js' import type { WebRTCDialOptions } from './options.js' @@ -134,116 +135,121 @@ export class WebRTCDirectTransport implements Transport { const peerConnection = new RTCPeerConnection({ certificates: [certificate] }) - // create data channel for running the noise handshake. Once the data channel is opened, - // the remote will initiate the noise handshake. This is used to confirm the identity of - // the peer. - const dataChannelOpenPromise = new Promise((resolve, reject) => { - const handshakeDataChannel = peerConnection.createDataChannel('', { negotiated: true, id: 0 }) - const handshakeTimeout = setTimeout(() => { - const error = `Data channel was never opened: state: ${handshakeDataChannel.readyState}` - log.error(error) - this.metrics?.dialerEvents.increment({ open_error: true }) - reject(dataChannelError('data', error)) - }, HANDSHAKE_TIMEOUT_MS) - - handshakeDataChannel.onopen = (_) => { - clearTimeout(handshakeTimeout) - resolve(handshakeDataChannel) - } - - // ref: https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/error_event - handshakeDataChannel.onerror = (event: Event) => { - clearTimeout(handshakeTimeout) - const errorTarget = event.target?.toString() ?? 'not specified' - const error = `Error opening a data channel for handshaking: ${errorTarget}` - log.error(error) - // NOTE: We use unknown error here but this could potentially be considered a reset by some standards. - this.metrics?.dialerEvents.increment({ unknown_error: true }) - reject(dataChannelError('data', error)) - } - }) - - const ufrag = 'libp2p+webrtc+v1/' + genUfrag(32) - - // Create offer and munge sdp with ufrag == pwd. This allows the remote to - // respond to STUN messages without performing an actual SDP exchange. - // This is because it can infer the passwd field by reading the USERNAME - // attribute of the STUN message. - const offerSdp = await peerConnection.createOffer() - const mungedOfferSdp = sdp.munge(offerSdp, ufrag) - await peerConnection.setLocalDescription(mungedOfferSdp) - - // construct answer sdp from multiaddr and ufrag - const answerSdp = sdp.fromMultiAddr(ma, ufrag) - await peerConnection.setRemoteDescription(answerSdp) - - // wait for peerconnection.onopen to fire, or for the datachannel to open - const handshakeDataChannel = await dataChannelOpenPromise - - const myPeerId = this.components.peerId - - // Do noise handshake. - // Set the Noise Prologue to libp2p-webrtc-noise: before starting the actual Noise handshake. - // is the concatenation of the of the two TLS fingerprints of A and B in their multihash byte representation, sorted in ascending order. - const fingerprintsPrologue = this.generateNoisePrologue(peerConnection, remoteCerthash.code, ma) - - // Since we use the default crypto interface and do not use a static key or early data, - // we pass in undefined for these parameters. - const noise = Noise({ prologueBytes: fingerprintsPrologue })() - - const wrappedChannel = createStream({ channel: handshakeDataChannel, direction: 'inbound', dataChannelOptions: this.init.dataChannel }) - const wrappedDuplex = { - ...wrappedChannel, - sink: wrappedChannel.sink.bind(wrappedChannel), - source: (async function * () { - for await (const list of wrappedChannel.source) { - for (const buf of list) { - yield buf - } + try { + // create data channel for running the noise handshake. Once the data channel is opened, + // the remote will initiate the noise handshake. This is used to confirm the identity of + // the peer. + const dataChannelOpenPromise = new Promise((resolve, reject) => { + const handshakeDataChannel = peerConnection.createDataChannel('', { negotiated: true, id: 0 }) + const handshakeTimeout = setTimeout(() => { + const error = `Data channel was never opened: state: ${handshakeDataChannel.readyState}` + log.error(error) + this.metrics?.dialerEvents.increment({ open_error: true }) + reject(dataChannelError('data', error)) + }, HANDSHAKE_TIMEOUT_MS) + + handshakeDataChannel.onopen = (_) => { + clearTimeout(handshakeTimeout) + resolve(handshakeDataChannel) } - }()) - } - // Creating the connection before completion of the noise - // handshake ensures that the stream opening callback is set up - const maConn = new WebRTCMultiaddrConnection({ - peerConnection, - remoteAddr: ma, - timeline: { - open: Date.now() - }, - metrics: this.metrics?.dialerEvents - }) - - const eventListeningName = isFirefox ? 'iceconnectionstatechange' : 'connectionstatechange' - - peerConnection.addEventListener(eventListeningName, () => { - switch (peerConnection.connectionState) { - case 'failed': - case 'disconnected': - case 'closed': - maConn.close().catch((err) => { - log.error('error closing connection', err) - }).finally(() => { - // Remove the event listener once the connection is closed - controller.abort() - }) - break - default: - break + // ref: https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/error_event + handshakeDataChannel.onerror = (event: Event) => { + clearTimeout(handshakeTimeout) + const errorTarget = event.target?.toString() ?? 'not specified' + const error = `Error opening a data channel for handshaking: ${errorTarget}` + log.error(error) + // NOTE: We use unknown error here but this could potentially be considered a reset by some standards. + this.metrics?.dialerEvents.increment({ unknown_error: true }) + reject(dataChannelError('data', error)) + } + }) + + const ufrag = 'libp2p+webrtc+v1/' + genUfrag(32) + + // Create offer and munge sdp with ufrag == pwd. This allows the remote to + // respond to STUN messages without performing an actual SDP exchange. + // This is because it can infer the passwd field by reading the USERNAME + // attribute of the STUN message. + const offerSdp = await peerConnection.createOffer() + const mungedOfferSdp = sdp.munge(offerSdp, ufrag) + await peerConnection.setLocalDescription(mungedOfferSdp) + + // construct answer sdp from multiaddr and ufrag + const answerSdp = sdp.fromMultiAddr(ma, ufrag) + await peerConnection.setRemoteDescription(answerSdp) + + // wait for peerconnection.onopen to fire, or for the datachannel to open + const handshakeDataChannel = await dataChannelOpenPromise + + const myPeerId = this.components.peerId + + // Do noise handshake. + // Set the Noise Prologue to libp2p-webrtc-noise: before starting the actual Noise handshake. + // is the concatenation of the of the two TLS fingerprints of A and B in their multihash byte representation, sorted in ascending order. + const fingerprintsPrologue = this.generateNoisePrologue(peerConnection, remoteCerthash.code, ma) + + // Since we use the default crypto interface and do not use a static key or early data, + // we pass in undefined for these parameters. + const noise = Noise({ prologueBytes: fingerprintsPrologue })() + + const wrappedChannel = createStream({ channel: handshakeDataChannel, direction: 'inbound', dataChannelOptions: this.init.dataChannel }) + const wrappedDuplex = { + ...wrappedChannel, + sink: wrappedChannel.sink.bind(wrappedChannel), + source: (async function * () { + for await (const list of wrappedChannel.source) { + for (const buf of list) { + yield buf + } + } + }()) } - }, { signal }) - // Track opened peer connection - this.metrics?.dialerEvents.increment({ peer_connection: true }) + // Creating the connection before completion of the noise + // handshake ensures that the stream opening callback is set up + const maConn = new WebRTCMultiaddrConnection({ + peerConnection, + remoteAddr: ma, + timeline: { + open: Date.now() + }, + metrics: this.metrics?.dialerEvents + }) + + const eventListeningName = isFirefox ? 'iceconnectionstatechange' : 'connectionstatechange' + + peerConnection.addEventListener(eventListeningName, () => { + switch (peerConnection.connectionState) { + case 'failed': + case 'disconnected': + case 'closed': + maConn.close().catch((err) => { + log.error('error closing connection', err) + }).finally(() => { + // Remove the event listener once the connection is closed + controller.abort() + }) + break + default: + break + } + }, { signal }) - const muxerFactory = new DataChannelMuxerFactory({ peerConnection, metrics: this.metrics?.dialerEvents, dataChannelOptions: this.init.dataChannel }) + // Track opened peer connection + this.metrics?.dialerEvents.increment({ peer_connection: true }) - // For outbound connections, the remote is expected to start the noise handshake. - // Therefore, we need to secure an inbound noise connection from the remote. - await noise.secureInbound(myPeerId, wrappedDuplex, theirPeerId) + const muxerFactory = new DataChannelMuxerFactory({ peerConnection, metrics: this.metrics?.dialerEvents, dataChannelOptions: this.init.dataChannel }) - return options.upgrader.upgradeOutbound(maConn, { skipProtection: true, skipEncryption: true, muxerFactory }) + // For outbound connections, the remote is expected to start the noise handshake. + // Therefore, we need to secure an inbound noise connection from the remote. + await noise.secureInbound(myPeerId, wrappedDuplex, theirPeerId) + + return await options.upgrader.upgradeOutbound(maConn, { skipProtection: true, skipEncryption: true, muxerFactory }) + } catch (err) { + peerConnection.close() + throw err + } } /** diff --git a/packages/transport-webrtc/src/webrtc/index.browser.ts b/packages/transport-webrtc/src/webrtc/index.browser.ts new file mode 100644 index 0000000000..4e746e6f9a --- /dev/null +++ b/packages/transport-webrtc/src/webrtc/index.browser.ts @@ -0,0 +1,4 @@ +export const RTCPeerConnection = globalThis.RTCPeerConnection +export const RTCSessionDescription = globalThis.RTCSessionDescription +export const RTCIceCandidate = globalThis.RTCIceCandidate +export function cleanup (): void {} diff --git a/packages/transport-webrtc/src/webrtc/index.ts b/packages/transport-webrtc/src/webrtc/index.ts new file mode 100644 index 0000000000..6540ba340b --- /dev/null +++ b/packages/transport-webrtc/src/webrtc/index.ts @@ -0,0 +1,12 @@ +import node from 'node-datachannel' +import { IceCandidate } from './rtc-ice-candidate.js' +import { PeerConnection } from './rtc-peer-connection.js' +import { SessionDescription } from './rtc-session-description.js' + +export { SessionDescription as RTCSessionDescription } +export { IceCandidate as RTCIceCandidate } +export { PeerConnection as RTCPeerConnection } + +export function cleanup (): void { + node.cleanup() +} diff --git a/packages/transport-webrtc/src/webrtc/rtc-data-channel.ts b/packages/transport-webrtc/src/webrtc/rtc-data-channel.ts new file mode 100644 index 0000000000..b5129abe54 --- /dev/null +++ b/packages/transport-webrtc/src/webrtc/rtc-data-channel.ts @@ -0,0 +1,140 @@ +import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import type node from 'node-datachannel' + +export class DataChannel extends EventTarget implements RTCDataChannel { + binaryType: BinaryType + + readonly maxPacketLifeTime: number | null + readonly maxRetransmits: number | null + readonly negotiated: boolean + readonly ordered: boolean + + onbufferedamountlow: ((this: RTCDataChannel, ev: Event) => any) | null + onclose: ((this: RTCDataChannel, ev: Event) => any) | null + onclosing: ((this: RTCDataChannel, ev: Event) => any) | null + onerror: ((this: RTCDataChannel, ev: Event) => any) | null + onmessage: ((this: RTCDataChannel, ev: MessageEvent) => any) | null + onopen: ((this: RTCDataChannel, ev: Event) => any) | null + + #dataChannel: node.DataChannel + #bufferedAmountLowThreshold: number + #readyState: RTCDataChannelState + + constructor (dataChannel: node.DataChannel, dataChannelDict: RTCDataChannelInit = {}) { + super() + + this.#dataChannel = dataChannel + this.#readyState = 'connecting' + this.#bufferedAmountLowThreshold = 0 + + this.binaryType = 'arraybuffer' + + this.#dataChannel.onOpen(() => { + this.#readyState = 'open' + this.dispatchEvent(new Event('open')) + }) + this.#dataChannel.onClosed(() => { + this.#readyState = 'closed' + this.dispatchEvent(new Event('close')) + }) + this.#dataChannel.onError((msg) => { + this.#readyState = 'closed' + this.dispatchEvent(new RTCErrorEvent('error', { + error: new RTCError({ + errorDetail: 'data-channel-failure' + }, msg) + })) + }) + this.#dataChannel.onBufferedAmountLow(() => { + this.dispatchEvent(new Event('bufferedamountlow')) + }) + this.#dataChannel.onMessage((data: string | Uint8Array) => { + if (typeof data === 'string') { + data = uint8ArrayFromString(data) + } + + this.dispatchEvent(new MessageEvent('message', { data })) + }) + + // forward events to properties + this.addEventListener('message', event => { + this.onmessage?.(event as MessageEvent) + }) + this.addEventListener('bufferedamountlow', event => { + this.onbufferedamountlow?.(event) + }) + this.addEventListener('error', event => { + this.onerror?.(event) + }) + this.addEventListener('close', event => { + this.onclose?.(event) + }) + this.addEventListener('closing', event => { + this.onclosing?.(event) + }) + this.addEventListener('open', event => { + this.onopen?.(event) + }) + + this.onbufferedamountlow = null + this.onclose = null + this.onclosing = null + this.onerror = null + this.onmessage = null + this.onopen = null + + this.maxPacketLifeTime = dataChannelDict.maxPacketLifeTime ?? null + this.maxRetransmits = dataChannelDict.maxRetransmits ?? null + this.negotiated = dataChannelDict.negotiated ?? false + this.ordered = dataChannelDict.ordered ?? true + } + + get id (): number { + return this.#dataChannel.getId() + } + + get label (): string { + return this.#dataChannel.getLabel() + } + + get protocol (): string { + return this.#dataChannel.getProtocol() + } + + get bufferedAmount (): number { + return this.#dataChannel.bufferedAmount() + } + + set bufferedAmountLowThreshold (threshold: number) { + this.#bufferedAmountLowThreshold = threshold + this.#dataChannel.setBufferedAmountLowThreshold(threshold) + } + + get bufferedAmountLowThreshold (): number { + return this.#bufferedAmountLowThreshold + } + + get readyState (): RTCDataChannelState { + return this.#readyState + } + + close (): void { + this.#readyState = 'closing' + this.dispatchEvent(new Event('closing')) + + this.#dataChannel.close() + } + + send (data: string): void + send (data: Blob): void + send (data: ArrayBuffer): void + send (data: ArrayBufferView): void + send (data: any): void { + // TODO: sending Blobs + if (typeof data === 'string') { + this.#dataChannel.sendMessage(data) + } else { + this.#dataChannel.sendMessageBinary(data) + } + } +} diff --git a/packages/transport-webrtc/src/webrtc/rtc-events.ts b/packages/transport-webrtc/src/webrtc/rtc-events.ts new file mode 100644 index 0000000000..b7a8772139 --- /dev/null +++ b/packages/transport-webrtc/src/webrtc/rtc-events.ts @@ -0,0 +1,19 @@ +export class PeerConnectionIceEvent extends Event implements RTCPeerConnectionIceEvent { + readonly candidate: RTCIceCandidate | null + + constructor (candidate: RTCIceCandidate) { + super('icecandidate') + + this.candidate = candidate + } +} + +export class DataChannelEvent extends Event implements RTCDataChannelEvent { + readonly channel: RTCDataChannel + + constructor (channel: RTCDataChannel) { + super('datachannel') + + this.channel = channel + } +} diff --git a/packages/transport-webrtc/src/webrtc/rtc-ice-candidate.ts b/packages/transport-webrtc/src/webrtc/rtc-ice-candidate.ts new file mode 100644 index 0000000000..ea02ec99e1 --- /dev/null +++ b/packages/transport-webrtc/src/webrtc/rtc-ice-candidate.ts @@ -0,0 +1,50 @@ +/** + * @see https://developer.mozilla.org/docs/Web/API/RTCIceCandidate + */ +export class IceCandidate implements RTCIceCandidate { + readonly address: string | null + readonly candidate: string + readonly component: RTCIceComponent | null + readonly foundation: string | null + readonly port: number | null + readonly priority: number | null + readonly protocol: RTCIceProtocol | null + readonly relatedAddress: string | null + readonly relatedPort: number | null + readonly sdpMLineIndex: number | null + readonly sdpMid: string | null + readonly tcpType: RTCIceTcpCandidateType | null + readonly type: RTCIceCandidateType | null + readonly usernameFragment: string | null + + constructor (init: RTCIceCandidateInit) { + if (init.candidate == null) { + throw new DOMException('candidate must be specified') + } + + this.candidate = init.candidate + this.sdpMLineIndex = init.sdpMLineIndex ?? null + this.sdpMid = init.sdpMid ?? null + this.usernameFragment = init.usernameFragment ?? null + + this.address = null + this.component = null + this.foundation = null + this.port = null + this.priority = null + this.protocol = null + this.relatedAddress = null + this.relatedPort = null + this.tcpType = null + this.type = null + } + + toJSON (): RTCIceCandidateInit { + return { + candidate: this.candidate, + sdpMLineIndex: this.sdpMLineIndex, + sdpMid: this.sdpMid, + usernameFragment: this.usernameFragment + } + } +} diff --git a/packages/transport-webrtc/src/webrtc/rtc-peer-connection.ts b/packages/transport-webrtc/src/webrtc/rtc-peer-connection.ts new file mode 100644 index 0000000000..7b2b5c6446 --- /dev/null +++ b/packages/transport-webrtc/src/webrtc/rtc-peer-connection.ts @@ -0,0 +1,306 @@ +import node from 'node-datachannel' +import defer, { type DeferredPromise } from 'p-defer' +import { DataChannel } from './rtc-data-channel.js' +import { DataChannelEvent, PeerConnectionIceEvent } from './rtc-events.js' +import { IceCandidate } from './rtc-ice-candidate.js' +import { SessionDescription } from './rtc-session-description.js' + +export class PeerConnection extends EventTarget implements RTCPeerConnection { + static async generateCertificate (keygenAlgorithm: AlgorithmIdentifier): Promise { + throw new Error('Not implemented') + } + + canTrickleIceCandidates: boolean | null + sctp: RTCSctpTransport | null + + onconnectionstatechange: ((this: RTCPeerConnection, ev: Event) => any) | null + ondatachannel: ((this: RTCPeerConnection, ev: RTCDataChannelEvent) => any) | null + onicecandidate: ((this: RTCPeerConnection, ev: RTCPeerConnectionIceEvent) => any) | null + onicecandidateerror: ((this: RTCPeerConnection, ev: Event) => any) | null + oniceconnectionstatechange: ((this: RTCPeerConnection, ev: Event) => any) | null + onicegatheringstatechange: ((this: RTCPeerConnection, ev: Event) => any) | null + onnegotiationneeded: ((this: RTCPeerConnection, ev: Event) => any) | null + onsignalingstatechange: ((this: RTCPeerConnection, ev: Event) => any) | null + ontrack: ((this: RTCPeerConnection, ev: RTCTrackEvent) => any) | null + + #peerConnection: node.PeerConnection + #config: RTCConfiguration + #localOffer: DeferredPromise + #localAnswer: DeferredPromise + #dataChannels: Set + + constructor (init: RTCConfiguration = {}) { + super() + + this.#config = init + this.#localOffer = defer() + this.#localAnswer = defer() + this.#dataChannels = new Set() + + const iceServers = init.iceServers ?? [] + + this.#peerConnection = new node.PeerConnection(`peer-${Math.random()}`, { + iceServers: iceServers.map(server => { + const urls = (Array.isArray(server.urls) ? server.urls : [server.urls]).map(str => new URL(str)) + + return urls.map(url => { + /** @type {import('../lib/index.js').IceServer} */ + const iceServer = { + hostname: url.hostname, + port: parseInt(url.port, 10), + username: server.username, + password: server.credential + // relayType - how to specify? + } + + return iceServer + }) + }) + .flat(), + iceTransportPolicy: init?.iceTransportPolicy + }) + + this.#peerConnection.onStateChange(() => { + this.dispatchEvent(new Event('connectionstatechange')) + }) + // https://github.com/murat-dogan/node-datachannel/pull/171 + // this.#peerConnection.onSignalingStateChange(() => { + // this.dispatchEvent(new Event('signalingstatechange')) + // }) + this.#peerConnection.onGatheringStateChange(() => { + this.dispatchEvent(new Event('icegatheringstatechange')) + }) + this.#peerConnection.onDataChannel(channel => { + this.dispatchEvent(new DataChannelEvent(new DataChannel(channel))) + }) + + // forward events to properties + this.addEventListener('connectionstatechange', event => { + this.onconnectionstatechange?.(event) + }) + this.addEventListener('signalingstatechange', event => { + this.onsignalingstatechange?.(event) + }) + this.addEventListener('icegatheringstatechange', event => { + this.onicegatheringstatechange?.(event) + }) + this.addEventListener('datachannel', event => { + this.ondatachannel?.(event as RTCDataChannelEvent) + }) + + this.#peerConnection.onLocalDescription((sdp, type) => { + if (type === 'offer') { + this.#localOffer.resolve({ + sdp, + type + }) + } + + if (type === 'answer') { + this.#localAnswer.resolve({ + sdp, + type + }) + } + }) + + this.#peerConnection.onLocalCandidate((candidate, mid) => { + if (mid === 'unspec') { + this.#localAnswer.reject(new Error(`Invalid description type ${mid}`)) + return + } + + const event = new PeerConnectionIceEvent(new IceCandidate({ candidate })) + + this.onicecandidate?.(event) + }) + + this.canTrickleIceCandidates = null + this.sctp = null + this.onconnectionstatechange = null + this.ondatachannel = null + this.onicecandidate = null + this.onicecandidateerror = null + this.oniceconnectionstatechange = null + this.onicegatheringstatechange = null + this.onnegotiationneeded = null + this.onsignalingstatechange = null + this.ontrack = null + } + + get connectionState (): RTCPeerConnectionState { + return assertState(this.#peerConnection.state(), RTCPeerConnectionStates) + } + + get iceConnectionState (): RTCIceConnectionState { + return assertState(this.#peerConnection.state(), RTCIceConnectionStates) + } + + get iceGatheringState (): RTCIceGatheringState { + return assertState(this.#peerConnection.gatheringState(), RTCIceGatheringStates) + } + + get signalingState (): RTCSignalingState { + return assertState(this.#peerConnection.signalingState(), RTCSignalingStates) + } + + get currentLocalDescription (): RTCSessionDescription | null { + return toSessionDescription(this.#peerConnection.localDescription()) + } + + get localDescription (): RTCSessionDescription | null { + return toSessionDescription(this.#peerConnection.localDescription()) + } + + get pendingLocalDescription (): RTCSessionDescription | null { + return toSessionDescription(this.#peerConnection.localDescription()) + } + + get currentRemoteDescription (): RTCSessionDescription | null { + // not exposed by node-datachannel + return toSessionDescription(null) + } + + get pendingRemoteDescription (): RTCSessionDescription | null { + // not exposed by node-datachannel + return toSessionDescription(null) + } + + get remoteDescription (): RTCSessionDescription | null { + // not exposed by node-datachannel + return toSessionDescription(null) + } + + async addIceCandidate (candidate?: RTCIceCandidateInit): Promise { + if (candidate == null || candidate.candidate == null) { + throw new Error('Candidate invalid') + } + + this.#peerConnection.addRemoteCandidate(candidate.candidate, candidate.sdpMid ?? '0') + } + + addTrack (track: MediaStreamTrack, ...streams: MediaStream[]): RTCRtpSender { + throw new Error('Not implemented') + } + + addTransceiver (trackOrKind: MediaStreamTrack | string, init?: RTCRtpTransceiverInit): RTCRtpTransceiver { + throw new Error('Not implemented') + } + + close (): void { + // close all channels before shutting down + this.#dataChannels.forEach(channel => { + channel.close() + }) + + this.#peerConnection.close() + this.#peerConnection.destroy() + } + + createDataChannel (label: string, dataChannelDict: RTCDataChannelInit = {}): RTCDataChannel { + const channel = this.#peerConnection.createDataChannel(label, dataChannelDict) + const dataChannel = new DataChannel(channel, dataChannelDict) + + // ensure we can close all channels when shutting down + this.#dataChannels.add(dataChannel) + dataChannel.addEventListener('close', () => { + this.#dataChannels.delete(dataChannel) + }) + + return dataChannel + } + + async createOffer (options?: RTCOfferOptions): Promise + async createOffer (successCallback: RTCSessionDescriptionCallback, failureCallback: RTCPeerConnectionErrorCallback, options?: RTCOfferOptions): Promise + async createOffer (...args: any[]): Promise { + return this.#localOffer.promise + } + + async createAnswer (options?: RTCAnswerOptions): Promise + async createAnswer (successCallback: RTCSessionDescriptionCallback, failureCallback: RTCPeerConnectionErrorCallback): Promise + async createAnswer (...args: any[]): Promise { + return this.#localAnswer.promise + } + + getConfiguration (): RTCConfiguration { + return this.#config + } + + getReceivers (): RTCRtpReceiver[] { + throw new Error('Not implemented') + } + + getSenders (): RTCRtpSender[] { + throw new Error('Not implemented') + } + + async getStats (selector?: MediaStreamTrack | null): Promise { + throw new Error('Not implemented') + } + + getTransceivers (): RTCRtpTransceiver[] { + throw new Error('Not implemented') + } + + removeTrack (sender: RTCRtpSender): void { + throw new Error('Not implemented') + } + + restartIce (): void { + throw new Error('Not implemented') + } + + setConfiguration (configuration: RTCConfiguration = {}): void { + this.#config = configuration + } + + async setLocalDescription (description?: RTCLocalSessionDescriptionInit): Promise { + if (description == null || description.type == null) { + throw new Error('Local description type must be set') + } + + if (description.type !== 'offer') { + // any other type causes libdatachannel to throw + return + } + + // @ts-expect-error types are wrong + this.#peerConnection.setLocalDescription(description.type) + } + + async setRemoteDescription (description: RTCSessionDescriptionInit): Promise { + if (description.sdp == null) { + throw new Error('Remote SDP must be set') + } + + // @ts-expect-error types are wrong + this.#peerConnection.setRemoteDescription(description.sdp, description.type) + } +} + +export { PeerConnection as RTCPeerConnection } + +function assertState (state: any, states: T[]): T { + if (state != null && !states.includes(state)) { + throw new Error(`Invalid value encountered - "${state}" must be one of ${states}`) + } + + return state as T +} + +function toSessionDescription (description: { sdp?: string, type: string } | null): RTCSessionDescription | null { + if (description == null) { + return null + } + + return new SessionDescription({ + sdp: description.sdp, + type: assertState(description.type, RTCSdpTypes) + }) +} + +const RTCPeerConnectionStates: RTCPeerConnectionState[] = ['closed', 'connected', 'connecting', 'disconnected', 'failed', 'new'] +const RTCSdpTypes: RTCSdpType[] = ['answer', 'offer', 'pranswer', 'rollback'] +const RTCIceConnectionStates: RTCIceConnectionState[] = ['checking', 'closed', 'completed', 'connected', 'disconnected', 'failed', 'new'] +const RTCIceGatheringStates: RTCIceGatheringState[] = ['complete', 'gathering', 'new'] +const RTCSignalingStates: RTCSignalingState[] = ['closed', 'have-local-offer', 'have-local-pranswer', 'have-remote-offer', 'have-remote-pranswer', 'stable'] diff --git a/packages/transport-webrtc/src/webrtc/rtc-session-description.ts b/packages/transport-webrtc/src/webrtc/rtc-session-description.ts new file mode 100644 index 0000000000..ae498cbdaf --- /dev/null +++ b/packages/transport-webrtc/src/webrtc/rtc-session-description.ts @@ -0,0 +1,19 @@ +/** + * @see https://developer.mozilla.org/docs/Web/API/RTCSessionDescription + */ +export class SessionDescription implements RTCSessionDescription { + readonly sdp: string + readonly type: RTCSdpType + + constructor (init: RTCSessionDescriptionInit) { + this.sdp = init.sdp ?? '' + this.type = init.type + } + + toJSON (): RTCSessionDescriptionInit { + return { + sdp: this.sdp, + type: this.type + } + } +} diff --git a/packages/transport-webrtc/test/listener.spec.ts b/packages/transport-webrtc/test/listener.spec.ts index e4e6dc149c..34feedb859 100644 --- a/packages/transport-webrtc/test/listener.spec.ts +++ b/packages/transport-webrtc/test/listener.spec.ts @@ -2,7 +2,7 @@ import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' import { stubInterface } from 'sinon-ts' -import { WebRTCPeerListener } from '../src/private-to-private/listener' +import { WebRTCPeerListener } from '../src/private-to-private/listener.js' import type { Listener } from '@libp2p/interface/transport' import type { TransportManager } from '@libp2p/interface-internal/transport-manager' diff --git a/packages/transport-webrtc/test/maconn.browser.spec.ts b/packages/transport-webrtc/test/maconn.browser.spec.ts index a8e91272c4..d93362f844 100644 --- a/packages/transport-webrtc/test/maconn.browser.spec.ts +++ b/packages/transport-webrtc/test/maconn.browser.spec.ts @@ -4,6 +4,7 @@ import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' import { stubObject } from 'sinon-ts' import { WebRTCMultiaddrConnection } from '../src/maconn.js' +import { RTCPeerConnection } from '../src/webrtc/index.js' import type { CounterGroup } from '@libp2p/interface/metrics' describe('Multiaddr Connection', () => { diff --git a/packages/transport-webrtc/test/peer.browser.spec.ts b/packages/transport-webrtc/test/peer.browser.spec.ts index 918ed2d0a5..8d490ff967 100644 --- a/packages/transport-webrtc/test/peer.browser.spec.ts +++ b/packages/transport-webrtc/test/peer.browser.spec.ts @@ -7,14 +7,16 @@ import { pair } from 'it-pair' import { duplexPair } from 'it-pair/duplex' import { pbStream } from 'it-protobuf-stream' import Sinon from 'sinon' -import { initiateConnection, handleIncomingStream } from '../src/private-to-private/handler' +import { initiateConnection, handleIncomingStream } from '../src/private-to-private/handler.js' import { Message } from '../src/private-to-private/pb/message.js' -import { WebRTCTransport, splitAddr } from '../src/private-to-private/transport' +import { WebRTCTransport, splitAddr } from '../src/private-to-private/transport.js' +import { RTCPeerConnection, RTCSessionDescription } from '../src/webrtc/index.js' const browser = detect() describe('webrtc basic', () => { const isFirefox = ((browser != null) && browser.name === 'firefox') + it('should connect', async () => { const [receiver, initiator] = duplexPair() const dstPeerId = await createEd25519PeerId() @@ -34,6 +36,9 @@ describe('webrtc basic', () => { } expect(pc0.connectionState).eq('connected') expect(pc1.connectionState).eq('connected') + + pc0.close() + pc1.close() }) }) @@ -59,10 +64,8 @@ describe('webrtc dialer', () => { const initiatorPeerConnectionPromise = initiateConnection({ signal: controller.signal, stream: mockStream(initiator) }) const stream = pbStream(receiver).pb(Message) - { - const offerMessage = await stream.read() - expect(offerMessage.type).to.eq(Message.Type.SDP_OFFER) - } + const offerMessage = await stream.read() + expect(offerMessage.type).to.eq(Message.Type.SDP_OFFER) await stream.write({ type: Message.Type.SDP_ANSWER, data: 'bad' }) await expect(initiatorPeerConnectionPromise).to.be.rejectedWith(/Failed to set remoteDescription/) @@ -78,17 +81,18 @@ describe('webrtc dialer', () => { pc.onicecandidate = ({ candidate }) => { void stream.write({ type: Message.Type.ICE_CANDIDATE, data: JSON.stringify(candidate?.toJSON()) }) } - { - const offerMessage = await stream.read() - expect(offerMessage.type).to.eq(Message.Type.SDP_OFFER) - const offer = new RTCSessionDescription({ type: 'offer', sdp: offerMessage.data }) - await pc.setRemoteDescription(offer) - - const answer = await pc.createAnswer() - await pc.setLocalDescription(answer) - } + + const offerMessage = await stream.read() + expect(offerMessage.type).to.eq(Message.Type.SDP_OFFER) + const offer = new RTCSessionDescription({ type: 'offer', sdp: offerMessage.data }) + await pc.setRemoteDescription(offer) + + const answer = await pc.createAnswer() + await pc.setLocalDescription(answer) await expect(initiatorPeerConnectionPromise).to.be.rejectedWith(/remote should send an SDP answer/) + + pc.close() }) }) @@ -128,5 +132,3 @@ describe('webrtc splitAddr', () => { expect(peerId.toString()).to.eq('12D3KooWFNBgv86tcpcYUHQz9FWGTrTmpMgr8feZwQXQySVTo3A7') }) }) - -export { } diff --git a/packages/transport-webrtc/test/stream.browser.spec.ts b/packages/transport-webrtc/test/stream.browser.spec.ts index 9bf0172c5e..457f95317d 100644 --- a/packages/transport-webrtc/test/stream.browser.spec.ts +++ b/packages/transport-webrtc/test/stream.browser.spec.ts @@ -4,6 +4,7 @@ import * as lengthPrefixed from 'it-length-prefixed' import { bytes } from 'multiformats' import { Message } from '../src/pb/message.js' import { createStream, type WebRTCStream } from '../src/stream.js' +import { RTCPeerConnection } from '../src/webrtc/index.js' import type { Stream } from '@libp2p/interface/connection' const TEST_MESSAGE = 'test_message' @@ -26,9 +27,16 @@ function generatePbByFlag (flag?: Message.Flag): Uint8Array { describe('Stream Stats', () => { let stream: WebRTCStream + let peerConnection: RTCPeerConnection beforeEach(async () => { - ({ stream } = setup()) + ({ stream, peerConnection } = setup()) + }) + + afterEach(() => { + if (peerConnection != null) { + peerConnection.close() + } }) it('can construct', () => { @@ -86,9 +94,16 @@ describe('Stream Stats', () => { describe('Stream Read Stats Transition By Incoming Flag', () => { let dataChannel: RTCDataChannel let stream: Stream + let peerConnection: RTCPeerConnection beforeEach(async () => { - ({ dataChannel, stream } = setup()) + ({ dataChannel, stream, peerConnection } = setup()) + }) + + afterEach(() => { + if (peerConnection != null) { + peerConnection.close() + } }) it('no flag, no transition', () => { @@ -123,9 +138,16 @@ describe('Stream Read Stats Transition By Incoming Flag', () => { describe('Stream Write Stats Transition By Incoming Flag', () => { let dataChannel: RTCDataChannel let stream: Stream + let peerConnection: RTCPeerConnection beforeEach(async () => { - ({ dataChannel, stream } = setup()) + ({ dataChannel, stream, peerConnection } = setup()) + }) + + afterEach(() => { + if (peerConnection != null) { + peerConnection.close() + } }) it('open to write-close by flag:STOP_SENDING', async () => {