In [1]:
import torch
import glob
import cv2
import os
import numpy as np
import random
import time
import matplotlib.pyplot as plt
from torch.optim.lr_scheduler import ReduceLROnPlateau
from xml.dom.minidom import parse
from torchvision.models.detection.faster_rcnn import fasterrcnn_resnet50_fpn

In [2]:
# Order CUDA devices by PCI bus id and expose only device index 2 to this process.
os.environ.update({
    "CUDA_DEVICE_ORDER": "PCI_BUS_ID",
    "CUDA_VISIBLE_DEVICES": "2",
})

In [3]:
# Class names indexed by label id; index 0 is the background entry.
idx2label = ["__background__", "person", "bicycle", "car", "motorbike", "bus"]
# Inverse lookup: class name -> label id.
label2idx = dict(zip(idx2label, range(len(idx2label))))

In [4]:
def get_image_data(img_name_list, data_dir = "./data/Image/"):
    """Load the PNG image of every name in ``img_name_list``.

    Args:
        img_name_list: iterable of image names without the ``.png`` extension.
        data_dir: directory that contains the ``<name>.png`` files.

    Returns:
        dict mapping each image name to a ``torch.Tensor`` whose last axis
        (as returned by ``cv2.imread``) has been moved to the front.
        Pixel values and channel order are left exactly as ``cv2.imread``
        produced them (no rescaling, no colour conversion).

    Raises:
        FileNotFoundError: if an image file is missing or cannot be decoded.
    """
    image_data = dict()
    for image_file in img_name_list:
        image_filename = os.path.join(data_dir, image_file + ".png")
        pixels = cv2.imread(image_filename)
        # cv2.imread reports failure by returning None rather than raising;
        # fail here with the offending path instead of deep inside torch.Tensor(...).
        if pixels is None:
            raise FileNotFoundError("cannot read image: {}".format(image_filename))
        image_data[image_file] = torch.Tensor(pixels).permute(2, 0, 1)
    return image_data

In [5]:
def get_annotations_data(annotations_list, data_dir = "./data/Annotations/"):
    """Parse the XML annotation file of every name in ``annotations_list``.

    Each ``<object>`` element contributes one label (its ``<name>`` mapped
    through ``label2idx``) and one box read from ``<bndbox>`` as
    ``[xmin, ymin, xmax, ymax]``.

    Args:
        annotations_list: iterable of annotation names without the ``.xml`` extension.
        data_dir: directory that contains the ``<name>.xml`` files.

    Returns:
        dict keyed by the ``<filename>`` value of each XML (text before ``.png``),
        whose values are ``{"boxes": Tensor of shape (N, 4), "labels": LongTensor of shape (N,)}``.
        A file without objects yields an empty ``(0, 4)`` boxes tensor.

    Raises:
        KeyError: if an object name is not present in ``label2idx``.
    """
    def _text(node, tag):
        # Text content of the first <tag> element found under ``node``.
        return node.getElementsByTagName(tag)[0].firstChild.data

    annotations_data = dict()
    for annotations in annotations_list:
        domTree = parse(os.path.join(data_dir, annotations + ".xml"))
        rootNode = domTree.documentElement
        label_list = []
        boxes_list = []
        for obj in rootNode.getElementsByTagName("object"):
            label_list.append(int(label2idx[_text(obj, "name")]))
            # Look the <bndbox> element up once instead of once per coordinate.
            bndbox = obj.getElementsByTagName("bndbox")[0]
            boxes_list.append([float(_text(bndbox, corner)) for corner in ("xmin", "ymin", "xmax", "ymax")])
        image_key = _text(rootNode, "filename").split(".png")[0]
        annotations_data[image_key] = {
            # reshape keeps the (N, 4) layout even when there are no objects,
            # where torch.Tensor([]) alone would have shape (0,).
            "boxes": torch.Tensor(boxes_list).reshape(-1, 4),
            "labels": torch.LongTensor(label_list),
        }
    return annotations_data

In [6]:
def get_img_name_list(data_dir = "./data/test.txt"):
    """Read image names from a text file, one name per line.

    Args:
        data_dir: path of the list FILE (the parameter name is kept for
            backward compatibility even though it is not a directory).

    Returns:
        list of names with surrounding whitespace removed. Blank lines are
        skipped so they do not turn into empty image names.
    """
    # The ``with`` block closes the file; no explicit close() is needed.
    with open(data_dir, "r") as f:
        stripped = (line.strip() for line in f)
        return [name for name in stripped if name]

