-
Notifications
You must be signed in to change notification settings - Fork 25
/
defaults.cljc
477 lines (453 loc) · 21.5 KB
/
defaults.cljc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
(ns konserve.impl.defaults
"Default implementation of the high level protocol given a binary backing implementation as defined in the storage-layout namespace."
(:require
[clojure.core.async :refer [<! timeout]]
[clojure.string :refer [ends-with?]]
[hasch.core :refer [uuid]]
[konserve.serializers :refer [key->serializer]]
[konserve.compressor :refer [get-compressor]]
[konserve.encryptor :refer [get-encryptor]]
[konserve.protocols :refer [PEDNKeyValueStore -exists?
PBinaryKeyValueStore
-serialize -deserialize
PKeyIterable]]
[konserve.impl.storage-layout :refer [-atomic-move -create-store
-copy -create-blob -delete-blob -blob-exists?
-keys -sync-store
-migratable -migrate -handle-foreign-key
-close -get-lock -sync
-read-header -read-meta -read-value -read-binary
-write-header -write-meta -write-value -write-binary
PBackingLock -release
default-version
parse-header create-header header-size]]
[konserve.utils #?@(:clj [:refer [async+sync *default-sync-translation*]]
:cljs [:refer [*default-sync-translation*] :refer-macros [async+sync]])]
[superv.async :refer [go-try- <?-]]
[taoensso.timbre :refer [trace]])
#?(:clj
(:import
[java.io ByteArrayOutputStream ByteArrayInputStream])))
;; Allow `nil` to stand in for "no lock held": releasing it does nothing.
;; The sync variant yields nil directly; the async variant yields a
;; `go-try-` channel so callers can take from the result either way.
(extend-protocol PBackingLock
  nil
  (-release [_this env]
    (when-not (:sync? env)
      (go-try- nil))))
(defn key->store-key
  "Builds the blob name used for `key`: the string form of `(uuid key)`
  with the \".ksv\" extension appended."
  [key]
  (-> key
      uuid
      (str ".ksv")))
(defn store-key->uuid-key
  "Strips the konserve blob suffix from `store-key` and returns the
  remaining prefix (the uuid part of the blob name).

  Recognised suffixes are \".ksv\", \".ksv.new\" and \".ksv.backup\".
  Throws an `ex-info` carrying `{:key store-key}` for any other name.

  Uses `count` and `clojure.string/ends-with?` rather than the Java-only
  `(.length s)` method call, so the function also works when this .cljc
  file is compiled as ClojureScript (where `length` is a property, not a
  method)."
  [^String store-key]
  (if-let [suffix (some (fn [candidate]
                          (when (ends-with? store-key candidate)
                            candidate))
                        [".ksv" ".ksv.new" ".ksv.backup"])]
    (subs store-key 0 (- (count store-key) (count suffix)))
    (throw (ex-info (str "Invalid konserve store key: " store-key)
                    {:key store-key}))))
;; ClojureScript only: let `count` work on raw byte arrays, so code in this
;; namespace can call `(count meta-arr)` on serialized output on both
;; platforms.
#?(:cljs
   (extend-type js/Uint8Array
     ICounted
     (-count [arr]
       (alength arr))))
