Skip to content

Commit

Permalink
Merge pull request #197 from natew/master
Browse files Browse the repository at this point in the history
Split out watchForChanges to support external replications
  • Loading branch information
pubkey committed Jun 5, 2017
2 parents 85ba78a + 7642c56 commit d7a6733
Showing 1 changed file with 48 additions and 41 deletions.
89 changes: 48 additions & 41 deletions src/RxCollection.js
Original file line number Diff line number Diff line change
Expand Up @@ -372,54 +372,40 @@ class RxCollection {
return Promise.all(importFns);
}


/**
* because it will have document-conflicts when 2 syncs write to the same storage
*/
async sync(serverURL, alsoIfNotLeader = false) {

if (typeof this.pouch.sync !== 'function') {
throw new Error(
`RxCollection.sync needs 'pouchdb-replication'. Code:
RxDB.plugin(require('pouchdb-replication')); `
);
}

if (!alsoIfNotLeader)
await this.database.waitForLeadership();

watchForChanges() {
if (!this.synced) {
/**
* this will grap the changes and publish them to the rx-stream
* this is to ensure that changes from 'synced' dbs will be published
*/
const sendChanges = {};
const pouch$ = util.Rx.Observable
.fromEvent(
this.pouch.changes({
since: 'now',
live: true,
include_docs: true
}), 'change')
.filter(c => c.id.charAt(0) != '_')
.map(c => c.doc)
.map(doc => {
doc._ext = true;
return doc;
const pouch$ = util.Rx.Observable.fromEvent(
this.pouch.changes({
since: 'now',
live: true,
include_docs: true
})
.filter(doc => !this._changeEventBuffer.buffer.map(cE => cE.data.v._rev).includes(doc._rev))
.filter(doc => sendChanges[doc._rev] = 'YES')
.delay(10)
.map(doc => {
let ret = null;
if (sendChanges[doc._rev] == 'YES') ret = doc;
delete sendChanges[doc._rev];
return ret;
})
.filter(doc => doc != null)
.subscribe(doc => {
this.$emit(RxChangeEvent.fromPouchChange(doc, this));
});
, 'change'
)
.filter(c => c.id.charAt(0) != '_')
.map(c => c.doc)
.map(doc => {
doc._ext = true;
return doc;
})
.filter(doc => sendChanges[doc._rev] = 'YES')
.delay(10)
.map(doc => {
let ret = null;
if (sendChanges[doc._rev] == 'YES') ret = doc;
delete sendChanges[doc._rev];
return ret;
})
.filter(doc => doc != null)
.subscribe(doc => {
this.$emit(RxChangeEvent.fromPouchChange(doc, this));
});

this._subs.push(pouch$);

const ob2 = this.$
Expand All @@ -430,7 +416,28 @@ class RxCollection {
.subscribe();
this._subs.push(ob2);
}

this.synced = true;
}


/**
* because it will have document-conflicts when 2 syncs write to the same storage
*/
async sync(serverURL, alsoIfNotLeader = false) {

if (typeof this.pouch.sync !== 'function') {
throw new Error(
`RxCollection.sync needs 'pouchdb-replication'. Code:
RxDB.plugin(require('pouchdb-replication')); `
);
}

if (!alsoIfNotLeader)
await this.database.waitForLeadership();

this.watchForChanges();

const sync = this.pouch.sync(serverURL, {
live: true,
retry: true
Expand Down

0 comments on commit d7a6733

Please sign in to comment.