# Import Packages

In [None]:
import torch
import torchvision
from torch import nn
from torch.autograd import Variable
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
from torchvision import transforms
from torchvision.utils import save_image
from torchvision.datasets import MNIST
import os

from functionalities import filemanager as fm
from functionalities import dataloader as dl
from tqdm import tqdm_notebook as tqdm
from functionalities import loss as lo
from functionalities import plot as pl

from architecture import CelebA_autoencoder as celeba

# Pretraining Setup

In [None]:




if not os.path.exists('./celeb_img'):
    os.mkdir('./celeb_img')

number_dev = 3
if torch.cuda.is_available():
    torch.cuda.set_device(number_dev)
    device = 'cuda'
else:
    device = 'cpu'
    
print(device)

def to_img(x):
    #x = 0.5 * (x + 1)
    x = x.clamp(0, 1)
    x = x.view(x.size(0), 3, 218, 178)
    return x


num_epochs = 10
batch_size = 32
learning_rate = 1e-3
image_size = 178
milestones = [8, 9, 10]

IMAGE_PATH = './img_align_celeba/'
transform = transforms.Compose([
    #transforms.Scale(image_size),
    #transforms.Resize(image_size),
    transforms.ToTensor(),
    #transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
dataset = ImageFolder(IMAGE_PATH, transform)
trainloader, testloader = dl.split_dataset(dataset, 0.2, batch_size, False)
#data_loader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True, num_workers=8, drop_last=True)

#trainset, testset, classes = dl.load_cifar()
#trainloader, validloader, testloader = dl.make_dataloaders(trainset, testset, batch_size)


#img_transform = transforms.Compose([
#    transforms.ToTensor(),
#    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
#])

#dataset = MNIST('./datasets/mnist', transform=img_transform)
#dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Training

In [None]:



bottleneck_train_log = []
bottleneck_test_log = []    

for bottleneck in [2 ** x for x in range(0, 14, 2)]: #[2 ** x for x in range(0, 15, 2)]:
    print('bottleneck dimension: {}'.format(bottleneck))
    model = celeba.celeba_autoencoder(bottleneck).cuda()
    #criterion = nn.MSELoss() # l2 loss
    #criterion = nn.L1Loss() # l1 loss
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-5)
    scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=milestones, gamma=0.1)
    
    train_loss_log = []
    test_loss_log = []

    for epoch in range(num_epochs):
        scheduler.step()
        for data in tqdm(trainloader):
            img, _ = data
            img = Variable(img).cuda()
            # ===================forward=====================
            output = model(img)
            loss = lo.l1_loss(output, img)
            # ===================backward====================
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        # ===================log========================
        train_loss_log.append(loss.data.item())
        print('epoch [{}/{}], loss:{:.4f}'.format(epoch+1, num_epochs, loss.data.item()))
        if epoch % 2 == 0 or epoch == (num_epochs - 1):
            pic = to_img(output.cpu().data)
            save_image(pic, './celeb_img/l1_lin_image_{}_{}.png'.format(bottleneck, epoch))
            
        with torch.no_grad():
            test_loss = 0
            for data in tqdm(testloader):
                img, _ = data
                img = Variable(img).cuda()
                output = model(img)
                test_loss += lo.l1_loss(output, img).data.item()
            
            test_loss /= len(testloader)
            test_loss_log.append(test_loss)
            print('test loss:{:.4f}'.format(test_loss))
    
            
    bottleneck_train_log.append(train_loss_log[-1])
    bottleneck_test_log.append(test_loss_log[-1])    

    fm.save_model(model, 'l1_lin_celebA_autoencoder_{}'.format(bottleneck))
    fm.save_weight(model, 'l1_lin_celebA_autoencoder_{}'.format(bottleneck))
    fm.save_variable([train_loss_log, test_loss_log], 'l1_lin_celebA_autoencoder_{}'.format(bottleneck))
    
fm.save_variable([bottleneck_train_log, bottleneck_test_log], 'l1_lin_celebA_autoencoder_bottleneck')

# Plot Reconstruction and Difference Images Examples

In [None]:
from torch.autograd import Variable
import torchvision
import matplotlib.pyplot as plt

num_img = 25
grid_row_size = 5

img, label = next(iter(testloader))
#img = img.view(img.size(0), -1)
img = Variable(img).cuda()

for i in [2 ** x for x in range(0, 14, 2)]:
    print('bottleneck dimension: {}'.format(i))
    model = fm.load_model('l1_lin_celebA_autoencoder_{}'.format(i)).to(device)
    output = model(img.to(device))

    original = to_img(img.cpu().data) 
    pic = to_img(output.cpu().data)

    print("Original Image:")
    pl.imshow(torchvision.utils.make_grid(original[:num_img].detach(), grid_row_size), filename="com_classic_celeba_{}_original".format(i))
    print("Reconstructed Image:")
    pl.imshow(torchvision.utils.make_grid(pic[:num_img].detach(), grid_row_size), filename="com_classic_celeba_{}_reconstructed".format(i))
    print("Difference:")
    diff_img = (original - pic + 1) / 2
    pl.imshow(torchvision.utils.make_grid(diff_img[:num_img].detach(), grid_row_size), filename="com_classic_celeba_{}_difference".format(i))

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from functionalities import plot as pl

