Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

New subs #2

Closed
wants to merge 8 commits into from

1 participant

@debergalis
Owner

testing something

@gschmidt @n1mmy

debergalis added some commits
@debergalis debergalis Rework livedata (DDP) subscriptions.
Publish now comes in three flavors:

* Bare metal API: Meteor.publish(name, func).

  Server will call func(sub, params) each time a client subscribes to
  "name" (with supplied "params"), or if "name" is null, each time a new
  client connects (autopublish).  "sub" is a Subscription object, which
  supplies sub.set(), sub.unset(), sub.satisfies(), and sub.flush()
  methods to emit DDP data messages.  func() should register a cleanup
  function with sub.onStop(), which will be called when client unsubs.

  To react to database changes, use Collection.observe().  Publish
  functions that are not database backed may use some other mechanism
  (setInterval?) to schedule calls to sub.set().

* publishCursor API:

  If func() *returns* a Cursor, server will automatically publish all
  results from that cursor as the collection with the same name as the
  cursor's underlying Mongo collection.  For example:

  Meteor.publish('top10', function (sub, params) {
    return Players.find({}, {sort: {score: -1}, limit: 10});
  });

  will define a 'top10' publish that is always the top 10 scoring
  players, published to the 'players' collection on each subscribing
  client.

* autopublish

  When the autopublish package is loaded, all Collections defined on the
  server will automatically be published, in their entirety, to each
  connected client.  Clients need not call Meteor.subscribe().  Calls to
  publish() will emit a warning message, since they are superflous.  To
  disable autopublish, run "meteor remove autopublish".
fa53aa6
@debergalis debergalis Sync Minimongo observe API to server.
Return old document in callbacks.  Add LiveResultsSet.collection to
match server.

Add observe() unit test suite.
bc05e41
@debergalis debergalis Support field selection in server-side mongo.
Can now publish subsets of documents with something like this:

Meteor.publish('items', function (sub, params) {
  return Items.find({user_id: params.user_id},
                    {fields: {some_secret_field: 0}});
});

where items.some_secret_fields won't be included in the publish output.

(fields option is ignored by minimongo.)
6f676a9
@debergalis debergalis autopublish in leaderboard cb14d11
@debergalis debergalis Convert to new publish API. f6fffe0
@debergalis debergalis Update documentation. fd1fa85
@debergalis debergalis clarify comment f111902
@debergalis debergalis signal method satisfaction on exception f685e39
@debergalis debergalis closed this
@glasser glasser referenced this pull request from a commit
Commit has since been removed from the repository and is no longer available.
@glasser glasser referenced this pull request from a commit
Commit has since been removed from the repository and is no longer available.
@glasser glasser referenced this pull request from a commit
Commit has since been removed from the repository and is no longer available.
@glasser glasser referenced this pull request from a commit
@glasser glasser Change interface for determining if the user doc is loading to a new …
…reactive

function Meteor.userLoading(). The value of Meteor.user() is not defined if
userLoading returns true.

The Handlebars helper currentUser is "true" if the user is loading, and a new
helper currentUserLoading is equivalent to Meteor.userLoading.

The current user subscription is now named meteor.currentUser rather than being
an unnamed sub. (loginServiceConfiguration is renamed
meteor.loginServiceConfiguration to match.) This subscription is sub'd from when
you log in and unsub'd from when you log out (or if you log in with different
credentials).

I was very careful to make sure that in the case of "sub #1, unsub #1, sub #2,
sub #1 is ready" we do not declare the user to be ready. I could have instead
modified livedata_connection to not call ready callbacks for unsub'd
subscriptions (add a "delete self.sub_ready_callbacks[obj._id]" to the self.subs
removed function) but this seemed less invasive.

The password and email tests use this to take a more rigorous approach to
waiting for the data to load, and they change the localStorage keys so that
multiple tabs running tests don't interact via localStorage.
03c91d4
@glasser glasser referenced this pull request from a commit
Commit has since been removed from the repository and is no longer available.
@glasser glasser referenced this pull request from a commit
Commit has since been removed from the repository and is no longer available.
@glasser glasser referenced this pull request from a commit
Commit has since been removed from the repository and is no longer available.
@glasser glasser referenced this pull request from a commit
Commit has since been removed from the repository and is no longer available.
@glasser glasser referenced this pull request from a commit
Commit has since been removed from the repository and is no longer available.
@glasser glasser referenced this pull request from a commit
Commit has since been removed from the repository and is no longer available.
@glasser glasser referenced this pull request from a commit
Commit has since been removed from the repository and is no longer available.
@glasser glasser referenced this pull request from a commit
@glasser glasser Change interface for determining if the user doc is loaded to a new r…
…eactive

function Meteor.userLoaded(), which is true if you are logged in and the user
doc is loaded, and a currentUserLoaded Handlebars helper to match.

If logged in and the user doc is not yet loaded, Meteor.user() now returns an
object which only contains _id.

The current user subscription is now named meteor.currentUser rather than being
an unnamed sub. (loginServiceConfiguration is renamed
meteor.loginServiceConfiguration to match.) This subscription is sub'd from when
you log in and unsub'd from when you log out (or if you log in with different
credentials).

I was very careful to make sure that in the case of "sub #1, unsub #1, sub #2,
sub #1 is ready" we do not declare the user to be ready. I could have instead
modified livedata_connection to not call ready callbacks for unsub'd
subscriptions (add a "delete self.sub_ready_callbacks[obj._id]" to the self.subs
removed function) but this seemed less invasive.

The password and email tests use this to take a more rigorous approach to
waiting for the data to load, and they change the localStorage keys so that
multiple tabs running tests don't interact via localStorage.
0f3c44b
@glasser glasser referenced this pull request from a commit
@glasser glasser Change interface for determining if the user doc is loaded to a new r…
…eactive

