Skip to content

Commit

Permalink
datascript.test.storage/stress-test
Browse files Browse the repository at this point in the history
  • Loading branch information
tonsky committed Nov 21, 2023
1 parent bfc673a commit 9bf3a6e
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 14 deletions.
5 changes: 5 additions & 0 deletions script/stress_storage.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash
set -o errexit -o nounset -o pipefail
cd "`dirname $0`/.."

clojure -J-Xmx512m -M:test -m datascript.test.storage $@
102 changes: 88 additions & 14 deletions test/datascript/test/storage.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,21 @@
[cognitect.transit :as transit]
[datascript.core :as d]
[datascript.storage :as storage]
[datascript.test.core :as tdc]))
[datascript.test.core :as tdc])
(:import
[java.util.concurrent Executors]))

(defrecord Storage [*disk *reads *writes *deletes]
storage/IStorage
(-store [_ addr+data-seq]
(doseq [[addr data] addr+data-seq]
(vswap! *disk assoc addr (pr-str data))
(vswap! *writes conj addr)))
(when *writes
(vswap! *writes conj addr))))

(-restore [_ addr]
(vswap! *reads conj addr)
(when *reads
(vswap! *reads conj addr))
(-> @*disk (get addr) edn/read-string))

(-list-addresses [_]
Expand All @@ -25,14 +29,18 @@
(-delete [_ addrs-seq]
(doseq [addr addrs-seq]
(vswap! *disk dissoc addr)
(vswap! *deletes conj addr))))
(when *deletes
(vswap! *deletes conj addr)))))

(defn make-storage [& [opts]]
(map->Storage
{:*disk (volatile! {})
:*reads (volatile! [])
:*writes (volatile! [])
:*deletes (volatile! [])}))
:*reads (when (:stats opts)
(volatile! []))
:*writes (when (:stats opts)
(volatile! []))
:*deletes (when (:stats opts)
(volatile! []))}))

(defn reset-stats [storage]
(vreset! (:*reads storage) [])
Expand All @@ -53,7 +61,7 @@
(deftest test-basics
(testing "empty db"
(let [db (d/empty-db)
storage (make-storage)]
storage (make-storage {:stats true})]
(d/store db storage)
(is (= 5 (count @(:*writes storage))))
(let [db' (d/restore storage)]
Expand All @@ -63,7 +71,7 @@

(testing "small db"
(let [db (small-db)
storage (make-storage)]
storage (make-storage {:stats true})]
(testing "store"
(d/store db storage)
(is (= 0 (count @(:*reads storage))))
Expand All @@ -90,7 +98,7 @@

(testing "large db"
(let [db (large-db)
storage (make-storage)]
storage (make-storage {:stats true})]

(testing "store"
(d/store db storage)
Expand Down Expand Up @@ -197,7 +205,7 @@
(is (= (:avet db) (:avet db')))))))))))

(deftest test-gc
(let [storage (make-storage)]
(let [storage (make-storage {:stats true})]
(let [db (large-db {:storage storage})]
(d/store db)
(is (= 135 (count (d/addresses db))))
Expand Down Expand Up @@ -228,7 +236,7 @@
(is (pos? (count (storage/-list-addresses storage)))))))

(deftest test-conn
(let [storage (make-storage)
(let [storage (make-storage {:stats true})
conn (d/create-conn nil {:storage storage
:branching-factor 32
:ref-type :strong})]
Expand Down Expand Up @@ -294,9 +302,75 @@
(let [conn''' (d/restore-conn storage)]
(is (= @conn'' @conn''')))))))

(defn stress-test [{:keys [time branching-factor ref-type]
:or {time 10000
branching-factor 32
ref-type :weak}}]
(println "Stress-testing storage for" time "ms")
(let [storage (make-storage {:stats false})
conn (d/create-conn
{:idx {:db/index true}
:name {:db/index true}}
{:storage storage
:branching-factor branching-factor
:ref-type ref-type})
threads (.availableProcessors (Runtime/getRuntime))
exec (Executors/newFixedThreadPool (+ 2 threads))
*running? (volatile! true)
bump (fn [db]
(let [op (:op (d/entity db 1))]
[[:db/add 1 :op (inc (or op 0))]]))
*exception (volatile! nil)]
;; transact threads
(dotimes [_ threads]
(.submit exec ^Runnable
(fn []
(try
(let [i (+ 2 (rand-int 100000))]
(d/transact! conn
[[:db.fn/call bump]
{:db/id i
:idx i
:name (str i)}]))
(catch Exception e
(vreset! *exception e)
(.printStackTrace e)))
(when @*running?
(recur)))))
;; JVM GC thread
(.submit exec ^Runnable
(fn []
(Thread/sleep (long (rand-int 100)))
(System/gc)
(when @*running?
(recur))))
;; Storage GC
(.submit exec ^Runnable
(fn []
(Thread/sleep 1000)
(d/collect-garbage storage)
(when @*running?
(recur))))
(Thread/sleep (long time))
(vreset! *running? false)
(.shutdown exec)
(println " ops:" (:op (d/entity @conn 1)))
(println " nodes:" (count @(:*disk storage)))
(println " max idx:" (->> (d/datoms @conn :avet :idx) (rseq) (first) :v))
(println " max name:" (->> (d/datoms @conn :avet :name) (rseq) (first) :v))
@*exception))

(defn -main [& {:as opts}]
(let [opts' {:time (some-> (opts "--time") parse-long)
:branching-factor (some-> (opts "--branching-factor") parse-long)
:ref-type (some-> (opts "--ref-type") keyword)}]
(when (stress-test opts')
(System/exit 1))))

; (t/test-ns *ns*)
; (t/run-test-var #'test-conn)
(comment
(t/test-ns *ns*)
(stress-test {:time 60000})
(t/run-test-var #'test-conn))

(comment
(let [serializable (with-open [is (io/input-stream (io/file "/Users/tonsky/ws/roam/db_3M.json_transit"))]
Expand Down

0 comments on commit 9bf3a6e

Please sign in to comment.