In [1]:
!pip install brevitas
!pip install qonnx
!pip install onnxoptimizer



In [2]:
# Colab-only cell: mount Google Drive at /content/drive.
# force_remount=True asks Colab to redo the mount even if one is already active.
from google.colab import drive
drive.mount('/content/drive', force_remount = True)

Mounted at /content/drive


In [3]:
import numpy as np
import os
import random
import cv2
import torch
import tensorflow as tf
from torch.utils.data import DataLoader, TensorDataset
import pandas as pd

# NOTE: the intermediate class (one-hot column 4) is intentionally ignored.
class Preprocessing:
    """Load, filter and resize the image datasets and wrap them in DataLoaders.

    Label rows are 6 columns wide and the last three columns are a one-hot
    class encoding. Only columns 3 and 5 are used (binary problem); samples
    whose one-hot bit sits in column 4 are dropped when loading the .npz file.
    """

    # Columns of the 6-wide label rows that encode the two classes kept.
    _BINARY_COLUMNS = [3, 5]

    def __init__(self, seed=1234, input_size=(560, 560)):
        """Store the configuration and seed the random number generators.

        Args:
            seed: Seed applied to ``random``, NumPy, PyTorch and TensorFlow.
            input_size: Target size passed to ``cv2.resize`` as ``dsize``.
        """
        self.seed = seed
        self.input_size = input_size

        random.seed(self.seed)
        os.environ['PYTHONHASHSEED'] = str(self.seed)
        np.random.seed(self.seed)
        # PyTorch drives weight initialisation, dropout and DataLoader
        # shuffling in this notebook, so it has to be seeded as well.
        torch.manual_seed(self.seed)
        tf.random.set_seed(self.seed)
        tf.compat.v1.set_random_seed(self.seed)

    def _empty_images(self):
        """Return a zero-length NHWC float32 batch of the configured size."""
        width, height = self.input_size  # cv2.resize takes dsize as (width, height)
        return np.empty((0, height, width, 1), dtype=np.float32)

    @staticmethod
    def _drop_intermediate_class(X, Y):
        """Remove the samples whose one-hot bit is in column 4 of ``Y``."""
        class_indices = np.argmax(Y[:, 3:], axis=1)
        keep = class_indices != 1
        return X[keep], Y[keep]

    @staticmethod
    def _binary_targets(Y):
        """Map 6-column one-hot rows to class indices: column 3 -> 0, column 5 -> 1."""
        Y_binary = Y[:, Preprocessing._BINARY_COLUMNS]
        return torch.tensor(np.argmax(Y_binary, axis=-1), dtype=torch.long)

    def resize_images(self, images):
        """Resize every image to ``self.input_size`` and return an NHWC array.

        Each image is squeezed, cast to float32, resized with INTER_AREA and
        given a trailing channel axis. An empty input yields an empty batch
        that still has four dimensions, so downstream ``permute`` calls work.
        """
        resized_images = [
            np.expand_dims(
                cv2.resize(img.squeeze().astype(np.float32), self.input_size,
                           interpolation=cv2.INTER_AREA),
                axis=-1,
            )
            for img in images
        ]
        if not resized_images:
            return self._empty_images()
        return np.array(resized_images, dtype=np.float32)

    # NPZ structure - Y: 6 columns in total, only the last 3 are used (one-hot encoded)
    def load_dataset(self, file_path):
        """Load the train/val/test splits from an ``.npz`` archive.

        The archive must provide ``X_train``, ``Y_train``, ``X_val``,
        ``Y_val``, ``X_test`` and ``Y_test``. Samples of the intermediate
        class are removed and the images are resized.

        Returns:
            ``(X_train, Y_train, X_val, Y_val, X_test, Y_test)``; the ``Y``
            arrays keep their original 6-column layout.
        """
        print(f"Loading dataset from: {file_path}\n")
        # NpzFile keeps a file handle open; the context manager releases it
        # once the arrays have been read into memory.
        with np.load(file_path) as data:
            X_train, Y_train = data["X_train"], data["Y_train"]
            X_val, Y_val = data["X_val"], data["Y_val"]
            X_test, Y_test = data["X_test"], data["Y_test"]

        X_train, Y_train = self._drop_intermediate_class(X_train, Y_train)
        X_val, Y_val = self._drop_intermediate_class(X_val, Y_val)
        X_test, Y_test = self._drop_intermediate_class(X_test, Y_test)

        X_train = self.resize_images(X_train)
        X_val = self.resize_images(X_val)
        X_test = self.resize_images(X_test)

        return X_train, Y_train, X_val, Y_val, X_test, Y_test

    # Returns one-hot Y in the same 6-column layout as Y_test (only the 4th and
    # 6th columns are used) so the result can be fed to test_dataloader.
    def load_strange_dataset(self, image_dir, label_csv):
        """Load the extra image folder and its labels from a CSV file.

        Images (sorted by file name) are paired positionally with the CSV
        rows. Only rows whose ``Classe`` value is 0 or 100 are kept. A label
        row is emitted only for an image that was actually read, so the two
        returned arrays always have the same length.

        Returns:
            ``(X_strange, Y_strange)``: NHWC float32 images scaled to [0, 1]
            and 6-column one-hot float32 labels.
        """
        class_labels_df = pd.read_csv(label_csv)

        # Map only 0 and 100 to their one-hot columns (0 -> column 3, 100 -> column 5)
        valid_classes = {0: 3, 100: 5}

        valid_image_formats = (".jpg", ".jpeg", ".png", ".bmp", ".tiff")
        image_filenames = sorted(
            name for name in os.listdir(image_dir)
            if name.lower().endswith(valid_image_formats)
        )

        if len(image_filenames) != len(class_labels_df):
            # Pairing is positional, so a count mismatch is worth flagging;
            # zip() below pairs up to the shorter of the two.
            print(f"Warning: {len(image_filenames)} images but "
                  f"{len(class_labels_df)} label rows; pairing the first "
                  f"{min(len(image_filenames), len(class_labels_df))} by position")

        # NOTE(review): this path resizes with cv2's default interpolation and
        # scales by 1/255, while resize_images uses INTER_AREA and no scaling;
        # confirm the .npz data is on the same scale.
        images, labels = [], []
        for image_name, class_value in zip(image_filenames, class_labels_df["Classe"]):
            if class_value not in valid_classes:
                continue
            image_path = os.path.join(image_dir, image_name)
            image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
            if image is None:
                # Skip the label together with the image to keep X and Y aligned.
                print(f"Warning: could not read {image_path}; skipping it")
                continue
            image = cv2.resize(image, self.input_size)
            images.append(image.astype('float32') / 255.0)

            one_hot = np.zeros(6, dtype=np.float32)
            one_hot[valid_classes[class_value]] = 1.0
            labels.append(one_hot)

        if images:
            X_strange = np.expand_dims(np.array(images), axis=-1)  # NHWC, like the other splits
            Y_strange = np.stack(labels)
        else:
            X_strange = self._empty_images()
            Y_strange = np.zeros((0, 6), dtype=np.float32)

        print(f"Strange dataset loaded: {X_strange.shape}, Labels: {Y_strange.shape}")
        return X_strange, Y_strange

    def train_val_dataloaders(self, X_train, Y_train, X_val, Y_val, batch_size=16):
        """Build the training (shuffled) and validation (ordered) DataLoaders.

        Images are expected in NHWC layout and are permuted to NCHW; labels
        are reduced from 6-column one-hot rows to class indices 0/1.
        """
        X_train_tensor = torch.tensor(X_train, dtype=torch.float32).permute(0, 3, 1, 2)
        X_val_tensor = torch.tensor(X_val, dtype=torch.float32).permute(0, 3, 1, 2)

        train_dataset = TensorDataset(X_train_tensor, self._binary_targets(Y_train))
        val_dataset = TensorDataset(X_val_tensor, self._binary_targets(Y_val))

        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

        return train_loader, val_loader

    def test_dataloader(self, X, Y, batch_size=32, one_hot_labels=True):
        """Build an ordered DataLoader for evaluation.

        Args:
            X: Images; permuted from NHWC to NCHW when the last axis has size 1.
            Y: 6-column one-hot rows when ``one_hot_labels`` is True, otherwise
                class indices used as they are.
            batch_size: Batch size of the returned loader.
            one_hot_labels: Whether ``Y`` has to be reduced to class indices.
        """
        X_tensor = torch.tensor(X, dtype=torch.float32)
        if X_tensor.shape[-1] == 1:
            X_tensor = X_tensor.permute(0, 3, 1, 2)

        # If one-hot encoded, only the 4th and 6th columns carry the label.
        if one_hot_labels:
            Y_tensor = self._binary_targets(Y)
        else:
            Y_tensor = torch.tensor(Y, dtype=torch.long)

        dataset = TensorDataset(X_tensor, Y_tensor)
        return DataLoader(dataset, batch_size=batch_size, shuffle=False)



