In [0]:
from pyspark.sql import SparkSession, functions as f

# Use the active SparkSession explicitly instead of relying on a notebook-injected
# `spark` global: on Databricks getOrCreate() returns the existing session, and
# elsewhere it builds one, so this cell also survives a fresh kernel.
spark = SparkSession.builder.getOrCreate()

# Raw landing files. The header row supplies column names; inferSchema is not set,
# so Spark reads every column as a string.
claims_df = spark.read.csv("/mnt/landing/claimsdata/*.csv", header=True)

# Tag each row with the hospital it came from, derived from its source file path.
# A file whose path matches neither pattern yields a NULL datasource.
source_file = f.input_file_name()
claims_df = claims_df.withColumn(
    "datasource",
    f.when(source_file.contains("hospital1"), "hosa")
     .when(source_file.contains("hospital2"), "hosb")
     .otherwise(None),
)

In [0]:
# Persist the raw claims (including the datasource tag) to the bronze layer as
# Parquet, replacing whatever a previous run wrote to that path.
claims_df.write.mode("overwrite").parquet("/mnt/bronze/claims/")

In [0]:
# Make the tagged raw claims queryable from SQL in this session as `claims`.
claims_df.createOrReplaceTempView(name="claims")

In [0]:
%sql
-- Conform raw claims to the silver.claims column layout and flag rows that fail
-- basic data-quality rules via is_quarantined.
CREATE OR REPLACE TEMP VIEW data_quality_check AS
SELECT
  -- Surrogate key that stays unique across hospitals. CONCAT returns NULL when any
  -- argument is NULL, so a missing ClaimID or datasource makes this key NULL.
  CONCAT(ClaimID, '-', datasource) AS ClaimID,
  ClaimID AS SRC_ClaimID,
  TransactionID,
  PatientID,
  EncounterID,
  ProviderID,
  DeptID,
  CAST(ServiceDate AS DATE) AS ServiceDate,
  CAST(ClaimDate AS DATE) AS ClaimDate,
  PayorID,
  ClaimAmount,
  PaidAmount,
  ClaimStatus,
  PayorType,
  Deductible,
  Coinsurance,
  Copay,
  CAST(InsertDate AS DATE) AS SRC_InsertDate,
  CAST(ModifiedDate AS DATE) AS SRC_ModifiedDate,
  datasource,
  CASE
    -- Quarantine rows missing a required field. datasource is required because a
    -- NULL datasource nulls the surrogate ClaimID above. ServiceDate is checked
    -- after the DATE cast, so a present-but-unconvertible value (CAST yields NULL
    -- with ANSI mode off) is caught as well as a missing one.
    WHEN ClaimID IS NULL
      OR datasource IS NULL
      OR TransactionID IS NULL
      OR PatientID IS NULL
      OR CAST(ServiceDate AS DATE) IS NULL THEN TRUE
    ELSE FALSE
  END AS is_quarantined
FROM claims;


In [0]:
%sql
-- Spot-check ten conformed rows; with no ORDER BY, which ten is arbitrary.
SELECT *
FROM data_quality_check
LIMIT 10

