In [1]:
import sys
import os
import urllib
from pyspark import SparkContext


In [2]:
sc = SparkContext()

# download the ratings.dat and movies.dat files from the mentioned internet site link in the comment and place these in the same folder as this ipynb code file resides. 


https://raw.githubusercontent.com/databricks/spark-training/master/data/movielens/medium/ratings.dat


https://raw.githubusercontent.com/databricks/spark-training/master/data/movielens/medium/movies.dat



In [3]:
numPartitions = 2


rawRatings = sc.textFile("ratings.dat").repartition(numPartitions)



rawMovies = sc.textFile("movies.dat")


In [4]:

def get_ratings_tuple(entry):
    """ Parse a line in the ratings dataset
    Args:
        entry (str): a line in the ratings dataset in the form of UserID::MovieID::Rating::Timestamp
    Returns:
        tuple: (UserID, MovieID, Rating)
    """
    items = entry.split('::')
    return int(items[0]), int(items[1]), float(items[2])

In [5]:
def get_movie_tuple(entry):
    """ Parse a line in the movies dataset
    Args:
        entry (str): a line in the movies dataset in the form of MovieID::Title::Genres
    Returns:
        tuple: (MovieID, Title)
    """
    items = entry.split('::')
    return int(items[0]), items[1]

In [6]:
ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
moviesRDD = rawMovies.map(get_movie_tuple).cache()

In [7]:
ratingsCount = ratingsRDD.count()
moviesCount = moviesRDD.count()

In [8]:
print("There are {} ratings and {} movies in the datasets".format(ratingsCount, moviesCount))
print("Ratings: {}".format(ratingsRDD.take(3)))
print("Movies: {}".format(moviesRDD.take(3)))

There are 1000209 ratings and 3883 movies in the datasets
Ratings: [(1, 1193, 5.0), (1, 661, 3.0), (1, 914, 3.0)]
Movies: [(1, 'Toy Story (1995)'), (2, 'Jumanji (1995)'), (3, 'Grumpier Old Men (1995)')]


In [9]:
# First, implement a helper function `getCountsAndAverages` using only Python
def getCountsAndAverages(IDandRatingsTuple):
    """ Calculate average rating
    Args:
        IDandRatingsTuple: a single tuple of (MovieID, (Rating1, Rating2, Rating3, ...))
    Returns:
        tuple: a tuple of (MovieID, (number of ratings, averageRating))
    """
    result = (IDandRatingsTuple[0], (len(IDandRatingsTuple[1]), float(sum(IDandRatingsTuple[1])) / len(IDandRatingsTuple[1])))
    return result

In [10]:
#(Movies with Highest Average Ratings)
# From ratingsRDD with tuples of (UserID, MovieID, Rating) create an RDD with tuples of
# the (MovieID, iterable of Ratings for that MovieID)
movieIDsWithRatingsRDD = (ratingsRDD.map(lambda rating: (rating[1], rating[2])).groupByKey())
print("movieIDsWithRatingsRDD: {}\n".format(movieIDsWithRatingsRDD.take(3)))

movieIDsWithRatingsRDD: [(914, <pyspark.resultiterable.ResultIterable object at 0x0000003F8D003E80>), (3408, <pyspark.resultiterable.ResultIterable object at 0x0000003F8D003EB8>), (2804, <pyspark.resultiterable.ResultIterable object at 0x0000003F8D003F60>)]



In [11]:
# Using `movieIDsWithRatingsRDD`, compute the number of ratings and average rating for each movie to
# yield tuples of the form (MovieID, (number of ratings, average rating))
movieIDsWithAvgRatingsRDD = movieIDsWithRatingsRDD.map(getCountsAndAverages)
print("movieIDsWithAvgRatingsRDD: {}\n".format(movieIDsWithAvgRatingsRDD.take(3)))

movieIDsWithAvgRatingsRDD: [(914, (636, 4.154088050314465)), (3408, (1315, 3.863878326996198)), (2804, (1352, 4.238905325443787))]



In [12]:
# To `movieIDsWithAvgRatingsRDD`, apply RDD transformations that use `moviesRDD` to get the movie
# names for `movieIDsWithAvgRatingsRDD`, yielding tuples of the form
# (average rating, movie name, number of ratings)
movieNameWithAvgRatingsRDD = (moviesRDD.join(movieIDsWithAvgRatingsRDD).map(lambda movie: (movie[1][1][1], movie[1][0], movie[1][1][0])))
print("movieNameWithAvgRatingsRDD: {}\n".format(movieNameWithAvgRatingsRDD.take(3)))

movieNameWithAvgRatingsRDD: [(2.7294117647058824, 'Waiting to Exhale (1995)', 170), (3.014705882352941, 'Tom and Huck (1995)', 68), (2.3625, 'Dracula: Dead and Loving It (1995)', 160)]



In [13]:
# Movies with Highest Average Ratings and more than 500 reviews
# Apply an RDD transformation to `movieNameWithAvgRatingsRDD` to limit the results to movies with
# ratings from more than 500 people. We then use the `sortFunction()` helper function to sort by the
# average rating to get the movies in order of their rating (highest rating first)


rdd3 = movieNameWithAvgRatingsRDD.filter(lambda movie: movie[2] > 500)
rdd3_mapped = rdd3.map(lambda x: (x,1))
rdd3_grouped = rdd3_mapped.groupByKey()
movieSortedByRating = rdd3_grouped.mapValues(lambda x: x).sortByKey(ascending=False)

print("Movies with highest ratings: {} ".format(movieSortedByRating.take(20)))


Movies with highest ratings: [((4.560509554140127, 'Seven Samurai (The Magnificent Seven) (Shichinin no samurai) (1954)', 628), <pyspark.resultiterable.ResultIterable object at 0x0000003F8C9BF828>), ((4.554557700942973, 'Shawshank Redemption, The (1994)', 2227), <pyspark.resultiterable.ResultIterable object at 0x0000003F8C9BFAC8>), ((4.524966261808367, 'Godfather, The (1972)', 2223), <pyspark.resultiterable.ResultIterable object at 0x0000003F889499E8>), ((4.52054794520548, 'Close Shave, A (1995)', 657), <pyspark.resultiterable.ResultIterable object at 0x0000003F8C9BF5C0>), ((4.517106001121705, 'Usual Suspects, The (1995)', 1783), <pyspark.resultiterable.ResultIterable object at 0x0000003F8C948B00>), ((4.510416666666667, "Schindler's List (1993)", 2304), <pyspark.resultiterable.ResultIterable object at 0x0000003F8C948908>), ((4.507936507936508, 'Wrong Trousers, The (1993)', 882), <pyspark.resultiterable.ResultIterable object at 0x0000003F8D018048>), ((4.477724741447892, 'Raiders of the 