Skip to content

Commit

Permalink
ADD #763 RxReplicationState.denied$
Browse files Browse the repository at this point in the history
  • Loading branch information
pubkey committed Sep 2, 2018
1 parent a5b6f82 commit 04e7ddd
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 56 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Breaking:
Features:
- Added `RxDocument.atomicSet()`
- Added `RxCollection.awaitPersistence()` for in-memory-collections
- Added `RxReplicationState.denied$` [#763](https://github.com/pubkey/rxdb/issues/763)

Bugfixes:
- checkAdapter doesn't cleanup test databases [#714](https://github.com/pubkey/rxdb/issues/714)
Expand Down
7 changes: 7 additions & 0 deletions docs-src/replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ Emits each replicated document-data.
replicationState.docs$.subscribe(docData => console.dir(docData));
```

### denied$
Emits when a document failed to replicate (e.g. due to permissions).

```js
replicationState.denied$.subscribe(docData => console.dir(docData));
```

### active$
Emits `true` or `false` depending if the replication is running. For example if you sync with a remote server and the connection dies, this is `false` until the connection can be reestablished.

Expand Down
4 changes: 0 additions & 4 deletions src/plugins/in-memory.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,7 @@ export class InMemoryRxCollection extends RxCollection.RxCollection {
* @overwrite
*/
async _pouchPut(obj, overwrite) {
console.log('new pouch put');
const ret = await this._oldPouchPut(obj, overwrite);
console.log('Ret');
console.dir(ret);

this._nonPersistentRevisions.add(ret.rev);
return ret;
}
Expand Down
59 changes: 33 additions & 26 deletions src/plugins/replication.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export class RxReplicationState {
this._subjects = {
change: new Subject(),
docs: new Subject(),
denied: new Subject(),
active: new BehaviorSubject(false),
complete: new BehaviorSubject(false),
error: new Subject(),
Expand All @@ -46,7 +47,7 @@ export class RxReplicationState {
// create getters
Object.keys(this._subjects).forEach(key => {
Object.defineProperty(this, key + '$', {
get: function () {
get: function() {
return this._subjects[key].asObservable();
}
});
Expand All @@ -60,54 +61,60 @@ export class RxReplicationState {
// change
this._subs.push(
fromEvent(evEmitter, 'change')
.subscribe(ev => this._subjects.change.next(ev))
.subscribe(ev => this._subjects.change.next(ev))
);

// denied
this._subs.push(
fromEvent(evEmitter, 'denied')
.subscribe(ev => this._subjects.denied.next(ev))
);

// docs
this._subs.push(
fromEvent(evEmitter, 'change')
.subscribe(ev => {
if (
this._subjects.docs.observers.length === 0 ||
ev.direction !== 'pull'
) return;

ev.change.docs
.filter(doc => doc.language !== 'query') // remove internal docs
.map(doc => this.collection._handleFromPouch(doc)) // do primary-swap and keycompression
.forEach(doc => this._subjects.docs.next(doc));
}));
.subscribe(ev => {
if (
this._subjects.docs.observers.length === 0 ||
ev.direction !== 'pull'
) return;

ev.change.docs
.filter(doc => doc.language !== 'query') // remove internal docs
.map(doc => this.collection._handleFromPouch(doc)) // do primary-swap and keycompression
.forEach(doc => this._subjects.docs.next(doc));
}));

// error
this._subs.push(
fromEvent(evEmitter, 'error')
.subscribe(ev => this._subjects.error.next(ev))
.subscribe(ev => this._subjects.error.next(ev))
);

// active
this._subs.push(
fromEvent(evEmitter, 'active')
.subscribe(() => this._subjects.active.next(true))
.subscribe(() => this._subjects.active.next(true))
);
this._subs.push(
fromEvent(evEmitter, 'paused')
.subscribe(() => this._subjects.active.next(false))
.subscribe(() => this._subjects.active.next(false))
);

// complete
this._subs.push(
fromEvent(evEmitter, 'complete')
.subscribe(info => {
.subscribe(info => {

/**
* when complete fires, it might be that not all changeEvents
* have passed throught, because of the delay of .wachtForChanges()
* Therefore we have to first ensure that all previous changeEvents have been handled
*/
const unhandledEvents = Array.from(this.collection._watchForChangesUnhandled);
/**
* when complete fires, it might be that not all changeEvents
* have passed throught, because of the delay of .wachtForChanges()
* Therefore we have to first ensure that all previous changeEvents have been handled
*/
const unhandledEvents = Array.from(this.collection._watchForChangesUnhandled);

Promise.all(unhandledEvents).then(() => this._subjects.complete.next(info));
})
Promise.all(unhandledEvents).then(() => this._subjects.complete.next(info));
})
);
}

Expand Down Expand Up @@ -252,7 +259,7 @@ export const prototypes = {
export const overwritable = {};

export const hooks = {
createRxCollection: function (collection) {
createRxCollection: function(collection) {
INTERNAL_POUCHDBS.add(collection.pouch);
}
};
Expand Down
18 changes: 18 additions & 0 deletions test/unit/replication.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,24 @@ describe('replication.test.js', () => {
await AsyncTestUtil.waitUntil(() => emitedDocs.length === 10);
emitedDocs.forEach(doc => assert.ok(doc.firstName));

c.database.destroy();
c2.database.destroy();
});
});
describe('denied$', () => {
it('should not emit', async () => {
const c = await humansCollection.create(0);
const c2 = await humansCollection.create(10);
const repState = await c.sync({
remote: c2,
waitForLeadership: false
});
const emitted = [];
repState.denied$.subscribe(doc => emitted.push(doc));

await AsyncTestUtil.wait(100);
assert.equal(emitted.length, 0);

c.database.destroy();
c2.database.destroy();
});
Expand Down
31 changes: 31 additions & 0 deletions typings/plugins/replication.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { Observable } from 'rxjs';

import { RxQuery } from '../rx-query.d';
import {
PouchReplicationOptions
} from '../pouch';

export declare class RxReplicationState {
change$: Observable<any>;
docs$: Observable<any>;
denied$: Observable<any>;
active$: Observable<any>;
complete$: Observable<any>;
error$: Observable<any>;
cancel(): Promise<any>;

// if you do a custom sync, put the thing you get back from pouch here
setPouchEventEmitter(pouchSyncState: any): void;
}

export interface SyncOptions {
remote: string | any,
waitForLeadership?: boolean,
direction?: {
push?: boolean,
pull?: boolean
},
// for options see https://pouchdb.com/api.html#replication
options?: PouchReplicationOptions,
query?: RxQuery<any, any>
}
32 changes: 6 additions & 26 deletions typings/rx-collection.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ import {
RxQuery
} from './rx-query';
import {
PouchSettings,
PouchReplicationOptions
PouchSettings
} from './pouch';
import {
RxChangeEventInsert,
Expand All @@ -28,6 +27,11 @@ import {
RxLocalDocument
} from './rx-document';

import {
SyncOptions,
RxReplicationState
} from './plugins/replication';

export interface RxCollectionCreator {
name: string;
schema: RxJsonSchema;
Expand All @@ -48,30 +52,6 @@ export interface RxCollectionCreator {
options?: any;
}

export declare class RxReplicationState {
change$: Observable<any>;
docs$: Observable<any>;
active$: Observable<any>;
complete$: Observable<any>;
error$: Observable<any>;
cancel(): Promise<any>;

// if you do a custom sync, put the thing you get back from pouch here
setPouchEventEmitter(pouchSyncState: any): void;
}

export interface SyncOptions {
remote: string | any,
waitForLeadership?: boolean,
direction?: {
push?: boolean,
pull?: boolean
},
// for options see https://pouchdb.com/api.html#replication
options?: PouchReplicationOptions,
query?: RxQuery<any, any>
}

export declare class RxCollection<RxDocumentType, OrmMethods = {}> {
readonly database: RxDatabase;
readonly name: string;
Expand Down

0 comments on commit 04e7ddd

Please sign in to comment.