Skip to content

Commit

Permalink
coalesce: simplify coalesce state storage (#748)
Browse files Browse the repository at this point in the history
  • Loading branch information
pyr committed Dec 15, 2016
1 parent cf9c4f0 commit f5d9943
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 81 deletions.
1 change: 0 additions & 1 deletion src/riemann/bin.clj
Expand Up @@ -40,7 +40,6 @@
(riemann.config/validate-config @config-file)
(riemann.time/reset-tasks!)
(riemann.config/clear!)
(riemann.config/clear-stream-state!)
(riemann.pubsub/sweep! (:pubsub @riemann.config/core))
(riemann.config/include @config-file)
(riemann.config/apply!)
Expand Down
5 changes: 0 additions & 5 deletions src/riemann/config.clj
Expand Up @@ -293,11 +293,6 @@
(locking core
(reset! next-core (core/core))))

(defn clear-stream-state!
"Resets the streams states atoms"
[]
(stream-state-transition!))

(defn apply!
"Applies pending changes to the core. Transitions the current core to the
next one, and resets the next core."
Expand Down
60 changes: 30 additions & 30 deletions src/riemann/streams.clj
Expand Up @@ -1164,43 +1164,43 @@
[ok expired] (swap! past update)]
(child event (concat expired (vals ok)))))))

(def stream-state (atom {}))
(def next-stream-state (atom {}))
(defonce stream-state (atom {}))

(defn set-next-stream-state!
"Set the `next-stream-name` key in stream-state to `state`
`stream-name` must be a keyword"
(defn set-stream-state!
"Ensure that there is a stored value for a named stream.
Yields the stored value."
[stream-name state]
(swap! next-stream-state assoc stream-name state))
(get
(swap! stream-state assoc stream-name state)
stream-name))

(defn reset-stream-states!
"Reset the `stream-state` and `next-stream-state` atoms to `{}`"
"Clear previously saved stream states"
[]
(reset! stream-state {})
(reset! next-stream-state {}))

(defn stream-state-transition!
"Copy the `next-stream-state` to `stream-state`
reset the `next-stream-state` value to `{}`"
[]
(reset! stream-state @next-stream-state)
(reset! next-stream-state {}))

(defn get-or-create-stream-state
"Returns the value associated to the `stream-name` key in `stream-state` or `default-value` if the key does not exists.
Update the `next-stream-state` atom."
[stream-name default-value]
(let [state (get @stream-state stream-name default-value)]
(when stream-name (set-next-stream-state! stream-name state))
state))
(reset! stream-state {}))

(defn named-stream-state
"Get a stream by name, if no previous value existed for this
named stream, use the 0-arity constructor `ctor` to initialize it."
[stream-name ctor]
(if-let [state (get @stream-state stream-name)]
state
(set-stream-state! stream-name (ctor))))

(defn expire-stream-state
"Utility function to expire a stream state by name.
This is meant to be called after a reload if a stream has changed name,
to expire the previous one."
[stream-name]
(swap! stream-state dissoc stream-name))

(defn extract-coalesce-args
"returns a map containing the coalesce args."
[dt children]
[args children]
(cond
(number? dt) {:dt dt :children children}
(map? dt) (assoc dt :dt (:dt dt 1) :children children)
:default {:dt 1 :children (cons dt children)}))
(number? args) {:dt args :children children}
(map? args) (assoc args :dt (:dt args 1) :children children)
:default {:dt 1 :children (cons args children)}))

