diff --git a/src/riemann/streams.clj b/src/riemann/streams.clj index f9eaab05e..cc051f4b1 100644 --- a/src/riemann/streams.clj +++ b/src/riemann/streams.clj @@ -482,6 +482,53 @@ OA (setup) (recur event)))))) +(defn part-time-simple + "Divides wall clock time into discrete windows. Returns a stream. Whenever + events arrive in a given window, calls (create) to generate some initial + state for that window, and uses (add state event) to combine events with that + state. At the end of the window, calls (finish state). + + Does not treat expired events differently; ticks will still be scheduled even + if an expired event arrives. When no events arrive in a given window, does + nothing. (create) may be invoked partway through a window, and should be + idempotent, as it will be run inside of (swap!). + + Concurrency guarantees: + + (create) may be called multiple times for a given time slice. + (add) when called, will receive exactly one distinct bucket in each time + slice. + (finish) will be called *exactly once* for each time slice." + [dt create add finish] + (let [anchor (unix-time) + ; Whether or not the next tick has been scheduled, and the current + ; window. + state (atom [nil nil]) + + ; Called every dt seconds to flush the window. + tick (fn tick [] + (let [last-window (atom nil)] + ; Reset the state to nil. + (swap! state (fn [[_ window]] + (reset! last-window window) + [nil nil])) + + ; And finalize the last window + (finish @last-window)))] + + (fn stream [event] + (let [[scheduled _] (swap! state (fn [[scheduled window]] + (if (nil? scheduled) + ; We're the first ones here. + [:first (add (create) event)] + + ; Some other thread has already + ; scheduled + [:done (add window event)])))] + (when (= :first scheduled) + ; We were the first thread to update this window. + (once! (next-tick anchor dt) tick)))))) + (defn fold-interval "Applies the folder function to all event-key values of events during interval seconds."