In [12]:
from pyspark.sql import SparkSession
from pyspark.sql import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import col, desc, first, count, avg, when, array_contains, dense_rank
import os
from pyspark.sql import DataFrame
from typing import Dict
from config import Config

from schemas import (
    schema_title_basics,
    schema_title_episode,
    schema_title_crew,
    schema_title_akas,
    schema_title_ratings,
    schema_title_principals,
    schema_name_basics
)

from imdb_spark_utils import (
    initialize_spark,
    load_dataframe,
    transform_title_basics,
    transform_title_akas,
    transform_title_crew,
    transform_title_episode,
    display_dataframe_info,
    transform_title_ratings,
    transform_title_principals,
    transform_name_basics
)

spark_session = initialize_spark("IMDB Data Processing")


def check_pyspark() -> None:
    print("Initializing Spark Session...")
    spark = SparkSession.builder \
        .appName("IMDB Data Check") \
        .getOrCreate()

    print("Spark Session initialized.")
    print("Checking available files...")
    files = [f for f in os.listdir(Config.DATA_DIR) if f.endswith(Config.FILE_EXTENSION)]

    if not files:
        print("No TSV files found. Make sure data is downloaded and extracted.")
        return

    print("Found files:", files)

    sample_file = os.path.join(Config.DATA_DIR, files[0])
    print(f"Loading sample file: {sample_file}")

    df = spark.read.option("header", "true").option("sep", "\t").csv(sample_file)

    print("Schema of the loaded file:")
    df.printSchema()

    print("Showing first 5 rows:")
    df.show(5)

    print("PySpark check complete!")

    spark.stop()


def process_imdb_data() -> Dict[str, DataFrame]:
    dataframes = {}

    dataframes["basics"] = transform_title_basics(
        load_dataframe(spark_session, schema_title_basics, f"{Config.DATA_DIR}/title.basics{Config.FILE_EXTENSION}")
    )
    dataframes["akas"] = transform_title_akas(
        load_dataframe(spark_session, schema_title_akas, f"{Config.DATA_DIR}/title.akas{Config.FILE_EXTENSION}")
    )
    dataframes["crew"] = transform_title_crew(
        load_dataframe(spark_session, schema_title_crew, f"{Config.DATA_DIR}/title.crew{Config.FILE_EXTENSION}")
    )
    dataframes["episode"] = transform_title_episode(
        load_dataframe(spark_session, schema_title_episode, f"{Config.DATA_DIR}/title.episode{Config.FILE_EXTENSION}")
    )
    dataframes["ratings"] = transform_title_ratings(
        load_dataframe(spark_session, schema_title_ratings, f"{Config.DATA_DIR}/title.ratings{Config.FILE_EXTENSION}")
    )
    dataframes["principals"] = transform_title_principals(
        load_dataframe(spark_session, schema_title_principals, f"{Config.DATA_DIR}/title.principals{Config.FILE_EXTENSION}")
    )
    dataframes["name"] = transform_name_basics(
        load_dataframe(spark_session, schema_name_basics, f"{Config.DATA_DIR}/name.basics{Config.FILE_EXTENSION}")
    )

    for name, df in dataframes.items():
        display_dataframe_info(df, name)

    return dataframes


dataframes = process_imdb_data()


=== basics ===
First 5 rows:
+---------+---------+----------------------+----------------------+-------+---------+-------+--------------+----------------------------+
|tconst   |titleType|primaryTitle          |originalTitle         |isAdult|startYear|endYear|runtimeMinutes|genres                      |
+---------+---------+----------------------+----------------------+-------+---------+-------+--------------+----------------------------+
|tt0000001|short    |Carmencita            |Carmencita            |NULL   |1894     |NULL   |1             |[Documentary, Short]        |
|tt0000002|short    |Le clown et ses chiens|Le clown et ses chiens|NULL   |1892     |NULL   |5             |[Animation, Short]          |
|tt0000003|short    |Poor Pierrot          |Pauvre Pierrot        |NULL   |1892     |NULL   |5             |[Animation, Comedy, Romance]|
|tt0000004|short    |Un bon bock           |Un bon bock           |NULL   |1892     |NULL   |12            |[Animation, Short]          |
|tt0

In [16]:
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def export_result(df: DataFrame, result_path: str, show_rows: int = 20, title: str = None,
                  partition_column: str = None, num_partitions: int = None) -> None:
    print(f"Results for: {title}")
    df.show(show_rows, truncate=False)
    total_records = df.count()
    print(f"Total number of records in the result: {total_records}")

    for col_name, col_type in df.dtypes:
        if col_type.startswith("array"):
            df = df.withColumn(col_name, F.concat_ws(", ", F.col(col_name)))

    writer = df.write.option("header", "true")

    if partition_column and partition_column in df.columns:
        print(f"Exporting results with partitioning by column: {partition_column}")
        writer.partitionBy(partition_column).csv(result_path, mode="overwrite")
    elif num_partitions and num_partitions > 0:
        print(f"Exporting results with {num_partitions} partitions")
        df.repartition(num_partitions).write.option("header", "true").csv(result_path, mode="overwrite")
    else:
        target_size_mb = 128
        estimated_size_bytes = total_records * len(df.columns) * 100
        estimated_size_mb = estimated_size_bytes / (1024 * 1024)
        optimal_partitions = max(1, int(estimated_size_mb / target_size_mb))

        print(f"Exporting results with automatically determined {optimal_partitions} partitions")
        df.repartition(optimal_partitions).write.option("header", "true").csv(result_path, mode="overwrite")

