# Image-to-Image Translation : *pix2pix*

## Download scripts & datasets

In [None]:
# download facades dataset
!mkdir ./dataset
!wget -N http://efrosgans.eecs.berkeley.edu/pix2pix/datasets/facades.tar.gz -O ./datasets/facades.tar.gz
!mkdir -p ./datasets/facades/
!tar -zxvf ./datasets/facades.tar.gz -C ./datasets/


### Prepare DataLoader for MNIST dataset

In [None]:
import torch
from torch.utils.data import DataLoader
from torchvision import transforms

# fix manual seed.
torch.manual_seed(1234)

# set batch size.
BATCH_SIZE = 8
NUM_EXAMPLES = 8

In [None]:
import os
from torch.utils.data import Dataset
import PIL

# Custom Dataset 
class CustomFacades(Dataset):
    def __init__(self, root, train=True, transform=None):
        self.root = root
        self.transform = transform       
        self.train = train
        self.dir = 'train' if train else 'test'
        self.len_train = 400
        self.len_test = 106

    def __len__(self):
        if self.train:
            return self.len_train
        else:
            return self.len_test

    def __getitem__(self, idx):
        data_idx = idx if self.train else idx + self.len_train
        X = PIL.Image.open(os.path.join(self.root, self.dir, '{}.jpg'.format(idx+1)))
        if self.transform is not None:
            X = self.transform(X)
        return X[..., 256:], X[..., :256]

In [None]:
# prepare dataloader.
tf = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,0.5,0.5), (0.5,0.5,0.5))
    ])

# train loader for training GAN
train_dataset = CustomFacades(root='./datasets/facades', train=True, 
        transform=tf)
train_loader = DataLoader(dataset=train_dataset, num_workers=8, batch_size=BATCH_SIZE, shuffle=True)

# test loader for examining test samples
test_dataset = CustomFacades(root='./datasets/facades', train=False, transform=tf)
test_loader = DataLoader(dataset=test_dataset, num_workers=8, batch_size=BATCH_SIZE, shuffle=False)

### Define GANs Models

