In [0]:
# ────────────────────────────────────────────────────────────────
# 0 · CONFIG & PATHS  ➜  edit volume_base if you used another name
# ────────────────────────────────────────────────────────────────
# Pin the shuffle partition count explicitly (200 is also Spark's default)
# so re-runs do not depend on cluster-level overrides.
spark.conf.set("spark.sql.shuffle.partitions", 200)

# Root Databricks volume that holds the raw CSV and every derived output.
volume_base = "/Volumes/workspace/default/my_volume"   # <— CHANGE if needed

# Everything below is derived from volume_base.
file_path, export_base, partitioned_path = (
    f"{volume_base}/diabetic_data.csv",   # raw input CSV read in section 1
    f"{volume_base}/exports",             # root for the CSV summaries of section 6
    f"{volume_base}/diabetic_delta",      # NOTE(review): not referenced by the cells shown
)


In [0]:
# ────────────────────────────────────────────────────────────────
# 1 · DATA INGESTION
# ────────────────────────────────────────────────────────────────
# header=True takes column names from the first row; inferSchema=True makes
# Spark scan the data to choose column types (printed schema shown below).
raw_df = spark.read.csv(file_path, header=True, inferSchema=True)

# Sanity check: inferred schema plus the first five rows.
raw_df.printSchema()
raw_df.show(5)


