<a href="https://www.kaggle.com/code/sonujha090/global-wheat-detection-pytorch?scriptVersionId=115151610" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

# Aim: To Create a Pipeline for Object Detection with PyTorch

## Importing the necessary Libraries

In [None]:
# Download TorchVision repo to use some files from
# references/detection
# pycocotools is installed here; presumably it is needed by the COCO helper
# scripts copied below (coco_eval.py / coco_utils.py) - TODO confirm.
!pip install pycocotools --quiet
!git clone https://github.com/pytorch/vision.git
# NOTE(review): this checkout is issued from the notebook's working directory,
# not from inside ./vision, so it does not switch the clone above to v0.3.0.
# The files copied below therefore come from whatever revision the clone
# checked out by default. Confirm which revision is actually intended before
# changing this line.
!git checkout v0.3.0

# Copy the detection reference helpers next to the notebook so that later
# cells can import them as plain modules (`import utils`, `from engine import ...`).
!cp vision/references/detection/utils.py ./
!cp vision/references/detection/transforms.py ./
!cp vision/references/detection/coco_eval.py ./
!cp vision/references/detection/engine.py ./
!cp vision/references/detection/coco_utils.py ./

In [None]:
import torch, torchvision
from torch.utils.data import Dataset, DataLoader

In [None]:
import numpy as np 
import pandas as pd 
from pathlib import Path
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import ast 
from PIL import Image
import os 

# Data Preprocessing

In [None]:
# Root directory of the competition data; every later read goes through `path`.
DATA_ROOT = '/kaggle/input/global-wheat-detection'
path = Path(DATA_ROOT)

In [None]:
# Load the annotation table (one row per bounding box) and show its size.
train_csv = path / 'train.csv'
df = pd.read_csv(train_csv)
df.shape

In [None]:
# Peek at the first rows of the raw annotation table.
df.head(n=5)

In [None]:
def cvt2list(x):
    """Parse a stringified Python literal such as ``"[1, 2, 3, 4]"``.

    Args:
        x: the value to parse. Strings are evaluated with
            ``ast.literal_eval``; anything else is assumed to be parsed
            already.

    Returns:
        The parsed object for string input, otherwise ``x`` unchanged.

    Raises:
        ValueError / SyntaxError: propagated from ``ast.literal_eval`` when a
            string is not a valid literal.
    """
    # Passing non-strings through makes the cell that applies this function
    # safe to re-run on a column that has already been parsed.
    return ast.literal_eval(x) if isinstance(x, str) else x
# Turn every entry of the `bbox` column into a Python object via cvt2list.
df['bbox'] = df['bbox'].map(cvt2list)

In [None]:
def conv(x):
    """Convert a box from ``[x, y, width, height]`` to ``[x0, y0, x1, y1]``.

    Only the first four items of ``x`` are read; the result is a new list.
    """
    left, top = x[0], x[1]
    return [left, top, left + x[2], top + x[3]]
# Keep the untouched [x, y, w, h] boxes in their own column the first time this
# cell runs, and always convert from that copy. Without this, running the cell
# twice would apply `conv` to boxes that are already [x0, y0, x1, y1] and
# silently corrupt them.
if 'bbox_xywh' not in df.columns:
    df['bbox_xywh'] = df['bbox']
df['bbox'] = df['bbox_xywh'].apply(conv)

In [None]:
# Check the table again now that `bbox` has been rewritten.
df.head(n=5)

**Unique Images**

In [None]:
# Collapse the per-box rows into one row per image: `bboxes` holds the list of
# all boxes that share the same `image_id`.
boxes_per_image = df.groupby('image_id')['bbox'].apply(list)
df_processed = boxes_per_image.reset_index(name='bboxes')

In [None]:
# One row per image, with its boxes gathered in `bboxes`.
df_processed.head(n=5)

**Splitting the dataset into train and valid set**

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the images for validation. The per-image frame is split
# directly: there is no feature/label distinction here, so separating the
# columns and concatenating them again adds nothing. The fixed random_state
# keeps the split reproducible, and both frames keep df_processed's column
# order (image_id, bboxes).
train_df, valid_df = train_test_split(df_processed, test_size=0.2, random_state=42)
train_df.shape, valid_df.shape

In [None]:
# # taking a small sample for experiment
# train_df = train_df.sample(frac=0.1)
# valid_df = valid_df.sample(frac=0.1)
# train_df.shape, valid_df.shape

