Skip to content

Commit

Permalink
refactor(uniqueQueueScheduler subjects): refactor uniqueQueueSchedule…
Browse files Browse the repository at this point in the history
…r usage to avoid unsafe casts
  • Loading branch information
jayphelps committed Nov 14, 2019
1 parent 24ae92c commit 73dfebd
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 18 deletions.
4 changes: 2 additions & 2 deletions src/StateObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ export class StateObservable<S> extends Observable<S> {
value: S;
private __notifier = new Subject<S>();

constructor(stateSubject: Subject<S>, initialState: S) {
constructor(input$: Observable<S>, initialState: S) {
super(subscriber => {
const subscription = this.__notifier.subscribe(subscriber);
if (subscription && !subscription.closed) {
Expand All @@ -14,7 +14,7 @@ export class StateObservable<S> extends Observable<S> {
});

this.value = initialState;
stateSubject.subscribe(value => {
input$.subscribe(value => {
// We only want to update state$ if it has actually changed since
// redux requires reducers use immutability patterns.
// This is basically what distinctUntilChanged() does but it's so simple
Expand Down
53 changes: 37 additions & 16 deletions src/createEpicMiddleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,34 @@ interface Options<D = any> {
dependencies?: D;
}

export interface EpicMiddleware<T extends Action, O extends T = T, S = void, D = any> extends Middleware<{}, S, Dispatch<any>> {
export interface EpicMiddleware<
T extends Action,
O extends T = T,
S = void,
D = any
> extends Middleware<{}, S, Dispatch<any>> {
run(rootEpic: Epic<T, O, S, D>): void;
}

export function createEpicMiddleware<T extends Action, O extends T = T, S = void, D = any>(options: Options<D> = {}): EpicMiddleware<T, O, S, D> {
export function createEpicMiddleware<
T extends Action,
O extends T = T,
S = void,
D = any
>(options: Options<D> = {}): EpicMiddleware<T, O, S, D> {
// This isn't great. RxJS doesn't publicly export the constructor for
// QueueScheduler nor QueueAction, so we reach in. We need to do this because
// we don't want our internal queuing mechanism to be on the same queue as any
// other RxJS code outside of redux-observable internals.
const QueueScheduler: any = queueScheduler.constructor;
const uniqueQueueScheduler: typeof queueScheduler = new QueueScheduler((queueScheduler as any).SchedulerAction);
const uniqueQueueScheduler: typeof queueScheduler = new QueueScheduler(
(queueScheduler as any).SchedulerAction
);

if (process.env.NODE_ENV !== 'production' && typeof options === 'function') {
throw new TypeError('Providing your root Epic to `createEpicMiddleware(rootEpic)` is no longer supported, instead use `epicMiddleware.run(rootEpic)`\n\nLearn more: https://redux-observable.js.org/MIGRATION.html#setting-up-the-middleware');
throw new TypeError(
'Providing your root Epic to `createEpicMiddleware(rootEpic)` is no longer supported, instead use `epicMiddleware.run(rootEpic)`\n\nLearn more: https://redux-observable.js.org/MIGRATION.html#setting-up-the-middleware'
);
}

const epic$ = new Subject<Epic<T, O, S, D>>();
Expand All @@ -31,25 +45,30 @@ export function createEpicMiddleware<T extends Action, O extends T = T, S = void
const epicMiddleware: EpicMiddleware<T, O, S, D> = _store => {
if (process.env.NODE_ENV !== 'production' && store) {
// https://github.com/redux-observable/redux-observable/issues/389
warn('this middleware is already associated with a store. createEpicMiddleware should be called for every store.\n\nLearn more: https://goo.gl/2GQ7Da');
warn(
'this middleware is already associated with a store. createEpicMiddleware should be called for every store.\n\nLearn more: https://goo.gl/2GQ7Da'
);
}
store = _store;
// `.pipe()` transforms typing from `Subject<T>` to `Observable<T>`
const actionSubject$ = new Subject<T>().pipe(
observeOn(uniqueQueueScheduler)
) as any as Subject<T>;
const stateSubject$ = new Subject<S>().pipe(
observeOn(uniqueQueueScheduler)
) as any as Subject<S>;
const action$ = actionSubject$.asObservable();
const state$ = new StateObservable(stateSubject$, store.getState());
const actionSubject$ = new Subject<T>();
const stateSubject$ = new Subject<S>();
const action$ = actionSubject$
.asObservable()
.pipe(observeOn(uniqueQueueScheduler));
const state$ = new StateObservable(
stateSubject$.pipe(observeOn(uniqueQueueScheduler)),
store.getState()
);

const result$ = epic$.pipe(
map(epic => {
const output$ = epic(action$, state$, options.dependencies!);

if (!output$) {
throw new TypeError(`Your root Epic "${epic.name || '<anonymous>'}" does not return a stream. Double check you\'re not missing a return statement!`);
throw new TypeError(
`Your root Epic "${epic.name ||
'<anonymous>'}" does not return a stream. Double check you\'re not missing a return statement!`
);
}

return output$;
Expand Down Expand Up @@ -83,7 +102,9 @@ export function createEpicMiddleware<T extends Action, O extends T = T, S = void

epicMiddleware.run = rootEpic => {
if (process.env.NODE_ENV !== 'production' && !store) {
warn('epicMiddleware.run(rootEpic) called before the middleware has been setup by redux. Provide the epicMiddleware instance to createStore() first.');
warn(
'epicMiddleware.run(rootEpic) called before the middleware has been setup by redux. Provide the epicMiddleware instance to createStore() first.'
);
}
epic$.next(rootEpic);
};
Expand Down

0 comments on commit 73dfebd

Please sign in to comment.