# This repo contains the following:
- Code for CNN-based classification of the MNIST dataset
- Code for PointNet-based classification of the MNIST3D dataset
- Code for fusion-based classification of the MNIST and MNIST3D datasets

# Load Libraries

In [None]:
# Set-up and load libraries
import os
import glob
import h5py
import numpy as np
import pandas as pd


# Tensorflow and Keras...
import tensorflow as tf
from tensorflow import keras
from keras import layers
from keras.models import Sequential
from keras.layers import BatchNormalization
from keras.layers import Conv2D
from keras.layers import MaxPooling2D
from keras.layers import Activation
from keras.layers import Dropout
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import Input
from keras.layers import concatenate
from keras.layers import GlobalMaxPooling1D
from keras.utils import plot_model
from keras.models import Model


# plot and others...
from matplotlib import pyplot as plt
import trimesh
import seaborn as sns
from matplotlib.animation import FuncAnimation
from sklearn.metrics import classification_report
# Set seeds for reproducibility.
# TensorFlow's seed alone is not enough: the point-cloud sub-sampling and the
# sample shuffling in this notebook go through NumPy's global generator
# (np.random.choice / np.random.shuffle), so that generator is seeded as well.
np.random.seed(1234)
tf.random.set_seed(1234)

# Config and parameters

In [None]:
# --- Input geometry ---
num_cloud_points = 1024                     # points kept from every 3D sample
point_cloud_size = (num_cloud_points, 3)    # per-sample point-cloud shape
image_size = (30, 30, 1)                    # per-sample image shape
num_classes = 10

# --- Training schedule ---
# `patience` is the early-stopping patience handed to the training helper.
epochs, batch_size, patience = 5, 32, 2

# 'full' keeps every loaded sample; 'demo' keeps only a small random subset.
run_type = 'full'

# Load MNIST (2D + 3D) Dataset

In [None]:
# Data directory holding the `<split>_point_clouds.h5` files read by load_data.
# The trailing slash matters: file paths are built by plain string concatenation.
DATADIR = '../data/multimodal/mnist3d/'

In [None]:
def load_data(data_type, num_pcs):
    '''
    Load one split of the dataset: 2D images, 3D point clouds and labels.

    The split is read from DATADIR + f'{data_type}_point_clouds.h5'. Every
    sample contributes its image, `num_pcs` randomly selected rows of its
    point cloud and its label. All samples are then shuffled; when the
    module-level `run_type` is 'demo' only 100 of the shuffled samples are
    kept.

    arg:
    data_type: 'train' or 'test'
    num_pcs: number of points to keep from each point cloud

    return:
    x_2d: 2D images, reshaped to (-1, *image_size)
    x_3d: 3D point clouds, one (num_pcs, ...) array per sample
    y: labels as one-hot rows with `num_classes` columns
    '''
    print(f"Loading {data_type} data......")

    # Per-sample buffers, stacked into arrays once the file is closed.
    x_2d = []
    x_3d = []
    y = []

    with h5py.File(DATADIR + f'{data_type}_point_clouds.h5', 'r') as points_dataset:

        # Looping over the whole dataset..
        for sample in points_dataset.values():

            # Saving images...
            x_2d.append(sample['img'][:])

            # Sampling point cloud and saving...
            point_cloud = sample['points'][:]
            num_of_rows = point_cloud.shape[0]
            # Sampling without replacement needs at least num_pcs rows. For a
            # smaller cloud fall back to sampling with replacement so that
            # every sample still ends up with num_pcs points instead of
            # np.random.choice raising ValueError.
            random_ind = np.random.choice(
                num_of_rows, size=num_pcs, replace=num_of_rows < num_pcs)
            x_3d.append(point_cloud[random_ind, :])

            # Saving labels...
            y.append(sample.attrs['label'])

    # converting to np array...
    x_2d = np.stack(x_2d).reshape(-1, *image_size)
    x_3d = np.stack(x_3d)

    # convert class vectors to binary class matrices
    y = keras.utils.to_categorical(np.array(y), num_classes)

    # Shuffled sample order; truncated below when running as a demo.
    total_samples = x_3d.shape[0]
    random_indices = np.arange(total_samples)
    np.random.shuffle(random_indices)

    # checking the run_type
    if run_type == 'demo':
        print(f"Original shape of 2D samples: {x_2d.shape}")
        print(f"Original shape of 3D samples: {x_3d.shape}")
        demo_samples = 100
        random_indices = random_indices[0:demo_samples]

    # Apply the same selection to images, point clouds and labels so that
    # the three arrays stay aligned.
    x_2d = x_2d[random_indices]
    x_3d = x_3d[random_indices]
    y = y[random_indices]

    # Done loading and processing dataset...
    print(f"_______________________________________________")
    print(f"Loaded shape of 2D samples: {x_2d.shape}")
    print(f"Loaded shape of 3D samples: {x_3d.shape}")
    print("\n")
    # return image, pointclouds, and labels
    return x_2d, x_3d, y

