diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 00000000..e29f5e50 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,9 @@ +root = true + +[*] +indent_style = space +indent_size = 2 +end_of_line = LF +charset = utf-8 +trim_trailing_whitespace = true +insert_final_newline = true diff --git a/.travis.yml b/.travis.yml index 7bd066b2..736e5fe7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,10 +1,8 @@ language: node_js node_js: - "10" - - "9" - "8" - "6" - - "4" -script: "npm run jshint && npm run test-cover" +script: "ln -s .. node_modules/sharedb; npm run jshint && npm run test-cover" # Send coverage data to Coveralls after_script: "cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js" diff --git a/README.md b/README.md index 12d84d74..c627942c 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ tracker](https://github.com/share/sharedb/issues). - Realtime synchronization of any JSON document - Concurrent multi-user collaboration +- Realtime synchronization of any ephemeral "presence" data - Synchronous editing API with asynchronous eventual consistency - Realtime query subscriptions - Simple integration with any database - [MongoDB](https://github.com/share/sharedb-mongo), [PostgresQL](https://github.com/share/sharedb-postgres) (experimental) @@ -57,6 +58,10 @@ initial data. Then you can submit editing operations on the document (using OT). Finally you can delete the document with a delete operation. By default, ShareDB stores all operations forever - nothing is truly deleted. +## User presence synchronization + +Presence data represents a user and is automatically synchronized between all clients subscribed to the same document. Its format is defined by the document's [OT Type](https://github.com/ottypes/docs), for example it may contain a user ID and a cursor position in a text document. All clients can modify their own presence data and receive a read-only version of other client's data. Presence data is automatically cleared when a client unsubscribes from the document or disconnects. It is also automatically transformed against applied operations, so that it still makes sense in the context of a modified document, for example a cursor position may be automatically advanced when a user types at the beginning of a text document. + ## Server API ### Initialization @@ -223,6 +228,9 @@ Unique document ID `doc.data` _(Object)_ Document contents. Available after document is fetched or subscribed to. +`doc.presence` _(Object)_ +Each property under `doc.presence` contains presence data shared by a client subscribed to this document. The property name is an empty string for this client's data and connection IDs for other clients' data. + `doc.fetch(function(err) {...})` Populate the fields on `doc` with a snapshot of the document from the server. @@ -252,6 +260,9 @@ An operation was applied to the data. `source` will be `false` for ops received `doc.on('del', function(data, source) {...})` The document was deleted. Document contents before deletion are passed in as an argument. `source` will be `false` for ops received from the server and defaults to `true` for ops generated locally. +`doc.on('presence', function(srcList, submitted) {...})` +Presence data has changed. `srcList` is an Array of `doc.presence` property names for which values have changed. `submitted` is `true`, if the event is the result of new presence data being submitted by the local or remote user, otherwise it is `false` - eg if the presence data was transformed against an operation or was cleared on unsubscribe, disconnect or roll-back. + `doc.on('error', function(err) {...})` There was an error fetching the document or applying an operation. @@ -285,6 +296,11 @@ Invokes the given callback function after Note that `whenNothingPending` does NOT wait for pending `model.query()` calls. +`doc.submitPresence(presenceData[, function(err) {...}])` +Set local presence data and publish it for other clients. +`presenceData` structure depends on the document type. +Presence is synchronized only when subscribed to the document. + ### Class: `ShareDB.Query` `query.ready` _(Boolean)_ @@ -360,6 +376,9 @@ Additional fields may be added to the error object for debugging context dependi * 4021 - Invalid client id * 4022 - Database adapter does not support queries * 4023 - Cannot project snapshots of this type +* 4024 - OT Type does not support presence +* 4025 - Not subscribed to document +* 4026 - Presence data superseded ### 5000 - Internal error diff --git a/lib/agent.js b/lib/agent.js index d1a944de..ac9c12d7 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -1,6 +1,7 @@ var hat = require('hat'); var util = require('./util'); var types = require('./types'); +var ShareDBError = require('./error'); /** * Agent deserializes the wire protocol messages received from the stream and @@ -25,6 +26,9 @@ function Agent(backend, stream) { // Map from queryId -> emitter this.subscribedQueries = {}; + // The max presence sequence number received from the client. + this.maxPresenceSeq = 0; + // We need to track this manually to make sure we don't reply to messages // after the stream was closed. this.closed = false; @@ -98,10 +102,17 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) { console.error('Doc subscription stream error', collection, id, data.error); return; } + if (data.a === 'p') { + // Send other clients' presence data + if (data.src !== agent.clientId) agent.send(data); + return; + } if (agent._isOwnOp(collection, data)) return; agent._sendOp(collection, id, data); }); stream.on('end', function() { + var presence = agent._createPresence(collection, id); + agent.backend.sendPresence(presence); // The op stream is done sending, so release its reference var streams = agent.subscribedDocs[collection]; if (!streams) return; @@ -268,6 +279,13 @@ Agent.prototype._checkRequest = function(request) { // Bulk request if (request.c != null && typeof request.c !== 'string') return 'Invalid collection'; if (typeof request.b !== 'object') return 'Invalid bulk subscribe data'; + } else if (request.a === 'p') { + // Presence + if (typeof request.c !== 'string') return 'Invalid collection'; + if (typeof request.d !== 'string') return 'Invalid id'; + if (typeof request.v !== 'number' || request.v < 0) return 'Invalid version'; + if (typeof request.seq !== 'number' || request.seq <= 0) return 'Invalid seq'; + if (typeof request.r !== 'undefined' && typeof request.r !== 'boolean') return 'Invalid "request reply" value'; } }; @@ -300,6 +318,9 @@ Agent.prototype._handleMessage = function(request, callback) { var op = this._createOp(request); if (!op) return callback({code: 4000, message: 'Invalid op message'}); return this._submit(request.c, request.d, op, callback); + case 'p': + var presence = this._createPresence(request.c, request.d, request.p, request.v, request.r, request.seq); + return this._presence(presence, callback); default: callback({code: 4000, message: 'Invalid or unknown message'}); } @@ -582,3 +603,34 @@ Agent.prototype._createOp = function(request) { return new DeleteOp(src, request.seq, request.v, request.del); } }; + +Agent.prototype._presence = function(presence, callback) { + if (presence.seq <= this.maxPresenceSeq) { + return process.nextTick(function() { + callback(new ShareDBError(4026, 'Presence data superseded')); + }); + } + this.maxPresenceSeq = presence.seq; + if (!this.subscribedDocs[presence.c] || !this.subscribedDocs[presence.c][presence.d]) { + return process.nextTick(function() { + callback(new ShareDBError(4025, 'Cannot send presence. Not subscribed to document: ' + presence.c + ' ' + presence.d)); + }); + } + this.backend.sendPresence(presence, function(err) { + if (err) return callback(err); + callback(null, { seq: presence.seq }); + }); +}; + +Agent.prototype._createPresence = function(collection, id, data, version, requestReply, seq) { + return { + a: 'p', + src: this.clientId, + seq: seq != null ? seq : this.maxPresenceSeq, + c: collection, + d: id, + p: data, + v: version, + r: requestReply + }; +}; diff --git a/lib/backend.js b/lib/backend.js index 3156cfc8..a8553683 100644 --- a/lib/backend.js +++ b/lib/backend.js @@ -568,6 +568,11 @@ Backend.prototype.getChannels = function(collection, id) { ]; }; +Backend.prototype.sendPresence = function(presence, callback) { + var channels = [ this.getDocChannel(presence.c, presence.d) ]; + this.pubsub.publish(channels, presence, callback); +}; + function pluckIds(snapshots) { var ids = []; for (var i = 0; i < snapshots.length; i++) { diff --git a/lib/client/connection.js b/lib/client/connection.js index f4cc298e..b8d7f1cc 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -243,6 +243,11 @@ Connection.prototype.handleMessage = function(message) { if (doc) doc._handleOp(err, message); return; + case 'p': + var doc = this.getExisting(message.c, message.d); + if (doc) doc._handlePresence(err, message); + return; + default: console.warn('Ignoring unrecognized message', message); } @@ -408,6 +413,23 @@ Connection.prototype.sendOp = function(doc, op) { this.send(message); }; +Connection.prototype.sendPresence = function(doc, data, requestReply) { + // Ensure the doc is registered so that it receives the reply message + this._addDoc(doc); + var message = { + a: 'p', + c: doc.collection, + d: doc.id, + p: data, + v: doc.version || 0, + seq: this.seq++ + }; + if (requestReply) { + message.r = true; + } + this.send(message); +}; + /** * Sends a message down the socket diff --git a/lib/client/doc.js b/lib/client/doc.js index 05e17976..42d8d37d 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -28,6 +28,14 @@ var types = require('../types'); * }) * * + * Presence + * -------- + * + * We can associate transient "presence" data with a document, eg caret position, etc. + * The presence data is synchronized on the best-effort basis between clients subscribed to the same document. + * Each client has their own presence data which is read-write. Other clients' data is read-only. + * + * * Events * ------ * @@ -42,6 +50,7 @@ var types = require('../types'); * the data is null. It is passed the data before delteion as an * arguments * - `load ()` Fired when a new snapshot is ingested from a fetch, subscribe, or query + * - `presence ([src])` Fired after the presence data has changed. */ module.exports = Doc; @@ -57,11 +66,37 @@ function Doc(connection, collection, id) { this.type = null; this.data = undefined; + // The current presence data + // Map of src -> presence data + // Local src === '' + this.presence = Object.create(null); + // The presence objects received from the server + // Map of src -> presence + this.receivedPresence = Object.create(null); + // The minimum amount of time to wait before removing processed presence from this.receivedPresence. + // The processed presence is removed to avoid leaking memory, in case peers keep connecting and disconnecting a lot. + // The processed presence is not removed immediately to enable avoiding race conditions, where messages with lower + // sequence number arrive after messages with higher sequence numbers. + this.receivedPresenceTimeout = 60000; + // If set to true, then the next time the local presence is sent, + // all other clients will be asked to reply with their own presence data. + this.requestReplyPresence = true; + // A list of ops sent by the server. These are needed for transforming presence data, + // if we get that presence data for an older version of the document. + // The ops are cached for at least 1 minute by default, which should be lots, considering that the presence + // data is supposed to be synced in real-time. + this.cachedOps = []; + this.cachedOpsTimeout = 60000; + // The sequence number of the inflight presence request. + this.inflightPresenceSeq = 0; + // Array of callbacks or nulls as placeholders this.inflightFetch = []; this.inflightSubscribe = []; this.inflightUnsubscribe = []; + this.inflightPresence = null; this.pendingFetch = []; + this.pendingPresence = null; // Whether we think we are subscribed on the server. Synchronously set to // false on calls to unsubscribe and disconnect. Should never be true when @@ -104,11 +139,24 @@ emitter.mixin(Doc); Doc.prototype.destroy = function(callback) { var doc = this; doc.whenNothingPending(function() { - doc.connection._destroyDoc(doc); if (doc.wantSubscribe) { - return doc.unsubscribe(callback); + doc.unsubscribe(function(err) { + if (err) { + if (callback) callback(err); + else this.emit('error', err); + return; + } + doc.receivedPresence = Object.create(null); + doc.cachedOps.length = 0; + doc.connection._destroyDoc(doc); + if (callback) callback(); + }); + } else { + doc.receivedPresence = Object.create(null); + doc.cachedOps.length = 0; + doc.connection._destroyDoc(doc); + if (callback) callback(); } - if (callback) callback(); }); }; @@ -186,12 +234,14 @@ Doc.prototype.ingestSnapshot = function(snapshot, callback) { if (this.version > snapshot.v) return callback && callback(); this.version = snapshot.v; + this.cachedOps.length = 0; var type = (snapshot.type === undefined) ? types.defaultType : snapshot.type; this._setType(type); this.data = (this.type && this.type.deserialize) ? this.type.deserialize(snapshot.data) : snapshot.data; this.emit('load'); + this._processAllReceivedPresence(); callback && callback(); }; @@ -210,7 +260,9 @@ Doc.prototype.hasPending = function() { this.inflightFetch.length || this.inflightSubscribe.length || this.inflightUnsubscribe.length || - this.pendingFetch.length + this.pendingFetch.length || + this.inflightPresence || + this.pendingPresence ); }; @@ -257,6 +309,7 @@ Doc.prototype._handleSubscribe = function(err, snapshot) { if (this.wantSubscribe) this.subscribed = true; this.ingestSnapshot(snapshot, callback); this._emitNothingPending(); + this.flush(); }; Doc.prototype._handleUnsubscribe = function(err) { @@ -307,6 +360,14 @@ Doc.prototype._handleOp = function(err, message) { return; } + var serverOp = { + src: message.src, + time: Date.now(), + create: !!message.create, + op: message.op, + del: !!message.del + }; + if (this.inflightOp) { var transformErr = transformX(this.inflightOp, message); if (transformErr) return this._hardRollback(transformErr); @@ -318,7 +379,9 @@ Doc.prototype._handleOp = function(err, message) { } this.version++; + this._cacheOp(serverOp); this._otApply(message, false); + this._processAllReceivedPresence(); return; }; @@ -342,7 +405,10 @@ Doc.prototype._onConnectionStateChanged = function() { if (this.inflightUnsubscribe.length) { var callbacks = this.inflightUnsubscribe; this.inflightUnsubscribe = []; + this._pausePresence(); callEach(callbacks); + } else { + this._pausePresence(); } } }; @@ -402,8 +468,10 @@ Doc.prototype.unsubscribe = function(callback) { if (this.connection.canSend) { var isDuplicate = this.connection.sendUnsubscribe(this); pushActionCallback(this.inflightUnsubscribe, isDuplicate, callback); + this._pausePresence(); return; } + this._pausePresence(); if (callback) process.nextTick(callback); }; @@ -426,14 +494,22 @@ function pushActionCallback(inflight, isDuplicate, callback) { // // Only one operation can be in-flight at a time. If an operation is already on // its way, or we're not currently connected, this method does nothing. +// +// If there are no pending ops, this method sends the pending presence data, if possible. Doc.prototype.flush = function() { - // Ignore if we can't send or we are already sending an op - if (!this.connection.canSend || this.inflightOp) return; + if (this.paused) return; - // Send first pending op unless paused - if (!this.paused && this.pendingOps.length) { + if (this.connection.canSend && !this.inflightOp && this.pendingOps.length) { this._sendOp(); } + + if (this.subscribed && !this.inflightPresence && this.pendingPresence && !this.hasWritePending()) { + this.inflightPresence = this.pendingPresence; + this.inflightPresenceSeq = this.connection.seq; + this.pendingPresence = null; + this.connection.sendPresence(this, this.presence[''], this.requestReplyPresence); + this.requestReplyPresence = false; + } }; // Helper function to set op to contain a no-op. @@ -538,6 +614,7 @@ Doc.prototype._otApply = function(op, source) { // Apply the individual op component this.emit('before op', componentOp.op, source); this.data = this.type.apply(this.data, componentOp.op); + this._transformAllPresence(componentOp); this.emit('op', componentOp.op, source); } // Pop whatever was submitted since we started applying this op @@ -550,6 +627,7 @@ Doc.prototype._otApply = function(op, source) { this.emit('before op', op.op, source); // Apply the operation to the local data, mutating it in place this.data = this.type.apply(this.data, op.op); + this._transformAllPresence(op); // Emit an 'op' event once the local data includes the changes from the // op. For locally submitted ops, this will be synchronously with // submission and before the server or other clients have received the op. @@ -566,6 +644,7 @@ Doc.prototype._otApply = function(op, source) { this.type.createDeserialized(op.create.data) : this.type.deserialize(this.type.create(op.create.data)) : this.type.create(op.create.data); + this._transformAllPresence(op); this.emit('create', source); return; } @@ -573,6 +652,7 @@ Doc.prototype._otApply = function(op, source) { if (op.del) { var oldData = this.data; this._setType(null); + this._transformAllPresence(op); this.emit('del', oldData, source); return; } @@ -820,6 +900,7 @@ Doc.prototype.resume = function() { Doc.prototype._opAcknowledged = function(message) { if (this.inflightOp.create) { this.version = message.v; + this.cachedOps.length = 0; } else if (message.v !== this.version) { // We should already be at the same version, because the server should @@ -832,8 +913,16 @@ Doc.prototype._opAcknowledged = function(message) { // The op was committed successfully. Increment the version number this.version++; + this._cacheOp({ + src: this.inflightOp.src, + time: Date.now(), + create: !!this.inflightOp.create, + op: this.inflightOp.op, + del: !!this.inflightOp.del + }); this._clearInflightOp(); + this._processAllReceivedPresence(); }; Doc.prototype._rollback = function(err) { @@ -868,29 +957,54 @@ Doc.prototype._rollback = function(err) { }; Doc.prototype._hardRollback = function(err) { - // Cancel all pending ops and reset if we can't invert - var op = this.inflightOp; - var pending = this.pendingOps; + var callbacks = []; + if (this.inflightPresence) { + callbacks.push.apply(callbacks, this.inflightPresence); + this.inflightPresence = null; + this.inflightPresenceSeq = 0; + } + if (this.pendingPresence) { + callbacks.push.apply(callbacks, this.pendingPresence); + this.pendingPresence = null; + } + if (this.inflightOp) { + callbacks.push.apply(callbacks, this.inflightOp.callbacks); + } + for (var i = 0; i < this.pendingOps.length; i++) { + callbacks.push.apply(callbacks, this.pendingOps[i].callbacks); + } + this._setType(null); this.version = null; this.inflightOp = null; this.pendingOps = []; + this.cachedOps.length = 0; + this.receivedPresence = Object.create(null); + this.requestReplyPresence = true; + + var srcList = Object.keys(this.presence); + var changedSrcList = []; + for (var i = 0; i < srcList.length; i++) { + var src = srcList[i]; + if (this._setPresence(src, null)) { + changedSrcList.push(src); + } + } + this._emitPresence(changedSrcList, false); // Fetch the latest from the server to get us back into a working state var doc = this; this.fetch(function() { - var called = op && callEach(op.callbacks, err); - for (var i = 0; i < pending.length; i++) { - callEach(pending[i].callbacks, err); - } + var called = callEach(callbacks, err); if (err && !called) return doc.emit('error', err); }); }; Doc.prototype._clearInflightOp = function(err) { - var called = callEach(this.inflightOp.callbacks, err); - + var callbacks = this.inflightOp && this.inflightOp.callbacks; this.inflightOp = null; + var called = callbacks && callEach(callbacks, err); + this.flush(); this._emitNothingPending(); @@ -908,3 +1022,281 @@ function callEach(callbacks, err) { } return called; } + +// *** Presence + +Doc.prototype.submitPresence = function (data, callback) { + if (data != null) { + if (!this.type) { + var doc = this; + return process.nextTick(function() { + var err = new ShareDBError(4015, 'Cannot submit presence. Document has not been created. ' + doc.collection + '.' + doc.id); + if (callback) return callback(err); + doc.emit('error', err); + }); + } + + if (!this.type.createPresence || !this.type.transformPresence) { + var doc = this; + return process.nextTick(function() { + var err = new ShareDBError(4024, 'Cannot submit presence. Document\'s type does not support presence. ' + doc.collection + '.' + doc.id); + if (callback) return callback(err); + doc.emit('error', err); + }); + } + + data = this.type.createPresence(data); + } + + if (this._setPresence('', data, true) || this.pendingPresence || this.inflightPresence) { + if (!this.pendingPresence) { + this.pendingPresence = []; + } + if (callback) { + this.pendingPresence.push(callback); + } + + } else if (callback) { + process.nextTick(callback); + } + + var doc = this; + process.nextTick(function() { + doc.flush(); + }); +}; + +Doc.prototype._handlePresence = function(err, presence) { + if (!this.subscribed) return; + + var src = presence.src; + if (!src) { + // Handle the ACK for the presence data we submitted. + // this.inflightPresenceSeq would not equal presence.seq after a hard rollback, + // when all callbacks are flushed with an error. + if (this.inflightPresenceSeq === presence.seq) { + var callbacks = this.inflightPresence; + this.inflightPresence = null; + this.inflightPresenceSeq = 0; + var called = callbacks && callEach(callbacks, err); + if (err && !called) this.emit('error', err); + this.flush(); + this._emitNothingPending(); + } + return; + } + + // This shouldn't happen but check just in case. + if (err) return this.emit('error', err); + + if (presence.r && !this.pendingPresence) { + // Another client requested us to share our current presence data + this.pendingPresence = []; + this.flush(); + } + + // Ignore older messages which arrived out of order + if ( + this.receivedPresence[src] && ( + this.receivedPresence[src].seq > presence.seq || + (this.receivedPresence[src].seq === presence.seq && presence.v != null) + ) + ) return; + + this.receivedPresence[src] = presence; + + if (presence.v == null) { + // null version should happen only when the server automatically sends + // null presence for an unsubscribed client + presence.processedAt = Date.now(); + return this._setPresence(src, null, true); + } + + // Get missing ops first, if necessary + if (this.version == null || this.version < presence.v) return this.fetch(); + + this._processReceivedPresence(src, true); +}; + +// If emit is true and presence has changed, emits a presence event. +// Returns true, if presence has changed for src. Otherwise false. +Doc.prototype._processReceivedPresence = function(src, emit) { + if (!src) return false; + var presence = this.receivedPresence[src]; + if (!presence) return false; + + if (presence.processedAt != null) { + if (Date.now() >= presence.processedAt + this.receivedPresenceTimeout) { + // Remove old received and processed presence + delete this.receivedPresence[src]; + } + return false; + } + + if (this.version == null || this.version < presence.v) return false; // keep waiting for the missing snapshot or ops + + if (presence.p == null) { + // Remove presence data as requested + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + + if (!this.type || !this.type.createPresence || !this.type.transformPresence) { + // Remove presence data because the document is not created or its type does not support presence + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + + if (this.inflightOp && this.inflightOp.op == null) { + // Remove presence data because receivedPresence can be transformed only against "op", not "create" nor "del" + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + + for (var i = 0; i < this.pendingOps.length; i++) { + if (this.pendingOps[i].op == null) { + // Remove presence data because receivedPresence can be transformed only against "op", not "create" nor "del" + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + } + + var startIndex = this.cachedOps.length - (this.version - presence.v); + if (startIndex < 0) { + // Remove presence data because we can't transform receivedPresence + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + + for (var i = startIndex; i < this.cachedOps.length; i++) { + if (this.cachedOps[i].op == null) { + // Remove presence data because receivedPresence can be transformed only against "op", not "create" nor "del" + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + } + + // Make sure the format of the data is correct + var data = this.type.createPresence(presence.p); + + // Transform against past ops + for (var i = startIndex; i < this.cachedOps.length; i++) { + var op = this.cachedOps[i]; + data = this.type.transformPresence(data, op.op, presence.src === op.src); + } + + // Transform against pending ops + if (this.inflightOp) { + data = this.type.transformPresence(data, this.inflightOp.op, false); + } + + for (var i = 0; i < this.pendingOps.length; i++) { + data = this.type.transformPresence(data, this.pendingOps[i].op, false); + } + + // Set presence data + presence.processedAt = Date.now(); + return this._setPresence(src, data, emit); +}; + +Doc.prototype._processAllReceivedPresence = function() { + var srcList = Object.keys(this.receivedPresence); + var changedSrcList = []; + for (var i = 0; i < srcList.length; i++) { + var src = srcList[i]; + if (this._processReceivedPresence(src)) { + changedSrcList.push(src); + } + } + this._emitPresence(changedSrcList, true); +}; + +Doc.prototype._transformPresence = function(src, op) { + var presenceData = this.presence[src]; + if (op.op != null) { + var isOwnOperation = src === (op.src || ''); + presenceData = this.type.transformPresence(presenceData, op.op, isOwnOperation); + } else { + presenceData = null; + } + return this._setPresence(src, presenceData); +}; + +Doc.prototype._transformAllPresence = function(op) { + var srcList = Object.keys(this.presence); + var changedSrcList = []; + for (var i = 0; i < srcList.length; i++) { + var src = srcList[i]; + if (this._transformPresence(src, op)) { + changedSrcList.push(src); + } + } + this._emitPresence(changedSrcList, false); +}; + +Doc.prototype._pausePresence = function() { + if (this.inflightPresence) { + this.pendingPresence = + this.pendingPresence ? + this.inflightPresence.concat(this.pendingPresence) : + this.inflightPresence; + this.inflightPresence = null; + this.inflightPresenceSeq = 0; + } else if (!this.pendingPresence && this.presence[''] != null) { + this.pendingPresence = []; + } + this.receivedPresence = Object.create(null); + this.requestReplyPresence = true; + var srcList = Object.keys(this.presence); + var changedSrcList = []; + for (var i = 0; i < srcList.length; i++) { + var src = srcList[i]; + if (src && this._setPresence(src, null)) { + changedSrcList.push(src); + } + } + this._emitPresence(changedSrcList, false); +}; + +// If emit is true and presence has changed, emits a presence event. +// Returns true, if presence has changed. Otherwise false. +Doc.prototype._setPresence = function(src, data, emit) { + if (data == null) { + if (this.presence[src] == null) return false; + delete this.presence[src]; + } else { + var isPresenceEqual = + this.presence[src] === data || + (this.type.comparePresence && this.type.comparePresence(this.presence[src], data)); + if (isPresenceEqual) return false; + this.presence[src] = data; + } + if (emit) this._emitPresence([ src ], true); + return true; +}; + +Doc.prototype._emitPresence = function(srcList, submitted) { + if (srcList && srcList.length > 0) { + var doc = this; + process.nextTick(function() { + doc.emit('presence', srcList, submitted); + }); + } +}; + +Doc.prototype._cacheOp = function(op) { + // Remove the old ops. + var oldOpTime = Date.now() - this.cachedOpsTimeout; + var i; + for (i = 0; i < this.cachedOps.length; i++) { + if (this.cachedOps[i].time >= oldOpTime) { + break; + } + } + if (i > 0) { + this.cachedOps.splice(0, i); + } + + // Cache the new op. + this.cachedOps.push(op); +}; diff --git a/package.json b/package.json index 35fc64bc..934a886c 100644 --- a/package.json +++ b/package.json @@ -16,7 +16,7 @@ "expect.js": "^0.3.1", "istanbul": "^0.4.2", "jshint": "^2.9.2", - "mocha": "^3.2.0", + "mocha": "^5.1.1", "sharedb-mingo-memory": "^1.0.0-beta" }, "scripts": { diff --git a/test/client/presence-type.js b/test/client/presence-type.js new file mode 100644 index 00000000..51ad272a --- /dev/null +++ b/test/client/presence-type.js @@ -0,0 +1,82 @@ +// A simple type for testing presence, where: +// +// - snapshot is a list +// - operation is { index, value } -> insert value at index in snapshot +// - presence is { index } -> an index in the snapshot +exports.type = { + name: 'wrapped-presence-no-compare', + uri: 'http://sharejs.org/types/wrapped-presence-no-compare', + create: create, + apply: apply, + transform: transform, + createPresence: createPresence, + transformPresence: transformPresence +}; + +// The same as `exports.type` but implements `comparePresence`. +exports.type2 = { + name: 'wrapped-presence-with-compare', + uri: 'http://sharejs.org/types/wrapped-presence-with-compare', + create: create, + apply: apply, + transform: transform, + createPresence: createPresence, + transformPresence: transformPresence, + comparePresence: comparePresence +}; + +// The same as `exports.type` but `presence.index` is unwrapped. +exports.type3 = { + name: 'unwrapped-presence', + uri: 'http://sharejs.org/types/unwrapped-presence', + create: create, + apply: apply, + transform: transform, + createPresence: createPresence2, + transformPresence: transformPresence2 +}; + +function create(data) { + return data || []; +} + +function apply(snapshot, op) { + snapshot.splice(op.index, 0, op.value); + return snapshot; +} + +function transform(op1, op2, side) { + return op1.index < op2.index || (op1.index === op2.index && side === 'left') ? + op1 : + { + index: op1.index + 1, + value: op1.value + }; +} + +function createPresence(data) { + return { index: (data && data.index) | 0 }; +} + +function transformPresence(presence, op, isOwnOperation) { + return presence.index < op.index || (presence.index === op.index && !isOwnOperation) ? + presence : + { + index: presence.index + 1 + }; +} + +function comparePresence(presence1, presence2) { + return presence1 === presence2 || + (presence1 == null && presence2 == null) || + (presence1 != null && presence2 != null && presence1.index === presence2.index); +} + +function createPresence2(data) { + return data | 0; +} + +function transformPresence2(presence, op, isOwnOperation) { + return presence < op.index || (presence === op.index && !isOwnOperation) ? + presence : presence + 1; +} diff --git a/test/client/presence.js b/test/client/presence.js new file mode 100644 index 00000000..2bf7e7da --- /dev/null +++ b/test/client/presence.js @@ -0,0 +1,1441 @@ +var async = require('async'); +var util = require('../util'); +var errorHandler = util.errorHandler; +var Backend = require('../../lib/backend'); +var ShareDBError = require('../../lib/error'); +var expect = require('expect.js'); +var types = require('../../lib/types'); +var presenceType = require('./presence-type'); +types.register(presenceType.type); +types.register(presenceType.type2); +types.register(presenceType.type3); + +[ + 'wrapped-presence-no-compare', + 'wrapped-presence-with-compare', + 'unwrapped-presence' +].forEach(function(typeName) { + function p(index) { + return typeName === 'unwrapped-presence' ? index : { index: index }; + } + + describe('client presence (' + typeName + ')', function() { + beforeEach(function() { + this.backend = new Backend(); + this.connection = this.backend.connect(); + this.connection2 = this.backend.connect(); + this.doc = this.connection.get('dogs', 'fido'); + this.doc2 = this.connection2.get('dogs', 'fido'); + }); + + afterEach(function(done) { + this.backend.close(done); + }); + + it('sends presence immediately', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + this.doc2.once('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('sends presence after pending ops', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc.submitOp({ index: 0, value: 'a' }, errorHandler(done)); + this.doc.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + this.doc2.once('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b' ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('waits for pending ops before processing future presence', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b' ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + // A hack to send presence for a future version. + this.doc.version += 2; + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), function(err) { + if (err) return done(err); + this.doc.version -= 2; + this.doc.submitOp({ index: 0, value: 'a' }, errorHandler(done)); + this.doc.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (own ops, presence.index < op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitOp.bind(this.doc, { index: 1, value: 'b' }), + this.doc.submitOp.bind(this.doc, { index: 2, value: 'c' }), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'a' ]; + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(0), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (own ops, presence.index === op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitOp.bind(this.doc, { index: 1, value: 'c' }), + this.doc.submitOp.bind(this.doc, { index: 1, value: 'b' }), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(3)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'a' ]; + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (own ops, presence.index > op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitOp.bind(this.doc, { index: 0, value: 'b' }), + this.doc.submitOp.bind(this.doc, { index: 0, value: 'a' }), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(3)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'c' ]; + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (non-own ops, presence.index < op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc2.submitOp.bind(this.doc2, { index: 1, value: 'b' }), + this.doc2.submitOp.bind(this.doc2, { index: 2, value: 'c' }), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'a' ]; + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(0), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (non-own ops, presence.index === op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc2.submitOp.bind(this.doc2, { index: 1, value: 'c' }), + this.doc2.submitOp.bind(this.doc2, { index: 1, value: 'b' }), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'a' ]; + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (non-own ops, presence.index > op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc2.submitOp.bind(this.doc2, { index: 0, value: 'b' }), + this.doc2.submitOp.bind(this.doc2, { index: 0, value: 'a' }), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(3)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'c' ]; + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (transform against non-op)', function(allDone) { + async.series([ + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.create.bind(this.doc, [], typeName), + this.doc.submitOp.bind(this.doc, { index: 0, value: 'a' }), + this.doc.del.bind(this.doc), + this.doc.create.bind(this.doc, [ 'b' ], typeName), + function(done) { + this.doc2.once('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'b' ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(0), errorHandler(done)); + }.bind(this), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'b' ]); + expect(this.doc2.presence).to.not.have.key(this.connection.id); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 2; + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (no cached ops)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitOp.bind(this.doc, { index: 1, value: 'b' }), + this.doc.submitOp.bind(this.doc, { index: 2, value: 'c' }), + function(done) { + this.doc2.once('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(0), errorHandler(done)); + }.bind(this), + function(done) { + this.doc2.cachedOps = []; + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence).to.not.have.key(this.connection.id); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'a' ]; + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against local delete', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(0)), + setTimeout, + function(done) { + this.doc.on('presence', function(srcList, submitted) { + expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(submitted).to.equal(false); + expect(this.doc.presence).to.not.have.key(''); + expect(this.doc.presence).to.not.have.key(this.connection2.id); + done(); + }.bind(this)); + this.doc.del(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against non-local delete', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(0)), + setTimeout, + function(done) { + this.doc.on('presence', function(srcList, submitted) { + expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(submitted).to.equal(false); + expect(this.doc.presence).to.not.have.key(''); + expect(this.doc.presence).to.not.have.key(this.connection2.id); + done(); + }.bind(this)); + this.doc2.del(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against local op (presence.index != op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(2)), + setTimeout, + function(done) { + this.doc.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection2.id ]); + expect(submitted).to.equal(false); + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(3)); + done(); + }.bind(this)); + this.doc.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against non-local op (presence.index != op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(2)), + setTimeout, + function(done) { + this.doc.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection2.id ]); + expect(submitted).to.equal(false); + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(3)); + done(); + }.bind(this)); + this.doc2.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against local op (presence.index == op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(1)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + setTimeout, + function(done) { + this.doc.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ '' ]); + expect(submitted).to.equal(false); + expect(this.doc.presence['']).to.eql(p(2)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); + done(); + }.bind(this)); + this.doc.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against non-local op (presence.index == op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(1)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + setTimeout, + function(done) { + this.doc.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection2.id ]); + expect(submitted).to.equal(false); + expect(this.doc.presence['']).to.eql(p(1)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(2)); + done(); + }.bind(this)); + this.doc2.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('caches local ops', function(allDone) { + var op = { index: 1, value: 'b' }; + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.submitOp.bind(this.doc, op), + this.doc.del.bind(this.doc), + function(done) { + expect(this.doc.cachedOps.length).to.equal(3); + expect(this.doc.cachedOps[0].create).to.equal(true); + expect(this.doc.cachedOps[1].op).to.equal(op); + expect(this.doc.cachedOps[2].del).to.equal(true); + done(); + }.bind(this) + ], allDone); + }); + + it('caches non-local ops', function(allDone) { + var op = { index: 1, value: 'b' }; + async.series([ + this.doc2.subscribe.bind(this.doc2), + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.submitOp.bind(this.doc, op), + this.doc.del.bind(this.doc), + setTimeout, + function(done) { + expect(this.doc2.cachedOps.length).to.equal(3); + expect(this.doc2.cachedOps[0].create).to.equal(true); + expect(this.doc2.cachedOps[1].op).to.eql(op); + expect(this.doc2.cachedOps[2].del).to.equal(true); + done(); + }.bind(this) + ], allDone); + }); + + it('expires cached ops', function(allDone) { + var op1 = { index: 1, value: 'b' }; + var op2 = { index: 2, value: 'b' }; + var op3 = { index: 3, value: 'b' }; + this.doc.cachedOpsTimeout = 60; + async.series([ + // Cache 2 ops. + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.submitOp.bind(this.doc, op1), + function(done) { + expect(this.doc.cachedOps.length).to.equal(2); + expect(this.doc.cachedOps[0].create).to.equal(true); + expect(this.doc.cachedOps[1].op).to.equal(op1); + done(); + }.bind(this), + + // Cache another op before the first 2 expire. + function (callback) { + setTimeout(callback, 30); + }, + this.doc.submitOp.bind(this.doc, op2), + function(done) { + expect(this.doc.cachedOps.length).to.equal(3); + expect(this.doc.cachedOps[0].create).to.equal(true); + expect(this.doc.cachedOps[1].op).to.equal(op1); + expect(this.doc.cachedOps[2].op).to.equal(op2); + done(); + }.bind(this), + + // Cache another op after the first 2 expire. + function (callback) { + setTimeout(callback, 31); + }, + this.doc.submitOp.bind(this.doc, op3), + function(done) { + expect(this.doc.cachedOps.length).to.equal(2); + expect(this.doc.cachedOps[0].op).to.equal(op2); + expect(this.doc.cachedOps[1].op).to.equal(op3); + done(); + }.bind(this) + ], allDone); + }); + + it('requests reply presence when sending presence for the first time', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + if (srcList[0] === '') { + expect(srcList).to.eql([ '' ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence['']).to.eql(p(1)); + expect(this.doc2.presence).to.not.have.key(this.connection.id); + } else { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.presence['']).to.eql(p(1)); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + expect(this.doc2.requestReplyPresence).to.equal(false); + done(); + } + }.bind(this)); + this.doc2.submitPresence(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('fails to submit presence for uncreated document: callback(err)', function(allDone) { + async.series([ + this.doc.subscribe.bind(this.doc), + function(done) { + this.doc.submitPresence(p(0), function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4015); + done(); + }); + }.bind(this) + ], allDone); + }); + + it('fails to submit presence for uncreated document: emit(err)', function(allDone) { + async.series([ + this.doc.subscribe.bind(this.doc), + function(done) { + this.doc.on('error', function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4015); + done(); + }); + this.doc.submitPresence(p(0)); + }.bind(this) + ], allDone); + }); + + it('fails to submit presence, if type does not support presence: callback(err)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, {}), + this.doc.subscribe.bind(this.doc), + function(done) { + this.doc.submitPresence(p(0), function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4024); + done(); + }); + }.bind(this) + ], allDone); + }); + + it('fails to submit presence, if type does not support presence: emit(err)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, {}), + this.doc.subscribe.bind(this.doc), + function(done) { + this.doc.on('error', function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4024); + done(); + }); + this.doc.submitPresence(p(0)); + }.bind(this) + ], allDone); + }); + + it('submits null presence', function(allDone) { + async.series([ + this.doc.subscribe.bind(this.doc), + this.doc.submitPresence.bind(this.doc, null) + ], allDone); + }); + + it('sends presence once, if submitted multiple times synchronously', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence[this.connection.id]).to.eql(p(2)); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(0), errorHandler(done)); + this.doc.submitPresence(p(1), errorHandler(done)); + this.doc.submitPresence(p(2), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('buffers presence until subscribed', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + setTimeout(function() { + this.doc.subscribe(function(err) { + if (err) return done(err); + expect(this.doc2.presence).to.eql({}); + }.bind(this)); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('buffers presence when disconnected', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + this.connection.close(); + this.doc.submitPresence(p(1), errorHandler(done)); + process.nextTick(function() { + this.backend.connect(this.connection); + this.doc.requestReplyPresence = false; + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('submits presence without a callback', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(0)); + }.bind(this) + ], allDone); + }); + + it('hasPending is true, if there is pending presence', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + expect(this.doc.hasPending()).to.equal(false); + this.doc.submitPresence(p(0)); + expect(this.doc.hasPending()).to.equal(true); + expect(!!this.doc.pendingPresence).to.equal(true); + expect(!!this.doc.inflightPresence).to.equal(false); + this.doc.whenNothingPending(done); + }.bind(this), + function(done) { + expect(this.doc.hasPending()).to.equal(false); + expect(!!this.doc.pendingPresence).to.equal(false); + expect(!!this.doc.inflightPresence).to.equal(false); + done(); + }.bind(this) + ], allDone); + }); + + it('hasPending is true, if there is inflight presence', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + expect(this.doc.hasPending()).to.equal(false); + this.doc.submitPresence(p(0)); + expect(this.doc.hasPending()).to.equal(true); + expect(!!this.doc.pendingPresence).to.equal(true); + expect(!!this.doc.inflightPresence).to.equal(false); + process.nextTick(done); + }.bind(this), + function(done) { + expect(this.doc.hasPending()).to.equal(true); + expect(!!this.doc.pendingPresence).to.equal(false); + expect(!!this.doc.inflightPresence).to.equal(true); + this.doc.whenNothingPending(done); + }.bind(this), + function(done) { + expect(this.doc.hasPending()).to.equal(false); + expect(!!this.doc.pendingPresence).to.equal(false); + expect(!!this.doc.inflightPresence).to.equal(false); + done(); + }.bind(this) + ], allDone); + }); + + it('receives presence after doc is deleted', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + setTimeout, + function(done) { + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + // The call to `del` transforms the presence and fires the event. + // The call to `submitPresence` does not fire the event because presence is already null. + expect(submitted).to.equal(false); + expect(this.doc2.presence).to.not.have.key(this.connection.id); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + this.doc2.del(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('clears peer presence on peer disconnection', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + setTimeout, + function(done) { + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + expect(this.doc2.presence['']).to.eql(p(1)); + + var connectionId = this.connection.id; + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ connectionId ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence).to.not.have.key(connectionId); + expect(this.doc2.presence['']).to.eql(p(1)); + done(); + }.bind(this)); + this.connection.close(); + }.bind(this) + ], allDone); + }); + + it('clears peer presence on own disconnection', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + setTimeout, + function(done) { + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + expect(this.doc2.presence['']).to.eql(p(1)); + + var connectionId = this.connection.id; + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ connectionId ]); + expect(submitted).to.equal(false); + expect(this.doc2.presence).to.not.have.key(connectionId); + expect(this.doc2.presence['']).to.eql(p(1)); + done(); + }.bind(this)); + this.connection2.close(); + }.bind(this) + ], allDone); + }); + + it('clears peer presence on peer unsubscribe', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + setTimeout, + function(done) { + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + expect(this.doc2.presence['']).to.eql(p(1)); + + var connectionId = this.connection.id; + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ connectionId ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence).to.not.have.key(connectionId); + expect(this.doc2.presence['']).to.eql(p(1)); + done(); + }.bind(this)); + this.doc.unsubscribe(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('clears peer presence on own unsubscribe', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + setTimeout, + function(done) { + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + expect(this.doc2.presence['']).to.eql(p(1)); + + var connectionId = this.connection.id; + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ connectionId ]); + expect(submitted).to.equal(false); + expect(this.doc2.presence).to.not.have.key(connectionId); + expect(this.doc2.presence['']).to.eql(p(1)); + done(); + }.bind(this)); + this.doc2.unsubscribe(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('pauses inflight and pending presence on disconnect', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + var called = 0; + function callback(err) { + if (err) return done(err); + if (++called === 2) done(); + } + this.doc.submitPresence(p(0), callback); + process.nextTick(function() { + this.doc.submitPresence(p(1), callback); + this.connection.close(); + process.nextTick(function() { + this.backend.connect(this.connection); + }.bind(this)); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('pauses inflight and pending presence on unsubscribe', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + var called = 0; + function callback(err) { + if (err) return done(err); + if (++called === 2) done(); + } + this.doc.submitPresence(p(0), callback); + process.nextTick(function() { + this.doc.submitPresence(p(1), callback); + this.doc.unsubscribe(errorHandler(done)); + process.nextTick(function() { + this.doc.subscribe(errorHandler(done)); + }.bind(this)); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('re-synchronizes presence after reconnecting', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + setTimeout, + function(done) { + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); + this.connection.close(); + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence).to.not.have.key(this.connection2.id); + this.backend.connect(this.connection); + process.nextTick(done); + }.bind(this), + setTimeout, // wait for re-sync + function(done) { + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); + process.nextTick(done); + }.bind(this) + ], allDone); + }); + + it('re-synchronizes presence after resubscribing', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + setTimeout, + function(done) { + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); + this.doc.unsubscribe(errorHandler(done)); + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence).to.not.have.key(this.connection2.id); + this.doc.subscribe(done); + }.bind(this), + setTimeout, // wait for re-sync + function(done) { + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); + process.nextTick(done); + }.bind(this) + ], allDone); + }); + + it('transforms received presence against inflight and pending ops (presence.index < op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(0), errorHandler(done)); + this.doc2.submitOp({ index: 1, value: 'b' }, errorHandler(done)) + this.doc2.submitOp({ index: 2, value: 'c' }, errorHandler(done)) + }.bind(this) + ], allDone); + }); + + it('transforms received presence against inflight and pending ops (presence.index === op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + this.doc2.submitOp({ index: 1, value: 'c' }, errorHandler(done)) + this.doc2.submitOp({ index: 1, value: 'b' }, errorHandler(done)) + }.bind(this) + ], allDone); + }); + + it('transforms received presence against inflight and pending ops (presence.index > op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence[this.connection.id]).to.eql(p(3)); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + this.doc2.submitOp({ index: 0, value: 'b' }, errorHandler(done)) + this.doc2.submitOp({ index: 0, value: 'a' }, errorHandler(done)) + }.bind(this) + ], allDone); + }); + + it('transforms received presence against inflight delete', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(1)), + setTimeout, + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + expect(srcList).to.eql([ this.connection.id ]); + // The call to `del` transforms the presence and fires the event. + // The call to `submitPresence` does not fire the event because presence is already null. + expect(submitted).to.equal(false); + expect(this.doc2.presence).to.not.have.key(this.connection.id); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(2), errorHandler(done)); + this.doc2.del(errorHandler(done)); + this.doc2.create([ 'c' ], typeName, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms received presence against a pending delete', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(1)), + setTimeout, + function(done) { + var firstCall = true; + this.doc2.on('presence', function(srcList, submitted) { + if (firstCall) return firstCall = false; + expect(srcList).to.eql([ this.connection.id ]); + // The call to `del` transforms the presence and fires the event. + // The call to `submitPresence` does not fire the event because presence is already null. + expect(submitted).to.equal(false); + expect(this.doc2.presence).to.not.have.key(this.connection.id); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(2), errorHandler(done)); + this.doc2.submitOp({ index: 0, value: 'b' }, errorHandler(done)); + this.doc2.del(errorHandler(done)); + this.doc2.create([ 'c' ], typeName, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('emits the same presence only if comparePresence is not implemented (local presence)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.submitPresence.bind(this.doc, p(1)), + function(done) { + this.doc.on('presence', function(srcList, submitted) { + if (typeName === 'wrapped-presence-no-compare') { + expect(srcList).to.eql([ '' ]); + expect(submitted).to.equal(true); + expect(this.doc.presence['']).to.eql(p(1)); + done(); + } else { + done(new Error('Unexpected presence event')); + } + }.bind(this)); + this.doc.submitPresence(p(1), typeName === 'wrapped-presence-no-compare' ? errorHandler(done) : done); + }.bind(this) + ], allDone); + }); + + it('emits the same presence only if comparePresence is not implemented (non-local presence)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(1)), + setTimeout, + function(done) { + this.doc2.on('presence', function(srcList, submitted) { + if (typeName === 'wrapped-presence-no-compare') { + expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); + expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); + done(); + } else { + done(new Error('Unexpected presence event')); + } + }.bind(this)); + this.doc.submitPresence(p(1), typeName === 'wrapped-presence-no-compare' ? errorHandler(done) : done); + }.bind(this) + ], allDone); + }); + + it('returns an error when not subscribed on the server', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + this.connection.sendUnsubscribe(this.doc); + process.nextTick(done); + }.bind(this), + function(done) { + this.doc.on('error', done); + this.doc.submitPresence(p(0), function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4025); + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('emits an error when not subscribed on the server and no callback is provided', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + this.connection.sendUnsubscribe(this.doc); + process.nextTick(done); + }.bind(this), + function(done) { + this.doc.on('error', function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4025); + done(); + }.bind(this)); + this.doc.submitPresence(p(0)); + }.bind(this) + ], allDone); + }); + + it('returns an error when the server gets an old sequence number', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.submitPresence.bind(this.doc, p(0)), + setTimeout, + function(done) { + this.doc.on('error', done); + this.connection.seq--; + this.doc.submitPresence(p(1), function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4026); + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('emits an error when the server gets an old sequence number and no callback is provided', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.submitPresence.bind(this.doc, p(0)), + setTimeout, + function(done) { + this.doc.on('error', function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4026); + done(); + }.bind(this)); + this.connection.seq--; + this.doc.submitPresence(p(1)); + }.bind(this) + ], allDone); + }); + + it('does not publish presence unnecessarily', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.submitPresence.bind(this.doc, p(0)), + setTimeout, + function(done) { + this.doc.on('error', done); + // Decremented sequence number would cause the server to return an error, however, + // the message won't be sent to the server at all because the presence data has not changed. + this.connection.seq--; + this.doc.submitPresence(p(0), function(err) { + if (typeName === 'wrapped-presence-no-compare') { + // The OT type does not support comparing presence. + expect(err).to.be.an(Error); + expect(err.code).to.equal(4026); + } else { + expect(err).to.not.be.ok(); + } + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('does not publish presence unnecessarily when no callback is provided', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.submitPresence.bind(this.doc, p(0)), + setTimeout, + function(done) { + this.doc.on('error', function(err) { + if (typeName === 'wrapped-presence-no-compare') { + // The OT type does not support comparing presence. + expect(err).to.be.an(Error); + expect(err.code).to.equal(4026); + done(); + } else { + done(err); + } + }.bind(this)); + // Decremented sequence number would cause the server to return an error, however, + // the message won't be sent to the server at all because the presence data has not changed. + this.connection.seq--; + this.doc.submitPresence(p(0)); + if (typeName !== 'wrapped-presence-no-compare') { + process.nextTick(done); + } + }.bind(this) + ], allDone); + }); + + it('returns an error when publishing presence fails', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + setTimeout, + function(done) { + var sendPresence = this.backend.sendPresence; + this.backend.sendPresence = function(presence, callback) { + if (presence.a === 'p' && presence.v != null) { + return callback(new ShareDBError(-1, 'Test publishing error')); + } + sendPresence.apply(this, arguments); + }; + this.doc.on('error', done); + this.doc.submitPresence(p(0), function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(-1); + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('emits an error when publishing presence fails and no callback is provided', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + setTimeout, + function(done) { + var sendPresence = this.backend.sendPresence; + this.backend.sendPresence = function(presence, callback) { + if (presence.a === 'p' && presence.v != null) { + return callback(new ShareDBError(-1, 'Test publishing error')); + } + sendPresence.apply(this, arguments); + }; + this.doc.on('error', function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(-1); + done(); + }.bind(this)); + this.doc.submitPresence(p(0)); + }.bind(this) + ], allDone); + }); + + it('clears presence on hard rollback and emits an error', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'b', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(0)), + setTimeout, + function(done) { + // A hack to allow testing of hard rollback of both inflight and pending presence. + var doc = this.doc; + var _handlePresence = this.doc._handlePresence; + this.doc._handlePresence = function(err, presence) { + setTimeout(function() { + _handlePresence.call(doc, err, presence); + }); + }; + process.nextTick(done); + }.bind(this), + this.doc.submitPresence.bind(this.doc, p(1)), // inflightPresence + process.nextTick, // wait for "presence" event + this.doc.submitPresence.bind(this.doc, p(2)), // pendingPresence + process.nextTick, // wait for "presence" event + function(done) { + var presenceEmitted = false; + this.doc.on('presence', function(srcList, submitted) { + expect(presenceEmitted).to.equal(false); + presenceEmitted = true; + expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(submitted).to.equal(false); + expect(this.doc.presence).to.not.have.key(''); + expect(this.doc.presence).to.not.have.key(this.connection2.id); + }.bind(this)); + + this.doc.on('error', function(err) { + expect(presenceEmitted).to.equal(true); + expect(err).to.be.an(Error); + expect(err.code).to.equal(4000); + done(); + }.bind(this)); + + // send an invalid op + this.doc._submit({}, null); + }.bind(this) + ], allDone); + }); + + it('clears presence on hard rollback and executes all callbacks', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'b', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(0)), + setTimeout, + function(done) { + // A hack to allow testing of hard rollback of both inflight and pending presence. + var doc = this.doc; + var _handlePresence = this.doc._handlePresence; + this.doc._handlePresence = function(err, presence) { + setTimeout(function() { + _handlePresence.call(doc, err, presence); + }); + }; + process.nextTick(done); + }.bind(this), + function(done) { + var presenceEmitted = false; + var called = 0; + function callback(err) { + expect(presenceEmitted).to.equal(true); + expect(err).to.be.an(Error); + expect(err.code).to.equal(4000); + if (++called < 3) return; + done(); + } + this.doc.submitPresence(p(1), callback); // inflightPresence + process.nextTick(function() { // wait for presence event + this.doc.submitPresence(p(2), callback); // pendingPresence + process.nextTick(function() { // wait for presence event + this.doc.on('presence', function(srcList, submitted) { + expect(presenceEmitted).to.equal(false); + presenceEmitted = true; + expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(submitted).to.equal(false); + expect(this.doc.presence).to.not.have.key(''); + expect(this.doc.presence).to.not.have.key(this.connection2.id); + }.bind(this)); + this.doc.on('error', done); + + // send an invalid op + this.doc._submit({ index: 3, value: 'b' }, null, callback); + }.bind(this)); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + function testReceivedMessageExpiry(expireCache, reduceSequence) { + return function(allDone) { + var lastPresence = null; + var handleMessage = this.connection.handleMessage; + this.connection.handleMessage = function(message) { + if (message.a === 'p' && message.src) { + lastPresence = JSON.parse(JSON.stringify(message)); + } + return handleMessage.apply(this, arguments); + }; + if (expireCache) { + this.doc.receivedPresenceTimeout = 0; + } + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.requestReplyPresence = false; + this.doc2.submitPresence(p(0), done); + }.bind(this), + setTimeout, + this.doc2.submitOp.bind(this.doc2, { index: 1, value: 'b' }), // forces processing of all received presence + setTimeout, + function(done) { + expect(this.doc.data).to.eql([ 'a', 'b' ]); + expect(this.doc.presence[this.connection2.id]).to.eql(p(0)); + // Replay the `lastPresence` with modified payload. + lastPresence.p = p(1); + lastPresence.v++; // +1 to account for the op above + if (reduceSequence) { + lastPresence.seq--; + } + this.connection.handleMessage(lastPresence); + process.nextTick(done); + }.bind(this), + function(done) { + expect(this.doc.presence[this.connection2.id]).to.eql(expireCache ? p(1) : p(0)); + process.nextTick(done); + }.bind(this) + ], allDone); + }; + } + + it('ignores an old message (cache not expired, presence.seq === cachedPresence.seq)', testReceivedMessageExpiry(false, false)); + it('ignores an old message (cache not expired, presence.seq < cachedPresence.seq)', testReceivedMessageExpiry(false, true)); + it('processes an old message (cache expired, presence.seq === cachedPresence.seq)', testReceivedMessageExpiry(true, false)); + it('processes an old message (cache expired, presence.seq < cachedPresence.seq)', testReceivedMessageExpiry(true, true)); + }); +}); diff --git a/test/client/submit.js b/test/client/submit.js index 4e508e66..4334b57e 100644 --- a/test/client/submit.js +++ b/test/client/submit.js @@ -1044,6 +1044,39 @@ describe('client submit', function() { }); }); + it('hasWritePending is false when create\'s callback is executed', function(done) { + var doc = this.backend.connect().get('dogs', 'fido'); + doc.create({age: 3}, function(err) { + if (err) return done(err); + expect(doc.hasWritePending()).equal(false); + done(); + }); + }); + + it('hasWritePending is false when submimtOp\'s callback is executed', function(done) { + var doc = this.backend.connect().get('dogs', 'fido'); + doc.create({age: 3}, function(err) { + if (err) return done(err); + doc.submitOp({p: ['age'], na: 2}, function(err) { + if (err) return done(err); + expect(doc.hasWritePending()).equal(false); + done(); + }); + }); + }); + + it('hasWritePending is false when del\'s callback is executed', function(done) { + var doc = this.backend.connect().get('dogs', 'fido'); + doc.create({age: 3}, function(err) { + if (err) return done(err); + doc.del(function(err) { + if (err) return done(err); + expect(doc.hasWritePending()).equal(false); + done(); + }); + }); + }); + describe('type.deserialize', function() { it('can create a new doc', function(done) { var doc = this.backend.connect().get('dogs', 'fido'); diff --git a/test/client/subscribe.js b/test/client/subscribe.js index 567031d0..b24a9474 100644 --- a/test/client/subscribe.js +++ b/test/client/subscribe.js @@ -405,23 +405,37 @@ describe('client subscribe', function() { }); it('doc destroy stops op updates', function(done) { - var doc = this.backend.connect().get('dogs', 'fido'); - var doc2 = this.backend.connect().get('dogs', 'fido'); + var connection1 = this.backend.connect(); + var connection2 = this.backend.connect(); + var doc = connection1.get('dogs', 'fido'); + var doc2 = connection2.get('dogs', 'fido'); doc.create({age: 3}, function(err) { if (err) return done(err); doc2.subscribe(function(err) { if (err) return done(err); doc2.on('op', function(op, context) { - done(); + done(new Error('Should not get op event')); }); doc2.destroy(function(err) { if (err) return done(err); + expect(connection2.getExisting('dogs', 'fido')).equal(undefined); doc.submitOp({p: ['age'], na: 1}, done); }); }); }); }); + it('doc destroy removes doc from connection when doc is not subscribed', function(done) { + var connection = this.backend.connect(); + var doc = connection.get('dogs', 'fido'); + expect(connection.getExisting('dogs', 'fido')).equal(doc); + doc.destroy(function(err) { + if (err) return done(err); + expect(connection.getExisting('dogs', 'fido')).equal(undefined); + done(); + }); + }); + it('bulk unsubscribe stops op updates', function(done) { var connection = this.backend.connect(); var connection2 = this.backend.connect(); diff --git a/test/util.js b/test/util.js index 2bb7e111..36233758 100644 --- a/test/util.js +++ b/test/util.js @@ -15,6 +15,12 @@ exports.pluck = function(docs, key) { return values; }; +exports.errorHandler = function(callback) { + return function(err) { + if (err) callback(err); + }; +}; + // Wrap a done function to call back only after a specified number of calls. // For example, `var callbackAfter = callAfter(1, callback)` means that if // `callbackAfter` is called once, it won't call back. If it is called twice