# Gamma/neutron discrimination based on ML

## Machine learning - Training and compression

--- 

- Workflow based on R. S. Molina, I. R. Morales, M. L. Crespo, V. G. Costa, S. Carrato and G. Ramponi, "An End-to-End Workflow to Efficiently Compress and Deploy DNN Classifiers on SoC/FPGA", in IEEE Embedded Systems Letters, vol. 16, no. 3, pp. 255-258, Sept. 2024, doi: 10.1109/LES.2023.3343030.

- Code adapted from the official repository of "An End-to-End Workflow to Efficiently Compress and Deploy DNN Classifiers on SoC/FPGA"

- Using open dataset from: https://doi.org/10.5281/zenodo.8037058

----

## Import libraries


In [None]:
import os
import numpy as np
from numpy import array
import matplotlib.pyplot as plt
import seaborn as sn
import pandas as pd

## Tensorflow + Keras libraries
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import Sequential
from tensorflow.keras.models import *
from tensorflow.keras.layers import *
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import SGD, Adam
from tensorflow.keras.regularizers import l2, l1

import tensorflow_model_optimization as tfmot

## Pruning
from tensorflow_model_optimization.python.core.sparsity.keras import prune, pruning_callbacks, pruning_schedule
from tensorflow_model_optimization.sparsity.keras import strip_pruning

## Quantization
from qkeras import *

## Knowledge Distillation
from distillationClassKeras import *

## Metrics
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.utils import shuffle

## Pre-processing
from sklearn.preprocessing import MinMaxScaler, StandardScaler, LabelEncoder

## Training
from sklearn.model_selection import train_test_split

import hls4ml

import plotting

tf.random.set_seed(42)


## Auxiliar functions

In [None]:
# Function to define the training and testing datasets

def preproc_dataset_(signal_dfN, signal_dfG):
    
    # Label in csv file corresponds to the signal class
    _LABEL_COLUMN = 'class'
    
    dfTest = pd.DataFrame()
    dfTrain = pd.DataFrame()
    
    signal_dfN = shuffle(signal_dfN)
    signal_dfG = shuffle(signal_dfG)
    
    signal_df = pd.concat([signal_dfN, signal_dfG])
    
    for k in range(0,2):
     
        df2 = signal_df[signal_df[_LABEL_COLUMN].isin([k])]
        
        df_tr = df2[:10000]
        df_t = df2[10001:10900]
        
        dfTrain = pd.concat([df_tr, dfTrain])   
        dfTest = pd.concat([df_t, dfTest])
        
    
    return dfTrain, dfTest

---

## Dataset

In [None]:
# Define path to the dataset for training

PATH = '../dataset/'

GAMMA_DATASET_FILE = PATH + 'gamma_label.csv'
NEUTRON_DATASET_FILE = PATH + 'neutron_label.csv'

TEST_DATASET_FILE = PATH + 'test.csv'

In [None]:
# Load datasets in Pandas data frame 

dfGamma = pd.read_csv(GAMMA_DATASET_FILE)
dfNeutron = pd.read_csv(NEUTRON_DATASET_FILE)
dfTestGN = pd.read_csv(TEST_DATASET_FILE)

In [None]:
# Pre-processing dataset for training 
df_train, dfTest = preproc_dataset_(dfNeutron, dfGamma)


In [None]:
df_train_ = df_train.pop('class')
dfTest_ = dfTest.pop('class')

# One-hot encoder
le = LabelEncoder()
y = le.fit_transform(df_train_)
y = to_categorical(df_train_, 2)

le = LabelEncoder()
yTest = le.fit_transform(dfTest_)
yTest = to_categorical(dfTest_, 2)

# Split training dataset into training and validation
x_train, x_val, y_train, y_val = train_test_split(df_train, y, test_size=0.3, random_state=42)

---

## Machine leraning - Training and compression

### Teacher training

#### Hyperparameters for the teacher architecture