In [None]:
# Training split (labels are returned one-hot; keep an integer copy as well)
x_train_2d, x_train_3d, y_train = load_data(data_type='train', num_pcs=num_cloud_points)
y_train_digit = y_train.argmax(axis=1)

# Test split, plus a one-column frame of the integer labels
x_test_2d, x_test_3d, y_test = load_data(data_type='test', num_pcs=num_cloud_points)
y_test_digit = y_test.argmax(axis=1)
y_test_df = pd.DataFrame(y_test_digit, columns=['Label'])

# Model inputs keyed by model type ('Fusion' models take both modalities)
x_train_dict = {
    '2D': x_train_2d,
    '3D': x_train_3d,
    'Fusion': [x_train_2d, x_train_3d],
}

x_test_dict = {
    '2D': x_test_2d,
    '3D': x_test_3d,
    'Fusion': [x_test_2d, x_test_3d],
}

# Stratified selection of the test samples used for adversarial-sample
# generation: one randomly drawn sample per label.
# GroupBy.sample keeps the 'Label' column and the original row index, and does
# not rely on groupby(...).apply seeing the grouping column (that behaviour is
# deprecated in recent pandas releases).
y_test_df_samp = y_test_df.groupby('Label').sample(n=1)
test_ind = y_test_df_samp.index

# Clean inputs, keyed by model type, as tensors so gradients can be taken
# with respect to them.
x_actual_dict = {}
x_actual_dict['2D'] = tf.convert_to_tensor(x_test_2d[test_ind])
x_actual_dict['3D'] = tf.convert_to_tensor(x_test_3d[test_ind])
x_actual_dict['Fusion'] = [x_actual_dict['2D'], x_actual_dict['3D']]
# Matching labels: one-hot tensor for the loss, integer digits for reports.
y_actual = tf.convert_to_tensor(y_test[test_ind])
y_actual_digit = y_test_df_samp['Label'].values


# Loss object..
loss_func = tf.keras.losses.CategoricalCrossentropy()

# Creating Models to extract features

In [None]:
# Functions for building the models.......

# Point-wise convolution block with batch normalization...
def conv_bn(x, filters):
    """Apply Conv1D (kernel size 1) -> BatchNormalization -> ReLU to `x`.

    `filters` is the number of output channels of the convolution.
    """
    convolved = layers.Conv1D(filters, kernel_size=1, padding="valid")(x)
    normalised = layers.BatchNormalization(momentum=0.0)(convolved)
    return layers.Activation("relu")(normalised)

# Fully connected block with batch normalization...
def dense_bn(x, filters):
    """Apply Dense -> BatchNormalization -> ReLU to `x`.

    `filters` is the number of units of the Dense layer.
    """
    projected = layers.Dense(filters)(x)
    normalised = layers.BatchNormalization(momentum=0.0)(projected)
    return layers.Activation("relu")(normalised)

