## 0. Dataset and DataLoader

In [None]:
import torch
import torch.nn as nn
import pandas as pd
from torch.utils.data import DataLoader, Dataset, Subset,TensorDataset
from sklearn.preprocessing import MinMaxScaler
import pickle
import numpy as np
import torch.nn.functional as F
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

class WindPowerDataset(Dataset):
    def __init__(self, csv_file, wind_power_scaler=None, weather_scaler=None, save_scalers=False):
        self.data = pd.read_csv(csv_file)

        self.data = self.data.iloc[::5, :].reset_index(drop=True)

        self.original_wind_power_std = self.data.iloc[:, 2].std()
        self.original_weather_std = self.data.iloc[:, 4:12].std()

        if wind_power_scaler is None or weather_scaler is None:
            self.wind_power_scaler = MinMaxScaler()
            self.weather_scaler = MinMaxScaler()

            self.data.iloc[:, 2] = self.wind_power_scaler.fit_transform(self.data.iloc[:, 2].values.reshape(-1, 1)).squeeze()

            self.data.iloc[:, 4:12] = self.weather_scaler.fit_transform(self.data.iloc[:, 4:12])
            
            if save_scalers:
                with open('wind_power_scaler.pkl', 'wb') as f:
                    pickle.dump(self.wind_power_scaler, f)
                with open('weather_scaler.pkl', 'wb') as f:
                    pickle.dump(self.weather_scaler, f)
        else:
            self.wind_power_scaler = wind_power_scaler
            self.weather_scaler = weather_scaler
            self.data.iloc[:, 2] = self.wind_power_scaler.transform(self.data.iloc[:, 2].values.reshape(-1, 1)).squeeze()
            self.data.iloc[:, 4:12] = self.weather_scaler.transform(self.data.iloc[:, 4:12])
    
    def __len__(self):
        return len(self.data) - 312  # 288 (1440/5) + 24 (120/5)
    
    def __getitem__(self, idx):
        history_weather = self.data.iloc[idx:idx + 288, 4:12].values.astype(float)  # [288, 8]
        wind_power_history = self.data.iloc[idx:idx + 288, 2].values.astype(float)   # [288]
        future_weather = self.data.iloc[idx + 288:idx + 312, 4:12].values.astype(float)  # [24, 8]
        future_wind_power = self.data.iloc[idx + 312, 2]  # float

        return (
            torch.tensor(history_weather, dtype=torch.float32),
            torch.tensor(wind_power_history, dtype=torch.float32),
            torch.tensor(future_weather, dtype=torch.float32),
            torch.tensor(future_wind_power, dtype=torch.float32)
        )
    
    def get_original_stds(self):
        return {
            'original_wind_power_std': self.original_wind_power_std,
            'original_weather_std': self.original_weather_std.to_dict()
        }
    
name='CAISO_zone_1_.csv'
with open('weather_scaler.pkl', 'rb') as f:
    weather_scaler = pickle.load(f)
dataset = WindPowerDataset(name, save_scalers=True)
wind_power_scaler=dataset.wind_power_scaler
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
batch_size=32
test_split=0.2
test_size = int(len(dataset) * test_split)
train_size = len(dataset) - test_size
train_dataset = Subset(dataset, list(range(train_size)))
test_dataset = Subset(dataset, list(range(train_size, len(dataset))))
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

## 1. Preparation

In [None]:
with open('rup_ensemble_003_caiso.pkl', 'rb') as f:
    rup_ensemble_003 = pickle.load(f)

with open('rup_ensemble_005_caiso.pkl', 'rb') as f:
    rup_ensemble_005 = pickle.load(f)

with open('rup_ensemble_010_caiso.pkl', 'rb') as f:
    rup_ensemble_010 = pickle.load(f)

with open('RUPW010_caiso.pkl', 'rb') as f:
    rupw_010 = pickle.load(f)

with open('RUPW005_caiso.pkl', 'rb') as f:
    rupw_005 = pickle.load(f)

with open('RUPW003_caiso.pkl', 'rb') as f:
    rupw_003 = pickle.load(f)


In [None]:
uap_loaded_list = []

