Permalink
Comparing changes
Open a pull request
- 1 commit
- 6 files changed
- 0 commit comments
- 1 contributor
Unified
Split
Showing
with
218 additions
and 49 deletions.
- +0 −2 h/static/scripts/app.coffee
- +26 −18 h/static/scripts/streamer.js
- +33 −28 h/static/scripts/test/streamer-test.js
- +59 −0 h/static/scripts/test/websocket-test.js
- +99 −0 h/static/scripts/websocket.js
- +1 −1 package.json
| @@ -1,7 +1,6 @@ | ||
| require('autofill-event') | ||
| baseURI = require('document-base-uri') | ||
| angular = require('angular') | ||
| require('angular-websocket') | ||
| require('angular-jwt') | ||
| streamer = require('./streamer') | ||
| @@ -93,7 +92,6 @@ module.exports = angular.module('h', [ | ||
| 'ngRoute' | ||
| 'ngSanitize' | ||
| 'ngTagsInput' | ||
| 'ngWebSocket' | ||
| 'toastr' | ||
| 'ui.bootstrap' | ||
| ]) | ||
| @@ -1,5 +1,7 @@ | ||
| var uuid = require('node-uuid') | ||
| var Socket = require('./websocket'); | ||
| // the randomly generated session UUID | ||
| var clientId = uuid.v4(); | ||
| @@ -12,7 +14,7 @@ var socket; | ||
| * Only one websocket connection may exist at a time, any existing socket is | ||
| * closed. | ||
| * | ||
| * @param $websocket - angular-websocket constructor | ||
| * @param $rootScope | ||
| * @param annotationMapper - The local annotation store | ||
| * @param groups - The local groups store | ||
| * @param session - Provides access to read and update the session state | ||
| @@ -21,7 +23,7 @@ var socket; | ||
| * @return An angular-websocket wrapper around the socket. | ||
| */ | ||
| // @ngInject | ||
| function connect($websocket, annotationMapper, groups, session, settings) { | ||
| function connect($rootScope, annotationMapper, groups, session, settings) { | ||
| // Get the socket URL | ||
| var url = settings.websocketUrl; | ||
| @@ -31,9 +33,7 @@ function connect($websocket, annotationMapper, groups, session, settings) { | ||
| } | ||
| // Open the socket | ||
| socket = $websocket(url, [], { | ||
| reconnectIfNotNormalClose: true | ||
| }); | ||
| socket = new Socket(url); | ||
| socket.send({ | ||
| messageType: 'client_id', | ||
| value: clientId | ||
| @@ -70,20 +70,28 @@ function connect($websocket, annotationMapper, groups, session, settings) { | ||
| session.update(message.model); | ||
| } | ||
| // Listen for updates | ||
| socket.onMessage(function (event) { | ||
| message = JSON.parse(event.data); | ||
| if (!message) { | ||
| return; | ||
| } | ||
| socket.on('error', function (error) { | ||
| console.warn('Error connecting to H push notification service:', error); | ||
| }); | ||
| if (message.type === 'annotation-notification') { | ||
| handleAnnotationNotification(message) | ||
| } else if (message.type === 'session-change') { | ||
| handleSessionChangeNotification(message) | ||
| } else { | ||
| console.warn('received unsupported notification', message.type) | ||
| } | ||
| socket.on('message', function (event) { | ||
| // wrap message dispatches in $rootScope.$apply() so that | ||
| // scope watches on app state affected by the received message | ||
| // are updated | ||
| $rootScope.$apply(function () { | ||
| message = JSON.parse(event.data); | ||
| if (!message) { | ||
| return; | ||
| } | ||
| if (message.type === 'annotation-notification') { | ||
| handleAnnotationNotification(message) | ||
| } else if (message.type === 'session-change') { | ||
| handleSessionChangeNotification(message) | ||
| } else { | ||
| console.warn('received unsupported notification', message.type) | ||
| } | ||
| }); | ||
| }); | ||
| return socket | ||
| @@ -1,43 +1,43 @@ | ||
| 'use strict'; | ||
| var streamer = require('../streamer'); | ||
| function fakeSocketConstructor(url) { | ||
| return { | ||
| messages: [], | ||
| onMessageCallbacks: [], | ||
| didClose: false, | ||
| send: function (message) { | ||
| this.messages.push(message); | ||
| }, | ||
| onMessage: function (callback) { | ||
| this.onMessageCallbacks.push(callback); | ||
| }, | ||
| notify: function (message) { | ||
| this.onMessageCallbacks.forEach(function (callback) { | ||
| callback({ | ||
| data: JSON.stringify(message) | ||
| }); | ||
| }); | ||
| }, | ||
| var EventEmitter = require('events'); | ||
| var util = require('util'); | ||
| var proxyquire = require('proxyquire'); | ||
| function FakeSocket(url) { | ||
| this.messages = []; | ||
| this.didClose = false; | ||
| close: function () { | ||
| this.didClose = true | ||
| } | ||
| this.send = function (message) { | ||
| this.messages.push(message); | ||
| }; | ||
| this.notify = function (message) { | ||
| this.emit('message', {data: JSON.stringify(message)}); | ||
| }; | ||
| this.close = function () { | ||
| this.didClose = true | ||
| }; | ||
| } | ||
| util.inherits(FakeSocket, EventEmitter); | ||
| describe('streamer', function () { | ||
| var fakeAnnotationMapper; | ||
| var fakeGroups; | ||
| var fakeSession; | ||
| var fakeSettings; | ||
| var socket; | ||
| var streamer; | ||
| beforeEach(function () { | ||
| fakeRootScope = { | ||
| $apply: function (callback) { | ||
| callback(); | ||
| } | ||
| }; | ||
| fakeAnnotationMapper = { | ||
| loadAnnotations: sinon.stub(), | ||
| unloadAnnotations: sinon.stub(), | ||
| @@ -57,8 +57,12 @@ describe('streamer', function () { | ||
| websocketUrl: 'ws://example.com/ws', | ||
| }; | ||
| streamer = proxyquire('../streamer', { | ||
| './websocket': FakeSocket, | ||
| }); | ||
| socket = streamer.connect( | ||
| fakeSocketConstructor, | ||
| fakeRootScope, | ||
| fakeAnnotationMapper, | ||
| fakeGroups, | ||
| fakeSession, | ||
| @@ -74,7 +78,8 @@ describe('streamer', function () { | ||
| it('should close any existing socket', function () { | ||
| var oldSocket = socket; | ||
| var newSocket = streamer.connect(fakeSocketConstructor, | ||
| var newSocket = streamer.connect( | ||
| fakeRootScope, | ||
| fakeAnnotationMapper, | ||
| fakeGroups, | ||
| fakeSession, | ||
| @@ -0,0 +1,59 @@ | ||
| var Socket = require('../websocket'); | ||
| describe('websocket wrapper', function () { | ||
| var fakeSocket; | ||
| var clock; | ||
| function FakeWebSocket() { | ||
| this.close = sinon.stub(); | ||
| this.send = sinon.stub(); | ||
| fakeSocket = this; | ||
| }; | ||
| FakeWebSocket.OPEN = 1; | ||
| var WebSocket = window.WebSocket; | ||
| beforeEach(function () { | ||
| global.WebSocket = FakeWebSocket; | ||
| clock = sinon.useFakeTimers(); | ||
| }); | ||
| afterEach(function () { | ||
| global.WebSocket = WebSocket; | ||
| clock.restore(); | ||
| }); | ||
| it('should reconnect after an abnormal disconnection', function () { | ||
| var socket = new Socket('ws://test:1234'); | ||
| assert.ok(fakeSocket); | ||
| var initialSocket = fakeSocket; | ||
| fakeSocket.onclose({code: 1006}); | ||
| clock.tick(1000); | ||
| assert.ok(fakeSocket); | ||
| assert.notEqual(fakeSocket, initialSocket); | ||
| }); | ||
| it('should not reconnect after a normal disconnection', function () { | ||
| var socket = new Socket('ws://test:1234'); | ||
| socket.close(); | ||
| assert.called(fakeSocket.close); | ||
| var initialSocket = fakeSocket; | ||
| clock.tick(1000); | ||
| assert.equal(fakeSocket, initialSocket); | ||
| }); | ||
| it('should queue messages sent prior to connection', function () { | ||
| var socket = new Socket('ws://test:1234'); | ||
| socket.send({abc: 'foo'}); | ||
| assert.notCalled(fakeSocket.send); | ||
| fakeSocket.onopen({}); | ||
| assert.calledWith(fakeSocket.send, '{"abc":"foo"}'); | ||
| }); | ||
| it('should send messages immediately when connected', function () { | ||
| var socket = new Socket('ws://test:1234'); | ||
| fakeSocket.readyState = FakeWebSocket.OPEN; | ||
| socket.send({abc: 'foo'}); | ||
| assert.calledWith(fakeSocket.send, '{"abc":"foo"}'); | ||
| }); | ||
| }); |
| @@ -0,0 +1,99 @@ | ||
| 'use strict'; | ||
| var retry = require('retry'); | ||
| var util = require('util'); | ||
| var EventEmitter = require('events'); | ||
| // see https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent | ||
| var CLOSE_NORMAL = 1000; | ||
| /** | ||
| * Socket is a minimal wrapper around WebSocket which provides: | ||
| * | ||
| * - Automatic reconnection in the event of an abnormal close | ||
| * - Queuing of messages passed to send() whilst the socket is | ||
| * connecting | ||
| * - Uses the standard EventEmitter API for reporting open, close, error | ||
| * and message events. | ||
| */ | ||
| function Socket(url) { | ||
| var self = this; | ||
| // queue of JSON objects which have not yet been submitted | ||
| var messageQueue = []; | ||
| // the current WebSocket instance or null if disconnected | ||
| var socket; | ||
| function sendMessages() { | ||
| while (messageQueue.length > 0) { | ||
| var messageString = JSON.stringify(messageQueue.shift()); | ||
| socket.send(messageString); | ||
| } | ||
| } | ||
| function reconnect() { | ||
| var didConnect = false; | ||
| var connectOperation = retry.operation(); | ||
| connectOperation.attempt(function (currentAttempt) { | ||
| socket = new WebSocket(url); | ||
| socket.onopen = function (event) { | ||
| // signal successful connection | ||
| connectOperation.retry(); | ||
| didConnect = true; | ||
| sendMessages(); | ||
| self.emit('open', event); | ||
| }; | ||
| socket.onclose = function (event) { | ||
| if (event.code !== CLOSE_NORMAL) { | ||
| if (didConnect) { | ||
| console.warn('The WebSocket connection closed abnormally ' + | ||
| '(code: %d, reason: %s). Reconnecting automatically.', | ||
| event.code, event.reason); | ||
| reconnect(); | ||
| } else { | ||
| console.warn('Retrying connection (attempt %d)', currentAttempt); | ||
| connectOperation.retry(new Error(event.reason)); | ||
| } | ||
| } | ||
| socket = null; | ||
| }; | ||
| socket.onerror = function (event) { | ||
| self.emit('error', event); | ||
| }; | ||
| socket.onmessage = function (event) { | ||
| self.emit('message', event); | ||
| }; | ||
| }); | ||
| }; | ||
| this.close = function () { | ||
| if (!socket) { | ||
| console.error('Socket.close() called before socket was connected'); | ||
| return; | ||
| } | ||
| socket.close(); | ||
| }; | ||
| /** | ||
| * Send a JSON object via the WebSocket connection, or queue it | ||
| * for later delivery if not currently connected. | ||
| */ | ||
| this.send = function (message) { | ||
| messageQueue.push(message); | ||
| if (socket && socket.readyState === WebSocket.OPEN) { | ||
| sendMessages(); | ||
| } | ||
| }; | ||
| // establish the initial connection | ||
| reconnect(); | ||
| } | ||
| util.inherits(Socket, EventEmitter); | ||
| module.exports = Socket; |
| @@ -12,7 +12,6 @@ | ||
| "angular-route": "1.4.7", | ||
| "angular-sanitize": "1.4.7", | ||
| "angular-toastr": "^1.5.0", | ||
| "angular-websocket": "^1.0.13", | ||
| "angulartics": "0.17.2", | ||
| "autofill-event": "0.0.1", | ||
| "autoprefixer": "^6.0.3", | ||
| @@ -44,6 +43,7 @@ | ||
| "node-uuid": "^1.4.3", | ||
| "postcss": "^5.0.6", | ||
| "raf": "^3.1.0", | ||
| "retry": "^0.8.0", | ||
| "scroll-into-view": "^1.3.1", | ||
| "showdown": "^1.2.1", | ||
| "uglify-js": "^2.4.14", | ||