Skip to content

Commit

Permalink
Improved sampling implementation by using frequency-based random sampling.
Browse files Browse the repository at this point in the history
  • Loading branch information
sbtourist committed Apr 20, 2012
1 parent 0f8ba19 commit 362e780
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 46 deletions.
5 changes: 3 additions & 2 deletions .gitignore
Expand Up @@ -10,5 +10,6 @@ test.sh
nimrod.properties
/.lein-failures
/target/
/nimrod-data/
/nimrod.conf
/nimrod-data*
/nimrod.conf
/.tmp.zipeg.1333558157188866000/
2 changes: 1 addition & 1 deletion nimrod2.conf
Expand Up @@ -6,6 +6,6 @@ store {
"defrag.limit" : 3
}
sampling {
"test" : 10
"test.frequency" : 10
}
}
14 changes: 7 additions & 7 deletions src/nimrod/core/stat.clj
Expand Up @@ -11,16 +11,16 @@
(util/unrationalize (/ (+ previous-variance (* (- value previous-average) (- value current-average))) (dec samples)))
0))

; median: returns the median of `total` ranked samples, fetching individual
; samples by 1-based rank through the supplied one-argument function.
;   total            - number of samples
;   read-fn/index-fn - (fn [rank]) returning the sample stored at that rank
; Odd total: the single middle rank, (Math/round (/ total 2.0)).
; Even total: the mean of the samples at ranks total/2 and total/2 + 1,
; passed through util/unrationalize.
; NOTE(review): this span is a rendered diff hunk, so the index-fn lines and
; the read-fn lines that presumably supersede them both appear below.
(defn median [total index-fn]
(defn median [total read-fn]
(if (odd? total)
(index-fn (Math/round (/ total 2.0)))
(util/unrationalize (/ (+ (index-fn (inc (/ total 2))) (index-fn (/ total 2))) 2))))
(read-fn (Math/round (/ total 2.0)))
(util/unrationalize (/ (+ (read-fn (inc (/ total 2))) (read-fn (/ total 2))) 2))))

