In [None]:
!pip install -q pyspark[connect]==3.5.1


In [None]:
import os

# Unset inherited Spark/Java locations, presumably so the pip-installed PySpark
# falls back to its own bundled Spark distribution.
# NOTE(review): once JAVA_HOME is unset, `java` must still be reachable on PATH
# for Spark to start -- confirm on the target machine.
for var_name in ("SPARK_HOME", "JAVA_HOME"):
    os.environ.pop(var_name, None)

print("Environment cleaned.")


Environment cleaned.


In [None]:
# No second install: `pyspark[connect]==3.5.1` is already installed by the first
# cell, and unsetting SPARK_HOME/JAVA_HOME does not change what pip installed.


In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) the local Spark session used by the rest of the notebook.
builder = SparkSession.builder.appName("Clean PySpark 3.5.1 Setup")
spark = builder.getOrCreate()

spark


Imports

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split, avg, count, desc, regexp_extract, year, from_unixtime, lit, expr

import matplotlib.pyplot as plt


Creating the Spark Session

In [None]:
# getOrCreate() returns the session started earlier in the notebook when one is
# already running. The SparkContext's application name is fixed when that
# context is first created, so this appName presumably does not rename it.
spark = (
    SparkSession.builder
    .appName("MoviesRatingsDeepPipeline")
    .getOrCreate()
)

Loading the Datasets


In [None]:
# Explicit schemas matching the types inferSchema produced (see the printed
# schemas further down). inferSchema=True needs an extra full pass over each
# file; a fixed schema skips it and pins the column types across runs.
# Columns are assigned by position, so the order must match the CSV header.
# NOTE(review): switch `timestamp` to BIGINT if a dataset has Unix times past 2038.
MOVIES_SCHEMA = "movieId INT, title STRING, genres STRING"
RATINGS_SCHEMA = "userId INT, movieId INT, rating DOUBLE, timestamp INT"

movies = spark.read.csv("/content/movies.csv", header=True, schema=MOVIES_SCHEMA)
ratings = spark.read.csv("/content/ratings.csv", header=True, schema=RATINGS_SCHEMA)

Merging the Datasets


In [None]:
# Attach movie metadata to every rating (inner join on movieId), drop exact
# duplicate rows, then fill missing text fields with a placeholder.
joined = movies.join(ratings, on="movieId", how="inner")
df = joined.dropDuplicates().fillna({"title": "Unknown", "genres": "Unknown"})

Dropping duplicate rows in each source dataset (this runs after the merge above, so it does not change `df`)

In [None]:
# Deduplicate the source frames. `df` was already built from them in the merge
# cell, so this only affects later direct uses of `movies` / `ratings`
# (in this notebook, the schema printout below).
movies, ratings = movies.dropDuplicates(), ratings.dropDuplicates()

Dropping duplicate rows produced by the join

In [None]:
# Keep only distinct rows; largely redundant with the dropDuplicates() in the
# merge cell, but harmless to re-run.
df = df.distinct()

Handling missing values

In [None]:
# Replace null titles/genres with a placeholder string.
placeholders = {"title": "Unknown", "genres": "Unknown"}
df = df.na.fill(placeholders)

Inspecting the schemas

In [None]:
# Show the column names and types of both source frames.
for header, frame in (("=== Movies Schema ===", movies), ("=== Ratings Schema ===", ratings)):
    print(header)
    frame.printSchema()


=== Movies Schema ===
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

=== Ratings Schema ===
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [None]:
from pyspark.sql.functions import explode, split, size, col

# Explode the pipe-separated `genres` string into one row per (rating, genre).
# Guarded so re-running this cell never explodes an already-exploded frame:
# every extra explode multiplies each row by its genre count again, which
# silently inflates all counts computed downstream.
if "genre_exploded" not in df.columns:
    df = df.withColumn("genre_exploded", explode(split(col("genres"), "\\|")))

# Number of genres listed for the movie (same value on each of its genre rows).
df = df.withColumn("num_genres", size(split(col("genres"), "\\|")))


Exploding the genres into one row per rating and genre

Rating and release-year features

