#  Tutorial for Improved Ensemble Model

## 1. Activate Local Environment

In [None]:
import sys
# Show the path of the Python interpreter running this kernel, to confirm
# that the intended local environment is the active one.
sys.executable

## 2. Base Classifiers Modelling

###  2.1 ML classifiers modelling

In [None]:
import os
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.model_selection import GridSearchCV
from sklearn.neural_network import MLPClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
import joblib  # Add joblib for model serialization
import warnings
warnings.filterwarnings("ignore", category=UserWarning)
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
import torch.nn as nn
import torch.optim as optim
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score

In [54]:
# Seed the PyTorch and NumPy random number generators with a fixed value (42).
torch.manual_seed(42)
np.random.seed(42)


# Data Processing
class ShadowDataset(Dataset):
    """In-memory dataset that pairs a float feature tensor with integer labels.

    Both inputs are converted to tensors once, at construction time:
    ``features`` through ``torch.FloatTensor`` and ``labels`` through
    ``torch.LongTensor``.
    """

    def __init__(self, features, labels):
        self.labels = torch.LongTensor(labels)
        self.features = torch.FloatTensor(features)

    def __len__(self):
        # The sample count is taken from the label tensor.
        return len(self.labels)

    def __getitem__(self, idx):
        sample = self.features[idx]
        target = self.labels[idx]
        return sample, target



def train_and_evaluate_models(train_path, val_path):
    """Tune, save and validate the MLP, random-forest and SVM base classifiers.

    Both CSV files must contain a ``Target`` column; every other column is
    used as a feature. The function:

    1. fits a ``LabelEncoder`` on the labels of both files and a
       ``StandardScaler`` on the training features, saving both under
       ``venv/MLmodel``;
    2. grid-searches each classifier (5-fold CV, weighted F1) on the scaled
       training data and saves each best estimator under ``venv/MLmodel``;
    3. scores each best estimator on the validation file, appending its
       confusion matrix to ``venv/validation/MLmodelcm.csv`` and writing the
       per-class F1 table to ``venv/validation/F1_3MLmodel.csv``.

    Args:
        train_path: CSV file used to fit the preprocessing and the models.
        val_path: CSV file used only for the validation metrics.

    Returns:
        None. All results are written to disk.
    """
    model_dir = 'venv/MLmodel'
    validation_dir = 'venv/validation'
    # Every directory written to below must exist before the first dump;
    # previously only an unused folder was created.
    os.makedirs('venv/best_parameter_MLmodel', exist_ok=True)
    os.makedirs(model_dir, exist_ok=True)
    os.makedirs(validation_dir, exist_ok=True)

    train_df = pd.read_csv(train_path)
    val_df = pd.read_csv(val_path)

    # Fit the encoder on the labels of both splits so that every label found
    # in the validation file has an integer id.
    label_encoder = LabelEncoder()
    all_targets = pd.concat([train_df['Target'], val_df['Target']])  # Target: original dependent variable
    label_encoder.fit(all_targets)
    train_df['Target_Encoded'] = label_encoder.transform(train_df['Target'])
    val_df['Target_Encoded'] = label_encoder.transform(val_df['Target'])
    joblib.dump(label_encoder, f'{model_dir}/label_encoder.joblib')

    # Separate features and labels
    X_train = train_df.drop(['Target', 'Target_Encoded'], axis=1).values
    y_train = train_df['Target_Encoded'].values
    X_val = val_df.drop(['Target', 'Target_Encoded'], axis=1).values
    y_val = val_df['Target_Encoded'].values

    # The scaler is fit on the training features only and reused for validation.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_val_scaled = scaler.transform(X_val)
    joblib.dump(scaler, f'{model_dir}/scaler.joblib')
    print("Data Preprocessing Completed！")

    # (estimator, parameter grid, output file) for each base classifier.
    search_specs = {
        'MLP': (
            MLPClassifier(random_state=42),
            {
                'hidden_layer_sizes': [(50, 50), (100,), (50, 100, 50)],
                'activation': ['relu'],
                'solver': ['adam'],
                'max_iter': [1000],
            },
            'best_mlp_model.joblib',
        ),
        'RF': (
            RandomForestClassifier(random_state=42),
            {
                'n_estimators': [100, 200],
                'max_depth': [None, 10, 20],
                'min_samples_split': [2, 5],
            },
            'best_rf_model.joblib',
        ),
        'SVM': (
            # probability=True so the fitted model exposes predict_proba.
            SVC(random_state=42, probability=True),
            {
                'C': [0.1, 1, 10],
                'kernel': ['rbf', 'linear'],
                'gamma': ['scale', 'auto'],
            },
            'best_svm_model.joblib',
        ),
    }

    models = {}
    for name, (estimator, param_grid, filename) in search_specs.items():
        search = GridSearchCV(estimator, param_grid, cv=5, scoring='f1_weighted')
        search.fit(X_train_scaled, y_train)
        models[name] = search.best_estimator_
        joblib.dump(search.best_estimator_, f'{model_dir}/{filename}')

    # Score against the full set of encoded class ids so that row k of the
    # F1 table always refers to class k, even when a class is missing from
    # the validation split (its F1 is then reported as 0).
    class_ids = np.arange(len(label_encoder.classes_))
    cm_path = f'{validation_dir}/MLmodelcm.csv'
    f1_scores_per_class = {}

    for name, model in models.items():
        y_pred = model.predict(X_val_scaled)
        f1_scores_per_class[name] = f1_score(
            y_val, y_pred, average=None, labels=class_ids
        )

        # NOTE: the matrices are appended, so re-running the function adds
        # further sections to an existing file.
        cm = confusion_matrix(y_val, y_pred, labels=class_ids)
        with open(cm_path, 'a') as f:
            f.write(f"\n{name} Confusion Matrix:\n")
        pd.DataFrame(cm).to_csv(cm_path, mode='a', header=False)

    f1_df = pd.DataFrame.from_dict(
        f1_scores_per_class,
        orient='index',
        columns=[f'Class_{i}' for i in class_ids],
    )
    f1_df.index.name = 'Model'
    f1_df.to_csv(f'{validation_dir}/F1_3MLmodel.csv')

    # Reported only once the validation metrics have actually been written.
    print("Model Evaluation Completed.")


