In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!tar -xf '/content/cat.tar' -C '/content/'


In [3]:
import matplotlib.pyplot as plt


In [4]:
def get_data_loader(data_path, opts):
    """Creates data loaders.
    """
    # Basic transformations
    basic_transform = transforms.Compose([
        transforms.Resize(opts.image_size, Image.BICUBIC),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ])

    # Deluxe transformations
    deluxe_transform = transforms.Compose([
        transforms.Resize(int(opts.image_size * 1.1)),  # Resize to slightly larger than final size
        transforms.RandomCrop(opts.image_size),  # Randomly crop to the final size
        transforms.RandomHorizontalFlip(),  # Randomly flip the images horizontally
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ])

    # Choose the transform based on the option
    transform = deluxe_transform if opts.data_aug == 'deluxe' else basic_transform

    dataset = CustomDataSet(os.path.join('data/', data_path), opts.ext, transform)
    dloader = DataLoader(dataset=dataset, batch_size=opts.batch_size, shuffle=True, num_workers=opts.num_workers)

    return dloader


In [5]:
# CMU 16-726 Learning-Based Image Synthesis / Spring 2021, Assignment 3
# The code base is based on the great work from CSC 321, U Toronto
# https://www.cs.toronto.edu/~rgrosse/courses/csc321_2018/assignments/a4-code.zip

import glob
import os
import PIL.Image as Image
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms


class CustomDataSet(Dataset):
    """Load images under folders"""
    def __init__(self, main_dir, ext='*.png', transform=None):
        self.main_dir = main_dir
        self.transform = transform
        all_imgs = glob.glob(os.path.join(main_dir, ext))
        self.total_imgs = all_imgs
        print(os.path.join(main_dir, ext))
        print(len(self))

    def __len__(self):
        return len(self.total_imgs)

    def __getitem__(self, idx):
        img_loc = self.total_imgs[idx]
        image = Image.open(img_loc).convert("RGB")
        tensor_image = self.transform(image)
        return tensor_image, 0.

def get_data_loader(data_path, opts):
    """Creates data loaders.
    """
    basic_transform = transforms.Compose([
        transforms.Resize(opts.image_size, Image.BICUBIC),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ])

    if opts.data_aug == 'basic':
        transform = basic_transform
    elif opts.data_aug == 'deluxe':
        load_size = int(1.1 * opts.image_size)
        transform = transforms.Compose([
            transforms.Resize((load_size, load_size), Image.BICUBIC),
            transforms.RandomCrop(opts.image_size),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ])

    dataset = CustomDataSet(os.path.join('data/', data_path), opts.ext, transform)
    dloader = DataLoader(dataset=dataset, batch_size=opts.batch_size, shuffle=True, num_workers=opts.num_workers)

    return dloader

### models.py (Part B and C only Implementation)

I commented out the CycleGenerator class since it was needed.

#### DC Generator:

The generator is defined by a series of deconvolutional (transpose convolution) layers (deconv). Deconvolutional layers are used to upsample the input noise vector to generate realistic images.


The forward method takes a noise vector z as input and passes it through the defined deconvolutional layers with ReLU activation functions.
The output of the last layer uses the hyperbolic tangent (F.tanh) activation function, which scales the output to values between -1 and 1, suitable for image data.

#### DCDiscriminator

The discriminator is defined by a series of convolutional layers (conv). Convolutional layers are used to downsample the input image and extract hierarchical features.

The forward method takes an input tensor x and passes it through the defined convolutional layers with ReLU activation functions.
The output of the last convolutional layer is passed through a final convolutional layer with no activation function (F.relu) and no normalization (norm=None).
The output is squeezed to remove dimensions of size 1, resulting in a 1-dimensional tensor.

In [6]:

import torch
import torch.nn as nn
from torch.nn import init
import torch.nn.functional as F


def deconv(in_channels, out_channels, kernel_size, stride=2, padding=1, norm='batch'):
    """Creates a transposed-convolutional layer, with optional batch normalization.
    """
    layers = []
    layers.append(nn.ConvTranspose2d(in_channels, out_channels, kernel_size, stride, padding, bias=False))
    if norm == 'batch':
        layers.append(nn.BatchNorm2d(out_channels))
    elif norm == 'instance':
        layers.append(nn.InstanceNorm2d(out_channels))

    return nn.Sequential(*layers)


