-
Notifications
You must be signed in to change notification settings - Fork 1
/
pgdown.js
213 lines (174 loc) · 5.39 KB
/
pgdown.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
'use strict'
const inherits = require('inherits')
const AbstractLevelDOWN = require('abstract-leveldown/abstract-leveldown')
const util = require('./util')
const PgIterator = require('./pg-iterator')
const PgChainedBatch = require('./pg-chained-batch')
// export the constructor; PgDOWN is a function declaration further down in
// this file, so it is hoisted and already usable at this point
module.exports = PgDOWN
// make PgDOWN inherit from AbstractLevelDOWN (via the `inherits` module);
// this must run before any method is attached to PgDOWN.prototype below
inherits(PgDOWN, AbstractLevelDOWN)
/**
 * PgDOWN constructor. May be invoked with or without `new`.
 *
 * @param {string} location - handed to AbstractLevelDOWN and to
 *   util.parseLocation, whose result is kept on `this._config`
 * @returns {PgDOWN|undefined} a new instance when called as a plain function
 */
function PgDOWN (location) {
  const calledAsConstructor = this instanceof PgDOWN
  if (!calledAsConstructor) {
    // factory-style call: re-enter through `new`
    return new PgDOWN(location)
  }

  AbstractLevelDOWN.call(this, location)
  this._config = util.parseLocation(location)
}
const proto = PgDOWN.prototype
// NB: keys should *always* be stored using 'bytea'
// default postgres column types for the key and value columns
Object.assign(proto, {
  _keyColumnType: 'bytea',
  _valueColumnType: 'bytea'
})
/**
 * Serialize a key according to the configured key column type.
 *
 * @param {*} key - key to serialize
 * @returns {*} whatever util.serialize produces for this column type
 */
proto._serializeKey = function (key) {
  const columnType = this._keyColumnType
  return util.serialize(columnType, key)
}
/**
 * Serialize a value according to the configured value column type.
 *
 * @param {*} value - value to serialize
 * @returns {*} whatever util.serialize produces for this column type
 */
proto._serializeValue = function (value) {
  const columnType = this._valueColumnType
  return util.serialize(columnType, value)
}
/**
 * Deserialize a stored key according to the configured key column type.
 *
 * @param {*} key - raw key as read from the database
 * @param {*} asBuffer - forwarded unchanged to util.deserialize
 * @returns {*} whatever util.deserialize produces
 */
proto._deserializeKey = function (key, asBuffer) {
  const columnType = this._keyColumnType
  return util.deserialize(columnType, key, asBuffer)
}
/**
 * Deserialize a stored value according to the configured value column type.
 *
 * @param {*} value - raw value as read from the database
 * @param {*} asBuffer - forwarded unchanged to util.deserialize
 * @returns {*} whatever util.deserialize produces
 */
proto._deserializeValue = function (value, asBuffer) {
  const columnType = this._valueColumnType
  return util.deserialize(columnType, value, asBuffer)
}
// TODO: memoized getters
/**
 * Build the SELECT statement that fetches the value stored under a key.
 * The key itself is bound by the caller as parameter $1.
 *
 * @param {*} key - unused; kept for signature compatibility
 * @returns {string} SQL text
 */
proto._sql_get = function (key) {
  const relation = this._config._relation
  return `\nSELECT value FROM ${relation} WHERE (key)=$1\n`
}
/**
 * Build the DELETE statement that removes the row for a key.
 * The key itself is bound by the caller as parameter $1.
 *
 * @param {*} key - unused; kept for signature compatibility
 * @returns {string} SQL text
 */
proto._sql_del = function (key) {
  const relation = this._config._relation
  return `\nDELETE FROM ${relation} WHERE (key)=$1\n`
}
/**
 * Build the INSERT statement for a key/value pair ($1 = key, $2 = value).
 *
 * @returns {string} SQL text
 */
proto._sql_insert = function () {
  const relation = this._config._relation
  return `\nINSERT INTO ${relation} (key, value) VALUES($1,$2)\n`
}
/**
 * Build the UPDATE statement that overwrites the value of an existing key
 * ($1 = key, $2 = value).
 *
 * @returns {string} SQL text
 */