# Train and validate the three ML base classifiers on the first-layer split.
# Forward slashes keep the paths valid on Windows as well as on POSIX systems.
train_and_evaluate_models(
    'venv/dataset/firstlayertrain.csv',
    'venv/dataset/firstlayerval.csv',
)


Model Evaluation Completed.


###  2.2 DL classifiers modelling

In [None]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score

In [None]:
class ShadowDataset(Dataset):
    """Tensor-backed dataset returning ``(features, label)`` pairs by index.

    ``features`` is stored as a ``torch.FloatTensor`` and ``labels`` as a
    ``torch.LongTensor``; both conversions happen in the constructor.
    """

    def __init__(self, features, labels):
        feature_tensor = torch.FloatTensor(features)
        label_tensor = torch.LongTensor(labels)
        self.features, self.labels = feature_tensor, label_tensor

    def __len__(self):
        # Dataset length follows the number of labels.
        return len(self.labels)

    def __getitem__(self, idx):
        return (self.features[idx], self.labels[idx])


def train_model(train_file_path, val_file_path):
    """Train the attention-based 1D CNN and export its validation artefacts.

    Both CSV files must contain a ``Target`` column; every other column is
    used as a feature. The model is trained for 100 epochs and the weights
    of the epoch with the highest validation accuracy are kept in
    ``venv/MLmodel/Best_CNN_Model.pth``. Those best weights are then used to

    * export the attention weights and attention-weighted features of both
      splits to ``venv/dataset`` (rows are in the same order as the input
      files, so the ``Target`` column written next to them matches);
    * compute the confusion matrix, classification report and per-class F1
      on the validation split (``venv/validation/CNN_F1_scores.csv``).

    Args:
        train_file_path: CSV file used for training.
        val_file_path: CSV file used for model selection and reporting.

    Returns:
        tuple: ``(best_accuracy, model)`` - the highest validation accuracy
        in percent and the model with the corresponding weights loaded.
    """
    model_dir = 'venv/MLmodel'
    dataset_dir = 'venv/dataset'
    validation_dir = 'venv/validation'
    checkpoint_path = 'venv/MLmodel/Best_CNN_Model.pth'
    # Create every output directory up front so that no later save can fail
    # on a missing folder.
    for directory in (model_dir, dataset_dir, validation_dir):
        os.makedirs(directory, exist_ok=True)

    train_df = pd.read_csv(train_file_path)
    val_df = pd.read_csv(val_file_path)

    # Fit the encoder on the labels of both splits (as the ML stage does) so
    # that the class ids and the number of outputs agree between the stages.
    label_encoder = LabelEncoder()
    label_encoder.fit(pd.concat([train_df['Target'], val_df['Target']]))
    train_df['Target_Encoded'] = label_encoder.transform(train_df['Target'])
    val_df['Target_Encoded'] = label_encoder.transform(val_df['Target'])

    X_train = train_df.drop(['Target', 'Target_Encoded'], axis=1).values
    y_train = train_df['Target_Encoded'].values
    X_val = val_df.drop(['Target', 'Target_Encoded'], axis=1).values
    y_val = val_df['Target_Encoded'].values

    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_val_scaled = scaler.transform(X_val)

    def reshape_for_model(data):
        # Insert a channel axis: (samples, features) -> (samples, 1, features).
        return np.expand_dims(data, axis=1)

    train_dataset = ShadowDataset(reshape_for_model(X_train_scaled), y_train)
    val_dataset = ShadowDataset(reshape_for_model(X_val_scaled), y_val)

    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

    input_size = X_train.shape[1]
    num_classes = len(label_encoder.classes_)

    class EnhancedAttentionCNN(nn.Module):
        """Feature-attention gate followed by a three-stage 1D CNN classifier."""

        def __init__(self, input_size, num_classes):
            super(EnhancedAttentionCNN, self).__init__()
            # Produces one softmax-normalised weight per input feature.
            self.attention = nn.Sequential(
                nn.Linear(input_size, input_size),
                nn.ReLU(),
                nn.Linear(input_size, input_size),
                nn.Softmax(dim=1)
            )
            self.features = nn.Sequential(
                nn.Conv1d(1, 64, kernel_size=3, padding=1),
                nn.BatchNorm1d(64),
                nn.ReLU(),
                nn.MaxPool1d(2),
                nn.Conv1d(64, 128, kernel_size=3, padding=1),
                nn.BatchNorm1d(128),
                nn.ReLU(),
                nn.MaxPool1d(2),
                nn.Conv1d(128, 256, kernel_size=3, padding=1),
                nn.BatchNorm1d(256),
                nn.ReLU(),
                nn.MaxPool1d(2)
            )

            # Run a dummy input through the conv stack to learn the length
            # that reaches the first dense layer.
            with torch.no_grad():
                test_input = torch.zeros(1, 1, input_size)
                feature_size = self._calculate_feature_size(test_input)

            self.classifier = nn.Sequential(
                nn.Linear(256 * feature_size, 512),
                nn.ReLU(),
                nn.Dropout(0.5),
                nn.Linear(512, 256),
                nn.ReLU(),
                nn.Dropout(0.5),
                nn.Linear(256, num_classes)
            )

        def _calculate_feature_size(self, x):
            return self.features(x).size(2)

        def forward(self, x):
            x_flat = x.view(x.size(0), -1)
            attention_weights = self.attention(x_flat)
            x_attended = x_flat * attention_weights

            x_conv = x_attended.view(x.size(0), 1, -1)
            features = self.features(x_conv)
            x = features.view(features.size(0), -1)
            output = self.classifier(x)

            return output, attention_weights

    model = EnhancedAttentionCNN(input_size, num_classes)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)
    # 'max' mode: the learning rate is reduced when validation accuracy stalls.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'max', patience=5)

    train_losses = []
    val_accuracies = []
    train_accuracies = []

    num_epochs = 100
    # Start below any reachable accuracy so the first epoch always writes a
    # checkpoint; a stale file from an earlier run can then never be loaded.
    best_accuracy = float('-inf')

    for epoch in range(num_epochs):
        model.train()
        total_train_loss = 0
        total_train_correct = 0
        total_train_samples = 0

        for batch_features, batch_labels in train_loader:
            optimizer.zero_grad()
            outputs, _ = model(batch_features)
            loss = criterion(outputs, batch_labels)
            loss.backward()
            optimizer.step()

            total_train_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total_train_samples += batch_labels.size(0)
            total_train_correct += (predicted == batch_labels).sum().item()

        model.eval()
        total_val_correct = 0
        total_val_samples = 0

        with torch.no_grad():
            for val_features, val_labels in val_loader:
                outputs, _ = model(val_features)
                _, predicted = torch.max(outputs.data, 1)
                total_val_samples += val_labels.size(0)
                total_val_correct += (predicted == val_labels).sum().item()

        train_accuracy = 100 * total_train_correct / total_train_samples
        val_accuracy = 100 * total_val_correct / total_val_samples

        train_losses.append(total_train_loss / len(train_loader))
        train_accuracies.append(train_accuracy)
        val_accuracies.append(val_accuracy)

        scheduler.step(val_accuracy)

        if val_accuracy > best_accuracy:
            best_accuracy = val_accuracy
            torch.save(model.state_dict(), checkpoint_path)

    training_history_df = pd.DataFrame({
        'Epoch': range(1, num_epochs + 1),
        'Train_Loss': train_losses,
        'Train_Accuracy': train_accuracies,
        'Validation_Accuracy': val_accuracies
    })
    training_history_df.to_csv('venv/CNN epoch.csv', index=False)

    # Everything below is produced by the best checkpoint, not the last epoch.
    model.load_state_dict(torch.load(checkpoint_path))
    model.eval()

    def run_in_order(loader):
        """Return (predictions, attention weights, attended features) as arrays."""
        preds, weights, attended = [], [], []
        with torch.no_grad():
            for batch_features, _ in loader:
                logits, batch_attention = model(batch_features)
                preds.append(logits.argmax(dim=1))
                weights.append(batch_attention)
                x_flat = batch_features.view(batch_features.size(0), -1)
                attended.append(x_flat * batch_attention)
        return (
            torch.cat(preds).numpy(),
            torch.cat(weights).numpy(),
            torch.cat(attended).numpy(),
        )

    # The training loader shuffles; use an unshuffled one here so that row i
    # of the exported arrays belongs to row i of train_df (and its Target).
    ordered_train_loader = DataLoader(train_dataset, batch_size=64, shuffle=False)
    _, train_attention_weights, train_attended = run_in_order(ordered_train_loader)
    val_pred, val_attention_weights, val_attended = run_in_order(val_loader)

    feature_columns = [f'Feature_{i}' for i in range(input_size)]
    weighted_columns = [f'Weighted_Feature_{i}' for i in range(input_size)]

    pd.DataFrame(train_attention_weights, columns=feature_columns).to_csv(
        'venv/dataset/weight_train.csv', index=False)
    pd.DataFrame(val_attention_weights, columns=feature_columns).to_csv(
        'venv/dataset/weight_val.csv', index=False)

    train_attention_weighted_df = pd.DataFrame(train_attended, columns=weighted_columns)
    train_attention_weighted_df['Target'] = train_df['Target']
    val_attention_weighted_df = pd.DataFrame(val_attended, columns=weighted_columns)
    val_attention_weighted_df['Target'] = val_df['Target']

    train_attention_weighted_df.to_csv('venv/dataset/weighted_train.csv', index=False)
    val_attention_weighted_df.to_csv('venv/dataset/weighted_val.csv', index=False)

    cm = confusion_matrix(y_val, val_pred)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.savefig('confusion_matrix.png')
    plt.close()

    print("confusion matrix:")
    print(cm)

    print("classification report")
    print(classification_report(y_val, val_pred))

    # One F1 value per encoded class id, in id order, so that row k of the
    # saved file always refers to class k.
    per_class_f1 = f1_score(y_val, val_pred, average=None, labels=np.arange(num_classes))
    f1_df = pd.DataFrame(per_class_f1, columns=["CNN"])
    f1_df.to_csv('venv/validation/CNN_F1_scores.csv', index=False)
    print("F1 saved at: venv/validation/CNN_F1_scores.csv")
    return best_accuracy, model