uap_loaded_list.append(rup_ensemble_003)
uap_loaded_list.append(rup_ensemble_005)
uap_loaded_list.append(rup_ensemble_010)
uap_loaded_list.append(rupw_010)
uap_loaded_list.append(rupw_005)
uap_loaded_list.append(rupw_003)


## Autoencoder

In [None]:
print("Loading normal data from test_loader...")
normal_sequences = []

for batch in test_loader:
    history_weather = batch[0].cpu().numpy()  # (B, 288, 8)
    future_weather = batch[2].cpu().numpy()   # (B, 24, 8)
    full_seq = np.concatenate([history_weather, future_weather], axis=1)  # (B, 312, 8)
    normal_sequences.append(full_seq)

normal_sequences = np.concatenate(normal_sequences, axis=0)  # (N, 312, 8)
print(f"Normal data shape: {normal_sequences.shape}")

X_normal = normal_sequences.reshape(normal_sequences.shape[0], -1).astype(np.float32)
print(f"Flattened normal data shape: {X_normal.shape}")

input_dim = 312 * 8  # 2496
encoding_dim = 128   # hyperparameter, suggested 64~512

class Autoencoder(nn.Module):
    def __init__(self, input_dim, encoding_dim):
        super(Autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, encoding_dim)
        )
        self.decoder = nn.Sequential(
            nn.Linear(encoding_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 512),
            nn.ReLU(),
            nn.Linear(512, input_dim),
            nn.Sigmoid()  
        )
    
    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
autoencoder = Autoencoder(input_dim, encoding_dim).to(device)

print("Training Autoencoder on normal data...")

# DataLoader
train_tensor = torch.tensor(X_normal)
train_dataset = TensorDataset(train_tensor)
train_loader_ae = DataLoader(train_dataset, batch_size=64, shuffle=True)

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(autoencoder.parameters(), lr=1e-3)

epochs = 50
autoencoder.train()
for epoch in range(epochs):
    total_loss = 0
    for batch in train_loader_ae:
        x = batch[0].to(device)
        optimizer.zero_grad()
        recon = autoencoder(x)
        loss = criterion(recon, x)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    if (epoch + 1) % 10 == 0:
        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader_ae):.6f}")

for i in range(len(uap_loaded_list)):
    uap_loaded = uap_loaded_list[i].cpu().numpy()  # (24, 8)

    # generate attacked data
    attacked_sequences = normal_sequences.copy()
    attacked_sequences[:, 288:, :] += uap_loaded
    attacked_sequences = np.clip(attacked_sequences, 0.0, 1.0)
    X_attacked = attacked_sequences.reshape(attacked_sequences.shape[0], -1).astype(np.float32)

    # generate test set
    X_test = np.concatenate([X_normal, X_attacked], axis=0)  # (2N, 2496)
    y_test = np.concatenate([np.ones(len(X_normal)), np.zeros(len(X_attacked))], axis=0)  # normal=1, abnormal=0

    # Reconstructing error as anomaly score
    autoencoder.eval()
    with torch.no_grad():
        X_test_tensor = torch.tensor(X_test).to(device)
        recon_test = autoencoder(X_test_tensor)
        recon_error = torch.mean((recon_test - X_test_tensor) ** 2, dim=1).cpu().numpy()  # (2N,)

    # Select threshold: use 95th percentile (or 99th) of normal data
    normal_errors = recon_error[:len(X_normal)]
    threshold = np.percentile(normal_errors, 95)  # 90, 95, 99
    print(f"Reconstruction error threshold (95th percentile): {threshold:.6f}")

    # Predict: error <= threshold → normal (1), else abnormal (0)
    y_pred = (recon_error <= threshold).astype(int)

    # Evaluation indicators (positive=normal=1)
    acc = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, pos_label=1)
    recall = recall_score(y_test, y_pred, pos_label=1)
    f1 = f1_score(y_test, y_pred, pos_label=1)

    print("\n" + "="*50)
    print(f"Autoencoder Anomaly Detection - UAP {i+1}")
    print("="*50)
    print(f"Test samples: {len(X_test)} (half normal, half attacked)")
    print(f"Accuracy:  {acc:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall:    {recall:.4f}")
    print(f"F1-Score:  {f1:.4f}")
    print(f"Recon error range: [{recon_error.min():.6f}, {recon_error.max():.6f}]")
    print('\n')

## Isolation Forest

