In [31]:
import findspark
findspark.init()

In [32]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MovieTrendsAnalysis").getOrCreate()

In [33]:
from data_ingestion import Session
from data_cleaning import CleanData
from data_transformation import TransformData

datasets = Session().load_data(spark)
datasets = CleanData().clean_data(datasets)
datasets = TransformData().transform_data(datasets)


movies_df = datasets.get("movies_df")
ratings_df = datasets.get("ratings_df")
tags_df = datasets.get("tags_df")
links_df = datasets.get("links_df")
movies_df.printSchema()
ratings_df.printSchema()
tags_df.printSchema()
links_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: string (nullable = true)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: integer (nullable = true)

root
 |-- movieId: integer (nullable = true)
 |-- imdbId: integer (nullable = true)
 |-- tmdbId: integer (nullable = true)



In [4]:
from pyspark.sql import functions as f

In [5]:
# Checking/Viewing the dataframes
movies_df.show(5)
ratings_df.show(5)
links_df.show(5)
tags_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      1|   4.0|31-07-2000 00:15:03|
|     1|      3|   4.0|30-07-2000 23:50:47|
|     1|      6|   4.0|31-07-2000 00:07:04|
|     1|     47|   5.0|31-07-2000 00:33:35|
|     1|     50|   5.0|31-07-2000 00:18:51|
+------+-------+------+-------------------+
only showing top 5 rows

+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      1|11470

## ANALYSIS

In [6]:
#  Count the total number of movies in the dataset.
movies_count = movies_df.select(f.col("title")).count()
print(movies_count)

9742


In [7]:
# Calculate the average rating for each movie.
joined_movie_rating_df = movies_df.join(ratings_df, on="movieId")
avg_rating = joined_movie_rating_df.groupBy(f.col("movieId"), f.col("title")).agg(
    f.format_number(f.avg(f.col("rating")), 2).alias("AvgRating")
)
avg_rating.show()

