<a href="https://colab.research.google.com/github/thaneesan99/PyTorch_Faster_RCNN/blob/main/Train_PyTorch_Faster_RCNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
import os
from PIL import Image, ImageDraw
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision
from torchvision import transforms as T
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from sklearn.metrics import precision_recall_fscore_support
from torchvision.ops import box_iou

In [None]:
# Download the file from the specified URL
!wget "place_your_dataset_link" -O dataset.zip

# Unzip the downloaded file
!unzip dataset.zip > /dev/null

In [None]:
# Load CSV and prepare datasets
train = pd.read_csv('/content/train/_annotations.csv')
valid = pd.read_csv('/content/valid/_annotations.csv')

In [None]:
# Distinct image filenames per split; an image appears once here even when
# the annotation table lists it on several rows
train_unique_imgs = train['filename'].unique()
valid_unique_imgs = valid['filename'].unique()

In [None]:
class CustDat(Dataset):
    """Object-detection dataset yielding one ``(image, target)`` pair per image file.

    Args:
        df: Annotation table with one row per bounding box. It must contain the
            columns ``filename``, ``xmin``, ``ymin``, ``xmax`` and ``ymax``.
        unique_imgs: Sequence of image filenames. Its length is the dataset
            size and ``unique_imgs[idx]`` selects the image of sample ``idx``.
        root_dir: Directory the filenames are joined onto to locate the images.
        transform: Optional callable applied to the RGB PIL image only; the
            boxes are returned as stored in ``df``.
    """

    # Columns holding the corner coordinates of each box, in target order
    _BOX_COLUMNS = ['xmin', 'ymin', 'xmax', 'ymax']

    def __init__(self, df, unique_imgs, root_dir, transform=None):
        self.df = df
        self.unique_imgs = unique_imgs
        self.root_dir = root_dir
        self.transform = transform

        # Index the boxes by filename once. Looking a sample up is then a dict
        # access instead of a boolean-mask scan over the whole table for every
        # __getitem__ call (which costs images x rows work per epoch).
        self._boxes_by_image = {
            name: group[self._BOX_COLUMNS].values.astype("float")
            for name, group in df.groupby('filename', sort=False)
        }

    def __len__(self):
        """Return the number of images (not the number of boxes)."""
        return len(self.unique_imgs)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            tuple: ``(img, target)`` where ``img`` is the RGB image (after
            ``transform`` when one was given) and ``target`` is a dict with
            ``'boxes'`` (float32 tensor, one ``xmin, ymin, xmax, ymax`` row per
            box) and ``'labels'`` (int64 tensor of ones, one entry per box).
        """
        image_name = self.unique_imgs[idx]

        boxes = self._boxes_by_image.get(image_name)
        if boxes is None:
            # No annotation rows for this filename: keep the (0, 4) shape the
            # row filter would have produced.
            boxes = np.empty((0, 4), dtype="float")

        img_path = os.path.join(self.root_dir, image_name)
        # convert() returns a fully loaded copy, so the file can be closed right away
        with Image.open(img_path) as raw_img:
            img = raw_img.convert('RGB')

        # Single-class setup: every box gets label 1 (adjust here for multi-class data)
        labels = torch.ones((boxes.shape[0],), dtype=torch.int64)

        target = {
            'boxes': torch.tensor(boxes, dtype=torch.float32),
            'labels': labels
        }

        # The transform only touches the image
        if self.transform:
            img = self.transform(img)

        return img, target

In [None]:
# Create datasets
train_dataset = CustDat(df=train, unique_imgs=train_unique_imgs, root_dir='/content/train',
                        transform=T.ToTensor())
valid_dataset = CustDat(df=valid, unique_imgs=valid_unique_imgs, root_dir='/content/valid',
                        transform=T.ToTensor())

# Create dataloaders
train_dl = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=lambda x: tuple(zip(*x)))
valid_dl = DataLoader(valid_dataset, batch_size=1, shuffle=False, collate_fn=lambda x: tuple(zip(*x)))

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Faster R-CNN (ResNet-50 FPN backbone) initialised from torchvision's pretrained weights.
# `weights="DEFAULT"` replaces the deprecated `pretrained=True` flag
# (requires torchvision >= 0.13).
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")

# Size of the classification head. The dataset labels every box 1, and the
# validation code uses 0 for "no object", hence two classes.
num_classes = 2

# Swap the pretrained box predictor for a fresh one sized for `num_classes`,
# keeping the input feature width of the existing head.
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

In [None]:
# Training length and SGD hyper-parameters
num_epochs = 100
sgd_kwargs = {'lr': 0.001, 'momentum': 0.9, 'weight_decay': 0.0005}
optimizer = torch.optim.SGD(model.parameters(), **sgd_kwargs)

In [None]:
# Checkpoints are written below this directory (relative to the working directory)
save_dir = "faster_rcnn_model"
# exist_ok keeps the cell re-runnable when the directory is already there
os.makedirs(name=save_dir, exist_ok=True)

In [None]:
def _match_detections(gt_boxes, gt_labels, pred_boxes, pred_labels, iou_threshold=0.9):
    """Pair one image's ground-truth boxes with its predictions for classification metrics.

    Every ground-truth box is compared with its highest-IoU prediction. If that
    IoU exceeds ``iou_threshold`` the pair contributes ``(gt_label, pred_label)``;
    otherwise the ground truth counts as missed and contributes ``(gt_label, 0)``.
    Every prediction that no ground truth claimed contributes ``(0, pred_label)``.

    Args:
        gt_boxes: Tensor of ground-truth boxes, one ``x1, y1, x2, y2`` row per box.
        gt_labels: Tensor of ground-truth labels, aligned with ``gt_boxes``.
        pred_boxes: Tensor of predicted boxes in the same format.
        pred_labels: Tensor of predicted labels, aligned with ``pred_boxes``.
        iou_threshold: IoU a match must exceed to be accepted.
            NOTE(review): 0.9 is a very strict matching threshold - confirm it
            is intentional.

    Returns:
        tuple: ``(y_true, y_pred)`` lists of equal length; label 0 stands for
        "no object".
    """
    y_true, y_pred = [], []
    gt_label_list = gt_labels.tolist()
    pred_label_list = pred_labels.tolist()

    # Prediction indices claimed by a ground-truth box. These must be the
    # argmax along the prediction axis; argmax over axis 0 of the IoU matrix
    # yields ground-truth indices and cannot be used to mark predictions.
    matched_preds = set()

    if len(gt_boxes) > 0 and len(pred_boxes) > 0:
        # iou_matrix[g, p] is the IoU of ground truth g with prediction p
        best_ious, best_pred_idxs = box_iou(gt_boxes, pred_boxes).max(dim=1)
        for gt_idx, gt_label in enumerate(gt_label_list):
            y_true.append(gt_label)
            if best_ious[gt_idx] > iou_threshold:
                pred_idx = int(best_pred_idxs[gt_idx])
                y_pred.append(pred_label_list[pred_idx])
                matched_preds.add(pred_idx)
            else:
                # False negative: no prediction overlaps this box well enough
                y_pred.append(0)
    else:
        # Nothing to match against: every ground truth is a false negative
        y_true.extend(gt_label_list)
        y_pred.extend([0] * len(gt_label_list))

    # False positives: predictions that no ground-truth box claimed
    for pred_idx, pred_label in enumerate(pred_label_list):
        if pred_idx not in matched_preds:
            y_true.append(0)
            y_pred.append(pred_label)

    return y_true, y_pred


model.to(device)

for epoch in range(num_epochs):
    # TRAINING
    model.train()
    epoch_loss = 0.0
    for imgs, targets in train_dl:
        imgs = [img.to(device) for img in imgs]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        # In train mode the model returns a dict of loss terms; optimise their sum
        loss_dict = model(imgs, targets)
        loss = sum(loss_dict.values())

        # Running total over the epoch's batches (a sum, not a mean)
        epoch_loss += loss.item()

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f"Epoch [{epoch + 1}/{num_epochs}] Loss: {epoch_loss:.4f}")

    # Save model every 10 epochs
    if (epoch + 1) % 10 == 0:
        save_path = os.path.join(save_dir, f"faster_rcnn_epoch_{epoch + 1}.pth")
        torch.save(model.state_dict(), save_path)
        print(f"Model saved at {save_path}")

    # VALIDATION
    model.eval()  # In eval mode the model returns per-image prediction dicts
    y_true = []
    y_pred = []

    with torch.no_grad():
        for imgs, targets in valid_dl:
            imgs = [img.to(device) for img in imgs]

            outputs = model(imgs)

            # Matching runs on the CPU, so the targets never need to visit the device
            for output, target in zip(outputs, targets):
                img_true, img_pred = _match_detections(
                    gt_boxes=target["boxes"].cpu(),
                    gt_labels=target["labels"].cpu(),
                    pred_boxes=output["boxes"].cpu(),
                    pred_labels=output["labels"].cpu(),
                )
                y_true.extend(img_true)
                y_pred.extend(img_pred)

    # Calculate validation metrics over all matched / unmatched boxes of the epoch
    precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average="weighted", zero_division=0)

    # Print validation metrics
    print(f"Validation - Epoch [{epoch + 1}/{num_epochs}]:")
    print(f"Precision: {precision:.4f}, Recall: {recall:.4f}, F1-Score: {f1:.4f}")



In [None]:
from google.colab import files

# Download the most recent checkpoint. The training loop saves one every
# 10 epochs, so the last file on disk belongs to the largest multiple of 10
# that does not exceed `num_epochs` (keep the 10 in sync with the training loop).
last_saved_epoch = (num_epochs // 10) * 10
checkpoint_path = os.path.abspath(
    os.path.join(save_dir, f"faster_rcnn_epoch_{last_saved_epoch}.pth")
)
if not os.path.exists(checkpoint_path):
    raise FileNotFoundError(
        f"No checkpoint found at {checkpoint_path}; run the training cell first "
        "(checkpoints are only written every 10 epochs)."
    )
files.download(checkpoint_path)