In [4]:
import torch
import brevitas.nn as qnn
import torch.nn.functional as F
from brevitas.quant import Int8ActPerTensorFloat

#-------------------
#   CONFIG MODEL
#-------------------
# Width multiplier: scales the channel counts of the conv layers and the
# hidden width of the classifier head in StandardModel.
x=2
# Selects the InOutQuant / NOInOutQuant tag in the exported model name.
InOutQuant=True
# Bit width used for the weights of the quantized conv/linear layers and for
# the QuantReLU activations.
bit_width = 8
#-------------------

class StandardModel(torch.nn.Module):
    """Quantized CNN classifier built from Brevitas layers, two output classes.

    Five 3x3 conv -> QuantReLU -> 2x2 max-pool stages are followed by global
    average pooling and a two-layer quantized linear head. Layer widths are
    multiples of the notebook-level ``x``; bit widths come from ``bit_width``.

    Args:
        input_shape: ``(channels, height, width)``; only the channel count is
            used to size the first convolution.
        elastic_lambda: Stored as ``l1_lambda`` and ``l2_lambda``. Nothing in
            this class reads them.
    """

    def __init__(self, input_shape, elastic_lambda=1e-4):
        super().__init__()

        # Honour the notebook-level InOutQuant switch: when it is off the
        # convolutions get no input/output activation quantizer, which is
        # what the "NOInOutQuant" model name advertises.
        act_quant = Int8ActPerTensorFloat if InOutQuant else None

        def quant_conv(in_channels, out_channels):
            # All conv stages share the same geometry and quantization setup.
            return qnn.QuantConv2d(
                in_channels, out_channels, kernel_size=3, stride=1, padding=1,
                weight_bit_width=bit_width,
                input_quant=act_quant, output_quant=act_quant)

        # Attribute names and creation order are kept so that state_dict keys
        # of previously saved checkpoints still match.
        self.conv1 = quant_conv(input_shape[0], 8*x)
        self.conv2 = quant_conv(8*x, 12*x)
        self.conv3 = quant_conv(12*x, 16*x)
        self.conv4 = quant_conv(16*x, 20*x)
        self.conv5 = quant_conv(20*x, 24*x)

        self.relu1 = qnn.QuantReLU(bit_width=bit_width)
        self.relu2 = qnn.QuantReLU(bit_width=bit_width)
        self.relu3 = qnn.QuantReLU(bit_width=bit_width)
        self.relu4 = qnn.QuantReLU(bit_width=bit_width)
        self.relu5 = qnn.QuantReLU(bit_width=bit_width)
        self.relu_fc1 = qnn.QuantReLU(bit_width=bit_width)

        self.pool = torch.nn.MaxPool2d(kernel_size=2, stride=2)
        self.global_pool = torch.nn.AdaptiveAvgPool2d(1)

        self.fc1 = qnn.QuantLinear(24*x, 16*x, weight_bit_width=bit_width)
        self.dropout = torch.nn.Dropout(0.3)
        self.fc2 = qnn.QuantLinear(16*x, 2, weight_bit_width=bit_width)

        self.l1_lambda = elastic_lambda
        self.l2_lambda = elastic_lambda

    def forward(self, x):
        """Return per-class log-probabilities of shape ``(batch, 2)``.

        Note that the parameter ``x`` shadows the module-level width
        multiplier of the same name inside this method.
        """
        x = self.pool(self.relu1(self.conv1(x)))
        x = self.pool(self.relu2(self.conv2(x)))
        x = self.pool(self.relu3(self.conv3(x)))
        x = self.pool(self.relu4(self.conv4(x)))
        x = self.pool(self.relu5(self.conv5(x)))
        x = self.global_pool(x)

        x = torch.flatten(x, 1)
        x = self.relu_fc1(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)

        # Log-probabilities are already normalised, so a loss that applies
        # log_softmax again (e.g. nn.CrossEntropyLoss) leaves them unchanged.
        return F.log_softmax(x, dim=1)


