In [1]:
from tqdm.notebook import tqdm
import imageio.v2 as imageio
import matplotlib.animation as animation
import gc
from glob import glob
import os
from random import sample, seed, randint, random, shuffle, choice, choices, uniform, gauss, triangular
from PIL import Image, ImageOps 
import pandas as pd
import shutil
from collections import OrderedDict
import numpy as np
import sys
import matplotlib.pyplot as plt
import cv2
import math
import copy
from joblib import Parallel, delayed
from joblib import Parallel, delayed
import torch
import torch.nn as nn
import torch.optim as optim
import transformers
from transformers import CvtConfig, CvtModel, AutoImageProcessor, CvtModel, CvtForImageClassification

In [2]:
# Tunable parameters for the anomaly-score interpreter defined in the next cell.

# After an index is counted as an anomaly, the following CHANGE_WINDOW indices
# are treated as part of the same event and are not counted again.
CHANGE_WINDOW = 60
# Not connected to anything yet. Per the original note it is meant to scale with
# the standard deviation of the score distribution -- TODO wire in or remove.
SDEV_SENSITIVITY = 1
# Window length for the rolling statistics; indices below this value are never
# counted as anomalies (predicted or labelled).
ROLLING_WINDOW = 10
# NOTE(review): not referenced by any cell visible here -- confirm it is still needed.
KERNEL_SIZE = 5

In [3]:
class interpret_anomaly_score():
    """Turn a per-index anomaly-score series into event-level detection counts.

    The whole pipeline runs from ``__init__``:

    1. ``prep_data`` min-max normalises the scores into ``self.data``.
    2. ``identify_anomalies`` flags every index whose normalised score is below
       the normalised rolling standard deviation while the score is not rising
       (gradient <= 0). The flagged values are stored in ``self.points``.
    3. ``clump_predictions`` merges flags (and label onsets) that fall within
       ``change_window`` of an already counted one and derives TP / FP / FN.

    Parameters
    ----------
    label_list : sequence of 0/1
        Ground-truth label per index (1 = anomalous).
    anomaly_scores : sequence of numbers
        One score per index.
    print_results : bool, default True
        Print the counts, the derived metrics and the clumped locations.
    rolling_window : int, optional
        Rolling-statistics window, also the first index that may be counted.
        Defaults to the notebook-level ``ROLLING_WINDOW``.
    change_window : int, optional
        Number of indices after a counted anomaly that are considered part of
        the same event. Defaults to the notebook-level ``CHANGE_WINDOW``.

    Attributes set after construction
    ---------------------------------
    TP, FP, FN : int
    precision, recall, f1, accuracy : float
        Reported as 0.0 when their denominator is zero.
    pred_clumped_anomalies, actual_clumped_anomalies : list[int]
    """

    def __init__(self, label_list, anomaly_scores, print_results=True,
                 rolling_window=None, change_window=None):
        self.print_results = print_results
        self.x = anomaly_scores
        self.label_list = label_list
        # The module-level constants are only looked up when no override is given.
        self.rolling_window = ROLLING_WINDOW if rolling_window is None else rolling_window
        self.change_window = CHANGE_WINDOW if change_window is None else change_window
        self.prep_data()
        self.identify_anomalies()
        self.clump_predictions()

    def prep_data(self):
        """Coerce the inputs to arrays and build the normalised score Series."""
        # np.asarray accepts lists, tuples, arrays and Series alike and drops any
        # custom pandas index, so every later lookup is purely positional.
        self.x = np.asarray(self.x)
        self.label_list = np.asarray(self.label_list)
        self.x_shape = self.x.shape[0]
        self.data = pd.Series(self.normalize_data(self.x.astype(float)))
        # Kept for backward compatibility; the gradient test compares against 0 directly.
        self.zeros = pd.DataFrame(np.zeros(self.data.shape[0]), columns=['zeros'])

    def normalize_data(self, data):
        """Min-max scale ``data`` into [0, 1].

        Empty input is returned unchanged and a constant series maps to all
        zeros instead of dividing by zero.
        """
        if len(data) == 0:
            return data
        low = data.min()
        span = data.max() - low
        if span == 0:
            return data - low
        return (data - low) / span

    def identify_anomalies(self):
        """Flag indices with a low, non-rising score; result goes to ``self.points``."""
        # Rolling standard deviation, rescaled to [0, 1] so it can be compared
        # with the normalised scores. The first rolling_window - 1 entries are
        # NaN and therefore never satisfy the comparison below.
        rolling_std = self.normalize_data(self.data.rolling(self.rolling_window).std())

        # Direction of the score at each index. np.gradient needs at least two
        # samples; shorter inputs get a flat (zero) gradient.
        values = self.data.to_numpy()
        slope = np.gradient(values) if values.size >= 2 else np.zeros(values.size)
        self.gradient = pd.Series(slope)

        self.points = self.data[(self.data < rolling_std) & (self.gradient <= 0)]

    def _clump(self, candidates):
        """Keep the first candidate >= rolling_window, then skip every candidate
        within change_window of the last kept one. ``candidates`` must be ascending."""
        kept = []
        next_allowed = self.rolling_window
        for idx in candidates:
            if idx < next_allowed:
                continue
            kept.append(int(idx))
            next_allowed = idx + self.change_window
        return kept

    def clump_predictions(self):
        """Clump predictions and label onsets, then count TP, FP and FN.

            - predictions are clumped so that they are not flagged more than once per change_window

            - every prediction inside a change_window collapses onto the first index of that window

            - this represents how many times an anomaly would be flagged for human review within a video

        A clumped prediction is a true positive only when its index is exactly
        equal to a clumped label onset.
        NOTE(review): exact-index matching is strict (a flag one index late counts
        as FP and FN) -- confirm that this is the intended tolerance.
        """
        labels = np.asarray(self.label_list)

        # Predicted events: flagged indices, de-duplicated per change_window.
        anomaly_locations = self._clump(self.points.index)

        # Labelled events: rising edges 0 -> 1, clumped with the same rule.
        # Index 0 has no predecessor, so it is never treated as an onset.
        onsets = np.flatnonzero((labels[1:] == 1) & (labels[:-1] == 0)) + 1
        total_anomalies = self._clump(onsets)

        actual = set(total_anomalies)
        TP = sum(1 for loc in anomaly_locations if loc in actual)
        FP = len(anomaly_locations) - TP
        FN = len(total_anomalies) - TP

        # Undefined ratios (zero denominator) are reported as 0.0 instead of raising.
        precision = TP / (TP + FP) if (TP + FP) else 0.0
        recall = TP / (TP + FN) if (TP + FN) else 0.0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
        # There are no true negatives at event level, so accuracy reduces to
        # TP / (TP + FP + FN); misses (FN) must not appear in the numerator.
        accuracy = TP / (TP + FP + FN) if (TP + FP + FN) else 0.0

        self.TP = TP
        self.FP = FP
        self.FN = FN
        self.precision = precision
        self.recall = recall
        self.f1 = f1
        self.accuracy = accuracy
        self.pred_clumped_anomalies = anomaly_locations
        self.actual_clumped_anomalies = total_anomalies
        if self.print_results:
            print(f'TP: {TP}, FP: {FP}, FN: {FN}')
            print(f'Precision: {precision}')
            print(f'Recall: {recall}')
            print(f'F1: {f1}')
            print(f'Accuracy: {accuracy}')
            print("")
            print(f'Anomaly locations: {anomaly_locations}')
            print(f'Actual anomaly locations: {total_anomalies}')
            # Raw (un-clumped) flagged indices, for reference.
            print(self.points.index)