# Train the CNN on the first-layer split. Forward slashes match the paths used
# inside train_model and are valid on Windows as well as on POSIX systems.
accuracy, trained_model = train_model('venv/dataset/firstlayertrain.csv', 'venv/dataset/firstlayerval.csv')

### 2.3 Dictionary generation 

In [None]:
# Merge the per-class F1 scores of the three ML models and of the CNN into a
# single lookup table: one row per class, one column per model.
cnn_path = 'venv/validation/CNN_F1_scores.csv'
mlmodels_path = 'venv/validation/F1_3MLmodel.csv'
output_path = 'venv/validation/F1_dictionary.csv'

# CNN file: a header row followed by one F1 value per class in its first column.
cnn_values = pd.read_csv(cnn_path).iloc[:, 0].astype(float).tolist()

# ML file: the header row is skipped; the first column holds the model name
# and the remaining columns hold that model's per-class F1 values. The number
# of class columns is taken from the file instead of being fixed at 8.
mlmodels_df = pd.read_csv(mlmodels_path, header=None, skiprows=1)
mlmodels_df.columns = ['Model'] + [f'Category_{i}' for i in range(1, mlmodels_df.shape[1])]

data = {
    model_name: mlmodels_df.loc[mlmodels_df['Model'] == model_name].iloc[0, 1:].astype(float).tolist()
    for model_name in ('MLP', 'RF', 'SVM')
}
data['CNN'] = cnn_values

