### Anomaly Detection
- Goal: identify the data points that differ markedly from the rest of the data
- There are 3 categories
    - Supervised Anomaly Detection: labels are available for both normal and anomalous data
    - Semi-Supervised Anomaly Detection: labels are available only for normal data
    - Unsupervised Anomaly Detection: no labels are available
- Algorithms
    - Mahalanobis Distance: 
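        - For one-dimensional data, the z-score z = (x - μ) / σ can be used for outlier/anomaly detection. A data point is labeled as an anomaly if |z| > T (threshold)
        - For multidimensional data, a per-feature z-score is not applicable because it ignores the correlation between features. The Mahalanobis distance, $D_M(x) = \sqrt{(x - \mu)^\top \Sigma^{-1} (x - \mu)}$ with mean vector $\mu$ and covariance matrix $\Sigma$, is used instead (see https://en.wikipedia.org/wiki/Mahalanobis_distance); a point is flagged as an anomaly when $D_M(x)$ exceeds a threshold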
        
    - Density-Based Spatial Clustering of Applications with Noise (DBSCAN)
        - Find the neighbors of every point within radius eps (eps: the radius that defines a neighborhood)
        - Identify the core points: points that have at least min_samples neighbors (min_samples: the minimum number of data points required to form a local cluster)
        - Find the connected components of the core points on the neighbor graph
        - Assign each non-core point to a nearby cluster if it lies within radius eps of one of that cluster's core points; otherwise label it as noise
    - Local Outlier Factor (LOF): density-based anomaly detection algorithm
        - n_neighbors: the number of neighbors that defines the neighborhood size
        - contamination: the expected proportion of outliers/anomalies in the training data
        - novelty: set to True to be able to construct the decision function (and hence the decision boundary)
    - Isolation Forest
    - One-Class Support Vector Machine

In [2]:
# import libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# make dataset
from sklearn.datasets import make_blobs

# statistical modules for data generation and critical values
from scipy.stats import multivariate_normal, beta, uniform, t

from functools import partial

from sklearn import svm
from sklearn.neighbors import NearestNeighbors
from sklearn.metrics import pairwise_distances
from sklearn.covariance import EllipticEnvelope # use for Mohalanobis distance
from sklearn.cluster import DBSCAN
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.linear_model import SGDOneClassSVM

from sklearn.inspection import DecisionBoundaryDisplay
from sklearn.metrics import classification_report
from sklearn.metrics import roc_curve, auc

from typing import Any


In [3]:
# Local Outlier Factor hyperparameters.
#   n_neightbors : number of neighbours used for the comparison
#   contamination: expected fraction of anomalies
#   novelty      : True is needed to construct the decision boundary
n_neightbors, contamination, novelty = 10, 0.01, True

In [None]:
# Build the (still unfitted) LOF detector from the hyperparameters defined earlier.
clf_lof = LocalOutlierFactor(
    n_neighbors=n_neightbors,
    contamination=contamination,
    novelty=novelty,
)
clf_lof  # show the estimator

In [None]:
# Dataset 1: every point is drawn from one bivariate Gaussian population.
# A single outlier is added to it by hand afterwards.

np.random.seed(0)  # fixed seed so the draw is reproducible
n = 1000  # number of normal samples
mu = np.zeros(2, dtype=int)  # zero mean vector
sigma = np.array([[2, 1], [1, 2]])  # covariance: variance 2 on each axis, covariance 1
# rvs returns random variates of shape (size, N), N being the dimension of the variable
normal = multivariate_normal.rvs(mean=mu, cov=sigma, size=n)
normal

In [None]:
normal.shape[0]  # number of generated samples (rows)

In [None]:
# create the anomaly

# One anomalous point at (3, 3), built directly as a (1, 2) array so it has the
# same (n_samples, n_features) layout as the normal data. The earlier 1-D
# assignment was overwritten immediately and has been dropped.
anomaly = np.array([[3, 3]])
anomaly

In [None]:
# Scatter plot of dataset 1: normal samples, the centre and the single anomaly.
fig, ax = plt.subplots(figsize=(10, 5))

# normal samples (columns unpacked into x and y)
ax.scatter(*normal.T, s=10, label='Normal Data')
# green '+' marks the centre at the origin
ax.plot([0], [0], marker='+', color='green', markersize=10)
# the anomaly, drawn larger and in red
ax.scatter(*anomaly.T, s=20, color='red', label='Anormal Data')

ax.legend()
ax.set_title('Unimodal Data with Single Anomaly');

In [None]:
# concate normal and anomaly together

dataset1 = np.concatenate((normal, anomaly)) # last item is the anomaly
dataset1

In [None]:
len(dataset1)

In [None]:
# create labels: 0 for nomal, 1 for anomaly

label = np.concatenate([(np.zeros(n, )), np.ones(1, )])  # last item is the anomaly
label

In [None]:
len(label)

In [None]:
n_neightbors = 20 # number of neighbor for comparision
contamination = 0.01
novelty = True # need True to contruct decision boundary

clf_lof = LocalOutlierFactor(n_neighbors = n_neightbors, contamination = contamination, novelty = novelty)
clf_lof

In [None]:
# train/fit a model

clf_lof.fit(dataset1)

In [None]:
# Decision boundary and anomaly output

# Aliases so the plotting code below does not depend on a specific dataset name.
dataset = dataset1
normal_data = normal
anomaly_data = anomaly

# plot

fig, ax = plt.subplots(1, 1, figsize = (10, 7)) # a single axes on a 10x7 figure

# Draw the LOF decision function over the region spanned by `dataset`.
disp = DecisionBoundaryDisplay.from_estimator(clf_lof, # anomaly detector
                                              dataset, # dataset for drawing decision boundary
                                              response_method = "decision_function", # plot decision function
                                              alpha = 0.7, # transparency level
                                              ax = ax)

# The masks below treat a prediction of 1 as "normal" and -1 as "anomaly".
pred = clf_lof.predict(normal_data) # predictions for the truly normal points
disp.ax_.scatter(normal_data[pred == 1, 0], normal_data[pred == 1, 1], s = 10, color = 'black', label = "True Normal")
disp.ax_.scatter(normal_data[pred == -1, 0], normal_data[pred == -1, 1], s = 10, color = 'salmon', label = "False Anomaly")

preda = clf_lof.predict(anomaly_data) # predictions for the truly anomalous points
disp.ax_.scatter(anomaly_data[preda == -1, 0], anomaly_data[preda == -1, 1], s = 10, color = 'red', label = "True Anomaly")
disp.ax_.scatter(anomaly_data[preda == 1, 0], anomaly_data[preda == 1, 1], s = 10, color = 'blue', label = "False Normal")

disp.ax_.set_title(f"Decision Boundary of Local Outlier Factor (n_neighbors = {n_neightbors}, contamination = {contamination})")
plt.axis("Square")
# Use the plotted decision surface itself as the colorbar mappable. Picking
# ax_.collections[1] by position is fragile: it is not guaranteed to be the
# contour surface and may be one of the scatter layers added above, which
# carry no decision-function values.
plt.colorbar(disp.surface_)
plt.legend();

In [None]:
# Performance evaluation
# NOTE(review): the detector is scored on the same samples it was fitted on;
# scikit-learn's LOF documentation advises using predict only on unseen data
# when novelty=True - confirm this is intended.

# Map the detector output onto the label convention: 1 -> 0 (normal), -1 -> 1 (anomaly).
pred = (clf_lof.predict(dataset1) == -1).astype('int')
target_names = ['Nomal', 'Anomaly']

print("Classification Report of Decision Boundary of Local Outlier Factor")
print(classification_report(label, pred, target_names=target_names))

In [None]:
# Unimodal data surrounded by Anomalies
# create a dataset which the data points are derived from a single multivariate Gaussian population. Oulier are added as the extreme outlier.

np.random.seed(0) # set for consistance run
n = 10_000
n

In [None]:
n_normal = int(n * 0.98)
n_normal

In [None]:
n_anomaly = n - n_normal
n_anomaly

In [None]:
normal2, _ = make_blobs(n_samples = n_normal, n_features = 2, centers = [[0, 0]], cluster_std = [1.0])
normal2

In [None]:
len(normal2)

In [None]:
# anomalies are located on the circle of radius r = 4 centered at the origin
 
r = 4
theta = np.arange(n_anomaly) * (2 * np.pi * n_anomaly) # 2% anomaly rate
theta

In [None]:
len(theta), type(theta)

In [None]:
x = np.cos(theta) * r + np.random.randn(n_anomaly) * r * 0.1
x

In [None]:
len(x)

In [None]:
y = np.sin(theta) * r + np.random.randn(n_anomaly) * r * 0.1
y

In [None]:
len(y)

In [None]:
anomaly2 = np.stack((x, y), axis = 0)
anomaly2

In [None]:
len(anomaly2)

In [None]:
anomaly2.shape

In [None]:
anomaly2 = np.stack((x, y), axis = 0).T
anomaly2

In [None]:
len(anomaly2)

In [None]:
anomaly2.shape

In [None]:
# scatter plot of dataset 2: normal points in green, anomalies in red

plt.figure(figsize=(10, 5))
plt.scatter(*normal2.T, s=20, color='green', label='Normal Data')
plt.scatter(*anomaly2.T, s=20, color='red', label='Anomalies Data')
plt.legend();

In [None]:
# concate nomal and anomalies data

dataset2 = np.concatenate((normal2, anomaly2))
dataset2

In [None]:
len(dataset2)

In [None]:
label2 = np.concatenate([(np.zeros(n_normal, )), np.ones(n_anomaly, )]) # labeled data by setting 0 for normal and 1 for anomaly
label2

In [None]:
len(label2)

In [None]:
n_neightbors = 20 # number of neighbor for comparision
contamination = 0.01
novelty = True # need True to contruct decision boundary

clf_lof = LocalOutlierFactor(n_neighbors = n_neightbors, contamination = contamination, novelty = novelty)
clf_lof

In [None]:
# train/fit a model

clf_lof.fit(dataset2)

In [None]:
# Decision boundary and anomaly output

# Aliases so the plotting code below does not depend on a specific dataset name.
dataset = dataset2
normal_data = normal2
anomaly_data = anomaly2

# plot

fig, ax = plt.subplots(1, 1, figsize = (10, 7)) # a single axes on a 10x7 figure

# Draw the LOF decision function over the region spanned by `dataset`.
disp = DecisionBoundaryDisplay.from_estimator(clf_lof, # anomaly detector
                                              dataset, # dataset for drawing decision boundary
                                              response_method = "decision_function", # plot decision function
                                              alpha = 0.7, # transparency level
                                              ax = ax)

# The masks below treat a prediction of 1 as "normal" and -1 as "anomaly".
pred = clf_lof.predict(normal_data) # predictions for the truly normal points
disp.ax_.scatter(normal_data[pred == 1, 0], normal_data[pred == 1, 1], s = 10, color = 'black', label = "True Normal")
disp.ax_.scatter(normal_data[pred == -1, 0], normal_data[pred == -1, 1], s = 10, color = 'salmon', label = "False Anomaly")

preda = clf_lof.predict(anomaly_data) # predictions for the truly anomalous points
disp.ax_.scatter(anomaly_data[preda == -1, 0], anomaly_data[preda == -1, 1], s = 10, color = 'red', label = "True Anomaly")
disp.ax_.scatter(anomaly_data[preda == 1, 0], anomaly_data[preda == 1, 1], s = 10, color = 'blue', label = "False Normal")

disp.ax_.set_title(f"Decision Boundary of Local Outlier Factor (n_neighbors = {n_neightbors}, contamination = {contamination})")
plt.axis("Square")
# Use the plotted decision surface itself as the colorbar mappable. Picking
# ax_.collections[1] by position is fragile: it is not guaranteed to be the
# contour surface and may be one of the scatter layers added above, which
# carry no decision-function values.
plt.colorbar(disp.surface_)
plt.legend();

In [None]:
# Performance evaluation
# NOTE(review): the detector is scored on the same samples it was fitted on;
# scikit-learn's LOF documentation advises using predict only on unseen data
# when novelty=True - confirm this is intended.

# Map the detector output onto the label convention: 1 -> 0 (normal), -1 -> 1 (anomaly).
pred = (clf_lof.predict(dataset2) == -1).astype('int')
target_names = ['Nomal', 'Anomaly']

print("Classification Report of Local Outlier Factor")
print(classification_report(label2, pred, target_names=target_names))

In [None]:
# ROC

scores = -clf_lof.decision_function(dataset2)
scores

In [None]:
len(scores)

In [None]:
fpr, tpr, _ = roc_curve(y_true = label2, y_score = scores, pos_label = 1)
fpr

In [None]:
tpr

In [None]:
_

In [None]:
auc_score = auc(fpr, tpr)
auc_score

In [None]:
# ROC curve of the LOF detector, with the AUC shown in the title.
plt.plot(fpr, tpr)
plt.title(f"ROC Curve of Local Outlier Factor \nAUC = {auc_score:0.3f}")
plt.grid()
plt.xlabel("False Alarm Rate")
plt.ylabel("Detection Rate")
# Render the figure. The previous argument-less plt.plot() drew nothing and
# only echoed an empty list as the cell output.
plt.show()