In [0]:
import dlt
from pyspark.sql.functions import *

# Bronze: ingest the members CSV as-is (header row, inferred column types).
@dlt.table(
    comment="Raw member data loaded from CSV",
    table_properties={
        "quality": "bronze",
        "pipelines.autoOptimize.managed": "true",
    },
)
def bronze_members():
    """Batch-read the raw members CSV from the capstone volume into the Bronze layer."""
    members_reader = (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
    )
    return members_reader.load("/Volumes/capstone_hospital/data/raw/members.csv")


# Bronze: ingest the diagnosis reference CSV as-is (header row, inferred column types).
@dlt.table(
    comment="Raw diagnosis data loaded from CSV",
    table_properties={
        "quality": "bronze",
        "pipelines.autoOptimize.managed": "true",
    },
)
def bronze_diagnosis():
    """Batch-read the raw diagnosis reference CSV into the Bronze layer."""
    return (
        spark.read.format("csv")
        .options(header="true", inferSchema="true")
        .load("/Volumes/capstone_hospital/data/raw/diagnosis_ref.csv")
    )


# Bronze: ingest the batch claims CSV as-is (header row, inferred column types).
@dlt.table(
    comment="Raw claim data loaded from CSV",
    table_properties={
        "quality": "bronze",
        "pipelines.autoOptimize.managed": "true",
    },
)
def bronze_claim():
    """Batch-read the raw claims CSV (``claims_batch.csv``) into the Bronze layer."""
    source_path = "/Volumes/capstone_hospital/data/raw/claims_batch.csv"
    csv_reader = spark.read.format("csv")
    csv_reader = csv_reader.option("header", "true").option("inferSchema", "true")
    return csv_reader.load(source_path)



