# CycleGAN
CycleGAN Implementation.

#### References
* [CycleGAN paper](https://arxiv.org/pdf/1703.10593.pdf)
* [Pix2Pix paper](https://arxiv.org/pdf/1611.07004.pdf)
* [Confluence page](https://machinereinforcedbook.atlassian.net/wiki/spaces/ML/pages/22052990/Pix2Pix)
* [Medium Article](https://medium.com/datadriveninvestor/style-transferring-of-image-using-cyclegan-3cc7aff4fe61)
* [Medium Article](https://blog.paperspace.com/unpaired-image-to-image-translation-with-cyclegan/)

In [1]:
import sys
sys.path.insert(0,'..')
import torch
import torch.nn.functional as F
import torch.nn as nn
from torch import optim
from PIL import Image
import itertools
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from dataset_utils.img_folder_dataset_cycle import ImageDataset
from torch.optim.lr_scheduler import StepLR
import numpy as np
import random
from tqdm import tqdm
# Run on the first CUDA device when one is visible, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(f'Device: {device}')

# The GPU count is reported for information.
num_gpu = torch.cuda.device_count()
# num_gpu = 8  (manual override left disabled, as in the original cell)
print(f'Number of GPUs Available: {num_gpu}')
print(f'Pytorch version: {torch.__version__}')

# Tensorboard
import shutil

from torch.utils.tensorboard import SummaryWriter

# Start from an empty log directory so results from earlier runs are not mixed
# into this one. shutil.rmtree replaces the IPython-only `!rm -rf ./runs` shell
# escape, so the cell is valid plain Python and does not depend on an `rm`
# binary; ignore_errors=True mirrors `-f` (a missing directory is not an error).
shutil.rmtree('./runs', ignore_errors=True)
writer = SummaryWriter('./runs/train')

# Metaparameters
# -- optimisation (Adam) --
learning_rate = 2e-4
b1, b2 = 0.5, 0.999
batch_size = 5
num_epochs = 200
# -- data --
dataset_name = 'maps'
img_height = 256
img_width = 256
# -- generator depth --
n_residual_blocks = 9
# -- weights of the cycle and identity terms in the generator loss --
lambda_cycle = 10.0
lambda_identity = 5.0

# Shape of one image's discriminator output (PatchGAN): a single-channel grid
# whose sides are 1/16 of the input sides.
patch = (1, img_height // 16, img_width // 16)
print(f'Patch size: {patch}')

Device: cuda:0
Number of GPUs Available: 8
Pytorch version: 1.2.0
Patch size: (1, 16, 16)


#### DataLoaders and Replay Buffer

The replay buffer keeps a pool of previously generated samples and brings some of them back during training, which is useful in the forward/backward cycles.

In [2]:
class ReplayBuffer:
    """Bounded pool of previously generated images.

    While the pool is not full, every pushed image is stored and returned
    unchanged. Once it is full, each pushed image is, with probability 0.5,
    swapped with a randomly chosen stored image (the stored one is returned and
    the new one takes its slot); otherwise the new image is returned and the
    pool is left untouched.

    Args:
        max_size: Maximum number of single-image tensors kept in the pool.
            Must be positive.
    """

    def __init__(self, max_size=50):
        assert max_size > 0, "Empty buffer or trying to create a black hole. Be careful."
        self.max_size = max_size
        # Each entry is one image with a leading batch dimension of size 1.
        self.data = []

    def push_and_pop(self, data):
        """Push a batch into the pool and return a batch of the same size.

        Args:
            data: Batch tensor whose first dimension indexes the samples.

        Returns:
            A tensor built by concatenating, per input sample, either that
            sample or a previously stored one. The result is detached from the
            autograd graph of ``data``.
        """
        to_return = []
        # detach() is the supported replacement for the legacy `.data`
        # attribute: the samples are still cut off from the graph, but
        # in-place modifications remain visible to autograd's correctness
        # checks instead of being silently ignored.
        for element in data.detach():
            element = torch.unsqueeze(element, 0)
            if len(self.data) < self.max_size:
                # Fill phase: keep and return the new sample.
                self.data.append(element)
                to_return.append(element)
            elif random.uniform(0, 1) > 0.5:
                # Swap: hand back an old sample and store the new one in its place.
                i = random.randint(0, self.max_size - 1)
                to_return.append(self.data[i].clone())
                self.data[i] = element
            else:
                to_return.append(element)
        return torch.cat(to_return)
    

# Buffers of previously generated samples: one independent pool for generated
# A images and one for generated B images, both with the default capacity.
fake_A_buffer, fake_B_buffer = ReplayBuffer(), ReplayBuffer()
    

# Image transformations
# Resize to 112% of the target height, then take a random target-sized crop and
# a random horizontal flip as augmentation.
_enlarged_size = int(img_height * 1.12)
_crop_size = (img_height, img_width)
# Per-channel mean and std of 0.5 for the final normalisation step.
_norm_mean = (0.5, 0.5, 0.5)
_norm_std = (0.5, 0.5, 0.5)

transforms_ = [
    transforms.Resize(_enlarged_size, Image.BICUBIC),
    transforms.RandomCrop(_crop_size),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(_norm_mean, _norm_std),
]

# Both loaders read from the same dataset folder and share the transform list.
_data_root = f"../data/{dataset_name}"

# Training data loader
_train_set = ImageDataset(_data_root, transforms_=transforms_, unaligned=True)
dataloader_train = DataLoader(
    _train_set, batch_size=batch_size, shuffle=True, num_workers=10
)

# Test data loader (fixed batch of 5, single worker)
_test_set = ImageDataset(
    _data_root, transforms_=transforms_, unaligned=True, mode="test"
)
val_dataloader = DataLoader(_test_set, batch_size=5, shuffle=True, num_workers=1)

#### Declare Generator and Discriminator Models

In [3]:
class ResidualBlock(nn.Module):
    """Two 3x3 conv stages with a skip connection; channel count is unchanged.

    Args:
        in_features: Number of input (and output) channels.
    """

    def __init__(self, in_features):
        super().__init__()

        def conv_stage():
            # Reflection padding of 1 offsets the shrinkage of the unpadded
            # 3x3 convolution, so the spatial size is preserved.
            return [
                nn.ReflectionPad2d(1),
                nn.Conv2d(in_features, in_features, kernel_size=3),
                nn.InstanceNorm2d(in_features),
            ]

        # pad-conv-norm, ReLU, pad-conv-norm (no activation after the second stage).
        layers = conv_stage() + [nn.ReLU(inplace=True)] + conv_stage()
        self.block = nn.Sequential(*layers)

    def forward(self, x):
        """Return the input plus the output of the convolutional branch."""
        residual = self.block(x)
        return x + residual


class GeneratorResNet(nn.Module):
    """ResNet-style image-to-image generator.

    Layout: a 7x7 convolution to 64 features, two stride-2 convolutions
    (64 -> 128 -> 256), ``num_residual_blocks`` residual blocks, two
    upsample+convolution stages (256 -> 128 -> 64) and a final 7x7 convolution
    back to the input channel count followed by ``tanh``.

    Args:
        input_shape: Sequence whose first element is the number of image
            channels; the remaining elements are not read here.
        num_residual_blocks: Number of ``ResidualBlock`` modules in the middle
            of the network.
    """

    def __init__(self, input_shape, num_residual_blocks=9):
        super().__init__()

        channels = input_shape[0]

        # A 7x7 convolution without padding trims 3 pixels from every border, so
        # the matching reflection padding is always 3. The padding used to be
        # `channels`, which is only correct for 3-channel images and changed
        # the output size for any other channel count.
        pad_7x7 = 3

        # Initial convolution block
        out_features = 64
        model = [
            nn.ReflectionPad2d(pad_7x7),
            nn.Conv2d(channels, out_features, 7),
            nn.InstanceNorm2d(out_features),
            nn.ReLU(inplace=True),
        ]
        in_features = out_features

        # Downsampling: each stage doubles the features and halves the resolution
        for _ in range(2):
            out_features *= 2
            model += [
                nn.Conv2d(in_features, out_features, 3, stride=2, padding=1),
                nn.InstanceNorm2d(out_features),
                nn.ReLU(inplace=True),
            ]
            in_features = out_features

        # Residual blocks
        for _ in range(num_residual_blocks):
            model += [ResidualBlock(out_features)]

        # Upsampling: each stage halves the features and doubles the resolution
        for _ in range(2):
            out_features //= 2
            model += [
                nn.Upsample(scale_factor=2),
                nn.Conv2d(in_features, out_features, 3, stride=1, padding=1),
                nn.InstanceNorm2d(out_features),
                nn.ReLU(inplace=True),
            ]
            in_features = out_features

        # Output layer: back to the image channel count, squashed into [-1, 1]
        model += [
            nn.ReflectionPad2d(pad_7x7),
            nn.Conv2d(out_features, channels, 7),
            nn.Tanh(),
        ]

        self.model = nn.Sequential(*model)

    def forward(self, x):
        """Translate a batch of images; output values lie in [-1, 1] (tanh)."""
        return self.model(x)


##############################
#        Discriminator
##############################


class Discriminator(nn.Module):
    """Convolutional discriminator that scores a grid of patches (PatchGAN).

    Args:
        input_shape: ``(channels, height, width)`` of the images to score.

    Attributes:
        output_shape: ``(1, height // 16, width // 16)``, the shape of the score
            map recorded for one image.
    """

    def __init__(self, input_shape):
        super().__init__()

        channels, height, width = input_shape

        # Four stride-2 stages: each side of the score map is 1/16 of the input.
        self.output_shape = (1, height // 16, width // 16)

        stage_widths = [channels, 64, 128, 256, 512]
        layers = []
        for stage, (n_in, n_out) in enumerate(zip(stage_widths[:-1], stage_widths[1:])):
            # Downsampling stage: 4x4 conv with stride 2, optional norm, LeakyReLU.
            layers.append(nn.Conv2d(n_in, n_out, 4, stride=2, padding=1))
            if stage > 0:
                # The first stage is the only one without normalisation.
                layers.append(nn.InstanceNorm2d(n_out))
            layers.append(nn.LeakyReLU(0.2, inplace=True))

        # Pad one pixel on the left and top, then reduce to a single-channel map.
        layers.append(nn.ZeroPad2d((1, 0, 1, 0)))
        layers.append(nn.Conv2d(512, 1, 4, padding=1))

        self.model = nn.Sequential(*layers)

    def forward(self, img):
        """Return the raw (un-activated) patch score map for ``img``."""
        return self.model(img)

    
# Three-channel images at the configured resolution.
input_shape = (3, img_height, img_width)

# One generator per translation direction (A->B and B->A) ...
G_AB, G_BA = (
    GeneratorResNet(input_shape, n_residual_blocks).to(device) for _ in range(2)
)
# ... and one discriminator per image domain.
D_A, D_B = (Discriminator(input_shape).to(device) for _ in range(2))

#### Define Loss Functions

In [4]:
# Adversarial term: mean squared error.
criterion_GAN = nn.MSELoss()
# Cycle-consistency and identity terms: mean absolute error (separate instances).
criterion_cycle, criterion_L1 = nn.L1Loss(), nn.L1Loss()

#### Define Optimizers

In [5]:
# Optimizers
# All three optimizers share the same Adam settings.
_adam_settings = dict(lr=learning_rate, betas=(b1, b2))

# A single optimizer updates both generators, since they are trained jointly.
_generator_params = itertools.chain(G_AB.parameters(), G_BA.parameters())
optimizer_G = torch.optim.Adam(_generator_params, **_adam_settings)

# Each discriminator has its own optimizer.
optimizer_D_A = torch.optim.Adam(D_A.parameters(), **_adam_settings)
optimizer_D_B = torch.optim.Adam(D_B.parameters(), **_adam_settings)

#### Train loop

In [None]:
def _to_unit_range(images):
    """Map images from [-1, 1] to [0, 1] for display.

    The data pipeline normalises with mean 0.5 / std 0.5 and the generators end
    in tanh, so tensors here live in [-1, 1], while TensorBoard expects float
    images in [0, 1].
    """
    return (images.detach() + 1.0) / 2.0


# Training: per batch, update both generators jointly, then each discriminator.
# Per epoch, log sample-weighted mean losses and the last batch's images.
for epoch in tqdm(range(num_epochs)):
    running_loss_G = 0.0
    running_loss_D = 0.0
    # Iterate over the training data
    for batch in dataloader_train:
        real_A = batch["A"].to(device)
        real_B = batch["B"].to(device)
        # Size of this batch (the last one may be smaller). Kept in its own
        # name so the `batch_size` hyper-parameter is not overwritten.
        n_samples = real_A.size(0)

        # Adversarial ground truths (you can do soft-label here....)
        # The discriminators output a grid of patches, so the targets are
        # spatial tensors; they are created directly on the training device.
        valid = torch.ones(n_samples, *patch, device=device)
        fake = torch.zeros(n_samples, *patch, device=device)

        # ---- Train Generators ----
        optimizer_G.zero_grad()

        # Identity loss: a generator fed an image already in its output domain
        # should return it unchanged.
        L1_A = criterion_L1(G_BA(real_A), real_A)
        L1_B = criterion_L1(G_AB(real_B), real_B)
        loss_identity = (L1_A + L1_B) / 2

        # GAN loss: generated images should be scored as valid
        fake_B = G_AB(real_A)
        loss_GAN_AB = criterion_GAN(D_B(fake_B), valid)
        fake_A = G_BA(real_B)
        loss_GAN_BA = criterion_GAN(D_A(fake_A), valid)
        loss_GAN = (loss_GAN_AB + loss_GAN_BA) / 2

        # Cycle loss: translating there and back should recover the input
        recov_A = G_BA(fake_B)
        loss_cycle_A = criterion_cycle(recov_A, real_A)
        recov_B = G_AB(fake_A)
        loss_cycle_B = criterion_cycle(recov_B, real_B)
        loss_cycle = (loss_cycle_A + loss_cycle_B) / 2

        # Total generator loss
        loss_G = loss_GAN + (lambda_cycle * loss_cycle) + (lambda_identity * loss_identity)

        loss_G.backward()
        optimizer_G.step()

        # ---- Train Discriminator A ----
        # zero_grad also clears the gradients accumulated in D_A by the
        # generator backward pass above.
        optimizer_D_A.zero_grad()
        # Real loss
        loss_real = criterion_GAN(D_A(real_A), valid)
        # Fake loss (on batch of previously generated samples)
        fake_A_ = fake_A_buffer.push_and_pop(fake_A)
        loss_fake = criterion_GAN(D_A(fake_A_.detach()), fake)
        # Total loss
        loss_D_A = (loss_real + loss_fake) / 2
        loss_D_A.backward()
        optimizer_D_A.step()

        # ---- Train Discriminator B ----
        optimizer_D_B.zero_grad()
        # Real loss
        loss_real = criterion_GAN(D_B(real_B), valid)
        # Fake loss (on batch of previously generated samples)
        fake_B_ = fake_B_buffer.push_and_pop(fake_B)
        loss_fake = criterion_GAN(D_B(fake_B_.detach()), fake)
        # Total loss
        loss_D_B = (loss_real + loss_fake) / 2
        loss_D_B.backward()
        optimizer_D_B.step()

        # Calculate both discriminator losses
        loss_D = (loss_D_A + loss_D_B) / 2

        # Update statistics, weighting each batch mean by its sample count
        running_loss_G += loss_G.item() * n_samples
        running_loss_D += loss_D.item() * n_samples

    # Epoch ends
    epoch_loss_generator = running_loss_G / len(dataloader_train.dataset)
    epoch_loss_discriminator = running_loss_D / len(dataloader_train.dataset)

    # Send results to tensorboard
    writer.add_scalar('train/loss_generator', epoch_loss_generator, epoch)
    writer.add_scalar('train/loss_discriminator', epoch_loss_discriminator, epoch)

    # Send the last batch's images to tensorboard, rescaled to [0, 1]; each tag
    # is written once per epoch.
    writer.add_images('train/real_A', _to_unit_range(real_A), epoch)
    writer.add_images('train/real_B', _to_unit_range(real_B), epoch)
    writer.add_images('train/G_BA', _to_unit_range(fake_A), epoch)
    writer.add_images('train/G_AB', _to_unit_range(fake_B), epoch)

 12%|█▏        | 23/200 [2:06:43<16:15:01, 330.52s/it]