In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import KFold
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import r2_score as sklearn_r2_score
from sklearn.metrics import r2_score

In [None]:

X_train = pd.read_csv('../../datasets/Admission/X_train.csv',index_col=0)  
X_test = pd.read_csv('../../datasets/Admission/X_test.csv',index_col=0)  
y_train = pd.read_csv('../../datasets/Admission/y_train.csv',index_col=0)  
y_test = pd.read_csv('../../datasets/Admission/y_test.csv',index_col=0)
# Specify the split ratio. For example, let's use 80% for training and 20% for validation
split_ratio = 0.2

X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=split_ratio, random_state=42)


In [None]:
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val)
X_test_scaled = scaler.transform(X_test)

In [None]:
import torch.nn as nn

class Net(nn.Module):
    def __init__(self, hidden_layers, hidden_units, dropout, dropout_array, input_dim, output_dim, activation=nn.ReLU(),norm=False):
        if hidden_layers != len(hidden_units):
            print("Error: wrong size of hidden_layers or hidden_units")
            return
        layers = []
        i = 0
        if norm:
            layers.append(nn.BatchNorm1d(input_dim))

        super(Net, self).__init__()
        
        layers.append(nn.Linear(input_dim, hidden_units[i]))
        layers.append(activation)
        if dropout:
            layers.append(nn.Dropout(dropout_array[i]))

        for _ in range(hidden_layers - 1):
            i += 1
            layers.append(nn.Linear(hidden_units[i-1], hidden_units[i]))
            layers.append(activation)
            if dropout:
                layers.append(nn.Dropout(dropout_array[i]))

        layers.append(nn.Linear(hidden_units[-1], output_dim))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        x = x.view(x.size(0), -1)
        return self.net(x)


In [None]:
config={'batch_size': 8,
             'lr': 5e-05,
             'num_epochs': 300,
             'hidden_layers': 3,
             'hidden_units': [128, 16, 32],
             'dropout': False,
             'dropout_array': [0.5262243916801222, 0.17160622713412305, 0.28392396173952034],
             'activation': nn.ReLU(),
             'norm': True,
             'patience': 20
            }

In [None]:
input_dim = X_train_scaled.shape[1]
output_dim = 1

In [None]:
train_ds = TensorDataset(torch.Tensor(X_train_scaled), torch.Tensor(y_train.to_numpy()))
val_ds = TensorDataset(torch.Tensor(X_val_scaled), torch.Tensor(y_val.to_numpy()))

test_ds = TensorDataset(torch.Tensor(X_test_scaled), torch.Tensor(y_test.to_numpy()))

In [None]:
train_loader = DataLoader(train_ds, batch_size=config["batch_size"], shuffle=True)
val_loader = DataLoader(val_ds, batch_size=config["batch_size"])

test_loader = DataLoader(test_ds, batch_size=config["batch_size"])

In [None]:
net = Net(hidden_layers=config["hidden_layers"], hidden_units=config["hidden_units"], dropout=config["dropout"], dropout_array=config["dropout_array"], input_dim=input_dim, output_dim=output_dim, activation=config["activation"], norm=config["norm"])

In [None]:
class RMSELoss(nn.Module):
    def __init__(self):
        super().__init__()
        self.mse = nn.MSELoss()
        
    def forward(self,yhat,y):
        return torch.sqrt(self.mse(yhat,y))

In [None]:
criterion = RMSELoss()
optimizer = optim.Adam(net.parameters(), lr=config["lr"])

In [None]:
from torchsummary import summary
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # PyTorch v0.4.0
net = net.to(device)

In [None]:
summary(net,input_size=(7,1))

