Skip to content

Commit

Permalink
Merge pull request #297 from skyway/dev/socket-reconnect
Browse files Browse the repository at this point in the history
Make sure to try to reconnect when request to dispatcher fails
  • Loading branch information
y-i committed Jan 18, 2021
2 parents 3889570 + 72d8b70 commit 1e6d861
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 38 deletions.
56 changes: 30 additions & 26 deletions src/peer/socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,12 @@ class Socket extends EventEmitter {
}

if (this._dispatcherUrl) {
let serverInfo;
try {
serverInfo = await this._getSignalingServer();
this.signalingServerUrl = await this._fetchSignalingServerUrlWithRetry();
} catch (err) {
this.emit('error', err);
return;
}
const httpProtocol = serverInfo.secure ? 'https://' : 'http://';
this.signalingServerUrl = `${httpProtocol}${serverInfo.host}:${serverInfo.port}`;
}

this._io = io(this.signalingServerUrl, {
Expand All @@ -115,49 +112,56 @@ class Socket extends EventEmitter {
}

/**
* Connect to "new" signaling server. Attempts up to 10 times before giving up and emitting an error on the socket.
* @param {number} [numAttempts=0] - Current number of attempts.
* Connect to "new" signaling server.
* @return {Promise<void>} A promise that resolves with new connection has done.
* @private
*/
async _connectToNewServer(numAttempts = 0) {
// max number of attempts to get a new server from the dispatcher.
const maxNumberOfAttempts = 10;
if (
numAttempts >= maxNumberOfAttempts ||
this._reconnectAttempts >= config.numberServersToTry
) {
async _connectToNewServer() {
if (this._reconnectAttempts >= config.numberServersToTry) {
this.emit('error', 'Could not connect to server.');
return;
}

// Keep trying until we connect to a new server because consul can take some time to remove from the active list.
let serverInfo;
try {
serverInfo = await this._getSignalingServer();
this.signalingServerUrl = await this._fetchSignalingServerUrlWithRetry();
} catch (err) {
this.emit('error', err);
return;
}
this._io.io.uri = this.signalingServerUrl;
this._io.connect();
this._reconnectAttempts++;
}

if (this.signalingServerUrl.indexOf(serverInfo.host) === -1) {
const httpProtocol = serverInfo.secure ? 'https://' : 'http://';
this.signalingServerUrl = `${httpProtocol}${serverInfo.host}:${serverInfo.port}`;

this._io.io.uri = this.signalingServerUrl;
this._io.connect();
this._reconnectAttempts++;
} else {
this._connectToNewServer(++numAttempts);
/**
* Return signaling server url. This attempts trying up to maxNumberOfAttempts times before giving up then throw error.
* @return {String} A string of signaling server url.
*/
async _fetchSignalingServerUrlWithRetry() {
for (let attempts = 0; attempts < config.maxNumberOfAttempts; attempts++) {
const serverInfo = await this._fetchSignalingServer().catch(err => {
logger.warn(err);
});
if (
serverInfo &&
serverInfo.port &&
serverInfo.host &&
(!this.signalingServerUrl ||
this.signalingServerUrl.indexOf(serverInfo.host) === -1)
) {
const httpProtocol = serverInfo.secure ? 'https://' : 'http://';
return `${httpProtocol}${serverInfo.host}:${serverInfo.port}`;
}
}
throw new Error('Could not get signaling server url.');
}

/**
* Return object including signaling server info.
* @return {Promise<Object>} A promise that resolves with signaling server info
and rejects if there's no response or status code isn't 200.
*/
_getSignalingServer() {
_fetchSignalingServer() {
return new Promise((resolve, reject) => {
const http = new XMLHttpRequest();

Expand Down
4 changes: 4 additions & 0 deletions src/shared/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ const maxDataSize = 20 * 1024 * 1024;
// The minimum interval of using Room.send() is 100 ms
const minBroadcastIntervalMs = 100;

// max number of attempts to get a new server from the dispatcher.
const maxNumberOfAttempts = 10;

// Number of reconnection attempts to the same server before giving up
const reconnectionAttempts = 2;

Expand Down Expand Up @@ -88,6 +91,7 @@ export default {
maxChunkSize,
maxDataSize,
minBroadcastIntervalMs,
maxNumberOfAttempts,
reconnectionAttempts,
numberServersToTry,
sendInterval,
Expand Down
110 changes: 98 additions & 12 deletions tests/peer/socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ describe('Socket', () => {
const signalingHost = 'signaling.io';
const signalingPort = 443;
const signalingSecure = true;
let getSignalingServerStub;
let fetchSignalingServerStub;

beforeEach(() => {
socket = new Socket(apiKey, {
Expand All @@ -114,8 +114,8 @@ describe('Socket', () => {
dispatcherSecure: dispatcherSecure,
});

getSignalingServerStub = sinon.stub(socket, '_getSignalingServer');
getSignalingServerStub.returns(
fetchSignalingServerStub = sinon.stub(socket, '_fetchSignalingServer');
fetchSignalingServerStub.returns(
Promise.resolve({
host: signalingHost,
port: signalingPort,
Expand All @@ -125,7 +125,7 @@ describe('Socket', () => {
});

afterEach(() => {
getSignalingServerStub.restore();
fetchSignalingServerStub.restore();
});

it('should set _dispatcherUrl', () => {
Expand All @@ -135,7 +135,7 @@ describe('Socket', () => {
);
});

it('should get set the signalingServerUrl from _getSignalingServer', done => {
it('should get set the signalingServerUrl from _fetchSignalingServer', done => {
socket.start(null, token).then(() => {
const httpProtocol = signalingSecure ? 'https://' : 'http://';
const signalingServerUrl = `${httpProtocol}${signalingHost}:${signalingPort}`;
Expand Down Expand Up @@ -419,7 +419,7 @@ describe('Socket', () => {
});
});

describe('_getSignalingServer', () => {
describe('_fetchSignalingServer', () => {
let requests = [];
let xhr;
const fakeDomain = 'fake.domain';
Expand All @@ -443,7 +443,7 @@ describe('Socket', () => {
const result = { domain: fakeDomain };

socket
._getSignalingServer()
._fetchSignalingServer()
.then(() => {
assert.equal(requests.length, 1);

Expand All @@ -469,7 +469,7 @@ describe('Socket', () => {
const result = { domain: fakeDomain };

socket
._getSignalingServer()
._fetchSignalingServer()
.then(res => {
assert.deepEqual(res, {
host: fakeDomain,
Expand All @@ -492,7 +492,7 @@ describe('Socket', () => {
const result = {};

socket
._getSignalingServer()
._fetchSignalingServer()
.then(() => {
assert.fail('This should be rejected.');
done();
Expand All @@ -516,7 +516,7 @@ describe('Socket', () => {
};

socket
._getSignalingServer()
._fetchSignalingServer()
.then(() => {
assert.fail('This should be rejected.');
done();
Expand All @@ -543,7 +543,7 @@ describe('Socket', () => {
},
};
socket
._getSignalingServer()
._fetchSignalingServer()
.then(() => {
assert.fail('This should be rejected.');
done();
Expand Down Expand Up @@ -571,7 +571,7 @@ describe('Socket', () => {
};

socket
._getSignalingServer()
._fetchSignalingServer()
.then(() => {
assert.fail('This should be rejected.');
done();
Expand All @@ -589,4 +589,90 @@ describe('Socket', () => {
});
});
});

describe('_fetchSignalingServerUrlWithRetry', () => {
const signalingHost = 'signaling.io';
const signalingPort = 443;
const signalingSecure = true;
const httpProtocol = 'https://';
const signalingServerUrl = `${httpProtocol}${signalingHost}:${signalingPort}`;
let emitStub;
let fetchSignalingServerStub;

beforeEach(() => {
emitStub = sinon.stub(socket, 'emit');
fetchSignalingServerStub = sinon.stub(socket, '_fetchSignalingServer');
fetchSignalingServerStub.returns(
Promise.resolve({
host: signalingHost,
port: signalingPort,
secure: signalingSecure,
})
);
});

afterEach(() => {
emitStub.restore();
fetchSignalingServerStub.restore();
});

it('should return signalingServerUrl', async () => {
const url = await socket._fetchSignalingServerUrlWithRetry();

assert.equal(url, signalingServerUrl);
});

it('should attempt 10 times before giving up and throw error', async () => {
socket.signalingServerUrl = signalingServerUrl;

await socket._fetchSignalingServerUrlWithRetry().catch(err => {
assert.equal(fetchSignalingServerStub.callCount, 10);
assert.equal(err.message, 'Could not get signaling server url.');
});

// assert.throws(await socket._fetchSignalingServerUrlWithRetry(), 'Could not get signaling server url.');
});
});

describe('_connectToNewServer', () => {
let connectToNewServerSpy;
let emitStub;
let fetchSignalingServerUrlWithRetryStub;

beforeEach(() => {
connectToNewServerSpy = sinon.spy(socket, '_connectToNewServer');
emitStub = sinon.stub(socket, 'emit');
fetchSignalingServerUrlWithRetryStub = sinon.stub(
socket,
'_fetchSignalingServerUrlWithRetry'
);
});

afterEach(() => {
connectToNewServerSpy.restore();
emitStub.restore();
fetchSignalingServerUrlWithRetryStub.restore();
});

it('should set _io.io.uri', () => {
const signalingServerUrl = 'https://signaling.io:443';
fetchSignalingServerUrlWithRetryStub.returns(signalingServerUrl);

socket.start(undefined, token).then(async () => {
await socket._connectToNewServer();

assert.equal(socket._io.io.uri, signalingServerUrl);
});
});

describe('when response from dispatcher is empty', () => {
it('should emit an error on the socket', async () => {
fetchSignalingServerUrlWithRetryStub.throws();

await socket._connectToNewServer();

assert.deepEqual(emitStub.args[0], ['error', new Error()]);
});
});
});
});

0 comments on commit 1e6d861

Please sign in to comment.