# In [0]:
import dlt
import json
from pyspark.sql.functions import col, regexp_replace, from_json, udf, lower, initcap, upper, split, lit, concat, trim, when, isnull, expr, get, size, concat_ws, slice
from pyspark.sql.types import ArrayType, StringType, DoubleType, BooleanType, IntegerType, DateType

# ——————————————————————————————
#  COMMON SETUP
# ——————————————————————————————
# Deployment environment (e.g. prod / dev), read from the pipeline configuration.
env = spark.conf.get("pipeline.env")
catalog = "principal_lab_db"
bronze_schema, silver_schema = f"{env}_bronze", f"{env}_silver"

# Make the silver schema of the catalog the current namespace.
spark.sql(f"USE CATALOG {catalog}")
spark.sql(f"USE SCHEMA {silver_schema}")

# Mapping of SCD type labels to numeric codes; None and "" map to 0.
scd_type_map = {
    "SCD1": 1,
    "1": 1,
    "SCD2": 2,
    "2": 2,
    None: 0,
    "": 0,
}

# ——————————————————————————————
#  COMMON FUNCTIONS
# ——————————————————————————————
def get_table_expectations(table_name:str,catalog:str,enviroment:str) -> dict:
    """
    Return dlt expectations values for desired table.

    Reads ``{catalog}.config_{enviroment}.expectations_setup`` and keeps the
    rows whose ``table_name`` column equals ``table_name``.

    Args:
        table_name:str table name
        catalog:str catalog where is stored expectation setup table
        enviroment:str use in schema name for setup table
    Returns:
        :dict mapping expectation_name -> expectation_value; empty when no
        row matches
    """
    setup = spark.read.table(f"{catalog}.config_{enviroment}.expectations_setup")
    # Compare through a Column instead of pasting table_name into a SQL
    # string, so a quote inside the name cannot break or alter the predicate.
    matching = (
        setup
        .filter(setup["table_name"] == table_name)
        .select("expectation_name", "expectation_value")
    )
    return {row["expectation_name"]: row["expectation_value"] for row in matching.collect()}

# ---------------------------------------------policies-------------------------------------------------------------
# —————————————————————————————————————————————————————————————————————————————————
#  1) NAČTENÍ METADATA PRO POLICIES
# —————————————————————————————————————————————————————————————————————————————————
# Look up the dim_policies entry (business keys, SCD type, description).
config = (
    spark.table(f"{catalog}.config_{env}.table_lookup")
         .filter(col("table_name") == "dim_policies")
         .select("keys", "scd_type", "description")
         .first()
)
# first() returns None when no row matches; fail with an explicit message
# instead of a TypeError on the subscripts below.
if config is None:
    raise ValueError(f"Configuration for 'dim_policies' not found in table {catalog}.config_{env}.table_lookup")

keys_raw       = config["keys"]
scd_raw        = config["scd_type"]
description    = config["description"]

# Parse the business keys: a string value is decoded as JSON, anything else is used as-is.
business_keys = json.loads(keys_raw) if isinstance(keys_raw, str) else keys_raw
scd_type       = scd_type_map.get(str(scd_raw).upper(), 0)

# —————————————————————————————————————————————————————————————————————————————————
#  2) PRAVIDLA PRO VYMAZÁNÍ ŠPATNÝCH ZÁZNAMŮ
# —————————————————————————————————————————————————————————————————————————————————
# Data-quality rules for policies: expectation name -> SQL condition.
expectation_rules = {
    "policies": {
        "expectations": [{
            "valid_policy_id":   "policy_id IS NOT NULL",
            "valid_agent_id":    "agent_id  IS NOT NULL",
            "valid_customer_id": "customer_id IS NOT NULL",
            "valid_product_id":  "product_id IS NOT NULL",
            "positive_premium":  "premium > 0",
            "valid_snapshot":    "snapshot_date IS NOT NULL"
        }]
    }
}
pol_rules = expectation_rules["policies"]["expectations"][0]
# Quarantine flag: TRUE unless every rule evaluates to TRUE.
# - Each rule is parenthesised so a rule containing OR cannot change the
#   meaning of the AND chain.
# - coalesce(..., false) turns a NULL result (e.g. "premium > 0" on a NULL
#   premium) into "not passed". A plain NOT(...) would stay NULL, and a NULL
#   flag matches neither "is_quarantined = false" nor "is_quarantined = true",
#   so the row would vanish from both downstream tables.
pol_expr  = "NOT coalesce({0}, false)".format(
    " AND ".join(f"({rule})" for rule in pol_rules.values())
)

