/
s3.clj
198 lines (172 loc) · 6.55 KB
/
s3.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) UXBOX Labs SL
(ns app.storage.s3
  "S3 storage backend implementation."
(:require
[app.common.data :as d]
[app.common.exceptions :as ex]
[app.common.spec :as us]
[app.common.uri :as u]
[app.storage.impl :as impl]
[app.util.time :as dt]
[clojure.java.io :as io]
[clojure.spec.alpha :as s]
[integrant.core :as ig])
(:import
java.time.Duration
java.util.Collection
software.amazon.awssdk.core.sync.RequestBody
software.amazon.awssdk.regions.Region
software.amazon.awssdk.services.s3.S3Client
software.amazon.awssdk.services.s3.model.Delete
software.amazon.awssdk.services.s3.model.CopyObjectRequest
software.amazon.awssdk.services.s3.model.DeleteObjectsRequest
software.amazon.awssdk.services.s3.model.DeleteObjectsResponse
software.amazon.awssdk.services.s3.model.GetObjectRequest
software.amazon.awssdk.services.s3.model.ObjectIdentifier
software.amazon.awssdk.services.s3.model.PutObjectRequest
software.amazon.awssdk.services.s3.presigner.S3Presigner
software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest
software.amazon.awssdk.services.s3.presigner.model.PresignedGetObjectRequest))
;; Forward declarations: the multimethod implementations and the
;; integrant init code below reference these private functions before
;; they are defined further down in this namespace.
(declare put-object
         copy-object
         get-object
         get-object-url
         del-object-in-bulk
         build-s3-client
         build-s3-presigner)
