Browse files

Make multiple observes of the same Mongo cursor share their polling c…

…ycles.

There is no longer any explicit time-based throttling of Mongo poll calls,
though we do prevent multiple polls from being scheduled at once. If this
turns out to be a problem, we can reintroduce throttling.

Note: this can lead to a hang if you start observing a cursor during a callback
from an identical cursor's observation. This doesn't seem to be a very realistic
use case though.
  • Loading branch information...
1 parent 41b0c28 commit 183653e381ff05d8ce7dee8a16c2c9eb0502ac9a @glasser glasser committed Nov 13, 2012
Showing with 459 additions and 95 deletions.
  1. +3 −0 packages/livedata/crossbar.js
  2. +263 −83 packages/mongo-livedata/mongo_driver.js
  3. +193 −12 packages/mongo-livedata/mongo_livedata_tests.js
View
3 packages/livedata/crossbar.js
@@ -20,6 +20,9 @@ _.extend(Meteor._InvalidationCrossbar.prototype, {
//
// XXX It should be legal to call fire() from inside a listen()
// callback?
+ //
+ // Note: the LiveResultsSet constructor assumes that a call to listen() never
+ // yields.
listen: function (trigger, callback) {
var self = this;
var id = self.next_id++;
View
346 packages/mongo-livedata/mongo_driver.js
@@ -17,6 +17,8 @@ _Mongo = function (url) {
self.collection_queue = [];
+ self._liveResultsSets = {};
+
MongoDB.connect(url, function(err, db) {
if (err)
throw err;
@@ -257,9 +259,13 @@ _Mongo.prototype._ensureIndex = function (collectionName, index, options) {
// SynchronousCursor (lazily: it doesn't contact Mongo until you call a method
// like fetch or forEach on it).
//
-// _Mongo.LiveResultsSet is the "observe handle" returned from observe and
-// _observeUnordered. It caches the results of a query and reruns it when
-// necessary.
+// ObserveHandle is the "observe handle" returned from observe and
+// _observeUnordered. It has a reference to a LiveResultsSet.
+//
+// LiveResultsSet caches the results of a query and reruns it when necessary.
+// It is hooked up to one or more ObserveHandles; a single LiveResultsSet
+// can drive multiple sets of observation callbacks if they are for the
+// same query.
var CursorDescription = function (collectionName, selector, options) {
@@ -448,127 +454,301 @@ _.extend(SynchronousCursor.prototype, {
}
});
+var nextObserveHandleId = 1; // source of the per-handle IDs assigned in the ObserveHandle constructor
+var ObserveHandle = function (liveResultsSet, callbacks) { // handle returned by _observe; ties one set of callbacks to a LiveResultsSet
+ var self = this;
+ self._liveResultsSet = liveResultsSet; // the LiveResultsSet driving this handle; set to null by stop()
+ self._added = callbacks.added; // callbacks are stored under '_'-prefixed names; any of them may be undefined
+ self._changed = callbacks.changed;
+ self._removed = callbacks.removed;
+ self._moved = callbacks.moved;
+ self._observeHandleId = nextObserveHandleId++; // used as this handle's key in LiveResultsSet._observeHandles
+};
+ObserveHandle.prototype.stop = function () { // detaches this handle from its LiveResultsSet
+ var self = this;
+ self._liveResultsSet._removeObserveHandle(self); // NOTE(review): a second stop() call would dereference null here -- confirm callers never stop twice
+ self._liveResultsSet = null; // drop the reference once detached
+};
+
_Mongo.prototype._observe = function (cursorDescription, ordered, callbacks) {
var self = this;
- return new _Mongo.LiveResultsSet(
- cursorDescription,
- self._createSynchronousCursor(cursorDescription),
- ordered,
- callbacks);
+ var observeKey = JSON.stringify(
+ _.extend({ordered: ordered}, cursorDescription));
+
+ var liveResultsSet;
+ var observeHandle;
+ var newlyCreated = false;
+
+ // Find a matching LiveResultsSet, or create a new one. This next block is
+ // guaranteed to not yield (and it doesn't call anything that can observe a
+ // new query), so no other calls to this function can interleave with it.
+ Meteor._noYieldsAllowed(function () {
+ if (_.has(self._liveResultsSets, observeKey)) {
+ liveResultsSet = self._liveResultsSets[observeKey];
+ } else {
+ // Create a new LiveResultsSet. It is created "locked": no polling can
+ // take place.
+ liveResultsSet = new LiveResultsSet(
+ cursorDescription,
+ self._createSynchronousCursor(cursorDescription),
+ ordered,
+ function () {
+ delete self._liveResultsSets[observeKey];
+ });
+ self._liveResultsSets[observeKey] = liveResultsSet;
+ newlyCreated = true;
+ }
+ observeHandle = new ObserveHandle(liveResultsSet, callbacks);
+ });
+
+ if (newlyCreated) {
+ // This is the first ObserveHandle on this LiveResultsSet. Add it and run
+ // the initial synchronous poll (which may yield).
+ liveResultsSet._addFirstObserveHandle(observeHandle);
+ } else {
+ // Not the first ObserveHandle. Add it to the LiveResultsSet. This call
+ // yields until we're not in the middle of a poll, and its invocation of the
+ // initial 'added' callbacks may yield as well. It blocks until the 'added'
+ // callbacks have fired.
+ liveResultsSet._addObserveHandleAndSendInitialAdds(observeHandle);
+ }
+
+ return observeHandle;
};
-_Mongo.LiveResultsSet = function (cursorDescription, synchronousCursor, ordered,
- callbacks) {
+var LiveResultsSet = function (cursorDescription, synchronousCursor, ordered,
+ stopCallback) {
var self = this;
self._cursorDescription = cursorDescription;
self._synchronousCursor = synchronousCursor;
-
self._ordered = ordered;
+ self._stopCallbacks = [stopCallback];
// previous results snapshot. on each poll cycle, diffs against
// results drives the callbacks.
self._results = ordered ? [] : {};
- // state for polling
- self._dirty = false; // do we need polling?
+ // The number of _pollMongo calls that have been added to self._taskQueue but
+ // have not started running. Used to make sure we never schedule more than one
+ // _pollMongo (other than possibly the one that is currently running). It's
+ // also used by _suspendPolling to pretend there's a poll scheduled. Usually,
+ // it's either 0 (for "no polls scheduled other than maybe one currently
+ // running") or 1 (for "a poll scheduled that isn't running yet"), but it can
+ // also be 2 if incremented by _suspendPolling.
+ self._pollsScheduledButNotStarted = 0;
+ // Number of _addObserveHandleAndSendInitialAdds tasks scheduled but not yet
+ // running. _removeObserveHandle uses this to know if it's safe to shut down
+ // this LiveResultsSet.
+ self._addHandleTasksScheduledButNotPerformed = 0;
self._pendingWrites = []; // people to notify when polling completes
- self._pollRunning = false; // is polling in progress now?
- self._pollingSuspended = false; // is polling temporarily suspended?
- // (each instance of the class needs to get a separate throttling
- // context -- we don't want to coalesce invocations of markDirty on
- // different instances!)
- self._markDirty = _.throttle(self._unthrottledMarkDirty, 50 /* ms */);
+ // Make sure to create a separately throttled function for each LiveResultsSet
+ // object.
+ self._ensurePollIsScheduled = _.throttle(
+ self._unthrottledEnsurePollIsScheduled, 50 /* ms */);
+
+ self._taskQueue = new Meteor._SynchronousQueue();
- // listen for the invalidation messages that will trigger us to poll
- // the database for changes
+ // listen for the invalidation messages that will trigger us to poll the
+ // database for changes
var keys = (cursorDescription.options.key ||
{collection: cursorDescription.collectionName});
if (!(keys instanceof Array))
keys = [keys];
- self._crossbarListeners = _.map(keys, function (key) {
- return Meteor._InvalidationCrossbar.listen(key, function (notification,
- complete) {
- // When someone does a transaction that might affect us,
- // schedule a poll of the database. If that transaction happens
- // inside of a write fence, block the fence until we've polled
- // and notified observers.
- var fence = Meteor._CurrentWriteFence.get();
- if (fence)
- self._pendingWrites.push(fence.beginWrite());
- self._markDirty();
- complete();
- });
+ _.each(keys, function (key) {
+ var listener = Meteor._InvalidationCrossbar.listen(
+ key, function (notification, complete) {
+ // When someone does a transaction that might affect us, schedule a poll
+ // of the database. If that transaction happens inside of a write fence,
+ // block the fence until we've polled and notified observers.
+ var fence = Meteor._CurrentWriteFence.get();
+ if (fence)
+ self._pendingWrites.push(fence.beginWrite());
+ self._ensurePollIsScheduled();
+ complete();
+ });
+ self._stopCallbacks.push(function () { listener.stop(); });
});
- // user callbacks
- self._callbacks = callbacks;
-
- // run the first _poll() cycle synchronously.
- self._pollRunning = true;
- self._doPoll();
- self._pollRunning = false;
+ // Map from handle ID to ObserveHandle.
+ self._observeHandles = {};
+
+ self._callbackMultiplexer = {};
+ var callbackNames = ['added', 'changed', 'removed'];
+ if (self._ordered)
+ callbackNames.push('moved');
+ _.each(callbackNames, function (callback) {
+ var handleCallback = '_' + callback;
+ self._callbackMultiplexer[callback] = function () {
+ var args = _.toArray(arguments);
+ // Because callbacks can yield and _removeObserveHandle() (i.e.,
+ // handle.stop()) doesn't synchronize its actions with _taskQueue,
+ // ObserveHandles can disappear from self._observeHandles during this
+ // dispatch. Thus, we save a copy of the keys of self._observeHandles
+ // before we start to iterate, and we check to see if the handle is still
+ // there each time.
+ _.each(_.keys(self._observeHandles), function (handleId) {
+ var handle = self._observeHandles[handleId];
+ if (handle && handle[handleCallback])
+ handle[handleCallback].apply(null, args);
+ });
+ };
+ });
// every once in a while, poll even if we don't think we're dirty,
// for eventual consistency with database writes from outside the
// Meteor universe
- self._refreshTimer = Meteor.setInterval(_.bind(self._markDirty, this),
- 10 * 1000 /* 10 seconds */);
+ var intervalHandle = Meteor.setInterval(
+ _.bind(self._ensurePollIsScheduled, self), 10 * 1000 /* 10 seconds */);
+ self._stopCallbacks.push(function () {
+ Meteor.clearInterval(intervalHandle);
+ });
};
-_Mongo.LiveResultsSet.prototype._unthrottledMarkDirty = function () {
- var self = this;
+_.extend(LiveResultsSet.prototype, {
+ _addFirstObserveHandle: function (handle) {
+ var self = this;
+ if (! _.isEmpty(self._observeHandles))
+ throw new Error("Not the first observe handle!");
+ if (! _.isEmpty(self._results))
+ throw new Error("Call _addFirstObserveHandle before polling!");
+
+ self._observeHandles[handle._observeHandleId] = handle;
+
+ // Run the first _poll() cycle synchronously (delivering results to the
+ // first ObserveHandle).
+ ++self._pollsScheduledButNotStarted;
+ self._taskQueue.runTask(function () {
+ self._pollMongo();
+ });
+ },
- self._dirty = true;
- if (self._pollingSuspended)
- return; // don't poll when told not to
- if (self._pollRunning)
- return; // only one instance can run at once. just tell it to re-cycle.
- self._pollRunning = true;
+ // This is always called through _.throttle.
+ _unthrottledEnsurePollIsScheduled: function () {
+ var self = this;
+ if (self._pollsScheduledButNotStarted > 0)
+ return;
+ ++self._pollsScheduledButNotStarted;
+ self._taskQueue.queueTask(function () {
+ self._pollMongo();
+ });
+ },
+
+ // test-only interface for controlling polling.
+ //
+ // _suspendPolling blocks until any currently running and scheduled polls are
+ // done, and prevents any further polls from being scheduled. (new
+ // ObserveHandles can be added and receive their initial added callbacks,
+ // though.)
+ //
+ // _resumePolling immediately polls, and allows further polls to occur.
+ _suspendPolling: function() {
+ var self = this;
+ // Pretend that there's another poll scheduled (which will prevent
+ // _ensurePollIsScheduled from queueing any more polls).
+ ++self._pollsScheduledButNotStarted;
+ // Now block until all currently running or scheduled polls are done.
+ self._taskQueue.runTask(function() {});
+
+ // Confirm that there is only one "poll" (the fake one we're pretending to
+ // have) scheduled.
+ if (self._pollsScheduledButNotStarted !== 1)
+ throw new Error("_pollsScheduledButNotStarted is " +
+ self._pollsScheduledButNotStarted);
+ },
+ _resumePolling: function() {
+ var self = this;
+ // We should be in the same state as in the end of _suspendPolling.
+ if (self._pollsScheduledButNotStarted !== 1)
+ throw new Error("_pollsScheduledButNotStarted is " +
+ self._pollsScheduledButNotStarted);
+ // Run a poll synchronously (which will counteract the
+ // ++_pollsScheduledButNotStarted from _suspendPolling).
+ self._taskQueue.runTask(function () {
+ self._pollMongo();
+ });
+ },
- Fiber(function () {
- self._dirty = false;
+ _pollMongo: function () {
+ var self = this;
+ --self._pollsScheduledButNotStarted;
+
+ // Save the list of pending writes which this round will commit.
var writesForCycle = self._pendingWrites;
self._pendingWrites = [];
- self._doPoll(); // could yield, and set self._dirty
+
+ // Get the new query results. (These calls can yield.)
+ self._synchronousCursor.rewind();
+ var newResults = self._synchronousCursor.getRawObjects(self._ordered);
+ var oldResults = self._results;
+
+ // Run diffs. (This can yield too.)
+ if (!_.isEmpty(self._observeHandles))
+ LocalCollection._diffQuery(
+ self._ordered, oldResults, newResults, self._callbackMultiplexer, true);
+
+ // Replace self._results atomically.
+ self._results = newResults;
+
+ // Mark all the writes which existed before this call as committed. (If new
+ // writes have shown up in the meantime, there'll already be another
+ // _pollMongo task scheduled.)
_.each(writesForCycle, function (w) {w.committed();});
+ },
- self._pollRunning = false;
- if (self._dirty || self._pendingWrites.length)
- // rerun ourselves, but through _.throttle
- self._markDirty();
- }).run();
-};
+ // Adds the observe handle to this set and sends its initial added
+ // callbacks. Meteor._SynchronousQueue guarantees that this won't interleave
+ // with a call to _pollMongo or another call to this function.
+ _addObserveHandleAndSendInitialAdds: function (handle) {
+ var self = this;
-// interface for tests to control when polling happens
-_Mongo.LiveResultsSet.prototype._suspendPolling = function() {
- this._pollingSuspended = true;
-};
-_Mongo.LiveResultsSet.prototype._resumePolling = function() {
- this._pollingSuspended = false;
- this._unthrottledMarkDirty(); // poll NOW, don't wait
-};
+ // Keep track of how many of these tasks are on the queue, so that
+ // _removeObserveHandle knows if it's safe to GC.
+ ++self._addHandleTasksScheduledButNotPerformed;
+ self._taskQueue.runTask(function () {
+ if (!self._observeHandles)
+ throw new Error("Can't add observe handle to stopped LiveResultsSet");
-_Mongo.LiveResultsSet.prototype._doPoll = function () {
- var self = this;
+ if (_.has(self._observeHandles, handle._observeHandleId))
+ throw new Error("Duplicate observe handle ID");
+ self._observeHandles[handle._observeHandleId] = handle;
+ --self._addHandleTasksScheduledButNotPerformed;
- // Get the new query results
- self._synchronousCursor.rewind();
- var new_results = self._synchronousCursor.getRawObjects(self._ordered);
- var old_results = self._results;
+ // Send initial adds.
+ _.each(self._results, function (doc, i) {
+ handle._added(LocalCollection._deepcopy(doc),
+ self._ordered ? i : undefined);
+ });
+ });
+ },
- LocalCollection._diffQuery(
- self._ordered, old_results, new_results, self._callbacks, true);
- self._results = new_results;
-};
+ // Remove an observe handle. If it was the last observe handle, call all the
+ // stop callbacks; you cannot add any more observe handles after this.
+ //
+ // This is not synchronized with polls and handle additions: this means that
+ // you can safely call it from within an observe callback.
+ _removeObserveHandle: function (handle) {
+ var self = this;
-_Mongo.LiveResultsSet.prototype.stop = function () {
- var self = this;
- _.each(self._crossbarListeners, function (l) { l.stop(); });
- Meteor.clearInterval(self._refreshTimer);
-};
+ if (!_.has(self._observeHandles, handle._observeHandleId))
+ throw new Error("Unknown observe handle ID " + handle._observeHandleId);
+ delete self._observeHandles[handle._observeHandleId];
+
+ if (_.isEmpty(self._observeHandles) &&
+ self._addHandleTasksScheduledButNotPerformed === 0) {
+ // The last observe handle was stopped; call our stop callbacks, which:
+ // - remove us from the _Mongo's _liveResultsSets map
+ // - stop the poll timer
+ // - remove us from the invalidation crossbar
+ _.each(self._stopCallbacks, function (c) { c(); });
+ // This will cause future _addObserveHandleAndSendInitialAdds calls to
+ // throw.
+ self._observeHandles = null;
+ }
+ }
+});
_.extend(Meteor, {
_Mongo: _Mongo
View
205 packages/mongo-livedata/mongo_livedata_tests.js
@@ -238,8 +238,11 @@ Tinytest.addAsync("mongo-livedata - fuzz test", function(test, onComplete) {
var max_counters = _.clone(counters);
finishObserve(function () {
+ // XXX What if there are multiple observe handles on the LiveResultsSet?
+ // There shouldn't be because the collection has a name unique to this
+ // run.
if (Meteor.isServer)
- obs._suspendPolling();
+ obs._liveResultsSet._suspendPolling();
// Do a batch of 1-10 operations
var batch_count = rnd(10) + 1;
@@ -272,7 +275,7 @@ Tinytest.addAsync("mongo-livedata - fuzz test", function(test, onComplete) {
}
}
if (Meteor.isServer)
- obs._resumePolling();
+ obs._liveResultsSet._resumePolling();
});
@@ -294,6 +297,16 @@ Tinytest.addAsync("mongo-livedata - fuzz test", function(test, onComplete) {
});
+var runInFence = function (f) { // test helper: runs f, wrapping it in a write fence when not on the client
+ if (Meteor.isClient) {
+ f(); // client: just call f directly, no fence involved
+ } else {
+ var fence = new Meteor._WriteFence;
+ Meteor._CurrentWriteFence.withValue(fence, f); // run f with this fence installed as the current write fence
+ fence.armAndWait(); // presumably blocks until every write begun under the fence has been committed -- TODO confirm against _WriteFence
+ }
+};
+
Tinytest.addAsync("mongo-livedata - scribbling", function (test, onComplete) {
var run = test.runId();
var coll;
@@ -303,16 +316,6 @@ Tinytest.addAsync("mongo-livedata - scribbling", function (test, onComplete) {
coll = new Meteor.Collection("livedata_test_collection_"+run);
}
- var runInFence = function (f) {
- if (Meteor.isClient) {
- f();
- } else {
- var fence = new Meteor._WriteFence;
- Meteor._CurrentWriteFence.withValue(fence, f);
- fence.armAndWait();
- }
- };
-
var numAddeds = 0;
var handle = coll.find({run: run}).observe({
added: function (o) {
@@ -333,3 +336,181 @@ Tinytest.addAsync("mongo-livedata - scribbling", function (test, onComplete) {
onComplete();
});
+
+Tinytest.addAsync("mongo-livedata - stop handle in callback", function (test, onComplete) {
+ var run = test.runId();
+ var coll;
+ if (Meteor.isClient) {
+ coll = new Meteor.Collection(null); // local, unmanaged
+ } else {
+ coll = new Meteor.Collection("stopHandleInCallback-"+run);
+ }
+
+ var output = [];
+
+ var handle = coll.find()._observeUnordered({
+ added: function (doc) {
+ output.push({added: doc._id});
+ },
+ changed: function (newDoc) {
+ output.push('changed');
+ handle.stop();
+ }
+ });
+
+ test.equal(output, []);
+
+ // Insert a document. Observe that the added callback is called.
+ var docId;
+ runInFence(function () {
+ docId = coll.insert({foo: 42});
+ });
+ test.length(output, 1);
+ test.equal(output.shift(), {added: docId});
+
+ // Update it. Observe that the changed callback is called. This should also
+ // stop the observation.
+ runInFence(function() {
+ coll.update(docId, {$set: {bar: 10}});
+ });
+ test.length(output, 1);
+ test.equal(output.shift(), 'changed');
+
+ // Update again. This shouldn't call the callback because we stopped the
+ // observation.
+ runInFence(function() {
+ coll.update(docId, {$set: {baz: 40}});
+ });
+ test.length(output, 0);
+
+ test.equal(coll.find().count(), 1);
+ test.equal(coll.findOne(docId),
+ {_id: docId, foo: 42, bar: 10, baz: 40});
+
+ onComplete();
+});
+
+// This behavior isn't great, but it beats deadlock.
+if (Meteor.isServer) {
+ Tinytest.addAsync("mongo-livedata - recursive observe throws", function (test, onComplete) {
+ var run = test.runId();
+ var coll = new Meteor.Collection("observeInCallback-"+run);
+
+ var callbackCalled = false;
+ var handle = coll.find()._observeUnordered({
+ added: function (newDoc) {
+ callbackCalled = true;
+ test.throws(function () {
+ coll.find()._observeUnordered({});
+ });
+ }
+ });
+ test.isFalse(callbackCalled);
+ // Insert a document. Observe that the added callback is called.
+ runInFence(function () {
+ coll.insert({foo: 42});
+ });
+ test.isTrue(callbackCalled);
+
+ handle.stop();
+
+ onComplete();
+ });
+
+ Tinytest.addAsync("mongo-livedata - cursor dedup", function (test, onComplete) {
+ var run = test.runId();
+ var coll = new Meteor.Collection("cursorDedup-"+run);
+
+ var observer = function () {
+ var output = [];
+ var handle = coll.find({foo: 22}).observe({
+ added: function (doc) {
+ output.push({added: doc._id});
+ },
+ changed: function (newDoc) {
+ output.push({changed: newDoc._id});
+ }
+ });
+ return {output: output, handle: handle};
+ };
+
+ // Insert a doc and start observing.
+ var docId1 = coll.insert({foo: 22});
+ var o1 = observer();
+ // Initial add.
+ test.length(o1.output, 1);
+ test.equal(o1.output.shift(), {added: docId1});
+
+ // Insert another doc (blocking until observes have fired).
+ var docId2;
+ runInFence(function () {
+ docId2 = coll.insert({foo: 22, bar: 5});
+ });
+ // Observed add.
+ test.length(o1.output, 1);
+ test.equal(o1.output.shift(), {added: docId2});
+
+ // Second identical observe.
+ var o2 = observer();
+ // Initial adds.
+ test.length(o2.output, 2);
+ test.include([docId1, docId2], o2.output[0].added);
+ test.include([docId1, docId2], o2.output[1].added);
+ test.notEqual(o2.output[0].added, o2.output[1].added);
+ o2.output.length = 0;
+ // Original observe not affected.
+ test.length(o1.output, 0);
+
+ // White-box test: both observes should have the same underlying
+ // LiveResultsSet.
+ var liveResultsSet = o1.handle._liveResultsSet;
+ test.isTrue(liveResultsSet);
+ test.isTrue(liveResultsSet === o2.handle._liveResultsSet);
+
+ // Update. Both observes fire.
+ runInFence(function () {
+ coll.update(docId1, {$set: {x: 'y'}});
+ });
+ test.length(o1.output, 1);
+ test.length(o2.output, 1);
+ test.equal(o1.output.shift(), {changed: docId1});
+ test.equal(o2.output.shift(), {changed: docId1});
+
+ // Stop first handle. Second handle still around.
+ o1.handle.stop();
+ test.length(o1.output, 0);
+ test.length(o2.output, 0);
+
+ // Another update. Just the second handle should fire.
+ runInFence(function () {
+ coll.update(docId2, {$set: {z: 'y'}});
+ });
+ test.length(o1.output, 0);
+ test.length(o2.output, 1);
+ test.equal(o2.output.shift(), {changed: docId2});
+
+ // Stop second handle. Nothing should happen, but the liveResultsSet should
+ // be stopped.
+ o2.handle.stop();
+ test.length(o1.output, 0);
+ test.length(o2.output, 0);
+ // White-box: liveResultsSet has nulled its _observeHandles so you can't
+ // accidentally join to it.
+ test.isNull(liveResultsSet._observeHandles);
+
+ // Start yet another handle on the same query.
+ var o3 = observer();
+ // Initial adds.
+ test.length(o3.output, 2);
+ test.include([docId1, docId2], o3.output[0].added);
+ test.include([docId1, docId2], o3.output[1].added);
+ test.notEqual(o3.output[0].added, o3.output[1].added);
+ // Old observers not called.
+ test.length(o1.output, 0);
+ test.length(o2.output, 0);
+ // White-box: Different LiveResultsSet.
+ test.isTrue(liveResultsSet !== o3.handle._liveResultsSet);
+ o3.handle.stop();
+ onComplete();
+ });
+}

0 comments on commit 183653e

Please sign in to comment.