/
set.clj
85 lines (75 loc) · 3.23 KB
/
set.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
(ns scylla.collections.set
"A set backed by a CQL set."
(:require [clojure [pprint :refer :all]]
[clojure.tools.logging :refer [info]]
[jepsen
[client :as client]
[checker :as checker]
[generator :as gen]
[nemesis :as nemesis]]
[qbits.alia :as alia]
[qbits.hayt :refer :all]
[scylla [client :as c]
[db :as db]]))
;; Jepsen client for a set stored in a single CQL row: every operation targets
;; the row with id 0 in jepsen_keyspace.sets, whose `elements` column has type
;; set<int>.
;;
;; Fields:
;;   tbl-created? - atom used both as the lock and as the flag guarding schema
;;                  setup
;;   conn         - connection returned by c/open (nil until open! runs);
;;                  queries go through its :session
;;   writec       - consistency level passed with every :add write
(defrecord CQLSetClient [tbl-created? conn writec]
  client/Client

  ;; Returns a copy of this client holding a connection opened for `node`.
  (open! [this test node]
    (let [connection (c/open test node)]
      (assoc this :conn connection)))

  ;; Creates the keyspace, the `sets` table and the initial row. Only the
  ;; client that wins the compare-and-set! on tbl-created? executes the
  ;; statements; the surrounding lock makes other callers wait until it is done.
  (setup! [_ test]
    (let [session (get conn :session)]
      (locking tbl-created?
        (when (compare-and-set! tbl-created? false true)
          ;; NOTE(review): c/retry-each presumably retries each statement on
          ;; its own -- confirm in scylla.client. `(if-exists false)`
          ;; presumably renders as IF NOT EXISTS -- confirm against hayt.
          (c/retry-each
            (alia/execute session
                          (create-keyspace
                            :jepsen_keyspace
                            (if-exists false)
                            (with {:replication {:class              :SimpleStrategy
                                                 :replication_factor 3}})))
            (alia/execute session (use-keyspace :jepsen_keyspace))
            (alia/execute session
                          (create-table
                            :sets
                            (if-exists false)
                            (column-definitions {:id          :int
                                                 :elements    (set-type :int)
                                                 :primary-key [:id]})
                            (with {:compaction {:class (db/compaction-strategy)}})))
            (alia/execute session
                          (insert :sets
                                  (values [[:id 0]
                                           [:elements #{}]])
                                  (if-exists false))))))))

  ;; Executes one operation and returns it with :type :ok on success.
  ;;   :add  - adds (:value op) to the row's `elements` set, written at
  ;;           consistency `writec`
  ;;   :read - reads the row at consistency :all and returns its elements as a
  ;;           sorted set under :value
  ;; NOTE(review): error handling is delegated to c/with-errors; #{:read} is
  ;; presumably the set of idempotent operations -- confirm in scylla.client.
  (invoke! [_ test op]
    (let [session (get conn :session)]
      (c/with-errors op #{:read}
        (alia/execute session (use-keyspace :jepsen_keyspace))
        (case (:f op)
          :add
          (let [add-stmt (update :sets
                                 (set-columns {:elements [+ #{(:value op)}]})
                                 (where [[= :id 0]]))]
            (alia/execute session add-stmt {:consistency writec})
            (assoc op :type :ok))

          :read
          (do
            ;; NOTE(review): 30 is presumably a timeout in seconds -- confirm
            ;; in scylla.db.
            (db/wait-for-recovery 30 session)
            (let [read-stmt (select :sets (where [[= :id 0]]))
                  rows      (alia/execute session
                                          read-stmt
                                          {:consistency  :all
                                           :retry-policy c/aggressive-read})
                  elements  (:elements (first rows))]
              (assoc op :type :ok :value (into (sorted-set) elements))))))))

  ;; Closes the connection held in `conn`.
  (close! [_ _]
    (c/close! conn))

  ;; No-op: the keyspace and table are left in place.
  (teardown! [_ _]))
(defn cql-set-client
  "Builds an unconnected CQLSetClient (its conn is nil) with a fresh
  table-created flag set to false. `writec` is the consistency level used for
  writes; when omitted it is :one."
  ([] (cql-set-client :one))
  ([writec]
   (let [tbl-created? (atom false)]
     (->CQLSetClient tbl-created? nil writec))))
(defn workload
  "Returns the workload map for the CQL set test:
    :client          - a CQLSetClient with the default write consistency
    :generator       - an infinite lazy sequence of :add ops carrying the
                       values 0, 1, 2, ...
    :final-generator - a single :read op
    :checker         - the checker built by jepsen's checker/set
  `opts` is accepted but not currently used."
  [opts]
  (let [add-ops (map (fn [element] {:f :add, :value element})
                     (range))]
    {:client          (cql-set-client)
     :generator       add-ops
     :final-generator {:f :read}
     :checker         (checker/set)}))