def conv(in_channels, out_channels, kernel_size, stride=2, padding=1, norm='batch', init_zero_weights=False):
    """Creates a convolutional layer, with optional batch normalization.
    """
    layers = []
    conv_layer = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride, padding=padding, bias=False)
    if init_zero_weights:
        init.normal_(conv_layer.weight, mean=0.0, std=0.02)
    layers.append(conv_layer)

    if norm == 'batch':
        layers.append(nn.BatchNorm2d(out_channels))
    elif norm == 'instance':
        layers.append(nn.InstanceNorm2d(out_channels))
    return nn.Sequential(*layers)

class DCGenerator(nn.Module):
    def __init__(self, noise_size, conv_dim=64):
        super(DCGenerator, self).__init__()

        ###########################################
        ##   FILL THIS IN: CREATE ARCHITECTURE   ##
        ###########################################

        self.deconv1 = deconv(noise_size, conv_dim * 4, 4, 1, 0, norm='instance')
        self.deconv2 = deconv(conv_dim * 4, conv_dim * 2, 4, 2, 1, norm='instance')
        self.deconv3 = deconv(conv_dim * 2, conv_dim, 4, 2, 1, norm='instance')
        self.deconv4 = deconv(conv_dim, 3, 4, 2, 1, norm='instance')
        self.deconv5 = deconv(3, 3, 4, 2, 1, norm=None)  # No normalization in the last layer

    def forward(self, z):
        """Generates an image given a sample of random noise.

            Input
            -----
                z: BS x noise_size x 1 x 1   -->  16x100x1x1

            Output
            ------
                out: BS x channels x image_width x image_height  -->  16x3x32x32
        """


        ###########################################
        ##   FILL THIS IN: FORWARD PASS   ##
        ###########################################
        x = F.leaky_relu(self.deconv1(z), negative_slope=0.2)
        x = F.leaky_relu(self.deconv2(x), negative_slope=0.2)
        x = F.leaky_relu(self.deconv3(x), negative_slope=0.2)
        x = F.leaky_relu(self.deconv4(x), negative_slope=0.2)
        x = F.tanh(self.deconv5(x))
        return x


class ResnetBlock(nn.Module):
    def __init__(self, conv_dim, norm):
        super(ResnetBlock, self).__init__()
        self.conv_layer = conv(in_channels=conv_dim, out_channels=conv_dim, kernel_size=3, stride=1, padding=1, norm=norm)

    def forward(self, x):
        out = x + self.conv_layer(x)
        return out


class DCDiscriminator(nn.Module):
    """Defines the architecture of the discriminator network.
       Note: Both discriminators D_X and D_Y have the same architecture in this assignment.
    """
    def __init__(self, conv_dim=64, norm='batch'):
        super(DCDiscriminator, self).__init__()

        ###########################################
        ##   FILL THIS IN: CREATE ARCHITECTURE   ##
        ###########################################

        self.conv1 = conv(3, conv_dim, 4, 2, padding=1, norm='instance')
        self.conv2 = conv(conv_dim, conv_dim * 2, 4, 2, padding=1, norm='instance')
        self.conv3 = conv(conv_dim * 2, conv_dim * 4, 4, 2, padding=1, norm='instance')
        self.conv4 = conv(conv_dim * 4, conv_dim * 8, 4, 2, padding=1, norm='instance')
        self.conv5 = conv(conv_dim * 8, 1, 4, 1, padding=0, norm=None)  # No padding in the last layer

    def forward(self, x):
        out = F.relu(self.conv1(x))

        ###########################################
        ##   FILL THIS IN: FORWARD PASS   ##
        ###########################################
        out = F.leaky_relu(self.conv1(x), negative_slope=0.2)
        out = F.leaky_relu(self.conv2(out), negative_slope=0.2)
        out = F.leaky_relu(self.conv3(out), negative_slope=0.2)
        out = F.leaky_relu(self.conv4(out), negative_slope=0.2)
        out = self.conv5(out).squeeze()
        return out



