This repository has been archived by the owner on Mar 22, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 17
/
syncer.clj
327 lines (308 loc) · 18.8 KB
/
syncer.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
;;
;; Copyright (c) Two Sigma Open Source, LLC
;;
;; Licensed under the Apache License, Version 2.0 (the "License");
;; you may not use this file except in compliance with the License.
;; You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
;;
(ns token-syncer.commands.syncer
(:require [clojure.pprint :as pp]
[clojure.set :as set]
[clojure.string :as str]
[clojure.tools.logging :as log]
[plumbing.core :as pc]
[token-syncer.utils :as utils]))
(defn retrieve-token->url->token-data
  "Loads every token in all-tokens from every cluster in cluster-urls by calling the
   load-token function found in the provided api map as (load-token cluster-url token).
   Returns a nested map shaped as token->cluster-url->token-data, where token-data is
   whatever load-token returned for that cluster and token."
  [{:keys [load-token]} cluster-urls all-tokens]
  (letfn [(load-from-all-clusters [token]
            ;; cluster-url -> token-data for a single token
            (into {}
                  (map (fn [cluster-url]
                         [cluster-url (load-token cluster-url token)]))
                  cluster-urls))]
    (into {}
          (map (fn [token]
                 [token (load-from-all-clusters token)]))
          all-tokens)))
(defn retrieve-token->latest-description
  "Given token->cluster-url->token-data, picks for every token the cluster entry with the
   largest \"last-update-time\" in its :description (a missing time counts as 0).
   The entries are visited in reverse sorted order so that the choice among ties is deterministic.
   Each token maps to {:cluster-url ... :description ... :token-etag ...}, or to an empty map
   when there is no cluster data for the token."
  [token->cluster-url->token-data]
  (let [last-update-time (fn [entry]
                           (get-in (val entry) [:description "last-update-time"] 0))
        latest-entry (fn [cluster-url->token-data]
                       (if (seq cluster-url->token-data)
                         (let [candidates (reverse (sort cluster-url->token-data))
                               [latest-url latest-data] (apply max-key last-update-time candidates)]
                           {:cluster-url latest-url
                            :description (:description latest-data)
                            :token-etag (:token-etag latest-data)})
                         ;; no cluster data available for this token
                         {}))]
    (pc/map-vals latest-entry token->cluster-url->token-data)))
(defn hard-delete-token-on-all-clusters
  "Invokes hard-delete-token (taken from the provided api map) for the token on every url in
   cluster-urls, passing along the token-etag.
   Returns a map of cluster-url -> {:code ... :details ...} where the code is
   :success/hard-delete or :error/hard-delete. Exceptions thrown while deleting on a cluster
   are logged and reported as :error/hard-delete with the exception message in the details."
  [{:keys [hard-delete-token]} cluster-urls token token-etag]
  (log/info "hard-delete" token "on clusters" cluster-urls)
  (letfn [(delete-on-cluster [cluster-url]
            (try
              (let [{:keys [headers status] :as response} (hard-delete-token cluster-url token token-etag)
                    deleted? (utils/successful? response)]
                (if deleted?
                  ;; the etag is only reported for successful deletes
                  {:code :success/hard-delete
                   :details {:status status
                             :etag (get headers "etag")}}
                  {:code :error/hard-delete
                   :details {:status status}}))
              (catch Exception ex
                (log/error ex "unable to delete" token "on" cluster-url)
                {:code :error/hard-delete
                 :details {:message (.getMessage ex)}})))]
    (into {}
          (map (fn [cluster-url]
                 [cluster-url (delete-on-cluster cluster-url)]))
          cluster-urls)))
