Permalink
Browse files

(#6031) - faster IDB changes() with batched cursor

  • Loading branch information...
1 parent 3c9d7f0 commit 620d987a875542f9b0eec58573f9ed85c30bc00f @nolanlawson nolanlawson committed with daleharvey Dec 21, 2016
@@ -27,7 +27,9 @@ import {
openTransactionSafely
} from './utils';
-function idbBulkDocs(dbOpts, req, opts, api, idb, idbChanges, callback) {
+import changesHandler from './changesHandler';
+
+function idbBulkDocs(dbOpts, req, opts, api, idb, callback) {
var docInfos = req.docs;
var txn;
var docStore;
@@ -137,7 +139,7 @@ function idbBulkDocs(dbOpts, req, opts, api, idb, idbChanges, callback) {
return;
}
- idbChanges.notify(api._meta.name);
+ changesHandler.notify(api._meta.name);
api._meta.docCount += docCountDelta;
callback(null, results);
}
@@ -0,0 +1,212 @@
+import changesHandler from './changesHandler';
+import {
+ clone,
+ filterChange,
+ uuid
+} from 'pouchdb-utils';
+import {
+ Map,
+ Set
+} from 'pouchdb-collections';
+import {
+ ATTACH_STORE,
+ BY_SEQ_STORE,
+ DOC_STORE
+} from './constants';
+import {
+ decodeDoc,
+ decodeMetadata,
+ fetchAttachmentsIfNecessary,
+ idbError,
+ postProcessAttachments,
+ openTransactionSafely
+} from './utils';
+import runBatchedCursor from './runBatchedCursor';
+
+function changes(opts, api, dbName, idb) {
+ opts = clone(opts);
+
+ if (opts.continuous) {
+ var id = dbName + ':' + uuid();
+ changesHandler.addListener(dbName, id, api, opts);
+ changesHandler.notify(dbName);
+ return {
+ cancel: function () {
+ changesHandler.removeListener(dbName, id);
+ }
+ };
+ }
+
+ var docIds = opts.doc_ids && new Set(opts.doc_ids);
+
+ opts.since = opts.since || 0;
+ var lastSeq = opts.since;
+
+ var limit = 'limit' in opts ? opts.limit : -1;
+ if (limit === 0) {
+ limit = 1; // per CouchDB _changes spec
+ }
+ var returnDocs;
+ if ('return_docs' in opts) {
+ returnDocs = opts.return_docs;
+ } else if ('returnDocs' in opts) {
+ // TODO: Remove 'returnDocs' in favor of 'return_docs' in a future release
+ returnDocs = opts.returnDocs;
+ } else {
+ returnDocs = true;
+ }
+
+ var results = [];
+ var numResults = 0;
+ var filter = filterChange(opts);
+ var docIdsToMetadata = new Map();
+
+ var txn;
+ var bySeqStore;
+ var docStore;
+ var docIdRevIndex;
+
+ function onBatch(batchKeys, batchValues, cursor) {
+ if (!cursor || !batchKeys.length) { // done
+ return;
+ }
+
+ var winningDocs = new Array(batchKeys.length);
+ var metadatas = new Array(batchKeys.length);
+
+ function processMetadataAndWinningDoc(metadata, winningDoc) {
+ var change = opts.processChange(winningDoc, metadata, opts);
+ lastSeq = change.seq = metadata.seq;
+
+ var filtered = filter(change);
+ if (typeof filtered === 'object') { // anything but true/false indicates error
+ return opts.complete(filtered);
+ }
+
+ if (filtered) {
+ numResults++;
+ if (returnDocs) {
+ results.push(change);
+ }
+ // process the attachment immediately
+ // for the benefit of live listeners
+ if (opts.attachments && opts.include_docs) {
+ fetchAttachmentsIfNecessary(winningDoc, opts, txn, function () {
+ postProcessAttachments([change], opts.binary).then(function () {
+ opts.onChange(change);
+ });
+ });
+ } else {
+ opts.onChange(change);
+ }
+ }
+ }
+
+ function onBatchDone() {
+ for (var i = 0, len = winningDocs.length; i < len; i++) {
+ if (numResults === limit) {
+ break;
+ }
+ var winningDoc = winningDocs[i];
+ if (!winningDoc) {
+ continue;
+ }
+ var metadata = metadatas[i];
+ processMetadataAndWinningDoc(metadata, winningDoc);
+ }
+
+ if (numResults !== limit) {
+ cursor.continue();
+ }
+ }
+
+ // Fetch all metadatas/winningdocs from this batch in parallel, then process
+ // them all only once all data has been collected. This is done in parallel
+ // because it's faster than doing it one-at-a-time.
+ var numDone = 0;
+ batchValues.forEach(function (value, i) {
+ var doc = decodeDoc(value);
+ var seq = batchKeys[i];
+ fetchWinningDocAndMetadata(doc, seq, function (metadata, winningDoc) {
+ metadatas[i] = metadata;
+ winningDocs[i] = winningDoc;
+ if (++numDone === batchKeys.length) {
+ onBatchDone();
+ }
+ });
+ });
+ }
+
+ function onGetMetadata(doc, seq, metadata, cb) {
+ if (metadata.seq !== seq) {
+ // some other seq is later
+ return cb();
+ }
+
+ if (metadata.winningRev === doc._rev) {
+ // this is the winning doc
+ return cb(metadata, doc);
+ }
+
+ // fetch winning doc in separate request
+ var docIdRev = doc._id + '::' + metadata.winningRev;
+ var req = docIdRevIndex.get(docIdRev);
+ req.onsuccess = function (e) {
+ cb(metadata, decodeDoc(e.target.result));
+ };
+ }
+
+ function fetchWinningDocAndMetadata(doc, seq, cb) {
+ if (docIds && !docIds.has(doc._id)) {
+ return cb();
+ }
+
+ var metadata = docIdsToMetadata.get(doc._id);
+ if (metadata) { // cached
+ return onGetMetadata(doc, seq, metadata, cb);
+ }
+ // metadata not cached, have to go fetch it
+ docStore.get(doc._id).onsuccess = function (e) {
+ metadata = decodeMetadata(e.target.result);
+ docIdsToMetadata.set(doc._id, metadata);
+ onGetMetadata(doc, seq, metadata, cb);
+ };
+ }
+
+ function finish() {
+ opts.complete(null, {
+ results: results,
+ last_seq: lastSeq
+ });
+ }
+
+ function onTxnComplete() {
+ if (!opts.continuous && opts.attachments) {
+ // cannot guarantee that postProcessing was already done,
+ // so do it again
+ postProcessAttachments(results).then(finish);
+ } else {
+ finish();
+ }
+ }
+
+ var objectStores = [DOC_STORE, BY_SEQ_STORE];
+ if (opts.attachments) {
+ objectStores.push(ATTACH_STORE);
+ }
+ var txnResult = openTransactionSafely(idb, objectStores, 'readonly');
+ if (txnResult.error) {
+ return opts.complete(txnResult.error);
+ }
+ txn = txnResult.txn;
+ txn.onabort = idbError(opts.complete);
+ txn.oncomplete = onTxnComplete;
+
+ bySeqStore = txn.objectStore(BY_SEQ_STORE);
+ docStore = txn.objectStore(DOC_STORE);
+ docIdRevIndex = bySeqStore.index('_doc_id_rev');
+
+ runBatchedCursor(bySeqStore, opts.since, opts.descending, limit, onBatch);
+}
+
+export default changes;
@@ -0,0 +1,2 @@
+import { changesHandler as Changes } from 'pouchdb-utils';
+export default new Changes();
Oops, something went wrong.

0 comments on commit 620d987

Please sign in to comment.