# PyTorch Dataset

In [None]:
class WheatDataset(Dataset):
    """Detection dataset that yields ``(image, target)`` pairs, one per frame row.

    Args:
        df: frame with one row per image. It must expose an ``image_id``
            column, and each row must unpack into ``(image_id, boxes)`` where
            ``boxes`` is a list of ``[x0, y0, x1, y1]`` boxes.
        root: directory that contains ``<image_id>.jpg`` for every row.
        transform: optional callable applied to the image only; the target is
            never passed to it.
    """

    def __init__(self, df, root, transform=None):
        self.df = df
        self.root = Path(root)
        self.transforms = transform
        self.image_ids = self.df.image_id.unique()

    def __len__(self):
        """Number of rows (images) in the backing frame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load image ``idx`` and build its target dict.

        Returns:
            ``(img, target)`` where ``target`` has the keys ``boxes``
            (float32, shape ``(N, 4)``), ``labels`` (int64 ones), ``image_id``
            (tensor holding ``idx``), ``area`` and ``iscrowd`` (int64 zeros).
        """
        img_id, bboxes = self.df.iloc[idx]
        img_path = os.path.join(self.root, img_id + '.jpg')
        # Force three channels so that a grayscale or palette file cannot
        # produce a tensor with an unexpected channel count downstream.
        img = Image.open(img_path).convert('RGB')

        # reshape(-1, 4) keeps the (N, 4) layout even when an image has no
        # boxes; a bare torch.tensor([]) would have shape (0,).
        boxes = torch.tensor(bboxes, dtype=torch.float32).reshape(-1, 4)
        num_boxes = boxes.shape[0]
        areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])

        target = {
            'boxes': boxes,
            'labels': torch.ones((num_boxes,), dtype=torch.int64),
            'image_id': torch.tensor(idx),
            'area': areas,
            'iscrowd': torch.zeros((num_boxes,), dtype=torch.int64),
        }

        if self.transforms is not None:
            img = self.transforms(img)

        return img, target

In [None]:
# Untransformed dataset (images stay as loaded), used for visual checks.
ds = WheatDataset(df=train_df, root=path / 'train')

# Visualize a dataset

In [None]:
def plot_bboxes(img, target):
    """Show ``img`` with every box in ``target['boxes']`` outlined in red.

    Args:
        img: anything ``Axes.imshow`` accepts.
        target: dict whose ``'boxes'`` entry is a tensor or a sequence of
            ``[x0, y0, x1, y1]`` corner boxes.
    """
    boxes = target['boxes']
    # Tensors (possibly on the GPU) are converted to plain Python floats so
    # matplotlib never has to deal with device tensors.
    if hasattr(boxes, 'detach'):
        boxes = boxes.detach().cpu().tolist()

    fig, ax = plt.subplots(figsize=(10, 10))
    ax.imshow(img)

    for x0, y0, x1, y1 in boxes:
        # Rectangle takes an anchor corner plus width and height, while the
        # boxes hold two corners, so the extent is derived here.
        rect = patches.Rectangle((x0, y0), x1 - x0, y1 - y0,
                                 linewidth=2, edgecolor='r', facecolor='none')
        ax.add_patch(rect)

    plt.show()

In [None]:
# Draw one training sample together with its annotated boxes.
sample = ds[1]
img, target = sample
plot_bboxes(img, target)

**Transform**

In [None]:
import torchvision.transforms as T

def get_transform(train):
    """Build the image-only preprocessing pipeline used by the datasets.

    Args:
        train: kept so existing calls (``get_transform(train=True)``) keep
            working. It currently has no effect: the pipeline is the same for
            training and validation.

    Returns:
        A ``T.Compose`` that converts a PIL image to a float tensor.
    """
    # No random flip here on purpose. This pipeline is applied to the image
    # alone, so a geometric augmentation would move the pixels but leave the
    # target boxes where they were, producing wrong labels. Augmentations that
    # change geometry need a transform that updates the boxes as well.
    transforms = [
        T.PILToTensor(),
        T.ConvertImageDtype(torch.float),
    ]
    return T.Compose(transforms)

In [None]:
# Both splits read their images from the same `train` folder; only the rows of
# the frame and the transform differ.
image_dir = path / 'train'
train_ds = WheatDataset(train_df, image_dir, transform=get_transform(train=True))
valid_ds = WheatDataset(valid_df, image_dir, transform=get_transform(train=False))

