# 1. Movies Dataset

In this notebook, we'll explore the [MovieLens dataset](https://www.kaggle.com/datasets/grouplens/movielens-20m-dataset?select=tag.csv), which contains information about movies and their respective ratings, from 1995 to 2015.

Let's initialize some variables and import some necessary packages for our exercise.

### 1.1 Import packages

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, explode, split, year
import seaborn as sns
import matplotlib.pyplot as plt

### 1.2 Create Spark session

A new Spark session will be created, configured with the necessary Delta Lake libraries.

In [None]:
# Initialize Spark session
spark = SparkSession.builder.appName("Movies") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

### 1.3 Notebook variables

The variables below point to the raw CSV files used in this notebook. Adjust them if the files live somewhere else.

In [None]:
RATINGS_DATASET_PATH = "./rating.csv"
MOVIE_DATASET_PATH = "./movie.csv"
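
A quick way to confirm the paths before loading anything is to check that the files exist. The helper `find_missing` below is a hypothetical name introduced for this sketch; it is not part of the notebook or of PySpark.

```python
from pathlib import Path


def find_missing(paths):
    """Return the paths that do not point to an existing file."""
    return [p for p in paths if not Path(p).is_file()]


# Same relative paths as the notebook variables above.
missing = find_missing(["./rating.csv", "./movie.csv"])
if missing:
    print(f"Adjust these paths before continuing: {missing}")
```

An empty `missing` list means both files were found relative to the current working directory.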

# 2. Working with the dataset

We'll create the DataFrames to manipulate the necessary information contained in the raw dataset. They consist of:

- **Movies Dataset**: contains movie information, with columns `movieId`, `title` and `genres`.
- **Ratings Dataset**: contains ratings of movies by users, with columns `userId`, `movieId`, `rating`, `timestamp`. 

### 2.1 Creating the DataFrames

In [None]:
# Load ratings dataset
ratingsDf = spark.read.csv(RATINGS_DATASET_PATH, header=True, inferSchema=True)
# Load movies dataset
moviesDf = spark.read.csv(MOVIE_DATASET_PATH, header=True, inferSchema=True)

In [None]:
# Check the first 10 rows in each DataFrame
ratingsDf.show(10)
moviesDf.show(10)

## 2.1 Most popular movies

Calculate the movies with the highest average rating. We keep only movies with more than 100 ratings, so that titles with too few reviews to be significant don't distort the ranking.

In [None]:
# Compute average rating and number of ratings for each movie
popularMoviesDf = ratingsDf.groupBy("movieId") \
    .agg(avg("rating").alias("avg_rating"), count("rating").alias("num_ratings")) \
    .filter(col("num_ratings") > 100) \
    .join(moviesDf, on="movieId") \
    .orderBy(col("avg_rating").desc())

# Show the top 10 best rated movies
popularMoviesDf.show(10)
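
To see why the minimum-ratings filter matters, here is a minimal plain-Python sketch of the same group, aggregate, filter and sort steps. The rating pairs are made up for illustration, and the threshold is lowered from 100 to 1 so that the toy data survives the filter.

```python
from collections import defaultdict

# Hypothetical (movieId, rating) pairs standing in for rows of ratingsDf.
ratings = [(1, 5.0), (1, 4.0), (1, 3.0), (2, 5.0), (3, 2.0), (3, 4.0)]
MIN_RATINGS = 1  # the notebook uses 100; lowered here for the toy data

# Group ratings by movie, like groupBy("movieId").
by_movie = defaultdict(list)
for movie_id, rating in ratings:
    by_movie[movie_id].append(rating)

# Compute (movieId, avg_rating, num_ratings), keep movies with more than
# MIN_RATINGS ratings, then sort by average rating, highest first.
popular = sorted(
    (
        (movie_id, sum(values) / len(values), len(values))
        for movie_id, values in by_movie.items()
        if len(values) > MIN_RATINGS
    ),
    key=lambda row: row[1],
    reverse=True,
)
print(popular)
```

Without the filter, movie 2 would top the ranking on the strength of a single 5.0 rating.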

## 2.2 Rating by genre

We'll conduct an analysis of movies by genre, and (graphically) answer questions like:

- What are the best rated genres?
- What's the distribution of ratings amongst the best movie categories?
- How have the movie genres evolved in the eye of the public through the years?

The first manipulation we'll do is extract the individual genres of each movie. As you may have noticed, each movie can have more than one genre - they're separated by the "|" character in the `genres` column.

In [None]:
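# Split genres into multiple rows. Each movie in moviesDf can have more than one genre, separated by "|".
# We'll split each "genres" string into an array and explode it, producing one row per (movie, genre) pair.
genreDf = moviesDf.withColumn("genre", explode(split(col("genres"), "\\|")))  # "|" is escaped because split() takes a regex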

# Join with ratings dataset, so we have the user rating info.
genreRatingsDf = ratingsDf.join(genreDf, on="movieId")

# Compute average rating per genre
avgGenreRatingsDf = genreRatingsDf.groupBy("genre").agg(avg("rating").alias("avg_rating"))

# Show the top three best rated genres
avgGenreRatingsDf.orderBy(col("avg_rating").desc()).show(3)
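
As an illustration of what the split-and-explode step produces, the following plain-Python sketch does the same thing on two made-up rows. The sample rows are assumptions for this example, not values read from the dataset files.

```python
# Hypothetical rows mirroring moviesDf: (movieId, title, genres).
movies = [
    (1, "Toy Story (1995)", "Adventure|Animation|Children"),
    (2, "Heat (1995)", "Action|Crime|Thriller"),
]

# One output row per (movie, genre) pair, like explode(split(...)) in Spark.
# str.split takes a literal separator, so "|" needs no escaping here.
genre_rows = [
    (movie_id, title, genre)
    for movie_id, title, genres in movies
    for genre in genres.split("|")
]

for row in genre_rows:
    print(row)
```

Each movie now appears once per genre, which is why a single rating contributes to every genre of the movie it belongs to after the join.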

We'll store the top 3 genres in the Python list `top3Genres`.

In [None]:
# Store the names of the top 3 best rated genres
top3Genres = [row["genre"] for row in avgGenreRatingsDf.orderBy(col("avg_rating").desc()).select("genre").limit(3).collect()]

print(f"The top-3 best rated genres are: {top3Genres}")

Next, we'll create a new DataFrame containing the ratings of the top 3 best rated genres only.

In [None]:
top3GenresMovieRatingsDf = genreRatingsDf.filter(col("genre").isin(top3Genres))

print(f"There are {top3GenresMovieRatingsDf.count()} ratings for the top 3 best genres.")

# Show first 10 user ratings of movies that relate to the top 3 genres.
top3GenresMovieRatingsDf.show(10)

### Plotting reviews and distributions of the top 3 genres by year

We'll now make use of the plotting libraries `matplotlib`, `seaborn` and the data analysis library `pandas` to visually verify the evolution of ratings amongst the top three best movie categories.

First, we convert the Spark DataFrame `top3GenresMovieRatingsDf` to a pandas DataFrame - although the name is identical, they're different classes. It so happens that the pandas DataFrame fits better with the plotting functions of `seaborn` and `matplotlib` that we'll use. In fact, this operation is so often executed that PySpark offers a native function [toPandas()](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.toPandas.html) to convert a PySpark DataFrame to a pandas one.

> **WARNING** the command below may take a minute to complete, since converting to a pandas DataFrame requires collecting every row of the original Spark DataFrame on the driver.

In [None]:
top3GenresMovieRatingsPd = top3GenresMovieRatingsDf.toPandas()

With the pandas DataFrame at hand, let's check the statistical distribution of the ratings in the top three genres.

We'll use the seaborn boxplot function to obtain a visual representation of medians, quartiles and outliers.

In [None]:
plt.figure(figsize=(12, 6))
sns.boxplot(data=top3GenresMovieRatingsPd, x="rating", y="genre")
plt.title("Distribution of Movie Ratings for the Top 3 Best Rated Genres", fontsize=16)
plt.xlabel("Rating", fontsize=12)
plt.ylabel("Genre", fontsize=12)
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.show()

That wasn't very insightful, right? The boxplot shows that all of the top 3 genres have a median rating of 4 (the line inside each box marks the median, not the mean), and similar upper quartiles (75th percentiles).
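
For reference, the quantities a boxplot draws can be computed by hand. The sketch below uses Python's `statistics` module on a made-up list of ratings for one genre. The values are hypothetical, and the 1.5 × IQR outlier rule is the usual boxplot default.

```python
import statistics

# Hypothetical ratings for a single genre (half-star steps from 0.5 to 5.0).
ratings = [0.5, 3.0, 3.5, 4.0, 4.0, 4.0, 4.5, 5.0, 5.0]

mean = statistics.mean(ratings)
median = statistics.median(ratings)  # the line inside the box

# Quartiles with linear interpolation between data points.
q1, q2, q3 = statistics.quantiles(ratings, n=4, method="inclusive")
iqr = q3 - q1  # height of the box

# Points further than 1.5 * IQR from the box are drawn as outliers.
outliers = [r for r in ratings if r < q1 - 1.5 * iqr or r > q3 + 1.5 * iqr]

print(f"mean={mean:.2f} median={median} q1={q1} q3={q3} outliers={outliers}")
```

In this toy sample the single low rating pulls the mean below the median, which is one reason to read the two statistics separately.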

Let's try to plot more insightful observations - such as how the ratings evolved by year for the top 3 genres.

We'll start by deriving the year from the timestamp of the rating - PySpark function `year` helps us with that. Then, we'll group the resulting PySpark DataFrame by year and genre, so that we can compute the yearly information - number of ratings and average rating - by group.

In [None]:
# Create a new column "year" from the existing "timestamp" column.
movieReviewsTop3Df = top3GenresMovieRatingsDf.withColumn("year", year("timestamp"))

# Compute average rating and total ratings per genre and year
top3GenresMovieReviewsByGenreAndYear = movieReviewsTop3Df.groupBy("year", "genre") \
    .agg(
        avg("rating").alias("avg_rating"),
        count("rating").alias("num_ratings")
    ) \
    .orderBy("year", "genre")
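
The same year extraction and per-group aggregation can be sketched in plain Python. The rows and the `YYYY-MM-DD HH:MM:SS` timestamp format below are assumptions made for this example.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical (timestamp, genre, rating) rows; the timestamp format is assumed.
rows = [
    ("2005-04-02 23:53:47", "Drama", 4.0),
    ("2005-09-10 10:00:00", "Drama", 3.0),
    ("2006-01-15 08:30:00", "Drama", 5.0),
    ("2005-04-02 23:53:47", "War", 4.5),
]

# Derive the year from each timestamp and group ratings by (year, genre).
groups = defaultdict(list)
for timestamp, genre, rating in rows:
    year = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S").year
    groups[(year, genre)].append(rating)

# Average rating and number of ratings per group, ordered by year and genre.
summary = {
    key: {"avg_rating": sum(values) / len(values), "num_ratings": len(values)}
    for key, values in sorted(groups.items())
}

for key, stats in summary.items():
    print(key, stats)
```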

We're now ready to plot the information. We'll use seaborn's [scatterplot function](https://seaborn.pydata.org/generated/seaborn.scatterplot.html#seaborn-scatterplot) to construct a scatter plot of movie ratings per year.

Again, we'll convert our PySpark DataFrame `top3GenresMovieReviewsByGenreAndYear` to a pandas DataFrame. Then, we'll use the function `show_scatterplot`, which takes in a pandas DataFrame with the columns `year`, `genre`, `avg_rating` and `num_ratings`, and creates a scatter plot from that information.

In [None]:
def show_scatterplot(data, plot_title=""):
    # Normalize the bubble size for better visualization
    max_size = 700  # Maximum size for bubbles
    min_size = 200  # Minimum size for bubbles

    # Add a "size" dimension to the pandas DataFrame, for aiding the scatterplot function.
    data["size"] = ((data["num_ratings"] / data["num_ratings"].max()) * (max_size - min_size) + min_size)
    
    # Plot the scatterplot
    plt.figure(figsize=(16, 8))
    sns.scatterplot(
        data=data,
        x="year",
        y="avg_rating",
        hue="genre",
        size="size",
        sizes=(min_size, max_size),  # Minimum and maximum bubble size
        palette="tab10",
        alpha=0.7
    )
    
    # Set x-axis ticks to be every unique year
    unique_years = sorted(data["year"].unique())
    plt.xticks(unique_years, rotation=45)  # Rotate labels if needed for better readability
    
    # Add labels and title
    plt.title(plot_title, fontsize=16)
    plt.xlabel("Year", fontsize=12)
    plt.ylabel("Average Rating", fontsize=12)
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left', title="Genre")
    plt.grid(alpha=0.3)
    plt.tight_layout()
    plt.show()
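
The size normalization used inside `show_scatterplot` can be checked in isolation. The helper name `scale_bubble_size` and the rating counts below are hypothetical, chosen only to show how counts map onto the bubble-size range.

```python
def scale_bubble_size(num_ratings, max_ratings, min_size=200, max_size=700):
    """Map a rating count onto the bubble-size range, as show_scatterplot does."""
    return (num_ratings / max_ratings) * (max_size - min_size) + min_size


# Hypothetical yearly rating counts.
counts = [0, 5000, 10000]
sizes = [scale_bubble_size(n, max(counts)) for n in counts]
print(sizes)
```

The group with the most ratings gets the largest bubble, and a group with no ratings would get the smallest one.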

In [None]:
# Invoke show_scatterplot function with the top-3 best rated genres information
show_scatterplot(top3GenresMovieReviewsByGenreAndYear.toPandas(), "Rating trend for the top-3 best rated movie genres")

## Challenge!

Try plotting the same graph, but now for the _bottom 3 worst rated_ movie genres - how does it compare with the evolution of the top 3 genres?