In [None]:
import os
import cv2
import numpy as np
import pandas as pd
from PIL import Image
from tqdm import tqdm
import matplotlib.patches as patches
import matplotlib.pyplot as plt
from collections import Counter
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
import torchvision.transforms.functional as FT

## Step 1 : Build a custom dataset class

In [None]:
class BloodCellDataset(torch.utils.data.Dataset):
    """Blood-cell detection dataset that yields YOLO-style grid targets.

    Image ``index`` is read from ``<img_dir>/<index>.png``; its boxes are the rows
    of the annotation CSV whose ``image`` column equals ``image-<index>.png``.

    Args:
        img_dir: Directory containing the images.
        csv_file: Annotation CSV with at least ``image`` and ``label`` columns.
            After dropping ``image``, box corners are read positionally from
            columns 1-4 (xmin, ymin, xmax, ymax) and the label from column 5.
        split: Grid size ``S`` (the image is divided into ``S x S`` cells).
        num_classes: Number of classes ``C``.
        num_bboxes: Number of boxes per cell ``B``. Only the first box slot of
            each cell is ever filled by this dataset.
    """

    def __init__(self, img_dir, csv_file, split, num_classes, num_bboxes):
        self.annotations = pd.read_csv(csv_file)
        self.dict_ = {"rbc":0, "wbc":1}
        # Labels missing from ``dict_`` become NaN here.
        self.annotations['label'] = self.annotations.label.map(self.dict_)
        self.img_dir = img_dir
        self.S = split
        self.C = num_classes
        self.B = num_bboxes
        # Built once instead of on every __getitem__ call.
        self.transform = transforms.Compose([transforms.Resize((224, 224)), transforms.ToTensor()])

    def __len__(self):
        """Return the number of entries in ``img_dir``."""
        return len(os.listdir(self.img_dir))

    @staticmethod
    def _to_center_format(xmin, ymin, xmax, ymax, img_width, img_height):
        """Convert pixel corners to (x_center, y_center, width, height) in [0, 1].

        Horizontal quantities are divided by the image width and vertical ones
        by the image height.
        """
        x = ((xmin + xmax) / 2) / img_width
        y = ((ymin + ymax) / 2) / img_height
        width = abs(xmax - xmin) / img_width
        height = abs(ymax - ymin) / img_height
        return x, y, width, height

    def _build_label_matrix(self, bboxes):
        """Encode ``[class, x, y, w, h]`` boxes into an ``(S, S, C + 5*B)`` target.

        Per-cell layout: ``[0:C]`` one-hot class, ``[C]`` objectness,
        ``[C+1:C+5]`` box as (x_cell, y_cell, width_cell, height_cell).
        A cell keeps only the first box assigned to it.
        """
        label_matrix = torch.zeros((self.S, self.S, self.C + self.B * 5))
        conf_idx = self.C
        last_cell = self.S - 1
        for class_label, x, y, width, height in bboxes:
            # Clamp so a centre on (or past) the image border still maps to a valid cell.
            i = max(0, min(int(self.S * y), last_cell))
            j = max(0, min(int(self.S * x), last_cell))
            x_cell, y_cell = self.S * x - j, self.S * y - i
            width_cell, height_cell = width * self.S, height * self.S

            if label_matrix[i, j, conf_idx] == 0:
                label_matrix[i, j, conf_idx] = 1
                label_matrix[i, j, conf_idx + 1:conf_idx + 5] = torch.tensor(
                    [x_cell, y_cell, width_cell, height_cell]
                )
                label_matrix[i, j, int(class_label)] = 1
        return label_matrix

    def __getitem__(self, index):
        """Return ``(image_tensor, label_matrix)`` for image number ``index``.

        The image is converted to RGB, resized to 224x224 and turned into a
        tensor; the label matrix has shape ``(S, S, C + 5*B)``.
        """
        image_name = str(index)+".png"
        image_name_csv = "image-"+str(index)+".png"

        # A single decode is enough: PIL gives both the pixels and the size.
        img = Image.open(os.path.join(self.img_dir, image_name)).convert("RGB")
        img_width, img_height = img.size

        rows = self.annotations[self.annotations["image"] == image_name_csv].drop(columns=["image"]).to_numpy()
        bboxes = []
        for row in rows:
            xmin, ymin, xmax, ymax = float(row[1]), float(row[2]), float(row[3]), float(row[4])
            x, y, width, height = self._to_center_format(xmin, ymin, xmax, ymax, img_width, img_height)
            bboxes.append([row[5], x, y, width, height])

        return self.transform(img), self._build_label_matrix(bboxes)

    def __show_img_annotated__(self, index):
        """Display image ``index`` with its annotated boxes drawn on top."""
        img_file = str(index)+".png"
        img_file_csv =  "image-"+ str(index)+".png"
        image_path = os.path.join(self.img_dir,img_file)
        image_boxes = self.annotations[self.annotations["image"] == img_file_csv].drop(columns=["image"]).to_numpy()
        im = cv2.imread(image_path)
        if im is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise FileNotFoundError(image_path)
        # OpenCV decodes to BGR while matplotlib expects RGB.
        im = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
        for i in image_boxes:
            plt.plot([i[1],i[1],i[3],i[3],i[1]], [i[2],i[4],i[4],i[2],i[2]])
        plt.imshow(im)
        plt.show()

