In [1]:
# Starting Spark properly with a custom temp directory to avoid shuffle errors
from pyspark.sql import SparkSession
import os

# Set a safe local temp directory
os.environ["SPARK_LOCAL_DIRS"] = "C:/temp/spark"

# Start Spark session
spark = SparkSession.builder.appName("BigDataAnalysis").getOrCreate()
spark






In [2]:
# Loading in the cleaned patient dataset
df = spark.read.csv("cleaned_patients.csv", header=True, inferSchema=True)

# Quick look at first few rows
df.show(5)

# Checking what the structure of the data looks like
df.printSchema()




+--------------------+--------------------+---+--------------------+----------+-------+---------+------------------+--------+--------+----+------+-------+------------+-------------+--------------+-------+-------------+-------+--------------------+---+
|               USMER|        MEDICAL_UNIT|SEX|        PATIENT_TYPE| DATE_DIED|INTUBED|PNEUMONIA|               AGE|PREGNANT|DIABETES|COPD|ASTHMA|INMSUPR|HIPERTENSION|OTHER_DISEASE|CARDIOVASCULAR|OBESITY|RENAL_CHRONIC|TOBACCO|CLASIFFICATION_FINAL|ICU|
+--------------------+--------------------+---+--------------------+----------+-------+---------+------------------+--------+--------+----+------+-------+------------+-------------+--------------+-------+-------------+-------+--------------------+---+
| 0.03630966210657986| 0.01815483105328993|  2| 0.03630966210657986|09/06/2020|      1|        2|0.9985157079309461|       2|       1|   2|     2|      2|           2|            2|             2|      2|            2|      2|                  

In [3]:
# Picking out the main features to use in the model
from pyspark.ml.feature import VectorAssembler

features = ["AGE", "OBESITY", "TOBACCO", "HIPERTENSION", "DIABETES"]

# Combining them into one feature column
assembler = VectorAssembler(inputCols=features, outputCol="features")
df_transformed = assembler.transform(df)

# Just checking that the features column looks alright
df_transformed.select("AGE", "OBESITY", "TOBACCO", "HIPERTENSION", "DIABETES", "features").show(5, truncate=False)



+------------------+-------+-------+------------+--------+------------------------------------+
|AGE               |OBESITY|TOBACCO|HIPERTENSION|DIABETES|features                            |
+------------------+-------+-------+------------+--------+------------------------------------+
|0.9985157079309461|2      |2      |2           |1       |[0.9985157079309461,2.0,2.0,2.0,1.0]|
|0.9971993098884563|2      |2      |2           |2       |[0.9971993098884563,2.0,2.0,2.0,2.0]|
|0.9967290481146759|1      |2      |1           |1       |[0.9967290481146759,1.0,2.0,1.0,1.0]|
|0.992876838486922 |2      |2      |2           |2       |[0.992876838486922,2.0,2.0,2.0,2.0] |
|0.9922778767136677|2      |2      |2           |2       |[0.9922778767136677,2.0,2.0,2.0,2.0]|
+------------------+-------+-------+------------+--------+------------------------------------+
only showing top 5 rows



In [4]:
# Using KMeans to group patients based on the features
from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=3, seed=42, featuresCol="features")
kmeans_model = kmeans.fit(df_transformed)
clusters = kmeans_model.transform(df_transformed)

# Checking what cluster each patient ended up in
clusters.select("features", "prediction").show(10)