In [None]:
def train_net(net, train_loader, criterion, optimizer, device):
    net.train()
    train_loss = 0
    for i, (inputs, targets) in enumerate(train_loader):
        inputs, targets = inputs.to(device), targets.to(device).view(-1, 1)

        optimizer.zero_grad()

        outputs = net(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    return train_loss / len(train_loader), net


def test_net(net, test_loader, criterion, device):
    net.eval()
    test_loss = 0
    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(device), targets.to(device).view(-1, 1)

            outputs = net(inputs)
            loss = criterion(outputs, targets)

            test_loss += loss.item()

    return test_loss / len(test_loader)


def train(config):
    train_loader = DataLoader(train_ds, batch_size=config["batch_size"], shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=config["batch_size"])
    net = Net(hidden_layers=config["hidden_layers"], hidden_units=config["hidden_units"], dropout=config["dropout"], dropout_array=config["dropout_array"], input_dim=input_dim, output_dim=output_dim, activation=config["activation"], norm=config["norm"])
    criterion = RMSELoss()
    optimizer = optim.Adam(net.parameters(), lr=config["lr"])
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    net.to(device)

    train_losses = []
    val_losses = []

    best_val_loss = float("inf")
    patience = config['patience']
    early_stop = False

    for epoch in range(config["num_epochs"]):
        train_loss,net = train_net(net, train_loader, criterion, optimizer, device)
        val_loss = test_net(net, val_loader, criterion, device)

        train_losses.append(train_loss)
        val_losses.append(val_loss)

        print(f"Epoch [{epoch+1}/{config['num_epochs']}], Train Loss: {train_loss:.4f}, Test Loss: {val_loss:.4f}")

        # Early stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience = 10
        else:
            patience -= 1
            if patience == 0:
                print("Early stopping...")
                early_stop = True
                break

    if not early_stop:
        print("Training complete.")

    return train_losses, val_losses, net


In [None]:
train_losses, val_losses, mod = train(config)

In [None]:
def plot_losses(train_losses, val_losses):
    plt.plot(train_losses, label="Train Loss")
    plt.plot(val_losses, label="val Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.show()

In [None]:
plot_losses(train_losses, val_losses)

In [None]:
def plot_predictions(net, data_loader, device):
    net.eval()
    y_true = []
    y_pred = []
    with torch.no_grad():
        for inputs, targets in data_loader:
            inputs, targets = inputs.to(device), targets.to(device).view(-1, 1)
            outputs = net(inputs)
            y_true.extend(targets.cpu().numpy())
            y_pred.extend(outputs.cpu().numpy())

    plt.scatter(y_true, y_pred)
    plt.xlabel("True Values")
    plt.ylabel("Predictions")
    plt.show()

def r_squared(y_true, y_pred):
    y_bar = np.mean(y_true)
    ss_tot = np.sum((y_true - y_bar) ** 2)
    ss_res = np.sum((y_true - y_pred) ** 2)
    r2 = 1 - (ss_res / ss_tot)
    return r2


In [None]:
#train_losses, test_losses = train(config)
plot_losses(train_losses, val_losses)
plot_predictions(mod, test_loader, device)
y_true = y_test
y_pred = mod(torch.Tensor(X_test_scaled).to(device)).cpu().detach().numpy()
r2 = r2_score(y_true, y_pred)
print("R^2 score:", r2)
y_train_tensor = torch.Tensor(y_train.values).to(device)
y_val_tensor = torch.Tensor(y_val.values).to(device)
y_test_tensor = torch.Tensor(y_test.values).to(device)
print("Train_loss:",criterion(mod(torch.Tensor(X_train_scaled).to(device)),y_train_tensor).item())
print("Validation_loss:",criterion(mod(torch.Tensor(X_val_scaled).to(device)),y_val_tensor).item())
print("Test_loss:",criterion(mod(torch.Tensor(X_test_scaled).to(device)),y_test_tensor).item())


In [None]:
torch.save(mod, '../../Models/admission_model_72')

In [None]:
from captum.attr import IntegratedGradients
from captum.attr import LayerConductance
from captum.attr import NeuronConductance
from captum.attr import ShapleyValues

In [None]:
mod = mod.to(device)
x_test_tensor = torch.Tensor(X_test_scaled).requires_grad_().to(device)


In [None]:
ig = IntegratedGradients(mod)
attr, delta = ig.attribute(x_test_tensor, return_convergence_delta=True)
attr = attr.detach().cpu().numpy()

In [None]:
print(np.mean(attr, axis=0))

In [None]:
# Helper method to print importances and visualize distribution
def visualize_importances(feature_names, importances, title="Average Feature Importances", plot=True, axis_title="Features"):
    print(title)
    for i in range(len(feature_names)):
        print(feature_names[i], ": ", '%.4f'%(importances[i]))
    x_pos = (np.arange(len(feature_names)))
    if plot:
        plt.figure(figsize=(12,6))
        plt.bar(x_pos, importances, align='center')
        plt.xticks(x_pos, feature_names, wrap=True)
        plt.xlabel(axis_title)
        plt.title(title)
visualize_importances(df.columns, np.mean(attr, axis=0))

In [None]:
svs = ShapleyValues(net)

In [None]:
attr_2 = svs.attribute(x_test_tensor)
attr_2 = attr_2.detach().cpu().numpy()

In [None]:
print(np.mean(attr_2, axis=0))

In [None]:
visualize_importances(df.columns, np.mean(attr_2, axis=0))