Skip to content
Browse files

Rework livedata (DDP) subscriptions.

Publish now comes in three flavors:

* Bare metal API: Meteor.publish(name, func).

  Server will call func(sub, params) each time a client subscribes to
  "name" (with supplied "params"), or if "name" is null, each time a new
  client connects (autopublish).  "sub" is a Subscription object, which
  supplies sub.set(), sub.unset(), sub.satisfies(), and sub.flush()
  methods to emit DDP data messages.  func() should register a cleanup
  function with sub.onStop(), which will be called when client unsubs.

  To react to database changes, use Collection.observe().  Publish
  functions that are not database backed may use some other mechanism
  (setInterval?) to schedule calls to sub.set().

* publishCursor API:

  If func() *returns* a Cursor, server will automatically publish all
  results from that cursor as the collection with the same name as the
  cursor's underlying Mongo collection.  For example:

  Meteor.publish('top10', function (sub, params) {
    return Players.find({}, {sort: {score: -1}, limit: 10});
  });

  will define a 'top10' publish that is always the top 10 scoring
  players, published to the 'players' collection on each subscribing
  client.

* autopublish

  When the autopublish package is loaded, all Collections defined on the
  server will automatically be published, in their entirety, to each
  connected client.  Clients need not call Meteor.subscribe().  Calls to
  publish() will emit a warning message, since they are superflous.  To
  disable autopublish, run "meteor remove autopublish".
  • Loading branch information...
