In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, countDistinct, desc, isnan, when
from pyspark.sql.types import IntegerType, StringType

Session creation

In [2]:
spark = SparkSession.builder.getOrCreate()



Read the data

In [3]:
df = spark.read.csv("../data/title.principals.tsv", sep="\t", header=True, inferSchema=True)
df.show()

                                                                                

+---------+--------+---------+---------------+--------------------+--------------+
|   tconst|ordering|   nconst|       category|                 job|    characters|
+---------+--------+---------+---------------+--------------------+--------------+
|tt0000001|       1|nm1588970|           self|                  \N|      ["Self"]|
|tt0000001|       2|nm0005690|       director|                  \N|            \N|
|tt0000001|       3|nm0005690|       producer|            producer|            \N|
|tt0000001|       4|nm0374658|cinematographer|director of photo...|            \N|
|tt0000002|       1|nm0721526|       director|                  \N|            \N|
|tt0000002|       2|nm1335271|       composer|                  \N|            \N|
|tt0000003|       1|nm0721526|       director|                  \N|            \N|
|tt0000003|       2|nm0721526|         writer|                  \N|            \N|
|tt0000003|       3|nm1770680|       producer|            producer|            \N|
|tt0

Ensure proper dtypes

In [4]:
df = df.withColumn("ordering", df["ordering"].cast(IntegerType()))
df = df.withColumn("tconst", df["tconst"].cast(StringType()))
df = df.withColumn("nconst", df["nconst"].cast(StringType()))
df = df.withColumn("category", df["category"].cast(StringType()))
df = df.withColumn("job", df["job"].cast(StringType()))
df = df.withColumn("characters", df["characters"].cast(StringType()))

In [5]:
row_count = df.count()
print(f"Total Rows: {row_count}")



Total Rows: 91470119


                                                                                

In [5]:
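# "\N" is the dataset's marker for a missing value; turn it into a real null.
df = df.replace("\\N", None)
# No column here is float/double, so isnan() adds nothing; count nulls only.
nan_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])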
print("NaN Counts per Column:")
nan_counts.show()

NaN Counts per Column:




+------+--------+------+--------+--------+----------+
|tconst|ordering|nconst|category|     job|characters|
+------+--------+------+--------+--------+----------+
|     0|       0|     0|       0|74383897|  47161089|
+------+--------+------+--------+--------+----------+



                                                                                

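As we can see, the 'job' column is null in 74,383,897 of 91,470,119 rows (about 81%), and in the sample rows the remaining values mostly restate or refine the 'category' column (for example, 'producer' for a producer). The 'characters' column is null in about 52% of rows, and there is nothing to impute: it is only meaningful for on-screen credits (acting roles and 'self', as in the first sample row).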

In [6]:
unique_counts = df.select([countDistinct(c).alias(c) for c in df.columns])
print("Unique Values per Column:")
unique_counts.show()

Unique Values per Column:




+--------+--------+-------+--------+-----+----------+
|  tconst|ordering| nconst|category|  job|characters|
+--------+--------+-------+--------+-----+----------+
|10434121|      75|6618722|      13|44300|   4246175|
+--------+--------+-------+--------+-----+----------+



                                                                                

In [7]:
print("Summary Statistics:")
df.describe().show()

Summary Statistics:




+-------+---------+------------------+---------+--------+------------------+--------------------+
|summary|   tconst|          ordering|   nconst|category|               job|          characters|
+-------+---------+------------------+---------+--------+------------------+--------------------+
|  count| 91470119|          91470119| 91470119|91470119|          17086222|            44309030|
|   mean|     null|7.0169422759797655|     null|    null|1.2666666666666666|                null|
| stddev|     null| 5.156933336088902|     null|    null|0.6531972647421809|                null|
|    min|tt0000001|                 1|nm0000001|   actor|"A Box in Town" by|             ["!CF"]|
|    max|tt9916880|                75|nm9993718|  writer|          écrivain|["🐑 Sheepish Bys...|
+-------+---------+------------------+---------+--------+------------------+--------------------+



                                                                                

In [8]:
print("Sample Data:")
df.show(5)

Sample Data:
+---------+--------+---------+---------------+--------------------+----------+
|   tconst|ordering|   nconst|       category|                 job|characters|
+---------+--------+---------+---------------+--------------------+----------+
|tt0000001|       1|nm1588970|           self|                null|  ["Self"]|
|tt0000001|       2|nm0005690|       director|                null|      null|
|tt0000001|       3|nm0005690|       producer|            producer|      null|
|tt0000001|       4|nm0374658|cinematographer|director of photo...|      null|
|tt0000002|       1|nm0721526|       director|                null|      null|
+---------+--------+---------+---------------+--------------------+----------+
only showing top 5 rows



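The 'job' column is dropped for the reasons given above. The 'ordering' column only numbers the rows within a title (1, 2, 3, ...), so its largest value per title should match the number of rows for that title. It carries little information for this analysis, but note that the cell below drops only 'job' and keeps 'ordering'.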

In [6]:
df = df.drop('job')

In [7]:
print("Sample Data:")
df.show(5)

Sample Data:
+---------+--------+---------+---------------+----------+
|   tconst|ordering|   nconst|       category|characters|
+---------+--------+---------+---------------+----------+
|tt0000001|       1|nm1588970|           self|  ["Self"]|
|tt0000001|       2|nm0005690|       director|      null|
|tt0000001|       3|nm0005690|       producer|      null|
|tt0000001|       4|nm0374658|cinematographer|      null|
|tt0000002|       1|nm0721526|       director|      null|
+---------+--------+---------+---------------+----------+
only showing top 5 rows



In [14]:
print('Value counts for values in category col:')
df.groupBy('category').count().orderBy('count').show()

Value counts for values in category col:




+-------------------+--------+
|           category|   count|
+-------------------+--------+
|      archive_sound|    9582|
|    archive_footage|  566267|
|   casting_director| 1074082|
|production_designer| 1096583|
|           composer| 2971811|
|    cinematographer| 3682411|
|             editor| 4829718|
|           producer| 6892690|
|           director| 7884233|
|             writer|10960338|
|               self|13210847|
|            actress|16422490|
|              actor|21869067|
+-------------------+--------+



                                                                                

In [12]:
print('People with the most credits:')
df.groupBy("nconst").count().orderBy(desc("count")).show(15)

People with the most credits:



+---------+-----+
|   nconst|count|
+---------+-----+
|nm0438471|37862|
|nm0438506|31561|
|nm7370686|28893|
|nm8467983|28389|
|nm6352729|26027|
|nm0914844|25587|
|nm0251041|25370|
|nm1203430|22776|
|nm2273814|21678|
|nm5042664|20490|
|nm3203921|20409|
|nm0022172|19989|
|nm5262331|18313|
|nm2941131|18012|
|nm0784014|17410|
+---------+-----+
only showing top 15 rows



                                                                                

In [9]:
print('Titles with the most credited people:')
df.groupBy("tconst").count().orderBy(desc("count")).show(10)

Titles with the most credited people:



+----------+-----+
|    tconst|count|
+----------+-----+
| tt0398022|   75|
| tt5659710|   69|
| tt1438495|   66|
| tt0298590|   65|
| tt0406599|   64|
| tt0365033|   62|
| tt1245530|   59|
| tt2074491|   59|
| tt1606806|   59|
|tt10093280|   59|
+----------+-----+
only showing top 10 rows



                                                                                

In [None]:
output_path = "../data/title.principals.cleaned2.tsv"
df.coalesce(1).write.option("header", True).option("sep", "\t").mode("overwrite").csv(output_path)


Business questions:
1. Identify the top 10 directors with the highest average movie rating for movies having at least 1,000 votes.

    Tables involved:
    title.ratings: Filter movies with at least 1,000 votes and join to get ratings.
    title.crew: Get director identifiers (using tconst).
    title.principals: Use to reinforce the link between movies and the people involved (e.g., director’s role, if needed).
    name.basics: Join to retrieve director names.
    Operations:
    Apply filters on vote counts.
    Join multiple tables on tconst and nconst.
    Group by director and calculate average ratings.
    Order by rating and use window functions to rank directors.
2. List the most prolific actors—those who have acted in over 50 movies—with their corresponding movie counts.

    Tables involved:
    title.principals: Focus on rows where category indicates “actor” or “actress”.
    name.basics: Join on nconst to retrieve actor names.
    Operations:
    Filter rows by actor categories.
    Group by actor (nconst) and count appearances.
    Apply a HAVING clause (or filter) to show only those with more than 50 credits.
    Order results by count in descending order.
3. For movies released after 2010 in the ‘Comedy’ genre, rank the top 5 movies based on average rating.

    Tables involved:
    title.basics: Filter movies by startYear and genres.
    title.ratings: Join to get average ratings and vote counts.
    title.principals: Can be used for additional filtering (e.g., ensuring there are principal cast members).
    Operations:
    Filter movies by release year and genre.
    Join with ratings.
    Use a window function to rank movies by average rating within the filtered set.
    Limit the results to the top 5.
4. For each movie, list the names of the actors ordered by a custom ranking (e.g., their first appearance order in the cast).

    Tables involved:
    title.principals: Use rows where category indicates “actor”/“actress”.
    name.basics: Join on nconst to get actor names.
    Operations:
    Filter for acting roles.
    Use window functions (such as ROW_NUMBER() or RANK()) partitioned by tconst to order actors within each movie.
    Join with names to display human-readable actor names.
    Optionally, group or aggregate if you want a single row per movie with a list of actors.
5. Identify TV series (titleType = 'tvSeries') where the episode count exceeds the overall average number of episodes per series, and list the series title with its total episode count.

    Tables involved:
    title.basics: Filter for TV series (titleType).
    title.episode: Join on tconst/parentTconst to count episodes.
    title.principals: Optionally join to get additional cast/crew information.
    Operations:
    Filter for TV series.
    Group by the TV series (parentTconst), count episodes.
    Use a window function to calculate the average number of episodes per series across the dataset.
    Filter to show only those series that exceed this average and join back to get the series title.
6. Which pairs of actors frequently appear together across different movies?

    Tables involved:
    title.principals
    name.basics
    Operations:
    Filter: Focus on title.principals entries for acting roles.
    Self-Join: Join the table on itself by tconst (for the same movie) with different nconst values.
    Group By: Group by the pair of actors and count the number of movies they co-appear in.
    Window Function: Optionally rank pairs by the number of shared appearances.