## Demo 3: HKR classifier on MNIST dataset

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/deel-ai/deel-lip/blob/master/docs/notebooks/demo3.ipynb)

This notebook demonstrates learning a binary classification task on MNIST restricted to the digits 0 and 8.


In [1]:
import os
# Select the TensorFlow backend for Keras; this cell runs before `keras` is
# imported further down in the notebook.
os.environ["KERAS_BACKEND"] = "tensorflow"

In [2]:
# pip install git+https://github.com/deel-ai/deel-lip.git@keras3 -qqq

In [2]:
import keras
import keras.ops as K
from keras.layers import Input, Flatten, Dense
from keras.optimizers import Adam
from keras.metrics import BinaryAccuracy

# from keras.models import Sequential
from deel.lip.model import Sequential

from deel.lip.layers import (
    SpectralDense,
    SpectralConv2D,
    ScaledL2NormPooling2D,
    FrobeniusDense,
)
from deel.lip.activations import GroupSort, GroupSort2
from deel.lip.losses import HKR, KR, HingeMargin, MulticlassHKR, MulticlassKR

import numpy as np

2025-04-16 10:28:38.382025: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1744792118.628553    7511 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1744792118.699599    7511 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-04-16 10:28:39.374463: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


### Data preparation

For this task we will select two classes: 0 and 8.

In [3]:
from keras.datasets import mnist
from keras.utils import to_categorical

# Digits kept for the binary task; they are passed to `prepare_data` below as
# `class_a` and `class_b` respectively.
selected_classes = [0, 8]  # must be two classes as we perform binary classification


def prepare_data(x, y, class_a=0, class_b=8):
    """
    Filter an image dataset down to two classes and format it for our binary
    classification setup.

    Args:
        x: array of images; after filtering it is reshaped to (-1, 28, 28, 1).
        y: array of integer class labels aligned with the first axis of `x`.
        class_a: label of the class that is mapped to 1.0.
        class_b: label of the class that is mapped to 0.0.

    Returns:
        A tuple `(x, y)` where `x` is a float32 array of shape (n, 28, 28, 1)
        whose values were divided by 255, and `y` is a float32 array of shape
        (n, 1) holding 1.0 for `class_a` samples and 0.0 for `class_b` samples.
    """
    # select items from the two selected classes
    is_class_a = y == class_a
    mask = is_class_a | (y == class_b)
    # convert from range int[0,255] to float32[0,1]
    x = x[mask].astype("float32") / 255
    x = x.reshape((-1, 28, 28, 1))
    # Binary labels {1, 0}, derived directly from the comparison with class_a.
    # Relabelling in place (class_a -> 1.0, then class_b -> 0.0) would clobber
    # the labels whenever one of the class ids is itself 0 or 1 (e.g. with
    # class_b == 1 every sample would end up labelled 0.0).
    y = is_class_a[mask].astype("float32")
    print(x.shape, y.shape)
    return x, y.reshape((-1, 1))


# Load the full MNIST train/test splits (images plus ordinal digit labels).
(x_train, y_train_ord), (x_test, y_test_ord) = mnist.load_data()

# Keep only the two selected digits in each split and format the arrays.
x_train, y_train = prepare_data(
    x_train,
    y_train_ord,
    class_a=selected_classes[0],
    class_b=selected_classes[1],
)
x_test, y_test = prepare_data(
    x_test,
    y_test_ord,
    class_a=selected_classes[0],
    class_b=selected_classes[1],
)

# Turn the binary labels into categorical (one-hot) targets.
y_train = to_categorical(y_train)
y_test = to_categorical(y_test)

(11774, 28, 28, 1) (11774,)
(1954, 28, 28, 1) (1954,)


In [4]:
# Move the channel axis from last to second position:
# (N, 28, 28, 1) -> (N, 1, 28, 28).
x_train, x_test = (
    np.transpose(split, (0, 3, 1, 2)) for split in (x_train, x_test)
)

