In [0]:
%run /Workspace/Users/marutidodmani.cloud02@gmail.com/generic_connecters

In [0]:
%run /Workspace/Users/marutidodmani.cloud02@gmail.com/generic_transformation

In [0]:
# Spark SQL functions used throughout the notebook. The aggregation cells use the
# namespaced form (F.sum, F.count, ...) so they never depend on a star import that
# shadows Python builtins such as sum/min/max.
from pyspark.sql import functions as F
from pyspark.sql.functions import col

In [0]:
# Set up the Spark connection for this notebook. `spark_connector` is not defined here;
# presumably it comes from the generic_connecters notebook loaded via %run — confirm
# there what it configures (session settings, storage credentials, ...).
spark_connector()

In [0]:
# Load every Silver-layer dataset that the Gold star schema below is built from.
# `read_silver_csv_data` is not defined in this notebook (presumably supplied by one of
# the %run helper notebooks). Tables are read in the order listed, and each result is
# bound to the variable in the same position on the left-hand side.
# (`subscribe_data` keeps its historical name because later cells refer to it.)
(
    patient_data,
    hospital_data,
    claims_data,
    disease_data,
    group_data,
    subgroup_data,
    subscribe_data,
) = (
    read_silver_csv_data(silver_table)
    for silver_table in (
        "patient_S",
        "hospital_S",
        "claims_S",
        "disease_S",
        "group_S",
        "subgroup_S",
        "subscriber_S",
    )
)

In [0]:
# Subscriber dimension: `sub_id` becomes the surrogate SubscriberKey, followed by the
# subscriber's descriptive attributes in their original order.
# dropDuplicates keeps a single row per SubscriberKey; which duplicate survives is not
# specified by Spark.
dim_subscriber = (
    subscribe_data
    .select(
        col("sub_id").alias("SubscriberKey"),
        *[
            col(column_name)
            for column_name in (
                "first_name",
                "last_name",
                "Street",
                "Gender",
                "Country",
                "City",
                "Zip_Code",
                "Subgrp_id",
                "Elig_ind",
                "eff_date",
                "term_date",
                "Patient_age",
            )
        ],
    )
    .dropDuplicates(["SubscriberKey"])
)

In [0]:
# Subgroup dimension keyed by subgroup id, carrying the subgroup name and monthly premium.
# One row is kept per SubgroupKey; which duplicate survives is not specified.
dim_subgroup = (
    subgroup_data
    .select(
        col("subgrp_id").alias("SubgroupKey"),
        *(col(attribute) for attribute in ("subgrp_name", "monthly_premium")),
    )
    .dropDuplicates(["SubgroupKey"])
)


In [0]:
# Patient dimension keyed by patient id. `hospital_id` is retained because the fact
# table resolves a claim's hospital through the patient record.
# One row is kept per PatientKey; which duplicate survives is not specified.
dim_patient = (
    patient_data
    .select(
        col("patient_id").alias("PatientKey"),
        *[
            col(field)
            for field in (
                "patient_name",
                "patient_gender",
                "disease_name",
                "city",
                "hospital_id",
                "patient_age",
            )
        ],
    )
    .dropDuplicates(["PatientKey"])
)


In [0]:
# Hospital dimension keyed by hospital id, with its name and location.
# One row is kept per HospitalKey; which duplicate survives is not specified.
dim_hospital = hospital_data.select(
    [col("hospital_id").alias("HospitalKey")]
    + [col(name) for name in ("hospital_name", "city", "state", "country")]
).dropDuplicates(["HospitalKey"])


In [0]:
# Group dimension keyed by group id (`grp_id`), with type, location and written premium.
# One row is kept per GroupKey; which duplicate survives is not specified.
dim_group = group_data.select(
    [col("grp_id").alias("GroupKey")]
    + [
        col(name)
        for name in (
            "grp_name",
            "grp_type",
            "country",
            "city",
            "premium_written",
            "zip_code",
        )
    ]
).dropDuplicates(["GroupKey"])


In [0]:
# Disease dimension keyed by disease id. Note it is de-duplicated on DiseaseKey only;
# `disease_name` is not guaranteed unique here.
dim_disease = (
    disease_data
    .select(
        col("disease_id").alias("DiseaseKey"),
        *(col(attribute) for attribute in ("disease_name", "subgrp_id")),
    )
    .dropDuplicates(["DiseaseKey"])
)


