Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure to try to reconnect when request to dispatcher fails #297

Merged
merged 13 commits into from
Jan 18, 2021
56 changes: 30 additions & 26 deletions src/peer/socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,12 @@ class Socket extends EventEmitter {
}

if (this._dispatcherUrl) {
let serverInfo;
try {
serverInfo = await this._getSignalingServer();
this.signalingServerUrl = await this._fetchSignalingServerUrlWithRetry();
} catch (err) {
this.emit('error', err);
return;
}
const httpProtocol = serverInfo.secure ? 'https://' : 'http://';
this.signalingServerUrl = `${httpProtocol}${serverInfo.host}:${serverInfo.port}`;
}

this._io = io(this.signalingServerUrl, {
Expand All @@ -115,49 +112,56 @@ class Socket extends EventEmitter {
}

/**
* Connect to "new" signaling server. Attempts up to 10 times before giving up and emitting an error on the socket.
* @param {number} [numAttempts=0] - Current number of attempts.
* Connect to "new" signaling server.
* @return {Promise<void>} A promise that resolves with new connection has done.
* @private
*/
async _connectToNewServer(numAttempts = 0) {
// max number of attempts to get a new server from the dispatcher.
const maxNumberOfAttempts = 10;
if (
numAttempts >= maxNumberOfAttempts ||
this._reconnectAttempts >= config.numberServersToTry
) {
async _connectToNewServer() {
if (this._reconnectAttempts >= config.numberServersToTry) {
this.emit('error', 'Could not connect to server.');
return;
}

// Keep trying until we connect to a new server because consul can take some time to remove from the active list.
let serverInfo;
try {
serverInfo = await this._getSignalingServer();
this.signalingServerUrl = await this._fetchSignalingServerUrlWithRetry();
} catch (err) {
this.emit('error', err);
return;
}
this._io.io.uri = this.signalingServerUrl;
this._io.connect();
this._reconnectAttempts++;
}

if (this.signalingServerUrl.indexOf(serverInfo.host) === -1) {
const httpProtocol = serverInfo.secure ? 'https://' : 'http://';
this.signalingServerUrl = `${httpProtocol}${serverInfo.host}:${serverInfo.port}`;

this._io.io.uri = this.signalingServerUrl;
this._io.connect();
this._reconnectAttempts++;
} else {
this._connectToNewServer(++numAttempts);
/**
* Return signaling server url. This attempts trying up to maxNumberOfAttempts times before giving up then throw error.
* @return {String} A string of signaling server url.
*/
async _fetchSignalingServerUrlWithRetry() {
for (let attempts = 0; attempts < config.maxNumberOfAttempts; attempts++) {
const serverInfo = await this._fetchSignalingServer().catch(err => {
logger.warn(err);
});
if (
serverInfo &&
serverInfo.port &&
serverInfo.host &&
(!this.signalingServerUrl ||
this.signalingServerUrl.indexOf(serverInfo.host) === -1)
) {
const httpProtocol = serverInfo.secure ? 'https://' : 'http://';
return `${httpProtocol}${serverInfo.host}:${serverInfo.port}`;
}
}
throw new Error('Could not get signaling server url.');
}

/**
* Return object including signaling server info.
* @return {Promise<Object>} A promise that resolves with signaling server info
and rejects if there's no response or status code isn't 200.
*/
_getSignalingServer() {
_fetchSignalingServer() {
return new Promise((resolve, reject) => {
const http = new XMLHttpRequest();

Expand Down
4 changes: 4 additions & 0 deletions src/shared/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ const maxDataSize = 20 * 1024 * 1024;
// The minimum interval of using Room.send() is 100 ms
const minBroadcastIntervalMs = 100;

// max number of attempts to get a new server from the dispatcher.
const maxNumberOfAttempts = 10;

// Number of reconnection attempts to the same server before giving up
const reconnectionAttempts = 2;

Expand Down Expand Up @@ -88,6 +91,7 @@ export default {
maxChunkSize,
maxDataSize,
minBroadcastIntervalMs,
maxNumberOfAttempts,
reconnectionAttempts,
numberServersToTry,
sendInterval,
Expand Down
110 changes: 98 additions & 12 deletions tests/peer/socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ describe('Socket', () => {
const signalingHost = 'signaling.io';
const signalingPort = 443;
const signalingSecure = true;
let getSignalingServerStub;
let fetchSignalingServerStub;

beforeEach(() => {
socket = new Socket(apiKey, {
Expand All @@ -114,8 +114,8 @@ describe('Socket', () => {
dispatcherSecure: dispatcherSecure,
});

getSignalingServerStub = sinon.stub(socket, '_getSignalingServer');
getSignalingServerStub.returns(
fetchSignalingServerStub = sinon.stub(socket, '_fetchSignalingServer');
fetchSignalingServerStub.returns(
Promise.resolve({
host: signalingHost,
port: signalingPort,
Expand All @@ -125,7 +125,7 @@ describe('Socket', () => {
});

afterEach(() => {
getSignalingServerStub.restore();
fetchSignalingServerStub.restore();
});

it('should set _dispatcherUrl', () => {
Expand All @@ -135,7 +135,7 @@ describe('Socket', () => {
);
});

it('should get set the signalingServerUrl from _getSignalingServer', done => {
it('should get set the signalingServerUrl from _fetchSignalingServer', done => {
socket.start(null, token).then(() => {
const httpProtocol = signalingSecure ? 'https://' : 'http://';
const signalingServerUrl = `${httpProtocol}${signalingHost}:${signalingPort}`;
Expand Down Expand Up @@ -419,7 +419,7 @@ describe('Socket', () => {
});
});

describe('_getSignalingServer', () => {
describe('_fetchSignalingServer', () => {
let requests = [];
let xhr;
const fakeDomain = 'fake.domain';
Expand All @@ -443,7 +443,7 @@ describe('Socket', () => {
const result = { domain: fakeDomain };

socket
._getSignalingServer()
._fetchSignalingServer()
.then(() => {
assert.equal(requests.length, 1);

Expand All @@ -469,7 +469,7 @@ describe('Socket', () => {
const result = { domain: fakeDomain };

socket
._getSignalingServer()
._fetchSignalingServer()
.then(res => {
assert.deepEqual(res, {
host: fakeDomain,
Expand All @@ -492,7 +492,7 @@ describe('Socket', () => {
const result = {};

socket
._getSignalingServer()
._fetchSignalingServer()
.then(() => {
assert.fail('This should be rejected.');
done();
Expand All @@ -516,7 +516,7 @@ describe('Socket', () => {
};

socket
._getSignalingServer()
._fetchSignalingServer()
.then(() => {
assert.fail('This should be rejected.');
done();
Expand All @@ -543,7 +543,7 @@ describe('Socket', () => {
},
};
socket
._getSignalingServer()
._fetchSignalingServer()
.then(() => {
assert.fail('This should be rejected.');
done();
Expand Down Expand Up @@ -571,7 +571,7 @@ describe('Socket', () => {
};

socket
._getSignalingServer()
._fetchSignalingServer()
.then(() => {
assert.fail('This should be rejected.');
done();
Expand All @@ -589,4 +589,90 @@ describe('Socket', () => {
});
});
});

describe('_fetchSignalingServerUrlWithRetry', () => {
const signalingHost = 'signaling.io';
const signalingPort = 443;
const signalingSecure = true;
const httpProtocol = 'https://';
const signalingServerUrl = `${httpProtocol}${signalingHost}:${signalingPort}`;
let emitStub;
let fetchSignalingServerStub;

beforeEach(() => {
emitStub = sinon.stub(socket, 'emit');
fetchSignalingServerStub = sinon.stub(socket, '_fetchSignalingServer');
fetchSignalingServerStub.returns(
Promise.resolve({
host: signalingHost,
port: signalingPort,
secure: signalingSecure,
})
);
});

afterEach(() => {
emitStub.restore();
fetchSignalingServerStub.restore();
});

it('should return signalingServerUrl', async () => {
const url = await socket._fetchSignalingServerUrlWithRetry();

assert.equal(url, signalingServerUrl);
});

it('should attempt 10 times before giving up and throw error', async () => {
socket.signalingServerUrl = signalingServerUrl;

await socket._fetchSignalingServerUrlWithRetry().catch(err => {
assert.equal(fetchSignalingServerStub.callCount, 10);
assert.equal(err.message, 'Could not get signaling server url.');
});

// assert.throws(await socket._fetchSignalingServerUrlWithRetry(), 'Could not get signaling server url.');
});
});

describe('_connectToNewServer', () => {
let connectToNewServerSpy;
let emitStub;
let fetchSignalingServerUrlWithRetryStub;

beforeEach(() => {
connectToNewServerSpy = sinon.spy(socket, '_connectToNewServer');
emitStub = sinon.stub(socket, 'emit');
fetchSignalingServerUrlWithRetryStub = sinon.stub(
socket,
'_fetchSignalingServerUrlWithRetry'
);
});

afterEach(() => {
connectToNewServerSpy.restore();
emitStub.restore();
fetchSignalingServerUrlWithRetryStub.restore();
});

it('should set _io.io.uri', () => {
const signalingServerUrl = 'https://signaling.io:443';
fetchSignalingServerUrlWithRetryStub.returns(signalingServerUrl);

socket.start(undefined, token).then(async () => {
await socket._connectToNewServer();

assert.equal(socket._io.io.uri, signalingServerUrl);
});
});

describe('when response from dispatcher is empty', () => {
it('should emit an error on the socket', async () => {
fetchSignalingServerUrlWithRetryStub.throws();

await socket._connectToNewServer();

assert.deepEqual(emitStub.args[0], ['error', new Error()]);
});
});
});
});