# Imports

In [3]:
#import utils
import cv2
import imutils
import glob
import os
import numpy as np
import json
import matplotlib.pyplot as plt
from tqdm import tqdm
%matplotlib inline
import skimage
from sklearn.cluster import KMeans
from collections import Counter
from scipy.spatial import distance

The evaluation script was copied in rather than imported. Please note that we left the code as it was and made NO adjustments to it. 

In [4]:
import json
import numpy as np

from typing import Dict, NewType, List, Tuple, Union
import pathlib

Path = Union[pathlib.Path, str]
BoundingBox = NewType("BoundingBox", Tuple[int, int, int, int])
YoloBox = NewType("YoloBox", Tuple[float, float, float, float])
Shape = NewType("Shape", Tuple[int, int])


def compute_AP(detections: Dict[str, List[BoundingBox]],
               targets: Dict[str, List[BoundingBox]]) -> float:
    """ Compute the average precision.
    Params:
        detections: list of detected bounding boxes within each sample
        targets: list of ground truth bounding boxes within each sample
    """
    # define the IoU threshold sequence
    thresholds = np.arange(0.1, 1.0, 0.1)

    precision = np.zeros_like(thresholds)
    recall = np.zeros_like(thresholds)

    iou_scores = [compute_IoU(detections[k], targets[k])
                  for k in targets.keys()]

    for i, iou_th in enumerate(thresholds):
        true_positives = sum(
            [np.sum(np.any(iou > iou_th, 1)) for iou in iou_scores])
        false_positives = sum(
            [np.sum(~np.any(iou > iou_th, 1)) for iou in iou_scores])
        false_negatives = sum(
            [np.sum(~np.any(iou > iou_th, 0)) for iou in iou_scores])
        
        if true_positives + false_positives:
            precision[i] = true_positives/(true_positives+false_positives)
        else:
            precision[i] = 0
        recall[i] = true_positives/(true_positives+false_negatives)

    # compute average precision
    recall = np.append(recall, 0)
    ap = np.sum((recall[:-1] - recall[1:]) * precision)
    return ap


def compute_IoU(detections: List[BoundingBox],
                targets: List[BoundingBox]) -> np.array:
    """ Compute the intersection of union (IoU) score.
    Params:
        detections: detected bounding boxes
        targets: ground truth bounding boxes
    Return:
        Array of IoU score between each pair of detected and target bounding
        box, where the detections are along the rows and the targets along
        the columns.
    """
    iou = np.empty((len(detections), len(targets)))

    for i, d in enumerate(detections):
        dx, dy, dw, dh = d
        for j, t in enumerate(targets):
            tx, ty, tw, th = t
            x = max(dx, tx)
            y = max(dy, ty)
            xx = min(dx + dw, tx + tw)
            yy = min(dy + dh, ty + th)
            intersection_area = max(0, xx-x) * max(0, yy-y)
            iou[i, j] = intersection_area / (dw*dh + tw*th - intersection_area)
                        
    return iou


def read_bb(file: Path) -> Dict[str, List[BoundingBox]]:
    """ Read bounding boxes from json file.
    """
    with open(file) as f:
        js = json.load(f)
    return js


def write_bb(file: Path, bbs: Dict[str, List[BoundingBox]]) -> None:
    """ Write bounding boxes to json file.
    """
    with open(file, "w") as f:
        json.dump(bbs, f)

# Image merging

### Merging hyperparameters

In [5]:
alpha = 0.75
method = 'outl'

