In [1]:
# =========================
# Install PySpark and Import Libraries
# =========================

# Install PySpark
!pip install pyspark --quiet

# Import necessary libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import (
    LogisticRegression, RandomForestClassifier, MultilayerPerceptronClassifier,
    LogisticRegressionModel, RandomForestClassificationModel, MultilayerPerceptronClassificationModel
)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import shutil
import os


import urllib.request


In [2]:
# =========================
# Start Spark Session
# =========================

# Initialize a Spark session
spark = SparkSession.builder.appName("HeartAttackPrediction").getOrCreate()


In [3]:
# =========================
# Download the dataset
# =========================

url = "https://sp8138-heart-attack-dataset.s3.us-east-2.amazonaws.com/heart_attack_data.csv"
local_file = "heart_attack_data.csv"
urllib.request.urlretrieve(url, local_file)


('heart_attack_data.csv', <http.client.HTTPMessage at 0x79da85551b10>)

In [4]:
# =========================
# Load and Preprocess Data
# =========================

# Load the dataset
df = spark.read.csv("heart_attack_data.csv", header=True, inferSchema=True)

# Drop rows with missing values
df = df.dropna()

# Display the schema of the dataset
df.printSchema()


root
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Cholesterol: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- HeartRate: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- Smoker: integer (nullable = true)
 |-- Diabetes: integer (nullable = true)
 |-- Hypertension: integer (nullable = true)
 |-- FamilyHistory: integer (nullable = true)
 |-- PhysicalActivity: integer (nullable = true)
 |-- AlcoholConsumption: integer (nullable = true)
 |-- Diet: string (nullable = true)
 |-- StressLevel: integer (nullable = true)
 |-- Ethnicity: string (nullable = true)
 |-- Income: integer (nullable = true)
 |-- EducationLevel: string (nullable = true)
 |-- Medication: string (nullable = true)
 |-- ChestPainType: string (nullable = true)
 |-- ECGResults: string (nullable = true)
 |-- MaxHeartRate: integer (nullable = true)
 |-- ST_Depression: double (nullable = true)
 |-- ExerciseInducedAngina: string (nullable = true)
 |-- 

In [5]:
# =========================
# Select Features
# =========================

# Define the target column and selected features
target_col = "Outcome"
selected_numeric = [
    "MaxHeartRate", "PhysicalActivity", "Hypertension", "FamilyHistory", "StressLevel",
    "Smoker", "AlcoholConsumption", "Cholesterol", "StrokeHistory", "ST_Depression",
    "NumberOfMajorVessels", "BMI", "PreviousHeartAttack", "HeartRate", "Diabetes"
]
selected_categorical = [
    "Gender", "Diet", "Ethnicity", "EducationLevel", "Medication", "ChestPainType",
    "ECGResults", "ExerciseInducedAngina", "Slope", "Thalassemia", "Residence", "MaritalStatus"
]

categorical_cols = selected_categorical
numeric_cols = selected_numeric


In [6]:
# =========================
# Preprocessing Pipeline
# =========================

# StringIndexer for categorical columns (including target)
indexers = [StringIndexer(inputCol=col, outputCol=col+"_idx", handleInvalid="keep") for col in categorical_cols + [target_col]]

# OneHotEncoder for categorical features (not target)
encoder = OneHotEncoder(inputCols=[col+"_idx" for col in categorical_cols],
                        outputCols=[col+"_ohe" for col in categorical_cols])

# Assemble features
feature_cols = [col+"_ohe" for col in categorical_cols] + numeric_cols
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_unscaled")

# StandardScaler
scaler = StandardScaler(inputCol="features_unscaled", outputCol="features", withMean=True, withStd=True)

# Create the pipeline
pipeline = Pipeline(stages=indexers + [encoder, assembler, scaler])
pipeline_model = pipeline.fit(df)

# Transform the dataset
df_prep = pipeline_model.transform(df)

# Select final features and label
df_final = df_prep.select("features", col("Outcome_idx").alias("label"))


In [7]:
# =========================
# Train/Test Split
# =========================

# Split the data into training and testing sets
train, test = df_final.randomSplit([0.8, 0.2], seed=42)

# Display the sizes of the splits
print(f"Training set size: {train.count()}")
print(f"Testing set size: {test.count()}")


