In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
pd.set_option('display.max_columns', None)

In [3]:
from pyspark.sql import SparkSession

In [4]:
# create a spark session
spark = SparkSession.builder.appName("HospitalReadmissionEDA").getOrCreate()

In [5]:
# Load the Data into a Spark DataFrame
data_path = '../data/diabetic_data.csv'
df_spark = spark.read.csv(data_path, header=True)

# Initial Inspection with PySpark
print(f"Number of rows: {df_spark.count()}")
print(f"Number of columns: {len(df_spark.columns)}")

# Show the first 5 rows
df_spark.show(5)

# Print the schema that Spark inferred
df_spark.printSchema()

Number of rows: 101766
Number of columns: 50
+------------+-----------+---------------+------+-------+------+-----------------+------------------------+-------------------+----------------+----------+--------------------+------------------+--------------+---------------+-----------------+----------------+----------------+------+------+------+----------------+-------------+---------+---------+-----------+-----------+--------------+-----------+-------------+---------+---------+-----------+------------+-------------+--------+--------+------------+----------+-------+-----------+-------+-------------------+-------------------+------------------------+-----------------------+----------------------+------+-----------+----------+
|encounter_id|patient_nbr|           race|gender|    age|weight|admission_type_id|discharge_disposition_id|admission_source_id|time_in_hospital|payer_code|   medical_specialty|num_lab_procedures|num_procedures|num_medications|number_outpatient|number_emergency|number_

In [6]:
from pyspark.sql.functions import col, when

In [7]:
# create a list of all column names
all_columns = df_spark.columns

In [8]:
# loop through each column to replace? with null
df_cleaned = df_spark 
for column in all_columns:
    df_cleaned = df_cleaned.withColumn(column, 
                                      when(col(column) == "?", None).otherwise(col(column))
                                      )
print("Replaced '?' with null values.")

Replaced '?' with null values.


In [9]:
df_cleaned.show(5)

+------------+-----------+---------------+------+-------+------+-----------------+------------------------+-------------------+----------------+----------+--------------------+------------------+--------------+---------------+-----------------+----------------+----------------+------+------+------+----------------+-------------+---------+---------+-----------+-----------+--------------+-----------+-------------+---------+---------+-----------+------------+-------------+--------+--------+------------+----------+-------+-----------+-------+-------------------+-------------------+------------------------+-----------------------+----------------------+------+-----------+----------+
|encounter_id|patient_nbr|           race|gender|    age|weight|admission_type_id|discharge_disposition_id|admission_source_id|time_in_hospital|payer_code|   medical_specialty|num_lab_procedures|num_procedures|num_medications|number_outpatient|number_emergency|number_inpatient|diag_1|diag_2|diag_3|number_diagnos

In [10]:
df_cleaned.select('race').show(10)

+---------------+
|           race|
+---------------+
|      Caucasian|
|      Caucasian|
|AfricanAmerican|
|      Caucasian|
|      Caucasian|
|      Caucasian|
|      Caucasian|
|      Caucasian|
|      Caucasian|
|      Caucasian|
+---------------+
only showing top 10 rows


In [11]:
from pyspark.sql.functions import count, lit
# get total number of rows
total_rows = df_cleaned.count()
total_rows

101766

In [12]:
# calculate missing values count and percentage for each column
missing = df_cleaned.select([(count(when(col(c).isNull(), c))/total_rows*100).alias(c) for c in all_columns])
missing.toPandas()

Unnamed: 0,encounter_id,patient_nbr,race,gender,age,weight,admission_type_id,discharge_disposition_id,admission_source_id,time_in_hospital,payer_code,medical_specialty,num_lab_procedures,num_procedures,num_medications,number_outpatient,number_emergency,number_inpatient,diag_1,diag_2,diag_3,number_diagnoses,max_glu_serum,A1Cresult,metformin,repaglinide,nateglinide,chlorpropamide,glimepiride,acetohexamide,glipizide,glyburide,tolbutamide,pioglitazone,rosiglitazone,acarbose,miglitol,troglitazone,tolazamide,examide,citoglipton,insulin,glyburide-metformin,glipizide-metformin,glimepiride-pioglitazone,metformin-rosiglitazone,metformin-pioglitazone,change,diabetesMed,readmitted
0,0.0,0.0,2.233555,0.0,0.0,96.858479,0.0,0.0,0.0,0.0,39.557416,49.082208,0.0,0.0,0.0,0.0,0.0,0.0,0.020636,0.351787,1.398306,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [13]:
# List of columns to drop
cols_to_drop = [
    'weight',              # 96.9% missing
    'payer_code',          # 39.6% missing
    'medical_specialty',   # 49.1% missing
    'encounter_id',        # Identifier
    'patient_nbr',         # Identifier
]