; percentiles: builds a map from percentile keyword (e.g. :50th) to the sample
; found at the corresponding rank.
;   total            - number of samples
;   percentages      - collection of percentile values, each within 0..100
;   read-fn/index-fn - (fn [rank]) returning the sample stored at that rank
; Rank selection:
;   0 < p < 100 -> (Math/round (+ (* (/ p 100) total) 0.5))
;   p = 0       -> rank 1
;   p = 100     -> rank total
; Any other value throws IllegalArgumentException.
; NOTE(review): this span is a rendered diff hunk, so the index-fn lines and
; the read-fn lines that presumably supersede them both appear below.
(defn percentiles [total percentages index-fn]
(defn percentiles [total percentages read-fn]
(into {}
(for [p percentages]
(cond
(and (> p 0) (< p 100)) (let [rank (Math/round (+ (* (/ p 100) total) 0.5))] [(keyword (str p "th")) (index-fn rank)])
(= 0 p) [(keyword (str p "th")) (index-fn 1)]
(= 100 p) [(keyword (str p "th")) (index-fn total)]
(and (> p 0) (< p 100)) (let [rank (Math/round (+ (* (/ p 100) total) 0.5))] [(keyword (str p "th")) (read-fn rank)])
(= 0 p) [(keyword (str p "th")) (read-fn 1)]
(= 100 p) [(keyword (str p "th")) (read-fn total)]
:else (throw (IllegalArgumentException. (str "Out of bounds percentile: " p)))))))
72 changes: 52 additions & 20 deletions src/nimrod/core/store.clj
Expand Up @@ -18,15 +18,16 @@
; Fallback values used when no explicit setting is supplied.
; NOTE(review): the cache/defrag defaults presumably back the "cache.entries",
; "cache.results" and "defrag.limit" store options -- confirm against the
; code that reads them.
(defonce default-cache-entries 1000)
(defonce default-cache-results 1000000)
(defonce default-defrag-limit 0)
; Fallback for the value-threshold sampling lookup in set-metric.
(defonce default-sampling-threshold 1)
; Fallback for the "<key>.factor" sampling lookup in set-metric.
(defonce default-sampling-factor 10)
; Fallback for the "<key>.frequency" sampling lookup in set-metric; a
; frequency of 0 makes do-sampling? return false, i.e. no sampling.
(defonce default-sampling-frequency 0)

; NOTE(review): oneMinute is defined outside this hunk; presumably a duration
; constant -- confirm its unit.
(defonce default-age oneMinute)

; Decodes `v` with json/parse-smile, passing `true` as the second argument and
; a function that always yields an empty set as the third.
; NOTE(review): presumably `true` keywordizes map keys and the function makes
; decoded arrays come back as sets -- confirm against the json library in use.
(defn- to-json [v]
  (let [empty-set-for (fn [_] #{})]
    (json/parse-smile v true empty-set-for)))

; Returns true when old-value and new-value land in different buckets, where a
; value's bucket is its quotient by `threshold` truncated to an int.
(defn- sample? [old-value new-value threshold]
  (let [bucket-of (fn [value] (int (/ value threshold)))
        old-bucket (bucket-of old-value)
        new-bucket (bucket-of new-value)]
    (not= old-bucket new-bucket)))
; Decides whether a sampling pass should run now.
;   samples            - number of samples counted so far
;   sampling-frequency - sample count at which a pass runs; 0 disables sampling
; Returns true only when the frequency is non-zero and equals `samples`.
(defn- do-sampling? [samples sampling-frequency]
  (if (zero? sampling-frequency)
    false
    (= samples sampling-frequency)))

(defprotocol Store
(init [this])
Expand Down Expand Up @@ -54,9 +55,11 @@
(sql/with-connection connection-factory
(sql/transaction
(sql/do-prepared
"CREATE CACHED TABLE metrics (ns LONGVARCHAR, type LONGVARCHAR, id LONGVARCHAR, timestamp BIGINT, raw DOUBLE, metric LONGVARBINARY, PRIMARY KEY (ns,type,id))")
"CREATE CACHED TABLE metrics (ns LONGVARCHAR, type LONGVARCHAR, id LONGVARCHAR, seq BIGINT, timestamp BIGINT, raw DOUBLE, metric LONGVARBINARY, PRIMARY KEY (ns,type,id))")
(sql/do-prepared
"CREATE CACHED TABLE history (ns LONGVARCHAR, type LONGVARCHAR, id LONGVARCHAR, timestamp BIGINT, seq BIGINT GENERATED ALWAYS AS IDENTITY, raw DOUBLE, metric LONGVARBINARY, PRIMARY KEY (ns,type,id,timestamp,seq))")))
"CREATE CACHED TABLE history (ns LONGVARCHAR, type LONGVARCHAR, id LONGVARCHAR, seq BIGINT, timestamp BIGINT, raw DOUBLE, metric LONGVARBINARY, PRIMARY KEY (ns,type,id,seq))")
(sql/do-prepared
"CREATE INDEX history_idx ON history (ns,type,id,timestamp)")))
(catch Exception ex))
(sql/with-connection connection-factory
(sql/transaction
Expand All @@ -69,31 +72,60 @@
["SELECT * FROM metrics"]
(doseq [metric all-metrics]
(dosync
(alter memory assoc-in [(metric :ns) (metric :type) (metric :id) :raw] (metric :raw))
(alter memory assoc-in [(metric :ns) (metric :type) (metric :id) :metric] (to-json (metric :metric)))))))))
(alter memory assoc-in [(metric :ns) (metric :type) (metric :id) :seq] (metric :seq))
(alter memory assoc-in [(metric :ns) (metric :type) (metric :id) :metric] (to-json (metric :metric)))
(alter memory assoc-in [(metric :ns) (metric :type) (metric :id) :samples] 0)))))))

