# Medicine Box detection algorithm 

In [None]:
import tensorflow as tf 
import tensorflow.keras.layers as layers 
import tensorflow.keras.models as models
import tensorflow.keras.optimizers as optim
from tensorflow.keras.regularizers import l2, l1

import os 
import cv2
import numpy as np 
import random
import pandas as pd 
import matplotlib.pyplot as plt 
from scipy.sparse import csr_matrix
import traceback

## Control values 

In [None]:
# Root directory of the dataset; its sub-folders are listed and read further below.
PATH = r"Pill_Detection"
# Network input shape; the first two entries are also used as the cv2.resize target size.
IMG  = (256, 256, 3)
# Number of grid cells along each spatial axis of the label / prediction tensors.
GRID = (8, 8)
# Pixels per grid cell (256 / 8 = 32.0); derived from the first axis only.
FAC = IMG[0] / GRID[0]

# Defaults shared by the convolution helpers defined below.
DEF_NUM = 4  # default number of parallel conv branches in ConvOp
DEF_KER = 3  # default base kernel size
DEF_STR = 1  # default stride
MOMENT = 0.95  # BatchNormalization momentum
KREG = 3e-4  # L2 kernel-regularisation factor
BREG = 3e-6  # L1 bias-regularisation factor
ACTF = "silu"  # activation name handed to layers.Activation
VRED = 0.7  # forwarded as `var_red`; no helper in this file reads it
XFAC = 2.0  # forwarded as `xp_factor`; no helper in this file reads it

In [None]:
def A2V(x):
    """Reshape array ``x`` into a column vector of shape ``(-1, 1)``."""
    return x.reshape(-1, 1)


def _2L(x):
    """Return ``x`` as a list of Python ints (each value truncated by ``int``).

    A scalar (Python number, NumPy scalar or 0-d array) yields a one-element
    list; any other input is iterated element by element.
    """
    # isinstance also covers NumPy scalars, which the former exact-type
    # comparison sent down the iteration branch where they raised TypeError.
    if isinstance(x, (int, float, np.generic)) or getattr(x, "ndim", None) == 0:
        return [int(x)]
    return [int(c) for c in x]

In [None]:
def find_class_indexes(classes, class_list=None):
    """Map every label in ``classes`` to its position in a stable class list.

    Args:
        classes: iterable of hashable class labels, one per annotation.
        class_list: optional list of already known labels. Its order is kept
            (duplicates removed) so indexes produced by earlier calls stay
            valid; the list itself is not modified.

    Returns:
        ``(class_list, datapoint_index)`` where ``class_list`` is the known
        labels followed by unseen labels in first-seen order, and
        ``datapoint_index`` is an ``(N, 1)`` integer array of positions.
    """
    # Going through set() could reorder the labels on every call, which made
    # indexes from different splits disagree; keep insertion order instead.
    ordered = [] if class_list is None else list(dict.fromkeys(class_list))
    lookup = {label: position for position, label in enumerate(ordered)}

    indexes = []
    for label in classes:
        if label not in lookup:
            lookup[label] = len(ordered)
            ordered.append(label)
        indexes.append(lookup[label])

    datapoint_index = np.array(indexes, dtype=int).reshape(-1, 1)
    return ordered, datapoint_index

def make_grid(xmin, ymin, xmax, ymax, image_indexes, imgset, class_indexes, factor=FAC):
    """Encode box annotations into a per-image grid label tensor.

    Args:
        xmin, ymin, xmax, ymax: 1-D arrays of box corners in pixels of the
            resized image, one entry per annotation.
        image_indexes: for each annotation, the index of its image in ``imgset``.
        imgset: sequence of images (only its length is used).
        class_indexes: ``(N, 1)`` array of class indexes, one per annotation.
        factor: size of one grid cell in pixels.

    Returns:
        Array of shape ``(len(imgset),) + GRID + (6,)``. The channels of an
        occupied cell are ``[class index, 1.0, x offset, y offset, width,
        height]``; offsets are the min corner's position inside the cell and
        all four box values are expressed in cell units. The first grid axis
        follows x, the second follows y. If several annotations fall into the
        same cell, the last one written wins.
    """
    # Annotation columns may arrive as object arrays; make them numeric once.
    xmin = np.asarray(xmin, dtype=float)
    ymin = np.asarray(ymin, dtype=float)
    xmax = np.asarray(xmax, dtype=float)
    ymax = np.asarray(ymax, dtype=float)
    image_indexes = np.asarray(image_indexes, dtype=np.intp)

    grid = np.zeros((len(imgset),) + GRID + (6,))

    # Cell holding each box's min corner, clipped so a corner lying exactly on
    # the far image border cannot index past the grid.
    cell_x = np.clip(np.floor(xmin / factor), 0, GRID[0] - 1).astype(np.intp)
    cell_y = np.clip(np.floor(ymin / factor), 0, GRID[1] - 1).astype(np.intp)

    offset_x = A2V(xmin / factor - cell_x)
    offset_y = A2V(ymin / factor - cell_y)
    width = A2V(xmax - xmin) / factor
    height = A2V(ymax - ymin) / factor

    grid[image_indexes, cell_x, cell_y, :] = np.concatenate(
        [class_indexes, np.ones(class_indexes.shape), offset_x, offset_y, width, height],
        axis=1,
    )
    return grid
    

def read_folder(PATH, annotation="_annotations.csv", class_list=None):
    """Load one dataset folder into image and grid-label arrays.

    The annotation CSV is read positionally: column 0 is the image file name,
    columns 1-2 the original width and height, column 3 the class label and
    the last four columns ``xmin, ymin, xmax, ymax`` in original pixels.

    Args:
        PATH: folder containing the images and the annotation file.
        annotation: name of the CSV file inside ``PATH``.
        class_list: optional list of known class labels, forwarded to
            ``find_class_indexes``.

    Returns:
        ``(x_train, y_train, class_list)``: the RGB images resized to
        ``IMG[:2]``, the grid labels built by ``make_grid`` and the class
        list returned by ``find_class_indexes``.

    Raises:
        FileNotFoundError: if an image listed in the CSV cannot be read.
    """
    # Paths are joined instead of os.chdir()-ing into the folder, so a failure
    # can no longer leave the process in a different working directory.
    dataset = pd.read_csv(os.path.join(PATH, annotation)).values

    file_names = dataset[:, 0]
    image_set = list(dict.fromkeys(file_names))  # unique names, first-seen order
    position = {name: index for index, name in enumerate(image_set)}
    image_indexes = np.array([position[name] for name in file_names], dtype=np.intp)

    images = []
    for name in image_set:
        image_path = os.path.join(PATH, name)
        bgr = cv2.imread(image_path)
        if bgr is None:  # cv2.imread signals failure by returning None
            raise FileNotFoundError(f"Could not read image: {image_path}")
        # `interpolation` must be given by keyword: the third positional
        # parameter of cv2.resize is `dst`, so the flag used to be ignored.
        resized = cv2.resize(bgr, IMG[:2], interpolation=cv2.INTER_CUBIC)
        images.append(cv2.cvtColor(resized, cv2.COLOR_BGR2RGB))
    x_train = np.array(images)

    W, H = dataset[:, 1:3].T
    xmin, ymin, xmax, ymax = dataset[:, -4:].T
    # Rescale the boxes from original pixels to the resized image.
    xmin = xmin * (IMG[0] / W)
    xmax = xmax * (IMG[0] / W)
    ymin = ymin * (IMG[1] / H)
    ymax = ymax * (IMG[1] / H)

    class_list, class_indexes = find_class_indexes(dataset[:, 3], class_list)
    y_train = make_grid(xmin, ymin, xmax, ymax, image_indexes, image_set, class_indexes, FAC)
    return x_train, y_train, class_list

    

