In [None]:
from torchvision.utils import draw_bounding_boxes
import torchvision
import torch
from torchvision.ops import nms
from torch.utils.data import Dataset
from PIL import Image
import os
from torchvision import io, utils
from torchvision.transforms import v2 as T
from torchvision.transforms.v2 import functional as F
import matplotlib.pyplot as plt
from torchvision import tv_tensors
from torchinfo import summary
import cv2
import numpy as np
import albumentations as A
from albumentations.pytorch import ToTensorV2

# Prepare dataset

In [None]:
# for pascal_voc format and .png image only (CV2 and Albumentation)
class CustomDataset(Dataset):
    """Detection dataset backed by ``<root>/<split>/images`` and ``<root>/<split>/labels``.

    Every image ``<stem>.<image_type>`` is paired with ``<stem>.txt``. Each
    non-blank label line is ``<class_id> <v1> <v2> <v3> <v4>``. The four values
    are handed to the Albumentations pipeline untouched; the pipelines in this
    notebook use ``format='pascal_voc'``, i.e. absolute
    ``(x_min, y_min, x_max, y_max)``, so the boxes are tagged ``XYXY`` and the
    area is derived from the corner coordinates.

    Args:
        root: Dataset root directory.
        split: Sub-directory of ``root`` (the notebook uses 'train', 'val', 'test').
        image_type: Image file extension without the dot, e.g. ``'png'``.
        transforms: Optional callable invoked as
            ``transforms(image=..., bboxes=..., labels=...)`` and returning a
            dict with the keys ``'image'``, ``'bboxes'`` and ``'labels'``.
    """

    def __init__(self, root, split, image_type, transforms=None):
        self.root = root
        self.split = split
        self.transforms = transforms

        suffix = f'.{image_type}'.lower()
        image_dir = os.path.join(self.root, self.split, 'images')
        # Sort for a reproducible idx -> file mapping (os.listdir order is arbitrary)
        # and skip entries that are not images of the requested type.
        self.image_names = sorted(
            name for name in os.listdir(image_dir) if name.lower().endswith(suffix)
        )
        # Swap only the final extension; str.replace would rewrite every occurrence.
        self.label_names = [os.path.splitext(name)[0] + '.txt' for name in self.image_names]

    def __len__(self):
        return len(self.image_names)

    @staticmethod
    def _parse_label_lines(lines):
        """Turn label-file lines into ``(boxes, labels)``; blank lines are skipped."""
        boxes, labels = [], []
        for raw in lines:
            fields = raw.split()
            if not fields:
                # e.g. the empty string left behind by a trailing newline
                continue
            if len(fields) < 5:
                raise ValueError(f"Malformed label line (expected 5 fields): {raw!r}")
            boxes.append([float(value) for value in fields[1:5]])
            # class ids in the files are shifted by one (0 is 'background' in this notebook)
            labels.append(int(fields[0]) + 1)
        return boxes, labels

    def __getitem__(self, idx):
        """Return ``(image, target)`` for sample ``idx``.

        ``image`` is a CHW tensor built from the (possibly transformed) array.
        ``target`` holds ``boxes``, ``labels``, ``image_id``, ``area`` and ``iscrowd``.

        Raises:
            Exception: if OpenCV cannot read the image file.
            ValueError: if a label line has fewer than five fields.
        """
        # Load image (OpenCV reads BGR; convert to RGB)
        image_path = os.path.join(self.root, self.split, 'images', self.image_names[idx])
        image = cv2.imread(image_path)
        if image is None:
            raise Exception(f"Error reading image: {image_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Load bounding boxes and labels
        label_path = os.path.join(self.root, self.split, 'labels', self.label_names[idx])
        with open(label_path, "r") as file:
            boxes, labels = self._parse_label_lines(file.read().splitlines())

        if self.transforms is not None:
            augmented = self.transforms(image=image, bboxes=boxes, labels=labels)
            image = augmented['image']
            boxes = [list(box) for box in augmented['bboxes']]
            labels = augmented['labels']

        height, width, _ = image.shape
        # HWC array -> CHW tensor
        image = torch.from_numpy(np.transpose(image, (2, 0, 1)))

        # reshape keeps an (0, 4) tensor when the sample has no boxes
        boxes = torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        area = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
        boxes = tv_tensors.BoundingBoxes(boxes, format="XYXY", canvas_size=(height, width))
        labels = torch.tensor(labels, dtype=torch.int64)
        # suppose all instances are not crowd
        iscrowd = torch.zeros(len(labels), dtype=torch.int64)

        target = {
            "boxes": boxes,
            "labels": labels,
            "image_id": idx,
            "area": area,
            "iscrowd": iscrowd,
        }
        return image, target


In [None]:
def visualize_box_target(image, target, color='blue', width=2, figsize=(8, 8)):
    """Overlay ``target['boxes']`` on ``image`` and show the result in a new figure.

    Args:
        image: Image tensor passed straight to ``draw_bounding_boxes``.
        target: Mapping with a ``'boxes'`` entry.
        color: Box colour forwarded as ``colors``.
        width: Box line width.
        figsize: Matplotlib figure size.
    """
    annotated = draw_bounding_boxes(image, target['boxes'], colors=color, width=width)
    to_pil = T.ToPILImage()
    picture = to_pil(annotated)

    plt.figure(figsize=figsize)
    plt.imshow(picture)
    plt.show()
    

## augment and show a result

In [None]:
# Preview pipeline: same augmentations as the training branch of get_transform,
# but without A.Normalize.
transform = A.Compose([
            A.Resize(height=512, width=512, p=1),
            A.HorizontalFlip(p=0.5),
            A.VerticalFlip(p=0.5),
            A.ShiftScaleRotate(shift_limit=0.1, 
                                scale_limit=0.1, 
                                rotate_limit=15, 
                                border_mode=cv2.BORDER_CONSTANT, 
                                value=0,
                                p=0.5),
            A.HueSaturationValue(hue_shift_limit=180*0.1,
                                 sat_shift_limit=255*0.05, 
                                 val_shift_limit=255*0.05, 
                                 p=0.5),
], bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels'], min_visibility=0.0, min_area=100))

train_data = CustomDataset(
    root='data/fold1_pascal',
    split='train',
    image_type='png',
    transforms=transform
)

# Draws one figure per training image (the whole split, not a sample).
for i in range(len(train_data)):
    image, target = train_data[i]
    visualize_box_target(image, target, figsize=(6, 6))

## augmentation setting

In [None]:
# Albumentation
def get_transform(train):
    """Build the Albumentations pipeline for training or evaluation.

    Both variants resize to 512x512 and end with ``A.Normalize()``; the training
    variant inserts random flips, shift/scale/rotate and hue/saturation/value
    jitter in between.

    Args:
        train: Truthy for the augmented training pipeline.

    Returns:
        An ``A.Compose`` whose boxes use ``pascal_voc`` format with the
        ``labels`` label field.
    """
    resize = A.Resize(height=512, width=512, p=1)

    if not train:
        steps = [resize, A.Normalize()]
    else:
        geometric = A.ShiftScaleRotate(
            shift_limit=0.1,
            scale_limit=0.1,
            rotate_limit=15,
            border_mode=cv2.BORDER_CONSTANT,
            value=0,
            p=0.5,
        )
        colour = A.HueSaturationValue(
            hue_shift_limit=180*0.1,
            sat_shift_limit=255*0.05,
            val_shift_limit=255*0.05,
            p=0.5,
        )
        steps = [
            resize,
            A.HorizontalFlip(p=0.5),
            A.VerticalFlip(p=0.5),
            geometric,
            colour,
            A.Normalize(),
        ]

    box_config = A.BboxParams(
        format='pascal_voc',
        label_fields=['labels'],
        min_visibility=0.0,
        min_area=100,
    )
    return A.Compose(steps, bbox_params=box_config)

## data loader

In [None]:
# NOTE: this rebinds the name `utils`, which the first cell imported from
# torchvision; from here on `utils` is the local module that provides collate_fn.
import utils

# our dataset has two classes only - background and cabbage
num_classes = 2
batch_size = 16

# use our dataset and defined transformations
# (training split gets the augmented pipeline)
train_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='train',
    image_type='png',
    transforms=get_transform(train=True)
)

# validation split: resize + normalize only
val_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='val',
    image_type='png',
    transforms=get_transform(train=False)
)