In [None]:
from pyspark.sql.functions import col, when, split, size, regexp_extract, year, from_unixtime

# Row-level features, applied in list order. Order matters: movie_age reads the
# release_year column created by the first entry. release_year is the 4-digit
# year in parentheses in the title; regexp_extract yields "" when there is no
# match, which the int cast turns into null (with ANSI mode off).
rating_features = [
    ("release_year", regexp_extract(col("title"), r"\((\d{4})\)", 1).cast("int")),
    ("movie_age", 2025 - col("release_year")),
    ("high_rating_flag", when(col("rating") >= 4, 1).otherwise(0)),
    ("rating_category",
     when(col("rating") >= 4.5, "Excellent")
     .when(col("rating") >= 3.5, "Good")
     .when(col("rating") >= 2.5, "Average")
     .otherwise("Low")),
    ("num_genres", size(split(col("genres"), "\\|"))),
]

for feature_name, feature_expr in rating_features:
    df = df.withColumn(feature_name, feature_expr)


Exploding genres after creating all feature columns

In [None]:
from pyspark.sql.functions import explode

# Explode genres into one row per (rating, genre), but only if that has not
# already happened. Exploding an exploded frame again multiplies every row by
# its genre count once more (genre_exploded just gets overwritten with the same
# kind of value), inflating every count and biasing every mean computed later.
if "genre_exploded" not in df.columns:
    df = df.withColumn("genre_exploded", explode(split(col("genres"), "\\|")))


Ranking individual ratings within each genre (after all feature columns exist)

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc

# Rank individual ratings inside each genre, highest rating first (rank 1).
# row_number() is only deterministic when the ordering is total; ratings tie
# constantly, so break ties by timestamp, then userId, then movieId.
window_genre = Window.partitionBy("genre_exploded").orderBy(
    desc("rating"), "timestamp", "userId", "movieId"
)
df = df.withColumn("genre_rank", row_number().over(window_genre))


Movie-level aggregations

In [None]:
from pyspark.sql.functions import avg, stddev, count, sum, max, min, col, when

# `df` holds one row per (rating, genre) after the genre explode, and more if
# genres were exploded repeatedly. Aggregating it directly counts each rating
# once per duplicate row. Collapse back to one row per rating first. The mean,
# min and max are unaffected by uniform duplication, but num_ratings,
# num_high_rated and everything derived from them were inflated.
movie_ratings = df.select(
    "movieId", "title", "userId", "rating", "timestamp", "high_rating_flag"
).distinct()

movie_agg = movie_ratings.groupBy("movieId", "title").agg(
    avg("rating").alias("avg_rating"),
    stddev("rating").alias("rating_stddev"),      # sample stddev; null for a single rating
    count("rating").alias("num_ratings"),
    sum("high_rating_flag").alias("num_high_rated"),
    max("rating").alias("max_rating"),
    min("rating").alias("min_rating"),
    min("timestamp").alias("first_rating_ts"),
    max("timestamp").alias("last_rating_ts")
)

movie_agg = (
    movie_agg
    .withColumn("high_rating_ratio", col("num_high_rated") / col("num_ratings"))
    .withColumn("rating_range", col("max_rating") - col("min_rating"))
    # timestamps are Unix seconds (they are fed to from_unixtime elsewhere), so /86400 gives days
    .withColumn("days_active", (col("last_rating_ts") - col("first_rating_ts")) / 86400)
    .withColumn("movie_engagement_score", col("num_ratings") * col("avg_rating"))
    .withColumn("movie_popularity_score", col("high_rating_ratio") * col("num_ratings"))
    # well-rated (avg >= 4) but rarely rated (< 100 ratings) movies
    .withColumn(
        "rare_movie_flag",
        when((col("num_ratings") < 100) & (col("avg_rating") >= 4), 1).otherwise(0)
    )
)


Genre-level aggregations

In [None]:
from pyspark.sql.functions import countDistinct

# One row per (genre, rating): drop any duplicates left behind by repeated
# genre explodes so each rating contributes exactly once to its genre.
genre_ratings = df.select(
    "genre_exploded", "movieId", "userId", "rating", "timestamp", "high_rating_flag"
).distinct()

