In [None]:
!pip install matplotlib pandas numpy seaborn

Collecting matplotlib
  Using cached matplotlib-3.4.3-cp38-cp38-manylinux1_x86_64.whl (10.3 MB)
Collecting pandas
  Using cached pandas-1.3.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (11.5 MB)
Collecting numpy
  Using cached numpy-1.21.2-cp38-cp38-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (15.8 MB)
Collecting seaborn
  Using cached seaborn-0.11.2-py3-none-any.whl (292 kB)
Collecting kiwisolver>=1.0.1
  Using cached kiwisolver-1.3.2-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.2 MB)
Collecting pillow>=6.2.0
  Using cached Pillow-8.4.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
Collecting cycler>=0.10
  Using cached cycler-0.10.0-py2.py3-none-any.whl (6.5 kB)
Collecting pytz>=2017.3
  Using cached pytz-2021.3-py2.py3-none-any.whl (503 kB)
Collecting scipy>=1.0
  Using cached scipy-1.7.1-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.whl (28.4 MB)
Installing collected packages: kiwisolver, numpy, pillow, cycler, matplotlib, pytz, pa

# Rating some movies
#### To make recommendations for you, we first learn your taste by asking you to rate a few movies.

In [None]:
import sys
import os
from os import remove
from time import time
from os.path import join, isfile, dirname

topMovies = """1,Toy Story (1995)
780,Independence Day (a.k.a. ID4) (1996)
590,Dances with Wolves (1990)
1210,Star Wars: Episode VI - Return of the Jedi (1983)
648,Mission: Impossible (1996)
344,Ace Ventura: Pet Detective (1994)
165,Die Hard: With a Vengeance (1995)
153,Batman Forever (1995)
597,Pretty Woman (1990)
1580,Men in Black (1997)
231,Dumb & Dumber (1994)"""

parentDir = os.path.abspath('')
ratingsFile = join(parentDir, "personalRatings.txt")

# If ratings already exist, ask before overwriting them. Declining keeps the existing
# file and skips the questionnaire. sys.exit() is not used here because, in a notebook
# kernel, it raises SystemExit instead of simply skipping this step.
askForRatings = True
if isfile(ratingsFile):
    r = input("Looks like you've already rated the movies. Overwrite ratings (y/N)? ")
    if r and r[0].lower() == "y":
        remove(ratingsFile)
    else:
        askForRatings = False

if askForRatings:
    prompt = "Please rate the following movie (1-5 (best), or 0 if not seen): "
    print(prompt)

    now = int(time())
    n = 0  # number of movies actually rated (rating > 0)

    # `with` closes the file even if the questionnaire is interrupted.
    with open(ratingsFile, 'w') as f:
        for line in topMovies.split("\n"):
            # split on the first comma only, so titles that contain commas stay intact
            movieId, title = line.strip().split(",", 1)
            valid = False
            while not valid:
                rStr = input(title + ": ")
                r = int(rStr) if rStr.isdigit() else -1
                if r < 0 or r > 5:
                    print(prompt)
                else:
                    valid = True
                    if r > 0:
                        # record layout userId::movieId::rating::timestamp, with userId 0 for "me"
                        f.write("0::%s::%d::%d\n" % (movieId, r, now))
                        n += 1

    if n == 0:
        print("No rating provided!")


# Solution Structure

In [None]:
# !/usr/bin/env python

import sys
import os
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname
from pyspark.sql import SparkSession
#from pyspark.mllib.recommendation import ALS
from pyspark.ml.recommendation import ALS


def parseRating(line):
    """
    Parse one MovieLens rating line ``userId::movieId::rating::timestamp``.

    Returns a pair ``(bucket, (userId, movieId, rating))`` where ``bucket`` is the
    last decimal digit of the timestamp (``timestamp % 10``).
    """
    parts = line.strip().split("::")
    bucket = int(parts[3]) % 10
    record = (int(parts[0]), int(parts[1]), float(parts[2]))
    return bucket, record

