# Training Notebook #
This notebook is used to test the training of neural networks with PyTorch.

In [17]:
import torch, os, random
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms
from torchvision.ops import box_iou
from PIL import Image, ImageDraw
from IPython.display import display
from tqdm import tqdm

# Seed every random number generator the notebook relies on (Python's `random` picks the
# preview image, PyTorch initialises the weights) so a fresh run reproduces the same results.
seed = 2023
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

# Optimisation hyper-parameters.
learning_rate = 2e-5
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(device)
batch_size = 16
weight_decay = 0
epochs = 100

# Data-loading settings. Page-locked host memory only speeds up host-to-GPU copies,
# so it is requested only when training on CUDA.
num_workers = 4
pin_memory = device == "cuda"
#load_model = False

# Root folder that contains the train/val/test splits.
dataset_path = "./data/RisikoDataset"


<torch._C.Generator at 0x7f85c3ffdad0>

Device check

## Dataset ##
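Load the dataset: define the `Dataset` class that reads the images together with their annotation files, then build the train/val/test data loaders.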

In [None]:
class RisikoDataset(torch.utils.data.Dataset):
    """Detection dataset that pairs every image with a text annotation file.

    Files are read from ``<dataset_dir>/<mode>/images`` and ``<dataset_dir>/<mode>/labels``.
    Images and annotation files are matched by position after sorting the file names, and the
    names (without extension) must be identical.

    Each annotation file holds one space-separated row per object: the first column is the
    class id and the remaining four columns are the bounding box.
    """

    VALID_MODES = ("train", "val", "test")
    # One class id followed by four box values per annotation row.
    ANNOTATION_COLUMNS = 5

    def __init__(self, dataset_dir:str, mode:str, transform=None):
        """Index the image and annotation files of one split.

        Args:
            dataset_dir: root folder of the dataset.
            mode: split to load, one of "train", "val" or "test".
            transform: optional callable applied to the RGB PIL image of every sample.

        Raises:
            ValueError: if `mode` is unknown, or if the image and annotation files do not
                match one-to-one by name.
        """
        if mode not in self.VALID_MODES:
            raise ValueError("Mode value of dataset not valid")

        self.imgs_dir = os.path.join(dataset_dir, mode, "images")
        self.annots_dir = os.path.join(dataset_dir, mode, "labels")

        self.annotations = self._list_files(self.annots_dir)
        self.images = self._list_files(self.imgs_dir)
        self.transform = transform

        if len(self.annotations) != len(self.images):
            raise ValueError("Number of annotations is different from the number of images")

        # Sorted order pairs the files, so the names must agree position by position.
        for i, (image_name, annotation_name) in enumerate(zip(self.images, self.annotations)):
            image_stem = os.path.splitext(image_name)[0]
            annotation_stem = os.path.splitext(annotation_name)[0]
            if image_stem != annotation_stem:
                raise ValueError(
                    "Mismatch between images and annotations at id " + str(i)
                    + ".   imgName = " + image_stem + "   labelName = " + annotation_stem
                )

    @staticmethod
    def _list_files(directory:str) -> list[str]:
        """Return the sorted names of the regular files located directly in `directory`."""
        return sorted(
            name for name in os.listdir(directory)
            if os.path.isfile(os.path.join(directory, name))
        )

    def __len__(self) -> int:
        """Return the number of samples (images) in the split."""
        return len(self.images)

    def __getitem__(self, idx:int) -> tuple[torch.Tensor, dict]:
        """Load sample `idx`.

        Returns:
            A pair ``(img, annotations)``. ``img`` is the RGB image after `transform` (the raw
            PIL image when no transform was given). ``annotations`` is a dict with
            ``"boxes"``, a float32 tensor of shape (num_objects, 4), and ``"labels"``, an
            int32 tensor of shape (num_objects, 1).
        """
        annotation_path = os.path.join(self.annots_dir, self.annotations[idx])
        rows = np.genfromtxt(fname=annotation_path, delimiter=' ', dtype=np.float32)
        # genfromtxt collapses a single-row file to a 1-D array; restore the
        # (num_objects, columns) layout so every sample has the same rank.
        # An empty result becomes a (0, 5) array.
        rows = rows.reshape(-1, self.ANNOTATION_COLUMNS)
        classes, bboxes = rows[:, :1], rows[:, 1:]

        annotations = {
            "boxes": torch.from_numpy(np.ascontiguousarray(bboxes)),
            "labels": torch.from_numpy(np.ascontiguousarray(classes)).to(torch.int32),
        }

        # convert() returns a fully loaded copy, so the file handle can be released right away.
        with Image.open(os.path.join(self.imgs_dir, self.images[idx])) as image_file:
            img = image_file.convert("RGB")

        if self.transform:
            img = self.transform(img)

        return img, annotations


In [None]:
# Turn PIL images into float32 CHW tensors scaled to [0, 1]. PILToTensor alone yields uint8
# tensors, which the convolution layers cannot consume.
img_to_tensor = transforms.Compose([
    #transforms.Resize(size=(500,500)),
    transforms.PILToTensor(),
    transforms.ConvertImageDtype(torch.float32),
])

train_set = RisikoDataset(dataset_dir=dataset_path, mode="train", transform=img_to_tensor)
val_set = RisikoDataset(dataset_dir=dataset_path, mode="val", transform=img_to_tensor)
test_set = RisikoDataset(dataset_dir=dataset_path, mode="test", transform=img_to_tensor)

# Only the training loader shuffles and drops the last incomplete batch; the evaluation
# loaders keep every sample so that metrics cover the whole split.
# NOTE(review): samples carry a variable number of boxes, which the default collate function
# is unlikely to batch — confirm, and pass a `collate_fn` if needed.
train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=num_workers, pin_memory=pin_memory, drop_last=True)
val_loader = torch.utils.data.DataLoader(val_set, batch_size=batch_size, shuffle=False, num_workers=num_workers, pin_memory=pin_memory, drop_last=False)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=num_workers, pin_memory=pin_memory, drop_last=False)

## Check dataset ##
Display a random image with its bounding boxes to make sure that everything is working correctly.

In [None]:
def draw_bboxes_on_image(dataset: RisikoDataset, index:int):
    """Display sample `index` of `dataset` with its bounding boxes outlined in red.

    The sample's ``"boxes"`` entry is read as rows of (center x, center y, width, height)
    expressed as fractions of the image size; they are scaled by the actual size of the image
    and converted to corner coordinates before drawing.

    Args:
        dataset: dataset returning ``(image, annotations)`` pairs.
        index: position of the sample to draw.
    """
    img, labels = dataset[index]
    # The dataset yields a tensor when a transform is set and a PIL image otherwise.
    if isinstance(img, torch.Tensor):
        img = transforms.ToPILImage()(img)

    # Scale by the real image size rather than assuming a fixed resolution.
    width, height = img.size
    image_scale = torch.tensor([width, height, width, height], dtype=torch.float32)
    # reshape() also accepts a single box stored as a flat 4-element tensor.
    bboxes: torch.Tensor = labels["boxes"].reshape(-1, 4) * image_scale

    img_draw = ImageDraw.Draw(img)
    for center_x, center_y, box_w, box_h in bboxes.tolist():
        x0 = center_x - box_w / 2
        x1 = center_x + box_w / 2
        y0 = center_y - box_h / 2
        y1 = center_y + box_h / 2

        img_draw.rectangle([x0, y0, x1, y1], outline="red")

    display(img)


# Preview one randomly chosen training sample as a sanity check of the annotations.
sample_index = random.randint(0, len(train_set)-1)
draw_bboxes_on_image(train_set, sample_index)


## Neural Network ##
Definition of the Neural Network

In [None]:
class Net(nn.Module):
    """Fully convolutional detector that predicts one box per grid cell.

    Input:  batch of images shaped (batch, 3, height, width).
    Output: tensor shaped (batch, grid_h, grid_w, 17), channels last. Along the last axis,
    indices 0-11 are class scores, index 12 is the object-presence score and indices 13-16
    are the box values. The grid size is whatever spatial size the convolution stack
    produces for the given input.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 64, 3)
        self.conv2 = nn.Conv2d(64, 128, 3)
        self.conv3 = nn.Conv2d(128, 256, 3)
        self.conv4 = nn.Conv2d(256, 256, 6, stride=4, padding=2)
        self.conv5 = nn.Conv2d(256, 128, 3, padding=1)
        self.conv6 = nn.Conv2d(128, 12+1+4, 3, padding=1) # 12 for classes, 1 for obj presence prob. and 4 for bbox

    def forward(self, x):
        """Compute the per-cell predictions for a batch of images."""
        x = F.leaky_relu(self.conv1(x))
        x = F.leaky_relu(self.conv2(x))
        x = F.leaky_relu(self.conv3(x))
        x = F.leaky_relu(self.conv4(x))
        x = F.leaky_relu(self.conv5(x))
        x = torch.sigmoid(self.conv6(x))

        # Convolutions are channels-first; move the 17 predictions to the last axis so that
        # `x[..., a:b]` slices prediction channels instead of image columns.
        x = x.permute(0, 2, 3, 1)
        grid_h, grid_w = x.shape[1], x.shape[2]

        # Build the helper tensors from the actual feature map, on the same device and dtype
        # as the activations, so the module works after `.to(device)` and for any input size.
        scale = torch.tensor([grid_w, grid_h, grid_w, grid_h], dtype=x.dtype, device=x.device)
        col_offset = torch.arange(grid_w, dtype=x.dtype, device=x.device) / grid_w
        row_offset = torch.arange(grid_h, dtype=x.dtype, device=x.device) / grid_h
        # center_offset[row, col] == (col / grid_w, row / grid_h)
        center_offset = torch.stack([
            col_offset.expand(grid_h, grid_w),
            row_offset.unsqueeze(1).expand(grid_h, grid_w),
        ], dim=-1)

        # NOTE(review): the box values are scaled by the grid size while the added cell
        # origin is a fraction of the grid; this formula is kept as originally written —
        # confirm that both terms are meant to share the same units.
        boxes = x[..., 13:17] * scale
        centers = boxes[..., 0:2] + center_offset

        # Assemble the result out of place: writing into slices of `x` is not differentiable.
        return torch.cat([x[..., :13], centers, boxes[..., 2:4]], dim=-1)


# Instantiate the detector, then place its parameters on the selected device.
net = Net()
net = net.to(device)

## Loss Function ##


In [16]:
class CustomLoss(nn.Module):
    """Sum-of-squared-errors detection loss for a grid with one box per cell.

    Both `predictions` and `target` are expected with the per-cell vector on the last axis:
    indices 0-11 class scores, index 12 object presence, indices 13-16 the box as
    (x, y, w, h). The loss is the weighted sum of four terms:

    * box loss      - (x, y) and the square roots of (w, h), only in cells that hold an object;
    * object loss   - presence score in cells that hold an object;
    * no-object loss - presence score in empty cells;
    * class loss    - class scores in cells that hold an object.

    Since every cell predicts a single box there is no best-box selection, hence no IoU
    computation is needed.
    """

    def __init__(self, lambda_coord:float = 1.0, lambda_no_obj:float = 0.5, eps:float = 1e-6):
        """
        Args:
            lambda_coord: weight of the box loss.
            lambda_no_obj: weight of the no-object loss.
            eps: lower bound applied before the square root of predicted sizes, which keeps
                the gradient finite when a (masked) size is exactly zero.
        """
        super().__init__()
        self.lambda_coord = lambda_coord
        self.lambda_no_obj = lambda_no_obj
        self.eps = eps
        # `reduction` is the supported argument; the legacy `reduce` flag is a boolean and
        # any truthy value (including the string "sum") makes the loss average instead.
        self.mse = nn.MSELoss(reduction="sum")

    def forward(self, predictions:torch.Tensor, target:torch.Tensor):
        """Return the scalar loss for one batch.

        Args:
            predictions: model output, shape (..., 17).
            target: ground truth with the same shape; index 12 is 1 in cells that contain an
                object and 0 elsewhere.
        """
        # Mask of the cells that contain an object, kept with a trailing axis for broadcasting.
        exist_box = target[..., 12:13]
        no_box = 1 - exist_box

        # ==================== #
        #       BOX LOSS       #
        # ==================== #
        box_predictions = exist_box * predictions[..., 13:17]
        box_targets = exist_box * target[..., 13:17]

        # Compare square roots of the sizes so errors on small boxes weigh more. The clamp
        # avoids the infinite derivative of sqrt at 0 (masked cells are exactly 0), and the
        # tensors are rebuilt with cat() instead of being modified in place.
        pred_wh = box_predictions[..., 2:4]
        pred_wh_sqrt = torch.sign(pred_wh) * torch.sqrt(torch.clamp(torch.abs(pred_wh), min=self.eps))
        box_predictions = torch.cat([box_predictions[..., 0:2], pred_wh_sqrt], dim=-1)
        box_targets = torch.cat([box_targets[..., 0:2], torch.sqrt(box_targets[..., 2:4])], dim=-1)

        box_loss = self.mse(box_predictions, box_targets)

        # ==================== #
        #       OBJ LOSS       #
        # ==================== #
        obj_loss = self.mse(exist_box * predictions[..., 12:13], exist_box * target[..., 12:13])

        # ==================== #
        #     NO OBJ LOSS      #
        # ==================== #
        # In empty cells the presence target is 0, so predictions are pushed towards 0.
        no_obj_loss = self.mse(no_box * predictions[..., 12:13], no_box * target[..., 12:13])

        # ==================== #
        #      CLASS LOSS      #
        # ==================== #
        class_loss = self.mse(exist_box * predictions[..., :12], exist_box * target[..., :12])

        loss = self.lambda_coord * box_loss + obj_loss + self.lambda_no_obj * no_obj_loss + class_loss

        return loss

## Training Function ##

In [None]:
def train_function(train_loader, model, optimizer, loss_function):
    """Train `model` for one epoch and report the mean batch loss.

    Args:
        train_loader: iterable yielding ``(inputs, targets)`` batches; both elements are
            moved to the notebook-level `device`.
        model: network to optimise.
        optimizer: optimizer bound to the parameters of `model`.
        loss_function: callable ``loss_function(predictions, targets)`` returning a scalar
            tensor.

    Returns:
        The mean of the per-batch losses, or NaN when the loader produced no batch.
    """
    model.train()
    loop = tqdm(train_loader, leave=True)
    batch_losses = []

    for x, y in loop:
        x, y = x.to(device), y.to(device)
        out = model(x)
        loss = loss_function(out, y)
        batch_loss = loss.item()
        batch_losses.append(batch_loss)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        #update progress-bar
        loop.set_postfix(loss=batch_loss)

    # An empty loader (e.g. fewer samples than one batch with drop_last=True) would
    # otherwise end in a division by zero.
    if not batch_losses:
        print("No batches were produced by the loader; nothing was trained")
        return float("nan")

    mean_loss = sum(batch_losses) / len(batch_losses)
    print(f"Mean loss was {mean_loss}")
    return mean_loss

## Train ##

In [None]:
# Build a fresh model with its optimizer and loss, then train for the configured number of epochs.
model = Net().to(device)
# The optimizer class is `optim.Adam`; `optim.adam` is not an optimizer.
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
loss_function = CustomLoss()

for epoch in range(epochs):
    train_function(train_loader, model, optimizer, loss_function)