In [None]:
import os
# Optional: select the Keras backend (if needed)
# os.environ["KERAS_BACKEND"] = "tensorflow" # or "jax" or "torch"

import keras
from keras import layers
import keras.ops # Keras 3 API for backend-agnostic operations
import numpy as np
import tensorflow as tf # Needed for tf.GradientTape

# --- Step 1: Define or load your original model (with 1 logit) ---
# REPLACE THIS with the definition or loading of YOUR original model.
def create_original_single_logit_model(input_shape):
    """
    Build a small placeholder MLP that outputs a single logit.

    Args:
        input_shape: shape of one input sample (without the batch axis),
            forwarded to `keras.Input`.

    Returns:
        A `keras.Model` named "original_model" whose output comes from a
        1-unit `Dense` layer named "single_logit".
    """
    model_input = keras.Input(shape=input_shape)
    hidden = layers.Flatten()(model_input)
    # Two ReLU hidden layers, 64 then 32 units.
    for units in (64, 32):
        hidden = layers.Dense(units, activation="relu")(hidden)
    logit = layers.Dense(1, name="single_logit")(hidden)
    return keras.Model(inputs=model_input, outputs=logit, name="original_model")

# --- Step 2: Convert the model to 2 logits [-z, z] ---
def convert_to_reflected_two_logits_model(original_model):
    """
    Wrap a single-logit Keras model into a model that outputs two logits [-z, z].

    Args:
        original_model: Keras model whose output is a single logit z
            (expected shape: (batch, 1)).

    Returns:
        A new `keras.Model` named "reflected_two_logits_model" that shares the
        input of `original_model` and outputs the concatenation of -z and z
        along the last axis (expected shape: (batch, 2)).
    """
    z = original_model.output
    # -z is computed with Keras Ops; the concatenation uses a Keras layer to
    # stay within the functional-API style.
    reflected = layers.Concatenate(axis=-1, name="reflected_two_logits")(
        [keras.ops.negative(z), z]
    )
    return keras.Model(
        inputs=original_model.input,
        outputs=reflected,
        name="reflected_two_logits_model",
    )

# --- Step 3: FGSM attack implementation (using Keras Ops as much as possible) ---

# Loss for the 2-logit model. The predicted class will be argmax(logits).
# SparseCategoricalCrossentropy is suitable because the labels are 0 or 1.
# Important: from_logits=True
loss_object = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

def fgsm_attack_keras_ops(
    model_two_logits, image, label, epsilon, clip_min=0, clip_max=1
):
    """
    Generate an adversarial image with FGSM, using Keras Ops where possible.

    The gradient with respect to the input is computed with
    `tf.GradientTape`, so that part is tied to the TensorFlow backend. The
    loss is the module-level `loss_object`.

    Args:
        model_two_logits: the modified Keras model with 2 output logits [-z, z].
        image: the input image (tensor-like, e.g. (1, H, W, C)).
        label: the true label (integer, 0 or 1).
        epsilon: the FGSM step size.
        clip_min: lower bound applied to the adversarial image. Defaults to 0,
            the value that was previously hard-coded.
        clip_max: upper bound applied to the adversarial image. Defaults to 1,
            the value that was previously hard-coded.

    Returns:
        A backend tensor (e.g. `tf.Tensor`) holding the adversarial image.
    """
    # Convert the inputs to tensors (Keras Ops accepts np.array, tf.Tensor, ...).
    image_tensor = keras.ops.convert_to_tensor(image, dtype="float32")
    label_tensor = keras.ops.convert_to_tensor([label])  # batch format for the loss

    # === Start of the backend-dependent section (TensorFlow here) ===
    with tf.GradientTape() as tape:
        tape.watch(image_tensor)  # track the image so its gradient is available
        prediction = model_two_logits(image_tensor, training=False)  # logits [-z, z]
        loss = loss_object(label_tensor, prediction)
    # Gradient of the loss with respect to the input image (via the backend).
    gradient = tape.gradient(loss, image_tensor)
    # === End of the backend-dependent section ===

    # Back to Keras Ops: step of size epsilon along the sign of the gradient.
    signed_grad = keras.ops.sign(gradient)
    adversarial_image = image_tensor + epsilon * signed_grad
    # Keep the result inside the valid input range.
    adversarial_image = keras.ops.clip(adversarial_image, clip_min, clip_max)

    return adversarial_image  # backend tensor (e.g. tf.Tensor)

