-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathidempotence.clj
More file actions
390 lines (348 loc) · 14.1 KB
/
Copy pathidempotence.clj
File metadata and controls
390 lines (348 loc) · 14.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
(ns jepsen.tigerbeetle.workload.idempotence
"A workload which only checks for idempotent writes. We work a lot like the
general-purpose transfer workload, but intentionally repeat write operations,
both with identical values and with random ones. The checker verifies that
writes for an ID never succeed twice (a 'duplicate'), and that reads always
observe a single version of a given ID ('divergence')."
(:require [bifurcan-clj [core :as b]
[map :as bm]
[list :as bl]
[set :as bs]]
[clojure [datafy :refer [datafy]]
[pprint :refer [pprint]]]
[clojure.core.match :refer [match]]
[clojure.data.generators :as dg]
[clojure.tools.logging :refer [info warn]]
[dom-top.core :refer [letr loopr]]
[jepsen [checker :as checker]
[generator :as gen]
[history :as h]
[util :refer [timeout zipf zipf-default-skew
nil-if-empty]]]
[jepsen.tigerbeetle [core :refer :all]
[checker :as tigerbeetle.checker]
[lifecycle-map :as lm]]
[jepsen.tigerbeetle.workload [generator
:as tb-gen
:refer [final-gen
gen
zipf-nth]]
[client :refer [client]]]
[potemkin :refer [definterface+]]
[slingshot.slingshot :refer [try+ throw+]]
[tesser.core :as t]))
(defn poison
"Takes a nice healthy vector of account or transfer maps and replaces some of
them (with probability `p`) with values and/or IDs drawn from the given
Bifurcan list."
[events p memory]
(let [n (count events)]
(loop [i 0
events (transient events)]
(if (= i n)
(persistent! events)
(recur (inc i)
(if (and (< (dg/double) p)
(< 0 (b/size memory)))
; Replace. We can either swap just the ID, or the entire value.
(if-let [replacement (zipf-nth memory)]
(assoc! events i
(condp < (dg/double)
; Just replace ID+ledger. Why ledger? Because the
; underlying generator keeps track of accounts
; organized by ledger, and if we mess up that
; relationship it's going to get VERY confused.
1/2 (assoc (nth events i)
:id (:id replacement)
:ledger (:ledger replacement))
; Replace entire event
replacement))
; Nothing to replace with
events)
; Not replacing
events))))))
(defn remember
"Takes a Bifurcan list, a maximum size `max-size`, a probaility `p`, and a
vector of account or transfer maps. Saves some of those maps into the list,
with probability `p`, so we can repeat them later."
[memory max-size p events]
(loopr [memory (b/linear memory)]
[event events]
(recur
(cond ; Initial fill
(< (b/size memory) max-size)
(bl/add-last memory event)
; Replacement
(< (dg/double) p)
(bl/set memory (zipf max-size) event)
true
memory))
(b/forked memory)))
(defrecord Repeater
[gen ; The underlying TigerBeetle generator
max-size ; Maximum size of our memory pools
dup-p ; Probability we replace an account/transfer with a duplicate
accounts ; A Bifurcan list of accounts we can repeat
transfers] ; A Bifurcan list of transfers we can repeat
gen/Generator
(op [this test ctx]
; Ask the generator for an operation.
(let [[{:keys [f value] :as op} gen'] (gen/op gen test ctx)]
(cond ; Exhausted
(nil? op)
nil
; Pending
(identical? :pending op)
[op this]
; Create accounts
(identical? :create-accounts f)
(let [accounts (remember accounts max-size dup-p value)
op (assoc op :value (poison value dup-p accounts))]
[op (assoc this :gen gen' :accounts accounts)])
; Create transfers
(identical? :create-transfers f)
(let [accounts (remember transfers max-size dup-p value)
op (assoc op :value (poison value dup-p transfers))]
[op (assoc this :gen gen' :transfers transfers)])
; Some other f
true
[op (assoc this :gen gen')])))
(update [this test ctx event]
(assoc this :gen (gen/update gen test ctx event))))
(defn repeater
"Constructs a new generator that repeats writes, wrapping a standard transfer
generator."
[gen]
(Repeater. gen 1024 1/2 bl/empty bl/empty))
(defrecord UnsafeLM [gen]
gen/Generator
(op [this test ctx]
(binding [lm/*safe* false]
(when-let [[op gen'] (gen/op gen test ctx)]
[op (UnsafeLM. gen')])))
(update [this test ctx event]
(binding [lm/*safe* false]
(UnsafeLM. (gen/update gen test ctx event)))))
(defn wrap-gen
"Wraps the final and standard generator. We turn off LifecycleMap safety
checks because we'll be introducing duplicate events on purpose."
[gen]
(UnsafeLM. (tb-gen/wrap-gen gen)))
;; Checker
(defn merge-cat
"Merge Bifurcan maps with list concat"
[m1 m2]
(bm/merge m1 m2 bl/concat))
(defn merge-union
"Merge Bifurcan maps with set union"
[m1 m2]
(bm/merge m1 m2 bs/union))
(t/deftransform bmap-merge-cat
"A Tesser transform that merges maps of lists with list concat."
[]
(assert (nil? downstream))
{:reducer-identity (comp b/linear bm/map)
:reducer merge-cat
:post-reducer identity
:combiner-identity (comp b/linear bm/map)
:combiner merge-cat
:post-combiner b/forked})
(t/deftransform bmap-merge-union
"A Tesser transform that merges maps of sets with union."
[]
(assert (nil? downstream))
{:reducer-identity (comp b/linear bm/map)
:reducer merge-union
:post-reducer identity
:combiner-identity (comp b/linear bm/map)
:combiner merge-union
:post-combiner b/forked})
(defn writes-by-id
"Takes a history and constructs a Bifurcan map of ID -> [w1, w2, ...], where
each w is an event map (e.g. an Account or Transfer), augmented with the
following metadata:
:index The history index of the invocation of the write
:index' The history index of the completion of the write
:type The type of the completion--:ok, :info, or :fail
:result The result keyword (e.g. :ok, :credits-must-not-exceed-debits,
...), or nil if unknown."
[history]
(h/ensure-pair-index history)
(->> (t/filter h/invoke?)
(t/map (fn per-op [op]
(let [op' (h/completion history op)
index (:index op)
index' (:index op')
type (:type op')]
(bireduce (fn per-event [writes event result]
(bm/put writes
(:id event)
(bl/list (vary-meta event assoc
:index index
:index' index'
:type type
:result result))
bl/concat))
(b/linear bm/empty)
(:value op)
(:value op')))))
bmap-merge-cat
(h/tesser history)))
(defn immutable-part
"Projects a transfer or account map into its immutable part--e.g. without
:debits-pending etc."
[event]
; Detect type
(if (contains? event :amount)
; Transfers are all immutable, but we don't necessarily generate a :timeout
; field for them.
(if (:timeout event)
event
(assoc event :timeout 0))
; Accounts
(-> event
; Drop four derived fields
(dissoc :credits-pending :credits-posted :debits-pending :debits-posted)
; And the closed flag
(update :flags disj :closed))))
(defn immutable-reads
"Takes an OK read. Extracts a Bifurcan map of IDs to Bifurcan sets of
immutable values."
[{:keys [value] :as op'}]
(let [index' (:index op')]
(loopr [reads (b/linear bm/empty)]
[event value]
(let [event (vary-meta (immutable-part event)
assoc :index' index')]
(recur (bm/put reads
(:id event)
(bs/add bs/empty event)
bs/union))))))
(defn reads-by-id
"Takes a history and constructs a Bifurcan map of ID -> #{r1, r2 ...} (a
Bifurcan Set), where each r is an event map (e.g. an Account or Transfer)
restricted to their purely immutable parts. Also adds a piece of metadata:
:index' The history index of the completion of the write"
[history]
(h/ensure-pair-index history)
(->> (t/filter h/ok?)
(t/map immutable-reads)
bmap-merge-union
(h/tesser history)))
(defn ok-write?
"Is this write (from writes-by-id) acknowledged as OK?"
[write]
(= :ok (:result (meta write))))
(defn duplicates
"Takes writes by ID, looks for duplicates. Returns a sequence of vectors,
each vector with duplicate events."
[writes-by-id]
(keep (fn dups [writes]
(let [oks (vec (filter ok-write? writes))]
(when (< 1 (count oks))
oks)))
(bm/values writes-by-id)))
(defn c=
"Compatible equality between a written value and a read value. TigerBeetle
uses 0 to represent an absent value that will be filled in later, so we treat
0 or nil as compatible with everything."
[w r]
(cond (nil? w) true
(and (number? w) (zero? w)) true
true (= w r)))
(defn write-compatible-with-read?
"Is a given write of a transfer or account compatible with a read? Note that
several fields are filled in after a write by TigerBeetle automatically. Also
note that balancing credits mean amounts may differ."
[w r]
(loopr []
[k (keys (merge w r))]
(case k
; Amounts may differ due to balancing credits/debits/etc.
:amount
(if (or (zero? (:amount w))
(<= (:amount r) (:amount w)))
(recur)
false)
; All other keys
(if (c= (get w k) (get r k))
(recur)
false))
true))
(defn divergences
"Takes writes by ID and reads by ID. Looks for divergences. Returns a
seqeunce of vectors, each vector with divergent events for a single ID."
[writes-by-id reads-by-id]
(keep (fn divs [pair]
(letr [id (bm/key pair)
reads (vec (bm/value pair))
; If there are no reads, we're done.
_ (when (= 0 (count reads))
(return nil))
; If we have multiple reads, we've got divergence
_ (when (< 1 (count reads))
(return reads))
; We have exactly one read. Let's make sure it aligns with
; every write.
expected (first reads)
writes (->> (bm/get writes-by-id id)
(filter ok-write?)
(map immutable-part)
(remove #(write-compatible-with-read? % expected)))]
(if (seq writes)
; Some conflicting writes
(concat writes reads)
; Only one read, no conflicting writes
nil)))
reads-by-id))
(defn check-type
"Takes either :accounts or :transfers and checks for errors."
[type history]
(let [f (case type
:accounts :create-accounts
:transfers :create-transfers)
writes-by-id (->> history
(h/filter (h/has-f? f))
writes-by-id)
reads-by-id (->> history
(h/filter (h/has-f?
(case type
:accounts read-account-fs
:transfers read-transfer-fs)))
reads-by-id)
dups (nil-if-empty (take 8 (duplicates writes-by-id)))
divs (nil-if-empty (take 8 (divergences writes-by-id reads-by-id)))]
{:duplicates dups
:divergences divs}))
(defrecord Checker []
checker/Checker
(check [this test history opts]
(let [accounts (check-type :accounts history)
transfers (check-type :transfers history)
]
{:valid? (not (or (:duplicates accounts)
(:duplicates transfers)
(:divergences accounts)
(:divergences transfers)))
:accounts accounts
:transfers transfers})))
(defn checker
"Constructs a new checker for idempotence histories.
Our main checker is predicated on the idea that we only attempt a write of a
given ID once. That won't fly here--we try writing IDs multiple times and
can't tell which one will win. It breaks our inference of which writes
succeeded: if we read ID 4, we don't know which of the three operations that
wrote 4 actually executed it.
Instead, we're just here to verify:
1. No duplicates. A write of an ID succeeds at most once.
2. No divergence. All reads of an ID are identical."
[]
(Checker.))
(defn workload
"Takes CLI opts and constructs a partial test map."
[opts]
{:client (client)
:generator (repeater (gen opts))
:final-generator (final-gen)
:wrap-generator wrap-gen
:checker (Checker.)})