# —————————————————————————————————————————————————————————————————————————————————
#  3) CLEANING TABLE WITH QUARANTINE FLAG
# —————————————————————————————————————————————————————————————————————————————————
@dlt.table(
    name="policies_clean_quarantine_rules",
    comment="apply expectations for policies",
    partition_cols=["is_quarantined"]
)
@dlt.expect_all(pol_rules)
def policies_clean_quarantine():
    """Clean the policies bronze stream and flag each row with is_quarantined.

    Identifier columns are trimmed, premium is cast to double and
    snapshot_date to date. The flag is the result of ``pol_expr``; technical
    ingestion columns are dropped at the end.
    """
    policies = spark.readStream.table(f"{catalog}.{bronze_schema}.policies_bronze")

    # Strip surrounding whitespace from every identifier column.
    for id_column in ("policy_id", "agent_id", "customer_id", "product_id"):
        policies = policies.withColumn(id_column, trim(col(id_column)))

    typed = (
        policies
        .withColumn("premium", col("premium").cast(DoubleType()))
        .withColumn("coverages", col("coverages"))  # kept as-is
        .withColumn("snapshot_date", col("snapshot_date").cast(DateType()))
    )

    # The flag is computed on the cleaned/typed values.
    flagged = typed.withColumn("is_quarantined", expr(pol_expr))
    return flagged.drop("_rescued_data", "source_file", "ingestion_ts")


# —————————————————————————————————————————————————————————————————————————————————
#  4) ŠPATNÉ X DOBRÉ ZÁZNAMY
# —————————————————————————————————————————————————————————————————————————————————
@dlt.table(
    comment="policies good (passed expectations)",
    name="policies_clean_good_records",
)
def policies_good():
    """Rows of policies_clean_quarantine_rules whose flag is false, without the flag column."""
    flagged = dlt.read_stream("policies_clean_quarantine_rules")
    passed = flagged.where("is_quarantined = false")
    return passed.drop("is_quarantined")

@dlt.table(
    comment="policies bad (failed expectations)",
    name="policies_clean_bad_records",
)
def policies_bad():
    """Rows of policies_clean_quarantine_rules whose flag is true, without the flag column."""
    flagged = dlt.read_stream("policies_clean_quarantine_rules")
    failed = flagged.where("is_quarantined = true")
    return failed.drop("is_quarantined")

# —————————————————————————————————————————————————————————————————————————————————
#  5) SCD2 DIMENZE
# —————————————————————————————————————————————————————————————————————————————————
# Only SCD type 2 is implemented for dim_policies; anything else aborts here.
if scd_type != 2:
    raise ValueError(f"Nepodporovaný SCD typ: {scd_raw}")

# Target streaming table for the dimension.
dlt.create_streaming_table(
    name="dim_policies",
    comment=description,
    table_properties={"quality": "silver"},
)

# Feed the good records into the dimension as SCD type 2, ordered by
# snapshot_date; snapshot_date is excluded from history tracking.
dlt.apply_changes(
    target="dim_policies",
    source="policies_clean_good_records",
    keys=business_keys,
    sequence_by=col("snapshot_date"),
    ignore_null_updates=False,
    stored_as_scd_type="2",
    track_history_except_column_list=["snapshot_date"],
)


# --------------------------------------------- customer -------------------------------------------------------------
# —————————————————————————————————————————————————————————————————————————————————
#  1) NACTENI METADATA PRO CUSTOMERS
# —————————————————————————————————————————————————————————————————————————————————
# Look up the dim_customers entry (business keys, SCD type, description).
config_customers = spark.table(f"{catalog}.config_{env}.table_lookup") \
    .filter(col("table_name") == "dim_customers") \
    .select("keys", "scd_type", "description") \
    .first()

# first() returns None when no row matches; fail with an explicit message
# instead of a TypeError on the subscripts below.
if config_customers is None:
    raise ValueError(f"Configuration for 'dim_customers' not found in table {catalog}.config_{env}.table_lookup")

