In [3]:
import tensorflow as tf
from tensorflow.keras.layers import Input, GlobalAveragePooling2D, Dense, Dropout,Conv2D,BatchNormalization,LeakyReLU,Concatenate,UpSampling2D,MaxPool2D,Add
from keras import backend as K
import math
import numpy as np

In [None]:
# List the GPUs TensorFlow can see and switch each one to on-demand memory
# growth instead of TensorFlow's default allocation behaviour.
# NOTE(review): TensorFlow's GPU guide says memory growth has to be set before
# the GPUs are initialized — confirm this cell is executed before the model
# cell (it has no execution count in this saved notebook).
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [4]:
# Number of class scores per predicted box; each head emits
# NUM_ANCHORS * (5 + NUM_CLASSES) channels per grid cell.
NUM_CLASSES = 60
# Number of boxes predicted per grid cell.
NUM_ANCHORS = 3
# Anchor [width, height] pairs, keyed by the grid size written as a string.
# normalize_yolo_output divides them by PHOTO_SIZE, i.e. they are expressed in
# pixels of the network input. The model built below only produces 13x13 and
# 26x26 heads, so the "52" entry is not used by it.
constant_anchors = {
    "52":[[10,13], [16,30], [33,23]],
    "26":[[30,61], [62,45], [59,119]],
    "13":[[116,90], [156,198], [373,326]],
}

In [6]:
def LeakyConvNoStride(input_tensor,num_filters = 32,num_kernel = 1):
    """Apply Conv2D -> BatchNormalization -> LeakyReLU without changing resolution.

    Args:
        input_tensor: Keras tensor to transform.
        num_filters: number of output channels of the convolution.
        num_kernel: side length of the (square) convolution kernel.

    Returns:
        The activated feature map; "same" padding and the default stride of 1
        keep the spatial size of `input_tensor`.
    """
    features = Conv2D(filters=num_filters, kernel_size=num_kernel, padding="same")(input_tensor)
    # Normalise, then activate — layers are created in the same order as they
    # are applied so auto-generated layer names stay stable.
    for make_layer in (BatchNormalization, LeakyReLU):
        features = make_layer()(features)
    return features

def LeakyConv(input_tensor,num_filters = 32,num_kernel = 1):
    """Apply a stride-2 Conv2D -> BatchNormalization -> LeakyReLU.

    Args:
        input_tensor: Keras tensor to transform.
        num_filters: number of output channels of the convolution.
        num_kernel: side length of the (square) convolution kernel.

    Returns:
        The activated feature map; the (2, 2) stride with "same" padding halves
        each spatial dimension (rounded up).
    """
    downsampling_conv = Conv2D(
        filters=num_filters,
        kernel_size=num_kernel,
        strides=(2, 2),
        padding="same",
    )
    features = downsampling_conv(input_tensor)
    features = BatchNormalization()(features)
    return LeakyReLU()(features)

