In [0]:
import dlt
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import (
    col, when, regexp_replace, trim, split, array_distinct,
    array_remove, size, concat_ws, current_timestamp,
    lit, expr, md5, coalesce, lower, row_number
)
from pyspark.sql.types import IntegerType

# ============================================================
# 1. CONFIG
# ============================================================

RAW_DATA_PATH = "/Volumes/workspace/imdb/imdb/title.akas.tsv"
LANGUAGE_CODES_PATH = "/Volumes/workspace/imdb/imdb/language-codes-iso.tsv"   # update path
REGION_CODES_PATH   = "/Volumes/workspace/imdb/imdb/countries_iso - all.tsv"     # update path

# ============================================================
# 2. BRONZE LAYER
# ============================================================

@dlt.table(
    name="title_akas_bronze",
    comment="Bronze table for IMDb title.akas – raw data plus audit columns"
)
def title_akas_bronze():
    df_raw = (
        spark.read
        .option("header", "true")
        .option("sep", "\t")
        .option("nullValue", "\\N")
        .csv(RAW_DATA_PATH)
    )

    # normalize empties to nulls
    df_raw = (
        df_raw
        .withColumn("titleId", when(trim(col("titleId")) == "", None).otherwise(col("titleId")))
        .withColumn("ordering", when(trim(col("ordering")) == "", None).otherwise(col("ordering")))
        .withColumn("title", when(trim(col("title")) == "", None).otherwise(col("title")))
        .withColumn("region", when(trim(col("region")) == "", None).otherwise(col("region")))
        .withColumn("language", when(trim(col("language")) == "", None).otherwise(col("language")))
        .withColumn("types", when(trim(col("types")) == "", None).otherwise(col("types")))
        .withColumn("attributes", when(trim(col("attributes")) == "", None).otherwise(col("attributes")))
        .withColumn("isOriginalTitle", when(trim(col("isOriginalTitle")) == "", None).otherwise(col("isOriginalTitle")))
    )

    df_bronze = (
        df_raw
        .withColumn("bronze_ingestion_timestamp", current_timestamp())
        .withColumn("bronze_ingestion_date", current_timestamp().cast("date"))
        .withColumn("bronze_source_system", lit("imdb"))
        .withColumn("bronze_source_file", lit(RAW_DATA_PATH))
        .withColumn(
            "bronze_record_hash",
            md5(
                concat_ws(
                    "|",
                    coalesce(col("titleId"), lit("")),
                    coalesce(col("ordering"), lit("")),
                    coalesce(col("title"), lit("")),
                    coalesce(col("region"), lit("")),
                    coalesce(col("language"), lit("")),
                    coalesce(col("types"), lit("")),
                    coalesce(col("attributes"), lit("")),
                    coalesce(col("isOriginalTitle"), lit(""))
                )
            )
        )
    )

    return df_bronze

# ============================================================
# 3. CLEANING HELPERS
# ============================================================

def validate_title_id(df: DataFrame) -> DataFrame:
    """Flag whether titleId is a valid IMDb title id (tt[0-9]+)."""
    return df.withColumn(
        "titleId_valid",
        when(col("titleId").isNotNull() & col("titleId").rlike("^tt[0-9]+$"), True).otherwise(False)
    )

def clean_ordering(df: DataFrame) -> DataFrame:
    """Parse ordering as positive integer; leave NULL if bad."""
    return df.withColumn(
        "ordering_clean",
        when(
            col("ordering").isNotNull() &
            col("ordering").rlike("^[0-9]+$") &
            (col("ordering").cast(IntegerType()) >= 1) &
            (col("ordering").cast(IntegerType()) <= 100000),
            col("ordering").cast(IntegerType())
        ).otherwise(None)
    )

def clean_title(df: DataFrame) -> DataFrame:
    """Normalize alternative title."""
    return df.withColumn(
        "title_clean",
        when(col("title").isNotNull(),
             trim(regexp_replace(col("title"), "\\s+", " ")))
    )

def clean_region_language(df: DataFrame) -> DataFrame:
    """Normalize region and language codes to lower-case; drop weird junk."""
    df = df.withColumn(
        "region_clean",
        when(col("region").isNotNull(),
             lower(trim(col("region"))))
    )

    df = df.withColumn(
        "language_clean",
        when(col("language").isNotNull(),
             lower(trim(col("language"))))
    )

    return df

def parse_types_attributes(df: DataFrame) -> DataFrame:
    """Split comma-separated types/attributes into arrays."""
    df = df.withColumn(
        "types_array",
        when(col("types").isNotNull(),
             array_distinct(array_remove(split(col("types"), ","), "")))
        .otherwise(expr("array()"))
    )

    df = df.withColumn(
        "attributes_array",
        when(col("attributes").isNotNull(),
             array_distinct(array_remove(split(col("attributes"), ","), "")))
        .otherwise(expr("array()"))
    )

    df = df.withColumn("types_count", size(col("types_array")))
    df = df.withColumn("attributes_count", size(col("attributes_array")))
    return df