; set-metric: stores `metric` as the current value for ns/type/id in the
; "metrics" table and in `memory`, appends it to the "history" table, and,
; when the configured sampling frequency is reached, deletes a random subset
; of the most recent history rows.
; NOTE(review): this span is a rendered diff hunk -- lines that appear to be
; superseded (old-raw-value, sampling-threshold, the sample?-guarded history
; insert, the :raw update) sit next to the lines that replace them.
(set-metric [this metric-ns metric-type metric-id metric raw]
(let [old-raw-value (get-in @memory [metric-ns metric-type metric-id :raw])
; :seq and :samples default to 0 when the metric has no entry in memory yet.
(let [current-seq-value (or (get-in @memory [metric-ns metric-type metric-id :seq]) 0)
current-samples (or (get-in @memory [metric-ns metric-type metric-id :samples]) 0)
new-seq-value (inc current-seq-value)
new-samples (inc current-samples)
new-raw-value raw
new-json-metric (json/generate-smile metric)
new-metric-timestamp (metric :timestamp)
sampling-threshold (or
(sampling metric-ns)
(sampling (str metric-ns "." metric-type))
(sampling (str metric-ns "." metric-type "." metric-id))
default-sampling-threshold)]
; Factor and frequency resolve from the most specific key (ns.type.id.*) to
; the least specific (ns.*), then fall back to the defaults.
sampling-factor (or
(sampling (str metric-ns "." metric-type "." metric-id ".factor"))
(sampling (str metric-ns "." metric-type ".factor"))
(sampling (str metric-ns ".factor"))
default-sampling-factor)
sampling-frequency (or
(sampling (str metric-ns "." metric-type "." metric-id ".frequency"))
(sampling (str metric-ns "." metric-type ".frequency"))
(sampling (str metric-ns ".frequency"))
default-sampling-frequency)]
; Increment sequence prior to actually using it to avoid duplicated entries:
(dosync
(alter memory assoc-in [metric-ns metric-type metric-id :seq] new-seq-value))
; Insert metric/history:
(sql/with-connection connection-factory
(sql/transaction
(sql/update-or-insert-values
"metrics"
["ns=? AND type=? AND id=?" metric-ns metric-type metric-id]
{"ns" metric-ns "type" metric-type "id" metric-id "timestamp" new-metric-timestamp "raw" raw "metric" new-json-metric})
(when (or (nil? old-raw-value) (sample? old-raw-value raw sampling-threshold))
(sql/insert-record
"history"
{"ns" metric-ns "type" metric-type "id" metric-id "timestamp" new-metric-timestamp "raw" raw "metric" new-json-metric}))))
{"ns" metric-ns "type" metric-type "id" metric-id "timestamp" new-metric-timestamp "seq" new-seq-value "raw" new-raw-value "metric" new-json-metric})
(sql/insert-record
"history"
{"ns" metric-ns "type" metric-type "id" metric-id "timestamp" new-metric-timestamp "seq" new-seq-value "raw" new-raw-value "metric" new-json-metric})))
(dosync
(alter memory assoc-in [metric-ns metric-type metric-id :raw] raw)
(alter memory assoc-in [metric-ns metric-type metric-id :metric] metric))))
(alter memory assoc-in [metric-ns metric-type metric-id :metric] metric))
; Optionally sample:
(sql/with-connection connection-factory
(sql/transaction
; When the sample count equals the sampling frequency, walk backwards from the
; newest seq. Each step draws (int (* samples (rand))) and deletes the current
; history row when the draw is <= to-delete, which starts at
; new-samples - new-samples/sampling-factor; the walk ends once
; (dec to-delete) is no longer positive. The in-memory :samples counter is
; then reset to 0; when no pass runs it is stored as new-samples instead.
(if (do-sampling? new-samples sampling-frequency)
(do
(loop [record new-seq-value samples new-samples to-delete (- new-samples (/ new-samples sampling-factor))]
(if ( <= (int (* samples (rand))) to-delete)
(do
(sql/delete-rows "history"
["ns=? AND type=? AND id=? AND seq=?"
metric-ns metric-type metric-id record])
(when (> (dec to-delete) 0) (recur (dec record) (dec samples) (dec to-delete))))
(recur (dec record) (dec samples) to-delete)))
(dosync
(alter memory assoc-in [metric-ns metric-type metric-id :samples] 0)))
(dosync
(alter memory assoc-in [metric-ns metric-type metric-id :samples] new-samples)))))))

