# Tutorial: Hadoop and Hadoop Distributed File System (HDFS)

In this tutorial, you will:

* Create a MapReduce task using the Hadoop "Streaming" API (Python)
* Import a spam dataset into the Hadoop Distributed File System (HDFS)
* Run a MapReduce task using HDFS

## Setup
* This tutorial expects you to be using the COMP6235 Virtual Machine for VirtualBox. No support is provided for other solutions. Setup instructions are available at http://edshare.soton.ac.uk/id/document/324163
* Run "run-jupyter" to start Jupyter Notebook
* Download the .ipynb file at http://edshare.soton.ac.uk/19650/ and import it into Jupyter

# Refresher: What is Hadoop?

Apache Hadoop is an open-source software framework for distributing the processing of large amounts of data across multiple machines. It has an emphasis on fault-tolerant processing of data on large clusters. Hadoop has three important components:

**Hadoop Distributed File System** - A distributed file-system that stores data and facilitates the sharing of data between different machines in a Hadoop cluster (group of machines).

**Hadoop YARN** - A platform for managing the computing resources available to Hadoop, notably performing the task of scheduling jobs to run on other machines.

**Hadoop MapReduce** - Support for the MapReduce programming model for large-scale data processing

All of these are already set up and (mostly) configured in the Virtual Machine, though this tutorial will walk you through starting and using these tools.

## Firstly: Start a new terminal
In addition to running notebooks, Jupyter can also run a terminal: an interactive, text-based interface to the Virtual Machine. From the main menu on the `Home` page, you can start a new terminal by clicking on `New` -> `Terminal`.

We'll be using this to run some of the commands necessary to configure Hadoop. 

## Hadoop Modes of Operation

Hadoop has three main modes of operation:

**Standalone Mode** - This is the default mode used by Hadoop. It's localised to the current machine, and doesn't use HDFS, instead reading files from the local filesystem. It's primarily used for debugging.

**Pseudo-Distributed Mode** - This is where Hadoop uses a cluster consisting of only a single machine, with every Hadoop daemon (a program that runs in the background) running on that machine. This is mainly used for testing the Hadoop setup.

**Fully-Distributed Mode** - This is where data and processing is split between multiple machines. This enables Hadoop to horizontally scale and leverage the resources of multiple machines. This is the main mode used by Hadoop in production.

In this tutorial, we'll only be using Standalone and Pseudo-Distributed modes.

## MapReduce

MapReduce is a programming model used by Hadoop to process large amounts of data in parallel. It accepts input data in the form of a set of key-value pairs <key1, value1>. It divides this set into individual chunks and assigns them as tasks to be processed on individual machines. It works in two phases: A Map phase and a Reduce phase.

The Map phase takes these key-value pairs in the form <key1, value1> and maps (processes them) into other, intermediate key-value pairs <key2, value2>.

These pairs are then sorted by their key, and passed into the Reduce phase.

The Reduce phase takes these intermediate pairs and produces a third (smaller) set of key-value pairs, combining the values of the intermediate pairs that share a common key.

In summary:

**<key1, value1>** is *mapped* to **<key2, value2>** which is *reduced* to a smaller set of **<key3, value3>**.

Don't worry if it's all a bit abstract - there'll be examples in the rest of the tutorial.
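
As a first concrete illustration, the three steps above (map, sort, reduce) can be sketched in plain Python without Hadoop. This is only a toy model under stated assumptions: the function names and the sample records are invented for illustration, and everything runs in a single process rather than across machines.

```python
from itertools import groupby
from operator import itemgetter


def map_phase(records):
    """Map <line number, line> pairs to intermediate <word, 1> pairs."""
    for _line_number, line in records:
        for word in line.split():
            yield (word.lower(), 1)


def reduce_phase(sorted_pairs):
    """Combine intermediate pairs that share a key into <word, total> pairs."""
    for word, group in groupby(sorted_pairs, key=itemgetter(0)):
        yield (word, sum(count for _word, count in group))


def map_reduce(records):
    # Sorting by key stands in for the sort step between the two phases.
    intermediate = sorted(map_phase(records), key=itemgetter(0))
    return list(reduce_phase(intermediate))


# Hypothetical input: <key1, value1> = <line number, line of text>
records = [(1, "the cat sat"), (2, "the dog sat"), (3, "The end")]
result = map_reduce(records)
print(result)
```

Note that `groupby` only merges adjacent items, which is why the sort between the two phases matters: it guarantees that all pairs sharing a key arrive at the reducer together.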

## Importing data

The first thing we're going to do is download some data. We will store it on our VM, but it could equally represent data held remotely or in a datacentre somewhere. Run the following code:

In [None]:
%%bash

wget https://archive.ics.uci.edu/ml/machine-learning-databases/00380/YouTube-Spam-Collection-v1.zip \
-O YouTube-Spam-Collection-v1.zip

unzip -o YouTube-Spam-Collection-v1.zip

ls -lh *.csv

Having downloaded the data, we want to be able to do a MapReduce task on it.  To do this, we will use the Hadoop Streaming API, which allows us to write Python code rather than the usual Java.  

When we call the Hadoop process, we pass two Python files to the command - one which maps, and one which reduces.

First, let's look at the data:

In [None]:
%%bash

head -n 10 Youtube04-Eminem.csv

## Check that Hadoop is running

The next thing to do is to check that we have Hadoop installed and running.  Open a terminal, and type in: 

    hadoop version
    
This should show you that the version you have is Hadoop 2.8.5.  

## Word counting

Now we have our CSV files, let's get started processing them.

The first thing we want to do is to set up a MapReduce function which will allow us to count the number of each individual word from the `comment` field of the file.

The Streaming API works with streams: Hadoop feeds each line of our CSV file to the mapper through its "standard input" (stdin), and the mapper prints its results to "standard output" (stdout). Hadoop then passes the mapper's output to the reducer's stdin, and the reducer prints the final results to stdout in the same way.

It helps to first think of what the inputs and outputs of each stage of the process are. For word counting, we could do something like this:

**Line of CSV Data** is mapped to **<Word, 1>**, which is reduced to **<Word, Count>**

By default, the Streaming API uses a *tab character* as a separator between the key and the value. The output of your map function might look something like:

    "Banana\t1" or "Banana    1"

Some code has been provided to you below, including the libraries used in our answer. However, other solutions are possible that do not use these libraries.
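
Before writing the mapper, it may help to see the tab-separated key-value format in isolation. The sketch below is illustrative only: `format_pair` and `parse_pair` are hypothetical helper names, not part of Hadoop or of the provided answer. It splits on the first tab only, matching the default behaviour where everything before the first tab is the key.

```python
def format_pair(key, value):
    """Join a key and a value with the tab separator."""
    return f"{key}\t{value}"


def parse_pair(line):
    """Split a line on the first tab only, so the value may itself contain tabs."""
    key, _separator, value = line.rstrip("\n").partition("\t")
    return key, value


line = format_pair("Banana", 1)
print(repr(line))
print(parse_pair(line + "\n"))
```

The value comes back as a string, so a reducer that wants to add counts together would need to convert it with `int(value)` first.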

The cells below use the `%%writefile` cell magic to write their contents to a file instead of executing them.
If you wish to execute a cell instead, comment out its `%%writefile` line with a `#`.

In [None]:
%%writefile mapper.py
#!/usr/bin/env python
# MAPPER

import csv
import sys
import re

lines = sys.stdin.readlines()

csvreader = csv.reader(lines)
# YOUR CODE GOES BELOW

# Example output
# print(token + "\t" + "1")
        

In [None]:
%%writefile reducer.py
#!/usr/bin/env python
# REDUCER

import sys
from collections import defaultdict
# Keep simple example in for now, switch to stdin later

input_pairs = [
    '+447935454150	1',
    'lovely	1',
    'girl	1',
    'talk	1',
    'to	1',
    'me	1',
    'xxx	1'
]
# Once we test this with streams, we can uncomment this next line
# input_pairs = sys.stdin.readlines()

# YOUR CODE GOES BELOW


Ensure the cells above have been written to two files: `mapper.py` and `reducer.py`. The easiest way to do this is to make sure the `%%writefile mapper.py` and `%%writefile reducer.py` lines are uncommented, then run each cell.

Since the mapper and reducer accept a stream into their stdin and output to stdout, we can test whether the scripts above work as a pipeline without using Hadoop!
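
The same idea can be sketched entirely in Python, with in-memory streams standing in for stdin and stdout. This is one possible approach rather than the reference answer: `run_mapper`, `run_reducer`, the `column` parameter and the sample rows are all assumptions made for illustration, and the real CSV files also contain a header row that you may want to skip.

```python
import csv
import io
import re
from collections import defaultdict


def run_mapper(instream, outstream, column):
    """Read CSV rows and write one "word<TAB>1" line per word in the chosen column."""
    for row in csv.reader(instream):
        if len(row) <= column:
            continue  # skip empty or short rows
        for word in re.findall(r"\w+", row[column].lower()):
            outstream.write(f"{word}\t1\n")


def run_reducer(instream, outstream):
    """Sum the counts for each key and write "word<TAB>total" lines in key order."""
    totals = defaultdict(int)
    for line in instream:
        key, separator, value = line.rstrip("\n").partition("\t")
        if not separator:
            continue  # ignore lines without a tab
        totals[key] += int(value)
    for key in sorted(totals):
        outstream.write(f"{key}\t{totals[key]}\n")


# Invented sample rows standing in for the real CSV file
source = io.StringIO('id1,alice,"Great song, great video"\nid2,bob,Great to see\n')
mapped = io.StringIO()
run_mapper(source, mapped, column=2)

mapped.seek(0)  # rewind so the reducer reads what the mapper wrote
reduced = io.StringIO()
run_reducer(mapped, reduced)
print(reduced.getvalue(), end="")
```

In a standalone script, the same functions could be called with `sys.stdin` and `sys.stdout` in place of the `io.StringIO` objects.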

The below command reads the .csv file, then pipes the output to `mapper.py`'s stdin. The mapper's output is then piped to `reducer.py`, and so on.

In [None]:
%%bash
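# Make the scripts executable so they can be run as ./mapper.py and ./reducer.py
chmod +x mapper.py reducer.py
cat Youtube04-Eminem.csv | ./mapper.py | ./reducer.py | sort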

Now that we've tested that our pipeline works, it's time to integrate it into Hadoop. The commands below clear the output folder, ensure Hadoop is in standalone mode, then run our pipeline.

The parameters are as follows:

`-files` - Ensures these files are provided to every machine in our cluster.

`-input` - The data sources to be passed to the pipeline.

`-mapper` - The mapper to use.

`-reducer` - The reducer to use.

`-output` - The output folder.
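
To make the role of each parameter concrete, here is a small Python sketch that assembles the same kind of command as a list of arguments. It only builds and prints the command; it does not run Hadoop. The function name `build_streaming_command` and the jar path are placeholders invented for this example.

```python
import shlex


def build_streaming_command(jar, files, inputs, mapper, reducer, output):
    """Assemble a Hadoop Streaming command line as a list of arguments."""
    # -files takes a single comma-separated list and comes before the other options.
    command = ["hadoop", "jar", jar, "-files", ",".join(files)]
    for path in inputs:
        command += ["-input", path]  # one -input flag per data source
    command += ["-mapper", mapper, "-reducer", reducer, "-output", output]
    return command


command = build_streaming_command(
    jar="hadoop-streaming.jar",  # placeholder, not the real path on the VM
    files=["mapper.py", "reducer.py"],
    inputs=["Youtube04-Eminem.csv"],
    mapper="./mapper.py",
    reducer="./reducer.py",
    output="output",
)
print(" ".join(shlex.quote(argument) for argument in command))
```

Keeping the arguments in a list makes it easy to add further `-input` entries later without worrying about shell quoting.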

Test out your pipeline by running the command below!

In [None]:
%%bash

# Clear output
rm -rf output

# Make sure hadoop is in standalone mode
hadoop-standalone-mode.sh

# Main pipeline command
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files mapper.py,reducer.py \
-input Youtube04-Eminem.csv \
-mapper ./mapper.py \
-reducer ./reducer.py \
-output output

## Setting up HDFS

Now you've had a chance to use Hadoop in standalone mode, it's time we set it to pseudo-distributed mode and set up HDFS.

To speed things up, some commands have been provided to easily configure Hadoop. Start by running in your terminal:

    hadoop-pseudo-distributed-mode.sh
    
If you're curious how this works, feel free to read the code, which you'll find in `~/vm_creation/scripts`.

We should now have HDFS configured for pseudo-distributed mode. We now need to format a new HDFS filesystem, which will use the configuration we just set:

    hdfs namenode -format

## Starting services

Now we need to start the different services, and then we can get to work! Run the following command in the terminal to start HDFS:

    start-dfs.sh

You'll also need to start YARN in order to run any MapReduce jobs, so let's do that now:

    start-yarn.sh

To see what this has left you with, you can list the Java processes running on the machine with the `jps` command:

In [None]:
%%bash 
jps

You should see something similar to the following:

```
XXXX ResourceManager
XXXX SecondaryNameNode
XXXX NameNode
XXXX DataNode
XXXX NodeManager
```

If any of these aren't running, double check that you've run all of the above commands. If any are still missing, you may encounter errors later, so please contact one of the demonstrators.
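
If you prefer to check this programmatically, the sketch below compares some `jps`-style output against the five daemons listed above. The process IDs in the sample are invented, and `missing_daemons` is a hypothetical helper written for this example.

```python
EXPECTED_DAEMONS = {
    "ResourceManager",
    "SecondaryNameNode",
    "NameNode",
    "DataNode",
    "NodeManager",
}


def missing_daemons(jps_output):
    """Return the expected daemon names that do not appear in the jps output."""
    running = set()
    for line in jps_output.splitlines():
        parts = line.split()
        if len(parts) >= 2:
            running.add(parts[1])  # each line has the form "<pid> <name>"
    return EXPECTED_DAEMONS - running


# Invented sample output in which YARN has not been started
sample = """\
2101 NameNode
2245 DataNode
2490 SecondaryNameNode
3012 Jps
"""
print(sorted(missing_daemons(sample)))
```

On the VM, the real output could be captured with `subprocess.run(["jps"], capture_output=True, text=True).stdout` and passed to the same function.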

Now that we have an HDFS filesystem and the appropriate Hadoop services running in pseudo-distributed mode, we can start to import the data into HDFS and run the MapReduce task there.

Fully-distributed mode runs on exactly the same principles as described below, so the same steps would apply to a cluster of several machines.

In summary, we need to: 

* Create a directory for the input
* Import the data from the local file to the HDFS datanode
* Run the MapReduce job
* View the output

The commands on HDFS are similar to standard Linux CLI commands, except that they are prefixed by either `hadoop fs` or `hdfs dfs`.

The `hadoop fs` command is more general, as it can cope with different types of filesystem, such as the one on the local disk. As such, `hdfs dfs` is the clearer choice for commands relating solely to HDFS.

The command to create a directory is `-mkdir`.  Create a directory `/input` on the HDFS system.  Use the `hdfs dfs` command below to achieve this.

In [None]:
%%bash
# YOUR CODE HERE


Next, we need to import our data into HDFS. Here, we are dealing with two different filesystems, the local filesystem and HDFS, so we will use `hadoop fs` with the `-copyFromLocal` command. This command copies files from the local filesystem to HDFS and accepts two arguments: the source file(s) and the destination.

HDFS locations are identified by a URI prefixed with `hdfs://`, which the `hdfs dfs` and `hadoop fs` commands normally expect to see.

If the prefix is omitted, the default filesystem is used. This is specified in `core-site.xml`, which is one of the config files we imported earlier. The value can be seen with the following command:

In [None]:
%%bash 
cat $HADOOP_HOME/etc/hadoop/core-site.xml

If you are interested in learning more about the configuration options we have specified for Hadoop, check out the documentation for Hadoop, as well as the `~/vm_creation/hadoop` folder.

For `-copyFromLocal`, we can either specify the host explicitly with `hdfs://localhost:9000/` or leave it out and use `hdfs:///`. For example, `hdfs://localhost:9000/input` and `hdfs:///input` refer to the same location.

The local files can be specified with a relative path, leaving the import command as one of the following three. Pick one and execute it in the cell below.
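
The equivalence between these forms can be illustrated with a short Python sketch that fills in whatever is missing from a default filesystem URI. This is a simplified model, not Hadoop's actual resolution code: `resolve_hdfs_uri` is a hypothetical helper, and the default of `hdfs://localhost:9000` is taken from the example above.

```python
from urllib.parse import urlparse


def resolve_hdfs_uri(path, default_fs="hdfs://localhost:9000"):
    """Fill in a missing scheme or host from the default filesystem URI."""
    parsed = urlparse(path)
    default = urlparse(default_fs)
    if parsed.scheme and parsed.scheme != default.scheme:
        return path  # a different filesystem, so leave it untouched
    scheme = parsed.scheme or default.scheme
    authority = parsed.netloc or default.netloc
    return f"{scheme}://{authority}{parsed.path}"


for candidate in ["hdfs://localhost:9000/input", "hdfs:///input", "/input"]:
    print(candidate, "->", resolve_hdfs_uri(candidate))
```

All three inputs resolve to the same fully specified URI, which is why a bare path such as `/input` is also accepted by the commands.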

In [None]:
%%bash
# With fully specified URI
hadoop fs -copyFromLocal *.csv hdfs://localhost:9000/input

# Explicit HDFS, but with the default host
# hadoop fs -copyFromLocal *.csv hdfs:///input

# Implied URI based on default
# hadoop fs -copyFromLocal *.csv /input



Next, we'll check that the files have been successfully imported.

In [None]:
%%bash
hadoop fs -ls /input

Perform the same for the `mapper.py` and `reducer.py` files we created for the MapReduce task earlier, keeping those in the `input` directory as well.

You may need to add `-p` and `-f` as options. These options preserve file permissions, and force the new files to overwrite any existing files, respectively. 

In [None]:
%%bash
# YOUR CODE HERE

Now we run the hadoop command again, this time sourcing our files from HDFS instead of the local filesystem.

Note: If you run this command more than once, Hadoop will throw an error due to the output directory already existing. You may need to erase the existing directory or output to one with a different name.

In [None]:
%%bash
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files hdfs:///input/mapper.py,hdfs:///input/reducer.py \
-input hdfs:///input/Youtube04-Eminem.csv \
-mapper ./mapper.py \
-reducer ./reducer.py \
-output hdfs://localhost:9000/output_2

In the cell below, write a command to view the files listed in the `/output_2` directory.

In [None]:
%%bash
# YOUR CODE HERE

The `_SUCCESS` file indicates that the job was a success, which is good. The other file, `part-00000`, contains the result. Write code in the cell below to get the output (from HDFS).

In [None]:
%%bash
# YOUR CODE HERE

You can include multiple `-input` parameters to operate on more than one file.  Update the streaming command above to include all 5 files in the cell below.  Make sure you include a new output directory!

In [None]:
%%bash

# Update this command to include multiple files
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files hdfs:///input/mapper.py,hdfs:///input/reducer.py \
-input hdfs://localhost:9000/input/Youtube04-Eminem.csv \
-mapper ./mapper.py \
-reducer ./reducer.py \
-output hdfs://localhost:9000/output_8

## Summary

In this tutorial, you have started to use Hadoop and HDFS.  You created a MapReduce task using the Hadoop streaming framework, and then set up your Hadoop instance to work in pseudo-distributed mode.

### Where next?
You might want to look at Mahout and try to put the data here into a format like the one in the [video](https://www.youtube.com/watch?v=TWl6AIZIVps), which can be used to train a naive Bayes classifier on the spam data. Alternatively, you can try to find the [SpamAssassin dataset](http://csmining.org/index.php/spam-email-datasets-.html) and import that yourself.

Hadoop over multiple machines is difficult to configure. You might look at systems which assist with this, such as Ambari or Cloudera, and try setting one up yourself. If you don't have multiple computers available to practise on, give the [Caochong](https://github.com/weiqingy/caochong) library a try, which sets up Hadoop in Docker containers on a single computer.