## WGAN with Gradient Penalty

Youtube Resource: https://youtu.be/pG0QZ7OddX4

Sorta Insightful Blog Link: https://www.alexirpan.com/2017/02/22/wasserstein-gan.html

Wasserstein GAN: https://arxiv.org/abs/1701.07875

Improved Training of Wasserstein GAN: https://arxiv.org/abs/1704.00028

In [20]:
print("Wasserstein GAN Implementation from Scratch")

Wasserstein GAN Implementation from Scratch


In [21]:
# make sure to load tensorboard or reload if already loaded 
%load_ext tensorboard

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


In [22]:
# start the tensorboard server (default is port 6006)
%tensorboard --logdir=logs/ --reload_interval=5

Reusing TensorBoard on port 6006 (pid 83363), started 0:28:24 ago. (Use '!kill 83363' to kill it.)

In [23]:
import torch
import torch.nn as nn

In [24]:
class Discriminator(nn.Module):
    def __init__(self, channels_img, features_d):
        super(Discriminator, self).__init__()
        self.disc = nn.Sequential(
            # Input: N x channels_img x 64 x 64
            nn.Conv2d(channels_img, features_d, kernel_size=4, stride=2, padding=1),    # 32x32
            nn.LeakyReLU(0.2),
            self._block(features_d, features_d * 2, 4,2,1),     # 16x16
            self._block(features_d * 2, features_d * 4, 4,2,1), # 8x8
            self._block(features_d*4, features_d * 8, 4,2,1),   # 4x4
            nn.Conv2d(features_d*8, 1, kernel_size=4, stride=2, padding=0),
        )

    def _block(self, in_channels, out_channels, kernel_size, stride, padding):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding, bias=False,),
            nn.InstanceNorm2d(out_channels, affine=True),    # LayerNorm <-> InstanceNorm | affine=True -> so that it has learnable parameters
            nn.LeakyReLU(0.2),
        )

    def forward(self,x):
        return self.disc(x)
        

In [25]:

class Generator(nn.Module):
    def __init__(self, z_dim, channels_img, features_g):
        super(Generator, self).__init__()
        self.gen = nn.Sequential(
            # Input: N x z_dim x 1 x 1
            self._block(z_dim, features_g*16, 4,1,0),   # N x f_g*16 x 4 x 4
            self._block(features_g*16, features_g*8, 4,2,1),    # 8x8
            self._block(features_g*8, features_g*4, 4,2,1),     # 16x16
            self._block(features_g*4, features_g*2, 4,2,1),     # 32x32
            nn.ConvTranspose2d(
                features_g*2, channels_img, kernel_size=4, stride=2, padding=1,
            ),      # 64x64
            nn.Tanh(),  # [-1,1]  
        )

    def _block(self, in_channels, out_channels, kernel_size, stride, padding):
        return nn.Sequential(
            nn.ConvTranspose2d(
                in_channels, out_channels, kernel_size, stride, padding, bias=False,
            ),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        )

    def forward(self, x):
        return self.gen(x)

In [26]:
# Mean=0, standatd deviaiton = 0.02
def initialize_weights(model):
    for m in model.modules():
        if isinstance(m, (nn.Conv2d, nn.ConvTranspose2d, nn.BatchNorm2d)):
            nn.init.normal_(m.weight.data, 0.0, 0.02)
            

In [27]:
def test():
    N, in_channels, H, W = 8,3,64,64
    z_dim = 100
    x = torch.randn((N, in_channels, H, W))
    disc = Discriminator(in_channels, 8)
    initialize_weights(disc)
    assert disc(x).shape == (N,1,1,1)
    
    gen = Generator(z_dim, in_channels, 8)
    z = torch.randn((N, z_dim, 1, 1))
    initialize_weights(gen)
    
    assert gen(z).shape == (N, in_channels, H, W)
    print("Success")

In [28]:
test()

Success


In [29]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
# from model import Discriminator, Generator, initialize_weights

In [30]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
LEARNING_RATE = 1e-4
BATCH_SIZE = 64
IMAGE_SIZE = 64
CHANNELS_IMG = 3    # make it 1 if you are training on a grayscale image
Z_DIM = 100
NUM_EPOCHS = 5      # It will take a lot of time coz there are lots of images to process
FEATURES_DISC = 64
FEATURES_GEN = 64
CRITIC_ITERATIONS = 5
LAMBDA_GP = 10

In [31]:
transforms = transforms.Compose(
    [
        transforms.Resize(IMAGE_SIZE),
        transforms.ToTensor(),
        transforms.Normalize(
            [0.5 for _ in range(CHANNELS_IMG)], [0.5 for _ in range(CHANNELS_IMG)]
        ),
    ]
)