In [None]:
# NOTE(review): os.listdir() returns entries in arbitrary order, so which folder ends up
# as train / valid / test through the hard-coded indexes below depends on the file
# system — confirm the mapping (or select the folders by name).
folders = os.listdir(PATH)

# The class list from the first call is passed to the following calls; the class lists
# those calls return are discarded.
xtrain, ytrain, class_list = read_folder(PATH + "/" + folders[2] )
xvalid, yvalid, _ = read_folder(PATH + "/" + folders[0], class_list=class_list )
x_test, y_test, _ = read_folder(PATH + "/" + folders[1], class_list=class_list )


In [None]:
# Number of class labels collected from the first folder; CNN() below uses the same
# expression as the default for its `classes` parameter.
len(class_list)

In [None]:
def point_reader(ytr, factor = FAC):
    """Decode the occupied cells of one grid label/prediction into pixel boxes.

    Args:
        ytr: array of shape ``(grid_x, grid_y, channels)`` whose last five
            channels are ``[confidence, x offset, y offset, width, height]``
            in cell units and whose channel 0 is returned as the class value.
        factor: size of one grid cell in pixels.

    Returns:
        ``(conf_list, xmin_list, ymin_list, xmax_list, ymax_list, class_list)``
        with one entry per cell whose confidence is non-zero, in row-major
        cell order. Corners are Python ints (truncated); confidences and
        class values are floats.
    """
    ytr = np.asarray(ytr, dtype=float)

    # Select cells through the confidence channel only. Extracting every
    # channel as its own sparse matrix dropped exact zeros (an offset of 0.0,
    # class index 0), which left the per-channel arrays misaligned.
    rows, cols = np.nonzero(ytr[:, :, -5])
    cells = ytr[rows, cols]

    xmin = rows * factor + cells[:, -4] * factor
    ymin = cols * factor + cells[:, -3] * factor
    xmax = xmin + cells[:, -2] * factor
    ymax = ymin + cells[:, -1] * factor

    def as_int_list(values):
        return values.astype(int).tolist()

    # Confidences stay floats: truncating them to int turned every predicted
    # confidence below 1.0 into 0, so it could never pass a 0.5 threshold.
    return (
        cells[:, -5].tolist(),
        as_int_list(xmin),
        as_int_list(ymin),
        as_int_list(xmax),
        as_int_list(ymax),
        cells[:, 0].tolist(),
    )


def plot_box(image, ytr, factor = FAC):
    """Show ``image`` with a rectangle for every grid cell above 0.5 confidence.

    Args:
        image: image array to display; it is left unmodified.
        ytr: grid label or prediction for that image, decoded by ``point_reader``.
        factor: size of one grid cell in pixels.
    """
    # cv2.rectangle draws in place; work on a copy so plotting a dataset
    # sample does not permanently paint boxes into the dataset array.
    canvas = np.array(image)
    conf_list, xmin_list, ymin_list, xmax_list, ymax_list, _ = point_reader(ytr, factor)
    for conf, xmin, ymin, xmax, ymax in zip(conf_list, xmin_list, ymin_list, xmax_list, ymax_list):
        if float(conf) > 0.5:
            cv2.rectangle(canvas, (xmin, ymin), (xmax, ymax), (255, 0, 0), 1, 1)
    plt.imshow(canvas)

def IoU(ypred, ytrue, factor = FAC):
    """Intersection-over-union between predicted and true boxes of one grid.

    Both grids have shape ``(grid_x, grid_y, channels)`` with the last five
    channels ``[confidence, x offset, y offset, width, height]`` in cell
    units. Boxes are compared cell by cell, for every cell whose true
    confidence exceeds 0.5.

    Args:
        ypred: predicted grid for one image.
        ytrue: ground-truth grid for the same image.
        factor: size of one grid cell in pixels.

    Returns:
        1-D float array with one IoU value per ground-truth object (empty if
        the image has none). A pair whose union area is zero scores 0.
    """
    ypred = np.asarray(ypred, dtype=float)
    ytrue = np.asarray(ytrue, dtype=float)
    rows, cols = np.nonzero(ytrue[:, :, -5] > 0.5)

    def corners(grid):
        cells = grid[rows, cols]
        xmin = (rows + cells[:, -4]) * factor
        ymin = (cols + cells[:, -3]) * factor
        return xmin, ymin, xmin + cells[:, -2] * factor, ymin + cells[:, -1] * factor

    xmin_pred, ymin_pred, xmax_pred, ymax_pred = corners(ypred)
    xmin_true, ymin_true, xmax_true, ymax_true = corners(ytrue)

    # Element-wise overlap, clamped at zero for boxes that do not intersect.
    int_width = np.maximum(0.0, np.minimum(xmax_true, xmax_pred) - np.maximum(xmin_true, xmin_pred))
    int_height = np.maximum(0.0, np.minimum(ymax_true, ymax_pred) - np.maximum(ymin_true, ymin_pred))
    int_area = int_width * int_height

    # A predicted width/height may be negative; treat such a box as empty.
    pred_area = np.maximum(0.0, xmax_pred - xmin_pred) * np.maximum(0.0, ymax_pred - ymin_pred)
    true_area = np.maximum(0.0, xmax_true - xmin_true) * np.maximum(0.0, ymax_true - ymin_true)
    uno_area = pred_area + true_area - int_area

    return np.divide(int_area, uno_area, out=np.zeros_like(int_area), where=uno_area > 0)

In [None]:
# random.randint() includes its upper bound, which could pick an index one past the
# last image; randrange() excludes it.
c = random.randrange(xtrain.shape[0])
plot_box(xtrain[c], ytrain[c])

In [None]:
# Display the training image at index 1.
plt.imshow(xtrain[1])

