# Notebook for training CycleGAN to transform one architecture style to another

In [0]:
%%bash
# Install the Python packages this notebook imports.
pip3 install torch torchvision
# Pillow pin kept from the original notebook; the reason is not recorded here -- TODO confirm it is still required.
pip3 install pillow==4.1.1
# Clone the helper package (imported below as `from CycleGAN import nets, functions, data`)
# only when it is missing, so re-running this cell does not fail on an existing directory.
if [ ! -d CycleGAN ]; then
    git clone https://github.com/sergeisoly/CycleGAN
fi

In [0]:
%matplotlib inline
from PIL import Image
import itertools

import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch.optim as optim

import matplotlib.pyplot as plt
plt.rcParams['axes.grid'] = 'True'

import glob
import random
import os

import torchvision.transforms as transforms
import torchvision.models as models

import copy
import warnings
from tqdm import tqdm, tqdm_notebook
from IPython.display import clear_output

import warnings
warnings.simplefilter(action='ignore')

from CycleGAN import nets, functions, data

In [0]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
!nvidia-smi

Download the dataset of different architectural styles. \
The dataset was taken from https://sites.google.com/site/zhexuutssjtu/projects/arch \
Instructions for downloading large files (>100 MB) from Google Drive: \
https://medium.com/@acpanjan/download-google-drive-files-using-wget-3c2c025a8b99

In [0]:
%%bash
# Two-step download of the dataset archive from Google Drive (one continued command):
#   1. The inner wget, inside $(...), fetches the download page quietly, writes it to
#      stdout (-O-) and stores the session cookies in /tmp/cookies.txt; sed then pulls
#      the value of the `confirm=` token out of that page.
#   2. The outer wget reloads those cookies and requests the same file id with the
#      extracted token, saving the response as arcDataset.zip.
# The cookie file is removed afterwards, but only if the download command succeeded (&&).
# NOTE(review): the inner request passes --no-check-certificate, i.e. it skips TLS verification.
# NOTE(review): this depends on Google Drive's confirm-token page -- verify it still works.
wget --load-cookies /tmp/cookies.txt \
"https://docs.google.com/uc?export=download&confirm=$(wget --quiet --save-cookies /tmp/cookies.txt \
--keep-session-cookies --no-check-certificate 'https://docs.google.com/uc?export=download&id=0Bwo0SFiZwl3JVGRlWGZUaW5va00' \
-O- | sed -rn 's/.*confirm=([0-9A-Za-z_]+).*/\1\n/p')&id=0Bwo0SFiZwl3JVGRlWGZUaW5va00" -O arcDataset.zip  && rm -rf /tmp/cookies.txt

Unpack the dataset into the current Colab directory.

I tried transforming from the Gothic style to the International style and back;
you can try other style pairs.

In [0]:
%%bash
# Unpack the archive and lay out the two training domains. Every step is safe to re-run.
# -n: never overwrite files that were already extracted (no overwrite prompt on re-runs).
unzip -n arcDataset.zip -d data
# Copy each style folder only when its target is missing: with an existing target,
# `cp -r` would nest the source folder inside it instead of recreating it.
[ -d data/trainB ] || cp -r data/arcDataset/Gothic\ architecture data/trainB
[ -d data/trainA ] || cp -r data/arcDataset/International\ style data/trainA
# -p: do not fail when the test folders already exist.
mkdir -p data/testA data/testB

In [0]:
# --- Training schedule ---
N_EPOCHS = 200             # total number of training epochs
DECAY_START_EPOCH = 100    # epoch after which lambda_lr starts lowering the learning rate
LEARNING_RATE = 2e-4       # initial learning rate passed to the Adam optimizers

# --- Data loading ---
DATA_PATH = "data/"
IMAGE_SIZE = 256           # side length of the random training crop
BATCH_SIZE = 2
NUM_WORKERS = 4            # worker count handed to the DataLoaders

# --- Output location for history.txt and the model checkpoints ---
# NOTE(review): the active path looks like a mounted Google Drive folder -- confirm Drive is mounted.
PATH_TO_SAVE = '/content/drive/My Drive/DLSchool/CycleGAN/final/'
# PATH_TO_SAVE = 'data/output/'

