Skip to content

Commit

Permalink
Parallelize writing histories, and only write them once.
Browse files Browse the repository at this point in the history
  • Loading branch information
aphyr committed May 16, 2016
1 parent bdf3bb3 commit 1c30e3b
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 9 deletions.
4 changes: 2 additions & 2 deletions jepsen/src/jepsen/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@
:sessions)]

(info "Run complete, writing")
(when (:name test) (store/save! test))
(when (:name test) (store/save-1! test))

(info "Analyzing")
(let [test (assoc test :results (checker/check-safe
Expand All @@ -435,5 +435,5 @@
(:history test)))]

(info "Analysis complete")
(when (:name test) (store/save! test))
(when (:name test) (store/save-2! test))
test)))))))))))))
20 changes: 15 additions & 5 deletions jepsen/src/jepsen/store.clj
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,7 @@
(defn write-history!
"Writes out a history.txt file."
[test]
(with-out-file test "history.txt"
(util/print-history (:history test))))
(util/pwrite-history! (path! test "history.txt") (:history test)))

(defn write-fressian!
"Write the entire test as a .fressian file"
Expand All @@ -237,11 +236,22 @@
out (fress/create-writer file :handlers write-handlers)]
(fress/write-object out test))))

(defn save!
"Writes a test to disk and updates latest symlinks. Returns test."
(defn save-1!
  "First save pass: writes the test's history file and its fressian file to
  disk, running both writes concurrently, then updates the latest symlinks.
  Returns test."
  [test]
  (let [writers [(future (write-history! test))
                 (future (write-fressian! test))]]
    ; Both futures are already running; wait for each one in turn.
    (doseq [w writers]
      (deref w)))
  (update-symlinks! test)
  test)

(defn save-2!
"Phase 2: after computing results, we re-write the fressian file and also
dump results as edn. Returns test."
[test]
(->> [(future (write-results! test))
(future (write-history! test))
(future (write-fressian! test))]
(map deref)
dorun)
Expand Down
80 changes: 78 additions & 2 deletions jepsen/src/jepsen/util.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,19 @@
[clojure.string :as str]
[clojure.pprint :refer [pprint]]
[clojure.walk :as walk]
[clojure.java.io :as io]
[clj-time.core :as time]
[clj-time.local :as time.local]
[clojure.tools.logging :refer [debug info warn]]
[knossos.history :as history])
(:import (java.util.concurrent.locks LockSupport)))
(:import (java.util.concurrent.locks LockSupport)
(java.io File
RandomAccessFile)))

(defn processors
  "Returns the number of processors the JVM runtime reports as available."
  []
  (.availableProcessors (Runtime/getRuntime)))

(defn majority
"Given a number, returns the smallest integer strictly greater than half."
Expand All @@ -36,6 +44,35 @@
(let [t (time.local/local-now)]
(time/minus t (time/millis (time/milli t)))))

(defn chunk-vec
  "Splits vector v into a lazy sequence of consecutive subvectors holding n
  elements each; the final subvector holds whatever is left over. Similar to
  partition-all, but every piece is produced with subvec.

    (chunk-vec 2 [1])     ; => ([1])
    (chunk-vec 2 [1 2 3]) ; => ([1 2] [3])"
  ([^long n v]
   (let [total (count v)]
     (for [start (range 0 total n)]
       ; Clamp the end index so the last piece never runs past the vector.
       (subvec v start (min total (+ start n)))))))

; Upper bound on the number of bytes requested from a single transferTo call
; in concat-files!.
(def buf-size 1048576)

(defn concat-files!
  "Writes the contents of every file in fs, in order, to out, so that out ends
  up holding exactly their concatenation. Any previous contents of out are
  discarded. Returns fs."
  [out fs]
  (with-open [oc (.getChannel (RandomAccessFile. (io/file out) "rw"))]
    ; \"rw\" mode opens an existing file without truncating it, and writing
    ; starts at position 0. Without this truncate, a pre-existing out that is
    ; longer than the new data would keep its stale tail.
    (.truncate oc 0)
    (doseq [f fs]
      (with-open [fc (.getChannel (RandomAccessFile. (io/file f) "r"))]
        (let [size (.size fc)]
          (loop [position 0]
            (when (< position size)
              ; transferTo returns the number of bytes actually moved, which
              ; may be fewer than requested, so advance by that count.
              (recur (+ position (.transferTo fc
                                              position
                                              (min (- size position)
                                                   buf-size)
                                              oc)))))))))
  fs)

(defn op->str
"Format an operation as a string."
[op]
Expand All @@ -46,11 +83,50 @@
(when-let [err (:error op)]
(str \tab err))))

(defn prn-op
  "Prints one operation to *out* as a single tab-separated line: process,
  type, f and value in pr form, followed by the error (in print form) when the
  op has a truthy :error."
  [op]
  ; The first three fields are each followed by a tab separator.
  (doseq [k [:process :type :f]]
    (pr (get op k))
    (print \tab))
  (pr (:value op))
  (when-let [err (:error op)]
    (print \tab)
    (print err))
  (print \newline))

(defn print-history
"Prints a history to the console."
[history]
(doseq [op history]
(println (op->str op))))
(prn-op op)))

(defn write-history!
  "Prints history to the file f via print-history. The writer opened on f is
  closed when printing finishes or throws."
  [f history]
  (with-open [sink (io/writer f)]
    ; print-history prints to *out*, so rebind *out* to the file's writer for
    ; the duration of the call.
    (binding [*out* sink]
      (print-history history))))

(defn pwrite-history!
  "Writes history to the file f. A vector of at least 16384 ops is split into
  one chunk per processor; each chunk is written to its own temporary file in
  a future, and the parts are then concatenated into f. Anything else is
  written directly with write-history!."
  [f history]
  (if (and (>= (count history) 16384) (vector? history))
    ; Parallel variant
    (let [chunks (chunk-vec (Math/ceil (/ (count history) (processors)))
                            history)
          files  (repeatedly (count chunks)
                             #(File/createTempFile "jepsen-history" ".part"))]
      (try
        ; Start every writer before waiting on any of them; each future
        ; yields the part file it wrote.
        (let [tasks (doall (map (fn [file chunk]
                                  (future (write-history! file chunk) file))
                                files
                                chunks))]
          (concat-files! f (map deref tasks)))
        (finally
          ; The part files are only intermediates; remove them on success
          ; or failure.
          (doseq [part files] (.delete part)))))
    ; Plain old write
    (write-history! f history)))

(defn log-op
"Logs an operation and returns it."
Expand Down

0 comments on commit 1c30e3b

Please sign in to comment.