-
Notifications
You must be signed in to change notification settings - Fork 54
/
TipTracker.ts
94 lines (89 loc) · 3.03 KB
/
TipTracker.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import { Cardano } from '@cardano-sdk/core';
import { ConnectionStatus, PersistentDocumentTrackerSubject, tipEquals } from './util';
import { DocumentStore } from '../persistence';
import {
EMPTY,
Observable,
Subject,
combineLatest,
concat,
delay,
distinctUntilChanged,
exhaustMap,
filter,
finalize,
merge,
of,
startWith,
switchMap,
takeUntil,
tap,
timeout
} from 'rxjs';
import { Logger } from 'ts-log';
import { Milliseconds } from './types';
import { SyncStatus } from '../types';
/** Dependencies and settings consumed by the TipTracker constructor. */
export interface TipTrackerProps {
/** Observable subscribed to for every tip fetch (scheduled or manually triggered). */
provider$: Observable<Cardano.Tip>;
/** Sync status; the tracker listens to its `isSettled$` observable to schedule fetches. */
syncStatus: SyncStatus;
/** Connection status stream; scheduled fetches are skipped while the latest value is `ConnectionStatus.down`. */
connectionStatus$: Observable<ConnectionStatus>;
/** Store handed to the base tracker subject together with the tip observable. */
store: DocumentStore<Cardano.Tip>;
/**
 * Delay applied to every scheduled trigger before it may start a fetch.
 * The initial fetch on start is not delayed.
 */
minPollInterval: Milliseconds;
/** Longest wait for `syncStatus.isSettled$` to emit `true` before a fetch is scheduled anyway. */
maxPollInterval: Milliseconds;
/** Logger used for debug messages about fetch activity. */
logger: Logger;
}
/** Optional internals that can be supplied as the second TipTracker constructor argument. */
export interface TipTrackerInternals {
/** Subject used as the manual-sync trigger; the constructor creates a new Subject when it is omitted. */
externalTrigger$?: Subject<void>;
}
/**
 * Mirrors `trigger$`, but guarantees activity at least every `interval` milliseconds.
 *
 * Whenever the `timeout` operator fires (configured with `each: interval`), the
 * returned observable emits `true` and then starts listening to `trigger$` again
 * with a fresh timer, so the pattern repeats for as long as the result is subscribed.
 *
 * @param trigger$ source of "real" trigger emissions, forwarded unchanged
 * @param interval timeout between emissions, passed as `each` to the `timeout` operator
 * @returns the values of `trigger$` interleaved with `true` for every timeout
 */
const triggerOrInterval$ = <T = unknown>(trigger$: Observable<T>, interval: number): Observable<T | boolean> => {
  // Fallback used on timeout: a synthetic `true` tick followed by a re-armed copy of this stream
  const tickThenResume = (): Observable<T | boolean> => concat(of(true), triggerOrInterval$(trigger$, interval));
  const withFallback = timeout({ each: interval, with: tickThenResume });
  return trigger$.pipe(withFallback);
};
/**
 * Tracker subject for the current `Cardano.Tip`.
 *
 * Builds a single tip observable out of two sources and passes it, together with
 * `store`, to `PersistentDocumentTrackerSubject`:
 * - scheduled fetches, driven by `syncStatus.isSettled$`, `maxPollInterval`,
 *   `minPollInterval` and `connectionStatus$`;
 * - manual fetches, driven by the external trigger (see {@link TipTracker.sync}).
 */
export class TipTracker extends PersistentDocumentTrackerSubject<Cardano.Tip> {
  /** Subject whose emissions force an immediate tip fetch; assigned once in the constructor. */
  readonly #externalTrigger$: Subject<void>;
  readonly #logger: Logger;

  /**
   * @param props providers, status streams, polling settings, store and logger
   * @param internals optional overrides; `externalTrigger$` defaults to a new Subject
   */
  constructor(
    { provider$, minPollInterval, maxPollInterval, store, syncStatus, connectionStatus$, logger }: TipTrackerProps,
    { externalTrigger$ = new Subject() }: TipTrackerInternals = {}
  ) {
    super(
      merge(
        // schedule a fetch:
        // - after some delay once fully synced and online
        // - if it's not settled for maxPollInterval
        combineLatest([
          triggerOrInterval$(syncStatus.isSettled$.pipe(filter((isSettled) => isSettled)), maxPollInterval).pipe(
            // trigger fetch after some delay once fully synced and online
            delay(minPollInterval),
            // trigger fetch on start
            startWith(null)
          ),
          connectionStatus$
        ]).pipe(
          // Throttle syncing by interval, cancel ongoing request on external trigger
          tap(([, connectionStatus]) => {
            logger.debug(connectionStatus === ConnectionStatus.down ? 'Skipping fetch tip' : 'Fetching tip...');
          }),
          // exhaustMap ignores new triggers while a fetch is still in flight
          exhaustMap(([, connectionStatus]) =>
            connectionStatus === ConnectionStatus.down
              ? EMPTY
              : provider$.pipe(takeUntil(externalTrigger$.pipe(tap(() => logger.debug('Tip fetch canceled')))))
          ),
          distinctUntilChanged(tipEquals),
          tap((tip) => logger.debug('Fetched new tip', tip))
        ),
        // Always immediately restart request on external trigger
        externalTrigger$.pipe(
          switchMap(() => provider$),
          tap((tip) => logger.debug('External trigger fetched tip', tip))
        )
      ).pipe(
        // Complete the trigger through the constructor-scoped reference instead of
        // `this.#externalTrigger$`: if the stream is torn down while `super(...)` is
        // still running, `this` and its private fields are not initialized yet and
        // touching them would throw. It is the same Subject that is stored below.
        finalize(() => externalTrigger$.complete())
      ),
      store
    );
    this.#externalTrigger$ = externalTrigger$;
    this.#logger = logger;
  }

  /**
   * Requests an immediate tip fetch by emitting on the external trigger, which
   * cancels an in-flight scheduled fetch and starts a new request.
   */
  sync(): void {
    this.#logger.debug('Manual sync triggered');
    this.#externalTrigger$.next();
  }
}