In [0]:
# Claim-grain fact table: each Silver claim enriched with the surrogate keys of its
# subscriber, patient, disease, hospital and subgroup. Every join is a left join, so a
# claim without a matching dimension row is kept with a null key instead of dropped.
#
# Claims reference diseases by *name*, but dim_disease is only de-duplicated on
# DiseaseKey. Joining it on name directly would emit one row per disease id sharing that
# name, duplicating the claim and inflating every downstream count and sum. Collapse it
# first to a single, deterministic key per name (the minimum DiseaseKey).
disease_key_by_name = (
    dim_disease
    .groupBy("disease_name")
    .agg(F.min("DiseaseKey").alias("DiseaseKey"))
)

fact_claims = (
    claims_data.alias("c")
    .join(dim_subscriber.alias("s"), col("c.sub_id") == col("s.SubscriberKey"), "left")
    .join(dim_patient.alias("p"), col("c.patient_id") == col("p.PatientKey"), "left")
    .join(
        disease_key_by_name.alias("d"),
        col("c.disease_name") == col("d.disease_name"),
        "left",
    )
    # The hospital is resolved through the patient record, not from the claim itself.
    .join(dim_hospital.alias("h"), col("p.hospital_id") == col("h.HospitalKey"), "left")
    # The subgroup is resolved through the subscriber record.
    .join(dim_subgroup.alias("sg"), col("s.Subgrp_id") == col("sg.SubgroupKey"), "left")
    .select(
        col("c.claim_id").alias("ClaimKey"),
        col("s.SubscriberKey"),
        col("p.PatientKey"),
        col("h.HospitalKey"),
        col("sg.SubgroupKey"),
        col("d.DiseaseKey"),
        col("c.claim_amount"),
        col("c.claim_date"),
        col("c.claim_type"),
        col("c.Claim_Or_Rejected"),
    )
)


In [0]:
# Render the claim-grain fact table in the notebook output for inspection.
display(fact_claims)

In [0]:
# Number of claims and total claimed amount per claim status (Claim_Or_Rejected).
# Namespaced F.count / F.sum are used deliberately: a bare `sum` only works if a pyspark
# star import happens to be in scope; otherwise it is Python's builtin sum, which fails
# on a column name. count() counts non-null ClaimKey values.
claim_status_summary = (
    fact_claims
    .groupBy("Claim_Or_Rejected")
    .agg(
        F.count("ClaimKey").alias("claim_count"),
        F.sum("claim_amount").alias("total_amount"),
    )
)


In [0]:
# Claim volume and total claimed amount per calendar month ("yyyy-MM" bucket of
# claim_date). Uses namespaced Spark functions so the cell does not depend on a star
# import being in scope (a bare `sum` would otherwise be Python's builtin).
# NOTE(review): if claim_date arrives from Silver as a string in a non-ISO format, Spark
# cannot parse it and the month bucket will be null — confirm the Silver schema.
monthly_claims = (
    fact_claims
    .withColumn("month", F.date_format("claim_date", "yyyy-MM"))
    .groupBy("month")
    .agg(
        F.count("ClaimKey").alias("claim_count"),
        F.sum("claim_amount").alias("total_claim_amount"),
    )
)


In [0]:
# Mean claimed amount per disease. Claims whose disease did not match dim_disease
# (left join in fact_claims) are grouped together under a null DiseaseKey.
# Uses the namespaced F.avg so the cell does not depend on a star import being in scope.
avg_claim_per_disease = (
    fact_claims
    .groupBy("DiseaseKey")
    .agg(F.avg("claim_amount").alias("avg_claim_amount"))
)