In [None]:
from sklearn.ensemble import IsolationForest

print("Loading normal data from test_loader...")
normal_sequences = []

for batch in test_loader:
    history_weather = batch[0].cpu().numpy()  # (B, 288, 8)
    future_weather = batch[2].cpu().numpy()  # (B, 24, 8)
    full_seq = np.concatenate([history_weather, future_weather], axis=1)  # (B, 312, 8)
    normal_sequences.append(full_seq)

normal_sequences = np.concatenate(normal_sequences, axis=0)  # (N, 312, 8)
print(f"Normal data shape: {normal_sequences.shape}")

X_normal = normal_sequences.reshape(normal_sequences.shape[0], -1)  # (N, 2496)
print(f"Flattened normal data shape: {X_normal.shape}")

for i in range(len(uap_loaded_list)):
    uap_loaded = uap_loaded_list[i].cpu().numpy()  # (24, 8)

    attacked_sequences = normal_sequences.copy()
    attacked_sequences[:, 288:, :] += uap_loaded  
    attacked_sequences = np.clip(attacked_sequences, 0.0, 1.0)  

    X_attacked = attacked_sequences.reshape(attacked_sequences.shape[0], -1)  # (N, 2496)

    X_train = X_normal  

    X_test = np.concatenate([X_normal, X_attacked], axis=0)  # (2N, 2496)

    y_test = np.concatenate([
        np.ones(X_normal.shape[0]),            # normal: 1 
        np.zeros(X_attacked.shape[0])          # abnormal: 0 
    ], axis=0)

    print("Training Isolation Forest...")
    iso_forest = IsolationForest(
        contamination=0.5,       
        random_state=42,
        n_estimators=100,
        max_samples='auto'
    )
    iso_forest.fit(X_train)

    y_pred = iso_forest.predict(X_test) 

    y_pred = (y_pred == 1).astype(int)  # True if normal → 1, else 0

    acc = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, pos_label=1)  
    recall = recall_score(y_test, y_pred, pos_label=1)        
    f1 = f1_score(y_test, y_pred, pos_label=1)

    print("\n" + "="*50)
    print("Isolation Forest:")
    print("="*50)
    print(f"Test samples: {len(X_test)} (half normal, half attacked)")
    print(f"Accuracy:  {acc:.4f}")
    print(f"Precision: {precision:.4f} ")
    print(f"Recall:    {recall:.4f} ")
    print(f"F1-Score:  {f1:.4f}")


## VAR

In [None]:
from scipy.stats import chi2
import warnings
warnings.filterwarnings("ignore")

def collect_training_weather_sequences(loader):
    sequences = []
    with torch.no_grad():
        for _, _, weather_future, _ in loader:
            batch_np = weather_future.cpu().numpy()  # (B, 24, 8)
            sequences.append(batch_np)
    sequences = np.concatenate(sequences, axis=0)  # (N, 24, 8)
    return sequences

def fit_global_var_model_on_subsequences(sequences, max_lag=2):

    p = max_lag
    all_subseq = []
    for seq in sequences:
        for t in range(p, 24):
            window = seq[t - p : t + 1]  # (p+1, 8)
            all_subseq.append(window)
    
    all_subseq = np.array(all_subseq)  # (M, p+1, 8)
    M, _, k = all_subseq.shape
    
    Y_target = all_subseq[:, -1, :]  # (M, 8)
    Z = all_subseq[:, :p, :]         # (M, p, 8)
    Z_flat = Z.reshape(M, -1)        # (M, p*8)

    X = np.column_stack([Z_flat, np.ones(M)])  
    try:
        beta, res, rank, s = np.linalg.lstsq(X, Y_target, rcond=1e-10)
    except np.linalg.LinAlgError as e:
        raise RuntimeError(f"Failed to fit VAR model: {e}")

    B = beta[:-1].reshape(p, k, k).transpose(1, 0, 2)  # (k, p, k)
    intercept = beta[-1]

    pred = Z_flat @ beta[:-1] + beta[-1]
    residuals = Y_target - pred
    sigma_u = np.cov(residuals, rowvar=False)
    if sigma_u.shape == ():
        sigma_u = np.eye(k) * 1e-6

    return {
        'B': B,
        'intercept': intercept,
        'sigma_u': sigma_u,
        'lag': p,
        'k': k
    }