function Meteor.userLoaded(), which is true if you are logged in and the user
doc is loaded, and a currentUserLoaded Handlebars helper to match.

If logged in and the user doc is not yet loaded, Meteor.user() now returns an
object which only contains _id.

The current user subscription is now named meteor.currentUser rather than being
an unnamed sub. (loginServiceConfiguration is renamed
meteor.loginServiceConfiguration to match.) This subscription is sub'd from when
you log in and unsub'd from when you log out (or if you log in with different
credentials).

I was very careful to make sure that in the case of "sub #1, unsub #1, sub #2,
sub #1 is ready" we do not declare the user to be ready. I could have instead
modified livedata_connection to not call ready callbacks for unsub'd
subscriptions (add a "delete self.sub_ready_callbacks[obj._id]" to the self.subs
removed function) but this seemed less invasive.

The password and email tests use this to take a more rigorous approach to
waiting for the data to load, and they change the localStorage keys so that
multiple tabs running tests don't interact via localStorage.
d4e4a63
@haircuttedfreak haircuttedfreak referenced this pull request from a commit
@glasser glasser Change interface for determining if the user doc is loaded to a new r…
…eactive

function Meteor.userLoaded(), which is true if you are logged in and the user
doc is loaded, and a currentUserLoaded Handlebars helper to match.

If logged in and the user doc is not yet loaded, Meteor.user() now returns an
object which only contains _id.

The current user subscription is now named meteor.currentUser rather than being
an unnamed sub. (loginServiceConfiguration is renamed
meteor.loginServiceConfiguration to match.) This subscription is sub'd from when
you log in and unsub'd from when you log out (or if you log in with different
credentials).

I was very careful to make sure that in the case of "sub #1, unsub #1, sub #2,
sub #1 is ready" we do not declare the user to be ready. I could have instead
modified livedata_connection to not call ready callbacks for unsub'd
subscriptions (add a "delete self.sub_ready_callbacks[obj._id]" to the self.subs
removed function) but this seemed less invasive.

The password and email tests use this to take a more rigorous approach to
waiting for the data to load, and they change the localStorage keys so that
multiple tabs running tests don't interact via localStorage.
efe4a70
@justinsb justinsb referenced this pull request from a commit
@justinsb justinsb Allow passing of arguments to configure-android
Args are passed through to the android tool

Useful commands:

configure-android avd
configure-android list avd
configure-android -- delete avd --name meteor

(Note the extra -- to delineate the args)

(Note #2: the delete avd command doesn't seem to actually work,
though this appears to be an Android bug - it says that is has
deleted it, but it is still there!)
452da56
@DenisGorbachev DenisGorbachev referenced this pull request from a commit in DenisGorbachev/meteor
@DenisGorbachev DenisGorbachev Default to error if it's actually a string #2 a2cf341
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Feb 29, 2012
  1. @debergalis

    Rework livedata (DDP) subscriptions.

    debergalis authored
    Publish now comes in three flavors:
    
    * Bare metal API: Meteor.publish(name, func).
    
      Server will call func(sub, params) each time a client subscribes to
      "name" (with supplied "params"), or if "name" is null, each time a new
      client connects (autopublish).  "sub" is a Subscription object, which
      supplies sub.set(), sub.unset(), sub.satisfies(), and sub.flush()
      methods to emit DDP data messages.  func() should register a cleanup
      function with sub.onStop(), which will be called when client unsubs.
    
      To react to database changes, use Collection.observe().  Publish
      functions that are not database backed may use some other mechanism
      (setInterval?) to schedule calls to sub.set().
    
    * publishCursor API:
    
      If func() *returns* a Cursor, server will automatically publish all
      results from that cursor as the collection with the same name as the
      cursor's underlying Mongo collection.  For example:
    
      Meteor.publish('top10', function (sub, params) {
        return Players.find({}, {sort: {score: -1}, limit: 10});
      });
    
      will define a 'top10' publish that is always the top 10 scoring
      players, published to the 'players' collection on each subscribing
      client.
    
    * autopublish
    
      When the autopublish package is loaded, all Collections defined on the
      server will automatically be published, in their entirety, to each
      connected client.  Clients need not call Meteor.subscribe().  Calls to
      publish() will emit a warning message, since they are superflous.  To
      disable autopublish, run "meteor remove autopublish".
  2. @debergalis

    Sync Minimongo observe API to server.

    debergalis authored
    Return old document in callbacks.  Add LiveResultsSet.collection to
    match server.
    
    Add observe() unit test suite.
  3. @debergalis

    Support field selection in server-side mongo.

    debergalis authored
    Can now publish subsets of documents with something like this:
    
    Meteor.publish('items', function (sub, params) {
      return Items.find({user_id: params.user_id},
                        {fields: {some_secret_field: 0}});
    });
    
    where items.some_secret_fields won't be included in the publish output.
    
    (fields option is ignored by minimongo.)
  4. @debergalis

    autopublish in leaderboard

    debergalis authored
  5. @debergalis

    Convert to new publish API.

    debergalis authored
  6. @debergalis

    Update documentation.

    debergalis authored
  7. @debergalis

    clarify comment

    debergalis authored
  8. @debergalis
