# 构建数据集

In [1]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from yaml import safe_load
from os.path import join, exists, getsize
from os import listdir
from PIL import Image

def collate_fn(data):
    """Merge a list of ``(image, label)`` samples into a single batch.

    Column 0 of every label tensor is overwritten *in place* with the
    sample's position inside the batch, so that each row can still be traced
    back to its image after all rows have been concatenated.

    Args:
        data: Sequence of ``(image_tensor, label_tensor)`` pairs. Each label
            tensor must be 2-D; the image tensors must share one shape.

    Returns:
        tuple: ``(imgs, labels)`` where ``imgs`` is the images stacked along
        a new leading dimension and ``labels`` is every label row
        concatenated along dimension 0.
    """
    images, targets = zip(*data)
    assert len(images) == len(targets)

    # Tag each label row with its image index (consumed by build_targets()).
    for batch_idx, target in enumerate(targets):
        target[:, 0] = batch_idx

    merged_targets = torch.cat(targets, 0)
    stacked_images = torch.stack(images)
    return stacked_images, merged_targets


class URPC2020_DataSet(Dataset):
    """Detection dataset driven by a YOLO-style data YAML file.

    The YAML file must provide the keys ``nc``, ``path``, ``names`` and the
    split entries ``train`` / ``val`` / ``test``. Label files are read from
    ``<path>/<type>/labels``; each non-empty label file yields one sample and
    the matching image is looked up under the split's image directory with a
    ``.jpeg`` extension.

    Args:
        data_yaml_path: Path of the data YAML file.
        type: Split name. ``"train"`` and ``"val"`` select the matching YAML
            entry for the images; any other value selects the ``"test"``
            entry. The label directory always uses ``type`` verbatim.
        transforms: Optional callable applied to the RGB ``PIL.Image``. When
            ``None`` the untransformed image is returned.
    """

    def __init__(self, data_yaml_path="/home/szt/projects/ultralytics/hg.yaml", type="train", transforms=None):
        self.data_yaml_path = data_yaml_path
        self.transforms = transforms
        with open(self.data_yaml_path, 'r') as file:
            self.data_dict = safe_load(file)
        self.nc = self.data_dict['nc']
        self.root_path = self.data_dict["path"]
        self.classes = self.data_dict["names"]

        # Every split other than "train"/"val" falls back to the "test" entry.
        split_key = type if type in ("train", "val") else "test"
        self.images_path = join(self.root_path, self.data_dict[split_key])

        self.labels_path = f"{self.root_path}/{type}/labels"
        self.labels = self.filter_labels(listdir(self.labels_path))

    def filter_labels(self, labels):
        """Return the label file names whose file size is non-zero."""
        return [name for name in labels if getsize(join(self.labels_path, name)) != 0]

    def __len__(self):
        """Return the number of samples (non-empty label files)."""
        return len(self.labels)

    @staticmethod
    def _parse_label_file(label_path):
        """Parse one label file into ``[0, cls, v1, v2, v3, v4]`` rows.

        Fields may be separated by any run of whitespace and blank lines are
        skipped. The leading ``0`` is a placeholder for the index in the
        batch, which ``collate_fn`` fills in later.
        """
        rows = []
        with open(label_path, "r") as file:
            for line in file:
                fields = line.split()
                if not fields:
                    continue  # blank line, e.g. a trailing newline
                rows.append([
                    0,
                    int(fields[0]),
                    float(fields[1]),
                    float(fields[2]),
                    float(fields[3]),
                    float(fields[4]),
                ])
        return rows

    def __getitem__(self, index):
        """Load sample ``index``.

        Returns:
            tuple: ``(img, label)`` where ``img`` is the (optionally
            transformed) RGB image and ``label`` is a float32 tensor of shape
            ``(n, 6)`` laid out as ``[index in batch, cls, bbox...]``.
        """
        label_name = self.labels[index]
        # Swap the label extension for ".jpeg" without assuming its length.
        image_name = label_name.rsplit(".", 1)[0] + ".jpeg"
        image_path = f"{self.images_path}/{image_name}"
        label_path = f"{self.labels_path}/{label_name}"

        img = Image.open(image_path).convert("RGB")
        rows = self._parse_label_file(label_path)
        if self.transforms is not None:
            img = self.transforms(img)
        # reshape keeps the tensor 2-D even when no row was parsed, so that
        # column indexing in collate_fn still works.
        label = torch.tensor(rows, dtype=torch.float32).reshape(-1, 6)
        return img, label


