Permalink
Browse files

fix(state$): initial state is no longer skipped by state$

  • Loading branch information...
jayphelps committed Jun 1, 2018
1 parent b352a98 commit 2509605885add15a9cf32f8f677c47a5cbe7eede
Showing with 49 additions and 93 deletions.
  1. +6 −37 src/StateObservable.js
  2. +10 −10 src/createEpicMiddleware.js
  3. +8 −22 test/StateObservable-spec.js
  4. +25 −24 test/createEpicMiddleware-spec.js
@@ -1,57 +1,26 @@
import { Observable, Subject } from 'rxjs';

// Used as a placeholder value so we can tell
// whether or not the real state has been set,
// even if the real state was `undefined`
export const UNSET_STATE_VALUE = {};

// We can't use BehaviorSubject because when we create
// and give state$ to your epics your reducers have not
// yet run, so there is no initial state yet until the
// `@@redux/INIT` action is emitted. BehaviorSubject
// requires an initial value (which would be undefined)
// and if an epic subscribes to it on app boot it would
// synchronously emit that undefined value incorrectly.
// We could use ReplaySubject to get that behavior, but
// then it wouldn't have the .value property.
// We also don't want to expose any Subject to the user at
// all because then they could do `state$.next()` and
// screw things up and it would just be a footgun.
// This also allows us to add a warning message for
// accessing the state$.value property before redux
// has initialized.
export class StateObservable extends Observable {
constructor(stateSubject) {
constructor(stateSubject, initialState) {
super(subscriber => {
const subscription = this.__notifier.subscribe(subscriber);
if (this.__value !== UNSET_STATE_VALUE && subscription && !subscription.closed) {
subscriber.next(this.__value);
if (subscription && !subscription.closed) {
subscriber.next(this.value);
}
return subscription;
});

this.__value = UNSET_STATE_VALUE;
this.value = initialState;
this.__notifier = new Subject();
this.__subscription = stateSubject.subscribe(value => {
// We only want to update state$ if it has actually changed since
// redux requires reducers use immutability patterns.
// This is basically what distinctUntilChanged() does but it's so simple
// we don't need to pull that code in
if (value !== this.__value) {
this.__value = value;
if (value !== this.value) {
this.value = value;
this.__notifier.next(value);
}
});
}

get value() {
if (this.__value === UNSET_STATE_VALUE) {
if (process.env.NODE_ENV !== 'production') {
require('./utils/console').warn('You accessed state$.value inside one of your Epics, before your reducers have run for the first time, so there is no state yet. You\'ll need to wait until after the first action (@@redux/INIT) is dispatched or by using state$ as an Observable.');
}
return undefined;
} else {
return this.__value;
}
}
}
@@ -21,16 +21,6 @@ export function createEpicMiddleware(options = defaultOptions) {
// inside the options object as well in case they declare only some
options = { ...defaultOptions, ...options };

const actionSubject$ = new Subject().pipe(
observeOn(queueScheduler)
);
const stateSubject$ = new Subject().pipe(
observeOn(queueScheduler)
);
const action$ = options.adapter.input(
new ActionsObservable(actionSubject$)
);
const state$ = new StateObservable(stateSubject$);
const epic$ = new Subject();
let store;

@@ -40,6 +30,16 @@ export function createEpicMiddleware(options = defaultOptions) {
require('./utils/console').warn('this middleware is already associated with a store. createEpicMiddleware should be called for every store.\n\nLearn more: https://goo.gl/2GQ7Da');
}
store = _store;
const actionSubject$ = new Subject().pipe(
observeOn(queueScheduler)
);
const stateSubject$ = new Subject().pipe(
observeOn(queueScheduler)
);
const action$ = options.adapter.input(
new ActionsObservable(actionSubject$)
);
const state$ = new StateObservable(stateSubject$, store.getState());

const result$ = epic$.pipe(
map(epic => {
@@ -22,42 +22,36 @@ describe('StateObservable', () => {

it('should mirror the source subject', () => {
const input$ = new Subject();
const state$ = new StateObservable(input$);
const state$ = new StateObservable(input$, 'first');
let result = null;

state$.subscribe(state => {
result = state;
});

expect(result).to.equal(null);
input$.next('first');
expect(result).to.equal('first');
input$.next('second');
expect(result).to.equal('second');
input$.next('third');
expect(result).to.equal('third');
});

it('should cache last state on the `value` property', () => {
const input$ = new Subject();
const state$ = new StateObservable(input$);
const state$ = new StateObservable(input$, 'first');

expect(state$.value).to.equal(undefined);
input$.next('first');
expect(state$.value).to.equal('first');
input$.next('second');
expect(state$.value).to.equal('second');
});

it('should only update when the next value shallowly differs', () => {
const input$ = new Subject();
const state$ = new StateObservable(input$);
const first = { value: 'first' };
const state$ = new StateObservable(input$, first);
const next = spySandbox.spy();
state$.subscribe(next);

expect(state$.value).to.equal(undefined);
expect(next.callCount).to.equal(0);

const first = { value: 'first' };
input$.next(first);
expect(state$.value).to.equal(first);
expect(next.callCount).to.equal(1);
expect(next.getCall(0).args).to.deep.equal([first]);
@@ -79,28 +73,20 @@ describe('StateObservable', () => {
});

it('works correctly (and does not lift) with operators applied', () => {
const first = { value: 'first' };
const input$ = new Subject();
const state$ = new StateObservable(input$).pipe(
const state$ = new StateObservable(input$, first).pipe(
map(d => d.value)
);
const next = spySandbox.spy();
state$.subscribe(next);

expect(state$.value).to.equal(undefined);
expect(next.callCount).to.equal(0);

const first = { value: 'first' };
input$.next(first);
// because we piped an operator over it state$ is no longer a StateObservable
// it's just a regular Observable and so it loses its `.value` prop
expect(state$.value).to.equal(undefined);
expect(next.callCount).to.equal(1);
expect(next.getCall(0).args).to.deep.equal(['first']);

input$.next(first);
expect(state$.value).to.equal(undefined);
expect(next.callCount).to.equal(1);

first.value = 'something else';
input$.next(first);
expect(state$.value).to.equal(undefined);
@@ -92,6 +92,10 @@ describe('createEpicMiddleware', () => {

expect(store.getState()).to.equal(2);
expect(actions).to.deep.equal([initAction, {
type: 'PONG',
input: 0,
state: 0
}, {
type: 'PING'
}, {
type: 'PONG',
@@ -114,6 +118,27 @@ describe('createEpicMiddleware', () => {
}]);
});

it('should allow accessing state$.value on epic startup', () => {
const reducer = (state = [], action) => state.concat(action);
const epic = (action$, state$) => of({
type: 'PONG',
state: state$.value
});

const middleware = createEpicMiddleware();
const store = createStore(reducer, applyMiddleware(middleware));
middleware.run(epic);

store.dispatch({ type: 'PING' });

expect(store.getState()).to.deep.equal([initAction, {
type: 'PONG',
state: [initAction]
}, {
type: 'PING'
}]);
});

it('should queue state$ updates', () => {
const actions = [];
const reducer = (state = { action: null, value: 0 }, action) => {
@@ -177,30 +202,6 @@ describe('createEpicMiddleware', () => {
}]);
});

it('should warn about accessing state$.value before @@redux/INIT', () => {
spySandbox.spy(console, 'warn');
const reducer = (state = [], action) => state.concat(action);
const epic = (action$, state$) => of({
type: 'PONG',
state: state$.value
});

const middleware = createEpicMiddleware();
const store = createStore(reducer, applyMiddleware(middleware));
middleware.run(epic);

store.dispatch({ type: 'PING' });

expect(console.warn.callCount).to.equal(1);
expect(console.warn.getCall(0).args[0]).to.equal('redux-observable | WARNING: You accessed state$.value inside one of your Epics, before your reducers have run for the first time, so there is no state yet. You\'ll need to wait until after the first action (@@redux/INIT) is dispatched or by using state$ as an Observable.');
expect(store.getState()).to.deep.equal([initAction, {
type: 'PONG',
state: undefined
}, {
type: 'PING'
}]);
});

it('should accept an epic that wires up action$ input to action$ out', () => {
const reducer = (state = [], action) => state.concat(action);
const epic = (action$, state$) =>

0 comments on commit 2509605

Please sign in to comment.