Skip to content

Commit

Permalink
refactor(nx): rxjs lettable operators
Browse files Browse the repository at this point in the history
  • Loading branch information
Manduro authored and vsavkin committed Nov 17, 2017
1 parent 058c899 commit 715efa4
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 61 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
"karma-jasmine": "~1.1.0",
"karma-webpack": "2.0.4",
"prettier": "1.7.4",
"rxjs": "5.4.3",
"rxjs": "5.5.2",
"tslint": "5.7.0",
"typescript": "2.5.3"
},
Expand Down
6 changes: 4 additions & 2 deletions packages/nx/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
"description": "Nrwl Extensions for Angular",
"main": "index.js",
"types": "index.d.js",
"dependencies" :{
"dependencies": {
"jasmine-marbles": "0.1.0"
},
"peerDependencies" :{},
"peerDependencies": {
"rxjs": "^5.5.0"
},
"author": "Victor Savkin",
"license": "MIT"
}
8 changes: 3 additions & 5 deletions packages/nx/spec/data-persistence.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import 'rxjs/add/operator/delay';

import { Component, Injectable } from '@angular/core';
import { ComponentFixture, fakeAsync, TestBed, tick } from '@angular/core/testing';
import { ActivatedRouteSnapshot, Router } from '@angular/router';
Expand All @@ -11,7 +9,7 @@ import { Store, StoreModule } from '@ngrx/store';
import { Observable } from 'rxjs/Observable';
import { of } from 'rxjs/observable/of';
import { _throw } from 'rxjs/observable/throw';
import { delay } from 'rxjs/operator/delay';
import { delay } from 'rxjs/operators';
import { Subject } from 'rxjs/Subject';

import { DataPersistence } from '../index';
Expand Down Expand Up @@ -249,7 +247,7 @@ describe('DataPersistence', () => {
return of({
type: 'TODOS',
payload: { user: state.user, todos: 'some todos' }
}).delay(1);
}).pipe(delay(1));
},