In [7]:
def vis_data(img_name, image, target, data_dir="./data/pred_img/", isPred = True, show=True, score_threshold=0.8):
    """Draw labelled boxes on ``image``, save it as ``<data_dir>/<img_name>.png`` and optionally show it.

    Args:
        img_name: output file name without extension.
        image: array-like image; it is converted to a ``uint8`` numpy array
            and drawn on with OpenCV.
        target: dict with tensor entries ``"boxes"`` and ``"labels"``, plus
            ``"scores"`` when ``isPred`` is true.
        data_dir: output directory; created if it does not exist.
        isPred: when true, only boxes whose score exceeds ``score_threshold``
            are drawn; otherwise every box is drawn.
        show: when true, display the annotated image with matplotlib.
        score_threshold: minimum (exclusive) score for a predicted box to be drawn.
    """
    # OpenCV draws in place, so make sure the buffer is C-contiguous.
    image = np.ascontiguousarray(np.array(image).astype(np.uint8))
    boxes = target['boxes'].cpu().numpy()
    labels = target['labels'].cpu().numpy()

    if isPred:
        scores = target["scores"].cpu().numpy()
        keep = [i for i, score in enumerate(scores) if score > score_threshold]
    else:
        keep = range(len(boxes))

    for i in keep:
        # OpenCV point arguments must be integers; the boxes hold floats.
        x_min, y_min, x_max, y_max = (int(v) for v in boxes[i][:4])
        cv2.putText(image, idx2label[labels[i]], (x_min, y_min), cv2.FONT_HERSHEY_COMPLEX, 1.0, (0,0,255), 1)
        cv2.rectangle(image, (x_min, y_min), (x_max, y_max), tuple(random.randint(0,255) for _ in range(3)), 2)

    if data_dir:
        os.makedirs(data_dir, exist_ok=True)
    cv2.imwrite(os.path.join(data_dir, img_name + ".png"), image)

    if show:
        # Swap the first and third channels for matplotlib
        # (COLOR_BGR2RGB and COLOR_RGB2BGR perform the same swap).
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        plt.imshow(image)

In [8]:
def writelog(img_name, boxes, labels, scores, data_dir = "./data/pt/", threshold = 0):
    """Write one ``<label> <x0> <y0> <x1> <y1>`` line per box to ``<data_dir>/<img_name>.txt``.

    Args:
        img_name: output file name without extension.
        boxes: indexable of boxes; the first four entries of each are written.
        labels: indexable of label ids, translated through ``idx2label``.
        scores: indexable of scores; its length decides how many boxes are visited.
        data_dir: output directory; created if it does not exist.
        threshold: only boxes with ``score > threshold`` are written.
    """
    if data_dir:
        os.makedirs(data_dir, exist_ok=True)
    # The ``with`` block closes the file; no explicit close() is needed.
    with open(os.path.join(data_dir, img_name + ".txt"), "w") as f:
        for i in range(len(scores)):
            if scores[i] > threshold:
                box = boxes[i]
                f.write("{} {} {} {} {}\n".format(idx2label[labels[i]], box[0], box[1], box[2], box[3]))

In [9]:
def compute_IOU(rec1,rec2):
    """Intersection-over-union of two axis-aligned rectangles.

    Args:
        rec1, rec2: rectangles given as ``[xmin, ymin, xmax, ymax]``.

    Returns:
        ``intersection / union``; ``0.0`` when the union area is not positive
        (e.g. both rectangles are degenerate), instead of dividing by zero.
    """
    width = max(0, min(rec1[2], rec2[2]) - max(rec1[0], rec2[0]))
    height = max(0, min(rec1[3], rec2[3]) - max(rec1[1], rec2[1]))
    inter = width * height
    area1 = (rec1[3] - rec1[1]) * (rec1[2] - rec1[0])
    area2 = (rec2[3] - rec2[1]) * (rec2[2] - rec2[0])
    union = area1 + area2 - inter
    if union <= 0:
        return 0.0
    return inter / union

In [10]:
def evaluate(pred_boxes, pred_labels, gt_boxes, gt_labels, iou_threshold = 0.5):
    """Greedily match predictions to ground-truth boxes for one image.

    Predictions are visited in the given order. Each one is matched to the
    first still-unmatched ground-truth box that has the same label and an IoU
    above ``iou_threshold``; every ground-truth box is matched at most once.

    Args:
        pred_boxes, pred_labels: predicted boxes and their labels.
        gt_boxes, gt_labels: ground-truth boxes and their labels.
        iou_threshold: minimum (exclusive) IoU for a match.

    Returns:
        ``(num_correct, num_error, num_miss)``: matched predictions,
        unmatched predictions and unmatched ground-truth boxes.
        The same three counts are also printed.
    """
    pred_len = len(pred_boxes)
    gt_len = len(gt_boxes)
    matched = [False] * gt_len
    num_correct = 0
    for i in range(pred_len):
        for j in range(gt_len):
            # Cheap checks first: skip the IoU computation for ground truths
            # that are already taken or carry a different label.
            if matched[j] or pred_labels[i] != gt_labels[j]:
                continue
            if compute_IOU(pred_boxes[i], gt_boxes[j]) > iou_threshold:
                matched[j] = True
                num_correct += 1
                break
    num_error = pred_len - num_correct
    num_miss = gt_len - num_correct
    print('correct: {}, error: {}, miss: {}'.format(num_correct,num_error,num_miss))

    return num_correct, num_error, num_miss