# Every model must report the same number of classes, otherwise the rows of
# the dictionary would not line up.
lengths = {model_name: len(values) for model_name, values in data.items()}
if len(set(lengths.values())) != 1:
    raise ValueError(f"Per-class F1 lists differ in length: {lengths}")

result_df = pd.DataFrame(data)

result_df.to_csv(output_path, header=True, index=False)

print(f"weighted dictionary saved at: {output_path}")

### 2.4 Weighted features generation

In [None]:
# import os
# import numpy as np
# import pandas as pd
# import torch
# import joblib
# import torch.nn as nn


class EnhancedAttentionCNN(nn.Module):
    """Feature-attention gate followed by a three-stage 1D CNN classifier.

    ``forward`` returns ``(logits, attention_weights, attended_features)``.
    Sub-module names and layer order are kept fixed so that saved
    ``state_dict`` files keep loading.
    """

    def __init__(self, input_size, num_classes):
        super().__init__()

        # One softmax-normalised weight per input feature.
        self.attention = nn.Sequential(
            nn.Linear(input_size, input_size),
            nn.ReLU(),
            nn.Linear(input_size, input_size),
            nn.Softmax(dim=1),
        )

        # Three identical Conv -> BatchNorm -> ReLU -> MaxPool stages that
        # differ only in their channel counts.
        conv_layers = []
        in_channels = 1
        for out_channels in (64, 128, 256):
            conv_layers.extend([
                nn.Conv1d(in_channels, out_channels, kernel_size=3, padding=1),
                nn.BatchNorm1d(out_channels),
                nn.ReLU(),
                nn.MaxPool1d(2),
            ])
            in_channels = out_channels
        self.features = nn.Sequential(*conv_layers)

        # Probe the conv stack with a dummy sample to size the first dense layer.
        with torch.no_grad():
            pooled_length = self._calculate_feature_size(torch.zeros(1, 1, input_size))

        self.classifier = nn.Sequential(
            nn.Linear(256 * pooled_length, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes),
        )

    def _calculate_feature_size(self, x):
        """Return the length of the last axis after the conv stack."""
        conv_out = self.features(x)
        return conv_out.size(2)

    def forward(self, x):
        batch_size = x.size(0)

        # Flatten, gate every feature by its attention weight, then restore
        # the single-channel layout expected by Conv1d.
        flat = x.view(batch_size, -1)
        weights = self.attention(flat)
        gated = flat * weights

        conv_out = self.features(gated.view(batch_size, 1, -1))
        logits = self.classifier(conv_out.view(conv_out.size(0), -1))

        return logits, weights, gated