In [5]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import f1_score
import matplotlib.pyplot as plt
import numpy as np

class Trainer:
    """Train a classifier with Adam, early stopping on validation macro-F1.

    Per-epoch metrics are appended to ``self.history`` and written to
    ``<output_dir>/model_train_val/training_log.txt``. When training ends the
    weights of the epoch with the best validation F1 are restored, then the
    metrics, the training curves and the state dict are saved.

    Args:
        model: ``torch.nn.Module`` to train; moved to ``device``.
        train_loader: Iterable of ``(inputs, class_index_targets)`` batches.
        val_loader: Same, for validation.
        output_dir: Directory under which ``model_train_val`` is created.
        model_name: Used in the file name of the saved state dict.
        input_shape: Accepted for interface compatibility; not used.
        num_epochs: Maximum number of epochs.
        learning_rate: Adam learning rate.
        early_stopping_patience: Number of consecutive epochs without a
            validation F1 improvement after which training stops.
        device: Target device; defaults to CUDA when available, else CPU.
    """

    def __init__(self, model, train_loader, val_loader,
                 output_dir, model_name,
                 input_shape=(1, 300, 300), num_epochs=300, learning_rate=0.001,
                 early_stopping_patience=10, device=None):

        self.model = model
        self.num_epochs = num_epochs
        self.learning_rate = learning_rate
        self.early_stopping_patience = early_stopping_patience
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(self.device)

        self.train_loader = train_loader
        self.val_loader = val_loader

        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)

        self.history = {
            'loss': [], 'val_loss': [],
            'accuracy': [], 'val_accuracy': [],
            'f1_score': [], 'val_f1_score': []
        }

        self.model_name = model_name
        self.save_dir = os.path.join(output_dir, "model_train_val")
        self.log_file = os.path.join(self.save_dir, "training_log.txt")
        os.makedirs(self.save_dir, exist_ok=True)

    def train(self):
        """Run the training loop, restore the best weights and save everything."""
        best_val_f1 = -1.0
        best_state = None
        patience_counter = 0

        with open(self.log_file, "w") as log:
            for epoch in range(self.num_epochs):
                train_loss, train_acc, train_f1 = self._run_epoch()
                val_loss, val_acc, val_f1 = self._validate()

                self.history['loss'].append(train_loss)
                self.history['val_loss'].append(val_loss)
                self.history['accuracy'].append(train_acc)
                self.history['val_accuracy'].append(val_acc)
                self.history['f1_score'].append(train_f1)
                self.history['val_f1_score'].append(val_f1)

                log.write(f"Epoch {epoch+1}/{self.num_epochs} - "
                          f"Train Loss: {train_loss:.4f}, Acc: {train_acc:.4f}, F1: {train_f1:.4f} | "
                          f"Val Loss: {val_loss:.4f}, Acc: {val_acc:.4f}, F1: {val_f1:.4f}\n")
                # Make progress visible while a long run is still going.
                log.flush()

                if epoch == 0 or val_f1 > best_val_f1:
                    best_val_f1 = val_f1
                    patience_counter = 0
                    # Snapshot the weights: without it the model saved below
                    # would be the last epoch's, i.e. `patience` epochs past
                    # the best one whenever early stopping fires.
                    best_state = {
                        name: tensor.detach().clone()
                        for name, tensor in self.model.state_dict().items()
                    }
                else:
                    patience_counter += 1
                    if patience_counter >= self.early_stopping_patience:
                        log.write("Early stopping triggered (by val_f1).\n")
                        break

            if best_state is not None:
                self.model.load_state_dict(best_state)
                log.write(f"Restored weights of the best epoch (val_f1={best_val_f1:.4f}).\n")

        self._save_metrics()
        torch.save(self.model.state_dict(), os.path.join(self.save_dir, f"brevitas_{self.model_name}.pth"))

    def _pass_over(self, loader, training):
        """Run one pass over ``loader``; update the weights when ``training``.

        Returns:
            ``(average loss per sample, accuracy, macro F1)``.
        """
        self.model.train(training)  # train(False) is equivalent to eval()
        total_loss = 0.0
        correct = 0
        total = 0
        all_preds = []
        all_labels = []

        with torch.set_grad_enabled(training):
            for X_batch, Y_batch in loader:
                X_batch, Y_batch = X_batch.to(self.device), Y_batch.to(self.device)

                if training:
                    self.optimizer.zero_grad()
                outputs = self.model(X_batch)
                loss = self.criterion(outputs, Y_batch)

                if training:
                    loss.backward()
                    self.optimizer.step()

                # The criterion returns a batch mean; weight it by batch size
                # so the final division yields a per-sample average.
                total_loss += loss.item() * X_batch.size(0)
                _, predicted = outputs.max(1)
                correct += predicted.eq(Y_batch).sum().item()
                total += Y_batch.size(0)

                all_preds.extend(predicted.cpu().numpy())
                all_labels.extend(Y_batch.cpu().numpy())

        avg_loss = total_loss / len(loader.dataset)
        accuracy = correct / total
        f1 = f1_score(all_labels, all_preds, average='macro')
        return avg_loss, accuracy, f1

    def _run_epoch(self):
        """Train for one epoch and return ``(loss, accuracy, macro F1)``."""
        return self._pass_over(self.train_loader, training=True)

    def _validate(self):
        """Evaluate on the validation set and return ``(loss, accuracy, macro F1)``."""
        return self._pass_over(self.val_loader, training=False)

    def _save_metrics(self):
        """Save each history series as ``.npy`` and plot the training curves."""
        for key, values in self.history.items():
            path = os.path.join(self.save_dir, f"{key}.npy")
            np.save(path, np.array(values))

        fig, (ax_loss, ax_scores) = plt.subplots(1, 2, figsize=(10, 5))

        ax_loss.plot(self.history['loss'], label='Train Loss')
        ax_loss.plot(self.history['val_loss'], label='Val Loss')
        ax_loss.set_title('Loss')
        ax_loss.legend()

        ax_scores.plot(self.history['accuracy'], label='Train Acc')
        ax_scores.plot(self.history['val_accuracy'], label='Val Acc')
        ax_scores.plot(self.history['f1_score'], label='Train F1')
        ax_scores.plot(self.history['val_f1_score'], label='Val F1')
        ax_scores.set_title('Accuracy & F1-Score')
        ax_scores.legend()

        fig.tight_layout()
        fig.savefig(os.path.join(self.save_dir, "training_curves.png"))
        plt.close(fig)


