consolidated event streams portion

1 parent 7c98ec0 commit ae3161f7ed89e129ec304088eae3cdf6e96be276 Philip (flip) Kromer committed Jan 10, 2014
2 07-event_streams.asciidoc
@@ -1,2 +0,0 @@
-[[event_streams]]
-== Event Streams ==
34 07-intro_to_storm+trident.asciidoc
@@ -1,34 +0,0 @@
-== Intro to Storm+Trident
-
-=== Enter the Dragon: C&E Corp Gains a New Partner
-
-Nim
-Dragons are fast and sleek,
- and never have to sleep. They exist in some ways out of time --
- a dragon can perform a thousand actions in the blink of an eye, and yet a thousand years is to them a passing moment
-
-
-=== Intro: Storm+Trident Fundamentals
-
-At this point, you have good familiarity with Hadoop’s batch processing power and the powerful inquiries it unlocks above and as a counterpart to traditional database approach. Stream analytics is a third mode of data analysis and, it is becoming clear, one that is just as essential and transformative as massive scale batch processing has been.
-
-Storm is an open-source framework developed at Twitter that provides scalable stream processing. Trident draws on Storm’s powerful transport mechanism to provide _exactly once_ processing of records in _windowed batches es_ for aggregating and persisting
-to an external data store.
-
-The central challenge in building a system that can perform fallible operations on billions of records reliably is how to do so without yourself producing so much bookkeeping that it becomes its own scalable Stream processing challenge. Storm handles all details of reliable transport and efficient routing for you, leaving you with only the business process at hand. (The remarkably elegant way Storm handles that bookkeeping challenge is one of its principle breakthroughs; you’ll learn about it in the later chapter on Storm Internals.)
-
-This takes Storm past the mere processing of records to Stream Analytics -- with some limitations and some advantages, you have the same ability to specify locality and write arbitrarily powerful general-purpose code to handle every record. A lot of Storm+Trident’s adoption is in application to real-time systems. footnote:[for reasons you’ll learn in the Storm internals chapter, it’s not suitable for ultra-low latency (below, say, 5s of milliseconds), Wall Street-type applications, but if latencies above that are real-time enough for you, Storm+Trident shines.]
-
-But, just as importantly, the framework exhibits radical _tolerance_ of latency. It’s perfectly reasonable to, for every record, perform reads of a legacy data store, call an internet API and the like, even if those might have hundreds or thousands of milliseconds worst-case latency. That range of timescales is simply impractical within a batch processing run or database query. In the later chapter on the Lambda Architecture, you’ll learn how to use stream and batch analytics together for latencies that span from milliseconds to years.
-
-As an example, one of the largest hard drive manufacturers in the world ingests sensor data from its manufacturing line, test data from quality assurance processes, reports from customer support and post mortem analysis of returned devices. They have been able to mine the accumulated millisecond scale sensor data for patterns that predict flaws months and years later. Hadoop produces the "slow, deep" results, uncovering the patterns that predict failure. Storm+Trident produces the fast, relevant results: operational alerts when those anomalies are observed.
-
-Things you should take away from this chapter:
-
-Understand the type of problems you solve using stream processing and apply it to real examples using the best-in-class stream analytics frameworks.
-Acquire the practicalities of authoring, launching and validating a Storm+Trident flow.
-Understand Trident’s operators and how to use them: Each apply `CombinerAggregator`s, `ReducerAggregator`s and `AccumulatingAggregator`s (generic aggregator?)
-Persist records or aggregations directly to a backing database or to Kafka for item-potent downstream storage.
-(probably not going to discuss how to do a streaming join, using either DRPC or a hashmap join)
-
-NOTE: This chapter will only speak of Storm+Trident, the high level and from the outside. We won’t spend any time on how it’s making this all work until (to do ref the chapter on Storm+Trident internals)
26 07a-event_streams-intro.asciidoc
@@ -1,26 +0,0 @@
-////This intro here is a good model for other chapters - this one's rough, but the bones are here. Amy////
-
-Much of Hadoop's adoption is driven by organizations realizing they the opportunity to measure every aspect of their operation, unify those data sources, and act on the patterns that Hadoop and other Big Data tools uncover.////Share a few examples of things that can be measured (website hits, etc.) Amy////
-
-For
-
-e-commerce site, an advertising broker, or a hosting provider, there's no wonder inherent in being able to measure every customer interaction, no controversy that it's enormously valuable to uncovering patterns in those interactions, and no lack of tools to act on those patterns in real time.
-
-can use the clickstream of interactions with each email
-; this was one of the cardinal advantages cited in the success of Barack Obama's 2012 campaign.
-
-
-This chapter's techniques will help, say, a hospital process the stream of data from every ICU patient; a retailer process the entire purchase-decision process
-from
-or a political campaign to understand and tune
-the response to
-each email batch and advertising placement.
-
-
-Hadoop cut its teeth at Yahoo, where it was primarily used for processing internet-sized web crawls(see next chapter on text processing) and
-
-// ?? maybe this should just be 'data streams' or something
-
-
-Quite likely, server log processing either a) is the reason you got this book or b) seems utterly boring.////I think you should explain this for readers. Amy//// For the latter folks, stick with it; hidden in this chapter are basic problems of statistics (finding histogram of pageviews), text processing (regular expressions for parsing), and graphs (constructing the tree of paths that users take through a website).
-
42 07b-pageview_histograms.asciidoc
@@ -1,42 +0,0 @@
-=== Pageview Histograms ===
-////Ease the reader in with something like, "Our goal here will be..." Amy////
-
-Let's start exploring the dataset. Andy Baio
-
-----
-include::code/serverlogs/old/logline-02-histograms-mapper.rb[]
-----
-
-We want to group on `date_hr`, so just add a 'virtual accessor' -- a method that behaves like an attribute but derives its value from another field:
-
-----
-include::code/serverlogs/old/logline-00-model-date_hr.rb[]
-----
-
-This is the advantage of having a model and not just a passive sack of data.
-
-Run it in map mode:
-
-----
-include::code/serverlogs/old/logline-02-histograms-02-mapper-wu-lign-sort.log[]
-----
-
-TODO: digression about `wu-lign`.
-
-Sort and save the map output; then write and debug your reducer.
-
-----
-include::code/serverlogs/old/logline-02-histograms-full.rb[]
-----
-
-When things are working, this is what you'll see. Notice that the `.../Star_Wars_Kid.wmv` file already have five times the pageviews as the site root (`/`).
-
-----
-include::code/serverlogs/old/logline-02-histograms-03-reduce.log[]
-----
-
-You're ready to run the script in the cloud! Fire it off and you'll see dozens of workers start processing the data.
-
-----
-include::code/serverlogs/old/logline-02-histograms-04-freals.log[]
-----
49 07c-sessionizing.asciidoc
@@ -1,49 +0,0 @@
-
-=== User Paths through the site ("Sessionizing")
-
-We can use the user logs to assemble a picture of how users navigate the site -- 'sessionizing' their pageviews. Marketing and e-commerce sites have a great deal of interest in optimizing their "conversion funnel", the sequence of pages that visitors follow before filling out a contact form, or buying those shoes, or whatever it is the site exists to serve. Visitor sessions are also useful for defining groups of related pages, in a manner far more robust than what simple page-to-page links would define. A recommendation algorithm using those relations would for example help an e-commerce site recommend teflon paste to a visitor buying plumbing fittings, or help a news site recommend an article about Marilyn Monroe to a visitor who has just finished reading an article about John F Kennedy. Many commercial web analytics tools don't offer a view into user sessions -- assembling them is extremely challenging for a traditional datastore. It's a piece of cake for Hadoop, as you're about to see.
-
-////This spot could be an effective place to say more about "Locality" and taking the reader deeper into thinking about that concept in context. Amy////
-
-NOTE:[Take a moment and think about the locality: what feature(s) do we need to group on? What additional feature(s) should we sort with?]
-
-
-spit out `[ip, date_hr, visit_time, path]`.
-
-----
-include::code/serverlogs/old/logline-03-breadcrumbs-full.rb[]
-----
-
-You might ask why we don't partition directly on say both `visitor_id` and date (or other time bucket). Partitioning by date would break the locality of any visitor session that crossed midnight: some of the requests would be in one day, the rest would be in the next day.
-
-run it in map mode:
-
-----
-include::code/serverlogs/old/logline-02-histograms-01-mapper.log[]
-----
-
-----
-include::code/serverlogs/old/logline-03-breadcrumbs-02-mapper.log[]
-----
-
-group on user
-
-----
-include::code/serverlogs/old/logline-03-breadcrumbs-03-reducer.log[]
-----
-
-We use the secondary sort so that each visit is in strict order of time within a session.
-
-You might ask why that is necessary -- surely each mapper reads the lines in order? Yes, but you have no control over what order the mappers run, or where their input begins and ends.
-
-This script will accumulate multiple visits of a page.
-
-TODO: say more about the secondary sort.
-////This may sound wacky, but please try it out: use the JFK/MM exmaple again, here. Tie this all together more, the concepts, using those memorable people. I can explain this live, too. Amy////
-
-==== Web-crawlers and the Skew Problem ====
-
-In a
-
-It's important to use real data when you're testing algorithms:
-a skew problem like this
21 07d-page_page_similarity.asciidoc
@@ -1,21 +0,0 @@
-=== Page-Page similarity
-
-What can you do with the sessionized logs? Well, each row lists a visitor-session on the left and a bunch of pages on the right. We've been thinking about that as a table, but it's also a graph -- actually, a bunch of graphs! The <<sidebar,serverlogs_affinity_graph>> describes an _affinity graph_, but we can build a simpler graph that just connects pages to pages by counting the number of times a pair of pages were visited by the same session. Every time a person requests the `/archive/2003/04/03/typo_pop.shtml` page _and_ the `/archive/2003/04/29/star_war.shtml` page in the same visit, that's one point towards their similarity. The chapter on <<graph_processing>> has lots of fun things to do with a graph like this, so for now we'll just lay the groundwork by computing the page-page similarity graph defined by visitor sessions.
-
-----
-include::code/serverlogs/old/logline-04-page_page_edges-full.rb[]
-----
-
-----
-include::code/serverlogs/old/logline-04-page_page_edges-03-reducer.log[]
-----
-
-[[serverlogs_affinity_graph]]
-.Affinity Graph
-****
-First, you can think of it as an _affinity graph_ pairing visitor sessions with pages. The Netflix prize motivated a lot of work to help us understand affinity graphs -- in that case, a pairing of Netflix users with movies. Affinity graph-based recommendation engines simultaneously group similar users with similar movies (or similar sessions with similar pages). Imagine the following device. Set up two long straight lengths of wire, with beads that can slide along it. Beads on the left represent visitor sessions, ones on the right represent pages. These are magic beads, in that they can slide through each other, and they can clamp themselves to the wire. They are also slightly magnetic, so with no other tension they would not clump together but instead arrange themselves at some separated interval along the wire. Clamp all the beads in place for a moment and tie a small elastic string between each session bead and each page in that session. (These elastic bands also magically don't interfere with each other). To combat the crawler-robot effect, choose tighter strings when there are few pages in the session, and weaker strings when there are lots of pages in the session. Once you've finished stringing this up, unclamp one of the session beads. It will snap to a position opposit the middle of all the pages it is tied to. If you now unclamp each of those page beads, they'll move to sit opposite that first session bead. As you continue to unclamp all the beads, you'll find that they organize into clumps along the wire: when a bunch of sessions link to a common set of pages, their mutal forces combine to drag them opposite each other. That's the intuitive view; there are proper mathematical treatments, of course, for kind of co-clustering.
-
-----
-TODO: figure showing bipartite session-page graph
-----
-****
35 07e-geo_ip_matching.asciidoc
@@ -1,35 +0,0 @@
-
-=== Geo-IP Matching ===
-
-[[range_query]]
-[[geo_ip_matching]]
-
-You can learn a lot about your site's audience in aggregate by mapping IP addresses to geolocation. Not just in itself, but joined against other datasets, like census data, store locations, weather and time. footnote:[These databases only impute a coarse-grained estimate of each visitor's location -- they hold no direct information about the persom. Please consult your priest/rabbi/spirit guide/grandmom or other appropriate moral compass before diving too deep into the world of unmasking your site's guests.]
-
-Maxmind makes their http://www.maxmind.com/en/geolite[GeoLite IP-to-geo database] available under an open license (CC-BY-SA)footnote:[For serious use, there are professional-grade datasets from Maxmind, Quova, Digital Element among others.]. Out of the box, its columns are `beg_ip`, `end_ip`, `location_id`, where the first two columns show the low and high ends (inclusive) of a range that maps to that location. Every address lies in at most one range; locations may have multiple ranges.
-
-This arrangement caters to range queries in a relational database, but isn't suitable for our needs. A single IP-geo block can span thousands of addresses.
-
-To get the right locality, take each range and break it at some block level. Instead of having `1.2.3.4` to `1.2.5.6` on one line, let's use the first three quads (first 24 bits) and emit rows for `1.2.3.4` to `1.2.3.255`, `1.2.4.0` to `1.2.4.255`, and `1.2.5.0` to `1.2.5.6`. This lets us use the first segment as the partition key, and the full ip address as the sort key.
-
- lines bytes description file
- 15_288_766 1_094_541_688 24-bit partition key maxmind-geolite_city-20121002.tsv
- 2_288_690 183_223_435 16-bit partition key maxmind-geolite_city-20121002-16.tsv
- 2_256_627 75_729_432 original (not denormalized) GeoLiteCity-Blocks.csv
-
-
-=== Range Queries ===
-////Gently introduce the concept. "So, here's what range queries are all about, in a nutshell..." Amy////
-
-This is a generally-applicable approach for doing range queries.
-
-* Choose a regular interval, fine enough to avoid skew but coarse enough to avoid ballooning the dataset size.
-* Whereever a range crosses an interval boundary, split it into multiple records, each filling or lying within a single interval.
-* Emit a compound key of `[interval, join_handle, beg, end]`, where
- - `interval` is
- - `join_handle` identifies the originating table, so that records are grouped for a join (this is what ensures
- If the interval is transparently a prefix of the index (as it is here), you can instead just ship the remainder: `[interval, join_handle, beg_suffix, end_suffix]`.
-* Use the
-
-
-In the geodata section, the "quadtile" scheme is (if you bend your brain right) something of an extension on this idea -- instead of splitting ranges on regular intervals, we'll split regions on a regular grid scheme.
36 07f-benign_ddos.asciidoc
@@ -1,36 +0,0 @@
-
-[[server_logs_ddos]]
-=== Using Hadoop for website stress testing ("Benign DDos")
-
-Hadoop is engineered to consume the full capacity of every available resource up to the currently-limiting one. So in general, you should never issue requests against external services from a Hadoop job -- one-by-one queries against a database; crawling web pages; requests to an external API. The resulting load spike will effectively be attempting what web security folks call a "DDoS", or distributed denial of service attack.
-
-Unless of course you are trying to test a service for resilience against an adversarial DDoS -- in which case that assault is a feature, not a bug!
-
-.elephant_stampede
-----
- require 'faraday'
-
- processor :elephant_stampede do
-
- def process(logline)
- beg_at = Time.now.to_f
- resp = Faraday.get url_to_fetch(logline)
- yield summarize(resp, beg_at)
- end
-
- def summarize(resp, beg_at)
- duration = Time.now.to_f - beg_at
- bytesize = resp.body.bytesize
- { duration: duration, bytesize: bytesize }
- end
-
- def url_to_fetch(logline)
- logline.url
- end
- end
-
- flow(:mapper){ input > parse_loglines > elephant_stampede }
-----
-
-
-You must use Wukong's eventmachine bindings to make more than one simultaneous request per mapper.
6 07g-end_of_event_streams.asciidoc
@@ -1,6 +0,0 @@
-
-=== Refs ===
-
-* http://www.botsvsbrowsers.com/[Database of Robot User Agent strings]
-
-* http://research.microsoft.com/apps/pubs/default.aspx?id=67818[Improving Web Search Results Using Affinity Graph]
35 89-intro-to-storm.asciidoc → 08-intro_to_storm+trident.asciidoc
@@ -1,10 +1,37 @@
-Intro To Storm (Change Title)
+[[intro_to_storm_trident]]
+== Intro to Storm+Trident
-Storm+Trident is a system for processing streams of data at scale, and represents another
-<Storm Intro>
+=== Enter the Dragon: C&E Corp Gains a New Partner
+Dragons are fast and sleek, and never have to sleep. They exist in some ways out of time -- a dragon can perform a thousand actions in the blink of an eye, and yet a thousand years is to them a passing moment.
-Your First Topology
+
+=== Intro: Storm+Trident Fundamentals
+
+At this point, you have good familiarity with Hadoop’s batch processing power and the powerful inquiries it unlocks, both beyond and as a counterpart to the traditional database approach. Stream analytics is a third mode of data analysis and, it is becoming clear, one that is just as essential and transformative as massive-scale batch processing has been.
+
+Storm is an open-source framework developed at Twitter that provides scalable stream processing. Trident draws on Storm’s powerful transport mechanism to provide _exactly once_ processing of records in _windowed batches_ for aggregating and persisting to an external data store.
+
+The central challenge in building a system that can perform fallible operations on billions of records reliably is how to do so without producing so much bookkeeping that it becomes a scalable stream-processing challenge of its own. Storm handles all the details of reliable transport and efficient routing for you, leaving you with only the business process at hand. (The remarkably elegant way Storm handles that bookkeeping challenge is one of its principal breakthroughs; you’ll learn about it in the later chapter on Storm internals.)
+
+This takes Storm past the mere processing of records to stream analytics -- with some limitations and some advantages, you have the same ability to specify locality and write arbitrarily powerful general-purpose code to handle every record. A lot of Storm+Trident’s adoption is in application to real-time systems. footnote:[For reasons you’ll learn in the Storm internals chapter, it’s not suitable for ultra-low-latency (below, say, a handful of milliseconds) Wall Street-type applications; but if latencies above that are real-time enough for you, Storm+Trident shines.]
+
+But, just as importantly, the framework exhibits radical _tolerance_ of latency. It’s perfectly reasonable to, for every record, read from a legacy data store, call an internet API and the like, even if those have worst-case latencies of hundreds or thousands of milliseconds. That range of timescales is simply impractical within a batch processing run or database query. In the later chapter on the Lambda Architecture, you’ll learn how to use stream and batch analytics together for latencies that span from milliseconds to years.
+
+As an example, one of the largest hard drive manufacturers in the world ingests sensor data from its manufacturing line, test data from quality assurance processes, reports from customer support and post-mortem analysis of returned devices. They have been able to mine the accumulated millisecond-scale sensor data for patterns that predict flaws months and years later. Hadoop produces the "slow, deep" results, uncovering the patterns that predict failure. Storm+Trident produces the fast, relevant results: operational alerts when those anomalies are observed.
+
+Things you should take away from this chapter:
+
+* Understand the type of problems you can solve using stream processing, and apply it to real examples using the best-in-class stream analytics frameworks.
+* Acquire the practicalities of authoring, launching and validating a Storm+Trident flow.
+* Understand Trident’s operators and how to use them: apply `CombinerAggregator`s, `ReducerAggregator`s and `AccumulatingAggregator`s (generic aggregator?)
+* Persist records or aggregations directly to a backing database, or to Kafka for idempotent downstream storage.
+* (probably not going to discuss how to do a streaming join, using either DRPC or a hashmap join)
+
+NOTE: This chapter speaks of Storm+Trident only at a high level and from the outside. We won’t spend any time on how it makes all this work until (TODO: ref) the chapter on Storm+Trident internals.
+
+=== Your First Topology
Topologies in Storm are analogous to jobs in Hadoop - they define the path data takes through your system and the operations applied along the way. Topologies are compiled locally and then submitted to a Storm cluster where they run indefinitely until stopped. You define your topology and Storm handles all the hard parts -- fault tolerance, retrying, and distributing your code across the cluster among other things.
82 07a-serverlog_parsing.asciidoc → 10-event_streams.asciidoc
@@ -1,3 +1,6 @@
+[[event_streams]]
+== Event Streams ==
+
=== Webserver Log Parsing ===
@@ -77,3 +80,82 @@ Then <<serverlogs_output_parser_freals,run it on the full dataset>> to produce t
----
TODO
----
+
+
+
+=== Geo-IP Matching ===
+
+[[range_query]]
+[[geo_ip_matching]]
+
+You can learn a lot about your site's audience in aggregate by mapping IP addresses to geolocation -- not just on its own, but also joined against other datasets, like census data, store locations, weather and time. footnote:[These databases only impute a coarse-grained estimate of each visitor's location -- they hold no direct information about the person. Please consult your priest/rabbi/spirit guide/grandmom or other appropriate moral compass before diving too deep into the world of unmasking your site's guests.]
+
+Maxmind makes their http://www.maxmind.com/en/geolite[GeoLite IP-to-geo database] available under an open license (CC-BY-SA)footnote:[For serious use, there are professional-grade datasets from Maxmind, Quova, Digital Element among others.]. Out of the box, its columns are `beg_ip`, `end_ip`, `location_id`, where the first two columns show the low and high ends (inclusive) of a range that maps to that location. Every address lies in at most one range; locations may have multiple ranges.
+
+This arrangement caters to range queries in a relational database, but isn't suitable for our needs. A single IP-geo block can span thousands of addresses.
+
+To get the right locality, take each range and break it at some block level. Instead of having `1.2.3.4` to `1.2.5.6` on one line, let's use the first three quads (first 24 bits) and emit rows for `1.2.3.4` to `1.2.3.255`, `1.2.4.0` to `1.2.4.255`, and `1.2.5.0` to `1.2.5.6`. This lets us use the 24-bit prefix as the partition key and the full IP address as the sort key; a sketch of the split follows the table below.
+
+ lines bytes description file
+ 15_288_766 1_094_541_688 24-bit partition key maxmind-geolite_city-20121002.tsv
+ 2_288_690 183_223_435 16-bit partition key maxmind-geolite_city-20121002-16.tsv
+ 2_256_627 75_729_432 original (not denormalized) GeoLiteCity-Blocks.csv
+
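+Here's a minimal sketch of that split in plain Ruby (standalone, not the book's Wukong code; the method name and output layout are ours for illustration -- the real rows would also carry `location_id`):
+
+----
+require 'ipaddr'
+require 'socket'
+
+# Split an inclusive [beg_ip, end_ip] range at /24 boundaries, returning
+# [partition_key, range_beg, range_end] rows -- one per 24-bit block touched.
+def split_range_on_24_bits(beg_ip, end_ip)
+  beg_num = IPAddr.new(beg_ip).to_i
+  end_num = IPAddr.new(end_ip).to_i
+  rows = []
+  while beg_num <= end_num
+    block_end = beg_num | 0xFF                # last address in this /24
+    seg_end   = [block_end, end_num].min
+    rows << [IPAddr.new(beg_num & ~0xFF, Socket::AF_INET).to_s,   # partition key: x.y.z.0
+             IPAddr.new(beg_num, Socket::AF_INET).to_s,
+             IPAddr.new(seg_end, Socket::AF_INET).to_s]
+    beg_num = block_end + 1
+  end
+  rows
+end
+
+split_range_on_24_bits('1.2.3.4', '1.2.5.6')
+# => [["1.2.3.0", "1.2.3.4", "1.2.3.255"],
+#     ["1.2.4.0", "1.2.4.0", "1.2.4.255"],
+#     ["1.2.5.0", "1.2.5.0", "1.2.5.6"]]
+----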
+
+=== Range Queries ===
+////Gently introduce the concept. "So, here's what range queries are all about, in a nutshell..." Amy////
+
+This is a generally applicable approach for doing range queries (see the sketch after this list).
+
+* Choose a regular interval, fine enough to avoid skew but coarse enough to avoid ballooning the dataset size.
+* Wherever a range crosses an interval boundary, split it into multiple records, each filling or lying within a single interval.
+* Emit a compound key of `[interval, join_handle, beg, end]`, where
+ - `interval` is the regular interval containing `beg`,
+ - `join_handle` identifies the originating table, so that records are grouped for a join (this is what ensures both sides of a match land in the same reducer group).
+ If the interval is transparently a prefix of the index (as it is here), you can instead just ship the remainder: `[interval, join_handle, beg_suffix, end_suffix]`.
+* Use the interval as the partition key and the remaining fields as the sort key, just as we did with the IP blocks above.
+
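+Here is a minimal sketch of that recipe in plain Ruby (the method and handle names are ours, and it works on plain integers rather than IP addresses):
+
+----
+# Split an inclusive numeric [beg, fin] range at regular interval boundaries,
+# emitting one compound key per piece: [interval, join_handle, piece_beg, piece_end].
+def split_on_intervals(beg, fin, interval_size, join_handle)
+  keys = []
+  cur = beg
+  while cur <= fin
+    interval  = cur - (cur % interval_size)          # start of the enclosing interval
+    piece_end = [interval + interval_size - 1, fin].min
+    keys << [interval, join_handle, cur, piece_end]
+    cur = piece_end + 1
+  end
+  keys
+end
+
+split_on_intervals(140, 310, 100, 'geo')
+# => [[100, "geo", 140, 199], [200, "geo", 200, 299], [300, "geo", 300, 310]]
+----
+
+A point lookup from the other table emits a key of the same shape, `[interval, join_handle, value, value]`, so both sides of the join land in the same group.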
+
+In the geodata section, the "quadtile" scheme is (if you bend your brain right) something of an extension on this idea -- instead of splitting ranges on regular intervals, we'll split regions on a regular grid scheme.
+
+[[server_logs_ddos]]
+=== Using Hadoop for website stress testing ("Benign DDos")
+
+Hadoop is engineered to consume the full capacity of every available resource up to the currently-limiting one. So in general, you should never issue requests against external services from a Hadoop job -- one-by-one queries against a database; crawling web pages; requests to an external API. The resulting load spike will effectively be attempting what web security folks call a "DDoS", or distributed denial of service attack.
+
+Unless of course you are trying to test a service for resilience against an adversarial DDoS -- in which case that assault is a feature, not a bug!
+
+.elephant_stampede
+----
+ require 'faraday'
+
+ processor :elephant_stampede do
+
+ def process(logline)
+ beg_at = Time.now.to_f
+ resp = Faraday.get url_to_fetch(logline)
+ yield summarize(resp, beg_at)
+ end
+
+ def summarize(resp, beg_at)
+ duration = Time.now.to_f - beg_at
+ bytesize = resp.body.bytesize
+ { duration: duration, bytesize: bytesize }
+ end
+
+ def url_to_fetch(logline)
+ logline.url
+ end
+ end
+
+ flow(:mapper){ input > parse_loglines > elephant_stampede }
+----
+
+
+You must use Wukong's eventmachine bindings to make more than one simultaneous request per mapper.
+
+=== Refs ===
+
+* http://www.botsvsbrowsers.com/[Database of Robot User Agent strings]
+
+* http://research.microsoft.com/apps/pubs/default.aspx?id=67818[Improving Web Search Results Using Affinity Graph]
140 10a-event_streams-more.asciidoc
@@ -0,0 +1,140 @@
+////This intro here is a good model for other chapters - this one's rough, but the bones are here. Amy////
+
+Much of Hadoop's adoption is driven by organizations realizing they have the opportunity to measure every aspect of their operation, unify those data sources, and act on the patterns that Hadoop and other Big Data tools uncover.////Share a few examples of things that can be measured (website hits, etc.) Amy////
+
+For an e-commerce site, an advertising broker, or a hosting provider, there's no wonder inherent in being able to measure every customer interaction, no controversy that it's enormously valuable to uncover patterns in those interactions, and no lack of tools to act on those patterns in real time.
+
+A political campaign, for example, can use the clickstream of interactions with each email it sends; this was one of the cardinal advantages cited in the success of Barack Obama's 2012 campaign.
+
+
+This chapter's techniques will help, say, a hospital process the stream of data from every ICU patient; a retailer process the entire purchase-decision process; or a political campaign understand and tune the response to each email batch and advertising placement.
+
+
+Hadoop cut its teeth at Yahoo, where it was primarily used for processing internet-sized web crawls (see the next chapter on text processing) and high-volume server logs -- the kind of event streams this chapter takes on.
+
+// ?? maybe this should just be 'data streams' or something
+
+
+Quite likely, server log processing either a) is the reason you got this book or b) seems utterly boring.////I think you should explain this for readers. Amy//// For the latter folks, stick with it; hidden in this chapter are basic problems of statistics (finding the histogram of pageviews), text processing (regular expressions for parsing), and graphs (constructing the tree of paths that users take through a website).
+
+=== Pageview Histograms ===
+////Ease the reader in with something like, "Our goal here will be..." Amy////
+
+Let's start exploring the dataset: the webserver logs Andy Baio published from hosting the 'Star Wars Kid' viral video.
+
+----
+include::code/serverlogs/old/logline-02-histograms-mapper.rb[]
+----
+
+We want to group on `date_hr`, so just add a 'virtual accessor' -- a method that behaves like an attribute but derives its value from another field:
+
+----
+include::code/serverlogs/old/logline-00-model-date_hr.rb[]
+----
+
+This is the advantage of having a model and not just a passive sack of data.
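+
+The included file isn't reproduced here, but a virtual accessor is nothing more than an ordinary method on the model. As a minimal sketch (the class and field names here are guesses based on the include file names -- adjust to the real ones):
+
+----
+class Logline
+  # ... fields parsed from the log line, including a visited_at Time field ...
+
+  # Virtual accessor: behaves like a field, but derives its value from visited_at.
+  # "2003-04-30 17:04:09" => "2003043017", a bucket for grouping by date and hour.
+  def date_hr
+    visited_at.strftime('%Y%m%d%H')
+  end
+end
+----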
+
+Run it in map mode:
+
+----
+include::code/serverlogs/old/logline-02-histograms-02-mapper-wu-lign-sort.log[]
+----
+
+TODO: digression about `wu-lign`.
+
+Sort and save the map output; then write and debug your reducer.
+
+----
+include::code/serverlogs/old/logline-02-histograms-full.rb[]
+----
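+
+The included script isn't shown here, but the core of such a reducer is small. A stripped-down sketch in plain Ruby, reading already-sorted, tab-separated `date_hr, path` lines from standard input (the real script uses the Wukong model above):
+
+----
+# Count pageviews per (date_hr, path) from sorted mapper output on STDIN.
+# Because the input arrives sorted, all lines for a key are adjacent, so a
+# single running counter suffices -- no giant in-memory hash required.
+current_key = nil
+count       = 0
+
+$stdin.each_line do |line|
+  date_hr, path = line.chomp.split("\t")
+  key = [date_hr, path]
+  if key != current_key
+    puts [*current_key, count].join("\t") unless current_key.nil?
+    current_key = key
+    count = 0
+  end
+  count += 1
+end
+puts [*current_key, count].join("\t") unless current_key.nil?
+----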
+
+When things are working, this is what you'll see. Notice that the `.../Star_Wars_Kid.wmv` file already has five times the pageviews of the site root (`/`).
+
+----
+include::code/serverlogs/old/logline-02-histograms-03-reduce.log[]
+----
+
+You're ready to run the script in the cloud! Fire it off and you'll see dozens of workers start processing the data.
+
+----
+include::code/serverlogs/old/logline-02-histograms-04-freals.log[]
+----
+
+
+=== User Paths through the site ("Sessionizing")
+
+We can use the user logs to assemble a picture of how users navigate the site -- 'sessionizing' their pageviews. Marketing and e-commerce sites have a great deal of interest in optimizing their "conversion funnel", the sequence of pages that visitors follow before filling out a contact form, or buying those shoes, or whatever it is the site exists to serve. Visitor sessions are also useful for defining groups of related pages, in a manner far more robust than what simple page-to-page links would define. A recommendation algorithm using those relations would for example help an e-commerce site recommend teflon paste to a visitor buying plumbing fittings, or help a news site recommend an article about Marilyn Monroe to a visitor who has just finished reading an article about John F Kennedy. Many commercial web analytics tools don't offer a view into user sessions -- assembling them is extremely challenging for a traditional datastore. It's a piece of cake for Hadoop, as you're about to see.
+
+////This spot could be an effective place to say more about "Locality" and taking the reader deeper into thinking about that concept in context. Amy////
+
+NOTE: Take a moment and think about the locality: what feature(s) do we need to group on? What additional feature(s) should we sort with?
+
+
+The mapper should spit out `[ip, date_hr, visit_time, path]`.
+
+----
+include::code/serverlogs/old/logline-03-breadcrumbs-full.rb[]
+----
+
+You might ask why we don't partition directly on, say, both `visitor_id` and date (or another time bucket). Partitioning by date would break the locality of any visitor session that crossed midnight: some of the requests would be in one day, the rest would be in the next day.
+
+Run it in map mode:
+
+----
+include::code/serverlogs/old/logline-02-histograms-01-mapper.log[]
+----
+
+----
+include::code/serverlogs/old/logline-03-breadcrumbs-02-mapper.log[]
+----
+
+Group on the user:
+
+----
+include::code/serverlogs/old/logline-03-breadcrumbs-03-reducer.log[]
+----
+
+We use the secondary sort so that each visit is in strict order of time within a session.
+
+You might ask why that is necessary -- surely each mapper reads the lines in order? Yes, but you have no control over what order the mappers run, or where their input begins and ends.
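+
+To make that concrete, here is a rough sketch of the reducer's inner loop in plain Ruby, over tab-separated `ip, visit_time, path` lines that have been partitioned on `ip` and secondary-sorted on `visit_time` (simplified from what the included script does):
+
+----
+# Each visitor's requests arrive together and already in time order, so the
+# reducer only has to notice when the ip changes and emit the finished session.
+current_ip = nil
+paths      = []
+
+emit_session = lambda do
+  puts [current_ip, paths.join(' > ')].join("\t") unless current_ip.nil?
+end
+
+$stdin.each_line do |line|
+  ip, _visit_time, path = line.chomp.split("\t")
+  if ip != current_ip
+    emit_session.call
+    current_ip = ip
+    paths = []
+  end
+  paths << path   # a fuller version would also start a new session after a long time gap
+end
+emit_session.call
+----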
+
+This script will accumulate multiple visits of a page.
+
+TODO: say more about the secondary sort.
+////This may sound wacky, but please try it out: use the JFK/MM exmaple again, here. Tie this all together more, the concepts, using those memorable people. I can explain this live, too. Amy////
+
+==== Web-crawlers and the Skew Problem ====
+
+In a real logfile, web crawlers and other robots produce 'sessions' with vastly more pageviews than any human visitor, all apparently from a single user.
+
+It's important to use real data when you're testing algorithms: a skew problem like this only shows up on real traffic.
+
+=== Page-Page similarity
+
+What can you do with the sessionized logs? Well, each row lists a visitor-session on the left and a bunch of pages on the right. We've been thinking about that as a table, but it's also a graph -- actually, a bunch of graphs! The <<serverlogs_affinity_graph,sidebar>> describes an _affinity graph_, but we can build a simpler graph that just connects pages to pages by counting the number of times a pair of pages were visited by the same session. Every time a person requests the `/archive/2003/04/03/typo_pop.shtml` page _and_ the `/archive/2003/04/29/star_war.shtml` page in the same visit, that's one point towards their similarity. The chapter on <<graph_processing>> has lots of fun things to do with a graph like this, so for now we'll just lay the groundwork by computing the page-page similarity graph defined by visitor sessions.
+
+----
+include::code/serverlogs/old/logline-04-page_page_edges-full.rb[]
+----
+
+----
+include::code/serverlogs/old/logline-04-page_page_edges-03-reducer.log[]
+----
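+
+The pair-counting at the heart of that script fits in a few lines; as a sketch (the method name is ours):
+
+----
+# From the pages of one visitor session, emit each unordered pair once; summing
+# the 1s downstream gives the co-visitation count that defines the page-page graph.
+def page_pairs(session_pages)
+  session_pages.uniq.sort.combination(2).map { |page_a, page_b| [page_a, page_b, 1] }
+end
+
+page_pairs(%w[/archive/2003/04/29/star_war.shtml /archive/2003/04/03/typo_pop.shtml])
+# => [["/archive/2003/04/03/typo_pop.shtml", "/archive/2003/04/29/star_war.shtml", 1]]
+----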
+
+[[serverlogs_affinity_graph]]
+.Affinity Graph
+****
+First, you can think of it as an _affinity graph_ pairing visitor sessions with pages. The Netflix prize motivated a lot of work to help us understand affinity graphs -- in that case, a pairing of Netflix users with movies. Affinity graph-based recommendation engines simultaneously group similar users with similar movies (or similar sessions with similar pages). Imagine the following device. Set up two long straight lengths of wire, with beads that can slide along them. Beads on the left represent visitor sessions; ones on the right represent pages. These are magic beads, in that they can slide through each other, and they can clamp themselves to the wire. They are also slightly magnetic, so with no other tension they would not clump together but instead arrange themselves at some separated interval along the wire. Clamp all the beads in place for a moment and tie a small elastic string between each session bead and each page in that session. (These elastic bands also magically don't interfere with each other.) To combat the crawler-robot effect, choose tighter strings when there are few pages in the session, and weaker strings when there are lots of pages in the session. Once you've finished stringing this up, unclamp one of the session beads. It will snap to a position opposite the middle of all the pages it is tied to. If you now unclamp each of those page beads, they'll move to sit opposite that first session bead. As you continue to unclamp all the beads, you'll find that they organize into clumps along the wire: when a bunch of sessions link to a common set of pages, their mutual forces combine to drag them opposite each other. That's the intuitive view; there are proper mathematical treatments, of course, for this kind of co-clustering.
+
+----
+TODO: figure showing bipartite session-page graph
+----
+****
287 21-hadoop_internals.asciidoc
@@ -1,290 +1,51 @@
[[hadoop_internals]]
== Hadoop Internals
-One of the wonderful and terrible things about Hadoop (or anything else at Big Data scale) is that there are very few boundary cases for performance optimization. If your dataflow has the wrong shape, it is typically so catastrophically inefficient as to be unworkable. Otherwise, Hadoop’s scalability makes the price of simply throwing more hardware at the problem competitive with investing to optimize it, especially for exploratory analytics. That’s why the repeated recommendation of this book is: Get the algorithm right, get the contextability right and size your cluster to your job.
+For 16 chapters now, we’ve been using Hadoop and Storm+Trident from the outside. The biggest key to writing efficient dataflows is to understand the interface and fundamental patterns of use, not the particulars of how these frameworks execute the dataflow. However, even the strongest abstractions pushed far enough can leak, and so at some point it’s important to understand these internal details. The next few chapters concentrate on equipping you to understand your jobs’ performance, with practical tips for improving it; if you’re looking for more, by far the best coverage of this material is found in Tom White’s Hadoop: The Definitive Guide and Eric Sammer’s Hadoop Operations (TODO: add links).
-When you begin to productionize the results of all your clever work, it becomes valuable to look for these 30-percent, 10-percent improvements. Debug loop time is also important, though, so it is useful to get a feel for when and why early optimization is justified.
+Let’s first focus on the internals of Hadoop.
-The first part of this chapter discusses Tuning for the Wise and Lazy -- showing you how to answer the question, “Is my job slow and should I do anything about it?” Next, we will discuss Tuning for the Brave and Foolish, diving into the details of Hadoop’s maddeningly numerous, often twitchy, configuration knobs. There are low-level setting changes that can dramatically improve runtime; we will show you how to recognize them and how to determine what per-job overrides to apply. Lastly, we will discuss a formal approach for diagnosing the health and performance of your Hadoop cluster and your Hadoop jobs to help you confidently and efficiently identify the source of deeper flaws or bottlenecks.
+=== HDFS (NameNode and DataNode)
-=== Chimpanzee and Elephant: A Day at Work ===
+It’s time to learn how the HDFS stores your data; how the Map/Reduce framework launches and coordinates job attempts; and what happens within the framework as your Map/Reduce process executes.
-Each day, the chimpanzee's foreman, a gruff silverback named J.T., hands each chimp the day's translation manual and a passage to translate as they clock in. Throughout the day, he also coordinates assigning each block of pages to chimps as they signal the need for a fresh assignment.
+The HDFS provides three remarkable guarantees: durability (your data is never lost or corrupted), availability (you can always access it) and efficiency (you can consume the data at a high rate, especially from Map/Reduce jobs and other clients). The center of action, as you might guess, is the NameNode. The NameNode is a permanently running daemon process that tracks the location of every block on the network of DataNodes, along with its name and other essential metadata. (If you’re familiar with the File Allocation Table (FAT) of a traditional file system, it’s like a more active version of that. footnote:[If you’re not familiar with what a FAT is, then think of it as the system you’re reading about right now, but for a traditional file system.])
-Some passages are harder than others, so it's important that any elephant can deliver page blocks to any chimpanzee -- otherwise you'd have some chimps goofing off while others are stuck translating _King Lear_ into Kinyarwanda. On the other hand, sending page blocks around arbitrarily will clog the hallways and exhaust the elephants.
+(NOTE: check something … appending)
-The elephants' chief librarian, Nanette, employs several tricks to avoid this congestion.
+When a client wishes to create a file, it contacts the NameNode with the desired path and high-level metadata. The NameNode records that information in an internal table and identifies the DataNodes that will hold the data. The NameNode then replies with that set of DataNodes, identifying one of them as the initial point of contact. (When we say "client", that’s anything accessing the NameNode, whether it’s a Map/Reduce job, one of the Hadoop filesystem commands or any other program.) The file is now exclusively available to the client for writing but will remain invisible to anybody else until the write has concluded (TODO: Is it when the first block completes or when the initial write completes?).
-Since each chimpanzee typically shares a cubicle with an elephant, it's most convenient to hand a new page block across the desk rather then carry it down the hall. J.T. assigns tasks accordingly, using a manifest of page blocks he requests from Nanette. Together, they're able to make most tasks be "local".
+Within its request, the client may independently prescribe a replication factor, file permissions, and block size _____________ (TODO: fill in)
-Second, the page blocks of each play are distributed all around the office, not stored in one book together. One elephant might have pages from Act I of _Hamlet_, Act II of _The Tempest_, and the first four scenes of _Troilus and Cressida_ footnote:[Does that sound complicated? It is -- Nanette is able to keep track of all those blocks, but if she calls in sick, nobody can get anything done. You do NOT want Nanette to call in sick.]. Also, there are multiple 'replicas' (typically three) of each book collectively on hand. So even if a chimp falls behind, JT can depend that some other colleague will have a cubicle-local replica. (There's another benefit to having multiple copies: it ensures there's always a copy available. If one elephant is absent for the day, leaving her desk locked, Nanette will direct someone to make a xerox copy from either of the two other replicas.)
+The client now connects to the indicated DataNode and begins sending data. Once a full block’s worth of data has been written, the DataNode transparently finalizes that block and begins another (TODO: check that it’s the DataNode that does this). As it finishes each block, or at the end of the file, it independently prepares a checksum of that block, radios it back to the NameNode, and begins replicating its contents to the other DataNodes. (TODO: Is there an essential end-of-file ritual?) This is all transparent to the client, who is able to send data as fast as it can cram it through the network pipe.
-Nanette and J.T. exercise a bunch more savvy optimizations (like handing out the longest passages first, or having folks who finish early pitch in so everyone can go home at the same time, and more). There's no better demonstration of power through simplicity.
+Once you’ve created a file, its blocks are immutable -- as opposed to a traditional file system, there is no mechanism for modifying its internal contents. This is not a limitation; it’s a feature. Making the file system immutable not only radically simplifies its implementation, it makes the system more predictable operationally and simplifies client access. For example, you can have multiple jobs and clients access the same file knowing that a client in California hasn’t modified a block being read in Tokyo (or even worse, simultaneously modified by someone in Berlin). (TODO: When does append become a thing?)
-=== Brief Anatomy of a Hadoop Job ===
+The end of the file means the end of its data but not the end of the story. At all times, each DataNode periodically reads a subset of its blocks to find their checksums and sends a "heartbeat" back to the NameNode with the (hopefully) happy news. ____________ (TODO: fill in). There are several reasons a NameNode will begin replicating a block. If a DataNode’s heartbeat reports an incorrect block checksum, the NameNode will remove that DataNode from the list of replica holders for that block, triggering replication from one of the remaining DataNodes holding that block. If the NameNode has not received a heartbeat from a given DataNode within the configured timeout, it will begin replicating all of that DataNode’s blocks; if that DataNode comes back online, the NameNode calmly welcomes it back into the cluster, cancelling replication of the valid blocks that DataNode holds. Furthermore, if the difference in the amount of data held by the most populated and least populated DataNodes becomes larger than a certain threshold, or the replication factor for a file is increased, the cluster will rebalance; you can optionally trigger a rebalance earlier using the `hadoop balancer` command.
-We'll go into much more detail in (TODO: ref), but here are the essentials of what you just performed.
+However it’s triggered, there is no real magic; one of the valid replica-holder DataNodes sends the block contents to the new replica holder, which heartbeats back the block once received. (TODO: Check details)
-==== Copying files to the HDFS ====
+As you can see, the NameNode and its metadata are at the heart of every HDFS story. This is why the High-Availability (HA) NameNode feature in recent versions is so important and should be used in any production installation. It’s even more important, as well, to protect and back up the NameNode’s metadata, which, unfortunately, many people don’t know to do. (TODO: Insert notes on NameNode metadata hygiene previously written).
-When you ran the `hadoop fs -mkdir` command, the Namenode (Nanette's Hadoop counterpart) simply made a notation in its directory: no data was stored. If you're familiar with the term, think of the namenode as a 'File Allocation Table (FAT)' for the HDFS.
+The NameNode selects the recipient DataNodes with some intelligence. Information travels faster among machines on the same switch, switches within the same data center, and so forth. However, if a switch fails, all its machines are unavailable, and if a data center fails, all its switches are unavailable, and so forth. When you configure Hadoop, there is a setting that allows you to tell it about your network hierarchy. If you do so, the NameNode will ensure that at least one replica lies in a distant part of the hierarchy -- durability is important above all else -- while the remaining replicas are stored within the same rack, providing the most efficient replication. (TODO: Mention the phrase "rack aware.") As permitted within that scheme, it also tries to ensure that the cluster is balanced, preferring DataNodes with less data under management. (TODO: Check that it’s amount of data not percent free)
-When you run `hadoop fs -put ...`, the putter process does the following for each file:
+(TODO: Does it make contact on every block or at the start of a file?)
-1. Contacts the namenode to create the file. This also just makes a note of the file; the namenode doesn't ever have actual data pass through it.
-2. Instead, the putter process asks the namenode to allocate a new data block. The namenode designates a set of datanodes (typically three), along with a permanently-unique block ID.
-3. The putter process transfers the file over the network to the first data node in the set; that datanode transfers its contents to the next one, and so forth. The putter doesn't consider its job done until a full set of replicas have acknowledged successful receipt.
-4. As soon as each HDFS block fills, even if it is mid-record, it is closed; steps 2 and 3 are repeated for the next block.
+The most important feature of the HDFS is that it is highly resilient against failure of its underlying components. Any system achieves resiliency through four mechanisms: act to prevent failures; insulate against failure through redundancy; isolate failure within independent fault zones; and lastly, detect and remediate failures rapidly. footnote:[This list is due to James Hamilton (TODO: link), whose blogs and papers are essential reading.] The HDFS is largely insulated from failure by using file system-based access (it does not go behind the back of the operating system), by being open source (ensuring code is reviewed by thousands of eyes and run at extremely high scale by thousands of users), and so forth. Failure above the hardware level is virtually unheard of. The redundancy is provided by replicating the data across multiple DataNodes and, in recent versions, by using the ZooKeeper-backed High-Availability NameNode implementation.
-==== Running on the cluster ====
+The rack awareness, described above, isolates failure using your defined network hierarchy, and at the semantic level, independently for each HDFS block. Lastly, the heartbeat and checksum mechanisms, along with its active replication and monitoring hooks, allow it and its operators to detect and remediate intermittent faults.
-Now let's look at what happens when you run your job.
+==== S3 File System
-(TODO: verify this is true in detail. @esammer?)
+The Amazon cloud provides an important alternative to the HDFS: its S3 object store. S3 transparently provides multi-region replication far exceeding even HDFS’s, at exceptionally low cost (at time of writing, about $80 per terabyte per month, and decreasing at petabyte and higher scale). What’s more, its archival datastore solution, Glacier, will hold rarely-accessed data at one-tenth that price and with even higher durability. footnote:[The quoted durability figure puts the engineering risk below, say, the risk of violent overthrow of our government.] For machines in the Amazon cloud with a provisioned connection, the throughput to and from S3 is quite acceptable for Map/Reduce use.
-* _Runner_: The program you launch sends the job and its assets (code files, etc) to the jobtracker. The jobtracker hands a `job_id` back (something like `job_201204010203_0002` -- the datetime the jobtracker started and the count of jobs launched so far); you'll use this to monitor and if necessary kill the job.
-* _Jobtracker_: As tasktrackers "heartbeat" in, the jobtracker hands them a set of 'task's -- the code to run and the data segment to process (the "split", typically an HDFS block).
-* _Tasktracker_: each tasktracker launches a set of 'mapper child processes', each one an 'attempt' of the tasks it received. (TODO verify:) It periodically reassures the jobtracker with progress and in-app metrics.
-* _Jobtracker_: the Jobtracker continually updates the job progress and app metrics. As each tasktracker reports a complete attempt, it receives a new one from the jobtracker.
-* _Tasktracker_: after some progress, the tasktrackers also fire off a set of reducer attempts, similar to the mapper step.
-* _Runner_: stays alive, reporting progress, for the full duration of the job. As soon as the job_id is delivered, though, the Hadoop job itself doesn't depend on the runner -- even if you stop the process or disconnect your terminal the job will continue to run.
+Hadoop has a built-in facade for the S3 file system, and you can do more or less all the things you do with an HDFS: list, put and get files; run Map/Reduce jobs to and from any combination of HDFS and S3; read and create files transparently using Hadoop’s standard file system API. There are actually two facades. The `s3hdfs` facade (confusingly labeled as plain `s3` by Hadoop, but we will refer to it here as `s3hdfs`) stores blocks in individual objects, using the same checksum format as on a DataNode, and stores the NameNode-like metadata separately in a reserved area. The `s3n` facade, instead, stores a file as it appears to the Hadoop client, entirely in an S3 object with a corresponding path. When you visit S3 using Amazon’s console or any other standard S3 client, a file written via `s3n` as `/my/data.txt` shows up as an object called `my/data.txt` in the corresponding bucket, and its contents are immediately available to any such client; the same file written via `s3hdfs` will appear as objects named with opaque identifiers like `0DA37f...`, with uninterpretable contents. However, `s3n` cannot store an individual file larger than 5 terabytes, while `s3hdfs` blocks minorly improve Map/Reduce efficiency and can store files of arbitrary size. All in all, we prefer the `s3n` facade; the efficiency improvement for the robots does not make up for the impact on convenience for the humans using the system, and it’s a best practice not to make individual files larger than 1 terabyte anyway.
-[WARNING]
-===============================
-Please keep in mind that the tasktracker does _not_ run your code directly -- it forks a separate process in a separate JVM with its own memory demands. The tasktracker rarely needs more than a few hundred megabytes of heap, and you should not see it consuming significant I/O or CPU.
-===============================
+The universal availability of client libraries makes S3 a great hand-off point to other systems or other people looking to use the data. We typically use a combination of S3, HDFS and Glacier in practice. Gold data -- anything one project produces that another might use -- is stored on S3. In production runs, jobs read their initial data from S3 and write their final data to S3 but use an HDFS local to all its compute nodes for any intermediate checkpoints.
-=== Chimpanzee and Elephant: Splits ===
+When developing a job, we run an initial `distcp` from S3 onto the HDFS and do all further development using the cluster-local HDFS. The cluster-local HDFS provides better (but not earth-shakingly better) Map/Reduce performance. It is, however, noticeably faster in interactive use (file system commands, launching jobs, etc). Applying the "robots are cheap, humans are important" rule easily justifies the maintenance of the cluster-local HDFS.
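+
+For instance, a development round trip might look like the following (the bucket and paths here are made up for illustration):
+
+----
+# pull the gold data down to the cluster-local HDFS
+hadoop distcp s3n://example-bucket/gold/server_logs hdfs:///data/server_logs
+
+# ...develop and run jobs against the cluster-local copy...
+
+# push the finished results back up to S3
+hadoop distcp hdfs:///data/out/page_page_graph s3n://example-bucket/gold/page_page_graph
+----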
-I've danced around a minor but important detail that the workers take care of. For the Chimpanzees, books are chopped up into set numbers of pages -- but the chimps translate _sentences_, not pages, and a page block boundary might happen mid-sentence.
-//// Provide a real world analogous example here to help readers correlate this story to their world and data analysis needs, "...This example is similar to..." Amy////
-
-The Hadoop equivalent of course is that a data record may cross and HDFS block boundary. (In fact, you can force map-reduce splits to happen anywhere in the file, but the default and typically most-efficient choice is to split at HDFS blocks.)
-
-A mapper will skip the first record of a split if it's partial and carry on from there. Since there are many records in each split, that's no big deal. When it gets to the end of the split, the task doesn't stop processing until is completes the current record -- the framework makes the overhanging data seamlessley appear.
-//// Again, here, correlate this example to a real world scenario; "...so if you were translating x, this means that..." Amy////
-
-In practice, Hadoop users only need to worry about record splitting when writing a custom `InputFormat` or when practicing advanced magick. You'll see lots of reference to it though -- it's a crucial subject for those inside the framework, but for regular users the story I just told is more than enough detail.
-
-
-=== Tuning For The Wise and Lazy
-
-Before tuning your job, it is important to first understand whether it is slow and if so, where the bottleneck arises. Generally speaking, your Hadoop job will either be CPU-bound within your code, memory-bound within your code, bound by excessive Reducer data (causing too many merge passes) or throughput-bound within the framework or underlying system. (In the rare cases it is not one of those, you would next apply the more comprehensive “USE method” (TODO: REF) described later in this chapter.) But for data-intensive jobs, the fundamental upper bound on Hadoop’s performance is its effective throughput from disk. You cannot make your job process data faster than it can load it.
-
-Here is the conceptual model we will use: Apart from necessary overhead, job coordination and so forth, a Hadoop job must:
-
-* Streams data to the Mapper, either locally from disk or remotely over the network;
-* Runs that data through your code;
-* Spills the midstream data to disk one or more times;
-* Applies combiners, if any;
-* Merge/Sorts the spills and sends the over the network to the Reducer.
-
-The Reducer:
-
-* Writes each Mapper’s output to disk;
-* Performs some number of Merge/Sort passes;
-* Reads the data from disk;
-* Runs that data through your code;
-* Writes its output to the DataNode, which writes that data once to disk and twice through the network;
-* Receives two other Reducers’ output from the network and writes those to disk.
-
-Hadoop is, of course, pipelined; to every extent possible, those steps are happening at the same time. What we will do, then, is layer in these stages one by one, at each point validating that your job is as fast as your disk until we hit the stage where it is not.
-
-==== Fixed Overhead
-
-The first thing we want is a job that does nothing; this will help us understand the fixed overhead costs. Actually, what we will run is a job that does almost nothing; it is useful to know that your test really did run.
-
-----
-(TODO: Disable combining splits)
-Load 10_tiny_files
-Filter most of it out
-Store to disk
-
-(TODO: Restrict to 50 Mapper slots or rework)
-Load 10000_tiny_files
-Filter most of it out
-Store to disk
-----
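-
-Here is one way the first of those jobs might look in concrete Pig -- a minimal sketch, with hypothetical paths and an arbitrary filter chosen only so that the job writes a little something:
-
-----
--- hypothetical input: any directory of small files will do
-tiny = LOAD '/data/scratch/tiny_files' AS (line:chararray);
--- keep almost nothing, so we know the job really ran but wrote next to no output
-kept = FILTER tiny BY line MATCHES 'z.*';
-STORE kept INTO '/data/scratch/fixed_overhead_out';
-----
-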
-(TODO: is there a way to limit the number of Reduce slots in Pig? Otherwise, revisit the below.)
-
-In (TODO: REF), there is a performance comparison worksheet that you should copy and fill in as we go along. It lists the performance figures for several reference clusters on both cloud and dedicated environments for each of the tests we will perform. If your figures do not compare well with the appropriate reference cluster, it is probably worthwhile adjusting the overall configuration. Assuming your results are acceptable, you can tape the worksheet to your wall and use it to baseline all the jobs you will write in the future. The rest of the chapter will assume that your cluster is large enough to warrant tuning but not grossly larger than the largest reference cluster.
-
-If you run the Pig script above (TODO: REF), Hadoop will execute two jobs: one with 10 Mappers and no Reducers and another with 10,000 Mappers and no Reducers. From the Hadoop Job Tracker page for your job, click on the link showing the number of Map tasks to see the full task listing. All 10 tasks for the first job should have started at the same time and uniformly finished a few seconds after that. Back on the main screen, you should see that the total job completion time was more or less identical to that of the slowest Map task.
-
-The second job ran its 10,000 Map tasks through a purposefully restricted 50 Mapper slots -- so each Mapper slot will have processed around 200 files. If you click through to the Task listing, the first wave of tasks should start simultaneously and all of them should run in the same amount of time that the earlier test did.
-
-(TODO: show how to find out if one node is way slower)
-
-Even in this trivial case, there is more variance in launch and runtimes than you might first suspect (if you don't see it here, you definitely will in the next test -- but for continuity, we will discuss it now). If that splay -- the delay between the bulk of tasks finishing and the final task finishing -- is larger than the runtime of a typical task, it may indicate a problem; as long as it is only a few seconds, don’t sweat it. If you are interested in a minor but worth-it tweak, adjust the `mapred.job.reuse.jvm.num.tasks` setting to ‘-1’, which causes each Mapper slot to reuse the same child-process JVM across its attempts, eliminating the brief but noticeable JVM startup time. If you are writing your own native Java code, you might know a reason to go with the default (disabled) setting, but it is harmless for any well-behaved program.
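-
-In Pig, that per-job override is one line; here is a sketch (the value ‘-1’ means "reuse one child JVM for an unlimited number of task attempts"):
-
-----
--- reuse the child-process JVM across all of a slot's task attempts, for this job only
-SET mapred.job.reuse.jvm.num.tasks '-1';
-----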
-
-On the Job screen, you should see that the total runtime for the job was about 200 times slower for the second job than the first and not much more than 200 times the typical task’s runtime; if not, you may be putting pressure on the Job Tracker. Rerun your job and watch the Job Tracker’s heap size; you would like the Job Tracker heap to spend most of its life below, say 50-percent, so if you see it making any significant excursions toward 100-percent, that would unnecessarily impede cluster performance. The 1 GB out-of-the-box setting is fairly small; for production use we recommend at least 3 GB of heap on a dedicated machine with at least 7 GB total RAM.
-
-If the Job coordination overhead is unacceptable but the Job Tracker heap is not to blame, a whole host of other factors might be involved; apply the USE method, described (TODO: REF).
-
-=== Mapper Input
-
-Now that we’ve done almost nothing, let’s do almost something -- read in a large amount of data, writing just enough to disk to know that we really were there.
-
-----
-Load 100 GB from disk
-Filter all but 100 MB
-Store it to disk
-----
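-
-Rendered as a concrete (and hypothetical) Pig sketch -- the path and the filter pattern are invented, so point it at wherever your copy of the GitHub archive lives and keep only a sliver of it:
-
-----
-events = LOAD '/data/gold/github_archive' AS (json:chararray);
--- a deliberately restrictive filter; any pattern that passes only a tiny fraction will do
-rare   = FILTER events BY json MATCHES '.*CommitCommentEvent.*';
-STORE rare INTO '/data/scratch/mapper_input_test';
-----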
-
-Run that job on the 100-GB GitHub archive dataset. (TODO: Check that it will do speculative execution.) Once the job completes, you will see as many successful Map tasks as there were HDFS blocks in the input; if you are running a 128-MB block size, this will be about (TODO: How many blocks are there?).
-
-Again, each Map task should complete in a uniform amount of time and the job as a whole should take about ‘length_of_Map_task * number_of_Map_tasks / number_of_Mapper_slots’. The Map phase does not end until every Mapper task has completed and, as we saw in the previous example, even in typical cases, there is some amount of splay in runtimes.
-
-(TODO: Move some of JT and Nanette’s optimizations forward to this chapter). Like the chimpanzees at quitting time, the Map phase cannot finish until all Mapper tasks have completed.
-
-You will probably notice a half-dozen or so killed attempts as well. The ‘TODO: name of speculative execution setting’, which we recommend enabling, causes Hadoop to opportunistically launch a few duplicate attempts for the last few tasks in a job. The faster job cycle time justifies the small amount of duplicate work.
-
-Check that there are few non-local Map tasks -- Hadoop tries to assign Map attempts (TODO: check tasks versus attempts) to run on a machine whose DataNode holds that input block, thus avoiding a trip across the network (or in the chimpanzees’ case, down the hallway). It is not that costly, but if you are seeing a large number of non-local tasks on a lightly-loaded cluster, dig deeper.
-
-Dividing the size of an HDFS block by the average runtime of a full-block Map task gives you the Mapper’s data rate. In this case, since we did almost nothing and wrote almost nothing, that value is your cluster’s effective top speed. This has two implications: First, you cannot expect a data-intensive job to run faster than its top speed. Second, there should be apparent reasons for any job that runs much slower than its top speed. Tuning Hadoop is basically about making sure no other part of the system is slower than the fundamental limit at which it can stream from disk.
-
-While setting up your cluster, it might be worth baselining Hadoop’s top speed against the effective speed of your disk and your network. Follow the instructions for the ‘scripts/baseline_performance’ script (TODO: write script) from the example code above. It uses a few dependable user-level processes to measure the effective data rate to disk (‘dd’ and ‘cp’) and the effective network rate (‘nc’ and ‘scp’). (We have purposely used user-level processes to account for system overhead; if you want to validate that as well, use a benchmark like Bonnie++ (TODO: link)). If you are on dedicated hardware, the network throughput should be comfortably larger than the disk throughput. If you are on cloud machines, this, unfortunately, might not hold but it should not be atrociously lower.
-
-If Hadoop’s effective top speed is not within (TODO: figure out healthy percent) percent of the disk baseline you just measured, dig deeper; otherwise, record each of these numbers on your performance comparison chart.
-
-If you're setting up your cluster, take the time to generate enough additional data to keep your cluster fully saturated for 20 or more minutes and then ensure that each machine processed about the same amount of data. There is a lot more variance in effective performance among machines than you might expect, especially in a public cloud environment; this check can also catch a machine with faulty hardware or setup. It is a crude but effective benchmark; if you're investing heavily in a cluster, consider running a comprehensive benchmarking suite on all the nodes -- the chapter on Stupid Hadoop Tricks shows how (TODO: ref).
-
-=== The Many Small Files Problem
-
-One of the most pernicious ways to impair a Hadoop cluster’s performance is the “many-small-files” problem. With a 128-MB block size (which we will assume for the following discussion), a 128-MB file takes one block (obviously), a 1-byte file takes one block and a 128-MB+1 byte file takes two blocks, one of them full, the other with one solitary byte.
-
-Storing 10 GB of data in, say, 100 files is harmless -- the average block occupancy is a mostly-full 100 MB. Storing that same 10GB in say 10,000 files is, however, harmful in several ways. At the heart of the Namenode is a table that lists every file and block. As you would expect, the memory usage of that table roughly corresponds to the number of files plus the number of blocks, so the many-small-files example uses about 100 times as much memory as warranted. Engage in that bad habit often enough and you will start putting serious pressure on the Namenode heap and lose your job shortly thereafter. What is more, the many-small-files version will require 10,000 Map tasks, causing memory pressure on the Job Tracker and a job whose runtime is dominated by task overhead. Lastly, there is the simple fact that working with 10,000 things is more annoying than working with 100 -- it takes up space in datanode heartbeats, client requests, your terminal screen and your head.
-
-This situation is easier to arrive at than you might expect; in fact, you just did so. The 100-GB job you just ran most likely used 800 Mapper slots yet output only a few MB of data. Any time your mapper output is significantly smaller than its input -- for example, when you apply a highly-restrictive filter to a large input -- your output files will have poor occupancy.
-
-A sneakier version of this is a slightly “expansive” Mapper-Only job. A job whose Mappers turned a 128-MB block into, say, 150 MB of output data would reduce the block occupancy by nearly half and require nearly double the Mapper slots in the following jobs. Done once, that is merely annoying but in a workflow that iterates or has many stages, the cascading dilution could become dangerous.
-
-You can audit your HDFS to see if this is an issue using the ‘hadoop fsck [directory]’ command. Running that command against the directory holding the GitHub data should show 100 GB of data in about 800 blocks. Running it against your last job’s output should show only a few MB of data in an equivalent number of blocks.
-
-You can always distill a set of files by doing a ‘group_by’ with a small number of Reducers, using the record itself as the key. Pig and Hive both have settings to mitigate the many-small-files problem. In Pig, the (TODO: find name of option) setting will feed multiple small files to the same Mapper; in Hive (TODO: look up what to do in Hive). In both cases, we recommend modifying your configuration to make that the default and disabling it on a per-job basis when warranted.
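-
-If memory serves, the Pig properties in question are `pig.splitCombination` and `pig.maxCombinedSplitSize` -- verify them against your version's documentation -- and the distill-with-a-group trick looks something like the following sketch (paths and parallelism invented for illustration):
-
-----
-SET pig.splitCombination 'true';           -- assumed name: feed several small files to one Mapper
-SET pig.maxCombinedSplitSize '134217728';  -- assumed name: combine up to ~128 MB per split
-
-shrapnel = LOAD '/data/scratch/mapper_input_test' AS (json:chararray);
--- group on the record itself with a small PARALLEL, then flatten back out
-grpd     = GROUP shrapnel BY json PARALLEL 10;
-compact  = FOREACH grpd GENERATE FLATTEN(shrapnel);
-STORE compact INTO '/data/scratch/mapper_input_test_compacted';
-----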
-
-=== Midstream Data
-
-Now let’s start to understand the performance of a proper Map/Reduce job. Run the following script, again, against the 100 GB GitHub data.
-
-----
-Parallel 50
-Disable optimizations for pushing up filters and for Combiners
-Load 100 GB of data
-Group by record itself
-Filter out almost everything
-Store data
-----
-
-The purpose of that job is to send 100 GB of data at full speed through the Mappers and midstream processing stages but to do almost nothing in the Reducers and write almost nothing to disk. To keep Pig from “helpfully” economizing the amount of midstream data, you will notice in the script we disabled some of its optimizations. The number of Map tasks and their runtime should be effectively the same as in the previous example, and all the sanity checks we’ve given so far should continue to apply. The overall runtime of the Map phase should only be slightly longer (TODO: how much is slightly?) than in the previous Map-only example, depending on how well your network is able to outpace your disk.
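-
-As a hypothetical Pig sketch -- the property for suppressing the Combiner and the optimizer-rule name below are our best recollection, so treat them as assumptions to verify:
-
-----
--- launch with something like `pig -t PushUpFilter midstream.pig` so the filter
--- is not helpfully pushed ahead of the GROUP
-SET default_parallel 50;
-SET pig.exec.nocombiner 'true';            -- assumed name: keep Pig from applying a Combiner
-
-events = LOAD '/data/gold/github_archive' AS (json:chararray);
-grpd   = GROUP events BY json;             -- group on the record itself
-flat   = FOREACH grpd GENERATE FLATTEN(events);
-kept   = FILTER flat BY $0 MATCHES '.*CommitCommentEvent.*';
-STORE kept INTO '/data/scratch/midstream_test';
-----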
-
-It is an excellent idea to get into the habit of predicting the record counts and data sizes in and out of both Mapper and Reducer based on what you believe Hadoop will be doing to each record and then comparing to what you see on the Job Tracker screen. In this case, you will see identical record counts for Mapper input, Mapper output and Reducer input and nearly identical data sizes for HDFS bytes read, Mapper output, Mapper file bytes written and Reducer input. The reason for the small discrepancies is that, for the file system metrics, Hadoop is recording everything that is read or written, including logged files and so forth.
-
-Midway or so through the job -- well before the finish of the Map phase -- you should see the Reducer tasks start up; their eagerness can be adjusted using the (TODO: name of setting) setting. By starting them early, the Reducers are able to begin merge/sorting the various Map task outputs in parallel with the Map phase. If you err low on this setting, you will disappoint your coworkers by consuming Reducer slots with lots of idle time early, but that is better than starting them too late, which will sabotage parallelism.
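-
-If memory serves, that knob is `mapred.reduce.slowstart.completed.maps`, the fraction of Map tasks that must finish before Reducers launch; a per-job override in Pig would look something like:
-
-----
-SET mapred.reduce.slowstart.completed.maps '0.50';   -- start Reducers once half the Maps are done
-----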
-
-Visit the Reducer tasks listing. Each Reducer task should have taken a uniform amount of time, very much longer than the length of the Map tasks. Open a few of those tasks in separate browser tabs and look at their counters; each should have roughly the same input record count and data size. It is annoying that this information is buried as deeply as it is because it is probably the single most important indicator of a flawed job; we will discuss it in detail a bit later on.
-
-==== Spills
-
-First, though, let’s finish understanding the data’s detailed journey from Mapper to Reducer. As a Map task outputs records, Hadoop sorts them in the fixed-size io.sort buffer. Hadoop files records into the buffer in partitioned, sorted order as it goes. When that buffer fills up (or the attempt completes), Hadoop begins writing to a new empty io.sort buffer and, in parallel, “spills” that buffer to disk. As the Map task concludes, Hadoop merge/sorts these spills (if there were more than one) and sends the sorted chunks to each Reducer for further merge/sorting.
-
-The Job Tracker screen shows the number of Mapper spills. If the number of spills equals the number of Map tasks, all is good -- the Mapper output is checkpointed to disk before being dispatched to the Reducer. If the size of your Map output data is large, having multiple spills is the natural outcome of using memory efficiently; that data was going to be merge/sorted anyway, so it is a sound idea to do it on the Map side where you are confident it will have a uniform size distribution.
-
-(TODO: do combiners show as multiple spills?)
-
-What you hate to see, though, are Map tasks with two or three spills. As soon as you have more than one spill, the data has to be initially flushed to disk as output, then read back in full and written again in full for at least one merge/sort pass. Even the first extra spill can cause roughly a 30-percent increase in Map task runtime.
-
-There are two frequent causes of unnecessary spills. First is the obvious one: Mapper output size that slightly outgrows the io.sort buffer size. We recommend sizing the io.sort buffer to comfortably accommodate Map task output slightly larger than your typical HDFS block size -- the next section (TODO: REF) shows you how to calculate it. In the significant majority of jobs that involve a Reducer, the Mapper output is the same or nearly the same size as its input -- JOINs or GROUPs that are direct, that are preceded by a projection or filter, or that have a few additional derived fields. If you see many of your Map tasks tripping slightly over that limit, it is probably worth requesting a larger io.sort buffer specifically for your job.
-
-There is also a disappointingly sillier way to cause unnecessary spills: The io.sort buffer holds both the records it will later spill to disk and an index to maintain the sorted order. An unfortunate early design decision set a fixed size on both of those with fairly confusing control knobs. The ‘iosortrecordpercent’ (TODO: check name of setting) setting gives the size of that index as a fraction of the sort buffer. Hadoop spills to disk when either the fraction devoted to records or the fraction devoted to the index becomes full. If your output is long and skinny, cumulatively not much more than an HDFS block but with a typical record size smaller than, say, 100 bytes, you will end up spilling multiple small chunks to disk when you could have easily afforded to increase the size of the bookkeeping buffer.
-
-There are lots of ways to cause long, skinny output, but set a special trigger in your mind for cases where you have long, skinny input: turning an adjacency-listed graph into an edge-listed graph, say, or otherwise FLATTENing bags of records on the Mapper side. In each of these cases, the later section (TODO: REF) will show you how to calculate it.
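-
-As a sketch of the kind of per-job override we mean -- the names are the Hadoop 1.x ones, and the values are placeholders you should calculate rather than copy:
-
-----
-SET io.sort.mb '256';                 -- room for a block-plus of Map output, plus its index
-SET io.sort.record.percent '0.15';    -- more index space for long, skinny records
-SET io.sort.spill.percent '0.90';     -- how full the buffer gets before a background spill begins
-----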
-
-(TODO: either here or later, talk about the surprising cases where you fill up MapRed scratch space or FS.S3.buffer.dir and the rest of the considerations about where to put this).
-
-
-==== Combiners
-
-It is frequently the case that the Reducer output is smaller than its input (and kind of annoying that the word “Reducer” was chosen, since it also frequently is not smaller). “Algebraic” aggregations such as COUNT, AVG and many others can implement part of the Reducer operation on the Map side, greatly lessening the amount of data sent to the Reducer.
-
-Pig and Hive are written to use Combiners whenever generically appropriate. Applying a Combiner requires extra passes over your data on the Map side and so, in some cases, can cost much more time than it saves.
-
-If you ran a distinct operation over a data set with 50-percent duplicates, the Combiner is easily justified since many duplicate pairs will be eliminated early. If, however, only a tiny fraction of records are duplicated, only a vanishingly tiny fraction of those will co-occur on the same Mapper, so you will have spent disk and CPU without reducing the data size.
-
-Whenever your Job Tracker output shows that Combiners are being applied, check that the Reducer input data is, in fact, diminished. (TODO: check which numbers show this) If Pig or Hive have guessed badly, disable the (TODO: name of setting) setting in Pig or the (TODO: name of setting) setting in Hive.
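-
-To the best of our recollection, the switches are `pig.exec.nocombiner` in Pig and `hive.map.aggr` in Hive; verify both for your versions. A per-script override in Pig:
-
-----
-SET pig.exec.nocombiner 'true';   -- assumed name: skip the Combiner for this script
--- the rough Hive equivalent turns off map-side aggregation: SET hive.map.aggr=false;
-----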
-
-==== Reducer Merge (aka Shuffle and Sort)
-
-We are now ready to dig into the stage with the most significant impact on job performance, the merge/sort performed by the Reducer before processing. In almost all the rest of the cases discussed here, an inefficient choice causes only a marginal impact on runtime. Bring down too much data on your Reducers, however, and you will find that, two hours into the execution of what you thought was a one-hour job, a handful of Reducers indicate they have four hours left to run.
-
-First, let’s understand what is going on and describe healthy execution; then, we will discuss various ways it can go wrong and how to address them.
-
-As you just saw, data arrives from the Mappers pre-sorted. The Reducer pulls it over the network into its own sort buffers. Once a threshold (controlled by the (TODO: name of setting) setting) of data has been received, the Reducer commissions a new sort buffer and separately spills the data to disk, merge/sorting the Mapper chunks as it goes. (TODO: check that this first merge/sort happens on spill)
-
-Enough of these spills later (controlled by the (TODO: setting) setting), the Reducer begins merge/sorting the spills into a larger combined chunk. All of this activity is happening in parallel, so by the time the last Map task output is received, the typical healthy situation is to have a modest number of large sorted chunks and one small-ish chunk holding the dregs of the final spill. Once the number of chunks is below the (TODO: look up name of setting) threshold, the merge/sort is complete -- it does not need to fully merge the data into a single file onto disk. Instead, it opens an input stream onto each of those final chunks, consuming them in sort order.
-
-Notice that the Reducer flushes the last spill of received Map data to disk, then immediately starts reconsuming it. If the memory needs of your Reducer are modest, you can instruct Hadoop to use the sort buffer directly in the final merge, eliminating the cost and delay of that final spill. It is a nice marginal improvement when it works but if you are wrong about how modest your Reducer’s memory needs are, the negative consequences are high and if your Reducers have to perform multiple merge/sort passes, the benefits are insignificant.
-
-For a well-tested job heading to production that requires one or fewer merge/sort passes, you may judiciously (TODO: describe how to adjust this).
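-
-Our recollection is that the relevant knob is `mapred.job.reduce.input.buffer.percent`, the fraction of the Reducer's heap that may go on holding Map output during the reduce itself; its default of 0.0 forces that final spill. A cautious override might look like:
-
-----
-SET mapred.job.reduce.input.buffer.percent '0.70';   -- only for Reducers with modest memory needs
-----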
-
-(TODO: discuss buffer sizes here or in Brave and Foolish section)
-(TODO: there is another setting that I’m forgetting here - what is it?)
-
-Once your job has concluded, you can find the number of merge/sort passes by consulting the Reduce tasks counters (TODO: DL screenshot and explanation). During the job, however, the only good mechanism is to examine the Reducer logs directly. At some reasonable time after the Reducer has started, you will see it initiate spills to disk (TODO: tell what the log line looks like). At some later point, it will begin merge/sorting those spills (TODO: tell what the log line looks like).
-
-The CPU burden of a merge/sort is vanishingly small against the dominating cost of reading and then writing the data to disk. If, for example, your job only triggered one merge/sort pass halfway through receiving its data, the cost of the merge/sort is effectively one and a half times the base cost of writing that data at top speed to disk: all of the data was spilled once, half of it was rewritten as merged output. Comparing the total size of data received by the Reducer to the merge/sort settings will let you estimate the expected number of merge/sort passes; that number, along with the “top speed” figure you collected above, will, in turn, allow you to estimate how long the Reduce should take. Much of this action overlaps the Map phase, but it is competing with your Mappers’ mapping, spilling and everything else happening on the machine.
-
-A healthy, data-intensive job will have Mappers running at nearly top-speed throughput and the expected number of merge/sort passes, and its merge/sort should conclude shortly after the last Map output is received. (TODO: tell what the log line looks like). In general, if the amount of data each Reducer receives is less than a factor of two to three times its share of machine RAM, (TODO: should I supply a higher-fidelity thing to compare against?) all those conditions should hold. Otherwise, consult the USE method (TODO: REF).
-
-If the merge/sort phase is killing your job’s performance, it is most likely because either all of your Reducers are receiving more data than they can accommodate or because some of your Reducers are receiving far more than their fair share. We will take the uniform distribution case first.
-
-The best fix to apply is to send less data to your Reducers. The chapters on writing Map/Reduce jobs (TODO: REF or whatever we are calling Chapter 5) and the chapter on advanced Pig (TODO: REF or whatever we are calling that now) both have generic recommendations for how to send around less data and throughout the book, we have described powerful methods in a domain-specific context which might translate to your problem.
-
-If you cannot lessen the data burden, well, the laws of physics and economics must be obeyed. The cost of a merge/sort is ‘O(N log N)’. In a healthy job, however, most of the merge/sort has been paid down by the time the final merge pass begins, so up to that limit, your Hadoop job should run in ‘O(N)’ time, governed by its top speed.
-
-The cost of excessive merge passes, however, accrues directly to the total runtime of the job. Even though there are other costs that increase with the number of machines, the benefits of avoiding excessive merge passes are massive. A cloud environment makes it particularly easy to arbitrage the laws of physics against the laws of economics -- it costs the same to run 60 machines for two hours as it does to run ten machines for twelve hours. As long as your runtime stays roughly linear with the increased number of machines, you should always size your cluster to your job, not the other way around; the thresholding behavior of excessive merge passes makes it exceptionally valuable to do so. This is why we feel exploratory data analytics is far more efficiently done in an elastic cloud environment, even given the quite significant performance hit you take. Any physical cluster is both too large and too small: you are overpaying for your cluster overnight while your data scientists sleep, and you are overpaying your data scientists to hold roller-chair sword fights while their undersized cluster runs. Our rough rule of thumb is to have not more than two to three times as much total Reducer data as you have total child heap size on all the Reducer machines you'll use.
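-
-To make that rule of thumb concrete, here is a back-of-envelope sketch with every number invented for illustration:
-
-----
--- hypothetical cluster: 30 workers x 4 Reducer slots x 2 GB child heap = 240 GB of Reducer heap,
--- so keep total Reducer input under roughly 500-700 GB and spread it across plenty of Reducers
-logs     = LOAD '/data/gold/weblogs'  AS (user_id:long, url:chararray);
-profiles = LOAD '/data/gold/profiles' AS (user_id:long, name:chararray);
-joined   = JOIN logs BY user_id, profiles BY user_id PARALLEL 120;
-STORE joined INTO '/data/scratch/joined_logs';
-----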
-
-(TODO: complete)
-
-==== Skewed Data and Stuck Reducers
-
-
-
-(TODO: complete)
-
-==== Reducer Processing
-
-(TODO: complete)
-
-==== Commit and Replication
-
-(TODO: complete)
-
-
-=== Top-line Performance/Sanity Checks
-
-* The first wave of Mappers should start simultaneously.
-* In general, all a job’s full block Map attempts should take roughly the same amount of time.
-* The full map phase should take around ‘average_Map_task_time*(number_of_Map_tasks/number_of_Mapper_slots+1)’
-* Very few non-local Map tasks.
-* Number of spills equals number of Map tasks (unless there are Combiners).
-* If there are Combiners, the Reducer input data should be much less than the Mapper output data (TODO: check this).
-* Record counts and data sizes for Mapper input, Reducer input and Reducer output should correspond to your conception of what the job is doing.
-* Map tasks are full speed (data rate matches your measured baseline)
-* Most Map tasks process a full block of data.
-* Processing stage of Reduce attempts should be full speed.
-* Not too many Merge passes in Reducers.
-* Shuffle and sort time is explained by the number of Merge passes.
-* Commit phase should be brief.
-* Total job runtime is not much more than the combined Map phase and Reduce phase runtimes.
-* Reducers generally process the same amount of data.
-* Most Reducers process at least enough data to be worth it.
-
-// ____________________________________
-
-=== Performance Comparison Worksheet
-
-(TODO: DL Make a table comparing performance baseline figures on AWS and fixed hardware. reference clusters.)
+If you use a cluster-local HDFS in the way described, that is, it holds no gold data, only development and checkpoint artifacts, _______________ (TODO: fill in). Provision your HDFS to use EBS volumes, not the local (ephemeral) ones. EBS volumes surprisingly offer the same or better throughput than local ones and allow you to snapshot a volume in use, or even kill all the compute instances attached to those volumes and then reattach them to a later incarnation of the cluster. (FOOTNOTE: This does require careful coordination. Our open-source Iron-Fan framework has all the code required to do so.) Since the EBS volumes have significant internal redundancy, it then becomes safe to run a replication factor of 2 or even 1. For many jobs, the portion of the commit stage waiting for all DataNodes to acknowledge replication can become a sizable portion of the time it takes a Map/Reduce stage to complete. Do this only if you’re an amateur with low stakes or a professional whose colleagues embrace these tradeoffs; nobody ever got fired for using a replication factor of 3.
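+
+For what it's worth, the cluster-wide knob is `dfs.replication` in `hdfs-site.xml`; a job that writes only scratch data can also dial it down for its own output, along the lines of this sketch:
+
+----
+-- lower the replication of this job's output files only; leave gold data at three copies
+SET dfs.replication '2';
+----
+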
+As your S3 usage grows -- certainly if you find you have more than, say, a dozen terabytes of data not in monthly use -- it’s worth marking that data for storage in Glacier, not S3 (you can only do this, of course, if you’re using the `s3n` facade). There’s a charge for migrating data and, of course, your time is valuable, but the savings can be enormous.
0 21d-hadoop_internals-map_reduce.asciidoc → 21b-hadoop_internals-map_reduce.asciidoc
File renamed without changes.
63 21c-hadoop_internals-hdfs.asciidoc
@@ -1,63 +0,0 @@
-==== Skeleton: Hadoop Internals
-
-===== HDFS
-
-* Lifecycle of a File:
- * What happens as the Namenode and Datanode collaborate to create a new file.
- * How that file is replicated to acknowledged by other Datanodes.
- * What happens when a Datanode goes down or the cluster is rebalanced.
- * Briefly, the S3 DFS facade // (TODO: check if HFS?).
-
-// Before publication, please check that this will be chapter 17 //
-
-For 16 chapters now, we’ve been using Hadoop and Storm+Trident from the outside.
-
-(NOTE: something needs to go here) ____________________ (TODO: fill in)
-
-The biggest key to writing efficient dataflows is to understand the interface and fundamental patterns of use, not the particulars of how these frameworks execute the dataflow. However, even the strongest abstractions pushed far enough can leak, and so at some point, it’s important to understand these internal details. These next few chapters concentrate on equipping you to understand your jobs’ performance, with practical tips for improving it; if you’re looking for more, by far the best coverage of this material is found in (TODO: Add links Tom White’s Hadoop: The Definitive Guide and Eric Sammer’s Hadoop Operations).
-
-Let’s first focus on the internals of Hadoop.
-
-=== HDFS (NameNode and DataNode)
-
-It’s time to learn how the HDFS stores your data; how the Map/Reduce framework launches and coordinates job attempts; and what happens within the framework as your Map/Reduce process executes.
-
-The HDFS provides three remarkable guarantees: durability (your data is never lost or corrupted), availability (you can always access it) and efficiency (you can consume the data at high rate especially from Map/Reduce jobs and other clients). The center of action, as you might guess, is the NameNode. The NameNode is a permanently running daemon process that tracks the location of every block on the network of DataNodes, along with its name and other essential metadata. (If you’re familiar with the File Allocation Table (FAT) of a traditional file system, it’s like a more active version of that. FOOTNOTE: [If you’re not familiar with what an FAT is, then it’s like the system you’re reading about but for a file system.])
-
-(NOTE: check something … appending)
-
-When a client wishes to create a file, it contacts the NameNode with the desired path and high-level metadata. The NameNode records that information in an internal table and identifies the DataNodes that will hold the data. The NameNode then replies with that set of DataNodes, identifying one of them as the initial point of contact. (When we say "client", that’s anything accessing the NameNode, whether it’s a Map/Reduce job, one of the Hadoop filesystem commands or any other program.) The file is now exclusively available to the client for writing but will remain invisible to anybody else until the write has concluded (TODO: Is it when the first block completes or when the initial write completes?).
-
-Within the client’s request, it may independently prescribe a replication factor, file permissions, and block size _____________ (TODO: fill in)
-
-The client now connects to the indicated DataNode and begins sending data. At the point you’ve written a full block’s worth of data, the DataNode transparently finalizes that block and begins another (TODO: check that it’s the DataNode that does this). As it finishes each block or at the end of the file, it independently prepares a checksum of that block, radioing it back to the NameNode and begins replicating its contents to the other DataNodes. (TODO: Is there an essential endoffile ritual?) This is all transparent to the client, who is able to send data as fast as it can cram it through the network pipe.
-
-Once you’ve created a file, its blocks are immutable -- as opposed to a traditional file system, there is no mechanism for modifying its internal contents. This is not a limitation; it’s a feature. Making the file system immutable not only radically simplifies its implementation, it makes the system more predictable operationally and simplifies client access. For example, you can have multiple jobs and clients access the same file knowing that a client in California hasn’t modified a block being read in Tokyo (or even worse, simultaneously modified by someone in Berlin). (TODO: When does append become a thing?)
-
-The end of the file means the end of its data but not the end of the story. At all times, the DataNode periodically reads a subset of its blocks to find their checksums and sends a "heartbeat" back to the NameNode with the (hopefully) happy news. ____________ (TODO: fill in). There are several reasons a NameNode will begin replicating a block. If a DataNode’s heartbeat reports an incorrect block checksum, the NameNode will remove that DataNode from the list of replica holders for that block, triggering its replication from one of the remaining DataNodes holding that block. If the NameNode has not received a heartbeat from a given DataNode within the configured timeout, it will begin replicating all of that DataNode’s blocks; if that DataNode comes back online, the NameNode calmly welcomes it back into the cluster, cancelling replication of the valid blocks that DataNode holds. Furthermore, if the gap between the amount of data on the most populated and least populated DataNodes becomes larger than a certain threshold or the replication factor for a file is increased, the NameNode will rebalance; you can optionally trigger one earlier using the `hadoop balancer` command.
-
-However it’s triggered, there is no real magic; one of the valid replica-holder DataNodes sends the block contents to the new replica holder, which heartbeats back the block once received. (TODO: Check details)
-
-As you can see, the NameNode and its metadata are at the heart of every HDFS story. This is why the new HighAvailability (HA) NameNode feature in recent versions is so important and should be used in any production installation. It’s even more important, as well, to protect and backup the NameNode’s metadata, which, unfortunately, many people don’t know to do. (TODO: Insert notes on NameNode metadata hygiene previously written).
-
-The NameNode selects the recipient DataNodes with some intelligence. Information travels faster among machines on the same switch, switches within the same data center and so forth. However, if a switch fails, all its machines are unavailable and if a data center fails, all its switches are unavailable and so forth. When you configure Hadoop, there is a setting that allows you to tell it about your network hierarchy. If you do so, the NameNode will ensure the first replica lies within the most distant part of the hierarchy -- durability is important above all else. All further replicas will be stored within the same rack -- providing the most efficient replication. (TODO: Mention the phrase "rack aware.") As permitted within that scheme, it also tries to ensure that the cluster is balanced -- preferring DataNodes with less data under management. (TODO: Check that it’s amount of data not percent free)
-
-(TODO: Does it make contact on every block or at the start of a file?)
-
-The most important feature of the HDFS is that it is highly resilient against failure of its underlying components. Any system achieves resiliency through four mechanisms: Act to prevent failures; insulate against failure through redundancy; isolate failure within independent fault zones; and lastly, detect and remediate failures rapidly. (FOOTNOTE: This list is due to James Hamilton, TODO: link whose blogs and papers are essential reading). The HDFS largely acts to prevent failure by using file system-based access (it does not go behind the back of the operating system), by being open source (ensuring code is reviewed by thousands of eyes and run at extremely high scale by thousands of users), and so forth. Failure above the hardware level is virtually unheard of. The redundancy is provided by replicating the data across multiple DataNodes and, in recent versions, using the Zookeeper-backed HighAvailability NameNode implementation.
-
-The rack awareness, described above, isolates failure using your defined network hierarchy, and at the semantic level, independently for each HDFS block. Lastly, the heartbeat and checksum mechanisms, along with its active replication and monitoring hooks, allow it and its operators to detect and remediate faults.
-
-==== S3 File System
-
-The Amazon cloud provides an important alternative to the HDFS: its S3 object store. S3 transparently provides multi-region replication far exceeding even HDFS’, at exceptionally low cost (at time of writing, about $80 per terabyte per month, and decreasing at petabyte and higher scale). What’s more, its archival datastore solution, Glacier, will hold rarely-accessed data at one-tenth that price and with even higher durability. (FOOTNOTE: The quoted durability figure puts the engineering risk below, say, the risk of violent overthrow of our government). For machines in the Amazon cloud with a provisioned connection, the throughput to and from S3 is quite acceptable for Map/Reduce use.
-
-Hadoop has a built-in facade for the S3 file system, and you can do more or less all the things you do with an HDFS: list, put and get files; run Map/Reduce jobs to and from any combination of HDFS and S3; read and create files transparently using Hadoop’s standard file system API. There are actually two facades. The `s3hdfs` facade (confusingly labeled as plain `s3` by Hadoop, but we will refer to it here as `s3hdfs`) stores blocks in individual objects using the same checksum format as on a DataNode, and stores the NameNode-like metadata separately in a reserved area. The `s3n` facade, instead, stores a file as it appears to the Hadoop client, entirely in an S3 object with a corresponding path. When you visit S3 using Amazon’s console or any other standard S3 client, a file written with `s3n` to `/my/data.txt` shows up as an object called `my/data.txt` in your `MyContainer` bucket, and its contents are immediately available to any such client; the same file, written with `s3hdfs`, will appear in objects named for 64-bit identifiers like `0DA37f...` and with uninterpretable contents. However, `s3n` cannot store an individual file larger than 5 terabytes. The `s3hdfs` blocks minorly improve Map/Reduce efficiency and can store files of arbitrary size. All in all, we prefer the `s3n` facade; the efficiency improvement for the robots does not make up for the inconvenience to the humans using the system, and it’s a best practice not to make individual files larger than 1 terabyte anyway.
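-
-In practice, using the `s3n` facade from a Pig or Map/Reduce job is just a matter of handing it an `s3n://` path -- the bucket names here are hypothetical, and your AWS credentials must be available to Hadoop (typically via `fs.s3n.awsAccessKeyId` and `fs.s3n.awsSecretAccessKey`):
-
-----
-events = LOAD 's3n://example-gold-bucket/github_archive' AS (json:chararray);
-STORE events INTO 's3n://example-scratch-bucket/github_archive_copy';
-----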
-
-The universal availability of client libraries makes S3 a great hand-off point to other systems or other people looking to use the data. We typically use a combination of S3, HDFS and Glacier in practice. Gold data -- anything one project produces that another might use -- is stored on S3. In production runs, jobs read their initial data from S3 and write their final data to S3 but use an HDFS local to all its compute nodes for any intermediate checkpoints.
-
-When developing a job, we run an initial `distcp` from S3 onto the HDFS and do all further development using the cluster-local HDFS. The cluster-local HDFS provides better (but not earth-shakingly better) Map/Reduce performance. It is, however, noticeably faster in interactive use (file system commands, launching jobs, etc). Applying the "robots are cheap, humans are important" rule easily justifies the maintenance of the cluster-local HDFS.
-
-If you use a cluster-local HDFS in the way described, that is, it holds no gold data, only development and checkpoint artifacts, _______________ (TODO: fill in). Provision your HDFS to use EBS volumes, not the local (ephemeral) ones. EBS volumes surprisingly offer the same or better throughput than local ones and allow you to snapshot a volume in use, or even kill all the compute instances attached to those volumes and then reattach them to a later incarnation of the cluster. (FOOTNOTE: This does require careful coordination. Our open-source Iron-Fan framework has all the code required to do so.) Since the EBS volumes have significant internal redundancy, it then becomes safe to run a replication factor of 2 or even 1. For many jobs, the portion of the commit stage waiting for all DataNodes to acknowledge replication can become a sizable portion of the time it takes a Map/Reduce stage to complete. Do this only if you’re an amateur with low stakes or a professional whose colleagues embrace these tradeoffs; nobody ever got fired for using a replication factor of 3.
-
-As your S3 usage grows -- certainly if you find you have more than, say, a dozen terabytes of data not in monthly use -- it’s worth marking that data for storage in Glacier, not S3 (you can only do this, of course, if you’re using the `s3n` facade). There’s a charge for migrating data and, of course, your time is valuable, but the savings can be enormous.
287 22-hadoop_tuning.asciidoc
@@ -1,3 +1,290 @@
[[hadoop_tuning]]
== Hadoop Tuning
+
+One of the wonderful and terrible things about Hadoop (or anything else at Big Data scale) is that there are very few boundary cases for performance optimization. If your dataflow has the wrong shape, it is typically so catastrophically inefficient as to be unworkable. Otherwise, Hadoop’s scalability makes the price of simply throwing more hardware at the problem competitive with investing to optimize it, especially for exploratory analytics. That’s why the repeated recommendation of this book is: Get the algorithm right, get the contextability right and size your cluster to your job.
+
+When you begin to productionize the results of all your clever work, it becomes valuable to look for these 30-percent, 10-percent improvements. Debug loop time is also important, though, so it is useful to get a feel for when and why early optimization is justified.
+
+The first part of this chapter discusses Tuning for the Wise and Lazy -- showing you how to answer the question, “Is my job slow and should I do anything about it?” Next, we will discuss Tuning for the Brave and Foolish, diving into the details of Hadoop’s maddeningly numerous, often twitchy, configuration knobs. There are low-level setting changes that can dramatically improve runtime; we will show you how to recognize them and how to determine what per-job overrides to apply. Lastly, we will discuss a formal approach for diagnosing the health and performance of your Hadoop cluster and your Hadoop jobs to help you confidently and efficiently identify the source of deeper flaws or bottlenecks.
+
+
+=== Chimpanzee and Elephant: A Day at Work ===
+
+Each day, the chimpanzees' foreman, a gruff silverback named J.T., hands each chimp the day's translation manual and a passage to translate as they clock in. Throughout the day, he also coordinates assigning each block of pages to chimps as they signal the need for a fresh assignment.
+
+Some passages are harder than others, so it's important that any elephant can deliver page blocks to any chimpanzee -- otherwise you'd have some chimps goofing off while others are stuck translating _King Lear_ into Kinyarwanda. On the other hand, sending page blocks around arbitrarily will clog the hallways and exhaust the elephants.
+
+The elephants' chief librarian, Nanette, employs several tricks to avoid this congestion.
+
+Since each chimpanzee typically shares a cubicle with an elephant, it's most convenient to hand a new page block across the desk rather than carry it down the hall. J.T. assigns tasks accordingly, using a manifest of page blocks he requests from Nanette. Together, they're able to make most tasks "local".
+
+Second, the page blocks of each play are distributed all around the office, not stored in one book together. One elephant might have pages from Act I of _Hamlet_, Act II of _The Tempest_, and the first four scenes of _Troilus and Cressida_ footnote:[Does that sound complicated? It is -- Nanette is able to keep track of all those blocks, but if she calls in sick, nobody can get anything done. You do NOT want Nanette to call in sick.]. Also, there are multiple 'replicas' (typically three) of each book collectively on hand. So even if a chimp falls behind, JT can count on some other colleague having a cubicle-local replica. (There's another benefit to having multiple copies: it ensures there's always a copy available. If one elephant is absent for the day, leaving her desk locked, Nanette will direct someone to make a xerox copy from either of the two other replicas.)
+
+Nanette and J.T. exercise a bunch more savvy optimizations (like handing out the longest passages first, or having folks who finish early pitch in so everyone can go home at the same time, and more). There's no better demonstration of power through simplicity.
+
+=== Brief Anatomy of a Hadoop Job ===
+
+We'll go into much more detail in (TODO: ref), but here are the essentials of what you just performed.
+
+==== Copying files to the HDFS ====
+
+When you ran the `hadoop fs -mkdir` command, the Namenode (Nanette's Hadoop counterpart) simply made a notation in its directory: no data was stored. If you're familiar with the term, think of the namenode as a 'File Allocation Table (FAT)' for the HDFS.
+
+When you run `hadoop fs -put ...`, the putter process does the following for each file:
+
+1. Contacts the namenode to create the file. This also just makes a note of the file; the namenode doesn't ever have actual data pass through it.
+2. Instead, the putter process asks the namenode to allocate a new data block. The namenode designates a set of datanodes (typically three), along with a permanently-unique block ID.
+3. The putter process transfers the file over the network to the first data node in the set; that datanode transfers its contents to the next one, and so forth. The putter doesn't consider its job done until a full set of replicas have acknowledged successful receipt.
+4. As soon as each HDFS block fills, even if it is mid-record, it is closed; steps 2 and 3 are repeated for the next block.
+
+==== Running on the cluster ====
+
+Now let's look at what happens when you run your job.
+
+(TODO: verify this is true in detail. @esammer?)
+
+* _Runner_: The program you launch sends the job and its assets (code files, etc) to the jobtracker. The jobtracker hands a `job_id` back (something like `job_201204010203_0002` -- the datetime the jobtracker started and the count of jobs launched so far); you'll use this to monitor and if necessary kill the job.
+* _Jobtracker_: As tasktrackers "heartbeat" in, the jobtracker hands them a set of 'task's -- the code to run and the data segment to process (the "split", typically an HDFS block).
+* _Tasktracker_: each tasktracker launches a set of 'mapper child processes', each one an 'attempt' of the tasks it received. (TODO verify:) It periodically reassures the jobtracker with progress and in-app metrics.
+* _Jobtracker_: the Jobtracker continually updates the job progress and app metrics. As each tasktracker reports a complete attempt, it receives a new one from the jobtracker.
+* _Tasktracker_: after some progress, the tasktrackers also fire off a set of reducer attempts, similar to the mapper step.
+* _Runner_: stays alive, reporting progress, for the full duration of the job. As soon as the job_id is delivered, though, the Hadoop job itself doesn't depend on the runner -- even if you stop the process or disconnect your terminal the job will continue to run.
+
+[WARNING]
+===============================
+Please keep in mind that the tasktracker does _not_ run your code directly -- it forks a separate process in a separate JVM with its own memory demands. The tasktracker rarely needs more than a few hundred megabytes of heap, and you should not see it consuming significant I/O or CPU.
+===============================
+
+=== Chimpanzee and Elephant: Splits ===
+
+I've danced around a minor but important detail that the workers take care of. For the Chimpanzees, books are chopped up into set numbers of pages -- but the chimps translate _sentences_, not pages, and a page block boundary might happen mid-sentence.
+//// Provide a real world analogous example here to help readers correlate this story to their world and data analysis needs, "...This example is similar to..." Amy////
+
+The Hadoop equivalent, of course, is that a data record may cross an HDFS block boundary. (In fact, you can force map-reduce splits to happen anywhere in the file, but the default and typically most-efficient choice is to split at HDFS blocks.)
+
+A mapper will skip the first record of a split if it's partial and carry on from there. Since there are many records in each split, that's no big deal. When it gets to the end of the split, the task doesn't stop processing until it completes the current record -- the framework makes the overhanging data seamlessly appear.
+//// Again, here, correlate this example to a real world scenario; "...so if you were translating x, this means that..." Amy////
+
+In practice, Hadoop users only need to worry about record splitting when writing a custom `InputFormat` or when practicing advanced magick. You'll see lots of reference to it though -- it's a crucial subject for those inside the framework, but for regular users the story I just told is more than enough detail.
+
+
+=== Tuning For The Wise and Lazy
+
+Before tuning your job, it is important to first understand whether it is slow and if so, where the bottleneck arises. Generally speaking, your Hadoop job will either be CPU-bound within your code, memory-bound within your code, bound by excessive Reducer data (causing too many merge passes) or throughput-bound within the framework or underlying system. (In the rare cases it is not one of those, you would next apply the more comprehensive “USE method” (TODO: REF) described later in this chapter.) But for data-intensive jobs, the fundamental upper bound on Hadoop’s performance is its effective throughput from disk. You cannot make your job process data faster than it can load it.
+
+Here is the conceptual model we will use: apart from necessary overhead, job coordination and so forth, a Hadoop job:
+
+* Streams data to the Mapper, either locally from disk or remotely over the network;
+* Runs that data through your code;
+* Spills the midstream data to disk one or more times;
+* Applies combiners, if any;
+* Merge/Sorts the spills and sends them over the network to the Reducer.
+
+The Reducer:
+
+* Writes each Mapper’s output to disk;
+* Performs some number of Merge/Sort passes;
+* Reads the data from disk;
+* Runs that data through your code;
+* Writes its output to the DataNode, which writes that data once to disk and twice through the network;
+* Receives two other Reducers’ output from the network and writes those to disk.
+
+Hadoop is, of course, pipelined; to every extent possible, those steps are happening at the same time. What we will do, then, is layer in these stages one by one, at each point validating that your job is as fast as your disk until we hit the stage where it is not.
+
+==== Fixed Overhead
+
+The first thing we want is a job that does nothing; this will help us understand the fixed overhead costs. Actually, what we will run is a job that does almost nothing; it is useful to know that your test really did run.
+
+----
+(TODO: Disable combining splits)
+Load 10_tiny_files
+Filter most of it out
+Store to disk
+
+(TODO: Restrict to 50 Mapper slots or rework)
+Load 10000_tiny_files
+Filter most of it out
+Store to disk
+----
+(TODO: is there a way to limit the number of Reduce slots in Pig? Otherwise, revisit the below.)
+
+In (TODO: REF), there is a performance comparison worksheet that you should copy and fill in as we go along. It lists the performance figures for several reference clusters on both cloud and dedicated environments for each of the tests we will perform. If your figures do not compare well with the appropriate reference cluster, it is probably worthwhile adjusting the overall configuration. Assuming your results are acceptable, you can tape the worksheet to your wall and use it to baseline all the jobs you will write in the future. The rest of the chapter will assume that your cluster is large enough to warrant tuning but not grossly larger than the largest reference cluster.
+
+If you run the Pig script above (TODO: REF), Hadoop will execute two jobs: one with 10 Mappers and no Reducers and another with 10,000 Mappers and no Reducers. From the Hadoop Job Tracker page for your job, click on the link showing the number of Map tasks to see the full task listing. All 10 tasks for the first job should have started at the same time and uniformly finished a few seconds after that. Back on the main screen, you should see that the total job completion time was more or less identical to that of the slowest Map task.
+
+The second job ran its 10,000 Map tasks through a purposefully restricted 50 Mapper slots -- so each Mapper slot will have processed around 200 files. If you click through to the Task listing, the first wave of tasks should start simultaneously and all of them should run in the same amount of time that the earlier test did.
+
+(TODO: show how to find out if one node is way slower)
+
+Even in this trivial case, there is more variance in launch and runtimes than you might first suspect (if you don't see it here, you definitely will in the next test -- but for continuity, we will discuss it now). If that splay -- the delay between the bulk of tasks finishing and the final task finishing -- is larger than the runtime of a typical task, it may indicate a problem; as long as it is only a few seconds, don’t sweat it. If you are interested in a minor but worth-it tweak, adjust the `mapred.job.reuse.jvm.num.tasks` setting to ‘-1’, which causes each Mapper slot to reuse the same child-process JVM across its attempts, eliminating the brief but noticeable JVM startup time. If you are writing your own native Java code, you might know a reason to go with the default (disabled) setting, but it is harmless for any well-behaved program.
+
+On the Job screen, you should see that the total runtime for the job was about 200 times slower for the second job than the first and not much more than 200 times the typical task’s runtime; if not, you may be putting pressure on the Job Tracker. Rerun your job and watch the Job Tracker’s heap size; you would like the Job Tracker heap to spend most of its life below, say 50-percent, so if you see it making any significant excursions toward 100-percent, that would unnecessarily impede cluster performance. The 1 GB out-of-the-box setting is fairly small; for production use we recommend at least 3 GB of heap on a dedicated machine with at least 7 GB total RAM.
+
+If the Job coordination overhead is unacceptable but the Job Tracker heap is not to blame, a whole host of other factors might be involved; apply the USE method, described (TODO: REF).
+
+=== Mapper Input
+
+Now that we’ve done almost nothing, let’s do almost something -- read in a large amount of data, writing just enough to disk to know that we really were there.
+
+----
+Load 100 GB from disk
+Filter all but 100 MB
+Store it to disk
+----
+
+Run that job on the 100-GB GitHub archive dataset. (TODO: Check that it will do speculative execution.) Once the job completes, you will see as many successful Map tasks as there were HDFS blocks in the input; if you are running a 128-MB block size, this will be about (TODO: How many blocks are there?).
+
+Again, each Map task should complete in a uniform amount of time and the job as a whole should take about ‘length_of_Map_task * number_of_Map_tasks / number_of_Mapper_slots’. The Map phase does not end until every Mapper task has completed and, as we saw in the previous example, even in typical cases, there is some amount of splay in runtimes.
+
+(TODO: Move some of JT and Nanette’s optimizations forward to this chapter). Like the chimpanzees at quitting time, the Map phase cannot finish until all Mapper tasks have completed.
+
+You will probably notice a half-dozen or so killed attempts as well. The ‘TODO: name of speculative execution setting’, which we recommend enabling, causes Hadoop to opportunistically launch a few duplicate attempts for the last few tasks in a job. The faster job cycle time justifies the small amount of duplicate work.
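+
+If you want to set it explicitly, the Hadoop 1.x-era property names are, to the best of our knowledge, the ones below; treat them as assumptions to verify against your distribution. From a Pig script:
+
+----
+-- opportunistically duplicate the last few attempts of the Map and Reduce phases
+SET mapred.map.tasks.speculative.execution    'true';
+SET mapred.reduce.tasks.speculative.execution 'true';
+----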
+
+Check that there are few non-local Map tasks -- Hadoop tries to assign Map attempts (TODO: check tasks versus attempts) to run on a machine whose DataNode holds that input block, thus avoiding a trip across the network (or, in the chimpanzees' case, down the hallway). An individual non-local task is not that costly, but if you are seeing a large number of them on a lightly-loaded cluster, dig deeper.
+
+Dividing the size of an HDFS block by the average runtime of a full-block Map task gives you the Mapper's data rate; for example, a Mapper that chews through a 128-MB block in two seconds is running at roughly 64 MB/s. In this case, since we did almost nothing and wrote almost nothing, that value is your cluster's effective top speed. This has two implications: First, you cannot expect a data-intensive job to run faster than its top speed. Second, there should be apparent reasons for any job that runs much slower than its top speed. Tuning Hadoop is basically about making sure no other part of the system is slower than the fundamental limit at which it can stream from disk.
+
+While setting up your cluster, it is worth baselining Hadoop's top speed against the effective speed of your disk and your network. Follow the instructions for the 'scripts/baseline_performance' script (TODO: write script) from the example code above. It uses a few dependable user-level processes to measure the effective data rate to disk ('dd' and 'cp') and the effective network rate ('nc' and 'scp'). (We have purposely used user-level processes to account for system overhead; if you want to validate the raw hardware as well, use a benchmark like Bonnie++ (TODO: link).) If you are on dedicated hardware, the network throughput should be comfortably larger than the disk throughput. If you are on cloud machines, this, unfortunately, might not hold, but it should not be atrociously lower.
+
+If the effective top speed you measured above is not within (TODO: figure out healthy percent) percent of those raw disk and network rates, dig deeper; otherwise, record each of these numbers on your performance comparison chart.
+
+If you're setting up your cluster, take the time to generate enough additional data to keep the cluster fully saturated for 20 or more minutes and then check that each machine processed about the same amount of data. There is a lot more variance in effective performance among machines than you might expect, especially in a public cloud environment, and this check can also catch a machine with faulty hardware or setup. It is a crude but effective benchmark; if you're investing heavily in a cluster, consider running a comprehensive benchmarking suite on all the nodes -- the chapter on Stupid Hadoop Tricks shows how (TODO: ref).
+
+=== The Many Small Files Problem
+
+One of the most pernicious ways to impair a Hadoop cluster’s performance is the “many-small-files” problem. With a 128-MB block size (which we will assume for the following discussion), a 128-MB file takes one block (obviously), a 1-byte file takes one block and a 128-MB+1 byte file takes two blocks, one of them full, the other with one solitary byte.
+
+Storing 10 GB of data in, say, 100 files is harmless -- the average block occupancy is a mostly-full 100 MB. Storing that same 10 GB in, say, 10,000 files is, however, harmful in several ways. At the heart of the Namenode is a table that lists every file and block. As you would expect, the memory usage of that table roughly corresponds to the number of files plus the number of blocks, so the many-small-files example uses about 100 times as much memory as warranted. Engage in that bad habit often enough and you will start putting serious pressure on the Namenode heap and lose your job shortly thereafter. What is more, the many-small-files version will require 10,000 Map tasks, causing memory pressure on the Job Tracker and a job whose runtime is dominated by task overhead. Lastly, there is the simple fact that working with 10,000 things is more annoying than working with 100 -- it takes up space in datanode heartbeats, client requests, your terminal screen and your head.
+
+This situation is easier to arrive at than you might expect; in fact, you just did so. The 100-GB job you just ran most likely used about 800 Map tasks yet output only a few MB of data. Any time your Mapper output is significantly smaller than its input -- for example, when you apply a highly-restrictive filter to a large input -- your output files will have poor occupancy.
+
+A sneakier version of this is a slightly "expansive" Mapper-only job. A job whose Mappers turned each 128-MB block into, say, 150 MB of output data would reduce the block occupancy by nearly half and require nearly double the Map tasks in the jobs that follow. Done once, that is merely annoying, but in a workflow that iterates or has many stages, the cascading dilution can become dangerous.
+
+You can audit your HDFS to see if this is an issue using the ‘hadoop fsck [directory]’ command. Running that command against the directory holding the GitHub data should show 100 GB of data in about 800 blocks. Running it against your last job’s output should show only a few MB of data in an equivalent number of blocks.
+
+You can always distill a set of files by doing a GROUP BY with a small number of Reducers, using the record itself as the key. Pig and Hive both have settings to mitigate the many-small-files problem. In Pig, the (TODO: find name of option) setting will feed multiple small files to the same Mapper; in Hive (TODO: look up what to do in Hive). In both cases, we recommend modifying your configuration to make that the default and disabling it on a per-job basis when warranted.
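+
+Here is a sketch of that distill-with-a-GROUP trick; the paths are placeholders and the PARALLEL value is simply however many well-filled output files you want. (If your Pig version supports split combination, we believe the relevant properties are `pig.splitCombination` and `pig.maxCombinedSplitSize`, but check the documentation for your release.)
+
+----
+-- group on the record itself so the content passes through unchanged,
+-- then let a handful of Reducers write a handful of well-filled files
+shards    = LOAD '/data/scratch/baseline/filter_only' AS (line:chararray);
+grouped   = GROUP shards BY line PARALLEL 3;
+compacted = FOREACH grouped GENERATE FLATTEN(shards);
+STORE compacted INTO '/data/scratch/baseline/filter_only_compacted';
+----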
+
+=== Midstream Data
+
+Now let’s start to understand the performance of a proper Map/Reduce job. Run the following script, again, against the 100 GB GitHub data.
+
+----
+Parallel 50
+Disable optimizations for pushing up filters and for Combiners
+Load 100 GB of data
+Group by record itself
+Filter out almost everything
+Store data
+----
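+
+One hedged rendering of that outline in Pig follows; the property and optimizer-rule names reflect our understanding of recent Pig releases (check yours), and the paths and filter condition are placeholders:
+
+----
+-- launch with the Combiner and filter-pushing optimizations turned off, for example:
+--   pig -Dpig.exec.nocombiner=true -optimizer_off PushUpFilter midstream_baseline.pig
+SET default_parallel 50;
+events    = LOAD '/data/gold/github_events' AS (line:chararray);
+grouped   = GROUP events BY line;                      -- forces the full 100 GB through the shuffle
+regrouped = FOREACH grouped GENERATE FLATTEN(events) AS (line:chararray);
+trimmed   = FILTER regrouped BY SIZE(line) > 100000L;  -- keeps almost nothing
+STORE trimmed INTO '/data/scratch/baseline/midstream';
+----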
+
+The purpose of that job is to send 100 GB of data at full speed through the Mappers and midstream processing stages but to do almost nothing in the Reducers and write almost nothing to disk. To keep Pig from "helpfully" economizing the amount of midstream data, you will notice that the script disables some of its optimizations. The number of Map tasks and their runtime should be effectively the same as in the previous example, and all the sanity checks we've given so far should continue to apply. The overall runtime of the Map phase should be only slightly longer (TODO: how much is slightly?) than in the previous Map-only example, depending on how well your network is able to outpace your disk.
+
+It is an excellent idea to get into the habit of predicting the record counts and data sizes into and out of both Mapper and Reducer, based on what you believe Hadoop will be doing to each record, and then comparing your predictions to what you see on the Job Tracker screen. In this case, you will see identical record counts for Mapper input, Mapper output and Reducer input, and nearly identical data sizes for HDFS bytes read, Mapper output, Mapper file bytes written and Reducer input. The reason for the small discrepancies is that, for the file system metrics, Hadoop records everything that is read or written, including log files and so forth.
+
+Midway or so through the job -- well before the finish of the Map phase -- you should see the Reducer tasks start up; their eagerness can be adjusted using the (TODO: name of setting) setting. By starting them early, the Reducers are able to begin fetching and merge/sorting the various Map task outputs in parallel with the Map phase. If you err low on this setting, you will disappoint your coworkers by consuming Reducer slots with lots of idle time early on, but that is better than starting them too late, which will sabotage parallelism.
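+
+On Hadoop 1.x-era clusters, we believe the knob in question is `mapred.reduce.slowstart.completed.maps`, the fraction of Map tasks that must finish before Reducers are scheduled; treat the name and the value below as assumptions to verify for your distribution:
+
+----
+-- schedule Reducers once roughly half the Map tasks have finished (illustrative value)
+SET mapred.reduce.slowstart.completed.maps '0.50';
+----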
+
+Visit the Reducer tasks listing. Each Reducer task should have taken a uniform amount of time, very much longer than the length of the Map tasks. Open a few of those tasks in separate browser tabs and look at their counters; each should have roughly the same input record count and data size. It is annoying that this information is buried as deeply as it is because it is probably the single most important indicator of a flawed job; we will discuss it in detail a bit later on.
+
+==== Spills
+
+First, though, let’s finish understanding the data’s detailed journey from Mapper to Reducer. As a Map task outputs records, Hadoop sorts them in the fixed-size io.sort buffer. Hadoop files records into the buffer in partitioned, sorted order as it goes. When that buffer fills up (or the attempt completes), Hadoop begins writing to a new empty io.sort buffer and, in parallel, “spills” that buffer to disk. As the Map task concludes, Hadoop merge/sorts these spills (if there were more than one) and sends the sorted chunks to each Reducer for further merge/sorting.
+
+The Job Tracker screen shows the number of Mapper spills. If the number of spills equals the number of Map tasks, all is good -- the Mapper output is checkpointed to disk before being dispatched to the Reducer. If the size of your Map output data is large, having multiple spills is the natural outcome of using memory efficiently; that data was going to be merge/sorted anyway, so it is a sound idea to do it on the Map side where you are confident it will have a uniform size distribution.
+
+(TODO: do combiners show as multiple spills?)
+
+What you hate to see, though, are Map tasks with two or three spills. As soon as you have more than one spill, the data has to be initially flushed to disk as output, then read back in full and written again in full for at least one merge/sort pass. Even the first extra spill can cause roughly a 30-percent increase in Map task runtime.
+
+There are two frequent causes of unnecessary spills. First is the obvious one: Mapper output that slightly outgrows the io.sort buffer size. We recommend sizing the io.sort buffer to comfortably accommodate Map task output slightly larger than your typical HDFS block size -- the next section (TODO: REF) shows you how to calculate it. In the significant majority of jobs that involve a Reducer, the Mapper output is the same or nearly the same size as its input -- JOINs or GROUPs that are direct, that are preceded by a projection or filter, or that add a few derived fields. If you see many of your Map tasks tripping slightly over that limit, it is probably worth requesting a larger io.sort buffer specifically for your job.
+
+There is also a disappointingly silly way to cause unnecessary spills: The io.sort buffer holds both the records it will later spill to disk and an index to maintain their sorted order. An unfortunate early design decision set a fixed size for both of those, with fairly confusing control knobs. The 'iosortrecordpercent' (TODO: check name of setting) setting gives the size of that index as a fraction of the sort buffer. Hadoop spills to disk when either the fraction devoted to records or the fraction devoted to the index becomes full. If your output is long and skinny -- cumulatively not much more than an HDFS block, but with a typical record size smaller than, say, 100 bytes -- you will end up spilling multiple small chunks to disk when you could have easily afforded to increase the size of the bookkeeping buffer.
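+
+Both knobs can be set per job from Pig; the names below are the Hadoop 1.x-era ones and the values are purely illustrative, so size them from your own arithmetic rather than copying these numbers:
+
+----
+-- give the map-side sort buffer headroom above a 128-MB block of output (illustrative)
+SET io.sort.mb             '320';
+-- long, skinny records need a larger share of the buffer for the bookkeeping index
+SET io.sort.record.percent '0.15';
+----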
+
+There are lots of ways to cause long, skinny output, but set a special trigger in your mind for cases where you have long, skinny input, where you turn an adjacency-listed graph into an edge-listed graph, or where you otherwise FLATTEN bags of records on the Mapper side. In each of these cases, the later section (TODO: REF) will show you how to calculate it.
+
+(TODO: either here or later, talk about the surprising cases where you fill up MapRed scratch space or FS.S3.buffer.dir and the rest of the considerations about where to put this).
+
+
+==== Combiners
+
+It is frequently the case that the Reducer output is smaller than its input (and kind of annoying that the word "Reducer" was chosen, since it also frequently is not smaller). "Algebraic" aggregations such as COUNT, AVG and many others can implement part of the Reducer operation on the Map side, greatly lessening the amount of data sent to the Reducer.
+
+Pig and Hive are written to use Combiners whenever generically appropriate. Applying a Combiner requires an extra pass over your data on the Map side, though, and so in some cases can cost much more time than it saves.
+
+If you run a DISTINCT operation over a data set with 50-percent duplicates, the Combiner is easily justified, since many duplicate pairs will be eliminated early. If, however, only a tiny fraction of records are duplicated, only a disappearingly-tiny fraction of those will occur on the same Mapper, so you will have spent disk and CPU without reducing the data size.
+
+Whenever your Job Tracker output shows that Combiners are being applied, check that the Reducer input data is, in fact, diminished. (TODO: check which numbers show this) If Pig or Hive have guessed badly, disable the (TODO: name of setting) setting in Pig or the (TODO: name of setting) setting in Hive.
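+
+For reference, the property we believe controls this in Pig is `pig.exec.nocombiner`, and in Hive we believe the analogous knob is `hive.map.aggr`; treat both names as assumptions to check against your versions. A per-script override in Pig looks like:
+
+----
+-- skip the map-side Combiner pass for this script only
+SET pig.exec.nocombiner 'true';
+----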
+
+==== Reducer Merge (aka Shuffle and Sort)
+
+We are now ready to dig into the stage with the most significant impact on job performance, the merge/sort performed by the Reducer before processing. In almost all the rest of the cases discussed here, an inefficient choice causes only a marginal impact on runtime. Bring down too much data on your Reducers, however, and you will find that, two hours into the execution of what you thought was a one-hour job, a handful of Reducers indicate they have four hours left to run.
+
+First, let’s understand what is going on and describe healthy execution; then, we will discuss various ways it can go wrong and how to address them.
+
+As you just saw, data arrives from the Mappers pre-sorted. The Reducer copies those sorted chunks into its own sort buffer. Once a threshold (controlled by the (TODO: name of setting) setting) of data has been received, the Reducer commissions a new sort buffer and separately spills the data to disk, merge/sorting the Mapper chunks as it goes. (TODO: check that this first merge/sort happens on spill)
+
+After enough of these spills accumulate (controlled by the (TODO: setting) setting), the Reducer begins merge/sorting the spills into a larger combined chunk. All of this activity happens in parallel, so by the time the last Map task output is received, the typical healthy situation is to have a modest number of large sorted chunks and one small-ish chunk holding the dregs of the final spill. Once the number of chunks is below the (TODO: look up name of setting) threshold, the merge/sort is complete -- it does not need to fully merge the data into a single file on disk. Instead, the Reducer opens an input stream onto each of those final chunks, consuming them in sort order.
+
+Notice that the Reducer flushes the last spill of received Map data to disk, then immediately starts reconsuming it. If the memory needs of your Reducer are modest, you can instruct Hadoop to use the sort buffer directly in the final merge, eliminating the cost and delay of that final spill. It is a nice marginal improvement when it works, but if you are wrong about how modest your Reducer's memory needs are, the negative consequences are high, and if your Reducers have to perform multiple merge/sort passes anyway, the benefits are insignificant.
+
+For a well-tested job heading to production that requires one or fewer merge/sort passes, you may judiciously (TODO: describe how to adjust this).
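+
+To our knowledge, the Hadoop 1.x-era knob for serving that final merge straight from memory is `mapred.job.reduce.input.buffer.percent`, the fraction of the Reducer's heap allowed to retain Map outputs during the reduce; the value below is illustrative and should only be raised for Reducers you know to be memory-light:
+
+----
+-- let the final merge feed the Reducer from memory instead of a last spill (use with care)
+SET mapred.job.reduce.input.buffer.percent '0.70';
+----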
+
+(TODO: discuss buffer sizes here or in Brave and Foolish section)
+(TODO: there is another setting that I’m forgetting here - what is it?)
+
+Once your job has concluded, you can find the number of merge/sort passes by consulting the Reduce tasks' counters (TODO: DL screenshot and explanation). During the job, however, the only good mechanism is to examine the Reducer logs directly. At some reasonable time after the Reducer has started, you will see it initiate spills to disk (TODO: tell what the log line looks like). At some later point, it will begin merge/sorting those spills (TODO: tell what the log line looks like).
+
+The CPU burden of a merge/sort is disappearingly small against the dominating cost of reading and then writing the data to disk. If, for example, your job triggered only one merge/sort pass, halfway through receiving its data, the cost of the merge/sort is effectively one and a half times the base cost of writing that data at top speed to disk: all of the data was spilled once and half of it was rewritten as merged output. Comparing the total size of data received by each Reducer to the merge/sort settings will let you estimate the expected number of merge/sort passes; that number, along with the "top speed" figure you collected above, will in turn let you estimate how long the Reduce should take. Much of this action overlaps with the Map phase, but it also competes with your Mappers' mapping, spilling and everything else happening on the machine.
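+
+To make that concrete with purely illustrative numbers: suppose each Reducer receives 2 GB and your measured top speed to disk is 100 MB/s. Spilling all 2 GB once is about 20 seconds of disk time; one merge pass that rewrites half of it adds roughly another 10 seconds -- about half a minute of pure disk cost per Reducer, before accounting for contention with the Map tasks sharing the machine.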
+
+A healthy, data-intensive job will have Mappers running at nearly top-speed throughput and the expected number of merge/sort passes, and its merge/sort should conclude shortly after the last Map output is received. (TODO: tell what the log line looks like). In general, if the amount of data each Reducer receives is less than a factor of two to three times its share of machine RAM (TODO: should I supply a higher-fidelity thing to compare against?), all those conditions should hold. Otherwise, consult the USE method (TODO: REF).
+
+If the merge/sort phase is killing your job’s performance, it is most likely because either all of your Reducers are receiving more data than they can accommodate or because some of your Reducers are receiving far more than their fair share. We will take the uniform distribution case first.
+
+The best fix to apply is to send less data to your Reducers. The chapters on writing Map/Reduce jobs (TODO: REF or whatever we are calling Chapter 5) and on advanced Pig (TODO: REF or whatever we are calling that now) both have generic recommendations for how to send around less data, and throughout the book we have described powerful methods in domain-specific contexts that might translate to your problem.
+
+If you cannot lessen the data burden, well, the laws of physics and economics must be obeyed. The cost of a merge/sort is 'O(N log N)'. In a healthy job, however, most of the merge/sort has been paid down by the time the final merge pass begins, so up to that limit, your Hadoop job should run in 'O(N)' time, governed by its top speed.
+
+The cost of excessive merge passes, however, accrues directly to the total runtime of the job. Even though there are other costs that increase with the number of machines, the benefits of avoiding excessive merge passes are massive. A cloud environment makes it particularly easy to arbitrage the laws of physics against the laws of economics -- it costs the same to run 60 machines for two hours as it does to run ten machines for 12 hours. As long as your runtime keeps shrinking roughly linearly as you add machines, you should always size your cluster to your job, not the other way around. The thresholding behavior of an overburdened Reduce phase makes it exceptionally valuable to do so. This is why we feel exploratory data analytics is far more efficiently done in an elastic cloud environment, even given the quite significant performance hit you take. Any physical cluster is both too large and too small: you are overpaying for your cluster overnight while your data scientists sleep, and you are overpaying your data scientists to hold roller-chair sword fights while their undersized cluster runs. Our rough rule of thumb is to have no more than two to three times as much total Reducer data as you have total child heap size on all the Reducer machines you will use.
+
+(TODO: complete)
+
+==== Skewed Data and Stuck Reducers
+
+(TODO: complete)
+
+==== Reducer Processing
+
+(TODO: complete)
+
+==== Commit and Replication
+
+(TODO: complete)
+
+
+=== Top-line Performance/Sanity Checks
+
+* The first wave of Mappers should start simultaneously.
+* In general, all a job’s full block Map attempts should take roughly the same amount of time.
+* The full map phase should take around 'average_Map_task_time * (number_of_Map_tasks / number_of_Mapper_slots + 1)'.
+* Very few non-local Map tasks.
+* Number of spills equals number of Map tasks (unless there are Combiners).
+* If there are Combiners, the Reducer input data should be much less than the Mapper output data (TODO: check this).
+* Record counts and data sizes for Mapper input, Reducer input and Reducer output should correspond to your conception of what the job is doing.
+* Map tasks are full speed (data rate matches your measured baseline).
+* Most Map tasks process a full block of data.
+* Processing stage of Reduce attempts should be full speed.
+* Not too many Merge passes in Reducers.
+* Shuffle and sort time is explained by the number of Merge passes.
+* Commit phase should be brief.
+* Total job runtime is not much more than the combined Map phase and Reduce phase runtimes.
+* Reducers generally process the same amount of data.
+* Most Reducers process at least enough data to be worth it.
+
+// ____________________________________
+
+=== Performance Comparison Worksheet
+
+(TODO: DL Make a table comparing performance baseline figures on AWS and fixed hardware. reference clusters.)
+
+
View
0 21a-hadoop_internals.asciidoc → supplementary/21a-hadoop_internals.asciidoc
File renamed without changes.