def clean_is_original(df: DataFrame) -> DataFrame:
    """
    Convert isOriginalTitle from string to boolean:
      '1' -> True
      '0' or NULL -> False (but we still track validity separately)
    """
    df = df.withColumn(
        "is_original_flag",
        when(col("isOriginalTitle") == "1", True)
        .otherwise(False)
    )
    return df

# ============================================================
# 4. SILVER ALL (CLEANED + QUALITY FLAGS)
# ============================================================

@dlt.table(
    name="title_akas_silver_all",
    comment="Silver cleaned table for title.akas with quality scoring"
)
def title_akas_silver_all():

    df = dlt.read("title_akas_bronze")

    # Cleaning
    df = validate_title_id(df)
    df = clean_ordering(df)
    df = clean_title(df)
    df = clean_region_language(df)
    df = parse_types_attributes(df)
    df = clean_is_original(df)

    # ===========================
    # VALIDITY FLAGS
    # ===========================

    # Ordering:
    #   - NULL original is OK (unknown)
    #   - non-null must parse and be > 0 and within sane limit
    df = df.withColumn(
        "has_valid_ordering",
        when(col("ordering").isNull(), True)
        .when(col("ordering_clean").isNotNull(), True)
        .otherwise(False)
    )

    # Region:
    #   - NULL original is OK
    #   - non-null must have a cleaned value (simple a-z0-9/- like codes)
    df = df.withColumn(
        "has_valid_region",
        when(col("region").isNull(), True)
        .when(col("region_clean").isNotNull(), True)
        .otherwise(False)
    )

    # Language:
    df = df.withColumn(
        "has_valid_language",
        when(col("language").isNull(), True)
        .when(col("language_clean").isNotNull(), True)
        .otherwise(False)
    )

    # isOriginalTitle:
    #   allowed values: NULL, '0', '1'
    df = df.withColumn(
        "has_valid_is_original",
        when(col("isOriginalTitle").isNull(), True)
        .when(col("isOriginalTitle").isin("0", "1"), True)
        .otherwise(False)
    )

    # Completeness score (for information only)
    df = df.withColumn(
        "data_completeness_score",
        (
            when(col("titleId_valid"), 20).otherwise(0)
            + when(col("title_clean").isNotNull(), 20).otherwise(0)
            + when(col("ordering_clean").isNotNull(), 20).otherwise(0)
            + when(col("region_clean").isNotNull(), 20).otherwise(0)
            + when(col("language_clean").isNotNull(), 20).otherwise(0)
        )
    )

    df = df.withColumn(
        "quality_tier",
        when(col("data_completeness_score") >= 80, "HIGH")
        .when(col("data_completeness_score") >= 60, "MEDIUM")
        .when(col("data_completeness_score") >= 40, "LOW")
        .otherwise("POOR")
    )

    df = df.withColumn("silver_processing_timestamp", current_timestamp())
    df = df.withColumn("silver_processing_date", current_timestamp().cast("date"))
    df = df.withColumn("silver_version", lit("1.0"))

    # FINAL QUALITY DECISION – quarantine ONLY truly bad rows
    df = df.withColumn(
        "silver_quality_check",
        when(~col("titleId_valid"), "FAILED")
        .when(~col("has_valid_ordering"), "FAILED")
        .when(~col("has_valid_region"), "FAILED")
        .when(~col("has_valid_language"), "FAILED")
        .when(~col("has_valid_is_original"), "FAILED")
        .otherwise("PASSED")
    )

    # Deduplicate by (titleId, ordering), keep latest ingestion
    w = Window.partitionBy("titleId", "ordering").orderBy(col("bronze_ingestion_timestamp").desc())
    df = df.withColumn("rn", row_number().over(w)).filter(col("rn") == 1).drop("rn")

    return df

# ============================================================
# 5. SILVER CLEAN (NO NULLS, READY FOR GOLD)
# ============================================================

