In [1]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import time

from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms


import os
from datetime import datetime
from torch.utils.tensorboard import SummaryWriter


import wandb

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")


Using device: cpu


In [2]:
FOLDER_PATH = '/content/drive/MyDrive/Deep Learning/ex3_305673212_312349509/FashionMNIST'
if (os.path.exists(FOLDER_PATH)):
  path = FOLDER_PATH
else:
  path = "data" #for git runs

In [3]:
#hyper parameters
#TODO: the WGAN paper states lr= 5e-5, should we use it?
MODE = 'wgan-gp' # dcgan, wgan, or wgan-gp
DIM = 64 # Model dimensionality
BATCH_SIZE = 50 # Batch size
CRITIC_ITERS = 5 # For WGAN and WGAN-GP, number of critic iters per gen iter
LAMBDA = 10 # Gradient penalty lambda hyperparameter
ITERS = 200000 # How many generator iterations to train for
OUTPUT_DIM = 784 # Number of pixels in MNIST (28*28)
LATENT_DIM = 128
in_channels = 1


# W&B





In [4]:
#TODO: remove before submission
import wandb
from mycreds import WANDB_API_KEY
wandb.login(key=WANDB_API_KEY)



wandb: Currently logged in as: nadavo11 (nadavoteam). Use `wandb login --relogin` to force relogin
wandb: Appending key for api.wandb.ai to your netrc file: C:\Users\nadav\_netrc


True

# Data Preprocessing
## Data loaders
let's prepare our data by loading it, normalizing it, and creating the data loaders.

In [5]:


# train_data_raw = open(f'{path}/ptb.train.txt', 'r').read()
# test_data_raw = open(f'{path}/ptb.test.txt', 'r').read()
# valid_data_raw = open(f'{path}/ptb.valid.txt', 'r').read()
# data =  train_data_raw + ' ' + test_data_raw + ' ' + valid_data_raw
#
# # Create dataloaders
# train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
# valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, drop_last=True)
# test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, drop_last=True)
#
# test_data_iter = iter(test_loader)
# train_data_iter = iter(train_loader)
# valid_data_iter = iter(valid_loader)

In [6]:
# TODO: can we use this for FashionMNIST instead?

# Define the transform to convert the images to tensors and normalize them
transform = transforms.Compose([
    transforms.ToTensor(),
    # transforms.RandomRotation((-30, 30)),
    # transforms.RandomHorizontalFlip(),
])

# Download and load the training data
train_dataset = datasets.FashionMNIST(root='data', train=True, download=True, transform=transform)
test_dataset = datasets.FashionMNIST(root='data', train=False, download=True, transform=transform)

# Create DataLoader for training and testing
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Network Architecture
We will implemnet gernerator and descriminator/critic as defined in "Improved Training of Wasserstein GANs" papaer for CIFAR10, then we will make adjustments to be compatible to FashuionMNIST data set

MNIST/ Fashion MNIST Input Dimensionsimages are 28x28 grayscale images, while CIFAR-10 images are 32x32 RGB images. We will modify the input and output layers of the networks accordingly.



# generator

### conv dimensions
convolution dimensions are calculated as follows: $ x = \frac{W - k + 2P}{S} + 1 $ deconv dimensions are calculated as follows: $ x = S(W-1) + k - 2P + F $ where: - W is the input image size - k is the kernel size - P is the padding - S is the stride - F is the output padding


In [7]:

#Input to Generator is noise which can be random or not, nosie dimention is 128

