diff --git a/lib/replicate.js b/lib/replicate.js index fa3e960224..2c0ffff50e 100644 --- a/lib/replicate.js +++ b/lib/replicate.js @@ -5,12 +5,72 @@ var EE = require('events').EventEmitter; var MAX_SIMULTANEOUS_REVS = 50; +function randomNumber(min, max) { + min = parseInt(min, 10); + max = parseInt(max, 10); + if (min !== min) { + min = 0; + } + if (max !== max || max <= min) { + max = (min || 1) << 1; //doubling + } else { + max = max + 1; + } + var ratio = Math.random(); + var range = max - min; + + return ~~(range * ratio + min); // ~~ coerces to an int, but fast. +} + +function defaultBackOff(min) { + var max = 0; + if (!min) { + max = 2000; + } + return randomNumber(min, max); +} + +function backOff(repId, src, target, opts, returnValue, result, error) { + if (opts.retry === false) { + returnValue.emit('error', error); + returnValue.removeAllListeners(); + return; + } + opts.default_back_off = opts.default_back_off || 0; + opts.retries = opts.retries || 0; + if (typeof opts.back_off_function !== 'function') { + opts.back_off_function = defaultBackOff; + } + opts.retries++; + if (opts.max_retries && opts.retries > opts.max_retries) { + returnValue.emit('error', new Error('tried ' + + opts.retries + ' times but replication failed')); + returnValue.removeAllListeners(); + return; + } + returnValue.emit('requestError', error); + if (returnValue.state === 'active') { + returnValue.emit('syncStopped'); + returnValue.state = 'stopped'; + returnValue.once('syncRestarted', function () { + opts.current_back_off = opts.default_back_off; + }); + } + + opts.current_back_off = opts.current_back_off || opts.default_back_off; + opts.current_back_off = opts.back_off_function(opts.current_back_off); + setTimeout(function () { + replicate(repId, src, target, opts, returnValue); + }, opts.current_back_off); +} + // We create a basic promise so the caller can cancel the replication possibly // before we have actually started listening to changes etc utils.inherits(Replication, EE); function Replication(opts) { EE.call(this); this.cancelled = false; + this.state = 'pending'; var self = this; var promise = new utils.Promise(function (fulfill, reject) { self.once('complete', fulfill); @@ -25,15 +85,26 @@ function Replication(opts) { // As we allow error handling via "error" event as well, // put a stub in here so that rejecting never throws UnhandledError. self.catch(function (err) {}); + } Replication.prototype.cancel = function () { this.cancelled = true; + this.state = 'cancelled'; this.emit('cancel'); }; Replication.prototype.ready = function (src, target) { var self = this; + this.once('change', function () { + if (this.state === 'pending') { + self.state = 'active'; + self.emit('syncStarted'); + } else if (self.state === 'stopped') { + self.state = 'active'; + self.emit('syncRestarted'); + } + }); function onDestroy() { self.cancel(); } @@ -146,7 +217,7 @@ Checkpointer.prototype.getCheckpoint = function () { return 0; }); }; -function replicate(repId, src, target, opts, returnValue) { +function replicate(repId, src, target, opts, returnValue, result) { var batches = []; // list of batches to be processed var currentBatch; // the batch currently being processed var pendingBatch = { @@ -163,8 +234,11 @@ function replicate(repId, src, target, opts, returnValue) { var batches_limit = opts.batches_limit || 10; var changesPending = false; // true while src.changes is running var doc_ids = opts.doc_ids; - var checkpointer = new Checkpointer(src, target, repId, returnValue); - var result = { + var state = { + cancelled: false + }; + var checkpointer = new Checkpointer(src, target, repId, state); + result = result || { ok: true, start_time: new Date(), docs_read: 0, @@ -175,7 +249,6 @@ function replicate(repId, src, target, opts, returnValue) { var changesOpts = {}; returnValue.ready(src, target); - function writeDocs() { if (currentBatch.docs.length === 0) { return; @@ -186,7 +259,7 @@ function replicate(repId, src, target, opts, returnValue) { }, { new_edits: false }).then(function (res) { - if (returnValue.cancelled) { + if (state.cancelled) { completeReplication(); throw new Error('cancelled'); } @@ -232,7 +305,7 @@ function replicate(repId, src, target, opts, returnValue) { return src.get(id, {revs: true, open_revs: missing, attachments: true}) .then(function (docs) { docs.forEach(function (doc) { - if (returnValue.cancelled) { + if (state.cancelled) { return completeReplication(); } if (doc.ok) { @@ -266,7 +339,7 @@ function replicate(repId, src, target, opts, returnValue) { keys: ids, include_docs: true }).then(function (res) { - if (returnValue.cancelled) { + if (state.cancelled) { completeReplication(); throw (new Error('cancelled')); } @@ -298,7 +371,7 @@ function replicate(repId, src, target, opts, returnValue) { currentBatch.seq ).then(function (res) { writingCheckpoint = false; - if (returnValue.cancelled) { + if (state.cancelled) { completeReplication(); throw new Error('cancelled'); } @@ -322,7 +395,7 @@ function replicate(repId, src, target, opts, returnValue) { }); }); return target.revsDiff(diff).then(function (diffs) { - if (returnValue.cancelled) { + if (state.cancelled) { completeReplication(); throw new Error('cancelled'); } @@ -334,7 +407,7 @@ function replicate(repId, src, target, opts, returnValue) { function startNextBatch() { - if (returnValue.cancelled || currentBatch) { + if (state.cancelled || currentBatch) { return; } if (batches.length === 0) { @@ -386,7 +459,7 @@ function replicate(repId, src, target, opts, returnValue) { return; } result.ok = false; - result.status = 'aborted'; + result.status = 'aborting'; result.errors.push(err); batches = []; pendingBatch = { @@ -402,7 +475,7 @@ function replicate(repId, src, target, opts, returnValue) { if (replicationCompleted) { return; } - if (returnValue.cancelled) { + if (state.cancelled) { result.status = 'cancelled'; if (writingCheckpoint) { return; @@ -411,7 +484,7 @@ function replicate(repId, src, target, opts, returnValue) { result.status = result.status || 'complete'; result.end_time = new Date(); result.last_seq = last_seq; - replicationCompleted = returnValue.cancelled = true; + replicationCompleted = state.cancelled = true; var non403s = result.errors.filter(function (error) { return error.name !== 'unauthorized' && error.name !== 'forbidden'; }); @@ -421,16 +494,16 @@ function replicate(repId, src, target, opts, returnValue) { error.other_errors = result.errors; } error.result = result; - returnValue.emit('error', error); + backOff(repId, src, target, opts, returnValue, result, error); } else { returnValue.emit('complete', result); + returnValue.removeAllListeners(); } - returnValue.removeAllListeners(); } function onChange(change) { - if (returnValue.cancelled) { + if (state.cancelled) { return completeReplication(); } if ( @@ -448,7 +521,7 @@ function replicate(repId, src, target, opts, returnValue) { function onChangesComplete(changes) { changesPending = false; - if (returnValue.cancelled) { + if (state.cancelled) { return completeReplication(); } if (changesOpts.since < changes.last_seq) { @@ -468,7 +541,7 @@ function replicate(repId, src, target, opts, returnValue) { function onChangesError(err) { changesPending = false; - if (returnValue.cancelled) { + if (state.cancelled) { return completeReplication(); } abortReplication('changes rejected', err); @@ -542,7 +615,7 @@ function replicate(repId, src, target, opts, returnValue) { writingCheckpoint = true; checkpointer.writeCheckpoint(opts.since).then(function (res) { writingCheckpoint = false; - if (returnValue.cancelled) { + if (state.cancelled) { completeReplication(); return; } @@ -583,6 +656,7 @@ function replicateWrapper(src, target, opts, callback) { } opts = utils.clone(opts); opts.continuous = opts.continuous || opts.live; + opts.retry = opts.retry || false; /*jshint validthis:true */ opts.PouchConstructor = opts.PouchConstructor || this; var replicateRet = new Replication(opts); diff --git a/tests/test.replication.js b/tests/test.replication.js index 700cb1d71d..208b8433b2 100644 --- a/tests/test.replication.js +++ b/tests/test.replication.js @@ -2352,6 +2352,7 @@ adapters.forEach(function (adapters) { }); }); }); + it('should cancel for live replication', function (done) { var remote = new PouchDB(dbs.remote); var db = new PouchDB(dbs.name); @@ -2372,9 +2373,66 @@ adapters.forEach(function (adapters) { }); } }); - remote.put({}, 'gazaa'); + remote.put({}, 'hazaa'); + }); + it('retry stuff', function (done) { + var remote = new PouchDB(dbs.remote); + var Promise = PouchDB.utils.Promise; + var allDocs = remote.allDocs; + var i = 0; + var started = 0; + remote.allDocs = function (opts) { + if (opts.keys[0] === 'foo') { + i++; + if (i !== 3) { + return Promise.reject(new Error('flunking you')); + } + } + return allDocs.apply(remote, arguments); + }; + var db = new PouchDB(dbs.name); + var rep = db.replicate.from(remote, { + live: true, + retry: true + }); + rep.once('syncStopped', function () { + i.should.equal(1, 'sync stopped event'); + started.should.equal(1, 'sync stopped event'); + started++; + }); + rep.on('syncRestarted', function () { + i.should.equal(3, 'sync restarted event'); + started.should.equal(2, 'sync stopped event'); + started++; + }); + rep.on('syncStarted', function () { + i.should.equal(0, 'sync started event'); + started.should.equal(0, 'sync started event'); + started++; + }); + rep.catch(done); + var called = 3; + rep.on('change', function () { + if ((--called) === 2) { + remote.put({}, 'foo').then(function () { + return remote.put({}, 'bar'); + }); + } else if (!called) { + rep.cancel(); + remote.put({}, 'foo2').then(function () { + return remote.put({}, 'bar2'); + }).then(function () { + setTimeout(function () { + started.should.equal(3, 'everything was emitted'); + done(); + }, 500); + }); + } else if (called < 0) { + done(new Error('called too many times')); + } + }); + remote.put({}, 'hazaa'); }); - if (adapters[1] === 'http') { // test validate_doc_update, which is a reasonable substitute // for testing design doc replication of non-admin users, since we