+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|[0.99851570793094...|         0|
|[0.99719930988845...|         1|
|[0.99672904811467...|         0|
|[0.99287683848692...|         1|
|[0.99227787671366...|         1|
|[0.99503719020998...|         1|
|[0.99929761570918...|         1|
|[0.99870976937166...|         2|
|[0.99778515785660...|         1|
|[0.99778515785660...|         1|
+--------------------+----------+
only showing top 10 rows



In [5]:
# Seeing how ICU cases are spread across each cluster
clusters.groupBy("prediction", "ICU").count().show()





+----------+---+------+
|prediction|ICU| count|
+----------+---+------+
|         2|1.0|  1119|
|         0|2.0| 50602|
|         1|2.0|115968|
|         2|2.0| 12134|
|         1|1.0|  9836|
|         0|1.0|  4867|
+----------+---+------+



In [6]:
# Training a Random Forest model to predict ICU
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="ICU", featuresCol="features", numTrees=100)
rf_model = rf.fit(df_transformed)

# Making predictions on full dataset
rf_predictions = rf_model.transform(df_transformed)
rf_predictions.select("features", "ICU", "prediction", "probability").show(10, truncate=False)




+------------------------------------+---+----------+-------------------------------------------+
|features                            |ICU|prediction|probability                                |
+------------------------------------+---+----------+-------------------------------------------+
|[0.9985157079309461,2.0,2.0,2.0,1.0]|2.0|2.0       |[0.0,0.0813553519860528,0.9186446480139472]|
|[0.9971993098884563,2.0,2.0,2.0,2.0]|2.0|2.0       |[0.0,0.0813553519860528,0.9186446480139472]|
|[0.9967290481146759,1.0,2.0,1.0,1.0]|2.0|2.0       |[0.0,0.0813553519860528,0.9186446480139472]|
|[0.992876838486922,2.0,2.0,2.0,2.0] |2.0|2.0       |[0.0,0.0813553519860528,0.9186446480139472]|
|[0.9922778767136677,2.0,2.0,2.0,2.0]|2.0|2.0       |[0.0,0.0813553519860528,0.9186446480139472]|
|[0.9950371902099892,2.0,2.0,2.0,2.0]|2.0|2.0       |[0.0,0.0813553519860528,0.9186446480139472]|
|[0.9992976157091806,2.0,2.0,1.0,2.0]|1.0|2.0       |[0.0,0.0813553519860528,0.9186446480139472]|
|[0.9987097693716604

In [7]:
# Balancing the dataset by oversampling ICU = 1 cases

# Splitting the data into ICU = 1 (minority) and ICU = 2 (majority)
minority = df_transformed.filter(df_transformed["ICU"] == 1)
majority = df_transformed.filter(df_transformed["ICU"] == 2)

# Duplicating the minority class to balance it
oversampled = minority.sample(withReplacement=True, fraction=2.0)

# Putting both groups back together
balanced_data = majority.union(oversampled)

# Shuffling the data
from pyspark.sql.functions import rand
balanced_data = balanced_data.orderBy(rand())

# Checking how balanced it is now
balanced_data.groupBy("ICU").count().show()





+---+------+
|ICU| count|
+---+------+
|2.0|178704|
|1.0| 31704|
+---+------+



In [8]:
# Training the model again on the balanced dataset
rf_balanced = RandomForestClassifier(labelCol="ICU", featuresCol="features", numTrees=100)
rf_balanced_model = rf_balanced.fit(balanced_data)

# Making new predictions
balanced_predictions = rf_balanced_model.transform(balanced_data)
balanced_predictions.select("features", "ICU", "prediction", "probability").show(10, truncate=False)





+-------------------------------------+---+----------+--------------------------------------------+
|features                             |ICU|prediction|probability                                 |
+-------------------------------------+---+----------+--------------------------------------------+
|[0.9919952972186272,2.0,2.0,2.0,2.0] |2.0|2.0       |[0.0,0.15358922631119284,0.8464107736888071]|
|[0.8968700041677191,2.0,2.0,2.0,2.0] |2.0|2.0       |[0.0,0.15308600753101764,0.8469139924689822]|
|[0.9672747798780993,2.0,2.0,2.0,2.0] |2.0|2.0       |[0.0,0.15358922631119284,0.8464107736888071]|
|[0.9977369441329896,2.0,2.0,1.0,1.0] |2.0|2.0       |[0.0,0.14481709895494896,0.8551829010450511]|
|[0.9937523518859899,2.0,2.0,1.0,2.0] |1.0|2.0       |[0.0,0.1438383777693017,0.8561616222306984] |
|[0.9665486721013271,2.0,2.0,2.0,2.0] |2.0|2.0       |[0.0,0.15358922631119284,0.8464107736888071]|
|[0.9938586931957764,2.0,2.0,2.0,2.0] |2.0|2.0       |[0.0,0.1438383777693017,0.8561616222306984] |


In [9]:
# Checking how accurate the model is
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="ICU", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(balanced_predictions)

print(f"Model Accuracy: {accuracy:.2f}")





Model Accuracy: 0.85


In [10]:
# Extra evaluation metrics
f1_eval = MulticlassClassificationEvaluator(labelCol="ICU", predictionCol="prediction", metricName="f1")
f1 = f1_eval.evaluate(balanced_predictions)
print(f"F1 Score: {f1:.2f}")

precision_eval = MulticlassClassificationEvaluator(labelCol="ICU", predictionCol="prediction", metricName="weightedPrecision")
precision = precision_eval.evaluate(balanced_predictions)
print(f"Precision: {precision:.2f}")

recall_eval = MulticlassClassificationEvaluator(labelCol="ICU", predictionCol="prediction", metricName="weightedRecall")
recall = recall_eval.evaluate(balanced_predictions)
print(f"Recall: {recall:.2f}")



F1 Score: 0.78
Precision: 0.72
Recall: 0.85


In [11]:
# Splitting the data into train and test sets
train_data, test_data = balanced_data.randomSplit([0.8, 0.2], seed=42)

# Just checking how many rows in each
print("Train count:", train_data.count())
print("Test count:", test_data.count())


Train count: 168533
Test count: 41866


In [12]:
# Training model on just the training data
final_rf = RandomForestClassifier(labelCol="ICU", featuresCol="features", numTrees=100)
final_model = final_rf.fit(train_data)

# Predicting on test data
test_preds = final_model.transform(test_data)
test_preds.select("features", "ICU", "prediction", "probability").show(10, truncate=False)


+------------------------------------+---+----------+--------------------------------------------+
|features                            |ICU|prediction|probability                                 |
+------------------------------------+---+----------+--------------------------------------------+
|[0.9926322333149494,2.0,2.0,1.0,2.0]|2.0|2.0       |[0.0,0.1520862287528701,0.84791377124713]   |
|[0.9979562822891704,2.0,2.0,2.0,2.0]|2.0|2.0       |[0.0,0.14858448816733236,0.8514155118326676]|
|[0.9989516508612453,2.0,2.0,2.0,2.0]|2.0|2.0       |[0.0,0.14858448816733236,0.8514155118326676]|
|[0.9989303992660782,2.0,2.0,1.0,2.0]|2.0|2.0       |[0.0,0.14864682652016545,0.8513531734798346]|
|[0.9921748659570819,2.0,2.0,2.0,2.0]|1.0|2.0       |[0.0,0.15129002721485163,0.8487099727851484]|
|[0.9921748659570819,2.0,2.0,2.0,2.0]|1.0|2.0       |[0.0,0.15129002721485163,0.8487099727851484]|
|[0.9977830021123382,1.0,2.0,2.0,2.0]|1.0|2.0       |[0.0,0.14909585914248147,0.8509041408575185]|
|[0.997736

In [13]:
# Final evaluation on test data
evaluator = MulticlassClassificationEvaluator(labelCol="ICU", predictionCol="prediction")

acc = evaluator.setMetricName("accuracy").evaluate(test_preds)
f1 = evaluator.setMetricName("f1").evaluate(test_preds)
prec = evaluator.setMetricName("weightedPrecision").evaluate(test_preds)
rec = evaluator.setMetricName("weightedRecall").evaluate(test_preds)

print(f"Accuracy: {acc:.2f}")
print(f"F1 Score: {f1:.2f}")
print(f"Precision: {prec:.2f}")
print(f"Recall: {rec:.2f}")


Accuracy: 0.85
F1 Score: 0.78
Precision: 0.72
Recall: 0.85
