In [1]:
import numpy as np
import pickle
import pandas as pd
import json
import math
import torch
import string
from random import randint
from collections import defaultdict, Counter
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import torch.nn as nn
from torch.autograd import Variable
from torch import optim
import torchvision

import torch.nn.functional as F
import torch.nn as nn
import os
from torchvision import utils
from itertools import chain
import torch.utils.data as data_utils

import random
from tqdm import tqdm
from inception_score import get_inception_score

In [2]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device  # echo the selected device as the cell output

device(type='cuda')

In [3]:
# Record the installed PyTorch version in the notebook output.
print(torch.__version__)


1.10.2


In [4]:
# Custom weight initialisation intended for gen_model and disc_model
# (designed to be passed to nn.Module.apply).
def weights_init(m):
    """Re-initialise one module in place, dispatching on its class name.

    * Class name contains ``'Conv'``: weights are drawn from a normal
      distribution with mean 0.0 and std 0.02 (the bias is not touched).
    * Otherwise, class name contains ``'BatchNorm'``: weights are drawn from a
      normal distribution with mean 1.0 and std 0.02 and the bias is zeroed.
    * Any other module is left unchanged.

    Args:
        m: the module to initialise.
    """
    layer_kind = type(m).__name__

    if 'Conv' in layer_kind:
        m.weight.data.normal_(mean=0.0, std=0.02)
        return

    if 'BatchNorm' in layer_kind:
        m.weight.data.normal_(mean=1.0, std=0.02)
        m.bias.data.fill_(0)

In [5]:
class Generator(nn.Module):
    """Class-conditional generator: (noise, label) -> image in [-1, 1].

    The label is embedded into a vector of the same size as the noise and the
    two are multiplied element-wise.  The product is projected by a linear
    layer to a ``128 x init_size x init_size`` feature map, which is upsampled
    twice (x2 each) and mapped to ``channels`` output planes with a final Tanh.

    Args:
        n_classes: number of distinct labels the embedding accepts.
        latent_dim: length of the noise vector (and of the label embedding).
        img_size: side length of the square output image.  The network emits
            ``(img_size // 4) * 4`` pixels per side, so use a multiple of 4.
        channels: number of output image channels.

    The defaults reproduce the previously hard-coded 10-class, 100-dim,
    32x32 RGB configuration.
    """

    def __init__(self, n_classes=10, latent_dim=100, img_size=32, channels=3):
        super(Generator, self).__init__()

        self.label_emb = nn.Embedding(n_classes, latent_dim)

        # Spatial size before the two x2 upsampling stages.
        self.init_size = img_size // 4
        self.l1 = nn.Sequential(nn.Linear(latent_dim, 128 * self.init_size ** 2))

        # NOTE(review): the second positional argument of BatchNorm2d is `eps`,
        # not `momentum`, so the original `BatchNorm2d(n, 0.8)` ran with
        # eps=0.8.  The value is kept (now as an explicit keyword) so training
        # behaviour and existing checkpoints are unaffected; confirm whether
        # `momentum` was intended.
        self.conv_blocks = nn.Sequential(
            nn.BatchNorm2d(128),
            nn.Upsample(scale_factor=2),
            nn.Conv2d(128, 128, 3, stride=1, padding=1),
            nn.BatchNorm2d(128, eps=0.8),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Upsample(scale_factor=2),
            nn.Conv2d(128, 64, 3, stride=1, padding=1),
            nn.BatchNorm2d(64, eps=0.8),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(64, channels, 3, stride=1, padding=1),
            nn.Tanh(),
        )

    def forward(self, noise, labels):
        """Generate a batch of images.

        Args:
            noise: float tensor of shape ``(batch, latent_dim)``.
            labels: integer tensor of shape ``(batch,)`` with class indices.

        Returns:
            Tensor of shape ``(batch, channels, 4 * init_size, 4 * init_size)``
            with values in ``[-1, 1]`` (Tanh output).
        """
        # Condition the noise on the label by element-wise multiplication.
        gen_input = torch.mul(self.label_emb(labels), noise)
        out = self.l1(gen_input)
        out = out.view(out.shape[0], 128, self.init_size, self.init_size)
        img = self.conv_blocks(out)
        return img

In [6]:
class Discriminator(nn.Module):
    """AC-GAN discriminator with a real/fake head and a class head.

    Four stride-2 convolution blocks halve the spatial size each time; the
    flattened features feed two linear heads.

    Args:
        n_classes: number of classes predicted by the auxiliary head.
        img_size: side length of the square input image.
        channels: number of input image channels.

    The defaults reproduce the previously hard-coded 10-class, 32x32 RGB
    configuration.
    """

    def __init__(self, n_classes=10, img_size=32, channels=3):
        super(Discriminator, self).__init__()

        def discriminator_block(in_filters, out_filters, bn=True):
            """Returns layers of each discriminator block"""
            block = [nn.Conv2d(in_filters, out_filters, 3, 2, 1), nn.LeakyReLU(0.2, inplace=True), nn.Dropout2d(0.25)]
            if bn:
                # NOTE(review): the original passed 0.8 positionally, which is
                # BatchNorm2d's `eps` (not `momentum`).  Kept as-is, now
                # explicit, to preserve behaviour; confirm the intent.
                block.append(nn.BatchNorm2d(out_filters, eps=0.8))
            return block

        self.conv_blocks = nn.Sequential(
            *discriminator_block(channels, 16, bn=False),
            *discriminator_block(16, 32),
            *discriminator_block(32, 64),
            *discriminator_block(64, 128),
        )

        # Height/width after the four kernel-3, stride-2, padding-1 convs:
        # each maps a side of length s to ceil(s / 2).  For 32 this gives 2.
        ds_size = img_size
        for _ in range(4):
            ds_size = (ds_size + 1) // 2
        feature_dim = 128 * ds_size ** 2

        # Output layers
        self.adv_layer = nn.Sequential(nn.Linear(feature_dim, 1), nn.Sigmoid())
        # The class head emits LOG-probabilities: the notebook scores it with
        # nn.NLLLoss, which expects log-probabilities, and an explicit `dim`
        # avoids the implicit-dimension deprecation warning of nn.Softmax().
        self.aux_layer = nn.Sequential(nn.Linear(feature_dim, n_classes), nn.LogSoftmax(dim=1))

    def forward(self, img):
        """Score a batch of images.

        Args:
            img: float tensor of shape ``(batch, channels, img_size, img_size)``.

        Returns:
            ``(validity, label)`` where ``validity`` has shape ``(batch, 1)``
            with sigmoid probabilities that the image is real, and ``label``
            has shape ``(batch, n_classes)`` with per-class log-probabilities.
        """
        out = self.conv_blocks(img)
        out = out.view(out.shape[0], -1)
        validity = self.adv_layer(out)
        label = self.aux_layer(out)

        return validity, label

In [7]:
# Build both networks, apply the custom weight initialisation that
# weights_init was written for, then move them to the selected device.
disc_model=Discriminator()
gen_model=Generator()
# Module.apply visits every sub-module, so each Conv / BatchNorm layer is
# re-initialised; other layers keep PyTorch's defaults.
disc_model.apply(weights_init)
gen_model.apply(weights_init)
disc_model.to(device)
gen_model.to(device)


