In [None]:
!pip install torchmetrics

In [None]:
# Imports
import numpy as np
import torch
from torch.utils.data import DataLoader, Dataset
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.io import read_image
from torchvision import datasets, transforms
import warnings
import zipfile
from tqdm import tqdm
import os
from pathlib import Path
import random
from PIL import Image, ImageDraw
from IPython.display import display
warnings.filterwarnings('ignore')
from torchmetrics.detection.mean_ap import MeanAveragePrecision
from torch import nn

In [None]:
filename = '/content/smoke-fire-detection-yolo.zip' # Note that filename might be different depending on the device, this one should work for google colab.

# Extract to an absolute directory so the result does not depend on the current
# working directory; the dataset cell below reads from /content/data/...
with zipfile.ZipFile(filename, 'r') as zip_ref:
    zip_ref.extractall('/content/')

In [None]:
class RCNNDataset(Dataset): # Inherits from torch.utils.data.Dataset
  """Detection dataset that pairs images with YOLO-style label files.

  Each label file is expected to hold one object per line in the form
  ``class x_center y_center width height`` where the four geometry values are
  fractions of the image size. Boxes are returned in pixel coordinates as
  ``[x_min, y_min, x_max, y_max]``.

  Args:
    img_dir: Directory containing the image files.
    label_dir: Directory containing the label files.
    transform: Optional callable applied to the PIL image before it is returned.

  Attributes set by the constructor:
    img_files / label_files: sorted directory listings.
    valid_files: list of ``(img_file, label_file)`` pairs that share the same
      file stem and whose label file is larger than one byte.
  """

  def __init__(self, img_dir, label_dir, transform=None):
    self.img_dir = img_dir
    self.label_dir = label_dir
    self.transform = transform
    self.img_files = sorted(os.listdir(img_dir)) # Sorting so the order remains the same
    self.label_files = sorted(os.listdir(label_dir))

    # Pair every image with the label file that has the same stem. Pairing the
    # two sorted listings positionally would silently shift every later pair as
    # soon as one image or one label is missing.
    labels_by_stem = {os.path.splitext(name)[0]: name for name in self.label_files}

    # Keep only the images that have existing, non-empty labels
    self.valid_files = []
    for img_file in self.img_files:
      label_file = labels_by_stem.get(os.path.splitext(img_file)[0])
      if label_file is None:
        continue
      label_path = os.path.join(label_dir, label_file)
      if os.path.getsize(label_path) > 1:  # Skip empty labels
        self.valid_files.append((img_file, label_file))

  def __len__(self): # Needed for determining the size of the dataset
    return len(self.valid_files)

  @staticmethod
  def _read_yolo_labels(label_path, img_w, img_h):
    """Parse one label file into pixel-space boxes and integer class ids.

    Blank lines are ignored. A non-blank line that does not contain exactly
    five numeric fields raises ``ValueError``.

    Returns:
      ``(boxes, labels)`` where ``boxes`` is a list of
      ``[x_min, y_min, x_max, y_max]`` floats and ``labels`` a list of ints.
    """
    boxes, labels = [], []
    with open(label_path, 'r') as f:
      for line in f:
        fields = line.split()
        if not fields:  # tolerate blank / trailing empty lines
          continue
        label = int(float(fields[0]))
        x_center, y_center, w, h = map(float, fields[1:])

        # Convert proportions to pixel values
        x = (x_center - w / 2) * img_w
        y = (y_center - h / 2) * img_h
        w = w * img_w
        h = h * img_h
        boxes.append([x, y, x + w, y + h])
        labels.append(label)
    return boxes, labels

  def __getitem__(self, idx): # Used to get items
    """Return ``(img, target)`` for ``idx``.

    ``target`` is ``{"boxes": FloatTensor[N, 4], "labels": Int64Tensor[N]}``.
    If the sample at ``idx`` contains a box with non-positive width or height,
    the following samples are tried in order (wrapping around) and the first
    one with only valid boxes is returned instead.

    Raises:
      IndexError: if the dataset is empty or ``idx`` is out of range.
      RuntimeError: if no sample in the dataset has only valid boxes.
    """
    total = len(self.valid_files)
    if total == 0:
      raise IndexError("RCNNDataset is empty")

    current = idx
    # Bounded loop instead of recursion: at most one full pass over the data.
    for _ in range(total):
      img_file, label_file = self.valid_files[current]

      # Getting image (the context manager closes the underlying file handle)
      img_path = os.path.join(self.img_dir, img_file)
      with Image.open(img_path) as raw_img:
        img = raw_img.convert("RGB")
      img_w, img_h = img.size

      # Getting bounding boxes
      label_path = os.path.join(self.label_dir, label_file)
      boxes, labels = self._read_yolo_labels(label_path, img_w, img_h)

      # Check for invalid box proportions and skip the image if found. Added to avoid error
      invalid_box = next(
        (box for box in boxes if box[2] - box[0] <= 0 or box[3] - box[1] <= 0),
        None,
      )
      if invalid_box is not None:
        print(f"Skipping image {img_path} because box: {invalid_box} proportions are invalid")
        current = (current + 1) % total  # wrap around to index 0 after the last item
        continue

      # Convert to Torch Tensors; reshape keeps the (N, 4) layout even when N == 0
      boxes = torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4)
      # NOTE(review): class ids are passed through exactly as stored in the label
      # file. torchvision detection models treat label 0 as background - confirm
      # these ids match the convention the model was trained with.
      labels = torch.tensor(labels, dtype=torch.int64)

      # Apply Transformations
      if self.transform:
        img = self.transform(img)

      target = {"boxes": boxes, "labels": labels}
      return img, target

    raise RuntimeError("No sample with valid bounding boxes found in the dataset")


