## Hadoop: Definition
* Hadoop is an open-source software platform for **distributed storage** and **distributed processing** of very large datasets on computer clusters built from **commodity hardware**.
* MapReduce is a batch processing system and is NOT suitable for interactive analysis. It can run a query over a large dataset and return the result in a reasonable time.
* Open-source project based on Google's MapReduce and GFS (Google File System) papers.
* Hadoop is NOT a good fit for:
    - Random access (transaction processing)
    - Work that cannot be parallelized
    - Processing lots of small files
    - Low-latency data access
    - Intensive calculations on little data
* A node is simply a computer. A rack is a collection of nodes that are physically located near each other and connected to the same network switch. A cluster is a collection of racks.
* Hadoop originally had two main components:
    - Distributed file system: HDFS
    - MapReduce engine:
        - Framework for performing calculations on the data in the file system. In Hadoop 1.0 it also acted as the resource manager and scheduler.

-------------------

## Comparison with other systems

* **Iterative processing**: ML algorithms are iterative in nature, so it is efficient to hold intermediate results in memory rather than loading them from disk on each iteration. MapReduce does not allow this; Spark does.

* **Stream processing**: Storm and Spark Streaming let us run real-time distributed computations on unbounded streams of data and emit results to Hadoop storage or external systems.

* **Search**: Solr can run on Hadoop clusters, indexing documents as they are added to HDFS and serving search queries from indexes stored in HDFS.

#### Relational database management systems and MapReduce
* An RDBMS, using a B-tree index, works well for updating a small portion of the database. Because B-tree updates are limited by seek time (the time to move the disk head to the place on disk where data is read or written), an RDBMS is not suitable for updating a large fraction of the data. MapReduce is a better fit for that, because it uses sort/merge to rebuild the database.
* MapReduce is good when we want to analyze a large portion of a dataset. An RDBMS is good for point queries and updates.
* MapReduce is good when data is written once and read many times; an RDBMS is good when the database is continuously updated.
* Hadoop is good for semi-structured and unstructured data because it interprets the data at processing time (schema on read). An RDBMS requires structured data, which makes the data loading phase costly (schema on write).
* Relational data is normalized to retain integrity and remove redundancy. In Hadoop, normalization makes reading a record a nonlocal operation.

#### Grid computing/ Volunteer Computing and MapReduce/Hadoop
* Grid computing is good for compute-intensive tasks. Work is distributed across a cluster of machines that read from shared storage, so data transfer is limited by network bandwidth and compute nodes can sit idle waiting for data. Hadoop tries to co-locate the data with the compute nodes, so data access is fast because it is local. This is called data locality.
* In grid computing, we have to handle recovery, checkpointing, failed jobs, etc. manually.

----------------

## Hadoop based application

### Avro:
* Data serialization system; used to describe data structures within the context of Hadoop MapReduce jobs.

### Giraph :
* Iterative graph processing using Hadoop

### Mahout:
* Framework for ML applications using Hadoop and Spark

### Pig:
* Main components
    - Pig Latin: a high-level data flow language for writing scripts. A program is built by connecting operations together, e.g. LOAD, STORE, etc.
    - Infrastructure layer: translates Pig Latin into MapReduce jobs.
* Built-in operators and functions. Used for Extract, Transform, Load (ETL) operations, and for manipulating and analyzing raw data.
* Pig Latin offers SQL-like syntax for defining map and reduce steps, and is highly extensible with user-defined functions (UDFs).
* Pig can be run through the Grunt shell, as a script, or from Ambari/Hue.

### HIVE
* Query and manage data using HiveQL
* Data warehouse software
* HiveQL is an SQL-like language for structuring and querying data in HDFS or HBase.
* The execution engine can be MapReduce, Tez, or Spark.

### Impala
* SQL access to data. Instead of converting queries to MapReduce jobs as Hive does, Impala accesses HDFS directly.
* Low-latency queries, faster than Hive. Hive is more suitable for long batch-processing jobs.

### HBase
* key-value store that uses HDFS for its underlying storage.
* NoSQL column-oriented database
* Scalable data store
* Consistency and high availability: keys are range-partitioned across servers. Data is stored in HDFS, so it is replicated by default, and sharding is automatic: tables are split and distributed as they grow large.
* It provides online read/write access for individual rows and batch operations for reading and writing data in bulk.

### Flume
* Service for moving large amounts of data around the cluster soon after the data is produced.
* Gathering log files from every machine in the cluster
* Transferring data to a centralized persistent store (HDFS)

### Sqoop
* SQL to Hadoop
* Transfers data between Hadoop and relational databases
* Uses MapReduce to import and export data.
* Stores data in HDFS as delimited files.

### Oozie
* Workflow scheduler
* A way to schedule jobs in the cluster.
* Jobs are triggered by frequency (time) or by data availability.

### Ambari
* Gives a view of the cluster and the status of its nodes
* Lets you execute Hive and Pig queries too.

### Hue
* Similar to Ambari: provides a top-level view and lets you execute queries on a Hadoop cluster.
* Graphical front end to cluster

### MESOS
* Alternative to YARN
* Resource negotiator.

### STORM
* Processing streaming data in real time

### Zookeeper
* Coordinates the cluster.
* Keeps track of which nodes are up or down.
* Maintains a reliable and consistent state of the cluster.

### Apache drill
* Query engine. Lets us write SQL queries to query data in NoSQL databases.

### Apache phoenix
* Lets us run SQL on a Hadoop cluster (on top of HBase) with ACID guarantees.

### Presto

### Zeppelin
* Notebook-like approach to accessing the cluster.

-----------------

## Hadoop : Overview

### HDFS:
* Runs on top of the existing file system on each node. Designed to tolerate a high failure rate by replicating data.
* Works best with large files, because there is no need to seek frequently on disk.
* Designed for sequential data access: a seek is needed only at the start of each block, after which the data is read sequentially. Random access is not efficient.

### HDFS file blocks:
* It is not the same as an OS file block: one Hadoop block spans multiple OS blocks.
* Default size is 128 MB.
* The blocks of a single file are replicated on multiple nodes.
* If a chunk of a file is smaller than the block size, only the needed space is used.
* The replication factor can be set in the Hadoop configuration, and even per file.
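The block rules above can be made concrete with a little arithmetic. This is a minimal sketch, not the HDFS API: `BlockLayout`, `blockSizes`, and `rawStorage` are hypothetical names, and it assumes the 128 MB default block size. It shows that every block except possibly the last is full-size, and that a partial last block only takes up the bytes it holds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a file length into HDFS-style block sizes.
final class BlockLayout {
    static final long DEFAULT_BLOCK_SIZE = 128L * 1024 * 1024; // 128 MB

    // Returns the size of each block a file of `fileLength` bytes would occupy.
    static List<Long> blockSizes(long fileLength, long blockSize) {
        List<Long> sizes = new ArrayList<>();
        long remaining = fileLength;
        while (remaining > 0) {
            long size = Math.min(blockSize, remaining); // the last block may be smaller
            sizes.add(size);
            remaining -= size;
        }
        return sizes;
    }

    // Raw storage used across the cluster once every block is replicated.
    static long rawStorage(long fileLength, int replication) {
        return fileLength * replication; // a partial block is not padded to 128 MB
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        List<Long> sizes = blockSizes(300 * mb, DEFAULT_BLOCK_SIZE);
        System.out.println(sizes.size() + " blocks");                 // 3 blocks
        System.out.println(sizes.get(2) / mb + " MB in the last block"); // 44 MB
        System.out.println(rawStorage(300 * mb, 3) / mb + " MB raw with replication 3");
    }
}
```

A 300 MB file therefore becomes blocks of 128, 128, and 44 MB; with replication factor 3 it consumes 900 MB of raw disk, not 3 × 3 × 128 MB.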

* To make a program parallel:
    - We divide the input among different processors and run them in parallel.
    - Dividing the work is a challenge: some machines finish early, so the total finish time is dominated by the largest file. An alternative is to divide the input into fixed-size chunks.
    - The results then have to be combined, which is hard when the data has been split into many chunks.
    - When we add distributed processing, we also have to handle failures and monitor busy machines.
    - Hadoop takes care of all of this.
    
![](images/mapReduce.jpg)

----------------------

## Reading and writing from/to HDFS
### Reading from HDFS
![](images/read.jpg)

* The client opens the file it wishes to read by calling open() on the FileSystem object, which for HDFS is an instance of DistributedFileSystem.
* DistributedFileSystem calls the namenode using RPC to determine the locations of the first few blocks of the file. For each block, the namenode returns the addresses of the datanodes that hold a copy, sorted by proximity to the client (the client may itself be a datanode).
* DistributedFileSystem returns an FSDataInputStream (which wraps a DFSInputStream) to the client to read data from. The client calls read() on the stream, which connects to the closest datanode for the first block of the file.
* When the end of a block is reached, DFSInputStream closes the connection to that datanode and finds the best datanode for the next block; this is transparent to the client. When it has finished reading, the client calls close() on the FSDataInputStream.
* If DFSInputStream encounters an error while communicating with a datanode, it automatically switches to another replica and reports the error to the namenode.
* Because the client connects directly to the datanodes to read data, many clients can read from different datanodes in parallel.

### Distance between nodes
- Processes on the same node
- Different nodes in the same rack
- Nodes in different racks in the same data center
- Nodes in different data centers
![](images/distance.jpg)
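Hadoop models the network as a tree (data center, rack, node) and measures the distance between two nodes as the sum of their distances to their closest common ancestor, which gives 0, 2, 4, and 6 for the four cases listed above. The sketch below computes that distance; it is a simplified illustration, not Hadoop's own topology class, and it assumes locations are written as `/datacenter/rack/node` strings.

```java
// Simplified sketch of tree-based network distance (not Hadoop's topology code).
final class NetworkDistance {
    // Locations look like "/d1/r1/n1": data center, rack, node.
    static int distance(String a, String b) {
        String[] pa = a.substring(1).split("/");
        String[] pb = b.substring(1).split("/");
        int common = 0;
        // count the leading path components (tree levels) both nodes share
        while (common < pa.length && common < pb.length && pa[common].equals(pb[common])) {
            common++;
        }
        // each node's distance to the closest common ancestor, summed
        return (pa.length - common) + (pb.length - common);
    }

    public static void main(String[] args) {
        System.out.println(distance("/d1/r1/n1", "/d1/r1/n1")); // 0: same node
        System.out.println(distance("/d1/r1/n1", "/d1/r1/n2")); // 2: same rack
        System.out.println(distance("/d1/r1/n1", "/d1/r2/n3")); // 4: different rack, same data center
        System.out.println(distance("/d1/r1/n1", "/d2/r3/n4")); // 6: different data center
    }
}
```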

### Write to HDFS

![](images/write.jpg)
* The client creates a file by calling create() on DistributedFileSystem, which makes an RPC call to the namenode to create a new file in the file system's namespace, with no blocks associated with it.
* The namenode checks that the file doesn't already exist and that the client has permission to create it, and then makes a record of the new file. If these checks fail, the client gets an IOException.
* DistributedFileSystem returns an FSDataOutputStream for the client to start writing data to. It wraps a DFSOutputStream, whose DataStreamer handles communication with the datanodes and the namenode.
* As the client writes, the data is split into packets and written to an internal data queue. The queue is consumed by the DataStreamer, which asks the namenode to allocate new blocks by picking a list of suitable datanodes to store the replicas. By default there are 3 datanodes in the pipeline. The DataStreamer streams the packets to the 1st datanode in the pipeline, which stores each packet and forwards it to the 2nd datanode; the 2nd forwards it to the 3rd.
* DFSOutputStream also maintains an internal queue of packets waiting to be acknowledged by the datanodes, called the ack queue. A packet is removed from the ack queue only when it has been acknowledged by all the datanodes in the pipeline.
* When the client has finished writing, it calls close() on the stream.
* Default replica placement: the 1st replica goes on the same node as the client (if the client runs outside the cluster, a node is chosen at random). The 2nd replica goes on a different rack from the 1st (off-rack). The 3rd replica goes on the same rack as the 2nd, but on a different node.
![](images/replica.jpg)
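The placement rule above can be sketched as a small simulation. This is a simplified illustration under stated assumptions, not HDFS's placement code: the cluster is a hypothetical map of rack name to node names, there are at least two racks, every rack has at least two nodes, and the checks the real policy makes (for example, skipping nodes that are too full or too busy) are ignored. Because the choices are random, the printed placement can differ between runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Simplified sketch of the default 3-replica placement rule (load/space checks ignored).
final class ReplicaPlacement {
    // cluster: rack -> nodes; clientNode == null means the client is outside the cluster
    static List<String> place(Map<String, List<String>> cluster, String clientNode, Random rnd) {
        List<String> allNodes = new ArrayList<>();
        cluster.values().forEach(allNodes::addAll);

        // 1st replica: the client's node, or a random node if the client is off-cluster
        String first = (clientNode != null && allNodes.contains(clientNode))
                ? clientNode
                : allNodes.get(rnd.nextInt(allNodes.size()));
        String firstRack = rackOf(cluster, first);

        // 2nd replica: a random node on a different rack
        List<String> offRack = new ArrayList<>();
        cluster.forEach((rack, nodes) -> {
            if (!rack.equals(firstRack)) offRack.addAll(nodes);
        });
        String second = offRack.get(rnd.nextInt(offRack.size()));

        // 3rd replica: a different node on the same rack as the 2nd
        List<String> sameRack = new ArrayList<>(cluster.get(rackOf(cluster, second)));
        sameRack.remove(second);
        String third = sameRack.get(rnd.nextInt(sameRack.size()));

        return List.of(first, second, third);
    }

    static String rackOf(Map<String, List<String>> cluster, String node) {
        for (Map.Entry<String, List<String>> e : cluster.entrySet()) {
            if (e.getValue().contains(node)) return e.getKey();
        }
        throw new IllegalArgumentException("unknown node " + node);
    }

    public static void main(String[] args) {
        Map<String, List<String>> cluster = Map.of(
                "r1", List.of("n1", "n2"),
                "r2", List.of("n3", "n4"),
                "r3", List.of("n5", "n6"));
        System.out.println(place(cluster, "n1", new Random(42)));
    }
}
```

Putting two replicas on one remote rack trades a little reliability for less inter-rack write traffic, while still surviving the loss of a whole rack.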

--------

## MapReduce : Overview

* Block size can be changed for the whole cluster or set for a particular file.
* A MapReduce job is a unit of work that the client wants to be performed: it consists of the input data, the MapReduce program, and configuration information. Hadoop runs the job by dividing it into tasks, of which there are two types: map tasks and reduce tasks. The tasks are scheduled using YARN and run on nodes in the cluster. If a task fails, it will be automatically rescheduled to run on a different node.

* For data locality optimization, Hadoop tries to run each map task on a node where its input data resides. If none of the nodes holding the data is free to run the map task, the job scheduler looks for a free node in the same rack. If no node in that rack is available either, an off-rack node is used.
![](images/data_locality.jpg)

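* The optimal input split size for a mapper is the block size, because that is the largest input guaranteed to be stored on a single node.
* Map task output is written to local disk, NOT to HDFS, because it is intermediate output that is thrown away once the job completes.
* Reduce tasks can NOT take advantage of data locality: the input to a single reducer is normally the output of many mappers. The mapper outputs are merged and passed to the reducer. Reducer output is stored in HDFS. For each HDFS block of the reducer output, the 1st replica is stored on the local node and the other replicas on off-rack nodes.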
![](images/map_reduce_flow.jpg)

* The number of reducers does NOT depend on the size of the input; it is specified independently.
* When there are multiple reducers, the map tasks partition their output, each creating one partition for each reduce task. There can be many keys (and their associated values) in each partition, but the records for any given key are all in a single partition. The partitioning can be controlled by a user-defined partitioning function, but normally the default partitioner—which buckets keys using a hash function—works very well.
![](images/multiple_Reducer.jpg)
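The default partitioner is `HashPartitioner`, which assigns a key to reducer `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`. The sketch below applies the same formula to plain `String` keys, so it runs without Hadoop on the classpath; `HashPartitionSketch` and `assign` are hypothetical names. The mask clears the sign bit so that negative hash codes still land in `[0, numReduceTasks)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hash partitioning in the style of HashPartitioner, using plain String keys.
final class HashPartitionSketch {
    static int partition(String key, int numReduceTasks) {
        // mask the sign bit so negative hash codes still map to [0, numReduceTasks)
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Groups map-output keys by the reducer (partition) that will receive them.
    static Map<Integer, List<String>> assign(List<String> keys, int numReduceTasks) {
        Map<Integer, List<String>> byReducer = new TreeMap<>();
        for (String k : keys) {
            byReducer.computeIfAbsent(partition(k, numReduceTasks), p -> new ArrayList<>()).add(k);
        }
        return byReducer;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("1949", "1950", "1949", "1951", "1950");
        // every occurrence of a given key lands in the same partition
        System.out.println(assign(keys, 2));
    }
}
```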


* A job can have only mappers and 0 reducers (a map-only job); the map output is then written directly to HDFS.
![](images/mapper_no_reducer.jpg)

### Combiner
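* To reduce bandwidth usage between mappers and reducers, we can specify a combiner function, which runs on the map output on the mapper's node. The combiner's output becomes the reducer's input. Hadoop does not guarantee how many times (if at all) it calls the combiner, so the job must produce the same result either way.
* Keep in mind that a combiner cannot always be used: it works for max, but not for mean (or median), because the mean of partial means is generally not the overall mean.
[Program to find Max temperature](https://github.com/purvil/hadoop_java/blob/master/max_temp/MaxTemp.java)
* In Hadoop Streaming, a combiner can be specified with `-combiner combiner.py`.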
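The max/mean point can be checked numerically. In this sketch (hypothetical class name, plain JDK), two map tasks see temperatures for the same key: combining per-map maxima gives the global max, but averaging per-map means does not give the global mean. A common workaround, shown in `meanFromPartials`, is to have the combiner emit partial `(sum, count)` pairs and let only the reducer divide.

```java
import java.util.Arrays;
import java.util.List;

// Why max can be used as a combiner but mean cannot (temperatures for one key).
final class CombinerCheck {
    static int max(List<Integer> xs) {
        return xs.stream().mapToInt(Integer::intValue).max().getAsInt();
    }

    static double mean(List<Double> xs) {
        return xs.stream().mapToDouble(Double::doubleValue).average().getAsDouble();
    }

    // Mean-friendly alternative: combiners emit partial {sum, count} pairs,
    // and only the reducer divides.
    static double meanFromPartials(long[][] partials) {
        long sum = 0, count = 0;
        for (long[] p : partials) {
            sum += p[0];
            count += p[1];
        }
        return (double) sum / count;
    }

    public static void main(String[] args) {
        List<Integer> map1 = Arrays.asList(0, 20, 10); // seen by map task 1 (mean 10)
        List<Integer> map2 = Arrays.asList(25, 15);    // seen by map task 2 (mean 20)

        // max of per-map maxima == global max
        System.out.println(max(Arrays.asList(max(map1), max(map2)))); // 25

        // mean of per-map means != global mean (70 / 5 = 14)
        System.out.println(mean(Arrays.asList(10.0, 20.0)));                   // 15.0
        System.out.println(meanFromPartials(new long[][] {{30, 3}, {40, 2}})); // 14.0
    }
}
```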


### Hadoop Streaming
* Using this API we can write MapReduce programs in any language that can read from standard input and write to standard output.
* Uses Unix standard streams as the interface between Hadoop and the program.
* Map input data is passed to the mapper over standard input; the mapper processes it line by line and writes lines to standard output. Each map output key-value pair is written as a single tab-delimited line.
* A typical Java mapper processes one record at a time: the framework calls the mapper's map() method for each record in the input. In Streaming, the mapper program receives the whole input stream and decides for itself how to process it (e.g., it can read several lines at a time).
* In the Java API, the reducer is given an iterator over the values for each key. In Streaming, the reducer receives sorted key-value lines and must detect key boundaries itself.
* The number of part files in a MapReduce job's output directory equals the number of reducers (or the number of mappers, for a map-only job).
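Detecting key boundaries in a Streaming reducer means remembering the previous key and emitting a result whenever the key changes. The sketch below does this for a max-per-key reducer; it is written in Java to stay consistent with the rest of these notes (any executable that reads stdin works with Streaming), and `StreamingMaxReducer`/`maxPerKey` are hypothetical names. It assumes input lines are `key\tvalue` with integer values, already sorted by key as the framework delivers them.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

// Streaming-style reducer: reads sorted "key\tvalue" lines, emits "key\tmax".
final class StreamingMaxReducer {
    static List<String> maxPerKey(Iterable<String> sortedLines) {
        List<String> out = new ArrayList<>();
        String currentKey = null;
        int currentMax = Integer.MIN_VALUE;
        for (String line : sortedLines) {
            String[] kv = line.split("\t", 2);
            String key = kv[0];
            int value = Integer.parseInt(kv[1].trim());
            if (currentKey != null && !key.equals(currentKey)) {
                // key boundary: the previous key's group is complete
                out.add(currentKey + "\t" + currentMax);
                currentMax = Integer.MIN_VALUE;
            }
            currentKey = key;
            currentMax = Math.max(currentMax, value);
        }
        if (currentKey != null) {
            out.add(currentKey + "\t" + currentMax); // flush the last group
        }
        return out;
    }

    public static void main(String[] args) {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        // lazily iterate over stdin lines, skipping blank ones
        Iterable<String> lines = () -> in.lines().filter(l -> !l.isEmpty()).iterator();
        maxPerKey(lines).forEach(System.out::println);
    }
}
```

The Java API hides exactly this bookkeeping: its reduce() method is called once per key with an iterator over that key's values.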

---------------

## Hadoop Distributed File System (HDFS)
* A file system that manages storage across a network of machines is called a distributed file system.
* Not good for low-latency data access (use HBase instead). HDFS is designed for high-throughput data access, following a write-once, read-many principle.
* Not good when there are lots of small files:
    - The namenode holds all file system metadata in memory, and each file, directory, and block takes roughly 150 bytes.
    - The number of checks (block reports) between the namenode and the datanodes is proportional to the number of blocks, which adds network load.
    - Each small file is its own block and needs its own map task. For example, 10 GB of data in 32 KB files means 327,680 map tasks. There cannot be that many nodes, so many tasks sit in the queue, and every task pays the spin-up/tear-down overhead.
    - Disk I/O is inefficient with such small sizes.
    - Solutions: merge/concatenate files, use SequenceFiles, or use CombineFileInputFormat.
* The larger the file, the less time Hadoop spends seeking to the next data location. Seeks are expensive operations.
* Writes are always made at the end of a file, in append-only fashion. There is no support for writes at arbitrary offsets or for multiple writers.
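The small-files numbers above can be reproduced with back-of-the-envelope arithmetic. This sketch uses the ~150 bytes per namespace object rule of thumb; the class and method names are hypothetical, and it assumes one map task per block, files that divide the total size exactly, and ignores directories.

```java
// Back-of-the-envelope estimates for the small-files problem.
final class SmallFiles {
    static final long BYTES_PER_OBJECT = 150; // rule of thumb per file/directory/block object

    // One input split (and so one map task) per block; a small file is a single block.
    static long mapTasks(long totalBytes, long fileSize, long blockSize) {
        long blocksPerFile = (fileSize + blockSize - 1) / blockSize; // ceiling division
        long files = totalBytes / fileSize;                           // assumes an exact fit
        return files * blocksPerFile;
    }

    // Namenode memory for `files` files of `blocksPerFile` blocks each (directories ignored).
    static long namenodeBytes(long files, long blocksPerFile) {
        return files * (1 + blocksPerFile) * BYTES_PER_OBJECT;
    }

    public static void main(String[] args) {
        long kb = 1024, mb = 1024 * kb, gb = 1024 * mb;
        System.out.println(mapTasks(10 * gb, 32 * kb, 128 * mb));  // 327680
        System.out.println(mapTasks(10 * gb, 128 * mb, 128 * mb)); // 80
        System.out.println(namenodeBytes(327_680, 1));             // 98304000
        System.out.println(namenodeBytes(80, 1));                  // 24000
    }
}
```

The same 10 GB needs 80 map tasks and about 24 KB of namenode memory when stored as 128 MB files, versus 327,680 tasks and about 94 MB when stored as 32 KB files.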

### Blocks
* A disk has a block size, which is the minimum amount of data it can read or write.
* HDFS blocks are 128 MB by default.
* A file smaller than a block does not occupy a full 128 MB of underlying storage.
* Large HDFS blocks minimize the cost of seeks.
* Because a file is divided into blocks, it can be larger than any single disk.
* Storage management is simpler: since the block size is fixed, it is easy to calculate how many blocks fit on a disk. File metadata such as permissions is stored separately from the blocks.
* Replication provides availability and fault tolerance: each block is replicated on several nodes. If a node fails, the blocks it held are replicated again on other nodes to restore the replication level.
* `hdfs fsck / -files -blocks` lists the blocks that make up each file in the file system.
* Master-worker pattern: there are two kinds of nodes, a namenode (master) and datanodes (workers). The namenode manages the file system namespace: it maintains the file system tree and the metadata for all files and directories. This information is stored persistently on local disk as the namespace image and the edit log. The namenode also knows which datanodes store the blocks of each file, but block locations are not stored persistently, since they are reconstructed from the datanodes when the system starts.
* Datanodes store and retrieve blocks when told to (by clients or the namenode), and periodically report to the namenode the list of blocks they store.
* If the namenode fails, the file system cannot be reconstructed from the blocks on the datanodes, so the namenode must be made resilient:
    - Back up the files that make up the namenode's persistent state, e.g. by also writing them to a remote NFS mount.
    - Run a secondary namenode, which periodically merges the namespace image with the edit log to keep the edit log from growing too large, and keeps a copy of the merged namespace image. Its state lags the primary's, so edits made after the last merge would be lost on failure; but since the namenode's metadata is also backed up on NFS, we can recover them.

#### Block caching:
* Blocks can be cached in a datanode's memory. By default, a block is cached in only one datanode's memory. Applications tell the namenode which files to cache by adding a cache directive to a cache pool.

#### HDFS federation
* Allows multiple NameNodes with their own namespaces to share a pool of DataNodes.
* When there are too many files in the system, the namenode's memory becomes the limiting factor for storing all the metadata. Federation lets us add namenodes that act independently, each managing a portion of the file system namespace. Each datanode registers with every namenode and stores blocks from any of them.
* Increased namespace scalability.
* If an application makes heavy use of metadata, it can be isolated in its own namespace.
![](images/hadoop_federation.jpg)

#### HDFS high availability
* Combining an NFS backup with the secondary namenode's checkpoints protects against data loss but does not provide high availability: the namenode is still a single point of failure, because it is the only machine that holds the file-to-block mapping.
* Starting a new namenode after a failure takes time: it must load the namespace image into memory, replay the edit log (from NFS), and receive enough block reports from the datanodes.
* To overcome this, Hadoop's high availability (HA) mode runs an active namenode and one or more standby namenodes. If the active namenode fails, a standby takes over its duties.
    - The namenodes must use highly available shared storage to share the edit log. When a standby namenode comes up, it reads up to the end of the shared edit log to synchronize its state with the active namenode, and then keeps reading new entries as they are written.
    - Datanodes must send block reports to all namenodes, because block mappings are stored in the namenode's memory, not on disk.
* If the active namenode fails, the standby can take over quickly because it has the latest state available in memory.
* On failover, the previously active namenode must be fenced so that it cannot handle requests even if it is still running (we only believe it has failed). Fencing methods include revoking its access to the shared storage, disabling its network port, or turning off its power.
* With the quorum journal manager, JournalNodes store the shared edit log; each edit must be written to a majority of them, and only one namenode is allowed to write edits at a time. The number of JournalNodes should be odd (typically three) so that a majority quorum can still be formed when one fails.

#### Pseudo distributed mode
* A single-node cluster: the namenode and datanode run on the same machine.
* Set `fs.defaultFS = hdfs://localhost/` and `dfs.replication = 1` (the default replication is 3).

#### Command line interface
* The commands below can also be run as `hdfs dfs ...`.
* `hadoop fs -copyFromLocal localpath hdfspath` // same as -put
* `hadoop fs -copyToLocal hdfspath localpath` // same as -get
* `hadoop fs -mkdir dir_name`
* `hadoop fs -ls` // also shows the replication factor of each file
* `hadoop fs -help`
* `hadoop fs -usage <utility name>`
* `hadoop fs -du -h` // disk usage, human-readable
* `hadoop fs -df` // consumed and available storage
* `hadoop fs -mkdir -p /level/dir_name` // create nested directories
* `hadoop fs -rm -r dir`
* To delete permanently (bypassing the trash), add the `-skipTrash` option.
* `hadoop fs -touchz file.txt` // create a zero-length file
* `hdfs fsck filename -files -blocks` // total size, total files, symlinks, total blocks, minimally replicated blocks, over-replicated blocks, under-replicated blocks, default replication factor, average block replication, number of datanodes, number of racks
* `hdfs fsck -blockId blk_1073741825` // information about a block, given its ID
* `hadoop fs -tail filename` // shows the last 1 KB of an HDFS file on stdout
* `hadoop fs -cat filename | head -4`
* `hadoop fs -cat filename | tail -4`
* `hadoop fs -find /data/wiki -name '*part*'`
* `hadoop fs -chmod`
* `hdfs dfsadmin -report` // overall report of the cluster
* All FS shell commands take path URIs (Uniform Resource Identifiers) as arguments:
    - `scheme://authority/path`
    - the scheme for HDFS is `hdfs`
    - the scheme for the local file system is `file`
* `hadoop fs -cp file:///sampleData/spark/myfile.txt hdfs://rvm.svl.ibm.com:8020/user/spark/text/.`
* The default scheme and authority are specified in `core-site.xml` (property `fs.defaultFS`).
* `-getmerge`: concatenates the files in a source directory into a single file on the local file system.
* `-setrep`: sets the replication factor of a file; with `-w` it waits until the requested replication level is achieved.

* HDFS is just one of the file systems that implement Hadoop's abstract file system class, `org.apache.hadoop.fs.FileSystem`; there are several other implementations.
![](images/hadoop_fs.png)

* We can even access local files through Hadoop and run MapReduce on them. For example, to list the local root directory:
* `hadoop fs -ls file:///`

### Coherency model
* The coherency model describes the data visibility of reads and writes for a file. Once more than a block's worth of data has been written, the first block is visible to new readers; it is always the block currently being written that is not visible to readers.
* hflush() vs hsync()
    - hflush() forces all buffers to be flushed to the datanodes. HDFS guarantees that the data written up to that point has reached all the datanodes in the write pipeline and is visible to new readers.
    - hflush() does not guarantee that the datanodes have written the data to disk; it may only be in their memory. To guarantee that, use hsync().
### distcp
* Copies data to and from HDFS in parallel. Behind the scenes it runs as a MapReduce job with no reducers, using up to 20 map tasks by default.

* The following properties can be set in `hdfs-site.xml`:
    - `dfs.blocksize` (older name: `dfs.block.size`) to change the block size
    - `dfs.replication`
    - `dfs.datanode.handler.count` (default 10): sets the number of server threads on each datanode
    - `dfs.namenode.fs-limits.max-blocks-per-file`: maximum number of blocks per file
* The namenode receives heartbeats from the datanodes. A datanode without a recent heartbeat is marked dead and no new I/O is sent to it. Blocks that fall below their replication factor are re-replicated on other nodes.
* Checksums are computed when a file is created and stored by the datanodes alongside the blocks; they are used to check data when it is retrieved. If the check fails, the data is re-read from another replica. The checksum is a CRC-32 (32-bit cyclic redundancy check); HDFS uses the CRC-32C variant by default.
* A separate checksum is created for every `dfs.bytes-per-checksum` bytes of data (default 512 bytes). Datanodes verify the checksums of the data they receive before storing it.
* If a client detects a checksum error, it reports the corrupted replica to the namenode, which re-replicates the block from a good replica.
* `hadoop fs -checksum filename` shows a file's checksum. It can be used to check whether two files in HDFS have the same contents, provided they were written with the same block size and bytes-per-checksum settings.
* Checksums provide DATA INTEGRITY.
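Per-chunk checksumming can be sketched with the JDK's own `java.util.zip.CRC32C` (Java 9+), used here as a stand-in for HDFS's internal checksum code. The class and method names are hypothetical. The sketch computes one CRC-32C per 512-byte chunk at "write" time, then re-verifies after flipping a single bit, which pinpoints the corrupt chunk.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32C;

// Per-chunk checksums in the spirit of dfs.bytes-per-checksum (not HDFS's own code).
final class ChunkChecksums {
    static final int BYTES_PER_CHECKSUM = 512; // HDFS default

    // One CRC-32C value per 512-byte chunk; the last chunk may be shorter.
    static long[] checksums(byte[] data) {
        int chunks = (data.length + BYTES_PER_CHECKSUM - 1) / BYTES_PER_CHECKSUM;
        long[] sums = new long[chunks];
        for (int i = 0; i < chunks; i++) {
            int off = i * BYTES_PER_CHECKSUM;
            int len = Math.min(BYTES_PER_CHECKSUM, data.length - off);
            CRC32C crc = new CRC32C();
            crc.update(data, off, len);
            sums[i] = crc.getValue();
        }
        return sums;
    }

    // Index of the first chunk whose checksum does not match, or -1 if all verify.
    static int firstBadChunk(byte[] data, long[] expected) {
        long[] actual = checksums(data);
        for (int i = 0; i < actual.length; i++) {
            if (actual[i] != expected[i]) return i;
        }
        return -1;
    }

    public static void main(String[] args) {
        byte[] block = "hello hdfs ".repeat(100).getBytes(StandardCharsets.UTF_8); // 1100 bytes
        long[] stored = checksums(block);                 // computed when the data is written
        System.out.println(stored.length);                // 3 chunks: 512 + 512 + 76 bytes
        block[600] ^= 1;                                  // flip one bit in the second chunk
        System.out.println(firstBadChunk(block, stored)); // 1
    }
}
```

Checksumming small chunks rather than whole blocks means a reader can verify just the range it reads, at the cost of a few bytes of metadata per 512 bytes of data.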

* The original Hadoop (MapReduce 1) had issues such as:
    - Centralized handling of job control flow.
    - Tight coupling of a specific programming model with the resource management infrastructure.
    - Limited scalability due to a busy JobTracker: a single JobTracker keeps track of all TaskTrackers and all MapReduce jobs, whereas each TaskTracker only runs map and reduce tasks.
* YARN splits up the two major functions of the JobTracker, resource management and job scheduling/monitoring:
    - ResourceManager: a global resource manager, with a per-node slave (NodeManager). The ResourceManager has ultimate authority to allocate resources among all the applications in the system. It contains the scheduler.
    - ApplicationMaster: a per-application, framework-specific library tasked with negotiating resources from the ResourceManager and working with the NodeManagers to execute and monitor tasks.
* YARN scales further than MapReduce 1 because there is one ApplicationMaster per job, and it can run on any node in the cluster. Separating the functions also allows each one to be improved independently. The ResourceManager handles ONLY scheduling.
* Multi-tenancy: YARN lets multiple access engines (batch, interactive, and real-time) use Hadoop as the common standard and access the same dataset simultaneously.
* YARN uses a shared pool of nodes for all jobs. It is like overloading in object-oriented programming: every engine can use Hadoop in its own way.
* YARN achieves higher cluster utilization: resources not used by one framework can be consumed by another.
* The NodeManager is more generic than the TaskTracker. A TaskTracker has a fixed number of map and reduce slots, whereas a NodeManager dynamically creates containers. A container's size is the amount of resources assigned to it, such as memory and CPU.
* The ResourceManager keeps track of live NodeManagers and available resources, allocates resources to applications and tasks, and monitors ApplicationMasters.
* The NodeManager provides computational resources in the form of containers and manages the processes running in them.
* The ApplicationMaster coordinates the execution of all tasks within its application and asks for resource containers to execute them.
* Containers can run different types of tasks (including ApplicationMasters) and can have different sizes.
* A client can submit any type of application supported by YARN.
![](images/YARN2.png)
* To start a YARN job, the client contacts the ResourceManager, which launches an ApplicationMaster responsible for this application (job). The ApplicationMaster exits when the job completes.
* The ApplicationMaster requests the resources it needs from the ResourceManager, which replies with the IDs of the containers allocated to the job. The ApplicationMaster then works with those containers to run and manage the tasks.
* The ApplicationMaster monitors the mappers and reducers running on each partition of the data.

## YARN
* Yet Another Resource Negotiator.
* Hadoop's cluster resource management system.
* Provides APIs for requesting and working with cluster resources.
* Many distributed computing frameworks can run on top of YARN and the cluster storage layer.
![](images/yarn.jpg)

* YARN has a resource manager (one per cluster) that manages the use of resources across the cluster, and a node manager running on every node in the cluster that launches and monitors containers. A container executes an application-specific process with a constrained set of resources.
![](images/yarn1.jpg)
* To run an application on YARN, a client contacts the resource manager and asks it to run an application master process. The resource manager then finds a node manager that can launch the application master in a container.
* Once running in its container, the application master may simply compute a result and return it to the client, or request more containers from the resource manager and use them to run a distributed computation.
* An application can request a set of containers, specifying the compute resources each needs and, optionally, locality constraints.
* Locality is expressed as a specific node, a specific rack, or anywhere in the cluster.
* Spark requests all of its resources up front.
* MapReduce, on the other hand, requests map task containers up front but requests reduce task containers later.

* A YARN application's lifespan can be:
    - One application per user job (MapReduce).
    - One application per user session (or workflow) of jobs; this is useful for caching intermediate results between jobs (Spark).
    - A long-running application shared by different users, such as the Impala daemon. It offers low latency because requests don't pay the overhead of starting a new application master.
![](images/mr1_YARN.jpg)
* YARN's main goal is to separate resource management from job scheduling/monitoring. There is one global ResourceManager, a NodeManager on each node, and one ApplicationMaster per application.
* The ApplicationMaster does job management, and the ResourceManager manages resources.
* High-availability ResourceManager: a standby ResourceManager can take over if the active one fails.

#### YARN schedulers
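* YARN provides three schedulers: FIFO, Capacity, and Fair.
* Fair scheduler: dynamically balances resources between all running applications so that, over time, each gets an equal share.
* Capacity scheduler: each organization (queue) gets a guaranteed share of the cluster's capacity.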
![](images/fairshare.jpg)
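The idea of fair sharing can be illustrated with max-min fairness: split the capacity equally, and any share an application does not need is redistributed among the others. This is a simplified, single-resource sketch computing one instantaneous allocation; it is not the Fair Scheduler's implementation, which also handles queues, weights, minimum shares, preemption, and multiple resource types. `MaxMinFairShare` is a hypothetical name.

```java
import java.util.Arrays;

// Max-min fair split of one resource (e.g. GB of memory) among applications.
// Simplified: no queues, weights, minimum shares, or preemption.
final class MaxMinFairShare {
    static double[] allocate(double capacity, double[] demands) {
        int n = demands.length;
        double[] alloc = new double[n];
        boolean[] satisfied = new boolean[n];
        double remaining = capacity;
        int active = n;
        while (active > 0 && remaining > 1e-9) {
            double share = remaining / active; // equal split among unsatisfied apps
            boolean anyCapped = false;
            for (int i = 0; i < n; i++) {
                if (!satisfied[i] && demands[i] - alloc[i] <= share) {
                    // needs no more than an equal share: give exactly its demand
                    remaining -= demands[i] - alloc[i];
                    alloc[i] = demands[i];
                    satisfied[i] = true;
                    active--;
                    anyCapped = true;
                }
            }
            if (!anyCapped) {
                // every remaining app wants more than an equal share: split evenly
                for (int i = 0; i < n; i++) {
                    if (!satisfied[i]) alloc[i] += share;
                }
                remaining = 0;
            }
        }
        return alloc;
    }

    public static void main(String[] args) {
        // 100 units shared by three apps demanding 10, 50 and 80
        System.out.println(Arrays.toString(allocate(100, new double[] {10, 50, 80})));
        // prints [10.0, 45.0, 45.0]
    }
}
```

The small application is fully satisfied, and the 90 units it leaves behind are split evenly between the two larger ones.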

--------

### Data model
* Describes what the data elements are, what domain they come from, how different elements relate to each other, and what they are composed of.
* Relational data model: tables, rows, columns.
* Graph data model: vertices (entities) and edges (relations).
* Data model defines structure of data.

### File format
* Defines the physical data layout.
* e.g. CSV, JSON, XLS.
* Defines how to translate raw bytes into programmable data structures and vice versa (serialization and deserialization).
* Considerations: space efficiency, supported data types, encoding and decoding speed, and splittable vs. monolithic structure (whether a subset of the data can be read without reading the entire file).

#### CSV, TSV
* Poor space efficiency
* Good encoding and decoding speed
* Only strings are supported
* Splittable, provided there is no header

#### JSON
* JavaScript Object Notation
* Not space efficient: field names are repeated in every record
* Fast encoding and decoding
* Supports maps, lists, booleans, numbers, and strings. Eliminates boilerplate serialization and deserialization code.

#### XML

### Binary format
#### SequenceFile
* First binary format in Hadoop. Stores a sequence of key-value pairs, using Hadoop's Java serialization/deserialization (Writables).

#### Avro
* Both a file format and a support library
* Stores objects defined by a schema, which specifies field names, types, and aliases. Supports languages other than Java.

#### RCFile (Record columnar)
* Hybrid row-based and column-based format: rows are split into row groups, and each row group is stored column by column.

#### Parquet
* Columnar format
* Supports nested and repeated data

### Compression
* For CPU-bound jobs, compression is not useful; if a job is I/O-bound, use compression.

* Saves disk space and network bandwidth.
* `-1` means optimize for speed; `-9` means optimize for space.
* `gzip -1 file`. gzip sits in the middle of the time/space trade-off. bzip2 compresses more effectively but is slower. LZO, LZ4, and Snappy are optimized for speed.
* Splittable means we can seek to any point in the stream and start reading from that point.
* A codec is an implementation of a compression-decompression algorithm.
* If the compression format is not splittable, the file cannot be divided into splits for multiple map tasks; a single map task has to process the whole file.
* gzip uses DEFLATE, which stores data as a series of compressed blocks with no way to distinguish the start of each block, so gzip is not splittable. bzip2 supports splitting: it marks the start of each block with a 48-bit synchronization marker (an approximation of pi).
![](images/compression.jpg)
![](images/compression1.jpg)
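The `-1`/`-9` trade-off can be tried from plain Java with `java.util.zip.Deflater`, since DEFLATE is the algorithm inside gzip. This is an illustrative sketch with hypothetical names: `Deflater` produces a zlib-wrapped DEFLATE stream, not a `.gz` file, so the bytes differ from the `gzip` tool's output, but `BEST_SPEED` (1) and `BEST_COMPRESSION` (9) correspond to the speed-optimized and space-optimized ends. Note that decompression has to start at the beginning of the stream, which is the practical meaning of "not splittable".

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// DEFLATE (the algorithm inside gzip) at speed-optimized vs space-optimized levels.
final class DeflateLevels {
    static byte[] compress(byte[] input, int level) {
        Deflater deflater = new Deflater(level);
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        while (!deflater.finished()) {
            out.write(buf, 0, deflater.deflate(buf));
        }
        deflater.end();
        return out.toByteArray();
    }

    // Must read from the start of the stream: there is no way to begin mid-stream.
    static byte[] decompress(byte[] compressed) throws DataFormatException {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        while (!inflater.finished()) {
            int n = inflater.inflate(buf);
            if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                throw new DataFormatException("truncated or unsupported stream");
            }
            out.write(buf, 0, n);
        }
        inflater.end();
        return out.toByteArray();
    }

    public static void main(String[] args) throws DataFormatException {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 20_000; i++) sb.append("record-").append(i % 97).append('\n');
        byte[] data = sb.toString().getBytes(StandardCharsets.UTF_8);

        byte[] fast = compress(data, Deflater.BEST_SPEED);        // like gzip -1
        byte[] small = compress(data, Deflater.BEST_COMPRESSION); // like gzip -9
        System.out.println(data.length + " -> " + fast.length + " (level 1), "
                + small.length + " (level 9)");
        System.out.println(decompress(small).length == data.length);
    }
}
```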

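```
# compress intermediate map output
-D mapreduce.map.output.compress=true
-D mapreduce.map.output.compress.codec=<codec class name>

# compress the final job output
-D mapreduce.output.fileoutputformat.compress=true
-D mapreduce.output.fileoutputformat.compress.codec=<codec class name>
```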
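* Container file formats like SequenceFile, ORCFile, and Avro support both compression and splitting. To use gzip, split the file into chunks in the application and compress each chunk separately, choosing the chunk size so that compressed chunks are about the size of an HDFS block.
* Store very large files in a splittable compressed format.
* To store MapReduce output in compressed form, set in the job configuration: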
```
mapreduce.output.fileoutputformat.compress = true
mapreduce.output.fileoutputformat.compress.codec = classname_compression_codec
```
* or set them in code:
```
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
```
![](images/compression_property.jpg)

* Intermediate map output can be compressed too; a fast compression method is a good choice there.
![](images/map_compression.jpg)

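```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;

Configuration conf = new Configuration();
// compress intermediate map output before it is sent to the reducers
conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);
Job job = Job.getInstance(conf); // replaces the deprecated new Job(conf)
```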

## Serialization
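* Serialization is turning structured objects into a byte stream for transmission over a network or for writing to persistent storage. It is used for interprocess communication (IPC) and persistent storage.
* A compact serialization format makes efficient use of network bandwidth.
* Hadoop uses its own serialization format, Writables.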

### IntWritable:

```java
IntWritable writable = new IntWritable();
writable.set(163);
// or, equivalently, use the constructor:
// IntWritable writable = new IntWritable(163);

writable.get(); // returns 163
```
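* IntWritable implements the WritableComparable interface, which is a subinterface of both Writable and java.lang.Comparable. Hadoop also provides RawComparator, which can compare records in their serialized form, without deserializing them into objects; `WritableComparator.get()` returns such a comparator for a WritableComparable class like IntWritable.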

```
RawComparator<IntWritable> comparator =
WritableComparator.get(IntWritable.class);

IntWritable w1 = new IntWritable(163);
IntWritable w2 = new IntWritable(67);
assertThat(comparator.compare(w1, w2), greaterThan(0));
```

![](images/writable_type.jpg)
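To see what "compare without deserializing" means, the plain-JDK sketch below writes an int the way `DataOutput.writeInt` does (4 bytes, big-endian) and compares two serialized values straight from their byte arrays. `RawIntCompare` is a hypothetical class, not Hadoop's comparator; its `compare` method only mirrors the `(b1, s1, l1, b2, s2, l2)` shape of `RawComparator.compare`.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// What an int looks like on the wire, and comparing serialized values directly.
final class RawIntCompare {
    // Serializes the value as DataOutput.writeInt does: 4 bytes, big-endian.
    static byte[] serialize(int value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(value);
        }
        return bytes.toByteArray();
    }

    // Decodes a big-endian int straight out of a byte array at offset s.
    static int readInt(byte[] b, int s) {
        return ((b[s] & 0xff) << 24) | ((b[s + 1] & 0xff) << 16)
                | ((b[s + 2] & 0xff) << 8) | (b[s + 3] & 0xff);
    }

    // RawComparator-shaped comparison: works on byte ranges, creates no objects.
    static int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return Integer.compare(readInt(b1, s1), readInt(b2, s2));
    }

    public static void main(String[] args) throws IOException {
        byte[] a = serialize(163);
        byte[] b = serialize(67);
        System.out.println(a.length);                                    // 4
        System.out.println(compare(a, 0, a.length, b, 0, b.length) > 0); // true
    }
}
```

The bytes are decoded as a signed int rather than compared byte by byte, because a plain unsigned byte comparison would sort negative numbers (two's complement) after positive ones.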


### Text 
* A Writable for UTF-8 sequences. Indexing is by position in the encoded byte sequence, not by Unicode character.

```java
Text t = new Text("hadoop");
t.getLength();  // 6: number of bytes in the UTF-8 encoding
t.charAt(2);    // (int) 'd': an int representing a Unicode code point

t.find("do");   // 2: like String.indexOf(), but returns a byte offset
```
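The byte-offset indexing only becomes visible with non-ASCII text. The plain-JDK sketch below (hypothetical `Utf8Offsets` helper, not the `Text` API) contrasts `String`'s UTF-16 char index with the UTF-8 byte offset that `Text.find()` reports: for a string of A, sharp s, a CJK ideograph, and the supplementary character U+10400, the ideograph is at char index 2 but byte offset 3.

```java
import java.nio.charset.StandardCharsets;

// String indexes by UTF-16 char; Text indexes by position in the UTF-8 bytes.
final class Utf8Offsets {
    // UTF-8 byte offset of the first occurrence of target, or -1 if absent.
    static int byteOffsetOf(String s, String target) {
        int charIndex = s.indexOf(target);
        if (charIndex < 0) return -1;
        // count the UTF-8 bytes that precede the match
        return s.substring(0, charIndex).getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        // A (1 byte), sharp s (2 bytes), CJK ideograph (3 bytes), U+10400 (4 bytes)
        String s = "\u0041\u00DF\u6771\uD801\uDC00";
        System.out.println(s.length());                                // 5 UTF-16 chars
        System.out.println(s.getBytes(StandardCharsets.UTF_8).length); // 10 bytes
        System.out.println(s.indexOf("\u6771"));                       // 2 (char index)
        System.out.println(byteOffsetOf(s, "\u6771"));                 // 3 (byte offset)
    }
}
```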


![](images/map-reduce.png)

* The Mapper class is a generic type, with four formal type parameters that specify the map function's:
    - input key type
    - input val type
    - output key type
    - output val type
    
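```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// type parameters: input key, input value, output key, output value of the map function
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text val, Context context)
            throws IOException, InterruptedException {
        // <logic>: derive key_str and val_int from the input record
        context.write(new Text(key_str), new IntWritable(val_int)); // collect output
    }
}
```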

* An instance of `Context` is used to write the output.

### Writable
* In MapReduce, data types are Writables such as IntWritable and Text.
* Writable is an interface in Hadoop.
* It acts as a wrapper for Java's primitive data types.
* All MapReduce key and value types must implement the Writable interface (keys must also be comparable, via WritableComparable).
![](images/writable.jpg)
* These types are in the `org.apache.hadoop.io` package.

## Map Reduce in detail
* Components in Hadoop are configured using Hadoop's own configuration API. An instance of the Configuration class (in the org.apache.hadoop.conf package) represents a collection of configuration properties and their values.
* It reads the properties from XML configuration files (resources).
```
<?xml version="1.0"?>
<configuration>
    <property>
        <name>color</name>
        <value>yellow</value>
        <description>Color</description>
    </property>
    <property>
        <name>size</name>
        <value>10</value>
        <description>Size</description>
    </property>
    <property>
        <name>weight</name>
        <value>heavy</value>
        <final>true</final> <!-- cannot be overridden by later definitions -->
        <description>Weight</description>
    </property>
    <property>
        <name>size-weight</name>
        <value>${size},${weight}</value> <!-- defined using other properties; expanded by Configuration -->
        <description>Size and weight</description>
    </property>
</configuration>
```
* The file above is saved as `config-1.xml`.

```
Configuration conf = new Configuration();
conf.addResource("config-1.xml");
conf.get("color");            // "yellow"
conf.getInt("size", 0);       // 10
conf.get("breadth", "wide");  // "wide": breadth is not defined, so the default is returned
```
* Properties can also be overridden from the command line (for programs run through ToolRunner, which uses GenericOptionsParser):
```
-Dproperty=value
```

* Properties defined in resources that are added later override the earlier definitions, unless an earlier definition is marked `final`.
* For unit tests we can use JUnit, and for MapReduce-specific tests we can use MRUnit.
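The override, `final`, and `${...}` expansion rules can be simulated with a small Python toy. This is not Hadoop's Configuration class. Resources are plain dictionaries that map a name to (value, is_final). The second resource stands in for a hypothetical config-2.xml that redefines size and weight, and it is made up for illustration.

```python
import re

class ToyConfiguration:
    """Toy model of Hadoop's Configuration merging rules (hypothetical, for illustration)."""

    def __init__(self):
        self.props = {}
        self.final = set()

    def add_resource(self, resource):
        """resource maps name -> (value, is_final); later resources override earlier ones."""
        for name, (value, is_final) in resource.items():
            if name in self.final:
                continue  # a final property cannot be overridden by a later resource
            self.props[name] = value
            if is_final:
                self.final.add(name)

    def get(self, name, default=None):
        value = self.props.get(name)
        if value is None:
            return default
        # expand ${other} using the other properties; unknown references are left as-is
        return re.sub(r"\$\{([^}]+)\}", lambda m: self.get(m.group(1), m.group(0)), value)

config_1 = {
    "color": ("yellow", False),
    "size": ("10", False),
    "weight": ("heavy", True),
    "size-weight": ("${size},${weight}", False),
}
config_2 = {"size": ("12", False), "weight": ("light", False)}  # hypothetical config-2.xml

conf = ToyConfiguration()
conf.add_resource(config_1)
conf.add_resource(config_2)
print(conf.get("size"))             # 12: the later resource wins
print(conf.get("weight"))           # heavy: final, so not overridden
print(conf.get("size-weight"))      # 12,heavy
print(conf.get("breadth", "wide"))  # wide: not defined, default returned
```

Expansion happens when a property is read, not when it is loaded. That is why `size-weight` picks up the overridden `size`.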

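* We can run a MapReduce job by calling submit() on a Job object. Alternatively, waitForCompletion() submits the job if it has not been submitted already and then waits for it to finish.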

![](images/map_reduce.jpg)

* The client submits the MapReduce job. The YARN resource manager coordinates the allocation of compute resources on the cluster. YARN node managers launch and monitor the compute containers on machines in the cluster. The MapReduce application master coordinates the tasks running the MapReduce job. The application master and the MapReduce tasks both run in containers that are scheduled by the resource manager and managed by the node managers.
* The shared filesystem (normally HDFS) is used for sharing job files between the other entities.

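* Calling submit() on a Job creates an internal JobSubmitter instance and calls submitJobInternal() on it. waitForCompletion() polls the job's progress once per second and reports it to the console if it has changed.
* The JobSubmitter then:
    - Asks the resource manager for a new application ID, which is used as the MapReduce job ID.
    - Checks the output specification of the job. If the output directory already exists (or is not specified), the job is not submitted and an error is thrown.
    - Computes the input splits for the job. If the splits cannot be computed (for example, because the input paths do not exist), the job is not submitted and an error is thrown.
    - Copies the resources needed to run the job (the job JAR file, the configuration file, and the computed input splits) to the shared filesystem, in a directory named after the job ID (step 3).
    - Calls submitApplication() on the resource manager to submit the job.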

* When the resource manager receives the submitApplication() call, it hands off the request to the YARN scheduler. The scheduler allocates a container, and the resource manager launches the application master's process there, under the node manager's management.
* The application master is a Java application whose main class is MRAppMaster. It initializes the job by creating a number of bookkeeping objects to keep track of the job's progress, because it will receive progress and completion reports from the tasks. It then retrieves the input splits from the shared filesystem. It creates one map task per split, plus the number of reduce tasks given by mapreduce.job.reduces (set by setNumReduceTasks() on Job). Each task is given an ID.
* The application master decides how to run the tasks. If the job is small, it may run the tasks in the same JVM as itself, which avoids the overhead of allocating new containers. Such a job is said to be uberized, or run as an uber task.
* By default, a small job is one with fewer than 10 mappers, only one reducer, and an input smaller than one HDFS block. These thresholds can be changed with mapreduce.job.ubertask.maxmaps, mapreduce.job.ubertask.maxreduces, and mapreduce.job.ubertask.maxbytes. Uber tasks must also be enabled by setting mapreduce.job.ubertask.enable to true.
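The default uber-task check can be sketched as a simple predicate. This simplified Python model uses the thresholds described above: at most 9 maps, at most 1 reduce, and an input no larger than one block (assumed here to be 128 MB). It ignores other conditions the application master also checks, such as memory requirements. The function name is hypothetical.

```python
BLOCK_SIZE = 128 * 1024 * 1024  # assumed HDFS block size (dfs.blocksize)

def is_uber_job(num_maps, num_reduces, input_bytes, enabled=True,
                max_maps=9, max_reduces=1, max_bytes=BLOCK_SIZE):
    """Simplified uber-task check: ignores memory and CPU limits."""
    return (enabled                          # mapreduce.job.ubertask.enable
            and num_maps <= max_maps         # mapreduce.job.ubertask.maxmaps
            and num_reduces <= max_reduces   # mapreduce.job.ubertask.maxreduces
            and input_bytes <= max_bytes)    # mapreduce.job.ubertask.maxbytes

MB = 1024 * 1024
print(is_uber_job(4, 1, 10 * MB))                 # True
print(is_uber_job(12, 1, 10 * MB))                # False: too many maps
print(is_uber_job(4, 1, 200 * MB))                # False: input larger than one block
print(is_uber_job(4, 1, 10 * MB, enabled=False))  # False: uber tasks disabled
```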

* Before any tasks run, the application master calls setupJob() on the OutputCommitter. The default, FileOutputCommitter, creates the final output directory for the job and a temporary working space for task output.
* If the job does not qualify as an uber task, the application master requests containers for all the map and reduce tasks from the resource manager. All map tasks must complete before the sort phase of the reduce can start. For this reason, requests for reduce containers are not made until 5% of the map tasks have completed (mapreduce.job.reduce.slowstart.completedmaps). The scheduler tries to honor data locality for map tasks. Ideally a task runs on the node where its split resides (data-local); otherwise it runs on a node in the same rack (rack-local).
* The memory and virtual cores allocated to each task are controlled by mapreduce.map.memory.mb, mapreduce.reduce.memory.mb, mapreduce.map.cpu.vcores, and mapreduce.reduce.cpu.vcores.

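* Once a task has been assigned a container on a particular node, the application master starts the container by contacting that node's node manager. The task then localizes the resources it needs: the job configuration, the JAR file, and any files from the distributed cache.
* Finally, it runs the map or reduce task.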

#### Streaming
* Streaming runs special map and reduce tasks whose purpose is to launch the user-supplied executable and communicate with it.
* The Streaming task communicates with the process through its standard input and standard output streams. The Java process passes input key-value pairs to the external process, and the external process passes output key-value pairs back to the Java process.
![](images/streaming.jpg) 

* When the application master receives a notification that the last task of the job has completed, it changes the job's status to successful. The application master and the task containers then clean up their working state and intermediate output.

* The application master reschedules execution of failed tasks.
* A task fails in three cases: the map or reduce code throws a runtime exception, a Streaming process exits with a nonzero exit code, or the JVM hits a bug. By default, a task is attempted up to 4 times (controlled by mapreduce.map.maxattempts and mapreduce.reduce.maxattempts). If it fails four times, the whole job fails. To tolerate a percentage of failed tasks instead, set mapreduce.map.failures.maxpercent and mapreduce.reduce.failures.maxpercent.
* A task attempt may also be killed, which is different from failing. Speculative duplicates are killed, and if a node manager fails, the application master marks all task attempts running on it as killed. Killed task attempts do not count toward the number of attempts.
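The retry and tolerance rules can be captured in a small Python model. It treats map and reduce tasks alike and uses the defaults described above (4 attempts, 0% tolerated failures). The function names are hypothetical.

```python
def task_failed(attempts, max_attempts=4):
    """attempts: outcomes of one task's attempts: 'succeeded', 'failed', or 'killed'."""
    failures = 0
    for outcome in attempts:
        if outcome == "succeeded":
            return False
        if outcome == "failed":
            failures += 1  # killed attempts are not counted
            if failures >= max_attempts:
                return True
    return False  # not (yet) failed for good

def job_failed(tasks, max_failure_percent=0):
    """The job fails if the percentage of failed tasks exceeds the tolerated maximum."""
    failed = sum(task_failed(t) for t in tasks)
    return 100.0 * failed / len(tasks) > max_failure_percent

tasks = [["failed"] * 4, ["killed"] * 4 + ["succeeded"]] + [["succeeded"]] * 8
print(job_failed(tasks))                          # True: 1 of 10 tasks (10%) failed
print(job_failed(tasks, max_failure_percent=10))  # False: 10% is tolerated
```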

* The application master itself is retried if it fails. The maximum number of attempts is controlled by mapreduce.am.max-attempts (default 2). YARN imposes its own limit, yarn.resourcemanager.am.max-attempts (also default 2), which must be at least as high.
* The application master sends periodic heartbeats to the resource manager. If the application master fails, the resource manager detects the failure and starts a new instance of the master in a new container.
* The client polls the application master for progress updates, using the master's address that it cached from the resource manager at the start. If the master fails, the client's request times out, and the client asks the resource manager for the new master's address.
* Node managers also send heartbeats to the resource manager. If no heartbeat is received within a configured interval, the node is removed from the pool of available nodes.
* Any task or application master that was running on the failed node manager is recovered and run on other nodes.
* If tasks fail repeatedly on a node manager, the application master blacklists that node manager for the job. The resource manager does NOT do blacklisting across applications, so tasks from new jobs may still be scheduled on a bad node.
* The resource manager is a single point of failure. To achieve high availability, we can run a pair of resource managers in an active-standby configuration: if the active one fails, the standby takes over.
* The transition of the standby resource manager to active is handled by a failover controller, which by default uses ZooKeeper leader election.

### Shuffle and sort
* Input to every reducer is sorted by key.
* The shuffle is the process by which the system performs the sort and transfers the map outputs to the reducers as inputs.
![](images/shuffle.jpg)

* Each map task has a circular memory buffer that it writes its output to. The buffer is 100 MB by default (mapreduce.task.io.sort.mb). When its contents reach a threshold (80% by default, mapreduce.map.sort.spill.percent), a background thread starts to spill the contents to disk. Spills are written to the directories given by mapreduce.cluster.local.dir.
* Before writing to disk, the thread first divides the data into partitions corresponding to the reducers they will ultimately be sent to. Within each partition, the background thread performs an in-memory sort by key.
* If there is a combiner function, it is run on the output of the sort. Running the combiner makes the map output more compact, so less data is written to local disk and transferred to the reducers.
* Each time the buffer reaches the spill threshold, a new spill file is created.
* Before the task finishes, the spill files are merged into a single partitioned and sorted output file.
* If there are at least three spill files, the combiner is run again before the output file is written.
* The map output may also be compressed (set mapreduce.map.output.compress to true and choose a codec with mapreduce.map.output.compress.codec).
* The map output sits on the local disk of the machine where the map ran, but it is needed on the machine that runs the reducer. A reduce task needs the map output for its partition from several map tasks across the cluster. It therefore copies each map's output as soon as that map completes (the copy phase). By default, 5 threads copy map outputs in parallel (mapreduce.reduce.shuffle.parallelcopies).
    - Map outputs are not deleted as soon as a reducer has retrieved them, because the reducer may later fail. They are deleted only when the application master tells the map hosts to delete them, after the job has completed.
    - Map outputs are copied into the reduce task's memory if they are small enough, and to disk otherwise. When the in-memory buffer reaches a threshold size or a threshold number of map outputs, it is merged and spilled to disk. If a combiner is specified, it is run during this merge to reduce the amount of data written to disk.
    - As copies accumulate on disk, a background thread merges them into larger sorted files.
    - After all the map outputs have been copied, the reducer moves to the sort (merge) phase, which merges the map outputs while maintaining their sort order. This is done in rounds. With 50 map outputs and a merge factor of 10 (mapreduce.task.io.sort.factor), there are 5 rounds, each merging 10 files into 1, which leaves 5 intermediate files. Instead of merging these 5 files into a single sorted file, the final merge feeds its output directly into the reduce function.
* During the reduce phase, the reduce function is called for each key in the sorted output. The output is written directly to the output filesystem, typically HDFS. When the node manager runs on a datanode, the first block replica is written to the local disk.
![](images/reducer_sort.jpg)
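The following toy, in-memory Python model ties the map-side steps together. Records go into a buffer. When the buffer reaches the spill threshold, its records are partitioned, sorted by key within each partition, optionally combined, and "spilled". At the end the spills are merged per partition, and the combiner runs again if there are at least three spills. Several simplifications apply: the buffer is counted in records rather than bytes, spilling happens synchronously instead of in a background thread, the partitioner is a stable CRC32-based stand-in rather than HashPartitioner, and all names are hypothetical.

```python
import heapq
import zlib
from collections import defaultdict
from itertools import groupby

def combine_sum(sorted_pairs):
    """Example combiner: sum the values of equal keys (input must be sorted by key)."""
    return [(k, sum(v for _, v in grp)) for k, grp in groupby(sorted_pairs, key=lambda kv: kv[0])]

def partition_of(key, num_reducers):
    """Stable stand-in for the partitioner (not Hadoop's HashPartitioner)."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers

def map_side(records, num_reducers, buffer_size=10, spill_percent=0.8, combiner=None):
    """Toy map-side shuffle: buffer records, spill (partition, sort, combine), merge spills."""
    threshold = max(1, int(buffer_size * spill_percent))  # counted in records, not bytes
    spills, buffer = [], []

    def spill():
        parts = defaultdict(list)
        for key, value in buffer:
            parts[partition_of(key, num_reducers)].append((key, value))
        spill_file = {}
        for p, pairs in parts.items():
            pairs.sort(key=lambda kv: kv[0])  # in-memory sort by key within each partition
            spill_file[p] = combiner(pairs) if combiner else pairs
        spills.append(spill_file)
        buffer.clear()

    for record in records:
        buffer.append(record)
        if len(buffer) >= threshold:  # buffer reached the spill threshold
            spill()
    if buffer:
        spill()  # final spill of whatever is left

    # Merge all spills into one sorted run per partition; rerun the combiner if >= 3 spills.
    output = {}
    for p in range(num_reducers):
        merged = list(heapq.merge(*(s.get(p, []) for s in spills), key=lambda kv: kv[0]))
        output[p] = combiner(merged) if combiner and len(spills) >= 3 else merged
    return output, len(spills)

records = [(word, 1) for word in "a b a c a b".split()]
output, num_spills = map_side(records, num_reducers=2, buffer_size=4, combiner=combine_sum)
print(num_spills)  # 2: the threshold is int(4 * 0.8) = 3 records
print(output)
```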
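* Tuning principle: give the shuffle as much memory as possible.
* To make that possible, write map and reduce functions that use as little memory as they can while still having enough to operate.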
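A short Python sketch can check the reduce-side merge arithmetic from the shuffle description (50 map outputs, merge factor 10). It merges sorted runs at most `factor` at a time with heapq.merge, counts each group merge as one round, and stops once few enough runs remain to be fed to the reducer in a final merge. It models only the simple equal-sized case. Hadoop's actual merger chooses the size of the first round more carefully to minimize data written, so the round count is illustrative. The function name is hypothetical.

```python
import heapq

def merge_rounds(runs, factor):
    """Merge sorted runs at most `factor` at a time until no more than `factor` remain.

    Returns (remaining_runs, rounds), counting each group merge as one round.
    The remaining runs would be merged on the fly and fed straight to the reducer.
    """
    if factor < 2:
        raise ValueError("merge factor must be at least 2")
    rounds = 0
    while len(runs) > factor:
        groups = [runs[i:i + factor] for i in range(0, len(runs), factor)]
        runs = [list(heapq.merge(*group)) for group in groups]
        rounds += sum(1 for group in groups if len(group) > 1)
    return runs, rounds

runs = [[i, i + 50] for i in range(50)]  # 50 small sorted map outputs
remaining, rounds = merge_rounds(runs, factor=10)
print(rounds, len(remaining))          # 5 5
final = list(heapq.merge(*remaining))  # the final merge that feeds the reduce function
print(final == list(range(100)))       # True
```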

![](images/map_tuning.jpg)

![](images/reduce_tuning.png)

### Task execution environment
![](images/task_env.png)

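* In Streaming, job configuration properties are exposed to the process as environment variables, with non-alphanumeric characters replaced by underscores. So in Python we can read mapreduce.job.id as `os.environ["mapreduce_job_id"]`.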

### Speculative execution
* The main goal of the MapReduce model is to run tasks in parallel, so that a job takes less time than a sequential run would. However, some straggling tasks may take significantly longer than the others. When Hadoop detects a task running slower than expected, it launches an equivalent backup task. This is called speculative execution. When one of the copies completes successfully, the duplicate is killed.
* Speculative execution only helps when the slowness comes from the original node or container, for example a hardware problem or a software misconfiguration. If the problem is in the mapper or reducer code itself, the speculative copy will run just as slowly as the original task.
![](images/speculative_execution.jpg)

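* Speculative execution of reducers increases network traffic, because each duplicate reduce task has to fetch the same map outputs again.
* It is also not a good idea when the cluster is busy, because duplicate tasks take resources away from other work and reduce overall throughput.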

![](images/map_reduce_ip.jpg)

* The partition function operates on the intermediate key and value types (K2 and V2) and returns the partition index.
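Hadoop's default partitioner, HashPartitioner, computes `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`. The Python sketch below applies the same formula using a reimplementation of Java's String.hashCode(): a 31-based polynomial over the characters, wrapped to a signed 32-bit int. Text keys compute their hash from their UTF-8 bytes with a different function, so the partition numbers here illustrate the formula rather than what a real Text key would get. The function names are hypothetical.

```python
def java_string_hash(s: str) -> int:
    """Java's String.hashCode() for strings without characters outside the BMP.

    Computes s[0]*31^(n-1) + ... + s[n-1], wrapped to a signed 32-bit int.
    """
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h

def hash_partition(key: str, num_reduce_tasks: int) -> int:
    """Same formula as HashPartitioner: (hash & Integer.MAX_VALUE) % numReduceTasks."""
    return (java_string_hash(key) & 0x7FFFFFFF) % num_reduce_tasks

print(java_string_hash("hello"))  # 99162322, same as "hello".hashCode() in Java
for word in ["hadoop", "map", "reduce"]:
    print(word, hash_partition(word, 3))
```

Masking with `Integer.MAX_VALUE` clears the sign bit, so a negative hash code cannot produce a negative partition index.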

![](images/map_reduce_config.jpg)

* The input types are set by the input format. For example, TextInputFormat generates keys of type LongWritable and values of type Text.
* The other types are set by calling methods on Job. If they are not set, the intermediate types default to the final output types, which themselves default to LongWritable and Text.
* If K2 and K3 are the same, there is no need to call setMapOutputKeyClass(). Likewise, if V2 and V3 are the same, there is no need to call setMapOutputValueClass().

#### Default MR job
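* The only configuration we set is the input and output paths. Everything else uses the defaults: the identity Mapper and Reducer, TextInputFormat, TextOutputFormat, HashPartitioner, and a single reducer.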

* Types that implement the Writable interface can be used as values. Types that implement the WritableComparable interface can be used as both keys and values; keys need this because they are compared with each other during sorting.
![](images/writable_interface.jpg)
* Why Writable and not Java primitives?
    - During a MapReduce job, data is transferred between nodes, so it must be serialized and deserialized: Hadoop moves data as bytes, while our data is structured. Writables provide a compact serialization format that also reduces data size, which is why Writable types are used for serialization and deserialization.

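```
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // <logic>: placeholder that sums the values for this key
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum)); // output must use Writable types
    }
}
```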

### Types of mapper
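1. Identity Mapper:
    - The default. It outputs the same key-value pair it receives: Mapper<K, V, K, V>.
2. Inverse Mapper:
    - Swaps keys and values: Mapper<K, V, V, K>.
3. Regex Mapper:
    - Provides a way to use a regular expression in the map function. It emits the matches found in the value: Mapper<K, Text, Text, LongWritable>.
4. Token Counter Mapper:
    - Tokenizes the value and emits a count of 1 for each token, which a reducer can then sum.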

### Types of Reducer:
1. Identity Reducer: the default reducer. It writes the output exactly as it receives the input.
2. Long Sum Reducer: determines the sum of all the values for a given key.
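Some of these library classes can be modeled in a few lines of Python as generator functions over (key, value) pairs. These are illustrative stand-ins, not the Hadoop classes. The tiny `run` driver (a hypothetical helper) only imitates the map, sort, and reduce steps in memory.

```python
from itertools import groupby

def identity_mapper(key, value):
    yield key, value  # output exactly what came in

def inverse_mapper(key, value):
    yield value, key  # swap key and value

def token_counter_mapper(key, value):
    for token in value.split():  # split on whitespace; emit (token, 1)
        yield token, 1

def long_sum_reducer(key, values):
    yield key, sum(values)  # sum all values for the key

def run(mapper, reducer, records):
    """Tiny local driver: map, sort by key (stands in for the shuffle), group, reduce."""
    mapped = [kv for k, v in records for kv in mapper(k, v)]
    mapped.sort(key=lambda kv: kv[0])
    output = []
    for key, group in groupby(mapped, key=lambda kv: kv[0]):
        output.extend(reducer(key, [v for _, v in group]))
    return output

lines = [(0, "to be or not to be"), (19, "to do")]  # (byte offset, line) as from TextInputFormat
print(run(token_counter_mapper, long_sum_reducer, lines))
# [('be', 2), ('do', 1), ('not', 1), ('or', 1), ('to', 3)]
```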


## Driver class
- The driver class tells Hadoop which mapper and reducer classes to use. It also specifies which partitioner, combiner, and input format classes to use.

```
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MyDriver {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MyDriver <input path> <output path>");
            System.exit(-1);
        }

        Job job = Job.getInstance();        // create the job object
        job.setJarByClass(MyDriver.class);  // Hadoop uses this to locate the JAR file
        job.setJobName("Sample job name");  // set the name of the job

        FileInputFormat.addInputPath(job, new Path(args[0]));   // where mappers read input; can be called for multiple paths
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // where reducers write output; must not exist yet

        job.setMapperClass(MyMapper.class);   // set mapper class
        job.setReducerClass(MyReducer.class); // set reducer class

        job.setMapOutputKeyClass(Text.class);          // output key type of the mapper
        job.setMapOutputValueClass(IntWritable.class); // output value type of the mapper
        job.setOutputKeyClass(Text.class);             // output key type of the reducer
        job.setOutputValueClass(IntWritable.class);    // output value type of the reducer
        // If the mapper produces the same output types as the reducer, the setMapOutput* calls can be omitted.

        job.setInputFormatClass(TextInputFormat.class);   // format of the input
        job.setOutputFormatClass(TextOutputFormat.class); // format of the output

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

* A `Job` object forms the specification of the job and gives control over how the job is run.
* We package our code in a JAR file, which Hadoop distributes around the cluster.

### Debug MapReduce:
* Test the scripts locally with a shell pipeline: `cat testfile* | ./your-mapper-program.py | sort | ./your-reducer-program.py`
* Hadoop sets environment variables that the script can use, such as the task ID.
```
import os
myid = os.environ["mapred_tip_id"]
```
* Each task can then open its own file to write debug output:
```
mylog = open("/tmp/mymaplog"+myid, "w")
mylog.write(debug_statement)
mylog.close()
```
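The same check can be done without leaving Python. Below is a minimal word-count mapper and reducer, written as functions over lines (hypothetical names) and chained the way the shell pipeline chains the scripts, with sorted() standing in for `sort`. Unix `sort` may order lines differently depending on locale. Either way, equal keys end up adjacent, and that is all the reducer relies on.

```python
def mapper(lines):
    """Streaming-style mapper: emit 'word<TAB>1' for every word."""
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"

def reducer(lines):
    """Streaming-style reducer: input is sorted, so equal words are adjacent."""
    current, count = None, 0
    for line in lines:
        word, value = line.rstrip("\n").split("\t", 1)
        if word != current:
            if current is not None:
                yield f"{current}\t{count}"  # finished the previous word
            current, count = word, 0
        count += int(value)
    if current is not None:
        yield f"{current}\t{count}"  # flush the last word

def local_pipeline(lines):
    """In-process equivalent of: cat testfile* | ./mapper.py | sort | ./reducer.py"""
    return list(reducer(sorted(mapper(lines))))

print(local_pipeline(["a b a", "b c"]))  # ['a\t2', 'b\t2', 'c\t1']
```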

### Joining Data
* Combining datasets by key
![](images/join.jpg)

* The result we want is:
![](images/join_op.jpg)

* The key is the word, and the value holds the other information. File A already has the word as its key; for file B, the mapper moves the date into the value. For each word, the reducer then receives one line from file A and several lines from file B. Because the reducer has all the data for a key in one place, it can tell which file each line came from and combine them.
![](images/join_merge.jpg)
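Here is a minimal in-memory Python sketch of this reduce-side join. The file layouts are hypothetical (file A: `word<TAB>info`, file B: `date word count`). Each mapper tags its values with the source file ("A" or "B"), which is one common way for the reducer to know where a line came from. A dictionary stands in for the shuffle.

```python
from collections import defaultdict

def map_a(line):
    word, info = line.split("\t", 1)  # file A already has the word as its key
    return word, ("A", info)

def map_b(line):
    date, word, count = line.split()  # file B: move the date into the value
    return word, ("B", f"{date} {count}")

def reduce_join(word, tagged_values):
    a_rows = [v for tag, v in tagged_values if tag == "A"]
    b_rows = [v for tag, v in tagged_values if tag == "B"]
    for a in a_rows:      # typically one line from file A ...
        for b in b_rows:  # ... and several from file B
            yield f"{word}\t{b} {a}"

def join(file_a, file_b):
    groups = defaultdict(list)  # stands in for the shuffle and sort
    for word, tagged in [map_a(l) for l in file_a] + [map_b(l) for l in file_b]:
        groups[word].append(tagged)
    return [row for word in sorted(groups) for row in reduce_join(word, groups[word])]

file_a = ["able\t991", "about\t11"]
file_b = ["Jan-01 able 5", "Feb-02 able 3", "Mar-03 about 8"]
for row in join(file_a, file_b):
    print(row)
```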

#### Vector multiplication
* Elements with the same index have to be grouped together and multiplied.
* <key, value> = <index, number>
* Binning the keys into ranges (one key per range of indices) reduces the number of keys and also reduces the shuffling cost.
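Here is a minimal Python sketch of element-wise vector multiplication in MapReduce form. The mapper emits (index, number) for every element of both vectors, and the reducer multiplies the two numbers that share an index. The optional `bin_size` parameter (hypothetical) makes the key `index // bin_size`, so each key covers a range of indices. The value keeps the exact index so the reducer can still pair elements up. The returned group count shows how binning reduces the number of keys shuffled.

```python
from collections import defaultdict

def map_elements(vector, bin_size=None):
    for index, number in enumerate(vector):
        key = index // bin_size if bin_size else index  # optional binning of keys into ranges
        yield key, (index, number)

def reduce_multiply(values):
    by_index = defaultdict(list)
    for index, number in values:
        by_index[index].append(number)
    # each index should have exactly one number from each vector
    return {i: nums[0] * nums[1] for i, nums in by_index.items()}

def multiply(u, v, bin_size=None):
    groups = defaultdict(list)  # stands in for the shuffle
    for key, val in list(map_elements(u, bin_size)) + list(map_elements(v, bin_size)):
        groups[key].append(val)
    result = {}
    for key in groups:
        result.update(reduce_multiply(groups[key]))
    return [result[i] for i in range(len(u))], len(groups)

print(multiply([1, 2, 3, 4], [5, 6, 7, 8]))              # ([5, 12, 21, 32], 4)
print(multiply([1, 2, 3, 4], [5, 6, 7, 8], bin_size=2))  # ([5, 12, 21, 32], 2)
```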

### Distributed Cache:
* Delivers required files to the nodes.
* To share files across the nodes (containers) where the mappers and reducers run, we can pass them with the `-files` flag. For example, `-files mapper.py,reducer.py` copies mapper.py and reducer.py to all containers.
* Another way is `-archives`, which ships an archive (such as a tar file) over the network. The archive is unpacked on the worker node into a folder with the same name as the archive.
* `-libjars` is used to distribute jars.

### Environment and counters
* Using the distributed cache, we can pass files to the nodes.
* Another way is to use environment variables, through which the job configuration options are exposed to the script.
* When we launch a job, Hadoop assigns input splits of the data to the workers. A mapper can get details of its split using:
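```
import os

if os.environ["mapred_task_is_map"] == "true":  # this task is a mapper
    input_file = os.environ["mapreduce_map_input_file"]      # file the split comes from
    split_start = os.environ["mapreduce_map_input_start"]    # byte offset where the split starts
    split_length = os.environ["mapreduce_map_input_length"]  # length of the split in bytes
```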
* To access the task ID and partition:
```
os.environ["mapreduce_task_id"]         # absolute task ID
os.environ["mapreduce_task_partition"]  # index of the task within the job
```

* Using the `-D` flag, we can pass an arbitrary job configuration property, which Streaming exposes to the script as an environment variable:
```
-D word_pattern="\w+\d+"
```
* word_pattern can then be accessed as an environment variable.
* `-mapper 'mapper.py --your_param some_value'` can also be used to pass command-line arguments to the script.
* All of the above is communication from Hadoop to the Python mapper/reducer.
* Communication in the other direction is needed to report job progress. For this we can write status messages or counter updates to standard error:
```
import sys

print("reporter:status:processed word", file=sys.stderr)          # status message
print("reporter:counter:Counters,word found,1", file=sys.stderr)  # add 1 to counter "word found" in group "Counters"
```
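The following minimal sketch puts these pieces together in a Streaming mapper's logic. It reads the word_pattern property passed with `-D` from its environment, then reports a counter and a status message in Streaming's `reporter:` format. The function names, the counter group and name, and the fallback pattern used outside Hadoop are all hypothetical. run_mapper returns its output lines instead of printing them, so it can be tested.

```python
import os
import re
import sys

def run_mapper(lines):
    """Return (stdout_lines, stderr_lines) so the logic is easy to test outside Hadoop."""
    # word_pattern is passed with -D word_pattern=...; \w+ is a hypothetical fallback
    pattern = re.compile(os.environ.get("word_pattern", r"\w+"))
    out, err, found = [], [], 0
    for line in lines:
        for word in pattern.findall(line):
            out.append(f"{word}\t1")
            found += 1
    err.append(f"reporter:counter:Counters,word found,{found}")  # add `found` to the counter
    err.append("reporter:status:processed input")                # update the task status
    return out, err

def main():
    """Entry point for the -mapper script: read stdin, write to stdout and stderr."""
    out, err = run_mapper(sys.stdin)
    for line in out:
        print(line)
    for line in err:
        print(line, file=sys.stderr)
```

In the actual script, call `main()` at the bottom of the file so that it runs when Hadoop launches it as the mapper.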

### Testing
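* Unit testing: test the mapper and reducer logic with pytest.
* Integration testing: run the scripts in a local pipeline, such as `cat filename | ./mapper.py | sort | ./reducer.py`.
* If the mapper or reducer script uses configuration options, run it with an empty configuration as well. This is called system testing.
* Acceptance testing: run on small test files and compare the results with the expected output.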
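Here is a minimal pytest sketch. map_line is a hypothetical mapper function that stands in for the logic inside mapper.py; pytest collects any function whose name starts with `test_`. Keeping the logic in a function, separate from the stdin/stdout loop, is what makes it unit-testable.

```python
def map_line(line):
    """Hypothetical mapper logic under test: emit (word, 1) for each lowercased word."""
    return [(word.lower(), 1) for word in line.split()]

def test_map_line_counts_each_word():
    assert map_line("Hadoop hadoop MR") == [("hadoop", 1), ("hadoop", 1), ("mr", 1)]

def test_map_line_empty_input():
    assert map_line("") == []
```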

### Partitioner
* By default, Streaming treats the part of each line up to the first tab character as the key and the rest as the value. The key is used for partitioning and sorting.
* To treat the first 2 fields as the key instead:
```
-D stream.num.map.output.key.fields=2
```
* The field separator can also be changed (here to a period):
```
-D stream.map.output.field.separator=.
```

```
-D mapreduce.partition.keypartitioner.options=-k1.2,1.2  # partition on field 1, from its 2nd character to its 2nd character
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
```

### Comparator
* A comparator specifies the rule for deciding whether one key is bigger than another.

```
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator

-D mapreduce.map.output.key.field.separator=.

-D mapreduce.partition.keycomparator.options="-k2,3nr"   # sort on fields 2 to 3, numerically (n), in reverse order (r)
```
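Here is a rough Python approximation of `-k2,3nr` on keys separated by periods. It extracts fields 2 to 3, compares them as numbers field by field, and sorts in reverse. This models the intended ordering and is not KeyFieldBasedComparator itself. The function name and its parameters are hypothetical.

```python
def sort_keys(keys, start_field, end_field, sep=".", numeric=True, reverse=True):
    """Approximate -k<start>,<end>[n][r]: sort on fields start..end (1-based, inclusive)."""
    def sort_key(key):
        fields = key.split(sep)[start_field - 1:end_field]
        return [float(f) for f in fields] if numeric else fields  # 'n': compare as numbers
    return sorted(keys, key=sort_key, reverse=reverse)            # 'r': reverse order

keys = ["a.10.5", "b.9.7", "c.10.8", "d.2.1"]
print(sort_keys(keys, 2, 3))                 # ['c.10.8', 'a.10.5', 'b.9.7', 'd.2.1']
print(sort_keys(keys, 2, 3, numeric=False))  # ['b.9.7', 'd.2.1', 'c.10.8', 'a.10.5']
```

Without `n`, the fields are compared as strings, so "9" ranks above "10".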


### File input format
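1. TextInputFormat
    - The default. Each line is a record. The key is the byte offset of the start of the line within the file, and the value is the contents of the line.
    ![](images/text_ip_format.jpg)
2. KeyValueTextInputFormat
    - Each line is split into a key and a value at the first tab character.
    ![](images/key_val_ip.jpg)
3. NLineInputFormat
    - Keys and values are the same as in TextInputFormat.
    - With TextInputFormat we cannot decide how many lines each mapper processes. With NLineInputFormat we set N (mapreduce.input.lineinputformat.linespermap) to control this. The default is N = 1, so each mapper processes exactly one line.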
    ![](images/n_line_ip.jpg)
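Here is a small Python sketch of how NLineInputFormat groups records. Each record keeps TextInputFormat's (byte offset, line) shape, and every N consecutive lines form one split, which is one mapper's input. It assumes UTF-8 text with single `\n` line endings. The function name is hypothetical.

```python
def n_line_splits(lines, n=1):
    """Group (byte_offset, line) records into splits of n lines each, like NLineInputFormat."""
    records, offset = [], 0
    for line in lines:
        records.append((offset, line))           # key: byte offset, value: the line
        offset += len(line.encode("utf-8")) + 1  # +1 for the '\n' terminator
    return [records[i:i + n] for i in range(0, len(records), n)]

print(n_line_splits(["alpha", "beta", "gamma"], n=2))
# [[(0, 'alpha'), (6, 'beta')], [(11, 'gamma')]]
```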

* The Mapper, Reducer, and Driver classes are the most important parts of a MapReduce program.

## Mapper




## Partitioner class
* The partitioner decides which reducer each intermediate key from the mapper, together with all of its values, goes to. The framework then groups the values for each key into a list that is passed to the reducer.

```
public abstract class Partitioner<KEY, VALUE> {
    // Decides which reducer (partition number 0 to numPartitions - 1) a particular key/value pair goes to
    public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}
```
* Because getPartition() is abstract, any concrete class that extends Partitioner must provide an implementation of it.
![](images/default_partitioner.jpg)

![](images/partitioner_sort.jpg)

* The mapper stores its output in an in-memory buffer (100 MB by default, set by io.sort.mb). When the buffer reaches a threshold (80% by default, set by io.sort.spill.percent), a background thread starts writing the contents of the buffer to disk. Spills are written to mapred.local.dir. These are the older property names; the newer equivalents are mapreduce.task.io.sort.mb, mapreduce.map.sort.spill.percent, and mapreduce.cluster.local.dir.

* Data lakes are a method of storing data that keeps vast amounts of data in their native format and scales horizontally to support the analysis of originally disparate sources of data.