-
Notifications
You must be signed in to change notification settings - Fork 0
/
createSync.ts
88 lines (77 loc) · 3.12 KB
/
createSync.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import { Read, Sync } from "@primodiumxyz/sync-stack";
import { StorageAdapterLog } from "@/adapter";
import { ContractTableDefs } from "@/lib";
import { CreateSyncOptions, CreateSyncResult } from "@test/utils/sync/types";
import { hydrateFromRpc, subToRpc } from "@test/utils/sync/handleSync";
import { SyncStep } from "@test/utils/sync/tables";
/**
 * Builds a two-phase RPC sync (historical + live) that feeds logs into `storageAdapter`.
 *
 * Nothing is started until the returned `start` function is called.
 *
 * @param options.contractTables - Contract tables; merged with `localTables` and handed to the sync helpers.
 * @param options.localTables - Local tables; `SyncStatus` is read from / written to through the merged object.
 * @param options.tableDefs - Table definitions; each definition's `tableId` becomes one log filter.
 * @param options.storageAdapter - Writer invoked with each log that should be applied.
 * @param options.triggerUpdateStream - Called once historical sync and the buffered logs have been processed.
 * @param options.networkConfig - Supplies `publicClient`, `initialBlockNumber` and `worldAddress`.
 * @param options.onSync - Optional callbacks forwarded to `hydrateFromRpc`; `complete` is wrapped (see below).
 * @returns `start` to kick off both syncs, and `unsubscribe` to release every subscription registered so far.
 */
export const createSync = <tableDefs extends ContractTableDefs>({
  contractTables,
  localTables,
  tableDefs,
  storageAdapter,
  triggerUpdateStream,
  networkConfig,
  onSync,
}: CreateSyncOptions<tableDefs>): CreateSyncResult => {
  const { complete: onComplete } = onSync ?? {};
  const { publicClient, initialBlockNumber, worldAddress } = networkConfig;
  const logFilters = Object.values(tableDefs).map((table) => ({ tableId: table.tableId as string }));
  const tables = { ...contractTables, ...localTables };
  const unsubs: (() => void)[] = [];

  const startSync = async (): Promise<void> => {
    // Upper bound of the historical range: whatever the chain head is right now
    const latestBlockNumber = await publicClient.getBlockNumber();

    // Sync all blocks from the initial block to the latest block
    const historicalRpcSync = Sync.withCustom({
      reader: Read.fromRPC.filter({
        address: worldAddress,
        publicClient,
        filter: { address: worldAddress, filters: logFilters },
        fromBlock: initialBlockNumber,
        toBlock: latestBlockNumber,
      }),
      writer: storageAdapter,
    });

    // Logs received from the live subscription while historical sync is still running
    const pendingLogs: StorageAdapterLog[] = [];
    const storePendingLogs = (log: StorageAdapterLog) => pendingLogs.push(log);

    const processPendingLogs = () => {
      // Drain the buffer first so the logs are released after replay and can never be applied twice
      const batch = pendingLogs.splice(0);
      const total = batch.length;

      batch.forEach((log, index) => {
        storageAdapter(log);
        tables.SyncStatus.update({
          message: "Processing pending logs",
          // `index + 1` logs have been applied at this point, so the last one reports 1
          progress: (index + 1) / total,
          lastBlockNumberProcessed: log.blockNumber ?? BigInt(0),
        });
      });
    };

    // Sync incoming blocks
    const liveRpcSync = Sync.withCustom({
      reader: Read.fromRPC.subscribe({
        address: worldAddress,
        publicClient,
        logFilter: logFilters,
      }),
      // While SyncStatus.step is not Live, buffer each incoming log so it can be replayed after
      // historical sync; once it is Live, apply logs directly
      writer: (log) =>
        tables.SyncStatus.get()?.step === SyncStep.Live ? storageAdapter(log) : storePendingLogs(log),
    });

    // start live sync
    subToRpc(tables, liveRpcSync);

    // start historical sync
    hydrateFromRpc(tables, historicalRpcSync, {
      ...onSync,
      // Once historical sync is complete: notify the caller, replay the logs buffered in the meantime,
      // then trigger the update stream.
      // NOTE(review): nothing here sets SyncStatus.step to SyncStep.Live; presumably hydrateFromRpc
      // does so around this callback — confirm, otherwise the live writer keeps buffering.
      complete: (blockNumber) => {
        onComplete?.(blockNumber);
        // process logs that came in during historical sync
        processPendingLogs();
        // now we're truly up to date
        triggerUpdateStream();
      },
    });

    unsubs.push(historicalRpcSync.unsubscribe);
    unsubs.push(liveRpcSync.unsubscribe);
  };

  return {
    start: startSync,
    // Drain the list while unsubscribing so repeated calls do not re-invoke stale handles
    unsubscribe: () => {
      for (const unsub of unsubs.splice(0)) unsub();
    },
  };
};