def _f1_weighted_sum(model_probas, f1_table):
    """Blend per-model class probabilities using per-class F1 weights.

    For every sample, each model's probability row is multiplied by that
    model's F1 score for the class it predicts (its arg-max class) and the
    scaled rows of all models are added up. Computed in float64.

    Args:
        model_probas: mapping of model name to an ``(n_samples, n_classes)``
            probability array; every name must be a column of ``f1_table``.
        f1_table: DataFrame whose row ``k`` holds the F1 scores for class ``k``.

    Returns:
        ``(n_samples, n_classes)`` array of blended scores.
    """
    blended = None
    for name, proba in model_probas.items():
        proba = np.asarray(proba, dtype=np.float64)
        class_f1 = f1_table[name].to_numpy(dtype=np.float64)
        # One weight per sample: the F1 of the class this model predicts.
        row_weight = class_f1[proba.argmax(axis=1)]
        term = proba * row_weight[:, None]
        blended = term if blended is None else blended + term
    return blended


def process_models_and_probabilities(
        train_path,
        output_path='venv/dataset/firstlayeroutput_weighted_probabilities_train.csv'):
    """Build the second-layer training table from the saved base models.

    Loads the label encoder, scaler, MLP/RF/SVM estimators and CNN weights
    from ``venv/MLmodel`` and the F1 dictionary from ``venv/validation``,
    then writes one row per sample containing the original ``Target``, the
    F1-weighted class scores (``Proba_<class>``) and the CNN's
    attention-weighted features (``AttendedFeature_<i>``).

    Args:
        train_path: CSV file with a ``Target`` column plus feature columns.
        output_path: destination CSV; defaults to the training output file.
    """
    torch.manual_seed(42)
    np.random.seed(42)
    label_encoder = joblib.load('venv/MLmodel/label_encoder.joblib')
    scaler = joblib.load('venv/MLmodel/scaler.joblib')
    train_df = pd.read_csv(train_path)
    # Raises if the file contains a label the saved encoder has never seen.
    label_encoder.transform(train_df['Target'])

    X_train = train_df.drop(['Target'], axis=1).values

    input_size = X_train.shape[1]
    num_classes = len(label_encoder.classes_)

    # Load ML models
    mlp_model = joblib.load('venv/MLmodel/best_mlp_model.joblib')
    rf_model = joblib.load('venv/MLmodel/best_rf_model.joblib')
    svm_model = joblib.load('venv/MLmodel/best_svm_model.joblib')

    # Load 1DCNN model
    cnn_model = EnhancedAttentionCNN(input_size, num_classes)
    cnn_model.load_state_dict(torch.load('venv/MLmodel/Best_CNN_Model.pth'))
    cnn_model.eval()

    X_train_scaled = scaler.transform(X_train)

    # Load weighted dictionary (row k = class k, one column per model)
    f1_dict_df = pd.read_csv('venv/validation/F1_dictionary.csv')

    # 1DCNN: add the channel axis, then turn the logits into probabilities.
    X_train_tensor = torch.FloatTensor(np.expand_dims(X_train_scaled, axis=1))
    with torch.no_grad():
        cnn_outputs, _, cnn_attended_features = cnn_model(X_train_tensor)
        cnn_proba = torch.softmax(cnn_outputs, dim=1).numpy()

    # Weighted probability calculation
    weighted_probas = _f1_weighted_sum(
        {
            'MLP': mlp_model.predict_proba(X_train_scaled),
            'RF': rf_model.predict_proba(X_train_scaled),
            'SVM': svm_model.predict_proba(X_train_scaled),
            'CNN': cnn_proba,
        },
        f1_dict_df,
    )

    output_df = pd.DataFrame(weighted_probas, columns=[f'Proba_{cls}' for cls in label_encoder.classes_])
    output_df.insert(0, 'Target', train_df['Target'])

    attended = cnn_attended_features.numpy()
    attended_features_df = pd.DataFrame(
        attended,
        columns=[f'AttendedFeature_{i}' for i in range(attended.shape[1])],
    )

    final_df = pd.concat([output_df, attended_features_df], axis=1)

    # save results
    final_df.to_csv(output_path, index=False)

    print(f"Weighted probabilities and attention features have been saved to {output_path}")

