# Active Testing on Box level

The data is prepared by prepare_box_data.ipynb.
The results are stored in ./results/active_test_box_level.

In [1]:
import argparse
import datetime
import json
import random
import time
from pathlib import Path
import os, sys
import numpy as np
import torch

from main import build_model_main
from util.slconfig import SLConfig
from datasets import build_dataset
from util.visualizer import COCOVisualizer
from util import box_ops
import pickle
import copy
import random
from util.utils import slprint, to_device
import util.misc as utils
from engine import evaluate
from torch.utils.data import DataLoader
from datasets import build_dataset, get_coco_api_from_dataset

In [2]:
def read_one_image_results(path):
    """Load the JSON document stored at ``path`` and return the parsed object."""
    with open(path, "r") as json_file:
        return json.load(json_file)

def write_one_results(path, json_data):
    """Write ``json_data`` as JSON to the file at ``path``.

    The file is opened in ``"w"`` mode, so existing content is replaced.
    """
    with open(path, "w") as json_file:
        json.dump(obj=json_data, fp=json_file)
        
def transform_tensor_to_list(l):
    """Move tensor ``l`` to the CPU and return its ``tolist()`` conversion."""
    cpu_tensor = l.cpu()
    return cpu_tensor.tolist()

def transform_tensors_to_list(l):
    """Recursively replace every tensor inside ``l`` with plain Python lists.

    Lists and dicts are rebuilt with their elements / values converted
    recursively (dict keys are kept as they are); tensors are converted with
    ``transform_tensor_to_list``; any other object is returned unchanged.
    """
    if isinstance(l, dict):
        return {key: transform_tensors_to_list(value) for key, value in l.items()}
    if isinstance(l, list):
        return [transform_tensors_to_list(item) for item in l]
    if torch.is_tensor(l):
        return transform_tensor_to_list(l)
    # Anything else (numbers, strings, tuples, None, ...) passes through.
    return l

def LURE_weights_for_risk_estimator(weights, N):
    """Compute the per-sample correction factors ``v`` for ``weights``.

    With ``M = weights.size`` and ``m = 1..M`` the result is

        v_m = 1 + (N - M) / (N - m) * (1 / ((N - m + 1) * weights_m) - 1)

    Args:
        weights: NumPy array with one entry per acquired sample, in
            acquisition order (presumably the acquisition probabilities
            returned by ``acquire`` -- confirm against the caller).
        N: total pool size the ``M`` samples were drawn from.

    Returns:
        An array of length ``M`` when ``M < N``; the scalar ``1`` otherwise.
    """
    M = weights.size
    if M >= N:
        return 1

    m = np.arange(1, M + 1)
    remaining = N - m
    correction = 1 / ((remaining + 1) * weights) - 1
    return 1 + (N - M) / remaining * correction

def get_one_annotation_values(path, key, file_num = 5000):
    """Concatenate the ``key`` entry of every per-image annotation file.

    Reads ``<path>/0.json`` ... ``<path>/<file_num - 1>.json`` in order.

    Args:
        path: directory containing the per-image JSON files.
        key: name of the entry to collect from each file.
        file_num: number of files to read, starting at index 0.

    Returns:
        A NumPy array holding all collected values. When a file stores
        ``null`` for ``key``, ``-1`` is used as a placeholder, repeated once
        per entry of that file's ``'loss'`` list so the output stays aligned
        with the losses.
    """
    results = []
    for img_idx in range(file_num):
        file_path = os.path.join(path, f"{img_idx}.json")
        data = read_one_image_results(file_path)
        values = data[key]
        # Identity check: `!= None` relies on `__ne__` and is not the
        # recommended way to test for None.
        if values is not None:
            results.extend(values)
        else:
            results.extend([-1] * len(data['loss']))
    return np.array(results)

def get_img_idxes(path, file_num = 5000):
    """Return, for every entry of every file's ``'loss'`` list, its image index.

    Files ``<path>/0.json`` ... ``<path>/<file_num - 1>.json`` are read in
    order; image index ``i`` is repeated ``len(data['loss'])`` times, so the
    output lines up element-wise with the concatenated losses.
    """
    per_box_img_idx = []
    for img_idx in range(file_num):
        annotation = read_one_image_results(os.path.join(path, str(img_idx) + ".json"))
        box_count = len(annotation['loss'])
        per_box_img_idx += [img_idx] * box_count
    return np.array(per_box_img_idx)

