Commit: hydrate from indexer logs

holic committed Nov 29, 2023
1 parent 504e25d commit 26635c0
Showing 3 changed files with 79 additions and 73 deletions.
packages/common/src/utils/chunk.ts (2 changes: 1 addition & 1 deletion)
@@ -1,4 +1,4 @@
-export function* chunk<T>(arr: T[], n: number): Generator<T[], void> {
+export function* chunk<T>(arr: readonly T[], n: number): Generator<readonly T[], void> {
   for (let i = 0; i < arr.length; i += n) {
     yield arr.slice(i, i + n);
   }
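A quick illustration of why the `readonly` widening matters: callers can now pass immutable arrays (for example `as const` tuples) without a cast. A minimal sketch, with hypothetical sample data:

```ts
import { chunk } from "@latticexyz/common/utils";

// `as const` produces a readonly tuple, which the previous `T[]` parameter rejected.
const blockNumbers = [100n, 101n, 102n, 103n, 104n] as const;

for (const batch of chunk(blockNumbers, 2)) {
  // Each batch is a readonly slice: [100n, 101n], then [102n, 103n], then [104n]
  console.log(batch);
}
```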
packages/store-sync/src/common.ts (18 changes: 13 additions & 5 deletions)
@@ -34,7 +34,7 @@ export type Table = {
 export type TableWithRecords = Table & { records: TableRecord[] };
 
 export type StoreEventsLog = Log<bigint, number, false, StoreEventsAbiItem, true, StoreEventsAbi>;
-export type BlockLogs = { blockNumber: StoreEventsLog["blockNumber"]; logs: StoreEventsLog[] };
+export type BlockLogs = { blockNumber: StoreEventsLog["blockNumber"]; logs: readonly StoreEventsLog[] };
 
 // only two keys for now, to reduce complexity of creating indexes on SQL tables
 // TODO: make tableId optional to enable filtering just on keys (any table)
@@ -90,11 +90,19 @@ export type SyncOptions<TConfig extends StoreConfig = StoreConfig> = {
    */
   indexerUrl?: string;
   /**
-   * Optional initial state to hydrate from. Useful if you're hydrating from your own indexer or cache.
+   * Optional initial state to hydrate from. Useful if you're hydrating from an indexer or cache.
+   * @deprecated Use `initialBlockLogs` option instead.
    */
   initialState?: {
-    blockNumber: bigint | null;
-    tables: TableWithRecords[];
+    blockNumber: bigint;
+    tables: readonly TableWithRecords[];
   };
+  /**
+   * Optional initial logs to hydrate from. Useful if you're hydrating from an indexer or cache.
+   */
+  initialBlockLogs?: {
+    blockNumber: bigint;
+    logs: readonly StorageAdapterLog[];
+  };
 };

@@ -108,7 +116,7 @@ export type SyncResult = {
 
 // TODO: add optional, original log to this?
 export type StorageAdapterLog = Partial<StoreEventsLog> & UnionPick<StoreEventsLog, "address" | "eventName" | "args">;
-export type StorageAdapterBlock = { blockNumber: BlockLogs["blockNumber"]; logs: StorageAdapterLog[] };
+export type StorageAdapterBlock = { blockNumber: BlockLogs["blockNumber"]; logs: readonly StorageAdapterLog[] };
 export type StorageAdapter = (block: StorageAdapterBlock) => Promise<void>;
 
 export const schemasTableId = storeTables.Tables.tableId;
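For consumers, the new option means a cached snapshot can be passed through as logs without the table-record round trip. A sketch of the new shape, assuming `SyncOptions` is re-exported from the `@latticexyz/store-sync` package root, with placeholder values:

```ts
import type { SyncOptions } from "@latticexyz/store-sync";

// Hypothetical snapshot captured earlier, e.g. from the indexer's getLogs
// endpoint or a local cache of StorageAdapterLog entries.
const initialBlockLogs: SyncOptions["initialBlockLogs"] = {
  blockNumber: 1_000_000n,
  logs: [], // readonly StorageAdapterLog[]
};
```

`initialState` remains accepted for backwards compatibility, but the createStoreSync diff below makes passing both options an error.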
packages/store-sync/src/createStoreSync.ts (132 changes: 65 additions & 67 deletions)
@@ -1,15 +1,13 @@
 import { StoreConfig, storeEventsAbi } from "@latticexyz/store";
-import { Hex, TransactionReceiptNotFoundError, encodeAbiParameters, parseAbiParameters } from "viem";
+import { Hex, TransactionReceiptNotFoundError } from "viem";
 import {
   StorageAdapter,
   StorageAdapterBlock,
   StorageAdapterLog,
   SyncFilter,
   SyncOptions,
   SyncResult,
-  TableWithRecords,
   internalTableIds,
-  storeTables,
 } from "./common";
 import { createBlockStream, blockRangeToLogs, groupLogsByBlockNumber } from "@latticexyz/block-logs-stream";
 import {
@@ -33,7 +31,7 @@
 import { debug as parentDebug } from "./debug";
 import { createIndexerClient } from "./trpc-indexer";
 import { SyncStep } from "./SyncStep";
-import { chunk, isDefined } from "@latticexyz/common/utils";
+import { bigIntMax, chunk, isDefined } from "@latticexyz/common/utils";
 import { encodeKey, encodeValueArgs } from "@latticexyz/protocol-parser";
 import { tableToLog } from "./tableToLog";
 
@@ -62,50 +60,70 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>
   startBlock: initialStartBlock = 0n,
   maxBlockRange,
   initialState,
+  initialBlockLogs,
   indexerUrl,
 }: CreateStoreSyncOptions<TConfig>): Promise<SyncResult> {
   const filters: SyncFilter[] =
     initialFilters.length || tableIds.length
       ? [...initialFilters, ...tableIds.map((tableId) => ({ tableId })), ...defaultFilters]
       : [];
 
-  const initialState$ = defer(
-    async (): Promise<
-      | {
-          blockNumber: bigint | null;
-          tables: TableWithRecords[];
-        }
-      | undefined
-    > => {
-      if (initialState) return initialState;
-      if (!indexerUrl) return;
 
-      debug("fetching initial state from indexer", indexerUrl);
+  if (initialBlockLogs && initialState) {
+    throw new Error("Only one of initialBlockLogs or initialState should be provided.");
+  }
 
-      onProgress?.({
-        step: SyncStep.SNAPSHOT,
-        percentage: 0,
-        latestBlockNumber: 0n,
-        lastBlockNumberProcessed: 0n,
-        message: "Fetching snapshot from indexer",
-      });
+  const initialBlockLogs$ = defer(async (): Promise<StorageAdapterBlock | undefined> => {
+    if (initialBlockLogs) return initialBlockLogs;
 
-      const indexer = createIndexerClient({ url: indexerUrl });
-      const chainId = publicClient.chain?.id ?? (await publicClient.getChainId());
-      const result = await indexer.findAll.query({ chainId, address, filters });
+    // Backwards compatibility with previous indexer snapshot format
+    if (initialState) {
+      const logs: StorageAdapterLog[] = [
+        ...initialState.tables.map(tableToLog),
+        ...initialState.tables.flatMap((table) =>
+          table.records.map(
+            (record): StorageAdapterLog => ({
+              eventName: "Store_SetRecord",
+              address: table.address,
+              args: {
+                tableId: table.tableId,
+                keyTuple: encodeKey(table.keySchema, record.key),
+                ...encodeValueArgs(table.valueSchema, record.value),
+              },
+            })
+          )
+        ),
+      ];
+      return { blockNumber: initialState.blockNumber, logs };
+    }
 
-      onProgress?.({
-        step: SyncStep.SNAPSHOT,
-        percentage: 100,
-        latestBlockNumber: 0n,
-        lastBlockNumberProcessed: 0n,
-        message: "Fetched snapshot from indexer",
-      });
+    if (!indexerUrl) return;
 
-      return result;
-    }
-  ).pipe(
+    debug("fetching logs from indexer", indexerUrl);
+
+    onProgress?.({
+      step: SyncStep.SNAPSHOT,
+      percentage: 0,
+      latestBlockNumber: 0n,
+      lastBlockNumberProcessed: 0n,
+      message: "Fetching snapshot from indexer",
+    });
+
+    const indexer = createIndexerClient({ url: indexerUrl });
+    const chainId = publicClient.chain?.id ?? (await publicClient.getChainId());
+    const result = await indexer.getLogs.query({ chainId, address, filters });
+
+    onProgress?.({
+      step: SyncStep.SNAPSHOT,
+      percentage: 100,
+      latestBlockNumber: 0n,
+      lastBlockNumberProcessed: 0n,
+      message: "Fetched snapshot from indexer",
+    });
+
+    return result;
+  }).pipe(
     catchError((error) => {
-      debug("error fetching initial state from indexer", error);
+      debug("error fetching logs from indexer", error);
 
       onProgress?.({
         step: SyncStep.SNAPSHOT,
@@ -120,19 +138,10 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>
     shareReplay(1)
   );
 
-  const startBlock$ = initialState$.pipe(
-    map((initialState) => initialState?.blockNumber ?? initialStartBlock),
-    // TODO: if start block is still 0, find via deploy event
-    tap((startBlock) => debug("starting sync from block", startBlock))
-  );
-
-  const initialLogs$ = initialState$.pipe(
-    filter(
-      (initialState): initialState is { blockNumber: bigint; tables: TableWithRecords[] } =>
-        initialState != null && initialState.blockNumber != null && initialState.tables.length > 0
-    ),
-    concatMap(async ({ blockNumber, tables }) => {
-      debug("hydrating from initial state to block", blockNumber);
+  const storedInitialBlockLogs$ = initialBlockLogs$.pipe(
+    filter(isDefined),
+    concatMap(async ({ blockNumber, logs }) => {
+      debug("hydrating", logs.length, "logs to block", blockNumber);
 
       onProgress?.({
         step: SyncStep.SNAPSHOT,
@@ -142,23 +151,6 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>
         message: "Hydrating from snapshot",
       });
 
-      const logs: StorageAdapterLog[] = [
-        ...tables.map(tableToLog),
-        ...tables.flatMap((table) =>
-          table.records.map(
-            (record): StorageAdapterLog => ({
-              eventName: "Store_SetRecord",
-              address: table.address,
-              args: {
-                tableId: table.tableId,
-                keyTuple: encodeKey(table.keySchema, record.key),
-                ...encodeValueArgs(table.valueSchema, record.value),
-              },
-            })
-          )
-        ),
-      ];
-
       // Split snapshot operations into chunks so we can update the progress callback (and ultimately render visual progress for the user).
       // This isn't ideal if we want to e.g. batch load these into a DB in a single DB tx, but we'll take it.
       //
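The record-to-log conversion removed here now lives inside `initialBlockLogs$` (see the hunk at the top of this file). For reference, a rough sketch of what the two encoding helpers produce for a single record, using hypothetical schemas; the return shapes are as recalled from `@latticexyz/protocol-parser` and should be checked against the package:

```ts
import { encodeKey, encodeValueArgs } from "@latticexyz/protocol-parser";

// Hypothetical table: one bytes32 key field and one uint32 value field.
const keySchema = { id: "bytes32" } as const;
const valueSchema = { count: "uint32" } as const;

// keyTuple: one 32-byte hex word per key field.
const keyTuple = encodeKey(keySchema, {
  id: "0x0000000000000000000000000000000000000000000000000000000000000001",
});

// Values are packed into the (staticData, encodedLengths, dynamicData) triple
// that Store_SetRecord events carry.
const { staticData, encodedLengths, dynamicData } = encodeValueArgs(valueSchema, { count: 42 });
```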
@@ -189,6 +181,12 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>
     shareReplay(1)
   );
 
+  const startBlock$ = storedInitialBlockLogs$.pipe(
+    map((storedBlockLogs) => bigIntMax(storedBlockLogs.blockNumber, initialStartBlock)),
+    // TODO: if start block is still 0, find via deploy event
+    tap((startBlock) => debug("starting sync from block", startBlock))
+  );
+
   const latestBlock$ = createBlockStream({ publicClient, blockTag: "latest" }).pipe(shareReplay(1));
   const latestBlockNumber$ = latestBlock$.pipe(
     map((block) => block.number),
@@ -231,7 +229,7 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>
 
   let lastBlockNumberProcessed: bigint | null = null;
   const storedBlockLogs$ = concat(
-    initialLogs$,
+    storedInitialBlockLogs$,
     blockLogs$.pipe(
       concatMap(async (block) => {
         await storageAdapter(block);
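The reworked `startBlock$` (hunk above) resumes from the later of the hydrated snapshot's block and the caller's configured start block, so a stale snapshot can no longer rewind sync. A small sketch of that arithmetic, using the `bigIntMax` helper imported above and hypothetical block numbers:

```ts
import { bigIntMax } from "@latticexyz/common/utils";

// Typical case: default startBlock of 0n plus a snapshot hydrated to block 1000
// means sync resumes at 1000 instead of replaying from genesis.
const snapshotBlockNumber = 1_000n;
const initialStartBlock = 0n;

const startBlock = bigIntMax(snapshotBlockNumber, initialStartBlock);
console.log(startBlock); // 1000n
```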