@dlt.table(
    name="title_akas_silver_clean",
    comment="High-quality Silver records for title.akas (with region & language names, no NULLs in key fields)"
)
def title_akas_silver_clean():
    # 1) Start from Silver_all and keep only PASSED rows
    df = dlt.read("title_akas_silver_all").filter(col("silver_quality_check") == "PASSED")

    # 2) Read language and region lookup TSVs
    # --- Language mapping (ISO codes) ---
    lang_ref_raw = (
        spark.read
        .option("header", "true")
        .option("sep", "\t")
        .csv(LANGUAGE_CODES_PATH)
    )

    language_ref = lang_ref_raw.select(
        lower(trim(col("alpha2"))).alias("ref_language_code"),
        col("English").alias("language_name")
    )

    # --- Region mapping (country codes) ---
    region_ref_raw = (
        spark.read
        .option("header", "true")
        .option("sep", "\t")
        .csv(REGION_CODES_PATH)
    )

    region_ref = region_ref_raw.select(
        lower(trim(col("alpha-2"))).alias("ref_region_code"),
        col("name").alias("region_name")
    )

    # 3) Join Silver data with reference data (left joins so we don't lose rows)
    df = (
        df
        .join(region_ref, df.region_clean == region_ref.ref_region_code, "left")
        .join(language_ref, df.language_clean == language_ref.ref_language_code, "left")
    )

    # 4) Select final columns (codes + full names + lineage)
    return df.select(
        # IDs / keys
        coalesce(col("titleId"), lit("UNKNOWN")).alias("title_imdb_id"),
        coalesce(col("ordering_clean"), lit(0)).alias("ordering"),

        # Titles
        coalesce(col("title_clean"), lit("Unknown")).alias("aka_title"),

        # Region: code from akas + full name from region TSV
        coalesce(col("region_clean"), lit("unknown")).alias("region_code"),
        coalesce(col("region_name"), lit("Unknown Region")).alias("region_name"),

        # Language: code from akas + full name from language TSV
        coalesce(col("language_clean"), lit("unknown")).alias("language_code"),
        coalesce(col("language_name"), lit("Unknown Language")).alias("language_name"),

        # Types / attributes as arrays
        coalesce(col("types_array"), expr("array()")).alias("types"),
        coalesce(col("attributes_array"), expr("array()")).alias("attributes"),
        coalesce(col("types_count"), lit(0)).alias("types_count"),
        coalesce(col("attributes_count"), lit(0)).alias("attributes_count"),

        # Original title flag
        coalesce(col("is_original_flag"), lit(False)).alias("is_original_title"),

        # Completeness / quality
        coalesce(col("data_completeness_score"), lit(0)).alias("data_completeness_score"),
        coalesce(col("quality_tier"), lit("UNKNOWN")).alias("quality_tier"),

        # Audit / lineage
        coalesce(col("silver_processing_timestamp"), current_timestamp()).alias("silver_processing_timestamp"),
        coalesce(col("silver_processing_date"), current_timestamp().cast("date")).alias("silver_processing_date"),
        coalesce(col("silver_version"), lit("1.0")).alias("silver_version"),

        coalesce(col("bronze_ingestion_timestamp"), current_timestamp()).alias("bronze_ingestion_timestamp"),
        coalesce(col("bronze_ingestion_date"), current_timestamp().cast("date")).alias("bronze_ingestion_date"),

        coalesce(col("bronze_source_system"), lit("imdb")).alias("bronze_source_system"),
        coalesce(col("bronze_source_file"), lit("UNKNOWN_FILE")).alias("bronze_source_file"),
        coalesce(col("bronze_record_hash"), lit("")).alias("bronze_record_hash")
    )

# ============================================================
# 6. QUARANTINE
# ============================================================

@dlt.table(
    name="title_akas_quarantine",
    comment="Records from title.akas that failed Silver quality checks (invalid IDs, codes, ordering)"
)
def title_akas_quarantine():
    return dlt.read("title_akas_silver_all").filter(col("silver_quality_check") == "FAILED")


In [0]:
%skip
import dlt
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import (
    col, when, regexp_replace, trim, split, array_distinct,
    array_remove, size, concat_ws, current_timestamp,
    lit, expr, md5, coalesce, lower, row_number, array_contains,
    arrays_overlap, array
)
from pyspark.sql.types import IntegerType

# ============================================================
# 1. CONFIG
# ============================================================

RAW_DATA_PATH = "/Volumes/workspace/damg7370/datastore/imdb/raw/title.akas.tsv"

# ============================================================
# 2. REFERENCE DATA - Based on Official IMDb and ISO Standards
# ============================================================

# Valid IMDb types for title.akas (from IMDb documentation)
# Source: https://developer.imdb.com/non-commercial-datasets/
VALID_TYPES = [
    "alternative",
    "dvd",
    "festival",
    "tv",
    "video",
    "working",
    "original",
    "imdbDisplay"
]

