In [0]:
# Cell 1: ── Configure S3A credentials ─────────────────────────────────────────
# Credentials are taken from the environment instead of being hardcoded in the
# notebook. A key pair that has ever been committed or shared inside a notebook
# should be treated as exposed and rotated.
# Raises KeyError immediately if either variable is unset, rather than failing
# later with an opaque S3 access error.
import os

_hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
_hadoop_conf.set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
_hadoop_conf.set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])


In [0]:
# Cell 2: ── Define input / output paths ────────────────────────────────────────
# Raw inputs and use-case outputs live under the same bucket and differ only
# in their top-level prefix.
_BUCKET_ROOT = "s3a://cpastonebucket111"
INPUT_BASE, OUTPUT_BASE = f"{_BUCKET_ROOT}/inputdata", f"{_BUCKET_ROOT}/outputdata"


In [0]:
# Cell 3: ── Load raw tables ─────────────────────────────────────────────────────
def _read_csv(file_name):
    """Load one headered CSV from INPUT_BASE with inferred column types.

    Header names are stripped of surrounding whitespace. disease.csv ships a
    ' Disease_ID' header (leading space), which would otherwise have to be
    referenced with the stray space included.

    NOTE(review): the CSVs appear to use the literal string "NaN" for missing
    values (e.g. the hospital state column); that is not normalised here.
    """
    raw = spark.read.csv(f"{INPUT_BASE}/{file_name}", header=True, inferSchema=True)
    return raw.toDF(*[column.strip() for column in raw.columns])


disease_df     = _read_csv("disease.csv")
claims_df      = spark.read.json(f"{INPUT_BASE}/claims.json")
grpsubgrp_df   = _read_csv("grpsubgrp.csv")
group_df       = _read_csv("group.csv")
hospital_df    = _read_csv("hospital.csv")
patient_df     = _read_csv("Patient_records.csv")
subgroup_df    = _read_csv("subgroup.csv")


In [0]:
# Cell 4: ── Inspect schemas & sample data ──────────────────────────────────────
# For each table of interest, print its inferred schema followed by the first
# three rows with full (untruncated) cell contents.
for name, df in {
    "disease":  disease_df,
    "claims":   claims_df,
    "hospital": hospital_df,
}.items():
    print(f"--- {name} schema ---")
    df.printSchema()
    df.show(3, truncate=False)


--- disease schema ---
root
 |-- SubGrpID: string (nullable = true)
 |--  Disease_ID: integer (nullable = true)
 |-- Disease_name: string (nullable = true)

+--------+-----------+------------+
|SubGrpID| Disease_ID|Disease_name|
+--------+-----------+------------+
|S101    |110001     |Beriberi    |
|S101    |110002     |Scurvy      |
|S101    |110003     |Goitre      |
+--------+-----------+------------+
only showing top 3 rows

--- claims schema ---
root
 |-- Claim_Or_Rejected: string (nullable = true)
 |-- SUB_ID: string (nullable = true)
 |-- claim_amount: string (nullable = true)
 |-- claim_date: string (nullable = true)
 |-- claim_id: long (nullable = true)
 |-- claim_type: string (nullable = true)
 |-- disease_name: string (nullable = true)
 |-- patient_id: long (nullable = true)

+-----------------+----------+------------+----------+--------+----------------+--------------+----------+
|Claim_Or_Rejected|SUB_ID    |claim_amount|claim_date|claim_id|claim_type      |disease_name  

In [0]:
# Cell 5: ── Clean Claims ───────────────────────────────────────────────────
from pyspark.sql.functions import col

# Keep one copy of each claim that has an id, a disease name and an integer
# claim amount; rename the rejection flag column.
claims_clean = (
  claims_df
    .dropDuplicates()
    # The feed writes missing values as the literal string "NaN" (see the
    # Claim_Or_Rejected preview). na.drop only removes real nulls from string
    # columns, so turn that marker into null in the key text column first.
    .replace("NaN", None, subset=["disease_name"])
    .na.drop(subset=["claim_id","disease_name","claim_amount"])
    .withColumn("claim_amount", col("claim_amount").cast("int"))
    # claim_amount is a string in claims.json. Values that are not valid
    # integers become null on the cast above, after the first null filter has
    # already run, so drop them here as well.
    .na.drop(subset=["claim_amount"])
    .withColumnRenamed("Claim_Or_Rejected","claim_rejected_flag")
)

# Quick peek
claims_clean.show(3, truncate=False)