### vanilla_gan.py (Part D)

Creates generator (G) and discriminator (D) models.
Sets up Adam optimizers for both generator and discriminator.

Computes discriminator loss on real and fake images.
Computes generator loss.
Updates discriminator parameters every two iterations.
Prints and logs loss information.
Saves generated samples and model parameters at specified intervals.


 the script generates images and saves them to the specified directories during training, providing a visual representation of the training progress.


 Here, (D(real_images) - 1) represents the difference between the discriminator's output on real images and the target value (1 for real images).


In [7]:
from models import DCGenerator, DCDiscriminator


In [13]:
import os
import warnings
import imageio
import numpy as np
import torch
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms
import utils
from data_loader import get_data_loader
from models import DCGenerator, DCDiscriminator

# Ignore warnings
warnings.filterwarnings("ignore")

# Seed for reproducibility
SEED = 11
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed(SEED)

# Manually set the options usually passed as command-line arguments
class Options:
    image_size = 64
    conv_dim = 64
    noise_size = 100
    num_epochs = 100
    batch_size = 64
    num_workers = 2
    lr = 0.0002
    beta1 = 0.5
    beta2 = 0.999
    data = '/content/cat/grumpifyBprocessed'  # Update this path
    data_aug = 'deluxe'
    ext = '*.png'
    checkpoint_dir = '/content/checkpoints_vanilla'  # Update this path
    sample_dir = '/content/vanilla_samples'  # Update this path
    log_step = 10
    sample_every = 200
    checkpoint_every = 400

opts = Options()

# Then you can use an instance of this class as your arguments.
args = Args()

def print_models(G, D):
    """Prints model information for the generators and discriminators.
    """
    print("                    G                  ")
    print("---------------------------------------")
    print(G)
    print("---------------------------------------")

    print("                    D                  ")
    print("---------------------------------------")
    print(D)
    print("---------------------------------------")


def create_model(opts):
    """Builds the generators and discriminators.
    """
    G = DCGenerator(noise_size=opts.noise_size, conv_dim=opts.conv_dim)
    D = DCDiscriminator(conv_dim=opts.conv_dim)

    print_models(G, D)

    if torch.cuda.is_available():
        G.cuda()
        D.cuda()
        print('Models moved to GPU.')

    return G, D


def create_image_grid(array, ncols=None):
    """
    """
    num_images, channels, cell_h, cell_w = array.shape

    if not ncols:
        ncols = int(np.sqrt(num_images))
    nrows = int(np.math.floor(num_images / float(ncols)))
    result = np.zeros((cell_h*nrows, cell_w*ncols, channels), dtype=array.dtype)
    for i in range(0, nrows):
        for j in range(0, ncols):
            result[i*cell_h:(i+1)*cell_h, j*cell_w:(j+1)*cell_w, :] = array[i*ncols+j].transpose(1, 2, 0)

    if channels == 1:
        result = result.squeeze()
    return result


def checkpoint(iteration, G, D, opts):
    """Saves the parameters of the generator G and discriminator D.
    """
    G_path = os.path.join(opts.checkpoint_dir, 'G_iter%d.pkl' % iteration)
    D_path = os.path.join(opts.checkpoint_dir, 'D_iter%d.pkl' % iteration)
    torch.save(G.state_dict(), G_path)
    torch.save(D.state_dict(), D_path)


def save_samples(G, fixed_noise, iteration, opts):
    generated_images = G(fixed_noise)
    generated_images = utils.to_data(generated_images)

    # Convert the data type to uint8
    generated_images = (generated_images * 255).astype(np.uint8)

    grid = create_image_grid(generated_images)

    # Save the grid as an image
    path = os.path.join(opts.sample_dir, 'sample-{:06d}.png'.format(iteration))
    imageio.imwrite(path, grid)
    print('Saved {}'.format(path))



def save_images(images, iteration, opts, name):
    grid = create_image_grid(utils.to_data(images))

    # Convert pixel values to uint8
    grid = (grid * 255).astype(np.uint8)

    path = os.path.join(opts.sample_dir, '{:s}-{:06d}.png'.format(name, iteration))
    imageio.imwrite(path, grid)
    print('Saved {}'.format(path))



