# Import and Device

In [2]:
import numpy as np
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

# If fdi_models.py is in the same directory as the notebook:
from fdi_models import BiLSTMFDIDetector, count_parameters

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", DEVICE)

Using device: cuda


# Load Prepared Dataset

In [3]:
DATA_PATH = "prepared_data/smartgrid_fdi_seq10.npz"  

data = np.load(DATA_PATH, allow_pickle=True)
print("Keys:", data.files)

X_train = data["X_train"]
y_train = data["y_train"]
X_val   = data["X_val"]
y_val   = data["y_val"]
X_test  = data["X_test"]
y_test  = data["y_test"]

print("X_train:", X_train.shape, "y_train:", y_train.shape)
print("X_val:  ", X_val.shape,   "y_val:  ", y_val.shape)
print("X_test: ", X_test.shape,  "y_test: ", y_test.shape)

print("Train label counts:", np.bincount(y_train.astype(int)))
print("Val label counts:  ", np.bincount(y_val.astype(int)))
print("Test label counts: ", np.bincount(y_test.astype(int)))



Keys: ['X_train', 'y_train', 'X_val', 'y_val', 'X_test', 'y_test']
X_train: (4000, 10, 15) y_train: (4000,)
X_val:   (500, 10, 15) y_val:   (500,)
X_test:  (500, 10, 15) y_test:  (500,)
Train label counts: [2000 2000]
Val label counts:   [250 250]
Test label counts:  [250 250]


# Organizing Dataset to Dataloaders

In [4]:
class SequenceFDIDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.from_numpy(X).float()
        self.y = torch.from_numpy(y).float()

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

BATCH_SIZE = 128

train_ds = SequenceFDIDataset(X_train, y_train)
val_ds   = SequenceFDIDataset(X_val,   y_val)
test_ds  = SequenceFDIDataset(X_test,  y_test)

train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
val_loader   = DataLoader(val_ds,   batch_size=BATCH_SIZE, shuffle=False)
test_loader  = DataLoader(test_ds,  batch_size=BATCH_SIZE, shuffle=False)

len(train_ds), len(val_ds), len(test_ds)




(4000, 500, 500)

# Instantiate BiLSTM from .py

In [5]:
input_dim = X_train.shape[2]

model = BiLSTMFDIDetector(
    input_dim=input_dim,
    hidden_dim=32,
    num_layers=1,
    bidirectional=True,
    dropout=0.15,
).to(DEVICE)

print(model)
print("Trainable parameters:", count_parameters(model))

criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

BiLSTMFDIDetector(
  (lstm): LSTM(15, 32, batch_first=True, bidirectional=True)
  (classifier): Sequential(
    (0): Linear(in_features=64, out_features=64, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.15, inplace=False)
    (3): Linear(in_features=64, out_features=1, bias=True)
  )
)
Trainable parameters: 16769


# Training Loop

In [6]:

def batch_accuracy_from_logits(logits: torch.Tensor, labels: torch.Tensor) -> float:
    probs = torch.sigmoid(logits)
    preds = (probs >= 0.5).float()
    return (preds == labels).float().mean().item()

def evaluate(model, data_loader, criterion):
    model.eval()
    total_loss = 0.0
    total_acc = 0.0
    total_n = 0

    with torch.no_grad():
        for X_batch, y_batch in data_loader:
            X_batch = X_batch.to(DEVICE)
            y_batch = y_batch.to(DEVICE)

            logits = model(X_batch)
            loss = criterion(logits, y_batch)

            n = y_batch.size(0)
            total_loss += loss.item() * n
            total_acc  += batch_accuracy_from_logits(logits, y_batch) * n
            total_n += n

    return total_loss / total_n, total_acc / total_n


NUM_EPOCHS = 30
best_val_loss = float("inf")

