# FGSM Evaluation on Patolli and Concatenated Feature Spaces  

This notebook investigates the impact of FGSM perturbations on model robustness, considering two input settings:  
- Patolli descriptors alone  
- A concatenated feature space combining CNN-extracted diffractogram features with Patolli descriptors  


In [None]:
import os
import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tensorflow.keras.utils import plot_model
from tensorflow.keras.utils import model_to_dot
import tensorflow as tf
from sklearn.metrics import mean_squared_error as mse
from sklearn.metrics import mean_absolute_error as mae
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import Dense, LayerNormalization, Activation, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import plot_model
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
import itertools
import time
import copy

2025-09-19 11:33:29.844527: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-09-19 11:33:29.859518: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-09-19 11:33:29.863729: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-09-19 11:33:29.873719: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


Loading dataframes from the patolli-generated descriptors

In [2]:
#Patolli-based descriptors only
xtraval_patolli=np.load('data/patolli_generated_data/patolli_gen_desc_xtraval_2s_fgsm.npy')
ytraval_patolli=np.load('data/patolli_generated_data/patolli_gen_desc_ytraval_2s_fgsm.npy')
xtest_patolli=np.load('data/patolli_generated_data/patolli_gen_desc_xtest_2s_fgsm.npy')
ytest_patolli=np.load('data/patolli_generated_data/patolli_gen_desc_ytest_2s_fgsm.npy')
print(xtest_patolli.shape,ytest_patolli.shape)

(1626, 1, 120) (1626, 4)


Loading concatenated dataframes (patolli + ccn-extracted descriptors)

In [3]:
# Load concatenated sets
traval_concat_df=pd.read_csv("data/concatenated_data/traval_concat_df_2_fgsm.csv")
test_concat_df=pd.read_csv("data/concatenated_data/test_concat_df_2_fgsm.csv")

In [4]:
bandgap_cols = ["bg_gga", "bg_gga_opt", "bg_hse", "bg_hse_opt"]
# Extract targets from concatenated splits
yconcat_traval = traval_concat_df[bandgap_cols].to_numpy()
yconcat_test = test_concat_df[bandgap_cols].to_numpy()

#print(yconcat_traval.shape, yconcat_test.shape)

# Extract features only (drop cif + targets)
xconcat_traval = traval_concat_df.drop(columns=["cif"] + bandgap_cols).values
xconcat_test = test_concat_df.drop(columns=["cif"] + bandgap_cols).values

#print(xconcat_traval.shape, xconcat_test.shape)

In [5]:
def plot_graphs(losses, model_name, ytest, ptest):
    # Plot the loss
    plt.figure()
    plt.plot(losses, label='Training')
    plt.title('Model Loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch') 
    plt.legend()
    plt.savefig(f'{model_name}/loss_mse_plot.png')
    plt.close()
    # Plot predicted vs real values
    for item in range(ytest.shape[-1]):
        plt.figure(figsize=(12, 6))
        plt.subplot(1, 2, 1)
        plt.scatter(ytest[:,item],ptest[:,item], s=10, color='purple', alpha=0.5)
        plt.grid()
        plt.xlabel(r'actual band gap (eV)')
        plt.ylabel(r'predicted bandgap (eV)')
    #plt.plot(np.arange(-1,7), np.arange(-1,7), linewidth=1, color='black')
        plt.title('Actual vs Predicted Values')
    # Plot histograms of actual and predicted values
        ytest_plot = ytest[:,item]
        ptest_plot = ptest[:,item]
        plt.subplot(1, 2, 2)
        plt.hist(ytest_plot, bins=200, color='blue')
        plt.hist(ptest_plot, bins=200, color='red', alpha=0.5)
        plt.title('Histogram of Actual and Predicted Values')
        plt.xlabel(r'band gap (eV)')
        plt.ylabel('Frequency')
        plt.legend(['Actual', 'Predicted'], loc='upper right')
    
        plt.savefig(f'{model_name}/real_vs_predicted_{item}.png')
        plt.close()
    
    for item in range(ytest.shape[-1]):
        residuals = ytest[:,item] - ptest[:,item]
        plt.figure(figsize=(12, 6))
        plt.subplot(1, 2, 1)
        plt.hist(residuals, bins=50, color='blue', alpha=0.5)
        plt.title('Histogram of Actual and Predicted Values')
        plt.xlabel('E_actual-E_predicted (eV)')
        plt.ylabel('Frequency')
    
        plt.savefig(f'{model_name}/Distribution_{item}.png')
        plt.close()