# Define Transformations
# NOTE(review): torchvision's Faster R-CNN applies its own normalisation inside
# the model (see the GeneralizedRCNNTransform in the model printout), so this
# extra Normalize is only correct if the checkpoint was trained with the same
# preprocessing - confirm against the training notebook.
transform = transforms.Compose([transforms.ToTensor(),
                                transforms.Normalize(0.5, 0.5)])

# Create Dataset & DataLoader
image_folder = "/content/data/test/images"
label_folder = "/content/data/test/labels"

test_dataset = RCNNDataset(image_folder, label_folder, transform=transform)
# shuffle=False: evaluation does not benefit from shuffling, and a fixed order
# keeps runs reproducible. The collate_fn turns a list of (img, target) pairs
# into (imgs, targets) tuples, because images and box counts differ per sample
# and cannot be stacked into a single tensor.
test_dataloader = DataLoader(test_dataset, batch_size=20, shuffle=False, collate_fn=lambda x: tuple(zip(*x)))


In [None]:
import torch
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# Build the Faster R-CNN architecture that the fine-tuned state dict will be loaded into.
# `weights="DEFAULT"` is the current replacement for the deprecated `pretrained=True`
# flag (torchvision >= 0.13).
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")

# Replace the box-prediction head so its output size matches the fine-tuned checkpoint.
# NOTE(review): 3 presumably means background + 2 object classes - confirm against training.
num_classes = 3
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)



FasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64, eps=0.0)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64, eps=0.0)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64, eps=0.0)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256, eps=0.0)
          (relu): ReLU(

In [None]:
model_path = '/content/test_model_01_RCNN.pth' # uploaded trained model dict state path, change accordingly.

!ls -lh {model_path}  # Checking file size and permissions.
# !unzip -t {model_path} # Uncomment this line if file unzipping needed.

model.load_state_dict(torch.load(model_path, map_location=torch.device('cuda')))

In [None]:
from torchmetrics.detection.mean_ap import MeanAveragePrecision

# Accumulates predictions/targets over all batches and computes COCO-style mAP.
metric = MeanAveragePrecision(iou_type="bbox")
model.eval()  # inference mode: the model returns predictions instead of losses

for imgs, targets in test_dataloader:
    # Use the device the model lives on rather than assuming CUDA is present.
    imgs = [img.to(device) for img in imgs]

    with torch.no_grad():
        preds = model(imgs)

    # The metric is updated on CPU: move predictions back, targets were never moved.
    preds_formatted = [{k: v.cpu() for k, v in p.items()} for p in preds]
    targets_formatted = [dict(t) for t in targets]

    metric.update(preds_formatted, targets_formatted)

# Get final score
results = metric.compute()
print(results)  # dict of mAP / mAR values; per-class entries are only filled when class_metrics=True