# Drop the columns
df_reduced = df_cleaned.drop(*cols_to_drop)

# Verify the change
print(f"Original number of columns: {len(df_cleaned.columns)}")
print(f"Number of columns after dropping: {len(df_reduced.columns)}")

Original number of columns: 50
Number of columns after dropping: 45


In [14]:
df_cleaned.select('readmitted').distinct().show()

+----------+
|readmitted|
+----------+
|       >30|
|        NO|
|       <30|
+----------+



In [15]:
df_reduced.select('diag_1').distinct().show()

+------+
|diag_1|
+------+
|   296|
|   451|
|   853|
|   800|
|250.01|
|   447|
|   591|
|     7|
|   574|
|   475|
|   718|
|   307|
|   577|
|   581|
|   205|
|   747|
|   334|
|   462|
|   711|
|   647|
+------+
only showing top 20 rows


In [16]:
from pyspark.sql.functions import when, col

# We map three strings to three numbers.
df_reduced_with_target = df_reduced.withColumn('label',
    when(col('readmitted') == 'NO', 0.0)
    .when(col('readmitted') == '<30', 1.0)
    .when(col('readmitted') == '>30', 2.0)
    .otherwise(None) # Should not happen, but good practice
).na.drop(subset=["label"]).drop('readmitted')

df_reduced_with_target = df_reduced_with_target.withColumn("label", col("label").cast("double"))

# Create the new simplified diagnosis category column
df_engineered_with_target = df_reduced_with_target.withColumn('diag_1_category',
    when(col('diag_1').like('250%'), 'Diabetes')
    .when((col('diag_1') >= '390') & (col('diag_1') <= '459') | (col('diag_1') == '785'), 'Circulatory')
    .when((col('diag_1') >= '460') & (col('diag_1') <= '519') | (col('diag_1') == '786'), 'Respiratory')
    .when((col('diag_1') >= '520') & (col('diag_1') <= '579') | (col('diag_1') == '787'), 'Digestive')
    .when((col('diag_1') >= '800') & (col('diag_1') <= '999'), 'Injury')
    .when((col('diag_1') >= '710') & (col('diag_1') <= '739'), 'Musculoskeletal')
    .when((col('diag_1') >= '580') & (col('diag_1') <= '629') | (col('diag_1') == '788'), 'Genitourinary')
    .when((col('diag_1') >= '140') & (col('diag_1') <= '239'), 'Neoplasms')
    .otherwise('Other')
)

# Show the result of our new feature
print("Distribution of the new 'diag_1_category' feature:")
df_engineered_with_target.groupBy('diag_1_category').count().orderBy('count', ascending=False).show()

Distribution of the new 'diag_1_category' feature:
+---------------+-----+
|diag_1_category|count|
+---------------+-----+
|    Circulatory|30490|
|          Other|17955|
|    Respiratory|14465|
|      Digestive| 9543|
|       Diabetes| 8757|
|         Injury| 7046|
|  Genitourinary| 5118|
|Musculoskeletal| 4957|
|      Neoplasms| 3435|
+---------------+-----+



In [17]:
# Find the most frequent value (the mode) for the 'race' column
mode_race = df_engineered_with_target.groupBy('race').count().orderBy('count', ascending=False).first()[0]
print(f"The most frequent race is: '{mode_race}'. We will use this to fill missing values.")

# Fill the nulls with the mode
df_imputed_with_target = df_engineered_with_target.fillna({'race': mode_race})

# We'll also fill nulls in our new diag_1_category with 'Other'. For simplicity, we'll now drop the original diag columns
df_imputed_with_target = df_imputed_with_target.fillna({'diag_1_category': 'Other'}).drop('diag_1', 'diag_2', 'diag_3')

df_imputed_with_target.printSchema()