# Generate the second-layer training table. Forward slashes keep the path
# valid on Windows as well as on POSIX systems.
process_models_and_probabilities(
    'venv/dataset/firstlayertrain.csv'
)

### 2.5 Validation dataset generation

In [None]:
class EnhancedAttentionCNN(nn.Module):
    """1D CNN classifier whose input features are first gated by attention.

    ``forward`` yields ``(logits, attention_weights, attended_features)``.
    The sub-module names (``attention``, ``features``, ``classifier``) and
    their layer order are unchanged so existing checkpoints still load.
    """

    def __init__(self, input_size, num_classes):
        super().__init__()

        # Dense gate ending in a softmax over the feature axis.
        gate_layers = [
            nn.Linear(input_size, input_size),
            nn.ReLU(),
            nn.Linear(input_size, input_size),
            nn.Softmax(dim=1),
        ]
        self.attention = nn.Sequential(*gate_layers)

        # Conv -> BatchNorm -> ReLU -> MaxPool, repeated for each channel pair.
        conv_layers = []
        for in_channels, out_channels in ((1, 64), (64, 128), (128, 256)):
            conv_layers.append(nn.Conv1d(in_channels, out_channels, kernel_size=3, padding=1))
            conv_layers.append(nn.BatchNorm1d(out_channels))
            conv_layers.append(nn.ReLU())
            conv_layers.append(nn.MaxPool1d(2))
        self.features = nn.Sequential(*conv_layers)

        # A dummy forward pass tells us how long the pooled sequence is.
        with torch.no_grad():
            dummy = torch.zeros(1, 1, input_size)
            pooled_length = self._calculate_feature_size(dummy)

        head_layers = [
            nn.Linear(256 * pooled_length, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def _calculate_feature_size(self, x):
        """Return the size of axis 2 of the conv stack's output for ``x``."""
        return self.features(x).shape[2]

    def forward(self, x):
        n_samples = x.size(0)
        x_flat = x.view(n_samples, -1)
        attention_weights = self.attention(x_flat)
        # Element-wise gating of every feature by its attention weight.
        x_attended = x_flat * attention_weights
        conv_out = self.features(x_attended.view(n_samples, 1, -1))
        flattened = conv_out.view(conv_out.size(0), -1)
        return self.classifier(flattened), attention_weights, x_attended


def _f1_weighted_sum(model_probas, f1_table):
    """Blend per-model class probabilities using per-class F1 weights.

    For every sample, each model's probability row is multiplied by that
    model's F1 score for the class it predicts (its arg-max class) and the
    scaled rows of all models are added up. Computed in float64.

    Args:
        model_probas: mapping of model name to an ``(n_samples, n_classes)``
            probability array; every name must be a column of ``f1_table``.
        f1_table: DataFrame whose row ``k`` holds the F1 scores for class ``k``.

    Returns:
        ``(n_samples, n_classes)`` array of blended scores.
    """
    blended = None
    for name, proba in model_probas.items():
        proba = np.asarray(proba, dtype=np.float64)
        class_f1 = f1_table[name].to_numpy(dtype=np.float64)
        # One weight per sample: the F1 of the class this model predicts.
        row_weight = class_f1[proba.argmax(axis=1)]
        term = proba * row_weight[:, None]
        blended = term if blended is None else blended + term
    return blended


def process_models_and_probabilities(
        data_path,
        output_path='venv/dataset/firstlayeroutput_weighted_probabilities_val.csv'):
    """Build the second-layer validation table from the saved base models.

    Loads the label encoder, scaler, MLP/RF/SVM estimators and CNN weights
    from ``venv/MLmodel`` and the F1 dictionary from ``venv/validation``,
    then writes one row per sample containing the original ``Target``, the
    F1-weighted class scores (``Proba_<class>``) and the CNN's
    attention-weighted features (``AttendedFeature_<i>``).

    Args:
        data_path: CSV file with a ``Target`` column plus feature columns.
        output_path: destination CSV; defaults to the validation output file.
    """
    torch.manual_seed(42)
    np.random.seed(42)

    label_encoder = joblib.load('venv/MLmodel/label_encoder.joblib')
    scaler = joblib.load('venv/MLmodel/scaler.joblib')

    data_df = pd.read_csv(data_path)
    # Raises if the file contains a label the saved encoder has never seen.
    label_encoder.transform(data_df['Target'])

    X = data_df.drop(['Target'], axis=1).values

    input_size = X.shape[1]
    num_classes = len(label_encoder.classes_)

    # Load ML models
    mlp_model = joblib.load('venv/MLmodel/best_mlp_model.joblib')
    rf_model = joblib.load('venv/MLmodel/best_rf_model.joblib')
    svm_model = joblib.load('venv/MLmodel/best_svm_model.joblib')

    # Load 1DCNN model
    cnn_model = EnhancedAttentionCNN(input_size, num_classes)
    cnn_model.load_state_dict(torch.load('venv/MLmodel/Best_CNN_Model.pth'))
    cnn_model.eval()

    X_scaled = scaler.transform(X)

    # Load weighted dictionary (row k = class k, one column per model)
    f1_dict_df = pd.read_csv('venv/validation/F1_dictionary.csv')

    # CNN: add the channel axis, then turn the logits into probabilities.
    X_tensor = torch.FloatTensor(np.expand_dims(X_scaled, axis=1))
    with torch.no_grad():
        cnn_outputs, _, cnn_attended_features = cnn_model(X_tensor)
        cnn_proba = torch.softmax(cnn_outputs, dim=1).numpy()

    # Weight each model's probabilities by its F1 for its own predicted class.
    weighted_probas = _f1_weighted_sum(
        {
            'MLP': mlp_model.predict_proba(X_scaled),
            'RF': rf_model.predict_proba(X_scaled),
            'SVM': svm_model.predict_proba(X_scaled),
            'CNN': cnn_proba,
        },
        f1_dict_df,
    )

    output_df = pd.DataFrame(weighted_probas, columns=[f'Proba_{cls}' for cls in label_encoder.classes_])
    output_df.insert(0, 'Target', data_df['Target'])

    attended = cnn_attended_features.numpy()
    attended_features_df = pd.DataFrame(
        attended,
        columns=[f'AttendedFeature_{i}' for i in range(attended.shape[1])],
    )

    final_df = pd.concat([output_df, attended_features_df], axis=1)

    # Save outputs
    final_df.to_csv(output_path, index=False)

    print(f"Weighted probabilities and attention features have been saved to {output_path}")

# Generate the second-layer validation table. Forward slashes keep the path
# valid on Windows as well as on POSIX systems.
process_models_and_probabilities(
    'venv/dataset/firstlayerval.csv'
)

## 3. Model Ensemble and Result Output

In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
from sklearn.ensemble import GradientBoostingClassifier
import matplotlib.pyplot as plt
import seaborn as sns
import os

In [None]:
def load_weighted_data(weighted_train_path, weighted_val_path):
    """Read the weighted training and validation CSV files.

    Returns:
        tuple: ``(train_frame, val_frame)`` in that order.
    """
    train_frame, val_frame = (
        pd.read_csv(csv_path)
        for csv_path in (weighted_train_path, weighted_val_path)
    )
    return train_frame, val_frame

def preprocess_data(train, val):
    """Split each frame into its feature columns and its ``Target`` column.

    Returns:
        tuple: ``(X_train, y_train, X_val, y_val)``. The input frames are
        left unmodified.
    """
    label_column = 'Target'
    y_train, y_val = train[label_column], val[label_column]
    X_train = train.drop(columns=[label_column])
    X_val = val.drop(columns=[label_column])
    return X_train, y_train, X_val, y_val

def perform_grid_search(X_train, y_train):
    """Grid-search a ``GradientBoostingClassifier`` with 5-fold cross-validation.

    Every combination of the six hyper-parameters listed below is scored by
    accuracy, using all available cores.

    Returns:
        The fitted ``GridSearchCV`` object.
    """
    # Candidate values for each tuned hyper-parameter.
    search_space = dict(
        learning_rate=[0.05, 0.1, 0.2],
        n_estimators=[100, 200, 300],
        max_depth=[3, 5, 7],
        min_samples_split=[20, 30, 40],
        min_samples_leaf=[10, 20, 30],
        subsample=[0.7, 0.8, 0.9],
    )

    booster = GradientBoostingClassifier(random_state=42)
    tuner = GridSearchCV(
        booster,
        search_space,
        scoring='accuracy',
        cv=5,
        verbose=2,
        n_jobs=-1,
    )
    tuner.fit(X_train, y_train)

    return tuner

def evaluate_model(model, X_val, y_val):
    """Score a fitted classifier on the validation data and save its confusion matrix.

    Prints the overall accuracy, the weighted F1 score and the classification
    report, then writes the confusion matrix to ``venv/validation`` both as a
    CSV file and as a heatmap image.

    Args:
        model: fitted estimator exposing ``predict``.
        X_val: validation features.
        y_val: true validation labels.

    Returns:
        tuple: ``(accuracy, weighted_f1)``.
    """
    # Predict
    y_pred = model.predict(X_val)

    # Calculate metrics
    accuracy = accuracy_score(y_val, y_pred)
    f1 = f1_score(y_val, y_pred, average='weighted')

    print("\nModel Performance:")
    print(f"Overall Accuracy (OA): {accuracy:.4f}")
    print(f"Weighted F1 Score: {f1:.4f}")

    # Detailed classification report
    print("\nClassification Report:")
    print(classification_report(y_val, y_pred))

    # The matrix covers every label that is either true or predicted. Taking
    # the axis labels from y_val alone would not match the matrix shape when
    # the model predicts a class that is absent from y_val.
    matrix_labels = np.union1d(y_val, y_pred)
    cm = confusion_matrix(y_val, y_pred, labels=matrix_labels)

    # Ensure validation directory exists
    os.makedirs('venv/validation', exist_ok=True)

    # Save confusion matrix as CSV
    cm_df = pd.DataFrame(cm,
                         index=[f'True_{label}' for label in matrix_labels],
                         columns=[f'Pred_{label}' for label in matrix_labels])
    cm_df.to_csv('venv/validation/GBDT_weighted_confusion_matrix.csv')

    # Plot and save confusion matrix heatmap
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=matrix_labels,
                yticklabels=matrix_labels)
    plt.title('GBDT Confusion Matrix')
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.savefig('venv/validation/GBDT_weighted_cm.png')
    plt.close()

    return accuracy, f1

def save_best_model(model, model_path):
    """Serialize ``model`` to ``model_path`` with joblib, creating its folder if needed.

    Args:
        model: object to persist.
        model_path: destination file; may be a bare file name.
    """
    # A bare file name has an empty dirname, and os.makedirs('') raises, so
    # only create the directory when the path actually contains one.
    target_dir = os.path.dirname(model_path)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)
    # Save the model
    joblib.dump(model, model_path)
    print(f"\nBest model saved to {model_path}")
    print(f"\nBest model saved to {model_path}")