def _parseRating(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp .
    """
    fields = line.strip().split("::")
    return (int(fields[0]), int(fields[1]), float(fields[2]))

def parseMovie(line):
    """
    Parse one MovieLens movie line ``movieId::movieTitle[::...]``.

    Returns ``(movieId, movieTitle)``; any fields after the title are ignored.
    """
    parts = line.strip().split("::")
    movieId, title = int(parts[0]), parts[1]
    return movieId, title

def loadRatings(ratingsFile):
    """
    Load the personal ratings file.

    Each non-blank line is a MovieLens-format record ``userId::movieId::rating::timestamp``
    (parsed with ``parseRating``). Only records whose rating is strictly greater than 0
    are kept.

    :param ratingsFile: path to the ratings file.
    :return: non-empty list of ``(userId, movieId, rating)`` tuples.
    Calls ``sys.exit(1)`` if the file does not exist or contains no positive rating.
    """
    if not isfile(ratingsFile):
        print("File %s does not exist." % ratingsFile)
        sys.exit(1)
    with open(ratingsFile, 'r') as f:
        # Build a real list: a lazy filter() object is always truthy, so the
        # emptiness check below would never trigger with it.
        records = [parseRating(line)[1] for line in f if line.strip()]
    ratings = [r for r in records if r[2] > 0]
    if not ratings:
        print("No ratings provided.")
        sys.exit(1)
    return ratings

def computeRmse(model, data, n):
    """
    Root Mean Squared Error of ``model`` on ``data``.

    ``data`` is an RDD of ``(userId, movieId, rating)`` triples and ``model`` must provide
    ``predictAll`` (the RDD-based model API; the ``pyspark.ml`` ALSModel used elsewhere in
    this notebook does not). Only (user, movie) pairs present in both the predictions and
    ``data`` contribute to the error sum, which is divided by ``n`` — presumably the number
    of records in ``data``.
    """
    userMoviePairs = data.map(lambda rec: (rec[0], rec[1]))
    predicted = model.predictAll(userMoviePairs).map(lambda p: ((p[0], p[1]), p[2]))
    observed = data.map(lambda rec: ((rec[0], rec[1]), rec[2]))
    # (predicted rating, observed rating) for each (user, movie) key found in both RDDs
    pairs = predicted.join(observed).values()
    squaredErrorSum = pairs.map(lambda pr: (pr[0] - pr[1]) ** 2).reduce(add)
    return sqrt(squaredErrorSum / float(n))

if __name__ == "__main__":

    # Set up a local Spark session. In a notebook kernel __name__ is "__main__",
    # so this block runs when the cell is executed.
    # NOTE(review): with master("local") the executor runs inside the driver JVM, so
    # spark.executor.memory may have no effect here — confirm if memory matters.
    spark = SparkSession.builder \
        .master("local") \
        .appName("Movie Recommendation Engine") \
        .config("spark.executor.memory", "1gb") \
        .getOrCreate()

    sc = spark.sparkContext

    # MovieLens "medium" dataset directory, relative to the notebook's working directory
    movieLensHomeDir = join(os.path.abspath(''), 'movielens', 'medium')

    # load personal ratings written by the rating cell (userId 0)
    myRatings = loadRatings(os.path.abspath('personalRatings.txt'))
    myRatingsRDD = sc.parallelize(myRatings, 1)

    # ratings is an RDD of (userId, movieId, rating)
    ratings = sc.textFile(join(movieLensHomeDir, "ratings.dat")).map(_parseRating)

    # movies is an RDD of (movieId, movieTitle)
    movies = sc.textFile(join(movieLensHomeDir, "movies.dat")).map(parseMovie)

    # Convert the RDDs into DataFrames and show a preview to check the data loaded.
    # All three use the same "movieId" column name so later joins line up exactly.
    myRatingsDF = myRatingsRDD.toDF(["userId", "movieId", "rating"])
    moviesDF = movies.toDF(['movieId', 'movieName'])
    ratingsDF = ratings.toDF(["userId", "movieId", "rating"])

    myRatingsDF.show()
    ratingsDF.show()
    moviesDF.show()

# Data Exploration

In [None]:
# Get an idea of the size of the dataset
from pyspark.sql.functions import countDistinct

nbOfMovies = moviesDF.count()
nbOfRatings = ratingsDF.count()
# number of users = number of distinct userId values among the ratings
nbOfUsers = ratingsDF.select(countDistinct("userId")).first()[0]

for value, label in ((nbOfMovies, "movies"), (nbOfRatings, "ratings"), (nbOfUsers, "users")):
    print("The database contains", value, label)

In [None]:
# Number of movies released per year in the DB
from pyspark.sql.functions import split, regexp_extract, col, length, substring
import seaborn as snsat
import matplotlib.pyplot as plt

# The release year is the text inside the LAST pair of parentheses of the title,
# e.g. "Independence Day (a.k.a. ID4) (1996)" -> "1996".
# Raw string: "\(" in a normal string literal is an invalid escape sequence and
# triggers a Deprecation/SyntaxWarning on recent Python versions.
moviesDFyear = moviesDF.withColumn('year', regexp_extract(col('movieName'), r'\(([^)]*)\)[^(]*$', 1))

moviesByYearPd = moviesDFyear.groupBy('year').count().toPandas()
moviesByYearPd = moviesByYearPd.sort_values(by=['year'], ascending=True)

fig, ax = plt.subplots(figsize=(14, 4))
snsat.barplot(x="year", y="count", data=moviesByYearPd, ax=ax)
ax.set_xticklabels(ax.get_xticklabels(), rotation=90, ha="right")
ax.set(title="Number of movies per release year", xlabel="Release year", ylabel="Number of movies")
fig.tight_layout()
plt.show()

In [None]:
# Movies with the largest number of ratings
TOP_N_MOST_RATED = 35

# Count ratings per movie, attach titles (the inner join keeps only movies present in
# moviesDF), and sort in Spark so only the top rows are brought to the driver instead
# of converting every movie to pandas first.
moviesByNbRate = (ratingsDF.groupBy('movieId').count()
                  .withColumnRenamed('count', 'NbRatings')
                  .join(moviesDF, ['movieId'])
                  .orderBy('NbRatings', ascending=False))

moviesByNbRatePd = moviesByNbRate.limit(TOP_N_MOST_RATED).toPandas()

fig, ax = plt.subplots(figsize=(14, 8))
snsat.barplot(y="NbRatings", x="movieName", data=moviesByNbRatePd, ax=ax)
ax.set_xticklabels(ax.get_xticklabels(), rotation=40, ha="right")
ax.set(title="Top %d most rated movies" % TOP_N_MOST_RATED, xlabel="Movie", ylabel="Number of ratings")
fig.tight_layout()
plt.show()

# Pre-Processing

In [None]:
# Keep only ratings inside [0, 5] (both bounds inclusive) and report how many were dropped
ratingsDF = ratingsDF.filter(ratingsDF.rating.between(0, 5))
print("Number of ratings out of the range [0 - 5]:", nbOfRatings - ratingsDF.count())

In [None]:
# Check for missing values: nulls in every column, plus NaN in floating-point columns.
# isnan() alone misses nulls, and it is only meaningful for float/double columns.
from pyspark.sql.functions import isnan, when, count, col

def countMissing(df):
    """Return a one-row DataFrame with the number of null (or NaN, for float columns) entries per column of ``df``."""
    floatCols = {name for name, dtype in df.dtypes if dtype in ("double", "float")}
    exprs = []
    for c in df.columns:
        missing = col(c).isNull()
        if c in floatCols:
            missing = missing | isnan(col(c))
        # when() yields a non-null marker only for missing cells; count() counts those markers
        # (returning the column itself would be null for null cells and never be counted).
        exprs.append(count(when(missing, 1)).alias(c))
    return df.select(exprs)

countMissing(ratingsDF).show()

countMissing(moviesDF).show()

In [None]:
# Drop duplicate rows. Count rows right before deduplicating so that rows removed by
# earlier steps (e.g. the rating-range filter) are not reported as duplicates.
nbRatingsBeforeDedup = ratingsDF.count()
ratingsDF = ratingsDF.dropDuplicates()
print("Number of duplicate row for the rating dataset:", nbRatingsBeforeDedup - ratingsDF.count())

nbMoviesBeforeDedup = moviesDF.count()
moviesDF = moviesDF.dropDuplicates()
print("Number of duplicate row the movies dataset:", nbMoviesBeforeDedup - moviesDF.count())

# Implementation of the Machine Learning Prediction Part 1

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [None]:
# Split the ratings into an 80% training set and a 20% test set (fixed seed for reproducibility)
train, test = ratingsDF.randomSplit(weights=[0.8, 0.2], seed=5678)

In [None]:
# Create ALS model.
# ALS initialises its factor matrices randomly. Setting the seed explicitly (same value
# as the train/test split) makes the RMSE and the recommendations reproducible.
# NOTE(review): PySpark's default seed appears to be derived from hash() of the class
# name, which varies between Python processes under string-hash randomization — confirm.
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", seed=5678)

In [None]:
# Add my movie ratings (userId 0) to the training data.
# unionByName matches columns by name rather than by position, so the union stays
# correct even if the two DataFrames' column order ever diverges.
trainWithMyRatings = train.unionByName(myRatingsDF)

# Check that my ratings were added
trainWithMyRatings.filter(trainWithMyRatings.userId == 0).show()

In [None]:
from pyspark.sql.functions import isnan, col

# Train the model on the training data plus my ratings
modelWithMyRatings = als.fit(trainWithMyRatings)

# Generate predictions for the held-out test dataset
predictions = modelWithMyRatings.transform(test)

# coldStartStrategy="drop" already drops rows the model cannot score; this filter is a
# safety net for any remaining NaN prediction. isnan() states the intent explicitly:
# comparing against float('nan') only works because Spark SQL treats NaN as equal to
# NaN, and it reads like the classic Python NaN-comparison bug.
predictions = predictions.filter(~isnan(col("prediction")))

# Overview of the generated predictions
predictions.show(15)

In [None]:
# RMSE between the observed "rating" column and the model's "prediction" column
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)

# Evaluate the quality of the model trained with my ratings
rmse = evaluator.evaluate(predictions)
print(f"The RMSE is {rmse}")

In [None]:
# Generate the recommendations.
# Only the top 5 movies for my own user are needed (my ratings use userId 0), so score
# just that user with recommendForUserSubset instead of every user with recommendForAllUsers.
# The output schema (userId, recommendations) is the same.
myUsers = myRatingsDF.select("userId").distinct()
myRecommendations = modelWithMyRatings.recommendForUserSubset(myUsers, 5)

In [None]:
from pyspark.sql.functions import explode, col

# Flatten the nested recommendations into one (userId, movieId, rating) row per movie.
# A new name is used so re-running this cell does not explode an already-flattened frame.
myRecommendationsFlat = (myRecommendations
                         .withColumn("rec_exp", explode("recommendations"))
                         .select('userId', col("rec_exp.movieId"), col("rec_exp.rating")))

# My ratings use userId 0: keep that user, attach titles from moviesDF, best first.
myRecommendationsList = (myRecommendationsFlat
                         .filter(col("userId") == 0)
                         .join(moviesDF, ['movieId'], 'left')
                         .sort('rating', ascending=False)
                         .collect())

# Display my top 5 movies (fewer if the model returned fewer rows)
for rank, row in enumerate(myRecommendationsList[:5], start=1):
    print(rank, ":", row["movieName"])

# Implementation of the Machine Learning Prediction Part 2

In this part, we try to build a better ALS model. Instead of fixing a single set of hyperparameters, we give a list of candidate values for each one, compare the cross-validated RMSE of every combination, and keep the most accurate model. By trying more tunings, we hope to obtain a better model.

We start this part with the same train and test datasets as in part 1.

In [None]:
# Create ALS model to be tuned (rank and regParam are supplied by the parameter grid).
# Seed fixed, as for the train/test split, so the tuning results are reproducible.
als2 = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
           coldStartStrategy="drop", seed=5678)


In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Give a list of candidate values for each hyperparameter instead of a single fixed setting.
# The Params MUST come from the estimator being tuned (als2): a Param belongs to the
# instance that defines it, so entries built from `als.rank` / `als.regParam` are not
# applied to als2 and every candidate would silently train with als2's defaults.
param_grid = ParamGridBuilder() \
            .addGrid(als2.rank, [4, 12, 30, 60]) \
            .addGrid(als2.regParam, [.01, .04, .1, .14]) \
            .build()

# Cross-validation: the CrossValidator compares the RMSE ("evaluator") of als2 for each
# parameter combination. A fixed seed makes the fold assignment reproducible.
cv2 = CrossValidator(estimator=als2, estimatorParamMaps=param_grid, evaluator=evaluator,
                     numFolds=5, seed=5678)

# 2 parameters with 4 candidate values each -> 16 combinations (each fitted numFolds times).
print("Number of possible models to be tested: ", len(param_grid))

In [None]:
# Fit every parameter combination with cross-validation on the training data (plus my ratings)
model2 = cv2.fit(trainWithMyRatings)

# The model refitted with the best-scoring parameter combination
best_model2 = model2.bestModel

# The model itself does not expose regParam, so read both values from the estimator
# that produced it, through the underlying Java object.
bestEstimator = best_model2._java_obj.parent()
print("Best Model parameter:")
print("  Rank:", bestEstimator.getRank())
print("  RegParam:", bestEstimator.getRegParam())

In [None]:
# Score the held-out test dataset with the tuned model and preview the first rows
predictions2 = best_model2.transform(dataset=test)
predictions2.show(n=20)

In [None]:
# Same RMSE evaluator as in part 1, applied to the tuned model's test predictions
rmse2 = evaluator.evaluate(dataset=predictions2)
print(f"The RMSE is {rmse2}")

On the same test dataset, the RMSE is lower than in the first part, so the tuned model is more accurate and its recommendations should be more reliable than those from the first part.

In [None]:
# Generate the recommendations with the tuned model.
# Only the top 5 movies for my own user are needed (my ratings use userId 0), so score
# just that user with recommendForUserSubset instead of every user with recommendForAllUsers.
myUsers = myRatingsDF.select("userId").distinct()
myRecommendations2 = best_model2.recommendForUserSubset(myUsers, 5)

# Overview of the recommendations
myRecommendations2.show()

In [None]:
from pyspark.sql.functions import explode, col

# Flatten the nested recommendations into one (userId, movieId, rating) row per movie.
# A new name is used so re-running this cell does not explode an already-flattened frame.
myRecommendations2Flat = (myRecommendations2
                          .withColumn("rec_exp", explode("recommendations"))
                          .select('userId', col("rec_exp.movieId"), col("rec_exp.rating")))

# My ratings use userId 0: keep that user, attach titles from moviesDF, best first.
myRecommendations2List = (myRecommendations2Flat
                          .filter(col("userId") == 0)
                          .join(moviesDF, ['movieId'], 'left')
                          .sort('rating', ascending=False)
                          .collect())

# Display my top 5 movies (fewer if the model returned fewer rows)
for rank, row in enumerate(myRecommendations2List[:5], start=1):
    print(rank, ":", row["movieName"])

In [None]:
# Clean up: stop the SparkSession, which also stops its underlying SparkContext (`sc`).
spark.stop()