# define training and validation data loaders
# (only the training loader shuffles)
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=batch_size,
    num_workers=8,
    shuffle=True,
    collate_fn=utils.collate_fn
)

val_loader = torch.utils.data.DataLoader(
    val_dataset,
    batch_size=batch_size,
    num_workers=8,
    shuffle=False,
    collate_fn=utils.collate_fn
)

In [None]:
# len(DataLoader) is the number of batches per epoch (training steps), not the batch size.
print(f'Training set: #image = {len(train_dataset)}, #batches = {len(train_loader)}')
print(f'Validation set: #image = {len(val_dataset)}, #batches = {len(val_loader)}')

# Faster R-CNN

## model structure

In [None]:
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

In [None]:
def get_model(num_classes, device):
    """Create a COCO-pretrained Faster R-CNN (ResNet-50 FPN) with a fresh box head.

    Args:
        num_classes: Number of output classes for the new ``FastRCNNPredictor``.
        device: Device the model is moved to.

    Returns:
        The model, already on ``device``.
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="COCO_V1")

    # swap the pre-trained predictor for one sized to our classes,
    # keeping the input width of the existing classifier
    roi_heads = detector.roi_heads
    feature_dim = roi_heads.box_predictor.cls_score.in_features
    roi_heads.box_predictor = FastRCNNPredictor(feature_dim, num_classes)

    return detector.to(device)

def load_model(src, num_classes, device):
    """Rebuild the detector with ``get_model`` and load weights saved at ``src``.

    Args:
        src: Path to a ``state_dict`` checkpoint written by ``torch.save``.
        num_classes: Forwarded to ``get_model``.
        device: Target device for both the checkpoint tensors and the model.

    Returns:
        The model on ``device`` with the checkpoint weights loaded.
    """
    model = get_model(num_classes, device)
    # map_location lets a checkpoint saved on another device (e.g. cuda:1)
    # load on hosts where that device does not exist
    state_dict = torch.load(src, map_location=device)
    model.load_state_dict(state_dict)
    model.to(device)
    return model

In [None]:
# train on the GPU or on the CPU, if a GPU is not available.
# GPU 1 is kept when the host has it; single-GPU hosts fall back to cuda:0
# instead of failing on a non-existent device.
if torch.cuda.is_available():
    device = torch.device('cuda:1' if torch.cuda.device_count() > 1 else 'cuda:0')
else:
    device = torch.device('cpu')

In [None]:
# get the model using our helper function
model = get_model(num_classes, device)

# construct an optimizer
# (only parameters that require gradients are optimised)
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(
    params,
    lr=0.001,
    momentum=0.9,
    weight_decay=0.0005
)

# and a learning rate scheduler
# NOTE: train() steps this once per epoch, so with step_size=1000 and the
# 500-epoch runs below the learning rate never decays.
lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=1000,
    gamma=0.1
)

# parameter / mult-add summary for a single 3x512x512 input
summary(model, (1, 3, 512, 512), depth=3, col_names = ("num_params", "mult_adds"), verbose = 0)

## model training

In [None]:
from engine import train_one_epoch, evaluate

def train(model, optimizer, device, num_epochs, train_loader, val_loader, project, name, print_freq, lr_scheduler=None):
    """Train ``model`` and keep the checkpoint with the best validation mAP.

    After every epoch the model is evaluated on ``val_loader``; whenever the
    first COCO bbox statistic improves, the ``state_dict`` is written to
    ``runs/<project>/<name>/best.pth``.

    Args:
        model: Detection model to train.
        optimizer: Optimizer passed to ``train_one_epoch``.
        device: Device used for training and evaluation.
        num_epochs: Number of epochs (epochs are numbered from 1).
        train_loader: Training data loader.
        val_loader: Validation data loader.
        project: First directory level under ``runs/``.
        name: Run directory under ``runs/<project>/``.
        print_freq: Logging frequency forwarded to ``train_one_epoch``.
        lr_scheduler: Scheduler stepped once per epoch. When omitted, a
            notebook-global ``lr_scheduler`` is used if one exists (the
            previous implicit behaviour); with neither, no scheduler is stepped.

    Returns:
        The best validation mAP seen (``-1.0`` if no epoch improved on it).

    Raises:
        FileExistsError: if ``runs/<project>/<name>`` already exists.
    """
    if lr_scheduler is None:
        # backward compatibility with callers relying on the global scheduler
        lr_scheduler = globals().get('lr_scheduler')

    best_mAP = -1.00  # Initialize best_mAP to a very low value

    # --- result saving dir ---
    os.makedirs(os.path.join('runs', project), exist_ok=True)
    # NOTE(review): no exist_ok for the run directory, so re-using a
    # (project, name) pair fails before training starts - presumably to avoid
    # overwriting an earlier best.pth; confirm this is intended.
    os.makedirs(f'runs/{project}/{name}')
    best_model_path = f'runs/{project}/{name}/best.pth'

    for epoch in range(1, num_epochs+1):
        # train for one epoch, printing every {print_freq} iterations
        train_one_epoch(model, optimizer, train_loader, device, epoch, print_freq=print_freq)
        # update the learning rate
        if lr_scheduler is not None:
            lr_scheduler.step()

        # evaluate on the val dataset (metric based)
        val_result = evaluate(model, val_loader, device=device, confident_threshold=0.5, area_threshold=100)
        # get mAP@50:95
        mAP = val_result.coco_eval['bbox'].stats[0]

        # Check if the current model is the best
        if best_mAP < mAP:
            best_mAP = mAP
            torch.save(model.state_dict(), best_model_path)
            print(f"Saved new best model with mAP: {mAP:.4f}")

    return best_mAP


In [None]:
# Training budget and logging cadence for this run.
num_epochs = 500
print_freq = 5

# Best checkpoint goes to runs/faster_rcnn/500ep_aug/best.pth.
# NOTE(review): model, optimizer and both loaders are notebook globals; make
# sure they are the ones defined for this model before running.
train(
    model=model,
    optimizer=optimizer,
    device=device,
    num_epochs=num_epochs,
    train_loader=train_loader,
    val_loader=val_loader,
    project='faster_rcnn',
    name='500ep_aug',
    print_freq=print_freq
)

## model evaluation

In [None]:
# load best model
# (the checkpoint path matches the project/name passed to train() above)
model = load_model('runs/faster_rcnn/500ep_aug/best.pth', num_classes, device)

In [None]:
from engine import evaluate

# Evaluation view of the training split (no augmentation, no shuffling).
# Bound to its own names so the augmented, shuffled `train_loader` from the
# data-loader cell is not overwritten before a later training cell uses it.
train_eval_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='train',
    image_type='png',
    transforms=get_transform(train=False)
)

train_eval_loader = torch.utils.data.DataLoader(
    train_eval_dataset,
    batch_size=16,
    num_workers=1,
    shuffle=False,
    collate_fn=utils.collate_fn
)

# len(DataLoader) is the number of batches, not the batch size
print(f'Training set: #image = {len(train_eval_dataset)}, #batches = {len(train_eval_loader)}')

# evaluate on the train dataset
result = evaluate(model, train_eval_loader, device=device, confident_threshold=0.5, area_threshold=100)

In [None]:
# Validation split with the evaluation pipeline (resize + normalize only).
val_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='val',
    image_type='png',
    transforms=get_transform(train=False)
)

val_loader = torch.utils.data.DataLoader(
    val_dataset,
    batch_size=16,
    num_workers=1,
    shuffle=False,
    collate_fn=utils.collate_fn
)

# len(DataLoader) is the number of batches, not the batch size
print(f'Validation set: #image = {len(val_dataset)}, #batches = {len(val_loader)}')

# evaluate on the valid dataset
result = evaluate(model, val_loader, device=device, confident_threshold=0.5, area_threshold=100)

In [None]:
# Test split with the evaluation pipeline (resize + normalize only).
test_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='test',
    image_type='png',
    transforms=get_transform(train=False)
)

test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=16,
    num_workers=1,
    shuffle=False,
    collate_fn=utils.collate_fn
)

# len(DataLoader) is the number of batches, not the batch size
print(f'Testing set: #image = {len(test_dataset)}, #batches = {len(test_loader)}')

# evaluate on the test dataset
result = evaluate(model, test_loader, device=device, confident_threshold=0.5, area_threshold=100)

## model predicting

In [None]:
import matplotlib.pyplot as plt
from torchvision.utils import draw_bounding_boxes

def predict(model, image, device, iou_threshold=0.6, mean=None, std=None):
    """Run ``model`` on a single image and return a displayable image with the raw prediction.

    The image is resized (shorter side 512) and scaled to float before
    inference. Only the first three channels are fed to the model.

    Args:
        model: Detection model; switched to eval mode here.
        image: CHW image tensor.
        device: Device the model input is moved to.
        iou_threshold: Accepted for backward compatibility; not used.
        mean: Optional per-channel mean. When given together with ``std`` the
            model input is normalised as ``(x - mean) / std``.
        std: Optional per-channel standard deviation (see ``mean``).

    Returns:
        ``(image, pred)``: a ``uint8`` CHW tensor (min-max stretched to 0..255,
        first three channels) and the first element of the model output.

    NOTE(review): the training/evaluation pipelines in this notebook end with
    ``A.Normalize()``, while this function feeds un-normalised [0, 1] values
    unless ``mean``/``std`` are passed. Confirm which input the checkpoints expect.
    """
    transforms = T.Compose([
        T.Resize(size=512),
        T.ToDtype(torch.float, scale=True),
        T.ToPureTensor()
    ])
    image = transforms(image)

    model.eval()
    with torch.no_grad():
        # convert RGBA -> RGB
        x = image[:3, ...]
        if mean is not None and std is not None:
            mean_t = torch.as_tensor(mean, dtype=x.dtype).view(-1, 1, 1)
            std_t = torch.as_tensor(std, dtype=x.dtype).view(-1, 1, 1)
            x = (x - mean_t) / std_t
        x = x.to(device)
        predictions = model([x, ])
        pred = predictions[0]

    # rescale the image to 0..255 for display
    lowest = image.min()
    value_range = image.max() - lowest
    if value_range > 0:
        image = 255.0 * (image - lowest) / value_range
    else:
        # constant image: min-max stretching would divide by zero
        image = 255.0 * image.clamp(0.0, 1.0)
    image = image.to(torch.uint8)
    image = image[:3, ...]

    return image, pred

def visualize_box_result(i, image, result, color='red', width=2, figsize=(8, 8), axis='off', save=False, dst=None):
    """Show the predicted boxes on ``image`` and optionally write the picture to disk.

    Args:
        i: Zero-based sample index; the file is named ``image{i+1}.png``.
        image: ``uint8`` CHW image tensor.
        result: Prediction dict with ``'boxes'`` and ``'scores'``.
        color: Box colour.
        width: Box line width.
        figsize: Matplotlib figure size.
        axis: Argument for ``plt.axis``.
        save: Request saving; requires ``dst``.
        dst: Output directory. As before, the image is written whenever ``dst``
            is given; with ``dst=None`` nothing is written.

    Raises:
        ValueError: if ``save`` is true but ``dst`` is ``None``.
    """
    if save and dst is None:
        raise ValueError("save=True requires a destination directory (dst)")

    # visualize a result
    if result["boxes"].shape[0] == 0:
        output_image = image
    else:
        result_labels = [f"cabbage: {score:.3f}" for score in result["scores"]]
        result_boxes = result["boxes"].long()
        output_image = draw_bounding_boxes(image, result_boxes, result_labels, colors=color, width=width)
    output_image = output_image.permute(1, 2, 0)

    plt.figure(figsize=figsize)
    plt.imshow(output_image)
    plt.axis(axis)

    if dst is not None:
        os.makedirs(dst, exist_ok=True)
        plt.imsave(f'{dst}/image{i+1}.png', output_image.numpy())

    # show like the sibling helpers, then release the figure so loops over a
    # whole dataset do not accumulate open figures
    plt.show()
    plt.close()

def visualize_box_target_result(image, target, result, color=('blue', 'red'), width=2, figsize=(12, 12), axis='on'):
    """Show ground truth (left) and predictions (right) side by side.

    Args:
        image: ``uint8`` CHW image tensor.
        target: Dict with ground-truth ``'boxes'``.
        result: Prediction dict with ``'boxes'``, ``'labels'`` and ``'scores'``.
        color: ``(target_colour, prediction_colour)``.
        width: Box line width.
        figsize: Matplotlib figure size.
        axis: Argument for ``plt.axis`` on both panels.
    """
    plt.figure(figsize=figsize)

    # left panel: ground truth
    truth_panel = draw_bounding_boxes(image, target['boxes'], colors=color[0], width=width)

    # right panel: predictions, or the untouched image when there are none
    if result["boxes"].shape[0] != 0:
        captions = [f"cabbage: {score:.3f}" for label, score in zip(result["labels"], result["scores"])]
        predicted = result["boxes"].long()
        prediction_panel = draw_bounding_boxes(image, predicted, captions, colors=color[1], width=width, font_size=30)
    else:
        prediction_panel = image

    for position, panel in ((121, truth_panel), (122, prediction_panel)):
        plt.subplot(position)
        plt.imshow(panel.permute(1, 2, 0))
        plt.axis(axis)

    plt.show()
    
def visualize_box_both(image, target, result, color=('blue', 'red'), width=2, figsize=(8, 8), axis='on'):
    """Draw ground-truth and predicted boxes on the same image and show it.

    Args:
        image: ``uint8`` CHW image tensor.
        target: Dict with ground-truth ``'boxes'``.
        result: Prediction dict with ``'boxes'`` and ``'scores'``.
        color: ``(target_colour, prediction_colour)``.
        width: Box line width.
        figsize: Matplotlib figure size.
        axis: Argument for ``plt.axis``.
    """
    plt.figure(figsize=figsize)

    # target
    target_box = draw_bounding_boxes(image, target['boxes'], colors=color[0], width=width)

    # result - skip drawing when nothing was predicted, like the sibling helpers
    if result["boxes"].shape[0] == 0:
        output_box = target_box
    else:
        result_labels = [f"cabbage: {score:.3f}" for score in result["scores"]]
        result_boxes = result["boxes"].long()
        output_box = draw_bounding_boxes(target_box, result_boxes, result_labels, colors=color[1], width=width, font_size=30)

    plt.imshow(output_box.permute(1, 2, 0))
    plt.axis(axis)
    plt.show()

In [None]:
from engine import post_processing

# Label-id -> name lookup; not referenced by the rest of this cell.
classId2name = {
    0: 'background',
    1: 'cabbage'
}

# Test split resized to 512x512 only (no A.Normalize in this pipeline).
test_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='test',
    image_type='png',
    transforms=A.Compose([A.Resize(height=512, width=512, p=1)], 
                     bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels'], min_visibility=0.0, min_area=100))
)

# Thresholds handed to engine.post_processing (its semantics live in engine).
confident_threshold = 0.5
area_threshold = 100

# One prediction figure / PNG per test image.
for i in range(len(test_dataset)):
    image, target = test_dataset[i]
    
    image, pred = predict(model, image, device)
    
    # post processing
    pred = post_processing([pred], confident_threshold, area_threshold)[0]
    
    # visualize a result
    visualize_box_result(i, image, pred, color='red', width=2, figsize=(8, 8), axis='off', save=True, dst='runs/faster_rcnn/500ep_aug/test_results')
    
    # visualize target and result
#     visualize_box_target_result(image, target, pred, color=('blue', 'red'), width=2, figsize=(12, 12), axis='off')   
#     visualize_box_both(image, target, pred, color=('blue', 'red'), width=2, figsize=(8, 8), axis='on')  

# RetinaNet

In [None]:
import torchvision
import torch
from torchvision.models.detection.retinanet import RetinaNetClassificationHead

In [None]:
def get_model(num_classes, device):
    """Create a COCO-pretrained RetinaNet (ResNet-50 FPN) with a new classification head.

    Args:
        num_classes: Number of classes for the replacement head.
        device: Device the model is moved to.

    Returns:
        The model, already on ``device``.
    """
    detector = torchvision.models.detection.retinanet.retinanet_resnet50_fpn(weights="COCO_V1")

    # reuse the geometry of the pre-trained head for the replacement
    pretrained_head = detector.head.classification_head
    anchors_per_location = pretrained_head.num_anchors
    head_channels = pretrained_head.conv[0][0].in_channels

    detector.head.classification_head = RetinaNetClassificationHead(
        in_channels=head_channels,
        num_anchors=anchors_per_location,
        num_classes=num_classes,
    )

    return detector.to(device)

def load_model(src, num_classes, device):
    """Rebuild the detector with ``get_model`` and load weights saved at ``src``.

    Args:
        src: Path to a ``state_dict`` checkpoint written by ``torch.save``.
        num_classes: Forwarded to ``get_model``.
        device: Target device for both the checkpoint tensors and the model.

    Returns:
        The model on ``device`` with the checkpoint weights loaded.
    """
    model = get_model(num_classes, device)
    # map_location lets a checkpoint saved on another device (e.g. cuda:1)
    # load on hosts where that device does not exist
    state_dict = torch.load(src, map_location=device)
    model.load_state_dict(state_dict)
    model.to(device)
    return model

In [None]:
# GPU 1 is kept when the host has it; otherwise fall back to the only GPU or
# to the CPU instead of failing on a non-existent device.
if torch.cuda.is_available():
    device = torch.device('cuda:1' if torch.cuda.device_count() > 1 else 'cuda:0')
else:
    device = torch.device('cpu')
num_classes = 2

# get the model using our helper function
model = get_model(num_classes, device)

# construct an optimizer
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(
    params,
    lr=0.001,
    momentum=0.9,
    weight_decay=0.0005
)

# and a learning rate scheduler
lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=1000,
    gamma=0.1
)

summary(model, (1, 3, 512, 512), depth=3, col_names = ("num_params", "mult_adds"), verbose = 0)

## train

In [None]:
# Training budget and logging cadence for this run.
num_epochs = 500
print_freq = 5

# Best checkpoint goes to runs/retinalnet/500ep_aug/best.pth.
# NOTE(review): train_loader / val_loader are notebook globals; make sure they
# are still the augmented, shuffled loaders from the data-loader cell and have
# not been rebound by an evaluation cell.
train(
    model=model,
    optimizer=optimizer,
    device=device,
    num_epochs=num_epochs,
    train_loader=train_loader,
    val_loader=val_loader,
    project='retinalnet',
    name='500ep_aug',
    print_freq=print_freq
)

## evaluate

In [None]:
# load best model
# (the checkpoint path matches the project/name passed to train() above)
model = load_model('runs/retinalnet/500ep_aug/best.pth', num_classes, device)

In [None]:
from engine import evaluate

# Evaluation view of the training split (no augmentation, no shuffling).
# Bound to its own names so the augmented, shuffled `train_loader` from the
# data-loader cell is not overwritten before a later training cell uses it.
train_eval_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='train',
    image_type='png',
    transforms=get_transform(train=False)
)

train_eval_loader = torch.utils.data.DataLoader(
    train_eval_dataset,
    batch_size=16,
    num_workers=1,
    shuffle=False,
    collate_fn=utils.collate_fn
)

# len(DataLoader) is the number of batches, not the batch size
print(f'Training set: #image = {len(train_eval_dataset)}, #batches = {len(train_eval_loader)}')

# evaluate on the train dataset
result = evaluate(model, train_eval_loader, device=device, confident_threshold=0.5, area_threshold=100)

In [None]:
# Validation split with the evaluation pipeline (resize + normalize only).
val_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='val',
    image_type='png',
    transforms=get_transform(train=False)
)

val_loader = torch.utils.data.DataLoader(
    val_dataset,
    batch_size=16,
    num_workers=1,
    shuffle=False,
    collate_fn=utils.collate_fn
)

# len(DataLoader) is the number of batches, not the batch size
print(f'Validation set: #image = {len(val_dataset)}, #batches = {len(val_loader)}')

# evaluate on the valid dataset
result = evaluate(model, val_loader, device=device, confident_threshold=0.5, area_threshold=100)

In [None]:
# Test split with the evaluation pipeline (resize + normalize only).
test_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='test',
    image_type='png',
    transforms=get_transform(train=False)
)

test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=16,
    num_workers=1,
    shuffle=False,
    collate_fn=utils.collate_fn
)

# len(DataLoader) is the number of batches, not the batch size
print(f'Testing set: #image = {len(test_dataset)}, #batches = {len(test_loader)}')

# evaluate on the test dataset
result = evaluate(model, test_loader, device=device, confident_threshold=0.5, area_threshold=100)

## predict

In [None]:
from engine import post_processing

# Label-id -> name lookup; not referenced by the rest of this cell.
classId2name = {
    0: 'background',
    1: 'cabbage'
}

# Test split resized to 512x512 only (no A.Normalize in this pipeline).
test_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='test',
    image_type='png',
    transforms=A.Compose([A.Resize(height=512, width=512, p=1)], 
                     bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels'], min_visibility=0.0, min_area=100))
)

# Thresholds handed to engine.post_processing (its semantics live in engine).
confident_threshold = 0.5
area_threshold = 100

# One prediction figure / PNG per test image.
for i in range(len(test_dataset)):
    image, target = test_dataset[i]
    
    image, pred = predict(model, image, device)
    
    # post processing
    pred = post_processing([pred], confident_threshold, area_threshold)[0]
    
    # visualize a result
    visualize_box_result(i, image, pred, color='red', width=2, figsize=(8, 8), axis='off', save=True, dst='runs/retinalnet/500ep_aug/test_results')
    
    # visualize target and result
#     visualize_box_target_result(image, target, pred, color=('blue', 'red'), width=2, figsize=(12, 12), axis='off')   
#     visualize_box_both(image, target, pred, color=('blue', 'red'), width=2, figsize=(8, 8), axis='on')  

# SSD

In [None]:
import torchvision
from torchvision.models.detection import _utils
import torch
from torchvision.models.detection import ssd300_vgg16
from torchvision.models.detection.ssd import SSDClassificationHead

In [None]:
def get_model(device, num_classes=91, size=300):
    """Create a COCO-pretrained SSD300-VGG16 with a new classification head.

    Args:
        device: Device the model is moved to.
        num_classes: Number of classes for the replacement head.
        size: Square input size; used to probe the backbone's output channels
            and written to the model transform's ``min_size`` / ``max_size``.

    Returns:
        The model, already on ``device``.
    """
    detector = ssd300_vgg16(weights="COCO_V1")

    # per-feature-map channel counts and anchors-per-location drive the new head
    backbone_channels = _utils.retrieve_out_channels(detector.backbone, (size, size))
    anchors_per_location = detector.anchor_generator.num_anchors_per_location()
    detector.head.classification_head = SSDClassificationHead(
        in_channels=backbone_channels,
        num_anchors=anchors_per_location,
        num_classes=num_classes,
    )

    # make the built-in transform resize to the requested input size
    resize_cfg = detector.transform
    resize_cfg.min_size = (size,)
    resize_cfg.max_size = size

    return detector.to(device)

def load_model(src, num_classes, device, size):
    """Rebuild the detector with ``get_model`` and load weights saved at ``src``.

    Args:
        src: Path to a ``state_dict`` checkpoint written by ``torch.save``.
        num_classes: Forwarded to ``get_model``.
        device: Target device for both the checkpoint tensors and the model.
        size: Input size forwarded to ``get_model``.

    Returns:
        The model on ``device`` with the checkpoint weights loaded.
    """
    model = get_model(num_classes=num_classes, device=device, size=size)
    # map_location lets a checkpoint saved on another device (e.g. cuda:1)
    # load on hosts where that device does not exist
    state_dict = torch.load(src, map_location=device)
    model.load_state_dict(state_dict)
    model.to(device)
    return model

In [None]:
# GPU 1 is kept when the host has it; otherwise fall back to the only GPU or
# to the CPU instead of failing on a non-existent device.
if torch.cuda.is_available():
    device = torch.device('cuda:1' if torch.cuda.device_count() > 1 else 'cuda:0')
else:
    device = torch.device('cpu')
num_classes = 2

# get the model using our helper function
model = get_model(num_classes=num_classes, device=device, size=512)

# construct an optimizer
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(
    params,
    lr=0.001,
    momentum=0.9,
    weight_decay=0.0005
)

# and a learning rate scheduler
lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=1000,
    gamma=0.1
)

summary(model, (1, 3, 512, 512), depth=3, col_names = ("num_params", "mult_adds"), verbose = 0)

## train

In [None]:
# Training budget and logging cadence for this run.
num_epochs = 500
print_freq = 5

# Best checkpoint goes to runs/ssd/500ep_aug/best.pth.
# NOTE(review): train_loader / val_loader are notebook globals; make sure they
# are still the augmented, shuffled loaders from the data-loader cell and have
# not been rebound by an evaluation cell.
train(
    model=model,
    optimizer=optimizer,
    device=device,
    num_epochs=num_epochs,
    train_loader=train_loader,
    val_loader=val_loader,
    project='ssd',
    name='500ep_aug',
    print_freq=print_freq
)

## evaluate

In [None]:
# load best model
# (same input size, 512, as the model that was trained above)
model = load_model('runs/ssd/500ep_aug/best.pth', num_classes, device, 512)

In [None]:
from engine import evaluate

# Evaluation view of the training split (no augmentation, no shuffling).
# Bound to its own names so the augmented, shuffled `train_loader` from the
# data-loader cell is not overwritten before a later training cell uses it.
train_eval_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='train',
    image_type='png',
    transforms=get_transform(train=False)
)

train_eval_loader = torch.utils.data.DataLoader(
    train_eval_dataset,
    batch_size=16,
    num_workers=1,
    shuffle=False,
    collate_fn=utils.collate_fn
)

# len(DataLoader) is the number of batches, not the batch size
print(f'Training set: #image = {len(train_eval_dataset)}, #batches = {len(train_eval_loader)}')

# evaluate on the train dataset
result = evaluate(model, train_eval_loader, device=device, confident_threshold=0.5, area_threshold=100)

In [None]:
# Validation split with the evaluation pipeline (resize + normalize only).
val_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='val',
    image_type='png',
    transforms=get_transform(train=False)
)

val_loader = torch.utils.data.DataLoader(
    val_dataset,
    batch_size=16,
    num_workers=1,
    shuffle=False,
    collate_fn=utils.collate_fn
)

# len(DataLoader) is the number of batches, not the batch size
print(f'Validation set: #image = {len(val_dataset)}, #batches = {len(val_loader)}')

# evaluate on the valid dataset
result = evaluate(model, val_loader, device=device, confident_threshold=0.5, area_threshold=100)

In [None]:
# Test split with the evaluation pipeline (resize + normalize only).
test_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='test',
    image_type='png',
    transforms=get_transform(train=False)
)

test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=16,
    num_workers=1,
    shuffle=False,
    collate_fn=utils.collate_fn
)

# len(DataLoader) is the number of batches, not the batch size
print(f'Testing set: #image = {len(test_dataset)}, #batches = {len(test_loader)}')

# evaluate on the test dataset
result = evaluate(model, test_loader, device=device, confident_threshold=0.5, area_threshold=100)

## predict

In [None]:
from engine import post_processing

# Label-id -> name lookup; not referenced by the rest of this cell.
classId2name = {
    0: 'background',
    1: 'cabbage'
}

# Test split resized to 512x512 only (no A.Normalize in this pipeline).
test_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='test',
    image_type='png',
    transforms=A.Compose([A.Resize(height=512, width=512, p=1)], 
                     bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels'], min_visibility=0.0, min_area=100))
)

# Thresholds handed to engine.post_processing (its semantics live in engine).
confident_threshold = 0.5
area_threshold = 100

# One prediction figure / PNG per test image.
for i in range(len(test_dataset)):
    image, target = test_dataset[i]
    
    image, pred = predict(model, image, device)
    
    # post processing
    pred = post_processing([pred], confident_threshold, area_threshold)[0]
    
    # visualize a result
    visualize_box_result(i, image, pred, color='red', width=2, figsize=(8, 8), axis='off', save=True, dst='runs/ssd/500ep_aug/test_results')
    
    # visualize target and result
#     visualize_box_target_result(image, target, pred, color=('blue', 'red'), width=2, figsize=(12, 12), axis='off')   
#     visualize_box_both(image, target, pred, color=('blue', 'red'), width=2, figsize=(8, 8), axis='on')  

# SSDlite

In [None]:
import torchvision
import torch
from functools import partial
from torch import nn
from torchvision.models.detection import _utils
from torchvision.models.detection import ssdlite320_mobilenet_v3_large
from torchvision.models.detection.ssdlite import SSDLiteClassificationHead

In [None]:
def get_model(device, num_classes=91, size=320):
    """Create a COCO-pretrained SSDlite320-MobileNetV3-Large with a new classification head.

    Args:
        device: Device the model is moved to.
        num_classes: Number of classes for the replacement head.
        size: Square input size; used to probe the backbone's output channels
            and written to the model transform's ``min_size`` / ``max_size``.

    Returns:
        The model, already on ``device``.
    """
    detector = ssdlite320_mobilenet_v3_large(weights="COCO_V1")

    # per-feature-map channel counts and anchors-per-location drive the new head
    backbone_channels = _utils.retrieve_out_channels(detector.backbone, (size, size))
    anchors_per_location = detector.anchor_generator.num_anchors_per_location()
    batch_norm = partial(nn.BatchNorm2d, eps=0.001, momentum=0.03)
    detector.head.classification_head = SSDLiteClassificationHead(
        in_channels=backbone_channels,
        num_anchors=anchors_per_location,
        num_classes=num_classes,
        norm_layer=batch_norm,
    )

    # make the built-in transform resize to the requested input size
    resize_cfg = detector.transform
    resize_cfg.min_size = (size,)
    resize_cfg.max_size = size

    return detector.to(device)

def load_model(src, num_classes, device, size):
    """Rebuild the detector with ``get_model`` and load weights saved at ``src``.

    Args:
        src: Path to a ``state_dict`` checkpoint written by ``torch.save``.
        num_classes: Forwarded to ``get_model``.
        device: Target device for both the checkpoint tensors and the model.
        size: Input size forwarded to ``get_model``.

    Returns:
        The model on ``device`` with the checkpoint weights loaded.
    """
    model = get_model(num_classes=num_classes, device=device, size=size)
    # map_location lets a checkpoint saved on another device (e.g. cuda:1)
    # load on hosts where that device does not exist
    state_dict = torch.load(src, map_location=device)
    model.load_state_dict(state_dict)
    model.to(device)
    return model

In [None]:
# GPU 1 is kept when the host has it; otherwise fall back to the only GPU or
# to the CPU instead of failing on a non-existent device.
if torch.cuda.is_available():
    device = torch.device('cuda:1' if torch.cuda.device_count() > 1 else 'cuda:0')
else:
    device = torch.device('cpu')
num_classes = 2

# get the model using our helper function
model = get_model(num_classes=num_classes, device=device, size=512)

# construct an optimizer
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(
    params,
    lr=0.001,
    momentum=0.9,
    weight_decay=0.0005
)

# and a learning rate scheduler
lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=1000,
    gamma=0.1
)

summary(model, (1, 3, 512, 512), depth=3, col_names = ("num_params", "mult_adds"), verbose = 0)

## train

In [None]:
# Training budget and logging cadence for this run.
num_epochs = 500
print_freq = 5

# Best checkpoint goes to runs/ssdlite/500ep_aug/best.pth.
# NOTE(review): train_loader / val_loader are notebook globals; make sure they
# are still the augmented, shuffled loaders from the data-loader cell and have
# not been rebound by an evaluation cell.
train(
    model=model,
    optimizer=optimizer,
    device=device,
    num_epochs=num_epochs,
    train_loader=train_loader,
    val_loader=val_loader,
    project='ssdlite',
    name='500ep_aug',
    print_freq=print_freq
)

## evaluate

In [None]:
# load best model
# (same input size, 512, as the model that was trained above)
model = load_model('runs/ssdlite/500ep_aug/best.pth', num_classes, device, 512)

In [None]:
from engine import evaluate

# Evaluation view of the training split (no augmentation, no shuffling).
# Bound to its own names so the augmented, shuffled `train_loader` from the
# data-loader cell is not overwritten before a later training cell uses it.
train_eval_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='train',
    image_type='png',
    transforms=get_transform(train=False)
)

train_eval_loader = torch.utils.data.DataLoader(
    train_eval_dataset,
    batch_size=16,
    num_workers=1,
    shuffle=False,
    collate_fn=utils.collate_fn
)

# len(DataLoader) is the number of batches, not the batch size
print(f'Training set: #image = {len(train_eval_dataset)}, #batches = {len(train_eval_loader)}')

# evaluate on the train dataset
result = evaluate(model, train_eval_loader, device=device, confident_threshold=0.5, area_threshold=100)

In [None]:
# Validation split with the evaluation pipeline (resize + normalize only).
val_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='val',
    image_type='png',
    transforms=get_transform(train=False)
)

val_loader = torch.utils.data.DataLoader(
    val_dataset,
    batch_size=16,
    num_workers=1,
    shuffle=False,
    collate_fn=utils.collate_fn
)

# len(DataLoader) is the number of batches, not the batch size
print(f'Validation set: #image = {len(val_dataset)}, #batches = {len(val_loader)}')

# evaluate on the valid dataset
result = evaluate(model, val_loader, device=device, confident_threshold=0.5, area_threshold=100)

In [None]:
# Test split with the evaluation pipeline (resize + normalize only).
test_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='test',
    image_type='png',
    transforms=get_transform(train=False)
)

test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=16,
    num_workers=1,
    shuffle=False,
    collate_fn=utils.collate_fn
)

# len(DataLoader) is the number of batches, not the batch size
print(f'Testing set: #image = {len(test_dataset)}, #batches = {len(test_loader)}')

# evaluate on the test dataset
result = evaluate(model, test_loader, device=device, confident_threshold=0.5, area_threshold=100)

## predict

In [None]:
from engine import post_processing

# Label-id -> name lookup; not referenced by the rest of this cell.
classId2name = {
    0: 'background',
    1: 'cabbage'
}

# Test split resized to 512x512 only (no A.Normalize in this pipeline).
test_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='test',
    image_type='png',
    transforms=A.Compose([A.Resize(height=512, width=512, p=1)], 
                     bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels'], min_visibility=0.0, min_area=100))
)

# Thresholds handed to engine.post_processing (its semantics live in engine).
confident_threshold = 0.5
area_threshold = 100

# One prediction figure / PNG per test image.
for i in range(len(test_dataset)):
    image, target = test_dataset[i]
    
    image, pred = predict(model, image, device)
    
    # post processing
    pred = post_processing([pred], confident_threshold, area_threshold)[0]
    
    # visualize a result
    visualize_box_result(i, image, pred, color='red', width=2, figsize=(8, 8), axis='off', save=True, dst='runs/ssdlite/500ep_aug/test_results')
    
    # visualize target and result
#     visualize_box_target_result(image, target, pred, color=('blue', 'red'), width=2, figsize=(12, 12), axis='off')   
#     visualize_box_both(image, target, pred, color=('blue', 'red'), width=2, figsize=(8, 8), axis='on')  

# FCOS

In [None]:
import torchvision
import torch
from torchvision.models.detection import fcos_resnet50_fpn
from torchvision.models.detection.fcos  import FCOSClassificationHead

In [None]:
def get_model(num_classes, device):
    """Create a COCO-pretrained FCOS (ResNet-50 FPN) with a new classification head.

    Args:
        num_classes: Number of classes for the replacement head.
        device: Device the model is moved to.

    Returns:
        The model, already on ``device``.
    """
    detector = fcos_resnet50_fpn(weights="COCO_V1")

    # reuse the geometry of the pre-trained head for the replacement
    pretrained_head = detector.head.classification_head
    anchors_per_location = pretrained_head.num_anchors
    head_channels = pretrained_head.conv[0].in_channels

    detector.head.classification_head = FCOSClassificationHead(
        in_channels=head_channels,
        num_anchors=anchors_per_location,
        num_classes=num_classes,
    )

    return detector.to(device)

def load_model(src, num_classes, device):
    """Rebuild the detector with ``get_model`` and load weights saved at ``src``.

    Args:
        src: Path to a ``state_dict`` checkpoint written by ``torch.save``.
        num_classes: Forwarded to ``get_model``.
        device: Target device for both the checkpoint tensors and the model.

    Returns:
        The model on ``device`` with the checkpoint weights loaded.
    """
    model = get_model(num_classes, device)
    # map_location lets a checkpoint saved on another device (e.g. cuda:1)
    # load on hosts where that device does not exist
    state_dict = torch.load(src, map_location=device)
    model.load_state_dict(state_dict)
    model.to(device)
    return model

In [None]:
# GPU 1 is kept when the host has it; otherwise fall back to the only GPU or
# to the CPU instead of failing on a non-existent device.
if torch.cuda.is_available():
    device = torch.device('cuda:1' if torch.cuda.device_count() > 1 else 'cuda:0')
else:
    device = torch.device('cpu')
num_classes = 2

# get the model using our helper function
model = get_model(num_classes, device)

# construct an optimizer
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(
    params,
    lr=0.001,
    momentum=0.9,
    weight_decay=0.0005
)

# and a learning rate scheduler
lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=1000,
    gamma=0.1
)

summary(model, (1, 3, 512, 512), depth=3, col_names = ("num_params", "mult_adds"), verbose = 0)

## train

In [None]:
# Training budget and logging cadence for this run.
num_epochs = 500
print_freq = 5

# Best checkpoint goes to runs/fcos/500ep_aug/best.pth.
# NOTE(review): train_loader / val_loader are notebook globals; make sure they
# are still the augmented, shuffled loaders from the data-loader cell and have
# not been rebound by an evaluation cell.
train(
    model=model,
    optimizer=optimizer,
    device=device,
    num_epochs=num_epochs,
    train_loader=train_loader,
    val_loader=val_loader,
    project='fcos',
    name='500ep_aug',
    print_freq=print_freq
)

## evaluate

In [None]:
# load best model
# (the checkpoint path matches the project/name passed to train() above)
model = load_model('runs/fcos/500ep_aug/best.pth', num_classes, device)

In [None]:
from engine import evaluate

# Evaluation view of the training split (no augmentation, no shuffling).
# Bound to its own names so the augmented, shuffled `train_loader` from the
# data-loader cell is not overwritten before a later training cell uses it.
train_eval_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='train',
    image_type='png',
    transforms=get_transform(train=False)
)

train_eval_loader = torch.utils.data.DataLoader(
    train_eval_dataset,
    batch_size=16,
    num_workers=1,
    shuffle=False,
    collate_fn=utils.collate_fn
)

# len(DataLoader) is the number of batches, not the batch size
print(f'Training set: #image = {len(train_eval_dataset)}, #batches = {len(train_eval_loader)}')

# evaluate on the train dataset
result = evaluate(model, train_eval_loader, device=device, confident_threshold=0.5, area_threshold=100)

In [None]:
# Validation split with the evaluation pipeline (resize + normalize only).
val_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='val',
    image_type='png',
    transforms=get_transform(train=False)
)

val_loader = torch.utils.data.DataLoader(
    val_dataset,
    batch_size=16,
    num_workers=1,
    shuffle=False,
    collate_fn=utils.collate_fn
)

# len(DataLoader) is the number of batches, not the batch size
print(f'Validation set: #image = {len(val_dataset)}, #batches = {len(val_loader)}')

# evaluate on the valid dataset
result = evaluate(model, val_loader, device=device, confident_threshold=0.5, area_threshold=100)

In [None]:
# Test split with the evaluation pipeline (resize + normalize only).
test_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='test',
    image_type='png',
    transforms=get_transform(train=False)
)

test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=16,
    num_workers=1,
    shuffle=False,
    collate_fn=utils.collate_fn
)

# len(DataLoader) is the number of batches, not the batch size
print(f'Testing set: #image = {len(test_dataset)}, #batches = {len(test_loader)}')

# evaluate on the test dataset
result = evaluate(model, test_loader, device=device, confident_threshold=0.5, area_threshold=100)

## predict

In [None]:
from engine import post_processing

# Label-id -> name lookup; not referenced by the rest of this cell.
classId2name = {
    0: 'background',
    1: 'cabbage'
}

# Test split resized to 512x512 only (no A.Normalize in this pipeline).
test_dataset = CustomDataset(
    root='data/fold1_pascal',
    split='test',
    image_type='png',
    transforms=A.Compose([A.Resize(height=512, width=512, p=1)], 
                     bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels'], min_visibility=0.0, min_area=100))
)

# Thresholds handed to engine.post_processing (its semantics live in engine).
confident_threshold = 0.5
area_threshold = 100

# One prediction figure / PNG per test image.
for i in range(len(test_dataset)):
    image, target = test_dataset[i]
    
    image, pred = predict(model, image, device)
    
    # post processing
    pred = post_processing([pred], confident_threshold, area_threshold)[0]
    
    # visualize a result
    visualize_box_result(i, image, pred, color='red', width=2, figsize=(8, 8), axis='off', save=True, dst='runs/fcos/500ep_aug/test_results')
    
    # visualize target and result
#     visualize_box_target_result(image, target, pred, color=('blue', 'red'), width=2, figsize=(12, 12), axis='off')   
#     visualize_box_both(image, target, pred, color=('blue', 'red'), width=2, figsize=(8, 8), axis='on')  