# Valid region codes (ISO 3166-1 alpha-2 + IMDb historical codes)
# Source: https://help.imdb.com/article/contribution/other-submission-guides/country-codes/G99K4LFRMSC37DCN
VALID_REGION_CODES = [
    # Standard ISO 3166-1 alpha-2 codes (lowercase)
    "af", "ax", "al", "dz", "as", "ad", "ao", "ai", "aq", "ag", "ar", "am", "aw", "au", "at", "az",
    "bs", "bh", "bd", "bb", "by", "be", "bz", "bj", "bm", "bt", "bo", "ba", "bw", "bv", "br", "io",
    "vg", "bn", "bg", "bf", "bi", "kh", "cm", "ca", "cv", "ky", "cf", "td", "cl", "cn", "cx", "cc",
    "co", "km", "cg", "cd", "ck", "cr", "ci", "hr", "cu", "cy", "cz", "dk", "dj", "dm", "do", "ec",
    "eg", "sv", "gq", "er", "ee", "et", "fk", "fo", "fj", "fi", "fr", "gf", "pf", "tf", "ga", "gm",
    "ge", "de", "gh", "gi", "gb", "gr", "gl", "gd", "gp", "gu", "gt", "gg", "gw", "gn", "gy", "ht",
    "hm", "hn", "hk", "hu", "is", "in", "id", "ir", "iq", "ie", "im", "il", "it", "jm", "jp", "je",
    "jo", "kz", "ke", "ki", "kp", "kr", "kw", "kg", "la", "lv", "lb", "ls", "lr", "ly", "li", "lt",
    "lu", "mo", "mg", "mw", "my", "mv", "ml", "mt", "mh", "mq", "mr", "mu", "yt", "mx", "fm", "md",
    "mc", "mn", "me", "ms", "ma", "mz", "mm", "na", "nr", "np", "an", "nl", "nc", "nz", "ni", "ne",
    "ng", "nu", "nf", "mk", "mp", "no", "om", "pk", "pw", "ps", "pa", "pg", "py", "pe", "ph", "pn",
    "pl", "pt", "pr", "qa", "re", "ro", "ru", "rw", "sh", "kn", "lc", "pm", "vc", "ws", "sm", "st",
    "sa", "sn", "rs", "sc", "sl", "sg", "sk", "si", "sb", "so", "za", "gs", "es", "lk", "sd", "sr",
    "sj", "sz", "se", "ch", "sy", "tw", "tj", "tz", "th", "tl", "tg", "tk", "to", "tt", "tn", "tr",
    "tm", "tc", "tv", "ug", "ua", "ae", "um", "vi", "us", "uy", "uz", "vu", "va", "ve", "vn", "wf",
    "eh", "ye", "zm", "zw",
    # Kosovo (special IMDb code)
    "xkv",
    # Historical country codes (from IMDb documentation)
    "cshh",  # Czechoslovakia
    "csxx",  # Serbia and Montenegro
    "ddde",  # German Democratic Republic (East Germany)
    "suhh",  # Soviet Union (USSR)
    "yucs",  # Yugoslavia
    # Additional codes that may appear in IMDb data
    "xww",   # Worldwide
    "xeu",   # Europe
    "xas",   # Asia
    "xna",   # North America
    "xsa",   # South America
    "xoc",   # Oceania
    "xaf",   # Africa
]

# Valid language codes (ISO 639-1 two-letter codes)
# Source: https://en.wikipedia.org/wiki/List_of_ISO_639_language_codes
VALID_LANGUAGE_CODES = [
    "ab", "aa", "af", "ak", "sq", "am", "ar", "an", "hy", "as", "av", "ae", "ay", "az",
    "bm", "ba", "eu", "be", "bn", "bi", "bs", "br", "bg", "my", "ca", "ch", "ce", "ny",
    "zh", "cu", "cv", "kw", "co", "cr", "hr", "cs", "da", "dv", "nl", "dz", "en", "eo",
    "et", "ee", "fo", "fj", "fi", "fr", "fy", "ff", "gd", "gl", "lg", "ka", "de", "el",
    "kl", "gn", "gu", "ht", "ha", "he", "hz", "hi", "ho", "hu", "is", "io", "ig", "id",
    "ia", "ie", "iu", "ik", "ga", "it", "ja", "jv", "kn", "kr", "ks", "kk", "km", "ki",
    "rw", "ky", "kv", "kg", "ko", "kj", "ku", "lo", "la", "lv", "li", "ln", "lt", "lu",
    "lb", "mk", "mg", "ms", "ml", "mt", "gv", "mi", "mr", "mh", "mn", "na", "nv", "nd",
    "nr", "ng", "ne", "no", "nb", "nn", "oc", "oj", "or", "om", "os", "pi", "ps", "fa",
    "pl", "pt", "pa", "qu", "ro", "rm", "rn", "ru", "se", "sm", "sg", "sa", "sc", "sr",
    "sn", "sd", "si", "sk", "sl", "so", "st", "es", "su", "sw", "ss", "sv", "tl", "ty",
    "tg", "ta", "tt", "te", "th", "bo", "ti", "to", "ts", "tn", "tr", "tk", "tw", "ug",
    "uk", "ur", "uz", "ve", "vi", "vo", "wa", "cy", "wo", "xh", "ii", "yi", "yo", "za", "zu",
    # Additional common codes found in IMDb data
    "cmn",  # Mandarin Chinese (ISO 639-3)
    "yue",  # Cantonese (ISO 639-3)
    "qbn",  # Bengali variant
    "qbo",  # Tibetan variant
    "qbp",  # Some IMDb-specific codes
]

