In [2]:
import torch
print(torch.cuda.is_available())

True


In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
import os
from PIL import Image
import torch
import numpy as np
from torch.utils.data import Dataset,DataLoader
import torchvision.transforms as transforms
from torch.utils.data import random_split

In [5]:
class FaceDataset(Dataset):
    def __init__(self, img_dir, label_dir, img_size=448, grid_size=14, num_bboxes=1, max_images=None, transform=None):
        self.img_dir = img_dir
        self.label_dir = label_dir
        self.img_size = img_size
        self.grid_size = grid_size
        self.num_bboxes = num_bboxes
        self.transform = transform
        self.img_files = []
        count = 0
        for img_file in os.listdir(img_dir):
            if img_file.endswith('.jpg'):
                self.img_files.append(img_file)
                count += 1
                if max_images is not None and count >= max_images:
                    break

    def __len__(self):
        return len(self.img_files)

    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, self.img_files[idx])
        img = Image.open(img_path).convert("RGB")
        img = img.resize((self.img_size, self.img_size))

        if self.transform:
            img = self.transform(img)

        label_path = os.path.join(self.label_dir, self.img_files[idx].replace('.jpg', '.txt'))
        boxes = []

        if os.path.exists(label_path):
            with open(label_path, 'r') as file:
                for line in file:
                    _, x_center, y_center, width, height = map(float, line.strip().split())
                    boxes.append([x_center, y_center, width, height, 1.0])

        target_grid = self.map_targets_to_grid(boxes, self.grid_size, self.num_bboxes)

        return img, target_grid

    def map_targets_to_grid(self, targets, grid_size, num_bboxes):
        target_grid = torch.zeros((grid_size, grid_size, num_bboxes * 5))

        for target in targets:
            x_center, y_center, width, height, conf = target
            #print('-------------------------------------')
            #print(x_center, y_center, width, height, conf)
            grid_x = int(x_center * grid_size)
            grid_y = int(y_center * grid_size)

            x_center = (x_center * grid_size) - grid_x
            y_center = (y_center * grid_size) - grid_y
            #print(x_center, y_center, width, height, conf)
            bbox_index = 0
            target_grid[grid_y, grid_x, bbox_index * 5:bbox_index * 5 + 4] = torch.tensor([x_center, y_center, width, height])
            target_grid[grid_y, grid_x, bbox_index * 5 + 4] = conf

        return target_grid


In [6]:
img_dir_train='/content/drive/My Drive/YoloData/images/train'
label_dir_train='/content/drive/My Drive/YoloData/labels/train'
img_dir_test='/content/drive/My Drive/YoloData/images/val'
label_dir_test='/content/drive/My Drive/YoloData/labels/val'

data_transforms = transforms.Compose([
    transforms.ToTensor()
])

train_val_dataset = FaceDataset(
    img_dir=img_dir_train,
    label_dir=label_dir_train,
    transform=data_transforms,
    max_images=5000
)
train_size = int(0.8 * len(train_val_dataset))
val_size = len(train_val_dataset) - train_size
train_dataset, val_dataset = random_split(train_val_dataset, [train_size, val_size])

test_dataset = FaceDataset(
    img_dir=img_dir_test,
    label_dir=label_dir_test,
    transform=data_transforms,
    max_images=1000
)

In [7]:
def collate_fn(batch):
    images, targets = zip(*batch)

    images = torch.stack(images, dim=0)

    return images, targets

In [8]:
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=2,collate_fn=collate_fn)
val_loader = DataLoader(val_dataset,batch_size=16,shuffle=True,num_workers=2,collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=2,collate_fn=collate_fn)

In [9]:
#train_dataset[0]

In [10]:
import torch
import torch.nn as nn
import numpy as np

