In [None]:
import pandas as pd

import random
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, accuracy_score
from torch.utils.data import DataLoader, random_split
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
import math

# Seed every global RNG the notebook relies on so "Restart & Run All" is repeatable.
# NumPy is seeded too because np.random.shuffle is used during data preparation.
torch.manual_seed(42)
random.seed(42)
np.random.seed(42)

# Model / training hyperparameters.
window_size = 16    # time steps per input window
batch_size = 64
input_dim = 30      # passed to the model as the per-time-step feature count
num_heads = 2
d_model = 64
num_encoders = 1
num_decoders = 1
lr=0.001

# Experiment tag; it is also used as the checkpoint sub-directory name.
# Note that `lr` is not part of the tag.
exp_name = f"random_{window_size}_{batch_size}_{input_dim}_{num_heads}_{num_encoders}_{num_decoders}_{d_model}"
# Source (a1) and target (a6) tables, read relative to the notebook's working directory.
a1_attack_values_normalized_df = pd.read_csv("../datasets/a1_attack_values_normalized_df.csv")
a6_attack_values_normalized_df = pd.read_csv("../datasets/a6_attack_values_normalized_df.csv")
# Root directory for checkpoints; "" means relative to the current working directory.
checkpoint_dir = ""

# Model Definition

In [None]:
# Define Transformer-based Feature Extractor
class TransformerFeatureExtractor(nn.Module):
    """Embed a window of feature vectors and summarise it with a Transformer encoder/decoder.

    Args:
        input_dim (int): Number of features per time step.
        num_heads (int): Attention heads in every encoder/decoder layer.
        num_enc_layers (int): Number of stacked encoder layers.
        num_dec_layers (int): Number of stacked decoder layers.
        d_model (int): Width of the embedding and of the returned feature vector.
        max_seq_len (int): Longest window the positional encoding covers. The default of 16
            keeps the parameter shape of previously saved checkpoints.
    """

    def __init__(self, input_dim, num_heads=4, num_enc_layers=2, num_dec_layers=2, d_model=128, max_seq_len=16):
        super().__init__()
        self.embedding = nn.Linear(input_dim, d_model)
        # Initialised with the sinusoidal table but registered as a Parameter, so it is trained.
        self.positional_encoding = nn.Parameter(self.create_sinusoidal_encoding(max_seq_len, d_model))
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=num_heads, batch_first=True)
        decoder_layer = nn.TransformerDecoderLayer(d_model=d_model, nhead=num_heads, batch_first=True)
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_enc_layers)
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_dec_layers)

    def create_sinusoidal_encoding(self, seq_length, hidden_dim):
        """Return a (1, seq_length, hidden_dim) table with sin on even and cos on odd channels."""
        position = torch.arange(seq_length).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, hidden_dim, 2) * -(math.log(10000.0) / hidden_dim))
        angles = position * div_term
        pe = torch.zeros(seq_length, hidden_dim)
        pe[:, 0::2] = torch.sin(angles)
        # For an odd hidden_dim there is one fewer odd channel than even channels.
        pe[:, 1::2] = torch.cos(angles[:, : hidden_dim // 2])
        return pe.unsqueeze(0)  # Shape: (1, seq_length, hidden_dim)

    def forward(self, x):
        """Map a (batch, seq_len, input_dim) tensor to (batch, d_model) features.

        Raises:
            ValueError: if seq_len exceeds the positional-encoding length.
        """
        seq_len = x.size(1)
        max_len = self.positional_encoding.size(1)
        if seq_len > max_len:
            raise ValueError(f"sequence length {seq_len} exceeds max_seq_len {max_len}")
        # Slice the table so windows shorter than max_seq_len are accepted as well.
        x = self.embedding(x) + self.positional_encoding[:, :seq_len]
        encoded = self.encoder(x)
        # The encoder output is used both as the decoder target and as its memory.
        decoded = self.decoder(encoded, encoded)
        # The last time step serves as the summary of the window.
        return decoded[:, -1, :]

# Define the Classifier
class Classifier(nn.Module):
    """Two-layer MLP head that turns a feature vector into class logits.

    Args:
        feature_dim (int): Size of the incoming feature vector.
        num_classes (int): Number of output logits.
    """

    def __init__(self, feature_dim, num_classes=2):
        super().__init__()
        hidden_units = 64
        # Layer order is kept so the state-dict keys stay fc.0.* and fc.2.*.
        layers = [
            nn.Linear(feature_dim, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, num_classes),
        ]
        self.fc = nn.Sequential(*layers)

    def forward(self, x):
        """Return un-normalised class scores for `x`."""
        logits = self.fc(x)
        return logits

# Full Model Combining Feature Extractor & Classifier
class TransformerClassifier(nn.Module):
    """End-to-end model: Transformer feature extractor followed by an MLP classifier.

    The two parts are stored as `feature_extractor` and `classifier` so they can be
    checkpointed or frozen separately.
    """

    def __init__(self, input_dim, num_heads=4, num_enc_layers=2, num_dec_layers=2, num_classes=2, d_model=128):
        super().__init__()
        self.feature_extractor = TransformerFeatureExtractor(
            input_dim,
            num_heads=num_heads,
            num_enc_layers=num_enc_layers,
            num_dec_layers=num_dec_layers,
            d_model=d_model,
        )
        # The classifier consumes the d_model-wide feature vector.
        self.classifier = Classifier(feature_dim=d_model, num_classes=num_classes)

    def forward(self, x):
        """Return class logits for the input windows `x`."""
        return self.classifier(self.feature_extractor(x))
class TimeSeriesDataset(Dataset):
    """Fixed-length windows of sensor readings, each labelled with a single class.

    Rows are first grouped by the `Target` column, so every window contains rows of one
    class only. Rows that are adjacent inside a group need not be adjacent in the
    original frame.
    """

    def __init__(self, df, window_size, rolling_window = 1):
        """
        Args:
            df (pd.DataFrame): Frame holding the sensor columns and a "Target" column.
            window_size (int): Number of rows per window.
            rolling_window (int): 0 builds non-overlapping windows; any other value builds
                overlapping windows with a stride of one row.
        """
        self.df = df
        self.window_size = window_size
        self.rolling_window = rolling_window
        self.sensor_columns = ["FIT101", "LIT101", "MV101", "P101", "AIT201", "AIT202", "AIT203", "FIT201", "MV201", "P203", "DPIT301", "FIT301", "LIT301", "MV301", "MV302", "MV303", "MV304", "AIT402", "FIT401", "LIT401", "AIT501", "AIT502", "AIT503", "AIT504", "FIT501", "FIT502", "FIT503", "PIT501", "PIT502", "PIT503"]
        self.target_column = "Target"
        self.data, self.labels = self._create_windows()

    def _create_windows(self):
        """
        Create windows of size `window_size` from the DataFrame.

        Returns:
            tuple: (data, labels) NumPy arrays; data has shape
            (num_windows, window_size, num_sensors) when at least one window exists.
        """
        data = []
        labels = []

        for _, group in self.df.groupby(self.target_column):
            group = group.sort_index()  # Ensure data is in order
            group_values = group[self.sensor_columns].values
            group_length = len(group)
            # Every row of a group shares the same class, so read the label once.
            label = group[self.target_column].iloc[0]

            if self.rolling_window == 0:
                # Plain integer division: the previous np.int16 cast overflowed
                # once a group produced more than 32767 windows.
                usable_rows = (group_length // self.window_size) * self.window_size
                starts = range(0, usable_rows, self.window_size)
            else:
                starts = range(group_length - self.window_size + 1)

            for start in starts:
                data.append(group_values[start:start + self.window_size])
                labels.append(label)

        return np.array(data), np.array(labels)

    def __len__(self):
        """Number of windows."""
        return len(self.data)

    def __getitem__(self, idx):
        """
        Returns a window of data (float32 tensor) and its corresponding label (int64 tensor).
        """
        window = self.data[idx]
        label = self.labels[idx]
        return torch.tensor(window, dtype=torch.float32), torch.tensor(label, dtype=torch.long)

# Prepare and load datasets

In [None]:
import numpy as np
import pandas as pd

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

def prepare_time_series_data(df, window_size=16, val_size=0.15, test_size=0.13, random_state=42):
    """Split a labelled frame into train / validation / test frames, window by window.

    Rows of class 0 and class 1 (column "Target") are cut separately into non-overlapping
    chunks of `window_size` rows. The chunks of each class are split at random, and each
    split is re-assembled into a frame sorted by index. Rows with any other "Target" value
    and the trailing rows that do not fill a chunk are dropped.

    Args:
        df (pd.DataFrame): Input frame with a "Target" column.
        window_size (int): Rows per chunk.
        val_size (float): Fraction of chunks held out for validation.
        test_size (float): Fraction of the remaining chunks held out for testing.
        random_state (int): Seed passed to both `train_test_split` calls.

    Returns:
        tuple: (train_df, val_df, test_df)
    """

    def create_windows(class_df):
        """Creates non-overlapping windows from a dataframe."""
        num_windows = len(class_df) // window_size
        return [class_df.iloc[i * window_size: (i + 1) * window_size] for i in range(num_windows)]

    def split_windows(windows):
        """Splits the windows at random into train, validation, and test lists."""
        train, val = train_test_split(windows, test_size=val_size, random_state=random_state)
        train, test = train_test_split(train, test_size=test_size, random_state=random_state)
        return train, val, test

    train_0, val_0, test_0 = split_windows(create_windows(df[df["Target"] == 0]))
    train_1, val_1, test_1 = split_windows(create_windows(df[df["Target"] == 1]))

    def reconstruct_sorted_df(windows):
        """Concatenate chunks back into one frame ordered by the original index."""
        return pd.concat(windows).sort_index()

    # The chunk lists used to be shuffled with np.random before this step. Sorting by
    # index discards that order, so the shuffle only consumed global RNG state.
    train_df = reconstruct_sorted_df(train_0 + train_1)
    val_df = reconstruct_sorted_df(val_0 + val_1)
    test_df = reconstruct_sorted_df(test_0 + test_1)
    return train_df, val_df, test_df


def load_data():
    """Build the six window datasets (source and target x train/val/test).

    Reads the module-level a1 (source) and a6 (target) frames and `window_size`.
    All datasets use overlapping windows except the target test set, which is built
    with `rolling_window=0` (non-overlapping windows).

    Returns:
        tuple: (train, val, test, target_train, target_val, target_test) datasets.
    """
    source_splits = prepare_time_series_data(
        a1_attack_values_normalized_df.copy(), window_size=window_size
    )
    target_train_df, target_val_df, target_test_df = prepare_time_series_data(
        a6_attack_values_normalized_df.copy(), window_size=window_size
    )

    source_datasets = tuple(TimeSeriesDataset(split_df, window_size) for split_df in source_splits)
    target_datasets = (
        TimeSeriesDataset(target_train_df, window_size),
        TimeSeriesDataset(target_val_df, window_size),
        TimeSeriesDataset(target_test_df, window_size, 0),
    )

    return (*source_datasets, *target_datasets)

def count_each_class_instances(dataloader_object):
    """Count how many samples of each label a loader yields.

    Args:
        dataloader_object: Iterable of (inputs, labels) batches.

    Returns:
        dict: label -> number of occurrences over one full pass.
    """
    class_count = {}
    for _, labels in dataloader_object:
        # tolist() gives plain Python scalars, so the keys print cleanly.
        for label in np.asarray(labels).ravel().tolist():
            # Start at 0 and add one: the first occurrence used to be stored as 0,
            # which under-counted every class by one.
            class_count[label] = class_count.get(label, 0) + 1
    return class_count


# Data Loaders
# Build the six datasets, then wrap each one in a DataLoader.
train_dataset, val_dataset, test_dataset, target_train_dataset, target_val_dataset, target_test_dataset = load_data()
# Only the two training loaders shuffle; validation/test loaders keep dataset order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

target_train_loader = DataLoader(target_train_dataset, batch_size=batch_size, shuffle=True)
target_val_loader = DataLoader(target_val_dataset, batch_size=batch_size, shuffle=False)
target_test_loader = DataLoader(target_test_dataset, batch_size=batch_size, shuffle=False)

# Each call below iterates its loader once from start to end.
train_count = count_each_class_instances(train_loader)
val_count = count_each_class_instances(val_loader)
test_count = count_each_class_instances(test_loader)

target_train_count = count_each_class_instances(target_train_loader)
target_val_count = count_each_class_instances(target_val_loader)
target_test_count = count_each_class_instances(target_test_loader)

# Report the per-class window counts for the source and target splits.
print(f"Train Dataset Count: {train_count}, Validation Data count: {val_count}, Test Count: {test_count}")
print(f"Target Train: {target_train_count}, Target Validation: {target_val_count}, Target test Count: {target_test_count}")

# Check for previous model checkpoints and resume training

In [None]:
# Train Feature Extractor
# Use the GPU when one is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

num_classes = 2

# The model is built from the hyperparameters of the configuration cell. It is not moved
# to `device` here; train_model() does that.
model = TransformerClassifier(input_dim, num_heads, num_encoders, num_decoders, num_classes, d_model)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
# Module-level training state: train_model() reads `loaded_epoch` and updates
# `best_accuracy`; both are overwritten when a checkpoint is restored.
loaded_epoch = 0
best_accuracy = 0
import os
import shutil

# Resolve the checkpoint locations once so every branch uses the same paths.
# (The load branch used to reference an undefined `check_dir` and the directory
# " source" with a stray leading space.)
check_dir = os.path.join(checkpoint_dir, exp_name, "source")
target_check_dir = os.path.join(checkpoint_dir, exp_name, "target")
best_checkpoint_path = os.path.join(check_dir, 'model_best.pth.tar')

if not os.path.exists(check_dir):
    os.makedirs(check_dir)
    print(f"no checkpoint for model: {exp_name}, make a new one at {check_dir}")
    best_step = 0
elif not os.path.exists(best_checkpoint_path):
    best_step = 0
    print(best_checkpoint_path)
    print("Couldn't find the model, creating new one.")
else:
    print(f"Found a checkpoint, loading checkpoint from {check_dir}")
    # map_location lets a checkpoint saved on a GPU be restored on a CPU-only machine.
    best_checkpoint = torch.load(best_checkpoint_path, map_location=device)
    print('Best model pack loaded')
    loaded_epoch = best_checkpoint['iteration']
    model.feature_extractor.load_state_dict(best_checkpoint['feature_extractor'])
    model.classifier.load_state_dict(best_checkpoint['classifier'])
    best_accuracy = best_checkpoint['test_acc']
    optimizer.load_state_dict(best_checkpoint['optimizer'])
    print(f"current best test accuracy is: {best_accuracy}, at step: {loaded_epoch}")

# The fine-tuning stage saves into the "target" directory, so make sure it exists in
# every case, not only when the "source" directory had to be created.
os.makedirs(target_check_dir, exist_ok=True)


def save_checkpoint(state, is_best, exp_name):
    """
    Save a training checkpoint and optionally mark it as the best one so far.

    :param state: content to be saved (passed to `torch.save` unchanged)
    :param is_best: if True, the checkpoint is also copied to `model_best.pth.tar`
    :param exp_name: directory the checkpoint files are written to; created if missing
    :return: None
    """
    checkpoint_root = '{}'.format(exp_name)
    if checkpoint_root:
        # torch.save does not create parent directories.
        os.makedirs(checkpoint_root, exist_ok=True)
    checkpoint_path = os.path.join(checkpoint_root, 'checkpoint.pth.tar')
    torch.save(state, checkpoint_path)
    if is_best:
        shutil.copyfile(checkpoint_path, os.path.join(checkpoint_root, 'model_best.pth.tar'))



In [None]:
def _run_epoch(model, loader, device, criterion, optimizer=None):
    """Run one pass over `loader` and return `(mean_batch_loss, accuracy)`.

    With an `optimizer` the model is put in train mode and updated after every batch.
    Without one the model is put in eval mode and gradients are disabled.
    """
    training = optimizer is not None
    model.train(training)
    total_loss, correct, total = 0, 0, 0
    with torch.set_grad_enabled(training):
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            if training:
                optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()
            total_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)
    # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
    return total_loss / max(len(loader), 1), correct / max(total, 1)


def train_model(model, train_loader, val_loader, epochs=10, freeze_feature_extractor=False, dataset_part = "source"):
    """Train `model`, validate after every epoch, and checkpoint each new best accuracy.

    Uses the module-level `criterion`, `optimizer`, `loaded_epoch`, `checkpoint_dir` and
    `exp_name`, and updates the module-level `best_accuracy`.

    Args:
        model: Model exposing `feature_extractor` and `classifier` sub-modules.
        train_loader: Iterable of (inputs, labels) batches used for optimisation.
        val_loader: Iterable of (inputs, labels) batches used for checkpoint selection.
        epochs (int): Number of passes over `train_loader`.
        freeze_feature_extractor (bool): If True, feature-extractor parameters get
            `requires_grad = False`; otherwise `requires_grad = True`.
        dataset_part (str): Checkpoint sub-directory under `checkpoint_dir/exp_name`.

    Returns:
        tuple: (train_losses, val_losses, train_accuracies, val_accuracies), one entry
        per epoch.
    """
    global best_accuracy
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    for param in model.feature_extractor.parameters():
        param.requires_grad = not freeze_feature_extractor

    train_losses, val_losses, train_accuracies ,val_accuracies = [], [], [], []

    for epoch in range(epochs):
        train_loss, train_acc = _run_epoch(model, train_loader, device, criterion, optimizer)
        train_losses.append(train_loss)
        train_accuracies.append(train_acc)

        val_loss, val_acc = _run_epoch(model, val_loader, device, criterion)
        val_losses.append(val_loss)
        val_accuracies.append(val_acc)

        print(f"Epoch {epoch+1}/{epochs} - Train Loss: {train_losses[-1]:.4f}, Val Loss: {val_losses[-1]:.4f}, Accuracy: {val_acc:.4f}")

        if val_acc > best_accuracy:
            best_accuracy = val_acc
            print(f"New Best Accuracy Found: {best_accuracy*100:.4f}, At epoch:{epoch + loaded_epoch + 1}. Saving.")
            # The key is named 'test_acc' but it stores the accuracy on `val_loader`.
            save_checkpoint({
                'iteration': epoch + loaded_epoch + 1,
                'feature_extractor': model.feature_extractor.state_dict(),
                'classifier': model.classifier.state_dict(),
                'test_acc': best_accuracy,
                'optimizer': optimizer.state_dict(),
            }, True, os.path.join(checkpoint_dir, exp_name, dataset_part))

    return train_losses, val_losses, train_accuracies ,val_accuracies


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Custom "sciency" look via manual rcParams
# Global matplotlib defaults applied to every figure created after this cell.
plt.rcParams.update({
    # Text sizes
    'font.size': 12,
    'axes.titlesize': 14,
    'axes.labelsize': 12,
    # Axes frame
    'axes.edgecolor': 'black',
    'axes.linewidth': 1.2,
    'xtick.labelsize': 11,
    'ytick.labelsize': 11,
    'legend.fontsize': 11,
    # Light dashed grid
    'grid.color': 'gray',
    'grid.linestyle': '--',
    'grid.alpha': 0.4,
    'lines.linewidth': 2.5,
})

# Custom high-contrast color palette
# plot_results() reads this list at call time: index 0 for train curves, index 1 for
# validation curves. The SHAP plotting cell later rebinds the same name.
custom_colors = ['#009E73', '#0072B2']


def plot_results(train_losses, val_losses, train_accuracies ,val_accuracies, save_path="fig.png"):
    """Plot per-epoch loss and accuracy curves side by side.

    Args:
        train_losses, val_losses: Per-epoch loss values.
        train_accuracies, val_accuracies: Per-epoch accuracy values.
        save_path (str | None): File the figure is written to; pass None to skip saving.
            The default keeps the previous behaviour of writing "fig.png".
    """
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 5))

    panels = (
        (loss_ax, train_losses, val_losses, 'Loss'),
        (acc_ax, train_accuracies, val_accuracies, 'Accuracy'),
    )
    for ax, train_series, val_series, metric in panels:
        # Colors come from the module-level palette: [0] for train, [1] for validation.
        ax.plot(train_series, label=f'Train {metric}', color=custom_colors[0])
        ax.plot(val_series, label=f'Validation {metric}', color=custom_colors[1])
        ax.set_title(f'{metric} Over Epochs', weight='bold')
        ax.set_xlabel('Epochs')
        ax.set_ylabel(metric)
        ax.legend(frameon=False)
        ax.grid(True)

    fig.tight_layout()
    if save_path:
        fig.savefig(save_path)
    plt.show()

