# DSCI 6007 Final Project
### Project: Netflix Recommendation System 

The purpose of this project was to create a movie recommendation system using Netflix data. Then, using the system created, predict movie ratings for 100,000 movies for users in a subset of the original Netflix data. 

The steps taken in this project: 
1. Analyze the Netflix data using Spark
2. Based on the outcomes of this analysis, develop a feasible and efficent implementation of the collabortive filtering algorithm in Spark. 
3. Execute the program on Amazon EMR to get the rating predictions and evaluate those ratings by comparing them to the provided true ratings. 


In [1]:
import os
# from test_helper import Test

# Change to the location of data files
dbfs_dir = 's3://dsci6007finalproject'
ratings_filename = dbfs_dir + '/TrainingRatings.txt'
movies_filename = dbfs_dir + '/movie_titles.txt'


In [2]:
#setting the schemas for the data we will be reading in 
from pyspark.sql.types import *

training_ratings_df_schema = StructType(
  [StructField('MovieId', IntegerType()),
   StructField('CustomerId', IntegerType()),
   StructField('Rating', DoubleType())]
)

movies_df_schema = StructType(
  [StructField('ID', IntegerType()),
   StructField('release_year', IntegerType()),
   StructField('title', StringType())]
)

test_ratings_df_schema = StructType(
  [StructField('MovieId', IntegerType()),
   StructField('CustomerId', IntegerType()),
   StructField('Rating', DoubleType())]
)

In [3]:
#loading in the ratings (training) data
ratings_df = sqlContext.read.format('csv').options(header=True, inferSchema=False).schema(training_ratings_df_schema).csv(ratings_filename)
ratings_df.show()

[Stage 0:>                                                          (0 + 1) / 1]

+-------+----------+------+
|MovieId|CustomerId|Rating|
+-------+----------+------+
|      8|   1395430|   2.0|
|      8|   1205593|   4.0|
|      8|   1488844|   4.0|
|      8|   1447354|   1.0|
|      8|    306466|   4.0|
|      8|   1331154|   4.0|
|      8|   1818178|   3.0|
|      8|    991725|   4.0|
|      8|   1987434|   4.0|
|      8|   1765381|   4.0|
|      8|    433803|   3.0|
|      8|   1148143|   2.0|
|      8|   1174811|   5.0|
|      8|   1684516|   3.0|
|      8|    754781|   4.0|
|      8|    567025|   4.0|
|      8|   1623132|   4.0|
|      8|   1567095|   3.0|
|      8|   1666394|   5.0|
|      8|    622194|   4.0|
+-------+----------+------+
only showing top 20 rows



                                                                                

In [4]:
#loading in the movies dataframe
movies_df = sqlContext.read.format('csv').options(header=True, inferSchema=False).schema(movies_df_schema).csv(movies_filename)
movies_df.show(5)

+---+------------+--------------------+
| ID|release_year|               title|
+---+------------+--------------------+
|  2|        2004|Isle of Man TT 20...|
|  3|        1997|           Character|
|  4|        1994|Paula Abdul's Get...|
|  5|        2004|The Rise and Fall...|
|  6|        1997|                Sick|
+---+------------+--------------------+
only showing top 5 rows



In [5]:
#how much data are we working with
ratings_count = ratings_df.count()
print('There are {} ratings in the ratings dataframe.'.format(ratings_count))
movies_count = movies_df.count()
print('There are {} ratings in the movies dataframe.'.format(movies_count))

                                                                                

There are 3255351 ratings in the ratings dataframe.
There are 17769 ratings in the movies dataframe.


## Basic Recommendations

One simple method of recommendation is to just recommend the top-rated movies. To do this we will combine and average all of the user ratings for movies and sort them from greatest to least. 

In [6]:
from pyspark.sql import functions as F

# From ratingsDF, create a movie_ids_with_avg_ratings_df that combines the two DataFrames
movie_ids_with_avg_ratings_df = ratings_df.groupBy('movieId').agg(F.count(ratings_df.Rating).alias("count"), F.avg(ratings_df.Rating).alias("average"))
print('movie_ids_with_avg_ratings_df:')
movie_ids_with_avg_ratings_df.show(3, truncate=False)