def main():
    """Run the ensemble stage: load data, tune the GBDT, evaluate and save it.

    Returns:
        dict: the keys ``'OA'``, ``'Accuracy'`` (both the overall accuracy)
        and ``'F1'`` (weighted F1) measured on the validation table.
    """
    # Dataset paths (forward slashes are valid on Windows and POSIX alike)
    weighted_train_path = 'venv/dataset/firstlayeroutput_weighted_probabilities_train.csv'
    weighted_val_path = 'venv/dataset/firstlayeroutput_weighted_probabilities_val.csv'

    # Load weighted datasets
    weighted_train, weighted_val = load_weighted_data(
        weighted_train_path, weighted_val_path
    )

    # Preprocess data
    X_train_weighted, y_train_weighted, X_val_weighted, y_val_weighted = preprocess_data(
        weighted_train, weighted_val
    )

    # Perform grid search to find best parameters
    print("Starting grid search...")
    grid_search = perform_grid_search(X_train_weighted, y_train_weighted)

    # Get the best model
    best_model = grid_search.best_estimator_

    # Print best parameters
    print("\nBest Parameters Found:")
    print(grid_search.best_params_)

    # Evaluate the best model
    accuracy, f1 = evaluate_model(best_model, X_val_weighted, y_val_weighted)

    # Save the best model
    model_path = 'venv/models/best_gbdt_model.pkl'
    save_best_model(best_model, model_path)

    # Return metrics
    return {
        'OA': accuracy,
        'Accuracy': accuracy,  # OA and Accuracy are the same in this context
        'F1': f1
    }

if __name__ == "__main__":
    # Run the ensemble stage, then echo the headline metrics it returned.
    results = main()
    overall_accuracy = results['OA']
    plain_accuracy = results['Accuracy']
    weighted_f1 = results['F1']
    print("\nFinal Evaluation Metrics:")
    print(f"Overall Accuracy (OA): {overall_accuracy:.4f}")
    print(f"Accuracy: {plain_accuracy:.4f}")
    print(f"Weighted F1 Score: {weighted_f1:.4f}")

