/
cache.clj
188 lines (168 loc) · 7.98 KB
/
cache.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
(ns tech.io.cache
"Small caching layer to handle persistent, ever growing file caching"
(:require [clojure.java.io :as io]
[tech.io.protocols :as io-prot]
[tech.io.url :as url])
(:import [java.io InputStream OutputStream]
[java.util Date]))
(defn- safe-doall-streams
  "Best-effort application of `op` to every item in `streams`.

  `op` is invoked on each item in order even when an earlier invocation throws.
  Once all items have been visited, the first Throwable that was caught (if any)
  is rethrown.  Returns nil when every invocation succeeds."
  [op streams]
  (let [failures (reduce (fn [caught stream]
                           (try
                             (op stream)
                             caught
                             (catch Throwable t
                               ;; Remember the failure but keep visiting the rest.
                               (conj caught t))))
                         []
                         streams)]
    ;; Only the earliest failure is surfaced to the caller.
    (when-let [first-failure (first failures)]
      (throw first-failure))))
(defn- combined-output-streams
  "Returns one OutputStream that forwards close, flush and write calls to every
  stream in `streams`.

  Each call goes through `safe-doall-streams`, so every underlying stream is
  attempted and the first failure is rethrown afterwards."
  ^OutputStream [streams]
  (let [broadcast! (fn [stream-op] (safe-doall-streams stream-op streams))]
    (proxy [OutputStream] []
      (close []
        (broadcast! (fn [sink] (.close ^OutputStream sink))))
      (flush []
        (broadcast! (fn [sink] (.flush ^OutputStream sink))))
      (write
        ;; Single-argument form is either write(byte[]) or write(int).
        ([payload]
         (broadcast!
          (if (bytes? payload)
            (fn [sink] (.write ^OutputStream sink ^bytes payload))
            (fn [sink] (.write ^OutputStream sink ^int payload)))))
        ;; write(byte[], off, len)
        ([buffer offset length]
         (broadcast! (fn [sink] (.write ^OutputStream sink buffer offset length))))))))
(defn- date-before?
  "True when `d1` is strictly earlier than `d2`, compared by the millisecond
  values returned from `.getTime`."
  [^Date d1 ^Date d2]
  (let [first-ms  (.getTime d1)
        second-ms (.getTime d2)]
    (> second-ms first-ms)))
(defn- maybe-cache-stream
  "Copies the object at `url-parts` from `src-provider` into `cache-provider`
  (at `cache-parts`) when the cached copy is missing or stale.

  A copy happens when either:
    * the cache provider reports that `cache-parts` does not exist, or
    * `::cache-check-metadata-on-read?` is truthy in `cache-options`, both
      providers report a `:modify-date`, and the cached date is strictly before
      the source date.

  When `::cache-check-metadata-on-read?` is falsey an existing cache entry is
  trusted as-is and the source provider is not contacted at all.  If either
  modification date is unavailable the entry cannot be shown to be stale, so the
  cached copy is kept.

  `options` are used for calls against the source provider; `cache-options` for
  calls against the cache provider.

  Returns the result of `put-object!` when a copy was made, otherwise nil."
  [url-parts options cache-parts cache-options cache-provider src-provider]
  (let [missing? (not (io-prot/exists? cache-provider cache-parts cache-options))
        ;; Staleness only matters for an entry that exists, and only when the
        ;; caller asked for metadata checks; this keeps source metadata lookups
        ;; off the hot path otherwise.
        stale? (and (not missing?)
                    (::cache-check-metadata-on-read? cache-options)
                    (let [src-modify (:modify-date
                                      (io-prot/metadata src-provider url-parts options))
                          ;; No point asking the cache when the source has no date.
                          cache-modify (when src-modify
                                         (:modify-date
                                          (io-prot/metadata cache-provider
                                                            cache-parts
                                                            cache-options)))]
                      (boolean
                       (and src-modify
                            cache-modify
                            (date-before? cache-modify src-modify)))))]
    (when (or missing? stale?)
      (io-prot/put-object! cache-provider cache-parts
                           (io-prot/get-object src-provider url-parts options)
                           cache-options))))