# --- Usage example ---
# Runs the whole pipeline on a randomly generated image/label: build the
# single-logit model, wrap it into the 2-logit model, then compare the
# predictions before and after the FGSM perturbation.

# Define the input shape
input_shape = (28, 28, 1)

# 1. Create/load the original model
original_model = create_original_single_logit_model(input_shape)
print("Résumé du modèle original (1 logit) :")
original_model.summary()
print("-" * 30)

# 2. Convert to a 2-logit model [-z, z]
reflected_two_logits_model = convert_to_reflected_two_logits_model(original_model)
print("Résumé du modèle modifié (2 logits [-z, z]) :")
reflected_two_logits_model.summary()
print("-" * 30)

# 3. Generate a dummy image and label
dummy_image = np.random.rand(1, *input_shape).astype(np.float32)
dummy_label = np.random.randint(0, 2)

# Show the prediction of the original model (for comparison)
original_logit = original_model.predict(dummy_image)
original_prob = tf.nn.sigmoid(original_logit).numpy() # Probability via sigmoid
print(f"Logit original z: {original_logit[0, 0]:.4f}")
print(f"Probabilité originale P(classe=1): {original_prob[0, 0]:.4f}")
print("-" * 30)

# Show the prediction of the new model
reflected_logits = reflected_two_logits_model.predict(dummy_image)
predicted_class = keras.ops.argmax(reflected_logits, axis=-1) # argmax([-z, z])
print(f"Logits réfléchis [-z, z]: [{reflected_logits[0, 0]:.4f}, {reflected_logits[0, 1]:.4f}]")
# Use keras.ops.convert_to_numpy to get the value
print(f"Classe prédite par argmax([-z, z]): {keras.ops.convert_to_numpy(predicted_class)[0]}")
print("-" * 30)

# 4. Attack parameters
epsilon = 0.05

# 5. Generate the adversarial image
adversarial_example = fgsm_attack_keras_ops(reflected_two_logits_model, dummy_image, dummy_label, epsilon)
# Keras ops returns a tensor; convert it to numpy for display if needed
adversarial_example_np = keras.ops.convert_to_numpy(adversarial_example)

print(f"Image originale (forme) : {dummy_image.shape}")
print(f"Label original : {dummy_label}")
print(f"Image adverse générée (forme) : {adversarial_example_np.shape}")

# 6. Check the prediction on the adversarial image
adversarial_logits = reflected_two_logits_model.predict(adversarial_example)
adversarial_predicted_class = keras.ops.argmax(adversarial_logits, axis=-1)
print(f"Logits sur image adverse [-z, z]: [{adversarial_logits[0, 0]:.4f}, {adversarial_logits[0, 1]:.4f}]")
print(f"Classe prédite sur image adverse: {keras.ops.convert_to_numpy(adversarial_predicted_class)[0]}")

### Build 1-Lipschitz Model

Let's first make the parameters of this experiment explicit.


Now we can build the network. Here the experiment is done with an MLP, but `deel-lip`
also provides state-of-the-art 1-Lipschitz convolutions.


In [5]:
keras.utils.clear_session()
# Build the 1-Lipschitz MLP directly (no helper function is involved): a
# channel-first (1, 28, 28) input is flattened, passed through two
# SpectralDense layers (32 and 16 units) with GroupSort2 activations, and
# mapped to 2 outputs by a final SpectralDense layer with no activation and
# no bias.
model = Sequential(
    layers=[
        Input((1, 28, 28)),
        Flatten(),
        SpectralDense(32, GroupSort2(), use_bias=True),
        SpectralDense(16, GroupSort2(), use_bias=True),
        SpectralDense(2, activation=None, use_bias=False),
    ],
    name="lipModel",
)
model.summary()

