From 7a297aaf6a9c2917bace4e8886e157fd0d185509 Mon Sep 17 00:00:00 2001 From: Kyle Kingsbury Date: Mon, 1 Apr 2013 21:16:25 -0700 Subject: [PATCH] WIP: streams/part-time-simple. Intended for wall-clock timed reducers which should *not* use periodically-until-expired. No tests yet; I need to write tests and have it take over for throttle, rollup, and friends. --- src/riemann/streams.clj | 47 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/src/riemann/streams.clj b/src/riemann/streams.clj index f9eaab05e..cc051f4b1 100644 --- a/src/riemann/streams.clj +++ b/src/riemann/streams.clj @@ -482,6 +482,53 @@ OA (setup) (recur event)))))) +(defn part-time-simple + "Divides wall clock time into discrete windows. Returns a stream. Whenever + events arrive in a given window, calls (create) to generate some initial + state for that window, and uses (add state event) to combine events with that + state. At the end of the window, calls (finish state). + + Does not treat expired events differently; ticks will still be scheduled even + if an expired event arrives. When no events arrive in a given window, does + nothing. (create) may be invoked partway through a window, and should be + idempotent, as it will be run inside of (swap!). + + Concurrency guarantees: + + (create) may be called multiple times for a given time slice. + (add) when called, will receive exactly one distinct bucket in each time + slice. + (finish) will be called *exactly once* for each time slice." + [dt create add finish] + (let [anchor (unix-time) + ; Whether or not the next tick has been scheduled, and the current + ; window. + state (atom [nil nil]) + + ; Called every dt seconds to flush the window. + tick (fn tick [] + (let [last-window (atom nil)] + ; Reset the state to nil. + (swap! state (fn [[_ window]] + (reset! last-window window) + [nil nil])) + + ; And finalize the last window + (finish @last-window)))] + + (fn stream [event] + (let [[scheduled _] (swap! state (fn [[scheduled window]] + (if (nil? scheduled) + ; We're the first ones here. + [:first (add (create) event)] + + ; Some other thread has already + ; scheduled + [:done (add window event)])))] + (when (= :first scheduled) + ; We were the first thread to update this window. + (once! (next-tick anchor dt) tick)))))) + (defn fold-interval "Applies the folder function to all event-key values of events during interval seconds."