Chimpanzee and Elephant Save Christmas

In the previous chapter, you worked with the simple-as-possible Pig Latin script, which let you learn the mechanics of running Hadoop jobs, understand the essentials of the HDFS, and appreciate its scalability. It is an example of an "embarrassingly parallel" problem: each record could be processed individually, just as they were organized in the source files.

Hadoop’s real power comes from the ability to process data in context, using what’s known as the Map/Reduce paradigm. Every map/reduce job is a program with the same three phases. In the first phase, your program processes its input in any way you see fit, emitting labelled output records. In the second phase, Hadoop groups and sorts those records according to their labels. Finally, your program processes each group and Hadoop stores its output. That grouping-by-label part is where the magic lies: it ensures that no matter where the relevant records started, they arrive at the same place in a predictable manner, ready to be synthesized.

We’ll open the chapter with a straightforward example map/reduce program: aggregating records from a dataset of Unidentified Flying Object sightings to find out when UFOs are most likely to appear.

Next, we’ll outline how a map/reduce dataflow works — first with a physical analogy provided by our friends at Elephant and Chimpanzee, Inc., and then in moderate technical detail.

For two good reasons, we’re going to use very particular language whenever we discuss how to design a map/reduce dataflow. First, because it will help you reason by comparison as you meet more and more map/reduce patterns. The second reason is that those core patterns are not specific to the map/reduce paradigm. You’ll see them in different dress but with the same essentials when we dive into the Streaming Analytics paradigm (REF) later in the book, and then generalized into a conceptual model for distributed analytics in Chapter (REF). Using the terms from that conceptual model will help you see the essential core of these patterns throughout the book.

Summarizing UFO Sightings using Map/Reduce

Santa Claus and his elves are busy year-round, but Santa’s flying reindeer have few responsibilities outside the holiday season. As flying objects themselves, they spend a good part of their multi-month break pursuing their favorite hobby: UFOlogy (the study of Unidentified Flying Objects and the search for extraterrestrial civilization). So you can imagine how excited they were to learn about the data set of more than 60,000 documented UFO sightings we worked with in the last chapter.

Since sixty thousand sightings is more than a reindeer can count (only four hooves!), JT and Nanette occasionally earn a little karmic bonus with Santa Claus by helping the reindeer analyze UFO data. We can do our part by helping our reindeer friends understand when, during the day, UFOs are most likely to be sighted.

UFO Sighting Data Model

The data model for a UFO sighting has fields for date of sighting and of report; human-entered location; duration; shape of craft; and eye-witness description.

	class SimpleUfoSighting
	  include Wu::Model
	  field :sighted_at,   Time
	  field :reported_at,  Time
	  field :shape,        Symbol
	  field :city,         String
	  field :state,        String
	  field :country,      String
	  field :duration_str, String
	  field :location_str, String
	  field :description,  String
	end

Group the UFO Sightings by Shape of Craft

The first request from the reindeer team is to organize the sightings into groups by the shape of craft, and to record how many sightings there are for each shape.

Mapper

In the Chimpanzee&Elephant world, a chimp had the following role:

  • reads and understands each letter

  • creates a new intermediate item having a label (the type of toy) and information about the toy (the work order)

  • hands it to the elephants for delivery to the elf responsible for making that toy type.

We’re going to write a Hadoop "mapper" that serves a similar purpose:

  • reads the raw data and parses it into a structured record

  • creates a new intermediate item having a label (the shape of craft) and information about the sighting (the original record).

  • hands it to Hadoop for delivery to the reducer responsible for that group

The program looks like this:

mapper(:count_ufo_shapes) do
  consumes UfoSighting, from: json
  #
  process do |ufo_sighting|         # for each record,
    label = ufo_sighting.shape      # label it with the shape of craft
    yield [label, ufo_sighting]     # and send the labelled record downstream for processing
  end
end

You can test the mapper on the commandline:

       $ cat ./data/geo/ufo_sightings/ufo_sightings-sample.json   |
    ./examples/geo/ufo_sightings/count_ufo_shapes.rb --map |
    head -n25 | wu-lign
 disk	   1972-06-16T05:00:00Z	1999-03-02T06:00:00Z	Provo (south of), UT     	disk     	several min.   	Str...
 sphere	   1999-03-02T06:00:00Z	1999-03-02T06:00:00Z	Dallas, TX              	sphere  	60 seconds     	Whi...
 triangle  1997-07-03T05:00:00Z	1999-03-09T06:00:00Z	Bochum (Germany),       	triangle	ca. 2min       	Tri...
 light	   1998-11-19T06:00:00Z	1998-11-19T06:00:00Z	Phoenix (west valley), AZ	light   	15mim          	Whi...
 triangle  1999-02-27T06:00:00Z	1999-02-27T06:00:00Z	San Diego, CA            	triangle	10 minutes     	cha...
 triangle  1997-09-15T05:00:00Z	1999-02-17T06:00:00Z	Wedgefield, SC             	triangle	15 min         	Tra...
...

The output is simply the partitioning label (UFO shape), followed by the attributes of the sighting, separated by tabs. The framework uses the first field to group/sort by default; the rest is cargo.

Reducer

Just as the pygmy elephants transported work orders to elves' workbenches, Hadoop delivers each record to the 'reducer', the second stage of our job.

   reducer(:count_sightings) do
     def process_group(label, group)
       count = 0
       group.each do |record|           # on each record,
         count += 1                     #   increment the count
         yield record                   #   re-output the record
       end                              #
       yield ['# count:', label, count] # at end of group, summarize
     end
   end

The elf at each workbench saw a series of work orders, with the guarantee that a) work orders for each toy type were delivered together and in order; and b) this was the only workbench to receive work orders for that toy type.

Similarly, the reducer receives a series of records, grouped by label, with a guarantee that it is the unique processor for such records. All we have to do here is re-emit records as they come in, then add a line following each group with its count. We’ve put a '#' at the start of the summary lines, which lets you easily filter them.

Test the full mapper-sort-reducer stack from the commandline:

$ cat ./data/geo/ufo_sightings/ufo_sightings-sample.json      |
    ./examples/geo/ufo_sightings/count_ufo_shapes.rb --map    | sort |
    ./examples/geo/ufo_sightings/count_ufo_shapes.rb --reduce | wu-lign
1985-06-01T05:00:00Z	1999-01-14T06:00:00Z	North Tonawanda, NY  	chevron  	1 hr 	7 lights in a chevron shape not sure it was one object lighted or 7 s
1999-01-20T06:00:00Z	1999-01-31T06:00:00Z	Olney, IL            	chevron  	10 seconds        	Stargazing, saw a dimly lit V-shape coming overhaed from west t east,
1998-12-16T06:00:00Z	1998-12-16T06:00:00Z	Lubbock, TX          	chevron  	3 minutes         	Object southbound, displaying three white lights, slowed, hovered, qu
# count:	chevron	3
1999-01-16T06:00:00Z	1999-01-16T06:00:00Z	Deptford, NJ         	cigar    	2 Hours           	An aircraft of some type was seen in the sky with approximately five
# count:	cigar	1
1947-10-15T06:00:00Z	1999-02-25T06:00:00Z	Palmira,             	circle   	1 hour            	After a concert given in the small town of Palmira, Colombia,  a grou
1999-01-10T06:00:00Z	1999-01-11T06:00:00Z	Tyson's Corner, VA   	circle   	1 to 2 sec        	Bright green circularly shaped light moved downward and easterly thro
...

Secondary Sort: Extend UFO Sightings with Detailed Location Information

Close Encounters of the Reindeer Kind (pt 2)

Since our reindeer friends want to spend their summer months visiting the locations of various UFO sightings, they would like more information to help plan their trip. The Geonames dataset (REF) provides more than seven million well-described points of interest, so we can extend each UFO sighting whose location matches a populated place name with its longitude, latitude, population and more.

Your authors have additionally run the free-text locations — "Merrimac, WI" or "Newark, NJ (south of Garden State Pkwy)" — through a geolocation service to (where possible) add structured geographic information: longitude, latitude and so forth.

Put UFO Sightings And Places In Context By Location Name

When you are writing a Map/Reduce job, the first critical question is how to group the records in context for the Reducer to synthesize. In this case, we want to match every UFO sighting against the corresponding Geonames record with the same city, state and country, so the Mapper labels each record with those three fields. This ensures records with the same location name are all received by a single Reducer in a single group, just as we saw with toys sent to the same workbench or visits "sent" to the same time bucket. The Reducer will also need to know which records are sightings and which records are places, so we have extended the label with an "A" for places and a "B" for sightings. (You will see in a moment why we chose those letters.) While we are at it, we will also eliminate Geonames records that are not populated places.

(TODO code for UFO sighting geolocator mapper)
	class UfoSighting
	  include Wu::Model
	  field :sighted_at,   Time
	  field :reported_at,  Time
	  field :shape,        Symbol
	  field :city,         String
	  field :state,        String
	  field :country,      String
	  field :duration_str, String
	  field :location_str, String
	  # fields filled in from the matching Geonames place record:
	  field :longitude,    Float
	  field :latitude,     Float
	  field :region,       String
	  field :population,   Integer
	  field :quadkey,      String
	  #
	  field :description,  String
	end
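
To make the labeling concrete, here is a sketch of what such a mapper might look like, in the same Wukong-style Ruby used above. It is an illustration rather than the chapter's finished code: the Geoname model and its feature_class test are assumptions standing in for "a Geonames record" and "is this a populated place?".

	mapper(:ufo_place_labels) do
	  process do |record|
	    case record
	    when Geoname                                  # assumed model for a Geonames record
	      next unless record.feature_class == 'P'     # keep only populated places
	      yield [record.city, record.region, record.country, 'A', record]
	    when UfoSighting                              # the sighting model shown above
	      yield [record.city, record.state,  record.country, 'B', record]
	    end
	  end
	end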

Extend The UFO Sighting Records In Each Location Co-Group With Place Data

Building a toy involved selecting, first, the toy form, then each of the corresponding parts, so the elephants carrying toy forms stood at the head of the workbench next to all the parts carts. While the first part of the label (the partition key) defines how records are grouped, the remainder of the label (the sort key) describes how they are ordered within the group. Denoting places with an "A" and sightings with a "B" ensures our Reducer always first receives the place for a given location name, followed by the sightings. For each group, the Reducer holds the place record in a temporary variable and appends the place's fields to those of each sighting that follows. In the happy case where a group holds both a place and sightings, the Reducer iterates over each sighting. There are many places that match no UFO sightings; these are discarded. There are some UFO sightings without reconcilable location data; we will hold onto those but leave the place fields blank. Even if these groups had been extremely large, this matching would have required no more memory overhead than the size of a place record.
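
Here is a sketch of that reducer, again in the style of the examples above. The place_record? and place_fields helpers are hypothetical stand-ins for "is this the 'A' record?" and "pull out the fields we want to append", and each record is assumed to arrive as an array of fields, as in the mapper output earlier.

	reducer(:ufo_place_join) do
	  def process_group(label, group)
	    place = nil
	    group.each do |record|
	      if place_record?(record)               # the 'A' record, if present, arrives first;
	        place = record                       #   hold it in a temporary variable
	      else                                   # every following 'B' record is a sighting:
	        yield record + place_fields(place)   #   append the place fields (blank if no place matched)
	      end
	    end
	    # a group holding only a place and no sightings emits nothing, so it is discarded
	  end
	end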

Partitioning, Grouping and Sorting

As you’ve seen, the way that Hadoop forms groups is actually by sorting the records. It’s time now to clearly separate the three fundamental locality operations Hadoop performs for you:

  • 'partition':

    • data in the same partition must go to the same machine

  • 'group':

    • data in the same group must be in the same partition

  • 'sort':

    • within each partition, records are delivered in order by the sort key

The elves' system is meant to evoke the liabilities of database and worker-queue based systems:

  • setup and teardown of workstation == using latency code for a throughput process

    • running the same code in a tight loop makes life easy for the CPU cache in low level languages…​

    • and makes it easy for the interpreter in high-level languages, especially JIT

  • swinging the mail claw out to retrieve next work order == latency of seek on disk

  • chimpanzees are dextrous == processing sorted data in RAM is very fast

  • elves pull work orders in sequence: The chimpanzees call this a "merge sort", and the elves' memory a "sort buffer"

Chimpanzee and Elephant Save Christmas (pt 1)

It was holiday time at the North Pole, and letters from little boys and little girls all over the world flooded in as they always do. But one year several years ago, the world had grown just a bit too much. The elves just could not keep up with the scale of requests — Christmas was in danger! Luckily, their friends at the Elephant & Chimpanzee Corporation were available to help. Packing their typewriters and good winter coats, JT, Nanette and the crew headed to the Santaplex, the headquarters for toy manufacture at the North Pole. Here’s what they found.

Letters Cannot be Stored with the Right Context for Toy-Making

As you know, each year children from every corner of the earth write to Santa to request toys, and Santa — knowing who’s been naughty and who’s been nice — strives to meet the wishes of every good little boy and girl who writes him. He employs a regular army of toymaker elves, each of whom specializes in certain kinds of toy: some elves make Action Figures and Dolls, others make Xylophones and Yo-Yos.

Under the elves' old system, as bags of mail arrived they were examined by an elven postal clerk and then hung from the branches of the Big Tree at the center of the Santaplex. Letters were organized on the tree according to the child’s town, as the shipping department had a critical need to organize toys by their final delivery schedule. But the toymaker elves needed to know what toys to make as well, and so for each letter a postal clerk recorded its Big Tree coordinates in a ledger that was organized by type of toy.

So to retrieve a letter, a doll-making elf would look under "Doll" in the ledger to find the next letter’s coordinates, then wait as teamster elves swung a big claw arm to retrieve it from the Big Tree. As JT readily observed, the mail couldn’t be organized both by toy type and also by delivery location, and so this ledger system was a necessary evil. "The next request for Lego is as likely to be from Cucamonga as from Novosibirsk, and letters can’t be pulled from the tree any faster than the crane arm can move!"

What’s worse, the size of Santa’s operation meant that the workbenches were very far from where letters came in. The hallways were clogged with frazzled elves running from Big Tree to workbench and back, spending as much effort requesting and retrieving letters as they did making toys. "Throughput, not Latency!" trumpeted Nanette. "For hauling heavy loads, you need a stately elephant parade, not a swarm of frazzled elves!"

Elf Workstations
Figure 1. The elves' workbenches are meticulous and neat.
Fetching the next letter to Santa
Figure 2. Little boys and girls' mail is less so.

Chimpanzees Process Letters into Labelled Toy Requests

In marched Chimpanzee and Elephant, Inc., and set up a finite number of chimpanzees at a finite number of typewriters, each with an elephant desk-mate.

Postal clerks still stored each letter on the Big Tree (allowing the legacy shipping system to continue unchanged), but now also handed off bags holding copies of the mail. As she did with the translation passages, Nanette distributed these mailbags across the desks just as they arrived. The overhead of recording each letter in the much-hated ledger was no more, and the hallways were no longer clogged with elves racing to and fro.

The chimps' job was to take letters one after another from a mailbag, and fill out a toyform for each request. A toyform has a prominent label showing the type of toy, and a body with all the information you’d expect: Name, Nice/Naughty Status, Location, and so forth. You can see some examples here:

# Good kids, generates a toy for Julia and a toy for her brother
    Deer SANTA
    I wood like a doll for me and
    and an optimus prime robot for my
    brother joe
    I have been good this year
    love julia

      => robot | type="optimus prime"  recipient="Joe"
      => doll  | type="green hair"     recipient="Joe's sister Julia"

# Spam, no action
    Greetings to you Mr Claus, I came to know
    of you in my search for a  reliable and
    reputable person to handle a very confidential
    business transaction, which involves the
    transfer of a huge sum of money...

# Frank is not only a jerk but a Yankees fan. He will get coal.
    HEY SANTA I WANT A YANKEES HAT AND NOT
    ANY DUMB BOOKS THIS YEAR
    FRANK

      => coal  | type="anthracite"     recipient="Frank"  reason="doesn't like to read"
Chimps read each letter
Letters become toyforms

The first note, from a very good girl who is thoughtful for her brother, creates two toyforms: one for Joe’s robot and one for Julia’s doll. The second note is spam, so it creates no toyforms, while the third one yields a toyform directing Santa to put coal in Frank’s stocking.

Pygmy Elephants Carry Each Toyform to the Appropriate Workbench

Here’s the new wrinkle on top of the system used in the translation project. Next to every desk now stood a line of pygmy elephants, each dressed in a cape listing the types of toy it would deliver. Each desk had a pygmy elephant for Archery Kits and Dolls, another one for Xylophones and Yo-Yos, and so forth — matching the different specialties of toymaker elves.

As the chimpanzees would work through a mail bag, they’d place each toyform into the basket on the back of the pygmy elephant that matched its type. At the completion of a bag, the current line of elephants would march off to the workbenches, and behind them a new line of elephants would trundle into place. What fun!

toyforms go off in batches

Finally, the pygmy elephants would march through the now-quiet hallways to the toy shop floor, each reporting to the workbench that matched its toy types. So the Archery Kit/Doll workbench had a line of pygmy elephants, one for every Chimpanzee&Elephant desk; similarly the Xylophone/Yo-Yo workbench, and all the rest.

Toymaker elves now began producing a steady stream of toys, no longer constrained by the overhead of walking the hallway and waiting for Big-Tree retrieval on every toy.

Each toy at a unique station

Hadoop vs Traditional Databases

Fundamentally, the storage engine at the heart of a traditional relational database does two things: it holds all the records, and it maintains a set of indexes for lookups and other operations. To retrieve a record, it must consult the appropriate index to find the location of the record, then load it from the disk. This is very fast for record-by-record retrieval, but becomes cripplingly inefficient for general high-throughput access. If the records are stored by location and arrival time (as the mailbags were on the Big Tree), then there is no "locality of access" for records retrieved by, say, type of toy — records for Lego will be spread all across the disk. With traditional drives, the disk’s read head has to physically swing back and forth in a frenzy across the disk, and though the newer flash drives have smaller retrieval latency it’s still far too high for bulk operations.

What’s more, traditional database applications lend themselves very well to low-latency operations (such as rendering a webpage showing the toys you requested), but very poorly to high-throughput operations (such as requesting every single doll order in sequence). Unless you invest specific expertise and effort, you have little ability to organize requests for efficient retrieval. You either suffer a variety of non-locality and congestion based inefficiencies, or wind up with an application that caters to the database more than to its users. You can to a certain extent use the laws of economics to bend the laws of physics — as the commercial success of Oracle and Netezza show — but the finiteness of time, space and memory present an insoluble scaling problem for traditional databases.

Hadoop solves the scaling problem by not solving the data organization problem. Rather than insist that the data be organized and indexed as it’s written to disk, catering to every context that could be requested, it focuses purely on the throughput case: the typical Hadoop operation streams through large swaths of data in bulk, giving up the locality of indexed retrieval in exchange for raw sequential throughput. (TODO: explain "disk is the new tape"; it takes X to seek but …)

The Map-Reduce Haiku

As you recall, the bargain that Map/Reduce proposes is that you agree to only write programs that fit this Haiku:

data flutters by
    elephants make sturdy piles
  context yields insight

More prosaically,

  1. process and label  — turn each input record into any number of labelled records

  2. sorted context groups — Hadoop groups those records uniquely under each label, in sorted order

  3. synthesize (process context groups)  — for each group, process its records in order; emit anything you want.

The trick lies in the 'group/sort' step: assigning the same label to two records in the 'label' step ensures that they will become local in the reduce step.
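
Reduced to a skeleton in the Wukong-style Ruby used throughout this chapter, the bargain looks like the following; label_for and summarize are placeholders for whatever your job needs, not real methods.

	mapper(:my_job) do
	  process do |record|                   # 1. process and label:
	    yield [label_for(record), record]   #    emit any number of labelled records
	  end
	end

	# 2. group/sort: Hadoop's part of the bargain; no code for you to write

	reducer(:my_job) do
	  def process_group(label, group)       # 3. synthesize:
	    group.each do |record|              #    records arrive grouped under their label,
	      # ...combine, count, compare...   #    in sorted order within each group
	    end
	    yield [label, summarize(label)]     #    emit anything you want, whenever you want
	  end
	end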

The machines in stage 1 ('label') are out of context. They see each record exactly once, but with no promises as to order, and no promises as to which one sees which record. We’ve 'moved the compute to the data', allowing each process to work quietly on the data in its work space.

As each pile of output products starts to accumulate, we can begin to group them. Every group is assigned to its own reducer. When a pile reaches a convenient size, it is shipped to the appropriate reducer while the mapper keeps working. Once the map finishes, we organize those piles for each reducer to process in proper order.

Notice that the only time data moves from one machine to another is when the intermediate piles of data get shipped. Instead of monkeys flinging poo, we now have a dignified elephant parade conducted in concert with the efforts of our diligent workers.

Hadoop’s Contract

Hadoop imposes a few seemingly-strict constraints and provides very few guarantees in return. As you’re starting to see, that simplicity provides great power and is not as confining as it seems. You can gain direct control over things like partitioning, input splits and input/output formats. We’ll touch on a very few of those, but for the most part this book concentrates on using Hadoop from the outside — (TODO: ref) Hadoop: The Definitive Guide covers this stuff (definitively).

The Mapper Guarantee

The contract Hadoop presents for a map task is simple, because there isn’t much of one. Each mapper will get a contiguous slice (or all) of some file, split at record boundaries, and in order within the file. You won’t get lines from another input file, no matter how short any file is; you won’t get partial records; and though you have no control over the processing order of chunks ("file splits"), within a file split all the records are in the same order as in the original file.

For a job with no reducer — a "mapper-only" job — you can then output anything you like; it is written straight to disk. For a Wukong job with a reducer, your output should be tab-delimited data, one record per line. You can designate the fields to use for the partition key, the sort key and the group key. (By default, the first field is used for all three.)

The typical job turns each input record into zero, one or many records in a predictable manner, but such decorum is not required by Hadoop. You can read in lines from Shakespeare and emit digits of pi; read in all input records, ignore them and emit nothing; or boot into an Atari 2600 emulator, publish the host and port and start playing Pac-Man. Less frivolously: you can accept URLs or filenames (local or HDFS) and emit their contents; accept a small number of simulation parameters and start a Monte Carlo simulation; or accept a database query, issue it against a datastore and emit each result.

The Group/Sort Guarantee

When Hadoop does the group/sort, it establishes the following guarantee for the data that arrives at the reducer:

  • each labelled record belongs to exactly one sorted group;

  • each group is processed by exactly one reducer;

  • groups are sorted lexically by the chosen group key;

  • and records are further sorted lexically by the chosen sort key.

It’s very important that you understand what that unlocks, so I’m going to redundantly spell it out a few different ways:

  • Each mapper-output record goes to exactly one reducer, solely determined by its key.

  • If several records have the same key, they will all go to the same reducer.

  • From the reducer’s perspective, if it sees any element of a group it will see all elements of the group.

You should typically think in terms of groups and not about the whole reduce set: imagine each partition is sent to its own reducer. It’s important to know, however, that each reducer typically sees multiple partitions. (Since it’s more efficient to process large batches, a certain number of reducer processes are started on each machine. This is in contrast to the mappers, who run one task per input split.) Unless you take special measures, the partitions are distributed arbitrarily among the reducers [1]. They are fed to the reducer in order by key.

Similar to a mapper-only task, your reducer can output anything you like, in any format you like. It’s typical to output structured records of the same or different shape, but you’re free to engage in any of the shenanigans listed above.

Elephant and Chimpanzee Save Christmas (pt 2): A Critical Bottleneck Emerges

After a day or two of the new toyform process, Mrs. Claus reported dismaying news. Even though productivity was much improved over the Big-Tree system, it wasn’t going to be enough to hit the Christmas deadline.

The problem was plain to see. Repeatedly throughout the day, workbenches would run out of parts for the toys they were making. The dramatically-improved efficiency of order handling, and the large built-up backlog of orders, far outstripped what the toy parts warehouse could supply. Various workbenches were clogged with Jack-in-the-boxes awaiting springs, number blocks awaiting paint and the like. Tempers were running high, and the hallways became clogged again with overloaded parts carts careening off each other. JT and Nanette filled several whiteboards with proposed schemes, but none of them felt right.

To clear his mind, JT wandered over to the reindeer ready room, eager to join in the cutthroat games of poker Rudolph and his pals regularly ran. During a break in the action, JT found himself idly sorting out the deck of cards by number, as you do to check that it is a regular deck of 52. (With reindeer, you never know when an extra ace or three will inexplicably appear at the table.) As he did so, something in his mind flashed back to the unfinished toys on the assembly floor: mounds of number blocks, stacks of Jack-in-the-boxes, rows of dolls. Sorting the cards by number had naturally organized them into groups by kind as well: he saw all the number cards in a run, followed by all the jacks, then the queens and the kings and the aces.

"Sorting is equivalent to grouping!" he exclaimed to the reindeers' puzzlement. "Sorry, boys, you’ll have to deal me out," he said, as he ran off to find Nanette.

The next day, they made several changes to the toy-making workflow. First, they set up a delegation of elvish parts clerks at desks behind the letter-writing chimpanzees, directing the chimps to hand a carbon copy of each toy form to a parts clerk as well. On receipt of a toy form, each parts clerk would write out a set of tickets, one for each part in that toy, and note on the ticket the ID of its toyform. These tickets were then dispatched by pygmy elephant to the corresponding section of the parts warehouse to be retrieved from the shelves.

Now, here is the truly ingenious part that JT struck upon that night. Before, the chimpanzees placed their toy forms onto the back of each pygmy elephant in no particular order. JT replaced these baskets with standing file folders — the kind you might see on an organized person’s desk. He directed the chimpanzees to insert each toy form into the file folder according to the alphabetical order of its ID. (Chimpanzees are exceedingly dextrous, so this did not appreciably impact their speed.) Meanwhile, at the parts warehouse Nanette directed a crew of elvish carpenters to add a clever set of movable frames to each of the parts carts. She similarly prompted the parts pickers to place each cart’s parts so as to preserve the alphabetical order of their toyform IDs.

paper sorter


After a double shift that night by the parts department and the chimpanzees, the toymakers arrived in the morning to find, next to each workbench, the pygmy elephants with their toy forms and a set of carts from each warehouse section holding the parts they’d need. As work proceeded, a sense of joy and relief soon spread across the shop.

The elves were now producing a steady stream of toys as fast as their hammers could fly, with an economy of motion they’d never experienced. Since both the parts and the toy forms were in the same order by toyform ID, as the toymakers would pull the next toy form from the file they would always find the parts for it close at hand. Pull the toy form for a wooden toy train and you would find a train chassis next in the chassis cart, small wooden wheels next in the wheel cart, and magnetic bumpers next in the small parts cart. Pull the toy form for a rolling duck on a string, and you would find instead a duck chassis, large wooden wheels and a length of string at the head of their respective carts.

Not only did work now proceed with an unbroken swing, but the previously cluttered workbenches were now clear — their only contents were the parts immediately required to assemble the next toy. This space efficiency let Santa pull in extra temporary workers from the elves' Rivendale branch, who were bored with fighting orcs and excited to help out.

Toys were soon coming off the line at a tremendous pace, far exceeding what the elves had ever been able to achieve. By the second day of the new system, Mrs. Claus excitedly reported the news everyone was hoping to hear: they were fully on track to hit the Christmas Eve deadline!

And that’s the story of how Elephant and Chimpanzee saved Christmas.

How Hadoop Manages Midstream Data

The first part of this chapter (TODO: REF) described the contract Hadoop supplies to a Reducer: each record is sent to exactly one reducer; all records with a given label are sent to the same Reducer; and all records for a label are delivered in a continuous ordered group. Let’s understand the remarkably economical motion of data Hadoop uses to accomplish this.

Mappers Spill Data In Sorted Chunks

As your Map task produces each labelled record, Hadoop inserts it into a memory buffer according to its order. Like the dextrous chimpanzees, the current performance of CPU and memory means this initial ordering imposes negligible overhead compared to the rate at which data can be read and processed. When the Map task concludes or that memory buffer fills, its contents are flushed as a stream to disk. The typical Map task operates on a single HDFS block and produces an output size not much larger. A well-configured Hadoop cluster sets the sort buffer size accordingly (FOOTNOTE: The chapter on Hadoop Tuning For The Brave And Foolish (TODO: REF) shows you how); in that most common case, the Map task produces only a single spill.

If there are multiple spills, Hadoop performs the additional action of merge/sorting the chunks into a single spill. (FOOTNOTE: This can be somewhat expensive, so in Chapter (TODO: REF), we will show you how to avoid unnecessary spills.) Whereas the pygmy elephants each belonged to a distinct workbench, a Hadoop Mapper produces only that one unified spill. That’s ok — it is easy enough for Hadoop to direct the records as each is sent to its Reducer.

As you know, each record is sent to exactly one Reducer. The label for each record actually consists of two important parts: the partition key that determines which Reducer the record belongs to, and the sort key, which groups and orders those records within the Reducer’s input stream. You will notice that, in the programs we have written, we only had to supply the record’s natural label and never had to designate a specific Reducer; Hadoop handles this for you by applying a partitioner to the key.

Partitioners Assign Each Record To A Reducer By Label

The default partitioner, which we find meets almost all our needs, is called the "RandomPartitioner." (FOOTNOTE: In the next chapter (TODO: REF), you will meet another partitioner, when you learn how to do a total sort.) It aims to distribute records uniformly across the Reducers by giving each key the same chance to land on any given Reducer. It is not really random in the sense of nondeterministic; running the same job with the same configuration will distribute records the same way. Rather, it achieves a uniform distribution of keys by generating a cryptographic digest — a number produced from the key with the property that any change to that key would instead produce an arbitrarily distinct number. Since the numbers thus produced have high and uniform distribution, the digest MODULO the number of Reducers reliably balances the Reducers' keys, no matter their raw shape and size. (FOOTNOTE: If you will recall, x MODULO y gives the remainder after dividing x by y. You can picture it as a clock with y hours on it: 15 MODULO 12 is 3; 4 MODULO 12 is 4; 12 MODULO 12 is 0.)
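
The idea is easy to demonstrate outside of Hadoop. The following toy Ruby snippet is not Hadoop's partitioner class, just an illustration of "digest the key, then take it modulo the number of Reducers":

	require 'digest/md5'

	# Deterministic: the same key always lands on the same reducer,
	# while distinct keys scatter uniformly across the reducers.
	def partition_for(key, num_reducers)
	  digest = Digest::MD5.hexdigest(key.to_s).to_i(16)
	  digest % num_reducers
	end

	partition_for('chevron', 20)   # => the same index in 0..19 on every run
	partition_for('disk',    20)   # => very likely a different index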

NOTE The default partitioner aims to provide a balanced distribution of keys — which does not at all guarantee a uniform distribution of records ! If 40-percent of your friends have the last name Chimpanzee and 40-percent have the last name Elephant, running a Map/Reduce job on your address book, partitioned by last name, will send all the Chimpanzees to some Reducer and all the Elephants to some Reducer (and if you are unlucky, possibly even the same one). Those unlucky Reducers will struggle to process 80-percent of the data while the remaining Reducers race through their unfairly-small share of what is left. This situation is far more common and far more difficult to avoid than you might think, so large parts of this book’s intermediate chapters are, in effect, tricks to avoid that situation.

(TODO: Move merge/sort description here)

Playing with Partitions: Aggregate by

Here’s another version of the script to total wikipedia pageviews. We’ve modified the mapper to emit separate fields for the century, year, month, day and hour (you wouldn’t normally do this; we’re trying to prove a point). The reducer intends to aggregate the total pageviews across all pages by year and month: a count for December 2010, for January 2011, and so forth. We’ve also directed it to use twenty reducers, enough to illustrate a balanced distribution of reducer data.
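
Here is roughly the shape of that modified job, in the same style as the earlier examples. The visit model, its field names and the way the group key is unpacked are illustrative assumptions, not the actual example script.

	mapper(:pageview_times) do
	  process do |visit|
	    t = visit.visited_at
	    # century, year, month, day and hour as separate fields, then the pageview count
	    yield [t.year / 100, t.year, t.month, t.day, t.hour, visit.pageviews]
	  end
	end

	reducer(:pageview_times) do
	  def process_group(label, group)
	    century, year, month = label          # this reducer only cares about century/year/month
	    total = 0
	    group.each { |*_fields, count| total += count.to_i }
	    yield [century, year, month, total]   # one total per month
	  end
	end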

Run the script on the subuniverse pageview data with --partition_keys=3 --sort_keys=3 (TODO check params), and you’ll see it use the first three keys (century/year/month) as both partition keys and sort keys. Each reducer’s output will tend to have months spread across all the years in the sample, and the data will be fairly evenly distributed across all the reducers. In our runs, the -00000 file held the months of (TODO insert observed months), while the -00001 file held the months of (TODO insert observed months); all the files were close to (TODO size) MB large. (TODO consider updating to "1,2,3" syntax, perhaps with a gratuitous randomizing field as well. If not, make sure wukong errors on a partition_keys larger than the sort_keys). Running with --partition_keys=3 --sort_keys=4 doesn’t change anything: the get_key method in this particular reducer only pays attention to the century/year/month, so the ordering within the month is irrelevant.

Running it instead with --partition_keys=2 --sort_keys=3 tells Hadoop to partition on the century/year, but do a secondary sort on the month as well. All records that share a century and year now go to the same reducer, while the reducers still see months as continuous chunks. Now there are only six (or fewer) reducers that receive data — all of 2008 goes to one reducer, similarly 2009, 2010, and the rest of the years in the dataset. In our runs, we saw years X and Y (TODO adjust reducer count to let us prove the point, insert numbers) land on the same reducer. This uneven distribution of data across the reducers should cause the job to take slightly longer than the first run. To push that point even farther, running with --partition_keys=1 --sort_keys=3 now partitions on the century — which all the records share. You’ll now see 19 reducers finish promptly following the last mapper, and the job should take nearly twenty times as long as with --partition_keys=3.

Finally, try running it with --partition_keys=4 --sort_keys=4, causing records to be partitioned by century/year/month/day. Now the days in a month will be spread across all the reducers: for December 2010, we saw -00000 receive X, Y and -00001 receive X, Y, Z; out of 20 reducers, X of them received records from that month (TODO insert numbers). Since our reducer class is coded to aggregate by century/year/month, each of those reducers prepared its own meaningless total pageview count for December 2010, each of them a fraction of the true value. You must always ensure that all the data you’ll combine in an aggregate lands on the same reducer.

Reducers Receive Sorted Chunks From Mappers

Partway through your job’s execution, you will notice its Reducers spring to life. Before each Map task concludes, it streams its final merged spill over the network to the appropriate Reducers (FOOTNOTE: Note that this communication is direct; it does not use the HDFS). Just as above, each Reducer files the records it receives into a sort buffer, spills that buffer to disk as it fills, and begins merge/sorting the spills once a threshold is reached.

Whereas the numerous Map tasks typically skate by with a single spill to disk, you are best off running a number of Reducers, the same as or smaller than the available slots. This generally leads to a much larger amount of data per Reducer and, thus, multiple spills.

Reducers Read Records With A Final Merge/Sort Pass

The Reducers do not need to merge all records to a single unified spill. Just as the elves at each workbench pulled directly from a limited number of parts carts as they worked, once the number of mergeable spills is small enough the Reducer begins processing records from those spills directly, each time choosing the next record in sorted order.

Your program’s Reducer receives the records from each group in sorted order, outputting records as it goes. Your reducer can output as few or as many records as you like at any time: on the start or end of its run, on any record, or on the start or end of a group. It is not uncommon for a job to produce output the same size as or larger than its input — "Reducer" is a fairly poor choice of names. Those output records can also be of any size, shape or format; they do not have to resemble the input records, and they do not even have to be amenable to further Map/Reduce processing.

Reducers Write Output Data (Which May Cost More Than You Think)

As your Reducers emit records, they are streamed directly to the job output, typically the HDFS or S3. Since this occurs in parallel with reading and processing the data, the primary spill to the Datanode typically carries minimal added overhead. However, the data is simultaneously being replicated as well, which can extend your job’s runtime by more than you might think.

Let’s consider how data flows in a job intended to remove duplicate records: for example, processing 100 GB of data with one-percent duplicates, and writing output with replication factor three. As you’ll see when we describe the 'distinct' patterns in Chapter 5 (REF), the Reducer input is about the same size as the mapper input. Using what you now know, Hadoop moves roughly the following amount of data, largely in parallel:

  • 100 GB of Mapper input read from disk;

  • 100 GB spilled back to disk;

  • 100 GB of Reducer input sent and received over the network;

  • 100 GB of Reducer input spilled to disk

  • some amount of data merge/sorted to disk if your cluster size requires multiple passes;

  • 100 GB of Reducer output written to disk by the local Datanode;

  • 200 GB of replicated output sent over the network, received over the network and written to disk by the Datanode.

If your Datanode is backed by remote volumes (common in some virtual environments [2]), you’ll additionally incur

  • 300 GB sent over the network to the remote file store

As you can see, unless your cluster is undersized (producing significant merge/sort overhead), the cost of replicating the data rivals the cost of the rest of the job. The default replication factor is 3 for two very good reasons: it helps guarantee the permanence of your data and it allows the Job tracker to efficiently allocate Mapper-local tasks. But in certain cases — intermediate checkpoint data, scratch data or where backed by a remote file system with its own durability guarantee — an expert who appreciates the risk can choose to reduce the replication factor to 2 or 1.
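
A quick back-of-the-envelope tally of the list above makes the point, assuming a well-sized cluster (no extra merge/sort passes) and Datanodes with local disks:

	io_gb = {
	  'mapper input read from disk'       => 100,
	  'mapper spill written to disk'      => 100,
	  'reducer input over the network'    => 100,
	  'reducer input spilled to disk'     => 100,
	  'reducer output written locally'    => 100,
	  'replica copies sent and written'   => 200,
	}
	io_gb.values.reduce(:+)                    # => 700 GB handled in all
	io_gb['replica copies sent and written']   # => 200 GB of it due to replication alone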

You may wish to send your job’s output not to the HDFS or S3 but to a scalable database or other external data store. We will show an example of this in the chapter on HBase (REF), and there are a great many other output formats available. While your job is in development, though, it is typically best to write its output directly to the HDFS (perhaps at replication factor 1), then transfer it to the external target in a separate stage. The HDFS is generally the most efficient output target and the least likely to struggle under load. This checkpointing also encourages the best practice of sanity-checking your output and asking questions.


1. Using a "consistent hash"; see (TODO: ref) the chapter on Sampling
2. This may sound outrageous to traditional IT folk, but the advantages of elasticity are extremely powerful — we’ll outline the case for virtualized Hadoop in Chapter (REF)