In [0]:
#Libraries management
import dlt
# from pyspark import pipelines as dp
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# Make `workspace`.`imdb_data_analysis` the session's current catalog and schema.
spark.sql("USE CATALOG `workspace`")
spark.sql("USE SCHEMA `imdb_data_analysis`")

## Load DIM_PERSON

In [0]:
# =============================================================================
# GOLD LAYER: dim_person
# =============================================================================
# Description: Person dimension containing cast and crew personnel
# Source: silver_imdb_name_basics
# Grain: One row per unique person (nconst)
# Load Type: Batch (industry standard for dimensions)
# =============================================================================

from pyspark.sql.functions import (
    col, current_timestamp, count, first, abs, hash
)

@dlt.table(
    name="dim_person",
    comment="Gold layer: Person dimension - cast and crew personnel with surrogate keys",
    table_properties={"quality": "gold", "layer": "gold", "domain": "imdb"},
)
@dlt.expect_or_drop("valid_nconst", "nconst IS NOT NULL")
@dlt.expect_or_drop("valid_name", "primary_name IS NOT NULL")
def gold_dim_person():
    """Build the person dimension: one output row per ``nconst``.

    Batch-reads ``LIVE.silver_imdb_name_basics``, collapses it to a single row
    per person, attaches a surrogate ``person_key`` and a load timestamp.

    Returns:
        DataFrame with columns person_key, nconst, primary_name, birth_year,
        death_year, profession_count, etl_load_timestamp.
    """
    name_basics = spark.read.table("LIVE.silver_imdb_name_basics")

    # Presumably silver carries several rows per person (one per profession);
    # the descriptive attributes are taken from the first row of each group
    # and the non-null profession values are counted.
    per_person_aggregates = [
        first("primaryName").alias("primary_name"),
        first("birthYear").alias("birth_year"),
        first("deathYear").alias("death_year"),
        count("profession").alias("profession_count"),
    ]
    persons = name_basics.groupBy("nconst").agg(*per_person_aggregates)

    output_columns = [
        "person_key",
        "nconst",
        "primary_name",
        "birth_year",
        "death_year",
        "profession_count",
        "etl_load_timestamp",
    ]

    return (
        persons
        # Surrogate key: unique but neither consecutive nor hash-based; the
        # values depend on partitioning and may differ between refreshes.
        .withColumn("person_key", monotonically_increasing_id() + 1)
        # Audit column
        .withColumn("etl_load_timestamp", current_timestamp())
        .select(*output_columns)
    )

## Load DIM_REGION

In [0]:
# =============================================================================
# GOLD LAYER: dim_region
# =============================================================================
# Description: Region dimension containing geographic regions/countries
# Source: iso_countries.csv (direct load from reference file)
# Grain: One row per unique region code
# =============================================================================

from pyspark.sql.functions import (
    col, current_timestamp, abs, hash, upper, trim, lit
)

@dlt.table(
    name="dim_region",
    comment="Gold layer: Region dimension - geographic regions with surrogate keys",
    table_properties={
        "quality": "gold",
        "layer": "gold",
        "domain": "imdb"
    }
)
@dlt.expect_or_drop("valid_region_code", "region_code IS NOT NULL")
@dlt.expect_or_drop("valid_region_name", "region_name IS NOT NULL")
def gold_dim_region():
    """Build the region dimension from the ISO country-code reference CSV.

    Reads the CSV (header row, inferred schema), derives ``region_code``
    (trimmed, upper-cased ``Code``) and ``region_name`` (trimmed
    ``Description``), drops rows that still have no code, and assigns a
    surrogate ``region_key`` plus a load timestamp.

    The reference file carries a blank code for Namibia. Its ISO 3166-1
    alpha-2 code is the literal string "NA", which was presumably swallowed as
    a missing-value marker by whatever tool produced the file (TODO confirm).
    Rather than dropping the country, the code is restored to "NA" when the
    code is blank and the description is Namibia; every other blank-code row
    is still filtered out.

    Returns:
        DataFrame with columns region_key, region_code, region_name,
        etl_load_timestamp.
    """
    # `when` is not part of this cell's explicit import list; import it here
    # so the function does not depend on the notebook-level star import.
    from pyspark.sql.functions import when

    # -------------------------------------------------------------------------
    # Define path to reference CSV
    # -------------------------------------------------------------------------
    country_codes_path = "/Volumes/workspace/imdb_data_analysis/datastore/iso_countries/iso_countries.csv"

    # -------------------------------------------------------------------------
    # Read Country Codes reference CSV
    # -------------------------------------------------------------------------
    df = (
        spark.read
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(country_codes_path)
    )

    # -------------------------------------------------------------------------
    # Transform columns
    # -------------------------------------------------------------------------
    cleaned_code = upper(trim(col("Code")))
    cleaned_name = trim(col("Description"))
    code_is_blank = cleaned_code.isNull() | (cleaned_code == "")
    is_namibia = upper(cleaned_name) == "NAMIBIA"

    # Repair the blank Namibia code instead of losing the row; a NULL
    # condition (e.g. missing description) falls through to `otherwise`.
    df = df.withColumn(
        "region_code",
        when(code_is_blank & is_namibia, lit("NA")).otherwise(cleaned_code)
    )
    df = df.withColumn("region_name", cleaned_name)

    # Filter out rows that still have a null/empty code
    df = df.filter(
        (col("region_code").isNotNull()) &
        (col("region_code") != "")
    )

    # -------------------------------------------------------------------------
    # Generate surrogate key
    # -------------------------------------------------------------------------
    # Unique but not consecutive; values depend on partitioning and may
    # differ between refreshes.
    df = df.withColumn(
        "region_key",
        monotonically_increasing_id() + 1
    )

    # Add audit column
    df = df.withColumn("etl_load_timestamp", current_timestamp())

    # -------------------------------------------------------------------------
    # Select final columns
    # -------------------------------------------------------------------------
    df_final = df.select(
        "region_key",
        "region_code",
        "region_name",
        "etl_load_timestamp"
    )

    return df_final

