In [1]:
import torch
import torch.nn as nn
import numpy as np
import sys
import os
import random
import matplotlib.pyplot as plt

In [2]:
# check if GPU is available and set the device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)

Using device: cuda


### Model + Helper Functions

In [5]:
os.chdir("src")
print("Current working directory set to:", os.getcwd())

Current working directory set to: /projects/bcnx/sroy6/One_pixel/one-pixel-attack-pytorch/src


In [6]:
# Please put the src directory in the path
src_path = os.path.abspath(os.path.join(os.getcwd(), 'src'))
if src_path not in sys.path:
    sys.path.append(src_path)

from utils import MIMONetDataset, ChannelScaler
from mimonet import MIMONet

In [7]:
# set working directory
working_dir = "/projects/bcnx/kazumak2/MIMONet/HeatExchanger"
data_dir = os.path.join(working_dir, "data")

### Load datasets

In [8]:
# trunk dataset
trunk_input = np.load(os.path.join(data_dir, "share/trunk.npz"))['trunk']

# min-max scaling [-1, 1]
trunk_input[:, 0] = 2 * (trunk_input[:, 0] - np.min(trunk_input[:, 0])) / (np.max(trunk_input[:, 0]) - np.min(trunk_input[:, 0])) - 1
trunk_input[:, 1] = 2 * (trunk_input[:, 1] - np.min(trunk_input[:, 1])) / (np.max(trunk_input[:, 1]) - np.min(trunk_input[:, 1])) - 1

In [9]:
# branch input dataset
branch = np.load(os.path.join(data_dir, "branch.npz"))

branch1 = branch['branch1']
branch2 = branch['branch2']

print("Branch1 shape:", branch1.shape)
print("Branch2 shape:", branch2.shape)

# split the dataset into training and testing sets
train_size = int(0.8 * len(branch1))
test_size = len(branch1) - train_size
train_branch1, test_branch1 = branch1[:train_size], branch1[train_size:]
train_branch2, test_branch2 = branch2[:train_size], branch2[train_size:]

Branch1 shape: (1546, 2)
Branch2 shape: (1546, 100)


In [10]:
# create a dictionary for the output channel names
# 0: turb-kinetic-energy
# 1: pressure
# 2: temperature
# 3: z-velocity
# 4: y-velocity
# 5: x-velocity
# 6: velocity-magnitude

dict_channel = {
    0: 'turb-kinetic-energy',
    1: 'pressure',
    2: 'temperature',
    3: 'z-velocity',
    4: 'y-velocity',
    5: 'x-velocity',
    6: 'velocity-magnitude'
}

# select the output channel
target_channel = [1, 3, 4, 5, 6]

# print the selected output channel names
# target_label is used to store the names of the selected output channels for further processing (e.g., plotting)
print("Selected output channels:")
target_label = []
for channel in target_channel:
    print(dict_channel[channel])
    target_label.append(dict_channel[channel])    

Selected output channels:
pressure
z-velocity
y-velocity
x-velocity
velocity-magnitude


In [11]:
# target dataset
target = np.load(os.path.join(data_dir, "target.npy"))

# extract the output channels
target = target[:, :, target_channel ]  # select the first 7 channels
print("Target dataset shape before split:", target.shape)


# split the target dataset into training and testing sets
train_target = target[:train_size]
test_target = target[train_size:]

print("Train target shape:", train_target.shape)
print("Test target shape:", test_target.shape)

Target dataset shape before split: (1546, 3977, 5)
Train target shape: (1236, 3977, 5)
Test target shape: (310, 3977, 5)


In [12]:
# get the mean and standard deviation of each channel
mean_branch1 = np.mean(train_branch1, axis=0)
std_branch1 = np.std(train_branch1, axis=0)

print("Mean of branch1:", mean_branch1)
print("Std of branch1:", std_branch1)

# (# train samples, 100)
mean_branch2 = np.mean(train_branch2)
std_branch2 = np.std(train_branch2)