In [None]:
# Define the hyperparameters for the teacher model
lr = 0.001
neurons_teacher = [10, 5, 7, 5, 6]

#### Architecture

The teacher architecture is shown in the next figure. It is composed of five hidden layers, an input of 161 elements, and an output of two elements that correspond to gamma and neutron classification.


<p align="center">
<img src = "img/teacherArchitecture.png" width = 50%>
      <figcaption style = "text-align: center; font-style: italic">Teacher architecture.</figcaption>
</p>

In [None]:
# This function defines the teacher architecture composed of six dense layers.
# bestHP contains the number of neurons per each dense layer.


def teacher_topology(bestHP):

    teacher = keras.Sequential(
        [
            keras.Input(shape=(161, )),
             
            Dense(bestHP[0], name='fc1', input_shape=(161,), kernel_regularizer=l2(0.001),),
            Activation(activation='relu', name='relu1'),       

            Dense(bestHP[1], name='fc2', kernel_regularizer=l2(0.001)),
            Activation(activation='relu', name='relu2'),
        
            Dense(bestHP[2], name='fc3', kernel_regularizer=l2(0.001)),
            Activation(activation='relu', name='relu3'),      
            Dropout(0.1),    
            
            Dense(bestHP[3], name='fc4', kernel_regularizer=l2(0.001)),
            Activation(activation='relu', name='relu4'), 
            Dropout(0.2), 

            Dense(bestHP[4], name='fc5'),
            Activation(activation='relu', name='relu5'), 
            Dropout(0.2),            

            Dense(2, name='output'),
            Activation(activation='sigmoid', name='activationOutput'),
            
        ],
        name="teacher_MLP",
    )

    teacher.summary()


    return teacher

#### Build teacher model

In [None]:

def build_teacher(neurons_teacher):

    model = teacher_topology(neurons_teacher)
       
    opt = Adam(lr)
    
    model.compile(optimizer=opt, loss=['binary_crossentropy'], metrics=['accuracy'])
    
    return model

The teacher is defined by means of the `build_teacher()` function. 

In [None]:
teacher_model = build_teacher(neurons_teacher)

#### Training of the teacher model

This steps performs the neural network training through `teacher_model.fit()`. 

- _x_ and _y_ are the defined as _x_train_ and _y_train_, respectively. The validation data corresponds to _x_val_ and _y_val_. 

- The batch size is defined as 64, while the number of epochs is 32. 

- Callbacks are used to early stop the training process if no improvement in loss is obtained during training (_EarlyStopping_), and the reduction of the learning rate based on the accuracy metric (_ReduceLROnPlateau_). 

In [None]:
callbacks = [
            tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=10, verbose=1, restore_best_weights=True),
            tf.keras.callbacks.ReduceLROnPlateau(monitor='accuracy', factor=0.4, patience=3, verbose=1)
            ] 

history_teacher  = teacher_model.fit(x=x_train, y=y_train,
                  validation_data=(x_val, y_val), 
                  batch_size = 64,
                  epochs=32,
                  callbacks = [callbacks],
                  verbose=1
                  )   


#### Plot accuracy and loss 

Plots of the behavior of accuracy and loss during training are generated.

In [None]:
print(history_teacher.history.keys())

