Skip to content

Commit

Permalink
Add processMutation
Browse files Browse the repository at this point in the history
  • Loading branch information
aboodman committed Jan 23, 2022
1 parent 7b26978 commit b5f0741
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 12 deletions.
71 changes: 71 additions & 0 deletions src/process/process-mutation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { EntryCache } from "../storage/entry-cache";
import { ReplicacheTransaction } from "../storage/replicache-transaction";
import { Storage } from "../storage/storage";
import { ClientMutation } from "../types/client-mutation";
import { getClientRecord, putClientRecord } from "../types/client-record";
import { putVersion, Version } from "../types/version";
import { LogContext } from "../util/logger";

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type Mutator = (tx: ReplicacheTransaction, args: any) => Promise<void>;
export type MutatorMap = Map<string, Mutator>;

// Runs a single mutation and updates storage accordingly.
// At exit:
// - storage will have been updated with effect of mutation
// - version key will have been updated if any change was made
// - client record of mutating client will have been updated
export async function processMutation(
lc: LogContext,
mutation: ClientMutation,
mutators: MutatorMap,
storage: Storage,
version: Version
): Promise<void> {
const t0 = Date.now();
try {
lc.debug?.(
"processing mutation",
JSON.stringify(mutation),
"version",
version
);
const { clientID } = mutation;
const cache = new EntryCache(storage);
const record = await getClientRecord(clientID, cache);
if (!record) {
lc.info?.("client not found");
throw new Error(`Client ${clientID} not found`);
}

const expectedMutationID = record.lastMutationID + 1;
if (mutation.id < expectedMutationID) {
lc.debug?.("skipping duplicate mutation", JSON.stringify(mutation));
return;
}

if (mutation.id > expectedMutationID) {
lc.info?.("skipping out of order mutation", JSON.stringify(mutation));
return;
}

const tx = new ReplicacheTransaction(cache, clientID, version);
try {
const mutator = mutators.get(mutation.name);
if (!mutator) {
lc.info?.("skipping unknown mutator", JSON.stringify(mutation));
} else {
await mutator(tx, mutation.args);
}
} catch (e) {
lc.info?.("skipping mutation because error", JSON.stringify(mutation), e);
}

record.lastMutationID = expectedMutationID;
await putClientRecord(clientID, record, cache);
await putVersion(version, cache);
await cache.flush();
} finally {
lc.debug?.(`processMutation took ${Date.now() - t0} ms`);
}
}
130 changes: 130 additions & 0 deletions test/process/process-mutation.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { WriteTransaction } from "replicache";
import { DurableStorage } from "../../src/storage/durable-storage";
import { ClientMutation } from "../../src/types/client-mutation";
import {
ClientRecord,
getClientRecord,
putClientRecord,
} from "../../src/types/client-record";
import { getUserValue } from "../../src/types/user-value";
import { getVersion } from "../../src/types/version";
import { clientMutation, clientRecord } from "../util/test-utils";
import {
MutatorMap,
processMutation,
} from "../../src/process/process-mutation";
import { LogContext } from "../../src/util/logger";

const { COUNTER } = getMiniflareBindings();
const id = COUNTER.newUniqueId();

test("processMutation", async () => {
type Case = {
name: string;
existingRecord?: ClientRecord;
mutation: ClientMutation;
expectedError?: string;
expectedRecord?: ClientRecord;
expectAppWrite: boolean;
expectVersionWrite: boolean;
};

const cases: Case[] = [
{
name: "clientID not found",
mutation: clientMutation("c1", 1),
expectedError: "Error: Client c1 not found",
expectAppWrite: false,
expectVersionWrite: false,
},
{
name: "duplicate mutation",
existingRecord: clientRecord(null, 1),
mutation: clientMutation("c1", 1),
expectedRecord: clientRecord(null, 1),
expectAppWrite: false,
expectVersionWrite: false,
},
{
name: "ooo mutation",
existingRecord: clientRecord(null, 1),
mutation: clientMutation("c1", 3),
expectedRecord: clientRecord(null, 1),
expectAppWrite: false,
expectVersionWrite: false,
},
{
name: "unknown mutator",
existingRecord: clientRecord(null, 1),
mutation: clientMutation("c1", 2, "unknown"),
expectedRecord: clientRecord(null, 2),
expectAppWrite: false,
expectVersionWrite: true,
},
{
name: "mutator throws",
existingRecord: clientRecord(null, 1),
mutation: clientMutation("c1", 2, "throws"),
expectedRecord: clientRecord(null, 2),
expectAppWrite: false,
expectVersionWrite: true,
},
{
name: "success",
existingRecord: clientRecord(null, 1),
mutation: clientMutation("c1", 2, "foo"),
expectedRecord: clientRecord(null, 2),
expectAppWrite: true,
expectVersionWrite: true,
},
];

const mutators: MutatorMap = new Map([
[
"foo",
async (tx: WriteTransaction) => {
await tx.put("foo", "bar");
},
],
[
"throws",
async () => {
throw new Error("bonk");
},
],
]);

const durable = await getMiniflareDurableObjectStorage(id);

for (const c of cases) {
const storage = new DurableStorage(durable);
const version = 2;
const { clientID } = c.mutation;

if (c.existingRecord) {
await putClientRecord(clientID, c.existingRecord, storage);
}

let err: string | undefined;
try {
await processMutation(
new LogContext("info"),
c.mutation,
mutators,
storage,
version
);
} catch (e) {
err = String(e);
}

expect(err).toEqual(c.expectedError);
expect(await getClientRecord(clientID, storage)).toEqual(c.expectedRecord);
expect(await getUserValue("foo", storage)).toEqual(
c.expectAppWrite ? { version, deleted: false, value: "bar" } : undefined
);

const expectedVersion = c.expectVersionWrite ? version : undefined;
expect(await getVersion(storage)).toEqual(expectedVersion);
}
});
12 changes: 0 additions & 12 deletions test/util/test-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,8 @@ import { JSONType } from "../../src/protocol/json";
import { Mutation } from "../../src/protocol/push";
import { ClientMutation } from "../../src/types/client-mutation";
import { ClientID, ClientState, Socket } from "../../src/types/client-state";
import { RoomID, RoomMap, RoomState } from "../../src/types/room-state";
import { NullableVersion } from "../../src/types/version";

export function roomMap(...rooms: [RoomID, RoomState][]): RoomMap {
return new Map(rooms);
}

export function room(
id: RoomID,
...clients: [ClientID, ClientState][]
): [RoomID, RoomState] {
return [id, { clients: new Map(clients) }];
}

export function client(
id: ClientID,
socket: Socket = new Mocket(),
Expand Down

0 comments on commit b5f0741

Please sign in to comment.