# ============================================================
# 3. BRONZE LAYER
# ============================================================

@dlt.table(
    name="title_akas_bronze",
    comment="Bronze table for IMDb title.akas – raw data plus audit columns. Source: https://datasets.imdbws.com/"
)
def title_akas_bronze():
    """
    Ingests raw IMDb title.akas.tsv data with minimal transformations.
    
    Schema per IMDb documentation (https://developer.imdb.com/non-commercial-datasets/):
    - titleId: tconst (alphanumeric unique identifier, format: tt[0-9]+)
    - ordering: integer to uniquely identify rows for a given titleId
    - title: localized title
    - region: region code for this version (ISO 3166-1 alpha-2)
    - language: language code (ISO 639-1)
    - types: enumerated attributes (alternative, dvd, festival, tv, video, working, original, imdbDisplay)
    - attributes: additional descriptive terms (not enumerated)
    - isOriginalTitle: 0 or 1 boolean
    """
    df_raw = (
        spark.read
        .option("header", "true")
        .option("sep", "\t")
        .option("nullValue", "\\N")  # IMDb uses \N for NULL values
        .csv(RAW_DATA_PATH)
    )

    # Normalize empty strings to nulls (consistent with \N handling)
    df_raw = (
        df_raw
        .withColumn("titleId", when(trim(col("titleId")) == "", None).otherwise(col("titleId")))
        .withColumn("ordering", when(trim(col("ordering")) == "", None).otherwise(col("ordering")))
        .withColumn("title", when(trim(col("title")) == "", None).otherwise(col("title")))
        .withColumn("region", when(trim(col("region")) == "", None).otherwise(col("region")))
        .withColumn("language", when(trim(col("language")) == "", None).otherwise(col("language")))
        .withColumn("types", when(trim(col("types")) == "", None).otherwise(col("types")))
        .withColumn("attributes", when(trim(col("attributes")) == "", None).otherwise(col("attributes")))
        .withColumn("isOriginalTitle", when(trim(col("isOriginalTitle")) == "", None).otherwise(col("isOriginalTitle")))
    )

    # Add audit/lineage columns
    df_bronze = (
        df_raw
        .withColumn("bronze_ingestion_timestamp", current_timestamp())
        .withColumn("bronze_ingestion_date", current_timestamp().cast("date"))
        .withColumn("bronze_source_system", lit("imdb"))
        .withColumn("bronze_source_file", lit(RAW_DATA_PATH))
        .withColumn(
            "bronze_record_hash",
            md5(
                concat_ws(
                    "|",
                    coalesce(col("titleId"), lit("")),
                    coalesce(col("ordering"), lit("")),
                    coalesce(col("title"), lit("")),
                    coalesce(col("region"), lit("")),
                    coalesce(col("language"), lit("")),
                    coalesce(col("types"), lit("")),
                    coalesce(col("attributes"), lit("")),
                    coalesce(col("isOriginalTitle"), lit(""))
                )
            )
        )
    )

    return df_bronze

# ============================================================
# 4. CLEANING HELPERS
# ============================================================

def validate_title_id(df: DataFrame) -> DataFrame:
    """
    Validate titleId follows IMDb tconst format: tt followed by digits.
    Per IMDb docs: "a tconst, an alphanumeric unique identifier of the title"
    """
    return df.withColumn(
        "titleId_valid",
        when(
            col("titleId").isNotNull() & 
            col("titleId").rlike("^tt[0-9]+$"), 
            True
        ).otherwise(False)
    )

def clean_ordering(df: DataFrame) -> DataFrame:
    """
    Parse ordering as positive integer.
    Per IMDb docs: "a number to uniquely identify rows for a given titleId"
    Ordering starts from 1 and should be reasonable (cap at 100000 for sanity).
    """
    return df.withColumn(
        "ordering_clean",
        when(
            col("ordering").isNotNull() &
            col("ordering").rlike("^[0-9]+$") &
            (col("ordering").cast(IntegerType()) >= 1) &
            (col("ordering").cast(IntegerType()) <= 100000),
            col("ordering").cast(IntegerType())
        ).otherwise(None)
    )