Generator(
  (label_emb): Embedding(10, 100)
  (l1): Sequential(
    (0): Linear(in_features=100, out_features=8192, bias=True)
  )
  (conv_blocks): Sequential(
    (0): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (1): Upsample(scale_factor=2.0, mode=nearest)
    (2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): BatchNorm2d(128, eps=0.8, momentum=0.1, affine=True, track_running_stats=True)
    (4): LeakyReLU(negative_slope=0.2, inplace=True)
    (5): Upsample(scale_factor=2.0, mode=nearest)
    (6): Conv2d(128, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): BatchNorm2d(64, eps=0.8, momentum=0.1, affine=True, track_running_stats=True)
    (8): LeakyReLU(negative_slope=0.2, inplace=True)
    (9): Conv2d(64, 3, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (10): Tanh()
  )
)

In [8]:
# Display the generator's layer structure.
gen_model

Generator(
  (label_emb): Embedding(10, 100)
  (l1): Sequential(
    (0): Linear(in_features=100, out_features=8192, bias=True)
  )
  (conv_blocks): Sequential(
    (0): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (1): Upsample(scale_factor=2.0, mode=nearest)
    (2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): BatchNorm2d(128, eps=0.8, momentum=0.1, affine=True, track_running_stats=True)
    (4): LeakyReLU(negative_slope=0.2, inplace=True)
    (5): Upsample(scale_factor=2.0, mode=nearest)
    (6): Conv2d(128, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): BatchNorm2d(64, eps=0.8, momentum=0.1, affine=True, track_running_stats=True)
    (8): LeakyReLU(negative_slope=0.2, inplace=True)
    (9): Conv2d(64, 3, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (10): Tanh()
  )
)

In [9]:
# Display the discriminator's layer structure.
disc_model

Discriminator(
  (conv_blocks): Sequential(
    (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): LeakyReLU(negative_slope=0.2, inplace=True)
    (2): Dropout2d(p=0.25, inplace=False)
    (3): Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (4): LeakyReLU(negative_slope=0.2, inplace=True)
    (5): Dropout2d(p=0.25, inplace=False)
    (6): BatchNorm2d(32, eps=0.8, momentum=0.1, affine=True, track_running_stats=True)
    (7): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (8): LeakyReLU(negative_slope=0.2, inplace=True)
    (9): Dropout2d(p=0.25, inplace=False)
    (10): BatchNorm2d(64, eps=0.8, momentum=0.1, affine=True, track_running_stats=True)
    (11): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (12): LeakyReLU(negative_slope=0.2, inplace=True)
    (13): Dropout2d(p=0.25, inplace=False)
    (14): BatchNorm2d(128, eps=0.8, momentum=0.1, affine=True, track_running_stats=True)
  )
  (ad

In [10]:
def create_dataset(batchsize):
    """Build shuffled CIFAR-10 train and test loaders.

    Images are resized to 32 pixels, converted to tensors and normalised per
    channel with mean 0.5 / std 0.5.  The dataset is downloaded to
    ``./files/`` if it is not already there.

    Args:
        batchsize: number of samples per mini-batch for both loaders.

    Returns:
        ``(train_dataloader, test_dataloader)``.
    """
    trans = transforms.Compose([
            transforms.Resize(32),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ])

    train_dataset = torchvision.datasets.CIFAR10(root='./files/', train=True, download=True, transform=trans)
    test_dataset = torchvision.datasets.CIFAR10(root='./files/', train=False, download=True, transform=trans)

    # Use the function's own argument: the original read the module-level
    # `batch_size`, silently ignoring whatever the caller passed in.
    train_dataloader = data_utils.DataLoader(train_dataset, batch_size=batchsize, shuffle=True)
    test_dataloader = data_utils.DataLoader(test_dataset, batch_size=batchsize, shuffle=True)
    return train_dataloader, test_dataloader

In [11]:
# Training hyper-parameters shared by the cells below.
learn_rate=0.0002  # learning rate given to both Adam optimizers
epochs=100  # number of passes over the training loader in train()
batch_size=64  # mini-batch size (passed to create_dataset, sizes the sample batches)
beta_1,beta_2=0.5,0.999  # Adam `betas` pair given to both optimizers

In [12]:
# One Adam optimizer per network, both configured with the same learning
# rate and beta coefficients.
gen_optim = optim.Adam(
    params=gen_model.parameters(),
    lr=learn_rate,
    betas=(beta_1, beta_2),
)
disc_optim = optim.Adam(
    params=disc_model.parameters(),
    lr=learn_rate,
    betas=(beta_1, beta_2),
)

In [13]:
# Loss for the real-vs-fake head (binary cross-entropy on sigmoid outputs).
source_criterion = nn.BCELoss()
# Loss for the class head.
# NOTE(review): nn.NLLLoss expects log-probabilities as its input, so the
# tensor passed as its first argument should come from a log-softmax --
# confirm against the output layer of Discriminator.aux_layer.
class_criterion = nn.NLLLoss()

In [14]:
# Placeholder tensors on `device`.  Except for `fixed_noise` (filled from
# N(0, 1)), their contents are uninitialised memory.
# NOTE(review): `input` shadows the Python builtin, and none of these names
# is read by train() or evaluate() in the visible cells.
input = torch.empty(batch_size, 3, 32, 32, dtype=torch.float32).to(device)
noise = torch.empty(batch_size, 100, 1, 1, dtype=torch.float32).to(device)
fixed_noise = torch.empty(batch_size, 100, 1, 1, dtype=torch.float32).normal_(0, 1).to(device)
source_label = torch.empty(batch_size, dtype=torch.float32).to(device)
class_label = torch.empty(batch_size, dtype=torch.int64).to(device)
# Integer codes for the two source classes.
real_label, fake_label = 1, 0

In [15]:
# Wrap the placeholder tensors in Variable, in the same order as before.
input, source_label, class_label, noise, fixed_noise = (
    Variable(tensor)
    for tensor in (input, source_label, class_label, noise, fixed_noise)
)

# NumPy noise matrix whose first 10 columns are overwritten with a one-hot
# encoding of a random class label per row.
fixed_noise_ = np.random.normal(0, 1, size=(batch_size, 100))
random_label = np.random.randint(0, 10, size=batch_size)
random_onehot = np.eye(10)[random_label]  # row i is the one-hot vector of random_label[i]
fixed_noise_[:, :10] = random_onehot

In [16]:
def save_model(gen,disc):
    """Persist both networks' ``state_dict`` to fixed files in the working directory.

    Args:
        gen: generator module, written to ``./generator_agan.pkl``.
        disc: discriminator module, written to ``./discriminator_agan.pkl``.
    """
    checkpoints = (
        (gen, './generator_agan.pkl'),
        (disc, './discriminator_agan.pkl'),
    )
    for network, target in checkpoints:
        torch.save(network.state_dict(), target)
    print('Models save to ./generator_agan.pkl & ./discriminator_agan.pkl ')

In [17]:
def test(predict, labels):
    correct = 0
    pred = predict.data.max(1)[1]
    correct = pred.eq(labels.data).cpu().sum()
    return correct, len(labels.data)

In [18]:
# Tensor constructors used by the training / evaluation cells.  Pick the CUDA
# variants only when a GPU is actually available so the notebook also runs on
# CPU-only machines, matching the fallback used when `device` is selected.
FloatTensor = torch.cuda.FloatTensor if torch.cuda.is_available() else torch.FloatTensor
LongTensor = torch.cuda.LongTensor if torch.cuda.is_available() else torch.LongTensor

In [19]:
def _sample_generator_inputs(n_samples):
    """Draw `n_samples` N(0, 1) latent vectors (100-dim) and labels in [0, 10) on `device`."""
    z = torch.as_tensor(np.random.normal(0, 1, (n_samples, 100)), dtype=torch.float32, device=device)
    gen_labels = torch.as_tensor(np.random.randint(0, 10, n_samples), dtype=torch.long, device=device)
    return z, gen_labels


def train(train_loader):
    """Train the AC-GAN for `epochs` passes over `train_loader`.

    Uses the module-level `gen_model`, `disc_model`, their optimizers, the two
    criteria and `device`.  On every iteration the generator is updated first,
    then the discriminator.  Every 1000 iterations an inception score is
    computed and appended to ``inception_score_graph_acgan.txt``, and one grid
    of generated images plus the current real batch are saved as PNGs.  After
    the last epoch both networks are checkpointed with `save_model`.

    Args:
        train_loader: iterable yielding ``(images, labels)`` mini-batches.
    """
    generator_iter = 0

    # The context manager guarantees the score log is closed even if training
    # is interrupted by an exception.
    with open("inception_score_graph_acgan.txt", "w") as score_log:
        for epoch in tqdm(range(epochs)):
            for imgs, labels in train_loader:
                # Size of this batch (the last batch of an epoch may be smaller).
                n_batch = imgs.shape[0]

                # Adversarial ground truths, shaped (n_batch, 1) like the
                # discriminator's validity output.
                valid = torch.ones(n_batch, 1, device=device)
                fake = torch.zeros(n_batch, 1, device=device)

                # Configure input
                real_imgs = imgs.to(device=device, dtype=torch.float32)
                labels = labels.to(device=device, dtype=torch.long)

                # -----------------
                #  Train Generator
                # -----------------

                gen_optim.zero_grad()

                # Sample noise and labels as generator input
                z, gen_labels = _sample_generator_inputs(n_batch)

                # Generate a batch of images
                gen_imgs = gen_model(z, gen_labels)

                # Loss measures generator's ability to fool the discriminator
                validity, pred_label = disc_model(gen_imgs)
                g_loss = 0.5 * (source_criterion(validity, valid) + class_criterion(pred_label, gen_labels))

                g_loss.backward()
                gen_optim.step()

                # ---------------------
                #  Train Discriminator
                # ---------------------

                disc_optim.zero_grad()

                # Loss for real images
                real_pred, real_aux = disc_model(real_imgs)
                d_real_loss = (source_criterion(real_pred, valid) + class_criterion(real_aux, labels)) / 2

                # Loss for fake images (detached: no gradient into the generator)
                fake_pred, fake_aux = disc_model(gen_imgs.detach())
                d_fake_loss = (source_criterion(fake_pred, fake) + class_criterion(fake_aux, gen_labels)) / 2

                # Total discriminator loss
                d_loss = (d_real_loss + d_fake_loss) / 2
                d_loss.backward()
                disc_optim.step()

                generator_iter += 1
                if generator_iter % 1000 != 0:
                    continue

                # Generate the scoring set in 10 separate batches of `n_batch`
                # images (10 * n_batch in total) rather than in one forward
                # pass, to keep GPU memory use bounded.  No gradients are
                # needed for sampling.
                sample_list = []
                with torch.no_grad():
                    for _ in range(10):
                        z, gen_labels = _sample_generator_inputs(n_batch)
                        sample_list.append(gen_model(z, gen_labels).cpu().numpy())

                # Flatten the list of batches into one list of per-image arrays.
                new_sample_list = list(chain.from_iterable(sample_list))
                # NOTE(review): `cuda` is presumably a GPU on/off switch of the
                # project-local get_inception_score; it now follows `device`
                # instead of being hard-coded to True -- confirm.
                inception_score = get_inception_score(new_sample_list, cuda=(device.type == 'cuda'), batch_size=32,
                                                      resize=True, splits=10)

                os.makedirs('training_result_images_acgan_real/', exist_ok=True)
                os.makedirs('training_result_images_acgan_fake/', exist_ok=True)

                # Denormalize a fresh batch of generated images and save up to 64 of them.
                with torch.no_grad():
                    z, gen_labels = _sample_generator_inputs(n_batch)
                    samples = gen_model(z, gen_labels)
                samples = samples.mul(0.5).add(0.5).cpu()[:64]
                utils.save_image(samples, 'training_result_images_acgan_fake/img_generatori_iter_{}.png'.format(str(generator_iter).zfill(3)))
                # Denormalize the real batch the same way before saving.
                # Assumes the loader yields images normalised with mean 0.5 /
                # std 0.5, as create_dataset does -- verify for other loaders.
                utils.save_image(imgs.mul(0.5).add(0.5), 'training_result_images_acgan_real/img_generatori_iter_real_{}.png'.format(str(generator_iter).zfill(3)))

                # Log "<iteration> <first element of the inception score result>".
                output = str(generator_iter) + " " + str(inception_score[0]) + "\n"
                score_log.write(output)

    # do checkpointing
    save_model(gen_model,disc_model)

In [20]:
def generate_images(noise, number_of_images, labels=None):
    """Generate images from `noise` with the module-level `gen_model`.

    The original body referenced the undefined names `gen` and `z` and
    ignored `noise`; it now feeds `noise` to `gen_model`, which also needs a
    class label per sample.

    Args:
        noise: latent batch passed to the generator
            (assumes a tensor of shape ``(N, 100)`` -- TODO confirm with callers).
        number_of_images: keep at most this many generated images.
        labels: optional integer tensor with one class index per noise row.
            When omitted, labels are drawn uniformly from 0-9.

    Returns:
        List of NumPy arrays, each reshaped to ``(3, 32, 32)``.
    """
    if labels is None:
        labels = torch.randint(0, 10, (noise.shape[0],), device=noise.device)
    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        samples = gen_model(noise, labels).cpu().numpy()[:number_of_images]
    return [sample.reshape(3, 32, 32) for sample in samples]

In [21]:
def load_model(D_model_filename, G_model_filename):
    """Load saved weights into the module-level `disc_model` and `gen_model`.

    Note the argument order: discriminator file first, generator file second.

    Args:
        D_model_filename: discriminator state_dict file, relative to the
            current working directory.
        G_model_filename: generator state_dict file, relative to the current
            working directory.
    """
    D_model_path = os.path.join(os.getcwd(), D_model_filename)
    G_model_path = os.path.join(os.getcwd(), G_model_filename)
    # map_location remaps the stored tensors onto the active device, so a
    # checkpoint written on a GPU can still be loaded on a CPU-only machine.
    # torch.load unpickles the file: only load checkpoints from trusted sources.
    disc_model.load_state_dict(torch.load(D_model_path, map_location=device))
    gen_model.load_state_dict(torch.load(G_model_path, map_location=device))
    print('Generator model loaded from {}.'.format(G_model_path))
    print('Discriminator model loaded from {}-'.format(D_model_path))

In [22]:
# Show the configured number of training epochs before starting the run.
epochs

100

In [23]:
# Build the CIFAR-10 loaders (the test loader is discarded here) and run the
# full training loop; train() also writes the checkpoints when it finishes.
train_loader,_=create_dataset(batch_size)
train(train_loader)


Files already downloaded and verified
Files already downloaded and verified


  input = module(input)
  return F.softmax(x).data.cpu().numpy()
100%|██████████| 100/100 [25:58<00:00, 15.59s/it]

Models save to ./generator_agan.pkl & ./discriminator_agan.pkl 





In [24]:
def evaluate(test_loader, D_model_path='./discriminator_dcgan.pkl', G_model_path='./generator_dcgan.pkl'):
    """Sample one batch from the in-memory `gen_model` and save it as an image grid.

    `batch_size` random (noise, label) pairs are generated, denormalised from
    [-1, 1] to [0, 1] and written to ``acgan_model_image.png``.

    Args:
        test_loader: currently unused.
        D_model_path: currently unused (checkpoint loading is disabled).
        G_model_path: currently unused (checkpoint loading is disabled).

    NOTE(review): the disabled load_model call passed the paths in
    (generator, discriminator) order, whereas load_model expects
    (discriminator, generator); the default paths also name ``*_dcgan.pkl``
    files while save_model writes ``*_agan.pkl``.  Fix both before re-enabling
    loading.
    """
    # Create the inputs directly on `device` so this also runs without CUDA.
    z = torch.as_tensor(np.random.normal(0, 1, (batch_size, 100)), dtype=torch.float32, device=device)
    gen_labels = torch.as_tensor(np.random.randint(0, 10, batch_size), dtype=torch.long, device=device)
    # Inference only: skip building an autograd graph.
    with torch.no_grad():
        samples = gen_model(z, gen_labels)

    # Map the Tanh output range [-1, 1] back to [0, 1] for saving.
    samples = samples.mul(0.5).add(0.5).cpu()
    grid = utils.make_grid(samples)
    utils.save_image(grid, 'acgan_model_image.png')
    # Report only after the file has actually been written.
    print("Grid of 8x8 images saved to 'acgan_model_image.png'.")

In [26]:
# Rebuild the loaders to obtain the test loader (the train loader is
# discarded) and save a grid of generated samples.
_,test_loader=create_dataset(batch_size)
evaluate(test_loader)

Files already downloaded and verified
Files already downloaded and verified
Grid of 8x8 images saved to 'acgan_model_image.png'.
