# Movie Data Transformation Pipeline

## Import Libraries

In [0]:
import json
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from functools import reduce
from pyspark.sql import DataFrame

## Create SparkSession

In [0]:
spark = SparkSession.builder \
    .appName("MovieDataTransformation") \
    .getOrCreate()

dbfs_path = "/dbfs/FileStore/tables/"

## Define Schema and Read JSON Files

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType

def read_json(file_path):
    schema = StructType([
        StructField("Id", IntegerType(), True),
        StructField("Title", StringType(), True),
        StructField("Overview", StringType(), True),
        StructField("Tagline", StringType(), True),
        StructField("Budget", DoubleType(), True),
        StructField("Revenue", DoubleType(), True),
        StructField("ImdbUrl", StringType(), True),
        StructField("TmdbUrl", StringType(), True),
        StructField("PosterUrl", StringType(), True),
        StructField("BackdropUrl", StringType(), True),
        StructField("OriginalLanguage", StringType(), True),
        StructField("ReleaseDate", StringType(), True),
        StructField("RunTime", IntegerType(), True),
        StructField("Price", DoubleType(), True),
        StructField("CreatedDate", StringType(), True),
        StructField("UpdatedDate", StringType(), True),
        StructField("UpdatedBy", StringType(), True),
        StructField("CreatedBy", StringType(), True),
        StructField("genres", ArrayType(
            StructType([
                StructField("id", IntegerType(), True),
                StructField("name", StringType(), True)
            ])
        ), True)
    ])
    
    with open(file_path, 'r') as f:
        data = json.load(f)
    return spark.createDataFrame(data["movie"], schema=schema)


movie_files = ["movie_0.json", "movie_1.json", "movie_2.json", "movie_3.json",
               "movie_4.json", "movie_5.json", "movie_6.json", "movie_7.json"]

dataframes = [read_json(dbfs_path + file) for file in movie_files]

## Data Cleaning Functions

In [0]:
def clean_data(df):
    quarantined_df = df.filter(df["RunTime"] < 0)
    clean_df = df.filter(df["RunTime"] >= 0)
    clean_df = clean_df.withColumn("Quarantined", df["RunTime"] < 0)
    clean_df = clean_df.withColumn("RunTime", F.abs(df["RunTime"]))
    clean_df = clean_df.withColumn("Budget", F.when(df["Budget"] < 1000000, 1000000).otherwise(df["Budget"]))
    
    return clean_df, quarantined_df

def save_as_delta(df, table_name):
    delta_path = dbfs_path + table_name

    try:
        dbutils.fs.rm(delta_path, recurse=True)
    except Exception as e:
        print(f"Error while removing existing folder: {e}")
    
    df.write.format("delta").mode("overwrite").save(delta_path)

def bronze_to_silver(dataframes):
    silver_tables = []
    quarantined_records = []

    for i, df in enumerate(dataframes):
        clean_df, quarantined_df = clean_data(df)
        silver_tables.append(clean_df)
        quarantined_records.append(quarantined_df)

        save_as_delta(clean_df, f"silver_table_{i}")

    combined_quarantined_df = quarantined_records[0]
    for q_df in quarantined_records[1:]:
        combined_quarantined_df = combined_quarantined_df.union(q_df)
    save_as_delta(combined_quarantined_df, "quarantined_records")

    return silver_tables



## Lookup Tables

In [0]:
def create_lookup_tables(dataframes):
    genres = set()
    languages = set()

    for df in dataframes:
        if df.count() > 0:
            genres.update([genre["id"] for row in df.select(F.explode("genres")).distinct().collect() for genre in row["col"] if isinstance(row["col"], list)])
            languages.update(df.select("OriginalLanguage").distinct().rdd.flatMap(lambda x: x).collect())

    if not genres:
        genres.add(0)
    if not languages:
        languages.add("Unknown")

    genres_df = spark.createDataFrame([(genre,) for genre in sorted(genres)], ["GenreId"])
    languages_df = spark.createDataFrame([(lang,) for lang in sorted(languages)], ["Language"])

    save_as_delta(genres_df, "genres_lookup")
    save_as_delta(languages_df, "languages_lookup")

    return genres_df, languages_df

## Fix Missing Genre Names

In [0]:
from pyspark.sql.functions import explode, struct

def fix_missing_genre_names(df, genres_lookup):
    df = df.alias("movies")
    genres_lookup = genres_lookup.alias("lookup")

    exploded_df = df.withColumn("exploded_genres", explode("genres"))
    joined_df = exploded_df.join(
        genres_lookup,
        exploded_df["exploded_genres.id"] == genres_lookup["GenreId"],
        "left"
    )

    fixed_genres = joined_df.withColumn(
        "genres",
        struct(
            "exploded_genres.id",
            F.coalesce("exploded_genres.name", F.col("lookup.GenreId").cast("string"))
        )
    ).drop("exploded_genres")

    return fixed_genres.groupBy(*df.columns).agg(F.collect_list("genres").alias("genres"))


## Handle Duplicates

In [0]:
def handle_duplicates(silver_tables):
    # Assuming the 'status' column is present to indicate new or loaded records
    unique_records = []
    for df in silver_tables:
        df = df.withColumn("status", F.lit("loaded"))
        unique_records.append(df)

    combined_df = reduce(lambda x, y: x.union(y), unique_records)
    deduplicated_df = combined_df.dropDuplicates(["Id"])
    return deduplicated_df


## Execute Pipeline

In [0]:
genres_lookup, languages_lookup = create_lookup_tables(dataframes)
fixed_silver_tables = [fix_missing_genre_names(df, genres_lookup) for df in silver_tables if df.count() > 0]
deduplicated_df = handle_duplicates(fixed_silver_tables)

## Save Final Silver Tables

In [0]:
from pyspark.sql.functions import explode, col

# Check if the "genres" column exists in the DataFrame
if "genres" in deduplicated_df.columns:
    # Flatten the "genres" column
    deduplicated_df = deduplicated_df.withColumn("genres", explode(col("genres")))
    deduplicated_df = deduplicated_df.select("*", col("genres.id").alias("genre_id"), col("genres.name").alias("genre_name")).drop("genres")
else:
    print("The 'genres' column does not exist in the DataFrame.")

# Save the flattened DataFrame as a Delta table
save_as_delta(deduplicated_df, "silver_movie_table")
save_as_delta(genres_lookup, "silver_genres_lookup")
save_as_delta(languages_lookup, "silver_languages_lookup")

The 'genres' column does not exist in the DataFrame.