def imshow(img, figsize=(30, 30), filename=None):
    """
    Custom modified imshow function.

    :param img: image to plot
    :param figsize: the size of the generated plot
    :param filename: file name under which the plot will be saved. (optional)
    :return: None
    """
    img = torch.clamp(img, 0, 1)
    npimg = img.numpy()
    plt.figsize = figsize
    plt.imshow(np.transpose(npimg, (1, 2, 0)))

    if filename is not None:
        subdir = "./plot"
        if not os.path.exists(subdir):
            os.makedirs(subdir)

        plt.savefig(os.path.join(subdir, filename + ".png"))

    plt.show()

In [None]:
from torchvision import transforms
from torchvision.datasets import ImageFolder
from functionalities import dataloader as dl
from functionalities import filemanager as fm

num_epochs = 20
batch_size = 128
learning_rate = 1e-3
image_size = 178

IMAGE_PATH = './img_align_celeba/'
transform = transforms.Compose([
    #transforms.Scale(image_size),
    #transforms.Resize(image_size),
    transforms.ToTensor(),
    #transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
dataset = ImageFolder(IMAGE_PATH, transform)
trainloader, testloader = dl.split_dataset(dataset, 0.2, batch_size, False)

In [None]:
from torch import nn

class autoencoder(nn.Module):
    def __init__(self, bottleneck=500): # input: 3, 218, 178 (original: 3, 218, 178)
        super(autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 128, 4, stride=2, padding=0),  # b, 128, 108, 88 
            nn.ReLU(True),
            nn.MaxPool2d(2, stride=2, padding=1),  # b, 128, 54, 44
            nn.Conv2d(128, 128*2, 4, stride=2, padding=(0,1)),  # b, 256, 26, 22
            nn.ReLU(True),
            nn.Conv2d(128*2, 128*4, 4, stride=2, padding=0), # b, 512, 12, 10
            nn.ReLU(True),
            nn.Conv2d(128*4, 128*8, 4, stride=2, padding=0), # b, 1024, 5, 4
            nn.ReLU(True), 
            nn.Conv2d(128*8, bottleneck, 4, stride=1, padding=1) # b, bottleneck, 4, 3
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(bottleneck, 512, 5, stride=2),  # b, 512, 11, 9
            nn.ReLU(True),
            nn.ConvTranspose2d(512, 256, 5, stride=2),  # b, 256, 25, 21
            nn.ReLU(True),
            nn.ConvTranspose2d(256, 128, 5, stride=2, padding=(0,1)),  # b, 128, 53, 43
            nn.ReLU(True), 
            nn.ConvTranspose2d(128, 64, 4, stride=2), # b, 64, 108, 88
            nn.ReLU(True), 
            nn.ConvTranspose2d(64, 3, 4, stride=2), # b, 3, 218, 178
            nn.Tanh()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [None]:
def to_img(x):
    #x = 0.5 * (x + 1)
    x = x.clamp(0, 1)
    x = x.view(x.size(0), 3, 218, 178)
    return x

In [None]:
import torch

number_dev = 2
if torch.cuda.is_available():
    torch.cuda.set_device(number_dev)
    device = 'cuda'
else:
    device = 'cpu'

In [None]:
from torch.autograd import Variable
import torchvision

num_img = 100
grid_row_size = 10

img, label = next(iter(testloader))
#img = img.view(img.size(0), -1)
img = Variable(img).cuda()

for i in [2 ** x for x in range(14)]:
    print('bottleneck dimension: {}'.format(i))
    model = fm.load_model('l1_lin_celebA_autoencoder_{}'.format(i)).to(device)
    output = model(img.to(device))

    original = to_img(img.cpu().data) 
    pic = to_img(output.cpu().data)

    print("Original Image:")
    imshow(torchvision.utils.make_grid(original[:num_img].detach(), grid_row_size))
    print("Reconstructed Image:")
    imshow(torchvision.utils.make_grid(pic[:num_img].detach(), grid_row_size))
    print("Difference:")
    diff_img = (original - pic + 1) / 2
    imshow(torchvision.utils.make_grid(diff_img[:num_img].detach(), grid_row_size))

In [None]:
train, test = fm.load_variable("l1_lin_celebA_autoencoder_bottleneck")
y = [train, test]

pl.plot([2 ** x for x in range(14)], y, 'latent dimension', 'loss', ['train', 'test'], 'Train & Test Reconstruction Loss History', 'loss_l1_celebA_bottleneck') 

# Plot Recontruction Loss against Bottleneck Size

In [None]:
train, test = fm.load_variable("l1_lin_celebA_autoencoder_bottleneck")
y = [train, test]

pl.plot([2 ** x for x in range(7)], y, 'latent dimension', 'loss', ['train', 'test'], 'Train & Test Reconstruction Loss History', 'loss_l1_celebA_bottleneck') 