***TITLE: FN-NET LIGHTWEIGHT CNN MODEL FOR FABRIC DEFECT DETECTION***

***IMPORT LIBRARIES***

In [1]:
# %%


import os
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from sklearn.model_selection import StratifiedKFold 

2025-05-04 16:44:48.650311: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-05-04 16:44:48.659411: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1746377088.670715    1869 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1746377088.674785    1869 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-05-04 16:44:48.685836: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instr

In [2]:
import matplotlib.pyplot as plt
import cv2
import shutil
import numpy as np


import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

***DATA PREPROCESSING***

**1)** **DATASET IMPORTING**

In [3]:
from tensorflow.keras.utils import Sequence
from glob import glob
import cv2
import numpy as np
import os

class CustomDataGenerator(Sequence):
    def __init__(self, image_paths, labels, batch_size, img_size, num_classes, shuffle=True, class_indices=None):
        self.image_paths = image_paths
        self.labels = labels
        self.classes = np.array(labels)
        self.batch_size = batch_size
        self.img_size = img_size
        self.num_classes = num_classes
        self.shuffle = shuffle
        self.class_indices = class_indices or self._build_class_indices(labels)
        self.on_epoch_end()
        self.current_index=0
        
    def __iter__(self):
        """Initialize the iterator."""
        self.current_index = 0
        return self

    def __next__(self):
        """Get the next batch."""
        if self.current_index >= len(self):
            # End of epoch
            self.on_epoch_end()
            self.current_index = 0
            raise StopIteration
        
        batch = self.__getitem__(self.current_index)
        self.current_index += 1
        return batch
        
    def _build_class_indices(self, labels):
        unique_labels = sorted(set(labels))
        return {str(lbl): lbl for lbl in unique_labels}

    def __len__(self):
        return int(np.ceil(len(self.image_paths) / self.batch_size))

    def __getitem__(self, idx):
        batch_paths = self.image_paths[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_labels = self.labels[idx * self.batch_size:(idx + 1) * self.batch_size]

        batch_images = []
        for path in batch_paths:
            img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
            img = cv2.resize(img, self.img_size)
            img = img.astype(np.float32) / 255.0

            # Sobel edge detection
            sobelx = cv2.Sobel(img, cv2.CV_32F, 1, 0, ksize=3)
            sobely = cv2.Sobel(img, cv2.CV_32F, 0, 1, ksize=3)

            # Normalize edges to 0-1
            sobelx = cv2.normalize(sobelx, None, 0, 1, cv2.NORM_MINMAX)
            sobely = cv2.normalize(sobely, None, 0, 1, cv2.NORM_MINMAX)

            #stacked = np.stack([img,  sobelx, sobely], axis=-1)
            stacked =np.stack([img], axis=-1)
            batch_images.append(stacked)

        batch_images = np.array(batch_images)
        batch_labels = tf.keras.utils.to_categorical(batch_labels, self.num_classes)

        return batch_images, batch_labels

    def on_epoch_end(self):
        if self.shuffle:
            combined = list(zip(self.image_paths, self.labels))
            np.random.shuffle(combined)
            self.image_paths, self.labels = map(list, zip(*combined))  # Convert back to lists
            self.classes = np.array(self.labels)  # Update classes array as well


In [4]:
# %%
train_dir = "/mnt/c/newTrain/Train"

test_dir = "/mnt/c/newTrain/Test"

In [5]:
from sklearn.preprocessing import LabelEncoder

def get_filepaths_and_labels(directory):
    class_names = sorted(os.listdir(directory))
    filepaths = []
    labels = []

    for idx, class_name in enumerate(class_names):
        class_dir = os.path.join(directory, class_name)
        if not os.path.isdir(class_dir):
            continue
        for fname in glob(os.path.join(class_dir, "*")):
            filepaths.append(fname)
            labels.append(class_name)

    label_encoder = LabelEncoder()
    encoded_labels = label_encoder.fit_transform(labels)
    class_indices = dict(zip(label_encoder.classes_, label_encoder.transform(label_encoder.classes_)))
    return filepaths, encoded_labels, len(label_encoder.classes_), label_encoder,class_indices
# Image parameters
IMG_SIZE = (256,256 )  # Image size
BATCH_SIZE = 40       # Number of images in each batch
# Train

train_filepaths, train_labels, NUM_CLASSES, label_encoder,class_indices = get_filepaths_and_labels(train_dir)
test_filepaths, test_labels, _, _,_ = get_filepaths_and_labels(test_dir)

# Combine for full dataset cross-validation
all_filepaths = np.array(train_filepaths + test_filepaths)
all_labels = np.array(train_labels.tolist() + test_labels.tolist())

train_generator = CustomDataGenerator(train_filepaths, train_labels, BATCH_SIZE, IMG_SIZE, NUM_CLASSES,class_indices=class_indices)
test_generator = CustomDataGenerator(test_filepaths, test_labels, BATCH_SIZE, IMG_SIZE, NUM_CLASSES, shuffle=False,class_indices=class_indices)


**2) DATASET STATISTICS AND VISUALIZATION**

In [6]:
# %%
train_set= train_generator
test_set= test_generator
print("Class Indices:", train_set.class_indices)
print("Number of Classes:", train_set.num_classes)
print("Number of Classes:", test_set.num_classes)
from collections import Counter
print(Counter(train_set.classes))
print(Counter(test_set.classes))
print("Length of traingen:",len(train_generator))

Class Indices: {'No_Defect': 0, 'line': 1, 'stain': 2, 'tear': 3}
Number of Classes: 4
Number of Classes: 4
Counter({3: 1065, 0: 1053, 1: 1014, 2: 475})
Counter({0: 350, 1: 349, 3: 347, 2: 158})
Length of traingen: 91


In [None]:
# Map index to class name
from collections import defaultdict
index_to_class = {v: k for k, v in train_generator.class_indices.items()}

# Create a new dictionary to store paths by actual class folder
class_to_images = defaultdict(list)

# Collect images directly from class folders to ensure correct visualization
for class_name, class_idx in train_generator.class_indices.items():
    class_dir = os.path.join(train_dir, class_name)
    image_paths = glob(os.path.join(class_dir, "*"))[:10]  # Get first 10 images
    
    print(f"Class: {class_name} ({len(image_paths)} images sampled)")
    
    plt.figure(figsize=(20, 5))
    for i, path in enumerate(image_paths):
        img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        img = cv2.resize(img, train_generator.img_size)
        
        plt.subplot(2, 5, i + 1)
        plt.imshow(img, cmap='gray')
        plt.title(f"{class_name}")
        plt.axis('off')
    
    plt.tight_layout()
    plt.show()

In [None]:
import os
import cv2
import matplotlib.pyplot as plt
import numpy as np
from collections import defaultdict
from glob import glob
import random

# Reverse mapping from index to class name
index_to_class = {v: k for k, v in train_generator.class_indices.items()}

# Store image paths by class
class_to_images = defaultdict(list)

for class_name, class_idx in train_generator.class_indices.items():
    class_dir = os.path.join(train_dir, class_name)
    all_images = glob(os.path.join(class_dir, "*"))
    
    # Randomly sample 100 images
    image_paths = random.sample(all_images, min(100, len(all_images)))
    class_to_images[class_name] = image_paths
    
    print(f"Class: {class_name} ({len(image_paths)} images sampled)")

    # Show all 100 images in a 10x10 grid
    plt.figure(figsize=(20, 20))  # Adjust size for clarity

    for i, path in enumerate(image_paths):
        img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        img = cv2.resize(img, train_generator.img_size)
        
        plt.subplot(10, 10, i + 1)
        plt.imshow(img, cmap='gray')
        plt.title(f"{class_name}", fontsize=6)
        plt.axis('off')
    
    plt.tight_layout()
    plt.show()


In [None]:
import matplotlib.pyplot as plt
import cv2
import numpy as np
import os
from glob import glob

# Map index to class name
index_to_class = {v: k for k, v in train_generator.class_indices.items()}

# Function to apply the same preprocessing as in your generator
def preprocess_image(img_path):
    img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
    img = cv2.resize(img, train_generator.img_size)
    img = img.astype(np.float32) / 255.0
    
    # Sobel edge detection
    sobelx = cv2.Sobel(img, cv2.CV_32F, 1, 0, ksize=3)
    sobely = cv2.Sobel(img, cv2.CV_32F, 0, 1, ksize=3)
    
    # Normalize edges to 0-1
    sobelx = cv2.normalize(sobelx, None, 0, 1, cv2.NORM_MINMAX)
    sobely = cv2.normalize(sobely, None, 0, 1, cv2.NORM_MINMAX)
    
    return img, sobelx, sobely

# Number of images to display per class
num_images = 4

# Loop through each class
for class_name, class_idx in train_generator.class_indices.items():
    class_dir = os.path.join(train_dir, class_name)
    image_paths = glob(os.path.join(class_dir, "*"))[:num_images]  # Get first 4 images
    
    print(f"Class: {class_name} ({len(image_paths)} images)")
    
    # Create a figure with 3 rows (original, sobelx, sobely) and num_images columns
    plt.figure(figsize=(16, 12))
    
    for i, path in enumerate(image_paths):
        img, sobelx, sobely = preprocess_image(path)
        
        # Plot original grayscale image
        plt.subplot(3, num_images, i + 1)
        plt.imshow(img, cmap='gray')
        if i == 0:
            plt.ylabel('Original', fontsize=14)
        plt.title(f"{class_name} - {i+1}")
        plt.axis('off')
        
        # Plot Sobel X
        
        plt.subplot(3, num_images, i + 1 + num_images)
        plt.imshow(sobelx, cmap='gray')
        if i == 0:
            plt.ylabel('Sobel X', fontsize=14)
        plt.title(f"Sobel X-{class_name}-{i+1}")
        plt.axis('off')
        
        # Plot Sobel Y
        
        plt.subplot(3, num_images, i + 1 + 2*num_images)
        plt.imshow(sobely, cmap='gray')
        if i == 0:
            plt.ylabel('Sobel Y', fontsize=14)
        plt.title(f"Sobel Y-{class_name}-{i+1}")
        plt.axis('off')
    
    plt.tight_layout()
    plt.show()

