# In [None]:
import pandas as pd
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

# Load and preprocess the training data.
data = pd.read_csv('data/dataTrainR85-15.csv')
print("Columns:", data.columns)
print("Index:", data.index)

# Label encoding for the 'StructuralDamage' column: 'NS' -> 0, 'S' -> 1.
class2idx = {'NS': 0, 'S': 1}

# Drop rows whose damage label is the "-" placeholder. The explicit copy makes
# `data` an independent frame, so the label assignment below is not a write
# into a filtered selection of the frame returned by read_csv.
data = data[data["StructuralDamage"] != "-"].copy()

# Assign the encoded labels back to the column. The previous
# `data['StructuralDamage'].replace(..., inplace=True)` was a chained in-place
# update on a temporary column object; with pandas Copy-on-Write it leaves
# `data` unchanged, so the labels would still be strings here.
data['StructuralDamage'] = data['StructuralDamage'].replace(class2idx)

try:
    # Features: every column except the first and the last.
    # Labels: the last column (assumed to be 'StructuralDamage' -- TODO confirm
    # against the CSV layout).
    X_train = data.iloc[:, 1:-1]
    y_train = data.iloc[:, -1].astype('int')
except IndexError as e:
    print("IndexError:", e)
    print("Data shape:", data.shape)
    raise

# Fit the min-max scaler on the training features; the same fitted scaler is
# applied to the test features later in the script.
scaler = MinMaxScaler()
X_train_scaled = scaler.fit_transform(X_train)

class CustomLoss(nn.Module):
    """Binary cross-entropy plus a penalty on missed positive samples.

    The loss is ``mean(BCE) + fn_cost_weight * soft_FN / N`` where ``soft_FN``
    is ``sum(target * (1 - output))``: the probability mass the model assigns
    to "negative" on samples whose label is 1. For hard 0/1 outputs this
    equals the false-negative count.

    A thresholded count (``output < 0.5``) is piecewise constant and has zero
    gradient, so it would shift the reported loss without influencing
    training. The soft form lets ``fn_cost_weight`` actually steer the
    optimiser.

    Args:
        fn_cost_weight: Multiplier applied to the false-negative penalty.
    """

    def __init__(self, fn_cost_weight):
        super(CustomLoss, self).__init__()
        self.fn_cost_weight = fn_cost_weight
        # Per-element losses; the reduction is done in forward().
        self.bce_loss = nn.BCELoss(reduction='none')

    def forward(self, output, target):
        """Return the scalar penalised loss.

        Args:
            output: Predicted probabilities in [0, 1].
            target: Binary labels with the same shape as ``output``; integer
                or boolean labels are accepted and cast to ``output``'s dtype.

        Returns:
            A 0-dim tensor that is differentiable with respect to ``output``.
        """
        # Cast before calling BCELoss, which rejects non-floating targets.
        target = target.to(dtype=output.dtype)
        loss = self.bce_loss(output, target)

        # Differentiable stand-in for the number of false negatives.
        soft_false_negatives = (target * (1.0 - output)).sum()

        # numel() rather than size(0) so that 0-dim inputs are also handled.
        penalty = self.fn_cost_weight * soft_false_negatives / target.numel()
        return (loss + penalty).mean()

