# <center> Introduction to Hadoop MapReduce </center>

## 3. Optimization

The first principle of optimizing a Hadoop workflow: **reduce data movement in the shuffle phase**.

In [None]:
!hdfs dfs -rm -r intro-to-hadoop/output-movielens-02
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input /repository/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-02 \
    -file ./codes/avgRatingMapper04.py \
    -mapper avgRatingMapper04.py \
    -file ./codes/avgRatingReducer01.py \
    -reducer avgRatingReducer01.py \
    -file ./movielens/movies.csv

- What is being passed from the map phase to the reduce phase?
- Can the reducer do the same thing as the mapper, that is, load external data?
- If we load external data on the reduce side, do we still need to load it on the map side?

In [None]:
%%writefile codes/avgRatingReducer02.py
#!/usr/bin/env python
import sys
import csv

movieFile = "./movies.csv"
movieList = {}

with open(movieFile, mode = 'r') as infile:
    reader = csv.reader(infile)
    for row in reader:
        movieList[row[0]] = {}
        movieList[row[0]]["title"] = row[1]
        movieList[row[0]]["genre"] = row[2]

current_movie = None
current_rating_sum = 0
current_rating_count = 0

for line in sys.stdin:
    line = line.strip()
    movie, rating = line.split("\t", 1)
    try:
        rating = float(rating)
    except ValueError:
        continue

    if current_movie == movie:
        current_rating_sum += rating
        current_rating_count += 1
    else:
        if current_movie:
            rating_average = current_rating_sum / current_rating_count
            movieTitle = movieList[current_movie]["title"]
            movieGenres = movieList[current_movie]["genre"]
            print ("%s\t%s\t%s" % (movieTitle, rating_average, movieGenres))    
        current_movie = movie
        current_rating_sum = rating
        current_rating_count = 1

# emit the last movie; testing current_movie avoids a NameError on empty input
if current_movie:
    rating_average = current_rating_sum / current_rating_count
    movieTitle = movieList[current_movie]["title"]
    movieGenres = movieList[current_movie]["genre"]
    print ("%s\t%s\t%s" % (movieTitle, rating_average, movieGenres))

In [None]:
!hdfs dfs -rm -r intro-to-hadoop/output-movielens-03
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input /repository/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-03 \
    -file ./codes/avgRatingMapper02.py \
    -mapper avgRatingMapper02.py \
    -file ./codes/avgRatingReducer02.py \
    -reducer avgRatingReducer02.py \
    -file ./movielens/movies.csv

In [None]:
!hdfs dfs -ls intro-to-hadoop/output-movielens-02
!hdfs dfs -ls intro-to-hadoop/output-movielens-03

In [None]:
!hdfs dfs -cat intro-to-hadoop/output-movielens-03/part-00000 \
    2>/dev/null | head -n 10

How does the number of shuffle bytes in this example compare to the previous example?
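One way to build intuition before reading the job counters is to estimate the map output size locally. The sketch below assumes, hypothetically, that the map-side join emits `title<TAB>rating<TAB>genres` for every record while the reduce-side version emits only `movieId<TAB>rating`. The sample rows and the helper names are made up for illustration; the real figures come from the counters that each job prints.

```python
# Hypothetical sample data, for illustration only.
movies = {
    "1": ("Toy Story (1995)", "Adventure|Animation|Children|Comedy|Fantasy"),
    "2": ("Jumanji (1995)", "Adventure|Children|Fantasy"),
}
ratings = [("1", "4.0"), ("2", "3.5"), ("1", "5.0")]


def map_side_join_output(ratings, movies):
    """Assumed map-side join: every record carries title and genres."""
    return ["%s\t%s\t%s" % (movies[m][0], r, movies[m][1]) for m, r in ratings]


def id_only_output(ratings):
    """Assumed reduce-side lookup: every record carries only the movie ID."""
    return ["%s\t%s" % (m, r) for m, r in ratings]


def total_bytes(lines):
    """Bytes written if each line is UTF-8 encoded and newline-terminated."""
    return sum(len(line.encode("utf-8")) + 1 for line in lines)


joined_bytes = total_bytes(map_side_join_output(ratings, movies))
id_bytes = total_bytes(id_only_output(ratings))
print("map-side join: %d bytes, ID only: %d bytes" % (joined_bytes, id_bytes))
```

Because the title and genre strings repeat for every rating of the same movie, the gap grows with the number of ratings per movie.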

#### Find genres which have the highest average ratings over the years

Common optimization approaches:

1. In-mapper reduction of key/value pairs
2. Additional combiner function
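Both approaches aim at the same effect: fewer key/value pairs leaving the map side. The following is a minimal local sketch of the first approach, using hypothetical `(genre, rating)` pairs and illustrative function names rather than the notebook's actual scripts. It compares the number of pairs emitted with and without in-mapper aggregation.

```python
from collections import defaultdict

# Hypothetical (genre, rating) pairs as a mapper would see them after the join.
records = [("Comedy", 4.0), ("Drama", 3.0), ("Comedy", 2.0),
           ("Drama", 5.0), ("Comedy", 3.0)]


def emit_per_record(records):
    """Baseline: one key/value pair per input record."""
    return [(genre, rating) for genre, rating in records]


def emit_in_mapper_aggregate(records):
    """In-mapper reduction: one (sum, count) pair per distinct key."""
    totals = defaultdict(lambda: [0.0, 0])
    for genre, rating in records:
        totals[genre][0] += rating
        totals[genre][1] += 1
    return [(genre, (s, c)) for genre, (s, c) in sorted(totals.items())]


baseline = emit_per_record(records)
aggregated = emit_in_mapper_aggregate(records)
print(len(baseline), "pairs without aggregation")
print(len(aggregated), "pairs with in-mapper aggregation:", aggregated)
```

A combiner achieves a similar reduction, but it runs on the map output after the mapper has emitted it, so the mapper script itself does not need to change.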

In [None]:
%%writefile codes/avgGenreMapper01.py
#!/usr/bin/env python
import sys
import csv

# for nonHDFS run
movieFile = "./movielens/movies.csv"

# for HDFS run
#movieFile = "./movies.csv"
movieList = {}

with open(movieFile, mode = 'r') as infile:
    reader = csv.reader(infile)
    for row in reader:
        movieList[row[0]] = {}
        movieList[row[0]]["title"] = row[1]
        movieList[row[0]]["genre"] = row[2]

for oneMovie in sys.stdin:
    oneMovie = oneMovie.strip()
    ratingInfo = oneMovie.split(",")
    try:
        genreList = movieList[ratingInfo[1]]["genre"]
        rating = float(ratingInfo[2])
        for genre in genreList.split("|"):
            print ("%s\t%s" % (genre, rating))
    except (ValueError, KeyError, IndexError):
        # skip the header, malformed rows, and unknown movie IDs
        continue

In [None]:
%%writefile codes/avgGenreReducer01.py
#!/usr/bin/env python
import sys

current_genre = None
current_rating_sum = 0
current_rating_count = 0

for line in sys.stdin:
    line = line.strip()
    try:
        genre, rating = line.split("\t", 1)
        rating = float(rating)
    except ValueError:
        # skip lines that are malformed or carry a non-numeric rating
        continue

    if current_genre == genre:
        current_rating_sum += rating
        current_rating_count += 1
    else:
        if current_genre:
            rating_average = current_rating_sum / current_rating_count
            print ("%s\t%s" % (current_genre, rating_average))
        current_genre = genre
        current_rating_sum = rating
        current_rating_count = 1

# emit the last genre; testing current_genre avoids a NameError on empty input
if current_genre:
    rating_average = current_rating_sum / current_rating_count
    print ("%s\t%s" % (current_genre, rating_average))

In [None]:
!hdfs dfs -rm -r intro-to-hadoop/output-movielens-04
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input /repository/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-04 \
    -file ./codes/avgGenreMapper01.py \
    -mapper avgGenreMapper01.py \
    -file ./codes/avgGenreReducer01.py \
    -reducer avgGenreReducer01.py \
    -file ./movielens/movies.csv

In [None]:
!hdfs dfs -ls intro-to-hadoop/output-movielens-04

In [None]:
!hdfs dfs -cat intro-to-hadoop/output-movielens-04/part-00000

#### 2.2.1 Optimization through in-mapper reduction of Key/Value pairs

In [None]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
    | head -n 10

In [None]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
    | head -n 10 \
    | python ./codes/avgGenreMapper01.py

In [None]:
%%writefile codes/avgGenreMapper02.py
#!/usr/bin/env python

import sys
import csv
import json

# for nonHDFS run
# movieFile = "./movielens/movies.csv"

# for HDFS run
movieFile = "./movies.csv"

movieList = {}
genreList = {}

with open(movieFile, mode = 'r') as infile:
    reader = csv.reader(infile)
    for row in reader:
        movieList[row[0]] = {}
        movieList[row[0]]["title"] = row[1]
        movieList[row[0]]["genre"] = row[2]

for oneMovie in sys.stdin:
    oneMovie = oneMovie.strip()
    ratingInfo = oneMovie.split(",")
    try:
        genres = movieList[ratingInfo[1]]["genre"]
        rating = float(ratingInfo[2])
        for genre in genres.split("|"):
            if genre in genreList:
                genreList[genre]["total_rating"] += rating
                genreList[genre]["total_count"] += 1
            else:
                genreList[genre] = {}
                genreList[genre]["total_rating"] = rating
                genreList[genre]["total_count"] = 1
    except (ValueError, KeyError, IndexError):
        # skip the header, malformed rows, and unknown movie IDs
        continue
        
for genre in genreList:
    print ("%s\t%s" % (genre, json.dumps(genreList[genre])))

In [None]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
    | head -n 10 \
    | python ./codes/avgGenreMapper02.py

In [None]:
%%writefile codes/avgGenreReducer02.py
#!/usr/bin/env python
import sys
import json

current_genre = None
current_rating_sum = 0
current_rating_count = 0

for line in sys.stdin:
    line = line.strip()
    try:
        genre, ratingString = line.split("\t", 1)
        ratingInfo = json.loads(ratingString)
        rating_sum = ratingInfo["total_rating"]
        rating_count = ratingInfo["total_count"]
    except (ValueError, KeyError):
        # skip lines that are malformed or lack the expected fields
        continue

    if current_genre == genre:
        current_rating_sum += rating_sum
        current_rating_count += rating_count
    else:
        if current_genre:
            rating_average = current_rating_sum / current_rating_count
            print ("%s\t%s" % (current_genre, rating_average))
        current_genre = genre
        current_rating_sum = rating_sum
        current_rating_count = rating_count

# emit the last genre; testing current_genre avoids a NameError on empty input
if current_genre:
    rating_average = current_rating_sum / current_rating_count
    print ("%s\t%s" % (current_genre, rating_average))

In [None]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
    | head -n 10 \
    | python ./codes/avgGenreMapper02.py \
    | sort \
    | python ./codes/avgGenreReducer02.py

In [None]:
# make sure that the path to movies.csv is correct inside avgGenreMapper02.py
!hdfs dfs -rm -R intro-to-hadoop/output-movielens-05
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input /repository/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-05 \
    -file ./codes/avgGenreMapper02.py \
    -mapper avgGenreMapper02.py \
    -file ./codes/avgGenreReducer02.py \
    -reducer avgGenreReducer02.py \
    -file ./movielens/movies.csv

In [None]:
!hdfs dfs -cat intro-to-hadoop/output-movielens-05/part-00000

In [None]:
!hdfs dfs -cat intro-to-hadoop/output-movielens-04/part-00000

**How does the number of shuffle bytes differ between the two jobs?**
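Hadoop prints its job counters when a job finishes, and `Reduce shuffle bytes` is the counter this question refers to. The sketch below shows one way to pull that value out of saved log text for a side-by-side comparison. The helper name `shuffle_bytes` and the log excerpts are hypothetical, and the numbers are placeholders rather than measured results.

```python
import re


def shuffle_bytes(job_log):
    """Return the 'Reduce shuffle bytes' counter from job log text, or None."""
    match = re.search(r"Reduce shuffle bytes=(\d+)", job_log)
    return int(match.group(1)) if match else None


# Hypothetical log excerpts; the numbers are placeholders, not measured results.
log_without_aggregation = """
        Map-Reduce Framework
                Reduce shuffle bytes=1000000
"""
log_with_aggregation = """
        Map-Reduce Framework
                Reduce shuffle bytes=2000
"""

before = shuffle_bytes(log_without_aggregation)
after = shuffle_bytes(log_with_aggregation)
print("shuffle bytes: %d -> %d" % (before, after))
```

The helper returns `None` when the counter is absent, for example when a job fails before the reduce phase starts.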

#### 2.2.2 Optimization through combiner function

In [4]:
!hdfs dfs -ls /repository/

Found 16 items
-rw-r--r--   2 lngo   hdfs-user       1034 2016-11-18 08:04 /repository/.pysparkrc
drwxr-xr-x   - lngo   hdfs-user          0 2017-09-14 09:23 /repository/airlines
-rw-r--r--   2 lngo   hdfs-user 2383967007 2016-11-29 21:31 /repository/bigdata-workshop.tgz
drwxr-xr-x   - denton hdfs-user          0 2017-10-11 09:15 /repository/chicago_data
-rw-r--r--   2 lngo   hdfs-user    5590193 2016-03-22 14:09 /repository/complete-shakespeare.txt
drwxr-xr-x   - denton hdfs-user          0 2016-11-02 08:16 /repository/cypress-pyspark-kernel
drwxr-xr-x   - lngo   hdfs-user          0 2016-02-03 10:17 /repository/gtrace
drwxr-xr-x   - lngo   hdfs-user          0 2017-05-23 08:40 /repository/halvade
-rw-r--r--   2 lngo   hdfs-user 2580196770 2017-03-16 06:02 /repository/intro-to-hadoop.tgz
-rw-r--r--   2 denton hdfs-user      34590 2016-12-01 09:31 /repository/intro-to-pyspark.ipynb
-rw-r--r--   2 lngo   hdfs-user 2775356893 2017-04-04 14:55 /repository/intro-to-spark-palmett

In [5]:
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input /repository/complete-shakespeare.txt \
    -output intro-to-hadoop/output-wordcount-01 \
    -file ./codes/wordcountMapper.py \
    -mapper wordcountMapper.py \
    -file ./codes/wordcountReducer.py \
    -reducer wordcountReducer.py

17/10/11 12:22:53 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [./codes/wordcountMapper.py, ./codes/wordcountReducer.py] [/usr/hdp/2.6.0.3-8/hadoop-mapreduce/hadoop-streaming-2.7.3.2.6.0.3-8.jar] /hadoop_java_io_tmpdir/streamjob2064724442186879809.jar tmpDir=null
17/10/11 12:22:55 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
17/10/11 12:22:55 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
17/10/11 12:22:55 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 14512 for lngo on ha-hdfs:dsci
17/10/11 12:22:55 INFO security.TokenCache: Got dt for hdfs://dsci; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:dsci, Ident: (HDFS_DELEGATION_TOKEN token 14512 for lngo)
17/10/11 12:22:56 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
17/10/11 12:22:56 INFO lzo.LzoCodec: Successfully lo

In [6]:
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input /repository/complete-shakespeare.txt \
    -output intro-to-hadoop/output-wordcount-02 \
    -file ./codes/wordcountMapper.py \
    -mapper wordcountMapper.py \
    -file ./codes/wordcountReducer.py \
    -reducer wordcountReducer.py \
    -combiner wordcountReducer.py

17/10/11 12:23:28 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [./codes/wordcountMapper.py, ./codes/wordcountReducer.py] [/usr/hdp/2.6.0.3-8/hadoop-mapreduce/hadoop-streaming-2.7.3.2.6.0.3-8.jar] /hadoop_java_io_tmpdir/streamjob3330378274220223963.jar tmpDir=null
17/10/11 12:23:29 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
17/10/11 12:23:30 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
17/10/11 12:23:30 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 14515 for lngo on ha-hdfs:dsci
17/10/11 12:23:30 INFO security.TokenCache: Got dt for hdfs://dsci; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:dsci, Ident: (HDFS_DELEGATION_TOKEN token 14515 for lngo)
17/10/11 12:23:30 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
17/10/11 12:23:30 INFO lzo.LzoCodec: Successfully lo

In [7]:
%%writefile codes/avgGenreCombiner.py
#!/usr/bin/env python

import sys
import json

genreList = {}

for line in sys.stdin:
    line = line.strip()
    genre, ratingString = line.split("\t", 1)
    ratingInfo = json.loads(ratingString)

    if genre in genreList:
        genreList[genre]["total_rating"] += ratingInfo["total_rating"]
        genreList[genre]["total_count"] += ratingInfo["total_count"]
    else:
        genreList[genre] = {}
        genreList[genre]["total_rating"] = ratingInfo["total_rating"]
        genreList[genre]["total_count"] = ratingInfo["total_count"]

for genre in genreList:
    print ("%s\t%s" % (genre, json.dumps(genreList[genre])))

Overwriting codes/avgGenreCombiner.py
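Hadoop may run a combiner zero, one, or several times on a mapper's output, so a combiner's output must have the same shape as its input, and merging must give the same result however the records are grouped. Averages do not satisfy this, but `(total_rating, total_count)` pairs do. The following is a small sketch with made-up numbers; the `merge` and `average` helpers are illustrative and not part of the notebook's scripts.

```python
def merge(partials):
    """Merge (total_rating, total_count) partials into one partial."""
    total_rating = sum(p["total_rating"] for p in partials)
    total_count = sum(p["total_count"] for p in partials)
    return {"total_rating": total_rating, "total_count": total_count}


def average(partial):
    """Average rating represented by one partial."""
    return partial["total_rating"] / partial["total_count"]


# Hypothetical partial results from two map tasks for the same genre.
a = {"total_rating": 9.0, "total_count": 3}   # ratings 4, 3, 2
b = {"total_rating": 5.0, "total_count": 1}   # rating 5

correct = average(merge([a, b]))                   # (9 + 5) / (3 + 1)
average_of_averages = (average(a) + average(b)) / 2

print("merged partials:", correct)
print("average of averages:", average_of_averages)
```

The two results differ because the second one weights each map task equally instead of weighting each rating equally. This is also why the incoming `total_count` has to be carried through every merge step.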


In [8]:
!hdfs dfs -rm -r intro-to-hadoop/output-movielens-06
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input /repository/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-06 \
    -file ./codes/avgGenreMapper02.py \
    -mapper avgGenreMapper02.py \
    -file ./codes/avgGenreReducer02.py \
    -reducer avgGenreReducer02.py \
    -file ./codes/avgGenreCombiner.py \
    -combiner avgGenreCombiner.py \
    -file ./movielens/movies.csv

17/10/11 12:28:37 INFO fs.TrashPolicyDefault: Moved: 'hdfs://dsci/user/lngo/intro-to-hadoop/output-movielens-06' to trash at: hdfs://dsci/user/lngo/.Trash/Current/user/lngo/intro-to-hadoop/output-movielens-06
17/10/11 12:28:39 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [./codes/avgGenreMapper02.py, ./codes/avgGenreReducer02.py, ./codes/avgGenreCombiner.py, ./movielens/movies.csv] [/usr/hdp/2.6.0.3-8/hadoop-mapreduce/hadoop-streaming-2.7.3.2.6.0.3-8.jar] /hadoop_java_io_tmpdir/streamjob706582146786084890.jar tmpDir=null
17/10/11 12:28:40 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
17/10/11 12:28:40 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
17/10/11 12:28:41 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 14539 for lngo on ha-hdfs:dsci
17/10/11 12:28:41 INFO security.Tok

**How does the number of shuffle bytes differ between the two jobs?**