Skip to content

Commit

Permalink
Replace refs in moving-time-window by an atom
Browse files Browse the repository at this point in the history
  • Loading branch information
mcorbin committed Apr 28, 2017
1 parent fe40f86 commit f2d33cb
Showing 1 changed file with 23 additions and 18 deletions.
41 changes: 23 additions & 18 deletions src/riemann/streams.clj
Expand Up @@ -298,25 +298,30 @@
Events without times accrue in the current window."
[n & children]
(let [cutoff (ref 0)
buffer (ref [])]
(let [state (atom {:cutoff 0
:buffer []
:send? true})]
(fn stream [event]
(let [events (dosync
; Compute minimum allowed time
(let [cutoff (alter cutoff max (- (get event :time 0) n))]
(when (or (nil? (:time event))
(< cutoff (:time event)))
; This event belongs in the buffer, and our cutoff may
; have changed.
(alter buffer conj event)
(alter buffer
(fn alter [events]
(vec (filter
(fn [e] (or (nil? (:time e))
(< cutoff (:time e))))
events)))))))]
(when events
(call-rescue events children))))))
(let [result (swap!
state
(fn [{:keys [cutoff buffer]}]
; Compute minimum allowed time
(let [cutoff (max cutoff (- (get event :time 0) n))
send? (or (nil? (:time event))
(< cutoff (:time event)))
buffer (if send?
; This event belongs in the buffer,
; and our cutoff may have changed.
(vec (filter
(fn [e] (or (nil? (:time e))
(< cutoff (:time e))))
(conj buffer event)))
buffer)]
{:cutoff cutoff
:buffer buffer
:send? send?})))]
(when (:send? result)
(call-rescue (:buffer result) children))))))

(defn- fixed-time-window-fn
"A fixed window over the event stream in time. Emits vectors of events, such
Expand Down

0 comments on commit f2d33cb

Please sign in to comment.