In [6]:
class ImageMerger:
    """
    ImageMerger can merge images from a dataset including images collected by a drone with the purpose of detecting 
    moving people in the images.
    """
    def __init__(self, data_dir, mask_file):
        """
        :param data_dir: path to the directory where the data is located
        :param mask_file: path of a mask which removes unneccesary text from the images
        """
        
        # Get all directories inside the data_dir
        self.data_dir = data_dir
        self.dirs = os.listdir(data_dir)
        self.mask = np.array(cv2.imread(mask_file)) // 255
        
        # Store the merged images
        self.merged_images = {}
        
    def load_images(self):
        """
        Loads the images and homographies from one directory at a time and yields them
        """
        
        for a_dir in self.dirs:
            if a_dir == 'labels.json':
                continue
            input_images = {}
            
            # Load all images in the train directory
            image_dir_file_path = os.path.join(os.path.join(self.data_dir, a_dir), '*.png')
            images = glob.glob(image_dir_file_path)
            
            # Copy all the raw images to the input images dict
            for im in images:
                file = os.path.basename(im)
                img_name = os.path.splitext(file)[0]
                src = cv2.cvtColor(cv2.imread(im), cv2.COLOR_BGR2RGB)
                src = self.apply_mask(src)
                input_images[img_name] = src
            
            # Load the homographies json file
            homographies_file_path = os.path.join(os.path.join(self.data_dir, a_dir), 'homographies.json')
            with open(homographies_file_path, 'rb') as f:
                homographies = json.load(f)
            
            yield a_dir, input_images, homographies
        
    def apply_mask(self, image):
        """
        Applys the provided mask on image
        :param image: image file which will be treated as np.array
        """

        return image * self.mask
    
    def merge_images(self, alpha=0.75, axis=0, method='ltr', debug=False):
        """
        Merges all the images given the homographies which are retrieved from load_data. 
        :param alpha: Weighting for merging the images. The image to be merged on will be considered with a weight a alpha,
        the second image with a weight of 1-alpha.
        :param axis: defines the axis along which the images shall be merged. 0: camera axis, 1: time axis
        :param method: defines in which order the images shall be merged:
            - ltr:  The images are merged from left to right along the camera axis or from the first in time (index 0) 
                    to the last in time (index 6)
            - rtl:  The images are merged from right to left along the camera axis or from the last in time (index 6) 
                    to the first in time (index 0)
            - outl: The images are merged from the center out switching between the left and right side of the center 
                    (along camera axis)/previous and next in time (along time axis) starting with the left side/previous 
                    in time. 
            - outr: The images are merged from the center out switching between the left and right side of the center 
                    (along camera axis)/previous and next in time (along time axis) starting with the right side/next in 
                    time.
            - inl:  The images are merged from the outside inward switching between the left and right side of the center 
                    (along camera axis)/previous and next in time (along time axis) starting with the left side/previous 
                    in time. 
            - inr:  The images are merged from the outside inward switching between the left and right side of the center 
                    (along camera axis)/previous and next in time (along time axis) starting with the right side/next in 
                    time.
        :param debug: if True, results are printed to be able to debug 
        """
        
        assert method in ['ltr', 'rtl', 'outl', 'outr', 'inl', 'inr']
        
        # Define the keys and indices depending on axis and method
        keys = []
        indices = []
        if axis == 0:
            indices = range(0, 7)
            if method == 'ltr':
                keys = ['-B05', '-B04', '-B03', '-B02', '-B01', '-G01', '-G02', '-G03', '-G04', '-G05']
            elif method == 'rtl': 
                keys = ['-G05', '-G04', '-G03', '-G02', '-G01', '-B01', '-B02', '-B03', '-B04', '-B05']
            elif method == 'outl':
                keys = ['-B01', '-G01', '-B02', '-G02', '-B03', '-G03', '-B04', '-G04', '-B05', '-G05']
            elif method == 'outr':
                keys = ['-G01', '-B01', '-G02', '-B02', '-G03', '-B03', '-G04', '-B04', '-G05', '-B05']
            elif method == 'inl':
                keys = ['-B05', '-G05', '-B04', '-G04', '-B03', '-G03', '-B02', '-G02', '-B01', '-G01']
            elif method == 'inr':
                keys = ['-G05', '-B05', '-G04', '-B04', '-G03', '-B03', '-G02', '-B02', '-G01', '-B01']
        elif axis == 1:
            indices = ['-B05', '-B04', '-B03', '-B02', '-B01', '-G01', '-G02', '-G03', '-G04', '-G05']
            if method == 'ltr':
                keys = [0, 1, 2, 3, 4, 5, 6]
            elif method == 'rtl': 
                keys = [6, 5, 4, 3, 2, 1, 0]
            elif method == 'outl':
                keys = [3, 2, 4, 1, 5, 0, 6]
            elif method == 'outr':
                keys = [3, 4, 2, 5, 1, 6, 0]
            elif method == 'inl':
                keys = [0, 6, 1, 5, 2, 4, 3]
            elif method == 'inr':
                keys = [6, 0, 5, 1, 4, 2, 3]
                
        # Do the merging by looping through all indices and keys
        for a_dir, images, homographies in self.load_images():
            # Loop through the indices
            for i in indices: 
                # Load the base image depending on the method
                base_image_key = ''
                if axis == 0:
                    base_image_key = str(i) + keys[0]
                elif axis == 1:
                    base_image_key = str(keys[0]) + i
                    
                # Define the merged image and set it to the base image as the start
                merged_image = images[base_image_key]
                
                if debug:
                    print('directory: ', a_dir)
                    print('base key: ', base_image_key)

                for k in keys:
                    # Get the key given the index and k
                    key = ''
                    if axis == 0:
                        key = str(i) + k
                    elif axis == 1:
                         key = str(k) + i
                        
                    # Load the image and the corresponding homography matrix
                    im = images[key]
                    homography = np.array(homographies[key])
                    
                    # Warp the perspective (i.e. transform the current image into the perspective of the base image (-B01))
                    im_warped = cv2.warpPerspective(im, homography, im.shape[:2])
                    # Merge the images
                    merged_image = cv2.addWeighted(merged_image, alpha, im_warped, 1 - alpha, 0.0)

                self.merged_images[a_dir + '-' + str(i)] = merged_image
                
                if debug:
                    plt.imshow(merged_image)
                    plt.show()