In [0]:
# Create the output directory if needed, then append the column header of the loss log.
os.makedirs(os.path.dirname(PATH_TO_SAVE), exist_ok=True)
with open(PATH_TO_SAVE + 'history.txt', 'a') as f:
    # The header ends with a newline so the next write starts on its own line
    # instead of being glued onto the last column name.
    f.write('loss_gen, loss_disc, loss_identity, loss_gan, loss_cycle\n')

In [0]:
# Per the original author's note, this helper creates the test set and removes
# grayscale images. Its implementation lives in the imported CycleGAN `data`
# module and is not shown in this notebook.
# NOTE(review): it presumably changes files under DATA_PATH -- confirm it is safe to re-run.
data.prepare_dataset(DATA_PATH)

In [0]:
# Evaluation preprocessing without random augmentation: resize to (600, 800),
# convert to a tensor, then normalise every channel with mean 0.5 and std 0.5.
test_transforms = [transforms.Resize((600, 800)),
                   transforms.ToTensor(),
                   transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
test_loader = DataLoader(data.ImageDataset(DATA_PATH, transforms_=test_transforms, mode='test', unaligned=False),
                         batch_size=1, shuffle=False, num_workers=NUM_WORKERS)
# Take one fixed batch from the test set to display results after each epoch.
# Both samples are read from the same batch, so one test batch is enough; asking
# the iterator for a second batch raises StopIteration when only one exists.
test_iter = iter(test_loader)
sample_batch = next(test_iter)
sample_X = sample_batch['A']
sample_Y = sample_batch['B']

In [0]:
def lambda_lr(epoch, n_epochs=None, decay_start_epoch=None):
    """Learning-rate multiplier: constant at 1.0, then linear decay towards 0.0.

    Parameters:
        epoch (int)             -- index of the current epoch
        n_epochs (int)          -- total number of epochs; defaults to the notebook's N_EPOCHS
        decay_start_epoch (int) -- epoch at which the decay begins; defaults to DECAY_START_EPOCH

    Returns:
        float -- 1.0 up to ``decay_start_epoch``, then falling linearly to 0.0 at ``n_epochs``.
    """
    if n_epochs is None:
        n_epochs = N_EPOCHS
    if decay_start_epoch is None:
        decay_start_epoch = DECAY_START_EPOCH

    decay_length = n_epochs - decay_start_epoch
    if decay_length <= 0:
        # No decay phase is configured: keep the base rate instead of dividing by zero.
        return 1.0
    # Clamp at 0.0 so an epoch index beyond n_epochs never produces a negative factor.
    return max(0.0, 1.0 - max(0, epoch - decay_start_epoch) / decay_length)

def set_requires_grad(nets, requires_grad=False):
    """Switch gradient tracking on or off for every parameter of the given networks.

    Turning it off for networks that are not being optimised avoids unnecessary
    gradient computations.

    Parameters:
        nets (network or list/tuple) -- a single network, or a list/tuple of networks;
                                        ``None`` entries are skipped
        requires_grad (bool)         -- whether the networks require gradients or not
    """
    # A lone network is wrapped so the loop below handles both call styles.
    # Tuples are accepted as sequences too, not mistaken for a single network.
    if not isinstance(nets, (list, tuple)):
        nets = [nets]
    for net in nets:
        if net is None:
            continue
        for param in net.parameters():
            param.requires_grad = requires_grad

def plot_history(history):
    """Draw the five loss curves next to each other in one wide figure.

    Parameters:
        history (list) -- one entry per epoch, each holding five values in the order
                          loss_gen, loss_disc, loss_identity, loss_gan, loss_cycle
    """
    panel_titles = ("loss G", "loss D", "loss Identity", "loss GAN", "loss Cycle")
    # Transpose the per-epoch rows into one series per loss (exactly five are expected).
    loss_gen, loss_disc, loss_identity, loss_gan, loss_cycle = zip(*history)
    curves = (loss_gen, loss_disc, loss_identity, loss_gan, loss_cycle)

    plt.figure(figsize=(20, 5))
    for position, (title, curve) in enumerate(zip(panel_titles, curves), start=1):
        # One row, five columns; `position` selects the panel from left to right.
        plt.subplot(1, 5, position)
        plt.title(title)
        plt.plot(curve)


In [0]:
# Build the two generators and two discriminators on the selected device.
# Forward generator:  X -> Y
# Backward generator: Y -> X
gen_forward, gen_backward = nets.Generator().to(device), nets.Generator().to(device)
discX, discY = nets.Discriminator().to(device), nets.Discriminator().to(device)

# Re-initialise the weights of every network with the project's init function.
gen_forward.apply(functions.weights_init_normal)
gen_backward.apply(functions.weights_init_normal)
discY.apply(functions.weights_init_normal)
discX.apply(functions.weights_init_normal)

# Name -> network mapping handed to functions.save_models after every epoch.
# NOTE(review): this rebinds `models`, which the import cell bound to torchvision.models.
# NOTE(review): 'netD_A' holds discY, the discriminator that scores gen_forward's output --
# confirm this naming matches what the checkpoint loading code expects.
models = dict(netG_A2B=gen_forward,
              netG_B2A=gen_backward,
              netD_A=discY,
              netD_B=discX)

# All-ones and all-zeros float32 vectors of length BATCH_SIZE on the training device
# ("real" and "fake" labels respectively).
ones = torch.ones(BATCH_SIZE, dtype=torch.float32, device=device)
zeros = torch.zeros(BATCH_SIZE, dtype=torch.float32, device=device)


# Training-time preprocessing: resize to 1.12 x IMAGE_SIZE, take a random
# IMAGE_SIZE crop, flip horizontally at random, convert to a tensor and
# normalise every channel with mean 0.5 and std 0.5.
transforms_ = [
    transforms.Resize(int(IMAGE_SIZE * 1.12)),
    transforms.RandomCrop(IMAGE_SIZE),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.5,) * 3, (0.5,) * 3),
]

