Skip to content

Commit

Permalink
feat: EffectOptions.pipeline for customising processing of event.
Browse files Browse the repository at this point in the history
  • Loading branch information
mnasyrov committed Aug 28, 2022
1 parent b560114 commit 3f60eb9
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 45 deletions.
39 changes: 39 additions & 0 deletions packages/rx-effects/src/effect.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
exhaustMap,
firstValueFrom,
from,
mapTo,
Expand Down Expand Up @@ -303,6 +304,44 @@ describe('Effect', () => {
]);
});
});

describe('EffectOptions.pipeline', () => {
const timeIntervalHandler = (interval: number) =>
timer(interval).pipe(mapTo(interval));

it('should use the default mergeMap pipeline in case the option is not set', async () => {
const effect = createEffect<number, number>(timeIntervalHandler);

const resultPromise = getFirstValues(effect.result$, 3);
effect.handle(from([100, 50, 10]));

expect(effect.pending.get()).toBe(true);
expect(await resultPromise).toEqual([10, 50, 100]);
expect(effect.pending.get()).toBe(false);
});

it('should specify a custom pipeline (switchMap) for effect execution', async () => {
const effect = createEffect<number, number>(timeIntervalHandler, {
pipeline: (eventProject) => switchMap(eventProject),
});

const resultPromise = getFirstValues(effect.result$, 1);
effect.handle(from([100, 50, 10]));

expect(await resultPromise).toEqual([10]);
});

it('should specify a custom pipeline (exhaustMap) for effect execution', async () => {
const effect = createEffect<number, number>(timeIntervalHandler, {
pipeline: (eventProject) => exhaustMap(eventProject),
});

const resultPromise = getFirstValues(effect.result$, 1);
effect.handle(from([100, 50, 10]));

expect(await resultPromise).toEqual([100]);
});
});
});

async function getFirstValues<T>(
Expand Down
118 changes: 73 additions & 45 deletions packages/rx-effects/src/effect.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
import { from, identity, merge, Observable, Subject, Subscription } from 'rxjs';
import { map } from 'rxjs/operators';
import {
defer,
identity,
map,
merge,
mergeMap,
Observable,
of,
OperatorFunction,
retry,
Subject,
Subscription,
tap,
} from 'rxjs';
import { Action } from './action';
import { declareState } from './stateDeclaration';
import { StateQuery } from './stateQuery';
Expand Down Expand Up @@ -44,6 +56,26 @@ export type EffectState<Event, Result = void, ErrorType = Error> = {
readonly pendingCount: StateQuery<number>;
};

export type EffectEventProject<Event, Result> = (
event: Event,
) => Observable<Result>;

export type EffectPipeline<Event, Result> = (
eventProject: EffectEventProject<Event, Result>,
) => OperatorFunction<Event, Result>;

const DEFAULT_MERGE_MAP_PIPELINE: EffectPipeline<any, any> = (eventProject) =>
mergeMap(eventProject);

export type EffectOptions<Event, Result> = Readonly<{
/**
* Custom pipeline for processing effect's events.
*
* `mergeMap` pipeline is used by default.
*/
pipeline?: EffectPipeline<Event, Result>;
}>;

/**
* Effect encapsulates a handler for Action or Observable.
*
Expand Down Expand Up @@ -82,53 +114,51 @@ const decreaseCount = (count: number): number => count - 1;
*/
export function createEffect<Event = void, Result = void, ErrorType = Error>(
handler: EffectHandler<Event, Result>,
options?: EffectOptions<Event, Result>,
): Effect<Event, Result, ErrorType> {
const pipeline: EffectPipeline<Event, Result> =
options?.pipeline ?? DEFAULT_MERGE_MAP_PIPELINE;

const subscriptions = new Subscription();

const event$ = new Subject<Event>();
const done$ = new Subject<{ event: Event; result: Result }>();
const error$ = new Subject<{ event: Event; error: ErrorType }>();

const pendingCount = PENDING_COUNT_STATE.createStore();

function applyHandler(event: Event): void {
pendingCount.update(increaseCount);

try {
const handlerResult = handler(event);

if (
handlerResult instanceof Promise ||
handlerResult instanceof Observable
) {
subscriptions.add(executeObservable(event, from(handlerResult)));
return;
}

pendingCount.update(decreaseCount);
done$.next({ event, result: handlerResult });
} catch (error) {
pendingCount.update(decreaseCount);
error$.next({ event, error: error as ErrorType });
}
}
subscriptions.add(() => {
event$.complete();
done$.complete();
error$.complete();
pendingCount.destroy();
});

const eventProject: EffectEventProject<Event, Result> = (event: Event) => {
return defer(() => {
pendingCount.update(increaseCount);

const result = handler(event);

return result instanceof Observable || result instanceof Promise
? result
: of(result);
}).pipe(
tap({
next: (result) => {
done$.next({ event, result });
},
complete: () => {
pendingCount.update(decreaseCount);
},
error: (error) => {
pendingCount.update(decreaseCount);
error$.next({ event, error });
},
}),
);
};

function executeObservable(
event: Event,
observable: Observable<Result>,
): Subscription {
return observable.subscribe({
next: (result) => {
done$.next({ event, result });
},
complete: () => {
pendingCount.update(decreaseCount);
},
error: (error) => {
pendingCount.update(decreaseCount);
error$.next({ event, error });
},
});
}
subscriptions.add(event$.pipe(pipeline(eventProject), retry()).subscribe());

function handle<SourceErrorType = Error>(
source: Observable<Event> | Action<Event>,
Expand All @@ -139,7 +169,7 @@ export function createEffect<Event = void, Result = void, ErrorType = Error>(
) as Observable<Event>;

const subscription = observable.subscribe({
next: (event) => applyHandler(event),
next: (event) => event$.next(event),
error: (error) => options?.onSourceFailed?.(error),
complete: () => options?.onSourceCompleted?.(),
});
Expand All @@ -157,11 +187,9 @@ export function createEffect<Event = void, Result = void, ErrorType = Error>(
pendingCount: pendingCount.query(identity),

handle,

destroy: () => {
subscriptions.unsubscribe();
done$.complete();
error$.complete();
pendingCount.destroy();
},
};
}

0 comments on commit 3f60eb9

Please sign in to comment.