Permalink
Browse files

feat(createEpicMiddleware): schedule emitted actions and epic subscri…

…ption on the queueScheduler, so that epic order matters less

BREAKING CHANGE:

You must now provide your rootEpic to `epicMiddleware.run(rootEpic)` instead of passing it to `createEpicMiddleware`. This fixes issues with redux v4 where it's no longer allowed to dispatch actions while middleware is still being setup. See https://redux-observable.js.org/MIGRATION.html

BREAKING CHANGE:

`epicMiddleware.replaceEpic` has been removed. A the equivilant behavior can be accomplished by dispatching your own `END` action that your rootEpic is listening for with a `takeUntil`, then providing the next rootEpic to `epicMiddleware.run(nextRootEpic)`. See https://redux-observable.js.org/MIGRATION.html

BREAKING CHANGE:

Actions your epics emit are now scheduled using the queueScheduler. This is a bit hard to explain (and understand) but as the name suggests, a queue is used. If the queue is empty, the action is emitted as usual, but if that action causes other actions to be emitted they will be queued up until the call stack of the first action returns.

In a large majority of cases this will have no perceivable impact, but it may affect the order of any complex epic-to-epic communication you have.

The benefit is that actions which are emitted by an epic on start up are not missed by epics which come after it. e.g. With `combineEpics(epic1, epic2)` previously if epic1 emitted on startup, epic2 would not receive that action because it had not yet been set up. See https://redux-observable.js.org/MIGRATION.html
  • Loading branch information...
jayphelps committed May 14, 2018
1 parent 816a916 commit d3516bf16a660cc80bd8e634c95b583ef207b67b
Showing with 344 additions and 196 deletions.
  1. +91 −0 MIGRATION.md
  2. +2 −4 index.d.ts
  3. +5 −5 package.json
  4. +0 −1 src/EPIC_END.js
  5. +41 −43 src/createEpicMiddleware.js
  6. +0 −1 src/index.js
  7. +5 −1 src/utils/console.js
  8. +183 −126 test/createEpicMiddleware-spec.js
  9. +17 −15 test/typings.ts
@@ -24,6 +24,97 @@ Version 1.0.0 of redux-observable requires v6.0.0 of RxJS. As of this writing 6.

There will likely be a backwards compatibility layer provided with RxJS that will allow you to use the old import-style and prototype-based operators. This _should_ still work with redux-observable, if you have issues definitely file an issue.

## Setting up the middleware

In 1.0.0 you no longer provide your root Epic to `createEpicMiddleware`. Instead, you call `epicMiddleware.run(rootEpic)` on the instance of the middleware _after_ you have created your store with it.

```ts
const epicMiddleware = createEpicMiddleware();
const store = createStore(rootReducer, applyMiddleware(epicMiddleware));
epicMiddleware.run(rootEpic);
```

This change was neccesary because in redux v4 you are no longer supposed to dispatch actions while middleware is still be setup, which an Epic could have done with the previous API.

This new API also gives you the ability to easily add Epics later, as in async lazy loading. Subsequent calls of `epicMiddleware.run(epic)` do not replace the previous ones, they are merged together.

The optional configuration/options argument to `createEpicMiddleware` for providing dependencies, adapters, etc is now the first and only argument to `createEpicMiddleware(options)`.

## Actions emitted by your epics are now scheduled on a queue

In 1.0.0 we now subscribe to your root Epic, and dispatch actions emitted by it, using the queueScheduler from RxJS. This is a bit hard to explain (and understand) but as the name suggests, a queue is used. If the queue is empty, the action is emitted as usual, but if that action synchronously causes other actions to be emitted they will be queued up until the call stack of the first action returns.

In a large majority of cases this will have no perceivable impact, but it may affect the order of any complex epic-to-epic communication you have.

One benefit is that actions which are emitted by an epic on start up are not missed by epics which come after it. e.g. With `combineEpics(epic1, epic2)` previously if epic1 emitted on startup, epic2 would not receive that action because it had not yet been set up. It also changes the potential order of complex epic-to-epic communication in a way that most may find more intuitive.

Take this example:

```ts
const epic1 = action$ =>
action$.pipe(
ofType('FIRST'),
mergeMap(() =>
of({ type: 'SECOND' }, { type: 'THIRD' })
)
);
const epic2 = action$ =>
action$.pipe(
ofType('SECOND'),
map(() => ({ type: 'FOURTH' })),
startWith({ type: 'FIRST' })
);
// notice that epic2 comes *after* epic1
const rootEpic = combineEpics(epic1, epic2);
```

In older version of redux-observable, your reducers would have been missing the FOURTH:

```
FIRST
SECOND
THIRD
```

However in 1.0.0 it now would see it as the last one:
```
FIRST
SECOND
THIRD
FOURTH
```