***MODEL CREATION***

In [15]:
# Parameters
INPUT_SHAPE = (256, 256, 1)  # Input size of images
NUM_CLASSES =  train_generator.num_classes             # many class
LEARNING_RATE = 0.001
EPOCHS = 100                 # Increased epochs
BATCH_SIZE = 40

In [16]:
# %%

# Step 2: Define CNN Model

def create_cnn_model(input_shape=(128, 128, 1), num_classes=NUM_CLASSES):
    """
    Creates the FN-Net model as described in the paper.
    
    Args:
        input_shape: Tuple of (height, width, channels). Default is (128, 128, 1) for grayscale
        num_classes: Number of output classes. Default is 2 for binary classification
        
    Returns:
        Keras Sequential model
    """
    model = Sequential([
        # First Convolutional Block
        Conv2D(16, (3, 3), padding='same', activation='relu', input_shape=input_shape),
        MaxPooling2D(pool_size=(3, 3), strides=3),
        
        # Second Convolutional Block
        Conv2D(32, (3, 3), padding='same', activation='relu'),
        MaxPooling2D(pool_size=(3, 3), strides=3),
        
        # Third Convolutional Block
        Conv2D(64, (3, 3), padding='same', activation='relu'),
        MaxPooling2D(pool_size=(2, 2), strides=2),
        
        # Fourth Convolutional Block
        Conv2D(96, (3, 3), padding='same', activation='relu'),
        MaxPooling2D(pool_size=(2, 2), strides=2),
        
        # Flatten layer
        Flatten(),
        
        # First Dense Layer with Dropout
        Dense(512, activation='relu'),
        Dropout(0.5),
        
        # Second Dense Layer
        Dense(256, activation='relu'),
        
        # Output Layer
        Dense(num_classes, activation='softmax')
    ])
    
    return model


# %%

**QUANTIZATION AWARE TRAINING (QAT)**

In [None]:
import tensorflow as tf

# Define quantization functions
def fake_quantize(x, bits=8, min_value=None, max_value=None):
    # Your existing implementation...
    # (No changes needed here)
    if min_value is None:
        min_value = tf.reduce_min(x)
    if max_value is None:
        max_value = tf.reduce_max(x)
    
    # Ensure min doesn't equal max to prevent division by zero
    max_value = tf.maximum(max_value, min_value + 1e-6)
    
    # Calculate the step size (the value of 1 bit)
    step = (max_value - min_value) / (2**bits - 1)
    
    # Quantize the values
    x_int = tf.round((x - min_value) / step)
    
    # Clip values to the quantization range
    x_int = tf.clip_by_value(x_int, 0, 2**bits - 1)
    
    # Convert back to original range
    x_q = x_int * step + min_value
    
    # During training, pass through the gradients using STE (Straight Through Estimator)
    return x + tf.stop_gradient(x_q - x)

@tf.keras.utils.register_keras_serializable()
class QuantizedConv2D(tf.keras.layers.Layer):
    # Your existing implementation...
    def __init__(self, filters, kernel_size, padding='same', strides=1, activation=None, weight_bits=8, activation_bits=8, **kwargs):
        super(QuantizedConv2D, self).__init__(**kwargs)
        # self.filters = filters
        # self.kernel_size = tuple(kernel_size if isinstance(kernel_size, tuple) else (kernel_size, kernel_size))
        # self.padding = padding
        # self.strides = strides if isinstance(strides, tuple) else (strides, strides)
        # self.activation_fn = tf.keras.activations.get(activation)
        # self.weight_bits = weight_bits
        # self.activation_bits = activation_bits
        self.filters = int(filters)
        # Ensure kernel_size is stored as a tuple of integers
        if isinstance(kernel_size, int):
            self.kernel_size = (int(kernel_size), int(kernel_size))
        else:
            self.kernel_size = (int(kernel_size[0]), int(kernel_size[1]))
        self.padding = padding
        # Ensure strides is stored as a tuple of integers
        if isinstance(strides, int):
            self.strides = (int(strides), int(strides))
        else:
            self.strides = (int(strides[0]), int(strides[1]))
        self.activation_fn = tf.keras.activations.get(activation)
        self.weight_bits = int(weight_bits)
        self.activation_bits = int(activation_bits)
        
    def build(self, input_shape):
        # Your existing build method...
        #input_channels = input_shape[-1]
        input_channels = int(input_shape[-1])
        
        #kernel_shape = self.kernel_size + (input_channels, self.filters)
        kernel_size = tuple(self.kernel_size)
        kernel_shape = kernel_shape = (
            int(self.kernel_size[0]),
            int(self.kernel_size[1]),
            int(input_channels),
            int(self.filters)
            )
        self.kernel = self.add_weight(
            name='kernel',
            shape=kernel_shape,
            initializer='glorot_uniform',
            trainable=True
        )
        
        self.bias = self.add_weight(
            name='bias',
            shape=(self.filters,),
            initializer='zeros',
            trainable=True
        )
        
        # Track min and max values for weights (needed for quantization)
        self.w_min = self.add_weight(
            name='w_min',
            shape=(1,),
            initializer=tf.constant_initializer(-1.0),
            trainable=False
        )
        
        self.w_max = self.add_weight(
            name='w_max',
            shape=(1,),
            initializer=tf.constant_initializer(1.0),
            trainable=False
        )
        
        # Track min and max values for activations
        self.a_min = self.add_weight(
            name='a_min',
            shape=(1,),
            initializer=tf.constant_initializer(0.0),
            trainable=False
        )
        
        self.a_max = self.add_weight(
            name='a_max',
            shape=(1,),
            initializer=tf.constant_initializer(6.0),  # ReLU typically has max around 6
            trainable=False
        )
        
        self.built = True
    
    def call(self, inputs, training=None):
        # Your existing call method...
        # Update min/max tracking during training
        if training:
            curr_w_min = tf.reduce_min(self.kernel)
            curr_w_max = tf.reduce_max(self.kernel)
            
            # Use EMA (Exponential Moving Average) to update min/max values
            momentum = 0.9
            self.w_min.assign(momentum * self.w_min + (1 - momentum) * curr_w_min)
            self.w_max.assign(momentum * self.w_max + (1 - momentum) * curr_w_max)
        
        # Quantize weights
        quantized_kernel = fake_quantize(
            self.kernel, 
            bits=self.weight_bits,
            min_value=self.w_min,
            max_value=self.w_max
        )
        
        # Standard convolution with quantized weights
        outputs = tf.nn.conv2d(
            inputs,
            quantized_kernel,
            strides=[1, self.strides[0], self.strides[1], 1],
            padding=self.padding.upper()
        )
        
        outputs = tf.nn.bias_add(outputs, self.bias)
        
        # Apply activation if specified
        if self.activation_fn is not None:
            outputs = self.activation_fn(outputs)
            
            # Update activation min/max during training
            if training:
                curr_a_min = tf.reduce_min(outputs)
                curr_a_max = tf.reduce_max(outputs)
                
                # Use EMA to update min/max values
                momentum = 0.9
                self.a_min.assign(momentum * self.a_min + (1 - momentum) * curr_a_min)
                self.a_max.assign(momentum * self.a_max + (1 - momentum) * curr_a_max)
            
            # Quantize activations
            outputs = fake_quantize(
                outputs,
                bits=self.activation_bits,
                min_value=self.a_min,
                max_value=self.a_max
            )
        
        return outputs
    
    # def get_config(self):
    #     config = super(QuantizedConv2D, self).get_config()
    #     config.update({
    #         'filters': self.filters,
    #         'kernel_size': self.kernel_size,
    #         'padding': self.padding,
    #         'strides': self.strides,
    #         'activation': tf.keras.activations.serialize(self.activation_fn),
    #         'weight_bits': self.weight_bits,
    #         'activation_bits': self.activation_bits
    #     })
    #     return config

    def get_config(self):
        config = super(QuantizedConv2D, self).get_config()
        config.update({
            'filters': int(self.filters),  # Ensure filters is serialized as int
            'kernel_size': (int(self.kernel_size[0]), int(self.kernel_size[1])),  # Ensure tuple of ints
            'padding': self.padding,
            'strides': (int(self.strides[0]), int(self.strides[1])),  # Ensure tuple of ints
            'activation': tf.keras.activations.serialize(self.activation_fn),
            'weight_bits': int(self.weight_bits),
            'activation_bits': int(self.activation_bits)
        })
        return config

