In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.optim import Adam
from torch.optim.lr_scheduler import MultiStepLR

import numpy as np
import io
import os
import random
from PIL import Image
import matplotlib.pyplot as plt
import pandas as pd
from tqdm import trange
from tqdm import tqdm
import albumentations as A
from sklearn.model_selection import train_test_split

In [None]:
# Mount Google Drive under /content/drive so the workspace folder can be reached (Colab runtime only).
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# fastparquet is the engine passed to pd.read_parquet; %pip installs into the running kernel's environment.
%pip install fastparquet

In [None]:
# Root folder of the assignment on Google Drive -- change this to your own path.
# The following cell makes it the working directory, so the relative paths used later
# ("./dataset", "./logs", "./checkpoints", "./images") are resolved against it.
YOUR_PATH_TO_WORKSPACE = "/content/drive/MyDrive/MSAI/MSAI437 - Deep Learning/HW2/"

In [None]:
# Work from the workspace root so every relative path below resolves inside it.
os.chdir(YOUR_PATH_TO_WORKSPACE)

In [None]:
# Export every emoji whose caption contains "face" (but not "clock") as a size x size JPEG
# and record "<1-based row number>,<caption>" for it in face.txt.
imgs, labels, size = [], [], 64
df = pd.read_parquet("./dataset/train-00000-of-00001-38cc4fa96c139e86.parquet", engine='fastparquet')
# The context manager flushes and closes face.txt even if an image fails to decode or save.
with open("./dataset/face.txt", 'w') as f:
    # Iterate over the actual number of rows instead of a hard-coded count.
    for i in trange(len(df)):
        caption = df['text'][i]
        if 'face' in caption and 'clock' not in caption:
            image_stream = io.BytesIO(df['image.bytes'][i])
            Image.open(image_stream).resize((size, size)).save(f"./dataset/imgs_org/{i+1}.jpg")
            f.write(f"{i+1},{caption}\n")

## 1. Utility Functions

In [None]:
# Single seed shared by seed_everything() and make_dataset() in the experiment cells below.
_seed = 42

In [None]:
def seed_everything(seed=42):
    """Seed the Python, NumPy and PyTorch random generators with the same value.

    Also seeds the CUDA generators when a GPU is available, puts cuDNN in
    deterministic mode and turns off its benchmark mode.
    """
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)

    if torch.cuda.is_available():
        # Current device first, then every device.
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

## 2. Models

