In [None]:
import os
import pylab as pl
import numpy as np
import sys
import io
import cv2
import pickle

from detectron2.data.datasets import load_coco_json, register_coco_instances
from detectron2.data import MetadataCatalog
from detectron2 import model_zoo
from detectron2.config import get_cfg
from detectron2.engine import DefaultPredictor

from detectron2.data import DatasetMapper, transforms, build_detection_train_loader
from detectron2.engine import DefaultTrainer, SimpleTrainer
from detectron2.engine.hooks import PeriodicWriter

# Image -> GPS lookup

In [None]:
# Load metadata about the existing images
metadata_pkl = 'gsv/metadata.pkl'
if os.path.exists(metadata_pkl):
    with open(metadata_pkl, 'rb') as f:
        metadata = pickle.load(f)
else:
    metadata = {}
    
print(len(metadata))

# Load Model

In [None]:
# Custom Trainer class to include custom data augmentations
class Trainer(DefaultTrainer):
    @classmethod
    def build_train_loader(cls, cfg):
        # Set up data augmentation
        augs = [transforms.ResizeShortestEdge(
                    [640, 672, 704, 736, 768, 800],
                     max_size=1333, sample_style="choice"),
                transforms.RandomBrightness(0.8, 1.2),
                transforms.RandomSaturation(0.8, 1.2),
                transforms.RandomFlip(prob=0.5)]
        data_loader = build_detection_train_loader(cfg,
            mapper=DatasetMapper(cfg, is_train=True, 
                                 augmentations=augs))
        return data_loader

In [None]:
def build_test_dict(datadir, image_dir, ann_file):
    # Test dataset
    name = ann_file.split('.')[0]
    test_dataset_name = f"test_{name}_data"
    test_data_dir = os.path.join(datadir, image_dir)
    test_json_file = os.path.join(datadir, image_dir, ann_file)
    print(test_json_file)

    register_coco_instances(test_dataset_name, {}, test_json_file, test_data_dir)
    test_dict = load_coco_json(test_json_file, test_data_dir,
                    dataset_name=test_dataset_name)
    test_metadata = MetadataCatalog.get(test_dataset_name)
    
    return test_dict, test_metadata


In [None]:
def load_model(datadir,  training_dict, training_metadata, output_dir):
    
    # Most importantly, let's set up a model type
    model_type = "retinanet"
    model_file = "COCO-Detection/retinanet_R_50_FPN_3x.yaml"

    # Create a configuration and set up the model and datasets
    cfg = get_cfg()
    cfg.merge_from_file(model_zoo.get_config_file(model_file))
    cfg.DATASETS.TRAIN = (training_metadata.name,)
    #cfg.DATASETS.TEST = (test_metadata.name,)
    cfg.OUTPUT_DIR = f"{output_dir}/{model_type}_training_output"
    cfg.DATALOADER.NUM_WORKERS = 4
    if model_type == "maskrcnn":
        cfg.MODEL.ROI_HEADS.NUM_CLASSES = len(training_metadata.thing_classes)
        cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.2
    if model_type == "retinanet":
        cfg.MODEL.RETINANET.NUM_CLASSES = len(training_metadata.thing_classes)
        cfg.MODEL.RETINANET.SCORE_THRESH_TEST = 0.2
    cfg.MODEL.WEIGHTS = model_zoo.get_checkpoint_url(model_file)  # Initialize weights from Model Zoo
    cfg.MODEL.ROI_HEADS.BATCH_SIZE_PER_IMAGE = 256   # (default: 512)

    # Solver options
    cfg.SOLVER.BASE_LR = 1e-3           # Base learning rate
    cfg.SOLVER.GAMMA = 0.5              # Learning rate decay
    cfg.SOLVER.STEPS = (250, 500, 750)  # Iterations at which to decay learning rate
    cfg.SOLVER.MAX_ITER = 1000          # Maximum number of iterations
    cfg.SOLVER.WARMUP_ITERS = 100       # Warmup iterations to linearly ramp learning rate from zero
    cfg.SOLVER.IMS_PER_BATCH = 1        # Lower to reduce memory usage (1 is the lowest)

    # Set low threshold (for high recall)
    cfg.MODEL.RETINANET.SCORE_THRESH_TEST = 0.2

    ######################
    # Train Model

    # Create an output folder and delete any old files
    os.makedirs(cfg.OUTPUT_DIR, exist_ok=True)
    for fname in os.listdir(cfg.OUTPUT_DIR):
        if "tfevents" in fname:
            os.remove(os.path.join(cfg.OUTPUT_DIR, fname))

    # Create a Trainer using the data augmentations defined above and train the network
    # To avoid creating a custom trainer class, you can use 
    #  `trainer = SimpleTrainer(cfg)` : A no-frills training pipeline with nothing added  
    #  `trainer = DefaultTrainer(cfg)` : The default training pipeline with some data augmentation and hooks
    trainer = Trainer(cfg)

    # Hack to reduce the printing frequency, which defaults to every 20 iterations..
    # There is a cleaner way to do this using `trainer.build_hooks()` but it requires much more effort.
    for hook in trainer._hooks:
        if isinstance(hook, PeriodicWriter):
            hook._period = 50

    # Finally, train the network
    trainer.resume_or_load(resume=True)


    ######################
    # Evaluate model

    # Load weights from the most recent training run
    cfg.MODEL.WEIGHTS = os.path.join(cfg.OUTPUT_DIR, "model_final.pth")

    # Run the official Detectron2 test evaluator, which returns useful metrics
    #if model_type == "maskrcnn":
    #    eval_tasks = ("segm",)
    #elif model_type == "retinanet":
    #    eval_tasks = ("bbox",)
    #evaluator = COCOEvaluator(test_metadata.name,
    #                          tasks=eval_tasks,
    #                          distributed=False,
    #                          output_dir=f"output/detector2/walkthrough/{model_type}_test_output")
    #trainer.test(cfg, trainer.model, evaluators=evaluator)

    return cfg

