In [0]:
# This notebook analyzes the MovieLens dataset using PySpark.
# All transformations are performed using the DataFrame API on distributed storage (Databricks DBFS).


In [0]:
#import functions
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
from pyspark.sql.window import Window


In [0]:
# Initialize Spark session for the assignment.
# Hive support is not required as we are working only with DataFrames and CSV files.

spark = SparkSession.builder\
        .appName("Assignment 2")\
            .enableHiveSupport()\
                .getOrCreate()

In [0]:
# # Load MovieLens datasets (movies, ratings, tags) into Spark DataFrames with inferred schema.


movies = spark.read.format('csv')\
            .option('header','true')\
            .option('inferSchema','true')\
            .load('/Volumes/workspace/default/spark_datasets/movies.csv')

ratings = spark.read.format('csv')\
            .option('header','true')\
            .option('inferSchema','true')\
            .load('/Volumes/workspace/default/spark_datasets/ratings.csv')

tags = spark.read.format('csv')\
            .option('header','true')\
            .option('inferSchema','true')\
            .load('/Volumes/workspace/default/spark_datasets/tags.csv')


In [0]:
# see the dataframes that will be used
movies.show(5)
ratings.show(5)
tags.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows
+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
| 

In [0]:
# convert timestamp column from unixtimestmap to actual timestamp 

ratings = ratings.withColumn("timestamp", from_unixtime("timestamp").cast(TimestampType()))
tags = tags.withColumn("timestamp", from_unixtime("timestamp").cast(TimestampType()))
tags.show(5)
ratings.show(5)

+------+-------+---------------+-------------------+
|userId|movieId|            tag|          timestamp|
+------+-------+---------------+-------------------+
|     2|  60756|          funny|2015-10-24 19:29:54|
|     2|  60756|Highly quotable|2015-10-24 19:29:56|
|     2|  60756|   will ferrell|2015-10-24 19:29:52|
|     2|  89774|   Boxing story|2015-10-24 19:33:27|
|     2|  89774|            MMA|2015-10-24 19:33:20|
+------+-------+---------------+-------------------+
only showing top 5 rows
+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      1|   4.0|2000-07-30 18:45:03|
|     1|      3|   4.0|2000-07-30 18:20:47|
|     1|      6|   4.0|2000-07-30 18:37:04|
|     1|     47|   5.0|2000-07-30 19:03:35|
|     1|     50|   5.0|2000-07-30 18:48:51|
+------+-------+------+-------------------+
only showing top 5 rows


1. Show Aggregated number of ratings per year

In [0]:
# Calculate the total number of ratings given in each year by extracting the year from the timestamp column.

yearly_number_of_ratings = ratings.groupBy(year(col('timestamp')).alias('year'))\
                                .agg(count("*").alias('total_count_yearly'))\
                                .orderBy(col('year').desc()).show(5)

+----+------------------+
|year|total_count_yearly|
+----+------------------+
|2018|              6418|
|2017|              8198|
|2016|              6703|
|2015|              6616|
|2014|              1439|
+----+------------------+
only showing top 5 rows


2. Show the average monthly number of ratings

In [0]:
# Calculate the total number of ratings given in each month by extracting the year-month from the timestamp column.

avg_monthly_rating = ratings.groupBy(date_format("timestamp","yyyy-MM").alias("year_month"))\
                                        .agg(round(avg('rating'),3).alias('avg_rating_monthly'))\
                                            .orderBy(col('year_month').desc()).show(5)

+----------+------------------+
|year_month|avg_rating_monthly|
+----------+------------------+
|   2018-09|             3.569|
|   2018-08|             3.558|
|   2018-07|              4.01|
|   2018-06|              3.98|
|   2018-05|             2.952|
+----------+------------------+
only showing top 5 rows


3. Show the rating levels distribution (1-2, 2-3.5, 3.5-4, 4-5)

In [0]:
# Analyze the distribution of rating values to understand how frequently each rating score is used.

