Skip to content

Commit

Permalink
Merge pull request #21 from rocicorp/aa/schema-migration
Browse files Browse the repository at this point in the history
Implement schema migration from old schema to dd31 one
  • Loading branch information
aboodman committed May 29, 2023
2 parents 9da3582 + 2b33b6b commit 8fae013
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 29 deletions.
4 changes: 2 additions & 2 deletions src/backend/pg.ts
Expand Up @@ -50,7 +50,7 @@ export async function withExecutor<R>(f: (executor: Executor) => R) {
return withExecutorAndPool(f, p);
}

async function withExecutorAndPool<R>(
export async function withExecutorAndPool<R>(
f: (executor: Executor) => R,
p: Pool
): Promise<R> {
Expand Down Expand Up @@ -90,7 +90,7 @@ export async function transact<R>(body: TransactionBodyFn<R>) {
});
}

async function transactWithExecutor<R>(
export async function transactWithExecutor<R>(
executor: Executor,
body: TransactionBodyFn<R>
) {
Expand Down
34 changes: 26 additions & 8 deletions src/backend/pull.ts
Expand Up @@ -5,32 +5,50 @@ import {
getLastMutationIDsSince,
} from "./data.js";
import { z } from "zod";
import type { ClientID, PatchOperation, ReadonlyJSONValue } from "replicache";
import type { ClientID, PatchOperation } from "replicache";
import type Express from "express";

const pullRequest = z.object({
const pullRequestV0 = z.object({
pullVersion: z.literal(0),
});

const pullRequestV1 = z.object({
pullVersion: z.literal(1),
profileID: z.string(),
clientGroupID: z.string(),
cookie: z.union([z.number(), z.null()]),
schemaVersion: z.string(),
});

// TODO: not exported from replicache
type PullResponseOK = {
cookie: ReadonlyJSONValue;
const pullRequest = z.union([pullRequestV0, pullRequestV1]);

// Causes the client to reload, getting a newer version of Replicache that can talk to this server.
export type PullResponseV0 = {
error: "ClientStateNotFound";
};

export type PullResponseV1 = {
cookie: number;
lastMutationIDChanges: Record<ClientID, number>;
patch: PatchOperation[];
};

export async function pull(
spaceID: string,
requestBody: Express.Request
): Promise<PullResponseOK> {
): Promise<PullResponseV0 | PullResponseV1> {
console.log(`Processing pull`, JSON.stringify(requestBody, null, ""));

const pull = pullRequest.parse(requestBody);
const requestCookie = pull.cookie;
const { pullVersion } = pull;
if (pullVersion === 0) {
const resp: PullResponseV0 = {
error: "ClientStateNotFound",
};
return resp;
}

const { cookie: requestCookie } = pull;

console.log("spaceID", spaceID);

Expand All @@ -55,7 +73,7 @@ export async function pull(
throw new Error(`Unknown space ${spaceID}`);
}

const resp: PullResponseOK = {
const resp: PullResponseV1 = {
lastMutationIDChanges,
cookie: responseCookie,
patch: [],
Expand Down
33 changes: 21 additions & 12 deletions src/backend/push.ts
Expand Up @@ -18,13 +18,19 @@ const mutationSchema = z.object({
args: z.any(),
});

const pushRequestSchema = z.object({
const pushRequestV0Schema = z.object({
pushVersion: z.literal(0),
});

const pushRequestV1Schema = z.object({
pushVersion: z.literal(1),
profileID: z.string(),
clientGroupID: z.string(),
mutations: z.array(mutationSchema),
});

const pushRequestSchema = z.union([pushRequestV0Schema, pushRequestV1Schema]);

export type Error = "SpaceNotFound";

export function parseIfDebug<T extends ReadonlyJSONValue>(
Expand All @@ -49,6 +55,14 @@ export async function push<M extends MutatorDefs>(
pushRequestSchema,
requestBody
);
const { pushVersion } = push;
if (pushVersion === 0) {
throw new Error(
"Unsupported push version: 0 - next pull should update client"
);
}

const { clientGroupID } = push;

const t0 = Date.now();
await transact(async (executor) => {
Expand All @@ -70,11 +84,11 @@ export async function push<M extends MutatorDefs>(

for (let i = 0; i < push.mutations.length; i++) {
const mutation = push.mutations[i];
const lastMutationID = lastMutationIDs[mutation.clientID];
const { clientID } = mutation;
const lastMutationID = lastMutationIDs[clientID];
if (lastMutationID === undefined) {
throw new Error(
"invalid state - lastMutationID not found for client: " +
mutation.clientID
"invalid state - lastMutationID not found for client: " + clientID
);
}
const expectedMutationID = lastMutationID + 1;
Expand All @@ -93,7 +107,7 @@ export async function push<M extends MutatorDefs>(
console.log("Processing mutation:", JSON.stringify(mutation, null, ""));

const t1 = Date.now();
tx.clientID = mutation.clientID;
tx.clientID = clientID;
tx.mutationID = mutation.id;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand All @@ -110,17 +124,12 @@ export async function push<M extends MutatorDefs>(
);
}

lastMutationIDs[mutation.clientID] = expectedMutationID;
lastMutationIDs[clientID] = expectedMutationID;
console.log("Processed mutation in", Date.now() - t1);
}

await Promise.all([
setLastMutationIDs(
executor,
push.clientGroupID,
lastMutationIDs,
nextVersion
),
setLastMutationIDs(executor, clientGroupID, lastMutationIDs, nextVersion),
setCookie(executor, spaceID, nextVersion),
tx.flush(),
]);
Expand Down
44 changes: 44 additions & 0 deletions src/backend/schema.test.ts
@@ -0,0 +1,44 @@
import { expect } from "chai";
import { test } from "mocha";
import { createSpace } from "./data.js";
import { transactWithExecutor, withExecutorAndPool } from "./pg.js";
import { getDBConfig } from "./pgconfig/pgconfig.js";
import { createSchemaVersion1, createSchemaVersion2 } from "./schema.js";

test("v2Migration", async () => {
const dbConfig = getDBConfig();
const pool = dbConfig.initPool();

// the pool will emit an error on behalf of any idle clients
// it contains if a backend error or network partition happens
pool.on("error", (err) => {
console.error("Unexpected error on idle client", err);
process.exit(-1);
});
pool.on("connect", async (client) => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
client.query(
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE"
);
});

await withExecutorAndPool(async (executor) => {
await transactWithExecutor(executor, async (executor) => {
await createSchemaVersion1(executor);

await createSpace(executor, "s1");
await executor(
`insert into replicache_client (id, lastmutationid, lastmodified) values ('c1', 7, now())`
);

await createSchemaVersion2(executor);
const result = await executor("select * from replicache_client");
expect(result.rowCount).eq(1);
expect(result.rows[0].id).eq("c1");
expect(result.rows[0].lastmutationid).eq(7);
expect(result.rows[0].lastmodified).not.eq(undefined);
expect(result.rows[0].clientgroupid).eq("c1");
expect(result.rows[0].version).eq(0);
});
}, pool);
});
43 changes: 36 additions & 7 deletions src/backend/schema.ts
Expand Up @@ -4,17 +4,24 @@ import type { Executor } from "./pg.js";
export async function createDatabase(executor: Executor, dbConfig: PGConfig) {
console.log("creating database");
const schemaVersion = await dbConfig.getSchemaVersion(executor);
if (schemaVersion < 0 || schemaVersion > 1) {
const migrations = [createSchemaVersion1, createSchemaVersion2];
if (schemaVersion < 0 || schemaVersion > migrations.length) {
throw new Error("Unexpected schema version: " + schemaVersion);
}
if (schemaVersion === 0) {
await createSchemaVersion1(executor);

for (let i = schemaVersion; i < migrations.length; i++) {
console.log("Running migration for schemaVersion", i);
await migrations[i](executor);
}
}

export async function createSchemaVersion1(executor: Executor) {
await executor("create table replicache_meta (key text primary key, value json)");
await executor("insert into replicache_meta (key, value) values ('schemaVersion', '1')");
await executor(
"create table replicache_meta (key text primary key, value json)"
);
await executor(
"insert into replicache_meta (key, value) values ('schemaVersion', '1')"
);

await executor(`create table replicache_space (
id text primary key not null,
Expand All @@ -24,9 +31,7 @@ export async function createSchemaVersion1(executor: Executor) {

await executor(`create table replicache_client (
id text primary key not null,
clientgroupid text not null,
lastmutationid integer not null,
version integer not null,
lastmodified timestamp(6) not null
)`);

Expand All @@ -44,3 +49,27 @@ export async function createSchemaVersion1(executor: Executor) {
await executor(`create index on replicache_entry (deleted)`);
await executor(`create index on replicache_entry (version)`);
}

export async function createSchemaVersion2(executor: Executor) {
// Add the clientgroupid column. Existing clients will use their id as the clientgroupid.
await executor(`alter table replicache_client add column clientgroupid text`);
await executor(`update replicache_client set clientgroupid=id`);
await executor(
`alter table replicache_client alter column clientgroupid set not null`
);

// Add the version column. Existing clients will use zero as their version.
await executor(`alter table replicache_client add column version integer`);
await executor(`update replicache_client set version=0`);
await executor(
`alter table replicache_client alter column version set not null`
);

// Add an index to find clients by clientgroupid and version quickly.
await executor("create index on replicache_client (clientgroupid,version)");

// Update schema version.
await executor(
"update replicache_meta set value = '2' where key = 'schemaVersion'"
);
}

0 comments on commit 8fae013

Please sign in to comment.