Hadoop Framework for Data Processing
====


#### Outline
 * **Data processing in Hadoop**
   * Mapper, reducer, and partitioners
 * **Hadoop I/O**
   * InputFormat and OutputFormat
   * Reading sequence files
   * Compressing the output of mappers and reducers
 * **Chaining multiple map and reduce tasks**
 
## Hadoop Processing

Hadoop splits the input data among the mappers. Each mapper generates (key, value) pairs. This intermediate output is partitioned, one partition per reducer, and the partitions are written to disk (the local disk of each mapper node, not HDFS).

#### Partitioner
When there are multiple reducers, the output of the mappers needs to be partitioned. The default partitioner assigns each record to a partition by hashing the mapper's output key:

```java
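// Register the partitioner on the job (HashPartitioner is already the default)
job.setPartitionerClass(HashPartitioner.class);

// Default implementation: non-negative key hash modulo the number of reducers
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}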
```
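
To see why the hash is masked with `Integer.MAX_VALUE`, the same arithmetic can be reproduced outside Hadoop. The following is a minimal plain-Java sketch; the class name `HashPartitionDemo` and the method `partitionFor` are illustrative names chosen here, not part of the Hadoop API.

```java
// Plain-Java sketch (no Hadoop dependency) of the hash-partitioning arithmetic.
// HashPartitionDemo and partitionFor are hypothetical names used for illustration.
class HashPartitionDemo {

    // Maps a key to a partition index in [0, numReduceTasks).
    // Masking with Integer.MAX_VALUE clears the sign bit, so a negative
    // hashCode() cannot produce a negative partition index.
    static <K> int partitionFor(K key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 3;
        for (Object key : new Object[] {"hadoop", "mapper", "hadoop", -7}) {
            System.out.println(key + " -> partition " + partitionFor(key, reducers));
        }
        // Without the mask, a negative hash gives a negative remainder in Java.
        System.out.println("unmasked -7: " + (Integer.valueOf(-7).hashCode() % reducers));
    }
}
```

Equal keys always map to the same partition, which is what guarantees that all values for a given key reach the same reducer.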

### Example wordcount.java

A Hadoop program in Java is typically written as a single main class that contains a mapper class, a reducer class, and a main function, as follows:

<img src="hadoop-program.png" width=200></img>

```java
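public class wordcount {
    // Nested classes must be static so Hadoop can instantiate them by reflection
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {...}

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {...}

    // Driver: configures and submits the job
    public static void main(String[] args) throws Exception {
       ...
       ...
    }
}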
```
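
The bodies of the mapper and reducer are elided above. As a rough illustration of the logic such a word-count job performs, here is a plain-Java sketch that runs without Hadoop. The names `WordCountSketch`, `map`, `reduce`, and `run` are assumptions made for this example; in a real job the framework invokes the Mapper and Reducer classes and does the grouping between them.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Plain-Java sketch of word-count logic, with no Hadoop dependency.
// All names here are hypothetical stand-ins for what the framework does.
class WordCountSketch {

    // Map phase: emit a (word, 1) pair for every token in one input line.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        StringTokenizer tokens = new StringTokenizer(line);
        while (tokens.hasMoreTokens()) {
            pairs.add(new SimpleEntry<>(tokens.nextToken(), 1));
        }
        return pairs;
    }

    // Reduce phase: sum all counts that were grouped under one key.
    static int reduce(String word, List<Integer> counts) {
        int sum = 0;
        for (int c : counts) {
            sum += c;
        }
        return sum;
    }

    // Runs map over every line, groups values by key (the shuffle step),
    // then reduces each group. TreeMap keeps the keys in sorted order.
    static Map<String, Integer> run(List<String> lines) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> pair : map(line)) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                       .add(pair.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            result.put(group.getKey(), reduce(group.getKey(), group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("the quick fox", "the lazy dog")));
    }
}
```

The sketch keeps everything in memory on one machine; Hadoop distributes the same three steps across mappers, the shuffle, and reducers.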

## InputFormat (for mappers)

InputFormat is an abstract class that specifies how records are defined in the input data. The default input format is TextInputFormat, where each record is one line: the input key is the byte offset of the line within the file, and the value is the content of the line.

```java
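public abstract class InputFormat<K, V> {
    // Logically splits the job's input; each split is processed by one mapper
    public abstract List<InputSplit> getSplits(JobContext context)
            throws IOException, InterruptedException;

    // Creates the reader that turns a split into (key, value) records
    public abstract RecordReader<K, V> createRecordReader(InputSplit split,
                                                          TaskAttemptContext context)
            throws IOException, InterruptedException;
}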
```
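
To make the (byte offset, line) record shape concrete, the following plain-Java sketch mimics what a line-oriented record reader produces for a small input. It is only an approximation under simplifying assumptions (UTF-8 text, `\n` line endings, the whole input in memory); `LineRecordSketch` and `readRecords` are hypothetical names, not Hadoop classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of how a line-oriented input format forms records:
// key = byte offset of the start of the line, value = content of the line.
// LineRecordSketch and readRecords are hypothetical names for illustration.
class LineRecordSketch {

    // Splits UTF-8 text on '\n' and returns offset -> line, in input order.
    static Map<Long, String> readRecords(String content) {
        byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
        Map<Long, String> records = new LinkedHashMap<>();
        int start = 0;
        for (int i = 0; i <= bytes.length; i++) {
            boolean atEnd = (i == bytes.length);
            if (atEnd || bytes[i] == '\n') {
                // Skip the empty tail after a final newline, keep blank lines.
                if (!atEnd || i > start) {
                    String line = new String(bytes, start, i - start, StandardCharsets.UTF_8);
                    records.put((long) start, line);
                }
                start = i + 1;
            }
        }
        return records;
    }

    public static void main(String[] args) {
        readRecords("hello\nworld\n")
            .forEach((offset, line) -> System.out.println(offset + "\t" + line));
    }
}
```

Note that the keys are byte offsets, not line numbers or character positions, so a line that follows multi-byte characters starts at a larger offset than its character index would suggest.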

| **InputFormat** | **Description** |
|:--:|:--|
|TextInputFormat|Each line is a record; <br> *key:LongWritable*<br> *value:Text*|
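|KeyValueTextInputFormat|Each line is a record, split into key and value at the first separator (tab by default); <br> *key:Text*<br> *value:Text*|
|SequenceFileInputFormat|Reads Hadoop sequence files (binary key-value pairs); <br> key and value types are those stored in the file|
|NLineInputFormat|Like TextInputFormat, but each split (and so each mapper) receives a fixed number of lines; <br> *key:LongWritable*<br> *value:Text*|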

Specifying InputFormat:
```java
public static void main(String[] args) throws Exception {
    Job job = Job.getInstance();  // new Job() is deprecated in Hadoop 2+
    ...
    job.setInputFormatClass(TextInputFormat.class);
}
```

## OutputFormat (for reducers)



## Hadoop Data Types for Keys & Values

* **WritableComparable** can be used for both keys & values
* **Writable** can be used for values

|Class|Description|
|:---:|:----:|
|BooleanWritable|Standard boolean writable|
|ByteWritable|a single byte|
|DoubleWritable|a double|
|FloatWritable|a float|
|IntWritable|an integer|
|LongWritable|a long|
|Text|text stored in UTF-8 format|
|NullWritable|Placeholder when the key or value is not needed|
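
Custom key and value types follow the same contract as the built-in ones: Hadoop's Writable interface declares `write(DataOutput)` and `readFields(DataInput)` for serialization, and key types must additionally be comparable because the framework sorts by key. The sketch below illustrates that contract in plain Java. `SimpleWritable` is a locally declared stand-in so the example compiles without Hadoop, and `WordYear` is a hypothetical type invented for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Local stand-in for Hadoop's Writable contract (hypothetical name), declared
// here so the sketch compiles without Hadoop on the classpath.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical composite type: serializable (usable as a value) and
// comparable (so it could also be sorted as a key).
class WordYear implements SimpleWritable, Comparable<WordYear> {
    String word = "";
    int year;

    WordYear() {}  // no-arg constructor, needed to deserialize into a fresh object

    WordYear(String word, int year) {
        this.word = word;
        this.year = year;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(year);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields are read in exactly the order they were written.
        word = in.readUTF();
        year = in.readInt();
    }

    @Override
    public int compareTo(WordYear other) {
        int byWord = word.compareTo(other.word);
        return byWord != 0 ? byWord : Integer.compare(year, other.year);
    }

    // Serializes the object to bytes and reads it back into a new instance.
    static WordYear roundTrip(WordYear original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));
            WordYear copy = new WordYear();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In an actual job, a type like this would implement Hadoop's WritableComparable instead of the stand-in interface.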

## Writing a Mapper Class

* A **Context** object is passed to the map() function and is used to write its output (key, value) pairs, via context.write(key, value)