## Load DIM_LANGUAGE

In [0]:
# =============================================================================
# GOLD LAYER: dim_language
# =============================================================================
# Description: Language dimension containing languages
# Source: ISO_Language_Name.csv (direct load from reference file)
# Grain: One row per unique language code
# =============================================================================

from pyspark.sql.functions import (
    col, current_timestamp, abs, hash, lower, trim, lit
)

@dlt.table(
    name="dim_language",
    comment="Gold layer: Language dimension - languages with surrogate keys",
    table_properties={"quality": "gold", "layer": "gold", "domain": "imdb"},
)
@dlt.expect_or_drop("valid_language_code", "language_code IS NOT NULL")
@dlt.expect_or_drop("valid_language_name", "language_name IS NOT NULL")
def gold_dim_language():
    """Build the language dimension from the ISO language-name reference CSV.

    Reads the CSV (header row, inferred schema), derives ``language_code``
    (trimmed, lower-cased ``Set 1 (639-1)`` column) and ``language_name``
    (trimmed ``ISO Language Name`` column), drops rows without a code, and
    attaches a surrogate ``language_key`` plus a load timestamp.

    Returns:
        DataFrame with columns language_key, language_code, language_name,
        etl_load_timestamp.
    """
    language_codes_path = "/Volumes/workspace/imdb_data_analysis/datastore/ISO_Language_Name/ISO_Language_Name.csv"

    csv_reader = (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
    )
    raw_languages = csv_reader.load(language_codes_path)

    # Source headers contain spaces and parentheses, hence the backticks.
    languages = (
        raw_languages
        .withColumn("language_code", lower(trim(col("`Set 1 (639-1)`"))))
        .withColumn("language_name", trim(col("`ISO Language Name`")))
    )

    # Keep only rows that carry a usable code.
    has_code = col("language_code").isNotNull() & (col("language_code") != "")
    languages = languages.filter(has_code)

    return (
        languages
        # Surrogate key: unique but not consecutive; values depend on
        # partitioning and may differ between refreshes.
        .withColumn("language_key", monotonically_increasing_id() + 1)
        # Audit column
        .withColumn("etl_load_timestamp", current_timestamp())
        .select(
            "language_key",
            "language_code",
            "language_name",
            "etl_load_timestamp",
        )
    )

## Load DIM_TITLE

In [0]:
# =============================================================================
# GOLD LAYER: dim_title
# =============================================================================
# Description: Title dimension containing movies, TV series, and episodes
# Source: silver_imdb_title_basics (deduplicated)
# Grain: One row per unique title (tconst)
# =============================================================================

from pyspark.sql.functions import (
    col, lit, current_timestamp, first, monotonically_increasing_id
)