# Transformation Network (T-Net)...
def tnet(inputs, num_features):
    """Predict a (num_features x num_features) matrix and apply it to `inputs`.

    The matrix is regressed from `inputs` by a small network (three conv_bn
    blocks, global max pooling, two dense_bn blocks and a final Dense layer).
    That final layer starts with a zero kernel and an identity-matrix bias,
    so the predicted transform is the identity before any training.
    """
    # Flattened identity matrix used as the initial bias of the last layer
    identity_bias = keras.initializers.Constant(np.eye(num_features).flatten())

    x = inputs
    for width in (32, 64, 512):
        x = conv_bn(x, width)
    x = layers.GlobalMaxPooling1D()(x)
    for width in (256, 128):
        x = dense_bn(x, width)

    flat_transform = layers.Dense(
        num_features * num_features,
        kernel_initializer="zeros",
        bias_initializer=identity_bias,
    )(x)
    feat_T = layers.Reshape((num_features, num_features))(flat_transform)

    # Multiply the input features by the predicted matrix
    return layers.Dot(axes=(2, 1))([inputs, feat_T])


In [None]:
# CNN feature extractor: image -> 128 features (2D MNIST dataset)
inputs_2D = Input(shape=image_size)

# Two convolution + max-pooling stages (32 then 64 filters)
feat_2d = inputs_2D
for n_filters in (32, 64):
    feat_2d = Conv2D(n_filters, kernel_size=(3, 3), activation="relu")(feat_2d)
    feat_2d = MaxPooling2D(pool_size=(2, 2))(feat_2d)

# Flatten, regularise with dropout and project to the 128-d feature vector
feat_2d = Flatten()(feat_2d)
feat_2d = Dropout(0.3)(feat_2d)
feat_2d = Dense(128, activation="relu")(feat_2d)

# Wrap the graph as a model that maps input images to their features
model_feat_ext_2D = Model(
    inputs=inputs_2D,
    outputs=feat_2d,
    name="feature-extractor-2D",
)
print(model_feat_ext_2D.summary())

In [None]:
# PointNet feature extractor: point cloud -> 128 features (3D MNIST dataset)
inputs_3D = Input(shape=point_cloud_size)

# Transform on the 3 input coordinates, then two 32-filter point-wise blocks
feat_3D = tnet(inputs_3D, 3)
for _ in range(2):
    feat_3D = conv_bn(feat_3D, 32)

# Transform on the 32 features, then widen to 512 channels
feat_3D = tnet(feat_3D, 32)
for width in (32, 64, 512):
    feat_3D = conv_bn(feat_3D, width)

# Pool over the points and project down to the 128-d feature vector
feat_3D = GlobalMaxPooling1D()(feat_3D)
feat_3D = dense_bn(feat_3D, 256)
feat_3D = Dropout(0.3)(feat_3D)
feat_3D = dense_bn(feat_3D, 128)

# Wrap the graph as a model that maps input point clouds to their features
model_feat_ext_3D = Model(
    inputs=inputs_3D,
    outputs=feat_3D,
    name="feature-extractor-3D",
)
print(model_feat_ext_3D.summary())


In [None]:
def train_and_evaluate(model_name, model_type, model, epochs, batch_size, patience):
    """
    Fit `model`, score it on the test split and collect the results.

    Training and test inputs are looked up by `model_type` in the module-level
    `x_train_dict` / `x_test_dict`; the labels are the module-level `y_train`,
    `y_test` and `y_test_digit`.

    arg:
    model_name: label used in the printed message, plot title and result
    model_type: key into the input dictionaries ('2D', '3D' or 'Fusion')
    model: compiled keras model
    epochs, batch_size: passed to model.fit
    patience: early-stopping patience on the validation loss

    return:
    dict with keys 'model', 'model_name', 'model_type', 'history',
    'accuracy' and 'cls_rep'
    """
    # Train with early stopping on a 10% validation split
    early_stopping = tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=patience)
    history = model.fit(
        x_train_dict[model_type],
        y_train,
        batch_size=batch_size,
        epochs=epochs,
        validation_split=0.1,
        callbacks=[early_stopping],
    )

    # Second entry of the evaluation result is the accuracy metric
    test_inputs = x_test_dict[model_type]
    accuracy = model.evaluate(test_inputs, y_test, batch_size=56)[1]
    headline = f"Test Accuracy of {model_name}: {np.round(accuracy*100,5)}%"
    print(headline)

    # Training-loss curve, titled with the test accuracy
    plt.figure()
    plt.plot(history.history['loss'], marker = 'p')
    plt.title(headline)
    plt.show()

    # Per-class report on the test split
    predicted_digits = np.argmax(model.predict(test_inputs), axis=1)
    report = classification_report(y_test_digit, predicted_digits)

    return {
        'model': model,
        'model_name': model_name,
        'model_type': model_type,
        'history': history,
        'accuracy': accuracy,
        'cls_rep': report,
    }

