Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(state$): The second argument of an Epic is now a stream of state$, not a store #410

Merged
merged 2 commits into from Apr 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
70 changes: 70 additions & 0 deletions src/StateObservable.js
@@ -0,0 +1,70 @@
import { Observable } from 'rxjs';

// Used as a placeholder value so we can tell
// whether or not the real state has been set,
// even if the real state was `undefined`
export const UNSET_STATE_VALUE = {};

// We can't use BehaviorSubject because when we create
// and give state$ to your epics your reducers have not
// yet run, so there is no initial state yet until the
// `@@redux/INIT` action is emitted. BehaviorSubject
// requires an initial value (which would be undefined)
// and if an epic subscribes to it on app boot it would
// synchronously emit that undefined value incorrectly.
// We also don't want to expose a Subject to the user at
// all because then they could do `state$.next()` and
// screw things up and it would just be a footgun.
export class StateObservable extends Observable {
constructor(stateSubject, store) {
super();
this.source = stateSubject;
// If you're reading this, keep in mind that this is
// NOT part of the public API and will be removed!
this.__store = store;
this.__value = UNSET_STATE_VALUE;

this.source.subscribe(value => {
this.__value = value;
});
}

_subscribe(subscriber) {
const subscription = super._subscribe(subscriber);
if (this.__value !== UNSET_STATE_VALUE && subscription && !subscription.closed) {
subscriber.next(this.__value);
}
return subscription;
}

get value() {
if (this.__value === UNSET_STATE_VALUE) {
if (process.env.NODE_ENV !== 'production') {
require('./utils/console').warn('You accessed state$.value inside one of your Epics, before your reducers have run for the first time, so there is no state yet. You\'ll need to wait until after the first action (@@redux/INIT) is dispatched or by using state$ as an Observable.');
}
return undefined;
} else {
return this.__value;
}
}

lift(operator) {
const observable = new StateObservable(this);
observable.operator = operator;
return observable;
}

getState() {
if (process.env.NODE_ENV !== 'production') {
require('./utils/console').deprecate('calling store.getState() in your Epics is deprecated and will be removed. The second argument to your Epic is now a stream of state$ (a StateObservable), instead of the store. To imperatively get the current state use state$.value instead of getState(). Alternatively, since it\'s now a stream you can compose and react to state changes.\n\n function <T, R, S, D>(action$: ActionsObservable<T>, state$: StateObservable<S>, dependencies?: D): Observable<R>\n\nLearn more: https://redux-observable.js.org/MIGRATION.html');
}
return this.__value;
}

dispatch(...args) {
if (process.env.NODE_ENV !== 'production') {
require('./utils/console').deprecate('calling store.dispatch() directly in your Epics is deprecated and will be removed. The second argument to your Epic is now a stream of state$ (a StateObservable), instead of the store. Instead of calling store.dispatch() in your Epic, emit actions through the Observable your Epic returns.\n\n function <T, R, S, D>(action$: ActionsObservable<T>, state$: StateObservable<S>, dependencies?: D): Observable<R>\n\nLearn more: https://redux-observable.js.org/MIGRATION.html');
}
return this.__store.dispatch(...args);
}
}
44 changes: 21 additions & 23 deletions src/createEpicMiddleware.js
@@ -1,6 +1,7 @@
import { Subject } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';
import { ActionsObservable } from './ActionsObservable';
import { StateObservable } from './StateObservable';
import { EPIC_END } from './EPIC_END';

