/
core.clj
205 lines (164 loc) · 6.49 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
(ns konserve-carmine.core
(:require [konserve.serializers :as ser]
[hasch.core :refer [uuid]]
[clojure.core.async :as async
:refer [<!! chan go close! put!]]
[konserve.protocols :refer [PEDNAsyncKeyValueStore
-exists? -get-in -update-in -dissoc
PBinaryAsyncKeyValueStore -bget -bassoc
-serialize -deserialize]]
[taoensso.carmine :as car])
(:import [java.io ByteArrayInputStream ByteArrayOutputStream]))
;; TODO: document how Redis guarantees that writes are fsynced in message
;; order (i.e. it never loses intermediate writes) on a single peer.
;; Redis-backed konserve store built on Carmine.
;;
;; Fields:
;;   conn            Carmine connection options map handed to `car/wcar`.
;;   serializer      konserve serializer used via `-serialize`/`-deserialize`.
;;   read-handlers   handlers passed to `-deserialize`.
;;   write-handlers  handlers passed to `-serialize`.
;;   locks           not used by any method in this record.
;;
;; Every value is stored under the Redis key `(str (uuid key))`. EDN values are
;; stored as the serialized pair `[key-vec value]`; readers take `second` of it.
;; All methods return a core.async channel. Failures inside the guarded
;; sections are delivered on that channel as `ex-info` values, not thrown.
(defrecord CarmineStore [conn serializer read-handlers write-handlers locks]
  PEDNAsyncKeyValueStore

  ;; Channel yielding true when Redis holds an entry for `key`, else false.
  (-exists? [this key]
    (let [id  (str (uuid key))
          res (chan)]
      (put! res (= (car/wcar conn (car/exists id)) 1))
      (close! res)
      res))

  ;; Channel yielding the value at `key-vec` (first element selects the Redis
  ;; entry, the rest is a path into the stored value). Yields nil (closed
  ;; channel) when the entry or path is absent, or an ex-info with
  ;; :type :read-error when fetching/deserializing fails.
  (-get-in [this key-vec]
    (let [[fkey & rkey] key-vec
          id (str (uuid fkey))]
      (if-not (= (car/wcar conn (car/exists id)) 1)
        (go nil)
        (let [res-ch (chan)]
          (try
            (let [bais (ByteArrayInputStream.
                        (car/wcar conn (car/parse-raw (car/get id))))
                  res  (get-in (second (-deserialize serializer read-handlers bais))
                               rkey)]
              ;; Only nil cannot be put on a channel; a stored `false` is a
              ;; legitimate value and must be delivered.
              (when-not (nil? res)
                (put! res-ch res)))
            res-ch
            (catch Exception e
              (put! res-ch (ex-info "Could not read key."
                                    {:type :read-error
                                     :key fkey
                                     :exception e}))
              res-ch)
            (finally
              (close! res-ch)))))))

  (-update-in [this key-vec up-fn] (-update-in this key-vec up-fn []))

  ;; Applies `up-fn` (with extra `args`) to the value at `key-vec` and writes
  ;; the result back. Channel yields `[old-value new-value]` at that path, or
  ;; an ex-info with :type :write-error on failure.
  (-update-in [this key-vec up-fn args]
    (let [[fkey & rkey] key-vec
          id     (str (uuid fkey))
          res-ch (chan)]
      (try
        (let [;; Fetch once and reuse the bytes, so the presence check and the
              ;; decoded value always refer to the same read.
              old-bin (car/wcar conn (car/parse-raw (car/get id)))
              old     (when old-bin
                        (second (-deserialize serializer read-handlers
                                              (ByteArrayInputStream. old-bin))))
              new     (if (empty? rkey)
                        (apply up-fn old args)
                        (apply update-in old rkey up-fn args))
              baos    (ByteArrayOutputStream.)]
          (-serialize serializer baos write-handlers [key-vec new])
          (car/wcar conn (car/set id (car/raw (.toByteArray baos))))
          (put! res-ch [(get-in old rkey)
                        (get-in new rkey)]))
        res-ch
        (catch Exception e
          (put! res-ch (ex-info "Could not write key."
                                {:type :write-error
                                 :key fkey
                                 :exception e}))
          res-ch)
        (finally
          (close! res-ch)))))

  ;; Sets the value at `key-vec` to `val`; same result channel as -update-in.
  (-assoc-in [this key-vec val] (-update-in this key-vec (fn [_] val)))

  ;; Deletes the entry for `key`. Channel closes without a value on success,
  ;; or yields an ex-info with :type :write-error on failure.
  (-dissoc [this key]
    (let [id     (str (uuid key))
          res-ch (chan)]
      (try
        (car/wcar conn (car/del id))
        (catch Exception e
          (put! res-ch (ex-info "Could not delete key."
                                {:type :write-error
                                 :key key
                                 :exception e})))
        (finally
          (close! res-ch)))
      res-ch))

  PBinaryAsyncKeyValueStore

  ;; Reads the raw bytes stored for `key` and calls `locked-cb` with
  ;; {:input-stream <ByteArrayInputStream> :size <byte count>}. Channel yields
  ;; the callback's result, nil when the key is absent, or an ex-info with
  ;; :type :read-error on failure.
  (-bget [this key locked-cb]
    ;; Same string key form as the EDN methods, so -exists? and -dissoc
    ;; address binary values too.
    (let [id (str (uuid key))]
      (if-not (= (car/wcar conn (car/exists id)) 1)
        (go nil)
        (go
          (try
            (let [bin  (car/wcar conn (car/parse-raw (car/get id)))
                  bais (ByteArrayInputStream. bin)]
              (locked-cb {:input-stream bais
                          :size (count bin)}))
            (catch Exception e
              (ex-info "Could not read key."
                       {:type :read-error
                        :key key
                        :exception e})))))))

  ;; Stores `input` unmodified (via `car/raw`) under `key`. Channel yields nil
  ;; on success or an ex-info with :type :write-error on failure.
  (-bassoc [this key input]
    (let [id (str (uuid key))]
      (go
        (try
          (car/wcar conn (car/set id (car/raw input)))
          nil
          (catch Exception e
            (ex-info "Could not write key."
                     {:type :write-error
                      :key key
                      :exception e})))))))