(defn update-blob
  "Writes the new state of one key into the backing store.

  The new metadata is `(up-fn-meta old-meta)`. For `:write-edn` the new
  value is `up-fn` applied to `old-value` (or to the nested path given by
  the rest of `key-vec`); for `:write-binary` the payload is taken from
  `input`. The blob is written in this order: header, serialized metadata,
  then the value / binary payload.

  With `(:in-place? config)` the existing blob is first copied to
  `<store-key>.backup` and then overwritten directly. Otherwise the data
  goes to `<store-key>.new`, which is atomically moved onto `store-key`
  after the blob has been closed. With `(:sync-blob? config)` both the blob
  and the store are synced before closing.

  Returns `[old-value new-value]` for `:write-edn`, `true` otherwise."
  [backing store-key serializer write-handlers
   {:keys [key-vec compressor encryptor up-fn up-fn-meta
           config operation input sync? version] :as env} [old-meta old-value]]
  (async+sync
   sync? *default-sync-translation*
   (go-try-
    (let [[key & rkey] key-vec
          store-key (or store-key (key->store-key key))
          in-place? (:in-place? config)
          edn-write? (= operation :write-edn)
          ;; Serializes `data` through the encryptor/compressor/serializer
          ;; stack and returns the resulting byte array.
          ->bytes #?(:cljs
                     (fn [data]
                       (-serialize ((encryptor (:encryptor config)) (compressor serializer))
                                   nil write-handlers data))
                     :clj
                     (fn [data]
                       (with-open [out (ByteArrayOutputStream.)]
                         (-serialize ((encryptor (:encryptor config)) (compressor serializer))
                                     out write-handlers data)
                         (.toByteArray out))))
          new-meta (up-fn-meta old-meta)
          new-value (when edn-write?
                      (if (seq rkey)
                        (update-in old-value rkey up-fn)
                        (up-fn old-value)))
          target-key (if in-place?
                       store-key
                       (str store-key ".new"))
          backup-key (str store-key ".backup")
          ;; in-place writes clobber the live blob, so keep a copy first
          _ (when in-place?
              (trace "backing up to blob: " backup-key " for key " key)
              (<?- (-copy backing store-key backup-key env)))
          meta-bytes (->bytes new-meta)
          meta-size (count meta-bytes)
          header (create-header version
                                serializer compressor encryptor meta-size)
          new-blob (<?- (-create-blob backing target-key env))]
      (try
        (<?- (-write-header new-blob header env))
        (<?- (-write-meta new-blob meta-bytes env))
        (if (= operation :write-binary)
          (<?- (-write-binary new-blob meta-size input env))
          (<?- (-write-value new-blob (->bytes new-value) meta-size env)))
        (when (:sync-blob? config)
          (trace "syncing for " key)
          (<?- (-sync new-blob env))
          (<?- (-sync-store backing env)))
        ;; close before the move; the `finally` below closes again so the
        ;; blob is also closed when one of the steps above throws
        (<?- (-close new-blob env))
        (when-not in-place?
          (trace "moving blob: " key)
          (<?- (-atomic-move backing target-key store-key env)))
        (if edn-write?
          [old-value new-value]
          true)
        (finally
          (<?- (-close new-blob env))))))))
(defn read-header
  "Reads the raw header from `ac` via `-read-header` and returns the result
  of `parse-header` on it, resolved against `serializers`. Runs synchronously
  or asynchronously depending on `(:sync? env)`."
  [ac serializers env]
  (async+sync
   (:sync? env) *default-sync-translation*
   (go-try-
    (parse-header (<?- (-read-header ac env)) serializers))))
(defn read-blob
  "Read meta, edn or binary from blob.

  The header is read first to determine the serializer, compressor,
  encryptor, meta size and header size. What is returned depends on
  `(:operation env)`:

    :read-meta    -> the deserialized metadata
    :read-edn     -> the deserialized value
    :write-binary -> `[meta nil]`
    :write-edn    -> `[meta value]`
    :read-binary  -> the result of `-read-binary` called with `locked-cb`"
  [blob read-handlers serializers {:keys [sync? operation locked-cb config] :as env}]
  (async+sync
   sync? *default-sync-translation*
   (go-try-
    (let [[_ serializer compressor encryptor meta-size header-size]
          (<?- (read-header blob serializers env))
          env (assoc env :header-size header-size)
          deserializer (compressor ((encryptor (:encryptor config)) serializer))
          ;; Turns the raw result of a `-read-meta` / `-read-value` call into
          ;; data. On the JVM the bytes are wrapped in a ByteArrayInputStream
          ;; which is closed once deserialization has returned.
          decode #?(:cljs
                    (fn [raw]
                      (-deserialize deserializer read-handlers raw))
                    :clj
                    (fn [raw]
                      (let [in (ByteArrayInputStream. raw)
                            result (-deserialize deserializer read-handlers in)]
                        (.close in)
                        result)))]
      (case operation
        :read-meta
        (decode (<?- (-read-meta blob meta-size env)))

        :read-edn
        (decode (<?- (-read-value blob meta-size env)))

        :write-binary
        [(decode (<?- (-read-meta blob meta-size env))) nil]

        :write-edn
        (let [meta (decode (<?- (-read-meta blob meta-size env)))
              value (decode (<?- (-read-value blob meta-size env)))]
          [meta value])

        :read-binary
        (<?- (-read-binary blob meta-size locked-cb env)))))))