In [7]:
image_merger = ImageMerger('../4_Notebooks/data_WiSAR/data/validation/', '../4_Notebooks/data_WiSAR/data/mask.png')
image_merger.merge_images(axis=0, method=method, debug=False, alpha=alpha)
valid_images = image_merger.merged_images

FileNotFoundError: [WinError 3] Das System kann den angegebenen Pfad nicht finden: '../4_Notebooks/data_WiSAR/data/validation/'

In [None]:
image_merger = ImageMerger('../4_Notebooks/data_WiSAR/data/test/', '../4_Notebooks/data_WiSAR/data/mask.png')
image_merger.merge_images(axis=0, method=method, debug=False, alpha=alpha)
test_images = image_merger.merged_images

# Import of ground truth of validation set

In [None]:
ground_truth = {}

with open(os.path.join('../4_Notebooks/data_WiSAR/data/validation/', 'labels.json'), 'r') as file:
    ground_truth = json.load(file)

# Approach leading to best precision on validation set

As described in the method pdf, for the approach that led to the best precision on the validation set, we use the first, center and last time step. 

### Hyperparameters

In [None]:
max_area = 3
threshold = 5

### Anomaly images
The anomaly images are obtained using the Mahalanobis distance and thresholding the distances. 

In [None]:
def mahalanobis(image, threshold=5):
    array = image.copy()
    mean_vector = np.mean(array, axis=(0, 1))  
    
    im_re = array.reshape(-1, 3).astype(np.float64)
    im_re -= im_re.mean(0, keepdims=True)
    im_re_cov = 1/(im_re.shape[0]-1) * im_re.T @ im_re
        
    cov = np.linalg.inv(im_re_cov)
            
    am = array - mean_vector
    amc = am @ cov
    distances = np.sqrt(np.einsum('ijk,ijk->ij', amc, am))
    
    anomaly = np.zeros_like(distances)   
    anomaly[distances > threshold] = 1
        
    return distances, anomaly