proto._sql_update = function () {
  const relation = this._config._relation
  return `\nUPDATE ${relation} SET value=($2) WHERE key=($1)\n`
}
/**
 * Build the upsert statement used for puts: the plain INSERT from
 * `_sql_insert()` extended with an ON CONFLICT clause that overwrites the
 * existing value.
 *
 * @param {*} key - unused; kept for signature compatibility
 * @param {*} value - unused; kept for signature compatibility
 * @returns {string} SQL text
 */
proto._sql_put = function (key, value) {
  const insertSql = this._sql_insert()
  const onConflict = ' ON CONFLICT (key) DO UPDATE SET value=excluded.value'
  return insertSql + onConflict
}
/**
 * Open the store: create a pool from the parsed config, make sure the schema
 * exists, then check for the table and create it when requested.
 *
 * On any failure the pool is destroyed and `this._pool` is reset to null
 * before the error is reported.
 *
 * @param {object} options - open options
 * @param {boolean} [options.createIfMissing] - create the table when absent;
 *   when falsy a missing table is an error
 * @param {boolean} [options.errorIfExists] - treat an existing table as an error
 * @param {function} cb - called with no arguments on success, or with the
 *   error that caused the open to fail
 */
proto._open = function (options, cb) {
  const config = this._config
  const pool = this._pool = util.createPool(config)
  // TODO: make pool init async, do create schema if not exists dance just once

  const createIfMissing = options.createIfMissing
  const errorIfExists = options.errorIfExists
  const IF_NOT_EXISTS = errorIfExists ? '' : 'IF NOT EXISTS'
  const schema = config._schema
  const table = config._table
  const relation = config._relation

  // NB: the helpers below are declared before the first query is issued so
  // they are initialized even if a query callback were to fire synchronously

  // release the pool, then report the original failure
  const fail = (err) => {
    this._pool = null
    // an error while destroying the pool is deliberately not reported: the
    // failure that triggered the teardown is the one the caller needs
    util.destroyPool(pool, () => {
      cb(err)
    })
  }

  // create the key/value table
  const create = () => {
    // TODO: use separate column names for different value types?
    const createSql = [
      '',
      `CREATE TABLE ${IF_NOT_EXISTS} ${relation} (`,
      `key ${this._keyColumnType} PRIMARY KEY,`,
      `value ${this._valueColumnType}`,
      ')',
      ''
    ].join('\n')

    pool.query(createSql, (err) => {
      if (err) {
        fail(err)
      } else {
        cb()
      }
    })
  }

  // look the table up in the catalog and apply createIfMissing / errorIfExists
  const info = () => {
    const lookupSql = '\nSELECT tablename FROM pg_catalog.pg_tables WHERE schemaname=$1 AND tablename=$2\n'

    pool.query(lookupSql, [schema, table], (err, result) => {
      // a failed lookup must surface as-is; without this guard it would be
      // misreported as "table does not exist" when createIfMissing is off
      if (err) {
        fail(err)
        return
      }

      const exists = Boolean(result && result.rowCount === 1)
      if (errorIfExists && exists) {
        fail(new Error('table already exists: ' + table))
      } else if (!createIfMissing && !exists) {
        fail(new Error('table does not exist: ' + table))
      } else if (createIfMissing) {
        create()
      } else {
        cb()
      }
    })
  }

  // always create pgdown schema
  pool.query(`\nCREATE SCHEMA IF NOT EXISTS ${schema}\n`, (err) => {
    if (err) {
      fail(err)
    } else {
      info()
    }
  })
}
/**
 * Close the store by destroying the pool, if one is open.
 *
 * @param {function} cb - passed to util.destroyPool, or scheduled via
 *   process.nextTick when there is no pool to destroy
 */
proto._close = function (cb) {
  const activePool = this._pool

  if (!activePool) {
    // nothing to tear down, but still complete asynchronously
    process.nextTick(cb)
    return
  }

  // clear the reference first so the instance never points at a dying pool
  this._pool = null
  util.destroyPool(activePool, cb)
}
/**
 * Fetch the value stored under `key`.
 *
 * @param {*} key - bound as $1 of the statement from `_sql_get()`
 * @param {object} options - `options.asBuffer` is forwarded to `_deserializeValue`
 * @param {function} cb - called with (err) or (null, value); a missing key
 *   yields a util.NotFoundError
 */