def compute_var_score_for_sequence(sequence, var_params):
    p = var_params['lag']
    k = var_params['k']
    if len(sequence) < p + 1:
        raise ValueError("Sequence too short")

    inv_sigma = np.linalg.pinv(var_params['sigma_u'])
    scores = []

    for t in range(p, len(sequence)):
        Z_t = sequence[t - p : t]  # (p, k)
        Z_flat = Z_t.flatten()
        pred = Z_flat @ var_params['B'].reshape(p * k, k) + var_params['intercept']
        residual = sequence[t] - pred
        maha_sq = residual @ inv_sigma @ residual.T
        scores.append(maha_sq)

    return np.mean(scores)  # Return the square of the average Mahalanobis distance

def evaluate_detection_performance(var_params, test_loader, uap_tensor, alpha=0.01):

    print("Evaluating detection performance...")

    NORMAL_LABEL = 1  
    ANOMALY_LABEL = 0 

    # Step 1: Estimate threshold on normal test data (99% confidence level)
    normal_scores = []
    normal_labels = []  

    with torch.no_grad():
        for _, _, weather_future, _ in test_loader:
            weather_batch = weather_future.cpu().numpy()  # (B, 24, 8)
            for seq in weather_batch:
                score = compute_var_score_for_sequence(seq, var_params)
                normal_scores.append(score)
                normal_labels.append(NORMAL_LABEL) 

    normal_scores = np.array(normal_scores)
    mean_score = normal_scores.mean()
    std_score = normal_scores.std()
    threshold = mean_score + 2.576 * std_score  
    print(f"Threshold (99%): {threshold:.4f}")
    print(f"Normal scores: mean={mean_score:.4f}, std={std_score:.4f}")

    # Step 2: Collect attack sample scores
    attacked_scores = []
    attacked_labels = []  

    uap = uap_tensor.cpu().numpy()

    with torch.no_grad():
        for _, _, weather_future, _ in test_loader:
            weather_batch = weather_future.cpu().numpy()
            attacked_batch = np.clip(weather_batch + uap, 0, 1)  

            for seq in attacked_batch:
                score = compute_var_score_for_sequence(seq, var_params)
                attacked_scores.append(score)
                attacked_labels.append(ANOMALY_LABEL)  

    # Step 3: Merge normal and attack samples (randomly shuffle, maintain balance)
    all_scores = np.array(normal_scores.tolist() + attacked_scores)
    all_labels = np.array(normal_labels + attacked_labels)

    pred_labels = (all_scores <= threshold).astype(int)  

    # Step 4: calculated metrics
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

    acc = accuracy_score(all_labels, pred_labels)
    prec = precision_score(all_labels, pred_labels, pos_label=NORMAL_LABEL)
    rec = recall_score(all_labels, pred_labels, pos_label=NORMAL_LABEL)
    f1 = f1_score(all_labels, pred_labels, pos_label=NORMAL_LABEL)

    print(f"Detection Performance:")
    print(f"  Accuracy:  {acc:.4f}")
    print(f"  Precision: {prec:.4f}")
    print(f"  Recall:    {rec:.4f}")
    print(f"  F1-Score:  {f1:.4f}")
    print(f"  Threshold: {threshold:.4f}")
    print(f"  Total Samples: {len(all_labels)} (normal: {len(normal_labels)}, attacked: {len(attacked_labels)})")

    return {
        'accuracy': acc,
        'precision': prec,
        'recall': rec,
        'f1_score': f1,
        'threshold': threshold,
        'all_labels': all_labels,
        'pred_labels': pred_labels,
        'all_scores': all_scores
    }

In [None]:
print("Collecting training sequences...")
train_sequences = collect_training_weather_sequences(train_loader)

print("Fitting VAR model on subsequences...")
var_params = fit_global_var_model_on_subsequences(train_sequences, max_lag=2)

evaluation_uaps = {
    'RUP (0.1)': rup_ensemble_010,
    'Simple Average (0.1)': rupw_010,
    'RUP (0.05)': rup_ensemble_005,
    'Simple Average (0.05)': rupw_005,
    'RUP (0.03)': rup_ensemble_003,
    'Simple Average (0.03)': rupw_003,
}

