Skip to content

Commit

Permalink
Handle ServiceUnavailableException
Browse files Browse the repository at this point in the history
  • Loading branch information
as51340 committed May 24, 2024
1 parent e7b6c5c commit 1d81485
Showing 1 changed file with 140 additions and 90 deletions.
230 changes: 140 additions & 90 deletions tests/jepsen/src/jepsen/memgraph/habank.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
[jepsen.checker.timeline :as timeline]
[jepsen.memgraph.haclient :as haclient]
[jepsen.memgraph.client :as mgclient]
[jepsen.memgraph.utils :as utils]))
[jepsen.memgraph.utils :as utils]
[clojure.core :as c]))

(def account-num
"Number of accounts to be created"
Expand Down Expand Up @@ -71,12 +72,28 @@
[node]
(some #(= % node) #{"n4" "n5" "n6"}))

(defn accounts-not-created?
"Check if accounts are not created."
(defn random-coord
  "Pick one coordinator at random from `nodes`.

  Returns the element of `nodes` at a random index in 3..5. This relies on
  the cluster layout where the first 3 instances are data instances and the
  last 3 are coordinators."
  [nodes]
  (let [coord-offset 3              ; coordinators start right after the 3 data instances
        coord-idx    (rand-int 3)]  ; which of the 3 coordinators to take
    (nth nodes (+ coord-offset coord-idx))))

(defn random-data-instance
  "Pick one data instance at random from `nodes`.

  Returns the element of `nodes` at a random index in 0..2, i.e. one of the
  first three entries."
  [nodes]
  (let [data-idx (rand-int 3)]
    (nth nodes data-idx)))

(defn accounts-exist?
"Check if accounts are created."
[conn]
(utils/with-session conn session
(let [accounts (->> (get-all-accounts session) (map :n) (reduce conj []))]
(empty? accounts))))
(not-empty accounts))))

(defn main-to-initialize?
  "Tell whether `node` is a main that still has to be initialized.

  Takes the node name and its bolt connection. The result is truthy only when
  `is-main?` holds for the node and `accounts-exist?` reports no accounts on
  that connection; the account check is not run when the node is not main."
  [node bolt-conn]
  ; TODO: (andi) Every instance is main at the beginning
  (let [main? (is-main? node bolt-conn)]
    (if main?
      (not (accounts-exist? bolt-conn))
      main?)))

(defn transfer-money
"Transfer money from one account to another by some amount
Expand All @@ -89,7 +106,7 @@
(update-balance tx {:id to :amount amount})))
(info "Transfered money from account" from "to account" to "with amount" amount))

(defrecord Client [nodes-config license organization]
(defrecord Client [nodes-config first-leader first-main license organization]
client/Client
; Open Bolt connection to all nodes and Bolt+routing to coordinators.
(open! [this _test node]
Expand All @@ -102,93 +119,121 @@
:node node)))
; Use Bolt connection to set enterprise.license and organization.name.
(setup! [this _test]
(utils/with-session (:bolt-conn this) session
((haclient/set-db-setting "enterprise.license" license) session)
((haclient/set-db-setting "organization.name" organization) session)))
(try
(utils/with-session (:bolt-conn this) session
((haclient/set-db-setting "enterprise.license" license) session)
((haclient/set-db-setting "organization.name" organization) session))
(catch org.neo4j.driver.exceptions.ServiceUnavailableException _e
(info "Node" (:node this) "is down."))))

