-
Notifications
You must be signed in to change notification settings - Fork 54
/
shareRetryBackoff.ts
38 lines (36 loc) · 1.49 KB
/
shareRetryBackoff.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import { Observable, OperatorFunction, ReplaySubject, defer, finalize } from 'rxjs';
import { RetryBackoffConfig, retryBackoff } from 'backoff-rxjs';
/**
 * Baseline retry settings used by `shareRetryBackoff`; caller-supplied options are merged on top of these.
 * NOTE(review): interval values are presumably milliseconds — confirm against the backoff-rxjs documentation.
 */
const defaultRetryBackoffConfig: RetryBackoffConfig = {
  initialInterval: 10,
  maxInterval: 5000,
  resetOnSuccess: true
};
/**
 * Wraps `operator` with retry-with-backoff logic while keeping a single subscription to the source.
 *
 * The source observable is subscribed once, as soon as the returned operator function is applied to it,
 * and its notifications are forwarded into a `ReplaySubject` with a buffer size of 1.
 * On each retry `operator` is called again and re-subscribes to that subject, which re-emits
 * the last emitted value (the one which caused the error) without re-subscribing to the source.
 *
 * The source subscription is released when a subscription to the resulting observable is finalized.
 *
 * @param operator called and re-subscribed on every retry
 * @param retryBackoffConfig overrides merged on top of the defaults
 *   { initialInterval: 10, maxInterval: 5000, resetOnSuccess: true }
 * @returns operator function that applies `operator` with the retry logic described above
 */
export const shareRetryBackoff =
  <In, Out>(
    operator: OperatorFunction<In, Out>,
    retryBackoffConfig?: Partial<RetryBackoffConfig>
  ): OperatorFunction<In, Out> =>
  (evt$) => {
    // Buffer of size 1: a re-subscription gets the most recent source value replayed.
    const replay$ = new ReplaySubject<In>(1);
    const upstreamSubscription = evt$.subscribe(replay$);

    // Deferring makes every (re-)subscription call `operator` anew instead of reusing its previous result.
    const freshOperator: OperatorFunction<In, Out> = (replayed$) => defer(() => operator(replayed$));

    return new Observable<Out>((subscriber) => {
      const mergedConfig = { ...defaultRetryBackoffConfig, ...retryBackoffConfig };

      return replay$
        .pipe(
          freshOperator,
          // Retrying re-subscribes all the way up to replay$,
          // which re-emits the last event without re-subscribing to the source.
          retryBackoff(mergedConfig),
          finalize(() => upstreamSubscription.unsubscribe())
        )
        .subscribe(subscriber);
    });
  };