<a href="https://colab.research.google.com/github/thad75/TP_ENSEA_ELEVE/blob/main/3A/SIA/TP2/Generative_Models.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!wget http://efrosgans.eecs.berkeley.edu/pix2pix/datasets/maps.tar.gz
!tar xvzf /content/maps.tar.gz
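# The Trainer(gpus=...) argument and the optimizer_idx argument of training_step used below were removed in Lightning 2.0, so we pin a 1.x release.
!pip install "pytorch-lightning<2.0"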


In [None]:
import os
import random

import cv2
import cv2 as cv
import matplotlib.pyplot as plt
import numpy as np
import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from google.colab.patches import cv2_imshow
from PIL import Image
from pytorch_lightning import LightningDataModule, LightningModule, Trainer
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision.datasets import MNIST

# Fix the random seed so that runs are reproducible
torch.manual_seed(42)

# Generating Images

Recent groundbreaking research uses diffusion models to generate images, but other models were used for this task before them. Image generation has many industrial applications (NFT creation, virtual try-ons, deepfake generation...).

Goals of this lab:
* Get hands-on experience with generative models
* Solidify your knowledge of Deep Learning
* Use someone else's code
* Reuse PyTorch Lightning
* Get an insight into industrial applications of AI

## Back to the Basics : AutoEncoders

As you might remember from your 2nd year labs, we used an AutoEncoder to recreate some input images. In this part, we will reconstruct data from the MNIST dataset.


### Lightning DataModule

We will reuse the datamodule used in the first lab. So just run the following cell.

In [None]:
class MNISTDataModule(pl.LightningDataModule):
    def __init__(self, batch_size):
        super().__init__()
        self.transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
        self.data_dir = ''
        self.batch_size = batch_size

    def prepare_data(self):
        # This method is used to download beforehand the dataset if needed.
        MNIST(self.data_dir, train=True, download=True)
        MNIST(self.data_dir, train=False, download=True)
        
    def setup(self, stage):
        # First stage is 'fit' (or None)
        if stage == "fit" or stage is None:
            # We create a validation split to watch the training.
            mnist_train_dataset = MNIST(self.data_dir, train=True, transform=self.transform)
            self.train_size = int(0.8 * len(mnist_train_dataset))
            self.valid_size = len(mnist_train_dataset) - self.train_size
            self.mnist_train, self.mnist_valid =  torch.utils.data.random_split(mnist_train_dataset, [self.train_size, self.valid_size])          
        # Second stage is 'test' 
        if stage == "test" or stage is None:
            self.mnist_test = MNIST(self.data_dir, train=False, transform=self.transform)

    def train_dataloader(self):
        return DataLoader(self.mnist_train, batch_size=self.batch_size, shuffle=True)

    def val_dataloader(self):
        return DataLoader(self.mnist_valid, self.batch_size, shuffle=True)

    def test_dataloader(self):
        return DataLoader(self.mnist_test,self.batch_size, shuffle=True)

### Lightning Module

#### AutoEncoder : Creating and Using a Latent Space

<img src="https://miro.medium.com/max/600/1*nqzWupxC60iAH2dYrFT78Q.png">

Reminder: An AutoEncoder is composed of an Encoder and a Decoder. The Encoder maps the input data to a representation that lives in the latent space. The Decoder uses this latent representation of the input to reconstruct it.
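To get a feel for what the latent representation looks like here, it helps to trace the spatial size of a 28x28 MNIST image through the stacks. The sketch below is only illustrative: it assumes the settings used by the `ConvDown` and `ConvUp` blocks in the next cell (3x3 kernels, stride 1, no padding, dilation 1) and applies the output-size formulas from the PyTorch documentation of `nn.Conv2d` and `nn.ConvTranspose2d`. The helper names `conv_out` and `conv_transpose_out` are made up for this example.

```python
def conv_out(size, kernel=3, stride=1, padding=0, dilation=1):
    """Output size of a convolution along one spatial dimension."""
    return (size + 2 * padding - dilation * (kernel - 1) - 1) // stride + 1


def conv_transpose_out(size, kernel=3, stride=1, padding=0, dilation=1, output_padding=0):
    """Output size of a transposed convolution along one spatial dimension."""
    return (size - 1) * stride - 2 * padding + dilation * (kernel - 1) + output_padding + 1


# Encoder: each stack removes 2 pixels per dimension
encoder_sizes = [28]
for _ in range(3):
    encoder_sizes.append(conv_out(encoder_sizes[-1]))

# Decoder: each stack adds the 2 pixels back
decoder_sizes = [encoder_sizes[-1]]
for _ in range(3):
    decoder_sizes.append(conv_transpose_out(decoder_sizes[-1]))

print(encoder_sizes)  # [28, 26, 24, 22]
print(decoder_sizes)  # [22, 24, 26, 28]
```

With three stacks, the decoder output therefore has the same height and width as the input, which is what a reconstruction loss needs.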

We provide you with the code of an AutoEncoder class.
* **Using the given code, create an AutoEncoder composed of 3 stacks.**

In [None]:
import torch.nn as nn 
import torch
class ConvDown(nn.Module):
  def __init__(self, in_channels, out_channels):
      super(ConvDown, self).__init__()
      self.in_channels = in_channels
      self.out_channels = out_channels
      self.model = nn.Sequential(nn.Conv2d(in_channels = self.in_channels,
                                            out_channels = self.out_channels,
                                            kernel_size = 3,
                                            stride = 1,
                                            padding = 0,
                                            dilation = 1),
                                  nn.BatchNorm2d(self.out_channels),
                                  nn.Dropout2d(0.5),
                                  nn.LeakyReLU(0.2))
  def forward(self,x):
      return self.model(x)

class ConvUp(nn.Module):
  def __init__(self, in_channels, out_channels):
      super(ConvUp, self).__init__()
      self.in_channels = in_channels
      self.out_channels = out_channels
      self.model = nn.Sequential(nn.ConvTranspose2d(in_channels = self.in_channels,
                                      out_channels = self.out_channels,
                                      kernel_size = 3,
                                      stride = 1,
                                      padding = 0,
                                      dilation = 1),
                    nn.LeakyReLU(0.2))
  def forward(self,x):
      return self.model(x)

class Encoder(nn.Module):
  def __init__(self, in_channels, number_of_stack):
    super(Encoder, self).__init__()
    self.in_channels = in_channels
    self.number_of_stack = number_of_stack
    channels = [in_channels]+ [2**i for i in range(3,10)]
    self.encoder = nn.ModuleList([ConvDown(channels[i], channels[i+1]) for i in range(number_of_stack)])

  def forward(self, x):
    for i, layer in enumerate(self.encoder):
      x = layer(x)
    return x

class Decoder(nn.Module):
  def __init__(self, out_channels,number_of_stack ):
    super(Decoder, self).__init__()
    channels = [out_channels]+ [2**i for i in range(3,10)]
    self.decoder = nn.ModuleList([ConvUp(channels[i+1], channels[i]) for i in range(number_of_stack)])[::-1]

  def forward(self, x):
    for i, layer in enumerate(self.decoder):
      x = layer(x)
    return x

############################################## TODO ##############################################
class AutoEncoder(nn.Module):
  def __init__(self, in_channels, number_of_stack):
    super().__init__()
    self.in_channels = in_channels
    self.number_of_stack = number_of_stack
    self.encoder = Encoder(self.in_channels, number_of_stack)
    self.decoder = Decoder(self.in_channels, number_of_stack)

  def forward(self, x):
    # TODO : Define your forward
    latent_representation = self.encoder(...)
    reconstructed_image = self.decoder(...)
    return reconstructed_image

# TODO : Create a model composed of 3 stacks
model = ...



*   Create your Lightning Module



In [None]:
class AutoEncoderPL(pl.LightningModule):
    def __init__(self, in_channels, number_of_stack):
        super().__init__()
        self.save_hyperparameters()
        # TODO : Define your model here.
        self.model = ...

    def forward(self,x):
        # TODO : Send the input through your model
        return ...

    def configure_optimizers(self):
        # Choose your optimizer 
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        return optimizer

    def training_step(self, train_batch, batch_idx):
        # Define your Training Step
        # This method is pretty much similar to what you did in the first lab to train your model.
        x,y = train_batch
        x_reconstructed = ...
        loss = ...
        # Don't remove the next line, you will understand why later
        self.log('train_loss', loss)
        return loss

    def validation_step(self, val_batch, batch_idx):
        # Define your Validation Step
        # What is the difference between the Training and the Validation Step ?
        x,y = val_batch
        x_reconstructed = ...
        loss = ...
        self.log('val_loss', loss)
    
    def test_step(self, test_batch, batch_idx):
        # Define your Test Step
        # What is the difference between the Training, Validation and Test Step ?
        x,y = test_batch
        x_reconstructed = ...
        loss = ...
        # Don't remove the next line, you will understand why later
        self.log('test_loss', loss)

    # TODO : Do we have to add some functions (test_epoch_start, test_epoch_end..) to compute a reconstruction metric ?

#### Training

##### Setting TensorBoard up

In [None]:
%reload_ext tensorboard
# You might have to change the folder name: check which folders were created in your Colab environment.
%tensorboard --logdir "/content/tb_logs/my_model/version_0"

##### Launch the Training

In [None]:
# TODO : 
model = AutoEncoderPL(...,...)
dm = MNISTDataModule(...)
trainer = pl.Trainer(gpus=-1,max_epochs=...)
trainer.fit(model, dm)

#### Testing

Test some images from the testing dataset to verify that your model works.

In [None]:
# TODO : Load your test dataloader from the datamodule

# TODO : Compute your average reconstruction loss on the test set.


In [None]:
# TODO : Plot some reconstructed images. Are they correct ?


### Going Further on AutoEncoders (Optional)

If you have some time, create other AutoEncoders with different latent sizes and train them.
* **What is the impact of the latent space size on the reconstructed images ?**

## Industrial Application of GANs : Creating a Google Maps Calque from a Google Maps Satellite View

Working on basic datasets is cool, but as you are engineers-to-be, let's take it to an industrial level. Given a Google Maps satellite view, we want the model to predict the corresponding calque (French for overlay: here, the schematic map view). This is an image-to-image translation task.

For example, using the image on the left, we want the model to predict the image on the right.
<img src="https://eu42.github.io/images/gcgan/aerial_image_map_example.png" height = 200>



### Lightning DataModule 



#### Dataset

First, we need to write the Dataset. As usual, a custom Dataset class must implement three methods:
* `__init__`
* `__len__`
* `__getitem__`

Each image is 1200 pixels wide and 600 pixels high, with 3 channels. The satellite view and the calque have the same size, and each takes up exactly half of the image.
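One detail that is easy to trip over: OpenCV returns images as NumPy arrays with shape (height, width, channels), so the width is the second axis. As a rough illustration, the sketch below builds a dummy array with the dimensions given above (no real file is read, and the names `left` and `right` are only placeholders) and cuts it in two along the width axis.

```python
import numpy as np

# Stand-in for one image as cv2.imread would return it: (height, width, channels)
full_image = np.zeros((600, 1200, 3), dtype=np.uint8)
full_image[:, 600:] = 255  # paint the right half white so the halves are distinguishable

h, w, c = full_image.shape

# Slice along axis 1 (the width) to separate the two halves
left = full_image[:, : w // 2]
right = full_image[:, w // 2 :]

print(left.shape, right.shape)  # (600, 600, 3) (600, 600, 3)
```

Which half plays which role is left for you to check against the example image above.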

As the dataset doesn't have a proper test set, we will use its validation set as our test set. We will then create our own validation set by holding out a portion of the training set.

In [None]:
class GoogleDataset(Dataset):

  def __init__(self, path, transform = None):
    self.path = path
    self.images = [path + i for i in os.listdir(path)]
    self.transform = transform

  def __getitem__(self,idx):
    image = self.images[idx]
    full_image = cv2.imread(image)
    h,w,c = full_image.shape
    # TODO : Retrieve the correct parts of the image.
    satellite = ...
    calque = ...
    if self.transform : 
      satellite = self.transform(satellite)
      calque = self.transform(calque)
    return {'satellite':satellite,
            'calque':calque}

  def __len__(self):
    return len(self.images)


#### Lightning DataModule

Now, let's prepare our DataModule.

In [None]:
class GoogleMapDataModule(pl.LightningDataModule):

    def __init__(self, batch_size):
        super().__init__()
        # TODO : In your Transformation, transform to Tensor and resize images to 128x128
        self.transform =  transforms.Compose([...,
                                              ...])
        self.batch_size = batch_size

        # We hardcode the paths as they won't change
        self.train_path = '/content/maps/train/' 
        self.test_path = '/content/maps/val/'

    def prepare_data(self):
        # TODO : load the train and test dataset
        GoogleDataset(..., self.transform)
        GoogleDataset(..., self.transform)


    def setup(self, stage):

        #First stage is 'fit' (or None)
        if stage == "fit" or stage is None:
            # We create a validation split to watch the training.
            google_train = GoogleDataset(self.train_path, self.transform)
            train_size = int(0.7 * len(google_train))
            valid_size = len(google_train) - train_size
            self.google_train, self.google_valid = torch.utils.data.random_split(google_train, [train_size, valid_size])
        #Second stage is 'test' 
        if stage == "test" or stage is None:
            self.google_test = GoogleDataset(self.test_path, self.transform)

    def train_dataloader(self):
        # TODO : Now create your Training DataLoader
        return ...

    def val_dataloader(self):
        # TODO : Now create your Validation DataLoader
        return ...

    def test_dataloader(self):
        # TODO : Now create your Testing DataLoader
        return ...

### Lightning Module

* **Can we use an AutoEncoder to perform this task ?**


Let's use a more complex model: a Generative Adversarial Network (GAN).

#### Pix2Pix : cGAN 

Instead of using a simple GAN, we will use a conditional GAN (cGAN).

<img src='https://www.researchgate.net/profile/Gerasimos-Spanakis/publication/330474693/figure/fig1/AS:956606955139072@1605084279074/GAN-conditional-GAN-CGAN-and-auxiliary-classifier-GAN-ACGAN-architectures-where-x_Q320.jpg'>

Unlike a standard GAN, a cGAN takes a condition that lets us control what the GAN generates. The generator takes some "inspiration" from the condition. For example, if you want to generate digits from the MNIST dataset, you can add a condition that forces the GAN to create a specific digit. In the MNIST case, the condition would be the class label.

Further details are in the following paper : https://arxiv.org/pdf/1411.1784.pdf
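
To make the MNIST example concrete, here is a minimal sketch of one common way to feed a class label to a generator: encode the label as a one-hot vector and concatenate it with the noise vector. The sizes (`noise_dim = 100`, a batch of 4) and variable names are assumptions chosen for illustration; they do not come from this lab.

```python
import numpy as np

rng = np.random.default_rng(0)
batch_size, noise_dim, num_classes = 4, 100, 10  # hypothetical sizes

# Random noise, one vector per sample
z = rng.standard_normal((batch_size, noise_dim))

# The condition: which digit we want for each sample, encoded as one-hot
labels = np.array([3, 3, 7, 0])
condition = np.eye(num_classes)[labels]

# The generator input carries both the noise and the condition
generator_input = np.concatenate([z, condition], axis=1)

print(generator_input.shape)  # (4, 110)
```

The first two samples share the same condition but have different noise, so a trained cGAN would be expected to produce two different drawings of the same digit.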



##### The Generator : Generating Images

In general, the Generator is here to generate data from noise. For example, if we train a GAN on the MNIST dataset, the generator creates digits from the noise we give it as input. In this lab, we are dealing with cGANs, so the generator also needs a condition.
* **What could be the condition in this task ?** 

To answer that :
* **What are we trying to do ?**
* **What should the Generator do ? Should it recreate something specific ?**



Let's use an AutoEncoder to perform the image translation. 
* **Using previous classes, initialize your Generator using a 3 stack AutoEncoder.**

In [None]:
class Generator(nn.Module):
    # TODO : Create your Generator using the previous AutoEncoder Class
    def __init__(self,...,...,...):
        super().__init__()

    def forward(self,x):


##### The Discriminator : Forcing the Generator to predict better

The Generator can create images from inputs, but we want the best quality possible. We need a model that tells the Generator whether the generated images look right or not: the Discriminator.

The Discriminator is here to force the Generator to create better and better images. As long as the Generator produces poor-quality images, the Discriminator's feedback pushes it to improve.

<img src="https://i.imgflip.com/6y0kqe.jpg">

* **What could be the worst case scenario with the Generator and the Discriminator ?**

We provide you with the code of a Discriminator that "works well".

In [4]:
class DiscriConv(nn.Module):    
  def __init__(self, in_channels, out_channels, kernel_size):
    super().__init__()
    self.model = nn.Sequential(nn.Conv2d(in_channels = in_channels,
                                  out_channels = out_channels,
                                  kernel_size = kernel_size,
                                  stride = 2,
                                  padding = 1),
                                  nn.BatchNorm2d(out_channels),
                                  nn.LeakyReLU(0.2, inplace=True))
       
  def forward(self,x):
      return self.model(x)

class Discriminator(nn.Module):
  def __init__(self, in_channels, out_channels):
    super(Discriminator, self).__init__()
    self.main = nn.Sequential(DiscriConv(in_channels, 32,3),
                              DiscriConv(32, 64,4),
                              DiscriConv(64, 128,4),
                              # Feature map is 128 x 16 x 16 for a 128x128 input
                              nn.Conv2d(in_channels = 128,
                                        out_channels = out_channels,
                                        kernel_size = 3,
                                        bias=False))

  def forward(self, input):
    return self.main(input)

##### The Loss : Adversarial Battle

<img src="https://miro.medium.com/max/1400/1*d96q6bCKbmZT9Ls7f3X6xg.jpeg">

This formula is the basic loss of the cGAN model. It has two terms, each optimizing something specific.
A beautiful formula, isn't it? Let's make sense of it.
* What are x, y and z in our case?


If we look at the formula, there are two losses:
* What kind of losses are they? Cross-entropies, distance losses?
* Which term is the Discriminator loss? Which is the Generator loss?

The entire model converges when the Generator produces fakes so good that the Discriminator cannot tell them apart from the real images.
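
To see what "cannot tell them apart" means numerically, the sketch below computes binary cross-entropy on a raw score (a logit) by hand for a single prediction. This is the per-element computation behind `nn.BCEWithLogitsLoss`, which the next cell uses; the function name `bce_with_logits` and the example scores are made up for illustration.

```python
import math


def bce_with_logits(logit, target):
    """Binary cross-entropy for one raw score (logit) and one target in {0, 1}."""
    p = 1.0 / (1.0 + math.exp(-logit))  # sigmoid turns the score into a probability
    return -(target * math.log(p) + (1.0 - target) * math.log(1.0 - p))


# The Discriminator is confident the input is real, and the target is "real": small loss
loss_confident_and_right = bce_with_logits(4.0, 1.0)

# Same confident score, but the target is "fake": large loss
loss_confident_and_wrong = bce_with_logits(4.0, 0.0)

# The Discriminator cannot decide (score 0, probability 0.5): the loss is ln(2), about 0.693
loss_undecided = bce_with_logits(0.0, 1.0)

print(loss_confident_and_right < loss_undecided < loss_confident_and_wrong)  # True
```

A Discriminator loss hovering around ln(2) on both real and fake data is therefore a sign that it is guessing, although in practice GAN losses are noisy and rarely settle cleanly.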

In [None]:
class GANLoss(nn.Module):
      def __init__(self, real_label=1, fake_label=0):
        super().__init__()
        self.loss = nn.BCEWithLogitsLoss()
        # Binary target labels: 1 for real data, 0 for fake data
        self.real_label = real_label
        self.fake_label = fake_label

      def get_labels(self, predictions, real_or_not):
          # TODO : If we have real data as input, we want its target label to be self.real_label. If we have fake data as input, we want its target label to be self.fake_label
          # Fill in the ...
          labels = ... if real_or_not else ...
          # BCEWithLogitsLoss expects float targets with the same shape and device as the predictions
          return torch.tensor(labels, dtype=predictions.dtype, device=predictions.device).expand_as(predictions)
      
      def forward(self, predictions, real_or_not):
          # TODO : Return the Loss between the models prediction and the labels
          return 

##### The Final Module

Let's bring everything together in our Lightning Module. There are many steps to follow, so read the comments carefully to understand the code.
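
In the cell below, the Generator branch of `training_step` returns the adversarial loss plus an `nn.L1Loss` term multiplied by 100. The following sketch, using NumPy and made-up numbers, gives a rough idea of the relative scale of the two terms. The adversarial value of 0.7 and the tiny 8x8 "images" are purely hypothetical.

```python
import numpy as np


def l1_loss(prediction, target):
    """Mean absolute error, the quantity nn.L1Loss computes by default."""
    return float(np.mean(np.abs(prediction - target)))


rng = np.random.default_rng(0)
real_calque = rng.random((1, 3, 8, 8))  # hypothetical target image with values in [0, 1)
fake_calque = real_calque + 0.1         # a prediction that is off by 0.1 on every pixel

adversarial_loss = 0.7                  # hypothetical value, close to ln(2)
l1 = l1_loss(fake_calque, real_calque)  # about 0.1
total = adversarial_loss + 100 * l1     # about 10.7

print(round(l1, 3), round(total, 3))  # 0.1 10.7
```

Under these assumptions the weighted L1 term is much larger than the adversarial term, so pixel-level agreement with the real calque drives most of the Generator's gradient early on.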

In [None]:
class GAN(pl.LightningModule):
  def __init__(self ):
    super().__init__()
    # TODO : Initialize your Generator and Discriminator
    self.generator = Generator(in_channels = ...,
                               latent_size = ...,
                               out_channels = ...)
    self.discriminator = Discriminator(in_channels = 2*3,
                                       out_channels = ...)

  def forward(self,satellite):
    img_fake = self.generator(satellite)
    return img_fake

  def training_step(self, train_batch, batch_idx, optimizer_idx):
    # TODO:  Follow the steps
    satellite, calque = train_batch['satellite'], train_batch['calque']
    # Disclaimer : You might have to put some self. ...
    criterion = GANLoss()
    criterionL1 = nn.L1Loss() # We add the L1 Loss for better correspondence between the colors
    if optimizer_idx == 1:
        # Discriminator Training Part
        # Goal : Train the Discriminator to differentiate fake and real data
        # Part 0 : Train on Fake data
        # TODO : Step 1 : Send the Satellite image through the generator to create a Fake Calque
        fake_calque = self.generator(...)
        # Concatenating the Input Image and the Fake Generated Calque before sending to the Discriminator
        fake_data = torch.cat([satellite, fake_calque], dim = 1)
        # TODO : Step 2 : Send the fake data to the Discriminator
        fake_prediction = self.discriminator(...)
        # Compute Loss on the prediction made for the fake data
        loss_fake = criterion(fake_prediction, real_or_not = False) # In this case, we know that the images are fake.
        # Part 1 : Train on Real data
        # TODO : Step 1 : Concatenate the Real Calque to the Satellite
        real_data = torch.cat([...,...], dim = 1)
        # TODO : Step 2 : Send the Data through the Discriminator
        real_prediction = self.discriminator(...)
        # TODO : Compute the loss.
        loss_real = criterion(...,real_or_not = ...)
        loss =(loss_real + loss_fake)/2
        self.log('discriminator loss', loss)
        return loss

    if optimizer_idx == 0 :
        # Generator Training Part
        # Goal : Train the Generator to create the best possible images
        # TODO : Step 1 : Send the Satellite image through the generator to create a Fake Calque
        fake_calque = self.generator(...)
        fake_data = torch.cat([satellite, fake_calque], dim = 1)
        # TODO : Step 2 : Send the fake data to the Discriminator
        fake_prediction = self.discriminator(...)
        # TODO : Step 3 : Compute the loss, don't forget that we want to create the best images possible, so what must be the value of real_or_not ?
        loss_true = criterion(...,real_or_not = ...)
        loss_l1 = criterionL1(fake_calque, calque)* 100 # Adding the L1 Loss for color matching
        self.log('generator loss', loss_true)

        return loss_true + loss_l1

  def validation_step(self, val_batch, batch_idx):

    satellite, calque = val_batch['satellite'], val_batch['calque']
    criterion = GANLoss()
    criterionL1 = nn.L1Loss()
    # TODO : Send the Satellite Image through the Generator
    fake_calque = self.generator(...)
    # TODO : Compute the Loss between the fake calque and the real calque

    # TODO : Send the fake calque, the satellite to the Discriminator, and compute the loss

    # TODO : Send the real calque, the satellite to the Discriminator,and compute the loss

    # TODO : Don't forget to log the losses

  def test_step(self, test_batch, batch_idx):

    satellite, calque = test_batch['satellite'], test_batch['calque']
    criterion = GANLoss()
    criterionL1 = nn.L1Loss()
    # TODO : Send the Satellite Image through the Generator
    fake_calque = self.generator(...)
    # TODO : Compute the Loss between the fake calque and the real calque

    # TODO : Send the fake calque, the satellite to the Discriminator, and compute the loss

    # TODO : Send the real calque, the satellite to the Discriminator,and compute the loss

    # TODO : Don't forget to log the losses

    
  def configure_optimizers(self):
    # As we are optimizing two models, we use 2 optimizers: the generator and the discriminator
    # don't have the same architecture.
    optimizer_generator = torch.optim.Adam(self.generator.parameters(), 
                                           lr=2e-4, 
                                           betas= (0.005, 0.999))
    optimizer_discriminator = torch.optim.Adam(self.discriminator.parameters(),  
                                           lr=2e-4, 
                                           betas= (0.005, 0.999))
    # Lightning takes this list of optimizers; the position of each optimizer in the list is its optimizer_idx
    return [optimizer_generator, optimizer_discriminator], []

### Training

Train your model. We suggest training it for at least 10 epochs to see 'results'.


In [None]:
datamodule = ...
model = ...
trainer = ...

### Testing

Test your model on a few images from the test dataset. You can also try the model on Satellite views cropped from Google Maps.