-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
core.clj
349 lines (295 loc) · 14.4 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
(ns konserve-s3.core
"S3 based konserve backend."
(:require [konserve.impl.defaults :refer [connect-default-store]]
[konserve.impl.storage-layout :refer [PBackingStore PBackingBlob PBackingLock -delete-store header-size]]
[konserve.utils :refer [async+sync *default-sync-translation*]]
[superv.async :refer [go-try-]]
[taoensso.timbre :refer [info]])
(:import [java.io ByteArrayInputStream ByteArrayOutputStream]
[java.util Arrays]
;; AWS API
[software.amazon.awssdk.regions Region]
[com.amazonaws.xray.interceptors TracingInterceptor]
[software.amazon.awssdk.core.client.config ClientOverrideConfiguration]
[software.amazon.awssdk.auth.credentials EnvironmentVariableCredentialsProvider AwsBasicCredentials StaticCredentialsProvider]
[software.amazon.awssdk.http.urlconnection UrlConnectionHttpClient]
[software.amazon.awssdk.core ResponseInputStream SdkBytes]
[software.amazon.awssdk.http AbortableInputStream]
;; AWS S3 API
;; https://sdk.amazonaws.com/java/api/latest/index.html?software/amazon/awssdk/services/s3/package-summary.html
[software.amazon.awssdk.services.s3 S3Client]
[software.amazon.awssdk.services.s3.model S3Object S3Request
CreateBucketRequest CreateBucketResponse
DeleteBucketRequest DeleteBucketResponse
HeadBucketRequest HeadBucketResponse
ListObjectsRequest ListObjectsResponse
GetObjectRequest GetObjectResponse
PutObjectRequest PutObjectRequest
CopyObjectRequest Delete DeleteObjectRequest DeleteObjectsRequest HeadObjectRequest
NoSuchBucketException NoSuchKeyException]
[software.amazon.awssdk.core.sync RequestBody]))
;; Reflection warnings stay off; the form below is discarded by the reader (`#_`).
#_(set! *warn-on-reflection* 1)

;; Bucket name constant. NOTE(review): not referenced anywhere in the visible code.
(def ^:const default-bucket "konserve")

;; Initial capacity (1 MiB) of the in-memory buffer used to assemble a blob before upload.
(def ^:const output-stream-buffer-size (* 1024 1024))

;; Lookup table from a region's string form (e.g. "us-west-1") to the SDK `Region` object.
(def regions
  (reduce (fn [table r] (assoc table (str r) r))
          {}
          (Region/regions)))
(defn common-client-config
  "Applies the optional settings found in the spec map to an AWS client builder.

  - `:region`      string key into `regions`; sets the client region when present.
  - `:x-ray?`      when truthy, installs an X-Ray `TracingInterceptor`.
  - `:access-key` / `:secret`  when `:access-key` is present, static credentials
                   are built from both values.

  The URL-connection HTTP client builder is always installed. Returns the builder."
  [client {:keys [region x-ray? access-key secret]}]
  (let [builder (if region
                  (.region client (regions region))
                  client)
        builder (if x-ray?
                  (.overrideConfiguration builder
                                          (-> (ClientOverrideConfiguration/builder)
                                              (.addExecutionInterceptor (TracingInterceptor.))
                                              (.build)))
                  builder)
        builder (if access-key
                  (.credentialsProvider builder
                                        (StaticCredentialsProvider/create
                                         (AwsBasicCredentials/create access-key secret)))
                  builder)]
    (.httpClientBuilder builder (UrlConnectionHttpClient/builder))))
(defn s3-client
  "Builds an `S3Client` configured from `opts` (see `common-client-config`)."
  [opts]
  (let [builder (common-client-config (S3Client/builder) opts)]
    (.build builder)))
(defn bucket-exists?
  "Returns true when a HeadBucket call for `bucket` succeeds and false when it
  raises `NoSuchBucketException`. Any other exception propagates."
  [client bucket]
  (let [request (-> (HeadBucketRequest/builder)
                    (.bucket bucket)
                    (.build))]
    (try
      (.headBucket client request)
      true
      (catch NoSuchBucketException _
        false))))
(defn create-bucket
  "Issues a CreateBucket request for `bucket` and returns the SDK response."
  [client bucket]
  (let [request (-> (CreateBucketRequest/builder)
                    (.bucket bucket)
                    (.build))]
    (.createBucket client request)))
