-
Notifications
You must be signed in to change notification settings - Fork 725
/
Copy pathupsert.clj
86 lines (78 loc) · 3.3 KB
/
upsert.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
(ns jepsen.dgraph.upsert
"Attempts to upsert documents based on an index read, and verifies that at
most one upsert succeeds per key."
(:require [clojure.tools.logging :refer [info]]
[dom-top.core :refer [with-retry]]
[knossos.op :as op]
[jepsen.dgraph [client :as c]]
[jepsen [client :as client]
[checker :as checker]
[generator :as gen]
[independent :as independent]]))
(defrecord Client [conn]
client/Client
(open! [this test node]
(assoc this :conn (c/open node)))
(setup! [this test]
(c/alter-schema! conn (str "email: string @index(exact)"
(when (:upsert-schema test) " @upsert")
" .")))
(invoke! [this test op]
(let [[k v] (:value op)]
(c/with-conflict-as-fail op
(c/with-txn test [t conn]
(case (:f op)
:upsert (let [inserted (c/upsert! t
:email
{:email (str k)})]
(assoc op
:type (if inserted :ok :fail)
:value (independent/tuple
k (first (vals inserted)))))
:read (->> (c/query t (str "{\n"
" q(func: eq(email, $email)) {\n"
" uid\n"
" }\n"
"}")
{:email (str k)})
:q
(map :uid)
sort
(independent/tuple k)
(assoc op :type :ok, :value)))))))
(teardown! [this test])
(close! [this test]
(c/close! conn)))
(defn checker
"Ensures that at most one UID is ever returned from any read."
[]
(reify checker/Checker
(check [this test model history opts]
(let [reads (->> history
(filter op/ok?)
(filter #(= :read (:f %))))
upserts (->> history
(filter op/ok?)
(filter #(= :upsert (:f %))))
bad-reads (filter #(< 1 (count (:value %))) reads)]
{:valid? (and (empty? bad-reads)
(<= (count upserts) 1))
:bad-reads bad-reads
:ok-upserts upserts}))))
(defn workload
"Stuff you need to build a test!"
[opts]
{:client (Client. nil)
:checker (independent/checker (checker))
:generator (independent/concurrent-generator
(min (:concurrency opts)
(* 2 (count (:nodes opts))))
(range)
(fn [k]
; This is broken because phases inserts a global barrier for
; all threads at this point. When a thread finishes due to
; time-limit, it might give up without ever making it to the
; barrier. That *traps* the other threads on the barrier
; forever.
(gen/phases (gen/each (gen/once {:type :invoke, :f :upsert}))
(gen/each (gen/once {:type :invoke, :f :read})))))})