In [4]:
if __name__ == '__main__':
    # Smoke test on hand-written data.
    # GT holds one ground-truth label per index (1 = anomalous) and contains two runs of 1s.
    GT = [0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
    # anomaly_scores holds one score per index; the detector flags low, non-rising values.
    anomaly_scores = [.7,.7,.8,.8,.8,.7,.7,.8,.7,.8,.9,.9,.8,.5,.2,.7,.6,.9,.8,.9,.7,.8,.9,.8,.9,.8,.9,.9,.8,.9,.8,.9,.8,.9,.9,.9,.9,.9,.9,.9,.9,.9,.9,.8,.8,.8,.8,.9,.6,.6,.8,.5,.3,.7,.8,.9,.4,.5,.4,.6,.8,.7,.6,.8,.5,.8,.6,.9,.9,.9,.8,.9,.9,.8,.9,.9,.8,.9,.9,.8,.7,.7]

    # Constructing the object runs the whole pipeline; print_results defaults to
    # True, so the counts, metrics and clumped locations are printed below the cell.
    a =  interpret_anomaly_score(GT, anomaly_scores)

TP: 1, FP: 0, FN: 0
Precision: 1.0
Recall: 1.0
F1: 1.0
Accuracy: 1.0

Anomaly locations: [13]
Actual anomaly locations: [13]
Int64Index([13, 18, 20, 23, 51, 56, 57, 61, 64], dtype='int64')
