This notebook trains a model on each sub-dataset separately.

In [None]:
import torch
import torchvision
import random
import h5py
import cv2 as cv
import numpy as np
import matplotlib.pyplot as plt
import torch.optim as optim
from pathlib import Path
from tqdm import tqdm
from PIL import Image
from torchvision.utils import draw_bounding_boxes
from torchvision.transforms.functional import to_pil_image

In [None]:
# Root directory of the local CVPPP2017 dataset checkout.
path = Path('/home/tam/', 'git', 'cvppp2017_dataset')

In [None]:
# Open the image and ground-truth archives. The mode is stated explicitly so
# the files are opened read-only regardless of the default mode of the
# installed h5py version; this notebook only ever reads from them.
training = h5py.File(path / 'CVPPP2017_training_images.h5', 'r')
truth = h5py.File(path / 'CVPPP2017_training_truth.h5', 'r')

In [None]:
# Use the first CUDA GPU when one is available and fall back to the CPU
# otherwise, so that every later `.to(device)` call also works on machines
# without a GPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [None]:
# Name of the sub-dataset to work on. It is used as the top-level key into
# both the `training` and `truth` HDF5 files.
dataset = 'A1' # select the sub data set A1, A2, A3 or A4

The section below extracts the bounding boxes for the training dataset.

In [None]:
# Build one PIL image and one detection target (boxes + labels) per plant.
targets = []
images = []
for plant_id in tqdm(training[dataset].keys()):
    # Read the label map from HDF5 once per plant instead of once per leaf.
    label = truth[dataset][plant_id]['label'][()]

    boxes = []
    # Visit only the leaf ids that actually occur in the label map, so a gap
    # in the numbering cannot produce an empty mask. Ids below 1 are skipped,
    # matching the original 1..max range.
    for leaf_id in np.unique(label):
        if leaf_id < 1:
            continue
        mask = (label == leaf_id).astype(np.uint8)
        # Bounding rectangle of *all* non-zero mask pixels, so a leaf whose
        # mask is split into several pieces is still covered by one box.
        x, y, w, h = cv.boundingRect(mask)
        boxes.append([x, y, x+w, y+h])

    # Keep the first three channels of the stored image array.
    image = Image.fromarray(training[dataset][plant_id]['rgb'][:, :, :3])
    images.append(image)

    targets.append({
        # reshape keeps the (N, 4) layout even when no leaf was found.
        'boxes': torch.tensor(boxes, dtype=torch.float).reshape(-1, 4).to(device),
        # Every box gets the same class id, 1.
        'labels': torch.ones(len(boxes), dtype=torch.int64).to(device)
    })


In [None]:
# Default weights entry for FCOS with a ResNet-50 FPN backbone, and the
# transform object it provides; that transform is applied to every image
# when a sample is fetched from the Dataset class.
weights = (
    torchvision.models.detection
    .FCOS_ResNet50_FPN_Weights
    .DEFAULT
)
transform = weights.transforms()

In [None]:
class Dataset(torch.utils.data.Dataset):
    """Map-style dataset pairing each image with its detection target.

    The transform is applied to the image every time a sample is fetched;
    the target is returned exactly as stored.
    """

    def __init__(self, images, targets, transform):
        """Store the image list, the target list and the image transform."""
        self.images, self.targets, self.transform = images, targets, transform

    def __len__(self):
        """Return the number of samples, taken from the target list."""
        return len(self.targets)

    def __getitem__(self, index):
        """Return ``(transform(image), target)`` for the sample at ``index``."""
        image, target = self.images[index], self.targets[index]
        return self.transform(image), target

In [None]:
# Wrap the extracted images and targets, and build a single-sample shuffling
# loader on top of them.
training_set = Dataset(images=images, targets=targets, transform=transform)
train_loader = torch.utils.data.DataLoader(
    dataset=training_set,
    batch_size=1,
    shuffle=True,
)

The section below draws the ground-truth boxes for reference.

In [None]:
# Save a copy of every image with its ground-truth boxes drawn in red.
result_dir = path / 'result'
# Saving does not create missing directories, so make sure the target exists.
result_dir.mkdir(parents=True, exist_ok=True)
# Built once and reused for every image instead of once per iteration.
pil_to_tensor = torchvision.transforms.PILToTensor()
# `images` and `targets` were filled in the same key order, so they can be
# walked in lockstep with the plant ids.
for plant_id, image, target in zip(training[dataset].keys(), images, targets):
    box = draw_bounding_boxes(
        pil_to_tensor(image),
        target['boxes'],
        colors='red')
    to_pil_image(box).save(result_dir / f'{dataset}-{plant_id}-groundtruth.png')
    

