In [0]:
%python
# Don't run this...
# DLT pipeline for Gold layer transformations
import dlt
from dlt import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

# GOLD TABLE 1: Claims Enriched with Analytics
@dlt.table(
  name="claims_analytics",
  comment="Enriched claims data with fraud scoring, provider/member details for analytics",
  table_properties={
    "quality": "gold",
    "pipelines.autoOptimize.managed": "true"
  }
)
@dlt.expect_or_drop("valid_claim_amount", "claim_amount IS NOT NULL AND claim_amount > 0")
def claims_analytics():
    """Build the gold ``claims_analytics`` table from the silver layer.

    Silver claims are left-joined to the member, provider, provider-location
    and diagnosis tables, then enriched with:

    * ``fraud_risk_score`` - additive rule score, stored as a float:
      +0.3 when ``claim_amount`` > 10000, +0.2 when ``diagnosis_code`` is
      E119 or I10, +0.1 when the provider-type input equals "Specialist".
    * ``claim_severity`` - "Low" (< 1000), "Medium" (< 5000), else "High".
    * ``processing_month`` - ``ingestion_date`` formatted as ``yyyy-MM``.
    * ``member_age`` - whole years between ``birth_date`` and today.
    * ``first_name`` / ``last_name`` - first and final whitespace-separated
      tokens of ``member_name``; ``last_name`` is null for one-token names.

    Returns:
        DataFrame restricted to the analytics columns listed in the final
        ``select``.
    """
    # The silver tables are addressed by fully qualified name, so they are
    # read through the Spark reader rather than dlt.read(), which targets
    # datasets declared inside this pipeline.
    claims = spark.read.table("leomar.2silver.claims_batch_and_stream")
    members = spark.read.table("leomar.2silver.members")
    providers = spark.read.table("leomar.2silver.providers")
    locations = spark.read.table("leomar.2silver.provider_locations")
    diagnoses = spark.read.table("leomar.2silver.diagnosis")

    # Fraud scoring (simple example - expand based on business rules).
    # Written with native column expressions instead of a Python UDF so rows
    # are not shipped to a Python worker; a null input simply contributes 0.
    # NOTE(review): the provider-type rule is fed the `tin` column, exactly as
    # before. A TIN is unlikely to ever equal "Specialist", so this rule
    # probably never fires - confirm which column holds the provider type.
    fraud_risk_score = (
        lit(0.0)
        + when(col("claim_amount") > 10000, 0.3).otherwise(0.0)
        + when(col("diagnosis_code").isin("E119", "I10"), 0.2).otherwise(0.0)  # Common codes
        + when(col("tin") == "Specialist", 0.1).otherwise(0.0)
    ).cast(FloatType())

    name_parts = col("name_parts")

    # Enrich and transform
    return (
        claims
        # Ensure claim_amount is of correct type
        .withColumn("claim_amount", col("claim_amount").cast(FloatType()))

        # Join with dimension tables
        .join(members, "member_id", "left")
        .join(providers, "provider_id", "left")
        # NOTE(review): if a provider can have several location rows, this
        # join repeats the claim once per location - verify the table grain.
        .join(locations, "provider_id", "left")
        .join(diagnoses, "diagnosis_code", "left")

        # Add fraud scoring
        .withColumn("fraud_risk_score", fraud_risk_score)

        # Categorize claims
        .withColumn("claim_severity",
                   when(col("claim_amount") < 1000, "Low")
                   .when(col("claim_amount") < 5000, "Medium")
                   .otherwise("High"))

        # Add analytics columns
        .withColumn("processing_month", date_format(col("ingestion_date"), "yyyy-MM"))
        .withColumn("member_age",
                   floor(months_between(current_date(), col("birth_date")) / 12))
        # Split on runs of whitespace so doubled spaces do not yield empty
        # tokens.
        .withColumn("name_parts", split(trim(col("member_name")), r"\s+"))
        .withColumn("first_name", name_parts.getItem(0))
        # Take the final token so "Ann Mary Lee" yields "Lee" rather than the
        # middle name; no otherwise() keeps one-token names null.
        .withColumn("last_name",
                   when(size(name_parts) > 1, element_at(name_parts, -1)))
        # Select final columns for analytics
        .select(
            "claim_id", "member_id", "provider_id", "diagnosis_code",
            "claim_amount", "claim_date", "ingestion_date", "processing_month",
            "fraud_risk_score", "claim_severity", "member_age",
            "tin", "provider_name", "diagnosis_description", "City",
            "first_name", "last_name"
        )
    )