results = {}
for name, uap_tensor in evaluation_uaps.items():
    print(f"\n{'='*50}")
    print(f"Evaluating: {name}")
    print(f"{'='*50}")
    result = evaluate_detection_performance(var_params, test_loader, uap_tensor, alpha=0.01)
    results[name] = result

print("\n\n" + "="*60)
print("SUMMARY: Detection Performance Comparison")
print("="*60)
print(f"{'Method':<20} {'Acc':<8} {'Prec':<8} {'Recall':<8} {'F1':<8}")
print("-" * 60)
for name, res in results.items():
    print(f"{name:<20} {res['accuracy']:<8.4f} {res['precision']:<8.4f} {res['recall']:<8.4f} {res['f1_score']:<8.4f}")

## ODIN

In [None]:
from sklearn.metrics import roc_auc_score, accuracy_score, precision_recall_fscore_support
print("Loading FUTURE_weather for Autoencoder training...")
sequences = []

for batch in train_loader:
    future_weather = batch[2].cpu().numpy()  
    sequences.append(future_weather)

sequences = np.concatenate(sequences, axis=0)  # (N, 24, 8)
print(f"Total future_weather sequences for training Autoencoder: {sequences.shape}")

X_train_tensor = torch.tensor(sequences, dtype=torch.float32)
dataset_ae = TensorDataset(X_train_tensor, X_train_tensor)
dataloader_ae = DataLoader(dataset_ae, batch_size=64, shuffle=True)

In [None]:
class LSTMEncoder(nn.Module):
    def __init__(self, input_size=8, hidden_size=64, num_layers=2):
        super(LSTMEncoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=0.3)

    def forward(self, x):
        outputs, (h, c) = self.lstm(x)
        return outputs, (h, c)


class LSTMDecoder(nn.Module):
    def __init__(self, hidden_size=64, output_size=8, num_layers=2):
        super(LSTMDecoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(output_size, hidden_size, num_layers, batch_first=True, dropout=0.3)
        self.output_layer = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden):
        lstm_out, _ = self.lstm(x, hidden)
        output = self.output_layer(lstm_out)
        return output


class LSTMAutoencoder(nn.Module):
    def __init__(self, input_size=8, hidden_size=64, num_layers=2):
        super(LSTMAutoencoder, self).__init__()
        self.encoder = LSTMEncoder(input_size, hidden_size, num_layers)
        self.decoder = LSTMDecoder(hidden_size, input_size, num_layers)

    def forward(self, x):
        _, (h, c) = self.encoder(x)
        recon = self.decoder(x, (h, c))  
        return recon

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = LSTMAutoencoder(input_size=8, hidden_size=64, num_layers=2).to(device)

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

num_epochs = 100
print("Training Autoencoder on train_loader data...")

model.train()
for epoch in range(num_epochs):
    total_loss = 0.0
    for data, _ in dataloader_ae:
        data = data.to(device)
        optimizer.zero_grad()
        recon = model(data)
        loss = criterion(recon, data)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        total_loss += loss.item()
    
    if (epoch + 1) % 10 == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/len(dataloader_ae):.6f}")

print("Autoencoder training completed!")
torch.save(model.state_dict(), "autoencoder_weather_pretrained_caiso.pth")
print("Saved pre-trained autoencoder: autoencoder_weather_pretrained_caiso.pth")

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = LSTMAutoencoder(input_size=8, hidden_size=64, num_layers=2).to(device)
model.load_state_dict(torch.load("autoencoder_weather_pretrained_caiso.pth"))
model.eval()  
print("Loaded pre-trained autoencoder for ODIN.")

In [None]:
batch_size = 64
epsilon = 0.01
alpha = 0.01
device = 'cuda' if torch.cuda.is_available() else 'cpu'

def odin_score_ae(x, model, T=1000, epsilon=0.01, device='cuda'):
    model.train()
    x = x.to(device).requires_grad_()
    recon = model(x)
    loss = F.mse_loss(recon, x, reduction='sum')  
    grad = torch.autograd.grad(loss, x, create_graph=False)[0]
    x_perturbed = x - epsilon * grad.sign()
    recon_pert = model(x_perturbed)
    perturbed_loss = F.mse_loss(recon_pert, x_perturbed, reduction='none').mean(dim=[1,2])
    odin_scores = -perturbed_loss.cpu().detach().numpy()
    return odin_scores