In [32]:
# dataset = datasets.MNIST(root="dataset/", train=True, transform=transforms, download=True)
dataset = datasets.ImageFolder(root="../dcgan/celebA_dataset/", transform=transforms)

dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

gen = Generator(Z_DIM, CHANNELS_IMG, FEATURES_GEN).to(device)
critic = Discriminator(CHANNELS_IMG, FEATURES_DISC).to(device)

initialize_weights(gen)
initialize_weights(critic)

opt_gen = optim.Adam(gen.parameters(), lr=LEARNING_RATE, betas=(0.0,0.9))
opt_critic = optim.Adam(critic.parameters(), lr=LEARNING_RATE, betas=(0.0,0.9))


fixed_noise = torch.randn(32, Z_DIM, 1,1).to(device)
writer_real = SummaryWriter(f"logs/real")
writer_fake = SummaryWriter(f"logs/fake")

step = 0

## Send both the networks to training modes
gen.train()
critic.train()

Discriminator(
  (disc): Sequential(
    (0): Conv2d(3, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (1): LeakyReLU(negative_slope=0.2)
    (2): Sequential(
      (0): Conv2d(64, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
      (1): InstanceNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=False)
      (2): LeakyReLU(negative_slope=0.2)
    )
    (3): Sequential(
      (0): Conv2d(128, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
      (1): InstanceNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=False)
      (2): LeakyReLU(negative_slope=0.2)
    )
    (4): Sequential(
      (0): Conv2d(256, 512, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
      (1): InstanceNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=False)
      (2): LeakyReLU(negative_slope=0.2)
    )
    (5): Conv2d(512, 1, kernel_size=(4, 4), stride=(2, 2))
  )
)

In [33]:
def gradient_penalty(critic, real, fake, device="mps"):
    BATCH_SIZE, C, H, W = real.shape
    epsilon = torch.rand((BATCH_SIZE,1,1,1)).repeat(1,C,H,W).to(device)
    interpolated_images = real * epsilon + fake *(1-epsilon)

    # Calculate critic scores
    mixed_scores = critic(interpolated_images)

    # Computing gradient of the mixed scores with respect to the interpolated images
    gradient = torch.autograd.grad(
        inputs = interpolated_images,
        outputs = mixed_scores,
        grad_outputs = torch.ones_like(mixed_scores),
        create_graph = True,
        retain_graph = True,
    )[0]

    gradient = gradient.view(gradient.shape[0],-1)      # flatten all the other dimensions of the the gradient
    gradient_norm = gradient.norm(2, dim=1)     # L2 normalization
    gradient_penalty = torch.mean((gradient_norm-1)**2)
    return gradient_penalty

In [34]:
for epoch in range(NUM_EPOCHS):
    for batch_idx, (real, _) in enumerate(dataloader):
        real = real.to(device)
        curr_batch_size = real.shape[0]

        for _ in range(CRITIC_ITERATIONS):
            noise = torch.randn(curr_batch_size, Z_DIM, 1, 1).to(device)
            fake = gen(noise)
            critic_real = critic(real).reshape(-1)
            critic_fake = critic(fake).reshape(-1)

            # Add the gradient penalty to the loss
            gp = gradient_penalty(critic, real, fake, device=device)

            loss_critic = (
                -(torch.mean(critic_real) - torch.mean(critic_fake)) +
                LAMBDA_GP*gp
                )
            critic.zero_grad()
            loss_critic.backward(retain_graph=True)
            opt_critic.step()


        ### Train Generator: min -E(critic(gen_fake))
        output = critic(fake).reshape(-1)
        loss_gen = -torch.mean(output)
        gen.zero_grad()
        loss_gen.backward()
        opt_gen.step()
        
        ###  Print Losses occasionally and print to tensorboard
        if batch_idx % 100 == 0 and batch_idx > 0:
            print(f"Epoch [{epoch}/{NUM_EPOCHS} Batch {batch_idx}/{len(dataloader)} \n Loss D: {loss_critic:.4f}, Loss G: {loss_gen:.4f}]")

            with torch.no_grad():
                fake = gen(fixed_noise)
                # Take out (upto) 32 examples
                img_grid_real = torchvision.utils.make_grid(
                    real[:32], normalize=True
                )
                img_grid_fake = torchvision.utils.make_grid(
                    fake[:32], normalize=True
                )

                writer_real.add_image("Real", img_grid_real, global_step=step)

                writer_fake.add_image("Fake", img_grid_fake, global_step=step)
            
            step += 1


RuntimeError: The size of tensor a (64) must match the size of tensor b (78) at non-singleton dimension 2

In [16]:
%tensorboard --logdir=logs/ --reload_interval=5

Reusing TensorBoard on port 6008 (pid 56358), started 0:00:13 ago. (Use '!kill 56358' to kill it.)