In [None]:
datadir = '/crimea/wboag/2021/aclum/camera-detection/data/all_known'
train_dict, train_metadata = build_test_dict(datadir, 'non-bh', 'non-bh.json')
test_dict, test_metadata = build_test_dict(datadir, 'bh', 'bh.json')
cfg = load_model(datadir, train_dict, train_metadata, 'output/detector2/nonresidential-1')

# Helper Functions

In [None]:
def plot_annotations(test_images, cfg, image_id, show_name=False, figsize=(34,18)):
    ###############
    # print Image
    
    # Identify the correct image object
    image_obj = None
    for obj in test_images:
        if obj['image_id'] == image_id:
            image_obj = obj
    assert image_obj is not None
        
    path = image_obj['file_name']
    img = cv2.imread(path)
    
    pl.figure(figsize=figsize)
    pl.imshow(img[:, :, ::-1])
    
    print(metadata[image_id]['location'])
    if show_name:
        print(path)


    ################
    # True Boxes
    camera_boxes = []
    bluebox_boxes = []
    for obj in image_obj['annotations']:
        if obj['category_id'] == 0:
            camera_boxes.append(obj['bbox']) # category_id == 0
        else:
            bluebox_boxes.append(obj['bbox']) # category_id == 1
            
    for color,boxes in zip(['pink','blue'],[camera_boxes,bluebox_boxes]):
        for i,box in enumerate(boxes):
            #print(box)
            #color = 'b' # colors[i]

            # Plot rectangle
            x,y,width,height = box
            rectangle = pl.Rectangle((x,y), width, height, ec=color, facecolor='None', lw=8)
            pl.gca().add_patch(rectangle)

        
    ################
    # Predicted Boxes
    predictor = DefaultPredictor(cfg)
    instances = predictor(img)['instances']
    probs   = instances.scores 
    boxes   = instances.pred_boxes 
    classes = instances.pred_classes

    colors = ['red', 'yellow']
    counter = 0
    for j,box in sorted(enumerate(boxes), key=lambda t:probs[t[0]], reverse=True):
        # Only show the 7 highest-probability boxes
        counter += 1
        if counter > 7: break
            
        #color = 'r' # colors[j]
        pred_label = classes[j]
        color = colors[pred_label]
        prob = probs[j]

        # Plot rectangle
        x1,y1,x2,y2 = box.tolist()
        rectangle = pl.Rectangle((x1,y1), x2-x1, y2-y1, ec=color, facecolor='None', lw=1+10*prob)
        pl.gca().add_patch(rectangle)

        # Plot probability
        pl.text(x1, y1*.99, f'{int(prob*100)}%', c=color, fontsize=6+50*prob)
        
    pl.show()
    
#plot_annotations(test_dict, cfg, 199)