normal_data_list = [sample[2] for sample in test_dataset]
normal_data = torch.stack(normal_data_list).float().to(device)

all_normal_scores = []
for i in range(0, len(normal_data), batch_size):
    batch = normal_data[i:i+batch_size]
    scores = odin_score_ae(batch, model, epsilon=epsilon, device=device)
    all_normal_scores.extend(scores)
normal_scores = np.array(all_normal_scores)

threshold = np.percentile(normal_scores, alpha * 100)  
print(f"Threshold: {threshold:.6f} (keep 99% normal samples)")

for uap_idx, uap_noise in enumerate(uap_loaded_list):
    uap_noise = uap_noise.to(device)
    attacked_data = torch.clamp(normal_data + uap_noise, 0, 1)

    all_attacked_scores = []
    for i in range(0, len(attacked_data), batch_size):
        batch = attacked_data[i:i+batch_size]
        scores = odin_score_ae(batch, model, epsilon=epsilon, device=device)
        all_attacked_scores.extend(scores)
    attacked_scores = np.array(all_attacked_scores)

    scores_all = np.concatenate([normal_scores, attacked_scores])
    labels_all = np.concatenate([
        np.ones(len(normal_scores)),      
        np.zeros(len(attacked_scores))    
    ])

    # AUC: The higher the score, the more normal it is → Normal is a positive example → Labels=1 → Directly use scores
    auc = roc_auc_score(labels_all, scores_all)

    pred_labels = (scores_all >= threshold).astype(int)  

    acc = accuracy_score(labels_all, pred_labels)
    prec = precision_score(labels_all, pred_labels, pos_label=1)
    rec = recall_score(labels_all, pred_labels, pos_label=1)  
    f1 = f1_score(labels_all, pred_labels, pos_label=1)

    fpr = 1 - (pred_labels[:len(normal_scores)] == 1).mean()
    tnr = (pred_labels[len(normal_scores):] == 0).mean()  

    print(f"[UAP {uap_idx}] AUC: {auc:.4f}, Acc: {acc:.4f}, F1: {f1:.4f}")
    print(f"             Normal:  {normal_scores.mean():.6f} ± {normal_scores.std():.6f}")
    print(f"             Attacked: {attacked_scores.mean():.6f} ± {attacked_scores.std():.6f}")
    print(f"             Threshold: {threshold:.6f} (α={alpha:.3f})")
    print(f"             Precision: {prec:.6f}, Recall: {rec:.6f}")
    #print(f"             FPR: {fpr:.6f}, Specificity: {tnr:.6f}")

model.eval()
print("\n✅ Model set back to eval mode.")

## 3-sigma and boxplot

In [None]:
data = pd.read_csv(name).iloc[::5, 4:12]
max_value = data.max()
min_value = data.min()
q1 = data.quantile(0.25)
q3 = data.quantile(0.75)
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
lower_bound_array = lower_bound.values
upper_bound_array = upper_bound.values
weather_stds = dataset.get_original_stds()
weather_stds_array = np.array(list(weather_stds['original_weather_std'].values()))  
weather_data = pd.read_csv(name)
weather_data = weather_data.iloc[::5, :].reset_index(drop=True)
weather_mean = weather_data.iloc[:, 4:12].mean()

