Skip to content

Commit

Permalink
Merge limit/skip support in observe into devel. Fixes #528.
Browse files Browse the repository at this point in the history
  • Loading branch information
n1mmy committed Dec 17, 2012
2 parents f68a8a3 + e32502c commit c10dd19
Show file tree
Hide file tree
Showing 2 changed files with 231 additions and 132 deletions.
102 changes: 84 additions & 18 deletions packages/minimongo/minimongo.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ LocalCollection.Cursor = function (collection, selector, options) {
} else {
this.selector_f = LocalCollection._compileSelector(selector);
this.sort_f = options.sort ? LocalCollection._compileSort(options.sort) : null;
this.skip = options.skip;
this.limit = options.limit;
}
this.skip = options.skip;
this.limit = options.limit;

// db_objects is a list of the objects that match the cursor. (It's always a
// list, never an object: LocalCollection.Cursor is always ordered.)
Expand All @@ -94,10 +94,17 @@ LocalCollection.prototype.findOne = function (selector, options) {
if (arguments.length === 0)
selector = {};

// XXX disable limit here so that we can observe findOne() cursor,
// as required by markAsReactive.
// options = options || {};
// options.limit = 1;
// NOTE: by setting limit 1 here, we end up using very inefficient
// code that recomputes the whole query on each update. The upside is
// that when you reactively depend on a findOne you only get
// invalidated when the found object changes, not any object in the
// collection. Most findOne will be by id, which has a fast path, so
// this might not be a big deal. In most cases, invalidation causes
// the called to re-query anyway, so this should be a net performance
// improvement.
options = options || {};
options.limit = 1;

return this.find(selector, options).fetch()[0];
};

Expand Down Expand Up @@ -173,7 +180,6 @@ LocalCollection.LiveResultsSet = function () {};
// initial results delivered through added callback
// XXX maybe callbacks should take a list of objects, to expose transactions?
// XXX maybe support field limiting (to limit what you're notified on)
// XXX maybe support limit/skip