#### Define Generator

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        # borrowed from : https://github.com/znxlwm/pytorch-pix2pix/blob/master/network.py
        d = 64
        # Unet Encoder
        self.conv1 = nn.Conv2d(3, d, 4, 2, 1)
        self.conv2 = nn.Conv2d(d, d * 2, 4, 2, 1)
        self.conv2_bn = nn.BatchNorm2d(d * 2)
        self.conv3 = nn.Conv2d(d * 2, d * 4, 4, 2, 1)
        self.conv3_bn = nn.BatchNorm2d(d * 4)
        self.conv4 = nn.Conv2d(d * 4, d * 8, 4, 2, 1)
        self.conv4_bn = nn.BatchNorm2d(d * 8)
        self.conv5 = nn.Conv2d(d * 8, d * 8, 4, 2, 1)
        self.conv5_bn = nn.BatchNorm2d(d * 8)
        self.conv6 = nn.Conv2d(d * 8, d * 8, 4, 2, 1)
        self.conv6_bn = nn.BatchNorm2d(d * 8)
        self.conv7 = nn.Conv2d(d * 8, d * 8, 4, 2, 1)
        self.conv7_bn = nn.BatchNorm2d(d * 8)
        self.conv8 = nn.Conv2d(d * 8, d * 8, 4, 2, 1)

        # Unet Decoder
        self.deconv1 = nn.ConvTranspose2d(d * 8, d * 8, 4, 2, 1)
        self.deconv1_bn = nn.BatchNorm2d(d * 8)
        self.deconv2 = nn.ConvTranspose2d(d * 8 * 2, d * 8, 4, 2, 1)
        self.deconv2_bn = nn.BatchNorm2d(d * 8)
        self.deconv3 = nn.ConvTranspose2d(d * 8 * 2, d * 8, 4, 2, 1)
        self.deconv3_bn = nn.BatchNorm2d(d * 8)
        self.deconv4 = nn.ConvTranspose2d(d * 8 * 2, d * 8, 4, 2, 1)
        self.deconv4_bn = nn.BatchNorm2d(d * 8)
        self.deconv5 = nn.ConvTranspose2d(d * 8 * 2, d * 4, 4, 2, 1)
        self.deconv5_bn = nn.BatchNorm2d(d * 4)
        self.deconv6 = nn.ConvTranspose2d(d * 4 * 2, d * 2, 4, 2, 1)
        self.deconv6_bn = nn.BatchNorm2d(d * 2)
        self.deconv7 = nn.ConvTranspose2d(d * 2 * 2, d, 4, 2, 1)
        self.deconv7_bn = nn.BatchNorm2d(d)
        self.deconv8 = nn.ConvTranspose2d(d * 2, 3, 4, 2, 1)
        
    def forward(self, x):
        # encoding
        e1 = self.conv1(x)
        e2 = self.conv2_bn(self.conv2(F.leaky_relu(e1, 0.2)))
        e3 = self.conv3_bn(self.conv3(F.leaky_relu(e2, 0.2)))
        e4 = self.conv4_bn(self.conv4(F.leaky_relu(e3, 0.2)))
        e5 = self.conv5_bn(self.conv5(F.leaky_relu(e4, 0.2)))
        e6 = self.conv6_bn(self.conv6(F.leaky_relu(e5, 0.2)))
        e7 = self.conv7_bn(self.conv7(F.leaky_relu(e6, 0.2)))
        e8 = self.conv8(F.leaky_relu(e7, 0.2))

        # decoding
        d1 = F.dropout(self.deconv1_bn(self.deconv1(F.relu(e8))), 0.5, training=True)
        d1 = torch.cat([d1, e7], 1)
        d2 = F.dropout(self.deconv2_bn(self.deconv2(F.relu(d1))), 0.5, training=True)
        d2 = torch.cat([d2, e6], 1)
        d3 = F.dropout(self.deconv3_bn(self.deconv3(F.relu(d2))), 0.5, training=True)
        d3 = torch.cat([d3, e5], 1)
        d4 = self.deconv4_bn(self.deconv4(F.relu(d3)))
        d4 = torch.cat([d4, e4], 1)
        d5 = self.deconv5_bn(self.deconv5(F.relu(d4)))
        d5 = torch.cat([d5, e3], 1)
        d6 = self.deconv6_bn(self.deconv6(F.relu(d5)))
        d6 = torch.cat([d6, e2], 1)
        d7 = self.deconv7_bn(self.deconv7(F.relu(d6)))
        d7 = torch.cat([d7, e1], 1)
        d8 = self.deconv8(F.relu(d7))
        out = F.tanh(d8)

        return out


#### Define Discriminator

In [None]:
class Discriminator(nn.Module):
    def __init__(self, d=64):
        # borrowed from : https://github.com/znxlwm/pytorch-pix2pix/blob/master/network.py
        super(Discriminator, self).__init__()
        self.conv1 = nn.Conv2d(6, d, 4, 2, 1)
        self.conv2 = nn.Conv2d(d, d * 2, 4, 2, 1)
        self.conv2_bn = nn.BatchNorm2d(d * 2)
        self.conv3 = nn.Conv2d(d * 2, d * 4, 4, 2, 1)
        self.conv3_bn = nn.BatchNorm2d(d * 4)
        self.conv4 = nn.Conv2d(d * 4, d * 8, 4, 1, 1)
        self.conv4_bn = nn.BatchNorm2d(d * 8)
        self.conv5 = nn.Conv2d(d * 8, 1, 4, 1, 1)

    def forward(self, x):
        out = F.leaky_relu(self.conv1(x), 0.2)
        out = F.leaky_relu(self.conv2_bn(self.conv2(out)), 0.2)
        out = F.leaky_relu(self.conv3_bn(self.conv3(out)), 0.2)
        out = F.leaky_relu(self.conv4_bn(self.conv4(out)), 0.2)
        out = F.sigmoid(self.conv5(out))
        return out


#### Prepare GAN model and initialize the weights

In [None]:
#weight initialization function. 
def weights_init(net):
    for m in net.modules():
        if isinstance(m, nn.Conv2d) or isinstance(m, nn.ConvTranspose2d) or isinstance(m, nn.Linear):
            # m.weight.data.normal_(1.0, 0.2)
            nn.init.xavier_normal_(m.weight.data)
            if m.bias is not None:
                m.bias.data.zero_()
        elif isinstance(m, nn.BatchNorm2d):
            m.weight.data.normal_(1.0, 0.2)
            m.bias.data.zero_()

# define GAN model.
G = Generator().cuda()
D = Discriminator().cuda()