def acquire(expected_loss_inputs, samples_num):
    """Draw ``samples_num`` distinct indices with probability ~ expected loss.

    Indices are drawn one at a time without replacement. Before every draw
    the remaining scores are normalised to a probability vector, clipped from
    below at 20% of the uniform probability and normalised again.

    Args:
        expected_loss_inputs: NumPy array of per-item scores. It is not
            modified. Negative values are shifted so the minimum becomes 0,
            and NaN values are replaced by 0.
        samples_num: number of indices to draw; must not exceed
            ``expected_loss_inputs.size``.

    Returns:
        ``(pick_sample_idxs, weights)``: an int array with the chosen indices
        (positions in the input) in draw order, and a float32 array with the
        probability each index had at the moment it was drawn.

    Raises:
        AssertionError: if ``samples_num`` is larger than the input size.
    """
    # Imported here because the file's import section does not import it.
    import logging

    assert samples_num <= expected_loss_inputs.size
    expected_loss = np.copy(expected_loss_inputs)
    # The in-place divisions below need a floating dtype.
    if expected_loss.dtype.kind != "f":
        expected_loss = expected_loss.astype(np.float64)

    # Log-lik can be negative.
    # Make all values positive. nanmin keeps a NaN entry from turning the
    # shift (and with it the whole array) into NaN.
    if np.any(expected_loss < 0):
        expected_loss += np.abs(np.nanmin(expected_loss))

    if np.any(np.isnan(expected_loss)):
        logging.warning(
            'Found NaN values in expected loss, replacing with 0.')
        logging.info(f'{expected_loss}')
        expected_loss = np.nan_to_num(expected_loss, nan=0)

    # An all-zero score vector cannot be normalised (0 / 0); treat it as
    # uniform instead.
    if expected_loss.size > 0 and expected_loss.sum() == 0:
        expected_loss[:] = 1.0

    pick_sample_idxs = np.zeros((samples_num), dtype = int)
    # idx_array[j] is the original position of the j-th remaining score.
    idx_array = np.arange(expected_loss.size)
    weights = np.zeros((samples_num), dtype = np.single)
    uniform_clip_val = 0.2
    for i in range(samples_num):
        expected_loss /= expected_loss.sum()
        # Clip all values below 20 percent of the uniform probability so
        # every remaining item keeps a non-negligible chance of being drawn.
        expected_loss = np.maximum(uniform_clip_val * 1/expected_loss.size, expected_loss)
        expected_loss /= expected_loss.sum()
        sample = np.random.multinomial(1, expected_loss)
        cur_idx = np.where(sample)[0][0]
        pick_sample_idxs[i] = idx_array[cur_idx]
        weights[i] = expected_loss[cur_idx]
        # Remove the drawn item so it cannot be selected again.
        selected_mask = np.ones((expected_loss.size), dtype=bool)
        selected_mask[cur_idx] = False
        expected_loss = expected_loss[selected_mask]
        idx_array = idx_array[selected_mask]
    return pick_sample_idxs, weights

In [3]:
# Dataset split to evaluate, and the input / output locations derived from it.
split = "val"
data_path = f"./data/5_scale_31/{split}/data/"
annotation_path = f"./data/5_scale_31/{split}/box_annotation/"
result_json_path = "./results/active_test_box_level/"
# Sample sizes to evaluate, and one random seed per repeated run.
sample_size_set = [
    50, 100, 150, 200, 250, 500,
    750, 1000, 1500, 2000, 3000,
]
random_seed_set = [
    4519, 9524, 5901, 1028, 6382,
    5383, 5095, 7635, 890, 608,
]

In [4]:
# Three arrays built by reading the same annotation files in the same order:
# the stored loss values, the stored matched-target indexes (-1 where a file
# stores null for them), and the index of the file each loss entry came from.
losses = get_one_annotation_values(annotation_path, key="loss")
label_idxes = get_one_annotation_values(
    annotation_path, key="matched_target_indexes"
)
img_idxes = get_img_idxes(path=annotation_path)

## Random Sample risk estimation