In [6]:
def create_model(xt, yt, neurons, blocks, drop):
    # Input layer
    inputs = Input(shape=xt.shape[1:])

    # Initialize 'x' with the inputs
    x = inputs
    # Add 'num_blocks' blocks
    for _ in range(blocks):
        x = Dense(neurons)(x)
        x = LayerNormalization()(x)
        x = Activation('relu')(x)
        x = Dropout(drop)(x)
    # Output layer
    outputs = Dense(yt.shape[-1], activation='linear')(x)

    # Create the model
    model = Model(inputs=inputs, outputs=outputs)
    return model

In [7]:
def create_model2(xt, yt, neurons, blocks, drop):
    # Input layer
    inputs = Input(shape=xt.shape[1:])

    # Initialize 'x' with the inputs
    x = inputs
    x2= inputs
    x2 = Dense(neurons)(x2)
    x2 = LayerNormalization()(x2)
    x2 = layers.Activation('relu')(x2)
    x2 = Dropout(drop)(x2)
    x2 = Dense(neurons)(x2)
    x2 = LayerNormalization()(x2)
    x2 = layers.Activation('relu')(x2)
    x2 = Dropout(drop)(x2)
    # Add 'num_blocks' blocks
    for _ in range(blocks):
        x = Dense(neurons)(x)
        x = LayerNormalization()(x)
        x = Activation('relu')(x)
        x = Dropout(drop)(x)

    x=layers.add([x,x2])
    # Output layer
    outputs = Dense(yt.shape[-1], activation='linear')(x)

    # Create the model
    model = Model(inputs=inputs, outputs=outputs)
    return model

In [8]:
def adversary_fgsm_batch(model_fgsm, xconcat_traval, yconcat_traval, epsilon, loss_function, optim, batch_size, epochs):

    num_batches = int(np.ceil(len(xconcat_traval) / batch_size))
    all_losses = [] 
    epochs_loss_avg=[]
    for epoch in range(1, epochs + 1):
        print('Epoch', epoch)
        epoch_losses = []
        for batch in range(num_batches):
            start_idx = batch * batch_size
            end_idx = min((batch + 1) * batch_size, len(xconcat_traval))  # clamp to dataset size


            x_batch_tf = tf.convert_to_tensor(xconcat_traval[start_idx:end_idx], dtype=tf.float32)
            y_batch_tf = tf.convert_to_tensor(yconcat_traval[start_idx:end_idx], dtype=tf.float32)

            with tf.GradientTape() as tape:
                tape.watch(x_batch_tf)
                pred = model_fgsm(x_batch_tf,training=True)
                loss = loss_function(y_batch_tf, pred)

            gradients = tape.gradient(loss, x_batch_tf)
            signed_grad = tf.sign(gradients)
            
            x_batch_tf +=  epsilon * signed_grad

            with tf.GradientTape() as tape:
                pred_perturbed = model_fgsm(x_batch_tf, training=True)
                loss_perturbed= loss_function(y_batch_tf, pred_perturbed)

            gradients_fgsm = tape.gradient(loss_perturbed, model_fgsm.trainable_variables)
            optim.apply_gradients(zip(gradients_fgsm, model_fgsm.trainable_variables))

            epoch_losses.append(loss_perturbed.numpy())

        all_losses.append(epoch_losses)
        df_loss = pd.DataFrame(np.asarray(all_losses).reshape((-1,1)))
        df_loss.round(5).to_csv('losses.csv')
        epoch_loss_avg = tf.reduce_mean(epoch_losses).numpy()
        print('Average Epoch Loss (FGSM Model):', epoch_loss_avg)
        epochs_loss_avg.append(epoch_loss_avg)
    epochs_loss_avg= pd.Series(epochs_loss_avg, index=range(1, len(epochs_loss_avg) + 1))

    return model_fgsm,epochs_loss_avg