genre_agg = genre_ratings.groupBy("genre_exploded").agg(
    avg("rating").alias("avg_rating_genre"),
    # distinct movies, as the column name says (count() here counts ratings)
    countDistinct("movieId").alias("num_movies_genre"),
    count("movieId").alias("num_ratings_genre"),
    sum("high_rating_flag").alias("num_high_rated_genre")
).withColumn(
    # mean rating x number of ratings, matching the popularity score used by genre_pop
    "genre_popularity_score",
    col("avg_rating_genre") * col("num_ratings_genre")
)

User-level aggregations

In [None]:
# `df` repeats each rating once per genre of its movie, which would weight a
# user's statistics toward multi-genre movies and inflate the counts. Reduce to
# one row per rating first.
user_ratings = df.select(
    "userId", "movieId", "rating", "timestamp", "high_rating_flag"
).distinct()

user_agg = user_ratings.groupBy("userId").agg(
    avg("rating").alias("user_avg_rating"),
    stddev("rating").alias("user_rating_stddev"),
    count("movieId").alias("num_movies_rated"),
    sum("high_rating_flag").alias("num_high_ratings")
).withColumn(
    "high_rating_ratio_user",
    col("num_high_ratings") / col("num_movies_rated")
).withColumn(
    # prolific (>= 50 ratings) users who rate >= 80% of movies 4 or higher
    "influential_user_flag",
    when(
        (col("num_movies_rated") >= 50) &
        (col("high_rating_ratio_user") >= 0.8),
        1
    ).otherwise(0)
)

User–Genre interaction features

In [None]:
# One row per (user, genre, rating): remove duplicates that repeated genre
# explodes leave behind, so counts reflect the ratings actually given.
user_genre_ratings = df.select(
    "userId", "genre_exploded", "movieId", "rating", "timestamp", "high_rating_flag"
).distinct()

user_genre = user_genre_ratings.groupBy("userId", "genre_exploded").agg(
    avg("rating").alias("user_genre_avg_rating"),
    count("movieId").alias("num_movies_rated_in_genre"),
    sum("high_rating_flag").alias("num_high_rated_in_genre")
).withColumn(
    "user_high_rating_ratio_genre",
    col("num_high_rated_in_genre") / col("num_movies_rated_in_genre")
)

Movie–Genre cross features

In [None]:
# Mean rating per (movie, genre).
# NOTE(review): every genre row of a movie carries the same set of ratings, so
# this equals the movie's overall mean for each of its genres -- confirm the
# feature is intended.
movie_genre_cross = (
    df.groupBy("movieId", "genre_exploded")
      .agg(avg("rating").alias("movie_genre_avg_rating"))
)


User–Movie interaction

In [None]:
# Attach each movie's mean rating to every row and measure how far the row's
# rating sits from it (positive = rated above the movie's average).
movie_means = movie_agg.select("movieId", "avg_rating")
user_movie_interaction = (
    df.join(movie_means, on="movieId")
      .withColumn("user_movie_diff", col("rating") - col("avg_rating"))
)

In [None]:
from pyspark.sql import Window
from pyspark.sql.functions import lag, col

# Time since the previous rating of the same movie. `df` has one row per
# (rating, genre), so lag() over it directly mostly returns the same rating's
# own timestamp from a sibling genre row (a gap of 0). Compute the lag over
# distinct rating events instead, ordered deterministically, and join it back.
window_movie = Window.partitionBy("movieId").orderBy("timestamp", "userId")

rating_gaps = (
    df.select("movieId", "userId", "timestamp")
      .distinct()
      .withColumn("prev_rating_ts", lag("timestamp").over(window_movie))
)

df = (
    df.drop("prev_rating_ts", "rating_time_gap")  # keeps the cell safe to re-run
      .join(rating_gaps, on=["movieId", "userId", "timestamp"], how="left")
      .withColumn("rating_time_gap", col("timestamp") - col("prev_rating_ts"))
)


Ranking movies within each genre