In [None]:
def evaluate_overlap(test_dict, cfg, image_id):
    
    # Identify the correct image object
    image_obj = None
    for obj in test_dict:
        if obj['image_id'] == image_id:
            image_obj = obj
    assert image_obj is not None
    
    # True Boxes
    true_boxes = []            
    camera_true = []
    bluebox_true = []
    for obj in image_obj['annotations']:
        x,y,width,height = obj['bbox']
        box = (x,y,x+width,y+height)
        true_boxes.append(box)

        if obj['category_id'] == 1:
            bluebox_true.append(box)   # category_id == 1
        else:
            camera_true.append(box)    # category_id == 0
                
        
    # Predicted Boxes
    path = image_obj['file_name']
    img = cv2.imread(path)
    predictor = DefaultPredictor(cfg)
    instances = predictor(img)['instances']
    predicted_boxes = [box.tolist() for box in instances.pred_boxes]

    classes = instances.pred_classes
    
    camera_predicted = []
    bluebox_predicted = []
        
    for i in range(min(len(predicted_boxes),7)):
        if classes[i] == 0:
            camera_predicted.append(predicted_boxes[i])
        else:
            bluebox_predicted.append(predicted_boxes[i])

    results = {}
    for label, true_boxes, pred_boxes in zip(['bluebox','camera'],[bluebox_true,camera_true],[bluebox_predicted,camera_predicted]):

        #print(label)
        #print(true_boxes)
        #print(pred_boxes)
        
        # Recall
        true_covered = 0
        for tbox in true_boxes:
            #print(tbox)
            tx1,ty1,tx2,ty2 = tbox
            for pbox in pred_boxes:
                #print('\t', pbox)
                px1,py1,px2,py2 = pbox
                # check for overlap: https://stackoverflow.com/questions/20925818/algorithm-to-check-if-two-boxes-overlap
                if tx1<px2 and px1<tx2 and ty1<py2 and py1<ty2:
                    true_covered += 1
                    #print('\t\tHIT!')
                    break
        recall = (true_covered / (len(true_boxes)+1e-9), true_covered, len(true_boxes))

        #print()
        
        # Precision
        pred_covered = 0
        for pbox in pred_boxes:
            #print(pbox)
            px1,py1,px2,py2 = pbox
            for tbox in true_boxes:
                #print('\t', tbox)
                tx1,ty1,tx2,ty2 = tbox
                # check for overlap: https://stackoverflow.com/questions/20925818/algorithm-to-check-if-two-boxes-overlap
                if tx1<px2 and px1<tx2 and ty1<py2 and py1<ty2:
                    pred_covered += 1
                    #print('\t\tHIT!')
                    break
        precision = (true_covered / (len(pred_boxes)+1e-9), true_covered,  len(pred_boxes))

        
        #print()

            
        results[label] = (precision,recall)
        
    return results



In [None]:
def evaluate_model(cfg, test_dict, show_images=False):
    if show_images:
        output = sys.stdout
    else:
        output = io.StringIO()
    
    camera_precisions = []
    camera_recalls = []
    bluebox_precisions = []
    bluebox_recalls = []
    for sample in test_dict:
        image_id = sample['image_id']
        results = evaluate_overlap(test_dict, cfg, image_id)

        camera_precisions.append(results['camera'][0][0])
        camera_recalls.append(   results['camera'][1][0])

        bluebox_precisions.append(results['bluebox'][0][0])
        bluebox_recalls.append(   results['bluebox'][1][0])


        cam_tp = results["camera" ][1][1]
        cam_n  = results["camera" ][1][2]
        bb_tp  = results["bluebox"][1][1]
        bb_n   = results["bluebox"][1][2]
        print(f'\tImage={image_id} Camera Recall=({cam_tp},{cam_n}) BlueBox Recall=({bb_tp},{bb_n})', file=output)
        
        if show_images:
            plot_annotations(test_dict, cfg, image_id)

    print()
    print('mean camera  precision:', np.mean(camera_precisions) , file=output)
    print('mean camera  recall   :', np.mean(camera_recalls)    , file=output)
    print('mean bluebox precision:', np.mean(bluebox_precisions), file=output)
    print('mean bluebox recall   :', np.mean(bluebox_recalls)   , file=output)
    
    if not show_images:
        contents = output.getvalue()
        output.close()
        return contents

# Evaluate Model

In [None]:
out = evaluate_model(cfg, test_dict, show_images=True)