In [None]:
# 7x7 grid, 2 classes (rbc / wbc), 1 box per cell.
blood_cell_dataset = BloodCellDataset(
    img_dir="dataset/images",
    csv_file="dataset/annotation.csv",
    split=7,
    num_classes=2,
    num_bboxes=1,
)

In [None]:
# Sanity check: display image 0 with its annotated boxes drawn on top.
blood_cell_dataset.__show_img_annotated__(0)

## Step 2 : Utility Functions

In [None]:
def Intersection_over_union(box1, box2):
    """Compute the IoU of boxes given as (center_x, center_y, width, height).

    Both arguments are tensors whose last dimension holds the four box values;
    leading dimensions are broadcast against each other. The result keeps a
    trailing dimension of size 1.
    """
    def corners(box):
        # (cx, cy, w, h) -> (x1, y1, x2, y2)
        cx, cy = box[..., 0:1], box[..., 1:2]
        w, h = box[..., 2:3], box[..., 3:4]
        return cx - w / 2, cy - h / 2, cx + w / 2, cy + h / 2

    a_x1, a_y1, a_x2, a_y2 = corners(box1)
    b_x1, b_y1, b_x2, b_y2 = corners(box2)

    # Overlap rectangle; a negative extent means the boxes do not overlap.
    left = torch.max(a_x1, b_x1)
    top = torch.max(a_y1, b_y1)
    right = torch.min(a_x2, b_x2)
    bottom = torch.min(a_y2, b_y2)
    intersection = (right - left).clamp(0) * (bottom - top).clamp(0)

    area_a = abs((a_x2 - a_x1) * (a_y2 - a_y1))
    area_b = abs((b_x2 - b_x1) * (b_y2 - b_y1))

    # The epsilon keeps the division defined for two zero-area boxes.
    union = area_a + area_b - intersection + 1e-6
    return intersection / union

def non_max_supression(bboxes,iou_threshold,threshold):
    """Greedy per-class non-maximum suppression.

    Each box is ``[class, confidence, x, y, w, h]``. Boxes whose confidence is
    not above ``threshold`` are dropped first. The remaining boxes are visited
    in decreasing confidence; a box is removed when it has the same class as an
    already kept box and their IoU is at least ``iou_threshold``.

    Returns the kept boxes, highest confidence first.
    """
    candidates = sorted(
        (box for box in bboxes if box[1] > threshold),
        key=lambda box: box[1],
        reverse=True,
    )
    kept = []
    while candidates:
        best, rest = candidates[0], candidates[1:]
        survivors = []
        for other in rest:
            if other[0] != best[0]:
                # Different class: never suppressed by `best`.
                survivors.append(other)
            elif Intersection_over_union(torch.tensor(best[2:]), torch.tensor(other[2:])) < iou_threshold:
                survivors.append(other)
        candidates = survivors
        kept.append(best)
    return kept

