### Create anomalies evaluation (runtime: ~ 2.5h) (Inspired by: https://github.com/daniel-bogdoll/supervised_unsupervised_anomaly/blob/main/anomaly_detection/compare_and_cluster/map_anomalies_on_image.py)

In [4]:
import numpy as np
import os
import cv2
import matplotlib.pyplot as plt
from multiprocessing import Pool
import json
import yaml
import re
from tqdm import tqdm

#### Set paths and load config
<div class="alert alert-block alert-warning">
<b>ToDo:</b> Set "coda_root" to your <b>working directory</b>
</div>

In [5]:
coda_root = f'/disk/ml/own_datasets/CODA'

In [6]:
model_root = os.path.realpath('../../model_contradictions/')
supervised_unsupervised_root = os.path.join(model_root, 'supervised_unsupervised_anomaly')
config_filename = os.path.join(supervised_unsupervised_root, 'anomaly_detection/config/config_paths.yaml')
json_corner_cases = os.path.join(coda_root, 'corner_case.json')

config = yaml.load(open(config_filename), Loader=yaml.FullLoader)

with open(json_corner_cases, 'r') as f:
    data_cornercases = json.load(f)
    
annotations = data_cornercases['annotations']

#### Functions to create images evaluation

Read calib file

In [7]:
def read_calib_file(filepath, data_dic):    
    """
    Inspired by: https://github.com/utiasSTARS/pykitti/blob/master/pykitti/utils.py
    """
    with open(filepath, 'r') as f:
        for line in f.readlines():
            line = line.rstrip()

            if len(line) == 0: continue
            key, value = line.split(':', 1)

            data_dic[key] = np.array([float(x) for x in value.split()])
    return data_dic 

Create velo_to_cam2

In [8]:
def project_velo_to_cam2(calib):
    """
    Inspired by: https://github.com/darylclimb/cvml_project/blob/master/projections/lidar_camera_projection/utils.py
    """
    P_velo2cam_ref = np.vstack((calib['Tr'].reshape(3, 4), np.array([0., 0., 0., 1.])))  # velo2ref_cam
    P_rect2cam2 = calib['P2'].reshape((3, 4))
    proj_mat = P_rect2cam2 @ P_velo2cam_ref
    
    return proj_mat, P_velo2cam_ref

Translate points into 2D image space

In [9]:
def project_to_image(points, proj_mat):
    """
    Inspired by: https://github.com/darylclimb/cvml_project/blob/master/projections/lidar_camera_projection/utils.py
    """
    num_pts = points.shape[1]

    points = np.vstack((points, np.ones((1, num_pts))))
    points = proj_mat @ points
    points[:2, :] /= points[2, :]
    
    return points[:2, :]

Get colors for each point

In [10]:
def get_labels_and_colors(anomaly_labels, color_dict):
    if color_dict:
        max_sem_key = 0
        for key, data in color_dict.items():
            if key + 1 > max_sem_key:
                max_sem_key = key + 1
        color_lut = np.zeros((max_sem_key + 100, 3), dtype=np.float32)
        for key, value in color_dict.items():
            color_lut[key] = np.array(value, np.float32)
    else:
        quit()

    sem_label = np.zeros((0, 1), dtype=np.int16)
    sem_label_color = np.zeros((0, 3), dtype=np.float32)

    sem_label = anomaly_labels
    sem_label_color = color_lut[sem_label]
    sem_label_color = sem_label_color.reshape((-1, 3))

    #check
    assert(sem_label.shape[0] == sem_label_color.shape[0])

    return sem_label_color

Create image with projected points

In [11]:
def get_point_camerafov(pts_velo, calib, img, img_width, img_height, label_color, dataset):
    """
    Inspired by: https://github.com/darylclimb/cvml_project/blob/master/projections/lidar_camera_projection/utils.py
    """
    # projection matrix (project from velo2cam2)
    pts_velo_xyz = pts_velo[:, :3]
    proj_velo2cam2, P_velo2cam_ref = project_velo_to_cam2(calib)

    # apply projection
    pts_2d = project_to_image(pts_velo_xyz.transpose(), proj_velo2cam2)
    

    inds = np.where((pts_2d[0, :] < img_width) & (pts_2d[0, :] >= 0) &
                (pts_2d[1, :] < img_height) & (pts_2d[1, :] >= 0) &
                (pts_velo_xyz[:, 0] > 0)
                )[0]
    
    if dataset == 'nuscenes':
        inds = np.where((pts_2d[0, :] < img_width) & (pts_2d[0, :] >= 0) &
                    (pts_2d[1, :] < img_height) & (pts_2d[1, :] >= 0) &
                    (pts_velo_xyz[:, 0] > 1)
                    )[0]
    elif dataset == 'once':      
        inds = np.where((pts_2d[0, :] < img_width) & (pts_2d[0, :] >= 0) &
                    (pts_2d[1, :] < img_height) & (pts_2d[1, :] >= 0) &
                    (pts_velo_xyz[:, 0] > 0)
                    )[0]

    # Filter out pixels points
    imgfov_pc_pixel = pts_2d[:, inds]   #xy

    #Filter semantic color
    sem_label_color_velo = label_color[inds, :]
    sem_label_color_velo = np.array(sem_label_color_velo).astype(np.int16)
    
    for i in range(imgfov_pc_pixel.shape[1]):
        color_label = np.array(sem_label_color_velo[i, :])
        b=int(color_label[0])
        g=int(color_label[1])
        r=int(color_label[2])
        cv2.circle(img, (int(np.round(imgfov_pc_pixel[0, i])),
                int(np.round(imgfov_pc_pixel[1, i]))),
                2, color=(b,g,r), thickness=-1)
        
    return img