In [None]:
def adversarial_attack_all(uap_loaded, weather_mean, lower_bound_array, upper_bound_array, weather_stds_array, weather_scaler, data_loader, device='cpu'):
    lower_bound = torch.tensor(lower_bound_array).to(device)
    upper_bound = torch.tensor(upper_bound_array).to(device)
    weather_mean = torch.tensor(weather_mean).to(device)
    weather_stds = torch.tensor(weather_stds_array).to(device)

    NORMAL_LABEL = 1
    ANOMALY_LABEL = 0

    y_true_xigema = []
    y_pred_xigema = []
    y_true_xiang = []
    y_pred_xiang = []

    for history_weather, wind_history, weather_future, future_wind_power in data_loader:
        B, T, C = weather_future.shape  # batch_size, seq_len, channels

        weather_future = weather_future.to(device)

        adversarial_weather = weather_future + uap_loaded
        adversarial_weather = torch.clamp(adversarial_weather, 0, 1)

        normal_inv = weather_scaler.inverse_transform(
            weather_future.reshape(-1, 8).detach().cpu().numpy()
        ).reshape(B, T, C)

        adv_inv = weather_scaler.inverse_transform(
            adversarial_weather.reshape(-1, 8).detach().cpu().numpy()
        ).reshape(B, T, C)

        normal_inv = np.array(normal_inv)
        adv_inv = np.array(adv_inv)

        for i in range(B):
            normal_sample = normal_inv[i]  
            adv_sample = adv_inv[i]       

            mean_np = weather_mean.cpu().numpy()
            stds_np = weather_stds.cpu().numpy()
            lower_3sigma = mean_np - 3 * stds_np
            upper_3sigma = mean_np + 3 * stds_np

            if np.any((normal_sample < lower_3sigma) | (normal_sample > upper_3sigma)):
                pred_normal_xigema = ANOMALY_LABEL  
            else:
                pred_normal_xigema = NORMAL_LABEL  

            if np.any((adv_sample < lower_3sigma) | (adv_sample > upper_3sigma)):
                pred_adv_xigema = ANOMALY_LABEL 
            else:
                pred_adv_xigema = NORMAL_LABEL  

            y_true_xigema.extend([NORMAL_LABEL, ANOMALY_LABEL])  
            y_pred_xigema.extend([pred_normal_xigema, pred_adv_xigema])

            if np.any((normal_sample < lower_bound_array) | (normal_sample > upper_bound_array)):
                pred_normal_xiang = ANOMALY_LABEL 
            else:
                pred_normal_xiang = NORMAL_LABEL  

            if np.any((adv_sample < lower_bound_array) | (adv_sample > upper_bound_array)):
                pred_adv_xiang = ANOMALY_LABEL  
            else:
                pred_adv_xiang = NORMAL_LABEL   

            y_true_xiang.extend([NORMAL_LABEL, ANOMALY_LABEL])  
            y_pred_xiang.extend([pred_normal_xiang, pred_adv_xiang])

    y_true_xigema = np.array(y_true_xigema)
    y_pred_xigema = np.array(y_pred_xigema)
    y_true_xiang = np.array(y_true_xiang)
    y_pred_xiang = np.array(y_pred_xiang)

    print("\n" + "="*50)
    print("3 sigma:")
    print("="*50)
    acc_xigema = accuracy_score(y_true_xigema, y_pred_xigema)
    prec_xigema = precision_score(y_true_xigema, y_pred_xigema)  # P = TP / (TP + FP)
    rec_xigema = recall_score(y_true_xigema, y_pred_xigema)      # R = TP / (TP + FN)
    f1_xigema = f1_score(y_true_xigema, y_pred_xigema)

    print(f"Accuracy:  {acc_xigema:.4f}")
    print(f"Precision: {prec_xigema:.4f}")
    print(f"Recall:    {rec_xigema:.4f}")
    print(f"F1 Score:  {f1_xigema:.4f}")

    print("\n" + "="*50)
    print("Boxplot:")
    print("="*50)
    acc_xiang = accuracy_score(y_true_xiang, y_pred_xiang)
    prec_xiang = precision_score(y_true_xiang, y_pred_xiang)
    rec_xiang = recall_score(y_true_xiang, y_pred_xiang)
    f1_xiang = f1_score(y_true_xiang, y_pred_xiang)

    print(f"Accuracy:  {acc_xiang:.4f}")
    print(f"Precision: {prec_xiang:.4f}")
    print(f"Recall:    {rec_xiang:.4f}")
    print(f"F1 Score:  {f1_xiang:.4f}")

    return {
        '3sigma': {
            'acc': acc_xigema,
            'prec': prec_xigema,
            'rec': rec_xigema,
            'f1': f1_xigema
        },
        'boxplot': {
            'acc': acc_xiang,
            'prec': prec_xiang,
            'rec': rec_xiang,
            'f1': f1_xiang
        }
    }

In [None]:
for uap_loaded in uap_loaded_list:
    adversarial_attack_all(uap_loaded,weather_mean,lower_bound_array,upper_bound_array,weather_stds_array, weather_scaler, test_loader, device=device)