# # Note: movie_names_df is a temporary variable, used only to separate the steps necessary
# # to create the movie_names_with_avg_ratings_df DataFrame.
# movie_names_df = movie_ids_with_avg_ratings_df.<FILL_IN>
# movie_names_with_avg_ratings_df = movie_names_df.<FILL_IN>

# print 'movie_names_with_avg_ratings_df:'
# movie_names_with_avg_ratings_df.show(3, truncate=False)

movie_ids_with_avg_ratings_df:




+-------+-----+------------------+
|movieId|count|average           |
+-------+-----+------------------+
|2366   |1643 |3.036518563603165 |
|4190   |37   |3.054054054054054 |
|3220   |1417 |2.9273112208892025|
+-------+-----+------------------+
only showing top 3 rows



                                                                                

In [7]:
#adding in the titles of the movies 
movie_names_with_avg_rating_df = movie_ids_with_avg_ratings_df.join(movies_df, movie_ids_with_avg_ratings_df.movieId == movies_df.ID).select("movieId","average","title", "count").sort(F.col("average").desc())
movie_names_with_avg_rating_df.show(5, truncate=False)




+-------+-----------------+------------------------------------------------+-----+
|movieId|average          |title                                           |count|
+-------+-----------------+------------------------------------------------+-----+
|3033   |4.5              |Ghost in the Shell: Stand Alone Complex: 2nd Gig|62   |
|12293  |4.464598134454594|The Godfather                                   |20691|
|16147  |4.422818791946309|The Sopranos: Season 1                          |10430|
|14283  |4.418618618618619|The Best of Friends: Vol. 3                     |3330 |
|1256   |4.414830736163353|The Best of Friends: Vol. 4                     |3722 |
+-------+-----------------+------------------------------------------------+-----+
only showing top 5 rows



                                                                                

## Movies with Highest Ratings and at least 500 Reviews: 



In [118]:
#filtering it so that we have the top rated movies where at least 500 people reviewed it
movies_with_500_ratings_or_more = movie_names_with_avg_rating_df.filter(F.col('count')>=500)
print('Movies with highest ratings:')
movies_with_500_ratings_or_more.show(5000, truncate=False)

Movies with highest ratings:


                                                                                

+-------+------------------+------------------------------------------------------------------------+-----+
|movieId|average           |title                                                                   |count|
+-------+------------------+------------------------------------------------------------------------+-----+
|12293  |4.464598134454594 |The Godfather                                                           |20691|
|16147  |4.422818791946309 |The Sopranos: Season 1                                                  |10430|
|14283  |4.418618618618619 |The Best of Friends: Vol. 3                                             |3330 |
|1256   |4.414830736163353 |The Best of Friends: Vol. 4                                             |3722 |
|5760   |4.410690051153565 |The Sopranos: Season 3                                                  |9579 |
|7569   |4.38388625592417  |Dead Like Me: Season 2                                                  |1477 |
|14648  |4.35288414929714  |

### Further Data Analysis

In [9]:
#getting number of distinct users in training dataset
from pyspark.sql.functions import countDistinct
distinctusers=ratings_df.select(countDistinct("CustomerId"))
print('Distinct users in the training data set: ')
distinctusers.show()

Distinct users in the training data set: 




+--------------------------+
|count(DISTINCT CustomerId)|
+--------------------------+
|                     28978|
+--------------------------+



                                                                                

In [10]:
#getting distinct number of movies that have been rated from the training data set
from pyspark.sql.functions import countDistinct
distinctmovies=ratings_df.select(countDistinct("movieId"))
print('Distinct movies that have been rated by the users in the training data set: ')
distinctmovies.show()

Distinct movies that have been rated by the users in the training data set: 




+-----------------------+
|count(DISTINCT movieId)|
+-----------------------+
|                   1821|
+-----------------------+



                                                                                

