/
s3_utils.clj
120 lines (110 loc) · 5.64 KB
/
s3_utils.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
(ns onyx.plugin.s3-utils
(:import [com.amazonaws.auth AWSStaticCredentialsProvider BasicAWSCredentials]
[com.amazonaws.handlers AsyncHandler]
[com.amazonaws.regions RegionUtils]
[com.amazonaws.client.builder AwsClientBuilder$EndpointConfiguration]
[com.amazonaws.event ProgressListener$ExceptionReporter]
[com.amazonaws.services.s3.transfer TransferManager Upload]
[com.amazonaws.services.s3 AmazonS3Client AmazonS3ClientBuilder]
[com.amazonaws.services.s3.model ListObjectsRequest S3Object S3ObjectSummary S3ObjectInputStream PutObjectRequest GetObjectRequest ObjectMetadata]
[com.amazonaws.services.s3.transfer.internal S3ProgressListener]
[com.amazonaws.event ProgressEventType]
[java.io ByteArrayInputStream InputStreamReader BufferedReader]
[org.apache.commons.codec.digest DigestUtils]
[java.util.concurrent ExecutorService Executors]
[org.apache.commons.codec.binary Base64]))
(defn new-client
  "Builds an Amazon S3 client from keyword options (all optional).

    :access-key / :secret-key - when :access-key is non-nil, static
                                credentials are built from both values.
    :region                   - applied with .withRegion when no
                                :endpoint-url is given; otherwise it is
                                used as the signing region of the endpoint.
    :endpoint-url             - custom service endpoint.

  The SDK's client builder refuses to build when both a region and an
  endpoint configuration are set, so the two are never combined."
  ^AmazonS3Client
  [& {:keys [access-key secret-key region endpoint-url]}]
  (let [builder (cond-> (AmazonS3ClientBuilder/standard)
                  access-key
                  ^AmazonS3ClientBuilder (.withCredentials (AWSStaticCredentialsProvider. (BasicAWSCredentials. access-key secret-key)))

                  ;; Only set the region directly when no endpoint override exists;
                  ;; with an endpoint, the region travels inside the endpoint config.
                  (and region (not endpoint-url))
                  ^AmazonS3ClientBuilder (.withRegion ^String region)

                  endpoint-url
                  ^AmazonS3ClientBuilder (.withEndpointConfiguration (AwsClientBuilder$EndpointConfiguration. endpoint-url region)))]
    (.build ^AmazonS3ClientBuilder builder)))
(defn transfer-manager
  "Creates a TransferManager around `client`.

  With a non-nil `n-threads`, the manager is constructed with a fixed-size
  pool from Executors/newFixedThreadPool. When `n-threads` is nil (or the
  one-argument arity is used), the single-argument constructor is used."
  ^TransferManager
  ([^AmazonS3Client client]
   (TransferManager. client))
  ([^AmazonS3Client client n-threads]
   (if-not n-threads
     (transfer-manager client)
     (let [^ExecutorService pool (Executors/newFixedThreadPool n-threads)]
       (TransferManager. client pool)))))
(defn upload
  "Starts an upload of the byte array `serialized` to `bucket` under `key`
  through `transfer-manager` and returns the handle produced by
  TransferManager.upload.

  The object metadata always carries the byte length and the Base64-encoded
  MD5 digest of `serialized`. `content-type` is set only when non-nil.

  `encryption` selects the server-side encryption algorithm:
    :aes256 - ObjectMetadata/AES_256_SERVER_SIDE_ENCRYPTION
    :none   - no SSE algorithm is set
  Any other value throws ex-info with {:encryption encryption}."
  [^TransferManager transfer-manager ^String bucket ^String key
   ^bytes serialized ^String content-type encryption]
  (let [size (alength serialized)
        ;; Base64 output is pure ASCII; decode it explicitly instead of
        ;; relying on the platform default charset.
        md5 (String. (Base64/encodeBase64 (DigestUtils/md5 serialized)) "US-ASCII")
        sse-algorithm (case encryption
                        :aes256 ObjectMetadata/AES_256_SERVER_SIDE_ENCRYPTION
                        :none nil
                        (throw (ex-info "Unsupported encryption type."
                                        {:encryption encryption})))
        metadata (doto (ObjectMetadata.)
                   (.setContentLength size)
                   (.setContentMD5 md5))]
    (when content-type
      (.setContentType metadata content-type))
    (when sse-algorithm
      (.setSSEAlgorithm metadata sse-algorithm))
    (.upload transfer-manager
             (PutObjectRequest. bucket key (ByteArrayInputStream. serialized) metadata))))