**DataLoader**

In [None]:
def collate_fn(batch):
    """Regroup a list of samples into one tuple per field.

    ``[(img0, tgt0), (img1, tgt1)]`` becomes ``((img0, img1), (tgt0, tgt1))``;
    the items are not stacked.
    """
    per_field = zip(*batch)
    return tuple(per_field)

# Training batches are reshuffled every epoch so SGD does not see the images
# in the same fixed order each time; validation keeps a stable order.
train_dl = DataLoader(train_ds, collate_fn=collate_fn, batch_size=8, shuffle=True)
valid_dl = DataLoader(valid_ds, collate_fn=collate_fn, batch_size=8)

# Model

In [None]:
# Run on the GPU when one is visible to torch, otherwise on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# Faster R-CNN with a ResNet-50 FPN backbone, starting from pretrained weights.
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

# Two outputs for the classifier: wheat plus the background class.
num_classes = 2

# Swap the pretrained box head for a fresh one sized for `num_classes`,
# reusing the input width of the existing classification layer.
pretrained_head = model.roi_heads.box_predictor
in_features = pretrained_head.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

model.to(device)

**Testing on one batch**

In [None]:
# Smoke test: push a single training batch through the model and print the
# losses it reports. One batch is fetched once; nothing is trained here.
images, targets = next(iter(train_dl))
images = [image.to(device) for image in images]
targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

# Set training mode explicitly so the cell still produces losses when it is
# re-run after a later cell has called model.eval().
model.train()
loss_dict = model(images, targets)
print(loss_dict)

**Hyperparameter**

In [None]:
# SGD over the parameters that are still trainable.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(
    params,
    lr=0.005,
    momentum=0.9,
    weight_decay=0.0005,
)

# Epoch-level schedule: the learning rate is multiplied by 0.1 after every
# 3 calls to lr_scheduler.step().
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# Training

In [None]:
import math
import sys
import time

import torch
import torchvision.models.detection.mask_rcnn
import utils
from coco_eval import CocoEvaluator
from coco_utils import get_coco_api_from_dataset


def train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq, scaler=None):
    """Run one training pass over ``data_loader``.

    Args:
        model: detection model that returns a dict of losses when called as
            ``model(images, targets)`` in training mode.
        optimizer: optimizer that updates the model's parameters.
        data_loader: sized iterable of ``(images, targets)`` batches, where
            ``targets`` is a sequence of dicts of tensors.
        device: device every batch is moved to before the forward pass.
        epoch: zero-based epoch index; epoch 0 additionally gets a linear
            learning-rate warm-up.
        print_freq: forwarded to ``MetricLogger.log_every``.
        scaler: optional gradient scaler; when given, the forward pass runs
            under autocast and the scaler drives backward and step.

    Returns:
        The ``utils.MetricLogger`` that collected the loss and lr values.

    Raises:
        SystemExit: if the summed loss becomes non-finite.
    """
    model.train()
    metric_logger = utils.MetricLogger(delimiter="  ")
    metric_logger.add_meter("lr", utils.SmoothedValue(window_size=1, fmt="{value:.6f}"))
    header = f"Epoch: [{epoch}]"

    # Only the first epoch gets a per-iteration scheduler: a linear ramp from
    # lr/1000 up to the configured lr. In all later epochs the learning rate
    # is left to the caller's epoch-level scheduler. Creating a decaying
    # scheduler here and stepping it every batch would shrink the learning
    # rate many times per epoch.
    lr_scheduler = None
    if epoch == 0:
        warmup_factor = 1.0 / 1000
        warmup_iters = min(1000, len(data_loader) - 1)
        # With a single batch there is nothing to ramp over.
        if warmup_iters > 0:
            lr_scheduler = torch.optim.lr_scheduler.LinearLR(
                optimizer, start_factor=warmup_factor, total_iters=warmup_iters
            )

    for images, targets in metric_logger.log_every(data_loader, print_freq, header):
        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        with torch.cuda.amp.autocast(enabled=scaler is not None):
            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())

        # reduce losses over all GPUs for logging purposes
        loss_dict_reduced = utils.reduce_dict(loss_dict)
        losses_reduced = sum(loss for loss in loss_dict_reduced.values())

        loss_value = losses_reduced.item()

        # A NaN/inf loss cannot be recovered from; report it and stop.
        if not math.isfinite(loss_value):
            print(f"Loss is {loss_value}, stopping training")
            print(loss_dict_reduced)
            sys.exit(1)

        optimizer.zero_grad()
        if scaler is not None:
            scaler.scale(losses).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            losses.backward()
            optimizer.step()

        if lr_scheduler is not None:
            lr_scheduler.step()

        metric_logger.update(loss=losses_reduced, **loss_dict_reduced)
        metric_logger.update(lr=optimizer.param_groups[0]["lr"])

    return metric_logger

In [None]:
%%time 

# Train for a fixed number of epochs, validating after each one.
from engine import evaluate

num_epochs = 5
for epoch in range(num_epochs):
    # one pass over the training loader
    train_one_epoch(model, optimizer, train_dl, device, epoch, print_freq=10)
    # advance the epoch-level learning-rate schedule
    lr_scheduler.step()
    # score the model on the held-out validation loader
    evaluate(model, valid_dl, device=device)

In [None]:
# Persist only the weights (state dict), not the whole model object.
weights_file = 'fasterrcnn_resnet50_fpn.pth'
torch.save(model.state_dict(), weights_file)

In [None]:
# the function takes the original prediction and the iou threshold.

def apply_nms(orig_prediction, iou_thresh=0.3):
    """Return a copy of a detection result with overlapping boxes suppressed.

    Args:
        orig_prediction: dict with ``'boxes'``, ``'scores'`` and ``'labels'``
            tensors for one image. It is left unmodified.
        iou_thresh: IoU threshold passed to ``torchvision.ops.nms``.

    Returns:
        A new dict with the same keys, where ``'boxes'``, ``'scores'`` and
        ``'labels'`` contain only the entries kept by NMS.
    """
    # torchvision returns the indices of the boxes to keep
    keep = torchvision.ops.nms(orig_prediction['boxes'], orig_prediction['scores'], iou_thresh)

    # Work on a shallow copy: writing into the caller's dict would make the
    # raw prediction disappear once NMS has been applied.
    final_prediction = dict(orig_prediction)
    for key in ('boxes', 'scores', 'labels'):
        final_prediction[key] = orig_prediction[key][keep]

    return final_prediction

def torch_to_pil(img):
    """Turn an image tensor back into an RGB PIL image."""
    to_pil = T.ToPILImage()
    pil_img = to_pil(img)
    return pil_img.convert('RGB')


# Testing on one image

In [None]:
# Take a single validation sample and run it through the trained model.
img, target = valid_ds[5]

# Inference only: evaluation mode, and no gradients are recorded.
model.eval()
with torch.no_grad():
    outputs = model([img.to(device)])
prediction = outputs[0]

print('predicted #boxes: ', len(prediction['labels']))

In [None]:
def plot_bboxes(img, target):
    """Show ``img`` with every box in ``target['boxes']`` outlined in red.

    This redefines the ``plot_bboxes`` declared earlier in the notebook so
    that tensors living on the GPU (model predictions) can be drawn as well.

    Args:
        img: anything ``Axes.imshow`` accepts.
        target: dict whose ``'boxes'`` entry is a tensor or a sequence of
            ``[x0, y0, x1, y1]`` corner boxes.
    """
    boxes = target['boxes']
    # Bring tensors to the CPU and turn them into plain Python floats before
    # handing anything to matplotlib.
    if hasattr(boxes, 'detach'):
        boxes = boxes.detach().cpu().tolist()

    fig, ax = plt.subplots(figsize=(10, 10))
    ax.imshow(img)

    for x0, y0, x1, y1 in boxes:
        # Rectangle takes an anchor corner plus width and height, while the
        # boxes hold two corners, so the extent is derived here.
        rect = patches.Rectangle((x0, y0), x1 - x0, y1 - y0,
                                 linewidth=2, edgecolor='r', facecolor='none')
        ax.add_patch(rect)

    plt.show()

In [None]:
# Draw everything the model predicted for the image.
print('MODEL OUTPUT')
pil_img = torch_to_pil(img)
plot_bboxes(pil_img, prediction)

In [None]:
# Same image, after non-maximum suppression with an IoU threshold of 0.2.
nms_prediction = apply_nms(prediction, iou_thresh=0.2)
print('NMS APPLIED MODEL OUTPUT')
pil_img = torch_to_pil(img)
plot_bboxes(pil_img, nms_prediction)

# Inference Kernel

**https://www.kaggle.com/code/sonujha090/submission-fasterrcnn/**