The standard data loader does not work here because FCOS expects lists of images and targets as input,
so batches are built manually by slicing a shuffled list of sample indices.
The training set is then fed to the model to re-train it.
At the end, boxes are drawn based on the predictions.

In [None]:
# Train one FCOS model per batch size, track the training and validation loss
# for every epoch, and finally save the predicted boxes for every image.
max_epoch = 50
training_ratio = 0.7

# Random train/validation split over the sample indices of `training_set`.
order = [*range(len(training_set))]
random.shuffle(order)
n_training_data = int(len(training_set)*training_ratio)
training_order = order[:n_training_data]
validate_order = order[n_training_data:]

# Prediction images are written here; saving does not create missing
# directories, so make sure the target exists up front.
result_dir = path / 'result'
result_dir.mkdir(parents=True, exist_ok=True)
# Built once and reused for every drawn image.
pil_to_tensor = torchvision.transforms.PILToTensor()

# Loss curves of every run, keyed by batch size. `training_losses` and
# `evaluation_losses` are rebound for each batch size, so without this only
# the curves of the last run would survive the loop.
loss_history = {}

for batch_size in [1, 5, 10]:
    torch.cuda.empty_cache()
    model = torchvision.models.detection.fcos_resnet50_fpn(
        num_classes=2, trainable_backbone_layers=1
    ).to(device)
    # Optimise only the parameters that are not frozen.
    params_to_update = [param for param in model.parameters() if param.requires_grad]
    optimizer = optim.SGD(params_to_update, lr=1e-3, momentum=0.9)
    training_losses = []
    evaluation_losses = []
    loss_history[batch_size] = (training_losses, evaluation_losses)

    # The model stays in train mode for the whole epoch loop, validation
    # included: torchvision detection models return the loss dict only in
    # train mode, and return predictions in eval mode.
    model.train()
    for epoch in tqdm(range(max_epoch)):
        total_loss = 0.0
        batch_order = training_order.copy()
        random.shuffle(batch_order)
        # Walk the shuffled indices in consecutive slices of `batch_size`;
        # the last slice may be shorter.
        for start in range(0, len(batch_order), batch_size):
            current_batch = batch_order[start:start + batch_size]
            image_batch = []
            target_batch = []
            for i in current_batch:
                image, target = training_set[i]
                image_batch.append(image.to(device))
                target_batch.append(target)

            optimizer.zero_grad()
            output = model(image_batch, target_batch)
            # Optimise every loss term the model returns, not just the box
            # regression term; otherwise the remaining outputs of the
            # detection head are never trained.
            loss = sum(output.values())
            loss.backward()
            optimizer.step()
            total_loss += float(loss.item())*len(current_batch)

        # Store the mean loss per sample so that the training and validation
        # curves are comparable although the two splits differ in size.
        training_losses.append(total_loss / max(1, len(training_order)))

        total_loss = 0.0
        with torch.no_grad():
            for i in validate_order:
                image, target = training_set[i]
                output = model([image.to(device)], [target])
                total_loss += float(sum(output.values()).item())
        evaluation_losses.append(total_loss / max(1, len(validate_order)))

    # Switch to eval mode so that the model returns predictions.
    model.eval()
    with torch.no_grad():
        for image_id, plant_id in enumerate(training[dataset].keys()):
            image, _ = training_set[image_id]
            prediction = model([image.to(device)])
            # Draw on the untransformed PIL image.
            box = draw_bounding_boxes(
                pil_to_tensor(images[image_id]),
                prediction[0]['boxes'],
                colors='red')
            to_pil_image(box).save(
                result_dir /
                f'{dataset}-{plant_id}-batch-size={batch_size}-{max_epoch}-new.png')

In [None]:
# Plot the per-epoch loss curves held in `training_losses` and
# `evaluation_losses` (those of the last batch size that was trained).
fig, ax = plt.subplots(dpi=150)
ax.plot(np.log10(training_losses), label='Training')
ax.plot(np.log10(evaluation_losses), label='Validation')
# Label the axes so the log10 scaling of the y values is visible to the reader.
ax.set_xlabel('Epoch')
ax.set_ylabel('log10(loss)')
# The trailing semicolon suppresses the notebook's echo of the return value.
ax.legend(loc='best');