In that example, the SECOND action is now seen by epic2 because it is queued on the same schedule as subscribing (setting up) the Epics themselves is. Since the middleware will try to subscribe to the Epics first, it now always will finish doing so before any action is emitted--so epic2 doesn't miss any actions.

Another way of looking at it is that when an individual Epic is synchronously emitting actions, they will always be emitted in the sequence provided, without any other Epics being able to sneak another action in-between. When we did `of({ type: 'SECOND' }, { type: 'THIRD' })`, we now know _for sure_ that THIRD will immediately follow SECOND; in older versions of redux-observable this wasn't guaranteed as another Epic could have been listening for SECOND and emitted some other action before THIRD, because they shared the same call-stack.

Because this is dealing with very complex recursion, call stacks, and sequences, this may be tough to fully wrap your head around. We hope that what actually happens in practice is itself more intuitive, even if truly understanding how things are queued is now.

## `epicMiddleware.replaceEpic` was removed

If you were using `epicMiddleware.replaceEpic`, you can achieve similar behavior by dispatching your own `END` action that your root Epic listens for with a `takeUntil`, directing it to terminate. You then call `epicMiddleware.run(nextEpic)` with the next root Epic you wish to run.

```ts
// Your root Epic uses function composition to add the takeUntil.
// It combines your epics together, but instead of returning that new combined epic
// it calls it, providing the action$, state$, etc so that we can pipe the takeUntil
// on the result
const rootEpic = (action$, ...rest) =>
combineEpics(epic1, epic2, ...etc)(action$, ...rest).pipe(
takeUntil(action$.pipe(
ofType('END')
))
);
function replaceRootEpic(nextRootEpic) {
store.dispatch({ type: 'END' });
epicMiddleware.run(nextRootEpic);
}
```

## Dispatching an action

The ability to call `store.dispatch()` inside your Epics was originally provided as an escape hatch, to be used rarely, if ever. Unfortunately in practice we've seen a large number of people using it extensively. Instead, Epics should emit actions through the Observable the Epic returns, using idiomatic RxJS. If you're looking for the ability to directly call dispatch yourself (rather than emit through streams) you may be interested in using an alternative middleware that is less opinionated around RxJS.
@@ -43,7 +43,7 @@ export declare interface Epic<T extends Action, S, D = any, O extends T = T> {
}

export interface EpicMiddleware<T extends Action, S, D = any, O extends T = T> extends Middleware {
replaceEpic(nextEpic: Epic<T, S, D, O>): void;
run(rootEpic: Epic<T, S, D, O>): void;
}