;; Computes, for every url in cluster-urls, a sync result map of the form {:code ... :details ...}
;; and returns them as a map keyed by cluster url.
;; store-token is only invoked when the description on a cluster differs from
;; latest-token-description and none of the earlier cond clauses applies.
(defn sync-token-on-clusters
"Syncs a given token description on all clusters.
If the cluster-url->token-data says that a given token was not successfully loaded, it is skipped.
Token sync-ing is also skipped if the tokens are active and the roots are different."
[{:keys [store-token]} cluster-urls token latest-token-description opt-out-metadata-name cluster-url->token-data]
(pc/map-from-keys
(fn [cluster-url]
(let [;; don't include "deleted" system metadata key, this must be considered for determining root-mismatch
ignored-root-mismatch-equality-comparison-keys ["cluster" "last-update-time" "last-update-user" "previous" "root"]
cluster-result
(try
;; description, error and status come from the token data previously loaded for this cluster
(let [{:keys [description error status] :as token-data} (get cluster-url->token-data cluster-url)
{latest-root "root" latest-update-user "last-update-user"} latest-token-description
{cluster-root "root" cluster-update-user "last-update-user"} description
{:keys [code] :as result}
;; the clauses below are order-dependent: the first truthy test determines the result
(cond
;; the loaded token data carries an :error, report its message
error
{:code :error/token-read
:details {:message (.getMessage error)}}
;; the loaded token data has no :status
(nil? status)
{:code :error/token-read
:details {:message "status missing from response"}}
;; the latest description has no "root" entry
(nil? latest-root)
{:code :error/token-read
:details {:message "token root missing from latest token description"}}
;; the cluster already has exactly the latest description, nothing to do
(and latest-root
(= latest-token-description description))
{:code :success/token-match}
;; deleted on both clusters, irrespective of remaining values
(and (get latest-token-description "deleted")
(get description "deleted"))
{:code :error/tokens-deleted
:details {:message "soft-deleted tokens should have already been hard-deleted"}}
;; opt-out is only considered when an opt-out metadata name was provided
(and (some? opt-out-metadata-name)
;; opt-out of syncing if any of the descriptions signal opt-out intent
(or (= "true" (get-in latest-token-description ["metadata" opt-out-metadata-name]))
(= "true" (get-in description ["metadata" opt-out-metadata-name]))))
{:code :success/skip-opt-out}
;; token user-specified content (including "deleted") is different, and the last update user or root is different
;; NOTE(review): the code below requires BOTH the root and the last update user to differ,
;; whereas the comment above says "or" - confirm which one is intended
(and (seq description)
(not= latest-root cluster-root)
(not= latest-update-user cluster-update-user)
(not= (apply dissoc description ignored-root-mismatch-equality-comparison-keys)
(apply dissoc latest-token-description ignored-root-mismatch-equality-comparison-keys)))
{:code :error/root-mismatch
:details {:cluster description
:latest latest-token-description}}
;; the descriptions differ and no earlier clause applied: push the latest description to this cluster
(not= latest-token-description description)
(do
;; log case where there is a root mismatch or editor mismatch but the user editable fields are the same
(when (and (or (not= latest-root cluster-root)
(not= latest-update-user cluster-update-user))
(= (apply dissoc latest-token-description ignored-root-mismatch-equality-comparison-keys)
(apply dissoc description ignored-root-mismatch-equality-comparison-keys)))
(log/info "syncing token with mismatched editor or root" {:current-token-description description
:latest-token-description latest-token-description}))
;; attempt to update the current token with the latest-token-description
;; the etag passed to store-token is the one loaded for this cluster
(let [token-etag (:token-etag token-data)
{:keys [headers status] :as response} (store-token cluster-url token token-etag latest-token-description)]
;; storing a description marked as "deleted" is reported as a soft-delete, anything else as a sync-update
{:code (if (get latest-token-description "deleted")
(if (utils/successful? response) :success/soft-delete :error/soft-delete)
(if (utils/successful? response) :success/sync-update :error/sync-update))
;; the etag from the response headers is only included for successful responses
:details (cond-> {:status status}
(utils/successful? response) (assoc :etag (get headers "etag")))}))
;; fallback when none of the clauses above applied
:else
{:code :success/token-match})]
;; log full token descriptions when there was a syncing error detected
(when (= (namespace code) "error")
(log/error "error when syncing token descriptions: " {:code code
:current-token-description description
:latest-token-description latest-token-description}))
result)
;; any exception raised above (including from store-token) is reported as :error/token-sync
(catch Exception ex
(log/error ex "unable to sync token on" cluster-url)
{:code :error/token-sync
:details {:message (.getMessage ex)}}))]
(log/info cluster-url "sync result is" cluster-result)
cluster-result))
cluster-urls))
(defn- perform-token-syncs
  "Loads all-tokens from every cluster in cluster-urls (expected to be a set, since disj is
   applied to it), determines the latest description of each token and then either
   - hard-deletes the token on all clusters when every cluster reports it as deleted; or
   - syncs the latest description to all clusters except the one it was taken from.
   Returns a map of token -> {:latest ... :sync-result ...}."
  [waiter-api cluster-urls all-tokens opt-out-metadata-name]
  (let [token->url->token-data (retrieve-token->url->token-data waiter-api cluster-urls all-tokens)
        token->latest-description (retrieve-token->latest-description token->url->token-data)
        soft-deleted? (fn [token-data]
                        (get-in token-data [:description "deleted"]))
        sync-token (fn [token]
                     (log/info "syncing token:" token)
                     (let [latest (token->latest-description token)
                           {source-url :cluster-url latest-description :description} latest
                           url->token-data (token->url->token-data token)
                           source-etag (get-in url->token-data [source-url :token-etag])
                           target-urls (disj cluster-urls source-url)
                           all-soft-deleted (every? soft-deleted? (vals url->token-data))]
                       (log/info "syncing" token "with token description from" source-url {:all-soft-deleted all-soft-deleted})
                       {:latest latest
                        :sync-result (if all-soft-deleted
                                       (hard-delete-token-on-all-clusters waiter-api cluster-urls token source-etag)
                                       (sync-token-on-clusters waiter-api target-urls token latest-description
                                                               opt-out-metadata-name url->token-data))}))]
    (pc/map-from-keys sync-token all-tokens)))
