# Netflix Movie Recommendation

By:
- Rakesh Reddy Poddutoori
- Nakul Reddy Nimmala
- Navya Gorantla

In this project, we build a Netflix movie recommender. The goal is to predict, as accurately as possible, how users will rate Netflix movies based on their preferences and past ratings.

## Initialization
We first import the relevant libraries and define the paths to the data files.
We then read each file into a DataFrame of parsed lines.
Our source data comes from the Netflix dataset: the ratings file used here contains just over 100,000 ratings, and the titles file lists more than 17,000 movies. It is part of the user data Netflix released for the Netflix Prize. For each movie, the dataset provides a movie ID, the year of release, and the title; each rating links a user ID and a movie ID to a rating value.
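Spark's CSV reader splits every line on commas, so if a movie title itself contains a comma, a three-column schema may not capture the full title. The plain-Python sketch below shows the parsing we want for a single line of `movie_titles.txt`, assuming each line has the layout `ID,year,title` implied by `movies_df_schema`. `parse_movie_line` is a hypothetical helper, not part of the notebook, and treating a non-numeric year as missing is an assumption about the raw file.

```python
def parse_movie_line(line):
    """Split an 'ID,year,title' line into (int, int or None, str).

    maxsplit=2 stops after the first two commas, so any commas inside
    the title stay part of the title.
    """
    movie_id, year, title = line.rstrip("\n").split(",", 2)
    # Assumption: a non-numeric year field means the year is unknown.
    parsed_year = int(year) if year.isdigit() else None
    return int(movie_id), parsed_year, title


# First row of the Movies output shown further below.
print(parse_movie_line("2,2004,Isle of Man TT 2004 Review"))
# Hypothetical line whose title contains a comma.
print(parse_movie_line("99999,1966,The Good, the Bad and the Ugly"))
```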

In [2]:
import unittest

# Path to our S3 bucket; it already ends with '/', so the file names below do not start with one
dbfs_dir = 's3://dsci6007movienetflix/Netflix/'
ratings_filename = dbfs_dir + 'TestingRatings.txt'  # ratings: our source dataset
movies_filename = dbfs_dir + 'movie_titles.txt'     # movie titles, to support the ratings

In this step, we import the PySpark SQL types and define an explicit schema for each file, so that Spark reads every column with the right name and type instead of inferring them on our EMR cluster.

In [3]:
from pyspark.sql.types import *

ratings_df_schema = StructType(
  [StructField('userId', IntegerType()),
   StructField('movieId', IntegerType()),
   StructField('rating', DoubleType())]
)
movies_df_schema = StructType(
  [StructField('ID', IntegerType()),
   StructField('year', IntegerType()),
   StructField('title', StringType()),]
)

### Load and Cache

By now, the datasets should be hosted on S3. We will access this data many times, so rather than reading it from S3 over and over again, we cache both the movies DataFrame and the ratings DataFrame in memory.

In [4]:
# Read both files with the explicit schemas defined above.
# header=True makes Spark treat the first line of each file as column names, not data.
raw_ratings_df = sqlContext.read.format('csv').options(header=True, inferSchema=False).schema(ratings_df_schema).load(ratings_filename)
# The schemas already contain only the columns we need, so nothing has to be dropped or renamed.
ratings_df = raw_ratings_df

raw_movies_df = sqlContext.read.format('csv').options(header=True, inferSchema=False).schema(movies_df_schema).load(movies_filename)
movies_df = raw_movies_df

ratings_df.cache()
movies_df.cache()

assert ratings_df.is_cached
assert movies_df.is_cached

raw_ratings_count = raw_ratings_df.count()
ratings_count = ratings_df.count()
raw_movies_count = raw_movies_df.count()
movies_count = movies_df.count()

print('There are %s ratings and %s movies in the datasets' % (ratings_count, movies_count))
print('Ratings:')
ratings_df.show(3)
print('Movies:')
movies_df.show(3, truncate=False)

assert raw_ratings_count == ratings_count
assert raw_movies_count == movies_count


There are 100477 ratings and 17769 movies in the datasets
Ratings:
+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     8|2149668|   3.0|
|     8|1089184|   3.0|
|     8|2465894|   3.0|
+------+-------+------+
only showing top 3 rows

Movies:
+---+----+----------------------------+
|ID |year|title                       |
+---+----+----------------------------+
|2  |2004|Isle of Man TT 2004 Review  |
|3  |1997|Character                   |
|4  |1994|Paula Abdul's Get Up & Dance|
+---+----+----------------------------+
only showing top 3 rows



Next, let's do a quick verification of the data by looking at the two DataFrames. In this notebook environment, `display()` prints each DataFrame's schema (column names and types) rather than its rows.

In [5]:
display(movies_df)

DataFrame[ID: int, year: int, title: string]

In [6]:
display(ratings_df)

DataFrame[userId: int, movieId: int, rating: double]
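The schemas look as expected, but a value-range check can also confirm that each column holds what its name says. In the Netflix Prize data, movie IDs run from 1 to 17,770, so a `movieId` above that limit suggests that the columns were read in a different order from the one `ratings_df_schema` assumes. The plain-Python sketch below applies this check to the three `ratings_df` rows printed earlier; `out_of_range_movie_ids` is a hypothetical helper, and in Spark the same check would be a `filter` on the `movieId` column.

```python
# Assumption: Netflix Prize movie IDs run from 1 to 17,770.
MAX_MOVIE_ID = 17770


def out_of_range_movie_ids(rows, max_movie_id=MAX_MOVIE_ID):
    """Return the sorted movieId values that cannot be valid movie IDs.

    rows: iterable of (userId, movieId, rating) tuples
    """
    return sorted({movie_id for _, movie_id, _ in rows
                   if not 1 <= movie_id <= max_movie_id})


# The first three rows of ratings_df as printed above: (userId, movieId, rating).
sample = [(8, 2149668, 3.0), (8, 1089184, 3.0), (8, 2465894, 3.0)]
print(out_of_range_movie_ids(sample))
```

All three sampled values fail the check, so it may be worth confirming the column order of `TestingRatings.txt` (and whether its first line really is a header, given `header=True`) before relying on the joins with `movies_df` later on.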

# Basic Recommendations

One simple way to recommend films is to always suggest the films with the highest average rating.
In this section, we use Spark to find the title, number of ratings, and average rating of the 20 films with the highest average rating and at least 10 reviews.
We filter out movies with fewer reviews because a high average based on only a few reviews may not reflect what most viewers think.

### (a) Movies with Highest Average Ratings

Let's determine the movies with the highest average ratings.
The steps are:
1. Recall that `ratings_df` contains three columns:
    - the ID of the user who rated the film,
    - the ID of the movie being rated,
    - and the rating.
   First, we transform `ratings_df` into a second DataFrame, `movie_ids_with_avg_ratings_df`, with the following columns:
    - the movie ID
    - the number of ratings for the movie
    - the average of all the movie's ratings
2. Next, we join `movie_ids_with_avg_ratings_df` with `movies_df` to create another DataFrame, `movie_names_with_avg_ratings_df`, which adds the movie title to each row. `movie_names_with_avg_ratings_df`
   contains these columns:
    - the movie ID
    - the movie title
    - the number of ratings for the movie
    - the average of all the movie's ratings
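The two steps can be mimicked in plain Python on a few hypothetical triples, which makes the per-movie grouping and the inner join with the titles explicit. `average_ratings` is a hypothetical helper written for illustration; in the notebook, Spark does the same work with `groupBy().agg()` and `join()`.

```python
from collections import defaultdict


def average_ratings(ratings, titles):
    """Mimic the two Spark steps: aggregate per movie, then attach the title.

    ratings: iterable of (user_id, movie_id, rating) tuples
    titles:  dict mapping movie_id -> title
    Returns (movie_id, title, count, average) rows, highest average first.
    """
    totals = defaultdict(lambda: [0, 0.0])  # movie_id -> [count, sum of ratings]
    for _, movie_id, rating in ratings:
        totals[movie_id][0] += 1
        totals[movie_id][1] += rating
    rows = [(movie_id, titles[movie_id], count, total / count)
            for movie_id, (count, total) in totals.items()
            if movie_id in titles]  # inner join: drop movies without a title
    return sorted(rows, key=lambda row: row[3], reverse=True)


# Hypothetical IDs and titles, not taken from the Netflix files.
ratings = [(1, 10, 4.0), (2, 10, 5.0), (1, 20, 3.0), (3, 30, 2.0)]
titles = {10: "Movie A", 20: "Movie B"}
print(average_ratings(ratings, titles))
```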


In [7]:
from pyspark.sql import functions as F
# Group ratings_df by movie and compute each movie's number of ratings and average rating
movie_ids_with_avg_ratings_df = ratings_df.groupBy('movieId').agg(F.count(ratings_df.rating).alias("count"), F.avg(ratings_df.rating).alias("average"))
print('movie_ids_with_avg_ratings_df:')
movie_ids_with_avg_ratings_df.show(3, truncate=False)
# Note: movie_names_df is a temporary variable, used only to separate the steps necessary
# to create the movie_names_with_avg_ratings_df DataFrame.
movie_names_df = movie_ids_with_avg_ratings_df.join(movies_df,movies_df.ID==movie_ids_with_avg_ratings_df.movieId)
movie_names_with_avg_ratings_df = movie_names_df.select('average','title','count','movieId').sort('average',ascending=False)

print('movie_names_with_avg_ratings_df:')
movie_names_with_avg_ratings_df.show(3, truncate=False)

movie_ids_with_avg_ratings_df:



+-------+-----+-------+
|movieId|count|average|
+-------+-----+-------+
|2358799|2    |3.5    |
|973051 |4    |4.25   |
|1189060|3    |3.0    |
+-------+-----+-------+
only showing top 3 rows

movie_names_with_avg_ratings_df:
+-------+-------------------+-----+-------+
|average|title              |count|movieId|
+-------+-------------------+-----+-------+
|5.0    |Chaplin: The Movie |4    |8815   |
|5.0    |Faith of My Fathers|1    |11215  |
|5.0    |Fat City           |1    |6699   |
+-------+-------------------+-----+-------+
only showing top 3 rows



In [38]:
# A bare TestCase instance gives us assertion helpers such as assertTrue outside a test class
Test = unittest.TestCase()

### (b) Movies with Highest Average Ratings and at Least 10 Reviews

Now that we have a DataFrame of the movies with the highest average ratings, we can use Spark to find the 20 movies with the highest average ratings and at least 10 reviews.
We add a single DataFrame transformation, a `filter` on the `count` column, to keep only movies rated by at least 10 people.

In [8]:
movies_with_500_ratings_or_more = movie_names_with_avg_ratings_df.filter(movie_names_with_avg_ratings_df['count']>=10).sort('average',ascending=False)
print('Movies with highest ratings:')
movies_with_500_ratings_or_more.show(20, truncate=False)

Movies with highest ratings:
+------------------+------------------------------------+-----+-------+
|average           |title                               |count|movieId|
+------------------+------------------------------------+-----+-------+
|3.8               |Un Chien Andalou                    |10   |5980   |
|3.7               |Into the Sun                        |10   |16273  |
|3.3636363636363638|Street Fighter Alpha                |11   |11796  |
|3.272727272727273 |Madonna: The Drowned World Tour 2001|11   |12812  |
|2.9375            |Crouching Tiger                     |16   |16272  |
|2.7               |Mother's Day                        |10   |1333   |
|2.2857142857142856|In Dreams                           |14   |3321   |
+------------------+------------------------------------+-----+-------+



Setting a minimum number of reviews is one way to improve the quality of these recommendations, but there are many others.
For example, we could weight each movie's average rating by its number of ratings, so that averages based on only a few ratings count for less.
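One common way to weight by the number of ratings is a damped (sometimes called Bayesian) average, which pulls each movie's average toward the overall mean rating, more strongly the fewer ratings it has. This is not part of the notebook's method: the sketch below uses a hypothetical `damped_average` helper, a `prior_weight` of 10 chosen only for illustration, and 3.5 as a rounded overall mean (the training-set average computed later is about 3.48). The two (count, average) pairs come from the tables above.

```python
def damped_average(count, average, global_mean, prior_weight=10):
    """Shrink a movie's average rating toward the global mean.

    Acts as if every movie had `prior_weight` extra ratings equal to
    `global_mean`, so averages based on few ratings move furthest.
    """
    return (prior_weight * global_mean + count * average) / (prior_weight + count)


fat_city = damped_average(1, 5.0, 3.5)           # 1 rating averaging 5.0
un_chien_andalou = damped_average(10, 3.8, 3.5)  # 10 ratings averaging 3.8
print(round(fat_city, 3), round(un_chien_andalou, 3))  # 3.636 3.65
```

With damping, *Un Chien Andalou* (3.65) ranks above *Fat City* (about 3.64), whose perfect 5.0 rests on a single rating.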

## Collaborative Filtering

For movie recommendations, we start with a matrix whose entries are users' movie ratings.
Each row represents a movie, and each column represents a user.
Because not every user has rated every movie, many entries of this matrix are unknown: for each user, we only have ratings for some of the movies. Collaborative filtering lets us estimate the missing entries.
The idea behind collaborative filtering is to factorize the ratings matrix as the product of two matrices: one that describes the attributes of each user and another that describes the properties of each movie.

We want to choose these two matrices so that the error on the user/movie pairs whose ratings we know is as small as possible.
The alternating least squares (ALS) algorithm does this by first filling the users matrix with random values and then optimizing the movies matrix to minimize the error while the users matrix is held fixed.
It then optimizes the users matrix while keeping the movies matrix fixed.
The "alternating" in the name refers to the fact that the algorithm switches back and forth between the two matrices.
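In symbols, one standard way to write this objective: let $K$ be the set of (movie, user) pairs with a known rating $r_{iu}$, let $y_i \in \mathbb{R}^k$ be the factor vector of movie $i$ (a row of the movies matrix $Y$), let $x_u \in \mathbb{R}^k$ be the factor vector of user $u$ (a row of the users matrix $X$), and let $\lambda$ be the regularization parameter, so that $R \approx Y X^\top$. This is a sketch of the general formulation rather than Spark's exact loss: Spark's documentation notes that it scales $\lambda$ by the number of ratings involved in each least-squares problem (an approach it calls ALS-WR).

$$
\min_{X,\,Y} \sum_{(i,u) \in K} \left( r_{iu} - y_i^\top x_u \right)^2 + \lambda \left( \sum_i \lVert y_i \rVert^2 + \sum_u \lVert x_u \rVert^2 \right)
$$

With the user factors held fixed, each movie's factors solve an ordinary ridge-regression problem over the set $\Omega_i$ of users who rated movie $i$ (the user step is symmetric):

$$
y_i = \left( \sum_{u \in \Omega_i} x_u x_u^\top + \lambda I_k \right)^{-1} \sum_{u \in \Omega_i} r_{iu}\, x_u
$$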

In short, with the user factors held fixed, we use the known ratings to find the best movie factors; then, with those movie factors held fixed, we "alternate" and find the best user factors.
Spark repeats this for a fixed number of iterations, set by `maxIter`.
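To make the alternation concrete, here is a toy, pure-Python sketch of ALS on a handful of hypothetical (user, movie, rating) triples. It is not Spark's implementation: Spark solves the same kind of per-row least-squares problems in parallel, with its own initialization and regularization scaling, whereas this sketch uses a fixed, unscaled regularization term. `als`, `update`, and `solve` are hypothetical helpers written for illustration. As described above, it starts from random user factors, solves for the movie factors with the users held fixed, then solves for the user factors with the movies held fixed, and repeats.

```python
import random


def solve(a, b):
    """Solve the square linear system a @ x = b with Gaussian elimination."""
    n = len(b)
    m = [row[:] + [b[i]] for i, row in enumerate(a)]  # augmented matrix [a | b]
    for col in range(n):
        # Partial pivoting: move the row with the largest entry in this column up.
        pivot = max(range(col, n), key=lambda r: abs(m[r][col]))
        m[col], m[pivot] = m[pivot], m[col]
        for r in range(col + 1, n):
            factor = m[r][col] / m[col][col]
            for c in range(col, n + 1):
                m[r][c] -= factor * m[col][c]
    x = [0.0] * n
    for r in range(n - 1, -1, -1):  # back substitution
        x[r] = (m[r][n] - sum(m[r][c] * x[c] for c in range(r + 1, n))) / m[r][r]
    return x


def update(fixed, entries_by_key, rank, reg):
    """Re-solve every factor vector on one side while the other side is fixed.

    Each vector w solves (sum v v^T + reg * I) w = sum rating * v, a ridge
    regression over the ratings that involve it.
    """
    new = {}
    for key, entries in entries_by_key.items():
        a = [[reg if i == j else 0.0 for j in range(rank)] for i in range(rank)]
        b = [0.0] * rank
        for other, rating in entries:
            v = fixed[other]
            for i in range(rank):
                b[i] += rating * v[i]
                for j in range(rank):
                    a[i][j] += v[i] * v[j]
        new[key] = solve(a, b)
    return new


def als(ratings, rank=2, reg=0.1, iterations=10, seed=0):
    """Toy ALS on (user, movie, rating) triples; returns (user_f, movie_f)."""
    rng = random.Random(seed)
    by_user, by_movie = {}, {}
    for user, movie, rating in ratings:
        by_user.setdefault(user, []).append((movie, rating))
        by_movie.setdefault(movie, []).append((user, rating))
    # Start from random user factors.
    user_f = {u: [rng.random() for _ in range(rank)] for u in by_user}
    movie_f = {}
    for _ in range(iterations):
        movie_f = update(user_f, by_movie, rank, reg)  # users held fixed
        user_f = update(movie_f, by_user, rank, reg)   # movies held fixed
    return user_f, movie_f


# Hypothetical toy ratings: (user, movie, rating)
ratings = [(1, 10, 5.0), (1, 20, 1.0), (2, 10, 4.0), (2, 20, 2.0), (3, 10, 5.0)]
user_f, movie_f = als(ratings)
for user, movie, rating in ratings:
    predicted = sum(x * y for x, y in zip(user_f[user], movie_f[movie]))
    print(user, movie, rating, round(predicted, 2))
```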


### (a) Creating a Training Set

Before we apply machine learning algorithms, we need to split the `ratings_df` dataset into three pieces:
* A training set (DataFrame), which we will use to train models
* A validation set (DataFrame), which we will use to choose the best model
* A test set (DataFrame), which we will use to estimate how well the chosen model performs on unseen data

To split the dataset randomly into these groups, we use PySpark's `DataFrame.randomSplit()`, as shown below.
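`randomSplit()` normalizes the weights and then assigns each row to one of the splits at random, so the split sizes are only approximately 60/20/20; that is why the counts below are not exact. The plain-Python sketch below illustrates the idea with a hypothetical `random_split` helper; it is not Spark's implementation and will not reproduce Spark's split for the same seed.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to one split, with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.6, 0.8, 1.0] for weights [0.6, 0.2, 0.2].
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any x left over by rounding in the bounds.
            if x < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train_rows, val_rows, test_rows = random_split(range(100477), [0.60, 0.20, 0.20], seed=1800009193)
print(len(train_rows), len(val_rows), len(test_rows))  # roughly 60,000 / 20,000 / 20,000
```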

In [9]:
# We'll hold out 60% for training, 20% of our data for validation, and leave 20% for testing
seed = 1800009193
(split_60_df, split_a_20_df, split_b_20_df) = ratings_df.randomSplit([0.60,0.20,0.20],seed)
# Let's cache these datasets for performance
training_df = split_60_df.cache()
validation_df = split_a_20_df.cache()
test_df = split_b_20_df.cache()
print('Training: {0}, validation: {1}, test: {2}\n'.format(
  training_df.count(), validation_df.count(), test_df.count())
)
training_df.show(3)
validation_df.show(3)
test_df.show(3)

Training: 60354, validation: 20041, test: 20082

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     8|   9660|   3.0|
|     8| 112790|   3.0|
|     8| 155279|   3.0|
+------+-------+------+
only showing top 3 rows

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     8| 152955|   3.0|
|     8| 394189|   2.0|
|     8| 448155|   4.0|
+------+-------+------+
only showing top 3 rows

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     8| 254671|   4.0|
|     8| 264988|   3.0|
|     8| 500322|   3.0|
+------+-------+------+
only showing top 3 rows



After splitting the dataset, the training set has 60,354 entries, and the validation and test sets each have about 20,000 entries (20,041 and 20,082).

### (b) Alternating Least Squares

We'll use the Apache Spark ML implementation of Alternating Least Squares, `pyspark.ml.recommendation.ALS`. ALS takes a training dataset and a set of parameters that control how the model is built.
We'll train several ALS models to find good parameter values, then pick the best model and use its parameters in the rest of this notebook.
The process we will use for determining the best model is as follows:

1. Pick a set of model parameters. The most important parameter is the rank: the number of latent factors used to describe each user and each movie, which is the shared inner dimension of the users matrix and the movies matrix. In general, a lower rank means higher error on the training dataset, while a higher rank may lead to overfitting.

2. Set the appropriate parameters on the `ALS` object:
    * The "User" column is set to our `userId` DataFrame column.
    * The "Item" column is set to our `movieId` DataFrame column.
    * The "Rating" column is set to our `rating` DataFrame column.
    * We use a regularization parameter of 0.1 and 5 iterations.

3. For each candidate rank (4, 8, and 12), fit a model on `training_df`, predict ratings for `validation_df`, compute the RMSE, and keep the model with the lowest validation RMSE.

In [10]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import functions as F

# Initialize the ALS learner and set the parameters shared by every candidate model
als = ALS()
als.setMaxIter(5)\
   .setSeed(seed)\
   .setRegParam(0.1)\
   .setUserCol("userId")\
   .setItemCol("movieId")\
   .setRatingCol("rating")

# RMSE evaluator comparing the predicted column with the true ratings
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")

tolerance = 0.03
ranks = [4, 8, 12]
errors = []
models = []
min_error = float('inf')
best_rank = -1  # index into ranks of the best model found so far
for index, rank in enumerate(ranks):
  als.setRank(rank)
  # Train on the training set, then predict against the validation set
  model = als.fit(training_df)
  predict_df = model.transform(validation_df)

  # Users or movies absent from training_df get NaN predictions; drop them (see SPARK-14489)
  predicted_ratings_df = predict_df.filter(~F.isnan(predict_df.prediction))

  error = reg_eval.evaluate(predicted_ratings_df)
  errors.append(error)
  models.append(model)
  print('For rank %s the RMSE is %s' % (rank, error))
  if error < min_error:
    min_error = error
    best_rank = index

als.setRank(ranks[best_rank])
print('The best model was trained with rank %s' % ranks[best_rank])
my_model = models[best_rank]


For rank 4 the RMSE is 4.22533894559318



For rank 8 the RMSE is 2.8321088010878492




For rank 12 the RMSE is 2.7424852680717313
The best model was trained with rank 12



### (c) Testing Your Model

So far, we've selected the best model using `training_df` and `validation_df`.
We can't use these two datasets to assess how well the model generalizes, because we used them to choose the model; an estimate based on them would be optimistically biased.
Instead, we use `test_df` to see how good our model is.
We apply the model trained with the best rank from step (b) to `test_df` and compute the RMSE.

In [11]:
predict_df = my_model.transform(test_df)

# Remove NaN values from prediction (due to SPARK-14489)
predicted_test_df = predict_df.filter(predict_df.prediction != float('nan'))

# Run the previously created RMSE evaluator, reg_eval, on the predicted_test_df DataFrame
test_RMSE = reg_eval.evaluate(predicted_test_df)

print('The model had a RMSE on the test set of {0}'.format(test_RMSE))



The model had a RMSE on the test set of 2.7316792063322315



In [16]:
predicted_test_df.show()

+------+-------+------+-----------+
|userId|movieId|rating| prediction|
+------+-------+------+-----------+
|  2366| 124342|   4.0| 0.17026067|
|  2366| 298900|   2.0|   2.295404|
|  2366| 819822|   3.0|  1.4053096|
|  2366| 934835|   4.0| 0.95326215|
|  2366| 962019|   1.0|   2.202037|
|  2366|1036350|   4.0|  3.8729734|
|  2366|1059927|   4.0|-0.30296922|
|  2366|1082263|   3.0|  1.7261463|
|  2366|1269998|   1.0|  0.6576985|
|  2366|1386053|   3.0|  1.4028765|
|  2366|1417705|   1.0|  1.3606554|
|  2366|1979447|   3.0|  2.8431706|
| 11317| 828344|   2.0|  0.7400894|
| 11317|1057992|   2.0|  1.4427508|
| 11317|1258876|   3.0|  2.8689241|
| 11317|1903604|   3.0|  0.8837402|
| 11317|2281080|   3.0|  2.1490357|
|  4190|1874239|   4.0|  2.0246134|
|  9517|  78404|   4.0|-0.79945314|
|  9517| 138649|   1.0| 0.41861713|
+------+-------+------+-----------+
only showing top 20 rows



In [None]:
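# TEST Testing our Model (c)
# The test RMSE should be close to the best validation RMSE from step (b); a large gap
# would suggest that the rank was tuned too closely to the validation set.
Test.assertTrue(abs(test_RMSE - min_error) < tolerance,
                'test RMSE {0:.11f} is far from validation RMSE {1:.11f}'.format(test_RMSE, min_error))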

### (d) Comparing our Model

The RMSE above measures how far the model's predicted ratings are from the actual ratings in the test set.
To judge whether that number is good, we compare it with a simple baseline: predict every rating in the test set as the average of the training set's ratings, and compute the RMSE of those predictions.
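`RegressionEvaluator` with `metricName="rmse"` reports the root-mean-square error: the square root of the mean squared difference between actual and predicted ratings. The plain-Python sketch below spells out that formula and the mean-rating baseline computed in the next cell; `rmse` and `mean_baseline_rmse` are hypothetical helpers, and the numbers are toy values.

```python
import math


def rmse(actual, predicted):
    """Root-mean-square error between two equal-length, non-empty sequences."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("need two non-empty sequences of equal length")
    squared = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    return math.sqrt(squared / len(actual))


def mean_baseline_rmse(train_ratings, test_ratings):
    """RMSE when every test rating is predicted as the training-set mean."""
    mean = sum(train_ratings) / len(train_ratings)
    return rmse(test_ratings, [mean] * len(test_ratings))


print(rmse([4.0, 2.0], [4.0, 4.0]))                     # sqrt(2), about 1.414
print(mean_baseline_rmse([3.0, 4.0, 5.0], [4.0, 2.0]))  # training mean is 4.0, so also sqrt(2)
```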

In [17]:
# Compute the average rating in the training set (a single float)
avg_rating = training_df.agg(F.avg("rating")).first()[0]

print('The average rating for movies in the training set is {0}'.format(avg_rating))

# Add a column that predicts the training average for every test rating
test_for_avg_df = test_df.withColumn('prediction', F.lit(avg_rating))

# Run the previously created RMSE evaluator, reg_eval, on the test_for_avg_df DataFrame
test_avg_RMSE = reg_eval.evaluate(test_for_avg_df)

print("The RMSE on the average set is {0}".format(test_avg_RMSE))




The average rating for movies in the training set is 3.4815753719720317
The RMSE on the average set is 1.0924531185162887


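With these settings, the ALS model's test RMSE (about 2.73) is much higher than this baseline's (about 1.09), so the model does not yet beat simply predicting the average rating. Still, we now have code that predicts how users will rate movies.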

## Predictions for Ourselves

### (a) Your Movie Ratings

In [18]:
print('Most rated movies:')
print('(average rating, movie name, number of reviews, movie ID)')
display(movies_with_500_ratings_or_more.orderBy(movies_with_500_ratings_or_more['average'].desc()).take(50))

Most rated movies:
(average rating, movie name, number of reviews, movie ID)


[Row(average=3.8, title='Un Chien Andalou', count=10, movieId=5980),
 Row(average=3.7, title='Into the Sun', count=10, movieId=16273),
 Row(average=3.3636363636363638, title='Street Fighter Alpha', count=11, movieId=11796),
 Row(average=3.272727272727273, title='Madonna: The Drowned World Tour 2001', count=11, movieId=12812),
 Row(average=2.9375, title='Crouching Tiger', count=16, movieId=16272),
 Row(average=2.7, title="Mother's Day", count=10, movieId=1333),
 Row(average=2.2857142857142856, title='In Dreams', count=14, movieId=3321)]

User ID 0 is unassigned, so we use it for your ratings; the variable `my_user_id` is set to 0 below. Next, create a new DataFrame called `my_ratings_df` with your ratings for at least 10 movies. Each entry should be formatted as `(my_user_id, movieId, rating)`. As in the original dataset, ratings should be between 1 and 5 (inclusive). If you have not seen at least 10 of the movies listed above (the list has only 7), you can lower the review threshold to list more movies, or guess how you would rate movies you have not seen.
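Since each movie should be rated at most once and every rating must fall between 1 and 5, a quick check of the tuples before calling `createDataFrame` can catch mistakes early. This is a plain-Python sketch; `check_my_ratings` is a hypothetical helper, and the example list is hypothetical too (its movie IDs come from the table above, with one duplicated on purpose).

```python
def check_my_ratings(rated, min_count=10):
    """Validate (user_id, movie_id, rating) tuples before building a DataFrame.

    Returns a list of problems; an empty list means the tuples look usable.
    """
    problems = []
    seen = set()
    for _, movie_id, rating in rated:
        if not 1 <= rating <= 5:
            problems.append(f"movie {movie_id}: rating {rating} outside 1-5")
        if movie_id in seen:
            problems.append(f"movie {movie_id}: rated more than once")
        seen.add(movie_id)
    if len(seen) < min_count:
        problems.append(f"only {len(seen)} distinct movies rated (need {min_count})")
    return problems


example = [(0, 5980, 4), (0, 16273, 3), (0, 5980, 5)]
for problem in check_my_ratings(example):
    print(problem)
```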

In [19]:
from pyspark.sql import Row
my_user_id = 0

# Each entry is (my_user_id, movie ID, your rating). The movie ID is the movieId field of the
# rows listed above; a common error is to use the number of ratings as the movie ID.
# For example, to give "Un Chien Andalou" (movieId 5980) a rating of 4, you would add:
#   (my_user_id, 5980, 4),
my_rated_movies = [
    (my_user_id, 171011, 4),
    (my_user_id, 905, 3),
    (my_user_id, 2329, 3),
    (my_user_id, 26082, 5),
    (my_user_id, 26082, 4),
    (my_user_id, 1234, 3),
    (my_user_id, 1217, 5),
    (my_user_id, 1136, 2),
    (my_user_id, 5971, 4),
    (my_user_id, 2571, 3),
]

my_ratings_df = sqlContext.createDataFrame(my_rated_movies, ['userId','movieId','rating'])
print('My movie ratings:')
my_ratings_df.show(10)

My movie ratings:
+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     0| 171011|     4|
|     0|    905|     3|
|     0|   2329|     3|
|     0|  26082|     5|
|     0|  26082|     4|
|     0|   1234|     3|
|     0|   1217|     5|
|     0|   1136|     2|
|     0|   5971|     4|
|     0|   2571|     3|
+------+-------+------+



### (b) Add Your Ratings to the Training Dataset
Now that we have our own ratings, we add them to the training dataset so that the model we train takes our preferences into account.
`unionAll()` (an alias of `union()`) does not join the two DataFrames; it appends the rows of one to the other, matching columns by position. This gives a new training dataset that contains both the original training data and our ratings.

In [20]:
training_with_my_ratings_df = training_df.unionAll(my_ratings_df)

print ('The training dataset now has %s more entries than the original training dataset' %
       (training_with_my_ratings_df.count() - training_df.count()))
assert (training_with_my_ratings_df.count() - training_df.count()) == my_ratings_df.count()

The training dataset now has 10 more entries than the original training dataset


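### (c) Train a Model with Your Ratings

We retrain ALS on `training_with_my_ratings_df`. This cell sets the rank to 8 rather than reusing the best rank (12) selected in step (b).
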
In [21]:
# Reset the parameters for the ALS object.
als.setPredictionCol("prediction")\
   .setMaxIter(5)\
   .setSeed(seed)\
   .setUserCol("userId")\
   .setItemCol("movieId")\
   .setRatingCol("rating")\
   .setRank(8)

# Create the model with these parameters.
my_ratings_model = als.fit(training_with_my_ratings_df)

### (d) Check RMSE for the New Model with Your Ratings

Compute the RMSE for this new model on the test set.

In [22]:
my_predict_df = my_ratings_model.transform(test_df)

# Remove NaN values from prediction (due to SPARK-14489)
predicted_test_my_ratings_df = my_predict_df.filter(my_predict_df.prediction != float('nan'))

# Run the previously created RMSE evaluator, reg_eval, on the predicted_test_my_ratings_df DataFrame
test_RMSE_my_ratings = reg_eval.evaluate(predicted_test_my_ratings_df)
print('The model had a RMSE on the test set of {0}'.format(test_RMSE_my_ratings))



The model had a RMSE on the test set of 2.840537106683272



### (e) Predict the Ratings

So far, we have only computed the error of the model. Next, let's predict the ratings we would give to the movies we have not yet rated.
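ALS predicts a rating as the dot product of the user's and the movie's latent factor vectors, so predicting for `my_user_id` amounts to scoring every movie we have not rated and keeping the highest scores. The plain-Python sketch below shows that ranking step with hypothetical rank-2 factors and a hypothetical `recommend` helper; in the notebook, `my_ratings_model.transform()` computes the scores.

```python
def recommend(user_vec, movie_factors, rated_ids, n=3):
    """Score every unrated movie with a dot product and return the top n IDs.

    user_vec:      latent factors for one user (list of floats)
    movie_factors: dict movie_id -> latent factors of the same length
    rated_ids:     movie IDs the user has already rated (excluded)
    """
    scores = [
        (sum(u * m for u, m in zip(user_vec, vec)), movie_id)
        for movie_id, vec in movie_factors.items()
        if movie_id not in rated_ids
    ]
    return [movie_id for _, movie_id in sorted(scores, reverse=True)[:n]]


# Hypothetical rank-2 factors.
movie_factors = {1: [1.0, 0.0], 2: [0.0, 1.0], 3: [0.5, 0.5], 4: [2.0, 0.0]}
print(recommend([2.0, 1.0], movie_factors, rated_ids={4}, n=2))  # [1, 3]
```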

In [24]:
# Create a list of my rated movie IDs
my_rated_movie_ids = [x[1] for x in my_rated_movies]
# Filter out the movies I already rated.
not_rated_df = movies_df.filter(~movies_df["ID"].isin(my_rated_movie_ids))
# Rename the "ID" column to be "movieId", and add a column with my_user_id as "userId".
my_unrated_movies_df = not_rated_df.withColumnRenamed("ID", "movieId").withColumn("userId",F.lit(my_user_id))
raw_predicted_ratings_df = my_ratings_model.transform(my_unrated_movies_df)
predicted_ratings_df = raw_predicted_ratings_df.filter(raw_predicted_ratings_df['prediction'] != float('nan'))

In [36]:
predicted_ratings_df.show()

+-------+----+--------------------+------+----------+
|movieId|year|               title|userId|prediction|
+-------+----+--------------------+------+----------+
|      7|1992|               8 Man|     0|-3.1314156|
|     79|1956|         The Killing|     0| 2.0580237|
|    481|2002|Building the Grea...|     0|  1.332852|
|    769|1958|    The Crawling Eye|     0| 2.7147417|
|    906|1969|Benny Hill: Compl...|     0| 0.7379707|
|   1310|2002|          Revelation|     0| 2.7935019|
|   1333|1980|        Mother's Day|     0|  0.567906|
|   1427|1998|            Sweepers|     0| 2.4487655|
|   1442|2001|Sade: Life Promis...|     0|0.27922902|
|   1457|1973|The White Seal / ...|     0|-2.2007918|
|   1527|2003|   Hawaiian Paradise|     0| 0.8778585|
|   1918|2004|           The Alamo|     0| 1.2805192|
|   2000|1994|Four Weddings and...|     0| 1.7917471|
|   2128|1993|                Rudy|     0| 1.9295458|
|   2213|2000| Little Rascals #1-2|     0| 3.7195857|
|   2225|1972|Chariots of th

In [26]:
my_unrated_movies_df.show()

+-------+----+--------------------+------+
|movieId|year|               title|userId|
+-------+----+--------------------+------+
|      2|2004|Isle of Man TT 20...|     0|
|      3|1997|           Character|     0|
|      4|1994|Paula Abdul's Get...|     0|
|      5|2004|The Rise and Fall...|     0|
|      6|1997|                Sick|     0|
|      7|1992|               8 Man|     0|
|      8|2004|What the #$*! Do ...|     0|
|      9|1991|Class of Nuke 'Em...|     0|
|     10|2001|             Fighter|     0|
|     11|1999|Full Frame: Docum...|     0|
|     12|1947|My Favorite Brunette|     0|
|     13|2003|Lord of the Rings...|     0|
|     14|1982|  Nature: Antarctica|     0|
|     15|1988|Neil Diamond: Gre...|     0|
|     16|1996|           Screamers|     0|
|     17|2005|           7 Seconds|     0|
|     18|1994|    Immortal Beloved|     0|
|     19|2000|By Dawn's Early L...|     0|
|     20|1972|     Seeta Aur Geeta|     0|
|     21|2002|   Strange Relations|     0|
+-------+--


In [29]:
spark.conf.set("spark.sql.crossJoin.enabled",True)

In [33]:
predicted_ratings_df.show()

+-------+----+--------------------+------+-----------+
|movieId|year|               title|userId| prediction|
+-------+----+--------------------+------+-----------+
|   7576|1953|        Kiss Me Kate|     0| 0.19918247|
|   9597|1965|War-Gods of the Deep|     0|  1.9035152|
|    481|2002|Building the Grea...|     0|   1.332852|
|  15191|2003|           Figure 17|     0|   2.107647|
|   6460|2005|The Game: The Doc...|     0| 0.55362666|
|   2678|1996|     Price of Desire|     0|  0.7996334|
|   3595|1988|              Elvira|     0|  1.1335566|
|   7284|2005|        Morning Raga|     0|  3.2076926|
|   9399|1991|Legend of the Dra...|     0|  2.5769486|
|  17433|1936|    Follow the Fleet|     0| 0.92909527|
|  14403|1996|The Long Kiss Goo...|     0|  1.0385376|
|  15814|1941|           Suspicion|     0|   2.330772|
|   7601|1943|Son of Dracula / ...|     0|  1.8285698|
|  11462|1991|     Mortal Thoughts|     0|  1.5076497|
|   3039|1967|Dark Shadows: Vol. 1|     0|-0.52973473|
|  11215|2

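### (f) Join the Predictions with the Review Counts

We have our predicted ratings. Next, we attach each movie's average rating and number of reviews so that we can list the movies with the highest predicted ratings. The first cell below calls `join()` without a join condition, which pairs every prediction with every movie (a Cartesian product); older Spark versions (2.x) reject such implicit cross joins unless `spark.sql.crossJoin.enabled` is true, which is why `In [29]` sets it. This is also why every row of that cell's output repeats *Croupier*. The cell in the next subsection joins on `movieId` instead.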
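The difference between a keyed join and a join without a condition is easy to see on a few rows. The plain-Python sketch below uses predictions and counts rounded from the tables in this part of the notebook (movie 42 is a hypothetical ID with no counts) and a hypothetical `join_on_movie_id` helper.

```python
def join_on_movie_id(predictions, stats):
    """Inner join on movie ID, like a join with a movieId equality condition.

    predictions: list of (movie_id, prediction)
    stats:       list of (movie_id, title, count, average)
    """
    stats_by_id = {row[0]: row for row in stats}
    return [
        (movie_id, stats_by_id[movie_id][1], stats_by_id[movie_id][2], prediction)
        for movie_id, prediction in predictions
        if movie_id in stats_by_id
    ]


predictions = [(2213, 3.72), (5980, 3.94), (42, 1.0)]
stats = [(2213, "Little Rascals #1-2", 6, 4.0), (5980, "Un Chien Andalou", 10, 3.8)]
joined = join_on_movie_id(predictions, stats)
cross = [(p, s) for p in predictions for s in stats]  # no join condition
print(len(joined), len(cross))  # 2 6
```

The keyed join keeps one row per matching movie, while the unconditioned join grows with the product of the two row counts.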

In [34]:
predicted_with_counts_df = predicted_ratings_df.join(movie_names_with_avg_ratings_df)
predicted_highest_rated_movies_df = predicted_with_counts_df.sort(F.col("prediction").desc())
print ('My 25 highest rated movies as predicted (for movies with more than 75 reviews):')
predicted_highest_rated_movies_df.show(25)

My 25 highest rated movies as predicted (for movies with more than 75 reviews):
+-------+----+--------+------+----------+-----------------+--------------------+-----+-------+
|movieId|year|   title|userId|prediction|          average|               title|count|movieId|
+-------+----+--------+------+----------+-----------------+--------------------+-----+-------+
|   2905|1998|Croupier|     0| 4.7978663|              5.0|            Fat City|    1|   6699|
|   2905|1998|Croupier|     0| 4.7978663|4.333333333333333|Day of the Dead 2...|    3|   7921|
|   2905|1998|Croupier|     0| 4.7978663|              5.0|       The Red Shoes|    3|  14704|
|   2905|1998|Croupier|     0| 4.7978663|              5.0|               8 Man|    2|      7|
|   2905|1998|Croupier|     0| 4.7978663|              5.0|                 Rat|    3|  12876|
|   2905|1998|Croupier|     0| 4.7978663|              5.0|  Felicity: Season 2|    1|   4783|
|   2905|1998|Croupier|     0| 4.7978663|              5.0| Faith

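### (g) Top 25 Movies with More Than 5 Reviews

This cell joins the predictions with `movie_names_with_avg_ratings_df` on `movieId`, keeps movies with more than 5 reviews, and sorts them by their average rating from other users; the `prediction` column shows our model's predicted rating for each movie. Sorting on `F.col('prediction').desc()` instead would rank the movies by predicted rating.
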
In [39]:
predicted_with_counts_df = predicted_ratings_df.join(movie_names_with_avg_ratings_df,predicted_ratings_df.movieId==movie_names_with_avg_ratings_df.movieId).select("average", movie_names_with_avg_ratings_df["title"], movie_names_with_avg_ratings_df["movieId"], "count","prediction")
predicted_highest_rated_movies_df = predicted_with_counts_df.filter(predicted_with_counts_df['count']>5 ).sort('average',ascending=False)
print('My 25 highest rated movies (for movies with more than 5 reviews):')
predicted_highest_rated_movies_df.show(25)

My 25 highest rated movies (for movies with more than 5 reviews):
+------------------+--------------------+-------+-----+-----------+
|           average|               title|movieId|count| prediction|
+------------------+--------------------+-------+-----+-----------+
| 4.428571428571429|Joseph Campbell a...|   3998|    7|  1.5132041|
| 4.333333333333333|Mary Tyler Moore:...|  11043|    9|  1.0767839|
| 4.166666666666667|Brotherhood of Ju...|  17421|    6|  3.2775755|
|               4.0| Little Rascals #1-2|   2213|    6|  3.7195857|
|               4.0|   O: Bonus Material|  16287|    6|  1.0035919|
|               4.0|            Russkies|   5618|    6|   4.338407|
|               3.8|    Un Chien Andalou|   5980|   10|  3.9360054|
|3.7142857142857144|Stargate SG-1: Se...|   5530|    7|   2.372392|
|               3.7|        Into the Sun|  16273|   10|    2.26118|
|3.6666666666666665|           High Noon|  14642|    6|  2.5619767|
|3.6666666666666665| Twelve O'Clock 

# Conclusion
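We loaded the Netflix ratings and movie titles from S3 into Spark on Amazon EMR (using Spark's default resources), built a simple recommender based on average ratings, trained ALS models and selected the rank with the lowest validation RMSE, added our own ratings to the training data, and produced a list of the top 25 movies with more than 5 reviews alongside our predicted ratings.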