def visualize_prediction(model_name, model_type, model):
    """
    Visualize the prediction performance on the first few test samples.

    For '2D' and '3D' models the first 10 test samples are drawn in a 2x5
    grid. For 'Fusion' models the first 5 samples are drawn: images on the
    top row and the matching point clouds on the bottom row.

    arg:
    model_name: name of the model (currently not used by the plot)
    model_type: '2D' , '3D' or 'Fusion'
    model: keras model

    return:
    None. Prints the predicted and true label of every shown sample and
    displays the figure.

    raise:
    ValueError: if model_type is not '2D', '3D' or 'Fusion'
    """
    # Fail early with a clear message instead of hitting an unbound
    # `input_data` further down.
    if model_type not in ('2D', '3D', 'Fusion'):
        raise ValueError(
            f"model_type must be '2D', '3D' or 'Fusion', got {model_type!r}")

    is_fusion = model_type == 'Fusion'
    show_images = model_type in ('2D', 'Fusion')
    show_points = model_type in ('3D', 'Fusion')

    # Fusion needs two subplots per sample, so only half as many samples fit
    # into the 2x5 grid.
    noOfsamples = 5 if is_fusion else 10
    images, points, labels = x_test_2d[:noOfsamples], x_test_3d[:noOfsamples], y_test[:noOfsamples]

    # Setting up input data for the prediction
    if is_fusion:
        input_data = [images, points]
    elif show_images:
        input_data = images
    else:
        input_data = points

    # run test data through model
    preds = model.predict(input_data)
    preds = tf.math.argmax(preds, -1)
    labels = np.argmax(labels, axis = 1)

    # plot samples with predicted class and label
    fig = plt.figure(figsize=(12, 5))
    for i in range(noOfsamples):
        pred = preds[i].numpy()
        print(f"pred: {pred}, label: {labels[i]}")
        title = "Pred: {:}, Label: {:}".format(pred, labels[i])

        if show_images:
            ax = fig.add_subplot(2, 5, i + 1)
            ax.imshow(images[i, :, :, 0])
            ax.set_title(title)
            ax.set_axis_off()

        if show_points:
            # In fusion mode the point clouds go on the second row, below
            # their images; the image subplot already carries the title.
            index = noOfsamples + i + 1 if is_fusion else i + 1
            ax = fig.add_subplot(2, 5, index, projection="3d")
            ax.scatter(points[i, :, 0], points[i, :, 1], points[i, :, 2], marker='.')
            if not is_fusion:
                ax.set_title(title)
            ax.set_xticks([])
            ax.grid(True)
            ax.view_init(0, 10)
    plt.show()

In [None]:
# Evaluation results of every trained model, keyed by model name
model_eval_dict = dict()

## Creating 2D CNN-based Classifier

In [None]:
# Classification head: extracted 2D features -> class probabilities
feats_extracted_2D = Input(shape=feat_2d.shape[1:])
output_pred_2D = Dense(num_classes, activation="softmax")(feats_extracted_2D)
model_pred_scores_2D = Model(feats_extracted_2D, output_pred_2D)