## Plot for accuracy
plt.figure(figsize=(15,3))
plt.plot(history_teacher.history['accuracy'])
plt.plot(history_teacher.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()
plt.figure()

## Plot for loss
plt.figure(figsize=(15,3))
plt.plot(history_teacher.history['loss'])
plt.plot(history_teacher.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()


For this project, a pre-trained model can be loaded to skip the training step. To enable this, uncomment the following lines.

In [None]:
# teacher_model = load_model('models/teacherModel_GN_smr4078.h5')
# teacher_model.summary()

#### Performance analysis

In [None]:
# Obtain the confusion matrix using the testing dataset 
y_pred_probs = teacher_model.predict(dfTest)
y_pred = np.argmax(y_pred_probs, axis=1)

# Since y_test is one-hot encoded, you need to convert it back to class indices
y_true = np.argmax(yTest, axis=1)  # Convert one-hot encoded labels to class indices

cm = confusion_matrix(y_true, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot(cmap="Blues")
plt.title('Confusion Matrix - Teacher model')
plt.show()


In [None]:
# Weight distribution

weights = np.concatenate([w.flatten() for w in teacher_model.get_weights()])

plt.figure(figsize=(10,2))
plt.hist(weights, bins=60, color='green', alpha=0.6)
plt.xlabel("Weight Value")
plt.ylabel("Frequency")
plt.title("Model MLP for G/N discrimination - Weight Distribution")
plt.show()



#### Save teacher model

In [None]:
# If needed, save the teacher model [uncomment the following line]
# teacher_model.save('models/teacherModel_GN_smr4078.h5')

----

### Student training

To obtain the reduced model that will be deployed onto the FPGA (hereinafter referred to as the student network), a **KD** learning approach will be employed. This method involves the teacher model transferring its knowledge to the student model. The latter is defined using quantization and pruning strategies.  

In this project, the number of bits was set to eight, and the target sparsity was set to 50%. 

**Qkeras** is employed to define the student model in a quantized manner. 

**For more information regarding QKeras:** Coelho, C. N., Kuusela, A., Zhuang, H., Aarrestad, T., Loncar, V., Ngadiuba, J., ... & Summers, S. (2020). _Ultra low-latency, low-area inference accelerators using heterogeneous deep quantization with QKeras and hls4ml_. arXiv preprint arXiv:2006.10159, 108.

#### Hyperparameters for the student model

In [None]:
# Define the hyperparameters for the student model
lr = 0.001
neurons_student = [6, 4, 2, 4, 3]

#### Define student architecture


<p align="center">
<img src = "img/studentArchitecture.png" width = 50%>
      <figcaption style = "text-align: center; font-style: italic">Student architecture.</figcaption>
</p>

In [None]:
def student_architecture(neurons_student):
    
    '''
    Model to be trained. Defined with quantization strategies. 
    Input: hyperparams. (bestHP)
    Output: compressed model (studentQ_MLP). 

    '''
    ######## ---------------------------  Model definition - 1D STUDENT -----------------------------------------

    # Number of bits 
    ## 8-bits
    kernelQ = "quantized_bits(8,4)"
    biasQ = "quantized_bits(8,4)"
    activationQ = 'quantized_bits(8)'
    
    ## 16-bits
    kernelQ_16b = "quantized_bits(16,6)"
    biasQ_16b = "quantized_bits(16,6)"
    activationQ_16b = 'quantized_bits(16)'
    
    studentQ_MLP = keras.Sequential(
            [   
                Input(shape=(161,), name='inputLayer'),
                QDense(neurons_student[0], name='fc1',
                        kernel_quantizer= kernelQ, bias_quantizer= biasQ,
                        kernel_initializer='lecun_uniform'),
                QActivation(activation= activationQ ,  name='relu0'),
                
                Dropout(0.1),
                    
                QDense(neurons_student[1], name='fc2',
                        kernel_quantizer=kernelQ, bias_quantizer=biasQ,
                        kernel_initializer='lecun_uniform'),
                QActivation(activation=activationQ, name='relu1'), 
                Dropout(0.1),
                

                QDense(neurons_student[2], name='fc3',
                        kernel_quantizer=kernelQ, bias_quantizer=biasQ,
                        kernel_initializer='lecun_uniform'),
                QActivation(activation=activationQ, name='relu2'), 
                Dropout(0.1),

                QDense(neurons_student[3], name='fc4',
                        kernel_quantizer=kernelQ, bias_quantizer=biasQ,
                        kernel_initializer='lecun_uniform'),
                QActivation(activation=activationQ, name='relu3'), 
                Dropout(0.2),

                QDense(neurons_student[4], name='fc5',
                       kernel_quantizer=kernelQ, bias_quantizer=biasQ,
                       kernel_initializer='lecun_uniform'),
                QActivation(activation=activationQ, name='relu4'), 
                Dropout(0.2),
               
                QDense(2, name='output',
                        kernel_quantizer= kernelQ_16b, bias_quantizer= biasQ_16b,
                        kernel_initializer='lecun_uniform'),
                Activation(activation='sigmoid', name='outputActivation')

                
            ],
            name="studentMLP",
        )


    return studentQ_MLP

#### Build student model

In [None]:
# Build student model with pruning strategy

def build_student(student_neurons):

    '''
    Model to be compressed. Defined with quantization strategies. 
    Input: hyperparams (student_neurons).
    Output: compressed model (studentQ). 

    '''
    
    qmodel = student_architecture(student_neurons)

    # Pruning parameters 
    pruning_params = {
        'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(
        initial_sparsity=0.0, final_sparsity=0.5, begin_step=0, end_step=1000
    )
    }
    
    studentQ = prune.prune_low_magnitude(qmodel, **pruning_params)
    
    
    return studentQ



#### Compile student model for KD + QAT

In [None]:
studentQ = build_student(neurons_student)

distilled_student = Distiller(student=studentQ, teacher=teacher_model)

adam = Adam(lr)

train_labels = np.argmax(y_train, axis=1)

distilled_student.compile(
        optimizer=adam,
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
        student_loss_fn=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        distillation_loss_fn=keras.losses.KLDivergence(),
        alpha=0.3, 
        temperature=9,
    )

#### Training of the student model

In [None]:
callbacks = [
                tf.keras.callbacks.EarlyStopping(patience=10, verbose=1),
                tf.keras.callbacks.ReduceLROnPlateau(monitor='sparse_categorical_accuracy', factor=0.3, patience=3, verbose=1),
                ]  


callbacks.append(pruning_callbacks.UpdatePruningStep())

history_studentQPKD = distilled_student.fit(x_train, train_labels, 
                               batch_size = 64, 
                               epochs= 32, 
                               validation_split=0.2,
                               callbacks = callbacks
                               )

#### Performance analysis

In [None]:
# Plot accuracy over epochs
plt.figure(figsize=(15,3))
plt.plot(history_studentQPKD.history['sparse_categorical_accuracy'], label='Train Accuracy')
plt.plot(history_studentQPKD.history['val_sparse_categorical_accuracy'], label='Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.title('Accuracy over epochs')
plt.show()

In [None]:
# Plot loss over epochs
plt.figure(figsize=(15,3))
plt.plot(history_studentQPKD.history['student_loss'], label='Train Loss')
plt.plot(history_studentQPKD.history['val_student_loss'], label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.title('Loss over epochs')
plt.show()


Weight distribution after pruning

In [None]:

weights = np.concatenate([w.flatten() for w in distilled_student.student.get_weights()])

plt.figure(figsize=(10,2))
plt.hist(weights, bins=60, color='green', alpha=0.6)
plt.xlabel("Weight Value")
plt.ylabel("Frequency")
plt.title("Model for G/N - Weight Distribution")
plt.show()


Print the confusion matrix to observe the performance of the model.

In [None]:

# Obtain the confusion matrix using the testing dataset 
y_pred_probs = distilled_student.student.predict(dfTest)
y_pred = np.argmax(y_pred_probs, axis=1)

# Since y_test is one-hot encoded, you need to convert it back to class indices
y_true = np.argmax(yTest, axis=1)  # Convert one-hot encoded labels to class indices

cm = confusion_matrix(y_true, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot(cmap="Blues")
plt.title('Confusion Matrix - Student model Q, P, and KD')
plt.show()


#### Test single prediction

Test the model for single inputs. 

The variable **indexPrediction** corresponds to the signals' index  in the testing dataset. You can change this number to observe how the ML-based model predicts for specific inputs. 

In [None]:
# Index of the signal in the testing dataset
indexPrediction = 1500

x_input = dfTest.iloc[indexPrediction]
y_label = yTest[indexPrediction]

inputPred = array([x_input])

y_pred = distilled_student.student.predict(inputPred) 
print("> Input %s -> Predicted = %s | " % (y_label, y_pred))

plt.figure(figsize=(15,3))
plt.xlabel('Samples', fontsize=11)
plt.ylabel('Amplitude', fontsize=11)
plt.grid(True, alpha=1.0)
plt.plot(x_input.values,  label="Signal 1", color='navy', markersize=7, lw=1)

plt.title('Signal used for inference - Pulse: %s, Label: %s, Prediction: %s' % (indexPrediction, y_label, y_pred))

#### Save student model

In [None]:

model = strip_pruning(distilled_student.student)
model.summary()

model.save('../models/studentModel_GN_smr4110.h5')

----

## Integration with a hardware synthesis tool for ML

### hls4ml

**hls4ml** is a Python package developed for converting machine learning (ML) models into HLS (High-Level Synthesis) projects, enabling deployment of ML-based inference on hardware like FPGAs. More details can be found at [hls4ml documentation](https://fastmachinelearning.org/hls4ml/).

The user can control several options related to the model, including:

- **Precision:** Define the precision of the calculations in your model (e.g., fixed-point or floating-point representation).

- **Dataflow/Resource Reuse:** Control the level of parallelism or streaming in model implementations, with varying degrees of pipelining.



hls4ml has compatibility with **Quantization Aware Training:** Achieve optimized performance at low precision by using tools like QKeras. Models trained with QKeras benefit automatically from hls4ml's parsing of QKeras models during inference.

An HLS configuration should be created using the function `hls4ml.utils.config_from_keras_model(kerasModel, granularity)`, where kerasModel is the pre-trained model you want to implement on an FPGA, and granularity determines the configuration level. The two possible values for granularity are:

- 'model': The same configuration applies to the entire model (e.g., all layers use 16-bit fixed-point precision).

- 'name': Layer-specific configurations can be applied (e.g., the input layer can be defined in 8-bit fixed-point precision, while the second layer is set to 16-bit fixed-point precision).

In [None]:
# PATH Xilinx Virtual Machine
os.environ['PATH'] = '/tools/Xilinx/Vitis_HLS/2022.2/bin:' + os.environ['PATH']
os.environ['PATH']

In [None]:

hls_config = hls4ml.utils.config_from_keras_model(model, granularity='name')



print("-----------------------------------")
plotting.print_dict(hls_config)
print("-----------------------------------")




In [None]:
for Layer in hls_config['LayerName'].keys():
    hls_config['LayerName'][Layer]['Strategy'] = 'Latency'
    hls_config['LayerName'][Layer]['ReuseFactor'] = 1
    hls_config['LayerName'][Layer]['Precision'] = 'ap_fixed<8,4>'

hls_config['Model']['Precision'] = 'ap_fixed<16,10>'
hls_config['LayerName']['outputActivation']['Strategy'] = 'Stable'
hls_config['LayerName']['inputLayer']['Precision'] = 'ap_fixed<16, 6>'   


In [None]:
# Create configuration for Vitis HLS as backend.
cfg = hls4ml.converters.create_config(backend='Vitis')

# HLSConfig correspond to the configuration created in hls_config 
cfg['HLSConfig']  = hls_config
# Model to be converted
cfg['KerasModel'] = model
# Folder where the HLS project will be created
cfg['OutputDir']  = 'hlsPrj/'
# FPGA part 
cfg['Part'] = 'xc7z020clg484-1'  
  
hls_model = hls4ml.converters.keras_to_hls(cfg)

hls_model.compile()

In [None]:
# This will perform the synthesis of the HLS project, showing the main results (latency and resource usage) once the process is completed. 

hls_model.build(csim=False, export=False)

**Once this stage is complete, you can return to the wiki to continue with the next steps.**