Training set size: 298307
Testing set size: 74667


In [8]:
# =========================
# Train Models
# =========================

# Logistic Regression
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=100)
lr_model = lr.fit(train)

# Random Forest
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=100, seed=42)
rf_model = rf.fit(train)

# Multilayer Perceptron (ANN)
input_size = train.first()['features'].size
layers = [input_size, 16, 8, 2]  # 2 output classes
mlp = MultilayerPerceptronClassifier(featuresCol="features", labelCol="label", maxIter=100, layers=layers, seed=42)
mlp_model = mlp.fit(train)


In [9]:
# =========================
# Save Models and Pipeline
# =========================

# Create a directory for saving models
os.makedirs("saved_models", exist_ok=True)

# Remove existing directories if they exist
for path in [
    "saved_models/preprocessing_pipeline",
    "saved_models/logistic_regression",
    "saved_models/random_forest",
    "saved_models/ann_mlp"
]:
    if os.path.exists(path):
        shutil.rmtree(path)

# Save the pipeline and models
pipeline_model.save("saved_models/preprocessing_pipeline")
lr_model.save("saved_models/logistic_regression")
rf_model.save("saved_models/random_forest")
mlp_model.save("saved_models/ann_mlp")

print("Models and pipeline saved in 'saved_models/' directory.")


Models and pipeline saved in 'saved_models/' directory.


In [10]:
# =========================
# Evaluate Models
# =========================

# Function to evaluate a model
def evaluate_model(model, test, name):
    predictions = model.transform(test)
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    acc = evaluator.evaluate(predictions)
    print(f"\n{name} Accuracy: {acc:.3f}")
    pred_pd = predictions.select("label", "prediction").toPandas()
    from sklearn.metrics import classification_report, confusion_matrix
    print(classification_report(pred_pd['label'], pred_pd['prediction']))
    print("Confusion Matrix:\n", confusion_matrix(pred_pd['label'], pred_pd['prediction']))

# Evaluate all models
evaluate_model(lr_model, test, "Logistic Regression")
evaluate_model(rf_model, test, "Random Forest")
evaluate_model(mlp_model, test, "Multilayer Perceptron (ANN)")



Logistic Regression Accuracy: 0.501
              precision    recall  f1-score   support

         0.0       0.50      0.55      0.52     37271
         1.0       0.50      0.45      0.47     37396

    accuracy                           0.50     74667
   macro avg       0.50      0.50      0.50     74667
weighted avg       0.50      0.50      0.50     74667

Confusion Matrix:
 [[20584 16687]
 [20573 16823]]

Random Forest Accuracy: 0.500
              precision    recall  f1-score   support

         0.0       0.50      0.61      0.55     37271
         1.0       0.50      0.39      0.44     37396

    accuracy                           0.50     74667
   macro avg       0.50      0.50      0.49     74667
weighted avg       0.50      0.50      0.49     74667

Confusion Matrix:
 [[22878 14393]
 [22908 14488]]

Multilayer Perceptron (ANN) Accuracy: 0.500
              precision    recall  f1-score   support

         0.0       0.50      0.51      0.51     37271
         1.0       0.50 

In [11]:
# =========================
# Load Models and Pipeline
# =========================

# Load the saved pipeline and models
loaded_pipeline = PipelineModel.load("saved_models/preprocessing_pipeline")
loaded_lr = LogisticRegressionModel.load("saved_models/logistic_regression")
loaded_rf = RandomForestClassificationModel.load("saved_models/random_forest")
loaded_mlp = MultilayerPerceptronClassificationModel.load("saved_models/ann_mlp")

print("Models and pipeline loaded successfully.")


Models and pipeline loaded successfully.


In [12]:
# =========================
# Inference on New Case
# =========================

# Function to predict a new case
def predict_new_case(new_case_dict):
    new_df = spark.createDataFrame([new_case_dict])
    new_df_prep = loaded_pipeline.transform(new_df)
    new_df_final = new_df_prep.select("features")
    print("\n--- Inference on New Case ---")
    for name, model in [("Logistic Regression", loaded_lr), ("Random Forest", loaded_rf), ("ANN", loaded_mlp)]:
        pred = model.transform(new_df_final).select("prediction").collect()[0][0]
        outcome = "Heart Attack" if pred == 1.0 else "No Heart Attack"
        print(f"{name}: {outcome}")

