# MapReduce
MapReduce is a programming model for data processing. MapReduce programs are inherently parallel, thus putting very large-scale data analysis into the hands of anyone with enough machines at their disposal.

## The problem with parallel processing
1. File sizes vary, so the total run time is limited by the time taken to process the longest file. A better approach is to split the data into equal-sized chunks.
2. The results from the different processes have to be combined.
3. Processing on multiple machines brings problems such as coordination and reliability.

## MapReduce: an overview
A MapReduce program needs three things: a map function, a reduce function, and some code to run the job.

### Datatypes
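Rather than the built-in Java types, Hadoop uses its own datatypes, found in the `org.apache.hadoop.io` package. For example, `LongWritable`, `Text`, and `IntWritable` correspond to Java's `Long`, `String`, and `Integer`.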

### Context
A `Context` instance is mainly used to write the output.

### Mapper
The `Mapper` class is a generic type, with four formal type parameters that specify the input key, input value, output key, and output value types of the map function. The `map()` method is passed a key and a value.

### Reducer
Like `Mapper`, the `Reducer` class takes four formal type parameters, this time specifying the input and output types of the reduce function. The input types of the reduce function must match the output types of the map function. The framework groups the `Mapper` output by key before passing it to the `Reducer`.
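
To make the map, group-by-key, reduce flow concrete, here is a minimal plain-Java sketch with no Hadoop dependency. The class `MiniMapReduce`, the `"year,temperature"` line format, and the sample values are all assumptions made up for illustration; real Hadoop code uses the `Mapper`, `Reducer`, and `Job` classes described in these notes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java imitation of the map -> group-by-key -> reduce flow (not Hadoop code). */
final class MiniMapReduce {

  /** Hypothetical map function: turns one "year,temperature" line into a (year, temperature) pair. */
  static Map.Entry<String, Integer> map(String line) {
    String[] fields = line.split(",");
    return Map.entry(fields[0].trim(), Integer.parseInt(fields[1].trim()));
  }

  /** Reduce function: receives all values grouped under one key and returns their maximum. */
  static int reduce(String key, List<Integer> values) {
    int max = Integer.MIN_VALUE;
    for (int value : values) {
      max = Math.max(max, value);
    }
    return max;
  }

  /** Driver: maps every record, groups the pairs by key (sorted), then reduces each group. */
  static Map<String, Integer> run(List<String> lines) {
    Map<String, List<Integer>> grouped = new TreeMap<>();
    for (String line : lines) {
      Map.Entry<String, Integer> pair = map(line);
      grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
    }
    Map<String, Integer> result = new TreeMap<>();
    grouped.forEach((year, temps) -> result.put(year, reduce(year, temps)));
    return result;
  }

  public static void main(String[] args) {
    // Made-up sample records
    List<String> input = List.of("1950,0", "1950,22", "1950,-11", "1949,111", "1949,78");
    System.out.println(run(input)); // prints {1949=111, 1950=22}
  }
}
```

The grouping step in `run()` stands in for what the framework does between the two phases; the user only writes the equivalents of `map()` and `reduce()`.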

### Job
A `Job` object forms the specification of the job and gives you control over how the job is run.
Usual setup:
- The `setJarByClass()` method tells Hadoop to locate the relevant JAR file by looking for the one that contains this class.
- The static `FileInputFormat.addInputPath()` method adds an input path, which can be a single file, a directory, or a file pattern.
- The static `FileOutputFormat.setOutputPath()` method sets the output directory. This directory must not exist before the job is run.
- The `setMapperClass()` and `setReducerClass()` methods specify the map and reduce types to use.
- The `setOutputKeyClass()` and `setOutputValueClass()` methods control the output types of the reduce function, and must match what the `Reducer` class produces. The map output types default to the same types, so they do not need to be set if the mapper produces the same types as the reducer (as it does in the max temperature example below). If they differ, the map output types must be set using the `setMapOutputKeyClass()` and `setMapOutputValueClass()` methods.


## Example: A simple MapReduce program
This program takes temperature data as input and finds the maximum temperature for each year.

`Mapper`:
```java
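import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  // Value the dataset uses to mark a missing temperature reading
  private static final int MISSING = 9999;

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    // Each input value is one fixed-width record; fields sit at fixed offsets
    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // skip the plus sign (parseInt only accepts '+' from Java 7 on)
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    // Emit (year, temperature) only for readings that are present and have an accepted quality code
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}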
```

`Reducer`:
```java
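import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {

    // values holds every temperature the mappers emitted for this year
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}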
```

`Driver`:
```java
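import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    // Job.getInstance() replaces the deprecated new Job() constructor
    Job job = Job.getInstance();
    job.setJarByClass(MaxTemperature.class);
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    // Reduce output types; the map output types default to the same
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Submit the job, wait for it to finish, and exit with 0 on success
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}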
```

## Inner workings and scaling out
A MapReduce `job` consists of the input data, the MapReduce program, and configuration information.

A `task` is a subdivision of a `job`. There are two types: *map tasks* and *reduce tasks*.

A `split` is a fixed-size piece of the input. Hadoop creates one map task for each split. For most jobs, a good split size tends to be the size of an HDFS block, which is 128 MB by default.
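
As a rough illustration of "one map task per split", the arithmetic below estimates how many splits a single file yields. The class `SplitMath` and its method are hypothetical names, and the calculation is idealized: it assumes every split is exactly the split size except possibly the last, which may not match how a real input format treats the tail of a file.

```java
/** Idealized split arithmetic; a sketch, not Hadoop's actual split computation. */
final class SplitMath {

  /** The default HDFS block size quoted in the text: 128 MB. */
  static final long DEFAULT_SPLIT_BYTES = 128L * 1024 * 1024;

  /** Number of splits (and so map tasks) for one file: the ceiling of fileBytes / splitBytes. */
  static long numSplits(long fileBytes, long splitBytes) {
    if (fileBytes < 0 || splitBytes <= 0) {
      throw new IllegalArgumentException("fileBytes must be >= 0 and splitBytes must be > 0");
    }
    return (fileBytes + splitBytes - 1) / splitBytes;
  }

  public static void main(String[] args) {
    long oneGiB = 1024L * 1024 * 1024;
    System.out.println(numSplits(oneGiB, DEFAULT_SPLIT_BYTES));     // prints 8
    System.out.println(numSplits(oneGiB + 1, DEFAULT_SPLIT_BYTES)); // prints 9
  }
}
```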

A `rack` is literally a rack of machines in a data centre.

`data locality optimization`: Hadoop places each map task according to three possibilities, in order of preference:
1. Run the map task on a node where the input data resides in HDFS.
2. If that is not possible, run it on a free node in the same rack.
3. If that is not possible either, run it on an off-rack node, which requires an inter-rack network transfer.
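
The preference order above can be sketched as a small selection routine. This is a toy model, not Hadoop's scheduler: the `LocalityChooser` class, the `Node` type, and the idea of a single `free` flag per node are assumptions invented for the example.

```java
import java.util.List;

/** Toy model of the three placement preferences; not Hadoop's scheduler. */
final class LocalityChooser {

  /** Ordered from most to least preferred, so a smaller ordinal means better locality. */
  enum Locality { DATA_LOCAL, RACK_LOCAL, OFF_RACK }

  /** Hypothetical worker node: a name, the rack it sits in, and whether it can take a task. */
  static final class Node {
    final String name;
    final String rack;
    final boolean free;

    Node(String name, String rack, boolean free) {
      this.name = name;
      this.rack = rack;
      this.free = free;
    }
  }

  /** Classifies a candidate node relative to the nodes that hold the split's data. */
  static Locality classify(Node candidate, List<Node> replicaHolders) {
    boolean sameRack = false;
    for (Node holder : replicaHolders) {
      if (holder.name.equals(candidate.name)) {
        return Locality.DATA_LOCAL;
      }
      sameRack |= holder.rack.equals(candidate.rack);
    }
    return sameRack ? Locality.RACK_LOCAL : Locality.OFF_RACK;
  }

  /** Returns the free node with the best locality, or null if no node is free. */
  static Node choose(List<Node> cluster, List<Node> replicaHolders) {
    Node best = null;
    Locality bestLocality = null;
    for (Node node : cluster) {
      if (!node.free) {
        continue;
      }
      Locality locality = classify(node, replicaHolders);
      if (best == null || locality.compareTo(bestLocality) < 0) {
        best = node;
        bestLocality = locality;
      }
    }
    return best;
  }

  public static void main(String[] args) {
    Node a = new Node("a", "rack1", false); // holds the data but is busy
    Node b = new Node("b", "rack1", true);
    Node c = new Node("c", "rack2", true);
    Node chosen = choose(List.of(c, b, a), List.of(a));
    System.out.println(chosen.name + " " + classify(chosen, List.of(a))); // prints "b RACK_LOCAL"
  }
}
```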

> It should now be clear why the optimal split size is the same as the block size: it is the largest size of input that can be guaranteed to be stored on a single node. If the split spanned two blocks, it would be unlikely that any HDFS node stored both blocks, so some of the split would have to be transferred across the network to the node running the map task, which is clearly less efficient than running the whole map task using local data.

Map tasks write their output to the local disk, not to HDFS. This is *intermediate* output, so storing it in HDFS with replication would be overkill.

The output of the reduce tasks is normally stored in HDFS for reliability.

<img src='./src/reducetask.png' />

The number of reduce tasks is not governed by the size of the input, but instead is specified independently. When there are multiple reducers, the map tasks partition their output, each creating one partition for each reduce task. There can be many keys (and their associated values) in each partition, but the records for any given key are all in a single partition. The partitioning can be controlled by a user-defined partitioning function, but normally the default partitioner—which buckets keys using a hash function—works very well.
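
A hash-based partitioning function can be sketched in a few lines. The formula below follows the common approach of masking off the sign bit of the key's hash code and taking the remainder modulo the number of reduce tasks; the class name `HashPartitionSketch` is hypothetical, and plain Java `String` keys stand in for Hadoop's key types, so the concrete partition numbers would differ in a real job.

```java
/** Sketch of hash partitioning with plain Java keys; not a Hadoop Partitioner implementation. */
final class HashPartitionSketch {

  /**
   * Maps a key to a partition in [0, numReduceTasks).
   * Masking with Integer.MAX_VALUE clears the sign bit so the remainder is never negative.
   */
  static int partitionFor(Object key, int numReduceTasks) {
    if (numReduceTasks <= 0) {
      throw new IllegalArgumentException("numReduceTasks must be positive");
    }
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

  public static void main(String[] args) {
    int reducers = 3;
    for (String year : new String[] {"1949", "1950", "1951", "1950"}) {
      // The same key always lands in the same partition, whichever map task emits it
      System.out.println(year + " -> partition " + partitionFor(year, reducers));
    }
  }
}
```

Because the partition depends only on the key, every record for a given key reaches the same reduce task, which is the property the paragraph above relies on.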

The **"shuffle"** refers to the data flow between the map tasks and the reduce tasks, as seen in the diagram above. The MapReduce framework ensures that the keys are ordered, so if a key differs from the previous one, we know we have moved into a new key group.

A **Combiner** can be specified to run on the map output, which cuts down the amount of data transferred between the map and reduce tasks. Using the max temperature example, we could use a combiner function that, just like the reduce function, finds the maximum temperature in each map task's output. With two mappers, the reduce function would then be called with:

    (1950, [20, 25])
    
instead of 

    (1950, [0, 20, 10, 25, 15])

Combiner in code:
```java
public class MaxTemperatureWithCombiner {

  public static void main(String[] args) throws Exception {

    // Job.getInstance() replaces the deprecated new Job() constructor
    Job job = Job.getInstance();

    // ...
    job.setMapperClass(MaxTemperatureMapper.class);
    job.setCombinerClass(MaxTemperatureReducer.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    // ...
    
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```

A combiner is defined using the `Reducer` class, and in this case it has the same implementation as the reducer, `MaxTemperatureReducer`.
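
The effect of the combiner on the 1950 readings can be checked with a plain-Java sketch. The way the five readings are divided between the two map tasks is an assumption (the notes only give the combined list), and `CombinerSketch` is a hypothetical name.

```java
import java.util.List;

/** Plain-Java check that taking a per-mapper maximum first does not change the final maximum. */
final class CombinerSketch {

  /** Stands in for both the combiner and the reducer: the maximum of a list of readings. */
  static int max(List<Integer> values) {
    int max = Integer.MIN_VALUE;
    for (int value : values) {
      max = Math.max(max, value);
    }
    return max;
  }

  public static void main(String[] args) {
    // Assumed division of the 1950 readings between two map tasks
    List<Integer> firstMapOutput = List.of(0, 20, 10);
    List<Integer> secondMapOutput = List.of(25, 15);

    // Without a combiner the reducer receives all five values
    int withoutCombiner = max(List.of(0, 20, 10, 25, 15));

    // With a combiner each map task sends only its local maximum: [20, 25]
    List<Integer> combined = List.of(max(firstMapOutput), max(secondMapOutput));
    int withCombiner = max(combined);

    System.out.println(combined);        // prints [20, 25]
    System.out.println(withoutCombiner); // prints 25
    System.out.println(withCombiner);    // prints 25
  }
}
```

This works because taking a maximum gives the same answer however the values are grouped. Not every function has that property: the mean of all five readings is 14, but the mean of the two per-mapper means (10 and 20) is 15, so a mean could not be computed with a combiner in this way.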

## Hadoop Streaming
Hadoop provides an API to MapReduce that allows you to write your map and reduce functions in languages other than Java. Hadoop Streaming uses *Unix standard streams* as the interface between Hadoop and your program.

Map input data is passed over standard input to your map function, which processes it line by line and writes lines to standard output. A map output key-value pair is written as a single tab-delimited line. Input to the reduce function is in the same format—a tab-separated key-value pair—passed over standard input. The reduce function reads lines from standard input, which the framework guarantees are sorted by key, and writes its results to standard output.

```shell
% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
  -input input/ncdc/sample.txt \
  -output output \
  -mapper ch02-mr-intro/src/main/ruby/max_temperature_map.rb \
  -reducer ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb
```

When running Streaming programs on a cluster, also pass the `-files` option, which ships the scripts to the cluster.
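
The Streaming contract for the reduce side (key-sorted, tab-separated lines on standard input, result lines on standard output) can be sketched as follows. Java is used only to stay consistent with the other examples in these notes; the command above uses Ruby scripts, and `StreamingMaxReducer` is a hypothetical class, not one of them. The sketch relies on the ordering guarantee: a change of key marks the end of a key group.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.function.Consumer;

/** Sketch of a Streaming-style reducer: key-sorted "key<TAB>value" lines in, "key<TAB>max" lines out. */
final class StreamingMaxReducer {

  /** Consumes lines sorted by key and emits one output line each time a key group ends. */
  static void reduce(Iterator<String> sortedLines, Consumer<String> emit) {
    String currentKey = null;
    int max = Integer.MIN_VALUE;
    while (sortedLines.hasNext()) {
      String line = sortedLines.next();
      if (line.isEmpty()) {
        continue; // ignore blank lines
      }
      String[] parts = line.split("\t", 2);
      String key = parts[0];
      int value = Integer.parseInt(parts[1].trim());
      if (currentKey != null && !key.equals(currentKey)) {
        // The key changed, so the previous group is complete
        emit.accept(currentKey + "\t" + max);
        max = Integer.MIN_VALUE;
      }
      currentKey = key;
      max = Math.max(max, value);
    }
    if (currentKey != null) {
      emit.accept(currentKey + "\t" + max); // flush the last group
    }
  }

  public static void main(String[] args) {
    BufferedReader stdin =
        new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8));
    reduce(stdin.lines().iterator(), System.out::println);
  }
}
```

Unlike the Java `Reducer`, which is handed an iterator over one key's values, a Streaming reducer sees a flat stream of lines and has to detect the group boundaries itself.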

