feat(store-sync,store-indexer): sync from getLogs indexer endpoint #1973

Merged · 21 commits · Dec 1, 2023
Changes from 18 commits
16 changes: 16 additions & 0 deletions .changeset/angry-peas-heal.md
@@ -0,0 +1,16 @@
---
"@latticexyz/store-sync": minor
---

Refactored how we fetch snapshots from an indexer, preferring the new `getLogs` endpoint and falling back to the previous `findAll` if it isn't available. This refactor also prepares for an easier entry point for adding client caching of snapshots.

The `initialState` option for various sync methods (`syncToPostgres`, `syncToRecs`, etc.) is now deprecated in favor of `initialBlockLogs`. For now, we'll automatically convert `initialState` into `initialBlockLogs`, but if you want to update your code, you can do:

```ts
import { tablesWithRecordsToLogs } from "@latticexyz/store-sync";

const initialBlockLogs = {
blockNumber: initialState.blockNumber,
logs: tablesWithRecordsToLogs(initialState.tables),
};
```
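
If you pass the converted snapshot straight into a sync call, it could look like the sketch below. This is a minimal sketch only: the surrounding `world`, `config`, `publicClient`, `worldAddress`, and cached `initialState` values are assumed to already exist in your app, as with any other `syncToRecs` setup.

```ts
import { syncToRecs } from "@latticexyz/store-sync/recs";
import { tablesWithRecordsToLogs } from "@latticexyz/store-sync";

// `world`, `config`, `publicClient`, `worldAddress`, and the cached
// `initialState` are assumed to exist elsewhere in your app.
const sync = await syncToRecs({
  world,
  config,
  publicClient,
  address: worldAddress,
  // previously: initialState,
  initialBlockLogs: {
    blockNumber: initialState.blockNumber,
    logs: tablesWithRecordsToLogs(initialState.tables),
  },
});
```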
5 changes: 5 additions & 0 deletions .changeset/wet-crabs-punch.md
@@ -0,0 +1,5 @@
---
"@latticexyz/common": minor
---

Updated `chunk` types to use readonly arrays
5 changes: 5 additions & 0 deletions .changeset/wicked-donuts-cheat.md
@@ -0,0 +1,5 @@
---
"@latticexyz/store-indexer": minor
---

Added `getLogs` query support to sqlite indexer
2 changes: 1 addition & 1 deletion e2e/packages/sync-test/indexerSync.test.ts
@@ -58,7 +58,7 @@ describe("Sync from indexer", async () => {
await waitForInitialSync(page);

expect(asyncErrorHandler.getErrors()).toHaveLength(1);
expect(asyncErrorHandler.getErrors()[0]).toContain("error fetching initial state from indexer");
expect(asyncErrorHandler.getErrors()[0]).toContain("error getting snapshot");
});

describe.each([["sqlite"], ["postgres"]] as const)("%s indexer", (indexerType) => {
2 changes: 1 addition & 1 deletion packages/common/src/utils/chunk.ts
@@ -1,4 +1,4 @@
export function* chunk<T>(arr: T[], n: number): Generator<T[], void> {
export function* chunk<T>(arr: readonly T[], n: number): Generator<readonly T[], void> {
for (let i = 0; i < arr.length; i += n) {
yield arr.slice(i, i + n);
}
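For illustration, a small usage sketch of the widened `chunk` signature; the import path here is an assumption and may differ in your setup:

```ts
// Import path is an assumption for this sketch; adjust to where `chunk` is exported.
import { chunk } from "@latticexyz/common/utils";

// Readonly inputs are now accepted, and each yielded batch is readonly as well.
const items: readonly string[] = ["a", "b", "c", "d", "e"];

for (const batch of chunk(items, 2)) {
  console.log(batch); // ["a", "b"], then ["c", "d"], then ["e"]
}
```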
7 changes: 6 additions & 1 deletion packages/store-indexer/src/postgres/getLogs.ts
@@ -54,7 +54,12 @@ export async function getLogs(
.select()
.from(tables.recordsTable)
.where(or(...conditions))
.orderBy(asc(tables.recordsTable.lastUpdatedBlockNumber));
.orderBy(
asc(tables.recordsTable.lastUpdatedBlockNumber),
// TODO: add logIndex and use that to sort instead of address/tableId? (https://github.com/latticexyz/mud/issues/1979)
asc(tables.recordsTable.address),
asc(tables.recordsTable.tableId)
);
Member:
for context, why do we have to order the result here?

Member Author:
it was an attempt to fix the e2e tests having inconsistent results between RPC and indexer as far as how data is populated, but it ended up not being enough

I fixed the e2e test here and need to bring this commit over: 74611df

ultimately we lose some fidelity/ordering when hydrating from the indexer because everything is done at the block level, so records pulled from the DB within the same block may not be in exactly the same order as records created from a list of logs from the RPC (mostly due to the lack of a logIndex)

this isn't really an issue practically speaking, but it felt like it would be nice to have some consistency between RPC and indexer in terms of the data returned and its ordering

Member:
do we still need the sorting now? unclear if it affects performance at all

Member Author (@holic, Dec 1, 2023):
we should still probably order by block number, but maybe not the others

I think these will still take advantage of the index, but will remove them for now


const blockNumber = records.reduce(
(max, record) => bigIntMax(max, record.lastUpdatedBlockNumber ?? 0n),
56 changes: 7 additions & 49 deletions packages/store-indexer/src/sqlite/createQueryAdapter.ts
@@ -1,10 +1,7 @@
import { eq } from "drizzle-orm";
import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core";
import { buildTable, chainState, getTables } from "@latticexyz/store-sync/sqlite";
import { QueryAdapter } from "@latticexyz/store-sync/trpc-indexer";
import { debug } from "../debug";
import { getAddress } from "viem";
import { decodeDynamicField } from "@latticexyz/protocol-parser";
import { getTablesWithRecords } from "./getTablesWithRecords";
import { tablesWithRecordsToLogs } from "@latticexyz/store-sync";

/**
* Creates a storage adapter for the tRPC server/client to query data from SQLite.
@@ -15,51 +12,12 @@ import { decodeDynamicField } from "@latticexyz/protocol-parser";
export async function createQueryAdapter(database: BaseSQLiteDatabase<"sync", any>): Promise<QueryAdapter> {
const adapter: QueryAdapter = {
async getLogs(opts) {
// TODO
throw new Error("Not implemented");
const { blockNumber, tables } = getTablesWithRecords(database, opts);
const logs = tablesWithRecordsToLogs(tables);
return { blockNumber: blockNumber ?? 0n, logs };
},
async findAll({ chainId, address, filters = [] }) {
// If _any_ filter has a table ID, this will filter down all data to just those tables. Which mean we can't yet mix table filters with key-only filters.
// TODO: improve this so we can express this in the query (need to be able to query data across tables more easily)
const tableIds = Array.from(new Set(filters.map((filter) => filter.tableId)));
const tables = getTables(database)
.filter((table) => address == null || getAddress(address) === getAddress(table.address))
.filter((table) => !tableIds.length || tableIds.includes(table.tableId));

const tablesWithRecords = tables.map((table) => {
const sqliteTable = buildTable(table);
const records = database.select().from(sqliteTable).where(eq(sqliteTable.__isDeleted, false)).all();
const filteredRecords = !filters.length
? records
: records.filter((record) => {
const keyTuple = decodeDynamicField("bytes32[]", record.__key);
return filters.some(
(filter) =>
filter.tableId === table.tableId &&
(filter.key0 == null || filter.key0 === keyTuple[0]) &&
(filter.key1 == null || filter.key1 === keyTuple[1])
);
});
return {
...table,
records: filteredRecords.map((record) => ({
key: Object.fromEntries(Object.entries(table.keySchema).map(([name]) => [name, record[name]])),
value: Object.fromEntries(Object.entries(table.valueSchema).map(([name]) => [name, record[name]])),
})),
};
});

const metadata = database.select().from(chainState).where(eq(chainState.chainId, chainId)).all();
const { lastUpdatedBlockNumber } = metadata[0] ?? {};

const result = {
blockNumber: lastUpdatedBlockNumber ?? null,
tables: tablesWithRecords,
};

debug("findAll", chainId, address, result);

return result;
async findAll(opts) {
return getTablesWithRecords(database, opts);
},
};
return adapter;
72 changes: 72 additions & 0 deletions packages/store-indexer/src/sqlite/getTablesWithRecords.ts
@@ -0,0 +1,72 @@
import { asc, eq } from "drizzle-orm";
import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core";
import { buildTable, chainState, getTables } from "@latticexyz/store-sync/sqlite";
import { Hex, getAddress } from "viem";
import { decodeDynamicField } from "@latticexyz/protocol-parser";
import { SyncFilter, TableWithRecords } from "@latticexyz/store-sync";

// TODO: refactor sqlite and replace this with getLogs to match postgres (https://github.com/latticexyz/mud/issues/1970)

/**
* @deprecated
* */
export function getTablesWithRecords(
database: BaseSQLiteDatabase<"sync", any>,
{
chainId,
address,
filters = [],
}: {
readonly chainId: number;
readonly address?: Hex;
readonly filters?: readonly SyncFilter[];
}
): { blockNumber: bigint | null; tables: readonly TableWithRecords[] } {
const metadata = database
.select()
.from(chainState)
.where(eq(chainState.chainId, chainId))
.limit(1)
.all()
.find(() => true);

// If _any_ filter has a table ID, this will filter down all data to just those tables. Which means we can't yet mix table filters with key-only filters.
// TODO: improve this so we can express this in the query (need to be able to query data across tables more easily)
const tableIds = Array.from(new Set(filters.map((filter) => filter.tableId)));
const tables = getTables(database)
.filter((table) => address == null || getAddress(address) === getAddress(table.address))
.filter((table) => !tableIds.length || tableIds.includes(table.tableId));

const tablesWithRecords = tables.map((table) => {
const sqliteTable = buildTable(table);
const records = database
.select()
.from(sqliteTable)
.where(eq(sqliteTable.__isDeleted, false))
.orderBy(asc(sqliteTable.__lastUpdatedBlockNumber))
.all();
const filteredRecords = !filters.length
? records
: records.filter((record) => {
const keyTuple = decodeDynamicField("bytes32[]", record.__key);
return filters.some(
(filter) =>
filter.tableId === table.tableId &&
(filter.key0 == null || filter.key0 === keyTuple[0]) &&
(filter.key1 == null || filter.key1 === keyTuple[1])
);
});
return {
...table,
records: filteredRecords.map((record) => ({
key: Object.fromEntries(Object.entries(table.keySchema).map(([name]) => [name, record[name]])),
value: Object.fromEntries(Object.entries(table.valueSchema).map(([name]) => [name, record[name]])),
})),
};
});

return {
blockNumber: metadata?.lastUpdatedBlockNumber ?? null,
tables: tablesWithRecords,
};
}
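A hedged usage sketch of this helper, with placeholder values for the database handle and table ID (both are assumptions, not values from this PR):

```ts
// `database` is your BaseSQLiteDatabase<"sync", any> instance and `someTableId`
// is a placeholder Hex table ID; both are assumed to exist for this sketch.
const { blockNumber, tables } = getTablesWithRecords(database, {
  chainId: 31337,
  filters: [{ tableId: someTableId }],
});
console.log(blockNumber, tables.length);
```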
18 changes: 13 additions & 5 deletions packages/store-sync/src/common.ts
@@ -34,7 +34,7 @@ export type Table = {
export type TableWithRecords = Table & { records: TableRecord[] };

export type StoreEventsLog = Log<bigint, number, false, StoreEventsAbiItem, true, StoreEventsAbi>;
export type BlockLogs = { blockNumber: StoreEventsLog["blockNumber"]; logs: StoreEventsLog[] };
export type BlockLogs = { blockNumber: StoreEventsLog["blockNumber"]; logs: readonly StoreEventsLog[] };

// only two keys for now, to reduce complexity of creating indexes on SQL tables
// TODO: make tableId optional to enable filtering just on keys (any table)
@@ -90,11 +90,19 @@ export type SyncOptions<TConfig extends StoreConfig = StoreConfig> = {
*/
indexerUrl?: string;
/**
* Optional initial state to hydrate from. Useful if you're hydrating from your own indexer or cache.
* Optional initial state to hydrate from. Useful if you're hydrating from an indexer or cache.
* @deprecated Use `initialBlockLogs` option instead.
*/
initialState?: {
blockNumber: bigint | null;
tables: TableWithRecords[];
blockNumber: bigint;
tables: readonly TableWithRecords[];
};
/**
* Optional initial logs to hydrate from. Useful if you're hydrating from an indexer or cache.
*/
initialBlockLogs?: {
blockNumber: bigint;
logs: readonly StorageAdapterLog[];
};
};

@@ -108,7 +116,7 @@ export type SyncResult = {

// TODO: add optional, original log to this?
export type StorageAdapterLog = Partial<StoreEventsLog> & UnionPick<StoreEventsLog, "address" | "eventName" | "args">;
export type StorageAdapterBlock = { blockNumber: BlockLogs["blockNumber"]; logs: StorageAdapterLog[] };
export type StorageAdapterBlock = { blockNumber: BlockLogs["blockNumber"]; logs: readonly StorageAdapterLog[] };
export type StorageAdapter = (block: StorageAdapterBlock) => Promise<void>;

export const schemasTableId = storeTables.Tables.tableId;
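For reference, the readonly-log types above compose into a storage adapter roughly as in the sketch below. This is illustrative only and assumes `StorageAdapter` and `StorageAdapterBlock` are re-exported from the package root; real adapters (RECS, Postgres, SQLite) write each log into their backing store rather than just counting them.

```ts
import type { StorageAdapter, StorageAdapterBlock } from "@latticexyz/store-sync";

// Minimal no-op adapter: receives each block's readonly list of store logs.
const countingAdapter: StorageAdapter = async ({ blockNumber, logs }: StorageAdapterBlock) => {
  console.log(`block ${blockNumber}: ${logs.length} store event logs`);
};
```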