def clean_title(df: DataFrame) -> DataFrame:
    """
    Normalize alternative title: trim whitespace and collapse multiple spaces.
    Per IMDb docs: "the localized title"
    """
    return df.withColumn(
        "title_clean",
        when(
            col("title").isNotNull(),
            trim(regexp_replace(col("title"), "\\s+", " "))
        ).otherwise(None)
    )

def clean_region(df: DataFrame) -> DataFrame:
    """
    Normalize and validate region codes.
    Per IMDb docs: "the region for this version of the title"
    Uses ISO 3166-1 alpha-2 codes plus IMDb historical codes.
    """
    # Create array of valid region codes for comparison
    valid_regions_array = array(*[lit(code) for code in VALID_REGION_CODES])
    
    df = df.withColumn(
        "region_clean",
        when(col("region").isNotNull(), lower(trim(col("region")))).otherwise(None)
    )
    
    # Validate against known codes
    df = df.withColumn(
        "region_is_valid_code",
        when(col("region_clean").isNull(), True)  # NULL is acceptable
        .when(array_contains(valid_regions_array, col("region_clean")), True)
        # Also accept 2-4 character alphanumeric codes (for flexibility with new/unknown codes)
        .when(col("region_clean").rlike("^[a-z]{2,4}$"), True)
        .otherwise(False)
    )
    
    return df

def clean_language(df: DataFrame) -> DataFrame:
    """
    Normalize and validate language codes.
    Per IMDb docs: "the language of the title"
    Uses ISO 639-1 (2-letter) and some ISO 639-3 (3-letter) codes.
    """
    # Create array of valid language codes for comparison
    valid_languages_array = array(*[lit(code) for code in VALID_LANGUAGE_CODES])
    
    df = df.withColumn(
        "language_clean",
        when(col("language").isNotNull(), lower(trim(col("language")))).otherwise(None)
    )
    
    # Validate against known codes
    df = df.withColumn(
        "language_is_valid_code",
        when(col("language_clean").isNull(), True)  # NULL is acceptable
        .when(array_contains(valid_languages_array, col("language_clean")), True)
        # Also accept 2-3 character alphabetic codes (for flexibility)
        .when(col("language_clean").rlike("^[a-z]{2,3}$"), True)
        .otherwise(False)
    )
    
    return df

def parse_types(df: DataFrame) -> DataFrame:
    """
    Parse and validate types field.
    Per IMDb docs: "Enumerated set of attributes for this alternative title. 
    One or more of: alternative, dvd, festival, tv, video, working, original, imdbDisplay"
    """
    # Create array of valid types for comparison
    valid_types_array = array(*[lit(t) for t in VALID_TYPES])
    
    # Split comma-separated types into array
    df = df.withColumn(
        "types_array",
        when(
            col("types").isNotNull(),
            array_distinct(array_remove(split(col("types"), ","), ""))
        ).otherwise(expr("array()"))
    )
    
    df = df.withColumn("types_count", size(col("types_array")))
    
    # Check if all types are valid enumerated values
    df = df.withColumn(
        "types_all_valid",
        when(
            col("types").isNull() | (size(col("types_array")) == 0), 
            True
        ).otherwise(
            # All elements must be in valid types list
            size(array_distinct(
                expr(f"filter(types_array, x -> array_contains(array{tuple(VALID_TYPES)}, x))")
            )) == size(col("types_array"))
        )
    )
    
    return df

def parse_attributes(df: DataFrame) -> DataFrame:
    """
    Parse attributes field.
    Per IMDb docs: "Additional terms to describe this alternative title, not enumerated"
    Since not enumerated, we just clean and split without validation.
    """
    df = df.withColumn(
        "attributes_array",
        when(
            col("attributes").isNotNull(),
            array_distinct(array_remove(split(col("attributes"), ","), ""))
        ).otherwise(expr("array()"))
    )
    
    df = df.withColumn("attributes_count", size(col("attributes_array")))
    
    return df

def clean_is_original(df: DataFrame) -> DataFrame:
    """
    Convert isOriginalTitle from string to boolean.
    Per IMDb docs: "0: not original title; 1: original title"
    """
    df = df.withColumn(
        "is_original_flag",
        when(col("isOriginalTitle") == "1", True)
        .when(col("isOriginalTitle") == "0", False)
        .otherwise(None)  # NULL if invalid value
    )
    
    # Validate that value is 0, 1, or NULL
    df = df.withColumn(
        "is_original_valid",
        when(col("isOriginalTitle").isNull(), True)
        .when(col("isOriginalTitle").isin("0", "1"), True)
        .otherwise(False)
    )
    
    return df

# ============================================================
# 5. SILVER ALL (CLEANED + QUALITY FLAGS)
# ============================================================