class MLP(nn.Module):
    """Fully connected binary classifier.

    Four hidden stages of widths 64, 128, 256 and 512, each applying
    Linear -> ReLU -> BatchNorm1d -> Dropout(0.3), followed by a
    Linear(512, 1) layer and a sigmoid.

    Args:
        input_dim: Number of input features per sample.
    """

    def __init__(self, input_dim):
        super().__init__()
        layer_sizes = (input_dim, 64, 128, 256, 512)
        # Register fc{k}, bn{k}, dropout{k} for k = 1..4 under those exact
        # attribute names, which are also the state_dict key prefixes.
        for stage, (fan_in, fan_out) in enumerate(zip(layer_sizes, layer_sizes[1:]), start=1):
            setattr(self, f"fc{stage}", nn.Linear(fan_in, fan_out))
            setattr(self, f"bn{stage}", nn.BatchNorm1d(fan_out))
            setattr(self, f"dropout{stage}", nn.Dropout(0.3))
        self.fc5 = nn.Linear(512, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Map a (batch, input_dim) tensor to (batch, 1) sigmoid outputs."""
        for stage in range(1, 5):
            linear = getattr(self, f"fc{stage}")
            norm = getattr(self, f"bn{stage}")
            drop = getattr(self, f"dropout{stage}")
            activated = torch.relu(linear(x))
            x = drop(norm(activated))
        logits = self.fc5(x)
        return self.sigmoid(logits)

# Experiment configuration: the network input width is the number of feature
# columns; each of the `num_runs` runs uses `num_folds` stratified folds.
input_dim = X_train.shape[1]
num_folds, num_runs = 10, 200

# Test-set F-score of every run, in run order.
f_scores = []

# State describing the best run seen so far (lowest test false-negative count).
best_fn_count = float('inf')
best_test_fscore = 0.0
best_model_state = None
best_test_conf_matrix = best_train_conf_matrix = None
best_train_data = best_val_data = None

# Load and preprocess the held-out test set once. It is identical for every
# run, so reading and scaling it inside the run loop only repeated the same
# work (and the same file I/O) `num_runs` times.
test_data = pd.read_csv('data/dataTestR85-15.csv')
test_data = test_data[test_data["StructuralDamage"] != "-"].copy()
# Assign the encoded labels back instead of a chained in-place replace, which
# does not update the frame when pandas Copy-on-Write is active.
test_data['StructuralDamage'] = test_data['StructuralDamage'].replace(class2idx)
X_test = test_data.iloc[:, 1:-1]
y_test = test_data.iloc[:, -1].astype('int')

# Reuse the scaler fitted on the training features.
X_test_scaled = scaler.transform(X_test)
test_data_tensor = torch.tensor(X_test_scaled).float()
test_labels_tensor = torch.tensor(y_test.values).float()

# Full-batch optimisation steps performed on each fold.
num_epochs = 200

for run in range(num_runs):
    print(f"Run: {run + 1}")

    # A fresh model/optimizer per run.
    # NOTE(review): the same model keeps training across all folds of a run,
    # so later folds validate on rows the model has already been trained on.
    # The per-fold validation numbers are therefore optimistic -- confirm this
    # is intended.
    model = MLP(input_dim)
    criterion = CustomLoss(fn_cost_weight=3)
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    all_predictions = []
    all_labels = []

    # The fixed random_state yields the same fold assignment in every run;
    # runs differ only through weight initialisation and dropout.
    skf = StratifiedKFold(n_splits=num_folds, shuffle=True, random_state=1234)

    for fold, (train_index, val_index) in enumerate(skf.split(X_train_scaled, y_train), start=1):
        print(f"Fold: {fold}")

        # Unscaled copies of the fold rows; predictions are attached to these
        # frames below so they can be exported for the best run.
        train_data = X_train.iloc[train_index].copy()
        val_data = X_train.iloc[val_index].copy()
        train_labels = y_train.iloc[train_index].copy()
        val_labels = y_train.iloc[val_index].copy()

        fold_train_data = torch.tensor(X_train_scaled[train_index]).float()
        fold_val_data = torch.tensor(X_train_scaled[val_index]).float()
        fold_train_labels = torch.tensor(train_labels.values).float()
        fold_val_labels = torch.tensor(val_labels.values).float()

        # Switch back to training mode. The evaluation step at the end of the
        # previous fold calls model.eval(); without this call every fold after
        # the first would train with dropout disabled and BatchNorm frozen on
        # its running statistics.
        model.train()
        for epoch in range(num_epochs):
            optimizer.zero_grad()
            outputs = model(fold_train_data)
            # squeeze(1) drops only the trailing unit dimension, so a
            # single-row batch stays 1-D.
            loss = criterion(outputs.squeeze(1), fold_train_labels)
            loss.backward()
            optimizer.step()

        model.eval()
        with torch.no_grad():
            # Training set predictions
            train_outputs = model(fold_train_data)
            train_predicted = torch.round(train_outputs).squeeze(1)
            train_conf_matrix = confusion_matrix(fold_train_labels.numpy(), train_predicted.numpy())

            # Validation set predictions
            val_outputs = model(fold_val_data)
            val_predicted = torch.round(val_outputs).squeeze(1)
            accuracy = (val_predicted == fold_val_labels).sum().item() / len(fold_val_labels) * 100

            # Add predictions and structural damage to the dataframes
            train_data['StructuralDamage'] = fold_train_labels.numpy()
            train_data['PredictedLabel'] = train_predicted.numpy()

            val_data['StructuralDamage'] = fold_val_labels.numpy()
            val_data['PredictedLabel'] = val_predicted.numpy()

            all_predictions.extend(val_predicted.tolist())
            all_labels.extend(fold_val_labels.tolist())

        # Per-fold validation metrics. They are not consumed elsewhere in this
        # section; they stay available as module-level names for inspection.
        fold_accuracy = accuracy_score(fold_val_labels.numpy(), val_predicted.numpy())
        fold_fscore = precision_recall_fscore_support(
            fold_val_labels.numpy(), val_predicted.numpy(), average='weighted'
        )[2]
        fold_conf_matrix = confusion_matrix(fold_val_labels.numpy(), val_predicted.numpy())
        false_negatives = ((val_predicted < 0.5) & (fold_val_labels == 1)).sum().item()

    # After training all folds, evaluate the test set
    model.eval()
    with torch.no_grad():
        test_outputs = model(test_data_tensor)
        test_predictions = torch.round(test_outputs).squeeze(1)
        test_accuracy = (test_predictions == test_labels_tensor).sum().item() / len(test_labels_tensor) * 100
        test_conf_matrix = confusion_matrix(y_test, test_predictions.numpy())
        test_precision, test_recall, test_fscore, _ = precision_recall_fscore_support(
            y_test, test_predictions.numpy(), average='weighted'
        )
        test_false_negatives = ((test_predictions < 0.5) & (test_labels_tensor == 1)).sum().item()

    print(f"Test Accuracy: {test_accuracy}%")
    print(f"Test F-score: {test_fscore}")
    print(f"Test False Negatives: {test_false_negatives}")

    f_scores.append(test_fscore)
    # Track the highest test F-score over all runs; previously this value was
    # never updated and the final report always printed its initial 0.0.
    best_test_fscore = max(best_test_fscore, test_fscore)

    # Keep the artefacts of the run with the fewest test false negatives
    # (ties keep the earliest run). The stored train/validation artefacts are
    # those of the last fold of that run.
    # NOTE(review): selecting the model on the test set makes the reported
    # test metrics optimistic; consider selecting on validation data instead.
    if test_false_negatives < best_fn_count:
        best_fn_count = test_false_negatives
        best_test_conf_matrix = test_conf_matrix
        best_train_conf_matrix = train_conf_matrix
        best_train_data = train_data
        best_val_data = val_data
        best_model_state = model.state_dict()

# Persist the weights of the run that had the fewest test false negatives.
if best_model_state is not None:
    torch.save(best_model_state, 'best_model_with_smallest_fn.pth')
    print("Best model with smallest false negatives saved as 'best_model_with_smallest_fn.pth'")

print(f"Best Test F-score: {best_test_fscore}")

# Line chart of the test F-score obtained in each run.
run_numbers = range(1, num_runs + 1)
plt.figure(figsize=(10, 6))
plt.plot(run_numbers, f_scores, marker='o', linestyle='-', color='b')
plt.title("F-scores for Runs")
plt.xlabel("Run Number")
plt.ylabel("F-score")
plt.grid(True)
plt.show()

# For each stored confusion matrix (test first, then training): draw an
# annotated heatmap and write the matrix to a CSV file. Entries whose matrix
# is still None are skipped.
confusion_exports = (
    (
        best_test_conf_matrix,
        "Best Test Confusion Matrix",
        'best_test_confusion_matrix.csv',
        "Best test confusion matrix saved as 'best_test_confusion_matrix.csv'.",
    ),
    (
        best_train_conf_matrix,
        "Best Training Confusion Matrix",
        'best_train_confusion_matrix.csv',
        "Best training confusion matrix saved as 'best_train_confusion_matrix.csv'.",
    ),
)
for matrix, heading, csv_path, done_message in confusion_exports:
    if matrix is None:
        continue
    plt.figure(figsize=(8, 6))
    class_names = ['Non-Severe', 'Severe']
    sns.heatmap(
        matrix,
        annot=True,
        fmt="d",
        cmap="Blues",
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.title(heading)
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.show()

    np.savetxt(csv_path, matrix, delimiter=",")
    print(done_message)

# Export the best run's training and validation frames, which carry the true
# 'StructuralDamage' label and the model's 'PredictedLabel' columns.
frame_exports = (
    (
        best_train_data,
        'best_train_data_with_labels.csv',
        "Best training data with labels saved as 'best_train_data_with_labels.csv'.",
    ),
    (
        best_val_data,
        'best_val_data_with_labels.csv',
        "Best validation data with labels saved as 'best_val_data_with_labels.csv'.",
    ),
)
for frame, csv_path, done_message in frame_exports:
    if frame is None:
        continue
    frame.to_csv(csv_path, index=False)
    print(done_message)