(remove-metric [this metric-ns metric-type metric-id]
(sql/with-connection connection-factory
Expand Down
2 changes: 1 addition & 1 deletion test/nimrod/conf/setup_test.clj
Expand Up @@ -19,4 +19,4 @@
(setup "nimrod2.conf")
(is (= "nimrod-data/db" @path))
(is (= {"cache.entries" 1 "cache.results" 2 "defrag.limit" 3} @options))
(is (= {"test" 10} @sampling)))))
(is (= {"test.frequency" 10} @sampling)))))
32 changes: 17 additions & 15 deletions test/nimrod/core/store_test.clj
Expand Up @@ -75,33 +75,33 @@
(let [metric-ns-1 "1" metric-type-1 "gauge" metric-id-1 "test"
metric-1-1 {:value 1 :timestamp 1}
metric-1-2 {:value 2 :timestamp 2}
metric-1-3 {:value 10 :timestamp 3}
metric-1-3 {:value 3 :timestamp 3}
metric-ns-2 "2" metric-type-2 "gauge" metric-id-2 "test"
metric-2-1 {:value 1 :timestamp 1}
metric-2-2 {:value 10 :timestamp 2}
metric-2-3 {:value 100 :timestamp 3}
metric-2-2 {:value 2 :timestamp 2}
metric-2-3 {:value 3 :timestamp 3}
metric-ns-3 "3" metric-type-3 "gauge" metric-id-3 "test"
metric-3-1 {:value 1 :timestamp 1}
metric-3-2 {:value 100 :timestamp 2}
metric-3-3 {:value 1000 :timestamp 3}]
metric-3-2 {:value 2 :timestamp 2}
metric-3-3 {:value 3 :timestamp 3}]
(set-metric store metric-ns-1 metric-type-1 metric-id-1 metric-1-1 1)
(set-metric store metric-ns-1 metric-type-1 metric-id-1 metric-1-2 2)
(set-metric store metric-ns-1 metric-type-1 metric-id-1 metric-1-3 10)
(set-metric store metric-ns-1 metric-type-1 metric-id-1 metric-1-3 3)
(set-metric store metric-ns-2 metric-type-2 metric-id-2 metric-2-1 1)
(set-metric store metric-ns-2 metric-type-2 metric-id-2 metric-2-2 10)
(set-metric store metric-ns-2 metric-type-2 metric-id-2 metric-2-3 100)
(set-metric store metric-ns-2 metric-type-2 metric-id-2 metric-2-2 2)
(set-metric store metric-ns-2 metric-type-2 metric-id-2 metric-2-3 3)
(set-metric store metric-ns-3 metric-type-3 metric-id-3 metric-3-1 1)
(set-metric store metric-ns-3 metric-type-3 metric-id-3 metric-3-2 100)
(set-metric store metric-ns-3 metric-type-3 metric-id-3 metric-3-3 1000)
(set-metric store metric-ns-3 metric-type-3 metric-id-3 metric-3-2 2)
(set-metric store metric-ns-3 metric-type-3 metric-id-3 metric-3-3 3)
(testing "Metric history values with sampling configured on metrics namespace"
(let [history (read-history store metric-ns-1 metric-type-1 metric-id-1 #{} nil 0 Long/MAX_VALUE)]
(is (= [metric-1-3 metric-1-1] (history :values)))))
(testing "Metric history values with sampling configured on metrics namespace and type"
(is (= 2 (count (history :values))))))
(testing "Metric history values with sampling configured on metrics namespace and type"
(let [history (read-history store metric-ns-2 metric-type-2 metric-id-2 #{} nil 0 Long/MAX_VALUE)]
(is (= [metric-2-3 metric-2-1] (history :values)))))
(is (= 2 (count (history :values))))))
(testing "Metric history values with sampling configured on metrics namespace, type and id"
(let [history (read-history store metric-ns-3 metric-type-3 metric-id-3 #{} nil 0 Long/MAX_VALUE)]
(is (= [metric-3-3 metric-3-1] (history :values)))))))
(is (= 2 (count (history :values))))))))

(defn set-and-remove-metric-history [store]
(let [metric-ns "1" metric-type "gauge" metric-id "1"
Expand Down Expand Up @@ -232,7 +232,9 @@
(set-and-read-metric-history-with-old-values-too (new-disk-store (java.io.File/createTempFile "test" "4")))
(set-and-read-metric-history-by-age (new-disk-store (java.io.File/createTempFile "test" "5")))
(set-and-read-metric-history-by-tags (new-disk-store (java.io.File/createTempFile "test" "6")))
(set-and-read-metric-history-with-sampling (new-disk-store (java.io.File/createTempFile "test" "7") {} {"1" 10 "2.gauge" 100 "3.gauge.test" 1000}))
(set-and-read-metric-history-with-sampling
(new-disk-store (java.io.File/createTempFile "test" "7") {}
{"1.factor" 2 "1.frequency" 2 "2.gauge.factor" 2 "2.gauge.frequency" 2 "3.gauge.test.factor" 2 "3.gauge.test.frequency" 2}))
(set-and-remove-metric-history (new-disk-store (java.io.File/createTempFile "test" "8")))
(set-and-remove-metric-history-by-age (new-disk-store (java.io.File/createTempFile "test" "9")))
(set-and-aggregate-metric-history (new-disk-store (java.io.File/createTempFile "test" "10")))
Expand Down

0 comments on commit 362e780

Please sign in to comment.