+-------+--------------------+---------+
|movieId|               title|AvgRating|
+-------+--------------------+---------+
|   2657|Rocky Horror Pict...|     3.34|
|   2076|  Blue Velvet (1986)|     3.95|
|    493|Menace II Society...|     3.67|
|    881|    First Kid (1996)|     2.10|
|    442|Demolition Man (1...|     3.09|
|   6548|  Bad Boys II (2003)|     2.88|
| 141688|       Legend (2015)|     3.50|
|   2171|Next Stop Wonderl...|     3.31|
|   4085|Beverly Hills Cop...|     3.40|
|  45447|Da Vinci Code, Th...|     3.12|
|  38886|Squid and the Wha...|     3.44|
| 142997|Hotel Transylvani...|     2.88|
|   6059| Recruit, The (2003)|     3.35|
|    938|         Gigi (1958)|     3.25|
|  96373|       Broken (2012)|     4.50|
| 134849|   Duck Amuck (1953)|     4.00|
|  97172|Frankenweenie (2012)|     2.50|
| 161966|         Elle (2016)|     4.00|
|   5563|City by the Sea (...|     2.83|
|   2946|        Help! (1965)|     3.36|
+-------+--------------------+---------+
only showing top

In [8]:
# Determine the total number of ratings for each movie.
rating_cnt = joined_movie_rating_df.groupBy(f.col("movieId"), f.col("title")).agg(
    f.count(f.col("rating")).alias("ratingCnt")
    )
rating_cnt.show()

+-------+--------------------+---------+
|movieId|               title|ratingCnt|
+-------+--------------------+---------+
|   2657|Rocky Horror Pict...|       63|
|   2076|  Blue Velvet (1986)|       46|
|    493|Menace II Society...|       12|
|    881|    First Kid (1996)|        5|
|    442|Demolition Man (1...|       81|
|   6548|  Bad Boys II (2003)|       17|
| 141688|       Legend (2015)|        2|
|   2171|Next Stop Wonderl...|        8|
|   4085|Beverly Hills Cop...|       46|
|  45447|Da Vinci Code, Th...|       49|
|  38886|Squid and the Wha...|        9|
| 142997|Hotel Transylvani...|        4|
|   6059| Recruit, The (2003)|       17|
|    938|         Gigi (1958)|        6|
|  96373|       Broken (2012)|        1|
| 134849|   Duck Amuck (1953)|        1|
|  97172|Frankenweenie (2012)|        2|
| 161966|         Elle (2016)|        1|
|   5563|City by the Sea (...|        3|
|   2946|        Help! (1965)|        7|
+-------+--------------------+---------+
only showing top

In [9]:
# Calculate the average rating given by each user.
avg_rating_by_user = ratings_df.groupBy(f.col("userId")).agg(
    f.format_number(f.avg("rating"), 2).alias("UserAvgRating")
).orderBy(f.col("userId"))
avg_rating_by_user.show()

+------+-------------+
|userId|UserAvgRating|
+------+-------------+
|     1|         4.37|
|     2|         3.95|
|     3|         2.44|
|     4|         3.56|
|     5|         3.64|
|     6|         3.49|
|     7|         3.23|
|     8|         3.57|
|     9|         3.26|
|    10|         3.28|
|    11|         3.78|
|    12|         4.39|
|    13|         3.65|
|    14|         3.40|
|    15|         3.45|
|    16|         3.72|
|    17|         4.21|
|    18|         3.73|
|    19|         2.61|
|    20|         3.59|
+------+-------------+
only showing top 20 rows



In [10]:
# Calculate the total number of movies each genre has.
separated_genre = movies_df.withColumn("genres", f.explode(f.split(f.col("genres"), "\\|")))
movie_cnt_per_genre = separated_genre.groupBy(f.col("genres")).agg(
    f.count(f.col("title")).alias("MovieCnt")
)
movie_cnt_per_genre.show()


+------------------+--------+
|            genres|MovieCnt|
+------------------+--------+
|             Crime|    1199|
|           Romance|    1596|
|          Thriller|    1894|
|         Adventure|    1263|
|             Drama|    4361|
|               War|     382|
|       Documentary|     440|
|           Fantasy|     779|
|           Mystery|     573|
|           Musical|     334|
|         Animation|     611|
|         Film-Noir|      87|
|(no genres listed)|      34|
|              IMAX|     158|
|            Horror|     978|
|           Western|     167|
|            Comedy|    3756|
|          Children|     664|
|            Action|    1828|
|            Sci-Fi|     980|
+------------------+--------+



In [11]:
tags_df.show(5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



In [12]:
# Find the average number of tags per movie.
avg_tags_per_movie = tags_df.groupBy(f.col("movieId")).agg(
    f.count(f.col("tag")).alias("tag_cnt")).agg(
    f.format_number(f.avg(f.col("tag_cnt")), 2).alias("AvgTagPerMovie")
)
avg_tags_per_movie.show()

+--------------+
|AvgTagPerMovie|
+--------------+
|          2.34|
+--------------+



In [13]:
# Identify the top 10 highest-rated movies.
top10_rated_movies = ratings_df.groupBy(f.col("movieId")).agg(
    f.format_number(f.avg(f.col("rating")), 2).alias("avgRating")
    ).orderBy(f.col("avgRating").desc(), f.col("movieId")).limit(10)

top10_rated_movies = top10_rated_movies.join(movies_df, on="movieId").select(
    f.col("movieId"), f.col("avgRating"), f.col("title")
    )
top10_rated_movies.show()

+-------+---------+--------------------+
|movieId|avgRating|               title|
+-------+---------+--------------------+
|     53|     5.00|     Lamerica (1994)|
|     99|     5.00|Heidi Fleiss: Hol...|
|    148|     5.00|Awfully Big Adven...|
|    467|     5.00|Live Nude Girls (...|
|    495|     5.00|In the Realm of t...|
|    496|     5.00|What Happened Was...|
|    626|     5.00|Thin Line Between...|
|    633|     5.00|Denise Calls Up (...|
|    876|     5.00|Supercop 2 (Proje...|
|   1140|     5.00|Entertaining Ange...|
+-------+---------+--------------------+



In [14]:
# List the top 5 movies with the most tags.
top5_most_tags = tags_df.groupBy(f.col("movieId")).agg(
    f.count("tag").alias("tag_cnt")
).orderBy(f.col("tag_cnt").desc()).limit(5)

top5_most_tags = top5_most_tags.join(movies_df, on="movieId", how="left_outer").select(
    f.col("movieId"), f.col("tag_cnt"), f.col("title")
)
top5_most_tags.show()

+-------+-------+--------------------+
|movieId|tag_cnt|               title|
+-------+-------+--------------------+
|    296|    181| Pulp Fiction (1994)|
|   2959|     54|   Fight Club (1999)|
|    924|     41|2001: A Space Ody...|
|    293|     35|Léon: The Profess...|
|   7361|     34|Eternal Sunshine ...|
+-------+-------+--------------------+



In [15]:
# Determine the most popular genre based on the number of movies.
popular_genre = movies_df.withColumn("genres", f.explode(f.split(f.col("genres"), "\\|"))).groupBy(f.col("genres")).agg(
    f.count(f.col("movieId")).alias("movie_cnt")
).orderBy(f.col("movie_cnt").desc()).limit(1).withColumnRenamed("genres", "genre")
popular_genre.show()


+-----+---------+
|genre|movie_cnt|
+-----+---------+
|Drama|     4361|
+-----+---------+



In [16]:
# Identify the movies that have been rated by the most number of users.
ratings_by_most_users = ratings_df.groupBy(f.col("movieId")).agg(
    f.count(f.col("userId")).alias("userCnt")
).orderBy(f.col("userCnt").desc())
ratings_by_most_users = ratings_by_most_users.join(movies_df, on="movieId", how="left_outer").select(
    f.col("movieId"), f.col("userCnt"), f.col("title")
    )
ratings_by_most_users.show(5)

+-------+-------+--------------------+
|movieId|userCnt|               title|
+-------+-------+--------------------+
|   1580|    165|Men in Black (a.k...|
|   2366|     25|    King Kong (1933)|
|   3175|     75| Galaxy Quest (1999)|
|   1088|     42|Dirty Dancing (1987)|
|  32460|      4|Knockin' on Heave...|
+-------+-------+--------------------+
only showing top 5 rows



In [17]:
# List movies that have been tagged by more than 10 different users.
rating_cnt_by_user = ratings_df.groupBy(f.col("movieId")).agg(
    f.count(f.col("userId")).alias("userCnt")
)
ratingCnt_moreThan10 = rating_cnt_by_user.filter(f.col("userCnt")>10).join(movies_df, on="movieId", how="left_outer").select(
    f.col("movieId"), f.col("title"), f.col("userCnt")
    )
ratingCnt_moreThan10.show()

+-------+--------------------+-------+
|movieId|               title|userCnt|
+-------+--------------------+-------+
|   1580|Men in Black (a.k...|    165|
|   2366|    King Kong (1933)|     25|
|   3175| Galaxy Quest (1999)|     75|
|   1088|Dirty Dancing (1987)|     42|
|  44022|Ice Age 2: The Me...|     23|
|   1342|     Candyman (1992)|     11|
|   1591|        Spawn (1997)|     26|
|   1645|The Devil's Advoc...|     51|
|    471|Hudsucker Proxy, ...|     40|
|   3997|Dungeons & Dragon...|     12|
|   1959|Out of Africa (1985)|     15|
|   2122|Children of the C...|     11|
|   6620|American Splendor...|     18|
|  54190|Across the Univer...|     11|
|   8638|Before Sunset (2004)|     15|
|   1025|Sword in the Ston...|     25|
|   1127|   Abyss, The (1989)|     62|
|   2387|Very Bad Things (...|     16|
|   2580|           Go (1999)|     39|
|   1084|Bonnie and Clyde ...|     35|
+-------+--------------------+-------+
only showing top 20 rows



In [18]:
# Find the number of unique users who rated movies.
unique_users_ratingCnt = ratings_df.agg(
    f.count_distinct(f.col("userId")).alias("uniqueUsers_rating")
    )
unique_users_ratingCnt.show()

+------------------+
|uniqueUsers_rating|
+------------------+
|               610|
+------------------+



In [19]:
# Identify users who have rated more than 50 movies.
movies_rated_byUser_moreThan50 = ratings_df.groupBy(f.col("userId")).agg(
    f.count(f.col("movieId")).alias("movies_rated_byUser")
).filter(f.col("movies_rated_byUser")>50)
movies_rated_byUser_moreThan50.show()

+------+-------------------+
|userId|movies_rated_byUser|
+------+-------------------+
|   137|                141|
|   580|                436|
|   458|                 59|
|   588|                 56|
|    78|                 61|
|   322|                107|
|   321|                 56|
|   362|                109|
|   593|                103|
|   597|                443|
|   108|                 76|
|    34|                 86|
|   211|                 89|
|   368|                469|
|   101|                 61|
|   115|                112|
|   385|                201|
|    28|                570|
|   183|                 57|
|   210|                138|
+------+-------------------+
only showing top 20 rows



In [20]:
# Most Active Users
user_activity = ratings_df.groupby(f.col("userId")).count().orderBy(f.col("count").desc()).limit(10)
user_activity.show()

+------+-----+
|userId|count|
+------+-----+
|   414| 2698|
|   599| 2478|
|   474| 2108|
|   448| 1864|
|   274| 1346|
|   610| 1302|
|    68| 1260|
|   380| 1218|
|   606| 1115|
|   288| 1055|
+------+-----+



In [21]:
# Calculate the average rating given by each user.
avg_rating_by_user = ratings_df.groupBy(f.col("userId")).agg(
    f.format_number(f.avg(f.col("rating")), 2).alias("avgRating")
    )
avg_rating_by_user.show()

+------+---------+
|userId|avgRating|
+------+---------+
|   148|     3.74|
|   463|     3.79|
|   471|     3.88|
|   496|     3.41|
|   243|     4.14|
|   392|     3.20|
|   540|     4.00|
|    31|     3.92|
|   516|     3.69|
|    85|     3.71|
|   137|     3.98|
|   251|     4.87|
|   451|     3.79|
|   580|     3.53|
|    65|     4.03|
|   458|     4.15|
|    53|     5.00|
|   255|     2.57|
|   481|     2.81|
|   588|     3.25|
+------+---------+
only showing top 20 rows



In [22]:
# Find the most recent movie rating based on the timestamp.
recent_rating = ratings_df.withColumn("timestamp", f.from_unixtime(f.col("timestamp"))).withColumn(
    "timestamp", f.date_format("timestamp", "dd-MM-yyyy HH:mm:ss")
    ).orderBy(f.col("timestamp").desc()).limit(1)
recent_rating = recent_rating.join(movies_df, on="movieId", how="left_outer")
recent_rating.show()

+-------+------+------+---------+----------------+--------------------+
|movieId|userId|rating|timestamp|           title|              genres|
+-------+------+------+---------+----------------+--------------------+
|      1|     1|   4.0|     NULL|Toy Story (1995)|Adventure|Animati...|
+-------+------+------+---------+----------------+--------------------+



In [23]:
# Determine the movie with the highest number of tags.
high_tag_cnt = tags_df.groupBy(f.col("movieId")).agg(
    f.count(f.col("tag")).alias("tag_cnt")
).orderBy(f.col("tag_cnt").desc()).limit(1)
high_tag_cnt = high_tag_cnt.join(movies_df, on="movieId", how="left_outer")
high_tag_cnt.show()

+-------+-------+-------------------+--------------------+
|movieId|tag_cnt|              title|              genres|
+-------+-------+-------------------+--------------------+
|    296|    181|Pulp Fiction (1994)|Comedy|Crime|Dram...|
+-------+-------+-------------------+--------------------+



In [24]:
# Find the most common tags used across the dataset.
most_common_tags = movies_df.withColumn("genres", f.explode(f.split(f.col("genres"), "\\|"))).groupBy(f.col("genres")).agg(
    f.count(f.col("genres")).alias("genre_cnt")
).orderBy(f.col("genre_cnt").desc())
most_common_tags.show()

+------------------+---------+
|            genres|genre_cnt|
+------------------+---------+
|             Drama|     4361|
|            Comedy|     3756|
|          Thriller|     1894|
|            Action|     1828|
|           Romance|     1596|
|         Adventure|     1263|
|             Crime|     1199|
|            Sci-Fi|      980|
|            Horror|      978|
|           Fantasy|      779|
|          Children|      664|
|         Animation|      611|
|           Mystery|      573|
|       Documentary|      440|
|               War|      382|
|           Musical|      334|
|           Western|      167|
|              IMAX|      158|
|         Film-Noir|       87|
|(no genres listed)|       34|
+------------------+---------+



In [25]:
# Calculate the average rating per genre.
joined_movie_rating_df = movies_df.join(ratings_df, on="movieId", how="left_outer")
genre_avg_rating = joined_movie_rating_df.withColumn("genres", f.explode(f.split(f.col("genres"), "\\|"))).groupBy(f.col("genres")).agg(
    f.format_number(f.avg(f.col("rating")), 2).alias("avgRating")
)
genre_avg_rating.show()


+------------------+---------+
|            genres|avgRating|
+------------------+---------+
|             Crime|     3.66|
|           Romance|     3.51|
|          Thriller|     3.49|
|         Adventure|     3.51|
|             Drama|     3.66|
|               War|     3.81|
|       Documentary|     3.80|
|           Fantasy|     3.49|
|           Mystery|     3.63|
|           Musical|     3.56|
|         Animation|     3.63|
|         Film-Noir|     3.92|
|(no genres listed)|     3.49|
|              IMAX|     3.62|
|            Horror|     3.26|
|           Western|     3.58|
|            Comedy|     3.38|
|          Children|     3.41|
|            Action|     3.45|
|            Sci-Fi|     3.46|
+------------------+---------+



In [26]:
# Determine the distribution of ratings (e.g., how many movies received each rating).
movie_cnt_per_rating = ratings_df.groupBy(f.col("rating")).count().orderBy(f.col("rating"))
movie_cnt_per_rating.show()

+------+-----+
|rating|count|
+------+-----+
|   0.5| 1370|
|   1.0| 2811|
|   1.5| 1791|
|   2.0| 7551|
|   2.5| 5550|
|   3.0|20047|
|   3.5|13136|
|   4.0|26818|
|   4.5| 8551|
|   5.0|13211|
+------+-----+



In [27]:
# Identify the movies that have the highest and lowest IMDb and TMDb IDs.
min_max_imdb_tmdb = links_df.agg(
    f.min(f.col("imdbId")).alias("IMDB-min"),
    f.max(f.col("imdbId")).alias("IMDB-max"),
    f.min(f.col("tmdbId")).alias("TMDB-min"),
    f.max(f.col("tmdbId")).alias("TMDB-max")
)
min_max_imdb_tmdb.show()

+--------+--------+--------+--------+
|IMDB-min|IMDB-max|TMDB-min|TMDB-max|
+--------+--------+--------+--------+
|     417| 8391976|       2|  525662|
+--------+--------+--------+--------+



In [28]:
imbd_links = links_df.withColumn("imdbLink", f.concat(
    f.lit("http://www.imdb.com/title/tt"), f.lpad(f.col("imdbId").cast("string"), 7, "0")
))
# imbd_links.show(truncate=False)

tmdb_links = links_df.withColumn("tmdbLink", f.concat(
    f.lit("https://www.themoviedb.org/movie/"), f.col("tmdbId").cast("string"))
    )
tmdb_links.show(truncate=False)

+-------+------+------+--------------------------------------+
|movieId|imdbId|tmdbId|tmdbLink                              |
+-------+------+------+--------------------------------------+
|1      |114709|862   |https://www.themoviedb.org/movie/862  |
|2      |113497|8844  |https://www.themoviedb.org/movie/8844 |
|3      |113228|15602 |https://www.themoviedb.org/movie/15602|
|4      |114885|31357 |https://www.themoviedb.org/movie/31357|
|5      |113041|11862 |https://www.themoviedb.org/movie/11862|
|6      |113277|949   |https://www.themoviedb.org/movie/949  |
|7      |114319|11860 |https://www.themoviedb.org/movie/11860|
|8      |112302|45325 |https://www.themoviedb.org/movie/45325|
|9      |114576|9091  |https://www.themoviedb.org/movie/9091 |
|10     |113189|710   |https://www.themoviedb.org/movie/710  |
|11     |112346|9087  |https://www.themoviedb.org/movie/9087 |
|12     |112896|12110 |https://www.themoviedb.org/movie/12110|
|13     |112453|21032 |https://www.themoviedb.org/movie

In [34]:
spark.stop()