# **Homework 8 - Anomaly Detection**

If there are any questions, please contact mlta-2022spring-ta@googlegroups.com

Slide:    [Link]()　Kaggle: [Link](https://www.kaggle.com/c/ml2022spring-hw8)　Ref: [Link](https://github.com/pai4451/ML2021/tree/main/hw8)

* GANomaly: 
    * [blog](https://blog.csdn.net/qq7835144/article/details/111029750) 
    * [github](https://github.com/qqsuhao/GANomaly-MvTec-grid)
    

In [None]:
!nvidia-smi

# Import packages

In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler, TensorDataset
import torchvision
import torchvision.transforms as transforms
import torch.nn.functional as F
from torch.autograd import Variable
import torchvision.models as models
from torch.optim import Adam, AdamW

import os
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm.auto import tqdm, trange

# Set up the environment


## Device
Select the compute device: CUDA if available, otherwise CPU.

In [None]:
# Select the compute device once; later cells move tensors and models to `device`.
cuda = torch.cuda.is_available()
device = torch.device('cuda:0') if cuda else torch.device('cpu')
# Legacy typed-tensor constructor matching the selected device
if cuda:
    FloatTensor = torch.cuda.FloatTensor
else:
    FloatTensor = torch.FloatTensor
device

## Downloading data

In [None]:
%%script false --no-raise-error

if not os.path.exists('data/trainingset.npy'):
    !wget https://github.com/MachineLearningHW/HW8_Dataset/releases/download/v1.0.0/data.zip
    !unzip data.zip

# Loading data

In [None]:
# # in colab
# train_path = 'data/trainingset.npy'
# test_path = 'data/testingset.npy'

# in kaggle
train_path = '../input/ml2022spring-hw8/data/trainingset.npy'
test_path = '../input/ml2022spring-hw8/data/testingset.npy'

In [None]:
# Load the training and testing arrays from the .npy files configured above.
# NOTE(review): allow_pickle=True lets np.load unpickle arbitrary Python
# objects, so these files must come from a trusted source.
train = np.load(train_path, allow_pickle=True)
test = np.load(test_path, allow_pickle=True)

# Print the array shapes as a quick sanity check.
print(train.shape)
print(test.shape)

In [None]:
# Disabled alternative: per-channel statistics computed from the training set.
# MEAN = torch.tensor(train.mean(axis=(0,1,2))/255)
# STD = torch.tensor(train.std(axis=(0,1,2))/255)
# MEAN = MEAN.to(device)
# STD = STD.to(device)
# Active choice: a fixed mean and std of 0.5 for each of the three channels,
# stored on the selected device.
MEAN = torch.tensor([0.5, 0.5, 0.5]).to(device)
STD = torch.tensor([0.5, 0.5, 0.5]).to(device)

In [None]:
def show(imgs, size_inches=(15, 10)):
    """Display one image, or a list of images, side by side in a single row.

    imgs: a single image or a list of images; each one is converted with
        torchvision's ``to_pil_image`` before being drawn.
    size_inches: (width, height) of the whole figure.
    """
    images = imgs if isinstance(imgs, list) else [imgs]
    fig, axs = plt.subplots(ncols=len(images), squeeze=False)
    fig.set_size_inches(*size_inches)
    for ax, image in zip(axs[0], images):
        pil_image = torchvision.transforms.functional.to_pil_image(image)
        ax.imshow(np.asarray(pil_image))
        # Hide ticks and tick labels so only the picture is visible
        ax.set(xticklabels=[], yticklabels=[], xticks=[], yticks=[])
    plt.show()

In [None]:
# Preview 50 randomly chosen training images (drawn with replacement), 10 per row.
train_picks = np.random.choice(train.shape[0], 50)
train_grid = torchvision.utils.make_grid(
    [torch.tensor(train[idx].transpose(2, 0, 1)) for idx in train_picks],
    nrow=10,
)
show(train_grid)

# Same preview for the testing images.
test_picks = np.random.choice(test.shape[0], 50)
test_grid = torchvision.utils.make_grid(
    [torch.tensor(test[idx].transpose(2, 0, 1)) for idx in test_picks],
    nrow=10,
)
show(test_grid)

## Random seed
Set the random seed to a certain value for reproducibility.

In [None]:
def same_seeds(seed):
    """Seed the Python, NumPy and PyTorch RNGs and make cuDNN deterministic.

    seed: integer seed applied to every generator.
    """
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)
    if torch.cuda.is_available():
        # Seed the current GPU and then every visible GPU
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    # Disable the cuDNN autotuner and request deterministic kernels
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

same_seeds(3)

NameError: ignored

# Autoencoder

# Models & loss

In [None]:
class fcn_autoencoder(nn.Module):
    """Fully connected autoencoder for flattened 64x64x3 images.

    The encoder compresses the 12288-dimensional input down to a
    3-dimensional code; the decoder mirrors it and ends in Tanh.
    """

    def __init__(self):
        super().__init__()
        flat_dim = 64 * 64 * 3

        self.encoder = nn.Sequential(
            nn.Linear(flat_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 12),
            nn.ReLU(),
            nn.Linear(12, 3),
        )

        self.decoder = nn.Sequential(
            nn.Linear(3, 12),
            nn.ReLU(),
            nn.Linear(12, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, flat_dim),
            nn.Tanh(),
        )

    def forward(self, x):
        """Encode then decode ``x``; returns the reconstruction."""
        return self.decoder(self.encoder(x))
    
    

class conv_autoencoder(nn.Module):
    """Convolutional autoencoder for 64x64 images.

    input_channels: number of channels of the input (and reconstructed) image.
    features: number of channels of the bottleneck feature map. The default
        of 1024 reproduces the original architecture.
    """

    def __init__(self, input_channels=3, features=1024):
        super().__init__()

        self.encoder = nn.Sequential(                               #  3 x 64 x 64
            nn.Conv2d(input_channels, 64, 4, stride=2, padding=1),  # 64 x 32 x 32
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Conv2d(64, 128, 4, stride=2, padding=1),             # 128 x 16 x 16
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Conv2d(128, 256, 4, stride=2, padding=1),            # 256 x 8 x 8
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Conv2d(256, 512, 4, stride=2, padding=1),            # 512 x 4 x 4
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.Conv2d(512, 1024, 4, stride=2, padding=1),           # 1024 x 2 x 2
            nn.BatchNorm2d(1024),
            nn.ReLU(),
            # kernel 2 / stride 1 / padding 1 grows the 2x2 map to 3x3
            nn.Conv2d(1024, features, 2, stride=1, padding=1),      # features x 3 x 3
            # Must follow `features`, not a hard-coded 1024
            nn.BatchNorm2d(features),
            nn.ReLU(),
        )
        self.decoder = nn.Sequential(
            # Mirror of the last encoder layer: 3x3 back to 2x2
            nn.ConvTranspose2d(features, 1024, 2, stride=1, padding=1),  # 1024 x 2 x 2
            nn.BatchNorm2d(1024),
            nn.ReLU(),
            nn.ConvTranspose2d(1024, 512, 4, stride=2, padding=1),       # 512 x 4 x 4
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.ConvTranspose2d(512, 256, 4, stride=2, padding=1),        # 256 x 8 x 8
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.ConvTranspose2d(256, 128, 4, stride=2, padding=1),        # 128 x 16 x 16
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, 4, stride=2, padding=1),         # 64 x 32 x 32
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.ConvTranspose2d(64, input_channels, 4, stride=2, padding=1),  # C x 64 x 64
            nn.Tanh(),
        )

    def forward(self, x):
        """Encode then decode ``x``; returns the reconstruction."""
        x = self.encoder(x)
        x = self.decoder(x)
        return x

class VAE(nn.Module):
    """Small convolutional variational autoencoder for 3-channel images.

    A shared two-layer encoder feeds two separate heads that produce the
    latent mean and log-variance; the decoder maps a sampled latent back to
    an image through three transposed convolutions and a final Tanh.
    """

    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential(
            # 3 x 64 x 64
            nn.Conv2d(3, 12, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(12, 24, 4, stride=2, padding=1),
            nn.ReLU(),
        )
        # NOTE(review): both heads end in ReLU, so mu and logvar can never be
        # negative. Kept as-is to stay compatible with trained checkpoints.
        self.enc_logvar = nn.Sequential(
            nn.Conv2d(24, 48, 4, stride=2, padding=1),
            nn.ReLU(),
        )
        self.enc_mu = nn.Sequential(
            nn.Conv2d(24, 48, 4, stride=2, padding=1),
            nn.ReLU(),
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(48, 24, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(24, 12, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(12, 3, 4, stride=2, padding=1),
            nn.Tanh(),
        )

    def encode(self, x):
        """Return ``(mu, logvar)`` for the input batch ``x``."""
        h1 = self.encoder(x)
        return self.enc_mu(h1), self.enc_logvar(h1)

    def reparametrize(self, mu, logvar):
        """Sample ``z = mu + eps * std`` with ``eps ~ N(0, 1)``.

        std = var^(0.5), i.e. log(std) = 0.5 * log(var).
        The noise is created with the same shape, dtype and device as
        ``std``, so the method does not depend on any global tensor type.
        """
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return eps * std + mu

    def decode(self, z):
        """Decode a latent tensor ``z`` into an image."""
        return self.decoder(z)

    def forward(self, x):
        """Return ``(reconstruction, mu, logvar)`` for the input batch ``x``."""
        mu, logvar = self.encode(x)
        z = self.reparametrize(mu, logvar)
        return self.decode(z), mu, logvar


def loss_vae(recon_x, x, mu, logvar, criterion, beta):
    """Reconstruction loss plus a beta-weighted KL-style regularizer.

    recon_x: generated (reconstructed) images
    x: original images
    mu: latent mean
    logvar: latent log variance
    criterion: callable computing the reconstruction loss from (recon_x, x)
    beta: weight of the regularizer; a small beta keeps the regularizer
        from stalling the decrease of the reconstruction loss

    The regularizer is exp(logvar) - (1 + logvar) + mu^2 (without the
    conventional 1/2 factor), averaged over dim 0 and summed over the rest.
    exp(logvar) - (1 + logvar) is minimal (zero) at logvar = 0, and mu^2
    acts as an L2 penalty on the latent mean.
    """
    reconstruction = criterion(recon_x, x)
    per_element = logvar.exp() - (1 + logvar) + mu.pow(2)
    kld = per_element.mean(dim=0).sum()
    return reconstruction + beta * kld

In [None]:
# Plot the log-variance part of the regularizer to show its minimum at logvar = 0.
x = np.arange(-1, 1, 0.01)
y1, y2 = np.exp(x), 1+x
y3 = y1 - y2
curves = (
    (y1, '$e^{logvar}$'),
    (y2, '$1+logvar$'),
    (y3, '$e^{logvar} - (1+logvar)$'),
)
for curve, curve_label in curves:
    plt.plot(x, curve, label=curve_label)
arrow_style = dict(facecolor='black', shrink=0.05)
plt.annotate('Minimum: (0,0)',
             xy=(0, 0), xycoords='data',
             xytext=(0.35, 1),
             arrowprops=arrow_style)
plt.legend()
plt.grid()
plt.show()

In [None]:
class ResBlock(nn.Module):
    """Basic residual block: two 3x3 conv + BatchNorm layers plus a shortcut.

    inchannel: number of input channels.
    outchannel: number of output channels.
    stride: stride of the first convolution (and of the shortcut projection).
    """

    def __init__(self, inchannel, outchannel, stride=1):
        super().__init__()
        # The two consecutive convolution layers inside the residual block
        self.left = nn.Sequential(
            nn.Conv2d(inchannel, outchannel, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(outchannel),
            nn.ReLU(inplace=True),
            nn.Conv2d(outchannel, outchannel, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(outchannel),
        )
        shape_preserved = stride == 1 and inchannel == outchannel
        if shape_preserved:
            # Identity shortcut
            self.shortcut = nn.Sequential()
        else:
            # Project the input so its shape matches the output of the two
            # convolution layers
            self.shortcut = nn.Sequential(
                nn.Conv2d(inchannel, outchannel, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(outchannel),
            )

    def forward(self, x):
        """Add the shortcut of ``x`` to the conv branch output, then apply ReLU."""
        # Sum of the conv branch and the (possibly projected) input: the
        # basic ResNet structure
        summed = self.left(x) + self.shortcut(x)
        return nn.functional.relu(summed)

class ResNet(nn.Module):
    """Autoencoder with a residual encoder and a transposed-conv decoder.

    ResBlock: block factory called as ``ResBlock(in_channels, out_channels, stride)``.
    """

    def __init__(self, ResBlock):
        super().__init__()
        # Running channel count, updated by make_layer as blocks are stacked
        self.inchannel = 128
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 128, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(128),
            nn.ReLU(),
        )
        self.layer1 = self.make_layer(ResBlock, 256, 2, stride=2)
        self.layer2 = self.make_layer(ResBlock, 512, 2, stride=2)

        # Shape comments assume a 3 x 64 x 64 input image
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(512, 256, 4, stride=2, padding=1),  # [256, 8, 8]
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.ConvTranspose2d(256, 128, 4, stride=2, padding=1),  # [128, 16, 16]
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, 4, stride=2, padding=1),   # [64, 32, 32]
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 3, 4, stride=2, padding=1),     # [3, 64, 64]
            nn.Tanh(),
        )

    def make_layer(self, block, channels, num_blocks, stride):
        """Stack ``num_blocks`` copies of ``block``; only the first uses ``stride``."""
        layers = []
        for block_stride in [stride] + [1] * (num_blocks - 1):
            layers.append(block(self.inchannel, channels, block_stride))
            # Every following block starts from the new channel count
            self.inchannel = channels
        return nn.Sequential(*layers)

    def encode(self, x):
        """Map an image batch to its bottleneck feature map."""
        stem = self.conv1(x)                           # [128, 64, 64]
        feats = self.layer1(stem)                      # [256, 32, 32]
        feats = self.layer2(feats)                     # [512, 16, 16]
        return nn.functional.avg_pool2d(feats, 4)      # [512, 4, 4]

    def decode(self, z):
        """Reconstruct an image batch from the bottleneck feature map ``z``."""
        return self.decoder(z)

    def forward(self, x):
        """Encode then decode ``x``; returns the reconstruction."""
        return self.decode(self.encode(x))

## GANomaly

In [None]:
# setting for weight init function
def weights_init(m):
    """Re-initialize one module in place; meant for ``model.apply(weights_init)``.

    The module is selected by substrings of its class name:
    - 'Conv':      weight ~ N(0.0, 0.02); the bias is left untouched
    - 'BatchNorm': weight ~ N(1.0, 0.02); bias set to 0
    - 'Linear':    weight ~ N(0.0, 0.02); bias set to 0
    Any other module is ignored. Parameters that are absent or None (for
    example ``affine=False`` or ``bias=False``) are skipped instead of
    raising.
    """
    classname = m.__class__.__name__
    weight = getattr(m, 'weight', None)
    bias = getattr(m, 'bias', None)

    if 'Conv' in classname:
        if weight is not None:
            nn.init.normal_(weight, 0.0, 0.02)
    elif 'BatchNorm' in classname:
        if weight is not None:
            nn.init.normal_(weight, 1.0, 0.02)
        if bias is not None:
            nn.init.zeros_(bias)
    elif 'Linear' in classname:
        if weight is not None:
            nn.init.normal_(weight, 0.0, 0.02)
        if bias is not None:
            nn.init.zeros_(bias)

In [None]:
class Encoder(nn.Module):
    """Strided-convolution encoder shared by the generator and discriminator.

    in_dim: number of input image channels.
    feature_dim: channel count after the first convolution; doubled by each
        of the next two stages.
    out_dim: channel count of the final feature map.
    """

    def __init__(self, in_dim, feature_dim=64, out_dim=10):
        super().__init__()
        # The attribute name (including its spelling) is part of the
        # state_dict keys, so it must stay as it is.
        self.encorder = self.new_encorder(in_dim, feature_dim, out_dim)

    def new_encorder(self, in_dim, feature_dim, out_dim):
        """Build the encoder stack; shape comments assume 64x64 inputs and default dims."""
        stages = [
            # 5x5 average blur that keeps the spatial size
            nn.AvgPool2d(5, stride=1, padding=2),
            nn.Conv2d(in_dim, feature_dim, kernel_size=4, stride=2, padding=1),  # (batch, 64, 32, 32)
            nn.LeakyReLU(0.2),
            self.conv_bn_lrelu(feature_dim, feature_dim * 2),                    # (batch, 128, 16, 16)
            self.conv_bn_lrelu(feature_dim * 2, feature_dim * 4),                # (batch, 256, 8, 8)
            self.conv_bn_lrelu(feature_dim * 4, out_dim),                        # (batch, 10, 4, 4)
        ]
        return nn.Sequential(*stages)

    def conv_bn_lrelu(self, in_dim, out_dim):
        """Stride-2 4x4 convolution followed by BatchNorm and LeakyReLU(0.2)."""
        conv = nn.Conv2d(in_dim, out_dim, kernel_size=4, stride=2, padding=1)
        return nn.Sequential(conv, nn.BatchNorm2d(out_dim), nn.LeakyReLU(0.2))

    def forward(self, x):
        """Return the encoded feature map of ``x``."""
        return self.encorder(x)
    
class Generator(nn.Module):
    """Encoder -> decoder -> encoder generator of the GANomaly setup.

    in_dim: number of image channels.
    feature_dim: base channel width of the encoders and the decoder.
    out_dim: channel count of the latent feature map.
    """

    def __init__(self, in_dim, feature_dim=64, out_dim=10):
        super().__init__()

        # input: (batch, 3, 64, 64)
        self.encoder1 = Encoder(in_dim, feature_dim, out_dim)
        self.encoder2 = Encoder(in_dim, feature_dim, out_dim)
        upsample_stages = [
            nn.ConvTranspose2d(out_dim, feature_dim * 4, 4, stride=2, padding=1),  # (batch, 256, 8, 8)
            nn.ReLU(),
            self.convTrans_bn_lrelu(feature_dim * 4, feature_dim * 2),             # (batch, 128, 16, 16)
            self.convTrans_bn_lrelu(feature_dim * 2, feature_dim),                 # (batch, 64, 32, 32)
            nn.ConvTranspose2d(feature_dim, in_dim, 4, stride=2, padding=1),       # (batch, 3, 64, 64)
            nn.Tanh(),
        ]
        self.decoder = nn.Sequential(*upsample_stages)
        # Weight initialization is not applied here.
#         self.apply(weights_init)

    def convTrans_bn_lrelu(self, in_dim, out_dim):
        """Stride-2 4x4 transposed convolution followed by BatchNorm and LeakyReLU(0.2)."""
        deconv = nn.ConvTranspose2d(in_dim, out_dim, kernel_size=4, stride=2, padding=1)
        return nn.Sequential(deconv, nn.BatchNorm2d(out_dim), nn.LeakyReLU(0.2))

    def forward(self, x):
        """Return ``(reconstruction, latent of x, latent of the reconstruction)``."""
        latent_in = self.encoder1(x)
        reconstruction = self.decoder(latent_in)
        latent_out = self.encoder2(reconstruction)
        return reconstruction, latent_in, latent_out

class Discriminator(nn.Module):
    """Encoder followed by a sigmoid classifier.

    in_dim: number of image channels.
    feature_dim: base channel width of the encoder.
    out_dim: channel count of the encoder output; the classifier expects a
        4x4 spatial map of that many channels.
    """

    def __init__(self, in_dim, feature_dim=64, out_dim=10):
        super().__init__()

        # input: (batch, 3, 64, 64)
        self.encoder = Encoder(in_dim, feature_dim, out_dim)
        flat_features = out_dim*4*4
        self.classifier = nn.Sequential(
            nn.Linear(flat_features, 1),
            nn.Sigmoid(),
        )
        # Weight initialization is not applied here.
#         self.apply(weights_init)

    def forward(self, x):
        """Return ``(score, features)``: a per-sample sigmoid score and the encoder map."""
        features = self.encoder(x)
        flat = features.view(x.shape[0], -1)
        score = self.classifier(flat).squeeze(1)  # (B, 1) -> (B)
        return score, features

def loss_GAN(rec_loss, feature_loss, adv_loss,
             rec_weight=10, feature_weight=1, adv_weight=1):
    """Weighted sum of the three generator loss terms.

    rec_loss: image reconstruction loss.
    feature_loss: loss between the two latent encodings.
    adv_loss: adversarial (discriminator feature matching) loss.
    rec_weight, feature_weight, adv_weight: term weights; the defaults
        reproduce the original ``10 * rec_loss + feature_loss + adv_loss``.
    """
    return (rec_weight * rec_loss
            + feature_weight * feature_loss
            + adv_weight * adv_loss)

# Dataset module

Module for obtaining and processing data. The transform function here normalizes image pixel values from [0, 255] to [-1.0, 1.0].


In [None]:
def _scale_pixels_to_signed_unit(x):
    """Cast to float32 and map pixel values from [0, 255] to [-1.0, 1.0]."""
    return 2. * x.to(torch.float32) / 255. - 1.


class CustomTensorDataset(TensorDataset):
    """Dataset over a single image tensor, with a built-in pixel transform.

    tensors: image tensor; if its last dimension is 3 it is treated as
        channels-last and permuted to (N, C, H, W).
    half: when True, only the top half (along dim 1 of each sample) is
        returned, e.g. to focus on the hat region.

    The transform is a module-level function rather than a lambda so the
    dataset can be pickled (needed by multi-process data loading with the
    spawn start method).
    """

    def __init__(self, tensors, half=False):
        self.half = half
        self.tensors = tensors
        if tensors.shape[-1] == 3:
            self.tensors = tensors.permute(0, 3, 1, 2)

        self.transform = _scale_pixels_to_signed_unit

    def __getitem__(self, index):
        x = self.tensors[index]

        if self.transform:
            # mapping images to [-1.0, 1.0]
            x = self.transform(x)

        if self.half:
            height_cutoff = x.shape[1] // 2  # for detect hat
            x = x[:, :height_cutoff, :]
        return x

    def __len__(self):
        return len(self.tensors)

    
batch_size = 128 # smaller batchsize is better
eval_batch_size = 200

# Training data: shuffled on every pass
train_dataset = CustomTensorDataset(torch.from_numpy(train))
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Testing data: kept in file order so scores line up with sample IDs
test_dataset = CustomTensorDataset(torch.from_numpy(test))
test_dataloader = DataLoader(test_dataset, batch_size=eval_batch_size, shuffle=False)

In [None]:
def recover_from_normalize(img):
    """Undo the normalization of a channel-first image: ``img * STD + MEAN`` per channel."""
    return img * STD[:, None, None] + MEAN[:, None, None]


# Preview the first 10 images of one training batch.
# Python 3 iterators have no `.next()` method; use the built-in next().
imgs = next(iter(train_dataloader))[:10]
train_grid = torchvision.utils.make_grid([recover_from_normalize(img.to(device)) for img in imgs], nrow=5)
show(train_grid, size_inches=(15, 5))

# Training

## Configuration


In [None]:
# Training hyperparameters
num_epochs = 2
learning_rate = 1e-3

## Training loop

In [None]:
%%script false --no-raise-error

# Model
model_type = 'fcn'   # selecting a model type from {'cnn', 'fcn', 'vae', 'resnet'}
# Map each type to a zero-argument factory so that only the selected model is
# built (previously all four networks were instantiated to use one).
# NOTE: for types other than 'fcn' the initial weights under a fixed seed
# differ from the eager version, because fewer modules consume the RNG first.
model_classes = {
    'fcn': fcn_autoencoder,
    'cnn': lambda: conv_autoencoder(3),
    'vae': VAE,
    'resnet': lambda: ResNet(ResBlock),
}
model = model_classes[model_type]().to(device)

In [None]:
%%script false --no-raise-error

# Loss and optimizer
criterion = nn.MSELoss(reduction='mean')
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

best_loss = np.inf
model.train()

is_fcn = model_type in ['fcn']   # the fcn model works on flattened images
is_vae = model_type in ['vae']   # the vae model returns (reconstruction, mu, logvar)

for epoch in range(num_epochs):
    tot_loss = []
    progress_bar = tqdm(train_dataloader)
    for data in progress_bar:
        # ===================loading=====================
        img = data.float().to(device)
        if is_fcn:
            img = img.view(img.shape[0], -1)

        # ===================forward=====================
        output = model(img)
        if is_vae:
            # The regularizer weight grows linearly with the epoch (0 in the first epoch)
            loss = loss_vae(output[0], img, output[1], output[2], criterion, epoch/100000)
        else:
            loss = criterion(output, img)

        tot_loss.append(loss.item())
        # ===================backward====================
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # ===================log========================
        progress_bar.set_postfix(cur_loss=loss.item())

    # ===================save_best====================
    mean_loss = np.mean(tot_loss)
    if mean_loss < best_loss:
        print(f"epoch: {epoch + 1:3d}, best loss: {mean_loss:.4f}")

        # Visualize the first 5 images of the last batch. Detach first so the
        # plotting tensors do not drag the autograd graph along.
        reconstruction = output[0] if model_type in ['vae', 'ganomaly'] else output
        originals = img[:5].detach()
        reconstructions = reconstruction[:5].detach()
        if is_fcn:
            # Flattened vectors back to 3 x 64 x 64 images
            originals = originals.view(-1, 3, 64, 64)
            reconstructions = reconstructions.view(-1, 3, 64, 64)

        ori = torchvision.utils.make_grid([recover_from_normalize(x) for x in originals], nrow=5)
        rec = torchvision.utils.make_grid([recover_from_normalize(x) for x in reconstructions], nrow=5)
        diff = (ori - rec).abs()
        show(ori, size_inches=(15, 5))
        show(rec, size_inches=(15, 5))
        show(diff, size_inches=(15, 5))

        best_loss = mean_loss
        torch.save(model, 'best_model_{}.pt'.format(model_type))
    # ===================log========================
    print(f"epoch: {epoch + 1:3d}, loss: {mean_loss:.4f}")
    # ===================save_last========================
    torch.save(model, 'last_model_{}.pt'.format(model_type))

In [None]:
model_type = "ganomaly"
# Scores recorded for different generator widths:
# D.csv: 0.77339, DD.csv: 0.73842 feature_dim = 256
# D.csv: 0.80093, DD.csv: 0.82131 feature_dim = 128
# D.csv: 0.75548, DD.csv: 0.85475 feature_dim = 64
# D.csv: 0.75815, DD.csv: 0.82512 feature_dim = 32
feature_dim = 64
out_dim = 100
# Model
G = Generator(3, feature_dim=feature_dim, out_dim=out_dim).to(device)
# NOTE(review): D is built with its default feature_dim/out_dim, not the
# values above - confirm this is intended.
D = Discriminator(3).to(device)
for net in (G, D):
    net.apply(weights_init)
for net in (G, D):
    print(net)

In [None]:
# img gen
# Shape of the random latent code fed to G.decoder when sampling fake images
z_dim = (out_dim, 4, 4)

# Loss and optimizer
criterion_D = nn.BCELoss()
criterion_G = nn.MSELoss()
opt_D = torch.optim.Adam(D.parameters(), lr=learning_rate, betas=(0.5, 0.999))
opt_G = torch.optim.Adam(G.parameters(), lr=learning_rate, betas=(0.5, 0.999))

# Per-epoch means of (loss_D, loss_G); only their sum is compared
best_loss = np.array([np.inf, np.inf])
G.train()
D.train()

for epoch in range(num_epochs):
    tot_loss = []
    progress_bar = tqdm(train_dataloader)
    progress_bar.set_description(f"Epoch {epoch+1}")
    for data in progress_bar:
        # ===================loading=====================
        imgs = data.float().to(device)
        bs = imgs.size(0)

        # *********************
        # *    Train D        *
        # *********************
        z = torch.randn(bs, *z_dim).to(device)
        # Real samples are the raw images (using G's reconstructions here worked badly)
        r_imgs = imgs
        # Only D is updated in this step, so no gradient is needed through G
        with torch.no_grad():
            f_imgs = G.decoder(z)
        r_label = torch.ones(bs, device=device)
        f_label = torch.zeros(bs, device=device)

        # Discriminator forwarding
        r_logit, _ = D(r_imgs)
        f_logit, _ = D(f_imgs)

        # Loss for discriminator
        r_loss = criterion_D(r_logit, r_label)
        f_loss = criterion_D(f_logit, f_label)
        loss_D = (r_loss + f_loss) / 2

        # Discriminator backwarding
        D.zero_grad()
        loss_D.backward()
        opt_D.step()

        # *********************
        # *    Train G        *
        # *********************
        rec_imgs, feature_in, feature_out = G(imgs)
        # The real-image features are a fixed target for G: no graph needed
        with torch.no_grad():
            _, feature_r_D = D(imgs)
        _, feature_f_D = D(rec_imgs)

        loss_G_rec = criterion_G(rec_imgs, imgs)
        loss_G_fea = criterion_G(feature_in, feature_out)
        # Feature matching on D's encoder output (a BCE loss on D's logit
        # for the reconstruction worked badly)
        loss_G_adv = criterion_G(feature_r_D, feature_f_D)
        loss_G = loss_GAN(loss_G_rec, loss_G_fea, loss_G_adv)

        # Generator backwarding
        G.zero_grad()
        loss_G.backward()
        opt_G.step()

        # ===================log========================
        tot_loss.append((loss_D.item(), loss_G.item()))
        progress_bar.set_postfix(loss_D=loss_D.item(),
                                 loss_G=loss_G.item(),
                                 loss_G_rec=loss_G_rec.item(),
                                 loss_G_fea=loss_G_fea.item(),
                                 loss_G_adv=loss_G_adv.item())

    # ===================save_best====================
    mean_loss = np.mean(tot_loss, axis=0)
    if mean_loss.sum() < best_loss.sum():
        print(f"epoch: {epoch + 1:3d}, best loss: {mean_loss}")

        # Visualize the first 5 images of the last batch; detach the
        # reconstruction so plotting does not keep the autograd graph alive
        ori = torchvision.utils.make_grid([recover_from_normalize(x) for x in imgs[:5]], nrow=5)
        rec = torchvision.utils.make_grid([recover_from_normalize(x) for x in rec_imgs[:5].detach()], nrow=5)
        diff = (ori - rec).abs()
        gen_rand = torchvision.utils.make_grid([recover_from_normalize(x) for x in f_imgs[:5]], nrow=5)

        show(ori, size_inches=(15, 5))
        show(rec, size_inches=(15, 5))
        show(diff, size_inches=(15, 5))
        show(gen_rand, size_inches=(15, 5))

        best_loss = mean_loss
        torch.save(G, 'best_G_model.pt')
        torch.save(D, 'best_D_model.pt')
    # ===================log========================
    print(f"epoch: {epoch + 1:3d}, loss: {mean_loss}")
    # ===================save_last========================
    torch.save(D, 'last_model_D.pt')
    torch.save(G, 'last_model_G.pt')

# Inference
The trained model is loaded and used to generate anomaly score predictions.

## Initialize
- dataloader
- model
- prediction file

In [None]:
%%script false --no-raise-error

# load trained model
checkpoint_path = f'last_model_{model_type}.pt'
# map_location lets a checkpoint saved on GPU be loaded on the current device
# (e.g. a CPU-only machine).
# NOTE(review): the checkpoint is a fully pickled model; only load files you
# trust. Recent PyTorch releases may additionally require weights_only=False
# for such files - confirm against the installed version.
model = torch.load(checkpoint_path, map_location=device)

In [None]:
%%script false --no-raise-error

model.eval()
eval_loss = nn.MSELoss(reduction='none')

# prediction file
out_file = 'prediction.csv'

# Largest per-sample reconstruction error seen so far
max_loss = -np.inf
is_fcn = model_type in ['fcn']

anomality = []
with torch.no_grad():
    for data in tqdm(test_dataloader):
        img = data.float().to(device)
        if is_fcn:
            img = img.view(img.shape[0], -1)

        output = model(img)
        if model_type in ['vae', 'ganomaly']:
            # These models return a tuple; the reconstruction comes first
            output = output[0]

        # Per-sample sum of squared errors
        if is_fcn:
            loss = eval_loss(output, img).sum(-1)
        else:
            loss = eval_loss(output, img).sum([1, 2, 3])
        anomality.append(loss)

        # Show the worst reconstruction whenever a new maximum appears.
        # Uses its own names so no loop variable is overwritten.
        batch_max, worst_idx = torch.max(loss, dim=0)
        if batch_max > max_loss:
            max_loss = batch_max
            if is_fcn:
                ori = recover_from_normalize(img[worst_idx].view(3,64,64))
                rec = recover_from_normalize(output[worst_idx].view(3,64,64))
            else:
                ori = recover_from_normalize(img[worst_idx])
                rec = recover_from_normalize(output[worst_idx])

            diff = (ori - rec).abs()
            show(torchvision.utils.make_grid([ori, rec, diff]), size_inches=(6, 6))

# Anomaly score = square root of the summed squared error, one row per sample
anomality = torch.cat(anomality, dim=0)
anomality = torch.sqrt(anomality).reshape(-1, 1).cpu().numpy()

df = pd.DataFrame(anomality, columns=['score'])
df.to_csv(out_file, index_label = 'ID')

In [None]:
# # load trained model
# G = torch.load("../input/hw08tmp/last_model_G.pt", map_location=device)
# D = torch.load("../input/hw08tmp/last_model_D.pt", map_location=device)

In [None]:
G.eval()
D.eval()

# Element-wise squared error; reduced per sample below
loss_fn = nn.MSELoss(reduction="none")

# Four candidate anomaly scores, collected batch by batch
anomality_D = []     # negated discriminator output
anomality_rec = []   # image reconstruction error
anomality_F = []     # error between the two latents of one generator pass
anomality_FF = []    # error between the latent of the image and of its reconstruction
with torch.no_grad():
    for data in tqdm(test_dataloader):
        img = data.float().to(device)
        rec_img, f1, f2 = G(img)
        f3 = G(rec_img)[1]
        y = D(img)[0]

        # A lower discriminator output means a higher anomaly score
        anomality_D.append(-y)
        anomality_rec.append(loss_fn(img, rec_img).mean(dim=(1,2,3)))
        anomality_F.append(loss_fn(f1, f2).mean(dim=(1,2,3)))
        anomality_FF.append(loss_fn(f1, f3).mean(dim=(1,2,3)))

def process_anomality(anomality, outputfile):
    """Concatenate per-batch scores, write them to CSV and plot a histogram.

    anomality: list of 1-D score tensors, one per batch, in sample order.
    outputfile: path of the CSV to write; it gets an 'ID' index column and a
        'score' column. The same string is used as the histogram title.

    Returns the resulting DataFrame (one row per sample).
    """
    scores = torch.cat(anomality, dim=0)
    # One row per score; inferred from the data instead of a global array
    scores = scores.reshape(-1, 1).cpu().numpy()
    df = pd.DataFrame(scores, columns=['score'])
    df.to_csv(outputfile, index_label = 'ID')
    df.plot.hist(bins=100, title=outputfile)

    return df

# Write one prediction file per score type, in this fixed order
df_D, df_rec, df_F, df_FF = (
    process_anomality(batch_scores, csv_name)
    for batch_scores, csv_name in (
        (anomality_D, 'prediction_D.csv'),
        (anomality_rec, 'prediction_rec.csv'),
        (anomality_F, 'prediction_F.csv'),
        (anomality_FF, 'prediction_FF.csv'),
    )
)

In [None]:
 def show_cmp(df, num=3*20):
    df_sort = df.sort_values('score').reset_index()
    normal = df_sort[:num]
    abnormal = df_sort[-num:]

    normal_img = []
    abnormal_img = []
    with torch.no_grad():
        for i, data in enumerate(tqdm(test_dataset)): 
            if (normal['index'] == i).any():
                normal_img.append(recover_from_normalize(data.to(device)))
            elif (abnormal['index'] == i).any():
                abnormal_img.append(recover_from_normalize(data.to(device)))
                
    print("normal")
    show(torchvision.utils.make_grid(normal_img, nrow=20), size_inches=(18, 18))
    print("abnormal")
    show(torchvision.utils.make_grid(abnormal_img, nrow=20), size_inches=(18, 18))

for df in [df_D, df_rec, df_F, df_FF]:
    show_cmp(df)

## More Train

In [None]:
epochs = 1
# Pseudo-label the test set with the discriminator scores: the 500 lowest
# scores are treated as normal, the 500 highest as abnormal.
df_sort = df_D.sort_values('score').reset_index()
normal = df_sort[:50*10]
abnormal = df_sort[-50*10:]

# Sample index -> target for D (1 = normal, 0 = abnormal). Normal is written
# last so it wins if a sample appears in both groups.
pseudo_labels = {int(idx): 0.0 for idx in abnormal['index']}
pseudo_labels.update((int(idx), 1.0) for idx in normal['index'])

# Loss and optimizer
criterion_D = nn.BCELoss()
opt_D = torch.optim.Adam(D.parameters(), lr=learning_rate, betas=(0.5, 0.999))

D.train()

for epoch in range(epochs):
    progress_bar = tqdm(test_dataset)
    progress_bar.set_description(f"Epoch {epoch+1}")
    for i, data in enumerate(progress_bar):
        target = pseudo_labels.get(i)
        if target is None:
            # Sample belongs to neither group: not used for training
            continue
        label = torch.full((1,), target, device=device)

        # ===================loading=====================
        # One sample at a time: add the batch dimension
        imgs = data.float().to(device).unsqueeze(0)

        # *********************
        # *    Train D        *
        # *********************

        # Discriminator forwarding
        logit, _ = D(imgs)

        # Loss for discriminator
        loss_D = criterion_D(logit, label)

        # Discriminator backwarding
        D.zero_grad()
        loss_D.backward()
        opt_D.step()

        # ===================log========================
        progress_bar.set_postfix(loss_D=loss_D.item())
        

In [None]:
G.eval()
D.eval()

# Kept for parity with the previous inference cell
loss_fn = nn.MSELoss(reduction="none")

# Scores from the fine-tuned discriminator: negated output per sample
anomality_DD = []
with torch.no_grad():
    for data in tqdm(test_dataloader):
        img = data.float().to(device)
        y = D(img)[0]
        anomality_DD.append(-y)

df_DD = process_anomality(anomality_DD, 'prediction_DD.csv')
show_cmp(df_DD, 50*10)

# Report

2. Train a fully connected autoencoder and adjust at least two different
elements of the latent representation. Show your model architecture, plot
the original image and the reconstructed image for each adjustment, and
describe the differences.

In [None]:
if model_type in ['fcn']:
    # Sweep each of the 3 latent dimensions from -20 to 19 while the other
    # two stay at 0, and show the decoded images for every sweep.
    for axis in range(3):
        decoded = []
        for value in range(-20, 20):
            latent = [0, 0, 0]
            latent[axis] = value
            image = model.decoder(FloatTensor(latent)).view(3,64,64)
            decoded.append(recover_from_normalize(image))
        show(torchvision.utils.make_grid(decoded))