In [None]:
import pandas as pd
import numpy as np
import math
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE
from sklearn import svm
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
from sklearn.preprocessing import OneHotEncoder
from sklearn.datasets import make_classification

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from torch.utils.data import TensorDataset, DataLoader
import torch.nn.init as init
import random

In [None]:
# Mount Google Drive at /content/drive so that files stored there can be read
# by later cells. The google.colab module is only importable inside Colab.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import warnings
# Suppress every warning category from this point on.
# NOTE(review): a blanket 'ignore' also hides deprecation/future warnings from
# the libraries used below — consider narrowing the filter.
warnings.filterwarnings('ignore')

In [None]:
# Load the dataset from the mounted Drive. The path is hard-coded to one Drive
# layout and must be adjusted when running elsewhere.
# NOTE(review): the file name suggests the data were KNN-imputed beforehand — confirm.
df_knn = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/df_knn.csv')

In [None]:
# Separate the target column ('CVD0010') from the remaining feature columns.
y = df_knn['CVD0010']
X = df_knn.drop(columns=['CVD0010'])

In [None]:
# Standardize every feature column (StandardScaler: zero mean, unit variance).
# NOTE(review): the scaler is fitted on the full dataset here, i.e. before the
# train/test split in the next cell, so held-out rows contribute to the
# mean/std used for scaling.
scaler = StandardScaler()
X = pd.DataFrame(scaler.fit_transform(X), columns=X.columns)

In [None]:
# split train and test
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42, test_size=0.2)

# Avoid test-set leakage: X was standardized with statistics computed over ALL
# rows, so the held-out rows influenced the scaling. Re-fit the scaler on the
# training rows only and apply it to both splits. Z-scoring is invariant to a
# previous positive affine rescaling, so (for columns that vary in the training
# split) the result equals fitting on the raw training data.
# The original index is preserved so boolean masks built from y_train / y_test
# still align with the feature frames.
scaler = StandardScaler().fit(X_train)
X_train = pd.DataFrame(scaler.transform(X_train), columns=X_train.columns, index=X_train.index)
X_test = pd.DataFrame(scaler.transform(X_test), columns=X_test.columns, index=X_test.index)

In [None]:
# SMOTE: oversample the minority class of the training split only.
print("Before OverSampling, counts of label '1': {}".format((y_train == 1).sum()))
print("Before OverSampling, counts of label '0': {} \n".format((y_train == 0).sum()))

# Fixed seed so the synthetic samples (and everything trained on them) are
# reproducible, matching the random_state=42 used by the other stochastic steps.
X_res, y_res = SMOTE(random_state=42).fit_resample(X_train, y_train)

print('After OverSampling, the shape of train_X: {}'.format(X_res.shape))
print('After OverSampling, the shape of train_y: {} \n'.format(y_res.shape))

print("After OverSampling, counts of label '1': {}".format((y_res == 1).sum()))
print("After OverSampling, counts of label '0': {}".format((y_res == 0).sum()))

In [None]:
# Keep only the rows that come after the first len(X_train) rows of the
# resampled frame, i.e. everything SMOTE added beyond the original training rows.
X_resampled = X_res.iloc[len(X_train):]

In [None]:
# Training rows whose label equals 1 (selected with a boolean mask on y_train).
X_real = X_train.loc[y_train == 1]

In [None]:
# hyperparameter
n_epochs = 4000  # number of iterations of the outer training loop
batch_size = 128  # mini-batch size handed to the DataLoader of real samples
latent_dim = X_train.shape[1] # number of feature columns (513 per the original note); default width for Generator/Discriminator

In [None]:
# GAN model
# generator_block
def get_generator_block(input_dim, output_dim):
    """Build one generator stage: Linear -> BatchNorm1d -> LeakyReLU(0.2).

    Args:
        input_dim: number of input features of the linear layer.
        output_dim: number of output features (also the BatchNorm width).

    Returns:
        An ``nn.Sequential`` holding the three layers in that order.
    """
    layers = [
        nn.Linear(input_dim, output_dim),
        nn.BatchNorm1d(output_dim),
        nn.LeakyReLU(negative_slope=0.2, inplace=True),
    ]
    return nn.Sequential(*layers)
