# 10 – Delta Live Tables Pipeline (Finance Small)
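Attach as a DLT pipeline source, or run in a DLT-enabled workspace cell. The code expects `CATALOG`, `SCHEMA_BRONZE`, `SCHEMA_SILVER`, `SCHEMA_GOLD`, `VOLUME_URI_RAW`, and `VOLUME_URI_REF` to be defined before it runs; this notebook does not set them.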

In [None]:
import dlt
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Fully qualified "<catalog>.<schema>" names for each medallion layer.
# CATALOG and SCHEMA_* are expected to be defined before this cell runs.
BRONZE, SILVER, GOLD = (
    f"{CATALOG}.{layer_schema}"
    for layer_schema in (SCHEMA_BRONZE, SCHEMA_SILVER, SCHEMA_GOLD)
)
# Volume roots: RAW_DROP is where the streamed CSV drops land (paysim/, customers/),
# REF_DROP holds the reference watchlists (ofac/, pep/).
RAW_DROP, REF_DROP = VOLUME_URI_RAW, VOLUME_URI_REF

@dlt.table(name="br_transactions_raw", comment="Raw PaySim transactions (small banking-like dataset)")
@dlt.expect_or_drop("amount_non_negative", "amount >= 0")
def br_transactions_raw():
    """Incrementally ingest PaySim CSV drops from ``RAW_DROP/paysim/`` with Auto Loader.

    Renames ``nameOrig`` to ``customer_id``, derives a content-based ``txn_id``,
    maps the PaySim ``step`` (hours) onto a timestamp anchored at 2025-01-01, and
    stamps each row with ``_ingest_ts``. Rows with a negative ``amount`` are
    dropped by the expectation above.
    """
    raw = (spark.readStream.format("cloudFiles")
           .option("cloudFiles.format", "csv")
           # Auto Loader infers CSV columns as strings unless this option is set;
           # without it ``amount`` and ``step`` would stay strings and be implicitly
           # cast in the expectation and in timestampadd below.
           .option("cloudFiles.inferColumnTypes", True)
           .option("header", True)
           .option("inferSchema", True)
           .load(RAW_DROP + "/paysim/")
           .withColumnRenamed("nameOrig", "customer_id"))
    return (raw
            # monotonically_increasing_id() is derived from the partition id and the
            # row position, both of which restart in every streaming micro-batch, so
            # it repeats IDs across batches. Hashing the source row instead is
            # deterministic and stable across full refreshes. It must be computed
            # before ts/_ingest_ts are added so the hash covers source columns only.
            # NOTE: byte-identical duplicate source rows receive the same txn_id.
            .withColumn("txn_id", F.sha2(F.to_json(F.struct("*")), 256))
            .withColumn("ts", F.expr("timestampadd(HOUR, step, to_timestamp('2025-01-01'))"))
            .withColumn("_ingest_ts", F.current_timestamp()))

@dlt.table(name="br_customers_raw", comment="Raw customers (optional upload)")
@dlt.expect_or_drop("dob_present", "dob IS NOT NULL")
def br_customers_raw():
    """Stream customer CSV drops from ``RAW_DROP/customers/`` via Auto Loader.

    Rows without a ``dob`` are dropped by the expectation above; every surviving
    row is stamped with an ``_ingest_ts`` load timestamp.
    """
    csv_options = {
        "cloudFiles.format": "csv",
        "header": True,
        "inferSchema": True,
    }
    source_path = RAW_DROP + "/customers/"
    stream = spark.readStream.format("cloudFiles").options(**csv_options).load(source_path)
    return stream.withColumn("_ingest_ts", F.current_timestamp())

@dlt.table(name="br_watchlists_raw", comment="Sanctions (OFAC SDN) + PEP (OpenSanctions lite)")
# The returned table has no ``alias`` column, so the previous constraint
# "name IS NOT NULL OR alias IS NOT NULL" referenced a column that does not exist.
# The expectation name is kept so existing data-quality metrics stay comparable.
@dlt.expect("has_name_or_alias", "name IS NOT NULL")
def br_watchlists_raw():
    """Load the OFAC SDN and PEP reference CSVs from ``REF_DROP`` into one watchlist.

    Output columns: ``name``, ``list_type`` ("SANCTIONS" or "PEP"),
    ``list_name``, ``match_score`` (constant 1.0) and ``_ingest_ts``.
    Both files are read as batch (non-streaming) sources with a header row.
    """

    def _read_list(relative_path, list_type, list_name):
        # Read one reference CSV and tag it with its list metadata.
        return (spark.read.format("csv").option("header", True)
                .load(REF_DROP + relative_path)
                .select(F.col("name").alias("name"), F.lit(list_type).alias("list_type"),
                        F.lit(list_name).alias("list_name"), F.lit(1.0).alias("match_score")))

    ofac = _read_list("/ofac/sdn.csv", "SANCTIONS", "OFAC SDN")
    pep = _read_list("/pep/pep.csv", "PEP", "OpenSanctions")
    return (ofac.unionByName(pep)
                .withColumn("_ingest_ts", F.current_timestamp()))

@dlt.table(name="si_customers", comment="Cleaned customers")
# Raw Python string on purpose: with default parser settings Spark SQL string
# literals consume one level of backslash escaping, and an unknown escape such as
# '\s' collapses to a plain 's'. The SQL text must therefore carry '\\s' / '\\.'
# for the regex engine to receive '\s' (whitespace) and '\.' (literal dot).
@dlt.expect("email_format", r"email RLIKE '^[^@\\s]+@[^@\\s]+\\.[^@\\s]+$'")
def si_customers():
    """Clean streamed customers from ``br_customers_raw``.

    Normalizes ``name`` (trimmed, initcap) and ``country`` (trimmed, upper-case),
    parses ``dob`` to a date, and defaults a missing or uncastable ``pep_flag`` to
    False. Keeps one row per ``customer_id``. The email expectation only records
    violations; it does not drop rows.
    """
    df = dlt.read_stream("br_customers_raw")
    return (df
        .withColumn("name", F.initcap(F.trim("name")))
        .withColumn("country", F.upper(F.trim("country")))
        .withColumn("dob", F.to_date("dob"))
        .withColumn("pep_flag", F.coalesce(F.col("pep_flag").cast("boolean"), F.lit(False)))
        # NOTE(review): streaming dropDuplicates without a watermark keeps every
        # seen customer_id in state indefinitely.
        .dropDuplicates(["customer_id"]))

@dlt.table(name="si_entities_resolved", comment="Canonical entity_id derived from customer attributes")
@dlt.expect("has_identity", "name IS NOT NULL AND dob IS NOT NULL")
def si_entities_resolved():
    """Attach a deterministic ``entity_id`` to each cleaned customer.

    ``entity_id`` is the SHA-256 hex digest of ``name|dob|national_id``; a missing
    ``national_id`` contributes an empty string. Customers sharing those attributes
    therefore map to the same entity. concat_ws skips NULL inputs, so a NULL name
    or dob is silently left out of the key (the ``has_identity`` expectation only
    warns).
    """
    customers = dlt.read_stream("si_customers")
    identity_key = F.concat_ws(
        "|",
        F.col("name"),
        F.col("dob"),
        F.coalesce(F.col("national_id"), F.lit("")),
    )
    resolved = customers.withColumn("entity_id", F.sha2(identity_key, 256))
    output_columns = ["entity_id", "customer_id", "name", "dob", "country", "pep_flag"]
    return resolved.select(*output_columns)

@dlt.table(name="si_transactions", comment="Clean transactions with entity mapping (from PaySim)")
@dlt.expect_or_drop("ts_present", "ts IS NOT NULL")
@dlt.expect_or_drop("has_customer", "customer_id IS NOT NULL")
def si_transactions():
    """Clean streamed PaySim transactions and map each one to its resolved entity.

    Left-joins (stream-static) on ``customer_id`` to pick up ``entity_id``, so an
    unmapped customer yields a NULL ``entity_id``. Casts ``amount`` to double.
    Derives ``direction`` from the originating account's (``customer_id``)
    point of view: in PaySim only CASH_IN credits the originator, while PAYMENT,
    CASH_OUT, DEBIT and TRANSFER debit it.
    """
    # PAYMENT was previously counted as an inflow, but a PaySim PAYMENT decreases
    # the originator's balance.
    inflow_types = ["CASH_IN"]
    t = dlt.read_stream("br_transactions_raw")
    e = dlt.read("si_entities_resolved")
    return (t.join(e.select("customer_id", "entity_id"), on="customer_id", how="left")
              .withColumn("amount", F.col("amount").cast("double"))
              .withColumn("direction", F.when(F.col("type").isin(*inflow_types), "IN").otherwise("OUT"))
              # Demo placeholder: every transaction is tagged with country code "IN".
              .withColumn("country", F.lit("IN")))

@dlt.table(name="si_watch_hits", comment="Exact/ilike match to sanctions/PEP (demo)")
@dlt.expect("name_available", "name IS NOT NULL")
def si_watch_hits():
    """Match resolved entities against the sanctions/PEP watchlist by name.

    The match is an exact comparison after trimming and upper-casing both sides;
    it is not a pattern/ILIKE match. Because the join is a left join, an entity
    without a hit still appears once, with NULL list columns.
    Output columns: ``entity_id``, ``name``, ``list_type``, ``list_name``,
    ``match_score``, ``_ingest_ts``. ``name`` is projected so the
    ``name_available`` expectation has a column to evaluate.
    """
    # Several customer_ids can resolve to one entity_id. Without de-duplication
    # each hit would repeat once per customer and inflate downstream hit counts.
    # The name is part of the entity_id hash, so duplicates should share it.
    e = (dlt.read("si_entities_resolved")
            .select("entity_id", "name")
            .dropDuplicates(["entity_id"]))
    w = dlt.read("br_watchlists_raw")

    def _match_key(col):
        # Normalized form used on both sides of the name comparison.
        return F.upper(F.trim(col))

    return (e.join(w, _match_key(e["name"]) == _match_key(w["name"]), "left")
             .select("entity_id",
                     e["name"].alias("name"),
                     w["list_type"].alias("list_type"),
                     w["list_name"].alias("list_name"),
                     w["match_score"],
                     w["_ingest_ts"]))

@dlt.table(name="go_entity_features", comment="Entity-level features for risk scoring")
def go_entity_features():
    """Build one feature row per resolved entity for risk scoring.

    Columns (in order): ``entity_id``, ``pep_flag``, ``tx_count_30d``,
    ``net_flow_30d``, ``sanctions_hits_30d``, ``pep_hits_30d``, ``country``,
    ``geo_risk``. Missing features default to 0 / 0.0 / "LOW".

    ``tx_count_30d`` / ``net_flow_30d`` are the maximum over the entity's whole
    history of a trailing 30-day window evaluated at each transaction. They are a
    peak value, not a "last 30 days" value. The watchlist hit counts are not
    time-windowed despite their ``_30d`` suffix.
    """
    e = dlt.read("si_entities_resolved")
    t = dlt.read("si_transactions")
    w = dlt.read("si_watch_hits")

    # Several customer_ids can resolve to one entity_id. Collapse them so every
    # join below sees one row per entity; previously an entity with k customers
    # appeared k * k times (k base rows x k geo rows). An entity counts as PEP if
    # any of its customers is.
    base = e.groupBy("entity_id").agg(F.max("pep_flag").alias("pep_flag"))

    # Trailing 30-day range window, measured in epoch seconds, per entity.
    w30 = Window.partitionBy("entity_id").orderBy(F.col("ts").cast("long")).rangeBetween(-30*24*3600, 0)

    signed_amount = F.when(F.col("direction") == "IN", F.col("amount")).otherwise(-F.col("amount"))
    txn = (t
      # Unmapped transactions (NULL entity_id) can never join back to an entity;
      # dropping them first avoids windowing them as one large NULL partition.
      .where(F.col("entity_id").isNotNull())
      .withColumn("tx_count_30d", F.count("txn_id").over(w30))
      .withColumn("net_flow_30d", F.sum(signed_amount).over(w30))
      .groupBy("entity_id")
      .agg(F.max("tx_count_30d").alias("tx_count_30d"), F.max("net_flow_30d").alias("net_flow_30d")))

    watch = (w.groupBy("entity_id")
               .agg(F.count(F.when(F.col("list_type")=="SANCTIONS", True)).alias("sanctions_hits_30d"),
                    F.count(F.when(F.col("list_type")=="PEP", True)).alias("pep_hits_30d")))

    # Geo risk rank: 3 = HIGH, 2 = MEDIUM, 1 = LOW (includes NULL country).
    # If an entity's customers report different countries, keep the riskiest one;
    # ties are broken by the greatest country code, so the result is deterministic.
    geo_rank = (F.when(F.col("country").isin("IR", "KP", "SY"), F.lit(3))
                 .when(F.col("country").isin("IN", "BR", "ZA"), F.lit(2))
                 .otherwise(F.lit(1)))
    geo_risk = (e.select("entity_id", F.struct(geo_rank.alias("rank"), F.col("country")).alias("_geo"))
                  .groupBy("entity_id")
                  .agg(F.max("_geo").alias("_geo"))
                  .select("entity_id",
                          F.col("_geo.country").alias("country"),
                          F.when(F.col("_geo.rank") == 3, F.lit("HIGH"))
                           .when(F.col("_geo.rank") == 2, F.lit("MEDIUM"))
                           .otherwise(F.lit("LOW")).alias("geo_risk")))

    return (base
              .join(txn, "entity_id", "left")
              .join(watch, "entity_id", "left")
              .join(geo_risk, "entity_id", "left")
              .withColumn("sanctions_hits_30d", F.coalesce("sanctions_hits_30d", F.lit(0)))
              # Coalesced like sanctions_hits_30d so a missing watch row yields 0, not NULL.
              .withColumn("pep_hits_30d", F.coalesce("pep_hits_30d", F.lit(0)))
              .withColumn("tx_count_30d", F.coalesce("tx_count_30d", F.lit(0)))
              .withColumn("net_flow_30d", F.coalesce("net_flow_30d", F.lit(0.0)))
              .withColumn("geo_risk", F.coalesce("geo_risk", F.lit("LOW"))))

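> Tip: to keep costs low, run the pipeline in **Triggered (batch)** mode with **Photon ON**. A triggered pipeline releases its compute after each update instead of running continuously.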