(defn delete-bucket
  "Issues a DeleteBucket request for `bucket` and returns the SDK response."
  [client bucket]
  (let [request (-> (DeleteBucketRequest/builder)
                    (.bucket bucket)
                    (.build))]
    (.deleteBucket client request)))
(defn put-object
  "Uploads the byte array `bytes` as the object `key` in `bucket`.
  Returns the SDK's put-object response."
  [^S3Client client ^String bucket ^String key ^bytes bytes]
  (let [request (-> (PutObjectRequest/builder)
                    (.bucket bucket)
                    (.key key)
                    (.build))
        body (RequestBody/fromBytes bytes)]
    (.putObject client request ^RequestBody body)))
(defn get-object
  "Downloads the object `key` from `bucket` and returns its full content as a
  byte array, or nil when S3 reports `NoSuchKeyException`.

  The response stream is opened with `with-open`, so it is closed even when
  reading the body throws."
  [^S3Client client bucket key]
  (try
    (let [request (-> (GetObjectRequest/builder)
                      (.bucket bucket)
                      (.key key)
                      (.build))]
      (with-open [^ResponseInputStream res (.getObject client ^GetObjectRequest request)]
        (.readAllBytes res)))
    (catch NoSuchKeyException _
      nil)))
(defn exists?
  "Returns true when a HeadObject call for `key` in `bucket` succeeds and false
  when it raises `NoSuchKeyException`. Any other exception propagates."
  [^S3Client client bucket key]
  (let [request (-> (HeadObjectRequest/builder)
                    (.bucket bucket)
                    (.key key)
                    (.build))]
    (try
      (.headObject client ^S3Request request)
      true
      (catch NoSuchKeyException _
        false))))
(defn list-objects
  "Returns the keys of all objects in `bucket` as a vector of strings.

  A single ListObjects call returns only one page of results, so this keeps
  requesting pages while the response is marked truncated. The next page
  starts after `nextMarker` when S3 supplies it, otherwise after the last key
  of the current page."
  [^S3Client client bucket]
  (loop [marker nil
         collected []]
    (let [request (-> (ListObjectsRequest/builder)
                      (.bucket bucket)
                      (cond-> marker (.marker marker))
                      (.build))
          ^ListObjectsResponse response (.listObjects client ^ListObjectsRequest request)
          page (mapv (fn [^S3Object o] (.key o)) (.contents response))
          collected (into collected page)]
      ;; `boolean` unboxes a possibly-nil java.lang.Boolean safely.
      ;; The `(seq page)` guard prevents looping forever on an empty truncated page.
      (if (and (boolean (.isTruncated response)) (seq page))
        (recur (or (not-empty (.nextMarker response)) (peek page))
               collected)
        collected))))
(defn copy
  "Copies the object `source-key` to `destination-key` within the same `bucket`.
  Returns the SDK's copy-object response."
  [client bucket source-key destination-key]
  (let [request (-> (CopyObjectRequest/builder)
                    (.sourceBucket bucket)
                    (.sourceKey source-key)
                    (.destinationBucket bucket)
                    (.destinationKey destination-key)
                    (.build))]
    (.copyObject client request)))
(defn delete
  "Deletes the single object `key` from `bucket`. Returns the SDK response."
  [client bucket key]
  (let [request (-> (DeleteObjectRequest/builder)
                    (.bucket bucket)
                    (.key key)
                    (.build))]
    (.deleteObject client request)))
(defn delete-keys
  "Deletes all objects named by `keys` (a collection of key strings) from
  `bucket` with one batch DeleteObjects request. Returns the SDK response, or
  nil when `keys` is empty (no request is sent).

  Callers are expected to pass at most 1000 keys per call, which is the S3
  limit for a single DeleteObjects request."
  [client bucket keys]
  (when (seq keys)
    (let [;; The batch API takes ObjectIdentifier values, not raw key strings.
          ;; The class is referenced fully qualified, so no extra import is needed.
          identifiers (mapv (fn [k]
                              (-> (software.amazon.awssdk.services.s3.model.ObjectIdentifier/builder)
                                  (.key k)
                                  (.build)))
                            keys)
          request (-> (DeleteObjectsRequest/builder)
                      (.bucket bucket)
                      (.delete (-> (Delete/builder)
                                   ;; Hint selects the Collection overload of `objects`.
                                   (.objects ^java.util.Collection identifiers)
                                   (.build)))
                      (.build))]
      ;; `deleteObjects` (plural) is the method that accepts a DeleteObjectsRequest.
      (.deleteObjects ^S3Client client ^DeleteObjectsRequest request))))
