-
Notifications
You must be signed in to change notification settings - Fork 2k
/
postgres.ts
197 lines (165 loc) Β· 5.77 KB
/
postgres.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import pg, { PoolConfig, Pool } from "pg";
import {
ListKeyOptions,
RecordManagerInterface,
UpdateOptions,
} from "./base.js";
export type PostgresRecordManagerOptions = {
postgresConnectionOptions: PoolConfig;
tableName?: string;
schema?: string;
};
export class PostgresRecordManager implements RecordManagerInterface {
lc_namespace = ["langchain", "recordmanagers", "postgres"];
pool: Pool;
tableName: string;
namespace: string;
finalTableName: string;
constructor(namespace: string, config: PostgresRecordManagerOptions) {
const { postgresConnectionOptions, tableName } = config;
this.namespace = namespace;
this.pool = new pg.Pool(postgresConnectionOptions);
this.tableName = tableName || "upsertion_records";
this.finalTableName = config.schema
? `"${config.schema}"."${tableName}"`
: `"${tableName}"`;
}
async createSchema(): Promise<void> {
try {
await this.pool.query(`
CREATE TABLE IF NOT EXISTS ${this.finalTableName} (
uuid UUID PRIMARY KEY DEFAULT gen_random_uuid(),
key TEXT NOT NULL,
namespace TEXT NOT NULL,
updated_at Double PRECISION NOT NULL,
group_id TEXT,
UNIQUE (key, namespace)
);
CREATE INDEX IF NOT EXISTS updated_at_index ON ${this.finalTableName} (updated_at);
CREATE INDEX IF NOT EXISTS key_index ON ${this.finalTableName} (key);
CREATE INDEX IF NOT EXISTS namespace_index ON ${this.finalTableName} (namespace);
CREATE INDEX IF NOT EXISTS group_id_index ON ${this.finalTableName} (group_id);`);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} catch (e: any) {
// This error indicates that the table already exists
// Due to asynchronous nature of the code, it is possible that
// the table is created between the time we check if it exists
// and the time we try to create it. It can be safely ignored.
if ("code" in e && e.code === "23505") {
return;
}
throw e;
}
}
async getTime(): Promise<number> {
const res = await this.pool.query(
"SELECT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)"
);
return Number.parseFloat(res.rows[0].extract);
}
/**
* Generates the SQL placeholders for a specific row at the provided index.
*
* @param index - The index of the row for which placeholders need to be generated.
* @param numOfColumns - The number of columns we are inserting data into.
* @returns The SQL placeholders for the row values.
*/
private generatePlaceholderForRowAt(
index: number,
numOfColumns: number
): string {
const placeholders = [];
for (let i = 0; i < numOfColumns; i += 1) {
placeholders.push(`$${index * numOfColumns + i + 1}`);
}
return `(${placeholders.join(", ")})`;
}
async update(keys: string[], updateOptions?: UpdateOptions): Promise<void> {
if (keys.length === 0) {
return;
}
const updatedAt = await this.getTime();
const { timeAtLeast, groupIds: _groupIds } = updateOptions ?? {};
if (timeAtLeast && updatedAt < timeAtLeast) {
throw new Error(
`Time sync issue with database ${updatedAt} < ${timeAtLeast}`
);
}
const groupIds = _groupIds ?? keys.map(() => null);
if (groupIds.length !== keys.length) {
throw new Error(
`Number of keys (${keys.length}) does not match number of group_ids ${groupIds.length})`
);
}
const recordsToUpsert = keys.map((key, i) => [
key,
this.namespace,
updatedAt,
groupIds[i],
]);
const valuesPlaceholders = recordsToUpsert
.map((_, j) =>
this.generatePlaceholderForRowAt(j, recordsToUpsert[0].length)
)
.join(", ");
const query = `INSERT INTO ${this.finalTableName} (key, namespace, updated_at, group_id) VALUES ${valuesPlaceholders} ON CONFLICT (key, namespace) DO UPDATE SET updated_at = EXCLUDED.updated_at;`;
await this.pool.query(query, recordsToUpsert.flat());
}
async exists(keys: string[]): Promise<boolean[]> {
if (keys.length === 0) {
return [];
}
const startIndex = 2;
const arrayPlaceholders = keys
.map((_, i) => `$${i + startIndex}`)
.join(", ");
const query = `
SELECT k, (key is not null) ex from unnest(ARRAY[${arrayPlaceholders}]) k left join ${this.finalTableName} on k=key and namespace = $1;
`;
const res = await this.pool.query(query, [this.namespace, ...keys.flat()]);
return res.rows.map((row: { ex: boolean }) => row.ex);
}
async listKeys(options?: ListKeyOptions): Promise<string[]> {
const { before, after, limit, groupIds } = options ?? {};
let query = `SELECT key FROM ${this.finalTableName} WHERE namespace = $1`;
const values: (string | number | (string | null)[])[] = [this.namespace];
let index = 2;
if (before) {
values.push(before);
query += ` AND updated_at < $${index}`;
index += 1;
}
if (after) {
values.push(after);
query += ` AND updated_at > $${index}`;
index += 1;
}
if (limit) {
values.push(limit);
query += ` LIMIT $${index}`;
index += 1;
}
if (groupIds) {
values.push(groupIds);
query += ` AND group_id = ANY($${index})`;
index += 1;
}
query += ";";
const res = await this.pool.query(query, values);
return res.rows.map((row: { key: string }) => row.key);
}
async deleteKeys(keys: string[]): Promise<void> {
if (keys.length === 0) {
return;
}
const query = `DELETE FROM ${this.finalTableName} WHERE namespace = $1 AND key = ANY($2);`;
await this.pool.query(query, [this.namespace, keys]);
}
/**
* Terminates the connection pool.
* @returns {Promise<void>}
*/
async end(): Promise<void> {
await this.pool.end();
}
}