In [47]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import accuracy_score

# Load the dataset
df = pd.read_csv('diabetes_prediction_dataset.csv')
df_encoded = pd.get_dummies(df, columns=['gender', 'smoking_history'], drop_first=True)
testDF = df_encoded.sample(frac=1).reset_index(drop=True)
x_unscaled = testDF.drop(['diabetes'], axis=1)
y = testDF['diabetes']

# Normalize the data
numerical_columns = x_unscaled.select_dtypes(include=np.number).columns
boolean_columns = x_unscaled.select_dtypes(include=bool).columns
scaler = StandardScaler()
temp = pd.DataFrame(scaler.fit_transform(x_unscaled[numerical_columns]), columns=numerical_columns)
x_scaled = pd.concat([temp, x_unscaled[boolean_columns]], axis=1)
x_train, x_test, y_train, y_test = train_test_split(x_scaled, y, test_size=0.2, random_state=42)

In [48]:
# Define the Encoder model with reduced complexity and dropout
class Encoder(nn.Module):
    def __init__(self, input_dim, encoding_dim):
        super(Encoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, encoding_dim)
        )

    def forward(self, x):
        return self.encoder(x)

# Define the Decoder model with reduced complexity and dropout
class Decoder(nn.Module):
    def __init__(self, encoding_dim, input_dim):
        super(Decoder, self).__init__()
        self.decoder = nn.Sequential(
            nn.Linear(encoding_dim, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, input_dim)
        )

    def forward(self, x):
        return self.decoder(x)

In [49]:
# Define input dimensions
input_dim = x_train.shape[1]
encoding_dim = 8

# Load the trained autoencoder models
encoder_model_file = './models/encoder2.pth'
decoder_model_file = './models/decoder2.pth'

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

encoder = Encoder(input_dim, encoding_dim).to(device)
decoder = Decoder(encoding_dim, input_dim).to(device)
encoder.load_state_dict(torch.load(encoder_model_file))
decoder.load_state_dict(torch.load(decoder_model_file))
encoder.eval()
decoder.eval()