class Generator(nn.Module):

    def __init__(self,
                 dim=DIM,
                 mode='wgan',
                 latent_dim = LATENT_DIM):

        super(Generator, self).__init__()

        self.dim = dim
        if mode: self.mode = mode.lower()
        self.latent_dim = latent_dim

        # 1. Fully connected:
        # 128 -> 4*4*4*dim
        self.linear = nn.Linear(latent_dim,
                                4*4*4*dim)

        self.bn1 = nn.BatchNorm1d(4*4*4*dim)

        # 2. deConv
        # (4x4) 4dim -> (10x10) 2dim
        self.deconv2 = nn.ConvTranspose2d(4*dim,
                                          2*dim,
                                          kernel_size=5,
                                          stride=2,
                                          padding=2,
                                          output_padding=1)
        self.bn2 = nn.BatchNorm2d(2*dim)

        # 3. deConv
        # (12x12) 2dim -> (24x24) dim
        self.deconv3 = nn.ConvTranspose2d(2*dim,
                                          dim,
                                          kernel_size=5,
                                          stride=2,
                                          padding=2,
                                          output_padding=1)
        self.bn3 = nn.BatchNorm2d(dim)

        # 4. deConv
        #(24x24) dim -> (28x28) 3
        self.deconv4 = nn.ConvTranspose2d(dim,
                                          3,
                                          kernel_size=5,
                                          stride=1,
                                          padding=0,
                                          output_padding=0)


    # TODO : complete forward function
    #  check if we need to add noise to the forward function
    def forward(self, n_samples, noise=None):
        if noise is None:
            noise = torch.randn(n_samples, self.latent_dim).to(device)

        # 1. Fully connected
        output = self.linear(noise)
        if self.mode == 'wgan':
            output = self.bn1(output)
        output = F.relu(output)
        output = output.view(-1, 4*self.dim, 4, 4)

        # 2.Deconv
        output = self.deconv2(output)
        if self.mode == 'wgan':
            output = self.bn2(output)
        output = F.relu(output)

        # 3. Deconv
        output = self.deconv3(output)
        output = self.bn3(output)
        output = F.relu(output)

        # 4. Deconv
        output = self.deconv4(output)
        output = torch.tanh(output)

        return output






 # Discriminator

convolution dimensions are calculated as follows: $ x = \frac{W - k + 2P}{S} + 1 $ deconv dimensions are calculated as follows: $ x = S(W-1) + k - 2P + F $ where: - W is the input image size - k is the kernel size - P is the padding - S is the stride - F is the output padding

In [8]:
class Discriminator(nn.Module):
    def __init__(self,
                dim = DIM,
                mode = 'wgan',
                in_channels = 3,):

        super(Discriminator, self).__init__()
        if mode:
            self.mode = mode.lower()
        self.conv1 = nn.Conv2d(kernel_size=5,
                             in_channels=in_channels,
                             out_channels=dim,
                             stride=2,) # 12x12

        self.conv2 = nn.Conv2d(kernel_size=5,
                                in_channels=dim,
                                out_channels=2*dim,
                                stride=2,) # 4x4
        # Unique to WGAN
        self.batch_norm2 = nn.BatchNorm2d(2*dim)

        self.conv3 = nn.Conv2d(kernel_size=1,
                                in_channels=2*dim,
                                out_channels=4*dim,
                                stride=1,) # 4x4
        # Unique to WGAN
        self.batch_norm3 = nn.BatchNorm2d(4*4*4*dim)

        self.fc = nn.Linear(4*4*4*dim, 1)


    def forward(self, x):

        x = F.leaky_relu(self.conv1(x), negative_slope=0.2)

        x = self.conv2(x)

        if self.mode == 'wgan':
            x = self.bn2(x)

        x = F.leaky_relu(x, negative_slope=0.2)

        x = self.conv3(x)

        if self.mode == 'wgan':
            x = self.bn3(x)

        x = F.leaky_relu(x, negative_slope=0.2)

        x = x.view(-1, 4*4*4*self.dim)  # Flatten
        x = self.fc(x)
        return x


In [9]:
class GAN(nn.Module):
    def __init__(self,
                 dim=DIM,
                 mode='wgan',
                 latent_dim=LATENT_DIM,
                 ):
        super(GAN, self).__init__()
        self.generator = Generator(dim = dim,
                                   mode = mode,
                                   latent_dim= latent_dim)
        self.discriminator = Discriminator(dim = dim,
                                           mode = mode,)


# Evaluation
we define our evaluation metric as the inception score:

In [10]:
# TODO: Implement inception score function
def inception_score():
    pass

# TODO : Implement the evaluator loop
def evaluate():
    pass

#### Test the generator
can be removed before submission

In [None]:
generator = Generator()
img = generator.forward(10)