In [11]:
#getting distinct number of movies that have been rated from the training data set
from pyspark.sql.functions import countDistinct
distinctmovies=movies_df.select(countDistinct("ID"))
print('Total movies available for rating: ')
distinctmovies.show()

Total movies available for rating: 
+------------------+
|count(DISTINCT ID)|
+------------------+
|             17769|
+------------------+



So there are a lot of movies that have not been rated by anyone in the training dataset that we will have to account for when making predictions. 

## Loading in the test set

In [12]:
# Change to the location of data files
dbfs_dir = 's3://dsci6007finalproject'
test_ratings_filename = dbfs_dir + '/TestingRatings.txt'
#loading in the ratings (testing) data
test_ratings_df = sqlContext.read.format('csv').options(header=True, inferSchema=False).schema(test_ratings_df_schema).csv(test_ratings_filename)
test_ratings_df.show()

+-------+----------+------+
|MovieId|CustomerId|Rating|
+-------+----------+------+
|      8|   2149668|   3.0|
|      8|   1089184|   3.0|
|      8|   2465894|   3.0|
|      8|    534508|   1.0|
|      8|    992921|   4.0|
|      8|    595054|   4.0|
|      8|   1298304|   4.0|
|      8|   1661600|   4.0|
|      8|    553787|   2.0|
|      8|   1309839|   3.0|
|      8|    727242|   1.0|
|      8|   1437668|   4.0|
|      8|   2170930|   1.0|
|      8|   1780876|   5.0|
|      8|      9660|   3.0|
|      8|   2379200|   4.0|
|      8|    563186|   5.0|
|      8|   1539617|   4.0|
|      8|   1656839|   1.0|
|      8|   2591126|   4.0|
+-------+----------+------+
only showing top 20 rows



One of the things we were asked to find in problem 2 was the average overlap of items rated by the users in the training set for users in the test set and the average overlap of users that rated items in the training set for items appearing in the test set? 

First, we will start with the average overlap of items rated by users in the training set for users in the test set: 

In [13]:
# picking one user from the testing data set (CustomerId==2149668)

#now find the movies they have rated in the training set: 

sampleuser_df=ratings_df.filter(F.col('CustomerId')==2149668)
sampleuser_df.show(3)


#list of all of the movies that the sample user has rated
sampleusermovielist=sampleuser_df.select('MovieId').rdd.flatMap(lambda x: x).collect()
print('User {} rated {} movies. Their IDs can be found below:'.format(2149668,len(sampleusermovielist)))
print(sampleusermovielist)


#then find the average number of users who rated each of those movies in the
# training set
# for movie in sampleusermovielist:
#     movie_rating_count=ratings_df.filter(F.col('MovieId')==movie).agg(F.count(ratings_df.Rating).alias("count"))
#     movie_rating_count.show()




+-------+----------+------+
|MovieId|CustomerId|Rating|
+-------+----------+------+
|    992|   2149668|   3.0|
|   1202|   2149668|   3.0|
|   1289|   2149668|   1.0|
+-------+----------+------+
only showing top 3 rows





User 2149668 rated 75 movies. Their IDs can be found below:
[992, 1202, 1289, 1305, 2015, 2212, 2342, 2601, 2675, 2755, 2913, 2955, 3151, 3253, 3274, 3290, 3355, 3538, 4847, 4849, 5025, 5562, 5814, 6287, 6336, 6529, 6556, 6971, 7238, 7544, 8166, 8354, 8428, 8596, 8703, 8849, 8851, 9028, 9617, 9689, 9728, 9946, 10080, 10734, 10774, 10889, 11286, 11638, 11888, 12184, 12232, 12275, 12280, 12293, 12355, 12497, 12627, 13614, 13636, 13787, 14144, 14154, 14185, 14198, 14216, 14484, 14505, 14983, 15152, 15246, 15816, 16286, 16707, 17196, 17536]


                                                                                

In [14]:
#Now we will try to find the average number of users who rated the same movies

same_movies_as_sample_user=ratings_df.filter(F.col('MovieId').isin(sampleusermovielist))
print('The number of users who rated each of the same movies as sample user (with id: 2149668):')
same_movies_as_sample_user_counts=same_movies_as_sample_user.groupBy('MovieId').agg(F.count(same_movies_as_sample_user.Rating).alias("count"))
same_movies_as_sample_user_counts.show()

