<a href="https://colab.research.google.com/github/Hailemicael/Cardiovascular_Disease_Prediction_with_Explainable_AI/blob/master/Cardiovascular_Disease_Prediction_with_Explainable_AI_Last.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Data Preparation and Loading**

### **Libraries**

In [1]:
!pip install shap

zsh:1: command not found: pip


In [2]:
!pip install lime

zsh:1: command not found: pip


In [3]:
import os  # Provides functions for interacting with the operating system
import shap
import lime
import itertools
import numpy as np
import pandas as pd  # Data manipulation and analysis
import seaborn as sns   # This is used for data visualization
import lime.lime_tabular
import tensorflow as tf  # Deep learning framework
from sklearn.svm import SVC # Support Vector Machine model for classification tasks
import scipy.stats as stats
from collections import Counter
import matplotlib.pyplot as plt  # Plotting and visualizations
from imblearn.over_sampling import SMOTE  # Handles imbalanced datasets with oversampling
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split  # Splits data into training and test sets
from tensorflow.keras import layers, models, callbacks
from tensorflow.keras.layers import Conv1D, MaxPool1D, Flatten, Dense, Dropout, BatchNormalization
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix  # Evaluation metrics for classification models

ModuleNotFoundError: No module named 'shap'

### **Loading the Dataset and Getting Information about the Dataset**

In [None]:
df = pd.read_csv('/content/heart_failure.csv') #Used for reading the dataframe
df.head()

In [None]:
df.info()      #Getting the info of the dataframe

In [None]:
print(f"Dataset shape: {df.shape}")  # Checking the Dataset Shape

### **Data Preprocessing**

In [None]:
# Check for missing values
print("Missing values:")
print(df.isnull().sum())

# Check for duplicates
duplicates = df.duplicated().sum()
print(f"Number of duplicate rows: {duplicates}")

# Remove duplicates if any
df = df.drop_duplicates()

In [None]:
df.describe().T         #Used for understanding the spread in the data

In [None]:
#This will help us understand how strongly each feature is related to the others
plt.figure(figsize = (15, 15))
sns.heatmap(df.drop(['DEATH_EVENT'], axis = 1).corr(), annot = True)

In [None]:
# This will helps us to visualizing relationships between features in a dataset and understanding how they relate to the target variable (DEATH_EVENT).
sns.pairplot(df[['age', 'creatinine_phosphokinase', 'ejection_fraction',
                 'serum_creatinine', 'serum_sodium', 'DEATH_EVENT']],
             hue='DEATH_EVENT', palette='CMRmap')
plt.show()


In [None]:
 # this code helps to show outliers effectively.
# List of features to visualize
features = ["age", "creatinine_phosphokinase", "ejection_fraction",
            "platelets", "serum_creatinine", "serum_sodium", "time"]
fig, axes = plt.subplots(4, 2, figsize=(16, 20))
# Flatten the axes array for easier indexing
axes = axes.flatten()
# Loop over the features and create plots
for i, feature in enumerate(features):
    sns.stripplot(x="DEATH_EVENT", y=feature, data=df, color="black", alpha=0.7, ax=axes[i])
    sns.boxenplot(x="DEATH_EVENT", y=feature, data=df, hue="DEATH_EVENT", palette="Set2", ax=axes[i], legend=False)
    axes[i].set_title(f'{feature} vs Death Event')
plt.tight_layout()
plt.show()


In [None]:
#This code visualizes the average values of selected continuous features for heart failure and non-heart failure groups using bar charts.
# Separate the data into two groups based on the DEATH_EVENT
heart_failure = df[df['DEATH_EVENT'] == 1]  # People who experienced heart failure (DEATH_EVENT = 1)
no_heart_failure = df[df['DEATH_EVENT'] == 0]  # People who did not experience heart failure (DEATH_EVENT = 0)

# List of features we want to compare
features = ['age', 'creatinine_phosphokinase', 'ejection_fraction',
            'platelets', 'serum_creatinine', 'serum_sodium']

# Create a 3x2 plot layout to display all the features
fig, axes = plt.subplots(3, 2, figsize=(11, 11))