;;Provider built for static or append-only datasets. Very limited ability to handle
;;changing datasets. Also has no full-threshold; will write until cache-provider runs out of space.
(defrecord CacheProvider [url-parts->cache-parts-fn cache-provider src-provider default-options]
  ;; Fields:
  ;;   url-parts->cache-parts-fn - maps source url-parts to the url-parts used
  ;;                               inside the cache provider.
  ;;   cache-provider            - provider holding the cached copies.
  ;;   src-provider              - provider holding the authoritative objects.
  ;;   default-options           - merged underneath per-call options for every
  ;;                               call made against the cache provider.
  ;; Calls against src-provider always receive the caller's options unmerged.
  io-prot/IOProvider
  ;; Refreshes the cache entry if needed, then reads from the cache.
  (input-stream [provider url-parts options]
    (let [cache-parts (url-parts->cache-parts-fn url-parts)
          cache-options (merge default-options options)]
      (maybe-cache-stream url-parts options cache-parts cache-options cache-provider src-provider)
      ;; Read with the merged options, matching get-object and output-stream!.
      (io-prot/input-stream cache-provider cache-parts cache-options)))
  ;; Opens a stream into the cache; with ::cache-write-through? the returned
  ;; stream also writes to the source provider.
  (output-stream! [provider url-parts options]
    (let [cache-parts (url-parts->cache-parts-fn url-parts)
          cache-options (merge default-options options)
          ^OutputStream cache-output-stream (io-prot/output-stream!
                                             cache-provider
                                             cache-parts
                                             cache-options)]
      (if (::cache-write-through? cache-options)
        (let [src-output-stream
              (try
                (io-prot/output-stream! src-provider url-parts options)
                (catch Throwable e
                  ;; The cache stream is already open; close it so it is not
                  ;; leaked, without masking the original failure.
                  (try
                    (.close cache-output-stream)
                    (catch Throwable close-error
                      (.addSuppressed e close-error)))
                  (throw e)))]
          (combined-output-streams [src-output-stream cache-output-stream]))
        cache-output-stream)))
  ;; True when either the cache or the source has the object.
  (exists? [provider url-parts options]
    (let [cache-parts (url-parts->cache-parts-fn url-parts)
          cache-options (merge default-options options)]
      (or (io-prot/exists? cache-provider
                           cache-parts
                           cache-options)
          (io-prot/exists? src-provider
                           url-parts
                           options))))
  ;; Listing is answered by the source provider only.
  (ls [provider url-parts options]
    (io-prot/ls src-provider
                url-parts
                options))
  ;; Deletes from both source and cache; both deletions are attempted even if
  ;; the first one throws, and the first failure is rethrown afterwards.
  (delete! [provider url-parts options]
    (let [cache-parts (url-parts->cache-parts-fn url-parts)
          cache-options (merge default-options options)]
      (safe-doall-streams (fn [[target target-parts target-options]]
                            (io-prot/delete! target
                                             target-parts
                                             target-options))
                          [[src-provider url-parts options]
                           [cache-provider cache-parts cache-options]])))
  ;; Cache metadata wins; the source is consulted only when the cache returns
  ;; nil/false.
  (metadata [provider url-parts options]
    (or (io-prot/metadata cache-provider
                          (url-parts->cache-parts-fn url-parts)
                          (merge default-options options))
        (io-prot/metadata src-provider url-parts options)))
  io-prot/ICopyObject
  ;; Refreshes the cache entry if needed, then returns the cached object.
  (get-object [provider url-parts options]
    (let [cache-parts (url-parts->cache-parts-fn url-parts)
          cache-options (merge default-options options)]
      (maybe-cache-stream url-parts options cache-parts cache-options cache-provider src-provider)
      (io-prot/get-object cache-provider cache-parts cache-options)))
  ;; Writes to the source first when ::cache-write-through? is set, then to the
  ;; cache.  Returns the result of the cache write.
  (put-object! [provider url-parts value options]
    (let [cache-options (merge default-options options)]
      (when (::cache-write-through? cache-options)
        (io-prot/put-object! src-provider url-parts value options))
      (io-prot/put-object! cache-provider
                           (url-parts->cache-parts-fn url-parts)
                           value
                           cache-options)))
  io-prot/IUrlRedirect
  ;; Translates a source url into the url of its cache location.
  (url->redirect-url [provider url]
    (-> url
        url/url->parts
        url-parts->cache-parts-fn
        url/parts->url)))