# Shuffled training loader over the dataset in unaligned mode.
dataloader = DataLoader(
    data.ImageDataset(DATA_PATH, transforms_=transforms_, unaligned=True),
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS,
)

# Loss functions: squared error for the adversarial term, absolute error for the
# cycle-consistency and identity terms.
criterion_GAN = nn.MSELoss()
criterion_cycle, criterion_identity = nn.L1Loss(), nn.L1Loss()

# Optimizers: both generators share one Adam instance; each discriminator has its own.
gen_optim = optim.Adam(
    itertools.chain(gen_forward.parameters(), gen_backward.parameters()),
    lr=LEARNING_RATE,
    betas=(0.5, 0.999),
)
discX_optim, discY_optim = (
    optim.Adam(disc.parameters(), lr=LEARNING_RATE, betas=(0.5, 0.999))
    for disc in (discX, discY)
)

# LR schedulers: every optimizer follows the same lambda_lr multiplier.
lr_scheduler_gen, lr_scheduler_discX, lr_scheduler_discY = (
    optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=lambda_lr)
    for optimizer in (gen_optim, discX_optim, discY_optim)
)


###### Training ######
# One entry per finished epoch, in the column order of history.txt:
# [loss_gen, loss_disc, loss_identity, loss_gan, loss_cycle]
history = []