;; `S3Blob/-get-lock` hands out a plain `true` as its lock, so releasing a
;; Boolean is a no-op: nil in sync mode, a channel yielding nil in async mode.
(extend-protocol PBackingLock
  Boolean
  (-release [_ env]
    (when-not (:sync? env)
      (go-try- nil))))
(defn ->key
  "Builds the S3 object key for `key` inside the store `store-id`:
  the two values joined by an underscore."
  [store-id key]
  (let [prefix (str store-id "_")]
    (str prefix key)))
;; One blob (S3 object) of a store.
;;   bucket         - the owning store record; its :client and :bucket are used for S3 calls
;;   key            - full S3 object key
;;   data           - atom holding the pending :header / :meta / :value to upload on -sync
;;   fetched-object - atom caching the downloaded object bytes (filled by -read-header)
(defrecord S3Blob [bucket key data fetched-object]
  PBackingBlob
  ;; Uploads header, meta and value concatenated as a single object, then clears `data`.
  (-sync [_ env]
    (async+sync (:sync? env) *default-sync-translation*
                (go-try-
                 (let [pending @data
                       header (:header pending)
                       meta (:meta pending)
                       value (:value pending)
                       buffer (ByteArrayOutputStream. output-stream-buffer-size)]
                   (when-not (and header meta value)
                     (throw (ex-info "Updating a row is only possible if header, meta and value are set."
                                     {:data @data})))
                   (doto buffer
                     (.write header)
                     (.write meta)
                     (.write value))
                   (put-object (:client bucket)
                               (:bucket bucket)
                               key
                               (.toByteArray buffer))
                   (.close buffer)
                   (reset! data {})))))

  ;; Nothing to close for an in-memory blob.
  (-close [_ env]
    (when-not (:sync? env)
      (go-try- nil)))

  ;; May not return nil, otherwise eternal retries
  (-get-lock [_ env]
    (if-not (:sync? env)
      (go-try- true)
      true))

  ;; First access is always to the header; the whole object is downloaded here
  ;; once and cached for the other read operations.
  (-read-header [_ env]
    (async+sync (:sync? env) *default-sync-translation*
                (go-try-
                 (let [obj (or @fetched-object
                               (reset! fetched-object
                                       (get-object (:client bucket) (:bucket bucket) key)))]
                   (Arrays/copyOfRange ^bytes obj (int 0) (int header-size))))))

  ;; Returns the `meta-size` bytes that follow the header in the cached object.
  (-read-meta [_ meta-size env]
    (async+sync (:sync? env) *default-sync-translation*
                (go-try-
                 (let [meta-end (+ header-size meta-size)]
                   (Arrays/copyOfRange ^bytes @fetched-object (int header-size) (int meta-end))))))

  ;; Returns everything after header and meta in the cached object.
  (-read-value [_ meta-size env]
    (async+sync (:sync? env) *default-sync-translation*
                (go-try-
                 (let [obj ^bytes @fetched-object
                       value-start (+ header-size meta-size)]
                   (Arrays/copyOfRange obj (int value-start) (int (alength obj)))))))

  ;; Passes the value bytes to `locked-cb` as an input stream together with their size.
  (-read-binary [_ meta-size locked-cb env]
    (async+sync (:sync? env) *default-sync-translation*
                (go-try-
                 (let [obj ^bytes @fetched-object
                       value-start (+ header-size meta-size)
                       payload (Arrays/copyOfRange obj (int value-start) (int (alength obj)))]
                   (locked-cb {:input-stream (ByteArrayInputStream. payload)
                               :size (- (alength obj) value-start)})))))

  ;; The write operations only stage their argument in `data`; -sync performs the upload.
  (-write-header [_ header env]
    (async+sync (:sync? env) *default-sync-translation*
                (go-try-
                 (swap! data assoc :header header))))

  (-write-meta [_ meta env]
    (async+sync (:sync? env) *default-sync-translation*
                (go-try-
                 (swap! data assoc :meta meta))))

  (-write-value [_ value _meta-size env]
    (async+sync (:sync? env) *default-sync-translation*
                (go-try-
                 (swap! data assoc :value value))))

  (-write-binary [_ _meta-size blob env]
    (async+sync (:sync? env) *default-sync-translation*
                (go-try-
                 (swap! data assoc :value blob)))))