In [None]:
# Rank movies inside each genre by mean rating (1 = best). Ties on avg_rating
# are broken by number of ratings, then movieId, so row_number() -- and thus
# the exported top 5 -- is reproducible across runs.
# NOTE(review): there is no minimum-ratings filter, so a movie with very few
# high ratings can top its genre.
window_genre = Window.partitionBy("genre_exploded").orderBy(
    desc("avg_rating"), desc("num_ratings"), "movieId"
)

movie_ranked = movie_agg.join(
    df.select("movieId", "genre_exploded").distinct(),
    on="movieId"
)

movie_ranked = movie_ranked.withColumn(
    "genre_rank",
    row_number().over(window_genre)
)

top5_per_genre = movie_ranked.filter(col("genre_rank") <= 5)

In [None]:
from pyspark.sql.functions import col, count, desc, lit, floor, concat, expr


In [None]:
# Re-derive the release year: the 4 digits in parentheses in the title, as int.
year_in_title = regexp_extract(col("title"), r"\((\d{4})\)", 1)
df = df.withColumn("release_year", year_in_title.cast("int"))

In [None]:
# Expose the per-row genre under the shorter name `genre`. When the frame is
# already exploded, reuse that column: exploding again would multiply every
# row by its genre count once more and inflate every count computed below.
if "genre_exploded" in df.columns:
    df = df.withColumn("genre", col("genre_exploded"))
else:
    df = df.withColumn("genre", explode(split(col("genres"), "\\|")))


In [None]:
# 1 for ratings of 4 or more, else 0 (a null rating also yields 0).
df = df.withColumn("high_rating_flag", expr("IF(rating >= 4, 1, 0)"))

In [None]:
# Years between release and the fixed reference year 2025 (null if no year parsed).
df = df.withColumn("movie_age", 2025 - col("release_year"))

In [None]:
# Calendar year each rating was made. from_unixtime formats in the Spark
# session time zone, so ratings near New Year depend on that setting.
rating_ts = from_unixtime(col("timestamp"))
df = df.withColumn("rating_year", year(rating_ts))

In [None]:
# Number of distinct movies per release year (year parsed from the title).
movies_per_year = (
    df.select("movieId", "release_year")
      .dropDuplicates()
      .groupBy("release_year")
      .count()
      .withColumnRenamed("count", "movie_count")
      .orderBy("release_year")
)

In [None]:
# Number of ratings made per calendar year. `df` repeats every rating once per
# genre row, so count each rating event (user, movie, timestamp) only once.
ratings_per_year = (
    df.select("userId", "movieId", "timestamp", "rating_year")
      .distinct()
      .groupBy("rating_year")
      .agg(count("*").alias("rating_count"))
      .orderBy("rating_year")
)

In [None]:
# Mean rating per genre. Deduplicate per (genre, rating) first, so that rows
# duplicated by repeated genre explodes do not over-weight multi-genre movies.
genre_avg_rating = (
    df.select("genre", "userId", "movieId", "timestamp", "rating")
      .distinct()
      .groupBy("genre")
      .agg(avg("rating").alias("avg_rating"))
      .orderBy(desc("avg_rating"))
)

In [None]:
# Genre popularity = number of ratings x mean rating. Deduplicate per
# (genre, rating) first so that every rating counts once toward its genre.
genre_pop = (
    df.select("genre", "userId", "movieId", "timestamp", "rating")
      .distinct()
      .groupBy("genre")
      .agg(count("*").alias("rating_count"), avg("rating").alias("avg_rating"))
      .withColumn("popularity_score", expr("rating_count * avg_rating"))
      .orderBy(desc("popularity_score"))
)

TOP 10 GENRES BY MOVIE COUNT

In [None]:
from pyspark.sql.functions import countDistinct

print("\n=== TOP 10 GENRES BY MOVIE COUNT ===")

# Width, in characters, of the longest bar; other bars are scaled relative to it.
BAR_WIDTH = 50

