## Problem koji rešavamo

Zadatak se formuliše kao problem binarne klasifikacije. Cilj analize je da se, na osnovu demografskih i kliničkih karakteristika pacijenata, predvidi **smrtni ishod** COVID-19 pacijenta.

Pravovremena identifikacija visokorizičnih pacijenata može pomoći lekarima da lakše uoče pacijente kojima preti smrtni ishod i obezbede bolju alokaciju medicinskih resursa.

Ciljna promenljiva je **DEATH**, gde vrednost 1 označava preminulog pacijenta, dok vrednost 0 označava pacijenta koji je preživeo.

## SparkML modelovanje – predikcija smrtnog ishoda

Cilj ovog dela analize je izgradnja modela mašinskog učenja koji može da predvidi da li će COVID-19 pacijent preminuti, na osnovu demografskih i kliničkih karakteristika.  
Model se implementira korišćenjem SparkML biblioteke i direktno odgovara na istraživačko pitanje br. 5 definisano u opisu projekta.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (
    SparkSession.builder
    .appName("COVID-SparkML-Death-Prediction")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

print("Spark version:", spark.version)
print("Master:", spark.sparkContext.master)

Spark version: 4.1.1
Master: local[*]


### Učitavanje podataka u Spark

Učitavamo prethodno očišćen i pripremljen dataset (`Covid_Data_Clean.csv`) koji je produkt skripte 1 (čišćenje i priprema podataka). Dataset sadrži sve transformacije i izvedene atribute kreirane u fazi pripreme podataka.

In [2]:
spark_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('../data/Covid_Data_Clean.csv')
)

spark_df.printSchema()
spark_df.show(5)

root
 |-- USMER: integer (nullable = true)
 |-- MEDICAL_UNIT: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- INTUBED: double (nullable = true)
 |-- PNEUMONIA: double (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PREGNANT: double (nullable = true)
 |-- DIABETES: double (nullable = true)
 |-- COPD: double (nullable = true)
 |-- ASTHMA: double (nullable = true)
 |-- INMSUPR: double (nullable = true)
 |-- HIPERTENSION: double (nullable = true)
 |-- OTHER_DISEASE: double (nullable = true)
 |-- CARDIOVASCULAR: double (nullable = true)
 |-- OBESITY: double (nullable = true)
 |-- RENAL_CHRONIC: double (nullable = true)
 |-- TOBACCO: double (nullable = true)
 |-- ICU: double (nullable = true)
 |-- DEATH: integer (nullable = true)
 |-- AGE_GROUP: string (nullable = true)
 |-- COMORBIDITIES_COUNT: double (nullable = true)
 |-- COMORBIDITY_GROUP: string (nullable = true)
 |-- HIGH_RISK: integer (nullable = true)
 |-- PNEUMONIA_IN_ELDERLY: integer (nullable = true)


### Izbor karakteristika i priprema podataka

Za modelovanje se koriste demografske i kliničke karakteristike pacijenata. Feature-i su odabrani na osnovu nalaza EDA analize:

- **Demografski faktori**: `AGE`, `SEX` – istraživačko pitanje 1 pokazalo je da su starost i pol značajno povezani sa smrtnim ishodom
- **Klinički faktori**: `PNEUMONIA` – istraživačko pitanje 4 pokazalo je visoku korelaciju pneumonije i smrtnosti, posebno kod starijih
- **Hronične bolesti**: `DIABETES`, `HIPERTENSION`, `OBESITY`, `RENAL_CHRONIC`, `COPD`, `CARDIOVASCULAR` – istraživačko pitanje 3 identifikovalo je ove komorbiditetie kao značajne faktore rizika
- **Izvedeni atributi**: `COMORBIDITIES_COUNT`, `HIGH_RISK`, `PNEUMONIA_IN_ELDERLY` – kreirani u fazi pripreme podataka radi boljeg obuhvatanja složenih obrazaca

Nedostajuće vrednosti su uklonjene, a vrednosti 97/98/99 su već zamenjene tokom čišćenja podataka.

In [3]:
features = [
    "AGE",
    "SEX",
    "PNEUMONIA",
    "DIABETES",
    "HIPERTENSION",
    "OBESITY",
    "RENAL_CHRONIC",
    "COPD",
    "CARDIOVASCULAR",
    "COMORBIDITIES_COUNT",
    "HIGH_RISK",
    "PNEUMONIA_IN_ELDERLY"
]

label_col = "DEATH"

ml_df = (
    spark_df
    .select(features + [label_col])
    .dropna()
)

print("Distribucija ciljne varijable (DEATH):")
ml_df.groupBy(label_col).count().show()

Distribucija ciljne varijable (DEATH):
+-----+------+
|DEATH| count|
+-----+------+
|    1| 52176|
|    0|335206|
+-----+------+



### Balansiranje klasa (Class Weighting)

Dataset je neuravnotežen – preminulih pacijenata je značajno manje od preživelih. Model bi bez korekcije tendirao ka predviđanju dominantne klase (0 – preživeo), što bi rezultovalo visokom accuracy ali lošim prepoznavanjem pacijenata kojima preti smrtni ishod.

Primenjujemo tehniku ponderisanja klasa: svakom primeru dodeljujemo težinu obrnuto proporcionalnu zastupljenosti njegove klase, čime model posvećuje više pažnje manjinskoj klasi.

In [4]:
total = ml_df.count()
pos = ml_df.filter(F.col("DEATH") == 1).count()
neg = total - pos

print(f"Ukupno pacijenata: {total}")
print(f"Preminulih (DEATH=1): {pos} ({pos/total*100:.1f}%)")
print(f"Preživelih (DEATH=0): {neg} ({neg/total*100:.1f}%)")

ml_df = ml_df.withColumn(
    "classWeight",
    F.when(F.col("DEATH") == 1, total / (2.0 * pos))
     .otherwise(total / (2.0 * neg))
)

Ukupno pacijenata: 387382
Preminulih (DEATH=1): 52176 (13.5%)
Preživelih (DEATH=0): 335206 (86.5%)


### Podela na trening i test skup

Skup podataka deli se na trening (80%) i test (20%) deo. Model se trenira na trening skupu, dok se test skup koristi za objektivnu procenu performansi na neviđenim podacima.

Ovakav pristup smanjuje rizik od preprilagođavanja (overfitting) modela.

In [5]:
train_df, test_df = ml_df.randomSplit([0.8, 0.2], seed=42)

print("Train size:", train_df.count())
print("Test size:", test_df.count())

Train size: 310177
Test size: 77205


### Kreiranje ML pipeline-a

Koristi se SparkML pipeline koji standardizuje tok obrade podataka i sprečava curenje informacija između trening i test skupa.

Numeričke karakteristike se objedinjavaju pomoću VectorAssembler transformacije u jedinstveni vektor obeležja. Nad tako pripremljenim podacima trenira se model **logističke regresije**, koji je pogodan za probleme binarne klasifikacije i pruža interpretabilne koeficijente za svaki faktor rizika.

In [6]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=features,
    outputCol="features"
)