print('The average number of users that rated each of the same movies is:')
same_movies_as_sample_user_counts.agg(F.avg(F.col("count"))).show()

The number of users who rated each of the same movies as sample user (with id: 2149668):


                                                                                

+-------+-----+
|MovieId|count|
+-------+-----+
|  12184| 7363|
|   8354| 1925|
|  17196| 1414|
|  11888|15196|
|   2675| 9138|
|  12293|20691|
|   8851|12999|
|    992| 3164|
|   5025| 9653|
|   5814|18298|
|  14505| 2244|
|   8596|23005|
|  12627|  635|
|  13614|19316|
|   1305| 8707|
|   9617|16546|
|  14185|15690|
|   1202|20997|
|   5562| 9093|
|  15152|  488|
+-------+-----+
only showing top 20 rows

The average number of users that rated each of the same movies is:




+------------------+
|        avg(count)|
+------------------+
|10178.373333333333|
+------------------+



                                                                                

Now we will look at the average overlap of users that rated items in the training set for items appearing in the test set? 

In [15]:
#start by picking a few movies from the test set, I will just do the top 100
# most commonly rated movies from the test set. 


#getting the 100 most common movies
most_common_movies_test_df=test_ratings_df.groupBy('MovieId').agg(F.count(test_ratings_df.MovieId).alias('count')).sort(F.col('count').desc())
most_common_movies=most_common_movies_test_df.select('MovieId').rdd.flatMap(lambda x: x).collect()
most_common_movies=most_common_movies[:100]   

#now seeing how many users rated each of those same movies in the training data
most_common_movies_overlap=ratings_df.filter(F.col('MovieId').isin(most_common_movies))
most_common_movies_overlap=most_common_movies_overlap.groupBy('MovieId').agg(F.count(same_movies_as_sample_user.Rating).alias("count"))
print('The number of people in the training set who voted for each of the 100 most commonly rated films from the training set:')
most_common_movies_overlap.show()

print('The average number of users in the training set that rated each of the top 100 movies from the test set:')
most_common_movies_overlap_avg=most_common_movies_overlap.agg(F.avg(F.col("count"))).show()


The number of people in the training set who voted for each of the 100 most commonly rated films from the training set:


                                                                                

+-------+-----+
|MovieId|count|
+-------+-----+
|  11888|15196|
|   2675| 9138|
|   6911|13316|
|  12293|20691|
|   8851|12999|
|    442|10193|
|   5025| 9653|
|  14407|10218|
|   1615|18043|
|  16147|10430|
|   3893|11799|
|   9481|12893|
|   8596|23005|
|   1901|10062|
|   1406|20631|
|   5814|18298|
|   6347|13806|
|  13651|20902|
|  13614|19316|
|   9043|10686|
+-------+-----+
only showing top 20 rows

The average number of users in the training set that rated each of the top 100 movies from the test set:




+----------+
|avg(count)|
+----------+
|   15163.9|
+----------+



                                                                                

In [16]:
# #list of all of the movies that the sample user has rated
# sampleusermovielist=sampleuser_df.select('MovieId').rdd.flatMap(lambda x: x).collect()
# print(sampleusermovielist)

In [17]:
# for song in sampleusermovielist: 
    

In [18]:
#creating a df that keeps track of all of the movies each user has rated
all_ratings_from_user=ratings_df.groupby('CustomerId').agg(F.collect_set('MovieId').alias('AllMoviesRated'))

In [19]:
all_ratings_from_user=ratings_df.join(all_ratings_from_user, 'CustomerId')
all_ratings_from_user.show()