# not actual ideal solution, just to learn and practice concepts
ratings_category = ratings.withColumn("Rating_Category", 
                   when((col('rating')>=0) & (col('rating')<1),"Very Low")\
                  .when((col('rating')>=1) & (col('rating')<2),"Low")\
                  .when((col('rating')>=2) & (col('rating')<3.5),"Medium")\
                  .when((col('rating')>=3.5) & (col('rating')<4),"High")\
                  .otherwise("Very High"))\
                      .groupBy(col('Rating_Category'))\
                            .agg(count('*').cast(IntegerType()).alias('rating_count'))\
                                .orderBy(col('rating_count').asc())
                                
# Create a window to sum over all categories
window = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

ratings_category = ratings_category.withColumn("percentage",
                                                           round((col("rating_count") / sum("rating_count")\
                                                               .over(window)) * 100, 2))

ratings_category.show(5)



+---------------+------------+----------+
|Rating_Category|rating_count|percentage|
+---------------+------------+----------+
|       Very Low|        1370|      1.36|
|            Low|        4602|      4.56|
|           High|       13136|     13.03|
|         Medium|       33148|     32.87|
|      Very High|       48580|     48.18|
+---------------+------------+----------+





In [0]:
# actual answer

ratings_category = ratings.withColumn("Rating_Category", 
                   when((col('rating')>=0) & (col('rating')<1),"Very Low")\
                  .when((col('rating')>=1) & (col('rating')<2),"Low")\
                  .when((col('rating')>=2) & (col('rating')<3.5),"Medium")\
                  .when((col('rating')>=3.5) & (col('rating')<4),"High")\
                  .otherwise("Very High"))\
                      .groupBy(col('Rating_Category'))\
                            .agg(count('*').cast(IntegerType()).alias('rating_count'))\
                                .orderBy(col('rating_count').asc())

total_count = ratings.count()

ratings_category = ratings_category.withColumn(
    "percentage",
    round((col("rating_count") / total_count) * 100, 2)
)

ratings_category.show()


+---------------+------------+----------+
|Rating_Category|rating_count|percentage|
+---------------+------------+----------+
|       Very Low|        1370|      1.36|
|            Low|        4602|      4.56|
|           High|       13136|     13.03|
|         Medium|       33148|     32.87|
|      Very High|       48580|     48.18|
+---------------+------------+----------+



4. Show the 18 movies that are tagged but not rated

In [0]:
# Identify movies that have been rated by users but never tagged.NULL tag values after a LEFT JOIN indicate missing tags.

df = tags.join((ratings),tags.movieId == ratings.movieId,'left')\
    .join(movies,tags.movieId == movies.movieId,'inner')\
    .drop(tags.userId,tags.timestamp,ratings.userId,ratings.movieId,movies.movieId,movies.genres)

df.filter(col('rating').isNull()).select(col('title')).distinct().show(18)