# Complete CNN classifier for MNIST 2D: feature extractor followed by the head
model_2D_mnist = keras.Model(
    inputs_2D,
    model_pred_scores_2D(model_feat_ext_2D(inputs_2D)),
)
model_2D_mnist.compile(
    optimizer="adam",
    loss="categorical_crossentropy",
    metrics=["accuracy"],
)
print(model_2D_mnist.summary())

model_name, model, model_type = 'model_2D_mnist', model_2D_mnist, '2D'

# Train and evaluate...
model_eval_dict[model_name] = train_and_evaluate(
    model_name=model_name,
    model_type=model_type,
    model=model,
    epochs=epochs,
    batch_size=batch_size,
    patience=patience,
)

# Visualize the predictions on 2D images
visualize_prediction(model_name=model_name, model_type=model_type, model=model)

## Creating 3D Pointnet-based Classifier

In [None]:
# Classification head: extracted 3D features -> class probabilities
feats_extracted_3D = Input(shape=feat_3D.shape[1:])
output_pred_3D = Dense(num_classes, activation="softmax")(feats_extracted_3D)
model_pred_scores_3D = Model(feats_extracted_3D, output_pred_3D)

# Complete PointNet classifier for MNIST 3D: feature extractor followed by the head
model_3D_mnist = keras.Model(
    inputs_3D,
    model_pred_scores_3D(model_feat_ext_3D(inputs_3D)),
)
model_3D_mnist.compile(
    optimizer="adam",
    loss="categorical_crossentropy",
    metrics=["accuracy"],
)
print(model_3D_mnist.summary())

model_name, model, model_type = 'model_3D_mnist', model_3D_mnist, '3D'

# Train and evaluate...
model_eval_dict[model_name] = train_and_evaluate(
    model_name=model_name,
    model_type=model_type,
    model=model,
    epochs=epochs,
    batch_size=batch_size,
    patience=patience,
)

# Visualize the predictions on 3D point clouds
visualize_prediction(model_name=model_name, model_type=model_type, model=model)

# Create Fusion Models

In [None]:
# Merge layers to compare in the fusion models.
# Minimum, Add, Subtract and Multiply are imported as further candidates but
# are not enabled in either dictionary.
from keras.layers import Concatenate, Average,Maximum, Minimum, Add, Subtract, Multiply

# Mid fusion: applied to the outputs of the two feature extractors
mid_fusion_layers_dict = dict(
    concatenate=Concatenate(),
    Average=Average(),
)

# Late fusion: applied to the outputs of the two complete classifiers
late_fusion_layers_dict = dict(
    Average=Average(),
    Maximum=Maximum(),
)

In [None]:
# Freeze the feature extractors, the classification heads and the complete
# single-modality classifiers before they are reused in the fusion models.
# (Use `.trainable = True` on these models instead to fine-tune them.)
for frozen_model in (
    model_feat_ext_2D,
    model_feat_ext_3D,
    model_pred_scores_2D,
    model_pred_scores_3D,
    model_2D_mnist,
    model_3D_mnist,
):
    frozen_model.trainable = False

## Creating Late-Fusion Models

In [None]:
# Build, train and inspect one late-fusion model per configured merge layer
for fusion_layer_name, fusion_layer in late_fusion_layers_dict.items():

    print(fusion_layer_name)
    model_name = f"model_late_fusion_mnist_{fusion_layer_name}"
    model_type = 'Fusion'

    # Merge the outputs of the 2D and 3D classifiers
    output_pred_late_fusion = fusion_layer([model_2D_mnist.output, model_3D_mnist.output])

    # Single multi-modal model taking both the image and the point cloud
    model_late_fusion_mnist = Model(
        inputs=[inputs_2D, inputs_3D],
        outputs=output_pred_late_fusion,
        name=model_name,
    )
    model_late_fusion_mnist.compile(
        optimizer="adam",
        loss="categorical_crossentropy",
        metrics=["accuracy"],
    )
    model = model_late_fusion_mnist

    # Train and evaluate...
    model_eval_dict[model_name] = train_and_evaluate(
        model_name=model_name,
        model_type=model_type,
        model=model,
        epochs=epochs,
        batch_size=batch_size,
        patience=patience,
    )

    # Visualize the predictions on paired images and point clouds
    visualize_prediction(model_name=model_name, model_type=model_type, model=model)

