-
-
Notifications
You must be signed in to change notification settings - Fork 2k
/
effect_sources.ts
117 lines (106 loc) · 2.9 KB
/
effect_sources.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import { ErrorHandler, Inject, Injectable } from '@angular/core';
import { Action } from '@ngrx/store';
import { Observable, Subject, merge } from 'rxjs';
import {
dematerialize,
exhaustMap,
filter,
groupBy,
map,
mergeMap,
take,
} from 'rxjs/operators';
import {
reportInvalidActions,
EffectNotification,
} from './effect_notification';
import { EffectsErrorHandler } from './effects_error_handler';
import { mergeEffects } from './effects_resolver';
import {
isOnIdentifyEffects,
isOnRunEffects,
isOnInitEffects,
} from './lifecycle_hooks';
import { EFFECTS_ERROR_HANDLER } from './tokens';
import {
getSourceForInstance,
isClassInstance,
ObservableNotification,
} from './utils';
@Injectable({ providedIn: 'root' })
export class EffectSources extends Subject<any> {
constructor(
private errorHandler: ErrorHandler,
@Inject(EFFECTS_ERROR_HANDLER)
private effectsErrorHandler: EffectsErrorHandler
) {
super();
}
addEffects(effectSourceInstance: any): void {
this.next(effectSourceInstance);
}
/**
* @internal
*/
toActions(): Observable<Action> {
return this.pipe(
groupBy((effectsInstance) =>
isClassInstance(effectsInstance)
? getSourceForInstance(effectsInstance)
: effectsInstance
),
mergeMap((source$) => {
return source$.pipe(groupBy(effectsInstance));
}),
mergeMap((source$) => {
const effect$ = source$.pipe(
exhaustMap((sourceInstance) => {
return resolveEffectSource(
this.errorHandler,
this.effectsErrorHandler
)(sourceInstance);
}),
map((output) => {
reportInvalidActions(output, this.errorHandler);
return output.notification;
}),
filter(
(notification): notification is ObservableNotification<Action> =>
notification.kind === 'N' && notification.value != null
),
dematerialize()
);
// start the stream with an INIT action
// do this only for the first Effect instance
const init$ = source$.pipe(
take(1),
filter(isOnInitEffects),
map((instance) => instance.ngrxOnInitEffects())
);
return merge(effect$, init$);
})
);
}
}
function effectsInstance(sourceInstance: any) {
if (isOnIdentifyEffects(sourceInstance)) {
return sourceInstance.ngrxOnIdentifyEffects();
}
return '';
}
function resolveEffectSource(
errorHandler: ErrorHandler,
effectsErrorHandler: EffectsErrorHandler
): (sourceInstance: any) => Observable<EffectNotification> {
return (sourceInstance) => {
const mergedEffects$ = mergeEffects(
sourceInstance,
errorHandler,
effectsErrorHandler
);
if (isOnRunEffects(sourceInstance)) {
return sourceInstance.ngrxOnRunEffects(mergedEffects$);
}
return mergedEffects$;
};
}