-
Notifications
You must be signed in to change notification settings - Fork 36
/
heartbeat.ts
65 lines (61 loc) · 1.6 KB
/
heartbeat.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import type {LogContext} from '@rocicorp/logger';
import type {ClientID} from '../sync/client-id';
import type * as dag from '../dag/mod';
import {
ClientMap,
ClientStateNotFoundError,
noUpdates,
updateClients,
} from './clients';
import {initBgIntervalProcess} from './bg-interval';
const HEARTBEAT_INTERVAL_MS = 60 * 1000;
export let latestHeartbeatUpdate: Promise<ClientMap> | undefined;
export function startHeartbeats(
clientID: ClientID,
dagStore: dag.Store,
onClientStateNotFound: () => void,
lc: LogContext,
signal: AbortSignal,
): void {
initBgIntervalProcess(
'Heartbeat',
async () => {
latestHeartbeatUpdate = writeHeartbeat(clientID, dagStore);
try {
return await latestHeartbeatUpdate;
} catch (e) {
if (e instanceof ClientStateNotFoundError) {
onClientStateNotFound();
return;
}
throw e;
}
},
HEARTBEAT_INTERVAL_MS,
lc,
signal,
);
}
export async function writeHeartbeat(
clientID: ClientID,
dagStore: dag.Store,
): Promise<ClientMap> {
const updatedClients = await updateClients(clients => {
const client = clients.get(clientID);
if (!client) {
return noUpdates;
}
return {
clients: new Map(clients).set(clientID, {
heartbeatTimestampMs: Date.now(),
headHash: client.headHash,
mutationID: client.mutationID,
lastServerAckdMutationID: client.lastServerAckdMutationID,
}),
};
}, dagStore);
if (updatedClients.get(clientID) === undefined) {
throw new ClientStateNotFoundError(clientID);
}
return updatedClients;
}