# Loop over the features and create a bar chart for each one
for i, feature in enumerate(features):
    row = i // 2  # Determine the row for the subplot
    col = i % 2  # Determine the column for the subplot

    # Calculate the average values for the heart failure and no heart failure groups
    avg_heart_failure = heart_failure[feature].mean()
    avg_no_heart_failure = no_heart_failure[feature].mean()

    # Create a bar chart to compare the average values
    ax = axes[row, col]
    ax.bar(['Heart Failure', 'No Heart Failure'], [avg_heart_failure, avg_no_heart_failure], color=['orange', 'purple'])
    ax.set_ylabel(f"Avg {feature}")
    ax.set_title(f"Avg {feature} for Heart Failure vs No Heart Failure")

# Adjust layout and display the plot
plt.tight_layout()
plt.show()


In [None]:
#This code is used to generate Q-Q (Quantile-Quantile) plots for several continuous features in a dataset to test for normality.
continuous_feature_names = ['age', 'creatinine_phosphokinase', 'ejection_fraction', 'platelets',
                            'serum_creatinine', 'serum_sodium']

# Create the subplots grid (2 rows, 3 columns)
fig, axes = plt.subplots(2, 3, figsize=(12, 8))
# Loop through each continuous feature and generate Q-Q plot
for i, feature in enumerate(continuous_feature_names):
    ax = axes[i // 3, i % 3]  # Get the current subplot position
    stats.probplot(df[feature], plot=ax)  # Generate Q-Q plot
    ax.set_title(feature)  # Set the title as the feature name

plt.tight_layout()
plt.show()



### **Normalize continuous variables**


In [None]:
# List of all continuous variables
continuous_vars = ['age', 'creatinine_phosphokinase', 'ejection_fraction', 'platelets',
                   'serum_creatinine', 'serum_sodium', 'time']
# Normalize the continuous variables
scaler = StandardScaler()
df[continuous_vars] = scaler.fit_transform(df[continuous_vars])
df[continuous_vars]

### **Split the dataset**


In [None]:
# Define features (x) and target (y)
x = df.iloc[:, :-1].values  # All columns except the last one (DEATH_EVENT)
y = df.iloc[:, -1].values   # The last column (DEATH_EVENT)

# Check class distribution before SMOTE
counter_before = Counter(y)
print(f'Class distribution before SMOTE: {counter_before}')

# Apply SMOTE to balance the classes in DEATH_EVENT
oversample = SMOTE()
x, y = oversample.fit_resample(x, y)

# Check class distribution after SMOTE
counter_after = Counter(y)
print(f'Class distribution after SMOTE: {counter_after}')

# Split the data into training (70%) and temporary (30%)
x_train, x_temp, y_train, y_temp = train_test_split(x, y, test_size=0.30, random_state=42)

# Further split the temporary set into validation (15%) and test (15%)
x_val, x_test, y_val, y_test = train_test_split(x_temp, y_temp, test_size=0.50, random_state=42)

# Print the shapes of the datasets
print(f"Training set shape: {x_train.shape}")
print(f"Validation set shape: {x_val.shape}")
print(f"Test set shape: {x_test.shape}")

## **Model Development and Evaluation**

#### **Logistic Regression Model**

In [None]:
# Train Logistic Regression Model
logreg = LogisticRegression(max_iter=1000)
logreg.fit(x_train, y_train)

# Make predictions
y_pred = logreg.predict(x_test)

# Evaluate the model
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)

# Print evaluation metrics
print(f'Accuracy: {accuracy:.2f}')
print(f'Precision: {precision:.2f}')
print(f'Recall: {recall:.2f}')
print(f'F1 Score: {f1:.2f}')

# Confusion Matrix
conf_matrix = confusion_matrix(y_test, y_pred)

# Visualize Confusion Matrix
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues',
            xticklabels=['No CVD', 'CVD'],
            yticklabels=['No CVD', 'CVD'])
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.show()


#### **SVM**

