From f5bcdfcaad228845c77ac9ad2afa3d647374467d Mon Sep 17 00:00:00 2001 From: Henri Pihkala Date: Thu, 8 Sep 2016 21:13:47 +0300 Subject: [PATCH 1/7] wip: update client to understand new protocol --- streamr-client.js | 1090 +++++++++++++++++------------------ test/test.streamr-client.js | 277 ++++----- 2 files changed, 637 insertions(+), 730 deletions(-) diff --git a/streamr-client.js b/streamr-client.js index 569a0f88a..60157904c 100644 --- a/streamr-client.js +++ b/streamr-client.js @@ -2,658 +2,622 @@ (function() { -var io -var debug -if (typeof window !== 'undefined') { - io = window.io - debug = (window.debug ? window.debug('StreamrClient') : function() { - if (window.consoleLoggingEnabled) - console.log.apply(console, arguments) - }) -} -else { - io = require('socket.io-client') - debug = require('debug')('StreamrClient') -} - -var STREAM_KEY = "_S" -var COUNTER_KEY = "_C" -var TIMESTAMP_KEY = "_T" -var BYE_KEY = "_bye" -var SUB_KEY = "_sub" - -function extend() { - for(var i=1; i1) - throw "Multiple resend options active! Please use only one: "+JSON.stringify(options) - - // Automatically convert Date objects to numbers for resend_from_time - if (this.options.resend_from_time != null - && typeof this.options.resend_from_time !== 'number') { - - if (typeof this.options.resend_from_time.getTime === 'function') - this.options.resend_from_time = this.options.resend_from_time.getTime() - else throw "resend_from_time option must be a Date object or a number representing time!" + + var BYE_KEY = "_bye" + + function extend() { + for(var i=1; i _this.counter) { - this.trigger('gap', this.counter, response.from-1) + var idx = versionFields[msg[0].toString()].indexOf(field) + if (idx >= 0) { + return msg[idx] } - // This situation should never occur, it is safe to ignore but will result in duplicate messages being sent by the server - else if (response.from != null && response.from < _this.counter) { - debug("Subscribed from a counter less than what was expected! Expected: %d, subscribed from: %d", _this.counter, response.from) + else { + return undefined } + } - }) - - this.bind('unsubscribed', function() { - debug("Sub %s unsubscribed: %s", _this.id, _this.streamId) - _this.subscribed = false - _this.unsubscribing = false - _this.resending = false - }) + function Subscription(streamId, callback, options) { + if (!streamId) + throw "No stream id given!" + if (!callback) + throw "No callback given!" + + var _this = this + + this.id = generateSubscriptionId() + this.streamId = streamId + this.callback = callback + this.options = options || {} + this.queue = [] + this.subscribed = false + this.lastReceivedOffset = null + + // Check that multiple resend options are not given + var resendOptionCount = 0 + if (this.options.resend_all) + resendOptionCount++ + if (this.options.resend_from!=null) + resendOptionCount++ + if (this.options.resend_last!=null) + resendOptionCount++ + if (this.options.resend_from_time!=null) + resendOptionCount++ + if (resendOptionCount>1) + throw "Multiple resend options active! Please use only one: "+JSON.stringify(options) + + // Automatically convert Date objects to numbers for resend_from_time + if (this.options.resend_from_time != null + && typeof this.options.resend_from_time !== 'number') { + + if (typeof this.options.resend_from_time.getTime === 'function') + this.options.resend_from_time = this.options.resend_from_time.getTime() + else throw "resend_from_time option must be a Date object or a number representing time!" + } - this.bind('resending', function(response) { - debug("Sub %s resending: %s next message set to %d, was: %o", _this.id, response.channel, response.from, _this.counter) - _this.counter = response.from - }) + /*** Message handlers ***/ - this.bind('no_resend', function(response) { - debug("Sub %s no_resend: %s next message set to %d, was: %o", _this.id, response.channel, response.next, _this.counter) + this.bind('subscribed', function(response) { + debug("Sub %s subscribed to stream: %s", _this.id, _this.streamId) + _this.subscribed = true + }) - _this.counter = response.next - _this.resending = false - _this.checkQueue() - }) + this.bind('unsubscribed', function() { + debug("Sub %s unsubscribed: %s", _this.id, _this.streamId) + _this.subscribed = false + _this.unsubscribing = false + _this.resending = false + }) - this.bind('resent', function(response) { - debug("Sub %s resent: %s from %d to %d", _this.id, response.channel, response.from, response.to) - - _this.resending = false - _this.checkQueue() - }) + this.bind('resending', function(response) { + debug("Sub %s resending: %o", _this.id, response) + // _this.resending = true was set elsewhere before making the request + }) - this.bind('connected', function() { + this.bind('no_resend', function(response) { + debug("Sub %s no_resend: %o", _this.id, response) + _this.resending = false + _this.checkQueue() + }) - }) + this.bind('resent', function(response) { + debug("Sub %s resent: %o", _this.id, response) + _this.resending = false + _this.checkQueue() + }) - this.bind('disconnected', function() { - _this.subscribed = false - _this.resending = false - }) + this.bind('connected', function() { -} + }) -MicroEvent.mixin(Subscription) + this.bind('disconnected', function() { + _this.subscribed = false + _this.resending = false + }) -Subscription.prototype.handleMessage = function(message, stream, timestamp, counter, bye) { - if (this.counter === null && counter!==undefined) { - debug("Sub %s received message %d but does not know what number to expect", this.id, counter) - return } - // Update ack counter - if (counter > this.counter) { - this.queue.push({ - msg: message, - str: stream, - ts: timestamp, - c: counter, - bye: bye - }) - - if (!this.resending) { - debug("Gap detected, requesting resend for channel %s", this.streamId) - this.trigger('gap', this.counter, counter-1) + MicroEvent.mixin(Subscription) + + Subscription.prototype.handleMessage = function(msg) { + var content = getMessageField('content', msg) + var timestamp = getMessageField('timestamp', msg) + var offset = getMessageField('offset', msg) + var prevOffset = getMessageField('prevOffset', msg) + + if (prevOffset == null) { + debug("handleMessage: prevOffset is null, gap detection is impossible! message: %o", msg) } - } - else if (counter < this.counter) { - debug("Sub %s already received message: %d, expecting: %d", this.id, counter, this.counter) - } - else { - this.counter = counter + 1 - this.callback(message, stream, timestamp, counter) + debug("handleMessage: %o", msg) + debug("handleMessage: lastReceivedOffset %d", this.lastReceivedOffset) - if (bye) - this.trigger('done') - } -} - -Subscription.prototype.checkQueue = function() { - if (this.queue.length) { - debug("Attempting to process %d queued messages for stream %s", this.queue.length, this.streamId) - - var i - for (i=0;i this.lastReceivedOffset) { + this.queue.push(msg) + + if (!this.resending) { + var from = this.lastReceivedOffset + 1 + var to = prevOffset + debug("Gap detected, requesting resend for stream %s from %d to %d", this.streamId, from, to) + this.trigger('gap', from, to) } - // Ignore old messages in the queue - else if (this.queue[i].c < this.counter) - continue - // Else stop looping - else if (this.queue[i].c > this.counter) - break } - - // All messages in queue were processed - if (i===this.queue.length) { - this.queue = [] + // Prevent double-processing of messages for any reason + else if (this.lastReceivedOffset != null && offset <= this.lastReceivedOffset) { + debug("Sub %s already received message: %d, lastReceivedOffset: %d. Ignoring message.", this.id, offset, this.lastReceivedOffset) } - // Some messages could not be processed, so compact the queue - // and request another resend for the gap! + // Normal case where prevOffset == null || lastReceivedOffset == null || prevOffset === lastReceivedOffset else { - this.queue.splice(0, i) - this.trigger('gap', this.counter, this.queue[0].c-1) + this.lastReceivedOffset = offset + this.callback(content, this.streamId, timestamp, offset) + if (content[BYE_KEY]) { + this.trigger('done') + } } } -} - -Subscription.prototype.hasResendOptions = function() { - return this.options.resend_all===true || this.options.resend_from >= 0 || this.options.resend_from_time >= 0 || this.options.resend_last > 0 -} - -Subscription.prototype.isSubscribed = function() { - return this.subscribed -} - -/** - * StreamClient - **/ - -function StreamrClient(options) { - // Default options - this.options = { - // The server to connect to - server: "https://data.streamr.com", - // Automatically connect on first subscribe - autoConnect: true, - // Automatically disconnect on last unsubscribe - autoDisconnect: true, - // Allow client socket library to choose appropriate transport - transports: null - } - this.subsByStream = {} - this.subById = {} - this.io = io - this.socket = null - this.connected = false + Subscription.prototype.checkQueue = function() { + if (this.queue.length) { + debug("Attempting to process %d queued messages for stream %s", this.queue.length, this.streamId) - // Can give server URL as parameter instead of options object - if (typeof options === "string") - this.options.server = options - else - extend(this.options, options || {}) -} + var i + for (i=0;i= 0 || this.options.resend_from_time >= 0 || this.options.resend_last > 0 + } - if (!this.subsByStream[sub.streamId]) - this.subsByStream[sub.streamId] = [sub] - else this.subsByStream[sub.streamId].push(sub) -} + Subscription.prototype.isSubscribed = function() { + return this.subscribed + } -StreamrClient.prototype._removeSubscription = function(sub) { - delete this.subById[sub.id] + function StreamrClient(options) { + // Default options + this.options = { + // The server to connect to + server: "https://data.streamr.com", + // Automatically connect on first subscribe + autoConnect: true, + // Automatically disconnect on last unsubscribe + autoDisconnect: true, + // Allow client socket library to choose appropriate transport + transports: null + } + this.subsByStream = {} + this.subById = {} + + this.io = io + this.socket = null + this.connected = false + + // Can give server URL as parameter instead of options object + if (typeof options === "string") + this.options.server = options + else + extend(this.options, options || {}) + } - if (this.subsByStream[sub.streamId]) { - this.subsByStream[sub.streamId] = this.subsByStream[sub.streamId].filter(function(it) { - return it !== sub - }) + MicroEvent.mixin(StreamrClient) - if (this.subsByStream[sub.streamId].length === 0) - delete this.subsByStream[sub.streamId] - } -} - -StreamrClient.prototype.getSubscriptions = function(streamId) { - return this.subsByStream[streamId] || [] -} - -StreamrClient.prototype.subscribe = function(streamId, callback, options) { - var _this = this - - if (!streamId) - throw "subscribe: Invalid arguments: stream id is required!" - else if (typeof streamId !== 'string') - throw "subscribe: stream id must be a string!" - - if (!callback) - throw "subscribe: Invalid arguments: callback is required!" - - // Create the Subscription object and bind handlers - var sub = new Subscription(streamId, callback, options) - sub.bind('gap', function(from, to) { - _this._requestResend(sub, {resend_from: from, resend_to: to}) - }) - sub.bind('done', function() { - _this.unsubscribe(sub) - }) - - // Add to lookups - this._addSubscription(sub) - - // If connected, emit a subscribe request - if (this.connected) { - this._resendAndSubscribe(sub) - } else if (this.options.autoConnect) { - this.connect() + StreamrClient.prototype._addSubscription = function(sub) { + this.subById[sub.id] = sub + + if (!this.subsByStream[sub.streamId]) + this.subsByStream[sub.streamId] = [sub] + else this.subsByStream[sub.streamId].push(sub) } - return sub -} + StreamrClient.prototype._removeSubscription = function(sub) { + delete this.subById[sub.id] -StreamrClient.prototype.unsubscribe = function(sub) { - if (!sub || !sub.streamId) - throw "unsubscribe: please give a Subscription object as an argument!" + if (this.subsByStream[sub.streamId]) { + this.subsByStream[sub.streamId] = this.subsByStream[sub.streamId].filter(function(it) { + return it !== sub + }) - // If this is the last subscription for this stream, unsubscribe the client too - if (this.subsByStream[sub.streamId] !== undefined && this.subsByStream[sub.streamId].length === 1 && this.connected && !this.disconnecting && sub.isSubscribed() && !sub.unsubscribing) { - sub.unsubscribing = true - this._requestUnsubscribe(sub.streamId) + if (this.subsByStream[sub.streamId].length === 0) + delete this.subsByStream[sub.streamId] + } } - // Else the sub can be cleaned off immediately - else if (!sub.unsubscribing) { - this._removeSubscription(sub) - sub.trigger('unsubscribed') - this._checkAutoDisconnect() + + StreamrClient.prototype.getSubscriptions = function(streamId) { + return this.subsByStream[streamId] || [] } -} - -StreamrClient.prototype.unsubscribeAll = function(streamId) { - var _this = this - if (!streamId) - throw "unsubscribeAll: a stream id is required!" - else if (typeof streamId !== 'string') - throw "unsubscribe: stream id must be a string!" - - if (this.subsByStream[streamId]) { - // Copy the list to avoid concurrent modifications - var l = this.subsByStream[streamId].slice() - l.forEach(function(sub) { + + StreamrClient.prototype.subscribe = function(streamId, callback, options) { + var _this = this + + if (!streamId) + throw "subscribe: Invalid arguments: stream id is required!" + else if (typeof streamId !== 'string') + throw "subscribe: stream id must be a string!" + + if (!callback) + throw "subscribe: Invalid arguments: callback is required!" + + // Create the Subscription object and bind handlers + var sub = new Subscription(streamId, callback, options) + sub.bind('gap', function(from, to) { + _this._requestResend(sub, {resend_from: from, resend_to: to}) + }) + sub.bind('done', function() { + debug("done event for sub %d", sub.id) _this.unsubscribe(sub) }) - } -} - -StreamrClient.prototype.isConnected = function() { - return this.connected -} - -StreamrClient.prototype.reconnect = function() { - return this.connect(true) -} - -StreamrClient.prototype.connect = function(reconnect) { - var _this = this - - if (this.connected) { - debug("connect() called while already connected, doing nothing...") - return - } - else if (this.connecting) { - debug("connect() called while connecting, doing nothing...") - return - } - - debug("Connecting to %s", this.options.server) - this.connecting = true - this.disconnecting = false - - this.socket = this.io(this.options.server, { - forceNew: true, - transports: this.options.transports - }) - - this.socket.on('ui', function(data) { - if (typeof data == 'string' || data instanceof String) { - data = JSON.parse(data) - } - // Delete the internal fields from the msg - var streamId = data[STREAM_KEY] - delete data[STREAM_KEY] - var timestamp = data[TIMESTAMP_KEY] - delete data[TIMESTAMP_KEY] - var counter = data[COUNTER_KEY] - delete data[COUNTER_KEY] - var bye = data[BYE_KEY] - delete data[BYE_KEY] - var sub = data[SUB_KEY] - delete data[SUB_KEY] - - // If the message targets a specific subscription via _sub, only report the message to that - if (sub!==undefined) { - _this.subById[sub].handleMessage(data, streamId, timestamp, counter, bye) - } - else { - // Notify the Subscriptions for this stream. If this is not the message each individual Subscription - // is expecting, they will either ignore it or request resend via gap event. - var subs = _this.subsByStream[streamId] + // Add to lookups + this._addSubscription(sub) - if (subs) { - for (var i=0;i 0) { + debug('_requestSubscribe: another subscription for same stream: %s, insta-subscribing', sub.streamId) + + setTimeout(function() { + sub.trigger('subscribed') + }, 0) + } } -} - -StreamrClient.prototype._requestSubscribe = function(sub, from) { - var _this = this - var subs = _this.subsByStream[sub.streamId] - - var subscribedSubs = subs.filter(function(it) { - return it.isSubscribed() - }) - - // If this is the first subscription for this stream, send a subscription request to the server - if (!subs._subscribing && subscribedSubs.length === 0) { - var req = extend({}, sub.options, {channel: sub.streamId, from: from}) - debug("_requestSubscribe: subscribing client: %o", req) - subs._subscribing = true - _this.socket.emit('subscribe', req) + + StreamrClient.prototype._requestUnsubscribe = function(streamId) { + debug("Client unsubscribing stream %o", streamId) + this.socket.emit('unsubscribe', {channel: streamId}) } - // If there already is a subscribed subscription for this stream, this new one will just join it immediately - else if (subscribedSubs.length > 0) { - if (from === undefined) { - // Find the max received counter - var counters = subscribedSubs.map(function(it) { - return it.counter + + StreamrClient.prototype._requestResend = function(sub, resendOptions) { + // If overriding resendOptions are given, need to remove resend options in sub.options + var options = extend({}, sub.options) + if (resendOptions) { + Object.keys(options).forEach(function (key) { + if (key.match(/resend_.*/)) { + delete options[key] + } }) - from = Math.max.apply(Math, counters) } - debug('_requestSubscribe: another subscription for same stream: %s, insta-subscribing from %d', sub.streamId, from) + sub.resending = true - setTimeout(function() { - sub.trigger('subscribed', {from: from}) - }, 0) - } -} - -StreamrClient.prototype._requestUnsubscribe = function(streamId) { - debug("Client unsubscribing from %o", streamId) - this.socket.emit('unsubscribe', {channel: streamId}) -} - -StreamrClient.prototype._requestResend = function(sub, resendOptions) { - // If overriding resendOptions are given, need to remove resend options in sub.options - var options = extend({}, sub.options) - if (resendOptions) { - Object.keys(options).forEach(function (key) { - if (key.match(/resend_.*/)) { - delete options[key] - } - }) + var request = extend({}, options, resendOptions, {channel: sub.streamId, sub: sub.id}) + debug("_requestResend: %o", request) + this.socket.emit('resend', request) } - sub.resending = true - - var request = extend({}, options, resendOptions, {channel: sub.streamId, sub: sub.id}) - debug("_requestResend: %o", request) - this.socket.emit('resend', request) -} - -StreamrClient.prototype.handleError = function(msg) { - debug(msg) - this.trigger('error', msg) -} + StreamrClient.prototype.handleError = function(msg) { + debug(msg) + this.trigger('error', msg) + } -if (typeof module !== 'undefined' && module.exports) - module.exports = StreamrClient -else window.StreamrClient = StreamrClient + if (typeof module !== 'undefined' && module.exports) + module.exports = StreamrClient + else window.StreamrClient = StreamrClient })(typeof(exports) !== 'undefined' ? exports : window) \ No newline at end of file diff --git a/test/test.streamr-client.js b/test/test.streamr-client.js index c2cd1e79d..952042686 100644 --- a/test/test.streamr-client.js +++ b/test/test.streamr-client.js @@ -34,20 +34,41 @@ describe('StreamrClient', function() { asyncs = [] } - function msg(stream, counter, content, subId) { - var msg = {} - msg[STREAM_KEY] = stream - msg[COUNTER_KEY] = counter - - if (subId!==undefined) - msg[SUB_KEY] = subId - - if (content) - Object.keys(content).forEach(function(key) { - msg[key] = content[key] - }) - - return msg + var previousOffsetByStreamId = {} + + function msg(streamId, offset, content, subId) { + content = content || {} + + // unicast message to subscription + if (subId != null) { + var msg = [ + 28, // version + streamId, + Date.now(), // timestamp + offset, + undefined, // previousOffset + 27, // contentType (JSON) + content] + + return { + m: msg, + sub: subId + } + } + // broadcast message to all subscriptions + else { + var previousOffset = previousOffsetByStreamId[streamId] + previousOffsetByStreamId[streamId] = offset + + return [ + 28, // version + streamId, + Date.now(), // timestamp + offset, + previousOffset !== offset ? previousOffset : null, + 27, // contentType (JSON) + content] + } } function byeMsg(stream, counter) { @@ -113,6 +134,7 @@ describe('StreamrClient', function() { client = new StreamrClient() client.options.autoConnect = false client.options.autoDisconnect = false + previousOffsetByStreamId = {} }) after(function() { @@ -330,7 +352,7 @@ describe('StreamrClient', function() { }) }) - var subscription = client.subscribe("stream1", function(message) {}) + client.subscribe("stream1", function(message) {}) client.connect() client.bind('error', function(msg) { @@ -346,23 +368,6 @@ describe('StreamrClient', function() { client.subscribe("stream1", function(message) {}) }) - it('should set the expected counter to what the subscribed message says', function(done) { - var sub = client.subscribe("stream1", function(message) {}) - client.connect() - - client.socket.removeListener('subscribe', client.socket.defaultSubscribeHandler) - client.socket.on('subscribe', function(request) { - async(function() { - client.socket.emit('subscribed', {channel:'stream1', from:2}) - }) - }) - - client.socket.once('subscribed', function(response) { - assert.equal(sub.counter, 2) - done() - }) - }) - it('should send only one subscribe request to server even if there are multiple subscriptions for same stream', function(done) { var subscribeCount = 0 socket.on('subscribe', function(request) { @@ -389,34 +394,6 @@ describe('StreamrClient', function() { }) }) - it('should subscribe late subscriptions from the latest message', function(done) { - var sub1 = client.subscribe("stream1", function(sub1msg) { - if (sub1msg.n===0) { - async(function() { - var sub2 = client.subscribe('stream1', function(sub2msg) { - if (sub2msg.n===1) - done() - else throw "Unexpected message received by sub2: "+JSON.stringify(sub2msg) - }) - sub2.bind('subscribed', function(response) { - assert.equal(response.from, 1) - async(function() { - client.socket.emit('ui', msg('stream1', 1, {n:1})) - }) - }) - }) - } - }) - client.connect() - - sub1.bind('subscribed', function(response) { - async(function() { - client.socket.emit('ui', msg('stream1', 0, {n:0})) - }) - }) - - }) - }) describe("subscribe with resend options", function() { @@ -455,46 +432,15 @@ describe('StreamrClient', function() { client.socket.once('resend', function(request) { async(function() { - client.socket.emit('resending', {channel:'stream1', sub: request.sub, from:0 ,to:1}) - client.socket.emit('ui', msg('stream1', 0)) - client.socket.emit('ui', msg('stream1', 1)) - client.socket.emit('resent', {channel:'stream1', sub: request.sub, from:0, to:1}) + client.socket.emit('resending', {channel:'stream1', sub: request.sub}) + client.socket.emit('u', msg('stream1', 0, request.sub)) + client.socket.emit('resent', {channel:'stream1', sub: request.sub}) }) client.socket.once('subscribe', function(request) { - assert.equal(request.from, 2) - done() - }) - }) - }) - - it('should request resend after subscribe if the counter subscribed from is not the expected value', function(done) { - socket.removeListener('subscribe', socket.defaultSubscribeHandler) - - client.subscribe("stream1", function(message) {}, {resend_all:true}) - client.connect() - - client.socket.once('resend', function(request) { - async(function() { - client.socket.emit('resending', {channel:'stream1', sub: request.sub, from:0 ,to:1}) - client.socket.emit('ui', msg('stream1', 0)) - client.socket.emit('ui', msg('stream1', 1)) - client.socket.emit('resent', {channel:'stream1', sub: request.sub, from:0, to:1}) - }) - }) - - client.socket.once('subscribe', function(request) { - assert.equal(request.from, 2) - async(function() { - client.socket.emit('subscribed', {channel:'stream1', from: 10}) - }) - client.socket.once('resend', function(request) { - assert.equal(request.resend_from, 2) - assert.equal(request.resend_to, 9) done() }) }) - }) it('should subscribe to the channel after no_resend', function(done) { @@ -503,11 +449,10 @@ describe('StreamrClient', function() { client.socket.once('resend', function(request) { async(function() { - client.socket.emit('no_resend', {channel:'stream1', sub: request.sub, next:7}) + client.socket.emit('no_resend', {channel:'stream1', sub: request.sub}) }) client.socket.once('subscribe', function(request) { - assert.equal(request.from, 7) done() }) }) @@ -530,20 +475,20 @@ describe('StreamrClient', function() { client.connect() - client.socket.once('resend', function(request) { + client.socket.on('resend', function(request) { if (request.resend_all) { async(function() { - client.socket.emit('resending', {channel:'stream1', sub: request.sub, from:0 ,to:1}) - client.socket.emit('ui', msg('stream1', 0)) - client.socket.emit('ui', msg('stream1', 1)) - client.socket.emit('resent', {channel:'stream1', sub: request.sub, from:0, to:1}) + client.socket.emit('resending', {channel:'stream1', sub: request.sub}) + client.socket.emit('u', msg('stream1', 0, request.sub)) + client.socket.emit('u', msg('stream1', 1, request.sub)) + client.socket.emit('resent', {channel:'stream1', sub: request.sub}) }) } else if (request.resend_last===1) { async(function() { - client.socket.emit('resending', {channel:'stream1', sub: request.sub, from:1 ,to:1}) - client.socket.emit('ui', msg('stream1', 1)) - client.socket.emit('resent', {channel:'stream1', sub: request.sub, from:1, to:1}) + client.socket.emit('resending', {channel:'stream1', sub: request.sub}) + client.socket.emit('ui', msg('stream1', 1, request.sub)) + client.socket.emit('resent', {channel:'stream1', sub: request.sub}) }) } }) @@ -555,7 +500,6 @@ describe('StreamrClient', function() { if (subCount>1) throw "Only one subscribe should have been sent" - assert.equal(request.from, 2) done() }) }) @@ -566,7 +510,7 @@ describe('StreamrClient', function() { client.socket.once('resend', function(request) { async(function() { - client.socket.emit('resending', {channel:'stream1', sub: request.sub, from:0 ,to:1}) + client.socket.emit('resending', {channel:'stream1', sub: request.sub}) }) sub1.bind('subscribed', function(response) { @@ -583,7 +527,7 @@ describe('StreamrClient', function() { }) }) - it('should not subscribe or crash on resent if bye message is received', function(done) { + it('should not subscribe or crash on resent if bye message is received while resending', function(done) { var sub = client.subscribe("stream1", function(message) {}, {resend_all:true}) client.connect() @@ -592,9 +536,9 @@ describe('StreamrClient', function() { }) client.socket.once('resend', function(request) { async(function() { - client.socket.emit('resending', {channel:'stream1', from:0 ,to:0, sub:sub.id}) - client.socket.emit('ui', byeMsg('stream1', 0)) - client.socket.emit('resent', {channel:'stream1', from:0, to:0, sub:sub.id}) + client.socket.emit('resending', {channel:'stream1', sub:sub.id}) + client.socket.emit('b', byeMsg('stream1', 0)) + client.socket.emit('resent', {channel:'stream1', sub:sub.id}) done() }) }) @@ -610,10 +554,10 @@ describe('StreamrClient', function() { }) client.socket.once('resend', function(request) { async(function() { - client.socket.emit('resending', {channel:'stream1', from:0 ,to:1, sub:sub.id}) - client.socket.emit('ui', byeMsg('stream1', 0)) - client.socket.emit('ui', msg('stream1', 1, {})) - client.socket.emit('resent', {channel:'stream1', from:0, to:1, sub:sub.id}) + client.socket.emit('resending', {channel:'stream1', sub:sub.id}) + client.socket.emit('b', byeMsg('stream1', 0)) + client.socket.emit('u', msg('stream1', 1, sub.id)) + client.socket.emit('resent', {channel:'stream1', sub:sub.id}) done() }) }) @@ -622,13 +566,14 @@ describe('StreamrClient', function() { }) describe("message handling", function() { - it('should call the callback when a message is received with correct counter', function(done) { + + it('should call the callback when a message is received', function(done) { var sub = client.subscribe("stream1", function(message) { done() }) client.connect() client.socket.once('subscribed', function() { - client.socket.emit('ui', msg("stream1", 0)) + client.socket.emit('b', msg("stream1", 0)) }) }) @@ -643,9 +588,9 @@ describe('StreamrClient', function() { client.socket.once('subscribed', function() { // Fake messages - client.socket.emit('ui', msg("stream1",0)) - client.socket.emit('ui', msg("stream1",0)) - client.socket.emit('ui', msg("stream1",0)) + client.socket.emit('b', msg("stream1",0)) + client.socket.emit('b', msg("stream1",0)) + client.socket.emit('b', msg("stream1",0)) done() }) }) @@ -664,9 +609,9 @@ describe('StreamrClient', function() { client.connect() client.socket.once('subscribed', function() { - client.socket.emit('ui', msg("stream1", 0, {count:0})) - client.socket.emit('ui', msg("stream1", 1, {count:1})) - client.socket.emit('ui', msg("stream1", 2, {count:2})) + client.socket.emit('b', msg("stream1", 0, {count:0})) + client.socket.emit('b', msg("stream1", 1, {count:1})) + client.socket.emit('b', msg("stream1", 2, {count:2})) }) }) @@ -678,7 +623,7 @@ describe('StreamrClient', function() { client.connect() client.socket.once('subscribed', function() { - client.socket.emit('ui', byeMsg("stream1", 0)) + client.socket.emit('b', byeMsg("stream1", 0)) }) client.socket.once('unsubscribed', function(response) { @@ -701,9 +646,10 @@ describe('StreamrClient', function() { client.connect() sub2.bind('subscribed', function() { assert.throws(function() { - client.socket.emit('ui', msg('stream1', 0, {}))//, sub2.id)) + // Received by sub2 + client.socket.emit('b', msg('stream1', 0, {})) }) - client.socket.emit('ui', msg('stream1', 0, {}, sub2.id)) + client.socket.emit('u', msg('stream1', 0, {}, sub2.id)) }) }) @@ -716,7 +662,7 @@ describe('StreamrClient', function() { client.connect() client.socket.once('subscribed', function() { - client.socket.emit('ui', msg("stream1", 0, {count:0})) + client.socket.emit('b', msg("stream1", 0, {count:0})) }) }) @@ -847,7 +793,7 @@ describe('StreamrClient', function() { done() }) newSub.bind('subscribed', function() { - client.socket.emit('ui', msg("stream1", 0, {})) + client.socket.emit('b', msg("stream1", 0, {})) }) }) }) @@ -883,12 +829,10 @@ describe('StreamrClient', function() { async(function() { client.socket.emit('resending', { channel: request.channel, - sub: request.sub, - from: 0, - to: 0 + sub: request.sub }) - client.socket.emit('ui', byeMsg(request.channel,0)) - client.socket.emit('resent', {channel: request.channel, sub: request.sub, from:0, to:0}) + client.socket.emit('b', byeMsg(request.channel,0)) + client.socket.emit('resent', {channel: request.channel, sub: request.sub}) }) }) @@ -1074,14 +1018,12 @@ describe('StreamrClient', function() { function resend(channel, sub, from, to) { client.socket.emit('resending', { channel: channel, - sub: sub, - from: from, - to: to + sub: sub }) for (var i=from;i<=to;i++) { - client.socket.emit('ui', msg(channel,i)) + client.socket.emit('u', msg(channel, i, {}, sub)) } - client.socket.emit('resent', {channel: channel, sub: sub, from:from, to:to}) + client.socket.emit('resent', {channel: channel, sub: sub}) } socket.defaultResendHandler = function(request) { @@ -1097,7 +1039,7 @@ describe('StreamrClient', function() { resend(request.channel, request.sub, 99, 100) } else if (resendLimits[request.channel]===undefined) - client.socket.emit('no_resend', {channel: request.channel, sub: request.sub, next: 0}) + client.socket.emit('no_resend', {channel: request.channel, sub: request.sub}) else resend(request.channel, request.sub, resendLimits[request.channel].from, resendLimits[request.channel].to) }) @@ -1171,7 +1113,8 @@ describe('StreamrClient', function() { client.subscribe("stream1", function(message) {}, {resend_from_time: "invalid"}) }) }) - + + // TODO: jatka tästä it('should emit a resend request if the first message is not the expected one', function(done) { client.subscribe("stream1", function(message) {}) client.connect() @@ -1180,7 +1123,7 @@ describe('StreamrClient', function() { client.socket.once('subscribed', function() { assert(client.socket.defaultResendHandler!=null) - client.socket.emit('ui', msg("stream1",2)) + client.socket.emit('b', msg("stream1",2)) }) client.socket.once('resent', function() { @@ -1195,8 +1138,8 @@ describe('StreamrClient', function() { validResendRequests.push({channel:"stream1", resend_from:1, resend_to:9}) client.socket.once('subscribed', function() { - client.socket.emit('ui', msg("stream1",0)) - client.socket.emit('ui', msg("stream1",10)) + client.socket.emit('b', msg("stream1",0)) + client.socket.emit('b', msg("stream1",10)) }) client.socket.once('resent', function() { @@ -1211,8 +1154,8 @@ describe('StreamrClient', function() { validResendRequests.push({channel:"stream1", resend_from:1, resend_to:9}) client.socket.once('subscribed', function() { - client.socket.emit('ui', msg("stream1",0)) - client.socket.emit('ui', msg("stream1",10)) + client.socket.emit('b', msg("stream1",0)) + client.socket.emit('b', msg("stream1",10)) }) client.socket.once('resend', function(request) { @@ -1232,8 +1175,8 @@ describe('StreamrClient', function() { validResendRequests.push({channel:"stream1", resend_from:1, resend_to:1}) client.socket.once('subscribed', function() { - client.socket.emit('ui', msg("stream1",0)) - client.socket.emit('ui', msg("stream1",2)) + client.socket.emit('b', msg("stream1",0)) + client.socket.emit('b', msg("stream1",2)) }) client.socket.on('resend', function(request) { @@ -1253,9 +1196,9 @@ describe('StreamrClient', function() { validResendRequests.push({channel:"stream1", resend_from:1, resend_to:9}) client.socket.once('subscribed', function() { - client.socket.emit('ui', msg("stream1",0)) - client.socket.emit('ui', msg("stream1",10)) - client.socket.emit('ui', msg("stream1",11)) + client.socket.emit('b', msg("stream1",0)) + client.socket.emit('b', msg("stream1",10)) + client.socket.emit('b', msg("stream1",11)) }) var counter = 0 @@ -1279,10 +1222,10 @@ describe('StreamrClient', function() { validResendRequests.push({channel:"stream1", resend_from:1, resend_to:9}) client.socket.once('subscribed', function() { - client.socket.emit('ui', msg("stream1", 0, {counter: 0})) - client.socket.emit('ui', msg("stream1",10, {counter: 10})) - client.socket.emit('ui', msg("stream1",11, {counter: 11})) - client.socket.emit('ui', msg("stream1",12, {counter: 12})) + client.socket.emit('b', msg("stream1", 0, {counter: 0})) + client.socket.emit('b', msg("stream1",10, {counter: 10})) + client.socket.emit('b', msg("stream1",11, {counter: 11})) + client.socket.emit('b', msg("stream1",12, {counter: 12})) }) }) @@ -1296,12 +1239,12 @@ describe('StreamrClient', function() { validResendRequests.push({channel:"stream1", resend_from:1, resend_to:9}) client.socket.once('subscribed', function() { - client.socket.emit('ui', msg("stream1", 0, {counter: 0})) - client.socket.emit('ui', msg("stream1", 10, {counter: 10})) - client.socket.emit('ui', msg("stream1", 11, {counter: 11})) - client.socket.emit('ui', msg("stream1", 11, {counter: 11})) // bogus message - client.socket.emit('ui', msg("stream1", 5, {counter: 5})) // bogus message - client.socket.emit('ui', msg("stream1", 12, {counter: 12})) + client.socket.emit('b', msg("stream1", 0, {counter: 0})) + client.socket.emit('b', msg("stream1", 10, {counter: 10})) + client.socket.emit('b', msg("stream1", 11, {counter: 11})) + client.socket.emit('b', msg("stream1", 11, {counter: 11})) // bogus message + client.socket.emit('b', msg("stream1", 5, {counter: 5})) // bogus message + client.socket.emit('b', msg("stream1", 12, {counter: 12})) }) }) @@ -1316,9 +1259,9 @@ describe('StreamrClient', function() { validResendRequests.push({channel:"stream1", resend_from:11, resend_to:11}) client.socket.once('subscribed', function() { - client.socket.emit('ui', msg("stream1", 0, {counter: 0})) - client.socket.emit('ui', msg("stream1", 10, {counter: 10})) - client.socket.emit('ui', msg("stream1", 12, {counter: 12})) + client.socket.emit('b', msg("stream1", 0, {counter: 0})) + client.socket.emit('b', msg("stream1", 10, {counter: 10})) + client.socket.emit('b', msg("stream1", 12, {counter: 12})) }) }) @@ -1333,15 +1276,15 @@ describe('StreamrClient', function() { client.socket.on('subscribed', function(response) { if (response.channel==='stream1') { - client.socket.emit('ui', msg("stream1",0)) - client.socket.emit('ui', msg("stream1",1)) - client.socket.emit('ui', msg("stream1",2)) + client.socket.emit('b', msg("stream1",0)) + client.socket.emit('b', msg("stream1",1)) + client.socket.emit('b', msg("stream1",2)) } else if (response.channel==='stream2') { - client.socket.emit('ui', msg("stream2",0)) + client.socket.emit('b', msg("stream2",0)) } else if (response.channel==='stream3') { - client.socket.emit('ui', msg("stream3",0)) + client.socket.emit('b', msg("stream3",0)) } client.socket.emit('disconnect') }) From e74318eadddf15bb7330ed951c9a275859f6066b Mon Sep 17 00:00:00 2001 From: Henri Pihkala Date: Fri, 9 Sep 2016 15:59:12 +0300 Subject: [PATCH 2/7] tests passing --- streamr-client.js | 22 +++------- test/test.streamr-client.js | 88 ++++++++++++++++++------------------- 2 files changed, 50 insertions(+), 60 deletions(-) diff --git a/streamr-client.js b/streamr-client.js index 60157904c..4a40f6cb7 100644 --- a/streamr-client.js +++ b/streamr-client.js @@ -186,9 +186,9 @@ var content = getMessageField('content', msg) var timestamp = getMessageField('timestamp', msg) var offset = getMessageField('offset', msg) - var prevOffset = getMessageField('prevOffset', msg) + var previousOffset = getMessageField('previousOffset', msg) - if (prevOffset == null) { + if (previousOffset == null) { debug("handleMessage: prevOffset is null, gap detection is impossible! message: %o", msg) } @@ -196,12 +196,12 @@ debug("handleMessage: lastReceivedOffset %d", this.lastReceivedOffset) // Check for gaps - if (prevOffset != null && this.lastReceivedOffset != null && prevOffset > this.lastReceivedOffset) { + if (previousOffset != null && this.lastReceivedOffset != null && previousOffset > this.lastReceivedOffset) { this.queue.push(msg) if (!this.resending) { var from = this.lastReceivedOffset + 1 - var to = prevOffset + var to = previousOffset debug("Gap detected, requesting resend for stream %s from %d to %d", this.streamId, from, to) this.trigger('gap', from, to) } @@ -225,21 +225,11 @@ debug("Attempting to process %d queued messages for stream %s", this.queue.length, this.streamId) var i - for (i=0;i Date: Mon, 12 Sep 2016 00:36:43 +0300 Subject: [PATCH 3/7] improve resends and tests --- package.json | 5 +- streamr-client.js | 39 +++++++- test/test.streamr-client.js | 185 +++++++++++++++++++++++++++++------- 3 files changed, 187 insertions(+), 42 deletions(-) diff --git a/package.json b/package.json index 2cc2dd5d8..2430b9b1b 100644 --- a/package.json +++ b/package.json @@ -16,8 +16,9 @@ "author": "", "license": "", "dependencies": { - "socket.io-client": "1.3.7", - "debug": "*" + "debug": "*", + "sinon": "^1.17.5", + "socket.io-client": "1.3.7" }, "devDependencies": { "mocha": "*", diff --git a/streamr-client.js b/streamr-client.js index 4a40f6cb7..a2c44ebc3 100644 --- a/streamr-client.js +++ b/streamr-client.js @@ -192,11 +192,12 @@ debug("handleMessage: prevOffset is null, gap detection is impossible! message: %o", msg) } - debug("handleMessage: %o", msg) - debug("handleMessage: lastReceivedOffset %d", this.lastReceivedOffset) + // Gap check + if (previousOffset != null && // previousOffset is required to check for gaps + this.lastReceivedOffset != null && // and we need to know what msg was the previous one + previousOffset > this.lastReceivedOffset && // previous message had larger offset than our previous msg => gap! + !(this.options.resend_last != null && this.resending)) { // don't mind gaps when resending resend_last - // Check for gaps - if (previousOffset != null && this.lastReceivedOffset != null && previousOffset > this.lastReceivedOffset) { this.queue.push(msg) if (!this.resending) { @@ -237,6 +238,34 @@ return this.options.resend_all===true || this.options.resend_from >= 0 || this.options.resend_from_time >= 0 || this.options.resend_last > 0 } + /** + * Resend needs can change if messages have already been received. + * This function always returns the effective resend options: + * + * If messages have been received: + * - resend_all becomes resend_from + * - resend_from becomes resend_from the latest received message + * - resend_from_time becomes resend_from the latest received message + * - resend_last stays the same + */ + Subscription.prototype.getEffectiveResendOptions = function() { + if (this.hasReceivedMessages() && this.hasResendOptions()) { + if (this.options.resend_all || this.options.resend_from || this.options.resend_from_time) { + return { resend_from: this.lastReceivedOffset + 1 } + } + else if (this.options.resend_last) { + return this.options + } + } + else { + return this.options + } + } + + Subscription.prototype.hasReceivedMessages = function() { + return this.lastReceivedOffset != null + } + Subscription.prototype.isSubscribed = function() { return this.subscribed } @@ -585,7 +614,7 @@ StreamrClient.prototype._requestResend = function(sub, resendOptions) { // If overriding resendOptions are given, need to remove resend options in sub.options - var options = extend({}, sub.options) + var options = extend({}, sub.getEffectiveResendOptions()) if (resendOptions) { Object.keys(options).forEach(function (key) { if (key.match(/resend_.*/)) { diff --git a/test/test.streamr-client.js b/test/test.streamr-client.js index 4e860f396..dfe329d0f 100644 --- a/test/test.streamr-client.js +++ b/test/test.streamr-client.js @@ -1,6 +1,7 @@ var assert = require('assert'), events = require('eventemitter2'), - mockery = require('mockery') + mockery = require('mockery'), + sinon = require('sinon') var STREAM_KEY = "_S" var COUNTER_KEY = "_C" @@ -46,7 +47,7 @@ describe('StreamrClient', function() { streamId, Date.now(), // timestamp offset, - undefined, // previousOffset + forcePreviousOffset, // previousOffset 27, // contentType (JSON) content] @@ -1033,15 +1034,36 @@ describe('StreamrClient', function() { checkResendRequest(request) async(function() { - if (request.resend_from!=null && request.resend_to!=null) + console.log("Mock resend handler handling request: %o", request) + if (request.resend_all) { + if (resendLimits[request.channel]===undefined) { + client.socket.emit('no_resend', {channel: request.channel, sub: request.sub}) + } + else { + resend(request.channel, request.sub, resendLimits[request.channel].from, resendLimits[request.channel].to) + } + } + else if (request.resend_last) { + if (resendLimits[request.channel] === undefined) { + throw "Testing resend_last needs resendLimits.channel.to" + } + resend(request.channel, request.sub, resendLimits[request.channel].to - (request.resend_last - 1), resendLimits[request.channel].to) + } + else if (request.resend_from!=null && request.resend_to!=null) { resend(request.channel, request.sub, request.resend_from, request.resend_to) + } + else if (request.resend_from!=null) { + if (resendLimits[request.channel] === undefined) { + throw "Testing resend_from needs resendLimits.channel.to" + } + resend(request.channel, request.sub, request.resend_from, resendLimits[request.channel].to) + } else if (request.resend_from_time!=null) { resend(request.channel, request.sub, 99, 100) } - else if (resendLimits[request.channel]===undefined) - client.socket.emit('no_resend', {channel: request.channel, sub: request.sub}) - else - resend(request.channel, request.sub, resendLimits[request.channel].from, resendLimits[request.channel].to) + else { + throw "Unknown kind of resend request: "+JSON.stringify(request) + } }) } socket.on('resend', socket.defaultResendHandler) @@ -1265,46 +1287,139 @@ describe('StreamrClient', function() { client.socket.emit('b', msg("stream1", 12, {counter: 12}, undefined, 11)) }) }) - - it('should re-request from the latest counter on reconnect', function(done) { - var sub1 = client.subscribe("stream1", function(message) {}, {resend_all:true}) - var sub2 = client.subscribe("stream2", function(message) {}, {resend_from:0}) - var sub3 = client.subscribe("stream3", function(message) {}) // no resend for stream3 - client.connect() - validResendRequests.push({channel:"stream1", resend_all:true}) - validResendRequests.push({channel:"stream2", resend_from:0}) + describe('on reconnect', function() { + var msgHandler + beforeEach(function() { + msgHandler = sinon.spy() + }) - client.socket.on('subscribed', function(response) { - if (response.channel==='stream1') { - client.socket.emit('b', msg("stream1", 0)) - client.socket.emit('b', msg("stream1", 1, {}, undefined, 0)) - client.socket.emit('b', msg("stream1", 2, {}, undefined, 1)) - } - else if (response.channel==='stream2') { - client.socket.emit('b', msg("stream2", 0)) + it('no resend', function(done) { + client.subscribe("stream", msgHandler) + client.connect() + + client.socket.on('subscribed', function(response) { + client.socket.emit('b', msg("stream", 0)) + client.socket.emit('disconnect') + }) + + client.socket.once('disconnect', function() { + client.connect() + + socket.on('resend', function() { + throw "Should not have made a resend request!" + }) + + socket.on('subscribed', function() { + assert.equal(msgHandler.callCount, 1) + done() + }) + }) + }) + + it('resend_all', function(done) { + validResendRequests.push({channel:"stream", resend_all: true}) + resendLimits["stream"] = { + from: 0, + to: 5 } - else if (response.channel==='stream3') { - client.socket.emit('b', msg("stream3", 0)) - client.socket.emit('disconnect') + + client.subscribe("stream", msgHandler, { resend_all: true }) + client.connect() + + client.socket.on('subscribed', function(response) { + client.socket.emit('disconnect') + }) + + client.socket.once('disconnect', function() { + client.connect() + + socket.on('resend', function(request) { + assert.equal(request.resend_from, 6) + assert.equal(request.resend_to, undefined) + done() + }) + }) + }) + + it('resend_from', function(done) { + validResendRequests.push({channel:"stream", resend_from: 3}) + resendLimits["stream"] = { + from: 0, + to: 5 } + + client.subscribe("stream", msgHandler, { resend_from: 3 }) + client.connect() + + client.socket.on('subscribed', function(response) { + client.socket.emit('disconnect') + }) + + client.socket.once('disconnect', function() { + client.connect() + + socket.on('resend', function(request) { + assert.equal(request.resend_from, 6) + assert.equal(request.resend_to, undefined) + done() + }) + }) }) - client.socket.once('disconnect', function() { + it('resend_last', function(done) { + validResendRequests.push({channel:"stream", resend_last: 1}) + resendLimits["stream"] = { + from: 0, + to: 5 + } + + client.subscribe("stream", msgHandler, { resend_last: 1 }) client.connect() - socket.on('subscribe', function(request) { - if (request.channel==='stream1' && request.from !== 3) - throw "Wrong starting index for "+request.channel+": "+request.from - else if (request.channel==='stream2' && request.from !== 1) - throw "Wrong starting index for "+request.channel+": "+request.from - else if (request.channel==='stream3' && request.from !== undefined) - throw "Should not have specified the from field for stream3: "+request.from + client.socket.on('subscribed', function(response) { + client.socket.emit('disconnect') + }) + + client.socket.once('disconnect', function() { + client.connect() + + socket.on('resend', function(request) { + assert.equal(request.resend_last, 1) + done() + }) }) + }) + + it('resend_last should accept a gap on reconnect', function(done) { + validResendRequests.push({channel:"stream", resend_last: 1}) + resendLimits["stream"] = { + from: 0, + to: 0 + } + + client.subscribe("stream", msgHandler, { resend_last: 1 }) + client.connect() client.socket.on('subscribed', function(response) { - if (sub1.isSubscribed() && sub2.isSubscribed() && sub3.isSubscribed()) + socket.off('resend', socket.defaultResendHandler) + client.socket.emit('disconnect') + }) + + client.socket.once('disconnect', function() { + client.connect() + + socket.on('resend', function(request) { + assert.equal(request.resend_last, 1) + client.socket.emit('resending', { + channel: request.channel, + sub: request.sub + }) + client.socket.emit('u', msg(request.channel, 10, {}, request.sub, 9)) + client.socket.emit('resent', {channel: request.channel, sub: request.sub}) + assert.equal(msgHandler.callCount, 2) done() + }) }) }) }) From de82224e83dc3bca4abcb54866e3ab70e7c590aa Mon Sep 17 00:00:00 2001 From: Henri Pihkala Date: Wed, 28 Sep 2016 17:08:47 +0300 Subject: [PATCH 4/7] update protocol fields --- streamr-client.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streamr-client.js b/streamr-client.js index a2c44ebc3..f3d17f82a 100644 --- a/streamr-client.js +++ b/streamr-client.js @@ -83,7 +83,7 @@ }; var versionFields = { - '28': ['version', 'streamId', 'timestamp', 'offset', 'previousOffset', 'contentType', 'content'] + '28': ['version', 'streamId', 'streamPartition', 'timestamp', 'ttl', 'offset', 'previousOffset', 'contentType', 'content'] } function getMessageField(field, msg) { From d849b013df4c22578c8c31adb95762daa3ed04c9 Mon Sep 17 00:00:00 2001 From: Henri Pihkala Date: Fri, 30 Sep 2016 18:07:27 +0300 Subject: [PATCH 5/7] support server path --- streamr-client.js | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/streamr-client.js b/streamr-client.js index f3d17f82a..e5ed1fb51 100644 --- a/streamr-client.js +++ b/streamr-client.js @@ -414,10 +414,8 @@ this.connecting = true this.disconnecting = false - this.socket = this.io(this.options.server, { - forceNew: true, - transports: this.options.transports - }) + var options = extend({}, this.options, {forceNew: true}) + this.socket = this.io(this.options.server, options) // Broadcast messages to all subs listening on stream this.socket.on('b', function(msg) { From 6ad5bb94a0244cf717646e6c389df3d92a397420 Mon Sep 17 00:00:00 2001 From: Henri Pihkala Date: Tue, 4 Oct 2016 15:55:19 +0300 Subject: [PATCH 6/7] default path --- streamr-client.js | 1 + 1 file changed, 1 insertion(+) diff --git a/streamr-client.js b/streamr-client.js index e5ed1fb51..d632cd24f 100644 --- a/streamr-client.js +++ b/streamr-client.js @@ -275,6 +275,7 @@ this.options = { // The server to connect to server: "https://data.streamr.com", + path: "/api/v1/socket.io", // Automatically connect on first subscribe autoConnect: true, // Automatically disconnect on last unsubscribe From 51f8cf6a470b4aa615523273e444f5ee6239b12d Mon Sep 17 00:00:00 2001 From: Henri Pihkala Date: Fri, 16 Dec 2016 22:02:26 +0200 Subject: [PATCH 7/7] fix tests, start using EventEmitter3 --- package.json | 6 +- streamr-client.js | 504 ++++++++++++++++++++++++++++-------- test/test.streamr-client.js | 145 ++++------- 3 files changed, 446 insertions(+), 209 deletions(-) diff --git a/package.json b/package.json index 2430b9b1b..2ef5636ad 100644 --- a/package.json +++ b/package.json @@ -17,12 +17,12 @@ "license": "", "dependencies": { "debug": "*", - "sinon": "^1.17.5", + "eventemitter3": "*", "socket.io-client": "1.3.7" }, "devDependencies": { "mocha": "*", - "eventemitter2": "*", - "mockery": "*" + "mockery": "*", + "sinon": "^1.17.5" } } diff --git a/streamr-client.js b/streamr-client.js index d632cd24f..9f17012ce 100644 --- a/streamr-client.js +++ b/streamr-client.js @@ -26,55 +26,318 @@ return arguments[0]; } - /** - * MicroEvent - to make any js object an event emitter (server or browser) - * - * - pure javascript - server compatible, browser compatible - * - dont rely on the browser doms - * - super simple - you get it immediatly, no mistery, no magic involved - * - * - create a MicroEventDebug with goodies to debug - * - make it safer to use - */ - - var MicroEvent = function(){}; - MicroEvent.prototype = { - bind : function(event, fct){ - this._events = this._events || {}; - this._events[event] = this._events[event] || []; - this._events[event].push(fct); - }, - unbind : function(event, fct){ - this._events = this._events || {}; - if( event in this._events === false ) return; - this._events[event].splice(this._events[event].indexOf(fct), 1); - }, - trigger : function(event /* , args... */){ - this._events = this._events || {}; - if( event in this._events === false ) return; - for(var i = 0; i < this._events[event].length; i++){ - this._events[event][i].apply(this, Array.prototype.slice.call(arguments, 1)); - } + + var EventEmitter = (function() { + 'use strict'; + + var has = Object.prototype.hasOwnProperty + , prefix = '~'; + + /** + * Constructor to create a storage for our `EE` objects. + * An `Events` instance is a plain object whose properties are event names. + * + * @constructor + * @api private + */ + function Events() {} + +// +// We try to not inherit from `Object.prototype`. In some engines creating an +// instance in this way is faster than calling `Object.create(null)` directly. +// If `Object.create(null)` is not supported we prefix the event names with a +// character to make sure that the built-in object properties are not +// overridden or used as an attack vector. +// + if (Object.create) { + Events.prototype = Object.create(null); + + // + // This hack is needed because the `__proto__` property is still inherited in + // some old browsers like Android 4, iPhone 5.1, Opera 11 and Safari 5. + // + if (!new Events().__proto__) prefix = false; } - }; - /** - * mixin will delegate all MicroEvent.js function in the destination object - * - * - require('MicroEvent').mixin(Foobar) will make Foobar able to use MicroEvent - * - * @param {Object} the object which will support MicroEvent - */ - MicroEvent.mixin = function(destObject) { - var props = ['bind', 'unbind', 'trigger']; - for(var i = 0; i < props.length; i ++){ - if( typeof destObject === 'function' ){ - destObject.prototype[props[i]] = MicroEvent.prototype[props[i]]; - }else{ - destObject[props[i]] = MicroEvent.prototype[props[i]]; - } + /** + * Representation of a single event listener. + * + * @param {Function} fn The listener function. + * @param {Mixed} context The context to invoke the listener with. + * @param {Boolean} [once=false] Specify if the listener is a one-time listener. + * @constructor + * @api private + */ + function EE(fn, context, once) { + this.fn = fn; + this.context = context; + this.once = once || false; } - } + + /** + * Minimal `EventEmitter` interface that is molded against the Node.js + * `EventEmitter` interface. + * + * @constructor + * @api public + */ + function EventEmitter() { + this._events = new Events(); + this._eventsCount = 0; + } + + /** + * Return an array listing the events for which the emitter has registered + * listeners. + * + * @returns {Array} + * @api public + */ + EventEmitter.prototype.eventNames = function eventNames() { + var names = [] + , events + , name; + + if (this._eventsCount === 0) return names; + + for (name in (events = this._events)) { + if (has.call(events, name)) names.push(prefix ? name.slice(1) : name); + } + + if (Object.getOwnPropertySymbols) { + return names.concat(Object.getOwnPropertySymbols(events)); + } + + return names; + }; + + /** + * Return the listeners registered for a given event. + * + * @param {String|Symbol} event The event name. + * @param {Boolean} exists Only check if there are listeners. + * @returns {Array|Boolean} + * @api public + */ + EventEmitter.prototype.listeners = function listeners(event, exists) { + var evt = prefix ? prefix + event : event + , available = this._events[evt]; + + if (exists) return !!available; + if (!available) return []; + if (available.fn) return [available.fn]; + + for (var i = 0, l = available.length, ee = new Array(l); i < l; i++) { + ee[i] = available[i].fn; + } + + return ee; + }; + + /** + * Calls each of the listeners registered for a given event. + * + * @param {String|Symbol} event The event name. + * @returns {Boolean} `true` if the event had listeners, else `false`. + * @api public + */ + EventEmitter.prototype.emit = function emit(event, a1, a2, a3, a4, a5) { + var evt = prefix ? prefix + event : event; + + if (!this._events[evt]) return false; + + var listeners = this._events[evt] + , len = arguments.length + , args + , i; + + if (listeners.fn) { + if (listeners.once) this.removeListener(event, listeners.fn, undefined, true); + + switch (len) { + case 1: return listeners.fn.call(listeners.context), true; + case 2: return listeners.fn.call(listeners.context, a1), true; + case 3: return listeners.fn.call(listeners.context, a1, a2), true; + case 4: return listeners.fn.call(listeners.context, a1, a2, a3), true; + case 5: return listeners.fn.call(listeners.context, a1, a2, a3, a4), true; + case 6: return listeners.fn.call(listeners.context, a1, a2, a3, a4, a5), true; + } + + for (i = 1, args = new Array(len -1); i < len; i++) { + args[i - 1] = arguments[i]; + } + + listeners.fn.apply(listeners.context, args); + } else { + var length = listeners.length + , j; + + for (i = 0; i < length; i++) { + if (listeners[i].once) this.removeListener(event, listeners[i].fn, undefined, true); + + switch (len) { + case 1: listeners[i].fn.call(listeners[i].context); break; + case 2: listeners[i].fn.call(listeners[i].context, a1); break; + case 3: listeners[i].fn.call(listeners[i].context, a1, a2); break; + case 4: listeners[i].fn.call(listeners[i].context, a1, a2, a3); break; + default: + if (!args) for (j = 1, args = new Array(len -1); j < len; j++) { + args[j - 1] = arguments[j]; + } + + listeners[i].fn.apply(listeners[i].context, args); + } + } + } + + return true; + }; + + /** + * Add a listener for a given event. + * + * @param {String|Symbol} event The event name. + * @param {Function} fn The listener function. + * @param {Mixed} [context=this] The context to invoke the listener with. + * @returns {EventEmitter} `this`. + * @api public + */ + EventEmitter.prototype.on = function on(event, fn, context) { + var listener = new EE(fn, context || this) + , evt = prefix ? prefix + event : event; + + if (!this._events[evt]) this._events[evt] = listener, this._eventsCount++; + else if (!this._events[evt].fn) this._events[evt].push(listener); + else this._events[evt] = [this._events[evt], listener]; + + return this; + }; + + /** + * Add a one-time listener for a given event. + * + * @param {String|Symbol} event The event name. + * @param {Function} fn The listener function. + * @param {Mixed} [context=this] The context to invoke the listener with. + * @returns {EventEmitter} `this`. + * @api public + */ + EventEmitter.prototype.once = function once(event, fn, context) { + var listener = new EE(fn, context || this, true) + , evt = prefix ? prefix + event : event; + + if (!this._events[evt]) this._events[evt] = listener, this._eventsCount++; + else if (!this._events[evt].fn) this._events[evt].push(listener); + else this._events[evt] = [this._events[evt], listener]; + + return this; + }; + + /** + * Remove the listeners of a given event. + * + * @param {String|Symbol} event The event name. + * @param {Function} fn Only remove the listeners that match this function. + * @param {Mixed} context Only remove the listeners that have this context. + * @param {Boolean} once Only remove one-time listeners. + * @returns {EventEmitter} `this`. + * @api public + */ + EventEmitter.prototype.removeListener = function removeListener(event, fn, context, once) { + var evt = prefix ? prefix + event : event; + + if (!this._events[evt]) return this; + if (!fn) { + if (--this._eventsCount === 0) this._events = new Events(); + else delete this._events[evt]; + return this; + } + + var listeners = this._events[evt]; + + if (listeners.fn) { + if ( + listeners.fn === fn + && (!once || listeners.once) + && (!context || listeners.context === context) + ) { + if (--this._eventsCount === 0) this._events = new Events(); + else delete this._events[evt]; + } + } else { + for (var i = 0, events = [], length = listeners.length; i < length; i++) { + if ( + listeners[i].fn !== fn + || (once && !listeners[i].once) + || (context && listeners[i].context !== context) + ) { + events.push(listeners[i]); + } + } + + // + // Reset the array, or remove it completely if we have no more listeners. + // + if (events.length) this._events[evt] = events.length === 1 ? events[0] : events; + else if (--this._eventsCount === 0) this._events = new Events(); + else delete this._events[evt]; + } + + return this; + }; + + /** + * Remove all listeners, or those of the specified event. + * + * @param {String|Symbol} [event] The event name. + * @returns {EventEmitter} `this`. + * @api public + */ + EventEmitter.prototype.removeAllListeners = function removeAllListeners(event) { + var evt; + + if (event) { + evt = prefix ? prefix + event : event; + if (this._events[evt]) { + if (--this._eventsCount === 0) this._events = new Events(); + else delete this._events[evt]; + } + } else { + this._events = new Events(); + this._eventsCount = 0; + } + + return this; + }; + +// +// Alias methods names because people roll like that. +// + EventEmitter.prototype.off = EventEmitter.prototype.removeListener; + EventEmitter.prototype.addListener = EventEmitter.prototype.on; + +// +// This function doesn't apply anymore. +// + EventEmitter.prototype.setMaxListeners = function setMaxListeners() { + return this; + }; + +// +// Expose the prefix. +// + EventEmitter.prefixed = prefix; + +// +// Allow `EventEmitter` to be imported as module namespace. +// + EventEmitter.EventEmitter = EventEmitter; + +// +// Expose the module. +// + return EventEmitter + }()) var subId = 0 function generateSubscriptionId() { @@ -101,6 +364,8 @@ } function Subscription(streamId, callback, options) { + EventEmitter.call(this); // call parent constructor + if (!streamId) throw "No stream id given!" if (!callback) @@ -113,6 +378,7 @@ this.callback = callback this.options = options || {} this.queue = [] + this.subscribing = false this.subscribed = false this.lastReceivedOffset = null @@ -140,49 +406,55 @@ /*** Message handlers ***/ - this.bind('subscribed', function(response) { + this.on('subscribed', function(response) { debug("Sub %s subscribed to stream: %s", _this.id, _this.streamId) _this.subscribed = true + _this.subscribing = false }) - this.bind('unsubscribed', function() { + this.on('unsubscribed', function() { debug("Sub %s unsubscribed: %s", _this.id, _this.streamId) _this.subscribed = false + _this.subscribing = false _this.unsubscribing = false _this.resending = false }) - this.bind('resending', function(response) { + this.on('resending', function(response) { debug("Sub %s resending: %o", _this.id, response) // _this.resending = true was set elsewhere before making the request }) - this.bind('no_resend', function(response) { + this.on('no_resend', function(response) { debug("Sub %s no_resend: %o", _this.id, response) _this.resending = false _this.checkQueue() }) - this.bind('resent', function(response) { + this.on('resent', function(response) { debug("Sub %s resent: %o", _this.id, response) _this.resending = false _this.checkQueue() }) - this.bind('connected', function() { + this.on('connected', function() { }) - this.bind('disconnected', function() { + this.on('disconnected', function() { _this.subscribed = false + _this.subscribing = false _this.resending = false }) } - MicroEvent.mixin(Subscription) + // Subscription extends EventEmitter + Object.keys(EventEmitter.prototype).forEach(function(it) { + Subscription.prototype[it] = EventEmitter.prototype[it] + }) - Subscription.prototype.handleMessage = function(msg) { + Subscription.prototype.handleMessage = function(msg, isResend) { var content = getMessageField('content', msg) var timestamp = getMessageField('timestamp', msg) var offset = getMessageField('offset', msg) @@ -192,31 +464,35 @@ debug("handleMessage: prevOffset is null, gap detection is impossible! message: %o", msg) } - // Gap check - if (previousOffset != null && // previousOffset is required to check for gaps - this.lastReceivedOffset != null && // and we need to know what msg was the previous one - previousOffset > this.lastReceivedOffset && // previous message had larger offset than our previous msg => gap! - !(this.options.resend_last != null && this.resending)) { // don't mind gaps when resending resend_last - + // If resending, queue broadcasted messages + if (this.resending && !isResend) { this.queue.push(msg) + } else { + // Gap check + if (previousOffset != null && // previousOffset is required to check for gaps + this.lastReceivedOffset != null && // and we need to know what msg was the previous one + previousOffset > this.lastReceivedOffset && // previous message had larger offset than our previous msg => gap! + !this.resending) { + + // Queue the message to be processed after resend + this.queue.push(msg) - if (!this.resending) { var from = this.lastReceivedOffset + 1 var to = previousOffset debug("Gap detected, requesting resend for stream %s from %d to %d", this.streamId, from, to) - this.trigger('gap', from, to) + this.emit('gap', from, to) } - } - // Prevent double-processing of messages for any reason - else if (this.lastReceivedOffset != null && offset <= this.lastReceivedOffset) { - debug("Sub %s already received message: %d, lastReceivedOffset: %d. Ignoring message.", this.id, offset, this.lastReceivedOffset) - } - // Normal case where prevOffset == null || lastReceivedOffset == null || prevOffset === lastReceivedOffset - else { - this.lastReceivedOffset = offset - this.callback(content, this.streamId, timestamp, offset) - if (content[BYE_KEY]) { - this.trigger('done') + // Prevent double-processing of messages for any reason + else if (this.lastReceivedOffset != null && offset <= this.lastReceivedOffset) { + debug("Sub %s already received message: %d, lastReceivedOffset: %d. Ignoring message.", this.id, offset, this.lastReceivedOffset) + } + // Normal case where prevOffset == null || lastReceivedOffset == null || prevOffset === lastReceivedOffset + else { + this.lastReceivedOffset = offset + this.callback(content, this.streamId, timestamp, offset) + if (content[BYE_KEY]) { + this.emit('done') + } } } } @@ -229,7 +505,7 @@ var length = this.queue.length for (i=0; i= 0)) done() }) @@ -387,10 +399,10 @@ describe('StreamrClient', function() { done() } - sub1.bind('subscribed', function(response) { + sub1.on('subscribed', function(response) { check(sub1) }) - sub2.bind('subscribed', function(response) { + sub2.on('subscribed', function(response) { check(sub2) }) }) @@ -399,15 +411,19 @@ describe('StreamrClient', function() { describe("subscribe with resend options", function() { - it('should emit a resend request', function(done) { - client.subscribe("stream1", function(message) {}, {resend_all:true}) + it('should emit a resend request after subscribed', function(done) { + var sub = client.subscribe("stream1", function(message) {}, {resend_all:true}) client.connect() client.socket.once('resend', function(request) { - if (request.resend_all) - done() + if (request.resend_all) { + if (sub.isSubscribed()) { + done() + } else throw "Resend request sent, but sub was not subscribed!" + } else throw "Unexpected resend request: "+JSON.stringify(request) }) + }) it('should emit a resend request with given other options', function(done) { @@ -427,38 +443,6 @@ describe('StreamrClient', function() { }) }) - it('should subscribe to the channel after resent', function(done) { - client.subscribe("stream1", function(message) {}, {resend_all:true}) - client.connect() - - client.socket.once('resend', function(request) { - async(function() { - client.socket.emit('resending', {channel:'stream1', sub: request.sub}) - client.socket.emit('u', msg('stream1', 0, request.sub)) - client.socket.emit('resent', {channel:'stream1', sub: request.sub}) - }) - - client.socket.once('subscribe', function(request) { - done() - }) - }) - }) - - it('should subscribe to the channel after no_resend', function(done) { - client.subscribe("stream1", function(message) {}, {resend_all:true}) - client.connect() - - client.socket.once('resend', function(request) { - async(function() { - client.socket.emit('no_resend', {channel:'stream1', sub: request.sub}) - }) - - client.socket.once('subscribe', function(request) { - done() - }) - }) - }) - it('should resend to multiple subscriptions as per each resend option', function(done) { var sub1count = 0 var sub1 = client.subscribe("stream1", function(message) { @@ -505,36 +489,10 @@ describe('StreamrClient', function() { }) }) - it('should not report subscribed to resending channels', function(done) { - var sub1 = client.subscribe("stream1", function(message) {}, {resend_all:true}) - client.connect() - - client.socket.once('resend', function(request) { - async(function() { - client.socket.emit('resending', {channel:'stream1', sub: request.sub}) - }) - - sub1.bind('subscribed', function(response) { - throw "Should not have reported a resending subscription as subscribed!" - }) - - async(function() { - var sub2 = client.subscribe("stream1", function(message) {}) // no resend - sub2.bind('subscribed', function() { - done() - }) - }) - - }) - }) - - it('should not subscribe or crash on resent if bye message is received while resending', function(done) { + it('should not crash on resent if bye message is received while resending', function(done) { var sub = client.subscribe("stream1", function(message) {}, {resend_all:true}) client.connect() - client.socket.on('subscribe', function(request) { - throw "Should not have subscribed" - }) client.socket.once('resend', function(request) { async(function() { client.socket.emit('resending', {channel:'stream1', sub:sub.id}) @@ -550,9 +508,6 @@ describe('StreamrClient', function() { var sub = client.subscribe("stream1", function(message) {}, {resend_all:true}) client.connect() - client.socket.on('subscribe', function(request) { - throw "Should not have subscribed" - }) client.socket.once('resend', function(request) { async(function() { client.socket.emit('resending', {channel:'stream1', sub:sub.id}) @@ -645,7 +600,7 @@ describe('StreamrClient', function() { sub2.counter = 0 client.connect() - sub2.bind('subscribed', function() { + sub2.on('subscribed', function() { assert.throws(function() { // Received by sub2 client.socket.emit('b', msg('stream1', 0, {})) @@ -673,10 +628,10 @@ describe('StreamrClient', function() { it('should fire the unsubscribed event', function(done) { var sub = client.subscribe("stream1", function(message) {}) client.connect() - sub.bind('subscribed', function() { + sub.on('subscribed', function() { client.unsubscribe(sub) }) - sub.bind('unsubscribed', function() { + sub.on('unsubscribed', function() { done() }) }) @@ -718,11 +673,11 @@ describe('StreamrClient', function() { var sub2 = client.subscribe("stream1", function(message) {}) client.connect() - sub2.bind('subscribed', function() { + sub2.on('subscribed', function() { client.unsubscribe(sub2) }) - sub2.bind('unsubscribed', function() { + sub2.on('unsubscribed', function() { done() }) @@ -730,7 +685,7 @@ describe('StreamrClient', function() { throw "Socket should not have unsubscribed" }) - sub1.bind('unsubscribed', function() { + sub1.on('unsubscribed', function() { throw "sub1 should not have unsubscribed" }) }) @@ -761,7 +716,7 @@ describe('StreamrClient', function() { var sub = client.subscribe('stream1', function(message) {}) client.connect() - sub.bind('subscribed', function() { + sub.on('subscribed', function() { assert.throws(function() { client.unsubscribe() }) @@ -772,7 +727,7 @@ describe('StreamrClient', function() { var sub = client.subscribe("stream1", function(message) {}) client.connect() - sub.bind('subscribed', function() { + sub.on('subscribed', function() { assert.throws(function() { client.unsubscribe('stream1') }) @@ -785,15 +740,15 @@ describe('StreamrClient', function() { }) client.connect() - sub.bind('subscribed', function() { + sub.on('subscribed', function() { client.unsubscribe(sub) }) - sub.bind('unsubscribed', function() { + sub.on('unsubscribed', function() { var newSub = client.subscribe("stream1", function(message) { done() }) - newSub.bind('subscribed', function() { + newSub.on('subscribed', function() { client.socket.emit('b', msg("stream1", 0, {})) }) }) @@ -1028,13 +983,13 @@ describe('StreamrClient', function() { } socket.defaultResendHandler = function(request) { - console.log("defaultResendHandler: "+JSON.stringify(request)) + mockDebug("defaultResendHandler: "+JSON.stringify(request)) // Check that the request is allowed checkResendRequest(request) async(function() { - console.log("Mock resend handler handling request: %o", request) + mockDebug("handling resend request: %o", request) if (request.resend_all) { if (resendLimits[request.channel]===undefined) { client.socket.emit('no_resend', {channel: request.channel, sub: request.sub}) @@ -1213,10 +1168,10 @@ describe('StreamrClient', function() { }) it('should not emit another resend request while waiting for resend', function(done) { + validResendRequests.push({channel:"stream1", resend_from:1, resend_to:9}) + client.subscribe("stream1", function(message) {}) client.connect() - - validResendRequests.push({channel:"stream1", resend_from:1, resend_to:9}) client.socket.once('subscribed', function() { client.socket.emit('b', msg("stream1", 0)) @@ -1436,8 +1391,8 @@ describe('StreamrClient', function() { if (++subscribeCount === 2) done() } - sub1.bind('subscribed', check) - sub2.bind('subscribed', check) + sub1.on('subscribed', check) + sub2.on('subscribed', check) client.connect() }) @@ -1451,8 +1406,8 @@ describe('StreamrClient', function() { var sub1 = client.subscribe("stream1", function(message) {}) var sub2 = client.subscribe("stream2", function(message) {}) - sub1.bind('unsubscribed', check) - sub2.bind('unsubscribed', check) + sub1.on('unsubscribed', check) + sub2.on('unsubscribed', check) client.connect() @@ -1467,14 +1422,14 @@ describe('StreamrClient', function() { describe("client events", function() { it('should trigger a connected event on connect', function(done) { - client.bind('connected', function() { + client.on('connected', function() { done() }) client.connect() }) it('should trigger a disconnected event on disconnect', function(done) { - client.bind('disconnected', function() { + client.on('disconnected', function() { done() }) client.connect()