# Building a Movie Recommendation Engine
## Xander Hieken
***
### Data Preparation

Load the data from the `ratings.csv` and `movies.csv` files and combine them on `movieId`. 

*The resultant data set should contain all of the user ratings and include movie titles.*

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
import warnings
warnings.filterwarnings('ignore')

# Starting the SparkSession
spark = SparkSession.builder \
                    .appName("MovieRatings") \
                    .getOrCreate()

# Defining the schema for both csv files
movieSchema = StructType().add("movieId", "integer") \
                         .add("title", "string") \
                         .add("genres", "string")

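# The movie id column in ratings.csv is read as "movieIdr" so that the joined
# dataframe does not end up with two columns both named "movieId"
ratingSchema = StructType().add("userId", "integer") \
                           .add("movieIdr", "integer") \
                           .add("rating", "float") \
                           .add("timestamp", "integer")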

# Reading each csv file using my defined schema
movies = spark.read.csv("movielens/movies.csv", header = True, schema = movieSchema)
ratings = spark.read.csv("movielens/ratings.csv", header = True, schema = ratingSchema)

# Defining the join expression
joinExpression = movies["movieId"] == ratings["movieIdr"]

# Joining the dataframes
lines = movies.join(ratings, joinExpression)

# Finalizing the structure of my 'ratings' dataframe
ratings = lines.select('userId', 'movieId', 'rating', 'timestamp', 'title', 'genres')

***
### Training the Recommendation Engine

Using the data from the last step, I will create a movie recommendation model using collaborative filtering. 
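
For reference, collaborative filtering with ALS is usually framed as a matrix-factorization problem. The following is a simplified sketch of the explicit-feedback objective; the symbols $x_u$, $y_i$, $\Omega$, and $\lambda$ are notation introduced here for illustration and do not appear in the code.

$$
\min_{X,\,Y} \sum_{(u,i) \in \Omega} \left( r_{ui} - x_u^\top y_i \right)^2 + \lambda \left( \sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2 \right)
$$

Here $r_{ui}$ is the rating user $u$ gave movie $i$, $\Omega$ is the set of observed (user, movie) pairs, $x_u$ and $y_i$ are the latent factor vectors for the user and the movie, and $\lambda$ plays the role of `regParam`. ALS holds $Y$ fixed and solves a least squares problem for $X$, then holds $X$ fixed and solves for $Y$, repeating for up to `maxIter` rounds. Spark's implementation also scales the regularization term by the number of ratings per user or movie, so the formula above should be read as a simplification.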

Before training the recommendation model, I split the data into a training dataset and a testing dataset using the `randomSplit` dataframe method. 

*80% of the data will be used for training and 20% for testing.*

In [2]:
# Using randomSplit to create an 80/20 train/test split
(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS (Alternating Least Squares) on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")

# Fit the model using the training dataset
model = als.fit(training)
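
The 80/20 split above is approximate and, without a seed, changes on every run (`randomSplit` accepts an optional `seed` argument). The sketch below is a plain-Python illustration of the idea, not Spark's actual implementation: each row gets one random draw and goes to the bucket whose normalized weight range contains it. The `random_split` helper is hypothetical.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one bucket using an independent random draw."""
    total = sum(weights)
    # Cumulative upper bounds of the normalized weights, e.g. [0.8, 1.0]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point drift

    rng = random.Random(seed)  # a fixed seed makes the split repeatable
    buckets = [[] for _ in weights]
    for row in rows:
        draw = rng.random()  # uniform in [0, 1)
        for idx, upper in enumerate(bounds):
            if draw < upper:
                buckets[idx].append(row)
                break
    return buckets

rows = list(range(1000))
train, test = random_split(rows, [0.8, 0.2], seed=42)
print(len(train), len(test))  # close to 800 and 200, but usually not exact
```

Because the split is random, the RMSE reported later will vary slightly between runs unless a seed is fixed.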

After fitting the model using the training dataset, I calculate the predictions on the test dataset and use the `RegressionEvaluator` to calculate the root-mean-square error (RMSE) of the model.

In [3]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

rmse = evaluator.evaluate(predictions)

print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.0797962930808434
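
To make the metric and the `coldStartStrategy="drop"` setting concrete, here is a minimal plain-Python sketch. The `rmse` helper and the three sample ratings are made up for illustration; they are not taken from the dataset. It shows that a single NaN prediction (as produced for a user or movie absent from the training set) turns the whole RMSE into NaN, and that dropping such rows gives a usable number.

```python
import math

def rmse(labels, predictions):
    """Root-mean-square error over paired labels and predictions."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Hypothetical ratings; the NaN stands in for a cold-start prediction
labels = [4.0, 3.0, 5.0]
predictions = [3.0, 3.0, float("nan")]

with_nan = rmse(labels, predictions)

# Mimic coldStartStrategy="drop": discard rows whose prediction is NaN
kept = [(y, p) for y, p in zip(labels, predictions) if not math.isnan(p)]
without_nan = rmse([y for y, _ in kept], [p for _, p in kept])

print(with_nan)     # nan
print(without_nan)  # about 0.707, the square root of (1 + 0) / 2
```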


***
### Generating Top 10 Movie Recommendations

Using the recommendation model, I now generate the top ten recommendations for each user. 

In [4]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)

To inspect some results, I look at the recommendations for user IDs 127, 151, and 300.

In [5]:
recommendations_127 = userRecs.where(userRecs.userId == 127)
recommendations_151 = userRecs.where(userRecs.userId == 151)
recommendations_300 = userRecs.where(userRecs.userId == 300)

In [6]:
recommendations_127.show(truncate=False)

+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                        |
+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|127   |[[70641, 11.197943], [85367, 9.942365], [43928, 9.480051], [938, 9.322339], [41573, 9.244039], [3004, 9.199526], [7155, 8.920888], [2376, 8.87927], [2565, 8.765781], [1409, 8.658605]]|
+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+



In [7]:
recommendations_151.show(truncate=False)

+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                               |
+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|151   |[[188, 6.6401634], [58301, 6.601354], [93840, 6.435652], [7720, 6.424265], [1211, 6.3180017], [97225, 6.2901716], [62155, 6.0972524], [6818, 6.0305104], [97306, 5.9089346], [4351, 5.903432]]|
+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+


In [8]:
recommendations_300.show(truncate=False)

+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                        |
+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|300   |[[89904, 7.9275184], [5785, 7.7424746], [1218, 7.717198], [1131, 7.5375314], [85, 7.5194035], [1883, 7.447788], [3030, 7.241143], [3089, 7.1625175], [215, 6.987399], [800, 6.8838964]]|
+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+



At this point, I can see that the model generates ten movie IDs and predicted ratings for each user, but the output isn't very user-friendly. Let's see if I can make it look a little nicer.

In [9]:
import pandas as pd

def prettyResults(sparkDF):
    # This function will take a spark dataframe and return a pandas dataframe with the top 10 recommendations
    # The returned dataframe will only have two columns: 'Movie' and 'Predicted_Rating'
    pd.set_option('display.max_colwidth', None) # I always want to display the full column width (None replaces the deprecated -1)
    tempDF = sparkDF.toPandas()
    movie = []
    rating = []
    
    # loop to populate movie and rating lists
    for i in range(len(tempDF.recommendations[0])):
        movie.append(tempDF.recommendations[0][i][0])
        rating.append(tempDF.recommendations[0][i][1])
    
    # creating the pandas dataframe with movies and ratings
    df = pd.DataFrame({'Movie': movie, 'Predicted_Rating': rating})
    
    return df

def name_retriever(movie_id, df):
    # This function takes a movie_id number and the original spark dataframe of ratings
    # It returns the title of that movie_id
    return df.filter(df.movieId == movie_id).select('title').collect()[0][0]

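def namedResults(df):
    # This function takes the dataframe from prettyResults and replaces each movie_id with its movie title
    # It returns a dataframe with 'Movie' and 'Predicted_Rating' columns that shows the title of each movie
    df['Movie'] = df['Movie'].astype(object)  # the column starts as integer ids and will hold strings
    for i in range(len(df)):
        # .loc avoids pandas' chained-assignment warning
        df.loc[i, 'Movie'] = name_retriever(int(df.loc[i, 'Movie']), ratings)
    return df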
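
`name_retriever` runs one Spark query per movie, which is fine for ten rows but slow for many. A possible alternative, sketched here in plain Python, is to look titles up in a dictionary. The `named_results` helper and `titles_by_id` are hypothetical names; the three sample entries are taken from this notebook's output for user 127. With Spark, such a dictionary could be built from the `movies` dataframe, for example with `{row.movieId: row.title for row in movies.select("movieId", "title").collect()}`.

```python
# Sample (movieId, predicted rating) pairs from the user 127 recommendations
recommendations = [(70641, 11.197943), (85367, 9.942365), (43928, 9.480051)]

# Hypothetical id-to-title mapping, built once instead of querying per movie
titles_by_id = {
    70641: "Miss March (2009)",
    85367: "Just Go with It (2011)",
    43928: "Ultraviolet (2006)",
}

def named_results(recs, titles):
    """Pair each recommended movie's title with its predicted rating."""
    return [
        (titles.get(movie_id, f"unknown id {movie_id}"), rating)
        for movie_id, rating in recs
    ]

for title, rating in named_results(recommendations, titles_by_id):
    print(f"{title}: {rating}")
```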

Now, let's check those same users again, this time using the functions I just created to display the recommendations in a much more user-friendly format.

In [10]:
df127 = prettyResults(recommendations_127)
namedResults(df127)
print("Top 10 Recommendations for userId 127")
df127

Top 10 Recommendations for userId 127


| | Movie | Predicted_Rating |
|---|---|---|
| 0 | Miss March (2009) | 11.197943 |
| 1 | Just Go with It (2011) | 9.942365 |
| 2 | Ultraviolet (2006) | 9.480051 |
| 3 | Gigi (1958) | 9.322339 |
| 4 | Family Stone, The (2005) | 9.244039 |
| 5 | Bachelor, The (1999) | 9.199526 |
| 6 | Calendar Girls (2003) | 8.920888 |
| 7 | View to a Kill, A (1985) | 8.87927 |
| 8 | King and I, The (1956) | 8.765781 |
| 9 | Michael (1996) | 8.658605 |


In [11]:
df151 = prettyResults(recommendations_151)
namedResults(df151)
print("Top 10 Recommendations for userId 151")
df151

Top 10 Recommendations for userId 151


| | Movie | Predicted_Rating |
|---|---|---|
| 0 | Prophecy, The (1995) | 6.640163 |
| 1 | Funny Games U.S. (2007) | 6.601354 |
| 2 | Cabin in the Woods, The (2012) | 6.435652 |
| 3 | Four Musketeers, The (1974) | 6.424265 |
| 4 | Wings of Desire (Himmel über Berlin, Der) (1987) | 6.318002 |
| 5 | Hotel Transylvania (2012) | 6.290172 |
| 6 | Nick and Norah's Infinite Playlist (2008) | 6.097252 |
| 7 | Come and See (Idi i smotri) (1985) | 6.03051 |
| 8 | Seven Psychopaths (2012) | 5.908935 |
| 9 | Point Break (1991) | 5.903432 |


In [12]:
df300 = prettyResults(recommendations_300)
namedResults(df300)
print("Top 10 Recommendations for userId 300")
df300

Top 10 Recommendations for userId 300


| | Movie | Predicted_Rating |
|---|---|---|
| 0 | The Artist (2011) | 7.927518 |
| 1 | Jackass: The Movie (2002) | 7.742475 |
| 2 | Killer, The (Die xue shuang xiong) (1989) | 7.717198 |
| 3 | Jean de Florette (1986) | 7.537531 |
| 4 | Angels and Insects (1995) | 7.519403 |
| 5 | Bulworth (1998) | 7.447788 |
| 6 | Yojimbo (1961) | 7.241143 |
| 7 | Bicycle Thieves (a.k.a. The Bicycle Thief) (a.k.a. The Bicycle Thieves) (Ladri di biciclette) (1948) | 7.162518 |
| 8 | Before Sunrise (1995) | 6.987399 |
| 9 | Lone Star (1996) | 6.883896 |
