-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathmv.clj
108 lines (98 loc) · 4.38 KB
/
mv.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
(ns scylla.mv
"Some kind of test for materialized views. Not exactly sure what this does!"
(:require [clojure [pprint :refer :all]]
[clojure.tools.logging :refer [info]]
[jepsen
[client :as client]
[generator :as gen]
[nemesis :as nemesis]]
[qbits.alia :as alia]
[qbits.alia.policy.retry :as retry]
[qbits.hayt :refer :all]
[scylla [checker :as checker]
[client :as c]])
(:import (clojure.lang ExceptionInfo)
(java.net InetSocketAddress)))
(defrecord MVMapClient [tbl-created? conn]
client/Client
(open! [this test node]
(assoc this :conn (c/open test node)))
(setup! [_ test]
(let [c (:session conn)]
(locking tbl-created?
(when (compare-and-set! tbl-created? false true)
(c/retry-each
(alia/execute c (create-keyspace
:jepsen_keyspace
(if-exists false)
(with {:replication {:class :SimpleStrategy
:replication_factor 3}})))
(alia/execute c (use-keyspace :jepsen_keyspace))
(alia/execute c (create-table
:map
(if-exists false)
(column-definitions {:key :int
:value :int
:primary-key [:key]})
(with {:compaction {:class (:compaction-strategy test)}})))
(try (alia/execute
c (str "CREATE MATERIALIZED VIEW mvmap AS SELECT"
" * FROM map WHERE value IS NOT NULL"
" AND key IS NOT NULL "
"PRIMARY KEY (value, key)"
"WITH compaction = "
"{'class' : '" (:compaction-strategy test)
"'};"))
(catch com.datastax.driver.core.exceptions.AlreadyExistsException e)))))))
(invoke! [_ _ op]
(let [c (:session conn)]
(c/with-errors op #{:read}
(alia/execute c (use-keyspace :jepsen_keyspace))
(case (:f op)
:assoc (do (alia/execute c
(update :map
(set-columns {:value (:v (:value op))})
(where [[= :key (:k (:value op))]]))
(merge {:consistency :one
:retry-policy (retry/fallthrough-retry-policy)}
(c/write-opts test)))
(assoc op :type :ok))
:read (let [value (->> (alia/execute c
(select :mvmap)
; TODO: do we really want ALL
; here?
(merge {:consistency :all}
(c/read-opts test)))
(#(zipmap (map :key %) (map :value %))))]
(assoc op :type :ok :value value))))))
(close! [_ _]
(c/close! conn))
(teardown! [_ _])
client/Reusable
(reusable? [_ _] true))
(defn mv-map-client
"A map implemented using MV"
([]
(->MVMapClient (atom false) nil)))
(defn assocs
"Generator that emits :assoc operations for sequential integers,
mapping x to (f x)"
[f]
(->> (range)
(map (fn [x] {:f :assoc :value {:k x, :v (f x)}}))))
(defn workload
[opts]
(let [tl (:time-limit opts)]
{:client (mv-map-client)
; Not exactly sure what this is supposed to do. Looks like it sets x to x
; on the map, for sequential integers, then does a read, then turns around
; and sets x to -x, and does another read. Is the read timing important?
; We should look at associative-map (and maybe move it into this ns if
; it's not used elsewhere). There were also no-op replayer conductors
; here... what were they for?
:generator (gen/phases
(gen/time-limit (/ tl 2) (assocs identity))
{:f :read}
(assocs -))
:final-generator {:f :read}
:checker (checker/associative-map)}))