(defn load-and-classify-tokens
  "Calls load-token-list for every url in cluster-urls-set and classifies the tokens found in
   the returned index entries. Returns a map with the following keys:
   - :all-tokens, the set of non-nil \"token\" values seen on any cluster;
   - :synced-tokens, the tokens whose index entry on every cluster has \"deleted\" equal to false
     and carries the same non-nil \"etag\"; and
   - :pending-tokens, all remaining tokens."
  [load-token-list cluster-urls-set]
  (let [cluster-url->index-entries (pc/map-from-keys load-token-list cluster-urls-set)
        index-by-token (fn [index-entries]
                         (pc/map-from-vals #(get % "token") index-entries))
        cluster-url->token->index-entry (pc/map-vals index-by-token cluster-url->index-entries)
        all-tokens (into #{}
                         (comp cat (keep #(get % "token")))
                         (vals cluster-url->index-entries))
        synced? (fn [token]
                  (let [entry-on (fn [cluster-url]
                                   (select-keys (get-in cluster-url->token->index-entry [cluster-url token])
                                                ["deleted" "etag"]))
                        cluster-data (map entry-on cluster-urls-set)
                        cluster-deleted (map #(get % "deleted") cluster-data)
                        cluster-etags (map #(get % "etag") cluster-data)
                        already-synced? (and (every? false? cluster-deleted)
                                             (not-any? nil? cluster-etags)
                                             (= 1 (count (set cluster-etags))))]
                    (if already-synced?
                      (log/info token "already synced across clusters with etag" (first cluster-etags))
                      (log/info token "not synced across clusters" {:deleted cluster-deleted :etags cluster-etags}))
                    already-synced?))
        synced-tokens (into #{} (filter synced?) all-tokens)]
    (log/info "found" (count all-tokens) "across the clusters," (count synced-tokens) "previously synced")
    {:all-tokens all-tokens
     :pending-tokens (set/difference all-tokens synced-tokens)
     :synced-tokens synced-tokens}))
(defn summarize-sync-result
  "Builds a summary of the token sync result.
   Under :sync the processed tokens are partitioned into sorted sets:
   - :unmodified, tokens where every cluster result has code :success/token-match;
   - :updated, tokens where every cluster result has a code in the success namespace, excluding
     the unmodified ones; and
   - :failed, all remaining processed tokens.
   Under :tokens each of the pending, previously synced, processed, selected and total token
   collections is reported as {:count ... :value ...} with the value being a sorted set."
  [{:keys [all-tokens already-synced-tokens pending-tokens selected-tokens]} token-sync-result]
  (letfn [(cluster-results [token]
            (vals (get-in token-sync-result [token :sync-result])))
          (tokens-where-all [result-pred]
            (into (sorted-set)
                  (filter (fn [token] (every? result-pred (cluster-results token))))
                  (keys token-sync-result)))
          (token-match? [result]
            (= :success/token-match (:code result)))
          (success? [result]
            (= "success" (namespace (:code result))))
          (count-and-values [tokens]
            {:count (count tokens)
             :value (into (sorted-set) (set tokens))})]
    (let [processed-tokens (keys token-sync-result)
          unmodified-tokens (tokens-where-all token-match?)
          updated-tokens (into (sorted-set)
                               (set/difference (tokens-where-all success?) unmodified-tokens))
          failed-tokens (into (sorted-set)
                              (set/difference (set processed-tokens) unmodified-tokens updated-tokens))]
      {:sync {:failed failed-tokens
              :unmodified unmodified-tokens
              :updated updated-tokens}
       :tokens {:pending (count-and-values pending-tokens)
                :previously-synced (count-and-values already-synced-tokens)
                :processed (count-and-values processed-tokens)
                :selected (count-and-values selected-tokens)
                :total (count-and-values all-tokens)}})))
(defn sync-tokens
  "Syncs tokens across the clusters identified by cluster-urls.
   The tokens are classified using load-token-list from the provided api map, and at most
   limit of the pending tokens (taken in sorted order) are synced.
   Returns a map with the per-token sync results under :details and the result of
   summarize-sync-result under :summary.
   Any throwable raised while syncing is logged and re-thrown."
  [{:keys [load-token-list] :as waiter-api} cluster-urls limit opt-out-metadata-name]
  (try
    (log/info "syncing tokens on clusters:" cluster-urls)
    (let [unique-cluster-urls (set cluster-urls)
          classification (load-and-classify-tokens load-token-list unique-cluster-urls)
          {:keys [all-tokens pending-tokens synced-tokens]} classification
          selected-tokens (take limit (sort pending-tokens))
          token-sync-result (perform-token-syncs waiter-api unique-cluster-urls selected-tokens opt-out-metadata-name)
          token-groups {:all-tokens all-tokens
                        :already-synced-tokens synced-tokens
                        :pending-tokens pending-tokens
                        :selected-tokens selected-tokens}]
      (log/info "completed syncing tokens (limited to " (min limit (count pending-tokens)) "tokens)")
      {:details token-sync-result
       :summary (summarize-sync-result token-groups token-sync-result)})
    (catch Throwable th
      (log/error th "unable to sync tokens")
      (throw th))))
(defn valid-metadata-name?
  "Validates that a non-nil input passes as a valid metadata name:
   - contains fewer than 100 characters; and
   - made up of lower-case letters, numbers, and hyphens and must start with a letter.
   A nil input is accepted and yields true."
  [metadata-name]
  (or (nil? metadata-name)
      (and (< (count metadata-name) 100)
           ;; re-matches only succeeds when the whole input matches, so no anchors are needed.
           ;; The hyphen is placed last in the character class, where it is a literal and needs
           ;; no escaping; escaping it as \\- in a regex literal would also admit a backslash.
           (some? (re-matches #"[a-z][a-z0-9-]*" metadata-name)))))
;; Configuration of the sync-clusters command:
;; - :execute-command runs the sync across the cluster urls provided as arguments and reports
;;   exit code 0 when no token failed to sync and 1 otherwise (or when fewer than two distinct
;;   cluster urls were provided);
;; - :option-specs describes the supported command line options; and
;; - :retrieve-documentation provides the description and usage text of the command.
(def sync-clusters-config
  {:execute-command (fn execute-sync-clusters-command
                      [{:keys [waiter-api]} {:keys [options]} arguments]
                      (let [{:keys [limit opt-out-metadata-name]} options
                            cluster-urls-set (set arguments)]
                        (if (<= (count cluster-urls-set) 1)
                          ;; syncing needs at least two distinct clusters
                          {:exit-code 1
                           :message (str "at least two different cluster urls required, provided: " (vec arguments))}
                          (let [{:keys [details summary]} (sync-tokens waiter-api cluster-urls-set limit opt-out-metadata-name)
                                ;; any token that failed to sync results in a non-zero exit code
                                exit-code (if (empty? (get-in summary [:sync :failed])) 0 1)]
                            (log/info "sync details:" (into (sorted-map) details))
                            (log/info "sync summary:" (into (sorted-map) summary))
                            {:exit-code exit-code
                             :message (str "exiting with code " exit-code)}))))
   :option-specs [["-l" "--limit LIMIT" "The maximum number of tokens to attempt to sync, must be between 1 and 10000"
                   :default 1000
                   :parse-fn #(Integer/parseInt %)
                   :validate [#(< 0 % 10001) "Must be between 1 and 10000"]]
                  ["-m" "--opt-out-metadata-name NAME"
                   (str "The metadata field that must be configured to 'true' on the latest description to "
                        "trigger opting out of syncing, default is all tokens opt in to syncing")
                   :default nil
                   :parse-fn str
                   ;; the message mirrors valid-metadata-name?, which rejects names of 100 or more characters
                   :validate [valid-metadata-name?
                              (str "Must be made up of lower-case letters, numbers, and hyphens and must start with a letter. "
                                   "It must also be fewer than 100 characters.")]]]
   :retrieve-documentation (fn retrieve-sync-clusters-documentation
                             [command-name _]
                             {:description (str "Syncs tokens across (at least two) Waiter clusters specified in the URL(s)")
                              :usage (str command-name " [OPTION]... URL URL...")})})