+----------+-------+------+--------------------+
|CustomerId|MovieId|Rating|      AllMoviesRated|
+----------+-------+------+--------------------+
|       481|   2532|   5.0|[17423, 13519, 69...|
|       481|   2601|   5.0|[17423, 13519, 69...|
|       481|   2756|   5.0|[17423, 13519, 69...|
|       481|   2913|   5.0|[17423, 13519, 69...|
|       481|   3165|   4.0|[17423, 13519, 69...|
|       481|   3638|   5.0|[17423, 13519, 69...|
|       481|   3670|   3.0|[17423, 13519, 69...|
|       481|   3890|   4.0|[17423, 13519, 69...|
|       481|   4064|   5.0|[17423, 13519, 69...|
|       481|   4432|   4.0|[17423, 13519, 69...|
|       481|   6911|   4.0|[17423, 13519, 69...|
|       481|   6971|   4.0|[17423, 13519, 69...|
|       481|   7511|   4.0|[17423, 13519, 69...|
|       481|   7524|   5.0|[17423, 13519, 69...|
|       481|   7544|   4.0|[17423, 13519, 69...|
|       481|   8596|   4.0|[17423, 13519, 69...|
|       481|   9617|   3.0|[17423, 13519, 69...|
|       481|  12232|

                                                                                

In [20]:
#finding users who have the same movies rated: 

all_ratings_from_user.printSchema()


root
 |-- CustomerId: integer (nullable = true)
 |-- MovieId: integer (nullable = true)
 |-- Rating: double (nullable = true)
 |-- AllMoviesRated: array (nullable = true)
 |    |-- element: integer (containsNull = true)



In [21]:
# is_even = lambda x: x == sampleusermovielist
# res = ratings_df.withColumn("arr_evens", filter(col("some_arr"), is_even))
# res.show()

In [22]:
# def movieinrated(currentmovielist,movielist): 
#     if movie in movielist: 
#         return movie
    
# movielist=['StarWars']
# movieinrated('StarWars',movielist)

### Collaborative Filtering

Collaborative filtering is a technique that can filter out items that a user might like on the basis of reactions by similar users. It is a method of making automatic predictions (filtering) about people's interests by searching a large group of people and finding smaller sets of users that have similar interests and tastes. It looks at the times they like and combines them to create a ranked list of suggestions. The basic idea is that if person A and person B share interests, than person A is likely to feel the same way about another interest, as opposed to how another random person feels about it. 

This is the method that will be used to create a Netflix movie recommendation system. 

### Building the Model 

In [23]:
training_df=ratings_df
training_df.show(3)

+-------+----------+------+
|MovieId|CustomerId|Rating|
+-------+----------+------+
|      8|   1395430|   2.0|
|      8|   1205593|   4.0|
|      8|   1488844|   4.0|
+-------+----------+------+
only showing top 3 rows



In [24]:
from pyspark.ml.recommendation import ALS
seed = 1800009193
# initializing the ALS learner
als = ALS()

# Now we set the parameters for the method
als.setMaxIter(5)\
   .setSeed(seed)\
   .setRegParam(0.1)\
    .setUserCol('CustomerId')\
    .setItemCol('MovieId')\
    .setRatingCol('Rating')\

# Now let's compute an evaluation metric for our test dataset
from pyspark.ml.evaluation import RegressionEvaluator

# Create an RMSE evaluator using the label and predicted columns
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="Rating", metricName="rmse")

tolerance = 0.03
ranks = [4, 8, 12]
errors = [0, 0, 0]
models = [0, 0, 0]
err = 0
min_error = float('inf')
best_rank = -1
for rank in ranks:
  # Set the rank here:
  als.rank
  # Create the model with these parameters.
  model = als.fit(training_df)
  # Run the model to create a prediction. Predict against the validation_df.
  predict_df = model.transform(test_ratings_df)
    
    
  # Remove NaN values from prediction (due to SPARK-14489)
  predicted_ratings_df = predict_df.filter(predict_df.prediction != float('nan'))

  # Run the previously created RMSE evaluator, reg_eval, on the predicted_ratings_df DataFrame
  error = reg_eval.evaluate(predicted_ratings_df)
  errors[err] = error
  models[err] = model
  print('For rank %s the RMSE is %s' % (rank, error))
  if error < min_error:
    min_error = error
    best_rank = err
  err += 1

als.setRank(ranks[best_rank])
print('The best model was trained with rank %s' % ranks[best_rank])
my_model = models[best_rank]

                                                                                

For rank 4 the RMSE is 0.8703085717830881


                                                                                

For rank 8 the RMSE is 0.8703085717830881




For rank 12 the RMSE is 0.8703085717830881
The best model was trained with rank 4


                                                                                

### Testing the Model 

In [25]:
predictions_df=my_model.transform(test_ratings_df)

In [34]:
predictions_df.show(5)

+-------+----------+------+----------+
|MovieId|CustomerId|Rating|prediction|
+-------+----------+------+----------+
|     28|   2358799|   3.0| 3.7091436|
|    156|    973051|   5.0| 3.9311593|
|    851|   1189060|   3.0| 3.5384197|
|   1100|   2376892|   2.0| 2.3329113|
|   1123|   1628484|   3.0|  3.362345|
+-------+----------+------+----------+
only showing top 5 rows



In [225]:
MSE=predictions_df.select(F.abs((F.col("Rating") - F.col("prediction"))).alias("MSE"))
print('The absolute mean error is:')
MSE.agg(F.avg(MSE.MSE).alias('average')).show()

The absolute mean error is:
+------------------+
|           average|
+------------------+
|0.6943449243463141|
+------------------+



In [32]:
print('The number of predictions before getting rid of nans: {}'.format(predictions_df.count()))
predictions_ratings_df=predictions_df.filter(predictions_df.prediction!= float('nan'))
print('The number of predictions after rid of nans: {}'.format(predictions_df.count()))

The number of predictions before getting rid of nans: 100477
The number of predictions after rid of nans: 100477


In [37]:
test_RSME=reg_eval.evaluate(predictions_ratings_df)
print('The model has a RSME on the test dataset of:',test_RSME)



The model has a RSME on the test dataset of: 0.8703085717830881


                                                                                

### Comparing the Model

Comparing the model to if we had just used the average rating of all users across all movies. 

In [50]:
#getting the average rating

avg_rating_df=ratings_df.agg(F.avg(ratings_df.Rating).alias('average'))
avg_rating=avg_rating_df.collect()[0][0]
print("The average rating for all movies in the training set is: {}".format(avg_rating))
test_w_avg_df=test_ratings_df.withColumn('prediction',F.lit(avg_rating))
test_w_avg_df.show(5)

#getting the RMSE
test_avg_RSME=reg_eval.evaluate(test_w_avg_df)
print('Using the avg. rating of the training data as the prediction for the test data had an error of {}'.format(test_avg_RSME))
print("So the model created was more accurate.")

                                                                                

The average rating for all movies in the training set is: 3.4811883572616287
+-------+----------+------+------------------+
|MovieId|CustomerId|Rating|        prediction|
+-------+----------+------+------------------+
|      8|   2149668|   3.0|3.4811883572616287|
|      8|   1089184|   3.0|3.4811883572616287|
|      8|   2465894|   3.0|3.4811883572616287|
|      8|    534508|   1.0|3.4811883572616287|
|      8|    992921|   4.0|3.4811883572616287|
+-------+----------+------+------------------+
only showing top 5 rows

Using the avg. rating of the training data as the prediction for the test data had an error of 1.0852531859495582
So the model created was more accurate.


Now what about comparing to the average rating per movie? 


In [65]:
test_avg_movie_rating_df=test_ratings_df.join(movie_names_with_avg_rating_df, test_ratings_df.MovieId==movie_names_with_avg_rating_df.movieId)

In [73]:
test_avg_movie_rating_df=test_avg_movie_rating_df.drop('title','count')


In [79]:
#getting rid of duplicate movieId columns
df_cols = test_avg_movie_rating_df.columns
df_cols=[x.lower() for x in df_cols]
# get index of the duplicate columns
duplicate_col_index = list(set([df_cols.index(c) for c in df_cols if df_cols.count(c) == 2]))

for i in duplicate_col_index:
    df_cols[i] = df_cols[i] + '_duplicated'

test_avg_movie_rating_df = test_avg_movie_rating_df.toDF(*df_cols)
cols_to_remove = [c for c in df_cols if '_duplicated' in c]

#renaming average column to prediction 
test_avg_movie_rating_df=test_avg_movie_rating_df.withColumnRenamed('average','prediction')
test_avg_movie_rating_df=test_avg_movie_rating_df.withColumnRenamed('rating','Rating')
test_avg_movie_rating_df.drop(*cols_to_remove).show()

                                                                                

+----------+------+-------+-----------------+
|customerid|Rating|movieid|       prediction|
+----------+------+-------+-----------------+
|   2648176|   5.0|   3033|              4.5|
|     70476|   4.0|  12293|4.464598134454594|
|   2616318|   5.0|  12293|4.464598134454594|
|   1434507|   4.0|  12293|4.464598134454594|
|    945992|   5.0|  12293|4.464598134454594|
|   2292257|   4.0|  12293|4.464598134454594|
|   2087152|   5.0|  12293|4.464598134454594|
|     44490|   4.0|  12293|4.464598134454594|
|    938714|   4.0|  12293|4.464598134454594|
|   2469788|   5.0|  12293|4.464598134454594|
|    627801|   4.0|  12293|4.464598134454594|
|   2401006|   5.0|  12293|4.464598134454594|
|   2172649|   5.0|  12293|4.464598134454594|
|   2304078|   5.0|  12293|4.464598134454594|
|   2087448|   5.0|  12293|4.464598134454594|
|   1211567|   5.0|  12293|4.464598134454594|
|   1033224|   5.0|  12293|4.464598134454594|
|    899799|   4.0|  12293|4.464598134454594|
|   2207031|   5.0|  12293|4.46459

In [84]:
#now to get the RMSE of the average rating per movie: 

test_avg_movie_rating_df=test_avg_movie_rating_df.filter(test_avg_movie_rating_df.prediction!= float('nan'))
avg_movie_rating_RSME=reg_eval.evaluate(test_avg_movie_rating_df)
print("The RSME when using a specific movie's average rating is: {}".format(avg_movie_rating_RSME))

                                                                                

The RSME when using a specific movie's average rating is: 1.0069899571932361


Again, the model created using ALS was more accurate than this method as well. 

### Predicting My Own Ratings

Now I will be adding 10 of my own ratings to the training data.

In [186]:
from pyspark.sql import Row
my_user_id=0

my_rated_movies=[
    (14364,0,5),
    (14953,0,4),
    (15500,0,5),
    (3364,0,4),
    (17088,0,3),
    (17324,0,4),
    (5837,0,4),
    (6287,0,4),
    (11064,0,5),
    (9875,0,1)]

my_ratings_df=sqlContext.createDataFrame(my_rated_movies,['MovieId','CustomerId','Rating'])
print('My movie ratings:')
my_ratings_df.show()

My movie ratings:
+-------+----------+------+
|MovieId|CustomerId|Rating|
+-------+----------+------+
|  14364|         0|     5|
|  14953|         0|     4|
|  15500|         0|     5|
|   3364|         0|     4|
|  17088|         0|     3|
|  17324|         0|     4|
|   5837|         0|     4|
|   6287|         0|     4|
|  11064|         0|     5|
|   9875|         0|     1|
+-------+----------+------+



In [187]:
#adding my movies to the training dataset

training_with_my_ratings_df=ratings_df.union(my_ratings_df)


In [188]:
training_with_my_ratings_df.filter(F.col('CustomerId')==0).show()

                                                                                

+-------+----------+------+
|MovieId|CustomerId|Rating|
+-------+----------+------+
|  14364|         0|   5.0|
|  14953|         0|   4.0|
|  15500|         0|   5.0|
|   3364|         0|   4.0|
|  17088|         0|   3.0|
|  17324|         0|   4.0|
|   5837|         0|   4.0|
|   6287|         0|   4.0|
|  11064|         0|   5.0|
|   9875|         0|   1.0|
+-------+----------+------+



                                                                                

#### Training the model again with my ratings as part of the training data

In [189]:
als.setPredictionCol("prediction")\
   .setMaxIter(5)\
   .setSeed(seed)\
    .setUserCol('CustomerId')\
    .setItemCol('MovieId')\
    .setRatingCol('Rating')\

my_ratings_model = als.fit(training_with_my_ratings_df)


                                                                                

#### Making predictions with the new model 

In [190]:
my_predictions_df=my_ratings_model.transform(test_ratings_df)

In [191]:
my_predictions_df.show()

+-------+----------+------+----------+
|MovieId|CustomerId|Rating|prediction|
+-------+----------+------+----------+
|     28|   2358799|   3.0| 3.7327328|
|    156|    973051|   5.0| 3.9911323|
|    851|   1189060|   3.0| 3.6070168|
|   1100|   2376892|   2.0| 2.3124597|
|   1123|   1628484|   3.0| 3.3631506|
|   1289|   1552084|   3.0| 3.3827796|
|   1744|   2376892|   5.0| 3.6924365|
|   1851|    675056|   4.0| 3.4010458|
|   1983|   2376892|   4.0| 3.2269692|
|   1983|   2629660|   3.0| 2.9360857|
|   2290|   1909175|   5.0| 4.3345714|
|   2658|   1552084|   5.0| 3.3752797|
|   2675|   2629660|   3.0| 2.2776356|
|   2808|    128389|   5.0|  3.421668|
|   3290|   2088272|   4.0| 3.9819365|
|   3355|   2311863|   4.0| 3.1503255|
|   3538|    279120|   3.0| 3.1891806|
|   3538|   1553158|   3.0| 3.0417595|
|   3541|   1628484|   4.0| 3.4401903|
|   3587|   2376892|   4.0|  3.443128|
+-------+----------+------+----------+
only showing top 20 rows



#### Checking the RSME for new model with my ratings

In [192]:
test_RSME_my_ratings=reg_eval.evaluate(my_predictions_df)
print('The error of the model using my rankings is: {}'.format(test_RSME_my_ratings))



The error of the model using my rankings is: 0.8708227066383376


                                                                                

#### Now I will filter out any of the movies that I already manually rated to get my top 25 recommendations

In [196]:
my_rated_movie_ids = [x[0] for x in my_rated_movies]
not_rated_df = movies_df.filter(~movies_df['ID'].isin(my_rated_movie_ids))
my_unrated_movies_df = not_rated_df.withColumn('CustomerId',F.lit(0)).withColumnRenamed('ID','MovieId').drop('title','release_year').withColumn('Rating',F.lit(0))
raw_predicted_ratings_df = my_ratings_model.transform(my_unrated_movies_df)
my_predicted_ratings_df=raw_predicted_ratings_df.filter(raw_predicted_ratings_df.prediction != float('nan'))
my_predicted_ratings_df=my_predicted_ratings_df.sort(F.col('prediction').desc())

In [197]:
my_predicted_ratings_df.join(movies_df,my_predicted_ratings_df.MovieId==movies_df.ID).drop('release_year','rating','MovieId').show(25,truncate=False)

+----------+----------+-----+-----------------------------------------------------+
|CustomerId|prediction|ID   |title                                                |
+----------+----------+-----+-----------------------------------------------------+
|0         |3.6182053 |14283|The Best of Friends: Vol. 3                          |
|0         |3.6006413 |1256 |The Best of Friends: Vol. 4                          |
|0         |3.508395  |15582|Sweet Home Alabama                                   |
|0         |3.4511468 |12952|The God Who Wasn't There                             |
|0         |3.3774464 |13887|The Princess Diaries 2: Royal Engagement             |
|0         |3.3589683 |8692 |Shania Twain: Up Close and Personal                  |
|0         |3.3533468 |1213 |VeggieTales: Madame Blueberry                        |
|0         |3.3237321 |13651|Air Force One                                        |
|0         |3.3049304 |9617 |Stepmom                                        

These suggestions are pretty good. I've seen a decent amount of the titles it is recommending and I do like them. Most specifically 13 going on 30, Sweet Home Alabama, The Princess Diaries, etc. 