In [6]:
import torch
import numpy as np
import os
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

class Tester:
    """Evaluate a trained model and store accuracy, predictions and a confusion matrix.

    All artefacts are written to ``<output_dir>/model_test`` with file names
    prefixed by the lower-cased model name.
    """

    def __init__(self, model, output_dir, model_name):
        """Move ``model`` to CUDA when available (else CPU) and set eval mode."""
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model = model.to(self.device)
        self.model.eval()

        self.output_dir = os.path.join(output_dir, "model_test")
        os.makedirs(self.output_dir, exist_ok=True)
        self.prefix = model_name.lower()

    def evaluate(self, dataloader, prefix_suffix=""):
        """Run the model over ``dataloader`` and save the evaluation artefacts.

        Args:
            dataloader: Iterable of ``(inputs, class_index_targets)`` batches.
            prefix_suffix: Optional tag placed in front of the file prefix.

        Returns:
            ``(accuracy, confusion_matrix)``.
        """
        y_true, y_pred = [], []
        num_classes = None
        with torch.no_grad():
            for X_batch, Y_batch in dataloader:
                outputs = self.model(X_batch.to(self.device))
                num_classes = outputs.shape[1]
                preds = torch.argmax(outputs, dim=1)
                y_pred.extend(preds.cpu().numpy())
                y_true.extend(Y_batch.cpu().numpy())

        y_true, y_pred = np.array(y_true), np.array(y_pred)
        acc = np.mean(y_true == y_pred)

        # Pin the label set so the matrix keeps one row/column per model
        # output even when a class is absent from a small evaluation set.
        labels = None
        if num_classes is not None:
            labels = np.union1d(np.arange(num_classes), y_true)
        cm = confusion_matrix(y_true, y_pred, labels=labels)

        prefix = f"{prefix_suffix}_{self.prefix}" if prefix_suffix else self.prefix
        with open(os.path.join(self.output_dir, f"{prefix}_accuracy.txt"), "w") as f:
            f.write(f"Accuracy: {acc:.4f}\n")

        np.save(os.path.join(self.output_dir, f"{prefix}_predictions.npy"), y_pred)
        np.save(os.path.join(self.output_dir, f"{prefix}_ground_truth.npy"), y_true)

        disp = ConfusionMatrixDisplay(cm, display_labels=labels)
        disp.plot()
        disp.ax_.set_title(f"Confusion Matrix - {prefix}")
        disp.figure_.savefig(os.path.join(self.output_dir, f"{prefix}_confusion_matrix.png"))
        # Close exactly the figure drawn above rather than whichever is current.
        plt.close(disp.figure_)

        return acc, cm