print("Mean of branch2:", mean_branch2)
print("Std of branch2:", std_branch2)

Mean of branch1: [  4.51454429 292.42944177]
Std of branch1: [ 0.2615285  17.03323994]
Mean of branch2: 12587.66968713018
Std of branch2: 6302.709013835411


In [13]:
# normalize the branch data using the mean and std
train_branch_1 = (train_branch1 - mean_branch1) / std_branch1
test_branch_1 = (test_branch1 - mean_branch1) / std_branch1
train_branch_2 = (train_branch2 - mean_branch2) / std_branch2
test_branch_2 = (test_branch2 - mean_branch2) / std_branch2

# print the shapes of the normalized data
print("Shape of normalized train_branch1:", train_branch_1.shape)
print("Shape of normalized test_branch1:", test_branch_1.shape)
print("Shape of normalized train_branch2:", train_branch_2.shape)
print("Shape of normalized test_branch2:", test_branch_2.shape)

Shape of normalized train_branch1: (1236, 2)
Shape of normalized test_branch1: (310, 2)
Shape of normalized train_branch2: (1236, 100)
Shape of normalized test_branch2: (310, 100)


### Scaling the target data

In [14]:
import numpy as np
# scaling the target data
'''  
note: reverse the scaling for the target data
train_target = scaler.inverse_transform(train_target_scaled)
test_target = scaler.inverse_transform(test_target_scaled)
'''
scaler = ChannelScaler(method='minmax', feature_range=(-1, 1))
scaler.fit(train_target)
train_target_scaled = scaler.transform(train_target)
test_target_scaled = scaler.transform(test_target)

print("Shape of scaled train_target:", train_target_scaled.shape)
print("Shape of scaled test_target:", test_target_scaled.shape)

Shape of scaled train_target: (1236, 3977, 5)
Shape of scaled test_target: (310, 3977, 5)


In [15]:
# test dataset and dataloader
test_dataset = MIMONetDataset(
    [test_branch_1, test_branch_2],  # branch_data_list
    trunk_input,                     # trunk_data
    test_target_scaled               # target_data
)

test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=1,  # set to 1 for testing
    shuffle=False,
    num_workers=0
)

## Model Instance & Load Parameters

In [16]:
# Architecture parameters
dim = 256
branch_input_dim1 = 2
branch_input_dim2 = 100
trunk_input_dim = 2

# Define the model arguments for orig_MIMONet
model_args = {
    'branch_arch_list': [
        [branch_input_dim1, 512, 512, 512, dim],
        [branch_input_dim2, 512, 512, 512, dim]
    ],
    'trunk_arch': [trunk_input_dim, 256, 256, 256, dim],
    'num_outputs': target.shape[-1] -1,  # number of output channels
    'activation_fn': nn.ReLU,
    'merge_type': 'mul',
}

### Load

In [17]:
model = MIMONet(**model_args).to(device)
# Load the trained model
model_path = os.path.join(working_dir, "checkpoints/custom_best_model_lambda_1E-4.pt")

if os.path.exists(model_path):
    model.load_state_dict(torch.load(model_path, map_location=device))
    print("Model loaded successfully from", model_path)
else:
    print("Model file not found at", model_path)
    sys.exit(1)
    
# Evaluate the model on the test set
model.eval()

# print the number of parameters in the model
num_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Number of trainable parameters in the model: {num_params}")

Model loaded successfully from /projects/bcnx/kazumak2/MIMONet/HeatExchanger/checkpoints/custom_best_model_lambda_1E-4.pt
Number of trainable parameters in the model: 1762052


  model.load_state_dict(torch.load(model_path, map_location=device))


## How to predict?

In [18]:
# feed the test loader to the model (and save predictions and targets)
predictions = []
targets = []
with torch.no_grad():
    for batch in test_loader:
        branch_data, trunk_data, target_data = batch
        branch_data = [b.to(device) for b in branch_data]
        trunk_data = trunk_data.to(device)
        target_data = target_data.to(device)

        output = model(branch_data, trunk_data)
        predictions.append(output.cpu().numpy())
        targets.append(target_data.cpu().numpy())
        
