In [0]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable
from config.config import *
from config.schemas import *

In [0]:
def upsert_to_delta(df_source, target_table, merge_condition):
    """
    Upserts data from a Spark DataFrame into a Delta table.

    Parameters:
    df_source (DataFrame): Source Spark DataFrame to upsert.
    target_table (str): Name of the target Delta table.
    merge_condition (str): SQL merge condition for matching records.

    Behavior:
    - If the target Delta table exists, performs a merge (upsert) using the provided condition.
      Inserts all records from the source DataFrame that do not match the target.
    - If the target table does not exist, creates it and overwrites with the source DataFrame.

    Example:
        upsert_to_delta(
            df_source=df,
            target_table="my_delta_table",
            merge_condition="t.id = s.id"
        )
    """
    if spark.catalog.tableExists(target_table):
        delta = DeltaTable.forName(spark, target_table)
        (
            delta
                .alias("t")
                .merge(df_source.alias("s"), merge_condition)
                .whenNotMatchedInsertAll()
                .execute()
        )
    else:
        df_source.write.format("delta").mode("overwrite").saveAsTable(target_table)

In [0]:
df_claims = spark.table(st_fact_claims_merged)
df_claims.display()

In [0]:
# ------------------------------
# Clean and transform st_claims_merged table
# ------------------------------

# Load st_claims_merged table
df_claims = spark.table(st_fact_claims_merged)

# Remove nulls
df_claims = (
    df_claims
        .filter(F.col("ClaimID").isNotNull())
        .filter(F.col("MemberID").isNotNull())
        .filter(F.col("ProviderID").isNotNull())
)

# Cast to correct data types
df_claims = (
    df_claims
        .withColumn("ClaimDate", F.to_date(F.col("ClaimDate"), "yyyy-MM-dd"))
        .withColumn("Amount", F.col("Amount").cast("double"))
)

# Convert string list to an array
df_claims = (df_claims
    .withColumn("ICD10Codes", F.split(F.col("ICD10Codes"), ";"))
    .withColumn("CPTCodes", F.split(F.col("CPTCodes"), ";"))
)

# Check data quality
df_claims = (df_claims
    .withColumn("dq_amount_valid", F.when(F.col("Amount") > 0, True).otherwise(False))
    .withColumn("dq_has_member", F.col("MemberID").isNotNull())
)

# Foreign Key Validation
members = spark.table(bt_ref_members).select("MemberID").distinct()
providers = spark.table(bt_ref_providers).select("ProviderID").distinct()

df_claims = (
    df_claims
        .join(members, "MemberID", "left")
            .withColumn("dq_member_valid", F.col("MemberID").isNotNull())
        .join(providers, "ProviderID", "left")
            .withColumn("dq_provider_valid", F.col("ProviderID").isNotNull())
)

# Create ID for deduplication
df_claims = df_claims.withColumn(
    "dedupe_hash",
    F.sha2(F.concat_ws("||", F.col("ClaimID"), F.col("MemberID"), F.col("ProviderID")), 256)
)

upsert_to_delta(
    df_claims, target_table=st_fact_claims_enriched,
    merge_condition="t.ClaimID = s.ClaimID AND t.dedupe_hash = s.dedupe_hash"
)

In [0]:
# ------------------------------
# Clean and transform bt_diagnosis table
# ------------------------------

# Load bt_ref_diagnosis table
df_diagnosis = spark.table(bt_ref_diagnosis)

# Remove nulls
df_diagnosis = (
    df_diagnosis
        .filter(F.col("Code").isNotNull())
        .filter(F.col("Description").isNotNull())
)

# Perform upsert to target delta table
upsert_to_delta(
    df_diagnosis, target_table=st_ref_diagnosis_clean,
    merge_condition="t.Code = s.Code"
)

In [0]:
# ------------------------------
# Clean and transform bt_ref_members table
# ------------------------------

# Load bt_ref_members table
df_members = spark.table(bt_ref_members)

# Remove nulls
df_members = (
    df_members
        .filter(F.col("MemberID").isNotNull())
        .filter(F.col("PlanType").isNotNull())
        .filter(F.col("Name").isNotNull())
)

# Check if valid email
df_members = (
    df_members.withColumn("dq_email_valid", 
        F.col("Email").rlike(r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$"))
)

# Perform upsert to target delta table
upsert_to_delta(
    df_members, target_table=st_ref_members_clean,
    merge_condition="t.MemberID = s.MemberID"
)

In [0]:
# ------------------------------
# Clean and transform bt_ref_providers table
# ------------------------------

# Load bt_ref_providers table
df_providers = spark.table(bt_ref_providers)

# Remove nulls
df_providers = (
    df_providers
        .filter(F.col("ProviderID").isNotNull())
        .filter(F.col("Name").isNotNull())
)

# Create separate columns for location
df_providers = (
    df_providers.select(
        "*",
        F.col("Locations")["Address"].alias("Address"),
        F.col("Locations")["City"].alias("City"),
        F.col("Locations")["State"].alias("State")
    )
)
df_providers = df_providers.drop("Locations")

# Perform upsert to target delta table
upsert_to_delta(
    df_providers, target_table=st_ref_providers_clean,
    merge_condition="t.ProviderID = s.ProviderID"
)