def fix_bboxes(predictions, S=7, C=2):
    """Decode grid-relative YOLO output into image-relative boxes.

    Args:
        predictions: Tensor whose first dimension is the batch and whose
            remaining elements can be reshaped to ``(S, S, D)``. Each cell is
            laid out as ``[0:C]`` class scores, ``[C]`` confidence and
            ``[C+1:C+5]`` the box (x, y, w, h) expressed in cell units.
        S: Grid size.
        C: Number of class scores at the start of each cell vector.

    Returns:
        A list of ``S * S`` boxes ``[class, confidence, x, y, w, h]`` (floats,
        coordinates relative to the whole image) for the FIRST sample of the
        batch only; callers that need every sample must pass them one at a time.
    """
    predictions = predictions.detach().to("cpu")
    batch_size = predictions.shape[0]
    predictions = predictions.reshape(batch_size, S, S, -1)
    boxes = predictions[..., C + 1:C + 5]

    # col_offsets[n, i, j, 0] == j (column index); row_offsets[n, i, j, 0] == i (row index).
    col_offsets = torch.arange(S).repeat(batch_size, S, 1).unsqueeze(-1)
    row_offsets = col_offsets.permute(0, 2, 1, 3)

    x = (boxes[..., :1] + col_offsets) / S
    y = (boxes[..., 1:2] + row_offsets) / S
    w_h = boxes[..., 2:4] / S

    # argmax yields int64; cast explicitly so the concatenation has a single dtype.
    predicted_class = predictions[..., :C].argmax(-1).unsqueeze(-1).to(predictions.dtype)
    confidence = predictions[..., C].unsqueeze(-1)

    decoded = torch.cat((predicted_class, confidence, x, y, w_h), dim=-1)
    decoded = decoded.reshape(batch_size, S * S, decoded.shape[-1])
    return decoded[0].tolist()


def get_bboxes(loader, model, iou_threshold, threshold):
    """Collect NMS-filtered predictions and ground-truth boxes for a whole loader.

    The model is switched to eval mode while predicting and put back into the
    mode it was in on entry. Inputs are moved to the notebook-level ``device``.

    Args:
        loader: Iterable of ``(images, label_matrices)`` batches.
        model: Network mapping an image batch to flattened grid predictions.
        iou_threshold: IoU threshold forwarded to ``non_max_supression``.
        threshold: Minimum confidence for a predicted or ground-truth box to be kept.

    Returns:
        ``(all_pred_boxes, all_true_boxes)`` where every box is
        ``[sample_idx, class, confidence, x, y, w, h]`` and ``sample_idx``
        counts samples in the order the loader produced them.
    """
    all_pred_boxes = []
    all_true_boxes = []
    was_training = model.training
    model.eval()
    train_idx = 0
    try:
        for (x, labels) in loader:
            x = x.to(device)
            labels = labels.to(device)
            with torch.no_grad():
                predictions = model(x)

            # fix_bboxes decodes only the first sample of what it is given, so feed
            # it one sample at a time; otherwise batches larger than 1 lose samples.
            for sample in range(x.shape[0]):
                true_bboxes = fix_bboxes(labels[sample:sample + 1])
                bboxes = fix_bboxes(predictions[sample:sample + 1])

                nms_boxes = non_max_supression(bboxes, iou_threshold=iou_threshold,threshold=threshold)
                for nms_box in nms_boxes:
                    all_pred_boxes.append([train_idx] + nms_box)
                for box in true_bboxes:
                    if box[1] > threshold:
                        all_true_boxes.append([train_idx] + box)
                train_idx += 1
    finally:
        model.train(was_training)
    return all_pred_boxes, all_true_boxes