@dlt.table(
    name="title_akas_silver_all",
    comment="Silver cleaned table for title.akas with comprehensive quality scoring based on IMDb standards"
)
def title_akas_silver_all():
    """
    Apply all cleaning transformations and generate quality metrics.
    Records are flagged but not filtered - quarantine table handles bad records.
    """
    df = dlt.read("title_akas_bronze")

    # Apply all cleaning functions
    df = validate_title_id(df)
    df = clean_ordering(df)
    df = clean_title(df)
    df = clean_region(df)
    df = clean_language(df)
    df = parse_types(df)
    df = parse_attributes(df)
    df = clean_is_original(df)

    # ===========================
    # VALIDITY FLAGS
    # ===========================

    # Ordering validation: NULL is OK, non-null must parse successfully
    df = df.withColumn(
        "has_valid_ordering",
        when(col("ordering").isNull(), True)
        .when(col("ordering_clean").isNotNull(), True)
        .otherwise(False)
    )

    # Region validation: NULL is OK, non-null must be valid code
    df = df.withColumn(
        "has_valid_region",
        when(col("region").isNull(), True)
        .when(col("region_is_valid_code"), True)
        .otherwise(False)
    )

    # Language validation: NULL is OK, non-null must be valid code
    df = df.withColumn(
        "has_valid_language",
        when(col("language").isNull(), True)
        .when(col("language_is_valid_code"), True)
        .otherwise(False)
    )

    # Types validation: NULL is OK, non-null must have all valid enumerated values
    df = df.withColumn(
        "has_valid_types",
        when(col("types").isNull(), True)
        .when(col("types_all_valid"), True)
        .otherwise(False)
    )

    # isOriginalTitle validation
    df = df.withColumn(
        "has_valid_is_original",
        col("is_original_valid")
    )

    # ===========================
    # COMPLETENESS SCORE
    # ===========================
    # Score based on presence of key fields (max 100)
    df = df.withColumn(
        "data_completeness_score",
        (
            when(col("titleId_valid"), lit(25)).otherwise(lit(0))  # titleId is most important
            + when(col("title_clean").isNotNull(), lit(25)).otherwise(lit(0))  # title is essential
            + when(col("ordering_clean").isNotNull(), lit(15)).otherwise(lit(0))
            + when(col("region_clean").isNotNull(), lit(15)).otherwise(lit(0))
            + when(col("language_clean").isNotNull(), lit(10)).otherwise(lit(0))
            + when(size(col("types_array")) > 0, lit(5)).otherwise(lit(0))
            + when(col("is_original_flag").isNotNull(), lit(5)).otherwise(lit(0))
        )
    )

    df = df.withColumn(
        "quality_tier",
        when(col("data_completeness_score") >= 80, "HIGH")
        .when(col("data_completeness_score") >= 60, "MEDIUM")
        .when(col("data_completeness_score") >= 40, "LOW")
        .otherwise("POOR")
    )

    # ===========================
    # PROCESSING METADATA
    # ===========================
    df = df.withColumn("silver_processing_timestamp", current_timestamp())
    df = df.withColumn("silver_processing_date", current_timestamp().cast("date"))
    df = df.withColumn("silver_version", lit("2.0"))  # Updated version with proper validation

    # ===========================
    # QUALITY CHECK DECISION
    # ===========================
    # Only truly invalid records go to quarantine
    df = df.withColumn(
        "silver_quality_check",
        when(~col("titleId_valid"), "FAILED")  # Invalid titleId is critical
        .when(~col("has_valid_ordering"), "FAILED")
        .when(~col("has_valid_region"), "FAILED")
        .when(~col("has_valid_language"), "FAILED")
        .when(~col("has_valid_types"), "FAILED")
        .when(~col("has_valid_is_original"), "FAILED")
        .otherwise("PASSED")
    )

    # Add failure reason for debugging
    df = df.withColumn(
        "quality_failure_reason",
        when(~col("titleId_valid"), "Invalid titleId format (expected tt[0-9]+)")
        .when(~col("has_valid_ordering"), "Invalid ordering value")
        .when(~col("has_valid_region"), "Invalid region code")
        .when(~col("has_valid_language"), "Invalid language code")
        .when(~col("has_valid_types"), "Invalid type value (not in enumerated list)")
        .when(~col("has_valid_is_original"), "Invalid isOriginalTitle (expected 0 or 1)")
        .otherwise(None)
    )

    # Deduplicate by (titleId, ordering), keep latest ingestion
    w = Window.partitionBy("titleId", "ordering").orderBy(col("bronze_ingestion_timestamp").desc())
    df = df.withColumn("rn", row_number().over(w)).filter(col("rn") == 1).drop("rn")

    return df

# ============================================================
# 6. SILVER CLEAN (NO NULLS, READY FOR GOLD)
# ============================================================

