# Model-based attacks vs. FGSM / iFGSM

In [1]:
SEED = 123
import random, os
import numpy as np
import torch

# Seed every random number generator the notebook uses with the same SEED.
random.seed(SEED)
# NOTE: the running interpreter fixed its hash seed at start-up; this
# assignment is only seen by processes launched after this point.
os.environ["PYTHONHASHSEED"] = str(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)

# Ask cuDNN for deterministic kernels and switch off its auto-tuner.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

### EarlyStopping utility

In [2]:
class EarlyStopping:
    """Signal when a monitored metric has stopped improving.

    Args:
        patience: number of consecutive non-improving ``step`` calls after
            which ``should_stop`` becomes True.
        min_delta: margin by which the metric must beat the best value seen
            so far to count as an improvement.
        mode: ``'min'`` if lower metric values are better, ``'max'`` if higher
            values are better.

    Raises:
        ValueError: if ``mode`` is neither ``'min'`` nor ``'max'``.
    """

    def __init__(self, patience=5, min_delta=1e-4, mode='min'):
        # Raise explicitly instead of asserting: assertions are stripped when
        # Python runs with -O, which would let an invalid mode through.
        if mode not in ('min', 'max'):
            raise ValueError(f"mode must be 'min' or 'max', got {mode!r}")
        self.patience = patience
        self.min_delta = min_delta
        self.mode = mode
        self.best = None        # best metric value seen so far
        self.counter = 0        # consecutive steps without improvement
        self.should_stop = False

    def step(self, metric):
        """Record one metric value and return True once training should stop.

        The first call only stores ``metric`` as the baseline. ``should_stop``
        is never reset, so once it is True every later call returns True.
        """
        if self.best is None:
            self.best = metric
            self.counter = 0
            return False
        if self.mode == 'min':
            improvement = metric < self.best - self.min_delta
        else:
            improvement = metric > self.best + self.min_delta
        if improvement:
            self.best = metric
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.should_stop = True
        return self.should_stop


### Preprocessing

In [3]:
import pandas as pd
import numpy as np
from pathlib import Path
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler, LabelEncoder

In [4]:
# Tab-separated train / test files, read by load_powercon.
TRAIN_PATH = Path('PowerCons_TRAIN.tsv')
TEST_PATH = Path('PowerCons_TEST.tsv')
# Hyper-parameters for the victim classifier.
BATCH_SIZE = 64
EPOCHS = 50  # upper bound; the training loop may stop earlier
LR = 1e-3
# Use the GPU when one is visible to PyTorch, otherwise the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [5]:
def load_powercon(path: Path):
    """Read a header-less TSV file and return model-ready arrays.

    The first column is taken as the label and every remaining column as a
    float32 feature. Features are passed through a freshly fitted
    ``StandardScaler`` and reshaped to ``(rows, columns, 1)``; labels are
    mapped to int64 codes with a freshly fitted ``LabelEncoder``.

    NOTE(review): the scaler and the encoder are fitted inside every call, so
    loading the train and test files separately gives each file its own
    statistics and its own label mapping - confirm this is intended.

    Returns:
        ``(X, y_encoded, classes)`` where ``classes`` is the encoder's
        ``classes_`` array.
    """
    frame = pd.read_csv(path, sep='\t', header=None)
    raw_labels = frame.iloc[:, 0].values
    features = frame.iloc[:, 1:].values.astype(np.float32)

    features = StandardScaler().fit_transform(features)
    n_rows, n_cols = features.shape
    features = features.reshape(n_rows, n_cols, 1)

    encoder = LabelEncoder()
    codes = encoder.fit_transform(raw_labels).astype(np.int64)
    return features, codes, encoder.classes_

In [6]:
# Load both splits; each call to load_powercon fits its own scaler/encoder.
X_train, y_train, classes_ = load_powercon(TRAIN_PATH)
X_test, y_test, _ = load_powercon(TEST_PATH)
# The number of classes is taken from the training labels only.
n_classes = len(classes_)
print('Train shape:', X_train.shape, 'Test shape:', X_test.shape, 'n_classes:', n_classes)

