-
Notifications
You must be signed in to change notification settings - Fork 3
/
clj_rocksdb.clj
365 lines (335 loc) · 9.51 KB
/
clj_rocksdb.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
(ns clj-rocksdb
(:refer-clojure :exclude [get sync])
(:require
[clojure.java.io :as io]
[byte-streams :as bs])
(:import
[java.io
Closeable
File]))
(import
'[org.rocksdb
WriteBatch
RocksIterator
Options
ReadOptions
WriteOptions
CompressionType
RocksDB])
;;;
(defn- closeable-seq
"Creates a seq which can be closed, given a latch which can be closed
and dereferenced to check whether it's already been closed."
[s close-fn]
(reify
Closeable
(close [this]
(close-fn))
clojure.lang.Sequential
clojure.lang.ISeq
clojure.lang.Seqable
clojure.lang.IPersistentCollection
(equiv [this x]
(loop [a this, b x]
(if (or (empty? a) (empty? b))
(and (empty? a) (empty? b))
(if (= (first x) (first b))
(recur (rest a) (rest b))
false))))
(empty [_]
[])
(count [_]
(count s))
(cons [_ a]
(cons a s))
(next [_]
(if-let [next-s (next s)]
(closeable-seq next-s close-fn)
(do
(close-fn)
nil)))
(more [this]
(let [rst (next this)]
(if (empty? rst)
'()
rst)))
(first [_]
(first s))
(seq [_]
(seq s))))
(defn- iterator-seq- [^RocksIterator iterator start end key-decoder key-encoder val-decoder]
(if start
(.seek ^RocksIterator iterator (bs/to-byte-array (key-encoder start)))
(.seekToFirst ^RocksIterator iterator))
(let [iter (fn iter [it]
(if-not (.isValid it) '()
(lazy-seq (cons [(.key it) (.value it)] (iter (doto it .next))))))
s (iter iterator)
s (if end
(let [end (bs/to-byte-array (key-encoder end))]
(take-while
#(not (pos? (bs/compare-bytes (first %) end)))
s))
s)]
(closeable-seq
(map
#(vector
(key-decoder (first %))
(val-decoder (second %)))
s)
(reify
Object
(finalize [_] (.close iterator))
clojure.lang.IFn
(invoke [_] (.close iterator))))))
;;;
(defprotocol IRocksDB
(^:private ^RocksDB db- [_])
(^:private batch- [_] [_ options])
(^:private iterator- [_] [_ start end])
(^:private get- [_ k])
(^:private put- [_ k v options])
(^:private del- [_ k options])
(^:private snapshot- [_]))
(defrecord Snapshot
[db
key-decoder
key-encoder
val-decoder
^ReadOptions read-options]
IRocksDB
(snapshot- [this] this)
(db- [_] (db- db))
(get- [_ k]
(val-decoder (.get (db- db) read-options (bs/to-byte-array (key-encoder k)))))
(iterator- [_ start end]
(iterator-seq-
(.newIterator (db- db) read-options)
start
end
key-decoder
key-encoder
val-decoder))
Closeable
(close [_]
(-> read-options .snapshot .close))
(finalize [this] (.close this)))
(defrecord Batch
[^RocksDB db
^WriteBatch batch
key-encoder
val-encoder
^WriteOptions options]
IRocksDB
(db- [_] db)
(batch- [this _] this)
(put- [_ k v _]
(.put batch
(bs/to-byte-array (key-encoder k))
(bs/to-byte-array (val-encoder v))))
(del- [_ k _]
(.delete batch (bs/to-byte-array (key-encoder k))))
Closeable
(close [_]
(.write db (or options (WriteOptions.)) batch)
(.close batch)))
(defrecord DB
[^RocksDB db
key-decoder
key-encoder
val-decoder
val-encoder]
Closeable
(close [_] (.close db))
IRocksDB
(db- [_]
db)
(get- [_ k]
(let [k (bs/to-byte-array (key-encoder k))]
(some-> (.get db k)
val-decoder)))
(put- [_ k v options]
(let [k (bs/to-byte-array (key-encoder k))
v (bs/to-byte-array (val-encoder v))]
(if options
(.put db k v options)
(.put db k v))))
(del- [_ k options]
(let [k (bs/to-byte-array (key-encoder k))]
(if options
(.delete db k options)
(.delete db k))))
(snapshot- [this]
(->Snapshot
this
key-decoder
key-encoder
val-decoder
(doto (ReadOptions.)
(.setSnapshot (.getSnapshot db)))))
(batch- [this options]
(->Batch
db
(WriteBatch.)
key-encoder
val-encoder
options))
(iterator- [_ start end]
(iterator-seq-
(.newIterator db)
start
end
key-decoder
key-encoder
val-decoder)))
(def ^:private option-setters
{:create-if-missing? #(.setCreateIfMissing ^Options %1 %2)
:error-if-exists? #(.setErrorIfExists ^Options %1 %2)
:write-buffer-size #(.setDbWriteBufferSize ^Options %1 %2)
:block-size #(.setArenablockSize ^Options %1 %2)
:max-open-files #(.setMaxOpenFiles ^Options %1 %2)
:cache-size #(.setBlockCacheSize ^Options %1 %2)
:comparator #(.setComparator ^Options %1 %2)
:paranoid-checks? #(.setParanoidChecks ^Options %1 %2)
:compress? #(.setCompressionType ^Options %1 (if % CompressionType/SNAPPY_COMPRESSION CompressionType/NO_COMPRESSION))
:logger #(.setLogger ^Options %1 %2)})
(defn create-db
"Creates a closeable database object, which takes a directory and zero or more options.
The key and val encoder/decoders are functions for transforming to and from byte-arrays."
[directory
{:keys [key-decoder
key-encoder
val-decoder
val-encoder
create-if-missing?
error-if-exists?
write-buffer-size
block-size
max-open-files
cache-size
comparator
compress?
paranoid-checks?
block-restart-interval
logger]
:or {key-decoder identity
key-encoder identity
val-decoder identity
val-encoder identity
compress? true
cache-size 32
block-size (* 16 1024)
write-buffer-size (* 32 1024 1024)
create-if-missing? true
error-if-exists? false}
:as options}]
(->DB
(RocksDB/open
(let [opts (Options.)]
(doseq [[k v] options]
(when (contains? option-setters k)
((option-setters k) opts v)))
opts
directory))
key-decoder
key-encoder
val-decoder
val-encoder))
(defn destroy-db
"Destroys the database at the specified `directory`."
[directory]
(RocksDB/destroyDB
directory
(Options.)))
;;;
(defn get
"Returns the value of `key` for the given database or snapshot. If the key doesn't exist, returns
`default-value` or nil."
([db key]
(get db key nil))
([db key default-value]
(let [v (get- db key)]
(if (nil? v)
default-value
v))))
(defn snapshot
"Returns a snapshot of the database that can be used with `get` and `iterator`. This implements
java.io.Closeable, and can leak space in the database if not closed."
[db]
(snapshot- db))
(defn iterator
"Returns a closeable sequence of map entries (accessed with `key` and `val`) that is the inclusive
range from `start `to `end`. If exhausted, the sequence is automatically closed."
([db]
(iterator db nil nil))
([db start]
(iterator db start nil))
([db start end]
(iterator- db start end)))
(defn put
"Puts one or more key/value pairs into the given `db`."
([db])
([db key val]
(put- db key val nil))
([db key val & key-vals]
(with-open [^Batch batch (batch- db nil)]
(put- batch key val nil)
(doseq [[k v] (partition 2 key-vals)]
(put- batch k v nil)))))
(defn delete
"Deletes one or more keys in the given `db`."
([db])
([db key]
(del- db key nil))
([db key & keys]
(with-open [^Batch batch (batch- db nil)]
(del- batch key nil)
(doseq [k keys]
(del- batch k nil)))))
(defn sync
"Forces the database to fsync."
[db]
(with-open [^Batch batch (batch- db (doto (WriteOptions.) (.sync true)))]))
(defn stats
"Returns statistics for the database."
[db property]
(.getProperty (db- db) "rocksdb.stats"))
(defn bounds
"Returns a tuple of the lower and upper keys in the database or snapshot."
[db]
(let [key-decoder (:key-decoder db)]
(with-open [^RocksIterator iterator (condp instance? db
DB (.newIterator (db- db))
Snapshot (.newIterator (db- db) (:read-options db)))]
(doto iterator .seekToFirst)
(when (.isValid iterator)
[(-> (doto iterator .seekToFirst) .key key-decoder)
(-> (doto iterator .seekToLast) .key key-decoder)]))))
(defn compact
"Forces compaction of database over the given range. If `start` or `end` are nil, they default to
the full range of the database."
([db]
(compact db nil nil))
([db start]
(compact db start nil))
([db start end]
(let [encoder (:key-encoder db)
[start' end'] (bounds db)
start (or start start')
end (or end end')]
(when (and start end)
(.compactRange (db- db)
(bs/to-byte-array (encoder start))
(bs/to-byte-array (encoder end)))))))
(defn batch
"Batch a collection of put and/or delete operations into the supplied `db`.
Takes a map of the form `{:put [key1 value1 key2 value2] :delete [key3 key4]}`.
If `:put` key is provided, it must contain an even-length sequence of alternating keys and values."
([db])
([db {puts :put deletes :delete}]
(assert (even? (count puts)) ":put option requires even number of keys and values.")
(with-open [^Batch batch (batch- db nil)]
(doseq [[k v] (partition 2 puts)]
(put- batch k v nil))
(doseq [k deletes]
(del- batch k nil)))))