In [23]:
# Set up AWS S3 access credentials
AWS_BUCKET_NAME = "sam4651-movie-data"

In [2]:
# Convert csv file to Spark DataFrame
def loadDataFrame(fileName, fileSchema):
  return (spark.read.format("csv")
                    .schema(fileSchema)
                    .option("header", "true")
                    .option("mode", "DROPMALFORMED")
                    .csv("s3a://%s:%s@%s/%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME, fileName)))

In [24]:
# For AWS EMR
# Convert csv file to spark data frame
# INPUT: 
# fileName: the full file name(e.g. "file.csv"), 
# fileSchema: the schema (StructType Array with StructField)
# OUTPUT:
# Spark DataFrame
def loadDataFrame(fileName, fileSchema):
    return (spark.read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
                    .option("header", "true")
                    .option("mode", "DROPMALFORMED")
                    .schema(fileSchema)
                    .load("s3://%s/%s" % (AWS_BUCKET_NAME, fileName)))

In [25]:
from pyspark.sql.types import *

movieRatingSchema = StructType([
    StructField("userId", IntegerType(), True),
    StructField("movieId", IntegerType(), True),
    StructField("rating", FloatType(), True),
    StructField("timestamp", StringType(), True)])

movieSchema = StructType([
    StructField("movieId", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("genres", StringType(), True)])

smallMovieRatingsDF = loadDataFrame("ratings-small.csv", movieRatingSchema).cache()
smallMoviesDF = loadDataFrame("movies-small.csv", movieSchema).cache()

In [26]:
# Print out the DataFrame shcema, and a few lines as example
smallMovieRatingsDF.printSchema()
smallMovieRatingsDF.show(5)

smallMoviesDF.printSchema()
smallMoviesDF.show(5)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: string (nullable = true)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
+------+-------+------+----------+
only showing top 5 rows

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Fat

In [27]:
from pyspark.sql.functions import mean, min, max, stddev

# Data summary of the dataset
print "Number of ratings: %s" % (smallMovieRatingsDF.count())
print "Number of distinct users: %s" % (smallMovieRatingsDF.select('userId').distinct().count())
print "Number of distinct movies: %s" % (smallMovieRatingsDF.select('movieId').distinct().count())
smallMovieRatingsDF.select([mean('rating'), min('rating'), max('rating'), stddev('rating')]).show()
smallMovieRatingsDF.groupBy('rating').count().orderBy('rating').show()

Number of ratings: 100004
Number of distinct users: 671
Number of distinct movies: 9066
+-----------------+-----------+-----------+-------------------+
|      avg(rating)|min(rating)|max(rating)|stddev_samp(rating)|
+-----------------+-----------+-----------+-------------------+
|3.543608255669773|        0.5|        5.0| 1.0580641091070326|
+-----------------+-----------+-----------+-------------------+

+------+-----+
|rating|count|
+------+-----+
|   0.5| 1101|
|   1.0| 3326|
|   1.5| 1687|
|   2.0| 7271|
|   2.5| 4449|
|   3.0|20064|
|   3.5|10538|
|   4.0|28750|
|   4.5| 7723|
|   5.0|15095|
+------+-----+



In [28]:
# Partition the dataset into traning, validation and testing for cross-validation
(trainingSet, validationSet, testingSet) = smallMovieRatingsDF.randomSplit([0.6, 0.2, 0.2], seed=12345)
training = trainingSet.cache()
validation = validationSet.cache()
testing = testingSet.cache()

In [29]:
# Use ml instead of mlib for Dataframes
# http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import Row

ranks = [2, 4, 8, 12, 16, 20, 24]
regParams = [0.01, 0.05, 0.1, 0.15, 0.2, 0.3]
minError = float('inf')
bestRank = -1
bestRegParam = -1
bestModel = None

# An RMSE evaluator using the rating and predicted rating columns
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
# Initialize the ASL(Alternating Least Squares)
als = ALS(userCol = "userId", itemCol = "movieId", ratingCol = "rating", seed = 123)

for regParam in regParams:
  for rank in ranks:
    # Build the recommendation model using ALS on the training data
    als.setParams(rank = rank, regParam = regParam)
    model = als.fit(training)

    # Evaluate the model by computing the RMSE on the validation data
    predictions = model.transform(validation)
    predictions = predictions.dropna() # drop all NaN prediction value to ensure not to have NaN RMSE (due to SPARK-14489)
    error = evaluator.evaluate(predictions)
    
    if error < minError:
      bestRank = rank
      bestRegParam = regParam
      minError = error
      bestModel = model
    print 'For rank %s, regParams %s, the RMSE is %s' % (rank, regParam, error)

print("Best Rank = %s, Best regParam = %s, with RMSE = %s"  % (bestRank, bestRegParam, minError))

For rank 2, regParams 0.01, the RMSE is 1.03153158849
For rank 4, regParams 0.01, the RMSE is 1.1106769261
For rank 8, regParams 0.01, the RMSE is 1.22156636168
For rank 12, regParams 0.01, the RMSE is 1.30301752091
For rank 16, regParams 0.01, the RMSE is 1.34038879145
For rank 20, regParams 0.01, the RMSE is 1.36867623935
For rank 24, regParams 0.01, the RMSE is 1.3964271066
For rank 2, regParams 0.05, the RMSE is 0.969612344535
For rank 4, regParams 0.05, the RMSE is 0.994690520889
For rank 8, regParams 0.05, the RMSE is 1.03391732058
For rank 12, regParams 0.05, the RMSE is 1.05468292515
For rank 16, regParams 0.05, the RMSE is 1.04826527298
For rank 20, regParams 0.05, the RMSE is 1.04998009767
For rank 24, regParams 0.05, the RMSE is 1.05601495976
For rank 2, regParams 0.1, the RMSE is 0.945134847557
For rank 4, regParams 0.1, the RMSE is 0.945914517266
For rank 8, regParams 0.1, the RMSE is 0.953375330173
For rank 12, regParams 0.1, the RMSE is 0.955152351799
For rank 16, regPar

In [34]:
# After getting the best rank and RegParam, test the model on test dataset
predictions = bestModel.transform(testing)
predictions = predictions.dropna() # drop all NaN prediction value to ensure not to have NaN RMSE (due to SPARK-14489)
rmse = evaluator.evaluate(predictions)
print("The model had a RMSE of %s on test dataset"  % (rmse))

The model had a RMSE of 0.92277622142 on test dataset


In [35]:
# Train the full data set and calculate the time elapsed
MovieRatingsDF = loadDataFrame("ratings.csv", movieRatingSchema).cache()
MoviesDF = loadDataFrame("movies.csv", movieSchema).cache()

In [36]:
# Data summary of the full dataset on movie rating
print "Number of ratings: %s" % (MovieRatingsDF.count())
print "Number of distinct users: %s" % (MovieRatingsDF.select('userId').distinct().count())
print "Number of rated distinct movies: %s" % (MovieRatingsDF.select('movieId').distinct().count())
print "Total number of movies: %s" % (MoviesDF.select('movieId').count())

MovieRatingsDF.select([mean('rating'), min('rating'), max('rating'), stddev('rating')]).show()

print "Distribution of ratings:"
MovieRatingsDF.groupBy('rating').count().orderBy('rating').show()
RatingsCountGroupByMovieId = MovieRatingsDF.groupBy('movieId').count()
print "Average number of ratings per movie: %s" % (RatingsCountGroupByMovieId.select(mean('count')).first())

Number of ratings: 24404096
Number of distinct users: 259137
Number of rated distinct movies: 39443
Total number of movies: 40110
+------------------+-----------+-----------+-------------------+
|       avg(rating)|min(rating)|max(rating)|stddev_samp(rating)|
+------------------+-----------+-----------+-------------------+
|3.5265681220070597|        0.5|        5.0| 1.0642182955183275|
+------------------+-----------+-----------+-------------------+

Distribution of ratings:
+------+-------+
|rating|  count|
+------+-------+
|   0.5| 361006|
|   1.0| 807991|
|   1.5| 369879|
|   2.0|1681993|
|   2.5|1145168|
|   3.0|5011914|
|   3.5|2840765|
|   4.0|6620340|
|   4.5|1984982|
|   5.0|3580058|
+------+-------+

Average number of ratings per movie: 618.71804883


In [37]:
from time import time

als.setParams(rank = bestRank, regParam = bestRegParam)
print "Training full data set with Rank = %s, regParam = %s ..." % (bestRank, bestRegParam)

timeBegin = time()

model = als.fit(MovieRatingsDF) # use the full dataset for training

timeElapsed = time() - timeBegin

print "Final model trained in %s seconds" % round(timeElapsed, 2)

Training full data set with Rank = 4, regParam = 0.2 ...
Final model trained in 56.15 seconds


In [38]:
# Evaluate the performance of the final model with the testing data
predictions = model.transform(testing)
predictions = predictions.dropna() # drop all NaN prediction value to ensure not to have NaN RMSE (due to SPARK-14489)
rmse = evaluator.evaluate(predictions)
print("The final model had a RMSE of %s"  % (rmse))

The final model had a RMSE of 1.0618307465


In [39]:
from pyspark.sql.functions import lit
UserId = 1000
userWatchedList = MovieRatingsDF.filter(MovieRatingsDF.userId == UserId).join(MoviesDF, 'movieId').select(['movieId', 'userId', 'title', 'rating'])
watchedMovieList = []
for movie in userWatchedList.collect():
  watchedMovieList.append(movie.movieId)
print "User %s has watched and rated %s moive (sorted by rating):" % (UserId, len(watchedMovieList)) 
userWatchedList.orderBy('rating', ascending = False).show(20, False)

# find out the unwatched list and append with the userid
userUnwatchedList = MoviesDF.filter(MoviesDF.movieId.isin(watchedMovieList) == False).withColumn('userId', lit(UserId)).cache()
print "%s unwatched movie:" % (userUnwatchedList.count())
userUnwatchedList.show(20, False)

predictedMovies = model.transform(userUnwatchedList)
predictedMovies = predictedMovies.dropna().cache() # drop all NaN prediction value to ensure not to have NaN RMSE (due to SPARK-14489)


User 1000 has watched and rated 49 moive (sorted by rating):
+-------+------+--------------------------------------------+------+
|movieId|userId|title                                       |rating|
+-------+------+--------------------------------------------+------+
|7361   |1000  |Eternal Sunshine of the Spotless Mind (2004)|5.0   |
|8784   |1000  |Garden State (2004)                         |5.0   |
|2762   |1000  |Sixth Sense, The (1999)                     |5.0   |
|46578  |1000  |Little Miss Sunshine (2006)                 |5.0   |
|111362 |1000  |X-Men: Days of Future Past (2014)           |5.0   |
|1265   |1000  |Groundhog Day (1993)                        |5.0   |
|4878   |1000  |Donnie Darko (2001)                         |5.0   |
|92259  |1000  |Intouchables (2011)                         |5.0   |
|527    |1000  |Schindler's List (1993)                     |4.5   |
|60069  |1000  |WALL·E (2008)                               |4.5   |
|8972   |1000  |National Treasure (2004)  

In [40]:
print "Top 25 predicted movie with highest rating:"
top25Movies = predictedMovies.orderBy('prediction', ascending = False).show(25, False)

print "Top 25 commedy with highest rating:"
top25Comedy = predictedMovies.filter(predictedMovies.genres.like("%Comedy%")).orderBy('prediction', ascending = False).show(25, False)

Top 25 predicted movie with highest rating:
+-------+------------------------------------------------------+------------------+------+----------+
|movieId|title                                                 |genres            |userId|prediction|
+-------+------------------------------------------------------+------------------+------+----------+
|151989 |The Thorn (1971)                                      |Comedy            |1000  |6.502588  |
|105144 |Bare Knuckles (2010)                                  |Action|Drama      |1000  |6.0512533 |
|107252 |Island at War (2004)                                  |Drama|War         |1000  |5.862084  |
|107434 |Diplomatic Immunity (2009– )                          |Comedy            |1000  |5.862084  |
|162368 |The Russian Woodpecker (2015)                         |Documentary       |1000  |5.7818584 |
|156801 |The Magician (1898)                                   |Drama|Fantasy     |1000  |5.731187  |
|164937 |Love Is Blind (2013)         

In [41]:
N = 20
MovieWithLessThanNRatings = RatingsCountGroupByMovieId.filter('count <' + str(N))
print "Movies with less than %s rating count: %s" % (N, MovieWithLessThanNRatings.count())

movieToBeExcluded = []
for movie in MovieWithLessThanNRatings.collect():
  movieToBeExcluded.append(movie.movieId)
  
userUnwatchedListWithAtLeastNRatings = userUnwatchedList.filter(userUnwatchedList.movieId.isin(movieToBeExcluded) == False).cache()

Movies with less than 20 rating count: 23941


In [42]:
predictedMovies = model.transform(userUnwatchedListWithAtLeastNRatings)
predictedMovies = predictedMovies.dropna().cache() # drop all NaN prediction value to ensure not to have NaN RMSE (due to SPARK-14489)

In [43]:
print "Top 25 predicted movie with highest rating:"
top25Movies = predictedMovies.orderBy('prediction', ascending = False).show(25, False)

print "Top 25 commedy with highest rating:"
top25Comedy = predictedMovies.filter(predictedMovies.genres.like("%Comedy%")).orderBy('prediction', ascending = False).show(25, False)

print "Top 25 Science Fiction with highest rating:"
top25SciFi = predictedMovies.filter(predictedMovies.genres.like("%Sci-Fi%")).orderBy('prediction', ascending = False).show(25, False)

Top 25 predicted movie with highest rating:
+-------+----------------------------------------------------------+-------------------------+------+----------+
|movieId|title                                                     |genres                   |userId|prediction|
+-------+----------------------------------------------------------+-------------------------+------+----------+
|159817 |Planet Earth (2006)                                       |Documentary              |1000  |4.3610063 |
|142115 |The Blue Planet (2001)                                    |(no genres listed)       |1000  |4.24524   |
|140265 |George Carlin: Jammin' in New York (1992)                 |Comedy                   |1000  |4.203937  |
|858    |Godfather, The (1972)                                     |Crime|Drama              |1000  |4.1761856 |
|162376 |Stranger Things                                           |Drama                    |1000  |4.1760488 |
|160718 |Piper (2016)                               

In [44]:
# Save the trained model to S3
model.save("s3a://%s:%s@%s/%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME, "model"))

NameError: name 'ACCESS_KEY' is not defined

In [46]:
# Save the trained model to S3 for AWS EMR
model.save("s3://%s/%s" % (AWS_BUCKET_NAME, "model1"))

In [47]:
# Load model previous saved model
from pyspark.ml.recommendation import ALSModel
model = ALSModel.load("s3://%s/%s" % (AWS_BUCKET_NAME, "model1"))