Lifecycles are a feature that lets you run your own code at particular points during task execution on each peer. Lifecycles are data driven and composable.
There are several interesting points at which to execute arbitrary code during a task in Onyx. Onyx lets you plug in functions that it calls before a task, after a task, before a batch, and after a batch on every peer. There is also a hook that lets you delay starting a task, in case you need to do some work such as acquiring a lock under contention. A peer's lifecycle is isolated to that peer, and lifecycles never coordinate across peers. Using lifecycles is entirely optional. Lifecycle data is submitted as a data structure at job submission time.
| Lifecycle hook key | Description |
|---|---|
| :lifecycle/start-task? | A function that takes two arguments: an event map and the matching lifecycle map. Must return a boolean indicating whether to start the task. If it returns false, the peer backs off for a preconfigured amount of time and then calls this function again. Useful for lock acquisition. This function is called before any processes inside the task become active. |
| :lifecycle/before-task-start | A function that takes two arguments: an event map and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called after the processes in the task are launched, but before the peer listens for incoming segments from other peers. |
| :lifecycle/before-batch | A function that takes two arguments: an event map and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called before a batch of segments is received from the reading function. |
| :lifecycle/after-read-batch | A function that takes two arguments: an event map and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called immediately after a batch of segments has been read by the peer. The segments are available in the event map under the key :onyx.core/batch. |
| :lifecycle/after-apply-fn | A function that takes two arguments: an event map and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called after :onyx/fn has been applied to the input segments to produce output segments. |
| :lifecycle/after-batch | A function that takes two arguments: an event map and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called after each batch. |
| :lifecycle/after-task-stop | A function that takes two arguments: an event map and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called before the peer relinquishes its task. No more segments will be received. |
| :lifecycle/handle-exception | One or more exception handlers may be defined. If present, any exception thrown during lifecycle execution, except in after-task-stop, is caught and passed to this function. See the details on the Onyx cheat sheet. |
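The start-task? hook is described as useful for lock acquisition. The following is a minimal sketch of that pattern, pairing a start-task? that tries to take a lock with an after-task-stop that releases it. It assumes a hypothetical in-process atom (task-lock) as the lock, which is only shared by peers running in the same JVM; since Onyx does not coordinate lifecycles across peers, a real job would take the lock from an external coordination service instead. The names acquire-lock?, release-lock, and lock-calls are illustrative.

```clojure
(ns my.ns)

;; Hypothetical lock: an atom is only visible to peers in this JVM.
;; A real deployment would acquire a lock from an external service here.
(defonce task-lock (atom false))

(defn acquire-lock? [event lifecycle]
  ;; compare-and-set! flips false -> true only if the lock is free.
  ;; Returning false makes Onyx back off and call this function again.
  (compare-and-set! task-lock false true))

(defn release-lock [event lifecycle]
  ;; Free the lock when the task stops so a waiting peer can proceed.
  (reset! task-lock false)
  {})

;; Wired up like any other calls map.
(def lock-calls
  {:lifecycle/start-task? acquire-lock?
   :lifecycle/after-task-stop release-lock})
```

A peer whose acquire-lock? returns false keeps backing off until the peer holding the lock reaches after-task-stop.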
Let's work through an example to show how lifecycles work. Suppose you want to print a message at every lifecycle hook. You'd start by defining 8 functions for the 8 hooks:
```clojure
(ns my.ns)

(defn start-task? [event lifecycle]
  ;; May be called repeatedly; returning true lets the task start.
  (println "Deciding whether to start the task.")
  true)

(defn before-task-start [event lifecycle]
  (println "Executing once before the task starts.")
  {})

(defn after-task-stop [event lifecycle]
  (println "Executing once after the task is over.")
  {})

(defn before-batch [event lifecycle]
  (println "Executing once before each batch.")
  {})

(defn after-read-batch [event lifecycle]
  (println "Executing once after this batch has been read.")
  {})

(defn after-apply-fn [event lifecycle]
  (println "Executing once after the onyx/fn has been called on the input segments.")
  {})

(defn after-batch [event lifecycle]
  (println "Executing once after each batch.")
  {})

(defn handle-exception [event lifecycle lifecycle-phase e]
  (println "Caught exception:" e)
  (println "Returning :restart, indicating that this task should restart.")
  :restart)
```
Notice that every lifecycle function except start-task? and handle-exception returns a map. That map is merged back into the event parameter that you received.
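To make the merge concrete, here is a sketch of two hooks that share state through the event map. init-counter (used as before-task-start) returns a map holding a counter atom; because that map is merged into the event, count-batch (used as after-read-batch) can later find the atom and add the size of :onyx.core/batch to it. The run-hook helper is a hypothetical stand-in that merges a hook's return value into the event as described above; it is not Onyx's implementation, and the function and key names are illustrative.

```clojure
(ns my.ns)

(defn init-counter [event lifecycle]
  ;; The returned map is merged into the event, so later hooks see the atom.
  {::segments-seen (atom 0)})

(defn count-batch [event lifecycle]
  ;; :onyx.core/batch holds the segments that were just read.
  (swap! (::segments-seen event) + (count (:onyx.core/batch event)))
  {})

;; Hypothetical stand-in for merging a hook's result into the event.
(defn run-hook [event hook lifecycle]
  (merge event (hook event lifecycle)))

(def counting-calls
  {:lifecycle/before-task-start init-counter
   :lifecycle/after-read-batch count-batch})

;; Simulate one task start followed by one two-segment batch.
(-> {}
    (run-hook init-counter {})
    (assoc :onyx.core/batch [{:n 1} {:n 2}])
    (run-hook count-batch {})
    ::segments-seen
    deref)
;; => 2
```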
start-task? is a boolean function that lets you block and back off if you don't want to start the task yet. It is called periodically for as long as it returns false. If more than one start-task? is specified in your lifecycles, they must all return true for the task to begin. start-task? is invoked before before-task-start.
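These start-task? rules can be modeled in a few lines. The loop below is a toy model, not Onyx's scheduler: it polls a vector of start-task? functions, sleeps for a hypothetical backoff-ms between rounds, and returns only once every function returns true in the same round. Because every? stops at the first false, later functions are skipped in a round where an earlier one declines; Onyx's actual polling order and back-off interval may differ.

```clojure
(defn wait-until-startable
  "Toy model of the documented start-task? behavior. Polls every
  start-task? fn until all return true in the same round, sleeping
  backoff-ms between rounds. Returns the number of rounds taken."
  [event lifecycle start-fns backoff-ms]
  (loop [round 1]
    (if (every? (fn [start?] (start? event lifecycle)) start-fns)
      round
      (do (Thread/sleep (long backoff-ms))
          (recur (inc round))))))

;; Example: the second check only succeeds on its third call.
(def calls-made (atom 0))

(defn ready-on-third-call? [event lifecycle]
  (>= (swap! calls-made inc) 3))

(wait-until-startable {} {} [(constantly true) ready-on-third-call?] 10)
;; => 3
```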
Next, define a map that wires all these functions together by mapping predefined keywords to the functions:
```clojure
(def calls
  {:lifecycle/start-task? start-task?
   :lifecycle/before-task-start before-task-start
   :lifecycle/before-batch before-batch
   :lifecycle/after-read-batch after-read-batch
   :lifecycle/after-apply-fn after-apply-fn
   :lifecycle/after-batch after-batch
   :lifecycle/after-task-stop after-task-stop
   :lifecycle/handle-exception handle-exception})
```
Each of these 8 keys maps to a function. All of the keys are optional, so you can mix and match depending on which hooks you actually need.
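For example, a job that only needs to log around each batch could use a calls map with just two keys and leave every other hook out. The function names and batch-logging-calls below are illustrative.

```clojure
(ns my.ns)

(defn log-batch-start [event lifecycle]
  (println "Starting a batch.")
  {})

(defn log-batch-end [event lifecycle]
  (println "Finished a batch.")
  {})

;; Only two hooks are supplied; the other lifecycle keys are omitted.
(def batch-logging-calls
  {:lifecycle/before-batch log-batch-start
   :lifecycle/after-batch log-batch-end})
```

A lifecycle entry would then reference it with :lifecycle/calls :my.ns/batch-logging-calls.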
Finally, create a lifecycle data structure by pointing :lifecycle/calls to the fully qualified namespaced keyword that represents the calls map that we just defined. Pass it to your onyx.api/submit-job call:
```clojure
(def lifecycles
  [{:lifecycle/task :my-task-name-here
    :lifecycle/calls :my.ns/calls
    :lifecycle/doc "Test lifecycles and print a message at each stage"}])

(onyx.api/submit-job
 peer-config
 {...
  :lifecycles lifecycles
  ...})
```
It is also possible to have a lifecycle apply to every task in a workflow by specifying :lifecycle/task :all. This is useful for instrumenting your tasks with metrics, error handling, or debugging information.
```clojure
(def lifecycles
  [{:lifecycle/task :all
    :lifecycle/calls :my.ns/add-metrics
    :lifecycle/doc "Instruments all tasks in a workflow with the example function 'add-metrics'"}])
```
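The example refers to :my.ns/add-metrics without defining it. One possible definition is sketched here, assuming batch timing is the metric you want: before-batch records a start time, which is merged into the event, and after-batch reports the elapsed time. It also assumes :onyx.core/batch is still present in the event when after-batch runs. The function names and the ::batch-start-ns and ::last-batch-ms keys are hypothetical.

```clojure
(ns my.ns)

(defn start-batch-timer [event lifecycle]
  ;; Merged into the event, so after-batch can read the start time.
  {::batch-start-ns (System/nanoTime)})

(defn report-batch-time [event lifecycle]
  (let [elapsed-ms (/ (- (System/nanoTime) (::batch-start-ns event)) 1e6)]
    ;; Assumes :onyx.core/batch still holds this batch's segments.
    (println "Batch of" (count (:onyx.core/batch event))
             "segments took" elapsed-ms "ms")
    {::last-batch-ms elapsed-ms}))

;; The calls map that :lifecycle/calls :my.ns/add-metrics points to.
(def add-metrics
  {:lifecycle/before-batch start-batch-timer
   :lifecycle/after-batch report-batch-time})
```

Because it is an ordinary calls map, the same functions could also be attached to a single task instead of :all.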
You can supply as many sets of lifecycles as you want. They are invoked in the order in which they appear in the vector, giving you a predictable sequence of calls. Make sure that the namespaces defining the referenced calls maps and functions are on the classpath, and required, on every peer that will execute them.
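As an illustration of combining sets, the vector below attaches the add-metrics calls to every task and the printing calls from the earlier example to one task. Under the ordering rule just described, the :all entry's hooks would be invoked before the task-specific ones on :my-task-name-here. The :lifecycle/doc strings are illustrative.

```clojure
(def lifecycles
  ;; Vector order determines the order in which the entries are invoked.
  [{:lifecycle/task :all
    :lifecycle/calls :my.ns/add-metrics
    :lifecycle/doc "Batch timing for every task"}
   {:lifecycle/task :my-task-name-here
    :lifecycle/calls :my.ns/calls
    :lifecycle/doc "Print a message at each stage"}])
```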
Tip: Example project: lifecycles