In [1]:
# Import essential packages into python

import pandas as pd
from pyspark.sql.functions import col, explode
from pyspark import SparkContext


In [60]:
# Initiate the spark session

from pyspark.sql import SparkSession
sc = SparkContext

# sc.setCheckpointDir('checkpoint')

spark = SparkSession.builder.appName('Recommendations').getOrCreate()

# Retrieve the essential spark packages

import os
import pyspark
import random

import sys
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname

from pyspark import SparkConf, SparkContext
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


In [61]:
print('-------------------------- QUESTION 1 --------------------------')

-------------------------- QUESTION 1 --------------------------


In [52]:
print('-------------------------- QUESTION 2 --------------------------')

-------------------------- QUESTION 2 --------------------------


In [3]:
# import the csv into spark

movies = spark.read.csv("movies.csv", header = True)
ratings = spark.read.csv("ratings.csv", header = True)

In [9]:
# changing variables from strings to integer and floats.

ratings = ratings.\
    withColumn('userID', col('userID').cast('integer')).\
    withColumn('MovieID', col('MovieID').cast('integer')).\
    withColumn('rating', col('rating').cast('float')).\
    drop('timestamp')

ratings.show()

+------+-------+------+
|userID|MovieID|rating|
+------+-------+------+
|     1|   1193|   5.0|
|     1|    661|   3.0|
|     1|    914|   3.0|
|     1|   3408|   4.0|
|     1|   2355|   5.0|
|     1|   1197|   3.0|
|     1|   1287|   5.0|
|     1|   2804|   5.0|
|     1|    594|   4.0|
|     1|    919|   4.0|
|     1|    595|   5.0|
|     1|    938|   4.0|
|     1|   2398|   4.0|
|     1|   2918|   4.0|
|     1|   1035|   5.0|
|     1|   2791|   4.0|
|     1|   2687|   3.0|
|     1|   2018|   4.0|
|     1|   3105|   5.0|
|     1|   2797|   4.0|
+------+-------+------+
only showing top 20 rows



In [11]:
# Join both the data frames to add movie data into ratings data frame so we can see everything together

movie_ratings = ratings.join(movies, ['movieId'], 'left')
movie_ratings.show()

+-------+------+------+--------------------+--------------------+
|MovieID|userID|rating|               Title|              Genres|
+-------+------+------+--------------------+--------------------+
|   1193|     1|   5.0|One Flew Over the...|               Drama|
|    661|     1|   3.0|James and the Gia...|Animation|Childre...|
|    914|     1|   3.0| My Fair Lady (1964)|     Musical|Romance|
|   3408|     1|   4.0|Erin Brockovich (...|               Drama|
|   2355|     1|   5.0|Bug's Life, A (1998)|Animation|Childre...|
|   1197|     1|   3.0|Princess Bride, T...|Action|Adventure|...|
|   1287|     1|   5.0|      Ben-Hur (1959)|Action|Adventure|...|
|   2804|     1|   5.0|Christmas Story, ...|        Comedy|Drama|
|    594|     1|   4.0|Snow White and th...|Animation|Childre...|
|    919|     1|   4.0|Wizard of Oz, The...|Adventure|Childre...|
|    595|     1|   5.0|Beauty and the Be...|Animation|Childre...|
|    938|     1|   4.0|         Gigi (1958)|             Musical|
|   2398| 

In [12]:
# Create test and train set

(train, test) = ratings.randomSplit([0.8, 0.2], seed = 2021)

In [15]:
# we will make a function to determine whether or not an individual has watch the movie

def get_binary_data(ratings):
    ratings = ratings.withColumn('binary', F.lit(1))
    userIDs = ratings.select("userID").distinct()
    movieIDs = ratings.select("MovieID").distinct()

    user_movie = userIDs.crossJoin(movieIDs).join(ratings, ['userID', 'MovieID'], "left")
    user_movie = user_movie.select(['userID', 'MovieID', 'binary']).fillna(0)
    return user_movie

user_movie = get_binary_data(ratings)

In [16]:
# Create ALS model so we can run our models through it.

als = ALS(
         userCol="userID", 
         itemCol="MovieID",
         ratingCol="rating", 
         nonnegative = True, 
         implicitPrefs = False,
         coldStartStrategy="drop"
)

In [17]:
# Add hyperparameters and their respective values to param_grid we will be testing 9 different models

param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100]) \
            .addGrid(als.regParam, [.01, .05, .1]) \
            .build()


