Skip to content

Commit

Permalink
add purge method to datastore
Browse files Browse the repository at this point in the history
  • Loading branch information
Mårten Wikström committed Sep 3, 2018
1 parent da611e4 commit 8b97974
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/api/datastore.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { ICommandData } from "./command-data";
import { ICommandInput } from "./command-input";
import { JsonValue } from "./json-value";
import { IPurgeOptions } from "./purge-options";
import { IQueryData } from "./query-data";
import { IQueryDescriptor } from "./query-descriptor";
import { IQueryListOptions } from "./query-list-options";
Expand All @@ -14,6 +15,7 @@ export interface IDatastore {
getCommandList(): Promise<ICommandData[]>;
getQueryList(options?: IQueryListOptions): Promise<IQueryData[]>;
getQueryResult(query: IQueryDescriptor): Promise<IQueryResult | undefined>;
purge(options?: IPurgeOptions): Promise<void>;
setCommandAccepted(key: number, commit: string): Promise<boolean>;
setCommandRejected(key: number): Promise<boolean>;
setQueryResult(query: IQueryDescriptor, commit: string, data: JsonValue): Promise<string | undefined>;
Expand Down
8 changes: 8 additions & 0 deletions src/api/purge-options.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { IQueryDescriptor } from "./query-descriptor";

/** @public */
export interface IPurgeOptions {
activeQueries?: IQueryDescriptor[];
commandRetentionPeriod?: number;
queryRetentionPeriod?: number;
}
105 changes: 105 additions & 0 deletions src/datastore/purge.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import "../test-helpers/setup-fake-indexeddb";
import "../test-helpers/setup-text-encoding";
import "../test-helpers/setup-webcrypto";

import { createJsonCrypto } from "../api/create-json-crypto";
import { IDatastore } from "../api/datastore";
import { openDatastore } from "../api/open-datastore";
import { IQueryDescriptor } from "../api/query-descriptor";

describe("purge", () => {
let store: IDatastore;
let now: Date;

beforeEach(async () => {
const name = `test-${Math.floor(Math.random() * 9999999)}`;
const crypto = await createJsonCrypto();
now = new Date();
store = await openDatastore({ name, crypto, now: () => now });
});

afterEach(() => store.close());

it("drops old queries", async () => {
const query: IQueryDescriptor = { type: "x" };
await store.setQueryResult(query, "a", null);
expect((await store.getQueryList()).length).toBe(1);
now = new Date(now.getTime() + 60 * 60 * 1000); // one hour later
await store.purge();
expect((await store.getQueryList()).length).toBe(1); // not dropped
now = new Date(now.getTime() + 1000); // but one more second later
await store.purge();
expect((await store.getQueryList()).length).toBe(0); // dropped!
});

it("drops old queries with custom retention period", async () => {
const query: IQueryDescriptor = { type: "x" };
await store.setQueryResult(query, "a", null);
expect((await store.getQueryList()).length).toBe(1);
now = new Date(now.getTime() + 1000);
await store.purge({ queryRetentionPeriod: 1000 });
expect((await store.getQueryList()).length).toBe(1); // not dropped
now = new Date(now.getTime() + 1);
await store.purge({ queryRetentionPeriod: 1000 });
expect((await store.getQueryList()).length).toBe(0); // dropped!
});

it("does not drop old and active queries", async () => {
const query: IQueryDescriptor = { type: "x" };
await store.setQueryResult(query, "a", null);
expect((await store.getQueryList()).length).toBe(1);
now = new Date(now.getTime() + 2 * 60 * 60 * 1000); // two hours later
await store.purge({ activeQueries: [ query ]});
expect((await store.getQueryList()).length).toBe(1); // not dropped (since it was active)
});

it("drops old rejected commands", async () => {
const cmd = await store.addCommand({ type: "x", target: "y" });
await store.setCommandRejected(cmd.key);
expect((await store.getCommandList()).length).toBe(1);
now = new Date(now.getTime() + 5 * 60 * 1000); // five minutes later
await store.purge();
expect((await store.getCommandList()).length).toBe(1); // not dropped
now = new Date(now.getTime() + 1000); // but one more second later
await store.purge();
expect((await store.getCommandList()).length).toBe(0); // dropped!
});

it("drops old rejected commands with custom retention period", async () => {
const cmd = await store.addCommand({ type: "x", target: "y" });
await store.setCommandRejected(cmd.key);
expect((await store.getCommandList()).length).toBe(1);
now = new Date(now.getTime() + 1000);
await store.purge({ commandRetentionPeriod: 1000 });
expect((await store.getCommandList()).length).toBe(1); // not dropped
now = new Date(now.getTime() + 1);
await store.purge({ commandRetentionPeriod: 1000 });
expect((await store.getCommandList()).length).toBe(0); // dropped!
});

it("does not drop pending commands", async () => {
const cmd = await store.addCommand({ type: "x", target: "y" });
expect((await store.getCommandList()).length).toBe(1);
now = new Date(now.getTime() + 60 * 60 * 1000); // one hour later later
await store.purge();
expect((await store.getCommandList()).length).toBe(1); // not dropped
await store.setCommandRejected(cmd.key);
now = new Date(now.getTime() + 6 * 60 * 1000); // six minutes later
await store.purge();
expect((await store.getCommandList()).length).toBe(0); // dropped!
});

it("drops synced accepted commands", async () => {
const query: IQueryDescriptor = { type: "x" };
const cmd = await store.addCommand({ type: "x", target: "y" });
await store.setCommandAccepted(cmd.key, "b");
expect((await store.getCommandList()).length).toBe(1); // not dropped
now = new Date(now.getTime() + 60 * 60 * 1000); // one hour later later
await store.setQueryResult(query, "a", null);
await store.purge();
expect((await store.getCommandList()).length).toBe(1); // not dropped
await store.updateQueryResult(query, { commitBefore: "a", commitAfter: "b" });
await store.purge();
expect((await store.getCommandList()).length).toBe(0); // dropped!
});
});
70 changes: 70 additions & 0 deletions src/datastore/purge.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { IJsonObject } from "../api/json-value";
import { IPurgeOptions } from "../api/purge-options";
import { computeJsonHash } from "../json/compute-json-hash";
import { assert } from "../utils/assert";
import { DEBUG } from "../utils/env";
import { uint8ArrayToBase64Url } from "../utils/uint8-array-to-base64-url";
import { DatastoreContext } from "./datastore-context";

/** @internal */
export async function purge(
context: DatastoreContext,
options: IPurgeOptions = {},
): Promise<void> {
// istanbul ignore else: debug assertion
if (DEBUG) {
assert(context instanceof DatastoreContext);
}

const {
db,
now,
} = context;

const {
activeQueries = [],
commandRetentionPeriod = 5 * 60 * 1000, // default is five minutes
queryRetentionPeriod = 60 * 60 * 1000, // default is one hour
} = options;

const currentTime = now().getTime();
const keepCommandsAfter = new Date(currentTime - commandRetentionPeriod);
const keepQueriesAfter = new Date(currentTime - queryRetentionPeriod);
const activeQueryKeys = new Set(await Promise.all(activeQueries.map(
async query => encodeKey(await computeJsonHash(query as any as IJsonObject)))));

await db.transaction(
"rw",
db.commands,
db.queries,
db.results,
async () => {
// Delete queries with a timestamp older than the specified retention period,
// but keep those that listed as "active".
const oldQueryKeys = await db.queries.where("timestamp").below(keepQueriesAfter).primaryKeys();
const queryKeysToDrop = oldQueryKeys.filter(key => !activeQueryKeys.has(encodeKey(key)));
await Promise.all([
db.queries.bulkDelete(queryKeysToDrop),
db.results.bulkDelete(queryKeysToDrop),
]);

// Delete commands with a timestamp older than the specified retention period,
// but keep all pending commands and all accepted commands with a commit
// version higher than the commit of the most out of sync query.
const mostOutOfSyncQuery = await db.queries.orderBy("commit").first();
const mostOutOfSyncCommit = mostOutOfSyncQuery ? mostOutOfSyncQuery.commit : "";
const oldCommands = await db.commands.where("timestamp").below(keepCommandsAfter).toArray();
const commandKeysToDrop = oldCommands.filter(cmd =>
cmd.resolved === true && // never drop pending commands
cmd.commit <= mostOutOfSyncCommit, // keep accepted commands with an unsynced commit
).map(cmd => cmd.key);
await db.commands.bulkDelete(commandKeysToDrop);
},
);
}

function encodeKey(
key: ArrayBuffer,
): string {
return uint8ArrayToBase64Url(new Uint8Array(key));
}
5 changes: 5 additions & 0 deletions src/datastore/unverified-open-datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { DatastoreDB } from "./datastore-db";
import { getCommandList as rawGetCommandList } from "./get-command-list";
import { getQueryList as rawGetQueryList } from "./get-query-list";
import { getQueryResult as rawGetQueryResult } from "./get-query-result";
import { purge as rawPurge } from "./purge";
import { QueryDescriptorType } from "./query-descriptor-type";
import { QueryListOptionsType } from "./query-list-options-type";
import { setCommandResolved as rawSetCommandResolved } from "./set-command-resolved";
Expand Down Expand Up @@ -53,6 +54,9 @@ export async function unverifiedOpenDatastore(
query => verify("query descriptor", query, QueryDescriptorType),
);

// TODO: Add verification for purge options
const purge = bindFirst(rawPurge, context);

const setCommandResolved = bindFirst(rawSetCommandResolved, context);
const unverifiedSetCommandRejected: IDatastore["setCommandRejected"] = key => setCommandResolved(key, "");
const unverifiedSetCommandAccepted: IDatastore["setCommandAccepted"] = setCommandResolved;
Expand Down Expand Up @@ -93,6 +97,7 @@ export async function unverifiedOpenDatastore(
getCommandList,
getQueryList,
getQueryResult,
purge,
setCommandAccepted,
setCommandRejected,
setQueryResult,
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export * from "./api/json-crypto";
export * from "./api/json-crypto-options";
export * from "./api/json-patch";
export * from "./api/json-value";
export * from "./api/purge-options";
export * from "./api/query-data";
export * from "./api/query-descriptor";
export * from "./api/query-result";
Expand Down

0 comments on commit 8b97974

Please sign in to comment.