Train shape: (180, 144, 1) Test shape: (180, 144, 1) n_classes: 2


In [7]:
class PowerConDataset(Dataset):
    """In-memory dataset that pairs each sample of ``X`` with its label in ``y``."""

    def __init__(self, X, y):
        # Convert once up front so item access only indexes existing tensors.
        self.y = torch.tensor(y, dtype=torch.long)
        self.X = torch.tensor(X, dtype=torch.float32)

    def __len__(self):
        """Number of samples (length of the first axis of ``X``)."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(sample, label)`` pair stored at ``idx``."""
        sample = self.X[idx]
        label = self.y[idx]
        return sample, label

# Wrap the arrays as datasets and build the batch loaders.
train_ds = PowerConDataset(X_train, y_train)
test_ds = PowerConDataset(X_test, y_test)
# NOTE: train_dl is rebound in the next cell to a seeded, multi-worker loader.
train_dl = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
# No shuffle argument is given, so the test loader keeps the dataset order.
test_dl = DataLoader(test_ds, batch_size=BATCH_SIZE)

In [8]:
def seed_worker(worker_id):
    """Seed NumPy and ``random`` inside a DataLoader worker process.

    Intended to be passed as ``worker_init_fn``. The seed is derived from
    ``torch.initial_seed()``, which PyTorch sets per worker from the loader's
    base seed. This follows the recipe in the PyTorch reproducibility notes:
    unlike a fixed ``SEED + worker_id`` it gives a different (but, with a
    seeded ``generator``, still reproducible) stream every time the workers
    are restarted, e.g. at each epoch.

    Args:
        worker_id: index of the worker; unused because the torch seed already
            differs per worker.
    """
    # NumPy only accepts 32-bit seeds, hence the modulo.
    worker_seed = torch.initial_seed() % 2**32
    np.random.seed(worker_seed)
    random.seed(worker_seed)

# Dedicated generator passed to the loader so its randomness starts from SEED.
g = torch.Generator()
g.manual_seed(SEED)
# Replaces the train_dl built in the previous cell with a seeded, 4-worker loader.
train_dl = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, num_workers=4, worker_init_fn=seed_worker, generator=g)



### LSTMClassifier

In [9]:
class LSTMClassifier(nn.Module):
    """Stacked LSTM followed by a linear layer that outputs class logits.

    Args:
        n_classes: number of output logits.
        input_size: feature count per time step.
        hidden_size: LSTM hidden width.
        num_layers: number of stacked LSTM layers.
        dropout: dropout rate handed to ``nn.LSTM``.
    """

    def __init__(self, n_classes, input_size=1, hidden_size=64, num_layers=2, dropout=0.2):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=n_classes)

    def forward(self, x):
        """Classify from the final hidden state of the last LSTM layer."""
        _, (hidden, _) = self.lstm(x)
        last_layer_state = hidden[-1]
        return self.fc(last_layer_state)

# Victim classifier, its training loss and its optimizer.
model = LSTMClassifier(n_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LR)

In [10]:
def accuracy(logits, target):
    """Return, as a Python float, the share of rows whose arg-max equals ``target``."""
    predicted = logits.argmax(1)
    hits = (predicted == target).float()
    return hits.mean().item()