@tf.keras.utils.register_keras_serializable()
class QuantizedDense(tf.keras.layers.Layer):
    # Your existing implementation...
    def __init__(self, units, activation=None, weight_bits=8, activation_bits=8, **kwargs):
        super(QuantizedDense, self).__init__(**kwargs)
        self.units = units
        self.activation_fn = tf.keras.activations.get(activation)
        self.weight_bits = weight_bits
        self.activation_bits = activation_bits
        
    def build(self, input_shape):
        input_dim = input_shape[-1]
        
        self.kernel = self.add_weight(
            name='kernel',
            shape=(input_dim, self.units),
            initializer='glorot_uniform',
            trainable=True
        )
        
        self.bias = self.add_weight(
            name='bias',
            shape=(self.units,),
            initializer='zeros',
            trainable=True
        )
        
        # Track min and max values for weights
        self.w_min = self.add_weight(
            name='w_min',
            shape=(1,),
            initializer=tf.constant_initializer(-1.0),
            trainable=False
        )
        
        self.w_max = self.add_weight(
            name='w_max',
            shape=(1,),
            initializer=tf.constant_initializer(1.0),
            trainable=False
        )
        
        # Track min and max values for activations
        self.a_min = self.add_weight(
            name='a_min',
            shape=(1,),
            initializer=tf.constant_initializer(0.0),
            trainable=False
        )
        
        self.a_max = self.add_weight(
            name='a_max',
            shape=(1,),
            initializer=tf.constant_initializer(6.0),
            trainable=False
        )
        
        self.built = True
    
    def call(self, inputs, training=None):
        # Your existing call method...
        # Update min/max tracking during training
        if training:
            curr_w_min = tf.reduce_min(self.kernel)
            curr_w_max = tf.reduce_max(self.kernel)
            
            # Use EMA to update min/max values
            momentum = 0.9
            self.w_min.assign(momentum * self.w_min + (1 - momentum) * curr_w_min)
            self.w_max.assign(momentum * self.w_max + (1 - momentum) * curr_w_max)
        
        # Quantize weights
        quantized_kernel = fake_quantize(
            self.kernel,
            bits=self.weight_bits,
            min_value=self.w_min,
            max_value=self.w_max
        )
        
        # Standard dense operation with quantized weights
        outputs = tf.matmul(inputs, quantized_kernel)
        outputs = tf.nn.bias_add(outputs, self.bias)
        
        # Apply activation if specified
        if self.activation_fn is not None:
            outputs = self.activation_fn(outputs)
            
            # Update activation min/max during training
            if training:
                curr_a_min = tf.reduce_min(outputs)
                curr_a_max = tf.reduce_max(outputs)
                
                # Use EMA to update min/max values
                momentum = 0.9
                self.a_min.assign(momentum * self.a_min + (1 - momentum) * curr_a_min)
                self.a_max.assign(momentum * self.a_max + (1 - momentum) * curr_a_max)
            
            # Quantize activations
            outputs = fake_quantize(
                outputs,
                bits=self.activation_bits,
                min_value=self.a_min,
                max_value=self.a_max
            )
        
        return outputs
    
    def get_config(self):
        config = super(QuantizedDense, self).get_config()
        config.update({
            'units': self.units,
            'activation': tf.keras.activations.serialize(self.activation_fn),
            'weight_bits': self.weight_bits,
            'activation_bits': self.activation_bits
        })
        return config
# Now redefine your model creation function using quantized layers
def create_quantized_cnn_model(input_shape=(128, 128, 1), num_classes=10, weight_bits=8, activation_bits=8):
    """
    Creates a quantization-aware version of the model
    """
    inputs = tf.keras.Input(shape=input_shape)
    
    # First Convolutional Block
    x = QuantizedConv2D(16, (3, 3), padding='same', activation='relu', 
                        weight_bits=weight_bits, activation_bits=activation_bits)(inputs)
    x = tf.keras.layers.MaxPooling2D(pool_size=(6, 6), strides=6)(x)
    
    # Second Convolutional Block
    x = QuantizedConv2D(32, (3, 3), padding='same', activation='relu',
                        weight_bits=weight_bits, activation_bits=activation_bits)(x)
    x = tf.keras.layers.MaxPooling2D(pool_size=(3, 3), strides=3)(x)
    
    # Third Convolutional Block
    x = QuantizedConv2D(64, (3, 3), padding='same', activation='relu',
                        weight_bits=weight_bits, activation_bits=activation_bits)(x)
    x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)
    
    # Fourth Convolutional Block
    x = QuantizedConv2D(96, (3, 3), padding='same', activation='relu',
                        weight_bits=weight_bits, activation_bits=activation_bits)(x)
    x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)
    
    # Flatten layer
    x = tf.keras.layers.GlobalAveragePooling2D()(x)
    
    # First Dense Layer with Dropout
    x = QuantizedDense(512, activation='relu',
                      weight_bits=weight_bits, activation_bits=activation_bits)(x)
    x = tf.keras.layers.Dropout(0.5)(x)
    
    # Second Dense Layer
    x = QuantizedDense(256, activation='relu',
                      weight_bits=weight_bits, activation_bits=activation_bits)(x)
    
    # Output Layer - we typically don't quantize the final layer
    outputs = tf.keras.layers.Dense(num_classes, activation='softmax')(x)
    
    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    return model

# Then load the model with custom_objects
custom_objects = {
    'QuantizedConv2D': QuantizedConv2D,
    'QuantizedDense': QuantizedDense
}

# Load the model
#cnn = tf.keras.models.load_model('GreyColor3_Quant.keras', custom_objects=custom_objects)

**OPTIMIZERS AND CALLBACK DEFINITIONS**

In [27]:
# Create the model
model = create_quantized_cnn_model(INPUT_SHAPE, NUM_CLASSES, weight_bits=8, activation_bits=8)

# Compile the model
model.compile(
    optimizer=Adam(learning_rate=LEARNING_RATE),
    loss='categorical_crossentropy',  # Multi-class classification
    metrics=['accuracy']
)

# Display model summary
model.summary()

In [23]:
# %%
from tensorflow.keras.callbacks import Callback, EarlyStopping

class StopAtAccuracy(Callback):
    def __init__(self, target_acc=0.85):
        super(StopAtAccuracy, self).__init__()
        self.target_acc = target_acc

    def on_epoch_end(self, epoch, logs=None):
        if logs is None:
            return
        if logs.get("val_accuracy") >= self.target_acc:  # Stop when val_accuracy reaches target
            print(f"\nStopping training: Reached {self.target_acc * 100:.1f}% validation accuracy")
            self.model.stop_training = True


# %%

In [24]:
# Step 3: Callbacks
# Early stopping to prevent overfitting
early_stopping = EarlyStopping(
    monitor='val_loss', patience=9, restore_best_weights=True, verbose=1
)
stop_at_100 = StopAtAccuracy(target_acc=1)
# Learning rate scheduler to reduce LR when validation loss plateaus
lr_scheduler = ReduceLROnPlateau(
    monitor='val_loss', factor=0.1, patience=3, min_lr=1e-6, verbose=1
)

# Model checkpoint to save the best model
#checkpoint = ModelCheckpoint('/mnt/c/modelFiles/GreyColor3_Quant_kfold_old_learn.keras', monitor='val_loss', save_best_only=True, verbose=1)

***MODEL TRAINING***