for epoch in range(1, NUM_EPOCHS + 1):
    model.train()
    running_loss = 0.0
    running_acc = 0.0
    total_n = 0

    for X_batch, y_batch in train_loader:
        X_batch = X_batch.to(DEVICE)
        y_batch = y_batch.to(DEVICE)

        optimizer.zero_grad()
        logits = model(X_batch)
        loss = criterion(logits, y_batch)
        loss.backward()
        optimizer.step()

        n = y_batch.size(0)
        running_loss += loss.item() * n
        running_acc  += batch_accuracy_from_logits(logits, y_batch) * n
        total_n += n

    train_loss = running_loss / total_n
    train_acc  = running_acc / total_n
    val_loss, val_acc = evaluate(model, val_loader, criterion)

    print(
        f"Epoch {epoch:02d} | "
        f"Train Loss: {train_loss:.4f}, Acc: {train_acc:.4f} | "
        f"Val Loss: {val_loss:.4f}, Acc: {val_acc:.4f}"
    )

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), "best_bilstm_fdi_detector.pt")
        print("  -> Saved new best model.")





Epoch 01 | Train Loss: 0.6925, Acc: 0.5225 | Val Loss: 0.6930, Acc: 0.5000
  -> Saved new best model.
Epoch 02 | Train Loss: 0.6924, Acc: 0.5102 | Val Loss: 0.6902, Acc: 0.5620
  -> Saved new best model.
Epoch 03 | Train Loss: 0.6843, Acc: 0.5765 | Val Loss: 0.6696, Acc: 0.6040
  -> Saved new best model.
Epoch 04 | Train Loss: 0.6450, Acc: 0.6275 | Val Loss: 0.6517, Acc: 0.6160
  -> Saved new best model.
Epoch 05 | Train Loss: 0.6184, Acc: 0.6603 | Val Loss: 0.6155, Acc: 0.6500
  -> Saved new best model.
Epoch 06 | Train Loss: 0.5877, Acc: 0.6935 | Val Loss: 0.6395, Acc: 0.6300
Epoch 07 | Train Loss: 0.5606, Acc: 0.7163 | Val Loss: 0.5481, Acc: 0.7360
  -> Saved new best model.
Epoch 08 | Train Loss: 0.5389, Acc: 0.7382 | Val Loss: 0.5573, Acc: 0.7240
Epoch 09 | Train Loss: 0.5278, Acc: 0.7390 | Val Loss: 0.5499, Acc: 0.7180
Epoch 10 | Train Loss: 0.5116, Acc: 0.7510 | Val Loss: 0.5478, Acc: 0.7200
  -> Saved new best model.
Epoch 11 | Train Loss: 0.5032, Acc: 0.7515 | Val Loss: 0.5366

# Test Evaluation

In [7]:
# Load best model for testing
model.load_state_dict(torch.load("best_bilstm_fdi_detector.pt", map_location=DEVICE))
model.to(DEVICE)
model.eval()

all_logits = []
all_labels = []

with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch = X_batch.to(DEVICE)
        y_batch = y_batch.to(DEVICE)

        logits = model(X_batch)
        all_logits.append(logits.cpu().numpy())
        all_labels.append(y_batch.cpu().numpy())

all_logits = np.concatenate(all_logits)
all_labels = np.concatenate(all_labels)

probs = 1 / (1 + np.exp(-all_logits))   # sigmoid
preds = (probs >= 0.5).astype(np.float32)

acc  = accuracy_score(all_labels, preds)
prec = precision_score(all_labels, preds)
rec  = recall_score(all_labels, preds)
f1   = f1_score(all_labels, preds)
cm   = confusion_matrix(all_labels, preds)

print("Test Accuracy :", acc)
print("Test Precision:", prec)
print("Test Recall   :", rec)
print("Test F1       :", f1)
print("Confusion matrix:\n", cm)

tn, fp, fn, tp = cm.ravel()
print(f"TN={tn}, FP={fp}, FN={fn}, TP={tp}")