In [9]:
class BandgapModel:
    def __init__(self, neurons=64, blocks=3, drop=0.2, lr=0.001, version=1):
        self.neurons = neurons
        self.blocks = blocks
        self.drop = drop
        self.lr = lr
        self.version = version
        self.model = None
        self.history = None
        self.adv_history = None

    def build(self, xtraval, ytraval):
        if self.version == 1:
            self.model = create_model(xtraval, ytraval, self.neurons, self.blocks, self.drop)
        else:
            self.model = create_model2(xtraval, ytraval, self.neurons, self.blocks, self.drop)
        return self.model

    def fit(self, xtraval, ytraval, validation_split=0.11, epoch=1, batch=1, verbose=2,
            adversarial=False, epsilon=0.01):
        if self.model is None:
            raise ValueError("Call build() before fit().")

        if adversarial:
            optim = tf.keras.optimizers.Adam(self.lr)
            loss_function = tf.keras.losses.LogCosh()
            self.model, self.adv_history = adversary_fgsm_batch(
                self.model, xtraval, ytraval, epsilon,
                loss_function, optim, batch_size=batch, epochs=epoch
            )
            return self.adv_history
        else:
            self.history = self.model.fit(
                xtraval, ytraval,
                validation_split=validation_split,
                batch_size=batch,
                epochs=epoch,
                verbose=verbose
            )
            return self.history


    def evaluate_model(self,xtraval, ytraval, xtest, ytest, model_name,losses):

        #history= model.fit(xtraval, ytraval, validation_split=0.11, epochs=epoch, batch_size=batch, verbose=2)

        ptraval = self.model.predict(xtraval)
        ptest = self.model.predict(xtest)

        np.save(f'{model_name}/{model_name}_ptrav', ptraval)
        np.save(f'{model_name}/{model_name}_ptest', ptest)

        msetraval = mse(ytraval, ptraval)
        msetest = mse(ytest, ptest)
        
        maetraval = mae(ytraval, ptraval)
        maetest = mae(ytest, ptest)

        # Calculate metrics
        mae_nn = abs(ytest - ptest).mean(axis=0)
        rmse_nn = ((ytest - ptest)**2).mean(axis=0)**0.5

        with open(f'{model_name}/mse_mae.txt','a') as f:
            f.write(model_name)
            f.write(',')
            f.write("%.5f" % msetraval)
            f.write(',')
            f.write("%.5f" % msetest)
            f.write(',')
            f.write("%.5f" % maetraval)
            f.write(',')
            f.write("%.5f" % maetest)
            f.write('\n\n')
            f.write(' mae:')
            f.write(str(mae_nn))
            f.write('\n')
            f.write(' rmse:')
            f.write(str(rmse_nn))
            f.close()
        
        self.model.save(f'{model_name}/{model_name}.keras')
        losses.to_csv(f'{model_name}/losses.csv')
        # Plot the model
        plot_model(self.model, to_file=f'{model_name}/model.png', show_shapes=True, show_layer_names=False)
        # Call the function to plot the graphs
        plot_graphs(losses, model_name, ytest, ptest)



In [10]:
# Hyperparameter tuning ranges
lrs = [1e-4]
batch_sizes = [32]
epochs_list = [1]
blocks_list = [1]
drops = np.arange(0.0, 0.5, 0.1)
neurons_list = range(2800, 2801, 280)
epsilons = np.arange(0.115, 0.131, 0.015)  # epsilon sweep
epsilons = np.around(epsilons, 3)

# Full hyperparameter sweep
for block, drop, batch, neurons, lr, epoch, epsilon in itertools.product(
        blocks_list, drops, batch_sizes, neurons_list, lrs, epochs_list, epsilons):

    # Initialize model
    bandgap = BandgapModel(
        neurons=neurons,
        blocks=block,
        drop=drop,
        lr=lr,
        version=1
    )

    # Build the model
    bandgap.build(xconcat_traval, yconcat_traval)

    # Train the model with adversarial FGSM
    history = bandgap.fit(
        xtraval=xconcat_traval,
        ytraval=yconcat_traval,
        validation_split=0.1,
        epoch=epoch,
        batch=batch,
        verbose=2,
        adversarial=True,
        epsilon=epsilon  # pass current epsilon
    )

    # Create unique folder name including epsilon
    model_name = datetime.datetime.now().strftime(
        f"%B_%d_%H%M%S_%Y_patolli_lr{lr}_neurons{neurons}_drop{drop}_batch{batch}_blocks{block}_epoch{epoch}_eps{epsilon}"
    )
    os.makedirs(model_name, exist_ok=True)

    # Evaluate model and save metrics, predictions, graphs, etc.
    bandgap.evaluate_model(
        xtraval=xconcat_traval,
        ytraval=yconcat_traval,
        xtest=xconcat_test,
        ytest=yconcat_test,
        model_name=model_name,
        losses=history
    )