keys_raw_customers = config_customers["keys"]
scd_type_raw_customers = config_customers["scd_type"]
description_customers = config_customers["description"]

# A string value is decoded as JSON, anything else is used as-is.
business_keys_customer = json.loads(keys_raw_customers) if isinstance(keys_raw_customers, str) else keys_raw_customers
scd_type_customers = scd_type_map.get(str(scd_type_raw_customers).upper(), 0)

# —————————————————————————————————————————————————————————————————————————————————
#  2) DEFINICE FUNKCI PRO CISTENI DAT PRO CUSTOMERS
# —————————————————————————————————————————————————————————————————————————————————
def clean_last_name(value:str) -> str:
    """
    Return the last word of a last-name value that is not an academic title.

    Titles ('md', 'phd', 'dds', matched case-insensitively) are skipped, so
    'Jimmie Smith Phd' yields 'Smith'. The result is capitalized.
    Args:
        value:str raw value of row in desired column
    Return:
        :str cleaned value, or None when the input is None or no word remains
    """
    if value is None:
        return None
    titles = ('md', 'phd', 'dds')
    # Walk from the end: the first non-title word is the one we want.
    for word in reversed(value.split()):
        lowered = word.lower()
        if lowered not in titles:
            return lowered.capitalize()
    return None


def clean_first_name(value:str) -> str:
    """
    Return the first word of a first-name value that is not a courtesy prefix.

    Prefixes ('mr', 'mrs', 'mr.', 'mrs.', matched case-insensitively) are
    skipped; the result is capitalized.
    Args:
        value:str raw value of row in desired column
    Return:
        :str cleaned value, or None when the input is None or only prefixes remain
    """
    if value is None:
        return None
    prefixes = ('mr', 'mrs', 'mr.', 'mrs.')  # extend here when new prefixes appear
    candidates = (
        word.lower().capitalize()
        for word in value.split()
        if word.lower() not in prefixes
    )
    return next(candidates, None)

# Register the cleaning functions as UDFs so they can be used from the PySpark API.
clean_last_name_udf = udf(f=clean_last_name, returnType=StringType())
clean_first_name_udf = udf(f=clean_first_name, returnType=StringType())
# —————————————————————————————————————————————————————————————————————————————————
#  3) TABULKA PRO URCENI DOBRE/SPATNE ZAZNAMY
# —————————————————————————————————————————————————————————————————————————————————
# Fetch the data-quality expectations configured for customers.
customer_expectations_rules = get_table_expectations("customers",catalog,env)
# Quarantine flag: TRUE unless every rule evaluates to TRUE.
# - Rules come from a configuration table, so each one is parenthesised to
#   keep an embedded OR from changing the AND chain.
# - coalesce(..., false) treats a NULL result as "not passed"; a NULL flag
#   would match neither the "is_quarantined=false" nor the
#   "is_quarantined=true" filter downstream.
# - With no configured rules the condition falls back to "true" (nothing is
#   quarantined) instead of producing the invalid SQL "NOT()".
customer_expectations_expr = "NOT coalesce({0}, false)".format(
    " AND ".join(f"({rule})" for rule in customer_expectations_rules.values()) or "true"
)

#Build the table on which the data-quality rules are applied.
#Whether a record passes data quality is recorded in the is_quarantined column.
@dlt.table(
    name="customers_clean_quarantine_rules",
    comment="apply expectations rules for customers",
    partition_cols =["is_quarantined"]
)
@dlt.expect_all(customer_expectations_rules)
def customer_data_clean_quarantine():
    """Clean the customers bronze stream and flag each row with is_quarantined.

    Names go through the cleaning UDFs, e-mail is trimmed and lower-cased, the
    address is re-assembled from its first three comma-separated parts, and
    the fields of ``preferences`` are lifted into top-level columns.
    """
    # First three comma-separated parts of the address.
    first_part, second_part, third_part = (
        get("address_splt", position) for position in range(3)
    )
    rebuilt_address = concat(
        initcap(first_part), lit(','), initcap(second_part), lit(','), upper(third_part)
    )

    customers = dlt.readStream(f"{catalog}.{bronze_schema}.customers_bronze")

    basics = (
        customers
        .withColumn("customer_id", trim("customer_id"))
        .withColumn("first_name", clean_first_name_udf(col("first_name")))
        .withColumn("last_name", clean_last_name_udf(col("last_name")))
        .withColumn("email", lower(trim(col("email"))))
        .withColumn("address_splt", split(trim(col("address")), ','))
        .withColumn("address", rebuilt_address)
        .withColumn("income", col("income").cast(IntegerType()))
    )

    with_preferences = (
        basics
        # Replace backslash-escaped quotes with plain quotes before parsing the JSON array.
        .withColumn("contact_methods_raw", regexp_replace(col("preferences.contact_methods"), r'\\"', '"'))
        .withColumn("contact_methods", from_json(col("contact_methods_raw"), ArrayType(StringType())))
        .withColumn("preferred_language", trim(col("preferences.preferred_language")))
        .withColumn("newsletter_opt_in", col("preferences.newsletter_opt_in").cast(BooleanType()))
    )

    flagged = with_preferences.withColumn("is_quarantined", expr(customer_expectations_expr))
    # Remove helper columns and technical ingestion columns.
    return flagged.drop(
        "address_splt", "contact_methods_raw", "preferences",
        "_rescued_data", "source_file", "ingestion_ts",
    )