# generator
class Generator(nn.Module):
    """Fully connected generator mapping a ``z_dim`` vector to a ``data_dim`` vector.

    Five Linear/BatchNorm/LeakyReLU stages (widths 1024-2048-4096-2048-1024)
    are followed by a plain linear output layer, so the output is unbounded.
    In this notebook the input is the set of SMOTE-generated samples rather
    than random noise.

    Args:
        z_dim: width of the input vectors.
        data_dim: width of the generated vectors.
    """

    def __init__(self, z_dim=latent_dim, data_dim=latent_dim):
        super().__init__()
        self.gen = nn.Sequential(
            get_generator_block(z_dim, 1024),
            get_generator_block(1024, 2048),
            get_generator_block(2048, 4096),
            get_generator_block(4096, 2048),
            get_generator_block(2048, 1024),
            nn.Linear(1024, data_dim),
        )

        self.init_weights()

    def init_weights(self):
        """Kaiming-uniform initialise every linear layer and zero its bias."""
        for m in self.modules():
            if isinstance(m, nn.Linear):
                # Pass the real LeakyReLU slope: with the default a=0 the gain
                # is computed as if the activation were a plain ReLU.
                init.kaiming_uniform_(m.weight, a=0.2, nonlinearity='leaky_relu')
                if m.bias is not None:
                    init.zeros_(m.bias)

    def forward(self, noise):
        """Return generated samples for a batch of input vectors ``noise``."""
        return self.gen(noise)

# discriminator_block
def get_discriminator_block(input_dim, output_dim):
    """Build one discriminator stage: Linear -> LeakyReLU(0.2) -> Dropout(0.5).

    Args:
        input_dim: number of input features of the linear layer.
        output_dim: number of output features of the linear layer.

    Returns:
        An ``nn.Sequential`` holding the three layers in that order.
    """
    layers = [
        nn.Linear(input_dim, output_dim),
        nn.LeakyReLU(negative_slope=0.2, inplace=True),
        nn.Dropout(p=0.5),
    ]
    return nn.Sequential(*layers)
# 2.discriminator
class Discriminator(nn.Module):
    """Fully connected binary classifier scoring samples as real vs. generated.

    Four Linear/LeakyReLU/Dropout stages (widths 512-1024-512-256) feed a
    single-unit linear layer followed by a sigmoid, so the output lies in (0, 1).

    Args:
        data_dim: width of the input vectors.
    """

    def __init__(self, data_dim=latent_dim):
        super().__init__()
        widths = [data_dim, 512, 1024, 512, 256]
        # One hidden stage per consecutive pair of widths, built in order.
        hidden = [
            get_discriminator_block(n_in, n_out)
            for n_in, n_out in zip(widths[:-1], widths[1:])
        ]
        self.disc = nn.Sequential(
            *hidden,
            nn.Linear(256, 1),
            nn.Sigmoid(),
        )

    def forward(self, data):
        """Return the sigmoid score for each row of ``data``."""
        return self.disc(data)

In [None]:
# Seed every RNG involved so that weight initialisation, dropout masks and the
# DataLoader shuffle order are repeatable from a fresh kernel.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Loss function: binary cross-entropy on the discriminator's sigmoid output
criterion = nn.BCELoss()

# Initialize generator and discriminator
generator = Generator()
discriminator = Discriminator()

# Optimizers
g_optimizer = torch.optim.Adam(generator.parameters(), lr=0.00001)
d_optimizer = torch.optim.Adam(discriminator.parameters(), lr=0.00001)

# All the training set data with a label value of 1 are input into the discriminator as real data
indices = (y_train == 1)
X_real = X_train[indices]
y_real = y_train[indices]
# torch.tensor with an explicit dtype replaces the legacy torch.Tensor constructor.
my_dataset = TensorDataset(
    torch.tensor(X_real.values, dtype=torch.float32),
    torch.tensor(y_real.values, dtype=torch.float32),
)
# dataloader over the real minority samples (labels are carried along but the
# training loop only reads the features)
dataloader = DataLoader(my_dataset, batch_size=batch_size, shuffle=True)

In [None]:
# Train
gen_losses = []
disc_losses = []

# The generator input is the fixed set of SMOTE samples, so convert it to a
# tensor once instead of twice per batch.
smote_input = torch.tensor(X_resampled.values, dtype=torch.float32)