In [11]:
# Train the victim classifier, stopping once the monitored loss has not
# improved for 4 consecutive epochs.
stopper_cls = EarlyStopping(patience=4, mode="min")
for epoch in range(1, EPOCHS + 1):
    # ---- one optimisation pass over the training loader ----
    model.train()
    total_loss, correct = 0., 0
    for xb, yb in train_dl:
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad()
        out = model(xb)
        loss = criterion(out, yb)
        loss.backward()
        optimizer.step()
        # criterion averages over the batch; scale back to a per-sample sum.
        # NOTE(review): total_loss is accumulated but never reported.
        total_loss += loss.item() * xb.size(0)
        correct += (out.argmax(1) == yb).sum().item()
    train_acc = correct / len(train_ds)

    # ---- evaluation pass, no gradients ----
    # NOTE(review): the "val" metrics are computed on test_dl, so the test
    # split also decides when training stops.
    model.eval()
    val_loss, val_correct = 0., 0
    with torch.no_grad():
        for xb, yb in test_dl:
            xb, yb = xb.to(device), yb.to(device)
            out = model(xb)
            val_loss += criterion(out, yb).item() * xb.size(0)
            val_correct += (out.argmax(1) == yb).sum().item()
    val_loss /= len(test_ds)
    val_acc = val_correct / len(test_ds)
    print(f'Epoch {epoch:2d} | train_acc {train_acc:.3f} | val_loss {val_loss:.4f} | val_acc {val_acc:.3f}')

    # The weights from the last executed epoch are kept; nothing is restored.
    if stopper_cls.step(val_loss):
        print(f'⏹ Early stopping classifier at epoch {epoch}')
        break




Epoch  1 | train_acc 0.500 | val_loss 0.6915 | val_acc 0.511
Epoch  2 | train_acc 0.544 | val_loss 0.6878 | val_acc 0.556
Epoch  3 | train_acc 0.583 | val_loss 0.6838 | val_acc 0.583
Epoch  4 | train_acc 0.650 | val_loss 0.6786 | val_acc 0.594
Epoch  5 | train_acc 0.628 | val_loss 0.6711 | val_acc 0.589
Epoch  6 | train_acc 0.633 | val_loss 0.6589 | val_acc 0.622
Epoch  7 | train_acc 0.672 | val_loss 0.6363 | val_acc 0.661
Epoch  8 | train_acc 0.711 | val_loss 0.5881 | val_acc 0.722
Epoch  9 | train_acc 0.828 | val_loss 0.5021 | val_acc 0.806
Epoch 10 | train_acc 0.850 | val_loss 0.3896 | val_acc 0.872
Epoch 11 | train_acc 0.889 | val_loss 0.3041 | val_acc 0.883
Epoch 12 | train_acc 0.856 | val_loss 0.2788 | val_acc 0.906
Epoch 13 | train_acc 0.878 | val_loss 0.2490 | val_acc 0.911
Epoch 14 | train_acc 0.894 | val_loss 0.2379 | val_acc 0.906
Epoch 15 | train_acc 0.894 | val_loss 0.2472 | val_acc 0.911
Epoch 16 | train_acc 0.883 | val_loss 0.2546 | val_acc 0.906
Epoch 17 | train_acc 0.9

### Surrogate adversarial model (attack_LSTM)

In [12]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Activation(nn.Module):
    """Wrap one of three element-wise activations, selected by name.

    Args:
        kind: ``'identity'``, ``'relu'`` or ``'tanh'``.

    Raises:
        ValueError: for any other ``kind``.
    """

    def __init__(self, kind='identity'):
        super().__init__()
        choices = (
            ('identity', nn.Identity),
            ('relu', nn.ReLU),
            ('tanh', nn.Tanh),
        )
        # Only the selected module is instantiated.
        for label, factory in choices:
            if kind == label:
                self.act = factory()
                break
        else:
            raise ValueError(f'Unknown activation {kind}')

    def forward(self, x):
        """Apply the selected activation to ``x``."""
        return self.act(x)

