## **Image-to-Image Translation with Conditional Adversarial Networks (CVPR 2017)**

**Written By Jonathan Choi**

References:

https://github.com/eriklindernoren/PyTorch-GAN

https://github.com/ndb796/Deep-Learning-Paper-Review-and-Practice/blob/master/code_practices/Pix2Pix_for_Facades.ipynb

### **Code Implementation**

- We use a U-Net generator similar to the one in the original paper.
- We implement Pix2Pix, a representative GAN-based technique for image-to-image domain translation.
- Dataset: Facades (3 x 256 x 256)



### Import Libraries

In [1]:
import torch
import torch.nn as nn
import torchvision

import os
import glob
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torchvision.utils import save_image

import torchvision.datasets as datasets

### Import Train Dataset

From original Dataset

In [2]:
###### Change Directory ######

# Print the current working directory => root/Image to Image Translation/current directory
print("Current working directory: {0}".format(os.getcwd()))

# Change the current working directory to datasets directory => root/datasets
os.chdir('../../datasets')

# Print the current working directory
print("Current working directory: {0}".format(os.getcwd()))



###### Create Directory and Download Dataset #######

import requests
from zipfile import ZipFile

def createDataset(dirName):
    # Work with explicit paths instead of os.chdir, so the working directory
    # stays at root/datasets and later relative paths such as './facades/base' still resolve
    dataset_path = os.path.join(dirName, "facades.zip")

    ###### Download Datasets #######
    # Get the link
    url = 'https://cmp.felk.cvut.cz/~tylecr1/facade/CMP_facade_DB_base.zip'
    r = requests.get(url, allow_redirects=True)
    r.raise_for_status()  # Stop early if the download failed
    # Save the content under the given name.
    with open(dataset_path, 'wb') as f:
        f.write(r.content)

    ##### Unzip the files ######
    with ZipFile(dataset_path, 'r') as zipObj:
        # Extract all the contents of the zip file into the dataset directory
        zipObj.extractall(dirName)

# Create directory
dirName = 'facades'

try:
    # Create target Directory
    os.mkdir(dirName)
    print("Directory " , dirName ,  " Created ") 
    createDataset(dirName)

except FileExistsError:
    print("Directory " , dirName ,  " already exists")


Current working directory: c:\Users\John Steve\Desktop\Deep-Learning-Paper-Reviews\Image to Image Translation\Image-to-Image Translation with Conditional Adversarial Networks
Current working directory: c:\Users\John Steve\Desktop\Deep-Learning-Paper-Reviews\datasets
Directory  facades  already exists


After this step, the facades directory contains 'base', 'facades.zip', 'label_names.txt', and 'readme.txt'.

Below, we create the train, validation, and test datasets from the base folder.


### Data Structure

In the base folder, there are paired files named cmp_b000n.jpg and cmp_b000n.png.

- cmp_b000n.**jpg** => We will call this "**A**" (Real Image)

- cmp_b000n.**png** => We will call this "**B**" (Condition Image)


We are going to create Train A, Train B, Valid A, Valid B, Test A, and Test B.

Count the Datasets

In [5]:
print(os.getcwd())
path, dirs, files = next(os.walk(os.getcwd()+'/facades/base'))
file_count = len(files)
print(file_count)

c:\Users\John Steve\Desktop\Deep-Learning-Paper-Reviews\datasets
1134


Split the Dataset to TrainA, TrainB, ValidA, ValidB, TestA, TestB
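
The notebook does not show the split itself, so here is a minimal sketch of one way it could be done. It assumes each photo (A, `.jpg`) has a label map with the same stem (B, `.png`), and it writes A and B side by side into a single image because that is the layout the `ImageDataset` class further down crops apart. The function names `split_pairs` and `build_splits`, the 70/15/15 ratio, the random seed, and the sequential output file names are all assumptions made for illustration, not the author's procedure.

```python
import os
import random
from PIL import Image


def split_pairs(stems, ratios=(0.7, 0.15, 0.15), seed=0):
    """Shuffle file stems reproducibly and split them into train/val/test lists."""
    stems = sorted(stems)
    random.Random(seed).shuffle(stems)
    n_train = int(len(stems) * ratios[0])
    n_val = int(len(stems) * ratios[1])
    return {
        "train": stems[:n_train],
        "val": stems[n_train:n_train + n_val],
        "test": stems[n_train + n_val:],  # whatever remains
    }


def build_splits(base_dir, out_dir, size=256, **split_kwargs):
    """Write side-by-side A|B images into out_dir/{train,val,test}."""
    stems = [os.path.splitext(f)[0] for f in os.listdir(base_dir) if f.endswith(".jpg")]
    # Keep only photos that have a matching label image
    stems = [s for s in stems if os.path.exists(os.path.join(base_dir, s + ".png"))]
    splits = split_pairs(stems, **split_kwargs)
    for mode, names in splits.items():
        os.makedirs(os.path.join(out_dir, mode), exist_ok=True)
        for i, stem in enumerate(names, start=1):
            img_A = Image.open(os.path.join(base_dir, stem + ".jpg")).convert("RGB")
            img_B = Image.open(os.path.join(base_dir, stem + ".png")).convert("RGB")
            img_A = img_A.resize((size, size), Image.BICUBIC)
            # Nearest-neighbour keeps label colours from blending (an assumption)
            img_B = img_B.resize((size, size), Image.NEAREST)
            pair = Image.new("RGB", (2 * size, size))
            pair.paste(img_A, (0, 0))     # left half: A (photo)
            pair.paste(img_B, (size, 0))  # right half: B (label map)
            pair.save(os.path.join(out_dir, mode, f"{i}.jpg"))
    return {mode: len(names) for mode, names in splits.items()}
```

With the working directory at `root/datasets`, a call such as `build_splits('./facades/base', './facades')` would produce `./facades/train/1.jpg` and so on, which is the path layout the later cells read from.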

In [4]:
# The working directory is already root/datasets, so the paths start at ./facades
print("Number of train dataset A and B:", len(next(os.walk('./facades/train/'))[2]))
print("Number of validation dataset A and B:", len(next(os.walk('./facades/val/'))[2]))
print("Number of test dataset A and B:", len(next(os.walk('./facades/test/'))[2]))

### Image Preview

Each training image consists of two 256 x 256 images attached side by side.

In [None]:
image = Image.open('./facades/train/1.jpg')
print("Image size:", image.size)

plt.imshow(image)
plt.show()

### Custom Dataset

In [None]:
class ImageDataset(Dataset):
    def __init__(self, root, transforms_=None, mode="train"):
        self.transform = transforms_

        self.files = sorted(glob.glob(os.path.join(root, mode) + "/*.jpg"))
        # Also use the test data for training, since the dataset is small
        if mode == "train":
            self.files.extend(sorted(glob.glob(os.path.join(root, "test") + "/*.jpg")))

    def __getitem__(self, index):
        img = Image.open(self.files[index % len(self.files)])
        w, h = img.size
        img_A = img.crop((0, 0, w // 2, h)) # Left half of the image
        img_B = img.crop((w // 2, 0, w, h)) # Right half of the image

        # Horizontal flip for data augmentation (applied to both halves so they stay aligned)
        if np.random.random() < 0.5:
            img_A = img_A.transpose(Image.FLIP_LEFT_RIGHT)
            img_B = img_B.transpose(Image.FLIP_LEFT_RIGHT)

        img_A = self.transform(img_A)
        img_B = self.transform(img_B)

        return {"A": img_A, "B": img_B}

    def __len__(self):
        return len(self.files)
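
To check that the crop and the flip keep A and B aligned, the same two steps can be run on a tiny synthetic image. This is a sketch only: `split_and_flip` is a hypothetical helper, it uses `ImageOps.mirror` for the flip, and the 8 x 4 test image is made up for the example.

```python
import numpy as np
from PIL import Image, ImageOps


def split_and_flip(img, flip):
    """Crop a side-by-side image into (A, B) and optionally mirror both halves."""
    w, h = img.size
    img_A = img.crop((0, 0, w // 2, h))  # left half
    img_B = img.crop((w // 2, 0, w, h))  # right half
    if flip:
        # Mirror both halves so corresponding pixels stay paired
        img_A = ImageOps.mirror(img_A)
        img_B = ImageOps.mirror(img_B)
    return img_A, img_B


# Synthetic 8 x 4 pair: a red ramp on the left half, a blue ramp on the right half
arr = np.zeros((4, 8, 3), dtype=np.uint8)
arr[:, :4, 0] = np.arange(4) * 60
arr[:, 4:, 2] = np.arange(4) * 60
pair = Image.fromarray(arr)

A, B = split_and_flip(pair, flip=True)
print(A.size, B.size)
print(np.array(A)[0, :, 0].tolist())  # red ramp of A, reversed by the flip
print(np.array(B)[0, :, 2].tolist())  # blue ramp of B, reversed the same way
```

Both ramps come out reversed in the same way, so the pixels of A and B still correspond after augmentation.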

In [None]:
transforms_ = transforms.Compose([
    transforms.Resize((256, 256), interpolation=transforms.InterpolationMode.BICUBIC),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

train_dataset = ImageDataset("facades", transforms_=transforms_)
val_dataset = ImageDataset("facades", transforms_=transforms_, mode="val")

train_dataloader = DataLoader(train_dataset, batch_size=10, shuffle=True, num_workers=4)
val_dataloader = DataLoader(val_dataset, batch_size=10, shuffle=True, num_workers=4)
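
`ToTensor` scales pixels to [0, 1], and `Normalize` with mean 0.5 and standard deviation 0.5 then maps them to [-1, 1], which matches the range of the generator's final `Tanh`. The sketch below reproduces that arithmetic on a plain tensor and shows the inverse mapping needed to view outputs as images. `normalize` and `denormalize` are hypothetical helper names, not part of torchvision.

```python
import torch


def normalize(x, mean=0.5, std=0.5):
    """Same per-element arithmetic as Normalize((0.5,...), (0.5,...))."""
    return (x - mean) / std


def denormalize(x, mean=0.5, std=0.5):
    """Map values in [-1, 1] back to [0, 1] for display or saving."""
    return x * std + mean


x = torch.tensor([0.0, 0.25, 1.0])  # pixel values after ToTensor
y = normalize(x)
print(y)               # values now lie in [-1, 1]
print(denormalize(y))  # round trip back to the original values
```

In the training loop, `save_image(..., normalize=True)` rescales the tensor to [0, 1] before writing, so no explicit call is needed there.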

### Define Generator and Discriminator Models

- Pix2Pix is a form of cGAN: it generates an output image conditioned on a given input image.
- To keep the input and output dimensions the same, we use the U-Net architecture.


<img src="https://blog.kakaocdn.net/dn/MEqfm/btqD2vOm4wM/ojgKsu3uZTG78WQPKgGeXK/img.png" width="1000px" />

- The U-Net architecture uses skip connections, as shown below.
- These let much of the low-level information be shared directly between the input and the output.

<img src="https://learnopencv.com/wp-content/uploads/2021/07/Pix2Pix-employs-a-UNET-Generator-an-encoder-decoder.jpg" width="1000px" />
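
The spatial sizes in such an encoder-decoder follow from the standard convolution size formulas. The sketch below works them through for a 256 x 256 input, assuming 4 x 4 kernels with stride 2 and padding 1 (the settings used by the modules in this notebook), no dilation, and no output padding. `conv_out` and `deconv_out` are hypothetical helper names.

```python
def conv_out(size, kernel=4, stride=2, padding=1):
    """Output size of a convolution along one spatial axis."""
    return (size + 2 * padding - kernel) // stride + 1


def deconv_out(size, kernel=4, stride=2, padding=1):
    """Output size of a transposed convolution along one spatial axis."""
    return (size - 1) * stride - 2 * padding + kernel


# Encoder: eight stride-2 convolutions starting from 256
down_sizes = [256]
for _ in range(8):
    down_sizes.append(conv_out(down_sizes[-1]))
print(down_sizes)

# Decoder: seven stride-2 transposed convolutions starting from the bottleneck
up_sizes = [down_sizes[-1]]
for _ in range(7):
    up_sizes.append(deconv_out(up_sizes[-1]))
print(up_sizes)

# Final stage: x2 upsampling, one extra row/column of zero padding,
# then a stride-1 4 x 4 convolution with padding 1
final_size = conv_out(2 * up_sizes[-1] + 1, stride=1)
print(final_size)
```

Each decoder size matches an encoder size, which is what allows the skip connections to concatenate the two feature maps along the channel axis.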

In [None]:
# U-Net Architecture's Down Sampling Module
class UNetDown(nn.Module):
    def __init__(self, in_channels, out_channels, normalize=True, dropout=0.0):
        super(UNetDown, self).__init__()
        # Halve the height and width
        layers = [nn.Conv2d(in_channels, out_channels, kernel_size=4, stride=2, padding=1, bias=False)]
        if normalize:
            layers.append(nn.InstanceNorm2d(out_channels))
        layers.append(nn.LeakyReLU(0.2))
        if dropout:
            layers.append(nn.Dropout(dropout))
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        return self.model(x)


# U-Net Architecture's UpSampling Module: Use Skip Connection
class UNetUp(nn.Module):
    def __init__(self, in_channels, out_channels, dropout=0.0):
        super(UNetUp, self).__init__()
        # Double the width and height
        layers = [nn.ConvTranspose2d(in_channels, out_channels, kernel_size=4, stride=2, padding=1, bias=False)]
        layers.append(nn.InstanceNorm2d(out_channels))
        layers.append(nn.ReLU(inplace=True))
        if dropout:
            layers.append(nn.Dropout(dropout))
        self.model = nn.Sequential(*layers)

    def forward(self, x, skip_input):
        x = self.model(x)
        x = torch.cat((x, skip_input), 1) # Concatenation at Channel Level

        return x


# U-Net Generator Architecture
class GeneratorUNet(nn.Module):
    def __init__(self, in_channels=3, out_channels=3):
        super(GeneratorUNet, self).__init__()

        self.down1 = UNetDown(in_channels, 64, normalize=False) # Output: [64 X 128 X 128]
        self.down2 = UNetDown(64, 128) # Output: [128 X 64 X 64]
        self.down3 = UNetDown(128, 256) # Output: [256 X 32 X 32]
        self.down4 = UNetDown(256, 512, dropout=0.5) # Output: [512 X 16 X 16]
        self.down5 = UNetDown(512, 512, dropout=0.5) # Output: [512 X 8 X 8]
        self.down6 = UNetDown(512, 512, dropout=0.5) # Output: [512 X 4 X 4]
        self.down7 = UNetDown(512, 512, dropout=0.5) # Output: [512 X 2 X 2]
        self.down8 = UNetDown(512, 512, normalize=False, dropout=0.5) # Output: [512 X 1 X 1]

        # Skip Connection (Output Channel size X 2 == Next input Channel size)
        self.up1 = UNetUp(512, 512, dropout=0.5) # Output: [1024 X 2 X 2]
        self.up2 = UNetUp(1024, 512, dropout=0.5) # Output: [1024 X 4 X 4]
        self.up3 = UNetUp(1024, 512, dropout=0.5) # Output: [1024 X 8 X 8]
        self.up4 = UNetUp(1024, 512, dropout=0.5) # Output: [1024 X 16 X 16]
        self.up5 = UNetUp(1024, 256) # Output: [512 X 32 X 32]
        self.up6 = UNetUp(512, 128) # Output: [256 X 64 X 64]
        self.up7 = UNetUp(256, 64) # Output: [128 X 128 X 128]

        self.final = nn.Sequential(
            nn.Upsample(scale_factor=2), # Output: [128 X 256 X 256]
            nn.ZeroPad2d((1, 0, 1, 0)),
            nn.Conv2d(128, out_channels, kernel_size=4, padding=1), # Output: [3 X 256 X 256]
            nn.Tanh(),
        )

    def forward(self, x):
        # U-Net Generator: Feed forwarding from encoder to decoder
        d1 = self.down1(x)
        d2 = self.down2(d1)
        d3 = self.down3(d2)
        d4 = self.down4(d3)
        d5 = self.down5(d4)
        d6 = self.down6(d5)
        d7 = self.down7(d6)
        d8 = self.down8(d7)
        u1 = self.up1(d8, d7)
        u2 = self.up2(u1, d6)
        u3 = self.up3(u2, d5)
        u4 = self.up4(u3, d4)
        u5 = self.up5(u4, d3)
        u6 = self.up6(u5, d2)
        u7 = self.up7(u6, d1)

        return self.final(u7)


# PatchGAN-style Discriminator Architecture (a plain convolutional stack, not a U-Net)
class Discriminator(nn.Module):
    def __init__(self, in_channels=3):
        super(Discriminator, self).__init__()

        def discriminator_block(in_channels, out_channels, normalization=True):
            # Halve the height and width
            layers = [nn.Conv2d(in_channels, out_channels, kernel_size=4, stride=2, padding=1)]
            if normalization:
                layers.append(nn.InstanceNorm2d(out_channels))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
            return layers

        self.model = nn.Sequential(
            # The input channel size is doubled because the input is two images (real/translated image and condition image)
            *discriminator_block(in_channels * 2, 64, normalization=False), # Output: [64 X 128 X 128]
            *discriminator_block(64, 128), # Output: [128 X 64 X 64]
            *discriminator_block(128, 256), # Output: [256 X 32 X 32]
            *discriminator_block(256, 512), # Output: [512 X 16 X 16]
            nn.ZeroPad2d((1, 0, 1, 0)),
            nn.Conv2d(512, 1, kernel_size=4, padding=1, bias=False) # Output: [1 X 16 X 16]
        )

    # img_A: real / translated image, img_B: condition
    def forward(self, img_A, img_B):
        # Generating Input Data by concatenating two images at the Channel Level.
        img_input = torch.cat((img_A, img_B), 1)
        return self.model(img_input)
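
Two shape details in the models above are easy to verify in isolation: the skip connection doubles the channel count, and the asymmetric zero padding followed by a stride-1 4 x 4 convolution keeps the spatial size, so the discriminator ends with one score per position of a 16 x 16 grid. The sketch below rebuilds only those layers with random inputs. It is a shape check under those assumptions, not the full models.

```python
import torch
import torch.nn as nn

# (a) Skip connection: upsample the bottleneck, then concatenate the encoder feature
up = nn.ConvTranspose2d(512, 512, kernel_size=4, stride=2, padding=1, bias=False)
d8 = torch.randn(1, 512, 1, 1)  # stand-in for the bottleneck feature
d7 = torch.randn(1, 512, 2, 2)  # stand-in for the matching encoder feature
u1 = torch.cat((up(d8), d7), 1)  # concatenate along the channel axis
print(u1.shape)

# (b) Discriminator tail: pad one row/column, then a 4 x 4 convolution with padding 1
tail = nn.Sequential(
    nn.ZeroPad2d((1, 0, 1, 0)),  # (left, right, top, bottom)
    nn.Conv2d(512, 1, kernel_size=4, padding=1, bias=False),
)
feat = torch.randn(1, 512, 16, 16)  # stand-in for the last block's output
patch_scores = tail(feat)
print(patch_scores.shape)
```

The 16 x 16 score map is why the real and fake labels in the training loop are created with shape `(batch, 1, 16, 16)`.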

### Model training and Sampling

- Initialize the Generator and Discriminator Model to train.
- Set appropriate hyperparameters.
- Set appropriate Loss Function.
- Pix2Pix uses L1 loss to make the output image similar to ground-truth.

In [None]:
def weights_init_normal(m):
    classname = m.__class__.__name__
    if classname.find("Conv") != -1:
        # Matches both Conv2d and ConvTranspose2d
        torch.nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif classname.find("BatchNorm2d") != -1:
        # Not triggered by the models above, which use InstanceNorm2d instead of BatchNorm2d
        torch.nn.init.normal_(m.weight.data, 1.0, 0.02)
        torch.nn.init.constant_(m.bias.data, 0.0)


# Initialize Generator and Discriminator
generator = GeneratorUNet()
discriminator = Discriminator()

generator.cuda()
discriminator.cuda()

# Initialize Weights
generator.apply(weights_init_normal)
discriminator.apply(weights_init_normal)

# Loss Function
criterion_GAN = torch.nn.MSELoss()
criterion_pixelwise = torch.nn.L1Loss()

criterion_GAN.cuda()
criterion_pixelwise.cuda()

# Set Learning Rate
lr = 0.0002

# Optimizers for the Generator and Discriminator
optimizer_G = torch.optim.Adam(generator.parameters(), lr=lr, betas=(0.5, 0.999))
optimizer_D = torch.optim.Adam(discriminator.parameters(), lr=lr, betas=(0.5, 0.999))

We can check the results by sampling periodically while training.

In [None]:
import time

n_epochs = 200 # Number of training epochs
sample_interval = 200 # Save sample results every 200 batches

# Weight of the pixel-wise L1 loss between the translated image and the ground-truth image
lambda_pixel = 100

start_time = time.time()

for epoch in range(n_epochs):
    for i, batch in enumerate(train_dataloader):
        # Load the model inputs (B: condition image, A: real photo)
        real_A = batch["B"].cuda()
        real_B = batch["A"].cuda()

        # Create the real and fake labels (one per patch: input width and height divided by 16)
        real = torch.ones((real_A.size(0), 1, 16, 16), device=real_A.device) # real: 1
        fake = torch.zeros((real_A.size(0), 1, 16, 16), device=real_A.device) # fake: 0

        """ Train the Generator """
        optimizer_G.zero_grad()

        # Generate the Image
        fake_B = generator(real_A)

        # Generator Loss
        loss_GAN = criterion_GAN(discriminator(fake_B, real_A), real)

        # Pixel-wise L1 Loss
        loss_pixel = criterion_pixelwise(fake_B, real_B) 

        # Total Loss
        loss_G = loss_GAN + lambda_pixel * loss_pixel

        # Update Generator
        loss_G.backward()
        optimizer_G.step()

        """ Train the Discriminator """
        optimizer_D.zero_grad()

        # Discriminator Loss
        loss_real = criterion_GAN(discriminator(real_B, real_A), real) # condition: real_A
        loss_fake = criterion_GAN(discriminator(fake_B.detach(), real_A), fake)
        loss_D = (loss_real + loss_fake) / 2

        # Update the Discriminator
        loss_D.backward()
        optimizer_D.step()

        done = epoch * len(train_dataloader) + i
        if done % sample_interval == 0:
            imgs = next(iter(val_dataloader)) # Sample one batch of 10 images
            real_A = imgs["B"].cuda()
            real_B = imgs["A"].cuda()
            with torch.no_grad(): # No gradients are needed when sampling
                fake_B = generator(real_A)
            # real_A: condition, fake_B: translated image, real_B: ground-truth image
            img_sample = torch.cat((real_A, fake_B, real_B), -2) # Stack the images along the height axis
            save_image(img_sample, f"{done}.png", nrow=5, normalize=True)

    # Print the log after each epoch finishes.
    print(f"[Epoch {epoch}/{n_epochs}] [D loss: {loss_D.item():.6f}] [G pixel loss: {loss_pixel.item():.6f}, adv loss: {loss_GAN.item():.6f}] [Elapsed time: {time.time() - start_time:.2f}s]")
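
Written out, the losses computed in the loop above can be read as follows. Here $x$ is the condition image (`real_A`), $y$ is the ground-truth image (`real_B`), $G$ and $D$ are the generator and discriminator, and $\lambda$ is `lambda_pixel` (100). The expectations stand for the means that `MSELoss` and `L1Loss` take over the batch and over patches or pixels. This is my reading of the code, not a formula quoted from the paper. Because `criterion_GAN` is `MSELoss`, the adversarial term is a least-squares one rather than the log-likelihood objective of the original paper.

$$
\mathcal{L}_G = \mathbb{E}\big[(D(G(x), x) - 1)^2\big] + \lambda \, \mathbb{E}\big[\lvert y - G(x) \rvert\big]
$$

$$
\mathcal{L}_D = \tfrac{1}{2}\Big(\mathbb{E}\big[(D(y, x) - 1)^2\big] + \mathbb{E}\big[D(G(x), x)^2\big]\Big)
$$

In $\mathcal{L}_D$ the generated image is detached (`fake_B.detach()`), so the discriminator update does not send gradients back into the generator.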


### Print the Generated Images

In [None]:
from IPython.display import Image as DisplayImage # The alias avoids shadowing PIL's Image imported earlier

DisplayImage('10000.png')

### Save the trained Model and Test

In [None]:
torch.save(generator.state_dict(), "Pix2Pix_Generator_for_Facades.pt")
torch.save(discriminator.state_dict(), "Pix2Pix_Discriminator_for_Facades.pt")
print("Model saved!")
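
To test a saved generator later, its weights have to be loaded back into a model of the same architecture. The sketch below shows that round trip with a tiny stand-in network so that it runs on its own. `make_model` and the file name `generator_demo.pt` are hypothetical. In the notebook the model would be `GeneratorUNet()` and the file `Pix2Pix_Generator_for_Facades.pt`.

```python
import os
import tempfile

import torch
import torch.nn as nn


def make_model():
    """Tiny stand-in for GeneratorUNet, used only to keep the sketch short."""
    return nn.Sequential(nn.Conv2d(3, 3, kernel_size=3, padding=1), nn.Tanh())


trained = make_model()
path = os.path.join(tempfile.mkdtemp(), "generator_demo.pt")
torch.save(trained.state_dict(), path)  # same call as in the cell above

# Later, or in another session: rebuild the architecture, then load the weights
restored = make_model()
restored.load_state_dict(torch.load(path, map_location="cpu"))
restored.eval()  # switches layers such as Dropout to inference behaviour

x = torch.randn(1, 3, 8, 8)
with torch.no_grad():
    same = torch.allclose(trained(x), restored(x))
print(same)
```

`map_location="cpu"` lets a checkpoint saved on a GPU be loaded on a machine without one. The model can then be moved to a device with `.to(device)`.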