The most frequent race is: 'Caucasian'. We will use this to fill missing values.
root
 |-- race: string (nullable = false)
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- admission_type_id: string (nullable = true)
 |-- discharge_disposition_id: string (nullable = true)
 |-- admission_source_id: string (nullable = true)
 |-- time_in_hospital: string (nullable = true)
 |-- num_lab_procedures: string (nullable = true)
 |-- num_procedures: string (nullable = true)
 |-- num_medications: string (nullable = true)
 |-- number_outpatient: string (nullable = true)
 |-- number_emergency: string (nullable = true)
 |-- number_inpatient: string (nullable = true)
 |-- number_diagnoses: string (nullable = true)
 |-- max_glu_serum: string (nullable = true)
 |-- A1Cresult: string (nullable = true)
 |-- metformin: string (nullable = true)
 |-- repaglinide: string (nullable = true)
 |-- nateglinide: string (nullable = true)
 |-- chlorpropamide: string (nullable = true)
 |-- 

In [18]:
# Separate columns into categorical and numerical lists
# The target variable 'readmitted_binary' is separate
categorical_cols = [
    'race', 'gender', 'age', 'admission_type_id', 'discharge_disposition_id',
    'admission_source_id', 'max_glu_serum', 'A1Cresult', 'metformin',
    'repaglinide', 'nateglinide', 'chlorpropamide', 'glimepiride',
    'acetohexamide', 'glipizide', 'glyburide', 'tolbutamide', 'pioglitazone',
    'rosiglitazone', 'acarbose', 'miglitol', 'troglitazone', 'tolazamide',
    'examide', 'citoglipton', 'insulin', 'glyburide-metformin', 'glipizide-metformin',
    'glimepiride-pioglitazone', 'metformin-rosiglitazone', 'metformin-pioglitazone',
    'change', 'diabetesMed', 'diag_1_category'
]

numerical_cols = [
    'time_in_hospital', 'num_lab_procedures', 'num_procedures',
    'num_medications', 'number_outpatient', 'number_emergency',
    'number_inpatient', 'number_diagnoses'
]

target_col = 'label'

# Create a new DataFrame with the correct types since the numerical columns are string
df_final = df_imputed_with_target
for c in numerical_cols:
    df_final = df_final.withColumn(c, col(c).cast('integer'))

print("\nFinal number of columns before modeling:", len(df_final.columns))
df_final.printSchema()