class attack_LSTM(nn.Module):
    """Sequence-to-sequence surrogate: LSTM stack, activation, LSTM stack, linear.

    The output has one ``x_dim``-sized vector per input time step; callers in
    this notebook squash it with ``tanh`` and scale it into a perturbation.

    Args:
        hidden_dim: hidden width of both LSTM stacks.
        x_dim: feature count per time step of the input and the output.
        activation_type: name passed to ``Activation`` for the layer between
            the two stacks.
    """

    def __init__(self, hidden_dim=64, x_dim=1, activation_type='identity'):
        super().__init__()
        # Both stacks share depth, layout and dropout.
        stack_cfg = dict(num_layers=3, batch_first=True, dropout=0.4)
        self.rnn_inp = nn.LSTM(x_dim, hidden_dim, **stack_cfg)
        self.act = Activation(activation_type)
        self.rnn_out = nn.LSTM(hidden_dim, hidden_dim, **stack_cfg)
        self.fc = nn.Linear(hidden_dim, x_dim)

    def forward(self, data):
        """Map ``data`` to a per-time-step output of width ``x_dim``."""
        first_pass, _ = self.rnn_inp(data)
        second_pass, _ = self.rnn_out(self.act(first_pass))
        return self.fc(second_pass)


In [13]:
def accuracy_from_logits(logits, target):
    """Mean arg-max accuracy of ``logits`` against ``target`` as a Python float.

    Performs the same computation as ``accuracy`` defined earlier in the notebook.
    """
    is_correct = logits.argmax(1).eq(target)
    return is_correct.float().mean().item()

In [14]:
EPS = 0.23


def _victim_logits(victim, x_adv):
    """Run ``victim`` on ``x_adv`` so the result can be back-propagated.

    cuDNN refuses to back-propagate through an RNN that ran in eval mode, so
    for that one case the forward pass uses PyTorch's native RNN kernels.
    """
    has_rnn = any(isinstance(m, torch.nn.RNNBase) for m in victim.modules())
    if x_adv.is_cuda and has_rnn and not victim.training:
        with torch.backends.cudnn.flags(enabled=False):
            return victim(x_adv)
    return victim(x_adv)


def train_surrogate(surr, victim, loader, eps=EPS, epochs=50, lr=1e-4, alpha_l2=1e-3,
                    device='cpu', patience=4, save_path='surrogate_maxloss.pth',
                    clamp=(-1, 1)):
    """Train ``surr`` to emit perturbations that maximise the victim's loss.

    For every batch the surrogate output is squashed to ``eps * tanh(.)``,
    added to the input, optionally clipped to ``clamp`` and fed to the frozen
    victim. The surrogate minimises ``-(cross_entropy - alpha_l2 * mean(delta**2))``.
    Training stops early when the epoch-average victim loss on ``loader`` has
    not increased for ``patience`` epochs.

    Args:
        surr: perturbation generator; its output must have the shape of the input.
        victim: classifier under attack; moved to ``device`` and put in eval mode.
        loader: iterable of ``(x, y)`` batches.
        eps: bound on the absolute value of each perturbation element.
        epochs: maximum number of epochs.
        lr: Adam learning rate for the surrogate.
        alpha_l2: weight of the mean-squared-perturbation penalty.
        device: device both models and the batches are moved to.
        patience: early-stopping patience in epochs.
        save_path: where the surrogate's ``state_dict`` is written after
            training; ``None`` skips saving. The default keeps the original
            file name, so two runs that do not override it overwrite each other.
        clamp: ``(low, high)`` range applied to the perturbed input, or
            ``None`` to leave it unclipped.

    Raises:
        ValueError: if ``loader`` yields no samples.
    """
    surr.to(device)
    victim.to(device).eval()
    opt = torch.optim.Adam(surr.parameters(), lr)
    # mode="max": the attacker wants the victim loss to go up.
    stopper = EarlyStopping(patience=patience, mode="max")
    for ep in range(1, epochs + 1):
        surr.train()
        run_vloss, run_acc, n = 0., 0., 0
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            delta = eps * torch.tanh(surr(x))
            x_adv = x + delta
            if clamp is not None:
                x_adv = torch.clamp(x_adv, *clamp)
            logits = _victim_logits(victim, x_adv)
            vloss = F.cross_entropy(logits, y)
            acc = (logits.argmax(1) == y).float().mean().item()
            reg = alpha_l2 * (delta ** 2).mean()
            # Gradient ascent on the victim loss, descent on the penalty.
            loss = -(vloss - reg)
            opt.zero_grad()
            loss.backward()
            opt.step()
            run_vloss += vloss.item() * x.size(0)
            run_acc += acc * x.size(0)
            n += x.size(0)
        if n == 0:
            raise ValueError('loader yielded no samples')
        # Despite the name this is the average victim loss on `loader` itself.
        val_loss = run_vloss / n
        print(f'Epoch {ep:02d} | victim‑loss {val_loss:.4f} | acc {run_acc/n:.4f}')
        if stopper.step(val_loss):
            print(f'⏹ Early stopping at epoch {ep}')
            break
    # The weights of the last executed epoch are saved (no best-epoch restore).
    if save_path is not None:
        torch.save(surr.state_dict(), save_path)