;;Generically forward everything to wherever the url points.
(defrecord ForwardingProvider [url-parts->provider default-options]
  ;; Every method resolves the concrete provider for the given url-parts via
  ;; `url-parts->provider` and delegates the same call to it, passing
  ;; `default-options` merged underneath the caller's options.
  io-prot/IOProvider
  (input-stream [provider url-parts options]
    (let [target (url-parts->provider url-parts)
          merged-options (merge default-options options)]
      (io-prot/input-stream target url-parts merged-options)))
  (output-stream! [provider url-parts options]
    (let [target (url-parts->provider url-parts)
          merged-options (merge default-options options)]
      (io-prot/output-stream! target url-parts merged-options)))
  (exists? [provider url-parts options]
    (let [target (url-parts->provider url-parts)
          merged-options (merge default-options options)]
      (io-prot/exists? target url-parts merged-options)))
  (ls [provider url-parts options]
    (let [target (url-parts->provider url-parts)
          merged-options (merge default-options options)]
      (io-prot/ls target url-parts merged-options)))
  (delete! [provider url-parts options]
    (let [target (url-parts->provider url-parts)
          merged-options (merge default-options options)]
      (io-prot/delete! target url-parts merged-options)))
  (metadata [provider url-parts options]
    (let [target (url-parts->provider url-parts)
          merged-options (merge default-options options)]
      (io-prot/metadata target url-parts merged-options)))
  io-prot/ICopyObject
  (get-object [provider url-parts options]
    (let [target (url-parts->provider url-parts)
          merged-options (merge default-options options)]
      (io-prot/get-object target url-parts merged-options)))
  (put-object! [provider url-parts value options]
    (let [target (url-parts->provider url-parts)
          merged-options (merge default-options options)]
      (io-prot/put-object! target url-parts value merged-options))))
(defn forwarding-provider
  "Builds a ForwardingProvider from keyword arguments.

  `:url-parts->provider` selects the function used to resolve a provider from
  url-parts and defaults to `io-prot/url-parts->provider`.  All remaining keyword
  arguments become the provider's default options."
  [& {:keys [url-parts->provider]
      :or {url-parts->provider io-prot/url-parts->provider}
      :as options}]
  (let [default-options (dissoc options :url-parts->provider)]
    (->ForwardingProvider url-parts->provider default-options)))
(defn url-parts->file-cache
  "Maps `url-parts` to the url-parts of a file:// location under `cache-dir`,
  laid out as <cache-dir>/<protocol>/<path>."
  [cache-dir url-parts]
  (let [protocol-dir (name (:protocol url-parts))
        relative-path (url/string-seq->file-path (:path url-parts))]
    (-> (str "file://" cache-dir "/" protocol-dir "/" relative-path)
        url/url->parts)))
(defn create-file-cache
  "Creates a CacheProvider that stores cached copies on the local filesystem
  under `cache-dir`.

  Recognised keys in the options map:
    :src-provider                   - provider to read originals from; a fresh
                                      forwarding provider is used when absent.
    ::cache-check-metadata-on-read? - defaults to true when not supplied.
    ::cache-write-through?          - defaults to true when not supplied.

  The whole options map, with the two flags resolved, becomes the provider's
  default options."
  [cache-dir {:keys [src-provider
                     ::cache-check-metadata-on-read?
                     ::cache-write-through?]
              :or {cache-check-metadata-on-read? true
                   cache-write-through? true}
              :as cache-options}]
  (let [resolved-flags {::cache-check-metadata-on-read? cache-check-metadata-on-read?
                        ::cache-write-through? cache-write-through?}
        default-options (merge cache-options resolved-flags)
        source (or src-provider (forwarding-provider))
        cache-parts-fn (partial url-parts->file-cache cache-dir)]
    (->CacheProvider cache-parts-fn
                     (forwarding-provider)
                     source
                     default-options)))