+--------------------+
|               title|
+--------------------+
|Roaring Twenties,...|
|Mutiny on the Bou...|
|        Proof (1991)|
|      Scrooge (1970)|
|In the Realms of ...|
|Road Home, The (W...|
|  Chalet Girl (2011)|
|I Know Where I'm ...|
|Color of Paradise...|
|Call Northside 77...|
|Browning Version,...|
|      Niagara (1953)|
|  Chosen, The (1981)|
|For All Mankind (...|
|Twentieth Century...|
|Parallax View, Th...|
|This Gun for Hire...|
|Innocents, The (1...|
+--------------------+



5. Show the movies that have rating but no tag

In [0]:
# Identify movies that have been rated by users but never tagged. NULL tag values after a LEFT JOIN indicate missing tags.

df = movies.join((ratings),movies.movieId == ratings.movieId ,'inner')\
    .join((tags),movies.movieId == tags.movieId ,'left')\
    .drop(movies.genres,ratings.userId,ratings.movieId,ratings.timestamp,tags.userId,tags.movieId,tags.timestamp)

df.filter(col('tag').isNull()).select('title','movieId').distinct().show(10)

+--------------------+-------+
|               title|movieId|
+--------------------+-------+
|Celluloid Closet,...|    581|
|Mission: Impossib...|   3623|
|The Golden Voyage...|   3771|
|Murder by Death (...|   5021|
|One-Eyed Jacks (1...|   5063|
|Walk on the Moon,...|   2570|
|Melvin and Howard...|   2988|
|In the Bedroom (2...|   4903|
|Beyond Rangoon (1...|    155|
|Madonna: Truth or...|   1191|
+--------------------+-------+
only showing top 10 rows


6. Focusing on the rated untagged movies with more than 30 user ratings, show the top 10 movies in terms of average rating and number of ratings.
    

In [0]:
# Focus on movies that have been rated by more than 30 users but have no tags. Compute average rating and number of users, then rank movies based on average rating.

df = movies.join((ratings),movies.movieId == ratings.movieId ,'inner')\
    .join((tags),movies.movieId == tags.movieId ,'left')\
        .drop(movies.genres,ratings.movieId,ratings.timestamp,tags.userId,tags.movieId,tags.timestamp)\
        .filter(col('tag').isNull())


df_2= df.groupBy('movieId','title')\
        .agg(
            round(avg(col('rating')),4).alias('avg_rating'),\
            countDistinct(col('userId')).alias('total_users')
            )\
            .filter(col('total_users') > 30)\
            .select('movieId','title','avg_rating','total_users')
            


window_avg = Window.orderBy(col('avg_rating').desc())
df_avg = df_2.withColumn('avg_rank', dense_rank().over(window_avg))

window_count = Window.orderBy(col('total_users').desc())
df_count = df_2.withColumn('count_rank', dense_rank().over(window_count))


top_movies = df_avg.join(df_count, df_avg.avg_rank == df_count.count_rank,'inner')\
            .filter(df_avg.avg_rank <= 10)\
                .select(df_avg.title,df_avg.avg_rating,df_avg.avg_rank,df_count.title,df_count.total_users,df_count.count_rank)
                
top_movies.show()




+--------------------+----------+--------+--------------------+-----------+----------+
|               title|avg_rating|avg_rank|               title|total_users|count_rank|
+--------------------+----------+--------+--------------------+-----------+----------+
|Boondock Saints, ...|    4.2209|       1|American Beauty (...|        204|         1|
|       Brazil (1985)|     4.178|       2|Ace Ventura: Pet ...|        161|         2|
|Cinema Paradiso (...|    4.1618|       3|    Mask, The (1994)|        157|         3|
|       Snatch (2000)|    4.1559|       4|     Die Hard (1988)|        145|         4|
|For a Few Dollars...|    4.1515|       5|Die Hard: With a ...|        144|         5|
|Lives of Others, ...|    4.1176|       6|Groundhog Day (1993)|        143|         6|
|  Toy Story 3 (2010)|    4.1091|       7|Dumb & Dumber (Du...|        133|         7|
|Boogie Nights (1997)|    4.0769|       8|    GoldenEye (1995)|        132|         8|
|Boogie Nights (1997)|    4.0769|       8|M

7. What is the average number of tags per movie in tagsDF? And the
average number of tags per user? How does it compare with the
average number of tags a user assigns to a movie?

In [0]:

# Calculate the average number of tags applied per user and the average number of tags received per movie.
# not optimal solution but for understanding concepts

# Calculate number of tags per user
df1 = tags.groupBy(col('userId'))\
        .agg(count('tag').alias('tags_per_user'))\
        .orderBy(col('tags_per_user').desc())                                      

# add global average of tags per user as a column
window_all = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df1= df1.withColumn('avg_tags_per_user',avg('tags_per_user').over(window_all))


# Calculate number of tags per movie
df2= tags.groupBy(col('movieId'))\
            .agg(count('tag').alias('tags_per_movie'))\
                .orderBy(col('tags_per_movie').desc())

# add global average of tags per movie as a column
windows_all = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df2 = df2.withColumn('avg_tags_per_movie',round((avg('tags_per_movie').over(window_all)),4))


# comparision df
avg_user_df = df1.select('avg_tags_per_user').distinct()
avg_movie_df = df2.select('avg_tags_per_movie').distinct()

comparison_df = avg_user_df.withColumn('key', lit(1))\
                    .join(avg_movie_df.withColumn('key',lit(1)),'key','inner')\
                        .drop('key')\
                            .withColumn('comparision_result',
                                        when(col('avg_tags_per_user')>col('avg_tags_per_movie'),"tags per user is high")\
                                        .when(col('avg_tags_per_user')<col('avg_tags_per_movie'),"tags per movie is high")\
                                        .otherwise(" Both tags per movie and tags per user are equal")
                                         )

comparison_df.show()                                  
                                        
                                       




+-----------------+------------------+--------------------+
|avg_tags_per_user|avg_tags_per_movie|  comparision_result|
+-----------------+------------------+--------------------+
|             63.5|            2.3429|tags per user is ...|
+-----------------+------------------+--------------------+



In [0]:
# Avg tags per user
avg_tags_per_user = (
    tags.groupBy("userId")
        .agg(count("tag").alias("tags_per_user"))
        .agg(avg("tags_per_user").alias("avg_tags_per_user"))
)

# Avg tags per movie
avg_tags_per_movie = (
    tags.groupBy("movieId")
        .agg(count("tag").alias("tags_per_movie"))
        .agg(avg("tags_per_movie").alias("avg_tags_per_movie"))
)

# comapre the two dataframes
comparison_df = avg_tags_per_user.withColumn('key', lit(1))\
                    .join(avg_tags_per_movie.withColumn('key',lit(1)),'key','inner')\
                        .drop('key')\
                        .withColumn('comparision_result',
                                        when(col('avg_tags_per_user')>col('avg_tags_per_movie'),"tags per user is high")\
                                        .when(col('avg_tags_per_user')<col('avg_tags_per_movie'),"tags per movie is high")\
                                        .otherwise(" Both tags per movie and tags per user are equal")
                                         )

comparison_df.show()


+-----------------+------------------+--------------------+
|avg_tags_per_user|avg_tags_per_movie|  comparision_result|
+-----------------+------------------+--------------------+
|             63.5|2.3428753180661577|tags per user is ...|
+-----------------+------------------+--------------------+



8. Identify the users that tagged movies without rating them


In [0]:
# Identify users who have tagged movies but never rated those same movies. This is done using a LEFT JOIN and filtering records with NULL ratings.

t = tags.alias("t")
r = ratings.alias("r")

result = t.join(r,(col("t.userId") == col("r.userId")) &(col("t.movieId") == col("r.movieId")),"left")\
    .filter(col("r.rating").isNull())\
    .select(col("t.userId").alias("userId"))\
    .distinct()


result.show()

+------+
|userId|
+------+
|   537|
|    21|
|   474|
|   193|
|   424|
|   341|
|   288|
|   573|
|   289|
|   435|
|   119|
|   606|
|   336|
|   477|
|   543|
|   318|
|   600|
+------+



9. What is the average number of ratings per user in ratings DF? And the
average number of ratings per movie?

In [0]:
# Compute the average number of ratings given per user and the average number of ratings received per movie.

# not optimal solution but for understanding concepts

# Calculate number of ratings per user
df1 = ratings.groupBy(col('userId'))\
        .agg(count('rating').alias('rating_per_user'))\
        .orderBy(col('rating_per_user').desc())   

window_all = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df1= df1.withColumn('avg_rating_per_user',avg('rating_per_user').over(window_all))


# Calculate number of ratings per movie
df2 = ratings.groupBy(col('movieId'))\
        .agg(count('rating').alias('rating_per_movie'))\
        .orderBy(col('rating_per_movie').desc())   

window_all = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df2= df2.withColumn('avg_rating_per_movie',avg('rating_per_movie').over(window_all)).select('avg_rating_per_movie').distinct()

# comparision df
avg_user_df = df1.select('avg_rating_per_user').distinct()
avg_movie_df = df2.select('avg_rating_per_movie').distinct()

comparison_df = avg_user_df.withColumn('key', lit(1))\
                    .join(avg_movie_df.withColumn('key',lit(1)),'key','inner')\
                        .drop('key')
comparison_df.show()




+-------------------+--------------------+
|avg_rating_per_user|avg_rating_per_movie|
+-------------------+--------------------+
| 165.30491803278687|  10.369806663924312|
+-------------------+--------------------+





In [0]:
# Avg ratings per user
avg_ratings_per_user = (
    ratings.groupBy("userId")\
        .agg(count("rating").alias("ratings_per_user"))\
            .agg(avg("ratings_per_user").alias("avg_ratings_per_user"))
            )
# Avg ratings per movie
avg_ratings_per_movie = (
    ratings.groupBy("movieId")\
        .agg(count('rating').alias('ratings_per_movie'))\
            .agg(avg('ratings_per_movie').alias('avg_ratings_per_movie'))    
)
# compare the two dataframes
comparison_df = avg_ratings_per_movie.withColumn('key', lit(1))\
                    .join(avg_ratings_per_user.withColumn('key',lit(1)),'key','inner')\
                        .drop('key')

comparison_df.show()

+---------------------+--------------------+
|avg_ratings_per_movie|avg_ratings_per_user|
+---------------------+--------------------+
|   10.369806663924312|  165.30491803278687|
+---------------------+--------------------+



10. What is the predominant (frequency based) genre per rating level?


In [0]:
# Determine the most frequent genre for each rating level. Genres are exploded to handle multi-genre movies, and dense_rank is used to handle ties correctly.

#create base
df = movies.join(ratings,movies.movieId == ratings.movieId,'inner').drop(ratings.timestamp,ratings.movieId)

# explode  genre column
df = df.withColumn("genre",explode(split(col("genres"), "\\|"))).drop('genres')

# Calculate How many times did each genre appear for each rating
genre_counts = (
    df.groupBy("rating", "genre")
      .agg(count("*").alias("genre_count"))
)

# Now we have to rank top genre for each group of rating based on highest genre_count
window_spec = Window.partitionBy(col('rating')).orderBy(col('genre_count').desc())
ranked_genres = genre_counts.withColumn(
    "rank",
    dense_rank().over(window_spec)
)

ranked_genres.filter(col("rank") == 1).orderBy(col('rating').desc()).select('rating','genre').show()


+------+------+
|rating| genre|
+------+------+
|   5.0| Drama|
|   4.5| Drama|
|   4.0| Drama|
|   3.5| Drama|
|   3.0|Comedy|
|   2.5|Comedy|
|   2.0|Comedy|
|   1.5|Comedy|
|   1.0|Comedy|
|   0.5|Comedy|
+------+------+



11. What is the predominant tag per genre ?

In [0]:
# Identify the most frequently used tag for each genre. Each row represents one tag applied to one genre, and dense_rank is used to select the predominant tag.

#create base
df = movies.join(tags,movies.movieId == tags.movieId,'inner').drop(tags.timestamp,tags.movieId)

# explode  genre column
df = df.withColumn("genre",explode(split(col("genres"), "\\|"))).drop('genres')


# Calculate How many times did a tag appear for each genre
tag_counts = (
    df.groupBy("genre","tag")
      .agg(count("*").alias("tag_count"))
)


# Now we have to rank top tag for each group of genre based on highest tag_count
window_spec = Window.partitionBy(col('genre')).orderBy(col('tag_count').desc())
ranked_tags = tag_counts.withColumn(
    "rank",
    dense_rank().over(window_spec)
)

ranked_tags.filter(col("rank") == 1).orderBy(col('genre').desc()).select('genre','tag').show()




+-----------+------------------+
|      genre|               tag|
+-----------+------------------+
|    Western|  In Netflix queue|
|        War|           Vietnam|
|   Thriller|       atmospheric|
|     Sci-Fi|            sci-fi|
|    Romance|  In Netflix queue|
|    Mystery|      twist ending|
|    Musical|            Disney|
|       IMAX| thought-provoking|
|       IMAX|visually appealing|
|       IMAX|            sci-fi|
|     Horror|            ghosts|
|     Horror|      Stephen King|
|  Film-Noir|  In Netflix queue|
|    Fantasy|            Disney|
|      Drama|  In Netflix queue|
|Documentary|  In Netflix queue|
|      Crime|  In Netflix queue|
|     Comedy|  In Netflix queue|
|   Children|            Disney|
|  Animation|            Disney|
+-----------+------------------+
only showing top 20 rows


12. what is the most tagged genres?

In [0]:
# Identify the genres that receive the highest number of tags overall, indicating the most actively tagged genres.


# most tagged genre
genre_tag_counts = (
    df.groupBy("genre")
      .agg(count("*").alias("total_tags"))
      .orderBy(col("total_tags").desc())
)

window_all = Window.orderBy(col("total_tags").desc())

most_tagged_genres = genre_tag_counts.withColumn("rank",
    dense_rank().over(window_all)
).filter(col("rank") == 1)

most_tagged_genres.select('genre').show()





+-----+
|genre|
+-----+
|Drama|
+-----+



13.What are the most predominant (popularity based) movies top 10


In [0]:
# Identify the most popular movies based on the number of distinct users who rated them, rather than average rating and use dense rank to rank them and select top 10.

df = movies.join(ratings,movies.movieId == ratings.movieId,"inner").drop(movies.genres,ratings.movieId,ratings.timestamp,ratings.rating)

user_counts = (
    df.groupBy("movieId","title")
      .agg(count("*").alias("total_user"))
      .orderBy(col("total_user").desc())
)

window_all = Window.orderBy(col("total_user").desc())

popular_movie = user_counts.withColumn("rank", dense_rank().over(window_all)).filter(col("rank") <= 10)
popular_movie.select('title','total_user').show()



+--------------------+----------+
|               title|total_user|
+--------------------+----------+
| Forrest Gump (1994)|       329|
|Shawshank Redempt...|       317|
| Pulp Fiction (1994)|       307|
|Silence of the La...|       279|
|  Matrix, The (1999)|       278|
|Star Wars: Episod...|       251|
|Jurassic Park (1993)|       238|
|   Braveheart (1995)|       237|
|Terminator 2: Jud...|       224|
|Schindler's List ...|       220|
+--------------------+----------+



14. Top 10 movies in terms of average rating (provided more than 30 users
reviewed them)

In [0]:
# Identify the most popular movies based on the average rating, and use dense rank to rank them and select top 10

df = ratings.join(user_counts,ratings.movieId == user_counts.movieId , 'inner').filter(user_counts.total_user>=30).drop(user_counts.movieId,ratings.timestamp)

df_avg = df.groupBy('movieId','title').agg(round(avg("rating"), 4).alias("avg_rating"))

window_spec = Window.orderBy(col('avg_rating').desc())

avg_rank = df_avg.withColumn("rank", dense_rank().over(window_spec)).filter(col("rank") <= 10)
avg_rank.show()





+-------+--------------------+----------+----+
|movieId|               title|avg_rating|rank|
+-------+--------------------+----------+----+
|    318|Shawshank Redempt...|     4.429|   1|
|   1204|Lawrence of Arabi...|       4.3|   2|
|    858|Godfather, The (1...|    4.2891|   3|
|   2959|   Fight Club (1999)|    4.2729|   4|
|   1276|Cool Hand Luke (1...|    4.2719|   5|
|    750|Dr. Strangelove o...|     4.268|   6|
|    904|  Rear Window (1954)|    4.2619|   7|
|   1221|Godfather: Part I...|    4.2597|   8|
|  48516|Departed, The (2006)|    4.2523|   9|
|   1267|Manchurian Candid...|      4.25|  10|
|   1213|   Goodfellas (1990)|      4.25|  10|
+-------+--------------------+----------+----+