In [15]:
# Build the LSTM surrogate with a tanh between its two LSTM stacks.
surrogate_LSTM = attack_LSTM(hidden_dim=64, x_dim=1, activation_type='tanh').to(device)
# Freeze the victim: only the surrogate's parameters are optimised below.
for p in model.parameters():
    p.requires_grad_(False)
model.eval()
train_surrogate(surrogate_LSTM, model, train_dl, eps=EPS, epochs=90, lr=1e-4, alpha_l2=1e-3, device=device, patience=7)


Epoch 01 | victim‑loss 0.1883 | acc 0.9333
Epoch 02 | victim‑loss 0.1885 | acc 0.9333
Epoch 03 | victim‑loss 0.1889 | acc 0.9333
Epoch 04 | victim‑loss 0.1891 | acc 0.9278
Epoch 05 | victim‑loss 0.1894 | acc 0.9278
Epoch 06 | victim‑loss 0.1896 | acc 0.9278
Epoch 07 | victim‑loss 0.1899 | acc 0.9278
Epoch 08 | victim‑loss 0.1903 | acc 0.9278
Epoch 09 | victim‑loss 0.1906 | acc 0.9278
Epoch 10 | victim‑loss 0.1908 | acc 0.9278
Epoch 11 | victim‑loss 0.1912 | acc 0.9278
Epoch 12 | victim‑loss 0.1916 | acc 0.9278
Epoch 13 | victim‑loss 0.1924 | acc 0.9333
Epoch 14 | victim‑loss 0.1925 | acc 0.9333
Epoch 15 | victim‑loss 0.1931 | acc 0.9333
Epoch 16 | victim‑loss 0.1939 | acc 0.9333
Epoch 17 | victim‑loss 0.1945 | acc 0.9333
Epoch 18 | victim‑loss 0.1949 | acc 0.9333
Epoch 19 | victim‑loss 0.1960 | acc 0.9333
Epoch 20 | victim‑loss 0.1972 | acc 0.9333
Epoch 21 | victim‑loss 0.1985 | acc 0.9333
Epoch 22 | victim‑loss 0.2003 | acc 0.9333
Epoch 23 | victim‑loss 0.2018 | acc 0.9222
Epoch 24 | 

### Fooling rate

In [16]:
import torch
import torch.nn.functional as F

def preds_to_labels(logits: torch.Tensor) -> torch.Tensor:
    """Return the arg-max class index of every row of ``logits``."""
    return logits.argmax(dim=1)


