Skip to content

Commit

Permalink
(#242) - avoid unnecessary GET for gen-1
Browse files Browse the repository at this point in the history
  • Loading branch information
nolanlawson committed Jan 22, 2015
1 parent 1f67e2f commit 514a3cb
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 74 deletions.
168 changes: 94 additions & 74 deletions index.js
Expand Up @@ -26,6 +26,12 @@ function parseViewName(name) {
return name.indexOf('/') === -1 ? [name, name] : name.split('/');
}

function isGenOne(changes) {
// only return true if the current change is 1-
// and there are no other leafs
return changes.length === 1 && /^1-/.test(changes[0].rev);
}

function tryCode(db, fun, args) {
// emit an event if there was an error thrown by a map/reduce function.
// putting try/catches in a single function also avoids deoptimizations.
Expand Down Expand Up @@ -260,90 +266,100 @@ function defaultsTo(value) {
}

// returns a promise for a list of docs to update, based on the input docId.
// we update the metaDoc first (i.e. the doc that points from the sourceDB
// document Id to the ids of the documents in the mrview database), then
// the key/value docs. that way, if lightning strikes the user's computer
// in the middle of an update, we don't write any docs that we wouldn't
// be able to find later using the metaDoc.
function getDocsToPersist(docId, view, docIdsToEmits) {
// the order doesn't matter, because post-3.2.0, bulkDocs
// is an atomic operation in all three adapters.
function getDocsToPersist(docId, view, docIdsToChangesAndEmits) {
var metaDocId = '_local/doc_' + docId;
return view.db.get(metaDocId)
.catch(defaultsTo({_id: metaDocId, keys: []}))
.then(function (metaDoc) {
return Promise.resolve().then(function () {
if (metaDoc.keys.length) {
return view.db.allDocs({
keys: metaDoc.keys,
include_docs: true
});
var defaultMetaDoc = {_id: metaDocId, keys: []};
var docData = docIdsToChangesAndEmits[docId];
var indexableKeysToKeyValues = docData.indexableKeysToKeyValues;
var changes = docData.changes;

function getMetaDoc() {
if (isGenOne(changes)) {
// generation 1, so we can safely assume initial state
// for performance reasons (avoids unnecessary GETs)
return Promise.resolve(defaultMetaDoc);
}
return view.db.get(metaDocId).catch(defaultsTo(defaultMetaDoc));
}

function getKeyValueDocs(metaDoc) {
if (!metaDoc.keys.length) {
// no keys, no need for a lookup
return Promise.resolve({rows: []});
}
return view.db.allDocs({
keys: metaDoc.keys,
include_docs: true
});
}

function processKvDocs(metaDoc, kvDocsRes) {
var kvDocs = [];
var oldKeysMap = {};

for (var i = 0, len = kvDocsRes.rows.length; i < len; i++) {
var row = kvDocsRes.rows[i];
var doc = row.doc;
if (!doc) { // deleted
continue;
}
kvDocs.push(doc);
oldKeysMap[doc._id] = true;
doc._deleted = !indexableKeysToKeyValues[doc._id];
if (!doc._deleted) {
var keyValue = indexableKeysToKeyValues[doc._id];
if ('value' in keyValue) {
doc.value = keyValue.value;
}
return {rows: []}; // no keys, no need for a lookup
}).then(function (res) {
var kvDocs = res.rows.map(function (row) {
return row.doc;
}).filter(function (row) {
return row;
});
}
}

var indexableKeysToKeyValues = docIdsToEmits[docId];
var oldKeysMap = {};
kvDocs.forEach(function (kvDoc) {
oldKeysMap[kvDoc._id] = true;
kvDoc._deleted = !indexableKeysToKeyValues[kvDoc._id];
if (!kvDoc._deleted) {
var keyValue = indexableKeysToKeyValues[kvDoc._id];
if ('value' in keyValue) {
kvDoc.value = keyValue.value;
}
}
});
var newKeys = Object.keys(indexableKeysToKeyValues);
newKeys.forEach(function (key) {
if (!oldKeysMap[key]) {
// new doc
var kvDoc = {
_id: key
};
var keyValue = indexableKeysToKeyValues[key];
if ('value' in keyValue) {
kvDoc.value = keyValue.value;
}
kvDocs.push(kvDoc);
}
});
metaDoc.keys = utils.uniq(newKeys.concat(metaDoc.keys));
kvDocs.push(metaDoc);

var newKeys = Object.keys(indexableKeysToKeyValues);
newKeys.forEach(function (key) {
if (!oldKeysMap[key]) {
// new doc
var kvDoc = {
_id: key
};
var keyValue = indexableKeysToKeyValues[key];
if ('value' in keyValue) {
kvDoc.value = keyValue.value;
}
kvDocs.push(kvDoc);
}
});
metaDoc.keys = utils.uniq(newKeys.concat(metaDoc.keys));
kvDocs.splice(0, 0, metaDoc);
return kvDocs;
}

return kvDocs;
});
return getMetaDoc().then(function (metaDoc) {
return getKeyValueDocs(metaDoc).then(function (kvDocsRes) {
return processKvDocs(metaDoc, kvDocsRes);
});
});
}

// updates all emitted key/value docs and metaDocs in the mrview database
// for the given batch of documents from the source database
function saveKeyValues(view, docIdsToEmits, seq) {
function saveKeyValues(view, docIdsToChangesAndEmits, seq) {
var seqDocId = '_local/lastSeq';
return view.db.get(seqDocId)
.catch(defaultsTo({_id: seqDocId, seq: 0}))
.then(function (lastSeqDoc) {
var docIds = Object.keys(docIdsToEmits);
var docIds = Object.keys(docIdsToChangesAndEmits);
return Promise.all(docIds.map(function (docId) {
return getDocsToPersist(docId, view, docIdsToEmits);
})).then(function (listOfDocsToPersist) {
var docsToPersist = [];
listOfDocsToPersist.forEach(function (docList) {
docsToPersist = docsToPersist.concat(docList);
});

// update the seq doc last, so that if a meteor strikes the user's
// computer in the middle of an update, we can apply the idempotent
// batch update operation again
lastSeqDoc.seq = seq;
docsToPersist.push(lastSeqDoc);

return view.db.bulkDocs({docs : docsToPersist});
});
return getDocsToPersist(docId, view, docIdsToChangesAndEmits);
})).then(function (listOfDocsToPersist) {
var docsToPersist = utils.flatten(listOfDocsToPersist);
lastSeqDoc.seq = seq;
docsToPersist.push(lastSeqDoc);
// write all docs in a single operation, update the seq once
return view.db.bulkDocs({docs : docsToPersist});
});
});
}

Expand Down Expand Up @@ -390,9 +406,9 @@ function updateViewInQueue(view) {

var currentSeq = view.seq || 0;

function processChange(docIdsToEmits, seq) {
function processChange(docIdsToChangesAndEmits, seq) {
return function () {
return saveKeyValues(view, docIdsToEmits, seq);
return saveKeyValues(view, docIdsToChangesAndEmits, seq);
};
}

Expand All @@ -412,14 +428,15 @@ function updateViewInQueue(view) {
view.sourceDB.changes({
conflicts: true,
include_docs: true,
style: 'all_docs',
since: currentSeq,
limit: CHANGES_BATCH_SIZE
}).on('complete', function (response) {
var results = response.results;
if (!results.length) {
return complete();
}
var docIdsToEmits = {};
var docIdsToChangesAndEmits = {};
for (var i = 0, l = results.length; i < l; i++) {
var change = results[i];
if (change.doc._id[0] !== '_') {
Expand All @@ -443,11 +460,14 @@ function updateViewInQueue(view) {
indexableKeysToKeyValues[indexableKey] = obj;
lastKey = obj.key;
}
docIdsToEmits[change.doc._id] = indexableKeysToKeyValues;
docIdsToChangesAndEmits[change.doc._id] = {
indexableKeysToKeyValues: indexableKeysToKeyValues,
changes: change.changes
};
}
currentSeq = change.seq;
}
queue.add(processChange(docIdsToEmits, currentSeq));
queue.add(processChange(docIdsToChangesAndEmits, currentSeq));
if (results.length < CHANGES_BATCH_SIZE) {
return complete();
}
Expand Down
102 changes: 102 additions & 0 deletions test/test.js
Expand Up @@ -820,6 +820,108 @@ function tests(dbName, dbType, viewType) {
});
});

it('#242 conflicts at the root level', function () {
var db = new Pouch(dbName);

return db.bulkDocs([
{
foo: '1',
_id: 'foo',
_rev: '1-w',
_revisions: {start: 1, ids: ['w']}
}
], {new_edits: false}).then(function () {
return createView(db, {
map: function (doc) {
emit(doc.foo);
}
}).then(function (queryFun) {
return db.query(queryFun).then(function (res) {
res.rows[0].key.should.equal('1');
return db.bulkDocs([
{
foo: '2',
_id: 'foo',
_rev: '1-x',
_revisions: {start: 1, ids: ['x']}
}
], {new_edits: false}).then(function () {
return db.query(queryFun);
}).then(function (res) {
res.rows[0].key.should.equal('2');
return db.bulkDocs([
{
foo: '3',
_id: 'foo',
_rev: '1-y',
_deleted: true,
_revisions: {start: 1, ids: ['y']}
}
], {new_edits: false});
}).then(function () {
return db.query(queryFun);
}).then(function (res) {
res.rows[0].key.should.equal('2');
});
});
});
});
});

it('#242 conflicts at the root+1 level', function () {
var db = new Pouch(dbName);

return db.bulkDocs([
{
foo: '2',
_id: 'foo',
_rev: '1-x',
_revisions: {start: 1, ids: ['x']}
},
{
foo: '3',
_id: 'foo',
_rev: '2-y',
_deleted: true,
_revisions: {start: 2, ids: ['y', 'x']}
}

], {new_edits: false}).then(function () {
return createView(db, {
map: function (doc) {
emit(doc.foo);
}
}).then(function (queryFun) {
return db.query(queryFun).then(function (res) {
res.rows.length.should.equal(0);
return db.bulkDocs([
{
foo: '1',
_id: 'foo',
_rev: '1-w',
_revisions: {start: 1, ids: ['w']}
}
], {new_edits: false}).then(function () {
return db.query(queryFun);
}).then(function (res) {
res.rows[0].key.should.equal('1');
return db.bulkDocs([
{
foo: '4',
_id: 'foo',
_rev: '1-z',
_revisions: {start: 1, ids: ['z']}
}
], {new_edits: false});
}).then(function () {
return db.query(queryFun);
}).then(function (res) {
res.rows[0].key.should.equal('4');
});
});
});
});
});

it('Views should include _conflicts', function () {
var db2name = 'test2' + Math.random();
Expand Down
8 changes: 8 additions & 0 deletions utils.js
Expand Up @@ -67,6 +67,14 @@ exports.sequentialize = function (queue, promiseFactory) {
};
};

exports.flatten = function (arrs) {
var res = [];
for (var i = 0, len = arrs.length; i < len; i++) {
res = res.concat(arrs[i]);
}
return res;
};

// uniq an array of strings, order not guaranteed
// similar to underscore/lodash _.uniq
exports.uniq = function (arr) {
Expand Down

0 comments on commit 514a3cb

Please sign in to comment.