In [1]:
import os
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms as T
from PIL import Image
import torch.nn as nn
import numpy as np



# Check CUDA
print("Torch CUDA available:", torch.cuda.is_available())


Torch CUDA available: True


# ---------------------- Dataset ----------------------------------

In [2]:


class YoloDataset(Dataset):
    def __init__(self, data_dir, img_size=320, transform=None, mode='train'):
        self.data_dir = data_dir
        self.img_size = img_size
        self.transform = transform
        self.mode = mode
        self.images = []
        self.labels = []
        self._prepare_dataset()

    def _check_and_clean(self, img_dir, label_dir):
        for img_name in os.listdir(img_dir):
            if img_name.lower().endswith(('.jpg', '.jpeg', '.png')):
                label_name = os.path.splitext(img_name)[0] + '.txt'
                if not os.path.exists(os.path.join(label_dir, label_name)):
                    os.remove(os.path.join(img_dir, img_name))

    def _load_images_and_labels(self, img_dir, label_dir):
        for img_name in os.listdir(img_dir):
            if img_name.lower().endswith(('.jpg', '.jpeg', '.png')):
                img_path = os.path.join(img_dir, img_name)
                label_path = os.path.join(label_dir, os.path.splitext(img_name)[0] + '.txt')

                img = Image.open(img_path).convert('RGB')
                if self.transform:
                    img = self.transform(img)

                if os.path.exists(label_path):
                    with open(label_path, 'r') as f:
                        labels = [list(map(float, line.strip().split())) for line in f if line.strip()]
                else:
                    labels = []

                self.images.append(img)
                self.labels.append(torch.tensor(labels, dtype=torch.float32))

    def _prepare_dataset(self):
        image_dir = os.path.join(self.data_dir, 'images', self.mode)
        label_dir = os.path.join(self.data_dir, 'labels', self.mode)
        self._check_and_clean(image_dir, label_dir)
        self._load_images_and_labels(image_dir, label_dir)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        return self.images[idx], self.labels[idx]


In [37]:

# Transform and DataLoader setup
transform = T.Compose([
    T.Resize((320, 320)),
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

data_dir = "/mnt/d/object_detect_tracking/data/brain_tumor_copy/axial_t1wce_2_class"
train_dataset = YoloDataset(data_dir, img_size=320, transform=transform, mode='train')
test_dataset = YoloDataset(data_dir, img_size=320, transform=transform, mode='test')

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False, num_workers=4)

print("Number of training samples:", len(train_dataset))
print("Number of testing samples:", len(test_dataset))


Number of training samples: 296
Number of testing samples: 75


# ---------------------- SIoU Loss ----------------------------------

In [38]:


class SIoU(nn.Module):
    def __init__(self, x1y1x2y2=True, eps=1e-7):
        super(SIoU, self).__init__()
        self.x1y1x2y2 = x1y1x2y2
        self.eps = eps

    def forward(self, box1, box2):
        if self.x1y1x2y2:
            b1_x1, b1_y1, b1_x2, b1_y2 = box1
            b2_x1, b2_y1, b2_x2, b2_y2 = box2
        else:
            b1_x1, b1_x2 = box1[0] - box1[2] / 2, box1[0] + box1[2] / 2
            b1_y1, b1_y2 = box1[1] - box1[3] / 2, box1[1] + box1[3] / 2
            b2_x1, b2_x2 = box2[0] - box2[2] / 2, box2[0] + box2[2] / 2
            b2_y1, b2_y2 = box2[1] - box2[3] / 2, box2[1] + box2[3] / 2

        inter = (torch.min(b1_x2, b2_x2) - torch.max(b1_x1, b2_x1)).clamp(0) * \
                (torch.min(b1_y2, b2_y2) - torch.max(b1_y1, b2_y1)).clamp(0)

        w1, h1 = b1_x2 - b1_x1, b1_y2 - b1_y1 + self.eps
        w2, h2 = b2_x2 - b2_x1, b2_y2 - b2_y1 + self.eps
        union = w1 * h1 + w2 * h2 - inter + self.eps

        iou = inter / union
        cw = torch.max(b1_x2, b2_x2) - torch.min(b1_x1, b2_x1)
        ch = torch.max(b1_y2, b2_y2) - torch.min(b1_y1, b2_y1)
        s_cw = (b2_x1 + b2_x2 - b1_x1 - b1_x2) * 0.5
        s_ch = (b2_y1 + b2_y2 - b1_y1 - b1_y2) * 0.5
        sigma = torch.sqrt(s_cw ** 2 + s_ch ** 2) + self.eps

        sin_alpha_1 = torch.abs(s_cw) / sigma
        sin_alpha_2 = torch.abs(s_ch) / sigma
        threshold = np.sqrt(2) / 2
        sin_alpha = torch.where(sin_alpha_1 > threshold, sin_alpha_2, sin_alpha_1)

        angle_cost = 1 - 2 * torch.pow(torch.sin(torch.arcsin(sin_alpha) - np.pi / 4), 2)
        rho_x = (s_cw / (cw + self.eps)) ** 2
        rho_y = (s_ch / (ch + self.eps)) ** 2
        gamma = 2 - angle_cost
        distance_cost = 2 - torch.exp(gamma * rho_x) - torch.exp(gamma * rho_y)

        omiga_w = torch.abs(w1 - w2) / torch.max(w1, w2)
        omiga_h = torch.abs(h1 - h2) / torch.max(h1, h2)
        shape_cost = torch.pow(1 - torch.exp(-omiga_w), 4) + torch.pow(1 - torch.exp(-omiga_h), 4)

        return 1 - (iou + 0.5 * (distance_cost + shape_cost))





# Example SIoU loss usage

In [39]:

siou = SIoU(x1y1x2y2=True)
box1 = torch.tensor([50, 50, 150, 150], dtype=torch.float32)
box2 = torch.tensor([60, 60, 140, 140], dtype=torch.float32)
loss = siou(box1, box2)
print("SIoU Loss:", loss.item())

SIoU Loss: 0.3589203357696533


# ---------------------- Training model ----------------------------------

In [40]:
import torch
from torch import nn, optim
from tqdm import tqdm

# Giả sử bạn đã có:
# - model: YOLO model đã khởi tạo
# - train_loader, test_loader: DataLoader đã sẵn sàng
# - criterion: YOLO Loss hoặc BCE/CIoU loss phù hợp
# - optimizer: Adam hoặc SGD


class SIoU(nn.Module):
    def __init__(self, x1y1x2y2=True, eps=1e-7):
        super(SIoU, self).__init__()
        self.x1y1x2y2 = x1y1x2y2
        self.eps = eps

    def forward(self, box1, box2):
        if self.x1y1x2y2:
            b1_x1, b1_y1, b1_x2, b1_y2 = box1
            b2_x1, b2_y1, b2_x2, b2_y2 = box2
        else:
            b1_x1, b1_x2 = box1[0] - box1[2] / 2, box1[0] + box1[2] / 2
            b1_y1, b1_y2 = box1[1] - box1[3] / 2, box1[1] + box1[3] / 2
            b2_x1, b2_x2 = box2[0] - box2[2] / 2, box2[0] + box2[2] / 2
            b2_y1, b2_y2 = box2[1] - box2[3] / 2, box2[1] + box2[3] / 2

        inter = (torch.min(b1_x2, b2_x2) - torch.max(b1_x1, b2_x1)).clamp(0) * \
                (torch.min(b1_y2, b2_y2) - torch.max(b1_y1, b2_y1)).clamp(0)

        w1, h1 = b1_x2 - b1_x1, b1_y2 - b1_y1 + self.eps
        w2, h2 = b2_x2 - b2_x1, b2_y2 - b2_y1 + self.eps
        union = w1 * h1 + w2 * h2 - inter + self.eps

        iou = inter / union
        cw = torch.max(b1_x2, b2_x2) - torch.min(b1_x1, b2_x1)
        ch = torch.max(b1_y2, b2_y2) - torch.min(b1_y1, b2_y1)
        s_cw = (b2_x1 + b2_x2 - b1_x1 - b1_x2) * 0.5
        s_ch = (b2_y1 + b2_y2 - b1_y1 - b1_y2) * 0.5
        sigma = torch.sqrt(s_cw ** 2 + s_ch ** 2) + self.eps

        sin_alpha_1 = torch.abs(s_cw) / sigma
        sin_alpha_2 = torch.abs(s_ch) / sigma
        threshold = np.sqrt(2) / 2
        sin_alpha = torch.where(sin_alpha_1 > threshold, sin_alpha_2, sin_alpha_1)

        angle_cost = 1 - 2 * torch.pow(torch.sin(torch.arcsin(sin_alpha) - np.pi / 4), 2)
        rho_x = (s_cw / (cw + self.eps)) ** 2
        rho_y = (s_ch / (ch + self.eps)) ** 2
        gamma = 2 - angle_cost
        distance_cost = 2 - torch.exp(gamma * rho_x) - torch.exp(gamma * rho_y)

        omiga_w = torch.abs(w1 - w2) / torch.max(w1, w2)
        omiga_h = torch.abs(h1 - h2) / torch.max(h1, h2)
        shape_cost = torch.pow(1 - torch.exp(-omiga_w), 4) + torch.pow(1 - torch.exp(-omiga_h), 4)

        return 1 - (iou + 0.5 * (distance_cost + shape_cost))



# using if pred_boxes and true_boxes are many boxes in one image
def caculate_loss_many_boxes(pred_boxes, true_boxes):
    losses = []
    for i in range(len(pred_boxes)):
        pred_box = pred_boxes[i]
        true_box = true_boxes[i]
        loss = siou(pred_box, true_box)
        losses.append(loss)
    return losses

# using if pred_boxes and true_boxes are one box in one image
def yolo_decode_output(output, num_classes=2, anchors=None, stride=None):
    """
    Decode YOLO output thành list bbox dự đoán [B, N, 6] (cx, cy, w, h, obj_conf, class_pred).
    
    Args:
        output: Tensor đầu ra YOLO [B, A*(5+C), H, W]
        num_classes: số class
        anchors: tensor anchors shape [A, 2] (w, h), normalized hoặc theo stride
        stride: stride tại scale hiện tại, nếu cần scale anchors
        
    Returns:
        pred_bboxes: Tensor [B, N, 6] (cx, cy, w, h, obj_conf, class_id)
    """
    B, C, H, W = output.shape
    num_anchors = C // (5 + num_classes)

    output = output.view(B, num_anchors, 5 + num_classes, H, W)
    output = output.permute(0, 1, 3, 4, 2).contiguous()  # [B, A, H, W, 5+C]

    # Sigmoid cho tx, ty, obj_conf, class
    tx = torch.sigmoid(output[..., 0])
    ty = torch.sigmoid(output[..., 1])
    tw = output[..., 2]
    th = output[..., 3]
    obj_conf = torch.sigmoid(output[..., 4])
    class_pred = torch.softmax(output[..., 5:], dim=-1)

    # Tạo grid
    grid_y, grid_x = torch.meshgrid(torch.arange(H), torch.arange(W), indexing="ij")
    grid_x = grid_x.to(output.device).float()
    grid_y = grid_y.to(output.device).float()

    # Nếu cần anchors
    if anchors is not None:
        anchors = anchors.to(output.device)
        if stride is not None:
            anchors = anchors / stride
        anchors = anchors.view(1, num_anchors, 1, 1, 2)  # [1, A, 1, 1, 2]

        # scale bbox
        bx = (tx + grid_x.unsqueeze(0).unsqueeze(0)) / W
        by = (ty + grid_y.unsqueeze(0).unsqueeze(0)) / H
        bw = (torch.exp(tw) * anchors[..., 0]) / W
        bh = (torch.exp(th) * anchors[..., 1]) / H
    else:
        # nếu anchors không được cung cấp, trả bbox relative grid
        bx = (tx + grid_x.unsqueeze(0).unsqueeze(0)) / W
        by = (ty + grid_y.unsqueeze(0).unsqueeze(0)) / H
        bw = torch.exp(tw) / W
        bh = torch.exp(th) / H

    # Lấy class id
    class_id = torch.argmax(class_pred, dim=-1).float()  # [B, A, H, W]

    # Stack lại [B, A, H, W, 6]
    pred = torch.stack([bx, by, bw, bh, obj_conf, class_id], dim=-1)

    # reshape về [B, N, 6]
    pred = pred.view(B, -1, 6)

    return pred





In [None]:
def loss_calculation(pred_boxes, target_boxes):
    
    losses = []
    for bbox in pred_boxes:
        loss = siou(bbox, target_boxes)
        losses.append(loss)
    losses = losses.mean()
    return losses


In [45]:

from model.yolo.yolo_net import YOLONet
import torchvision.ops as ops

model = YOLONet(num_classes=2, num_anchors=9)
# optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# criterion = nn.CrossEntropyLoss()

for param in model.parameters():
    param.requires_grad = True

model_size = sum(p.numel() for p in model.parameters() if p.requires_grad)
model_mem_mb = model_size * 4 / (1024 * 1024)
print("Model size (parameters):", model_size)
print("Model memory size (MB):", model_mem_mb)
optimizer = optim.Adam(model.parameters(), lr=0.001)



device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

num_epochs = 30

for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0

    for imgs, targets in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
        imgs = imgs.to(device)
        targets = [t.to(device) for t in targets]

        optimizer.zero_grad()
        outputs = model(imgs)



        out_13, out_26, out_52 = model(imgs)

        pred13 = yolo_decode_output(out_13, num_classes=2)
        pred26 = yolo_decode_output(out_26, num_classes=2)
        pred52 = yolo_decode_output(out_52, num_classes=2)
        for bbox in targets:
            
            for i in range(pred13.shape[0]):
                p = pred13[i]  # [N, 6]

                if bbox.shape[0] == 0:
                    continue

                # convert cxcywh -> xyxy
                p_boxes = ops.box_convert(p[:, :4], in_fmt="cxcywh", out_fmt="xyxy")
                t_boxes = ops.box_convert(bbox[:, :4], in_fmt="cxcywh", out_fmt="xyxy")
                print("p_boxes:", p_boxes.shape)
                print("t_boxes:", t_boxes.shape)
                
                loss = siou(p_boxes, t_boxes)
                print("loss:", loss.item())
                break
            break
        print("Decoded output shapes:", pred13.shape)
        # print("Targets shapes:", targets.shape)
        # loss = caculate_loss_many_boxes(output_13,targets)
        break
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    break  # Uncomment this line to test only one epoch



    avg_train_loss = train_loss / len(train_loader)
    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {avg_train_loss:.4f}")
    # break  for test one epoch
    # Validation
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for imgs, targets in test_loader:
            imgs = imgs.to(device)
            targets = [t.to(device) for t in targets]

            outputs = model(imgs)
            loss = torch.rand(1).to(device)  # thay bằng loss YOLO thực tế
            val_loss += loss.item()

    avg_val_loss = val_loss / len(test_loader)
    print(f"Epoch {epoch+1}/{num_epochs} - Val Loss: {avg_val_loss:.4f}")

    # Lưu checkpoint
    torch.save(model.state_dict(), f"yolo_epoch_{epoch+1}.pt")


Model size (parameters): 25786365
Model memory size (MB): 98.3671760559082


Epoch 1/30:   0%|          | 0/37 [00:00<?, ?it/s]

p_boxes: torch.Size([3600, 4])
t_boxes: torch.Size([1, 4])


Epoch 1/30:   0%|          | 0/37 [00:05<?, ?it/s]


ValueError: too many values to unpack (expected 4)