# Introduction to Hadoop Ecosystem

## Start SSH

First, let's bring the ssh server up, since the hadoop start scripts launch the daemons over ssh:

In [None]:
sudo service ssh stop
sudo service ssh start

And check that sshd works:

In [None]:
service ssh status

## Configuration

Let's check some environment variables:

In [None]:
echo $HADOOP_HOME
echo $HADOOP_CONF_DIR
echo $HADOOP_PREFIX
echo $PATH

Let's view the important configuration files. This is a very simple configuration for a single-node setup:

In [None]:
ls $HADOOP_CONF_DIR

In [None]:
cat $HADOOP_CONF_DIR/core-site.xml

In [None]:
cat $HADOOP_CONF_DIR/hdfs-site.xml

In [None]:
cat $HADOOP_CONF_DIR/mapred-site.xml

In [None]:
cat $HADOOP_CONF_DIR/slaves

Before first use, the hdfs must be formatted. We do not do it now, since data is already imported into hdfs:

In [3]:
#yes Y | hdfs namenode -format


## HDFS

### Start and stop HDFS:

Start the hdfs services:

The trailing
```Bash
2>&1 | grep -Pv "^WARNING"
```
part merges standard error into standard output and drops the lines that start with WARNING, which would otherwise clutter the output

In [None]:
start-dfs.sh 2>&1 | grep -Pv "^WARNING"
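Since the same filter follows almost every command in this notebook, it may help to see it in isolation. The following is a minimal sketch that does not need Hadoop at all: `noisy` is a hypothetical stand-in for a command that writes a warning to standard error and its real output to standard output, and `quiet` is a hypothetical wrapper name, not part of any Hadoop tool. Plain `grep -v` is used because this pattern does not need the Perl syntax enabled by `-P`.

```shell
# Hypothetical stand-in for a Hadoop command:
# one warning goes to stderr, the real output goes to stdout
noisy() {
    echo "WARNING: this is an example warning" >&2
    echo "real output"
}

# Hypothetical wrapper: run any command, merge stderr into stdout,
# and drop the lines that start with WARNING
quiet() {
    "$@" 2>&1 | grep -v "^WARNING"
}

filtered=$(quiet noisy)
echo "$filtered"
```

With such a wrapper, a call could read `quiet hdfs dfs -ls /`. Keep in mind that the exit status of the pipeline is that of grep, not of the wrapped command.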

Check which hadoop services run:

In [None]:
jps

Check the status of hdfs:

In [None]:
hdfs dfsadmin -report 2>&1 | grep -Pv "^WARNING"

To stop the services, use this command:

In [None]:
# stop-dfs.sh

The logs exist at:

In [None]:
ls /opt/hadoop-2.9.2/logs/

## HDFS operations

First let's create a test file and import it into hdfs:

In [None]:
echo "this is a test file" > ~/deneme

In [None]:
cat ~/deneme

In [None]:
hdfs dfs -put ~/deneme / 2>&1 | grep -Pv "^WARNING"

And check the file system:

In [None]:
hdfs dfs -ls / 2>&1 | grep -Pv "^WARNING"

Read the contents of the file in the hdfs:

In [None]:
hdfs dfs -cat /deneme 2>&1 | grep -Pv "^WARNING"

Create a copy of the file:

In [None]:
hdfs dfs -cp /deneme /deneme2 2>&1 | grep -Pv "^WARNING"

Check that it is created:

In [None]:
hdfs dfs -ls / 2>&1 | grep -Pv "^WARNING"

Create a directory named somedir in hdfs:

In [None]:
hdfs dfs -mkdir /somedir 2>&1 | grep -Pv "^WARNING"

Check that it is created:

In [None]:
hdfs dfs -ls / 2>&1 | grep -Pv "^WARNING"

Move deneme2 into somedir:

In [None]:
hdfs dfs -mv /deneme2 /somedir 2>&1 | grep -Pv "^WARNING"

Check the contents of somedir:

In [None]:
hdfs dfs -ls /somedir 2>&1 | grep -Pv "^WARNING"

Now export somedir from hdfs to local file system:

In [None]:
hdfs dfs -get /somedir ~/ 2>&1 | grep -Pv "^WARNING"

Check whether it exists in the local file system:

In [None]:
ls ~

In [None]:
ls ~/somedir

Check the disk usage of files and directories:

In [None]:
hdfs dfs -du /

What exists under /data?

In [None]:
hdfs dfs -ls /data

We know the imdb, comtrade_s1 and he_sisli datasets from previous sessions

- ncdc is a part of a huge dataset of detailed meteorological data for the USA, beginning in 1901
- ngrams is a dataset of the words that appeared in books at books.google.com

We will try to make use of all this data in this and the following session

Now delete the directory /somedir in hdfs. Note that we must pass the recursive (-r) option, just as in the local file system:

In [None]:
hdfs dfs -rm -r /somedir 2>&1 | grep -Pv "^WARNING"

In [None]:
hdfs fsck / 2>&1 | grep -Pv "^WARNING"

The fsck command above checks the health of the file system. You can also view the report at:

In [None]:
curl "http://localhost:50070/fsck?ugi=hadoop&path=%2F"

To access a remote or local system through the web UI, you can point your browser to the above link (not in binder)

More commands are listed at:

https://hadoop.apache.org/docs/r2.9.2/hadoop-project-dist/hadoop-hdfs/HDFSCommands.html

https://hadoop.apache.org/docs/r2.9.2/hadoop-project-dist/hadoop-common/FileSystemShell.html

https://data-flair.training/blogs/top-hadoop-hdfs-commands-tutorial/

https://www.edureka.co/blog/hdfs-commands-hadoop-shell-command

```Bash
Usage: hadoop fs [generic options]
	[-appendToFile <localsrc> ... <dst>]
	[-cat [-ignoreCrc] <src> ...]
	[-checksum <src> ...]
	[-chgrp [-R] GROUP PATH...]
	[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
	[-chown [-R] [OWNER][:[GROUP]] PATH...]
	[-copyFromLocal [-f] [-p] [-l] [-d] <localsrc> ... <dst>]
	[-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
	[-count [-q] [-h] [-v] [-t [<storage type>]] [-u] [-x] <path> ...]
	[-cp [-f] [-p | -p[topax]] [-d] <src> ... <dst>]
	[-createSnapshot <snapshotDir> [<snapshotName>]]
	[-deleteSnapshot <snapshotDir> <snapshotName>]
	[-df [-h] [<path> ...]]
	[-du [-s] [-h] [-x] <path> ...]
	[-expunge]
	[-find <path> ... <expression> ...]
	[-get [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
	[-getfacl [-R] <path>]
	[-getfattr [-R] {-n name | -d} [-e en] <path>]
	[-getmerge [-nl] [-skip-empty-file] <src> <localdst>]
	[-help [cmd ...]]
	[-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [<path> ...]]
	[-mkdir [-p] <path> ...]
	[-moveFromLocal <localsrc> ... <dst>]
	[-moveToLocal <src> <localdst>]
	[-mv <src> ... <dst>]
	[-put [-f] [-p] [-l] [-d] <localsrc> ... <dst>]
	[-renameSnapshot <snapshotDir> <oldName> <newName>]
	[-rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ...]
	[-rmdir [--ignore-fail-on-non-empty] <dir> ...]
	[-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]
	[-setfattr {-n name [-v value] | -x name} <path>]
	[-setrep [-R] [-w] <rep> <path> ...]
	[-stat [format] <path> ...]
	[-tail [-f] <file>]
	[-test -[defsz] <path>]
	[-text [-ignoreCrc] <src> ...]
	[-touchz <path> ...]
	[-truncate [-w] <length> <path> ...]
	[-usage [cmd ...]]
```

**EXERCISE 1:**

Use the above commands at least once on your own example:
- Create a test file
- Import into hdfs
- Create a directory
- Move and copy files around
- Cat the files
- Export from hdfs into local file system

## YARN

Start yarn services:

In [None]:
start-yarn.sh

Check whether resource manager works by:

In [None]:
jps

You can stop the service by:

In [None]:
#stop-yarn.sh

### Map Reduce Job: Word Count on NCDC Data

We will go through two examples using the 1932-1936 years of the ncdc weather data set:

- Word Count
- Max Temperature

Word count is the "hello world" of map reduce and is also cited in the official documentation

We will first implement the map reduce job as a unix command with pipes

If this works right, we will run it using hadoop streaming

Two versions will be run:
- One with a mapper and reducer,
- And the other with a mapper, combiner and reducer

In larger datasets run on many nodes, it is good practice to use a combiner so that the network traffic between the mapper and reducer is minimized

Note that all scripts and commands must read their data from stdin and write the result to stdout. It is best to delimit fields with tabs

The mapper phase creates key-value pairs out of the data. In the wc example, the mapper simply sends the original files to stdout

The "head" command is there only to limit the visible output and will not be a part of the mapper

In [None]:
cat ~/data/ncdc/* | head

Next, the reducer will count the lines

In [None]:
cat ~/data/ncdc/* | wc -l

Now we will convert this into a simple map reduce job:

- Note that the output folder should not exist, so flush it before running the job
- If the command takes parameters, wrap it in single or double quotes
- If the command includes pipes, wrap it like this: `bash -c "your command | second command"`
- Note that input and output paths are inside the hdfs
- Mapper and reducer paths are inside the main filesystem
- Run these commands inside the docker shell
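The quoting rule for commands with pipes can be tried without Hadoop. This is only a small sketch on made-up input: the whole pipeline is handed to `bash -c` as one string, with single quotes outside and double quotes inside.

```shell
# One command string that contains a pipe and nested quotes:
# the outer pair is single quotes, the inner pair is double quotes
count=$(printf '# header\nrow 1\nrow 2\n' | bash -c 'grep -v "^#" | wc -l' | tr -d ' ')
echo "$count"
```

The header line is dropped by grep and the two remaining rows are counted, so the string behaves like a single command that reads stdin and writes stdout.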

First let's create a directory for outputs:

In [None]:
hdfs dfs -mkdir -p /output 2>&1 | grep -Pv "^WARNING"

Create a directory for outputs from jobs with ncdc dataset:

In [None]:
hdfs dfs -mkdir -p /output/ncdc 2>&1 | grep -Pv "^WARNING"

Before starting a new job, always clear the results of earlier runs, because the job returns an error if its output directory already exists:

In [None]:
hdfs dfs -rm -r -f /output/ncdc/* 2>&1 | grep -Pv "^WARNING"

The path to hadoop-streaming jar file is:

In [None]:
ls $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar

Let's get the paths to cat and wc:

In [None]:
which cat
which wc

The output directory should be non-existent before the command:

In [None]:
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar \
-input /data/ncdc \
-output /output/ncdc/1 \
-mapper /usr/bin/cat \
-reducer '/usr/bin/wc -l' \
2>&1 | grep -Pv "^WARNING"

Let's check the result:

In [None]:
hdfs dfs -cat /output/ncdc/1/* 2>&1 | grep -Pv "^WARNING"

It is in line with what we got from the local run

Now we will rewrite the job adding a combiner:

The combiner will take over the job of the reducer: for each task, the line count is calculated. Now the reducer will just add up these partial counts!

From the shell, the "bc" command will do it for us.

First let's see what happens until the reducer:

In [None]:
# Each file is sent to cat separately, so that we see
# what the mapper and the combiner do for each file:
#   cat   -> mapper
#   wc -l -> combiner
for file in ~/data/ncdc/*; do cat $file | wc -l; done

Now, add the reducer:
- "paste -sd+" puts a "+" sign between the numbers
- bc will calculate this formula, and add the numbers

In [None]:
for file in ~/data/ncdc/*; do cat $file | wc -l; done | paste -sd+ | bc
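The same mapper, combiner and reducer chain can be checked on toy data. The sketch below is only an illustration: the three small files are made up and stand in for the input of three tasks, and shell arithmetic is used in place of bc so that nothing beyond standard tools is assumed.

```shell
# Hypothetical sample data: three small files standing in for three tasks
workdir=$(mktemp -d)
printf 'a\nb\n'    > "$workdir/part1"
printf 'c\nd\ne\n' > "$workdir/part2"
printf 'f\n'       > "$workdir/part3"

# Mapper (cat) and combiner (wc -l) run once per file
partials=$(for file in "$workdir"/*; do cat "$file" | wc -l; done)

# Reducer: join the partial counts with "+" and evaluate the sum
expression=$(printf '%s\n' "$partials" | tr -d ' ' | paste -sd+ -)
total=$(( $expression ))

# Single pass over all files, for comparison
direct=$(cat "$workdir"/* | wc -l | tr -d ' ')

echo "expression: $expression"
echo "total: $total, direct: $direct"
rm -r "$workdir"
```

The sum of the partial counts matches the single-pass count, which is what makes it safe to move the counting into the combiner.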

Now let's convert it to a mapreduce job:

Note that we have a new output directory:

In [None]:
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar \
-input /data/ncdc \
-output /output/ncdc/2 \
-mapper /usr/bin/cat \
-combiner '/usr/bin/wc -l' \
-reducer "bash -c 'paste -sd+ | bc'" \
2>&1 | grep -Pv "^WARNING"

Now let's see the result:

In [None]:
hdfs dfs -cat /output/ncdc/2/* 2>&1 | grep -Pv "^WARNING"

### Title Count Across Years Using IMDB Dataset

Now we will revisit our old friend imdb

We will first split the title.basics file into 5 equal parts

In [None]:
tldr split

Let's split the title.basics.tsv into 5 parts:

In [None]:
mkdir -p ~/split

In [None]:
cd ~/split && split -n 5 ~/data/imdb/tsv/title.basics.tsv

In [None]:
ls ~/split
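One detail worth knowing: `split -n 5` divides the file by byte count, so a row that straddles a boundary can be cut in two. GNU split also accepts `-n l/5`, which produces 5 chunks without breaking lines. The following sketch assumes GNU coreutils and uses a made-up sample file and the hypothetical prefix `part_`:

```shell
# Hypothetical sample file with 100 lines
workdir=$(mktemp -d)
seq 1 100 > "$workdir/numbers.txt"

# l/5 asks GNU split for 5 chunks without breaking any line
( cd "$workdir" && split -n l/5 numbers.txt part_ )

pieces=$(ls "$workdir"/part_* | wc -l | tr -d ' ')
lines=$(cat "$workdir"/part_* | wc -l | tr -d ' ')

# Concatenating the chunks in name order restores the original file
if cat "$workdir"/part_* | cmp -s - "$workdir/numbers.txt"; then
    restored=yes
else
    restored=no
fi

echo "pieces: $pieces, lines: $lines, restored: $restored"
rm -r "$workdir"
```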

In [None]:
hdfs dfs -put ~/split /data 2>&1 | grep -Pv "^WARNING"

In [None]:
hdfs dfs -ls /data 2>&1 | grep -Pv "^WARNING"

Your job is to:
- Get the start year column of the files as the mapper
- Get the count of each year as the combiner
- Aggregate the per-task counts of each year as the reducer

You can use the "cut" command to get the necessary column. Note that the default field delimiter for cut is "\t", the same as in the files

You can view the initial rows of the file with head, so that you decide on which column to extract

For combiner and reducer, you can use a small but very talented tool called "q" inside sandbox. "q" uses sqlite as its backend, however, it does not need a database: It can work on columnar data fed from stdin. Usual sql statements just work!

The only differences are that you should write "-" in the "from" clause and that fields are named c1, c2, etc. You can use select, from, group by and aggregate functions such as count() or sum().

Note that when passing a command to the combiner or reducer, you should wrap it inside quotes. But the statement itself also needs quotes. So one of the quote pairs should be single and the other double, as such: -combiner 'q "select ........"'

The output path you provide must be non-existent. So either provide a new one or flush the existing one.
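The division of labour between the three phases can be tried on toy data first. The sketch below is only an illustration under stated assumptions: the two files and their three-column layout are made up (this is not the real title.basics layout), and `sort | uniq -c` and `awk` stand in for the "q" statements, which are left for you to write.

```shell
# Hypothetical tab-separated sample: id, title, year
workdir=$(mktemp -d)
printf 't1\tA\t1999\nt2\tB\t2000\n'              > "$workdir/xaa"
printf 't3\tC\t2000\nt4\tD\t2000\nt5\tE\t1999\n' > "$workdir/xab"

# Mapper: keep only the year column (cut splits on tabs by default)
# Combiner: count each year within one file and emit "year<TAB>count"
combined=$(for file in "$workdir"/*; do
    cut -f 3 "$file" | sort | uniq -c | awk '{ print $2 "\t" $1 }'
done)

# Reducer: add up the per-file counts of each year
result=$(printf '%s\n' "$combined" |
    awk -F '\t' '{ sum[$1] += $2 } END { for (y in sum) print y "\t" sum[y] }' |
    sort)

echo "$result"
rm -r "$workdir"
```

Note that the reducer sums the counts rather than counting rows again, because each incoming row already represents a partial count.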

First try the mapper, combiner and reducer on the command line:

`for file in ~/split/*; do your_mapper_command $file | head -3; echo; done`

1894
1892
1892

2000
2000
2000

2013
2011
2011

2014
1909
2014

2016
2016
2016



Then we add the combiner ("head" and "column" are there just for visual purposes):

`for file in ~/split/*; do your_mapper_command $file | \
q "your_combiner_sql_statement" | head -20 | column -c 150; done`

`Warning: column count is one - did you provide the correct delimiter?
1888 2		1891 7		1894 66		1897 798	1900 811	1903 1642	1906 508
1889 1		1892 9		1895 65		1898 1049	1901 924	1904 492	1907 553
1890 3		1893 2		1896 466	1899 893	1902 879	1905 307
Warning: column count is one - did you provide the correct delimiter?
1888 1		1895 3		1898 86		1901 155	1904 551	1907 823	1910 2633
1890 1		1896 105	1899 174	1902 224	1905 482	1908 1689	1911 3076
1894 4		1897 117	1900 180	1903 224	1906 524	1909 2016
Warning: column count is one - did you provide the correct delimiter?
1874 1		1888 2		1891 2		1896 250	1899 719	1902 702	1905 892
1878 1		1889 1		1894 25		1897 407	1900 823	1903 795	1906 822
1887 1		1890 1		1895 47		1898 603	1901 673	1904 783
Warning: column count is one - did you provide the correct delimiter?
1883 1		1896 4		1899 5		1904 1		1908 56		1911 426	1914 666
1890 1		1897 4		1900 9		1905 23		1909 1581	1912 612	1915 517
1891 1		1898 2		1903 1		1907 7		1910 1465	1913 473
Warning: column count is one - did you provide the correct delimiter?
1887 1	1897 16	1899 9	1901 7	1903 14	1905 2	1907 4	1909 8	1911 15	1913 78
1896 7	1898 30	1900 11	1902 3	1904 3	1906 3	1908 3	1910 10	1912 65	1914 49
`


And last, we add our reducer:

`for file in ~/split/*; do your_mapper_command $file | \
q "your_combiner_sql_statement"; done | \
q "your_reducer_sql_statement" | column -c 100`

`Warning: column count is one - did you provide the correct delimiter?
Warning: column count is one - did you provide the correct delimiter?
Warning: column count is one - did you provide the correct delimiter?
Warning: column count is one - did you provide the correct delimiter?
Warning: column count is one - did you provide the correct delimiter?
1874 1		1909 5420	1934 2701	1959 13273	1984 23243	2009 160879
1878 1		1910 6410	1935 2645	1960 14479	1985 24729	2010 178864
1883 1		1911 6439	1936 3081	1961 14573	1986 25012	2011 207730
1887 2		1912 8459	1937 3341	1962 13627	1987 26746	2012 229315
1888 5		1913 9562	1938 3191	1963 15111	1988 25883	2013 242918
1889 2		1914 9003	1939 2828	1964 16602	1989 27896	2014 248265
1890 6		1915 8428	1940 2396	1965 18361	1990 29530	2015 251666
1891 10		1916 6959	1941 2379	1966 19025	1991 30996	2016 253051
1892 9		1917 5524	1942 2277	1967 19709	1992 32802	2017 162619
1893 2		1918 4608	1943 2064	1968 18150	1993 35493	2018 11721
1894 95		1919 3974	1944 1895	1969 19139	1994 39719	2019 975
1895 115	1920 4425	1945 1855	1970 19248	1995 46688	2020 212
1896 832	1921 4177	1946 2248	1971 19507	1996 47151	2021 34
1897 1342	1922 3566	1947 2716	1972 18904	1997 51891	2022 25
1898 1770	1923 3004	1948 3296	1973 19964	1998 58519	2023 5
1899 1800	1924 3057	1949 4310	1974 19458	1999 62518	2024 3
1900 1834	1925 3288	1950 5374	1975 20122	2000 66473	2025 1
1901 1759	1926 3016	1951 6333	1976 19101	2001 74113	2026 1
1902 1808	1927 3100	1952 7119	1977 19296	2002 77652	2115 1
1903 2676	1928 3065	1953 7824	1978 19401	2003 86811	N 321491
1904 1830	1929 3149	1954 8276	1979 20030	2004 102515
1905 1706	1930 2778	1955 9502	1980 20921	2005 115374
1906 1857	1931 2779	1956 10716	1981 19853	2006 129299
1907 2481	1932 2783	1957 12085	1982 20390	2007 143551
1908 4274	1933 2635	1958 12564	1983 21264	2008 151624
`


Now we are ready to run it as a mapreduce job on hdfs and yarn. Before that, we flush the output directory (or pass a non-existent one):

```Bash
hdfs dfs -mkdir -p /output/split
hdfs dfs -rm -r -f /output/split/*
```

And run your mapreduce job:

```Bash
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar \
-input /data/split \
-output /output/split/1 \
-mapper "your_mapper_command" \
-combiner 'q "your_combiner_sql_statement"' \
-reducer 'q "your_reducer_sql_statement"'
```


### Creating Scripts for MapReduce Job: Maximum Temperature Example on NCDC Dataset

Now, using the same NCDC data set, we'll recreate the maximum temperature example from the Elephant Book (Hadoop: The Definitive Guide) as a "Hadoop streaming" job:

Your computers have a century of temperature data for the USA. We will use just the first 10 years of this data

And for each year we will get the max temperature

We will first start with standard unix tools

I played a bit with the original script given as a part of the supplementary material for the Elephant Book

Note that there may be empty files and the job must have a remedy for this issue

The script is as follows:

```Bash
#!/usr/bin/env bash

# adjusted by Serhat Cevikel to measure the time

#START=$(date +%s.%N)
path=$1
starty=$2
endy=$3

years=$(seq $starty $endy)


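for year in $years
do
    filee="${path}/${year}"
    # print the year followed by a tab, without a trailing newline
    echo -ne "$(basename "$year")\t"
    # columns 88-92 hold the temperature, column 93 the quality code
    cat "$filee" | \
    awk '{ temp = substr($0, 88, 5) + 0;
           q = substr($0, 93, 1);
           if (temp != 9999 && q ~ /[01459]/ && temp > max) max = temp }
         END { print max }'
done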
```

Missing temp values are coded as 9999 and they are excluded. Here "q" is the quality code of the reading (not the "q" tool used earlier) and should be one of 0, 1, 4, 5 or 9 for the reading to be included

The path to script is ~/codes/hadoop_max_temperature.sh

In my implementation it takes 3 parameters: the path to the data files, the start year and the end year

The data resides at ~/data/ncdc

Let's do it for 1901 to 1910

In [None]:
~/codes/hadoop_max_temperature.sh \
~/data/ncdc \
1901 1910
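To see the filtering rules of the awk program in isolation, it can be fed a few synthetic records. This is a sketch under stated assumptions: `record` is a hypothetical helper that pads the first 87 columns with zeros, whereas real NCDC records carry station and date fields there. Only the columns that the awk program reads (88-92 for the temperature, 93 for the quality code) are meaningful.

```shell
# Hypothetical helper: 87 filler characters, a 5-character signed
# temperature (columns 88-92) and a 1-character quality code (column 93)
record() { printf '%087d%s%s\n' 0 "$1" "$2"; }

max=$( {
    record "+0111" 1   # valid reading
    record "+0250" 2   # rejected: quality code 2 is not in [01459]
    record "+9999" 1   # rejected: 9999 marks a missing value
    record "-0033" 1   # valid reading, but lower than 111
} | awk '{ temp = substr($0, 88, 5) + 0;
           q = substr($0, 93, 1);
           if (temp != 9999 && q ~ /[01459]/ && temp > max) max = temp }
         END { print max }' )

echo "$max"
```

Because `max` starts out empty and compares as zero, a file whose valid readings are all negative would print an empty value with this logic.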

Now let's transform it to a map reduce job:

In the map phase, the key value pairs are extracted from the data: the year and temp reading

In the reduce phase the max temp for each year is calculated

The mapper is:

```R
#!/usr/bin/Rscript

con <- file("stdin")
#con <- file("1910")
liness <- readLines(con)
close(con)

year <- as.numeric(substr(liness, 16, 19))
temp <- as.numeric(substr(liness, 88, 92))
qq <- as.numeric(substr(liness, 93, 93))

output <- cbind(year, temp)

# drop = FALSE keeps the result a matrix even when a single row remains
output <- output[temp != 9999 & qq %in% c(0, 1, 4, 5, 9), , drop = FALSE]

for (i in seq_along(output[,1]))
{
    pasted <- paste(output[i,], collapse = "\t")
    cat(sprintf("%s\n", pasted))
}
```

The code reads its input from stdin, so it can be tested in a pipe just like the previous script, before involving Hadoop

In [None]:
cat ~/data/ncdc/{1901..1910} | ~/codes/mapper.R | head

The reducer code is as follows:

```R
#!/usr/bin/Rscript

con <- file("stdin")
#con <- file("mapped")
liness <- readLines(con)
close(con)

keyval <- list()

for (i in seq_along(liness))
{
    linex <- unlist(strsplit(liness[i], split = "\t"))
    key <- linex[1]
    val <- as.numeric(linex[2])

    cur.maxval <- keyval[[key]]

    if (is.null(cur.maxval))
    {   
        keyval[[key]] <- val 
    }
    else
    {   
        if (val > cur.maxval)
        {
            keyval[[key]] <- val 
        }
    }
}

keys <- as.numeric(names(keyval))
vals <- as.numeric(unlist(keyval))

output <- matrix(c(keys, vals), ncol = 2)
output <- output[order(keys),, drop = FALSE]

for (i in seq_along(output[,1]))
{
    pasted <- paste(output[i,], collapse = "\t")
    cat(sprintf("%s\n", pasted))
}
```

In [None]:
cat ~/data/ncdc/{1901..1910} | ~/codes/mapper.R | ~/codes/reducer.R

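Now we can run the map reduce job. Note that we have to pass the custom script files via the "-file" option so that all nodes can run them. Bash expands {1901..1910} into ten separate words while "-input" expects a single value, so we join the expanded paths with commas:

In [None]:
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar \
-input "$(echo /data/ncdc/{1901..1910} | tr ' ' ',')" \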
-output /output/ncdc/3 \
-mapper ~/codes/mapper.R \
-reducer ~/codes/reducer.R \
-file ~/codes/mapper.R \
-file ~/codes/reducer.R \
2>&1 | grep -Pv "^WARNING"

Or we can run it with a combiner (the same script as the reducer in this case). Since the output directory must not exist, we write to a new one:

In [None]:
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar \
-input "$(echo /data/ncdc/{1901..1910} | tr ' ' ',')" \
-output /output/ncdc/4 \
-mapper ~/codes/mapper.R \
-combiner ~/codes/reducer.R \
-reducer ~/codes/reducer.R \
-file ~/codes/mapper.R \
-file ~/codes/reducer.R \
2>&1 | grep -Pv "^WARNING"
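The reducer can double as the combiner here because taking a maximum gives the same answer whether it is applied once or in stages. Hadoop treats the combiner as an optimization and may apply it zero, one or several times, so this property matters. The sketch below illustrates it on made-up mapper output; `max_per_key` is a hypothetical awk stand-in for reducer.R, not the script itself.

```shell
# Hypothetical mapper output (year<TAB>temperature) from two tasks
task1=$(printf '1901\t10\n1901\t30\n1902\t5\n')
task2=$(printf '1901\t20\n1902\t40\n')

# Stand-in reducer: maximum value per key, sorted by key
max_per_key() {
    awk -F '\t' '{ v = $2 + 0
                   if (!($1 in m) || v > m[$1]) m[$1] = v }
                 END { for (k in m) print k "\t" m[k] }' | sort
}

# With a combiner: reduce each task first, then reduce the partial results
with_combiner=$( { printf '%s\n' "$task1" | max_per_key
                   printf '%s\n' "$task2" | max_per_key; } | max_per_key )

# Without a combiner: reduce all records at once
without_combiner=$(printf '%s\n%s\n' "$task1" "$task2" | max_per_key)

echo "$with_combiner"
```

Both paths give the same per-year maxima. A line count, by contrast, cannot reuse one command for both roles, which is why the earlier example paired `wc -l` with a separate summing reducer.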

## SQOOP

We will use sqoop in order to import data
- from RDBMS
- into HDFS as text files
- or as hive tables

First let's start postgresql service:

In [None]:
sudo service postgresql start

Check whether it runs:

In [None]:
psql -U postgres -c "\l"

And let's recall the table names in imdb2:

In [None]:
psql -U postgres -d imdb2 -c "\dt+"

And let's see whether sqoop can connect to the database:

In [None]:
sqoop list-tables --connect jdbc:postgresql://localhost:5432/imdb2 \
--username postgres 2>&1 | grep -Pv "^(Warning|Please)"

Now, first import a single table as a text file into hdfs

The target directory should be non-existent. The direct flag is for fast imports:

In [None]:
sqoop import --connect jdbc:postgresql://localhost:5432/imdb2 \
--username postgres \
--table name_basics \
--target-dir /import \
--direct \
2>&1 | grep -Pv "^(Warning|Please)"

Now let's check whether the new directory and file(s) exist:

In [None]:
hdfs dfs -ls /import 2>&1 | grep -Pv "^WARNING"

And view the file:

In [None]:
hdfs dfs -cat /import/part-m-00000 2>&1 | grep -Pv "^WARNING" | head

And get the file out of hdfs:

In [None]:
hdfs dfs -get /import/part-m-00000 . 2>&1 | grep -Pv "^WARNING"

In [None]:
head part-m-00000