def get_ukrainian_titles(akas: DataFrame) -> DataFrame:
    return akas.filter(F.col("language") == "uk") \
               .select("titleId", "title")

def get_long_movies_with_ranking(movie_basics: DataFrame, ratings: DataFrame) -> DataFrame:
    window_spec = Window.orderBy(F.desc("averageRating"))
    return movie_basics.filter(F.col("runtimeMinutes") > 120) \
        .join(ratings, on="tconst") \
        .withColumn("rank_among_long", F.dense_rank().over(window_spec)) \
        .select("primaryTitle", "runtimeMinutes", "averageRating", "rank_among_long")

def get_top_rated_movies(movie_basics: DataFrame, ratings: DataFrame) -> DataFrame:
    return ratings.filter((F.col("averageRating") > 8) & (F.col("numVotes") > 10000)) \
        .join(movie_basics, on="tconst") \
        .select("tconst", "primaryTitle", "averageRating", "numVotes")

def get_genre_avg_ratings(movie_basics: DataFrame, ratings: DataFrame) -> DataFrame:
    return movie_basics.join(ratings, on="tconst") \
        .withColumn("genre", F.explode("genres")) \
        .groupBy("genre") \
        .agg(
            F.count("*").alias("count"),
            F.avg("averageRating").alias("avg_rating")
        ) \
        .orderBy(F.desc("avg_rating"))

def get_top_productive_actors(principals: DataFrame, names: DataFrame) -> DataFrame:
    return principals.filter(F.col("category") == "actor") \
        .groupBy("nconst") \
        .agg(F.count("*").alias("film_count")) \
        .filter(F.col("film_count") >= 10) \
        .orderBy(F.desc("film_count")) \
        .limit(3) \
        .join(names, on="nconst") \
        .select("primaryName", "film_count")

def get_top_actors_in_high_rated_popular_movies(dataframes):

    high_rated_movies = dataframes["ratings"] \
        .filter((F.col("averageRating") > 8.0) & (F.col("numVotes") > 1000)) \
        .select("tconst")

    actors_in_good_movies = dataframes["principals"] \
        .filter(F.col("category") == "actor") \
        .join(high_rated_movies, on="tconst") \
        .groupBy("nconst") \
        .agg(F.count("*").alias("high_rating_appearances")) \
        .orderBy(F.desc("high_rating_appearances")) \
        .limit(10)

    top_actors_in_good_movies = actors_in_good_movies \
        .join(dataframes["name"], on="nconst") \
        .select("primaryName", "high_rating_appearances")
    return top_actors_in_good_movies



def run_all_analytics(dataframes: dict[str, DataFrame]) -> None:
    basics = dataframes["basics"]
    movie_basics = basics.filter(F.col("titleType") == "movie")

    export_result(get_top_actors_in_high_rated_popular_movies(dataframes),
              "results/top_actors_in_high_rated", show_rows=20)

    export_result(get_ukrainian_titles(dataframes["akas"]),
                  "results/ukrainian_titles", show_rows=20)

    export_result(get_long_movies_with_ranking(movie_basics, dataframes["ratings"]),
                  "results/long_movies_ranked", show_rows=20)

    export_result(get_top_rated_movies(movie_basics, dataframes["ratings"]),
                  "results/top_rated", show_rows=20)

    export_result(get_genre_avg_ratings(movie_basics, dataframes["ratings"]),
                  "results/genre_avg_rating", show_rows=20)

    export_result(get_top_productive_actors(dataframes["principals"], dataframes["name"]),
                  "results/top_actors_productive", show_rows=20)



run_all_analytics(dataframes)


Results for: None
+-----------------+-----------------------+
|primaryName      |high_rating_appearances|
+-----------------+-----------------------+
|Hank Azaria      |418                    |
|Matt Stone       |450                    |
|Trey Parker      |454                    |
|Dee Bradley Baker|751                    |
|Dan Castellaneta |414                    |
|John DiMaggio    |428                    |
|Tom Kenny        |431                    |
|Harry Shearer    |407                    |
|William Salyers  |383                    |
|Sam Marin        |434                    |
+-----------------+-----------------------+

Total number of records in the result: 10
Exporting results with automatically determined 1 partitions
Results for: None
+---------+---------------------+
|titleId  |title                |
+---------+---------------------+
|tt0009204|Дім зненависти       |
|tt0009204|Dim znenavysty       |
|tt0011278|Геліотроп            |
|tt0011439|Знак Зорро           |
|tt001