def mean_average_precision(predicted_boxes, true_boxes, iou_threshold=0.5, num_classes=2):
    """Compute the mean average precision over all classes that have ground truth.

    Args:
        predicted_boxes: List of ``[image_idx, class, confidence, x, y, w, h]``.
        true_boxes: List of ground-truth boxes in the same layout.
        iou_threshold: A prediction counts as a match only when its best IoU
            with a ground-truth box of the same image and class is strictly
            greater than this value.
        num_classes: Number of classes to iterate over.

    Returns:
        A scalar tensor: the mean of the per-class average precisions (area under
        the precision/recall curve). Classes without any ground-truth box are
        skipped; if no class has ground truth the result is ``tensor(0.)``.
    """
    average_precisions = []
    for c in range(num_classes):
        ground_truths = [true_box for true_box in true_boxes if true_box[1] == c]
        if len(ground_truths) == 0:
            continue

        # Highest-confidence detections get the first chance to claim a ground truth.
        detections = sorted(
            (box for box in predicted_boxes if box[1] == c),
            key=lambda box: box[2],
            reverse=True,
        )

        # Group ground truths per image once instead of re-filtering for every detection.
        gts_by_image = {}
        for gt in ground_truths:
            gts_by_image.setdefault(gt[0], []).append(gt)
        # matched[img][k] becomes 1 once ground truth k of that image has been claimed.
        matched = {img: torch.zeros(len(gts)) for img, gts in gts_by_image.items()}

        True_positive = torch.zeros(len(detections))
        False_positive = torch.zeros(len(detections))
        for det_idx, detection in enumerate(detections):
            best_iou = 0.0
            best_gt_idx = None
            for gt_idx, gt in enumerate(gts_by_image.get(detection[0], [])):
                iou = float(Intersection_over_union(torch.tensor(detection[3:]), torch.tensor(gt[3:])))
                if iou > best_iou:
                    best_iou = iou
                    best_gt_idx = gt_idx

            if best_iou <= iou_threshold:
                # The detection does not fit any ground-truth box.
                False_positive[det_idx] = 1
            elif matched[detection[0]][best_gt_idx] == 0:
                True_positive[det_idx] = 1
                matched[detection[0]][best_gt_idx] = 1
            else:
                # Ground truth already claimed by a more confident detection.
                False_positive[det_idx] = 1

        True_positive_cumsum = torch.cumsum(True_positive, dim=0)
        False_positive_cumsum = torch.cumsum(False_positive, dim=0)

        recalls = True_positive_cumsum / (len(ground_truths) + 1e-6)
        precisions = torch.divide(True_positive_cumsum, (True_positive_cumsum + False_positive_cumsum + 1e-6))
        # Prepend the (recall=0, precision=1) point so the integration starts at the axis.
        precisions = torch.cat((torch.tensor([1.0]), precisions))
        recalls = torch.cat((torch.tensor([0.0]), recalls))
        average_precisions.append(torch.trapz(precisions, recalls))

    if not average_precisions:
        return torch.tensor(0.0)
    MAP = sum(average_precisions) / len(average_precisions)
    return MAP
    
class YoloLoss(nn.Module):
    """YOLO-style sum-of-squares loss for a single predicted box per cell.

    Each cell vector is laid out as ``[0:C]`` class scores, ``[C]`` objectness
    and ``[C+1:C+5]`` the box (x, y, w, h). Only that first box slot is used,
    whatever the value of ``B``.

    Args:
        S: Grid size.
        B: Number of box slots per cell (determines the cell vector length).
        C: Number of classes.
    """

    def __init__(self, S=7, B=1, C=2):
        super(YoloLoss, self).__init__()
        self.mse = nn.MSELoss(reduction="sum")
        self.S = S
        self.B = B
        self.C = C
        self.lambda_noobj = 0.5
        self.lambda_coord = 5

    def forward(self, predictions, target):
        """Return the weighted sum of box, objectness, no-object and class losses.

        Args:
            predictions: Tensor reshapeable to ``(N, S, S, C + 5*B)``.
            target: Tensor of shape ``(N, S, S, C + 5*B)``.
        """
        predictions = predictions.reshape(-1, self.S, self.S, self.C + self.B * 5)
        conf = slice(self.C, self.C + 1)
        box = slice(self.C + 1, self.C + 5)
        classes = slice(0, self.C)

        # 1 where the target cell contains an object, 0 elsewhere; shape (N, S, S, 1).
        exists_box = target[..., conf]

        # box coordinates loss
        box_predictions = exists_box * predictions[..., box]
        box_targets = exists_box * target[..., box]
        # Width/height are compared through their square roots; sign * sqrt(|.|)
        # keeps the transform defined for negative predicted sizes.
        pred_wh = torch.sign(box_predictions[..., 2:4]) * torch.sqrt(torch.abs(box_predictions[..., 2:4] + 1e-6))
        target_wh = torch.sqrt(box_targets[..., 2:4])
        box_loss = self.mse(
            torch.cat((box_predictions[..., 0:2], pred_wh), dim=-1),
            torch.cat((box_targets[..., 0:2], target_wh), dim=-1),
        )

        # objects loss
        object_loss = self.mse(exists_box * predictions[..., conf], exists_box * target[..., conf])

        # noobj loss
        no_object_loss = self.mse(
            (1 - exists_box) * predictions[..., conf],
            (1 - exists_box) * target[..., conf],
        )

        # class loss
        class_loss = self.mse(exists_box * predictions[..., classes], exists_box * target[..., classes])

        # overall loss
        loss = (
            self.lambda_coord * box_loss
            + object_loss
            + self.lambda_noobj * no_object_loss
            + class_loss
        )
        return loss