plt.imshow(img[0].transpose(0,2).detach().numpy(), cmap='gray')
plt.show()

Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).


#Lost Function/ Inception

In [None]:

class GAN:
  def __init__(self, dim, mode, train_loader):
    self.generator = Generator(dim)
    self.discriminator = Discriminator(in_features, input_img_w, input_img_h, DIM, kernel_size)
    self.mode = mode
    self.gen_optimizer = optim.RMSprop(self.generator.parameters(), lr=5e-5)
    self.disc_optimizer = optim.RMSprop(self.discriminator.parameters(), lr=5e-5)
    self.setup_optimizers()
    self.train_loader = train_loader

    # NADAV/REUT: Do we need to defrenciate the optimizers?

    #def setup_optimizers(self):
    #     if self.mode == 'wgan':
    #         self.gen_optimizer = optim.RMSprop(self.generator.parameters(), lr=5e-5)
    #         self.disc_optimizer = optim.RMSprop(self.discriminator.parameters(), lr=5e-5)
    #     elif self.mode == 'dcgan':
    #         self.gen_optimizer = optim.Adam(self.generator.parameters(), lr=2e-4, betas=(0.5, 0.999))
    #         self.disc_optimizer = optim.Adam(self.discriminator.parameters(), lr=2e-4, betas=(0.5, 0.999))

    def inf_train_gen(self):
        while True:
            for images, _ in self.train_loader:
                yield images

    def generate_image(self, frame):
        fixed_noise_128 = torch.randn(128, 128).to(device)
        samples = self.generator(128, fixed_noise_128).detach().cpu().numpy()
        samples = ((samples + 1.) * (255. / 2)).astype('int32')
        save_image(torch.tensor(samples).view(128, 3, 32, 32), 'samples_{}.jpg'.format(frame))

    def get_inception_score(self):
        samples_100 = self.generator(100).detach().cpu().numpy()
        all_samples = []
        for _ in range(10):
            all_samples.append(samples_100)
        all_samples = np.concatenate(all_samples, axis=0)
        all_samples = ((all_samples + 1.) * (255. / 2)).astype('int32')
        all_samples = all_samples.reshape((-1, 3, 32, 32)).transpose(0, 2, 3, 1)
        return lib.inception_score.get_inception_score(list(all_samples))



    def train(self):
        for iteration in range(self.iters):
            start_time = time.time()
            # Train generator
            if iteration > 0:
                self.gen_optimizer.zero_grad()
                fake_data = self.generator(self.batch_size)
                disc_fake = self.discriminator(fake_data)
                gen_cost = -torch.mean(disc_fake) if self.mode == 'wgan' else F.binary_cross_entropy_with_logits(disc_fake, torch.ones_like(disc_fake))
                gen_cost.backward()
                self.gen_optimizer.step()

            # Train critic
            disc_iters = 1 if self.mode == 'dcgan' else self.critic_iters
            for _ in range(disc_iters):
                _data = next(self.inf_train_gen()).to(device)
                self.disc_optimizer.zero_grad()
                disc_real = self.discriminator(_data)
                fake_data = self.generator(self.batch_size)
                disc_fake = self.discriminator(fake_data)

                disc_cost = torch.mean(disc_fake) - torch.mean(disc_real) if self.mode == 'wgan' else \
                            (F.binary_cross_entropy_with_logits(disc_fake, torch.zeros_like(disc_fake)) +
                             F.binary_cross_entropy_with_logits(disc_real, torch.ones_like(disc_real))) / 2.
                disc_cost.backward()
                self.disc_optimizer.step()

                if self.mode == 'wgan':
                    for p in self.discriminator.parameters():
                        p.data.clamp_(-0.01, 0.01)

            # Logging
            print(f"Iteration {iteration}, Disc Cost: {disc_cost.item()}, Time: {time.time() - start_time}")

            # Calculate inception score every 1K iters
            if iteration % 1000 == 999:
                inception_score = self.get_inception_score()
                print(f"Inception Score: {inception_score[0]}")

            # Generate samples every 100 iters
            if iteration % 100 == 99:
                self.generate_image(iteration)







