-
Notifications
You must be signed in to change notification settings - Fork 71
/
postgres-store.ts
152 lines (139 loc) · 5.01 KB
/
postgres-store.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import postgres, { PostgresError, PostgresType } from 'postgres';
import { Logger } from "../..";
const log = new Logger("PostgresStore");
export async function v0Schema(sql: postgres.Sql) {
await sql.begin(s => [
s`CREATE TABLE schema (version INTEGER UNIQUE NOT NULL);`,
s`INSERT INTO schema VALUES (0);`
]);
}
export interface PostgresStoreOpts extends postgres.Options<Record<string, PostgresType<unknown>>> {
/**
* URL to reach the database on.
*/
url?: string;
/**
* Should the schema table be automatically created (the v0 schema effectively).
* Defaults to `true`.
*/
autocreateSchemaTable?: boolean;
}
export type SchemaUpdateFunction = (sql: postgres.Sql) => Promise<void>|void;
/**
* PostgreSQL datastore abstraction which can be inherited by a specialised bridge class.
*
* @example
* class MyBridgeStore extends PostgresStore {
* constructor(myurl) {
* super([schemav1, schemav2, schemav3], { url: myurl });
* }
*
* async getData() {
* return this.sql`SELECT * FROM mytable`
* }
* }
*
* // Which can then be used by doing
* const store = new MyBridgeStore("postgresql://postgres_user:postgres_password@postgres");
* store.ensureSchema();
* const data = await store.getData();
*/
export abstract class PostgresStore {
private hasEnded = false;
public readonly sql: postgres.Sql;
public get latestSchema() {
return this.schemas.length;
}
/**
* Construct a new store.
* @param schemas The set of schema functions to apply to a database. The ordering of this array determines the
* schema number.
* @param opts Options to supply to the PostgreSQL client, such as `url`.
*/
constructor(private readonly schemas: SchemaUpdateFunction[], private readonly opts: PostgresStoreOpts) {
opts.autocreateSchemaTable = opts.autocreateSchemaTable ?? true;
this.sql = opts.url ? postgres(opts.url, opts) : postgres(opts);
process.once("beforeExit", () => {
// Ensure we clean up on exit
this.destroy().catch(ex => {
log.warn('Failed to cleanly exit', ex);
});
})
}
/**
* Ensure the database schema is up to date. If you supplied
* `autocreateSchemaTable` to `opts` in the constructor, a fresh database
* will have a `schema` table created for it.
*
* @throws If a schema could not be applied cleanly.
*/
public async ensureSchema(): Promise<void> {
log.info("Starting database engine");
let currentVersion = await this.getSchemaVersion();
if (currentVersion === -1) {
if (this.opts.autocreateSchemaTable) {
log.info(`Applying v0 schema (schema table)`);
await v0Schema(this.sql);
currentVersion = 0;
} else {
// We aren't autocreating the schema table, so assume schema 0.
currentVersion = 0;
}
}
// Zero-indexed, so schema 1 would be in slot 0.
while (this.schemas[currentVersion]) {
log.info(`Updating schema to v${currentVersion + 1}`);
const runSchema = this.schemas[currentVersion];
try {
await runSchema(this.sql);
currentVersion++;
await this.updateSchemaVersion(currentVersion);
}
catch (ex) {
log.warn(`Failed to run schema v${currentVersion + 1}:`, ex);
throw Error("Failed to update database schema");
}
}
log.info(`Database schema is at version v${currentVersion}`);
}
/**
* Clean away any resources used by the database. This is automatically
* called before the process exits.
*/
public async destroy(): Promise<void> {
log.info("Destroy called");
if (this.hasEnded) {
// No-op if end has already been called.
return;
}
this.hasEnded = true;
await this.sql.end();
log.info("PostgresSQL connection ended");
}
/**
* Update the current schema version.
* @param version
*/
protected async updateSchemaVersion(version: number): Promise<void> {
log.debug(`updateSchemaVersion: ${version}`);
await this.sql`UPDATE schema SET version = ${version};`;
}
/**
* Get the current schema version.
* @returns The current schema version, or `-1` if no schema table is found.
*/
protected async getSchemaVersion(): Promise<number> {
try {
const result = await this.sql<{version: number}[]>`SELECT version FROM SCHEMA;`;
return result[0].version;
}
catch (ex) {
if (ex instanceof PostgresError && ex.code === "42P01") { // undefined_table
log.warn("Schema table could not be found");
return -1;
}
log.error("Failed to get schema version", ex);
}
throw Error("Couldn't fetch schema version");
}
}