In [61]:
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import classification_report, roc_auc_score
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import NearMiss
import numpy as np
from sklearn.model_selection import train_test_split

# Positional Encoding
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings to a batch-first sequence.

    The table is computed once at construction: even feature columns hold
    sines and odd feature columns hold cosines of ``position * div_term``.

    Args:
        d_model: Size of the last (feature) dimension of the inputs. Both even
            and odd sizes are supported.
        max_len: Number of positions for which encodings are precomputed.
    """

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        angles = position * div_term
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one cosine column fewer than sine
        # columns, so trim the angles to avoid a shape mismatch.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        # Register as a non-persistent buffer: it follows .to()/.cuda() with
        # the module but is kept out of state_dict, so checkpoint keys are
        # the same as when `pe` was a plain attribute.
        self.register_buffer("pe", pe.unsqueeze(0), persistent=False)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        Args:
            x: Tensor whose dimension 1 is the sequence axis and whose last
                dimension has size ``d_model``.
        """
        seq_len = x.size(1)
        # .to() is a no-op once the buffer already lives on x's device; it is
        # kept so the module still works when used without being moved.
        return x + self.pe[:, :seq_len, :].to(x.device)

# Transformer Detector Model
class TransformerDetector(nn.Module):
    """Transformer-encoder classifier that emits one sigmoid score per sample.

    Pipeline: linear embedding -> positional encoding -> Transformer encoder
    (batch-first) -> mean over the sequence axis -> two-layer head ending in
    a sigmoid.

    Args:
        input_size: Number of input features per position.
        d_model: Width of the embedding and of the encoder layers.
        nhead: Number of attention heads in each encoder layer.
        num_layers: Number of stacked encoder layers.
        dim_feedforward: Hidden width of each encoder layer's feed-forward part.
        dropout: Dropout rate passed to each encoder layer.
    """

    def __init__(self, input_size, d_model=128, nhead=8, num_layers=2, dim_feedforward=256, dropout=0.1):
        super().__init__()
        self.embedding = nn.Linear(input_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        single_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True,
        )
        self.transformer_encoder = nn.TransformerEncoder(single_layer, num_layers=num_layers)
        head_layers = [
            nn.Linear(d_model, 128),
            nn.ReLU(),
            nn.Linear(128, 1),
            nn.Sigmoid(),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Score a batch; 2-D input is treated as sequences of length one."""
        # No sequence axis: [B, d_input] -> [B, 1, d_input]
        tokens = x.unsqueeze(1) if x.dim() == 2 else x
        embedded = self.embedding(tokens)  # [B, L, d_model]
        encoded = self.transformer_encoder(self.positional_encoding(embedded))  # [B, L, d_model]
        pooled = encoded.mean(dim=1)  # average over the sequence axis
        return self.fc(pooled)



In [62]:
# Load Data and Preprocess
def load_adbench_data(dataset_path):
    """Load the ``X`` and ``y`` arrays from an ``.npz`` archive as float32 tensors.

    Args:
        dataset_path: Path to an ``.npz`` file containing arrays stored under
            the keys ``'X'`` and ``'y'``.

    Returns:
        Tuple ``(X, y)`` of ``torch.float32`` tensors.
    """
    # np.load on an .npz archive returns a lazy NpzFile that holds the file
    # open; the context manager releases the handle once both arrays are read.
    with np.load(dataset_path) as data:
        X = data['X']
        y = data['y']
    return torch.tensor(X, dtype=torch.float32), torch.tensor(y, dtype=torch.float32)

def evaluate_with_classification_report_and_auc(model, test_loader, device, threshold=0.5):
    """Evaluate the model: print a classification report and the AUC-ROC.

    Args:
        model: Module called as ``model(X_batch)`` that returns one score per
            sample.
        test_loader: Iterable yielding ``(X_batch, y_batch)`` pairs.
        device: Device the input batches are moved to before the forward pass.
        threshold: Scores strictly greater than this are predicted as class 1.

    Returns:
        Tuple ``(report, aucroc)``: the text report produced by
        ``classification_report`` and the AUC-ROC, or ``None`` for the AUC
        when the labels contain a single class.
    """
    model.eval()
    all_preds, all_labels = [], []
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            # reshape(-1) keeps a 1-D tensor even for a batch of one sample;
            # squeeze() would yield a 0-dim tensor that torch.cat rejects.
            y_pred = model(X_batch.to(device)).reshape(-1)
            all_preds.append(y_pred.cpu())
            # Labels are only collected, so they never need to visit `device`.
            all_labels.append(y_batch.reshape(-1).cpu())

    # Merge all batches
    preds = torch.cat(all_preds).numpy()
    labels = torch.cat(all_labels).numpy()

    # Binarize the scores with the threshold
    binary_preds = (preds > threshold).astype(int)

    # Classification report. Passing labels=[0, 1] keeps the two target names
    # valid even when only one class shows up in labels and predictions.
    report = classification_report(
        labels, binary_preds, labels=[0, 1], target_names=['Class 0', 'Class 1']
    )
    print(report)

    # AUC-ROC is only defined when both classes are present
    if np.unique(labels).size > 1:
        aucroc = roc_auc_score(labels, preds)
        print(f"AUC-ROC: {aucroc:.4f}")
    else:
        aucroc = None
        print("AUC-ROC: Undefined (only one class present in labels)")

    return report, aucroc

# Train Function
def train_detector(model, train_loader, optimizer, criterion, device):
    """Run one training epoch and return the mean per-batch loss.

    Args:
        model: Module called as ``model(X_batch)`` that returns one score per
            sample.
        train_loader: Sized iterable yielding ``(X_batch, y_batch)`` pairs.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as ``criterion(y_pred, y_batch)``.
        device: Device every batch is moved to.

    Returns:
        Sum of the batch losses divided by ``len(train_loader)``.
    """
    model.train()
    total_loss = 0.0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        # reshape(-1) keeps predictions 1-D for a batch of one sample;
        # squeeze() would give a 0-dim tensor whose shape no longer matches
        # the [1]-shaped target.
        y_pred = model(X_batch).reshape(-1)
        loss = criterion(y_pred, y_batch.reshape(-1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(train_loader)



In [63]:

dataset_path = r"D:\Study\Code\SwiftHydra\Classical\12_fault.npz"  # Replace with your dataset path
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# One seed for every stochastic step (split, SMOTE, torch RNG) so that a
# fresh "Restart & Run All" reproduces the same numbers.
SEED = 42
torch.manual_seed(SEED)

# Load Data
X, y = load_adbench_data(dataset_path)

# Split the original data before applying SMOTE so that no synthetic sample
# ends up in the test set.
X_train, X_test, y_train, y_test = train_test_split(
    X.numpy(), y.numpy(), test_size=0.2, random_state=SEED, stratify=y.numpy()
)


# Handle Imbalanced Data with SMOTE (seeded for reproducibility)
smote = SMOTE(random_state=SEED)

# Apply SMOTE to the training set only
X_resampled, y_resampled = smote.fit_resample(X_train, y_train)
# Convert back to tensors
X_train = torch.tensor(X_resampled, dtype=torch.float32)
y_train = torch.tensor(y_resampled, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)

# Check the class distributions
unique, counts = np.unique(y_train.numpy(), return_counts=True)
print(f"Class distribution of training set after SMOTE: {dict(zip(unique, counts))}")

unique, counts = np.unique(y_test.numpy(), return_counts=True)
print(f"Class distribution of testing set: {dict(zip(unique, counts))}")
# Create DataLoaders
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64)



Class distribution of training set after SMOTE: {0.0: 1014, 1.0: 1014}
Class distribution of testing set: {0.0: 254, 1.0: 135}


In [65]:
# Build the detector, its optimizer and the loss
n_features = X.shape[1]
model = TransformerDetector(input_size=n_features)
model = model.to(device)
optimizer = Adam(model.parameters(), lr=1e-3)
criterion = nn.BCELoss()

# Train, then report test-set metrics after every epoch
num_epochs = 200
for epoch in range(num_epochs):
    train_loss = train_detector(model, train_loader, optimizer, criterion, device)
    epoch_summary = f"Epoch {epoch + 1}/{num_epochs}, Loss: {train_loss:.4f}"
    print(epoch_summary)

    # Classification report and AUC-ROC on the held-out test split
    print("Classification Report on Test Set:")
    evaluate_with_classification_report_and_auc(model, test_loader, device)


Epoch 1/200, Loss: 0.5920
Classification Report on Test Set:
              precision    recall  f1-score   support

     Class 0       0.90      0.36      0.51       254
     Class 1       0.43      0.93      0.59       135

    accuracy                           0.56       389
   macro avg       0.67      0.64      0.55       389
weighted avg       0.74      0.56      0.54       389

AUC-ROC: 0.7587
Epoch 2/200, Loss: 0.5200
Classification Report on Test Set:
              precision    recall  f1-score   support

     Class 0       0.80      0.69      0.74       254
     Class 1       0.54      0.67      0.60       135

    accuracy                           0.69       389
   macro avg       0.67      0.68      0.67       389
weighted avg       0.71      0.69      0.69       389

AUC-ROC: 0.7909
Epoch 3/200, Loss: 0.4814
Classification Report on Test Set:
              precision    recall  f1-score   support

     Class 0       0.85      0.70      0.76       254
     Class 1       0.5