(defn upload-synchronous
  "PUTs `serialized` to `bucket` under key `k` with AmazonS3Client.putObject
  and returns its result.

  The request metadata holds the Base64-encoded MD5 digest and the byte
  length of `serialized`. No content type or server-side encryption is set."
  [^AmazonS3Client client ^String bucket ^String k ^bytes serialized]
  (let [byte-count (alength serialized)
        digest-b64 (-> serialized DigestUtils/md5 Base64/encodeBase64 String.)
        object-metadata (ObjectMetadata.)]
    (.setContentMD5 object-metadata digest-b64)
    (.setContentLength object-metadata byte-count)
    (.putObject client bucket k (ByteArrayInputStream. serialized) object-metadata)))
(defn s3-object
  "Fetches the object `k` from `bucket` and returns the S3Object.

  When the optional `start-range` is supplied (non-nil), it is passed to
  GetObjectRequest.setRange so the GET starts at that byte offset."
  ^S3Object
  [^AmazonS3Client client ^String bucket ^String k & [start-range]]
  (let [request (GetObjectRequest. bucket k)]
    (when start-range
      (.setRange request start-range))
    (.getObject client request)))
(defn list-keys
  "Returns a vector of every key in `bucket` that starts with `prefix`.

  A single ListObjectsRequest is reused: after each truncated listing its
  marker is advanced to that listing's next marker and the request is sent
  again. Collection stops at the first non-truncated page."
  [^AmazonS3Client client ^String bucket ^String prefix]
  (let [request (doto (ListObjectsRequest.)
                  (.setBucketName bucket)
                  (.setPrefix prefix))]
    (loop [acc []
           page (.listObjects client request)]
      (let [acc (reduce (fn [v ^S3ObjectSummary summary]
                          (conj v (.getKey summary)))
                        acc
                        (.getObjectSummaries page))]
        (if-not (.isTruncated page)
          acc
          (do (.setMarker request (.getNextMarker page))
              (recur acc (.listObjects client request))))))))
(defn read-object
  "Downloads `k` from `bucket`, reads the whole body as a string and returns
  `(deserializer-fn body-string)`.

  The body is read through clojure.java.io/reader with its default
  encoding. slurp closes the reader, and with it the S3 content stream,
  once the body has been read."
  [deserializer-fn ^AmazonS3Client client ^String bucket ^String k]
  (let [^S3Object object (.getObject client bucket k)
        ^S3ObjectInputStream content (.getObjectContent object)]
    ;; No length-sized buffer is pre-allocated: slurp streams the body itself.
    (deserializer-fn (slurp (clojure.java.io/reader content)))))
(defn get-bucket-keys
  "Returns the keys of all objects in `bucket`, optionally restricted to
  keys beginning with `prefix`.

  Delegates to list-keys, which keeps re-requesting while the listing is
  truncated. A single listObjects call would return only the first page."
  ([^AmazonS3Client client ^String bucket]
   ;; A nil prefix lists the entire bucket.
   (get-bucket-keys client bucket nil))
  ([^AmazonS3Client client ^String bucket ^String prefix]
   (list-keys client bucket prefix)))
(defn retrieve-s3-results
  "Reads every object in `bucket` (optionally only keys starting with
  `prefix`) with read-object and lazily concatenates the results.

  Each call to `deserializer-fn` is expected to return a collection, since
  the results are combined with mapcat."
  ([client bucket deserializer-fn]
   (->> (get-bucket-keys client bucket)
        (mapcat #(read-object deserializer-fn client bucket %))))
  ([client bucket deserializer-fn prefix]
   (->> (get-bucket-keys client bucket prefix)
        (mapcat #(read-object deserializer-fn client bucket %)))))