Hadoop Basics

Note
This chapter and the next couple will see some reshuffling to give the following narrative flow:
  1. (first chapter)

  2. (this one) Here’s how to use Hadoop

  3. Here’s your first map/reduce jobs, and how data moves around behind the scenes

  4. Pig lets you work with whole datasets, not record-by-record

  5. The core analytic patterns of Hadoop, as Pig scripts and as Wukong map/reduce scripts

Pardon this digression-laden introduction

You can think of Hadoop’s processing model in a few ways.

The first part of this book will introduce the Hadoop toolset.

  • The centerpiece of the Big Data toolset is Hadoop, an open-source batch processing toolkit

    • This book will present several ways to think about Big Data analytics

    • Hadoop serves well as a distributed runner.

    • One such paradigm is record-oriented

Data is worthless. Actually, it’s worse than worthless: it requires money and effort to collect, store, transport and organize. Nobody wants data.

What everybody wants is insight — the patterns, connections and summarizations that lead to deeper understanding and better decisions.

  1. Process records independently into a new form

  2. Assemble records into context

  3. Synthesize context groups into new forms

  4. Write the results to a datastore or external system

    • Operations that apply locally to each partition and cause no network transfer

    • Repartitioning operations that repartition a stream but otherwise don’t change the contents (involves network transfer)

    • Aggregation operations that do network transfer as part of the operation

    • Operations on grouped streams

    • Merges and joins

data flutters by                  (process and label records)
    elephants make sturdy piles   (contextify? assemble? by label)
  context yields insight          (process context groups)

We’ll start with an application that only requires processing records independently — each record requires no context. You’ll learn the mechanics of running Hadoop jobs: how to load and retrieve data, launch your job and see its progress, and so forth. But Hadoop is useful for far more than such so-called "embarrassingly parallel" problems. The second program exhibits the full map/reduce paradigm. The program is simple, but it’s scalable: a slight modification lets it count 56,000 UFO sightings by month, or build the analogous timeseries for the X billion Wikipedia pages.

Now let’s build a high-level picture of what Hadoop is doing behind the scenes, and why this makes it scalable. (Merge sort, secondary sort)

So far we’ve seen two paradigms: distributed work, and record-oriented processing.

  • Letters to toyforms

  • Toyforms to parts forms, parts and toyforms to desks

  • Toys by type and subtype

  • Toys by crate and then address

  • Map/Reduce Paradigm

  • Elephant and Chimpanzee Save Christmas part 1

  • Elves in Crisis

  • Making Toys: Children’s letters Become Labelled Toy Forms

  • Making Toys: Toy Forms Dispatched to Workbench

  • map/reduce *

  • part 2

  • Shipping Toys: Cities are mapped to Crates

  • Shipping Toys:

  • Tracking Inventory:

  • Secondary Sort

  • part 3?

  • Tracking Inventory: *

  • Aggregation

  • Structural Operations Paradigm

  • Overview of Operation types

  • FOREACH processes records individually

  • FILTER

  • JOIN matches records in two tables

  • Use a Replicated JOIN When One Table is Small

  • GROUP with Aggregating Functions Summarize Related Records

  • GROUP and COGROUP Assemble Complex

  • After a GROUP, a FOREACH has special abilities

The harsh realities of the laws of physics and economics prevent traditional data analysis solutions such as relational databases, supercomputing and so forth from economically scaling to arbitrary-sized data for reasons very similar to Santa’s original system (see sidebar). Hadoop’s Map/Reduce paradigm does not provide complex operations, modification of existing records, fine-grain control over how the data is distributed or anything else beyond the ability to write programs that adhere to a single, tightly-constrained template. If Hadoop were a publishing medium, it would be one that refused essays, novels, sonnets and all other literary forms beyond the haiku:

data flutters by
    elephants make sturdy piles
  context yields insight

data flutters by                  (process and label records)
    elephants make sturdy piles   (contextify/assemble (?) by label)
  insight shuffles forth          (process context groups; store (?))

(TODO: insight shuffles forth?)

Our Map/Reduce haiku illustrates Hadoop’s template:

  1. The Mapper portion of your script processes records, attaching a label to each.

  2. Hadoop assembles those records into context groups according to their label.

  3. The Reducer portion of your script processes those context groups and writes them to a data store or external system.
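To make the template concrete, here is a toy, single-machine rendition of those three steps in plain Ruby. It is a sketch of the template’s shape, not Wukong’s API or Hadoop’s internals, and the first-letter labeling stands in for whatever labeling your job actually needs.

Map/Reduce template, toy single-machine rendition
    # 1. Mapper: process each record, attaching a label.
    def mapper(record)
      [[record[0], record]]        # label each word by its first letter
    end

    # 3. Reducer: process one context group (records sharing a label).
    def reducer(label, members)
      puts "#{label}: #{members.size} record(s)"
    end

    records = %w[apple ant bee bear cat]
    labeled = records.flat_map { |rec| mapper(rec) }
    # 2. Hadoop assembles labeled records into context groups.
    groups = labeled.group_by { |label, _| label }
    groups.each { |label, pairs| reducer(label, pairs.map { |_, rec| rec }) }

On a real cluster, step 2 (the shuffle) is the only part Hadoop itself performs; steps 1 and 3 are the code you supply.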

What is remarkable is that from this single primitive, we can construct the familiar relational operations (such as GROUPs and ROLLUPs) of traditional databases, many machine-learning algorithms, matrix and graph transformations and the rest of the advanced data analytics toolkit. In the next two chapters, we will demonstrate high-level relational operations and illustrate the Map/Reduce patterns they express. In order to understand the performance and reasoning behind those patterns, let’s first understand the motion of data within a Map/Reduce job.

The Map Phase Processes and Labels Records Individually

The Map phase receives 0, 1 or many records individually, with no guarantees from Hadoop about their numbering, order or allocation. (FOOTNOTE: In special cases, you may know that your input bears additional guarantees — for example, the MERGE/JOIN described in Chapter (TODO: REF) requires its inputs to be in total sorted order. It is on you, however, to enforce and leverage those special properties.) Hadoop does guarantee that every record arrives in whole to exactly one Map task and that the job will only succeed if every record is processed without error.

The Mapper receives those records sequentially — it must fully process one before it receives the next — and can emit 0, 1 or many output records of any shape or size. The chimpanzees working on the SantaCorp project received letters but dispatched toy forms. Julia’s thoughtful note produced two toy forms, one for her doll and one for Joe’s robot, while the spam letter produced no toy forms. Hadoop’s 'distcp' utility, used to copy data from cluster to cluster, takes this to a useful extreme: Each Mapper’s input is a remote file to fetch. Its action is to write that file’s contents directly to the HDFS as a Datanode client and its output is a summary of what it transferred.

The right way to bring in data from an external resource is by creating a custom loader or input format (see the chapter on Advanced Pig (TODO: REF)), which decouples loading data from processing data and allows Hadoop to intelligently manage tasks. The poor-man’s version of a custom loader, useful for one-offs, is to prepare a small number of file names, URLs, database queries or other external handles as input and emit the corresponding contents.
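Here is what that poor-man’s loader might look like, sketched in plain Ruby rather than as an actual Wukong job: each input record names a file, and the mapper emits the file’s lines. The path in the trailing usage line is hypothetical.

Poor-man’s loader, sketch
    # Each input record is a path; emit the named file's contents,
    # one output record per line. Suitable for one-offs only.
    def emit_contents(path_record)
      File.foreach(path_record.strip) { |line| yield line }
    end

    emit_contents("data/magi.txt") { |line| puts line }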

Please be aware, however, that it is only appropriate to access external resources from within a Hadoop job in exceptionally rare cases. Hadoop processes data in batches, which means failure of a single record results in the retry of the entire batch. It also means that when the remote resource is unavailable or responding sluggishly, Hadoop will spend several minutes and unacceptably many retries before abandoning the effort. Lastly, Hadoop is designed to drive every system resource at its disposal to its performance limit. (FOOTNOTE: We will drive this point home in the chapter on Event Log Processing (TODO: REF), where we will stress test a web server to its performance limit by replaying its request logs at full speed.)

While a haiku with only its first line is no longer a haiku, a Hadoop job with only a Mapper is a perfectly acceptable Hadoop job, as you saw in the Pig Latin translation example. In such cases, each Map Task’s output is written directly to the HDFS, one file per Map Task. Such jobs are only suitable, however, for so-called "embarrassingly parallel" problems — where each record can be processed on its own with no additional context.

The Map stage in a Map/Reduce job has a few extra details. It is responsible for labeling the processed records for assembly into context groups. Hadoop files each record into the equivalent of the pigmy elephants' file folders: an in-memory buffer holding each record in sorted order. There are two additional wrinkles, however, beyond what the pigmy elephants provide. First, the Combiner feature lets you optimize certain special cases by preprocessing partial context groups on the Map side; we will describe these more in a later chapter (TODO: REF). Second, if the sort buffer reaches or exceeds a total count or size threshold, its contents are "spilled" to disk and subsequently merge/sorted to produce the Mapper’s proper output.
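To see what the Combiner buys you, consider counting records by month, as we will do with the UFO sightings later in this chapter. Here is a rough single-machine sketch of the idea, not Hadoop’s actual API:

Combiner, sketch of the idea
    # Without a combiner, every ("month", 1) pair crosses the network.
    mapper_output = [["2010-06", 1], ["2010-06", 1], ["2010-07", 1]]

    # With one, the Mapper pre-sums its partial groups locally...
    partials = Hash.new(0)
    mapper_output.each { |month, count| partials[month] += count }

    # ...so only one record per label leaves this Mapper.
    partials.each { |month, count| puts "#{month}\t#{count}" }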

SIDEBAR: What’s Fast At High Scale

Throughput and Cost for Compute Primitives — the "Numbers Every Programmer Should Know"
Cost to Host and Serve One Billion 1kB Records (1 TB)

The table at the right (TODO: REF) summarizes the 2013 values for Peter Norvig’s "Numbers Every Programmer Should Know" — the length of time for each computation primitive on modern hardware. We’ve listed the figures several different ways: as latency (time to execute); as the number of 500-byte records that could be processed in an hour (TODO: day), if that operation were the performance bottleneck of your process; and as an amount of money to process one billion 500-byte records on commodity hardware. Big Data requires high volume, high throughput computing, so our principal bound is the speed at which data can be read from and stored to disk. What is remarkable is that with the current state of technology, most of the other operations are slammed to one limit or the other: either bountifully unconstraining or devastatingly slow. That lets us write down the following "rules for performance at scale":

  • High throughput programs cannot run faster than x (TODO: Insert number)

  • Data can be streamed to and from disk at x GB per hour (x records per hour, y records per hour, z dollars per billion records) (TODO: insert numbers)

  • High throughput programs cannot run faster than that, but a well-written one need not run an order of magnitude slower.

  • Data streams over the network at the same rate as disk.

  • Memory access is infinitely fast.

  • CPU is fast enough to not worry about except in the obvious cases where it is not.

  • Random access (seeking to individual records) on disk is unacceptably slow.

  • Network requests for data (anything involving a round trip) are infinitely slow.

  • Disk capacity is free.

  • CPU and network transfer costs are cheap.

  • Memory is expensive and cruelly finite. For most tasks, available memory is either all of your concern or none of your concern.
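The exact figures are left as TODOs above, but the shape of the arithmetic is easy to show. The 100 MB/s sequential-streaming rate below is a commonly cited ballpark assumed for illustration, not the book’s measured value.

Disk-streaming bound, back-of-envelope
    # Assumes ~100 MB/s sequential disk streaming -- an illustrative ballpark.
    stream_bytes_per_sec = 100 * 1_000_000
    record_bytes         = 500

    records_per_hour = stream_bytes_per_sec / record_bytes * 3600
    puts "~#{records_per_hour / 1_000_000_000.0} billion 500-byte records/hour"
    # A job that streams from disk and back can do no better than this bound.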

Now that you know how Hadoop moves data around, you can use these rules to explain its remarkable scalability.

  1. Mapper streams data from disk and spills it back to disk; cannot go faster than that.

  2. In between, your code processes the data.

  3. If your code is CPU- or memory-bound (unwinding proteins, say, or multiplying matrices), Hadoop at least will not get in your way; the typical Hadoop job can process records as fast as they are streamed.

  4. Spilled records are sent over the network and spilled back to disk; again, cannot go faster than that.

That leaves the big cost of most Hadoop jobs: the midstream merge-sort. Spilled blocks are merged in several passes (at the Reducer and sometimes at the Mapper) as follows. Hadoop begins streaming data from each of the spills in parallel. Under the covers, what this means is that the OS is handing off the contents of each spill as blocks of memory in sequence. It is able to bring all its cleverness to bear, scheduling disk access to keep the streams continually fed as rapidly as each is consumed.

Hadoop’s actions are fairly straightforward. Since the spills are each individually sorted, at every moment the next (lowest ordered) record to emit is guaranteed to be the next unread record from one of its streams. It continues in this way, eventually merging each of its inputs into an unbroken output stream to disk. The memory requirements — the number of parallel streams times the buffer size per stream — are manageable and the CPU burden is effectively nil, so the merge/sort as well runs at the speed of streaming to disk.

At no point does the Hadoop framework require a significant number of seeks on disk or requests over the network.
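The merge itself is simple enough to sketch in a few lines of plain Ruby. Arrays stand in for the spill streams here; a real implementation keeps the stream heads in a priority queue rather than rescanning them, but the invariant is the same.

Merge of sorted spills, sketch
    spills = [[1, 4, 9], [2, 3, 8], [5, 6, 7]]   # each spill is pre-sorted

    merged = []
    until spills.all?(&:empty?)
      # The next record to emit is always the lowest head among the streams.
      merged << spills.reject(&:empty?).min_by(&:first).shift
    end
    p merged   # => [1, 2, 3, 4, 5, 6, 7, 8, 9]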


Note
Introduce the chapter to the reader:
  • Take the strands from the last chapter, and show them braided together ("in this chapter, you’ll learn …" OR "OK, we’re done looking at that, now let’s xxx").
  • Tie the chapter to the goals of the book, and weave in the larger themes.
  • Perspective, philosophy, what we’ll be working on; a bit of repositioning, a bit opinionated, a bit personal.

Chimpanzee and Elephant Start a Business

A few years back, two friends — JT, a gruff silverback chimpanzee, and Nanette, a meticulous matriarch elephant — decided to start a business. As you know, Chimpanzees love nothing more than sitting at keyboards processing and generating text. Elephants have a prodigious ability to store and recall information, and will carry huge amounts of cargo with great determination. This combination of skills impressed a local publishing company enough to earn their first contract, so Chimpanzee and Elephant Corporation (C&E Corp for short) was born.

The publishing firm’s project was to translate the works of Shakespeare into every language known to man, so JT and Nanette devised the following scheme. Their crew set up a large number of cubicles, each with one elephant-sized desk and one or more chimp-sized desks, and a command center where JT and Nanette can coordinate the action. As with any high-scale system, each member of the team has a single responsibility to perform. The task of a chimpanzee is simply to read a set of passages, and type out the corresponding text in a new language. The cubicle’s librarian elephant maintains a neat set of scrolls, according to a scheme Nanette devised, with each scroll holding a passage to translate or some passage’s translated result.

JT acts as foreman for the chimpanzees. When each worker clocks in for the day, they check with JT, who hands off the day’s translation manual and the name of a passage to translate. Throughout the day, as each chimp completes their assigned passage, they radio in to JT, who names the next passage to translate. Nanette, meanwhile, serves as chief librarian. She keeps a card catalog that lists, for every book, the location and essential characteristics of the various scrolls that maintain its contents.

JT and Nanette work wonderfully together — JT rambunctiously barking orders, Nanette peacefully gardening her card catalog — and subtly improve the efficiency of their team in a variety of ways. We’ll look closely at their bag of tricks later in the book (TODO ref) but here are two. The most striking thing any visitor to the worksite will notice is how calm everything is. One reason for this is Nanette’s filing scheme, which designates each book passage to be stored by multiple elephants. Nanette quietly advises JT of each passage’s location, allowing him to almost always assign his chimpanzees a passage held by the librarian in their cubicle. In turn, when an elephant receives a freshly-translated scroll, she makes two photocopies and dispatches them to two other cubicles. The hallways contain a stately parade of pygmy elephants, each carrying an efficient load; the only traffic consists of photocopied scrolls to store and the occasional non-cubicle-local assignment.

The other source of calm is on the part of their clients, who know that when Nanette’s on the job, their archives are safe — the words of Shakespeare will retain their eternal form. [1] To ensure that no passage is ever lost, the librarians on Nanette’s team send regular reports on the scrolls they maintain. If ever an elephant doesn’t report in (whether it stepped out for an hour or left permanently), Nanette identifies the scrolls designated for that elephant and commissions the various librarians who hold other replicas of that scroll to make and dispatch fresh copies. Each scroll also bears a check of authenticity validating that photocopying, transferring its contents or even mouldering on the shelf has caused no loss of fidelity. Her librarians regularly recalculate those checks and include them in their reports, so if even a single letter on a scroll has been altered, Nanette can commission a new replica at once.

Map-only Jobs: Process Records Individually

We might not be as clever as JT’s multilingual chimpanzees, but even we can translate text into Pig Latin. For the unfamiliar, here’s how to translate standard English into Pig Latin:

  • If the word begins with a consonant-sounding letter or letters, move them to the end of the word adding "ay": "happy" becomes "appy-hay", "chimp" becomes "imp-chay" and "yes" becomes "es-yay".

  • In words that begin with a vowel, just append the syllable "way": "another" becomes "another-way", "elephant" becomes "elephant-way".

Our first Hadoop job is a program that translates plain text files into Pig Latin (see "Pig Latin translator, actual version" below). It’s written in Wukong, a simple library to rapidly develop big data analyses. Like the chimpanzees, it is single-concern: there’s nothing in there about loading files, parallelism, network sockets or anything else. Yet you can run it over a text file from the commandline — or run it over petabytes on a cluster (should you for whatever reason have a petabyte of text crying out for pig-latinizing).

Pig Latin translator, actual version
    CONSONANTS   = "bcdfghjklmnpqrstvwxz"
    UPPERCASE_RE = /[A-Z]/
    PIG_LATIN_RE = %r{
      \b                  # word boundary
      ([#{CONSONANTS}]*)  # all initial consonants
      ([\w\']+)           # remaining wordlike characters
      }xi

    each_line do |line|
      latinized = line.gsub(PIG_LATIN_RE) do
        head, tail = [$1, $2]
        head       = 'w' if head.blank?
        tail.capitalize! if head =~ UPPERCASE_RE
        "#{tail}-#{head.downcase}ay"
      end
      yield(latinized)
    end
Pig Latin translator, pseudocode
    for each line,
      recognize each word in the line and change it as follows:
        separate the head consonants (if any) from the tail of the word
        if there were no initial consonants, use 'w' as the head
        give the tail the same capitalization as the word
        change the word to "#{tail}-#{head}ay"
      end
      emit the latinized version of the line
    end
Ruby helper
  • The first few lines define "regular expressions" selecting the initial characters (if any) to move. Writing their names in ALL CAPS makes them constants.

  • Wukong calls the each_line do ... end block with each line; the |line| part puts it in the line variable.

  • The gsub ("globally substitute") statement calls its do ... end block with each matched word, and replaces that word with the value of the block’s last expression.

  • yield(latinized) hands off the latinized string for Wukong to output.

It’s best to begin developing jobs locally on a subset of data. Run your Wukong script directly from your terminal’s commandline:

wu-local examples/text/pig_latin.rb data/magi.txt -

The - at the end tells wukong to send its results to standard out (STDOUT) rather than a file — you can pipe its output into other unix commands or Wukong scripts. In this case, there is no consumer and so the output should appear on your terminal screen. The last line should read:

Everywhere-way ey-thay are-way isest-way. Ey-thay are-way e-thay agi-may.

That’s what it looks like when a cat is feeding the program data; let’s see how it works when an elephant sets the pace.

Transfer Data to the Cluster

Note: this assumes you have a working Hadoop installation, however large or small, running in distributed mode. Appendix 1 (TODO REF) lists resources for acquiring one.

Hadoop jobs run best reading data from the Hadoop Distributed File System (HDFS). To copy the data onto the cluster, run these lines:

hadoop fs -mkdir ./data
hadoop fs -put   wukong_example_data/text ./data/

These commands understand ./data/text to be a path on the HDFS, not your local disk; the dot . is treated as your HDFS home directory (use it as you would ~ in Unix). The -put command, which takes a list of local paths and copies them to the HDFS, treats its final argument as an HDFS path, and all the preceding paths as local.

Run the Job on the Cluster

First, let’s test on the same tiny little file we used at the commandline.

wukong launch examples/text/pig_latin.rb ./data/text/magi.txt ./output/latinized_magi

TODO: something about what the reader can expect to see on screen

While the script outputs a bunch of happy robot-ese to your screen, open up the jobtracker in your browser window by visiting http://hostname_of_jobtracker:50030. The job should appear on the jobtracker window within a few seconds — likely in more time than the whole job took to complete. You will see (TODO describe jobtracker job overview).

You can compare its output to the earlier local run by running

hadoop fs -cat ./output/latinized_magi/\*

That command, like the Unix ‘cat’ command, dumps the contents of a file to standard out, so you can pipe it into any other command-line utility. That is what you would like for use within scripts, but if your file is hundreds of megabytes large, as HDFS files typically are, dumping its entire contents to your terminal screen is ill appreciated. Instead, we typically use the Unix ‘head’ command to limit its output (in this case, to the first ten lines).

hadoop fs -cat ./output/latinized_magi/\* | head -n 10

Since you wouldn’t want to read a whole 10 GB file just to see whether the right number of closing braces come at the end, there is also a hadoop fs -tail command that dumps the final kilobyte of a file.

Here’s what the head and tail of your output should contain:

TODO screenshot of hadoop fs -cat ./output/latinized_magi/\* | head -n 10
TODO screenshot of hadoop fs -tail ./output/latinized_magi/\*

Map/Reduce

As a demonstration, let’s find out when aliens like to visit planet Earth. Here is a Wukong script that processes the UFO dataset to find the aggregate number of sightings per month:

  1. Define a model for the input records.

  2. The Mapper extracts each sighting’s month and emits it as the key, with no value.

  3. A counting Reducer increments on each entry in its group and emits the total in its finalize method.
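The Wukong listing itself is still to come, but here is a hedged single-machine rendition of that plan in plain Ruby. The file layout assumed here (tab-separated, with the sighting date in the second column as YYYY-MM-DD) is a hypothetical stand-in for the dataset’s real schema.

Monthly UFO sighting counts, single-machine sketch
    counts = Hash.new(0)
    File.foreach("data_UFO_sightings.tsv") do |line|
      sighted_at = line.chomp.split("\t")[1]   # Mapper: extract the date field
      counts[sighted_at[0, 7]] += 1            # label by month; Reducer counts
    end
    counts.sort.each { |month, total| puts "#{month}\t#{total}" }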

To run the Wukong job, go into the (TODO: REF) directory and run

wu-run monthly_visit_counts.rb --reducers_count=1 /data_UFO_sightings.tsv /dataresults monthly_visit_counts-wu.tsv

The output shows (TODO:CODE: INSERT CONCLUSIONS).

Wikipedia Visitor Counts

Let’s put Pig to a sterner test. Here’s the script above, modified to run on the much-larger Wikipedia dataset and to assemble counts by hour, not month:

EDIT TODO modified script

See Progress and Results

EDIT Wikipedia visitor counts, summing values — not weather, not articles

Now let’s run it on a corpus large enough to show off the power of distributed computing. Shakespeare’s combined works are too small — at (TODO find size), even the prolific bard’s lifetime of work won’t make Hadoop break a sweat. Luckily, we’ve had a good slice of humanity typing thoughts into Wikipedia for several years, and the corpus containing every single Wikipedia article is enough to warrant Hadoop’s power (and tsoris [2]).

wukong launch examples/text/pig_latin.rb ./data/text/wikipedia/wp_articles ./output/latinized_wikipedia

TODO screenshot of output, and fix up filenames

This job will take quite a bit longer to run, giving us a chance to demonstrate how to monitor its progress. (If your cluster is so burly the job finishes in under a minute or so, quit bragging and supply enough duplicate copies of the input to grant you time.) In the center of the Job Tracker’s view of your job, there is a table listing, for Mappers and Reducers, the number of tasks pending (waiting to be run), running, complete, killed (terminated purposefully not by error) and failed (terminated due to failure).

The most important numbers to note are the number of running tasks (there should be some unless your job is finished or the cluster is congested) and the number of failed tasks (for a healthy job on a healthy cluster, there should never be any). Don’t worry about killed tasks; for reasons we’ll explain later on, it’s OK if a few appear late in a job. We will describe what to do when there are failing attempts later, in the section on debugging Hadoop jobs (TODO: REF), but in this case, there shouldn’t be any. Clicking on the number of running Map tasks will take you to a window that lists all running attempts (and similarly for the other categories). On the completed tasks listing, note how long each attempt took; for the Amazon m3.xlarge machines we used, each attempt took about x seconds (TODO: correct time and machine size). There is a lot of information here, so we will pick this back up in chapter (TODO ref), but the most important indicator is that your attempts complete in a uniform and reasonable length of time. There could be good reasons why you might find task 00001 still running after five minutes while other attempts have been finishing in ten seconds, but if that’s not what you thought would happen, you should dig deeper [3].

You should get in the habit of sanity-checking the number of tasks and the input and output sizes at each job phase for the jobs you write. In this case, the job should ultimately require x Map tasks and no Reduce tasks, and on our x-machine cluster it completed in x minutes. For this input, there should be one Map task per HDFS block: x GB of input at the typical one-eighth-GB (128 MB) block size means there should be 8x Map tasks. Sanity-checking the figure will help you flag cases where you ran on all the data rather than the one little slice you intended, or vice versa; alert you to cases where the data is organized inefficiently; or point to deeper reasons that will require you to flip ahead to chapter (TODO: REF).

Annoyingly, the Job view does not directly display the Mapper input data, only the cumulative quantity of data per source, which is not always an exact match. Still, the figure for HDFS bytes read should closely match the size given by ‘hadoop fs -du’ (TODO: add paths to command).

You can also estimate how large the output should be, using the "Gift of the Magi" sample we ran earlier (one of the benefits of first running in local mode). That job had an input size of x bytes and an output size of y bytes, for an expansion factor of z, and there is no reason to think the expansion factor on the whole Wikipedia corpus should be much different. In fact, dividing the HDFS bytes written figure by the HDFS bytes read figure shows an expansion factor of q.

We cannot stress enough how important it is to validate that your scripts are doing what you think they are. The whole problem of Big Data is that it is impossible to see your data in its totality. You can spot-check your data, and you should, but without independent validations like these you’re vulnerable to a whole class of common defects. This habit — of validating your prediction of the job’s execution — is not a crutch offered to the beginner, unsure of what will occur; it is a best practice, observed most diligently by the expert, and one every practitioner should adopt.

The HDFS: Highly Durable Storage Optimized for Analytics

The HDFS, as we hope you’ve guessed, holds the same role within Hadoop that Nanette and her team of elephants do within C&E Corp. It ensures that your data is always available for use, never lost or degraded and organized to support efficient Map/Reduce jobs. Files are stored on the HDFS as blocks of limited size (128 MB is a common choice). Each block belongs to exactly one file; a file larger than the block size is stored in multiple blocks. The blocks are stored in cooked form as regular files on one of the Datanode’s regular volumes. (Hadoop’s decision to use regular files rather than attempting lower-level access to the disk, as many traditional databases do, helps make it remarkably portable, promotes reliability and plays to the strengths of the operating system’s finely-tuned access mechanisms.)

The HDFS typically stores multiple replicas of each block (three is the universal default, although you can adjust it per file), distributed across the cluster. Blocks within the same file may or may not share a Datanode but replicas never do (or they would not be replicas, would they?). The obvious reason for this replication is availability and durability — you can depend on finding a live Datanode for any block and you can depend that, if a Datanode goes down, a fresh replica can be readily produced.

JT and Nanette’s workflow illustrates the second benefit of replication: being able to “move the compute to the data, not [expensively] moving the data to the compute.” Multiple replicas give the Job Tracker enough options that it can dependably assign most tasks to be “Mapper-local.”

Like Nanette, the Namenode holds no data, only a sort of file allocation table (FAT), tracking for every file the checksum, the responsible Datanodes and the other essential characteristics of each of its blocks. The Namenode depends on the Datanodes to report in regularly. Every three seconds, each Datanode sends a heartbeat — a lightweight notification saying, basically, "I’m still here!". On a longer timescale, each Datanode prepares a listing of the replicas it sees on disk, along with a full checksum of each replica’s contents. Having the Datanode contact the Namenode is a good safeguard that it is operating regularly and with good connectivity. Conversely, the Namenode uses the heartbeat response as its opportunity to issue commands to a struggling Datanode.

If, at any point, the Namenode finds a Datanode has not sent a heartbeat for several minutes, or if a block report shows missing or corrupted files, it will commission new copies of the affected blocks by issuing replication commands to other Datanodes as they heartbeat in.
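Reduced to its essence, that decision looks something like the sketch below; the block-to-Datanode map is a hypothetical stand-in for the Namenode’s real bookkeeping structures.

Re-replication decision, sketch
    TARGET_REPLICAS = 3
    replicas = {                          # block id => Datanodes reporting it
      "blk_001" => ["dn1", "dn2", "dn3"],
      "blk_002" => ["dn1", "dn4"],        # a third holder has gone quiet
    }

    replicas.each do |block, nodes|
      shortfall = TARGET_REPLICAS - nodes.size
      next unless shortfall > 0
      # Commission a surviving holder to copy the block as it heartbeats in.
      puts "ask #{nodes.first} for #{shortfall} new replica(s) of #{block}"
    end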

A final prominent role the Namenode serves is to act as the public face of the HDFS. The ‘put’ and ‘get’ commands you just ran were Java programs that made network calls to the Namenode. There are API methods for the rest of the file system commands you would expect for use by that or any other low-level native client. You can also access its web interface, typically by visiting port 50070 (http://hostname.of.namenode:50070), which gives you the crude but effective ability to view its capacity, operational status and, for the very patient, inspect the contents of the HDFS.

Sitting behind the scenes is the often-misunderstood secondary Namenode; this is not, as its name implies and as you might hope, a hot standby for the Namenode. Unless you are using the “HA namenode” feature provided in later versions of Hadoop, if your Namenode goes down, your HDFS has gone down. All the secondary Namenode does is perform some essential internal bookkeeping. Apart from ensuring that it, like your Namenode, is always running happily and healthily, you do not need to know anything more about the secondary Namenode for now.

One last essential to note about the HDFS is that its contents are immutable. On a regular file system, every time you hit “save,” the application modifies the file in place — on Hadoop, no such thing is permitted. This is driven by the necessities of distributed computing at high scale, but it is also the right thing to do: data analysis should proceed by chaining reproducible syntheses of new beliefs from input data. If the actions you are applying change, so should the output. This casual consumption of hard drive resources can seem disturbing to those used to working within the constraints of a single machine, but the economics of data storage are clear: it costs about $0.10 per GB per month at current commodity prices (one-tenth that for archival storage), while the analysts who will use the data cost at least $50 an hour.

Possibly the biggest rookie mistake made by those new to Big Data is a tendency to economize on the amount of data they store; we will try to help you break that habit. You should be far more concerned with the amount of data you send over the network or to your CPU than with the amount of data you store and most of all, with the amount of time you spend deriving insight rather than acting on it. Checkpoint often, denormalize when reasonable and preserve the full provenance of your results.

We’ll spend the next few chapters introducing these core operations from the ground up. Let’s start by joining JT and Nanette with their next client.


1. When Nanette is not on the job, it’s a total meltdown — a story for much later in the book. But you’d be wise to always take extremely good care of the Nanettes in your life.
2. trouble and suffering
3. A good reason is that task 00001’s input file was compressed in a non-splittable format and is 40 times larger than the rest of the files. A bad reason is that task 00001 is trying to read from a failing-but-not-failed Datanode, or has a corrupted record that is sending the XML parser into recursive hell. The good reasons you can always predict from the data itself; otherwise, assume it’s a bad reason.