@dlt.table(
    name="title_akas_silver_clean",
    comment="High-quality Silver records for title.akas with validated fields ready for Gold layer"
)
def title_akas_silver_clean():
    """
    Filter to only PASSED records and provide clean, standardized output schema.
    All NULL values replaced with sensible defaults.
    """
    df = dlt.read("title_akas_silver_all").filter(col("silver_quality_check") == "PASSED")

    return df.select(
        # Primary keys / identifiers
        col("titleId").alias("title_imdb_id"),
        coalesce(col("ordering_clean"), lit(0)).alias("ordering"),

        # Title information
        coalesce(col("title_clean"), lit("Unknown")).alias("aka_title"),

        # Region / language (validated codes)
        coalesce(col("region_clean"), lit("unknown")).alias("region_code"),
        coalesce(col("language_clean"), lit("unknown")).alias("language_code"),

        # Types / attributes as arrays
        col("types_array").alias("types"),
        col("attributes_array").alias("attributes"),
        col("types_count"),
        col("attributes_count"),

        # Original title flag
        coalesce(col("is_original_flag"), lit(False)).alias("is_original_title"),

        # Quality metrics
        col("data_completeness_score"),
        col("quality_tier"),

        # Silver layer audit
        col("silver_processing_timestamp"),
        col("silver_processing_date"),
        col("silver_version"),

        # Bronze layer lineage
        col("bronze_ingestion_timestamp"),
        col("bronze_ingestion_date"),
        col("bronze_source_system"),
        col("bronze_source_file"),
        col("bronze_record_hash")
    )

# ============================================================
# 7. QUARANTINE
# ============================================================

@dlt.table(
    name="title_akas_quarantine",
    comment="Records from title.akas that failed Silver quality checks with failure reasons"
)
def title_akas_quarantine():
    """
    Capture all failed records with diagnostic information for investigation.
    """
    return (
        dlt.read("title_akas_silver_all")
        .filter(col("silver_quality_check") == "FAILED")
        .select(
            # Original values for investigation
            col("titleId"),
            col("ordering"),
            col("title"),
            col("region"),
            col("language"),
            col("types"),
            col("attributes"),
            col("isOriginalTitle"),
            
            # Validation results
            col("titleId_valid"),
            col("has_valid_ordering"),
            col("has_valid_region"),
            col("has_valid_language"),
            col("has_valid_types"),
            col("has_valid_is_original"),
            
            # Failure information
            col("silver_quality_check"),
            col("quality_failure_reason"),
            
            # Audit columns
            col("bronze_ingestion_timestamp"),
            col("bronze_source_file"),
            col("bronze_record_hash"),
            col("silver_processing_timestamp")
        )
    )

# ============================================================
# 8. DATA QUALITY METRICS (OPTIONAL AGGREGATION TABLE)
# ============================================================

@dlt.table(
    name="title_akas_quality_metrics",
    comment="Aggregated data quality metrics for title.akas pipeline monitoring"
)
def title_akas_quality_metrics():
    """
    Aggregate quality metrics for pipeline monitoring and alerting.
    """
    df = dlt.read("title_akas_silver_all")
    
    return df.groupBy("silver_processing_date").agg(
        expr("count(*) as total_records"),
        expr("sum(case when silver_quality_check = 'PASSED' then 1 else 0 end) as passed_records"),
        expr("sum(case when silver_quality_check = 'FAILED' then 1 else 0 end) as failed_records"),
        expr("round(sum(case when silver_quality_check = 'PASSED' then 1 else 0 end) * 100.0 / count(*), 2) as pass_rate_pct"),
        
        # Breakdown by validation type
        expr("sum(case when NOT titleId_valid then 1 else 0 end) as invalid_title_id_count"),
        expr("sum(case when NOT has_valid_ordering then 1 else 0 end) as invalid_ordering_count"),
        expr("sum(case when NOT has_valid_region then 1 else 0 end) as invalid_region_count"),
        expr("sum(case when NOT has_valid_language then 1 else 0 end) as invalid_language_count"),
        expr("sum(case when NOT has_valid_types then 1 else 0 end) as invalid_types_count"),
        expr("sum(case when NOT has_valid_is_original then 1 else 0 end) as invalid_is_original_count"),
        
        # Quality tier distribution
        expr("sum(case when quality_tier = 'HIGH' then 1 else 0 end) as high_quality_count"),
        expr("sum(case when quality_tier = 'MEDIUM' then 1 else 0 end) as medium_quality_count"),
        expr("sum(case when quality_tier = 'LOW' then 1 else 0 end) as low_quality_count"),
        expr("sum(case when quality_tier = 'POOR' then 1 else 0 end) as poor_quality_count"),
        
        # Average completeness
        expr("round(avg(data_completeness_score), 2) as avg_completeness_score")
    )