(invoke! [this _test op]
(case (:f op)
(let [bolt-conn (:bolt-conn this)
node (:node this)]
(case (:f op)
; Show instances should be run only on coordinator.
:show-instances-read (if (coord-instance? (:node this))
(try
(utils/with-session (:bolt-conn this) session ; Use bolt connection for running show instances.
(let [instances (->> (get-all-instances session) (reduce conj []))]
(assoc op
:type :ok
:value {:instances instances :node (:node this)})))
(catch Exception e
(assoc op :type :fail :value e)))
(assoc op :type :fail :value "Not coord"))
:show-instances-read (if (coord-instance? node)
(try
(utils/with-session bolt-conn session ; Use bolt connection for running show instances.
(let [instances (->> (get-all-instances session) (reduce conj []))]
(assoc op
:type :ok
:value {:instances instances :node node})))
(catch org.neo4j.driver.exceptions.ServiceUnavailableException _e
; Even whole assoc can be returned -> so we don't forget :ok
(assoc op :type :ok :value (str "Node " node " is down"))) ; TODO: (abstract this message into a function)
(catch Exception e
(assoc op :type :fail :value (str e))))
(assoc op :type :ok :value "Not coord"))
; Reading balances should be done only on data instances -> use bolt connection.
:read-balances (if (data-instance? (:node this))
(utils/with-session (:bolt-conn this) session
(let [accounts (->> (get-all-accounts session) (map :n) (reduce conj []))
total (reduce + (map :balance accounts))]
(assoc op
:type :ok
:value {:accounts accounts
:node (:node this)
:total total
:correct (= total (* account-num starting-balance))})))
(assoc op :type :fail :value "Not data instance"))
:read-balances (if (data-instance? node)
(try
(utils/with-session bolt-conn session
(let [accounts (->> (get-all-accounts session) (map :n) (reduce conj []))
total (reduce + (map :balance accounts))]
(assoc op
:type :ok
:value {:accounts accounts
:node node
:total total
:correct (= total (* account-num starting-balance))})))
(catch org.neo4j.driver.exceptions.ServiceUnavailableException _e
(assoc op :type :ok :value (str "Node " node " is down")))
(catch Exception e
(assoc op :type :fail :value (str e))))
(assoc op :type :ok :value "Not data instance"))

; Transfer money from one account to another. Only executed on main.
; If the transferring succeeds, return :ok, otherwise return :fail.
; Transfer will fail if the account doesn't exist, if the account doesn't have enough funds, or if update-balance
; doesn't return anything.
; Allow the exception due to down sync replica.
:transfer (if (is-main? (:node this) (:bolt-conn this))
(try
(let [transfer-info (:value op)]
(transfer-money
(:bolt-conn this)
(:from transfer-info)
(:to transfer-info)
(:amount transfer-info)))
(assoc op :type :ok)
(catch Exception e
(if (string/includes? (str e) "At least one SYNC replica has not confirmed committing last transaction.")
(assoc op :type :ok :value (str e)); Exception due to down sync replica is accepted/expected
(assoc op :type :fail :value (str e)))))
(assoc op :type :fail :value "Not main node."))

:register (if (= (:node this) "n4") ; Node with coordinator-id = 1
(do
(doseq [repl-config (filter #(contains? (val %) :replication-port)
nodes-config)]
(try
(utils/with-session (:bolt-conn this) session ; Use bolt connection for registering replication instances.
((haclient/register-replication-instance
(first repl-config)
(second repl-config)) session))
(catch Exception e
(assoc op :type :fail :value e))))
(doseq [coord-config (->> nodes-config
(filter #(not= (key %) "n4")) ; Don't register itself
(filter #(contains? (val %) :coordinator-id)))]
(try
(utils/with-session (:bolt-conn this) session ; Use bolt connection for registering coordinator instances.
((haclient/add-coordinator-instance
(second coord-config)) session))
(catch Exception e
(assoc op :type :fail :value e))))
(let [rand-main (nth (keys nodes-config) (rand-int 3))] ; 3 because first 3 instances are replication instances in cluster.edn
(try
(utils/with-session (:bolt-conn this) session ; Use bolt connection for setting instance to main.
((haclient/set-instance-to-main rand-main) session))
(catch Exception e
(assoc op :type :fail :value (str e)))))

(assoc op :type :ok))

(when (and (data-instance? (:node this)) (data-instance-is-main? (:bolt-conn this)) (accounts-not-created? (:bolt-conn this)))
(utils/with-session (:bolt-conn this) session
(info "Detaching and deleting all accounts...")
(mgclient/detach-delete-all session)
(info "Creating accounts...")
(dotimes [i account-num]
(create-account session {:id i :balance starting-balance})
(info "Created account:" i)))))))
:transfer
(let [transfer-info (:value op)]
(try
(if (and (is-main? node bolt-conn) (accounts-exist? bolt-conn))
(do
(transfer-money
bolt-conn
(:from transfer-info)
(:to transfer-info)
(:amount transfer-info))
(assoc op :type :ok))
(assoc op :type :ok :value "Transfer allowed only on initialized main."))
(catch org.neo4j.driver.exceptions.ServiceUnavailableException _e
(assoc op :type :ok :value (str "One of the nodes [" (:from transfer-info) ", " (:to transfer-info) "] in transfer is down")))
(catch Exception e
(if (string/includes? (str e) "At least one SYNC replica has not confirmed committing last transaction.")
(assoc op :type :ok :value (str e)); Exception due to down sync replica is accepted/expected
(assoc op :type :fail :value (str e))))))

:register
(if (= node first-leader)
(do
(doseq [repl-config (filter #(contains? (val %) :replication-port)
nodes-config)]
(try
(utils/with-session bolt-conn session ; Use bolt connection for registering replication instances.
((haclient/register-replication-instance
(first repl-config)
(second repl-config)) session))
(catch org.neo4j.driver.exceptions.ServiceUnavailableException _e
(assoc op :type :ok :value (str "Node " node " is down.")))
; TODO: (andi) We can be finer-grained here and do better ok/fail handling.
(catch Exception e
(assoc op :type :fail :value (str e))))) ; We set type to fail but this isn't being checked at the end.

(doseq [coord-config (->> nodes-config
(filter #(not= (key %) first-leader)) ; Don't register itself
(filter #(contains? (val %) :coordinator-id)))]
(try
(utils/with-session bolt-conn session ; Use bolt connection for registering coordinator instances.
((haclient/add-coordinator-instance
(second coord-config)) session))
(catch org.neo4j.driver.exceptions.ServiceUnavailableException _e
(assoc op :type :ok :value (str "Node " node " is down.")))
(catch Exception e
(assoc op :type :fail :value (str e)))))
(try
(utils/with-session bolt-conn session ; Use bolt connection for setting instance to main.
((haclient/set-instance-to-main first-main) session))
(catch org.neo4j.driver.exceptions.ServiceUnavailableException _e
(assoc op :type :ok :value (str "Node " node " is down.")))
(catch Exception e
(assoc op :type :fail :value (str e))))

(assoc op :type :ok))

(try
(when (main-to-initialize? node bolt-conn)
(utils/with-session bolt-conn session
(info "Deleting all accounts...")
(mgclient/detach-delete-all session)
(info "Creating accounts...")
(dotimes [i account-num]
(create-account session {:id i :balance starting-balance})
(info "Created account:" i))))
(catch org.neo4j.driver.exceptions.ServiceUnavailableException _e
(assoc op :type :ok :value (str "Node " node " is down.")))
(catch Exception e
(assoc op :type :fail :value (str e))))))))

(teardown! [_this _test])
(close! [this _test]
Expand Down Expand Up @@ -368,9 +413,14 @@
(defn workload
"Basic HA workload."
[opts]
{:client (Client. (:nodes-config opts) (:license opts) (:organization opts))
:checker (checker/compose
{:habank (habank-checker)
:timeline (timeline/html)})
:generator (haclient/ha-gen (gen/mix [show-instances-reads read-balances valid-transfer]))
:final-generator {:clients (gen/once show-instances-reads) :recovery-time 20}})
(let [nodes-config (:nodes-config opts)
first-leader (random-coord (keys nodes-config))
first-main (random-data-instance (keys nodes-config))
organization (:organization opts)
license (:license opts)]
{:client (Client. nodes-config first-leader first-main license organization)
:checker (checker/compose
{:habank (habank-checker)
:timeline (timeline/html)})
:generator (haclient/ha-gen (gen/mix [show-instances-reads read-balances valid-transfer]))
:final-generator {:clients (gen/once show-instances-reads) :recovery-time 20}}))

0 comments on commit 1d81485

Please sign in to comment.