Test Accuracy : 0.968
Test Precision: 0.9717741935483871
Test Recall   : 0.964
Test F1       : 0.9678714859437751
Confusion matrix:
 [[243   7]
 [  9 241]]
TN=243, FP=7, FN=9, TP=241


  model.load_state_dict(torch.load("best_bilstm_fdi_detector.pt", map_location=DEVICE))


In [8]:
# Reload trained model for FGSM
from fdi_models import BiLSTMFDIDetector

model_fgsm = BiLSTMFDIDetector(
    input_dim=input_dim,
    hidden_dim=32,
    num_layers=1,
    bidirectional=True,
    dropout=0.15,
).to(DEVICE)

model_fgsm.load_state_dict(
    torch.load("best_bilstm_fdi_detector.pt", map_location=DEVICE)
)

model_fgsm.eval()

print("Trained BiLSTM model loaded for FGSM")


Trained BiLSTM model loaded for FGSM


  torch.load("best_bilstm_fdi_detector.pt", map_location=DEVICE)


In [10]:
def fgsm_attack(model, X, y, epsilon):
    """
    FGSM attack for sequence-based FDIA
    X: (batch_size, seq_len, input_dim)
    y: (batch_size, 1)
    """
    X_adv = X.clone().detach().to(DEVICE)
    X_adv.requires_grad = True

    logits = model(X_adv)
    loss = criterion(logits, y)

    model.zero_grad()
    loss.backward()

    # FGSM perturbation
    X_adv = X_adv + epsilon * X_adv.grad.sign()

    return X_adv.detach()


In [12]:
# Parameters
EPSILON = 0.03        # attack strength
ATTACK_RATIO = 0.20  # 20% FDIA

# Make copies of training data
X_train_adv = X_train.copy()
y_train_adv = y_train.copy()

num_attack_samples = int(len(X_train) * ATTACK_RATIO)
attack_indices = np.random.choice(len(X_train), num_attack_samples, replace=False)

# IMPORTANT: enable backward for LSTM
model_fgsm.train()

for idx in attack_indices:
    X_seq = torch.from_numpy(X_train[idx:idx+1]).float().to(DEVICE)
    y_seq = torch.from_numpy(y_train[idx:idx+1]).float().to(DEVICE)

    X_adv_seq = fgsm_attack(model_fgsm, X_seq, y_seq, EPSILON)

    X_train_adv[idx] = X_adv_seq.cpu().numpy()
    y_train_adv[idx] = 1.0  # mark as attack

print(f"FGSM attacks injected into {num_attack_samples} training samples")


FGSM attacks injected into 800 training samples


In [13]:
print("Original training labels:", np.bincount(y_train.astype(int)))
print("FGSM training labels:    ", np.bincount(y_train_adv.astype(int)))

print("Original X_train shape:", X_train.shape)
print("FGSM X_train_adv shape:", X_train_adv.shape)


Original training labels: [2000 2000]
FGSM training labels:     [1605 2395]
Original X_train shape: (4000, 10, 15)
FGSM X_train_adv shape: (4000, 10, 15)


In [15]:
# Save FGSM-augmented dataset
np.savez(
    "prepared_data/smartgrid_fdi_seq10_fgsm.npz",
    X_train=X_train_adv,
    y_train=y_train_adv,
    X_val=X_val,
    y_val=y_val,
    X_test=X_test,
    y_test=y_test
)

print("FGSM dataset saved as smartgrid_fdi_seq10_fgsm.npz")


FGSM dataset saved as smartgrid_fdi_seq10_fgsm.npz


In [None]:
model_adv = BiLSTMFDIDetector(
    input_dim=input_dim,
    hidden_dim=32,
    num_layers=1,
    bidirectional=True,
    dropout=0.15,
).to(DEVICE)

criterion_adv = nn.BCEWithLogitsLoss()
optimizer_adv = torch.optim.Adam(model_adv.parameters(), lr=1e-3)

print("Adversarial training model initialized")
