Permalink
Browse files

feat(middleware processor): add argument to middleware to set up asyn…

…c processing for all actions pumped through store

BREAKING CHANGE: dispatched actions will now occur _before_ the actions created by synchronous observable side effects.
  • Loading branch information...
benlesh committed May 20, 2016
1 parent 1d53012 commit 5a672be00504f6305f1e50816175d8a49f334e6c
Showing with 32 additions and 4 deletions.
  1. +6 −2 src/reduxObservable.js
  2. +26 −2 test/reduxObservable-spec.js
@@ -2,19 +2,23 @@ import { Subject } from 'rxjs/Subject';
import { from } from 'rxjs/observable/from';
import { ActionsObservable } from './ActionsObservable';
export function reduxObservable() {
export function reduxObservable(processor) {
let actions = new Subject();
let actionsObs = new ActionsObservable(actions);
let middleware = (store) => (next) => {
if (processor) {
processor(actionsObs, store).subscribe(store.dispatch);
}
return (action) => {
if (typeof action === 'function') {
let obs = from(action(actionsObs, store));
let sub = obs.subscribe(store.dispatch);
return sub;
} else {
let result = next(action);
actions.next(action);
return next(action);
return result;
}
};
};
@@ -10,6 +10,30 @@ import $$observable from 'symbol-observable';
const { Observable } = Rx;
describe('reduxObservable', () => {
it('should accept a processor argument that wires up a stream of actions to a stream of actions', () => {
const reducer = (state = [], action) => state.concat(action);
const middleware = reduxObservable((actions, store) =>
Observable.merge(
actions.ofType('FIRE_1').mapTo({ type: 'ACTION_1' }),
actions.ofType('FIRE_2').mapTo({ type: 'ACTION_2' })
)
);
const store = createStore(reducer, applyMiddleware(middleware));
store.dispatch({ type: 'FIRE_1' });
store.dispatch({ type: 'FIRE_2' });
expect(store.getState()).to.deep.equal([
{ type: '@@redux/INIT' },
{ type: 'FIRE_1' },
{ type: 'ACTION_1' },
{ type: 'FIRE_2' },
{ type: 'ACTION_2' }
]);
});
it('should intercept and process actions', (done) => {
const reducer = (state = [], action) => state.concat(action);
@@ -141,8 +165,8 @@ describe('reduxObservable', () => {
expect(store.getState()).to.deep.equal([
{ type: '@@redux/INIT' },
{ type: 'ASYNC_ACTION_1' },
{ type: 'ASYNC_ACTION_ABORT_MERGED' },
{ type: 'ASYNC_ACTION_ABORT' }
{ type: 'ASYNC_ACTION_ABORT' },
{ type: 'ASYNC_ACTION_ABORT_MERGED' }
]);
done();
}, 100);

0 comments on commit 5a672be

Please sign in to comment.