-
Notifications
You must be signed in to change notification settings - Fork 11
/
core.clj
192 lines (171 loc) · 6.36 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
(ns envoy.core
(:require [cheshire.core :as json]
[clojure.data :refer [diff]]
[clojure.core.async :refer [go-loop go <! >! >!! alt! chan]]
[org.httpkit.client :as http]
[envoy.tools :as tools]
[clojure.string :as string])
(:import [java.util Base64]))
(defn- recurse [path]
(str path "?recurse"))
(defn- index-of [resp]
(-> resp
:headers
:x-consul-index))
(defn- with-ops [ops]
{:query-params (tools/remove-nils ops)})
(defn- read-index
([path]
(read-index path {}))
([path ops]
(-> (http/get path (with-ops ops))
index-of)))
(defn- fromBase64 [^String s]
(String. (.decode (Base64/getDecoder) s)))
(defn- read-values
([resp]
(read-values resp true))
([{:keys [body error status] :as resp} to-keys?]
(if (or error (not= status 200))
(throw (ex-info "failed to read from consul" (select-keys resp [:status :error])))
(into {}
(for [{:keys [Key Value]} (json/parse-string body true)]
[(if to-keys? (keyword Key) Key)
(when Value (fromBase64 Value))])))))
(defn- find-consul-node [hosts]
(let [at (atom -1)]
#(nth hosts (mod (swap! at inc)
(count hosts)))))
(defn url-builder
"Create an envoy kv-path builder"
[{:keys [hosts port secure?]
:or {hosts ["localhost"] port 8500 secure? false}
:as conf}]
(let [proto (if secure? "https://" "http://")
consul-node (find-consul-node hosts)]
(fn [& [path]]
(let [node (consul-node)]
(str proto node ":" port "/v1/kv" (when (seq path)
(str "/" (tools/clean-slash path))))))))
(defn put
([path v]
(put path v {}))
([path v ops]
;; (println "@(http/put" path (merge {:body v} (with-ops ops)))
(let [{:keys [status] :as resp} @(http/put path (merge {:body v}
(with-ops ops)))]
(when-not (= 200 status)
(throw (RuntimeException. (str "could not PUT to consul due to: " resp)))))))
(defn delete
([path]
(delete path {}))
([path ops]
@(http/delete (recurse path)
(with-ops ops))))
(defn get-all
([path]
(get-all path {}))
([path {:keys [keywordize?] :as ops
:or {keywordize? true}}]
(-> @(http/get (recurse (tools/with-slash path))
(with-ops (dissoc ops :keywordize?)))
(read-values keywordize?))))
(defn- start-watcher
([path fun stop?]
(start-watcher path fun stop? {}))
([path fun stop? ops]
(let [ch (chan)]
(go-loop [index nil current (get-all path)]
(http/get path
(with-ops (merge ops
{:index (or index (read-index path ops))}))
#(>!! ch %))
(alt!
stop? ([_]
(prn "stopping" path "watcher"))
ch ([resp]
(let [new-idx (index-of resp)
new-vs (read-values resp)]
(when (and index (not= new-idx index)) ;; first time there is no index
(when-let [changes (first (diff new-vs current))]
(fun changes)))
(recur new-idx new-vs))))))))
(defprotocol Stoppable
(stop [this]))
(deftype Watcher [ch]
Stoppable
(stop [_]
(>!! ch :done)))
(defn watch-path
([path fun]
(watch-path path fun {}))
([path fun ops]
(let [stop-ch (chan)]
(start-watcher (recurse path) fun stop-ch ops)
(Watcher. stop-ch))))
(defn consul->map
[path & [{:keys [serializer offset preserve-offset] :or {serializer :edn} :as ops}]]
(let [full-path (if offset
(tools/concat-with-slash path offset)
path)
consul-map (-> (partial get-all full-path (merge
(dissoc ops :serializer :offset :preserve-offset)
{:keywordize? false}))
(tools/props->map serializer))]
(if preserve-offset
consul-map
(get-in consul-map (tools/cpath->kpath offset)))))
(defn- overwrite-with
[kv-path m & [{:keys [serializer] :or {serializer :edn} :as ops}]]
(let [[consul-url sub-path] (string/split kv-path #"kv" 2)
update-kv-path (str consul-url "kv")
kpath (tools/cpath->kpath sub-path)
stored-map (reduce (fn [acc [k v]]
(merge acc (consul->map
(str kv-path "/" (name k))
{:serializer serializer})))
{} m)
;;to update correctly seq we need to pre-serialize map
[to-add to-remove _] (diff (tools/serialize-map m serializer)
(tools/serialize-map (get-in stored-map kpath) serializer))]
;;add
(doseq [[k v] (tools/map->props to-add serializer)]
(put (str kv-path "/" k) (str v) (dissoc ops :serializer :update)))
;;remove
(doseq [[k v] (tools/map->props to-remove serializer)]
(when (nil? (get-in to-add (tools/cpath->kpath k) nil))
@(http/delete (str kv-path "/" k))))))
(defn map->consul
[kv-path m & [{:keys [serializer overwrite?] :or {serializer :edn overwrite? false} :as ops}]]
(let [kv-path (tools/without-slash kv-path)]
(if-not overwrite?
(doseq [[k v] (tools/map->props m serializer)]
(put (str kv-path "/" k) (str v) (dissoc ops :serializer :update)))
(overwrite-with kv-path m ops))))
(defn copy
([path from to]
(copy path from to {}))
([path from to opts]
(let [data (consul->map path
(merge opts {:offset from}))
new-map (->> (tools/cpath->kpath to)
(tools/nest-map data))]
(map->consul path
new-map
opts))))
(defn move
([path from to]
(move path from to {}))
([path from to opts]
(let [dpath (str (tools/with-slash path)
(-> (tools/without-slash from {:slash :first})
(tools/with-slash)))]
(copy path from to opts)
(delete dpath opts))))
(defn merge-with-consul
([m path]
(merge-with-consul m path {}))
([m path ops]
(if-let [consul (consul->map path ops)]
(tools/merge-maps m consul)
m)))