Skip to content

Commit f892cc8

Browse files
feat(component-store): add support for side effects (#2544)
1 parent c3afe2f commit f892cc8

File tree

2 files changed

+230
-2
lines changed

2 files changed

+230
-2
lines changed

modules/component-store/spec/component-store.spec.ts

Lines changed: 191 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,21 @@
11
import { ComponentStore } from '@ngrx/component-store';
22
import { fakeSchedulers, marbles } from 'rxjs-marbles/jest';
3-
import { of, Subscription, ConnectableObservable, interval, timer } from 'rxjs';
4-
import { delayWhen, publishReplay, take, map } from 'rxjs/operators';
3+
import {
4+
of,
5+
Subscription,
6+
ConnectableObservable,
7+
interval,
8+
timer,
9+
Observable,
10+
} from 'rxjs';
11+
import {
12+
delayWhen,
13+
publishReplay,
14+
take,
15+
map,
16+
tap,
17+
finalize,
18+
} from 'rxjs/operators';
519

620
describe('Component Store', () => {
721
describe('initialization', () => {
@@ -627,4 +641,179 @@ describe('Component Store', () => {
627641
componentStore.ngOnDestroy();
628642
});
629643
});
644+
645+
describe('effect', () => {
646+
let componentStore: ComponentStore<object>;
647+
648+
beforeEach(() => {
649+
componentStore = new ComponentStore<object>();
650+
});
651+
652+
it(
653+
'is run when value is provided',
654+
marbles(m => {
655+
const results: string[] = [];
656+
const mockGenerator = jest.fn((origin$: Observable<string>) =>
657+
origin$.pipe(tap(v => results.push(v)))
658+
);
659+
const effect = componentStore.effect(mockGenerator);
660+
effect('value 1');
661+
effect('value 2');
662+
663+
expect(results).toEqual(['value 1', 'value 2']);
664+
})
665+
);
666+
667+
it(
668+
'is run when undefined value is provided',
669+
marbles(m => {
670+
const results: string[] = [];
671+
const mockGenerator = jest.fn((origin$: Observable<undefined>) =>
672+
origin$.pipe(tap(v => results.push(typeof v)))
673+
);
674+
const effect = componentStore.effect(mockGenerator);
675+
effect(undefined);
676+
effect();
677+
678+
expect(results).toEqual(['undefined', 'undefined']);
679+
})
680+
);
681+
682+
it(
683+
'is run when observable is provided',
684+
marbles(m => {
685+
const mockGenerator = jest.fn(origin$ => origin$);
686+
const effect = componentStore.effect(mockGenerator);
687+
688+
effect(m.cold('-a-b-c|'));
689+
690+
m.expect(mockGenerator.mock.calls[0][0]).toBeObservable(
691+
m.hot(' -a-b-c-')
692+
);
693+
})
694+
);
695+
it(
696+
'is run with multiple Observables',
697+
marbles(m => {
698+
const mockGenerator = jest.fn(origin$ => origin$);
699+
const effect = componentStore.effect(mockGenerator);
700+
701+
effect(m.cold('-a-b-c|'));
702+
effect(m.hot(' --d--e----f-'));
703+
704+
m.expect(mockGenerator.mock.calls[0][0]).toBeObservable(
705+
m.hot(' -adb-(ce)-f-')
706+
);
707+
})
708+
);
709+
710+
describe('cancels effect Observable', () => {
711+
beforeEach(() => jest.useFakeTimers());
712+
it(
713+
'by unsubscribing with returned Subscription',
714+
fakeSchedulers(advance => {
715+
const results: string[] = [];
716+
const effect = componentStore.effect((origin$: Observable<string>) =>
717+
origin$.pipe(tap(v => results.push(v)))
718+
);
719+
720+
const observable$ = interval(10).pipe(
721+
map(v => String(v)),
722+
take(10) // just in case
723+
);
724+
725+
// Update with Observable.
726+
const subsription = effect(observable$);
727+
728+
// Advance for 40 fake milliseconds and unsubscribe - should capture
729+
// from '0' to '3'
730+
advance(40);
731+
subsription.unsubscribe();
732+
733+
// Advance for 20 more fake milliseconds, to check if anything else
734+
// is captured
735+
advance(20);
736+
737+
expect(results).toEqual(['0', '1', '2', '3']);
738+
})
739+
);
740+
it(
741+
'could be unsubscribed from the specific Observable when multiple' +
742+
' are provided',
743+
fakeSchedulers(advance => {
744+
// Record all the values that go through state$ into an array
745+
const results: Array<{ value: string }> = [];
746+
const effect = componentStore.effect(
747+
(origin$: Observable<{ value: string }>) =>
748+
origin$.pipe(tap(v => results.push(v)))
749+
);
750+
751+
// Pass the first Observable to the effect.
752+
const subsription = effect(
753+
interval(10).pipe(
754+
map(v => ({ value: 'a' + v })),
755+
take(10) // just in case
756+
)
757+
);
758+
759+
// Pass the second Observable that pushes values to effect
760+
effect(
761+
timer(15, 10).pipe(
762+
map(v => ({ value: 'b' + v })),
763+
take(10)
764+
)
765+
);
766+
767+
// Advance for 40 fake milliseconds and unsubscribe - should capture
768+
// from '0' to '3'
769+
advance(40);
770+
subsription.unsubscribe();
771+
772+
// Advance for 30 more fake milliseconds, to make sure that second
773+
// Observable still emits
774+
advance(30);
775+
776+
expect(results).toEqual([
777+
{ value: 'a0' },
778+
{ value: 'b0' },
779+
{ value: 'a1' },
780+
{ value: 'b1' },
781+
{ value: 'a2' },
782+
{ value: 'b2' },
783+
{ value: 'a3' },
784+
{ value: 'b3' },
785+
{ value: 'b4' },
786+
{ value: 'b5' }, // second Observable continues to emit values
787+
]);
788+
})
789+
);
790+
791+
it('completes when componentStore is destroyed', (doneFn: jest.DoneCallback) => {
792+
componentStore.effect(origin$ =>
793+
origin$.pipe(
794+
finalize(() => {
795+
doneFn();
796+
})
797+
)
798+
)(interval(10));
799+
800+
setTimeout(() => componentStore.ngOnDestroy(), 20);
801+
jest.advanceTimersByTime(20);
802+
});
803+
804+
it('observable argument completes when componentStore is destroyed', (doneFn: jest.DoneCallback) => {
805+
componentStore.effect(origin$ => origin$)(
806+
interval(10).pipe(
807+
finalize(() => {
808+
doneFn();
809+
})
810+
)
811+
);
812+
813+
setTimeout(() => componentStore.ngOnDestroy(), 20);
814+
815+
jest.advanceTimersByTime(20);
816+
});
817+
});
818+
});
630819
});

modules/component-store/src/component-store.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
Subscription,
77
throwError,
88
combineLatest,
9+
Subject,
910
} from 'rxjs';
1011
import {
1112
concatMap,
@@ -17,6 +18,15 @@ import {
1718
} from 'rxjs/operators';
1819
import { debounceSync } from './debounceSync';
1920

21+
/**
22+
* Return type of the effect, that behaves differently based on whether the
23+
* argument is passed to the callback.
24+
*/
25+
interface EffectReturnFn<T> {
26+
(): void;
27+
(t: T | Observable<T>): Subscription;
28+
}
29+
2030
export class ComponentStore<T extends object> {
2131
// Should be used only in ngOnDestroy.
2232
private readonly destroySubject$ = new ReplaySubject<void>(1);
@@ -178,4 +188,33 @@ export class ComponentStore<T extends object> {
178188
);
179189
return distinctSharedObservable$;
180190
}
191+
192+
/**
193+
* Creates an effect.
194+
*
195+
* This effect is subscribed to for the life of the @Component.
196+
* @param generator A function that takes an origin Observable input and
197+
* returns an Observable. The Observable that is returned will be
198+
* subscribed to for the life of the component.
199+
* @return A function that, when called, will trigger the origin Observable.
200+
*/
201+
effect<V, R = unknown>(
202+
generator: (origin$: Observable<V>) => Observable<R>
203+
): EffectReturnFn<V> {
204+
const origin$ = new Subject<V>();
205+
generator(origin$)
206+
// tied to the lifecycle 👇 of ComponentStore
207+
.pipe(takeUntil(this.destroy$))
208+
.subscribe();
209+
210+
return (observableOrValue?: V | Observable<V>): Subscription => {
211+
const observable$ = isObservable(observableOrValue)
212+
? observableOrValue
213+
: of(observableOrValue);
214+
return observable$.pipe(takeUntil(this.destroy$)).subscribe(value => {
215+
// any new 👇 value is pushed into a stream
216+
origin$.next(value);
217+
});
218+
};
219+
}
181220
}

0 commit comments

Comments
 (0)