-
Notifications
You must be signed in to change notification settings - Fork 3
/
rds.ts
176 lines (161 loc) · 5.22 KB
/
rds.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
// Low-level config and utilities for RDS.
import {
BeginTransactionCommand,
CommitTransactionCommand,
ExecuteStatementCommand,
ExecuteStatementCommandOutput,
Field,
RDSDataClient,
RollbackTransactionCommand,
} from "@aws-sdk/client-rds-data";
import { execute } from "fp-ts/lib/State";
const region = "us-west-2";
const dbName =
process.env.REPLIDRAW_DB_NAME || "replicache_sample_replidraw__dev";
console.log({ dbName });
const resourceArn =
"arn:aws:rds:us-west-2:712907626835:cluster:replicache-demo-notes";
const secretArn =
"arn:aws:secretsmanager:us-west-2:712907626835:secret:rds-db-credentials/cluster-X5NALMLWZ34K55M5ZZVPN2IYOI/admin-65L3ia";
const client = new RDSDataClient({
region,
// TODO: How to have it fall back to the .aws/credentials file?
credentials: {
accessKeyId: process.env.AMAZON_ACCESS_KEY_ID ?? "",
secretAccessKey: process.env.AMAZON_SECRET_ACCESS_KEY ?? "",
},
});
export type ExecuteStatementFn = typeof executeStatement;
export type TransactionBodyFn = (executor: ExecuteStatementFn) => Promise<void>;
/**
* Invokes a supplied function within an RDS transaction.
* @param body Function to invoke. If this throws, the transaction will be rolled
* back. The thrown error will be re-thrown.
*/
export async function transact(body: TransactionBodyFn) {
const transactionId = (
await client.send(
new BeginTransactionCommand({
resourceArn,
secretArn,
database: dbName,
})
)
).transactionId;
try {
await body(async (sql, parameters) => {
return await executeStatementInDatabase(
dbName,
sql,
parameters,
transactionId
);
});
} catch (e) {
await client.send(
new RollbackTransactionCommand({
resourceArn,
secretArn,
transactionId,
})
);
throw e;
}
await client.send(
new CommitTransactionCommand({
resourceArn,
secretArn,
transactionId,
})
);
}
/**
* Creates our databse in RDS if necessary.
*/
export async function ensureDatabase() {
const result = await executeStatementInDatabase(null, "SHOW DATABASES");
if (result.records) {
if (result.records.find((record) => record[0].stringValue == dbName)) {
// TODO: Maybe migrate version
return;
}
}
await createDatabase();
}
async function createDatabase() {
await executeStatementInDatabase(null, "CREATE DATABASE " + dbName);
await executeStatement(`CREATE TABLE Cookie (
Version BIGINT NOT NULL)`);
await executeStatement(`CREATE TABLE Client (
Id VARCHAR(255) PRIMARY KEY NOT NULL,
LastMutationID BIGINT NOT NULL,
LastModified TIMESTAMP NOT NULL
DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP)`);
await executeStatement(`CREATE TABLE ClientState (
Id VARCHAR(255) PRIMARY KEY NOT NULL,
Content TEXT NOT NULL,
Version BIGINT NOT NULL)`);
// TODO: When https://github.com/rocicorp/replicache-sdk-js/issues/275 is
// fixed, can enable this.
//FOREIGN KEY (Id) REFERENCES Client (Id),
await executeStatement(`CREATE TABLE Shape (
Id VARCHAR(255) PRIMARY KEY NOT NULL,
Content TEXT NOT NULL,
Version BIGINT NOT NULL)`);
await executeStatement(`INSERT INTO Cookie (Version) VALUES (0)`);
// To calculate which rows have changed for return in the pull endpoint, we
// use a very simply global version number. This is easy to understand but
// does serialize writes.
//
// There are many different strategies for calculating changed rows and the
// details are very dependent on what you are building. Contact us if you'd
// like help: https://replicache.dev/#contact.
await executeStatement(`CREATE PROCEDURE NextVersion (OUT result BIGINT)
BEGIN
UPDATE Cookie SET Version = Version + 1;
SELECT Version INTO result FROM Cookie;
END`);
await executeStatement(`CREATE PROCEDURE PutShape (IN pId VARCHAR(255), IN pContent TEXT)
BEGIN
SET @version = 0;
CALL NextVersion(@version);
INSERT INTO Shape (Id, Content, Version) VALUES (pId, pContent, @version)
ON DUPLICATE KEY UPDATE Id = pId, Content = pContent, Version = @version;
END`);
await executeStatement(`CREATE PROCEDURE PutClientState (IN pId VARCHAR(255), IN pContent TEXT)
BEGIN
SET @version = 0;
CALL NextVersion(@version);
INSERT INTO ClientState (Id, Content, Version) VALUES (pId, pContent, @version)
ON DUPLICATE KEY UPDATE Id = pId, Content = pContent, Version = @version;
END`);
}
async function executeStatement(
sql: string,
parameters?: { [name: string]: Field }
): Promise<ExecuteStatementCommandOutput> {
return await executeStatementInDatabase(dbName, sql, parameters);
}
async function executeStatementInDatabase(
database: string | null,
sql: string,
parameters?: { [name: string]: Field },
transactionId?: string
): Promise<ExecuteStatementCommandOutput> {
const params =
parameters &&
Object.entries(parameters).map(([name, value]) => ({
name,
value,
}));
console.log("Executing", database, sql, params);
const command = new ExecuteStatementCommand({
database: database ?? "",
resourceArn,
secretArn,
sql,
parameters: params,
transactionId,
});
return await client.send(command);
}