## 🪄 Install `wandb` library and login


Start by installing the library and logging in to your free account.



In [1]:
!pip install wandb 

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
# # Log in to your W&B account
import wandb
wandb.login()


[34m[1mwandb[0m: Currently logged in as: [33mtamuz1212[0m ([33mnn-for-images[0m). Use [1m`wandb login --relogin`[0m to force relogin


True

In [3]:
#@title Drive Mount
from google.colab import drive
drive.mount('/content/gdrive/', force_remount=True)

Mounted at /content/gdrive/


In [4]:
#@title Imports
import torch
import torch.optim as optim
import random
import pdb
import time
import torch.nn as nn
import torch.nn.functional as F
from keras.datasets import mnist
from torchvision import datasets, transforms
import torch.nn.init as init
import numpy as np
import matplotlib.pyplot as plt
import torchvision
# new 
from sklearn.model_selection import train_test_split
from PIL import Image
import os
import torchvision.transforms.functional as TF



In [5]:
#@title Initialize DCGAN Parameters

# Model parameters (feature map refers to the output of a specific layer in the network)
latent_dim = 100
out_channel = 1
random.seed(999)
torch.manual_seed(999)

# Training parameters
batch_size = 64
num_epochs = 15
inversion_num_epochs = 15

lr = 0.0002
betas = (0.5, 0.999)
num_of_updates = 5

# TO use GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
fixed_noise = torch.randn(16, latent_dim, device=device) 


In [6]:
#@title Load the MNIST dataset


# transformer that  converters the data to PyTorch tensors and normalize the data
transform = transforms.Compose([
    transforms.ToTensor()])
#   transforms.Normalize((0.5, ), (0.5,))
   
# Download and load the training data
train_set = datasets.MNIST('~/.pytorch/MNIST_data/', download=True, train=True, transform=transform)
train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True)
train_loader_1_batch_size = torch.utils.data.DataLoader(train_set, batch_size=1, shuffle=True)

# Download and load the test data
test_set = datasets.MNIST('~/.pytorch/MNIST_data/', download=True, train=False, transform=transform)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=True)




In [7]:
#@title Define Generator and Descriminator



# took from DCGAN tutorial for weights inition
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif classname.find('BatchNorm') != -1:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)

# Generator - create new data that resembles the real data
class Generator(nn.Module):
    def __init__(self, latent_dim ,out_channel):
        super(Generator, self).__init__()

        self.latent_dim = latent_dim

        # Define Generators layers seperetly to print dimensions easily
        self.linear_layer = nn.Sequential(
            nn.Linear(latent_dim,128),
            nn.Linear(128,128*3*3), 
            nn.ReLU(0.2))
        
        self.reshape_linear_layer = nn.Unflatten(1, (128,3,3)) #unflattern

        self.first_cnn_layer = nn.Sequential(
            nn.ConvTranspose2d(in_channels=128, out_channels=64, kernel_size=3, stride=2),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(True))

        self.second_cnn_layer = nn.Sequential(
            nn.ConvTranspose2d(in_channels=64, out_channels=32, kernel_size=3, stride=2, padding =1, output_padding =1),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(True))
        
        self.third_cnn_layer = nn.Sequential(
            nn.ConvTranspose2d(in_channels=32, out_channels=out_channel, kernel_size=3, stride=2, padding =1, output_padding =1),
            nn.Sigmoid()
        ) 
        
        
        # Init models weights 
        self.apply(weights_init)
     
    def forward(self, x):
          # print("G")
          # print(x.shape)
          x =  self.linear_layer(x) # [256, 100] -> [256, 128*3*3]
          # print(x.shape)
          x = self.reshape_linear_layer(x) #[256, 128*3*3] -> [256, 128, 3, 3]
          # print(x.shape)
          x =  self.first_cnn_layer(x) # [256, 128, 3, 3] -> [256, 64, 7, 7]
          # print(x.shape)
          x =  self.second_cnn_layer(x) # [256, 64, 7, 7] -> [256, 32, 14, 14]
          # print(x.shape)
          x =  self.third_cnn_layer(x) # [256, 32, 14, 14] -> [256, 1, 28, 28] in range [0,1]
          # print(x.shape)
          return x
    




class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()

        # Define Discriminator layers seperetly to print dimensions easily

        self.first_cnn_layer = nn.Sequential(
            # Layer1 - Conv2d, (1,28,28) -> (32,14,14)
            nn.Conv2d(1, 32, kernel_size=3, stride=2, padding=1),  # N, 64, 14, 14
            nn.BatchNorm2d(32),
            nn.LeakyReLU())
        self.second_cnn_layer = nn.Sequential(
            # Layer2 - Conv2d, (32,14,14) -> (64,7,7)
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),  # N, 128, 7, 7
            nn.BatchNorm2d(64),
            nn.LeakyReLU())
        self.third_cnn_layer = nn.Sequential(
            # Layer3 - Conv2d, (64,7,7) -> (128,4,4)
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=0),  # N, 128, 7, 7
            nn.BatchNorm2d(128),
            nn.LeakyReLU())
        self.flat_and_linear_layer = nn.Sequential(
            # Layer3 - fully connected, (128,7,7) 
            nn.Flatten(), # -> (128*3*3)
            nn.Linear(128 * 3 * 3, 1024),   # (128*3*3) -> (1024)
            nn.BatchNorm1d(1024),
            nn.LeakyReLU(),
            # Layer4 - fully connected, (1024) -> (1)
            nn.Linear(1024, 1),
            nn.Sigmoid()) # remove sigmoid when using MSELoss

        # Init models weights 
        self.apply(weights_init)


    def forward(self, x):
        # print("D")
        # print(x.shape)
        x = self.first_cnn_layer(x) # [256, 1, 28, 28] -> [256, 32, 14, 14]
        # print(x.shape)
        x = self.second_cnn_layer(x) # [256, 32, 14, 14] -> [256, 64, 7, 7]
        # print(x.shape)
        x = self.third_cnn_layer(x) # [256, 64, 7, 7] -> [256, 128, 3, 3]
        # print(x.shape)
        x = self.flat_and_linear_layer(x) # [256, 128, 3, 3] -> [256,1] in range [0,1]
        # print(x.shape)
        return x


     

In [32]:


def plot_generated_img(G,fixed_noise):
    with torch.no_grad():
        # Reconstruct images from the fixed_noise
        output = G(fixed_noise).cpu().detach()
        img_grid = torchvision.utils.make_grid(output, nrow=4).numpy()
        fig, ax = plt.subplots(figsize=(10, 10))
        plt.imshow(np.transpose(img_grid, (1, 2, 0)))
        plt.axis('off')
        plt.show()



def plot_image(image):
    image = image.detach().cpu().numpy()
    plt.imshow(image, cmap='gray')
    plt.axis('off')
    plt.show()

def plot_two_images(image1, image2):
    image1 = image1.detach().cpu().numpy()
    image2 = image2.detach().cpu().numpy()
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 5))
    ax1.imshow(image1, cmap='gray')
    ax1.set_title("G(Z)")
    ax1.axis("off")
    ax2.imshow(image2, cmap='gray')
    ax2.set_title("Real Image")
    ax2.axis("off")
    plt.show()

# TypeError: can't convert cuda:0 device type tensor to numpy. Use Tensor.cpu() to copy the tensor to host memory first.

In [9]:
#@title Train the network & Loss function
# Init models
G = Generator(latent_dim,out_channel).to(device)
D = Discriminator().to(device)

# Init loss function and Optimizer
criterion = nn.BCELoss()
# criterion = nn.MSELoss() # L2 loss - least squares loss 
G_optimizer = torch.optim.Adam(G.parameters(), lr=lr, betas=betas)
D_optimizer = torch.optim.Adam(D.parameters(), lr=lr, betas=betas)

   
def trainDCGAN(saturation):

    # Start a new run to track this script
    wandb.init(
    # Set the project where this run will be logged
    project="ex3-nn", 
    # We pass a run name (otherwise it’ll be randomly assigned, like sunshine-lollypop-10)
    name=f"epochs: {num_epochs}, batch size: {batch_size}, lr: {lr}")

    # start timing
    start_time = time.time()

    # train model
    for epoch in range(num_epochs):
        print(f"##### Epoch {epoch} #####")
         # Enter Traininig mode
        G.train()
        D.train()

        d_losses = []
        g_losses = []

        # Train
        for data in train_loader:
            real_images =  data[0].to(device)
            
            images_size = len(real_images)
            # Init real\fake predictions
            real_pred = torch.ones(images_size).to(device) # predict that the images are real , shape of [batch_size]
            fake_pred = torch.zeros(images_size).to(device) # predict that the images are generate by G, shape of [batch_size]

            ##### Descriminator - multiple optimizations (maximize log(D(x)) + log(1 - D(G(z)))) #####
            noise = torch.randn(images_size, latent_dim, device=device) 
            fake_images = G(noise)
   
            
            # calc and save D loss - combination of real\fake losses 
            real_loss = criterion(D(real_images).view(-1), real_pred)
            fake_loss = criterion(D(fake_images.detach()).view(-1), fake_pred) # detach fake images

            d_loss = (real_loss + fake_loss) 

            # zero the parameter gradients
            D_optimizer.zero_grad()
            d_loss.backward()
            # update optimizer
            D_optimizer.step()


            ##### Generator - single optimization (maximize log(D(G(z)))) #####
            # for i in range(num_of_updates):
            output = D(fake_images).view(-1)
            
            # calc G loss
            if saturation:
                g_loss = -criterion(output, fake_pred)
            else: 
                g_loss = criterion(output, real_pred) # none saturation - better results

              # zero the parameter gradients
            G_optimizer.zero_grad()  
            g_loss.backward(retain_graph=True) #retain_graph=True
            G_optimizer.step()
            
            # save losses
            g_losses.append( g_loss.item())
            d_losses.append(d_loss.item())
            if saturation:
                wandb.log({"saturation Descriminator Loss": d_loss.item(),"saturation Generator Loss":  g_loss.item()})
            else:
                wandb.log({"Descriminator Loss": d_loss.item(),"Generator Loss":  g_loss.item()})

        # plot_generated_img(G,fixed_noise) # Plot generated images with the same noise to check progress

    # calculate and print total time of training
    end_time = time.time()
    total_time = str(end_time - start_time)
    print('Finished Training, it took:' + total_time + ' seconds')

trainDCGAN(saturation=False)

##### Epoch 0 #####
##### Epoch 1 #####
##### Epoch 2 #####
##### Epoch 3 #####
##### Epoch 4 #####
##### Epoch 5 #####
##### Epoch 6 #####
##### Epoch 7 #####
##### Epoch 8 #####
##### Epoch 9 #####
##### Epoch 10 #####
##### Epoch 11 #####
##### Epoch 12 #####
##### Epoch 13 #####
##### Epoch 14 #####
Finished Training, it took:238.15997314453125 seconds


In [10]:
# wandb.finish()

In [None]:
#@title Q2 - Model Inversion
   
def model_inversion(image):

    # start timing
    start_time = time.time()

    # Init loss function and Optimizer and z,  Remember - G is already pre-trained
    z = torch.randn(1, latent_dim, device=device, requires_grad=True) 
    z_optimizer = torch.optim.RMSprop([z], lr=lr) # performce better than Adam Optimizer in this case
    criterion = nn.MSELoss()

    # Enter evaluation mode with G
    G.eval()

    for epoch in range(30001):
        # if epoch % 100 == 0:
        #     print(f"##### Epoch {epoch} #####")
       
        # reconstruct the images z
        output = G(z).squeeze()
    
        # calculate loss
        loss = criterion(output, image) # compare between G(z) to the real images

        # zero the parameter gradients
        z_optimizer.zero_grad()
        loss.backward() # calc gradients
        # update optimizer
        z_optimizer.step()

        if epoch % 10000 == 0:
            print(f"##### Epoch {epoch} #####")            
            plot_two_images(output,image) # Plot generated images with the same noise to check progress

    # calculate and print total time of training
    end_time = time.time()
    total_time = str(end_time - start_time)
    print('Finished Training, it took:' + total_time + ' seconds')

image = next(iter(test_loader))[0][0].squeeze().to(device)
model_inversion(image)

In [12]:
#@ helpers for Q3
def add_gaussian_noise(image, mean=0, stddev=0.1):
    device = image.device
    noise = (torch.randn(image.size(), device=device) * stddev + mean)
    noisy_image = torch.clamp(image + noise, 0, 1) # Normalize the noise samples to the range [0, 1]
    return noisy_image

def inpaint_image(image):
    window_size = (8,8)
    h, w = image.shape
    inpainted_image = image.clone()

    # calc maximum starting positions of the inpaiting window and randomly select starting position 
    max_h = h - window_size[0]
    max_w = w - window_size[1]
    start_h = random.randint(0, max_h)
    start_w = random.randint(0, max_w)

    # inpaint window region with black color
    inpainted_image[..., start_h:start_h+window_size[0], start_w:start_w+window_size[1]] = 0


    return inpainted_image


In [None]:
#@title Q3 - Image restoration
   
def image_restoration(image, denoising):
    image = image.squeeze()

    # start timing
    start_time = time.time()
    if denoising:
        blurred_image = add_gaussian_noise(image).squeeze()
        criterion = nn.MSELoss()
        plot_image(blurred_image)

    else:
        # image = image.squeeze()
        blurred_image = inpaint_image(image)
        criterion = nn.L1Loss()


    # Init and Optimizer and z,  Remember - G is already pre-trained
    z = torch.randn(1, latent_dim, device=device, requires_grad=True)
    z_optimizer = torch.optim.Adam([z], lr=lr,betas=betas) # performce better than Adam Optimizer in this case

    # Enter evaluation mode with G
    G.eval()

    for epoch in range(200001):
        output = G(z).squeeze()
        if denoising:
            blurred_output = add_gaussian_noise(output).squeeze()
        else:
            blurred_output = inpaint_image(output)

        loss = criterion(blurred_output, blurred_image) # compare between G(z) to the real images
        # zero the parameter gradients
        z_optimizer.zero_grad()
        loss.backward() # calc gradients
        # update optimizer
        z_optimizer.step()

        if epoch % 25000 == 0:
            print(f"##### Epoch {epoch} #####")
            if denoising:  
                plot_two_images(output,image) # Plot generated images with the same noise to check progress
            else:
                plot_two_images(output,blurred_image) # Plot generated images and inpainting  to check progress

    # calculate and print total time of training
    end_time = time.time()
    total_time = str(end_time - start_time)
    print('Finished Training, it took:' + total_time + ' seconds')

    
image = (next(iter(test_loader))[0][0]).to(device)
plot_image(image.squeeze())
image_restoration(image,denoising=False)