root
 |-- encounter_id: integer (nullable = true)
 |-- patient_nbr: integer (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- weight: string (nullable = true)
 |-- admission_type_id: integer (nullable = true)
 |-- discharge_disposition_id: integer (nullable = true)
 |-- admission_source_id: integer (nullable = true)
 |-- time_in_hospital: integer (nullable = true)
 |-- payer_code: string (nullable = true)
 |-- medical_specialty: string (nullable = true)
 |-- num_lab_procedures: integer (nullable = true)
 |-- num_procedures: integer (nullable = true)
 |-- num_medications: integer (nullable = true)
 |-- number_outpatient: integer (nullable = true)
 |-- number_emergency: integer (nullable = true)
 |-- number_inpatient: integer (nullable = true)
 |-- diag_1: string (nullable = true)
 |-- diag_2: string (nullable = true)
 |-- diag_3: string (nullable = true)
 |-- number_diagnoses: integer (nullable = true)
 |-

In [0]:
# ────────────────────────────────────────────────────────────────
# 2 · DATA CLEANSING
# ────────────────────────────────────────────────────────────────
from pyspark.sql import functions as F

# Missing values arrive as the literal string "?" in string columns.
string_cols = [c for c, t in raw_df.dtypes if t == "string"]

clean_df = (
    raw_df
    # Turn "?" into a real NULL in every string column. Iterate over
    # raw_df.columns so the source column order (and therefore the schema of
    # the Delta table written later) is preserved.
    .select(*[
        F.when(F.col(c) == "?", None).otherwise(F.col(c)).alias(c)
        if c in string_cols else F.col(c)
        for c in raw_df.columns
    ])
    # Drop only the "Unknown/Invalid" gender sentinel. eqNullSafe yields
    # False (not NULL) for a NULL gender, so those rows are kept. With a
    # plain isin/negation they evaluated to NULL and filter() discarded them.
    .filter(~F.col("gender").eqNullSafe("Unknown/Invalid"))
)


In [0]:
# ────────────────────────────────────────────────────────────────
# 3 · DATA TRANSFORMATION
# ────────────────────────────────────────────────────────────────
# Human-readable labels for admission_type_id. Note that id 6 maps to the
# literal string "NULL", not to a SQL NULL.
admission_type_map = {
    1: "Emergency",
    2: "Urgent",
    3: "Elective",
    4: "Newborn",
    5: "Not Available",
    6: "NULL",
    7: "Trauma Center",
    8: "Not Mapped",
}

# create_map expects alternating key/value columns: lit(k1), lit(v1), lit(k2), ...
map_expr = F.create_map(*[F.lit(item)
                          for pair in admission_type_map.items()
                          for item in pair])

tf_df = (
    clean_df
    # Label lookup keyed by the integer id.
    # NOTE(review): result for ids missing from the map depends on Spark/ANSI settings.
    .withColumn("admission_type", map_expr[F.col("admission_type_id")])
    # Total prior visits; a NULL in any of the three counts makes the sum NULL.
    .withColumn("total_visits",
                F.col("number_outpatient") + F.col("number_emergency") + F.col("number_inpatient"))
    # 1 for any readmission ("<30" or ">30"); 0 for every other value, NULL included.
    .withColumn("readmission_flag",
                F.when(F.col("readmitted").isin("<30", ">30"), 1).otherwise(0))
)


In [0]:
# ────────────────────────────────────────────────────────────────
# 4 · WRITE TO DELTA + REGISTER TABLE
# ────────────────────────────────────────────────────────────────
# Drop first so a change in schema or partitioning between runs cannot make
# the overwrite below fail; together they give a clean slate on every run.
spark.sql("DROP TABLE IF EXISTS workspace.default.diabetic_clean")   # clean slate

# readmission_flag is 0/1, so the table has at most two partition directories.
tf_df.write.saveAsTable(
    "workspace.default.diabetic_clean",
    format="delta",
    mode="overwrite",
    partitionBy="readmission_flag",
)


In [0]:
# ────────────────────────────────────────────────────────────────
# 5 · ANALYTICAL SUMMARIES
# ────────────────────────────────────────────────────────────────
# Minimum number of encounters a primary diagnosis needs before its
# readmission rate is reported (small groups give noisy rates).
MIN_DIAG1_CASES = 500


def readmit_rate_by(df, *group_cols, min_cases=0):
    """Readmission rate and encounter count per group, highest rate first.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain ``readmission_flag`` and every column in ``group_cols``.
    *group_cols : str
        Columns to group by.
    min_cases : int, optional
        If non-zero, keep only groups with at least this many rows.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``*group_cols``, ``readmit_rate`` (mean of readmission_flag)
        and ``cases`` (row count), sorted by ``readmit_rate`` descending.
    """
    summary = (df.groupBy(*group_cols)
                 .agg(F.mean("readmission_flag").alias("readmit_rate"),
                      F.count("*").alias("cases")))
    if min_cases:
        summary = summary.filter(F.col("cases") >= min_cases)
    # Secondary sort on the group keys makes the order of tied rates
    # deterministic, so the exported CSVs are stable between runs.
    return summary.orderBy(F.desc("readmit_rate"), *group_cols)


# Mean length of stay per age bucket, ordered by the age string.
# NOTE(review): relies on lexicographic order of the age labels matching
# numeric order — confirm against the data.
los_by_age = (tf_df.groupBy("age")
                   .agg(F.round(F.avg("time_in_hospital"), 2).alias("avg_days_in_hosp"))
                   .orderBy("age"))

# NOTE(review): rows whose diag_1 was "?" (now NULL) form their own group here.
diag1_rates = readmit_rate_by(tf_df, "diag_1", min_cases=MIN_DIAG1_CASES)

insulin_a1c = readmit_rate_by(tf_df, "insulin", "A1Cresult")


In [0]:
# ────────────────────────────────────────────────────────────────
# 6 · EXPORT SUMMARIES AS CSV (inside the same volume)
# ────────────────────────────────────────────────────────────────
# Each target path is a directory. coalesce(1) collapses the (small)
# summary into one partition, so each directory gets a single part-*.csv file.
summaries_to_export = {
    "los_by_age": los_by_age,
    "diag1_rates": diag1_rates,
    "insulin_a1c": insulin_a1c,
}

for export_name, summary_df in summaries_to_export.items():
    (summary_df.coalesce(1)
               .write
               .mode("overwrite")
               .option("header", True)
               .csv(f"{export_base}/{export_name}"))
