# CIS6930 Week 4: Generative Adversarial Networks (Student version)

---

Preparation: Go to `Runtime > Change runtime type` and choose `GPU` for the hardware accelerator.



In [None]:
gpu_info = !nvidia-smi -L
gpu_info = "\n".join(gpu_info)
if gpu_info.find("failed") >= 0:
    print("Not connected to a GPU")
else:
    print(gpu_info)

## Preparation

In [None]:
# Please create a Kaggle account and accept the term before downloading the dataset
# https://www.kaggle.com/c/dogs-vs-cats
# This is for educational purpose and do not distribute the link
!gdown --id 1c_GFLUlW7nB1mwlhNT9t4a2jfilIKPNt

In [None]:
!unzip dogs-vs-cats.zip
!unzip train.zip
!unzip test1.zip

In [None]:
!ls train

In [None]:
from PIL import Image

# Preview one dog image from the extracted training set; as the cell's last
# expression, the opened image is rendered by the notebook.
Image.open("train/dog.1.jpg")

In [None]:
# Preview one cat image from the extracted training set.
Image.open("train/cat.1.jpg")

In [None]:
import copy
import random
from time import time
from typing import Any, Dict

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.utils.data import Dataset, TensorDataset, DataLoader
import torchvision
import torchvision.transforms as transforms

from matplotlib import pyplot as plt

### Organizing data directory

In [None]:
# The cat commands are disabled, so only dog images end up under images/.
#!mkdir -p images/cat
#!mv train/cat.*.jpg images/cat/
# Move the dog images into their own sub-directory of images/ (ImageFolder,
# used below, reads one sub-directory per class).
!mkdir -p images/dog
!mv train/dog.*.jpg images/dog/

### Load dataset after normalization

In [None]:
# Side length (in pixels) of the square training images.
# NOTE: the Generator/Discriminator defined below are written for 64x64
# images, so other sizes (128, 224) also require changing the networks.
image_size = 64 # 128, 224
dataset = torchvision.datasets.ImageFolder(
    root="images", # it considers subdirectory as the class name
    transform=transforms.Compose([
        # Resize the shorter side to image_size (aspect ratio is preserved) ...
        transforms.Resize(image_size),
        # ... then cut out the central image_size x image_size square.
        # (Resizing directly to (image_size, image_size) would squash the
        # image and turn this crop into a no-op.)
        transforms.CenterCrop(image_size),
        transforms.ToTensor(),
        # Map each channel from [0, 1] to [-1, 1].
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ]))

In [None]:
# Utility function
def denormalize_image(x):
    """Min-max rescale every channel of an image tensor and return a PIL image.

    Each channel (dim 0) is scaled independently so that its smallest value
    over dims 1 and 2 maps to 0 and its largest to 255; the result is cast to
    ``uint8`` and converted with ``ToPILImage``.

    A channel whose values are all equal has zero range; it is mapped to 0
    instead of producing NaN through a division by zero.

    https://discuss.pytorch.org/t/what-is-the-most-simple-way-to-reobtain-a-pil-image-after-normalization/85355/3
    """
    # Per-channel extrema, kept as (C, 1, 1) so they broadcast against x.
    min_i = x.amin(dim=(1, 2), keepdim=True)
    max_i = x.amax(dim=(1, 2), keepdim=True)
    value_range = max_i - min_i
    # Guard against constant channels (range == 0) to avoid 0/0 = NaN.
    value_range = torch.where(value_range == 0,
                              torch.ones_like(value_range),
                              value_range)
    x = ((x - min_i) / value_range) * 255
    return transforms.ToPILImage()(x.type(torch.uint8))

# First the normalized tensor converted directly, then the rescaled version.
plt.imshow(transforms.ToPILImage()(dataset[0][0]))
plt.show()
plt.imshow(denormalize_image(dataset[0][0]))
plt.show()

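Alternatively, `torchvision.utils.make_grid()` is the go-to option for displaying a whole batch of images as a single grid (with `normalize=True` it also rescales the pixel values for display).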

In [None]:
# Draw one shuffled mini-batch and tile its images into a single grid image.
dataloader = DataLoader(dataset, batch_size=16, shuffle=True)
batch = next(iter(dataloader))
preview_images = batch[0][:64]  # batch[0] holds the images, batch[1] the labels
preview_grid = torchvision.utils.make_grid(preview_images,
                                           padding=2,
                                           normalize=True).cpu()
# Move the channel axis last, which is the layout imshow expects.
plt.imshow(np.transpose(preview_grid, (1, 2, 0)))

## DCGAN Model

The following code is based on [this PyTorch tutorial](https://pytorch.org/tutorials/beginner/dcgan_faces_tutorial.html) with some modifications (mainly additional comments).



### Generator

In [None]:
class Generator(nn.Module):
    """DCGAN generator: upsamples a latent vector into a 64x64 image.

    The network is a stack of transposed convolutions. The first one turns a
    ``(B, n_z, 1, 1)`` latent tensor into a 4x4 feature map; each following
    one doubles the spatial size (4 -> 8 -> 16 -> 32 -> 64). The final
    ``Tanh`` bounds the output to ``[-1, 1]``.

    Args:
        n_z: Number of channels of the latent input.
        n_f: Base number of feature maps; the hidden layers use
            ``n_f * 8``, ``n_f * 4``, ``n_f * 2`` and ``n_f`` channels.
        n_c: Number of channels of the generated image.
        relu_slope: Currently unused (the hidden activations are plain ReLU);
            kept so existing callers keep working.
    """

    def __init__(self,
                 n_z: int = 100,
                 n_f: int = 64, # base number of feature maps
                 n_c: int = 3,
                 relu_slope: float = 0.2 # unused: hidden layers use ReLU
                 ):
        super().__init__()

        def up_block(c_in, c_out, stride, padding):
            """Transposed conv (4x4 kernel, no bias) -> BatchNorm -> ReLU."""
            return [
                nn.ConvTranspose2d(in_channels=c_in,
                                   out_channels=c_out,
                                   kernel_size=4,
                                   stride=stride,
                                   padding=padding,
                                   bias=False),
                nn.BatchNorm2d(c_out),
                nn.ReLU(inplace=True),
            ]

        layers = []
        # (B, n_z, 1, 1) -> (B, n_f * 8, 4, 4)
        layers += up_block(n_z, n_f * 8, stride=1, padding=0)
        # (B, n_f * 8, 4, 4) -> (B, n_f * 4, 8, 8)
        layers += up_block(n_f * 8, n_f * 4, stride=2, padding=1)
        # (B, n_f * 4, 8, 8) -> (B, n_f * 2, 16, 16)
        layers += up_block(n_f * 4, n_f * 2, stride=2, padding=1)
        # (B, n_f * 2, 16, 16) -> (B, n_f, 32, 32)
        layers += up_block(n_f * 2, n_f, stride=2, padding=1)
        # (B, n_f, 32, 32) -> (B, n_c, 64, 64); no BatchNorm on the output layer
        layers.append(nn.ConvTranspose2d(in_channels=n_f,
                                         out_channels=n_c,
                                         kernel_size=4,
                                         stride=2,
                                         padding=1,
                                         bias=False))
        layers.append(nn.Tanh())
        # Output shape: (B, n_c, 64, 64)
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """Run the latent batch ``x`` through the layer stack."""
        return self.network(x)

### Discriminator

In [None]:
class Discriminator(nn.Module):
    """DCGAN discriminator: scores a 64x64 image with one value in [0, 1].

    Four stride-2 convolutions halve the spatial size each time
    (64 -> 32 -> 16 -> 8 -> 4); a final 4x4 convolution collapses the 4x4 map
    to a single value that is passed through a sigmoid.

    Args:
        n_f: Base number of feature maps; the layers use ``n_f``,
            ``n_f * 2``, ``n_f * 4`` and ``n_f * 8`` channels.
        n_c: Number of channels of the input image.
        relu_slope: Negative slope of every LeakyReLU activation.
    """

    def __init__(self,
                 n_f: int = 64, # base number of feature maps
                 n_c: int = 3,
                 relu_slope: float = 0.2):
        super().__init__()
        self.network = nn.Sequential(
            # (B, n_c, 64, 64) -> (B, n_f, 32, 32); no BatchNorm on the input layer
            nn.Conv2d(in_channels=n_c,
                      out_channels=n_f,
                      kernel_size=4,
                      stride=2,
                      padding=1,
                      bias=False),
            nn.LeakyReLU(negative_slope=relu_slope, inplace=True),
            # (B, n_f, 32, 32) -> (B, n_f * 2, 16, 16)
            nn.Conv2d(in_channels=n_f,
                      out_channels=n_f * 2,
                      kernel_size=4,
                      stride=2,
                      padding=1,
                      bias=False),
            nn.BatchNorm2d(n_f * 2),
            nn.LeakyReLU(negative_slope=relu_slope, inplace=True),
            # (B, n_f * 2, 16, 16) -> (B, n_f * 4, 8, 8)
            nn.Conv2d(in_channels=n_f * 2,
                      out_channels=n_f * 4,
                      kernel_size=4,
                      stride=2,
                      padding=1,
                      bias=False),
            nn.BatchNorm2d(n_f * 4),
            nn.LeakyReLU(negative_slope=relu_slope, inplace=True),
            # (B, n_f * 4, 8, 8) -> (B, n_f * 8, 4, 4)
            nn.Conv2d(in_channels=n_f * 4,
                      out_channels=n_f * 8,
                      kernel_size=4,
                      stride=2,
                      padding=1,
                      bias=False),
            nn.BatchNorm2d(n_f * 8),
            nn.LeakyReLU(negative_slope=relu_slope, inplace=True),
            # (B, n_f * 8, 4, 4) -> (B, 1, 1, 1)
            nn.Conv2d(in_channels=n_f * 8,
                      out_channels=1,
                      kernel_size=4,
                      stride=1,
                      padding=0,
                      bias=False),
            # Squash the score into [0, 1] so it can be used with BCELoss.
            nn.Sigmoid()
        )

    def forward(self, x):
        """Return the (B, 1, 1, 1) score tensor for the image batch ``x``."""
        return self.network(x)

### Custom weight initialization function

In [None]:
# custom weights initialization for Generator and Discriminator
def weights_init(m):
    """Re-initialise one module in place; intended for ``model.apply(...)``.

    The module is selected by its class name:
      * names containing "Conv": weights drawn from N(0.0, 0.02);
      * names containing "BatchNorm": weights drawn from N(1.0, 0.02)
        and biases set to 0.
    Any other module is left untouched.
    """
    layer_kind = type(m).__name__
    if "Conv" in layer_kind:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
        return
    if "BatchNorm" in layer_kind:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)

### Training script

In [None]:
## ===============
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

n_z = 100  # number of channels of the latent vector fed to the generator
n_f = 64  # base number of feature maps in G and D (independent of image_size)
n_vis = 64  # number of fixed latent vectors rendered after every epoch

lr = 0.0002
beta1 = 0.5  # <- 0.9
beta2 = 0.999  # 0.999

num_epochs = 20
batch_size = 16 # 128 is the original configuration

random_seed = 1
## ===============

# Random Seeds ===============
torch.manual_seed(random_seed)
random.seed(random_seed)
np.random.seed(random_seed)
# Random Seeds ===============


# Create models
# n_f is the number of feature maps, not the image size, and the generator
# must be built with the same n_z that is used to sample the noise below.
model_D = Discriminator(n_f=n_f).to(device)
model_D.apply(weights_init)

model_G = Generator(n_z=n_z, n_f=n_f).to(device)
model_G.apply(weights_init)

# Initialize BCELoss function
criterion = nn.BCELoss()

# Create batch of latent vectors that we will use to visualize
#  the progression of the generator
fixed_noise = torch.randn(n_vis,
                          n_z,
                          1,
                          1, device=device)

# Establish convention for real and fake labels during training
real_label = 1.
fake_label = 0.

# Setup Adam optimizers for both G and D
optimizer_D = optim.Adam(model_D.parameters(), lr=lr, betas=(beta1, beta2))
optimizer_G = optim.Adam(model_G.parameters(), lr=lr, betas=(beta1, beta2))

# Data loader
dataloader = DataLoader(dataset,
                        batch_size=batch_size,
                        shuffle=True,
                        drop_last=True)

img_list = []  # one image grid per epoch, generated from fixed_noise
G_losses = []  # generator loss of every iteration
D_losses = []  # discriminator loss of every iteration
iters = 0

print("Starting Training Loop...")
# For each epoch
for epoch in range(num_epochs):
    # Training mode for the whole epoch (BatchNorm uses batch statistics).
    model_D.train()
    model_G.train()
    for i, batch in enumerate(dataloader):
        ############################
        # (1) Update D network: maximize log(D(x)) + log(1 - D(G(z)))
        ###########################
        ## Train with all-real batch
        X, _ = batch  # (B, 3, 64, 64); the class labels are not needed
        X = X.to(device)
        n_samples = X.shape[0]
        model_D.zero_grad()
        # create labels (all 1) for Discriminator
        label = torch.full((n_samples,),
                           real_label,
                           dtype=torch.float,
                           device=device)
        # Predictions for real images by D (i.e., [0, 1])
        output = model_D(X).view(-1) # (B, 1, 1, 1) -> (B,)
        # Calculate BCE loss
        errD_real = criterion(output, label)
        # Calculate gradients for D by backpropagation
        errD_real.backward()
        D_x = output.mean().item()

        ## Train with all-fake batch
        # Generate batch of latent vectors; sized from the real batch so the
        # fake batch always matches the label tensor created above
        noise = torch.randn(n_samples, n_z, 1, 1, device=device)
        # Generate fake image batch with G
        fake = model_G(noise)
        label.fill_(fake_label)
        # Classify all fake batch with D; detach() keeps this backward pass
        # from propagating into G
        output = model_D(fake.detach()).view(-1)
        # Calculate D's loss on the all-fake batch
        errD_fake = criterion(output, label)
        # Calculate the gradients for this batch, accumulated (summed) with previous gradients
        errD_fake.backward()
        D_G_z1 = output.mean().item()
        # Compute error of D as sum over the fake and the real batches
        errD = errD_real + errD_fake
        # Update D
        optimizer_D.step()

        ############################
        # (2) Update G network: maximize log(D(G(z)))
        ###########################
        model_G.zero_grad()
        label.fill_(real_label)  # fake labels are real for generator cost
        # Since we just updated D, perform another forward pass of all-fake batch through D
        output = model_D(fake).view(-1)
        # Calculate G's loss based on this output
        errG = criterion(output, label)
        # Calculate gradients for G
        errG.backward()
        D_G_z2 = output.mean().item()
        # Update G
        optimizer_G.step()  # Note that this does not update D

        # Output training stats
        if i % 50 == 0:
            print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
                  % (epoch, num_epochs, i, len(dataloader),
                     errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))

        # Save Losses for plotting later
        G_losses.append(errG.item())
        D_losses.append(errD.item())

    # Check how the generator is doing by saving G's output on fixed_noise.
    # Switch to evaluation mode once per epoch, right before generating.
    model_D.eval()
    model_G.eval()

    # Save generated images for each epoch
    with torch.no_grad():
        fake = model_G(fixed_noise).detach().cpu()
    img_list.append(torchvision.utils.make_grid(fake, padding=2, normalize=True))

### Training losses for Generator/Discriminator

In [None]:
# Plot the per-iteration generator and discriminator losses on one axes.
fig, ax = plt.subplots(figsize=(10, 5))
ax.set_title("Generator and Discriminator Loss During Training")
for losses, name in ((G_losses, "G"), (D_losses, "D")):
    ax.plot(losses, label=name)
ax.set_xlabel("iterations")
ax.set_ylabel("Loss")
ax.legend()
plt.show()

### Visualize generated images

In [None]:
from IPython.display import HTML
import matplotlib.animation as animation

# Build one animation frame from every grid stored in img_list.
fig = plt.figure(figsize=(8,8))
plt.axis("off")
ims = []
for grid in img_list:
    # Move the channel axis last, which is the layout imshow expects.
    frame = plt.imshow(np.transpose(grid, (1, 2, 0)), animated=True)
    ims.append([frame])
ani = animation.ArtistAnimation(fig, ims, interval=1000, repeat_delay=1000, blit=True)

# Render the animation as an interactive HTML/JavaScript player.
HTML(ani.to_jshtml())

### Download the animation

In [None]:
# Write the animation to a video file in the notebook's working directory.
filename = "dcgan_dogs.mp4" 
ani.save(filename)

# google.colab is only importable inside Google Colab; files.download() sends
# the file to the user's browser.
from google.colab import files
files.download(filename)  # Note: this only works with Google Chrome