In [0]:
import dlt
from pyspark.sql.functions import (
    col, expr, to_date, year, current_date, upper, trim, regexp_replace, when, lit, sha2, concat_ws, lower
)
from pyspark.sql.types import (
    StringType, IntegerType, DoubleType, DateType, TimestampType
)

# --- Silver claims  ---
@dlt.table(
    name="silver_claims_batch",
    comment="Cleaned and confirmed claims with data quality checks (from batch source).",
    table_properties={"quality": "silver"},
)
def silver_claims_batch():
    """Build the silver claims table from ``bronze_claims_batch``.

    A row survives only when each of ClaimID, MemberID and ProviderID is
    non-null and not the empty string. Surviving rows are then deduplicated
    on that same key triple; which of several duplicate rows is kept is not
    controlled here.
    """
    key_cols = ["ClaimID", "MemberID", "ProviderID"]
    claims = dlt.read("bronze_claims_batch")

    # Every key column must be present (non-null) and non-empty.
    keep_row = None
    for key in key_cols:
        present = claims[key].isNotNull() & (claims[key] != "")
        keep_row = present if keep_row is None else keep_row & present

    return claims.where(keep_row).dropDuplicates(key_cols)

# --- Silver members  ---
@dlt.table(
    name="silver_members_batch",
    comment="Cleaned and confirmed members with data quality checks (from batch source).",
    table_properties={"quality": "silver"},
)
def silver_members_batch():
    """Build the silver members table from ``bronze_members_batch``.

    Keeps rows whose MemberID and Name are both non-null and non-empty,
    then keeps one row per MemberID (which duplicate survives is not
    controlled here).
    """
    members = dlt.read("bronze_members_batch")
    member_id, member_name = members["MemberID"], members["Name"]

    is_valid = (
        member_id.isNotNull()
        & member_name.isNotNull()
        & (member_id != "")
        & (member_name != "")
    )

    return members.where(is_valid).dropDuplicates(["MemberID"])

# --- Silver providers  ---
@dlt.table(
    name="silver_providers_batch",
    comment="Cleaned and confirmed providers with data quality checks (from batch source).",
    table_properties={"quality": "silver"}
)
def silver_providers_batch():
    """Flatten the provider ``Locations`` field into top-level columns.

    Reads ``bronze_providers_batch``, copies ``Locations.Address``,
    ``Locations.City`` and ``Locations.State`` into new ``Address``,
    ``City`` and ``State`` columns, and drops ``Locations``.

    Unlike the other silver tables in this module, no null/empty filtering
    and no deduplication are applied here.
    TODO(review): confirm whether provider keys should also be validated.

    NOTE(review): assumes ``Locations`` is a struct column (nested-field
    access via ``Locations.<field>``). If it is an array of structs, each
    new column will be an array instead. Confirm against the bronze schema.
    """
    bronze_providers_df = dlt.read("bronze_providers_batch")

    # Uses the module-level ``col`` import; no JSON parsing is needed because
    # ``Locations`` is read via nested-field access.
    final_providers_df = (
        bronze_providers_df
        .withColumn("Address", col("Locations.Address"))
        .withColumn("City", col("Locations.City"))
        .withColumn("State", col("Locations.State"))
        .drop("Locations")
    )

    return final_providers_df

# --- Silver diagnosis_ref  ---
@dlt.table(
    name="silver_diagnosis_ref_batch",
    comment="Cleaned and confirmed diagnosis reference data with data quality checks (from batch source).",
    table_properties={"quality": "silver"},
)
def silver_diagnosis_ref_batch():
    """Build the silver diagnosis reference table from ``bronze_diagnosis_ref_batch``.

    Keeps rows whose Code and Description are both non-null and non-empty,
    then keeps one row per Code (which duplicate survives is not controlled
    here).
    """
    diagnoses = dlt.read("bronze_diagnosis_ref_batch")
    code, description = col("Code"), col("Description")

    has_required_fields = (
        code.isNotNull()
        & description.isNotNull()
        & (code != "")
        & (description != "")
    )

    return diagnoses.where(has_required_fields).dropDuplicates(["Code"])

# --- Silver claims (streaming) ---
@dlt.table(
    name="silver_claims_stream",
    comment="Cleaned and confirmed claims with data quality checks (from streaming source).",
    table_properties={"quality": "silver"},
)
def silver_claims_stream():
    """Build the streaming silver claims table from ``bronze_claims_stream``.

    Applies the same rules as the batch claims table: ClaimID, MemberID and
    ProviderID must each be non-null and not the empty string, and rows are
    deduplicated on that key triple.

    NOTE(review): ``dropDuplicates`` on a streaming DataFrame without a
    watermark retains every key seen so far in state, so state grows without
    bound. Consider ``withWatermark`` on an event-time column before
    deduplicating. No such column is visible here, so confirm which applies.
    """
    key_cols = ["ClaimID", "MemberID", "ProviderID"]
    claims_stream = dlt.read_stream("bronze_claims_stream")

    # All not-null checks first, then all not-empty checks, ANDed together.
    conditions = (
        [claims_stream[key].isNotNull() for key in key_cols]
        + [claims_stream[key] != "" for key in key_cols]
    )
    keep_row = conditions[0]
    for condition in conditions[1:]:
        keep_row = keep_row & condition

    return claims_stream.filter(keep_row).dropDuplicates(key_cols)


