#Anomaly detection using Autoencoder (PyTorch)

## Install requirements

In [None]:
%pip install tsne-torch

In [None]:
! pip install kaggle

## Import libraries

In [78]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import os
import re
from PIL import Image
from sklearn.manifold import TSNE
import numpy as np


## Load data

In [None]:
! mkdir ~/.kaggle

In [5]:
cp kaggle.json ~/.kaggle/

In [6]:
! chmod 600 ~/.kaggle/kaggle.json

In [None]:
! kaggle datasets download vipoooool/new-plant-diseases-dataset

In [None]:
! unzip new-plant-diseases-dataset.zip

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

## ِ Visualize data

In [None]:
img = Image.open('/Users/nguyenphan/Developer/Leaf-Anomaly-Detection/grape-only-dataset/train/Grape___Black_rot/0b9d95bb-51c7-40f1-8a4b-f9838becb418___FAM_B.Rot 0493.JPG')
img

In [None]:
import cv2
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt

# Đọc ảnh gốc
img = Image.open('/Users/nguyenphan/Developer/Leaf-Anomaly-Detection/grape-only-dataset/train/Grape___Esca_(Black_Measles)/eb4b029a-c930-444b-ae7f-337085696357___FAM_B.Msls 3962.JPG')

# Hàm CLAHE
def apply_clahe(img_pil):
    img = np.array(img_pil)
    img_yuv = cv2.cvtColor(img, cv2.COLOR_RGB2YUV)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
    img_yuv[:,:,0] = clahe.apply(img_yuv[:,:,0])
    img_clahe = cv2.cvtColor(img_yuv, cv2.COLOR_YUV2RGB)
    return Image.fromarray(img_clahe)

# Hàm Gamma Correction
def apply_gamma(img_pil, gamma=1.5):
    img = np.array(img_pil) / 255
    img_gamma = np.power(img, gamma)
    img_gamma = np.uint8(img_gamma * 255)
    return Image.fromarray(img_gamma)

# Áp dụng CLAHE và Gamma Correction
img_clahe = apply_clahe(img)
img_gamma = apply_gamma(img_clahe, gamma=1.5)

# Hiển thị ảnh before & after
plt.figure(figsize=(12,6))
plt.subplot(1,2,1)
plt.title('Before (Original)')
plt.imshow(img)
plt.axis('off')

plt.subplot(1,2,2)
plt.title('After (CLAHE + Gamma)')
plt.imshow(img_gamma)
plt.axis('off')

plt.show()

## Building train set

In [127]:
# train_path = "/Users/nguyenphan/Developer/Leaf-Anomaly-Detection/grape-only-dataset/train"

train_path = "/Users/nguyenphan/Developer/Leaf-Anomaly-Detection/new-plant-diseases-dataset/New Plant Diseases Dataset(Augmented)/New Plant Diseases Dataset(Augmented)/train"

In [128]:
def get_label(name):
  label = 0
  if bool(re.match('Grape___E.+', name)):
    label = 1
  elif bool(re.match('Grape___L.+', name)):
    label = 2
  elif bool(re.match('Grape___h.+', name)):
    label = 3
  return label

In [None]:
convert_tensor = transforms.ToTensor()
train_data = []
for root, dirs, files in os.walk(train_path):
  class_name = root.split('/')[-1]
  if bool(re.match('Grape.+', class_name)):
    for file in files:
      path = os.path.join(root, file)
      img = Image.open(path)
      tensor_img = convert_tensor(img)
      tensor_img = tensor_img.to(device)
      label = get_label(class_name)
      train_data.append([tensor_img, label])

print(len(train_data))

In [None]:
train_data[0][0].is_cuda

In [132]:
train_loader = torch.utils.data.DataLoader(
    dataset= train_data,
    batch_size=16
)

In [86]:
# from torch.utils.data import Dataset, DataLoader

# class LeafDataset(Dataset):
#     def __init__(self, data):
#         self.data = data

#     def __len__(self):
#         return len(self.data)

#     def __getitem__(self, idx):
#         return self.data[idx][0], self.data[idx][1]

# # Tạo dataset và dataloader
# train_dataset = LeafDataset(train_data)
# train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

In [None]:
# train_iter = iter(train_loader)
# train_images, train_labels = train_iter.next()
# print(train_images.shape, '  ', train_labels.shape)

In [None]:
# print(torch.min(train_images), torch.max(train_images))

## Building Model