Save image

In [12]:
def save_img_to_file(img, path_to_save, seq):
    if not os.path.exists(os.path.join(path_to_save, seq)):
        os.makedirs(os.path.join(path_to_save, seq))
    save_path = os.path.join(path_to_save, seq, seq + '.png')
    cv2.imwrite(save_path, cv2.cvtColor(img, cv2.COLOR_RGB2BGR))

Save counts, points, and labels

In [30]:
def save_counts_points_labels(path_save, anno, points, labels, counts):
    if not os.path.exists(os.path.join(path_save, anno)):
        os.makedirs(os.path.join(path_save, anno))
    with open(path_save + '/counts.txt', 'a') as f:
        counts_string = ','.join(map(str, counts))
        f.write(counts_string + '\n')
    points.astype('float32').tofile(os.path.join(path_save, anno, anno + '_points.bin'))
    labels.astype('float32').tofile(os.path.join(path_save, anno, anno + '_labels.bin'))

Combine ground truth and detection method labels

In [14]:
def combine_gt_and_detection_method_labels(gt_labels, dm_labels):
    points = []
    labels = []
    used_points = [False] * len(dm_labels)
    for o_point in gt_labels:
        for index, n_point in enumerate(dm_labels):
            if o_point[0] + 0.001 >= n_point[0] and o_point[0] - 0.001 <= n_point[0] and o_point[1] + 0.001 >= n_point[1] and o_point[1] - 0.001 <= n_point[1] and o_point[2] + 0.001 >= n_point[2] and o_point[2] - 0.001 <= n_point[2]:
                points.append(o_point[:3])
                used_points[index] = True
                if o_point[4] == -1:
                    if n_point[3] == 1:
                        labels.append(-11)
                    elif n_point[3] == 2:
                        labels.append(-12)
                    elif n_point[3] == 3:
                        labels.append(-13)
                    elif n_point[3] == 4:
                        labels.append(-14)
                elif o_point[4] == 1:
                    if n_point[3] == 1:
                        labels.append(11)
                    elif n_point[3] == 2:
                        labels.append(12)
                    elif n_point[3] == 3:
                        labels.append(13)
                    elif n_point[3] == 4:
                        labels.append(14)
                break
        else:
            points.append(o_point[:3])
            if o_point[4] == -1:
                labels.append(-10)
            elif o_point[4] == 1:
                labels.append(10)    
    
    points = np.array(points).reshape((-1, 3)) 
    labels = np.array(labels).reshape((-1, 1))

    return points, labels

Create the counts for each label

In [15]:
def create_counts(labels, gt_labels, dm_labels):
    c_11 = np.count_nonzero(labels == -11)
    c_12 = np.count_nonzero(labels == -12)
    c_13 = np.count_nonzero(labels == -13)
    c_14 = np.count_nonzero(labels == -14)
    c11 = np.count_nonzero(labels == 11)
    c12 = np.count_nonzero(labels == 12)
    c13 = np.count_nonzero(labels == 13)
    c14 = np.count_nonzero(labels == 14)
    c_10 = np.count_nonzero(labels == -10)
    c10 = np.count_nonzero(labels == 10)
    c1 = 0
    c2 = 0   
    c3 = 0  
    c4 = 0
    tp = c13 + c14
    fp = c_13 + c_14 + c3 + c4
    fn = c11 + c12 + c10
    tn = c_11 + c_12 + c_10 + c1 + c2
    total = labels.shape[0]
    gt_total = gt_labels.shape[0]
    dm_label_total = dm_labels.shape[0]
    
    return c_11, c_12, c_13, c_14, c11, c12, c13, c14, c_10, c10, c1, c2, c3, c4, tp, fp, fn, tn, total, gt_total, dm_label_total

