diff --git a/src/riemann/streams.clj b/src/riemann/streams.clj index 4873a29c9..5e020a7a9 100644 --- a/src/riemann/streams.clj +++ b/src/riemann/streams.clj @@ -298,25 +298,30 @@ Events without times accrue in the current window." [n & children] - (let [cutoff (ref 0) - buffer (ref [])] + (let [state (atom {:cutoff 0 + :buffer [] + :send? true})] (fn stream [event] - (let [events (dosync - ; Compute minimum allowed time - (let [cutoff (alter cutoff max (- (get event :time 0) n))] - (when (or (nil? (:time event)) - (< cutoff (:time event))) - ; This event belongs in the buffer, and our cutoff may - ; have changed. - (alter buffer conj event) - (alter buffer - (fn alter [events] - (vec (filter - (fn [e] (or (nil? (:time e)) - (< cutoff (:time e)))) - events)))))))] - (when events - (call-rescue events children)))))) + (let [result (swap! + state + (fn [{:keys [cutoff buffer]}] + ; Compute minimum allowed time + (let [cutoff (max cutoff (- (get event :time 0) n)) + send? (or (nil? (:time event)) + (< cutoff (:time event))) + buffer (if send? + ; This event belongs in the buffer, + ; and our cutoff may have changed. + (vec (filter + (fn [e] (or (nil? (:time e)) + (< cutoff (:time e)))) + (conj buffer event))) + buffer)] + {:cutoff cutoff + :buffer buffer + :send? send?})))] + (when (:send? result) + (call-rescue (:buffer result) children)))))) (defn- fixed-time-window-fn "A fixed window over the event stream in time. Emits vectors of events, such