In [None]:
def se_block(input_tensor, reduction_ratio=32, kernel_reg=1e-4, drop_se=0.2):
    """Squeeze-and-excitation style channel gating.

    The input is globally average-pooled, passed through a bottleneck Dense
    (ReLU), dropout and an expanding Dense (sigmoid), and the resulting
    per-channel weights are multiplied back onto the input.

    Args:
        input_tensor: feature map whose last axis is the channel axis.
        reduction_ratio: divisor applied to the channel count for the bottleneck.
        kernel_reg: L2 factor for both Dense kernels.
        drop_se: dropout rate between the two Dense layers.

    Returns:
        Tensor with the same shape as ``input_tensor``.
    """
    channels = input_tensor.shape[-1]
    # Keep at least one bottleneck unit; channels // reduction_ratio is 0
    # whenever the input has fewer channels than the ratio.
    bottleneck_units = max(1, channels // reduction_ratio)

    x = layers.GlobalAveragePooling2D()(input_tensor)
    x = layers.Reshape((1, 1, channels))(x)
    x = layers.Dense(bottleneck_units, activation='relu', use_bias=False, kernel_regularizer=l2(kernel_reg))(x)
    x = layers.Dropout(drop_se)(x)
    x = layers.Dense(channels, activation='sigmoid', use_bias=False, kernel_regularizer=l2(kernel_reg))(x)
    output = layers.Multiply()([input_tensor, x])
    return output

In [None]:
def ConvOp(x, resx, denx:list, filters:int, num:int = DEF_NUM, kernel:int = DEF_KER, 
           stride:int = DEF_STR, moment:float = MOMENT, activation:str=ACTF, 
           kernel_reg:float=KREG, bias_reg:float=BREG, var_red:float=VRED, xp_factor = XFAC):
    """Multi-branch convolution step with a residual add and dense-list bookkeeping.

    Parallel Conv2D branches with different kernel sizes plus a max-pool branch are
    applied to ``x``, concatenated together with the tensors already in ``denx``,
    fused by a Conv2D and ``se_block``, batch-normalised and added to ``resx``.

    Args:
        x: input feature map.
        resx: residual tensor added before the final activation.
        denx: list of tensors to concatenate with the branches. It is extended in
            place (see the aliasing note below).
        filters: channel count of the fusing convolution and of the c == 0 branch.
        num: number of conv branches (a single base-kernel branch when ``num <= 1``).
        kernel: base kernel size.
        stride: stride used by every branch, including the max-pool branch.
        moment: BatchNormalization momentum.
        activation: activation name.
        kernel_reg, bias_reg: L2 kernel / L1 bias regularisation factors.
        var_red, xp_factor: accepted but not used in this function.

    Returns:
        ``(x, nesx, denx)``: the activated output, the batch-normalised tensor before
        the residual add, and the (mutated) ``denx`` list.
    """
    MBilters = filters
    # NOTE: xlist is the same list object as denx, so every branch appended below also
    # ends up in the caller's list.
    xlist = denx

    # Branch offsets c; e.g. kernel=3, num=4 gives c = -1, 0, 1, 2, i.e. kernel sizes
    # 1, 3, 5, 7 via `kernel + 2*c`.
    kernels = range(-(kernel // 2), num - kernel // 2) if num > 1 else range(1)

    for c in kernels:
        # Branches further from the base kernel get fewer filters.
        reduce = abs(c) * 2 if c != 0 else 1
        rilters = int(MBilters / reduce)
        xl = layers.Conv2D(rilters, kernel + 2*c, padding="same", strides= stride,
                           kernel_regularizer=l2(kernel_reg), 
                           bias_regularizer=l1(bias_reg))(x)
        xl = layers.BatchNormalization(momentum=moment)(xl)
        xlist.append(xl)
    # Extra max-pool branch using the same stride as the conv branches.
    xl = layers.MaxPool2D(3, stride, padding="same")(x)
    xl = layers.BatchNormalization(momentum=moment)(xl)

    xlist.append(xl)
    x = layers.Concatenate()(xlist)
    x = layers.Activation(activation)(x)

    # Fuse the concatenated branches back to `filters` channels.
    x = layers.Conv2D(filters, kernel, padding="same", 
                      kernel_regularizer=l2(kernel_reg), 
                      bias_regularizer=l1(bias_reg))(x)
    x = se_block(x)
    nesx = layers.BatchNormalization(momentum=moment)(x)
    # Residual connection followed by the activation.
    x = layers.Activation(activation)(nesx + resx)

    denx.append(x)
    return x, nesx, denx

In [None]:
def ConvBlock(x, residual_connect, dense_connect, filters, num_delta_x, 
              kernel:int = DEF_KER, moment:float = MOMENT, activation:str=ACTF, 
              kernel_reg:float=KREG, bias_reg:float=BREG, var_red:float=VRED, xp_factor = XFAC):
    """Down-sampling stage built from one or more ``ConvOp`` steps.

    Args:
        x: input feature map.
        residual_connect: tensor from which the stage's residual branch is derived.
        dense_connect: list of tensors carried over from earlier stages. Its entries
            are replaced in place by stride-2 versions and one new tensor is appended.
        filters: channel count used throughout the stage.
        num_delta_x: iterable of branch counts, one ``ConvOp`` call per entry; the
            first call uses stride 2, later calls stride 1.
        kernel, moment, activation, kernel_reg, bias_reg: forwarded to the layers and
            to ``ConvOp``.
        var_red, xp_factor: accepted but not used in this function.

    Returns:
        ``(x, resx, dense_connect)``: the stage output, the last pre-residual tensor
        returned by ``ConvOp`` (or the residual branch itself if ``num_delta_x`` is
        empty) and the updated dense list.
    """
    
    # Bring every carried-over tensor down to this stage's resolution (stride 2).
    for c, dense in enumerate(dense_connect):
        dense = layers.Conv2D(filters, 3, padding="same", strides=2,
                         kernel_regularizer = l2(kernel_reg), 
                         bias_regularizer = l1(bias_reg))(dense)
        dense = layers.BatchNormalization(momentum=moment)(dense)
        dense_connect[c] = layers.Activation(activation)(dense)
    
    # Merge the carried-over tensors into one starting entry for the ConvOp dense list.
    if dense_connect != []:
        denx = layers.Concatenate()(dense_connect)
        denx = layers.Conv2D(filters, kernel, padding="same",
                            kernel_regularizer = l2(kernel_reg), 
                            bias_regularizer = l1(bias_reg))(denx)
        denx = layers.BatchNormalization(momentum=moment)(denx)
        denx = [denx]
    else:
        denx = []
    
    # Residual branch: a strided convolution plus a max-pool + 1x1 convolution, summed.
    resx1 = layers.Conv2D(filters, 3, strides=2, padding="same",
                         kernel_regularizer = l2(kernel_reg), 
                         bias_regularizer = l1(bias_reg))(residual_connect)
    
    resx2 = layers.MaxPool2D(2, 2)(residual_connect)
    resx2 = layers.Conv2D(filters, 1,
                         kernel_regularizer = l2(kernel_reg), 
                         bias_regularizer = l1(bias_reg))(resx2)
    
    resx = resx1 + resx2
    
    for c, num in enumerate(num_delta_x):
        # Only the first ConvOp of the stage down-samples.
        stride = 1 if c!=0 else 2
        x, resx, denx = ConvOp(x, resx, denx, filters, num, kernel, stride, moment, 
                               activation, kernel_reg, bias_reg)
    
    # Compress everything collected in denx into a single tensor for later stages.
    # NOTE(review): with an empty dense_connect and an empty num_delta_x, denx is an
    # empty list here — confirm callers never pass that combination.
    denx = layers.Concatenate()(denx)
    denx = layers.Conv2D(filters, kernel, padding="same",
                         kernel_regularizer = l2(kernel_reg), 
                         bias_regularizer = l1(bias_reg))(denx)
    denx = layers.BatchNormalization(momentum=moment)(denx)
    denx = layers.Activation(activation)(denx)

    dense_connect.append(denx)

    return x, resx, dense_connect

In [None]:
def positional_encoding(seq_length, depth):
    """Build a sinusoidal positional-encoding matrix of shape ``(seq_length, depth)``.

    The sine terms occupy the first block of columns and the cosine terms the
    second block (they are concatenated, not interleaved).

    Args:
        seq_length: number of positions.
        depth: number of encoding columns.

    Returns:
        float32 tensor of shape ``(seq_length, depth)``.
    """
    position = tf.range(seq_length, dtype=tf.float32)[:, tf.newaxis]
    div_term = tf.exp(tf.range(0, depth, 2, dtype=tf.float32) * -(tf.math.log(10000.0) / depth))

    angles = position * div_term
    pos_enc = tf.concat([tf.sin(angles), tf.cos(angles)], axis=-1)

    # For an odd depth the concatenation has depth + 1 columns; trim it so the
    # result always matches the requested depth (a no-op for even depths).
    return pos_enc[:, :depth]

def transformer_block(x, num_heads, ff_dim, dropout_rate=0.1):
    """Self-attention followed by a single Dense feed-forward layer, each with a
    residual add and layer normalisation.

    NOTE(review): the second residual adds the ``ff_dim``-wide Dense output to
    ``x``, so ``ff_dim`` presumably has to equal ``x.shape[-1]`` — confirm.
    """
    # Attention sub-layer: x attends to itself, one key_dim per channel count.
    self_attention = layers.MultiHeadAttention(num_heads=num_heads, key_dim=x.shape[-1])
    attended = layers.Dropout(dropout_rate)(self_attention(x, x))
    x = layers.LayerNormalization(epsilon=1e-6)(x + attended)

    # Feed-forward sub-layer.
    feed_forward = layers.Dense(ff_dim, activation='relu')(x)
    feed_forward = layers.Dropout(dropout_rate)(feed_forward)
    return layers.LayerNormalization(epsilon=1e-6)(x + feed_forward)

In [None]:
def CNN(input_shape = IMG, num_num = [ [4], [3], [2], [1]], num_trans_block:int=3, filters=64, classes = len(class_list), 
         kernel:int = DEF_KER, moment:float = MOMENT, activation:str=ACTF, kernel_reg:float=KREG, 
         bias_reg:float=BREG, var_red:float=VRED, xp_factor = XFAC, gf=2):
    """Build the grid detector: a convolutional backbone with three dense heads.

    Args:
        input_shape: shape of one input image.
        num_num: one entry per ``ConvBlock`` stage; each entry is the list of branch
            counts handed to that stage.
        num_trans_block: only referenced by the commented-out transformer section.
        filters: channel count of the stem; multiplied by ``gf`` after every stage.
        classes: number of class outputs per grid cell. NOTE: the default
            ``len(class_list)`` is evaluated once, when this ``def`` statement runs.
        kernel, moment, activation, kernel_reg, bias_reg, var_red, xp_factor:
            forwarded to ``ConvBlock`` and the layers built here.
        gf: filter growth factor between stages.

    Returns:
        A Keras model whose output concatenates, per grid cell, the softmax class
        scores, one sigmoid confidence and four unbounded box values, giving a
        last axis of size ``classes + 5``.
    """
    inp = layers.Input(input_shape)
    x = layers.Conv2D(filters, 7, padding="same", strides=2, 
                      kernel_regularizer=l2(kernel_reg), 
                      bias_regularizer = l1(bias_reg))(inp) # stride 2: spatial size is halved here
    x = layers.Activation(activation)(x)
    resx = layers.Conv2D(filters, 3, padding="same", strides=1, 
                      kernel_regularizer=l2(kernel_reg), 
                      bias_regularizer = l1(bias_reg))(x) # stride 1: spatial size unchanged
    x = layers.Activation(activation)(resx)

    # Each ConvBlock halves the resolution again (its first ConvOp uses stride 2).
    dense_connect = []
    for num in num_num:
        x, resx, dense_connect = ConvBlock(x, resx, dense_connect, filters, num, kernel, moment, 
                                           activation, kernel_reg, bias_reg, var_red, xp_factor)
        filters = filters * gf
        
    # 1x1 convolution; `filters` has already been grown once more after the last stage.
    x = layers.Conv2D(filters, 1, padding="same", 
                      kernel_regularizer=l2(kernel_reg), 
                      bias_regularizer = l1(bias_reg))(x)
    x = layers.BatchNormalization(momentum = moment)(x)
    x = layers.Activation(activation)(x)
        
    # x = layers.Reshape((GRID[0] * GRID[1], filters))(x)  # Reshape to (batch_size, seq_length, channels)

    # # Add positional encoding
    # pos_enc = positional_encoding(GRID[0] * GRID[1], filters)
    # x += pos_enc  # Add positional encoding to the input features

    # # Transformer block
    # for _ in range(num_trans_block):
    #     x = transformer_block(x, num_heads=8, ff_dim=filters)

    # Shared fully connected trunk feeding all three heads.
    x = layers.Flatten()(x)
    x =  layers.Dropout(0.1)(x)
    x = layers.Dense(4096, activation="relu", 
                      kernel_regularizer=l2(kernel_reg), 
                      bias_regularizer = l1(bias_reg))(x)
    
    # Classification head: softmax over `classes` for every grid cell.
    cce = layers.Dropout(0.3)(x)
    cce = layers.Dense(1024, activation="relu", 
                      kernel_regularizer=l2(kernel_reg), 
                      bias_regularizer = l1(bias_reg))(cce)
    cce = layers.Dropout(0.3)(cce)
    cce = layers.Dense(GRID[0] * GRID[1] * classes, 
                      kernel_regularizer=l2(kernel_reg), 
                      bias_regularizer = l1(bias_reg))(cce)
    cce = layers.Reshape((GRID[0], GRID[1], classes))(cce)
    classification = layers.Softmax()(cce)
    
    # Confidence head: one sigmoid value per grid cell.
    con = layers.Dropout(0.5)(x)
    con = layers.Dense(1024, activation="relu", 
                      kernel_regularizer=l2(kernel_reg), 
                      bias_regularizer = l1(bias_reg))(con)
    con = layers.Dropout(0.5)(con)
    con = layers.Dense(GRID[0] * GRID[1], 
                      kernel_regularizer=l2(kernel_reg), 
                      bias_regularizer = l1(bias_reg))(con)
    con = layers.Reshape((GRID[0], GRID[1], 1))(con)
    confidence = layers.Activation("sigmoid")(con)
    
    # Box head: four linear (unbounded) values per grid cell.
    box = layers.Dropout(0.01)(x)
    box = layers.Dense(4096, activation="relu", 
                      kernel_regularizer=l2(kernel_reg), 
                      bias_regularizer = l1(bias_reg))(box)
    box = layers.Dense(GRID[0] * GRID[1] * 4, 
                      kernel_regularizer=l2(kernel_reg), 
                      bias_regularizer = l1(bias_reg))(box)
    boxes = layers.Reshape((GRID[0], GRID[1], 4))(box)
    
    # Channel order of the output: [class scores..., confidence, 4 box values].
    out = layers.Concatenate()([classification, confidence, boxes])
    
    model = models.Model(inputs=inp, outputs=out )
    return model

In [None]:
# Build the detector with all default arguments and print its layer summary.
model = CNN()
model.summary()

In [None]:


def binary_focal_loss(y_true, y_pred, alpha=0.25, gamma=2.0, neg_weight=0.1):
    """Element-wise binary focal loss with an extra down-weighting of negatives.

    Args:
        y_true: binary targets.
        y_pred: predicted probabilities; clipped to ``[1e-7, 1 - 1e-7]``.
        alpha: weight of the positive term (``1 - alpha`` for the negative term).
        gamma: focusing exponent.
        neg_weight: additional multiplier applied to the negative term.

    Returns:
        Tensor of per-element losses (no reduction is applied).
    """
    prob = tf.clip_by_value(y_pred, 1e-7, 1 - 1e-7)  # keeps both logarithms finite
    positive_term = -alpha * tf.pow(1 - prob, gamma) * y_true * tf.math.log(prob)
    negative_term = -(1 - alpha) * tf.pow(prob, gamma) * (1 - y_true) * tf.math.log(1 - prob)
    return positive_term + negative_term * neg_weight

def sparse_categorical_focal_loss(y_true, y_pred, alpha=0.25, gamma=2.0, class_weights=None):
    """Focal loss for integer class labels.

    Args:
        y_true: class indexes, either with a trailing singleton axis (same
            rank as ``y_pred``) or without it (one rank lower).
        y_pred: class probabilities on the last axis; clipped to
            ``[1e-7, 1 - 1e-7]``.
        alpha: scalar loss weight.
        gamma: focusing exponent.
        class_weights: optional multiplier applied to the loss.

    Returns:
        Tensor of losses with the shape of ``y_pred`` minus its last axis.
    """
    y_pred = tf.clip_by_value(y_pred, 1e-7, 1 - 1e-7)  # Avoid log(0)
    labels = tf.cast(y_true, tf.int32)
    # Drop the trailing singleton axis only when the labels actually carry
    # one; the former unconditional squeeze failed for labels without it.
    if labels.shape.rank is not None and labels.shape.rank == y_pred.shape.rank:
        labels = tf.squeeze(labels, axis=-1)
    y_true_one_hot = tf.one_hot(labels, depth=tf.shape(y_pred)[-1])
    pred_prob = tf.reduce_sum(y_pred * y_true_one_hot, axis=-1)

    focal_loss = -alpha * tf.pow(1 - pred_prob, gamma) * tf.math.log(pred_prob)

    if class_weights is not None:
        focal_loss *= class_weights

    return focal_loss

def smooth_l1_loss(y_true, y_pred, beta=1.0):
    """Smooth L1 (Huber-style) loss averaged over the last axis.

    Differences below ``beta`` are penalised quadratically
    (``0.5 * d**2 / beta``), larger ones linearly (``d - 0.5 * beta``).
    With ``beta <= 0`` this reduces to the plain mean absolute difference.

    Args:
        y_true: target values.
        y_pred: predicted values, broadcastable against ``y_true``.
        beta: transition point between the quadratic and linear regimes.

    Returns:
        Tensor of losses with the last axis reduced.
    """
    diff = tf.abs(y_true - y_pred)
    if beta <= 0:
        # No quadratic zone (and avoids dividing by zero below).
        return tf.reduce_mean(diff, axis=-1)
    # `beta` used to be ignored, which made this a plain L1 loss despite its name.
    per_coordinate = tf.where(diff < beta, 0.5 * tf.square(diff) / beta, diff - 0.5 * beta)
    return tf.reduce_mean(per_coordinate, axis=-1)

def point_reader_tensor(ytr, factor=FAC):
    """Convert the last four channels of ``ytr`` into scaled box corners.

    The channels are read as ``[x, y, width, height]``, each multiplied by
    ``factor``; the result is ``(xmin, ymin, xmax, ymax)`` with
    ``xmax = xmin + width`` and ``ymax = ymin + height``. No grid-cell origin
    is added here.
    """
    left, top, box_w, box_h = (ytr[..., channel] * factor for channel in (-4, -3, -2, -1))
    return left, top, left + box_w, top + box_h

def IoU_tensor(y_pred, y_true, factor=FAC, smooth=1e-6):
    """Element-wise IoU of the boxes stored in the last four channels.

    Both inputs are decoded with ``point_reader_tensor``. Intersection extents
    and box areas are clamped at zero, and ``smooth`` is added to the union to
    avoid dividing by zero.
    """
    pred_l, pred_t, pred_r, pred_b = point_reader_tensor(y_pred, factor)
    true_l, true_t, true_r, true_b = point_reader_tensor(y_true, factor)

    # Overlap rectangle, clamped so disjoint boxes give zero area.
    overlap_w = tf.math.maximum(0.0, tf.math.minimum(pred_r, true_r) - tf.math.maximum(pred_l, true_l))
    overlap_h = tf.math.maximum(0.0, tf.math.minimum(pred_b, true_b) - tf.math.maximum(pred_t, true_t))
    overlap = overlap_w * overlap_h

    area_pred = tf.math.maximum(0.0, (pred_r - pred_l) * (pred_b - pred_t))
    area_true = tf.math.maximum(0.0, (true_r - true_l) * (true_b - true_t))

    return (overlap) / (area_pred + area_true - overlap + smooth)

def robust_iou_loss(y_pred, y_true, factor=FAC, smooth=1e-6):
    """Return ``1 - IoU`` element-wise, with the IoU computed by ``IoU_tensor``."""
    return 1 - IoU_tensor(y_pred, y_true, factor, smooth)

def dice_loss_boxes(y_pred, y_true, factor=FAC, smooth=1e-6):
    """Element-wise Dice loss (``1 - Dice``) of the boxes in the last four channels.

    Args:
        y_pred: predicted box channels, decoded with ``point_reader_tensor``.
        y_true: target box channels, decoded the same way.
        factor: scale applied to the box values.
        smooth: added to numerator and denominator for numerical stability.

    Returns:
        Tensor of ``1 - (2 * intersection + smooth) / (area_pred + area_true + smooth)``.
    """
    xmin_pred, ymin_pred, xmax_pred, ymax_pred = point_reader_tensor(y_pred, factor)
    xmin_true, ymin_true, xmax_true, ymax_true = point_reader_tensor(y_true, factor)
    int_xmin, int_ymin = tf.math.maximum(xmin_pred, xmin_true), tf.math.maximum(ymin_pred, ymin_true)
    int_xmax, int_ymax = tf.math.minimum(xmax_pred, xmax_true), tf.math.minimum(ymax_pred, ymax_true)
    int_area = tf.math.maximum(0.0, int_xmax - int_xmin) * tf.math.maximum(0.0, int_ymax - int_ymin)
    # Clamp the areas as IoU_tensor does: an unconstrained predicted width or
    # height can be negative and would otherwise drive the denominator to or
    # below zero.
    pred_area = tf.math.maximum(0.0, (xmax_pred - xmin_pred) * (ymax_pred - ymin_pred))
    true_area = tf.math.maximum(0.0, (xmax_true - xmin_true) * (ymax_true - ymin_true))

    dice_score = (2 * int_area + smooth) / (pred_area + true_area + smooth)
    return 1 - dice_score  # Return the loss as 1 - Dice score

def loss_function(y_true, y_pred, alpha=0.25, gamma=2.0, neg_weight=0.1, lambda_coord=5.0, 
                  lambda_noobj=0.5, lambda_class=1.0, lambda_iou=1.0, lambda_dice=1.0, 
                  factor=FAC, smooth=1e-6, beta=1.0, class_weights=None):
    """Combined detection loss built from the helper losses above.

    Both tensors are split on the last axis into class channels (everything before
    the last five), one objectness channel and four box channels.

    Returns:
        Scalar: objectness focal loss + no-object penalty + class loss + localisation
        loss + IoU loss.

    NOTE(review): the ``lambda_*`` weights are accepted but never applied, and the
    Dice loss is computed but not part of the returned total — confirm whether that
    is intended.
    """
    class_true = y_true[..., :-5]  # Class channels of the label
    obj_true = y_true[..., -5:-4]  # Objectness score
    bbox_true = y_true[..., -4:]  # Bounding box (x, y, w, h)

    class_pred = y_pred[..., :-5]  # Predicted class scores
    obj_pred = y_pred[..., -5:-4]  # Predicted objectness score
    bbox_pred = y_pred[..., -4:]  # Predicted bounding box (x, y, w, h)

    obj_mask = obj_true  # Only consider object cells for loss
    noobj_mask = (1 - obj_true) * obj_pred  # Predicted objectness in cells without an object

    # Predictions are zeroed outside object cells before being passed to the helpers.
    focal_loss_obj = binary_focal_loss(obj_true, obj_pred * obj_true, alpha, gamma, neg_weight)
    focal_loss_class = sparse_categorical_focal_loss(class_true, class_pred * obj_mask, alpha, gamma, class_weights)

    loc_loss = smooth_l1_loss(bbox_true, bbox_pred * obj_mask, beta)
    iou_loss = robust_iou_loss(bbox_pred * obj_mask, bbox_true, factor, smooth)
    dice_loss = dice_loss_boxes(bbox_pred * obj_mask, bbox_true, factor, smooth)

    # Reduce every term to a scalar; note that the localisation term is summed while
    # the others are averaged.
    class_loss = tf.reduce_mean(focal_loss_class) 
    focal_loss_obj = tf.reduce_mean(focal_loss_obj)
    loc_loss = tf.reduce_sum(loc_loss)
    iou_loss = tf.reduce_mean(iou_loss)
    dice_loss = tf.reduce_mean(dice_loss)
    no_obj_loss = tf.reduce_mean(noobj_mask)

    total_loss =  focal_loss_obj + no_obj_loss + class_loss + loc_loss + iou_loss 

    return total_loss

In [None]:
def iou_metric(y_true, y_pred, confidence_threshold=0.5):
    """Mean IoU between predicted and true boxes over ground-truth object cells.

    Args:
        y_true: label tensor; channel -5 is the objectness target and the
            last four channels are the box.
        y_pred: prediction tensor with the same trailing layout.
        confidence_threshold: a cell counts as an object cell when its *true*
            objectness is at least this value.

    Returns:
        Scalar: sum of the per-cell IoUs of object cells divided by the
        number of object cells (0 when there are none).
    """
    xmin_pred, ymin_pred, xmax_pred, ymax_pred = point_reader_tensor(y_pred[..., -4:])
    xmin_true, ymin_true, xmax_true, ymax_true = point_reader_tensor(y_true[..., -4:])

    # Intersection, clamped so disjoint boxes contribute zero.
    int_xmin = tf.math.maximum(xmin_pred, xmin_true)
    int_ymin = tf.math.maximum(ymin_pred, ymin_true)
    int_xmax = tf.math.minimum(xmax_pred, xmax_true)
    int_ymax = tf.math.minimum(ymax_pred, ymax_true)
    int_area = tf.math.maximum(0.0, int_xmax - int_xmin) * tf.math.maximum(0.0, int_ymax - int_ymin)

    # Union
    pred_area = tf.math.maximum(0.0, (xmax_pred - xmin_pred) * (ymax_pred - ymin_pred))
    true_area = tf.math.maximum(0.0, (xmax_true - xmin_true) * (ymax_true - ymin_true))
    union_area = pred_area + true_area - int_area

    iou = int_area / (union_area + 1e-6)  # epsilon for numerical stability

    # The mask comes from the ground-truth objectness channel.
    object_mask = tf.cast(y_true[..., -5] >= confidence_threshold, iou.dtype)

    # Average over object cells only. Averaging over every grid cell counted
    # each empty cell as IoU 0 and capped the metric at the share of occupied
    # cells.
    return tf.reduce_sum(iou * object_mask) / (tf.reduce_sum(object_mask) + 1e-7)


def accuracy_metric(y_true, y_pred):
    """Classification accuracy over ground-truth object cells.

    Args:
        y_true: label tensor. Everything before the last five channels is the
            class information: a single channel is read as a class index,
            several channels as a one-hot vector.
        y_pred: prediction tensor whose leading channels are class scores.

    Returns:
        Scalar fraction of object cells (true objectness >= 0.5) whose
        arg-max predicted class equals the true class.
    """
    class_true = y_true[..., :-5]
    class_pred = y_pred[..., :-5]

    object_mask = tf.cast(y_true[..., -5] >= 0.5, tf.float32)

    pred_class_indices = tf.argmax(class_pred, axis=-1)

    # The grid labels built in this file carry one class-index channel; argmax
    # over a single channel is always 0, so read the index directly in that
    # case and fall back to argmax for one-hot labels.
    sparse_indices = tf.cast(class_true[..., 0], pred_class_indices.dtype)
    one_hot_indices = tf.argmax(class_true, axis=-1)
    is_sparse = tf.equal(tf.shape(class_true)[-1], 1)
    true_class_indices = tf.where(is_sparse, sparse_indices, one_hot_indices)

    matches = tf.cast(tf.equal(pred_class_indices, true_class_indices), tf.float32)
    matches *= object_mask

    total_objects = tf.reduce_sum(object_mask) + 1e-7  # Avoid division by zero
    return tf.reduce_sum(matches) / total_objects

def categorical_loss_metric(y_true, y_pred, alpha=0.25, gamma=2.0):
    """Focal classification loss, reported as a metric.

    Args:
        y_true: label tensor. Everything before the last five channels is the
            class information: a single channel is read as a class index,
            several channels as a one-hot vector.
        y_pred: prediction tensor whose leading channels are class probabilities.
        alpha: scalar loss weight.
        gamma: focusing exponent.

    Returns:
        Scalar: the focal loss, zeroed outside cells with true objectness
        >= 0.5 and averaged over all cells.
    """
    class_true = y_true[..., :-5]
    class_pred = y_pred[..., :-5]

    object_mask = tf.cast(y_true[..., -5] >= 0.5, tf.float32)

    # A single label channel holds the class index itself; argmax over it
    # would always give class 0.
    sparse_indices = tf.cast(class_true[..., 0], tf.int64)
    one_hot_indices = tf.argmax(class_true, axis=-1)
    is_sparse = tf.equal(tf.shape(class_true)[-1], 1)
    true_indices = tf.cast(tf.where(is_sparse, sparse_indices, one_hot_indices), tf.int32)

    y_true_one_hot = tf.one_hot(true_indices, depth=tf.shape(class_pred)[-1])
    pred_prob = tf.reduce_sum(class_pred * y_true_one_hot, axis=-1)
    focal_loss = -alpha * tf.pow(1 - pred_prob, gamma) * tf.math.log(pred_prob + 1e-7)

    focal_loss *= object_mask

    return tf.reduce_mean(focal_loss)

def bce_metrics(y_true, y_pred, confidence_threshold=0.5):
    """Mean binary focal loss of the objectness channel (index -5).

    ``confidence_threshold`` is accepted but not used: the loss is averaged
    over every cell.
    """
    objectness = slice(-5, -4)
    per_cell = binary_focal_loss(y_true[..., objectness], y_pred[..., objectness])
    return tf.reduce_mean(per_cell)

def smooth_l1_metric(y_true, y_pred, beta=1.0, confidence_threshold=0.5):
    """Smooth L1 distance between true and predicted boxes, as a metric.

    Both boxes are zeroed in cells whose *predicted* objectness is below
    ``confidence_threshold``; the result is the mean over all cells and
    coordinates.
    """
    # 1.0 where the model is confident that an object is present, else 0.0.
    keep = tf.cast(y_pred[..., -5:-4] >= confidence_threshold, tf.float32)

    gap = tf.abs(y_true[..., -4:] * keep - y_pred[..., -4:] * keep)

    # Quadratic below beta, linear above.
    quadratic = 0.5 * tf.square(gap) / beta
    linear = gap - 0.5 * beta
    return tf.reduce_mean(tf.where(gap < beta, quadratic, linear))


def confidence_accuracy(y_true, y_pred, threshold=0.5):
    """Overlap ratio between thresholded true and predicted objectness masks.

    Both objectness channels (index -5) are binarised at ``threshold``; the
    result is the count of cells set in both masks divided by the count of
    cells set in either mask (floored at 1e-6).
    """
    predicted = tf.cast(y_pred[..., -5] >= threshold, tf.float32)
    actual = tf.cast(y_true[..., -5] >= threshold, tf.float32)

    both = tf.reduce_sum(predicted * actual)
    either = tf.reduce_sum(predicted + actual) - both

    # Floor the denominator so empty masks do not divide by zero.
    return both / tf.maximum(either, 1e-6)

import tensorflow as tf

def confidence_accuracy2(y_true, y_pred, threshold=0.5):
    """Fraction of cells whose thresholded predicted objectness equals the target.

    The predicted objectness (channel -5) is binarised at ``threshold`` and
    compared for equality with the raw target channel.
    """
    target = y_true[..., -5]
    binarised = tf.cast(y_pred[..., -5] >= threshold, tf.float32)
    hits = tf.cast(tf.equal(target, binarised), tf.float32)
    return tf.reduce_mean(hits)





In [None]:
def step_decay(epoch, lr):
    """Learning-rate schedule: multiply ``lr`` by 0.75 on every 10th epoch.

    Epoch 0 and all epochs that are not a multiple of 10 return ``lr``
    unchanged.
    """
    drop_rate, epochs_drop = 0.75, 10
    at_boundary = epoch > 0 and epoch % epochs_drop == 0
    return lr * drop_rate if at_boundary else lr

class RandomGridBoundingBoxPlotterCallback(tf.keras.callbacks.Callback):
    """Keras callback that plots true and predicted boxes for random images.

    After every epoch ``num_images`` images are drawn at random, run through
    the model, and shown side by side with ground-truth boxes in green and
    predicted boxes (objectness > 0.5) in red.

    Args:
        test_images: array of images to sample from.
        test_labels: grid labels aligned with ``test_images``; channel 0 of a
            cell is read as the class index, channel -5 as the objectness and
            the last four channels as the box.
        class_names: list mapping class indexes to display names.
        factor: size of one grid cell in pixels.
        num_images: number of images plotted per epoch.
    """

    def __init__(self, test_images, test_labels, class_names, factor=FAC, num_images=4):
        super().__init__()
        self.test_images = test_images
        self.test_labels = test_labels
        self.class_names = class_names
        self.factor = factor
        self.num_images = num_images
        plt.ion()  # Enable interactive plotting

    def on_epoch_end(self, epoch, logs=None):
        """Plot predictions for a fresh random sample of images."""
        random_indices = np.random.choice(len(self.test_images), self.num_images, replace=False)
        predictions = self.model.predict(self.test_images[random_indices])

        fig, axes = plt.subplots(1, self.num_images, figsize=(15, 5))
        fig.suptitle(f"Epoch {epoch + 1} Predictions", fontsize=16)

        # plt.subplots returns a bare Axes (not an array) when num_images == 1.
        for idx, ax in enumerate(np.atleast_1d(axes)):
            img = self.test_images[random_indices[idx]]
            true_boxes = self.test_labels[random_indices[idx]]
            pred_boxes = predictions[idx]

            ax.clear()
            ax.imshow(img)
            ax.axis("off")

            self.plot_boxes(ax, true_boxes, pred_boxes)

        # Refresh the plot
        plt.draw()
        plt.pause(0.001)
        plt.close(fig)

    def _draw_box(self, ax, cell, box, color, label, label_below, fontsize):
        """Draw one decoded box and its label; ``cell`` is the (first, second) grid index."""
        first, second = cell
        xmin, ymin, xmax, ymax = point_reader_tensor(box, self.factor)
        # The first grid axis maps to the horizontal pixel axis, the second to
        # the vertical one (matching how make_grid indexes the label tensor).
        left = first * self.factor + xmin
        top = second * self.factor + ymin
        ax.add_patch(
            plt.Rectangle(
                (left, top),
                xmax - xmin,
                ymax - ymin,
                linewidth=2,
                edgecolor=color,
                facecolor="none",
            )
        )
        label_y = top + (ymax - ymin) if label_below else top - 5
        ax.text(left, label_y, label, color="black", fontsize=fontsize, backgroundcolor=color)

    def plot_boxes(self, ax, true_boxes, pred_boxes):
        """Draw ground-truth (green) and predicted (red) boxes of one image on ``ax``."""
        # Take the grid size from the data; a hard-coded 4 only covered a
        # quarter of the 8x8 grid.
        n_first, n_second = true_boxes.shape[:2]
        for y in range(n_first):
            for x in range(n_second):
                true_box = true_boxes[y, x]
                if true_box[-5] > 0.5:  # Confidence threshold
                    self._draw_box(
                        ax, (y, x), true_box[-4:], "green",
                        f"True: {self.class_names[int(true_box[0])]}",
                        label_below=True, fontsize=6,
                    )

                pred_box = pred_boxes[y, x]
                obj_confidence = float(pred_box[-5])
                if obj_confidence > 0.5:  # Confidence threshold
                    class_name = self.class_names[int(np.argmax(pred_box[:-5]))]
                    self._draw_box(
                        ax, (y, x), pred_box[-4:], "red",
                        f"Pred: {class_name} ({obj_confidence:.2f})",
                        label_below=False, fontsize=8,
                    )

# Create the callbacks: a step learning-rate schedule and the prediction plotter.
lr_scheduler = tf.keras.callbacks.LearningRateScheduler(step_decay)
# NOTE(review): the plotter is built from the *training* arrays although the keyword
# names say "test", and it is not part of the `callbacks` list passed to model.fit
# further below — confirm both are intended.
random_plotter_callback = RandomGridBoundingBoxPlotterCallback(
    test_images=xtrain,  # Images the plotter samples from
    test_labels=ytrain,  # Corresponding ground truth labels
    class_names=class_list,
    factor=FAC,
    num_images=4
)

In [None]:
class ObjectDetectionLoss(tf.keras.losses.Loss):
    """Weighted sum of box, classification and objectness losses for grid detection.

    Both ``y_true`` and ``y_pred`` are split on the last axis into class
    channels (everything before the last five), one objectness channel and
    four box channels. The class channels of ``y_true`` may be a single
    class-index channel or a one-hot vector.

    Args:
        num_classes: number of classes in the prediction.
        alpha: focal-loss weight.
        gamma: focal-loss focusing exponent.
        lambda_cls: weight of the classification loss.
        lambda_bbox: weight of the box loss.
        lambda_obj: weight of the objectness loss.
    """

    def __init__(self, num_classes, alpha=0.25, gamma=2.0, lambda_cls=1.0, lambda_bbox=1.0, lambda_obj=1.0):
        super().__init__()
        self.num_classes = num_classes
        self.alpha = alpha  # Focal loss alpha
        self.gamma = gamma  # Focal loss gamma
        self.lambda_cls = lambda_cls  # Weight for classification loss
        self.lambda_bbox = lambda_bbox  # Weight for bounding box regression loss
        self.lambda_obj = lambda_obj  # Weight for objectness loss

    def call(self, y_true, y_pred):
        """Return the scalar total loss for one batch."""
        # Parse predictions
        bbox_preds = y_pred[..., -4:]
        class_preds = y_pred[..., :-5]
        objectness_preds = y_pred[..., -5:-4]

        # Parse ground truth
        bbox_targets = y_true[..., -4:]
        class_targets = y_true[..., :-5]
        objectness_targets = y_true[..., -5:-4]

        # Box and class losses only count in cells that contain an object.
        object_mask = objectness_targets[..., 0]
        bbox_loss = self.ciou_loss(bbox_targets, bbox_preds) * object_mask
        class_loss = self.focal_loss(class_targets, class_preds) * object_mask

        # Objectness loss (binary cross-entropy) over every cell.
        objectness_loss = tf.keras.losses.binary_crossentropy(objectness_targets, objectness_preds)

        total_loss = (
            self.lambda_bbox * tf.reduce_mean(bbox_loss) +
            self.lambda_cls * tf.reduce_mean(class_loss) +
            self.lambda_obj * tf.reduce_mean(objectness_loss)
        )
        return total_loss

    def focal_loss(self, y_true, y_pred):
        """Per-cell categorical focal loss.

        ``y_true`` holds either one class-index channel or a one-hot vector on
        its last axis; ``y_pred`` holds class probabilities.
        """
        # Ensure predictions are within a valid range to avoid log(0)
        y_pred = tf.clip_by_value(y_pred, 1e-7, 1.0 - 1e-7)

        # The grid labels built in this file carry a single class-index
        # channel. argmax over one channel is always 0, which trained every
        # object toward class 0; read the index directly in that case.
        sparse_indices = tf.cast(y_true[..., 0], tf.int64)
        one_hot_indices = tf.argmax(y_true, axis=-1)
        is_sparse = tf.equal(tf.shape(y_true)[-1], 1)
        class_indices = tf.where(is_sparse, sparse_indices, one_hot_indices)
        y_true = tf.one_hot(class_indices, depth=self.num_classes)

        cross_entropy = -y_true * tf.math.log(y_pred)
        scaling_factor = tf.pow(1 - y_pred, self.gamma)
        focal_loss = self.alpha * scaling_factor * cross_entropy
        return tf.reduce_sum(focal_loss, axis=-1)

    def ciou_loss(self, y_true, y_pred):
        """Per-cell box loss: a CIoU-style term blended with a cubic regression term.

        Boxes are read as ``(x_min, y_min, width, height)`` on the last axis.
        """
        # Convert (x_min, y_min, width, height) to (x1, y1, x2, y2)
        x1_true, y1_true, w_true, h_true = tf.split(y_true, 4, axis=-1)
        x1_pred, y1_pred, w_pred, h_pred = tf.split(y_pred, 4, axis=-1)

        x2_true = x1_true + w_true
        y2_true = y1_true + h_true
        x2_pred = x1_pred + w_pred
        y2_pred = y1_pred + h_pred

        # Keep widths/heights strictly positive for the area and ratio terms
        # (the corner coordinates above use the unclamped values).
        w_true = tf.maximum(w_true, 1e-6)
        h_true = tf.maximum(h_true, 1e-6)
        w_pred = tf.maximum(w_pred, 1e-6)
        h_pred = tf.maximum(h_pred, 1e-6)

        # Intersection
        xi1 = tf.maximum(x1_true, x1_pred)
        yi1 = tf.maximum(y1_true, y1_pred)
        xi2 = tf.minimum(x2_true, x2_pred)
        yi2 = tf.minimum(y2_true, y2_pred)
        intersection = tf.maximum(0.0, xi2 - xi1) * tf.maximum(0.0, yi2 - yi1)

        # Union
        area_true = w_true * h_true
        area_pred = w_pred * h_pred
        union = area_true + area_pred - intersection
        union = tf.maximum(union, 1e-6)  # Prevent division by zero

        iou = intersection / union

        # Squared distance between box centres
        cx_true = x1_true + 0.5 * w_true
        cy_true = y1_true + 0.5 * h_true
        cx_pred = x1_pred + 0.5 * w_pred
        cy_pred = y1_pred + 0.5 * h_pred
        center_dist = tf.square(cx_true - cx_pred) + tf.square(cy_true - cy_pred)

        # Squared diagonal of the smallest enclosing box
        x_min_enclose = tf.minimum(x1_true, x1_pred)
        y_min_enclose = tf.minimum(y1_true, y1_pred)
        x_max_enclose = tf.maximum(x2_true, x2_pred)
        y_max_enclose = tf.maximum(y2_true, y2_pred)
        diagonal_enclose = tf.square(x_max_enclose - x_min_enclose) + tf.square(y_max_enclose - y_min_enclose)
        diagonal_enclose = tf.maximum(diagonal_enclose, 1e-6)  # Prevent division by zero

        # Aspect-ratio penalty: squared difference of the log aspect ratios
        aspect_ratio_true = w_true / h_true
        aspect_ratio_pred = w_pred / h_pred
        aspect_ratio_penalty = tf.square(tf.math.log(tf.maximum(aspect_ratio_true, 1e-6)) -
                                         tf.math.log(tf.maximum(aspect_ratio_pred, 1e-6)))

        # IoU minus the distance and aspect-ratio penalties, clipped to [-1, 1]
        ciou = iou - (center_dist / diagonal_enclose) - (0.5 * aspect_ratio_penalty)
        ciou = tf.clip_by_value(ciou, -1.0, 1.0)

        # Cubic penalty on the raw coordinate differences
        abs_diff = tf.abs(y_true - y_pred)
        regression_loss = tf.reduce_mean(tf.pow(abs_diff, 3), axis=-1)

        # Drop the trailing singleton axis left by tf.split
        ciou = tf.reduce_mean(ciou, axis=-1)

        # Equal-weight blend of the two terms
        alpha = 0.5  # Weight for CIoU
        beta = 0.5   # Weight for regression loss
        total_bbox_loss = alpha * (1 - ciou) + beta * regression_loss

        return total_bbox_loss


In [None]:
# Scale the training pixels to [0, 1]. NOTE: re-running this cell divides again.
xtrain = xtrain / 255.0
# ndarray.max() reduces in native code; the builtin max() over .flatten() walked every
# pixel in a Python-level loop.
xtrain.max()

In [None]:
# Largest label value outside the class-index channel (objectness and box channels);
# ndarray.max() replaces the slow builtin max() over a flattened copy.
ytrain[:, :, :, 1:].max()

In [None]:
# Compile with the custom detection loss and the metric functions defined above.
# NOTE(review): `optim.legacy.Adam` needs a TensorFlow/Keras version that still ships
# the legacy optimizers — confirm against the installed version.
model.compile(
    optimizer= optim.legacy.Adam(learning_rate= 1e-4),
    loss=ObjectDetectionLoss(num_classes=len(class_list)),
    metrics=[bce_metrics, confidence_accuracy, confidence_accuracy2, smooth_l1_metric, iou_metric, categorical_loss_metric, accuracy_metric]
)
# Validation data is split off the training arrays (validation_split); xvalid / yvalid
# loaded earlier are not used here. Only the learning-rate scheduler is attached.
model.fit(xtrain, ytrain, epochs=250, verbose=1, validation_split=0.2, 
          callbacks=[lr_scheduler])



In [None]:
# The model is trained on pixels scaled to [0, 1]; x_test is never rescaled in this
# notebook, so apply the same scaling here.
model.evaluate(x_test / 255.0, y_test)

In [None]:
# Predict the first test image, scaled to [0, 1] like the training data.
results = model(x_test[0:1] / 255.0)[0]

In [None]:
# Objectness channel of the prediction. It is always the fifth channel from the end;
# the literal index 8 was only correct for exactly eight classes.
results[:, :, -5]

In [None]:
# Channel 0 of the label grid is the class index written by make_grid (the objectness
# flag is channel 1). NOTE(review): confirm this is the channel meant for comparison.
y_test[0:1, :, :, 0]

In [None]:
# Draw the predicted boxes for the first test image on top of that image.
plot_box(x_test[0], results.numpy())