diff --git a/docs/design/perfect_negotiation.md b/docs/design/perfect_negotiation.md new file mode 100644 index 00000000..8c1e9dc7 --- /dev/null +++ b/docs/design/perfect_negotiation.md @@ -0,0 +1,31 @@ +# Perfect Negotiation + +This document describes how perfect negotiation is implemented in OWT P2P SDK to avoid collision. + +**Perfect negotiation**, **polite peer**, **impolite peer** are defined in [Perfect negotiation example of WebRTC 1.0](https://w3c.github.io/webrtc-pc/#perfect-negotiation-example). + +## Determining polite peer and impolite peer + +OWT client SDKs determines polite peer and impolite peer by comparing client IDs. Sorting these two client IDs alphabetically in increasing order, the first one is polite order, the second one is impolite peer. + +Signaling server is required to return a client ID assigned to the client connected after authentication. OWT doesn't define how authentication works and how client IDs are assigned. They depend on application's design. But every client gets a unique client ID after connecting to the signaling server. + +## Connections + +We expect only one `PeerConnection` between two endpoints. A random ID is generated when creating a new `PeerConnection`. All messages (offers, answers, ICE candidates) for this `PeerConnection` have a `connectionId` property with the connection ID as its value. The connection ID is shared by both clients. + +## Collision + +When WebRTC collision occurs, it basically follows the perfect negotiation example in WebRTC 1.0. This section only describes the implementation for signaling collision. + +A connection typically ends by calling `stop` at local side or receiving a `chat-closed` message from the remote side. Client SDKs stores the most recent connection IDs with all remote endpoints, and clean one of them when a connection ends. If the connection ID of a signaling message received is different from the one stored locally, collision happens. + +An example is both sides call `send` to create a data channel and send a message to the remote endpoint at the same time. Each side creates a new `PeerConnection`, and a connection ID. These two connection IDs are different. Then each of them will receive signaling messages with connection ID differ from the local one. + +### Polite peer + +The polite peer is the controlled side. When a signaling message with a new connection ID is received, it stops current PeerConnection, create a new one, and associate it with the remote connection ID. + +### Impolite peer + +The polite peer is the controlling side. It ignores remote messages conflicting with its own state, and continues its own process. \ No newline at end of file diff --git a/src/sdk/p2p/p2pclient.js b/src/sdk/p2p/p2pclient.js index bbc486a6..b86fa09d 100644 --- a/src/sdk/p2p/p2pclient.js +++ b/src/sdk/p2p/p2pclient.js @@ -85,26 +85,36 @@ const P2PClient = function(configuration, signalingChannel) { const config = configuration; const signaling = signalingChannel; const channels = new Map(); // Map of PeerConnectionChannels. - const self=this; + const connectionIds = new Map(); // Key is remote user ID, value is current session ID. + const self = this; let state = ConnectionState.READY; let myId; signaling.onMessage = function(origin, message) { Logger.debug('Received signaling message from ' + origin + ': ' + message); const data = JSON.parse(message); + const connectionId = data.connectionId; + if (self.allowedRemoteIds.indexOf(origin) < 0) { + sendSignalingMessage( + origin, data.connectionId, 'chat-closed', + ErrorModule.errors.P2P_CLIENT_DENIED); + return; + } + if (connectionIds.has(origin) && + connectionIds.get(origin) !== connectionId && !isPolitePeer(origin)) { + Logger.warning( + // eslint-disable-next-line max-len + 'Collision detected, ignore this message because current endpoint is impolite peer.'); + return; + } if (data.type === 'chat-closed') { if (channels.has(origin)) { - getOrCreateChannel(origin, false).onMessage(data); + getOrCreateChannel(origin, connectionId).onMessage(data); channels.delete(origin); } return; } - if (self.allowedRemoteIds.indexOf(origin) >= 0) { - getOrCreateChannel(origin, false).onMessage(data); - } else { - sendSignalingMessage(origin, 'chat-closed', - ErrorModule.errors.P2P_CLIENT_DENIED); - } + getOrCreateChannel(origin, connectionId).onMessage(data); }; signaling.onServerDisconnected = function() { @@ -158,7 +168,7 @@ const P2PClient = function(configuration, signalingChannel) { if (state == ConnectionState.READY) { return; } - channels.forEach((channel)=>{ + channels.forEach((channel) => { channel.stop(); }); channels.clear(); @@ -184,7 +194,7 @@ const P2PClient = function(configuration, signalingChannel) { return Promise.reject(new ErrorModule.P2PError( ErrorModule.errors.P2P_CLIENT_NOT_ALLOWED)); } - return Promise.resolve(getOrCreateChannel(remoteId, true).publish(stream)); + return Promise.resolve(getOrCreateChannel(remoteId).publish(stream)); }; /** @@ -206,7 +216,7 @@ const P2PClient = function(configuration, signalingChannel) { return Promise.reject(new ErrorModule.P2PError( ErrorModule.errors.P2P_CLIENT_NOT_ALLOWED)); } - return Promise.resolve(getOrCreateChannel(remoteId, true).send(message)); + return Promise.resolve(getOrCreateChannel(remoteId).send(message)); }; /** @@ -247,9 +257,11 @@ const P2PClient = function(configuration, signalingChannel) { return channels.get(remoteId).getStats(); }; - const sendSignalingMessage = function(remoteId, type, message) { + const sendSignalingMessage = function( + remoteId, connectionId, type, message) { const msg = { - type: type, + type, + connectionId, }; if (message) { msg.data = message; @@ -261,21 +273,47 @@ const P2PClient = function(configuration, signalingChannel) { }); }; - const getOrCreateChannel = function(remoteId, isInitializer) { + // Return true if current endpoint is an impolite peer, which controls the + // session. + const isPolitePeer = function(remoteId) { + return myId < remoteId; + }; + + // If a connection with remoteId with a different session ID exists, it will + // be stopped and a new connection will be created. + const getOrCreateChannel = function(remoteId, connectionId) { + // If `connectionId` is not defined, use the latest one or generate a new + // one. + if (!connectionId && connectionIds.has(remoteId)) { + connectionId = connectionIds.get(remoteId); + } + // Delete old channel if connection doesn't match. + if (connectionIds.has(remoteId) && + connectionIds.get(remoteId) != connectionId) { + self.stop(remoteId); + } + if (!connectionId) { + const connectionIdLimit = 100000; + connectionId = Math.round(Math.random() * connectionIdLimit); + } + connectionIds.set(remoteId, connectionId); if (!channels.has(remoteId)) { // Construct an signaling sender/receiver for P2PPeerConnection. const signalingForChannel = Object.create(EventDispatcher); signalingForChannel.sendSignalingMessage = sendSignalingMessage; - const pcc = new P2PPeerConnectionChannel(config, myId, remoteId, - signalingForChannel, isInitializer); + const pcc = new P2PPeerConnectionChannel( + config, myId, remoteId, connectionId, signalingForChannel); pcc.addEventListener('streamadded', (streamEvent)=>{ self.dispatchEvent(streamEvent); }); pcc.addEventListener('messagereceived', (messageEvent)=>{ self.dispatchEvent(messageEvent); }); - pcc.addEventListener('ended', ()=>{ - channels.delete(remoteId); + pcc.addEventListener('ended', () => { + if (channels.has(remoteId)) { + channels.delete(remoteId); + } + connectionIds.delete(remoteId); }); channels.set(remoteId, pcc); } diff --git a/src/sdk/p2p/peerconnection-channel.js b/src/sdk/p2p/peerconnection-channel.js index 63fac835..28f9deb2 100644 --- a/src/sdk/p2p/peerconnection-channel.js +++ b/src/sdk/p2p/peerconnection-channel.js @@ -54,18 +54,22 @@ const sysInfo = Utils.sysInfo(); /** * @class P2PPeerConnectionChannel - * @desc A P2PPeerConnectionChannel handles all interactions between this endpoint and a remote endpoint. + * @desc A P2PPeerConnectionChannel manages a PeerConnection object, handles all + * interactions between this endpoint (local) and a remote endpoint. Only one + * PeerConnectionChannel is alive for a local - remote endpoint pair at any + * given time. * @memberOf Owt.P2P * @private */ class P2PPeerConnectionChannel extends EventDispatcher { // |signaling| is an object has a method |sendSignalingMessage|. /* eslint-disable-next-line require-jsdoc */ - constructor(config, localId, remoteId, signaling, isInitializer) { + constructor( + config, localId, remoteId, connectionId, signaling) { super(); this._config = config; - this._localId = localId; this._remoteId = remoteId; + this._connectionId = connectionId; this._signaling = signaling; this._pc = null; this._publishedStreams = new Map(); // Key is streams published, value is its publication. @@ -91,13 +95,9 @@ class P2PPeerConnectionChannel extends EventDispatcher { this._dataSeq = 1; // Sequence number for data channel messages. this._sendDataPromises = new Map(); // Key is data sequence number, value is an object has |resolve| and |reject|. this._addedTrackIds = []; // Tracks that have been added after receiving remote SDP but before connection is established. Draining these messages when ICE connection state is connected. - this._isCaller = true; - this._infoSent = false; + this._isPolitePeer = localId < remoteId; this._disposed = false; this._createPeerConnection(); - if (isInitializer) { - this._sendSignalingMessage(SignalingType.CLOSED); - } this._sendSignalingMessage(SignalingType.UA, sysInfo); } @@ -126,7 +126,6 @@ class P2PPeerConnectionChannel extends EventDispatcher { for (const track of stream.mediaStream.getTracks()) { this._pc.addTrack(track, stream.mediaStream); } - this._onNegotiationneeded(); this._publishingStreams.push(stream); const trackIds = Array.from(stream.mediaStream.getTracks(), (track) => track.id); @@ -227,11 +226,12 @@ class P2PPeerConnectionChannel extends EventDispatcher { _sendSdp(sdp) { return this._signaling.sendSignalingMessage( - this._remoteId, SignalingType.SDP, sdp); + this._remoteId, this._connectionId, SignalingType.SDP, sdp); } _sendSignalingMessage(type, message) { - return this._signaling.sendSignalingMessage(this._remoteId, type, message); + return this._signaling.sendSignalingMessage( + this._remoteId, this._connectionId, type, message); } _SignalingMesssageHandler(message) { @@ -416,6 +416,15 @@ class P2PPeerConnectionChannel extends EventDispatcher { _onOffer(sdp) { Logger.debug('About to set remote description. Signaling state: ' + this._pc.signalingState); + if (this._pc.signalingState !== 'stable') { + if (this._isPolitePeer) { + Logger.debug('Rollback.'); + this._pc.setLocalDescription(); + } else { + Logger.debug('Collision detected. Ignore this offer.'); + return; + } + } sdp.sdp = this._setRtpSenderOptions(sdp.sdp, this._config); // Firefox only has one codec in answer, which does not truly reflect its // decoding capability. So we set codec preference to remote offer, and let @@ -539,12 +548,6 @@ class P2PPeerConnectionChannel extends EventDispatcher { } _onNegotiationneeded() { - // This is intented to be executed when onnegotiationneeded event is fired. - // However, onnegotiationneeded may fire mutiple times when more than one - // track is added/removed. So we manually execute this function after - // adding/removing track and creating data channel. - Logger.debug('On negotiation needed.'); - if (this._pc.signalingState === 'stable') { this._doNegotiate(); } else { @@ -689,43 +692,23 @@ class P2PPeerConnectionChannel extends EventDispatcher { this._pc.oniceconnectionstatechange = (event) => { this._onIceConnectionStateChange.apply(this, [event]); }; - /* - this._pc.oniceChannelStatechange = function(event) { - _onIceChannelStateChange(peer, event); - }; - = function() { - onNegotiationneeded(peers[peer.id]); + this._pc.onnegotiationneeded = () => { + this._onNegotiationneeded(); }; - - //DataChannel - this._pc.ondatachannel = function(event) { - Logger.debug(myId + ': On data channel'); - // Save remote created data channel. - if (!peer.dataChannels[event.channel.label]) { - peer.dataChannels[event.channel.label] = event.channel; - Logger.debug('Save remote created data channel.'); - } - bindEventsToDataChannel(event.channel, peer); - };*/ } _drainPendingStreams() { - let negotiationNeeded = false; Logger.debug('Draining pending streams.'); if (this._pc && this._pc.signalingState === 'stable') { Logger.debug('Peer connection is ready for draining pending streams.'); for (let i = 0; i < this._pendingStreams.length; i++) { const stream = this._pendingStreams[i]; - // OnNegotiationNeeded event will be triggered immediately after adding stream to PeerConnection in Firefox. - // And OnNegotiationNeeded handler will execute drainPendingStreams. To avoid add the same stream multiple times, - // shift it from pending stream list before adding it to PeerConnection. this._pendingStreams.shift(); if (!stream.mediaStream) { continue; } for (const track of stream.mediaStream.getTracks()) { this._pc.addTrack(track, stream.mediaStream); - negotiationNeeded = true; } Logger.debug('Added stream to peer connection.'); this._publishingStreams.push(stream); @@ -736,7 +719,6 @@ class P2PPeerConnectionChannel extends EventDispatcher { continue; } this._pc.removeStream(this._pendingUnpublishStreams[j].mediaStream); - negotiationNeeded = true; this._unpublishPromises.get( this._pendingUnpublishStreams[j].mediaStream.id).resolve(); this._publishedStreams.delete(this._pendingUnpublishStreams[j]); @@ -744,9 +726,6 @@ class P2PPeerConnectionChannel extends EventDispatcher { } this._pendingUnpublishStreams.length = 0; } - if (negotiationNeeded) { - this._onNegotiationneeded(); - } } _drainPendingRemoteIceCandidates() { @@ -867,7 +846,6 @@ class P2PPeerConnectionChannel extends EventDispatcher { return; } this._isNegotiationNeeded = false; - this._isCaller = true; let localDesc; this._pc.createOffer().then((desc) => { desc.sdp = this._setRtpReceiverOptions(desc.sdp); @@ -878,7 +856,7 @@ class P2PPeerConnectionChannel extends EventDispatcher { }); } }).catch((e) => { - Logger.error(e.message + ' Please check your codec settings.'); + Logger.error(e.message); const error = new ErrorModule.P2PError(ErrorModule.errors.P2P_WEBRTC_SDP, e.message); this._stop(error, true); @@ -888,7 +866,6 @@ class P2PPeerConnectionChannel extends EventDispatcher { _createAndSendAnswer() { this._drainPendingStreams(); this._isNegotiationNeeded = false; - this._isCaller = false; let localDesc; this._pc.createAnswer().then((desc) => { desc.sdp = this._setRtpReceiverOptions(desc.sdp); @@ -963,7 +940,6 @@ class P2PPeerConnectionChannel extends EventDispatcher { const dc = this._pc.createDataChannel(label); this._bindEventsToDataChannel(dc); this._dataChannels.set(DataChannelLabel.MESSAGE, dc); - this._onNegotiationneeded(); } _bindEventsToDataChannel(dc) { diff --git a/test/unit/resources/scripts/fake-p2p-signaling.js b/test/unit/resources/scripts/fake-p2p-signaling.js index 9a1711d9..38140dc1 100644 --- a/test/unit/resources/scripts/fake-p2p-signaling.js +++ b/test/unit/resources/scripts/fake-p2p-signaling.js @@ -39,7 +39,6 @@ export default class FakeP2PSignalingChannel { } send(targetId, message) { - Logger.debug(this.userId + ' -> ' + targetId + ': ' + message); return new Promise((resolve, reject) => { messageQueue.push({ target: targetId, message, resolve, reject, sender: this.userId }); setTimeout(() => { diff --git a/test/unit/resources/scripts/p2p.js b/test/unit/resources/scripts/p2p.js index fe4aa8f7..aa48b5bc 100644 --- a/test/unit/resources/scripts/p2p.js +++ b/test/unit/resources/scripts/p2p.js @@ -66,7 +66,7 @@ describe('Unit tests for P2PClient', function() { done(); }); }); - describe('Interop with remote endpoints', function(){ + describe('Interop with remote endpoints (includes end to end tests)', function(){ const sourceInfo = new StreamModule.StreamSourceInfo('mic'); let signaling1, signaling2, signaling3; let p2pclient1, p2pclient2, p2pclient3; @@ -150,5 +150,34 @@ describe('Unit tests for P2PClient', function() { expect(p2pclient3.send('user1', 'message')).to.be.rejected.and.notify( done); }); + it('Signaling collisions should be resolved.', done => { + p2pclient1.send('user2', 'message'); + p2pclient2.send('user1', 'message'); + done(); + //expect(Promise.all([p2pclient1.send('user2', 'message'), p2pclient2.send('user1', 'message')])).to.be.fulfilled.and.notify(done); + // TODO: Check messages are received. + }); + it('WebRTC collision should be resolved.', async () => { + const c1Spy = new sinon.spy(); + const c2Spy = new sinon.spy(); + p2pclient1.addEventListener('messagereceived',c1Spy); + p2pclient2.addEventListener('messagereceived',c2Spy); + await p2pclient1.publish('user2', localStream); + // Both sides create PeerConnection. It cannot 100% sure to trigger WebRTC + // collision. But it has a high chance that `setRemoteDescription` get + // failed. However, even `setRemoteDescription` is failed, the SDK stops + // `PeerConnection` silently without firing an event. So this test case + // cannot detect failures like this. + await Promise.all([ + p2pclient1.send('user2', 'message'), p2pclient2.send('user1', 'message') + ]); + await new Promise(resolve => { + setTimeout(() => { + expect(c1Spy.callCount).to.equal(1); + expect(c2Spy.callCount).to.equal(1); + resolve(); + }, 100); + }); + }); }); });