# Convert predictions and targets to numpy arrays
predictions = np.concatenate(predictions, axis=0)
targets = np.concatenate(targets, axis=0)

# Reverse the scaling for the target data
predictions = scaler.inverse_transform(predictions)
targets = scaler.inverse_transform(targets)

(Evaluate)

In [19]:
# Compute L2 norm over grid points for each sample and channel
l2_pred = np.linalg.norm(predictions, axis=1)  # shape: (samples, channels)
l2_gt = np.linalg.norm(targets[..., :4], axis=1)  # shape: (samples, channels)

print("L2 norm of predictions shape:", l2_pred.shape)
print("L2 norm of targets shape:", l2_gt.shape)


# Compute L2 error over grid points for each sample and channel
l2_err = np.linalg.norm(predictions - targets[..., :4], axis=1)  # shape: (samples, channels)

# Compute relative error (avoid division by zero)
rel_err = l2_err / (l2_gt + 1e-8)  # shape: (samples, channels)

# Mean over samples for each channel
mean_rel_err_per_channel = np.mean(rel_err, axis=0)  # shape: (channels,)

print("Mean relative L2 error per channel:", mean_rel_err_per_channel * 100, "%")

# standard deviation of relative error per channel
std_rel_err_per_channel = np.std(rel_err, axis=0)  # shape: (channels,)

print("Standard deviation of relative L2 error per channel:", std_rel_err_per_channel)

L2 norm of predictions shape: (310, 4)
L2 norm of targets shape: (310, 4)
Mean relative L2 error per channel: [0.7996377  1.4521601  1.0151619  0.51779175] %
Standard deviation of relative L2 error per channel: [0.00014584 0.0002911  0.0002742  0.00038389]


In [None]:
import os
import sys
import numpy as np

import argparse

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.backends.cudnn as cudnn

import torchvision
import torchvision.transforms as transforms
from models import *
from utils import progress_bar
from torch.autograd import Variable


In [20]:
import argparse
import numpy as np
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset

# Local file copied from the original repo – keep the path if needed.
from differential_evolution import differential_evolution  # noqa: E402


def parse_args():
    p = argparse.ArgumentParser("K‑Feature Differential Evolution Attack (Regression)")
    p.add_argument("--model-path", required=True, help="Path to saved PyTorch model (state_dict or full model).")
    p.add_argument("--features", type=int, default=1, help="How many input indices may be perturbed.")
    p.add_argument("--val-min", type=float, default=-1.0, help="Minimum allowed value for a perturbed component.")
    p.add_argument("--val-max", type=float, default=1.0, help="Maximum allowed value for a perturbed component.")
    p.add_argument("--maxiter", type=int, default=100, help="Maximum DE iterations.")
    p.add_argument("--popsize", type=int, default=400, help="Population‑size multiplier in DE (actual pop = popsize * n_params).")
    p.add_argument("--samples", type=int, default=100, help="How many (x, y) pairs from the dataset to attack.")
    p.add_argument("--error-thr", type=float, default=0.30, help="Relative‑error threshold to declare success.")
    p.add_argument("--batch-size", type=int, default=1, help="Batch size for the data loader (set 1 for simplicity).")
    p.add_argument("--seed", type=int, default=0, help="Random seed for reproducibility.")
    p.add_argument("--device", default="cuda" if torch.cuda.is_available() else "cpu", help="Device to run on.")
    p.add_argument("--verbose", action="store_true", help="Print verbose logs from DE.")
    return p.parse_args()


# ──────────────────────────────────────────────────────────────────────────────
# Utilities
# ──────────────────────────────────────────────────────────────────────────────

