forked from jepsen-io/jepsen
-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathbank.clj
216 lines (199 loc) · 8.32 KB
/
bank.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
(ns tidb.bank
(:refer-clojure :exclude [test])
(:require [clojure.string :as str]
[jepsen
[client :as client]
[generator :as gen]
[checker :as checker]]
[knossos.op :as op]
[clojure.core.reducers :as r]
[clojure.java.jdbc :as j]
[tidb.sql :refer :all]
[tidb.basic :as basic]
[clojure.tools.logging :refer :all]))
(defrecord BankClient [node n starting-balance lock-type in-place?]
client/Client
(setup! [this test node]
(j/with-db-connection [c (conn-spec (first (:nodes test)))]
(j/execute! c ["create table if not exists accounts
(id int not null primary key,
balance bigint not null)"])
(dotimes [i n]
(try
(with-txn-retries
(j/insert! c :accounts {:id i, :balance starting-balance}))
(catch java.sql.SQLIntegrityConstraintViolationException e nil))))
(assoc this :node node))
(invoke! [this test op]
(with-txn op [c (first (:nodes test))]
(try
(case (:f op)
:read (->> (j/query c [(str "select * from accounts")])
(mapv :balance)
(assoc op :type :ok, :value))
:transfer
(let [{:keys [from to amount]} (:value op)
b1 (-> c
(j/query [(str "select * from accounts where id = ?" lock-type) from]
:row-fn :balance)
first
(- amount))
b2 (-> c
(j/query [(str "select * from accounts where id = ?"
lock-type)
to]
:row-fn :balance)
first
(+ amount))]
(cond (neg? b1)
(assoc op :type :fail, :value [:negative from b1])
(neg? b2)
(assoc op :type :fail, :value [:negative to b2])
true
(if in-place?
(do (j/execute! c ["update accounts set balance = balance - ? where id = ?" amount from])
(j/execute! c ["update accounts set balance = balance + ? where id = ?" amount to])
(assoc op :type :ok))
(do (j/update! c :accounts {:balance b1} ["id = ?" from])
(j/update! c :accounts {:balance b2} ["id = ?" to])
(assoc op :type :ok)))))))))
(teardown! [_ test]))
(defn bank-client
"Simulates bank account transfers between n accounts, each starting with
starting-balance."
[n starting-balance lock-type in-place?]
(BankClient. nil n starting-balance lock-type in-place?))
(defn bank-read
"Reads the current state of all accounts without any synchronization."
[_ _]
{:type :invoke, :f :read})
(defn bank-transfer
"Transfers a random amount between two randomly selected accounts."
[test process]
(let [n (-> test :client :n)]
{:type :invoke
:f :transfer
:value {:from (rand-int n)
:to (rand-int n)
:amount (rand-int 5)}}))
(def bank-diff-transfer
"Like transfer, but only transfers between *different* accounts."
(gen/filter (fn [op] (not= (-> op :value :from)
(-> op :value :to)))
bank-transfer))
(defn bank-checker
"Balances must all be non-negative and sum to the model's total."
[]
(reify checker/Checker
(check [this test model history opts]
(let [bad-reads (->> history
(r/filter op/ok?)
(r/filter #(= :read (:f %)))
(r/map (fn [op]
(let [balances (:value op)]
(cond (not= (:n model) (count balances))
{:type :wrong-n
:expected (:n model)
:found (count balances)
:op op}
(not= (:total model)
(reduce + balances))
{:type :wrong-total
:expected (:total model)
:found (reduce + balances)
:op op}))))
(r/filter identity)
(into []))]
{:valid? (empty? bad-reads)
:bad-reads bad-reads}))))
(defn bank-test-base
[opts]
(basic/basic-test
(merge
{:client {:client (:client opts)
:during (->> (gen/mix [bank-read bank-diff-transfer])
(gen/clients)
(gen/stagger 0))
:final (gen/clients (gen/once bank-read))}
:checker (checker/compose
{:perf (checker/perf)
:details (bank-checker)})}
(dissoc opts :client))))
(defn test
[opts]
(bank-test-base
(merge {:name "bank"
:model {:n 5 :total 50}
:client (bank-client 5 10 " FOR UPDATE" false)}
opts)))
; One bank account per table
(defrecord MultiBankClient [node tbl-created? n starting-balance lock-type in-place?]
client/Client
(setup! [this test node]
(locking tbl-created?
(when (compare-and-set! tbl-created? false true)
(j/with-db-connection [c (conn-spec (first (:nodes test)))]
(dotimes [i n]
(Thread/sleep 500)
(info "Creating table accounts" i)
(j/execute! c [(str "create table if not exists accounts" i
"(id int not null primary key,"
"balance bigint not null)")])
(Thread/sleep 500)
(try
(Thread/sleep 500)
(info "Populating account" i)
(with-txn-retries
(j/insert! c (str "accounts" i) {:id 0, :balance starting-balance}))
(catch java.sql.SQLIntegrityConstraintViolationException e nil))))))
(assoc this :node node))
(invoke! [this test op]
(with-txn op [c (first (:nodes test))]
(try
(case (:f op)
:read
(->> (range n)
(mapv (fn [x]
(->> (j/query
c [(str "select balance from accounts" x)]
:row-fn :balance)
first)))
(assoc op :type :ok, :value))
:transfer
(let [{:keys [from to amount]} (:value op)
from (str "accounts" from)
to (str "accounts" to)
b1 (-> c
(j/query
[(str "select balance from " from lock-type)]
:row-fn :balance)
first
(- amount))
b2 (-> c
(j/query [(str "select balance from " to lock-type)]
:row-fn :balance)
first
(+ amount))]
(cond (neg? b1)
(assoc op :type :fail, :error [:negative from b1])
(neg? b2)
(assoc op :type :fail, :error [:negative to b2])
true
(if in-place?
(do (j/execute! c [(str "update " from " set balance = balance - ? where id = 0") amount])
(j/execute! c [(str "update " to " set balance = balance + ? where id = 0") amount])
(assoc op :type :ok))
(do (j/update! c from {:balance b1} ["id = 0"])
(j/update! c to {:balance b2} ["id = 0"])
(assoc op :type :ok)))))))))
(teardown! [_ test]))
(defn multitable-bank-client
[n starting-balance lock-type in-place?]
(MultiBankClient. nil (atom false) n starting-balance lock-type in-place?))
(defn multitable-test
[opts]
(bank-test-base
(merge {:name "bank-multitable"
:model {:n 5 :total 50}
:client (multitable-bank-client 5 10 " FOR UPDATE" false)}
opts)))