Skip to content

Commit

Permalink
feat(scheduler): allow scheduler for state scanning to be specified
Browse files Browse the repository at this point in the history
  • Loading branch information
protoman92 committed Jun 12, 2018
1 parent d7def0b commit a3c9ea8
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 31 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "reactive-rx-redux-js",
"version": "1.2.2",
"version": "1.2.3",
"description": "Rx-based Redux implementation.",
"main": "./dist/src/index.js",
"types": "./dist/src/index.d.ts",
Expand Down
28 changes: 18 additions & 10 deletions src/store/dispatch.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { BehaviorSubject, Observable, Subscription, queueScheduler } from 'rxjs';
import {
BehaviorSubject,
Observable,
Scheduler,
Subscription,
} from 'rxjs';

import { observeOn, scan } from 'rxjs/operators';
import { Nullable } from 'javascriptutilities';
import { Type as StoreType } from './types';
Expand Down Expand Up @@ -68,18 +74,15 @@ export class Self<State> implements Type<State> {
return this.state.value;
}

public initialize(reducer: Reducer<State, any>): void {
let disposable = this.action.asObservable()
public initialize(reducer: Reducer<State, any>, scheduler: Scheduler): void {
this.subscription.add(this.action.asObservable()
.pipe(
mapNonNilOrEmpty(v => v),
scan((acc: State, action: action.Type<any>): State => {
return reducer(acc, action);
}, this.state.value),
observeOn(queueScheduler),
)
.subscribe(this.state);

this.subscription.add(disposable);
observeOn(scheduler))
.subscribe(this.state));
}