const defaultAdapter = {
Expand All @@ -20,6 +21,7 @@ export function createEpicMiddleware(rootEpic, options = defaultOptions) {
// even though we used default param, we need to merge the defaults
// inside the options object as well in case they declare only some
options = { ...defaultOptions, ...options };

const input$ = new Subject();
const action$ = options.adapter.input(
new ActionsObservable(input$)
Expand All @@ -32,47 +34,43 @@ export function createEpicMiddleware(rootEpic, options = defaultOptions) {
// https://github.com/redux-observable/redux-observable/issues/389
require('./utils/console').warn('this middleware is already associated with a store. createEpicMiddleware should be called for every store.\n\n See https://goo.gl/2GQ7Da');
}
const stateInput$ = new Subject();
const state$ = new StateObservable(stateInput$, _store);
store = _store;

return next => {
epic$.pipe(
const result$ = epic$.pipe(
map(epic => {
const vault = (process.env.NODE_ENV === 'production') ? store : {
getState: store.getState,
dispatch: (action) => {
require('./utils/console').deprecate('calling store.dispatch() directly in your Epics is deprecated and will be removed. Instead, emit actions through the Observable your Epic returns.\n\n https://goo.gl/WWNYSP');
return store.dispatch(action);
}
};

const output$ = ('dependencies' in options)
? epic(action$, vault, options.dependencies)
: epic(action$, vault);
? epic(action$, state$, options.dependencies)
: epic(action$, state$);

if (!output$) {
throw new TypeError(`Your root Epic "${epic.name || '<anonymous>'}" does not return a stream. Double check you\'re not missing a return statement!`);
}

return output$;
}),
switchMap(output$ => options.adapter.output(output$)))
.subscribe(action => {
try {
store.dispatch(action);
} catch (err) {
console.error(err);
}
}, (err) => {
console.error(err.message);
throw err;
});
switchMap(output$ => options.adapter.output(output$))
);

// Setup initial root epic
result$.subscribe(store.dispatch);

// Setup initial root epic. It's done this way so that
// it's possible for them to call replaceEpic later
epic$.next(rootEpic);

return action => {
// Downstream middleware gets the action first,
// which includes their reducers, so state is
// updated before epics receive the action
const result = next(action);

// It's important to update the state$ before we emit
// the action because otherwise it would be stale!
stateInput$.next(store.getState());
input$.next(action);

return result;
};
};
Expand Down
1 change: 1 addition & 0 deletions src/index.js
@@ -1,5 +1,6 @@
export { createEpicMiddleware } from './createEpicMiddleware';
export { ActionsObservable } from './ActionsObservable';
export { StateObservable } from './StateObservable';
export { combineEpics } from './combineEpics';
export { EPIC_END } from './EPIC_END';
export { ofType } from './operators';
23 changes: 11 additions & 12 deletions src/utils/console.js
@@ -1,16 +1,15 @@
const deprecationsSeen = {};
const consoleWarn = (typeof console === 'object' && typeof console.warn === 'function')
? (...args) => console.warn(...args)
: () => { };

export const deprecate = (typeof console === 'object' && typeof console.warn === 'function')
? msg => {
if (!deprecationsSeen[msg]) {
deprecationsSeen[msg] = true;
console.warn(`redux-observable | DEPRECATION: ${msg}`);
}
export const deprecate = msg => {
if (!deprecationsSeen[msg]) {
deprecationsSeen[msg] = true;
consoleWarn(`redux-observable | DEPRECATION: ${msg}`);
}
: () => {};
};

export const warn = (typeof console === 'object' && typeof console.warn === 'function')
? msg => {
console.warn(`redux-observable | WARNING: ${msg}`);
}
: () => {};
export const warn = msg => {
consoleWarn(`redux-observable | WARNING: ${msg}`);
};
37 changes: 37 additions & 0 deletions test/StateObservable-spec.js
@@ -0,0 +1,37 @@
/* globals describe it */
import { expect } from 'chai';
import { StateObservable } from '../';
import { Observable, Subject } from 'rxjs';

describe('StateObservable', () => {
it('should exist', () => {
expect(StateObservable.prototype).to.be.instanceof(Observable);
});

it('should mirror the source subject', () => {
const input$ = new Subject();
const state$ = new StateObservable(input$);
let result = null;

state$.subscribe(state => {
result = state;
});

expect(result).to.equal(null);
input$.next('first');
expect(result).to.equal('first');
input$.next('second');
expect(result).to.equal('second');
});

it('should cache last state on the `value` property', () => {
const input$ = new Subject();
const state$ = new StateObservable(input$);

expect(state$.value).to.equal(undefined);
input$.next('first');
expect(state$.value).to.equal('first');
input$.next('second');
expect(state$.value).to.equal('second');
});
});