(defn delete-blob
  "Deletes the blob that belongs to the first element of `(:key-vec env)`.

  Returns `true` when a blob existed and was deleted, `false` when there was
  no blob for the key. A failure during deletion is rethrown as an `ex-info`
  with message \"Could not delete key.\" carrying the key, `(:base env)` and
  the original exception."
  [backing env]
  (async+sync
   (:sync? env) *default-sync-translation*
   (go-try-
    (let [key (first (:key-vec env))
          store-key (key->store-key key)]
      (if (<?- (-blob-exists? backing store-key env))
        (try
          (<?- (-delete-blob backing store-key env))
          true
          (catch #?(:clj Exception :cljs js/Error) cause
            (throw (ex-info "Could not delete key."
                            {:key key
                             :base (:base env)
                             :exception cause}))))
        false)))))
;; Retry budget for `get-lock`: it gives up once its attempt counter
;; exceeds this value.
(def ^:const max-lock-attempts 100)

(defn get-lock
  "Tries to obtain a lock on `this` via `-get-lock`, retrying on failure.

  An attempt fails when `-get-lock` throws or returns nil. After a failed
  attempt the function pauses for `(rand-int 20)` milliseconds (except in
  synchronous ClojureScript, where no blocking sleep is available) and tries
  again. Once the attempt counter exceeds `max-lock-attempts` an `ex-info`
  of `:type :file-lock-acquisition-error` is thrown, carrying the last
  caught error and `store-key`.

  Returns the acquired lock."
  [this store-key env]
  (async+sync
   (:sync? env)
   *default-sync-translation*
   (go-try-
    (loop [attempt 0]
      (let [[lock err] (try
                         [(<?- (-get-lock this env)) nil]
                         (catch #?(:clj Exception :cljs js/Error) ex
                           (trace "Failed to acquire lock: " ex)
                           [nil ex]))]
        (if (some? lock)
          lock
          (do
            ;; random back-off before the next attempt
            #?(:cljs
               (when-not (:sync? env)
                 ;; cannot blocking sleep in sync nodejs w/o package
                 (<! (timeout (rand-int 20))))
               :clj
               (if (:sync? env)
                 (Thread/sleep (long (rand-int 20)))
                 (<! (timeout (rand-int 20)))))
            (if (> attempt max-lock-attempts)
              (throw (ex-info (str "Failed to acquire lock after " attempt " iterations.")
                              {:type :file-lock-acquisition-error
                               :error err
                               :store-key store-key}))
              (recur (inc attempt))))))))))
