-
Notifications
You must be signed in to change notification settings - Fork 55
/
SyncableIntervalTrackerSubject.ts
43 lines (37 loc) · 1.34 KB
/
SyncableIntervalTrackerSubject.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import { Milliseconds } from '../types';
import { Observable, Subject, exhaustMap, interval, merge, startWith, switchMap, takeUntil } from 'rxjs';
import { TrackerSubject } from './TrackerSubject';
import { retryBackoff } from 'backoff-rxjs';
export type RetryOperator = () => ReturnType<typeof retryBackoff>;
export interface SourceTrackerProps<T> {
provider$: Observable<T>;
pollInterval: Milliseconds;
}
export interface ProviderTrackerSubjectInternals {
externalTrigger$?: Subject<void>;
interval$?: Observable<unknown>;
}
export class SyncableIntervalTrackerSubject<T> extends TrackerSubject<T> {
#externalTrigger$ = new Subject<void>();
constructor(
{ provider$, pollInterval }: SourceTrackerProps<T>,
{ externalTrigger$ = new Subject(), interval$ = interval(pollInterval) }: ProviderTrackerSubjectInternals = {}
) {
super(
merge(
// Fetch at regular interval
interval$.pipe(
startWith(null),
// Throttle syncing by interval, cancel ongoing request on external trigger
exhaustMap(() => provider$.pipe(takeUntil(externalTrigger$)))
),
// Always immediately restart request on external trigger
externalTrigger$.pipe(switchMap(() => provider$))
)
);
this.#externalTrigger$ = externalTrigger$;
}
sync() {
this.#externalTrigger$.next();
}
}