# Contents

<br/>

## 1. Generator-based anomaly detection (basic)
### 1.1. Gaussian model
### 1.2. Kernel density estimation

<br/><br/>

## 2. Autoencoder-based anomaly detection
### 2.1. Using reconstruction error
### 2.2. Using pre-trained autoencoder with generator-based approach

<br/><br/>

## 3. Likelihood ratio
### 3.1. Likelihood ratio with Gaussian model (basic)
### 3.2. Likelihood ratio with Autoregressive model (Advanced)

<br/><br/>
<br/><br/>
<br/><br/>

# 1. Generator-based anomaly detection

<br/>

---

<br/>

## 1.1. Overview

**Data distribution $p(x)$를 학습하여, 새로 주어진 target data가 outlier인지 판단함.**

1. **Data distribution** $p(x)$를 표현하는 **모델**을 학습함. 

2. 만약, 새로 주어진 **target data** $x'$이 **data distribution**에 속할 확률이 낮다면 ($p(x')<\epsilon$), outlier로 판단함.

<img src="./images/generator_example.png" width="600px" title="" />

<br/>
<br/>
<br/>
<br/>

---

<br/>

## 1.2. Method

### 1.2.1. Gaussian model

**Data distribution $p(x)$를 다루기 쉬운 Gaussian distribution으로 가정하는 프레임워크.**

1. 주어진 데이터셋에 대한 분포를 표현할 수 있는 **Gaussian distribution**를 추정함. 