I0000 00:00:1744792213.110150    7511 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 20710 MB memory:  -> device: 0, name: NVIDIA A10G, pci bus id: 0000:00:1e.0, compute capability: 8.6


In [9]:
# training parameters (read by the `model.fit` call below)
epochs = 100
batch_size = 128

# network parameters
# NOTE(review): `activation` is not referenced by any other cell shown in this
# notebook; the model above instantiates GroupSort2() directly.
activation = GroupSort2  # ReLU, MaxMin, GroupSort2

In [10]:
# MulticlassHKR is the multiclass hinge-regularized KR loss.
# Decreasing alpha and increasing min_margin improve robustness (at the cost of
# accuracy). Note also that, for Lipschitz networks, more robustness requires
# more parameters.
#
# A previous configuration, kept here for reference only:
#   loss=MulticlassHKR(alpha=10.0, min_margin=1.0)
#   metrics=[HingeMargin(min_margin=1.0), "accuracy"]
#   optimizer=Adam(learning_rate=0.001)
model.compile(
    loss=MulticlassHKR(alpha=100, min_margin=0.25),
    optimizer=Adam(1e-4),
    metrics=["accuracy", MulticlassKR()],
)

### Learn classification on MNIST

Now that the model is built, we can learn the task.


In [11]:
# Train on the 0-vs-8 subset and evaluate on the test split after each epoch.
model.fit(
    x_train,
    y_train,
    batch_size=batch_size,
    epochs=epochs,
    shuffle=True,
    validation_data=(x_test, y_test),
    verbose=1,
)

Epoch 1/100


I0000 00:00:1744205313.523012    5203 service.cc:148] XLA service 0x55a29cea5840 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
I0000 00:00:1744205313.523059    5203 service.cc:156]   StreamExecutor device (0): NVIDIA A10G, Compute Capability 8.6
2025-04-09 15:28:33.608933: I tensorflow/compiler/mlir/tensorflow/utils/dump_mlir_util.cc:268] disabling MLIR crash reproducer, set env var `MLIR_CRASH_REPRODUCER_DIRECTORY` to enable.
I0000 00:00:1744205313.897130    5203 cuda_dnn.cc:529] Loaded cuDNN version 90300
2025-04-09 15:28:34.054393: W external/local_xla/xla/service/gpu/nvptx_compiler.cc:930] The NVIDIA driver's CUDA version is 12.4 which is older than the PTX compiler version 12.5.82. Because the driver is older than the PTX compiler version, XLA is disabling parallel compilation, which may slow down compilation. You should update your NVIDIA driver or use the NVIDIA-provided CUDA forward compatibility packages.





