<h1 align="center">Enhancing Movie Recommender Accuracy - A Comprehensive Study on Hyperparameter Optimization and Genre-Specific Error Analysis in Spark</h1>

<h2 align="center">Sai Sanwariya Narayan</h2>


### Tasks:
- Use the best hyper-parameters found in ALS-Powered-Movie-Recommender to construct an ALS model.
- Compute average prediction error for each movie and genre.
- Analyze the relationship between the number of reviews a movie has and the prediction error.
- Use `persist()` and `unpersist()` to manage performance.
- Utilize Spark DataFrame and RDD transformations and actions for data processing and analysis.

### Key Components:
- **Data Preparation**: Reading and preprocessing movie and ratings data into Spark DataFrames and RDDs.
- **Model Building**: Constructing an ALS model with the best hyper-parameters.
- **Model Evaluation**: Generating predictions, calculating prediction errors, and joining RDDs/DataFrames for analysis.
- **Analysis**:
    - Calculating average prediction error for each movie.
    - Examining the correlation between the number of reviews and prediction error.
    - Analyzing the prediction errors by movie genres.

## Jupyter Notebook running pyspark.

In [1]:
import pyspark
import pandas as pd
import numpy as np
import math

### Once we import pyspark, we need to import `SparkContext`. Every Spark program needs a SparkContext object. To use Spark SQL and Spark DataFrames, we also need to import `SparkSession` from `pyspark.sql`.

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType, FloatType
from pyspark.sql.functions import col, column
from pyspark.sql.functions import expr
from pyspark.sql.functions import split
from pyspark.sql.functions import array_contains
from pyspark.sql import Row
from pyspark.mllib.recommendation import ALS

## We then create a SparkSession in order to use DataFrames.

In [3]:
ss=SparkSession.builder.master("local").appName("Lab7B ALS Model Weakness Eval").getOrCreate()

In [4]:
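import os
# "~" is not expanded automatically in this path, so resolve the home directory explicitly.
ss.sparkContext.setCheckpointDir(os.path.expanduser("~/scratch"))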

In [5]:
rating_schema = StructType([ StructField("UserID", IntegerType(), False ), \
                            StructField("MovieID", IntegerType(), True), \
                            StructField("Rating", FloatType(), True ), \
                            StructField("RatingID", IntegerType(), True ), \
                           ])

In [6]:
ratings_DF = ss.read.csv("/storage/home/ratings_2.csv", schema=rating_schema, header=True, inferSchema=False)

In [7]:
ratings_DF.printSchema()

root
 |-- UserID: integer (nullable = true)
 |-- MovieID: integer (nullable = true)
 |-- Rating: float (nullable = true)
 |-- RatingID: integer (nullable = true)



In [8]:
ratings2_DF = ratings_DF.select("UserID","MovieID","Rating")

In [9]:
ratings2_RDD = ratings2_DF.rdd
ratings2_RDD.take(3)

[Row(UserID=1, MovieID=31, Rating=2.5),
 Row(UserID=1, MovieID=1029, Rating=3.0),
 Row(UserID=1, MovieID=1061, Rating=3.0)]

In [10]:
movie_schema = StructType([ StructField("MovieID", IntegerType(), False), \
                            StructField("MovieTitle", StringType(), True ), \
                            StructField("Genres", StringType(), True ), \
                           ])

In [11]:
movies_DF = ss.read.csv("/storage/home/movies_2.csv", schema=movie_schema, header=True, inferSchema=False)
# In the cluster mode, we need to change to `header=False` because it does not have header.

### The code below transforms the entire ratings dataset into (UserID, MovieID) pairs, the input format the model uses to generate predictions.

In [12]:
ratings_input_RDD = ratings2_RDD.map(lambda x: (x[0], x[1]) )

# Constructing an ALS-based Movie Recommendation Model Using the Best Hyperparameters

In [13]:
best_rank = 13
best_iteration = 30
best_regularization_param = 0.2

In [14]:
model = ALS.train(ratings2_RDD, best_rank, seed=17, iterations=best_iteration, lambda_=best_regularization_param)

# Evaluate an ALS model
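The standard procedure for evaluating a machine learning model involves two steps:
- Step 1: Use the model to generate **predicted output** for validation data.
- Step 2: Compare the **predicted output** with the **actual output** of the validation data to calculate **validation errors**.

In this notebook the model is both trained on and evaluated against the entire ratings dataset, so the errors computed below measure fit to the training data rather than to held-out validation data.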

## The `<ALS model>.predictAll` method
- takes an RDD in the format of (\<user_ID\>, \<movie_ID\>)
- returns an RDD in the format of (\<user_ID\>, \<movie_ID\>, \<predicted rating\>)

## Using RDD-based join for model evaluation
- Because this program trains the ALS model with the RDD-based API in PySpark's MLlib module, we use an RDD-based join to combine the actual output and the predicted output.
- The join **key** is the user-movie pair, so each joined record holds the actual rating and the predicted rating for one (user, movie) input.
- Therefore, both the RDD of actual ratings and the RDD of predicted ratings are transformed into key-value pairs of the form ((user, movie), rating).
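
The key-value join described above can be pictured with plain Python dictionaries. This is only a sketch of the idea, not Spark code: `inner_join`, `actual`, and `predicted` are hypothetical names, the first two rating pairs are copied from the output shown later in this notebook, and the `(2, 10)` entry is invented to show what an inner join drops.

```python
# Plain-Python sketch of an inner join keyed on (user_id, movie_id).
# Not Spark code: dictionaries stand in for key-value pair RDDs.

def inner_join(left, right):
    """Return {key: (left_value, right_value)} for keys present in both inputs."""
    return {key: (left[key], right[key]) for key in left if key in right}

# Actual ratings keyed by (user_id, movie_id); (2, 10) is an invented entry.
actual = {(1, 31): 2.5, (1, 1029): 3.0, (2, 10): 4.0}
# Predicted ratings keyed the same way; (2, 10) has no prediction here.
predicted = {(1, 31): 2.1610668335889045, (1, 1029): 2.5177951218502104}

joined = inner_join(actual, predicted)

# Absolute prediction error for each user-movie pair that survived the join.
abs_error = {key: abs(a - p) for key, (a, p) in joined.items()}

for key, err in sorted(abs_error.items()):
    print(key, round(err, 4))
```

Only keys present on both sides survive, which is why the key format of the two RDDs must match exactly.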

# Finding all Genres
We want to find all genres in the movies so that we can evaluate and compare the average prediction error across all genres.

In [15]:
movies_DF.printSchema()

root
 |-- MovieID: integer (nullable = true)
 |-- MovieTitle: string (nullable = true)
 |-- Genres: string (nullable = true)



In [16]:
# A raw string keeps the regex escape "\|" intact and avoids Python's invalid-escape warning.
movies2_DF = movies_DF.withColumn("GenresList", split(col("Genres"), r'\|'))
# Notice: We used "GenresList" as the name of the column, which is a bit different from a similar column in previous labs.

In [17]:
movies2_DF.printSchema()

root
 |-- MovieID: integer (nullable = true)
 |-- MovieTitle: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- GenresList: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [18]:
movies2_DF.show(3)

+-------+--------------------+--------------------+--------------------+
|MovieID|          MovieTitle|              Genres|          GenresList|
+-------+--------------------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|[Adventure, Anima...|
|      2|      Jumanji (1995)|Adventure|Childre...|[Adventure, Child...|
|      3|Grumpier Old Men ...|      Comedy|Romance|   [Comedy, Romance]|
+-------+--------------------+--------------------+--------------------+
only showing top 3 rows



In [19]:
genres_rdd = movies2_DF.select("GenresList").rdd

In [20]:
genres_rdd.take(3)

[Row(GenresList=['Adventure', 'Animation', 'Children', 'Comedy', 'Fantasy']),
 Row(GenresList=['Adventure', 'Children', 'Fantasy']),
 Row(GenresList=['Comedy', 'Romance'])]

## We want to obtain and flatten the content of "GenresList" from each Row object. 

In [21]:
genres_flatten_rdd = genres_rdd.flatMap(lambda x: x["GenresList"])

In [22]:
genres_flatten_rdd.take(3)

['Adventure', 'Animation', 'Children']

In [23]:
genres_count_rdd = genres_flatten_rdd.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a+b)

In [24]:
genres_count_rdd.take(3)

[('Adventure', 1117), ('Animation', 447), ('Children', 583)]
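
As a rough illustration of what `flatMap` followed by `map` and `reduceByKey` does here, the same counting can be sketched in plain Python. This is not Spark code; `genres_lists`, `flat_genres`, and `genre_counts` are illustrative names, and only the three genre lists shown above are used, so the counts differ from the full-dataset counts.

```python
from collections import Counter
from itertools import chain

# Genre lists copied from the first three rows shown above.
genres_lists = [
    ["Adventure", "Animation", "Children", "Comedy", "Fantasy"],
    ["Adventure", "Children", "Fantasy"],
    ["Comedy", "Romance"],
]

# flatMap: concatenate the per-movie lists into one flat sequence of genres.
flat_genres = list(chain.from_iterable(genres_lists))

# map to (genre, 1) plus reduceByKey(add): count occurrences of each genre.
genre_counts = Counter(flat_genres)

print(flat_genres[:3])
print(genre_counts["Adventure"], genre_counts["Romance"])
```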

In [25]:
all_genres_list = genres_count_rdd.map(lambda x: x[0]).collect()

In [26]:
print(all_genres_list)

['Adventure', 'Animation', 'Children', 'Comedy', 'Fantasy', 'Romance', 'Drama', 'Action', 'Crime', 'Thriller', 'Horror', 'Mystery', 'Sci-Fi', 'Documentary', 'IMAX', 'War', 'Musical', 'Western', 'Film-Noir', '(no genres listed)']


# Predict Rating Accuracy

In [27]:
ratings_prediction_RDD = model.predictAll(ratings_input_RDD).map(lambda x: ( (x[0], x[1] ), x[2] ))

In [28]:
ratings_prediction_RDD.take(3)

[((253, 37739), 3.316578140685179),
 ((547, 142192), 3.3128919479037706),
 ((599, 69069), 3.9300891532733977)]

In [29]:
ratings_evaluation_RDD = ratings2_RDD.map(lambda x: ((x[0], x[1]), x[2])).join(ratings_prediction_RDD)

In [30]:
ratings_evaluation_RDD.take(3)

[((1, 31), (2.5, 2.1610668335889045)),
 ((1, 1029), (3.0, 2.5177951218502104)),
 ((1, 1061), (3.0, 2.4485821501198686))]

In [31]:
ratings_prediction_error_RDD= ratings_evaluation_RDD.map(lambda x: (x[0], abs(x[1][0]-x[1][1])))

In [32]:
ratings_prediction_error_RDD.take(3)

[((1, 31), 0.3389331664110955),
 ((1, 1029), 0.48220487814978963),
 ((1, 1061), 0.5514178498801314)]

In [33]:
UserMovieError_RDD = ratings_prediction_error_RDD.map(lambda x: (x[0][0], x[0][1], x[1]))

In [34]:
ColumnNames=["UserID", "MovieID", "error"]
UserMovieError_DF = UserMovieError_RDD.toDF(ColumnNames)

In [35]:
UserMovieError_DF.show(5)

+------+-------+-------------------+
|UserID|MovieID|              error|
+------+-------+-------------------+
|     1|     31| 0.3389331664110955|
|     1|   1029|0.48220487814978963|
|     1|   1061| 0.5514178498801314|
|     1|   1129| 0.2525636057718841|
|     1|   1263| 0.5406837054455749|
+------+-------+-------------------+
only showing top 5 rows



# Average Prediction Error for Each Movie

## Calculating the sum of prediction error for each movie (across all users who provided rating for the movie) by applying ``sum(<column name>)`` to the result of ``groupBy("MovieID")``.

In [36]:
ErrorSum_DF = UserMovieError_DF.groupBy("MovieID").sum(ColumnNames[2])

In [37]:
ErrorSum_DF.show(3)

+-------+------------------+
|MovieID|        sum(error)|
+-------+------------------+
|    474|49.060900116248774|
|   4823|  8.72990998273784|
|  72011|13.512975934033253|
+-------+------------------+
only showing top 3 rows



In [38]:
RatingCount_DF = UserMovieError_DF.groupBy("MovieID").count()
RatingCount_DF.show(3)

+-------+-----+
|MovieID|count|
+-------+-----+
|    474|   80|
|   4823|   19|
|  72011|   26|
+-------+-----+
only showing top 3 rows



# DataFrame-based Join
- When two PySpark DataFrames are joined on a common column, all of their columns are in the joined DataFrame.

The code below performs an inner join between (1) the DF that contains the sum of prediction errors for each movie and (2) the DF that contains the total review count for each movie.  The joined DF can then be used to calculate the average prediction error for each movie.

In [39]:
Joined_Error_DF = ErrorSum_DF.join(RatingCount_DF, "MovieID", "inner")

In [40]:
Joined_Error_DF.show(3)

+-------+------------------+-----+
|MovieID|        sum(error)|count|
+-------+------------------+-----+
|    474|49.060900116248774|   80|
|   4823|  8.72990998273784|   19|
|  72011|13.512975934033253|   26|
+-------+------------------+-----+
only showing top 3 rows
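
The average prediction error of a movie is its summed error divided by its number of ratings. A minimal worked sketch in plain Python, using the three rows shown above (`rows` and `avg_error` are illustrative names, not part of the notebook):

```python
# (MovieID, sum of absolute errors, rating count), copied from the table above.
rows = [
    (474, 49.060900116248774, 80),
    (4823, 8.72990998273784, 19),
    (72011, 13.512975934033253, 26),
]

# Average error per movie = total error / number of ratings.
avg_error = {movie_id: total / count for movie_id, total, count in rows}

for movie_id, value in avg_error.items():
    print(movie_id, round(value, 4))
```

Because each individual error is an absolute difference between two ratings, the average for a movie can never exceed the largest single error for that movie.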



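# Calculating the average prediction error for each movie.

In [41]:
# Average error = total error / number of ratings.
# Caution: the outputs displayed in this notebook were generated with the ratio inverted
# (count / sum(error)), so the AvgError values shown are reciprocals of the average error.
Avg_Error_DF=Joined_Error_DF.withColumn("AvgError", col("sum(error)")/col("count") )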
Avg_Error_DF.show(3)

+-------+------------------+-----+------------------+
|MovieID|        sum(error)|count|          AvgError|
+-------+------------------+-----+------------------+
|    474|49.060900116248774|   80|1.6306264216604602|
|   4823|  8.72990998273784|   19| 2.176425649012396|
|  72011|13.512975934033253|   26|1.9240765414609684|
+-------+------------------+-----+------------------+
only showing top 3 rows



In [42]:
Sorted_Avg_Error_DF = Avg_Error_DF.orderBy("AvgError", ascending=False )

In [43]:
Sorted_Avg_Error_DF.show(10)

+-------+--------------------+-----+------------------+
|MovieID|          sum(error)|count|          AvgError|
+-------+--------------------+-----+------------------+
|  39408| 0.02084106280359921|    1|47.982197905343966|
|  66659| 0.02285522764006953|    1| 43.75366615236906|
|  48522|0.023163833147860247|    1| 43.17074784716168|
|   8963|0.025675803713772927|    1| 38.94717420135064|
|   7282|0.026275807300993437|    1| 38.05782210779845|
|   8859|0.052726878410906175|    2|37.931318148853556|
|  54290|0.052726878410906175|    2|37.931318148853556|
|   7093|0.026409368693345425|    1| 37.86535042210145|
|  26157|0.026729728498679606|    1| 37.41152851774749|
|  26485|0.026729728498679606|    1| 37.41152851774749|
+-------+--------------------+-----+------------------+
only showing top 10 rows



# Correlation
``<DF>.corr(<column 1>, <column 2>)`` computes the correlation between the two columns provided. 
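
`DataFrame.corr` computes the Pearson correlation coefficient. For intuition, a plain-Python sketch of that coefficient follows; the `pearson` helper and the sample data are invented for illustration and are not taken from the ratings dataset.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Made-up illustrative data: error shrinks as the review count grows.
review_counts = [1, 2, 5, 10, 50]
avg_errors = [1.2, 1.0, 0.9, 0.8, 0.6]

r = pearson(review_counts, avg_errors)
print(round(r, 3))
```

A value near -1 or +1 indicates a strong linear relationship, while a value near 0 indicates little linear relationship.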

## Computing the correlation between the count of movie reviews and the average prediction error.

In [44]:
Sorted_Avg_Error_DF.corr("count", "AvgError")

-0.2426947005145335

In [45]:
output_path = "/storage/home/RecommendationErrorByMovie"
Sorted_Avg_Error_DF.write.option("header", True).csv(output_path)

# Does the ALS-based recommendation system have any weakness in predicting ratings for a genre?  If so, which genres?

In [46]:
print(all_genres_list)

['Adventure', 'Animation', 'Children', 'Comedy', 'Fantasy', 'Romance', 'Drama', 'Action', 'Crime', 'Thriller', 'Horror', 'Mystery', 'Sci-Fi', 'Documentary', 'IMAX', 'War', 'Musical', 'Western', 'Film-Noir', '(no genres listed)']


# Performing an inner join of movies2_DF with Sorted_Avg_Error_DF on the common column "MovieID".

In [47]:
movies2_DF.printSchema()

root
 |-- MovieID: integer (nullable = true)
 |-- MovieTitle: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- GenresList: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [48]:
movies2_DF.show(5)

+-------+--------------------+--------------------+--------------------+
|MovieID|          MovieTitle|              Genres|          GenresList|
+-------+--------------------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|[Adventure, Anima...|
|      2|      Jumanji (1995)|Adventure|Childre...|[Adventure, Child...|
|      3|Grumpier Old Men ...|      Comedy|Romance|   [Comedy, Romance]|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|[Comedy, Drama, R...|
|      5|Father of the Bri...|              Comedy|            [Comedy]|
+-------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [49]:
joined_movies_error_DF = Sorted_Avg_Error_DF.join(movies2_DF, "MovieID", "inner")
joined_movies_error_DF.show(5)

+-------+-------------------+-----+------------------+--------------------+--------------------+--------------------+
|MovieID|         sum(error)|count|          AvgError|          MovieTitle|              Genres|          GenresList|
+-------+-------------------+-----+------------------+--------------------+--------------------+--------------------+
|    474| 49.060900116248774|   80|1.6306264216604602|In the Line of Fi...|     Action|Thriller|  [Action, Thriller]|
|   4823|   8.72990998273784|   19| 2.176425649012396|  Serendipity (2001)|      Comedy|Romance|   [Comedy, Romance]|
|  72011| 13.512975934033253|   26|1.9240765414609684|Up in the Air (2009)|       Drama|Romance|    [Drama, Romance]|
| 142507|0.19692253527977277|    1| 5.078138967585782|Pawn Sacrifice (2...|               Drama|             [Drama]|
|     29|  23.81569092993977|   40|1.6795649606669278|City of Lost Chil...|Adventure|Drama|F...|[Adventure, Drama...|
+-------+-------------------+-----+------------------+--------------------+--------------------+--------------------+

# Computing the recommendation error for a single genre first.

In [50]:
your_genre = "Animation"
your_genre_recommendation_error_DF = joined_movies_error_DF.filter(array_contains(col("GenresList"), your_genre))

In [51]:
your_genre_recommendation_error_DF.show(4)

+-------+------------------+-----+------------------+--------------------+--------------------+--------------------+
|MovieID|        sum(error)|count|          AvgError|          MovieTitle|              Genres|          GenresList|
+-------+------------------+-----+------------------+--------------------+--------------------+--------------------+
|    558| 7.552634919595203|    9|1.1916371035821722|Pagemaster, The (...|Action|Adventure|...|[Action, Adventur...|
| 152081| 4.154965898356528|    9|2.1660827598031305|     Zootopia (2016)|Action|Adventure|...|[Action, Adventur...|
|    720|28.037990749396137|   45|1.6049652203044964|Wallace & Gromit:...|Adventure|Animati...|[Adventure, Anima...|
|   3034|16.564021543223905|   29| 1.750782557504187|   Robin Hood (1973)|Adventure|Animati...|[Adventure, Anima...|
+-------+------------------+-----+------------------+--------------------+--------------------+--------------------+
only showing top 4 rows



In [52]:
your_genre_recommendation_error_DF.printSchema()

root
 |-- MovieID: long (nullable = true)
 |-- sum(error): double (nullable = true)
 |-- count: long (nullable = false)
 |-- AvgError: double (nullable = true)
 |-- MovieTitle: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- GenresList: array (nullable = true)
 |    |-- element: string (containsNull = false)



# Aggregation By a Column
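``<DF>.agg({<column name>: <agg operator>})`` applies an aggregation operator to a column of the DataFrame.  Examples of aggregation operators include:
- 'sum': adding up all values in the column
- 'avg', 'max', 'min', 'count': averaging, finding the largest or smallest value, and counting the values in the column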

In [53]:
your_genre_recommendation_AvgError_sum = your_genre_recommendation_error_DF.agg({'AvgError': 'sum'})

In [54]:
your_genre_recommendation_AvgError_sum.show()

+------------------+
|     sum(AvgError)|
+------------------+
|1809.4138300155257|
+------------------+



# collect()
``collect()`` can be applied to a small DataFrame or a small RDD to retrieve all of its contents. For a DataFrame, the result is returned as a list of ``Row`` objects.

In [55]:
your_genre_recommendation_AvgError_sum_list = your_genre_recommendation_AvgError_sum.collect()

In [56]:
print(your_genre_recommendation_AvgError_sum_list)

[Row(sum(AvgError)=1809.4138300155257)]


In [57]:
your_genre_recommendation_AvgError_sum_value = your_genre_recommendation_AvgError_sum_list[0]['sum(AvgError)']

In [58]:
print(your_genre_recommendation_AvgError_sum_value)

1809.4138300155257


In [59]:
your_genre_movies_count = your_genre_recommendation_error_DF.count()
print(your_genre_movies_count)

447


In [60]:
recommendation_Err_your_genre = your_genre_recommendation_AvgError_sum_value/your_genre_movies_count
print(recommendation_Err_your_genre)

4.047905659989991


# Computing the average prediction error for each genre.

In [61]:
generes_error_df = pd.DataFrame( columns = ['Genre', 'Movie Count', 'Average Review Count', 'Average Recommendation Error'] )
index = 0
for g in all_genres_list:
    g_movies_error_DF = joined_movies_error_DF.filter(array_contains("GenresList", g))
    # Persist g_movies_error_DF because it is used by three actions below.
    g_movies_error_DF.persist()
    g_movies_AvgError_sum = g_movies_error_DF.agg({'AvgError': 'sum' }).collect()[0]['sum(AvgError)']
    g_movies_count = g_movies_error_DF.count()
    g_movies_AvgError = g_movies_AvgError_sum / g_movies_count
    # Also compute the average number of reviews for movies in the genre
    g_movies_ReviewCount = g_movies_error_DF.agg({'count': 'sum' }).collect()[0]['sum(count)']
    g_movies_AvgReview = g_movies_ReviewCount / g_movies_count
    # Store the average review count (not the total) so the value matches its column name.
    generes_error_df.loc[index] = [ g, g_movies_count, g_movies_AvgReview, g_movies_AvgError]
    g_movies_error_DF.unpersist()
    index = index + 1
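
The loop above filters the joined DataFrame once per genre. The same per-genre averages can also be pictured as a single pass in which each movie contributes one record per genre it lists; in Spark this would roughly correspond to `explode` on the genre array followed by a `groupBy`. The sketch below is plain Python rather than Spark code, and the movie records and names (`movies`, `totals`, `summary`) are invented for illustration.

```python
from collections import defaultdict

# (genres, review count, average error) per movie; all values are invented.
movies = [
    (["Comedy", "Romance"], 19, 0.46),
    (["Drama", "Romance"], 26, 0.52),
    (["Comedy"], 5, 0.80),
]

# Accumulate per-genre totals in one pass: [movie count, review sum, error sum].
totals = defaultdict(lambda: [0, 0, 0.0])
for genres, review_count, avg_error in movies:
    for genre in genres:  # a movie contributes to every genre it lists
        entry = totals[genre]
        entry[0] += 1
        entry[1] += review_count
        entry[2] += avg_error

# Per-genre summary: (movie count, average review count, average error).
summary = {
    genre: (n, reviews / n, errors / n)
    for genre, (n, reviews, errors) in totals.items()
}

for genre, stats in sorted(summary.items()):
    print(genre, stats)
```

As in the loop, a movie with several genres counts toward each of them, so the per-genre movie counts add up to more than the number of movies.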

## Saving the pandas DataFrame that stores the average prediction error for each genre to a CSV file at the output path below.

In [62]:
output_path = "/storage/home/RecommendationWeaknessByGenre.csv"
generes_error_df.to_csv(output_path)

In [63]:
ss.stop()