In [3]:
import pandas as pd
import os
import json
import re
import requests
from pathlib import Path

from tqdm import tqdm
from unidecode import unidecode
# import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, IntegerType, FloatType
from pyspark.sql.functions import col, when, udf, regexp_replace, lower, trim, lit, coalesce, array, concat_ws, concat, split

from pyspark.ml import Pipeline

In [1]:
!git clone https://github.com/meralegre/Big_Data_IMDb.git
%cd Big_Data_IMDb/

# add the csv files here otherwise the code wont work, cannot push it to GitHub
# sadly, too large
# !mkdir movie_reviews
%pwd

Cloning into 'Big_Data_IMDb'...
remote: Enumerating objects: 135, done.[K
remote: Counting objects: 100% (135/135), done.[K
remote: Compressing objects: 100% (122/122), done.[K
remote: Total 135 (delta 54), reused 65 (delta 11), pack-reused 0 (from 0)[K
Receiving objects: 100% (135/135), 24.35 MiB | 8.26 MiB/s, done.
Resolving deltas: 100% (54/54), done.
Updating files: 100% (32/32), done.
/content/Big_Data_IMDb


'/content/Big_Data_IMDb'

### Load data with Spark

In [4]:
spark = SparkSession.builder \
    .master("local") \
    .config("spark.driver.bindAddress","127.0.0.1") \
    .getOrCreate()

In [5]:
def load_train_data():
    path = "data/train/"
    train_files = os.listdir(path=path)

    train_df = pd.DataFrame()
    for file in train_files:
        if file.startswith('train-') and file.endswith('.csv'):
            df = pd.read_csv(f"{path}/{file}")
            train_df = pd.concat([train_df, df], ignore_index=False)
            train_df = train_df.drop(columns=["Unnamed: 0", "runtimeMinutes"])

    # train_df = train_df.sort_index()
    spark_train_df = spark.createDataFrame(train_df).replace(to_replace='\\N', value=None)
    spark_train_df = spark_train_df.withColumnRenamed("primaryTitle", "movie_title")
    return spark_train_df

In [6]:
def load_validation_data():
    path = "data/"
    validation_df = pd.read_csv(f"{path}/validation_hidden.csv", index_col=[0])
    # validation_df = validation_df.sort_index()
    validation_df = validation_df.drop(columns="runtimeMinutes")
    spark_validation_df = spark.createDataFrame(validation_df).replace(to_replace='\\N', value=None)
    spark_validation_df = spark_validation_df.withColumnRenamed("primaryTitle", "movie_title")
    return spark_validation_df

def load_test_data():
    path = "data/"
    test_df = pd.read_csv(f"{path}/test_hidden.csv", index_col=[0])
    # test_df = test_df.sort_index()
    test_df = test_df.drop(columns="runtimeMinutes")
    spark_test_df = spark.createDataFrame(test_df).replace(to_replace='\\N', value=None)
    spark_test_df = spark_test_df.withColumnRenamed("primaryTitle", "movie_title")
    return spark_test_df


In [7]:
train_data = load_train_data()
train_data.show()

+---------+--------------------+----------------+---------+-------+--------+-----+
|   tconst|         movie_title|   originalTitle|startYear|endYear|numVotes|label|
+---------+--------------------+----------------+---------+-------+--------+-----+
|tt0014109|The Saga of Gösta...|             NaN|     1924|   NULL|  1231.0| true|
|tt0015064|      The Last Laugh| Der letzte Mann|     1924|   NULL|     NaN| true|
|tt0015841|        The Freshman|    The Freshman|     1925|   NULL|  5374.0| true|
|tt0017271|          By the Law|             NaN|     NULL|   1926|  1057.0| true|
|tt0018451|The Student Princ...|             NaN|     1927|   NULL|  1459.0| true|
|tt0018742|       The Cameraman|   The Cameraman|     1928|   NULL| 11388.0| true|
|tt0019379|         Show People|             NaN|     1928|   NULL|  3695.0| true|
|tt0020018|      In Old Arizona|             NaN|     1928|   NULL|  1049.0|false|
|tt0020793|Escape from Dartmoor|             NaN|     1929|   NULL|  1102.0| true|
|tt0

In [8]:
validation_data = load_validation_data()
validation_data.show()
validation_data.count()

+---------+--------------------+-------------------+---------+-------+--------+
|   tconst|         movie_title|      originalTitle|startYear|endYear|numVotes|
+---------+--------------------+-------------------+---------+-------+--------+
|tt0003740|             Cabiria|                NaN|     1914|   NULL|  3452.0|
|tt0008663|     A Man There Was|        Terje Vigen|     1917|   NULL|  1882.0|
|tt0010307|           J'accuse!|                NaN|     1919|   NULL|  1692.0|
|tt0014429|        Safety Last!|       Safety Last!|     1923|   NULL| 19898.0|
|tt0015175|Die Nibelungen: S...|                NaN|     1924|   NULL|  5676.0|
|tt0016332|       Seven Chances|                NaN|     1925|   NULL|  9914.0|
|tt0018737|       Pandora's Box|                NaN|     NULL|   1929| 10475.0|
|tt0018839|The Docks of New ...|                NaN|     1928|   NULL|  4339.0|
|tt0019421| Steamboat Bill, Jr.|Steamboat Bill, Jr.|     1928|   NULL| 14166.0|
|tt0019901|   Woman in the Moon|        

955

In [9]:
test_data = load_test_data()
test_data.show()
test_data.count()

+---------+--------------------+-------------------+---------+-------+--------+
|   tconst|         movie_title|      originalTitle|startYear|endYear|numVotes|
+---------+--------------------+-------------------+---------+-------+--------+
|tt0014972| He Who Gets Slapped|He Who Gets Slapped|     1924|   NULL|  3654.0|
|tt0015016|      The Iron Horse|                NaN|     1924|   NULL|  2136.0|
|tt0015174|Die Nibelungen: K...|                NaN|     1924|   NULL|  4341.0|
|tt0015214|             At 3:25|                NaN|     NULL|   1925|  1724.0|
|tt0015863|             Go West|                NaN|     1925|   NULL|  4188.0|
|tt0016481|             Variety|            Varieté|     1925|   NULL|  1188.0|
|tt0017136|          Metropolis|                NaN|     1927|   NULL|168372.0|
|tt0018876|   The Farmer's Wife|                NaN|     1928|   NULL|  2741.0|
|tt0019074| Laugh, Clown, Laugh|Laugh, Clown, Laugh|     1928|   NULL|  1934.0|
|tt0021730|           The Champ|        

1086

### Cleaning

In [13]:
# def handle_years():
#     """
#     Creates a 'year' column using 'startYear' if available, otherwise 'endYear'.
#     Drops 'startYear' and 'endYear' after merging.
#     """
#     spark_train_df = spark_train_df.withColumn("year", when(col("startYear").isNotNull(), col("startYear"))
#                                   .otherwise(col("endYear")))

#     spark_validation_df = spark_validation_df.withColumn("year", when(col("startYear").isNotNull(), col("startYear"))
#                                   .otherwise(col("endYear")))

#     spark_test_df = spark_test_df.withColumn("year", when(col("startYear").isNotNull(), col("startYear"))
#                                   .otherwise(col("endYear")))

#     # drop original startYear and endYear
#     spark_train_df = spark_train_df.drop("startYear", "endYear")
#     spark_validation_df = spark_validation_df.drop("startYear", "endYear")
#     spark_test_df = spark_test_df.drop("startYear", "endYear")

#     return spark_train_df, spark_validation_df, spark_test_df

def handle_years(df):
    """
    Creates a 'year' column using 'startYear' if available, otherwise 'endYear'.
    Drops 'startYear' and 'endYear' after merging.
    """
    df = df.withColumn("year", when(col("startYear").isNotNull(), col("startYear"))
                                  .otherwise(col("endYear")))

    # drop original startYear and endYear
    df = df.drop("startYear", "endYear")

    return df

In [None]:
spark_train_df = load_train_data()
spark_train_df = handle_years(spark_train_df)
spark_train_df.show()

+---------+--------------------+--------------------+--------+-----+----+
|   tconst|         movie_title|       originalTitle|numVotes|label|year|
+---------+--------------------+--------------------+--------+-----+----+
|tt0009369|              Mickey|              Mickey|  1119.0|false|1918|
|tt0014142|The Hunchback of ...|                 NaN|  5288.0| true|1923|
|tt0014945|            Girl Shy|            Girl Shy|  3327.0| true|1924|
|tt0017048|   A Page of Madness|                 NaN|  3357.0| true|1926|
|tt0017350|  The Scarlet Letter|                 NaN|  1768.0| true|1926|
|tt0017961|           Happiness|                 NaN|  1080.0| true|1935|
|tt0018054|   Thé King ớf Kings|   The King of Kings|  2081.0| true|1927|
|tt0018578|               Wings|                 NaN|     NaN| true|1927|
|tt0019429|        Street Angel|        Street Angel|  2314.0| true|1928|
|tt0020768|           City Girl|                 NaN|  3199.0| true|1930|
|tt0022599|   À Nous la Liberté|   À n

In [10]:
# converts special characters to ASCII
def normalize_text(text):
    if text is None:
        return None
    return unidecode(text)

normalize_text_udf = udf(normalize_text, StringType())

def clean_titles(df):
    df = df.withColumn("movie_title", normalize_text_udf(col("movie_title")))
    df = df.withColumn("originalTitle", normalize_text_udf(col("originalTitle")))

    # maybe we can drop the original name since there are some NaN values present
    # and we already have the clean primaryTitle column
    df = df.drop("originalTitle")
    return df

In [None]:
spark_train_df = clean_titles(spark_train_df)
spark_train_df.show()

+---------+--------------------+--------+-----+----+
|   tconst|         movie_title|numVotes|label|year|
+---------+--------------------+--------+-----+----+
|tt0009369|              Mickey|  1119.0|false|1918|
|tt0014142|The Hunchback of ...|  5288.0| true|1923|
|tt0014945|            Girl Shy|  3327.0| true|1924|
|tt0017048|   A Page of Madness|  3357.0| true|1926|
|tt0017350|  The Scarlet Letter|  1768.0| true|1926|
|tt0017961|           Happiness|  1080.0| true|1935|
|tt0018054|   The King of Kings|  2081.0| true|1927|
|tt0018578|               Wings|     NaN| true|1927|
|tt0019429|        Street Angel|  2314.0| true|1928|
|tt0020768|           City Girl|  3199.0| true|1930|
|tt0022599|   A Nous la Liberte|  4392.0| true|1931|
|tt0027075|A Tale of Two Cities|  5596.0| true|1935|
|tt0027441|Charlie Chan at t...|  1461.0| true|1936|
|tt0028575|               Angel|  2698.0| true|1937|
|tt0031002|  The Young in Heart|  1332.0| true|1938|
|tt0031022|The Adventures of...|  6616.0| true

In [None]:
# number of rows of training data after concatenating everything together
spark_train_df.count()

7959

### Merge with movie_reviews data

In [None]:
def load_reviews_data():
    path = "movie_reviews"
    reviews_df = pd.read_csv(f"{path}/final_movie_reviews.csv")
    # reviews_df = reviews_df.sort_index()
    spark_reviews_df = spark.createDataFrame(reviews_df).replace(to_replace='\\N', value=None)
    return spark_reviews_df

In [11]:
spark_reviews_df = spark.read \
    .option("header", True) \
    .option("mode", "DROPMALFORMED") \
    .option("inferSchema", True) \
    .csv("final_movie_reviews.csv")


# Show DataFrame
spark_reviews_df.show(truncate=False)

+--------------------------------------------------+------+------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+------------------+------------------+------------------+---------------+---------------+------------+------------+----------+
|movie_title                                       |year  |genre                                                       |reviews                                                                                                                                                                                                                                    

In [None]:
spark_reviews_df.count()

825410

In [15]:
def convert_columns_to_int(df, column_name):
    """
    converts a float-based year column to an integer
    """
    df = df.withColumn(column_name, col(column_name).cast(IntegerType()))
    return df

# spark_reviews_df = convert_columns_to_int(spark_reviews_df, "year")
# spark_reviews_df = convert_columns_to_int(spark_reviews_df, "tomatometer_rating")
# spark_reviews_df = convert_columns_to_int(spark_reviews_df, "audience_rating")

In [None]:
spark_reviews_df = spark_reviews_df.withColumnRenamed("Like count", "like_count")
spark_reviews_df = spark_reviews_df.withColumnRenamed("reviews", "review_content")
spark_reviews_df.show()

+--------------------+----+--------------------+--------------------+--------------+------------------+------------------+------------------+---------------+---------------+------------+------------+----------+
|         movie_title|year|               genre|      review_content|content_rating|production_company|tomatometer_status|tomatometer_rating|audience_status|audience_rating|review_score|review_label|like_count|
+--------------------+----+--------------------+--------------------+--------------+------------------+------------------+------------------+---------------+---------------+------------+------------+----------+
|Percy Jackson & t...|1970|Action & Adventur...|Whether audiences...|            PG|  20th Century Fox|            Rotten|                49|        Spilled|             53|       3.5/5|       Fresh|      NULL|
|Percy Jackson & t...|1970|Action & Adventur...|Harry Potter knoc...|            PG|  20th Century Fox|            Rotten|                49|        Spilled

### Time to merge

For now we will use a LEFT join on the training data to avoid having null values on the tconst identifier column. Maybe later we can scrape the tconst of the movies missing and have bigger data

In [None]:
def join_training_with_reviews(df_movies, df_reviews):
    """
    joins training data with reviews based on `movie_title` and `year`.

    Parameters:
    - df_movies: PySpark DataFrame containing training movie metadata.
    - df_reviews: PySpark DataFrame containing reviews.

    Returns:
    - Merged PySpark DataFrame with movie metadata + reviews.
    """

    # Standardize `movie_title` (trim + lowercase) for better matching
    df_movies = df_movies.withColumn("movie_title", trim(lower(col("movie_title"))))
    df_reviews = df_reviews.withColumn("movie_title", trim(lower(col("movie_title"))))

    # Ensure `year` is cast as an integer
    df_movies = df_movies.withColumn("year", col("year").cast("int"))
    df_reviews = df_reviews.withColumn("year", col("year").cast("int"))

    # Perform a LEFT JOIN on `movie_title` and `year`
    df_merged = df_movies.join(df_reviews, on=["movie_title", "year"], how="left")

    # Select relevant columns
    # selected_columns = [
    #     "tconst", "movie_title", "numVotes", "year", "label",
    #     "tomatometer_status", "review_type", "genre",
    #     "review_label", "review_content"
    # ]

    # df_final = df_merged.select(*selected_columns)

    # # Fill missing reviews with None and "Unknown"
    # df_final = df_final.fillna({"review_content": None, "review_label": "Unknown"})

    return df_merged

In [None]:
join_training_with_reviews(spark_train_df, spark_reviews_df).show(40)
final_df = join_training_with_reviews(spark_train_df, spark_reviews_df)

+--------------------+----+---------+--------+-----+-----+--------------+--------------+------------------+------------------+------------------+---------------+---------------+------------+------------+----------+
|         movie_title|year|   tconst|numVotes|label|genre|review_content|content_rating|production_company|tomatometer_status|tomatometer_rating|audience_status|audience_rating|review_score|review_label|like_count|
+--------------------+----+---------+--------+-----+-----+--------------+--------------+------------------+------------------+------------------+---------------+---------------+------------+------------+----------+
|   a nous la liberte|1931|tt0022599|  4392.0| true| NULL|          NULL|          NULL|              NULL|              NULL|              NULL|           NULL|           NULL|        NULL|        NULL|      NULL|
|a tale of two cities|1935|tt0027075|  5596.0| true| NULL|          NULL|          NULL|              NULL|              NULL|              

In [None]:
final_df = convert_columns_to_int(final_df, "numVotes")
final_df.show()

+--------------------+----+---------+--------+-----+-----+--------------+--------------+------------------+------------------+------------------+---------------+---------------+------------+------------+----------+
|         movie_title|year|   tconst|numVotes|label|genre|review_content|content_rating|production_company|tomatometer_status|tomatometer_rating|audience_status|audience_rating|review_score|review_label|like_count|
+--------------------+----+---------+--------+-----+-----+--------------+--------------+------------------+------------------+------------------+---------------+---------------+------------+------------+----------+
|   a nous la liberte|1931|tt0022599|    4392| true| NULL|          NULL|          NULL|              NULL|              NULL|              NULL|           NULL|           NULL|        NULL|        NULL|      NULL|
|a tale of two cities|1935|tt0027075|    5596| true| NULL|          NULL|          NULL|              NULL|              NULL|              

In [None]:
final_df.count()

8294

In [48]:
def clean_labels(df):
    """
    Creates a 'year' column using 'startYear' if available, otherwise 'endYear'.
    Drops 'startYear' and 'endYear' after merging.
    """
    # # convert boolean column `label` to integer
    # df = df.withColumn("label_int", when(col("label") == True, 1)
    #                                   .when(col("label") == False, 0)
    #                                   .otherwise(None))

    # convert categorical `review_label` to numerical
    df = df.withColumn("tomatoes_label", when(col("review_label") == "Fresh", 1)
                                          .when(col("review_label") == "Rotten", 0)
                                          .otherwise(None))

    df = df.withColumn("tomatometer_status", when(col("tomatometer_status") == "Fresh", 1)
                                          .when(col("tomatometer_status") == "Rotten", 0)
                                          .otherwise(None))

    df = df.withColumn("audience_status", when(col("audience_status") == "Upright", 1)
                                          .when(col("audience_status") == "Spilled", 0)
                                          .otherwise(None))

    # Drop unnecessary intermediate columns
    #df = df.drop("review_label")

    df.withColumnRenamed("label_int", "label")

    return df

In [None]:
clean_labels(final_df).show()
final_df = clean_labels(final_df)

+--------------------+----+---------+--------+-----+-----+--------------+--------------+------------------+------------------+------------------+---------------+---------------+------------+----------+---------+--------------+
|         movie_title|year|   tconst|numVotes|label|genre|review_content|content_rating|production_company|tomatometer_status|tomatometer_rating|audience_status|audience_rating|review_score|like_count|label_int|tomatoes_label|
+--------------------+----+---------+--------+-----+-----+--------------+--------------+------------------+------------------+------------------+---------------+---------------+------------+----------+---------+--------------+
|   a nous la liberte|1931|tt0022599|    4392| true| NULL|          NULL|          NULL|              NULL|              NULL|              NULL|           NULL|           NULL|        NULL|      NULL|        1|          NULL|
|a tale of two cities|1935|tt0027075|    5596| true| NULL|          NULL|          NULL|    

In [None]:
df_missing_movies = spark_train_df.join(spark_reviews_df, on=["movie_title", "year"], how="left_anti")
df_missing_movies.count()

7911

# **WEB SCRAPING**

In [20]:
validation_data.show(5)

+---------+--------------------+--------+----+
|   tconst|         movie_title|numVotes|year|
+---------+--------------------+--------+----+
|tt0003740|             Cabiria|  3452.0|1914|
|tt0008663|     A Man There Was|  1882.0|1917|
|tt0010307|           J'accuse!|  1692.0|1919|
|tt0014429|        Safety Last!| 19898.0|1923|
|tt0015175|Die Nibelungen: S...|  5676.0|1924|
+---------+--------------------+--------+----+
only showing top 5 rows



In [21]:
tconst_val_list = validation_data.select("tconst").distinct().rdd.flatMap(lambda x: x).collect()
print(tconst_val_list)

tconst_test_list = test_data.select("tconst").distinct().rdd.flatMap(lambda x: x).collect()
print(tconst_test_list)

['tt0091142', 'tt0817228', 'tt1640548', 'tt1891905', 'tt0378215', 'tt0458449', 'tt11143108', 'tt1462014', 'tt0485513', 'tt11274100', 'tt1453405', 'tt2210441', 'tt2299842', 'tt0092534', 'tt0094929', 'tt0366943', 'tt0085601', 'tt15354498', 'tt0033563', 'tt0034461', 'tt0343818', 'tt1136688', 'tt1807950', 'tt0035169', 'tt0091943', 'tt0418110', 'tt0477273', 'tt0084629', 'tt0366627', 'tt10539608', 'tt3228830', 'tt4843012', 'tt0062671', 'tt0066206', 'tt0091178', 'tt1034311', 'tt1662293', 'tt5092380', 'tt0054020', 'tt0087164', 'tt1302011', 'tt5460658', 'tt0051525', 'tt0113463', 'tt0844457', 'tt1260567', 'tt5566714', 'tt0351795', 'tt2853182', 'tt5775220', 'tt5779540', 'tt7615722', 'tt8297300', 'tt0058126', 'tt0396659', 'tt0477051', 'tt1785288', 'tt0080549', 'tt1186370', 'tt7085842', 'tt0053598', 'tt0058439', 'tt0066564', 'tt0080802', 'tt0118083', 'tt0420966', 'tt0479948', 'tt16298620', 'tt0025699', 'tt0040558', 'tt0359950', 'tt0923752', 'tt2437548', 'tt0043545', 'tt0079579', 'tt0087130', 'tt735

## IBDM Review Scraping

In [22]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import random

In [23]:
my_url = "https://www.imdb.com/title/tt0073781/reviews/"

In [27]:
def scrape_imdb_reviews(imdb_id):
    """
    Fetches the IMDb reviews page for the given imdb_id
    and returns ONLY the first 5 reviews that actually contain text.
    """
    url = f"https://www.imdb.com/title/{imdb_id}/reviews"
    headers = {"User-Agent": "Mozilla/5.0"}
    # resp = requests.get(url, headers=headers)
    # resp.raise_for_status()

    try:
        resp = requests.get(url, headers=headers, timeout=10)
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Failed to fetch reviews for {imdb_id}: {e}")
        return []

    soup = BeautifulSoup(resp.text, "html.parser")

    review_cards = soup.find_all("div", {"data-testid": "review-card-parent"})

    reviews_data = []

    for card in review_cards:
        # extract review text
        content_div = card.select_one("div.ipc-html-content-inner-div")
        review_text = content_div.get_text(strip=True) if content_div else ""

        if not review_text:
            continue

        reviews_data.append(review_text)

        if len(reviews_data) == 1:
            break

    return reviews_data

In [28]:
import time

def scrape_reviews_for_imdb_ids(imdb_ids):
    """
    Takes a list of IMDb IDs,
    scrapes the first 4 text reviews for each, then returns a Spark DataFrame with:
        - imdb_id
        - review (each review appears in a separate row)
    """
    rows = []

    for imdb_id in imdb_ids:
        reviews_list = scrape_imdb_reviews(imdb_id)

        for review in reviews_list:
            rows.append((imdb_id, review))

        # time.sleep(1)  # Uncomment if rate limiting is needed

    columns = ["tconst", "review"]
    df = spark.createDataFrame(rows, columns)
    return df

In [29]:
# df_reviews = scrape_reviews_for_imdb_ids(tconst_list)

# df_reviews.show()

In [30]:
df_reviews_val = scrape_reviews_for_imdb_ids(tconst_val_list)
df_reviews_val.show()

df_reviews_test = scrape_reviews_for_imdb_ids(tconst_test_list)
df_reviews_test.show()

Failed to fetch reviews for tt16298620: 404 Client Error: Not Found for url: https://www.imdb.com/title/tt16298620/reviews/
+----------+--------------------+
|    tconst|              review|
+----------+--------------------+
| tt0091142|Gothic, directed ...|
| tt0817228|The American tatt...|
| tt1640548|Greetings again f...|
| tt1891905|As said in my rev...|
| tt0378215|As with most Dani...|
| tt0458449|I have had this D...|
|tt11143108|Very good movie e...|
| tt1462014|This documentary,...|
| tt0485513|I must say if I h...|
|tt11274100|Hamster is a blac...|
| tt1453405|I adored Monsters...|
| tt2210441|When I was a chil...|
| tt2299842|Directed by Kim K...|
| tt0092534|"...makes Plan 9 ...|
| tt0094929|UK title - The Bi...|
| tt0366943|"Reconstruction" ...|
| tt0085601|The pic deals wit...|
| tt0033563|I will never get ...|
| tt0034461|A gang of youngst...|
| tt0343818|Director Alex Pro...|
+----------+--------------------+
only showing top 20 rows

Failed to fetch reviews for tt9008

In [31]:
df_reviews_val.coalesce(1).write.json("validation_reviews.csv", mode='overwrite')
df_reviews_test.coalesce(1).write.json("test_reviews.csv", mode='overwrite')

In [None]:
#df_reviews.coalesce(1).write.json("movie_reviews/imdb_reviews.csv", mode='overwrite')

## CLEAN VAL AND TEST

In [12]:
val_reviews = spark.read.json("validation_review.json")
test_reviews = spark.read.json("test_reviews.json")

In [13]:
val_reviews = validation_data.join(val_reviews, on="tconst", how="left")
test_reviews = test_data.join(test_reviews, on="tconst", how="left")

val_reviews.show()

+---------+--------------------+-------------------+---------+-------+--------+--------------------+
|   tconst|         movie_title|      originalTitle|startYear|endYear|numVotes|              review|
+---------+--------------------+-------------------+---------+-------+--------+--------------------+
|tt0003740|             Cabiria|                NaN|     1914|   NULL|  3452.0|It is a little kn...|
|tt0008663|     A Man There Was|        Terje Vigen|     1917|   NULL|  1882.0|This is a faithfu...|
|tt0010307|           J'accuse!|                NaN|     1919|   NULL|  1692.0|The only attempt ...|
|tt0014429|        Safety Last!|       Safety Last!|     1923|   NULL| 19898.0|One of the best c...|
|tt0015175|Die Nibelungen: S...|                NaN|     1924|   NULL|  5676.0|I'll say this up ...|
|tt0016332|       Seven Chances|                NaN|     1925|   NULL|  9914.0|Contrary to what ...|
|tt0018737|       Pandora's Box|                NaN|     NULL|   1929| 10475.0|Things I lov

In [14]:
def handle_years_val_test(val_reviews, test_reviews):
    """
    Creates a 'year' column using 'startYear' if available, otherwise 'endYear'.
    Drops 'startYear' and 'endYear' after merging.
    """

    val_reviews = val_reviews.withColumn("year", when(col("startYear").isNotNull(), col("startYear"))
                                  .otherwise(col("endYear")))

    test_reviews = test_reviews.withColumn("year", when(col("startYear").isNotNull(), col("startYear"))
                                  .otherwise(col("endYear")))

    # drop original startYear and endYear
    val_reviews = val_reviews.drop("startYear", "endYear")
    test_reviews = test_reviews.drop("startYear", "endYear")

    return val_reviews, test_reviews

In [15]:
val_reviews, test_reviews = handle_years_val_test(val_reviews, test_reviews)
validation_data, test_data = handle_years_val_test(validation_data, test_data)

# val_reviews.show()

In [16]:
val_reviews = clean_titles(val_reviews)
test_reviews = clean_titles(test_reviews)

validation_data = clean_titles(validation_data)
test_data = clean_titles(test_data)

# val_reviews.show()

In [19]:
spark_reviews_df = spark.read \
    .option("header", True) \
    .option("mode", "DROPMALFORMED") \
    .option("inferSchema", True) \
    .csv("final_movie_reviews.csv")


# Show DataFrame
spark_reviews_df.show(truncate=False)

+--------------------------------------------------+------+------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+------------------+------------------+------------------+---------------+---------------+------------+------------+----------+
|movie_title                                       |year  |genre                                                       |reviews                                                                                                                                                                                                                                    

In [49]:
spark_reviews_df = spark_reviews_df.drop("reviews", "tomatoes_label")
spark_reviews_df = clean_labels(spark_reviews_df)
spark_reviews_df = spark_reviews_df.withColumnRenamed("Like count", "like_count")
spark_reviews_df.show()

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `review_label` cannot be resolved. Did you mean one of the following? [`review_score`, `movie_title`, `genre`, `year`, `like_count`].;
'Project [movie_title#231, year#232, genre#233, content_rating#235, production_company#236, tomatometer_status#567, tomatometer_rating#238, audience_status#581, audience_rating#240, review_score#241, like_count#619, CASE WHEN ('review_label = Fresh) THEN 1 WHEN ('review_label = Rotten) THEN 0 ELSE null END AS tomatoes_label#2513]
+- Project [movie_title#231, year#232, genre#233, content_rating#235, production_company#236, tomatometer_status#567, tomatometer_rating#238, audience_status#581, audience_rating#240, review_score#241, like_count#619]
   +- Project [movie_title#231, year#232, genre#233, content_rating#235, production_company#236, tomatometer_status#567, tomatometer_rating#238, audience_status#581, audience_rating#240, review_score#241, Like count#243 AS like_count#619, tomatoes_label#553]
      +- Project [movie_title#231, year#232, genre#233, content_rating#235, production_company#236, tomatometer_status#567, tomatometer_rating#238, audience_status#581, audience_rating#240, review_score#241, Like count#243, tomatoes_label#553]
         +- Project [movie_title#231, year#232, genre#233, content_rating#235, production_company#236, tomatometer_status#567, tomatometer_rating#238, CASE WHEN (audience_status#239 = Upright) THEN 1 WHEN (audience_status#239 = Spilled) THEN 0 ELSE cast(null as int) END AS audience_status#581, audience_rating#240, review_score#241, review_label#242, Like count#243, tomatoes_label#553]
            +- Project [movie_title#231, year#232, genre#233, content_rating#235, production_company#236, CASE WHEN (tomatometer_status#237 = Fresh) THEN 1 WHEN (tomatometer_status#237 = Rotten) THEN 0 ELSE cast(null as int) END AS tomatometer_status#567, tomatometer_rating#238, audience_status#239, audience_rating#240, review_score#241, review_label#242, Like count#243, tomatoes_label#553]
               +- Project [movie_title#231, year#232, genre#233, content_rating#235, production_company#236, tomatometer_status#237, tomatometer_rating#238, audience_status#239, audience_rating#240, review_score#241, review_label#242, Like count#243, CASE WHEN (review_label#242 = Fresh) THEN 1 WHEN (review_label#242 = Rotten) THEN 0 ELSE cast(null as int) END AS tomatoes_label#553]
                  +- Project [movie_title#231, year#232, genre#233, content_rating#235, production_company#236, tomatometer_status#237, tomatometer_rating#238, audience_status#239, audience_rating#240, review_score#241, review_label#242, Like count#243]
                     +- Relation [movie_title#231,year#232,genre#233,reviews#234,content_rating#235,production_company#236,tomatometer_status#237,tomatometer_rating#238,audience_status#239,audience_rating#240,review_score#241,review_label#242,Like count#243] csv


In [20]:
from pyspark.sql.functions import avg, first

# Define numerical features (calculate mean)
numerical_features = ["like_count", "tomatometer_status",
                      "tomatometer_rating", "audience_status",
                      "audience_rating"]

# Define categorical features (take first occurrence)
categorical_features = ["year", "genre", "content_rating", "production_company"]

# Perform aggregation using PySpark
df_grouped = spark_reviews_df.groupBy("movie_title").agg(
    *[avg(col).alias(col) for col in numerical_features],  # Compute mean for numerical features
    *[first(col).alias(col) for col in categorical_features]  # Take first occurrence for categorical features
)

# Show grouped data
df_grouped.show(5, truncate=False)

+-----------------------+----------+------------------+------------------+---------------+----------------+------+------------------------------------------------+--------------+--------------------+
|movie_title            |like_count|tomatometer_status|tomatometer_rating|audience_status|audience_rating |year  |genre                                           |content_rating|production_company  |
+-----------------------+----------+------------------+------------------+---------------+----------------+------+------------------------------------------------+--------------+--------------------+
|$9.99                  |NULL      |1.0               |73.0              |0.0            |55.45           |1970.0|Animation, Art House & International, Comedy    |R             |Regent Releasing    |
|'71                    |NULL      |1.0               |96.0              |1.0            |81.4639175257732|1970.0|Action & Adventure, Drama                       |PG-13         |Roadside Attractions|


In [21]:
categorical_cols = [field.name for field in df_grouped.schema.fields if field.dataType.simpleString() == "string"]

for cat_col in categorical_cols:
    df_grouped = df_grouped.withColumn(cat_col, when(col(cat_col).isNull(), "Unknown").otherwise(col(cat_col)))

# Show updated dataset
df_grouped.show(5, truncate=False)

+-----------------------+----------+------------------+------------------+---------------+----------------+------+------------------------------------------------+--------------+--------------------+
|movie_title            |like_count|tomatometer_status|tomatometer_rating|audience_status|audience_rating |year  |genre                                           |content_rating|production_company  |
+-----------------------+----------+------------------+------------------+---------------+----------------+------+------------------------------------------------+--------------+--------------------+
|$9.99                  |NULL      |1.0               |73.0              |0.0            |55.45           |1970.0|Animation, Art House & International, Comedy    |R             |Regent Releasing    |
|'71                    |NULL      |1.0               |96.0              |1.0            |81.4639175257732|1970.0|Action & Adventure, Drama                       |PG-13         |Roadside Attractions|


In [22]:
from pyspark.sql.functions import avg, col, when, median

df_grouped = df_grouped.withColumn("year", col("year").cast(IntegerType()))
df_cleaned = df_grouped.filter(col("year").isNotNull())

status_cols = ["tomatometer_status", "audience_status"]
for c in status_cols:
    df_cleaned = df_cleaned.withColumn(
        c, when(col(c).isNull(), 0).otherwise(col(c))
    )

median_rating_per_genre = df_cleaned.groupBy("genre").agg(median("tomatometer_rating").alias("median_tomato"),
                                                          median("audience_rating").alias("median_audience"))

df_cleaned = df_cleaned.join(median_rating_per_genre, on="genre", how="left")

df_cleaned = df_cleaned.withColumn(
    "tomatometer_rating",
    when(col("tomatometer_rating").isNull(), col("median_tomato")).otherwise(col("tomatometer_rating"))
).withColumn(
    "audience_rating",
    when(col("audience_rating").isNull(), col("median_audience")).otherwise(col("audience_rating"))
).drop("median_tomato", "median_audience")

# First compute average like_count per genre
avg_like_per_genre = df_cleaned.groupBy("genre").agg(avg("like_count").alias("genre_avg_like"))

# Join back the average to original DataFrame
df_with_avg = df_cleaned.join(avg_like_per_genre, on="genre", how="left")

# Fill missing like_count with the genre average
df_grouped = df_with_avg.withColumn(
    "like_count",
    when(col("like_count").isNull(), col("genre_avg_like")).otherwise(col("like_count"))
).drop("genre_avg_like")

df_grouped.show()

+--------------------+--------------------+------------------+------------------+------------------+---------------+------------------+----+--------------------+--------------------+
|               genre|         movie_title|        like_count|tomatometer_status|tomatometer_rating|audience_status|   audience_rating|year|      content_rating|  production_company|
+--------------------+--------------------+------------------+------------------+------------------+---------------+------------------+----+--------------------+--------------------+
|       Comedy, Drama|            10 Years|              40.0|               1.0|              60.0|            0.0|40.869565217391305|1970|               PG-13|Anchor Bay Entert...|
|     Comedy, Romance|                  10|57.154761904761905|               1.0|              67.0|            0.0|              53.0|1970|                   R|         Waner Bros.|
|     Comedy, Romance|           100 Girls|57.154761904761905|               1.0|    

In [23]:
df_grouped = df_grouped.filter(col("like_count").isNotNull())
df_grouped.show()

+--------------------+--------------------+------------------+------------------+------------------+---------------+------------------+----+--------------+--------------------+
|               genre|         movie_title|        like_count|tomatometer_status|tomatometer_rating|audience_status|   audience_rating|year|content_rating|  production_company|
+--------------------+--------------------+------------------+------------------+------------------+---------------+------------------+----+--------------+--------------------+
|Comedy, Science F...|       Alien Autopsy|              62.0|               1.0|              75.0|            0.0|              50.0|1970|         PG-13|   Warner Home Video|
|Comedy, Science F...|           All of Me|              67.0|               1.0|              85.0|            1.0|              67.0|1970|            PG|           HBO Video|
|Comedy, Science F...|Back to the Futur...|              62.0|               1.0|              66.0|            1.0

In [27]:
from pyspark.sql.functions import count

df_grouped.groupBy("movie_title", "year").agg(count("*").alias("count")).filter("count > 1").show()

+-----------+----+-----+
|movie_title|year|count|
+-----------+----+-----+
+-----------+----+-----+



#### VALIDATION

In [50]:
df_grouped = spark.createDataFrame(df_grouped)
df_grouped.show()

+--------------------+--------------------+------------------+------------------+------------------+---------------+------------------+----+--------------+--------------------+
|               genre|         movie_title|        like_count|tomatometer_status|tomatometer_rating|audience_status|   audience_rating|year|content_rating|  production_company|
+--------------------+--------------------+------------------+------------------+------------------+---------------+------------------+----+--------------+--------------------+
|Comedy, Science F...|       alien autopsy|              62.0|               1.0|              75.0|            0.0|              50.0|1970|         PG-13|   Warner Home Video|
|Comedy, Science F...|           all of me|              67.0|               1.0|              85.0|            1.0|              67.0|1970|            PG|           HBO Video|
|Comedy, Science F...|back to the futur...|              62.0|               1.0|              66.0|            1.0

In [51]:
df_grouped = df_grouped.withColumn("movie_title", trim(lower("movie_title")))
valval = val_reviews.withColumn("movie_title", trim(lower("movie_title")))

valval = valval.join(df_grouped, on=["movie_title", "year"], how="left")
valval.show()

+--------------------+----+---------+--------+--------------------+-----+----------+------------------+------------------+---------------+---------------+--------------+------------------+
|         movie_title|year|   tconst|numVotes|              review|genre|like_count|tomatometer_status|tomatometer_rating|audience_status|audience_rating|content_rating|production_company|
+--------------------+----+---------+--------+--------------------+-----+----------+------------------+------------------+---------------+---------------+--------------+------------------+
|             cabiria|1914|tt0003740|  3452.0|It is a little kn...| NULL|      NULL|              NULL|              NULL|           NULL|           NULL|          NULL|              NULL|
|     a man there was|1917|tt0008663|  1882.0|This is a faithfu...| NULL|      NULL|              NULL|              NULL|           NULL|           NULL|          NULL|              NULL|
|           j'accuse!|1919|tt0010307|  1692.0|The only 

In [52]:
categorical_cols = [field.name for field in valval.schema.fields if field.dataType.simpleString() == "string"]

for cat_col in categorical_cols:
    valval = valval.withColumn(cat_col, when(col(cat_col).isNull(), "Unknown").otherwise(col(cat_col)))

# Show updated dataset
valval.show(5, truncate=False)

+-------------------------+----+---------+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [53]:
from pyspark.sql.functions import col, when, avg, expr

avg_like = valval.select(avg("like_count")).collect()[0][0]

# Fill missing `like_count` with global average
val_reviews_updated = valval.withColumn(
    "like_count",
    when(col("like_count").isNull(), avg_like).otherwise(col("like_count"))
)

In [54]:
median_tomato = val_reviews_updated.approxQuantile("tomatometer_rating", [0.5], 0)[0]
median_audience = val_reviews_updated.approxQuantile("audience_rating", [0.5], 0)[0]

# Fill missing `tomatometer_rating` and `audience_rating` with their medians
val_reviews_updated = val_reviews_updated.withColumn(
    "tomatometer_rating",
    when(col("tomatometer_rating").isNull(), median_tomato).otherwise(col("tomatometer_rating"))
).withColumn(
    "audience_rating",
    when(col("audience_rating").isNull(), median_audience).otherwise(col("audience_rating"))
)

val_reviews_updated.show(5)

+--------------------+----+---------+--------+--------------------+-------+------------------+------------------+------------------+---------------+---------------+--------------+------------------+
|         movie_title|year|   tconst|numVotes|              review|  genre|        like_count|tomatometer_status|tomatometer_rating|audience_status|audience_rating|content_rating|production_company|
+--------------------+----+---------+--------+--------------------+-------+------------------+------------------+------------------+---------------+---------------+--------------+------------------+
|             cabiria|1914|tt0003740|  3452.0|It is a little kn...|Unknown|52.333333333333336|              NULL|              44.0|           NULL|           39.0|       Unknown|           Unknown|
|     a man there was|1917|tt0008663|  1882.0|This is a faithfu...|Unknown|52.333333333333336|              NULL|              44.0|           NULL|           39.0|       Unknown|           Unknown|
|    

In [55]:
val_reviews_updated = val_reviews_updated.fillna(0)
val_reviews_updated.show(5)

+--------------------+----+---------+--------+--------------------+-------+------------------+------------------+------------------+---------------+---------------+--------------+------------------+
|         movie_title|year|   tconst|numVotes|              review|  genre|        like_count|tomatometer_status|tomatometer_rating|audience_status|audience_rating|content_rating|production_company|
+--------------------+----+---------+--------+--------------------+-------+------------------+------------------+------------------+---------------+---------------+--------------+------------------+
|             cabiria|1914|tt0003740|  3452.0|It is a little kn...|Unknown|52.333333333333336|               0.0|              44.0|            0.0|           39.0|       Unknown|           Unknown|
|     a man there was|1917|tt0008663|  1882.0|This is a faithfu...|Unknown|52.333333333333336|               0.0|              44.0|            0.0|           39.0|       Unknown|           Unknown|
|    

In [33]:
val_reviews_updated.count()

955

In [58]:
#val_reviews_updated.coalesce(1).write.csv("validation_reviews.csv", mode='overwrite')
val_reviews_pd = val_reviews_updated.toPandas()

In [59]:
val_reviews_pd.to_csv("validation_reviews.csv", index=False)

#### TEST

In [27]:
spark = SparkSession.builder.master("local[*]").appName("Debug").getOrCreate()

In [30]:
# Instead of directly using pd.read_json, try loading and cleaning the JSON data:
with open("test_reviews.json", "r") as f:
    data = "[" + ",".join(line.strip() for line in f if line.strip()) + "]"  # Join lines and wrap in []

test_reviews = pd.read_json(data, orient='records')

test_reviews.head()

  test_reviews = pd.read_json(data, orient='records')


Unnamed: 0,tconst,review
0,tt0054152,This movie is a good example of how a story ca...
1,tt0063210,This too stars Frank Sinatra as a Miami detect...
2,tt0070246,Although it has certain stylistic similarities...
3,tt0102701,Granted my copy of the VCD print was horrible ...
4,tt0133240,"Upfront, I thoroughly enjoyed this movie beyon..."


In [31]:
df_grouped = df_grouped.toPandas()

In [33]:
test_data = test_data.toPandas()

In [34]:
test_reviews = test_data.merge(test_reviews, on="tconst", how="left")

In [38]:
test_reviews["year"] = pd.to_numeric(test_reviews["year"], errors="coerce").astype("Int32")

In [39]:
df_grouped["movie_title"] = df_grouped["movie_title"].str.lower().str.strip()
test_reviews["movie_title"] = test_reviews["movie_title"].str.lower().str.strip()

# Merge (left join on movie_title and year)
test_test = pd.merge(test_reviews, df_grouped, how="left", on=["movie_title", "year"])

# Show the result
print(test_test.head())

      tconst                          movie_title  numVotes  year  \
0  tt0014972                  he who gets slapped    3654.0  1924   
1  tt0015016                       the iron horse    2136.0  1924   
2  tt0015174  die nibelungen: kriemhild's revenge    4341.0  1924   
3  tt0015214                              at 3:25    1724.0  1925   
4  tt0015863                              go west    4188.0  1925   

                                              review genre  like_count  \
0  Before I saw "He Who Get's Slapped" my 3 favor...   NaN         NaN   
1  This film was on my radar for a long time.\nSa...   NaN         NaN   
2  Please see also my comment on Die Nibelungen p...   NaN         NaN   
3  One has to love these early shorts -- look at ...   NaN         NaN   
4  No, as most critics have said, this isn't one ...   NaN         NaN   

   tomatometer_status  tomatometer_rating  audience_status  audience_rating  \
0                 NaN                 NaN              NaN   

In [40]:
categorical_cols = test_test.select_dtypes(include=["object"]).columns.tolist()

for col_name in categorical_cols:
    test_test[col_name] = test_test[col_name].fillna("Unknown")

print(test_test.head(5))

      tconst                          movie_title  numVotes  year  \
0  tt0014972                  he who gets slapped    3654.0  1924   
1  tt0015016                       the iron horse    2136.0  1924   
2  tt0015174  die nibelungen: kriemhild's revenge    4341.0  1924   
3  tt0015214                              at 3:25    1724.0  1925   
4  tt0015863                              go west    4188.0  1925   

                                              review    genre  like_count  \
0  Before I saw "He Who Get's Slapped" my 3 favor...  Unknown         NaN   
1  This film was on my radar for a long time.\nSa...  Unknown         NaN   
2  Please see also my comment on Die Nibelungen p...  Unknown         NaN   
3  One has to love these early shorts -- look at ...  Unknown         NaN   
4  No, as most critics have said, this isn't one ...  Unknown         NaN   

   tomatometer_status  tomatometer_rating  audience_status  audience_rating  \
0                 NaN                 NaN  

In [42]:
import pandas as pd
import numpy as np

test_reviews_updated = test_test.copy()

avg_like = test_reviews_updated["like_count"].mean()
test_reviews_updated["like_count"] = test_reviews_updated["like_count"].fillna(avg_like)

median_tomato = test_reviews_updated["tomatometer_rating"].median()
median_audience = test_reviews_updated["audience_rating"].median()

test_reviews_updated["tomatometer_rating"] = test_reviews_updated["tomatometer_rating"].fillna(median_tomato)
test_reviews_updated["audience_rating"] = test_reviews_updated["audience_rating"].fillna(median_audience)

test_reviews_updated = test_reviews_updated.fillna(0)
print(test_reviews_updated.head())

      tconst                          movie_title  numVotes  year  \
0  tt0014972                  he who gets slapped    3654.0  1924   
1  tt0015016                       the iron horse    2136.0  1924   
2  tt0015174  die nibelungen: kriemhild's revenge    4341.0  1924   
3  tt0015214                              at 3:25    1724.0  1925   
4  tt0015863                              go west    4188.0  1925   

                                              review    genre  like_count  \
0  Before I saw "He Who Get's Slapped" my 3 favor...  Unknown    63.10665   
1  This film was on my radar for a long time.\nSa...  Unknown    63.10665   
2  Please see also my comment on Die Nibelungen p...  Unknown    63.10665   
3  One has to love these early shorts -- look at ...  Unknown    63.10665   
4  No, as most critics have said, this isn't one ...  Unknown    63.10665   

   tomatometer_status  tomatometer_rating  audience_status  audience_rating  \
0                 0.0                40.0  

In [43]:
test_reviews_updated.to_csv("test_reviews.csv", index=False)

# Final Data

In [None]:
df_review = spark.read.json("/content/Big_Data_IMDb/movie_reviews/ibdm_reviews.json")

In [None]:
df_review.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
merged_df = final_df.join(df_reviews, on='tconst', how='left')
merged_df.show()

+---------+--------------------+----+--------+-----+-----+--------------+--------------+------------------+------------------+------------------+---------------+---------------+------------+----------+---------+--------------+--------------------+
|   tconst|         movie_title|year|numVotes|label|genre|review_content|content_rating|production_company|tomatometer_status|tomatometer_rating|audience_status|audience_rating|review_score|like_count|label_int|tomatoes_label|              review|
+---------+--------------------+----+--------+-----+-----+--------------+--------------+------------------+------------------+------------------+---------------+---------------+------------+----------+---------+--------------+--------------------+
|tt0009369|              mickey|1918|    1119|false| NULL|          NULL|          NULL|              NULL|              NULL|              NULL|           NULL|           NULL|        NULL|      NULL|        0|          NULL|Mack Sennett had ...|
|tt00093

In [None]:
merged_df.count()

8294

In [None]:
# merged_df = merged_df.withColumn(
#     "reviews",
#     when(
#         col("reviews").isNotNull() & (col("reviews") != "No Reviews"),
#         array(col("reviews"))
#     )
# )

merged_df = merged_df.withColumn(
    "reviews",
    when(
        (col("reviews").isNotNull()) & (col("reviews") != "No Reviews"),
        concat(lit("['"), concat_ws("', '", split(col("reviews"), ",\s*")), lit("']"))
    ))

merged_df.show()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
def handle_reviews(df):
    """
    Creates a 'review' column using 'review_content' if available, otherwise 'reviews'.
    Drops 'review_content' and 'reviews' after merging.
    """
    df = df.withColumn("reviews", when(col("review_content").isNotNull(), col("review_content"))
                                  .otherwise(col("review")))

    # drop original startYear and endYear
    df = df.drop("review_content", "review")

    #df = df.withColumnRenamed("review", "reviews")

    return df

df_final = handle_reviews(merged_df)
df_final.show()

+---------+--------------------+----+--------+-----+-----+--------------+------------------+------------------+------------------+---------------+---------------+------------+----------+---------+--------------+--------------------+
|   tconst|         movie_title|year|numVotes|label|genre|content_rating|production_company|tomatometer_status|tomatometer_rating|audience_status|audience_rating|review_score|like_count|label_int|tomatoes_label|             reviews|
+---------+--------------------+----+--------+-----+-----+--------------+------------------+------------------+------------------+---------------+---------------+------------+----------+---------+--------------+--------------------+
|tt0009369|              mickey|1918|    1119|false| NULL|          NULL|              NULL|              NULL|              NULL|           NULL|           NULL|        NULL|      NULL|        0|          NULL|Mack Sennett had ...|
|tt0009369|              mickey|1918|    1119|false| NULL|          

In [None]:
df_final.coalesce(1).write.csv("movie_reviews/final_reviews_data.csv", header=True, mode='overwrite')

In [None]:
df_final = spark.read.csv("movie_reviews/final_reviews_data.csv", header=True)

df_final.select("reviews").show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
from pyspark.sql.functions import isnan, when, count, col

df_final.select([count(when(isnan(c), c)).alias(c) for c in df_final.columns]).show()

+------+-----------+----+--------+-----+-----+--------------+------------------+------------------+------------------+---------------+---------------+------------+----------+---------+--------------+-------+
|tconst|movie_title|year|numVotes|label|genre|content_rating|production_company|tomatometer_status|tomatometer_rating|audience_status|audience_rating|review_score|like_count|label_int|tomatoes_label|reviews|
+------+-----------+----+--------+-----+-----+--------------+------------------+------------------+------------------+---------------+---------------+------------+----------+---------+--------------+-------+
|     0|          0|   0|       0|    0|    0|             0|                 0|                 0|                 0|              0|              0|           0|         0|        0|             0|      0|
+------+-----------+----+--------+-----+-----+--------------+------------------+------------------+------------------+---------------+---------------+------------+-----

## Cleaning reviews for tokenization (IGNORE THIS)

In [None]:
import re
import unicodedata
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType

def robust_review_extractor(text):
    # Unicode normalization
    text = unicodedata.normalize("NFKD", text)
    text = text.replace("“", '"').replace("”", '"').replace("’", "'").replace("‘", "'")
    text = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text)

    # Remove starting and ending square brackets, if present
    text = text.strip()
    if text.startswith('[') and text.endswith(']'):
        text = text[1:-1]

    reviews = []
    current_review = ''
    in_review = False
    quote_char = ''

    i = 0
    while i < len(text):
        char = text[i]

        if not in_review:
            if char in ['"', "'"]:
                in_review = True
                quote_char = char
                current_review = ''
        else:
            if char == quote_char:
                # Check if next char is comma or whitespace indicating end of review
                if (i + 1 == len(text)) or text[i + 1] in [',', ' ']:
                    reviews.append(current_review.strip())
                    in_review = False
                    quote_char = ''
                    current_review = ''
                    # Skip the comma if present
                    if i + 1 < len(text) and text[i + 1] == ',':
                        i += 1
                else:
                    current_review += char
            else:
                current_review += char
        i += 1

    # Final clean-up to remove extra spaces and empty strings
    cleaned_reviews = [re.sub(r'\s+', ' ', r).strip() for r in reviews if r.strip()]
    return cleaned_reviews

# Register UDF
robust_review_extractor_udf = F.udf(robust_review_extractor, ArrayType(StringType()))

# Apply the UDF
df_final_fixed = df_final.withColumn(
    "clean_reviews_list",
    robust_review_extractor_udf(F.col("reviews"))
)

df_final_fixed.select("clean_reviews_list").show(truncate=False, n=10)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
df_final_fixed.printSchema()


root
 |-- tconst: string (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- year: string (nullable = true)
 |-- numVotes: string (nullable = true)
 |-- label: string (nullable = true)
 |-- tomatometer_status: string (nullable = true)
 |-- tomatometer_rating: string (nullable = true)
 |-- audience_status: string (nullable = true)
 |-- audience_rating: string (nullable = true)
 |-- review_type: string (nullable = true)
 |-- review_score: string (nullable = true)
 |-- Like count: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- label_int: string (nullable = true)
 |-- tomatoes_label: string (nullable = true)
 |-- reviews: string (nullable = true)
 |-- clean_reviews_list: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [None]:
from pyspark.sql.functions import col, explode, concat_ws, split

# Step 1: Convert the list of reviews into a single string with "|" as the separator
df_fixed = df_final_fixed.withColumn("clean_reviews_list", concat_ws("|", col("clean_reviews_list")))

# Step 2: Split the newly formatted column back into an array using "|"
df_fixed = df_fixed.withColumn("clean_reviews_list", split(col("clean_reviews_list"), "\\|"))

# Step 3: Explode the list to create one row per review
df_exploded = df_fixed.withColumn("review", explode(col("clean_reviews_list")))

# Step 4: Drop the original list column (optional)
df_exploded = df_exploded.drop("clean_reviews_list")

# Show the transformed DataFrame
df_exploded.show(10, truncate=False)

+---------+-----------+----+--------+-----+------------------+------------------+---------------+---------------+-----------+------------+----------+-----+---------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
df_final_fixed.write.json("movie_reviews/final_cleaned_reviews.json", mode="overwrite")

Py4JJavaError: An error occurred while calling o307.json.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 20.0 failed 1 times, most recent failure: Lost task 0.0 in stage 20.0 (TID 20) (fc4ce7013ce9 executor driver): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to file:/content/movie_reviews/final_cleaned_reviews.json.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:775)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:420)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "<ipython-input-19-2dd57505eaa1>", line 8, in robust_review_extractor
TypeError: normalize() argument 2 must be str, not None

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:403)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:410)
	... 17 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:271)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at org.apache.spark.sql.DataFrameWriter.json(DataFrameWriter.scala:784)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to file:/content/movie_reviews/final_cleaned_reviews.json.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:775)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:420)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "<ipython-input-19-2dd57505eaa1>", line 8, in robust_review_extractor
TypeError: normalize() argument 2 must be str, not None

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:403)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:410)
	... 17 more