ClaimID,SRC_ClaimID,TransactionID,PatientID,EncounterID,ProviderID,DeptID,ServiceDate,ClaimDate,PayorID,ClaimAmount,PaidAmount,ClaimStatus,PayorType,Deductible,Coinsurance,Copay,SRC_InsertDate,SRC_ModifiedDate,datasource,is_quarantined
CLAIM000001-hosb,CLAIM000001,TRANS001819,HOSP1-001494,ENC001805,PROV0242,DEPT007,2024-08-26,2024-10-05,UnitedHealthcare,4600.39,3228.9,Approved,Government,101.31,99.56,45.99,2023-01-13,2020-01-10,hosb,False
CLAIM000002-hosb,CLAIM000002,TRANS003384,HOSP1-000208,ENC001914,PROV0487,DEPT002,2024-05-03,2024-04-23,Medicare,4987.94,3969.49,Paid,Private,125.13,96.39,30.87,2024-10-03,2023-02-08,hosb,False
CLAIM000003-hosb,CLAIM000003,TRANS006265,HOSP1-004702,ENC002918,PROV0302,DEPT002,2024-04-04,2024-08-19,Medicaid,421.04,2023.9,Denied,Private,42.43,3.14,37.67,2024-03-06,2020-05-25,hosb,False
CLAIM000004-hosb,CLAIM000004,TRANS008629,HOSP1-002778,ENC000910,PROV0498,DEPT015,2024-04-21,2024-10-15,Aetna,1475.53,3557.28,Rejected,Private,60.78,166.32,42.04,2020-01-17,2022-01-16,hosb,False
CLAIM000005-hosb,CLAIM000005,TRANS007129,HOSP1-000436,ENC008163,PROV0004,DEPT012,2024-03-09,2024-01-05,UnitedHealthcare,2019.09,1546.3,Approved,Self-pay,195.19,102.23,6.64,2023-02-25,2022-05-24,hosb,False
CLAIM000006-hosb,CLAIM000006,TRANS009296,HOSP1-001684,ENC005220,PROV0045,DEPT017,2024-02-28,2024-01-16,Aetna,4019.34,2527.53,Rejected,Private,130.93,189.18,49.97,2021-08-29,2020-02-24,hosb,False
CLAIM000007-hosb,CLAIM000007,TRANS003160,HOSP1-000678,ENC002566,PROV0476,DEPT015,2024-02-11,2024-02-25,UnitedHealthcare,3354.98,4352.07,Rejected,Government,59.69,121.43,17.11,2020-02-08,2024-06-23,hosb,False
CLAIM000008-hosb,CLAIM000008,TRANS002785,HOSP1-003734,ENC007357,PROV0435,DEPT019,2024-06-29,2024-10-03,BlueCross,4922.57,3273.42,Rejected,Government,396.21,120.76,35.28,2021-04-06,2020-11-19,hosb,False
CLAIM000009-hosb,CLAIM000009,TRANS003904,HOSP1-002037,ENC002867,PROV0210,DEPT003,2024-06-11,2024-09-20,UnitedHealthcare,4961.89,2913.81,Paid,Private,409.01,59.76,39.75,2022-08-27,2024-10-28,hosb,False
CLAIM000010-hosb,CLAIM000010,TRANS009210,HOSP1-002245,ENC009378,PROV0387,DEPT017,2024-08-28,2024-07-19,UnitedHealthcare,259.14,2877.6,Rejected,Government,488.06,3.43,45.88,2020-12-11,2024-10-03,hosb,False


In [0]:
%sql
-- Versioned silver claims table. The MERGE in this notebook closes a version by
-- setting is_current = false and inserts new versions with is_current = true,
-- so one ClaimID can have several rows but at most one current row.
CREATE TABLE IF NOT EXISTS silver.claims (
  ClaimID STRING,            -- surrogate key: source ClaimID || '-' || datasource
  SRC_ClaimID STRING,        -- ClaimID as it appears in the source file
  TransactionID STRING,
  PatientID STRING,
  EncounterID STRING,
  ProviderID STRING,
  DeptID STRING,
  ServiceDate DATE,
  ClaimDate DATE,
  PayorID STRING,
  -- NOTE(review): the monetary fields below are stored as STRING, matching the
  -- untyped CSV read; consider DECIMAL if arithmetic or ordering is needed.
  ClaimAmount STRING,
  PaidAmount STRING,
  ClaimStatus STRING,
  PayorType STRING,
  Deductible STRING,
  Coinsurance STRING,
  Copay STRING,
  SRC_InsertDate DATE,       -- InsertDate from the source, cast to DATE
  SRC_ModifiedDate DATE,     -- ModifiedDate from the source, cast to DATE
  datasource STRING,         -- hospital tag derived from the landing file name
  is_quarantined BOOLEAN,    -- TRUE when the row failed the data-quality rules
  audit_insertdate TIMESTAMP,   -- set when this version row was inserted
  audit_modifieddate TIMESTAMP, -- set on insert and again when the version is closed
  is_current BOOLEAN         -- TRUE only for the live version of a ClaimID
)
USING DELTA;


