For the following, assume:
* Our goal is to get this to be purely CPU-bound, with as much of that CPU as possible dedicated to the wukong-bolt processing.
* There is effectively no skew in the final group-by stage -- each ES+State bolt gets a uniform fraction of elements from each batch.
==== Components

* **worker**
- jvm process launched by the supervisor (the `storm_worker` process)
* **executor**
- Each executor is responsible for one bolt
- so with 3 kafka spouts on a worker, there are three executors spouting


==== Storm Transport

Each executor (bolt or spout) has two disruptor queues: its 'send queue' (the individual tuples it emits) and its 'receive queue' (batches of tuples staged for processing)footnote:[It might seem odd that the spout has a receive queue, but much of storm's internal bookkeeping is done using tuples -- there's actually a regular amount of traffic sent to each spout].

===== Disruptor Queue

At the heart of Storm's intra-worker transport is the LMAX Disruptor, a very fast queue built on a pre-allocated ring buffer. A producer publishes entries into slots -- blocking until a slot is free, or failing fast if asked to -- and a consumer claims and drains everything published since its last read as a single batch.
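
As a mental model only -- not the real LMAX API, and ignoring all the lock-free machinery that makes the Disruptor fast -- the queue behaves roughly like this:

----
# Conceptual model of a disruptor queue: a fixed number of slots, blocking and
# non-blocking publishes, and batch draining by the consumer. Illustrative only.
class InsufficientCapacityError < StandardError; end   # stands in for Java's InsufficientCapacityException

class ConceptualDisruptorQueue
  def initialize(num_slots)
    @num_slots = num_slots
    @slots     = []
  end

  def publish(item)                # blocking: wait until a slot frees up
    sleep(0.001) while @slots.size >= @num_slots
    @slots << item
  end

  def try_publish(item)            # non-blocking: fail fast when the ring is full
    raise InsufficientCapacityError if @slots.size >= @num_slots
    @slots << item
  end

  def consume_batch                # the consumer claims everything published so far
    batch, @slots = @slots, []
    batch
  end
end
----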

===== Spout Tuple Handling

* If the spout executor's async-loop decides conditions are right, it calls the spout's `nextTuple()` method.
* The spout can then emit zero, one or many tuples, which the emitter publishes non-blocking into the spout's executor send queue (see below for details).
* Each executor send queue (spout or bolt) has an attached router (`transfer-fn`). In an infinite loop, it
- lays claim to all messages currently in the queue (everything between its last-read position and the write head), and loads them into a local tuple-batch.
- sorts tuples into two piles: local ones, destined for tasks on this worker; and remote ones, destined for tasks on other workers.
- publishes (blocking) all the remote tuples as a single batch into the worker's transfer queue; they'll later be sent over the network to the appropriate workers.
- regroups the local tuples by destination task and publishes (blocking) each tuple-batch into that task's executor receive buffer.
Note that the executor send queue holds individual _tuples_, whereas the worker transfer queue and executor receive queues hold _collections of tuples_. An executor send queue size of 1024 slots with an executor receive queue size of 2048 slots means there won't ever be more than `2048 * 1024` tuples waiting for that executor to process. It's also important to recognize that, although the code uses the label `tuple-batch` for these collections of tuples, they have nothing to do with the higher-level concept of a 'Trident batch' you'll meet later.
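
Here's the router loop in Ruby-flavored pseudocode -- a sketch only; the queue and helper names (`claim_all`, `publish_blocking`, `local_tasks`, and so on) are illustrative, not Storm's actual identifiers:

----
# Sketch of the executor send-queue router (the transfer-fn described above).
loop do
  # claim everything between the last-read position and the write head
  tuple_batch = send_queue.claim_all

  # split into tuples for tasks on this worker vs. tasks on other workers
  local, remote = tuple_batch.partition { |dest_task, tuple| local_tasks.include?(dest_task) }

  # remote tuples leave as one batch; the worker's transfer handler ships them later
  transfer_queue.publish_blocking(remote) unless remote.empty?

  # local tuples are regrouped by destination task and handed straight to that executor
  local.group_by { |dest_task, _tuple| dest_task }.each do |dest_task, tuples|
    executor_receive_queue(dest_task).publish_blocking(tuples)
  end
end
----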

===== Bolt Tuple Handling

The receive side mirrors the send side: the bolt executor's worker thread claims each tuple-batch staged in its receive queue and calls the bolt's `execute` method on every tuple in turn. Anything the bolt emits goes into its own executor send queue and takes the same router path described above.

===== Worker Transfer and Receive Handlers

The worker-level handlers move tuple-batches between executors and the network. The transfer handler drains the worker's transfer queue and sends each batch to the worker that owns its destination task; the receive thread takes batches arriving from other workers and publishes each one to the owning executor's receive queue (see the code locations table below: `mk-transfer-tuples-handler` and `launch-receive-thread!`).

Unlike the transfer queue and the executor queues, the worker's receive buffer is a ZeroMQ construct, not a disruptor queue.
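
A rough sketch of the two worker-level handlers; every name here (`worker_for_task`, `zmq_socket_for`, `serialize_batch`, and friends) is hypothetical shorthand, not Storm's actual API:

----
# Transfer handler: drain the worker transfer queue, group by destination
# worker, and ship each batch over the network.
def run_transfer_handler
  loop do
    transfer_queue.consume_batch
      .group_by { |dest_task, _tuple| worker_for_task(dest_task) }
      .each { |worker, tuples| zmq_socket_for(worker).send_message(serialize_batch(tuples)) }
  end
end

# Receive thread: take batches arriving from other workers and publish each
# one into the owning executor's receive queue.
def run_receive_thread
  loop do
    deserialize_batch(zmq_receive_socket.recv)
      .group_by { |dest_task, _tuple| dest_task }
      .each { |dest_task, tuples| executor_receive_queue(dest_task).publish_blocking(tuples) }
  end
end
----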

===== Acking

When a tuple is born in the spout, the spout's executor:

* creates a `root-id` -- this will identify the tuple tree. Let's say it has the value `3`.
* for each place the tuple will go, makes an `edge-id` (`executor.clj:465`)
- and sets that out-tuple's ack tree to `{ root_id: edge_id }`. Say the tuple is to be sent to three places; the spout would call `out_tuple(... {3: 100})`, `out_tuple(... {3: 101})`, `out_tuple(... {3: 102})`.
* XORs all the edge-ids together to form a partial checksum: `100 ^ 101 ^ 102`.
* sends an `init_stream` tuple to the acker as `root_id, partial_checksum, spout_id`.
* the tuple's `ack val` starts at zero.

When a tuple is sent from a bolt, it claims one or more anchors (the tuples it came from) and one or more destination task ids; we'll walk through that case in the tables and worked example below.
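
In Ruby-flavored pseudocode, with hypothetical helper names (`out_tuple`, `emit_direct`, and so on are illustrative), the spout-side bookkeeping looks roughly like this:

----
# Sketch of spout-side ack bookkeeping for one emitted tuple.
root_id          = rand(2**64)        # identifies the tuple tree (e.g. 3)
partial_checksum = 0

dest_task_ids.each do |task_id|
  edge_id           = rand(2**64)     # e.g. 100, 101, 102
  partial_checksum ^= edge_id
  # the out-tuple's ack tree is { root_id => edge_id }
  out_tuple(task_id, tuple_values, { root_id => edge_id })
end

# tell the acker a tuple tree was born; its ledger entry starts as 100 ^ 101 ^ 102
emit_direct(acker_task, :init_stream, [root_id, partial_checksum, spout_task_id])
----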


===== Acker Walkthrough

[[acker_lifecycle_simple]]
.Acker Lifecycle: Simple
[cols="1*<.<d,1*<.<d,1*<.<d",options="header"]
|=======
| Event | Tuples | Acker Tree
| spout emits one tuple to bolt-0 | noah: `<~, { noah: a }>` |
| spout sends an acker-init tuple, seeding the ack tree with `noah: a`
| | `{ noah: a }`
| bolt-0 emits two tuples to bolt-1 anchored on `noah`. Those new tuples each create an edge-id for each anchor, which is XORed into the anchor's `ackVal` and used in the new tuple's message-id.
| shem: `<~, { noah: b }>` +
ham: `<~, { noah: c }>` +
noah: `<b^c, { noah: a }>` |
| bolt-0 acks `noah` using the XOR of its ackVal and tuple tree: `noah: a^b^c`. Since `a^a^b^c = b^c`, this clears off the key `a`, but implicates the keys `b` and `c` -- the tuple tree remains incomplete.
| | `{ noah: b^c }`
| bolt-1 processes `shem`, emits `abe` to bolt-2
| abe: `<~, { noah: d }>` +
shem: `<d, { noah: b }>` |
| bolt-1 acks `shem` with `noah: d^b` | | `{ noah: c^d }`
| bolt-1 processes `ham`, emits nothing | ham: `<~, { noah: c }>` |
| bolt-1 acks `ham` with `noah: c` | | `{ noah: d }`
| bolt-1 processes `abe`, emits nothing | abe: `<~, { noah: d }>` |
| bolt-1 acks `abe` with `noah: d` | | `{ noah: 0 }`
| acker removes noah from ledger, notifies spout
| | `{}`
| | |
| `______________________` | `______________________________` | `___________________`
|=======
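
To see why the final value reaches zero, here is the same sequence of XORs in a few lines of Ruby, using arbitrary integers to stand in for the edge ids `a`..`d`:

----
a, b, c, d = Array.new(4) { rand(2**64) }   # stand-ins for the edge ids

tree  = a           # acker-init seeds the ledger with noah's edge
tree ^= a ^ b ^ c   # bolt-0 acks noah  (ackVal b^c, edge a)
tree ^= d ^ b       # bolt-1 acks shem  (ackVal d,   edge b)
tree ^= c           # bolt-1 acks ham   (no children, edge c)
tree ^= d           # bolt-1 acks abe   (no children, edge d)

tree                # => 0 -- every edge id appeared twice, so the tree is complete
----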

Now for a more complicated case: a tuple with multiple anchors, sent to multiple downstream tasks. Here's the tuple genealogy we'll follow:

----
hera ----v---- zeus ----v---- dione
| |
ares ---v--- aphrodite
|
+--------+--------+
phobos deimos harmonia
----

[[acker_lifecycle_complex]]
.Acker Lifecycle: Complex
[cols="1*<.<d,1*<.<d,1*<.<d",options="header"]
|=======
| Event | Tuples | Acker Tree
| spout emits three tuples | zeus: `<~, { zeus: a }>` |
| to bolt-0 | hera: `<~, { hera: b }>` |
| | dione: `<~, { dione: c }>` |
| and sends acker-inits as it does so | | { zeus: `a`, hera: `b`, dione: `c` }
| ... | |
| bolt-0 emits "war" | ares: `<~, { zeus: d, hera: e }>` |
| to bolt-1 (ares) | zeus: `<d, { zeus: a }>` |
| anchored on zeus (edge id `d`) | hera: `<e, { hera: b }>` |
| and hera (edge id `e`) | dione: `<~, { dione: c }>` |
| ... | |
| bolt-0 acks hera | acks with `hera: b^e` | { zeus: `a`, hera: `e`, dione: `c` }
| ... | |
| bolt-0 emits "love" | ares: `<~, { zeus: d, hera: e }>` |
| sent to bolt-1 (aphrodite) | aphrdt: `<~, { zeus: f, dione: g }>` |
| anchored on zeus (edge id `f`) | zeus: `<d^f, { zeus: a }>` |
| and dione (edge id `g`) | hera: `<e, { hera: b }>` |
| | dione: `<g, { dione: c }>` |
| | |
| ... | |
| bolt-0 acks dione | acks with `dione: c^g` | { zeus: `a`, hera: `e`, dione: `g` }
| bolt-0 acks zeus | acks with `zeus: a^d^f` | { zeus: `d^f`, hera: `e`, dione: `g` }
| ... | |
| bolt-1 emits "strife" | phobos: `<~, { zeus: h^i, hera: h, dione: i }>` | { zeus: `d^f`, hera: `e`, dione: `g` }
| sent to bolt-2 (phobos) | ares: `<h, { zeus: d, hera: e }>` |
| anchored on ares and aphrodite (edge ids `h`, `i`) | aphrdt: `<i, { zeus: f, dione: g }>` |
| ... | |
| and sent to bolt-3 (deimos) | phobos: `<~, { zeus: h^i, hera: h, dione: i }>` | { zeus: `d^f`, hera: `e`, dione: `g` }
| (edge ids `j`,`k`) | deimos: `<~, { zeus: j^k, hera: j, dione: k }>` |
| anchored on ares and aphrodite | ares: `<h^j, { zeus: d, hera: e }>` |
| | aphrdt: `<i^k, { zeus: f, dione: g }>` |
| ... | |
| bolt-1 emits "calm" | harmonia: `<~, { zeus: l^m, hera: l, dione: m }>` | { zeus: `d^f`, hera: `e`, dione: `g` }
| sent only to bolt-2 (harmonia) | phobos: `<~, { zeus: h^i, hera: h, dione: i }>` |
| (edge ids `l`,`m`) | deimos: `<~, { zeus: j^k, hera: j, dione: k }>` |
| anchored on ares and aphrodite | ares: `<h^j^l, { zeus: d, hera: e }>` |
| | aphrdt: `<i^k^m, { zeus: f, dione: g }>` |
| ... | |
| bolt-1 acks ares | acks `zeus: d^h^j^l`, `hera: e^h^j^l` | { zeus: `f^h^j^l`, hera: `h^j^l`, dione: `g` }
| bolt-1 acks aphrodite | acks `zeus: f^i^k^m`, `dione: g^i^k^m` | { zeus: `h^i^j^k^l^m`, hera: `h^j^l`, dione: `i^k^m` }
| ... | |
| bolt-2 processes phobos, emits none | phobos: `<~, { zeus: h^i, hera: h, dione: i }>` |
| bolt-2 acks phobos | acks `zeus: h^i, hera: h, dione: i` | { zeus: `j^k^l^m`, hera: `j^l`, dione: `k^m` }
| bolt-2 processes harmonia, emits none | harmonia: `<~, { zeus: l^m, hera: l, dione: m }>` |
| bolt-2 acks harmonia | acks `zeus: l^m, hera: l, dione: m` | { zeus: `j^k`, hera: `j`, dione: `k` }
| bolt-3 processes deimos, emits none | deimos: `<~, { zeus: j^k, hera: j, dione: k }>` |
| bolt-3 acks deimos | acks `zeus: j^k, hera: j, dione: k` | { zeus: `0`, hera: `0`, dione: `0` }
| ...
| acker removes them each from ledger, notifies spout | | `{ }`
|=======



Let's suppose a bolt goes to emit a tuple anchored on `aphrodite` and `ares`, destined for three different places. The anchors start out as:

----
aphrodite: { ack_val: ~,  ack_tree: { zeus: a, dione: b } }
ares:      { ack_val: ~,  ack_tree: { zeus: c, hera: d } }
----

For each destination, the bolt generates one edge id per anchor -- one for aphrodite and one for ares. After the first out-tuple (`eros`, edge ids `e` and `f`):

----
aphrodite: { ack_val: (e),   ack_tree: { zeus: a, dione: b } }
ares:      { ack_val: (f),   ack_tree: { zeus: c, hera: d } }
eros:      { ack_val: ~,     ack_tree: { zeus: (e ^ f), dione: e, hera: f } }
----

After the second (`phobos`, edge ids `g` and `h`):

----
aphrodite: { ack_val: (e^g), ack_tree: { zeus: a, dione: b } }
ares:      { ack_val: (f^h), ack_tree: { zeus: c, hera: d } }
eros:      { ack_val: ~,     ack_tree: { zeus: (e ^ f), dione: e, hera: f } }
phobos:    { ack_val: ~,     ack_tree: { zeus: (g ^ h), dione: g, hera: h } }
----

And after the third (`deimos`, edge ids `i` and `j`):

----
aphrodite: { ack_val: (e^g^i), ack_tree: { zeus: a, dione: b } }
ares:      { ack_val: (f^h^j), ack_tree: { zeus: c, hera: d } }
eros:      { ack_val: ~,       ack_tree: { zeus: (e ^ f), dione: e, hera: f } }
phobos:    { ack_val: ~,       ack_tree: { zeus: (g ^ h), dione: g, hera: h } }
deimos:    { ack_val: ~,       ack_tree: { zeus: (i ^ j), dione: i, hera: j } }
----

Now the executor acks `aphrodite` and `ares`.
This sends the following:

----
ack( zeus, a ^ e^g^i )
ack( dione, b ^ e^g^i )
ack( zeus, c ^ f^h^j )
ack( hera, d ^ f^h^j )
----

That makes the acker's ledger be

----
zeus: ( spout_id: 0, val: a ^ a ^ e^g^i ^ c ^ c ^ f^h^j)
dione: ( spout_id: 0, val: b ^ b ^ e^g^i)
hera: ( spout_id: 0, val: d ^ d ^ f^h^j)
----

Finally, let's assume eros, phobos and deimos are processed without emitting any further tuples. Each acks with the XOR of its ackVal (zero, since it has no children) and its ack tree:

----
ack( zeus, e^f ^ 0 )
ack( dione, e ^ 0 )
ack( hera, f ^ 0 )
ack( zeus, g^h ^ 0 )
ack( dione, g ^ 0 )
ack( hera, h ^ 0 )
ack( zeus, i^j ^ 0 )
ack( dione, i ^ 0 )
ack( hera, j ^ 0 )
----

The acker's ledger is now:

----
zeus: ( spout_id: 0, val: a ^ a ^ e^g^i ^ c ^ c ^ f^h^j ^ e^f ^ g^h ^ i^j)
dione: ( spout_id: 0, val: b ^ b ^ e^g^i ^ e ^ g ^ i )
hera: ( spout_id: 0, val: d ^ d ^ f^h^j ^ f ^ h ^ j )
----

At this point every edge id appears an even number of times in each checksum, so each value is zero: the acker removes each record from its ledger and notifies the spout (via emit-direct) that its tuple tree has been successfully completed.
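
Again, a few lines of Ruby make the cancellation concrete (arbitrary integers standing in for the edge ids `a`..`j`):

----
a, b, c, d, e, f, g, h, i, j = Array.new(10) { rand(2**64) }

zeus  = [a, a, e^g^i, c, c, f^h^j, e^f, g^h, i^j].reduce(:^)
dione = [b, b, e^g^i, e, g, i].reduce(:^)
hera  = [d, d, f^h^j, f, h, j].reduce(:^)

[zeus, dione, hera]   # => [0, 0, 0] -- all three tuple trees are complete
----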

Traffic to the acker occurs in only two places:

* each time a spout emits a tuple
* each time a bolt acks a tuple

Even if the tuple tree holds thousands of tuples, only a small amount of data is sent to the acker: one `init_stream` message when the tuple tree is born, and one ack for each child tuple. When a tuple is acked, it both clears its own record and implicates its children.

===== Acker

* The acker is just a regular bolt -- all the interesting action takes place in its `execute` method.
* From each incoming tuple it knows
- `tuple[0]` -- the root-id identifying the tuple tree
- the tuple's stream-id, which says whether this is an init, an ack or a fail
* Pending tuple trees live in a time-expiring data structure, the `RotatingHashMap`:
- it's actually a small number of hash maps;
- when you go to update or add to it, it performs the operation on the right component hash map;
- periodically (when it receives a tick tuple), it pulls off the oldest component hash map, marks it as dead, and invokes the expire callback for each element in that hash map.
* The acker gets the current checksum from `pending[id]`; `pending` holds objects like `{ val: "(checksum)", spout_task: "(task_id)" }`.
* On an `init_stream` or `ack_stream` tuple it XORs the incoming partial checksum into that value -- `pending[:val] = pending[:val] ^ tuple[1]` -- and, for the init, records which spout task to notify.


Here is the acker in Ruby-flavored pseudocode:

    class Acker < Bolt

      def initialize
        # time-expiring ledger of pending tuple trees, keyed by root-id
        self.ackables = ExpiringHash.new
      end

      def execute(tuple)
        root_id, partial_checksum, from_task_id = tuple.values
        stream_type = tuple.stream_type

        # a tick tuple means it is time to expire the oldest bucket of pending trees
        ackables.expire_stalest_bucket if (stream_type == :tick_stream)
        curr = ackables[root_id] || { val: 0 }

        case stream_type
        when :init_stream                  # a new tuple tree is born
          curr[:val]       ^= partial_checksum
          curr[:spout_task] = from_task_id
        when :ack_stream                   # a bolt acked one of the tree's tuples
          curr[:val]       ^= partial_checksum
        when :fail_stream                  # a bolt explicitly failed a tuple
          curr[:failed]     = true
        end

        ackables[root_id] = curr

        if curr[:spout_task] && (curr[:val] == 0)   # every edge id has been XORed in twice
          ackables.delete(root_id)
          collector.send_direct(curr[:spout_task], :ack_stream, [root_id])
        elsif curr[:failed]
          ackables.delete(root_id)
          collector.send_direct(curr[:spout_task], :fail_stream, [root_id])
        end

        collector.ack(tuple)   # the acker is itself a bolt, so it must ack its own input
      end

    end
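
The `ExpiringHash` above stands in for the rotating, time-expiring map described earlier. A minimal sketch of how such a structure could work (again illustrative, not Storm's actual implementation):

----
# Minimal sketch of a rotating, time-expiring hash: writes land in the newest
# bucket, reads check every bucket, and expiring drops the oldest bucket whole.
class ExpiringHash
  def initialize(num_buckets = 3, &expire_callback)
    @buckets         = Array.new(num_buckets) { {} }
    @expire_callback = expire_callback
  end

  def [](key)
    bucket = @buckets.find { |b| b.key?(key) }
    bucket && bucket[key]
  end

  def []=(key, value)
    @buckets.each { |b| b.delete(key) }      # keep at most one copy of the key
    @buckets.first[key] = value
  end

  def delete(key)
    @buckets.each { |b| b.delete(key) }
  end

  def expire_stalest_bucket
    dead = @buckets.pop                      # the oldest bucket falls off the end
    @buckets.unshift({})
    dead.each { |key, value| @expire_callback.call(key, value) if @expire_callback }
  end
end
----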






===== A few details

There are a few details to clarify.

First, the spout must never block when emitting -- if it did, critical bookkeeping tuples might get trapped, locking up the flow. So its emitter keeps an "overflow buffer", and publishes as follows:

* if there are already tuples in the overflow buffer, add the new tuple to it -- the send queue is certainly full.
* otherwise, publish the tuple to the send queue with a non-blocking call. That call will either succeed immediately ...
* ... or fail with an `InsufficientCapacityException`, in which case add the tuple to the overflow buffer.

The spout's async-loop won't call `nextTuple` while the overflow buffer is non-empty, so the overflow buffer only has to accommodate the maximum number of tuples emitted in a single `nextTuple` call.
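
A sketch of that publish logic, reusing the hypothetical non-blocking `try_publish` (and its `InsufficientCapacityError`) from the conceptual disruptor model above:

----
# The spout emitter never blocks: a full send queue spills into the overflow buffer.
def publish_from_spout(tuple)
  if overflow_buffer.any?                  # the queue is certainly full; don't even try
    overflow_buffer << tuple
  else
    begin
      send_queue.try_publish(tuple)        # non-blocking publish
    rescue InsufficientCapacityError       # the ring filled up after all
      overflow_buffer << tuple
    end
  end
end

# The async-loop only calls nextTuple when the overflow buffer is empty, so the
# buffer never holds more than one nextTuple call's worth of output.
def ready_for_next_tuple?
  overflow_buffer.empty?
end
----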



===== Code Locations

Since the Storm+Trident code is split across multiple parent directories, it can be hard to track where its internal logic lives. Here's a guide to the code paths as of version `0.9.0-wip`.

[[storm_transport_code]]
.Storm Transport Code
[cols="4,5,4,5",options="header"]
|=======
| Role | Source path | Function | Notes
| `async-loop` | `clj/b/s/util.clj` | |
| Spout instantiation | `clj/b/s/daemon/executor.clj` | `mk-threads :spout` |
| Bolt instantiation | `clj/b/s/daemon/executor.clj` | `mk-threads :bolt` |
| Disruptor Queue facade | `clj/b/s/disruptor.clj` and `jvm/b/s/utils/disruptor.java` | |
| Emitter -> Send Q logic | `clj/b/s/daemon/executor.clj` | `mk-executor-transfer-fn` |
| Router (drains exec send Q) | `clj/b/s/daemon/worker.clj` | `mk-transfer-fn` | infinite loop attached to each disruptor queue
| Local Send Q -> exec Rcv Q | `clj/b/s/daemon/worker.clj` | `mk-transfer-local-fn` | invoked within the transfer-fn and receive thread
| Worker Rcv Q -> exec Rcv Q | `clj/b/s/messaging/loader.clj` | `launch-receive-thread!` |
| Transfer Q -> zmq | `clj/b/s/daemon/worker.clj` | `mk-transfer-tuples-handler` |
| `..` | `clj/b/s/daemon/task.clj` | |
| `..` | `clj/b/s/daemon/acker.clj` | |
| `..` | `clj/b/s/` | |
|=======


=== More on Transport


* **Queues between Spout and Wu-Stage**: exec.send/transfer/exec.receive buffers
- output of each spout goes to its executor send buffer
- the router publishes records destined for local executors directly into their receive disruptor queues, and records destined for _all_ remote workers as a single m-batch into this worker's transfer queue buffer.

=== Questions

* how many ackers are there (per worker? per topology?)
* when is the ack sent; what are the inputs to the ack checksum; what is the checksum called?
* how does it decide a batch is done but its ack is overdue?
* what happens if a tuple is acked twice?

=== Trident Lifecycle