In [None]:
# Function to evaluate model and print results
def evaluate_model(model, model_name):
    model.fit(x_train, y_train)  # Train model
    y_pred = model.predict(x_test)  # Make predictions

    # Calculate metrics
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)

    # Print results
    print(f'{model_name} - Accuracy: {accuracy:.2f}, Precision: {precision:.2f}, Recall: {recall:.2f}, F1 Score: {f1:.2f}')

    # Confusion Matrix
    conf_matrix = confusion_matrix(y_test, y_pred)
    sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=['No CVD', 'CVD'], yticklabels=['No CVD', 'CVD'])
    plt.title(f'{model_name} Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.show()

# Evaluate SVM models
svm_linear = SVC(kernel='linear')
evaluate_model(svm_linear, 'SVM (Linear Kernel)')

svm_rbf = SVC(kernel='rbf')
evaluate_model(svm_rbf, 'SVM (RBF Kernel)')

#### **Random Forest Model**

In [None]:
# Train a Random Forest Model
rf_model = RandomForestClassifier(n_estimators=100, random_state=42)
rf_model.fit(x_train, y_train)

# Make predictions on the test set
y_pred_rf = rf_model.predict(x_test)

# Evaluate the model
accuracy_rf = accuracy_score(y_test, y_pred_rf)
precision_rf = precision_score(y_test, y_pred_rf)
recall_rf = recall_score(y_test, y_pred_rf)
f1_rf = f1_score(y_test, y_pred_rf)

# Print evaluation metrics
print(f'Accuracy: {accuracy_rf:.2f}')
print(f'Precision: {precision_rf:.2f}')
print(f'Recall: {recall_rf:.2f}')
print(f'F1 Score: {f1_rf:.2f}')

# Confusion Matrix
conf_matrix_rf = confusion_matrix(y_test, y_pred_rf)

# Visualize the Confusion Matrix
sns.heatmap(conf_matrix_rf, annot=True, fmt='d', cmap='Oranges',
            xticklabels=['No CVD', 'CVD'], yticklabels=['No CVD', 'CVD'])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Random Forest Confusion Matrix')
plt.show()

#### **Fully Connected Neural Network with different parameter fine tuning**

In [None]:
def create_fcnn_model(input_shape, learning_rate, dropout_rate):
    model = models.Sequential()
    model.add(layers.Input(shape=(input_shape,)))
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.BatchNormalization())
    model.add(layers.Dropout(dropout_rate))
    model.add(layers.Dense(32, activation='relu'))
    model.add(layers.BatchNormalization())
    model.add(layers.Dropout(dropout_rate))
    model.add(layers.Dense(1, activation='sigmoid'))

    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
                  loss='binary_crossentropy',
                  metrics=['accuracy'])

    return model

# Parameter grid to search over
param_grid_fcnn = {
    'learning_rate': [0.01, 1e-3, 5e-4, 1e-4],  # Tunable learning rates
    'dropout_rate': [0.3, 0.5, 0.7],  # Dropout rates
    'batch_size': [32, 64],  # Various batch sizes
    'epochs': [50, 100]  # Number of epochs
}

# Generate all combinations of the parameters
param_combinations = list(itertools.product(param_grid_fcnn['learning_rate'],
                                            param_grid_fcnn['dropout_rate'],
                                            param_grid_fcnn['batch_size'],
                                            param_grid_fcnn['epochs']))

# To store the results
best_accuracy = 0
best_params = None