In [11]:
class YOLO(nn.Module):
  def __init__(self,grid_size=14,num_bboxes=1):
    super(YOLO,self).__init__()
    self.grid_size=grid_size
    self.num_bboxes=num_bboxes
    self.num_classes=1

    self.conv_layers = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(0.1),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(64, 192, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(192),
            nn.LeakyReLU(0.1),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(192, 128, kernel_size=1, stride=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.1),

            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.1),

            nn.Conv2d(256, 256, kernel_size=1, stride=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.1),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.1),
            nn.MaxPool2d(kernel_size=2, stride=2) #14x14x512
        )

    self.fc_layers = nn.Sequential(
            nn.Flatten(),
            nn.Linear(512 * self.grid_size * self.grid_size, 4096),
            nn.LeakyReLU(0.1),
            nn.Linear(4096, self.grid_size * self.grid_size * (self.num_bboxes * 5))
        )

  def forward(self, x):

        x = self.conv_layers(x)
        x = self.fc_layers(x)

        x = x.view(-1, self.grid_size, self.grid_size, self.num_bboxes * 5)

        bboxes = torch.sigmoid(x[..., :self.num_bboxes * 5])

        return bboxes
  def predict(self, x, iou_threshold=0.5, confidence_threshold=0.5):
        bboxes = self.forward(x)

        filtered_boxes = []
        filtered_scores = []


        for i in range(self.grid_size):
            for j in range(self.grid_size):
                for b in range(self.num_bboxes):
                    offset = b * 5
                    confidence = bboxes[:, i, j, offset + 4]
                    if confidence > confidence_threshold:
                        box = bboxes[:, i, j, offset:offset + 4]
                        score = confidence
                        filtered_boxes.append(box)
                        filtered_scores.append(score)


        if len(filtered_boxes) == 0:
            return [], []

        filtered_boxes = torch.stack(filtered_boxes)
        filtered_scores = torch.stack(filtered_scores)


        keep_indices = nms(filtered_boxes, filtered_scores, iou_threshold)
        final_boxes = filtered_boxes[keep_indices]
        final_scores = filtered_scores[keep_indices]


        return final_boxes, final_scores


In [12]:
def xywh_to_xyxy(box):
    x_center, y_center, width, height = box[..., 0], box[..., 1], box[..., 2], box[..., 3]
    x_min = x_center - width / 2
    y_min = y_center - height / 2
    x_max = x_center + width / 2
    y_max = y_center + height / 2
    return torch.stack([x_min, y_min, x_max, y_max], dim=-1)

In [13]:
def iou(boxes1, boxes2):
    boxes1 = xywh_to_xyxy(boxes1)
    boxes2 = xywh_to_xyxy(boxes2)

    inter_x_min = torch.max(boxes1[..., 0], boxes2[..., 0])
    inter_y_min = torch.max(boxes1[..., 1], boxes2[..., 1])
    inter_x_max = torch.min(boxes1[..., 2], boxes2[..., 2])
    inter_y_max = torch.min(boxes1[..., 3], boxes2[..., 3])

    inter_area = torch.clamp(inter_x_max - inter_x_min, min=0) * torch.clamp(inter_y_max - inter_y_min, min=0)

    area1 = (boxes1[..., 2] - boxes1[..., 0]) * (boxes1[..., 3] - boxes1[..., 1])
    area2 = (boxes2[..., 2] - boxes2[..., 0]) * (boxes2[..., 3] - boxes2[..., 1])

    union_area = area1 + area2 - inter_area
    iou = inter_area / torch.clamp(union_area, min=1e-6)

    return iou

In [14]:
def nms(bboxes, scores, iou_threshold=0.5):
    keep = []
    indices = scores.sort(descending=True)[1]

    while indices.numel() > 0:
        current = indices[0]
        keep.append(current.item())
        if indices.numel() == 1:
            break

        ious = iou(bboxes[current], bboxes[indices[1:]])
        indices = indices[1:][ious < iou_threshold]

    return keep

In [15]:
import torch

def yolo_loss(pred_bboxes, target, lambda_coord=5,lambda_size=5,lambda_obj=1, lambda_noobj=0.5):
    obj_mask = target[..., 4] > 0
    noobj_mask = target[..., 4] == 0

    coord_center_loss = lambda_coord * torch.sum((pred_bboxes[obj_mask][..., :2] - target[obj_mask][..., :2]) ** 2)
    coord_size_loss = lambda_size * torch.sum(pred_bboxes[obj_mask][..., 2:4] - target[obj_mask][..., 2:4] ** 2)

    iou_scores = iou(pred_bboxes[obj_mask][..., :4], target[obj_mask][..., :4])

    obj_loss = lambda_obj*torch.sum((iou_scores-pred_bboxes[obj_mask][..., 4]) ** 2)
    noobj_loss = lambda_noobj * torch.sum(target[noobj_mask][..., 4]** 2)
    #noobj_loss_1 = lambda_noobj * torch.sum(torch.clamp(pred_bboxes[noobj_mask_1][..., 4], min=1e-6) ** 2)

    loss = coord_center_loss + coord_size_loss + obj_loss + noobj_loss
    return loss


