# **Homework 8 - Anomaly Detection**

If there are any questions, please contact mlta-2023-spring@googlegroups.com

Slide:    [Link](https://docs.google.com/presentation/d/18LkR8qulwSbi3SVoLl1XNNGjQQ_qczs_35lrJWOmHCk/edit?usp=sharing)　Kaggle: [Link](https://www.kaggle.com/t/c76950cc460140eba30a576ca7668d28)

# Set up the environment


## Package installation

In [None]:
# Training progress bar
!pip install -q qqdm

## Downloading data

In [None]:
!curl -s https://packagecloud.io/install/repositories/github/git-lfs/script.deb.sh |  bash
!apt-get install -y --allow-unauthenticated git-lfs

In [None]:
!git clone https://github.com/chiyuanhsiao/ml2023spring-hw8

In [None]:
%cd /kaggle/working/ml2023spring-hw8
!git lfs install
!git lfs pull

# Import packages

In [None]:
import random
import numpy as np
import torch
from torch import nn
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler, TensorDataset
import torchvision.transforms as transforms
import torch.nn.functional as F
from torch.autograd import Variable
import torchvision.models as models
from torch.optim import Adam, AdamW
from qqdm import qqdm, format_str
import pandas as pd
from torchinfo import summary

# Loading data

In [None]:
train = np.load('/kaggle/working/ml2023spring-hw8/trainingset.npy', allow_pickle=True)
test = np.load('/kaggle/working/ml2023spring-hw8/testingset.npy', allow_pickle=True)

print(train.shape)
print(test.shape)

## Random seed
Set the random seed to a certain value for reproducibility.

In [None]:
def same_seeds(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

same_seeds(48763)

# Autoencoder

# Models & loss

In [None]:
class Residual_block(nn.Module):
    def __init__(self, ic, oc, stride=1): # 当传入 stride = 2 时，会把图片长宽缩小一倍
        super().__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(ic, oc, stride=stride, padding=1, kernel_size=3), # stride=2: (H,W) -> (H/2, W/2)
            nn.BatchNorm2d(oc),
            nn.ReLU(inplace=True)
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(oc, oc, stride=1, padding=1, kernel_size=3), # (H,W) -> (H,W)
            nn.BatchNorm2d(oc)
        )
        
        self.downsample = None # 让原来的 x 变成能和 forward(x) 相加的形状，包括 channel 和 (H,W) 都应相同
        if((stride != 1)  or (ic != oc)): # stride != 1 -> (H,W) 变小 ic != oc -> channel 不同
            self.downsample = nn.Sequential(
                nn.Conv2d(ic, oc, stride=stride, kernel_size=1),
                nn.BatchNorm2d(oc)
            )
            
    def forward(self, x):
        residual = x
        x = self.conv1(x)
        x = self.conv2(x)
        
        if(self.downsample != None):
            residual = self.downsample(residual)
        
        x = x + residual
        x = nn.ReLU(inplace = True)(x)
        return x

class ResNet(nn.Module):
    def __init__(self, block=Residual_block, num_layers = [2, 1, 1, 1]):
        super().__init__()
        self.preconv = nn.Sequential( # 3*64*64 --> 32*64*64
            nn.Conv2d(3, 32, kernel_size=3, padding=1, stride=1, bias=False),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True)
        )
        
        def make_residual(block, ic, oc, num_layer, stride=1):
            layers = []
            layers.append(block(ic, oc, stride))
            for i in range(num_layer-1):
                layers.append(block(oc, oc))
            return nn.Sequential(*layers)

        self.layer0 = make_residual(block, ic=32, oc=64, num_layer=num_layers[0], stride=2)
        self.layer1 = make_residual(block, ic=64, oc=128, num_layer=num_layers[1], stride=2)
        self.layer2 = make_residual(block, ic=128, oc=128, num_layer=num_layers[2], stride=2)
        self.layer3 = make_residual(block, ic=128, oc=64, num_layer=num_layers[3], stride=2) 

        self.fc = nn.Sequential(
            nn.Flatten(), # 也可以用 .view(shape[0], -1)
            nn.Dropout(0.2),
            nn.Linear(64*4*4, 64),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace = True)
        )
        
        # 关于 ConvTranspose2d 的介绍：https://blog.csdn.net/qq_36201400/article/details/112604740
        # 大概就是在原图相邻的两个格子之间插 stride-1 个 0（这样原图就会变大了），padding <- kernel-padding-1
        self.decoder = nn.Sequential(
            nn.Linear(64, 64*4*4), # (64) -> (64*4*4)
            nn.BatchNorm1d(64*4*4),
            nn.ReLU(),
            nn.Unflatten(1, (64, 4, 4)), # (64*4*4) -> (64, 4, 4)
            nn.ConvTranspose2d(64,128,kernel_size=4,stride=2,padding=1), # (64,4,4) -> (128,8,8)
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.ConvTranspose2d(128,128,kernel_size=4,stride=2,padding=1), # (128,8,8) -> (128,16,16)
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.ConvTranspose2d(128,128,kernel_size=4,stride=2,padding=1), # (128,16,16) -> (128,32,32)
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.ConvTranspose2d(128,64,kernel_size=4,stride=2,padding=1), # (128,32,32) -> (3,64,64)
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Conv2d(64, 32, kernel_size=3, padding=1, stride=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(32, 16, kernel_size=3, padding=1, stride=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.Conv2d(16, 3, kernel_size=3, padding=1, stride=1),
            nn.Tanh()
        )
    
    def encoder(self, x):
        x = self.preconv(x) # (3,64,64) -> (32,64,64)
        x = self.layer0(x) # (32,64,64) -> (64,32,32) 且通过 resnet(shortcut) 实现，下同
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x) # (128,8,8) -> (64,4,4)
        x = self.fc(x) # (64,4,4) -> (64*4*4) -> (64)
        return x
        
    def forward(self, x): # x : (3, 64, 64)
        x = self.encoder(x)
        x = self.decoder(x)
        return x
summary(ResNet(), input_size=(64, 3, 64, 64))

In [None]:
class fcn_autoencoder(nn.Module):
    def __init__(self):
        super(fcn_autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(64 * 64 * 3, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(), 
            nn.Linear(64, 12), 
            nn.ReLU(), 
            nn.Linear(12, 3)
        )
        
        self.decoder = nn.Sequential(
            nn.Linear(3, 12),
            nn.ReLU(), 
            nn.Linear(12, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(), 
            nn.Linear(128, 64 * 64 * 3), 
            nn.Tanh()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x


class conv_autoencoder(nn.Module):
    def __init__(self):
        super(conv_autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 12, 4, stride=2, padding=1),         
            nn.ReLU(),
            nn.Conv2d(12, 24, 4, stride=2, padding=1),        
            nn.ReLU(),
			      nn.Conv2d(24, 48, 4, stride=2, padding=1),         
            nn.ReLU(),
        )
        self.decoder = nn.Sequential(
			      nn.ConvTranspose2d(48, 24, 4, stride=2, padding=1),
            nn.ReLU(),
			      nn.ConvTranspose2d(24, 12, 4, stride=2, padding=1), 
            nn.ReLU(),
            nn.ConvTranspose2d(12, 3, 4, stride=2, padding=1),
            nn.Tanh(),
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x


class VAE(nn.Module):
    def __init__(self):
        super(VAE, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 12, 4, stride=2, padding=1),            
            nn.ReLU(),
            nn.Conv2d(12, 24, 4, stride=2, padding=1),    
            nn.ReLU(),
        )
        self.enc_out_1 = nn.Sequential(
            nn.Conv2d(24, 48, 4, stride=2, padding=1),  
            nn.ReLU(),
        )
        self.enc_out_2 = nn.Sequential(
            nn.Conv2d(24, 48, 4, stride=2, padding=1),
            nn.ReLU(),
        )
        self.decoder = nn.Sequential(
			      nn.ConvTranspose2d(48, 24, 4, stride=2, padding=1), 
            nn.ReLU(),
			      nn.ConvTranspose2d(24, 12, 4, stride=2, padding=1), 
            nn.ReLU(),
            nn.ConvTranspose2d(12, 3, 4, stride=2, padding=1), 
            nn.Tanh(),
        )

    def encode(self, x):
        h1 = self.encoder(x)
        return self.enc_out_1(h1), self.enc_out_2(h1)

    def reparametrize(self, mu, logvar):
        std = logvar.mul(0.5).exp_()
        if torch.cuda.is_available():
            eps = torch.cuda.FloatTensor(std.size()).normal_()
        else:
            eps = torch.FloatTensor(std.size()).normal_()
        eps = Variable(eps)
        return eps.mul(std).add_(mu)

    def decode(self, z):
        return self.decoder(z)

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparametrize(mu, logvar)
        return self.decode(z), mu, logvar


def loss_vae(recon_x, x, mu, logvar, criterion):
    """
    recon_x: generating images
    x: origin images
    mu: latent mean
    logvar: latent log variance
    """
    mse = criterion(recon_x, x)
    KLD_element = mu.pow(2).add_(logvar.exp()).mul_(-1).add_(1).add_(logvar)
    KLD = torch.sum(KLD_element).mul_(-0.5)
    return mse + KLD

# Dataset module

Module for obtaining and processing data. The transform function here normalizes image's pixels from [0, 255] to [-1.0, 1.0].


In [None]:
class CustomTensorDataset(TensorDataset):
    """TensorDataset with support of transforms.
    """
    def __init__(self, tensors):
        self.tensors = tensors
        if tensors.shape[-1] == 3:
            self.tensors = tensors.permute(0, 3, 1, 2)
        
        self.transform = transforms.Compose([
          transforms.Lambda(lambda x: x.to(torch.float32)),
          transforms.Lambda(lambda x: 2. * x/255. - 1.),
        ])
        
    def __getitem__(self, index):
        x = self.tensors[index]
        
        if self.transform:
            # mapping images to [-1.0, 1.0]
            x = self.transform(x)

        return x

    def __len__(self):
        return len(self.tensors)

# Training

## Configuration


In [None]:
# Training hyperparameters
num_epochs = 100
batch_size = 512
learning_rate = 1e-3

# Build training dataloader
x = torch.from_numpy(train)
train_dataset = CustomTensorDataset(x)

train_sampler = RandomSampler(train_dataset)
train_dataloader = DataLoader(train_dataset, sampler=train_sampler, batch_size=batch_size)

# Model
model_type = 'resnet'   # selecting a model type from {'cnn', 'fcn', 'vae', 'resnet'}
model_classes = {'fcn': fcn_autoencoder(), 'cnn': conv_autoencoder(), 'vae': VAE(),'resnet':ResNet()}
model = model_classes[model_type].cuda()

# Loss and optimizer
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
import transformers
scheduler = transformers.get_cosine_schedule_with_warmup(optimizer,3,num_epochs)

## Training loop

In [None]:

best_loss = np.inf
model.train()

qqdm_train = qqdm(range(num_epochs), desc=format_str('bold', 'Description'))
for epoch in qqdm_train:
    tot_loss = list()
    for data in train_dataloader:

        # ===================loading=====================
        img = data.float().cuda()
        if model_type in ['fcn']:
            img = img.view(img.shape[0], -1)

        # ===================forward=====================
        output = model(img)
        if model_type in ['vae']:
            loss = loss_vae(output[0], img, output[1], output[2], criterion)
        else:
            loss = criterion(output, img)

        tot_loss.append(loss.item())
        # ===================backward====================
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        scheduler.step()
    # ===================save_best====================
    mean_loss = np.mean(tot_loss)
    if mean_loss < best_loss:
        best_loss = mean_loss
        torch.save(model, 'best_model_{}.pt'.format(model_type))
    # ===================log========================
    qqdm_train.set_infos({
        'epoch': f'{epoch + 1:.0f}/{num_epochs:.0f}',
        'loss': f'{mean_loss:.4f}',
    })
    # ===================save_last========================
    torch.save(model, 'last_model_{}.pt'.format(model_type))

# Inference
Model is loaded and generates its anomaly score predictions.

## Initialize
- dataloader
- model
- prediction file

In [None]:
eval_batch_size = 200

# build testing dataloader
data = torch.tensor(test, dtype=torch.float32)
test_dataset = CustomTensorDataset(data)
test_sampler = SequentialSampler(test_dataset)
test_dataloader = DataLoader(test_dataset, sampler=test_sampler, batch_size=eval_batch_size, num_workers=1)
eval_loss = nn.MSELoss(reduction='none')

# load trained model
checkpoint_path = f'last_model_{model_type}.pt'
model = torch.load(checkpoint_path)
model.eval()

# prediction file 
out_file = '/kaggle/working/prediction.csv'

In [None]:
anomality = list()
with torch.no_grad():
  for i, data in enumerate(test_dataloader):
    img = data.float().cuda()
    if model_type in ['fcn']:
      img = img.view(img.shape[0], -1)
    output = model(img)
    if model_type in ['vae']:
      output = output[0]
    if model_type in ['fcn']:
        loss = eval_loss(output, img).sum(-1)
    else:
        loss = eval_loss(output, img).sum([1, 2, 3])
    anomality.append(loss)
anomality = torch.cat(anomality, axis=0)
anomality = torch.sqrt(anomality).reshape(len(test), 1).cpu().numpy()

df = pd.DataFrame(anomality, columns=['score'])
df.to_csv(out_file, index_label = 'ID')

In [None]:
import matplotlib.pyplot as plt

# 获取 score 最大和最小的四个索引
max_idx = df.score.nlargest(4).index
min_idx = df.score.nsmallest(4).index

# 创建子图布局，2 行 4 列
fig, axes = plt.subplots(4, 4, figsize=(16, 16))

# 绘制最大分数的图像
for i, idx in enumerate(max_idx):
    # 原始图像
    axes[0, i].imshow(test[idx])
    axes[0, i].set_title(f'Max Score {i+1}')
    axes[0, i].axis('off')  # 隐藏坐标轴

    # 获取预测图像
    img = test_dataset.__getitem__(idx)
    pred = model(img.unsqueeze(0).cuda()).squeeze(0).permute(1, 2, 0).cpu().detach().numpy()

    # 对预测图像进行归一化并转换为 int 类型
    pred = (pred + 1) * 255 / 2
    pred = pred.astype(int)

    # 显示预测图像
    axes[1, i].imshow(pred)
    axes[1, i].set_title(f'Predicted {i+1}')
    axes[1, i].axis('off')  # 隐藏坐标轴

# 绘制最小分数的图像
for i, idx in enumerate(min_idx):
    # 原始图像
    axes[2, i].imshow(test[idx])
    axes[2, i].set_title(f'Min Score {i+1}')
    axes[2, i].axis('off')  # 隐藏坐标轴

    # 获取预测图像
    img = test_dataset.__getitem__(idx)
    pred = model(img.unsqueeze(0).cuda()).squeeze(0).permute(1, 2, 0).cpu().detach().numpy()

    # 对预测图像进行归一化并转换为 int 类型
    pred = (pred + 1) * 255 / 2
    pred = pred.astype(int)

    # 显示预测图像
    axes[3, i].imshow(pred)
    axes[3, i].set_title(f'Predicted {i+1}')
    axes[3, i].axis('off')  # 隐藏坐标轴

# 显示图像
plt.tight_layout()  # 自动调整子图布局
plt.show()

In [None]:
df.describe()