_.extend(LocalCollection.Cursor.prototype, {
observe: function (options) {
Expand All @@ -187,8 +193,8 @@ _.extend(LocalCollection.Cursor.prototype, {
_observeInternal: function (ordered, options) {
var self = this;

if (self.skip || self.limit)
throw new Error("cannot observe queries with skip or limit");
if (!ordered && (self.skip || self.limit))
throw new Error("must use ordered observe with skip or limit");

var qid = self.collection.next_qid++;

Expand All @@ -198,7 +204,8 @@ _.extend(LocalCollection.Cursor.prototype, {
sort_f: ordered && self.sort_f,
results_snapshot: null,
ordered: ordered,
cursor: this
cursor: this,
needsRecompute: false
};
query.results = self._getRawObjects(ordered);
if (self.collection.paused)
Expand Down Expand Up @@ -254,6 +261,12 @@ LocalCollection.Cursor.prototype._getRawObjects = function (ordered) {

// fast path for single ID value
if (self.selector_id) {
// If you have non-zero skip and ask for a single id, you get
// nothing. This is so it matches the behavior of the '{_id: foo}'
// path.
if (self.skip)
return results;

if (_.has(self.collection.docs, self.selector_id)) {
var selectedDoc = self.collection.docs[self.selector_id];
if (ordered)
Expand All @@ -273,6 +286,10 @@ LocalCollection.Cursor.prototype._getRawObjects = function (ordered) {
else
results[id] = doc;
}
// Fast path for limited unsorted queries.
if (self.limit && !self.skip && !self.sort_f &&
results.length === self.limit)
return results;
}

if (!ordered)
Expand Down Expand Up @@ -337,9 +354,15 @@ LocalCollection.prototype.insert = function (doc) {
// trigger live queries that match
for (var qid in self.queries) {
var query = self.queries[qid];
if (query.selector_f(doc))
LocalCollection._insertInResults(query, doc);
if (query.selector_f(doc)) {
if (query.cursor.skip || query.cursor.limit)
query.needsRecompute = true;
else
LocalCollection._insertInResults(query, doc);
}
}

self._recomputeNecessaryQueries();
};

LocalCollection.prototype.remove = function (selector) {
Expand All @@ -365,8 +388,12 @@ LocalCollection.prototype.remove = function (selector) {
var removeId = remove[i];
var removeDoc = self.docs[removeId];
_.each(self.queries, function (query) {
if (query.selector_f(removeDoc))
queryRemove.push([query, removeDoc]);
if (query.selector_f(removeDoc)) {
if (query.cursor.skip || query.cursor.limit)
query.needsRecompute = true;
else
queryRemove.push([query, removeDoc]);
}
});
self._saveOriginal(removeId, removeDoc);
delete self.docs[removeId];
Expand All @@ -376,6 +403,8 @@ LocalCollection.prototype.remove = function (selector) {
for (var i = 0; i < queryRemove.length; i++) {
LocalCollection._removeFromResults(queryRemove[i][0], queryRemove[i][1]);
}

self._recomputeNecessaryQueries();
};

// XXX atomicity: if multi is true, and one modification fails, do
Expand All @@ -393,7 +422,7 @@ LocalCollection.prototype.update = function (selector, mod, options) {
self._saveOriginal(id, doc);
self._modifyAndNotify(doc, mod);
if (!options.multi)
return;
break;
any = true;
}
}
Expand All @@ -408,6 +437,8 @@ LocalCollection.prototype.update = function (selector, mod, options) {
self.insert(insert);
}
}

self._recomputeNecessaryQueries();
};

LocalCollection.prototype._modifyAndNotify = function (doc, mod) {
Expand All @@ -431,12 +462,24 @@ LocalCollection.prototype._modifyAndNotify = function (doc, mod) {
query = self.queries[qid];
var before = matched_before[qid];
var after = query.selector_f(doc);
if (before && !after)

if (query.cursor.skip || query.cursor.limit) {
// We need to recompute any query where the doc may have been in the
// cursor's window either before or after the update. (Note that if skip
// or limit is set, "before" and "after" being true do not necessarily
// mean that the document is in the cursor's output after skip/limit is
// applied... but if they are false, then the document definitely is NOT
// in the output. So it's safe to skip recompute if neither before or
// after are true.)
if (before || after)
query.needsRecompute = true;
} else if (before && !after) {
LocalCollection._removeFromResults(query, doc);
else if (!before && after)
} else if (!before && after) {
LocalCollection._insertInResults(query, doc);
else if (before && after)
} else if (before && after) {
LocalCollection._updateInResults(query, doc, old_doc);
}
}
};

Expand All @@ -463,6 +506,9 @@ LocalCollection._deepcopy = function (v) {

// XXX the sorted-query logic below is laughably inefficient. we'll
// need to come up with a better datastructure for this.
//
// XXX the logic for observing with a skip or a limit is even more
// laughably inefficient. we recompute the whole results every time!

LocalCollection._insertInResults = function (query, doc) {
if (query.ordered) {
Expand Down Expand Up @@ -517,6 +563,26 @@ LocalCollection._updateInResults = function (query, doc, old_doc) {
query.moved(LocalCollection._deepcopy(doc), orig_idx, new_idx);
};

LocalCollection.prototype._recomputeNecessaryQueries = function () {
var self = this;
_.each(self.queries, function (query) {
if (query.needsRecompute) {
LocalCollection._recomputeResults(query);
query.needsRecompute = false;
}
});
};

LocalCollection._recomputeResults = function (query) {
var old_results = query.results;
query.results = query.cursor._getRawObjects(query.ordered);

if (!query.paused)
LocalCollection._diffQuery(
query.ordered, old_results, query.results, query, true);
};


LocalCollection._findInOrderedResults = function (query, doc) {
if (!query.ordered)
throw new Error("Can't call _findInOrderedResults on unordered query");
Expand Down
Loading

0 comments on commit c10dd19

Please sign in to comment.