def fooling_rate(model, loader, attack, device='cpu'):
    """Fraction of samples whose predicted label changes under ``attack``.

    The clean prediction, not the ground-truth label, is the reference, so a
    sample that was already misclassified still counts as fooled only if its
    prediction changes.

    Args:
        model: classifier; switched to eval mode.
        loader: iterable of ``(x, y)`` batches.
        attack: callable ``attack(model, x, y) -> x_adv``.
        device: device the batches are moved to.

    Returns:
        ``fooled / total`` as a float, or ``0.0`` when ``loader`` is empty.
    """
    model.eval()
    fooled, total = 0, 0
    for x, y in loader:
        x, y = x.to(device), y.to(device)
        with torch.no_grad():
            preds_orig = preds_to_labels(model(x))
        # Run the attack outside no_grad: gradient-based attacks need autograd.
        x_adv = attack(model, x, y)
        with torch.no_grad():
            preds_adv = preds_to_labels(model(x_adv))
        fooled += (preds_adv != preds_orig).sum().item()
        total += x.size(0)
    if total == 0:
        return 0.0
    return fooled / total

### FGSM / iFGSM

In [17]:
class Attack:
    """Common state and helpers for the attacks below.

    Args:
        eps: bound on the absolute change applied to each input element
            (before range clipping).
        clamp: ``(low, high)`` range every adversarial input is clipped to,
            or ``None`` to skip range clipping.

    NOTE(review): this notebook standardises its inputs with StandardScaler,
    so clean values can lie outside the default ``(-1, 1)`` range; clipping
    then moves such values by more than ``eps``. Confirm the default range
    is intended for this data.
    """

    def __init__(self, eps: float, clamp=(-1, 1)):
        self.eps = eps
        self.clamp = clamp

    def _clamp_to_range(self, x):
        """Clip ``x`` to ``self.clamp``; return it unchanged when clamp is None."""
        if self.clamp is None:
            return x
        return torch.clamp(x, *self.clamp)

    @staticmethod
    def _input_grad(model, x, y):
        """Gradient of the cross-entropy loss with respect to a copy of ``x``.

        Uses ``torch.autograd.grad`` so that no ``.grad`` is written to (or
        cleared on) the model's parameters.
        """
        x_req = x.clone().detach().requires_grad_(True)
        # cuDNN cannot back-propagate through an RNN that ran in eval mode;
        # fall back to the native kernels for exactly that situation.
        rnn_eval_on_gpu = (
            x_req.is_cuda
            and isinstance(model, torch.nn.Module)
            and not model.training
            and any(isinstance(m, torch.nn.RNNBase) for m in model.modules())
        )
        if rnn_eval_on_gpu:
            with torch.backends.cudnn.flags(enabled=False):
                loss = F.cross_entropy(model(x_req), y)
        else:
            loss = F.cross_entropy(model(x_req), y)
        (grad,) = torch.autograd.grad(loss, x_req)
        return grad


class FGSMAttack(Attack):
    """Single-step attack: move every element by ``eps`` along the gradient sign."""

    def __call__(self, model, x, y):
        """Return the detached adversarial version of ``x`` for labels ``y``."""
        grad = self._input_grad(model, x, y)
        x_adv = self._clamp_to_range(x + self.eps * grad.sign())
        return x_adv.detach()


class iFGSMAttack(Attack):
    """Iterated gradient-sign attack with projection back into the eps-ball.

    Args:
        eps: bound on the absolute perturbation per element.
        n_iter: number of gradient-sign steps.
        alpha: step size; defaults to ``1.25 * eps / n_iter``.
        clamp: see ``Attack``.
        rand_init: start from a uniform random point inside the eps-ball
            instead of from ``x`` itself.
    """

    def __init__(self, eps, n_iter=10, alpha=None, clamp=(-1, 1), rand_init=True):
        super().__init__(eps, clamp)
        self.n_iter = n_iter
        self.alpha = alpha if alpha is not None else 1.25 * eps / n_iter
        self.rand_init = rand_init

    def _clip(self, x_adv, x_orig):
        """Project ``x_adv`` onto the eps-ball around ``x_orig``, then range-clip."""
        delta = torch.clamp(x_adv - x_orig, min=-self.eps, max=self.eps)
        return self._clamp_to_range(x_orig + delta)

    def __call__(self, model, x, y):
        """Return the detached adversarial version of ``x``; puts ``model`` in eval mode."""
        model.eval()
        if self.rand_init:
            x_adv = x + torch.empty_like(x).uniform_(-self.eps, self.eps)
            x_adv = self._clip(x_adv, x).detach()
        else:
            x_adv = x.clone().detach()

        for _ in range(self.n_iter):
            grad = self._input_grad(model, x_adv, y)
            x_adv = self._clip(x_adv + self.alpha * grad.sign(), x).detach()
        return x_adv