In [None]:
def clear_anomaly(images, key, show_images=False, threshold=5):
    image_0 = images[key + '-0']
    distances_0, anomaly_0 = mahalanobis(image_0, threshold=threshold)

    image_6 = images[key + '-6']
    distances_6, anomaly_6 = mahalanobis(image_6, threshold=threshold)
    
    if show_images:
        plt.figure(figsize=(10.6, 10.6))
        plt.imshow(image_0)
        plt.figure(figsize=(10.6, 10.6))
        plt.imshow(anomaly_0)
        plt.show()

        plt.figure(figsize=(10.6, 10.6))
        plt.imshow(image_6)
        plt.figure(figsize=(10.6, 10.6))
        plt.imshow(anomaly_6)
        plt.show()

    both = anomaly_0 * anomaly_6

    image_3 = images[key + '-3']
    distances_3, anomaly_3 = mahalanobis(image_3, threshold=threshold)
        
    if show_images:
        
        plt.figure(figsize=(10.6, 10.6))
        plt.imshow(image_3)
        plt.figure(figsize=(10.6, 10.6))
        plt.imshow(anomaly_3)
        plt.show()

    contours_both, _ = cv2.findContours(both.astype('uint8'), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
    contours_image_3, _ = cv2.findContours(anomaly_3.astype('uint8'), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)

    # Remove tiny areas
    remaining_cnts = []
    for cnt in contours_image_3:
        area = cv2.contourArea(cnt)

        if area <= 10:
            x,y,w,h = cv2.boundingRect(cnt)
            anomaly_3[y:y+h, x:x+w] = 0
        else:
            remaining_cnts.append(cnt)

    # Get close contours
    min_distance = 20
    close_contours = []
    for i in range(len(remaining_cnts) - 1):
        for j in range(i + 1, len(remaining_cnts)):
            for pt in remaining_cnts[j]:
                r_cnt = remaining_cnts[i].reshape(-1, 2)
                diff = r_cnt - pt
                dist = np.sqrt(diff[:, 0] ** 2 + diff[:, 1] ** 2)
                if dist.min() < min_distance:
                    close_contours.append((i, j))
                    break
        
    def remove_close_cnts(i):
        for j, (m, n) in enumerate(close_contours):
            if i == m:
                cv2.drawContours(mask, [remaining_cnts[n]], -1, 0, -1)
                close_contours.remove(close_contours[j])
                remove_close_cnts(n)
            elif i == n:
                cv2.drawContours(mask, [remaining_cnts[m]], -1, 0, -1)
                close_contours.remove(close_contours[j])
                remove_close_cnts(m)

    # Remove all contours that seem to be static and create a mask
    mask = np.ones_like(anomaly_3)
    i = 0
    for i, r_cnt in enumerate(remaining_cnts):
        for cnt in contours_both:
            cnt = cnt.reshape(-1, 2)
            for x, y in cnt:
                if cv2.pointPolygonTest(r_cnt, (float(x), float(y)), True) >= 0:
                    cv2.drawContours(mask, [r_cnt], -1, 0, -1)
                    remove_close_cnts(i)
                    break
    
    # Return the masked anomaly image of timestep 3
    return anomaly_3 * mask

def draw_bounding_boxes(anomaly_image, images, key, show_images=False, max_area=3, add_ground_truth=False):
    image = images[key + '-3'].copy()
    kernel = np.ones((10, 10),np.uint8)
    anomaly_image = cv2.morphologyEx(anomaly_image.astype('uint8'), cv2.MORPH_CLOSE, kernel)
    
    contours, _ = cv2.findContours(anomaly_image.astype('uint8'), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
    
    bb_vals = []
        
    for cnt in contours:
        area = cv2.contourArea(cnt)
        if area >= 20:
            uncertainty = min(int(max(750 / area, 1)), max_area)
            
            rect = cv2.boundingRect(cnt)
            x,y,w,h = rect
            #print(x, y, w, h)
            w, h = w * uncertainty, h * uncertainty
            x, y = x - w // 2, y - h // 2 
            #print(x, y, w, h)
            
            roi = images[key + '-3'][y:y+h, x:x+w]
            
            #plt.figure(figsize=(10.6, 10.6))
            #plt.imshow(roi)
            #plt.show()
            
            """
            print(mean, std)
            print(mean + std)
            print(roi.mean(axis=(0, 1)), roi.std(axis=(0, 1)))
            print(roi.mean(axis=(0, 1)) > mean + std)
            print(roi.mean(axis=(0, 1)) < mean - std)
            print(roi.mean(axis=(0, 1)) + roi.std(axis=(0, 1)) > mean + std)
            print(roi.mean(axis=(0, 1)) - roi.std(axis=(0, 1)) < mean - std)"""
            
            m = roi.mean(axis=(0, 1))
            s = roi.std(axis=(0, 1))
            
            
            if (m + s > mean + std).all() or (m - s < mean - std).all():
                bb_vals.append([x, y, w, h])
                if show_images:
                    image = cv2.rectangle(image, (x,y), (x+w,y+h), (0, 255, 0), 2)
            elif np.sum(m + s > mean + std) == 2:
                tolerance = 1
                if (m + s + tolerance > mean + std).all(): 
                    bb_vals.append([x, y, w, h])
                            
                    if show_images:
                        image = cv2.rectangle(image, (x,y), (x+w,y+h), (0, 255, 0), 2)
    
    if show_images:
        plt.figure(figsize=(10.6, 10.6))
        
        if add_ground_truth and (key + '-3') in valid_images:
            for box in ground_truth[key]:
                start_point = (box[0], box[1])
                end_point = (box[0] + box[3], box[1] + box[2])

                color = (255, 0, 0)

                thickness = 2
                image = cv2.rectangle(image, start_point, end_point, color, thickness)
                
        plt.imshow(image)
        plt.show()
        
    return bb_vals

In [None]:
def analyze_images(images):
    keys = images.keys()
    means = np.zeros(shape=(len(keys), 3))   
    stds = np.zeros_like(means)

    for i, key in enumerate(keys): 
        image = images[key]
        means[i] = np.mean(image, axis=(0, 1))
        stds[i] = np.std(image, axis=(0, 1))

    mean = means.mean(axis=0)
    std = stds.mean(axis=0)
    
    return mean, std

    print('Mean: ', mean, ', Standard deviation: ', std)

### Predictions on validation set

In [None]:
valid_strings = {0: "valid-1-0", 1: "valid-1-1", 2: "valid-1-2", 3: "valid-1-3", 4: "valid-1-4", 5: "valid-1-5",
                 6: "valid-1-6", 7: "valid-2-0", 8: "valid-2-1", 9: "valid-2-2", 10: "valid-2-3"}

bb_pred_dict = {}

mean, std = analyze_images(valid_images)

for j in valid_strings:
    print(valid_strings[j])
    cleared_image = clear_anomaly(valid_images, valid_strings[j], show_images=False, threshold=threshold)
    bb_pred_dict[valid_strings[j]] = draw_bounding_boxes(cleared_image, valid_images, valid_strings[j], show_images=True, add_ground_truth=True)

result = str(bb_pred_dict)
result = result.replace("'", "\"")
print(result)
with open("val.json","w") as file:
    file.write(result)

### Precision on validation set

In [None]:
import os
from typing import Dict, List

def evaluate(detections: Dict[str, List[BoundingBox]],
             targets: Dict[str, List[BoundingBox]]) -> float:
    return compute_AP(detections, targets)


# load the detections
detections = read_bb("val.json")
# load the targets
targets = read_bb(os.path.join('../4_Notebooks/data_WiSAR/data/validation/', 'labels.json'))

ap = evaluate(detections, targets)
print(f"Average precision={ap:.5f} on validation set.")

### Predictions on test set

In [None]:
test_strings = {0: "test-1-7", 1: "test-1-8", 2: "test-1-9", 3: "test-1-10", 4: "test-1-11", 5: "test-1-12",
                 6: "test-1-13", 7: "test-1-14", 8: "test-2-4", 9: "test-2-5", 10: "test-2-6", 11: "test-2-7", 
                 12: "test-2-8"}

bb_pred_dict = {}

mean, std = analyze_images(test_images)

for j in test_strings:
    print(test_strings[j])
    cleared_image = clear_anomaly(test_images, test_strings[j], show_images=False, threshold=threshold)
    bb_pred_dict[test_strings[j]] = draw_bounding_boxes(cleared_image, test_images, test_strings[j], show_images=True, add_ground_truth=True)

result = str(bb_pred_dict)
result = result.replace("'", "\"")
print(result)
with open("test.json","w") as file:
    file.write(result)

# What we tried but didn't improve results (code added for reference)

We tried to include all timesteps to find out which objects (anomalies) are static and which not. Then we removed the static anomalies from every anomaly image of every timestep and added those images up. This would help to detect also people that are not visible in the center timestep but in others. We did pretty well on detecting people but also detected objects as people that aren't.

In [None]:
def mahalanobis(image, threshold=5):
    array = image.copy()
    mean_vector = np.mean(array, axis=(0, 1))  
    
    im_re = array.reshape(-1, 3).astype(np.float64)
    im_re -= im_re.mean(0, keepdims=True)
    im_re_cov = 1/(im_re.shape[0]-1) * im_re.T @ im_re
        
    cov = np.linalg.inv(im_re_cov)
            
    am = array - mean_vector
    amc = am @ cov
    distances = np.sqrt(np.einsum('ijk,ijk->ij', amc, am))
        
    distances = distances/distances.max()
    threshold = np.quantile(distances, .999)
    
    anomaly = np.zeros_like(distances)   
    anomaly[distances > threshold] = 1
        
    return distances, anomaly

In [None]:
def crop(image):
    y_nonzero, x_nonzero, _ = np.nonzero(image)
    return image[np.min(y_nonzero):np.max(y_nonzero), np.min(x_nonzero):np.max(x_nonzero)]

In [None]:
def clear_anomaly_new(images, key, show_images=False, threshold=threshold):
    anomaly_ims = []
    
    image_0 = images[key + '-0'].copy()
    image_0_org = image_0.copy()
    
    image_0 = reduce_image(image_0)

    _, anomaly_0 = mahalanobis(image_0)
    anomaly_0 = remove_wrong_anomalies(anomaly_0, image_0_org)
    base = anomaly_0
    
    anomaly_ims.append(anomaly_0)
    
    # Take care of moving elements that move just a little by removing only those elements which area doesn't change
    # much in the multiplied image
    areas_0 = get_anomaly_0_areas(base)
    
    if show_images:
        plt.figure(figsize=(10.6, 10.6))
        plt.imshow(anomaly_0)
        plt.show()

    for i in range(1, 7):
        image = images[key + '-' + str(i)].copy()
        image_org = image.copy()
        
        image = reduce_image(image)
        
        _, anomaly = mahalanobis(image, threshold=threshold)
        anomaly = remove_wrong_anomalies(anomaly, image_org)
        
        anomaly_ims.append(anomaly)

        base = anomaly * base
        
        # Remove stored areas if they don't appear in base anymore
        base_contours, _ = cv2.findContours(base.astype('uint8'), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
        if len(base_contours) < len(areas_0):
            anomaly_0_non_static = remove_static_anomalies(anomaly_0, base, None)
            anomaly_0_remaining = 1 - anomaly_0_non_static
            
            anomaly_0 = anomaly_0 * anomaly_0_remaining
            areas_0 = get_anomaly_0_areas(anomaly_0)
        
        if show_images:
            print(i)
            plt.figure(figsize=(10.6, 10.6))
            plt.imshow(base)
            plt.show()
    
    # Check area decline of contours in multiplied image. If area decreased a lot relatively, it is likely moving
    contours, _ = cv2.findContours(base.astype('uint8'), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
    for i, cnt in enumerate(contours):
        area = cv2.contourArea(cnt)
        
        if len(contours) <= len(areas_0):
            if areas_0[i] != 0 and area/areas_0[i] < 0.05:
                rect = cv2.boundingRect(cnt)
                x,y,w,h = rect
                base[y:y+h, x:x+w] = 0
                
        if area < 10:
            rect = cv2.boundingRect(cnt)
            x,y,w,h = rect
            base[y:y+h, x:x+w] = 0

    masked_0 = remove_static_anomalies(anomaly_ims[0], base, image_0)
    
    full_masked = masked_0
    
    for anomaly in anomaly_ims:
        masked = remove_static_anomalies(anomaly, base, None)
        full_masked += masked
        
    if show_images:
        plt.figure(figsize=(10.6, 10.6))
        plt.imshow(full_masked)
        plt.show()
    
    contours, _ = cv2.findContours(full_masked.astype('uint8'), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
    
    cnts_anoms = [False] * len(contours)
    
    for i in range(7):
        image = images[key + '-' + str(i)]
        mean = image.mean(axis=(0, 1))
        std = image.std(axis=(0, 1))
        
        for j, cnt in enumerate(contours):
            x,y,w,h = cv2.boundingRect(cnt)
            if area_fo_statistics(image[y:y+h, x:x+w], mean, std, show_images=False): 
                cnts_anoms[j] = True
    
    for cnt_anom, cnt in zip(cnts_anoms, contours):
        x,y,w,h = cv2.boundingRect(cnt)
        
        if not cnt_anom:
            full_masked[y:y+h, x:x+w] = 0
    
    if show_images:
        plt.figure(figsize=(10.6, 10.6))
        plt.imshow(full_masked)
        plt.show()
        
    contours, _ = cv2.findContours(full_masked.astype('uint8'), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
    for cnt in contours:
        area = cv2.contourArea(cnt)
        
        if area < 5:
            x,y,w,h = cv2.boundingRect(cnt)
            full_masked[y:y+h, x:x+w] = 0

    if show_images:
        plt.figure(figsize=(10.6, 10.6))
        plt.imshow(full_masked)
        plt.show()
    
    return full_masked

def reduce_image(image):
    im = image.copy()
    
    mean = crop(im).mean(axis=(0,1))
    std = crop(im).std(axis=(0,1))

    im[im[:, :, 0] < mean[0] + std[0]] = 0
    im[im[:, :, 1] < mean[1] + std[1]] = 0
    im[im[:, :, 2] < mean[2] + std[2]] = 0

    q95 = np.quantile(im, 0.95, axis=(0,1))
    q99 = np.quantile(im, 0.99, axis=(0,1))

    im[im[:, :, 0] < q95[0]] = 0
    im[im[:, :, 1] > q99[1]] = 0
    im[im[:, :, 2] < q95[2]] = 0
    
    return im
    
def get_anomaly_0_areas(anomaly_0):
    contours, _ = cv2.findContours(anomaly_0.astype('uint8'), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
    areas_0 = []
    for cnt in contours:
        area = cv2.contourArea(cnt)
        areas_0.append(area)
    
    return areas_0

def draw_bounding_boxes_new(anomaly_image, images, key, show_images=False, max_area=3, add_ground_truth=False):
    image = images[key + '-3'].copy()
    kernel = np.ones((10, 10),np.uint8)
    anomaly_image = cv2.morphologyEx(anomaly_image.astype('uint8'), cv2.MORPH_CLOSE, kernel)
    
    contours, _ = cv2.findContours(anomaly_image.astype('uint8'), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
    
    bb_vals = []
        
    for cnt in contours:
        area = cv2.contourArea(cnt)
        if area >= 5:
            uncertainty = min(int(max(800 / area, 1)), max_area)
            
            rect = cv2.boundingRect(cnt)
            x,y,w,h = rect
            w, h = w * uncertainty, h * uncertainty
            x, y = x - w // 2, y - h // 2
            
            bb_vals.append([x, y, w, h])
            
            if show_images:
                image = cv2.rectangle(image, (x,y), (x+w,y+h), (0, 255, 0), 2)
    
    if show_images:
        plt.figure(figsize=(10.6, 10.6))
        
        if add_ground_truth and (key + '-3') in valid_images:
            for box in ground_truth[key]:
                start_point = (box[0], box[1])
                end_point = (box[0] + box[3], box[1] + box[2])

                color = (255, 0, 0)

                thickness = 2
                image = cv2.rectangle(image, start_point, end_point, color, thickness)
                
        plt.imshow(image)
        plt.show()
        
    return bb_vals

def remove_static_anomalies(anomaly_image, static_anomalies_image, original_image, show_images=False):
    contours_static, _ = cv2.findContours(static_anomalies_image.astype('uint8'), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
    contours_image, _ = cv2.findContours(anomaly_image.astype('uint8'), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
    
    # Remove tiny areas
    remaining_cnts = []
    for cnt in contours_image:
        area = cv2.contourArea(cnt)
        x,y,w,h = cv2.boundingRect(cnt)
        
        if area <= 10:
            anomaly_image[y:y+h, x:x+w] = 0
        else:
            remaining_cnts.append(cnt)

    # Get close contours
    min_distance = 10
    close_contours = []
    for i in range(len(remaining_cnts) - 1):
        for j in range(i + 1, len(remaining_cnts)):
            for pt in remaining_cnts[j]:
                r_cnt = remaining_cnts[i].reshape(-1, 2)
                diff = r_cnt - pt
                dist = np.sqrt(diff[:, 0] ** 2 + diff[:, 1] ** 2)
                if dist.min() < min_distance:
                    close_contours.append((i, j))
                    break
        
    def remove_close_cnts(i):
        for j, (m, n) in enumerate(close_contours):
            if i == m:
                cv2.drawContours(mask, [remaining_cnts[n]], -1, 0, -1)
                close_contours.remove(close_contours[j])
                remove_close_cnts(n)
            elif i == n:
                cv2.drawContours(mask, [remaining_cnts[m]], -1, 0, -1)
                close_contours.remove(close_contours[j])
                remove_close_cnts(m)

    # Remove all contours that seem to be static
    mask = np.ones_like(anomaly_image)
    i = 0
    for i, r_cnt in enumerate(remaining_cnts):
        for cnt in contours_static:
            cnt = cnt.reshape(-1, 2)
            for x, y in cnt:
                if cv2.pointPolygonTest(r_cnt, (float(x), float(y)), True) >= 0:
                    cv2.drawContours(mask, [r_cnt], -1, 0, -1)
                    remove_close_cnts(i)
                    break
    
    if show_images:
        plt.figure(figsize=(10.6, 10.6))
        plt.imshow(anomaly_image)
        plt.show()

        plt.figure(figsize=(10.6, 10.6))
        plt.imshow(mask)
        plt.show()

    return anomaly_image * mask

def remove_wrong_anomalies(anomaly_image, original_image):
    kernel = np.ones((3, 3),np.uint8)
    anomaly_image = cv2.morphologyEx(anomaly_image.astype('uint8'), cv2.MORPH_OPEN, kernel)
    kernel = np.ones((10, 10),np.uint8)
    anomaly_image = cv2.morphologyEx(anomaly_image.astype('uint8'), cv2.MORPH_CLOSE, kernel)
    contours, _ = cv2.findContours(anomaly_image.astype('uint8'), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)

    # Remove tiny areas
    for cnt in contours:
        area = cv2.contourArea(cnt)
        
        rect = cv2.boundingRect(cnt)
        x,y,w,h = rect
        
        if area <= 10:
            anomaly_image[y:y+h, x:x+w] = 0
        else:
            mean = original_image.mean(axis=(0,1))
            std = original_image.std(axis=(0,1))
            if not area_fo_statistics(original_image[y:y+h, x:x+w], mean, std, False):
                anomaly_image[y:y+h, x:x+w] = 0
                
    return anomaly_image

def area_fo_statistics(area, mean, std, show_images=False):
    m = area.mean(axis=(0, 1))
    s = area.std(axis=(0, 1))
    
    bias = 1.1
    
    rbh = 1.6
    
    r_out = (m[0] + s[0] > (mean[0] + std[0]) * bias).all()
    b_out = (m[2] + s[2] > (mean[2] + std[2]) * bias).all()
    
    red_or_blue_high = ((m[0] + s[0])/(mean[0] + std[0]) > rbh or (m[2] + s[2])/(mean[2] + std[2]) > rbh)
    
    quot = (m + s)/(mean + std)
    green_min = False
    if np.argmin(quot) == 1: 
        green_min = True

    if (r_out and b_out and green_min) or red_or_blue_high:
        if show_images:    
            print(mean + std)
            print(m + s)
            print((m + s)/(mean + std))
            print()
            plt.imshow(area)
            plt.show()
        return True

    return False

In [None]:
valid_strings = {0: "valid-1-0", 1: "valid-1-1", 2: "valid-1-2", 3: "valid-1-3", 4: "valid-1-4", 5: "valid-1-5",
                 6: "valid-1-6", 7: "valid-2-0", 8: "valid-2-1", 9: "valid-2-2", 10: "valid-2-3"}

bb_pred_dict = {}

mean, std = analyze_images(valid_images)

for j in valid_strings:
    print(valid_strings[j])
    cleared_image = clear_anomaly_new(valid_images, valid_strings[j], show_images=False, threshold=threshold)
    bb_pred_dict[valid_strings[j]] = draw_bounding_boxes_new(cleared_image, valid_images, valid_strings[j], show_images=True, add_ground_truth=True)

result = str(bb_pred_dict)
result = result.replace("'", "\"")
print(result)
with open("val_fail.json","w") as file:
    file.write(result)

In [None]:
def evaluate(detections: Dict[str, List[BoundingBox]],
             targets: Dict[str, List[BoundingBox]]) -> float:
    return compute_AP(detections, targets)


# load the detections
detections = read_bb("val_fail.json")
# load the targets
targets = read_bb(os.path.join('../4_Notebooks/data_WiSAR/data/validation/', 'labels.json'))

ap = evaluate(detections, targets)
print(f"Average precision={ap:.5f} on validation set.")

In [None]:
test_strings = {0: "test-1-7", 1: "test-1-8", 2: "test-1-9", 3: "test-1-10", 4: "test-1-11", 5: "test-1-12",
                 6: "test-1-13", 7: "test-1-14", 8: "test-2-4", 9: "test-2-5", 10: "test-2-6", 11: "test-2-7", 
                 12: "test-2-8"}

bb_pred_dict = {}

mean, std = analyze_images(test_images)

for j in test_strings:
    print(test_strings[j])
    cleared_image = clear_anomaly_new(test_images, test_strings[j], show_images=False, threshold=threshold)
    bb_pred_dict[test_strings[j]] = draw_bounding_boxes_new(cleared_image, test_images, test_strings[j], show_images=True, add_ground_truth=True)

result = str(bb_pred_dict)
result = result.replace("'", "\"")
print(result)
with open("test_fail.json","w") as file:
    file.write(result)

# rCRD approach for finding anomalies (also didn't improve results, code added for reference)

In [None]:
def rCRD_XY(image, r, l, threshold):
    image = image.swapaxes(2, 1).swapaxes(0, 1)

    d = image.shape[0]
    n = image.shape[1]*image.shape[2]
    
    X = image.copy()
    Xr = np.zeros([d, r])
    
    pts = []
                
    for i in range(r):
        x = np.random.randint(0, image.shape[1])
        y = np.random.randint(0, image.shape[2])
                
        while all(X[:, x, y] == 0) or ((y, x) in pts):
            x = np.random.randint(0, image.shape[1])
            y = np.random.randint(0, image.shape[2])
        Xr[:, i] = X[:, x, y]
        pts.append((y, x))
        
    a = Xr.T @ Xr + l
    unq, count = np.unique(a, axis=0, return_counts=True)
    b = np.linalg.pinv(Xr.T @ Xr + l) @ Xr.T
    A = (X.T @ b.T).T
    
    X_est = (A.T @ Xr.T).T
    
    diff = X - X_est
        
    delta = np.linalg.norm(diff, axis=0)
    delta = delta/np.max(delta)
    anomaly = np.where(delta > threshold, 1, 0)
    
    return anomaly

In [None]:
image = valid_images['valid-2-0-6']
plt.figure(figsize=(10.6, 10.6))
plt.imshow(image)
plt.show()

r = 500
t = 0.8

anomaly = rCRD_XY(image, r=r, l=1e-6, threshold=t)

plt.figure(figsize=(10.6, 10.6))
plt.imshow(anomaly)
plt.show()