-
Notifications
You must be signed in to change notification settings - Fork 2
/
merge-delay-error.ts
49 lines (43 loc) · 2.3 KB
/
merge-delay-error.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import {BehaviorSubject, concat, merge, Observable, of} from 'rxjs';
import {catchError, filter, finalize, first, map, mergeAll} from 'rxjs/operators';
/* tslint:disable:max-line-length */
// Caller-visible signatures of mergeDelayError.
// The overloads for one to six sources let every source carry its own element
// type; the result's element type is the union of those types.
// The final rest-parameter overload accepts any number of sources that share a
// single element type T.
export function mergeDelayError<O1>(o1: Observable<O1>): Observable<O1>;
export function mergeDelayError<O1, O2>(o1: Observable<O1>, o2: Observable<O2>): Observable<O1 | O2>;
export function mergeDelayError<O1, O2, O3>(o1: Observable<O1>, o2: Observable<O2>, o3: Observable<O3>): Observable<O1 | O2 | O3>;
export function mergeDelayError<O1, O2, O3, O4>(o1: Observable<O1>, o2: Observable<O2>, o3: Observable<O3>, o4: Observable<O4>): Observable<O1 | O2 | O3 | O4>;
export function mergeDelayError<O1, O2, O3, O4, O5>(o1: Observable<O1>, o2: Observable<O2>, o3: Observable<O3>, o4: Observable<O4>, o5: Observable<O5>): Observable<O1 | O2 | O3 | O4 | O5>;
export function mergeDelayError<O1, O2, O3, O4, O5, O6>(o1: Observable<O1>, o2: Observable<O2>, o3: Observable<O3>, o4: Observable<O4>, o5: Observable<O5>, o6: Observable<O6>): Observable<O1 | O2 | O3 | O4 | O5 | O6>;
export function mergeDelayError<T>(...observables: Observable<T>[]): Observable<T>;
/* tslint:enable:max-line-length */
/**
 * Merges every given source observable into a single output stream while
 * postponing failure: values are forwarded as they arrive, an error raised by
 * a source is held back instead of ending the output, and once every source
 * has finished the first error that was held back (if any) is raised.
 *
 * @param observables The source observables to merge.
 * @returns An observable of the sources' values that errors only after all
 *   sources have finished, or completes normally when none of them failed.
 */
export function mergeDelayError<T>(...observables: Observable<T>[]): Observable<T> {
  // Sentinel standing for "no error recorded yet"; it is only ever compared by identity.
  const NO_ERROR = Object.freeze({});

  // Invoked from `map` below, i.e. once per subscription to the result, so
  // every subscriber gets its own error holder.
  const buildForSubscriber = (sources: Observable<T>[]) => {
    const heldError$ = new BehaviorSubject(NO_ERROR);

    // Reads the holder's current value once: raises it when it is a real
    // error, or simply completes when the sentinel is still in place.
    const rethrow$ = heldError$.pipe(
      first(),
      filter(held => held !== NO_ERROR),
      map(held => {
        throw held;
      })
    );

    // Each source swallows its own error and finishes quietly; the error is
    // recorded only when no earlier one has been recorded.
    const guarded = sources.map(source =>
      source.pipe(
        catchError(err => {
          const nothingHeldYet = heldError$.getValue() === NO_ERROR;
          if (nothingHeldYet) {
            heldError$.next(err);
          }
          return of();
        })
      )
    );

    // `concat` subscribes to `rethrow$` only after the merged sources complete.
    const valuesThenError$ = concat(merge<T>(...guarded), rethrow$);
    return valuesThenError$.pipe(
      // Release the holder once the output terminates or is unsubscribed.
      finalize(() => {
        heldError$.complete();
      })
    );
  };

  return of(observables).pipe(
    map(sources => buildForSubscriber(sources)),
    mergeAll()
  );
}