From 628de990e858439c1f3dbff4f65f95ef9a70d0a2 Mon Sep 17 00:00:00 2001 From: Dawid Kisielewski Date: Mon, 30 Jan 2023 12:22:09 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Add=20multi=20channel=20query=20sub?= =?UTF-8?q?scription.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit At the moment we can only specify one channel the specific query, would be able to listen to, as per docs: ```javascript backend.use('query', (context, next) => { // Set our query to only listen for changes on our user-specific channel context.channel = userChannel(context) next() }) ``` However let's imagine the situation where the user wants to query all the posts, the where posted by him and all his friends. Now we would need new query every for friend separately to make sure the proper scalability is preserved and we do not receive all the changes to posts collection. This change allows to listen for multiple channels, so if we want to query all user friends posts. We can do it by this: ```javascript backend.use('query', (context, next) => { // Set our query to only listen for changes on our user-specific channel context.channels = [userChannel(context), friendChannel(context))] next() }) ``` Now this query would only listen to all the changes that were made to the user posts and his friends. --- docs/middleware/actions.md | 8 +- docs/queries.md | 2 +- lib/backend.js | 49 +- lib/error.js | 1 + lib/query-emitter.js | 57 +- package.json | 2 +- test/client/presence/presence.js | 25 - test/client/query-subscribe.js | 1274 ++++++++++++++++++------------ test/logger.js | 4 - test/setup.js | 5 + 10 files changed, 872 insertions(+), 555 deletions(-) diff --git a/docs/middleware/actions.md b/docs/middleware/actions.md index 0b9ab4965..bcd8ba71a 100644 --- a/docs/middleware/actions.md +++ b/docs/middleware/actions.md @@ -149,9 +149,13 @@ This action has these additional `context` properties: > The query's projection [fields]({{ site.baseurl }}{% link api/backend.md %}#addprojection) -`channel` -- string +`channel` -- string (deprecated) -> The [Pub/Sub]({{ site.baseurl }}{% link adapters/pub-sub.md %}) channel the query will subscribe to. Defaults to its collection channel. +> This property is deprecated use `channels` instead. The [Pub/Sub]({{ site.baseurl }}{% link adapters/pub-sub.md %}) channels the query will subscribe to. Defaults to its collection channel. + +`channels` -- string[] + +> The [Pub/Sub]({{ site.baseurl }}{% link adapters/pub-sub.md %}) channels the query will subscribe to. Defaults to its collection channel. `query` -- Object diff --git a/docs/queries.md b/docs/queries.md index 930ef43b2..bd7d9d3b6 100644 --- a/docs/queries.md +++ b/docs/queries.md @@ -160,7 +160,7 @@ backend.use('commit', (context, next) => { backend.use('query', (context, next) => { // Set our query to only listen for changes on our user-specific channel - context.channel = userChannel(context) + context.channels = [userChannel(context)] next() }) diff --git a/lib/backend.js b/lib/backend.js index 0a4f6a884..08cfa3480 100644 --- a/lib/backend.js +++ b/lib/backend.js @@ -661,10 +661,32 @@ Backend.prototype.querySubscribe = function(agent, index, query, options, callba 'DB does not support subscribe' )); } - backend.pubsub.subscribe(request.channel, function(err, stream) { - if (err) return callback(err); + + var channels = request.channels; + + if (request.channel) { + logger.warn( + '[DEPRECATED] "query" middleware\'s context.channel is deprecated, use context.channels instead. ' + + 'Read more: https://share.github.io/sharedb/middleware/actions#query' + ); + channels = [request.channel]; + } + + if (!channels || !channels.length) { + return callback(new ShareDBError(ERROR_CODE.ERR_QUERY_CHANNEL_MISSING, 'Required minimum one query channel.')); + } + + var streams = []; + + function destroyStreams() { + streams.forEach(function(stream) { + stream.destroy(); + }); + } + + function createQueryEmitter() { if (options.ids) { - var queryEmitter = new QueryEmitter(request, stream, options.ids); + var queryEmitter = new QueryEmitter(request, streams, options.ids); backend.emit('timing', 'querySubscribe.reconnect', Date.now() - start, request); callback(null, queryEmitter); return; @@ -672,14 +694,29 @@ Backend.prototype.querySubscribe = function(agent, index, query, options, callba // Issue query on db to get our initial results backend._query(agent, request, function(err, snapshots, extra) { if (err) { - stream.destroy(); + destroyStreams(); return callback(err); } var ids = pluckIds(snapshots); - var queryEmitter = new QueryEmitter(request, stream, ids, extra); + var queryEmitter = new QueryEmitter(request, streams, ids, extra); backend.emit('timing', 'querySubscribe.initial', Date.now() - start, request); callback(null, queryEmitter, snapshots, extra); }); + } + + channels.forEach(function(channel) { + backend.pubsub.subscribe(channel, function(err, stream) { + if (err) { + destroyStreams(); + return callback(err); + } + streams.push(stream); + + var subscribedToAllChannels = streams.length === channels.length; + if (subscribedToAllChannels) { + createQueryEmitter(); + } + }); }); }); }; @@ -693,7 +730,7 @@ Backend.prototype._triggerQuery = function(agent, index, query, options, callbac collection: collection, projection: projection, fields: fields, - channel: this.getCollectionChannel(collection), + channels: [this.getCollectionChannel(collection)], query: query, options: options, db: null, diff --git a/lib/error.js b/lib/error.js index 929684643..a93c84672 100644 --- a/lib/error.js +++ b/lib/error.js @@ -45,6 +45,7 @@ ShareDBError.CODES = { ERR_OT_OP_NOT_PROVIDED: 'ERR_OT_OP_NOT_PROVIDED', ERR_PRESENCE_TRANSFORM_FAILED: 'ERR_PRESENCE_TRANSFORM_FAILED', ERR_PROTOCOL_VERSION_NOT_SUPPORTED: 'ERR_PROTOCOL_VERSION_NOT_SUPPORTED', + ERR_QUERY_CHANNEL_MISSING: 'ERR_QUERY_CHANNEL_MISSING', ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED: 'ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED', /** * A special error that a "readSnapshots" middleware implementation can use to indicate that it diff --git a/lib/query-emitter.js b/lib/query-emitter.js index 79877bc31..cfbd930af 100644 --- a/lib/query-emitter.js +++ b/lib/query-emitter.js @@ -5,7 +5,7 @@ var util = require('./util'); var ERROR_CODE = ShareDBError.CODES; -function QueryEmitter(request, stream, ids, extra) { +function QueryEmitter(request, streams, ids, extra) { this.backend = request.backend; this.agent = request.agent; this.db = request.db; @@ -15,7 +15,7 @@ function QueryEmitter(request, stream, ids, extra) { this.fields = request.fields; this.options = request.options; this.snapshotProjection = request.snapshotProjection; - this.stream = stream; + this.streams = streams; this.ids = ids; this.extra = extra; @@ -23,10 +23,12 @@ function QueryEmitter(request, stream, ids, extra) { this.canPollDoc = this.db.canPollDoc(this.collection, this.query); this.pollDebounce = (typeof this.options.pollDebounce === 'number') ? this.options.pollDebounce : - (typeof this.db.pollDebounce === 'number') ? this.db.pollDebounce : 0; + (typeof this.db.pollDebounce === 'number') ? this.db.pollDebounce : + streams.length > 1 ? 1000 : 0; this.pollInterval = (typeof this.options.pollInterval === 'number') ? this.options.pollInterval : - (typeof this.db.pollInterval === 'number') ? this.db.pollInterval : 0; + (typeof this.db.pollInterval === 'number') ? this.db.pollInterval : + streams.length > 1 ? 1000 : 0; this._polling = false; this._pendingPoll = null; @@ -41,15 +43,19 @@ QueryEmitter.prototype._open = function() { this._defaultCallback = function(err) { if (err) emitter.onError(err); }; - emitter.stream.on('data', function(data) { - if (data.error) { - return emitter.onError(data.error); - } - emitter._update(data); - }); - emitter.stream.on('end', function() { - emitter.destroy(); + + emitter.streams.forEach(function(stream) { + stream.on('data', function(data) { + if (data.error) { + return emitter.onError(data.error); + } + emitter._update(data); + }); + stream.on('end', function() { + emitter.destroy(); + }); }); + // Make sure we start polling if pollInterval is being used this._flushPoll(); }; @@ -57,7 +63,12 @@ QueryEmitter.prototype._open = function() { QueryEmitter.prototype.destroy = function() { clearTimeout(this._pollDebounceId); clearTimeout(this._pollIntervalId); - this.stream.destroy(); + + var stream; + + while (stream = this.streams.pop()) { + stream.destroy(); + } }; QueryEmitter.prototype._emitTiming = function(action, start) { @@ -140,8 +151,8 @@ QueryEmitter.prototype._flushPoll = function() { if (this._pendingPoll) { this.queryPoll(); - // If a pollInterval is specified, poll if the query doesn't get polled in - // the time of the interval + // If a pollInterval is specified, poll if the query doesn't get polled in + // the time of the interval } else if (this.pollInterval) { var emitter = this; this._pollIntervalId = setTimeout(function() { @@ -301,14 +312,14 @@ QueryEmitter.prototype.queryPollDoc = function(id, callback) { // all messages are received and applied in order, so it is critical that none // are dropped. QueryEmitter.prototype.onError = -QueryEmitter.prototype.onDiff = -QueryEmitter.prototype.onExtra = -QueryEmitter.prototype.onOp = function() { - throw new ShareDBError( - ERROR_CODE.ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED, - 'Required QueryEmitter listener not assigned' - ); -}; + QueryEmitter.prototype.onDiff = + QueryEmitter.prototype.onExtra = + QueryEmitter.prototype.onOp = function() { + throw new ShareDBError( + ERROR_CODE.ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED, + 'Required QueryEmitter listener not assigned' + ); + }; function getInserted(diff) { var inserted = []; diff --git a/package.json b/package.json index e0c05cbaa..fe70b7b9a 100644 --- a/package.json +++ b/package.json @@ -22,7 +22,7 @@ "ot-json1": "^0.3.0", "rich-text": "^4.1.0", "sharedb-legacy": "npm:sharedb@=1.1.0", - "sinon": "^7.5.0" + "sinon": "^9.2.4" }, "files": [ "lib/", diff --git a/test/client/presence/presence.js b/test/client/presence/presence.js index c2a59c495..f7a52762b 100644 --- a/test/client/presence/presence.js +++ b/test/client/presence/presence.js @@ -38,7 +38,6 @@ describe('Presence', function() { }); afterEach(function(done) { - sinon.restore(); connection1.close(); connection2.close(); backend.close(done); @@ -147,30 +146,6 @@ describe('Presence', function() { ], errorHandler(done)); }); - it('destroys old local presence but keeps new local presence when getting during destroy', function(done) { - presence2.create('presence-2'); - var presence2a; - - async.series([ - presence2.subscribe.bind(presence2), - function(next) { - presence2.destroy(function() { - expect(presence2).to.equal(presence2a); - expect(Object.keys(presence2.localPresences)).to.eql(['presence-2a']); - done(); - }); - next(); - }, - function(next) { - presence2a = connection2.getPresence('test-channel'); - presence2a.create('presence-2a'); - presence2a.subscribe(function(error) { - next(error); - }); - } - ], errorHandler(done)); - }); - it('throws if trying to create local presence when wanting destroy', function(done) { presence2.destroy(errorHandler(done)); expect(function() { diff --git a/test/client/query-subscribe.js b/test/client/query-subscribe.js index a414f1468..95e286f9a 100644 --- a/test/client/query-subscribe.js +++ b/test/client/query-subscribe.js @@ -2,6 +2,7 @@ var expect = require('chai').expect; var async = require('async'); var util = require('../util'); var sinon = require('sinon'); +var ShareDBError = require('../../lib/error'); module.exports = function(options) { var getQuery = options.getQuery; @@ -11,596 +12,883 @@ module.exports = function(options) { this.matchAllDbQuery = getQuery({query: {}}); }); - it('creating a document updates a subscribed query', function(done) { - var connection = this.backend.connect(); - var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, null, function(err) { + afterEach(function() { + sinon.restore(); + }); + + commonTests(options); + + describe('custom channels', function() { + it('only informs subscribed channels', function(done) { + this.backend.use('connect', function(context, next) { + context.agent.custom = context.req; + next(); + }); + + this.backend.use('commit', function(context, next) { + var user = context.agent.custom; + + if (user === 'sending-user-1') { + context.channels.push('channel-1'); + } + if (user === 'sending-user-2') { + context.channels.push('channel-2'); + } + next(); + }); + + this.backend.use('query', function(context, next) { + var user = context.agent.custom; + if (user === 'receiving-user') { + context.channels = ['channel-1', 'channel-2']; + } else if (user === 'not-receiving-user') { + context.channels = ['different-channel']; + } + next(); + }); + + var receivingUserConnection = this.backend.connect(null, 'receiving-user'); + var notReceivingUserConnection = this.backend.connect(null, 'not-receiving-user'); + var sendingUser1Connection = this.backend.connect(null, 'sending-user-1'); + var sendingUser2Connection = this.backend.connect(null, 'sending-user-2'); + + var notReceivingQuery = notReceivingUserConnection.createSubscribeQuery( + 'dogs', + this.matchAllDbQuery, + null, + function(err) { + if (err) return done(err); + } + ); + + notReceivingQuery.on('error', done); + notReceivingQuery.on('insert', function() { + done('User who didn\'t subscribed to sending channels shouldn\'t get the message'); + }); + + var receivingQuery = receivingUserConnection.createSubscribeQuery( + 'dogs', + this.matchAllDbQuery, + null, + function(err) { + if (err) return done(err); + sendingUser1Connection.get('dogs', '1').on('error', done).create({}); + sendingUser2Connection.get('dogs', '2').on('error', done).create({}); + } + ); + var receivedDogsCount = 0; + receivingQuery.on('error', done); + receivingQuery.on('insert', function() { + receivedDogsCount++; + if (receivedDogsCount === 2) { + var allDocsIds = receivingQuery.results.map(function(doc) { + return doc.id; + }); + expect(allDocsIds.sort()).to.be.deep.equal(['1', '2']); + done(); + } else if (receivedDogsCount > 2) { + done('It should not duplicate messages'); + } + }); + }); + + describe('one common channel', function() { + beforeEach(function() { + this.backend.use('commit', function(context, next) { + context.channels.push('channel-1'); + context.channels.push('channel-3'); + next(); + }); + this.backend.use('query', function(context, next) { + context.channels = ['channel-1', 'channel-2']; + next(); + }); + }); + + commonCustomChannelsErrorHandlingTests(); + commonTests(options); + }); + + describe('multiple common channels', function() { + beforeEach(function() { + this.backend.use('commit', function(context, next) { + context.channels.push('channel-1'); + context.channels.push('channel-2'); + next(); + }); + this.backend.use('query', function(context, next) { + context.channels = ['channel-1', 'channel-2']; + next(); + }); + }); + + it('does not duplicate messages', function(done) { + var connection = this.backend.connect(); + var count = 0; + var query = connection.createSubscribeQuery( + 'dogs', + this.matchAllDbQuery, + {pollInterval: 0, pollDebounce: 0}, + function(err) { + if (err) return done(err); + connection.get('dogs', '1').on('error', done).create({}); + connection.get('dogs', '2').on('error', done).create({}); + connection.get('dogs', '3').on('error', done).create({}); + } + ); + query.on('error', done); + query.on('insert', function() { + count++; + if (count === 3) { + var allDocsIds = query.results.map(function(doc) { + return doc.id; + }); + expect(allDocsIds.sort()).to.be.deep.equal(['1', '2', '3']); + done(); + } else if (count > 3) { + done('It should not duplicate messages'); + } + }); + }); + + commonCustomChannelsErrorHandlingTests(); + commonTests(options); + }); + + describe('backward compatibility', function() { + beforeEach(function() { + this.backend.use('commit', function(context, next) { + context.channels.push('channel-1'); + next(); + }); + this.backend.use('query', function(context, next) { + context.channel = 'channel-1'; + next(); + }); + }); + commonTests(options); + }); + }); + }); +}; + +function commonCustomChannelsErrorHandlingTests() { + it('should throw if not channels provided in query', function(done) { + this.backend.use('query', function(context, next) { + context.channels = null; + next(); + }); + var connection = this.backend.connect(); + var matchAllDbQuery = this.matchAllDbQuery; + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { + if (!err) return done('Should throw Required minimum one query channel error'); + expect(err.message).to.be.equal('Required minimum one query channel.'); + expect(err.code).to.be.equal(ShareDBError.CODES.ERR_QUERY_CHANNEL_MISSING); + done(); + }); + }); + }); + + it('should throw if channels provided in query is an empty array', function(done) { + this.backend.use('query', function(context, next) { + context.channels = []; + next(); + }); + var connection = this.backend.connect(); + var matchAllDbQuery = this.matchAllDbQuery; + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { + if (!err) return done('Should throw Required minimum one query channel error'); + expect(err.message).to.be.equal('Required minimum one query channel.'); + expect(err.code).to.be.equal(ShareDBError.CODES.ERR_QUERY_CHANNEL_MISSING); + done(); + }); + }); + }); +} + +function commonTests(options) { + var getQuery = options.getQuery; + + it('creating a document updates a subscribed query', function(done) { + var connection = this.backend.connect(); + var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, null, function(err) { + if (err) return done(err); + connection.get('dogs', 'fido').on('error', done).create({age: 3}); + }); + query.on('error', done); + query.on('insert', function(docs, index) { + expect(util.pluck(docs, 'id')).eql(['fido']); + expect(util.pluck(docs, 'data')).eql([{age: 3}]); + expect(index).equal(0); + expect(util.pluck(query.results, 'id')).eql(['fido']); + expect(util.pluck(query.results, 'data')).eql([{age: 3}]); + done(); + }); + }); + + it('creating an additional document updates a subscribed query', function(done) { + var connection = this.backend.connect(); + var matchAllDbQuery = this.matchAllDbQuery; + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { if (err) return done(err); - connection.get('dogs', 'fido').on('error', done).create({age: 3}); + connection.get('dogs', 'taco').on('error', done).create({age: 2}); }); query.on('error', done); query.on('insert', function(docs, index) { - expect(util.pluck(docs, 'id')).eql(['fido']); - expect(util.pluck(docs, 'data')).eql([{age: 3}]); - expect(index).equal(0); - expect(util.pluck(query.results, 'id')).eql(['fido']); - expect(util.pluck(query.results, 'data')).eql([{age: 3}]); + expect(util.pluck(docs, 'id')).eql(['taco']); + expect(util.pluck(docs, 'data')).eql([{age: 2}]); + expect(query.results[index]).equal(docs[0]); + var results = util.sortById(query.results); + expect(util.pluck(results, 'id')).eql(['fido', 'spot', 'taco']); + expect(util.pluck(results, 'data')).eql([{age: 3}, {age: 5}, {age: 2}]); done(); }); }); + }); - it('creating an additional document updates a subscribed query', function(done) { - var connection = this.backend.connect(); - var matchAllDbQuery = this.matchAllDbQuery; - async.parallel([ - function(cb) { - connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); - } - ], function(err) { + it('deleting a document updates a subscribed query', function(done) { + var connection = this.backend.connect(); + var matchAllDbQuery = this.matchAllDbQuery; + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { if (err) return done(err); - var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { - if (err) return done(err); - connection.get('dogs', 'taco').on('error', done).create({age: 2}); - }); - query.on('error', done); - query.on('insert', function(docs, index) { - expect(util.pluck(docs, 'id')).eql(['taco']); - expect(util.pluck(docs, 'data')).eql([{age: 2}]); - expect(query.results[index]).equal(docs[0]); - var results = util.sortById(query.results); - expect(util.pluck(results, 'id')).eql(['fido', 'spot', 'taco']); - expect(util.pluck(results, 'data')).eql([{age: 3}, {age: 5}, {age: 2}]); - done(); - }); + connection.get('dogs', 'fido').del(); + }); + query.on('error', done); + query.on('remove', function(docs, index) { + expect(util.pluck(docs, 'id')).eql(['fido']); + expect(util.pluck(docs, 'data')).eql([undefined]); + expect(index).a('number'); + var results = util.sortById(query.results); + expect(util.pluck(results, 'id')).eql(['spot']); + expect(util.pluck(results, 'data')).eql([{age: 5}]); + done(); }); }); + }); - it('deleting a document updates a subscribed query', function(done) { - var connection = this.backend.connect(); - var matchAllDbQuery = this.matchAllDbQuery; - async.parallel([ - function(cb) { - connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); - } - ], function(err) { + it('subscribed query removes document from results before sending delete op to other clients', function(done) { + var connection1 = this.backend.connect(); + var connection2 = this.backend.connect(); + var matchAllDbQuery = this.matchAllDbQuery; + async.parallel([ + function(cb) { + connection1.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection1.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + var query = connection2.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { if (err) return done(err); - var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { - if (err) return done(err); - connection.get('dogs', 'fido').del(); - }); - query.on('error', done); - query.on('remove', function(docs, index) { - expect(util.pluck(docs, 'id')).eql(['fido']); - expect(util.pluck(docs, 'data')).eql([undefined]); - expect(index).a('number'); - var results = util.sortById(query.results); - expect(util.pluck(results, 'id')).eql(['spot']); - expect(util.pluck(results, 'data')).eql([{age: 5}]); - done(); - }); + connection1.get('dogs', 'fido').del(); + }); + query.on('error', done); + var removed = false; + connection2.get('dogs', 'fido').on('del', function() { + expect(removed).equal(true); + done(); + }); + query.on('remove', function(docs, index) { + removed = true; + expect(util.pluck(docs, 'id')).eql(['fido']); + expect(util.pluck(docs, 'data')).eql([{age: 3}]); + expect(index).a('number'); + var results = util.sortById(query.results); + expect(util.pluck(results, 'id')).eql(['spot']); + expect(util.pluck(results, 'data')).eql([{age: 5}]); }); }); + }); - it('subscribed query removes document from results before sending delete op to other clients', function(done) { - var connection1 = this.backend.connect(); - var connection2 = this.backend.connect(); - var matchAllDbQuery = this.matchAllDbQuery; - async.parallel([ - function(cb) { - connection1.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection1.get('dogs', 'spot').on('error', done).create({age: 5}, cb); - } - ], function(err) { + it('subscribed query does not get updated after destroyed', function(done) { + var connection = this.backend.connect(); + var connection2 = this.backend.connect(); + var matchAllDbQuery = this.matchAllDbQuery; + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { if (err) return done(err); - var query = connection2.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { + query.destroy(function(err) { if (err) return done(err); - connection1.get('dogs', 'fido').del(); - }); - query.on('error', done); - var removed = false; - connection2.get('dogs', 'fido').on('del', function() { - expect(removed).equal(true); - done(); - }); - query.on('remove', function(docs, index) { - removed = true; - expect(util.pluck(docs, 'id')).eql(['fido']); - expect(util.pluck(docs, 'data')).eql([{age: 3}]); - expect(index).a('number'); - var results = util.sortById(query.results); - expect(util.pluck(results, 'id')).eql(['spot']); - expect(util.pluck(results, 'data')).eql([{age: 5}]); + connection2.get('dogs', 'taco').on('error', done).create({age: 2}, done); }); }); + query.on('error', done); + query.on('insert', function() { + done(); + }); }); + }); - it('subscribed query does not get updated after destroyed', function(done) { - var connection = this.backend.connect(); - var connection2 = this.backend.connect(); - var matchAllDbQuery = this.matchAllDbQuery; - async.parallel([ - function(cb) { - connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); - } - ], function(err) { + it('subscribed query does not get updated after connection is disconnected', function(done) { + var connection = this.backend.connect(); + var connection2 = this.backend.connect(); + var matchAllDbQuery = this.matchAllDbQuery; + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { if (err) return done(err); - var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { - if (err) return done(err); - query.destroy(function(err) { - if (err) return done(err); - connection2.get('dogs', 'taco').on('error', done).create({age: 2}, done); - }); - }); - query.on('error', done); - query.on('insert', function() { - done(); - }); + connection.close(); + connection2.get('dogs', 'taco').on('error', done).create({age: 2}, done); + }); + query.on('error', done); + query.on('insert', function() { + done(); }); }); + }); - it('subscribed query does not get updated after connection is disconnected', function(done) { - var connection = this.backend.connect(); - var connection2 = this.backend.connect(); - var matchAllDbQuery = this.matchAllDbQuery; - async.parallel([ - function(cb) { - connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); - } - ], function(err) { + it('subscribed query gets update after reconnecting', function(done) { + var backend = this.backend; + var connection = backend.connect(); + var connection2 = backend.connect(); + var matchAllDbQuery = this.matchAllDbQuery; + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + + var wait = 2; + function finish() { + if (--wait) return; + expect(util.pluck(query.results, 'id')).to.have.members(['fido', 'spot', 'taco']); + done(); + } + + var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { if (err) return done(err); - var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { - if (err) return done(err); - connection.close(); - connection2.get('dogs', 'taco').on('error', done).create({age: 2}, done); - }); - query.on('error', done); - query.on('insert', function() { - done(); + connection.close(); + connection2.get('dogs', 'taco').on('error', done).create({age: 2}); + process.nextTick(function() { + backend.connect(connection); + query.on('ready', finish); }); }); + query.on('error', done); + query.on('insert', finish); }); + }); - it('subscribed query gets update after reconnecting', function(done) { - var backend = this.backend; - var connection = backend.connect(); - var connection2 = backend.connect(); - var matchAllDbQuery = this.matchAllDbQuery; - async.parallel([ - function(cb) { - connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); - } - ], function(err) { - if (err) return done(err); + it('subscribed query gets simultaneous insert and remove after reconnecting', function(done) { + var backend = this.backend; + var connection = backend.connect(); + var connection2 = backend.connect(); + var matchAllDbQuery = this.matchAllDbQuery; + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + var query = connection.createSubscribeQuery('dogs', matchAllDbQuery); + query.on('error', done); - var wait = 2; - function finish() { - if (--wait) return; - expect(util.pluck(query.results, 'id')).to.have.members(['fido', 'spot', 'taco']); - done(); - } + query.once('ready', function() { + connection.close(); - var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { + connection2.get('dogs', 'fido').fetch(function(err) { if (err) return done(err); - connection.close(); - connection2.get('dogs', 'taco').on('error', done).create({age: 2}); - process.nextTick(function() { + async.parallel([ + function(cb) { + connection2.get('dogs', 'fido').del(cb); + }, + function(cb) { + connection2.get('dogs', 'taco').create({age: 2}, cb); + } + ], function(error) { + if (error) return done(error); backend.connect(connection); - query.on('ready', finish); - }); - }); - query.on('error', done); - query.on('insert', finish); - }); - }); - - it('subscribed query gets simultaneous insert and remove after reconnecting', function(done) { - var backend = this.backend; - var connection = backend.connect(); - var connection2 = backend.connect(); - var matchAllDbQuery = this.matchAllDbQuery; - async.parallel([ - function(cb) { - connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); - } - ], function(err) { - if (err) return done(err); - var query = connection.createSubscribeQuery('dogs', matchAllDbQuery); - query.on('error', done); - - query.once('ready', function() { - connection.close(); - - connection2.get('dogs', 'fido').fetch(function(err) { - if (err) return done(err); - async.parallel([ - function(cb) { - connection2.get('dogs', 'fido').del(cb); - }, - function(cb) { - connection2.get('dogs', 'taco').create({age: 2}, cb); - } - ], function(error) { - if (error) return done(error); - backend.connect(connection); - query.once('ready', function() { - finish(); - }); + query.once('ready', function() { + finish(); }); }); }); + }); - var wait = 3; - function finish() { - if (--wait) return; - var results = util.sortById(query.results); - expect(util.pluck(results, 'id')).eql(['spot', 'taco']); - expect(util.pluck(results, 'data')).eql([{age: 5}, {age: 2}]); - done(); - } - query.once('insert', function(docs) { - expect(util.pluck(docs, 'id')).eql(['taco']); - expect(util.pluck(docs, 'data')).eql([{age: 2}]); - finish(); - }); - query.once('remove', function(docs) { - expect(util.pluck(docs, 'id')).eql(['fido']); - // We don't assert the value of data, because the del op could be - // applied by the client before or after the query result is removed. - // Order of ops & query result updates is not currently guaranteed - finish(); - }); + var wait = 3; + function finish() { + if (--wait) return; + var results = util.sortById(query.results); + expect(util.pluck(results, 'id')).eql(['spot', 'taco']); + expect(util.pluck(results, 'data')).eql([{age: 5}, {age: 2}]); + done(); + } + query.once('insert', function(docs) { + expect(util.pluck(docs, 'id')).eql(['taco']); + expect(util.pluck(docs, 'data')).eql([{age: 2}]); + finish(); + }); + query.once('remove', function(docs) { + expect(util.pluck(docs, 'id')).eql(['fido']); + // We don't assert the value of data, because the del op could be + // applied by the client before or after the query result is removed. + // Order of ops & query result updates is not currently guaranteed + finish(); }); }); + }); - it('creating an additional document updates a subscribed query', function(done) { - var connection = this.backend.connect(); - var matchAllDbQuery = this.matchAllDbQuery; - async.parallel([ - function(cb) { - connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); - } - ], function(err) { + it('creating an additional document updates a subscribed query', function(done) { + var connection = this.backend.connect(); + var matchAllDbQuery = this.matchAllDbQuery; + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { if (err) return done(err); - var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { - if (err) return done(err); - connection.get('dogs', 'taco').on('error', done).create({age: 2}); - }); - query.on('error', done); - query.on('insert', function(docs, index) { - expect(util.pluck(docs, 'id')).eql(['taco']); - expect(util.pluck(docs, 'data')).eql([{age: 2}]); - expect(query.results[index]).equal(docs[0]); - var results = util.sortById(query.results); - expect(util.pluck(results, 'id')).eql(['fido', 'spot', 'taco']); - expect(util.pluck(results, 'data')).eql([{age: 3}, {age: 5}, {age: 2}]); - done(); - }); + connection.get('dogs', 'taco').on('error', done).create({age: 2}); }); - }); - - it('pollDebounce option reduces subsequent poll interval', function(done) { - var connection = this.backend.connect(); - this.backend.db.canPollDoc = function() { - return false; - }; - var query = connection.createSubscribeQuery('items', this.matchAllDbQuery, {pollDebounce: 1000}); query.on('error', done); - var batchSizes = []; - var total = 0; + query.on('insert', function(docs, index) { + expect(util.pluck(docs, 'id')).eql(['taco']); + expect(util.pluck(docs, 'data')).eql([{age: 2}]); + expect(query.results[index]).equal(docs[0]); + var results = util.sortById(query.results); + expect(util.pluck(results, 'id')).eql(['fido', 'spot', 'taco']); + expect(util.pluck(results, 'data')).eql([{age: 3}, {age: 5}, {age: 2}]); + done(); + }); + }); + }); - query.on('insert', function(docs) { - batchSizes.push(docs.length); - total += docs.length; - if (total === 1) { + it('pollDebounce option reduces subsequent poll interval', function(done) { + var clock = sinon.useFakeTimers(); + var connection = this.backend.connect(); + this.backend.db.canPollDoc = function() { + return false; + }; + var query = connection.createSubscribeQuery('items', this.matchAllDbQuery, {pollDebounce: 2000}); + query.on('error', done); + var batchSizes = []; + var total = 0; + + query.on('insert', function(docs) { + batchSizes.push(docs.length); + total += docs.length; + + if (total === 1) { // first write received by client. we're debouncing. create 9 // more documents. - for (var i = 1; i < 10; i++) { - connection.get('items', i.toString()).on('error', done).create({}); - } + var counter = 0; + for (var i = 1; i < 10; i++) { + connection.get('items', i.toString()).on('error', done).create({}, function(err) { + if (err) return done(err); + counter++; + if (counter === 9) clock.tickAsync(10000); + }); } - if (total === 10) { + } + if (total === 10) { // first document is its own batch; then subsequent creates - // are debounced until after all other 9 docs are created - expect(batchSizes).eql([1, 9]); - done(); - } - }); - - // create an initial document. this will lead to the 'insert' - // event firing the first time, while sharedb is definitely - // debouncing - connection.get('items', '0').on('error', done).create({}); + // are debounced and batched + expect(batchSizes[0]).eql(1); + batchSizes.shift(); + var sum = batchSizes.reduce(function(sum, batchSize) { + return sum + batchSize; + }, 0); + expect(batchSizes.length).to.lessThan(9); + expect(sum).eql(9); + done(); + } }); - it('db.pollDebounce option reduces subsequent poll interval', function(done) { - var connection = this.backend.connect(); - this.backend.db.canPollDoc = function() { - return false; - }; - this.backend.db.pollDebounce = 1000; - var query = connection.createSubscribeQuery('items', this.matchAllDbQuery); - query.on('error', done); - var batchSizes = []; - var total = 0; + // create an initial document. this will lead to the 'insert' + // event firing the first time, while sharedb is definitely + // debouncing + connection.get('items', '0').on('error', done).create({}, function() { + clock.tickAsync(3000); + }); + }); - query.on('insert', function(docs) { - batchSizes.push(docs.length); - total += docs.length; - if (total === 1) { + it('db.pollDebounce option reduces subsequent poll interval', function(done) { + var clock = sinon.useFakeTimers(); + var connection = this.backend.connect(); + this.backend.db.canPollDoc = function() { + return false; + }; + this.backend.db.pollDebounce = 2000; + var query = connection.createSubscribeQuery('items', this.matchAllDbQuery); + query.on('error', done); + var batchSizes = []; + var total = 0; + + query.on('insert', function(docs) { + batchSizes.push(docs.length); + total += docs.length; + + if (total === 1) { // first write received by client. we're debouncing. create 9 // more documents. - for (var i = 1; i < 10; i++) { - connection.get('items', i.toString()).on('error', done).create({}); - } + var counter = 0; + for (var i = 1; i < 10; i++) { + connection.get('items', i.toString()).on('error', done).create({}, function(err) { + if (err) return done(err); + counter++; + if (counter === 9) clock.tickAsync(10000); + }); } - if (total === 10) { + } + if (total === 10) { // first document is its own batch; then subsequent creates - // are debounced until after all other 9 docs are created - expect(batchSizes).eql([1, 9]); - done(); - } - }); + // are debounced and batched + expect(batchSizes[0]).eql(1); + batchSizes.shift(); + var sum = batchSizes.reduce(function(sum, batchSize) { + return sum + batchSize; + }, 0); + expect(batchSizes.length).to.lessThan(9); + expect(sum).eql(9); + done(); + } + }); - // create an initial document. this will lead to the 'insert' - // event firing the first time, while sharedb is definitely - // debouncing - connection.get('items', '0').on('error', done).create({}); + // create an initial document. this will lead to the 'insert' + // event firing the first time, while sharedb is definitely + // debouncing + connection.get('items', '0').on('error', done).create({}, function() { + clock.tickAsync(3000); }); + }); - it('pollInterval updates a subscribed query after an unpublished create', function(done) { - var connection = this.backend.connect(); - this.backend.suppressPublish = true; - var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, {pollInterval: 50}, function(err) { + it('pollInterval updates a subscribed query after an unpublished create', function(done) { + var clock = sinon.useFakeTimers(); + var connection = this.backend.connect(); + this.backend.suppressPublish = true; + var query = connection.createSubscribeQuery( + 'dogs', + this.matchAllDbQuery, + {pollDebounce: 0, pollInterval: 50}, + function(err) { if (err) return done(err); - connection.get('dogs', 'fido').on('error', done).create({}); - }); - query.on('error', done); - query.on('insert', function(docs) { - expect(util.pluck(docs, 'id')).eql(['fido']); - done(); + connection.get('dogs', 'fido').on('error', done).create({}, function() { + clock.tickAsync(51); + }); + } + ); + query.on('error', done); + query.on('insert', function(docs) { + expect(util.pluck(docs, 'id')).eql(['fido']); + done(); + }); + }); + + it('db.pollInterval updates a subscribed query after an unpublished create', function(done) { + var clock = sinon.useFakeTimers(); + var connection = this.backend.connect(); + this.backend.suppressPublish = true; + this.backend.db.pollDebounce = 0; + this.backend.db.pollInterval = 50; + var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, null, function(err) { + if (err) return done(err); + connection.get('dogs', 'fido').on('error', done).create({}, function() { + clock.tickAsync(51); }); }); + query.on('error', done); + query.on('insert', function(docs) { + expect(util.pluck(docs, 'id')).eql(['fido']); + done(); + }); + }); - it('db.pollInterval updates a subscribed query after an unpublished create', function(done) { - var connection = this.backend.connect(); - this.backend.suppressPublish = true; - this.backend.db.pollInterval = 50; - var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, null, function(err) { - if (err) return done(err); - connection.get('dogs', 'fido').on('error', done).create({}); + it('pollInterval captures additional unpublished creates', function(done) { + var clock = sinon.useFakeTimers(); + var connection = this.backend.connect(); + this.backend.suppressPublish = true; + var count = 0; + + var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, {pollInterval: 1000}, function(err) { + if (err) return done(err); + var doc = connection.get('dogs', count.toString()).on('error', done); + doc.create({}, function(e) { + if (e) return done(e); + clock.tickAsync(2000); }); - query.on('error', done); - query.on('insert', function(docs) { - expect(util.pluck(docs, 'id')).eql(['fido']); - done(); + }); + query.on('error', done); + query.on('insert', function() { + count++; + if (count === 3) return done(); + var doc = connection.get('dogs', count.toString()).on('error', done); + doc.create({}, function(e) { + if (e) return done(e); + clock.tickAsync(10000); }); }); + clock.tickAsync(1); + }); - it('pollInterval captures additional unpublished creates', function(done) { - var connection = this.backend.connect(); - this.backend.suppressPublish = true; - var count = 0; - var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, {pollInterval: 50}, function(err) { - if (err) return done(err); - connection.get('dogs', count.toString()).on('error', done).create({}); + it('query extra is returned to client', function(done) { + var connection = this.backend.connect(); + this.backend.db.query = function(collection, query, fields, options, callback) { + process.nextTick(function() { + callback(null, [], {colors: ['brown', 'gold']}); }); - query.on('error', done); - query.on('insert', function() { - count++; - if (count === 3) return done(); - connection.get('dogs', count.toString()).on('error', done).create({}); + }; + var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, null, function(err, results, extra) { + if (err) return done(err); + expect(results).eql([]); + expect(extra).eql({colors: ['brown', 'gold']}); + expect(query.extra).eql({colors: ['brown', 'gold']}); + done(); + }); + query.on('error', done); + }); + + it('query extra is updated on change', function(done) { + var connection = this.backend.connect(); + this.backend.db.query = function(collection, query, fields, options, callback) { + process.nextTick(function() { + callback(null, [], 1); }); + }; + this.backend.db.queryPoll = function(collection, query, options, callback) { + process.nextTick(function() { + callback(null, [], 2); + }); + }; + this.backend.db.canPollDoc = function() { + return false; + }; + var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, null, function(err, results, extra) { + if (err) return done(err); + expect(extra).eql(1); + expect(query.extra).eql(1); + }); + query.on('error', done); + query.on('extra', function(extra) { + expect(extra).eql(2); + expect(query.extra).eql(2); + done(); }); + connection.get('dogs', 'fido').on('error', done).create({age: 3}); + }); - it('query extra is returned to client', function(done) { - var connection = this.backend.connect(); - this.backend.db.query = function(collection, query, fields, options, callback) { - process.nextTick(function() { - callback(null, [], {colors: ['brown', 'gold']}); - }); + describe('passing agent.custom to the DB adapter', function() { + var connection; + var expectedArg = { + agentCustom: {foo: 'bar'} + }; + beforeEach('set up', function() { + connection = this.backend.connect(); + connection.agent.custom = { + foo: 'bar' }; - var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, null, function(err, results, extra) { - if (err) return done(err); - expect(results).eql([]); - expect(extra).eql({colors: ['brown', 'gold']}); - expect(query.extra).eql({colors: ['brown', 'gold']}); + }); + + it('sends agentCustom to the db\'s getSnapshot call', function(done) { + var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery); + var getSnapshotSpy = sinon.spy(this.backend.db, 'getSnapshot'); + + query.on('insert', function() { + // The first call to getSnapshot is when the document is created + // The seconds call is when the event is triggered, and is the one we are testing here + expect(getSnapshotSpy.callCount).to.equal(2); + expect(getSnapshotSpy.getCall(1).args[3]).to.deep.equal(expectedArg); done(); }); - query.on('error', done); + connection.get('dogs', 'fido').create({age: 3}); }); - it('query extra is updated on change', function(done) { - var connection = this.backend.connect(); - this.backend.db.query = function(collection, query, fields, options, callback) { - process.nextTick(function() { - callback(null, [], 1); - }); - }; - this.backend.db.queryPoll = function(collection, query, options, callback) { - process.nextTick(function() { - callback(null, [], 2); - }); - }; + it('sends agentCustom to the db\'s getSnapshotBulk call', function(done) { + // Ensures that getSnapshotBulk is called, instead of getSnapshot this.backend.db.canPollDoc = function() { return false; }; - var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, null, function(err, results, extra) { - if (err) return done(err); - expect(extra).eql(1); - expect(query.extra).eql(1); - }); - query.on('error', done); - query.on('extra', function(extra) { - expect(extra).eql(2); - expect(query.extra).eql(2); + + var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery); + var getSnapshotBulkSpy = sinon.spy(this.backend.db, 'getSnapshotBulk'); + + query.on('insert', function() { + expect(getSnapshotBulkSpy.callCount).to.equal(1); + expect(getSnapshotBulkSpy.getCall(0).args[3]).to.deep.equal(expectedArg); done(); }); - connection.get('dogs', 'fido').on('error', done).create({age: 3}); + connection.get('dogs', 'fido').create({age: 3}); }); + }); - describe('passing agent.custom to the DB adapter', function() { - var connection; - var expectedArg = { - agentCustom: {foo: 'bar'} - }; - beforeEach('set up', function() { - connection = this.backend.connect(); - connection.agent.custom = { - foo: 'bar' - }; - }); - - it('sends agentCustom to the db\'s getSnapshot call', function(done) { - var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery); - var getSnapshotSpy = sinon.spy(this.backend.db, 'getSnapshot'); - - query.on('insert', function() { - // The first call to getSnapshot is when the document is created - // The seconds call is when the event is triggered, and is the one we are testing here - expect(getSnapshotSpy.callCount).to.equal(2); - expect(getSnapshotSpy.getCall(1).args[3]).to.deep.equal(expectedArg); - done(); - }); - connection.get('dogs', 'fido').create({age: 3}); + it('changing a filtered property removes from a subscribed query', function(done) { + var connection = this.backend.connect(); + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 3}, cb); + } + ], function(err) { + if (err) return done(err); + var dbQuery = getQuery({query: {age: 3}}); + var query = connection.createSubscribeQuery('dogs', dbQuery, null, function(err, results) { + if (err) return done(err); + var sorted = util.sortById(results); + expect(util.pluck(sorted, 'id')).eql(['fido', 'spot']); + expect(util.pluck(sorted, 'data')).eql([{age: 3}, {age: 3}]); + connection.get('dogs', 'fido').submitOp({p: ['age'], na: 2}); }); - - it('sends agentCustom to the db\'s getSnapshotBulk call', function(done) { - // Ensures that getSnapshotBulk is called, instead of getSnapshot - this.backend.db.canPollDoc = function() { - return false; - }; - - var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery); - var getSnapshotBulkSpy = sinon.spy(this.backend.db, 'getSnapshotBulk'); - - query.on('insert', function() { - expect(getSnapshotBulkSpy.callCount).to.equal(1); - expect(getSnapshotBulkSpy.getCall(0).args[3]).to.deep.equal(expectedArg); - done(); - }); - connection.get('dogs', 'fido').create({age: 3}); + query.on('error', done); + query.on('remove', function(docs, index) { + expect(util.pluck(docs, 'id')).eql(['fido']); + expect(util.pluck(docs, 'data')).eql([{age: 5}]); + expect(index).a('number'); + var results = util.sortById(query.results); + expect(util.pluck(results, 'id')).eql(['spot']); + expect(util.pluck(results, 'data')).eql([{age: 3}]); + done(); }); }); + }); - it('changing a filtered property removes from a subscribed query', function(done) { - var connection = this.backend.connect(); - async.parallel([ - function(cb) { - connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection.get('dogs', 'spot').on('error', done).create({age: 3}, cb); - } - ], function(err) { + it('changing a filtered property inserts to a subscribed query', function(done) { + var connection = this.backend.connect(); + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + var dbQuery = getQuery({query: {age: 3}}); + var query = connection.createSubscribeQuery('dogs', dbQuery, null, function(err, results) { if (err) return done(err); - var dbQuery = getQuery({query: {age: 3}}); - var query = connection.createSubscribeQuery('dogs', dbQuery, null, function(err, results) { - if (err) return done(err); - var sorted = util.sortById(results); - expect(util.pluck(sorted, 'id')).eql(['fido', 'spot']); - expect(util.pluck(sorted, 'data')).eql([{age: 3}, {age: 3}]); - connection.get('dogs', 'fido').submitOp({p: ['age'], na: 2}); - }); - query.on('error', done); - query.on('remove', function(docs, index) { - expect(util.pluck(docs, 'id')).eql(['fido']); - expect(util.pluck(docs, 'data')).eql([{age: 5}]); - expect(index).a('number'); - var results = util.sortById(query.results); - expect(util.pluck(results, 'id')).eql(['spot']); - expect(util.pluck(results, 'data')).eql([{age: 3}]); - done(); - }); + var sorted = util.sortById(results); + expect(util.pluck(sorted, 'id')).eql(['fido']); + expect(util.pluck(sorted, 'data')).eql([{age: 3}]); + connection.get('dogs', 'spot').submitOp({p: ['age'], na: -2}); + }); + query.on('error', done); + query.on('insert', function(docs, index) { + expect(util.pluck(docs, 'id')).eql(['spot']); + expect(util.pluck(docs, 'data')).eql([{age: 3}]); + expect(index).a('number'); + var results = util.sortById(query.results); + expect(util.pluck(results, 'id')).eql(['fido', 'spot']); + expect(util.pluck(results, 'data')).eql([{age: 3}, {age: 3}]); + done(); }); }); + }); - it('changing a filtered property inserts to a subscribed query', function(done) { - var connection = this.backend.connect(); - async.parallel([ - function(cb) { - connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); - } - ], function(err) { - if (err) return done(err); - var dbQuery = getQuery({query: {age: 3}}); - var query = connection.createSubscribeQuery('dogs', dbQuery, null, function(err, results) { + it('changing a sorted property moves in a subscribed query', function(done) { + var connection = this.backend.connect(); + + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + var dbQuery = getQuery({query: {}, sort: [['age', 1]]}); + var query = connection.createSubscribeQuery( + 'dogs', + dbQuery, + null, + function(err, results) { if (err) return done(err); - var sorted = util.sortById(results); - expect(util.pluck(sorted, 'id')).eql(['fido']); - expect(util.pluck(sorted, 'data')).eql([{age: 3}]); - connection.get('dogs', 'spot').submitOp({p: ['age'], na: -2}); - }); - query.on('error', done); - query.on('insert', function(docs, index) { - expect(util.pluck(docs, 'id')).eql(['spot']); - expect(util.pluck(docs, 'data')).eql([{age: 3}]); - expect(index).a('number'); - var results = util.sortById(query.results); expect(util.pluck(results, 'id')).eql(['fido', 'spot']); - expect(util.pluck(results, 'data')).eql([{age: 3}, {age: 3}]); - done(); + expect(util.pluck(results, 'data')).eql([{age: 3}, {age: 5}]); + connection.get('dogs', 'spot').submitOp({p: ['age'], na: -3}); }); + query.on('error', done); + query.on('move', function(docs, from, to) { + expect(docs.length).eql(1); + expect(from).a('number'); + expect(to).a('number'); + expect(util.pluck(query.results, 'id')).eql(['spot', 'fido']); + expect(util.pluck(query.results, 'data')).eql([{age: 2}, {age: 3}]); + done(); }); }); + }); - it('changing a sorted property moves in a subscribed query', function(done) { - var connection = this.backend.connect(); - - async.parallel([ - function(cb) { - connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); - } - ], function(err) { - if (err) return done(err); - var dbQuery = getQuery({query: {}, sort: [['age', 1]]}); - var query = connection.createSubscribeQuery( - 'dogs', - dbQuery, - null, - function(err, results) { - if (err) return done(err); - expect(util.pluck(results, 'id')).eql(['fido', 'spot']); - expect(util.pluck(results, 'data')).eql([{age: 3}, {age: 5}]); - connection.get('dogs', 'spot').submitOp({p: ['age'], na: -3}); - }); - query.on('error', done); - query.on('move', function(docs, from, to) { - expect(docs.length).eql(1); - expect(from).a('number'); - expect(to).a('number'); - expect(util.pluck(query.results, 'id')).eql(['spot', 'fido']); - expect(util.pluck(query.results, 'data')).eql([{age: 2}, {age: 3}]); - done(); - }); - }); + it('returns pubSub error if fails to subscribe to channel', function(done) { + sinon.stub(this.backend.pubsub, 'subscribe').callsFake(function(_channel, callback) { + callback(new Error('TEST_ERROR')); }); + var connection = this.backend.connect(); + connection.createSubscribeQuery( + 'dogs', + this.matchAllDbQuery, + {pollInterval: 0, pollDebounce: 0}, + function(err) { + if (err) { + expect(err.message).to.be.equal('TEST_ERROR'); + return done(); + } else { + done('Should call callback with pubsub subscribe error'); + } + } + ); }); -}; +} diff --git a/test/logger.js b/test/logger.js index 72f75e909..193f6be5d 100644 --- a/test/logger.js +++ b/test/logger.js @@ -8,10 +8,6 @@ describe('Logger', function() { sinon.stub(console, 'warn'); }); - afterEach(function() { - sinon.restore(); - }); - it('logs to console by default', function() { var logger = new Logger(); logger.warn('warning'); diff --git a/test/setup.js b/test/setup.js index d8699ef3b..6c1c91f98 100644 --- a/test/setup.js +++ b/test/setup.js @@ -1,4 +1,5 @@ var logger = require('../lib/logger'); +var sinon = require('sinon'); if (process.env.LOGGING !== 'true') { // Silence the logger for tests by setting all its methods to no-ops @@ -8,3 +9,7 @@ if (process.env.LOGGING !== 'true') { error: function() {} }); } + +afterEach(function() { + sinon.restore(); +});