From f2d33cb630d320e5a9d72dca83cd098ac1693b48 Mon Sep 17 00:00:00 2001 From: mcorbin Date: Fri, 28 Apr 2017 11:30:45 +0200 Subject: [PATCH] Replace refs in moving-time-window by an atom --- src/riemann/streams.clj | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/src/riemann/streams.clj b/src/riemann/streams.clj index 4873a29c9..5e020a7a9 100644 --- a/src/riemann/streams.clj +++ b/src/riemann/streams.clj @@ -298,25 +298,30 @@ Events without times accrue in the current window." [n & children] - (let [cutoff (ref 0) - buffer (ref [])] + (let [state (atom {:cutoff 0 + :buffer [] + :send? true})] (fn stream [event] - (let [events (dosync - ; Compute minimum allowed time - (let [cutoff (alter cutoff max (- (get event :time 0) n))] - (when (or (nil? (:time event)) - (< cutoff (:time event))) - ; This event belongs in the buffer, and our cutoff may - ; have changed. - (alter buffer conj event) - (alter buffer - (fn alter [events] - (vec (filter - (fn [e] (or (nil? (:time e)) - (< cutoff (:time e)))) - events)))))))] - (when events - (call-rescue events children)))))) + (let [result (swap! + state + (fn [{:keys [cutoff buffer]}] + ; Compute minimum allowed time + (let [cutoff (max cutoff (- (get event :time 0) n)) + send? (or (nil? (:time event)) + (< cutoff (:time event))) + buffer (if send? + ; This event belongs in the buffer, + ; and our cutoff may have changed. + (vec (filter + (fn [e] (or (nil? (:time e)) + (< cutoff (:time e)))) + (conj buffer event))) + buffer)] + {:cutoff cutoff + :buffer buffer + :send? send?})))] + (when (:send? result) + (call-rescue (:buffer result) children)))))) (defn- fixed-time-window-fn "A fixed window over the event stream in time. Emits vectors of events, such