class ModelBasedAttack(Attack):
    """Attack whose perturbation is produced by a trained surrogate network.

    Args:
        surrogate: module mapping an input batch to a same-shaped tensor.
        eps: scale applied to ``tanh`` of the surrogate output.
        clamp: see ``Attack``.
    """

    def __init__(self, surrogate, eps, clamp=(-1, 1)):
        super().__init__(eps, clamp)
        self.surr = surrogate.eval()

    @torch.no_grad()
    def __call__(self, model, x, y):
        """Return ``x`` plus the surrogate's perturbation; ``model`` and ``y`` are unused."""
        # The surrogate may have been switched back to train mode since
        # construction (e.g. by further training); keep dropout off here.
        self.surr.eval()
        delta = self.eps * torch.tanh(self.surr(x))
        return self._clamp_to_range(x + delta)

In [18]:
# Instantiate the attacks that are compared at the end of the notebook.
fgsm_attack = FGSMAttack(EPS)
# NOTE(review): iFGSM runs with eps=0.26 while the other attacks use EPS
# (0.23), so the perturbation budgets are not equal - confirm this is intended.
ifgsm_attack = iFGSMAttack(eps=0.26, alpha=0.1, n_iter=70)
model_attack_lstm = ModelBasedAttack(surrogate_LSTM, EPS)

### Surrogate adversarial model (attack_resCNN)

In [19]:
!pip install tsai
!pip install torch=2.5.1

ERROR: Invalid requirement: 'torch=2.5.1': Expected end or semicolon (after name and no valid version specifier)
    torch=2.5.1
         ^
Hint: = is not a valid operator. Did you mean == ?

In [20]:
from tsai.models.all import ResCNN
import inspect

class ResCNNModel(nn.Module):
    """tsai ``ResCNN`` followed by a configurable output activation.

    Args:
        x_dim: value passed to ``ResCNN`` as ``c_in``.
        output_dim: value passed to ``ResCNN`` as ``c_out``; the default is
            the module-level ``n_classes`` at class-definition time.
        activation_type: name passed to ``Activation`` for the final layer.
        rescnn_kwargs: optional extra keyword arguments for ``ResCNN``.
    """

    def __init__(self, x_dim=1, output_dim=n_classes,
                 activation_type='identity',
                 rescnn_kwargs=None):
        super().__init__()
        self.x_dim = x_dim
        extra_kwargs = {} if not rescnn_kwargs else rescnn_kwargs
        self.body = ResCNN(c_in=x_dim, c_out=output_dim, **extra_kwargs)
        self.fin = Activation(activation_type)

    def forward(self, x):
        """Run the backbone, swapping axes 1 and 2 of a 3-D input first if needed.

        The swap happens when axis 1 does not already have size ``x_dim``.
        NOTE(review): presumably this converts (batch, length, channels) to
        (batch, channels, length); the check cannot tell the layouts apart
        when the length equals ``x_dim`` - confirm against callers.
        """
        keep_layout = x.ndim != 3 or x.shape[1] == self.x_dim
        if not keep_layout:
            x = x.transpose(1, 2)
        return self.fin(self.body(x))