onError: (a, e: any) => null
Expand Down Expand Up @@ -295,7 +293,7 @@ describe('DataPersistence', () => {
@Effect()
loadTodo = this.s.fetch<GetTodo>('GET_TODO', {
id: (a, state) => a.payload.id,
run: (a, state) => of({ type: 'TODO', payload: a.payload }).delay(1),
run: (a, state) => of({ type: 'TODO', payload: a.payload }).pipe(delay(1)),
onError: (a, e: any) => null
});

Expand Down
70 changes: 29 additions & 41 deletions packages/nx/src/data-persistence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,7 @@ import { ROUTER_NAVIGATION, RouterNavigationAction } from '@ngrx/router-store';
import { Action, State, Store } from '@ngrx/store';
import { Observable } from 'rxjs/Observable';
import { of } from 'rxjs/observable/of';
import { _catch } from 'rxjs/operator/catch';
import { concatMap } from 'rxjs/operator/concatMap';
import { filter } from 'rxjs/operator/filter';
import { groupBy } from 'rxjs/operator/groupBy';
import { map } from 'rxjs/operator/map';
import { mergeMap } from 'rxjs/operator/mergeMap';
import { switchMap } from 'rxjs/operator/switchMap';
import { withLatestFrom } from 'rxjs/operator/withLatestFrom';
import { catchError, concatMap, filter, groupBy, map, mergeMap, switchMap, withLatestFrom } from 'rxjs/operators';

/**
* See {@link DataPersistence.pessimisticUpdate} for more information.
Expand Down Expand Up @@ -46,8 +39,6 @@ export interface HandleNavigationOpts<T> {
onError?(a: ActivatedRouteSnapshot, e: any): Observable<any> | any;
}

type Pair = [Action, any];

/**
* @whatItDoes Provides convenience methods for implementing common operations of persisting data.
*/
Expand Down Expand Up @@ -91,10 +82,10 @@ export class DataPersistence<T> {
* }
* ```
*/
pessimisticUpdate<A = Action>(actionType: string, opts: PessimisticUpdateOpts<T, A>): Observable<any> {
const nav = this.actions.ofType(actionType);
const pairs = withLatestFrom.call(nav, this.store);
return concatMap.call(pairs, this.runWithErrorHandling(opts.run, opts.onError));
pessimisticUpdate<A extends Action = Action>(actionType: string, opts: PessimisticUpdateOpts<T, A>): Observable<any> {
const nav = this.actions.ofType<A>(actionType);
const pairs = nav.pipe(withLatestFrom(this.store));
return pairs.pipe(concatMap(this.runWithErrorHandling(opts.run, opts.onError)));
}

/**
Expand Down Expand Up @@ -133,10 +124,10 @@ export class DataPersistence<T> {
* }
* ```
*/
optimisticUpdate<A = Action>(actionType: string, opts: OptimisticUpdateOpts<T, A>): Observable<any> {
const nav = this.actions.ofType(actionType);
const pairs = withLatestFrom.call(nav, this.store);
return concatMap.call(pairs, this.runWithErrorHandling(opts.run, opts.undoAction));
optimisticUpdate<A extends Action = Action>(actionType: string, opts: OptimisticUpdateOpts<T, A>): Observable<any> {
const nav = this.actions.ofType<A>(actionType);
const pairs = nav.pipe(withLatestFrom(this.store));
return pairs.pipe(concatMap(this.runWithErrorHandling(opts.run, opts.undoAction)));
}

/**
Expand Down Expand Up @@ -206,17 +197,17 @@ export class DataPersistence<T> {
* In addition, if DataPersistence notices that there are multiple requests for Todo 1 scheduled,
* it will only run the last one.
*/
fetch<A = Action>(actionType: string, opts: FetchOpts<T, A>): Observable<any> {
const nav = this.actions.ofType(actionType);
const allPairs = withLatestFrom.call(nav, this.store);
fetch<A extends Action = Action>(actionType: string, opts: FetchOpts<T, A>): Observable<any> {
const nav = this.actions.ofType<A>(actionType);
const allPairs = nav.pipe(withLatestFrom(this.store));

if (opts.id) {
const groupedFetches: Observable<Observable<Pair>> = groupBy.call(allPairs, p => opts.id(p[0], p[1]));
return mergeMap.call(groupedFetches, (pairs: Observable<Pair>) =>
switchMap.call(pairs, this.runWithErrorHandling(opts.run, opts.onError))
const groupedFetches = allPairs.pipe(groupBy(([action, store]) => opts.id(action, store)));
return groupedFetches.pipe(
mergeMap(pairs => pairs.pipe(switchMap(this.runWithErrorHandling(opts.run, opts.onError))))
);
} else {
return concatMap.call(allPairs, this.runWithErrorHandling(opts.run, opts.onError));
return allPairs.pipe(concatMap(this.runWithErrorHandling(opts.run, opts.onError)));
}
}

Expand Down Expand Up @@ -255,24 +246,21 @@ export class DataPersistence<T> {
*
*/
navigation(component: Type<any>, opts: HandleNavigationOpts<T>): Observable<any> {
const nav = filter.call(
map.call(this.actions.ofType(ROUTER_NAVIGATION), (a: RouterNavigationAction<RouterStateSnapshot>) =>
findSnapshot(component, a.payload.routerState.root)
),
s => !!s
);
const nav = this.actions
.ofType<RouterNavigationAction<RouterStateSnapshot>>(ROUTER_NAVIGATION)
.pipe(map(a => findSnapshot(component, a.payload.routerState.root)), filter(s => !!s));

const pairs = withLatestFrom.call(nav, this.store);
return switchMap.call(pairs, this.runWithErrorHandling(opts.run, opts.onError));
const pairs = nav.pipe(withLatestFrom(this.store));
return pairs.pipe(switchMap(this.runWithErrorHandling(opts.run, opts.onError)));
}

private runWithErrorHandling(run: any, onError: any) {
return a => {
private runWithErrorHandling<A, R>(run: (a: A, state?: T) => Observable<R> | R | void, onError: any) {
return ([action, state]: [A, T]): Observable<R> => {
try {
const r = wrapIntoObservable(run(a[0], a[1]));
return _catch.call(r, e => wrapIntoObservable(onError(a[0], e)));
const r = wrapIntoObservable(run(action, state));
return r.pipe(catchError(e => wrapIntoObservable(onError(action, e))));
} catch (e) {
return wrapIntoObservable(onError(a[0], e));
return wrapIntoObservable(onError(action, e));
}
};
}
Expand All @@ -291,12 +279,12 @@ function findSnapshot(component: Type<any>, s: ActivatedRouteSnapshot): Activate
return null;
}

function wrapIntoObservable(obj: any): Observable<any> {
if (!!obj && typeof obj.subscribe === 'function') {
function wrapIntoObservable<O>(obj: Observable<O> | O | void): Observable<O> {
if (!!obj && obj instanceof Observable) {
return obj;
} else if (!obj) {
return of();
} else {
return of(obj);
return of(obj as O);
}
}
8 changes: 3 additions & 5 deletions packages/nx/src/testing-utils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { Observable } from 'rxjs/Observable';
import { first } from 'rxjs/operator/first';
import { toArray } from 'rxjs/operator/toArray';
import { toPromise } from 'rxjs/operator/toPromise';
import { first, toArray } from 'rxjs/operators';

/**
* @whatItDoes reads all the values from an observable and returns a promise
Expand All @@ -16,7 +14,7 @@ import { toPromise } from 'rxjs/operator/toPromise';
* ```
*/
export function readAll<T>(o: Observable<T>): Promise<T[]> {
return toPromise.call(toArray.call(o));
return o.pipe(toArray()).toPromise();
}

/**
Expand All @@ -32,5 +30,5 @@ export function readAll<T>(o: Observable<T>): Promise<T[]> {
* ```
*/
export function readFirst<T>(o: Observable<T>): Promise<T> {
return toPromise.call(first.call(o));
return o.pipe(first()).toPromise();
}
8 changes: 1 addition & 7 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5149,13 +5149,7 @@ run-queue@^1.0.0, run-queue@^1.0.3:
dependencies:
aproba "^1.1.1"

rxjs@5.4.3:
version "5.4.3"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-5.4.3.tgz#0758cddee6033d68e0fd53676f0f3596ce3d483f"
dependencies:
symbol-observable "^1.0.1"

rxjs@^5.5.2:
rxjs@5.5.2, rxjs@^5.5.2:
version "5.5.2"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-5.5.2.tgz#28d403f0071121967f18ad665563255d54236ac3"
dependencies:
Expand Down

0 comments on commit 715efa4

Please sign in to comment.