def sample_noise(dim):
    """
    Generate a PyTorch Variable of uniform random noise.

    Input:
    - batch_size: Integer giving the batch size of noise to generate.
    - dim: Integer giving the dimension of noise to generate.

    Output:
    - A PyTorch Variable of shape (batch_size, dim, 1, 1) containing uniform
      random noise in the range (-1, 1).
    """
    return utils.to_var(torch.rand(batch_size, dim) * 2 - 1).unsqueeze(2).unsqueeze(3)


def training_loop(train_dataloader, opts):
    """Runs the training loop.
        * Saves checkpoints every opts.checkpoint_every iterations
        * Saves generated samples every opts.sample_every iterations
    """

    # Create generators and discriminators
    G, D = create_model(opts)

    # Create optimizers for the generators and discriminators
    g_optimizer = optim.Adam(G.parameters(), opts.lr, [opts.beta1, opts.beta2])
    d_optimizer = optim.Adam(D.parameters(), opts.lr, [opts.beta1, opts.beta2])



    # Generate fixed noise for sampling from the generator
    fixed_noise = sample_noise(opts.noise_size)  # batch_size x noise_size x 1 x 1

    iteration = 1

    total_train_iters = opts.num_epochs * len(train_dataloader)

    for epoch in range(opts.num_epochs):

        for batch in train_dataloader:

            real_images, labels = batch
            real_images, labels = utils.to_var(real_images), utils.to_var(labels).long().squeeze()

            #######################################python vanilla_gan.py --num_epochs=100 --data_aug=deluxe#########
            ###         TRAIN THE DISCRIMINATOR         ####
            ################################################

            d_optimizer.zero_grad()

            # FILL THIS IN
            # 1. Compute the discriminator loss on real images
            D_real_loss = torch.mean((D(real_images) - 1) ** 2)

            # 2. Sample noise
            noise = sample_noise(opts.noise_size)

            # 3. Generate fake images from the noise
            fake_images = G(noise)

            # 4. Compute the discriminator loss on the fake images
            D_fake_loss = torch.mean(D(fake_images) ** 2)

            D_total_loss = D_real_loss + D_fake_loss
            if iteration % 2 == 0:
                D_total_loss.backward()
                d_optimizer.step()

            ###########################################
            ###          TRAIN THE GENERATOR        ###
            ###########################################

            g_optimizer.zero_grad()

            # FILL THIS IN
            # 1. Sample noise
            noise = sample_noise(opts.noise_size)

            # 2. Generate fake images from the noise
            fake_images = G(noise)

            # 3. Compute the generator loss
            G_loss = torch.mean((D(fake_images) - 1) ** 2)

            G_loss.backward()
            g_optimizer.step()


            # Print the log info
            if iteration % opts.log_step == 0:
                print('Iteration [{:4d}/{:4d}] | D_real_loss: {:6.4f} | D_fake_loss: {:6.4f} | G_loss: {:6.4f}'.format(
                       iteration, total_train_iters, D_real_loss.item(), D_fake_loss.item(), G_loss.item()))
            # todo: add fake loss, real loss, G loss to tensorboard

            # Save the generated samples
            if iteration % opts.sample_every == 0:
                save_samples(G, fixed_noise, iteration, opts)
                save_images(real_images, iteration, opts, 'real')

            # Save the model parameters
            if iteration % opts.checkpoint_every == 0:
                checkpoint(iteration, G, D, opts)

            iteration += 1


def main(opts):
    """Loads the data, creates checkpoint and sample directories, and starts the training loop.
    """

    # Create a dataloader for the training images
    dataloader = get_data_loader(opts.data, opts)

    # Create checkpoint and sample directories
    utils.create_dir(opts.checkpoint_dir)
    utils.create_dir(opts.sample_dir)

    training_loop(dataloader, opts)