In [0]:
# Flattened Gold table: the claim fact joined back to every dimension, so consumers get
# descriptive attributes without writing the joins themselves. Attribute names that
# would otherwise collide (city, country, gender, age, zip, ...) are prefixed with the
# dimension they come from.
# No star import here: `from pyspark.sql.functions import *` mid-notebook would shadow
# Python builtins (sum, min, max, ...) for every later cell. `col` is expected in scope,
# as every earlier cell already relies on it.
gold_df = (
    fact_claims.alias("f")
    .join(dim_subscriber.alias("s"), col("f.SubscriberKey") == col("s.SubscriberKey"), "left")
    .join(dim_patient.alias("p"), col("f.PatientKey") == col("p.PatientKey"), "left")
    .join(dim_hospital.alias("h"), col("f.HospitalKey") == col("h.HospitalKey"), "left")
    .join(dim_subgroup.alias("sg"), col("f.SubgroupKey") == col("sg.SubgroupKey"), "left")
    .join(dim_disease.alias("d"), col("f.DiseaseKey") == col("d.DiseaseKey"), "left")
    # FIXME(review): this matches a *subgroup* id against a *group* id. Unless the two
    # id spaces coincide, every g.* column below will be null. A subgroup -> group
    # mapping (not read in this notebook) is presumably needed here — confirm against
    # the source data before relying on the Group* columns.
    .join(dim_group.alias("g"), col("sg.SubgroupKey") == col("g.GroupKey"), "left")
    .select(
        # Claim facts
        col("f.ClaimKey"),
        col("f.claim_date"),
        col("f.claim_type"),
        col("f.claim_amount"),
        col("f.Claim_Or_Rejected"),
        # Subscriber attributes
        col("s.SubscriberKey"),
        col("s.first_name").alias("SubscriberFirstName"),
        col("s.last_name").alias("SubscriberLastName"),
        col("s.Street"),
        col("s.Gender").alias("SubscriberGender"),
        col("s.City").alias("SubscriberCity"),
        col("s.Country").alias("SubscriberCountry"),
        col("s.Zip_Code"),
        col("s.Elig_ind"),
        col("s.eff_date"),
        col("s.term_date"),
        col("s.Patient_age").alias("SubscriberAge"),
        # Patient attributes
        col("p.PatientKey"),
        col("p.patient_name"),
        col("p.patient_gender"),
        col("p.patient_age"),
        col("p.disease_name").alias("PatientDisease"),
        # Hospital attributes
        col("h.HospitalKey"),
        col("h.hospital_name"),
        col("h.city").alias("HospitalCity"),
        col("h.state").alias("HospitalState"),
        col("h.country").alias("HospitalCountry"),
        # Subgroup attributes
        col("sg.SubgroupKey"),
        col("sg.subgrp_name"),
        col("sg.monthly_premium"),
        # Group attributes (see FIXME on the group join above)
        col("g.GroupKey"),
        col("g.grp_name"),
        col("g.grp_type"),
        col("g.country").alias("GroupCountry"),
        col("g.city").alias("GroupCity"),
        col("g.premium_written"),
        col("g.zip_code").alias("GroupZipCode"),
    )
)

In [0]:
# Render the flattened Gold table for inspection.
# NOTE(review): this output includes patient and subscriber names and street addresses —
# clear or redact the rendered output before sharing or committing the notebook.
display(gold_df)

In [0]:
# Persist each dimension, the claim fact and the flattened table through the
# `write2gold` helper. The helper is not defined in this notebook; presumably it comes
# from one of the %run helper notebooks. Where the "_G" names end up (storage path
# vs. table) and the write mode are decided by that helper — confirm there.
write2gold (dim_patient,"dim_patient_G")
write2gold (dim_hospital,"dim_hospital_G")
write2gold (dim_subscriber,"dim_subscriber_G")
write2gold (dim_group,"dim_group_G")
write2gold (dim_subgroup,"dim_subgroup_G")
write2gold (dim_disease,"dim_disease_G")
write2gold (fact_claims,"fact_claims_G")
write2gold (gold_df,"Optum_G")

In [0]:
# Register each Gold frame as a Delta table in the optum.Gold schema, replacing any
# previous contents (mode "overwrite"). Frames are written in the order listed.
# NOTE(review): the patient table is `dim_patient_data` while the others carry no
# `_data` suffix — confirm before renaming; downstream readers may depend on it.
# NOTE(review): the write2gold cell also writes these same frames (to wherever that
# helper targets); confirm both copies are actually needed.
for frame, table_name in (
    (dim_patient, "optum.Gold.dim_patient_data"),
    (dim_hospital, "optum.Gold.dim_hospital"),
    (dim_subscriber, "optum.Gold.dim_subscriber"),
    (dim_group, "optum.Gold.dim_group"),
    (dim_subgroup, "optum.Gold.dim_subgroup"),
    (dim_disease, "optum.Gold.dim_disease"),
    (fact_claims, "optum.Gold.fact_claims"),
    (gold_df, "optum.Gold.optum_data"),
):
    frame.write.format("delta").mode("overwrite").saveAsTable(table_name)

In [0]:
# Render the mean claim amount per DiseaseKey computed earlier in the notebook.
# NOTE(review): in the visible notebook, claim_status_summary and monthly_claims are
# computed but never displayed or persisted — display/write them or remove those cells.
display(avg_claim_per_disease)