# Loop over all combinations of hyperparameters
for lr, dropout_rate, batch_size, epochs in param_combinations:
    print(f"\nTraining with parameters: learning_rate={lr}, dropout_rate={dropout_rate}, batch_size={batch_size}, epochs={epochs}")

    # Create the model with the current set of parameters
    input_shape = x_train.shape[1]
    fcnn_model = create_fcnn_model(input_shape, lr, dropout_rate)

    # Train the model
    fcnn_model.fit(x_train, y_train, validation_data=(x_val, y_val),
                   epochs=epochs, batch_size=batch_size, verbose=0)

    # Predict on the test set
    y_pred_fcnn = (fcnn_model.predict(x_test) > 0.5).astype("int32")  # Binarize predictions (threshold = 0.5)

    # Calculate evaluation metrics
    accuracy_fcnn = accuracy_score(y_test, y_pred_fcnn)
    precision_fcnn = precision_score(y_test, y_pred_fcnn)
    recall_fcnn = recall_score(y_test, y_pred_fcnn)
    f1_fcnn = f1_score(y_test, y_pred_fcnn)

    # Print the results for this combination
    print(f"Accuracy: {accuracy_fcnn:.2f}, Precision: {precision_fcnn:.2f}, Recall: {recall_fcnn:.2f}, F1 Score: {f1_fcnn:.2f}")

    # Update the best model if necessary
    if accuracy_fcnn > best_accuracy:
        best_accuracy = accuracy_fcnn
        best_params = {'learning_rate': lr, 'dropout_rate': dropout_rate, 'batch_size': batch_size, 'epochs': epochs}
        print(f"New best model found with accuracy: {best_accuracy:.2f}")

# Print the best hyperparameters
print(f"\nBest Hyperparameters: {best_params}")
print(f"Best Accuracy: {best_accuracy:.2f}")


#### **Convolutional Neural Network (CNN)**

In [None]:
# For CNN, reshape to 3D (samples, timesteps, features)
x_train_cnn = np.array(x_train).reshape(x_train.shape[0], x_train.shape[1], 1)
x_test_cnn = np.array(x_test).reshape(x_test.shape[0], x_test.shape[1], 1)

# Define the CNN model
cnn_model = tf.keras.models.Sequential()

# Add Conv1D layers with LeakyReLU activation and BatchNormalization
cnn_model.add(Conv1D(filters=32, kernel_size=(3,), padding='same', activation=tf.keras.layers.LeakyReLU(alpha=0.001), input_shape=(x_train_cnn.shape[1], 1)))
cnn_model.add(BatchNormalization())  # Batch Normalization

cnn_model.add(Conv1D(filters=64, kernel_size=(3,), padding='same', activation=tf.keras.layers.LeakyReLU(alpha=0.001)))
cnn_model.add(BatchNormalization())  # Batch Normalization

cnn_model.add(Conv1D(filters=128, kernel_size=(3,), padding='same', activation=tf.keras.layers.LeakyReLU(alpha=0.001)))
cnn_model.add(BatchNormalization())  # Batch Normalization

# Add pooling, dropout, and flatten layer
cnn_model.add(MaxPool1D(pool_size=(3,), strides=2, padding='same'))
cnn_model.add(Dropout(0.5))
cnn_model.add(Flatten())

# Add dense layers with LeakyReLU activation and BatchNormalization
cnn_model.add(Dense(units=256, activation=tf.keras.layers.LeakyReLU(alpha=0.001)))
cnn_model.add(BatchNormalization())  # Batch Normalization

cnn_model.add(Dense(units=512, activation=tf.keras.layers.LeakyReLU(alpha=0.001)))
cnn_model.add(BatchNormalization())  # Batch Normalization

# Final output layer for binary classification
cnn_model.add(Dense(units=1, activation='sigmoid'))  # Sigmoid activation for binary classification

# Compile the model
cnn_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Print model summary
cnn_model.summary()

# Train the model using 3D reshaped data for CNN
cnn_model_history = cnn_model.fit(x_train_cnn, y_train, epochs=50, batch_size=10, validation_data=(x_test_cnn, y_test))

# Evaluate the CNN model on the test data
y_pred_prob = cnn_model.predict(x_test_cnn)
y_pred = (y_pred_prob > 0.5).astype("int32")  # Convert probabilities to binary predictions

# Calculate evaluation metrics
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)

# Print evaluation results
print(f'Accuracy: {accuracy:.2f}')
print(f'Precision: {precision:.2f}')
print(f'Recall: {recall:.2f}')
print(f'F1 Score: {f1:.2f}')

# Confusion Matrix
conf_matrix = confusion_matrix(y_test, y_pred)

# Visualize the Confusion Matrix using a heatmap
plt.figure(figsize=(6,4))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=['No Disease', 'Disease'], yticklabels=['No Disease', 'Disease'])
plt.title('Confusion Matrix')
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.show()
