# Hadoop and Map-Reduce demo

## Intro

__NOTE:__ for this notebook you should start your server with `Hadoop (with YARN) and Spark environment`.

![Hadoop in a box](images/hadoop_env.png)

[The Apache Hadoop software library](https://hadoop.apache.org/) is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures.

The current installation includes the following modules from the Hadoop ecosystem:

- __Hadoop Common:__ The common utilities that support the other Hadoop modules.
- __Hadoop Distributed File System (HDFS™):__ A distributed file system that provides high-throughput access to application data.
- __Hadoop YARN:__ A framework for job scheduling and cluster resource management.
- __Hadoop MapReduce:__ A YARN-based system for parallel processing of large data sets.
- __Spark™:__ A fast and general compute engine for Hadoop data. Spark provides a simple and expressive programming model that supports a wide range of applications, including ETL, machine learning, stream processing, and graph computation.

### Pseudo-Distributed Mode (Single-Node Cluster)

Hadoop can also run on a single node in [pseudo-distributed mode](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html#Pseudo-Distributed_Operation), where each Hadoop daemon runs in a separate Java process.

Pseudo-distributed mode also uses only a single node, but it simulates a cluster: every daemon (NameNode, DataNode, Secondary NameNode, ResourceManager, NodeManager, etc.) runs independently as a separate process in its own JVM (Java Virtual Machine). Because these separate Java processes imitate the machines of a real cluster on one host, the mode is called pseudo-distributed.
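The JDK tool `jps` prints one `<pid> <MainClass>` line per running JVM, so in pseudo-distributed mode each daemon should appear on a line of its own. A minimal sketch, assuming `jps` output in that format; the set of expected daemons and the sample PIDs are illustrative, not captured from this server.

```python
# Daemons expected in a pseudo-distributed HDFS + YARN setup
# (assumption: this environment runs all five; adjust if yours differs)
EXPECTED_DAEMONS = {'NameNode', 'DataNode', 'SecondaryNameNode',
                    'ResourceManager', 'NodeManager'}


def parse_jps(output):
    """Map each JVM main-class name in `jps` output to its PID."""
    daemons = {}
    for line in output.splitlines():
        parts = line.split()
        # normal lines look like '2101 NameNode'; skip anything else
        if len(parts) == 2 and parts[0].isdigit():
            daemons[parts[1]] = int(parts[0])
    return daemons


def missing_daemons(output, expected=EXPECTED_DAEMONS):
    """Return the expected daemons that have no JVM of their own."""
    return expected - set(parse_jps(output))


# Illustrative output; on the server the real one could be captured with
# subprocess.run(['jps'], capture_output=True, text=True).stdout
sample = """2101 NameNode
2245 DataNode
2460 SecondaryNameNode
2702 ResourceManager
2851 NodeManager
3010 Jps"""

print(parse_jps(sample))
print(missing_daemons(sample))  # set()
```

Each daemon having its own PID is exactly what distinguishes this mode from standalone mode, where everything runs inside a single JVM.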

<font color='red'>__VERY IMPORTANT NOTE:__ The Hadoop instance installed in the 'Hadoop (with YARN) and Spark environment' is intended for educational purposes only and DOES NOT KEEP DATA after you stop your server. During a session you can create, write and delete files in HDFS, but the next time you start the Jupyter server the filesystem will be empty.</font>

## Libraries for demo

In [None]:
import os
import re
import json
import socket
import subprocess
import pandas as pd

In [None]:
YARN_PORT = 8088

# working directory for default user `jovyan`
WORK_DIR = '/jovyan'

## HDFS operations

[The Hadoop Distributed File System (HDFS)](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html) is a distributed file system designed to run on commodity hardware. It has many similarities with existing distributed file systems. However, the differences from other distributed file systems are significant. HDFS is highly fault-tolerant and is designed to be deployed on low-cost hardware. HDFS provides high throughput access to application data and is suitable for applications that have large data sets.

Navigation through HDFS is available via `hdfs dfs` [commands](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html), which are quite similar to Unix shell commands (`ls`, `cat`, etc.):

In [None]:
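def hdfs_dirs(path, filter_str=''):
    """
    Returns the paths listed by `hdfs dfs -ls path` as a list.
    Paths may be filtered by the `filter_str` parameter,
    e.g. `filter_str='csv'` will return only `csv` files.

    """
    process = subprocess.Popen(
        ['hdfs', 'dfs', '-ls', path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    out, err = process.communicate()
    lines = out.decode('utf-8').splitlines()
    # keep only entry lines (they start with a permission string such as
    # 'drwxr-xr-x' or '-rw-r--r--'); this drops the 'Found N items' header
    entries = [line for line in lines if line[:1] in ('d', '-')]
    # the path is the last column; apply the filter to the path only
    paths = [line.split()[-1] for line in entries]
    return [p for p in paths if filter_str in p]

def file_content(path):
    """
    Returns the content of the file as a string.
    Similar to the `cat` command.

    """
    process = subprocess.Popen(
        ['hdfs', 'dfs', '-cat', path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    out, err = process.communicate()
    # decode as UTF-8: 'unicode_escape' garbles non-ASCII text
    # and interprets backslash sequences found in the data
    return out.decode('utf-8')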

In [None]:
# list root directory
!hdfs dfs -ls /

In [None]:
# list the working directory '/jovyan'
# NOTE: IPython substitutes the Python variable in braces: {WORK_DIR} -> /jovyan
!hdfs dfs -ls {WORK_DIR}

You can put local files into HDFS with the `-put` command:

In [None]:
# put the local file 'telecom_churn.csv' into the working directory
!hdfs dfs -put ./data/telecom_churn.csv {WORK_DIR}

In [None]:
# check if file appeared in HDFS
!hdfs dfs -ls {WORK_DIR}

In [None]:
# use function defined above
hdfs_dirs(WORK_DIR, 'csv')
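`hdfs_dirs` keeps only the last column of each listing line. Per the FileSystem Shell guide, a file entry of `hdfs dfs -ls` has the columns permissions, replication factor, owner, group, size in bytes, modification date, modification time and path; directories show `-` as their replication factor. A minimal sketch of a parser that keeps every column, assuming that layout; `HdfsEntry` and `parse_ls` are hypothetical names and the sample listing is illustrative.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class HdfsEntry:
    permissions: str
    replication: Optional[int]  # None for directories (shown as '-')
    owner: str
    group: str
    size: int                   # bytes
    modified: str               # 'YYYY-MM-DD HH:MM'
    path: str

    @property
    def is_dir(self):
        return self.permissions.startswith('d')


def parse_ls(output: str) -> List[HdfsEntry]:
    """Parse `hdfs dfs -ls` output, skipping the 'Found N items' header."""
    entries = []
    for line in output.splitlines():
        # maxsplit=7 keeps any spaces inside the path intact
        parts = line.split(maxsplit=7)
        if len(parts) != 8 or parts[0][:1] not in ('d', '-'):
            continue  # header, blank line or unexpected format
        perms, repl, owner, group, size, date, time, path = parts
        entries.append(HdfsEntry(
            permissions=perms,
            replication=None if repl == '-' else int(repl),
            owner=owner,
            group=group,
            size=int(size),
            modified=f'{date} {time}',
            path=path,
        ))
    return entries


sample = """Found 2 items
drwxr-xr-x   - jovyan supergroup          0 2024-01-01 12:00 /jovyan/out
-rw-r--r--   1 jovyan supergroup       1024 2024-01-01 12:00 /jovyan/telecom_churn.csv
"""
for entry in parse_ls(sample):
    print(entry.path, entry.is_dir, entry.size)
```

In the notebook, the `stdout` of `hdfs dfs -ls` (as captured inside `hdfs_dirs`) could be passed to `parse_ls` instead of the sample string.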

In [None]:
# display the content of the 'telecom_churn.csv' file
content = file_content(f'{WORK_DIR}/telecom_churn.csv')
print(content[:512])

## Map-reduce

### Overview

[Hadoop MapReduce is a software framework](https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html) for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.

A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file-system. The framework takes care of scheduling tasks, monitoring them and re-executes the failed tasks.

The MapReduce framework operates exclusively on `<key, value>` pairs, that is, the framework views the input to the job as a set of `<key, value>` pairs and produces a set of `<key, value>` pairs as the output of the job, conceivably of different types.
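The split, map, sort and reduce phases described above can be imitated in plain Python on a single machine. This is a local sketch of the data flow only, not Hadoop's API: `map_words`, `reduce_counts` and `word_count` are hypothetical names, and the map tasks run sequentially rather than in parallel.

```python
from itertools import groupby
from operator import itemgetter


def map_words(chunk):
    """Map task: emit a (word, 1) pair for every word in a chunk of lines."""
    for line in chunk:
        for word in line.lower().split():
            yield word, 1


def reduce_counts(word, counts):
    """Reduce task: sum all values emitted for one key."""
    return word, sum(counts)


def word_count(lines, n_chunks=2):
    # 1. split the input into independent, contiguous chunks
    size = max(1, -(-len(lines) // n_chunks))  # ceiling division
    chunks = [lines[i:i + size] for i in range(0, len(lines), size)]
    # 2. one map task per chunk (sequential here, parallel on a cluster)
    mapped = [pair for chunk in chunks for pair in map_words(chunk)]
    # 3. the framework sorts map output by key before the reduce phase
    mapped.sort(key=itemgetter(0))
    # 4. each reduce call receives one key together with all of its values
    return [reduce_counts(word, (count for _, count in group))
            for word, group in groupby(mapped, key=itemgetter(0))]


print(word_count(['the cat sat', 'the dog sat', 'a cat']))
# [('a', 1), ('cat', 2), ('dog', 1), ('sat', 2), ('the', 2)]
```

Sorting before reducing is what lets each reduce call see all values for one key as a contiguous group.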

Input and Output types of a MapReduce job:
```
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
```
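The optional combine step in this signature pre-aggregates each map task's output locally, so fewer `<k2, v2>` pairs travel to the reducers. A small local sketch with two illustrative input splits; the function names are hypothetical, and a `Counter` stands in for Hadoop's sort-and-group machinery.

```python
from collections import Counter


def map_words(line):
    """Map: <k1, v1> -> list of <k2, v2> = (word, 1).

    For text input k1 is the line's byte offset; it is unused here,
    so only the line (v1) is passed in.
    """
    return [(word, 1) for word in line.lower().split()]


def sum_by_key(pairs):
    """Sum values per key; word count uses this logic for combine and reduce."""
    totals = Counter()
    for key, value in pairs:
        totals[key] += value
    return sorted(totals.items())


# Two map tasks, each processing one (illustrative) input split
splits = [['to be or not to be'], ['to see or not to see']]
map_outputs = [[pair for line in split for pair in map_words(line)]
               for split in splits]

# Combine runs locally on each map task's output before the shuffle
combined = [sum_by_key(out) for out in map_outputs]

without_combiner = sum(len(out) for out in map_outputs)  # pairs shuffled
with_combiner = sum(len(out) for out in combined)
result = dict(sum_by_key(pair for out in combined for pair in out))

print(without_combiner, with_combiner)  # 12 8
print(result)
```

Hadoop may apply a combiner zero, one or several times, so it must not change the final result; summing is safe here because addition is associative and commutative.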

The MapReduce demo uses the well-known word count task:

![MapReduce](images/mapreducescheme.png)

### WordCount with Python

The next example uses [Hadoop streaming](https://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html), a utility that comes with the Hadoop distribution. It allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer.

In the example below, both the mapper and the reducer are executables that read input from stdin (line by line) and emit output to stdout. The utility creates a Map/Reduce job, submits it to an appropriate cluster, and monitors its progress until it completes.
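The stdin/stdout contract can be tried locally before submitting a job. The sketch below is a hypothetical mapper/reducer pair, not the notebook's `mapper.py` and `reducer.py`; it assumes tab-separated `word<TAB>count` records (the usual streaming convention) and imitates `cat input | mapper | sort | reducer` with in-memory streams.

```python
import io
import sys
from itertools import groupby


def mapper(lines):
    """Emit one 'word<TAB>1' record per word of every input line."""
    for line in lines:
        for word in line.lower().split():
            yield f'{word}\t1'


def reducer(lines):
    """Sum counts per word; relies on the input being sorted by key."""
    pairs = (line.rstrip('\n').split('\t', 1) for line in lines)
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield f'{word}\t{sum(int(count) for _, count in group)}'


def run(stage, stream_in=None, stream_out=None):
    """Connect a stage to stdin/stdout, as a streaming script would."""
    stream_in = sys.stdin if stream_in is None else stream_in
    stream_out = sys.stdout if stream_out is None else stream_out
    for record in stage(stream_in):
        stream_out.write(record + '\n')


# Local dry run of: cat input.txt | mapper | sort | reducer
mapped = io.StringIO()
run(mapper, io.StringIO('Hello world\nhello Hadoop\n'), mapped)
shuffled = io.StringIO(''.join(sorted(mapped.getvalue().splitlines(True))))
reduced = io.StringIO()
run(reducer, shuffled, reduced)
print(reduced.getvalue(), end='')
```

As standalone scripts, `run(mapper)` and `run(reducer)` would read from `sys.stdin` and write to `sys.stdout`; the `sorted` call stands in for the shuffle-and-sort that Hadoop performs between the two.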

Put the input file into HDFS:

In [None]:
!hdfs dfs -put ./data/wordcount/lizard_king.txt {WORK_DIR}
!hdfs dfs -ls {WORK_DIR}

The job uses two Python scripts, `mapper.py` and `reducer.py`. Let's look at them:

In [None]:
%%bash
echo -e "\n************** MAPPER.PY ****************\n"
cat ./manutils/mapreduce/mapper.py
echo -e "\n************** REDUCER.PY ****************\n"
cat ./manutils/mapreduce/reducer.py

Run the job and print the result:

In [None]:
%%bash
work_dir=/jovyan
out_dir=lizard_king_output

# delete the output directory if it exists: the job fails if it is already there
# (-f suppresses the error when the directory does not exist yet)
hdfs dfs -rm -f -r ${work_dir}/${out_dir}

yarn jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.2.2.jar \
    -input ${work_dir}/lizard_king.txt -output ${work_dir}/${out_dir} \
    -file ./manutils/mapreduce/mapper.py -file ./manutils/mapreduce/reducer.py \
    -mapper "python3 mapper.py" -reducer "python3 reducer.py"

# list the job output and print the result of the (single) reducer
hdfs dfs -ls ${work_dir}/${out_dir}
hdfs dfs -cat ${work_dir}/${out_dir}/part-00000
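To explore the result in Python, the job output can be parsed into counts. A minimal sketch, assuming `reducer.py` prints `word<TAB>count` lines (the usual streaming convention); `top_words` is a hypothetical helper and the sample text is illustrative.

```python
from collections import Counter


def top_words(text, n=10):
    """Return the n most frequent words from 'word<TAB>count' output lines."""
    counts = Counter()
    for line in text.splitlines():
        if not line.strip():
            continue  # skip blank lines
        word, count = line.rsplit('\t', 1)
        counts[word] += int(count)
    return counts.most_common(n)


# In the notebook the text could come from the `file_content` helper, e.g.
# file_content(f'{WORK_DIR}/lizard_king_output/part-00000')
sample = 'and\t3\nthe\t5\nking\t2\n'
print(top_words(sample, 2))  # [('the', 5), ('and', 3)]
```

Accumulating with `+=` also lets the function accept the concatenated contents of several `part-*` files if the job is run with more than one reducer.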

### YARN jobs monitoring

Hadoop also provides a YARN Web UI for the YARN ResourceManager. All jobs (submitted, running or finished) can be tracked there:

In [None]:
print(
    'YARN Web UI available at:',
    'https://jhas01.gsom.spbu.ru{}proxy/{}/cluster'.format(
        os.environ['JUPYTERHUB_SERVICE_PREFIX'],
        YARN_PORT
    )
)
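The same job information is available programmatically from the ResourceManager REST API, whose `GET /ws/v1/cluster/apps` endpoint returns JSON shaped like `{"apps": {"app": [...]}}` (or `"apps": null` when there are no applications). A minimal sketch, assuming the ResourceManager is reachable at `localhost:8088` from inside the server; the host is an assumption, and `parse_apps` and `list_yarn_apps` are hypothetical helpers.

```python
import json
from urllib.request import urlopen

# Assumption: the ResourceManager web service listens on localhost at
# YARN's default web port 8088 (the notebook's YARN_PORT)
RM_APPS_URL = 'http://localhost:8088/ws/v1/cluster/apps'


def parse_apps(payload):
    """Extract (id, name, state, finalStatus) from a /ws/v1/cluster/apps response."""
    apps = (json.loads(payload).get('apps') or {}).get('app') or []
    return [(a['id'], a['name'], a['state'], a['finalStatus']) for a in apps]


def list_yarn_apps(url=RM_APPS_URL, timeout=10):
    """Fetch and parse the application list from the ResourceManager."""
    with urlopen(url, timeout=timeout) as resp:
        return parse_apps(resp.read().decode('utf-8'))


# On the server:
# for app in list_yarn_apps():
#     print(app)
```

Separating the HTTP call from the parsing keeps `parse_apps` usable on a saved JSON response, for example when the ResourceManager is not running.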