# <center> Introduction to Hadoop MapReduce -- Debugging </center>

## Example: Movie Ratings

**Data: Movie Ratings and Recommendation**

An independent movie company is looking to invest in a new movie project. With limited finances, the company wants to 
analyze audience reactions, particularly toward various movie genres, in order to identify a promising 
movie project to focus on. The company relies on data collected from a publicly available recommendation service 
by [MovieLens](http://dl.acm.org/citation.cfm?id=2827872). This 
[dataset](http://files.grouplens.org/datasets/movielens/ml-10m-README.html) contains **24404096** ratings and **668953** 
tag applications across **40110** movies. These data were created by **247753** users between January 09, 1995 and January 29, 2016. This dataset was generated on October 17, 2016. 

From this dataset, several analyses are possible, including the following:

1.   Find movies which have the highest average ratings over the years and identify the corresponding genre.
2.   Find genres which have the highest average ratings over the years.
3.   Find users who rate movies most frequently in order to contact them for in-depth marketing analysis.

These types of analyses, which are somewhat ambiguous, demand the ability to process large amounts of data in a 
relatively short amount of time for decision support purposes. In these situations, the size of the data typically 
makes analysis on a single machine impossible and analysis using a remote storage system impractical. For the 
remainder of the lessons, we will learn how HDFS provides the basis to store massive amounts of data and to enable 
the programming approach to analyze these data.

In [1]:
!module add hdp

In [2]:
!cypress-kinit

In [3]:
!hdfs dfs -ls -h /repository/movielens

Found 7 items
-rw-r--r--   2 lngo hdfs-user      9.3 K 2017-03-15 09:49 /repository/movielens/README.txt
-rw-r--r--   2 lngo hdfs-user    317.9 M 2017-03-15 09:49 /repository/movielens/genome-scores.csv
-rw-r--r--   2 lngo hdfs-user     17.7 K 2017-03-15 09:49 /repository/movielens/genome-tags.csv
-rw-r--r--   2 lngo hdfs-user    839.2 K 2017-03-15 09:49 /repository/movielens/links.csv
-rw-r--r--   2 lngo hdfs-user      1.9 M 2017-03-15 09:49 /repository/movielens/movies.csv
-rw-r--r--   2 lngo hdfs-user    632.7 M 2017-03-15 09:49 /repository/movielens/ratings.csv
-rw-r--r--   2 lngo hdfs-user     22.9 M 2017-03-15 09:49 /repository/movielens/tags.csv


### Find movies which have the highest average ratings over the years and report their ratings and genres

- Find the average ratings of all movies over the years
- Sort the average ratings from highest to lowest
- Report the results, augmented by genres
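
The three steps above can be sketched in plain Python before any MapReduce code is written. This is a minimal illustration only: it assumes the ratings fit in memory as `(userId, movieId, rating)` tuples alongside a `movieId -> genres` dictionary, and the function name `top_rated_movies` and the sample values are hypothetical rather than taken from the dataset.

```python
from collections import defaultdict


def top_rated_movies(ratings, genres_by_movie):
    """Return (movieId, average rating, genres), sorted from highest to lowest average."""
    totals = defaultdict(lambda: [0.0, 0])  # movieId -> [sum of ratings, count]
    for _user_id, movie_id, rating in ratings:
        totals[movie_id][0] += rating
        totals[movie_id][1] += 1

    # Step 1: average rating per movie.
    averages = {movie: s / n for movie, (s, n) in totals.items()}

    # Step 2: sort from highest to lowest average.
    ranked = sorted(averages.items(), key=lambda item: item[1], reverse=True)

    # Step 3: augment each result with its genres.
    return [(movie, avg, genres_by_movie.get(movie, "(unknown)")) for movie, avg in ranked]


# Hypothetical sample data, for illustration only.
sample_ratings = [("1", "10", 4.0), ("2", "10", 5.0), ("1", "20", 3.0), ("3", "30", 5.0)]
sample_genres = {"10": "Comedy|Romance", "20": "Action", "30": "Drama"}

for movie, avg, genres in top_rated_movies(sample_ratings, sample_genres):
    print("%s\t%.2f\t%s" % (movie, avg, genres))
```

The MapReduce versions that follow distribute the same kind of sum-and-count bookkeeping across processes instead of keeping it in one dictionary.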

In [5]:
%%writefile meanGenreMapper.py
#!/usr/bin/env python

import sys
import csv
import json

# for nonHDFS run
# movieFile = "./movielens/movies.csv"

# for HDFS run
movieFile = "./movies.csv"

movieList = {}
genreList = {}

with open(movieFile, mode = 'r') as infile:
    reader = csv.reader(infile)
    for row in reader:
        movieList[row[0]] = {}
        movieList[row[0]]["title"] = row[1]
        movieList[row[0]]["genre"] = row[2]

for oneMovie in sys.stdin:
    oneMovie = oneMovie.strip()
    ratingInfo = oneMovie.split(",")
    try:
        genres = movieList[ratingInfo[1]]["genre"]
        rating = float(ratingInfo[2])
        for genre in genres.split("|"):
            if genre in genreList:
                genreList[genre]["total_rating"] += rating
                genreList[genre]["total_count"] += 1
            else:
                genreList[genre] = {}
                genreList[genre]["total_rating"] = rating
                genreList[genre]["total_count"] = 1
    except (KeyError, IndexError, ValueError):
        # Skip the header row, malformed lines, and ratings for unknown movie IDs.
        continue
        
for genre in genreList:
    print ("%s\t%s" % (genre, json.dumps(genreList[genre])))

Overwriting meanGenreMapper.py


In [6]:
!hdfs dfs -cat /repository/movielens/ratings.csv \
    2>/dev/null | head -n 5 | python meanGenreMapper.py | sort

Action	{"total_count": 1, "total_rating": 1.0}
Comedy	{"total_count": 2, "total_rating": 6.0}
Crime	{"total_count": 1, "total_rating": 5.0}
Drama	{"total_count": 1, "total_rating": 5.0}
Romance	{"total_count": 2, "total_rating": 6.0}
Sci-Fi	{"total_count": 1, "total_rating": 1.0}
Thriller	{"total_count": 1, "total_rating": 1.0}


In [7]:
%%writefile meanGenreReducer.py
#!/usr/bin/env python
import sys
import json

for line in sys.stdin:
    line = line.strip()
    genre, ratingString = line.split("\t", 1)
    ratingInfo = json.loads(ratingString)
    
    rating_sum = ratingInfo["total_rating"]
    rating_count = ratingInfo["total_count"]

    rating_average = rating_sum / rating_count
    print ("%s\t%s" % (genre, rating_average))

Overwriting meanGenreReducer.py


In [8]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
    | head -n 1000 \
    | python meanGenreMapper.py \
    | sort \
    | python meanGenreReducer.py

Action	3.611111111111111
Adventure	3.6334745762711864
Animation	3.8511904761904763
Children	3.6458333333333335
Comedy	3.607769423558897
Crime	3.7708333333333335
Documentary	3.3333333333333335
Drama	3.759023354564756
Fantasy	3.6313559322033897
Film-Noir	4.1
Horror	3.1714285714285713
IMAX	3.9054054054054053
Musical	3.6475409836065573
Mystery	3.6339285714285716
Romance	3.7320143884892087
Sci-Fi	3.4507575757575757
Thriller	3.5379464285714284
War	3.8545454545454545
Western	3.5


In [9]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
    | python meanGenreMapper.py \
    | sort \
    | python meanGenreReducer.py

Action	3.4544531514134182
Adventure	3.507091937178361
Animation	3.610498964247507
Children	3.4166406309798836
Comedy	3.4174603547910225
Crime	3.6785019629867692
Documentary	3.722772310125531
Drama	3.6742773734853547
Fantasy	3.5029912314315985
Film-Noir	3.9408055354000218
Horror	3.275260214307868
IMAX	3.637097658372336
Musical	3.5439135923140777
Mystery	3.6615097685590157
(no genres listed)	3.208014943114281
Romance	3.5424619995390905
Sci-Fi	3.455172838867968
Thriller	3.5126902934511794
War	3.8032667825565434
Western	3.571619485587279


In [20]:
%%writefile genreMapper.py
#!/usr/bin/env python

import sys
import csv
import json
import statistics

# for nonHDFS run
# movieFile = "./movielens/movies.csv"

# for HDFS run
movieFile = "./movies.csv"

movieList = {}
genreList = {}

with open(movieFile, mode = 'r') as infile:
    reader = csv.reader(infile)
    for row in reader:
        movieList[row[0]] = {}
        movieList[row[0]]["title"] = row[1]
        movieList[row[0]]["genre"] = row[2]

for oneMovie in sys.stdin:
    oneMovie = oneMovie.strip()
    ratingInfo = oneMovie.split(",")
    try:
        genres = movieList[ratingInfo[1]]["genre"]
        rating = float(ratingInfo[2])
        for genre in genres.split("|"):
            if genre not in genreList:
                genreList[genre] = []
            genreList[genre].append(rating)
    except (KeyError, IndexError, ValueError):
        # Skip the header row, malformed lines, and ratings for unknown movie IDs.
        continue

genreInfoDict = {}        
for genre in genreList:
    genreInfoDict[genre] = {}
    genreInfoDict[genre]["mean"] = sum(genreList[genre]) / len(genreList[genre])
    genreInfoDict[genre]["median"] = statistics.median(genreList[genre])
    genreInfoDict[genre]["stdev"] = statistics.stdev(genreList[genre])
    print ("%s\t%s" % (genre, json.dumps(genreInfoDict[genre])))

Overwriting genreMapper.py


In [13]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
    | head -n 1000 \
    | python genreMapper.py \
    | sort \

Action	{"median": 3.5, "stdev": 0.9462024864156431, "mean": 3.611111111111111}
Adventure	{"median": 4.0, "stdev": 0.8740844470801887, "mean": 3.6334745762711864}
Animation	{"median": 4.0, "stdev": 0.7354195667680543, "mean": 3.8511904761904763}
Children	{"median": 4.0, "stdev": 0.838655970905242, "mean": 3.6458333333333335}
Comedy	{"median": 3.5, "stdev": 0.8971841975811686, "mean": 3.607769423558897}
Crime	{"median": 4.0, "stdev": 0.882770194775071, "mean": 3.7708333333333335}
Documentary	{"median": 4.0, "stdev": 2.081665999466133, "mean": 3.3333333333333335}
Drama	{"median": 4.0, "stdev": 0.8805210627841876, "mean": 3.759023354564756}
Fantasy	{"median": 4.0, "stdev": 0.8732197178535702, "mean": 3.6313559322033897}
Film-Noir	{"median": 4.0, "stdev": 0.22360679774997896, "mean": 4.1}
Horror	{"median": 3.5, "stdev": 1.1627987101541413, "mean": 3.1714285714285713}
IMAX	{"median": 4.0, "stdev": 0.7438436013728311, "mean": 3.9054054054054053}
Musical	{"median": 4.0, "stdev": 1.

In [7]:
%%writefile medianGenreReducer.py
#!/usr/bin/env python
import sys
import json

for line in sys.stdin:
    line = line.strip()
    genre, ratingString = line.split("\t", 1)
    ratingInfo = json.loads(ratingString)
    
    rating_median = ratingInfo["median"]
    print ("%s\t%s" % (genre, rating_median))    

Overwriting medianGenreReducer.py


In [8]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
    | head -n 1000 \
    | python genreMapper.py \
    | sort \
    | python medianGenreReducer.py

Action	3.5
Adventure	4.0
Animation	4.0
Children	4.0
Comedy	3.5
Crime	4.0
Documentary	4.0
Drama	4.0
Fantasy	4.0
Film-Noir	4.0
Horror	3.5
IMAX	4.0
Musical	4.0
Mystery	3.5
Romance	4.0
Sci-Fi	3.5
Thriller	3.5
War	4.0
Western	3.5


In [9]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
    | python genreMapper.py \
    | sort \
    | python medianGenreReducer.py

Action	3.5
Adventure	3.5
Animation	4.0
Children	3.5
Comedy	3.5
Crime	4.0
Documentary	4.0
Drama	4.0
Fantasy	3.5
Film-Noir	4.0
Horror	3.5
IMAX	4.0
Musical	4.0
Mystery	4.0
(no genres listed)	3.5
Romance	4.0
Sci-Fi	3.5
Thriller	3.5
War	4.0
Western	4.0


In [10]:
%%writefile stdevGenreReducer.py
#!/usr/bin/env python
import sys
import json

for line in sys.stdin:
    line = line.strip()
    genre, ratingString = line.split("\t", 1)
    ratingInfo = json.loads(ratingString)
    
    rating_stdev = ratingInfo["stdev"]
    print ("%s\t%s" % (genre, rating_stdev))    

Overwriting stdevGenreReducer.py


In [11]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
    | head -n 1000 \
    | python genreMapper.py \
    | sort \
    | python stdevGenreReducer.py

Action	0.9462024864156431
Adventure	0.8740844470801887
Animation	0.7354195667680543
Children	0.838655970905242
Comedy	0.8971841975811686
Crime	0.882770194775071
Documentary	2.081665999466133
Drama	0.8805210627841876
Fantasy	0.8732197178535702
Film-Noir	0.22360679774997896
Horror	1.1627987101541413
IMAX	0.7438436013728311
Musical	1.0462005157357184
Mystery	0.8338905063756132
Romance	0.8309992939269115
Sci-Fi	1.001639588098427
Thriller	0.8932418293648692
War	0.900990850000438
Western	1.0690449676496976


In [17]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
    | python genreMapper.py \
    | sort \
    | python stdevGenreReducer.py

Action	1.0721365559782599
Adventure	1.0675220659603486
Animation	1.0317178473795843
Children	1.1010527199833091
Comedy	1.0850123560000309
Crime	1.0132314135781613
Documentary	1.0220074103047196
Drama	1.002524350894216
Fantasy	1.0867500247329882
Film-Noir	0.9154781611402477
Horror	1.1520617325923062
IMAX	1.0274731139134385
Musical	1.0627111413546169
Mystery	1.0119406716579022
(no genres listed)	1.2310507794221146
Romance	1.046780167005858
Sci-Fi	1.0916316918987354
Thriller	1.0398685086065347
War	0.9968902171996887
Western	1.0256165325514484


In [22]:
%%writefile genreReducer.py
#!/usr/bin/env python
import sys
import statistics
import json

print("Genre\t\tMean\t\tMedian\t\tStandard deviation")
for line in sys.stdin:
    line = line.strip()
    genre, ratingString = line.split("\t", 1)
    ratingInfo = json.loads(ratingString)
    
    rating_mean = ratingInfo["mean"]
    rating_median = ratingInfo["median"]
    rating_stdev = ratingInfo["stdev"]
    print ("%s\t\t%.4f\t\t%s\t\t%.4f" % (genre, rating_mean, rating_median, rating_stdev))  
    

Overwriting genreReducer.py


In [23]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
    | head -n 1000 \
    | python genreMapper.py \
    | sort \
    | python genreReducer.py

Genre		Mean		Median		Standard deviation
Action		3.6111
Adventure		3.6335
Animation		3.8512
Children		3.6458
Comedy		3.6078
Crime		3.7708
Documentary		3.3333
Drama		3.7590
Fantasy		3.6314
Film-Noir		4.1000
Horror		3.1714
IMAX		3.9054
Musical		3.6475
Mystery		3.6339
Romance		3.7320
Sci-Fi		3.4508
Thriller		3.5379
War		3.8545
Western		3.5000


In [20]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
    | python genreMapper.py \
    | sort \
    | python genreReducer.py

Genre		Mean		Median		Standard deviation
Action		3.4545		3.5		1.0721
Adventure		3.5071		3.5		1.0675
Animation		3.6105		4.0		1.0317
Children		3.4166		3.5		1.1011
Comedy		3.4175		3.5		1.0850
Crime		3.6785		4.0		1.0132
Documentary		3.7228		4.0		1.0220
Drama		3.6743		4.0		1.0025
Fantasy		3.5030		3.5		1.0868
Film-Noir		3.9408		4.0		0.9155
Horror		3.2753		3.5		1.1521
IMAX		3.6371		4.0		1.0275
Musical		3.5439		4.0		1.0627
Mystery		3.6615		4.0		1.0119
(no genres listed)		3.2080		3.5		1.2311
Romance		3.5425		4.0		1.0468
Sci-Fi		3.4552		3.5		1.0916
Thriller		3.5127		3.5		1.0399
War		3.8033		4.0		0.9969
Western		3.5716		4.0		1.0256


In [21]:
%%writefile userMapper.py
#!/usr/bin/env python

import sys
import csv
import json

movieFile = "movies.csv"
movieList = {}

with open(movieFile, mode = 'r') as infile:
    reader = csv.reader(infile)
    for row in reader:
        movieList[row[0]] = {}
        movieList[row[0]]["title"] = row[1]
        movieList[row[0]]["genre"] = row[2].strip().split("|")

userDict = {}
        
for oneMovie in sys.stdin:
    oneMovie = oneMovie.strip()
    ratingInfo = oneMovie.split(",")
    try:
        user = int(ratingInfo[0])
        movieTitle = movieList[ratingInfo[1]]["title"]
        movieGenre = movieList[ratingInfo[1]]["genre"]
        rating = float(ratingInfo[2])
        
        if user in userDict:
            userDict[user]["count"] += 1            
        else:
            userDict[user] = {}
            userDict[user]["count"] = 1
            userDict[user]["genre"] = {}
        for genre in movieGenre:
            if genre not in userDict[user]["genre"]:
                userDict[user]["genre"][genre] = 0
            userDict[user]["genre"][genre] += 1
    except (KeyError, IndexError, ValueError):
        # Skip the header row, malformed lines, and ratings for unknown movie IDs.
        continue
        
for user in userDict:
    print("%s\t%s" % (user, json.dumps(userDict[user])))


Overwriting userMapper.py


In [22]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
    | head -n 100 \
    | python userMapper.py

1	{"count": 7, "genre": {"Crime": 1, "Thriller": 2, "Comedy": 2, "Sci-Fi": 1, "Romance": 2, "Horror": 1, "Action": 1, "Drama": 3}}
2	{"count": 10, "genre": {"Crime": 1, "Thriller": 4, "Comedy": 3, "Adventure": 2, "Romance": 2, "Sci-Fi": 1, "Mystery": 1, "Action": 4, "War": 1, "Drama": 4}}
3	{"count": 62, "genre": {"Crime": 8, "Comedy": 24, "Romance": 17, "Horror": 1, "Action": 16, "Fantasy": 6, "Drama": 27, "Western": 3, "Thriller": 26, "Mystery": 3, "Adventure": 17, "Children": 13, "IMAX": 3, "Sci-Fi": 3, "Animation": 7, "War": 2, "Musical": 8}}
4	{"count": 1, "genre": {"Comedy": 1, "Adventure": 1, "Children": 1, "Musical": 1}}
5	{"count": 19, "genre": {"Crime": 5, "Comedy": 8, "Romance": 2, "Action": 7, "Fantasy": 1, "Drama": 7, "Thriller": 7, "Mystery": 2, "Adventure": 8, "Film-Noir": 1, "Sci-Fi": 5, "War": 2, "Musical": 1}}


In [23]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
    | head -n 100 \
    | python userMapper.py \
    | sort

1	{"genre": {"Horror": 1, "Action": 1, "Drama": 3, "Comedy": 2, "Sci-Fi": 1, "Romance": 2, "Crime": 1, "Thriller": 2}, "count": 7}
2	{"genre": {"Mystery": 1, "Action": 4, "Drama": 4, "Sci-Fi": 1, "Romance": 2, "Comedy": 3, "Crime": 1, "Adventure": 2, "War": 1, "Thriller": 4}, "count": 10}
3	{"genre": {"Mystery": 3, "Children": 13, "Drama": 27, "Musical": 8, "Animation": 7, "Crime": 8, "Adventure": 17, "IMAX": 3, "Thriller": 26, "War": 2, "Sci-Fi": 3, "Comedy": 24, "Fantasy": 6, "Romance": 17, "Western": 3, "Horror": 1, "Action": 16}, "count": 62}
4	{"genre": {"Children": 1, "Comedy": 1, "Adventure": 1, "Musical": 1}, "count": 1}
5	{"genre": {"Mystery": 2, "Action": 7, "Drama": 7, "Film-Noir": 1, "Crime": 5, "Adventure": 8, "War": 2, "Fantasy": 1, "Musical": 1, "Sci-Fi": 5, "Thriller": 7, "Romance": 2, "Comedy": 8}, "count": 19}


In [24]:
%%writefile userReducer.py
#!/usr/bin/env python

import sys
import csv
import json

mostRatingUser = None
mostRatingCount = 0
genreDict = None

for line in sys.stdin:
    line = line.strip()
    user, genreString = line.split("\t", 1)
    genreInfo = json.loads(genreString)

    if not mostRatingUser or genreInfo["count"] > mostRatingCount:
        mostRatingUser = user
        mostRatingCount = genreInfo["count"] 
        genreDict = genreInfo["genre"]
        
# print(genreDict)

mostRatedCount = 0
mostRatedGenre = None

for genre, count in genreDict.items():
    if count > mostRatedCount:
        mostRatedCount = count
        mostRatedGenre = genre
    
print("%s -- Total Rating Counts: %d -- Most Rated Genre: %s - %d" % (mostRatingUser, mostRatingCount, mostRatedGenre, mostRatedCount))

Overwriting userReducer.py


In [25]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
    | head -n 1000 \
    | python userMapper.py \
    | sort \
    | python userReducer.py

18 -- Total Rating Counts: 252 -- Most Rated Genre: Drama - 145


In [26]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
    | python userMapper.py \
    | sort \
    | python userReducer.py

186590 -- Total Rating Counts: 13250 -- Most Rated Genre: Drama - 8026


#### Full execution on HDFS

In [32]:
!hdfs dfs -rm -R ./output-meanGenre
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input /repository/movielens/ratings.csv \
    -output ./output-meanGenre \
    -file ./meanGenreMapper.py \
    -mapper meanGenreMapper.py \
    -file ./meanGenreReducer.py \
    -reducer meanGenreReducer.py \
    -file ./movies.csv

rm: `./output-mean': No such file or directory
19/04/08 15:59:12 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [./meanGenreMapper.py, ./meanGenreReducer.py, ./movies.csv] [/usr/hdp/2.6.5.0-292/hadoop-mapreduce/hadoop-streaming-2.7.3.2.6.5.0-292.jar] /hadoop_java_io_tmpdir/streamjob2141192298957538711.jar tmpDir=null
19/04/08 15:59:13 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
19/04/08 15:59:13 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
19/04/08 15:59:13 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 22171 for yuzhey on ha-hdfs:dsci
19/04/08 15:59:13 INFO security.TokenCache: Got dt for hdfs://dsci; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:dsci, Ident: (HDFS_DELEGATION_TOKEN token 22171 for yuzhey)
19/04/08 15:59:13 INFO lzo.GPLNativeCodeLoader: Loaded native gpl lib

In [33]:
!hdfs dfs -cat ./output-meanGenre/part-00000

(no genres listed)	3.33320433437
(no genres listed)	3.24496056091
(no genres listed)	3.13632514818
(no genres listed)	3.24122807018
(no genres listed)	3.06960352423
Action	3.44444281044
Action	3.45894409618
Action	3.45845824548
Action	3.45852368671
Action	3.45127443462
Adventure	3.49949764004
Adventure	3.51170577777
Adventure	3.50045914479
Adventure	3.51365006298
Adventure	3.50983028992
Animation	3.61306183751
Animation	3.60252051056
Animation	3.59798601506
Animation	3.61029521137
Animation	3.62827657548
Children	3.43086201442
Children	3.40942305911
Children	3.41414288185
Children	3.4076072136
Children	3.42058416433
Comedy	3.42433055487
Comedy	3.4145183972
Comedy	3.4240967791
Comedy	3.41294742788
Comedy	3.41103262748
Crime	3.67117984465
Crime	3.68252001502
Crime	3.68327146944
Crime	3.68051272167
Crime	3.67464524153
Documentary	3.70401121754
Documentary	3.72438481388
Documentary	3.7307135799
Documentary	3.73771762654
Documentary	3.71807109665
Drama	3.67062916815
Drama	3.67299605853
Dram
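
The repeated genre rows above suggest that each map task emitted its own partial totals and that `meanGenreReducer.py` printed one average per input line rather than one per genre. A reducer can instead add up the partial totals for a key before dividing. The sketch below is one way to do this, not necessarily the intended fix for this lesson; the function name `reduce_partial_totals` and the sample lines are hypothetical. It relies on lines arriving grouped by key, which the sort phase provides to a streaming reducer.

```python
import json
from itertools import groupby


def reduce_partial_totals(lines):
    """Yield (genre, average) once per genre from key-sorted 'genre<TAB>json' lines."""
    parsed = (line.rstrip("\n").split("\t", 1) for line in lines if line.strip())
    for genre, group in groupby(parsed, key=lambda pair: pair[0]):
        rating_sum = 0.0
        rating_count = 0
        for _genre, payload in group:
            info = json.loads(payload)
            rating_sum += info["total_rating"]
            rating_count += info["total_count"]
        yield genre, rating_sum / rating_count


# Made-up mapper output from two map tasks, already sorted by key.
sample_lines = [
    'Action\t{"total_count": 2, "total_rating": 7.0}\n',
    'Action\t{"total_count": 2, "total_rating": 9.0}\n',
    'Drama\t{"total_count": 1, "total_rating": 3.5}\n',
]

for genre, average in reduce_partial_totals(sample_lines):
    print("%s\t%s" % (genre, average))
```

In a streaming job the same function could be fed `sys.stdin` in place of `sample_lines`. Summing totals and counts first matters because an average of per-task averages would weight small and large splits equally.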

In [24]:
!hdfs dfs -rm -R ./output-genre
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input /repository/movielens/ratings.csv \
    -output ./output-genre \
    -file ./genreMapper.py \
    -mapper genreMapper.py \
    -file ./genreReducer.py \
    -reducer genreReducer.py \
    -file ./movies.csv

19/04/09 13:31:53 INFO fs.TrashPolicyDefault: Moved: 'hdfs://dsci/user/yuzhey/output-genre' to trash at: hdfs://dsci/user/yuzhey/.Trash/Current/user/yuzhey/output-genre1554831113258
19/04/09 13:31:54 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [./genreMapper.py, ./genreReducer.py, ./movies.csv] [/usr/hdp/2.6.5.0-292/hadoop-mapreduce/hadoop-streaming-2.7.3.2.6.5.0-292.jar] /hadoop_java_io_tmpdir/streamjob934711794381139792.jar tmpDir=null
19/04/09 13:31:55 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
19/04/09 13:31:55 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
19/04/09 13:31:55 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 22226 for yuzhey on ha-hdfs:dsci
19/04/09 13:31:55 INFO security.TokenCache: Got dt for hdfs://dsci; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:dsc

19/04/09 13:32:18 INFO mapreduce.Job: Task Id : attempt_1553277363335_0398_m_000000_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
	at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
	at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
	at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
	at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
	at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:170)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
	at org.apache.hadoop.mapred.YarnCh

19/04/09 13:32:27 INFO mapreduce.Job: Counters: 18
	Job Counters 
		Failed map tasks=16
		Killed map tasks=4
		Killed reduce tasks=1
		Launched map tasks=20
		Other local map tasks=15
		Data-local map tasks=1
		Rack-local map tasks=4
		Total time spent by all maps in occupied slots (ms)=238470
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=79490
		Total time spent by all reduce tasks (ms)=0
		Total vcore-milliseconds taken by all map tasks=79490
		Total vcore-milliseconds taken by all reduce tasks=0
		Total megabyte-milliseconds taken by all map tasks=1024785080
		Total megabyte-milliseconds taken by all reduce tasks=0
	Map-Reduce Framework
		CPU time spent (ms)=0
		Physical memory (bytes) snapshot=0
		Virtual memory (bytes) snapshot=0
19/04/09 13:32:27 ERROR streaming.StreamJob: Job not successful!
Streaming Command Failed!
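
The stack trace above only reports that the mapper subprocess exited with code 1; the Python traceback itself is in the task logs, which can usually be retrieved with `yarn logs -applicationId <application id>`. One plausible cause, an assumption rather than a finding from this run, is that the `statistics` module was added in Python 3.4 and is unavailable if `python` on the cluster nodes is Python 2. Separately, a mean, median, or standard deviation computed inside each map task cannot be combined correctly by a reducer. One way around both is for each mapper to emit a per-genre histogram of rating values, which a reducer can merge exactly using only basic arithmetic. The sketch below assumes ratings take a small set of discrete values; the function names and sample data are hypothetical.

```python
import math
from collections import Counter


def merge_histograms(histograms):
    """Add up rating -> count histograms produced by different map tasks."""
    merged = Counter()
    for histogram in histograms:
        merged.update(histogram)
    return merged


def summarize(histogram):
    """Return (mean, median, sample stdev) for a rating -> count histogram."""
    n = sum(histogram.values())
    mean = sum(rating * count for rating, count in histogram.items()) / n

    # Median: walk the sorted rating values until the middle position(s) are covered.
    lower_pos, upper_pos = (n - 1) // 2, n // 2
    lower = upper = None
    seen = 0
    for rating in sorted(histogram):
        seen += histogram[rating]
        if lower is None and seen > lower_pos:
            lower = rating
        if seen > upper_pos:
            upper = rating
            break
    median = (lower + upper) / 2.0

    # Sample standard deviation (n - 1 denominator), as statistics.stdev uses.
    squared = sum(count * (rating - mean) ** 2 for rating, count in histogram.items())
    stdev = math.sqrt(squared / (n - 1)) if n > 1 else float("nan")
    return mean, median, stdev


# Made-up partial histograms for one genre from two map tasks.
task_a = {3.0: 1, 4.0: 2}
task_b = {4.0: 1, 5.0: 1}

mean, median, stdev = summarize(merge_histograms([task_a, task_b]))
print("%.4f\t%s\t%.4f" % (mean, median, stdev))
```

If the histograms are passed between mapper and reducer as JSON, the float keys are serialized as strings and would need converting back with `float()` before merging.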


In [29]:
!hdfs dfs -rm -R ./output-user
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input /repository/movielens/ratings.csv \
    -output ./output-user \
    -file ./userMapper.py \
    -mapper userMapper.py \
    -file ./userReducer.py \
    -reducer userReducer.py \
    -file ./movies.csv

19/04/08 15:54:59 INFO fs.TrashPolicyDefault: Moved: 'hdfs://dsci/user/yuzhey/output-user' to trash at: hdfs://dsci/user/yuzhey/.Trash/Current/user/yuzhey/output-user
19/04/08 15:55:00 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [./userMapper.py, ./userReducer.py, ./movies.csv] [/usr/hdp/2.6.5.0-292/hadoop-mapreduce/hadoop-streaming-2.7.3.2.6.5.0-292.jar] /hadoop_java_io_tmpdir/streamjob2279689388552816955.jar tmpDir=null
19/04/08 15:55:01 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
19/04/08 15:55:01 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
19/04/08 15:55:01 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 22165 for yuzhey on ha-hdfs:dsci
19/04/08 15:55:01 INFO security.TokenCache: Got dt for hdfs://dsci; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:dsci, Ident: (HDFS_

In [30]:
!hdfs dfs -cat ./output-user/part-00000

186590 -- Total Rating Counts: 13250 -- Most Rated Genre: Drama - 8026	
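
The cluster result matches the local run here, but `userReducer.py` compares each input line on its own. If one user's ratings were split across two map tasks, that user would arrive as two lines with partial counts, and neither line alone would reflect the true total. A hedged sketch of merging such partial records before picking the maximum follows; `most_active_user` and the sample lines are hypothetical, and the approach assumes a single reducer sees every user and can hold the per-user counts in memory.

```python
import json
from collections import Counter, defaultdict


def most_active_user(lines):
    """Merge partial per-user records; return (user, rating count, top genre, genre count)."""
    counts = Counter()             # user -> total number of ratings
    genres = defaultdict(Counter)  # user -> genre -> number of ratings
    for line in lines:
        if not line.strip():
            continue
        user, payload = line.rstrip("\n").split("\t", 1)
        info = json.loads(payload)
        counts[user] += info["count"]
        genres[user].update(info["genre"])

    user, total = counts.most_common(1)[0]
    top_genre, top_count = genres[user].most_common(1)[0]
    return user, total, top_genre, top_count


# Made-up mapper output in which user 7 was split across two map tasks.
sample_lines = [
    '7\t{"count": 3, "genre": {"Drama": 2, "Comedy": 1}}\n',
    '8\t{"count": 4, "genre": {"Action": 4}}\n',
    '7\t{"count": 2, "genre": {"Drama": 1, "Comedy": 1}}\n',
]

print("%s -- Total Rating Counts: %d -- Most Rated Genre: %s - %d"
      % most_active_user(sample_lines))
```

In this made-up input, comparing lines one at a time would pick user 8 with 4 ratings, while merging first gives user 7 a total of 5.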