# —————————————————————————————————————————————————————————————————————————————————
#  4) SPATNE X DOBRE ZAZNAMY
# —————————————————————————————————————————————————————————————————————————————————
#Table holding the records that satisfy the data-quality rules.
@dlt.table(name='customers_clean_good_records',comment='customers cleanded and validate data')
def customer_clean():
    """Rows of customers_clean_quarantine_rules whose flag is false, without the flag column."""
    flagged = dlt.readStream('customers_clean_quarantine_rules')
    passed = flagged.where("is_quarantined=false")
    return passed.drop("is_quarantined")

#Table holding the records that do not satisfy the data-quality rules.
@dlt.table(name='customers_clean_bad_records',comment='customers cleaned and bad data')
def customer_clean():
    """Rows of customers_clean_quarantine_rules whose flag is true, without the flag column."""
    flagged = dlt.readStream('customers_clean_quarantine_rules')
    failed = flagged.where("is_quarantined=true")
    return failed.drop("is_quarantined")
# —————————————————————————————————————————————————————————————————————————————————
#  5) SCD2 DIMENZE 
# —————————————————————————————————————————————————————————————————————————————————
# Only SCD type 2 is implemented for dim_customers; anything else aborts here.
if scd_type_customers != 2:
    raise ValueError(f"Nepodporovaný nebo chybějící SCD typ: {scd_type_raw_customers}")

# Target streaming table for the dimension.
dlt.create_streaming_table(
    name="dim_customers",
    comment=description_customers,
    table_properties={"quality": "silver"},
)

# Feed the good records into the dimension as SCD type 2, ordered by
# snapshot_date; snapshot_date is excluded from history tracking.
dlt.apply_changes(
    target="dim_customers",
    source="customers_clean_good_records",
    keys=business_keys_customer,
    sequence_by=col("snapshot_date"),
    ignore_null_updates=False,
    stored_as_scd_type="2",
    track_history_except_column_list=['snapshot_date'],
)

# --------------------------------------------- agents -------------------------------------------------------------
# —————————————————————————————————————————————————————————————————————————————————
#  1) NACTENI METADATA PRO AGENTS
# —————————————————————————————————————————————————————————————————————————————————
# Look up the dim_agents entry (business keys, SCD type, description).
config_agents = spark.table(f"{catalog}.config_{env}.table_lookup") \
    .filter(col("table_name") == "dim_agents") \
    .select("keys", "scd_type", "description") \
    .first()

# first() returns None when no row matches; fail with an explicit message
# instead of a TypeError on the subscripts below.
if config_agents is None:
    raise ValueError(f"Configuration for 'dim_agents' not found in table {catalog}.config_{env}.table_lookup")

keys_raw_agents = config_agents["keys"]
scd_type_raw_agents = config_agents["scd_type"]
description_agents = config_agents["description"]

# A string value is decoded as JSON, anything else is used as-is.
business_keys_agents = json.loads(keys_raw_agents) if isinstance(keys_raw_agents, str) else keys_raw_agents
scd_type_agents = scd_type_map.get(str(scd_type_raw_agents).upper(), 0)