In [133]:
class Autoencoder(nn.Module):
  def __init__(self):
    super().__init__()
    self.encoder = nn.Sequential(
        nn.Conv2d(3, 16, kernel_size=7, stride=2, padding=1),
        nn.ReLU(),
        nn.Conv2d(16, 32, kernel_size=7, stride=2, padding=1),
        nn.ReLU(),
        nn.Conv2d(32, 64, kernel_size=7, stride=2, padding=1),
        nn.ReLU(),
        nn.Conv2d(64, 128, kernel_size=5)
    )

    self.decoder = nn.Sequential(
        nn.ConvTranspose2d(128, 64, kernel_size=5),
        nn.ReLU(),
        nn.ConvTranspose2d(64, 32, kernel_size=7, stride=2, padding=1),
        nn.ReLU(),
        nn.ConvTranspose2d(32, 16, kernel_size=7, stride=2, padding=1, output_padding=1),
        nn.ReLU(),
        nn.ConvTranspose2d(16, 3, kernel_size=7, stride=2, padding=1, output_padding=1),
        nn.Sigmoid() # As we saw that the input tensors are between 0 and 1 so we should use an activation function to map our values to that range.
    )
    
  def forward(self, x):
    encoded = self.encoder(x)
    decoded = self.decoder(encoded)
    return decoded


In [124]:
# import torch
# import torch.nn as nn
# import torch.nn.functional as F
# from pytorch_msssim import SSIM

# class AnomalyLoss(nn.Module):
#     def __init__(self, alpha=0.5, beta=0.3, gamma=0.2):
#         super().__init__()
#         self.alpha = alpha  # Hệ số cho MSE
#         self.beta = beta    # Hệ số cho SSIM
#         self.gamma = gamma  # Hệ số cho L1 Loss
#         self.ssim = SSIM(data_range=1.0, size_average=True, channel=3)

#     def forward(self, y_pred, y_true):
#         # MSE Loss
#         mse = F.mse_loss(y_pred, y_true)

#         # SSIM Loss (1 - SSIM vì SSIM càng cao càng tốt)
#         ssim = 1 - self.ssim(y_pred, y_true)

#         # L1 Loss
#         l1 = F.l1_loss(y_pred, y_true)

#         # Kết hợp các loss
#         total_loss = self.alpha * mse + self.beta * ssim + self.gamma * l1

#         return total_loss

In [None]:
from piq import ssim
from torchvision.models import vgg16

class AnomalyLoss2(nn.Module):
    def __init__(self, alpha=0.5, beta=0.3, gamma=0.2):
        super().__init__()
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma
        self.contrastive_loss = nn.CosineEmbeddingLoss()

    def forward(self, y_pred, y_true, labels):
        ssim_loss = 1 - ssim(y_pred, y_true, data_range=1.0)
        
        perceptual_loss = self.perceptual_loss(y_pred, y_true)
        
        #kl_divergence = self.kl_divergence(y_pred, y_true)
        
        contrastive_loss = self.contrastive_loss(y_pred, y_true, labels)
        
        total_loss = self.alpha * ssim_loss + self.beta * perceptual_loss + contrastive_loss
        
        return total_loss
    
    def perceptual_loss(self, y_pred, y_true):
        # Trích xuất đặc trưng từ VGG16
        y_pred_features = self.vgg(y_pred)
        y_true_features = self.vgg(y_true)
        # Tính toán MSE giữa các đặc trưng
        return F.mse_loss(y_pred_features, y_true_features)

    def kl_divergence(self, y_pred, y_true):
        # Tính toán KL Divergence
        y_pred_log_softmax = F.log_softmax(y_pred, dim=1)
        y_true_softmax = F.softmax(y_true, dim=1)
        return F.kl_div(y_pred_log_softmax, y_true_softmax, reduction='batchmean')

In [125]:
import torch
# import torch.nn as nn
# import torch.nn.functional as F

# class CustomLoss(nn.Module):
#     def forward(self, y_pred, y_true):
#         mse = F.mse_loss(y_pred, y_true)
#         bce = F.binary_cross_entropy(y_pred, y_true)
#         return mse + bce
    
# Sử dụng loss function
criterion = AnomalyLoss2().to(device)
model = Autoencoder().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-5)

In [None]:
#print(img.min().item(), img.max().item())   # nên thấy 0.0  1.0


In [None]:
# State_dict của model là một Python dict, với key là tên của layer và value là parameter của layer đó, 
# bao gồm weight và bias. Bên cạnh model, optimizer (torch.optim) cũng có state_dict, có chứa những thông tin về optimizer’s state, 
# cũng như các hyperparameter đi cùng.

print("Model's state_dict:")
for param_tensor in model.state_dict():
    print(param_tensor, "\t", model.state_dict()[param_tensor].size())

print ("\n")

print("Optimizer's state_dict:")
for var in optimizer.state_dict():
    print(var, "\t", optimizer.state_dict()[var])