In [18]:
# here we will define the evaluator as an RMSE and print the length.

evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="rating", 
           predictionCol="prediction") 

print ("The number models which will be tested will be: ", len(param_grid))


The number models which will be tested will be:  9


In [19]:
# Build the cross validation using the CrossValidator function I wil be using 6 folds for this assignment.

cv = CrossValidator(estimator = als, estimatorParamMaps = param_grid, evaluator = evaluator, numFolds = 6)

In [20]:
# Fit cross validator to the 'train' dataset
model = cv.fit(train)

# Extract best model from the cv model above
best_model = model.bestModel

# After we will print the score and see what the results are
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

0.8519728058640155


In [57]:
# Here we are checking which parametre out of 9 is the best model

print("::Best Model::")
print("  Rank:", best_model._java_obj.parent().getRank())
print("  Max Iterations:", best_model._java_obj.parent().getMaxIter())
print("  RegParam:", best_model._java_obj.parent().getRegParam())




::Best Model::
  Rank: 100
  Max Iterations: 10
  RegParam: 0.05


In [29]:
# Here we are going to generate n Recommendations for all users

n_recommendations = best_model.recommendForAllUsers(5)

n_recommendations = n_recommendations\
    .withColumn("rec_exp", explode("recommendations"))\
    .select('userID', col("rec_exp.MovieID"), col("rec_exp.rating"))

n_recommendations.limit(10).show()


+------+-------+---------+
|userID|MovieID|   rating|
+------+-------+---------+
|  1580|   3010| 4.483064|
|  1580|   1423| 4.458513|
|  1580|   2609| 4.452656|
|  1580|    978|4.4285493|
|  1580|   1111|4.4222517|
|  4900|   1421|5.2794046|
|  4900|    318|5.2740226|
|  4900|   1900| 5.238463|
|  4900|   2019| 5.236742|
|  4900|   3338|5.2089295|
+------+-------+---------+



In [50]:
# Making sure the recommender is running properly we will test and see if user 1000 is accurate

print('----- Users Choice of Movies -----')
n_recommendations.join(movies, on = 'movieID').filter('userID = 1000').show()

print('----- Recommended Movies to Watch ------')
ratings.join(movies, on = 'movieID').filter('userID = 1000').sort('rating', ascending=False).limit(20).show()

----- Users Choice of Movies -----
+-------+------+---------+--------------------+--------------------+
|MovieID|userID|   rating|               Title|              Genres|
+-------+------+---------+--------------------+--------------------+
|    260|  1000| 5.041031|Star Wars: Episod...|Action|Adventure|...|
|    318|  1000|4.9599986|Shawshank Redempt...|               Drama|
|   2571|  1000|4.9124155|  Matrix, The (1999)|Action|Sci-Fi|Thr...|
|    527|  1000|  4.88983|Schindler's List ...|           Drama|War|
|   1198|  1000| 4.885619|Raiders of the Lo...|    Action|Adventure|
+-------+------+---------+--------------------+--------------------+

----- Recommended Movies to Watch ------
+-------+------+------+--------------------+--------------------+
|MovieID|userID|rating|               Title|              Genres|
+-------+------+------+--------------------+--------------------+
|   1610|  1000|   5.0|Hunt for Red Octo...|     Action|Thriller|
|    858|  1000|   5.0|Godfather, The 