diff --git a/package.json b/package.json index 2cc2dd5d8..2ef5636ad 100644 --- a/package.json +++ b/package.json @@ -16,12 +16,13 @@ "author": "", "license": "", "dependencies": { - "socket.io-client": "1.3.7", - "debug": "*" + "debug": "*", + "eventemitter3": "*", + "socket.io-client": "1.3.7" }, "devDependencies": { "mocha": "*", - "eventemitter2": "*", - "mockery": "*" + "mockery": "*", + "sinon": "^1.17.5" } } diff --git a/streamr-client.js b/streamr-client.js index 569a0f88a..9f17012ce 100644 --- a/streamr-client.js +++ b/streamr-client.js @@ -2,658 +2,922 @@ (function() { -var io -var debug -if (typeof window !== 'undefined') { - io = window.io - debug = (window.debug ? window.debug('StreamrClient') : function() { - if (window.consoleLoggingEnabled) - console.log.apply(console, arguments) - }) -} -else { - io = require('socket.io-client') - debug = require('debug')('StreamrClient') -} - -var STREAM_KEY = "_S" -var COUNTER_KEY = "_C" -var TIMESTAMP_KEY = "_T" -var BYE_KEY = "_bye" -var SUB_KEY = "_sub" - -function extend() { - for(var i=1; i1) - throw "Multiple resend options active! Please use only one: "+JSON.stringify(options) - - // Automatically convert Date objects to numbers for resend_from_time - if (this.options.resend_from_time != null - && typeof this.options.resend_from_time !== 'number') { - - if (typeof this.options.resend_from_time.getTime === 'function') - this.options.resend_from_time = this.options.resend_from_time.getTime() - else throw "resend_from_time option must be a Date object or a number representing time!" + else { + io = require('socket.io-client') + debug = require('debug')('StreamrClient') } - /*** Message handlers ***/ + var BYE_KEY = "_bye" - this.bind('subscribed', function(response) { - debug("Sub %s subscribed: %s from %d", _this.id, _this.streamId, response.from) + function extend() { + for(var i=1; i _this.counter) { - this.trigger('gap', this.counter, response.from-1) + var EventEmitter = (function() { + 'use strict'; + + var has = Object.prototype.hasOwnProperty + , prefix = '~'; + + /** + * Constructor to create a storage for our `EE` objects. + * An `Events` instance is a plain object whose properties are event names. + * + * @constructor + * @api private + */ + function Events() {} + +// +// We try to not inherit from `Object.prototype`. In some engines creating an +// instance in this way is faster than calling `Object.create(null)` directly. +// If `Object.create(null)` is not supported we prefix the event names with a +// character to make sure that the built-in object properties are not +// overridden or used as an attack vector. +// + if (Object.create) { + Events.prototype = Object.create(null); + + // + // This hack is needed because the `__proto__` property is still inherited in + // some old browsers like Android 4, iPhone 5.1, Opera 11 and Safari 5. + // + if (!new Events().__proto__) prefix = false; } - // This situation should never occur, it is safe to ignore but will result in duplicate messages being sent by the server - else if (response.from != null && response.from < _this.counter) { - debug("Subscribed from a counter less than what was expected! Expected: %d, subscribed from: %d", _this.counter, response.from) + + /** + * Representation of a single event listener. + * + * @param {Function} fn The listener function. + * @param {Mixed} context The context to invoke the listener with. + * @param {Boolean} [once=false] Specify if the listener is a one-time listener. + * @constructor + * @api private + */ + function EE(fn, context, once) { + this.fn = fn; + this.context = context; + this.once = once || false; } - }) + /** + * Minimal `EventEmitter` interface that is molded against the Node.js + * `EventEmitter` interface. + * + * @constructor + * @api public + */ + function EventEmitter() { + this._events = new Events(); + this._eventsCount = 0; + } - this.bind('unsubscribed', function() { - debug("Sub %s unsubscribed: %s", _this.id, _this.streamId) - _this.subscribed = false - _this.unsubscribing = false - _this.resending = false - }) + /** + * Return an array listing the events for which the emitter has registered + * listeners. + * + * @returns {Array} + * @api public + */ + EventEmitter.prototype.eventNames = function eventNames() { + var names = [] + , events + , name; + + if (this._eventsCount === 0) return names; + + for (name in (events = this._events)) { + if (has.call(events, name)) names.push(prefix ? name.slice(1) : name); + } - this.bind('resending', function(response) { - debug("Sub %s resending: %s next message set to %d, was: %o", _this.id, response.channel, response.from, _this.counter) - _this.counter = response.from - }) + if (Object.getOwnPropertySymbols) { + return names.concat(Object.getOwnPropertySymbols(events)); + } - this.bind('no_resend', function(response) { - debug("Sub %s no_resend: %s next message set to %d, was: %o", _this.id, response.channel, response.next, _this.counter) + return names; + }; + + /** + * Return the listeners registered for a given event. + * + * @param {String|Symbol} event The event name. + * @param {Boolean} exists Only check if there are listeners. + * @returns {Array|Boolean} + * @api public + */ + EventEmitter.prototype.listeners = function listeners(event, exists) { + var evt = prefix ? prefix + event : event + , available = this._events[evt]; + + if (exists) return !!available; + if (!available) return []; + if (available.fn) return [available.fn]; + + for (var i = 0, l = available.length, ee = new Array(l); i < l; i++) { + ee[i] = available[i].fn; + } - _this.counter = response.next - _this.resending = false - _this.checkQueue() - }) + return ee; + }; + + /** + * Calls each of the listeners registered for a given event. + * + * @param {String|Symbol} event The event name. + * @returns {Boolean} `true` if the event had listeners, else `false`. + * @api public + */ + EventEmitter.prototype.emit = function emit(event, a1, a2, a3, a4, a5) { + var evt = prefix ? prefix + event : event; + + if (!this._events[evt]) return false; + + var listeners = this._events[evt] + , len = arguments.length + , args + , i; + + if (listeners.fn) { + if (listeners.once) this.removeListener(event, listeners.fn, undefined, true); + + switch (len) { + case 1: return listeners.fn.call(listeners.context), true; + case 2: return listeners.fn.call(listeners.context, a1), true; + case 3: return listeners.fn.call(listeners.context, a1, a2), true; + case 4: return listeners.fn.call(listeners.context, a1, a2, a3), true; + case 5: return listeners.fn.call(listeners.context, a1, a2, a3, a4), true; + case 6: return listeners.fn.call(listeners.context, a1, a2, a3, a4, a5), true; + } - this.bind('resent', function(response) { - debug("Sub %s resent: %s from %d to %d", _this.id, response.channel, response.from, response.to) - - _this.resending = false - _this.checkQueue() - }) + for (i = 1, args = new Array(len -1); i < len; i++) { + args[i - 1] = arguments[i]; + } - this.bind('connected', function() { + listeners.fn.apply(listeners.context, args); + } else { + var length = listeners.length + , j; + + for (i = 0; i < length; i++) { + if (listeners[i].once) this.removeListener(event, listeners[i].fn, undefined, true); + + switch (len) { + case 1: listeners[i].fn.call(listeners[i].context); break; + case 2: listeners[i].fn.call(listeners[i].context, a1); break; + case 3: listeners[i].fn.call(listeners[i].context, a1, a2); break; + case 4: listeners[i].fn.call(listeners[i].context, a1, a2, a3); break; + default: + if (!args) for (j = 1, args = new Array(len -1); j < len; j++) { + args[j - 1] = arguments[j]; + } + + listeners[i].fn.apply(listeners[i].context, args); + } + } + } - }) + return true; + }; + + /** + * Add a listener for a given event. + * + * @param {String|Symbol} event The event name. + * @param {Function} fn The listener function. + * @param {Mixed} [context=this] The context to invoke the listener with. + * @returns {EventEmitter} `this`. + * @api public + */ + EventEmitter.prototype.on = function on(event, fn, context) { + var listener = new EE(fn, context || this) + , evt = prefix ? prefix + event : event; + + if (!this._events[evt]) this._events[evt] = listener, this._eventsCount++; + else if (!this._events[evt].fn) this._events[evt].push(listener); + else this._events[evt] = [this._events[evt], listener]; + + return this; + }; + + /** + * Add a one-time listener for a given event. + * + * @param {String|Symbol} event The event name. + * @param {Function} fn The listener function. + * @param {Mixed} [context=this] The context to invoke the listener with. + * @returns {EventEmitter} `this`. + * @api public + */ + EventEmitter.prototype.once = function once(event, fn, context) { + var listener = new EE(fn, context || this, true) + , evt = prefix ? prefix + event : event; + + if (!this._events[evt]) this._events[evt] = listener, this._eventsCount++; + else if (!this._events[evt].fn) this._events[evt].push(listener); + else this._events[evt] = [this._events[evt], listener]; + + return this; + }; + + /** + * Remove the listeners of a given event. + * + * @param {String|Symbol} event The event name. + * @param {Function} fn Only remove the listeners that match this function. + * @param {Mixed} context Only remove the listeners that have this context. + * @param {Boolean} once Only remove one-time listeners. + * @returns {EventEmitter} `this`. + * @api public + */ + EventEmitter.prototype.removeListener = function removeListener(event, fn, context, once) { + var evt = prefix ? prefix + event : event; + + if (!this._events[evt]) return this; + if (!fn) { + if (--this._eventsCount === 0) this._events = new Events(); + else delete this._events[evt]; + return this; + } - this.bind('disconnected', function() { - _this.subscribed = false - _this.resending = false - }) + var listeners = this._events[evt]; -} + if (listeners.fn) { + if ( + listeners.fn === fn + && (!once || listeners.once) + && (!context || listeners.context === context) + ) { + if (--this._eventsCount === 0) this._events = new Events(); + else delete this._events[evt]; + } + } else { + for (var i = 0, events = [], length = listeners.length; i < length; i++) { + if ( + listeners[i].fn !== fn + || (once && !listeners[i].once) + || (context && listeners[i].context !== context) + ) { + events.push(listeners[i]); + } + } -MicroEvent.mixin(Subscription) + // + // Reset the array, or remove it completely if we have no more listeners. + // + if (events.length) this._events[evt] = events.length === 1 ? events[0] : events; + else if (--this._eventsCount === 0) this._events = new Events(); + else delete this._events[evt]; + } -Subscription.prototype.handleMessage = function(message, stream, timestamp, counter, bye) { - if (this.counter === null && counter!==undefined) { - debug("Sub %s received message %d but does not know what number to expect", this.id, counter) - return - } + return this; + }; + + /** + * Remove all listeners, or those of the specified event. + * + * @param {String|Symbol} [event] The event name. + * @returns {EventEmitter} `this`. + * @api public + */ + EventEmitter.prototype.removeAllListeners = function removeAllListeners(event) { + var evt; + + if (event) { + evt = prefix ? prefix + event : event; + if (this._events[evt]) { + if (--this._eventsCount === 0) this._events = new Events(); + else delete this._events[evt]; + } + } else { + this._events = new Events(); + this._eventsCount = 0; + } - // Update ack counter - if (counter > this.counter) { - this.queue.push({ - msg: message, - str: stream, - ts: timestamp, - c: counter, - bye: bye - }) - - if (!this.resending) { - debug("Gap detected, requesting resend for channel %s", this.streamId) - this.trigger('gap', this.counter, counter-1) - } - } - else if (counter < this.counter) { - debug("Sub %s already received message: %d, expecting: %d", this.id, counter, this.counter) + return this; + }; + +// +// Alias methods names because people roll like that. +// + EventEmitter.prototype.off = EventEmitter.prototype.removeListener; + EventEmitter.prototype.addListener = EventEmitter.prototype.on; + +// +// This function doesn't apply anymore. +// + EventEmitter.prototype.setMaxListeners = function setMaxListeners() { + return this; + }; + +// +// Expose the prefix. +// + EventEmitter.prefixed = prefix; + +// +// Allow `EventEmitter` to be imported as module namespace. +// + EventEmitter.EventEmitter = EventEmitter; + +// +// Expose the module. +// + return EventEmitter + }()) + + var subId = 0 + function generateSubscriptionId() { + var id = subId++ + return id.toString() + }; + + var versionFields = { + '28': ['version', 'streamId', 'streamPartition', 'timestamp', 'ttl', 'offset', 'previousOffset', 'contentType', 'content'] } - else { - this.counter = counter + 1 - - this.callback(message, stream, timestamp, counter) - if (bye) - this.trigger('done') - } -} - -Subscription.prototype.checkQueue = function() { - if (this.queue.length) { - debug("Attempting to process %d queued messages for stream %s", this.queue.length, this.streamId) - - var i - for (i=0;i this.counter) - break + function getMessageField(field, msg) { + if (msg == null || msg[0] == null || versionFields[msg[0]] == null) { + return undefined } - - // All messages in queue were processed - if (i===this.queue.length) { - this.queue = [] + + var idx = versionFields[msg[0].toString()].indexOf(field) + if (idx >= 0) { + return msg[idx] } - // Some messages could not be processed, so compact the queue - // and request another resend for the gap! else { - this.queue.splice(0, i) - this.trigger('gap', this.counter, this.queue[0].c-1) + return undefined } } -} - -Subscription.prototype.hasResendOptions = function() { - return this.options.resend_all===true || this.options.resend_from >= 0 || this.options.resend_from_time >= 0 || this.options.resend_last > 0 -} - -Subscription.prototype.isSubscribed = function() { - return this.subscribed -} - -/** - * StreamClient - **/ - -function StreamrClient(options) { - // Default options - this.options = { - // The server to connect to - server: "https://data.streamr.com", - // Automatically connect on first subscribe - autoConnect: true, - // Automatically disconnect on last unsubscribe - autoDisconnect: true, - // Allow client socket library to choose appropriate transport - transports: null - } - this.subsByStream = {} - this.subById = {} - this.io = io - this.socket = null - this.connected = false - - // Can give server URL as parameter instead of options object - if (typeof options === "string") - this.options.server = options - else - extend(this.options, options || {}) -} + function Subscription(streamId, callback, options) { + EventEmitter.call(this); // call parent constructor + + if (!streamId) + throw "No stream id given!" + if (!callback) + throw "No callback given!" + + var _this = this + + this.id = generateSubscriptionId() + this.streamId = streamId + this.callback = callback + this.options = options || {} + this.queue = [] + this.subscribing = false + this.subscribed = false + this.lastReceivedOffset = null + + // Check that multiple resend options are not given + var resendOptionCount = 0 + if (this.options.resend_all) + resendOptionCount++ + if (this.options.resend_from!=null) + resendOptionCount++ + if (this.options.resend_last!=null) + resendOptionCount++ + if (this.options.resend_from_time!=null) + resendOptionCount++ + if (resendOptionCount>1) + throw "Multiple resend options active! Please use only one: "+JSON.stringify(options) + + // Automatically convert Date objects to numbers for resend_from_time + if (this.options.resend_from_time != null + && typeof this.options.resend_from_time !== 'number') { + + if (typeof this.options.resend_from_time.getTime === 'function') + this.options.resend_from_time = this.options.resend_from_time.getTime() + else throw "resend_from_time option must be a Date object or a number representing time!" + } -MicroEvent.mixin(StreamrClient) + /*** Message handlers ***/ -StreamrClient.prototype._addSubscription = function(sub) { - this.subById[sub.id] = sub + this.on('subscribed', function(response) { + debug("Sub %s subscribed to stream: %s", _this.id, _this.streamId) + _this.subscribed = true + _this.subscribing = false + }) - if (!this.subsByStream[sub.streamId]) - this.subsByStream[sub.streamId] = [sub] - else this.subsByStream[sub.streamId].push(sub) -} + this.on('unsubscribed', function() { + debug("Sub %s unsubscribed: %s", _this.id, _this.streamId) + _this.subscribed = false + _this.subscribing = false + _this.unsubscribing = false + _this.resending = false + }) -StreamrClient.prototype._removeSubscription = function(sub) { - delete this.subById[sub.id] + this.on('resending', function(response) { + debug("Sub %s resending: %o", _this.id, response) + // _this.resending = true was set elsewhere before making the request + }) - if (this.subsByStream[sub.streamId]) { - this.subsByStream[sub.streamId] = this.subsByStream[sub.streamId].filter(function(it) { - return it !== sub + this.on('no_resend', function(response) { + debug("Sub %s no_resend: %o", _this.id, response) + _this.resending = false + _this.checkQueue() }) - if (this.subsByStream[sub.streamId].length === 0) - delete this.subsByStream[sub.streamId] - } -} + this.on('resent', function(response) { + debug("Sub %s resent: %o", _this.id, response) + _this.resending = false + _this.checkQueue() + }) -StreamrClient.prototype.getSubscriptions = function(streamId) { - return this.subsByStream[streamId] || [] -} + this.on('connected', function() { -StreamrClient.prototype.subscribe = function(streamId, callback, options) { - var _this = this + }) - if (!streamId) - throw "subscribe: Invalid arguments: stream id is required!" - else if (typeof streamId !== 'string') - throw "subscribe: stream id must be a string!" + this.on('disconnected', function() { + _this.subscribed = false + _this.subscribing = false + _this.resending = false + }) - if (!callback) - throw "subscribe: Invalid arguments: callback is required!" + } - // Create the Subscription object and bind handlers - var sub = new Subscription(streamId, callback, options) - sub.bind('gap', function(from, to) { - _this._requestResend(sub, {resend_from: from, resend_to: to}) - }) - sub.bind('done', function() { - _this.unsubscribe(sub) + // Subscription extends EventEmitter + Object.keys(EventEmitter.prototype).forEach(function(it) { + Subscription.prototype[it] = EventEmitter.prototype[it] }) - // Add to lookups - this._addSubscription(sub) + Subscription.prototype.handleMessage = function(msg, isResend) { + var content = getMessageField('content', msg) + var timestamp = getMessageField('timestamp', msg) + var offset = getMessageField('offset', msg) + var previousOffset = getMessageField('previousOffset', msg) - // If connected, emit a subscribe request - if (this.connected) { - this._resendAndSubscribe(sub) - } else if (this.options.autoConnect) { - this.connect() + if (previousOffset == null) { + debug("handleMessage: prevOffset is null, gap detection is impossible! message: %o", msg) + } + + // If resending, queue broadcasted messages + if (this.resending && !isResend) { + this.queue.push(msg) + } else { + // Gap check + if (previousOffset != null && // previousOffset is required to check for gaps + this.lastReceivedOffset != null && // and we need to know what msg was the previous one + previousOffset > this.lastReceivedOffset && // previous message had larger offset than our previous msg => gap! + !this.resending) { + + // Queue the message to be processed after resend + this.queue.push(msg) + + var from = this.lastReceivedOffset + 1 + var to = previousOffset + debug("Gap detected, requesting resend for stream %s from %d to %d", this.streamId, from, to) + this.emit('gap', from, to) + } + // Prevent double-processing of messages for any reason + else if (this.lastReceivedOffset != null && offset <= this.lastReceivedOffset) { + debug("Sub %s already received message: %d, lastReceivedOffset: %d. Ignoring message.", this.id, offset, this.lastReceivedOffset) + } + // Normal case where prevOffset == null || lastReceivedOffset == null || prevOffset === lastReceivedOffset + else { + this.lastReceivedOffset = offset + this.callback(content, this.streamId, timestamp, offset) + if (content[BYE_KEY]) { + this.emit('done') + } + } + } } - return sub -} + Subscription.prototype.checkQueue = function() { + if (this.queue.length) { + debug("Attempting to process %d queued messages for stream %s", this.queue.length, this.streamId) -StreamrClient.prototype.unsubscribe = function(sub) { - if (!sub || !sub.streamId) - throw "unsubscribe: please give a Subscription object as an argument!" + var i + var length = this.queue.length + for (i=0; i= 0 || this.options.resend_from_time >= 0 || this.options.resend_last > 0 } - // Else the sub can be cleaned off immediately - else if (!sub.unsubscribing) { - this._removeSubscription(sub) - sub.trigger('unsubscribed') - this._checkAutoDisconnect() + + /** + * Resend needs can change if messages have already been received. + * This function always returns the effective resend options: + * + * If messages have been received: + * - resend_all becomes resend_from + * - resend_from becomes resend_from the latest received message + * - resend_from_time becomes resend_from the latest received message + * - resend_last stays the same + */ + Subscription.prototype.getEffectiveResendOptions = function() { + if (this.hasReceivedMessages() && this.hasResendOptions()) { + if (this.options.resend_all || this.options.resend_from || this.options.resend_from_time) { + return { resend_from: this.lastReceivedOffset + 1 } + } + else if (this.options.resend_last) { + return this.options + } + } + else { + return this.options + } } -} - -StreamrClient.prototype.unsubscribeAll = function(streamId) { - var _this = this - if (!streamId) - throw "unsubscribeAll: a stream id is required!" - else if (typeof streamId !== 'string') - throw "unsubscribe: stream id must be a string!" - - if (this.subsByStream[streamId]) { - // Copy the list to avoid concurrent modifications - var l = this.subsByStream[streamId].slice() - l.forEach(function(sub) { - _this.unsubscribe(sub) - }) + + Subscription.prototype.hasReceivedMessages = function() { + return this.lastReceivedOffset != null } -} - -StreamrClient.prototype.isConnected = function() { - return this.connected -} - -StreamrClient.prototype.reconnect = function() { - return this.connect(true) -} - -StreamrClient.prototype.connect = function(reconnect) { - var _this = this - - if (this.connected) { - debug("connect() called while already connected, doing nothing...") - return + + Subscription.prototype.isSubscribed = function() { + return this.subscribed } - else if (this.connecting) { - debug("connect() called while connecting, doing nothing...") - return + + function StreamrClient(options) { + EventEmitter.call(this); // call parent constructor + + // Default options + this.options = { + // The server to connect to + server: "https://data.streamr.com", + path: "/api/v1/socket.io", + // Automatically connect on first subscribe + autoConnect: true, + // Automatically disconnect on last unsubscribe + autoDisconnect: true, + // Allow client socket library to choose appropriate transport + transports: null + } + this.subsByStream = {} + this.subById = {} + + this.io = io + this.socket = null + this.connected = false + + // Can give server URL as parameter instead of options object + if (typeof options === "string") + this.options.server = options + else + extend(this.options, options || {}) } - - debug("Connecting to %s", this.options.server) - this.connecting = true - this.disconnecting = false - - this.socket = this.io(this.options.server, { - forceNew: true, - transports: this.options.transports + + // StreamrClient extends EventEmitter + Object.keys(EventEmitter.prototype).forEach(function(it) { + StreamrClient.prototype[it] = EventEmitter.prototype[it] }) - this.socket.on('ui', function(data) { - if (typeof data == 'string' || data instanceof String) { - data = JSON.parse(data) - } + StreamrClient.prototype._addSubscription = function(sub) { + this.subById[sub.id] = sub - // Delete the internal fields from the msg - var streamId = data[STREAM_KEY] - delete data[STREAM_KEY] - var timestamp = data[TIMESTAMP_KEY] - delete data[TIMESTAMP_KEY] - var counter = data[COUNTER_KEY] - delete data[COUNTER_KEY] - var bye = data[BYE_KEY] - delete data[BYE_KEY] - var sub = data[SUB_KEY] - delete data[SUB_KEY] - - // If the message targets a specific subscription via _sub, only report the message to that - if (sub!==undefined) { - _this.subById[sub].handleMessage(data, streamId, timestamp, counter, bye) - } - else { - // Notify the Subscriptions for this stream. If this is not the message each individual Subscription - // is expecting, they will either ignore it or request resend via gap event. - var subs = _this.subsByStream[streamId] + if (!this.subsByStream[sub.streamId]) + this.subsByStream[sub.streamId] = [sub] + else this.subsByStream[sub.streamId].push(sub) + } - if (subs) { - for (var i=0;i 0) { - if (from === undefined) { - // Find the max received counter - var counters = subscribedSubs.map(function(it) { - return it.counter - }) - from = Math.max.apply(Math, counters) + + StreamrClient.prototype._checkAutoDisconnect = function() { + // Disconnect if no longer subscribed to any channels + if (Object.keys(this.subsByStream).length===0 && this.options.autoDisconnect) { + debug("Disconnecting due to no longer being subscribed to any channels") + this.disconnect() } + } + + StreamrClient.prototype._resendAndSubscribe = function(sub) { + var _this = this - debug('_requestSubscribe: another subscription for same stream: %s, insta-subscribing from %d', sub.streamId, from) + if (!sub.subscribing && !sub.resending) { + sub.subscribing = true + _this._requestSubscribe(sub) - setTimeout(function() { - sub.trigger('subscribed', {from: from}) - }, 0) + // Once subscribed, ask for a resend + sub.once('subscribed', function() { + if (sub.hasResendOptions()) { + _this._requestResend(sub) + } + }) + } } -} - -StreamrClient.prototype._requestUnsubscribe = function(streamId) { - debug("Client unsubscribing from %o", streamId) - this.socket.emit('unsubscribe', {channel: streamId}) -} - -StreamrClient.prototype._requestResend = function(sub, resendOptions) { - // If overriding resendOptions are given, need to remove resend options in sub.options - var options = extend({}, sub.options) - if (resendOptions) { - Object.keys(options).forEach(function (key) { - if (key.match(/resend_.*/)) { - delete options[key] - } + + StreamrClient.prototype._requestSubscribe = function(sub) { + var _this = this + var subs = _this.subsByStream[sub.streamId] + + var subscribedSubs = subs.filter(function(it) { + return it.isSubscribed() }) + + // If this is the first subscription for this stream, send a subscription request to the server + if (!subs._subscribing && subscribedSubs.length === 0) { + var req = extend({}, sub.options, {channel: sub.streamId}) + debug("_requestSubscribe: subscribing client: %o", req) + subs._subscribing = true + _this.socket.emit('subscribe', req) + } + // If there already is a subscribed subscription for this stream, this new one will just join it immediately + else if (subscribedSubs.length > 0) { + debug('_requestSubscribe: another subscription for same stream: %s, insta-subscribing', sub.streamId) + + setTimeout(function() { + sub.emit('subscribed') + }, 0) + } } - sub.resending = true + StreamrClient.prototype._requestUnsubscribe = function(streamId) { + debug("Client unsubscribing stream %o", streamId) + this.socket.emit('unsubscribe', {channel: streamId}) + } + + StreamrClient.prototype._requestResend = function(sub, resendOptions) { + // If overriding resendOptions are given, need to remove resend options in sub.options + var options = extend({}, sub.getEffectiveResendOptions()) + if (resendOptions) { + Object.keys(options).forEach(function (key) { + if (key.match(/resend_.*/)) { + delete options[key] + } + }) + } + + sub.resending = true - var request = extend({}, options, resendOptions, {channel: sub.streamId, sub: sub.id}) - debug("_requestResend: %o", request) - this.socket.emit('resend', request) -} + var request = extend({}, options, resendOptions, {channel: sub.streamId, sub: sub.id}) + debug("_requestResend: %o", request) + this.socket.emit('resend', request) + } -StreamrClient.prototype.handleError = function(msg) { - debug(msg) - this.trigger('error', msg) -} + StreamrClient.prototype.handleError = function(msg) { + debug(msg) + this.emit('error', msg) + } -if (typeof module !== 'undefined' && module.exports) - module.exports = StreamrClient -else window.StreamrClient = StreamrClient + if (typeof module !== 'undefined' && module.exports) + module.exports = StreamrClient + else window.StreamrClient = StreamrClient })(typeof(exports) !== 'undefined' ? exports : window) \ No newline at end of file diff --git a/test/test.streamr-client.js b/test/test.streamr-client.js index c2cd1e79d..65b9a8560 100644 --- a/test/test.streamr-client.js +++ b/test/test.streamr-client.js @@ -1,6 +1,10 @@ +"use strict" + +var EventEmitter = require('eventemitter3') var assert = require('assert'), - events = require('eventemitter2'), - mockery = require('mockery') + mockery = require('mockery'), + sinon = require('sinon'), + mockDebug = require('debug')('mock') var STREAM_KEY = "_S" var COUNTER_KEY = "_C" @@ -34,20 +38,47 @@ describe('StreamrClient', function() { asyncs = [] } - function msg(stream, counter, content, subId) { - var msg = {} - msg[STREAM_KEY] = stream - msg[COUNTER_KEY] = counter - - if (subId!==undefined) - msg[SUB_KEY] = subId - - if (content) - Object.keys(content).forEach(function(key) { - msg[key] = content[key] - }) - - return msg + var previousOffsetByStreamId = {} + + // ['version', 'streamId', 'streamPartition', 'timestamp', 'ttl', 'offset', 'previousOffset', 'contentType', 'content'] + + function msg(streamId, offset, content, subId, forcePreviousOffset) { + content = content || {} + + // unicast message to subscription + if (subId != null) { + var msg = [ + 28, // version + streamId, + 0, // partition + Date.now(), // timestamp + 0, // ttl + offset, + forcePreviousOffset, // previousOffset + 27, // contentType (JSON) + content] + + return { + m: msg, + sub: subId + } + } + // broadcast message to all subscriptions + else { + var previousOffset = forcePreviousOffset || previousOffsetByStreamId[streamId] + previousOffsetByStreamId[streamId] = offset + + return [ + 28, // version + streamId, + 0, // partition + Date.now(), // timestamp + 0, // ttl + offset, + previousOffset !== offset ? previousOffset : null, + 27, // contentType (JSON) + content] + } } function byeMsg(stream, counter) { @@ -57,23 +88,26 @@ describe('StreamrClient', function() { } function createSocketMock() { - var s = new events.EventEmitter2 + var s = new EventEmitter() s.disconnect = function() { async(function() { + mockDebug("socket.disconnect: emitting disconnect") s.emit('disconnect') }) } s.defaultSubscribeHandler = function(request) { async(function() { - s.emit('subscribed', {channel: request.channel, from: 0}) + mockDebug("defaultSubscribeHandler: emitting subscribed") + s.emit('subscribed', {channel: request.channel}) }) } s.on('subscribe', s.defaultSubscribeHandler) s.defaultUnsubscribeHandler = function(request) { async(function() { + mockDebug("defaultUnsubscribeHandler: emitting unsubscribed") s.emit('unsubscribed', {channel: request.channel}) }) } @@ -113,6 +147,7 @@ describe('StreamrClient', function() { client = new StreamrClient() client.options.autoConnect = false client.options.autoDisconnect = false + previousOffsetByStreamId = {} }) after(function() { @@ -157,10 +192,10 @@ describe('StreamrClient', function() { it('should not emit anything on connect if not subscribed to anything', function(done) { client.connect() - client.socket.onAny(function() { + client.socket.emit = function() { if (this.event !== 'connect') throw "Unexpected emit: "+this.event - }) + } done() }) @@ -330,10 +365,10 @@ describe('StreamrClient', function() { }) }) - var subscription = client.subscribe("stream1", function(message) {}) + client.subscribe("stream1", function(message) {}) client.connect() - client.bind('error', function(msg) { + client.on('error', function(msg) { assert(msg.indexOf('error message' >= 0)) done() }) @@ -346,23 +381,6 @@ describe('StreamrClient', function() { client.subscribe("stream1", function(message) {}) }) - it('should set the expected counter to what the subscribed message says', function(done) { - var sub = client.subscribe("stream1", function(message) {}) - client.connect() - - client.socket.removeListener('subscribe', client.socket.defaultSubscribeHandler) - client.socket.on('subscribe', function(request) { - async(function() { - client.socket.emit('subscribed', {channel:'stream1', from:2}) - }) - }) - - client.socket.once('subscribed', function(response) { - assert.equal(sub.counter, 2) - done() - }) - }) - it('should send only one subscribe request to server even if there are multiple subscriptions for same stream', function(done) { var subscribeCount = 0 socket.on('subscribe', function(request) { @@ -381,55 +399,31 @@ describe('StreamrClient', function() { done() } - sub1.bind('subscribed', function(response) { + sub1.on('subscribed', function(response) { check(sub1) }) - sub2.bind('subscribed', function(response) { + sub2.on('subscribed', function(response) { check(sub2) }) }) - it('should subscribe late subscriptions from the latest message', function(done) { - var sub1 = client.subscribe("stream1", function(sub1msg) { - if (sub1msg.n===0) { - async(function() { - var sub2 = client.subscribe('stream1', function(sub2msg) { - if (sub2msg.n===1) - done() - else throw "Unexpected message received by sub2: "+JSON.stringify(sub2msg) - }) - sub2.bind('subscribed', function(response) { - assert.equal(response.from, 1) - async(function() { - client.socket.emit('ui', msg('stream1', 1, {n:1})) - }) - }) - }) - } - }) - client.connect() - - sub1.bind('subscribed', function(response) { - async(function() { - client.socket.emit('ui', msg('stream1', 0, {n:0})) - }) - }) - - }) - }) describe("subscribe with resend options", function() { - it('should emit a resend request', function(done) { - client.subscribe("stream1", function(message) {}, {resend_all:true}) + it('should emit a resend request after subscribed', function(done) { + var sub = client.subscribe("stream1", function(message) {}, {resend_all:true}) client.connect() client.socket.once('resend', function(request) { - if (request.resend_all) - done() + if (request.resend_all) { + if (sub.isSubscribed()) { + done() + } else throw "Resend request sent, but sub was not subscribed!" + } else throw "Unexpected resend request: "+JSON.stringify(request) }) + }) it('should emit a resend request with given other options', function(done) { @@ -449,70 +443,6 @@ describe('StreamrClient', function() { }) }) - it('should subscribe to the channel after resent', function(done) { - client.subscribe("stream1", function(message) {}, {resend_all:true}) - client.connect() - - client.socket.once('resend', function(request) { - async(function() { - client.socket.emit('resending', {channel:'stream1', sub: request.sub, from:0 ,to:1}) - client.socket.emit('ui', msg('stream1', 0)) - client.socket.emit('ui', msg('stream1', 1)) - client.socket.emit('resent', {channel:'stream1', sub: request.sub, from:0, to:1}) - }) - - client.socket.once('subscribe', function(request) { - assert.equal(request.from, 2) - done() - }) - }) - }) - - it('should request resend after subscribe if the counter subscribed from is not the expected value', function(done) { - socket.removeListener('subscribe', socket.defaultSubscribeHandler) - - client.subscribe("stream1", function(message) {}, {resend_all:true}) - client.connect() - - client.socket.once('resend', function(request) { - async(function() { - client.socket.emit('resending', {channel:'stream1', sub: request.sub, from:0 ,to:1}) - client.socket.emit('ui', msg('stream1', 0)) - client.socket.emit('ui', msg('stream1', 1)) - client.socket.emit('resent', {channel:'stream1', sub: request.sub, from:0, to:1}) - }) - }) - - client.socket.once('subscribe', function(request) { - assert.equal(request.from, 2) - async(function() { - client.socket.emit('subscribed', {channel:'stream1', from: 10}) - }) - client.socket.once('resend', function(request) { - assert.equal(request.resend_from, 2) - assert.equal(request.resend_to, 9) - done() - }) - }) - - }) - - it('should subscribe to the channel after no_resend', function(done) { - client.subscribe("stream1", function(message) {}, {resend_all:true}) - client.connect() - - client.socket.once('resend', function(request) { - async(function() { - client.socket.emit('no_resend', {channel:'stream1', sub: request.sub, next:7}) - }) - - client.socket.once('subscribe', function(request) { - assert.equal(request.from, 7) - done() - }) - }) - }) - it('should resend to multiple subscriptions as per each resend option', function(done) { var sub1count = 0 var sub1 = client.subscribe("stream1", function(message) { @@ -530,20 +460,20 @@ describe('StreamrClient', function() { client.connect() - client.socket.once('resend', function(request) { + client.socket.on('resend', function(request) { if (request.resend_all) { async(function() { - client.socket.emit('resending', {channel:'stream1', sub: request.sub, from:0 ,to:1}) - client.socket.emit('ui', msg('stream1', 0)) - client.socket.emit('ui', msg('stream1', 1)) - client.socket.emit('resent', {channel:'stream1', sub: request.sub, from:0, to:1}) + client.socket.emit('resending', {channel:'stream1', sub: request.sub}) + client.socket.emit('u', msg('stream1', 0, request.sub)) + client.socket.emit('u', msg('stream1', 1, request.sub)) + client.socket.emit('resent', {channel:'stream1', sub: request.sub}) }) } else if (request.resend_last===1) { async(function() { - client.socket.emit('resending', {channel:'stream1', sub: request.sub, from:1 ,to:1}) - client.socket.emit('ui', msg('stream1', 1)) - client.socket.emit('resent', {channel:'stream1', sub: request.sub, from:1, to:1}) + client.socket.emit('resending', {channel:'stream1', sub: request.sub}) + client.socket.emit('ui', msg('stream1', 1, request.sub)) + client.socket.emit('resent', {channel:'stream1', sub: request.sub}) }) } }) @@ -555,46 +485,19 @@ describe('StreamrClient', function() { if (subCount>1) throw "Only one subscribe should have been sent" - assert.equal(request.from, 2) done() }) }) - it('should not report subscribed to resending channels', function(done) { - var sub1 = client.subscribe("stream1", function(message) {}, {resend_all:true}) - client.connect() - - client.socket.once('resend', function(request) { - async(function() { - client.socket.emit('resending', {channel:'stream1', sub: request.sub, from:0 ,to:1}) - }) - - sub1.bind('subscribed', function(response) { - throw "Should not have reported a resending subscription as subscribed!" - }) - - async(function() { - var sub2 = client.subscribe("stream1", function(message) {}) // no resend - sub2.bind('subscribed', function() { - done() - }) - }) - - }) - }) - - it('should not subscribe or crash on resent if bye message is received', function(done) { + it('should not crash on resent if bye message is received while resending', function(done) { var sub = client.subscribe("stream1", function(message) {}, {resend_all:true}) client.connect() - client.socket.on('subscribe', function(request) { - throw "Should not have subscribed" - }) client.socket.once('resend', function(request) { async(function() { - client.socket.emit('resending', {channel:'stream1', from:0 ,to:0, sub:sub.id}) - client.socket.emit('ui', byeMsg('stream1', 0)) - client.socket.emit('resent', {channel:'stream1', from:0, to:0, sub:sub.id}) + client.socket.emit('resending', {channel:'stream1', sub:sub.id}) + client.socket.emit('b', byeMsg('stream1', 0)) + client.socket.emit('resent', {channel:'stream1', sub:sub.id}) done() }) }) @@ -605,15 +508,12 @@ describe('StreamrClient', function() { var sub = client.subscribe("stream1", function(message) {}, {resend_all:true}) client.connect() - client.socket.on('subscribe', function(request) { - throw "Should not have subscribed" - }) client.socket.once('resend', function(request) { async(function() { - client.socket.emit('resending', {channel:'stream1', from:0 ,to:1, sub:sub.id}) - client.socket.emit('ui', byeMsg('stream1', 0)) - client.socket.emit('ui', msg('stream1', 1, {})) - client.socket.emit('resent', {channel:'stream1', from:0, to:1, sub:sub.id}) + client.socket.emit('resending', {channel:'stream1', sub:sub.id}) + client.socket.emit('b', byeMsg('stream1', 0)) + client.socket.emit('u', msg('stream1', 1, sub.id)) + client.socket.emit('resent', {channel:'stream1', sub:sub.id}) done() }) }) @@ -622,13 +522,14 @@ describe('StreamrClient', function() { }) describe("message handling", function() { - it('should call the callback when a message is received with correct counter', function(done) { + + it('should call the callback when a message is received', function(done) { var sub = client.subscribe("stream1", function(message) { done() }) client.connect() client.socket.once('subscribed', function() { - client.socket.emit('ui', msg("stream1", 0)) + client.socket.emit('b', msg("stream1", 0)) }) }) @@ -643,9 +544,9 @@ describe('StreamrClient', function() { client.socket.once('subscribed', function() { // Fake messages - client.socket.emit('ui', msg("stream1",0)) - client.socket.emit('ui', msg("stream1",0)) - client.socket.emit('ui', msg("stream1",0)) + client.socket.emit('b', msg("stream1",0)) + client.socket.emit('b', msg("stream1",0)) + client.socket.emit('b', msg("stream1",0)) done() }) }) @@ -664,9 +565,9 @@ describe('StreamrClient', function() { client.connect() client.socket.once('subscribed', function() { - client.socket.emit('ui', msg("stream1", 0, {count:0})) - client.socket.emit('ui', msg("stream1", 1, {count:1})) - client.socket.emit('ui', msg("stream1", 2, {count:2})) + client.socket.emit('b', msg("stream1", 0, {count:0})) + client.socket.emit('b', msg("stream1", 1, {count:1})) + client.socket.emit('b', msg("stream1", 2, {count:2})) }) }) @@ -678,7 +579,7 @@ describe('StreamrClient', function() { client.connect() client.socket.once('subscribed', function() { - client.socket.emit('ui', byeMsg("stream1", 0)) + client.socket.emit('b', byeMsg("stream1", 0)) }) client.socket.once('unsubscribed', function(response) { @@ -699,11 +600,12 @@ describe('StreamrClient', function() { sub2.counter = 0 client.connect() - sub2.bind('subscribed', function() { + sub2.on('subscribed', function() { assert.throws(function() { - client.socket.emit('ui', msg('stream1', 0, {}))//, sub2.id)) + // Received by sub2 + client.socket.emit('b', msg('stream1', 0, {})) }) - client.socket.emit('ui', msg('stream1', 0, {}, sub2.id)) + client.socket.emit('u', msg('stream1', 0, {}, sub2.id)) }) }) @@ -716,7 +618,7 @@ describe('StreamrClient', function() { client.connect() client.socket.once('subscribed', function() { - client.socket.emit('ui', msg("stream1", 0, {count:0})) + client.socket.emit('b', msg("stream1", 0, {count:0})) }) }) @@ -726,10 +628,10 @@ describe('StreamrClient', function() { it('should fire the unsubscribed event', function(done) { var sub = client.subscribe("stream1", function(message) {}) client.connect() - sub.bind('subscribed', function() { + sub.on('subscribed', function() { client.unsubscribe(sub) }) - sub.bind('unsubscribed', function() { + sub.on('unsubscribed', function() { done() }) }) @@ -771,11 +673,11 @@ describe('StreamrClient', function() { var sub2 = client.subscribe("stream1", function(message) {}) client.connect() - sub2.bind('subscribed', function() { + sub2.on('subscribed', function() { client.unsubscribe(sub2) }) - sub2.bind('unsubscribed', function() { + sub2.on('unsubscribed', function() { done() }) @@ -783,7 +685,7 @@ describe('StreamrClient', function() { throw "Socket should not have unsubscribed" }) - sub1.bind('unsubscribed', function() { + sub1.on('unsubscribed', function() { throw "sub1 should not have unsubscribed" }) }) @@ -814,7 +716,7 @@ describe('StreamrClient', function() { var sub = client.subscribe('stream1', function(message) {}) client.connect() - sub.bind('subscribed', function() { + sub.on('subscribed', function() { assert.throws(function() { client.unsubscribe() }) @@ -825,7 +727,7 @@ describe('StreamrClient', function() { var sub = client.subscribe("stream1", function(message) {}) client.connect() - sub.bind('subscribed', function() { + sub.on('subscribed', function() { assert.throws(function() { client.unsubscribe('stream1') }) @@ -838,16 +740,16 @@ describe('StreamrClient', function() { }) client.connect() - sub.bind('subscribed', function() { + sub.on('subscribed', function() { client.unsubscribe(sub) }) - sub.bind('unsubscribed', function() { + sub.on('unsubscribed', function() { var newSub = client.subscribe("stream1", function(message) { done() }) - newSub.bind('subscribed', function() { - client.socket.emit('ui', msg("stream1", 0, {})) + newSub.on('subscribed', function() { + client.socket.emit('b', msg("stream1", 0, {})) }) }) }) @@ -883,12 +785,10 @@ describe('StreamrClient', function() { async(function() { client.socket.emit('resending', { channel: request.channel, - sub: request.sub, - from: 0, - to: 0 + sub: request.sub }) - client.socket.emit('ui', byeMsg(request.channel,0)) - client.socket.emit('resent', {channel: request.channel, sub: request.sub, from:0, to:0}) + client.socket.emit('b', byeMsg(request.channel,0)) + client.socket.emit('resent', {channel: request.channel, sub: request.sub}) }) }) @@ -1074,32 +974,51 @@ describe('StreamrClient', function() { function resend(channel, sub, from, to) { client.socket.emit('resending', { channel: channel, - sub: sub, - from: from, - to: to + sub: sub }) for (var i=from;i<=to;i++) { - client.socket.emit('ui', msg(channel,i)) + client.socket.emit('u', msg(channel, i, {}, sub)) } - client.socket.emit('resent', {channel: channel, sub: sub, from:from, to:to}) + client.socket.emit('resent', {channel: channel, sub: sub}) } socket.defaultResendHandler = function(request) { - console.log("defaultResendHandler: "+JSON.stringify(request)) + mockDebug("defaultResendHandler: "+JSON.stringify(request)) // Check that the request is allowed checkResendRequest(request) async(function() { - if (request.resend_from!=null && request.resend_to!=null) + mockDebug("handling resend request: %o", request) + if (request.resend_all) { + if (resendLimits[request.channel]===undefined) { + client.socket.emit('no_resend', {channel: request.channel, sub: request.sub}) + } + else { + resend(request.channel, request.sub, resendLimits[request.channel].from, resendLimits[request.channel].to) + } + } + else if (request.resend_last) { + if (resendLimits[request.channel] === undefined) { + throw "Testing resend_last needs resendLimits.channel.to" + } + resend(request.channel, request.sub, resendLimits[request.channel].to - (request.resend_last - 1), resendLimits[request.channel].to) + } + else if (request.resend_from!=null && request.resend_to!=null) { resend(request.channel, request.sub, request.resend_from, request.resend_to) + } + else if (request.resend_from!=null) { + if (resendLimits[request.channel] === undefined) { + throw "Testing resend_from needs resendLimits.channel.to" + } + resend(request.channel, request.sub, request.resend_from, resendLimits[request.channel].to) + } else if (request.resend_from_time!=null) { resend(request.channel, request.sub, 99, 100) } - else if (resendLimits[request.channel]===undefined) - client.socket.emit('no_resend', {channel: request.channel, sub: request.sub, next: 0}) - else - resend(request.channel, request.sub, resendLimits[request.channel].from, resendLimits[request.channel].to) + else { + throw "Unknown kind of resend request: "+JSON.stringify(request) + } }) } socket.on('resend', socket.defaultResendHandler) @@ -1171,23 +1090,25 @@ describe('StreamrClient', function() { client.subscribe("stream1", function(message) {}, {resend_from_time: "invalid"}) }) }) - - it('should emit a resend request if the first message is not the expected one', function(done) { - client.subscribe("stream1", function(message) {}) - client.connect() - - validResendRequests.push({channel:"stream1", resend_from:0, resend_to:1}) - client.socket.once('subscribed', function() { - assert(client.socket.defaultResendHandler!=null) - client.socket.emit('ui', msg("stream1",2)) - }) + it('should not emit a resend request if there is no gap in messages', function(done) { + client.subscribe("stream1", function(message) { + if (message.done) { + done() + } + }) + client.connect() + + socket.once('resend', function(req) { + throw "Should not have made a resend request:" + JSON.stringify(req) + }) + + client.socket.once('subscribed', function() { + client.socket.emit('b', msg("stream1", 0)) + client.socket.emit('b', msg("stream1", 10, {done: true}, undefined, 0)) + }) + }) - client.socket.once('resent', function() { - done() - }) - }) - it('should emit a resend request if there is a gap in messages', function(done) { client.subscribe("stream1", function(message) {}) client.connect() @@ -1195,8 +1116,8 @@ describe('StreamrClient', function() { validResendRequests.push({channel:"stream1", resend_from:1, resend_to:9}) client.socket.once('subscribed', function() { - client.socket.emit('ui', msg("stream1",0)) - client.socket.emit('ui', msg("stream1",10)) + client.socket.emit('b', msg("stream1", 0)) + client.socket.emit('b', msg("stream1", 10, {}, undefined, 9)) }) client.socket.once('resent', function() { @@ -1211,8 +1132,8 @@ describe('StreamrClient', function() { validResendRequests.push({channel:"stream1", resend_from:1, resend_to:9}) client.socket.once('subscribed', function() { - client.socket.emit('ui', msg("stream1",0)) - client.socket.emit('ui', msg("stream1",10)) + client.socket.emit('b', msg("stream1", 0)) + client.socket.emit('b', msg("stream1", 10, {}, undefined, 9)) }) client.socket.once('resend', function(request) { @@ -1232,8 +1153,8 @@ describe('StreamrClient', function() { validResendRequests.push({channel:"stream1", resend_from:1, resend_to:1}) client.socket.once('subscribed', function() { - client.socket.emit('ui', msg("stream1",0)) - client.socket.emit('ui', msg("stream1",2)) + client.socket.emit('b', msg("stream1", 0)) + client.socket.emit('b', msg("stream1", 2, {}, undefined, 1)) }) client.socket.on('resend', function(request) { @@ -1247,15 +1168,15 @@ describe('StreamrClient', function() { }) it('should not emit another resend request while waiting for resend', function(done) { + validResendRequests.push({channel:"stream1", resend_from:1, resend_to:9}) + client.subscribe("stream1", function(message) {}) client.connect() - - validResendRequests.push({channel:"stream1", resend_from:1, resend_to:9}) client.socket.once('subscribed', function() { - client.socket.emit('ui', msg("stream1",0)) - client.socket.emit('ui', msg("stream1",10)) - client.socket.emit('ui', msg("stream1",11)) + client.socket.emit('b', msg("stream1", 0)) + client.socket.emit('b', msg("stream1", 10, {}, undefined, 9)) + client.socket.emit('b', msg("stream1", 11, {}, undefined, 10)) }) var counter = 0 @@ -1279,10 +1200,10 @@ describe('StreamrClient', function() { validResendRequests.push({channel:"stream1", resend_from:1, resend_to:9}) client.socket.once('subscribed', function() { - client.socket.emit('ui', msg("stream1", 0, {counter: 0})) - client.socket.emit('ui', msg("stream1",10, {counter: 10})) - client.socket.emit('ui', msg("stream1",11, {counter: 11})) - client.socket.emit('ui', msg("stream1",12, {counter: 12})) + client.socket.emit('b', msg("stream1", 0, {counter: 0})) + client.socket.emit('b', msg("stream1",10, {counter: 10}, undefined, 9)) + client.socket.emit('b', msg("stream1",11, {counter: 11})) + client.socket.emit('b', msg("stream1",12, {counter: 12})) }) }) @@ -1296,17 +1217,17 @@ describe('StreamrClient', function() { validResendRequests.push({channel:"stream1", resend_from:1, resend_to:9}) client.socket.once('subscribed', function() { - client.socket.emit('ui', msg("stream1", 0, {counter: 0})) - client.socket.emit('ui', msg("stream1", 10, {counter: 10})) - client.socket.emit('ui', msg("stream1", 11, {counter: 11})) - client.socket.emit('ui', msg("stream1", 11, {counter: 11})) // bogus message - client.socket.emit('ui', msg("stream1", 5, {counter: 5})) // bogus message - client.socket.emit('ui', msg("stream1", 12, {counter: 12})) + client.socket.emit('b', msg("stream1", 0, {counter: 0})) + client.socket.emit('b', msg("stream1", 10, {counter: 10}, undefined, 9)) + client.socket.emit('b', msg("stream1", 11, {counter: 11}, undefined, 10)) + client.socket.emit('b', msg("stream1", 11, {counter: 11}, undefined, 10)) // bogus message + client.socket.emit('b', msg("stream1", 5, {counter: 5}, undefined, 4)) // bogus message + client.socket.emit('b', msg("stream1", 12, {counter: 12}, undefined, 11)) }) }) it('should do another resend request if there are gaps in the queue', function(done) { - var subscription = client.subscribe("stream1", function(message, streamId, timetamp, counter) { + client.subscribe("stream1", function(message, streamId, timetamp, counter) { if (counter===12) done() }) @@ -1316,56 +1237,148 @@ describe('StreamrClient', function() { validResendRequests.push({channel:"stream1", resend_from:11, resend_to:11}) client.socket.once('subscribed', function() { - client.socket.emit('ui', msg("stream1", 0, {counter: 0})) - client.socket.emit('ui', msg("stream1", 10, {counter: 10})) - client.socket.emit('ui', msg("stream1", 12, {counter: 12})) + client.socket.emit('b', msg("stream1", 0, {counter: 0})) + client.socket.emit('b', msg("stream1", 10, {counter: 10}, undefined, 9)) + client.socket.emit('b', msg("stream1", 12, {counter: 12}, undefined, 11)) }) }) - - it('should re-request from the latest counter on reconnect', function(done) { - var sub1 = client.subscribe("stream1", function(message) {}, {resend_all:true}) - var sub2 = client.subscribe("stream2", function(message) {}, {resend_from:0}) - var sub3 = client.subscribe("stream3", function(message) {}) // no resend for stream3 - client.connect() - validResendRequests.push({channel:"stream1", resend_all:true}) - validResendRequests.push({channel:"stream2", resend_from:0}) + describe('on reconnect', function() { + var msgHandler + beforeEach(function() { + msgHandler = sinon.spy() + }) - client.socket.on('subscribed', function(response) { - if (response.channel==='stream1') { - client.socket.emit('ui', msg("stream1",0)) - client.socket.emit('ui', msg("stream1",1)) - client.socket.emit('ui', msg("stream1",2)) - } - else if (response.channel==='stream2') { - client.socket.emit('ui', msg("stream2",0)) + it('no resend', function(done) { + client.subscribe("stream", msgHandler) + client.connect() + + client.socket.on('subscribed', function(response) { + client.socket.emit('b', msg("stream", 0)) + client.socket.emit('disconnect') + }) + + client.socket.once('disconnect', function() { + client.connect() + + socket.on('resend', function() { + throw "Should not have made a resend request!" + }) + + socket.on('subscribed', function() { + assert.equal(msgHandler.callCount, 1) + done() + }) + }) + }) + + it('resend_all', function(done) { + validResendRequests.push({channel:"stream", resend_all: true}) + resendLimits["stream"] = { + from: 0, + to: 5 } - else if (response.channel==='stream3') { - client.socket.emit('ui', msg("stream3",0)) + + client.subscribe("stream", msgHandler, { resend_all: true }) + client.connect() + + client.socket.on('subscribed', function(response) { + client.socket.emit('disconnect') + }) + + client.socket.once('disconnect', function() { + client.connect() + + socket.on('resend', function(request) { + assert.equal(request.resend_from, 6) + assert.equal(request.resend_to, undefined) + done() + }) + }) + }) + + it('resend_from', function(done) { + validResendRequests.push({channel:"stream", resend_from: 3}) + resendLimits["stream"] = { + from: 0, + to: 5 } - client.socket.emit('disconnect') + + client.subscribe("stream", msgHandler, { resend_from: 3 }) + client.connect() + + client.socket.on('subscribed', function(response) { + client.socket.emit('disconnect') + }) + + client.socket.once('disconnect', function() { + client.connect() + + socket.on('resend', function(request) { + assert.equal(request.resend_from, 6) + assert.equal(request.resend_to, undefined) + done() + }) + }) }) - client.socket.once('disconnect', function() { + it('resend_last', function(done) { + validResendRequests.push({channel:"stream", resend_last: 1}) + resendLimits["stream"] = { + from: 0, + to: 5 + } + + client.subscribe("stream", msgHandler, { resend_last: 1 }) client.connect() - client.socket.on('subscribe', function(request) { - if (request.channel==='stream1' && request.from !== 3) - throw "Wrong starting index for "+request.channel+": "+request.from - else if (request.channel==='stream2' && request.from !== 1) - throw "Wrong starting index for "+request.channel+": "+request.from - else if (request.channel==='stream3' && request.from !== undefined) - throw "Should not have specified the from field for stream3: "+request.from + client.socket.on('subscribed', function(response) { + client.socket.emit('disconnect') }) + client.socket.once('disconnect', function() { + client.connect() + + socket.on('resend', function(request) { + assert.equal(request.resend_last, 1) + done() + }) + }) + }) + + it('resend_last should accept a gap on reconnect', function(done) { + validResendRequests.push({channel:"stream", resend_last: 1}) + resendLimits["stream"] = { + from: 0, + to: 0 + } + + client.subscribe("stream", msgHandler, { resend_last: 1 }) + client.connect() + client.socket.on('subscribed', function(response) { - if (sub1.isSubscribed() && sub2.isSubscribed() && sub3.isSubscribed()) + socket.off('resend', socket.defaultResendHandler) + client.socket.emit('disconnect') + }) + + client.socket.once('disconnect', function() { + client.connect() + + socket.on('resend', function(request) { + assert.equal(request.resend_last, 1) + client.socket.emit('resending', { + channel: request.channel, + sub: request.sub + }) + client.socket.emit('u', msg(request.channel, 10, {}, request.sub, 9)) + client.socket.emit('resent', {channel: request.channel, sub: request.sub}) + assert.equal(msgHandler.callCount, 2) done() + }) }) }) }) - }) describe("Subscription", function() { @@ -1378,8 +1391,8 @@ describe('StreamrClient', function() { if (++subscribeCount === 2) done() } - sub1.bind('subscribed', check) - sub2.bind('subscribed', check) + sub1.on('subscribed', check) + sub2.on('subscribed', check) client.connect() }) @@ -1393,8 +1406,8 @@ describe('StreamrClient', function() { var sub1 = client.subscribe("stream1", function(message) {}) var sub2 = client.subscribe("stream2", function(message) {}) - sub1.bind('unsubscribed', check) - sub2.bind('unsubscribed', check) + sub1.on('unsubscribed', check) + sub2.on('unsubscribed', check) client.connect() @@ -1409,14 +1422,14 @@ describe('StreamrClient', function() { describe("client events", function() { it('should trigger a connected event on connect', function(done) { - client.bind('connected', function() { + client.on('connected', function() { done() }) client.connect() }) it('should trigger a disconnected event on disconnect', function(done) { - client.bind('disconnected', function() { + client.on('disconnected', function() { done() }) client.connect()