[1m44/92[0m [32m━━━━━━━━━[0m[37m━━━━━━━━━━━[0m [1m0s[0m 4ms/step - MulticlassKR: 0.1574 - accuracy: 0.7114 - loss: 14.8275

I0000 00:00:1744205319.603158    5203 device_compiler.h:188] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


[1m92/92[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 42ms/step - MulticlassKR: 0.3092 - accuracy: 0.7963 - loss: 10.1911 - val_MulticlassKR: 0.8441 - val_accuracy: 0.9775 - val_loss: 0.3606
Epoch 2/100
[1m92/92[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - MulticlassKR: 0.9056 - accuracy: 0.9730 - loss: 0.2494 - val_MulticlassKR: 1.0879 - val_accuracy: 0.9872 - val_loss: -0.4126
Epoch 3/100
[1m92/92[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - MulticlassKR: 1.1289 - accuracy: 0.9838 - loss: -0.4260 - val_MulticlassKR: 1.2878 - val_accuracy: 0.9887 - val_loss: -0.7447
Epoch 4/100
[1m92/92[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - MulticlassKR: 1.3308 - accuracy: 0.9863 - loss: -0.7142 - val_MulticlassKR: 1.4732 - val_accuracy: 0.9903 - val_loss: -1.0057
Epoch 5/100
[1m92/92[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - MulticlassKR: 1.5083 - accuracy: 0.9878 - loss: -1.0211 - val_MulticlassK

<keras.src.callbacks.history.History at 0x7f71b34b1d50>

As we can see, the model reaches a very decent accuracy on this task.


In [12]:
# Export the trained model through its `vanilla_export()` method; the cells
# below use the result as a regular Keras model (summary, save, predict).
vanilla_model = model.vanilla_export()

In [13]:
# Print the layer-by-layer summary of the exported model.
vanilla_model.summary()

In [14]:
# Save the trained model.
# NOTE(review): hard-coded absolute path under /home/aws_install; adjust it
# before running this notebook on another machine. The file name contains
# `channelfirst_False` although the inputs were transposed to channel-first
# earlier in the notebook — confirm the intended naming.
model.save('/home/aws_install/robustess_project/lip_models/demo3_FC_MNIST08_channelfirst_False_disj_Neurons.keras')

In [15]:
# Save the exported model.
# NOTE(review): hard-coded absolute path under /home/aws_install; adjust it
# before running this notebook on another machine.
vanilla_model.save("/home/aws_install/robustess_project/lip_models/demo3_FC_vanilla_MNIST08_channelfirst_False_disj_Neurons.keras")

In [16]:
# Keep a handle on the exported model's last layer, then assemble a new
# Sequential model that reuses every earlier layer and ends with a fresh
# 4-unit linear Dense layer instead.
layer = vanilla_model.layers[-1]
new_dense = Dense(4, activation=None, use_bias=True)
vanilla_model_bis = keras.models.Sequential(
    [*vanilla_model.layers[:-1], new_dense]
)

In [17]:
# Call the new layer on the symbolic input of the original last layer. The
# cell output is a KerasTensor of shape (None, 4); presumably this call also
# creates the layer's weights so that `set_weights` can be used afterwards —
# TODO confirm. (Original note: "compile and erase weights".)
new_dense(layer.input)

<KerasTensor shape=(None, 4), dtype=float32, sparse=False, ragged=False, name=keras_tensor_9>

In [18]:
# Kernel of the original last layer, zero-padded from (16, 2) to (16, 4):
# the first two columns are copied, the two extra columns stay at zero.
w = layer.get_weights()[0]  # (16, 2)
w_temp = np.zeros((16, 4), dtype="float32")
w_temp[:, :2] = w

# Bias: 0 for the two original units, -10000 for the two extra units, so the
# extra logits sit at a constant -10000.
b_temp = np.zeros((4,))
b_temp[2:] = -10000

In [19]:
# Load the padded kernel and bias into the 4-unit layer.
new_dense.set_weights([w_temp, b_temp])

In [20]:
# Print the layer-by-layer summary of the 4-logit model.
vanilla_model_bis.summary()

In [21]:
# Sanity check on the first test sample: the first two logits are expected to
# match the 2-logit model's prediction on the same sample, and the last two
# to equal the -10000 bias.
vanilla_model_bis.predict(x_test[0:1])

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 273ms/step


array([[-2.5456893e+00,  2.5312362e+00, -1.0000000e+04, -1.0000000e+04]],
      dtype=float32)

In [22]:
# Reference prediction of the 2-logit exported model on the first test sample.
vanilla_model.predict(x_test[0:1])

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 262ms/step


array([[-2.5456893,  2.5312362]], dtype=float32)

In [23]:
# Save the 4-logit model.
# NOTE(review): hard-coded absolute path under /home/aws_install; adjust it
# before running this notebook on another machine.
vanilla_model_bis.save("/home/aws_install/robustess_project/lip_models/demo3_FC_vanilla_MNIST08_channelfirst_False_disj_Neurons_4logits.keras")