Tuning Storm+Trident: executor queue sizing

1 parent 2788aed commit 9c7a876d3a44742e2cef723390271f3d08bb39b5 Philip (flip) Kromer committed
4 .gitattributes
@@ -1,4 +1,4 @@
-text=auto
+* text=auto
*.asciidoc text
*.md text
@@ -6,3 +6,5 @@ text=auto
*.rb text
*.pig text
*.tsv eol=lf
+
+*.pdf -text
10 88-storm-overview.asciidoc
@@ -19,9 +19,13 @@ The obvious applications are extract/transform/load ("ETL") processes -- the aut
It's best instead to think of Stream Analytics as a way to query your data on the way _in_ to your datastore. A full-capability big data platform has all three of the following pieces:
-* Stream Processing, to assemble data in simple context at write time;////For example... Amy////
-* Queryable Datastore, to assemble data in simple context at read time;////For example... Amy////
-* Batch Processing, to assemble data in global context////For example... Amy////
+* queryable scalable datastores, like HBase or Elasticsearch;
+* batch processing, typically Hadoop; and
+* stream analytics, like Storm or Spark Streaming.
+
+You're probably familiar with the datastore layer's responsibilities: direct row-level access to records, and answers in simple context at read time (summing counts by hour to form counts across days, or a simple faceted rollup on sales by dealership to sales by region). Batch processing gives you answers in global context, but delayed from both write time and read time.
+
+The most powerful feature of stream processing is that it lets you assemble data in simple context at _write_ time. /// More to come here.
The Storm+Trident data processing system, developed by Backtype and Twitter, is a remarkably elegant open-source project that's emerging as the dominant streaming analytics platform.
23 88-tuning_storm_trident.asciidoc
@@ -34,8 +34,9 @@ The principal bottleneck may be:
* _remote rate limit_: alternatively, you may be calling an external resource that imposes a maximum throughput limit. For example, terms-of-service restrictions from a third-party web API might only permit a certain number of bulk requests per hour, or a legacy datastore might only be able to serve a certain volume of requests before its performance degrades. If the remote resource allows bulk requests, you should take care that each Trident batch is sized uniformly. For example, the https://dev.twitter.com/docs/api/1.1/get/users/lookup[twitter users lookup API] returns user records for up to 100 user IDs -- so it’s essential that each batch consist of 100 tuples (and no more). Otherwise, there’s not much to do here besides tuning your throughput to comfortably exceed the maximum expected rate.
For each of the cases besides IO-bound and CPU-bound, there isn’t that much to say:
-for memory-bound flows, it’s "buy enough RAM" (though you should read the tips on JVM tuning later in this chapter). For remote rate-limited flows, buy a better API plan or remote datastore -- otherwise, tune as if CPU-bound, allowing generous headroom. For concurrency-bound flows, apply the recommendations that follow,
-increase concurrency until things get screwy. If that per-machine throughput is acceptable to your budget, great; otherwise, hire an advanced master sysadmin to help you chip away at the
+for memory-bound flows, it’s "buy enough RAM" (though you should read the tips on JVM tuning later in this chapter). For remote rate-limited flows, buy a better API plan or remote datastore -- otherwise, tune as if CPU-bound, allowing generous headroom.
+
+For concurrency-bound flows, apply the recommendations that follow, increasing concurrency until things get screwy. If that per-machine throughput is acceptable to your budget, great; otherwise, hire an advanced master sysadmin to help you chip away at the
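The uniform-batch-size point from the remote-rate-limit discussion above can be sketched outside Storm: given a pool of pending user IDs, dispatch only completely full batches of 100 to the bulk endpoint, holding the remainder back until it fills. This is a minimal illustration in Python; `full_batches` is a hypothetical helper, not part of the Storm or Twitter APIs.

```python
def full_batches(ids, batch_size=100):
    """Split pending IDs into uniform, completely full batches.

    Returns (batches, leftover): only full batches are dispatched;
    the leftover tail waits until enough IDs arrive to fill a batch.
    """
    cutoff = len(ids) - len(ids) % batch_size
    batches = [ids[i:i + batch_size] for i in range(0, cutoff, batch_size)]
    leftover = ids[cutoff:]
    return batches, leftover

# 250 pending IDs -> two full batches of 100, with 50 held back
batches, leftover = full_batches(list(range(250)))
```

The same discipline applies to any bulk API with a hard per-request cap: a short batch wastes part of your rate-limit quota on every request.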
==== Provisioning
@@ -75,11 +76,9 @@ Once you have roughly dialed in the batch size and parallelism, check in with th
If you're memory-bound, use machines with lots of RAM.
-
===== Concurrency Bound
-In a concurrency bound problem, use very high parallelism
-If possible, use a QueryFunction to combine multiple queries into a batch request.
+In a concurrency bound problem, use very high parallelism. If possible, use a QueryFunction to combine multiple queries into a batch request.
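The payoff of a QueryFunction is that one remote round trip answers an entire Trident batch, rather than one round trip per tuple. Here's a toy Python sketch of that idea (the `CountingStore` class and its methods are invented for illustration; Trident's actual interface is `QueryFunction.batchRetrieve` in Java):

```python
class CountingStore:
    """Toy key-value store that counts round trips, standing in for a remote datastore."""
    def __init__(self, data):
        self.data = data
        self.round_trips = 0

    def get_one(self, key):
        # naive per-tuple access: one round trip per key
        self.round_trips += 1
        return self.data.get(key)

    def get_many(self, keys):
        # one round trip answers the whole batch
        self.round_trips += 1
        return [self.data.get(k) for k in keys]

def batch_retrieve(store, keys):
    """The QueryFunction idea: answer a whole Trident batch with one request."""
    return store.get_many(keys)

store = CountingStore({"a": 1, "b": 2, "c": 3})
results = batch_retrieve(store, ["a", "b", "c"])  # one round trip instead of three
```

With per-tuple `get_one` calls, the same lookup would cost three round trips; in a concurrency-bound flow, that difference is the whole ballgame.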
===== Sidebar: Little's Law
@@ -117,9 +116,19 @@ Since the execution time increases slowly in case (2), you get better and better
===== Executor send buffer size
-Don't worry about this setting until most other things stabilize -- it's mostly important for ensuring that a burst of records doesn't clog the send queue.
+Now that the records-per-batch is roughly sized, take a look at the disruptor queue settings (the internal buffers between processing stages).
+
+As you learned in the Storm internals chapter, each slot in the executor send buffer queue holds a single tuple. The worker periodically sweeps up all its pending records, dispatching them in bunches either directly into executor receive buffers (for executors in the same worker) or into the worker transfer buffer (for remote executors). Note the important distinction: the executor send queue contains _tuples_, while the receive/transfer queues contain _bunches of tuples_.
+
+These are advanced-level settings, so don't make changes unless you can quantify their effect, and make sure you understand why any large change is necessary. In all cases, the sizes have to be an even power of two (1024, 2048, 4096, and so forth).
+
+As long as the executor send queue is large enough, further increases make no real difference apart from increased RAM use and a small overhead cost. If the executor send queue is too small, however, a burst of records will clog it unnecessarily. Unfortunately, in current versions of Storm this setting applies universally, so everyone has to live with the needs of the piggiest customer.
+
+This is most severe in the case of a spout, which will receive a large number of records in a burst, or anywhere there is high fanout (one tuple that rapidly turns into many tuples).
+Set the executor send buffer to be larger than the batch record count of the spout or first couple stages.
+
+
-Set the executor send buffer to be larger than the batch record count of the spout or first couple stages. Since it applies universally, don't go crazy with this value. It has to be an even power of two (1024, 2048, 4096, 8192, 16384).
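For concreteness, these queue sizes are set in `storm.yaml` (or per-topology via the `Config` object). The keys below are the Storm 0.9-era names; the values are illustrative placeholders, not recommendations -- size them against your own batch record counts:

```yaml
# illustrative values only -- each must be a power of two
topology.executor.send.buffer.size: 16384     # slots hold individual tuples
topology.executor.receive.buffer.size: 16384  # slots hold bunches of tuples
topology.transfer.buffer.size: 32             # bunches of tuples bound for other workers
```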
==== Garbage Collection and other JVM options
2 89-intro-to-storm.asciidoc
@@ -4,8 +4,6 @@ Storm+Trident is a system for processing streams of data at scale, and represent
<Storm Intro>
-
-
Your First Topology
Topologies in Storm are analogous to jobs in Hadoop -- they define the path data takes through your system and the operations applied along the way. Topologies are compiled locally and then submitted to a Storm cluster, where they run indefinitely until stopped. You define your topology and Storm handles all the hard parts -- fault tolerance, retrying, and distributing your code across the cluster, among other things.
BIN images/Storm+TridentDiagrams.graffle.pdf
Binary file not shown.
12 images/Storm+TridentDiagrams.graffle/data.plist
@@ -75,6 +75,16 @@
<string>int</string>
<string>0</string>
</array>
+ <key>NSPrinter</key>
+ <array>
+ <string>coded</string>
+ <string>BAtzdHJlYW10eXBlZIHoA4QBQISEhAlOU1ByaW50ZXIAhIQITlNPYmplY3QAhZKEhIQITlNTdHJpbmcBlIQBKwtDYW5vbiBNUDgzMIaG</string>
+ </array>
+ <key>NSPrinterName</key>
+ <array>
+ <string>string</string>
+ <string>Canon MP830</string>
+ </array>
<key>NSRightMargin</key>
<array>
<string>float</string>
@@ -94927,7 +94937,7 @@ Kafka\uc0\u8232 Spout}</string>
</dict>
</array>
<key>Frame</key>
- <string>{{0, 50}, {1594, 1128}}</string>
+ <string>{{192, 50}, {1594, 1128}}</string>
<key>ListView</key>
<false/>
<key>OutlineWidth</key>
