
stream processing and tuning storm+trident

1 parent d036c98 commit bf9980be2c85d3d147b6b197ff4c59144af4aaeb Philip (flip) Kromer committed Jul 7, 2013
Showing with 201 additions and 20 deletions.
  1. +6 −0 05c-intro_to_wukong.asciidoc
  2. +3 −0 88-locality.asciidoc
  3. +26 −1 88-storm-lifecycle_of_a_record.asciidoc
  4. +63 −18 88-storm-overview.asciidoc
  5. +102 −0
  6. +1 −1 data
6 05c-intro_to_wukong.asciidoc
@@ -1,3 +1,9 @@
+Sun Wukong: "He acquired the powers of shapeshifting known as the '72 transformations'" -- Wikipedia
stream do |article|
words = Wukong::TextUtils.tokenize(article.text, remove_stopwords: true)
3 88-locality.asciidoc
@@ -1,3 +1,6 @@
+Proximity - adjacency
+Context - reshape - pivot
=== Three legs of the big data stack
In early 2012, my company (Infochimps) began selling our big data stack to enterprise companies. At first, clients came to us with the word Hadoop on their lips and a flood of data on their hands -- yet what consistently emerged as the right solution was an initial implementation using streaming data analytics into a scalable datastore, and a follow-on installment of Hadoop once their application reached scale. From nowhere on their radar last year, we now hear requests for Storm by name. I've seen Hadoop's historically fast growth from open-source product with momentum in 2008 to foundational enterprise technology in 2013, and can attest that the rate of adoption for streaming analytics (and Storm+Trident in particular) looks to be even faster.
27 88-storm-lifecycle_of_a_record.asciidoc
@@ -66,7 +66,32 @@ At the heart
Unlike the transfer and the executor queues, the worker's receive buffer is a ZeroMQ construct, not a disruptor queue
-===== Acking
+==== Acking In Storm
+* Noah is processed, produces Ham and Shem. Ack clears Noah, implicates Ham and Shem
+* Shem is processed, produces Abe. Ack clears Shem, implicates Abe
+* Ham is processed, produces none. Ack clears Ham
+* Alice does a favor for Bob and Charlie. Alice is now in the clear; Bob and Charlie owe
+* For every record generated, send it to the acker
+* Who keeps it in a table
+* For every record completed, send it to the acker
+* Who removes it from the table
+* Maintain tickets in a tree structure so you know what to retry
+* When the tuple tree is created, send an ack-init: the clan id along with its edge checksum
+* When each tuple is successfully completed, send an ack holding two sixty-four bit numbers: the tupletree id, and the XOR of its edge id and all the edge ids it generated. Do this for each of its tupletree ids.
+* The acker holds a single O(1) lookup table
 - it's actually a set of lookup tables: current, old and dead. New tuple trees are added to the current bucket; every timeout number of seconds, current becomes old and old becomes dead -- their trees are declared failed and their records retried.
+* The spout holds the original tuple until it receives notice from the acker. The spout won't fetch more than the max-pending number of tuples: this protects the spout against memory pressure, and the downstream system against congestion.
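The scheme above can be simulated in a few lines of Python. This is an illustration of the XOR bookkeeping, not Storm's implementation; the `Acker` class and the genealogy names are ours, and the random ids stand in for Storm's randomly chosen 64-bit edge ids.

```python
import random

class Acker:
    """Tracks each tuple tree with a single XOR checksum: O(1) memory per tree."""
    def __init__(self):
        self.checksums = {}   # tree_id -> running XOR of edge ids

    def ack_init(self, tree_id, root_edge):
        # The spout announces a new tuple tree along with its first edge id.
        self.checksums[tree_id] = root_edge

    def ack(self, tree_id, edge_id, child_edges):
        # A record finished: XOR out its own edge, XOR in each child it generated.
        val = self.checksums[tree_id] ^ edge_id
        for child in child_edges:
            val ^= child
        if val == 0:          # every edge both created and completed
            del self.checksums[tree_id]
            return True       # tree fully processed; notify the spout
        self.checksums[tree_id] = val
        return False

# Noah begets Shem and Ham; each ack clears the parent and implicates the children.
acker = Acker()
noah, shem, ham, abe = (random.getrandbits(64) for _ in range(4))
acker.ack_init("tree-1", noah)
acker.ack("tree-1", noah, [shem, ham])   # Noah done; Shem, Ham pending
acker.ack("tree-1", shem, [abe])         # Shem done; Abe pending
acker.ack("tree-1", ham, [])             # Ham done, no children
done = acker.ack("tree-1", abe, [])      # Abe done -> checksum reaches zero
print(done)  # True: the whole tuple tree is complete
```

Because XOR is order-independent, acks may arrive in any order, and the acker never needs to know the shape of the tree -- just the running checksum.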
When a tuple is born in the spout,
81 88-storm-overview.asciidoc
@@ -1,22 +1,62 @@
-=== Questions
+One of the biggest changes to the practice of big data over the last year has been the rapid adoption of streaming data analytics.
+The obvious applications are extract/transform/load ("ETL") processes -- the automated version of chapter (TODO REF) on Data Munging -- and complex event processing ("CEP") -- the kind of real-time data handling that powers Wall Street and high-scale security monitoring.
-* how many ackers (per worker? per topology?)
-* when is ack sent; what are parts of the ack checksummer inputs; what is checksum called
-* how does it decide batch is done yet ack is overdued
-* what happens if ack twice?
+It's best instead to think of Stream Analytics as a way to query your data on the way _in_ to your datastore. A full-capability big data platform has all three of the following pieces:
+* Stream Processing, to assemble data in simple context at write time;
+* Queryable Datastore, to assemble data in simple context at read time;
+* Batch Processing, to assemble data in global context
+The Storm+Trident data processing system, developed by Backtype and Twitter, is a remarkably elegant open-source project that's emerging as the dominant streaming analytics platform.
-=== Trident Lifecycle
+While the raw API of Storm+Trident is quite different from that of Hadoop, the interesting parts are the same. Wukong unifies the two, letting us concentrate on data and questions. Let's dive in.
+=== Storm+Trident
-* exactly once processing
-* transactional db support
+Storm is the underlying framework that handles transport, messaging, process supervision and so forth. It guarantees reliable processing of each record using a brilliantly simple scheme that will scale to millions of records without requiring its own massive data flow for bookkeeping.
+Trident builds on Storm to provide exactly-once processing and transactional state, along with:
* layer on the flow DSL
* some primitive aggregators
+==== Orchestration and Transformation
+* Spouts and Bolts, to produce and transform records respectively;
+* Topologies, to orchestrate the grouping and combining of data streams
+==== Reliability and Guaranteed Processing
+The central challenge of high-scale stream analytics is this: the metadata required to coordinate and process your data reliably can itself grow so large that coordinating and processing *it* reliably becomes difficult.
+A distributed framework can provide one of the following guarantees:
+* "best-effort": records are processed as they arrive; any record lost to a failure is simply dropped
+* "at least once": every record will be successfully processed at least once, though retries mean some records may be processed more than once. If your operations are idempotent, this is effectively as good as exactly-once
+* "exactly-once": every record is successfully processed, and every unsuccessful attempt can be reliably invalidated or retracted
+A surprising number of tasks require only best-effort or at-least-once processing.
+But exactly-once processing is essential for analytics: counting distinct records, summing values, and anything else where a duplicated or dropped record changes the answer.
+=== Why Storm+Trident is bigger than it looks
+* Operational decoupling:
+* Latency Tolerance:
+* Reliability Glue:
+* Transport Agnosticism:
+* Distributed Programming without quantum mechanics
+How do you make a program that will run forever? Joe Armstrong, the inventor of Erlang, identifies these six key features:
+Isolation; Concurrency; Failure Detection; Fault Identification; Live Code Upgrade; Stable Storage.
+Storm+Trident provides all six.
+=== Trident Lifecycle
==== Coordinator
-* coordinator is secretly the spout
+* Master coordinator is secretly the spout
* at trident batch delay period, will emit a transaction tuple
* it has a serially incrementing transaction ID, kept forever even across restarts.
* (We're only going to talk about Opaque Transactional topologies; more on that later)
@@ -30,7 +70,6 @@ spout emits batches; these succeed or fail as a whole. (That's because they're p
* tuples in batches are freely processed in parallel and asynchronously unless there are barriers (eg the state after the group by)
* In this case, Processor emits aggregable records
==== Group-by
==== Transactional State
@@ -48,7 +87,6 @@ spout emits batches; these succeed or fail as a whole. (That's because they're p
===== Ensuring Transactional reliability
Let's say for transaction ID 69 the old aggregated values were `{A:20, B: 10, C: 1, D: 0}`, and new aggregated values were `{A: 24, B: 12, C: 2, D: 1}`.
It stores (TODO: verify order of list):
@@ -75,13 +113,10 @@ This means another attempt has been here: maybe it succeeded but was slow; maybe
* transactional: exactly once; batches are always processed in whole
* opaque transactional: all records are processed, but might not be in same batches
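The bookkeeping above -- store the new value alongside the transaction ID and the value from before that transaction -- can be sketched in Python. This is an illustration of the opaque-transactional update rule, not Trident's actual state code; the field names are ours.

```python
def opaque_update(stored, batch_txid, batch_increment):
    """Opaque-transactional state: persist (txid, prev, value) for each key.

    If a batch arrives bearing the txid we already stored, it is a retry whose
    earlier attempt may have partially applied -- so rebuild from the snapshot
    taken before that transaction, instead of double-counting."""
    if stored is None:
        return {"txid": batch_txid, "prev": 0, "value": batch_increment}
    if stored["txid"] == batch_txid:
        # Replay of the same transaction: base the result on the pre-txn snapshot.
        return {"txid": batch_txid, "prev": stored["prev"],
                "value": stored["prev"] + batch_increment}
    # A new transaction: the current value becomes the next snapshot.
    return {"txid": batch_txid, "prev": stored["value"],
            "value": stored["value"] + batch_increment}

# Mirroring the A: 20 -> 24 example above:
state = opaque_update(None, 68, 20)   # A holds 20 as of txid 68
state = opaque_update(state, 69, 4)   # batch 69 adds 4: value 24, prev 20
state = opaque_update(state, 69, 4)   # batch 69 retried: still 24, not 28
print(state["value"])  # 24
```

The retry case is why opaque-transactional state survives batches being re-formed with different contents: the stored pre-transaction snapshot, not the possibly-tainted current value, is the base for the replay.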
=== Concepts
__Topology-level objects__
* __Bolt__ -- topology-level object.
- contract:
@@ -116,11 +151,8 @@ __Physical-level objects__
- belongs to executor; has one bolt/spout
- physical expression of the bolt or spout
- in Storm, can set many tasks per executor -- when you want to scale out (TODO: verify). (in Trident, left at one per; TODO: can this be changed?)
- -
* __Router__
From documentation:
@@ -153,3 +185,16 @@ The following should be even multiples:
* executor receive buffer
* worker receive buffer
* worker transfer buffer
+==== notes for genealogy analogy
+ [1.1.1] Sky was the first who ruled over the whole world.1 ...
+[1.1.3] [Uranus] begat children by Earth, to wit, the Titans as they are named: Ocean, Coeus, Hyperion, Crius, Iapetus, and, youngest of all, Cronus; also daughters, the Titanides as they are called: Tethys, Rhea, Themis, Mnemosyne, Phoebe, Dione, Thia.5 ...
+[1.3.1] Now Zeus wedded Hera and begat Hebe, Ilithyia, and Ares,32 but he had intercourse with many women, both mortals and immortals. By Themis, daughter of Sky, he had daughters, the Seasons, to wit, Peace, Order, and Justice; also the Fates, to wit, Clotho, Lachesis, and Atropus33; by Dione he had Aphrodite34; by Eurynome, daughter of Ocean, he had the Graces, to wit, Aglaia, Euphrosyne, and Thalia35; by Styx he had Persephone36; and by Memory (Mnemosyne) he had the Muses, first Calliope, then Clio, Melpomene, Euterpe, Erato, Terpsichore, Urania, Thalia, and Polymnia.37
+ "Straightway then they came to the abode of the gods, to steep Olympus and there wind-footed, swift Iris stayed the horses and loosed them from the car, and cast before them food ambrosial; but fair Aphrodite flung herself upon the knees of her mother Dione. She clasped her daughter in her arms, and stroked her with her hand and spake to her, saying: "Who now of the sons of heaven, dear child, hath entreated thee thus wantonly, as though thou wert working some evil before the face of all?""
+ Dione 1. Dione 1 ... daughter of Uranus & Gaia. According to some she consorted with Zeus and gave birth to Aphrodite. Apd.1.1.3, 1.3.1; Hom.Il.5.370; Hes.The.350ff
+ Aphrodite had three children by Ares: Deimos, Phobus 1 (Fear and Panic) and Harmonia 1
+ Hesiod [933] Also Cytherea bare to Ares the shield-piercer Panic and Fear, terrible gods who drive in disorder the close ranks of men in numbing war, with the help of Ares, sacker of towns: and Harmonia whom high-spirited Cadmus made his wife.
@@ -0,0 +1,102 @@
+=== Tuning Storm+Trident
+* Ensure each stage is always ready to accept records;
+* Deliver each processed record promptly to its destination
+==== Outline
+The first step in tuning a flow is to identify your principal goal: latency, throughput, memory or cost.
+Tuning for cost means balancing the throughput (records/hour per machine) and cost of infrastructure (amortized $/hour per machine). Once you've chosen the hardware to provision, tuning for cost is equivalent to tuning for throughput, so we'll just discuss latency and throughput as goals. (I'm also going to concentrate on typical latency/throughput, and not on, say, variance or 99th-percentile figures.)
+Next, identify your dataflow's principal limit, the constraining resource that most tightly bounds the performance of its slowest stage. A dataflow can't pass through more records per second than the cumulative output of its most constricted stage, and it can't deliver records in less end-to-end time than the stage with the longest delay.
+The principal limit may be:
+* _IO volume_: there's a hardware limit to the number of bytes per second that a machine's disks or network connection can sustain. Event log processing often involves large amounts of data requiring only parsing or other trivial transformations before storage -- the throughput of such dataflows is IO-bound.
+* _CPU_: a CPU-bound flow spends more time in calculations to process a record
+* _concurrency_: network requests to an external resource often require almost no CPU and minimal volume. If your principal goal is throughput, the flow is only bound by how many network requests you can make in parallel.
+* _remote rate limit bound_: alternatively, you may be calling an external resource that imposes a maximum throughput out of your control. A legacy datastore might only be able to serve a certain volume of requests before its performance degrades, or a third-party web API's terms of service might impose a cap (e.g. Google's Geolocation API).
+* _memory_: large windowed joins or memory-intensive analytics algorithms may require so much RAM it defines the machine characteristics
+* Topology; Little's Law
+ - skew
+* System: machines; workers/machine, machine sizing; (zookeeper, kafka sizing)
+* Throttling: batch size; kafka-partitions; max pending; trident batch delay; spout delay; timeout
+* Congestion: number of ackers; queue sizing (exec send, exec recv, transfer); `zmq.threads`
+* Memory: Max heap (Xmx), new gen/survivor size; (queue sizes)
+* Ulimits and other network sysctls for concurrency; Netty vs ZMQ transport; drpc.worker.threads;
+* Other important settings: preferIPv4; `transactional.zookeeper.root` (parent name for transactional state ledger in Zookeeper); `` (java options passed to _your_ worker function), `topology.worker.shared.thread.pool.size`
+* Don't touch: `zmq.hwm` (unless you are seeing unreliable network transport under bursty load), disruptor wait strategy, worker receive buffer size
+==== Initial tuning
+CPU-bound flow
+Construct a topology with parallelism one, set max-pending to one, and time the flow through each stage.
+Increase the parallelism of CPU-bound stages to nearly saturate the CPU.
+Set the trident batch delay to be comfortably larger than how long a batch takes -- that is, there should be a short additional delay after each batch completes
+Adjust batch sizes with the following in mind:
+* `each()` functions should not care about batch size.
+* `partitionAggregate`
+* `partitionPersist` and `partitionQuery`
+==== Topology
+Time each stage independently
+The stages upstream of your principal bottleneck should always have records ready to process. The stages downstream should always have capacity to accept and promptly deliver processed records.
+==== Provisioning
+Use one worker per topology per machine: storm passes tuples directly from sending executor to receiving executor if they're within the same worker. Also set number of ackers equal to number of workers -- the default of one per topology never makes sense (future versions of Storm will fix this).
+If you're CPU-bound, set one executor per core for the bounding stage (or one less than cores at large core count). Don't adjust the parallelism without reason -- even a shuffle implies network transfer. Shuffles don't impart any load-balancing.
+Match your spout parallelism to its downstream flow. Use the same number of kafka spouts as kafka partitions (or a small multiple). If there are more spouts than kafka machines * partitions, the extra spouts will sit idle.
+For map states or persistentAggregates -- things where results are accumulated into memory structures -- allocate one stage per worker. Cache efficiency and batch request overhead typically improve with large record set sizes.
+In a concurrency bound problem, use very high parallelism
+If possible, use a QueryFunction to combine multiple queries into a batch request.
+===== Little's Law
+ Throughput (recs/s) = Capacity / Latency
+If all records must pass through a stage that handles 10 records per second, then the flow cannot possibly proceed faster than 10 records per second, and it cannot have latency smaller than 100ms (1/10 of a second).
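The law is easy to apply directly; here is a worked check with illustrative numbers:

```python
def max_throughput(records_in_flight, latency_s):
    """Little's Law: throughput (recs/s) = capacity (records in flight) / latency (s)."""
    return records_in_flight / latency_s

# One record at a time, 100ms each: the whole flow is capped at 10 recs/s.
print(max_throughput(1, 0.100))   # 10.0
# Raising capacity (parallelism) lifts throughput; per-record latency is unchanged.
print(max_throughput(8, 0.100))   # 80.0
```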
+==== Important equations and constants
+* Initial records per batch = fetch bytes / avg record size
+* Max network throughput
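As a worked example of the first equation (the numbers here are illustrative, not recommendations):

```python
# Initial records per batch = fetch bytes / average record size, summed over partitions.
fetch_bytes      = 1_048_576   # 1 MB Kafka fetch per partition (hypothetical)
avg_record_size  = 2_048       # 2 KB average record (hypothetical)
kafka_partitions = 4

records_per_batch = (fetch_bytes // avg_record_size) * kafka_partitions
print(records_per_batch)  # 2048 records enter the topology per batch
```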
+==== Size
+Records per batch
+If you are using the Kafka spout, this is controlled by max fetch bytes
+account for fanout
+The executor send buffer shouldn't be outrageously smaller than the record count per batch
+JVM Heap size and new gen size.
+* Given the cardinal rule of stream processing (accept records and get rid of them quickly), Trident will be happiest with a large New Gen space.
+* Switch on GC logging
+ * The survivor spaces should not overflow after new-gen gcs
+ * very few objects promoted to old-gen
+ * new gen size must be less than one-half the full heap size
+* Your goal: no more than one new-gen GC per 5 seconds, with less than 0.1s pause times; no more than one old-gen GC per 5 minutes, and no full (stop-the-world) GCs seen even under heavy use
+* You cannot arbitrarily turn up the heap sizes
+ * Heap size larger than about 12-16 GB becomes fairly dangerous. Try as you might, a stop-the-world GC may some day be necessary -- and the time needed to sweep that much memory must be less than the relevant timeouts
+==== Tempo and Throttling
+Max-pending (`TOPOLOGY_MAX_SPOUT_PENDING`) sets the number of tuple trees live in the system at any one time.
+Trident-batch-delay (`topology.trident.batch.emit.interval.millis`) sets the maximum pace at which the trident Master Batch Coordinator will issue new seed tuples. It's a cap, not an add-on: if t-b-d is 500ms and the most recent batch was released 486ms ago, the spout coordinator will wait 14ms before dispensing a new seed tuple. If the next pending entry isn't cleared for 523ms, it will be dispensed immediately. If it took 1400ms, it will also be released immediately -- but no make-up tuples are issued.
+Trident-batch-delay is principally useful to prevent congestion, especially around startup. As opposed to a traditional Storm spout, a Trident spout will likely dispatch hundreds of records with each batch. If max-pending is 20, and the spout releases 500 records per batch, the spout will try to cram 10,000 records into its send queue.
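The cap-not-add-on pacing can be sketched as a one-line rule (a simulation of the behavior described above, not Trident's code):

```python
def next_emit_wait_ms(batch_delay_ms, elapsed_ms):
    """Wait out the remainder of the interval, or emit immediately if it has
    already passed. There are never make-up batches for missed intervals."""
    return max(0, batch_delay_ms - elapsed_ms)

print(next_emit_wait_ms(500, 486))   # 14 -- wait 14ms before the next seed tuple
print(next_emit_wait_ms(500, 523))   # 0  -- overdue, dispense immediately
print(next_emit_wait_ms(500, 1400))  # 0  -- still just one batch, no catch-up
```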
2 data
@@ -1 +1 @@
-Subproject commit 8ae2140e4862c18a2cd67c7209a6b3c4c5d1b18f
+Subproject commit 8541645b9bb8776f60130b364aa1ff9863bb06f4