;; --- BACKEND INIT
;; Configuration specs for the backend.
;; The only region accepted is `:eu-central-1`.
(s/def ::region #{:eu-central-1})
;; Name of the bucket used by the backend.
(s/def ::bucket ::us/string)
;; Optional string prepended to every object key (see the `(str prefix ...)`
;; calls in the helpers below).
(s/def ::prefix ::us/string)
;; Spec applied to the raw configuration before initialization: every
;; key is optional at this stage; `ig/init-key` decides whether there is
;; enough data to build a working backend.
(defmethod ig/pre-init-spec ::backend
  [_key]
  (s/keys :opt-un [::region
                   ::bucket
                   ::prefix]))
;; Normalizes the configuration before initialization. The config is
;; passed through `d/without-nils` (presumably dropping nil-valued
;; entries -- confirm against app.common.data) and, when a truthy
;; `:prefix` was supplied, it is (re)associated on the result.
(defmethod ig/prep-key ::backend
  [_ {:keys [prefix] :as cfg}]
  (let [cleaned (d/without-nils cfg)]
    (if prefix
      (assoc cleaned :prefix prefix)
      cleaned)))
;; Builds the backend map. A backend is only returned when the
;; configuration contains a `:region` entry and `:bucket` is a string;
;; otherwise the result is nil. The returned map is the configuration
;; extended with the S3 client, the presigner and the `:s3` type tag
;; used for multimethod dispatch.
(defmethod ig/init-key ::backend
  [_ cfg]
  (let [usable? (and (contains? cfg :region)
                     (string? (:bucket cfg)))]
    (when usable?
      (let [s3-client    (build-s3-client cfg)
            s3-presigner (build-s3-presigner cfg)]
        (merge cfg
               {:client    s3-client
                :presigner s3-presigner
                :type      :s3})))))
;; Specs describing an initialized backend map (the value produced by
;; `ig/init-key` above).
(s/def ::type ::us/keyword)
;; The client must be an AWS SDK `S3Client` instance.
(s/def ::client #(instance? S3Client %))
;; The presigner must be an AWS SDK `S3Presigner` instance.
(s/def ::presigner #(instance? S3Presigner %))
;; An initialized backend requires region, bucket, client, type and
;; presigner; the key prefix stays optional.
(s/def ::backend
(s/keys :req-un [::region ::bucket ::client ::type ::presigner]
:opt-un [::prefix]))
;; --- API IMPL
;; `:s3` implementation of `impl/put-object`: delegates to the private
;; `put-object` helper of this namespace.
(defmethod impl/put-object :s3
  [s3-backend obj data]
  (put-object s3-backend obj data))
;; `:s3` implementation of `impl/copy-object`: delegates to the private
;; `copy-object` helper of this namespace.
(defmethod impl/copy-object :s3
  [s3-backend source target]
  (copy-object s3-backend source target))
;; `:s3` implementation of `impl/get-object-data`: delegates to the
;; private `get-object` helper, which returns an input stream.
(defmethod impl/get-object-data :s3
  [s3-backend obj]
  (get-object s3-backend obj))
;; `:s3` implementation of `impl/get-object-url`: delegates to the
;; private `get-object-url` helper, which builds a presigned URL.
(defmethod impl/get-object-url :s3
  [s3-backend obj opts]
  (get-object-url s3-backend obj opts))
;; `:s3` implementation of `impl/del-objects-in-bulk`: delegates to the
;; private `del-object-in-bulk` helper of this namespace.
(defmethod impl/del-objects-in-bulk :s3
  [s3-backend object-ids]
  (del-object-in-bulk s3-backend object-ids))
;; --- HELPERS
(defn- ^Region lookup-region
  "Translates the region keyword used in the configuration into the
  matching AWS SDK `Region` constant. Only `:eu-central-1` is known;
  any other value throws an `IllegalArgumentException` because no
  clause matches."
  [region]
  (condp = region
    :eu-central-1 Region/EU_CENTRAL_1))
(defn- build-s3-client
  "Creates an `S3Client` bound to the region named by the `:region`
  key of the configuration map."
  [{:keys [region]}]
  (let [aws-region (lookup-region region)]
    (-> (S3Client/builder)
        (.region aws-region)
        (.build))))
(defn- build-s3-presigner
  "Creates an `S3Presigner` bound to the region named by the `:region`
  key of the configuration map."
  [{:keys [region]}]
  (let [aws-region (lookup-region region)]
    (-> (S3Presigner/builder)
        (.region aws-region)
        (.build))))
(defn- put-object
  "Uploads `content` to the backend bucket under the key derived from
  the object id (`prefix` + `impl/id->path`).

  The content type is read from the `:content-type` entry of the
  object's metadata and defaults to \"application/octet-stream\".
  `content` must be accepted by `io/input-stream` and by `count`
  (the count is sent as the content length).

  Returns the response of `S3Client.putObject`."
  [{:keys [client bucket prefix]} {:keys [id] :as object} content]
  (let [path    (str prefix (impl/id->path id))
        mdata   (meta object)
        mtype   (:content-type mdata "application/octet-stream")
        request (.. (PutObjectRequest/builder)
                    (bucket bucket)
                    (contentType mtype)
                    (key path)
                    (build))]
    ;; The stream opened here is owned by this function, so it is
    ;; closed explicitly once the (synchronous) upload has finished;
    ;; previously it was left open after the call.
    (with-open [input (io/input-stream content)]
      (let [body (RequestBody/fromInputStream input (count content))]
        (.putObject ^S3Client client
                    ^PutObjectRequest request
                    ^RequestBody body)))))
(defn- copy-object
  "Copies the stored data of `src-object` to the key of `dst-object`
  inside the same bucket.

  Both keys are derived from the object ids (`prefix` +
  `impl/id->path`). The content type set on the request comes from the
  `:content-type` metadata of the source object, defaulting to
  \"application/octet-stream\".

  Returns the response of `S3Client.copyObject`."
  [{:keys [client bucket prefix]} src-object dst-object]
  (let [object-key  (fn [obj] (str prefix (impl/id->path (:id obj))))
        mtype       (get (meta src-object) :content-type "application/octet-stream")
        ;; The copy source is expressed as an encoded "<bucket>/<key>" string.
        copy-source (u/query-encode (str bucket "/" (object-key src-object)))
        target-key  (object-key dst-object)
        request     (-> (CopyObjectRequest/builder)
                        (.copySource copy-source)
                        (.destinationBucket bucket)
                        (.destinationKey target-key)
                        (.contentType mtype)
                        (.build))]
    (.copyObject ^S3Client client
                 ^CopyObjectRequest request)))
(defn- get-object
  "Fetches the object stored under the key derived from `id`
  (`prefix` + `impl/id->path`) and returns its content as an input
  stream (the result of `io/input-stream` applied to the value
  returned by `S3Client.getObject`)."
  [{:keys [client bucket prefix]} {:keys [id]}]
  (let [object-key (str prefix (impl/id->path id))
        request    (-> (GetObjectRequest/builder)
                       (.bucket bucket)
                       (.key object-key)
                       (.build))
        response   (.getObject ^S3Client client ^GetObjectRequest request)]
    (io/input-stream response)))
;; Signature duration used for presigned URLs when the caller does not
;; pass a `:max-age` option to `get-object-url`; built from
;; `{:minutes 10}`.
(def default-max-age
(dt/duration {:minutes 10}))
(defn- get-object-url
  "Builds a presigned GET url for the object stored under the key
  derived from `id` (`prefix` + `impl/id->path`).

  The `:max-age` option sets the signature duration; it falls back to
  `default-max-age` and is asserted to satisfy `dt/duration?`.

  Returns the presigned url wrapped with `u/uri`."
  [{:keys [presigner bucket prefix]} {:keys [id]} {:keys [max-age] :or {max-age default-max-age}}]
  (us/assert dt/duration? max-age)
  (let [object-key (str prefix (impl/id->path id))
        get-req    (-> (GetObjectRequest/builder)
                       (.bucket bucket)
                       (.key object-key)
                       (.build))
        sign-req   (-> (GetObjectPresignRequest/builder)
                       (.signatureDuration ^Duration max-age)
                       (.getObjectRequest ^GetObjectRequest get-req)
                       (.build))
        presigned  (.presignGetObject ^S3Presigner presigner
                                      ^GetObjectPresignRequest sign-req)
        url        (.url ^PresignedGetObjectRequest presigned)]
    (u/uri (str url))))
(defn- del-object-in-bulk
  "Deletes the objects identified by `ids` from the backend bucket.

  Keys are derived from each id (`prefix` + `impl/id->path`). The ids
  are sent in batches of at most 1000 keys, the maximum a single S3
  DeleteObjects request accepts; an empty or nil `ids` therefore
  performs no request at all.

  Returns nil. Raises an `:s3-error` with code `:error-on-bulk-delete`
  (the reported errors are attached as `:context`) when any batch
  response reports errors; all batches are attempted before raising."
  [{:keys [bucket client prefix]} ids]
  (let [max-batch-size 1000
        ->oid          (fn [id]
                         (.. (ObjectIdentifier/builder)
                             (key (str prefix (impl/id->path id)))
                             (build)))
        ;; Deletes one batch and returns the seq of errors reported by
        ;; S3 for it, or nil when the batch was fully deleted.
        delete-batch   (fn [batch]
                         (let [oids (mapv ->oid batch)
                               delc (.. (Delete/builder)
                                        (objects ^Collection oids)
                                        (build))
                               dor  (.. (DeleteObjectsRequest/builder)
                                        (bucket bucket)
                                        (delete ^Delete delc)
                                        (build))
                               dres (.deleteObjects ^S3Client client
                                                    ^DeleteObjectsRequest dor)]
                           (when (.hasErrors ^DeleteObjectsResponse dres)
                             (seq (.errors ^DeleteObjectsResponse dres)))))
        errors         (into []
                             (mapcat delete-batch)
                             (partition-all max-batch-size ids))]
    (when (seq errors)
      (ex/raise :type :s3-error
                :code :error-on-bulk-delete
                :context (seq errors)))))