In [21]:
class AttackCNN(nn.Module):
    """Surrogate that runs a ResCNN on every time step independently.

    Args:
        hidden_dim: size of the per-step feature vector produced by the ResCNN.
        x_dim: feature count per time step of the input and of the output.
        activation_type: name passed to ``Activation``; applied to the
            per-step features before the final linear layer.
    """

    def __init__(self, hidden_dim=128, x_dim=1, activation_type='tanh'):
        super().__init__()
        self.step_cnn = ResCNNModel(x_dim=x_dim, output_dim=hidden_dim, activation_type='identity')
        self.fc = nn.Linear(hidden_dim, x_dim)
        self.act = Activation(activation_type)

    def forward(self, x):
        """Map a ``(B, L, C)`` batch to a ``(B, L, x_dim)`` output."""
        batch, steps, channels = x.shape
        # Fold time into the batch axis: each step becomes its own sample,
        # so the backbone sees no neighbouring time steps.
        per_step = x.contiguous().view(batch * steps, 1, channels)
        features = self.step_cnn(per_step).view(batch, steps, -1)
        return self.fc(self.act(features))

# CNN surrogate with its default sizes, placed on the training device.
surrogate_cnn = AttackCNN().to(device)

In [22]:
# Train the CNN surrogate against the same frozen victim.
# NOTE(review): patience is 4 here but 7 for the LSTM surrogate, and
# train_surrogate saves to a fixed file name, so this run overwrites the
# checkpoint written by the LSTM run.
train_surrogate(surrogate_cnn, model, train_dl,
                eps=EPS, epochs=90, lr=1e-4, alpha_l2=1e-3, device=device, patience=4)

model_attack_cnn = ModelBasedAttack(surrogate_cnn, eps=EPS)

Epoch 01 | victim‑loss 0.2058 | acc 0.9222
Epoch 02 | victim‑loss 0.2302 | acc 0.9000
Epoch 03 | victim‑loss 0.2433 | acc 0.8889
Epoch 04 | victim‑loss 0.2616 | acc 0.8889
Epoch 05 | victim‑loss 0.2771 | acc 0.8889
Epoch 06 | victim‑loss 0.2910 | acc 0.8889
Epoch 07 | victim‑loss 0.3022 | acc 0.8889
Epoch 08 | victim‑loss 0.3071 | acc 0.8833
Epoch 09 | victim‑loss 0.3137 | acc 0.8722
Epoch 10 | victim‑loss 0.3178 | acc 0.8778
Epoch 11 | victim‑loss 0.3244 | acc 0.8667
Epoch 12 | victim‑loss 0.3262 | acc 0.8667
Epoch 13 | victim‑loss 0.3295 | acc 0.8611
Epoch 14 | victim‑loss 0.3314 | acc 0.8611
Epoch 15 | victim‑loss 0.3345 | acc 0.8667
Epoch 16 | victim‑loss 0.3329 | acc 0.8667
Epoch 17 | victim‑loss 0.3369 | acc 0.8667
Epoch 18 | victim‑loss 0.3377 | acc 0.8667
Epoch 19 | victim‑loss 0.3401 | acc 0.8611
Epoch 20 | victim‑loss 0.3383 | acc 0.8611
Epoch 21 | victim‑loss 0.3413 | acc 0.8611
Epoch 22 | victim‑loss 0.3419 | acc 0.8611
Epoch 23 | victim‑loss 0.3414 | acc 0.8611
Epoch 24 | 

### Final compare

In [23]:
# Evaluate every attack on the test loader.
test_loader = test_dl

# Display name -> attack callable; dict order is the print order.
attacks = {'FGSM': fgsm_attack, 'iFGSM': ifgsm_attack, 'LSTM': model_attack_lstm, 'CNN': model_attack_cnn}
for name, atk in attacks.items():
    # Share of test samples whose predicted label changes under the attack.
    rate = fooling_rate(model, test_loader, atk, device=device)
    print(f'Fooling rate {name:<6}: {rate:.3f}')

Fooling rate FGSM  : 0.133
Fooling rate iFGSM : 0.156
Fooling rate LSTM  : 0.144
Fooling rate CNN   : 0.144