;; konserve backing store that keeps each blob as one S3 object in `bucket`.
;;   client   - the S3 client used for all requests
;;   bucket   - bucket name
;;   store-id - prefix that namespaces this store's object keys (see `->key`)
(defrecord S3Bucket [client bucket store-id]
  PBackingStore
  ;; Creates an in-memory blob handle; nothing is written to S3 until the blob is synced.
  (-create-blob [this store-key env]
    (async+sync (:sync? env) *default-sync-translation*
                (go-try- (S3Blob. this (->key store-id store-key) (atom {}) (atom nil)))))

  (-delete-blob [_ store-key env]
    (async+sync (:sync? env) *default-sync-translation*
                (go-try- (delete client bucket (->key store-id store-key)))))

  (-blob-exists? [_ store-key env]
    (async+sync (:sync? env) *default-sync-translation*
                (go-try- (exists? client bucket (->key store-id store-key)))))

  (-copy [_ from to env]
    (async+sync (:sync? env) *default-sync-translation*
                (go-try- (copy client bucket (->key store-id from) (->key store-id to)))))

  ;; Implemented as a copy followed by a delete: two separate S3 requests.
  (-atomic-move [_ from to env]
    (async+sync (:sync? env) *default-sync-translation*
                (go-try-
                 (copy client bucket (->key store-id from) (->key store-id to))
                 (delete client bucket (->key store-id from)))))

  ;; No migrations are offered by this backend.
  (-migratable [_ _key _store-key env]
    (if (:sync? env) nil (go-try- nil)))

  (-migrate [_ _migration-key _key-vec _serializer _read-handlers _write-handlers env]
    (if (:sync? env) nil (go-try- nil)))

  ;; Creates the bucket when it does not exist yet.
  (-create-store [_ env]
    (async+sync (:sync? env) *default-sync-translation*
                (go-try-
                 (when-not (bucket-exists? client bucket)
                   (create-bucket client bucket)))))

  (-sync-store [_ env]
    (if (:sync? env) nil (go-try- nil)))

  ;; Deletes the objects in the bucket in batches of up to 1000 keys and then
  ;; closes the client. The bucket itself is kept.
  ;; NOTE(review): every key returned by `list-objects` is deleted, not only the
  ;; keys prefixed with this store's `store-id` — confirm this is intended when
  ;; several stores share one bucket.
  (-delete-store [_ env]
    (async+sync (:sync? env) *default-sync-translation*
                (go-try- (when (bucket-exists? client bucket)
                           (info "This will delete all konserve files, but won't delete the bucket. You can use konserve-s3.core/delete-bucket if you intend to delete the bucket as well.")
                           ;; `partition-all` keeps the final, shorter batch. Plain
                           ;; `partition` drops it, which left the last <1000 objects
                           ;; (for small buckets: all of them) undeleted.
                           (doseq [keys (partition-all 1000 (list-objects client bucket))]
                             (delete-keys client bucket keys))
                           (.close client)))))

  ;; Lists the object keys that start with `store-id` and carry one of
  ;; konserve's file suffixes.
  (-keys [_ env]
    (async+sync (:sync? env) *default-sync-translation*
                (go-try- (filter (fn [^String key]
                                   (and (.startsWith key store-id)
                                        (or (.endsWith key ".ksv")
                                            (.endsWith key ".ksv.new")
                                            (.endsWith key ".ksv.backup"))))
                                 (list-objects client bucket))))))