# Print a progress line only every `log_every` epochs (and for the last one)
# instead of for every batch, which would flood the cell output.
log_every = 100

# Make sure BatchNorm/Dropout are in training mode even if a later cell
# switched a model to eval mode before this cell is re-run.
generator.train()
discriminator.train()

for epoch in range(n_epochs):
    for i, real_data in enumerate(dataloader):
        # Each batch is a (features, labels) pair; only the features are used.
        real_data = real_data[0].float()

        # ---------------------
        #  First Train Discriminator
        # ---------------------

        d_optimizer.zero_grad()

        # Measure discriminator's ability to classify real from generated samples
        disc_real_pred = discriminator(real_data)
        real_loss = criterion(disc_real_pred, torch.ones_like(disc_real_pred))

        # detach() keeps the discriminator update from back-propagating into the generator.
        gen_datas = generator(smote_input)
        disc_fake_pred = discriminator(gen_datas.detach())
        fake_loss = criterion(disc_fake_pred, torch.zeros_like(disc_fake_pred))

        d_loss = (real_loss + fake_loss) / 2

        d_loss.backward()
        d_optimizer.step()

        # -----------------
        #  Train Generator
        # -----------------

        g_optimizer.zero_grad()
        # The generator weights have not changed since gen_datas was computed,
        # so the same (non-detached) output is reused rather than running a
        # second, identical forward pass.
        disc_fake_pred = discriminator(gen_datas)
        # Generator objective: make the discriminator label generated rows as real.
        g_loss = criterion(disc_fake_pred, torch.ones_like(disc_fake_pred))

        g_loss.backward()
        g_optimizer.step()

        gen_losses.append(g_loss.item())
        disc_losses.append(d_loss.item())

        if epoch % log_every == 0 or epoch == n_epochs - 1:
            print(
                "[Epoch %d/%d] [Batch %d/%d] [D loss: %f] [G loss: %f]"
                % (epoch, n_epochs, i, len(dataloader), d_loss.item(), g_loss.item())
            )

In [None]:
# Turn the per-iteration loss histories into NumPy arrays
gen_losses = np.asarray(gen_losses)
disc_losses = np.asarray(disc_losses)

# Draw both loss curves on one set of axes, indexed by training iteration
fig, ax = plt.subplots(figsize=(10, 8))
ax.plot(np.arange(gen_losses.size), gen_losses, label='Generator loss')
ax.plot(np.arange(disc_losses.size), disc_losses, label='Discriminator loss')
ax.set_xlabel('Iterations')
ax.set_ylabel('Loss')
ax.legend()
plt.show()

# t-SNE

In [None]:
# Real minority-class rows of the training split
real_one = X_train[y_train == 1]

# count the frequency of each value in y_train.
value_counts = y_train.value_counts()
# calculate how much fewer the number of minority classes is than that of majority classes.
diff = value_counts[0] - value_counts[1]

# Generate data by passing the SMOTE samples through the trained generator.
smote_z = torch.tensor(X_resampled.values, dtype=torch.float32)
# Inference: switch BatchNorm to its running statistics (otherwise this call
# would also update them) and skip building an autograd graph.
generator.eval()
with torch.no_grad():
    generated_data_smotegan = generator(smote_z)
generated_data_smotegan_df = pd.DataFrame(generated_data_smotegan.cpu().numpy(), columns=X_real.columns)

In [None]:
# t-SNE: embed generated and real minority samples together in 2-D
tsne_real = real_one                              # real rows with label 1
tsne_real_smotegan = generated_data_smotegan_df   # rows produced by the generator

# Stack generated rows first, then real rows; labels mark the origin
# (0 = generated, 1 = real) in the same order.
combined_data = pd.concat([tsne_real_smotegan, tsne_real], axis=0)
labels = np.concatenate([
    np.zeros(len(tsne_real_smotegan), dtype=int),
    np.ones(len(tsne_real), dtype=int),
])
tsne = TSNE(n_components=2, random_state=42)
tsne_data = tsne.fit_transform(combined_data)

# Scatter each group in its own colour
plt.figure(figsize=(8, 6))
for value, colour, name in ((0, 'b', 'SMOTified-GAN data'), (1, 'r', 'Real data')):
    mask = labels == value
    plt.scatter(tsne_data[mask, 0], tsne_data[mask, 1], c=colour, label=name)