## Creating Mid Fusion Models

In [None]:
# Build, train and inspect one mid-fusion model per configured merge layer
for fusion_layer_name, fusion_layer in mid_fusion_layers_dict.items():

    print(fusion_layer_name)
    model_name = f"model_mid_fusion_mnist_{fusion_layer_name}"
    model_type = 'Fusion'

    # Merge the outputs of the two feature extractors, then classify the
    # merged features with a small dense head
    output_pred_mid_fusion = fusion_layer([model_feat_ext_2D.output, model_feat_ext_3D.output])
    for hidden_units in (128, 64):
        output_pred_mid_fusion = Dense(hidden_units, activation="relu")(output_pred_mid_fusion)
    output_pred_mid_fusion = Dense(num_classes, activation="softmax")(output_pred_mid_fusion)

    # Single multi-modal model taking both the image and the point cloud
    model_mid_fusion_mnist = Model(
        inputs=[inputs_2D, inputs_3D],
        outputs=output_pred_mid_fusion,
        name=model_name,
    )
    model_mid_fusion_mnist.compile(
        optimizer="adam",
        loss="categorical_crossentropy",
        metrics=["accuracy"],
    )
    model = model_mid_fusion_mnist

    # Train and evaluate...
    model_eval_dict[model_name] = train_and_evaluate(
        model_name=model_name,
        model_type=model_type,
        model=model,
        epochs=epochs,
        batch_size=batch_size,
        patience=patience,
    )

    # Visualize the predictions on paired images and point clouds
    visualize_prediction(model_name=model_name, model_type=model_type, model=model)

In [None]:
# One [name, accuracy] row per evaluated model, in evaluation order
accuracy_list = [
    [name, evaluation['accuracy']]
    for name, evaluation in model_eval_dict.items()
]
accuracy_df = pd.DataFrame(accuracy_list, columns=['Name', 'Accuracy'])

In [None]:
# Visualize performance: one horizontal bar per model.
# The frame is passed through the `data` keyword so the call does not depend
# on which parameter seaborn accepts first positionally.
sns.barplot(data=accuracy_df, y="Name", x="Accuracy")

In [None]:
# Visualize performance: one point per model.
# The frame is passed through the `data` keyword so the call does not depend
# on which parameter seaborn accepts first positionally.
sns.scatterplot(data=accuracy_df, y="Name", x="Accuracy")

# Adversarial attacks

## FGSM Attacks

### Test data on which we will create adversarial samples

In [None]:
# y_test_df_samp = y_test_df.groupby('Label', group_keys=False).apply(lambda x: x.sample(5))
# test_ind = y_test_df_samp.index

# # Creating test samples with statified sampling...
# x_test_2d_tesnor = tf.convert_to_tensor(x_test_2d[test_ind])
# x_test_3d_tesnor = tf.convert_to_tensor(x_test_3d[test_ind])
# x_test_fusion_tesnor = [x_test_2d_tesnor, x_test_3d_tesnor]
# input_label = tf.convert_to_tensor(y_test[test_ind])

# input_tensor = x_test_fusion_tesnor
# # Loss object..
# loss_func = tf.keras.losses.CategoricalCrossentropy()



In [None]:
# Empty container for perturbations
perturb_dict = dict()