In [25]:
skf = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)
history_list = []
i = 0
for fold, (train_idx, val_idx) in enumerate(skf.split(all_filepaths, all_labels)):
    print(f"\n[INFO] Fold {fold + 1}")

    X_train, y_train = all_filepaths[train_idx], all_labels[train_idx]
    X_val, y_val = all_filepaths[val_idx], all_labels[val_idx]

    # Create data generators for this fold
    train_generator = CustomDataGenerator(X_train, y_train, BATCH_SIZE, IMG_SIZE, NUM_CLASSES, shuffle=True, class_indices=class_indices)
    val_generator = CustomDataGenerator(X_val, y_val, BATCH_SIZE, IMG_SIZE, NUM_CLASSES, shuffle=False, class_indices=class_indices)

    # Initialize model (make sure create_model is defined)
    model = create_quantized_cnn_model(INPUT_SHAPE, NUM_CLASSES, weight_bits=8, activation_bits=8)

    model.compile(optimizer='adam',
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    
    with tf.device('/GPU:0'):
        history = model.fit(
            train_generator,
            validation_data=test_generator,
            epochs=EPOCHS,
            callbacks=[early_stopping, lr_scheduler, ModelCheckpoint(f'/mnt/c/modelFiles/10splits/GreyColor3_Quant_kfold_old_learn_{fold}.keras', monitor='val_loss', save_best_only=True, verbose=1), stop_at_100],
            verbose=1
        )
    model.save(f"/mnt/c/modelFiles/10splits/greyColor_quantized_kfold_model_{fold}.keras")
    if i == 0:
        model.summary()
        i += 1
    history_list.append(history)
    model.save(f"/mnt/c/modelFiles/10splits/qat_kfold_model_{fold}.h5")  # Save in HDF5 format
    print("model saved")
    # Reload it (optional, just to make sure conversion works from disk)
    modelh5 = tf.keras.models.load_model(f"/mnt/c/modelFiles/10splits/qat_kfold_model_{fold}.h5", custom_objects=custom_objects)
    print("model loaded")
    # Convert to TensorFlow Lite
    converter = tf.lite.TFLiteConverter.from_keras_model(modelh5)
    print("converter set")
    # Enable full integer quantization
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    print("optimizations initialized")
    # Define a representative dataset function for calibration
    def representative_dataset_gen():
        for i in range(100):  # Just use first 100 images for calibration
            img, _ = train_generator[i]
            for x in img:
                yield [np.expand_dims(x, axis=0).astype(np.float32)]


    converter.representative_dataset = representative_dataset_gen
    print("representative dataset initialized")
    # Ensure all tensors are int8
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
    converter.inference_input_type = tf.int8
    converter.inference_output_type = tf.int8
    print("type bit converstions done")
    # Perform conversion
    quantized_tflite_model = converter.convert()
    print("quantized model obtained")
    # Save the quantized model to disk
    with open(f"/mnt/c/modelFiles/10splits/quantized_kfold_model_{fold}.tflite", "wb") as f:
        f.write(quantized_tflite_model)
    print("tfmodel saved") #tflite *

    # Optionally save the model
    # model.save(f'model_fold_{fold + 1}.h5')



[INFO] Fold 1


  self._warn_if_super_not_called()


Epoch 1/100








[1m 75/109[0m [32m━━━━━━━━━━━━━[0m[37m━━━━━━━[0m [1m31s[0m 927ms/step - accuracy: 0.3737 - loss: 1.2387





[1m109/109[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 931ms/step - accuracy: 0.3992 - loss: 1.1891
Epoch 1: val_loss improved from inf to 0.97245, saving model to /mnt/c/modelFiles/10splits/GreyColor3_Quant_kfold_old_learn_0.keras
[1m109/109[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m142s[0m 1s/step - accuracy: 0.3999 - loss: 1.1878 - val_accuracy: 0.5457 - val_loss: 0.9725 - learning_rate: 0.0010
Epoch 2/100
[1m109/109[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1s/step - accuracy: 0.5762 - loss: 0.8701
Epoch 2: val_loss improved from 0.97245 to 0.69531, saving model to /mnt/c/modelFiles/10splits/GreyColor3_Quant_kfold_old_learn_0.keras
[1m109/109[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m146s[0m 1s/step - accuracy: 0.5764 - loss: 0.8697 - val_accuracy: 0.6611 - val_loss: 0.6953 - learning_rate: 0.0010
Epoch 3/100
[1m 92/109[0m [32m━━━━━━━━━━━━━━━━[0m[37m━━━━[0m [1m12s[0m 728ms/step - accuracy: 0.6459 - loss: 0.7245

KeyboardInterrupt: 

**MODEL PERFORMANCE**

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Assuming you have a list of history objects called history_list
# Each history object contains the training history from one run

def plot_twenty_individual_graphs(history_list):
    """
    Creates 10 separate plots:
    - 5 plots for accuracy vs validation accuracy (one for each run)
    - 5 plots for loss vs validation loss (one for each run)
    """
    # Verify we have 5 history objects
    if len(history_list) != 10:
        print(f"Warning: Expected 10 history objects, but found {len(history_list)}")
    
    # Create figure for accuracy plots (top row)
    plt.figure(figsize=(20, 10))
    
    # Plot accuracy vs val_accuracy for each run
    for i, history in enumerate(history_list):
        # Get the history dictionary
        history_dict = history.history
        
        # Check for accuracy metric name (could be 'acc' or 'accuracy')
        acc_key = 'accuracy' if 'accuracy' in history_dict else 'acc'
        val_acc_key = 'val_accuracy' if 'val_accuracy' in history_dict else 'val_acc'
        
        # Get the number of epochs
        epochs = range(1, len(history_dict[acc_key]) + 1)
        
        # Create subplot for this run's accuracy
        plt.subplot(1, 5, i+1)
        plt.plot(epochs, history_dict[acc_key], 'b-', label='Training Accuracy')
        plt.plot(epochs, history_dict[val_acc_key], 'r-', label='Validation Accuracy')
        plt.title(f'Run {i+1} Accuracy')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend()
    
    plt.tight_layout()
    plt.savefig(f"/mnt/c/modelFiles/10splits/accuracy_plots.png")
    plt.show()
    print("Accuracy plots saved as 'accuracy_plots.png'")
    
    # Create figure for loss plots (bottom row)
    plt.figure(figsize=(20, 10))
    
    # Plot loss vs val_loss for each run
    for i, history in enumerate(history_list):
        # Get the history dictionary
        history_dict = history.history
        
        # Get the number of epochs
        epochs = range(1, len(history_dict['loss']) + 1)
        
        # Create subplot for this run's loss
        plt.subplot(1, 5, i+1)
        plt.plot(epochs, history_dict['loss'], 'b-', label='Training Loss')
        plt.plot(epochs, history_dict['val_loss'], 'r-', label='Validation Loss')
        plt.title(f'Run {i+1} Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
    
    plt.tight_layout()
    plt.savefig(f"/mnt/c/modelFiles/10splits/loss_plots.png")
    plt.show()
    print("Loss plots saved as 'loss_plots.png'")

    # Optional: Print summary statistics for each run
    print("\nSummary Statistics:")
    for i, history in enumerate(history_list):
        history_dict = history.history
        acc_key = 'accuracy' if 'accuracy' in history_dict else 'acc'
        val_acc_key = 'val_accuracy' if 'val_accuracy' in history_dict else 'val_acc'
        
        final_train_acc = history_dict[acc_key][-1]
        final_val_acc = history_dict[val_acc_key][-1]
        final_train_loss = history_dict['loss'][-1]
        final_val_loss = history_dict['val_loss'][-1]
        
        print(f"Run {i+1}:")
        print(f"  Final training accuracy: {final_train_acc:.4f}")
        print(f"  Final validation accuracy: {final_val_acc:.4f}")
        print(f"  Final training loss: {final_train_loss:.4f}")
        print(f"  Final validation loss: {final_val_loss:.4f}")

# Call the function with your list of history objects
plot_twenty_individual_graphs(history_list)

**QUANTIZED MODEL EVALUATION**

Soft Voting Ensemble of 5 .tflite Models

In [None]:
import tensorflow as tf
import numpy as np
from sklearn.metrics import classification_report

# Paths to the 5 TFLite models
tflite_model_paths = [f"/mnt/c/modelFiles/try2_quantized_kfold_model_{i}.tflite" for i in range(5)]

# Load all interpreters
interpreters = []
input_details_list = []
output_details_list = []
quant_params_list = []

for path in tflite_model_paths:
    interpreter = tf.lite.Interpreter(model_path=path)
    interpreter.allocate_tensors()
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    scale, zero_point = input_details[0]['quantization']

    interpreters.append(interpreter)
    input_details_list.append(input_details)
    output_details_list.append(output_details)
    quant_params_list.append((scale, zero_point))

# Ensemble prediction
y_true = []
y_pred_ensemble = []

print(f"\nRunning ensemble prediction using {len(interpreters)} TFLite models...\n")

for batch_images, batch_labels in test_generator:
    for img, label in zip(batch_images, batch_labels):
        img = np.expand_dims(img, axis=0).astype(np.float32)  # [1, H, W, C]
        
        # Collect predictions from all models
        preds = []
        for i in range(len(interpreters)):
            scale, zero_point = quant_params_list[i]
            img_int8 = (img / scale + zero_point).astype(np.int8)

            interpreters[i].set_tensor(input_details_list[i][0]['index'], img_int8)
            interpreters[i].invoke()
            output_data = interpreters[i].get_tensor(output_details_list[i][0]['index'])

            preds.append(output_data[0])  # shape: (NUM_CLASSES,)

        # Average predictions
        avg_pred = np.mean(preds, axis=0)
        pred_class = np.argmax(avg_pred)
        true_class = np.argmax(label)

        y_pred_ensemble.append(pred_class)
        y_true.append(true_class)

# Generate classification report
print("TFLite Ensemble Model Accuracy:", np.mean(np.array(y_true) == np.array(y_pred_ensemble)))
print("\nClassification Report (Soft-Voted Ensemble):")
target_names = list(label_encoder.classes_)  # assumes you have a fitted label_encoder
print(classification_report(y_true, y_pred_ensemble, target_names=target_names))


***KERAS MODEL EVALUATION***

In [None]:
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report
from tensorflow.keras.models import load_model

# Load all 5 models
model_paths = [f"/mnt/c/modelFiles/greyColor_quantized_kfold_model_{fold}.keras" for i in range(5)]
models = [load_model(path) for path in model_paths]

In [None]:
def evaluate_soft_voting(models, test_generator):
    num_samples = len(test_generator.image_paths)
    steps = int(np.ceil(num_samples / test_generator.batch_size))

    y_true_all, y_pred_all = [], []

    for _ in range(steps):
        x_batch, y_batch = next(test_generator)
        pred_probs = np.zeros((len(x_batch), len(test_generator.class_indices)))

        for model in models:
            pred_probs += model.predict(x_batch, verbose=0)

        avg_probs = pred_probs / len(models)
        y_pred_all.extend(np.argmax(avg_probs, axis=1))
        y_true_all.extend(np.argmax(y_batch, axis=1))

    y_true_all = np.array(y_true_all[:num_samples])
    y_pred_all = np.array(y_pred_all[:num_samples])

    class_names = list(test_generator.class_indices.keys())

    # Confusion matrix
    cm = confusion_matrix(y_true_all, y_pred_all)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Oranges',
                xticklabels=class_names,
                yticklabels=class_names)
    plt.title('Confusion Matrix - Soft Voting Ensemble')
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.tight_layout()
    plt.show()

    # Metrics
    print("\nClassification Report:")
    print(classification_report(y_true_all, y_pred_all, target_names=class_names))

    accuracy = np.sum(y_true_all == y_pred_all) / len(y_true_all)
    print(f"\nOverall Accuracy: {accuracy:.4f}")

    print("\nPer-class Accuracy:")
    for i, class_name in enumerate(class_names):
        mask = y_true_all == i
        correct = np.sum((y_true_all == i) & (y_pred_all == i))
        print(f"{class_name}: {correct / np.sum(mask):.4f}")

# Usage
evaluate_soft_voting(models, test_generator)


(SIMULATION) 
**COMPARISON BETWEEN A GENERIC MODEL AND QUANTIZATION AWARE TRAINING MODEL**

In [None]:
def visualize_quantization_effects(original_model_path, quantized_model_path, test_generator):
    """
    Visualize the effects of quantization on model weights and activations.
    
    Args:
        original_model_path: Path to the original (non-quantized) model
        quantized_model_path: Path to the quantized-aware trained model
        test_generator: A data generator providing test data
    """
    import matplotlib.pyplot as plt
    import numpy as np
    import tensorflow as tf
    
    # Define custom objects for loading the quantized model
    custom_objects = {'QuantizedConv2D': QuantizedConv2D, 'QuantizedDense': QuantizedDense}
    
    # Load both models
    original_model = tf.keras.models.load_model(original_model_path)
    quantized_model = tf.keras.models.load_model(quantized_model_path, custom_objects=custom_objects)
    # Build models if they aren't already
    sample_input_shape = (None, 256, 256, 1)
    original_model.build(input_shape=sample_input_shape)
    quantized_model.build(input_shape=sample_input_shape)
    # Get a batch of test data
    x_batch, _ = next(iter(test_generator))
    
    # Extract weights for comparison
    def get_conv_weights(model):
        # Get weights from the first conv layer
        for layer in model.layers:
            if isinstance(layer, tf.keras.layers.Conv2D) or isinstance(layer, QuantizedConv2D):
                return layer.get_weights()[0]  # Return kernel weights
        return None
    
    orig_weights = get_conv_weights(original_model)
    quant_weights = get_conv_weights(quantized_model)
    
    # 1. Compare weight distributions
    plt.figure(figsize=(15, 5))
    
    plt.subplot(1, 3, 1)
    plt.hist(orig_weights.flatten(), bins=50, alpha=0.5, label='Original')
    plt.title('Original Weight Distribution')
    plt.xlabel('Weight Value')
    plt.ylabel('Frequency')
    
    plt.subplot(1, 3, 2)
    plt.hist(quant_weights.flatten(), bins=50, alpha=0.5, label='Quantized')
    plt.title('Quantized Weight Distribution')
    plt.xlabel('Weight Value')
    plt.ylabel('Frequency')
    
    plt.subplot(1, 3, 3)
    plt.hist(orig_weights.flatten(), bins=50, alpha=0.5, label='Original')
    plt.hist(quant_weights.flatten(), bins=50, alpha=0.5, label='Quantized')
    plt.title('Overlaid Distributions')
    plt.xlabel('Weight Value')
    plt.ylabel('Frequency')
    plt.legend()
    
    plt.tight_layout()
    plt.savefig('weight_distribution_comparison.png')
    plt.show()
    
    # 2. Visualize quantization steps
    # Create a simple model to extract activations
    def create_activation_model(model, layer_name):
        # Define the known input shape (256, 256, 3)
        input_shape = (256,256,1)
        
        # Create new input layer
        input_layer = tf.keras.layers.Input(shape=input_shape)
        
        # Find the target layer
        target_layer = None
        x = input_layer
        
        for layer in model.layers:
            try:
                x = layer(x)
                if layer_name.lower() in layer.name.lower():  # Case-insensitive match
                    target_layer = x
                    break
            except:
                # If layer fails (like Input layers), skip it
                continue
        
        # If target layer not found, use the output of the middle layer
        if target_layer is None:
            middle_idx = len(model.layers) // 2
            x = input_layer
            for i, layer in enumerate(model.layers):
                try:
                    x = layer(x)
                    if i == middle_idx:
                        target_layer = x
                        break
                except:
                    continue
        
        return tf.keras.Model(inputs=input_layer, outputs=target_layer)
    # Get activations from a middle conv layer
    orig_activation_model = create_activation_model(original_model, 'conv2d')
    quant_activation_model = create_activation_model(quantized_model, 'quantized_conv2d')
    
    orig_activations = orig_activation_model.predict(x_batch)
    quant_activations = quant_activation_model.predict(x_batch)
    
    # Plot activation distributions
    plt.figure(figsize=(15, 5))
    
    plt.subplot(1, 3, 1)
    plt.hist(orig_activations.flatten(), bins=50, alpha=0.5)
    plt.title('Original Activation Distribution')
    plt.xlabel('Activation Value')
    plt.ylabel('Frequency')
    
    plt.subplot(1, 3, 2)
    plt.hist(quant_activations.flatten(), bins=50, alpha=0.5)
    plt.title('Quantized Activation Distribution')
    plt.xlabel('Activation Value')
    plt.ylabel('Frequency')
    
    plt.subplot(1, 3, 3)
    plt.hist(orig_activations.flatten(), bins=50, alpha=0.5, label='Original')
    plt.hist(quant_activations.flatten(), bins=50, alpha=0.5, label='Quantized')
    plt.title('Overlaid Activation Distributions')
    plt.xlabel('Activation Value')
    plt.ylabel('Frequency')
    plt.legend()
    
    plt.tight_layout()
    plt.savefig('activation_distribution_comparison.png')
    plt.show()
    
    # 3. Visualize original vs quantized features for a sample image
    # Get a single test image
    sample_img = x_batch[0:1]
    
    # Function to get intermediate layer outputs
    def get_layer_outputs(model, input_data, num_layers=4):
           
        model.build(input_shape=(None,256, 256, 1))

        
        outputs = []
        count = 0
    
        for layer in model.layers:
            if isinstance(layer, (tf.keras.layers.Conv2D, QuantizedConv2D)):
                temp_model = tf.keras.Model(inputs=model.inputs, outputs=layer.output)
                out = temp_model.predict(input_data)
                outputs.append(out)
                count += 1
                if count >= num_layers:
                    break
        return outputs



    
    
    orig_features = get_layer_outputs(original_model, sample_img)
    quant_features = get_layer_outputs(quantized_model, sample_img)
    
    # Plot feature maps from conv layers
    for layer_idx in range(min(len(orig_features), len(quant_features))):
        orig_feature = orig_features[layer_idx][0]
        quant_feature = quant_features[layer_idx][0]
        
        # Plot first 8 channels from each feature map
        plt.figure(figsize=(16, 8))
        plt.suptitle(f'Layer {layer_idx+1} Feature Maps', fontsize=16)
        
        for i in range(min(8, orig_feature.shape[-1])):
            # Original feature
            plt.subplot(2, 8, i+1)
            plt.imshow(orig_feature[:,:,i], cmap='viridis')
            plt.title(f'Original Ch {i+1}')
            plt.axis('off')
            
            # Quantized feature
            plt.subplot(2, 8, i+9)
            plt.imshow(quant_feature[:,:,i], cmap='viridis')
            plt.title(f'Quantized Ch {i+1}')
            plt.axis('off')
        
        plt.tight_layout(rect=[0, 0, 1, 0.95])
        plt.savefig(f'feature_map_comparison_layer_{layer_idx+1}.png')
        plt.show()
    
    # 4. Compare prediction probabilities
    orig_preds = original_model.predict(x_batch[:5])
    quant_preds = quantized_model.predict(x_batch[:5])
    
    plt.figure(figsize=(12, 6))
    
    for i in range(min(5, len(orig_preds))):
        plt.subplot(1, 5, i+1)
        
        # Get top 3 class indices
        top_orig = np.argsort(orig_preds[i])[-3:][::-1]
        top_quant = np.argsort(quant_preds[i])[-3:][::-1]
        
        # Combine unique classes
        classes = np.unique(np.concatenate([top_orig, top_quant]))
        
        # Plot side by side
        x = np.arange(len(classes))
        width = 0.35
        
        orig_values = [orig_preds[i][cls] for cls in classes]
        quant_values = [quant_preds[i][cls] for cls in classes]
        
        plt.bar(x - width/2, orig_values, width, label='Original')
        plt.bar(x + width/2, quant_values, width, label='Quantized')
        
        plt.title(f'Sample {i+1}')
        plt.xlabel('Class Index')
        plt.ylabel('Probability')
        plt.xticks(x, classes)
        
        if i == 0:
            plt.legend()
    
    plt.tight_layout()
    plt.savefig('prediction_probability_comparison.png')
    plt.show()
    
    # 5. Calculate size reduction
    def get_model_size(model):
        """Get approximate model size in MB"""
        weights = [w.numpy() for w in model.weights]
        total_params = sum(w.size for w in weights)
        
        # Calculate size in MB (32-bit float = 4 bytes)
        size_mb = (total_params * 4) / (1024 * 1024)
        return size_mb
    
    orig_size = get_model_size(original_model)
    
    # Simulate quantized model size (8-bit = 1 byte)
    quant_size = get_model_size(quantized_model) / 4  # Approximation: 8-bit is 1/4 of 32-bit
    
    print(f"Original model size: {orig_size:.2f} MB")
    print(f"Estimated quantized model size: {quant_size:.2f} MB")
    print(f"Size reduction: {(1 - quant_size/orig_size) * 100:.1f}%")
    
    # 6. Measure and compare inference speed
    import time
    
    # Warm up
    _ = original_model.predict(x_batch[:10])
    _ = quantized_model.predict(x_batch[:10])
    
    # Measure original model speed
    start_time = time.time()
    _ = original_model.predict(x_batch)
    orig_time = time.time() - start_time
    
    # Measure quantized model speed
    start_time = time.time()
    _ = quantized_model.predict(x_batch)
    quant_time = time.time() - start_time
    
    print(f"Original model inference time: {orig_time:.4f} seconds")
    print(f"Quantized model inference time: {quant_time:.4f} seconds")
    print(f"Speed improvement: {(1 - quant_time/orig_time) * 100:.1f}%")
    
    # Return summary as dictionary for further analysis
    return {
        "original_size_mb": orig_size,
        "quantized_size_mb": quant_size,
        "size_reduction_percent": (1 - quant_size/orig_size) * 100,
        "original_inference_time": orig_time,
        "quantized_inference_time": quant_time,
        "speed_improvement_percent": (1 - quant_time/orig_time) * 100
    }

# Usage example:
# First train your model with quantization-aware training
# Then call this function:
visualize_quantization_effects('/mnt/c/modelFiles/GreyColor_NoQuant_NoKFold.keras', "/mnt/c/modelFiles/greyColor_quantized_kfold_model_0.keras", test_generator)

**CLASS BASED ADAPTIVE ACCURACY THRESHOLDING**

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from sklearn.metrics import confusion_matrix, f1_score, classification_report, roc_curve, auc
from tensorflow.keras.models import load_model
import tensorflow as tf

def calculate_metrics_multiclass(y_true, y_pred_probs, thresholds):
    """
    Apply per-class thresholds and compute evaluation metrics.
    
    Args:
        y_true: True class labels
        y_pred_probs: Predicted probabilities for each class
        thresholds: List of per-class thresholds
    
    Returns:
        y_pred: Final predicted labels based on adaptive thresholds
        cm: Confusion matrix
        f1_scores: Per-class F1 scores
    """
    num_classes = y_pred_probs.shape[1]
    
    # Apply adaptive thresholds: assign class only if it exceeds its threshold
    y_pred = np.full(y_true.shape, -1)  # Initialize with -1 (uncertain)
    
    for i in range(num_classes):
        mask = y_pred_probs[:, i] >= thresholds[i]
        y_pred[mask] = i  # Assign class i if its probability exceeds threshold
    
    # Handle uncertain cases: assign most confident prediction if no threshold is met
    uncertain_mask = y_pred == -1
    y_pred[uncertain_mask] = np.argmax(y_pred_probs[uncertain_mask], axis=1)

    # Compute confusion matrix
    cm = confusion_matrix(y_true, y_pred)

    # Compute per-class F1 scores
    f1_scores = f1_score(y_true, y_pred, average=None)  # Per-class F1-score
    overall_f1 = f1_score(y_true, y_pred, average="macro")  # Macro-average F1-score
    
    return y_pred, cm, f1_scores, overall_f1

def soft_voting_ensemble(models, test_generator):
    """
    Perform soft voting ensemble prediction from multiple models.
    
    Args:
        models: List of trained models
        test_generator: Test data generator
    
    Returns:
        y_true: True class labels
        y_pred_probs: Averaged predicted probabilities from ensemble
    """
    steps = len(test_generator)
    y_true = []
    y_pred_probs = []
    
    for _ in range(steps):
        x_batch, y_batch = next(test_generator)
        y_true.extend(np.argmax(y_batch, axis=1))  # Convert one-hot labels to indices
        
        # Get predicted probabilities from each model and average them
        model_preds = [model.predict(x_batch, verbose=0) for model in models]
        avg_preds = np.mean(model_preds, axis=0)  # Average probabilities
        y_pred_probs.extend(avg_preds)
    
    y_true = np.array(y_true)
    y_pred_probs = np.array(y_pred_probs)
    
    return y_true, y_pred_probs

def adaptive_threshold_determination_multiclass_ensemble(models, test_generator, threshold_range=None):
    """
    Determine adaptive per-class thresholds and evaluate the ensemble model.
    
    Args:
        models: List of trained models
        test_generator: Test data generator
        threshold_range: Range of thresholds to test (default: 0 to 1 in 0.01 steps)
    
    Returns:
        optimal_thresholds: List of per-class optimal thresholds
    """
    # Get ensemble predictions
    y_true, y_pred_probs = soft_voting_ensemble(models, test_generator)
    
    num_classes = y_pred_probs.shape[1]
    class_names = list(test_generator.class_indices.keys())

    # Set threshold search range
    if threshold_range is None:
        threshold_range = np.arange(0, 1.01, 0.01)

    # Find best threshold for each class based on F1-score
    best_thresholds = []
    best_f1_scores = []

    for i in range(num_classes):
        best_threshold = 0.5  # Default
        best_f1 = 0.0

        for threshold in threshold_range:
            temp_thresholds = [0.5] * num_classes  # Start with default 0.5 for all
            temp_thresholds[i] = threshold  # Vary only the current class
            
            _, _, f1_scores, _ = calculate_metrics_multiclass(y_true, y_pred_probs, temp_thresholds)
            if f1_scores[i] > best_f1:
                best_f1 = f1_scores[i]
                best_threshold = threshold

        best_thresholds.append(best_threshold)
        best_f1_scores.append(best_f1)

    # Apply optimal thresholds
    y_pred, cm, f1_scores, overall_f1 = calculate_metrics_multiclass(y_true, y_pred_probs, best_thresholds)

    # Compute ROC Curve & AUC per class
    fpr = {}
    tpr = {}
    roc_auc = {}

    for i in range(num_classes):
        y_true_binary = (y_true == i).astype(int)
        y_pred_binary = y_pred_probs[:, i]

        fpr[i], tpr[i], _ = roc_curve(y_true_binary, y_pred_binary)
        roc_auc[i] = auc(fpr[i], tpr[i])

    macro_auc = np.mean(list(roc_auc.values()))

    # Plot confusion matrix
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.show()

    # Plot ROC Curves
    plt.figure(figsize=(8, 6))
    for i in range(num_classes):
        plt.plot(fpr[i], tpr[i], label=f'Class {class_names[i]} (AUC = {roc_auc[i]:.2f})')
    
    plt.plot([0, 1], [0, 1], 'k--')  # Random classifier line
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'Multiclass ROC Curve (Macro AUC = {macro_auc:.2f})')
    plt.legend()
    plt.grid(True)
    plt.show()

    # Print classification report
    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, target_names=class_names))

    # Print optimal thresholds
    print("\nOptimal Per-Class Thresholds:")
    for i, class_name in enumerate(class_names):
        print(f"{class_name}: {best_thresholds[i]:.2f} (F1 = {best_f1_scores[i]:.4f})")

    print(f"\nOverall Macro F1 Score: {overall_f1:.4f}")
    print(f"Macro-Averaged ROC AUC: {macro_auc:.4f}")

    return best_thresholds

# Usage:
def evaluate_multiclass_with_adaptive_threshold_ensemble(models, test_generator):
    """
    Evaluate ensemble model on a multi-class dataset using adaptive thresholding.
    """
    optimal_thresholds = adaptive_threshold_determination_multiclass_ensemble(models, test_generator)
    return optimal_thresholds


# Call the evaluation function
try:
    evaluate_multiclass_with_adaptive_threshold_ensemble(models, test_generator)
except StopIteration as e:
    print("Iteration stopped. Evaluating again.")
    evaluate_multiclass_with_adaptive_threshold_ensemble(models, test_generator)


In [None]:
'''# %%
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, f1_score, roc_curve, auc
import tensorflow as tf

def calculate_metrics(y_true, y_pred, threshold):
    """Calculate TP, TN, FP, FN, FPR, FNR for given threshold"""
    predictions = (y_pred >= threshold).astype(int)
    tn, fp, fn, tp = confusion_matrix(y_true, predictions).ravel()   
    
    fpr = fp / (fp + tn) if (fp + tn) > 0 else 0
    fnr = fn / (fn + tp) if (fn + tp) > 0 else 0
    f1 = f1_score(y_true, predictions)
    
    return {
        'threshold': threshold,
        'TP': tp, 'TN': tn, 'FP': fp, 'FN': fn,
        'FPR': fpr, 'FNR': fnr, 'F1': f1
    }
'''

In [None]:
'''#evaluate multiclass
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from sklearn.metrics import confusion_matrix, f1_score, roc_curve, auc, classification_report
import tensorflow as tf

def adaptive_threshold_determination_multiclass1(model, test_generator):
    """
    Evaluate a multi-class classification model.
    
    Args:
        model: Trained model
        test_generator: Test data generator
    """
    # Reset generator and get predictions
    test_generator.reset()
    steps = len(test_generator)
    
    # Get all predictions and true labels
    y_true = []
    y_pred_probs = []
    
    for i in range(steps):
        x_batch, y_batch = next(test_generator)
        batch_pred = model.predict(x_batch, verbose=0)
        y_true.extend(np.argmax(y_batch, axis=1))  # Convert one-hot labels to class indices
        y_pred_probs.extend(batch_pred)  # Keep full probability distribution
    
    y_true = np.array(y_true)
    y_pred_probs = np.array(y_pred_probs)
    
    # Convert predicted probabilities to class labels
    y_pred = np.argmax(y_pred_probs, axis=1)

    # Get class names
    class_names = list(test_generator.class_indices.keys())
    num_classes = len(class_names)
    
    # Compute confusion matrix
    cm = confusion_matrix(y_true, y_pred)
    
    # Compute per-class F1 scores
    f1_scores = f1_score(y_true, y_pred, average=None)  # Per-class F1-score
    overall_f1 = f1_score(y_true, y_pred, average="macro")  # Macro-average F1-score
    
    # ROC Curve & AUC for multiclass (One-vs-Rest)
    fpr = {}
    tpr = {}
    roc_auc = {}
    
    for i in range(num_classes):
        # Convert labels to binary (One-vs-Rest)
        y_true_binary = (y_true == i).astype(int)
        y_pred_binary = y_pred_probs[:, i]
        
        fpr[i], tpr[i], _ = roc_curve(y_true_binary, y_pred_binary)
        roc_auc[i] = auc(fpr[i], tpr[i])
    
    # Macro-Averaged AUC
    macro_auc = np.mean(list(roc_auc.values()))

    # Plot confusion matrix
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.show()

    # Plot ROC Curves
    plt.figure(figsize=(8, 6))
    for i in range(num_classes):
        plt.plot(fpr[i], tpr[i], label=f'Class {class_names[i]} (AUC = {roc_auc[i]:.2f})')
    
    plt.plot([0, 1], [0, 1], 'k--')  # Random classifier line
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'Multiclass ROC Curve (Macro AUC = {macro_auc:.2f})')
    plt.legend()
    plt.grid(True)
    plt.show()

    # Print classification report
    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, target_names=class_names))

    # Print per-class F1 scores
    print("\nPer-Class F1 Scores:")
    for i, class_name in enumerate(class_names):
        print(f"{class_name}: {f1_scores[i]:.4f}")

    print(f"\nOverall Macro F1 Score: {overall_f1:.4f}")
    print(f"Macro-Averaged ROC AUC: {macro_auc:.4f}")

# Usage:
def evaluate_multiclass(model, test_generator):
    """
    Evaluate model on a multi-class dataset.
    """
    adaptive_threshold_determination_multiclass1(model, test_generator)'''


In [None]:
'''#PER CLASS THRESHOLDING


import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from sklearn.metrics import confusion_matrix, f1_score, classification_report, roc_curve, auc
import tensorflow as tf

def calculate_metrics_multiclass(y_true, y_pred_probs, thresholds):
    """
    Apply per-class thresholds and compute evaluation metrics.
    
    Args:
        y_true: True class labels
        y_pred_probs: Predicted probabilities for each class
        thresholds: List of per-class thresholds
    
    Returns:
        y_pred: Final predicted labels based on adaptive thresholds
        cm: Confusion matrix
        f1_scores: Per-class F1 scores
    """
    num_classes = y_pred_probs.shape[1]
    
    # Apply adaptive thresholds: assign class only if it exceeds its threshold
    y_pred = np.full(y_true.shape, -1)  # Initialize with -1 (uncertain)
    
    for i in range(num_classes):
        mask = y_pred_probs[:, i] >= thresholds[i]
        y_pred[mask] = i  # Assign class i if its probability exceeds threshold
    
    # Handle uncertain cases: assign most confident prediction if no threshold is met
    uncertain_mask = y_pred == -1
    y_pred[uncertain_mask] = np.argmax(y_pred_probs[uncertain_mask], axis=1)

    # Compute confusion matrix
    cm = confusion_matrix(y_true, y_pred)

    # Compute per-class F1 scores
    f1_scores = f1_score(y_true, y_pred, average=None)  # Per-class F1-score
    overall_f1 = f1_score(y_true, y_pred, average="macro")  # Macro-average F1-score
    
    return y_pred, cm, f1_scores, overall_f1


def adaptive_threshold_determination_multiclass2(model, test_generator, threshold_range=None):
    """
    Determine adaptive per-class thresholds and evaluate the model.
    
    Args:
        model: Trained model
        test_generator: Test data generator
        threshold_range: Range of thresholds to test (default: 0 to 1 in 0.01 steps)
    
    Returns:
        optimal_thresholds: List of per-class optimal thresholds
    """
    # Reset generator and get predictions
    #test_generator.reset()
    steps = len(test_generator)
    
    y_true = []
    y_pred_probs = []
    
    for _ in range(steps):
        x_batch, y_batch = next(test_generator)
        batch_pred = model.predict(x_batch, verbose=0)
        y_true.extend(np.argmax(y_batch, axis=1))  # Convert one-hot labels to indices
        y_pred_probs.extend(batch_pred)  # Store full probability distribution
    
    y_true = np.array(y_true)
    y_pred_probs = np.array(y_pred_probs)
    
    num_classes = y_pred_probs.shape[1]
    class_names = list(test_generator.class_indices.keys())

    # Set threshold search range
    if threshold_range is None:
        threshold_range = np.arange(0, 1.01, 0.01)

    # Find best threshold for each class based on F1-score
    best_thresholds = []
    best_f1_scores = []

    for i in range(num_classes):
        best_threshold = 0.5  # Default
        best_f1 = 0.0

        for threshold in threshold_range:
            temp_thresholds = [0.5] * num_classes  # Start with default 0.5 for all
            temp_thresholds[i] = threshold  # Vary only the current class
            
            _, _, f1_scores, _ = calculate_metrics_multiclass(y_true, y_pred_probs, temp_thresholds)
            if f1_scores[i] > best_f1:
                best_f1 = f1_scores[i]
                best_threshold = threshold

        best_thresholds.append(best_threshold)
        best_f1_scores.append(best_f1)

    # Apply optimal thresholds
    y_pred, cm, f1_scores, overall_f1 = calculate_metrics_multiclass(y_true, y_pred_probs, best_thresholds)

    # Compute ROC Curve & AUC per class
    fpr = {}
    tpr = {}
    roc_auc = {}

    for i in range(num_classes):
        y_true_binary = (y_true == i).astype(int)
        y_pred_binary = y_pred_probs[:, i]

        fpr[i], tpr[i], _ = roc_curve(y_true_binary, y_pred_binary)
        roc_auc[i] = auc(fpr[i], tpr[i])

    macro_auc = np.mean(list(roc_auc.values()))

    # Plot confusion matrix
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.show()

    # Plot ROC Curves
    plt.figure(figsize=(8, 6))
    for i in range(num_classes):
        plt.plot(fpr[i], tpr[i], label=f'Class {class_names[i]} (AUC = {roc_auc[i]:.2f})')
    
    plt.plot([0, 1], [0, 1], 'k--')  # Random classifier line
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'Multiclass ROC Curve (Macro AUC = {macro_auc:.2f})')
    plt.legend()
    plt.grid(True)
    plt.show()

    # Print classification report
    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, target_names=class_names))

    # Print optimal thresholds
    print("\nOptimal Per-Class Thresholds:")
    for i, class_name in enumerate(class_names):
        print(f"{class_name}: {best_thresholds[i]:.2f} (F1 = {best_f1_scores[i]:.4f})")

    print(f"\nOverall Macro F1 Score: {overall_f1:.4f}")
    print(f"Macro-Averaged ROC AUC: {macro_auc:.4f}")

    return best_thresholds


# Usage:
def evaluate_multiclass_with_adaptive_threshold(model, test_generator):
    """
    Evaluate model on a multi-class dataset using adaptive thresholding.
    """
    optimal_thresholds = adaptive_threshold_determination_multiclass2(model, test_generator)
    return optimal_thresholds
'''

In [None]:
'''# %%
cnn = tf.keras.models.load_model("/mnt/c/modelFiles/ensemble_greyColor_quantized_average_model.keras")
'''


In [None]:
'''try:
    evaluate_multiclass_with_adaptive_threshold(cnn,test_generator)
except StopIteration as e:
    print("iteration stopped. evaluating again.")
    evaluate_multiclass_with_adaptive_threshold(cnn,test_generator)'''

***QAT GENERIC KERAS MODEL RESOURCE UTILISATIONS AND INFERENCE***

In [None]:
'''# %%
import tensorflow as tf
import numpy as np
import time
import psutil
import os
# !pip install gputil
# !pip install pynvml

import GPUtil'''

In [None]:
'''# Load the trained model

model = tf.keras.models.load_model("/mnt/c/modelFiles/ensemble_greyColor_quantized_average_model.keras")

# Load a single test image (modify path as needed)
img_path = "/mnt/c/newTrain/Train/line/17.jpg"
# Load as grayscale
img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
img = cv2.resize(img, (256, 256))
img = img.astype(np.float32) / 255.0  # Normalize

# Compute Sobel edge detection
sobelx = cv2.Sobel(img, cv2.CV_32F, 1, 0, ksize=3)
sobely = cv2.Sobel(img, cv2.CV_32F, 0, 1, ksize=3)

# Normalize edges to 0-1
sobelx = cv2.normalize(sobelx, None, 0, 1, cv2.NORM_MINMAX)
sobely = cv2.normalize(sobely, None, 0, 1, cv2.NORM_MINMAX)

# Stack channels (grayscale repeated 3 times + Sobel X + Sobel Y)
stacked = np.stack([img], axis=-1)

# Add batch dimension
img_array = np.expand_dims(stacked, axis=0)  # Shape becomes (1, 256, 256, 3)

# Measure cold start time (time to first prediction)
t1 = time.time()
prediction = model.predict(img_array)
t2 = time.time()
cold_start_time = t2 - t1

# Measure inference time for a single image
num_trials = 10
times = []
for _ in range(num_trials):
    t1 = time.time()
    _ = model.predict(img_array)
    t2 = time.time()
    times.append(t2 - t1)

inference_time = np.mean(times)

# Model size
model_size = os.path.getsize("/mnt/c/modelFiles/ensemble_greyColor_quantized_average_model.keras") / (1024 * 1024)  # In MB

# Memory usage before and after prediction
process = psutil.Process(os.getpid())
mem_before = process.memory_info().rss / (1024 * 1024)  # In MB
model.predict(img_array)
mem_after = process.memory_info().rss / (1024 * 1024)  # In MB
memory_usage = mem_after - mem_before

# GPU utilization
gpus = GPUtil.getGPUs()
gpu_usage = gpus[0].load * 100 if gpus else None

# CPU utilization
cpu_util = psutil.cpu_percent(interval=1)

# Power consumption (only works on supported systems)
power_usage = None
try:
    power_usage = psutil.sensors_battery().power_plugged  # Approximate if available
except AttributeError:
    pass

import tensorflow as tf
from tensorflow.python.framework.convert_to_constants import convert_variables_to_constants_v2
from tensorflow.compat.v1.profiler import profile
from tensorflow.compat.v1.profiler import ProfileOptionBuilder

def get_flops(model, input_shape):
    concrete = tf.function(lambda inputs: model(inputs)).get_concrete_function(
        tf.TensorSpec([1] + list(input_shape), model.dtype)
    )
    frozen_func = convert_variables_to_constants_v2(concrete)
    graph = frozen_func.graph

    # Use TensorFlow's profiler
    run_meta = tf.compat.v1.RunMetadata()
    opts = ProfileOptionBuilder.float_operation()
    
    flops = profile(graph, run_meta=run_meta, options=opts)
    
    return flops.total_float_ops  # Total FLOPs

# Example usage:
input_shape = (256, 256, 1)  # Adjust based on your model
flops = get_flops(model, input_shape)
print("FLOPs:", flops)


# Number of parameters
num_params = model.count_params()

# Data loading time
t1 = time.time()
_ = tf.keras.preprocessing.image.load_img(img_path, target_size=(256, 256))
t2 = time.time()
data_loading_time = t2 - t1

# Print results
print(f"Cold Start Time: {cold_start_time:.4f} seconds")
print(f"Average Inference Time: {inference_time:.4f} seconds")
print(f"Model Size: {model_size:.2f} MB")
print(f"Memory Usage: {memory_usage:.2f} MB")
print(f"FLOPs: {flops}")
print(f"Number of Parameters: {num_params}")
print(f"GPU Utilization: {gpu_usage:.2f}%" if gpu_usage is not None else "GPU Utilization: Not Available")
print(f"CPU Utilization: {cpu_util:.2f}%")
print(f"Power Consumption w/o GPU: {power_usage}")
print(f"Data Loading Time: {data_loading_time:.4f} seconds")
import pynvml

def get_gpu_power():
    pynvml.nvmlInit()
    handle = pynvml.nvmlDeviceGetHandleByIndex(0)  # GPU 0
    power = pynvml.nvmlDeviceGetPowerUsage(handle) / 1000  # Convert to Watts
    pynvml.nvmlShutdown()
    return power  # Power in Watts

power_watts = get_gpu_power()
print("power watts (GPU):",power_watts)

def get_sparsity_ratio(model):
    total_params = np.sum([np.prod(w.shape) for w in model.weights])
    zero_params = np.sum([np.sum(w.numpy() == 0) for w in model.weights])
    sparsity = zero_params / total_params
    return sparsity

def get_energy_efficiency(flops, power_watts=None):
    if power_watts is None:
        return "Power consumption data missing"
    return flops / power_watts  # FLOPs per Watt

# Example usage:
sparsity_ratio = get_sparsity_ratio(model)
print("Sparsity Ratio:", sparsity_ratio)

# Replace 'power_watts' with the actual power usage if available
energy_efficiency = get_energy_efficiency(flops, power_watts)
print("Energy Efficiency:", energy_efficiency)'''

In [None]:
# %%
import tensorflow as tf
import numpy as np
import time
import psutil
import os
import cv2
import GPUtil
import pynvml
from tensorflow.python.framework.convert_to_constants import convert_variables_to_constants_v2
from tensorflow.compat.v1.profiler import profile
from tensorflow.compat.v1.profiler import ProfileOptionBuilder

model_paths = [
    "/mnt/c/modelFiles/greyColor_quantized_kfold_model_0.keras",
    "/mnt/c/modelFiles/greyColor_quantized_kfold_model_1.keras",
    "/mnt/c/modelFiles/greyColor_quantized_kfold_model_2.keras",
    "/mnt/c/modelFiles/greyColor_quantized_kfold_model_3.keras",
    "/mnt/c/modelFiles/greyColor_quantized_kfold_model_4.keras"
]

# Load a single test image (modify path as needed)
img_path = "/mnt/c/newTrain/Train/line/17.jpg"
img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
img = cv2.resize(img, (256, 256))
img = img.astype(np.float32) / 255.0

# Stack into 1 channel
stacked = np.stack([img], axis=-1)  # shape: (256, 256, 1)
img_array = np.expand_dims(stacked, axis=0)  # shape: (1, 256, 256, 1)

# Cold start timing
t1 = time.time()
ensemble_probs = np.mean([model.predict(img_array, verbose=0) for model in models], axis=0)
t2 = time.time()
cold_start_time = t2 - t1

# Inference timing
num_trials = 10
times = []
for _ in range(num_trials):
    t1 = time.time()
    _ = np.mean([model.predict(img_array, verbose=0) for model in models], axis=0)
    t2 = time.time()
    times.append(t2 - t1)
inference_time = np.mean(times)

# Average model size
model_size = np.mean([os.path.getsize(path) for path in model_paths]) / (1024 * 1024)

# Memory usage
process = psutil.Process(os.getpid())
mem_before = process.memory_info().rss / (1024 * 1024)
_ = np.mean([model.predict(img_array, verbose=0) for model in models], axis=0)
mem_after = process.memory_info().rss / (1024 * 1024)
memory_usage = mem_after - mem_before

# GPU and CPU stats
gpus = GPUtil.getGPUs()
gpu_usage = gpus[0].load * 100 if gpus else None
cpu_util = psutil.cpu_percent(interval=1)

# Power usage (CPU only)
try:
    power_usage = psutil.sensors_battery().power_plugged
except AttributeError:
    power_usage = None

# GPU power usage
def get_gpu_power():
    pynvml.nvmlInit()
    handle = pynvml.nvmlDeviceGetHandleByIndex(0)
    power = pynvml.nvmlDeviceGetPowerUsage(handle) / 1000
    pynvml.nvmlShutdown()
    return power
power_watts = get_gpu_power()

# FLOPs for one model (assuming same for all)
import io
from contextlib import redirect_stdout

def get_flops(model, input_shape):
    concrete = tf.function(lambda inputs: model(inputs)).get_concrete_function(
        tf.TensorSpec([1] + list(input_shape), model.inputs[0].dtype)
    )
    frozen_func = convert_variables_to_constants_v2(concrete)
    graph = frozen_func.graph

    run_meta = tf.compat.v1.RunMetadata()
    opts = ProfileOptionBuilder.float_operation()

    # Suppress stdout
    with io.StringIO() as buf, redirect_stdout(buf):
        flops = profile(graph, run_meta=run_meta, options=opts)
    
    return flops.total_float_ops

input_shape = (256, 256, 1)
flops = get_flops(models[0], input_shape) * len(models)  # total ensemble FLOPs

# Parameter count
num_params = sum([model.count_params() for model in models])

# Data load timing
t1 = time.time()
_ = tf.keras.preprocessing.image.load_img(img_path, target_size=(256, 256))
t2 = time.time()
data_loading_time = t2 - t1

# Sparsity
def get_sparsity_ratio(model):
    total_params = np.sum([np.prod(w.shape) for w in model.weights])
    zero_params = np.sum([np.sum(w.numpy() == 0) for w in model.weights])
    return zero_params / total_params

sparsity_ratios = [get_sparsity_ratio(model) for model in models]
avg_sparsity = np.mean(sparsity_ratios)

# Energy efficiency
def get_energy_efficiency(flops, power_watts=None):
    if power_watts is None:
        return "Power consumption data missing"
    return flops / power_watts

energy_efficiency = get_energy_efficiency(flops, power_watts)

# Print results
print(f"Cold Start Time: {cold_start_time:.4f} seconds")
print(f"Average Inference Time: {inference_time:.4f} seconds")
print(f"Average Model Size: {model_size:.2f} MB")
print(f"Memory Usage: {memory_usage:.2f} MB")
print(f"Total FLOPs (Ensemble): {flops}")
print(f"Total Parameters (Ensemble): {num_params}")
print(f"GPU Utilization: {gpu_usage:.2f}%" if gpu_usage is not None else "GPU Utilization: Not Available")
print(f"CPU Utilization: {cpu_util:.2f}%")
print(f"Power Consumption (CPU): {power_usage}")
print(f"Power Watts (GPU): {power_watts}")
print(f"Data Loading Time: {data_loading_time:.4f} seconds")
print(f"Average Sparsity Ratio: {avg_sparsity:.4f}")
print(f"Energy Efficiency (FLOPs/Watt): {energy_efficiency}")


***QUANTIZED TFLITE MODEL RESOURCE UTILISATIONS AND INFERENCE***

In [None]:
import numpy as np
import cv2
import time
import os
import psutil
import GPUtil
import pynvml
import tensorflow as tf

# Paths to your 5 TFLite models
model_paths = [
    "/mnt/c/modelFiles/quantized_kfold_model_0.tflite",
    "/mnt/c/modelFiles/quantized_kfold_model_1.tflite",
    "/mnt/c/modelFiles/quantized_kfold_model_2.tflite",
    "/mnt/c/modelFiles/quantized_kfold_model_3.tflite",
    "/mnt/c/modelFiles/quantized_kfold_model_4.tflite"
]

# Load interpreters
interpreters = [tf.lite.Interpreter(model_path=path) for path in model_paths]
for interpreter in interpreters:
    interpreter.allocate_tensors()

input_details = interpreters[0].get_input_details()
output_details = interpreters[0].get_output_details()

# Preprocess image
img_path = "/mnt/c/newTrain/Train/line/17.jpg"
img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
img = cv2.resize(img, (256, 256))
img = img.astype(np.float32) / 255.0

# Sobel edges
sobelx = cv2.Sobel(img, cv2.CV_32F, 1, 0, ksize=3)
sobely = cv2.Sobel(img, cv2.CV_32F, 0, 1, ksize=3)
sobelx = cv2.normalize(sobelx, None, 0, 1, cv2.NORM_MINMAX)
sobely = cv2.normalize(sobely, None, 0, 1, cv2.NORM_MINMAX)

# Stack channels
stacked = np.stack([img], axis=-1)

# Quantize input
input_scale, input_zero_point = input_details[0]['quantization']
img_input = stacked / input_scale + input_zero_point
img_input = np.clip(img_input, -128, 127).astype(np.int8)
img_input = np.expand_dims(img_input, axis=0)

# Cold start time (for one model)
t1 = time.time()
interpreters[0].set_tensor(input_details[0]['index'], img_input)
interpreters[0].invoke()
_ = interpreters[0].get_tensor(output_details[0]['index'])
t2 = time.time()
cold_start_time = t2 - t1

# Inference time over 10 trials (ensemble)
times = []
for _ in range(10):
    t1 = time.time()
    predictions = []
    for interpreter in interpreters:
        interpreter.set_tensor(input_details[0]['index'], img_input)
        interpreter.invoke()
        output_data = interpreter.get_tensor(output_details[0]['index'])
        predictions.append(output_data[0])
    _ = np.mean(predictions, axis=0)
    t2 = time.time()
    times.append(t2 - t1)
inference_time = np.mean(times)

# Model size (sum of all 5)
model_size = sum(os.path.getsize(path) for path in model_paths) / (1024 * 1024)

# Memory usage
process = psutil.Process(os.getpid())
mem_before = process.memory_info().rss / (1024 * 1024)
for interpreter in interpreters:
    interpreter.set_tensor(input_details[0]['index'], img_input)
    interpreter.invoke()
    _ = interpreter.get_tensor(output_details[0]['index'])
mem_after = process.memory_info().rss / (1024 * 1024)
memory_usage = mem_after - mem_before

# GPU usage
gpus = GPUtil.getGPUs()
gpu_usage = gpus[0].load * 100 if gpus else None

# CPU usage
cpu_util = psutil.cpu_percent(interval=1)

# Power (battery)
try:
    power_usage = psutil.sensors_battery().power_plugged
except:
    power_usage = None

# GPU power
def get_gpu_power():
    pynvml.nvmlInit()
    handle = pynvml.nvmlDeviceGetHandleByIndex(0)
    power = pynvml.nvmlDeviceGetPowerUsage(handle) / 1000
    pynvml.nvmlShutdown()
    return power

power_watts = get_gpu_power()

# Data loading time
t1 = time.time()
_ = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
t2 = time.time()
data_loading_time = t2 - t1

# Print results
print(f"Cold Start Time: {cold_start_time:.4f} s")
print(f"Average Inference Time (Ensemble): {inference_time:.4f} s")
print(f"average Model Size (5 models): {model_size/5:.2f} MB")
print(f"Memory Usage: {memory_usage:.2f} MB")
print(f"GPU Utilization: {gpu_usage:.2f}%" if gpu_usage else "GPU Utilization: Not Available")
print(f"CPU Utilization: {cpu_util:.2f}%")
print(f"Power Plugged In: {power_usage}")
print(f"GPU Power (W): {power_watts}")
print(f"Data Loading Time: {data_loading_time:.4f} s")


**ENDING CODES**

In [None]:
# %%
!pip freeze > requirements.txt

# %%