Edits from Q

commit 150f69f817b13ab57467b4fbfbe7aa234e1c878a 1 parent 25833a6
Philip (flip) Kromer authored
Showing with 178 additions and 82 deletions.
  1. +177 −81 02-hadoop_basics.asciidoc
  2. +1 −1  11a-spatial_join.asciidoc
258 02-hadoop_basics.asciidoc
@@ -1,78 +1,68 @@
-[[simple_transform]]
+[[hadoop_basics]]
== Hadoop Basics
-The harsh realities of the laws of physics and economics prevent traditional data analysis solutions such as relational databases, supercomputing and so forth from economically scaling to arbitrary-sized data for reasons very similar to Santa's original system (see sidebar). Hadoop's Map/Reduce paradigm does not provide complex operations, modification of existing records, fine-grain control over how the data is distributed or anything else beyond the ability to write programs that adhere to a single, tightly-constrained template. If Hadoop were a publishing medium, it would be one that refused essays, novels, sonnets and all other literary forms beyond the haiku:
-
- data flutters by
- elephants make sturdy piles
- context yields insight
-
-Our Map/Reduce haiku illustrates Hadoop's template:
-
-1. The Mapper portion of your script processes records, attaching a label to each.
-2. Hadoop assembles those records into context groups according to their label.
-3. The Reducer portion of your script processes those context groups and writes them to a data store or external system.
-
-What is remarkable is that from this single primitive, we can construct the familiar relational operations (such as GROUPs and ROLLUPs) of traditional databases, many machine-learning algorithms, matrix and graph transformations and the rest of the advanced data analytics toolkit. In the next two chapters, we will demonstrate high-level relational operations and illustrate the Map/Reduce patterns they express. In order to understand the performance and reasoning behind those patterns, let's first understand the motion of data within a Map/Reduce job.
+Hadoop is a large and complex beast. There's a lot to learn before one can even begin to use the system, much less become meaningfully adept at doing so. While Hadoop has stellar documentation, not everyone is comfortable diving right into the menagerie of Mappers, Reducers, Shuffles, and so on. For that crowd, we've taken a different route, in the form of a story.
=== Chimpanzee and Elephant Start a Business ===
A few years back, two friends -- JT, a gruff silverback chimpanzee, and Nanette, a meticulous matriarch elephant -- decided to start a business. As you know, Chimpanzees love nothing more than sitting at keyboards processing and generating text. Elephants have a prodigious ability to store and recall information, and will carry huge amounts of cargo with great determination. This combination of skills impressed a local publishing company enough to earn their first contract, so Chimpanzee and Elephant Corporation (C&E Corp for short) was born.
-The publishing firm’s project was to translate the works of Shakespeare into every language known to man, so JT and Nanette devised the following scheme. Their crew set up a large number of cubicles, each with one elephant-sized desk and one or more chimp-sized desks, and a command center where JT and Nanette can coordinate the action. As with any high-scale system, each member of the team has a single responsibility to perform. The task of a chimpanzee is simply to read a set of passages, and type out the corresponding text in a new language. The cubicle's librarian elephant maintains a neat set of scrolls, according to a scheme Nanette devised, with each scroll holding a passage to translate or some passage's translated result.
+The publishing firm’s project was to translate the works of Shakespeare into every language known to man, so JT and Nanette devised the following scheme. Their crew set up a large number of cubicles, each with one elephant-sized desk and one or more chimp-sized desks, and a command center where JT and Nanette can coordinate the action.
+
+As with any high-scale system, each member of the team has a single responsibility to perform. The task of each chimpanzee is simply to read a set of passages and type out the corresponding text in a new language. JT, their foreman, efficiently assigns passages to chimpanzees, deals with absentee workers and sick days, and reports progress back to the customer. The task of each librarian elephant is to maintain a neat set of scrolls, holding either a passage to translate or some passage's translated result. Nanette serves as chief librarian. She keeps a card catalog listing, for every book, the location and essential characteristics of the various scrolls that maintain its contents.
-JT acts as foreman for the chimpanzees. When each worker clocks in for the day, they check with JT, who hands off the day's translation manual and the name of a passage to translate. Throughout the day, as each chimp completes their assigned passage, they radio in to JT, who names the next passage to translate. Nanette, meanwhile, serves as chief librarian. She keeps a card catalog that lists, for every book, the location and essential characteristics of the various scrolls that maintain its contents.
+When workers clock in for the day, they check with JT, who hands off the day's translation manual and the name of a passage to translate. Throughout the day the chimps radio progress reports in to JT; if their assigned passage is complete, JT will specify the next passage to translate.
-JT and Nanette work wonderfully together -- JT rambunctiously barking orders, Nanette peacefully gardening her card catalog -- and subtly improve the efficiency of their team in a variety of ways. We'll look closely at their bag of tricks later in the book (TODO ref) but here are two. The most striking thing any visitor to the worksite will notice is how _calm_ everything is. One reason for this is Nanette's filing scheme, which designates each book passage to be stored by multiple elephants. Nanette quietly advises JT of each passage's location, allowing him to almost always assign his chimpanzees a passage held by the librarian in their cubicle. In turn, when an elephant receives a freshly-translated scroll, she makes two photocopies and dispatches them to two other cubicles. The hallways contain a stately parade of pygmy elephants, each carrying an efficient load; the only traffic consists of photocopied scrolls to store and the occasional non-cubicle-local assignment.
+If you were to walk by a cubicle mid-workday, you would see a highly-efficient interplay between chimpanzee and elephant, ensuring the expert translators rarely had a wasted moment. As soon as JT radios back what passage to translate next, the elephant hands it across. The chimpanzee types up the translation on a new scroll, hands it back to its librarian partner and radios for the next passage. The librarian runs the scroll through a fax machine to send it to two of its counterparts at other cubicles, producing the redundant copies Nanette's scheme requires.
-The other source of calm is on the part of their clients, who know that when Nanette's on the job, their archives are safe -- the words of Shakespeare will retain their eternal form footnote:[When Nanette is not on the job, it's a total meltdown -- a story for much later in the book. But you'd be wise to always take extremely good care of the Nanettes in your life.] To ensure that no passage is never lost, the librarians on Nanette's team send regular reports on the scrolls they maintain. If ever an elephant doesn't report in (whether it stepped out for an hour or left permanently), Nanette identifies the scrolls designated for that elephant and commissions the various librarians who hold other replicas of that scroll to make and dispatch fresh copies. Each scroll also bears a check of authenticity validating that photocopying, transferring its contents or even mouldering on the shelf has caused no loss of fidelity. Her librarians regularly recalculate those checks and include them in their reports, so if even a single letter on a scroll has been altered, Nanette can commission a new replica at once.
+The fact that each chimpanzee's work is independent of any other's -- no interoffice memos, no meetings, no requests for documents from other departments -- made this the perfect first contract for the Chimpanzee & Elephant, Inc. crew. JT and Nanette, however, were cooking up a new way to put their million-chimp army to work, one that could radically streamline the processes of any modern paperful office footnote:[Some chimpanzee philosophers have put forth the fanciful conceit of a "paper-less" office, requiring impossibilities like a sea of electrons that do the work of a chimpanzee, and disks of magnetized iron that would serve as scrolls. These ideas are, of course, pure lunacy -- right up there with the foolish proposal one could harness superheated water or controlled explosions of marsh gas to replace the motive power of an Elephant!]. JT and Nanette would soon have the chance of a lifetime to try it out for a customer in the far north with a big, big problem.
=== Map-only Jobs: Process Records Individually ===
-We might not be as clever as JT's multilingual chimpanzees, but even we can translate text into Igpay Atinlay footnote:[aka Pig Latin. Since Pig Latin is also TODO: make sure igpay is the standard throughout book]. For the unfamiliar, here's how to http://en.wikipedia.org/wiki/Pig_latin#Rules[translate standard English into Igpay Atinlay]:
+Having read that short allegory, you've just learned a lot about how Hadoop operates under the hood. We can now use it to walk you through some examples. This first example uses only the _Map_ phase of MapReduce, to take advantage of what some people call an "embarrassingly parallel" problem.
+
+We might not be as clever as JT's multilingual chimpanzees, but even we can translate text into a language we'll call _Igpay Atinlay_ footnote:[Sharp-eyed readers will note that this language is really called _Pig Latin._ That term has another meaning in the Hadoop universe, though, so we've chosen to call it Igpay Atinlay -- Pig Latinizing the term "Pig Latin" -- for this example.]. For the unfamiliar, here's how to http://en.wikipedia.org/wiki/Pig_latin#Rules[translate standard English into Igpay Atinlay]:
* If the word begins with a consonant-sounding letter or letters, move them to the end of the word adding "ay": "happy" becomes "appy-hay", "chimp" becomes "imp-chay" and "yes" becomes "es-yay".
* In words that begin with a vowel, just append the syllable "way": "another" becomes "another-way", "elephant" becomes "elephant-way".
<<pig_latin_translator>> is our first Hadoop job, a program that translates plain text files into Igpay Atinlay. It's written in Wukong, a simple library to rapidly develop big data analyses. Like the chimpanzees, it is single-concern: there's nothing in there about loading files, parallelism, network sockets or anything else. Yet you can run it over a text file from the commandline -- or run it over petabytes on a cluster (should you for whatever reason have a petabyte of text crying out for pig-latinizing).
-// TODO: Is there a way to do this without regular expressions? eg tokenize then use matching regexp not gsub
-
[[pig_latin_translator]]
.Igpay Atinlay translator, actual version
----
- CONSONANTS = "bcdfghjklmnpqrstvwxz"
- UPPERCASE_RE = /[A-Z]/
- PIG_LATIN_RE = %r{
- \b # word boundary
- ([#{CONSONANTS}]*) # all initial consonants
- ([\w\']+) # remaining wordlike characters
- }xi
-
- each_line do |line|
- latinized = line.gsub(PIG_LATIN_RE) do
- head, tail = [$1, $2]
- head = 'w' if head.blank?
- tail.capitalize! if head =~ UPPERCASE_RE
- "#{tail}-#{head.downcase}ay"
- end
- yield(latinized)
- end
+ CONSONANTS = "bcdfghjklmnpqrstvwxz"
+ UPPERCASE_RE = /[A-Z]/
+ PIG_LATIN_RE = %r{
+ \b # word boundary
+ ([#{CONSONANTS}]*) # all initial consonants
+ ([\w\']+) # remaining wordlike characters
+ }xi
+
+ each_line do |line|
+ latinized = line.gsub(PIG_LATIN_RE) do
+ head, tail = [$1, $2]
+ head = 'w' if head.blank?
+ tail.capitalize! if head =~ UPPERCASE_RE
+ "#{tail}-#{head.downcase}ay"
+ end
+ yield(latinized)
+ end
----
[[pig_latin_translator_pseudocode]]
.Igpay Atinlay translator, pseudocode
----
- for each line,
- recognize each word in the line and change it as follows:
- separate the head consonants (if any) from the tail of the word
+ for each line,
+ recognize each word in the line and change it as follows:
+ separate the head consonants (if any) from the tail of the word
if there were no initial consonants, use 'w' as the head
- give the tail the same capitalization as the word
- change the word to "{tail}-#{head}ay"
- end
- emit the latinized version of the line
- end
+ give the tail the same capitalization as the word
+ change the word to "{tail}-#{head}ay"
+ end
+ emit the latinized version of the line
+ end
----
.Ruby helper
@@ -85,54 +75,160 @@ We might not be as clever as JT's multilingual chimpanzees, but even we can tran
It's best to begin developing jobs locally on a subset of data. Run your Wukong script directly from your terminal's commandline:
- wu-local examples/text/pig_latin.rb data/magi.txt -
+ wu-local examples/text/pig_latin.rb data/magi.txt -
The `-` at the end tells wukong to send its results to standard out (STDOUT) rather than a file -- you can pipe its output into other unix commands or Wukong scripts. In this case, there is no consumer and so the output should appear on your terminal screen. The last line should read:
- Everywhere-way ey-thay are-way isest-way. Ey-thay are-way e-thay agi-may.
+ Everywhere-way ey-thay are-way isest-way. Ey-thay are-way e-thay agi-may.
That's what it looks like when a `cat` is feeding the program data; let's see how it works when an elephant sets the pace.
-==== Transfer Data to the Cluster ====
+[NOTE]
+.Are you running on a cluster?
+====
+
+If you've skimmed Hadoop's documentation already, you've probably seen the terms _fully-distributed,_ _pseudo-distributed,_ and _local,_ bandied about. Those describe different ways to set up your Hadoop cluster, and they're relevant to how you'll run the examples in this chapter.
-TODO: clarify that if you don't have a cluster, you can (a) make your machine use pseudo-distributed mode, or (b) skip this part as the only thing it tells you about is how to use a cluster.
+In short: if you're running the examples on your laptop during a long-haul flight, you're likely running in local mode. That means all of the computation work takes place on your machine, and all of your data sits on your local filesystem.
-_Note: this assumes you have a working Hadoop installation, however large or small, running in distributed mode. Appendix 1 (TODO REF) lists resources for acquiring one._
+On the other hand, if you have access to a cluster, your jobs run in fully-distributed mode. All the work is farmed out to the cluster machines, and your data sits in the cluster's distributed filesystem, HDFS. (Pseudo-distributed mode splits the difference: it runs all the Hadoop daemons on a single machine, so your laptop can mimic a cluster.)
-Hadoop jobs run best reading data from the Hadoop Distributed File System (HDFS). To copy the data onto the cluster, run these lines:
+Run the following commands to copy your data to HDFS:
- hadoop fs -mkdir ./data
- hadoop fs -put wukong_example_data/text ./data/
+ hadoop fs -mkdir ./data
+ hadoop fs -put wukong_example_data/text ./data/
These commands understand `./data/text` to be a path on the HDFS, not your local disk; the dot `.` is treated as your HDFS home directory (use it as you would `~` in Unix). The `hadoop fs -put` command, which copies a list of local paths to the HDFS, treats its final argument as an HDFS path by default, and all the preceding paths as local.
-==== Run the Job on the Cluster ====
+
+(Note: if you don't have access to a Hadoop cluster, Appendix 1 (REF) lists resources for acquiring one.)
+
+
+
+==== Run the Job ====
First, let's test on the same tiny little file we used at the commandline.
// Make sure to notice how much _longer_ it takes this elephant to squash a flea than it took to run without Hadoop.
- wukong launch examples/text/pig_latin.rb ./data/text/magi.txt ./output/latinized_magi
+ wukong launch examples/text/pig_latin.rb ./data/text/magi.txt ./output/latinized_magi
+
+CODE: something about what the reader can expect to see on screen
+
+While the script outputs a bunch of happy robot-ese to your screen, open up the jobtracker in your browser window (see the sidebar REF). The job should appear on the jobtracker window within a few seconds -- likely in more time than the whole job took to complete.
+
+----
+SIDEBAR: The Jobtracker Console
+
+When you are running on a distributed Hadoop cluster, the jobtracker offers a built-in console for monitoring and diagnosing jobs. You typically access it by pointing your web browser at 'http://hostname.of.jobtracker:50030' (replace "hostname.of.jobtracker" with, you know, the hostname of your jobtracker; if the jobtracker is running on your local machine, use 'http://localhost:50030'). At the top, the cluster summary shows how many jobs are running and how many worker slots exist. Clicking on the hyperlinked value under "nodes" will take you to a screen summarizing all the task trackers in the cluster. Keep an eye out for other such hyperlinked values within the jobtracker console -- they lead to screens describing those elements in detail.
+
+Further down the page, you will see sections for running, completed and failed jobs. The last part of each job's name is an index showing the order in which it was received; that index also appears in the flurry of messages emitted when you launched your job.
+
+Clicking on the job name will take you to a page summarizing that job. We will walk through the page from the bottom up, because that is the best way to understand the job's progress. The very bottom of the page contains a fun pair of bar charts showing the progress of each Map and Reduce task, from zero to 100-percent complete. A healthy job with many tasks should look like the screenshots below (TODO: Screenshot). During the Map phase, you should see a rolling wave of bars: completed tasks at 100 percent on the left, pending tasks at zero percent on the right, and a wavefront of running tasks that advance smoothly from zero to 100 percent at a fairly uniform pace. During the Reduce phase, you should see the full set of bars advancing up the page at nearly the same rate. Each bar has three sections (we will learn more about what they mean later; for now it is enough to know how they should behave).
+
+You should observe slow progress through the shuffle stage beginning part way through the Map phase of the job and steady progress at a slightly higher pace as soon as the Map phase concludes. Unless you are heavily burdening your Reducers, the graph should walk right through the Sort stage and begin making steady uniform progress through the final Reduce step.
+
+(TODO: Check that shuffle progress is displayed as non-0 during Map phase).
-TODO: something about what the reader can expect to see on screen
+The job is not completely finished when the last Reducer hits 100 percent -- there remains a Commit phase with minor bookkeeping chores, and replication of the output data -- but the delay from end of Reduce to successful job completion should be small.
+
+The main thing to watch for in the Reduce phase is rapid progress by most of your Reducers and painfully slow progress by a few of them -- the "skewed reducer" problem. Because of either a simple mistake on your part or a deep challenge resulting from the structure of your data, Hadoop has sent nearly all the records to those few machines. Those simple mistakes are described in Chapter (REF); defense against highly-skewed data is, in a sense, the motivation for most of the methods presented in the middle section of the book.
+
+Do not put too much faith in the "percent complete" numbers for the job as a whole, or even for its individual tasks. These really only show the fraction of data processed, which is an imperfect indicator of progress and harder to determine than you might think. Among other peccadilloes, some compressed input formats report no progress mid-task; they linger at zero, then go straight to 100 percent.
+
+Above the job progress bar graphs is a hot mess of a table showing all sorts of metrics about your job, such as how much data was read and written at each phase. We will run down the important ones a bit later in the book (REF).
+
+Above that section is a smaller table giving the count of pending, running, complete, killed and failed tasks. Most of the values in that table are hyperlinks that begin the annoyingly long trip required to see your logs. Clicking on the completed tasks number takes you to a screen listing all the tasks; clicking on a task ID takes you to a screen listing the machine or machines running it. Clicking on the attempt ID shows a page describing that machine's progress through the task; and all the way on the right side of the table on that page, you will find three sets of links reading "4KB," "8KB," "All." (TECH: check details). Those links lead, at long last, to the log files for that job on that machine. The "All" link shows you the full contents, but if your job is so screwed up that the log file would flood your browser, the "8KB" link shows the truncated tale.
+
+Lastly, near the top of the page is a long URL that ends with "job.xml". Do not go there now; it is a monstrous file listing every single configuration value set by your job, but keep it in mind for when you have run out of ideas as to why a job is failing.
+
+----
+
+...
+
+----
+SIDEBAR: How a job is born, the thumbnail version
+
+Apart from one important detail, the mechanics of how a job is born should never become interesting to a Hadoop user. But since some people's brains won't really believe that the thing actually works unless we dispel some of the mystery, here's a brief synopsis.
+
+When you run `wukong ...` or `pig ...` or otherwise launch a Hadoop job, your local program contacts the jobtracker to transfer information about the job and the Java `.jar` file each worker should execute. If the input comes from an HDFS, the jobtracker (TECH: ?job client?) consults its namenode for details about the input blocks, figures out a job ID and any other remaining configuration settings, and accepts the job. It replies to your job client with the job ID and other information, leading to the happy mess of log messages your program emits. (TECH: check the details on this) Your local program continues to run during the full course of the job so that you can see its progress, but is now irrelevant -- logging out or killing the local program has no impact on the job's success.
+
+As you have gathered, each Hadoop worker runs a tasktracker daemon to coordinate the tasks run by that machine. Like JT's chimpanzees, those tasktrackers periodically report progress to the jobtracker, requesting new work whenever there is an idle slot. The jobtracker never makes outward contact with a tasktracker -- this ensures work is only distributed to healthy machines at a rate they can handle. Just as JT strives to ensure that chimpanzees are only assigned passages held by their cubicle mates, the jobtracker's scheduler strives to assign each map attempt to a machine that holds its input blocks (known as a "mapper-local" task). But if too many blocks of a file hotspot on a small number of datanodes, mapper slots with no remaining mapper-local blocks to handle still receive task attempts, and simply pull in their input data over the network.
+
+The one important detail to learn in all this is that _task trackers do not run your job, they only launch it_. Your job executes in a completely independent child process with its own Java settings and library dependencies. In fact, if you are using Hadoop streaming programs like Wukong, your job runs in yet another process of its own, spawned by that Java child process. We've seen people increase the tasktracker memory sizes thinking it will improve cluster performance -- the only impact of doing so is to increase the likelihood of out-of-memory errors.
+----
-While the script outputs a bunch of happy robot-ese to your screen, open up the jobtracker in your browser window by visiting `http://hostname_of_jobtracker:50030`. The job should appear on the jobtracker window within a few seconds -- likely in more time than the whole job took to complete. You will see (TODO describe jobtracker job overview).
You can compare its output to the earlier run by running
- hadoop fs -cat ./output/latinized_magi/\*
+ hadoop fs -cat ./output/latinized_magi/\*
That command, like the Unix `cat` command, dumps the contents of a file to standard out, so you can pipe it into any other command-line utility. It produces the full contents of the file, which is what you would like for use within scripts; but if your file is hundreds of MB large, as HDFS files typically are, dumping its entire contents to your terminal screen is ill appreciated. Instead, we typically use the Unix `head` command to limit its output (in this case, to the first ten lines).
- hadoop fs -cat ./output/latinized_magi/\* | head -n 10
+ hadoop fs -cat ./output/latinized_magi/\* | head -n 10
Since you wouldn't want to read a whole 10GB file just to see whether the right number of closing braces come at the end, there is also a `hadoop fs -tail` command that dumps the final kilobyte of a file.
Here's what the head and tail of your output should contain:
- TODO screenshot of hadoop fs -cat ./output/latinized_magi/\* | head -n 10
- TODO screenshot of hadoop fs -tail ./output/latinized_magi/\*
+ TODO screenshot of hadoop fs -cat ./output/latinized_magi/\* | head -n 10
+ TODO screenshot of hadoop fs -tail ./output/latinized_magi/\*
+
+
+=== Grouping and Sorting: Analyzing UFO Sightings with Pig
+
+While those embarrassingly parallel, Map-only jobs are useful, Hadoop also shines when it comes to filtering, grouping, and counting items in a dataset. We can apply these techniques to build a travel guide of UFO sightings across the continental US.
+
+Whereas our last example used the Wukong framework, this time around we'll use another Hadoop abstraction, called Pig. footnote:[http://pig.apache.org] Pig's claim to fame is that it gives you full Hadoop power, using a syntax that lets you think in terms of data flow instead of pure Map and Reduce operations.
+
+The example data that accompanies the book includes a data set from the http://www.infochimps.com/datasets/60000-documented-ufo-sightings-with-text-descriptions-and-metada[National UFO Reporting Center], containing more than 60,000 documented UFO sightings footnote:[Although sixty thousand records are too small to justify Hadoop on their own, they're the perfect size to learn with.].
-=== Map/Reduce
+It's sad to say, but many of the sighting reports are likely to be bogus. To eliminate sightings that lack a detailed description, we can filter out records whose description field is shorter than 80 characters:
+
+
+----
+
+TODO code
+
+----
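Until that script is written, here is a hedged sketch of the filtering rule in plain Ruby. It is illustrative only: we assume tab-separated records with the free-text description as the last field, and the 80-character threshold from the text.

```ruby
# Illustrative sketch, not the book's final code: keep only sightings
# whose description (assumed to be the last tab-separated field) is at
# least min_len characters long.
def detailed_sightings(lines, min_len = 80)
  lines.select do |line|
    description = line.chomp.split("\t").last.to_s
    description.length >= min_len
  end
end
```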
+
+
+
+A key activity in a Big Data exploration is summarizing big datasets into comprehensible smaller ones. Each sighting has a field giving the shape of the flying object: cigar, disk, etc. This script will tell us how many sightings there are for each craft type:
+
+
+
+----
+
+sightings    = LOAD 'data/ufo_sightings.tsv'              -- path and schema are illustrative
+                 AS (sighted_at, location, shape, description);
+
+cf_sightings = GROUP sightings BY shape;                   -- 'shape' holds the craft type
+
+cf_counts    = FOREACH cf_sightings
+                 GENERATE group AS shape, COUNT_STAR(sightings) AS num_sightings;
+
+STORE cf_counts INTO 'out/geo/ufo_sightings/craft_type_counts';
+
+----
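To make the group-and-count dataflow concrete, here is a small local Ruby analogue of the same operation; the `:shape` field name is an assumption for illustration.

```ruby
# Local Ruby analogue of GROUP ... BY shape followed by a count:
# group the sightings by craft type, then tally each group's size.
def craft_type_counts(sightings)
  sightings.group_by { |s| s[:shape] }
           .map { |shape, group| [shape, group.size] }
           .to_h
end
```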
+
+We can make a little travel guide for the sightings by amending each sighting with the Wikipedia article about its place. The JOIN operator matches records from different tables based on a common key:
+
+----
+
+TODO pseudocode
+
+----
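Until the pseudocode lands, the JOIN's behavior can be imitated in a few lines of Ruby. The `:place` and `:title` field names are hypothetical; the point is the shape of the operation: index one table by the common key, then match each record of the other table against it.

```ruby
# Hedged sketch of a join on a shared :place key: build a lookup table
# from the articles, then amend each sighting with its matching article.
def join_on_place(sightings, articles)
  articles_by_place = articles.group_by { |a| a[:place] }
  sightings.flat_map do |s|
    (articles_by_place[s[:place]] || []).map { |a| s.merge(article: a[:title]) }
  end
end
```

Like Pig's JOIN, this is an inner join: sightings with no matching article drop out of the result.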
+
+
+
+This yields the following output:
+
+----
+
+TODO output
+
+----
+
+
+This travel guide is a bit lame, but of course we can come up with all sorts of ways to improve it. For instance, a proper guide would hold not just the one article about the general location, but a set of prominent nearby places of interest. These notions crop up in many different problems, so later in the book we'll show you how to do a nearby-ness query (in the Geodata chapter (REF)), and how to attach a notion of "prominence" (in the event log chapter (REF)), and much more.
As a demonstration, let's find out when aliens like to visit planet Earth. Here is a Wukong script that processes the UFO dataset to find the aggregate number of sightings per month:
@@ -142,7 +238,7 @@ MAPPER EXTRACTS MONTHS, EMITS MONTH AS KEY WITH NO VALUE
COUNTING REDUCER INCREMENTS ON EACH ENTRY IN GROUP AND EMITS TOTAL IN FINALIZED METHOD
----
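The mapper/reducer pseudocode above can be imitated locally in a few lines of Ruby (a sketch, not the actual Wukong script; we assume each record begins with an ISO date such as "1999-07-04"):

```ruby
# Mapper: emit the month ("YYYY-MM") as the key for each record.
# Shuffle: group records sharing a key.
# Reducer: count the entries in each group.
def monthly_visit_counts(records)
  keyed  = records.map { |rec| rec[0, 7] }
  groups = keyed.group_by { |month| month }
  groups.map { |month, entries| [month, entries.size] }.to_h
end
```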
-To run the Wukong job, go into the (TODO: REF) directory and run
+To run the Wukong job, go into the (REF) directory and run
----
wu-run monthly_visit_counts.rb --reducers_count=1 /data_UFO_sightings.tsv /dataresults monthly_visit_counts-wu.tsv
@@ -150,7 +246,7 @@ wu-run monthly_visit_counts.rb --reducers_count=1 /data_UFO_sightings.tsv /datar
The output shows (TODO:CODE: INSERT CONCLUSIONS).
-==== Wikipedia Visitor Counts
+=== Understanding Progress and Results: Wikipedia Visitor Counts
Let’s put Pig to a sterner test. Here’s the script above, modified to run on the much-larger Wikipedia dataset and to assemble counts by hour, not month:
@@ -158,19 +254,22 @@ EDIT TODO modified script
==== See Progress and Results
+Until now, we've run small jobs so you could focus on learning. Hadoop is built for big jobs, though, and it's important to understand how work flows through the system.
+
+
EDIT Wikipedia visitor counts, summing values -- not weather, not articles
Now let's run it on a corpus large enough to show off the power of distributed computing. Shakespeare's combined works are too small -- at (TODO find size) even the prolific bard's lifetime of work won't make Hadoop break a sweat. Luckily, we've had a good slice of humanity typing thoughts into Wikipedia for several years, and the corpus containing every single Wikipedia article is enough to warrant Hadoop's power (and tsoris footnote:[trouble and suffering]).
- wukong launch examples/text/pig_latin.rb ./data/text/wikipedia/wp_articles ./output/latinized_wikipedia
+ wukong launch examples/text/pig_latin.rb ./data/text/wikipedia/wp_articles ./output/latinized_wikipedia
TODO screenshot of output, and fix up filenames
-This job will take quite a bit longer to run, giving us a chance to demonstrate how to monitor its progress. (If your cluster is so burly the job finishes in under a minute or so, quit bragging and supply enough duplicate copies of the input to grant you time.) In the center of the Job Tracker’s view of your job, there is a table listing, for Mappers and Reducers, the number of tasks pending (waiting to be run), running, complete, killed (terminated purposefully not by error) and failed (terminated due to failure).
+Visit the jobtracker console (see sidebar REF). The first thing you'll notice is how much slower this runs! That gives us a chance to demonstrate how to monitor its progress. (If your cluster is so burly the job finishes in under a minute or so, quit bragging and supply enough duplicate copies of the input to grant you time.) In the center of the Job Tracker’s view of your job you will find a table listing the status of Map and Reduce tasks: the number pending (waiting to be run), running, complete, killed (terminated purposefully, not by error) and failed (terminated due to failure).
-The most important numbers to note are the number of running tasks (there should be some unless your job is finished or the cluster is congested) and the number of failed tasks (for a healthy job on a healthy cluster, there should never be any). Don't worry about killed tasks; for reasons we'll explain later on, it's OK if a few appear late in a job. We will describe what to do when there are failing attempts later in the section on debugging Hadoop jobs (TODO: REF), but in this case, there shouldn't be any. Clicking on the number of running Map tasks will take you to a window that lists all running attempts (and similarly for the other categories). On the completed tasks listing, note how long each attempt took; for the Amazon M3.xlarge machines we used, each attempt took about x seconds (TODO: correct time and machine size). There is a lot of information here, so we will pick this back up in chapter (TODO ref), but the most important indicator is that your attempts complete in a uniform and reasonable length of time. There could be good reasons why you might find task 00001 to still be running after five minutes while other attempts have been finishing in ten seconds, but if that's not what you thought would happen you should dig deeper footnote:[A good reason is that task 00001's input file was compressed in a non-splittable format and is 40 times larger than the rest of the files. A bad reason is that task 00001 is trying to read from a failing-but-not-failed datanode, or has a corrupted record that is sending the XML parser into recursive hell. The good reasons you can always predict from the data itself; otherwise assume it's a bad reason].
+The most important numbers to note are the number of running tasks (there should be some unless your job is finished or the cluster is congested) and the number of failed tasks (for a healthy job on a healthy cluster, there should never be any). Don't worry about killed tasks; for reasons we'll explain later on, it's OK if a few appear late in a job. We will describe what to do when there are failing attempts later in the section on debugging Hadoop jobs (REF), but in this case, there shouldn't be any. Clicking on the number of running Map tasks will take you to a window that lists all running attempts (and similarly for the other categories). On the completed tasks listing, note how long each attempt took; for the Amazon M3.xlarge machines we used, each attempt took about x seconds (TODO: correct time and machine size). There is a lot of information here, so we will pick this back up in chapter (REF), but the most important indicator is that your attempts complete in a uniform and reasonable length of time. There could be good reasons why you might find task 00001 to still be running after five minutes while other attempts have been finishing in ten seconds, but if that's not what you thought would happen you should dig deeper footnote:[A good reason is that task 00001's input file was compressed in a non-splittable format and is 40 times larger than the rest of the files. A bad reason is that task 00001 is trying to read from a failing-but-not-failed datanode, or has a corrupted record that is sending the XML parser into recursive hell. The good reasons you can always predict from the data itself; otherwise assume it's a bad reason].
-You should get in the habit of sanity-checking the number of tasks and the input and output sizes at each job phase for the jobs you write. In this case, the job should ultimately require x Map tasks, no Reduce tasks and on our x machine cluster, it completed in x minutes. For this input, there should be one Map task per HDFS block, x GB of input with the typical one-eighth GB block size, means there should be 8x Map tasks. Sanity checking the figure will help you flag cases where you ran on all the data rather than the one little slice you intended or vice versa; to cases where the data is organized inefficiently; or to deeper reasons that will require you to flip ahead to chapter (TODO: REF).
+You should get in the habit of sanity-checking the number of tasks and the input and output sizes at each job phase for the jobs you write. In this case, the job should ultimately require x Map tasks, no Reduce tasks and on our x machine cluster, it completed in x minutes. For this input there should be one Map task per HDFS block; x GB of input with the typical one-eighth-GB block size means there should be 8x Map tasks. Sanity checking the figure will help you flag cases where you ran on all the data rather than the one little slice you intended, or vice versa; alert you to cases where the data is organized inefficiently; or point to deeper reasons that will require you to flip ahead to chapter (REF).
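The arithmetic behind that sanity check is simple enough to script. Here is a minimal sketch (the input size and block size below are illustrative assumptions, not figures from the cluster described in the text):

```python
import math

def expected_map_tasks(input_bytes, block_bytes=128 * 1024 * 1024):
    """One Map task per HDFS block: round the input size up to whole blocks."""
    return math.ceil(input_bytes / block_bytes)

# 10 GB of input with the typical one-eighth-GB (128 MB) block size:
print(expected_map_tasks(10 * 1024**3))  # 80 map tasks
```

If the Job Tracker reports a wildly different task count than this back-of-the-envelope figure, that is your cue to dig into how the input is organized.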
Annoyingly, the Job view does not directly display the Mapper input data, only the cumulative quantity of data per source, which is not always an exact match. Still, the figure for HDFS bytes read should closely match the size given by `hadoop fs -du` (TODO: add paths to command).
@@ -178,20 +277,19 @@ You can also estimate how large the output should be, using the "Gift of the Mag
We cannot stress enough how important it is to validate that your scripts are doing what you think they are. The whole problem of Big Data is that it is impossible to see your data in its totality. You can spot-check your data, and you should, but without independent validations like these you're vulnerable to a whole class of common defects. This habit -- of validating your prediction of the job’s execution -- is not a crutch offered to the beginner, unsure of what will occur; it is a best practice, observed most diligently by the expert, and one every practitioner should adopt.
-
=== The Map Phase Processes Records Individually
-The Map phase receives 0, 1 or many records individually, with no guarantees from Hadoop about their numbering, order or allocation. (FOOTNOTE: In special cases, you may know that your input bears additional guarantees -- for example, the MERGE/JOIN described in Chapter (TODO: REF) requires its inputs to be in total sorted order. It is on you, however, to enforce and leverage those special properties.) Hadoop does guarantee that every record arrives in whole to exactly one Map task and that the job will only succeed if every record is processed without error.
+The Map phase receives 0, 1 or many records individually, with no guarantees from Hadoop about their numbering, order or allocation. (FOOTNOTE: In special cases, you may know that your input bears additional guarantees -- for example, the MERGE/JOIN described in Chapter (REF) requires its inputs to be in total sorted order. It is on you, however, to enforce and leverage those special properties.) Hadoop does guarantee that every record arrives in whole to exactly one Map task and that the job will only succeed if every record is processed without error.
The Mapper receives those records sequentially -- it must fully process one before it receives the next -- and can emit 0, 1 or many inputs of any shape or size. The chimpanzees working on the SantaCorp project received letters but dispatched toy forms. Julia's thoughtful note produced two toy forms, one for her doll and one for Joe's robot, while the spam letter produced no toy forms. Hadoop's 'distcp' utility, used to copy data from cluster to cluster, takes this to a useful extreme: Each Mapper's input is a remote file to fetch. Its action is to write that file's contents directly to the HDFS as a Datanode client and its output is a summary of what it transferred.
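The "0, 1 or many outputs per record" contract is easy to picture in code. Here is a hedged sketch of a mapper in the spirit of the toy-form example; the record layout and helper names (`is_spam`, `toy_form`, the `wishes` field) are hypothetical, invented just for illustration:

```python
def mapper(letter):
    """Emit zero, one or many toy forms for a single letter record."""
    if is_spam(letter):
        return []                                        # spam letter: no output
    return [toy_form(toy) for toy in letter["wishes"]]   # one output per wish

def is_spam(letter):
    return not letter.get("wishes")

def toy_form(toy):
    return {"toy": toy}

# Julia's thoughtful note yields two toy forms; a spam letter yields none:
print(mapper({"from": "Julia", "wishes": ["doll", "robot"]}))
print(mapper({"from": "spammer", "wishes": []}))
```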
-The right way to bring in data from an external resource is by creating a custom loader or input format (see the chapter on Advanced Pig (TODO: REF)), which decouples loading data from processing data and allows Hadoop to intelligently manage tasks. The poor-man's version of a custom loader, useful for one-offs, is to prepare a small number of file names, URLs, database queries or other external handles as input and emit the corresponding contents.
+The right way to bring in data from an external resource is by creating a custom loader or input format (see the chapter on Advanced Pig (REF)), which decouples loading data from processing data and allows Hadoop to intelligently manage tasks. The poor-man's version of a custom loader, useful for one-offs, is to prepare a small number of file names, URLs, database queries or other external handles as input and emit the corresponding contents.
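That poor-man's loader can be sketched in a few lines: the mapper's input records are file paths, and its outputs are the corresponding contents. This is a simplified illustration, not Hadoop's actual streaming machinery:

```python
import sys

def poor_mans_loader(lines=sys.stdin):
    """Each input record is a file path; emit (path, contents) pairs."""
    for line in lines:
        path = line.strip()
        if not path:
            continue                      # skip blank records
        with open(path) as handle:
            yield path, handle.read()
```

Run under Hadoop Streaming, each Map task would receive a small slice of the path list and fetch only those files, which is exactly the division of labor 'distcp' takes to its useful extreme.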
-Please be aware, however, that it is only appropriate to access external resources from within a Hadoop job in exceptionally rare cases. Hadoop processes data in batches, which means failure of a single record results in the retry of the entire batch. It also means that when the remote resource is unavailable or responding sluggishly, Hadoop will spend several minutes and unacceptably many retries before abandoning the effort. Lastly, Hadoop is designed to drive every system resource at its disposal to its performance limit. (FOOTNOTE: We will drive this point home in the chapter on Event Log Processing (TODO: REF), where we will stress test a web server to its performance limit by replaying its request logs at full speed.)
+Please be aware, however, that it is only appropriate to access external resources from within a Hadoop job in exceptionally rare cases. Hadoop processes data in batches, which means failure of a single record results in the retry of the entire batch. It also means that when the remote resource is unavailable or responding sluggishly, Hadoop will spend several minutes and unacceptably many retries before abandoning the effort. Lastly, Hadoop is designed to drive every system resource at its disposal to its performance limit. (FOOTNOTE: We will drive this point home in the chapter on Event Log Processing (REF), where we will stress test a web server to its performance limit by replaying its request logs at full speed.)
While a haiku with only its first line is no longer a haiku, a Hadoop job with only a Mapper is a perfectly acceptable Hadoop job, as you saw in the Igpay Atinlay translation example. In such cases, each Map Task's output is written directly to the HDFS, one file per Map Task, as you've seen. Such jobs are only suitable, however, for so-called "embarrassingly parallel problems" -- where each record can be processed on its own with no additional context.
-The Map stage in a Map/Reduce job has a few extra details. It is responsible for labeling the processed records for assembly into context groups. Hadoop files each record into the equivalent of the pigmy elephants' file folders: an in-memory buffer holding each record in sorted order. There are two additional wrinkles, however, beyond what the pigmy elephants provide. First, the Combiner feature lets you optimize certain special cases by preprocessing partial context groups on the Map side; we will describe these more in a later chapter (TODO: REF). Second, if the sort buffer reaches or exceeds a total count or size threshold, its contents are "spilled" to disk and subsequently merge/sorted to produce the Mapper's proper output.
+The Map stage in a Map/Reduce job has a few extra details. It is responsible for labeling the processed records for assembly into context groups. Hadoop files each record into the equivalent of the pigmy elephants' file folders: an in-memory buffer holding each record in sorted order. There are two additional wrinkles, however, beyond what the pigmy elephants provide. First, the Combiner feature lets you optimize certain special cases by preprocessing partial context groups on the Map side; we will describe these more in a later chapter (REF). Second, if the sort buffer reaches or exceeds a total count or size threshold, its contents are "spilled" to disk and subsequently merge/sorted to produce the Mapper's proper output.
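The spill behavior is worth a toy model. The sketch below models only the idea (buffer records in memory, write out a sorted run whenever a threshold is crossed); the real sort buffer is byte-oriented and far more sophisticated, and the class and names here are invented for illustration:

```python
class SpillingBuffer:
    """Toy model of the map-side sort buffer: hold labeled records in
    memory, and 'spill' a sorted run whenever the threshold is reached."""

    def __init__(self, threshold=3):
        self.threshold = threshold
        self.buffer = []
        self.spills = []            # each spill is one individually sorted run

    def add(self, key, value):
        self.buffer.append((key, value))
        if len(self.buffer) >= self.threshold:
            self.spill()

    def spill(self):
        if self.buffer:
            self.spills.append(sorted(self.buffer))
            self.buffer = []

buf = SpillingBuffer(threshold=2)
for key, value in [("b", 1), ("a", 2), ("c", 3)]:
    buf.add(key, value)
buf.spill()                         # flush whatever remains at end of input
print(buf.spills)                   # two sorted runs, ready to be merge/sorted
```

Those individually sorted runs are exactly what the midstream merge/sort, described later in this chapter, stitches back together.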
=== The HDFS: Highly Durable Storage Optimized for Analytics ===
@@ -201,7 +299,7 @@ The HDFS typically stores multiple replicas of each block (three is the universa
JT and Nanette’s workflow illustrates the second benefit of replication: being able to “move the compute to the data, not [expensively] moving the data to the compute.” Multiple replicas give the Job Tracker enough options that it can dependably assign most tasks to be “Mapper-local.”
-Like Nanette, the Namenode holds no data, only a sort of file allocation table (FAT), tracking for every file the checksum responsible Datanodes and other essential characteristics of each of its blocks. The Namenode depends on the Datanodes to report in regularly. Every three seconds, it sends a heartbeat -- a lightweight notification saying, basically, "I'm still here!". On a longer timescale, each Datanode prepares a listing of the replicas it sees on disk along with a full checksum of each replica's contents. Having the Datanode contact the Namenode is a good safeguard that it is operating regularly and with good connectivity. Conversely, the Namenode uses the heartbeat response as its opportunity to issue commands dening a struggling Datanode.
+The _Namenode_ daemon is responsible for distributing those blocks of data across the cluster. Like Nanette, the Namenode holds no data, only a sort of file allocation table (FAT), tracking for every file the checksum, the responsible Datanodes and the other essential characteristics of each of its blocks. The Namenode depends on the Datanodes to report in regularly. Every three seconds, each Datanode sends a heartbeat -- a lightweight notification saying, basically, "I'm still here!". On a longer timescale, each Datanode prepares a listing of the replicas it sees on disk along with a full checksum of each replica's contents. Having the Datanode contact the Namenode is a good safeguard that it is operating regularly and with good connectivity. Conversely, the Namenode uses the heartbeat response as its opportunity to issue commands to a struggling Datanode.
If, at any point, the Namenode finds a Datanode has not sent a heartbeat for several minutes, or if a block report shows missing or corrupted files, it will commission new copies of the affected blocks by issuing replication commands to other Datanodes as they heartbeat in.
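The bookkeeping behind that recovery loop fits in a small sketch. This is a toy model of the idea only (the timeout value and all names are illustrative, not HDFS's actual internals):

```python
class ToyNamenode:
    """Toy model of heartbeat tracking: a Datanode silent for too long is
    presumed dead, and every block it held is queued for re-replication."""

    DEAD_AFTER = 600       # seconds of silence before presuming a node dead

    def __init__(self):
        self.last_heartbeat = {}    # datanode -> last time we heard from it
        self.blocks = {}            # datanode -> set of block ids it reported

    def heartbeat(self, node, now, blocks=None):
        self.last_heartbeat[node] = now
        if blocks is not None:      # a full block report, not just a heartbeat
            self.blocks[node] = set(blocks)

    def blocks_needing_replication(self, now):
        lost = set()
        for node, seen in self.last_heartbeat.items():
            if now - seen > self.DEAD_AFTER:
                lost |= self.blocks.get(node, set())
        return lost
```

In the real system the replication commands then ride along on the surviving Datanodes' heartbeat responses, as the text describes.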
@@ -214,15 +312,14 @@ One last essential to note about the HDFS is that its contents are immutable. O
Possibly the biggest rookie mistake made by those new to Big Data is a tendency to economize on the amount of data they store; we will try to help you break that habit. You should be far more concerned with the amount of data you send over the network or to your CPU than with the amount of data you store and most of all, with the amount of time you spend deriving insight rather than acting on it. Checkpoint often, denormalize when reasonable and preserve the full provenance of your results.
We'll spend the next few chapters introducing these core operations from the ground up. Let's start by joining JT and Nannette with their next client.
-
+
=== SIDEBAR: What's Fast At High Scale
-
image::images/02-Throughput-and-Cost-for-Compute-Primitives-aka-Numbers-Every-Programmer-Should-Know.png[Throughput and Cost for Compute Primitives -- the "Numbers Every Programmer Should Know"]
image::images/02-Cost-to-Host-and-Serve-1TB.png[Cost to Host and Serve One Billion 1kB Records (1 TB)]
-The table at the right (TODO: REF) summarizes the 2013 values for Peter Norvig's http://norvig.com/21-days.html#answers["Numbers Every Programmer Should Know."] -- the length of time for each computation primitive on modern hardware. We've listed the figures several different ways: as latency (time to execute); as the number of 500-byte records that could be processed in an hour (TODO: day), if that operation were the performance bottleneck of your process; and as an amount of money to process one billion records of 500-byte each on commodity hardware. Big Data requires high volume, high throughput computing, so our principle bound is the speed at which data can be read from and stored to disk. What is remarkable is that with the current state of technology, most of the other operations are slammed to one limit or the other: either bountifully unconstraining or devastatingly slow. That lets us write down the following "rules for performance at scale:"
+The table at the right (REF) summarizes the 2013 values for Peter Norvig's http://norvig.com/21-days.html#answers["Numbers Every Programmer Should Know"] -- the length of time for each computation primitive on modern hardware. We've listed the figures several different ways: as latency (time to execute); as the number of 500-byte records that could be processed in an hour (TODO: day), if that operation were the performance bottleneck of your process; and as the amount of money to process one billion 500-byte records on commodity hardware. Big Data requires high-volume, high-throughput computing, so our principal bound is the speed at which data can be read from and stored to disk. What is remarkable is that with the current state of technology, most of the other operations are slammed to one limit or the other: either bountifully unconstraining or devastatingly slow. That lets us write down the following "rules for performance at scale:"
* High throughput programs cannot run faster than x (TODO: Insert number)
* Data can be streamed to and from disk at x GB per hour (x records per hour, y records per hour, z dollars per billion records) (TODO: insert numbers)
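Converting a throughput figure into a records-per-hour bound is one-line arithmetic, which you can sanity-check yourself. In this sketch the 100 MB/s streaming rate is an illustrative assumption, not a measured figure from the table:

```python
def records_per_hour(throughput_bytes_per_sec, record_bytes=500):
    """Records an operation can touch per hour, if it is the bottleneck."""
    return throughput_bytes_per_sec * 3600 // record_bytes

# Assuming ~100 MB/s of sustained disk streaming (illustrative):
print(records_per_hour(100 * 10**6))  # 720,000,000 records per hour
```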
@@ -246,9 +343,8 @@ Now that you know how Hadoop moves data around, you can use these rules to expla
That leaves the big cost of most Hadoop jobs: the midstream merge-sort. Spilled blocks are merged in several passes (at the Reducer and sometimes at the Mapper) as follows. Hadoop begins streaming data from each of the spills in parallel. Under the covers, what this means is that the OS is handing off the contents of each spill as blocks of memory in sequence. It is able to bring all its cleverness to bear, scheduling disk access to keep the streams continually fed as rapidly as each is consumed.
-Hadoop's actions are fairly straightforward. Since the spills are each individually sorted, at every moment the next (lowest ordered) record to emit is guaranteed to be the next unread record from one of its streams. It continues in this way, eventually merging each of its inputs into an unbroken output stream to disk. The memory requirements -- the number of parallel streams times the buffer size per stream -- are manageable and the CPU burden is effectively nil, so the merge/sort as well runs at the speed of streaming to disk.
+Hadoop's actions are fairly straightforward. Since the spills are each individually sorted, at every moment the next (lowest ordered) record to emit is guaranteed to be the next unread record from one of its streams. It continues in this way, eventually merging each of its inputs into an unbroken output stream to disk. At no point does the Hadoop framework require a significant number of seeks on disk or requests over the network; the memory requirements (the number of parallel streams times the buffer size per stream) are manageable; and the CPU burden is effectively nil, so the merge/sort as well runs at the speed of streaming to disk.
+
-At no point does the Hadoop framework require a significant number of seeks on disk or requests over the network.
-is individually sorted, the first (lowest ordered record) in the merged stream to emit is guaranteed to be the lowest ordered record in one of its input streams.
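The merge itself is the textbook k-way merge, and Python's standard library happens to have it built in, which makes for a compact illustration of the guarantee described above (the spill runs here are made-up toy data):

```python
import heapq

def merge_spills(*spills):
    """Merge individually sorted spill runs into one sorted stream.
    At every moment the next record to emit is guaranteed to be the
    smallest unread record among the input streams, so a single
    streaming pass suffices -- no seeks, tiny memory footprint."""
    return heapq.merge(*spills)

runs = [[("a", 1), ("c", 3)], [("b", 2), ("d", 4)]]
print(list(merge_spills(*runs)))
```

`heapq.merge` consumes its inputs lazily, mirroring how Hadoop keeps each spill's stream continually fed while emitting one unbroken sorted output.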
2  11a-spatial_join.asciidoc
@@ -57,7 +57,7 @@ Large-scale geodata processing in hadoop starts with the quadtile grid system, a
==== The Quadtile Grid System ====
-We'll start by adopting the simple, flat Mercator projection -- directly map longitude and latitude to (X,Y). This makes geographers cringe, because of its severe distortion at the poles, but its computational benefits are worth it. footnote:[Two guides for which map projection to choose: http://www.radicalcartography.net/?projectionref http://xkcd.com/977/ . As you proceed to finer and finer zoom levels the projection distortion becomes less and less relevant, so the simplicity of Mercator or Equirectangular is appealing.]
+We'll start by adopting the simple, flat Mercator projection -- directly map longitude and latitude to (X,Y). This makes geographers cringe, because of its severe distortion at the poles, but its computational benefits are worth it. footnote:[Two guides for which map projection to choose: http://www.radicalcartography.net/?projectionref http://xkcd.com/977/ . As you proceed to finer and finer zoom levels the projection distortion becomes less and less relevant, so the simplicity of Mercator or Equirectangular are appealing.]
Now divide the world into four and make a Z pattern across them: