# Import Libraries and Define Directories

In [None]:
# Import Library
import os
import re
import time
import numpy as np
import pandas as pd
import cv2
import matplotlib.pyplot as plt
import torch
import torchvision
from torch.utils.data import DataLoader, Dataset
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import albumentations as A
from albumentations.pytorch import ToTensorV2

# Dataset locations and the compute device used throughout the notebook
DIR_INPUT = ''
DIR_TRAIN = '/kaggle/input/leaf-detection/train'
DIR_TEST = '/kaggle/input/leaf-detection/test'
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Read and Parse the CSV file

In [None]:
# Load the annotation table and append placeholder box columns.
train_df = pd.read_csv(os.path.join(DIR_INPUT, "/kaggle/input/leaf-detection/train.csv"))
# -1 marks "not parsed yet"; the real values are derived from the `bbox`
# column further down.
train_df = train_df.assign(x=-1, y=-1, w=-1, h=-1)

def expand_bbox(x):
    """Extract the numeric fields of a textual bounding box.

    Args:
        x: Cell value of the ``bbox`` column, normally a string such as
            ``"[834.0, 222.0, 56.0, 36.0]"``. Missing cells (``NaN`` /
            ``None``) are accepted as well.

    Returns:
        A NumPy array holding every unsigned number found in ``x`` as a
        string, in order of appearance, or the sentinel
        ``[-1, -1, -1, -1]`` when ``x`` is not a string or contains no
        number.
    """
    no_box = [-1, -1, -1, -1]
    # pandas represents an empty cell as float NaN; re.findall would raise
    # TypeError on it, so treat every non-string as "no box".
    if not isinstance(x, str):
        return no_box

    r = np.array(re.findall(r"([0-9]+[.]?[0-9]*)", x))
    if len(r) == 0:
        return no_box
    return r

# Replace the textual `bbox` column with numeric x / y / w / h columns.
train_df[['x', 'y', 'w', 'h']] = np.stack(train_df['bbox'].apply(expand_bbox))
train_df = train_df.drop(columns=['bbox'])
train_df = train_df.astype({'x': float, 'y': float, 'w': float, 'h': float})

# Hold out the last four and the first four images for validation; every
# image in between is used for training.
image_ids = train_df['image_id'].unique()
valid_ids = np.concatenate([image_ids[-4:], image_ids[:4]])
train_ids = image_ids[4:-4]

is_valid_row = train_df['image_id'].isin(valid_ids)
is_train_row = train_df['image_id'].isin(train_ids)
valid_df = train_df[is_valid_row]
train_df = train_df[is_train_row]

valid_df.shape, train_df.shape

# Define the Dataset Class

In [None]:
class LeafDataset(Dataset):
    """Detection dataset yielding ``(image, target, image_id)`` per image.

    Rows of ``dataframe`` are grouped by ``image_id``. Each row supplies one
    box through its ``x``, ``y``, ``w`` and ``h`` columns, which is converted
    to ``[x_min, y_min, x_max, y_max]``.

    Args:
        dataframe: Table with ``image_id``, ``x``, ``y``, ``w`` and ``h``
            columns.
        image_dir: Directory the images are read from; the file name is the
            ``image_id`` value itself.
        transforms: Optional callable invoked as
            ``transforms(image=..., bboxes=..., labels=...)`` and returning a
            mapping with ``image`` and ``bboxes`` entries.
    """

    def __init__(self, dataframe, image_dir, transforms=None):
        super().__init__()

        self.image_ids = dataframe['image_id'].unique()
        self.df = dataframe
        self.image_dir = image_dir
        self.transforms = transforms

    @staticmethod
    def _to_box_tensor(bboxes):
        """Convert a sequence of boxes into a float32 tensor.

        An empty sequence becomes a ``(0, 4)`` tensor instead of raising, and
        float64 inputs are down-cast so the result is always float32.
        """
        box_array = np.asarray(bboxes, dtype=np.float32)
        if box_array.size == 0:
            box_array = box_array.reshape(0, 4)
        return torch.as_tensor(box_array, dtype=torch.float32)

    def __getitem__(self, index: int):
        """Load one image and build its detection target.

        Returns:
            ``(image, target, image_id)`` where ``target`` is a dict with the
            keys ``boxes``, ``labels``, ``image_id``, ``area`` and
            ``iscrowd``.

        Raises:
            FileNotFoundError: If the image file cannot be read.
        """
        image_id = self.image_ids[index]
        records = self.df[self.df['image_id'] == image_id]

        image_path = f'{self.image_dir}/{image_id}'
        image = cv2.imread(image_path, cv2.IMREAD_COLOR)
        # cv2.imread signals failure by returning None rather than raising.
        if image is None:
            raise FileNotFoundError(f'Could not read image: {image_path}')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB).astype(np.float32)  # Use RGB color space
        image /= 255.0  # scale pixel values to [0, 1]

        # Convert (x, y, w, h) into (x_min, y_min, x_max, y_max).
        boxes = records[['x', 'y', 'w', 'h']].values.astype(np.float32)
        boxes[:, 2] = boxes[:, 0] + boxes[:, 2]
        boxes[:, 3] = boxes[:, 1] + boxes[:, 3]

        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        area = torch.as_tensor(area, dtype=torch.float32)

        # there is only one class
        labels = torch.ones((records.shape[0],), dtype=torch.int64)

        # suppose all instances are not crowd
        iscrowd = torch.zeros((records.shape[0],), dtype=torch.int64)

        target = {}
        target['boxes'] = torch.as_tensor(boxes, dtype=torch.float32)
        target['labels'] = labels
        target['image_id'] = torch.tensor([index])
        target['area'] = area
        target['iscrowd'] = iscrowd

        if self.transforms:
            sample = {
                'image': image,
                # Hand the transform the NumPy array rather than the tensor.
                'bboxes': boxes,
                'labels': labels
            }
            sample = self.transforms(**sample)
            image = sample['image']

            target['boxes'] = self._to_box_tensor(sample['bboxes'])

        return image, target, image_id

    def __len__(self) -> int:
        """Return the number of distinct images."""
        return self.image_ids.shape[0]

# Define Transformations and DataLoader

In [None]:
# The Albumentations pipeline contains no augmentation yet.
def transform():
    """Build the pipeline applied to every sample.

    The only step converts the image to a tensor. Boxes are declared in
    ``pascal_voc`` format with their class ids carried in the ``labels``
    field.
    """
    steps = [ToTensorV2(p=1.0)]
    box_settings = {'format': 'pascal_voc', 'label_fields': ['labels']}
    return A.Compose(steps, bbox_params=box_settings)

def collate_fn(batch):
    """Transpose a batch of per-sample tuples into a tuple of per-field tuples.

    Keeps samples as separate entries instead of stacking them into one
    tensor.
    """
    per_field = zip(*batch)
    return tuple(per_field)

# Both splits read their images from DIR_TRAIN; the validation images are a
# held-out subset of the training folder.
train_dataset = LeafDataset(train_df, DIR_TRAIN, transform())
valid_dataset = LeafDataset(valid_df, DIR_TRAIN, transform())

# Shuffle the training samples so SGD does not see the images in the same
# fixed order every epoch.
train_data_loader = DataLoader(
    train_dataset,
    batch_size=16,
    shuffle=True,
    num_workers=4,
    collate_fn=collate_fn
)

# Validation keeps a fixed order so results are comparable between epochs.
valid_data_loader = DataLoader(
    valid_dataset,
    batch_size=1,
    shuffle=False,
    num_workers=4,
    collate_fn=collate_fn
)

# Define the Averager Class

In [None]:
class Averager:
    """Running mean of the values passed to :meth:`send`."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Discard everything accumulated so far."""
        self.current_total = 0.0
        self.iterations = 0.0

    def send(self, value):
        """Add one observation to the running total."""
        self.current_total += value
        self.iterations += 1

    @property
    def value(self):
        """Mean of the observations, or ``0`` when none have been sent."""
        if self.iterations != 0:
            return 1.0 * self.current_total / self.iterations
        return 0

# Define Helper Functions

In [None]:
# HELPER FUNCTIONS FOR VISUALISING / PREDICTING

def get_boxes(tensor, index, score=0.5):
    """Return the boxes of one prediction whose score exceeds a threshold.

    Args:
        tensor: Sequence of prediction dicts, each with ``boxes`` and
            ``scores`` entries.
        index: Position of the prediction to read.
        score: Boxes are kept only when their score is strictly greater than
            this value.

    Returns:
        A list of int32 NumPy arrays, one per kept box. The list is empty
        when ``index`` is out of range or no box passes the threshold.
    """
    # Return an empty list (not 0) so callers can always iterate the result.
    if index >= len(tensor) or index < 0:
        return []

    prediction = tensor[index]
    return [
        box.cpu().detach().numpy().astype(np.int32)
        for box, box_score in zip(prediction['boxes'], prediction['scores'])
        if box_score > score
    ]