public deinitialize(): void {
Expand All @@ -96,10 +99,15 @@ export class Self<State> implements Type<State> {
* @template State Generics parameter.
* @param {State} initialState Initial state.
* @param {Reducer<State, any>} reducer A Reducer instance.
* @param {Scheduler} scheduler A Scheduler instance.
* @returns {Self} A dispatch store instance.
*/
export function createDefault<State>(initialState: State, reducer: Reducer<State, any>): Self<State> {
export function createDefault<State>(
initialState: State,
reducer: Reducer<State, any>,
scheduler: Scheduler,
): Self<State> {
let store = new Self(initialState);
store.initialize(reducer);
store.initialize(reducer, scheduler);
return store;
}
19 changes: 14 additions & 5 deletions src/store/rx.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Observable, merge, queueScheduler } from 'rxjs';
import { Observable, Scheduler, merge } from 'rxjs';
import { map, scan, observeOn, startWith } from 'rxjs/operators';
import { Nullable, Types } from 'javascriptutilities';
import { Type as StoreType } from './types';
Expand Down Expand Up @@ -74,16 +74,21 @@ export function createReducer<State, T>(obs: Observable<ActionType<T>>, reducer:
* @template State Generics paramter.
* @template T Generics parameter.
* @param {State} initialState Initial state.
* @param {Scheduler} scheduler A Scheduler instance.
* @param {...Observable<RxReducer<State, T>>[]} reducers An Array of Observable.
* @returns {Observable<stateinfo.Type<State, T>>} An Observable instance.
*/
export function create<State, T>(initialState: State, ...reducers: Observable<RxReducer<State, T>>[]): Observable<stateinfo.Type<State, T>> {
export function create<State, T>(
initialState: State,
scheduler: Scheduler,
...reducers: Observable<RxReducer<State, T>>[]
): Observable<stateinfo.Type<State, T>> {
return merge(...reducers).pipe(
scan((v1: stateinfo.Type<State, T>, v2: RxReducer<State, T>) => {
return v2(v1.state);
}, { state: initialState, lastAction: undefined }),
startWith({ state: initialState, lastAction: undefined }),
observeOn(queueScheduler),
observeOn(scheduler),
);
}

Expand All @@ -103,8 +108,12 @@ export interface Type<State> extends StoreType<State> { }
export class Self<State> implements Type<State> {
private store: Observable<stateinfo.Type<State, any>>;

public constructor(initialState: State, ...reducers: Observable<RxReducer<State, any>>[]) {
this.store = create(initialState, ...reducers);
public constructor(
initialState: State,
scheduler: Scheduler,
...reducers: Observable<RxReducer<State, any>>[]
) {
this.store = create(initialState, scheduler, ...reducers);
}

public get stateInfoStream(): Observable<stateinfo.Type<State, any>> {
Expand Down
30 changes: 15 additions & 15 deletions test/store.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { BehaviorSubject } from 'rxjs';
import { BehaviorSubject, queueScheduler as scheduler } from 'rxjs';
import { map } from 'rxjs/operators';
import { Collections, Nullable, Try } from 'javascriptutilities';
import { doOnNext } from 'rx-utilities-js';
import { State as S } from 'type-safe-state-js';
import { reduxstore } from './../src';
import { reduxstore as Store } from './../src';
import { Reducer as DispatchReducer } from './../src/store/dispatch';

type State = S.Type<any>;
type DispatchStore = reduxstore.dispatch.Self<State>;
type RxStore = reduxstore.rx.Self<State>;
type StoreType = reduxstore.Type<State>;
type DispatchStore = Store.dispatch.Self<State>;
type RxStore = Store.rx.Self<State>;
type StoreType = Store.Type<State>;

let timeout = 100;
let path1 = 'a.b.c.d';
Expand Down Expand Up @@ -61,28 +61,28 @@ let testReduxStore = (store: StoreType, actionFn: () => void): void => {

describe('Rx store should be implemented correctly', () => {
var action1: BehaviorSubject<number>;
var action2: BehaviorSubject<reduxstore.rx.action.Type<string>>;
var action3: BehaviorSubject<reduxstore.rx.action.Type<boolean>>;
var action2: BehaviorSubject<Store.rx.action.Type<string>>;
var action3: BehaviorSubject<Store.rx.action.Type<boolean>>;
var stateStore: RxStore;

let createStore = (): RxStore => {
let reducer1 = reduxstore.rx.createReducer(action1, (state: State, v) => {
let reducer1 = Store.rx.createReducer(action1, (state: State, v) => {
return state.mappingValue(path1, v1 => {
return v1.map(v2 => v2 + v.value).successOrElse(Try.success(v.value));
});
});

let reducer2 = reduxstore.rx.createReducer(action2, (state: State, v) => {
let reducer2 = Store.rx.createReducer(action2, (state: State, v) => {
return state.mappingValue(path2, v1 => {
return v1.map(v2 => v2 + v.value).successOrElse(Try.success(v.value));
});
});

let reducer3 = reduxstore.rx.createReducer(action3, (state: State, v) => {
let reducer3 = Store.rx.createReducer(action3, (state: State, v) => {
return state.updatingValue(path3, v.value);
});

return new reduxstore.rx.Self(S.empty(), reducer1, reducer2, reducer3);
return new Store.rx.Self(S.empty(), scheduler, reducer1, reducer2, reducer3);
};

beforeEach(() => {
Expand All @@ -108,19 +108,19 @@ describe('Rx store should be implemented correctly', () => {
describe('Dispatch store should be implemented correctly', () => {
var stateStore: DispatchStore;

let actionFn1 = (v: number): reduxstore.dispatch.action.Type<number> => ({
let actionFn1 = (v: number): Store.dispatch.action.Type<number> => ({
id: actionKey1,
fullValuePath: path1,
payload: v,
});

let actionFn2 = (v: string): reduxstore.dispatch.action.Type<string> => ({
let actionFn2 = (v: string): Store.dispatch.action.Type<string> => ({
id: actionKey2,
fullValuePath: path2,
payload: v,
});

let actionFn3 = (v: boolean): reduxstore.dispatch.action.Type<boolean> => ({
let actionFn3 = (v: boolean): Store.dispatch.action.Type<boolean> => ({
id: actionKey3,
fullValuePath: path3,
payload: v,
Expand Down Expand Up @@ -149,7 +149,7 @@ describe('Dispatch store should be implemented correctly', () => {
};

beforeEach(() => {
stateStore = reduxstore.dispatch.createDefault(S.empty(), reducer);
stateStore = Store.dispatch.createDefault(S.empty(), reducer, scheduler);
});

it('Dispatch action with action creators - should work', () => {
Expand Down

0 comments on commit a3c9ea8

Please sign in to comment.