Skip to content

Commit

Permalink
fix(createEpicMiddleware): Don't share a scheduler queue with anyone …
Browse files Browse the repository at this point in the history
…else's RxJS code, fixes #624 (#625)
  • Loading branch information
jayphelps committed Mar 26, 2019
1 parent a3c09bc commit e5bae19
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 5 deletions.
15 changes: 11 additions & 4 deletions src/createEpicMiddleware.js
Expand Up @@ -4,6 +4,13 @@ import { ActionsObservable } from './ActionsObservable';
import { StateObservable } from './StateObservable';

export function createEpicMiddleware(options = {}) {
// This isn't great. RxJS doesn't publicly export the constructor for
// QueueScheduler nor QueueAction, so we reach in. We need to do this because
// we don't want our internal queuing mechanism to be on the same queue as any
// other RxJS code outside of redux-observable internals.
const QueueScheduler = queueScheduler.constructor;
const uniqueQueueScheduler = new QueueScheduler(queueScheduler.SchedulerAction);

if (process.env.NODE_ENV !== 'production' && typeof options === 'function') {
throw new TypeError('Providing your root Epic to `createEpicMiddleware(rootEpic)` is no longer supported, instead use `epicMiddleware.run(rootEpic)`\n\nLearn more: https://redux-observable.js.org/MIGRATION.html#setting-up-the-middleware');
}
Expand All @@ -18,10 +25,10 @@ export function createEpicMiddleware(options = {}) {
}
store = _store;
const actionSubject$ = new Subject().pipe(
observeOn(queueScheduler)
observeOn(uniqueQueueScheduler)
);
const stateSubject$ = new Subject().pipe(
observeOn(queueScheduler)
observeOn(uniqueQueueScheduler)
);
const action$ = new ActionsObservable(actionSubject$);
const state$ = new StateObservable(stateSubject$, store.getState());
Expand All @@ -40,8 +47,8 @@ export function createEpicMiddleware(options = {}) {
}),
mergeMap(output$ =>
from(output$).pipe(
subscribeOn(queueScheduler),
observeOn(queueScheduler)
subscribeOn(uniqueQueueScheduler),
observeOn(uniqueQueueScheduler)
)
)
);
Expand Down
38 changes: 37 additions & 1 deletion test/createEpicMiddleware-spec.js
Expand Up @@ -5,7 +5,7 @@ import sinon from 'sinon';
import { createStore, applyMiddleware } from 'redux';
import { createEpicMiddleware, combineEpics, ActionsObservable, StateObservable, ofType } from '../';
import { resetDeprecationsSeen } from '../lib/cjs/utils/console';
import { of, empty, merge } from 'rxjs';
import { of, empty, merge, queueScheduler } from 'rxjs';
import { mapTo, filter, map, mergeMap, startWith, ignoreElements, distinctUntilChanged } from 'rxjs/operators';
import { initAction } from './initAction';

Expand Down Expand Up @@ -414,4 +414,40 @@ describe('createEpicMiddleware', () => {

expect(epic.called).to.equal(true);
});

it('should not allow interference from the public queueScheduler singleton', (done) => {
const reducer = (state = [], action) => state.concat(action);
const epic1 = action$ =>
action$.pipe(
ofType('ACTION_1'),
mergeMap(() =>
of({ type: 'ACTION_2' }, { type: 'ACTION_3' })
)
);

const epic2 = action$ =>
action$.pipe(
ofType('ACTION_2'),
map(() => ({ type: 'ACTION_4' }))
);

const rootEpic = combineEpics(epic1, epic2);

queueScheduler.schedule(() => {
const middleware = createEpicMiddleware();
const store = createStore(reducer, applyMiddleware(middleware));
middleware.run(rootEpic);
store.dispatch({ type: 'ACTION_1' });

expect(store.getState()).to.deep.equal([
initAction,
{ type: 'ACTION_1' },
{ type: 'ACTION_2' },
{ type: 'ACTION_3' },
{ type: 'ACTION_4' }
]);

done();
});
});
});

0 comments on commit e5bae19

Please sign in to comment.