;; kafka.clj
(ns jepsen.tests.kafka
"This workload is intended for systems which behave like the popular Kafka
queue. This includes Kafka itself, as well as compatible systems like
Redpanda.
At the abstract level of this workload, these systems provide a set of
totally-ordered append-only logs called *partitions*, each of which stores a
single arbitrary (and, for our purposes, unique) *message* at a particular
*offset* into the log. Partitions are grouped together into *topics*: each
topic is therefore partially ordered.
Each client has a *producer* and a *consumer* aspect; in Kafka these are
separate clients, but for Jepsen's purposes we combine them. A
producer can *send* a message to a topic-partition, which assigns it a
unique, theoretically monotonically-increasing offset and saves it durably at
that offset. A consumer can *subscribe* to a topic, in which case the system
automatically assigns it any number of partitions in that topic--this
assignment can change at any time. Consumers can also assign themselves
specific partitions manually. When a consumer *polls*, it receives messages
and their offsets from whatever topic-partitions it is currently assigned to,
and advances its internal state so that the next poll (barring a change in
assignment) receives the immediately following messages.
## Operations
To subscribe to a new set of topics, we issue an operation like:
{:f :subscribe, :value [k1, k2, ...]}
or
{:f :assign, :value [k1, k2, ...]}
... where k1, k2, etc denote specific partitions. For subscribe,
we convert those partitions to the topics which contain them, and subscribe
to those topics; the database then controls which specific partitions we get.
Just like the Kafka client API, both subscribe and assign replace the current
topics for the consumer.
Assign ops can also have a special key `:seek-to-beginning? true` which
indicates that the client should seek to the beginning of all its partitions.
Reads and writes (and mixes thereof) are encoded as a vector of
micro-operations:
{:f :poll, :value [op1, op2, ...]}
{:f :send, :value [op1, op2, ...]}
{:f :txn, :value [op1, op2, ...]}
Where :poll and :send denote transactions comprising only reads or writes,
respectively, and :txn indicates a general-purpose transaction. Operations
are of two forms:
[:send key value]
... instructs a client to append `value` to the integer `key`--which maps
uniquely to a single topic and partition. These operations are returned as:
[:send key [offset value]]
where offset is the returned offset of the write, if available, or `nil` if
it is unknown (e.g. if the write times out).
Reads are invoked as:
[:poll]
... which directs the client to perform a single `poll` operation on its
consumer. The results of that poll are expanded to:
[:poll {key1 [[offset1 value1] [offset2 value2] ...],
key2 [...]}]
Where key1, key2, etc are integer keys obtained from the topic-partitions
returned by the call to poll, and the value for that key is a vector of
[offset value] pairs, corresponding to the offset of that message in that
particular topic-partition, and the value of the message---presumably,
whatever was written by `[:send key value]` earlier.
When polling *without* using assign, clients should call `.commitSync` before
returning a completion operation.
Before a transaction completes, we commit its offsets.
All transactions may return an optional key :rebalance-log, which is a vector
of rebalancing events (changes in assigned partitions) that occurred during
the execution of that transaction. Each rebalance event is a map like:
{:keys [k1 k2 ...]}
This map may contain additional keys as well.
## Topic-partition Mapping
We identify topics and partitions using abstract integer *keys*, rather than
explicit topics and partitions. The client is responsible for mapping these
keys bijectively to topics and partitions.
## Analysis
From this history we can perform a number of analyses:
1. For any observed value of a key, we check to make sure that its writer was
either :ok or :info; if the writer :failed, we know this constitutes an
aborted read.
2. We verify that all sends and polls agree on the value for a given key and
offset. We do not require contiguity in offsets, because transactions add
invisible messages which take up an offset slot but are not visible to the
API. If we find divergence, we know that Kafka disagreed about the value at
some offset.
Having verified that each [key offset] pair uniquely identifies a single
value, we eliminate the offsets altogether and perform the remainder of the
analysis purely in terms of keys and values. We construct a graph where
vertices are values, and an edge v1 -> v2 means that v1 immediately precedes
v2 in the offset order (ignoring gaps in the offsets, which we assume are due
to transaction metadata messages).
3. For each key, we take the highest observed offset, and then check that
every :ok :send operation with an equal or lower offset was *also* read by at
least one consumer. If we find one, we know a write was lost!
4. We build a dependency graph between pairs of transactions T1 and T2, where
T1 != T2, like so:
ww. T1 sent value v1 to key k, and T2 sent v2 to k, and o1 < o2
in the version order for k.
wr. T1 sent v1 to k, and T2's highest read of k was v1.
rw. T1's highest read of key k was offset o1, and T2 sent offset o2 to k,
and o1 < o2 in the version order for k.
Our use of \"highest offset\" is intended to capture the fact that each poll
operation observes a *range* of offsets, but in general those offsets could
have been generated by *many* transactions. If we drew wr edges for every
offset polled, we'd generate superfluous edges--all writers are already
related via ww dependencies, so the final wr edge, plus those ww edges,
captures those earlier read values.
We draw rw edges only for the final versions of each key observed by a
transaction. If we drew rw edges for an earlier version, we would incorrectly
be asserting that later transactions were *not* observed!
We perform cycle detection and categorization of anomalies from this graph
using Elle.
5. Internal Read Contiguity: Within a transaction, each pair of reads on the
same key should be directly related in the version order. If we observe a gap
(e.g. v1 < ... < v2) that indicates this transaction skipped over some
values. If we observe an inversion (e.g. v2 < v1, or v2 < ... < v1) then we
know that the transaction observed an order which disagreed with the \"true\"
order of the log.
6. Internal Write Contiguity: Gaps between sequential pairs of writes to the
same key are detected via Elle as write cycles. Inversions are not, so we
check for them explicitly: a transaction sends v1, then v2, but v2 < v1 or v2
< ... v1 in the version order.
7. Intermediate reads? I assume these happen constantly, but are they
supposed to? It's not totally clear what this MEANS, but I think it might
look like a transaction T1 which writes [v1 v2 v3] to k, and another T2 which
polls k and observes any of v1, v2, or v3, but not *all* of them. This
might be captured as a wr-rw cycle in some cases, but perhaps not all,
since we're only generating rw edges for final reads.
8. Precommitted reads. These occur when a transaction observes a value that
it wrote. This is fine in most transaction systems, but illegal in Kafka,
which assumes that consumers (running at read committed) *never* observe
uncommitted records."
(:require [analemma [xml :as xml]
[svg :as svg]]
[bifurcan-clj [core :as b]
[int-map :as bim]
[list :as bl]
[map :as bm]
[set :as bs]]
[clojure [datafy :refer [datafy]]
[pprint :refer [pprint]]
[set :as set]]
[clojure.core.reducers :as r]
[clojure.java.io :as io]
[clojure.tools.logging :refer [info warn]]
[dom-top.core :refer [assert+ real-pmap loopr]]
[elle [core :as elle]
[graph :as g]
[list-append :refer [rand-bg-color]]
[txn :as txn]
[util :refer [index-of]]
[rels :refer [ww wr rw]]]
[gnuplot.core :as gnuplot]
[jepsen [checker :as checker]
[client :as client]
[generator :as gen]
[history :as h]
[store :as store]
[util :as util :refer [map-vals
meh
nanos->secs
pprint-str]]]
[jepsen.checker.perf :as perf]
[jepsen.tests.cycle.append :as append]
[slingshot.slingshot :refer [try+ throw+]]
[tesser.core :as t]))
;; Generator
(defn txn-generator
"Takes a list-append generator and rewrites its transactions to be [:poll] or
[:send k v] micro-ops. Also adds a :keys field onto each operation, with a
set of keys that txn would have interacted with; we use this to generate
:subscribe ops later."
[la-gen]
(gen/map (fn rewrite-op [op]
(-> op
(assoc :keys (set (map second (:value op))))
(update :value
(partial mapv (fn rewrite-mop [[f k v]]
(case f
:append [:send k v]
:r [:poll]))))))
la-gen))
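;; A sketch of the rewrite (hypothetical op shapes; a real op also carries
;; :time, :process, etc.):
(comment
  ;; A list-append op like:
  {:f :txn, :value [[:append 5 1] [:r 5 nil]]}
  ;; ... becomes, with :keys derived from the original micro-ops:
  {:f :txn, :keys #{5}, :value [[:send 5 1] [:poll]]})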
(defrecord InterleaveSubscribes [sub-p gen]
gen/Generator
(op [this test context]
; When we're asked for an operation, ask the underlying generator for
; one...
(when-let [[op gen'] (gen/op gen test context)]
(if (= :pending op)
[:pending this]
(let [this' (InterleaveSubscribes. sub-p gen')]
(if (< (rand) sub-p)
; At random, emit a subscribe/assign op instead.
(let [f (rand-nth (vec (:sub-via test)))
op {:f f, :value (vec (:keys op))}]
[(gen/fill-in-op op context) this])
; Or pass through the op directly
[(dissoc op :keys) this'])))))
; Pass through updates
(update [this test context event]
(InterleaveSubscribes.
sub-p
(gen/update gen test context event))))
(defn interleave-subscribes
"Takes CLI options (:sub-p) and a txn generator. Keeps track of the keys
flowing through it, interspersing occasional :subscribe or :assign operations
for recently seen keys."
[opts txn-gen]
(InterleaveSubscribes. (:sub-p opts 1/64) txn-gen))
(defn tag-rw
"Takes a generator and tags operations as :f :poll or :send if they're
entirely comprised of send/polls."
[gen]
(gen/map (fn tag-rw [op]
(case (->> op :value (map first) set)
#{:poll} (assoc op :f :poll)
#{:send} (assoc op :f :send)
op))
gen))
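;; Illustrative retaggings (hypothetical ops):
(comment
  ;; All sends:  {:f :txn, :value [[:send 1 2] [:send 3 4]]} ;=> :f :send
  ;; All polls:  {:f :txn, :value [[:poll] [:poll]]}         ;=> :f :poll
  ;; Mixed mops: {:f :txn, :value [[:send 1 2] [:poll]]}     ;=> unchanged
  )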
(defn firstv
"First for vectors."
[v]
(nth v 0))
(defn secondv
"Second for vectors."
[v]
(nth v 1))
(defn op->max-poll-offsets
"Takes an operation and returns a map of keys to the highest offsets polled."
[{:keys [type f value]}]
(case type
(:info, :ok)
(case f
(:poll, :txn)
(reduce (fn [offsets [f :as mop]]
(case f
:poll (->> (second mop)
(map-vals (fn [pairs]
(->> pairs
(map first)
(remove nil?)
(reduce max -1))))
(merge-with max offsets))
:send offsets))
{}
value)
nil)
nil))
(defn op->max-send-offsets
"Takes an operation and returns a map of keys to the highest offsets sent."
[{:keys [type f value]}]
(case type
(:info, :ok)
(case f
(:send, :txn)
(reduce (fn [offsets [f :as mop]]
(case f
:poll offsets
                        :send (let [[_ k v] mop]
                                (if (and (vector? v) (first v))
                                  (assoc offsets k (max (get offsets k 0)
                                                        (first v)))
                                  ; No offset known (e.g. an incomplete
                                  ; send); keep the accumulated offsets
                                  ; rather than nilling them out.
                                  offsets))))
{}
value)
nil)
nil))
(defn op->max-offsets
"Takes an operation (presumably, an OK or info one) and returns a map of keys
to the highest offsets interacted with, either via send or poll, in that op."
[op]
(merge-with max
(op->max-poll-offsets op)
(op->max-send-offsets op)))
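;; For example (illustrative op; keys and offsets are hypothetical). Key 1's
;; highest offset comes from the send, key 2's from the poll:
(comment
  (op->max-offsets
    {:type :ok, :f :txn
     :value [[:send 1 [5 :x]]
             [:poll {1 [[3 :a] [4 :b]]
                     2 [[7 :c]]}]]})
  ;=> {1 5, 2 7}
  )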
(defrecord PollUnseen [gen sent polled]
; sent and polled are both maps of keys to maximum offsets sent and polled,
; respectively.
gen/Generator
(op [this test context]
(when-let [[op gen'] (gen/op gen test context)]
(if (= :pending op)
[:pending this]
(let [this' (PollUnseen. gen' sent polled)]
; About 1/3 of the time, merge our sent-but-unpolled keys into an
; assign/subscribe
(if (and (< (rand) 1/3)
(< 0 (count sent))
(or (= :assign (:f op))
(= :subscribe (:f op))))
[(assoc op
:value (->> (:value op)
(concat (keys sent))
distinct
vec)
; Just for debugging, so you can look at the log and tell
; what's going on
:unseen (->> sent
(map (fn [[k sent-offset]]
[k {:polled (polled k -1)
:sent sent-offset}]))
(into (sorted-map))))
this']
; Pass through
[op this'])))))
; Look at the offsets we sent and polled, and advance our state to match
(update [this test context event]
(if (= :ok (:type event))
(let [sent' (merge-with max sent (op->max-send-offsets event))
polled' (merge-with max polled (op->max-poll-offsets event))
; Trim keys we're caught up on
[sent' polled']
(loopr [sent sent'
polled polled']
[k (distinct (concat (keys sent') (keys polled')))]
(if (< (polled k -1) (sent k -1))
; We have unseen elements
(recur sent polled)
; We're caught up!
(recur (dissoc sent k) (dissoc polled k))))]
(PollUnseen. (gen/update gen test context event) sent' polled'))
; No change
this)))
(defn poll-unseen
"Wraps a generator. Keeps track of every offset that is successfully sent,
and every offset that's successfully polled. When there's a key that has some
offsets which were sent but not polled, we consider that unseen. This
generator occasionally rewrites assign/subscribe operations to try and catch
up to unseen keys."
[gen]
(PollUnseen. gen {} {}))
(defrecord TrackKeyOffsets [gen offsets]
gen/Generator
(op [this test context]
(when-let [[op gen'] (gen/op gen test context)]
(if (= :pending op)
[:pending this]
[op (TrackKeyOffsets. gen' offsets)])))
(update [this test context event]
(when (= :ok (:type event))
(let [op-offsets (op->max-offsets event)]
(when-not (empty? op-offsets)
(swap! offsets #(merge-with max % op-offsets)))))
(TrackKeyOffsets.
(gen/update gen test context event) offsets)))
(defn track-key-offsets
"Wraps a generator. Keeps track of every key that generator touches in the
given atom, which is a map of keys to highest offsets seen."
[keys-atom gen]
(TrackKeyOffsets. gen keys-atom))
(defrecord FinalPolls [target-offsets gen]
gen/Generator
(op [this test context]
;(info "waiting for" target-offsets)
(when-not (empty? target-offsets)
(when-let [[op gen'] (gen/op gen test context)]
[op (assoc this :gen gen')])))
(update [this test context {:keys [type f value] :as event}]
; (info :update event)
(if (and (= type :ok)
(= f :poll))
(let [offsets' (reduce (fn [target-offsets' [k seen-offset]]
(if (<= (get target-offsets' k -1)
seen-offset)
; We've read past our target offset for
; this key
(dissoc target-offsets' k)
target-offsets'))
target-offsets
(op->max-offsets event))]
(when-not (identical? target-offsets offsets')
(info "Process" (:process event) "now waiting for" offsets'))
(FinalPolls. offsets' gen))
; Not relevant
this)))
(defn final-polls
"Takes an atom containing a map of keys to offsets. Constructs a generator
which:
1. Checks the topic-partition state from the admin API
2. Crashes the client, to force a fresh one to be opened, just in case
there's broken state inside the client.
3. Assigns the new client to poll every key, and seeks to the beginning
4. Polls repeatedly
This process repeats every 10 seconds until polls have caught up to the
offsets in the offsets atom."
[offsets]
(delay
(let [offsets @offsets]
(info "Polling up to offsets" offsets)
(->> [{:f :crash}
{:f :debug-topic-partitions, :value (keys offsets)}
{:f :assign, :value (keys offsets), :seek-to-beginning? true}
(->> {:f :poll, :value [[:poll]], :poll-ms 1000}
repeat
(gen/stagger 1/5))]
(gen/time-limit 10000)
repeat
(FinalPolls. offsets)))))
(defn crash-client-gen
"A generator which, if the test has :crash-clients? true, periodically emits
an operation to crash a random client."
[opts]
(when (:crash-clients? opts)
(->> (repeat {:f :crash})
(gen/stagger (/ (:crash-client-interval opts 30)
(:concurrency opts))))))
;; Checker ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn assocv
"An assoc on vectors which allows you to assoc at arbitrary indexes, growing
the vector as needed. When v is nil, constructs a fresh vector rather than a
map."
[v i value]
(if v
(if (<= i (count v))
(assoc v i value)
(let [nils (repeat (- i (count v)) nil)]
(assoc (into v nils) i value)))
; Nil is treated as an empty vector.
(recur [] i value)))
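;; For example:
(comment
  (assocv [:a :b] 1 :x) ;=> [:a :x]
  (assocv [:a :b] 4 :x) ;=> [:a :b nil nil :x]
  (assocv nil 0 :x)     ;=> [:x]
  )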
(defn nth+
"Nth for vectors, but returns nil instead of out-of-bounds."
[v i]
(when (< i (count v))
(nth v i)))
(defn op-writes-helper
"Takes an operation and a function which takes an offset-value pair. Returns
a map of keys written by this operation to the sequence of (f [offset value])
sends for that key. Note that offset may be nil."
[op f]
(when (#{:txn :send} (:f op))
(reduce (fn [writes mop]
(if (= :send (first mop))
(let [[_ k v] mop
vs (get writes k [])
; Values can be either a literal value or a [offset value]
; pair.
value (if (vector? v)
(f v)
(f [nil v]))]
(assoc writes k (conj vs value)))
; Something other than a send
writes))
{}
(:value op))))
(defn op-writes
"Returns a map of keys to the sequence of all values written to that key in
an op."
[op]
(op-writes-helper op second))
(defn op-write-offsets
"Returns a map of keys to the sequence of all offsets written to that key in
an op."
[op]
(op-writes-helper op first))
(defn op-write-pairs
"Returns a map of keys to the sequence of all [offset value] pairs written to
that key in an op."
[op]
(op-writes-helper op identity))
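;; For example, given one send with a known offset and one without
;; (illustrative op):
(comment
  (let [op {:f :txn, :value [[:send 1 [3 :x]] [:send 1 :y] [:poll {}]]}]
    [(op-writes op)         ;=> {1 [:x :y]}
     (op-write-offsets op)  ;=> {1 [3 nil]}
     (op-write-pairs op)])  ;=> {1 [[3 :x] [nil :y]]}
  )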
(defn op-reads-helper
"Takes an operation and a function which takes an offset-value pair. Returns
a map of keys read by this operation to the sequence of (f [offset value])
read for that key."
[^jepsen.history.Op op f]
  ; This shows up in lots of our tight loops, so it's full of
  ; micro-optimizations.
(when (or (identical? :txn (.f op))
(identical? :poll (.f op)))
(let [r (reduce (fn mop [res mop]
(if (and (identical? :poll (nth mop 0))
(<= 2 (count mop)))
(reduce (fn per-key [res [k pairs]]
(let [vs' (reduce (fn per-pair [vs pair]
(conj! vs (f pair)))
(get res k (transient []))
pairs)]
(assoc! res k vs')))
res
(nth mop 1))
res))
(transient {})
(.value op))]
; Persist
(update-vals (persistent! r) persistent!))))
(defn op-read-pairs
"Returns a map of keys to the sequence of all [offset value] pairs read for
that key."
[op]
(op-reads-helper op identity))
(defn op-read-offsets
"Returns a map of keys to the sequence of all offsets read for that key."
[op]
(op-reads-helper op firstv))
(defn op-reads
"Returns a map of keys to the sequence of all values read for that key."
[op]
(op-reads-helper op secondv))
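;; For example. Note these helpers take jepsen.history.Op records, not plain
;; maps, because of the .f/.value field accesses above; here we assume
;; jepsen.history/op (aliased as h/op) to build one from a map:
(comment
  (let [op (h/op {:index 0, :time 0, :process 0, :type :ok, :f :poll
                  :value [[:poll {1 [[0 :a] [1 :b]]}]]})]
    [(op-read-pairs op)    ;=> {1 [[0 :a] [1 :b]]}
     (op-read-offsets op)  ;=> {1 [0 1]}
     (op-reads op)]))      ;=> {1 [:a :b]}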
(defn op-reads-index
"We call op-reads a LOT. This takes a history and builds an efficient index,
then returns a function (op-reads op) which works just like (op-reads op),
but is memoized."
[history]
(let [reducer (fn reducer
([] (b/linear (bim/int-map)))
([index] index)
([index ^jepsen.history.Op op]
(bim/put index (.index op) (op-reads op))))
combiner (fn combiner
([] (b/linear (bim/int-map)))
([index] (b/forked index))
([index1 index2]
(bm/merge index1 index2)))
fold {:name :op-reads-index
:reducer reducer
:combiner combiner}
index (h/fold history fold)]
(fn memoized [^jepsen.history.Op op]
(bim/get index (.index op) nil))))
(defn op-pairs
"Returns a map of keys to the sequence of all [offset value] pairs either
written or read for that key; writes first."
[op]
(merge-with concat (op-write-pairs op) (op-read-pairs op)))
(defn reads-of-key
"Returns a seq of all operations which read the given key, and, optionally,
read the given value."
([k history]
(->> history
(filter (comp #{:txn :send :poll} :f))
(filter (fn [op]
(contains? (op-reads op) k)))))
([k v history]
(->> history
(reads-of-key k)
(filter (fn [op]
(some #{v} (get (op-reads op) k)))))))
(defn writes-of-key
"Returns a seq of all operations which wrote the given key, and, optionally,
sent the given value."
([k history]
(->> history
(filter (comp #{:txn :send :poll} :f))
(filter (fn [op]
(contains? (op-writes op) k)))))
([k v history]
(->> history
(writes-of-key k)
(filter (fn [op]
(some #{v} (get (op-writes op) k)))))))
(defn reads-of-key-offset
"Returns a seq of all operations which read the given key and offset."
[k offset history]
(->> history
(reads-of-key k)
(filter (fn [op]
(some #{offset} (get (op-read-offsets op) k))))))
(defn writes-of-key-offset
"Returns a seq of all operations which wrote the given key and offset."
[k offset history]
(->> history
(writes-of-key k)
(filter (fn [op]
(some #{offset} (get (op-write-offsets op) k))))))
(defn reads-of-key-value
"Returns a seq of all operations which read the given key and value."
[k value history]
(->> history
(reads-of-key k)
(filter (fn [op] (some #{value} (get (op-reads op) k))))))
(defn writes-of-key-value
"Returns a seq of all operations which wrote the given key and value."
[k value history]
(->> history
(writes-of-key k)
(filter (fn [op] (some #{value} (get (op-writes op) k))))))
(defn op-around-key-offset
"Takes an operation and returns that operation with its value trimmed so that
any send/poll operations are constrained to just the given key, and values
within n of the given offset. Returns nil if operation is not relevant."
([k offset op]
(op-around-key-offset k offset 3 op))
([k offset n op]
(when (and (not= :invoke (:type op))
(#{:send :poll :txn} (:f op)))
(let [value'
(keep (fn [[f v :as mop]]
(case f
:poll (when-let [pairs (get v k)]
; Trim pairs to region around offset
(let [trimmed
(filter (fn [[o v]]
(<= (- offset n) o (+ offset n)))
pairs)]
(when (seq trimmed)
[:poll {k trimmed}])))
:send (let [[_ k2 v-or-pair] mop]
(when (vector? v-or-pair)
(let [[o v] v-or-pair]
(when (and (= k k2)
(<= (- offset n) o (+ offset n)))
mop))))))
(:value op))]
(when-not (empty? value')
(assoc op :value value'))))))
(defn around-key-offset
"Filters a history to just those operations around a given key and offset;
trimming their mops to just those regions as well."
([k offset history]
(around-key-offset k offset 3 history))
([k offset n history]
(keep (partial op-around-key-offset k offset n) history)))
(defn around-some
"Clips a sequence to just those elements near a predicate. Takes a predicate,
a range n, and a sequence xs. Returns the series of all x in xs such x is
within n elements of some x' matching predicate."
[pred n coll]
(let [indices (first
(reduce (fn [[indices i] x]
(if (pred x)
[(into indices (range (- i n) (+ i n 1))) (inc i)]
[indices (inc i)]))
[#{} 0]
coll))]
(first (reduce (fn [[out i] x]
(if (indices i)
[(conj out x) (inc i)]
[out (inc i)]))
[[] 0]
coll))))
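;; For example, keeping elements within 1 position of any :c:
(comment
  (around-some #{:c} 1 [:a :b :c :d :e]) ;=> [:b :c :d]
  )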
(defn op-around-key-value
"Takes an operation and returns that operation with its value trimmed so that
any send/poll operations are constrained to just the given key, and values
within n of the given value. Returns nil if operation is not relevant."
([k value op]
(op-around-key-value k value 3 op))
([k value n op]
(when (and (= :ok (:type op))
(#{:send :poll :txn} (:f op)))
(let [value'
(keep (fn [[f v :as mop]]
(case f
:poll (when-let [pairs (get v k)]
; Trim pairs to region around offset
(let [trimmed (around-some (comp #{value} second)
n pairs)]
(when (seq trimmed)
{k trimmed})))
:send (let [[_ k2 [o v]] mop]
(when (and (= k k2) (= value v))
mop))))
(:value op))]
(when-not (empty? value')
(assoc op :value value'))))))
(defn around-key-value
"Filters a history to just those operations around a given key and value;
trimming their mops to just those regions as well."
([k value history]
(around-key-value k value 3 history))
([k value n history]
(keep (partial op-around-key-value k value n) history)))
(defn writes-by-type
"Takes a history and constructs a map of types (:ok, :info, :fail) to maps of
keys to the set of all values which were written for that key. We use this to
identify, for instance, what all the known-failed writes were for a given
key."
[history]
(->> history
(remove (comp #{:invoke} :type))
(filter (comp #{:txn :send} :f))
(group-by :type)
(map-vals (fn [ops]
(->> ops
; Construct a seq of {key [v1 v2 ...]} maps
(map op-writes)
; And turn [v1 v2 ...] into #{v1 v2 ...}
(map (partial map-vals set))
; Then merge them all together
(reduce (partial merge-with set/union) {}))))))
(defn reads-by-type
"Takes a history and an op-reads fn, and constructs a map of types (:ok,
:info, :fail) to maps of keys to the set of all values which were read for
that key. We use this to identify, for instance, the known-successful reads
for some key as a part of finding lost updates."
[history op-reads]
(->> history
(remove (comp #{:invoke} :type))
(filter (comp #{:txn :poll} :f))
(group-by :type)
(map-vals (fn [ops]
(->> ops
(map op-reads)
(map (partial map-vals set))
(reduce (partial merge-with set/union) {}))))))
(defn must-have-committed?
"Takes a reads-by-type map and a (presumably :info) transaction which sent
something. Returns true iff the transaction was :ok, or if it was :info and
we can prove that some send from this transaction was read."
[reads-by-type op]
(or (= :ok (:type op))
(and (= :info (:type op))
(let [ok (:ok reads-by-type)]
(some (fn [[k vs]]
(let [ok-vs (get ok k #{})]
(some ok-vs vs)))
(op-writes op))))))
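;; For example (illustrative reads-by-type map):
(comment
  (let [rbt {:ok {1 #{:x}}}]
    ;; An :info txn whose send of :x was observed by an :ok read: provably
    ;; committed, so this is truthy.
    (must-have-committed? rbt {:type :info, :f :send, :value [[:send 1 :x]]})
    ;; An :info txn whose write was never read: we can't prove it committed,
    ;; so this is nil.
    (must-have-committed? rbt {:type :info, :f :send, :value [[:send 1 :y]]})))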
(defn version-orders-update-log
"Updates a version orders log with the given offset and value."
[log offset value]
(bm/update (or log (bim/int-map))
offset
(fn update [values]
(bs/add (if (nil? values)
(b/linear bs/empty)
values)
value))))
(defn version-orders-reduce-mop
"Takes a logs object from version-orders and a micro-op, and integrates that
micro-op's information about offsets into the logs."
[logs mop]
(case (firstv mop)
:send (let [[_ k v] mop]
(if (vector? v)
(let [[offset value] v]
(if offset
; We completed the send and know an offset
(bm/update logs k version-orders-update-log offset value)
; Not sure what the offset was
logs))
; Not even offset structure: maybe an :info txn
logs))
:poll (loopr [logs logs]
[; For each key and series of pairs polled for that key
[k pairs] (second mop)
; And for each offset and value in those pairs
[offset value] pairs]
(recur (if offset
(bm/update logs k
version-orders-update-log offset value)
; Don't know offset
logs)))))
(defn index-seq
"Takes a seq of distinct values, and returns a map of:
{:by-index A vector of the sequence
:by-value A map of values to their indices in the vector.}"
[xs]
{:by-index (vec xs)
:by-value (into {} (map-indexed (fn [i x] [x i]) xs))})
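;; For example:
(comment
  (index-seq [:a :b :c])
  ;=> {:by-index [:a :b :c], :by-value {:a 0, :b 1, :c 2}}
  )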
(defn log->value->first-index
"Takes a log: a vector of sets of read values for each offset in a partition,
possibly including `nil`s. Returns a map which takes a value to the index
where it first appeared."
[log]
(->> (remove nil? log)
(reduce (fn [[earliest i] values]
[(reduce (fn [earliest value]
(if (contains? earliest value)
earliest
(assoc earliest value i)))
earliest
values)
(inc i)])
[{} 0])
first))
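;; For example, with a hole (nil) at offset 1. Indices are dense, so the
;; hole is skipped:
(comment
  (log->value->first-index [#{:a} nil #{:b :c}])
  ;=> {:a 0, :b 1, :c 1}
  )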
(defn log->last-index->values
"Takes a log: a vector of sets of read values for each offset in a partition,
possibly including `nil`s. Returns a vector which takes indices (dense
offsets) to sets of values whose *last* appearance was at that position."
[log]
(->> (remove nil? log)
; Build up a map of values to their latest indexes
(reduce (fn latest [[latest i] values]
[(reduce (fn [latest value]
(assoc latest value i))
latest
values)
(inc i)])
[{} 0])
first
; Then invert that map into a vector of indexes to sets
(reduce (fn [log [value index]]
(let [values (get log index #{})]
(assocv log index (conj values value))))
[])))
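;; For example: :a appears at dense indices 0 and 1, :b only at 1, so both
;; values' *last* appearance is index 1:
(comment
  (log->last-index->values [#{:a} #{:a :b}])
  ;=> [nil #{:a :b}]
  )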
(defn datafy-version-order-log
"Turns a bifurcan integer map of Bifurcan sets, and converts it to a vector
of Clojure sets."
[m]
(let [size (b/size m)]
(if (= 0 size)
[]
(let [; Since keys are sorted...
max-i (bm/key (b/nth m (dec size)))]
; Create a vector for each index up to and including max-i
(loop [v (transient [])
i 0]
(if (< max-i i)
; Done
(persistent! v)
; Copy this offset into the vector
(let [values (bim/get m i nil)
values (when values
(set values))]
(recur (assoc! v i values)
(inc i)))))))))
(defn version-orders
"Takes a history and a reads-by-type structure. Constructs a map of:
{:orders A map of keys to orders for that key. Each order is a map of:
{:by-index A vector which maps indices to single values,
in log order.
:by-value A map of values to indices in the log.
:log A vector which maps offsets to sets of values
in log order.}
:errors A series of error maps describing any incompatible orders, where
a single offset for a key maps to multiple values.}
Offsets are directly from Kafka. Indices are *dense* offsets, removing gaps
in the log.
Note that we infer version orders from sends only when we can prove their
effects were visible, but from *all* polls, including :info and :fail ones.
Why? Because unlike a traditional transaction, where you shouldn't trust
reads in aborted txns, pollers in Kafka's transaction design are *always*
supposed to emit safe data regardless of whether the transaction commits or
not."
[history reads-by-type]
; First, build up our logs concurrently. We start with a Bifurcan map of keys
; to logs. Each log is a Bifurcan integer map of offsets to sets of values.
; This gives us efficient mergeability.
(let [fold
{:name :version-orders
:reducer
(fn reducer
([] (b/linear bm/empty))
([logs] logs)
([logs op]
(case (:f op)
(:poll, :send, :txn)
(if (must-have-committed? reads-by-type op)
; OK or info, and we can prove effects were visible
(reduce version-orders-reduce-mop logs (:value op))
; We're not sure it was visible, so just use the polls.
(reduce version-orders-reduce-mop logs
(r/filter (comp #{:poll} firstv) (:value op))))
; Some non-transactional op
logs)))
; Merge logs together
:combiner
(fn combiner
([] (b/linear bm/empty))
([logs]
; Convert back to Clojure
(loopr [logs' (transient {})]
[k-log logs]
(let [k (bm/key k-log)
log (bm/value k-log)]
(recur
(assoc! logs' k (datafy-version-order-log log))))
(persistent! logs')))
; Merge two logs
([logs1 logs2]
(bm/merge logs1 logs2
(fn merge-log [log1 log2]
(bm/merge log1 log2
(fn merge-offset [vs1 vs2]
(bs/union vs1 vs2)))))))}
logs (h/fold history fold)]
; Transform our logs to orders.
{:errors
(->> logs
(mapcat (fn errors [[k log]]
(->> log
(reduce (fn [[offset index errs] values]
(condp <= (count values)
; Divergence
2 [(inc offset) (inc index)
(conj errs {:key k
:offset offset
:index index
:values
(into (sorted-set)
values)})]
; No divergence
1 [(inc offset) (inc index) errs]
; Hole in log
0 [(inc offset) index errs]))
[0 0 []])
last)))
seq)
:orders
(map-vals
(fn key-order [log]
(assoc (->> log (remove nil?) (map first) index-seq)
:log log))
logs)}))
(defn g1a-cases