(defn new-carmine-store
  "Creates a CarmineStore and returns a channel that yields it.

  With no arguments, connects using the connection map {:pool {} :spec {}}.
  Otherwise `carmine-conn` is the Carmine connection map, optionally followed
  by keyword options:

    :serializer      defaults to (ser/fressian-serializer)
    :read-handlers   defaults to a fresh (atom {})
    :write-handlers  defaults to a fresh (atom {})

  The store's :locks field is always a fresh (atom {})."
  ([]
   (new-carmine-store {:pool {} :spec {}}))
  ([carmine-conn & {:keys [serializer read-handlers write-handlers]
                    :or   {serializer     (ser/fressian-serializer)
                           read-handlers  (atom {})
                           write-handlers (atom {})}}]
   (go
     ;; Positional constructor; argument order follows the record's field
     ;; declaration: conn, serializer, read-handlers, write-handlers, locks.
     (->CarmineStore carmine-conn
                     serializer
                     read-handlers
                     write-handlers
                     (atom {})))))
;; REPL scratchpad: never evaluated when the namespace loads. Evaluate the
;; forms one at a time against a running Redis server.
(comment
  ;; --- exercising the store ---------------------------------------------
  (def store (<!! (new-carmine-store)))

  ;; Rough write timing: 1000 keys, each holding a 1024-element seq.
  (let [numbers (doall (range 1024))]
    (time
     (doseq [i (range 1000)]
       (<!! (-update-in store [i] (fn [_] numbers))))))

  (<!! (-get-in store [100]))

  (drop 2 (byte-array [1 2 3]))

  (<!! (-exists? store "bars"))

  (<!! (-update-in store ["bars"] (fn [_] 1)))
  (<!! (-update-in store ["bars"] inc))
  (<!! (-get-in store ["bars"]))

  ;; Binary round trip.
  (<!! (-bassoc store "bbar" (byte-array (range 5))))
  (<!! (-bget store "bbar" (fn [{:keys [input-stream]}]
                             (map byte (slurp input-stream)))))

  ;; --- talking to Carmine directly --------------------------------------
  ; See `wcar` docstring for opts
  (def conn {:pool {} :spec {}})
  ;; Uses the `conn` defined just above.
  (defmacro wcar* [& body] `(car/wcar conn ~@body))

  (map byte (car/wcar conn (car/parse-raw (car/get "foo"))))
  ;; => (98 97 114)

  (car/wcar conn (car/parse-raw (car/set "foo" (byte-array [1 2 3]))))

  (wcar* (car/ping))

  (wcar*
   (car/ping)
   (car/set "foo" "bar")
   (car/get "foo"))

  (wcar* (car/set "clj-key" {:bigint (bigint 31415926535897932384626433832795)
                             :vec (vec (range 5))
                             :set #{true false :a :b :c :d}
                             :bytes (byte-array 5)
                             ;; ...
                             })
         (car/get "clj-key"))

  (wcar*
   (car/swap "clj-key" (fn [old nx?]
                         (let [new (assoc old :foo :bar)]
                           (println old)
                           [new [old new]]))))

  (wcar*
   (car/exists "foo"))
  )