In [None]:
# FGSM: sign of the loss gradient with respect to the model inputs
def create_adversarial_pattern(model, model_type):
    """
    Compute the FGSM perturbation signs for the module-level clean samples.

    The inputs are `x_actual_dict[model_type]`, the labels are `y_actual` and
    the loss is `loss_func`.

    arg:
    model: keras model to attack
    model_type: '2D', '3D' or 'Fusion'

    return:
    dict of numpy arrays holding the sign of the loss gradient with respect
    to the inputs: keys '2D' and '3D' for a 'Fusion' model, otherwise the
    single key `model_type`
    """
    input_data = x_actual_dict[model_type]

    # Record the forward pass so the loss can be differentiated with respect
    # to the inputs (tensors are not watched automatically).
    with tf.GradientTape() as tape:
        tape.watch(input_data)
        loss = loss_func(y_actual, model(input_data))

    gradient = tape.gradient(loss, input_data)

    if model_type == 'Fusion':
        # Two inputs -> one gradient per modality
        grad_2d, grad_3d = gradient[0], gradient[1]
        return {
            '2D': tf.sign(grad_2d).numpy(),
            '3D': tf.sign(grad_3d).numpy(),
        }

    # Single input
    return {model_type: tf.sign(gradient).numpy()}

In [None]:
# Perturbation strengths to evaluate (0 leaves the samples unchanged).
# A finer sweep would be e.g. [0, 0.005, 0.01, 0.05, 0.1].
epsilons = [0, 0.1]

# Perturbation signs per model name, and the collected evaluation rows
x_avd_sign_dict = dict()
model_eval_adv = list()

In [None]:
from tqdm import tqdm

In [None]:
# Attack every trained model and record its accuracy on the perturbed samples
for model_name, trained in model_eval_dict.items():

    model = trained['model']
    model_type = trained['model_type']

    # Generate the perturbation signs for this model...
    x_avd_sign_dict[model_name] = create_adversarial_pattern(model, model_type)
    signs = x_avd_sign_dict[model_name]

    # A modality the model does not consume gets a single dummy epsilon, so
    # the nested loops below run once over it.
    epsilons_2d = [0.0] if model_type == '3D' else epsilons
    epsilons_3d = [0.0] if model_type == '2D' else epsilons

    uses_2d = model_type in ('2D', 'Fusion')
    uses_3d = model_type in ('3D', 'Fusion')

    # Evaluate performance for every combination of epsilons
    for eps_2d in tqdm(epsilons_2d):
        for eps_3d in tqdm(epsilons_3d):

            x_del_dict = {}
            x_adv_dict = {}

            # Adversarial sample = clean sample + epsilon * gradient sign
            if uses_2d:
                x_del_dict['2D'] = signs['2D'] * eps_2d
                x_adv_dict['2D'] = x_actual_dict['2D'] + x_del_dict['2D']

            if uses_3d:
                x_del_dict['3D'] = signs['3D'] * eps_3d
                x_adv_dict['3D'] = x_actual_dict['3D'] + x_del_dict['3D']

            if model_type == 'Fusion':
                x_adv_dict['Fusion'] = [x_adv_dict['2D'], x_adv_dict['3D']]

            adv_inputs = x_adv_dict[model_type]

            # Accuracy on the adversarial data...
            accuracy_adv = model.evaluate(adv_inputs, y_actual, batch_size=32)[1]
            print(f"Test Accuracy of {model_name}: {np.round(accuracy_adv*100,5)}%")

            # Per-class report on the adversarial data...
            y_adv_pred = model.predict(adv_inputs, batch_size = 32)
            y_adv_pred_digit = np.argmax(y_adv_pred, axis=1)
            cls_rep_adv = classification_report(y_actual_digit, y_adv_pred_digit)

            model_eval_adv.append(
                [model_name, model_type, eps_2d, eps_3d, accuracy_adv, cls_rep_adv]
            )

                




In [None]:
# Tabulate the adversarial evaluation: one row per model and epsilon pair
adv_columns = [
    'model_name',
    'model_type',
    'eps_2d',
    'eps_3d',
    'accuracy_adv',
    'cls_rep_adv',
]
model_eval_adv_df = pd.DataFrame(model_eval_adv, columns=adv_columns)
model_eval_adv_df

In [None]:
model_eval_adv_df