# —————————————————————————————————————————————————————————————————————————————————
#  2) DEFINICE FUNKCI PRO CISTENI DAT PRO AGENTS
# —————————————————————————————————————————————————————————————————————————————————
def clean_full_name(value:str) -> str:
    """
    Remove prefixes/suffixes from a full name and capitalize the remaining words.

    Words equal (case-insensitively) to 'mr.', 'mrs.', 'mr', 'mrs', 'dds',
    'phd' or 'md' are dropped. The remaining words are joined by single spaces.
    Args:
        value:str raw full name
    Return:
        :str cleaned name ('' when no word remains), or None for None input
    """
    if value is None:
        return None
    forbidden_prefixes = ('mr.', 'mrs.', 'mr', 'mrs', 'dds', 'phd', 'md')
    # split() without a separator collapses runs of whitespace; splitting on a
    # single " " produced empty tokens for repeated spaces, which left stray
    # leading/double spaces in the result.
    kept = [word.capitalize() for word in value.lower().split() if word not in forbidden_prefixes]
    return ' '.join(kept)

# Register the cleaning function as a UDF so it can be used from the PySpark API.
clean_full_name_udf = udf(f=clean_full_name, returnType=StringType())

# —————————————————————————————————————————————————————————————————————————————————
#  3) TABULKA PRO URCENI DOBRE/SPATNE ZAZNAMY - DATA KVALITA
# —————————————————————————————————————————————————————————————————————————————————
# Fetch the data-quality expectations configured for agents.
agents_expectations_rules = get_table_expectations("agents",catalog,env)
# Quarantine flag: TRUE unless every rule evaluates to TRUE.
# - Rules come from a configuration table, so each one is parenthesised to
#   keep an embedded OR from changing the AND chain.
# - coalesce(..., false) treats a NULL result as "not passed"; a NULL flag
#   would match neither the "is_quarantined=false" nor the
#   "is_quarantined=true" filter downstream.
# - With no configured rules the condition falls back to "true" (nothing is
#   quarantined) instead of producing the invalid SQL "NOT()".
agents_expectations_expr = "NOT coalesce({0}, false)".format(
    " AND ".join(f"({rule})" for rule in agents_expectations_rules.values()) or "true"
)

#Build the table on which the data-quality rules are applied.
#Whether a record passes data quality is recorded in the is_quarantined column.
@dlt.table(
    name="agents_clean_quarantine_rules",
    comment="apply expectations rules for agents",
    partition_cols =["is_quarantined"]
)
@dlt.expect_all(agents_expectations_rules)
def agents_data_clean_quarantine():
    """Clean the agents bronze stream and flag each row with is_quarantined.

    The cleaned ``name`` is split on spaces into first_name (all words but the
    last) and last_name (the last word). The JSON arrays in
    ``metadata.languages`` and ``metadata.certifications`` are parsed into
    string arrays.
    """
    string_array = ArrayType(StringType())
    name_parts = col("name_splt")
    part_count = size(name_parts)

    # Both name columns are NULL when the split is NULL or has a single word.
    first_name_expr = (
        when(isnull(name_parts), None)
        .when(part_count == 1, None)
        .otherwise(concat_ws(" ", slice(name_parts, 1, part_count - 1)))
    )
    last_name_expr = (
        when(isnull(name_parts), None)
        .when(part_count == 1, None)
        .otherwise(name_parts[part_count - 1])
    )

    agents = dlt.readStream(f"{catalog}.{bronze_schema}.agents_bronze")

    with_names = (
        agents
        .withColumn("agent_id", trim(col("agent_id")))
        .withColumn("name", clean_full_name_udf(col("name")))
        .withColumn("name_splt", split(col("name"), " "))
        .withColumn("first_name", first_name_expr)
        .withColumn("last_name", last_name_expr)
    )

    normalized = (
        with_names
        .withColumn("region", initcap(trim(col("region"))))
        .withColumn("email", lower(trim(col("email"))))
        .withColumn("start_date", col("start_date").cast(DateType()))
    )

    with_metadata = (
        normalized
        # Replace backslash-escaped quotes with plain quotes before parsing the JSON arrays.
        .withColumn("languages_raw", regexp_replace(col("metadata.languages"), r'\\"', '"'))
        .withColumn("languages", from_json(col("languages_raw"), string_array))
        .withColumn("certifications_raw", regexp_replace(col("metadata.certifications"), r'\\"', '"'))
        .withColumn("certifications", from_json(col("certifications_raw"), string_array))
    )

    flagged = with_metadata.withColumn("is_quarantined", expr(agents_expectations_expr))
    # Remove the source name, helper columns and technical ingestion columns.
    return flagged.drop(
        "name", "metadata", "name_splt", "languages_raw", "certifications_raw",
        "_rescued_data", "source_file", "ingestion_ts",
    )
