In [0]:
# Goal: Build a data pipeline that ingests and transforms movie rating data, producing insights like:
# Popular movies
# Active users
# Genre preferences

# Load the ratings file (tab-separated, no header row): name the four columns,
# then cast them from the CSV reader's strings to numeric types in one projection.
ratings = (
    spark.read
    .option("delimiter", "\t")
    .csv("/Volumes/workspace/default/data/u.data")
    .toDF("userId", "movieId", "rating", "timestamp")
    .selectExpr(
        "CAST(userId AS INT) AS userId",
        "CAST(movieId AS INT) AS movieId",
        "CAST(rating AS FLOAT) AS rating",
        # backticks: `timestamp` is also a Spark SQL type keyword
        "CAST(`timestamp` AS BIGINT) AS `timestamp`",
    )
)

ratings.show(5)


In [0]:
# Load movie metadata. Per the MovieLens 100K README, u.item is pipe-separated
# with no header: movie id | title | release date | video release date |
# IMDb URL, followed by one 0/1 flag column per genre (starting with "unknown").
# The file is Latin-1 encoded (some titles contain accented characters), so it
# must be read as ISO-8859-1 instead of Spark's UTF-8 default.
movie = spark.read.option("delimiter", "|") \
    .option("encoding", "ISO-8859-1") \
    .csv("/Volumes/workspace/default/data/u.item")

# Keep id, title, release date and IMDb URL. The URL lives in _c4;
# _c5 is the first genre flag, not the URL.
movies = movie.selectExpr(
    "CAST(_c0 AS INT) AS movieId",
    "_c1 AS title",
    "_c2 AS release_date",
    "_c4 AS imdb_url",
)

movies.show(5)

In [0]:
# Attach movie metadata to each rating. The inner join keeps only ratings whose
# movieId also appears in `movies`.
df_joined = ratings.join(movies, "movieId", "inner")
df_joined.show()

In [0]:
# Most Popular Movies (by number of ratings)

from pyspark.sql.functions import avg, count

# One row per title with the number of (non-null) ratings it received,
# most-rated titles first.
df_popular = (
    df_joined
    .groupBy("title")
    .agg(count("rating").alias("rating_count"))
    .sort("rating_count", ascending=False)
)
df_popular.show()

In [0]:
# Top Rated Movies (min 50 ratings)
# Rank titles by their average rating. Only titles with at least MIN_RATINGS
# ratings qualify, so a few 5-star votes cannot push an obscure title to the top.
# Ties on the average are broken by the number of ratings.

from pyspark.sql.functions import avg, count

MIN_RATINGS = 50

df_min50 = (
    df_joined
    .groupBy("title")
    .agg(
        count("rating").alias("rating_count"),
        avg("rating").alias("avg_rating"),
    )
    .filter(f"rating_count >= {MIN_RATINGS}")
    .orderBy("avg_rating", "rating_count", ascending=False)
)
display(df_min50)


In [0]:
# Most active users: how many (non-null) ratings each user submitted,
# busiest users first.
df_activeUsers = (
    df_joined
    .groupBy("userId")
    .agg(count("rating").alias("movies_rated"))
    .sort("movies_rated", ascending=False)
)
display(df_activeUsers)

In [0]:
# Expose the joined ratings+movies frame to %sql cells as the view `movie_data`.
df_joined.createOrReplaceTempView(name="movie_data")

In [0]:
%sql
-- Titles with at least 50 ratings, most-rated first
-- (SQL counterpart of the DataFrame popularity query).
SELECT
  title,
  COUNT(*) AS movie_ratings
FROM movie_data
GROUP BY title
HAVING COUNT(*) >= 50
ORDER BY movie_ratings DESC