(defn io-operation
  "Read/Write blob. For better understanding use the flow-chart of konserve.

  Flow, for the first element of `key-vec`:

  1. If no blob exists for the key but the backing reports a migration key
     via `-migratable`, the result of `-migrate` is returned.
  2. Otherwise, if the blob exists or the operation is a write
     (`:write-edn` / `:write-binary`), the blob is opened, optionally
     locked (`(:lock-blob? config)`), and
       - the old content is read with `read-blob` unless the blob is new or
         `overwrite?` is set (then `[nil nil]` is used),
       - for writes `update-blob` is called with the old content and its
         result returned; for reads the old content itself is returned.
  3. Otherwise (read of a missing key) nil is returned.

  Resource handling: the blob is closed in an outer `finally` that also
  covers lock acquisition, and the lock is released in an inner `finally`.
  This way the blob handle is closed even when acquiring or releasing the
  lock throws."
  [{:keys [backing]} serializers read-handlers write-handlers
   {:keys [key-vec operation default-serializer sync? overwrite? config] :as env}]
  (async+sync
   sync? *default-sync-translation*
   (go-try-
    (let [key (first key-vec)
          store-key (key->store-key key)
          env (assoc env :store-key store-key :header-size header-size)
          serializer (get serializers default-serializer)
          store-key-exists? (<?- (-blob-exists? backing store-key env))
          migration-key (<?- (-migratable backing key store-key env))
          write? (or (= :write-edn operation) (= :write-binary operation))]
      (if (and (not store-key-exists?) migration-key)
        (<?- (-migrate backing migration-key key-vec serializer read-handlers write-handlers env))
        (if (or store-key-exists? write?)
          (let [blob (<?- (-create-blob backing store-key env))]
            (try
              ;; Lock acquisition sits inside the outer try so that a
              ;; failure to lock still closes the blob.
              (let [lock (when (:lock-blob? config)
                           (trace "Acquiring blob lock for: " key (str blob))
                           (<?- (get-lock blob key env)))]
                (try
                  (let [old (if (and store-key-exists? (not overwrite?))
                              (<?- (read-blob blob read-handlers serializers env))
                              [nil nil])]
                    (if write?
                      (<?- (update-blob backing store-key serializer write-handlers env old))
                      old))
                  (finally
                    (when (:lock-blob? config)
                      (trace "Releasing lock for " key (str blob))
                      (<?- (-release lock env))))))
              (finally
                (<?- (-close blob env)))))
          nil))))))
(defn list-keys
  "Return all keys in the store.

  Walks every name returned by `-keys` on the backing:

  - names ending in \".new\" or \".backup\" are skipped,
  - names ending in \".ksv\" are opened and read with `read-blob` (the
    operation in `env` decides what is read); the result is added to the
    returned set. Read errors are ignored because the blob may have been
    deleted in the meantime,
  - every other name is handed to `-handle-foreign-key`, and whatever
    collection it returns is merged into the result.

  Blobs are only locked while reading when both `:in-place?` and
  `:lock-blob?` are set in `config`.

  The foreign-key branch takes its result with `<?-`, like every other
  backing call in this namespace, so it also works in the synchronous
  variant and rethrows errors instead of passing them on as a value."
  [{:keys [backing]}
   serializers read-handlers write-handlers {:keys [sync? config] :as env}]
  (async+sync
   sync? *default-sync-translation*
   (go-try-
    (let [serializer (get serializers (:default-serializer env))
          store-keys (<?- (-keys backing env))]
      (loop [keys #{}
             [store-key & store-keys] store-keys]
        (if store-key
          (cond
            ;; temporary artefacts of update-blob, never real entries
            (or (ends-with? store-key ".new")
                (ends-with? store-key ".backup"))
            (recur keys store-keys)

            (ends-with? store-key ".ksv")
            (let [blob (<?- (-create-blob backing store-key env))
                  env (assoc-in env [:msg :keys] store-key)
                  env (assoc env :store-key store-key)
                  lock (when (and (:in-place? config) (:lock-blob? config))
                         (trace "Acquiring blob lock for: " store-key (str blob))
                         (<?- (-get-lock blob env)))
                  keys-new (try
                             (conj keys (<?- (read-blob blob read-handlers serializers env)))
                             ;; it can be that the blob has been deleted, ignore reading errors
                             (catch #?(:clj Exception :cljs js/Error) _
                               keys)
                             (finally
                               ;; `lock` may be nil here; PBackingLock is
                               ;; extended to nil so releasing is a no-op
                               (<?- (-release lock env))
                               (<?- (-close blob env))))]
              (recur keys-new store-keys))

            :else ;; needs migration
            (let [additional-keys (<?- (-handle-foreign-key backing store-key serializer
                                                            read-handlers write-handlers env))]
              (recur (into keys additional-keys) store-keys)))
          keys))))))
