/
s3transfer.clj
106 lines (93 loc) · 3.78 KB
/
s3transfer.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
(ns amazonica.aws.s3transfer
(:require [amazonica.core :refer [IMarshall marshall coerce-value stack->string]]
[amazonica.aws.s3])
(:import [com.amazonaws.event
ProgressEvent
ProgressEventType
ProgressListener]
[com.amazonaws.services.s3.transfer
Copy
Download
MultipleFileUpload
MultipleFileDownload
Upload
TransferManager
Transfer
TransferProgress]))
(defn- default-listener
  "Progress callback that reports the end of a transfer on *out*.

  `transfer` is a map holding a zero-argument fn under :wait-for-exception;
  `e` is an event map with an :event key.

  * :failed    - prints whatever the :wait-for-exception fn returns
  * :completed - prints \"Transfer complete.\"
  * otherwise  - prints nothing

  Always returns nil."
  [transfer e]
  (case (:event e)
    :failed    (println ((:wait-for-exception transfer)))
    :completed (println "Transfer complete.")
    ;; Any other (or missing) event type is ignored.
    nil))
(defn add-listener
  "Returns a one-argument fn for attaching progress callbacks to `obj`.

  The returned fn takes a callback, wraps it in a ProgressListener whose
  `progressChanged` passes the marshalled event to the callback, registers
  that listener on `obj`, and returns the listener instance (the same object
  that was registered)."
  [^Transfer obj]
  (fn [callback]
    (let [progress-listener (reify ProgressListener
                              (progressChanged [_ progress-event]
                                ;; Hand the callback marshalled data, not the raw SDK event.
                                (-> progress-event marshall callback)))]
      (.addProgressListener obj progress-listener)
      progress-listener)))
(defmacro wait
  "Expands to a zero-argument fn which, when invoked, first calls
  `.waitForCompletion` on `transfer` and then evaluates the `body` forms
  inside a `do`, returning the value of that `do`.

  Nothing is evaluated when the fn is created; the wait and the body run on
  every invocation."
  [transfer & body]
  ;; Built form: (fn [] (.waitForCompletion <transfer>) (do <body...>))
  (list `fn []
        (list '.waitForCompletion transfer)
        (cons 'do body)))
(defn transfer
  "Wraps the Transfer `obj` in a map of handles.

  :transfer is the object itself and :add-progress-listener is the fn
  produced by `add-listener`. :remove-progress-listener takes the listener
  to remove; every other entry is a zero-argument fn that performs the
  corresponding call on `obj` when invoked:

    :get-description     waits for completion, then returns the description
    :get-progress        marshalled result of .getProgress
    :get-state           the state rendered with `str`
    :is-done             result of .isDone
    :abort / :pause      invoked on `obj` as a Download when it is one,
                         otherwise as an Upload
    :wait-for-completion calls .waitForCompletion
    :wait-for-exception  result of .waitForException passed to stack->string"
  [^Transfer obj]
  (let [download? (instance? Download obj)]
    {:transfer                 obj
     :add-progress-listener    (add-listener obj)
     :get-description          (wait obj (.getDescription obj))
     :get-progress             (fn [] (-> obj .getProgress marshall))
     :get-state                (fn [] (-> obj .getState str))
     :is-done                  (fn [] (.isDone obj))
     ;; NOTE(review): anything that is not a Download is hinted as Upload here;
     ;; presumably other Transfer types fail the cast - confirm against callers.
     :abort                    (fn []
                                 (if download?
                                   (.abort ^Download obj)
                                   (.abort ^Upload obj)))
     :pause                    (fn []
                                 (if download?
                                   (.pause ^Download obj)
                                   (.pause ^Upload obj)))
     :remove-progress-listener (fn [listener]
                                 (.removeProgressListener obj ^ProgressListener listener))
     :wait-for-completion      (fn [] (.waitForCompletion obj))
     :wait-for-exception       (fn [] (-> obj .waitForException stack->string))}))
;; IMarshall implementations for the transfer-related SDK types used in this
;; namespace. Transfer handles (Upload, Copy, Download, MultipleFileUpload,
;; MultipleFileDownload) become the map built by `transfer` merged with
;; type-specific entries; TransferProgress and ProgressEvent become plain data
;; maps.
(extend-protocol IMarshall
  TransferProgress
  (marshall [obj]
    {:total-bytes-to-transfer (.getTotalBytesToTransfer obj)
     :bytes-transferred       (.getBytesTransferred obj)
     :percent-transferred     (.getPercentTransferred obj)})

  Upload
  (marshall [obj]
    ;; Build the base handle map exactly once and extend it, the same way the
    ;; Copy and Download implementations do.
    (let [t (transfer obj)]
      (merge t
             {;; one-argument fn; the argument is passed straight to .tryPause
              :try-pause     #(.tryPause obj %)
              ;; marshalled result of .waitForUploadResult
              :upload-result #(marshall (.waitForUploadResult obj))})))

  Copy
  (marshall [obj]
    (let [t (transfer obj)]
      (merge t
             {;; marshalled result of .waitForCopyResult
              :copy-result #(marshall (.waitForCopyResult obj))})))

  Download
  (marshall [obj]
    (let [t (transfer obj)]
      ;; Each accessor below waits for the download to complete before reading.
      (merge t
             {:key             (wait obj (.getKey obj))
              :bucket-name     (wait obj (.getBucketName obj))
              :object-metadata (wait obj (marshall (.getObjectMetadata obj)))})))

  MultipleFileUpload
  (marshall [obj]
    (merge (transfer obj)
           {:bucket-name #(.getBucketName obj)
            :key-prefix  #(.getKeyPrefix obj)}))

  MultipleFileDownload
  (marshall [obj]
    (merge (transfer obj)
           {;; replaces the :abort entry supplied by `transfer`
            :abort       #(.abort obj)
            :bucket-name #(.getBucketName obj)
            :key-prefix  #(.getKeyPrefix obj)}))

  ProgressEvent
  (marshall [obj]
    {:bytes-transferred (.getBytesTransferred obj)
     ;; Keyword for the event type; nil for any type not listed here.
     ;; The keywords (including the spelling of :transfered) are returned to
     ;; callers as-is.
     :event (condp = (.getEventType obj)
              ProgressEventType/TRANSFER_STARTED_EVENT        :started
              ProgressEventType/TRANSFER_COMPLETED_EVENT      :completed
              ProgressEventType/TRANSFER_FAILED_EVENT         :failed
              ProgressEventType/TRANSFER_CANCELED_EVENT       :cancelled
              ProgressEventType/TRANSFER_PREPARING_EVENT      :preparing
              ProgressEventType/REQUEST_BYTE_TRANSFER_EVENT   :transfered
              ProgressEventType/TRANSFER_PART_STARTED_EVENT   :part-started
              ProgressEventType/TRANSFER_PART_COMPLETED_EVENT :part-completed
              ProgressEventType/TRANSFER_PART_FAILED_EVENT    :part-failed
              nil)}))
;; Calls amazonica.core/set-client with the TransferManager class and the
;; current namespace (`*ns*`).
;; NOTE(review): presumably this exposes TransferManager's methods as functions
;; in this namespace - confirm against amazonica.core/set-client.
(amazonica.core/set-client TransferManager *ns*)