def group_adjust(labels, batch_size):
    """Split a concatenated label tensor back into one tensor per image.

    Args:
        labels: 2-D tensor whose first column is the index of the image in
            the batch (as written by ``collate_fn``).
        batch_size: Number of images in the batch, i.e. the number of groups
            to return.

    Returns:
        list[torch.Tensor]: ``batch_size`` tensors; entry ``i`` holds the rows
        that belong to image ``i`` with the index column removed. An image
        without any row gets an empty ``(0, C - 1)`` tensor, so every entry
        supports 2-D indexing.

    Raises:
        IndexError: If a row references an image index ``>= batch_size``.
    """
    # Bucket the rows by the value stored in their first column.
    grouped = [[] for _ in range(batch_size)]
    for row in labels:
        grouped[int(row[0].item())].append(row[1:])

    n_cols = max(labels.shape[-1] - 1, 0)
    # torch.stack() rejects an empty list; build the empty tensor explicitly
    # instead of swallowing the error and returning un-stacked lists.
    return [
        torch.stack(rows) if rows else labels.new_zeros((0, n_cols))
        for rows in grouped
    ]


def build_dataset(yaml_path="/home/szt/projects/ultralytics/hg.yaml", batch_size=16, data_transform=None):
    """Create the shuffled ``DataLoader`` over ``URPC2020_DataSet``.

    Args:
        yaml_path: Data YAML file handed to ``URPC2020_DataSet``.
        batch_size: Number of samples per batch.
        data_transform: Image transform. When ``None`` a default pipeline of
            ``ToTensor`` -> ``Resize((640, 640))`` -> ``Normalize`` is used.

    Returns:
        DataLoader: Loader using ``collate_fn``, 4 workers and pinned memory.
    """
    if data_transform is None:
        default_steps = [
            transforms.ToTensor(),
            # Only the image is resized; the labels are ratios and therefore
            # need no adjustment.
            transforms.Resize((640, 640), antialias=True),
            transforms.Normalize((0.1307,), (0.3081,)),
        ]
        data_transform = transforms.Compose(default_steps)

    # Custom dataset wrapped by the loader below.
    dataset = URPC2020_DataSet(yaml_path, transforms=data_transform)

    loader_options = {
        "batch_size": batch_size,
        "num_workers": 4,
        "shuffle": True,
        "pin_memory": True,
        "collate_fn": collate_fn,
    }
    return DataLoader(dataset, **loader_options)

In [32]:
# Smoke test: build the loader with its defaults and inspect the first batch.
data_loader = build_dataset()
for i, (imgs, labels) in enumerate(data_loader):
    print(imgs.size())    # shape of the stacked image batch
    print(labels.size())  # shape of the concatenated label rows
    print(labels[0])      # first label row
    break  # a single batch is enough

torch.Size([16, 3, 640, 640])
torch.Size([16, 6])
tensor([0.0000, 2.0000, 0.5480, 0.4997, 0.0363, 0.0460])


# build a network

In [2]:
import torch
from torch import nn

class Conv(nn.Module):
    """``Conv2d`` -> ``BatchNorm2d`` -> ``LeakyReLU`` building block.

    The output size per spatial dimension follows ``(w - k + 2p) / s + 1``;
    with the defaults (3x3 kernel, stride 2, padding 1) the spatial size is
    halved.

    Args:
        in_channels: Number of input channels.
        out_channels: Number of output channels.
        kernel_size: Convolution kernel size.
        stride: Convolution stride.
        padding: Zero padding added to both sides.
        bias: Whether the convolution has a bias term.
        groups: Number of convolution groups.
    """

    def __init__(self, in_channels, out_channels, kernel_size=3, stride=2, padding=1, bias=False, groups=1):
        super().__init__()
        self.conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            groups=groups,
            bias=bias,
        )
        self.bn = nn.BatchNorm2d(out_channels)
        self.act = nn.LeakyReLU()

    def forward(self, x):
        """Apply convolution, batch normalisation and activation in turn."""
        convolved = self.conv(x)
        normalised = self.bn(convolved)
        return self.act(normalised)

