# A TOUR OF THE HADOOP ECOSYSTEM THROUGH THE HORTONWORKS SANDBOX

Hortonworks' "Sandbox" and Cloudera's "Clusterdock" are two self-contained Docker images that can run a full Hadoop distribution in standalone or pseudo-distributed mode.

Both of them are installed on the Lab 409 PCs.

However, because of its ease of use, its open-source nature, the variety of Hadoop services available out of the box and its focus on teaching Hadoop, we will prefer the Hortonworks Sandbox.

The Hortonworks Sandbox features a standalone version of the full "Hortonworks Data Platform" (HDP) Hadoop distribution.

In [None]:
# start_sandbox-hdp.sh

It takes several minutes for the sandbox to be up and running.

You can access the Sandbox from a web browser at localhost:8888:

In [1]:
firefox localhost:8888 &

[1] 12542


When you hit the "Launch Dashboard" button, the login screen for Ambari will appear.

Ambari is a completely open source management platform for provisioning, managing, monitoring and securing Apache Hadoop clusters.

The credentials are: admin, bda505

From the Ambari dashboard, you can access:
- HDFS
- YARN
- MapReduce2
- Hive
- HBase
- Pig
- Sqoop
- Spark2
- Zeppelin

and other projects within the Hadoop ecosystem.
We will look at use cases for each of them.

The pop-up window offers a series of tutorials on using HDP.

You can follow the link below to watch an introductory video about the Sandbox:

[Hadoop Tutorial: Introduction to Hortonworks Sandbox](https://www.youtube.com/watch?v=H0KXnfE9Z9s&list=PL2y_WpKCCNQcABNHVSwUwwK169xxwSdNp)

We can connect to our Docker container using ssh.

To do so, open a shell window and enter the following command (without the leading #). The ssh password for root is "hadoopbda505".

Passwordless ssh login should already be enabled for this class:

In [1]:
# ssh root@localhost -p 2223

Now let's walk through a simple data import and a MapReduce job:

## Simple Data import and Map Reduce Job

### Data Import

In order to import data into the Sandbox, we first have to send the data to the Docker container over ssh.

Data are sent from the host system to the Docker container with the scp or rsync commands, both of which work over ssh.

Four datasets are pushed to the Docker container:

- The IMDb dataset as tsv files
- A portion of the 2015 UN COMTRADE dataset (international trade statistics) as json files
- The 1901-1950 portion of the NCDC dataset (US weather statistics) as plain text files
- A portion of the Google Ngrams dataset (frequency of words in publications indexed by Google Books, by year of publication) as plain text files

The bash commands below create the necessary directories inside the Docker container.

Note that this command is executed from the host system, not from inside the container; it reaches the container via ssh:

In [1]:
# create data directories
#ssh root@localhost -p 2223 'yum -y install expect;
#if [ ! -d /data ]; then mkdir /data; fi;
#if [ ! -d /data/imdb ]; then mkdir /data/imdb; fi;
#if [ ! -d /data/comtrade ]; then mkdir /data/comtrade; fi;
#if [ ! -d /data/comtrade/gz ]; then mkdir /data/comtrade/gz; fi;
#if [ ! -d /data/ncdc ]; then mkdir /data/ncdc; fi;
#if [ ! -d /data/ncdc/gz ]; then mkdir /data/ncdc/gz; fi;
#if [ ! -d /data/ngrams ]; then mkdir /data/ngrams; fi;
#if [ ! -d /data/ngrams/gz ]; then mkdir /data/ngrams/gz; fi;'
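The "if [ ! -d ... ]; then mkdir ...; fi" guards above can be replaced by "mkdir -p", which creates missing parent directories and silently succeeds if a directory already exists, so the command is safe to re-run. A minimal sketch, run against a temporary directory so it is safe to try anywhere: the BASE variable is a hypothetical stand-in for the container's root, and the {...} brace expansion requires bash:

```bash
#!/usr/bin/env bash
# BASE is a hypothetical root for the demo; inside the container it would be empty ("/").
BASE="$(mktemp -d)"

# -p: create parent directories as needed and do not fail if a directory exists,
# so running the same line twice is harmless (idempotent).
mkdir -p "$BASE"/data/{imdb,comtrade/gz,ncdc/gz,ngrams/gz}
mkdir -p "$BASE"/data/{imdb,comtrade/gz,ncdc/gz,ngrams/gz}

# list the directories that were created
find "$BASE/data" -type d | sort
```

From the host, the same idea would read ssh root@localhost -p 2223 'mkdir -p /data/{imdb,comtrade/gz,ncdc/gz,ngrams/gz}', assuming root's login shell in the container is bash.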


And here we push the data into the container:

In [2]:
#rsync -e 'ssh -p 2223' -a /home/bda505/mef/01/Session_01_dataset/tsv/*2.tsv root@localhost:/data/imdb/;
#rsync -e 'ssh -p 2223' -a /home/bda505/mef/04/comtrade_2015/gz/2015_{1..152}_* root@localhost:/data/comtrade/gz;
#rsync -e 'ssh -p 2223' -a /home/bda505/mef/06/ncdc/{1901..1950}* root@localhost:/data/ncdc/gz;
#rsync -e 'ssh -p 2223' -a /home/bda505/mef/07/ngrams/googlebooks-eng-all-1gram-20120701-*[0-9a].gz root@localhost:/data/ngrams/gz;

#ssh root@localhost -p 2223 'if [ ! -d /data/comtrade/json ]; then cp -r /data/comtrade/gz /data/comtrade/json;\
#cd /data/comtrade/json; gunzip *.gz; fi;

#if [ ! -d /data/ncdc/txt ]; then cp -r /data/ncdc/gz /data/ncdc/txt;\
#cd /data/ncdc/txt; gunzip *.gz; fi;

#if [ ! -d /data/ngrams/txt ]; then cp -r /data/ngrams/gz /data/ngrams/txt;\
#cd /data/ngrams/txt; gunzip *.gz; fi

#chmod -R 777 /data'
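The "cp -r" plus "gunzip" step above first duplicates every compressed file and then decompresses the copies in place. A minimal alternative sketch writes each decompressed file straight into the target directory and leaves the originals untouched. The layout mirrors /data/ncdc/{gz,txt}, but it is built in a temporary directory with one synthetic file, so all paths here are hypothetical:

```bash
#!/usr/bin/env bash
# hypothetical layout mirroring /data/ncdc/gz and /data/ncdc/txt
root="$(mktemp -d)"
mkdir -p "$root/gz" "$root/txt"
printf 'sample record\n' | gzip > "$root/gz/1901.gz"   # synthetic input file

# decompress each archive into txt/ (gzip -dc writes to stdout),
# keeping the original .gz file in gz/
for f in "$root"/gz/*.gz; do
    gzip -dc "$f" > "$root/txt/$(basename "$f" .gz)"
done

ls "$root/gz" "$root/txt"
```

Inside the container, the loop would run over /data/ncdc/gz/*.gz with /data/ncdc/txt as the target directory.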

And the code:

In [10]:
# rsync -e 'ssh -p 2223' -a /home/bda505/mef-bigdata/week_07/codes root@localhost:/data/

For network transfers, and for large numbers of files that may change over time, rsync is the better choice: it skips files that have not changed and, over the network, sends only the changed parts of modified files. It is small and fast, yet versatile, with many options.

On its own, rsync can be a perfect tool for backups and synchronization, even over remote connections.

For a smaller number of files, scp may also be used:

In [None]:
# scp -P 2223 /path/to/localfile root@localhost:~/

Now, a small task for you:
- Open two shell windows
- In one of them, log in to the Docker container with the "ssh root@localhost -p 2223" command, so that you can see what is inside
- In your "real" shell window (not the container shell), download the 2014 flight data for NYC airports with:
"wget https://raw.githubusercontent.com/wiki/arunsrinivasan/flights/NYCflights14/flights14.csv"
- "wget" is also a very light, powerful and versatile command: you can even use it as a spider to download and mirror a complete website!
- Push the csv file into the Docker container (using scp or rsync) and confirm that it is inside the container
- You may create a separate directory for it inside the container
- You may use the "ls" command to list directory contents

You can also open a shell inside the container through the web shell client (shell-in-a-box) at localhost:4200.

Use the credentials: root, hadoopbda505

We will use shell commands for some tasks and the Ambari web UI for others.

The login details for the Ambari UI (reached from localhost:8888) are admin, bda505.

If you cannot log in with this password, run the command below inside the container to set a new password:

In [3]:
# ambari-admin-password-reset

We can upload data into HDFS either through the Ambari web UI or from the command line.

First, let's see what our HDFS directories look like.

From localhost:8888, click "Launch Dashboard" and log in with admin, bda505.

Just to the left of the top right corner, select "Files View".

This shows the contents of the root directory of HDFS.

From there you can create folders or upload files from your "main" system; that's the easy way.

For now, however, we will upload data from the command line.

Inside the container shell (over ssh or through the web shell client), switch to the hdfs user, the HDFS superuser, so that we can manage HDFS:

In [None]:
# su hdfs

Then change to the root directory:

In [5]:
# cd /

Running ls here shows the /data directory that we pushed into the container.

HDFS shell commands take the form "hdfs dfs -<command>", where most commands mirror ordinary Linux commands such as ls, mkdir, chmod, cp and rm.

First, let's relax the permissions of the /user directory at the root of HDFS so that any user can read, write and execute inside it:

In [6]:
# hdfs dfs -chmod 777 /user

Note that 777 is the most permissive setting possible, and in production environments it can be harmful: think before you do it.
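To make "777" concrete: each octal digit is the sum of read (4), write (2) and execute (1), for the owner, the group and others, in that order. HDFS follows the same POSIX-style permission model, so the digits mean the same thing for "hdfs dfs -chmod". A local sketch on a temporary directory (no HDFS involved) contrasts 777 with the more restrictive 755:

```bash
#!/usr/bin/env bash
# each digit = read(4) + write(2) + execute(1), for owner / group / others
d="$(mktemp -d)"

chmod 777 "$d"                             # rwx for everyone
perm_777="$(ls -ld "$d" | cut -c1-10)"     # first 10 chars: type + permission bits

chmod 755 "$d"                             # owner rwx, group and others r-x
perm_755="$(ls -ld "$d" | cut -c1-10)"

echo "777 -> $perm_777"   # 777 -> drwxrwxrwx
echo "755 -> $perm_755"   # 755 -> drwxr-xr-x
```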

Now let's create the directory /user/data in HDFS:

In [7]:
# hdfs dfs -mkdir /user/data

You can check in the Ambari Files View that the directory has been created.

Now let's "put" the data from the container's local file system into HDFS, starting with the NCDC data:

In [None]:
# hdfs dfs -mkdir /user/data/ncdc

In [None]:
# hdfs dfs -put /data/ncdc/txt /user/data/ncdc/

You can check that the files were uploaded through the Ambari UI or with a command from the container shell:

In [None]:
# hdfs dfs -ls /user/data/ncdc/txt

Other commands that can be used with "hdfs dfs" include -rm, -cp, -du and -get.

You can find more information on HDFS commands at:
https://hortonworks.com/hadoop-tutorial/using-commandline-manage-files-hdfs/

and using HDFS from Ambari at:

https://hortonworks.com/tutorial/hadoop-tutorial-getting-started-with-hdp/section/2/

Let's import other datasets:

In [None]:
# hdfs dfs -mkdir /user/data/imdb

In [None]:
# hdfs dfs -put /data/imdb/* /user/data/imdb/

In [None]:
# hdfs dfs -mkdir /user/data/ngrams

In [None]:
# hdfs dfs -put /data/ngrams/txt /user/data/ngrams/

And the code:

In [None]:
# hdfs dfs -put /data/codes /user/data/

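Note that all configuration parameters (such as the replication factor, the hostnames of the slave nodes or the hostname of the master node) are set in a few simple configuration files, most of them XML:

- core-site.xml
- hadoop-env.sh (a shell script that sets environment variables)
- hdfs-site.xml
- mapred-site.xml
- slaves (a plain-text list of worker hostnames)

under $HADOOP_HOME/etc/hadoop

The $HADOOP_HOME path may differ among systems; on HDP, the active configuration usually lives under /etc/hadoop/conf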
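To see what these files look like, here is a minimal, hypothetical hdfs-site.xml that sets dfs.replication (the HDFS replication factor), together with a crude way to read the value back with grep and sed. The file is written to a temporary directory, and the value 1 is illustrative, not taken from the Sandbox:

```bash
#!/usr/bin/env bash
# hypothetical single-property hdfs-site.xml, for illustration only
conf_dir="$(mktemp -d)"
cat > "$conf_dir/hdfs-site.xml" <<'EOF'
<?xml version="1.0"?>
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>
EOF

# crude extraction: take the line after <name>, keep only the text inside <value>
repl="$(grep -A1 '<name>dfs.replication</name>' "$conf_dir/hdfs-site.xml" \
        | sed -n 's:.*<value>\(.*\)</value>.*:\1:p')"
echo "dfs.replication = $repl"
```

On a running cluster, hdfs getconf -confKey dfs.replication prints the effective value without parsing any XML by hand.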

### Map Reduce Job

We'll implement the maximum temperature example from the Elephant Book (Tom White's "Hadoop: The Definitive Guide") as a "Hadoop Streaming" job.

Your computers hold a century of US weather data from the NCDC; we will use a portion of it.

For each year, we will find the maximum temperature.

We will start with standard Unix tools.

I slightly modified the original script from the supplementary material of the Elephant Book.

The script is as follows:

```bash
#!/usr/bin/env bash

# adjusted by Serhat Cevikel to measure the time
# usage: hadoop_max_temperature.sh <path to gz files> <start year> <end year>

#START=$(date +%s.%N)
path=$1
starty=$2
endy=$3

years=$(seq "$starty" "$endy")

for year in $years
do
    filee="${path}/${year}"
    # print the year followed by a tab, without a newline
    printf '%s\t' "$year"
    # keep valid readings only and print the yearly maximum
    gunzip -c "$filee" | \
        awk '{ temp = substr($0, 88, 5) + 0;
               q = substr($0, 93, 1);
               if (temp != 9999 && q ~ /[01459]/ && temp > max) max = temp }
             END { print max }'
done
```

Missing temperature values are coded as 9999 and are excluded.
"q" is a quality code, and a reading is kept only if q is one of 0, 1, 4, 5 or 9. Temperatures are stored in tenths of a degree Celsius, so 317 means 31.7 °C.

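To see how the fixed-width columns and the two filters work, here is a minimal sketch that builds three synthetic NCDC-style lines and runs the same awk program on them. The make_record helper is hypothetical, and real records carry many more fields, replaced here by zeros; only the first line survives the filters:

```bash
#!/usr/bin/env bash
# hypothetical helper: year (cols 16-19), signed temperature (cols 88-92),
# quality code (col 93); every other column is a filler zero
make_record() {
    printf '%015d%s%068d%s%s\n' 0 "$1" 0 "$2" "$3"
}

records="$(
    make_record 1901 +0317 1    # valid reading
    make_record 1901 +9999 9    # missing value -> excluded
    make_record 1901 +0400 2    # quality code 2 is not in [01459] -> excluded
)"

max="$(printf '%s\n' "$records" | awk '{ temp = substr($0, 88, 5) + 0;
        q = substr($0, 93, 1);
        if (temp != 9999 && q ~ /[01459]/ && temp > max) max = temp }
      END { print max }')"
echo "max: $max"    # max: 317
```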
The path to the script is /home/bda505/mef-bigdata/week_07/codes/hadoop_max_temperature.sh

My version takes three parameters: the path to the gz files, the start year and the end year.

The data reside at /home/bda505/mef/06/ncdc

Let's run it for 1901 to 1925:

In [3]:
/home/bda505/mef-bigdata/week_07/codes/hadoop_max_temperature.sh \
/home/bda505/mef/06/ncdc \
1901 1925

1901	317
1902	244
1903	289
1904	256
1905	283
1906	294
1907	283
1908	289
1909	278
1910	
1911	306
1912	322
1913	300
1914	333
1915	294
1916	278
1917	317
1918	322
1919	378
1920	294
1921	283
1922	278
1923	
1924	
1925	317


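Years 1910, 1923 and 1924 print no value: the script produced no maximum for them, and they are also missing from the MapReduce output below.

Now let's turn it into a MapReduce job.

In the map phase, key-value pairs are extracted from the data: the year (key) and the temperature reading (value).

Between the two phases, Hadoop sorts and groups the map output by key.

In the reduce phase, the maximum temperature for each year is calculated.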
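The map, sort and reduce steps can be emulated with ordinary Unix tools. A minimal sketch on hypothetical, tab-separated mapper output uses sort for the grouping step and awk as the reducer. Unlike the R reducer below, which keeps a maximum for every year in a list, this awk reducer relies on sorted input and only tracks one key at a time:

```bash
#!/usr/bin/env bash
# hypothetical mapper output: "year<TAB>temperature" pairs, deliberately unsorted
mapped=$'1902\t244\n1901\t-78\n1901\t317\n1902\t-5'

# sort on the key so all values of one year are adjacent (Hadoop's shuffle/sort);
# the awk reducer then keeps a running maximum and prints it when the key changes
reduced="$(printf '%s\n' "$mapped" \
    | sort -t $'\t' -k1,1 \
    | awk -F '\t' '
        $1 != key { if (key != "") print key "\t" max; key = $1; max = $2 + 0; next }
        $2 + 0 > max { max = $2 + 0 }
        END { if (key != "") print key "\t" max }')"

printf '%s\n' "$reduced"
# 1901	317
# 1902	244
```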

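```R
#!/usr/bin/Rscript

liness <- readLines(file("stdin"))
#liness <- readLines("1901")

# fixed-width NCDC fields: year (16-19), temperature (88-92), quality code (93)
year <- as.numeric(substr(liness, 16, 19))
temp <- as.numeric(substr(liness, 88, 92))
qq <- as.numeric(substr(liness, 93, 93))

output <- cbind(year, temp)

# drop missing values (9999) and readings with a bad quality code;
# drop = FALSE keeps a matrix even when only one row survives
output <- output[temp != 9999 & qq %in% c(0, 1, 4, 5, 9), , drop = FALSE]

# emit one "year temp" pair per line; seq_len() also handles zero rows
for (i in seq_len(nrow(output)))
{
    pasted <- paste(output[i,], collapse = " ")
    cat(sprintf("%s\n", pasted))
}
```

The mapper reads from stdin, so, like the previous script, it can be tested with ordinary Unix pipes before bringing Hadoop in.

The path to the mapper code is /home/bda505/mef-bigdata/week_07/codes/mapper.R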

In [6]:
zcat /home/bda505/mef/06/ncdc/{1901..1925} | /home/bda505/mef-bigdata/week_07/codes/mapper.R | head

1901 -78
1901 -72
1901 -94
1901 -61
1901 -56
1901 -28
1901 -67
1901 -33
1901 -28
1901 -33
Error in cat(sprintf("%s\n", pasted)) : ignoring SIGPIPE signal
In parent.env(env) : closing unused connection 3 (stdin)
Execution halted


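The SIGPIPE error at the end is expected: head exits after printing ten lines and closes the pipe while the mapper is still writing.

The reducer code is as follows: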

```R
#!/usr/bin/Rscript

liness <- readLines(file("stdin"))
#liness <- readLines("mapped")   # for local testing against a saved mapper output file

keyval <- list()

# keep the running maximum temperature for each year (key) in a named list
for (i in seq_along(liness))
{
    # split on spaces or tabs, so both "1901 -78" and "1901<TAB>-78" are parsed
    linex <- unlist(strsplit(liness[i], split = "[ \t]+"))
    key <- linex[1]
    val <- as.numeric(linex[2])

    cur.maxval <- keyval[[key]]

    if (is.null(cur.maxval))
    {
        keyval[[key]] <- val
    }
    else
    {
        if (val > cur.maxval)
        {
            keyval[[key]] <- val
        }
    }
}

keys <- as.numeric(names(keyval))
vals <- as.numeric(unlist(keyval))

# one row per year, sorted by year
output <- matrix(c(keys, vals), ncol = 2)
output <- output[order(keys), , drop = FALSE]

for (i in seq_len(nrow(output)))
{
    pasted <- paste(output[i,], collapse = " ")
    cat(sprintf("%s\n", pasted))
}
```

The path to the reducer code is /home/bda505/mef-bigdata/week_07/codes/reducer.R

In [9]:
zcat /home/bda505/mef/06/ncdc/{1901..1925} | \
/home/bda505/mef-bigdata/week_07/codes/mapper.R | \
/home/bda505/mef-bigdata/week_07/codes/reducer.R

In parent.env(env) : closing unused connection 3 (stdin)
closing unused connection 3 (stdin) 
1901 317
1902 244
1903 289
1904 256
1905 283
1906 294
1907 283
1908 289
1909 278
1911 306
1912 322
1913 300
1914 333
1915 294
1916 278
1917 317
1918 322
1919 378
1920 294
1921 283
1922 278
1925 317


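Now we can run the MapReduce job with Hadoop Streaming:

```bash
hadoop jar /usr/hdp/2.6.3.0-235/hadoop-mapreduce/hadoop-streaming.jar \
-input /user/data/ncdc/txt \
-output /user/data/ncdc/output \
-mapper /data/codes/mapper.R \
-reducer /data/codes/reducer.R
```

A few things to keep in mind: the output directory must not exist before the job starts, or the job fails; the mapper and reducer must be executable (chmod +x), and R must be installed wherever the tasks run, which in this single-node sandbox is the container itself; and the version string in the jar path (2.6.3.0-235) depends on the installed HDP release. Also, Hadoop Streaming treats the text before the first tab of each mapper output line as the key. Our mapper separates year and temperature with a space, so the whole line becomes the key; the result is still correct here because the reducer parses each line itself, but only as long as the job runs with a single reducer.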