In [16]:
model = YOLO()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

YOLO(
  (conv_layers): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): LeakyReLU(negative_slope=0.1)
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (4): Conv2d(64, 192, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (5): BatchNorm2d(192, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): LeakyReLU(negative_slope=0.1)
    (7): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (8): Conv2d(192, 128, kernel_size=(1, 1), stride=(1, 1))
    (9): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (10): LeakyReLU(negative_slope=0.1)
    (11): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (12): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (13): LeakyReLU(negative_s

In [20]:
from tqdm import tqdm
import torch.optim as optim

def train_model(model, train_loader, val_loader, num_epochs=10, learning_rate=0.0001,checkpoint_dir='/content/drive/My Drive/checkpoints',patience=5):
    training_losses = []
    validation_losses = []
    best_val_loss = np.inf
    epochs_no_improve = 0

    optimizer = optim.Adam(model.parameters(), lr=learning_rate,weight_decay=0.0005)

    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

    model.train()

    for epoch in range(num_epochs):
        running_loss = 0.0
        progress_bar = tqdm(train_loader, desc=f"Epoch [{epoch + 1}/{num_epochs}]")

        for images, targets in progress_bar:
            images = images.to(device)

            targets = [target.to(device) for target in targets]

            optimizer.zero_grad()

            bboxes = model(images) #(8,14,14,5)

            losses = [yolo_loss(bbox, target) for bbox, target in zip(bboxes, targets)]
            loss = torch.mean(torch.stack(losses))

            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            progress_bar.set_postfix({'Loss': loss.item()})

        avg_loss = running_loss / len(train_loader)
        training_losses.append(avg_loss)
        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_loss}")

        scheduler.step()

        model.eval()
        val_running_loss = 0.0
        with torch.no_grad():
            for val_images, val_targets in val_loader:
                val_images = val_images.to(device)
                val_targets = [val_target.to(device) for val_target in val_targets]

                val_bboxes = model(val_images)
                val_losses = [yolo_loss(val_bbox, val_target) for val_bbox, val_target in zip(val_bboxes, val_targets)]
                val_loss = torch.mean(torch.stack(val_losses))

                val_running_loss += val_loss.item()

        avg_val_loss = val_running_loss / len(val_loader)
        validation_losses.append(avg_val_loss)
        print(f"Epoch [{epoch + 1}/{num_epochs}], Validation Loss: {avg_val_loss}")
        if((epoch+1)%2==0):
          save_checkpoint(model, optimizer, epoch + 1, avg_loss, checkpoint_dir)
        model.train()

    print("Training complete!")
    return training_losses,validation_losses


In [21]:
def save_checkpoint(model, optimizer, epoch, loss, checkpoint_dir="checkpoints"):
    if not os.path.exists(checkpoint_dir):
        os.makedirs(checkpoint_dir)

    checkpoint_path = os.path.join(checkpoint_dir, f"V2_model_epoch_{epoch}_loss_{loss:.4f}.pth")
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': loss,
    }, checkpoint_path)
    print(f"Checkpoint saved at {checkpoint_path}")

In [22]:
def load_checkpoint(checkpoint_path, model, optimizer):
    checkpoint = torch.load(checkpoint_path)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    epoch = checkpoint['epoch']
    loss = checkpoint['loss']
    print(f"Loaded checkpoint '{checkpoint_path}' (epoch {epoch}, loss {loss})")
    return epoch, loss

In [23]:
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw

def predict_and_draw(model, test_loader, device):
    model.eval()

    data_iter = iter(test_loader)
    next(data_iter)
    next(data_iter)
    next(data_iter)
    next(data_iter)
    #next(data_iter)
    images, targets = next(data_iter)
    image = images[0].unsqueeze(0).to(device)
    print("Image size:", image.shape)

    with torch.no_grad():
        predictions = model(image)

    predictions = predictions[0].cpu().numpy()

    img = images[0].permute(1, 2, 0).cpu().numpy()
    plt.imshow(img)
    ax = plt.gca()

    grid_size = 14
    num_bboxes = 1

    for i in range(grid_size):
        for j in range(grid_size):
            confidence = predictions[i, j, 4]
            if confidence > 0:
                x_center, y_center, width, height = predictions[i, j, :4]
                print(f"Box: {i},{j}")
                print(x_center, y_center, width, height, confidence)


                x_center = (x_center + j) / grid_size
                y_center = (y_center + i) / grid_size


                width /= grid_size
                height /= grid_size

                xmin = x_center - width / 2
                ymin = y_center - height / 2
                xmax = x_center + width / 2
                ymax = y_center + height / 2


                xmin *= img.shape[1]
                xmax *= img.shape[1]
                ymin *= img.shape[0]
                ymax *= img.shape[0]

                rect = plt.Rectangle((xmin, ymin), xmax-xmin, ymax-ymin, fill=False, color='red')
                ax.add_patch(rect)

    plt.axis('off')
    plt.show()


In [None]:
training_losses,validation_losses = train_model(model,train_loader,val_loader,num_epochs=35)

Epoch [1/35]: 100%|██████████| 250/250 [33:50<00:00,  8.12s/it, Loss=7.13]

Epoch [1/35], Loss: 8.851869491577148





Epoch [1/35], Validation Loss: 9.170057803865463


Epoch [2/35]: 100%|██████████| 250/250 [02:24<00:00,  1.73it/s, Loss=4.4]

Epoch [2/35], Loss: 8.386895565032958





Epoch [2/35], Validation Loss: 9.192597192431252
Checkpoint saved at /content/drive/My Drive/checkpoints/V2_model_epoch_2_loss_8.3869.pth


Epoch [3/35]: 100%|██████████| 250/250 [02:43<00:00,  1.53it/s, Loss=9.07]

Epoch [3/35], Loss: 8.323365762710571





Epoch [3/35], Validation Loss: 9.24989900891743


Epoch [4/35]: 100%|██████████| 250/250 [02:21<00:00,  1.77it/s, Loss=7.64]

Epoch [4/35], Loss: 8.493031445503235





Epoch [4/35], Validation Loss: 9.53250972051469
Checkpoint saved at /content/drive/My Drive/checkpoints/V2_model_epoch_4_loss_8.4930.pth


Epoch [5/35]: 100%|██████████| 250/250 [02:41<00:00,  1.55it/s, Loss=6.37]

Epoch [5/35], Loss: 8.494858006477356





Epoch [5/35], Validation Loss: 9.578392452663845


Epoch [6/35]: 100%|██████████| 250/250 [02:22<00:00,  1.76it/s, Loss=12]

Epoch [6/35], Loss: 8.513794486045837





Epoch [6/35], Validation Loss: 9.525426183428083
Checkpoint saved at /content/drive/My Drive/checkpoints/V2_model_epoch_6_loss_8.5138.pth


Epoch [7/35]: 100%|██████████| 250/250 [02:47<00:00,  1.49it/s, Loss=6.64]

Epoch [7/35], Loss: 8.438693099975586





Epoch [7/35], Validation Loss: 9.323019958677746


Epoch [8/35]: 100%|██████████| 250/250 [02:20<00:00,  1.78it/s, Loss=8.8]

Epoch [8/35], Loss: 8.415054872512817





Epoch [8/35], Validation Loss: 9.418920214214022
Checkpoint saved at /content/drive/My Drive/checkpoints/V2_model_epoch_8_loss_8.4151.pth


Epoch [9/35]: 100%|██████████| 250/250 [02:48<00:00,  1.49it/s, Loss=12]

Epoch [9/35], Loss: 8.404593872070313





Epoch [9/35], Validation Loss: 9.49513268092322


Epoch [10/35]: 100%|██████████| 250/250 [02:21<00:00,  1.77it/s, Loss=6.23]

Epoch [10/35], Loss: 8.38321106338501





Epoch [10/35], Validation Loss: 9.466433464534699
