### Demo Spark 1

In this demo, let's find all the movies with the lowest average rating. 

In the Data section of Databricks, upload the two files, u.data and u.item, if you have not done so already.
Note the path where the files have been saved in DBFS (e.g. "dbfs:/FileStore/tables/q52fkb8t1508007409762/u.item" or "dbfs:/FileStore/tables/u.item") and update the code below.

Refer to the Introduction to Databricks notebook for more details about the Spark functions used in this notebook.

In [2]:
# List the files in DBFS to find the paths of u.item and u.data

display(dbutils.fs.ls("dbfs:/FileStore/tables/"))

In [3]:
# Load u.item data
rawItem = sc.textFile("<FILL IN WITH U.ITEM PATH IN DBFS>/u.item")

# Load u.data data
rawData = sc.textFile("<FILL IN WITH U.DATA PATH IN DBFS>/u.data")


In [4]:
# rawItem records are populated like this:
# movie_id|title|release_date|video_release_date|imdb_link| ... followed by genre flags we won't need
# 1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
# The "|" means the data are pipe (|) delimited
rawItem.take(5)

In [5]:
# rawData records are populated like this:
# user_id \t movie_id \t rating \t timestamp
# 196\t242\t3\t881250949
# The "\t" means the data are tab-delimited
rawData.take(5)

In [6]:
# Map rawItem data to (movie_id, title)
movieList = rawItem.map(lambda line: line.split("|")).map(lambda x: (int(x[0]), x[1]))

# Map our rawData to (movie_id, (rating, 1.0))
movieRatings = rawData.map(lambda line: line.split("\t")).map(lambda x: (int(x[1]), (int(x[2]), 1.0)))
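
To see what the two mapping steps above do to a single record, here is a minimal plain-Python sketch (no Spark needed). The sample lines are the ones shown in the earlier cells; the helper names `parse_item` and `parse_rating` are hypothetical and only mirror the lambdas used in the notebook.

```python
# Sample lines copied from the record layouts described in the notebook.
item_line = "1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0"
data_line = "196\t242\t3\t881250949"


def parse_item(line):
    """Return (movie_id, title) from a pipe-delimited u.item line."""
    fields = line.split("|")
    return (int(fields[0]), fields[1])


def parse_rating(line):
    """Return (movie_id, (rating, 1.0)) from a tab-delimited u.data line."""
    fields = line.split("\t")
    return (int(fields[1]), (int(fields[2]), 1.0))


print(parse_item(item_line))    # (1, 'Toy Story (1995)')
print(parse_rating(data_line))  # (242, (3, 1.0))
```

Note that the user_id and timestamp fields are dropped because they are not needed to compute an average rating per movie.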

In [7]:
# Our movieList RDD now has the following structure: (movie_id, title)
# take(5) returns its first 5 elements as a list of tuples
movieList.take(5)

In [8]:
# Our movieRatings RDD now has the following structure: (movie_id, (rating, 1.0))
# Each element is a tuple whose second element is itself a tuple: (rating, 1.0)
# The 1.0 is added so that ratings can be counted during the aggregation in the next steps.
movieRatings.take(5)

In [9]:
# Reduce to (movie_id, (sumOfRatings, totalRatings))
# reduceByKey uses the first field of (movie_id, (rating, 1.0)) as the key, movie_id in this case.
# The lambda function passed to reduceByKey takes 2 arguments: movie1 and movie2.
# movie1 and movie2 are the values of records sharing the same key, so the lambda defines how records with the same movie_id are combined.
# Each value is the second part of the tuple (movie_id, (rating, 1.0)) (remember movie_id is the key used by reduceByKey), so movie1 and movie2 have the form (rating, 1.0), or a partial (sum, count) that has already been combined.
# To access rating and 1.0 separately, we use the notation movie1[0] (for rating) and movie1[1] (for 1.0). The same applies to movie2.
# Since we want the sum of all ratings and the total number of ratings, we add movie1[0]+movie2[0] and movie1[1]+movie2[1], which gives the following line of code:
ratingTotalsAndCount = movieRatings.reduceByKey(lambda movie1, movie2 : (movie1[0]+movie2[0], movie1[1]+movie2[1]))


# Map to (movie_id, AverageRating)
# Since we have a tuple of the form (movie_id, (sumOfRatings, totalRatings)), we can get the average rating per movie_id by dividing the sum by the count:
averageRatings = ratingTotalsAndCount.mapValues(lambda totalAndCount: totalAndCount[0] / totalAndCount[1])
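
The reduce-then-average logic above can be traced by hand with a plain-Python sketch. This only imitates what reduceByKey and mapValues compute, not how Spark distributes the work; the toy ratings and the `combine` helper are hypothetical.

```python
from collections import defaultdict
from functools import reduce

# Hypothetical toy records in the (movie_id, (rating, 1.0)) shape.
ratings = [(242, (3, 1.0)), (302, (3, 1.0)), (242, (5, 1.0)), (242, (4, 1.0))]


def combine(movie1, movie2):
    """Same combining rule as the reduceByKey lambda: add sums, add counts."""
    return (movie1[0] + movie2[0], movie1[1] + movie2[1])


# Group the values by key, then fold each group with the combining rule.
grouped = defaultdict(list)
for movie_id, value in ratings:
    grouped[movie_id].append(value)

totals = {movie_id: reduce(combine, values) for movie_id, values in grouped.items()}

# Equivalent of mapValues: divide the rating sum by the rating count.
averages = {movie_id: total / count for movie_id, (total, count) in totals.items()}

print(totals)    # {242: (12, 3.0), 302: (3, 1.0)}
print(averages)  # {242: 4.0, 302: 3.0}
```

Because the count is stored as the float 1.0, the division yields a float average regardless of the Python version.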


In [10]:
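# Sort by average rating
# We now have tuples of the form (movie_id, AverageRating). If x is the input of the lambda function, x[0] is the movie_id and x[1] is the AverageRating, so we sort by x[1].
# Note: sortedMovies is not used below. A join does not guarantee that ordering is preserved, so the final sort is done after the join.
sortedMovies = averageRatings.sortBy(lambda x: x[1])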

# Join with movieList
# We can join our averageRatings RDD with the movieList RDD using join.
# When called on RDDs of type (K, V) and (K, W), join returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
# averageRatings has the form (movie_id, AverageRating) and movieList has the form (movie_id, title)
joinRDD = averageRatings.join(movieList)

#joinRDD has the form: (movie_id,(AverageRating, title))
joinRDD.take(5)
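
As a rough illustration of the (K, V) and (K, W) to (K, (V, W)) behaviour described above, here is a plain-Python sketch of an inner join. The averages and the unmatched keys are hypothetical toy data, and `inner_join` is an illustrative helper, not a Spark function.

```python
from collections import defaultdict

# Hypothetical toy data: (movie_id, AverageRating) and (movie_id, title).
average_ratings = [(1, 3.9), (2, 3.2), (99, 1.0)]
movie_list = [(1, "Toy Story (1995)"), (2, "GoldenEye (1995)"), (3, "Four Rooms (1995)")]


def inner_join(left, right):
    """Pair every left value with every right value that shares its key."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [(key, (v, w)) for key, v in left for w in right_by_key.get(key, [])]


joined = inner_join(average_ratings, movie_list)
print(joined)  # [(1, (3.9, 'Toy Story (1995)')), (2, (3.2, 'GoldenEye (1995)'))]
```

Keys present on only one side (99 and 3 here) are dropped, which is the inner-join behaviour.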


In [11]:
# Keep title and average rating only
# joinRDD has the form (movie_id, (AverageRating, title)). If x is the input of the lambda function, x[1] returns the tuple (AverageRating, title).
listSorted = joinRDD.map(lambda x: x[1])

# Sort by average rating
# listSorted has the form (AverageRating, title), so we sort by AverageRating (x[0]). sortBy is ascending by default, so the lowest-rated movies come first.
results = listSorted.sortBy(lambda x: x[0])
results.collect()
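
Once the sorted list has been collected, the movies that share the lowest average can be picked out on the driver. The following is a minimal plain-Python sketch on hypothetical (AverageRating, title) tuples; the values and the "Hypothetical Movie" titles are made up for illustration.

```python
# Hypothetical (AverageRating, title) tuples, in the shape of listSorted.
list_sorted = [
    (3.9, "Toy Story (1995)"),
    (1.0, "Hypothetical Movie A"),
    (3.2, "GoldenEye (1995)"),
    (1.0, "Hypothetical Movie B"),
]

# Ascending sort on the average rating, mirroring sortBy(lambda x: x[0]).
results = sorted(list_sorted, key=lambda x: x[0])

# The first element holds the minimum; keep every title tied with it.
lowest = results[0][0]
lowest_rated = [title for rating, title in results if rating == lowest]

print(lowest)        # 1.0
print(lowest_rated)  # ['Hypothetical Movie A', 'Hypothetical Movie B']
```

Filtering on the minimum rather than taking only the first row keeps all movies that are tied for the lowest average.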