In [None]:
ckpt_path = '/Users/nguyenphan/Developer/Leaf-Anomaly-Detection/checkpoint.pth'
start_epoch = 0
if os.path.exists(ckpt_path):
    ckpt = torch.load(ckpt_path, map_location=device)
    model.load_state_dict(ckpt['model_state_dict'])
    optimizer.load_state_dict(ckpt['optimizer_state_dict'])
    start_epoch = ckpt['epoch'] + 1
    print(f'\nLoaded checkpoint at epoch {start_epoch}')

In [None]:
# lưu state_dict vào model 
torch.save(model.state_dict(), '/Users/nguyenphan/Developer/Leaf-Anomaly-Detection/checkpoint.pth')
# tải state_dict từ file
state_dict = torch.load('/Users/nguyenphan/Developer/Leaf-Anomaly-Detection/checkpoint.pth')
# áp dụng state_dict vào model
model.load_state_dict(state_dict)
# save all model
torch.save(model, '/Users/nguyenphan/Developer/Leaf-Anomaly-Detection/model.pth')
# load all model
model = torch.load('/Users/nguyenphan/Developer/Leaf-Anomaly-Detection/model.pth')


In [None]:
def train():
  for epoch in range(1000): # 1000 epochs
    for (img, _) in train_loader:
      recon = model(img)
      loss = criterion(recon, img)

      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

    print(f'Epoch:{epoch+1}, Loss:{loss.item():.4f}')
    # outputs.append((epoch, img, recon))
    torch.save({
      'epoch': epoch,
      'model_state_dict': model.state_dict(),
      'optimizer_state_dict': optimizer.state_dict(),
      'loss': loss,
    }, '/Users/nguyenphan/Developer/Leaf-Anomaly-Detection/checkpoint.pth')
      
  

In [None]:
#load checkpoint
checkpoint = torch.load('/Users/nguyenphan/Developer/Leaf-Anomaly-Detection/checkpoint.pth')
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
epoch = checkpoint['epoch']
loss = checkpoint['loss']

model.train()

## Building test set

In [94]:
test_path = "/Users/nguyenphan/Developer/Leaf-Anomaly-Detection/grape-only-dataset/test/test"

In [None]:
convert_tensor = transforms.ToTensor()
test_data = []
test_label = []
for root, dirs, files in os.walk(test_path):
    for file in files:
      path = os.path.join(root, file)
      img = Image.open(path)
      tensor_img = convert_tensor(img)
      tensor_img = tensor_img.to(device)
      test_data.append([tensor_img, 4])
      test_label.append('Anomaly')

print(len(test_data))

In [96]:
test_data = test_data[:28]
test_label = test_label[:28]

adding un-anomaly data to test set:

In [97]:
val_path = '/Users/nguyenphan/Developer/Leaf-Anomaly-Detection/grape-only-dataset/valid'

In [None]:
count = [0 for i in range(4)]

for root, dirs, files in os.walk(val_path):
  class_name = root.split('/')[-1]
  if bool(re.match('Grape.+', class_name)):
    for file in files:
      label = get_label(class_name)
      count[label] += 1
      if count[label] >= 18:
        break
      path = os.path.join(root, file)
      img = Image.open(path)
      tensor_img = convert_tensor(img)
      tensor_img = tensor_img.to(device)
      test_data.append([tensor_img, label])
      test_label.append(label)


print(len(test_data))

In [None]:
test_data[0][0].is_cuda

In [100]:
test_loader = torch.utils.data.DataLoader(
    dataset= test_data,
    batch_size=8,
    shuffle=False
)

## Test model

In [101]:
torch.cuda.empty_cache() 

In [None]:
encodes = []

for (img, _) in test_loader:
  code = model.encoder(img)
  encodes.append(torch.tensor(code, device = 'cpu'))
  
len(encodes)

In [None]:
[t.shape for t in encodes]

In [None]:
y = []
tsne = TSNE(perplexity=5)
for t in encodes:
  x = t.reshape(8, -1)
  embd = tsne.fit_transform(x)
  y.append(embd)

len(y)

In [None]:
y[7]

In [None]:
x = []
for i in range(len(y)):
  for j in y[i]:
    x.append(j)

len(x)

In [None]:
# ttt = np.array(x)
# list(map(lambda i : i[0], x))

In [None]:
import seaborn as sns

# sns.set(rc={'figure.figsize':(11.7,8.27)})
# sns.scatterplot(list(map(lambda i : i[0], x)), list(map(lambda i : i[1], x)), hue=test_label)

sns.set(rc={'figure.figsize':(11.7,8.27)})
sns.scatterplot(x = list(map(lambda i : i[1], x)), y = list(map(lambda i : i[0], x)), hue=test_label)