interface Adapter {
@@ -56,11 +56,9 @@ interface Options<D = any> {
dependencies?: D;
}

export declare function createEpicMiddleware<T extends Action, S, D = any, O extends T = T>(rootEpic: Epic<T, S, D, O>, options?: Options<D>): EpicMiddleware<T, S, D, O>;
export declare function createEpicMiddleware<T extends Action, S, D = any, O extends T = T>(options?: Options<D>): EpicMiddleware<T, S, D, O>;

export declare function combineEpics<T extends Action, S, D = any, O extends T = T>(...epics: Epic<T, S, D, O>[]): Epic<T, S, D, O>;
export declare function combineEpics<E>(...epics: E[]): E;

export declare function ofType<T extends Action, R extends T = T, K extends R['type'] = R['type']>(...key: K[]): (source: Observable<T>) => Observable<R>;

export declare const EPIC_END: '@@redux-observable/EPIC_END';
@@ -68,8 +68,8 @@
"rxjs": ">=6.0.0-beta.0 <7"
},
"devDependencies": {
"@types/chai": "^3.4.34",
"@types/mocha": "^2.2.33",
"@types/chai": "^3.5.2",
"@types/mocha": "^2.2.48",
"@types/sinon": "^4.3.1",
"babel-cli": "^6.11.4",
"babel-core": "^6.26.0",
@@ -80,7 +80,7 @@
"babel-polyfill": "^6.13.0",
"babel-preset-env": "^1.6.1",
"babel-register": "^6.11.6",
"chai": "^4.0.1",
"chai": "^4.1.2",
"conventional-changelog-cli": "1.3.3",
"cross-env": "^5.0.0",
"eslint": "^4.6.0",
@@ -94,11 +94,11 @@
"gulp": "^3.9.1",
"gulp-babel": "^6.1.2",
"json-server": "^0.10.0",
"mocha": "^3.0.1",
"mocha": "^3.5.3",
"redux": "^3.5.2",
"rimraf": "^2.5.4",
"rxjs": "^6.0.0",
"sinon": "^2.3.3",
"sinon": "^4.5.0",
"typescript": "^2.1.4",
"webpack": "^4.5.0",
"webpack-cli": "^2.0.13",

This file was deleted.

Oops, something went wrong.
@@ -1,8 +1,7 @@
import { Subject } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';
import { Subject, from, queueScheduler } from 'rxjs';
import { map, mergeMap, observeOn, subscribeOn } from 'rxjs/operators';
import { ActionsObservable } from './ActionsObservable';
import { StateObservable } from './StateObservable';
import { EPIC_END } from './EPIC_END';

const defaultAdapter = {
input: action$ => action$,
@@ -13,75 +12,74 @@ const defaultOptions = {
adapter: defaultAdapter
};

export function createEpicMiddleware(rootEpic, options = defaultOptions) {
if (typeof rootEpic !== 'function') {
throw new TypeError('You must provide a root Epic to createEpicMiddleware');
}

export function createEpicMiddleware(options = defaultOptions) {
// even though we used default param, we need to merge the defaults
// inside the options object as well in case they declare only some
options = { ...defaultOptions, ...options };

const input$ = new Subject();
const actionSubject$ = new Subject().pipe(
observeOn(queueScheduler)
);
const action$ = options.adapter.input(
new ActionsObservable(input$)
new ActionsObservable(actionSubject$)
);
const epic$ = new Subject();
let store;

const epicMiddleware = _store => {
if (process.env.NODE_ENV !== 'production' && store) {
// https://github.com/redux-observable/redux-observable/issues/389
require('./utils/console').warn('this middleware is already associated with a store. createEpicMiddleware should be called for every store.\n\n See https://goo.gl/2GQ7Da');
require('./utils/console').warn('this middleware is already associated with a store. createEpicMiddleware should be called for every store.\n\nLearn more: https://goo.gl/2GQ7Da');
}
const stateInput$ = new Subject();
const state$ = new StateObservable(stateInput$, _store);
store = _store;
const stateSubject$ = new Subject().pipe(
observeOn(queueScheduler)
);
const state$ = new StateObservable(stateSubject$, _store);

const result$ = epic$.pipe(
map(epic => {
const output$ = 'dependencies' in options
? epic(action$, state$, options.dependencies)
: epic(action$, state$);

if (!output$) {
throw new TypeError(`Your root Epic "${epic.name || '<anonymous>'}" does not return a stream. Double check you\'re not missing a return statement!`);
}

return output$;
}),
mergeMap(output$ =>
from(options.adapter.output(output$)).pipe(
subscribeOn(queueScheduler),
observeOn(queueScheduler)
)
)
);

result$.subscribe(store.dispatch);

return next => {
const result$ = epic$.pipe(
map(epic => {
const output$ = ('dependencies' in options)
? epic(action$, state$, options.dependencies)
: epic(action$, state$);

if (!output$) {
throw new TypeError(`Your root Epic "${epic.name || '<anonymous>'}" does not return a stream. Double check you\'re not missing a return statement!`);
}

return output$;
}),
switchMap(output$ => options.adapter.output(output$))
);

result$.subscribe(store.dispatch);

// Setup initial root epic. It's done this way so that
// it's possible for them to call replaceEpic later
epic$.next(rootEpic);

return action => {
// Downstream middleware gets the action first,
// which includes their reducers, so state is
// updated before epics receive the action
const result = next(action);

// It's important to update the state$ before we emit
// the action because otherwise it would be stale!
stateInput$.next(store.getState());
input$.next(action);
// the action because otherwise it would be stale
stateSubject$.next(store.getState());
actionSubject$.next(action);

return result;
};
};
};

epicMiddleware.replaceEpic = rootEpic => {
// gives the previous root Epic a last chance
// to do some clean up
store.dispatch({ type: EPIC_END });
// switches to the new root Epic, synchronously terminating
// the previous one
epicMiddleware.run = rootEpic => {
if (process.env.NODE_ENV !== 'production' && !store) {
require('./utils/console').warn('epicMiddleware.run(rootEpic) called before the middleware has been setup by redux. Provide the epicMiddleware instance to createStore() first.');
}
epic$.next(rootEpic);
};

@@ -2,5 +2,4 @@ export { createEpicMiddleware } from './createEpicMiddleware';
export { ActionsObservable } from './ActionsObservable';
export { StateObservable } from './StateObservable';
export { combineEpics } from './combineEpics';
export { EPIC_END } from './EPIC_END';
export { ofType } from './operators';
@@ -1,4 +1,8 @@
const deprecationsSeen = {};
let deprecationsSeen = {};
export const resetDeprecationsSeen = () => {
deprecationsSeen = {};
};

const consoleWarn = (typeof console === 'object' && typeof console.warn === 'function')
? (...args) => console.warn(...args)
: () => { };
Oops, something went wrong.

0 comments on commit d3516bf

Please sign in to comment.