def get_sample_image(itr):
    """Plot the first image of the next batch with its ground-truth boxes.

    Args:
        itr: Iterator over a loader that yields
            ``(images, targets, image_ids)`` batches.

    Raises:
        StopIteration: If ``itr`` is exhausted.
    """
    images, targets, _ = next(itr)

    boxes = targets[0]['boxes'].cpu().numpy().astype(np.int32)
    # permute() only returns a strided view; OpenCV needs a contiguous
    # buffer to draw into.
    sample = np.ascontiguousarray(images[0].permute(1, 2, 0).cpu().numpy())

    _, ax = plt.subplots(1, 1, figsize=(16, 8))

    for box in boxes:
        # LeafDataset scales pixels to [0, 1], so full green is (0, 1, 0).
        cv2.rectangle(sample,
                      (int(box[0]), int(box[1])),
                      (int(box[2]), int(box[3])),
                      (0, 1, 0), 2)

    ax.set_axis_off()
    ax.imshow(sample)

def get_validation_image(itr):
    """Plot the first image of the next batch with the model's predictions.

    Runs the module-level ``model`` in evaluation mode and draws every
    predicted box that passes the default score threshold of ``get_boxes``.

    Args:
        itr: Iterator over a loader that yields
            ``(images, targets, image_ids)`` batches.

    Raises:
        StopIteration: If ``itr`` is exhausted.
    """
    images, _, _ = next(itr)
    images = [img.to(device) for img in images]

    # permute() only returns a strided view; OpenCV needs a contiguous
    # buffer to draw into.
    sample = np.ascontiguousarray(images[0].permute(1, 2, 0).cpu().numpy())

    model.eval()
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        outputs = model(images)
    boxes = get_boxes(outputs, 0)

    _, ax = plt.subplots(1, 1, figsize=(16, 8))

    for box in boxes:
        # LeafDataset scales pixels to [0, 1], so full green is (0, 1, 0).
        cv2.rectangle(sample,
                      (int(box[0]), int(box[1])),
                      (int(box[2]), int(box[3])),
                      (0, 1, 0), 2)

    ax.set_axis_off()
    ax.imshow(sample)

# Define Test Dataset Loader

In [None]:
def load_test_dataset():
    """Create a loader over the images stored under ``DIR_TEST``.

    Images are read through ``ImageFolder``, converted to tensors and served
    one at a time in a fixed order.
    """
    to_tensor = torchvision.transforms.Compose([
        torchvision.transforms.ToTensor(),
    ])
    folder = torchvision.datasets.ImageFolder(root=DIR_TEST, transform=to_tensor)

    return torch.utils.data.DataLoader(
        folder,
        batch_size=1,
        num_workers=1,
        shuffle=False
    )

def get_test_image(itr, score=0.5):
    """Plot the next test image with the model's predicted boxes.

    Runs the module-level ``model`` in evaluation mode on the next batch of
    ``itr`` and draws every box scoring above ``score``.

    Args:
        itr: Iterator over a loader that yields ``(image_batch, labels)``
            pairs.
        score: Score threshold forwarded to ``get_boxes``.

    Raises:
        StopIteration: If ``itr`` is exhausted.
    """
    image, _ = next(itr)

    model.eval()
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        outputs = model(image.to(device))
    boxes = get_boxes(outputs, 0, score)

    # permute() only returns a strided view; OpenCV needs a contiguous
    # buffer to draw into.
    img = np.ascontiguousarray(image[0].permute(1, 2, 0).cpu().numpy())

    _, ax = plt.subplots(1, 1, figsize=(16, 8))
    for box in boxes:
        # Draw on `img` itself: drawing on a converted copy would discard
        # the rectangles before the image is shown.
        # ToTensor() yields values in [0, 1], so full green is (0, 1, 0).
        cv2.rectangle(img,
                      (int(box[0]), int(box[1])),
                      (int(box[2]), int(box[3])),
                      (0, 1, 0), 2)
    ax.set_axis_off()
    ax.imshow(img)

# Sample Training Data Augmentation and Model Initialization

In [None]:
# Preview the first sample of eight consecutive training batches
it = iter(train_data_loader)
for sample_no in range(8):
    get_sample_image(it)

# Faster R-CNN on a ResNet-50 FPN backbone, pre-trained on COCO
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

num_classes = 2  # 1 class (leaf) + background

# Swap the pre-trained box head for one sized to `num_classes`, reusing the
# input width of the existing classifier.
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

model.to(device)
print("Model loaded")

# Define Accuracy Calculation Function

In [None]:
# Function to calculate accuracy
def calculate_accuracy(model, data_loader, device, iou_threshold=0.5):
    """Return the fraction of ground-truth boxes matched by a prediction.

    A ground-truth box counts as matched when at least one predicted box
    overlaps it with an IoU above ``iou_threshold``. Each ground-truth box
    is counted at most once, so the result always lies in ``[0, 1]``.

    Args:
        model: Detection model; called as ``model(images)`` in eval mode and
            expected to return one dict with a ``boxes`` entry per image.
        data_loader: Iterable of ``(images, targets, image_ids)`` batches.
        device: Device the images are moved to before inference.
        iou_threshold: Minimum IoU (exclusive) for a match.

    Returns:
        ``matched / total`` over all ground-truth boxes, or ``0`` when the
        loader contains no ground-truth box.
    """
    model.eval()
    matched = 0
    total = 0
    with torch.no_grad():
        for images, targets, _ in data_loader:
            images = [img.to(device) for img in images]
            outputs = model(images)

            for output, target in zip(outputs, targets):
                pred_boxes = output['boxes'].detach().cpu()
                gt_boxes = target['boxes'].detach().cpu()
                total += len(gt_boxes)
                # Nothing can match when either side is empty; this also
                # avoids reducing over an empty IoU axis.
                if len(pred_boxes) == 0 or len(gt_boxes) == 0:
                    continue

                iou_matrix = torchvision.ops.box_iou(
                    pred_boxes, gt_boxes.to(pred_boxes.dtype)
                )
                # Reduce over predictions (dim 0) to get one best IoU per
                # ground-truth box, matching the denominator.
                best_iou_per_gt = iou_matrix.max(dim=0).values
                matched += (best_iou_per_gt > iou_threshold).sum().item()
    accuracy = matched / total if total > 0 else 0
    return accuracy

# Training Loop

In [None]:
# Training
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
# lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)
lr_scheduler = None

num_epochs = 15
loss_hist = Averager()
itr = 1  # global iteration counter across epochs, used for progress logging

previous_epoch = float('inf')  # lowest mean epoch loss seen so far
es_rate = 0  # consecutive epochs without improvement

es_threshold = 2 # How many epochs without improvement to early stop

for epoch in range(num_epochs):
    loss_hist.reset()
    # calculate_accuracy() leaves the model in eval mode, so training mode
    # has to be restored at the start of every epoch.
    model.train()

    for images, targets, image_ids in train_data_loader:

        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        # In training mode the model returns a dict of losses.
        loss_dict = model(images, targets)

        losses = sum(loss for loss in loss_dict.values())
        loss_value = losses.item()

        loss_hist.send(loss_value)

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        if itr % 50 == 0:
            print(f"Iteration #{itr} loss: {loss_value}")

        itr += 1

    # update the learning rate
    if lr_scheduler is not None:
        lr_scheduler.step()
    min_loss = loss_hist.value  # mean training loss of this epoch

    # Report every epoch, including the one that triggers early stopping.
    accuracy = calculate_accuracy(model, valid_data_loader, device)
    print(f"Epoch #{epoch} loss: {loss_hist.value} accuracy: {accuracy}")

    if min_loss < previous_epoch:
        previous_epoch = min_loss
        es_rate = 0
    else:
        # Stop as soon as `es_threshold` consecutive epochs fail to improve.
        es_rate += 1
        if es_rate >= es_threshold:
            break

# Validation and Testing

In [None]:
# Validation (On data from Training): show predictions for eight batches
it = iter(valid_data_loader)
for shown in range(8):
    get_validation_image(it)

# Testing
image_list = os.listdir(DIR_TEST + "/leaf")
print(image_list)
it = iter(load_test_dataset())

# Predict on seven test images, printing the wall-clock seconds each took
for timed in range(7):
    start = time.time()
    get_test_image(it, 0.5)
    print(time.time() - start)

# Persist the whole model object
torch.save(model, 'leaves_fasterrcnn_model.pth')