# —————————————————————————————————————————————————————————————————————————————————
#  4) SPATNE X DOBRE ZAZNAMY
# —————————————————————————————————————————————————————————————————————————————————
#Table holding the agent records that satisfy the data-quality rules.
@dlt.table(name='agents_clean_good_records',comment='agents cleanded and validate data')
def customer_clean():
    """Rows of agents_clean_quarantine_rules whose flag is false, without the flag column."""
    flagged_agents = dlt.readStream('agents_clean_quarantine_rules')
    passed = flagged_agents.where("is_quarantined=false")
    return passed.drop("is_quarantined")

#Table holding the agent records that do not satisfy the data-quality rules.
@dlt.table(name='agents_clean_bad_records',comment='agents cleaned and bad data')
def customer_clean():
    """Rows of agents_clean_quarantine_rules whose flag is true, without the flag column."""
    flagged_agents = dlt.readStream('agents_clean_quarantine_rules')
    failed = flagged_agents.where("is_quarantined=true")
    return failed.drop("is_quarantined")

# —————————————————————————————————————————————————————————————————————————————————
#  5) SCD2 DIMENZE 
# —————————————————————————————————————————————————————————————————————————————————
# Only SCD type 2 is implemented for dim_agents.
# The check uses the agents configuration (scd_type_agents); it previously
# tested and reported the customers values, so the agents SCD setting was
# never actually consulted.
if scd_type_agents == 2:
    dlt.create_streaming_table(
        name="dim_agents",
        comment=description_agents,
        table_properties={"quality": "silver"}
        )
    # Feed the good records into the dimension as SCD type 2, ordered by
    # snapshot_date; snapshot_date is excluded from history tracking.
    dlt.apply_changes(
        target = "dim_agents",
        source = "agents_clean_good_records",
        keys = business_keys_agents,
        sequence_by = col("snapshot_date"),
        ignore_null_updates=False,
        stored_as_scd_type="2",
        track_history_except_column_list=['snapshot_date']
    )
else:
    raise ValueError(f"Nepodporovaný nebo chybějící SCD typ: {scd_type_raw_agents}")

# ---------------------------------------------products-------------------------------------------------------------

# Load the table configuration from the config
config = spark.table(f"{catalog}.config_{env}.table_lookup") \
    .filter(col("table_name") == "dim_products") \
    .select("scd_type", "description") \
    .first()

# Validate that the configuration was found
if config is None:
    raise ValueError(f"Configuration for 'dim_products' not found in table {catalog}.config_{env}.table_lookup")

# Extract metadata values
description = config["description"]
scd_type_raw = config["scd_type"]

# Map SCD type from string to an integer value.
# Uses the scd_type_map defined once in the common setup section; the local
# copy that used to be re-declared here was identical to it.
scd_type = scd_type_map.get(str(scd_type_raw).upper(), 0)

# Define a streaming view to clean the bronze table
@dlt.view(name="products_bronze_clean")
def products_bronze_clean():
    # Streaming view over products_bronze: drops rows without product_id,
    # normalizes the text columns, de-duplicates and removes technical columns.
    raw_products = spark.readStream.table(f"{catalog}.{bronze_schema}.products_bronze")

    with_id = raw_products.where(col("product_id").isNotNull())

    normalized = (
        with_id
        .withColumn("product_id", trim(col("product_id").cast(StringType())))
        .withColumn("product_name", initcap(trim(col("product_name").cast(StringType()))))
        .withColumn("category", initcap(trim(col("category").cast(StringType()))))
    )

    # Rows equal on all three normalized columns are collapsed into one.
    unique_products = normalized.dropDuplicates(["product_id", "product_name", "category"])

    return unique_products.drop("_rescued_data", "source_file", "ingestion_ts")