# Confusion Matrix
def plot_confusion_matrix(model, data_loader):
    """Evaluate a binary classifier, draw its confusion matrix and print summary metrics.

    Args:
        model: Classifier returning logits of shape (batch, 2).
        data_loader: Iterable of (inputs, labels) batches with labels in {0, 1}.

    Returns:
        tuple: (accuracy, f1_fault), both in percent. `f1_fault` is the F1 score of class 1.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()
    all_preds, all_labels = [], []
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Fix the label set so the matrix is always 2x2, even when one class never appears
    # in the labels or the predictions. Otherwise the indexing below raises IndexError.
    cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])
    plt.figure(figsize=(6, 5))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title('Confusion Matrix')
    plt.show()

    def _percent(numerator, denominator):
        """Ratio in percent; 0.0 when the denominator is zero."""
        return numerator / denominator * 100 if denominator else 0.0

    p = cm
    errors = p[0,1] + p[1,0]
    # F1 = TP / (TP + 0.5 * (FP + FN)), computed with class 1 and class 0 as positive.
    f1_fault = _percent(p[1,1], p[1,1] + (0.5 * errors))
    f1_health = _percent(p[0,0], p[0,0] + (0.5 * errors))
    accuracy = _percent(p[0,0] + p[1,1], p[0,0] + p[1,1] + errors)
    print(f"Accuracy: {accuracy:.4f}, Healthy F1 Score: {f1_health:.4f}, Faulty F1 Score: {f1_fault:.4f}")
    print(cm)
    return accuracy, f1_fault

# Pre-train the model on the A1 Dataset

In [None]:

# Pre-train on the source loaders for 100 epochs with the feature extractor trainable.
# Best checkpoints go to the "source" sub-directory of the experiment.
train_losses, val_losses, train_accuracies ,val_accuracies = train_model(model, train_loader, val_loader, epochs=100, freeze_feature_extractor=False, dataset_part = "source")
plot_results(train_losses, val_losses, train_accuracies, val_accuracies)

## Get the best pretraining model with the highest performance on the source validation data

In [None]:
# Prefer the checkpoint that pre-training wrote under <checkpoint_dir>/<exp_name>/source.
# Fall back to a "model_best.pth.tar" in the working directory, which was the only
# location tried before.
best_checkpoint_path = os.path.join(checkpoint_dir, exp_name, "source", "model_best.pth.tar")
if not os.path.exists(best_checkpoint_path):
    best_checkpoint_path = "model_best.pth.tar"
print(f"Found a checkpoint, loading checkpoint from {best_checkpoint_path}")

# map_location lets a checkpoint saved on a GPU be restored on a CPU-only machine.
best_checkpoint = torch.load(best_checkpoint_path, map_location=device)
print('Best model pack loaded')
loaded_epoch = best_checkpoint['iteration']
model.feature_extractor.load_state_dict(best_checkpoint['feature_extractor'])
model.classifier.load_state_dict(best_checkpoint['classifier'])
best_accuracy = best_checkpoint['test_acc']
optimizer.load_state_dict(best_checkpoint['optimizer'])
print(f"current best test accuracy is: {best_accuracy}, at step: {loaded_epoch}")

In [None]:
def optimizer_to(optim, device):
    """Move every tensor held in `optim.state` (and its gradient, if any) to `device`.

    Handles state entries that are tensors and entries that are dicts of tensors.
    Values of any other type are left untouched. Works in place and returns None.
    """
    def _relocate(tensor):
        # Rebind .data so the tensor object itself (and references to it) is kept.
        tensor.data = tensor.data.to(device)
        if tensor._grad is not None:
            tensor._grad.data = tensor._grad.data.to(device)

    for entry in optim.state.values():
        if isinstance(entry, torch.Tensor):
            _relocate(entry)
            continue
        if not isinstance(entry, dict):
            continue
        for value in entry.values():
            if isinstance(value, torch.Tensor):
                _relocate(value)
# Move the state of the module-level optimizer onto `device`.
optimizer_to(optimizer, device)

In [None]:
def get_pre_trained_model():
    """Reload the best pre-trained (source) weights into the module-level model and optimizer.

    The checkpoint under `<checkpoint_dir>/<exp_name>/source` is preferred. If it does not
    exist, a "model_best.pth.tar" in the working directory is used, which was the only
    location tried before.

    Returns:
        tuple: (model, optimizer), the same module-level objects, restored in place.
    """
    best_checkpoint_path = os.path.join(checkpoint_dir, exp_name, "source", 'model_best.pth.tar')
    if not os.path.exists(best_checkpoint_path):
        best_checkpoint_path = "model_best.pth.tar"
    print(f"Found a checkpoint, loading checkpoint from {best_checkpoint_path}")
    # NOTE: weights_only=False unpickles arbitrary objects; only load checkpoints that
    # this notebook produced. map_location allows restoring a GPU checkpoint on CPU.
    best_checkpoint = torch.load(best_checkpoint_path, map_location=device, weights_only=False)
    print('Best model pack loaded')
    model.feature_extractor.load_state_dict(best_checkpoint['feature_extractor'])
    model.classifier.load_state_dict(best_checkpoint['classifier'])
    optimizer.load_state_dict(best_checkpoint['optimizer'])
    # The module-level `best_accuracy` / `loaded_epoch` are deliberately not touched here.
    # The previous local assignments never reached them either.
    print(f"current best test accuracy is: {best_checkpoint['test_acc']}, at step: {best_checkpoint['iteration']}")

    # Reuse the module-level helper instead of a nested copy of it.
    optimizer_to(optimizer, device)
    return model, optimizer

def get_fine_tuned_model():
    """Reload the best fine-tuned (target) weights into the module-level model and optimizer.

    Reads `<checkpoint_dir>/<exp_name>/target/model_best.pth.tar`.

    Returns:
        tuple: (model, optimizer), the same module-level objects, restored in place.
        (The function used to return None although its caller assigns the result.)
    """
    best_checkpoint_path = os.path.join(checkpoint_dir, exp_name,"target",'model_best.pth.tar')
    print(f"Found a checkpoint, loading checkpoint from {best_checkpoint_path}")
    # NOTE: weights_only=False unpickles arbitrary objects; only load checkpoints that
    # this notebook produced. map_location allows restoring a GPU checkpoint on CPU.
    best_checkpoint = torch.load(best_checkpoint_path, map_location=device, weights_only=False)
    print('Best model pack loaded')
    model.feature_extractor.load_state_dict(best_checkpoint['feature_extractor'])
    model.classifier.load_state_dict(best_checkpoint['classifier'])
    optimizer.load_state_dict(best_checkpoint['optimizer'])
    print(f"current best test accuracy is: {best_checkpoint['test_acc']}, at step: {best_checkpoint['iteration']}")
    return model, optimizer

# Fine-tune the pre-trained model with 10-fold cross-validation

In [None]:
import torch
from torch.utils.data import ConcatDataset, DataLoader, Subset
from sklearn.model_selection import KFold

import copy

# Step 1: Pool the target train and validation windows; the folds are drawn from this pool.
# NOTE(review): both datasets are built from overlapping windows, so windows in different
# folds may share rows. Confirm this is acceptable for the experiment.
combined_dataset = ConcatDataset([target_train_loader.dataset, target_val_loader.dataset])

# Step 2: Setup KFold
k_folds = 10
kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)
accuracies = []
f1s_fault = []
fine_tuned_models = []
# Step 3: Start cross-validation
for fold, (train_idx, val_idx) in enumerate(kfold.split(combined_dataset)):
    print(f"=======================================================")
    print(f"=======================================================")
    print(f"Fold {fold+1}/{k_folds}")

    # Create data loaders for the current fold
    train_subset = Subset(combined_dataset, train_idx)
    val_subset = Subset(combined_dataset, val_idx)

    train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False)

    # Every fold restarts from the pre-trained weights and optimizer state.
    model, optimizer = get_pre_trained_model()

    # Reset the module-level counters that train_model() reads and updates.
    best_accuracy = 0
    loaded_epoch = 0

    # Select the best checkpoint on the fold's held-out split. The test loader was used
    # here before, which leaked the test set into model selection and left `val_loader`
    # unused.
    train_losses, val_losses, train_accuracies, val_accuracies = train_model(
        model,
        train_loader,
        val_loader,
        epochs=100,
        freeze_feature_extractor=False,
        dataset_part="target"
    )
    print((train_losses, val_losses, train_accuracies, val_accuracies))

    # Load this fold's best weights into `model`, then keep an independent copy.
    # Appending `model` itself stored the same object ten times, so every entry ended up
    # holding the last fold's weights.
    get_fine_tuned_model()
    fine_tuned_models.append(copy.deepcopy(model))
    current_accuracy, current_f1_fault = plot_confusion_matrix(model, target_test_loader)
    accuracies.append(current_accuracy)
    f1s_fault.append(current_f1_fault)

# Mean and standard deviation of test accuracy and class-1 F1 over the folds (in percent).
print(f"{np.array(accuracies).mean()} +- {np.array(accuracies).std()}")
print(f"{np.array(f1s_fault).mean()} +- {np.array(f1s_fault).std()}")

In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc


tprs = []
aucs = []
# Common false-positive-rate grid onto which every ROC curve is interpolated.
mean_fpr = np.linspace(0, 1, 100)

fig, ax = plt.subplots(figsize=(6, 6))

# A dedicated loop variable keeps the module-level `model` from being rebound.
for i, fold_model in enumerate(fine_tuned_models):
    fold_model.eval()
    all_probs = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in target_test_loader:
            inputs = inputs.to(device)
            outputs = fold_model(inputs)
            probs = torch.softmax(outputs, dim=1)  # (batch_size, 2)
            pos_probs = probs[:, 1]  # Class 1

            all_probs.extend(pos_probs.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    fpr, tpr, _ = roc_curve(all_labels, all_probs)
    roc_auc = auc(fpr, tpr)

    interp_tpr = np.interp(mean_fpr, fpr, tpr)
    interp_tpr[0] = 0.0  # every curve starts at the origin
    tprs.append(interp_tpr)
    aucs.append(roc_auc)

    ax.plot(fpr, tpr, lw=1, alpha=0.3, label=f"ROC Model {i} (AUC = {roc_auc:.2f})")

# Mean + std deviation across models
mean_tpr = np.mean(tprs, axis=0)
mean_tpr[-1] = 1.0  # every curve ends at (1, 1)
mean_auc = auc(mean_fpr, mean_tpr)
std_auc = np.std(aucs)

ax.plot(
    mean_fpr,
    mean_tpr,
    color="b",
    label=r"Mean ROC (AUC = %0.2f ± %0.2f)" % (mean_auc, std_auc),
    lw=2,
    alpha=0.8,
)

std_tpr = np.std(tprs, axis=0)
tprs_upper = np.minimum(mean_tpr + std_tpr, 1)
tprs_lower = np.maximum(mean_tpr - std_tpr, 0)
# Draw the +/- 1 std band. These bounds were computed before but never plotted.
ax.fill_between(
    mean_fpr,
    tprs_lower,
    tprs_upper,
    color="grey",
    alpha=0.2,
    label=r"$\pm$ 1 std. dev.",
)

# NOTE: this runs after `fig` was created, so it only affects figures created later.
plt.rcParams["figure.dpi"] = 300
ax.plot([0, 1], [0, 1], linestyle="--", lw=1, color="gray", alpha=0.8)

ax.set(
    xlabel="False Positive Rate",
    ylabel="True Positive Rate",
)
ax.set_title("ROC Curves from Fine-Tuned Models", weight='bold')
ax.legend(loc="lower right")
plt.grid()
plt.tight_layout()
plt.show()


# Explainability

In [None]:
import shap
# Presumably enables SHAP's JavaScript-based plots in the notebook; confirm against the
# SHAP docs. Nothing below in this notebook calls such a plot.
shap.initjs()


def convert_loader_to_background_dict(dataloader, max_samples=1000):
    """
    Collects up to `max_samples` input samples from a DataLoader into a single tensor.

    Despite the name, the result is a tensor, not a dict.

    Parameters:
        dataloader (DataLoader): Iterable of batches. A batch is either the input tensor
            itself or a list/tuple whose first element is the input tensor (N, m, n).
        max_samples (int): Maximum number of samples to collect; must be at least 1.

    Returns:
        torch.Tensor: Stacked samples of shape (N, m, n) with N <= max_samples.

    Raises:
        ValueError: If `max_samples` is smaller than 1 or the loader yields no samples.
    """
    if max_samples < 1:
        raise ValueError(f"max_samples must be at least 1, got {max_samples}")

    background_samples = []
    for batch in dataloader:
        # Unpack batch: assume (data, label)
        data = batch[0] if isinstance(batch, (list, tuple)) else batch

        # Take only as many samples from this batch as are still needed.
        remaining = max_samples - len(background_samples)
        background_samples.extend(data[:remaining])
        if len(background_samples) >= max_samples:
            break

    if not background_samples:
        raise ValueError("dataloader yielded no samples")

    # Stack into a single tensor (N, m, n)
    return torch.stack(background_samples)


In [None]:
# Collect up to 1000 windows (the helper's default cap) from each loader.
# target_train_loader shuffles, so the background sample depends on the torch RNG state.
test_tensor = convert_loader_to_background_dict(target_test_loader).to(device)
train_tensor = convert_loader_to_background_dict(target_train_loader).to(device)

# `model` is whichever object the previous cells left bound to that name.
# NOTE(review): presumably the last fold's fine-tuned model; confirm before interpreting.
explainer = shap.DeepExplainer(model, train_tensor)
shap_values = explainer.shap_values(test_tensor, check_additivity = False)

In [None]:
# Feature names for the SHAP tables. This is the same list, in the same order, as
# TimeSeriesDataset.sensor_columns, so the two must be kept in sync.
columns = ["FIT101", "LIT101", "MV101", "P101", "AIT201", "AIT202", "AIT203", "FIT201", "MV201", "P203", "DPIT301", "FIT301", "LIT301", "MV301", "MV302", "MV303", "MV304", "AIT402", "FIT401", "LIT401", "AIT501", "AIT502", "AIT503", "AIT504", "FIT501", "FIT502", "FIT503", "PIT501", "PIT502", "PIT503"]

In [None]:
# NOTE(review): assumes `shap_values` is indexed by class first, so [0] selects class 0
# with shape (samples, window, features). TODO confirm for the installed shap version.
class_0 = shap_values[0]
# Average over axis 1 (assumed to be the time-step axis) to get one value per feature.
class_0 = class_0.mean(axis = 1)
df_0 = pd.DataFrame(class_0, columns=columns)

# Per-sample mean input value of each feature, used to colour the points.
value_tensor = test_tensor.clone()
value_tensor = value_tensor.mean(axis = 1).to("cpu")
color_df = pd.DataFrame(value_tensor, columns=columns)
# Min-max scale every column; a constant column divides by zero here.
color_df=(color_df-color_df.min())/(color_df.max() - color_df.min())


# Long format: one row per (sample, feature).
df_long_0 = df_0.melt(var_name="Feature", value_name="Value")
df_long_color = color_df.melt(var_name="Feature", value_name="Color")

# The assignment aligns on the row index. This presumes both melts produce rows in the
# same order (same `columns` list, same number of samples).
df_long_0["Color"] = df_long_color["Color"]


In [None]:
import seaborn
# Overview swarm plot of the per-sample values for every feature, coloured by "Color".
# `sns` and `seaborn` refer to the same module.
sns.set_theme(rc={'figure.figsize':(16,8)})
seaborn.swarmplot(data=df_long_0, x="Feature", y="Value", size=2, hue="Color", )
# seaborn.violinplot(data=df_long, x="Feature", y="Value")
plt.xticks(rotation=90)

plt.show()


In [None]:
# Importance of a feature = mean absolute value of its `class_0` column over all samples.
feature_importance = np.mean(np.abs(class_0), axis=0)

# Create a DataFrame for easy viewing
importance_df = pd.DataFrame({
    'Feature': columns,   # list of feature names
    'Importance': feature_importance
}).sort_values(by='Importance', ascending=False)

# Feature names from most to least important (importance_df is already sorted that way).
sorted_features = importance_df.sort_values(by="Importance", ascending=False)["Feature"].tolist()
# Make "Feature" an ordered categorical so plots follow the importance order.
df_long_0["Feature"] = pd.Categorical(df_long_0["Feature"], categories=sorted_features, ordered=True)


In [None]:
from matplotlib.colors import LinearSegmentedColormap
import matplotlib as mpl
import numpy as np

# Reset matplotlib to its default style, then use a very faint grid.
mpl.style.use('default')
plt.rcParams['grid.color'] = (0.5, 0.5, 0.5, 0.1)

# NOTE: this rebinds the module-level `custom_colors`. The importance bar plot below uses
# custom_colors[0] from this list, and plot_results() will also pick it up if called later.
custom_colors = ['#0072B2', '#700882',  '#e83517']
# 20-step colormap running through the three colours, used for the "Color" hue.
cm = LinearSegmentedColormap.from_list(
        "Custom", custom_colors, N=20)



df_long_0["Feature"] = pd.Categorical(df_long_0["Feature"], categories=sorted_features, ordered=True)
important_feature_count = 15
important_feature = sorted_features[:important_feature_count]
# .copy() makes the filtered rows an independent frame, so the column assignment below
# does not write into a slice of df_long_0 (SettingWithCopyWarning).
df_long_important_features = df_long_0[df_long_0["Feature"].isin(important_feature)].copy()
# Drop the categories of the filtered-out features so they get no empty slot on the x-axis.
df_long_important_features["Feature"] = df_long_important_features["Feature"].cat.remove_unused_categories()
# 3. Plot with seaborn using the new sorted Feature order
plt.figure(figsize=(11, 7))
sns.swarmplot(data=df_long_important_features, x="Feature", y="Value", size=5, hue="Color", palette=cm, legend = False)

plt.title('SHAP values: impact on model output', weight='bold')

plt.xticks(rotation=90)
plt.grid(True)
plt.tight_layout()
# Zero line: separates positive from negative values.
plt.axhline(y=0, color='black', linestyle='-',lw = 0.5)

plt.show()


In [None]:
# Normalise the importances so they sum to 1. This overwrites the column in place;
# re-running the cell is harmless because the values already sum to 1.
importance_df["Importance"] = importance_df["Importance"] / importance_df["Importance"].sum()
# plt.xticks(rotation=90)
plt.figure(figsize=(8, 10))
# Horizontal bars in the order of importance_df, drawn in the first palette colour.
seaborn.barplot(data = importance_df, x = "Importance", y = "Feature", orient = 'h', color = custom_colors[0])
plt.grid(True)
plt.title("SHAP Feature Importance", weight='bold')
plt.ylabel("Features")
plt.xlabel("SHAP Feature Importance")
plt.show()