proto._get = function (key, options, cb) {
  const sql = this._sql_get()

  const onResult = (err, result) => {
    if (err) {
      cb(err)
      return
    }

    const matches = result.rowCount
    if (matches === 1) {
      const raw = result.rows[0].value
      cb(null, this._deserializeValue(raw, options.asBuffer))
      return
    }

    if (matches === 0) {
      cb(new util.NotFoundError('not found: ' + key))
      return
    }

    // any other row count is not expected for a single-key lookup
    cb(new Error('unexpected result for key: ' + key))
  }

  this._pool.query(sql, [key], onResult)
}
/**
 * Store a single key/value pair by running a one-operation batch.
 *
 * @param {*} key - key to write
 * @param {*} value - value to write
 * @param {object} options - forwarded to `_batch`
 * @param {function} cb - called with the batch error, or null on success
 */
proto._put = function (key, value, options, cb) {
  const putOp = { type: 'put', key, value }
  // normalize a falsy error to an explicit null
  const done = (err) => cb(err || null)
  this._batch([putOp], options, done)
}
/**
 * Delete a single key by running a one-operation batch.
 *
 * @param {*} key - key to delete
 * @param {object} options - forwarded to `_batch`
 * @param {function} cb - called with the batch error, or null on success
 */
proto._del = function (key, options, cb) {
  const delOp = { type: 'del', key }
  // normalize a falsy error to an explicit null
  const done = (err) => cb(err || null)
  this._batch([delOp], options, done)
}
/**
 * Apply a list of put/del operations through one transaction object obtained
 * from util.createTransaction, then commit it.
 *
 * @param {Array<object>} ops - operations; `type` is 'put' (uses `key` and
 *   `value`) or 'del' (uses `key`); other types are skipped
 * @param {object} options - currently unused
 * @param {function} cb - handed to util.createTransaction
 */
proto._batch = function (ops, options, cb) {
  const tx = util.createTransaction(this._pool, cb)

  ops.forEach((op) => {
    // TODO: merge op.options with batch options?
    switch (op.type) {
      case 'put':
        tx.query(this._sql_put(), [op.key, op.value])
        break
      case 'del':
        tx.query(this._sql_del(), [op.key])
        break
      default:
        // unknown operation types are ignored
        break
    }
  })

  tx.commit()
}
/**
 * Create a chained batch bound to this store.
 *
 * @returns {PgChainedBatch} new chained batch instance
 */
proto._chainedBatch = function () {
  const chainedBatch = new PgChainedBatch(this)
  return chainedBatch
}
/**
 * Create an iterator over this store.
 *
 * @param {object} options - passed through to the PgIterator constructor
 * @returns {PgIterator} new iterator instance
 */
proto._iterator = function (options) {
  const iterator = new PgIterator(this, options)
  return iterator
}
// NB: represents exact compressed size?
/**
 * Sum `pg_column_size` over the rows selected by a start/end range.
 *
 * The range clauses come from PgIterator._parseOptions; only the head of the
 * statement is replaced with the size aggregate.
 *
 * @param {*} start - range start, passed to PgIterator._parseOptions
 * @param {*} end - range end, passed to PgIterator._parseOptions
 * @param {function} cb - called with (err) or (null, size)
 */
proto._approximateSize = function (start, end, cb) {
  // generate standard iterator sql and replace head clause
  const context = PgIterator._parseOptions(this, { start, end })
  const head = `SELECT sum(pg_column_size(tbl)) as size FROM ${this._config._relation} as tbl`
  context.clauses.unshift(head)
  const text = context.clauses.join(' ')

  const onResult = (err, result) => {
    if (err) return cb(err)

    const rowCount = result.rowCount
    // only touch the first row when there is at least one
    const size = rowCount && Number(result.rows[0].size)
    const usable = rowCount === 1 && !isNaN(size)

    if (usable) {
      cb(null, size)
    } else {
      cb(new Error('failed to calculate approximate size'))
    }
  }

  this._pool.query(text, context.values, onResult)
}
// static `destroy` entry point: a direct alias of util.dropTable, so its
// arguments and behavior are whatever ./util defines for dropTable
PgDOWN.destroy = util.dropTable