In [7]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(
    featuresCol="features",
    labelCol=label_col,
    weightCol="classWeight",
    maxIter=50
)

In [8]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[assembler, lr])

model = pipeline.fit(train_df)
print("Model uspešno istreniran.")

Model uspešno istreniran.


### Predikcije modela

Model generiše za svaki pacijent: binarnu predikciju (0 ili 1) i verovatnoću pripadnosti svakoj klasi.

In [9]:
pred = model.transform(test_df)

pred.select(
    label_col,
    "prediction",
    "probability"
).show(10, truncate=False)

+-----+----------+-----------------------------------------+
|DEATH|prediction|probability                              |
+-----+----------+-----------------------------------------+
|0    |0.0       |[0.9927315056771728,0.007268494322827168]|
|0    |0.0       |[0.9927315056771728,0.007268494322827168]|
|0    |0.0       |[0.9927315056771728,0.007268494322827168]|
|0    |0.0       |[0.9927315056771728,0.007268494322827168]|
|1    |0.0       |[0.9890887947528785,0.010911205247121525]|
|0    |0.0       |[0.8916410899359559,0.10835891006404408] |
|0    |0.0       |[0.8916410899359559,0.10835891006404408] |
|0    |0.0       |[0.8452337749819528,0.15476622501804715] |
|1    |0.0       |[0.9937310886481493,0.006268911351850703]|
|1    |0.0       |[0.9905845819295694,0.009415418070430559]|
+-----+----------+-----------------------------------------+
only showing top 10 rows


### Evaluacija modela

Performanse modela procenjuju se korišćenjem više metrika:

- **AUC (Area Under ROC Curve)** – meri sposobnost modela da razlikuje klase; vrednost 1.0 je savršena, 0.5 je slučajno pogađanje
- **Accuracy** – procenat tačno klasifikovanih primera; može biti varljiva metrika kod neuravnoteženih skupova
- **F1 Score** – harmonijska sredina preciznosti i odziva; pouzdanija metrika u prisustvu neravnomernih klasa
- **Konfuziona matrica** – pruža uvid u tipove grešaka: lažno negativni (FN) su posebno kritični jer predstavljaju propuštene visokorizične pacijente

In [10]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator_auc = BinaryClassificationEvaluator(
    labelCol=label_col,
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

auc = evaluator_auc.evaluate(pred)
print(f"AUC: {auc:.4f}")

AUC: 0.9094


In [11]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator(
    labelCol=label_col,
    metricName="accuracy"
)
accuracy = acc_eval.evaluate(pred)
print(f"Accuracy: {accuracy:.4f}")

Accuracy: 0.8342


In [12]:
f1_eval = MulticlassClassificationEvaluator(
    labelCol=label_col,
    metricName="f1"
)
f1 = f1_eval.evaluate(pred)
print(f"F1 Score: {f1:.4f}")

F1 Score: 0.8537


### Konfuziona matrica

Konfuziona matrica prikazuje raspodelu tačnih i netačnih predikcija:
- **TN (0→0)**: tačno predviđeni preživeli
- **FP (0→1)**: preživeli pogrešno klasifikovani kao preminuli
- **FN (1→0)**: preminuli pogrešno klasifikovani kao preživeli 
- **TP (1→1)**: tačno predviđeni preminuli

In [13]:
print("Konfuziona matrica:")
pred.groupBy(label_col, "prediction") \
    .count() \
    .orderBy(label_col, "prediction") \
    .show()

Konfuziona matrica:
+-----+----------+-----+
|DEATH|prediction|count|
+-----+----------+-----+
|    0|       0.0|55427|
|    0|       1.0|11209|
|    1|       0.0| 1590|
|    1|       1.0| 8979|
+-----+----------+-----+

