Permalink
Browse files

feat(state$): The second argument of an Epic is now a stream of state…

…$, not a store (#410)

* feat(state$): The second argument of an Epic is now a stream of state$, not a store. Closes #56

DEPRECATION: The second argument of an Epic is now a stream of
state$, not a store. You can access the current state imperatively on
the `state$.value` property, or by composing the StateObservable (state$)
reactively.

Learn more https://redux-observable.js.org/MIGRATION.html
  • Loading branch information...
jayphelps committed Apr 4, 2018
1 parent ca3e9a4 commit 2ff3f6e609f0731a0c93c6add848e619e2f0510a
Showing with 275 additions and 58 deletions.
  1. +70 −0 src/StateObservable.js
  2. +21 −23 src/createEpicMiddleware.js
  3. +1 −0 src/index.js
  4. +11 −12 src/utils/console.js
  5. +37 −0 test/StateObservable-spec.js
  6. +135 −23 test/createEpicMiddleware-spec.js
@@ -0,0 +1,70 @@
import { Observable } from 'rxjs';
// Used as a placeholder value so we can tell
// whether or not the real state has been set,
// even if the real state was `undefined`
export const UNSET_STATE_VALUE = {};
// We can't use BehaviorSubject because when we create
// and give state$ to your epics your reducers have not
// yet run, so there is no initial state yet until the
// `@@redux/INIT` action is emitted. BehaviorSubject
// requires an initial value (which would be undefined)
// and if an epic subscribes to it on app boot it would
// synchronously emit that undefined value incorrectly.
// We also don't want to expose a Subject to the user at
// all because then they could do `state$.next()` and
// screw things up and it would just be a footgun.
export class StateObservable extends Observable {
constructor(stateSubject, store) {
super();
this.source = stateSubject;
// If you're reading this, keep in mind that this is
// NOT part of the public API and will be removed!
this.__store = store;
this.__value = UNSET_STATE_VALUE;
this.source.subscribe(value => {
this.__value = value;
});
}
_subscribe(subscriber) {
const subscription = super._subscribe(subscriber);
if (this.__value !== UNSET_STATE_VALUE && subscription && !subscription.closed) {
subscriber.next(this.__value);
}
return subscription;
}
get value() {
if (this.__value === UNSET_STATE_VALUE) {
if (process.env.NODE_ENV !== 'production') {
require('./utils/console').warn('You accessed state$.value inside one of your Epics, before your reducers have run for the first time, so there is no state yet. You\'ll need to wait until after the first action (@@redux/INIT) is dispatched or by using state$ as an Observable.');
}
return undefined;
} else {
return this.__value;
}
}
lift(operator) {
const observable = new StateObservable(this);
observable.operator = operator;
return observable;
}
getState() {
if (process.env.NODE_ENV !== 'production') {
require('./utils/console').deprecate('calling store.getState() in your Epics is deprecated and will be removed. The second argument to your Epic is now a stream of state$ (a StateObservable), instead of the store. To imperatively get the current state use state$.value instead of getState(). Alternatively, since it\'s now a stream you can compose and react to state changes.\n\n function <T, R, S, D>(action$: ActionsObservable<T>, state$: StateObservable<S>, dependencies?: D): Observable<R>\n\nLearn more: https://redux-observable.js.org/MIGRATION.html');
}
return this.__value;
}
dispatch(...args) {
if (process.env.NODE_ENV !== 'production') {
require('./utils/console').deprecate('calling store.dispatch() directly in your Epics is deprecated and will be removed. The second argument to your Epic is now a stream of state$ (a StateObservable), instead of the store. Instead of calling store.dispatch() in your Epic, emit actions through the Observable your Epic returns.\n\n function <T, R, S, D>(action$: ActionsObservable<T>, state$: StateObservable<S>, dependencies?: D): Observable<R>\n\nLearn more: https://redux-observable.js.org/MIGRATION.html');
}
return this.__store.dispatch(...args);
}
}
@@ -1,6 +1,7 @@
import { Subject } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';
import { ActionsObservable } from './ActionsObservable';
import { StateObservable } from './StateObservable';
import { EPIC_END } from './EPIC_END';
const defaultAdapter = {
@@ -20,6 +21,7 @@ export function createEpicMiddleware(rootEpic, options = defaultOptions) {
// even though we used default param, we need to merge the defaults
// inside the options object as well in case they declare only some
options = { ...defaultOptions, ...options };
const input$ = new Subject();
const action$ = options.adapter.input(
new ActionsObservable(input$)
@@ -32,47 +34,43 @@ export function createEpicMiddleware(rootEpic, options = defaultOptions) {
// https://github.com/redux-observable/redux-observable/issues/389
require('./utils/console').warn('this middleware is already associated with a store. createEpicMiddleware should be called for every store.\n\n See https://goo.gl/2GQ7Da');
}
const stateInput$ = new Subject();
const state$ = new StateObservable(stateInput$, _store);
store = _store;
return next => {
epic$.pipe(
const result$ = epic$.pipe(
map(epic => {
const vault = (process.env.NODE_ENV === 'production') ? store : {
getState: store.getState,
dispatch: (action) => {
require('./utils/console').deprecate('calling store.dispatch() directly in your Epics is deprecated and will be removed. Instead, emit actions through the Observable your Epic returns.\n\n https://goo.gl/WWNYSP');
return store.dispatch(action);
}
};
const output$ = ('dependencies' in options)
? epic(action$, vault, options.dependencies)
: epic(action$, vault);
? epic(action$, state$, options.dependencies)
: epic(action$, state$);
if (!output$) {
throw new TypeError(`Your root Epic "${epic.name || '<anonymous>'}" does not return a stream. Double check you\'re not missing a return statement!`);
}
return output$;
}),
switchMap(output$ => options.adapter.output(output$)))
.subscribe(action => {
try {
store.dispatch(action);
} catch (err) {
console.error(err);
}
}, (err) => {
console.error(err.message);
throw err;
});
switchMap(output$ => options.adapter.output(output$))
);
// Setup initial root epic
result$.subscribe(store.dispatch);
// Setup initial root epic. It's done this way so that
// it's possible for them to call replaceEpic later
epic$.next(rootEpic);
return action => {
// Downstream middleware gets the action first,
// which includes their reducers, so state is
// updated before epics receive the action
const result = next(action);
// It's important to update the state$ before we emit
// the action because otherwise it would be stale!
stateInput$.next(store.getState());
input$.next(action);
return result;
};
};
@@ -1,5 +1,6 @@
export { createEpicMiddleware } from './createEpicMiddleware';
export { ActionsObservable } from './ActionsObservable';
export { StateObservable } from './StateObservable';
export { combineEpics } from './combineEpics';
export { EPIC_END } from './EPIC_END';
export { ofType } from './operators';
@@ -1,16 +1,15 @@
const deprecationsSeen = {};
const consoleWarn = (typeof console === 'object' && typeof console.warn === 'function')
? (...args) => console.warn(...args)
: () => { };
export const deprecate = (typeof console === 'object' && typeof console.warn === 'function')
? msg => {
if (!deprecationsSeen[msg]) {
deprecationsSeen[msg] = true;
console.warn(`redux-observable | DEPRECATION: ${msg}`);
}
export const deprecate = msg => {
if (!deprecationsSeen[msg]) {
deprecationsSeen[msg] = true;
consoleWarn(`redux-observable | DEPRECATION: ${msg}`);
}
: () => {};
};
export const warn = (typeof console === 'object' && typeof console.warn === 'function')
? msg => {
console.warn(`redux-observable | WARNING: ${msg}`);
}
: () => {};
export const warn = msg => {
consoleWarn(`redux-observable | WARNING: ${msg}`);
};
@@ -0,0 +1,37 @@
/* globals describe it */
import { expect } from 'chai';
import { StateObservable } from '../';
import { Observable, Subject } from 'rxjs';
describe('StateObservable', () => {
it('should exist', () => {
expect(StateObservable.prototype).to.be.instanceof(Observable);
});
it('should mirror the source subject', () => {
const input$ = new Subject();
const state$ = new StateObservable(input$);
let result = null;
state$.subscribe(state => {
result = state;
});
expect(result).to.equal(null);
input$.next('first');
expect(result).to.equal('first');
input$.next('second');
expect(result).to.equal('second');
});
it('should cache last state on the `value` property', () => {
const input$ = new Subject();
const state$ = new StateObservable(input$);
expect(state$.value).to.equal(undefined);
input$.next('first');
expect(state$.value).to.equal('first');
input$.next('second');
expect(state$.value).to.equal('second');
});
});
Oops, something went wrong.

0 comments on commit 2ff3f6e

Please sign in to comment.