Skip to content

Commit

Permalink
(#2768) - basic opt-in replication - retry
Browse files Browse the repository at this point in the history
  • Loading branch information
calvinmetcalf authored and daleharvey committed Sep 29, 2014
1 parent 976c606 commit 47d105e
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 21 deletions.
112 changes: 93 additions & 19 deletions lib/replicate.js
Expand Up @@ -5,12 +5,72 @@ var EE = require('events').EventEmitter;

var MAX_SIMULTANEOUS_REVS = 50;

function randomNumber(min, max) {
min = parseInt(min, 10);
max = parseInt(max, 10);
if (min !== min) {
min = 0;
}
if (max !== max || max <= min) {
max = (min || 1) << 1; //doubling
} else {
max = max + 1;
}
var ratio = Math.random();
var range = max - min;

return ~~(range * ratio + min); // ~~ coerces to an int, but fast.
}

function defaultBackOff(min) {
var max = 0;
if (!min) {
max = 2000;
}
return randomNumber(min, max);
}

function backOff(repId, src, target, opts, returnValue, result, error) {
if (opts.retry === false) {
returnValue.emit('error', error);
returnValue.removeAllListeners();
return;
}
opts.default_back_off = opts.default_back_off || 0;
opts.retries = opts.retries || 0;
if (typeof opts.back_off_function !== 'function') {
opts.back_off_function = defaultBackOff;
}
opts.retries++;
if (opts.max_retries && opts.retries > opts.max_retries) {
returnValue.emit('error', new Error('tried ' +
opts.retries + ' times but replication failed'));
returnValue.removeAllListeners();
return;
}
returnValue.emit('requestError', error);
if (returnValue.state === 'active') {
returnValue.emit('syncStopped');
returnValue.state = 'stopped';
returnValue.once('syncRestarted', function () {
opts.current_back_off = opts.default_back_off;
});
}

opts.current_back_off = opts.current_back_off || opts.default_back_off;
opts.current_back_off = opts.back_off_function(opts.current_back_off);
setTimeout(function () {
replicate(repId, src, target, opts, returnValue);
}, opts.current_back_off);
}

// We create a basic promise so the caller can cancel the replication possibly
// before we have actually started listening to changes etc
utils.inherits(Replication, EE);
function Replication(opts) {
EE.call(this);
this.cancelled = false;
this.state = 'pending';
var self = this;
var promise = new utils.Promise(function (fulfill, reject) {
self.once('complete', fulfill);
Expand All @@ -25,15 +85,26 @@ function Replication(opts) {
// As we allow error handling via "error" event as well,
// put a stub in here so that rejecting never throws UnhandledError.
self.catch(function (err) {});

}

Replication.prototype.cancel = function () {
this.cancelled = true;
this.state = 'cancelled';
this.emit('cancel');
};

Replication.prototype.ready = function (src, target) {
var self = this;
this.once('change', function () {
if (this.state === 'pending') {
self.state = 'active';
self.emit('syncStarted');
} else if (self.state === 'stopped') {
self.state = 'active';
self.emit('syncRestarted');
}
});
function onDestroy() {
self.cancel();
}
Expand Down Expand Up @@ -146,7 +217,7 @@ Checkpointer.prototype.getCheckpoint = function () {
return 0;
});
};
function replicate(repId, src, target, opts, returnValue) {
function replicate(repId, src, target, opts, returnValue, result) {
var batches = []; // list of batches to be processed
var currentBatch; // the batch currently being processed
var pendingBatch = {
Expand All @@ -163,8 +234,11 @@ function replicate(repId, src, target, opts, returnValue) {
var batches_limit = opts.batches_limit || 10;
var changesPending = false; // true while src.changes is running
var doc_ids = opts.doc_ids;
var checkpointer = new Checkpointer(src, target, repId, returnValue);
var result = {
var state = {
cancelled: false
};
var checkpointer = new Checkpointer(src, target, repId, state);
result = result || {
ok: true,
start_time: new Date(),
docs_read: 0,
Expand All @@ -175,7 +249,6 @@ function replicate(repId, src, target, opts, returnValue) {
var changesOpts = {};
returnValue.ready(src, target);


function writeDocs() {
if (currentBatch.docs.length === 0) {
return;
Expand All @@ -186,7 +259,7 @@ function replicate(repId, src, target, opts, returnValue) {
}, {
new_edits: false
}).then(function (res) {
if (returnValue.cancelled) {
if (state.cancelled) {
completeReplication();
throw new Error('cancelled');
}
Expand Down Expand Up @@ -232,7 +305,7 @@ function replicate(repId, src, target, opts, returnValue) {
return src.get(id, {revs: true, open_revs: missing, attachments: true})
.then(function (docs) {
docs.forEach(function (doc) {
if (returnValue.cancelled) {
if (state.cancelled) {
return completeReplication();
}
if (doc.ok) {
Expand Down Expand Up @@ -266,7 +339,7 @@ function replicate(repId, src, target, opts, returnValue) {
keys: ids,
include_docs: true
}).then(function (res) {
if (returnValue.cancelled) {
if (state.cancelled) {
completeReplication();
throw (new Error('cancelled'));
}
Expand Down Expand Up @@ -298,7 +371,7 @@ function replicate(repId, src, target, opts, returnValue) {
currentBatch.seq
).then(function (res) {
writingCheckpoint = false;
if (returnValue.cancelled) {
if (state.cancelled) {
completeReplication();
throw new Error('cancelled');
}
Expand All @@ -322,7 +395,7 @@ function replicate(repId, src, target, opts, returnValue) {
});
});
return target.revsDiff(diff).then(function (diffs) {
if (returnValue.cancelled) {
if (state.cancelled) {
completeReplication();
throw new Error('cancelled');
}
Expand All @@ -334,7 +407,7 @@ function replicate(repId, src, target, opts, returnValue) {


function startNextBatch() {
if (returnValue.cancelled || currentBatch) {
if (state.cancelled || currentBatch) {
return;
}
if (batches.length === 0) {
Expand Down Expand Up @@ -386,7 +459,7 @@ function replicate(repId, src, target, opts, returnValue) {
return;
}
result.ok = false;
result.status = 'aborted';
result.status = 'aborting';
result.errors.push(err);
batches = [];
pendingBatch = {
Expand All @@ -402,7 +475,7 @@ function replicate(repId, src, target, opts, returnValue) {
if (replicationCompleted) {
return;
}
if (returnValue.cancelled) {
if (state.cancelled) {
result.status = 'cancelled';
if (writingCheckpoint) {
return;
Expand All @@ -411,7 +484,7 @@ function replicate(repId, src, target, opts, returnValue) {
result.status = result.status || 'complete';
result.end_time = new Date();
result.last_seq = last_seq;
replicationCompleted = returnValue.cancelled = true;
replicationCompleted = state.cancelled = true;
var non403s = result.errors.filter(function (error) {
return error.name !== 'unauthorized' && error.name !== 'forbidden';
});
Expand All @@ -421,16 +494,16 @@ function replicate(repId, src, target, opts, returnValue) {
error.other_errors = result.errors;
}
error.result = result;
returnValue.emit('error', error);
backOff(repId, src, target, opts, returnValue, result, error);
} else {
returnValue.emit('complete', result);
returnValue.removeAllListeners();
}
returnValue.removeAllListeners();
}


function onChange(change) {
if (returnValue.cancelled) {
if (state.cancelled) {
return completeReplication();
}
if (
Expand All @@ -448,7 +521,7 @@ function replicate(repId, src, target, opts, returnValue) {

function onChangesComplete(changes) {
changesPending = false;
if (returnValue.cancelled) {
if (state.cancelled) {
return completeReplication();
}
if (changesOpts.since < changes.last_seq) {
Expand All @@ -468,7 +541,7 @@ function replicate(repId, src, target, opts, returnValue) {

function onChangesError(err) {
changesPending = false;
if (returnValue.cancelled) {
if (state.cancelled) {
return completeReplication();
}
abortReplication('changes rejected', err);
Expand Down Expand Up @@ -542,7 +615,7 @@ function replicate(repId, src, target, opts, returnValue) {
writingCheckpoint = true;
checkpointer.writeCheckpoint(opts.since).then(function (res) {
writingCheckpoint = false;
if (returnValue.cancelled) {
if (state.cancelled) {
completeReplication();
return;
}
Expand Down Expand Up @@ -583,6 +656,7 @@ function replicateWrapper(src, target, opts, callback) {
}
opts = utils.clone(opts);
opts.continuous = opts.continuous || opts.live;
opts.retry = opts.retry || false;
/*jshint validthis:true */
opts.PouchConstructor = opts.PouchConstructor || this;
var replicateRet = new Replication(opts);
Expand Down
62 changes: 60 additions & 2 deletions tests/test.replication.js
Expand Up @@ -2352,6 +2352,7 @@ adapters.forEach(function (adapters) {
});
});
});

it('should cancel for live replication', function (done) {
var remote = new PouchDB(dbs.remote);
var db = new PouchDB(dbs.name);
Expand All @@ -2372,9 +2373,66 @@ adapters.forEach(function (adapters) {
});
}
});
remote.put({}, 'gazaa');
remote.put({}, 'hazaa');
});
it('retry stuff', function (done) {
var remote = new PouchDB(dbs.remote);
var Promise = PouchDB.utils.Promise;
var allDocs = remote.allDocs;
var i = 0;
var started = 0;
remote.allDocs = function (opts) {
if (opts.keys[0] === 'foo') {
i++;
if (i !== 3) {
return Promise.reject(new Error('flunking you'));
}
}
return allDocs.apply(remote, arguments);
};
var db = new PouchDB(dbs.name);
var rep = db.replicate.from(remote, {
live: true,
retry: true
});
rep.once('syncStopped', function () {
i.should.equal(1, 'sync stopped event');
started.should.equal(1, 'sync stopped event');
started++;
});
rep.on('syncRestarted', function () {
i.should.equal(3, 'sync restarted event');
started.should.equal(2, 'sync stopped event');
started++;
});
rep.on('syncStarted', function () {
i.should.equal(0, 'sync started event');
started.should.equal(0, 'sync started event');
started++;
});
rep.catch(done);
var called = 3;
rep.on('change', function () {
if ((--called) === 2) {
remote.put({}, 'foo').then(function () {
return remote.put({}, 'bar');
});
} else if (!called) {
rep.cancel();
remote.put({}, 'foo2').then(function () {
return remote.put({}, 'bar2');
}).then(function () {
setTimeout(function () {
started.should.equal(3, 'everything was emitted');
done();
}, 500);
});
} else if (called < 0) {
done(new Error('called too many times'));
}
});
remote.put({}, 'hazaa');
});

if (adapters[1] === 'http') {
// test validate_doc_update, which is a reasonable substitute
// for testing design doc replication of non-admin users, since we
Expand Down

0 comments on commit 47d105e

Please sign in to comment.