This page is out of date. Refresh to see the latest.
View
32 docs/client/api.js
@@ -30,40 +30,20 @@ Template.api_box_args.pretty = function (fn) {
// Meteor boxes
-// XXX now supports sort, skip, limit. same deal as collection (can be
-// a function.) document this.
Template.api.publish = {
id: "publish",
- name: "Meteor.publish(name, [options])",
+ name: "Meteor.publish(name, handler)",
locus: "Server",
descr: [
- "Define a live dataset to which clients may subscribe. Every dataset has a name and can optionally take arguments.",
- "A client that connects to the dataset with `Meteor.subscribe|subscribe` will receive the published data, and will receive updates in realtime.",
+ "Define a live dataset to which clients may subscribe. If `name` is a String, a client can subscribe to the dataset with `Meteor.subscribe|subscribe`. If `name` is falsey, every client is automatically subscribed at connection time. The `handler` argument is a publish function, called at subscription or connection time, that is expected to send data events to the client.",
"Calls to this function are ignored on the client."],
args: [
{name: "name",
type: "String",
- descr: "The name that clients will use to subscribe to this query."}],
- options: [
- {name: "collection",
- type: "Collection",
- type_link: 'collection',
- descr: "The collection to publish. If not given, the default is the collection named `name` on disk in MongoDB."},
- {name: "selector",
- type: "Object — Mongo selector; or Function",
- type_link: "selectors",
- descr: "Filter for the records to publish. The default is `{}` (everything.). If a function, it receives the parameters passed by the client when making the subscription, and should return the selector to use."},
- {name: "sort",
- type: "Object — sort specifier; or Function",
- type_link: "sortspecifiers",
- descr: "Sort order. Like `selector`, can take a function, to be sensitive to the subscription parameters."},
- {name: "skip",
- type: "Number; or Function",
- descr: "Number of results to skip at the beginning. Like `selector`, can take a function, to be sensitive to the subscription parameters."},
- {name: "limit",
- type: "Number; or Function",
- descr: "Maximum number of results to return. Like `selector`, can take a function, to be sensitive to the subscription parameters."}
- ]
+ descr: "The name that clients will use to subscribe to this query."},
+ {name: "handler",
+ type: "Function",
+ descr: "The publish function that emits data messages."}]
};
Template.api.subscribe = {
View
2  examples/leaderboard-remote/client/leaderboard-remote.js
@@ -3,8 +3,6 @@ Leaderboard = Meteor.connect("http://leader2.meteor.com/sockjs");
// XXX I'd rather this be Leaderboard.Players.. can this API be easier?
Players = new Meteor.Collection("players", Leaderboard);
-Leaderboard.subscribe("top10");
-
Template.main.events = {
'keydown': function () {
Session.set("error", null);
View
1  examples/leaderboard/.meteor/packages
@@ -4,3 +4,4 @@
# but you can also edit it by hand.
jquery
+autopublish
View
8 examples/leaderboard/leaderboard.js
@@ -5,9 +5,6 @@ Players = new Meteor.Collection("players");
/*** Client ***/
if (Meteor.is_client) {
- // Get the top 10 players from the server, updated continuously.
- Meteor.subscribe("top10");
-
$(document).ready(function () {
// List the players by score. You can click to select a player.
var scores = Meteor.ui.renderList(Players.find({}, {sort: {score: -1}}), {
@@ -54,11 +51,6 @@ if (Meteor.is_client) {
// subdirectory named 'server'.
if (Meteor.is_server) {
- // Publish the top 10 players, live, to any client that wants them.
- Meteor.publish("top10", {collection: Players,
- sort: {score: -1},
- limit: 10});
-
// On server startup, create some players if the database is empty.
Meteor.startup(function () {
if (Players.find().count() === 0) {
View
20 examples/todos/model.js
@@ -2,20 +2,12 @@ Lists = new Meteor.Collection("lists");
Todos = new Meteor.Collection("todos");
-/* Schema support coming soon!
-
-Lists.schema({text: String});
-
-Todos.schema({text: String,
- done: Boolean,
- tags: [String]});
-*/
-
if (Meteor.is_server) {
- Meteor.publish('lists');
- Meteor.publish('todos', {
- selector: function (params) {
- return {list_id: params.list};
- }
+ Meteor.publish('lists', function (sub, params) {
+ return Lists.find();
+ });
+
+ Meteor.publish('todos', function (sub, params) {
+ return Todos.find({list_id: params.list});
});
}
View
403 packages/livedata/livedata_server.js
@@ -47,7 +47,7 @@ _.extend(Meteor._ServerMethodInvocation.prototype, {
_sendResponse: function (error, ret, from) {
var self = this;
- if (self._threw && from !== "throw")
+ if (self._threw && from !== "throw")
return;
if (self._responded) {
if (from === "throw")
@@ -113,22 +113,25 @@ _.extend(Meteor._ServerMethodInvocation.prototype, {
Meteor._LivedataServer = function () {
var self = this;
- self.publishes = {};
- self.universal_publishes = []; // publishes with no name
- self._hack_collections = {}; // XXX hack. name => Collection
+ self.publish_handlers = {};
+ self.universal_publish_handlers = [];
+
self.method_handlers = {};
- self.stream_server = new Meteor._StreamServer;
+
self.on_autopublish = []; // array of func if AP disabled, null if enabled
self.warned_about_autopublish = false;
+ self.stream_server = new Meteor._StreamServer;
+
self.stream_server.register(function (socket) {
socket.meteor = {};
- socket.meteor.subs = [];
- socket.meteor.cache = {};
- socket.meteor.pending_method_ids = [];
socket.meteor.methods_blocked = false;
socket.meteor.method_queue = [];
+ // Sub objects for active subscriptions
+ socket.meteor.named_subs = {};
+ socket.meteor.universal_subs = [];
+
socket.on('data', function (raw_msg) {
try {
try {
@@ -159,151 +162,173 @@ Meteor._LivedataServer = function () {
}
});
-
- // 5/sec updates tops, once every 10sec min.
- socket.meteor.throttled_poll = _.throttle(function () {
- self._poll_subscriptions(socket);
- }, 50); // XXX only 50ms! for great speed. might want higher in prod.
- socket.meteor.timer = setInterval(socket.meteor.throttled_poll, 10000);
+ socket.on('close', function () {
+ self._stopAllSubscriptions(socket);
+ });
});
};
-_.extend(Meteor._LivedataServer.prototype, {
- _poll_subscriptions: function (socket) {
- var self = this;
+// ctor for a sub handle: the input to each publish function
+Meteor._LivedataServer.Subscription = function (socket, sub_id) {
+ // transport. provides send(obj).
+ this.socket = socket;
- Fiber(function () {
- // holds a clean copy of client's data. channel.send will
- // populate new_cache, then we compute the difference with the old
- // cache, send the delta.
- var new_cache = {};
-
- // setup a channel object
- var channel = {
- // this gets called by publish lambda with each object. send
- // populates the server's copy of what the client has.
- send: function(collection_name, obj) {
- if (!(obj instanceof Array))
- obj = [obj];
-
- _.each(obj, function (o) {
- if (!o._id) {
- console.log("WARNING trying to send object without _id"); // XXX
- return;
- }
-
- // XXX -- '|' not allowed in collection name?
- var key = collection_name + "|" + o._id;
-
- // insert or extend new_cache with 'o' object
- new_cache[key] = _.extend(new_cache[key] || {}, o);
- });
- }
- };
+ // my subscription ID (generated by client, null for universal subs).
+ this.sub_id = sub_id;
- // actually run the subscriptions.
+ // unsent DDP messages.
+ this.pending_updates = {};
+ this.pending_complete = false;
- _.each(self.universal_publishes, function (pub) {
- pub(channel, {});
- });
+ // stop callbacks to g/c this sub. called w/ zero arguments.
+ this.stop_callbacks = [];
+};
- _.each(socket.meteor.subs, function (sub) {
- var pub = self.publishes[sub.name];
- if (!pub) {
- // XXX error unknown publish
- console.log("ERROR UNKNOWN PUBLISH " + sub.name);
- return;
- }
+Meteor._LivedataServer.Subscription.prototype.stop = function () {
+ for (var i = 0; i < this.stop_callbacks.length; i++)
+ (this.stop_callbacks[i])();
+};
- pub(channel, sub.params);
- });
+Meteor._LivedataServer.Subscription.prototype.onStop = function (callback) {
+ this.stop_callbacks.push(callback);
+};
+Meteor._LivedataServer.Subscription.prototype._ensureMsg = function (collection_name, id) {
+ var self = this;
+ if (!self.pending_updates[collection_name])
+ self.pending_updates[collection_name] = {};
+ if (!self.pending_updates[collection_name][id])
+ self.pending_updates[collection_name][id] = {msg: 'data', collection: collection_name, id: id};
+ return self.pending_updates[collection_name][id];
+};
- // emit deltas for each item in the new cache (any object
- // created in this poll cycle).
- _.each(new_cache, function (new_obj, key) {
- var old_obj = socket.meteor.cache[key];
-
- // XXX parsing from the string is so ugly.
- var parts = key.split("|");
- if (!parts || parts.length !== 2) return;
- var collection_name = parts[0];
- var id = parts[1];
-
- var msg = {msg: 'data', collection: collection_name, id: id};
-
- if (!old_obj) {
- // New object. Send an insert down to the client.
- var obj_to_send = _.extend({}, new_obj);
- delete obj_to_send._id;
- if (_.keys(obj_to_send).length) {
- msg.set = obj_to_send;
- socket.send(JSON.stringify(msg));
- }
-
- } else {
- // Old object. Check for updates and send changes attributes
- // to the client.
- var set = {};
- var unset = [];
-
- _.each(new_obj, function (v, k) {
- // Not canonical order comparison or anything, but close
- // enough I hope. We may send some spurious updates?
- if (JSON.stringify(v) !== JSON.stringify(old_obj[k]))
- set[k] = v;
- });
-
- unset = _.difference(_.keys(old_obj), _.keys(new_obj));
-
- if (_.keys(set).length > 0)
- msg.set = set;
- if (unset.length > 0)
- msg.unset = unset;
-
- if (msg.set || msg.unset)
- socket.send(JSON.stringify(msg));
- }
- });
+Meteor._LivedataServer.Subscription.prototype.set = function (collection_name, id, dictionary) {
+ var self = this;
+ var obj = _.extend({}, dictionary);
+ delete obj._id;
+ var msg = self._ensureMsg(collection_name, id);
+ msg.set = _.extend(msg.set || {}, obj);
+
+ if (msg.unset) {
+ msg.unset = _.difference(msg.unset, _.keys(msg.set));
+ if (!msg.unset.length)
+ delete msg.unset;
+ }
+};
- // emit deltas for items in the old cache that no longer exist.
- var removed_keys = _.difference(_.keys(socket.meteor.cache),
- _.keys(new_cache));
- _.each(removed_keys, function (key) {
- // XXX parsing from the string is so ugly.
- var parts = key.split("|");
- if (!parts || parts.length !== 2) return;
- var collection_name = parts[0];
- var id = parts[1];
-
- var msg = {msg: 'data', collection: collection_name, id: id};
- msg.unset = _.without(_.keys(socket.meteor.cache[key]), '_id');
- socket.send(JSON.stringify(msg));
- });
+Meteor._LivedataServer.Subscription.prototype.unset = function (collection_name, id, keys) {
+ var self = this;
+ keys = _.without(keys, '_id');
+ var msg = self._ensureMsg(collection_name, id);
+ msg.unset = _.union(msg.unset || [], keys);
+
+ if (msg.set) {
+ for (var key in keys)
+ delete msg.set[key];
+ if (!_.keys(msg.set))
+ delete msg.set;
+ }
+};
+
+Meteor._LivedataServer.Subscription.prototype.complete = function () {
+ var self = this;
- // promote new_cache to old_cache
- socket.meteor.cache = new_cache;
+ // universal subs (sub_id is null) can't signal completion. it's
+ // not an error, since the same handler (eg publishQuery) might be used
+ // to implement both named and universal subs.
- // inform the client that the subscription is ready to go
- var subs_ready = [];
- _.each(socket.meteor.subs, function (sub) {
- if (!sub.ready) {
- subs_ready.push(sub._id);
- sub.ready = true;
- }
+ if (self.sub_id)
+ self.pending_complete = true;
+};
+
+Meteor._LivedataServer.Subscription.prototype.flush = function () {
+ var self = this;
+ var msg;
+
+ for (var name in self.pending_updates)
+ for (var id in self.pending_updates[name])
+ self.socket.send(JSON.stringify(self.pending_updates[name][id]));
+
+ if (self.pending_complete)
+ self.socket.send(JSON.stringify({msg: 'data', subs: [self.sub_id]}));
+
+ self.pending_updates = {};
+ self.pending_complete = false;
+};
+
+Meteor._LivedataServer.Subscription.prototype._publishCursor = function (cursor, name) {
+ var self = this;
+ var collection = name || cursor.collection_name;
+
+ var observe_handle = cursor.observe({
+ added: function (obj) {
+ self.set(collection, obj._id, obj);
+ self.flush();
+ },
+ changed: function (obj, old_idx, old_obj) {
+ var set = {};
+ _.each(obj, function (v, k) {
+ if (!_.isEqual(v, old_obj[k]))
+ set[k] = v;
});
+ self.set(collection, obj._id, set);
+ var dead_keys = _.difference(_.keys(old_obj), _.keys(obj));
+ self.unset(collection, obj._id, dead_keys);
+ self.flush();
+ },
+ removed: function (id, old_idx, old_obj) {
+ self.unset(collection, id, _.keys(old_obj));
+ self.flush();
+ }
+ });
- if (subs_ready.length || socket.meteor.pending_method_ids.length) {
- var msg = {msg: 'data'};
- if (subs_ready.length)
- msg.subs = subs_ready;
- if (socket.meteor.pending_method_ids.length)
- msg.methods = socket.meteor.pending_method_ids;
- socket.send(JSON.stringify(msg));
- }
- socket.meteor.pending_method_ids = [];
+ // observe only returns after the initial added callbacks have
+ // run. mark subscription as completed.
+ self.complete();
+ self.flush();
- }).run();
+ // register stop callback (expects lambda w/ no args).
+ self.onStop(_.bind(observe_handle.stop, observe_handle));
+};
+
+_.extend(Meteor._LivedataServer.prototype, {
+ _startSubscription: function (socket, handler, sub_id, params) {
+ var self = this;
+
+ var sub = new Meteor._LivedataServer.Subscription(socket, sub_id);
+ if (sub_id)
+ socket.meteor.named_subs[sub_id] = sub;
+ else
+ socket.meteor.universal_subs.push(sub);
+
+ var res = handler(sub, params);
+
+ // automatically wire up handlers that return a Cursor.
+ // otherwise, the handler is completely responsible for delivering
+ // its own data messages and registering stop functions.
+ if (res instanceof _Mongo.Cursor) // XXX generalize
+ sub._publishCursor(res);
+ },
+
+ // tear down specified subscription
+ _stopSubscription: function (socket, sub_id) {
+ if (sub_id && socket.meteor.named_subs[sub_id]) {
+ socket.meteor.named_subs[sub_id].stop();
+ delete socket.meteor.named_subs[sub_id];
+ }
+ },
+
+ // tear down all subscriptions
+ _stopAllSubscriptions: function (socket) {
+ _.each(socket.meteor.named_subs, function (sub, id) {
+ sub.stop();
+ });
+ socket.meteor.named_subs = {};
+
+ _.each(socket.meteor.universal_subs, function (sub) {
+ sub.stop();
+ });
+ socket.meteor.universal_subs = [];
},
// XXX 'connect' message should have a protocol version
@@ -311,8 +336,15 @@ _.extend(Meteor._LivedataServer.prototype, {
var self = this;
// Always start a new session. We don't support any reconnection.
socket.send(JSON.stringify({msg: 'connected', session: Meteor.uuid()}));
- // Run any universal publishes we may have.
- self._poll_subscriptions(socket);
+
+ // Spin up all the universal publishers.
+ Fiber(function () {
+ _.each(self.universal_publish_handlers, function (handler) {
+ self._startSubscription(socket, handler);
+ });
+ }).run();
+
+ // XXX what to do here on reconnect? oh, probably just fake a sub message.
},
_livedata_sub: function (socket, msg) {
@@ -328,25 +360,33 @@ _.extend(Meteor._LivedataServer.prototype, {
return;
}
- if (!self.publishes[msg.name]) {
+ if (!self.publish_handlers[msg.name]) {
socket.send(JSON.stringify({
msg: 'nosub', id: msg.id, error: {error: 404,
reason: "Subscription not found"}}));
return;
}
- socket.meteor.subs.push({_id: msg.id, name: msg.name,
- params: msg.params || {}});
- self._poll_subscriptions(socket);
+ Fiber(function () {
+ if (msg.id in socket.meteor.named_subs)
+ // XXX client screwed up
+ self._stopSubscription(socket, msg.id);
+
+ var handler = self.publish_handlers[msg.name];
+ self._startSubscription(socket, handler, msg.id, msg.params);
+ }).run();
},
+ // XXX Fiber() doesn't interlock. if a client subs then unsubs, the
+ // subscription should end up as off.
_livedata_unsub: function (socket, msg) {
var self = this;
+
+ Fiber(function () {
+ self._stopSubscription(socket, msg.id);
+ }).run();
+
socket.send(JSON.stringify({msg: 'nosub', id: msg.id}));
- socket.meteor.subs = _.filter(socket.meteor.subs, function (x) {
- return x._id !== msg.id;
- });
- self._poll_subscriptions(socket);
},
_livedata_method: function (socket, msg) {
@@ -410,15 +450,11 @@ _.extend(Meteor._LivedataServer.prototype, {
socket.send(JSON.stringify({
msg: 'result', id: msg.id, result: result}));
- // after the method, rerun all the subscriptions as stuff may
- // have changed.
- // XXX going away in merge very soon
- socket.meteor.pending_method_ids.push(msg.id);
- _.each(self.stream_server.all_sockets(), function(x) {
- if (x && x.meteor)
- x.meteor.throttled_poll();
- });
-
+ // the method is satisfied once callback is called, because
+ // any DB observe callbacks run to completion in the same
+ // tick.
+ socket.send(JSON.stringify({
+ msg: 'data', methods: [msg.id]}));
};
var invocation = new Meteor._ServerMethodInvocation(msg.method, handler);
@@ -427,43 +463,44 @@ _.extend(Meteor._LivedataServer.prototype, {
} catch (e) {
// _run will have already logged the exception (and told the
// client, if appropriate)
+ socket.send(JSON.stringify({
+ msg: 'data', methods: [msg.id]}));
}
}).run();
},
/**
- * Defines a live dataset that clients can subscribe to.
+ * Register a publish handler function.
*
* @param name {String} identifier for query
+ * @param handler {Function} publish handler
* @param options {Object}
*
+ * Server will call handler function on each new subscription,
+ * either when receiving DDP sub message for a named subscription, or on
+ * DDP connect for a universal subscription.
+ *
* If name is null, this will be a subscription that is
* automatically established and permanently on for all connected
* client, instead of a subscription that can be turned on and off
* with subscribe().
*
* options to contain:
- * - collection {Collection} collection; defaults to the collection
- * named 'name' on disk in mongodb
- * - selector {Function<args> OR Object} either a mongodb selector,
- * or a function that takes the argument object passed to
- * Meteor.subscribe and returns a mongodb selector. default {}
* - (mostly internal) is_auto: true if generated automatically
* from an autopublish hook. this is for cosmetic purposes only
* (it lets us determine whether to print a warning suggesting
* that you turn off autopublish.)
*/
- publish: function (name, options) {
+ publish: function (name, handler, options) {
var self = this;
- if (name && name in self.publishes) {
- // XXX error duplicate publish
- console.log("ERROR DUPLICATE PUBLISH " + name);
+ options = options || {};
+
+ if (name && name in self.publish_handlers) {
+ Meteor._debug("Ignoring duplicate publish named '" + name + "'");
return;
}
- options = options || {};
-
if (!self.on_autopublish && !options.is_auto) {
// They have autopublish on, yet they're trying to manually
// picking stuff to publish. They probably should turn off
@@ -489,36 +526,10 @@ _.extend(Meteor._LivedataServer.prototype, {
}
}
- var collection = options.collection ||
- (name && self._hack_collections[name]);
- if (!collection) {
- if (name)
- throw new Error("No collection '" + name + "' found to publish. " +
- "You can specify the collection explicitly with the " +
- "'collection' option.");
- else
- throw new Error("When creating universal publishes, you must specify " +
- "the collection explicitly with the 'collection' " +
- "option.");
- }
- var selector = options.selector || {};
- var func = function (channel, params) {
- var opt = function (key, or) {
- var x = options[key] || or;
- return (x instanceof Function) ? x(params) : x;
- };
-
- channel.send(collection._name, collection.find(opt("selector", {}), {
- sort: opt("sort"),
- skip: opt("skip"),
- limit: opt("limit")
- }).fetch());
- };
-
if (name)
- self.publishes[name] = func;
+ self.publish_handlers[name] = handler;
else
- self.universal_publishes.push(func);
+ self.universal_publish_handlers.push(handler);
},
methods: function (methods) {
View
7 packages/livedata/livedata_test_service.js
@@ -26,11 +26,8 @@ Meteor.startup(function () {
});
if (Meteor.is_server)
- Meteor.publish('ledger', {
- collection: Ledger,
- selector: function (params) {
- return {world: params.world};
- }
+ Meteor.publish('ledger', function (sub, params) {
+ return Ledger.find({world: params.world});
});
App.methods({
View
15 packages/livedata/livedata_tests.js
@@ -1,16 +1,3 @@
-test("livedata - basics", function () {
- // Very basic test. Just see that it runs.
-
- var coll = new Meteor.Collection("testing" + LocalCollection.uuid());
-
- coll.remove({foo: 'bar'});
- assert.length(coll.find({foo: 'bar'}).fetch(), 0);
- coll.insert({foo: 'bar'});
- assert.length(coll.find({foo: 'bar'}).fetch(), 1);
-});
-
-/******************************************************************************/
-
// XXX should probably move this into a testing helpers package so it
// can be used by other tests
@@ -257,4 +244,4 @@ testAsyncMulti("livedata - compound methods", [
// method completion/satisfaction
// subscriptions (multiple APIs, including autosubscribe?)
// subscription completion
-// [probably lots more]
+// [probably lots more]
View
17 packages/minimongo/minimongo.js
@@ -144,14 +144,12 @@ LocalCollection.LiveResultsSet = function () {};
// options to contain:
// * callbacks:
// - added (object, before_index)
-// - changed (new_object, at_index)
+// - changed (new_object, at_index, old_object)
// - moved (object, old_index, new_index) - can only fire with changed()
-// - removed (id, at_index)
-// * sort: sort descriptor
+// - removed (id, at_index, object)
//
// attributes available on returned query handle:
// * stop(): end updates
-// * indexOf(id): return current index of object in result set, or -1
// * collection: the collection this query is querying
//
// iff x is a returned query handle, (x instanceof
@@ -163,7 +161,6 @@ LocalCollection.LiveResultsSet = function () {};
// XXX maybe support limit/skip
// XXX it'd be helpful if removed got the object that just left the
// query, not just its id
-// XXX document that initial results will definitely be delivered before we return [do, add to asana]
LocalCollection.Cursor.prototype.observe = function (options) {
var self = this;
@@ -337,6 +334,8 @@ LocalCollection.prototype._modifyAndNotify = function (doc, mod) {
for (var qid in self.queries)
matched_before[qid] = self.queries[qid].selector_f(doc);
+ var old_doc = LocalCollection._deepcopy(doc);
+
LocalCollection._modify(doc, mod);
for (var qid in self.queries) {
@@ -348,7 +347,7 @@ LocalCollection.prototype._modifyAndNotify = function (doc, mod) {
else if (!before && after)
LocalCollection._insertInResults(query, doc);
else if (before && after)
- LocalCollection._updateInResults(query, doc);
+ LocalCollection._updateInResults(query, doc, old_doc);
}
};
@@ -386,13 +385,13 @@ LocalCollection._insertInResults = function (query, doc) {
LocalCollection._removeFromResults = function (query, doc) {
var i = LocalCollection._findInResults(query, doc);
- query.removed(doc._id, i);
+ query.removed(doc._id, i, doc);
query.results.splice(i, 1);
};
-LocalCollection._updateInResults = function (query, doc) {
+LocalCollection._updateInResults = function (query, doc, old_doc) {
var orig_idx = LocalCollection._findInResults(query, doc);
- query.changed(LocalCollection._deepcopy(doc), orig_idx);
+ query.changed(LocalCollection._deepcopy(doc), orig_idx, old_doc);
if (!query.sort_f)
return;
View
63 packages/minimongo/minimongo_tests.js
@@ -825,6 +825,65 @@ test("minimongo - modify", function () {
// XXX test update() (selecting docs, multi, upsert..)
test("minimongo - observe", function () {
- // XXX needs tests!
- // don't forget tests for stop
+ var operations = [];
+ var cbs = {
+ added: function (obj, idx) {
+ delete obj._id;
+ operations.push(LocalCollection._deepcopy(['added', obj, idx]));
+ },
+ changed: function (obj, at, old_obj) {
+ delete obj._id;
+ delete old_obj._id;
+ operations.push(LocalCollection._deepcopy(['changed', obj, at, old_obj]));
+ },
+ moved: function (obj, old_at, new_at) {
+ delete obj._id;
+ operations.push(LocalCollection._deepcopy(['moved', obj, old_at, new_at]));
+ },
+ removed: function (id, at, old_obj) {
+ delete old_obj._id;
+ operations.push(LocalCollection._deepcopy(['removed', id, at, old_obj]));
+ }
+ };
+ var handle;
+
+ var c = new LocalCollection();
+ handle = c.find({}, {sort: {a: 1}}).observe(cbs);
+ assert.isTrue(handle.collection === c);
+
+ c.insert({a:1});
+ assert.equal(operations.shift(), ['added', {a:1}, 0]);
+ c.update({a:1}, {$set: {a: 2}});
+ assert.equal(operations.shift(), ['changed', {a:2}, 0, {a:1}]);
+ c.insert({a:10});
+ assert.equal(operations.shift(), ['added', {a:10}, 1]);
+ c.update({}, {$inc: {a: 1}}, {multi: true});
+ assert.equal(operations.shift(), ['changed', {a:3}, 0, {a:2}]);
+ assert.equal(operations.shift(), ['changed', {a:11}, 1, {a:10}]);
+ c.update({a:11}, {a:1});
+ assert.equal(operations.shift(), ['changed', {a:1}, 1, {a:11}]);
+ assert.equal(operations.shift(), ['moved', {a:1}, 1, 0]);
+ c.remove({a:2});
+ assert.equal(operations.shift(), undefined);
+ var id = c.findOne({a:3})._id;
+ c.remove({a:3});
+ assert.equal(operations.shift(), ['removed', id, 1, {a:3}]);
+
+ // test stop
+ handle.stop();
+ c.insert({a:2});
+ assert.equal(operations.shift(), undefined);
+
+ // test initial inserts (and backwards sort)
+ handle = c.find({}, {sort: {a: -1}}).observe(cbs);
+ assert.equal(operations.shift(), ['added', {a:2}, 0]);
+ assert.equal(operations.shift(), ['added', {a:1}, 1]);
+ handle.stop();
+
+ // test _suppress_initial
+ handle = c.find({}, {sort: {a: -1}}).observe(_.extend(cbs, {_suppress_initial: true}));
+ assert.equal(operations.shift(), undefined);
+ c.insert({a:100});
+ assert.equal(operations.shift(), ['added', {a:100}, 0]);
+ handle.stop();
});
View
10 packages/mongo-livedata/collection.js
@@ -104,17 +104,11 @@ Meteor.Collection = function (name, manager, driver) {
manager.methods(m);
}
- // XXX temporary hack to provide sugar in LivedataServer.publish()
- if (name && manager && manager._hack_collections) {
- if (name in manager._hack_collections)
- throw new Error("There is already a collection named '" + name + "'");
- manager._hack_collections[name] = self;
- }
-
// autopublish
if (manager && manager.onAutopublish)
manager.onAutopublish(function () {
- manager.publish(null, {collection: self, is_auto: true});
+ var handler = function (sub, params) { return self.find(); };
+ manager.publish(null, handler, {is_auto: true});
});
};
View
143 packages/mongo-livedata/mongo_driver.js
@@ -16,6 +16,10 @@ Future.prototype.ret = Future.prototype.return;
_Mongo = function (url) {
var self = this;
+ // holds active observes
+ self.observers = {};
+ self.next_observer_id = 1;
+
self.collection_queue = [];
MongoDB.connect(url, function(err, db) {
@@ -27,6 +31,14 @@ _Mongo = function (url) {
db.collection(c.name, c.callback);
}
});
+
+ // refresh all outstanding observers every 10 seconds. they are
+ // also triggered on DB updates.
+ setInterval(function () {
+ Fiber(function () {
+ self._pollObservers.call(self);
+ }).run();
+ }, 10000);
};
// protect against dangerous selectors. falsey and {_id: falsey}
@@ -56,6 +68,19 @@ _Mongo.prototype._withCollection = function(collection_name, callback) {
}
};
+// poke observers watching the given collection name, or all observers
+// if no collection name provided.
+_Mongo.prototype._pollObservers = function (collection_name) {
+ var self = this;
+
+ for (var id in self.observers) {
+ var o = self.observers[id];
+ if (!collection_name || o.collection_name === collection_name) {
+ o._poll();
+ }
+ }
+};
+
//////////// Public API //////////
_Mongo.prototype.insert = function (collection_name, document) {
@@ -70,7 +95,10 @@ _Mongo.prototype.insert = function (collection_name, document) {
// XXX err handling
collection.insert(document, {safe: true}, function(err) {
// XXX err handling
- future.ret();
+ Fiber(function () {
+ self._pollObservers(collection_name);
+ future.ret();
+ }).run();
});
});
@@ -93,7 +121,10 @@ _Mongo.prototype.remove = function (collection_name, selector) {
// XXX err handling
collection.remove(selector, {safe:true}, function(err) {
// XXX err handling
- future.ret();
+ Fiber(function () {
+ self._pollObservers(collection_name);
+ future.ret();
+ }).run();
});
});
@@ -121,7 +152,10 @@ _Mongo.prototype.update = function (collection_name, selector, mod, options) {
collection.update(selector, mod, opts, function(err) {
// XXX err handling
- future.ret();
+ Fiber(function () {
+ self._pollObservers(collection_name);
+ future.ret();
+ }).run();
});
});
@@ -160,16 +194,7 @@ _Mongo.Cursor = function (mongo, collection_name, selector, options) {
self.mongo._withCollection(collection_name, function(err, collection) {
// XXX err handling
-
- var cursor = collection.find(self.selector);
- // XXX is there a way to do this as for x in ['sort', 'limit', 'skip']?
- if (self.options.sort)
- cursor = cursor.sort(self.options.sort);
- if (self.options.limit)
- cursor = cursor.limit(self.options.limit);
- if (self.options.skip)
- cursor = cursor.skip(self.options.skip);
-
+ var cursor = collection.find(self.selector, self.options.fields, self.options.skip, self.options.limit, self.options.sort);
future.ret(cursor);
});
@@ -224,9 +249,99 @@ _Mongo.Cursor.prototype.count = function () {
future.ret(err || res);
});
+ return future.wait();
+};
+// options to contain:
+// * callbacks:
+// - added (object, before_index)
+// - changed (new_object, at_index)
+// - moved (object, old_index, new_index) - can only fire with changed()
+// - removed (id, at_index)
+//
+// attributes available on returned LiveResultsSet
+// * stop(): end updates
+
+_Mongo.Cursor.prototype.observe = function (options) {
+ return new _Mongo.LiveResultsSet(this, options);
+};
- return future.wait();
+_Mongo.LiveResultsSet = function (cursor, options) {
+ // copy my cursor, so that the observe can run independently from
+ // some other use of the cursor.
+ this.cursor = new _Mongo.Cursor(cursor.mongo,
+ cursor.collection_name,
+ cursor.selector,
+ cursor.options);
+
+ // expose collection name
+ this.collection_name = cursor.collection_name;
+
+ // unique handle for this live query
+ this.qid = this.cursor.mongo.next_observer_id++;
+
+ // previous results snapshot. on each poll cycle, diffs against
+ // results drives the callbacks.
+ this.results = {};
+ this.indexes = {};
+
+ this.added = options.added;
+ this.changed = options.changed;
+ this.moved = options.moved;
+ this.removed = options.removed;
+
+ // trigger the first _poll() cycle immediately.
+ this._poll();
+
+ // register myself with the mongo driver
+ this.cursor.mongo.observers[this.qid] = this;
+};
+
+_Mongo.LiveResultsSet.prototype._fetchResults = function (results, indexes) {
+ var self = this;
+ var index = 0;
+
+ self.cursor.rewind();
+ self.cursor.forEach(function (obj) {
+ results[obj._id] = obj;
+ indexes[obj._id] = index++;
+ });
+};
+
+_Mongo.LiveResultsSet.prototype._poll = function () {
+ var self = this;
+
+ var old_results = self.results;
+ var old_indexes = self.indexes;
+ var new_results = {};
+ var new_indexes = {};
+
+ var callbacks = [];
+
+ self._fetchResults(new_results, new_indexes);
+
+ _.each(new_results, function (obj) {
+ if (self.added && !old_results[obj._id])
+ self.added(obj, new_indexes[obj._id]);
+
+ else if (self.changed && !_.isEqual(new_results[obj._id], old_results[obj._id]))
+ self.changed(obj, old_indexes[obj._id], old_results[obj._id]);
+
+ if (self.moved && new_indexes[obj._id] !== old_indexes[obj._id])
+ self.moved(obj, old_indexes[obj._id], new_indexes[obj._id]);
+ });
+
+ for (var id in old_results)
+ if (self.removed && !(id in new_results))
+ self.removed(id, old_indexes[id], old_results[id]);
+
+ self.results = new_results;
+ self.indexes = new_indexes;
+};
+
+_Mongo.LiveResultsSet.prototype.stop = function () {
+ var self = this;
+ delete self.cursor.mongo.observers[self.qid];
};
_.extend(Meteor, {
View
5 packages/tinytest/tinytest_server.js
@@ -2,9 +2,8 @@ Meteor.startup(function () {
Meteor._ServerTestResults.remove();
});
-App.publish('tinytest/results', {
- collection: Meteor._ServerTestResults,
- selector: function (params) { return {run_id: params.run_id} }
+App.publish('tinytest/results', function (sub, params) {
+ return Meteor._ServerTestResults.find({run_id: params.run_id});
});
App.methods({
Something went wrong with that request. Please try again.