diff --git a/docs/middleware/actions.md b/docs/middleware/actions.md index 0b9ab496..bcd8ba71 100644 --- a/docs/middleware/actions.md +++ b/docs/middleware/actions.md @@ -149,9 +149,13 @@ This action has these additional `context` properties: > The query's projection [fields]({{ site.baseurl }}{% link api/backend.md %}#addprojection) -`channel` -- string +`channel` -- string (deprecated) -> The [Pub/Sub]({{ site.baseurl }}{% link adapters/pub-sub.md %}) channel the query will subscribe to. Defaults to its collection channel. +> This property is deprecated use `channels` instead. The [Pub/Sub]({{ site.baseurl }}{% link adapters/pub-sub.md %}) channels the query will subscribe to. Defaults to its collection channel. + +`channels` -- string[] + +> The [Pub/Sub]({{ site.baseurl }}{% link adapters/pub-sub.md %}) channels the query will subscribe to. Defaults to its collection channel. `query` -- Object diff --git a/docs/queries.md b/docs/queries.md index 930ef43b..bd7d9d3b 100644 --- a/docs/queries.md +++ b/docs/queries.md @@ -160,7 +160,7 @@ backend.use('commit', (context, next) => { backend.use('query', (context, next) => { // Set our query to only listen for changes on our user-specific channel - context.channel = userChannel(context) + context.channels = [userChannel(context)] next() }) diff --git a/lib/backend.js b/lib/backend.js index 0a4f6a88..08cfa348 100644 --- a/lib/backend.js +++ b/lib/backend.js @@ -661,10 +661,32 @@ Backend.prototype.querySubscribe = function(agent, index, query, options, callba 'DB does not support subscribe' )); } - backend.pubsub.subscribe(request.channel, function(err, stream) { - if (err) return callback(err); + + var channels = request.channels; + + if (request.channel) { + logger.warn( + '[DEPRECATED] "query" middleware\'s context.channel is deprecated, use context.channels instead. ' + + 'Read more: https://share.github.io/sharedb/middleware/actions#query' + ); + channels = [request.channel]; + } + + if (!channels || !channels.length) { + return callback(new ShareDBError(ERROR_CODE.ERR_QUERY_CHANNEL_MISSING, 'Required minimum one query channel.')); + } + + var streams = []; + + function destroyStreams() { + streams.forEach(function(stream) { + stream.destroy(); + }); + } + + function createQueryEmitter() { if (options.ids) { - var queryEmitter = new QueryEmitter(request, stream, options.ids); + var queryEmitter = new QueryEmitter(request, streams, options.ids); backend.emit('timing', 'querySubscribe.reconnect', Date.now() - start, request); callback(null, queryEmitter); return; @@ -672,14 +694,29 @@ Backend.prototype.querySubscribe = function(agent, index, query, options, callba // Issue query on db to get our initial results backend._query(agent, request, function(err, snapshots, extra) { if (err) { - stream.destroy(); + destroyStreams(); return callback(err); } var ids = pluckIds(snapshots); - var queryEmitter = new QueryEmitter(request, stream, ids, extra); + var queryEmitter = new QueryEmitter(request, streams, ids, extra); backend.emit('timing', 'querySubscribe.initial', Date.now() - start, request); callback(null, queryEmitter, snapshots, extra); }); + } + + channels.forEach(function(channel) { + backend.pubsub.subscribe(channel, function(err, stream) { + if (err) { + destroyStreams(); + return callback(err); + } + streams.push(stream); + + var subscribedToAllChannels = streams.length === channels.length; + if (subscribedToAllChannels) { + createQueryEmitter(); + } + }); }); }); }; @@ -693,7 +730,7 @@ Backend.prototype._triggerQuery = function(agent, index, query, options, callbac collection: collection, projection: projection, fields: fields, - channel: this.getCollectionChannel(collection), + channels: [this.getCollectionChannel(collection)], query: query, options: options, db: null, diff --git a/lib/error.js b/lib/error.js index 92968464..a93c8467 100644 --- a/lib/error.js +++ b/lib/error.js @@ -45,6 +45,7 @@ ShareDBError.CODES = { ERR_OT_OP_NOT_PROVIDED: 'ERR_OT_OP_NOT_PROVIDED', ERR_PRESENCE_TRANSFORM_FAILED: 'ERR_PRESENCE_TRANSFORM_FAILED', ERR_PROTOCOL_VERSION_NOT_SUPPORTED: 'ERR_PROTOCOL_VERSION_NOT_SUPPORTED', + ERR_QUERY_CHANNEL_MISSING: 'ERR_QUERY_CHANNEL_MISSING', ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED: 'ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED', /** * A special error that a "readSnapshots" middleware implementation can use to indicate that it diff --git a/lib/query-emitter.js b/lib/query-emitter.js index 79877bc3..cfbd930a 100644 --- a/lib/query-emitter.js +++ b/lib/query-emitter.js @@ -5,7 +5,7 @@ var util = require('./util'); var ERROR_CODE = ShareDBError.CODES; -function QueryEmitter(request, stream, ids, extra) { +function QueryEmitter(request, streams, ids, extra) { this.backend = request.backend; this.agent = request.agent; this.db = request.db; @@ -15,7 +15,7 @@ function QueryEmitter(request, stream, ids, extra) { this.fields = request.fields; this.options = request.options; this.snapshotProjection = request.snapshotProjection; - this.stream = stream; + this.streams = streams; this.ids = ids; this.extra = extra; @@ -23,10 +23,12 @@ function QueryEmitter(request, stream, ids, extra) { this.canPollDoc = this.db.canPollDoc(this.collection, this.query); this.pollDebounce = (typeof this.options.pollDebounce === 'number') ? this.options.pollDebounce : - (typeof this.db.pollDebounce === 'number') ? this.db.pollDebounce : 0; + (typeof this.db.pollDebounce === 'number') ? this.db.pollDebounce : + streams.length > 1 ? 1000 : 0; this.pollInterval = (typeof this.options.pollInterval === 'number') ? this.options.pollInterval : - (typeof this.db.pollInterval === 'number') ? this.db.pollInterval : 0; + (typeof this.db.pollInterval === 'number') ? this.db.pollInterval : + streams.length > 1 ? 1000 : 0; this._polling = false; this._pendingPoll = null; @@ -41,15 +43,19 @@ QueryEmitter.prototype._open = function() { this._defaultCallback = function(err) { if (err) emitter.onError(err); }; - emitter.stream.on('data', function(data) { - if (data.error) { - return emitter.onError(data.error); - } - emitter._update(data); - }); - emitter.stream.on('end', function() { - emitter.destroy(); + + emitter.streams.forEach(function(stream) { + stream.on('data', function(data) { + if (data.error) { + return emitter.onError(data.error); + } + emitter._update(data); + }); + stream.on('end', function() { + emitter.destroy(); + }); }); + // Make sure we start polling if pollInterval is being used this._flushPoll(); }; @@ -57,7 +63,12 @@ QueryEmitter.prototype._open = function() { QueryEmitter.prototype.destroy = function() { clearTimeout(this._pollDebounceId); clearTimeout(this._pollIntervalId); - this.stream.destroy(); + + var stream; + + while (stream = this.streams.pop()) { + stream.destroy(); + } }; QueryEmitter.prototype._emitTiming = function(action, start) { @@ -140,8 +151,8 @@ QueryEmitter.prototype._flushPoll = function() { if (this._pendingPoll) { this.queryPoll(); - // If a pollInterval is specified, poll if the query doesn't get polled in - // the time of the interval + // If a pollInterval is specified, poll if the query doesn't get polled in + // the time of the interval } else if (this.pollInterval) { var emitter = this; this._pollIntervalId = setTimeout(function() { @@ -301,14 +312,14 @@ QueryEmitter.prototype.queryPollDoc = function(id, callback) { // all messages are received and applied in order, so it is critical that none // are dropped. QueryEmitter.prototype.onError = -QueryEmitter.prototype.onDiff = -QueryEmitter.prototype.onExtra = -QueryEmitter.prototype.onOp = function() { - throw new ShareDBError( - ERROR_CODE.ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED, - 'Required QueryEmitter listener not assigned' - ); -}; + QueryEmitter.prototype.onDiff = + QueryEmitter.prototype.onExtra = + QueryEmitter.prototype.onOp = function() { + throw new ShareDBError( + ERROR_CODE.ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED, + 'Required QueryEmitter listener not assigned' + ); + }; function getInserted(diff) { var inserted = []; diff --git a/package.json b/package.json index e0c05cba..fe70b7b9 100644 --- a/package.json +++ b/package.json @@ -22,7 +22,7 @@ "ot-json1": "^0.3.0", "rich-text": "^4.1.0", "sharedb-legacy": "npm:sharedb@=1.1.0", - "sinon": "^7.5.0" + "sinon": "^9.2.4" }, "files": [ "lib/", diff --git a/test/client/presence/presence.js b/test/client/presence/presence.js index c2a59c49..f7a52762 100644 --- a/test/client/presence/presence.js +++ b/test/client/presence/presence.js @@ -38,7 +38,6 @@ describe('Presence', function() { }); afterEach(function(done) { - sinon.restore(); connection1.close(); connection2.close(); backend.close(done); @@ -147,30 +146,6 @@ describe('Presence', function() { ], errorHandler(done)); }); - it('destroys old local presence but keeps new local presence when getting during destroy', function(done) { - presence2.create('presence-2'); - var presence2a; - - async.series([ - presence2.subscribe.bind(presence2), - function(next) { - presence2.destroy(function() { - expect(presence2).to.equal(presence2a); - expect(Object.keys(presence2.localPresences)).to.eql(['presence-2a']); - done(); - }); - next(); - }, - function(next) { - presence2a = connection2.getPresence('test-channel'); - presence2a.create('presence-2a'); - presence2a.subscribe(function(error) { - next(error); - }); - } - ], errorHandler(done)); - }); - it('throws if trying to create local presence when wanting destroy', function(done) { presence2.destroy(errorHandler(done)); expect(function() { diff --git a/test/client/query-subscribe.js b/test/client/query-subscribe.js index a414f146..95e286f9 100644 --- a/test/client/query-subscribe.js +++ b/test/client/query-subscribe.js @@ -2,6 +2,7 @@ var expect = require('chai').expect; var async = require('async'); var util = require('../util'); var sinon = require('sinon'); +var ShareDBError = require('../../lib/error'); module.exports = function(options) { var getQuery = options.getQuery; @@ -11,596 +12,883 @@ module.exports = function(options) { this.matchAllDbQuery = getQuery({query: {}}); }); - it('creating a document updates a subscribed query', function(done) { - var connection = this.backend.connect(); - var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, null, function(err) { + afterEach(function() { + sinon.restore(); + }); + + commonTests(options); + + describe('custom channels', function() { + it('only informs subscribed channels', function(done) { + this.backend.use('connect', function(context, next) { + context.agent.custom = context.req; + next(); + }); + + this.backend.use('commit', function(context, next) { + var user = context.agent.custom; + + if (user === 'sending-user-1') { + context.channels.push('channel-1'); + } + if (user === 'sending-user-2') { + context.channels.push('channel-2'); + } + next(); + }); + + this.backend.use('query', function(context, next) { + var user = context.agent.custom; + if (user === 'receiving-user') { + context.channels = ['channel-1', 'channel-2']; + } else if (user === 'not-receiving-user') { + context.channels = ['different-channel']; + } + next(); + }); + + var receivingUserConnection = this.backend.connect(null, 'receiving-user'); + var notReceivingUserConnection = this.backend.connect(null, 'not-receiving-user'); + var sendingUser1Connection = this.backend.connect(null, 'sending-user-1'); + var sendingUser2Connection = this.backend.connect(null, 'sending-user-2'); + + var notReceivingQuery = notReceivingUserConnection.createSubscribeQuery( + 'dogs', + this.matchAllDbQuery, + null, + function(err) { + if (err) return done(err); + } + ); + + notReceivingQuery.on('error', done); + notReceivingQuery.on('insert', function() { + done('User who didn\'t subscribed to sending channels shouldn\'t get the message'); + }); + + var receivingQuery = receivingUserConnection.createSubscribeQuery( + 'dogs', + this.matchAllDbQuery, + null, + function(err) { + if (err) return done(err); + sendingUser1Connection.get('dogs', '1').on('error', done).create({}); + sendingUser2Connection.get('dogs', '2').on('error', done).create({}); + } + ); + var receivedDogsCount = 0; + receivingQuery.on('error', done); + receivingQuery.on('insert', function() { + receivedDogsCount++; + if (receivedDogsCount === 2) { + var allDocsIds = receivingQuery.results.map(function(doc) { + return doc.id; + }); + expect(allDocsIds.sort()).to.be.deep.equal(['1', '2']); + done(); + } else if (receivedDogsCount > 2) { + done('It should not duplicate messages'); + } + }); + }); + + describe('one common channel', function() { + beforeEach(function() { + this.backend.use('commit', function(context, next) { + context.channels.push('channel-1'); + context.channels.push('channel-3'); + next(); + }); + this.backend.use('query', function(context, next) { + context.channels = ['channel-1', 'channel-2']; + next(); + }); + }); + + commonCustomChannelsErrorHandlingTests(); + commonTests(options); + }); + + describe('multiple common channels', function() { + beforeEach(function() { + this.backend.use('commit', function(context, next) { + context.channels.push('channel-1'); + context.channels.push('channel-2'); + next(); + }); + this.backend.use('query', function(context, next) { + context.channels = ['channel-1', 'channel-2']; + next(); + }); + }); + + it('does not duplicate messages', function(done) { + var connection = this.backend.connect(); + var count = 0; + var query = connection.createSubscribeQuery( + 'dogs', + this.matchAllDbQuery, + {pollInterval: 0, pollDebounce: 0}, + function(err) { + if (err) return done(err); + connection.get('dogs', '1').on('error', done).create({}); + connection.get('dogs', '2').on('error', done).create({}); + connection.get('dogs', '3').on('error', done).create({}); + } + ); + query.on('error', done); + query.on('insert', function() { + count++; + if (count === 3) { + var allDocsIds = query.results.map(function(doc) { + return doc.id; + }); + expect(allDocsIds.sort()).to.be.deep.equal(['1', '2', '3']); + done(); + } else if (count > 3) { + done('It should not duplicate messages'); + } + }); + }); + + commonCustomChannelsErrorHandlingTests(); + commonTests(options); + }); + + describe('backward compatibility', function() { + beforeEach(function() { + this.backend.use('commit', function(context, next) { + context.channels.push('channel-1'); + next(); + }); + this.backend.use('query', function(context, next) { + context.channel = 'channel-1'; + next(); + }); + }); + commonTests(options); + }); + }); + }); +}; + +function commonCustomChannelsErrorHandlingTests() { + it('should throw if not channels provided in query', function(done) { + this.backend.use('query', function(context, next) { + context.channels = null; + next(); + }); + var connection = this.backend.connect(); + var matchAllDbQuery = this.matchAllDbQuery; + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { + if (!err) return done('Should throw Required minimum one query channel error'); + expect(err.message).to.be.equal('Required minimum one query channel.'); + expect(err.code).to.be.equal(ShareDBError.CODES.ERR_QUERY_CHANNEL_MISSING); + done(); + }); + }); + }); + + it('should throw if channels provided in query is an empty array', function(done) { + this.backend.use('query', function(context, next) { + context.channels = []; + next(); + }); + var connection = this.backend.connect(); + var matchAllDbQuery = this.matchAllDbQuery; + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { + if (!err) return done('Should throw Required minimum one query channel error'); + expect(err.message).to.be.equal('Required minimum one query channel.'); + expect(err.code).to.be.equal(ShareDBError.CODES.ERR_QUERY_CHANNEL_MISSING); + done(); + }); + }); + }); +} + +function commonTests(options) { + var getQuery = options.getQuery; + + it('creating a document updates a subscribed query', function(done) { + var connection = this.backend.connect(); + var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, null, function(err) { + if (err) return done(err); + connection.get('dogs', 'fido').on('error', done).create({age: 3}); + }); + query.on('error', done); + query.on('insert', function(docs, index) { + expect(util.pluck(docs, 'id')).eql(['fido']); + expect(util.pluck(docs, 'data')).eql([{age: 3}]); + expect(index).equal(0); + expect(util.pluck(query.results, 'id')).eql(['fido']); + expect(util.pluck(query.results, 'data')).eql([{age: 3}]); + done(); + }); + }); + + it('creating an additional document updates a subscribed query', function(done) { + var connection = this.backend.connect(); + var matchAllDbQuery = this.matchAllDbQuery; + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { if (err) return done(err); - connection.get('dogs', 'fido').on('error', done).create({age: 3}); + connection.get('dogs', 'taco').on('error', done).create({age: 2}); }); query.on('error', done); query.on('insert', function(docs, index) { - expect(util.pluck(docs, 'id')).eql(['fido']); - expect(util.pluck(docs, 'data')).eql([{age: 3}]); - expect(index).equal(0); - expect(util.pluck(query.results, 'id')).eql(['fido']); - expect(util.pluck(query.results, 'data')).eql([{age: 3}]); + expect(util.pluck(docs, 'id')).eql(['taco']); + expect(util.pluck(docs, 'data')).eql([{age: 2}]); + expect(query.results[index]).equal(docs[0]); + var results = util.sortById(query.results); + expect(util.pluck(results, 'id')).eql(['fido', 'spot', 'taco']); + expect(util.pluck(results, 'data')).eql([{age: 3}, {age: 5}, {age: 2}]); done(); }); }); + }); - it('creating an additional document updates a subscribed query', function(done) { - var connection = this.backend.connect(); - var matchAllDbQuery = this.matchAllDbQuery; - async.parallel([ - function(cb) { - connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); - } - ], function(err) { + it('deleting a document updates a subscribed query', function(done) { + var connection = this.backend.connect(); + var matchAllDbQuery = this.matchAllDbQuery; + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { if (err) return done(err); - var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { - if (err) return done(err); - connection.get('dogs', 'taco').on('error', done).create({age: 2}); - }); - query.on('error', done); - query.on('insert', function(docs, index) { - expect(util.pluck(docs, 'id')).eql(['taco']); - expect(util.pluck(docs, 'data')).eql([{age: 2}]); - expect(query.results[index]).equal(docs[0]); - var results = util.sortById(query.results); - expect(util.pluck(results, 'id')).eql(['fido', 'spot', 'taco']); - expect(util.pluck(results, 'data')).eql([{age: 3}, {age: 5}, {age: 2}]); - done(); - }); + connection.get('dogs', 'fido').del(); + }); + query.on('error', done); + query.on('remove', function(docs, index) { + expect(util.pluck(docs, 'id')).eql(['fido']); + expect(util.pluck(docs, 'data')).eql([undefined]); + expect(index).a('number'); + var results = util.sortById(query.results); + expect(util.pluck(results, 'id')).eql(['spot']); + expect(util.pluck(results, 'data')).eql([{age: 5}]); + done(); }); }); + }); - it('deleting a document updates a subscribed query', function(done) { - var connection = this.backend.connect(); - var matchAllDbQuery = this.matchAllDbQuery; - async.parallel([ - function(cb) { - connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); - } - ], function(err) { + it('subscribed query removes document from results before sending delete op to other clients', function(done) { + var connection1 = this.backend.connect(); + var connection2 = this.backend.connect(); + var matchAllDbQuery = this.matchAllDbQuery; + async.parallel([ + function(cb) { + connection1.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection1.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + var query = connection2.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { if (err) return done(err); - var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { - if (err) return done(err); - connection.get('dogs', 'fido').del(); - }); - query.on('error', done); - query.on('remove', function(docs, index) { - expect(util.pluck(docs, 'id')).eql(['fido']); - expect(util.pluck(docs, 'data')).eql([undefined]); - expect(index).a('number'); - var results = util.sortById(query.results); - expect(util.pluck(results, 'id')).eql(['spot']); - expect(util.pluck(results, 'data')).eql([{age: 5}]); - done(); - }); + connection1.get('dogs', 'fido').del(); + }); + query.on('error', done); + var removed = false; + connection2.get('dogs', 'fido').on('del', function() { + expect(removed).equal(true); + done(); + }); + query.on('remove', function(docs, index) { + removed = true; + expect(util.pluck(docs, 'id')).eql(['fido']); + expect(util.pluck(docs, 'data')).eql([{age: 3}]); + expect(index).a('number'); + var results = util.sortById(query.results); + expect(util.pluck(results, 'id')).eql(['spot']); + expect(util.pluck(results, 'data')).eql([{age: 5}]); }); }); + }); - it('subscribed query removes document from results before sending delete op to other clients', function(done) { - var connection1 = this.backend.connect(); - var connection2 = this.backend.connect(); - var matchAllDbQuery = this.matchAllDbQuery; - async.parallel([ - function(cb) { - connection1.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection1.get('dogs', 'spot').on('error', done).create({age: 5}, cb); - } - ], function(err) { + it('subscribed query does not get updated after destroyed', function(done) { + var connection = this.backend.connect(); + var connection2 = this.backend.connect(); + var matchAllDbQuery = this.matchAllDbQuery; + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { if (err) return done(err); - var query = connection2.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { + query.destroy(function(err) { if (err) return done(err); - connection1.get('dogs', 'fido').del(); - }); - query.on('error', done); - var removed = false; - connection2.get('dogs', 'fido').on('del', function() { - expect(removed).equal(true); - done(); - }); - query.on('remove', function(docs, index) { - removed = true; - expect(util.pluck(docs, 'id')).eql(['fido']); - expect(util.pluck(docs, 'data')).eql([{age: 3}]); - expect(index).a('number'); - var results = util.sortById(query.results); - expect(util.pluck(results, 'id')).eql(['spot']); - expect(util.pluck(results, 'data')).eql([{age: 5}]); + connection2.get('dogs', 'taco').on('error', done).create({age: 2}, done); }); }); + query.on('error', done); + query.on('insert', function() { + done(); + }); }); + }); - it('subscribed query does not get updated after destroyed', function(done) { - var connection = this.backend.connect(); - var connection2 = this.backend.connect(); - var matchAllDbQuery = this.matchAllDbQuery; - async.parallel([ - function(cb) { - connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); - } - ], function(err) { + it('subscribed query does not get updated after connection is disconnected', function(done) { + var connection = this.backend.connect(); + var connection2 = this.backend.connect(); + var matchAllDbQuery = this.matchAllDbQuery; + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { if (err) return done(err); - var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { - if (err) return done(err); - query.destroy(function(err) { - if (err) return done(err); - connection2.get('dogs', 'taco').on('error', done).create({age: 2}, done); - }); - }); - query.on('error', done); - query.on('insert', function() { - done(); - }); + connection.close(); + connection2.get('dogs', 'taco').on('error', done).create({age: 2}, done); + }); + query.on('error', done); + query.on('insert', function() { + done(); }); }); + }); - it('subscribed query does not get updated after connection is disconnected', function(done) { - var connection = this.backend.connect(); - var connection2 = this.backend.connect(); - var matchAllDbQuery = this.matchAllDbQuery; - async.parallel([ - function(cb) { - connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); - } - ], function(err) { + it('subscribed query gets update after reconnecting', function(done) { + var backend = this.backend; + var connection = backend.connect(); + var connection2 = backend.connect(); + var matchAllDbQuery = this.matchAllDbQuery; + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + + var wait = 2; + function finish() { + if (--wait) return; + expect(util.pluck(query.results, 'id')).to.have.members(['fido', 'spot', 'taco']); + done(); + } + + var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { if (err) return done(err); - var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { - if (err) return done(err); - connection.close(); - connection2.get('dogs', 'taco').on('error', done).create({age: 2}, done); - }); - query.on('error', done); - query.on('insert', function() { - done(); + connection.close(); + connection2.get('dogs', 'taco').on('error', done).create({age: 2}); + process.nextTick(function() { + backend.connect(connection); + query.on('ready', finish); }); }); + query.on('error', done); + query.on('insert', finish); }); + }); - it('subscribed query gets update after reconnecting', function(done) { - var backend = this.backend; - var connection = backend.connect(); - var connection2 = backend.connect(); - var matchAllDbQuery = this.matchAllDbQuery; - async.parallel([ - function(cb) { - connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); - } - ], function(err) { - if (err) return done(err); + it('subscribed query gets simultaneous insert and remove after reconnecting', function(done) { + var backend = this.backend; + var connection = backend.connect(); + var connection2 = backend.connect(); + var matchAllDbQuery = this.matchAllDbQuery; + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + var query = connection.createSubscribeQuery('dogs', matchAllDbQuery); + query.on('error', done); - var wait = 2; - function finish() { - if (--wait) return; - expect(util.pluck(query.results, 'id')).to.have.members(['fido', 'spot', 'taco']); - done(); - } + query.once('ready', function() { + connection.close(); - var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { + connection2.get('dogs', 'fido').fetch(function(err) { if (err) return done(err); - connection.close(); - connection2.get('dogs', 'taco').on('error', done).create({age: 2}); - process.nextTick(function() { + async.parallel([ + function(cb) { + connection2.get('dogs', 'fido').del(cb); + }, + function(cb) { + connection2.get('dogs', 'taco').create({age: 2}, cb); + } + ], function(error) { + if (error) return done(error); backend.connect(connection); - query.on('ready', finish); - }); - }); - query.on('error', done); - query.on('insert', finish); - }); - }); - - it('subscribed query gets simultaneous insert and remove after reconnecting', function(done) { - var backend = this.backend; - var connection = backend.connect(); - var connection2 = backend.connect(); - var matchAllDbQuery = this.matchAllDbQuery; - async.parallel([ - function(cb) { - connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); - } - ], function(err) { - if (err) return done(err); - var query = connection.createSubscribeQuery('dogs', matchAllDbQuery); - query.on('error', done); - - query.once('ready', function() { - connection.close(); - - connection2.get('dogs', 'fido').fetch(function(err) { - if (err) return done(err); - async.parallel([ - function(cb) { - connection2.get('dogs', 'fido').del(cb); - }, - function(cb) { - connection2.get('dogs', 'taco').create({age: 2}, cb); - } - ], function(error) { - if (error) return done(error); - backend.connect(connection); - query.once('ready', function() { - finish(); - }); + query.once('ready', function() { + finish(); }); }); }); + }); - var wait = 3; - function finish() { - if (--wait) return; - var results = util.sortById(query.results); - expect(util.pluck(results, 'id')).eql(['spot', 'taco']); - expect(util.pluck(results, 'data')).eql([{age: 5}, {age: 2}]); - done(); - } - query.once('insert', function(docs) { - expect(util.pluck(docs, 'id')).eql(['taco']); - expect(util.pluck(docs, 'data')).eql([{age: 2}]); - finish(); - }); - query.once('remove', function(docs) { - expect(util.pluck(docs, 'id')).eql(['fido']); - // We don't assert the value of data, because the del op could be - // applied by the client before or after the query result is removed. - // Order of ops & query result updates is not currently guaranteed - finish(); - }); + var wait = 3; + function finish() { + if (--wait) return; + var results = util.sortById(query.results); + expect(util.pluck(results, 'id')).eql(['spot', 'taco']); + expect(util.pluck(results, 'data')).eql([{age: 5}, {age: 2}]); + done(); + } + query.once('insert', function(docs) { + expect(util.pluck(docs, 'id')).eql(['taco']); + expect(util.pluck(docs, 'data')).eql([{age: 2}]); + finish(); + }); + query.once('remove', function(docs) { + expect(util.pluck(docs, 'id')).eql(['fido']); + // We don't assert the value of data, because the del op could be + // applied by the client before or after the query result is removed. + // Order of ops & query result updates is not currently guaranteed + finish(); }); }); + }); - it('creating an additional document updates a subscribed query', function(done) { - var connection = this.backend.connect(); - var matchAllDbQuery = this.matchAllDbQuery; - async.parallel([ - function(cb) { - connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); - } - ], function(err) { + it('creating an additional document updates a subscribed query', function(done) { + var connection = this.backend.connect(); + var matchAllDbQuery = this.matchAllDbQuery; + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { if (err) return done(err); - var query = connection.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) { - if (err) return done(err); - connection.get('dogs', 'taco').on('error', done).create({age: 2}); - }); - query.on('error', done); - query.on('insert', function(docs, index) { - expect(util.pluck(docs, 'id')).eql(['taco']); - expect(util.pluck(docs, 'data')).eql([{age: 2}]); - expect(query.results[index]).equal(docs[0]); - var results = util.sortById(query.results); - expect(util.pluck(results, 'id')).eql(['fido', 'spot', 'taco']); - expect(util.pluck(results, 'data')).eql([{age: 3}, {age: 5}, {age: 2}]); - done(); - }); + connection.get('dogs', 'taco').on('error', done).create({age: 2}); }); - }); - - it('pollDebounce option reduces subsequent poll interval', function(done) { - var connection = this.backend.connect(); - this.backend.db.canPollDoc = function() { - return false; - }; - var query = connection.createSubscribeQuery('items', this.matchAllDbQuery, {pollDebounce: 1000}); query.on('error', done); - var batchSizes = []; - var total = 0; + query.on('insert', function(docs, index) { + expect(util.pluck(docs, 'id')).eql(['taco']); + expect(util.pluck(docs, 'data')).eql([{age: 2}]); + expect(query.results[index]).equal(docs[0]); + var results = util.sortById(query.results); + expect(util.pluck(results, 'id')).eql(['fido', 'spot', 'taco']); + expect(util.pluck(results, 'data')).eql([{age: 3}, {age: 5}, {age: 2}]); + done(); + }); + }); + }); - query.on('insert', function(docs) { - batchSizes.push(docs.length); - total += docs.length; - if (total === 1) { + it('pollDebounce option reduces subsequent poll interval', function(done) { + var clock = sinon.useFakeTimers(); + var connection = this.backend.connect(); + this.backend.db.canPollDoc = function() { + return false; + }; + var query = connection.createSubscribeQuery('items', this.matchAllDbQuery, {pollDebounce: 2000}); + query.on('error', done); + var batchSizes = []; + var total = 0; + + query.on('insert', function(docs) { + batchSizes.push(docs.length); + total += docs.length; + + if (total === 1) { // first write received by client. we're debouncing. create 9 // more documents. - for (var i = 1; i < 10; i++) { - connection.get('items', i.toString()).on('error', done).create({}); - } + var counter = 0; + for (var i = 1; i < 10; i++) { + connection.get('items', i.toString()).on('error', done).create({}, function(err) { + if (err) return done(err); + counter++; + if (counter === 9) clock.tickAsync(10000); + }); } - if (total === 10) { + } + if (total === 10) { // first document is its own batch; then subsequent creates - // are debounced until after all other 9 docs are created - expect(batchSizes).eql([1, 9]); - done(); - } - }); - - // create an initial document. this will lead to the 'insert' - // event firing the first time, while sharedb is definitely - // debouncing - connection.get('items', '0').on('error', done).create({}); + // are debounced and batched + expect(batchSizes[0]).eql(1); + batchSizes.shift(); + var sum = batchSizes.reduce(function(sum, batchSize) { + return sum + batchSize; + }, 0); + expect(batchSizes.length).to.lessThan(9); + expect(sum).eql(9); + done(); + } }); - it('db.pollDebounce option reduces subsequent poll interval', function(done) { - var connection = this.backend.connect(); - this.backend.db.canPollDoc = function() { - return false; - }; - this.backend.db.pollDebounce = 1000; - var query = connection.createSubscribeQuery('items', this.matchAllDbQuery); - query.on('error', done); - var batchSizes = []; - var total = 0; + // create an initial document. this will lead to the 'insert' + // event firing the first time, while sharedb is definitely + // debouncing + connection.get('items', '0').on('error', done).create({}, function() { + clock.tickAsync(3000); + }); + }); - query.on('insert', function(docs) { - batchSizes.push(docs.length); - total += docs.length; - if (total === 1) { + it('db.pollDebounce option reduces subsequent poll interval', function(done) { + var clock = sinon.useFakeTimers(); + var connection = this.backend.connect(); + this.backend.db.canPollDoc = function() { + return false; + }; + this.backend.db.pollDebounce = 2000; + var query = connection.createSubscribeQuery('items', this.matchAllDbQuery); + query.on('error', done); + var batchSizes = []; + var total = 0; + + query.on('insert', function(docs) { + batchSizes.push(docs.length); + total += docs.length; + + if (total === 1) { // first write received by client. we're debouncing. create 9 // more documents. - for (var i = 1; i < 10; i++) { - connection.get('items', i.toString()).on('error', done).create({}); - } + var counter = 0; + for (var i = 1; i < 10; i++) { + connection.get('items', i.toString()).on('error', done).create({}, function(err) { + if (err) return done(err); + counter++; + if (counter === 9) clock.tickAsync(10000); + }); } - if (total === 10) { + } + if (total === 10) { // first document is its own batch; then subsequent creates - // are debounced until after all other 9 docs are created - expect(batchSizes).eql([1, 9]); - done(); - } - }); + // are debounced and batched + expect(batchSizes[0]).eql(1); + batchSizes.shift(); + var sum = batchSizes.reduce(function(sum, batchSize) { + return sum + batchSize; + }, 0); + expect(batchSizes.length).to.lessThan(9); + expect(sum).eql(9); + done(); + } + }); - // create an initial document. this will lead to the 'insert' - // event firing the first time, while sharedb is definitely - // debouncing - connection.get('items', '0').on('error', done).create({}); + // create an initial document. this will lead to the 'insert' + // event firing the first time, while sharedb is definitely + // debouncing + connection.get('items', '0').on('error', done).create({}, function() { + clock.tickAsync(3000); }); + }); - it('pollInterval updates a subscribed query after an unpublished create', function(done) { - var connection = this.backend.connect(); - this.backend.suppressPublish = true; - var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, {pollInterval: 50}, function(err) { + it('pollInterval updates a subscribed query after an unpublished create', function(done) { + var clock = sinon.useFakeTimers(); + var connection = this.backend.connect(); + this.backend.suppressPublish = true; + var query = connection.createSubscribeQuery( + 'dogs', + this.matchAllDbQuery, + {pollDebounce: 0, pollInterval: 50}, + function(err) { if (err) return done(err); - connection.get('dogs', 'fido').on('error', done).create({}); - }); - query.on('error', done); - query.on('insert', function(docs) { - expect(util.pluck(docs, 'id')).eql(['fido']); - done(); + connection.get('dogs', 'fido').on('error', done).create({}, function() { + clock.tickAsync(51); + }); + } + ); + query.on('error', done); + query.on('insert', function(docs) { + expect(util.pluck(docs, 'id')).eql(['fido']); + done(); + }); + }); + + it('db.pollInterval updates a subscribed query after an unpublished create', function(done) { + var clock = sinon.useFakeTimers(); + var connection = this.backend.connect(); + this.backend.suppressPublish = true; + this.backend.db.pollDebounce = 0; + this.backend.db.pollInterval = 50; + var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, null, function(err) { + if (err) return done(err); + connection.get('dogs', 'fido').on('error', done).create({}, function() { + clock.tickAsync(51); }); }); + query.on('error', done); + query.on('insert', function(docs) { + expect(util.pluck(docs, 'id')).eql(['fido']); + done(); + }); + }); - it('db.pollInterval updates a subscribed query after an unpublished create', function(done) { - var connection = this.backend.connect(); - this.backend.suppressPublish = true; - this.backend.db.pollInterval = 50; - var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, null, function(err) { - if (err) return done(err); - connection.get('dogs', 'fido').on('error', done).create({}); + it('pollInterval captures additional unpublished creates', function(done) { + var clock = sinon.useFakeTimers(); + var connection = this.backend.connect(); + this.backend.suppressPublish = true; + var count = 0; + + var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, {pollInterval: 1000}, function(err) { + if (err) return done(err); + var doc = connection.get('dogs', count.toString()).on('error', done); + doc.create({}, function(e) { + if (e) return done(e); + clock.tickAsync(2000); }); - query.on('error', done); - query.on('insert', function(docs) { - expect(util.pluck(docs, 'id')).eql(['fido']); - done(); + }); + query.on('error', done); + query.on('insert', function() { + count++; + if (count === 3) return done(); + var doc = connection.get('dogs', count.toString()).on('error', done); + doc.create({}, function(e) { + if (e) return done(e); + clock.tickAsync(10000); }); }); + clock.tickAsync(1); + }); - it('pollInterval captures additional unpublished creates', function(done) { - var connection = this.backend.connect(); - this.backend.suppressPublish = true; - var count = 0; - var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, {pollInterval: 50}, function(err) { - if (err) return done(err); - connection.get('dogs', count.toString()).on('error', done).create({}); + it('query extra is returned to client', function(done) { + var connection = this.backend.connect(); + this.backend.db.query = function(collection, query, fields, options, callback) { + process.nextTick(function() { + callback(null, [], {colors: ['brown', 'gold']}); }); - query.on('error', done); - query.on('insert', function() { - count++; - if (count === 3) return done(); - connection.get('dogs', count.toString()).on('error', done).create({}); + }; + var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, null, function(err, results, extra) { + if (err) return done(err); + expect(results).eql([]); + expect(extra).eql({colors: ['brown', 'gold']}); + expect(query.extra).eql({colors: ['brown', 'gold']}); + done(); + }); + query.on('error', done); + }); + + it('query extra is updated on change', function(done) { + var connection = this.backend.connect(); + this.backend.db.query = function(collection, query, fields, options, callback) { + process.nextTick(function() { + callback(null, [], 1); }); + }; + this.backend.db.queryPoll = function(collection, query, options, callback) { + process.nextTick(function() { + callback(null, [], 2); + }); + }; + this.backend.db.canPollDoc = function() { + return false; + }; + var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, null, function(err, results, extra) { + if (err) return done(err); + expect(extra).eql(1); + expect(query.extra).eql(1); + }); + query.on('error', done); + query.on('extra', function(extra) { + expect(extra).eql(2); + expect(query.extra).eql(2); + done(); }); + connection.get('dogs', 'fido').on('error', done).create({age: 3}); + }); - it('query extra is returned to client', function(done) { - var connection = this.backend.connect(); - this.backend.db.query = function(collection, query, fields, options, callback) { - process.nextTick(function() { - callback(null, [], {colors: ['brown', 'gold']}); - }); + describe('passing agent.custom to the DB adapter', function() { + var connection; + var expectedArg = { + agentCustom: {foo: 'bar'} + }; + beforeEach('set up', function() { + connection = this.backend.connect(); + connection.agent.custom = { + foo: 'bar' }; - var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, null, function(err, results, extra) { - if (err) return done(err); - expect(results).eql([]); - expect(extra).eql({colors: ['brown', 'gold']}); - expect(query.extra).eql({colors: ['brown', 'gold']}); + }); + + it('sends agentCustom to the db\'s getSnapshot call', function(done) { + var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery); + var getSnapshotSpy = sinon.spy(this.backend.db, 'getSnapshot'); + + query.on('insert', function() { + // The first call to getSnapshot is when the document is created + // The seconds call is when the event is triggered, and is the one we are testing here + expect(getSnapshotSpy.callCount).to.equal(2); + expect(getSnapshotSpy.getCall(1).args[3]).to.deep.equal(expectedArg); done(); }); - query.on('error', done); + connection.get('dogs', 'fido').create({age: 3}); }); - it('query extra is updated on change', function(done) { - var connection = this.backend.connect(); - this.backend.db.query = function(collection, query, fields, options, callback) { - process.nextTick(function() { - callback(null, [], 1); - }); - }; - this.backend.db.queryPoll = function(collection, query, options, callback) { - process.nextTick(function() { - callback(null, [], 2); - }); - }; + it('sends agentCustom to the db\'s getSnapshotBulk call', function(done) { + // Ensures that getSnapshotBulk is called, instead of getSnapshot this.backend.db.canPollDoc = function() { return false; }; - var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery, null, function(err, results, extra) { - if (err) return done(err); - expect(extra).eql(1); - expect(query.extra).eql(1); - }); - query.on('error', done); - query.on('extra', function(extra) { - expect(extra).eql(2); - expect(query.extra).eql(2); + + var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery); + var getSnapshotBulkSpy = sinon.spy(this.backend.db, 'getSnapshotBulk'); + + query.on('insert', function() { + expect(getSnapshotBulkSpy.callCount).to.equal(1); + expect(getSnapshotBulkSpy.getCall(0).args[3]).to.deep.equal(expectedArg); done(); }); - connection.get('dogs', 'fido').on('error', done).create({age: 3}); + connection.get('dogs', 'fido').create({age: 3}); }); + }); - describe('passing agent.custom to the DB adapter', function() { - var connection; - var expectedArg = { - agentCustom: {foo: 'bar'} - }; - beforeEach('set up', function() { - connection = this.backend.connect(); - connection.agent.custom = { - foo: 'bar' - }; - }); - - it('sends agentCustom to the db\'s getSnapshot call', function(done) { - var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery); - var getSnapshotSpy = sinon.spy(this.backend.db, 'getSnapshot'); - - query.on('insert', function() { - // The first call to getSnapshot is when the document is created - // The seconds call is when the event is triggered, and is the one we are testing here - expect(getSnapshotSpy.callCount).to.equal(2); - expect(getSnapshotSpy.getCall(1).args[3]).to.deep.equal(expectedArg); - done(); - }); - connection.get('dogs', 'fido').create({age: 3}); + it('changing a filtered property removes from a subscribed query', function(done) { + var connection = this.backend.connect(); + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 3}, cb); + } + ], function(err) { + if (err) return done(err); + var dbQuery = getQuery({query: {age: 3}}); + var query = connection.createSubscribeQuery('dogs', dbQuery, null, function(err, results) { + if (err) return done(err); + var sorted = util.sortById(results); + expect(util.pluck(sorted, 'id')).eql(['fido', 'spot']); + expect(util.pluck(sorted, 'data')).eql([{age: 3}, {age: 3}]); + connection.get('dogs', 'fido').submitOp({p: ['age'], na: 2}); }); - - it('sends agentCustom to the db\'s getSnapshotBulk call', function(done) { - // Ensures that getSnapshotBulk is called, instead of getSnapshot - this.backend.db.canPollDoc = function() { - return false; - }; - - var query = connection.createSubscribeQuery('dogs', this.matchAllDbQuery); - var getSnapshotBulkSpy = sinon.spy(this.backend.db, 'getSnapshotBulk'); - - query.on('insert', function() { - expect(getSnapshotBulkSpy.callCount).to.equal(1); - expect(getSnapshotBulkSpy.getCall(0).args[3]).to.deep.equal(expectedArg); - done(); - }); - connection.get('dogs', 'fido').create({age: 3}); + query.on('error', done); + query.on('remove', function(docs, index) { + expect(util.pluck(docs, 'id')).eql(['fido']); + expect(util.pluck(docs, 'data')).eql([{age: 5}]); + expect(index).a('number'); + var results = util.sortById(query.results); + expect(util.pluck(results, 'id')).eql(['spot']); + expect(util.pluck(results, 'data')).eql([{age: 3}]); + done(); }); }); + }); - it('changing a filtered property removes from a subscribed query', function(done) { - var connection = this.backend.connect(); - async.parallel([ - function(cb) { - connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection.get('dogs', 'spot').on('error', done).create({age: 3}, cb); - } - ], function(err) { + it('changing a filtered property inserts to a subscribed query', function(done) { + var connection = this.backend.connect(); + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + var dbQuery = getQuery({query: {age: 3}}); + var query = connection.createSubscribeQuery('dogs', dbQuery, null, function(err, results) { if (err) return done(err); - var dbQuery = getQuery({query: {age: 3}}); - var query = connection.createSubscribeQuery('dogs', dbQuery, null, function(err, results) { - if (err) return done(err); - var sorted = util.sortById(results); - expect(util.pluck(sorted, 'id')).eql(['fido', 'spot']); - expect(util.pluck(sorted, 'data')).eql([{age: 3}, {age: 3}]); - connection.get('dogs', 'fido').submitOp({p: ['age'], na: 2}); - }); - query.on('error', done); - query.on('remove', function(docs, index) { - expect(util.pluck(docs, 'id')).eql(['fido']); - expect(util.pluck(docs, 'data')).eql([{age: 5}]); - expect(index).a('number'); - var results = util.sortById(query.results); - expect(util.pluck(results, 'id')).eql(['spot']); - expect(util.pluck(results, 'data')).eql([{age: 3}]); - done(); - }); + var sorted = util.sortById(results); + expect(util.pluck(sorted, 'id')).eql(['fido']); + expect(util.pluck(sorted, 'data')).eql([{age: 3}]); + connection.get('dogs', 'spot').submitOp({p: ['age'], na: -2}); + }); + query.on('error', done); + query.on('insert', function(docs, index) { + expect(util.pluck(docs, 'id')).eql(['spot']); + expect(util.pluck(docs, 'data')).eql([{age: 3}]); + expect(index).a('number'); + var results = util.sortById(query.results); + expect(util.pluck(results, 'id')).eql(['fido', 'spot']); + expect(util.pluck(results, 'data')).eql([{age: 3}, {age: 3}]); + done(); }); }); + }); - it('changing a filtered property inserts to a subscribed query', function(done) { - var connection = this.backend.connect(); - async.parallel([ - function(cb) { - connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); - } - ], function(err) { - if (err) return done(err); - var dbQuery = getQuery({query: {age: 3}}); - var query = connection.createSubscribeQuery('dogs', dbQuery, null, function(err, results) { + it('changing a sorted property moves in a subscribed query', function(done) { + var connection = this.backend.connect(); + + async.parallel([ + function(cb) { + connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); + }, + function(cb) { + connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); + } + ], function(err) { + if (err) return done(err); + var dbQuery = getQuery({query: {}, sort: [['age', 1]]}); + var query = connection.createSubscribeQuery( + 'dogs', + dbQuery, + null, + function(err, results) { if (err) return done(err); - var sorted = util.sortById(results); - expect(util.pluck(sorted, 'id')).eql(['fido']); - expect(util.pluck(sorted, 'data')).eql([{age: 3}]); - connection.get('dogs', 'spot').submitOp({p: ['age'], na: -2}); - }); - query.on('error', done); - query.on('insert', function(docs, index) { - expect(util.pluck(docs, 'id')).eql(['spot']); - expect(util.pluck(docs, 'data')).eql([{age: 3}]); - expect(index).a('number'); - var results = util.sortById(query.results); expect(util.pluck(results, 'id')).eql(['fido', 'spot']); - expect(util.pluck(results, 'data')).eql([{age: 3}, {age: 3}]); - done(); + expect(util.pluck(results, 'data')).eql([{age: 3}, {age: 5}]); + connection.get('dogs', 'spot').submitOp({p: ['age'], na: -3}); }); + query.on('error', done); + query.on('move', function(docs, from, to) { + expect(docs.length).eql(1); + expect(from).a('number'); + expect(to).a('number'); + expect(util.pluck(query.results, 'id')).eql(['spot', 'fido']); + expect(util.pluck(query.results, 'data')).eql([{age: 2}, {age: 3}]); + done(); }); }); + }); - it('changing a sorted property moves in a subscribed query', function(done) { - var connection = this.backend.connect(); - - async.parallel([ - function(cb) { - connection.get('dogs', 'fido').on('error', done).create({age: 3}, cb); - }, - function(cb) { - connection.get('dogs', 'spot').on('error', done).create({age: 5}, cb); - } - ], function(err) { - if (err) return done(err); - var dbQuery = getQuery({query: {}, sort: [['age', 1]]}); - var query = connection.createSubscribeQuery( - 'dogs', - dbQuery, - null, - function(err, results) { - if (err) return done(err); - expect(util.pluck(results, 'id')).eql(['fido', 'spot']); - expect(util.pluck(results, 'data')).eql([{age: 3}, {age: 5}]); - connection.get('dogs', 'spot').submitOp({p: ['age'], na: -3}); - }); - query.on('error', done); - query.on('move', function(docs, from, to) { - expect(docs.length).eql(1); - expect(from).a('number'); - expect(to).a('number'); - expect(util.pluck(query.results, 'id')).eql(['spot', 'fido']); - expect(util.pluck(query.results, 'data')).eql([{age: 2}, {age: 3}]); - done(); - }); - }); + it('returns pubSub error if fails to subscribe to channel', function(done) { + sinon.stub(this.backend.pubsub, 'subscribe').callsFake(function(_channel, callback) { + callback(new Error('TEST_ERROR')); }); + var connection = this.backend.connect(); + connection.createSubscribeQuery( + 'dogs', + this.matchAllDbQuery, + {pollInterval: 0, pollDebounce: 0}, + function(err) { + if (err) { + expect(err.message).to.be.equal('TEST_ERROR'); + return done(); + } else { + done('Should call callback with pubsub subscribe error'); + } + } + ); }); -}; +} diff --git a/test/logger.js b/test/logger.js index 72f75e90..193f6be5 100644 --- a/test/logger.js +++ b/test/logger.js @@ -8,10 +8,6 @@ describe('Logger', function() { sinon.stub(console, 'warn'); }); - afterEach(function() { - sinon.restore(); - }); - it('logs to console by default', function() { var logger = new Logger(); logger.warn('warning'); diff --git a/test/setup.js b/test/setup.js index d8699ef3..6c1c91f9 100644 --- a/test/setup.js +++ b/test/setup.js @@ -1,4 +1,5 @@ var logger = require('../lib/logger'); +var sinon = require('sinon'); if (process.env.LOGGING !== 'true') { // Silence the logger for tests by setting all its methods to no-ops @@ -8,3 +9,7 @@ if (process.env.LOGGING !== 'true') { error: function() {} }); } + +afterEach(function() { + sinon.restore(); +});