# weight initialization 
G.apply(weights_init)
D.apply(weights_init)

#### Start training GAN

In [None]:
# install tensorboardx to use tensorboard.
%pip install tensorboardx

from tensorboardX import SummaryWriter
from torchvision.utils import make_grid
import time

# Hyper-parameters. 
# ====== You don't need to change here ===== #
EPOCHS = 200
LAMBDA_L1 = 100
# ========================================== #

# logger for tensorboard.
logger = SummaryWriter()

# GT labels for calculating binary cross entropy loss. 
real_label = torch.ones(size=(BATCH_SIZE,1)).cuda()
fake_label = torch.zeros(size=(BATCH_SIZE,1)).cuda()

# criterion for binary cross entropy loss
BCE_criterion = torch.nn.BCELoss()

# define optimizer. Here we use Adam optimizer. 
optimizer_G = torch.optim.Adam(G.parameters(), lr=2e-4, betas=(0.5,0.999))
optimizer_D = torch.optim.Adam(D.parameters(), lr=2e-4, betas=(0.5,0.999))

iterations = 0

for epoch in range(EPOCHS):
    # Set both G&D train modes.
    G.train()
    D.train()

    # For logging in tensorboard
    loss_G_total, loss_D_total = 0., 0.
    loss_G_sub_total, loss_D_sub_total = 0., 0.

    for batch_idx, (real_A, real_B) in enumerate(train_loader):
        t1 = time.time()
        real_A = real_A.cuda()
        real_B = real_B.cuda()

        # ============================================================ #
        # TODO : Fill the part for updating D&G.
        #  * you can generate fake_B using Generator(G).
        #   ex) fake_B = G(real_A)
        #  * first compute loss and back propgation with optimizers.
        #  * Note that you should compute L1 loss to when computing 
        #    generator loss. 
        # ============================================================ #

        # ================= Update D ================== # 

        # Fill here. 
        # First compute loss_D 
        # Then update the network with loss_D using optimizer_D
        
        # ================= Update G ================== # 

        # Fill here 
        # First compute loss_G 
        # Then update the network with loss_G using optimizer_G.
        # You should also consider L1 loss between real_B, fake_B.



        # For logging
        loss_D_total += loss_D.item()
        loss_G_total += loss_G.item()
        
        loss_G_sub_total += loss_G.item()
        loss_D_sub_total += loss_D.item()

        # print current states
        print_freq = 10
        if batch_idx % print_freq  == 0 and batch_idx>0:
            print('Epoch : {} || {}/{} || loss_G={:.3f} loss_D={:.3f} time={:.3f} secs/iter'.format(
                epoch, batch_idx, len(train_loader), 
                loss_G_sub_total/print_freq, loss_D_sub_total/print_freq, 
                time.time()-t1
            ))
            # ================= Genearte example samples ================== # 
            for bi, (real_A_test, real_B_test) in enumerate(test_loader):
                if bi>0:
                    break
                real_A_test = real_A_test[:NUM_EXAMPLES].cuda()
                real_B_test = real_B_test[:NUM_EXAMPLES].cuda()
                with torch.no_grad():
                    fake_B_test = G(real_A_test)
                    fake_B_test = fake_B_test[:NUM_EXAMPLES]

                real_A_samples = (real_A_test + 1)/2
                real_B_samples = (real_B_test + 1)/2
                fake_B_samples = (fake_B_test + 1)/2
                examples = torch.cat((real_A_samples, fake_B_samples, real_B_samples), dim=0)
                examples = make_grid(examples, nrow=NUM_EXAMPLES)
                
                # log images
                logger.add_image('Generated_pairs', examples, iterations)

            iterations += 1
            loss_G_sub_total = 0
            loss_D_sub_total = 0

    loss_G_total /= len(train_loader)
    loss_D_total /= len(train_loader)
    
    # logging on tensorboard
    logger.add_scalar('loss_G', loss_G_total, epoch)
    logger.add_scalar('loss_D', loss_D_total, epoch)

    # print current states
    print('Epoch : {} has done. AVG loss : loss_G={:.3f} loss_D={:.3f}'.format(
        epoch, loss_G_total, loss_D_total
    ))

In [None]:
# Check Tensorboard.
%ls runs
%load_ext tensorboard
%tensorboard --logdir runs --port 9999