class ResidualBlock(nn.Module):
    """Stack of bottleneck stages with optional skip connections.

    Each stage is a 1x1 ``Conv`` that halves the channel count followed by a
    3x3 ``Conv`` that restores it; both use stride 1, so the tensor shape is
    preserved.

    Args:
        channels: Number of input and output channels.
        use_residual: When true, each stage's output is added to its input.
        num_repeats: Number of stages.
    """

    def __init__(self, channels, use_residual=True, num_repeats=3):
        super().__init__()
        self.use_residual = use_residual
        self.num_repeats = num_repeats

        stages = []
        for _ in range(num_repeats):
            squeeze = Conv(channels, channels // 2, kernel_size=1, stride=1, padding=0)
            expand = Conv(channels // 2, channels, kernel_size=3, stride=1, padding=1)
            stages.append(nn.Sequential(squeeze, expand))
        self.layers = nn.ModuleList(stages)

    def forward(self, x):
        """Run every stage in order, adding the skip connection if enabled."""
        for stage in self.layers:
            transformed = stage(x)
            x = x + transformed if self.use_residual else transformed
        return x

class Head(nn.Module):
    """Detection head with separate box and class branches.

    Both branches are a stride-1 ``Conv`` that halves the channel count
    followed by a 1x1 ``nn.Conv2d`` projection.

    Args:
        in_channels: Number of channels of the incoming feature map.
        num_classes: Output channels of the class branch.
        num_anchors: The box branch emits ``num_anchors * 4`` channels.
    """

    def __init__(self, in_channels, num_classes=4, num_anchors=3):
        super().__init__()
        hidden = in_channels // 2
        # Box-regression branch.
        self.cv1 = nn.Sequential(
            Conv(in_channels, hidden, stride=1),
            nn.Conv2d(hidden, num_anchors * 4, kernel_size=1),
        )
        # Classification branch.
        self.cv2 = nn.Sequential(
            Conv(in_channels, hidden, stride=1),
            nn.Conv2d(hidden, num_classes, kernel_size=1),
        )

    def forward(self, x):
        """Return ``(pred_bbox, pred_cls)`` computed from the same input."""
        box_map = self.cv1(x)
        cls_map = self.cv2(x)
        return box_map, cls_map

class MyYolo(nn.Module):
    """Small single-scale detector: strided ``Conv`` backbone plus one ``Head``.

    The network alternates down-sampling ``Conv`` layers with
    ``ResidualBlock`` layers and ends in a ``Head``, so ``forward`` returns
    the head's ``(pred_bbox, pred_cls)`` tuple.

    Args:
        in_channels: Number of channels of the input image.
        num_classes: Forwarded to ``Head``.
        num_anchors: Forwarded to ``Head``.
    """

    def __init__(self, in_channels=3, num_classes=4, num_anchors=3):
        super().__init__()

        # Layer specs: (in, out) -> Conv, ("R", ch) -> ResidualBlock,
        # ("H", ch, classes, anchors) -> Head. The sizes in the comments are
        # the spatial resolution for a 640x640 input.
        self.architecture = [
            (in_channels, 32),  # 0: 320
            (32, 64),  # 1: 160
            ("R", 64),
            (64, 128),  # 3: 80
            ("R", 128),
            (128, 256),  # 5: 40
            ("R", 256),
            (256, 512),  # 7: 20
            ("R", 512),
            ("H", 512, num_classes, num_anchors),  # 9: bbox, cls
        ]

        built = []
        for spec in self.architecture:
            built.append(self._make_layers(spec))
        self.layers = nn.ModuleList(built)

    def _make_layers(self, param):
        """Instantiate the module described by one architecture entry."""
        tag = param[0]
        if tag == "R":
            return ResidualBlock(param[1])
        if tag == "H":
            return Head(param[1], param[2], param[3])
        return Conv(param[0], param[1])

    def forward(self, x):
        """Feed ``x`` through every layer in order and return the result."""
        out = x
        for module in self.layers:
            out = module(out)
        return out

# Sanity check: instantiate the single-anchor model and push a random batch
# through it to inspect the shapes of the two head outputs.
model = MyYolo(num_anchors=1)
x = torch.randn(16, 3, 640, 640)
out = model(x)
print(out[0].shape, out[1].shape)  # (bbox map, cls map)

torch.Size([16, 4, 20, 20]) torch.Size([16, 4, 20, 20])


In [3]:
# Export the model to "model.onnx", using `x` as the example input.
torch.onnx.export(model, x, "model.onnx")

In [4]:
# Build the data loader that is iterated over during training.
batch_size = 16
# data_loader = build_dataset(yaml_path="/home/szt/projects/ultralytics/urpc2020.yaml", batch_size=batch_size, data_transform=None)
data_loader = build_dataset(batch_size=batch_size, data_transform=None)

# utils functions

In [5]:
# copy from yolov8

import numpy as np
def clip_boxes(boxes, shape):
    """
    Clamp (x1, y1, x2, y2) boxes to an image of the given shape, in place.

    Args:
        boxes (torch.Tensor | numpy.ndarray): boxes whose last dimension holds x1, y1, x2, y2
        shape (tuple): image shape as (height, width)

    Returns:
        (torch.Tensor | numpy.ndarray): the same ``boxes`` object, clipped
    """
    height, width = shape[0], shape[1]
    if isinstance(boxes, torch.Tensor):
        # One column at a time is faster for tensors, and it avoids the
        # in-place .clamp_() bug on Apple MPS.
        for column, upper in enumerate((width, height, width, height)):  # x1, y1, x2, y2
            boxes[..., column] = boxes[..., column].clamp(0, upper)
        return boxes

    # numpy arrays: grouped columns are faster
    x_columns, y_columns = [0, 2], [1, 3]
    boxes[..., x_columns] = boxes[..., x_columns].clip(0, width)
    boxes[..., y_columns] = boxes[..., y_columns].clip(0, height)
    return boxes

def xyxy2xywh(x):
    """
    Convert boxes from corner format (x1, y1, x2, y2) to centre format (x, y, width, height), where (x1, y1) is the
    top-left corner and (x2, y2) the bottom-right corner.

    Args:
        x (np.ndarray | torch.Tensor): boxes in (x1, y1, x2, y2) format; the last dimension must have size 4.

    Returns:
        y (np.ndarray | torch.Tensor): boxes in (x, y, width, height) format, same type, shape and dtype as ``x``.
    """
    assert x.shape[-1] == 4, f'input shape last dimension expected 4 but input shape is {x.shape}'
    # An uninitialised buffer is cheaper than clone/copy; every column is overwritten below.
    if isinstance(x, torch.Tensor):
        y = torch.empty_like(x)
    else:
        y = np.empty_like(x)
    left, top, right, bottom = x[..., 0], x[..., 1], x[..., 2], x[..., 3]
    y[..., 0] = (left + right) / 2  # x centre
    y[..., 1] = (top + bottom) / 2  # y centre
    y[..., 2] = right - left  # width
    y[..., 3] = bottom - top  # height
    return y


def xywh2xyxy(x):
    """
    Convert boxes from centre format (x, y, width, height) to corner format (x1, y1, x2, y2), where (x1, y1) is the
    top-left corner and (x2, y2) the bottom-right corner.

    Args:
        x (np.ndarray | torch.Tensor): boxes in (x, y, width, height) format; the last dimension must have size 4.

    Returns:
        y (np.ndarray | torch.Tensor): boxes in (x1, y1, x2, y2) format, same type, shape and dtype as ``x``.
    """
    assert x.shape[-1] == 4, f'input shape last dimension expected 4 but input shape is {x.shape}'
    # An uninitialised buffer is cheaper than clone/copy; every column is overwritten below.
    if isinstance(x, torch.Tensor):
        y = torch.empty_like(x)
    else:
        y = np.empty_like(x)
    center_x, center_y = x[..., 0], x[..., 1]
    half_w = x[..., 2] / 2
    half_h = x[..., 3] / 2
    y[..., 0] = center_x - half_w  # top-left x
    y[..., 1] = center_y - half_h  # top-left y
    y[..., 2] = center_x + half_w  # bottom-right x
    y[..., 3] = center_y + half_h  # bottom-right y
    return y

def xyxy2xywhn(x, w=640, h=640, clip=False, eps=0.0):
    """
    Convert boxes from corner format (x1, y1, x2, y2) to centre format (x, y, width, height) normalised by the image
    width and height.

    Args:
        x (np.ndarray | torch.Tensor): boxes in (x1, y1, x2, y2) format; the last dimension must have size 4.
        w (int): image width. Defaults to 640
        h (int): image height. Defaults to 640
        clip (bool): when True the boxes are first clipped in place to (h - eps, w - eps). Defaults to False
        eps (float): margin subtracted from the image size when clipping. Defaults to 0.0

    Returns:
        y (np.ndarray | torch.Tensor): boxes in normalised (x, y, width, height) format
    """
    if clip:
        x = clip_boxes(x, (h - eps, w - eps))
    assert x.shape[-1] == 4, f'input shape last dimension expected 4 but input shape is {x.shape}'
    # An uninitialised buffer is cheaper than clone/copy; every column is overwritten below.
    if isinstance(x, torch.Tensor):
        y = torch.empty_like(x)
    else:
        y = np.empty_like(x)
    left, top, right, bottom = x[..., 0], x[..., 1], x[..., 2], x[..., 3]
    y[..., 0] = ((left + right) / 2) / w  # x centre
    y[..., 1] = ((top + bottom) / 2) / h  # y centre
    y[..., 2] = (right - left) / w  # width
    y[..., 3] = (bottom - top) / h  # height
    return y


# post process

In [18]:
# pred label transform
def get_pred_label(x, num_classes=4, anchors=torch.tensor([[10, 10]]), img_size=640):
    """Decode the raw head outputs into pixel-space boxes and class scores.

    Box channels 0/1 are squashed with a sigmoid and added to the column/row
    index of their grid cell to give the box centre; channels 2/3 are squashed
    and scaled by the anchor width/height. Only a single anchor is supported:
    the anchor reshape below fails for more than one.

    Args:
        x: ``(pred_bbox, pred_cls)`` as returned by the model, shaped
            ``(b, num_anchors * 4, h, w)`` and ``(b, num_classes, h, w)``.
        num_classes: Number of class channels in ``pred_cls``.
        anchors: ``(num_anchors, 2)`` tensor of anchor ``(width, height)``.
        img_size: Input image size in pixels; the stride is
            ``img_size // h``. Defaults to 640.

    Returns:
        tuple: ``(decoded_boxes, pred_cls)`` -- boxes of shape
        ``(b, h * w, 4)`` as ``(x_min, y_min, x_max, y_max)`` and
        sigmoid class scores of shape ``(b, h * w, num_classes)``.
    """
    pred_bbox, pred_cls = x

    num_anchors = len(anchors)
    batch_size, _, grid_h, grid_w = pred_bbox.shape
    stride = img_size // grid_h

    pred_bbox = pred_bbox.view(batch_size, num_anchors * 4, grid_h, grid_w).permute(0, 2, 3, 1).contiguous()  # b, h, w, 4*a
    device = pred_bbox.device
    anchors = anchors.to(device)

    # After the permute, dim 1 is the row (y) and dim 2 the column (x), so
    # the x offset has to vary along the last grid dimension and the y offset
    # along the first one. Broadcasting makes that explicit without relying
    # on torch.meshgrid's indexing convention.
    x_offset = torch.arange(grid_w, device=device).float().view(1, grid_w)
    y_offset = torch.arange(grid_h, device=device).float().view(grid_h, 1)

    x_center = (torch.sigmoid(pred_bbox[..., 0]) + x_offset) * stride
    y_center = (torch.sigmoid(pred_bbox[..., 1]) + y_offset) * stride
    width = torch.sigmoid(pred_bbox[..., 2]) * anchors[:, 0].view(1, 1, 1)
    height = torch.sigmoid(pred_bbox[..., 3]) * anchors[:, 1].view(1, 1, 1)

    half_w = width / 2.0
    half_h = height / 2.0
    decoded_boxes = torch.stack(
        [x_center - half_w, y_center - half_h, x_center + half_w, y_center + half_h],
        dim=-1,
    )
    decoded_boxes = decoded_boxes.view(batch_size, -1, num_anchors * 4)  # b, h*w, num_anchors*4

    pred_cls = pred_cls.view(batch_size, num_classes, pred_cls.size(2), pred_cls.size(3)).permute(0, 2, 3, 1).contiguous()  # b, h, w, c
    pred_cls = pred_cls.view(batch_size, -1, num_classes)  # b, h*w, num_class
    pred_cls = torch.sigmoid(pred_cls)
    return decoded_boxes, pred_cls

import numpy as np
def bbox_iou(bbox1, bbox2, eps=1e-8):
    """
    Compute the Intersection over Union (IoU) of two bounding boxes.

    Each box is unpacked as (x_center, y_center, width, height). The corners
    are truncated to integers and extents are counted inclusively (``+ 1``).

    NOTE(review): other code in this file passes corner-format boxes to this
    function -- confirm which format callers are meant to supply.

    :param bbox1: bounding box No.1 as (x_center, y_center, width, height).
    :param bbox2: bounding box No.2 as (x_center, y_center, width, height).
    :param eps: small constant added to the union to avoid division by zero.
    :return: IoU of bbox1 and bbox2.
    """
    def _int_corners(box):
        # Centre format -> truncated integer corners.
        cx, cy, bw, bh = box
        x_lo, y_lo = int(cx - bw / 2.0), int(cy - bh / 2.0)
        x_hi, y_hi = int(cx + bw / 2.0), int(cy + bh / 2.0)
        return x_lo, y_lo, x_hi, y_hi

    left1, top1, right1, bottom1 = _int_corners(bbox1)
    left2, top2, right2, bottom2 = _int_corners(bbox2)

    # Overlap rectangle; a negative extent means the boxes do not overlap.
    overlap_w = np.maximum(0.0, np.minimum(right1, right2) - np.maximum(left1, left2) + 1)
    overlap_h = np.maximum(0.0, np.minimum(bottom1, bottom2) - np.maximum(top1, top2) + 1)
    area_intersection = overlap_w * overlap_h

    # Union = sum of both areas minus the overlap that was counted twice.
    area_first = (right1 - left1 + 1) * (bottom1 - top1 + 1)
    area_second = (right2 - left2 + 1) * (bottom2 - top2 + 1)
    area_union = area_first + area_second - area_intersection

    return area_intersection / (area_union + eps)

def bboxes_iou(bbox1, bboxes):
    """Return a tensor with the IoU of ``bbox1`` against each box in ``bboxes``.

    The boxes are compared one pair at a time with ``bbox_iou``; the result
    has one entry per element of ``bboxes``, in the same order.
    """
    return torch.tensor([bbox_iou(bbox1, candidate) for candidate in bboxes])


# nms, non-maximum suppression
def nms_(pred_bbox, pred_cls, iou_threshold=0.5, cls_threshold=0.3):
    """Greedy non-maximum suppression.

    Boxes are visited in order of decreasing score. A box is kept, and every
    remaining box whose IoU with it is ``>= iou_threshold`` is discarded.
    The search stops at the first box scoring below ``cls_threshold``.

    NOTE(review): the IoU is computed by ``bboxes_iou`` / ``bbox_iou``, which
    decode boxes as (x_center, y_center, width, height) -- confirm that the
    boxes passed here use that format.

    Args:
        pred_bbox: ``(N, 4)`` tensor of boxes.
        pred_cls: ``(N,)`` tensor with one score per box.
        iou_threshold: Overlap at or above which a box is suppressed.
        cls_threshold: Minimum score for a box to be kept.

    Returns:
        torch.Tensor: 1-D tensor with the indices of the kept boxes, best
        score first. It is empty (dtype ``torch.long``) when no box qualifies.
    """
    keep = []

    # Candidate indices, best score first.
    order = torch.argsort(pred_cls, descending=True)
    while order.size(0) > 0:
        best = order[0]
        if pred_cls[best] < cls_threshold:
            break
        keep.append(best)
        if order.size(0) == 1:
            break
        rest = order[1:]
        iou = bboxes_iou(pred_bbox[best], pred_bbox[rest])
        order = rest[iou < iou_threshold]

    if not keep:
        # torch.stack() raises on an empty list; return an empty index tensor.
        return torch.empty(0, dtype=torch.long, device=pred_cls.device)
    return torch.stack(keep)


# loss
box_loss = nn.MSELoss()  # box-regression criterion
cls_loss = nn.CrossEntropyLoss()  # classification criterion


def calculate_loss(pred_bbox, pred_cls, gt_bboxes, gt_clss):
    """Return the sum of the box MSE loss and the class cross-entropy loss.

    NOTE(review): ``nn.CrossEntropyLoss`` expects unnormalised scores, while
    ``get_pred_label`` in this file returns sigmoid outputs -- confirm that
    feeding those here is intended.

    Args:
        pred_bbox: Predicted box values.
        pred_cls: Per-class prediction scores.
        gt_bboxes: Target box values, same shape as ``pred_bbox``.
        gt_clss: Target class index (or indices).

    Returns:
        torch.Tensor: Scalar ``box_loss + cls_loss``.
    """
    regression_term = box_loss(pred_bbox, gt_bboxes)
    classification_term = cls_loss(pred_cls, gt_clss)
    return regression_term + classification_term


# assigner
def pred_gt_assigner(pred_boxes, gt_boxes, iou_threshold=0.5, max_num_negatives=3):
    """Assign predicted boxes to ground-truth boxes by IoU.

    Every ground-truth box is compared with every predicted box via
    ``bbox_iou``; a prediction is assigned to a ground truth when their IoU
    is strictly greater than ``iou_threshold``. A prediction may be assigned
    to several ground-truth boxes.

    NOTE(review): ``bbox_iou`` decodes boxes as (x_center, y_center, width,
    height) -- confirm that both inputs use that format.

    Args:
        pred_boxes: Iterable of predicted boxes.
        gt_boxes: Sequence of ground-truth boxes.
        iou_threshold: Minimum IoU (exclusive) for an assignment.
        max_num_negatives: Currently unused; kept for interface
            compatibility.

    Returns:
        dict[int, list[int]]: Maps each ground-truth index to the list of
        assigned prediction indices (possibly empty).
    """
    # For each ground-truth box, collect the predictions that overlap enough.
    return {
        gt_idx: [
            pred_idx
            for pred_idx, pred_box in enumerate(pred_boxes)
            if bbox_iou(gt_box, pred_box) > iou_threshold
        ]
        for gt_idx, gt_box in enumerate(gt_boxes)
    }

# train

In [30]:

from torchvision.ops import nms
from torch.optim import SGD
from tqdm import tqdm
import time

optimizer = SGD(model.parameters(), lr=0.01, weight_decay=0.005)  # stochastic gradient descent
epochs = 50  # number of training epochs

for epoch in tqdm(range(epochs)):
    start_time = time.time()

    loss_t = []  # per-batch losses of this epoch
    for batch in data_loader:
        features, labels = batch
        b_t = features.shape[0]
        labels = group_adjust(labels, b_t)  # list of b_t tensors, one per image
        if len(labels) < b_t:
            continue

        preds = model(features)  # raw head outputs (bbox map, cls map)
        pred_bboxes_xyxy, pred_clss = get_pred_label(preds)

        # NMS is deliberately not run during training; it would only be
        # needed to select the final boxes at inference time.

        gt_clss = [labels[i][:, 0].type(torch.long) for i in range(b_t)]  # n
        gt_bboxes = [labels[i][:, 1:] for i in range(b_t)]  # n, 4
        # Scale the converted boxes by 640.
        gt_bboxes_xyxy = [xywh2xyxy(gt_bboxes[i]) * 640 for i in range(b_t)]

        # Task assigner: match predictions to ground-truth boxes, then sum
        # the loss over every matched (prediction, ground truth) pair.
        loss = torch.tensor(0.0)
        n = 0
        assigned_dicts = [pred_gt_assigner(gt_boxes=gt_bboxes_xyxy[i], pred_boxes=pred_bboxes_xyxy[i], iou_threshold=0.5, max_num_negatives=1) for i in range(b_t)]
        for i in range(b_t):  # for each image
            gt_bboxes_xyxy_i = gt_bboxes_xyxy[i]
            gt_clss_i = gt_clss[i]
            assigned_dict = assigned_dicts[i]

            for b_i in range(len(gt_bboxes_xyxy_i)):  # for each target box in the image
                if b_i in assigned_dict:
                    pred_bbox = pred_bboxes_xyxy[i][assigned_dict[b_i]]
                    pred_cls = pred_clss[i][assigned_dict[b_i]]
                    gt_bbox = gt_bboxes_xyxy_i[b_i]
                    gt_cls = gt_clss_i[b_i]
                    for k in range(len(pred_bbox)):
                        loss = loss + calculate_loss(pred_bbox[k], pred_cls[k], gt_bbox, gt_cls)
                        n = n + 1

        if n == 0:
            # No prediction was matched: the loss would be 0 / 0 and is not
            # attached to the graph, so backward() would fail. Skip the batch.
            continue

        loss = loss / n
        loss_t.append(loss.item())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    end_time = time.time()
    # Avoid np.mean([]) (NaN plus a RuntimeWarning) when every batch was skipped.
    epoch_loss = np.mean(loss_t) if loss_t else float("nan")
    print("epoch: {epoch:4}, time: {time:^10.4f}, loss: {loss:^10.4f}".format(epoch=epoch, time=end_time-start_time, loss=epoch_loss))

# bbox1 = torch.tensor([100, 100, 50, 50])
# bbox2 = torch.tensor([75, 75, 50, 50])
# iou = bbox_iou(bbox1, bbox2)
# print(iou)

  2%|▏         | 1/50 [01:03<51:44, 63.36s/it]

epoch:    0, time:  63.3628  , loss: 2954.2907 


  4%|▍         | 2/50 [02:06<50:41, 63.36s/it]

epoch:    1, time:  63.3602  , loss: 2974.3583 


  6%|▌         | 3/50 [03:10<49:40, 63.41s/it]

epoch:    2, time:  63.4604  , loss: 2957.4438 


  8%|▊         | 4/50 [04:13<48:36, 63.41s/it]

epoch:    3, time:  63.4069  , loss: 2950.3857 


 10%|█         | 5/50 [05:17<47:34, 63.44s/it]

epoch:    4, time:  63.4838  , loss: 2983.6090 


 12%|█▏        | 6/50 [06:20<46:30, 63.43s/it]

epoch:    5, time:  63.4039  , loss: 2978.7689 


 14%|█▍        | 7/50 [07:23<45:26, 63.40s/it]

epoch:    6, time:  63.3402  , loss: 2966.8249 


 16%|█▌        | 8/50 [08:27<44:23, 63.41s/it]

epoch:    7, time:  63.4326  , loss: 2954.3005 


 18%|█▊        | 9/50 [09:30<43:17, 63.35s/it]

epoch:    8, time:  63.2317  , loss: 2970.1861 


 20%|██        | 10/50 [10:33<42:15, 63.39s/it]

epoch:    9, time:  63.4564  , loss: 2973.4811 


 20%|██        | 10/50 [10:53<43:33, 65.33s/it]


KeyboardInterrupt: 

In [None]:
# Serialises the whole module object (not only its state_dict) to disk.
torch.save(model, "hg_model.pth") # save model

# predict and visualize

In [None]:
import cv2

image_path = "/home/szt/datasets/hg_underwater_target/hard_samples_test/1702521478.434.3239.jpeg"

img = Image.open(image_path).convert("RGB")
# cv2.imread's second argument is an IMREAD_* flag, not a colour-conversion
# code. The default flag loads a 3-channel BGR image, which is also what
# cv2.rectangle / cv2.imwrite below expect.
original_img = cv2.imread(image_path)
original_img = cv2.resize(original_img, (640, 640))
data_transform = transforms.Compose([transforms.ToTensor(),
                                     transforms.Resize((640, 640), antialias=True),  # only the image is resized; labels are ratios
                                     transforms.Normalize((0.1307,), (0.3081,))])
img = data_transform(img)
img = torch.unsqueeze(img, 0)  # add the batch dimension

# Inference: put BatchNorm into evaluation mode and skip autograd bookkeeping.
model.eval()
with torch.no_grad():
    out = model(img)
print(len(out))
print(out[0].shape, out[1].shape)  # bbox, cls

bboxes, clss = get_pred_label(out)
bt = bboxes.shape[0]
print(bboxes[0].shape, clss[0].shape)

# Run NMS per image on the best class score of every box
# (iou_threshold=0.3, cls_threshold=0.7).
keep_indices = [nms_(bboxes[i], clss[i].max(dim=1)[0], 0.3, 0.7) for i in range(bt)]
print(keep_indices)
print(bboxes[0].shape, clss[0].shape)
# keep_indices holds the indices of the retained predictions; use them to
# gather the final boxes and their scores.
bbox = [bboxes[i][keep_indices[i]] for i in range(0, bt)]
score = [clss[i][keep_indices[i]].max(dim=1)[0] for i in range(0, bt)]
print(bbox[0].shape, score[0].shape)

# Draw every retained box of the first image and save the visualisation.
for box, s in zip(bbox[0], score[0]):
    cv2.rectangle(original_img, (int(box[0]), int(box[1])), (int(box[2]), int(box[3])), (0, 255, 0), 1)
cv2.imwrite("test.jpg", original_img)