(defn coalesce
"Combines events over time. Coalesce remembers the most recent event for each
Expand Down Expand Up @@ -1228,8 +1228,8 @@
"
[& [dt & children]]
(let [{:keys [dt stream-name children]} (extract-coalesce-args dt children)
chm (get-or-create-stream-state stream-name
(java.util.concurrent.ConcurrentHashMap.))
ctor (fn [] (java.util.concurrent.ConcurrentHashMap.))
chm (if stream-name (named-stream-state stream-name ctor) (ctor))
callback (fn callback []
(let [es (vec (.values chm))
expired (filter expired? es)]
Expand Down
71 changes: 26 additions & 45 deletions test/riemann/streams_test.clj
Expand Up @@ -1363,38 +1363,23 @@
(is (= (set @out) #{b2 c1}))))

(deftest coalesce-state-test
(reset-stream-states!)
(let [out (atom [])
s (coalesce {:stream-name :coalesce-test} #(reset! out %))
a1 {:service :a :state "one" :time 0}
b1 {:service :b :state "one" :time 0}
c1 {:service :c :state "one" :time 0}
b2 {:service :b :state "two" :time 0}]
(s a1)
(advance! 1.1)
(is (= (set @out) (set (.values (:coalesce-test @next-stream-state))) #{a1}))
(reset-stream-states!)
(let [out (atom [])
s (coalesce {:stream-name :coalesce-test} #(reset! out %))
a1 {:service :a :state "one" :time 0}
b1 {:service :b :state "one" :time 0}]

(s b1)
(advance! 2.1)
(is (= (set @out) (set (.values (:coalesce-test @next-stream-state))) #{a1 b1})))
(s a1)
(advance! 1.1)

(stream-state-transition!) ;; simulate riemann reload
(testing "named stream coalesce stored values"
(is (= (set @out) (set (.values (:coalesce-test @stream-state))) #{a1}))

(let [out (atom [])
s (coalesce {:stream-name :coalesce-test} #(reset! out %))
a1 {:service :a :state "one" :time 0}
b1 {:service :b :state "one" :time 0}
a2 {:service :a :state "two" :time 3 :ttl 2}
c1 {:service :c :state "one" :time 0}
b2 {:service :b :state "two" :time 0}]
(s b1)
(advance! 2.1)
(is (= (set @out) (set (.values (:coalesce-test @stream-state))) #{a1 b1})))

(is (= (set (.values (:coalesce-test @stream-state))) #{a1 b1}))
(is (= (set (.values (:coalesce-test @next-stream-state))) #{a1 b1}))

(s a2)
(advance! 3.1)
(is (= (set @out) #{a2 b1} (set (.values (:coalesce-test @next-stream-state))))))
(reset-stream-states!))
(reset-stream-states!))) ;; cleanup

(deftest stable-test
; Doesn't emit until dt seconds have passed.
Expand Down Expand Up @@ -1736,29 +1721,25 @@

(deftest set-next-stream-state-test
(reset-stream-states!)
(set-next-stream-state! :foo "bar")
(is (= @next-stream-state {:foo "bar"}))
(set-next-stream-state! :foo "baz")
(is (= @next-stream-state {:foo "baz"}))
(set-next-stream-state! :bar "baz")
(is (= @next-stream-state {:foo "baz" :bar "baz"}))
(set-stream-state! :foo "bar")
(is (= @stream-state {:foo "bar"}))
(set-stream-state! :foo "baz")
(is (= @stream-state {:foo "baz"}))
(set-stream-state! :bar "baz")
(is (= @stream-state {:foo "baz" :bar "baz"}))
(reset-stream-states!))

(deftest get-or-create-stream-state-test
(reset-stream-states!)
(reset! stream-state {:foo "bar"})
(is (= (get-or-create-stream-state :foo "bar1") "bar"))
(is (= @next-stream-state {:foo "bar"}))
(is (= (get-or-create-stream-state :bar "foo") "foo"))
(is (= @next-stream-state {:foo "bar" :bar "foo"}))
(is (= (get-or-create-stream-state nil "baz") "baz"))
(is (= @next-stream-state {:foo "bar" :bar "foo"}))
(is (= (named-stream-state :foo (fn [] "bar1")) "bar"))
(is (= @stream-state {:foo "bar"}))
(is (= (named-stream-state :bar (fn [] "foo")) "foo"))
(is (= @stream-state {:foo "bar" :bar "foo"}))
(reset-stream-states!))

(deftest stream-state-transition-test
(reset-stream-states!)
(reset! next-stream-state {:foo "bar" :baz "foo"})
(stream-state-transition!)
(is (= @next-stream-state {}))
(is (= @stream-state {:foo "bar" :baz "foo"}))
(reset-stream-states!))
(reset! stream-state {:foo "bar" :baz "foo"})
(reset-stream-states!)
(is (= @stream-state {})))

0 comments on commit f5d9943

Please sign in to comment.