# Define the Autoencoder model
class Autoencoder(nn.Module):
    def __init__(self, encoder, decoder):
        super(Autoencoder, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

autoencoder = Autoencoder(encoder, decoder).to(device)

  encoder.load_state_dict(torch.load(encoder_model_file))
  decoder.load_state_dict(torch.load(decoder_model_file))


## Original MLP model

In [50]:
import torch
import numpy as np
from sklearn.metrics import accuracy_score, confusion_matrix

# Define the original MLP model
class MLP(nn.Module):
    def __init__(self, input_dim):
        super(MLP, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            nn.Sigmoid()  # Sigmoid for binary classification
        )
        
    def forward(self, x):
        return self.model(x)

# Load the original MLP model
mlp_model_file = './models/mlp_model.pth'
input_dim = x_train.shape[1]
original_mlp = MLP(input_dim).to(device)
original_mlp.load_state_dict(torch.load(mlp_model_file))
original_mlp.eval()

# Evaluate the original MLP model
with torch.no_grad():
    x_test_tensor = torch.tensor(x_test.values, dtype=torch.float32).to(device)
    y_test_tensor = torch.tensor(y_test.values, dtype=torch.float32).to(device)
    y_pred_original = original_mlp(x_test_tensor).cpu().numpy().flatten()
    below_0_5 = np.sum(y_pred_original < 0.5)
    above_0_5 = np.sum(y_pred_original >= 0.5)
    print(f"Values below 0.5: {below_0_5}")
    print(f"Values above 0.5: {above_0_5}")
    y_pred_original = (y_pred_original > 0.5).astype(int)  # Convert to binary predictions

# Calculate accuracy
original_accuracy = accuracy_score(y_test, y_pred_original)
print(f"Original MLP Accuracy: {original_accuracy * 100:.2f}%")

# Calculate confusion matrix
conf_matrix_original = confusion_matrix(y_test, y_pred_original)

# Extract values from confusion matrix
tn, fp, fn, tp = conf_matrix_original.ravel()

print(f"Total correct predictions: {tn + tp}")
print(f"Total wrong predictions: {fp + fn}\n")
print(f"True Negatives: {tn}")
print(f"False Positives: {fp}")
print(f"False Negatives: {fn}")
print(f"True Positives: {tp}")

Values below 0.5: 18681
Values above 0.5: 1319
Original MLP Accuracy: 97.03%
Total correct predictions: 19406
Total wrong predictions: 594

True Negatives: 18199
False Positives: 112
False Negatives: 482
True Positives: 1207


  original_mlp.load_state_dict(torch.load(mlp_model_file))


### Simple error correction function by adding a bias (no learning)

In [51]:
import numpy as np
import torch
import torch.nn.functional as F  # For activation functions

def error_correction_function(mlp, autoencoder, x, bias_factor):
    # Ensure all computations happen on the correct device
    with torch.no_grad():
        # Calculate reconstruction error using the autoencoder
        x_tensor = torch.tensor(x, dtype=torch.float32).to(device)
        reconstructed = autoencoder(x_tensor).cpu().numpy()
    reconstruction_error = np.mean(np.square(x - reconstructed), axis=1)
    threshold = np.mean(reconstruction_error)
    
    # Make predictions with the MLP model
    with torch.no_grad():
        x_test_tensor = torch.tensor(x, dtype=torch.float32).to(device)
        y_pred_proba = mlp(x_test_tensor).cpu().numpy().flatten()

    # Adjust predictions based on reconstruction error with adaptive bias factor
    y_pred_proba_corrected = y_pred_proba.copy()
    high_error_indices = np.where(reconstruction_error > threshold)[0]
    print("Number of high error indices: ", len(high_error_indices))
    for idx in high_error_indices:
        adaptive_bias = bias_factor * (reconstruction_error[idx] / threshold)
        if y_pred_proba[idx] < 0.5:
            y_pred_proba_corrected[idx] += adaptive_bias
        else:
            y_pred_proba_corrected[idx] -= adaptive_bias
    # y_pred_proba_corrected[high_error_indices] += bias_factor  # Apply bias factor
    
    initial_zeros = np.sum(y_pred_proba[high_error_indices] < 0.5)
    initial_ones = np.sum(y_pred_proba[high_error_indices] >= 0.5)
    print(f"Initial predictions of 0: {initial_zeros}")
    print(f"Initial predictions of 1: {initial_ones}")
    changed_predictions = np.sum((y_pred_proba[high_error_indices] > 0.5).astype(int) != (y_pred_proba_corrected[high_error_indices] > 0.5).astype(int))
    print(f"Number of changed predictions: {changed_predictions}")
    
    y_pred_proba_corrected = np.clip(y_pred_proba_corrected, 0, 1)  # Ensure probabilities are in [0, 1]

    # Convert corrected probabilities to binary predictions
    y_pred_corrected = (y_pred_proba_corrected > 0.5).astype(int)
    
    return y_pred_corrected, reconstruction_error


In [52]:
y_pred_corrected, reconstruction_error = error_correction_function(original_mlp, autoencoder, x_test.values, bias_factor=0.05)

Number of high error indices:  4324
Initial predictions of 0: 3399
Initial predictions of 1: 925
Number of changed predictions: 349


In [53]:
from sklearn.metrics import accuracy_score, confusion_matrix

# Calculate accuracy
corrected_accuracy = accuracy_score(y_test, y_pred_corrected)
print(f"Corrected Accuracy: {corrected_accuracy * 100:.2f}%")

# Calculate confusion matrix
conf_matrix_corrected = confusion_matrix(y_test, y_pred_corrected)

# Print confusion matrix results
tn, fp, fn, tp = conf_matrix_corrected.ravel()
print(f"Corrected - True Negatives: {tn}")
print(f"Corrected - False Positives: {fp}")
print(f"Corrected - False Negatives: {fn}")
print(f"Corrected - True Positives: {tp}")


Corrected Accuracy: 95.98%
Corrected - True Negatives: 18081
Corrected - False Positives: 230
Corrected - False Negatives: 573
Corrected - True Positives: 1116


### TRYING OUT ERROR-CORRECTING MODEL

In [54]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

# Define the Bias Predictor Model
class BiasPredictor(nn.Module):
    def __init__(self):
        super(BiasPredictor, self).__init__()
        self.fc1 = nn.Linear(14, 128)  # Increased neurons
        self.dropout1 = nn.Dropout(0.3)  # Increased dropout rate
        
        self.fc2 = nn.Linear(128, 64)  # Increased neurons
        self.dropout2 = nn.Dropout(0.3)  # Increased dropout rate
        
        self.fc3 = nn.Linear(64, 32)
        self.dropout3 = nn.Dropout(0.3)  # Increased dropout rate
        
        self.fc4 = nn.Linear(32, 16)
        self.fc5 = nn.Linear(16, 8)
        self.fc6 = nn.Linear(8, 1)  # Output layer for bias correction

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.dropout1(x)
        
        x = F.relu(self.fc2(x))
        x = self.dropout2(x)
        
        x = F.relu(self.fc3(x))
        x = self.dropout3(x)
        
        x = F.relu(self.fc4(x))
        x = F.relu(self.fc5(x))
        return self.fc6(x)

# Prepare the training data for the bias predictor
# def prepare_bias_data(reconstruction_errors, original_probs, ground_truth):
#     # Use reconstruction errors as the only input feature
#     features = reconstruction_errors.reshape(-1, 1)
#     # print(features)
#     print("Minimum reconstruction error: ", np.min(features))
#     print("Maximum reconstruction error: ", np.max(features))
#     # Compute target biases
#     target_bias = ground_truth - original_probs
#     print("Minimum bias: ", np.min(target_bias))
#     print("Maximum bias: ", np.max(target_bias))
#     return torch.tensor(features, dtype=torch.float32), torch.tensor(target_bias, dtype=torch.float32)
def prepare_bias_data(reconstruction_errors, x_test, original_probs, ground_truth):
    # Combine reconstruction errors with the existing normalized features
    features = np.hstack((reconstruction_errors.reshape(-1, 1), x_test.values))
    
    # Compute target biases
    target_bias = ground_truth - original_probs
    print("Minimum bias: ", np.min(target_bias))
    print("Maximum bias: ", np.max(target_bias))
    
    return torch.tensor(features, dtype=torch.float32), torch.tensor(target_bias, dtype=torch.float32)


# Define the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Train the Bias Predictor
# def train_bias_predictor(features, target_bias, epochs=100, learning_rate=0.001):
#     model = BiasPredictor().to(device)
#     optimizer = optim.Adam(model.parameters(), lr=learning_rate)
#     criterion = nn.MSELoss()

#     for epoch in range(epochs):
#         model.train()
#         optimizer.zero_grad()
#         predictions = model(features).squeeze()
#         loss = criterion(predictions, target_bias)
#         loss.backward()
#         optimizer.step()

#         if (epoch + 1) % 10 == 0:
#             print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item()}")

#     return model

#####################################################################################################

# Train the Bias Predictor with Class Weights
# def train_bias_predictor_with_weights(features, target_bias, ground_truth, epochs=100, learning_rate=0.001):
def train_bias_predictor_with_weights(features, target_bias, ground_truth, epochs=100, learning_rate=0.001, weight_positive_scale=1.0, weight_negative_scale=1.0):
    model = BiasPredictor().to(device)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    
    # Define class weights
    num_positive = np.sum(ground_truth == 1)
    num_negative = np.sum(ground_truth == 0)
    total = len(ground_truth)

    weight_positive = (total / (2 * num_positive)) * weight_positive_scale
    weight_negative = (total / (2 * num_negative)) * weight_negative_scale 

    # Convert weights to tensors
    class_weights = torch.tensor([weight_negative, weight_positive], dtype=torch.float32).to(device)
    criterion = nn.MSELoss(reduction='none')  # Use 'none' to apply weights manually

    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        predictions = model(features).squeeze()
        
        # Calculate weighted loss
        loss = criterion(predictions, target_bias)
        weights = torch.where(
            target_bias > 0,  # Assign weights based on ground truth class
            class_weights[1],  # Positive class weight
            class_weights[0]   # Negative class weight
        )
        weighted_loss = torch.mean(loss * weights)
        
        weighted_loss.backward()
        optimizer.step()

        # if (epoch + 1) % 10 == 0:
        #     print(f"Epoch {epoch+1}/{epochs}, Loss: {weighted_loss.item()}")

    return model

#####################################################################################################

# Apply bias correction during inference
# def apply_bias_correction(bias_predictor, reconstruction_errors, original_probs):
#     # Create feature tensor and move it to the correct device
#     features = torch.tensor(reconstruction_errors.reshape(-1, 1), dtype=torch.float32).to(next(bias_predictor.parameters()).device)
#     with torch.no_grad():
#         bias_correction = bias_predictor(features).squeeze().cpu().numpy()
#     # Apply the correction and clip probabilities to [0, 1]
#     corrected_probs = np.clip(original_probs + bias_correction, 0, 1)
#     return corrected_probs

def apply_bias_correction(bias_predictor, reconstruction_errors, x_test, original_probs):
    # Combine normalized reconstruction errors with other features
    combined_features = np.hstack((reconstruction_errors.reshape(-1, 1), x_test.values))
    features = torch.tensor(combined_features, dtype=torch.float32).to(next(bias_predictor.parameters()).device)
    
    with torch.no_grad():
        bias_correction = bias_predictor(features).squeeze().cpu().numpy()
    
    # Apply the correction and clip probabilities to [0, 1]
    corrected_probs = np.clip(original_probs + bias_correction, 0, 1)
    return corrected_probs



x_tensor = torch.tensor(x_test.values, dtype=torch.float32).to(device)
with torch.no_grad():
    reconstructed = autoencoder(x_tensor).cpu().numpy()
reconstruction_errors = np.mean(np.square(x_test.values - reconstructed), axis=1)

# Normalizing the reconstruction errors
mean_error = np.mean(reconstruction_errors)
std_error = np.std(reconstruction_errors)
reconstruction_errors_normalized = (reconstruction_errors - mean_error) / std_error

with torch.no_grad():
    x_test_tensor = torch.tensor(x_test.values, dtype=torch.float32).to(device)
    y_test_tensor = torch.tensor(y_test.values, dtype=torch.float32).to(device)
    # logits = original_mlp(x_test_tensor)
    # y_pred_original = torch.sigmoid(logits).cpu().numpy().flatten()
    y_pred_original = original_mlp(x_test_tensor).cpu().numpy().flatten()
    
    
original_probs = y_pred_original
ground_truth = y_test.values


# Prepare the data for training
features, target_bias = prepare_bias_data(reconstruction_errors_normalized, x_test, original_probs, ground_truth)

# Move tensors to the same device as the model
features, target_bias = features.to(device), target_bias.to(device)

# Train the bias predictor
# bias_predictor = train_bias_predictor(features, target_bias)

best_accuracy = 0
best_conf_matrix = None
for pos in np.arange(0.1, 2.0, 0.05):
    for neg in np.arange(1, 2.0, 0.05):
        
        # Train the bias predictor with weights
        bias_predictor = train_bias_predictor_with_weights(features, target_bias, ground_truth, weight_positive_scale=pos, weight_negative_scale=neg)

        # Apply bias correction during inference
        corrected_probs = apply_bias_correction(bias_predictor, reconstruction_errors_normalized, x_test, original_probs)

        # Convert probabilities to binary predictions
        corrected_predictions = (corrected_probs > 0.5).astype(int)
        # Calculate accuracy
        corrected_accuracy = accuracy_score(y_test, corrected_predictions)
        print(f"Corrected Accuracy for positive weight {pos:.2f} and negative weight {neg:.2f}: {corrected_accuracy * 100:.2f}%")

        # Calculate confusion matrix
        conf_matrix_corrected = confusion_matrix(y_test, corrected_predictions)

        if corrected_accuracy > best_accuracy:
            print("new best accuracy: ", corrected_accuracy)
            best_accuracy = corrected_accuracy
            best_conf_matrix = conf_matrix_corrected

# Print the best accuracy and corresponding confusion matrix
print(f"Best Corrected Accuracy: {best_accuracy * 100:.2f}%")
tn, fp, fn, tp = best_conf_matrix.ravel()
print("True Positives: ", tp)
print("True Negatives: ", tn)
print("False Positives: ", fp)
print("False Negatives: ", fn)


Minimum bias:  -0.9969898462295532
Maximum bias:  0.9996659214957617
Corrected Accuracy for positive weight 0.10 and negative weight 1.00: 97.00%
new best accuracy:  0.96995
Corrected Accuracy for positive weight 0.10 and negative weight 1.05: 97.00%
Corrected Accuracy for positive weight 0.10 and negative weight 1.10: 96.97%
Corrected Accuracy for positive weight 0.10 and negative weight 1.15: 97.04%
new best accuracy:  0.97035
Corrected Accuracy for positive weight 0.10 and negative weight 1.20: 97.02%
Corrected Accuracy for positive weight 0.10 and negative weight 1.25: 97.09%
new best accuracy:  0.97095
Corrected Accuracy for positive weight 0.10 and negative weight 1.30: 96.98%
Corrected Accuracy for positive weight 0.10 and negative weight 1.35: 97.04%
Corrected Accuracy for positive weight 0.10 and negative weight 1.40: 97.06%
Corrected Accuracy for positive weight 0.10 and negative weight 1.45: 97.02%
Corrected Accuracy for positive weight 0.10 and negative weight 1.50: 97.04%


In [55]:
# Calculate accuracy
corrected_accuracy = accuracy_score(y_test, corrected_predictions)
print(f"Corrected Accuracy: {corrected_accuracy * 100:.2f}%")

# Calculate confusion matrix
conf_matrix_corrected = confusion_matrix(y_test, corrected_predictions)

# Print confusion matrix results
tn, fp, fn, tp = conf_matrix_corrected.ravel()
print(f"Corrected - True Negatives: {tn}")
print(f"Corrected - False Positives: {fp}")
print(f"Corrected - False Negatives: {fn}")
print(f"Corrected - True Positives: {tp}")


Corrected Accuracy: 94.96%
Corrected - True Negatives: 17631
Corrected - False Positives: 680
Corrected - False Negatives: 328
Corrected - True Positives: 1361


In [56]:
num_ground_truth_ones = np.sum(ground_truth == 1)
print(f"Number of ground truth values equal to one: {num_ground_truth_ones}")

num_ground_truth_zeroes = np.sum(ground_truth == 0)
print(f"Number of ground truth values equal to zero: {num_ground_truth_zeroes}")

Number of ground truth values equal to one: 1689
Number of ground truth values equal to zero: 18311