(defn connect-store
  "Connects a konserve store backed by S3.

  `s3-spec` supplies the client settings plus `:bucket` and `:store-id`.
  Keyword arguments: `:opts` is merged over `{:sync? true}`; any other keys
  override the default store configuration. A caller-supplied `:config` key is
  discarded, so the blob settings below always apply."
  [s3-spec & {:keys [opts]
              :as params}]
  (let [complete-opts (merge {:sync? true} opts)
        {:keys [bucket store-id]} s3-spec
        backing (->S3Bucket (s3-client s3-spec) bucket store-id)
        defaults {:opts complete-opts
                  :config {:sync-blob? true
                           :in-place? false
                           :lock-blob? true}
                  :default-serializer :FressianSerializer
                  :buffer-size (* 1024 1024)}
        overrides (dissoc params :opts :config)]
    (connect-default-store backing (merge defaults overrides))))
(defn release
  "Closes the S3 client held by the store's backing. Call once all work on the
  store has finished. `env` selects sync or async execution via `:sync?`."
  [store env]
  (async+sync (:sync? env) *default-sync-translation*
              (go-try-
               (.close ^S3Client (-> store :backing :client)))))
(defn delete-store
  "Deletes the store described by `s3-spec` by building a fresh backing and
  calling `-delete-store` on it. `:opts` is merged over `{:sync? true}`."
  [s3-spec & {:keys [opts]}]
  (let [{:keys [bucket store-id]} s3-spec
        env (merge {:sync? true} opts)
        backing (->S3Bucket (s3-client s3-spec) bucket store-id)]
    (-delete-store backing env)))
;; REPL walkthrough of the synchronous API. Nothing in here is evaluated on load.
(comment
  (require '[konserve.core :as k])

  ;; The credential keys must be :access-key and :secret — those are the names
  ;; `common-client-config` destructures from the spec.
  (def s3-spec {:region "us-west-1"
                :bucket "konserve-s3"
                :store-id "test2"
                :x-ray? true
                :access-key "ACCESS_KEY"
                :secret "SECRET"})

  (def test-client (s3-client s3-spec))

  (delete-store s3-spec :opts {:sync? true})

  (def store (connect-store s3-spec :opts {:sync? true}))

  (time (k/assoc-in store ["foo"] {:foo "baz"} {:sync? true}))

  (k/get-in store ["foo"] nil {:sync? true})

  (k/exists? store "foo" {:sync? true})

  (time (k/assoc-in store [:bar] 42 {:sync? true}))

  (k/update-in store [:bar] inc {:sync? true})

  (k/get-in store [:bar] nil {:sync? true})

  (k/dissoc store :bar {:sync? true})

  (k/append store :error-log {:type :horrible} {:sync? true})

  (k/log store :error-log {:sync? true})

  (k/keys store {:sync? true})

  (k/bassoc store :binbar (byte-array (range 10)) {:sync? true})

  (k/bget store :binbar (fn [{:keys [input-stream]}]
                          (map byte (slurp input-stream)))
          {:sync? true})

  (release store {:sync? true}))
;; REPL walkthrough of the asynchronous API: every call passes {:sync? false}
;; and its result is taken with a blocking `<!!`.
;; Assumes `s3-spec` is already defined in the REPL session.
(comment
(require '[konserve.core :as k])
(require '[clojure.core.async :refer [<!!]])
;; Start from an empty store, then connect.
(<!! (delete-store s3-spec :opts {:sync? false}))
(def store (<!! (connect-store s3-spec :opts {:sync? false})))
;; Basic write / read / existence check.
(time (<!! (k/assoc-in store ["foo" :bar] {:foo "baz"} {:sync? false})))
(<!! (k/get-in store ["foo"] nil {:sync? false}))
(<!! (k/exists? store "foo" {:sync? false}))
;; Update and remove a value.
(time (<!! (k/assoc-in store [:bar] 42 {:sync? false})))
(<!! (k/update-in store [:bar] inc {:sync? false}))
(<!! (k/get-in store [:bar] nil {:sync? false}))
(<!! (k/dissoc store :bar {:sync? false}))
;; Append-log usage.
(<!! (k/append store :error-log {:type :horrible} {:sync? false}))
(<!! (k/log store :error-log {:sync? false}))
(<!! (k/keys store {:sync? false}))
;; Binary values: write a byte array, read it back through an input stream.
(<!! (k/bassoc store :binbar (byte-array (range 10)) {:sync? false}))
(<!! (k/bget store :binbar (fn [{:keys [input-stream]}]
(map byte (slurp input-stream)))
{:sync? false}))
;; Close the S3 client when done.
(<!! (release store {:sync? false})))