In [0]:
%sql
-- SCD Type 2 load of data_quality_check into silver.claims in one atomic MERGE.
--
-- A MERGE applies at most one action per source row, so a changed claim cannot
-- both close its current version (WHEN MATCHED) and insert its new version
-- (WHEN NOT MATCHED) from the same row. The source is therefore staged as:
--   1. every incoming row with merge_key = ClaimID. This matches the current
--      target version (closed out if changed) or inserts a brand-new claim;
--   2. changed rows a second time with merge_key = NULL. A NULL key never
--      satisfies the ON condition, so these always insert the new current version.
-- Change detection uses the null-safe <=> so a column moving to or from NULL
-- counts as a change (plain != evaluates to NULL there and the change is missed).
-- NOTE(review): rows whose ClaimID is NULL never match and are therefore
-- inserted again on every run; the data_quality_check rules flag them as quarantined.
MERGE INTO silver.claims AS target
USING (
  SELECT ClaimID AS merge_key, * FROM data_quality_check

  UNION ALL

  SELECT NULL AS merge_key, s.*
  FROM data_quality_check AS s
  JOIN silver.claims AS t
    ON t.ClaimID = s.ClaimID AND t.is_current = true
  WHERE (
    NOT (t.SRC_ClaimID <=> s.SRC_ClaimID) OR
    NOT (t.TransactionID <=> s.TransactionID) OR
    NOT (t.PatientID <=> s.PatientID) OR
    NOT (t.EncounterID <=> s.EncounterID) OR
    NOT (t.ProviderID <=> s.ProviderID) OR
    NOT (t.DeptID <=> s.DeptID) OR
    NOT (t.ServiceDate <=> s.ServiceDate) OR
    NOT (t.ClaimDate <=> s.ClaimDate) OR
    NOT (t.PayorID <=> s.PayorID) OR
    NOT (t.ClaimAmount <=> s.ClaimAmount) OR
    NOT (t.PaidAmount <=> s.PaidAmount) OR
    NOT (t.ClaimStatus <=> s.ClaimStatus) OR
    NOT (t.PayorType <=> s.PayorType) OR
    NOT (t.Deductible <=> s.Deductible) OR
    NOT (t.Coinsurance <=> s.Coinsurance) OR
    NOT (t.Copay <=> s.Copay) OR
    NOT (t.SRC_InsertDate <=> s.SRC_InsertDate) OR
    NOT (t.SRC_ModifiedDate <=> s.SRC_ModifiedDate) OR
    NOT (t.datasource <=> s.datasource) OR
    NOT (t.is_quarantined <=> s.is_quarantined)
  )
) AS source
ON target.ClaimID = source.merge_key AND target.is_current = true

-- Step 1: the current version exists but the data changed -> close it out.
-- (The replacement row arrives via the NULL-keyed copy and is inserted in Step 2.)
WHEN MATCHED AND (
    NOT (target.SRC_ClaimID <=> source.SRC_ClaimID) OR
    NOT (target.TransactionID <=> source.TransactionID) OR
    NOT (target.PatientID <=> source.PatientID) OR
    NOT (target.EncounterID <=> source.EncounterID) OR
    NOT (target.ProviderID <=> source.ProviderID) OR
    NOT (target.DeptID <=> source.DeptID) OR
    NOT (target.ServiceDate <=> source.ServiceDate) OR
    NOT (target.ClaimDate <=> source.ClaimDate) OR
    NOT (target.PayorID <=> source.PayorID) OR
    NOT (target.ClaimAmount <=> source.ClaimAmount) OR
    NOT (target.PaidAmount <=> source.PaidAmount) OR
    NOT (target.ClaimStatus <=> source.ClaimStatus) OR
    NOT (target.PayorType <=> source.PayorType) OR
    NOT (target.Deductible <=> source.Deductible) OR
    NOT (target.Coinsurance <=> source.Coinsurance) OR
    NOT (target.Copay <=> source.Copay) OR
    NOT (target.SRC_InsertDate <=> source.SRC_InsertDate) OR
    NOT (target.SRC_ModifiedDate <=> source.SRC_ModifiedDate) OR
    NOT (target.datasource <=> source.datasource) OR
    NOT (target.is_quarantined <=> source.is_quarantined)
) THEN UPDATE SET
    target.is_current = false,
    target.audit_modifieddate = current_timestamp()

-- Step 2: no current version matched. This covers a brand-new ClaimID, or the
-- NULL-keyed copy of a changed claim, and inserts it as the new current version.
WHEN NOT MATCHED BY TARGET THEN
  INSERT (
    ClaimID,
    SRC_ClaimID,
    TransactionID,
    PatientID,
    EncounterID,
    ProviderID,
    DeptID,
    ServiceDate,
    ClaimDate,
    PayorID,
    ClaimAmount,
    PaidAmount,
    ClaimStatus,
    PayorType,
    Deductible,
    Coinsurance,
    Copay,
    SRC_InsertDate,
    SRC_ModifiedDate,
    datasource,
    is_quarantined,
    audit_insertdate,
    audit_modifieddate,
    is_current
  )
  VALUES (
    source.ClaimID,
    source.SRC_ClaimID,
    source.TransactionID,
    source.PatientID,
    source.EncounterID,
    source.ProviderID,
    source.DeptID,
    source.ServiceDate,
    source.ClaimDate,
    source.PayorID,
    source.ClaimAmount,
    source.PaidAmount,
    source.ClaimStatus,
    source.PayorType,
    source.Deductible,
    source.Coinsurance,
    source.Copay,
    source.SRC_InsertDate,
    source.SRC_ModifiedDate,
    source.datasource,
    source.is_quarantined,
    current_timestamp(),
    current_timestamp(),
    true
  );


num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
20000,0,0,20000
