This section discusses state management and fault tolerance used in windowing/streaming joins.
Onyx provides the ability to perform updates to a state machine for segments which are calculated over windows. For example, a grouping task may accumulate incoming values for a number of keys over windows of 5 minutes. This feature is commonly used for aggregations, such as summing values, though it can be used to build more complex state machines.
;; Task definition
{:onyx/name :sum-all-ages
 :onyx/fn :clojure.core/identity
 :onyx/type :function
 :onyx/group-by-key :name
 :onyx/flux-policy :recover
 :onyx/min-peers 2
 :onyx/batch-size 20}

;; Window definition
{:window/id :sum-all-ages-window
 :window/task :sum-all-ages
 :window/type :global
 :window/aggregation [:your-sum-ns/sum :age]
 :window/doc "Adds the :age key in all segments in a global window."}
As segments are processed, an internal state within the calculated window is updated. In this case we are trying to sum the ages of the incoming segments.
The :sum-all-ages-window window definition above contains a :window/aggregation entry, which names an aggregation map. Aggregation maps are defined by the following keys (see the cheat-sheet for more information):
Key | Optional? | Description |
---|---|---|
:aggregation/init | true | Fn (window) to initialise the state. |
:aggregation/create-state-update | false | Fn (window, segment) to generate a serializable state machine update. |
:aggregation/apply-state-update | false | Fn (window, state, entry) to apply a state machine update entry to a state. |
:aggregation/super-aggregation-fn | true | Fn (window, state-1, state-2) to combine two states when two windows are merged. |
Each key in the aggregation map should map to its corresponding function. This example shows those function definitions, paired with the aggregation keys and bound to the ::sum aggregation (:your-sum-ns/sum) referenced above.
(ns your-sum-ns)

(defn sum-init-fn [window]
  0)

;; Given the example input in the next section, the below segment shape (over a kafka transport) would look something like this.
;; {:serialized-key-size 36,
;;  :key "70144dea-cdd1-443d-9e7f-55cc5d0928d7",
;;  :offset 0,
;;  :serialized-value-size 22,
;;  :partition 0,
;;  :timestamp 1514680072539,
;;  :message {:age 49, :name "John"}}
(defn sum-aggregation-fn [window segment]
  (let [k (-> segment :message :age)]
    {:value k}))

;; Now just pull out the value and add it to the previous state
(defn sum-application-fn [window state value]
  (+ state (:value value)))

;; sum aggregation referenced in the window definition.
(def sum
  {:aggregation/init sum-init-fn
   :aggregation/create-state-update sum-aggregation-fn
   :aggregation/apply-state-update sum-application-fn})
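The sum map above leaves out the optional function that combines two states when windows are merged. A minimal sketch of one follows, assuming that merging two sum states simply means adding them. The names sum-super-aggregation-fn and merge-entry are hypothetical, and the :aggregation/super-aggregation-fn key is taken from Onyx's built-in aggregations, so check it against the cheat-sheet before relying on it.

```clojure
(ns your-sum-ns.merge-sketch)

;; Hypothetical merge function: receives the states of two windows that
;; are being merged and returns the combined state. For a sum, that is
;; the total of both states. The window argument is unused here.
(defn sum-super-aggregation-fn [window state-1 state-2]
  (+ state-1 state-2))

;; Hypothetical extra entry that could be merged into the sum map shown
;; above; the key name is an assumption to verify against the cheat-sheet.
(def merge-entry
  {:aggregation/super-aggregation-fn sum-super-aggregation-fn})

;; Merging a window whose state is 49 with one whose state is 55:
(sum-super-aggregation-fn nil 49 55) ;; => 104
```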
Let’s try processing some example segments using this aggregation:
[{:name "John" :age 49}
{:name "Madeline" :age 55}
{:name "Geoffrey" :age 14}]
Results in the following events:
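Action | Result |
---|---|
Initial state | 0 |
Incoming segment | {:name "John" :age 49} |
Changelog entry | {:value 49} |
Applied to state | 49 |
Incoming segment | {:name "Madeline" :age 55} |
Changelog entry | {:value 55} |
Applied to state | 104 |
Incoming segment | {:name "Geoffrey" :age 14} |
Changelog entry | {:value 14} |
Applied to state | 118 |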
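The sequence of events above can be traced outside of Onyx with plain Clojure. The sketch below copies the three aggregation functions from the example and folds them over the example segments. The wrap and running-states helpers are hypothetical, nil stands in for the window argument, and the sketch ignores grouping and checkpointing; it only follows the arithmetic.

```clojure
(ns aggregation-walkthrough)

;; Local copies of the aggregation functions from the example above.
(defn sum-init-fn [window] 0)

(defn sum-aggregation-fn [window segment]
  {:value (-> segment :message :age)})

(defn sum-application-fn [window state entry]
  (+ state (:value entry)))

;; Hypothetical helper: nests a payload under :message, matching the
;; Kafka-shaped segment shown in the example.
(defn wrap [payload]
  {:message payload})

(def segments
  (map wrap [{:name "John" :age 49}
             {:name "Madeline" :age 55}
             {:name "Geoffrey" :age 14}]))

;; Hypothetical driver: for each segment, create a changelog entry and
;; apply it to the running state. reductions keeps every intermediate
;; state, starting with the initial one.
(defn running-states [segments]
  (reductions (fn [state segment]
                (sum-application-fn nil state (sum-aggregation-fn nil segment)))
              (sum-init-fn nil)
              segments))

(running-states segments) ;; => (0 49 104 118)
```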
This state can be emitted via triggers or another mechanism. By describing changelog updates as a vector with a log command, such as :set-value, an aggregation function can emit multiple types of state transition if necessary.
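As an illustration of command-tagged changelog entries, here is a minimal sketch in plain Clojure. Everything in it is hypothetical: the :add command, the :reset-to segment key, and the function names are invented for the example, and only :set-value is mentioned in the text above.

```clojure
(ns changelog-commands-sketch)

;; Hypothetical create-state-update: returns a vector whose first element
;; is the log command. A segment carrying :reset-to (an invented key)
;; overwrites the state; any other segment adds its :age to it.
(defn create-update [window segment]
  (if (contains? segment :reset-to)
    [:set-value (:reset-to segment)]
    [:add (:age segment)]))

;; Hypothetical apply-state-update: dispatches on the log command.
;; An unknown command makes case throw, which surfaces malformed entries.
(defn apply-update [window state [command arg]]
  (case command
    :set-value arg
    :add (+ state arg)))

;; Fold four segments into a state, starting from 0:
;; 0 -> 49 -> 104 -> 10 (reset) -> 24
(def final-state
  (reduce (fn [state segment]
            (apply-update nil state (create-update nil segment)))
          0
          [{:age 49} {:age 55} {:reset-to 10} {:age 14}]))

final-state ;; => 24
```

Because each entry names its own command, the function that applies entries stays a simple dispatch, and new transition types can be added without changing the shape of existing entries.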
To allow for full recovery after peer crashes, the window and trigger states must be checkpointed.
A consistent snapshot is performed over the cluster every :onyx.peer/coordinator-barrier-period-ms milliseconds.
Whenever a change to the cluster allocation occurs, this state snapshot is recovered from durable storage.
Storage can be configured via the peer-config.
The ZooKeeper window storage choice should not be used in production unless paired with a periodic call to onyx.api/gc-checkpoints.
As checkpoints will only accrete, typical production use should incorporate onyx.api/gc-checkpoints, and onyx.api/clear-checkpoints once a job's checkpoints will no longer be used to transfer state from one job to another via resume-points.
Side-effects resulting from a segment being processed may occur more than once, as exactly once side-effects are impossible to achieve. Onyx guarantees that the window state updates resulting from a segment are performed exactly once; however, any side-effects that occur as a result of the segment being processed cannot be guaranteed to occur only once.
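One common way to live with repeated side-effects is to make them idempotent, so that a replay leaves the outcome unchanged. The sketch below is not an Onyx feature; it assumes each segment carries a unique :id (a hypothetical key, here borrowing the Kafka key from the earlier example) and uses an atom as a stand-in for an external store.

```clojure
(ns idempotent-side-effect-sketch)

;; Hypothetical sink: an atom standing in for an external key-value store.
(def store (atom {}))

;; Writes are keyed by the segment's unique id, so performing the same
;; write twice leaves the store exactly as it was after the first write.
(defn record-age! [{:keys [id age]}]
  (swap! store assoc id age))

(def segment
  {:id "70144dea-cdd1-443d-9e7f-55cc5d0928d7" :age 49})

;; Simulate the side-effect being performed twice for the same segment.
(record-age! segment)
(record-age! segment)

(count @store) ;; => 1
```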