In [None]:
class ConvNormReLU(nn.Module):
    """Convolution followed by a normalization layer and an in-place ReLU.

    Args:
        in_channels: channels of the input feature map.
        out_channels: channels produced by the convolution.
        kernel_size, stride, padding: forwarded to ``nn.Conv2d``.
        norm_layer: callable building the normalization layer from ``out_channels``
            (``nn.BatchNorm2d`` unless overridden).
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, norm_layer=nn.BatchNorm2d) -> None:
        super().__init__()

        self.conv = nn.Conv2d(
            in_channels, out_channels,
            kernel_size=kernel_size, stride=stride, padding=padding,
        )
        # The attribute is called "instance_norm" whichever norm_layer is supplied.
        self.instance_norm = norm_layer(out_channels)

    def forward(self, x):
        normalized = self.instance_norm(self.conv(x))
        return F.relu(normalized, inplace=True)

class ResidualBlock(nn.Module):
    """Two 3x3 convolutions with an identity skip connection; the channel count is preserved."""

    def __init__(self, in_channels, norm_layer=nn.InstanceNorm2d) -> None:
        super().__init__()

        first = ConvNormReLU(in_channels, in_channels, 3, 1, 1, norm_layer)
        second_conv = nn.Conv2d(in_channels, in_channels, kernel_size=3, stride=1, padding=1)
        second_norm = norm_layer(in_channels)
        # The second conv has no activation of its own; ReLU is applied after the skip addition.
        self.convs = nn.Sequential(first, second_conv, second_norm)

    def forward(self, x):
        residual = self.convs(x)
        return F.relu(x + residual)

class StyleMapping(nn.Module):
    """Small MLP (Linear -> ReLU -> Linear) that keeps the style vector width unchanged."""

    def __init__(self, style_dim):
        super().__init__()
        layers = [
            nn.Linear(style_dim, style_dim),
            nn.ReLU(inplace=True),
            nn.Linear(style_dim, style_dim),
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        mapped = self.model(x)
        return mapped
    
class AdaIN(nn.Module):
    """Adaptive instance normalization.

    The input is instance-normalized, then scaled and shifted per channel with
    values predicted from a style vector by a single linear layer.
    """

    def __init__(self, in_channel, style_dim):
        super().__init__()
        self.norm = nn.InstanceNorm2d(in_channel)
        self.style = nn.Linear(style_dim, in_channel * 2)
        # Bias init: first half (the scale) starts at 1, second half (the shift) at 0.
        bias = self.style.bias.data
        bias[:in_channel].fill_(1)
        bias[in_channel:].fill_(0)

    def forward(self, input, style):
        # Append two singleton axes so the affine terms broadcast over the spatial dims.
        affine = self.style(style)[:, :, None, None]
        gamma, beta = torch.chunk(affine, 2, dim=1)
        return gamma * self.norm(input) + beta
    
class EncoderBlock(nn.Module):
    """One residual block at the input resolution, then a stride-2 3x3 ConvNormReLU
    that maps ``in_channels`` to ``out_channels``."""

    def __init__(self, in_channels, out_channels, norm_layer=nn.InstanceNorm2d):
        super().__init__()

        # Kept inside a Sequential so parameter names stay "res_blocks.0.*".
        self.res_blocks = nn.Sequential(ResidualBlock(in_channels, norm_layer))
        self.down = ConvNormReLU(
            in_channels, out_channels,
            kernel_size=3, stride=2, padding=1, norm_layer=norm_layer,
        )

    def forward(self, x):
        return self.down(self.res_blocks(x))

class DecoderBlock(nn.Module):
    """Upsample by 2, refine with two 3x3 ConvNormReLU blocks, then modulate with AdaIN.

    Args:
        in_channels: channels of the incoming feature map.
        out_channels: channels of the returned feature map.
        norm_layer: normalization used inside the two ConvNormReLU blocks.
        style_dim: width of the latent vector fed to the style MLP and to AdaIN.
            Defaults to 64, the value that used to be hard-coded here.
    """

    def __init__(self, in_channels, out_channels, norm_layer=nn.InstanceNorm2d, style_dim=64):
        super().__init__()

        self.convs = nn.Sequential(
            ConvNormReLU(in_channels, in_channels, 3, 1, 1, norm_layer),
            ConvNormReLU(in_channels, out_channels, 3, 1, 1, norm_layer)
        )
        self.style_mapping = StyleMapping(style_dim)
        self.adain = AdaIN(out_channels, style_dim)

    def forward(self, x, latent):
        """Return the upsampled, style-modulated features.

        ``x`` is the feature map to upsample; ``latent`` is the flat style vector
        (one row per sample, ``style_dim`` wide).
        """
        x = F.interpolate(x, scale_factor=2, mode='bilinear', align_corners=True)
        feat = self.convs(x)
        style = self.style_mapping(latent)
        return self.adain(feat, style)


class AE(nn.Module):
    """Convolutional auto-encoder with an AdaIN-modulated decoder.

    Encoder: three EncoderBlocks (3 -> 16 -> 32 -> 64 channels, each with a stride-2
    convolution) followed by an unpadded 8x8 convolution. For a 64x64 input this
    yields a latent of shape (batch, 64, 1, 1).

    Decoder: an 8x8 transposed convolution, then three DecoderBlocks that each
    upsample by 2 and receive the flattened latent as style input, and a final
    3x3 convolution to 3 channels followed by a sigmoid.
    """

    def __init__(self) -> None:
        super().__init__()

        self.encoder = nn.Sequential(
            EncoderBlock(3, 16),
            EncoderBlock(16, 32),
            EncoderBlock(32, 64),
            nn.Conv2d(64, 64, 8),
        )

        self.decoder1 = nn.Sequential(
            nn.ConvTranspose2d(64, 64, 8),
            nn.InstanceNorm2d(64),
            nn.ReLU(inplace=True)
        )
        self.decoder2 = DecoderBlock(64, 32)
        self.decoder3 = DecoderBlock(32, 32)
        self.decoder4 = DecoderBlock(32, 16)
        self.final = nn.Conv2d(16, 3, 3, 1, 1)

    def forward(self, x):
        """Return ``(reconstruction, latent)`` for a batch of images."""
        # Built from encode/decode so the decoding path exists in exactly one place.
        latent = self.encode(x)
        return self.decode(latent), latent

    def encode(self, x):
        """Map images to the latent feature map produced by the encoder."""
        return self.encoder(x)

    def decode(self, latent):
        """Reconstruct images from a latent feature map.

        The latent is used twice: as the spatial input of the first decoder stage
        and, flattened to (batch, 64), as the style vector of every DecoderBlock.
        """
        # reshape (not view) also accepts non-contiguous latents; it still fails
        # loudly if the latent does not hold exactly 64 values per sample.
        _latent = latent.reshape(latent.size(0), 64)
        feat = self.decoder1(latent)
        feat = self.decoder2(feat, _latent)
        feat = self.decoder3(feat, _latent)
        feat = self.decoder4(feat, _latent)
        # torch.sigmoid computes the same as F.sigmoid, which older PyTorch releases flag as deprecated.
        return torch.sigmoid(self.final(feat))
    
# model = AE()
# pred, latent = model(torch.randn(1, 3, 64, 64))

## 3. Dataset

In [None]:
from typing import Any


def make_dataset(path: str="./dataset/train-00000-of-00001-38cc4fa96c139e86.parquet", size: int=64, seed: int=42):
    """Load the face emojis from the parquet file and split them 60/20/20.

    A row is kept when its caption contains "face" and does not contain "clock".
    Each kept image is decoded from the ``image.bytes`` column, resized to
    ``size`` x ``size`` and converted to a NumPy array.

    Args:
        path: parquet file with ``text`` and ``image.bytes`` columns.
        size: side length the images are resized to.
        seed: ``random_state`` used for both shuffled splits.

    Returns:
        ``imgs_train, labels_train, imgs_val, labels_val, imgs_test, labels_test``.
    """
    imgs, labels = [], []
    df = pd.read_parquet(path, engine='fastparquet')
    captions, blobs = df['text'], df['image.bytes']
    # Walk over however many rows the file has (positionally) instead of a fixed row count.
    for i in trange(len(df)):
        caption = captions.iloc[i]
        if 'face' in caption and 'clock' not in caption:
            image_stream = io.BytesIO(blobs.iloc[i])
            imgs.append(np.array(Image.open(image_stream).resize((size, size))))
            labels.append(caption)

    # First hold out 40 %, then cut that hold-out in half: 60 % train / 20 % val / 20 % test.
    imgs_train, imgs_temp, labels_train, labels_temp = train_test_split(
        imgs, labels, test_size=0.4, shuffle=True, random_state=seed)
    imgs_val, imgs_test, labels_val, labels_test = train_test_split(
        imgs_temp, labels_temp, test_size=0.5, shuffle=True, random_state=seed)

    return imgs_train, labels_train, imgs_val, labels_val, imgs_test, labels_test


def make_dataset_face_cleaned(path: str="./dataset/imgs/", label_path: str="./dataset/face.txt", seed: int=42):
    """Load the image folder plus its caption file and split them 60/20/20.

    Args:
        path: folder of ``<id>.jpg`` images.
        label_path: text file with one ``<id>,<caption>`` entry per line.
        seed: ``random_state`` used for both shuffled splits.

    Returns:
        ``imgs_train, labels_train, imgs_val, labels_val, imgs_test, labels_test``.

    Raises:
        KeyError: if an image id has no entry in the caption file.
    """
    # Image ids are the file names up to the first dot; entries without such a stem are skipped.
    names = sorted(s.split(".")[0] for s in os.listdir(path) if s.split(".")[0] != "")
    imgs = [np.array(Image.open(os.path.join(path, s+".jpg"))) for s in names]

    labels_map = dict()
    # The context manager closes the caption file, which was previously left open.
    with open(label_path, 'r') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # tolerate blank lines (e.g. a trailing newline)
            # Split on the first comma only: the caption itself may contain commas.
            idx, text = line.split(",", 1)
            labels_map[idx] = text
    labels = [labels_map[idx] for idx in names]

    # First hold out 40 %, then cut that hold-out in half: 60 % train / 20 % val / 20 % test.
    imgs_train, imgs_temp, labels_train, labels_temp = train_test_split(
        imgs, labels, test_size=0.4, shuffle=True, random_state=seed)
    imgs_val, imgs_test, labels_val, labels_test = train_test_split(
        imgs_temp, labels_temp, test_size=0.5, shuffle=True, random_state=seed)

    return imgs_train, labels_train, imgs_val, labels_val, imgs_test, labels_test


class CutMix:
    """Paste a square patch from a randomly chosen image of ``data`` into the input.

    Args:
        data: indexable collection of source images (same height/width as the inputs).
        patch_size: side length of the pasted square, in pixels.
        p: probability of applying the augmentation.
    """

    def __init__(self, data, patch_size: int=8, p=0.5) -> None:
        self.data = data
        self.patch_size = patch_size
        self.p = p

    def __call__(self, img) -> Any:
        """Return ``img`` unchanged, or a float copy with one patch replaced."""
        if random.uniform(0, 1) > self.p:
            return img

        h, w = img.shape[:2]
        # A patch larger than the image is clamped to the image size.
        patch_h = min(self.patch_size, h)
        patch_w = min(self.patch_size, w)
        mask = np.zeros((h, w, 1), dtype='float32')

        # randint is inclusive on both ends, so the largest valid top-left corner is
        # (w - patch_w, h - patch_h). The former "- 1" kept the patch from ever reaching
        # the last row/column and raised ValueError when the patch covered the whole image.
        rdx = random.randint(0, w - patch_w)
        rdy = random.randint(0, h - patch_h)

        mask[rdy:rdy+patch_h, rdx:rdx+patch_w, :] = 1.0
        source = self.data[random.randint(0, len(self.data)-1)]

        return img * (1 - mask) + source * mask


class MixUp:
    """Blend the input with a randomly chosen image of ``data``.

    With probability ``p`` the result is ``img * (1 - a) + source * a`` where ``a``
    is drawn uniformly from ``[0, max_alpha]``; otherwise ``img`` is returned as is.
    """

    def __init__(self, data, max_alpha=1, p=0.5) -> None:
        self.data, self.max_alpha, self.p = data, max_alpha, p

    def __call__(self, img) -> Any:
        skip = random.uniform(0, 1) > self.p
        if skip:
            return img

        # Draw order (partner image first, then blend weight) matters for seeded runs.
        donor = self.data[random.randrange(len(self.data))]
        weight = random.uniform(0, self.max_alpha)
        keep = 1 - weight
        return img * keep + donor * weight
        

class Emoji(Dataset):
    """Dataset of emoji images for auto-encoder training.

    Each item is a pair of CHW float tensors scaled by 1/255: ``(input, target)``.

    Args:
        imgs: sequence of HWC image arrays.
        labels: captions aligned with ``imgs`` (stored, not returned by ``__getitem__``).
        aug: apply the albumentations pipeline (and optionally CutMix / MixUp).
        cutmix: apply CutMix after the pipeline (only when ``aug`` is on).
        mixup: apply MixUp after CutMix (only when ``aug`` is on).
        same: if True the target is the (possibly augmented) input itself;
            if False the target is the untouched original image.
    """

    def __init__(
            self, imgs, labels,
            aug: bool=True, cutmix: bool=True, mixup: bool=True,
            same: bool=True) -> None:
        super().__init__()

        self.imgs, self.labels = imgs, labels
        self.aug, self.same = aug, same
        self.apply_cutmix, self.apply_mixup = cutmix, mixup

        self.train_transforms = A.Compose([
            A.HorizontalFlip(p=0.5),
            A.VerticalFlip(p=0.3),
            A.Rotate(limit=90, p=0.3),
            A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.3),
            A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2, p=0.3),
            # Disabled augmentations, kept for reference:
            # A.GaussNoise(var_limit=(10.0, 50.0), p=0.3),
            # A.GaussianBlur(blur_limit=7, p=0.3),
            # A.Cutout(8, 4, 4, 0, p=0.3)
        ])
        # Both mixers draw their partner images from this dataset's own images.
        self.cutmix = CutMix(self.imgs, 32, 0.6)
        self.mixup = MixUp(self.imgs, 0.4, 0.3)

    @staticmethod
    def _to_tensor(array):
        """HWC array -> CHW float tensor scaled by 1/255."""
        return torch.from_numpy(array.transpose((2, 0, 1)) / 255).float()

    def __len__(self):
        return len(self.imgs)

    def __getitem__(self, index):
        original = self.imgs[index]
        if not self.aug:
            augmented = original.copy()
        else:
            augmented = self.train_transforms(image=original)['image']
            if self.apply_cutmix:
                augmented = self.cutmix(augmented)
            if self.apply_mixup:
                augmented = self.mixup(augmented)

        gt = self._to_tensor(original)
        img = self._to_tensor(augmented)
        return (img, img) if self.same else (img, gt)

## 4. Auto Encoder - Training

Abbreviations used in checkpoint / log file names:

- AutoEncoder - ae
- ResBlock - rb
- AdaIN - adain
- CutMix - cm
- MixUp - mu

### 4.1 AE(baseline)

In [None]:
from models.ae import AE
name = "ae"

seed_everything(_seed)
imgs_train, labels_train, imgs_val, labels_val, imgs_test, labels_test = make_dataset(size=64, seed=_seed)

# Training set: augmentation on, CutMix/MixUp off; same=True makes each input its own target.
emoji_train = Emoji(
    imgs_train, labels_train,
    aug=True, cutmix=False, mixup=False,
    same=True)
train_loader = DataLoader(dataset=emoji_train, batch_size=16, shuffle=True, num_workers=8)
# Validation set: un-augmented images.
emoji_val = Emoji(imgs_val, labels_val, aug=False)
val_loader = DataLoader(dataset=emoji_val, batch_size=12, shuffle=False, num_workers=8)

model = AE().cuda()
optimizer = Adam(params=model.parameters(), lr=5e-4, weight_decay=4e-5)
# Learning rate is multiplied by 0.2 at epochs 500, 700 and 900.
lr_sch = MultiStepLR(optimizer, [500, 700, 900], gamma=0.2)

n_train_steps = len(train_loader)
n_val_steps = len(val_loader)
epochs = 1000
eval_interval = 1


# Log columns: epoch / train mse / val mse.
# `with` flushes and closes the log even if training is interrupted or raises.
with open(f"./logs/log_{name}.csv", 'w') as log_file:
    for e in range(epochs):
        # Validation switches the model to eval mode, so re-enable training mode every epoch.
        model.train()
        _train_loss = 0
        with tqdm(total=n_train_steps, desc=f'Epoch {e+1}/{epochs}', unit='batch') as pbar:
            for i, (img, gt) in enumerate(train_loader):
                img, gt = img.cuda(), gt.cuda()
                pred, z = model(img)
                loss = F.mse_loss(pred, gt)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                _train_loss += float(loss)

                pbar.set_description(f'Epoch {e+1}/{epochs}, Iter {i+1}/{n_train_steps} - loss: {float(loss):.4f}')
                pbar.update(1)

            # Replace the last per-batch loss with the epoch average.
            pbar.set_description(f'Epoch {e+1}/{epochs}, Iter {i+1}/{n_train_steps} - loss: {float(_train_loss / n_train_steps):.4f}')

        _val_loss = 0
        if (e+1) % eval_interval == 0:
            model.eval()
            with tqdm(total=n_val_steps, desc=f'Epoch (Val) {e+1}/{epochs}', unit='batch') as pbar:
                with torch.no_grad():
                    for i, (img, gt) in enumerate(val_loader):
                        img, gt = img.cuda(), gt.cuda()
                        pred, latent = model(img)
                        loss = F.mse_loss(pred, gt)

                        _val_loss += float(loss)

                        pbar.set_description(f'Epoch (Val) {e+1}/{epochs}, Iter {i+1}/{n_val_steps} - val loss: {float(loss):.4f}')
                        pbar.update(1)

                pbar.set_description(f'Epoch (Val) {e+1}/{epochs}, Iter {i+1}/{n_val_steps} - val loss: {float(_val_loss / n_val_steps):.4f}')

        # On epochs where validation is skipped the val column is written as 0.0.
        log_file.write(f"{e+1},{_train_loss/n_train_steps},{_val_loss/n_val_steps}\n")
        lr_sch.step()

torch.save(model.state_dict(), f"./checkpoints/{name}_{epochs}.pt")

### 4.2 AE(baseline) + ResBlock

In [None]:
from models.ae_rb import AE
name = "ae_rb"

seed_everything(_seed)
imgs_train, labels_train, imgs_val, labels_val, imgs_test, labels_test = make_dataset(size=64, seed=_seed)

# Training set: augmentation on, CutMix/MixUp off; same=True makes each input its own target.
emoji_train = Emoji(
    imgs_train, labels_train,
    aug=True, cutmix=False, mixup=False,
    same=True)
train_loader = DataLoader(dataset=emoji_train, batch_size=16, shuffle=True, num_workers=8)
# Validation set: un-augmented images.
emoji_val = Emoji(imgs_val, labels_val, aug=False)
val_loader = DataLoader(dataset=emoji_val, batch_size=12, shuffle=False, num_workers=8)

model = AE().cuda()
optimizer = Adam(params=model.parameters(), lr=5e-4, weight_decay=4e-5)
# Learning rate is multiplied by 0.2 at epochs 500, 700 and 900.
lr_sch = MultiStepLR(optimizer, [500, 700, 900], gamma=0.2)

n_train_steps = len(train_loader)
n_val_steps = len(val_loader)
epochs = 1000
eval_interval = 1


# Log columns: epoch / train mse / val mse.
# `with` flushes and closes the log even if training is interrupted or raises.
with open(f"./logs/log_{name}.csv", 'w') as log_file:
    for e in range(epochs):
        # Validation switches the model to eval mode, so re-enable training mode every epoch.
        model.train()
        _train_loss = 0
        with tqdm(total=n_train_steps, desc=f'Epoch {e+1}/{epochs}', unit='batch') as pbar:
            for i, (img, gt) in enumerate(train_loader):
                img, gt = img.cuda(), gt.cuda()
                pred, z = model(img)
                loss = F.mse_loss(pred, gt)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                _train_loss += float(loss)

                pbar.set_description(f'Epoch {e+1}/{epochs}, Iter {i+1}/{n_train_steps} - loss: {float(loss):.4f}')
                pbar.update(1)

            # Replace the last per-batch loss with the epoch average.
            pbar.set_description(f'Epoch {e+1}/{epochs}, Iter {i+1}/{n_train_steps} - loss: {float(_train_loss / n_train_steps):.4f}')

        _val_loss = 0
        if (e+1) % eval_interval == 0:
            model.eval()
            with tqdm(total=n_val_steps, desc=f'Epoch (Val) {e+1}/{epochs}', unit='batch') as pbar:
                with torch.no_grad():
                    for i, (img, gt) in enumerate(val_loader):
                        img, gt = img.cuda(), gt.cuda()
                        pred, latent = model(img)
                        loss = F.mse_loss(pred, gt)

                        _val_loss += float(loss)

                        pbar.set_description(f'Epoch (Val) {e+1}/{epochs}, Iter {i+1}/{n_val_steps} - val loss: {float(loss):.4f}')
                        pbar.update(1)

                pbar.set_description(f'Epoch (Val) {e+1}/{epochs}, Iter {i+1}/{n_val_steps} - val loss: {float(_val_loss / n_val_steps):.4f}')

        # On epochs where validation is skipped the val column is written as 0.0.
        log_file.write(f"{e+1},{_train_loss/n_train_steps},{_val_loss/n_val_steps}\n")
        lr_sch.step()

torch.save(model.state_dict(), f"./checkpoints/{name}_{epochs}.pt")

### 4.3 AE(baseline) + ResBlock + CutMix + MixUp

In [None]:
from models.ae_rb import AE
name = "ae_rb_cm_mu"

seed_everything(_seed)
imgs_train, labels_train, imgs_val, labels_val, imgs_test, labels_test = make_dataset(size=64, seed=_seed)

# Training set: augmentation plus CutMix and MixUp; same=True makes each input its own target.
emoji_train = Emoji(
    imgs_train, labels_train,
    aug=True, cutmix=True, mixup=True,
    same=True)
train_loader = DataLoader(dataset=emoji_train, batch_size=16, shuffle=True, num_workers=8)
# Validation set: un-augmented images.
emoji_val = Emoji(imgs_val, labels_val, aug=False)
val_loader = DataLoader(dataset=emoji_val, batch_size=12, shuffle=False, num_workers=8)

model = AE().cuda()
optimizer = Adam(params=model.parameters(), lr=5e-4, weight_decay=4e-5)
# Learning rate is multiplied by 0.2 at epochs 500, 700 and 900.
lr_sch = MultiStepLR(optimizer, [500, 700, 900], gamma=0.2)

n_train_steps = len(train_loader)
n_val_steps = len(val_loader)
epochs = 1000
eval_interval = 1


# Log columns: epoch / train mse / val mse.
# `with` flushes and closes the log even if training is interrupted or raises.
with open(f"./logs/log_{name}.csv", 'w') as log_file:
    for e in range(epochs):
        # Validation switches the model to eval mode, so re-enable training mode every epoch.
        model.train()
        _train_loss = 0
        with tqdm(total=n_train_steps, desc=f'Epoch {e+1}/{epochs}', unit='batch') as pbar:
            for i, (img, gt) in enumerate(train_loader):
                img, gt = img.cuda(), gt.cuda()
                pred, z = model(img)
                loss = F.mse_loss(pred, gt)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                _train_loss += float(loss)

                pbar.set_description(f'Epoch {e+1}/{epochs}, Iter {i+1}/{n_train_steps} - loss: {float(loss):.4f}')
                pbar.update(1)

            # Replace the last per-batch loss with the epoch average.
            pbar.set_description(f'Epoch {e+1}/{epochs}, Iter {i+1}/{n_train_steps} - loss: {float(_train_loss / n_train_steps):.4f}')

        _val_loss = 0
        if (e+1) % eval_interval == 0:
            model.eval()
            with tqdm(total=n_val_steps, desc=f'Epoch (Val) {e+1}/{epochs}', unit='batch') as pbar:
                with torch.no_grad():
                    for i, (img, gt) in enumerate(val_loader):
                        img, gt = img.cuda(), gt.cuda()
                        pred, latent = model(img)
                        loss = F.mse_loss(pred, gt)

                        _val_loss += float(loss)

                        pbar.set_description(f'Epoch (Val) {e+1}/{epochs}, Iter {i+1}/{n_val_steps} - val loss: {float(loss):.4f}')
                        pbar.update(1)

                pbar.set_description(f'Epoch (Val) {e+1}/{epochs}, Iter {i+1}/{n_val_steps} - val loss: {float(_val_loss / n_val_steps):.4f}')

        # On epochs where validation is skipped the val column is written as 0.0.
        log_file.write(f"{e+1},{_train_loss/n_train_steps},{_val_loss/n_val_steps}\n")
        lr_sch.step()

torch.save(model.state_dict(), f"./checkpoints/{name}_{epochs}.pt")

### 4.4 AE(baseline) + ResBlock + AdaIN

In [None]:
from models.ae_rb_adain import AE
name = "ae_rb_adain"


seed_everything(_seed)
imgs_train, labels_train, imgs_val, labels_val, imgs_test, labels_test = make_dataset(size=64, seed=_seed)

# Training set: augmentation on, CutMix/MixUp off; same=True makes each input its own target.
emoji_train = Emoji(
    imgs_train, labels_train,
    aug=True, cutmix=False, mixup=False,
    same=True)
train_loader = DataLoader(dataset=emoji_train, batch_size=16, shuffle=True, num_workers=8)
# Validation set: un-augmented images.
emoji_val = Emoji(imgs_val, labels_val, aug=False)
val_loader = DataLoader(dataset=emoji_val, batch_size=12, shuffle=False, num_workers=8)

model = AE().cuda()
optimizer = Adam(params=model.parameters(), lr=5e-4, weight_decay=4e-5)
# Learning rate is multiplied by 0.2 at epochs 500, 700 and 900.
lr_sch = MultiStepLR(optimizer, [500, 700, 900], gamma=0.2)

n_train_steps = len(train_loader)
n_val_steps = len(val_loader)
epochs = 1000
eval_interval = 1


# Log columns: epoch / train mse / val mse.
# `with` flushes and closes the log even if training is interrupted or raises.
with open(f"./logs/log_{name}.csv", 'w') as log_file:
    for e in range(epochs):
        # Validation switches the model to eval mode, so re-enable training mode every epoch.
        model.train()
        _train_loss = 0
        with tqdm(total=n_train_steps, desc=f'Epoch {e+1}/{epochs}', unit='batch') as pbar:
            for i, (img, gt) in enumerate(train_loader):
                img, gt = img.cuda(), gt.cuda()
                pred, z = model(img)
                loss = F.mse_loss(pred, gt)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                _train_loss += float(loss)

                pbar.set_description(f'Epoch {e+1}/{epochs}, Iter {i+1}/{n_train_steps} - loss: {float(loss):.4f}')
                pbar.update(1)

            # Replace the last per-batch loss with the epoch average.
            pbar.set_description(f'Epoch {e+1}/{epochs}, Iter {i+1}/{n_train_steps} - loss: {float(_train_loss / n_train_steps):.4f}')

        _val_loss = 0
        if (e+1) % eval_interval == 0:
            model.eval()
            with tqdm(total=n_val_steps, desc=f'Epoch (Val) {e+1}/{epochs}', unit='batch') as pbar:
                with torch.no_grad():
                    for i, (img, gt) in enumerate(val_loader):
                        img, gt = img.cuda(), gt.cuda()
                        pred, latent = model(img)
                        loss = F.mse_loss(pred, gt)

                        _val_loss += float(loss)

                        pbar.set_description(f'Epoch (Val) {e+1}/{epochs}, Iter {i+1}/{n_val_steps} - val loss: {float(loss):.4f}')
                        pbar.update(1)

                pbar.set_description(f'Epoch (Val) {e+1}/{epochs}, Iter {i+1}/{n_val_steps} - val loss: {float(_val_loss / n_val_steps):.4f}')

        # On epochs where validation is skipped the val column is written as 0.0.
        log_file.write(f"{e+1},{_train_loss/n_train_steps},{_val_loss/n_val_steps}\n")
        lr_sch.step()

torch.save(model.state_dict(), f"./checkpoints/{name}_{epochs}.pt")

### 4.5 AE(baseline) + ResBlock + AdaIN + CutMix

In [None]:
from models.ae_rb_adain import AE
name = "ae_rb_adain_cm"

seed_everything(_seed)
imgs_train, labels_train, imgs_val, labels_val, imgs_test, labels_test = make_dataset(size=64, seed=_seed)

# Training set: augmentation plus CutMix (no MixUp); same=True makes each input its own target.
emoji_train = Emoji(
    imgs_train, labels_train,
    aug=True, cutmix=True, mixup=False,
    same=True)
train_loader = DataLoader(dataset=emoji_train, batch_size=16, shuffle=True, num_workers=8)
# Validation set: un-augmented images.
emoji_val = Emoji(imgs_val, labels_val, aug=False)
val_loader = DataLoader(dataset=emoji_val, batch_size=12, shuffle=False, num_workers=8)

model = AE().cuda()
optimizer = Adam(params=model.parameters(), lr=5e-4, weight_decay=4e-5)
# Learning rate is multiplied by 0.2 at epochs 500, 700 and 900.
lr_sch = MultiStepLR(optimizer, [500, 700, 900], gamma=0.2)

n_train_steps = len(train_loader)
n_val_steps = len(val_loader)
epochs = 1000
eval_interval = 1


# Log columns: epoch / train mse / val mse.
# `with` flushes and closes the log even if training is interrupted or raises.
with open(f"./logs/log_{name}.csv", 'w') as log_file:
    for e in range(epochs):
        # Validation switches the model to eval mode, so re-enable training mode every epoch.
        model.train()
        _train_loss = 0
        with tqdm(total=n_train_steps, desc=f'Epoch {e+1}/{epochs}', unit='batch') as pbar:
            for i, (img, gt) in enumerate(train_loader):
                img, gt = img.cuda(), gt.cuda()
                pred, z = model(img)
                loss = F.mse_loss(pred, gt)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                _train_loss += float(loss)

                pbar.set_description(f'Epoch {e+1}/{epochs}, Iter {i+1}/{n_train_steps} - loss: {float(loss):.4f}')
                pbar.update(1)

            # Replace the last per-batch loss with the epoch average.
            pbar.set_description(f'Epoch {e+1}/{epochs}, Iter {i+1}/{n_train_steps} - loss: {float(_train_loss / n_train_steps):.4f}')

        _val_loss = 0
        if (e+1) % eval_interval == 0:
            model.eval()
            with tqdm(total=n_val_steps, desc=f'Epoch (Val) {e+1}/{epochs}', unit='batch') as pbar:
                with torch.no_grad():
                    for i, (img, gt) in enumerate(val_loader):
                        img, gt = img.cuda(), gt.cuda()
                        pred, latent = model(img)
                        loss = F.mse_loss(pred, gt)

                        _val_loss += float(loss)

                        pbar.set_description(f'Epoch (Val) {e+1}/{epochs}, Iter {i+1}/{n_val_steps} - val loss: {float(loss):.4f}')
                        pbar.update(1)

                pbar.set_description(f'Epoch (Val) {e+1}/{epochs}, Iter {i+1}/{n_val_steps} - val loss: {float(_val_loss / n_val_steps):.4f}')

        # On epochs where validation is skipped the val column is written as 0.0.
        log_file.write(f"{e+1},{_train_loss/n_train_steps},{_val_loss/n_val_steps}\n")
        lr_sch.step()

torch.save(model.state_dict(), f"./checkpoints/{name}_{epochs}.pt")

### 4.6 AE(baseline) + ResBlock + AdaIN + CutMix + MixUp

In [None]:
from models.ae_rb_adain import AE
name = "ae_rb_adain_cm_mu"

seed_everything(_seed)
imgs_train, labels_train, imgs_val, labels_val, imgs_test, labels_test = make_dataset(size=64, seed=_seed)

# Training set: augmentation plus CutMix and MixUp; same=True makes each input its own target.
emoji_train = Emoji(
    imgs_train, labels_train,
    aug=True, cutmix=True, mixup=True,
    same=True)
train_loader = DataLoader(dataset=emoji_train, batch_size=16, shuffle=True, num_workers=8)
# Validation set: un-augmented images.
emoji_val = Emoji(imgs_val, labels_val, aug=False)
val_loader = DataLoader(dataset=emoji_val, batch_size=12, shuffle=False, num_workers=8)

model = AE().cuda()
optimizer = Adam(params=model.parameters(), lr=5e-4, weight_decay=4e-5)
# Learning rate is multiplied by 0.2 at epochs 500, 700 and 900.
lr_sch = MultiStepLR(optimizer, [500, 700, 900], gamma=0.2)

n_train_steps = len(train_loader)
n_val_steps = len(val_loader)
epochs = 1000
eval_interval = 1


# Log columns: epoch / train mse / val mse.
# `with` flushes and closes the log even if training is interrupted or raises.
with open(f"./logs/log_{name}.csv", 'w') as log_file:
    for e in range(epochs):
        # Validation switches the model to eval mode, so re-enable training mode every epoch.
        model.train()
        _train_loss = 0
        with tqdm(total=n_train_steps, desc=f'Epoch {e+1}/{epochs}', unit='batch') as pbar:
            for i, (img, gt) in enumerate(train_loader):
                img, gt = img.cuda(), gt.cuda()
                pred, z = model(img)
                loss = F.mse_loss(pred, gt)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                _train_loss += float(loss)

                pbar.set_description(f'Epoch {e+1}/{epochs}, Iter {i+1}/{n_train_steps} - loss: {float(loss):.4f}')
                pbar.update(1)

            # Replace the last per-batch loss with the epoch average.
            pbar.set_description(f'Epoch {e+1}/{epochs}, Iter {i+1}/{n_train_steps} - loss: {float(_train_loss / n_train_steps):.4f}')

        _val_loss = 0
        if (e+1) % eval_interval == 0:
            model.eval()
            with tqdm(total=n_val_steps, desc=f'Epoch (Val) {e+1}/{epochs}', unit='batch') as pbar:
                with torch.no_grad():
                    for i, (img, gt) in enumerate(val_loader):
                        img, gt = img.cuda(), gt.cuda()
                        pred, latent = model(img)
                        loss = F.mse_loss(pred, gt)

                        _val_loss += float(loss)

                        pbar.set_description(f'Epoch (Val) {e+1}/{epochs}, Iter {i+1}/{n_val_steps} - val loss: {float(loss):.4f}')
                        pbar.update(1)

                pbar.set_description(f'Epoch (Val) {e+1}/{epochs}, Iter {i+1}/{n_val_steps} - val loss: {float(_val_loss / n_val_steps):.4f}')

        # On epochs where validation is skipped the val column is written as 0.0.
        log_file.write(f"{e+1},{_train_loss/n_train_steps},{_val_loss/n_val_steps}\n")
        lr_sch.step()

torch.save(model.state_dict(), f"./checkpoints/{name}_{epochs}.pt")

In [None]:
# Learning curves of the final model.
# Each log row is "epoch,train_mse,val_mse" (the order the training loop writes), so the
# columns are unpacked in that order; reading them swapped would mislabel both curves.
val_mses, train_mses = [], []
with open("./logs/log_ae_rb_adain_cm_mu.csv", 'r') as logf:
    for line in logf:
        _e, _train_mse, _val_mse = line.strip().split(",")
        train_mses.append(float(_train_mse))
        val_mses.append(float(_val_mse))

plt.plot(train_mses, label='train-mse')
plt.plot(val_mses, label='val-mse')
plt.ylim(0, 0.05)
plt.legend()
plt.xlabel("Epoch")
plt.ylabel("MSE")

## 5. Evaluation and Testing

In [None]:
def to_rgb(pred):
    """Turn the first sample of a CHW batch into an HWC integer array scaled by 255."""
    first = pred[0].detach().cpu().numpy()
    hwc = np.transpose(first * 255, (1, 2, 0))
    # astype truncates toward zero, e.g. 127.5 -> 127.
    return hwc.astype('int')

def plt_show_grid_imgs(imgs, ncol, nrow, scale=5, path=None):
    """Show ``imgs`` in an ``nrow`` x ``ncol`` grid without gaps or axes.

    Args:
        imgs: images to draw, filled row by row; exactly ``ncol * nrow`` of them.
        ncol, nrow: grid dimensions.
        scale: size of each cell in inches.
        path: if given, the figure is also saved there.

    Raises:
        AssertionError: if ``len(imgs) != ncol * nrow``.
    """
    assert len(imgs) == ncol * nrow
    # squeeze=False always returns a 2-D array of Axes, so flatten() also works
    # for a 1x1 grid (where subplots would otherwise return a bare Axes object).
    fig, axs = plt.subplots(
        nrow, ncol, figsize=(ncol*scale, nrow*scale),
        gridspec_kw={'wspace': 0, 'hspace': 0}, squeeze=False)
    axs = axs.flatten()

    for img, ax in zip(imgs, axs):
        ax.imshow(img, cmap='gray')
        ax.axis('off')

    # Save before show(): the original order is kept so the written file contains the drawn figure.
    if path is not None:
        plt.savefig(path, bbox_inches='tight')

    plt.show()
    
def plt_save_show_grid_results(data, ncol, nrow, scale=5, path=None):
    """Plot every image of ``data`` next to its reconstruction by the global ``model``.

    Each image contributes two tiles (input, reconstruction), so ``data`` must hold
    exactly ``ncol * nrow / 2`` images. ``scale`` and ``path`` are forwarded to
    ``plt_show_grid_imgs``.
    """
    # Run on whatever device the model lives on instead of assuming CUDA.
    device = next(model.parameters()).device
    data_vis = []
    # Inference only: skip building autograd graphs for every image.
    with torch.no_grad():
        for im in data:
            img = torch.from_numpy(im.transpose((2, 0, 1))[np.newaxis, :, :, :] / 255).float().to(device)
            pred, latent = model(img)
            data_vis.append(im)
            data_vis.append(to_rgb(pred))

    plt_show_grid_imgs(data_vis, ncol, nrow, scale, path)
    
def plot_learning_curves(log_file: str):
    """Plot train and validation MSE per epoch from a training log.

    Args:
        log_file: CSV written by the training cells, one ``epoch,train_mse,val_mse``
            row per epoch.
    """
    val_mses, train_mses = [], []
    # The context manager closes the log, which was previously left open.
    with open(log_file, 'r') as logf:
        for line in logf:
            # Column order matches what the training loop writes: epoch, train, val.
            # Unpacking them the other way round mislabels both curves.
            _e, _train_mse, _val_mse = line.strip().split(",")
            train_mses.append(float(_train_mse))
            val_mses.append(float(_val_mse))

    plt.plot(train_mses, label='train-mse')
    plt.plot(val_mses, label='val-mse')
    plt.ylim(0, 0.05)
    plt.legend()
    plt.xlabel("Epoch")
    plt.ylabel("MSE")

In [None]:
# Rebuild the train/val/test split with the same size and seed as the training cells.
imgs_train, labels_train, imgs_val, labels_val, imgs_test, labels_test = make_dataset(size=64, seed=_seed)

### 5.1 AE(baseline) + ResBlock + AdaIN + CutMix + MixUp

In [None]:
from models.ae_rb_adain import AE

# Restore the weights saved after 1000 epochs and switch the model to eval mode.
name = "ae_rb_adain_cm_mu"
checkpoint_path = f"./checkpoints/{name}_1000.pt"
model = AE().cuda()
model.load_state_dict(torch.load(checkpoint_path))
model.eval()

#### 5.1.1 Visualization of Reconstruction

In [None]:
# Test split: 8 x 9 grid = 72 tiles, i.e. 36 (input, reconstruction) pairs; the grid assertion requires exactly 36 images.
plt_save_show_grid_results(imgs_test, 8, 9, scale=1, path=f"./images/{name}_vis_test.jpg")

In [None]:
# Validation split: 8 x 9 grid = 72 tiles, i.e. 36 (input, reconstruction) pairs; the grid assertion requires exactly 36 images.
plt_save_show_grid_results(imgs_val, 8, 9, scale=1, path=f"./images/{name}_vis_val.jpg")

In [None]:
# Training split: 18 x 12 grid = 216 tiles, i.e. 108 (input, reconstruction) pairs; the grid assertion requires exactly 108 images.
plt_save_show_grid_results(imgs_train, 18, 12, scale=1, path=f"./images/{name}_vis_train_org.jpg")

#### 5.1.2 Latent Space Linear Interpolation

In [None]:
data = imgs_test
idx1, idx2 = 34, 8

with torch.no_grad():
    # HWC image -> 1 x C x H x W float tensor scaled by 1/255, on the GPU.
    img1, img2 = (
        torch.from_numpy(data[k].transpose((2, 0, 1))[np.newaxis, :, :, :] / 255).float().cuda()
        for k in (idx1, idx2)
    )

    l1 = model.encode(img1)
    l2 = model.encode(img2)

    # Eleven blends in steps of 0.1, from pure l1 (s=0) to pure l2 (s=10), shown in one row.
    frames = [
        to_rgb(model.decode(l1*(1-s*0.1) + l2*s*0.1)) for s in range(0, 11)
    ]
    plt_show_grid_imgs(frames, 11, 1)