In [5]:
def run_one_random_sample_risk_estimator(true_losses, seed, samples_num):
    """Estimate the risk as the mean loss of a seeded uniform random subset.

    Args:
        true_losses: NumPy array of loss values.
        seed: value used to seed torch, NumPy and ``random``.
        samples_num: number of entries to average; the first ``samples_num``
            positions of a random permutation of all indices are used.

    Returns:
        The mean of the selected losses.
    """
    for seed_fn in (torch.manual_seed, np.random.seed, random.seed):
        seed_fn(seed)
    chosen = np.random.permutation(true_losses.size)[:samples_num]
    return true_losses[chosen].mean()

In [6]:
# The required label might be repeated
# One estimate per (sample size, seed) pair, stored under consecutive integer keys.
file_path = f"{result_json_path}random_sample_repeated_R50_31_10_runs.json"
json_object = {}
for sample_size in sample_size_set:
    for seed in random_seed_set:
        loss_risk = run_one_random_sample_risk_estimator(losses, seed, sample_size)
        result = {
            "active_test_type": "random sample repeated",
            "sample_size": sample_size,
            "loss": loss_risk,
        }
        json_object[len(json_object)] = result
with open(file_path, "w") as outfile:
    json.dump(json_object, outfile)

In [7]:
def run_one_random_sample_without_repeated_risk_estimator(true_losses, img_idxes, label_idxes, seed, samples_num):
    """Estimate the risk from random entries with distinct (image, label) pairs.

    Entries are drawn uniformly at random. A draw whose (image index, label
    index) pair was already selected is rejected and contributes nothing to
    the estimate; drawing continues until ``samples_num`` distinct pairs have
    been collected.

    Args:
        true_losses: NumPy array of loss values.
        img_idxes: NumPy array with the image index of every entry.
        label_idxes: NumPy array with the label index of every entry.
        seed: value used to seed torch, NumPy and ``random``.
        samples_num: number of distinct (image, label) pairs to sample.

    Returns:
        The mean loss of the accepted draws.

    Raises:
        AssertionError: if the three arrays differ in size.
        ValueError: if ``samples_num`` exceeds the number of distinct
            (image, label) pairs, which could never be satisfied.
    """
    torch.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)
    assert true_losses.size == img_idxes.size
    assert true_losses.size == label_idxes.size

    # Without this check the rejection loop below would never terminate.
    distinct_pairs = {
        (int(img), int(label))
        for img, label in zip(img_idxes.tolist(), label_idxes.tolist())
    }
    if samples_num > len(distinct_pairs):
        raise ValueError(
            f"samples_num ({samples_num}) exceeds the number of distinct "
            f"(image, label) pairs ({len(distinct_pairs)})"
        )

    chosen_pairs = set()
    choose_loss = []
    while len(chosen_pairs) < samples_num:
        # Scalar index: avoids converting 1-element arrays with int().
        real_idx = int(np.random.randint(true_losses.size))
        pair = (int(img_idxes[real_idx]), int(label_idxes[real_idx]))
        if pair in chosen_pairs:
            # Rejected draw: its loss must not enter the estimate.
            continue
        chosen_pairs.add(pair)
        choose_loss.append(true_losses[real_idx])
    return np.array(choose_loss).mean()

In [11]:
# The required label won't be repeated
# One estimate per (sample size, seed) pair, stored under consecutive integer keys.
file_path = f"{result_json_path}random_sample_R50_31_10_runs.json"
json_object = {}
for sample_size in sample_size_set:
    for seed in random_seed_set:
        loss_risk = run_one_random_sample_without_repeated_risk_estimator(
            losses, img_idxes, label_idxes, seed, sample_size
        )
        result = {
            "active_test_type": "random sample",
            "sample_size": sample_size,
            "loss": loss_risk,
        }
        json_object[len(json_object)] = result
with open(file_path, "w") as outfile:
    json.dump(json_object, outfile)

## Whole data set risk estimation

In [9]:
def get_whole_data_set_risk_estimator(true_losses):
    """Return the mean of ``true_losses``, i.e. the risk over every entry."""
    full_set_risk = true_losses.mean()
    return full_set_risk

In [10]:
# Reference value: the mean loss over all entries, stored as a single record.
file_path = f"{result_json_path}None_R50_31.json"
result = {
    "active_test_type": "None",
    "sample_size": losses.size,
    "loss": get_whole_data_set_risk_estimator(losses),
}
json_object = {0: result}
with open(file_path, "w") as outfile:
    json.dump(json_object, outfile)