;; Store record implementing the high-level EDN, binary and key-iteration
;; protocols on top of a `backing` implementation. Every operation builds an
;; env map (operation type, serialization settings, config, sync flag and a
;; `:msg` describing the operation) and delegates to `io-operation`,
;; `delete-blob` or `list-keys`.
(defrecord DefaultStore [version backing serializers default-serializer compressor encryptor
                         read-handlers write-handlers buffer-size locks config]
  PEDNKeyValueStore
  ;; True when a blob exists for `key` or `-migratable` returns a truthy
  ;; value for it; `false` otherwise.
  (-exists? [_ key env]
    (async+sync
     (:sync? env) *default-sync-translation*
     (go-try-
      (let [store-key (key->store-key key)]
        (or (<?- (-blob-exists? backing store-key env))
            (<?- (-migratable backing key store-key env))
            false)))))

  ;; Reads the value stored under the first element of `key-vec` and looks up
  ;; the remaining path in it. Returns `not-found` when the top-level key
  ;; does not exist.
  (-get-in [this key-vec not-found opts]
    (let [{:keys [sync?]} opts]
      (async+sync
       sync?
       *default-sync-translation*
       (go-try-
        (if (<?- (-exists? this (first key-vec) opts))
          (let [a (<?-
                   (io-operation this serializers read-handlers write-handlers
                                 {:key-vec key-vec
                                  :operation :read-edn
                                  :compressor compressor
                                  :encryptor encryptor
                                  :format :data
                                  :version version
                                  :sync? sync?
                                  :buffer-size buffer-size
                                  :config config
                                  :default-serializer default-serializer
                                  ;; report the actual top-level key; a bare
                                  ;; `key` here would be clojure.core/key
                                  :msg {:type :read-edn-error
                                        :key (first key-vec)}}))]
            (clojure.core/get-in a (rest key-vec)))
          not-found)))))

  ;; Reads only the metadata stored for `key`.
  (-get-meta [this key opts]
    (let [{:keys [sync?]} opts]
      (io-operation this serializers read-handlers write-handlers
                    {:key-vec [key]
                     :operation :read-meta
                     :compressor compressor
                     :encryptor encryptor
                     :default-serializer default-serializer
                     :version version
                     :sync? sync?
                     :buffer-size buffer-size
                     :config config
                     :msg {:type :read-meta-error
                           :key key}})))

  ;; Writes `val` at `key-vec`. `:overwrite? true` makes `io-operation` skip
  ;; reading the previous blob content.
  (-assoc-in [this key-vec meta-up val opts]
    (let [{:keys [sync?]} opts]
      (io-operation this serializers read-handlers write-handlers
                    {:key-vec key-vec
                     :operation :write-edn
                     :compressor compressor
                     :encryptor encryptor
                     :version version
                     :default-serializer default-serializer
                     :up-fn (fn [_] val)
                     :up-fn-meta meta-up
                     :config config
                     :sync? sync?
                     :buffer-size buffer-size
                     :overwrite? true
                     :msg {:type :write-edn-error
                           :key (first key-vec)}})))

  ;; Applies `up-fn` to the value at `key-vec` and `meta-up` to the metadata.
  (-update-in [this key-vec meta-up up-fn opts]
    (let [{:keys [sync?]} opts]
      (io-operation this serializers read-handlers write-handlers
                    {:key-vec key-vec
                     :operation :write-edn
                     :compressor compressor
                     :encryptor encryptor
                     :version version
                     :default-serializer default-serializer
                     :up-fn up-fn
                     :up-fn-meta meta-up
                     :config config
                     :sync? sync?
                     :buffer-size buffer-size
                     :msg {:type :write-edn-error
                           :key (first key-vec)}})))

  ;; Deletes the blob for `key` through `delete-blob`.
  (-dissoc [_ key opts]
    (delete-blob backing
                 {:key-vec [key]
                  :operation :write-edn
                  :compressor compressor
                  :encryptor encryptor
                  :version version
                  :default-serializer default-serializer
                  :config config
                  :sync? (:sync? opts)
                  :buffer-size buffer-size
                  :msg {:type :deletion-error
                        :key key}}))

  PBinaryKeyValueStore
  ;; Reads the binary payload of `key`; `locked-cb` is forwarded to the
  ;; backing's `-read-binary`.
  (-bget [this key locked-cb opts]
    (let [{:keys [sync?]} opts]
      (io-operation this serializers read-handlers write-handlers
                    {:key-vec [key]
                     :operation :read-binary
                     :default-serializer default-serializer
                     :compressor compressor
                     :encryptor encryptor
                     :config config
                     :version version
                     :sync? sync?
                     :buffer-size buffer-size
                     :locked-cb locked-cb
                     :msg {:type :read-binary-error
                           :key key}})))

  ;; Stores `input` as the binary payload of `key`, updating the metadata
  ;; with `meta-up`.
  (-bassoc [this key meta-up input opts]
    (let [{:keys [sync?]} opts]
      (io-operation this serializers read-handlers write-handlers
                    {:key-vec [key]
                     :operation :write-binary
                     :default-serializer default-serializer
                     :compressor compressor
                     :encryptor encryptor
                     :input input
                     :version version
                     :up-fn-meta meta-up
                     :config config
                     :sync? sync?
                     :buffer-size buffer-size
                     :msg {:type :write-binary-error
                           :key key}})))

  PKeyIterable
  ;; Lists the store's keys through `list-keys`, which reads each blob with
  ;; the `:read-meta` operation.
  (-keys [this opts]
    (let [{:keys [sync?]} opts]
      (list-keys this
                 serializers read-handlers write-handlers
                 {:operation :read-meta
                  :default-serializer default-serializer
                  :version version
                  :compressor compressor
                  :encryptor encryptor
                  :config config
                  :sync? sync?
                  :buffer-size buffer-size
                  :msg {:type :read-all-keys-error}}))))
