Skip to content
Permalink
Browse files
feature(ofType): Type inference for ofType, removal of ActionsObserva…
…ble in favor of just Observable (#681)

BREAKING CHANGE: ActionsObservable existed so we could provide an ofType() method to the prototype of action$, before RxJS had pipeable operators. Now that pipeable operators have been out for quite some time we are removing ActionsObservable in favor or using the pipeable ofType() instead.

```js
// BEFORE
function someEpic(action$) {
  return action$
    .ofType('PING')
    .mapTo({ type: 'PONG' });
}

// AFTER
import { ofType } from 'redux-observable';
import { mapTo } from 'rxjs/operators';
function someEpic(action$) {
  return action$.pipe(
    ofType('PING')
    mapTo({ type: 'PONG' })
  );
}
```
  • Loading branch information
jayphelps committed Nov 14, 2019
1 parent cc6b077 commit 16f083d405ace2039da0836c4a404a85d8052991
Showing with 81 additions and 170 deletions.
  1. +1 −1 docs/api/combineEpics.md
  2. +0 −50 src/ActionsObservable.ts
  3. +2 −2 src/StateObservable.ts
  4. +37 −17 src/createEpicMiddleware.ts
  5. +11 −3 src/epic.ts
  6. +0 −1 src/index.ts
  7. +24 −9 src/operators.ts
  8. +0 −80 test/ActionsObservable-spec.ts
  9. +4 −5 test/combineEpics-spec.ts
  10. +2 −2 test/createEpicMiddleware-spec.ts
@@ -9,7 +9,7 @@ Please keep in mind that the order in which epics are combined affect the order

#### Returns

(*`Epic`*): An Epic that merges the output of every Epic provided and passes along the `ActionsObservable` and redux store as arguments.
(*`Epic`*): An Epic that merges the output of every Epic provided and passes along the action$ Observable and redux store as arguments.

#### Example

This file was deleted.

@@ -4,7 +4,7 @@ export class StateObservable<S> extends Observable<S> {
value: S;
private __notifier = new Subject<S>();

constructor(stateSubject: Subject<S>, initialState: S) {
constructor(input$: Observable<S>, initialState: S) {
super(subscriber => {
const subscription = this.__notifier.subscribe(subscriber);
if (subscription && !subscription.closed) {
@@ -14,7 +14,7 @@ export class StateObservable<S> extends Observable<S> {
});

this.value = initialState;
stateSubject.subscribe(value => {
input$.subscribe(value => {
// We only want to update state$ if it has actually changed since
// redux requires reducers use immutability patterns.
// This is basically what distinctUntilChanged() does but it's so simple
@@ -1,7 +1,6 @@
import { Action, Middleware, MiddlewareAPI, Dispatch } from 'redux';
import { Subject, from, queueScheduler } from 'rxjs';
import { map, mergeMap, observeOn, subscribeOn } from 'rxjs/operators';
import { ActionsObservable } from './ActionsObservable';
import { StateObservable } from './StateObservable';
import { Epic } from './epic';
import { warn } from './utils/console';
@@ -10,20 +9,34 @@ interface Options<D = any> {
dependencies?: D;
}

export interface EpicMiddleware<T extends Action, O extends T = T, S = void, D = any> extends Middleware<{}, S, Dispatch<any>> {
export interface EpicMiddleware<
T extends Action,
O extends T = T,
S = void,
D = any
> extends Middleware<{}, S, Dispatch<any>> {
run(rootEpic: Epic<T, O, S, D>): void;
}

export function createEpicMiddleware<T extends Action, O extends T = T, S = void, D = any>(options: Options<D> = {}): EpicMiddleware<T, O, S, D> {
export function createEpicMiddleware<
T extends Action,
O extends T = T,
S = void,
D = any
>(options: Options<D> = {}): EpicMiddleware<T, O, S, D> {
// This isn't great. RxJS doesn't publicly export the constructor for
// QueueScheduler nor QueueAction, so we reach in. We need to do this because
// we don't want our internal queuing mechanism to be on the same queue as any
// other RxJS code outside of redux-observable internals.
const QueueScheduler: any = queueScheduler.constructor;
const uniqueQueueScheduler: typeof queueScheduler = new QueueScheduler((queueScheduler as any).SchedulerAction);
const uniqueQueueScheduler: typeof queueScheduler = new QueueScheduler(
(queueScheduler as any).SchedulerAction
);

if (process.env.NODE_ENV !== 'production' && typeof options === 'function') {
throw new TypeError('Providing your root Epic to `createEpicMiddleware(rootEpic)` is no longer supported, instead use `epicMiddleware.run(rootEpic)`\n\nLearn more: https://redux-observable.js.org/MIGRATION.html#setting-up-the-middleware');
throw new TypeError(
'Providing your root Epic to `createEpicMiddleware(rootEpic)` is no longer supported, instead use `epicMiddleware.run(rootEpic)`\n\nLearn more: https://redux-observable.js.org/MIGRATION.html#setting-up-the-middleware'
);
}

const epic$ = new Subject<Epic<T, O, S, D>>();
@@ -32,25 +45,30 @@ export function createEpicMiddleware<T extends Action, O extends T = T, S = void
const epicMiddleware: EpicMiddleware<T, O, S, D> = _store => {
if (process.env.NODE_ENV !== 'production' && store) {
// https://github.com/redux-observable/redux-observable/issues/389
warn('this middleware is already associated with a store. createEpicMiddleware should be called for every store.\n\nLearn more: https://goo.gl/2GQ7Da');
warn(
'this middleware is already associated with a store. createEpicMiddleware should be called for every store.\n\nLearn more: https://goo.gl/2GQ7Da'
);
}
store = _store;
// `.pipe()` transforms typing from `Subject<T>` to `Observable<T>`
const actionSubject$ = new Subject<T>().pipe(
observeOn(uniqueQueueScheduler)
) as any as Subject<T>;
const stateSubject$ = new Subject<S>().pipe(
observeOn(uniqueQueueScheduler)
) as any as Subject<S>;
const action$ = new ActionsObservable(actionSubject$);
const state$ = new StateObservable(stateSubject$, store.getState());
const actionSubject$ = new Subject<T>();
const stateSubject$ = new Subject<S>();
const action$ = actionSubject$
.asObservable()
.pipe(observeOn(uniqueQueueScheduler));
const state$ = new StateObservable(
stateSubject$.pipe(observeOn(uniqueQueueScheduler)),
store.getState()
);

const result$ = epic$.pipe(
map(epic => {
const output$ = epic(action$, state$, options.dependencies!);

if (!output$) {
throw new TypeError(`Your root Epic "${epic.name || '<anonymous>'}" does not return a stream. Double check you\'re not missing a return statement!`);
throw new TypeError(
`Your root Epic "${epic.name ||
'<anonymous>'}" does not return a stream. Double check you\'re not missing a return statement!`
);
}

return output$;
@@ -84,7 +102,9 @@ export function createEpicMiddleware<T extends Action, O extends T = T, S = void

epicMiddleware.run = rootEpic => {
if (process.env.NODE_ENV !== 'production' && !store) {
warn('epicMiddleware.run(rootEpic) called before the middleware has been setup by redux. Provide the epicMiddleware instance to createStore() first.');
warn(
'epicMiddleware.run(rootEpic) called before the middleware has been setup by redux. Provide the epicMiddleware instance to createStore() first.'
);
}
epic$.next(rootEpic);
};
@@ -1,8 +1,16 @@
import { Action } from 'redux';
import { Observable } from 'rxjs';
import { StateObservable } from './StateObservable';
import { ActionsObservable } from './ActionsObservable';

export declare interface Epic<Input extends Action = any, Output extends Input = Input, State = any, Dependencies = any> {
(action$: ActionsObservable<Input>, state$: StateObservable<State>, dependencies: Dependencies): Observable<Output>;
export declare interface Epic<
Input extends Action = any,
Output extends Input = Input,
State = any,
Dependencies = any
> {
(
action$: Observable<Input>,
state$: StateObservable<State>,
dependencies: Dependencies
): Observable<Output>;
}
@@ -1,5 +1,4 @@
export { createEpicMiddleware, EpicMiddleware } from './createEpicMiddleware';
export { ActionsObservable } from './ActionsObservable';
export { StateObservable } from './StateObservable';
export { Epic } from './epic';
export { combineEpics } from './combineEpics';
@@ -1,24 +1,39 @@
import { Action } from 'redux';
import { Observable } from 'rxjs';
import { OperatorFunction } from 'rxjs';
import { filter } from 'rxjs/operators';

const keyHasType = (type: unknown, key: unknown) => {
return type === key || typeof key === 'function' && type === key.toString();
return type === key || (typeof key === 'function' && type === key.toString());
};

export const ofType = <T extends Action, R extends T = T, K extends R['type'] = R['type']>(...keys: K[]) => (source: Observable<T>) => source.pipe(
filter<T, R>((action): action is R => {
/**
* Inferring the types of this is a bit challenging, and only works in newer
* versions of TypeScript.
*
* @param ...types One or more Redux action types you want to filter for, variadic.
*/
export function ofType<
// All possible actions your app can dispatch
Input extends Action,
// The types you want to filter for
Types extends Input['type'][],
// The resulting actions that match the above types
Output extends Input = Extract<Input, Action<Types[number]>>
>(...types: Types): OperatorFunction<Input, Output> {
return filter((action): action is Output => {
const { type } = action;
const len = keys.length;
const len = types.length;

if (len === 1) {
return keyHasType(type, keys[0]);
return keyHasType(type, types[0]);
} else {
for (let i = 0; i < len; i++) {
if (keyHasType(type, keys[i])) {
if (keyHasType(type, types[i])) {
return true;
}
}
}

return false;
})
);
});
}

This file was deleted.

@@ -1,6 +1,6 @@
import { expect } from 'chai';
import sinon from 'sinon';
import { combineEpics, ActionsObservable, ofType, Epic, StateObservable } from '../';
import { combineEpics, ofType, Epic, StateObservable } from '../';
import { Action } from 'redux';
import { Subject, Observable, EMPTY } from 'rxjs';
import { map, toArray } from 'rxjs/operators';
@@ -24,15 +24,14 @@ describe('combineEpics', () => {
);

const store = new StateObservable(new Subject(), { I: 'am', a: 'store' });
const subject = new Subject<Action>();
const actions = new ActionsObservable(subject);
const actions = new Subject<Action>();
const result: Observable<Action> = (epic as any)(actions, store);
const emittedActions: any[] = [];

result.subscribe(emittedAction => emittedActions.push(emittedAction));

subject.next({ type: 'ACTION1' });
subject.next({ type: 'ACTION2' });
actions.next({ type: 'ACTION1' });
actions.next({ type: 'ACTION2' });

expect(emittedActions).to.deep.equal([
{ type: 'DELEGATED1', action: { type: 'ACTION1' }, store },
@@ -2,7 +2,7 @@ import 'babel-polyfill';
import { expect } from 'chai';
import sinon from 'sinon';
import { createStore, applyMiddleware, Reducer, Middleware, Action, AnyAction } from 'redux';
import { createEpicMiddleware, combineEpics, ActionsObservable, StateObservable, ofType, Epic, __FOR_TESTING__resetDeprecationsSeen as resetDeprecationsSeen } from '../';
import { createEpicMiddleware, combineEpics, StateObservable, ofType, Epic, __FOR_TESTING__resetDeprecationsSeen as resetDeprecationsSeen } from '../';
import { of, empty, merge, queueScheduler, Observable } from 'rxjs';
import { mapTo, filter, map, mergeMap, startWith, ignoreElements, distinctUntilChanged } from 'rxjs/operators';
import { initAction } from './initAction';
@@ -25,7 +25,7 @@ describe('createEpicMiddleware', () => {
const epicMiddleware = createEpicMiddleware();
const mockMiddleware: Middleware = _store => _next => _action => {
expect(epic.calledOnce).to.equal(true);
expect(epic.firstCall.args[0]).to.be.instanceOf(ActionsObservable);
expect(epic.firstCall.args[0]).to.be.instanceOf(Observable);
expect(epic.firstCall.args[1]).to.be.instanceof(StateObservable);
done();
};

2 comments on commit 16f083d

@thebpmgroup
Copy link

@thebpmgroup thebpmgroup commented on 16f083d Jan 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is causing me an issue with the following code

export const pingAuthEpic = (action$: Observable<Action>) => action$.pipe(
    ofType(SET_ONLINE),
    mergeMap(() => interval(PING_AUTH_INTERVAL).pipe(
        mergeMap(() => of({ type: 'PING_AUTH' })),
        takeUntil(action$.ofType(SET_OFFLINE)),
    )),
);

Property 'ofType' does not exist on type 'Observable<Action>'. TS2339

58 |     mergeMap(() => interval(PING_AUTH_INTERVAL).pipe(
59 |         mergeMap(() => of({ type: 'PING_AUTH' })),

60 | takeUntil(action$.ofType(SET_OFFLINE)),
| ^
61 | )),
62 | );
63 |

If I change it to use ofType exported from your package I get the following error

Argument of type 'OperatorFunction<Action, Action>' is not assignable to parameter of type 'ObservableInput'.
Property 'getReader' is missing in type 'OperatorFunction<Action, Action>' but required in type 'ReadableStreamLike'. TS2345

58 |     mergeMap(() => interval(PING_AUTH_INTERVAL).pipe(
59 |         mergeMap(() => of({ type: 'PING_AUTH' })),

60 | takeUntil(ofType(SET_OFFLINE)),
| ^
61 | )),
62 | );
63 |

@jayphelps is there a workaround pending a fix?

@evertbouw
Copy link
Member

@evertbouw evertbouw commented on 16f083d Feb 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@thebpmgroup try this

takeUntil(action$.pipe(ofType(SET_OFFLINE))),

Please sign in to comment.