# Example new case
new_case = {
    'Gender': 'Male',
    'Diet': 'Moderate',
    'Ethnicity': 'White',
    'EducationLevel': 'College',
    'Medication': 'Yes',
    'ChestPainType': 'Typical',
    'ECGResults': 'Normal',
    'ExerciseInducedAngina': 'No',
    'Slope': 'Flat',
    'Thalassemia': 'Normal',
    'Residence': 'Urban',
    'MaritalStatus': 'Married',
    'MaxHeartRate': 150,
    'PhysicalActivity': 2,
    'Hypertension': 1,
    'FamilyHistory': 1,
    'StressLevel': 2,
    'Smoker': 1,
    'AlcoholConsumption': 1,
    'Cholesterol': 200,
    'StrokeHistory': 0,
    'ST_Depression': 1.2,
    'NumberOfMajorVessels': 0,
    'BMI': 27.5,
    'PreviousHeartAttack': 0,
    'HeartRate': 80,
    'Diabetes': 0
}
predict_new_case(new_case)



--- Inference on New Case ---
Logistic Regression: Heart Attack
Random Forest: No Heart Attack
ANN: No Heart Attack


In [13]:
# Zip the preprocessing pipeline
!zip -r preprocessing_pipeline.zip saved_models/preprocessing_pipeline

# Zip the random forest model
!zip -r random_forest.zip saved_models/random_forest

  adding: saved_models/preprocessing_pipeline/ (stored 0%)
  adding: saved_models/preprocessing_pipeline/stages/ (stored 0%)
  adding: saved_models/preprocessing_pipeline/stages/09_StringIndexer_2bb3e04fd38d/ (stored 0%)
  adding: saved_models/preprocessing_pipeline/stages/09_StringIndexer_2bb3e04fd38d/metadata/ (stored 0%)
  adding: saved_models/preprocessing_pipeline/stages/09_StringIndexer_2bb3e04fd38d/metadata/._SUCCESS.crc (stored 0%)
  adding: saved_models/preprocessing_pipeline/stages/09_StringIndexer_2bb3e04fd38d/metadata/.part-00000.crc (stored 0%)
  adding: saved_models/preprocessing_pipeline/stages/09_StringIndexer_2bb3e04fd38d/metadata/part-00000 (deflated 39%)
  adding: saved_models/preprocessing_pipeline/stages/09_StringIndexer_2bb3e04fd38d/metadata/_SUCCESS (stored 0%)
  adding: saved_models/preprocessing_pipeline/stages/09_StringIndexer_2bb3e04fd38d/data/ (stored 0%)
  adding: saved_models/preprocessing_pipeline/stages/09_StringIndexer_2bb3e04fd38d/data/.part-00000-7f3c

In [14]:
!zip -r saved_models.zip saved_models

  adding: saved_models/ (stored 0%)
  adding: saved_models/ann_mlp/ (stored 0%)
  adding: saved_models/ann_mlp/metadata/ (stored 0%)
  adding: saved_models/ann_mlp/metadata/._SUCCESS.crc (stored 0%)
  adding: saved_models/ann_mlp/metadata/.part-00000.crc (stored 0%)
  adding: saved_models/ann_mlp/metadata/part-00000 (deflated 46%)
  adding: saved_models/ann_mlp/metadata/_SUCCESS (stored 0%)
  adding: saved_models/ann_mlp/data/ (stored 0%)
  adding: saved_models/ann_mlp/data/._SUCCESS.crc (stored 0%)
  adding: saved_models/ann_mlp/data/.part-00000-70172737-22d9-4466-97ba-0aaf77749a96-c000.snappy.parquet.crc (stored 0%)
  adding: saved_models/ann_mlp/data/_SUCCESS (stored 0%)
  adding: saved_models/ann_mlp/data/part-00000-70172737-22d9-4466-97ba-0aaf77749a96-c000.snappy.parquet (deflated 10%)
  adding: saved_models/logistic_regression/ (stored 0%)
  adding: saved_models/logistic_regression/metadata/ (stored 0%)
  adding: saved_models/logistic_regression/metadata/._SUCCESS.crc (stored 0%)