for epoch in range(N_EPOCHS):
    # Running sums of the per-batch losses; turned into averages once the epoch is done.
    loss_gan_av = 0
    loss_cycle_av = 0
    loss_identity_av = 0
    loss_disc_av = 0
    loss_gen_av = 0
    with tqdm_notebook(desc=f"Epoch {epoch}/{N_EPOCHS}", total=len(dataloader), position=0) as pbar:
        for i, batch in enumerate(dataloader):

            real_X = batch['A'].to(device)
            real_Y = batch['B'].to(device)

            # Generators
            # Freeze both discriminators so this step only computes generator gradients.
            set_requires_grad([discX, discY], False)
            gen_optim.zero_grad()

            # Identity loss: each generator is fed an image from its own output domain
            # and penalised for changing it.
            same_Y = gen_forward(real_Y)
            loss_identity_Y = criterion_identity(same_Y, real_Y)

            same_X = gen_backward(real_X)
            loss_identity_X = criterion_identity(same_X, real_X)

            # GAN targets are created from each prediction (ones_like / zeros_like) so they
            # always match its shape, including on a final batch smaller than BATCH_SIZE.

            # Forward GAN loss
            fake_Y = gen_forward(real_X)
            pred_fake = discY(fake_Y)
            loss_GAN_X2Y = criterion_GAN(pred_fake, torch.ones_like(pred_fake)) # [(D(G(x)) - 1)^2]

            # Backward GAN loss
            fake_X = gen_backward(real_Y)
            pred_fake = discX(fake_X)
            loss_GAN_Y2X = criterion_GAN(pred_fake, torch.ones_like(pred_fake)) # [(D'(F(y)) - 1)^2]

            # Cycle loss: translating there and back should reproduce the input.
            recovered_X = gen_backward(fake_Y)
            recovered_Y = gen_forward(fake_X)

            loss_cycle = criterion_cycle(recovered_X, real_X) + criterion_cycle(recovered_Y, real_Y)

            # Weighted sum of the identity, adversarial and cycle terms.
            gen_loss = 5.0*loss_identity_X + 5.0*loss_identity_Y + loss_GAN_X2Y + loss_GAN_Y2X + 10.0*loss_cycle
            gen_loss.backward()

            gen_optim.step()

            # Backward discriminator loss
            set_requires_grad([discX, discY], True)
            discX_optim.zero_grad()

            pred_real = discX(real_X)
            loss_D_real = criterion_GAN(pred_real, torch.ones_like(pred_real))    # [(D'(x) - 1)^2]
            # detach(): the generated image is a constant here, so no gradient reaches the generator.
            pred_fake = discX(fake_X.detach())
            loss_D_fake = criterion_GAN(pred_fake, torch.zeros_like(pred_fake)) # [D'(F(y))^2]

            loss_discX = (loss_D_real + loss_D_fake)*0.5
            loss_discX.backward()

            discX_optim.step()

            # Forward discriminator loss
            discY_optim.zero_grad()

            pred_real = discY(real_Y)
            loss_D_real = criterion_GAN(pred_real, torch.ones_like(pred_real))   # [(D(y) - 1)^2]
            pred_fake = discY(fake_Y.detach())
            loss_D_fake = criterion_GAN(pred_fake, torch.zeros_like(pred_fake)) # [D(G(x))^2]

            loss_discY = (loss_D_real + loss_D_fake)*0.5
            loss_discY.backward()

            discY_optim.step()

            pbar.update(1)

            # Accumulate plain Python floats (.item()) so no autograd graph is kept alive.
            loss_gan_av += loss_GAN_Y2X.item() + loss_GAN_X2Y.item()
            loss_cycle_av += loss_cycle.item()
            loss_identity_av += loss_identity_X.item() + loss_identity_Y.item()
            loss_disc_av += loss_discY.item() + loss_discX.item()
            loss_gen_av += gen_loss.item()

    # Convert the running sums into per-batch averages for this epoch.
    loss_gan_av /= len(dataloader)
    loss_cycle_av /= len(dataloader)
    loss_identity_av /= len(dataloader)
    loss_disc_av /= len(dataloader)
    loss_gen_av /= len(dataloader)

    history.append([loss_gen_av, loss_disc_av, loss_identity_av,
                    loss_gan_av, loss_cycle_av])

    # Replace the previous epoch's output with the updated curves and samples.
    clear_output(wait=True)
    plot_history(history)
    # Using a consistent image (sample) so that the progress of the model
    # is clearly visible.
    print("Generation A -> B")
    functions.generate_images(gen_forward, sample_X, device)
    print("Generation B -> A")
    functions.generate_images(gen_backward, sample_Y, device)

    # Update learning rates
    lr_scheduler_gen.step()
    lr_scheduler_discX.step()
    lr_scheduler_discY.step()

    # Save models checkpoints and history
    with open(PATH_TO_SAVE + 'history.txt', 'a') as f:
        # One comma-separated row per epoch, newline-terminated so that
        # successive epochs do not run together on a single line.
        f.write(','.join(str(value) for value in history[-1]) + '\n')
    functions.save_models(PATH_TO_SAVE, models)