In [11]:
def evaluate_model_nobatch(img_name_list, image_data, annotations_data):
    """Evaluate the global ``model`` one image at a time and print aggregate metrics.

    For every name present in both ``image_data`` and ``annotations_data`` the
    model is run on that single image (on CUDA when the global ``use_cuda`` is
    true). Predictions are scored with ``evaluate``, written with ``writelog``
    (predictions to its default directory, ground truth to ``./data/gt/``) and
    rendered with ``vis_data`` into ``./data/pred_img/``.

    The printed "mAP" / "mAR" values are the pooled ratios
    ``correct / (correct + error)`` and ``correct / (correct + miss)``;
    a ratio with a zero denominator is reported as ``0.0``.

    Args:
        img_name_list: image names to evaluate.
        image_data: dict of image name -> image tensor.
        annotations_data: dict of image name -> ``{"boxes", "labels"}``.
    """
    model.eval()

    num_correct = 0
    num_error = 0
    num_miss = 0

    for img_name in img_name_list:
        # Only images that have both pixels and annotations can be scored.
        if img_name not in image_data or img_name not in annotations_data:
            continue

        image = image_data[img_name]
        gt = annotations_data[img_name]

        with torch.no_grad():
            pred = model([image.cuda() if use_cuda else image.cpu()])[0]

        # Move each prediction field to the CPU once and reuse it below.
        pred_boxes = pred["boxes"].cpu()
        pred_labels = pred["labels"].cpu()
        pred_scores = pred["scores"].cpu()

        tmp_num_correct, tmp_num_error, tmp_num_miss = evaluate(pred_boxes, pred_labels,
                                                                gt["boxes"].cpu(), gt["labels"].cpu())
        num_correct = num_correct + tmp_num_correct
        num_error = num_error + tmp_num_error
        num_miss = num_miss + tmp_num_miss

        writelog(img_name = img_name, boxes = pred_boxes, labels = pred_labels, scores = pred_scores)
        # Ground truth has no scores; use ones so every box passes the threshold.
        writelog(img_name = img_name, boxes = gt["boxes"], labels = gt["labels"],
                 scores = np.ones(len(gt["boxes"])), data_dir="./data/gt/")
        vis_data(img_name = img_name, image = image.permute(1,2,0), target = pred, data_dir="./data/pred_img/", isPred = True, show=False)

    # Guard every ratio so an empty run (or one without matches) does not divide by zero.
    num_pred = num_correct + num_error
    num_gt = num_correct + num_miss
    mAP = num_correct / num_pred if num_pred else 0.0
    mAR = num_correct / num_gt if num_gt else 0.0
    F_measure = 2 * mAP * mAR / (mAP + mAR) if (mAP + mAR) else 0.0
    print('mAP={}\n mAR={}\n F-measure={}'.format(mAP,mAR,F_measure))

In [12]:
def evaluate_all(pred_boxes, pred_labels, gt_boxes, gt_labels):
    """Score a batch of images with ``evaluate`` and print the pooled metrics.

    Args:
        pred_boxes, pred_labels: per-image sequences of predicted boxes / labels.
        gt_boxes, gt_labels: per-image sequences of ground-truth boxes / labels,
            aligned index by index with the predictions.

    The printed "mAP" / "mAR" values are the pooled ratios
    ``correct / (correct + error)`` and ``correct / (correct + miss)``;
    a ratio with a zero denominator is reported as ``0.0`` instead of raising.
    """
    num_correct = 0
    num_error = 0
    num_miss = 0

    for i in range(len(pred_boxes)):
        tmp_num_correct, tmp_num_error, tmp_num_miss = evaluate(pred_boxes[i], pred_labels[i], gt_boxes[i], gt_labels[i])
        num_correct = num_correct + tmp_num_correct
        num_error = num_error + tmp_num_error
        num_miss = num_miss + tmp_num_miss

    num_pred = num_correct + num_error
    num_gt = num_correct + num_miss
    mAP = num_correct / num_pred if num_pred else 0.0
    mAR = num_correct / num_gt if num_gt else 0.0
    F_measure = 2 * mAP * mAR / (mAP + mAR) if (mAP + mAR) else 0.0
    print('mAP={}\n mAR={}\n F-measure={}'.format(mAP,mAR,F_measure))