# Count distinct movies per genre. Counting rows (count("*")) would count
# ratings rather than movies, and that number also grows with every duplicate
# explode of `df`. Bars are scaled against the largest count, so the chart
# width does not depend on dataset size.
top_genres_fast = (
    df.groupBy("genre_exploded")
      .agg(countDistinct("movieId").alias("movie_count"))
      .orderBy(desc("movie_count"), "genre_exploded")
      .limit(10)
      .withColumn("max_count", expr("max(movie_count) OVER ()"))
      .withColumn(
          "bars",
          expr(f"repeat('#', CAST(floor(movie_count * {BAR_WIDTH} / max_count) AS INT))"),
      )
      .withColumn(
          "visual",
          concat(
              col("genre_exploded"),
              lit(" | "),
              col("bars"),
              lit(" ("), col("movie_count"), lit(")")
          )
      )
      # the window above need not preserve row order, so sort again for display
      .orderBy(desc("movie_count"), "genre_exploded")
)

top_genres_fast.select("visual").show(truncate=False)



=== TOP 10 GENRES BY MOVIE COUNT ===
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
print("\n=== AVERAGE RATING PER GENRE ===")

# Mean rating per genre, drawn as one '*' per half star. Collapse duplicate
# (genre, rating) rows first, so each rating counts once toward its genre's mean
# no matter how many times genres were exploded. The genre name breaks ties so
# the top 10 is stable.
genre_rating = (
    df.select("genre_exploded", "userId", "movieId", "timestamp", "rating")
      .distinct()
      .groupBy("genre_exploded")
      .agg(avg("rating").alias("avg_rating"))
      .withColumn("stars", expr("repeat('*', floor(avg_rating * 2))"))
      .withColumn(
          "visual",
          concat(
              col("genre_exploded"), lit(" | "),
              col("stars"), lit(" ("), expr("round(avg_rating,2)"), lit(")")
          )
      )
      .orderBy(desc("avg_rating"), "genre_exploded")
      .limit(10)
)

genre_rating.select("visual").show(truncate=False)



=== AVERAGE RATING PER GENRE ===
+--------------------------+
|visual                    |
+--------------------------+
|Film-Noir | ******* (3.84)|
|War | ******* (3.73)      |
|Drama | ******* (3.67)    |
|Crime | ******* (3.67)    |
|Mystery | ******* (3.67)  |
|IMAX | ******* (3.64)     |
|Animation | ******* (3.62)|
|Musical | ******* (3.6)   |
|Romance | ******* (3.58)  |
|Children | ******* (3.54) |
+--------------------------+



In [None]:
print("\n=== TOP 10 MOVIES BY ENGAGEMENT SCORE ===")

# One '#' per 50 points of engagement score (num_ratings * avg_rating).
engagement_bars = expr("repeat('#', floor(movie_engagement_score/50))")
engagement_label = concat(
    col("title"), lit(" | "),
    col("bars"), lit(" ("), expr("round(movie_engagement_score,1)"), lit(")")
)

top_movies = (
    movie_agg.orderBy(desc("movie_engagement_score"))
      .limit(10)
      .withColumn("bars", engagement_bars)
      .withColumn("visual", engagement_label)
)

top_movies.select("visual").show(truncate=False)


In [None]:
# Columns kept in the exported row-level table (still exploded by genre).
FINAL_COLUMNS = [
    "movieId", "title", "genres",
    "userId", "rating", "timestamp",
    "high_rating_flag", "num_genres", "rating_category",
    "genre_rank", "movie_age", "genre_exploded",
]
df_final = df.select(*FINAL_COLUMNS)

In [None]:
# Mark the export frame for caching. persist() flags the DataFrame itself and
# returns it, so no reassignment is needed; the ';' suppresses the cell's repr.
df_final.persist();

In [None]:


# Write the row-level table, the movie aggregates and the per-genre top 5 as
# Parquet, replacing any previous output at the same paths.
parquet_outputs = {
    "/content/movies_ratings_final.parquet": df_final,
    "/content/movies_ratings_aggregated.parquet": movie_agg,
    "/content/movies_ratings_top5.parquet": top5_per_genre,
}
for output_path, frame in parquet_outputs.items():
    frame.write.mode("overwrite").parquet(output_path)