Overall combination of all functions above

In [35]:
def create_evaluation_images(path_velo, path_labels, path_img, path_gt_label, path_calibrations, path_save, anno, frame_img, dataset):
    #open point cloud
    pc_velo_camerafov = np.fromfile(path_velo, dtype=np.float32).reshape((-1,3))
    anomaly_labels = np.fromfile(path_labels, dtype=np.int16).reshape((-1))
    zero_column = np.zeros((1, pc_velo_camerafov.shape[0])) 
    dm_labels = np.insert(pc_velo_camerafov, 3, anomaly_labels, axis=1)   
    dm_labels = np.insert(dm_labels, 4, zero_column, axis=1)
    
    gt_labels = np.fromfile(path_gt_label, dtype=np.float32).reshape((-1,4))
    zero_column = np.zeros((1, gt_labels.shape[0]))    
    gt_labels = np.insert(gt_labels, 3, zero_column, axis=1)
    
    points, labels = combine_gt_and_detection_method_labels(gt_labels, dm_labels)
    
    counts = create_counts(labels, gt_labels, dm_labels)
    
    save_counts_points_labels(path_save, anno, points, labels, counts)
        
    if points.shape[0] == 0:
        return
    
    color_dict = {-1:[0,0,0], 13:[34,139,34], 14:[34,139,34], -13:[178,34,34], -14:[178,34,34], 3:[178,34,34], 4:[178,34,34], 11:[225,225,0], 12:[225,225,0], 10:[225,225,0], -11:[65,105,225], -12:[65,105,225], -10:[65,105,225], 1:[65,105,225], 2:[65,105,225]}
    sem_label_color = get_labels_and_colors(labels, color_dict)
    
    rgb = cv2.cvtColor(cv2.imread(os.path.join(path_img)), cv2.COLOR_BGR2RGB)
    img_height, img_width, img_channel = rgb.shape

    calib_dic = {}
    calib = read_calib_file(path_calibrations, calib_dic)

    img = get_point_camerafov(points, calib, rgb, img_width, img_height, sem_label_color, dataset)
    
    if points.shape[0] != 0:
        save_img_to_file(img, path_save, anno)


Main function to set paths for the individual anomaly

In [32]:
def main(anno):
    seq = '{0:04d}'.format(int(anno['image_id']))
    anno = '{0:04d}'.format(int(anno['id']))
    
    path_dataset = config['path_dataset']
    path_inference = config['path_inference']
    path_anomalies_sup_self = os.path.join(path_inference, 'anomalies_sup_self')
    path_original_labels = os.path.join(path_inference, 'original_labels_annotation')
    path_groundtruth_eval = os.path.join(path_inference, 'groundtruth_eval_all_annotation')

    if not os.path.exists(os.path.join(path_anomalies_sup_self, seq, 'anomaly_labels')):
        if not os.path.exists(os.path.join(path_groundtruth_eval, anno)):
            os.makedirs(os.path.join(path_groundtruth_eval, anno))
        with open(path_groundtruth_eval + '/counts.txt', 'a') as f:
            f.write('%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n' % (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0))
        return

    frames = os.listdir(os.path.join(path_anomalies_sup_self, seq, 'anomaly_labels'))
    frames = sorted(frames, key=lambda x: int(re.findall(r'\d+', x)[-1]))
    
    once = r'^\d{6}_\d{13}_\d{1,2}\.bin$'
    
    
    for frame in range(len(frames)):
        frame_label = frames[frame]
        if int(frame_label.split('.')[0].split('_')[2]) != 8:
            continue
        dataset = ''
        if re.match(once, frame_label):
            dataset = 'once'
        elif 'kitti' in frame_label:
            dataset = 'kitti'
        elif 'nuscenes' in frame_label:
            dataset = 'nuscenes'
        frame_image = frames[frame].split('.')[0] +'.png'
        
        path_calib_frame = os.path.join(path_dataset, seq, 'calib.txt')
        path_pc_velo_camerafov_frame = os.path.join(path_anomalies_sup_self, seq, 'pc_velo_camerafov', frame_label)
        path_anomaly_labels_frame = os.path.join(path_anomalies_sup_self, seq, 'anomaly_labels', frame_label)
        path_image_frame = os.path.join(path_dataset, seq, 'image_2', frame_image)
        path_original_label_frame = os.path.join(path_original_labels, anno + '.bin')

        create_evaluation_images(path_pc_velo_camerafov_frame, path_anomaly_labels_frame, path_image_frame, path_original_label_frame, path_calib_frame, path_groundtruth_eval, anno, frame_image, dataset)


#### Call main function for all anomalies

In [None]:
for anno in tqdm(annotations):
        main(anno)