def CSPBlock(input_tensor):
    """Build a split/merge convolutional block that preserves the input shape.

    With C input channels the block:
      1. runs a 3x3 conv stage down to C // 2 channels (the "stem"),
      2. splits the stem in two along the channel axis,
      3. sends one half through one 3x3 conv stage and the other half through
         two, each producing C // 4 channels,
      4. adds the two branches, runs one more 3x3 conv stage to C // 2 channels,
      5. concatenates that result with the stem, giving C channels again.

    Every conv stage is Conv2D -> BatchNormalization -> LeakyReLU with "same"
    padding. The input and output shapes are printed as a build-time trace.

    Args:
        input_tensor: Keras tensor with a statically known channel count.

    Returns:
        Keras tensor with the same spatial size and channel count as the input
        (for channel counts divisible by 4).
    """
    def conv_stage(tensor, filters):
        # One 3x3 Conv2D -> BatchNormalization -> LeakyReLU stage.
        tensor = Conv2D(filters=filters, kernel_size=3, padding="same")(tensor)
        tensor = BatchNormalization()(tensor)
        return LeakyReLU()(tensor)

    channels = input_tensor.shape[-1]
    print("in",input_tensor.shape)

    stem = conv_stage(input_tensor, channels // 2)

    halves = tf.keras.layers.Lambda(lambda t: tf.split(t, 2, axis=-1))(stem)

    # Shallow branch: a single conv stage on the first half.
    shallow = conv_stage(halves[0], channels // 4)

    # Deep branch: two consecutive conv stages on the second half.
    deep = halves[1]
    for _ in range(2):
        deep = conv_stage(deep, channels // 4)

    # Element-wise sum of the branches, then fuse back up to C // 2 channels.
    fused = conv_stage(Add()([shallow, deep]), channels // 2)

    # Channel-wise concatenation with the stem restores C channels.
    block_output = Concatenate()([fused, stem])
    print("out",block_output.shape)

    return block_output


# Model definition: a small two-head detector on 416x416 RGB inputs.
# Shape comments below are taken from the shapes this cell prints.
input_tensor = Input(shape=(416, 416, 3))
# Three stride-2 conv stages: 416 -> 52 per side, 128 channels.
x = LeakyConv(input_tensor,num_filters=32,num_kernel = 3)
x = LeakyConv(x,num_filters=64,num_kernel = 3)
x = LeakyConv(x,num_filters=128,num_kernel = 3)
# CSP blocks interleaved with stride-1 max pooling; the resolution stays 52x52
# and the channel count stays 128 throughout.
x = CSPBlock(x)
x = MaxPool2D(strides=1,pool_size=2,padding="same")(x)
x = CSPBlock(x)
# Keep the 52x52x128 feature map for the second head's skip connection.
end1 = x
x = MaxPool2D(strides=1,pool_size=2,padding="same")(x)
x = CSPBlock(x)
x = MaxPool2D(strides=1,pool_size=2,padding="same")(x)
# Two more stride-2 conv stages: 52 -> 13 per side, 512 channels.
x = LeakyConv(x,num_filters=256,num_kernel = 3)
x = LeakyConv(x,num_filters=512,num_kernel = 1)
# Keep the 13x13x512 feature map as the starting point of the second head.
end2 = x
print(end1.shape)

print(x.shape)

# First head: two stride-1 conv stages (13x13x1024), then a 1x1 conv that
# emits NUM_ANCHORS * (5 + NUM_CLASSES) raw values per cell.
x = LeakyConvNoStride(x,num_filters=512,num_kernel = 3)
x = LeakyConvNoStride(x,num_filters=1024,num_kernel = 1)

yolo_output_13 = Conv2D(filters=NUM_ANCHORS * (5 + NUM_CLASSES), kernel_size=1, padding='same')(x)

print(x.shape)

# Second head: reduce the 13x13 map to 128 channels, upsample x4 to 52x52 so
# it can be concatenated with end1, then a stride-2 conv brings it to 26x26.
end2 = LeakyConvNoStride(end2,num_filters=128,num_kernel = 1)
end2 = UpSampling2D(size=4)(end2)
end2 = Concatenate()([end1,end2])

end2 = LeakyConv(end2,num_filters=256,num_kernel = 3)
end2 = LeakyConvNoStride(end2,num_filters=512,num_kernel = 1)

yolo_output_26 = Conv2D(filters=NUM_ANCHORS * (5 + NUM_CLASSES), kernel_size=1, padding='same')(end2)

print(yolo_output_13.shape)
print(yolo_output_26.shape)

# The model returns the raw (un-activated) outputs of both heads, 13x13 first.
model = tf.keras.Model(inputs=input_tensor, outputs=[yolo_output_13, yolo_output_26])
model.summary()


in (None, 52, 52, 128)
out (None, 52, 52, 128)
in (None, 52, 52, 128)
out (None, 52, 52, 128)
in (None, 52, 52, 128)
out (None, 52, 52, 128)
(None, 52, 52, 128)
(None, 13, 13, 512)
(None, 13, 13, 1024)
(None, 13, 13, 195)
(None, 26, 26, 195)


In [None]:
# Side length, in pixels, of the square network input (matches the
# Input(shape=(416, 416, 3)) used to build the model); anchor sizes are divided
# by this value in normalize_yolo_output.
PHOTO_SIZE = 416


def iou(box1, box2):
    """
    Intersection-over-union of two axis-aligned boxes.

    Both boxes are indexable as [x_min, y_min, x_max, y_max].
    Returns 0 when the union has zero area, otherwise intersection / union.
    """
    # Corners of the overlap rectangle (may be "inverted" when disjoint).
    left = max(box1[0], box2[0])
    top = max(box1[1], box2[1])
    right = min(box1[2], box2[2])
    bottom = min(box1[3], box2[3])

    # Clamp each side at zero so disjoint boxes yield an empty overlap.
    overlap = max(right - left, 0) * max(bottom - top, 0)

    first_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
    second_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
    combined = first_area + second_area - overlap

    # Two degenerate boxes have no union; report no overlap instead of dividing.
    return 0 if combined == 0 else overlap / combined

def box_to_corners(box):
    """
    Turn a center-format box into corner format.

    box: [x_center, y_center, width, height]
    Returns: [x_min, y_min, x_max, y_max] as a list.
    """
    center_x, center_y, width, height = box
    half_width = width / 2
    half_height = height / 2
    return [
        center_x - half_width,
        center_y - half_height,
        center_x + half_width,
        center_y + half_height,
    ]

def iou_center(box1, box2):
    """
    IoU of two boxes given in center format.

    Each box is [x_center, y_center, width, height]; both are converted to
    corner format and handed to `iou`.
    """
    return iou(box_to_corners(box1), box_to_corners(box2))

def ciou(boxes1, boxes2, eps=1e-7):
    """
    Calculate the Complete-IoU (CIoU) between boxes1 and boxes2.

    CIoU = IoU - (squared center distance / squared enclosing-box diagonal)
               - alpha * v
    where v measures the aspect-ratio mismatch and alpha is its trade-off
    weight.

    Args:
        boxes1, boxes2: tensors of shape [..., 4] in
            [x_center, y_center, width, height] format. Both must use the same
            coordinate system (the callers in this notebook pass coordinates
            normalized to the image, 0~1).
        eps: small constant added to denominators to avoid division by zero.

    Returns:
        Tensor of shape [...] with the CIoU value of each box pair.
    """

    # Convert centers to corners: (x1, y1, x2, y2)
    boxes1_x1 = boxes1[..., 0] - boxes1[..., 2] / 2
    boxes1_y1 = boxes1[..., 1] - boxes1[..., 3] / 2
    boxes1_x2 = boxes1[..., 0] + boxes1[..., 2] / 2
    boxes1_y2 = boxes1[..., 1] + boxes1[..., 3] / 2

    boxes2_x1 = boxes2[..., 0] - boxes2[..., 2] / 2
    boxes2_y1 = boxes2[..., 1] - boxes2[..., 3] / 2
    boxes2_x2 = boxes2[..., 0] + boxes2[..., 2] / 2
    boxes2_y2 = boxes2[..., 1] + boxes2[..., 3] / 2

    # Intersection box (sides clamped at zero for disjoint boxes)
    inter_x1 = tf.maximum(boxes1_x1, boxes2_x1)
    inter_y1 = tf.maximum(boxes1_y1, boxes2_y1)
    inter_x2 = tf.minimum(boxes1_x2, boxes2_x2)
    inter_y2 = tf.minimum(boxes1_y2, boxes2_y2)

    inter_w = tf.maximum(inter_x2 - inter_x1, 0)
    inter_h = tf.maximum(inter_y2 - inter_y1, 0)
    inter_area = inter_w * inter_h

    # Areas
    area1 = (boxes1_x2 - boxes1_x1) * (boxes1_y2 - boxes1_y1)
    area2 = (boxes2_x2 - boxes2_x1) * (boxes2_y2 - boxes2_y1)

    union_area = area1 + area2 - inter_area + eps
    # Named iou_value (not `iou`) so the module-level iou() is not shadowed.
    iou_value = inter_area / union_area

    # Squared distance between the two box centers
    center_dist = tf.square(boxes1[..., 0] - boxes2[..., 0]) + tf.square(boxes1[..., 1] - boxes2[..., 1])

    # Squared diagonal of the smallest box enclosing both boxes
    enclose_x1 = tf.minimum(boxes1_x1, boxes2_x1)
    enclose_y1 = tf.minimum(boxes1_y1, boxes2_y1)
    enclose_x2 = tf.maximum(boxes1_x2, boxes2_x2)
    enclose_y2 = tf.maximum(boxes1_y2, boxes2_y2)
    enclose_w = enclose_x2 - enclose_x1
    enclose_h = enclose_y2 - enclose_y1
    c2 = tf.square(enclose_w) + tf.square(enclose_h) + eps

    # Aspect-ratio consistency term
    w1 = boxes1[..., 2]
    h1 = boxes1[..., 3]
    w2 = boxes2[..., 2]
    h2 = boxes2[..., 3]

    v = (4 / (math.pi ** 2)) * tf.square(tf.math.atan(w2 / (h2 + eps)) - tf.math.atan(w1 / (h1 + eps)))
    with tf.device('/CPU:0'):  # kept from the original: avoids potential GPU precision errors
        # alpha is a weighting coefficient, not something to optimise through:
        # stop the gradient so only v (and the other terms) drive the update.
        alpha = tf.stop_gradient(v / (1 - iou_value + v + eps))

    return iou_value - (center_dist / c2) - alpha * v


def sigmoid_focal_loss(y_true, y_pred, gamma=2.0, alpha=0.25):
    """Compute the element-wise sigmoid focal loss.

    Reference Paper:
        "Focal Loss for Dense Object Detection"
        https://arxiv.org/abs/1708.02002

    Args:
        y_true: Ground truth targets in {0, 1},
            tensor of shape (?, num_boxes, num_classes).
        y_pred: Predicted logits (before sigmoid),
            tensor of shape (?, num_boxes, num_classes).
        gamma: exponent of the modulating factor (1 - p_t) ^ gamma.
        alpha: weighting factor to balance positives vs negatives.

    Returns:
        Focal loss per element, tensor with the same shape as the inputs,
        (?, num_boxes, num_classes). No reduction is applied; sum over the last
        axis with tf.reduce_sum(..., axis=-1) for a per-box loss.
    """
    y_pred = tf.convert_to_tensor(y_pred)
    y_true = tf.cast(y_true, y_pred.dtype)

    # Plain per-element binary cross-entropy on logits. Computed with the
    # TensorFlow op directly so this does not depend on the standalone
    # `keras.backend` module, which is a different package from tf.keras.
    cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=y_true, logits=y_pred)

    # p_t is the probability the model assigns to the true label.
    pred_prob = tf.sigmoid(y_pred)
    p_t = (y_true * pred_prob) + ((1 - y_true) * (1 - pred_prob))

    # Down-weight easy examples (p_t close to 1) and balance the two classes.
    modulating_factor = tf.pow(1.0 - p_t, gamma)
    alpha_weight_factor = y_true * alpha + (1 - y_true) * (1 - alpha)

    return modulating_factor * alpha_weight_factor * cross_entropy

def what_grid_cell_it_resides(true_BB, resolution):
    """
    Locate the grid cell that contains a box center.

    true_BB: [x, y, w, h] — all values are normalized in [0, 1]
    resolution: int — size of the output grid (e.g., 13 or 26)

    Returns:
        x_cell, y_cell — integer indices of the grid cell, always within
            [0, resolution - 1]. A center lying exactly on the right/bottom
            image border (coordinate 1.0) is assigned to the last cell instead
            of the out-of-range index `resolution`; centers slightly outside
            the image are assigned to the nearest border cell.
        normalized_BB — [x_offset_in_cell, y_offset_in_cell, w, h], where the
            offsets are measured from the chosen cell's origin in cell units
            and w, h are passed through unchanged.
    """

    x = true_BB[0]
    y = true_BB[1]

    last_cell = resolution - 1

    # Clamp so the result is always a valid index into an S x S grid.
    x_cell = min(max(int(x * resolution), 0), last_cell)
    y_cell = min(max(int(y * resolution), 0), last_cell)

    x_offset = x * resolution - x_cell
    y_offset = y * resolution - y_cell

    w = true_BB[2]  # already normalized
    h = true_BB[3]  # already normalized

    return x_cell, y_cell, [x_offset, y_offset, w, h]

def normalize_yolo_output(y_pred):
    """
    Decode raw head outputs into image-normalized boxes and probabilities.

    Args:
        y_pred: raw network output of shape (batch, S, S, B, 5 + C). The grid
            size S is read from y_pred.shape[1] and selects the anchor set from
            `constant_anchors` (whose keys are the grid size as a string);
            B must equal the number of anchors in that set.

    Returns:
        Tensor of shape (batch, S, S, B, 5 + C) with:
        - x, y: center coords normalized to the image, (sigmoid + cell) / S
        - w, h: exp(raw) * anchor, with anchors divided by PHOTO_SIZE
        - objectness: sigmoid
        - class scores: sigmoid

        The first grid axis is the row (y) and the second the column (x):
        element [b, row, col] decodes to x = (col + sigmoid) / S and
        y = (row + sigmoid) / S.

    Raises:
        KeyError: if `constant_anchors` has no entry for this grid size.
    """
    grid_size = y_pred.shape[1]

    # constant_anchors is keyed by strings ("13", "26", ...), while the static
    # shape gives an int — convert before the lookup.
    anchors = constant_anchors[str(grid_size)]
    num_anchors = len(anchors)

    anchors = tf.convert_to_tensor(anchors, dtype=tf.float32)
    anchors = anchors / tf.constant([PHOTO_SIZE, PHOTO_SIZE], dtype=tf.float32)  # normalize

    # Build grid: grid[row, col] == (col, row), i.e. the (x, y) cell origin.
    grid_y = tf.range(grid_size, dtype=tf.float32)
    grid_x = tf.range(grid_size, dtype=tf.float32)
    gx, gy = tf.meshgrid(grid_x, grid_y)
    grid = tf.stack([gx, gy], axis=-1)  # (S, S, 2)
    grid = tf.expand_dims(grid, axis=2)  # (S, S, 1, 2)
    grid = tf.tile(grid, [1, 1, num_anchors, 1])   # (S, S, B, 2)

    # Activate the raw predictions
    box_xy = tf.sigmoid(y_pred[..., 0:2])  # x, y offsets inside the cell
    box_wh = tf.exp(y_pred[..., 2:4]) * anchors  # w, h (scale anchors)
    box_conf = tf.sigmoid(y_pred[..., 4:5])      # confidence
    box_class = tf.sigmoid(y_pred[..., 5:])      # class scores

    # Normalize xy to full image
    box_xy = (box_xy + grid) / grid_size

    # Final tensor
    out = tf.concat([box_xy, box_wh, box_conf, box_class], axis=-1)

    return out

def yolo_loss_single_head(y_true, y_pred, object_scale=1.0, no_object_scale=1.0):
    """
    YOLO-style loss for one detection head.

    Must run eagerly: it walks the batch with Python loops and branches on
    tensor values.

    Args:
        y_true: (batch, MAX_BOUNDING_BOXES, 5 + C). Each row is
            [x_center, y_center, w, h, objectness, class targets...] with the
            box normalized to the image; rows whose objectness is 0 are
            treated as padding and skipped.
        y_pred: (batch, S, S, B, 5 + C) raw (un-activated) head output.
        object_scale: weight of the objectness loss of responsible predictors.
        no_object_scale: weight of the objectness loss of background predictors.

    Returns:
        Scalar tensor: (box + objectness + no-object + classification loss)
        summed over the batch and divided by the batch size.

    For every ground-truth box the predictor in the cell containing the box
    center whose decoded box overlaps it most is made responsible: it receives
    a (1 - CIoU) box loss, an objectness target of 1 and the classification
    loss. Every predictor that is responsible for no box and whose decoded box
    has IoU <= 0.5 with all ground-truth boxes receives an objectness target of
    0, counted once per image.
    """

    resolution = y_pred.shape[1]
    batch_size = y_pred.shape[0]

    position_loss = 0.0
    obj_confidence_loss = 0.0
    noobj_confidence_loss = 0.0
    clasification_loss = 0.0

    # Both losses expect logits, so they are fed the raw y_pred values below;
    # the decoded (sigmoid-activated) tensor is only used for box geometry.
    bce = tf.keras.losses.BinaryCrossentropy(from_logits=True)
    cce = tf.keras.losses.CategoricalCrossentropy(from_logits=True)

    normalized_pred = normalize_yolo_output(y_pred)

    for b in range(batch_size):
        true_boxes = []      # image-normalized ground-truth boxes of this image
        responsible = set()  # (row, col, anchor) predictors matched to a box

        for true_BB in y_true[b]:
            confidence = true_BB[4]
            if confidence == 0:
                continue
            # Keep the box as a tensor in image-normalized coordinates: that is
            # the coordinate system of the decoded predictions it is compared to.
            dimensions = tf.cast(true_BB[:4], normalized_pred.dtype)
            clasification = true_BB[5:]

            x_cell, y_cell, _ = what_grid_cell_it_resides(dimensions, resolution)

            # The decoded grid is laid out [row (y), column (x)].
            cell_pred = normalized_pred[b][y_cell][x_cell]
            cell_raw = y_pred[b][y_cell][x_cell]

            max_IOU = -1e9
            best_ind = 0
            for anchor_idx, anchor in enumerate(cell_pred):
                iou_val = iou_center(dimensions, anchor[:4])
                if iou_val > max_IOU:
                    best_ind = anchor_idx
                    max_IOU = iou_val

            best_BB = cell_pred[best_ind][:4]
            best_confidence_logit = cell_raw[best_ind][4]
            best_clasification_logits = cell_raw[best_ind][5:]

            position_loss += 1 - ciou(dimensions, best_BB)
            obj_confidence_loss += bce(tf.ones((1,)), tf.reshape(best_confidence_logit, (1,)))

            clasification_loss += cce(
                    tf.expand_dims(clasification, 0),
                    tf.expand_dims(best_clasification_logits, 0)
                )

            true_boxes.append(dimensions)
            responsible.add((y_cell, x_cell, best_ind))

        # Background term, evaluated once per image after all boxes are matched
        # so a predictor responsible for one box is never punished as
        # background because of another.
        # NOTE(review): this is a Python loop over S * S * B predictors per
        # image and will be slow on the 26x26 head.
        for i in range(resolution):
            for j in range(resolution):
                for a in range(NUM_ANCHORS):
                    if (i, j, a) in responsible:
                        continue
                    noobj_anchor_BB = normalized_pred[b][i][j][a][:4]
                    # Predictions that already overlap a real object well are ignored.
                    if any(iou_center(gt_box, noobj_anchor_BB) > 0.5 for gt_box in true_boxes):
                        continue
                    noobj_confidence_logit = y_pred[b][i][j][a][4]
                    noobj_confidence_loss += bce(tf.zeros((1,)), tf.reshape(noobj_confidence_logit, (1,)))

    total_loss = (
        position_loss
        + object_scale * obj_confidence_loss
        + no_object_scale * noobj_confidence_loss
        + clasification_loss
    )

    return total_loss / tf.cast(batch_size, tf.float32)

def yolo_multihead_loss(y_trues, y_preds):
    """
    Sum the single-head YOLO loss over one or more detection heads.

    Args:
        y_trues: ground truth for each head, in the format expected by
            yolo_loss_single_head: (batch, MAX_BOUNDING_BOXES, 5 + C). It is
            passed through unchanged.
        y_preds: raw head outputs, each (batch, S, S, B * (5 + C)); they are
            reshaped to (batch, S, S, B, 5 + C) before the per-head loss.

        Both arguments may be lists/tuples (one entry per head) or single
        tensors. The single-tensor form is what Keras passes when this function
        is given as the `loss` of a multi-output model, where it is called once
        per output.

    Returns:
        Scalar tensor: the sum of the per-head losses.
    """
    # A bare tensor must not be zipped: iterating it would walk the batch axis.
    if not isinstance(y_preds, (list, tuple)):
        y_trues, y_preds = [y_trues], [y_preds]

    total_loss = 0.0
    for y_true, y_pred in zip(y_trues, y_preds):
        # Split the channel axis into (anchor, box attributes).
        y_pred = tf.reshape(y_pred, (-1, y_pred.shape[1], y_pred.shape[2], NUM_ANCHORS, 5 + NUM_CLASSES))

        total_loss += yolo_loss_single_head(y_true, y_pred)

    return total_loss

In [None]:
# The custom loss iterates over the batch with Python loops and branches on
# tensor values (e.g. skipping boxes whose objectness is 0), which cannot be
# traced into a graph — so training has to run eagerly.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
    loss=yolo_multihead_loss,
    run_eagerly=True,
)