Final number of columns before modeling: 43
root
 |-- race: string (nullable = false)
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- admission_type_id: string (nullable = true)
 |-- discharge_disposition_id: string (nullable = true)
 |-- admission_source_id: string (nullable = true)
 |-- time_in_hospital: integer (nullable = true)
 |-- num_lab_procedures: integer (nullable = true)
 |-- num_procedures: integer (nullable = true)
 |-- num_medications: integer (nullable = true)
 |-- number_outpatient: integer (nullable = true)
 |-- number_emergency: integer (nullable = true)
 |-- number_inpatient: integer (nullable = true)
 |-- number_diagnoses: integer (nullable = true)
 |-- max_glu_serum: string (nullable = true)
 |-- A1Cresult: string (nullable = true)
 |-- metformin: string (nullable = true)
 |-- repaglinide: string (nullable = true)
 |-- nateglinide: string (nullable = true)
 |-- chlorpropamide: string (nullable = true)
 |-- glimepiride: string (nullabl

In [19]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# --- STAGE 1 & 2: String Indexing and One-Hot Encoding for Categorical Columns ---
# We create a list to hold all the stages of our pipeline
stages = []

# Loop through each categorical column
for col_name in categorical_cols:
    # 1. StringIndexer
    string_indexer = StringIndexer(inputCol=col_name, outputCol=col_name + "_index", handleInvalid="keep")
    # 2. OneHotEncoder
    one_hot_encoder = OneHotEncoder(inputCols=[string_indexer.getOutputCol()], outputCols=[col_name + "_vec"])
    # Add these two stages to our pipeline
    stages += [string_indexer, one_hot_encoder]

# --- STAGE 3: Vector Assembly ---
# Create a list of all feature columns to be assembled
# This includes the one-hot encoded vectors and the original numerical columns
assembler_inputs = [c + "_vec" for c in categorical_cols] + numerical_cols

# Create the VectorAssembler stage
vector_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

# Add the assembler to our stages
stages += [vector_assembler]

# --- Create and Run the Pipeline ---
# Define the pipeline with all the stages
preprocessing_pipeline = Pipeline(stages=stages)

# Fit the pipeline to our data to "learn" the transformations
pipeline_model = preprocessing_pipeline.fit(df_final)

# Transform the data to apply the transformations
df_model_ready = pipeline_model.transform(df_final)

print("Pipeline created and data transformed successfully.")

# --- Inspect the Result ---
# Let's look at the final DataFrame. You'll see the new columns,
# especially the 'features' vector and our target column.
df_model_ready.select(target_col, 'features').show(5, truncate=False)

Pipeline created and data transformed successfully.
+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                                                                                                                                                                                                                                           |
+-----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [20]:
df_model_ready.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0|54864|
|  1.0|11357|
|  2.0|35545|
+-----+-----+



In [21]:
(trainingData, testData) = df_model_ready.randomSplit([0.8, 0.2], seed=42)

# Cache the data to speed up repeated access
trainingData.cache()
testData.cache()

print(f"Number of training samples: {trainingData.count()}")
print(f"Number of testing samples: {testData.count()}")

Number of training samples: 81565
Number of testing samples: 20201


In [22]:
from pyspark.ml.classification import LogisticRegression

# Instantiate the Logistic Regression model
# It will use our 'features' and 'label' columns
lr = LogisticRegression(featuresCol='features', labelCol='label')

# Train the model by fitting it to the training data
print("Training the Logistic Regression model...")
lr_model = lr.fit(trainingData)
print("Model training complete.")

Training the Logistic Regression model...
Model training complete.


In [24]:
# Make predictions on the test data
predictions = lr_model.transform(testData)

# Let's look at the predictions
# The 'prediction' column is what the model guessed.
print("Sample predictions:")
predictions.select('label', 'prediction', 'probability').show(10, truncate=False)

Sample predictions:
+-----+----------+-------------------------------------------------------------+
|label|prediction|probability                                                  |
+-----+----------+-------------------------------------------------------------+
|2.0  |0.0       |[0.9172633011526374,0.013758167024229419,0.06897853182313325]|
|0.0  |0.0       |[0.6364394544351407,0.08360657131193892,0.2799539742529205]  |
|0.0  |0.0       |[0.7125058977114047,0.034925253653600594,0.2525688486349947] |
|2.0  |0.0       |[0.6676560094028694,0.04553025426885506,0.2868137363282756]  |
|2.0  |2.0       |[0.40163273076289696,0.1152128564755154,0.4831544127615876]  |
|0.0  |0.0       |[0.7325772684209156,0.03522576309317734,0.2321969684859071]  |
|0.0  |0.0       |[0.6295071708913154,0.054447426242994985,0.3160454028656896] |
|2.0  |0.0       |[0.6746278811526313,0.045122881108826625,0.28024923773854205]|
|2.0  |0.0       |[0.7163222352788102,0.044348244826413,0.23932951989477688]   |
|2.0  |0

In [25]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pandas as pd

# --- Metric 1: Accuracy ---
# "Out of all predictions, how many were correct?"
evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator_acc.evaluate(predictions)
print(f"Accuracy = {accuracy:.2%}")

# --- Metric 2: F1-Score ---
# A weighted average of precision and recall. A good overall measure.
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1_score = evaluator_f1.evaluate(predictions)
print(f"F1 Score = {f1_score:.4f}")

# --- Detailed View: Confusion Matrix ---
# Shows how many times the model predicted each class vs. the actual class.
print("\nConfusion Matrix:")
# Convert to Pandas for a nice crosstab view
preds_and_labels = predictions.select(['prediction', 'label']).toPandas()
confusion_matrix = pd.crosstab(preds_and_labels['label'], preds_and_labels['prediction'])
print(confusion_matrix)

# Remember our mapping: 0.0 = 'NO', 1.0 = '<30', 2.0 = '>30'

Accuracy = 58.42%
F1 Score = 0.5259

Confusion Matrix:
prediction   0.0  1.0   2.0
label                      
0.0         9532   25  1330
1.0         1480   38   779
2.0         4733   52  2232