## Build Network

In [None]:
# Layer specification consumed by Net.convLayers_:
#   tuple -> one conv block described as (kernel_size, out_channels, stride, padding)
#   "M"   -> 2x2 max-pooling with stride 2
#   list  -> [first conv tuple, second conv tuple, repeat count]: the pair is stacked `repeat count` times
Model_architecture = [
    (7, 64, 2, 3),
    "M",
    (3, 192, 1, 1),
    "M",
    (1, 128, 1, 0),
    (3, 256, 1, 1),
    (1, 256, 1, 0),
    (3, 512, 1, 1),
    "M",
    [(1, 256, 1, 0), (3, 512, 1, 1), 4],
    (1, 512, 1, 0),
    (3, 512, 1, 1),
    "M",
    (3, 1024, 1, 1),
    (3, 1024, 1, 1)
]

In [None]:
class Block(nn.Module):
    """Convolution followed by batch normalisation and a LeakyReLU activation."""

    def __init__(self, in_channels, out_channels, kernel_size, stride, padding):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        self.batchnorm = nn.BatchNorm2d(out_channels)
        self.Leakyrelu = nn.LeakyReLU(negative_slope=0.001)

    def forward(self, x):
        """Apply conv -> batch norm -> LeakyReLU to ``x``."""
        features = self.conv(x)
        normalized = self.batchnorm(features)
        return self.Leakyrelu(normalized)

In [None]:
class Net(nn.Module):
    """Convolutional backbone followed by two fully connected layers.

    Args:
        network_architecture: Sequence of layer specs. A ``str`` adds a 2x2
            max-pool (stride 2); a ``tuple`` ``(kernel_size, out_channels,
            stride, padding)`` adds one ``Block``; a ``list`` ``[conv_a, conv_b,
            repeats]`` adds the two blocks ``repeats`` times.
        in_channels: Number of channels of the input images.
        S: Grid size. The backbone must output ``S x S`` feature maps.
        B: Number of boxes per cell.
        C: Number of classes.

    The output of ``forward`` has shape ``(N, S * S * (C + 5 * B))``.
    """

    def __init__(self, network_architecture, in_channels, S=7, B=1, C=2):
        super(Net, self).__init__()
        self.network_architecture = network_architecture
        self.in_channels = in_channels
        self.S = S
        self.B = B
        self.C = C

        layers, feature_channels = self._build_layers()
        self.convLayers = nn.Sequential(*layers)

        # The head is sized from the backbone's final channel count (1024 for
        # Model_architecture) instead of a hard-coded constant.
        self.fc1 = nn.Linear(S * S * feature_channels, 1024)
        self.leakyrelu = nn.LeakyReLU(0.001)
        self.fc2 = nn.Linear(1024, S * S * (C + 5 * B))

    def _build_layers(self):
        """Return ``(layers, out_channels)`` for ``network_architecture``.

        Channel bookkeeping uses a local variable so ``self.in_channels`` keeps
        describing the network input.
        """
        layers = []
        channels = self.in_channels
        for spec in self.network_architecture:
            if isinstance(spec, str):
                layers.append(nn.MaxPool2d(2, stride=2))

            elif isinstance(spec, tuple):
                layers.append(Block(channels, out_channels=spec[1], kernel_size=spec[0], stride=spec[2], padding=spec[3]))
                channels = spec[1]

            elif isinstance(spec, list):
                first, second, repeats = spec[0], spec[1], spec[-1]
                for _ in range(repeats):
                    layers.append(Block(channels, out_channels=first[1], kernel_size=first[0], stride=first[2], padding=first[3]))
                    layers.append(Block(in_channels=first[1], out_channels=second[1], kernel_size=second[0], stride=second[2], padding=second[3]))
                    channels = second[1]

            else:
                raise TypeError(f"Unsupported architecture entry: {spec!r}")
        return layers, channels

    def convLayers_(self):
        """Build the convolutional backbone as an ``nn.Sequential``."""
        layers, _ = self._build_layers()
        return nn.Sequential(*layers)

    def forward(self, x):
        """Map an image batch ``(N, in_channels, H, W)`` to flattened grid predictions."""
        x = self.convLayers(x)
        # Flatten per sample: a wrong input size then fails loudly in fc1
        # instead of being silently folded into the batch dimension.
        x = torch.flatten(x, start_dim=1)
        x = self.fc1(x)
        x = self.leakyrelu(x)
        x = self.fc2(x)
        return x

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
# Build the detector for 3-channel input images and move it to the selected device.
net = Net(network_architecture=Model_architecture, in_channels=3).to(device)

