In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score

import warnings
warnings.filterwarnings('ignore')

# Anomaly Detection

In [2]:
class AnomalyDetection():
    
    def __init__(self, kind='default'):
        
        self.kind = kind
        
    def estimateGaussian(self, data):
        
        self.mean = data.mean(axis=0)
        if self.kind == 'multi':
            self.sigma = np.cov(data.T)
            self.sigma_det = np.linalg.det(self.sigma)
            self.sigma_inv = np.linalg.inv(self.sigma)
        else:
            self.sigma = np.std(data, axis=0)
            
    def univariate(self, data):
    
        upper_term = (np.exp((-((data - self.mean)**2) / (2*(self.sigma**2)))))
        lower_term = (np.sqrt(2*np.pi) * self.sigma)

        prob = upper_term / lower_term

        return (np.prod(prob, axis=1))
    
    def multivariateGaussian(self, data):

        n = data.shape[1]
        
        upper_term = (np.exp(-(((data - self.mean).dot(self.sigma_inv)).dot((data - self.mean).T)) / 2))
        lower_term = (((2*np.pi)**(n/2))*((self.sigma_det)**(0.5)))

        prob = upper_term / lower_term

        return prob
    
    def getProb(self, data):
        
        if self.kind == 'multi':
            prob = [(self.multivariateGaussian(sample.reshape(1, -1)))[0, 0] for sample in data]
            return np.array(prob)
        else:
            return self.univariate(data)
    
    def optimalThreshold(self, val_x, val_y):
        
        prob = self.getProb(val_x)
    
        stepsize = (np.max(prob) - np.min(prob))/1000
        epsilon = np.arange(np.min(prob), np.max(prob), stepsize)
        epsilon = epsilon[::-1]

        pred = np.where(prob < epsilon[:, np.newaxis], 1, 0)

        F1 = np.array([f1_score(val_y, pred[i]) for i in range(len(pred))])

        return epsilon[F1.argmax()], F1.max()
    
    def predict(self, val_x, threshold):
        
        prob = self.getProb(val_x)
        
        return np.where(prob < threshold, 1, 0)

## Data Preparation

**Feature x1's range :** 1 ~ 10

**Feature x2's range :** 1 ~ 15

**Feature x3's range :** 20 ~ 50

Any value out of these ranges would be marked as anomaly (outlier).

**Labels**

Anomaly : 1 | Not Anomaly : 0

In [3]:
np.random.seed(0)

m = 1000

x1 = np.random.randint(1, 10, size=m)
x2 = x1 + np.random.randint(1, 5, size=m)
x3 = np.random.randint(20, 50, size=m)
train_x = np.array([x1, x2, x3]).T
print(train_x.shape)
train_x

(1000, 3)


array([[ 6,  9, 21],
       [ 1,  2, 42],
       [ 4,  7, 41],
       ...,
       [ 4,  8, 21],
       [ 5,  8, 30],
       [ 4,  5, 26]])

## Train

Train using Multivariate Gaussian.

In [4]:
a = AnomalyDetection(kind='multi')
a.estimateGaussian(train_x)

## Validation evaluation

#### Validation dataset

In [5]:
val_x = np.array([[20, 25, 50], [30, 21, 40], [8, 11, 32], [6, 8, 45], [6, 9, 40], [40, 9, 17], [4, 7, 21], [1, 3, 30]])
val_y = np.array([1, 1, 0, 0, 0, 1, 0, 0])

#### Finding Optimal threshold

In [6]:
threshold, F1 = a.optimalThreshold(val_x, val_y)
threshold, F1

(0.0006151806824039895, 1.0)

## Evaluate on Test data

In [7]:
test_x = np.array([[15, 13, 41], [3, 7, 40]])
a.predict(test_x, threshold)

array([1, 0])