# Library imports



In [1]:
import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.optim as optim
from torchinfo import summary
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
from torchvision import transforms
from torch.cuda.amp import GradScaler, autocast

import cv2
from PIL import Image
import os
from tqdm import tqdm

# Config

In [None]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
CHECKPOINT = "checkpoint.pth.tar"

# Three (width, height) anchor pairs per detection scale; the rows are paired
# with the entries of STRIDE below (stride 32 first, stride 8 last).
# NOTE(review): values look like fractions of the image size - confirm.
ANCHORS = [
    [(0.28, 0.22), (0.38, 0.48), (0.9, 0.78)],
    [(0.07, 0.15), (0.15, 0.11), (0.14, 0.29)],
    [(0.02, 0.03), (0.04, 0.07), (0.08, 0.06)],
]

# Training hyper-parameters.
NUM_CLASSES = 3
BATCH_SIZE = 32
LEARNING_RATE = 1e-5
EPOCHS = 20
WEIGHT_DECAY = 0

# Input resolution and the down-sampling factor of each detection scale.
IMAGE_SIZE = 640
STRIDE = [32, 16, 8]
# Number of grid cells per side at each scale (IMAGE_SIZE divided by stride).
GRID_SIZE = [IMAGE_SIZE // stride for stride in STRIDE]

# Human-readable class names.
CLASS_LABEL = ["normal", "mite", "virus"]

# Dataset locations (Kaggle input paths).
IMG_FOLDER_DIR = "/kaggle/input/dr-bee/images/train"
CSV_DIR = "/kaggle/input/dr-bee/train.csv"

# Model Architecture

## Blocks

In [2]:
# Basic convolution block
class CNNBlock(nn.Module):
    """Convolution followed by batch normalisation and LeakyReLU(0.1).

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by the convolution.
        kernel_size: Convolution kernel size.
        stride: Convolution stride.
        padding: Zero padding applied on each side.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1,padding=0):
        super().__init__()

        # The convolution has no bias term; it is followed directly by
        # BatchNorm2d.
        convolution = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            bias=False,
        )
        normalisation = nn.BatchNorm2d(out_channels)
        activation = nn.LeakyReLU(0.1)

        self.conv = nn.Sequential(convolution, normalisation, activation)

    def forward(self, x):
        """Apply conv -> batch norm -> activation to ``x``."""
        return self.conv(x)


# Residual block
class ResidualBlock(nn.Module):
    """Sequence of bottleneck units, each added back onto its own input.

    Every unit is a 1x1 convolution that halves the channel count followed by
    a 3x3 convolution that restores it, so the output has the same number of
    channels as the input.

    Args:
        in_channels: Channel count of the input (and of the output).
        num_repeats: How many bottleneck units to stack.
    """

    def __init__(self, in_channels, num_repeats=1):
        super().__init__()
        squeezed = in_channels // 2
        units = [
            nn.Sequential(
                CNNBlock(in_channels, squeezed, kernel_size=1, stride=1, padding=0),
                CNNBlock(squeezed, in_channels, kernel_size=3, stride=1, padding=1),
            )
            for _ in range(num_repeats)
        ]
        self.layers = nn.ModuleList(units)

    def forward(self, x):
        """Run ``x`` through every unit, adding the skip connection each time."""
        for unit in self.layers:
            x = x + unit(x)
        return x



# Darknet-53 backbone
class Darknet53(nn.Module):
    """Backbone built from ``CNNBlock`` and ``ResidualBlock`` stages.

    ``forward`` returns three feature maps with 256, 512 and 1024 channels,
    taken after three, four and five stride-2 convolutions respectively.

    Args:
        in_channels: Number of channels of the input image.
    """

    def __init__(self,in_channels=3):
        super().__init__()

        def stage(channels_in, channels_out, repeats):
            # One stride-2 convolution followed by a stack of residual units.
            return [
                CNNBlock(channels_in, channels_out, kernel_size=3, stride=2, padding=1),
                ResidualBlock(channels_out, num_repeats=repeats),
            ]

        self.block1 = nn.Sequential(
            CNNBlock(in_channels, 32, kernel_size=3, stride=1, padding=1),
            *stage(32, 64, 1),
            *stage(64, 128, 2),
            *stage(128, 256, 8),
        )
        self.block2 = nn.Sequential(*stage(256, 512, 8))
        self.block3 = nn.Sequential(*stage(512, 1024, 4))

    def forward(self, x):
        """Return the (256-, 512-, 1024-channel) feature maps for ``x``."""
        fine = self.block1(x)
        middle = self.block2(fine)
        coarse = self.block3(middle)
        return fine, middle, coarse

class UpSampling(nn.Module):
    """1x1 ``CNNBlock`` followed by a 2x spatial up-sampling.

    Args:
        in_channels: Channel count of the incoming feature map.
        out_channels: Channel count after the 1x1 convolution.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        channel_reduce = CNNBlock(
            in_channels, out_channels, kernel_size=1, stride=1, padding=0
        )
        enlarge = nn.Upsample(scale_factor=2)
        self.upsample = nn.Sequential(channel_reduce, enlarge)

    def forward(self, x):
        """Return ``x`` with adjusted channels and doubled height/width."""
        return self.upsample(x)


class YoloBlock(nn.Module):
    """Five alternating 1x1 / 3x3 ``CNNBlock`` layers ending at ``out_channels``.

    Args:
        in_channels: Channel count of the incoming feature map.
        out_channels: Channel count of the 1x1 layers and of the result; the
            3x3 layers in between use twice as many.
    """

    def __init__(self,in_channels,out_channels):
        super().__init__()
        narrow = out_channels
        wide = out_channels * 2

        def pointwise(channels_in):
            # 1x1 convolution down to the narrow width.
            return CNNBlock(channels_in, narrow, kernel_size=1, stride=1, padding=0)

        def spatial():
            # 3x3 convolution from the narrow to the wide width.
            return CNNBlock(narrow, wide, kernel_size=3, stride=1, padding=1)

        self.route_conv = nn.Sequential(
            pointwise(in_channels),
            spatial(),
            pointwise(wide),
            spatial(),
            pointwise(wide),
        )

    def forward(self, x):
        """Return the routed feature map that is handed to the detection layer."""
        return self.route_conv(x)


class DetectionLayer(nn.Module):
    """Prediction head producing ``num_classes + 5`` values for 3 boxes per cell.

    Args:
        in_channels: Channel count of the incoming feature map.
        num_classes: Number of object classes.
    """

    def __init__(self, in_channels, num_classes):
        super().__init__()
        self.num_classes = num_classes

        hidden = in_channels * 2
        values_per_box = num_classes + 5
        self.pred = nn.Sequential(
            CNNBlock(in_channels, hidden, kernel_size=3, stride=1, padding=1),
            # Plain convolution (with bias, no norm/activation) emitting the raw
            # predictions of all 3 boxes stacked along the channel axis.
            nn.Conv2d(hidden, values_per_box * 3, kernel_size=1, stride=1, padding=0),
        )

    def forward(self, x):
        """Return raw predictions shaped (batch, 3, x.size(2), x.size(3), num_classes + 5)."""
        batch, _, rows, cols = x.shape
        raw = self.pred(x)
        # Split the channel axis into (box, values-per-box) ...
        per_box = raw.view(batch, 3, self.num_classes + 5, rows, cols)
        # ... and move the per-box values to the last axis.
        return per_box.permute(0, 1, 3, 4, 2)

## yolov3 architecture

In [3]:
class YOLOv3(nn.Module):
    """Darknet-53 backbone with three detection heads.

    ``forward`` returns one prediction tensor per scale, starting with the one
    computed from the 1024-channel backbone feature map.

    Args:
        in_channels: Number of channels of the input image.
        num_classes: Number of object classes.
    """

    def __init__(self, in_channels=3,num_classes = 3):
        super().__init__()

        self.num_classes = num_classes

        self.darknet = Darknet53(in_channels=in_channels)

        # Head 1 works on the 1024-channel backbone output.
        self.yolo_block_01 = YoloBlock(1024, 512)
        self.detectlayer_01 = DetectionLayer(512, num_classes)
        self.upsample_01 = UpSampling(512, 256)

        # Head 2 input: 512-channel backbone map + 256 up-sampled channels.
        self.yolo_block_02 = YoloBlock(512 + 256, 256)
        self.detectlayer_02 = DetectionLayer(256, num_classes)
        self.upsample_02 = UpSampling(256, 128)

        # Head 3 input: 256-channel backbone map + 128 up-sampled channels.
        self.yolo_block_03 = YoloBlock(256 + 128, 128)
        self.detectlayer_03 = DetectionLayer(128, num_classes)

    def forward(self, x):
        """Return the three per-scale prediction tensors for image batch ``x``."""
        fine_map, middle_map, coarse_map = self.darknet(x)

        route = self.yolo_block_01(coarse_map)
        output_01 = self.detectlayer_01(route)

        # The up-sampled route goes first in the concatenation, then the
        # backbone feature map.
        merged = torch.cat([self.upsample_01(route), middle_map], dim=1)
        route = self.yolo_block_02(merged)
        output_02 = self.detectlayer_02(route)

        merged = torch.cat([self.upsample_02(route), fine_map], dim=1)
        route = self.yolo_block_03(merged)
        output_03 = self.detectlayer_03(route)

        return output_01, output_02, output_03

## Model test

In [9]:
# num_classes = 3
#
# # Creating model and testing output shapes
# model = YOLOv3(num_classes=num_classes)
# x = torch.randn((1, 3, IMAGE_SIZE, IMAGE_SIZE))
# out = model(x)
# print(out[0].shape)
# print(out[1].shape)
# print(out[2].shape)
#
# # Asserting output shapes
# assert model(x)[0].shape == (1, 3, IMAGE_SIZE // 32, IMAGE_SIZE // 32, num_classes + 5) # B, anchors per cell, grid rows, grid cols, (objectness, x, y, w, h) + class scores
# assert model(x)[1].shape == (1, 3, IMAGE_SIZE // 16, IMAGE_SIZE // 16, num_classes + 5)
# assert model(x)[2].shape == (1, 3, IMAGE_SIZE // 8, IMAGE_SIZE // 8, num_classes + 5)
# print("Output shapes are correct!")
#
# # torch summary
# summary(model, input_size=(2, 3, IMAGE_SIZE, IMAGE_SIZE), device="cpu")

torch.Size([1, 3, 13, 13, 8])
torch.Size([1, 3, 26, 26, 8])
torch.Size([1, 3, 52, 52, 8])
Output shapes are correct!


Layer (type:depth-idx)                                       Output Shape              Param #
YOLOv3                                                       [2, 3, 13, 13, 8]         --
├─Darknet53: 1-1                                             [2, 256, 52, 52]          --
│    └─Sequential: 2-1                                       [2, 256, 52, 52]          --
│    │    └─CNNBlock: 3-1                                    [2, 32, 416, 416]         928
│    │    └─CNNBlock: 3-2                                    [2, 64, 208, 208]         18,560
│    │    └─ResidualBlock: 3-3                               [2, 64, 208, 208]         20,672
│    │    └─CNNBlock: 3-4                                    [2, 128, 104, 104]        73,984
│    │    └─ResidualBlock: 3-5                               [2, 128, 104, 104]        164,608
│    │    └─CNNBlock: 3-6                                    [2, 256, 52, 52]          295,424
│    │    └─ResidualBlock: 3-7                               [2, 256, 52

# Define Util & Loss function
참고 자료 : https://www.geeksforgeeks.org/yolov3-from-scratch-using-pytorch/

## Setting anchors

### pytorch method 사용 예정

In [None]:
## Detection utils
# def Giou(box1:torch.tensor, box2:torch.tensor):
#     # [x_center,y_center,width,height]
#     # batch를 고려하여 차원 유지
#     box1_x1 = (box1[..., 0:1] - box1[..., 2:3]) * 0.5
#     box1_y1 = (box1[..., 1:2] - box1[..., 3:4]) * 0.5
#     box1_x2 = (box1[..., 0:1] + box1[..., 2:3]) * 0.5
#     box1_y2 = (box1[..., 1:2] + box1[..., 3:4]) * 0.5
#
#     box2_x1 = (box2[..., 0:1] - box2[..., 2:3]) * 0.5
#     box2_y1 = (box2[..., 1:2] - box2[..., 3:4]) * 0.5
#     box2_x2 = (box2[..., 0:1] + box2[..., 2:3]) * 0.5
#     box2_y2 = (box2[..., 1:2] + box2[..., 3:4]) * 0.5
#
#     #intersection 계산
#     inter_x1 = torch.max(box1_x1, box2_x1)
#     inter_x2 = torch.min(box1_x2, box2_x2)
#     inter_y1 = torch.max(box1_y1, box2_y1)
#     inter_y2 = torch.min(box1_y2, box2_y2)
#
#     inter_area = torch.clamp(inter_x2 - inter_x1, min=0) * torch.clamp(inter_y2 - inter_y1, min=0)
#     box1_area = (box1_x2 - box1_x1) * (box1_y2 - box1_y1)
#     box2_area = (box2_x2 - box2_x1) * (box2_y2 - box2_y1)
#     union_area = box1_area + box2_area - inter_area
#
#     iou = inter_area / torch.clamp(union_area, min=1e-6)
#
#     enclosing_x1 = torch.min(box1_x1, box2_x1)
#     enclosing_y1 = torch.min(box1_y1, box2_y1)
#     enclosing_x2 = torch.max(box1_x2, box2_x2)
#     enclosing_y2 = torch.max(box1_y2, box2_y2)
#     enclosing_area = torch.clamp(enclosing_x2 - enclosing_x1, min=0) * torch.clamp(enclosing_y2 - enclosing_y1, min=0)
#
#     giou = iou - (enclosing_area - union_area) / torch.clamp(enclosing_area, min=1e-6)
#
#     return giou

## Detection utils

In [6]:
def convert_cells_to_bboxes(predictions :torch.tensor , anchors:list, stride :int):
    """Decode raw per-cell predictions of one scale into a flat list of boxes.

    Args:
        predictions: Tensor shaped (batch, anchors, grid rows, grid cols,
            5 + classes) whose last axis is
            [objectness, x, y, width, height, class scores...].
        anchors: One (width, height) pair per anchor, as a list or a tensor.
            Widths/heights are multiplied by ``stride`` after decoding, so the
            anchors must be expressed in grid-cell units.
        stride: Factor that converts grid-cell units into output units.

    Returns:
        Tensor shaped (batch, anchors * rows * cols, 6) whose last axis is
        [sigmoid(objectness), x, y, width, height, best class index].

    The input tensor is left untouched: all decoding is done out of place.
    """
    batch_size = predictions.shape[0]
    grid_rows, grid_cols = predictions.shape[2], predictions.shape[3]
    device = predictions.device

    # as_tensor accepts both Python lists and existing tensors without the
    # copy-construct warning that torch.tensor(tensor) emits.
    anchor_sizes = torch.as_tensor(
        anchors, dtype=predictions.dtype, device=device
    ).reshape(1, len(anchors), 1, 1, 2)

    # Decode into new tensors; writing into a slice of `predictions` would
    # silently overwrite the caller's data.
    centre_offsets = torch.sigmoid(predictions[..., 1:3])
    sizes = torch.exp(predictions[..., 3:5]) * anchor_sizes

    scores = torch.sigmoid(predictions[..., 0:1])
    # Cast so the class index can be concatenated with the float columns.
    best_class = torch.argmax(predictions[..., 5:], dim=-1, keepdim=True).to(scores.dtype)

    # Cell indices broadcast over the grid: columns vary along axis 3 (x),
    # rows along axis 2 (y).
    col_index = torch.arange(grid_cols, device=device).view(1, 1, 1, grid_cols, 1)
    row_index = torch.arange(grid_rows, device=device).view(1, 1, grid_rows, 1, 1)

    x = (centre_offsets[..., 0:1] + col_index) * stride
    y = (centre_offsets[..., 1:2] + row_index) * stride
    width_height = sizes * stride

    converted_bboxes = torch.cat((scores, x, y, width_height, best_class), dim=-1)
    return converted_bboxes.reshape(batch_size, -1, 6)

def plot_image(image, boxes):
    """Show the current matplotlib figure.

    NOTE(review): this is a placeholder - neither ``image`` nor ``boxes`` is
    used, so nothing is drawn before ``plt.show()`` is called.
    """

    plt.show()



## Model checkpoint

In [7]:
def save_checkpoint(model, optimizer, filename = "dr_bee_checkpoint.ptr.tar"):
    """Write the model and optimizer state dicts to ``filename``.

    The file holds a dict with the keys ``"state_dict"`` (model) and
    ``"optimizer"``.
    """
    print("==> Saving checkpoint")
    torch.save(
        dict(
            state_dict=model.state_dict(),
            optimizer=optimizer.state_dict(),
        ),
        filename,
    )

# Restore a checkpoint written by save_checkpoint
def load_checkpoint(checkpoint_file, model, optimizer, lr, device):
    """Load model and optimizer state from ``checkpoint_file`` in place.

    Args:
        checkpoint_file: Path of a file holding a dict with the keys
            ``"state_dict"`` and ``"optimizer"``.
        model: Module whose parameters are overwritten.
        optimizer: Optimizer whose state is overwritten.
        lr: Learning rate applied to every parameter group after loading.
        device: ``map_location`` used when reading the file.
    """
    print("==> Loading checkpoint")
    saved = torch.load(checkpoint_file, map_location=device)

    model.load_state_dict(saved["state_dict"])
    optimizer.load_state_dict(saved["optimizer"])

    # Loading the optimizer state also restores the stored learning rate, so
    # the requested one is written back afterwards.
    for group in optimizer.param_groups:
        group["lr"] = lr






## Loss function

In [8]:
class YoloLoss(nn.Module):
    """Objectness + classification + box-regression loss for one scale.

    Args:
        num_classes: Number of object classes.
        anchors: Anchor boxes; stored on the instance but not used by the
            loss computation.
    """

    def __init__(self, num_classes, anchors):
        super().__init__()
        self.num_classes = num_classes
        self.anchors = anchors
        self.mse_loss = nn.MSELoss()
        self.bce_loss = nn.BCEWithLogitsLoss()
        self.ce_loss = nn.CrossEntropyLoss()

    def forward(self, predictions, targets, anchors):
        """Return the scalar loss for one detection scale.

        Args:
            predictions: Raw network output whose last axis is
                [objectness logit, 4 box values, class logits...].
            targets: Tensor with the same leading axes whose last axis is
                [objectness (0 or 1), 4 box values, class index].
            anchors: Accepted for interface compatibility; not used here.

        Returns:
            Objectness BCE over every cell, plus class cross-entropy and box
            MSE over the cells whose target objectness equals 1. When no cell
            contains an object only the objectness term is returned.
        """
        # Objectness is learned on every cell, with or without an object.
        objectness_loss = self.bce_loss(predictions[..., 0], targets[..., 0])

        # Class and box targets only mean something where an object exists;
        # averaging over empty cells would train them towards class 0 / zeros.
        has_object = targets[..., 0] == 1
        if not has_object.any():
            # Mean over an empty selection would be NaN.
            return objectness_loss

        # Boolean indexing flattens the leading axes, giving (n_objects, C)
        # logits and (n_objects,) class ids - the layout CrossEntropyLoss
        # expects. Passing the 5-D tensor directly makes it treat the anchor
        # axis as the class axis.
        class_logits = predictions[..., 5:][has_object]
        class_ids = targets[..., 5][has_object].long()
        class_loss = self.ce_loss(class_logits, class_ids)

        box_loss = self.mse_loss(
            predictions[..., 1:5][has_object], targets[..., 1:5][has_object]
        )
        return objectness_loss + class_loss + box_loss


# Data Load

In [None]:
class YoloDataset(Dataset):
    """Dataset that yields one ``(image, bbox, label)`` sample per CSV row."""

    def __init__(self, csv_file, img_folder, transform=None):
        """
        Args:
            csv_file (str): Path of the CSV file holding the label rows. The
                columns ``image_id``, ``x_center``, ``y_center``, ``width``,
                ``height`` and ``class_id`` are read.
            img_folder (str): Directory that contains the image files.
            transform (callable, optional): Transformation applied to the
                loaded image (e.g. resizing, augmentation).
        """
        self.labels = pd.read_csv(csv_file)
        self.img_folder = img_folder
        self.transform = transform

    def __len__(self):
        """Return the number of label rows."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(image, bbox, label)`` for row ``idx``.

        ``bbox`` is a float32 tensor ``[x_center, y_center, width, height]``
        and ``label`` a long tensor holding the class id.
        NOTE(review): the box values are passed through unchanged; whether
        they are normalised or in pixels depends on the CSV - confirm.
        """
        # Fetch the row once instead of re-indexing the frame per column.
        row = self.labels.iloc[idx]

        bbox = torch.tensor(
            [row['x_center'], row['y_center'], row['width'], row['height']],
            dtype=torch.float32,
        )
        label = torch.tensor(row['class_id'], dtype=torch.long)

        img_path = os.path.join(self.img_folder, row['image_id'])
        # convert() returns a new in-memory image, so the file handle can be
        # released as soon as the conversion is done.
        with Image.open(img_path) as source:
            image = source.convert("RGB")

        if self.transform:
            image = self.transform(image)

        return image, bbox, label


# Only conversion to a tensor is applied to each image.
transform = transforms.Compose([transforms.ToTensor()])

# Dataset over the training CSV and image folder.
dataset = YoloDataset(CSV_DIR, IMG_FOLDER_DIR, transform=transform)

# Shuffled mini-batch loader.
dataloader = DataLoader(dataset=dataset, batch_size=BATCH_SIZE, shuffle=True)


# Training

In [None]:
model = YOLOv3(in_channels=3, num_classes=NUM_CLASSES).to(DEVICE)
optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)
# YoloLoss.__init__ takes the anchors as a required second argument; leaving it
# out raises a TypeError.
loss_fn = YoloLoss(NUM_CLASSES, ANCHORS)
scaler = GradScaler()
# Scale each row of anchors by the number of grid cells per side at that scale.
# convert_cells_to_bboxes multiplies decoded widths/heights by the stride, so
# the anchors it receives must be in grid-cell units; multiplying by the stride
# here (as before) does not produce those units.
# NOTE(review): assumes ANCHORS are fractions of the image size - confirm.
scaled_anchors = (
    torch.tensor(ANCHORS)
    * torch.tensor(GRID_SIZE).unsqueeze(1).unsqueeze(1).repeat(1, 3, 2)
).to(DEVICE)

In [None]:
def training_loop(loader, model, optimizer, loss_fn, scaler, scaled_anchors):
    """Train ``model`` for one pass over ``loader``.

    Args:
        loader: Iterable of ``(x, y)`` batches, where ``x`` is the image
            tensor and ``y[0]``, ``y[1]``, ``y[2]`` are the target tensors for
            the three model outputs, in the same order.
            NOTE(review): ``YoloDataset`` in this file yields
            ``(image, bbox, label)`` samples, which does not match this
            contract - targets have to be built per scale before training.
        model: Network returning three outputs per call.
        optimizer: Optimizer updating ``model``'s parameters.
        loss_fn: Callable ``loss_fn(output, target, anchors)`` returning a
            scalar tensor.
        scaler: Gradient scaler used for the backward pass and optimizer step.
        scaled_anchors: Indexable holding the anchors of each scale.

    Returns:
        Mean loss over all processed batches, or NaN when ``loader`` is empty.
    """
    # Use the device the model actually lives on instead of a module global.
    device = next(model.parameters()).device
    # Mixed precision is only enabled when running on CUDA.
    use_amp = device.type == "cuda"

    model.train()

    progress_bar = tqdm(loader, leave=True)
    losses = []
    mean_loss = float("nan")

    for x, y in progress_bar:
        x = x.to(device)
        y0, y1, y2 = y[0].to(device), y[1].to(device), y[2].to(device)

        with torch.cuda.amp.autocast(enabled=use_amp):
            outputs = model(x)
            # Sum of the losses of the three scales.
            loss = (
                loss_fn(outputs[0], y0, scaled_anchors[0])
                + loss_fn(outputs[1], y1, scaled_anchors[1])
                + loss_fn(outputs[2], y2, scaled_anchors[2])
            )

        losses.append(loss.item())

        # Clear old gradients, back-propagate the scaled loss, then let the
        # scaler perform the optimizer step and update its scale factor.
        optimizer.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # Show the running mean loss on the progress bar.
        mean_loss = sum(losses) / len(losses)
        progress_bar.set_postfix(loss=mean_loss)

    return mean_loss