## Training

In [None]:
# Training hyper-parameters.
# NOTE(review): IMG_SIZE is not referenced by the code shown in this notebook;
# BloodCellDataset resizes to 224x224 on its own — keep the two in sync.
IMG_SIZE = 224
LEARNING_RATE = 2e-5  # step size passed to the Adam optimizer
BATCH_SIZE = 1 # 64 in original paper
EPOCHS = 100  # number of passes over the training loader

In [None]:
# Dataset: 7x7 grid, 2 classes (rbc / wbc), 1 box per cell.
blood_cell_dataset = BloodCellDataset(
    img_dir="dataset/images",
    csv_file="dataset/annotation.csv",
    split=7,
    num_classes=2,
    num_bboxes=1,
)
# Samples are reshuffled every epoch.
train_loader = DataLoader(blood_cell_dataset, batch_size=BATCH_SIZE, shuffle=True)
optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE)
loss_fn = YoloLoss()

In [None]:
# Train for EPOCHS epochs; after each epoch report the mAP on the training set
# and the mean training loss.
for epoch in range(EPOCHS):
    print("[INFO] EPOCH ", epoch)
    net.train()
    loop = tqdm(train_loader, leave=True)
    mean_loss = []
    for x, y in loop:
        x, y = x.to(device), y.to(device)
        out = net(x)
        loss = loss_fn(out, y)
        mean_loss.append(loss.item())
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    pred_boxes, target_boxes = get_bboxes(train_loader, net, iou_threshold=0.5, threshold=0.4)
    # Early in training no prediction may pass the confidence threshold, in which
    # case there is no first box to show.
    if pred_boxes:
        print(np.shape(pred_boxes[0]))
        print(pred_boxes[0])
    else:
        print("[INFO] no predicted box above the confidence threshold")
    mean_avg_prec = mean_average_precision(pred_boxes, target_boxes, iou_threshold=0.5)
    print(f"Train mAP: {mean_avg_prec}")
    print(f"Mean loss was {sum(mean_loss)/len(mean_loss)}")


## Validate the Model predictions

In [None]:
def plot_image(image, boxes):
    """Show ``image`` with one red rectangle and a class label per box.

    Args:
        image: Array-like of shape (height, width, channels).
        boxes: Iterable of ``[class, confidence, x, y, w, h]`` where the box is
            given by its centre and size, relative to the image dimensions.
            Class 0 is labelled "rbc", anything else "wbc".
    """
    pixels = np.array(image)
    img_height, img_width, _ = pixels.shape
    fig, ax = plt.subplots(figsize=(10, 10))
    ax.imshow(pixels)

    for entry in boxes:
        label = "rbc" if entry[0] == 0 else "wbc"
        coords = entry[2:]

        # Centre/size -> top-left corner, scaled to pixels.
        left = (coords[0] - coords[2] / 2) * img_width
        top = (coords[1] - coords[3] / 2) * img_height
        rect_width = coords[2] * img_width
        rect_height = coords[3] * img_height

        outline = patches.Rectangle(
            (left, top),
            rect_width,
            rect_height,
            linewidth=1,
            edgecolor="r",
            facecolor="none",
        )
        # The label is anchored at the centre of the rectangle.
        plt.text(left + rect_width / 2, top + rect_height / 2, label, fontsize=9)
        ax.add_patch(outline)
    plt.show()

In [None]:
# Plot the predictions for the first sample of one batch.
# Inference must run in eval mode and without autograd: in train mode BatchNorm
# would normalise with (and update its running statistics from) this single batch.
net.eval()
with torch.no_grad():
    for x, y in train_loader:
        x = x.to(device)
        bboxes = fix_bboxes(net(x))
        bboxes = non_max_supression(bboxes, iou_threshold=0.5, threshold=0.4)
        plot_image(x[0].permute(1,2,0).to("cpu"), bboxes)
        break