New subs #2

Closed
wants to merge 8 commits into
from
View
32 docs/client/api.js
@@ -30,40 +30,20 @@ Template.api_box_args.pretty = function (fn) {
// Meteor boxes
-// XXX now supports sort, skip, limit. same deal as collection (can be
-// a function.) document this.
Template.api.publish = {
id: "publish",
- name: "Meteor.publish(name, [options])",
+ name: "Meteor.publish(name, handler)",
locus: "Server",
descr: [
- "Define a live dataset to which clients may subscribe. Every dataset has a name and can optionally take arguments.",
- "A client that connects to the dataset with `Meteor.subscribe|subscribe` will receive the published data, and will receive updates in realtime.",
+ "Define a live dataset to which clients may subscribe. If `name` is a String, a client can subscribe to the dataset with `Meteor.subscribe|subscribe`. If `name` is falsey, every client is automatically subscribed at connection time. The `handler` argument is a publish function, called at subscription or connection time, that is expected to send data events to the client.",
"Calls to this function are ignored on the client."],
args: [
{name: "name",
type: "String",
- descr: "The name that clients will use to subscribe to this query."}],
- options: [
- {name: "collection",
- type: "Collection",
- type_link: 'collection',
- descr: "The collection to publish. If not given, the default is the collection named `name` on disk in MongoDB."},
- {name: "selector",
- type: "Object — Mongo selector; or Function",
- type_link: "selectors",
- descr: "Filter for the records to publish. The default is `{}` (everything.). If a function, it receives the parameters passed by the client when making the subscription, and should return the selector to use."},
- {name: "sort",
- type: "Object — sort specifier; or Function",
- type_link: "sortspecifiers",
- descr: "Sort order. Like `selector`, can take a function, to be sensitive to the subscription parameters."},
- {name: "skip",
- type: "Number; or Function",
- descr: "Number of results to skip at the beginning. Like `selector`, can take a function, to be sensitive to the subscription parameters."},
- {name: "limit",
- type: "Number; or Function",
- descr: "Maximum number of results to return. Like `selector`, can take a function, to be sensitive to the subscription parameters."}
- ]
+ descr: "The name that clients will use to subscribe to this query."},
+ {name: "handler",
+ type: "Function",
+ descr: "The publish function that emits data messages."}]
};
Template.api.subscribe = {
View
2 examples/leaderboard-remote/client/leaderboard-remote.js
@@ -3,8 +3,6 @@ Leaderboard = Meteor.connect("http://leader2.meteor.com/sockjs");
// XXX I'd rather this be Leaderboard.Players.. can this API be easier?
Players = new Meteor.Collection("players", Leaderboard);
-Leaderboard.subscribe("top10");
-
Template.main.events = {
'keydown': function () {
Session.set("error", null);
View
1 examples/leaderboard/.meteor/packages
@@ -4,3 +4,4 @@
# but you can also edit it by hand.
jquery
+autopublish
View
8 examples/leaderboard/leaderboard.js
@@ -5,9 +5,6 @@ Players = new Meteor.Collection("players");
/*** Client ***/
if (Meteor.is_client) {
- // Get the top 10 players from the server, updated continuously.
- Meteor.subscribe("top10");
-
$(document).ready(function () {
// List the players by score. You can click to select a player.
var scores = Meteor.ui.renderList(Players.find({}, {sort: {score: -1}}), {
@@ -54,11 +51,6 @@ if (Meteor.is_client) {
// subdirectory named 'server'.
if (Meteor.is_server) {
- // Publish the top 10 players, live, to any client that wants them.
- Meteor.publish("top10", {collection: Players,
- sort: {score: -1},
- limit: 10});
-
// On server startup, create some players if the database is empty.
Meteor.startup(function () {
if (Players.find().count() === 0) {
View
20 examples/todos/model.js
@@ -2,20 +2,12 @@ Lists = new Meteor.Collection("lists");
Todos = new Meteor.Collection("todos");
-/* Schema support coming soon!
-
-Lists.schema({text: String});
-
-Todos.schema({text: String,
- done: Boolean,
- tags: [String]});
-*/
-
if (Meteor.is_server) {
- Meteor.publish('lists');
- Meteor.publish('todos', {
- selector: function (params) {
- return {list_id: params.list};
- }
+ Meteor.publish('lists', function (sub, params) {
+ return Lists.find();
+ });
+
+ Meteor.publish('todos', function (sub, params) {
+ return Todos.find({list_id: params.list});
});
}
View
403 packages/livedata/livedata_server.js
@@ -47,7 +47,7 @@ _.extend(Meteor._ServerMethodInvocation.prototype, {
_sendResponse: function (error, ret, from) {
var self = this;
- if (self._threw && from !== "throw")
+ if (self._threw && from !== "throw")
return;
if (self._responded) {
if (from === "throw")
@@ -113,22 +113,25 @@ _.extend(Meteor._ServerMethodInvocation.prototype, {
Meteor._LivedataServer = function () {
var self = this;
- self.publishes = {};
- self.universal_publishes = []; // publishes with no name
- self._hack_collections = {}; // XXX hack. name => Collection
+ self.publish_handlers = {};
+ self.universal_publish_handlers = [];
+
self.method_handlers = {};
- self.stream_server = new Meteor._StreamServer;
+
self.on_autopublish = []; // array of func if AP disabled, null if enabled
self.warned_about_autopublish = false;
+ self.stream_server = new Meteor._StreamServer;
+
self.stream_server.register(function (socket) {
socket.meteor = {};
- socket.meteor.subs = [];
- socket.meteor.cache = {};
- socket.meteor.pending_method_ids = [];
socket.meteor.methods_blocked = false;
socket.meteor.method_queue = [];
+ // Sub objects for active subscriptions
+ socket.meteor.named_subs = {};
+ socket.meteor.universal_subs = [];
+
socket.on('data', function (raw_msg) {
try {
try {
@@ -159,160 +162,189 @@ Meteor._LivedataServer = function () {
}
});
-
- // 5/sec updates tops, once every 10sec min.
- socket.meteor.throttled_poll = _.throttle(function () {
- self._poll_subscriptions(socket);
- }, 50); // XXX only 50ms! for great speed. might want higher in prod.
- socket.meteor.timer = setInterval(socket.meteor.throttled_poll, 10000);
+ socket.on('close', function () {
+ self._stopAllSubscriptions(socket);
+ });
});
};
-_.extend(Meteor._LivedataServer.prototype, {
- _poll_subscriptions: function (socket) {
- var self = this;
+// ctor for a sub handle: the input to each publish function
+Meteor._LivedataServer.Subscription = function (socket, sub_id) {
+ // transport. provides send(obj).
+ this.socket = socket;
- Fiber(function () {
- // holds a clean copy of client's data. channel.send will
- // populate new_cache, then we compute the difference with the old
- // cache, send the delta.
- var new_cache = {};
-
- // setup a channel object
- var channel = {
- // this gets called by publish lambda with each object. send
- // populates the server's copy of what the client has.
- send: function(collection_name, obj) {
- if (!(obj instanceof Array))
- obj = [obj];
-
- _.each(obj, function (o) {
- if (!o._id) {
- console.log("WARNING trying to send object without _id"); // XXX
- return;
- }
-
- // XXX -- '|' not allowed in collection name?
- var key = collection_name + "|" + o._id;
-
- // insert or extend new_cache with 'o' object
- new_cache[key] = _.extend(new_cache[key] || {}, o);
- });
- }
- };
+ // my subscription ID (generated by client, null for universal subs).
+ this.sub_id = sub_id;
- // actually run the subscriptions.
+ // unsent DDP messages.
+ this.pending_updates = {};
+ this.pending_complete = false;
- _.each(self.universal_publishes, function (pub) {
- pub(channel, {});
- });
+ // stop callbacks to g/c this sub. called w/ zero arguments.
+ this.stop_callbacks = [];
+};
- _.each(socket.meteor.subs, function (sub) {
- var pub = self.publishes[sub.name];
- if (!pub) {
- // XXX error unknown publish
- console.log("ERROR UNKNOWN PUBLISH " + sub.name);
- return;
- }
+Meteor._LivedataServer.Subscription.prototype.stop = function () {
+ for (var i = 0; i < this.stop_callbacks.length; i++)
+ (this.stop_callbacks[i])();
+};
- pub(channel, sub.params);
- });
+Meteor._LivedataServer.Subscription.prototype.onStop = function (callback) {
+ this.stop_callbacks.push(callback);
+};
+Meteor._LivedataServer.Subscription.prototype._ensureMsg = function (collection_name, id) {
+ var self = this;
+ if (!self.pending_updates[collection_name])
+ self.pending_updates[collection_name] = {};
+ if (!self.pending_updates[collection_name][id])
+ self.pending_updates[collection_name][id] = {msg: 'data', collection: collection_name, id: id};
+ return self.pending_updates[collection_name][id];
+};
- // emit deltas for each item in the new cache (any object
- // created in this poll cycle).
- _.each(new_cache, function (new_obj, key) {
- var old_obj = socket.meteor.cache[key];
-
- // XXX parsing from the string is so ugly.
- var parts = key.split("|");
- if (!parts || parts.length !== 2) return;
- var collection_name = parts[0];
- var id = parts[1];
-
- var msg = {msg: 'data', collection: collection_name, id: id};
-
- if (!old_obj) {
- // New object. Send an insert down to the client.
- var obj_to_send = _.extend({}, new_obj);
- delete obj_to_send._id;
- if (_.keys(obj_to_send).length) {
- msg.set = obj_to_send;
- socket.send(JSON.stringify(msg));
- }
-
- } else {
- // Old object. Check for updates and send changes attributes
- // to the client.
- var set = {};
- var unset = [];
-
- _.each(new_obj, function (v, k) {
- // Not canonical order comparison or anything, but close
- // enough I hope. We may send some spurious updates?
- if (JSON.stringify(v) !== JSON.stringify(old_obj[k]))
- set[k] = v;
- });
-
- unset = _.difference(_.keys(old_obj), _.keys(new_obj));
-
- if (_.keys(set).length > 0)
- msg.set = set;
- if (unset.length > 0)
- msg.unset = unset;
-
- if (msg.set || msg.unset)
- socket.send(JSON.stringify(msg));
- }
- });
+Meteor._LivedataServer.Subscription.prototype.set = function (collection_name, id, dictionary) {
+ var self = this;
+ var obj = _.extend({}, dictionary);
+ delete obj._id;
+ var msg = self._ensureMsg(collection_name, id);
+ msg.set = _.extend(msg.set || {}, obj);
+
+ if (msg.unset) {
+ msg.unset = _.difference(msg.unset, _.keys(msg.set));
+ if (!msg.unset.length)
+ delete msg.unset;
+ }
+};
- // emit deltas for items in the old cache that no longer exist.
- var removed_keys = _.difference(_.keys(socket.meteor.cache),
- _.keys(new_cache));
- _.each(removed_keys, function (key) {
- // XXX parsing from the string is so ugly.
- var parts = key.split("|");
- if (!parts || parts.length !== 2) return;
- var collection_name = parts[0];
- var id = parts[1];
-
- var msg = {msg: 'data', collection: collection_name, id: id};
- msg.unset = _.without(_.keys(socket.meteor.cache[key]), '_id');
- socket.send(JSON.stringify(msg));
- });
+Meteor._LivedataServer.Subscription.prototype.unset = function (collection_name, id, keys) {
+ var self = this;
+ keys = _.without(keys, '_id');
+ var msg = self._ensureMsg(collection_name, id);
+ msg.unset = _.union(msg.unset || [], keys);
+
+ if (msg.set) {
+ for (var key in keys)
+ delete msg.set[key];
+ if (!_.keys(msg.set))
+ delete msg.set;
+ }
+};
+
+Meteor._LivedataServer.Subscription.prototype.complete = function () {
+ var self = this;
- // promote new_cache to old_cache
- socket.meteor.cache = new_cache;
+ // universal subs (sub_id is null) can't signal completion. it's
+ // not an error, since the same handler (eg publishQuery) might be used
+ // to implement both named and universal subs.
- // inform the client that the subscription is ready to go
- var subs_ready = [];
- _.each(socket.meteor.subs, function (sub) {
- if (!sub.ready) {
- subs_ready.push(sub._id);
- sub.ready = true;
- }
+ if (self.sub_id)
+ self.pending_complete = true;
+};
+
+Meteor._LivedataServer.Subscription.prototype.flush = function () {
+ var self = this;
+ var msg;
+
+ for (var name in self.pending_updates)
+ for (var id in self.pending_updates[name])
+ self.socket.send(JSON.stringify(self.pending_updates[name][id]));
+
+ if (self.pending_complete)
+ self.socket.send(JSON.stringify({msg: 'data', subs: [self.sub_id]}));
+
+ self.pending_updates = {};
+ self.pending_complete = false;
+};
+
+Meteor._LivedataServer.Subscription.prototype._publishCursor = function (cursor, name) {
+ var self = this;
+ var collection = name || cursor.collection_name;
+
+ var observe_handle = cursor.observe({
+ added: function (obj) {
+ self.set(collection, obj._id, obj);
+ self.flush();
+ },
+ changed: function (obj, old_idx, old_obj) {
+ var set = {};
+ _.each(obj, function (v, k) {
+ if (!_.isEqual(v, old_obj[k]))
+ set[k] = v;
});
+ self.set(collection, obj._id, set);
+ var dead_keys = _.difference(_.keys(old_obj), _.keys(obj));
+ self.unset(collection, obj._id, dead_keys);
+ self.flush();
+ },
+ removed: function (id, old_idx, old_obj) {
+ self.unset(collection, id, _.keys(old_obj));
+ self.flush();
+ }
+ });
- if (subs_ready.length || socket.meteor.pending_method_ids.length) {
- var msg = {msg: 'data'};
- if (subs_ready.length)
- msg.subs = subs_ready;
- if (socket.meteor.pending_method_ids.length)
- msg.methods = socket.meteor.pending_method_ids;
- socket.send(JSON.stringify(msg));
- }
- socket.meteor.pending_method_ids = [];
+ // observe only returns after the initial added callbacks have
+ // run. mark subscription as completed.
+ self.complete();
+ self.flush();
- }).run();
+ // register stop callback (expects lambda w/ no args).
+ self.onStop(_.bind(observe_handle.stop, observe_handle));
+};
+
+_.extend(Meteor._LivedataServer.prototype, {
+ _startSubscription: function (socket, handler, sub_id, params) {
+ var self = this;
+
+ var sub = new Meteor._LivedataServer.Subscription(socket, sub_id);
+ if (sub_id)
+ socket.meteor.named_subs[sub_id] = sub;
+ else
+ socket.meteor.universal_subs.push(sub);
+
+ var res = handler(sub, params);
+
+ // automatically wire up handlers that return a Cursor.
+ // otherwise, the handler is completely responsible for delivering
+ // its own data messages and registering stop functions.
+ if (res instanceof _Mongo.Cursor) // XXX generalize
+ sub._publishCursor(res);
+ },
+
+ // tear down specified subscription
+ _stopSubscription: function (socket, sub_id) {
+ if (sub_id && socket.meteor.named_subs[sub_id]) {
+ socket.meteor.named_subs[sub_id].stop();
+ delete socket.meteor.named_subs[sub_id];
+ }
+ },
+
+ // tear down all subscriptions
+ _stopAllSubscriptions: function (socket) {
+ _.each(socket.meteor.named_subs, function (sub, id) {
+ sub.stop();
+ });
+ socket.meteor.named_subs = {};
+
+ _.each(socket.meteor.universal_subs, function (sub) {
+ sub.stop();
+ });
+ socket.meteor.universal_subs = [];
},
// XXX 'connect' message should have a protocol version
_livedata_connect: function (socket, msg) {
var self = this;
// Always start a new session. We don't support any reconnection.
socket.send(JSON.stringify({msg: 'connected', session: Meteor.uuid()}));
- // Run any universal publishes we may have.
- self._poll_subscriptions(socket);
+
+ // Spin up all the universal publishers.
+ Fiber(function () {
+ _.each(self.universal_publish_handlers, function (handler) {
+ self._startSubscription(socket, handler);
+ });
+ }).run();
+
+ // XXX what to do here on reconnect? oh, probably just fake a sub message.
},
_livedata_sub: function (socket, msg) {
@@ -328,25 +360,33 @@ _.extend(Meteor._LivedataServer.prototype, {
return;
}
- if (!self.publishes[msg.name]) {
+ if (!self.publish_handlers[msg.name]) {
socket.send(JSON.stringify({
msg: 'nosub', id: msg.id, error: {error: 404,
reason: "Subscription not found"}}));
return;
}
- socket.meteor.subs.push({_id: msg.id, name: msg.name,
- params: msg.params || {}});
- self._poll_subscriptions(socket);
+ Fiber(function () {
+ if (msg.id in socket.meteor.named_subs)
+ // XXX client screwed up
+ self._stopSubscription(socket, msg.id);
+
+ var handler = self.publish_handlers[msg.name];
+ self._startSubscription(socket, handler, msg.id, msg.params);
+ }).run();
},
+ // XXX Fiber() doesn't interlock. if a client subs then unsubs, the
+ // subscription should end up as off.
_livedata_unsub: function (socket, msg) {
var self = this;
+
+ Fiber(function () {
+ self._stopSubscription(socket, msg.id);
+ }).run();
+
socket.send(JSON.stringify({msg: 'nosub', id: msg.id}));
- socket.meteor.subs = _.filter(socket.meteor.subs, function (x) {
- return x._id !== msg.id;
- });
- self._poll_subscriptions(socket);
},
_livedata_method: function (socket, msg) {
@@ -410,15 +450,11 @@ _.extend(Meteor._LivedataServer.prototype, {
socket.send(JSON.stringify({
msg: 'result', id: msg.id, result: result}));
- // after the method, rerun all the subscriptions as stuff may
- // have changed.
- // XXX going away in merge very soon
- socket.meteor.pending_method_ids.push(msg.id);
- _.each(self.stream_server.all_sockets(), function(x) {
- if (x && x.meteor)
- x.meteor.throttled_poll();
- });
-
+ // the method is satisfied once callback is called, because
+ // any DB observe callbacks run to completion in the same
+ // tick.
+ socket.send(JSON.stringify({
+ msg: 'data', methods: [msg.id]}));
};
var invocation = new Meteor._ServerMethodInvocation(msg.method, handler);
@@ -427,43 +463,44 @@ _.extend(Meteor._LivedataServer.prototype, {
} catch (e) {
// _run will have already logged the exception (and told the
// client, if appropriate)
+ socket.send(JSON.stringify({
+ msg: 'data', methods: [msg.id]}));
}
}).run();
},
/**
- * Defines a live dataset that clients can subscribe to.
+ * Register a publish handler function.
*
* @param name {String} identifier for query
+ * @param handler {Function} publish handler
* @param options {Object}
*
+ * Server will call handler function on each new subscription,
+ * either when receiving DDP sub message for a named subscription, or on
+ * DDP connect for a universal subscription.
+ *
* If name is null, this will be a subscription that is
* automatically established and permanently on for all connected
* client, instead of a subscription that can be turned on and off
* with subscribe().
*
* options to contain:
- * - collection {Collection} collection; defaults to the collection
- * named 'name' on disk in mongodb
- * - selector {Function<args> OR Object} either a mongodb selector,
- * or a function that takes the argument object passed to
- * Meteor.subscribe and returns a mongodb selector. default {}
* - (mostly internal) is_auto: true if generated automatically
* from an autopublish hook. this is for cosmetic purposes only
* (it lets us determine whether to print a warning suggesting
* that you turn off autopublish.)
*/
- publish: function (name, options) {
+ publish: function (name, handler, options) {
var self = this;
- if (name && name in self.publishes) {
- // XXX error duplicate publish
- console.log("ERROR DUPLICATE PUBLISH " + name);
+ options = options || {};
+
+ if (name && name in self.publish_handlers) {
+ Meteor._debug("Ignoring duplicate publish named '" + name + "'");
return;
}
- options = options || {};
-
if (!self.on_autopublish && !options.is_auto) {
// They have autopublish on, yet they're trying to manually
// picking stuff to publish. They probably should turn off
@@ -489,36 +526,10 @@ _.extend(Meteor._LivedataServer.prototype, {
}
}
- var collection = options.collection ||
- (name && self._hack_collections[name]);
- if (!collection) {
- if (name)
- throw new Error("No collection '" + name + "' found to publish. " +
- "You can specify the collection explicitly with the " +
- "'collection' option.");
- else
- throw new Error("When creating universal publishes, you must specify " +
- "the collection explicitly with the 'collection' " +
- "option.");
- }
- var selector = options.selector || {};
- var func = function (channel, params) {
- var opt = function (key, or) {
- var x = options[key] || or;
- return (x instanceof Function) ? x(params) : x;
- };
-
- channel.send(collection._name, collection.find(opt("selector", {}), {
- sort: opt("sort"),
- skip: opt("skip"),
- limit: opt("limit")
- }).fetch());
- };
-
if (name)
- self.publishes[name] = func;
+ self.publish_handlers[name] = handler;
else
- self.universal_publishes.push(func);
+ self.universal_publish_handlers.push(handler);
},
methods: function (methods) {
View
7 packages/livedata/livedata_test_service.js
@@ -26,11 +26,8 @@ Meteor.startup(function () {
});
if (Meteor.is_server)
- Meteor.publish('ledger', {
- collection: Ledger,
- selector: function (params) {
- return {world: params.world};
- }
+ Meteor.publish('ledger', function (sub, params) {
+ return Ledger.find({world: params.world});
});
App.methods({
View
15 packages/livedata/livedata_tests.js
@@ -1,16 +1,3 @@
-test("livedata - basics", function () {
- // Very basic test. Just see that it runs.
-
- var coll = new Meteor.Collection("testing" + LocalCollection.uuid());
-
- coll.remove({foo: 'bar'});
- assert.length(coll.find({foo: 'bar'}).fetch(), 0);
- coll.insert({foo: 'bar'});
- assert.length(coll.find({foo: 'bar'}).fetch(), 1);
-});
-
-/******************************************************************************/
-
// XXX should probably move this into a testing helpers package so it
// can be used by other tests
@@ -257,4 +244,4 @@ testAsyncMulti("livedata - compound methods", [
// method completion/satisfaction
// subscriptions (multiple APIs, including autosubscribe?)
// subscription completion
-// [probably lots more]
+// [probably lots more]
View
17 packages/minimongo/minimongo.js
@@ -144,14 +144,12 @@ LocalCollection.LiveResultsSet = function () {};
// options to contain:
// * callbacks:
// - added (object, before_index)
-// - changed (new_object, at_index)
+// - changed (new_object, at_index, old_object)
// - moved (object, old_index, new_index) - can only fire with changed()
-// - removed (id, at_index)
-// * sort: sort descriptor
+// - removed (id, at_index, object)
//
// attributes available on returned query handle:
// * stop(): end updates
-// * indexOf(id): return current index of object in result set, or -1
// * collection: the collection this query is querying
//
// iff x is a returned query handle, (x instanceof
@@ -163,7 +161,6 @@ LocalCollection.LiveResultsSet = function () {};
// XXX maybe support limit/skip
// XXX it'd be helpful if removed got the object that just left the
// query, not just its id
-// XXX document that initial results will definitely be delivered before we return [do, add to asana]
LocalCollection.Cursor.prototype.observe = function (options) {
var self = this;
@@ -337,6 +334,8 @@ LocalCollection.prototype._modifyAndNotify = function (doc, mod) {
for (var qid in self.queries)
matched_before[qid] = self.queries[qid].selector_f(doc);
+ var old_doc = LocalCollection._deepcopy(doc);
+
LocalCollection._modify(doc, mod);
for (var qid in self.queries) {
@@ -348,7 +347,7 @@ LocalCollection.prototype._modifyAndNotify = function (doc, mod) {
else if (!before && after)
LocalCollection._insertInResults(query, doc);
else if (before && after)
- LocalCollection._updateInResults(query, doc);
+ LocalCollection._updateInResults(query, doc, old_doc);
}
};
@@ -386,13 +385,13 @@ LocalCollection._insertInResults = function (query, doc) {
LocalCollection._removeFromResults = function (query, doc) {
var i = LocalCollection._findInResults(query, doc);
- query.removed(doc._id, i);
+ query.removed(doc._id, i, doc);
query.results.splice(i, 1);
};
-LocalCollection._updateInResults = function (query, doc) {
+LocalCollection._updateInResults = function (query, doc, old_doc) {
var orig_idx = LocalCollection._findInResults(query, doc);
- query.changed(LocalCollection._deepcopy(doc), orig_idx);
+ query.changed(LocalCollection._deepcopy(doc), orig_idx, old_doc);
if (!query.sort_f)
return;
View
63 packages/minimongo/minimongo_tests.js
@@ -825,6 +825,65 @@ test("minimongo - modify", function () {
// XXX test update() (selecting docs, multi, upsert..)
test("minimongo - observe", function () {
- // XXX needs tests!
- // don't forget tests for stop
+ var operations = [];
+ var cbs = {
+ added: function (obj, idx) {
+ delete obj._id;
+ operations.push(LocalCollection._deepcopy(['added', obj, idx]));
+ },
+ changed: function (obj, at, old_obj) {
+ delete obj._id;
+ delete old_obj._id;
+ operations.push(LocalCollection._deepcopy(['changed', obj, at, old_obj]));
+ },
+ moved: function (obj, old_at, new_at) {
+ delete obj._id;
+ operations.push(LocalCollection._deepcopy(['moved', obj, old_at, new_at]));
+ },
+ removed: function (id, at, old_obj) {
+ delete old_obj._id;
+ operations.push(LocalCollection._deepcopy(['removed', id, at, old_obj]));
+ }
+ };
+ var handle;
+
+ var c = new LocalCollection();
+ handle = c.find({}, {sort: {a: 1}}).observe(cbs);
+ assert.isTrue(handle.collection === c);
+
+ c.insert({a:1});
+ assert.equal(operations.shift(), ['added', {a:1}, 0]);
+ c.update({a:1}, {$set: {a: 2}});
+ assert.equal(operations.shift(), ['changed', {a:2}, 0, {a:1}]);
+ c.insert({a:10});
+ assert.equal(operations.shift(), ['added', {a:10}, 1]);
+ c.update({}, {$inc: {a: 1}}, {multi: true});
+ assert.equal(operations.shift(), ['changed', {a:3}, 0, {a:2}]);
+ assert.equal(operations.shift(), ['changed', {a:11}, 1, {a:10}]);
+ c.update({a:11}, {a:1});
+ assert.equal(operations.shift(), ['changed', {a:1}, 1, {a:11}]);
+ assert.equal(operations.shift(), ['moved', {a:1}, 1, 0]);
+ c.remove({a:2});
+ assert.equal(operations.shift(), undefined);
+ var id = c.findOne({a:3})._id;
+ c.remove({a:3});
+ assert.equal(operations.shift(), ['removed', id, 1, {a:3}]);
+
+ // test stop
+ handle.stop();
+ c.insert({a:2});
+ assert.equal(operations.shift(), undefined);
+
+ // test initial inserts (and backwards sort)
+ handle = c.find({}, {sort: {a: -1}}).observe(cbs);
+ assert.equal(operations.shift(), ['added', {a:2}, 0]);
+ assert.equal(operations.shift(), ['added', {a:1}, 1]);
+ handle.stop();
+
+ // test _suppress_initial
+ handle = c.find({}, {sort: {a: -1}}).observe(_.extend(cbs, {_suppress_initial: true}));
+ assert.equal(operations.shift(), undefined);
+ c.insert({a:100});
+ assert.equal(operations.shift(), ['added', {a:100}, 0]);
+ handle.stop();
});
View
10 packages/mongo-livedata/collection.js
@@ -104,17 +104,11 @@ Meteor.Collection = function (name, manager, driver) {
manager.methods(m);
}
- // XXX temporary hack to provide sugar in LivedataServer.publish()
- if (name && manager && manager._hack_collections) {
- if (name in manager._hack_collections)
- throw new Error("There is already a collection named '" + name + "'");
- manager._hack_collections[name] = self;
- }
-
// autopublish
if (manager && manager.onAutopublish)
manager.onAutopublish(function () {
- manager.publish(null, {collection: self, is_auto: true});
+ var handler = function (sub, params) { return self.find(); };
+ manager.publish(null, handler, {is_auto: true});
});
};
View
143 packages/mongo-livedata/mongo_driver.js
@@ -16,6 +16,10 @@ Future.prototype.ret = Future.prototype.return;
_Mongo = function (url) {
var self = this;
+ // holds active observes
+ self.observers = {};
+ self.next_observer_id = 1;
+
self.collection_queue = [];
MongoDB.connect(url, function(err, db) {
@@ -27,6 +31,14 @@ _Mongo = function (url) {
db.collection(c.name, c.callback);
}
});
+
+ // refresh all outstanding observers every 10 seconds. they are
+ // also triggered on DB updates.
+ setInterval(function () {
+ Fiber(function () {
+ self._pollObservers.call(self);
+ }).run();
+ }, 10000);
};
// protect against dangerous selectors. falsey and {_id: falsey}
@@ -56,6 +68,19 @@ _Mongo.prototype._withCollection = function(collection_name, callback) {
}
};
+// poke observers watching the given collection name, or all observers
+// if no collection name provided.
+_Mongo.prototype._pollObservers = function (collection_name) {
+ var self = this;
+
+ for (var id in self.observers) {
+ var o = self.observers[id];
+ if (!collection_name || o.collection_name === collection_name) {
+ o._poll();
+ }
+ }
+};
+
//////////// Public API //////////
_Mongo.prototype.insert = function (collection_name, document) {
@@ -70,7 +95,10 @@ _Mongo.prototype.insert = function (collection_name, document) {
// XXX err handling
collection.insert(document, {safe: true}, function(err) {
// XXX err handling
- future.ret();
+ Fiber(function () {
+ self._pollObservers(collection_name);
+ future.ret();
+ }).run();
});
});
@@ -93,7 +121,10 @@ _Mongo.prototype.remove = function (collection_name, selector) {
// XXX err handling
collection.remove(selector, {safe:true}, function(err) {
// XXX err handling
- future.ret();
+ Fiber(function () {
+ self._pollObservers(collection_name);
+ future.ret();
+ }).run();
});
});
@@ -121,7 +152,10 @@ _Mongo.prototype.update = function (collection_name, selector, mod, options) {
collection.update(selector, mod, opts, function(err) {
// XXX err handling
- future.ret();
+ Fiber(function () {
+ self._pollObservers(collection_name);
+ future.ret();
+ }).run();
});
});
@@ -160,16 +194,7 @@ _Mongo.Cursor = function (mongo, collection_name, selector, options) {
self.mongo._withCollection(collection_name, function(err, collection) {
// XXX err handling
-
- var cursor = collection.find(self.selector);
- // XXX is there a way to do this as for x in ['sort', 'limit', 'skip']?
- if (self.options.sort)
- cursor = cursor.sort(self.options.sort);
- if (self.options.limit)
- cursor = cursor.limit(self.options.limit);
- if (self.options.skip)
- cursor = cursor.skip(self.options.skip);
-
+ var cursor = collection.find(self.selector, self.options.fields, self.options.skip, self.options.limit, self.options.sort);
future.ret(cursor);
});
@@ -224,9 +249,99 @@ _Mongo.Cursor.prototype.count = function () {
future.ret(err || res);
});
+ return future.wait();
+};
+// options to contain:
+// * callbacks:
+// - added (object, before_index)
+// - changed (new_object, at_index)
+// - moved (object, old_index, new_index) - can only fire with changed()
+// - removed (id, at_index)
+//
+// attributes available on returned LiveResultsSet
+// * stop(): end updates
+
+_Mongo.Cursor.prototype.observe = function (options) {
+ return new _Mongo.LiveResultsSet(this, options);
+};
- return future.wait();
+_Mongo.LiveResultsSet = function (cursor, options) {
+ // copy my cursor, so that the observe can run independently from
+ // some other use of the cursor.
+ this.cursor = new _Mongo.Cursor(cursor.mongo,
+ cursor.collection_name,
+ cursor.selector,
+ cursor.options);
+
+ // expose collection name
+ this.collection_name = cursor.collection_name;
+
+ // unique handle for this live query
+ this.qid = this.cursor.mongo.next_observer_id++;
+
+ // previous results snapshot. on each poll cycle, diffs against
+ // results drives the callbacks.
+ this.results = {};
+ this.indexes = {};
+
+ this.added = options.added;
+ this.changed = options.changed;
+ this.moved = options.moved;
+ this.removed = options.removed;
+
+ // trigger the first _poll() cycle immediately.
+ this._poll();
+
+ // register myself with the mongo driver
+ this.cursor.mongo.observers[this.qid] = this;
+};
+
+_Mongo.LiveResultsSet.prototype._fetchResults = function (results, indexes) {
+ var self = this;
+ var index = 0;
+
+ self.cursor.rewind();
+ self.cursor.forEach(function (obj) {
+ results[obj._id] = obj;
+ indexes[obj._id] = index++;
+ });
+};
+
+_Mongo.LiveResultsSet.prototype._poll = function () {
+ var self = this;
+
+ var old_results = self.results;
+ var old_indexes = self.indexes;
+ var new_results = {};
+ var new_indexes = {};
+
+ var callbacks = [];
+
+ self._fetchResults(new_results, new_indexes);
+
+ _.each(new_results, function (obj) {
+ if (self.added && !old_results[obj._id])
+ self.added(obj, new_indexes[obj._id]);
+
+ else if (self.changed && !_.isEqual(new_results[obj._id], old_results[obj._id]))
+ self.changed(obj, old_indexes[obj._id], old_results[obj._id]);
+
+ if (self.moved && new_indexes[obj._id] !== old_indexes[obj._id])
+ self.moved(obj, old_indexes[obj._id], new_indexes[obj._id]);
+ });
+
+ for (var id in old_results)
+ if (self.removed && !(id in new_results))
+ self.removed(id, old_indexes[id], old_results[id]);
+
+ self.results = new_results;
+ self.indexes = new_indexes;
+};
+
+_Mongo.LiveResultsSet.prototype.stop = function () {
+ var self = this;
+ delete self.cursor.mongo.observers[self.qid];
};
_.extend(Meteor, {
View
5 packages/tinytest/tinytest_server.js
@@ -2,9 +2,8 @@ Meteor.startup(function () {
Meteor._ServerTestResults.remove();
});
-App.publish('tinytest/results', {
- collection: Meteor._ServerTestResults,
- selector: function (params) { return {run_id: params.run_id} }
+App.publish('tinytest/results', function (sub, params) {
+ return Meteor._ServerTestResults.find({run_id: params.run_id});
});
App.methods({