In [0]:
# Load MovieLens and IMDb
movielens_df = spark.read.csv("/FileStore/tables/cleaned_MovieLensData.csv", header=True, inferSchema=True)
imdb_df = spark.read.csv("/FileStore/tables/cleaned_imdb.csv", header=True, inferSchema=True)


In [0]:
from pyspark.sql.functions import col, regexp_extract, trim, lower

# Extract clean title and year for MovieLensData
movielens_df = movielens_df.withColumn("clean_title", lower(trim(regexp_extract("title", r"^(.*)\s+\((\d{4})\)$", 1))))
movielens_df = movielens_df.withColumn("year", regexp_extract("title", r"^(.*)\s+\((\d{4})\)$", 2))


In [0]:
#Extract and clean title and year for IMDB data
imdb_df = imdb_df.withColumn("clean_title", lower(trim(imdb_df["primaryTitle"])))
imdb_df = imdb_df.withColumnRenamed("startYear", "year")
imdb_df = imdb_df.withColumnRenamed("genres", "genres_imdb")


In [0]:
#Merge the datasets

from pyspark.sql.functions import coalesce

movielens_df = movielens_df.withColumn("year", movielens_df["year"].cast("string"))
imdb_df = imdb_df.withColumn("year", imdb_df["year"].cast("string"))

merged_df = movielens_df.join(imdb_df, on=["clean_title", "year"], how="outer")
merged_df = merged_df.withColumn("combined_genres", coalesce(col("genres"), col("genres_imdb")))


In [0]:
from pyspark.sql.functions import round, coalesce

# Scale MovieLens rating to 10-point scale
merged_df = merged_df.withColumn("movielens_rating_scaled", col("rating") * 2)

# Calculate smart combined rating
merged_df = merged_df.withColumn(
    "combined_rating",
    round(
        coalesce(
            (col("movielens_rating_scaled") + col("averageRating")) / 2,
            col("movielens_rating_scaled"),
            col("averageRating")
        ),
        2
    )
)


In [0]:
from pyspark.sql.functions import count as count_

# Count how many users rated each movie in MovieLens
ml_votes_df = movielens_df.groupBy("clean_title").agg(count_("userId").alias("ml_votes_clean"))

# Drop old vote column to avoid ambiguity
if "ml_votes_clean" in merged_df.columns:
    merged_df = merged_df.drop("ml_votes_clean")

# Join clean vote counts
merged_df = merged_df.join(ml_votes_df, on="clean_title", how="left")

# Fill missing values
merged_df = merged_df.fillna({"ml_votes_clean": 0, "numVotes": 0})

# One row per movie (title + year)
movie_level_df = merged_df.dropDuplicates(["clean_title", "year"])

# Add total popularity
movie_level_df = movie_level_df.withColumn(
    "total_popularity",
    (col("ml_votes_clean") + col("numVotes")).cast("int")
)


In [0]:
from pyspark.sql.functions import split, size, explode

# Split and count genres
movie_level_df = movie_level_df.withColumn("genre_list", split(col("combined_genres"), "\|"))
movie_level_df = movie_level_df.withColumn("num_genres", size(col("genre_list")))

# Explode genres make it qeual
exploded_df = movie_level_df.withColumn("genre", explode(col("genre_list")))


In [0]:
# To show results
exploded_df.select(
    col("clean_title").alias("movie_title"),
    col("year"),
    col("genre"),
    col("combined_rating"),
    col("total_popularity")
).dropna(subset=["movie_title", "genre", "combined_rating", "total_popularity"]).orderBy("movie_title").display()


movie_title,year,genre,combined_rating,total_popularity
,,(no genres listed),10.0,20
#,2022.0,Drama,4.0,7
#69 samskar colony,2022.0,Romance,6.3,598
"#73, shaanthi nivaasa",2007.0,"Drama,Family,Musical",7.5,251
#blackmendream,2014.0,Documentary,6.0,18
#blessed,2015.0,"Comedy,Documentary",5.6,10
#digitallivesmatter,2016.0,Comedy,6.5,53
#float,2022.0,Horror,2.6,614
#gawwezni,2022.0,Comedy,5.2,72
#handozenryoku,2020.0,"Drama,Sport",5.5,39


In [0]:
# Save the final cleaned movie table to a CSV in the Databricks file system
movie_level_df.select(
    col("clean_title").alias("movie_title"),
    col("year"),
    col("combined_rating"),
    col("total_popularity")
).dropDuplicates(["movie_title", "year"]) \
 .coalesce(1) \
 .write.mode("overwrite") \
 .option("header", True) \
 .csv("/FileStore/final_movie_table")