2. 만약 주어진 **target data**가 **추정된 Gaussian distribution**에서 얻어질 확률이 작다면 ($p(x')<\epsilon$), 해당 데이터를 **outlier**로 판단함.

<img src="./images/gaussian_example.png" width="600px" title="" />

<br/>
<br/>
<br/>
<br/>

### 1.2.2. Kernel density estimation (KDE)

**Data distribution $p(x)$를 모든 training data point에 대한 point-wise kernel을 이용하여 표현함.** 

1. 모든 training data로 표현된 **kernel density**로부터 **target data**가 얻어질 확률을 값을 계산함.

2. 계산된 확률 값이 일정 이하라면, 해당 데이터를 **outlier**로 판단함.

<img src="./images/kde_example.png" width="600px" title="" />


<br/>
<br/>
<br/>

---

<br/>

## 1.3. Implementation

<br/>

### Install / Import library

In [None]:
# Install library 
!pip3 install --upgrade pip
!pip3 install --upgrade setuptools
!pip3 install numpy
!pip3 install matplotlib
!pip3 install scipy
!pip3 install sklearn

In [None]:
# Import library
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import multivariate_normal

<br/>

### Define toy dataset (2-dimensional data)

- 간단한 예시로서, **2-dimensional toy dataset**에서 anomaly detection을 수행함. 

- **training data는 총 100개**이며, **test data는 20개** (10개는 anomaly, 남은 10개는 normal data).


<br/>

**Variable list:**

> **N**: training data의 개수 (=100).

> **M**: test data의 개수 (=20).

> **training_data**: training dataset에 대한 numpy matrix ($N \times 2$).

> **test_data**: test dataset에 대한 numpy matrix ($M \times 2$).

> **test_data_label**: 각각의 test data가 anomaly data인가에 대한 ground truth (boolean array).

In [None]:
N = 100 # Number of training data
M = 20 # Number of test data


# Generate some normal data points
np.random.seed(0)
mean_normal = np.array([5, 5])
cov_normal = np.array([[1, 0.5], [0.5, 1]])
training_data = np.random.multivariate_normal(mean_normal, cov_normal, N) # normal data


# Generate some test data points
mean_anomaly = np.array([7, 7])
cov_anomaly = np.array([[2, -1], [-1, 2]])
test_data_normal = np.random.multivariate_normal(mean_normal, cov_normal, M//2) # normal data
test_data_anomaly = np.random.multivariate_normal(mean_anomaly, cov_anomaly, M//2) # anomaly data
test_data = np.concatenate([test_data_normal,test_data_anomaly], axis = 0) # concatenate
test_data_label = np.array([False]*(M//2)+[True]*(M//2)) # True indicates the anomaly data point

print('training_data.shape:', training_data.shape)
print('target_data.shape:', test_data.shape)

<br/>

### Visualize toy dataset

In [None]:
plt.figure(figsize=(5, 3))

# Plot training data in green
plt.scatter(training_data[:, 0], training_data[:, 1], s = 20,c='green', label='Training Data')
# Plot test data
plt.scatter(test_data[:, 0], test_data[:, 1], c=[('red' if v else 'blue') for v in test_data_label])

plt.title('Data Visualization')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.show()

- **우리는 위의 데이터에서 anomaly detection을 예시로서 수행함.**

<br/><br/>
<br/><br/>
<br/><br/>


### Anomaly detection with Gaussian model [1/3]

**우선, data distribution을 Gaussian distribution $p(x;\mu,\Sigma)$으로 표현함.** 구체적으로,

- **Gaussian distribution** $p(x;\mu,\Sigma)$를 표현하는 파라미터인 **mean**과 **covariance** $(\mu,\Sigma)$를 **training data**로 계산해야함.

- **Mean**과 **covariance** $(\mu,\Sigma)$는 다음과 같이 계산 가능함.
$$\mu=\frac{1}{N}\sum_{n=1}^N x_n, \qquad \Sigma=\frac{1}{N}\sum_{n=1}^N (x_n-\mu)(x_n-\mu)^T$$


In [None]:
def fit_Gaussian_parameter(data):
    """Obtain parameters of Gaussian distribution.

    Parameters
    ----------
    data: N x 2 array (training data, where row indicies each data point)

    Returns
    -------
    mean: single float
    covariance: single float
    """

    """ You can use functions in libraries.
    mean: https://numpy.org/doc/stable/reference/generated/numpy.mean.html
    covariance: https://numpy.org/doc/stable/reference/generated/numpy.cov.html
    """ 

    number_of_data = data.shape[0]

    # Compute mean and covariance using numpy module
    mean = """ TODO """
    cov = """ TODO """

    # (Optional) Compute mean parameter directly
    mean = 0
    for i in range(""" TODO """):
        mean = """ TODO """
    mean = mean / """ TODO """

    # (Optional) Compute covariance parameter directly
    cov = np.zeros((2, 2))
    for i in range(""" TODO """):
        de_mean = """ TODO """ # x_i - mean
        de_mean = de_mean.reshape(2,1) # Convert 2-dimensional vector to (2x1) matrix
        cov = cov + np.dot(""" TODO """,""" TODO """.T) # (x_i - mean) (x_i - mean).T
    cov = cov / """ TODO """

    return mean, cov

<br/>

### Anomaly detection with Gaussian model [2/3]

- **Gaussian distribution**의 파라미터 ($\mu,\Sigma$)가 주어졌을 때, 테스트 데이터 $x'$의 **likelihood**는 다음과 같음. $$p(x';\mu,\Sigma)=\frac{1}{\sqrt{(2\pi)^{2}\det(\Sigma)}}\exp(-\frac{1}{2}(x'-\mu)^T\Sigma^{-1}(x'-\mu))$$

- 계산된 **likelihood** $p(x';\mu,\Sigma)$로 데이터 $x'$가 **outlier**인지 판단할 수 있음. 

In [None]:
def compute_likelihood_Gaussian(target, mean, covariance):    
    """Compute the likelihood of target given Gaussian distribution.

    Parameters
    ----------
    target: 2-dimensional array (single target data)
    mean: 
    covariance:
    threshhold:

    Returns
    -------
    is_anomaly: Bool
    """

    """ You can use function in package.
    multivariate_normal.pdf: https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.multivariate_normal.html
    (recomendation) anomaly_score = multivariate_normal.pdf(# TODO #)
    """ 
    
    # Compute likelihood using scipy module given mean and covariance
    likelihood = """ TODO """

    # (Optional) Compute likelihood directly given mean and covariance
    det_cov = np.linalg.det(""" TODO """) # determinant
    inv_cov = np.linalg.inv(""" TODO """) # inverse matrix
    exponent = -0.5 * np.dot(np.dot((""" TODO """).T, inv_cov), (""" TODO """))
    coeff = 1 / (np.sqrt((2 * np.pi) ** 2 * det_cov))
    likelihood = coeff * np.exp(exponent)

    return likelihood   

<br/>

### Anomaly detection with Gaussian model [3/3]

1. training data distribution을 표현하는 **Gaussian model의 파라미터**를 얻음.

2. **test data에 대한 likelihood**를 위의 모델로부터 계산함

3. 만약, 주어진 데이터의 **likelihood**가 threshhold $\epsilon$보다 작다면 $(p(x')<\epsilon )$, outlier로 판단함.

<br/>

> **anomaly_predict**: 각각의 test data가 anomaly data인가에 대한 prediction (boolean list)

In [None]:
# Obtain mean and covariance parameter through training data
mean, cov = """ TODO """
threshhold = 0.03

# Predict the label of test data (is anomaly?)
anomaly_predict = []

for i in range(0,M):
    likelihood = """ TODO """ # Compute the likelihood of test data point

    is_anomaly = likelihood < threshhold  # If likelihood is smaller than threshhold, is_anomaly should be True
    anomaly_predict.append(is_anomaly)

print('Prediciton:', anomaly_predict)

- 예측한 결과에 대한 performance를 F1 score 및 accuracy를 이용하여 측정함.

In [None]:
import sklearn.metrics

""" You can use functions in a package.
sklearn.metrics.accuracy: https://scikit-learn.org/stable/modules/generated/sklearn.metrics.accuracy_score.html#sklearn.metrics.accuracy_score
sklearn.metrics.F1: https://scikit-learn.org/stable/modules/generated/sklearn.metrics.f1_score.html#sklearn.metrics.f1_score
"""

accuracy = """ TODO """
F1 = """ TODO """
print('Accuracy: ', accuracy)
print('F1 score: ', F1)

<br/>

### Visualize results

- **'o'** indicates the prediciton is correct, whereas **'x'** indicates the prediction is wrong.

In [None]:
# Visualization
plt.figure(figsize=(5, 3))

# Plot decision boundary
x, y = np.meshgrid(np.linspace(1, 10, 100), np.linspace(0, 10.5, 100))
points = np.column_stack((x.ravel(), y.ravel()))
pdf_values = multivariate_normal.pdf(points, mean=mean, cov=cov)

# Plot the contour of the PDF
plt.contour(x, y, pdf_values.reshape(x.shape), levels=[threshhold], colors='green', linewidths=2, linestyles='dashed')

# Plot test data
markers = np.where(anomaly_predict==test_data_label, "o", "x").tolist()
for i in range(M):
    plt.scatter(test_data[i, 0], test_data[i, 1], c=('red' if test_data_label[i] else 'blue'), marker=markers[i])

plt.title('Data Visualization')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.show()

<br/><br/>
<br/><br/>
<br/><br/>

### Anomaly detection with Kernel Density Estimation [1/2]

**Data distribution을 각각의 training data에 대한 point-wise kernel을 이용하여 표현함.**

- Data point $x'$에 대하여 **kernel density estimation**의 **likelihood**는 다음과 같이 정의됨. $$p(x')=\frac{1}{N}\sum^{N}_{i} k(x_i,x'),$$ $\{x_1,\ldots, x_N\}$은 training data이며 $k(\cdot,\cdot)$는 kernel. 

- 각각의 kernel $k(x_i,x')$은 주어진 training data $x_i$를 mean으로 사용하는 Gaussian distribution을 기반으로 정의함. $$k(x_i,x')=\frac{1}{(2\pi \ell^2)}\exp(-\frac{1}{2 \ell^2}\|x-x_i \|^2)$$

In [None]:
def kernel_density_estimation(dataset, target, bandwidth = 1, threshhold=0.1):
    """Compute the likelihood of the target using training data based on kernel density estimation.

    Parameters
    ----------
    dataset: N x 2 array (training data)
    target: 2-dimensional array array (single target data)
    bandwidth: float

    Returns
    -------
    likelihood: float
    """
    

    def kernel(data, target, bandwidth):
        """Compute the kernel value of the target.

        Parameters
        ----------
        data: 2-dimensioanl array (single training data)
        target: 2-dimensional array (single target data)
        bandwidth: float

        Returns
        -------
        k: float
        """

        frac = 1/(2*np.pi*bandwidth)
        norm_sq = """ TODO """ # ||x - x_i||^2
        k = frac * """ TODO """ # hint: Use np.exp()
        return k


    likelihood = 0

    for i in range(N):
        k = """ TODO """
        likelihood = """ TODO """

    likelihood = likelihood / N

    return likelihood


<br/>

### Anomaly detection with Kernel Density Estimation [2/2]

1. **test data에 대한 likelihood**를 **kernel density estimation**으로부터 계산함

2. 만약, 주어진 데이터의 **likelihood**가 threshhold $\epsilon$보다 작다면 $(p(x')<\epsilon )$, outlier로 판단.

In [None]:
anomaly_predict = [] 
threshhold = 0.03

for i in range(0,M):
    likelihood = """ TODO """ # Compute the likelihood of test data point
    
    is_anomaly = likelihood < threshhold # If likelihood is smaller than threshhold, is_anomaly should be True
    anomaly_predict.append(is_anomaly)

print('Prediction: ', anomaly_predict)

In [None]:
accuracy = sklearn.metrics.accuracy_score(test_data_label, anomaly_predict)
F1 = sklearn.metrics.f1_score(test_data_label, anomaly_predict)
print('Accuracy: ', accuracy)
print('F1 score: ', F1)

<br/>

### Visualize results

- **'o'** indicates the prediciton is correct, whereas **'x'** indicates the prediction is wrong.

In [None]:
# Visualization
plt.figure(figsize=(5, 3))

# Plot decision boundary
x, y = np.meshgrid(np.linspace(1, 10, 30), np.linspace(0, 10.5, 30))
points = np.column_stack((x.ravel(), y.ravel()))
pdf_values = np.array([kernel_density_estimation(training_data, p) for p in points])

# Plot the contour of the PDF
plt.contour(x, y, pdf_values.reshape(x.shape), levels=[threshhold], colors='green', linewidths=2, linestyles='dashed')

# Plot test data
markers = np.where(anomaly_predict==test_data_label, "o", "x").tolist()
for i in range(M):
    plt.scatter(test_data[i, 0], test_data[i, 1], c=('red' if test_data_label[i] else 'blue'), marker=markers[i])


plt.title('Data Visualization')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.show()

<br/><br/>
<br/><br/>
<br/><br/>
<br/>

# 2. Anomaly detection with Autoencoder

<br/>

---

<br/>

## 2.1. Overview 

**Problem: Gaussian model 및 KDE와 같은 방법은 이미지 (e.g., 28 x 28 = 784 dimensional data)와 같은 **high-dimensional data**에 적용하기 어려움.**

- **High-dimensional image**를 **low dimension vector** (e.g., 2-dimensional vector)로 mapping하는 **Autoencoder**를 이용하여 문제를 해결함.

- **Autoencoder**는 **neural network**로 구성된 **encoder**와 **decoder**로, 데이터를 **압축 및 복원**함.

<br/>
<br/>
<br/>

---

## 2.2. Method

### 2.2.1. Reconstruction error-based anomaly detection.

**비교적 간단한 방법. test data가 autoencoder에 얼마나 잘 일반화 되었는가를 이용함.**

1. **Autoencoder**를 **training data**에 대하여 **reconstruction error** (복원 오차)를 최소화 하도록 학습시킴.

2. 만약, 새로 주어진 테스트 데이터 $x'$에 대하여 **reconstruction error**가 일정 이상이면, outlier로 판단함. 

<br/>

<img src="./images/autoencoder_1.png" width="600px" title="" />

<br/><br/>
<br/><br/>
<br/><br/>

### 2.2.2. Pre-trained autoencoder-based anomaly detection.



1. 데이터에 대한 **pre-trained auto encoder**를 가지고 있다면, **high-dimensional image**를 **low dimension vector** (e.g., 2-dimensional vector) 로 mapping할 수 있음. 

3. 그러면, 이미지들이 mapping된 **low dimensional space**에서 **Gaussian model** 및 **KDE**를 적용하여 쉽게 **anomaly detection**을 수행할 수 있음.

<br/>

<img src="./images/autoencoder.png" width="600px" title="" />

<br/>
<br/>
<br/>

---

<br/>

## 2.3. Implementation

<br/>

### Install / Import library

In [None]:
!pip3 install torch torchvision torchaudio

In [None]:
import torch
import torchvision.datasets as dsets
import torchvision.transforms as transforms

<br/>

### Import dataset

- **training image는 1000개**이며, **test image는 100개** (50개는 normal data, 나머지 50개는 anomaly data). 

- 각각의 image는 $28\times 28=784$ **pixels**를 가졌음.

<br/>

> **N**: training data의 개수 (=1000).

> **M**: test data의 개수 (=100).

> **training_data:** 숫자 6에 대한 손글씨 데이터 ($N \times 784$)

> **test_image:** 숫자 6에 대한 손글씨 데이터 ($(M/2) \times 784$)와 이외의 숫자에 대한 손글씨 데이터 ($(M/2) \times 784$)


In [None]:
mnist = dsets.MNIST(root='MNIST_data/',
                          train=True,
                          transform=transforms.ToTensor(),
                          download=True)

N = 1000
M = 100

training_data = torch.cat([data for (data, label) in mnist if label == 6],dim = 0)[:N].view(-1,28*28)
test_data_normal = torch.cat([data for (data, label) in mnist if label == 6],dim = 0)[N:N+M//2].view(-1,28*28)
test_data_anomaly = torch.cat([data for (data, label) in mnist if label != 6],dim = 0)[:M//2].view(-1,28*28)

test_data = torch.cat([test_data_normal,test_data_anomaly], dim = 0) 
test_data_label = np.array([False]*(M//2)+[True]*(M//2)) # True indicates the anomaly data point

print('training_image.shape:', training_data.shape)
print('test_image.shape:', test_data.shape)

<br/>

### Visualize data

In [None]:
# Create a 5x5 subplot
plt.figure(figsize=(4, 4))
for i in range(16):
    plt.subplot(4, 4, i + 1)
    plt.imshow(test_data[i].numpy().reshape(28,28), cmap='gray')
    plt.axis('off')
print('Normal data samples:')
plt.show()

plt.figure(figsize=(4, 4))
for i in range(16):
    plt.subplot(4, 4, i + 1)
    plt.imshow(test_data[M//2+i].numpy().reshape(28,28), cmap='gray')
    plt.axis('off')
print('Anomaly data samples:')
plt.show()

<br/><br/>
<br/><br/>
<br/><br/>

### Reconstruction error-based anomaly detection [1/3]

- 784차원의 데이터를 **encoder**를 이용하여 2차원의 **low dimensional space로 mapping**하고, **decoder**를 이용하여 복원하는 **autoencoder**를 학습함.

- MLP를 기반으로 **encoder**와 **decoder**를 설계함.


In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Autoencoder Model
class Autoencoder(torch.nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()

        """First, we simply define the encoder and decoder archiectures.
        """

        # Encoder: Map 784-dimensional image to 2-dimensional vector
        self.encoder = torch.nn.Sequential(
            torch.nn.Linear(28 * 28, 128), 
            torch.nn.ReLU(),            
            torch.nn.Linear(128, 128), 
            torch.nn.ReLU(),
            torch.nn.Linear(128, 2),
        )

        # Decoder: reconstruct 784-dimensional image from 2-dimensional vector
        self.decoder = torch.nn.Sequential(
            torch.nn.Linear(2, 128), 
            torch.nn.ReLU(),
            torch.nn.Linear(128, 128), 
            torch.nn.ReLU(),
            torch.nn.Linear(128, 28 * 28),
            torch.nn.Sigmoid()
        )
        
    def forward(self, x):
        encoded = self.encoder(x) # encoding
        decoded = self.decoder(encoded) # deconding
        return decoded

# Instantiate the autoencoder
autoencoder = Autoencoder().to(device)
optimizer = torch.optim.Adam(autoencoder.parameters(), lr=1e-2)

<br/>

### Reconstruction error-based anomaly detection [2/3]

- 위의 autoencoder를 training data에 대하여 **reconstruction error** $$\text{min}\frac{1}{N}\sum^N_{i=1}\|x_i-D(E(x_i)) \|^2$$를 최소화 하도록 학습시킴. $E(\cdot)$은 인코더이며, $D(\cdot)$은 디코더임.

In [None]:
def reconstruction_error(data,reconstructed_data):
    """Compute the reconstruction error.

    Parameters
    ----------
    dataset: original data (Nx784 torch matrix)
    reconstructed_data: reconstructed data (Nx784 torch matrix)

    Returns
    -------
    error: float
    """

    """ You can use functions in a package.
    torch.sum(): https://pytorch.org/docs/stable/generated/torch.sum.html
    torch.mean(): https://pytorch.org/docs/stable/generated/torch.sum.html
    """
    
    error = torch.mean(torch.sum(""" TODO """), dim = 0) 

    return error


# (Optional) Use package
""" You can use function in package: https://pytorch.org/docs/stable/generated/torch.nn.MSELoss.html
reconstruction_error = # TODO #
""" 

# Training loop
num_epoch = 5000
training_data = training_data.to(device)

for i in range(num_epoch):
    # Forward pass
    reconstructed_data = autoencoder(training_data)
    loss = reconstruction_error(reconstructed_data, training_data)
    
    # Backpropagation and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    if (i+1) % 100 == 0:
        print(f"\r Epoch [{i+1}/{num_epoch}], Loss: {loss.item():.4f}", end = ' ')

last_loss = loss.item()
print('Done!')

<br/>

### Reconstruction error-based anomaly detection [3/3]

- 테스트 데이터에 대한 **reconstruction error**가 일정 이상으로 크다면, **outlier**로 판단함. 

In [None]:
anomaly_predict = [] 
anomaly_score = [] 
threshhold = 3*last_loss

for i in range(0,M):
    error = reconstruction_error(test_data[i].view(1,-1),autoencoder(test_data[i].view(1,-1))).item() # Compute the likelihood of test data point
    is_anomaly = threshhold < error  # If error is bigger than threshhold, is_anomaly should be True
    
    anomaly_score.append(error)
    anomaly_predict.append(is_anomaly)

print('Prediction: ', anomaly_predict)

In [None]:
accuracy = sklearn.metrics.accuracy_score(test_data_label, anomaly_predict)
F1 = sklearn.metrics.f1_score(test_data_label, anomaly_predict)
print('Accuracy: ', accuracy)
print('F1 score: ', F1)

<br/>

### Visualization

In [None]:
# Create a 5x5 subplot
plt.figure(figsize=(6, 6))
for i in range(16):
    plt.subplot(4, 4, i + 1)
    plt.imshow(test_data[i].numpy().reshape(28,28), cmap='gray')
    plt.title('error: '+str(np.round(anomaly_score[i],3)))
    plt.axis('off')
print('Normal data samples:')
plt.tight_layout()
plt.show()

plt.figure(figsize=(6, 6))
for i in range(16):
    plt.subplot(4, 4, i + 1)
    plt.imshow(test_data[M//2+i].numpy().reshape(28,28), cmap='gray')
    plt.title('error: '+str(np.round(anomaly_score[M//2+i],3)))
    plt.axis('off')
print('Anomaly data samples:')
plt.tight_layout()
plt.show()

<br/><br/>
<br/><br/>
<br/><br/>

### Pre-trained autoencoder-based anomaly detection [1/3]

- 784차원의 데이터를 2차원의 **low dimensional space로 mapping**하는 **pre-trained autoencoder**를 이용함.

- 그러면, **low dimensional space**에서 Gaussian model 및 KDE 방법 적용 가능.

<br/>
<br/>

**하지만, pre-trained autoencoder가 사전에 존재해야함.**

- 이를 만족 시키기 위해, 임시로 **pre-trained autoencoder**를 좀 더 큰 데이터셋에서 따로 학습시킴.

> **training_data_for_pretraining:** pre-trained deep learning model (autoencoder)를 학습하기 위한 데이터셋 ($5000 \times 2$ torch matrix).

In [None]:
training_data_for_pretraining = torch.cat([data for (data, _) in mnist],dim = 0)[:5000].view(-1,28*28).to(device)

# Instantiate the autoencoder
pre_trained_autoencoder = Autoencoder().to(device)
optimizer = torch.optim.Adam(pre_trained_autoencoder.parameters(), lr=1e-2)

In [None]:
# pre-training loop
num_epoch = 5000
training_data_for_AE = pre_trained_autoencoder.to(device)

for i in range(num_epoch):
    # Forward pass
    reconstructed_data = pre_trained_autoencoder(training_data_for_pretraining)
    loss = reconstruction_error(reconstructed_data, training_data_for_pretraining)
    
    # Backpropagation and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    if (i+1) % 100 == 0:
        print(f"\r Epoch [{i+1}/{num_epoch}], Loss: {loss.item():.4f}", end = ' ')

print('Done!')

<br/>

### Pre-trained autoencoder-based anomaly detection [2/3]

- **Pre-trained autoencoder**를 이용하여, **data**를 **low-dimensional space**에 mapping함.

In [None]:
training_data = pre_trained_autoencoder.encoder(training_data.to(device)).cpu().detach().numpy()
test_data = pre_trained_autoencoder.encoder(test_data.to(device)).cpu().detach().numpy()


# Visualization
plt.figure(figsize=(5, 3))

# Plot training data in green
plt.scatter(training_data[:, 0], training_data[:, 1], s = 20,c='green', label='Training Data')
# Plot test data
plt.scatter(test_data[:, 0], test_data[:, 1], c=[('red' if v else 'blue') for v in test_data_label])

plt.title('Data Visualization')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.show()

<br/>


### Pre-trained autoencoder-based anomaly detection [3/3]

- **Low-dimensional space**에서 **Kernel density estimation**를 사용하여 anomaly detection 수행 (**Gaussian model**을 사용해도 됨).

In [None]:
anomaly_predict = []
threshhold = 0.05

for i in range(0,M):
    likelihood = """ TODO """
    is_anomaly = likelihood < threshhold
    anomaly_predict.append(is_anomaly)

print('Prediction: ', anomaly_predict)

In [None]:
accuracy = sklearn.metrics.accuracy_score(test_data_label, anomaly_predict)
F1 = sklearn.metrics.f1_score(test_data_label, anomaly_predict)
print('Accuracy: ', accuracy)
print('F1 score: ', F1)

<br/>

### Visualize results

- **'o'** indicates the prediciton is correct, whereas **'x'** indicates the prediction is wrong.

In [None]:
# Visualization
plt.figure(figsize=(5, 3))

# Plot decision boundary
x, y = np.meshgrid(np.linspace(np.min(test_data[:,0]), np.max(test_data[:,0]), 30), np.linspace(np.min(test_data[:,1]), np.max(test_data[:,1]), 30))
points = np.column_stack((x.ravel(), y.ravel()))
pdf_values = np.array([kernel_density_estimation(training_data, p) for p in points])

# Plot the contour of the PDF
plt.contour(x, y, pdf_values.reshape(x.shape), levels=[threshhold], colors='green', linewidths=2, linestyles='dashed')

# Plot test data
for i in range(M):
    plt.scatter(test_data[i, 0], test_data[i, 1], c=('red' if test_data_label[i] else 'blue'), 
                marker='o' if anomaly_predict[i]==test_data_label[i] else 'x')


plt.title('Data Visualization')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.show()

<br/><br/>
<br/><br/>
<br/><br/>
<br/>

# 3. Anomaly detection with likelihood ratio

<br/>

---

<br/>

## 3.1. Overview

**Problem: 데이터에 대한 likelihood score의 대부분은 background statistics에 큰 영향을 받음.**

- 예를 들어 아래와 같은 이미지의 경우, background인 zero pixel이 대부분의 statistics을 차지함.

<img src="./images/anomaly_motivation.png" width="600px" title="" />

<br/>

- 그러면, 충분한 zero pixel을 포함하는 **anomaly data**와 **normal data**의 **likelihood 차이가 적을 것**. 즉, anomaly detection이 어려움.

<br/>

**Likelihood ratio**는 **background information을 anomaly detection에서 무시**하기 위해 제안된 비교적 최근 프레임 워크.

<br/>
<br/>
<br/>

---

<br/>

## 3.2. Method

**Likelihood ratio는 다음과 같은 가정을 기반으로함.**

- **(Assumption 1)** 각각의 데이터 $x$는 **semantic component**인 $x_S$와 **background componen**t인 $x_B$로 분해될 수 있음 ($p(x)=p(x_B)p(x_S)$).

- **(Assumption 2)** 데이터에 **noise perturbation을 추가**해도, **background component**에 대한 **statistics**는 크게 변하지 않음.  

<br/><br/>
<br/>

**위와 같은 가정에서, likelihood ratio는 다음과 같은 프레임워크를 가짐.**

- **Ingredient: Noise perturbed data**에 대한 분포를 나타내는 $\tilde{p}({x})$와 기존의 **training data**에 대한 분포를 나타내는 $p(x)$.

- **Likelihood ratio**은 주어진 data $x'$에 대하여, 두 분포에 대한 **likelihood ratio**를 다음과 같이 정의함.

$$\text{LLR}(x')=\frac{p(x')}{\tilde{p}(x')}=\frac{p({x'}_B)p({x'}_S)}{\tilde{p}({x'}_B)\tilde{p}({x'}_S)}\approx\frac{p({x'}_S)}{\tilde{p}({x'}_S)},\qquad \text{(Assumption 2) } \tilde{p}({x'}_B)\approx p({x'}_B)$$

- 여기서,  $\text{LLR}(x')$는 **semantic component**만을 기반으로 **likelihood score**를 제공함.



<br/>
<br/>
<br/>

---

<br/>

## 3.3. Implementation

1. **Noise perturbed data 대한 분포를 나타내는 $\tilde{p}({x})$와 기존의 training data에 대한 분포를 나타내는 $p(x)$를 학습.** 

2. 테스트 데이터에 대해 **$\text{LLR}(x')={p(x')}/{\tilde{p}(x')}$를 계산하면, background statistics는 무시하고 anomaly detection 수행 가능.**


<br/><br/><br/>

### Define perturbed data

- 우선, **pertured data distribution** $\tilde{p}(x)$를 학습하기 위한 **perturbed dataset** $\{\tilde{x}_1,\ldots,\tilde{x}_N\}$을 정의함. 이는 단순히 training data에 **random noise**를 추가하여 얻을 수 있음.

> **perturbed_training_data**: Noise perturbation이 추가된 perturebed dataset ($N \times 2$)

In [None]:
""" You can sample noise from various distribution.
normal distribution: https://numpy.org/doc/stable/reference/random/generated/numpy.random.uniform.html
uniform distribution: https://numpy.org/doc/stable/reference/random/generated/numpy.random.normal.html
(recomendation) np.random.normal(# TODO #)
""" 

perturbed_training_data =  """ TODO """ # add perturbation

<br/><br/>
<br/><br/>
<br/><br/>


### Likelihood ratio-based Anomaly detection with Gaussian Model [1/2]

- **Perturbed data distribution**인 $\tilde{p}(x)$와, **normal training data distribution**인 $p(x)$를 각각 학습해야함.

- **Gaussian model**을 이용하여 각각의 **data distribution**을 표현할 수 있음. $$p(x)=p(x;\mu,\Sigma),\qquad \tilde{p}(x)=p(x;\tilde{\mu},\tilde{\Sigma})$$ $\mu,\Sigma$는 normal training data $\{x_1,\ldots, x_N\}$에 대한 분포를 표현하며, $\tilde{\mu},\tilde{\Sigma}$는 perturbed data $\{\tilde{x}_1,\ldots, \tilde{x}_N\}$에 대한 분포를 표현함. 


In [None]:
""" You can use fit_Gaussian_parameter() function.
mean, cov: mean and covariance parameters of normal training data distribution.
perturbed_mean, perturbed_cov: mean and covariance parameters of perturbed training data distribution.
""" 

# The parameters of Gaussian model trained on training data
mean, cov = """ TODO """

# The parameters of Gaussian model trained on perturbed data
perturbed_mean, perturbed_cov = """ TODO """

<br/>

### Likelihood ratio-based Anomaly detection with Gaussian Model [2/2]

- 각각 학습된 모델을 기반으로, $\text{LLR}(x')={p(x')}/{\tilde{p}(x')}$를 계산할 수 있음.

In [None]:
anomaly_predict = []
threshhold = 1.5

for i in range(M):

    # Compute LLR score
    p_x = """ TODO """
    p_perturbed_x = """ TODO """
    LLR_x = """ TODO """

    # Is anomaly?
    is_anomaly = LLR_x < threshhold
    anomaly_predict.append(is_anomaly)

print('Prediction: ', anomaly_predict)

In [None]:
accuracy = sklearn.metrics.accuracy_score(test_data_label, anomaly_predict)
F1 = sklearn.metrics.f1_score(test_data_label, anomaly_predict)
print('Accuracy: ', accuracy)
print('F1 score: ', F1)

<br/>

### Visualize results

- **'o'** indicates the prediciton is correct, whereas **'x'** indicates the prediction is wrong.

In [None]:
# Visualization

plt.figure(figsize=(5, 3))

# Plot decision boundary
x, y = np.meshgrid(np.linspace(np.min(test_data[:,0]), np.max(test_data[:,0]), 30), np.linspace(np.min(test_data[:,1]), np.max(test_data[:,1]), 30))
points = np.column_stack((x.ravel(), y.ravel()))
pdf_values = np.array([compute_likelihood_Gaussian(p, mean, cov)/compute_likelihood_Gaussian(p, perturbed_mean, perturbed_cov)
                       for p in points])

# Plot the contour of the PDF
plt.contour(x, y, pdf_values.reshape(x.shape), levels=[threshhold], colors='green', linewidths=2, linestyles='dashed')

# Plot test data
markers = np.where(anomaly_predict==test_data_label, "o", "x").tolist()
for i in range(M):
    plt.scatter(test_data[i, 0], test_data[i, 1], c=('red' if test_data_label[i] else 'blue'), 
                marker='o' if anomaly_predict[i]==test_data_label[i] else 'x')


plt.title('Data Visualization')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.show()

<br/><br/>
<br/><br/>
<br/><br/>
<br/><br/>

### Likelihood ratio with Autoregressive model

우리는 $p(x)$를 표현하기 위해 간단한 모델을 사용했음. 하지만, **sequence**와 같은 data $x=\{x^{(1)},\ldots,x^{(L)}\}$에 대하여, $p(x)$를 좀 더 복잡하게 복잡하게 모델링 해야할 수 있음.

- **Sequence data** $x$에 대한 **likelihood ratio** 기반의 anomaly detection을 수행함.

- 좀 더 복잡한 **autoregressive model** $p(x)=p(x^{(1)})p(x^{(2)}|x^{(1)})\cdots p(x_L|x^{(1)},\ldots,x^{(L-1)})$를 이용하여 data distribution을 표현함.


<br/>

### Likelihood ratio with Autoregressive model [1/4]

- 우선, A부터 D로 구성된 **sequence** 형태의 data $x$를 정의함.

- **Normal data**와 **abnormal data**는 sequence에서 다른 패턴을 가짐.

In [None]:
# Generate a synthetic categorical sequence dataset with dependencies
def generate_normal_categorical_sequence(length=10):
    
    # Define the transition probabilities
    # If current token is 'A', the next token will be 'A' (with p = 0.7) or 'B' (with p = 0.3).
    # e.g., AAABBCCCCDABBBB...
    transition_probabilities = {
        'A': {'A': 0.7, 'B': 0.3, 'C': 0.0, 'D': 0.0},
        'B': {'A': 0.0, 'B': 0.7, 'C': 0.3, 'D': 0.0},
        'C': {'A': 0.0, 'B': 0.0, 'C': 0.7, 'D': 0.3},
        'D': {'A': 0.3, 'B': 0.0, 'C': 0.0, 'D': 0.7}
    }

    categories = ['A', 'B', 'C', 'D']
    sequence = []
    current_category = np.random.choice(categories)  # Start with a random category
    sequence.append(current_category)

    for _ in range(length - 1):
        next_category = np.random.choice(categories, p=[transition_probabilities[current_category][c] for c in categories])
        sequence.append(next_category)
        current_category = next_category

    return sequence



# Generate a synthetic categorical sequence dataset with dependencies
def generate_abnormal_categorical_sequence(length=10):
    
    # Define the transition probabilities
    # e.g., AAAADDCBBADDD....
    transition_probabilities = {
        'A': {'A': 0.7, 'B': 0.1, 'C': 0.1, 'D': 0.1},
        'B': {'A': 0.1, 'B': 0.7, 'C': 0.1, 'D': 0.1},
        'C': {'A': 0.1, 'B': 0.1, 'C': 0.7, 'D': 0.1},
        'D': {'A': 0.1, 'B': 0.1, 'C': 0.1, 'D': 0.7}
    }

    categories = ['A', 'B', 'C', 'D']
    sequence = []
    current_category = np.random.choice(categories)  # Start with a random category
    sequence.append(current_category)

    for _ in range(length - 1):
        next_category = np.random.choice(categories, p=[transition_probabilities[current_category][c] for c in categories])
        sequence.append(next_category)
        current_category = next_category

    return sequence



print('Example of normal x: ', generate_normal_categorical_sequence(50))
print('\nExample of abnormal x: ', generate_abnormal_categorical_sequence(50))

<br/><br/>
<br/><br/>
위의 데이터를, **torch**가 다룰 수 있는 형태의 **dataset**으로 정의함.

- **50 length를 가진 1000개의 sequence**를 **training dataset**으로 정의하며, **50 length를 가진 10개의 sequence를 test dataset**으로 정의함.

- 각각의 alphabet에 one hot operation을 적용하여 4-dimensional vector로 만듬.

<br/>

> **training_data**: ($1000 \times 50 \times 4$)의 torch tensor

> **test_data**: ($10 \times 50 \times 4$)의 torch tensor (1~5은 normal, 5~10은 outlier)

In [None]:
# Define a dictionary to map categories to one-hot vectors
category_to_one_hot = {'A': [1, 0, 0, 0], 'B': [0, 1, 0, 0], 'C': [0, 0, 1, 0], 'D': [0, 0, 0, 1]}



# Generate a training data
training_data = []
N = 1000

for _ in range(N):
    sequence = generate_normal_categorical_sequence(length=50)
    training_data.append(sequence)

# Convert the sequences to one-hot encoded vectors
training_data = torch.tensor([
    [category_to_one_hot[category] for category in sequence]
    for sequence in training_data
]).to(device)



# Generate a test data
test_data_orig = []
M = 10

for _ in range(M//2):
    sequence = generate_normal_categorical_sequence(length=50)
    test_data_orig.append(sequence)
for _ in range(M//2):
    sequence = generate_abnormal_categorical_sequence(length=50)
    test_data_orig.append(sequence)

# Convert the sequences to one-hot encoded vectors
test_data = torch.tensor([
    [category_to_one_hot[category] for category in sequence]
    for sequence in test_data_orig
]).to(device)
test_data_label = np.array([False]*(M//2)+[True]*(M//2)) # True indicates the anomaly data point

print('training_data.shape: ',training_data.shape)
print('test_data.shape: ',test_data.shape)

<br/>

### Likelihood ratio with Autoregressive model [2/4]

- **LSTM** 기반의 **autoregressive** 모델 $p(x)=p(x^{(1)})p(x^{(2)}|x^{(1)})\cdots p(x_L|x^{(1)},\ldots,x^{(L-1)})$ 을 정의 및 훈련함.

In [None]:
# Create and train the LSTM model for generative modeling with log softmax
class LSTMGenerativeModel(torch.nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(LSTMGenerativeModel, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = torch.nn.Linear(input_size, hidden_size)
        self.lstm = torch.nn.LSTM(hidden_size, hidden_size, batch_first = True)
        self.fc = torch.nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden):
        x = self.embedding(x)
        out, hidden = self.lstm(x, hidden)
        out = self.fc(out)
        out = torch.nn.functional.log_softmax(out, dim = -1)
        return out, hidden
    
# Define hyperparameters
input_size = 4  # Number of categories (A, B, C, D)
hidden_size = 64  # Size of the LSTM hidden state
output_size = 4  # Number of categories (A, B, C, D)
learning_rate = 0.003

# Initialize LSTMGenerativeModel
model = LSTMGenerativeModel(input_size, hidden_size, output_size)
model.to(device)

In [None]:
import torch.nn as nn
import torch.optim as optim

# Define loss function and optimizer
criterion = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)



num_epochs = 200

# Training loop
for epoch in range(num_epochs):
    # Initialize hidden state of LSTM for the entire batch
    hidden = (torch.zeros(1, 1000, hidden_size).to(device), torch.zeros(1, 1000, hidden_size).to(device))
    
    optimizer.zero_grad()
    loss = 0
    for time_step in range(49):
        batch_input = training_data[:, time_step, :].unsqueeze(1).float()  # Shape: (batch_size, 1, input_size)

        # Forward pass through the model
        output, hidden = model(batch_input, hidden)
        batch_target = training_data[:, time_step+1, :].argmax(dim=-1)  # Shape: (batch_size)

        # Calculate loss for this time step and accumulate
        loss += criterion(output.squeeze(1), batch_target)

    # Backward pass and optimization
    loss.backward()
    optimizer.step()

    # Print the loss at the end of each epoch
    print(f'\r Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}', end = ' ')


<br/>

### Likelihood ratio with Autoregressive model [3/4]

- **Autoregressive** 모델을 **perturbed data**에 대하여 훈련함 (perturbed data distribution $\tilde{p}(x)$를 훈련).

- 우선, **perturbed training data** 부터 정의함.


<br/>

> **perturbed training data**: training data에 random perturbation을 가함 (one-hot index의 location을 랜덤으로 변경)

In [None]:
import random
# Define perturbed data

def introduce_perturbations(tensor, perturbation_probability=0.1):
    perturbed_tensor = tensor.clone()  # Create a copy of the input tensor
    
    # Iterate through the tensor and introduce perturbations
    for sequence in perturbed_tensor:
        for t in range(sequence.size(0)):  # Assuming the first dimension is the time steps
            if random.random() < perturbation_probability:
                # Find the current location of '1' in the sequence
                current_location = torch.argmax(sequence[t])
                
                # Generate a new random location for '1' that is different from the current location
                new_location = current_location
                while new_location == current_location:
                    new_location = random.randint(0, sequence.size(1) - 1)  # Assuming the second dimension is the one-hot encoding
                
                # Move '1' to the new location
                sequence[t][current_location] = 0
                sequence[t][new_location] = 1
    
    return perturbed_tensor



# Introduce perturbations into your training_data tensor
perturbed_training_data = introduce_perturbations(training_data, perturbation_probability=0.2)

In [None]:
# Initialize LSTMGenerativeModel
perturbed_model = LSTMGenerativeModel(input_size, hidden_size, output_size)
perturbed_model.to(device)

# Define loss function and optimizer
criterion = nn.NLLLoss()
optimizer = optim.Adam(perturbed_model.parameters(), lr=learning_rate)



num_epochs = 200

# Training loop
for epoch in range(num_epochs):
    # Initialize hidden state for the entire batch
    hidden = (torch.zeros(1, 1000, hidden_size).to(device), torch.zeros(1, 1000, hidden_size).to(device))
    
    optimizer.zero_grad()
    loss = 0
    for time_step in range(49):
        batch_input = perturbed_training_data[:, time_step, :].unsqueeze(1).float()  # Shape: (batch_size, 1, input_size)

        # Forward pass through the model
        output, hidden = perturbed_model(batch_input, hidden)
        batch_target = perturbed_training_data[:, time_step+1, :].argmax(dim=-1)  # Shape: (batch_size)

        # Calculate loss for this time step and accumulate
        loss += criterion(output.squeeze(1), batch_target)

    # Backward pass and optimization
    loss.backward()
    optimizer.step()

    # Print the loss at the end of each epoch
    print(f'\r Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}', end = ' ')


<br/>

### Likelihood ratio with Autoregressive model [4/4]

- **perturbed model**과 **model**을 기반으로, **test data**에 대한 **likelihood ratio**를 계산함.

In [None]:
# Iterate through each sequence in the test data
for i, sequence in enumerate(test_data):
    # Initialize hidden state for the sequence
    hidden = (torch.zeros(1, 1, hidden_size).to(device), torch.zeros(1, 1, hidden_size).to(device))


    # Initialize likelihood
    sequence_likelihood = 0.0
    
    for time_step in range(0,49):
        category_vector = sequence[time_step].view(1, 1, -1).float()  # Reshape for LSTM
        output, hidden = model(category_vector, hidden)
        target = torch.argmax(sequence[time_step+1].view(1, 1, -1), dim=-1)
        sequence_likelihood += -criterion(output.squeeze(0), target.squeeze(0)).item()  # Use negative log-likelihood
        
        
        
    # Initialize likelihood for perturbed
    sequence_likelihood_perturbed = 0.0

    for time_step in range(0,49):
        category_vector = sequence[time_step].view(1, 1, -1).float()  # Reshape for LSTM
        output, hidden = perturbed_model(category_vector, hidden)
        target = torch.argmax(sequence[time_step+1].view(1, 1, -1), dim=-1)
        sequence_likelihood_perturbed += -criterion(output.squeeze(0), target.squeeze(0)).item()  # Use negative log-likelihood
        
        
    LLR = np.exp(sequence_likelihood)/np.exp(sequence_likelihood_perturbed)
    
    print(''.join(test_data_orig[i]), ' label:','outlier' if test_data_label[i] else 'normal', ' LLR:', LLR)