@dlt.table(
    name="dim_title",
    comment="Gold layer: Title dimension - movies, TV series, episodes",
    table_properties={"quality": "gold", "layer": "gold", "domain": "imdb"},
)
@dlt.expect_or_drop("valid_tconst", "tconst IS NOT NULL")
@dlt.expect_or_drop("valid_title", "primary_title IS NOT NULL")
def gold_dim_title():
    """Build the title dimension: one output row per ``tconst``.

    Batch-reads ``LIVE.silver_imdb_title_basics``, collapses it to one row per
    title by taking the first value of each attribute, then adds a surrogate
    ``title_key``, a ``source_system`` tag and a load timestamp.

    Returns:
        DataFrame with columns title_key, tconst, title_category, title_type,
        primary_title, original_title, is_adult, start_year, end_year,
        runtime_minutes, source_system, etl_load_timestamp.
    """
    title_basics = spark.read.table("LIVE.silver_imdb_title_basics")

    # (silver column, gold column) pairs; order defines the aggregate order.
    attribute_mapping = [
        ("title_category", "title_category"),
        ("titleType", "title_type"),
        ("primaryTitle", "primary_title"),
        ("originalTitle", "original_title"),
        ("isAdult", "is_adult"),
        ("startYear", "start_year"),
        ("endYear", "end_year"),
        ("runtimeMinutes", "runtime_minutes"),
    ]

    # Silver is exploded by genre, so fold the rows back to one per title.
    first_values = [
        first(silver_name).alias(gold_name)
        for silver_name, gold_name in attribute_mapping
    ]
    titles = title_basics.groupBy("tconst").agg(*first_values)

    # Final column order of the dimension.
    output_columns = (
        ["title_key", "tconst"]
        + [gold_name for _, gold_name in attribute_mapping]
        + ["source_system", "etl_load_timestamp"]
    )

    return (
        titles
        # Surrogate key: unique but not consecutive; values depend on
        # partitioning and may differ between refreshes.
        .withColumn("title_key", monotonically_increasing_id() + 1)
        # Metadata and audit columns
        .withColumn("source_system", lit("silver_imdb_title_basics"))
        .withColumn("etl_load_timestamp", current_timestamp())
        .select(*output_columns)
    )

## Load DIM_EPISODE

In [0]:
# =============================================================================
# GOLD LAYER: dim_episode
# =============================================================================
# Description: Episode dimension (outrigger from dim_title) containing TV episode hierarchy
# Source: silver_imdb_title_episode + dim_title (for key lookups)
# Grain: One row per unique episode (tconst)
# =============================================================================

from pyspark.sql.functions import (
    col, lit, current_timestamp, monotonically_increasing_id
)

@dlt.table(
    name="dim_episode",
    comment="Gold layer: Episode dimension (outrigger) - TV episode hierarchy with parent series linkage",
    table_properties={"quality": "gold", "layer": "gold", "domain": "imdb"},
)
@dlt.expect_or_drop("valid_tconst", "tconst IS NOT NULL")
@dlt.expect_or_drop("valid_parent_tconst", "parent_tconst IS NOT NULL")
def gold_dim_episode():
    """Build the episode dimension with links to ``dim_title``.

    Batch-reads ``LIVE.silver_imdb_title_episode`` and left-joins
    ``LIVE.dim_title`` twice: once on the episode's own ``tconst`` to obtain
    ``title_key`` and once on ``parentTconst`` to obtain ``parent_title_key``.
    Because both joins are left joins, either key is NULL when no matching
    title exists. Source columns are then renamed to snake_case and a
    surrogate ``episode_key``, a ``source_system`` tag and a load timestamp
    are added.

    Returns:
        DataFrame with columns episode_key, tconst, title_key, parent_tconst,
        parent_title_key, season_number, episode_number, source_system,
        etl_load_timestamp.
    """

    def _title_key_lookup(tconst_alias, key_alias):
        # Fresh read of dim_title exposing only (tconst, title_key) under
        # caller-chosen names so they cannot clash with episode columns.
        return spark.read.table("LIVE.dim_title").select(
            col("tconst").alias(tconst_alias),
            col("title_key").alias(key_alias),
        )

    episodes = spark.read.table("LIVE.silver_imdb_title_episode")

    # Resolve the episode's own title_key.
    own_title = _title_key_lookup("lookup_tconst", "lookup_title_key")
    episodes = (
        episodes
        .join(own_title, episodes["tconst"] == own_title["lookup_tconst"], "left")
        .withColumn("title_key", col("lookup_title_key"))
        .drop("lookup_tconst", "lookup_title_key")
    )

    # Resolve the parent series' title_key.
    parent_title = _title_key_lookup("parent_lookup_tconst", "parent_lookup_title_key")
    episodes = (
        episodes
        .join(
            parent_title,
            episodes["parentTconst"] == parent_title["parent_lookup_tconst"],
            "left",
        )
        .withColumn("parent_title_key", col("parent_lookup_title_key"))
        .drop("parent_lookup_tconst", "parent_lookup_title_key")
    )

    # camelCase silver names -> snake_case gold names.
    for silver_name, gold_name in (
        ("parentTconst", "parent_tconst"),
        ("seasonNumber", "season_number"),
        ("episodeNumber", "episode_number"),
    ):
        episodes = episodes.withColumnRenamed(silver_name, gold_name)

    return (
        episodes
        # Surrogate key: unique but not consecutive; values depend on
        # partitioning and may differ between refreshes.
        .withColumn("episode_key", monotonically_increasing_id() + 1)
        # Metadata and audit columns
        .withColumn("source_system", lit("silver_imdb_title_episode"))
        .withColumn("etl_load_timestamp", current_timestamp())
        .select(
            "episode_key",
            "tconst",
            "title_key",
            "parent_tconst",
            "parent_title_key",
            "season_number",
            "episode_number",
            "source_system",
            "etl_load_timestamp",
        )
    )