# create Silver table if SCD type is 0 or 1 
# Any SCD type other than 0 or 1 is rejected before the table is declared.
if scd_type not in (0, 1):
    raise ValueError(f"SCD type '{scd_type_raw}' is not supported for 'dim_products'")


@dlt.table(
    name="dim_products",
    comment=description,
    # Enable schema evolution
    spark_conf={"spark.databricks.delta.schema.autoMerge.enabled": "true"},
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        # Optimize queries on product_id
        "pipelines.autoOptimize.zOrderCols": "product_id",
        # Allow pipeline reset to overwrite table
        "pipelines.reset.allowed": "true",
    },
)
def dim_products():
    """Silver products dimension, streamed from the cleaned bronze view."""
    cleaned_products = dlt.read_stream("products_bronze_clean")
    return cleaned_products

# ---------------------------------------------FACT TABLES---------------------------------------------------------

# ----------------------------------------------- claims---------------------------------------------------------

# Load the table configuration from the config
config = spark.table(f"{catalog}.config_{env}.table_lookup") \
    .filter(col("table_name") == "fact_claims") \
    .select("scd_type", "description") \
    .first()

# Validate that the configuration was found
if config is None:
    raise ValueError(f"Configuration for 'fact_claims' not found in table {catalog}.config_{env}.table_lookup")

# Extract metadata values
description = config["description"]
scd_type_raw = config["scd_type"]

# Map SCD type from string to an integer value.
# Uses the scd_type_map defined once in the common setup section; the local
# copy that used to be re-declared here was identical to it.
scd_type = scd_type_map.get(str(scd_type_raw).upper(), 0)

# Define a streaming view to clean the bronze claims table
@dlt.view(name="claims_bronze_clean")
def claims_bronze_clean():
    # Streaming view over claims_bronze: drops rows without claim_id, trims
    # the identifiers, casts date/amount, de-duplicates on claim_id and
    # removes technical columns.
    raw_claims = spark.readStream.table(f"{catalog}.{bronze_schema}.claims_bronze")

    with_id = raw_claims.where(col("claim_id").isNotNull())

    # Identifier columns: cast to string, then trim.
    for id_column in ("claim_id", "policy_id", "agent_id"):
        with_id = with_id.withColumn(id_column, trim(col(id_column).cast(StringType())))

    typed = (
        with_id
        .withColumn("claim_date", col("claim_date").cast(DateType()))
        # NOTE(review): amount is cast to integer here while premium columns
        # elsewhere in this file use double - confirm this is intended.
        .withColumn("amount", col("amount").cast(IntegerType()))
    )

    unique_claims = typed.dropDuplicates(["claim_id"])
    return unique_claims.drop("_rescued_data", "source_file", "ingestion_ts")

# Create Silver table for fact table (SCD type 0)
if scd_type == 0:
    # Data quality expectations are attached as decorators. Calling
    # dlt.expect_or_fail(...) as a standalone statement only builds a
    # decorator and throws it away, so the rules were never enforced.
    @dlt.table(
        name="fact_claims",
        comment=description,
        spark_conf={"spark.databricks.delta.schema.autoMerge.enabled": "true"},  # Enable schema evolution
        table_properties={
            "delta.autoOptimize.optimizeWrite": "true",
            "delta.autoOptimize.autoCompact": "true",
            "pipelines.autoOptimize.zOrderCols": "claim_id",  # Optimize queries on claim_id
            "pipelines.reset.allowed": "true"  # Allow pipeline reset to overwrite table
        }
    )
    @dlt.expect_or_fail("valid_claim_id", "claim_id IS NOT NULL")
    @dlt.expect_or_fail("valid_policy_id", "policy_id IS NOT NULL")
    @dlt.expect_or_fail("valid_agent_id", "agent_id IS NOT NULL")
    @dlt.expect_or_fail("positive_amount", "amount > 0")
    @dlt.expect_or_fail("valid_claim_date", "claim_date IS NOT NULL AND claim_date = CAST(CAST(claim_date AS STRING) AS DATE)")
    def fact_claims():
        """Silver claims fact table, streamed from the cleaned bronze view."""
        return dlt.read_stream("claims_bronze_clean")
else:
    raise ValueError(f"SCD type '{scd_type_raw}' is not supported for 'fact_claims'")


# ----------------------------------------------- premium ---------------------------------------------------------