I0000 00:00:1758303212.093182    7657 cuda_executor.cc:1001] could not open file to read NUMA node: /sys/bus/pci/devices/0000:0a:00.0/numa_node
Your kernel may have been built without NUMA support.
I0000 00:00:1758303212.138664    7657 cuda_executor.cc:1001] could not open file to read NUMA node: /sys/bus/pci/devices/0000:0a:00.0/numa_node
Your kernel may have been built without NUMA support.
I0000 00:00:1758303212.138736    7657 cuda_executor.cc:1001] could not open file to read NUMA node: /sys/bus/pci/devices/0000:0a:00.0/numa_node
Your kernel may have been built without NUMA support.
I0000 00:00:1758303212.143823    7657 cuda_executor.cc:1001] could not open file to read NUMA node: /sys/bus/pci/devices/0000:0a:00.0/numa_node
Your kernel may have been built without NUMA support.
I0000 00:00:1758303212.143928    7657 cuda_executor.cc:1001] could not open file to read NUMA node: /sys/bus/pci/devices/0000:0a:00.0/numa_node
Your kernel may have been built without NUMA support.
I0000 00:0

Epoch 1
Average Epoch Loss (FGSM Model): 0.52786475


I0000 00:00:1758303223.311674    7746 service.cc:146] XLA service 0x7f3ac8005040 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
I0000 00:00:1758303223.311724    7746 service.cc:154]   StreamExecutor device (0): NVIDIA GeForce RTX 2080, Compute Capability 7.5
2025-09-19 11:33:43.317191: I tensorflow/compiler/mlir/tensorflow/utils/dump_mlir_util.cc:268] disabling MLIR crash reproducer, set env var `MLIR_CRASH_REPRODUCER_DIRECTORY` to enable.
2025-09-19 11:33:43.328726: I external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:531] Loaded cuDNN version 8907
I0000 00:00:1758303223.508441    7746 device_compiler.h:188] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


[1m170/170[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step
[1m42/42[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step
Epoch 1
Average Epoch Loss (FGSM Model): 0.5598509
[1m170/170[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step
[1m42/42[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step
Epoch 1
Average Epoch Loss (FGSM Model): 0.51569414
[1m170/170[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step
[1m42/42[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step
Epoch 1
Average Epoch Loss (FGSM Model): 0.537914
[1m170/170[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step
[1m42/42[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step
Epoch 1
Average Epoch Loss (FGSM Model): 0.5265959
[1m170/170[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step
[1m42/42[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step
Epoch 1
Average Epoch Loss (FGSM Model): 0.53188044
[1m

In [11]:
#To check 
'''
def feature_importance_based_attack(x, model, epsilon=0.1):
  """
  Performs a feature importance-based adversarial attack on tabular data.

  Args:
      x: A 2D numpy array representing the tabular data (samples as rows, features as columns).
      model: A trained TensorFlow model for classification on tabular data.
      epsilon: The maximum magnitude of perturbation applied to each feature.
      target_class: The target class to misclassify the sample to (optional).

  Returns:
      A 2D numpy array representing the adversarially perturbed data.
  """

  # Get the feature importance scores
  with tf.GradientTape() as tape:
    tape.watch(x)
    logits = model(x)
    loss = tf.keras.losses.sparse_categorical_crossentropy(tf.argmax(logits, axis=1), logits)
  gradients = tape.gradient(loss, x)
  importance_scores = abs(tf.reduce_mean(gradients, axis=0))  # Average gradient magnitude per feature

  # Modify features with high importance
  modified_x = x.copy()
  num_features = x.shape[1]
  top_features = tf.math.top_k(importance_scores, k=int(0.2 * num_features))  # Modify top 20% features

  for i in range(top_features.indices.shape[0]):
    feature_index = top_features.indices[i].numpy()
    sign = np.sign(gradients[0, feature_index])  # Use gradient sign for direction
    modified_x[:, feature_index] += sign * epsilon

  # Clip modifications within epsilon range
  modified_x = tf.clip_by_value(modified_x, x - epsilon, x + epsilon)

  return modified_x

'''


'\ndef feature_importance_based_attack(x, model, epsilon=0.1):\n  """\n  Performs a feature importance-based adversarial attack on tabular data.\n\n  Args:\n      x: A 2D numpy array representing the tabular data (samples as rows, features as columns).\n      model: A trained TensorFlow model for classification on tabular data.\n      epsilon: The maximum magnitude of perturbation applied to each feature.\n      target_class: The target class to misclassify the sample to (optional).\n\n  Returns:\n      A 2D numpy array representing the adversarially perturbed data.\n  """\n\n  # Get the feature importance scores\n  with tf.GradientTape() as tape:\n    tape.watch(x)\n    logits = model(x)\n    loss = tf.keras.losses.sparse_categorical_crossentropy(tf.argmax(logits, axis=1), logits)\n  gradients = tape.gradient(loss, x)\n  importance_scores = abs(tf.reduce_mean(gradients, axis=0))  # Average gradient magnitude per feature\n\n  # Modify features with high importance\n  modified_x = x.cop