<a href="https://www.kaggle.com/code/nikhilsumesh/ml-final-project?scriptVersionId=152884718" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

# ML Final Project
Names: Rohit Reddy Goli, Nikhil Sumesh

#### We will be creating a GAN to generate fake pokemon 
Here, we import the necessary libraries.

In [None]:
import torch
from torch.utils.data import DataLoader, ConcatDataset
from torchvision.datasets import ImageFolder
import torchvision.transforms as T
from torchvision.utils import make_grid
import matplotlib.pyplot as plt
%matplotlib inline
from tqdm import tqdm
import random
import torch.nn as nn 
import torch.nn.functional as F


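Here, we will process the dataset of images and augment it with new images of Pokémon that have been recolored (the mirrored/upside-down variants are currently left out of the combined dataset). First, we set our image directory, the image size for our tensors, the batch size, and the normalization factors.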

In [None]:
# Root folder handed to ImageFolder when the datasets are built
image_directory = "/kaggle/input/mganset/mgan_dataset/mgan_dataset"
# Side length passed to the Resize and CenterCrop transforms
image_size = 64
# Number of images per DataLoader batch (also the number of fakes generated per generator step)
batch_size = 16
# (mean, std) per channel, unpacked into T.Normalize; denormalize() reads the first entry of each
normalize_factor = (0.5,0.5,0.5),(0.5,0.5,0.5) #normalize data with mean and std

Now, we will create our dataset from the given pokemon we will work with

In [None]:
def _make_transform(*augmentations):
    """Build the preprocessing pipeline, with optional augmentations applied after the crop.

    The order is: resize -> center crop -> augmentations -> tensor -> normalize.
    """
    steps = [T.Resize(image_size), T.CenterCrop(image_size)]
    steps.extend(augmentations)
    steps.append(T.ToTensor())
    steps.append(T.Normalize(*normalize_factor))
    return T.Compose(steps)


# Plain copy of every image
normal_dataset = ImageFolder(image_directory, transform=_make_transform())

# Recolored copy: brightness and hue are jittered before tensor conversion
colored_dataset = ImageFolder(
    image_directory,
    transform=_make_transform(T.ColorJitter(brightness=(1.5, 2), hue=(-0.5, 0.5))),
)

# Only the recolored and the plain copies are combined (recolored first);
# no upside-down or mirrored variants are part of the training set.
dataset = ConcatDataset([colored_dataset, normal_dataset])
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=2)

Since we normalized the images that we converted to tensors, we will now denormalize them for viewing purposes

In [None]:
def denormalize(image):
    """Invert the normalization applied to the dataset.

    Uses only the first channel's mean and std from ``normalize_factor``
    and applies them to the whole tensor.
    """
    means, stds = normalize_factor
    return image * stds[0] + means[0]

Now, we will display a batch of images which our GAN will use

In [None]:
def display_img(images, nmax=64):
    """Plot up to ``nmax`` images from a normalized batch as a single grid.

    Args:
        images: batch tensor of normalized images (as produced by the dataloader).
        nmax: maximum number of images taken from the front of the batch.
    """
    fig, ax = plt.subplots(figsize = (32,32))
    ax.set_xticks([])
    ax.set_yticks([])
    # The batch was normalized with T.Normalize, so values can be negative.
    # imshow clips float RGB data to [0, 1]; map the pixels back first so
    # dark regions are not crushed to black. clamp() guards against tiny
    # float overshoot and .cpu() lets this work for batches on the GPU too.
    viewable = denormalize(images.detach()[:nmax]).clamp(0, 1).cpu()
    ax.imshow(make_grid(viewable, nrow=16).permute(1,2,0))
    plt.title(label="Pokemon Dataset Images",)

def show(dataloader, nmax=64):
    """Display the first batch of ``dataloader`` and print one random image's shape.

    Does nothing when the dataloader yields no batches.
    """
    first_batch = next(iter(dataloader), None)
    if first_batch is None:
        return

    images, _ = first_batch
    display_img(images, nmax)

    # Pick one image of the batch at random and report its tensor shape
    picked_index = random.randint(0, len(images)-1)
    test_image = images[picked_index]
    print(test_image.shape)


# Preview the first batch of the (CPU-side) dataloader
show(dataloader)


Discriminator Model below

In [None]:
def _disc_stage(in_channels, out_channels):
    """One downsampling stage: stride-2 4x4 conv, batch norm, LeakyReLU(0.2)."""
    return (
        nn.Conv2d(in_channels, out_channels, kernel_size=4, stride=2, padding=1, bias=False),
        nn.BatchNorm2d(out_channels),
        nn.LeakyReLU(0.2, inplace=True),
    )


# Four halving stages, then an unpadded 4x4 conv that reduces to a single
# channel, flattened and squashed to (0, 1) by the sigmoid.
disc_1 = nn.Sequential(
    *_disc_stage(3, 64),
    *_disc_stage(64, 128),
    *_disc_stage(128, 128),
    *_disc_stage(128, 128),
    nn.Conv2d(128, 1, kernel_size=4, stride=2, padding=0, bias=False),
    nn.Flatten(),
    nn.Sigmoid(),
)

In [None]:
# Number of channels of the latent seed fed to the generator
seed_size = 3


def _gen_stage(in_channels, out_channels, padding=1):
    """One upsampling stage: stride-2 4x4 transposed conv, batch norm, ReLU."""
    return (
        nn.ConvTranspose2d(in_channels, out_channels, kernel_size=4, stride=2, padding=padding, bias=False),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(),
    )


# The first stage is unpadded; the remaining stages use padding=1.
# The last transposed conv maps to 3 channels and tanh bounds the output to [-1, 1].
gen_1 = nn.Sequential(
    *_gen_stage(seed_size, 128, padding=0),
    *_gen_stage(128, 128),
    *_gen_stage(128, 128),
    *_gen_stage(128, 64),
    nn.ConvTranspose2d(64, 3, kernel_size=4, stride=2, padding=1, bias=False),
    nn.Tanh(),
)

### GPU Setup

In [None]:
def get_training_device():
    """Return the CUDA device when one is available, otherwise the CPU device."""
    device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.device(device_name)

def to_device(data, device):
    """Move ``data`` to ``device``.

    Lists and tuples are processed element by element (recursively) and
    always come back as a list; anything else is moved with ``.to``.
    """
    if not isinstance(data, (list, tuple)):
        return data.to(device, non_blocking=True)

    moved = []
    for item in data:
        moved.append(to_device(item, device))
    return moved

class DeviceDataLoader():
    """Thin wrapper that hands out a dataloader's batches already moved to a device."""

    def __init__(self, dataloader, device):
        self.dataloader, self.device = dataloader, device

    def __len__(self):
        """Return the number of batches in the wrapped dataloader."""
        return len(self.dataloader)

    def __iter__(self):
        """Lazily yield each batch after passing it through ``to_device``."""
        yield from (to_device(batch, self.device) for batch in self.dataloader)

# Pick the device once and wrap the dataloader so every batch it yields is moved onto it
device = get_training_device()
dev_dataloader = DeviceDataLoader(dataloader, device)
# Bare expression: displayed as the cell output when run in a notebook
device

Training for Discriminator and Generator

In [None]:
# Bind the models to the global names the training functions read
generator = gen_1
discriminator = disc_1
# Move both networks to the selected device (to_device calls .to(device) on them)
discriminator = to_device(discriminator, device)
generator = to_device(generator, device)

In [None]:
def train_discriminator(dataset, optim):
    """Run one discriminator optimisation step.

    Label convention (kept from the original code and mirrored by
    ``train_generator``): real images are trained towards targets drawn
    from [0, 0.1) and generated images towards targets drawn from
    [0.9, 1.0), i.e. smoothed labels with "real" near 0 and "fake" near 1.

    Args:
        dataset: batch of real images, already on the training device.
        optim: optimizer over the discriminator's parameters.

    Returns:
        Tuple ``(total_loss, score, g_score)``: the summed BCE loss and the
        mean discriminator output on the real and on the generated batch.
    """
    optim.zero_grad()

    # Create every new tensor on the same device as the incoming batch so
    # the loss does not mix CPU and GPU tensors.
    batch_device = dataset.device
    n_samples = dataset.size(0)

    # Real images
    prediction = discriminator(dataset)
    targets = torch.rand(n_samples, 1, device=batch_device) * 0.1
    loss = F.binary_cross_entropy(prediction, targets)
    score = torch.mean(prediction).item()

    # Generated images: sample latent seeds rather than feeding the real
    # images into the generator. No gradient is needed through the
    # generator for the discriminator update, so skip building its graph.
    latent_batch = torch.randn(n_samples, seed_size, 1, 1, device=batch_device)
    with torch.no_grad():
        generated_pokemon = generator(latent_batch)
    g_prediction = discriminator(generated_pokemon)
    g_targets = torch.rand(n_samples, 1, device=batch_device) * 0.1 + 0.9
    g_loss = F.binary_cross_entropy(g_prediction, g_targets)
    g_score = torch.mean(g_prediction).item()

    total_loss = loss + g_loss
    total_loss.backward()
    optim.step()
    return total_loss.item(), score, g_score

In [None]:
def train_generator(optim):
    """Run one generator optimisation step.

    Generates ``batch_size`` images from random latent seeds and trains the
    generator so the discriminator outputs 0 for them, which is the "real"
    side of the label convention used by ``train_discriminator``.

    Args:
        optim: optimizer over the generator's parameters.

    Returns:
        The generator's BCE loss for this step as a Python float.
    """
    optim.zero_grad()

    # Sample the seeds on whatever device the generator lives on.
    # (The original `discriminator = discriminator` line made the name
    # local and raised UnboundLocalError; the global is read directly.)
    model_device = next(generator.parameters()).device
    latent_batch = torch.randn(batch_size, seed_size, 1, 1, device=model_device)
    generated_pokemon = generator(latent_batch)

    prediction = discriminator(generated_pokemon)
    targets = torch.zeros(generated_pokemon.size(0), 1, device=model_device)
    loss = F.binary_cross_entropy(prediction, targets)

    loss.backward()
    optim.step()
    return loss.item()

In [None]:
import os
from torchvision.utils import save_image

# Folder that save_results writes the generated image grids into
RESULTS_DIR = 'results'
# exist_ok=True: do not fail when the folder is already there (e.g. on a re-run)
os.makedirs(RESULTS_DIR, exist_ok=True)


In [None]:
def save_results(index, latent_batch, show=True):
    """Generate images from ``latent_batch``, save them as a grid and optionally plot them.

    Args:
        index: number embedded (zero-padded to 4 digits) in the output filename.
        latent_batch: latent seeds passed to the generator.
        show: when true, also draw the grid with matplotlib.
    """
    # Generate fake pokemon; nothing is backpropagated here, so skip the autograd graph
    with torch.no_grad():
        fake_pokemon = generator(latent_batch)

    # Map the generator output back to image range once, for both saving and plotting
    fake_images = denormalize(fake_pokemon)

    # Make the filename for the output
    fake_file = "result-image-{0:0=4d}.png".format(index)

    # Save the image
    save_image(fake_images, os.path.join(RESULTS_DIR, fake_file), nrow=8)
    print("Result Saved!")

    if show:
        fig, ax = plt.subplots(figsize=(8, 8))
        ax.set_xticks([]); ax.set_yticks([])
        # imshow clips float RGB data to [0, 1], so plot the denormalized
        # images (clamped against float overshoot), not the raw tanh output.
        grid = make_grid(fake_images.cpu().clamp(0, 1), nrow=8)
        ax.imshow(grid.permute(1, 2, 0))


Full Training Loop

In [None]:
def train(epochs, learning_rate, start_idx=1):
    """Train the discriminator and the generator in alternation.

    Args:
        epochs: number of passes over ``dev_dataloader``.
        learning_rate: learning rate used for both Adam optimizers.
        start_idx: number given to the first epoch in the progress line and
            in the saved result filenames.

    Returns:
        Tuple ``(disc_losses, disc_scores, gen_losses, gen_scores)``. Each
        list gets one entry per epoch, taken from the last batch of that
        epoch (not an epoch average).
    """
    # One fixed set of latent seeds, reused for every saved result grid
    fixed_latent_batch = torch.randn(64, seed_size, 1, 1, device=device)

    # Empty the GPU cache to save some memory
    torch.cuda.empty_cache()

    # Track losses and scores
    disc_losses = []
    disc_scores = []
    gen_losses = []
    gen_scores = []

    # Create the optimizers
    disc_optimizer = torch.optim.Adam(discriminator.parameters(), lr=learning_rate, betas=(0.5, 0.9))
    gen_optimizer = torch.optim.Adam(generator.parameters(), lr=learning_rate, betas=(0.5, 0.9))

    # Run the loop
    for epoch in range(epochs):
        # Go through each batch of real images
        for real_img, _ in tqdm(dev_dataloader):
            # Train the discriminator
            disc_loss, real_score, gen_score = train_discriminator(real_img, disc_optimizer)

            # Train the generator. train_generator takes only the optimizer
            # (it samples its own latent batch); passing real_img as well
            # raised a TypeError.
            gen_loss = train_generator(gen_optimizer)

        # Record the values of the epoch's last batch
        disc_losses.append(disc_loss)
        disc_scores.append(real_score)
        gen_scores.append(gen_score)
        gen_losses.append(gen_loss)

        print("Epoch [{}/{}], gen_loss: {:.4f}, disc_loss: {:.4f}, real_score: {:.4f}, gen_score: {:.4f}".format(
            epoch+start_idx, epochs, gen_loss, disc_loss, real_score, gen_score))

        # Save the images and show the progress
        save_results(epoch + start_idx, fixed_latent_batch, show=False)

    # Return stats
    return disc_losses, disc_scores, gen_losses, gen_scores

In [None]:
# Look the device up again; the trailing bare name displays it as the cell output
device = get_training_device()
device

def debug_memory():
    """Print the process's max RSS and a count of live tensors per (device, dtype, shape)."""
    import collections
    import gc
    import resource
    import torch

    peak_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    print('maxrss = {}'.format(peak_rss))

    # Tally every tensor the garbage collector currently tracks
    census = collections.Counter()
    for candidate in gc.get_objects():
        if torch.is_tensor(candidate):
            signature = (str(candidate.device), candidate.dtype, tuple(candidate.shape))
            census[signature] += 1

    for signature, count in census.items():
        print('{}\t{}'.format(signature, count))

# Flip to True to print the memory report from debug_memory()
mem_debug = False
if mem_debug:
    debug_memory()

# Clean up everything
# Flip to True to drop the device dataloader and both models and free cached GPU memory.
# NOTE(review): with cleanup enabled, discriminator and generator are left as None,
# so they would have to be rebuilt before train() can run — confirm intended workflow.
cleanup = False
if cleanup:
    import gc
    del dev_dataloader
    del discriminator
    del generator
    # Rebind the names to None so later references do not raise NameError
    dev_dataloader = None
    discriminator = None
    generator = None
    gc.collect()
    torch.cuda.empty_cache()

# Re-initialize the device dataloader
dev_dataloader = DeviceDataLoader(dataloader, device)

##### Time to Train

In [None]:
# Learning rate handed to both Adam optimizers inside train()
lr = 1e-03
# Number of passes over dev_dataloader
epochs = 50

# train() returns (disc_losses, disc_scores, gen_losses, gen_scores)
data = train(epochs, lr)