# GOLD TABLE 2: Provider Performance Dashboard
@dlt.table(
  name="provider_performance",
  comment="Aggregated provider performance metrics for compliance reporting",
  table_properties={"quality": "gold"}
)
def provider_performance():
    """Aggregate ``claims_analytics`` into one row per provider and city.

    Rows are grouped by ``provider_id``, ``provider_name``, ``tin`` and
    ``City``. Each group gets its claim count, total and average claim
    amount, average fraud score, and the number of claims scoring above 0.7.
    Every row is also stamped with the current month (``reporting_period``)
    and the current timestamp (``last_updated``).

    Returns:
        DataFrame with the grouping columns, the five metrics and the two
        stamp columns.
    """
    grouping_columns = ["provider_id", "provider_name", "tin", "City"]

    # NOTE(review): as claims_analytics is written today its score tops out
    # at 0.3 + 0.2 + 0.1 = 0.6, so this predicate never matches and
    # high_risk_claims is always 0 - confirm the intended threshold.
    is_high_risk = col("fraud_risk_score") > 0.7

    # count/sum/avg below are Spark column aggregates, not Python builtins.
    metrics = [
        count("claim_id").alias("total_claims"),
        sum("claim_amount").alias("total_amount"),
        avg("claim_amount").alias("avg_claim_amount"),
        avg("fraud_risk_score").alias("avg_fraud_score"),
        # when() without otherwise() yields null for non-matches, and count()
        # skips nulls, so only the matching claims are counted.
        count(when(is_high_risk, 1)).alias("high_risk_claims"),
    ]

    per_provider = (
        dlt.read("claims_analytics")
        .groupBy(*grouping_columns)
        .agg(*metrics)
    )

    with_period = per_provider.withColumn(
        "reporting_period", date_format(current_date(), "yyyy-MM")
    )
    return with_period.withColumn("last_updated", current_timestamp())

# GOLD TABLE 3: Member Claims Summary
@dlt.table(
  name="member_claims_summary", 
  comment="Member-level claims summary for customer analytics",
  table_properties={"quality": "gold"}
)
def member_claims_summary():
    """Summarise ``claims_analytics`` into one row per member.

    Rows are grouped by ``member_id``, ``first_name`` and ``last_name``.
    Each group gets its claim count, total claimed amount, most recent
    ``claim_date`` and average fraud score. ``summary_period`` records the
    date the summary was computed, formatted ``yyyy-MM-dd``.

    Returns:
        DataFrame with the grouping columns, the four metrics and
        ``summary_period``.
    """
    member_keys = ["member_id", "first_name", "last_name"]

    # count/sum/max/avg below are Spark column aggregates, not Python builtins.
    metrics = [
        count("claim_id").alias("claims_count"),
        sum("claim_amount").alias("total_claimed"),
        max("claim_date").alias("last_claim_date"),
        avg("fraud_risk_score").alias("member_risk_score"),
    ]

    per_member = (
        dlt.read("claims_analytics")
        .groupBy(*member_keys)
        .agg(*metrics)
    )

    return per_member.withColumn(
        "summary_period", date_format(current_date(), "yyyy-MM-dd")
    )

