Skip to content

Commit

Permalink
feat(store-indexer): clean database if outdated (#1984)
Browse files Browse the repository at this point in the history
  • Loading branch information
holic committed Dec 1, 2023
1 parent 5df1f31 commit e48fb3b
Show file tree
Hide file tree
Showing 16 changed files with 113 additions and 53 deletions.
5 changes: 5 additions & 0 deletions .changeset/fair-bulldogs-decide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@latticexyz/store-sync": major
---

Renamed singleton `chain` table to `config` table for clarity.
5 changes: 5 additions & 0 deletions .changeset/tasty-cows-pay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@latticexyz/store-indexer": minor
---

When the Postgres indexer starts up, it will now attempt to detect if the database is outdated and, if so, cleans up all MUD-related schemas and tables before proceeding.
11 changes: 8 additions & 3 deletions packages/store-indexer/bin/postgres-indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { isDefined } from "@latticexyz/common/utils";
import { combineLatest, filter, first } from "rxjs";
import { drizzle } from "drizzle-orm/postgres-js";
import postgres from "postgres";
import { createStorageAdapter } from "@latticexyz/store-sync/postgres";
import { cleanDatabase, createStorageAdapter, shouldCleanDatabase } from "@latticexyz/store-sync/postgres";
import { createStoreSync } from "@latticexyz/store-sync";
import { indexerEnvSchema, parseEnv } from "./parseEnv";

Expand Down Expand Up @@ -37,6 +37,11 @@ const publicClient = createPublicClient({
const chainId = await publicClient.getChainId();
const database = drizzle(postgres(env.DATABASE_URL));

if (await shouldCleanDatabase(database, chainId)) {
console.log("outdated database detected, clearing data to start fresh");
cleanDatabase(database);
}

const { storageAdapter, tables } = await createStorageAdapter({ database, publicClient });

let startBlock = env.START_BLOCK;
Expand All @@ -46,8 +51,8 @@ let startBlock = env.START_BLOCK;
try {
const chainState = await database
.select()
.from(tables.chainTable)
.where(eq(tables.chainTable.chainId, chainId))
.from(tables.configTable)
.where(eq(tables.configTable.chainId, chainId))
.limit(1)
.execute()
// Get the first record in a way that returns a possible `undefined`
Expand Down
4 changes: 2 additions & 2 deletions packages/store-indexer/src/postgres/getLogs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ export async function getLogs(
// TODO: move the block number query into the records query for atomicity so we don't have to merge them here
const chainState = await database
.select()
.from(tables.chainTable)
.where(eq(tables.chainTable.chainId, chainId))
.from(tables.configTable)
.where(eq(tables.configTable.chainId, chainId))
.limit(1)
.execute()
// Get the first record in a way that returns a possible `undefined`
Expand Down
27 changes: 0 additions & 27 deletions packages/store-sync/src/postgres-decoded/cleanDatabase.ts

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ describe("createStorageAdapter", async () => {
await storageAdapter.storageAdapter(block);
}

expect(await db.select().from(storageAdapter.tables.chainTable)).toMatchInlineSnapshot(`
expect(await db.select().from(storageAdapter.tables.configTable)).toMatchInlineSnapshot(`
[
{
"chainId": 31337,
"lastUpdatedBlockNumber": 12n,
"version": "0.0.1",
},
]
`);
Expand Down
1 change: 0 additions & 1 deletion packages/store-sync/src/postgres-decoded/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
export * from "./buildTable";
export * from "./cleanDatabase";
export * from "./createStorageAdapter";
export * from "./getTables";
export * from "./syncToPostgres";
48 changes: 38 additions & 10 deletions packages/store-sync/src/postgres/cleanDatabase.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,46 @@
import { PgDatabase, getTableConfig } from "drizzle-orm/pg-core";
import { tables } from "./tables";
import { debug } from "./debug";
import { PgDatabase, pgSchema, varchar } from "drizzle-orm/pg-core";
import { debug as parentDebug } from "./debug";
import { sql } from "drizzle-orm";
import { pgDialect } from "./pgDialect";
import { isDefined, unique } from "@latticexyz/common/utils";
import { isNotNull } from "@latticexyz/common/utils";

// This intentionally just cleans up known schemas/tables/rows. We could drop the database but that's scary.
const debug = parentDebug.extend("cleanDatabase");

// Postgres internal table
const schemata = pgSchema("information_schema").table("schemata", {
schemaName: varchar("schema_name", { length: 64 }),
});

function isMudSchemaName(schemaName: string): boolean {
// address-prefixed schemas like {address}__{namespace} used by decoded postgres tables
// optional prefix for schemas created in tests
if (/(^|__)0x[0-9a-f]{40}__/i.test(schemaName)) {
return true;
}
// schema for internal tables
// optional prefix for schemas created in tests
if (/(^|__)mud$/.test(schemaName)) {
return true;
}
// old schema for internal tables
// TODO: remove after a while
if (/__mud_internal$/.test(schemaName)) {
return true;
}
return false;
}

/**
* VERY DESTRUCTIVE! Finds and drops all MUD indexer related schemas and tables.
* @internal
*/
export async function cleanDatabase(db: PgDatabase<any>): Promise<void> {
const schemaNames = unique(
Object.values(tables)
.map((table) => getTableConfig(table).schema)
.filter(isDefined)
);
const schemaNames = (await db.select({ schemaName: schemata.schemaName }).from(schemata).execute())
.map((row) => row.schemaName)
.filter(isNotNull)
.filter(isMudSchemaName);

debug(`dropping ${schemaNames.length} schemas`);

await db.transaction(async (tx) => {
for (const schemaName of schemaNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ describe("createStorageAdapter", async () => {
await storageAdapter.storageAdapter(block);
}

expect(await db.select().from(storageAdapter.tables.chainTable)).toMatchInlineSnapshot(`
expect(await db.select().from(storageAdapter.tables.configTable)).toMatchInlineSnapshot(`
[
{
"chainId": 31337,
"lastUpdatedBlockNumber": 12n,
"version": "0.0.1",
},
]
`);
Expand Down
6 changes: 4 additions & 2 deletions packages/store-sync/src/postgres/createStorageAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { tables } from "./tables";
import { spliceHex } from "@latticexyz/common";
import { setupTables } from "./setupTables";
import { StorageAdapter, StorageAdapterBlock } from "../common";
import { version } from "./version";

// Currently assumes one DB per chain ID

Expand Down Expand Up @@ -195,13 +196,14 @@ export async function createStorageAdapter<TConfig extends StoreConfig = StoreCo
}

await tx
.insert(tables.chainTable)
.insert(tables.configTable)
.values({
version,
chainId,
lastUpdatedBlockNumber: blockNumber,
})
.onConflictDoUpdate({
target: [tables.chainTable.chainId],
target: [tables.configTable.chainId],
set: {
lastUpdatedBlockNumber: blockNumber,
},
Expand Down
1 change: 1 addition & 0 deletions packages/store-sync/src/postgres/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ export * from "./cleanDatabase";
export * from "./columnTypes";
export * from "./createStorageAdapter";
export * from "./setupTables";
export * from "./shouldCleanDatabase";
export * from "./syncToPostgres";
export * from "./tables";
4 changes: 2 additions & 2 deletions packages/store-sync/src/postgres/setupTables.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ describe("setupTables", async () => {

describe("before running", () => {
it("should be missing schemas", async () => {
await expect(db.select().from(tables.chainTable)).rejects.toThrow(/relation "\w+mud.chain" does not exist/);
await expect(db.select().from(tables.configTable)).rejects.toThrow(/relation "\w+mud.config" does not exist/);
await expect(db.select().from(tables.recordsTable)).rejects.toThrow(/relation "\w+mud.records" does not exist/);
});
});
Expand All @@ -29,7 +29,7 @@ describe("setupTables", async () => {
});

it("should have schemas", async () => {
expect(await db.select().from(tables.chainTable)).toMatchInlineSnapshot("[]");
expect(await db.select().from(tables.configTable)).toMatchInlineSnapshot("[]");
expect(await db.select().from(tables.recordsTable)).toMatchInlineSnapshot("[]");
});
});
Expand Down
4 changes: 3 additions & 1 deletion packages/store-sync/src/postgres/setupTables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import { AnyPgColumn, PgTableWithColumns, PgDatabase, getTableConfig } from "dri
import { getTableColumns, sql } from "drizzle-orm";
import { ColumnDataType } from "kysely";
import { isDefined, unique } from "@latticexyz/common/utils";
import { debug } from "./debug";
import { debug as parentDebug } from "./debug";
import { pgDialect } from "./pgDialect";

const debug = parentDebug.extend("setupTables");

export async function setupTables(
db: PgDatabase<any>,
tables: PgTableWithColumns<any>[]
Expand Down
36 changes: 36 additions & 0 deletions packages/store-sync/src/postgres/shouldCleanDatabase.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { PgDatabase } from "drizzle-orm/pg-core";
import { tables } from "./tables";
import { debug as parentDebug } from "./debug";
import { version as expectedVersion } from "./version";

const debug = parentDebug.extend("shouldCleanDatabase");

/**
* @internal
*/
export async function shouldCleanDatabase(db: PgDatabase<any>, expectedChainId: number): Promise<boolean> {
try {
const config = (await db.select().from(tables.configTable).limit(1).execute()).find(() => true);

if (!config) {
debug("no record found in config table");
return true;
}

if (config.version !== expectedVersion) {
debug(`configured version (${config.version}) did not match expected version (${expectedVersion})`);
return true;
}

if (config.chainId !== expectedChainId) {
debug(`configured chain ID (${config.chainId}) did not match expected chain ID (${expectedChainId})`);
return true;
}

return false;
} catch (error) {
console.error(error);
debug("error while querying config table");
return true;
}
}
7 changes: 4 additions & 3 deletions packages/store-sync/src/postgres/tables.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { boolean, index, pgSchema, primaryKey } from "drizzle-orm/pg-core";
import { boolean, index, pgSchema, primaryKey, varchar } from "drizzle-orm/pg-core";
import { transformSchemaName } from "./transformSchemaName";
import { asAddress, asBigInt, asHex, asNumber } from "./columnTypes";

Expand All @@ -7,7 +7,8 @@ const schemaName = transformSchemaName("mud");
/**
* Singleton table for the state of the chain we're indexing
*/
const chainTable = pgSchema(schemaName).table("chain", {
const configTable = pgSchema(schemaName).table("config", {
version: varchar("version").notNull(),
chainId: asNumber("chain_id", "bigint").notNull().primaryKey(),
lastUpdatedBlockNumber: asBigInt("last_updated_block_number", "numeric").notNull(),
});
Expand Down Expand Up @@ -39,6 +40,6 @@ const recordsTable = pgSchema(schemaName).table(
);

export const tables = {
chainTable,
configTable,
recordsTable,
};
1 change: 1 addition & 0 deletions packages/store-sync/src/postgres/version.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const version = "0.0.1";

0 comments on commit e48fb3b

Please sign in to comment.