def create_parser():
    """Creates a parser for command-line arguments.
    """
    parser = argparse.ArgumentParser()

    # Model hyper-parameters
    parser.add_argument('--image_size', type=int, default=64, help='The side length N to convert images to NxN.')
    parser.add_argument('--conv_dim', type=int, default=32)
    parser.add_argument('--noise_size', type=int, default=100)

    # Training hyper-parameters
    parser.add_argument('--num_epochs', type=int, default=40)
    parser.add_argument('--batch_size', type=int, default=16, help='The number of images in a batch.')
    parser.add_argument('--num_workers', type=int, default=0, help='The number of threads to use for the DataLoader.')
    parser.add_argument('--lr', type=float, default=0.0003, help='The learning rate (default 0.0003)')
    parser.add_argument('--beta1', type=float, default=0.5)
    parser.add_argument('--beta2', type=float, default=0.999)

    # Data sources
    parser.add_argument('--data', type=str, default='cat/grumpifyBprocessed', help='Choose the type of emojis to generate.')
    parser.add_argument('--data_aug', type=str, default='deluxe', help='data augmentation diff / basic / deluxe')
    parser.add_argument('--ext', type=str, default='*.png', help='Choose the type of emojis to generate.')

    # Directories and checkpoint/sample iterations
    parser.add_argument('--checkpoint_dir', type=str, default='./checkpoints_vanilla')
    parser.add_argument('--sample_dir', type=str, default='./vanilla')
    parser.add_argument('--log_step', type=int , default=10)
    parser.add_argument('--sample_every', type=int , default=200)
    parser.add_argument('--checkpoint_every', type=int , default=400)

    return parser


if __name__ == '__main__':

    parser = create_parser()
    opts = parser.parse_args()

    batch_size = opts.batch_size
    opts.sample_dir = os.path.join('output', 'vanilla',
                                   '%s_%s' % (os.path.basename(opts.data), opts.data_aug)).replace('/', '\\')

    if os.path.exists(opts.sample_dir):
        cmd = 'del %s/*' % opts.sample_dir
        os.system(cmd)
    logger = SummaryWriter(opts.sample_dir)
    print(opts)
    main(opts)

usage: colab_kernel_launcher.py [-h] [--image_size IMAGE_SIZE] [--conv_dim CONV_DIM]
                                [--noise_size NOISE_SIZE] [--num_epochs NUM_EPOCHS]
                                [--batch_size BATCH_SIZE] [--num_workers NUM_WORKERS] [--lr LR]
                                [--beta1 BETA1] [--beta2 BETA2] [--data DATA]
                                [--data_aug DATA_AUG] [--ext EXT]
                                [--checkpoint_dir CHECKPOINT_DIR] [--sample_dir SAMPLE_DIR]
                                [--log_step LOG_STEP] [--sample_every SAMPLE_EVERY]
                                [--checkpoint_every CHECKPOINT_EVERY]
colab_kernel_launcher.py: error: unrecognized arguments: -f /root/.local/share/jupyter/runtime/kernel-34eb6f7f-0e53-487b-a57f-15504416d522.json


SystemExit: ignored

In [10]:
import os

print("Current directory:", os.getcwd())
print("Contents:", os.listdir('.'))


Current directory: /content
Contents: ['.config', 'drive', '__pycache__', 'cat.tar', 'models.py', 'cat', 'data_loader.py', 'pokemon.tar', 'utils.py', 'cycle_gan.py', 'vanilla_gan.py', 'sample_data']


## What I learned from this assignment

The vanilla_gan defines the architecture of a GAN with a generator (DCGenerator) and a discriminator (DCDiscriminator). These models are convolutional neural networks designed for image generation tasks.

The loss functions for both the discriminator and generator are based on the mean squared difference between the discriminator's output and target values (1 for real images and 0 for fake images).

The discriminator is trained to minimize the difference between its predictions on real images and the target value (1), and between its predictions on fake images and the target value (0).
The generator is trained to minimize the difference between the discriminator's predictions on generated images and the target value (1).


this project provided me with a example of how CNNs are used in the architecture of GANs for image generation. It demonstrates the importance of convolutional and deconvolutional layers in learning hierarchical features and generating realistic images. Understanding this architecture was valuable for tasks involving image generation, and other computer vision applications.


I used help of ChatGPT for most part , but also I had prior knowledge about CNN so it helped me with the understanding of GANs.