(defn connect-default-store
  "Create general store in given base of backing store.

  `config` is merged over the defaults `{:sync-blob? true, :in-place? false,
  :lock-blob? true}`. The compressor and encryptor are looked up from
  `(:type (:compressor config))` and `(:type (:encryptor config))`.
  In-place mode without blob locking is rejected with an `ex-info` of
  `:type :store-configuration-error`.

  Calls `-create-store` on `backing` and returns a `DefaultStore`; runs
  synchronously or asynchronously depending on `(:sync? opts)`."
  [backing
   {:keys [default-serializer serializers
           read-handlers write-handlers
           buffer-size config opts]
    :or {default-serializer :FressianSerializer
         read-handlers (atom {})
         write-handlers (atom {})
         buffer-size (* 1024 1024)
         opts {:sync? false}}}]
  (let [defaults {:sync-blob? true
                  :in-place? false
                  :lock-blob? true}
        complete-config (merge defaults config)
        compressor (get-compressor (-> config :compressor :type))
        encryptor (get-encryptor (-> config :encryptor :type))]
    (async+sync
     (:sync? opts) *default-sync-translation*
     (go-try-
      ;; check config
      (if (and (:in-place? complete-config)
               (not (:lock-blob? complete-config)))
        (throw (ex-info "You need to activate file-locking for in-place mode."
                        {:type :store-configuration-error
                         :config complete-config}))
        (do
          (<?- (-create-store backing opts))
          (map->DefaultStore {:backing backing
                              :default-serializer default-serializer
                              :serializers (merge key->serializer serializers)
                              :version default-version
                              :compressor compressor
                              :encryptor encryptor
                              :read-handlers read-handlers
                              :write-handlers write-handlers
                              :buffer-size buffer-size
                              :locks (atom {})
                              :config complete-config})))))))