Skip to content
This repository has been archived by the owner on Nov 27, 2022. It is now read-only.

Commit

Permalink
Merge pull request #30 from jholleran/feature/reconnect-on-close
Browse files Browse the repository at this point in the history
Reconnect UpdateTracker on close
  • Loading branch information
RubenVerborgh committed Mar 29, 2020
2 parents 4594b86 + 6ba0506 commit e5685e4
Show file tree
Hide file tree
Showing 3 changed files with 225 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .eslintrc
Expand Up @@ -27,7 +27,7 @@
dot-location: [ error, property ],
dot-notation: error,
eqeqeq: error,
guard-for-in: error,
guard-for-in: off,
no-alert: error,
no-caller: error,
no-console: [ error, { allow: [ warn ] } ],
Expand Down
61 changes: 54 additions & 7 deletions src/UpdateTracker.js
Expand Up @@ -30,7 +30,7 @@ export default class UpdateTracker {
if (url !== ALL)
trackResource(url);
else
fetchedUrls.forEach(trackResource);
fetchedUrls.forEach(fetchedUrl => trackResource(fetchedUrl));
}
// Add the new subscriber
subscribers[url].add(this.subscriber);
Expand All @@ -48,20 +48,37 @@ export default class UpdateTracker {
}

/** Tracks updates to the given resource */
function trackResource(url) {
function trackResource(url, webSocketOptions = {}) {
// Try to find an existing socket for the host
const { protocol, host } = new URL(url);
let webSocket = webSockets[host];

// If none exists, create a new one
// If no socket exists, create a new one
if (!webSocket) {
const socketUrl = `${protocol.replace('http', 'ws')}//${host}/`;
webSockets[host] = webSocket = new WebSocket(socketUrl);
Object.assign(webSocket, { enqueue, onmessage,
ready: new Promise(resolve => (webSocket.onopen = resolve)),
});
Object.assign(webSocket, {
host,
resources: new Set(),
reconnectionAttempts: 0,
reconnectionDelay: 1000,
enqueue,
onmessage: processMessage,
onclose: reconnect,
ready: new Promise(resolve => {
webSocket.onopen = () => {
webSocket.reconnectionAttempts = 0;
webSocket.reconnectionDelay = 1000;
resolve();
};
}),
}, webSocketOptions);
}

// Each WebSocket keeps track of subscribed resources
// so we can resubscribe later if needed
webSocket.resources.add(url);

// Subscribe to updates on the resource
webSocket.enqueue(`sub ${url}`);
}
Expand All @@ -73,7 +90,7 @@ async function enqueue(data) {
}

/** Processes an update message from the WebSocket */
function onmessage({ data }) {
function processMessage({ data }) {
// Verify the message is an update notification
const match = /^pub +(.+)/.exec(data);
if (!match)
Expand All @@ -91,6 +108,36 @@ function onmessage({ data }) {
subscriber(update);
}

/** Reconnects a socket after a backoff delay */
async function reconnect() {
// Ensure this socket is no longer marked as active
delete webSockets[this.host];

// Try setting up a new socket
if (this.reconnectionAttempts < 6) {
// Wait a given backoff period before reconnecting
await new Promise(done => (setTimeout(done, this.reconnectionDelay)));
// Try reconnecting, and back off exponentially
this.resources.forEach(url => trackResource(url, {
reconnectionAttempts: this.reconnectionAttempts + 1,
reconnectionDelay: this.reconnectionDelay * 2,
}));
}
}

/** Closes all sockets */
export function resetWebSockets() {
for (const url in subscribers)
delete subscribers[url];
for (const host in webSockets) {
const socket = webSockets[host];
delete webSockets[host];
delete socket.onclose;
socket.close();
}
fetchedUrls.clear();
}

// Keep track of all fetched resources
auth.on('request', url => {
if (!fetchedUrls.has(url)) {
Expand Down
176 changes: 170 additions & 6 deletions test/UpdateTracker-test.js
@@ -1,18 +1,27 @@
import UpdateTracker from '../src/UpdateTracker';
import UpdateTracker, { resetWebSockets } from '../src/UpdateTracker';
import auth from 'solid-auth-client';
import ldflex from '@solid/query-ldflex';

const WebSocket = global.WebSocket = jest.fn(() => ({
send: jest.fn(),
close: jest.fn(),
}));

describe('An UpdateTracker', () => {
const callback = jest.fn();
let updateTracker, webSockets;

jest.useFakeTimers();

beforeAll(() => {
updateTracker = new UpdateTracker(callback);
});

function retrieveCreatedWebSockets() {
webSockets = WebSocket.mock.results.map(s => s.value);
return webSockets;
}

describe('subscribing to 3 resources', () => {
const resources = [
'http://a.com/docs/1',
Expand All @@ -23,7 +32,7 @@ describe('An UpdateTracker', () => {
beforeAll(() => {
WebSocket.mockClear();
updateTracker.subscribe(...resources);
webSockets = WebSocket.mock.results.map(s => s.value);
retrieveCreatedWebSockets();
});

it('opens WebSockets to the servers of those resources', () => {
Expand Down Expand Up @@ -159,8 +168,7 @@ describe('An UpdateTracker', () => {
describe('after subscribing', () => {
beforeAll(() => {
updateTracker.subscribe('*');
webSockets = WebSocket.mock.results.map(s => s.value);
webSockets.forEach(s => s.onopen());
retrieveCreatedWebSockets().forEach(s => s.onopen());
});

it('subscribes to all previously fetched resources', () => {
Expand Down Expand Up @@ -189,8 +197,7 @@ describe('An UpdateTracker', () => {
WebSocket.mockClear();
auth.emit('request', 'https://z.com/1');
auth.emit('request', 'https://z.com/2');
webSockets = WebSocket.mock.results.map(s => s.value);
webSockets.forEach(s => s.onopen());
retrieveCreatedWebSockets().forEach(s => s.onopen());
});

it('subscribes to the new resources', () => {
Expand All @@ -203,4 +210,161 @@ describe('An UpdateTracker', () => {
});
});
});

describe('when a socket is closed', () => {
// Ensure clean slate between tests
beforeEach(resetWebSockets);
beforeEach(WebSocket.mockClear);

// Subscribe to resources
beforeEach(() => {
updateTracker.subscribe('http://retry.com/docs/1', 'http://retry.com/docs/2');
});

// Simulate socket closure
beforeEach(() => {
retrieveCreatedWebSockets()[0].onclose();
WebSocket.mockClear();
});

it('resubscribes after 1s backoff time', async () => {
await jest.advanceTimersByTime(500); // backoff time not exceeded yet
expect(WebSocket).toHaveBeenCalledTimes(0);

await jest.advanceTimersByTime(500); // backoff time exceeded
expect(WebSocket).toHaveBeenCalledTimes(1);
expect(WebSocket).toHaveBeenCalledWith('ws://retry.com/');

retrieveCreatedWebSockets()[0].onopen();
await webSockets[0].ready;
expect(webSockets[0].send).toHaveBeenCalledTimes(2);
expect(webSockets[0].send).toHaveBeenCalledWith('sub http://retry.com/docs/1');
expect(webSockets[0].send).toHaveBeenCalledWith('sub http://retry.com/docs/2');
});

it('makes six attempts to resubscribe with doubling backoff times', async () => {
await jest.advanceTimersByTime(1000);
expect(WebSocket).toHaveBeenCalledTimes(1);

retrieveCreatedWebSockets()[0].onclose();
await jest.advanceTimersByTime(2000);
expect(WebSocket).toHaveBeenCalledTimes(2);

retrieveCreatedWebSockets()[1].onclose();
await jest.advanceTimersByTime(4000);
expect(WebSocket).toHaveBeenCalledTimes(3);

retrieveCreatedWebSockets()[2].onclose();
await jest.advanceTimersByTime(8000);
expect(WebSocket).toHaveBeenCalledTimes(4);

retrieveCreatedWebSockets()[3].onclose();
await jest.advanceTimersByTime(16000);
expect(WebSocket).toHaveBeenCalledTimes(5);

retrieveCreatedWebSockets()[4].onclose();
await jest.advanceTimersByTime(32000);
expect(WebSocket).toHaveBeenCalledTimes(6);

retrieveCreatedWebSockets()[5].onopen();
await webSockets[5].ready;

// First five attempts failed to connect so there ere no subscribe calls
expect(webSockets[0].send).toHaveBeenCalledTimes(0);
expect(webSockets[1].send).toHaveBeenCalledTimes(0);
expect(webSockets[2].send).toHaveBeenCalledTimes(0);
expect(webSockets[3].send).toHaveBeenCalledTimes(0);
expect(webSockets[4].send).toHaveBeenCalledTimes(0);

// The sixth attempts succeeded to connect so there was a subscribe call
expect(webSockets[5].send).toHaveBeenCalledTimes(2);
expect(webSockets[5].send).toHaveBeenCalledWith('sub http://retry.com/docs/1');
expect(webSockets[5].send).toHaveBeenCalledWith('sub http://retry.com/docs/2');
});

it('does not retry after the sixth attempt', async () => {
await jest.advanceTimersByTime(1000);
expect(WebSocket).toHaveBeenCalledTimes(1);

retrieveCreatedWebSockets()[0].onclose();
await jest.advanceTimersByTime(2000);
expect(WebSocket).toHaveBeenCalledTimes(2);

retrieveCreatedWebSockets()[1].onclose();
await jest.advanceTimersByTime(4000);
expect(WebSocket).toHaveBeenCalledTimes(3);

retrieveCreatedWebSockets()[2].onclose();
await jest.advanceTimersByTime(8000);
expect(WebSocket).toHaveBeenCalledTimes(4);

retrieveCreatedWebSockets()[3].onclose();
await jest.advanceTimersByTime(16000);
expect(WebSocket).toHaveBeenCalledTimes(5);

retrieveCreatedWebSockets()[4].onclose();
await jest.advanceTimersByTime(32000);
expect(WebSocket).toHaveBeenCalledTimes(6);

retrieveCreatedWebSockets()[5].onclose();
await jest.advanceTimersByTime(64000);
expect(WebSocket).toHaveBeenCalledTimes(6);

// All five attempts failed to connect so there was no subscribe calls
expect(webSockets[0].send).toHaveBeenCalledTimes(0);
expect(webSockets[1].send).toHaveBeenCalledTimes(0);
expect(webSockets[2].send).toHaveBeenCalledTimes(0);
expect(webSockets[3].send).toHaveBeenCalledTimes(0);
expect(webSockets[4].send).toHaveBeenCalledTimes(0);
expect(webSockets[5].send).toHaveBeenCalledTimes(0);
});

it('resets backoff if the connection is dropping and coming back up', async () => {
await jest.advanceTimersByTime(1000);
expect(WebSocket).toHaveBeenCalledTimes(1);

retrieveCreatedWebSockets()[0].onclose();
await jest.advanceTimersByTime(2000);
expect(WebSocket).toHaveBeenCalledTimes(2);

// Connection succeeded which should reset backoff times
retrieveCreatedWebSockets()[1].onopen();
await webSockets[1].ready;
expect(WebSocket).toHaveBeenCalledTimes(2);

// Backoff timeouts have been reset back to original times
retrieveCreatedWebSockets()[1].onclose();
await jest.advanceTimersByTime(1000);
expect(WebSocket).toHaveBeenCalledTimes(3);

retrieveCreatedWebSockets()[2].onclose();
await jest.advanceTimersByTime(2000);
expect(WebSocket).toHaveBeenCalledTimes(4);

retrieveCreatedWebSockets()[3].onclose();
await jest.advanceTimersByTime(4000);
expect(WebSocket).toHaveBeenCalledTimes(5);

retrieveCreatedWebSockets()[4].onopen();
await webSockets[4].ready;
expect(WebSocket).toHaveBeenCalledTimes(5);

// First attempt failed to connect so there was no subscribe call
expect(webSockets[0].send).toHaveBeenCalledTimes(0);

// Second attempt succeed to connect so there were two subscribe calls
expect(webSockets[1].send).toHaveBeenCalledTimes(2);
expect(webSockets[1].send).toHaveBeenCalledWith('sub http://retry.com/docs/1');
expect(webSockets[1].send).toHaveBeenCalledWith('sub http://retry.com/docs/2');

// After a close event the third and forth attempts failed to connect
expect(webSockets[2].send).toHaveBeenCalledTimes(0);
expect(webSockets[3].send).toHaveBeenCalledTimes(0);

// The Fifth attempt succeed to connect so there were two subscribe calls
expect(webSockets[4].send).toHaveBeenCalledTimes(2);
expect(webSockets[4].send).toHaveBeenCalledWith('sub http://retry.com/docs/1');
expect(webSockets[4].send).toHaveBeenCalledWith('sub http://retry.com/docs/2');
});
});
});

0 comments on commit e5685e4

Please sign in to comment.