+-------------------+----------+------------+----------+--------+----------------+--------------+----------+
|claim_rejected_flag|SUB_ID    |claim_amount|claim_date|claim_id|claim_type      |disease_name  |patient_id|
+-------------------+----------+------------+----------+--------+----------------+--------------+----------+
|NaN                |SUBID10001|151142      |1970-03-16|1       |claims of policy|Bladder cancer|112766    |
|NaN                |SUBID10003|143120      |1995-02-08|3       |claims of fact  |Suicide       |133424    |
|NaN                |SUBID10002|59924       |2008-02-03|2       |claims of value |Kidney cancer |199252    |
+-------------------+----------+------------+----------+--------+----------------+--------------+----------+
only showing top 3 rows



In [0]:
# Cell 6: ── Use-Case 1: Disease with MAX number of claims ─────────────────
from pyspark.sql.functions import col

# Count claims per disease and keep the top one.
uc1 = (
  claims_clean
    .groupBy("disease_name")
    .count()
    # Secondary sort on the name makes the row kept by limit(1) deterministic
    # when several diseases share the maximum count; without it the winner
    # depends on partition order and can change between runs.
    .orderBy(col("count").desc(), col("disease_name").asc())
    .limit(1)
)

# Show & write
uc1.show()
uc1.write.mode("overwrite") \
    .parquet(f"{OUTPUT_BASE}/uc01_max_claims_by_disease")


+------------+-----+
|disease_name|count|
+------------+-----+
| Pet allergy|    3|
+------------+-----+



In [0]:
# Cell 7: ── Clean Hospital & Patient ──────────────────────────────────────
from pyspark.sql.functions import col

# Hospitals: distinct rows with a usable hospital id; lower-case column names.
hospital_clean = (
  hospital_df
    .dropDuplicates()
    # The CSVs encode missing values as the literal string "NaN" (see the
    # state column in the preview), which na.drop does not treat as missing
    # in string columns.
    .replace("NaN", None, subset=["Hospital_id"])
    .na.drop(subset=["Hospital_id"])
    .withColumnRenamed("Hospital_id","hospital_id")
    .withColumnRenamed("Hospital_name","hospital_name")
)

# Patients: distinct rows that have both a patient id and a hospital id.
patient_clean = (
  patient_df
    .dropDuplicates()
    .replace("NaN", None, subset=["hospital_id"])
    .na.drop(subset=["Patient_id","hospital_id"])
    .withColumnRenamed("Patient_id","patient_id")
    .withColumn("patient_id", col("patient_id").cast("int"))
    # Ids that cannot be cast to int become null after the filter above; drop
    # them so null-keyed patients never reach the per-hospital aggregation.
    .na.drop(subset=["patient_id"])
)

# Peek
hospital_clean.show(2,truncate=False)
patient_clean.show(3,truncate=False)


+-----------+---------------------------------------+---------+----------+-------+
|hospital_id|hospital_name                          |city     |state     |country|
+-----------+---------------------------------------+---------+----------+-------+
|H1002      |The Christian Medical College          |Vellore  |Tamil Nadu|India  |
|H1000      |All India Institute of Medical Sciences|New Delhi|NaN       |India  |
+-----------+---------------------------------------+---------+----------+-------+
only showing top 2 rows

+----------+------------+--------------+------------------+--------------+-------------+------------+-----------+
|patient_id|Patient_name|patient_gender|patient_birth_date|patient_phone |disease_name |city        |hospital_id|
+----------+------------+--------------+------------------+--------------+-------------+------------+-----------+
|187158    |Harbir      |Female        |1924-06-30        |+91 0112009318|Galactosemia |Rourkela    |H1001      |
|199252    |Ujjawal  

In [0]:
# Cell 8: ── Use-Case 2: Hospital serving MOST patients ────────────────────
from pyspark.sql.functions import col, countDistinct

# Name lookup with at most one row per hospital_id. hospital_clean is only
# de-duplicated on whole rows, so an id listed twice would otherwise fan the
# single top hospital out into several output rows. (If names differ for the
# same id, an arbitrary one is kept.)
hospital_names = (
  hospital_clean
    .select("hospital_id", "hospital_name")
    .dropDuplicates(["hospital_id"])
)

uc2 = (
  patient_clean
    .groupBy("hospital_id")
    .agg(countDistinct("patient_id").alias("num_patients"))
    # Tie-break on hospital_id so limit(1) returns the same hospital every run.
    .orderBy(col("num_patients").desc(), col("hospital_id").asc())
    .limit(1)
    # Join after limiting so only the winning row is enriched with its name.
    .join(hospital_names, on="hospital_id", how="left")
    .select("hospital_id","hospital_name","num_patients")
)

# Show & write
uc2.show()
uc2.write.mode("overwrite") \
    .parquet(f"{OUTPUT_BASE}/uc02_hospital_most_patients")


+-----------+-----------------+------------+
|hospital_id|    hospital_name|num_patients|
+-----------+-----------------+------------+
|      H1017|Manipal Hospitals|           9|
+-----------+-----------------+------------+