plt.legend()
plt.xlabel('Dimension 1')
plt.ylabel('Dimension 2')
plt.title('TSNE comparing real data with SMOTified-GAN generated data')
plt.show()

In [None]:
# PCA: project real and generated minority samples onto two principal components
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA

# Stack real rows first, then generated rows; n_real marks the boundary.
n_real = real_one.shape[0]
merged_data = np.concatenate((real_one, generated_data_smotegan_df), axis=0)

# Fixed seed: for data of this size PCA may pick a randomized SVD solver, so
# seeding keeps the projection reproducible (same seed as the t-SNE plot).
pca = PCA(n_components=2, random_state=42)

pca.fit(merged_data)
reduced_data = pca.transform(merged_data)

# show
plt.scatter(reduced_data[n_real:, 0], reduced_data[n_real:, 1], label='SMOTified-GAN data')
plt.scatter(reduced_data[:n_real, 0], reduced_data[:n_real, 1], label='Real data')
plt.xlabel('Principal Component 1')
plt.ylabel('Principal Component 2')
plt.title('PCA comparing real data with SMOTified-GAN generated data')
plt.legend()
plt.show()

# SVM

In [None]:
# Build the augmented training set: the original training rows followed by the
# generator outputs. y_res (the SMOTE-resampled labels) is reused as the label
# vector; it has one label per original row followed by one per synthetic row.
# pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
X_train_smotegan = pd.concat([X_train, generated_data_smotegan_df], ignore_index=True)
y_train_smotegan = y_res

from sklearn.utils import shuffle
# Shuffle features and labels together so the generated rows are not all at the end
X_train_smotegan, y_train_smotegan = shuffle(X_train_smotegan, y_train_smotegan, random_state=42)

In [None]:
# SVM
import torch
from sklearn.metrics import roc_curve, roc_auc_score, precision_recall_curve, average_precision_score, accuracy_score
from sklearn.svm import SVC

def train_and_evaluate_svm(X_train, y_train, X_test, y_test):
    """Fit an SVC on the training data and return its ROC AUC on the test data.

    Args:
        X_train: training features (DataFrame or array-like), cast to float32.
        y_train: training labels (Series or array-like), cast to int64.
        X_test: test features, cast to float32.
        y_test: test labels, cast to int64.

    Returns:
        The ROC AUC computed from the predicted probability of class index 1.
    """
    # Plain NumPy conversion replaces the torch tensor round trip; the dtypes
    # match what the tensors were created with (float32 / long).
    X_train_arr = np.asarray(X_train, dtype=np.float32)
    y_train_arr = np.asarray(y_train).astype(np.int64)
    X_test_arr = np.asarray(X_test, dtype=np.float32)
    y_test_arr = np.asarray(y_test).astype(np.int64)

    # probability=True enables predict_proba
    svm_classifier = SVC(probability=True)
    svm_classifier.fit(X_train_arr, y_train_arr)

    # Only the scores are needed for AUC; the hard predictions and the ROC
    # curve points were computed but never used, so they are no longer produced.
    y_scores = svm_classifier.predict_proba(X_test_arr)[:, 1]

    return roc_auc_score(y_test_arr, y_scores)

In [None]:
# Baseline: SVM trained on the original (non-augmented) training split.
# The fit/evaluate cycle is repeated num_repeats times and the AUCs averaged.
num_repeats = 30
roc_results = [
    train_and_evaluate_svm(X_train, y_train, X_test, y_test)
    for _ in range(num_repeats)
]

# Average AUC over all repeats
mean_auc = np.mean(roc_results)

print("Mean AUC:", mean_auc)

In [None]:
# SVM trained on the training split augmented with the generator outputs.
# The fit/evaluate cycle is repeated num_repeats times and the AUCs averaged.
num_repeats = 30
roc_results_smotegan = [
    train_and_evaluate_svm(X_train_smotegan, y_train_smotegan, X_test, y_test)
    for _ in range(num_repeats)
]

# Average AUC over all repeats
mean_auc_smotegan = np.mean(roc_results_smotegan)

print("Mean AUC:", mean_auc_smotegan)