# Load the table configuration from the config
config = spark.table(f"{catalog}.config_{env}.table_lookup") \
    .filter(col("table_name") == "fact_premium_transactions") \
    .select("scd_type", "description") \
    .first()

# Validate that the configuration was found
if config is None:
    raise ValueError(f"Configuration for 'fact_premium_transactions' not found in table {catalog}.config_{env}.table_lookup")

# Extract metadata values
description = config["description"]
scd_type_raw = config["scd_type"]

# Map SCD type from string to an integer value.
# Uses the scd_type_map defined once in the common setup section; the local
# copy that used to be re-declared here was identical to it.
scd_type = scd_type_map.get(str(scd_type_raw).upper(), 0)

# Define a streaming view to clean the bronze premium transactions table
@dlt.view(name="premium_transactions_bronze_clean")
def premium_transactions_bronze_clean():
    # Streaming view over premium_transactions_bronze: drops rows without
    # premium_txn_id, trims the identifiers, casts the typed columns,
    # de-duplicates on premium_txn_id and removes technical columns.
    raw_txns = spark.readStream.table(f"{catalog}.{bronze_schema}.premium_transactions_bronze")

    with_id = raw_txns.where(col("premium_txn_id").isNotNull())

    # Identifier columns: cast to string, then trim.
    for id_column in ("premium_txn_id", "policy_id", "agent_id"):
        with_id = with_id.withColumn(id_column, trim(col(id_column).cast(StringType())))

    typed = (
        with_id
        .withColumn("due_date", col("due_date").cast(DateType()))
        .withColumn("premium_amount", col("premium_amount").cast(DoubleType()))
        .withColumn("paid_flag", col("paid_flag").cast(BooleanType()))
        .withColumn("payment_date", col("payment_date").cast(DateType()))
        .withColumn("snapshot_date", col("snapshot_date").cast(DateType()))
    )

    unique_txns = typed.dropDuplicates(["premium_txn_id"])
    return unique_txns.drop("_rescued_data", "source_file", "ingestion_ts")

# Create Silver table for fact_premium_transactions table (SCD type 0)
if scd_type == 0:
    # Data quality expectations are attached as decorators. Calling
    # dlt.expect_or_fail(...) / dlt.expect(...) as standalone statements only
    # builds decorators and throws them away, so the rules were never applied.
    @dlt.table(
        name="fact_premium_transactions",
        comment=description,
        # Enable schema evolution. The key matches the one used by the other
        # tables in this file (it previously had a stray "spark.sql." prefix).
        spark_conf={"spark.databricks.delta.schema.autoMerge.enabled": "true"},
        table_properties={
            "delta.autoOptimize.optimizeWrite": "true",
            "delta.autoOptimize.autoCompact": "true",
            "pipelines.autoOptimize.zOrderCols": "premium_txn_id",  # Optimize queries on premium_txn_id
            "pipelines.reset.allowed": "true"  # Allow pipeline reset to overwrite table
        }
    )
    @dlt.expect_or_fail("valid_premium_txn_id", "premium_txn_id IS NOT NULL")
    @dlt.expect_or_fail("valid_policy_id", "policy_id IS NOT NULL")
    @dlt.expect_or_fail("valid_agent_id", "agent_id IS NOT NULL")
    @dlt.expect_or_fail("positive_premium_amount", "premium_amount > 0")
    @dlt.expect_or_fail("valid_due_date", "due_date IS NOT NULL AND due_date = CAST(CAST(due_date AS STRING) AS DATE)")
    @dlt.expect_or_fail("valid_paid_flag", "paid_flag IS NOT NULL")
    # Recorded only (no failure): a paid transaction should carry a payment date.
    @dlt.expect("valid_payment_date", "paid_flag = FALSE OR (paid_flag = TRUE AND payment_date IS NOT NULL AND payment_date = CAST(CAST(payment_date AS STRING) AS DATE))")
    @dlt.expect_or_fail("valid_snap_date", "snapshot_date IS NOT NULL AND snapshot_date = CAST(CAST(snapshot_date AS STRING) AS DATE)")
    def fact_premium_transactions():
        """Silver premium-transactions fact table, streamed from the cleaned bronze view."""
        return dlt.read_stream("premium_transactions_bronze_clean")

else:
    raise ValueError(f"SCD type '{scd_type_raw}' is not supported for 'fact_premium_transactions'")