# GOLD TABLE 4: Real-time Fraud Alerts
@dlt.table(
  name="realtime_fraud_alerts",
  comment="Real-time fraud detection alerts from streaming data",
  table_properties={"quality": "gold"}
)
def realtime_fraud_alerts():
    """Emit fraud alerts for streaming silver claims that trip any rule.

    A claim is kept when at least one of these holds:

    * ``claim_amount`` > 15000
    * ``diagnosis_code`` is E119, I10 or M545
    * ``provider_id`` is PROV999 or PROV888

    ``alert_severity`` is "Critical" above 20000 and "High" otherwise.
    ``alert_reason`` reports only the first matching rule, checked in the
    order listed above.

    Returns:
        Streaming DataFrame with the claim identifiers, amount, diagnosis
        code and the three alert columns.
    """
    suspicious_diagnoses = ["E119", "I10", "M545"]
    watchlist_providers = ["PROV999", "PROV888"]

    # The silver stream is addressed by fully qualified name, so it is read
    # through the Spark streaming reader rather than dlt.read_stream(), which
    # targets datasets declared inside this pipeline.
    claims = spark.readStream.table("leomar.2silver.claims_stream")

    # Each rule is named once so the filter and the reason stay in sync.
    is_high_amount = col("claim_amount") > 15000
    has_suspicious_diagnosis = col("diagnosis_code").isin(suspicious_diagnoses)
    is_watchlist_provider = col("provider_id").isin(watchlist_providers)

    return (
        claims
        .filter(is_high_amount | has_suspicious_diagnosis | is_watchlist_provider)
        .withColumn("alert_timestamp", current_timestamp())
        .withColumn("alert_severity",
                   when(col("claim_amount") > 20000, "Critical")
                   .otherwise("High"))
        # Every surviving row matched at least one rule, so a row that fails
        # the first two checks must have matched the provider watchlist.
        .withColumn("alert_reason",
                   when(is_high_amount, "High amount")
                   .when(has_suspicious_diagnosis, "Suspicious diagnosis")
                   .otherwise("Watchlist provider"))
        .select("claim_id", "member_id", "provider_id", "claim_amount",
                "diagnosis_code", "alert_timestamp", "alert_severity", "alert_reason")
    )

# GOLD TABLE 5: Historical Fraud Alerts (from batch)
@dlt.table(
  name="historical_fraud_alerts",
  comment="Historical fraud detection alerts from batch data",
  table_properties={"quality": "gold"}
)
def historical_fraud_alerts():
    """Emit fraud alerts for batch silver claims that trip any rule.

    A claim is kept when at least one of these holds:

    * ``claim_amount`` > 15000
    * ``diagnosis_code`` is E119, I10 or M545
    * ``provider_id`` is PROV999 or PROV888

    ``alert_severity`` is "Critical" above 20000 and "High" otherwise.
    ``alert_reason`` reports only the first matching rule, checked in the
    order listed above.

    Returns:
        DataFrame with the claim identifiers, amount, diagnosis code and the
        three alert columns.
    """
    suspicious_diagnoses = ["E119", "I10", "M545"]
    watchlist_providers = ["PROV999", "PROV888"]

    # The silver table is addressed by fully qualified name, so it is read
    # through the Spark reader rather than dlt.read(), which targets datasets
    # declared inside this pipeline.
    claims = spark.read.table("leomar.2silver.claims_batch")

    # Each rule is named once so the filter and the reason stay in sync.
    is_high_amount = col("claim_amount") > 15000
    has_suspicious_diagnosis = col("diagnosis_code").isin(suspicious_diagnoses)
    is_watchlist_provider = col("provider_id").isin(watchlist_providers)

    return (
        claims
        .filter(is_high_amount | has_suspicious_diagnosis | is_watchlist_provider)
        .withColumn("alert_timestamp", current_timestamp())
        .withColumn("alert_severity",
                   when(col("claim_amount") > 20000, "Critical")
                   .otherwise("High"))
        # Every surviving row matched at least one rule, so a row that fails
        # the first two checks must have matched the provider watchlist.
        .withColumn("alert_reason",
                   when(is_high_amount, "High amount")
                   .when(has_suspicious_diagnosis, "Suspicious diagnosis")
                   .otherwise("Watchlist provider"))
        .select("claim_id", "member_id", "provider_id", "claim_amount",
                "diagnosis_code", "alert_timestamp", "alert_severity", "alert_reason")
    )