### Verify Spark Context object
Check whether spark context is available. If there is an exception, you have to troubleshoot. Otherwise, good to go ahead.

### Exercise 1:

1. Create a directory in HDFS (/user/cloudera/movielens/) called movielens and load movies.csv and ratings.csv from ~/Downloads/datasets/movie-lens directory.  
2. Load movies.csv into in moviesRdd
3. Load ratings.csv into ratingsRdd
4. Find out 10 top movies based on the highest average rating. Consider only those movies that have got at least 100 ratings.


In [9]:
movies = sc.textFile("/user/cloudera/movielens/movies.csv")
movies.count()

total 3424
-rw-r--r-- 1 cloudera cloudera  207997 Jul  2 20:49 links.csv
-rw-r--r-- 1 cloudera cloudera  515700 Jul  2 20:49 movies.csv
-rw-r--r-- 1 cloudera cloudera 2580392 Jul  2 20:49 ratings.csv
-rw-r--r-- 1 cloudera cloudera  199073 Jul  2 20:49 tags.csv


In [11]:
for r in movies.take(10):
    print(r)

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action


In [12]:
moviesData = movies.filter(lambda line: not line.startswith("movieId"))
for r in moviesData.take(10):
    print(r)

1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller


In [13]:
ratings = sc.textFile("/user/cloudera/movielens/ratings")
ratings.count()

105340

In [14]:
for r in ratings.take(10):
    print(r)

userId,movieId,rating,timestamp
1,16,4.0,1217897793
1,24,1.5,1217895807
1,32,4.0,1217896246
1,47,4.0,1217896556
1,50,4.0,1217896523
1,110,4.0,1217896150
1,150,3.0,1217895940
1,161,4.0,1217897864
1,165,3.0,1217897135


In [15]:
ratingsData = ratings.filter(lambda line: not line.startswith("userId"))
for r in ratingsData.take(10):
    print(r)

1,16,4.0,1217897793
1,24,1.5,1217895807
1,32,4.0,1217896246
1,47,4.0,1217896556
1,50,4.0,1217896523
1,110,4.0,1217896150
1,150,3.0,1217895940
1,161,4.0,1217897864
1,165,3.0,1217897135
1,204,0.5,1217895786


In [16]:
movies.first()

'movieId,title,genres'

In [17]:
ratings.first()

'userId,movieId,rating,timestamp'

In [18]:
movies_by_movieid = moviesData\
.map(lambda line: line.split(","))\
.map(lambda tokens: (int(tokens[0]), tokens[1]))

movies_by_movieid.first()

(1, 'Toy Story (1995)')

In [19]:
ratings_by_movieid = ratingsData\
.map(lambda line: line.split(","))\
.map(lambda tokens: (int(tokens[1]), float(tokens[2])))

ratings_by_movieid.first()

(16, 4.0)

In [20]:
top_10 = movies_by_movieid.join(ratings_by_movieid)\
.map(lambda p: ((p[0], p[1][0]), p[1][1]))\
.groupByKey().mapValues(lambda values: (sum(values)/len(values), len(values)))\
.filter(lambda p: p[1][1] >= 100)\
.sortBy(lambda p: p[1], False)\
.take(10)

for m in top_10:
    print(m)

((318, '"Shawshank Redemption'), (4.454545454545454, 308))
((858, '"Godfather'), (4.392857142857143, 210))
((50, '"Usual Suspects'), (4.328947368421052, 228))
((1136, 'Monty Python and the Holy Grail (1975)'), (4.3019480519480515, 154))
((527, "Schindler's List (1993)"), (4.296370967741935, 248))
((1193, "One Flew Over the Cuckoo's Nest (1975)"), (4.2727272727272725, 143))
((608, 'Fargo (1996)'), (4.2711442786069655, 201))
((2571, '"Matrix'), (4.264367816091954, 261))
((1221, '"Godfather: Part II'), (4.260714285714286, 140))
((1213, 'Goodfellas (1990)'), (4.2592592592592595, 135))


In [21]:
movies_by_movieid.join(ratings_by_movieid)\
.first()


(4096, ('"Curse', 4.0))

In [22]:
movies_by_movieid.join(ratings_by_movieid)\
.map(lambda p: ((p[0], p[1][0]), p[1][1]))\
.first()


((4096, '"Curse'), 4.0)

In [23]:
top_10 = movies_by_movieid.join(ratings_by_movieid)\
.map(lambda p: ((p[0], p[1][0]), p[1][1]))\
.groupByKey().mapValues(lambda values: (sum(values)/len(values), len(values)))\
.filter(lambda p: p[1][1] >= 100)

top_10.first()

((1036, 'Die Hard (1988)'), (3.918181818181818, 165))

In [24]:
top_10 = movies_by_movieid.join(ratings_by_movieid)\
.map(lambda p: ((p[0], p[1][0]), p[1][1]))\
.groupByKey().mapValues(lambda values: (sum(values)/len(values), len(values)))\
.filter(lambda p: p[1][1] >= 100)\
.sortBy(lambda p: p[1], False)\

for m in top_10.take(10):
    print(m)

((318, '"Shawshank Redemption'), (4.454545454545454, 308))
((858, '"Godfather'), (4.392857142857143, 210))
((50, '"Usual Suspects'), (4.328947368421052, 228))
((1136, 'Monty Python and the Holy Grail (1975)'), (4.3019480519480515, 154))
((527, "Schindler's List (1993)"), (4.296370967741935, 248))
((1193, "One Flew Over the Cuckoo's Nest (1975)"), (4.2727272727272725, 143))
((608, 'Fargo (1996)'), (4.2711442786069655, 201))
((2571, '"Matrix'), (4.264367816091954, 261))
((1221, '"Godfather: Part II'), (4.260714285714286, 140))
((1213, 'Goodfellas (1990)'), (4.2592592592592595, 135))