In [13]:
def evaluate_model(img_name_list, image_data, annotations_data):
    """Evaluate the global ``model`` on all images in a single batched forward pass.

    Every name present in both ``image_data`` and ``annotations_data`` is
    collected, the images are moved to the device of the model's parameters,
    and the model is called once on the whole list. Each prediction is written
    with ``writelog`` under the name of the image it belongs to, then the
    predictions and ground truth are handed to ``evaluate_all``.

    Args:
        img_name_list: image names to evaluate.
        image_data: dict of image name -> image tensor.
        annotations_data: dict of image name -> ``{"boxes", "labels"}``.
    """
    model.eval()

    eval_names = []
    pred_img = []
    gt_data = []
    for img_name in img_name_list:
        # Only images that have both pixels and annotations can be scored.
        if img_name not in image_data or img_name not in annotations_data:
            continue
        eval_names.append(img_name)
        pred_img.append(image_data[img_name])
        gt_data.append(annotations_data[img_name])

    if not pred_img:
        print("evaluate_model: no image has both pixel data and annotations; nothing to evaluate")
        return

    # Run the inputs on whatever device the model lives on (CPU or CUDA).
    first_param = next(model.parameters(), None)
    if first_param is not None:
        pred_img = [img.to(first_param.device) for img in pred_img]

    with torch.no_grad():
        pred_data = model(pred_img)

    pred_eval_boxes = []
    pred_eval_labels = []
    # Predictions come back in input order, so pair each with its own image name
    # (reusing the loop variable of the collection loop would log every
    # prediction under the last name in the list).
    for name, item in zip(eval_names, pred_data):
        boxes = item["boxes"].cpu()
        labels = item["labels"].cpu()
        pred_eval_boxes.append(boxes)
        pred_eval_labels.append(labels)
        # write the prediction log
        writelog(img_name = name, boxes = boxes, labels = labels, scores = item["scores"].cpu())

    gt_eval_boxes = [item["boxes"] for item in gt_data]
    gt_eval_labels = [item["labels"] for item in gt_data]

    evaluate_all(pred_eval_boxes, pred_eval_labels, gt_eval_boxes, gt_eval_labels)

In [14]:
# Image names to evaluate, read from get_img_name_list()'s default list file.
img_name_list = get_img_name_list()
# Image tensors keyed by image name.
image_data = get_image_data(img_name_list)
# Ground-truth boxes and labels keyed by the file name recorded in each XML.
annotations_data = get_annotations_data(img_name_list)

In [15]:
# Use the GPU only when CUDA is actually usable in this process, so the
# notebook still runs on machines without a visible CUDA device.
use_cuda = torch.cuda.is_available()

In [21]:
# Load the checkpoint straight onto the device selected by `use_cuda`; the loaded
# object is used directly as the model (it is `.eval()`-ed and called later).
# NOTE(review): torch.load unpickles arbitrary Python objects — only load
# checkpoints from a trusted source.
model = torch.load("best_model_0.0211.pt", map_location=torch.device('cuda' if use_cuda else 'cpu'))

In [22]:
# Evaluate image by image; prints one "correct/error/miss" line per image followed by the aggregate metrics.
evaluate_model_nobatch(img_name_list, image_data, annotations_data)

correct: 6, error: 0, miss: 0
correct: 2, error: 0, miss: 0
correct: 7, error: 0, miss: 0
correct: 10, error: 0, miss: 0
correct: 11, error: 0, miss: 1
correct: 15, error: 2, miss: 0
correct: 7, error: 0, miss: 0
correct: 15, error: 1, miss: 0
correct: 2, error: 0, miss: 0
correct: 7, error: 0, miss: 0
correct: 3, error: 0, miss: 0
correct: 8, error: 0, miss: 0
correct: 16, error: 1, miss: 1
correct: 50, error: 3, miss: 3
correct: 4, error: 0, miss: 0
correct: 1, error: 0, miss: 0
correct: 1, error: 0, miss: 0
correct: 1, error: 0, miss: 0
correct: 3, error: 0, miss: 0
correct: 2, error: 0, miss: 0
correct: 4, error: 0, miss: 0
correct: 5, error: 0, miss: 0
correct: 1, error: 0, miss: 0
correct: 3, error: 1, miss: 0
correct: 1, error: 0, miss: 0
correct: 11, error: 0, miss: 0
correct: 2, error: 0, miss: 0
correct: 9, error: 0, miss: 0
correct: 10, error: 0, miss: 1
correct: 2, error: 0, miss: 0
correct: 8, error: 0, miss: 0
correct: 13, error: 1, miss: 0
correct: 1, error: 0, miss: 0
c

In [18]:
# Ask PyTorch to release its unused cached CUDA memory.
torch.cuda.empty_cache()