In [7]:
import torch
from brevitas.export import export_qonnx

class QONNXExporter:
    """Export a Brevitas model to a QONNX file.

    Args:
        model: The model handed to ``export_qonnx``.
        model_name: Stored on the instance; not used by ``export``.
        input_shape: Shape of the random dummy input used for the export.
        export_path: Destination path of the exported file.
    """

    def __init__(self, model, model_name, input_shape, export_path):
        self.model, self.model_name = model, model_name
        self.input_shape, self.export_path = input_shape, export_path
        backend = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = torch.device(backend)

    def export(self):
        """Put the model in eval mode on ``self.device`` and export it."""
        target = self.device
        self.model.to(target)
        self.model.eval()

        # A random tensor of the requested shape drives the export.
        sample = torch.randn(self.input_shape).to(target)
        export_qonnx(self.model, sample, self.export_path)

        print(f"QONNX model exported to: {self.export_path}\n")


In [8]:
import os

# --- CONFIGURATION SECTION ---
# Training hyper-parameters
batch_size = 32
epochs = 300
learning_rate = 0.001
input_size = 128  # images are resized to input_size x input_size

# Input data. The paths are relative; presumably the working directory is
# /content, where Google Drive gets mounted -- TODO confirm.
dataset_path = "drive/MyDrive/HPPS_Nico/HPPS/Project/ModelClassification/Dataset/preprocessed_binary_dataset.npz"
strange_img_dir = "drive/MyDrive/HPPS_Nico/HPPS/Project/ModelClassification/Dataset/strange_images/Immagini"
strange_label_csv = "drive/MyDrive/HPPS_Nico/HPPS/Project/ModelClassification/Dataset/strange_images_classes.csv"