[0;31m---------------------------------------------------------------------------[0m
[0;31mModuleNotFoundError[0m                       Traceback (most recent call last)
File [0;32m<command-4760633582222195>, line 1[0m
[0;32m----> 1[0m [38;5;28;01mimport[39;00m [38;5;21;01mdlt[39;00m
[1;32m      2[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01msql[39;00m[38;5;21;01m.[39;00m[38;5;21;01mfunctions[39;00m [38;5;28;01mimport[39;00m col
[1;32m      4[0m [38;5;129m@dlt[39m[38;5;241m.[39mtable(
[1;32m      5[0m   name[38;5;241m=[39m[38;5;124m"[39m[38;5;124mcapstone_hospital.bronze_members[39m[38;5;124m"[39m,
[1;32m      6[0m   comment[38;5;241m=[39m[38;5;124m"[39m[38;5;124mRaw members data ingested to Bronze layer[39m[38;5;124m"[39m,
[0;32m   (...)[0m
[1;32m     10[0m )
[1;32m     11[0m [38;5;28;01mdef[39;00m [38;5;21mbronze_members[39m():

File [0;32m/databricks/python_shell/lib/dbruntime/au

In [0]:
# Bronze: ingest the claims-stream JSON file as-is.
@dlt.table(
    comment="Raw claims stream data loaded from JSON",
    table_properties={
        "quality": "bronze",
        "pipelines.autoOptimize.managed": "true",
    },
)
def bronze_claims_stream():
    """Read ``claims_stream.json`` from the capstone volume into the Bronze layer.

    NOTE(review): despite the name, this is a batch read (``spark.read``),
    not ``spark.readStream``; the whole file is re-read on each update.
    """
    json_reader = spark.read.format("json")
    return json_reader.load("/Volumes/capstone_hospital/data/raw/claims_stream.json")

# Load providers JSON to Bronze
@dlt.table(
    comment="Raw providers data loaded from JSON",
    table_properties={
        "quality": "bronze",
        "pipelines.autoOptimize.managed": "true"
    }
)
def bronze_providers():
    """Read the raw providers JSON file from the capstone volume into the Bronze layer.

    This is a batch read (``spark.read``), not a stream. The table comment
    previously described it as "stream data", which did not match this source.
    """
    return (
        spark.read.format("json")
        .load("/Volumes/capstone_hospital/data/raw/providers.json")
    )



In [0]:
# Silver table: members — cleaned data with quality checks
@dlt.table(
    name="silver_members",
    comment="Cleaned and deduplicated Silver member data",
    table_properties={
        "quality": "silver",
        "pipelines.autoOptimize.managed": "true"
    }
)
def silver_members():
    """Build the Silver members table from ``bronze_members``.

    Rows with a null ``memberID`` are removed, then rows are de-duplicated on
    ``memberID``. No ordering is applied first, so which duplicate survives is
    not controlled here.

    The table comment previously read "Silver sales data", which was a
    copy-paste error; this table holds member records.
    """
    df = dlt.read("bronze_members")
    return df.filter(col("memberID").isNotNull()).dropDuplicates(["memberID"])

# Silver table: diagnosis reference — keep only rows with a non-null, unique `code`.
@dlt.table(
    name="silver_diagnosis",
    comment="Cleaned data with quality checks",
    table_properties={
        "quality": "silver",
        "pipelines.autoOptimize.managed": "true"
    }
)
def silver_diagnosis():
    """Build the Silver diagnosis table from ``bronze_diagnosis``.

    Discards rows whose ``code`` is null and collapses duplicate codes to one row.
    """
    diagnosis_raw = dlt.read("bronze_diagnosis")
    diagnosis_with_code = diagnosis_raw.filter(col("code").isNotNull())
    return diagnosis_with_code.dropDuplicates(["code"])

# Silver table: batch claims — keep only rows with a non-null, unique ClaimID.
@dlt.table(
    name="silver_claim",
    comment="Cleaned data with quality checks",
    table_properties={
        "quality": "silver",
        "pipelines.autoOptimize.managed": "true"
    }
)
def silver_claim():
    """Build the Silver batch-claims table from ``bronze_claim``.

    Claims missing a ``ClaimID`` are dropped, and duplicate ``ClaimID`` values
    are collapsed to a single row.
    """
    claims_raw = dlt.read("bronze_claim")
    has_claim_id = col("ClaimID").isNotNull()
    return claims_raw.filter(has_claim_id).dropDuplicates(["ClaimID"])

# Silver table: claims stream — cleaned data with quality checks
@dlt.table(
    name="silver_claims_stream",
    comment="Cleaned data with quality checks",
    table_properties={
        "quality": "silver",
        "pipelines.autoOptimize.managed": "true"
    }
)
def silver_claims_stream():
    """Build the Silver claims-stream table from ``bronze_claims_stream``.

    Drops rows with a null ``ClaimID`` and keeps one row per ``ClaimID``.
    """
    # Previously declared as `def silver_claim`, silently re-binding a name that
    # another definition in this cell also uses. The function is now named after
    # the dataset it defines (`name=` above is unchanged).
    return (
        dlt.read("bronze_claims_stream")
        .filter(col("ClaimID").isNotNull())
        .dropDuplicates(["ClaimID"])
    )

# Silver table: providers — cleaned data with quality checks
@dlt.table(
    name="silver_providers",
    comment="Cleaned data with quality checks",
    table_properties={
        "quality": "silver",
        "pipelines.autoOptimize.managed": "true"
    }
)
def silver_providers():
    """Build the Silver providers table from ``bronze_providers``.

    Drops rows with a null ``ProviderID`` and keeps one row per ``ProviderID``.
    """
    # Previously declared as `def silver_claim`, a name already used by other
    # definitions in this cell. The function is now named after the dataset it
    # defines (`name=` above is unchanged).
    return (
        dlt.read("bronze_providers")
        .filter(col("ProviderID").isNotNull())
        .dropDuplicates(["ProviderID"])
    )

In [0]:
@dlt.table(
    name="gold_fraud_detection",
    comment="Filtered and joined data for fraud detection",
    table_properties={
        "quality": "gold",
        "pipelines.autoOptimize.managed": "true"
    }
)
def gold_fraud_detection():
    """Flag claims whose claim date is more than one day before the service date.

    Steps:
      1. Split each claim's comma-separated ``ICD10Codes`` into one row per
         code (``ICD10Code``), trimming surrounding whitespace.
      2. Keep rows where ``ServiceDate - ClaimDate > 1`` day (day granularity,
         so at least two calendar days).
      3. Inner-join members on ``MemberID`` and the diagnosis reference on
         ``ICD10Code == code``. Claims without a matching member or code are dropped.
    """
    from pyspark.sql.functions import col, datediff, explode, split, trim

    # Read via dlt.read, as the Silver tables do, so the pipeline tracks the
    # Silver -> Gold dependency and resolves its own target schema. The previous
    # code read hard-coded "capstone_hospital.default.*" tables instead.
    silver_claim = dlt.read("silver_claim")
    silver_diagnosis = dlt.read("silver_diagnosis")
    silver_members = dlt.read("silver_members")

    # One row per diagnosis code. Generators cannot be nested inside trim(), so
    # explode first and trim in a second step ("A01, B02" -> "A01", "B02").
    exploded_claims = (
        silver_claim
        .withColumn("ICD10Code", explode(split(col("ICD10Codes"), ",")))
        .withColumn("ICD10Code", trim(col("ICD10Code")))
    )

    # Claim filed more than 24 hours BEFORE the service date. datediff(end, start)
    # returns end - start in days, so ServiceDate must come first.
    # NOTE(review): the previous datediff(ClaimDate, ServiceDate) > 1 selected the
    # opposite case (claims filed more than a day AFTER service); confirm the rule.
    filtered_claims = exploded_claims.filter(
        datediff(col("ServiceDate"), col("ClaimDate")) > 1
    )

    result = (
        filtered_claims
        .join(silver_members, "MemberID")
        .join(silver_diagnosis, filtered_claims["ICD10Code"] == silver_diagnosis["code"])
    )
    return result