diff --git a/src/RxCollection.js b/src/RxCollection.js index 9a03906fdb6..046f391bbb7 100644 --- a/src/RxCollection.js +++ b/src/RxCollection.js @@ -372,54 +372,40 @@ class RxCollection { return Promise.all(importFns); } - - /** - * because it will have document-conflicts when 2 syncs write to the same storage - */ - async sync(serverURL, alsoIfNotLeader = false) { - - if (typeof this.pouch.sync !== 'function') { - throw new Error( - `RxCollection.sync needs 'pouchdb-replication'. Code: - RxDB.plugin(require('pouchdb-replication')); ` - ); - } - - if (!alsoIfNotLeader) - await this.database.waitForLeadership(); - + watchForChanges() { if (!this.synced) { /** * this will grap the changes and publish them to the rx-stream * this is to ensure that changes from 'synced' dbs will be published */ const sendChanges = {}; - const pouch$ = util.Rx.Observable - .fromEvent( - this.pouch.changes({ - since: 'now', - live: true, - include_docs: true - }), 'change') - .filter(c => c.id.charAt(0) != '_') - .map(c => c.doc) - .map(doc => { - doc._ext = true; - return doc; + const pouch$ = util.Rx.Observable.fromEvent( + this.pouch.changes({ + since: 'now', + live: true, + include_docs: true }) - .filter(doc => !this._changeEventBuffer.buffer.map(cE => cE.data.v._rev).includes(doc._rev)) - .filter(doc => sendChanges[doc._rev] = 'YES') - .delay(10) - .map(doc => { - let ret = null; - if (sendChanges[doc._rev] == 'YES') ret = doc; - delete sendChanges[doc._rev]; - return ret; - }) - .filter(doc => doc != null) - .subscribe(doc => { - this.$emit(RxChangeEvent.fromPouchChange(doc, this)); - }); + , 'change' + ) + .filter(c => c.id.charAt(0) != '_') + .map(c => c.doc) + .map(doc => { + doc._ext = true; + return doc; + }) + .filter(doc => sendChanges[doc._rev] = 'YES') + .delay(10) + .map(doc => { + let ret = null; + if (sendChanges[doc._rev] == 'YES') ret = doc; + delete sendChanges[doc._rev]; + return ret; + }) + .filter(doc => doc != null) + .subscribe(doc => { + this.$emit(RxChangeEvent.fromPouchChange(doc, this)); + }); + this._subs.push(pouch$); const ob2 = this.$ @@ -430,7 +416,28 @@ class RxCollection { .subscribe(); this._subs.push(ob2); } + this.synced = true; + } + + + /** + * because it will have document-conflicts when 2 syncs write to the same storage + */ + async sync(serverURL, alsoIfNotLeader = false) { + + if (typeof this.pouch.sync !== 'function') { + throw new Error( + `RxCollection.sync needs 'pouchdb-replication'. Code: + RxDB.plugin(require('pouchdb-replication')); ` + ); + } + + if (!alsoIfNotLeader) + await this.database.waitForLeadership(); + + this.watchForChanges(); + const sync = this.pouch.sync(serverURL, { live: true, retry: true