# Output location, one folder per input size.
export_dir = f"drive/MyDrive/HPPS_Nico/HPPS/Project/ModelClassification/Weights/Final/Size{input_size}"

# The model name encodes the configuration of the run.
model_name = (
    f"standardModel2Class_binaryDataset_ReluFixed_size{input_size}_InOutQuant_x{x}_weightBitW{bit_width}"
    if InOutQuant
    else f"standardModel2Class_binaryDataset_ReluFixed_size{input_size}_NOInOutQuant_x{x}_weightBitW{bit_width}"
)

# -----------------------------

def main():
    """Run the whole pipeline: load data, train, evaluate, export to QONNX.

    Every artefact is written below ``<export_dir>/<model_name>``.
    """
    side = input_size
    model_input_shape = (1, side, side)        # (channels, height, width)
    export_input_shape = (1, 1, side, side)    # same, with a batch axis of 1

    run_dir = os.path.join(export_dir, model_name)
    os.makedirs(run_dir, exist_ok=True)

    # --- Preprocessing ---
    preprocessing = Preprocessing(input_size=(side, side))

    X_train, Y_train, X_val, Y_val, X_test, Y_test = preprocessing.load_dataset(dataset_path)
    X_strange, Y_strange = preprocessing.load_strange_dataset(strange_img_dir, strange_label_csv)
    print(f"X_strange: {X_strange.shape}, Y_strange: {Y_strange.shape}")
    print(f"X_test: {X_test.shape}, Y_test: {Y_test.shape}")

    train_loader, val_loader = preprocessing.train_val_dataloaders(
        X_train, Y_train, X_val, Y_val, batch_size=batch_size)
    test_loader = preprocessing.test_dataloader(
        X_test, Y_test, batch_size=batch_size, one_hot_labels=True)
    strange_loader = preprocessing.test_dataloader(
        X_strange, Y_strange, batch_size=batch_size, one_hot_labels=True)

    # --- Model ---
    network = StandardModel(input_shape=model_input_shape)

    # --- Training ---
    Trainer(
        network, train_loader, val_loader,
        output_dir=run_dir,
        model_name=model_name,
        input_shape=model_input_shape,
        num_epochs=epochs,
        learning_rate=learning_rate,
    ).train()

    # --- Testing ---
    tester = Tester(model=network, output_dir=run_dir, model_name=model_name)
    for loader, tag in ((test_loader, "test"), (strange_loader, "strange")):
        tester.evaluate(loader, prefix_suffix=tag)

    # --- Export QONNX ---
    qonnx_file = os.path.join(run_dir, f"qonnx_{model_name}.onnx")
    QONNXExporter(
        network,
        model_name=model_name,
        input_shape=export_input_shape,
        export_path=qonnx_file,
    ).export()

    # --- FINN Build (disabled; no Builder class is defined in this notebook) ---
    #builder = Builder(onnx_model_path, save_dir, input_shape=qonnx_input_shape, model_name=model_name)
    #builder.build()


if __name__ == "__main__":
    main()


Loading dataset from: drive/MyDrive/HPPS_Nico/HPPS/Project/ModelClassification/Dataset/preprocessed_binary_dataset.npz

Strange dataset loaded: (34, 128, 128, 1), Labels: (34, 6)
X_strange: (34, 128, 128, 1), Y_strange: (34, 6)
X_test: (862, 128, 128, 1), Y_test: (862, 6)


  return super().rename(names)


QONNX model exported to: drive/MyDrive/HPPS_Nico/HPPS/Project/ModelClassification/Weights/Final/Size128/standardModel2Class_binaryDataset_ReluFixed_size128_InOutQuant_x2_weightBitW8/qonnx_standardModel2Class_binaryDataset_ReluFixed_size128_InOutQuant_x2_weightBitW8.onnx

