# Inference of Detectron model

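## Run all via the cell below
On a fresh kernel, comment out the cell below first and run every other cell so that the functions are defined; then uncomment it and run it to start inference.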

In [21]:
# Entry point. run_inference() is defined in a later cell, so every cell below
# must have been executed before this one is run.
run_inference()

Running inference on LABELLED dataset on cpu
1/2 images inferred.
2/2 images inferred.
Inference complete!

Evaluating test statistics...

Inference complete. Key Results:
Avg Precision (IOU50) = 0.9999999999999999
Avg Precision (IOU75) = 0.9999999999999999
Avg Recall (IOU50) = 0.85

Detailed results are available in 'test_results.txt' log in same folder as inference images


In [2]:
# Path to the CSV of training labels. It can be found in the models folder;
# the default filename is "labels_train_val.csv".
trained_csv_file = 'your/training/csv/file/here'

# Path to the trained model weights. Despite the "_dir" suffix this value is
# passed directly to torch.load, so it must point at the saved file itself.
# Search for it in the models folder; it is typically a few hundred MB.
trained_model_dir = 'your/trained/model/here'

# Folder containing the test images
test_dataset_images_dir = 'your/test/images/here'

# Folder containing the PascalVOC annotations of the test images (OPTIONAL).
# Set it to '' when the test set is unlabelled: any value that is not an
# existing directory makes the notebook take the unlabelled inference path.
test_dataset_annotations_dir = 'your/test/annotations/here'
# test_dataset_annotations_dir = ''

In [3]:
# Passed as the `num_workers` argument of the evaluation DataLoader
num_workers=0
# A predicted box is drawn only when its confidence score exceeds this value
threshold = 0.5

## Do not edit below

In [4]:
import pandas as pd
import numpy as np
import os, glob, sys
from datetime import datetime as DT
from PIL import ImageFont, ImageDraw, Image
import xml.etree.ElementTree as ET

from typing import List, Tuple, Dict, Optional
import torch
import torchvision
import torch.utils.data
from torch import nn, Tensor
from torchvision.transforms import functional as F

# Import selected model - Faster R-CNN ResNet-50 FPN
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# Import from vision
os.chdir("vision")
from engine import train_one_epoch, evaluate
import utils
import transforms as T
os.chdir("..")

## Check if data is labelled or not

In [5]:
# The test set counts as labelled only when the annotations folder exists.
TEST_DATA_IS_LABELLED = os.path.isdir(test_dataset_annotations_dir)

if TEST_DATA_IS_LABELLED:
    print("Annotations verified! Your test dataset is labelled. Moving to inference...\n")
else:
    print("No annotations found for your test dataset. Inference will still proceed but ground truth boxes and test statistics will not be available....\n")

Annotations verified! Your test dataset is labelled. Moving to inference...



## Process Train CSV
Get info on:
- Number of classes
- Class mapping

In [6]:
# Read the training labels CSV; its "class" column is used further down to
# derive the list of classes the model was trained on.
my_trained_labels_df = pd.read_csv(trained_csv_file)

## Read Test XML Labels and convert to CSV for groundtruth

In [7]:
# Convert a folder of PascalVOC XML annotations into a single DataFrame

def _voc_child_text(parent, tag, position):
    """Return the text of the child named `tag`, or of the child at `position` if no such tag exists."""
    node = parent.find(tag)
    if node is None:
        # Positional fallback keeps files readable that only match the
        # conventional VOC child order.
        node = parent[position]
    return node.text


def xml_to_csv(path):
    """
    Read every `*.xml` annotation in `path` and return one row per object.

    Elements are looked up by tag name (`name`, `bndbox`, `xmin`, ...), so the
    result does not depend on the order in which the annotation tool wrote
    them. Files are processed in sorted order so the row order is reproducible.

    Args:
        path: Folder containing the PascalVOC XML files.

    Returns:
        pandas.DataFrame with the columns 'filename', 'width', 'height',
        'class', 'xmin', 'ymin', 'xmax', 'ymax'. It is empty (but still has
        these columns) when no objects are found.
    """
    column_name = ['filename', 'width', 'height', 'class', 'xmin', 'ymin', 'xmax', 'ymax']
    xml_list = []
    for xml_file in sorted(glob.glob(os.path.join(path, '*.xml'))):
        root = ET.parse(xml_file).getroot()
        for member in root.findall('object'):
            size = root.find('size')
            bndbox = member.find('bndbox')
            if bndbox is None:
                bndbox = member[4]
            xml_list.append((
                root.find('filename').text,
                int(_voc_child_text(size, 'width', 0)),
                int(_voc_child_text(size, 'height', 1)),
                _voc_child_text(member, 'name', 0),
                int(_voc_child_text(bndbox, 'xmin', 0)),
                int(_voc_child_text(bndbox, 'ymin', 1)),
                int(_voc_child_text(bndbox, 'xmax', 2)),
                int(_voc_child_text(bndbox, 'ymax', 3)),
            ))
    return pd.DataFrame(xml_list, columns=column_name)

In [8]:
if TEST_DATA_IS_LABELLED:
    # The annotations folder is both the XML source and the place where the
    # combined CSV is written.
    test_annotations_dir = os.path.join(test_dataset_annotations_dir)
    my_test_labels_savefile = os.path.join(test_annotations_dir, "test_labels.csv")

    # Gather every XML annotation into one dataframe and persist it as CSV
    test_xml_df = xml_to_csv(test_annotations_dir)
    test_xml_df.to_csv(my_test_labels_savefile)

In [9]:
# Replace class labels with numbers

# Get list of classes in dataframe; the leading 0 entry stands for the background class
list_of_classes = np.array([0])
temp_class_list = np.unique(my_trained_labels_df["class"])

# Sort in alphabetical order to ensure consistency.
temp_class_list = sorted(temp_class_list)
list_of_classes = np.append(list_of_classes, temp_class_list).astype(object)

# Get number of classes, including bg class
num_classes_inference = len(list_of_classes)

# Lookup table from class name to its position in list_of_classes.
# setdefault keeps the first position should a name occur twice.
class_name_to_index = {}
for class_position, class_name in enumerate(list_of_classes):
    class_name_to_index.setdefault(class_name, class_position)


def _encode_class_column(labels_df, description):
    """
    Replace the "class" column of `labels_df` in place with integer class indices.

    Args:
        labels_df: DataFrame with a "class" column holding class names.
        description: Human-readable name of the data source, used in the error message.

    Raises:
        ValueError: if a class name is not part of list_of_classes.
    """
    encoded = labels_df["class"].map(class_name_to_index)
    unknown_mask = encoded.isna()
    if unknown_mask.any():
        unknown_names = sorted({str(name) for name in labels_df.loc[unknown_mask, "class"]})
        raise ValueError(
            f"{description} contains class labels that are not present in the training CSV: {unknown_names}"
        )
    labels_df["class"] = encoded.astype("int64")


# Map class names to numbers (e.g. 0 = background, 1 = flammable etc..)
_encode_class_column(my_trained_labels_df, "Training CSV")

if TEST_DATA_IS_LABELLED:
    _encode_class_column(test_xml_df, "Test annotations")

## Inference
For labelled data

In [10]:
def run_inference():
    """Run the inference routine matching the test dataset (labelled or unlabelled)."""
    inference_routine = infer_labelled_data if TEST_DATA_IS_LABELLED else run_inference_unlabelled
    inference_routine()

In [11]:
def infer_labelled_data():
    """
    Run inference on the labelled test dataset and report evaluation statistics.

    Steps:
      1. Load the trained weights into a freshly built model.
      2. Create a timestamped output folder below
         "<test images>/inference_results_images".
      3. Save one annotated image per test sample into that folder.
      4. Run `evaluate` on the test set, writing its output to
         "_test_results.txt" in the same folder.
      5. Print the key statistics.
    """
    # Function-scope import so this cell does not depend on the import cell changing
    from contextlib import redirect_stdout

    # Load Model for test dataset
    inference_device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f"Running inference on LABELLED dataset on {inference_device}")

    loaded_model = get_model(num_classes_inference)
    loaded_model.load_state_dict(torch.load(trained_model_dir,
        map_location=torch.device(inference_device)))

    # Create test results output folder
    test_results_dir = os.path.join(test_dataset_images_dir, "inference_results_images")
    if not os.path.exists(test_results_dir):
        os.mkdir(test_results_dir)

    # Directory is named after time and date
    new_dir_name = DT.now().strftime("%Y_%m_%d-%H.%M.%S")
    new_dir = os.path.join(test_results_dir, f"testResults_{new_dir_name}")
    os.mkdir(new_dir)

    # Build the dataset once; it is reused for the image count and for evaluation
    dataset_test = load_test_data()
    test_size = len(dataset_test)

    # Draw and save inference result
    for i in range(test_size):
        draw_outputs_seen_data(i, new_dir, loaded_model)
        if i%2==0 or (i+1)==test_size:
            print(f"{i+1}/{test_size} images inferred.")

    # Getting test statistics
    print("\nInference complete!")

    print("\nEvaluating test statistics...\n")

    # The model is moved to the inference device only now, after the images were drawn
    loaded_model.to(inference_device)
    data_loader_test = torch.utils.data.DataLoader(
                dataset_test, batch_size=2, shuffle=False,
                num_workers=num_workers,
                collate_fn=utils.collate_fn)

    # Save results to log. redirect_stdout restores sys.stdout even when
    # evaluate() raises, so later notebook output is never lost to the log file.
    with open(os.path.join(new_dir, '_test_results.txt'), 'w') as f, redirect_stdout(f):
        print("TEST RESULTS")
        print(f'Model Used: {os.path.basename(os.path.normpath(trained_model_dir))}')
        print("\n******************************\n")
        test_eval = evaluate(loaded_model, data_loader_test, device=inference_device)

    # Display key statistics. Indices follow the COCOeval summary order:
    # [1] = AP at IoU 0.50, [2] = AP at IoU 0.75,
    # [7] = AR at IoU 0.50:0.95 with at most 10 detections per image.
    test_eval_stats = test_eval.coco_eval['bbox'].stats

    print(f"Inference complete. Here are some key Results:")
    print(f"Avg Precision (IOU50) = {test_eval_stats[1]}")
    print(f"Avg Precision (IOU75) = {test_eval_stats[2]}")
    print(f"Avg Recall (IOU50:95, maxDets=10) = {test_eval_stats[7]}")

    print("\nDetailed results are available in '_test_results.txt' log in same folder as inference images")

In [12]:
def draw_outputs_seen_data(idx, new_dir, loaded_model):
    """
    Draw ground-truth and predicted boxes for one labelled test image and save it.

    Ground-truth boxes are outlined in green. Predicted boxes whose score
    exceeds the module-level `threshold` are outlined in red and captioned with
    the class name and score. The result is written to `new_dir` as
    "inference_{idx}.jpg".

    Args:
        idx: Index of the sample in the test dataset.
        new_dir: Folder the annotated image is saved into.
        loaded_model: Model handed on to get_prediction().

    Returns:
        The prediction returned by get_prediction() for this sample.
    """
    dataset_test = load_test_data()

    # Image tensor plus the model output for it
    img, prediction = get_prediction(idx, loaded_model)
    detections = prediction[0]

    # Ground-truth boxes of this sample (there can be several) as a numpy array
    _, target = dataset_test[idx]
    ground_truth_boxes = np.array(target["boxes"])

    # Turn the tensor back into a PIL image that can be drawn on
    image = Image.fromarray(img.mul(255).permute(1, 2, 0).byte().numpy())
    draw = ImageDraw.Draw(image)

    # Ground truth: GREEN
    for gt_box in ground_truth_boxes:
        draw.rectangle([(gt_box[0], gt_box[1]), (gt_box[2], gt_box[3])],
                       outline="green", width=3)

    # Predictions: RED
    for box_tensor, score_tensor, label_tensor in zip(
            detections["boxes"], detections["scores"], detections["labels"]):

        # Corner coordinates of the predicted box
        boxes = box_tensor.detach().numpy()

        # Confidence of the prediction, rounded for display
        score = np.round(score_tensor.detach().numpy(), decimals=4)

        # Class index -> class name (e.g. "oxidizer")
        predicted_class = label_tensor.detach().numpy()
        predicted_classes_label = list_of_classes[predicted_class]

        # Boxes at or below the confidence threshold are not drawn
        if score > threshold:
            draw.rectangle([(boxes[0], boxes[1]), (boxes[2], boxes[3])],
                           outline="red", width=3)
            draw.text((boxes[0]+5, boxes[1]-12),
                      text=f"{predicted_classes_label}, {str(score)}",
                      fill="red")

    # Save image
    image.save(os.path.join(new_dir, f"inference_{idx}.jpg"))

    return prediction

In [13]:
# Run the model on a single sample of the test dataset

def get_prediction(idx, loaded_model):
    """
    Return the image tensor at `idx` of the test dataset and the model output for it.

    Args:
        idx: Index of the sample in the test dataset.
        loaded_model: Model to run; it is switched to evaluation mode.

    Returns:
        Tuple (img, prediction): the image as returned by the dataset and the
        result of calling the model on `[img]`.
    """
    # Only the image is needed here; the target dict of the sample is ignored
    img = load_test_data()[idx][0]

    # Evaluation mode, and no gradient tracking while predicting
    loaded_model.eval()
    with torch.no_grad():
        prediction = loaded_model([img])

    return img, prediction

## Inference
For unlabelled data

In [14]:
def run_inference_unlabelled():
    """
    Run inference on every image of the unlabelled test dataset.

    The trained weights are loaded into a freshly built model, a timestamped
    output folder is created below "<test images>/inference_results_images",
    and one annotated image per input file is saved there.

    Input files are processed in sorted order so that the index used in the
    output filename ("inference_{idx}.jpg") maps to the same input on every
    run. Files that are clearly not images (annotation, text and OS metadata
    files) are skipped instead of aborting the run.
    """
    # Load Model for test dataset
    inference_device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f"Running inference on UNLABELLED test dataset on {inference_device}")

    loaded_model = get_model(num_classes_inference)
    loaded_model.load_state_dict(torch.load(trained_model_dir,
        map_location=torch.device(inference_device)))

    # Create test results output folder
    test_results_dir = os.path.join(test_dataset_images_dir, "inference_results_images")
    if not os.path.exists(test_results_dir):
        os.mkdir(test_results_dir)

    # Directory is named after time and date
    new_dir_name = DT.now().strftime("%Y_%m_%d-%H.%M.%S")
    new_dir = os.path.join(test_results_dir, f"testResults_{new_dir_name}")
    os.mkdir(new_dir)

    # Extensions of files that can sit next to the images but are never images
    skipped_extensions = ('.xml', '.csv', '.txt', '.json', '.db', '.ini')
    imgs_to_test = sorted(
        f for f in os.listdir(test_dataset_images_dir)
        if os.path.isfile(os.path.join(test_dataset_images_dir, f))
        and f != '.DS_Store'
        and not f.lower().endswith(skipped_extensions)
    )
    test_size = len(imgs_to_test)

    for idx, filename in enumerate(imgs_to_test):
        img_path = os.path.join(test_dataset_images_dir, filename)
        draw_outputs_unlabelled_data(img_path, loaded_model, new_dir, idx)

        if idx%2==0 or (idx+1)==test_size:
            print(f"{idx+1}/{test_size} images inferred.")

    print(f"\nInference complete. Results saved in {new_dir}")

In [15]:
# Run the model on one unlabelled image file and save the annotated result

def draw_outputs_unlabelled_data(img_path, loaded_model, new_dir, idx):
  """
  Predict boxes for the image at `img_path`, draw them and save the result.

  Predicted boxes whose score exceeds the module-level `threshold` are
  outlined in red and captioned with the class name and score. The annotated
  image is written to `new_dir` as "inference_{idx}.jpg".

  Args:
    img_path: Path of the image file to run inference on.
    loaded_model: Model to run; it is switched to evaluation mode.
    new_dir: Folder the annotated image is saved into.
    idx: Number used in the output filename.
  """
  # Convert unseen image into tensor. The context manager closes the file
  # handle as soon as the pixel data has been copied by convert().
  with Image.open(img_path) as source_image:
    img = source_image.convert("RGB")
  transforms = T.ToTensor()
  # There is no ground truth for unlabelled data, hence the None target
  img, _ = transforms(img, None)

  # Put the model in evaluation mode
  loaded_model.eval()

  # Retrieve predicted bounding box (red)
  # There are MANY predicted bounding boxes, each with a score
  with torch.no_grad():
    prediction = loaded_model([img])

  # Get image and prepare to print image
  image = Image.fromarray(img.mul(255).permute(1, 2,0).byte().numpy())
  draw = ImageDraw.Draw(image)

  for element in range(len(prediction[0]["boxes"])):

    # Coords of predicted bounding box
    boxes = prediction[0]["boxes"][element].detach().numpy()

    # Score = confidence level of prediction
    score = np.round(prediction[0]["scores"][element].detach().numpy(),
                      decimals= 4)

    # Retrieve predicted class labels (e.g. "oxidizer")
    predicted_class = prediction[0]["labels"][element].detach().numpy()
    predicted_classes_label = list_of_classes[predicted_class]

    # Only draw predicted bounding boxes exceeding threshold CONF
    if score > threshold:
      draw.rectangle([(boxes[0], boxes[1]), (boxes[2], boxes[3])],
      outline ="red", width =3)
      draw.text((boxes[0]+5, boxes[1]-12), text = f"{predicted_classes_label}, {str(score)}", fill="black")

  # Save image
  image.save(os.path.join(new_dir, f"inference_{idx}.jpg"))

## Dataloader and Model loader

In [16]:
# Build the detection model with a box predictor sized for our classes

def get_model(num_classes):
    """
    Create a Faster R-CNN ResNet-50 FPN model whose box predictor outputs `num_classes` classes.

    Args:
        num_classes: Number of classes the new box predictor is built for.

    Returns:
        The model with its `roi_heads.box_predictor` replaced.
    """
    # NOTE(review): newer torchvision releases appear to deprecate `pretrained=`
    # in favour of `weights=` - confirm against the installed version.
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

    # The new head must accept the same number of input features as the old one
    head_in_features = detector.roi_heads.box_predictor.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)

    return detector

In [17]:
def load_test_data():
    """Build the test dataset from the test image folder and the saved test labels, without training augmentation."""
    return MyCustomDataset(
        root=test_dataset_images_dir,
        data_file=my_test_labels_savefile,
        transforms=augment_image(train=False),
    )

In [18]:
def augment_image(train):
    """
    Compose the transforms applied to each (image, target) pair.

    Args:
        train: When truthy, a random horizontal flip (probability 0.5) is added
            after the tensor conversion.

    Returns:
        A `T.Compose` of the selected transforms.
    """
    # The PIL image is always converted to a PyTorch tensor first
    pipeline = [T.ToTensor()]

    # Random flipping is data augmentation and only wanted while training
    if train:
        pipeline.append(T.RandomHorizontalFlip(0.5))

    return T.Compose(pipeline)

In [19]:
# Look up the boxes and class indices of one image in the label
# dataframes that were prepared earlier in the notebook

def parse_one_annot(path_to_data_file, filename):
    """
    Return the bounding boxes and class indices annotated for `filename`.

    Args:
        path_to_data_file: Label file the dataset was built with. The test
            dataframe is used when it equals `my_test_labels_savefile`,
            otherwise the training dataframe is used.
        filename: Value to match against the "filename" column.

    Returns:
        Tuple (boxes_array, class_type): the "xmin", "ymin", "xmax", "ymax"
        values of every matching row, and the matching "class" values
        converted to int (one single-element row per match).
    """
    if path_to_data_file == my_test_labels_savefile:
        labels_df = test_xml_df
    else:
        labels_df = my_trained_labels_df

    # Every row belonging to this image; an image can hold several objects
    matching_rows = labels_df[labels_df["filename"] == filename]

    boxes_array = matching_rows[["xmin", "ymin", "xmax", "ymax"]].values
    class_type = matching_rows[["class"]].values.astype(str).astype(int)

    return boxes_array, class_type

## Custom Class

In [20]:
class MyCustomDataset(torch.utils.data.Dataset):
  """
  Dataset yielding (image, target) pairs for the files found in `root`.

  The target is a dict with the keys "boxes", "labels", "image_id", "area"
  and "iscrowd", built from the annotations returned by parse_one_annot().

  Files that are clearly not images are ignored when the folder is listed.
  This matters when annotations live next to the images: the XML files and the
  generated "test_labels.csv" would otherwise be opened as images.
  """

  # Extensions of files that can sit next to the images but are never images
  NON_IMAGE_EXTENSIONS = ('.xml', '.csv', '.txt', '.json', '.db', '.ini')
  # OS metadata files that have no extension to filter on
  NON_IMAGE_FILENAMES = ('.DS_Store',)

  def __init__(self, root, data_file, transforms=None):
    """
    Args:
      root: Folder containing the images.
      data_file: Label file path, handed on to parse_one_annot().
      transforms: Optional callable applied as `transforms(img, target)`.
    """
    self.root = root
    self.transforms = transforms
    # Sorted so that an index always refers to the same image
    self.imgs = sorted(
        f for f in os.listdir(root)
        if os.path.isfile(os.path.join(root, f)) and self._is_image_candidate(f)
    )
    self.path_to_data_file = data_file


  def _is_image_candidate(self, filename):
    """Return False for files known not to be images, True otherwise."""
    if filename in self.NON_IMAGE_FILENAMES:
      return False
    return not filename.lower().endswith(self.NON_IMAGE_EXTENSIONS)


  # Get image information, returning required tensor "target"
  def __getitem__(self, idx):
    """Return the image at `idx` and its target dict, after applying the transforms if any."""

    # PART 0: GET IMAGE PIL FILE
    img_path = os.path.join(self.root, self.imgs[idx])
    # The context manager closes the file handle once convert() has copied the pixels
    with Image.open(img_path) as source_image:
      img = source_image.convert("RGB")

    # PART 1: GET COORDS OF BOUNDING BOXES
    box_list, class_type_num = parse_one_annot(self.path_to_data_file, self.imgs[idx])
    boxes = torch.as_tensor(box_list, dtype=torch.float32)

    # PART 2: GET CLASS LABELS
    # parse_one_annot returns one single-element row per object; flatten to 1-D
    class_type_num = class_type_num.transpose()[0]
    labels = torch.as_tensor(class_type_num, dtype=torch.int64)

    # PART 3: GET IMAGE_IDs
    image_id = torch.tensor([idx])

    # PART 4: COMPUTE AREA (height * width of every box)
    area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:,0])

    # PART 5: suppose all instances are not crowd so none will be ignored
    num_objs = len(box_list)
    iscrowd = torch.zeros((num_objs,), dtype=torch.int64)

    # ASSEMBLE REQUIRED TARGET DICTIONARY
    target = {}
    target["boxes"] = boxes
    target["labels"] = labels
    target["image_id"] = image_id
    target["area"] = area
    target["iscrowd"] = iscrowd

    if self.transforms is not None:
        img, target = self.transforms(img, target)
    return img, target


  def __len__(self):
    """Number of image files indexed in `root`."""
    return len(self.imgs)