def perturb_vector(xs: np.ndarray, base_vec: torch.Tensor) -> torch.Tensor:
    """Apply a batch of perturbations *xs* to *base_vec*.

    Parameters
    ----------
    xs : ndarray, shape (n_pop, 2*k) or (2*k,)
        Genome(s) produced by DE: (idx₀, val₀, …).
    base_vec : Tensor, shape (input_dim,)
        Unperturbed input.

    Returns
    -------
    Tensor, shape (n_pop, input_dim)
        Batch of perturbed inputs ready for the model.
    """
    if xs.ndim == 1:
        xs = np.expand_dims(xs, 0)
    n_pop, genome_len = xs.shape
    k = genome_len // 2

    # Duplicate the base vector n_pop times – stays on CPU for now.
    vecs = base_vec.repeat(n_pop, 1)

    for row, genome in enumerate(xs):
        for j in range(k):
            idx = int(round(genome[2 * j]))  # ensure integer index
            val = genome[2 * j + 1]
            idx_clamped = max(0, min(idx, base_vec.numel() - 1))
            vecs[row, idx_clamped] = val
    return vecs


def relative_l2_error(pred: torch.Tensor, target: torch.Tensor) -> torch.Tensor:
    """Compute ‖pred ‑ target‖₂ / ‖target‖₂ along dim=1."""
    diff_norm = torch.norm(pred - target, dim=1)
    tgt_norm = torch.norm(target, dim=1).clamp_min(1e-12)
    return diff_norm / tgt_norm


# ──────────────────────────────────────────────────────────────────────────────
# Attack core
# ──────────────────────────────────────────────────────────────────────────────

def de_objective(xs: np.ndarray, base_x: torch.Tensor, y_true: torch.Tensor, model: torch.nn.Module,
                 device: torch.device, maximize: bool = True) -> np.ndarray:
    """Vectorised objective for DE.  Returns *negative* error (to minimise)."""
    perturbed = perturb_vector(xs, base_x).to(device)
    with torch.no_grad():
        y_pred = model(perturbed)
    err = relative_l2_error(y_pred, y_true.to(device)).cpu().numpy()  # shape (n_pop,)
    return -err if maximize else err


def attack_single(base_x: torch.Tensor, y_true: torch.Tensor, model: torch.nn.Module,
                  features: int, error_thr: float, val_min: float, val_max: float,
                  maxiter: int, popsize: int, device: torch.device, verbose: bool = False):
    """Run DE attack on a single (x, y) pair.  Returns (success, best_genome)."""
    input_dim = base_x.numel()
    bounds = []
    for _ in range(features):
        bounds.append((0, input_dim - 1))      # index
        bounds.append((val_min, val_max))      # new value

    # Pop‑multiplier like original code (total pop = popsize * n_params)
    popmul = max(1, popsize)

    predict_fn = lambda xs: de_objective(xs, base_x, y_true, model, device, True)

    def callback_fn(genome, convergence):
        # Early stop if success achieved
        perturbed = perturb_vector(genome, base_x).to(device)
        with torch.no_grad():
            err = relative_l2_error(model(perturbed), y_true.to(device))[0].item()
        if verbose:
            print(f"  Current best error = {err:.3f}")
        return err > error_thr

    # Optional initial population: random indices + gaussian values
    n_params = 2 * features
    inits = np.zeros((popmul * n_params, n_params))
    for row in inits:
        for f in range(features):
            row[2 * f] = np.random.randint(0, input_dim)
            row[2 * f + 1] = np.random.uniform(val_min, val_max)

    result = differential_evolution(
        predict_fn,
        bounds,
        maxiter=maxiter,
        popsize=popmul,
        recombination=1.0,
        atol=-1,
        init=inits,
        polish=False,
        callback=callback_fn,
        disp=False,
    )

    # Evaluate final error
    final_vec = perturb_vector(result.x, base_x).to(device)
    with torch.no_grad():
        final_err = relative_l2_error(model(final_vec), y_true.to(device))[0].item()

    success = final_err > error_thr
    return success, result.x, final_err