Skip to content
This repository was archived by the owner on Oct 25, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions docs/design/perfect_negotiation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Perfect Negotiation

This document describes how perfect negotiation is implemented in OWT P2P SDK to avoid collision.

**Perfect negotiation**, **polite peer**, **impolite peer** are defined in [Perfect negotiation example of WebRTC 1.0](https://w3c.github.io/webrtc-pc/#perfect-negotiation-example).

## Determining polite peer and impolite peer

OWT client SDKs determines polite peer and impolite peer by comparing client IDs. Sorting these two client IDs alphabetically in increasing order, the first one is polite order, the second one is impolite peer.

Signaling server is required to return a client ID assigned to the client connected after authentication. OWT doesn't define how authentication works and how client IDs are assigned. They depend on application's design. But every client gets a unique client ID after connecting to the signaling server.

## Connections

We expect only one `PeerConnection` between two endpoints. A random ID is generated when creating a new `PeerConnection`. All messages (offers, answers, ICE candidates) for this `PeerConnection` have a `connectionId` property with the connection ID as its value. The connection ID is shared by both clients.

## Collision

When WebRTC collision occurs, it basically follows the perfect negotiation example in WebRTC 1.0. This section only describes the implementation for signaling collision.

A connection typically ends by calling `stop` at local side or receiving a `chat-closed` message from the remote side. Client SDKs stores the most recent connection IDs with all remote endpoints, and clean one of them when a connection ends. If the connection ID of a signaling message received is different from the one stored locally, collision happens.

An example is both sides call `send` to create a data channel and send a message to the remote endpoint at the same time. Each side creates a new `PeerConnection`, and a connection ID. These two connection IDs are different. Then each of them will receive signaling messages with connection ID differ from the local one.

### Polite peer

The polite peer is the controlled side. When a signaling message with a new connection ID is received, it stops current PeerConnection, create a new one, and associate it with the remote connection ID.

### Impolite peer

The polite peer is the controlling side. It ignores remote messages conflicting with its own state, and continues its own process.
74 changes: 56 additions & 18 deletions src/sdk/p2p/p2pclient.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,26 +85,36 @@ const P2PClient = function(configuration, signalingChannel) {
const config = configuration;
const signaling = signalingChannel;
const channels = new Map(); // Map of PeerConnectionChannels.
const self=this;
const connectionIds = new Map(); // Key is remote user ID, value is current session ID.
const self = this;
let state = ConnectionState.READY;
let myId;

signaling.onMessage = function(origin, message) {
Logger.debug('Received signaling message from ' + origin + ': ' + message);
const data = JSON.parse(message);
const connectionId = data.connectionId;
if (self.allowedRemoteIds.indexOf(origin) < 0) {
sendSignalingMessage(
origin, data.connectionId, 'chat-closed',
ErrorModule.errors.P2P_CLIENT_DENIED);
return;
}
if (connectionIds.has(origin) &&
connectionIds.get(origin) !== connectionId && !isPolitePeer(origin)) {
Logger.warning(
// eslint-disable-next-line max-len
'Collision detected, ignore this message because current endpoint is impolite peer.');
return;
}
if (data.type === 'chat-closed') {
if (channels.has(origin)) {
getOrCreateChannel(origin, false).onMessage(data);
getOrCreateChannel(origin, connectionId).onMessage(data);
channels.delete(origin);
}
return;
}
if (self.allowedRemoteIds.indexOf(origin) >= 0) {
getOrCreateChannel(origin, false).onMessage(data);
} else {
sendSignalingMessage(origin, 'chat-closed',
ErrorModule.errors.P2P_CLIENT_DENIED);
}
getOrCreateChannel(origin, connectionId).onMessage(data);
};

signaling.onServerDisconnected = function() {
Expand Down Expand Up @@ -158,7 +168,7 @@ const P2PClient = function(configuration, signalingChannel) {
if (state == ConnectionState.READY) {
return;
}
channels.forEach((channel)=>{
channels.forEach((channel) => {
channel.stop();
});
channels.clear();
Expand All @@ -184,7 +194,7 @@ const P2PClient = function(configuration, signalingChannel) {
return Promise.reject(new ErrorModule.P2PError(
ErrorModule.errors.P2P_CLIENT_NOT_ALLOWED));
}
return Promise.resolve(getOrCreateChannel(remoteId, true).publish(stream));
return Promise.resolve(getOrCreateChannel(remoteId).publish(stream));
};

/**
Expand All @@ -206,7 +216,7 @@ const P2PClient = function(configuration, signalingChannel) {
return Promise.reject(new ErrorModule.P2PError(
ErrorModule.errors.P2P_CLIENT_NOT_ALLOWED));
}
return Promise.resolve(getOrCreateChannel(remoteId, true).send(message));
return Promise.resolve(getOrCreateChannel(remoteId).send(message));
};

/**
Expand Down Expand Up @@ -247,9 +257,11 @@ const P2PClient = function(configuration, signalingChannel) {
return channels.get(remoteId).getStats();
};

const sendSignalingMessage = function(remoteId, type, message) {
const sendSignalingMessage = function(
remoteId, connectionId, type, message) {
const msg = {
type: type,
type,
connectionId,
};
if (message) {
msg.data = message;
Expand All @@ -261,21 +273,47 @@ const P2PClient = function(configuration, signalingChannel) {
});
};

const getOrCreateChannel = function(remoteId, isInitializer) {
// Return true if current endpoint is an impolite peer, which controls the
// session.
const isPolitePeer = function(remoteId) {
return myId < remoteId;
};

// If a connection with remoteId with a different session ID exists, it will
// be stopped and a new connection will be created.
const getOrCreateChannel = function(remoteId, connectionId) {
// If `connectionId` is not defined, use the latest one or generate a new
// one.
if (!connectionId && connectionIds.has(remoteId)) {
connectionId = connectionIds.get(remoteId);
}
// Delete old channel if connection doesn't match.
if (connectionIds.has(remoteId) &&
connectionIds.get(remoteId) != connectionId) {
self.stop(remoteId);
}
if (!connectionId) {
const connectionIdLimit = 100000;
connectionId = Math.round(Math.random() * connectionIdLimit);
}
connectionIds.set(remoteId, connectionId);
if (!channels.has(remoteId)) {
// Construct an signaling sender/receiver for P2PPeerConnection.
const signalingForChannel = Object.create(EventDispatcher);
signalingForChannel.sendSignalingMessage = sendSignalingMessage;
const pcc = new P2PPeerConnectionChannel(config, myId, remoteId,
signalingForChannel, isInitializer);
const pcc = new P2PPeerConnectionChannel(
config, myId, remoteId, connectionId, signalingForChannel);
pcc.addEventListener('streamadded', (streamEvent)=>{
self.dispatchEvent(streamEvent);
});
pcc.addEventListener('messagereceived', (messageEvent)=>{
self.dispatchEvent(messageEvent);
});
pcc.addEventListener('ended', ()=>{
channels.delete(remoteId);
pcc.addEventListener('ended', () => {
if (channels.has(remoteId)) {
channels.delete(remoteId);
}
connectionIds.delete(remoteId);
});
channels.set(remoteId, pcc);
}
Expand Down
70 changes: 23 additions & 47 deletions src/sdk/p2p/peerconnection-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,22 @@ const sysInfo = Utils.sysInfo();

/**
* @class P2PPeerConnectionChannel
* @desc A P2PPeerConnectionChannel handles all interactions between this endpoint and a remote endpoint.
* @desc A P2PPeerConnectionChannel manages a PeerConnection object, handles all
* interactions between this endpoint (local) and a remote endpoint. Only one
* PeerConnectionChannel is alive for a local - remote endpoint pair at any
* given time.
* @memberOf Owt.P2P
* @private
*/
class P2PPeerConnectionChannel extends EventDispatcher {
// |signaling| is an object has a method |sendSignalingMessage|.
/* eslint-disable-next-line require-jsdoc */
constructor(config, localId, remoteId, signaling, isInitializer) {
constructor(
config, localId, remoteId, connectionId, signaling) {
super();
this._config = config;
this._localId = localId;
this._remoteId = remoteId;
this._connectionId = connectionId;
this._signaling = signaling;
this._pc = null;
this._publishedStreams = new Map(); // Key is streams published, value is its publication.
Expand All @@ -91,13 +95,9 @@ class P2PPeerConnectionChannel extends EventDispatcher {
this._dataSeq = 1; // Sequence number for data channel messages.
this._sendDataPromises = new Map(); // Key is data sequence number, value is an object has |resolve| and |reject|.
this._addedTrackIds = []; // Tracks that have been added after receiving remote SDP but before connection is established. Draining these messages when ICE connection state is connected.
this._isCaller = true;
this._infoSent = false;
this._isPolitePeer = localId < remoteId;
this._disposed = false;
this._createPeerConnection();
if (isInitializer) {
this._sendSignalingMessage(SignalingType.CLOSED);
}
this._sendSignalingMessage(SignalingType.UA, sysInfo);
}

Expand Down Expand Up @@ -126,7 +126,6 @@ class P2PPeerConnectionChannel extends EventDispatcher {
for (const track of stream.mediaStream.getTracks()) {
this._pc.addTrack(track, stream.mediaStream);
}
this._onNegotiationneeded();
this._publishingStreams.push(stream);
const trackIds = Array.from(stream.mediaStream.getTracks(),
(track) => track.id);
Expand Down Expand Up @@ -227,11 +226,12 @@ class P2PPeerConnectionChannel extends EventDispatcher {

_sendSdp(sdp) {
return this._signaling.sendSignalingMessage(
this._remoteId, SignalingType.SDP, sdp);
this._remoteId, this._connectionId, SignalingType.SDP, sdp);
}

_sendSignalingMessage(type, message) {
return this._signaling.sendSignalingMessage(this._remoteId, type, message);
return this._signaling.sendSignalingMessage(
this._remoteId, this._connectionId, type, message);
}

_SignalingMesssageHandler(message) {
Expand Down Expand Up @@ -416,6 +416,15 @@ class P2PPeerConnectionChannel extends EventDispatcher {
_onOffer(sdp) {
Logger.debug('About to set remote description. Signaling state: ' +
this._pc.signalingState);
if (this._pc.signalingState !== 'stable') {
if (this._isPolitePeer) {
Logger.debug('Rollback.');
this._pc.setLocalDescription();
} else {
Logger.debug('Collision detected. Ignore this offer.');
return;
}
}
sdp.sdp = this._setRtpSenderOptions(sdp.sdp, this._config);
// Firefox only has one codec in answer, which does not truly reflect its
// decoding capability. So we set codec preference to remote offer, and let
Expand Down Expand Up @@ -539,12 +548,6 @@ class P2PPeerConnectionChannel extends EventDispatcher {
}

_onNegotiationneeded() {
// This is intented to be executed when onnegotiationneeded event is fired.
// However, onnegotiationneeded may fire mutiple times when more than one
// track is added/removed. So we manually execute this function after
// adding/removing track and creating data channel.
Logger.debug('On negotiation needed.');

if (this._pc.signalingState === 'stable') {
this._doNegotiate();
} else {
Expand Down Expand Up @@ -689,43 +692,23 @@ class P2PPeerConnectionChannel extends EventDispatcher {
this._pc.oniceconnectionstatechange = (event) => {
this._onIceConnectionStateChange.apply(this, [event]);
};
/*
this._pc.oniceChannelStatechange = function(event) {
_onIceChannelStateChange(peer, event);
};
= function() {
onNegotiationneeded(peers[peer.id]);
this._pc.onnegotiationneeded = () => {
this._onNegotiationneeded();
};

//DataChannel
this._pc.ondatachannel = function(event) {
Logger.debug(myId + ': On data channel');
// Save remote created data channel.
if (!peer.dataChannels[event.channel.label]) {
peer.dataChannels[event.channel.label] = event.channel;
Logger.debug('Save remote created data channel.');
}
bindEventsToDataChannel(event.channel, peer);
};*/
}

_drainPendingStreams() {
let negotiationNeeded = false;
Logger.debug('Draining pending streams.');
if (this._pc && this._pc.signalingState === 'stable') {
Logger.debug('Peer connection is ready for draining pending streams.');
for (let i = 0; i < this._pendingStreams.length; i++) {
const stream = this._pendingStreams[i];
// OnNegotiationNeeded event will be triggered immediately after adding stream to PeerConnection in Firefox.
// And OnNegotiationNeeded handler will execute drainPendingStreams. To avoid add the same stream multiple times,
// shift it from pending stream list before adding it to PeerConnection.
this._pendingStreams.shift();
if (!stream.mediaStream) {
continue;
}
for (const track of stream.mediaStream.getTracks()) {
this._pc.addTrack(track, stream.mediaStream);
negotiationNeeded = true;
}
Logger.debug('Added stream to peer connection.');
this._publishingStreams.push(stream);
Expand All @@ -736,17 +719,13 @@ class P2PPeerConnectionChannel extends EventDispatcher {
continue;
}
this._pc.removeStream(this._pendingUnpublishStreams[j].mediaStream);
negotiationNeeded = true;
this._unpublishPromises.get(
this._pendingUnpublishStreams[j].mediaStream.id).resolve();
this._publishedStreams.delete(this._pendingUnpublishStreams[j]);
Logger.debug('Remove stream.');
}
this._pendingUnpublishStreams.length = 0;
}
if (negotiationNeeded) {
this._onNegotiationneeded();
}
}

_drainPendingRemoteIceCandidates() {
Expand Down Expand Up @@ -867,7 +846,6 @@ class P2PPeerConnectionChannel extends EventDispatcher {
return;
}
this._isNegotiationNeeded = false;
this._isCaller = true;
let localDesc;
this._pc.createOffer().then((desc) => {
desc.sdp = this._setRtpReceiverOptions(desc.sdp);
Expand All @@ -878,7 +856,7 @@ class P2PPeerConnectionChannel extends EventDispatcher {
});
}
}).catch((e) => {
Logger.error(e.message + ' Please check your codec settings.');
Logger.error(e.message);
const error = new ErrorModule.P2PError(ErrorModule.errors.P2P_WEBRTC_SDP,
e.message);
this._stop(error, true);
Expand All @@ -888,7 +866,6 @@ class P2PPeerConnectionChannel extends EventDispatcher {
_createAndSendAnswer() {
this._drainPendingStreams();
this._isNegotiationNeeded = false;
this._isCaller = false;
let localDesc;
this._pc.createAnswer().then((desc) => {
desc.sdp = this._setRtpReceiverOptions(desc.sdp);
Expand Down Expand Up @@ -963,7 +940,6 @@ class P2PPeerConnectionChannel extends EventDispatcher {
const dc = this._pc.createDataChannel(label);
this._bindEventsToDataChannel(dc);
this._dataChannels.set(DataChannelLabel.MESSAGE, dc);
this._onNegotiationneeded();
}

_bindEventsToDataChannel(dc) {
Expand Down
1 change: 0 additions & 1 deletion test/unit/resources/scripts/fake-p2p-signaling.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ export default class FakeP2PSignalingChannel {
}

send(targetId, message) {
Logger.debug(this.userId + ' -> ' + targetId + ': ' + message);
return new Promise((resolve, reject) => {
messageQueue.push({ target: targetId, message, resolve, reject, sender: this.userId });
setTimeout(() => {
Expand Down
Loading