1 parent 5aeff80 commit fa53aa6d460d5936e857f0322b71c7a3b4016279 @debergalis debergalis committed Feb 17, 2012
View
412 packages/livedata/livedata_server.js
@@ -113,22 +113,25 @@ _.extend(Meteor._ServerMethodInvocation.prototype, {
Meteor._LivedataServer = function () {
var self = this;
- self.publishes = {};
- self.universal_publishes = []; // publishes with no name
- self._hack_collections = {}; // XXX hack. name => Collection
+ self.publish_handlers = {};
+ self.universal_publish_handlers = [];
+
self.method_handlers = {};
- self.stream_server = new Meteor._StreamServer;
+
self.on_autopublish = []; // array of func if AP disabled, null if enabled
self.warned_about_autopublish = false;
+ self.stream_server = new Meteor._StreamServer;
+
self.stream_server.register(function (socket) {
socket.meteor = {};
- socket.meteor.subs = [];
- socket.meteor.cache = {};
- socket.meteor.pending_method_ids = [];
socket.meteor.methods_blocked = false;
socket.meteor.method_queue = [];
+ // Sub objects for active subscriptions
+ socket.meteor.named_subs = {};
+ socket.meteor.universal_subs = [];
+
socket.on('data', function (raw_msg) {
try {
try {
@@ -159,160 +162,189 @@ Meteor._LivedataServer = function () {
}
});
-
- // 5/sec updates tops, once every 10sec min.
- socket.meteor.throttled_poll = _.throttle(function () {
- self._poll_subscriptions(socket);
- }, 50); // XXX only 50ms! for great speed. might want higher in prod.
- socket.meteor.timer = setInterval(socket.meteor.throttled_poll, 10000);
+ socket.on('close', function () {
+ self._stopAllSubscriptions(socket);
+ });
});
};
-_.extend(Meteor._LivedataServer.prototype, {
- _poll_subscriptions: function (socket) {
- var self = this;
+// ctor for a sub handle: the input to each publish function
+Meteor._LivedataServer.Subscription = function (socket, sub_id) {
+ // transport. provides send(obj).
+ this.socket = socket;
- Fiber(function () {
- // holds a clean copy of client's data. channel.send will
- // populate new_cache, then we compute the difference with the old
- // cache, send the delta.
- var new_cache = {};
-
- // setup a channel object
- var channel = {
- // this gets called by publish lambda with each object. send
- // populates the server's copy of what the client has.
- send: function(collection_name, obj) {
- if (!(obj instanceof Array))
- obj = [obj];
-
- _.each(obj, function (o) {
- if (!o._id) {
- console.log("WARNING trying to send object without _id"); // XXX
- return;
- }
-
- // XXX -- '|' not allowed in collection name?
- var key = collection_name + "|" + o._id;
-
- // insert or extend new_cache with 'o' object
- new_cache[key] = _.extend(new_cache[key] || {}, o);
- });
- }
- };
+ // my subscription ID (generated by client, null for universal subs).
+ this.sub_id = sub_id;
- // actually run the subscriptions.
+ // unsent DDP messages.
+ this.pending_updates = {};
+ this.pending_complete = false;
- _.each(self.universal_publishes, function (pub) {
- pub(channel, {});
- });
+ // stop callbacks to g/c this sub. called w/ zero arguments.
+ this.stop_callbacks = [];
+};
- _.each(socket.meteor.subs, function (sub) {
- var pub = self.publishes[sub.name];
- if (!pub) {
- // XXX error unknown publish
- console.log("ERROR UNKNOWN PUBLISH " + sub.name);
- return;
- }
+Meteor._LivedataServer.Subscription.prototype.stop = function () {
+ for (var i = 0; i < this.stop_callbacks.length; i++)
+ (this.stop_callbacks[i])();
+};
- pub(channel, sub.params);
- });
+Meteor._LivedataServer.Subscription.prototype.onStop = function (callback) {
+ this.stop_callbacks.push(callback);
+};
+Meteor._LivedataServer.Subscription.prototype._ensureMsg = function (collection_name, id) {
+ var self = this;
+ if (!self.pending_updates[collection_name])
+ self.pending_updates[collection_name] = {};
+ if (!self.pending_updates[collection_name][id])
+ self.pending_updates[collection_name][id] = {msg: 'data', collection: collection_name, id: id};
+ return self.pending_updates[collection_name][id];
+};
- // emit deltas for each item in the new cache (any object
- // created in this poll cycle).
- _.each(new_cache, function (new_obj, key) {
- var old_obj = socket.meteor.cache[key];
-
- // XXX parsing from the string is so ugly.
- var parts = key.split("|");
- if (!parts || parts.length !== 2) return;
- var collection_name = parts[0];
- var id = parts[1];
-
- var msg = {msg: 'data', collection: collection_name, id: id};
-
- if (!old_obj) {
- // New object. Send an insert down to the client.
- var obj_to_send = _.extend({}, new_obj);
- delete obj_to_send._id;
- if (_.keys(obj_to_send).length) {
- msg.set = obj_to_send;
- socket.send(JSON.stringify(msg));
- }
-
- } else {
- // Old object. Check for updates and send changes attributes
- // to the client.
- var set = {};
- var unset = [];
-
- _.each(new_obj, function (v, k) {
- // Not canonical order comparison or anything, but close
- // enough I hope. We may send some spurious updates?
- if (JSON.stringify(v) !== JSON.stringify(old_obj[k]))
- set[k] = v;
- });
-
- unset = _.difference(_.keys(old_obj), _.keys(new_obj));
-
- if (_.keys(set).length > 0)
- msg.set = set;
- if (unset.length > 0)
- msg.unset = unset;
-
- if (msg.set || msg.unset)
- socket.send(JSON.stringify(msg));
- }
- });
+Meteor._LivedataServer.Subscription.prototype.set = function (collection_name, id, dictionary) {
+ var self = this;
+ var obj = _.extend({}, dictionary);
+ delete obj._id;
+ var msg = self._ensureMsg(collection_name, id);
+ msg.set = _.extend(msg.set || {}, obj);
+
+ if (msg.unset) {
+ msg.unset = _.difference(msg.unset, _.keys(msg.set));
+ if (!msg.unset.length)
+ delete msg.unset;
+ }
+};
- // emit deltas for items in the old cache that no longer exist.
- var removed_keys = _.difference(_.keys(socket.meteor.cache),
- _.keys(new_cache));
- _.each(removed_keys, function (key) {
- // XXX parsing from the string is so ugly.
- var parts = key.split("|");
- if (!parts || parts.length !== 2) return;
- var collection_name = parts[0];
- var id = parts[1];
-
- var msg = {msg: 'data', collection: collection_name, id: id};
- msg.unset = _.without(_.keys(socket.meteor.cache[key]), '_id');
- socket.send(JSON.stringify(msg));
- });
+Meteor._LivedataServer.Subscription.prototype.unset = function (collection_name, id, keys) {
+ var self = this;
+ keys = _.without(keys, '_id');
+ var msg = self._ensureMsg(collection_name, id);
+ msg.unset = _.union(msg.unset || [], keys);
+
+ if (msg.set) {
+ for (var key in keys)
+ delete msg.set[key];
+ if (!_.keys(msg.set))
+ delete msg.set;
+ }
+};
- // promote new_cache to old_cache
- socket.meteor.cache = new_cache;
+Meteor._LivedataServer.Subscription.prototype.complete = function () {
+ var self = this;
- // inform the client that the subscription is ready to go
- var subs_ready = [];
- _.each(socket.meteor.subs, function (sub) {
- if (!sub.ready) {
- subs_ready.push(sub._id);
- sub.ready = true;
- }
+ // universal subs (sub_id is null) can't signal completion. it's
+ // not an error, since the same handler (eg publishQuery) might be used
+ // to implement both named and universal subs.
+
+ if (self.sub_id)
+ self.pending_complete = true;
+};
+
+Meteor._LivedataServer.Subscription.prototype.flush = function () {
+ var self = this;
+ var msg;
+
+ for (var name in self.pending_updates)
+ for (var id in self.pending_updates[name])
+ self.socket.send(JSON.stringify(self.pending_updates[name][id]));
+
+ if (self.pending_complete)
+ self.socket.send(JSON.stringify({msg: 'data', subs: [self.sub_id]}));
+
+ self.pending_updates = {};
+ self.pending_complete = false;
+};
+
+Meteor._LivedataServer.Subscription.prototype._publishCursor = function (cursor, name) {
+ var self = this;
+ var collection = name || cursor.collection_name;
+
+ var observe_handle = cursor.observe({
+ added: function (obj) {
+ self.set(collection, obj._id, obj);
+ self.flush();
+ },
+ changed: function (obj, old_idx, old_obj) {
+ var set = {};
+ _.each(obj, function (v, k) {
+ if (!_.isEqual(v, old_obj[k]))
+ set[k] = v;
});
+ self.set(collection, obj._id, set);
+ var dead_keys = _.difference(_.keys(old_obj), _.keys(obj));
+ self.unset(collection, obj._id, dead_keys);
+ self.flush();
+ },
+ removed: function (id, old_idx, old_obj) {
+ self.unset(collection, id, _.keys(old_obj));
+ self.flush();
+ }
+ });
- if (subs_ready.length || socket.meteor.pending_method_ids.length) {
- var msg = {msg: 'data'};
- if (subs_ready.length)
- msg.subs = subs_ready;
- if (socket.meteor.pending_method_ids.length)
- msg.methods = socket.meteor.pending_method_ids;
- socket.send(JSON.stringify(msg));
- }
- socket.meteor.pending_method_ids = [];
+ // observe only returns after the initial added callbacks have
+ // run. mark subscription as completed.
+ self.complete();
+ self.flush();
- }).run();
+ // register stop callback (expects lambda w/ no args).
+ self.onStop(_.bind(observe_handle.stop, observe_handle));
+};
+
+_.extend(Meteor._LivedataServer.prototype, {
+ _startSubscription: function (socket, handler, sub_id, params) {
+ var self = this;
+
+ var sub = new Meteor._LivedataServer.Subscription(socket, sub_id);
+ if (sub_id)
+ socket.meteor.named_subs[sub_id] = sub;
+ else
+ socket.meteor.universal_subs.push(sub);
+
+ var res = handler(sub, params);
+
+ // automatically wire up handlers that return a Cursor.
+ // otherwise, the handler is completely responsible for delivering
+ // its own data messages and registering stop functions.
+ if (res instanceof _Mongo.Cursor) // XXX generalize
+ sub._publishCursor(res);
+ },
+
+ // tear down specified subscription
+ _stopSubscription: function (socket, sub_id) {
+ if (sub_id && socket.meteor.named_subs[sub_id]) {
+ socket.meteor.named_subs[sub_id].stop();
+ delete socket.meteor.named_subs[sub_id];
+ }
+ },
+
+ // tear down all subscriptions
+ _stopAllSubscriptions: function (socket) {
+ _.each(socket.meteor.named_subs, function (sub, id) {
+ sub.stop();
+ });
+ socket.meteor.named_subs = {};
+
+ _.each(socket.meteor.universal_subs, function (sub) {
+ sub.stop();
+ });
+ socket.meteor.universal_subs = [];
},
// XXX 'connect' message should have a protocol version
_livedata_connect: function (socket, msg) {
var self = this;
// Always start a new session. We don't support any reconnection.
socket.send(JSON.stringify({msg: 'connected', session: Meteor.uuid()}));
- // Run any universal publishes we may have.
- self._poll_subscriptions(socket);
+
+ // Spin up all the universal publishers.
+ Fiber(function () {
+ _.each(self.universal_publish_handlers, function (handler) {
+ self._startSubscription(socket, handler);
+ });
+ }).run();
+
+ // XXX what to do here on reconnect? oh, probably just fake a sub message.
},
_livedata_sub: function (socket, msg) {
@@ -328,25 +360,33 @@ _.extend(Meteor._LivedataServer.prototype, {
return;
}
- if (!self.publishes[msg.name]) {
+ if (!self.publish_handlers[msg.name]) {
socket.send(JSON.stringify({
msg: 'nosub', id: msg.id, error: {error: 404,
reason: "Subscription not found"}}));
return;
}
- socket.meteor.subs.push({_id: msg.id, name: msg.name,
- params: msg.params || {}});
- self._poll_subscriptions(socket);
+ Fiber(function () {
+ if (msg.id in socket.meteor.named_subs)
+ // XXX client screwed up
+ self._stopSubscription(socket, msg.id);
+
+ var handler = self.publish_handlers[msg.name];
+ self._startSubscription(socket, handler, msg.id, msg.params);
+ }).run();
},
+ // XXX Fiber() doesn't interlock. if a client subs then unsubs, the
+ // subscription should end up as off.
_livedata_unsub: function (socket, msg) {
var self = this;
+
+ Fiber(function () {
+ self._stopSubscription(socket, msg.id);
+ }).run();
+
socket.send(JSON.stringify({msg: 'nosub', id: msg.id}));
- socket.meteor.subs = _.filter(socket.meteor.subs, function (x) {
- return x._id !== msg.id;
- });
- self._poll_subscriptions(socket);
},
_livedata_method: function (socket, msg) {
@@ -410,60 +450,66 @@ _.extend(Meteor._LivedataServer.prototype, {
socket.send(JSON.stringify({
msg: 'result', id: msg.id, result: result}));
- // after the method, rerun all the subscriptions as stuff may
- // have changed.
- // XXX going away in merge very soon
- socket.meteor.pending_method_ids.push(msg.id);
- _.each(self.stream_server.all_sockets(), function(x) {
- if (x && x.meteor)
- x.meteor.throttled_poll();
- });
-
+ // the method is satisfied once func returns, because any
+ // DB observe callbacks run to completion in the same tick.
+ // publishQuery will queue data before returning from the
+ // observe callback.
+ socket.send(JSON.stringify({
+ msg: 'data', methods: [msg.id]}));
};
var invocation = new Meteor._ServerMethodInvocation(msg.method, handler);
try {
invocation._run(msg.params || [], callback, next);
- } catch (e) {
// _run will have already logged the exception (and told the
// client, if appropriate)
+ } catch (err) {
+ socket.send(JSON.stringify({
+ msg: 'result', id: msg.id,
+ error: {error: 13, /* XXX error codes! */
+ reason: "Internal server error"}}));
+ // report method satisfaction to the client
+ socket.send(JSON.stringify({
+ msg: 'data', methods: [msg.id]}));
+ // XXX prettyprint exception in the log
+ Meteor._debug("Exception in method '" + msg.method + "': " +
+ JSON.stringify(err.stack));
}
}).run();
},
/**
- * Defines a live dataset that clients can subscribe to.
+ * Register a publish handler function.
*
* @param name {String} identifier for query
+ * @param handler {Function} publish handler
* @param options {Object}
*
+ * Server will call handler function on each new subscription,
+ * either when receiving DDP sub message for a named subscription, or on
+ * DDP connect for a universal subscription.
+ *
* If name is null, this will be a subscription that is
* automatically established and permanently on for all connected
* client, instead of a subscription that can be turned on and off
* with subscribe().
*
* options to contain:
- * - collection {Collection} collection; defaults to the collection
- * named 'name' on disk in mongodb
- * - selector {Function<args> OR Object} either a mongodb selector,
- * or a function that takes the argument object passed to
- * Meteor.subscribe and returns a mongodb selector. default {}
* - (mostly internal) is_auto: true if generated automatically
* from an autopublish hook. this is for cosmetic purposes only
* (it lets us determine whether to print a warning suggesting
* that you turn off autopublish.)
*/
- publish: function (name, options) {
+ publish: function (name, handler, options) {
var self = this;
- if (name && name in self.publishes) {
- // XXX error duplicate publish
- console.log("ERROR DUPLICATE PUBLISH " + name);
+ options = options || {};
+
+ if (name && name in self.publish_handlers) {
+ Meteor._debug("Ignoring duplicate publish named '" + name + "'");
return;
}
- options = options || {};
-
if (!self.on_autopublish && !options.is_auto) {
// They have autopublish on, yet they're trying to manually
// picking stuff to publish. They probably should turn off
@@ -489,36 +535,10 @@ _.extend(Meteor._LivedataServer.prototype, {
}
}
- var collection = options.collection ||
- (name && self._hack_collections[name]);
- if (!collection) {
- if (name)
- throw new Error("No collection '" + name + "' found to publish. " +
- "You can specify the collection explicitly with the " +
- "'collection' option.");
- else
- throw new Error("When creating universal publishes, you must specify " +
- "the collection explicitly with the 'collection' " +
- "option.");
- }
- var selector = options.selector || {};
- var func = function (channel, params) {
- var opt = function (key, or) {
- var x = options[key] || or;
- return (x instanceof Function) ? x(params) : x;
- };
-
- channel.send(collection._name, collection.find(opt("selector", {}), {
- sort: opt("sort"),
- skip: opt("skip"),
- limit: opt("limit")
- }).fetch());
- };
-
if (name)
- self.publishes[name] = func;
+ self.publish_handlers[name] = handler;
else
- self.universal_publishes.push(func);
+ self.universal_publish_handlers.push(handler);
},
methods: function (methods) {
View
15 packages/livedata/livedata_tests.js
@@ -1,16 +1,3 @@
-test("livedata - basics", function () {
- // Very basic test. Just see that it runs.
-
- var coll = new Meteor.Collection("testing" + LocalCollection.uuid());
-
- coll.remove({foo: 'bar'});
- assert.length(coll.find({foo: 'bar'}).fetch(), 0);
- coll.insert({foo: 'bar'});
- assert.length(coll.find({foo: 'bar'}).fetch(), 1);
-});
-
-/******************************************************************************/
-
// XXX should probably move this into a testing helpers package so it
// can be used by other tests
@@ -257,4 +244,4 @@ testAsyncMulti("livedata - compound methods", [
// method completion/satisfaction
// subscriptions (multiple APIs, including autosubscribe?)
// subscription completion
-// [probably lots more]
+// [probably lots more]
View
2 packages/minimongo/minimongo.js
@@ -151,7 +151,6 @@ LocalCollection.LiveResultsSet = function () {};
//
// attributes available on returned query handle:
// * stop(): end updates
-// * indexOf(id): return current index of object in result set, or -1
// * collection: the collection this query is querying
//
// iff x is a returned query handle, (x instanceof
@@ -163,7 +162,6 @@ LocalCollection.LiveResultsSet = function () {};
// XXX maybe support limit/skip
// XXX it'd be helpful if removed got the object that just left the
// query, not just its id
-// XXX document that initial results will definitely be delivered before we return [do, add to asana]
LocalCollection.Cursor.prototype.observe = function (options) {
var self = this;
View
10 packages/mongo-livedata/collection.js
@@ -104,17 +104,11 @@ Meteor.Collection = function (name, manager, driver) {
manager.methods(m);
}
- // XXX temporary hack to provide sugar in LivedataServer.publish()
- if (name && manager && manager._hack_collections) {
- if (name in manager._hack_collections)
- throw new Error("There is already a collection named '" + name + "'");
- manager._hack_collections[name] = self;
- }
-
// autopublish
if (manager && manager.onAutopublish)
manager.onAutopublish(function () {
- manager.publish(null, {collection: self, is_auto: true});
+ var handler = function (sub, params) { return self.find(); };
+ manager.publish(null, handler, {is_auto: true});
});
};
View
133 packages/mongo-livedata/mongo_driver.js
@@ -16,6 +16,10 @@ Future.prototype.ret = Future.prototype.return;
_Mongo = function (url) {
var self = this;
+ // holds active observes
+ self.observers = {};
+ self.next_observer_id = 1;
+
self.collection_queue = [];
MongoDB.connect(url, function(err, db) {
@@ -27,6 +31,14 @@ _Mongo = function (url) {
db.collection(c.name, c.callback);
}
});
+
+ // refresh all outstanding observers every 10 seconds. they are
+ // also triggered on DB updates.
+ setInterval(function () {
+ Fiber(function () {
+ self._pollObservers.call(self);
+ }).run();
+ }, 10000);
};
// protect against dangerous selectors. falsey and {_id: falsey}
@@ -56,6 +68,19 @@ _Mongo.prototype._withCollection = function(collection_name, callback) {
}
};
+// poke observers watching the given collection name, or all observers
+// if no collection name provided.
+_Mongo.prototype._pollObservers = function (collection_name) {
+ var self = this;
+
+ for (var id in self.observers) {
+ var o = self.observers[id];
+ if (!collection_name || o.collection_name === collection_name) {
+ o._poll();
+ }
+ }
+};
+
//////////// Public API //////////
_Mongo.prototype.insert = function (collection_name, document) {
@@ -70,7 +95,10 @@ _Mongo.prototype.insert = function (collection_name, document) {
// XXX err handling
collection.insert(document, {safe: true}, function(err) {
// XXX err handling
- future.ret();
+ Fiber(function () {
+ self._pollObservers(collection_name);
+ future.ret();
+ }).run();
});
});
@@ -93,7 +121,10 @@ _Mongo.prototype.remove = function (collection_name, selector) {
// XXX err handling
collection.remove(selector, {safe:true}, function(err) {
// XXX err handling
- future.ret();
+ Fiber(function () {
+ self._pollObservers(collection_name);
+ future.ret();
+ }).run();
});
});
@@ -121,7 +152,10 @@ _Mongo.prototype.update = function (collection_name, selector, mod, options) {
collection.update(selector, mod, opts, function(err) {
// XXX err handling
- future.ret();
+ Fiber(function () {
+ self._pollObservers(collection_name);
+ future.ret();
+ }).run();
});
});
@@ -224,9 +258,100 @@ _Mongo.Cursor.prototype.count = function () {
future.ret(err || res);
});
+ return future.wait();
+};
+
+// options to contain:
+// * callbacks:
+// - added (object, before_index)
+// - changed (new_object, at_index)
+// - moved (object, old_index, new_index) - can only fire with changed()
+// - removed (id, at_index)
+// * sort: sort descriptor
+//
+// attributes available on returned LiveResultsSet
+// * stop(): end updates
+
+_Mongo.Cursor.prototype.observe = function (options) {
+ return new _Mongo.LiveResultsSet(this, options);
+};
+_Mongo.LiveResultsSet = function (cursor, options) {
+ // copy my cursor, so that the observe can run independently from
+ // some other use of the cursor.
+ this.cursor = new _Mongo.Cursor(cursor.mongo,
+ cursor.collection_name,
+ cursor.selector,
+ cursor.options);
- return future.wait();
+ // expose collection name
+ this.collection_name = cursor.collection_name;
+
+ // unique handle for this live query
+ this.qid = this.cursor.mongo.next_observer_id++;
+
+ // previous results snapshot. on each poll cycle, diffs against
+ // results drives the callbacks.
+ this.results = {};
+ this.indexes = {};
+
+ this.added = options.added;
+ this.changed = options.changed;
+ this.moved = options.moved;
+ this.removed = options.removed;
+
+ // trigger the first _poll() cycle immediately.
+ this._poll();
+
+ // register myself with the mongo driver
+ this.cursor.mongo.observers[this.qid] = this;
+};
+
+_Mongo.LiveResultsSet.prototype._fetchResults = function (results, indexes) {
+ var self = this;
+ var index = 0;
+
+ self.cursor.rewind();
+ self.cursor.forEach(function (obj) {
+ results[obj._id] = obj;
+ indexes[obj._id] = index++;
+ });
+};
+
+_Mongo.LiveResultsSet.prototype._poll = function () {
+ var self = this;
+
+ var old_results = self.results;
+ var old_indexes = self.indexes;
+ var new_results = {};
+ var new_indexes = {};
+
+ var callbacks = [];
+
+ self._fetchResults(new_results, new_indexes);
+
+ _.each(new_results, function (obj) {
+ if (self.added && !old_results[obj._id])
+ self.added(obj, new_indexes[obj._id]);
+
+ else if (self.changed && !_.isEqual(new_results[obj._id], old_results[obj._id]))
+ self.changed(obj, old_indexes[obj._id], old_results[obj._id]);
+
+ if (self.moved && new_indexes[obj._id] !== old_indexes[obj._id])
+ self.moved(obj, old_indexes[obj._id], new_indexes[obj._id]);
+ });
+
+ for (var id in old_results)
+ if (self.removed && !(id in new_results))
+ self.removed(id, old_indexes[id], old_results[id]);
+
+ self.results = new_results;
+ self.indexes = new_indexes;
+};
+
+_Mongo.LiveResultsSet.prototype.stop = function () {
+ var self = this;
+ delete self.cursor.mongo.observers[self.qid];
};
_.extend(Meteor, {

0 comments on commit fa53aa6

Please sign in to comment.
Something went wrong with that request. Please try again.