Browse files

Change "wait" methods to do a full quiesce: ie, they do not stream data.

Rationale: "wait" methods are used for login/logout, which tend to make drastic
changes that should take effect atomically rather than streamy. (Without this,
logging out with accounts-ui saw the username disappear before the buttons
switched from "sign out" to "sign in", eg in single-button mode. Now it goes
directly from "signed in with username visible" to "signed out".)
  • Loading branch information...
1 parent 183653e commit 48a74d70a8552d7bd0019e393ede4f6b46601be8 @glasser glasser committed Nov 15, 2012
Showing with 65 additions and 27 deletions.
  1. +43 −26 packages/livedata/livedata_connection.js
  2. +22 −1 packages/livedata/livedata_connection_tests.js
View
69 packages/livedata/livedata_connection.js
@@ -109,18 +109,24 @@ Meteor._LivedataConnection = function (url, options) {
// visible.
self._afterUpdateCallbacks = [];
- // When we reconnect, we don't apply any incoming data messages to stores
- // until all subs that had been ready before reconnect are ready again, and
- // all methods that are active have returned their "data done message"; then
- // all data messages are processed in one update. The following fields are
- // used for this "reconnect quiescence" process.
+ // In two contexts, we buffer all incoming data messages and then process them
+ // all at once in a single update:
+ // - During reconnect, we buffer all data messages until all subs that had
+ // been ready before reconnect are ready again, and all methods that are
+ // active have returned their "data done message"; then
+ // - During the execution of a "wait" method, we buffer all data messages
+ // until the wait method gets its "data done" message. (If the wait method
+ // occurs during reconnect, it doesn't get any special handling.)
+ // all data messages are processed in one update.
//
- // This buffers the messages that aren't being processed because not all subs
- // and methods are ready.
- self._bufferedMessagesAtReconnect = [];
- // map from method ID -> true for methods active at reconnect time that
- // haven't sent their "data done" message yet
- self._reconnectMethods = {}; // map from method_id -> true
+ // The following fields are used for this "quiescence" process.
+
+ // This buffers the messages that aren't being processed yet.
+ self._messagesBufferedUntilQuiescence = [];
+ // Map from method ID -> true. Methods are removed from this when their
+ // "data done" message is received, and we will not quiesce until it is
+ // empty.
+ self._methodsBlockingQuiescence = {};
// map from sub ID -> true for subs that were ready (ie, called the sub
// ready callback) before reconnect but haven't become ready again yet
self._subsBeingRevived = {}; // map from sub._id -> true
@@ -267,6 +273,7 @@ var MethodInvoker = function (options) {
self._connection = options.connection;
self._message = JSON.stringify(options.message);
self._onResultReceived = options.onResultReceived || function () {};
+ self._wait = options.wait;
self._methodResult = null;
self._dataVisible = false;
@@ -290,6 +297,11 @@ _.extend(MethodInvoker.prototype, {
self.sentMessage = true;
+ // If this is a wait method, make all data messages be buffered until it is
+ // done.
+ if (self._wait)
+ self._connection._methodsBlockingQuiescence[self.methodId] = true;
+
// Actually send the message.
self._connection._stream.send(self._message);
},
@@ -577,6 +589,7 @@ _.extend(Meteor._LivedataConnection.prototype, {
callback: callback,
connection: self,
onResultReceived: options.onResultReceived,
+ wait: !!options.wait,
message: {
msg: 'method',
method: name,
@@ -685,11 +698,12 @@ _.extend(Meteor._LivedataConnection.prototype, {
},
// Returns true if we are in a state after reconnect of waiting for subs to be
- // revived or early methods to finish their data.
- _waitingForReconnectQuiescence: function () {
+ // revived or early methods to finish their data, or we are waiting for a
+ // "wait" method to finish.
+ _waitingForQuiescence: function () {
var self = this;
return (! _.isEmpty(self._subsBeingRevived) ||
- ! _.isEmpty(self._reconnectMethods));
+ ! _.isEmpty(self._methodsBlockingQuiescence));
},
// Returns true if any method whose message has been sent to the server has
@@ -747,7 +761,11 @@ _.extend(Meteor._LivedataConnection.prototype, {
// Arrange for "half-finished" methods to have their callbacks run, and
// track methods that were sent on this connection so that we don't
// quiesce until they are all done.
- self._reconnectMethods = {};
+ //
+ // Start by clearing _methodsBlockingQuiescence: methods sent before
+ // reconnect don't matter, and any "wait" methods sent on the new connection
+ // that we drop here will be restored by the loop below.
+ self._methodsBlockingQuiescence = {};
if (self._resetStores) {
_.each(self._methodInvokers, function (invoker) {
if (invoker.gotResult()) {
@@ -766,16 +784,16 @@ _.extend(Meteor._LivedataConnection.prototype, {
// reconnect quiescence. (eg, it might be a login method that was run
// from onReconnect, and we don't want to see flicker by seeing a
// logged-out state.)
- self._reconnectMethods[invoker.methodId] = true;
+ self._methodsBlockingQuiescence[invoker.methodId] = true;
}
});
}
- self._bufferedMessagesAtReconnect = [];
+ self._messagesBufferedUntilQuiescence = [];
// If we're not waiting on any methods or subs, we can reset the stores and
// call the callbacks immediately.
- if (!self._waitingForReconnectQuiescence()) {
+ if (!self._waitingForQuiescence()) {
if (self._resetStores) {
_.each(self._stores, function (s) {
s.beginUpdate(0, true);
@@ -793,26 +811,25 @@ _.extend(Meteor._LivedataConnection.prototype, {
// collection name -> array of messages
var updates = {};
- if (self._waitingForReconnectQuiescence()) {
- self._bufferedMessagesAtReconnect.push(msg);
+ if (self._waitingForQuiescence()) {
+ self._messagesBufferedUntilQuiescence.push(msg);
_.each(msg.subs || [], function (subId) {
delete self._subsBeingRevived[subId];
});
_.each(msg.methods || [], function (methodId) {
- delete self._reconnectMethods[methodId];
+ delete self._methodsBlockingQuiescence[methodId];
});
- if (self._waitingForReconnectQuiescence())
+ if (self._waitingForQuiescence())
return;
- // All subscriptions that were ready before reconnect are now ready again,
- // and all active methods at reconnect time have their data written!
+ // No methods or subs are blocking quiescence!
// We'll now process and all of our buffered messages, reset all stores,
// and apply them all at once.
- _.each(self._bufferedMessagesAtReconnect, function (bufferedMsg) {
+ _.each(self._messagesBufferedUntilQuiescence, function (bufferedMsg) {
self._processOneDataMessage(bufferedMsg, updates);
});
- self._bufferedMessagesAtReconnect = [];
+ self._messagesBufferedUntilQuiescence = [];
} else {
self._processOneDataMessage(msg, updates);
}
View
23 packages/livedata/livedata_connection_tests.js
@@ -854,6 +854,9 @@ Tinytest.add("livedata connection - two wait methods", function (test) {
var conn = newConnection(stream);
startAndConnect(test, stream);
+ var collName = Meteor.uuid();
+ var coll = new Meteor.Collection(collName, {manager: conn});
+
// setup method
conn.methods({do_something: function (x) {}});
@@ -885,6 +888,14 @@ Tinytest.add("livedata connection - two wait methods", function (test) {
// 'one!'.
test.equal(stream.sent.length, 0);
+ // Receive some data. "one" is not a wait method and there are no stubs, so it
+ // gets applied immediately.
+ test.equal(coll.find().count(), 0);
+ stream.receive({msg: 'data', collection: collName,
+ id: 'foo', set: {x: 1}});
+ test.equal(coll.find().count(), 1);
+ test.equal(coll.findOne('foo'), {_id: 'foo', x: 1});
+
// Let "one!" finish. Both messages are required to fire the callback.
stream.receive({msg: 'result', id: one_message.id});
test.equal(responses, []);
@@ -898,10 +909,20 @@ Tinytest.add("livedata connection - two wait methods", function (test) {
// But still haven't sent "three!".
test.equal(stream.sent.length, 0);
+ // Receive more data. "two" is a wait method, so the data doesn't get applied
+ // yet.
+ stream.receive({msg: 'data', collection: collName,
+ id: 'foo', set: {y: 3}});
+ test.equal(coll.find().count(), 1);
+ test.equal(coll.findOne('foo'), {_id: 'foo', x: 1});
+
// Let "two!" finish, with its end messages in the opposite order to "one!".
stream.receive({msg: 'data', methods: [two_message.id]});
test.equal(responses, ['one']);
test.equal(stream.sent.length, 0);
+ // data-done message is enough to allow data to be written.
+ test.equal(coll.find().count(), 1);
+ test.equal(coll.findOne('foo'), {_id: 'foo', x: 1, y: 3});
stream.receive({msg: 'result', id: two_message.id});
test.equal(responses, ['one', 'two']);
@@ -1017,7 +1038,7 @@ Tinytest.add("livedata connection - onReconnect prepends messages correctly with
return JSON.parse(msg).params[0];
}), ['reconnect one', 'reconnect two', 'reconnect three', 'one']);
- // black-box test:
+ // white-box test:
test.equal(_.map(conn._outstandingMethodBlocks, function (block) {
return [block.wait, _.map(block.methods, function (method) {
return JSON.parse(method._message).params[0];

0 comments on commit 48a74d7

Please sign in to comment.