# <center> Introduction to Hadoop MapReduce </center>

## 2. Debugging Hadoop MapReduce Jobs

**Data: Movie Ratings and Recommendation**

An independent movie company is looking to invest in a new movie project. With limited financing, the company wants to 
analyze audience reactions, particularly toward various movie genres, in order to identify a promising 
movie project to focus on. The company relies on data collected from a publicly available recommendation service 
by [MovieLens](http://dl.acm.org/citation.cfm?id=2827872). This 
[dataset](http://files.grouplens.org/datasets/movielens/ml-10m-README.html) contains **24404096** ratings and **668953** tag applications across **40110** movies. According to the dataset's README, these data were created by **259137** users between January 09, 1995 and October 17, 2016. This dataset was generated on October 18, 2016. 

From this dataset, several analyses are possible, including the following:
1.   Find movies which have the highest average ratings over the years and identify the corresponding genre.
2.   Find genres which have the highest average ratings over the years.
3.   Find users who rate movies most frequently in order to contact them for in-depth marketing analysis.

These types of analyses, which are somewhat ambiguous, demand the ability to quickly process a large amount of data in a 
relatively short amount of time for decision support purposes. In these situations, the size of the data typically 
makes analysis on a single machine impossible and analysis using a remote storage system impractical. For the 
remainder of the lesson, we will learn how HDFS provides the basis for storing massive amounts of data and enables 
the programming approach used to analyze these data.

In [1]:
!hdfs --config ~/hadoop_palmetto/config dfs -ls /user/jin6

2020-11-04 11:58:47,117 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
drwx------   - jin6 supergroup          0 2020-11-04 11:53 /user/jin6/.Trash
drwxr-xr-x   - jin6 supergroup          0 2020-11-04 11:39 /user/jin6/intro-to-hadoop
drwxr-xr-x   - jin6 supergroup          0 2020-11-04 11:48 /user/jin6/repository


In [2]:
!hdfs --config ~/hadoop_palmetto/config dfs -mkdir /repository

2020-11-04 11:58:53,391 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
mkdir: `/repository': File exists


In [3]:
!hdfs --config ~/hadoop_palmetto/config dfs -put /zfs/citi/movielens /repository

2020-11-04 11:58:58,954 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
put: `/repository/movielens/genome-tags.csv': File exists
put: `/repository/movielens/movies.csv': File exists
put: `/repository/movielens/tags.csv': File exists
put: `/repository/movielens/README.txt': File exists
put: `/repository/movielens/links.csv': File exists
put: `/repository/movielens/genome-scores.csv': File exists
put: `/repository/movielens/ratings.csv': File exists


In [4]:
!hdfs --config ~/hadoop_palmetto/config dfs -ls /repository/movielens

2020-11-04 11:59:13,406 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 7 items
-rw-r--r--   3 jin6 supergroup       9511 2020-11-04 11:55 /repository/movielens/README.txt
-rw-r--r--   3 jin6 supergroup  333365341 2020-11-04 11:55 /repository/movielens/genome-scores.csv
-rw-r--r--   3 jin6 supergroup      18103 2020-11-04 11:55 /repository/movielens/genome-tags.csv
-rw-r--r--   3 jin6 supergroup     859311 2020-11-04 11:55 /repository/movielens/links.csv
-rw-r--r--   3 jin6 supergroup    2007982 2020-11-04 11:55 /repository/movielens/movies.csv
-rw-r--r--   3 jin6 supergroup  663420664 2020-11-04 11:55 /repository/movielens/ratings.csv
-rw-r--r--   3 jin6 supergroup   24032991 2020-11-04 11:55 /repository/movielens/tags.csv


### Find movies which have the highest average ratings over the years and report their ratings and genres

- Find the average ratings of all movies over the years
- Sort the average ratings from highest to lowest
- Report the results, augmented by genres

In [5]:
!hdfs --config ~/hadoop_palmetto/config dfs -cat /repository/movielens/README.txt

2020-11-04 11:59:25,858 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020-11-04 11:59:27,691 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
Summary

This dataset (ml-latest) describes 5-star rating and free-text tagging activity from [MovieLens](http://movielens.org), a movie recommendation service. It contains 24404096 ratings and 668953 tag applications across 40110 movies. These data were created by 259137 users between January 09, 1995 and October 17, 2016. This dataset was generated on October 18, 2016.

Users were selected at random for inclusion. All selected users had rated at least 1 movies. No demographic information is included. Each user is represented by an id, and no other information is provided.

The data are contained in the files `genome-scores.csv`, `genome-tags.csv`, `links.csv`, `movies.csv`, `ratings.csv` and `tag

In [7]:
!hdfs --config ~/hadoop_palmetto/config dfs -cat /repository/movielens/links.csv \
    2>/dev/null | head -n 5

movieId,imdbId,tmdbId
1,0114709,862
2,0113497,8844
3,0113228,15602
4,0114885,31357


In [8]:
!hdfs --config ~/hadoop_palmetto/config dfs -cat /repository/movielens/movies.csv \
    2>/dev/null | head -n 5

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance


In [9]:
!hdfs --config ~/hadoop_palmetto/config dfs -cat /repository/movielens/ratings.csv \
    2>/dev/null | head -n 5

userId,movieId,rating,timestamp
1,122,2.0,945544824
1,172,1.0,945544871
1,1221,5.0,945544788
1,1441,4.0,945544871


In [10]:
!hdfs --config ~/hadoop_palmetto/config dfs -cat /repository/movielens/tags.csv \
    2>/dev/null | head -n 5

userId,movieId,tag,timestamp
28,63062,angelina jolie,1263047558
40,4973,Poetic,1436439070
40,117533,privacy,1436439140
57,356,life positive,1291771526


### Note:

To write a MapReduce program, you first have to identify the (Key,Value) pairs needed to produce the required results; this determines the reducing phase. Once the (Key,Value) format is fixed, you can develop the mapping phase that emits pairs in that format. 
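
As a minimal sketch of this idea, the block below simulates the map, sort, and reduce steps in memory for the average-rating question, using `movieId` as the key and the rating as the value. The function names `map_phase` and `reduce_phase` are hypothetical, `sorted()` only stands in for Hadoop's shuffle-and-sort step, and the last sample record is made up so that one key receives two values.

```python
from itertools import groupby
from operator import itemgetter

# Records in the ratings.csv layout: userId,movieId,rating,timestamp.
# The first four rows appear in ratings.csv; the last one is hypothetical.
records = [
    "1,122,2.0,945544824",
    "1,172,1.0,945544871",
    "1,1221,5.0,945544788",
    "1,1441,4.0,945544871",
    "2,122,4.0,945544900",  # made-up second rating for movie 122
]


def map_phase(lines):
    """Emit one (movieId, rating) pair per input record."""
    for line in lines:
        fields = line.strip().split(",")
        yield fields[1], float(fields[2])


def reduce_phase(pairs):
    """Average the ratings of each key; pairs must arrive sorted by key."""
    for movie_id, group in groupby(pairs, key=itemgetter(0)):
        ratings = [rating for _, rating in group]
        yield movie_id, sum(ratings) / len(ratings)


# sorted() plays the role of the shuffle-and-sort step between the two phases.
averages = dict(reduce_phase(sorted(map_phase(records))))
print(averages)
```

Deciding first that the reducer needs `(movieId, rating)` pairs is what fixes the two fields the mapper has to extract from each record.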

In [11]:
%%writefile codes/avgRatingMapper01.py
#!/usr/bin/env python

import sys

for oneMovie in sys.stdin:
    oneMovie = oneMovie.strip()
    ratingInfo = oneMovie.split(",")
    movieID = ratingInfo[1]
    rating = ratingInfo[2]
    print ("%s\t%s" % (movieID, rating)) 

Writing codes/avgRatingMapper01.py


In [16]:
!hdfs --config ~/hadoop_palmetto/config dfs -cat /repository/movielens/ratings.csv \
    2>/dev/null | head -n 5 | python ./codes/avgRatingMapper01.py

movieId	rating
122	2.0
172	1.0
1221	5.0
1441	4.0


#### *Do we really need the headers?*

In [13]:
%%writefile codes/avgRatingMapper02.py
#!/usr/bin/env python

import sys

for oneMovie in sys.stdin:
    oneMovie = oneMovie.strip()
    ratingInfo = oneMovie.split(",")
    try:
        movieID = ratingInfo[1]
        rating = float(ratingInfo[2])
        print ("%s\t%s" % (movieID, rating))
    except ValueError:
        continue

Writing codes/avgRatingMapper02.py


In [15]:
!hdfs --config ~/hadoop_palmetto/config dfs -cat /repository/movielens/ratings.csv \
    2>/dev/null | head -n 5 | python ./codes/avgRatingMapper02.py

122	2.0
172	1.0
1221	5.0
1441	4.0


#### *The outcome is correct. Is it useful?*

Get the additional file, `movies.csv`, which maps each movie ID to its title and genres:

In [17]:
!mkdir movielens
!hdfs --config ~/hadoop_palmetto/config dfs -get /repository/movielens/movies.csv movielens/movies.csv

2020-11-04 12:00:58,291 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020-11-04 12:01:00,015 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false


In [18]:
%%writefile codes/avgRatingMapper03.py
#!/usr/bin/env python

import sys
import csv

movieFile = "./movielens/movies.csv"
movieList = {}

with open(movieFile, mode = 'r') as infile:
    reader = csv.reader(infile)
    for row in reader:
        movieList[row[0]] = {}
        movieList[row[0]]["title"] = row[1]
        movieList[row[0]]["genre"] = row[2]

for oneMovie in sys.stdin:
    oneMovie = oneMovie.strip()
    ratingInfo = oneMovie.split(",")
    try:
        movieTitle = movieList[ratingInfo[1]]["title"]
        movieGenre = movieList[ratingInfo[1]]["genre"]
        rating = float(ratingInfo[2])
        print ("%s\t%s\t%s" % (movieTitle, rating, movieGenre))
    except ValueError:
        continue

Writing codes/avgRatingMapper03.py


In [19]:
!hdfs --config ~/hadoop_palmetto/config dfs -cat /repository/movielens/ratings.csv \
    2>/dev/null | head -n 5 | python ./codes/avgRatingMapper03.py

Boomerang (1992)	2.0	Comedy|Romance
Johnny Mnemonic (1995)	1.0	Action|Sci-Fi|Thriller
Godfather: Part II, The (1974)	5.0	Crime|Drama
Benny & Joon (1993)	4.0	Comedy|Romance


#### *Test reducer:*

In [20]:
%%writefile codes/avgRatingReducer01.py
#!/usr/bin/env python
import sys

current_movie = None
current_rating_sum = 0
current_rating_count = 0

for line in sys.stdin:
    line = line.strip()
    movie, rating, genre = line.split("\t", 2)
    try:
        rating = float(rating)
    except ValueError:
        continue

    if current_movie == movie:
        current_rating_sum += rating
        current_rating_count += 1
    else:
        if current_movie:
            rating_average = current_rating_sum / current_rating_count
            print ("%s\t%s\t%s" % (current_movie, rating_average, genre))    
        current_movie = movie
        current_rating_sum = rating
        current_rating_count = 1

if current_movie == movie:
    rating_average = current_rating_sum / current_rating_count
    print ("%s\t%s\t%s" % (current_movie, rating_average, genre))


Writing codes/avgRatingReducer01.py


In [22]:
!hdfs --config ~/hadoop_palmetto/config dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
    | head -n 5 \
    | python ./codes/avgRatingMapper03.py \
    | sort \
    | python ./codes/avgRatingReducer01.py

Benny & Joon (1993)	4.0	Comedy|Romance
Boomerang (1992)	2.0	Crime|Drama
Godfather: Part II, The (1974)	5.0	Action|Sci-Fi|Thriller
Johnny Mnemonic (1995)	1.0	Action|Sci-Fi|Thriller
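
Comparing this output with the mapper output shown earlier, the averages are right but two genres are not: Boomerang is reported as `Crime|Drama` and The Godfather: Part II as `Action|Sci-Fi|Thriller`. When the movie changes, `avgRatingReducer01.py` prints the `genre` of the line that has just arrived rather than the genre of the movie being closed. The block below is a minimal sketch of one way to keep the genre tied to its movie; the function name `average_ratings` is hypothetical, and it takes any iterable of lines so that it can be tried without Hadoop.

```python
def average_ratings(lines):
    """Yield (movie, average, genre) from sorted, tab-separated mapper lines."""
    current_movie = None
    current_genre = None
    rating_sum = 0.0
    rating_count = 0
    for line in lines:
        parts = line.rstrip("\n").split("\t", 2)
        if len(parts) != 3:
            continue  # skip malformed lines
        movie, rating, genre = parts
        try:
            rating = float(rating)
        except ValueError:
            continue
        if movie != current_movie:
            if current_movie is not None:
                yield current_movie, rating_sum / rating_count, current_genre
            # Store the genre together with the movie it belongs to.
            current_movie, current_genre = movie, genre
            rating_sum, rating_count = 0.0, 0
        rating_sum += rating
        rating_count += 1
    if current_movie is not None:
        yield current_movie, rating_sum / rating_count, current_genre


# The four mapper output lines from the small test above, already sorted.
sample = [
    "Benny & Joon (1993)\t4.0\tComedy|Romance",
    "Boomerang (1992)\t2.0\tComedy|Romance",
    "Godfather: Part II, The (1974)\t5.0\tCrime|Drama",
    "Johnny Mnemonic (1995)\t1.0\tAction|Sci-Fi|Thriller",
]
results = list(average_ratings(sample))
for movie, average, genre in results:
    print("%s\t%s\t%s" % (movie, average, genre))
```

To use this as a streaming reducer, the same function could be fed `sys.stdin` instead of `sample`.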


#### Non-HDFS correctness test

In [23]:
!hdfs --config ~/hadoop_palmetto/config dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
    | head -n 2000 \
    | python ./codes/avgRatingMapper03.py \
    | grep Matrix

Matrix Reloaded, The (2003)	4.0	Action|Adventure|Sci-Fi|Thriller|IMAX
Matrix, The (1999)	3.5	Action|Sci-Fi|Thriller
Matrix, The (1999)	3.5	Action|Sci-Fi|Thriller
Matrix, The (1999)	4.5	Action|Sci-Fi|Thriller
Matrix Reloaded, The (2003)	1.0	Action|Adventure|Sci-Fi|Thriller|IMAX
Matrix, The (1999)	5.0	Action|Sci-Fi|Thriller
Matrix Reloaded, The (2003)	5.0	Action|Adventure|Sci-Fi|Thriller|IMAX
Matrix Revolutions, The (2003)	2.5	Action|Adventure|Sci-Fi|Thriller|IMAX
Matrix, The (1999)	3.5	Action|Sci-Fi|Thriller


In [24]:
!hdfs --config ~/hadoop_palmetto/config dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
    | head -n 2000 \
    | python ./codes/avgRatingMapper03.py \
    | grep Matrix \
    | sort \
    | python ./codes/avgRatingReducer01.py

Matrix Reloaded, The (2003)	3.3333333333333335	Action|Adventure|Sci-Fi|Thriller|IMAX
Matrix Revolutions, The (2003)	2.5	Action|Sci-Fi|Thriller
Matrix, The (1999)	4.0	Action|Sci-Fi|Thriller


In [25]:
# Manual calculation check via python
(4.0+1.0+5.0)/3

3.3333333333333335

#### Full execution on HDFS

In [27]:
!mapred --config ~/hadoop_palmetto/config streaming \
    -input /repository/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-01 \
    -file ./codes/avgRatingMapper03.py \
    -mapper avgRatingMapper03.py \
    -file ./codes/avgRatingReducer01.py \
    -reducer avgRatingReducer01.py

2020-11-04 12:03:07,912 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
2020-11-04 12:03:08,206 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [./codes/avgRatingMapper03.py, ./codes/avgRatingReducer01.py] [/software/spackages/linux-centos8-x86_64/gcc-8.3.1/hadoop-3.2.1-lux74iyfcxxzatxh4em67xezesi4z4i4/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar] /local_scratch/pbs.828633.pbs02/streamjob1182406568942728074.jar tmpDir=null
2020-11-04 12:03:09,977 INFO client.RMProxy: Connecting to ResourceManager at node0297.palmetto.clemson.edu/10.125.2.38:8050
2020-11-04 12:03:10,371 INFO client.RMProxy: Connecting to ResourceManager at node0297.palmetto.clemson.edu/10.125.2.38:8050
2020-11-04 12:03:10,719 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/jin6/.staging/job_1604507172335_0002
2020-11-04 12:03

#### 2.1.1 First Error!!!

Go back to the first few lines of the previous output and look for the INFO line **Submitted application application_xxxx_xxxx**. Running the logs command of yarn with the provided application ID is a straightforward way to access all available log information for that application. The syntax to view the yarn log is:

```
! yarn logs -applicationId APPLICATION_ID
```

In [None]:
# Run the yarn view log command here
# Do not run this command in a notebook browser, it will likely crash the browser
#!yarn logs -applicationId application_1476193845089_0123

However, this output is often massive, as it contains the aggregated logs from all tasks (map and reduce) of the job, and there can be hundreds of them. The example below demonstrates this problem by displaying all available information for a single-task MapReduce job.
In this example, the log of a container has three types of log (LogType): 
- stderr: Error messages from the actual task execution
- stdout: Print out messages if the task includes them
- syslog: Logging messages from the Hadoop MapReduce operation

One way to reduce the amount of output is to filter out all non-essential lines (lines containing **INFO**):

In [None]:
!yarn logs -applicationId application_1505269880969_0056 | grep -v INFO

2020-11-04 12:05:02,641 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020-11-04 12:05:02,816 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
2020-11-04 12:05:04,363 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
2020-11-04 12:05:05,367 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
2020-11-04 12:05:06,370 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
2020-11-04 12:05:07,373 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 3 time(s); re
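
The output above still contains INFO lines even though it was piped through `grep -v INFO`. A pipe only carries standard output, so these lines were most likely written by the yarn client to standard error and never reached `grep`. The block below is a rough sketch of the same filter in Python, assuming the usual `DATE TIME LEVEL logger: message` layout seen in the output; the function name `drop_info` is hypothetical, the first sample line is abbreviated, and the third is made up.

```python
def drop_info(lines):
    """Keep only the lines whose log level field is not INFO."""
    kept = []
    for line in lines:
        fields = line.split()
        # Assumed layout: DATE TIME LEVEL logger: message
        level = fields[2] if len(fields) > 2 else ""
        if level != "INFO":
            kept.append(line)
    return kept


sample = [
    "2020-11-04 12:05:02,641 WARN util.NativeCodeLoader: Unable to load native-hadoop library",
    "2020-11-04 12:05:02,816 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032",
    "Traceback (most recent call last):",  # hypothetical stderr content from a task
]
for line in drop_info(sample):
    print(line)
```

Unlike `grep -v INFO`, this checks only the level field, so a message that merely mentions the word INFO is kept. To apply it to real output, both streams would have to be captured first, for example by redirecting standard error into standard output before filtering.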

Can we refine the information further?
- In a MapReduce setting, containers (often) execute the same task.
- Can we extract only the messages listing the Container IDs?

```
!yarn logs -applicationId APPLICATION_ID | grep '^Container:'
```

In [None]:
!yarn logs -applicationId application_1505269880969_0056 | grep '^Container:'

Looking at the previous report, we can further identify container information:

```
Container: container_XXXXXX on  YYYY.palmetto.clemson.edu_ZZZZZ
```

- Container ID: container_XXXXXX
- Address of node where container is placed: YYYY.palmetto.clemson.edu
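
The two pieces of information above can also be pulled out programmatically. The block below is a small sketch that assumes every container header follows the `Container: ... on ...` layout shown above and that the trailing `_ZZZZZ` part is a numeric suffix that is not part of the host name. The function name `parse_container_line` and the suffix `45454` in the sample are hypothetical.

```python
import re

# Matches lines such as:
#   Container: container_XXXXXX on YYYY.palmetto.clemson.edu_ZZZZZ
# Assumption: the optional trailing _ZZZZZ consists of digits only.
CONTAINER_LINE = re.compile(
    r"^Container:\s+(container_\S+)\s+on\s+(\S+?)(?:_(\d+))?$"
)


def parse_container_line(line):
    """Return (container_id, node_address), or None if the line does not match."""
    match = CONTAINER_LINE.match(line.strip())
    if match is None:
        return None
    container_id, node_address, _suffix = match.groups()
    return container_id, node_address


# The container ID and node name are the ones used in the next cell;
# the numeric suffix is made up for illustration.
sample = ("Container: container_e30_1505269880969_0056_01_000012"
          " on  dsci035.palmetto.clemson.edu_45454")
print(parse_container_line(sample))
```

The returned pair corresponds to the values passed to `-containerId` and `--nodeAddress` in the container-level command.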

To ask yarn for a more detailed log at the container level, we run:
```
!yarn logs -applicationId APPLICATION_ID -containerId CONTAINER_ID --nodeAddress NODE_ADDRESS \
    | grep -v INFO
```

In [None]:
!yarn logs -applicationId application_1505269880969_0056 \
    -containerId container_e30_1505269880969_0056_01_000012 \
    --nodeAddress dsci035.palmetto.clemson.edu \
    | grep -v INFO

This error message gives us some insights into the mechanism of Hadoop MapReduce. 
- Where are the map and reduce python scripts located?
- Where would the *movies.csv* file be, if the *-file* flag is used to upload this file?
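
One way to investigate these questions from inside a task is to have the script report where it is running when a file cannot be found. The block below is a minimal sketch of such a check; the helper name `describe_missing_file` is hypothetical, and the message format is only an illustration. It writes to standard error, which is the stream that the `stderr` log type described earlier is meant to capture.

```python
import os
import sys


def describe_missing_file(path):
    """Return a diagnostic message if path does not exist, otherwise None."""
    if os.path.exists(path):
        return None
    return "cannot find %s; cwd=%s; files here: %s" % (
        path, os.getcwd(), sorted(os.listdir(".")))


# The relative path used by avgRatingMapper03.py.
message = describe_missing_file("./movielens/movies.csv")
if message is not None:
    # Report the working directory and its contents on standard error.
    sys.stderr.write(message + "\n")
```

Listing the working directory shows which uploaded files actually sit next to the script on the worker node, which is the information needed to choose the right relative path.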

In [None]:
%%writefile codes/avgRatingMapper04.py
#!/usr/bin/env python

import sys
import csv

movieFile = "./movies.csv"
movieList = {}

with open(movieFile, mode = 'r') as infile:
    reader = csv.reader(infile)
    for row in reader:
        movieList[row[0]] = {}
        movieList[row[0]]["title"] = row[1]
        movieList[row[0]]["genre"] = row[2]

for oneMovie in sys.stdin:
    oneMovie = oneMovie.strip()
    ratingInfo = oneMovie.split(",")
    try:
        movieTitle = movieList[ratingInfo[1]]["title"]
        movieGenre = movieList[ratingInfo[1]]["genre"]
        rating = float(ratingInfo[2])
        print ("%s\t%s\t%s" % (movieTitle, rating, movieGenre))
    except ValueError:
        continue

In [None]:
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input /repository/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-01 \
    -file ./codes/avgRatingMapper04.py \
    -mapper avgRatingMapper04.py \
    -file ./codes/avgRatingReducer01.py \
    -reducer avgRatingReducer01.py \
    -file ./movielens/movies.csv

#### 2.1.2 Second Error!!!

- HDFS files are write-once, and MapReduce refuses to overwrite an existing output directory. Therefore, the output directory must not exist before the job is submitted.
- This can be resolved either by specifying a new output directory or by deleting the existing output directory.
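
For the first option, one convenient habit is to derive the output directory name from the submission time so that repeated runs never collide. The block below is a small sketch of that idea; the helper name `fresh_output_dir` and the naming scheme are hypothetical. For the second option, the existing directory would instead be removed with `hdfs dfs -rm -r` before resubmitting.

```python
from datetime import datetime


def fresh_output_dir(base, now=None):
    """Append a timestamp to base so that each submission gets a new directory."""
    now = now or datetime.now()
    return "%s-%s" % (base, now.strftime("%Y%m%d-%H%M%S"))


# A fixed time is passed here only to make the result reproducible.
print(fresh_output_dir("intro-to-hadoop/output-movielens",
                       datetime(2020, 11, 4, 12, 3, 7)))
# prints intro-to-hadoop/output-movielens-20201104-120307
```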

In [None]:
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input /repository/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-02 \
    -file ./codes/avgRatingMapper04.py \
    -mapper avgRatingMapper04.py \
    -file ./codes/avgRatingReducer01.py \
    -reducer avgRatingReducer01.py \
    -file ./movielens/movies.csv

In [None]:
!hdfs dfs -ls intro-to-hadoop/output-movielens-02

In [None]:
!hdfs dfs -cat intro-to-hadoop/output-movielens-02/part-00000 \
    2>/dev/null | head -n 20

### Challenge:

1. Modify *avgRatingReducer02.py* so that only movies with average ratings higher than 3.75 are collected.
2. Further enhance your modification so that the collected movies not only have average ratings higher than 3.75 but have also been rated at least 5000 times. 

In [None]:
%%writefile codes/avgRatingMapper04challenge.py
#!/usr/bin/env python

import sys
import csv

movieFile = "./movies.csv"
movieList = {}


with open(movieFile, mode = 'r') as infile:
    reader = csv.reader(infile)
    for row in reader:
        movieList[row[0]] = {}
        movieList[row[0]]["title"] = row[1]
        movieList[row[0]]["genre"] = row[2]

for oneMovie in sys.stdin:
    oneMovie = oneMovie.strip()
    ratingInfo = oneMovie.split(",")
    try:
        movieTitle = movieList[ratingInfo[1]]["title"]
        movieGenre = movieList[ratingInfo[1]]["genre"]
        rating = float(ratingInfo[2])
        if _________:
            print ("%s\t%s\t%s" % (movieTitle, rating, movieGenre))
    except ValueError:
        continue

In [None]:
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input /repository/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-challenge \
    -file ____________ \
    -mapper ___________ \
    -file ./codes/avgRatingReducer01.py \
    -reducer avgRatingReducer01.py \
    -file ./movielens/movies.csv