## Check for GPUs

In [None]:
# Shell out to nvidia-smi to list the GPUs visible to this runtime.
!nvidia-smi


## Requirements


We install the pre-release build of pytorch-ignite.

In [None]:
 !pip install --pre pytorch-ignite

In [None]:
import torch
import ignite
# Display the installed torch / ignite versions as the cell output.
torch.__version__, ignite.__version__

In [None]:
import random
import torch
import numpy as np
# One seed for every random number generator the notebook imports, so that
# the random B-image pairing (`random`), the torchvision augmentations
# (`torch`) and any NumPy-based randomness start from a known state.
seed = 17
random.seed(seed)
np.random.seed(seed)
_ = torch.manual_seed(seed)

# Dataset

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; every dataset and output path below lives under it.
drive.mount('/content/drive')


In [None]:
from pathlib import Path

from PIL import Image
from torch.utils.data import Dataset, DataLoader

class FilesDataset(Dataset):
    """Dataset of RGB PIL images found recursively under a folder.

    Args:
        path: folder to search (anything accepted by ``pathlib.Path``).
        extension: glob pattern passed to ``Path.rglob``; defaults to PNG files.

    Raises:
        AssertionError: if ``path`` does not exist or no file matches
            ``extension``.
    """

    def __init__(self, path, extension="*.png"):
        self.path = Path(path)
        assert self.path.exists(), "Path '{}' is not found".format(path)
        # rglob yields files in filesystem order, which differs between
        # machines and runs; sorting makes index -> image stable.
        self.images = sorted(self.path.rglob(extension))
        assert len(self.images) > 0, "No images with extension {} found at '{}'".format(extension, path)

    def __len__(self):
        """Number of image files found."""
        return len(self.images)

    def __getitem__(self, i):
        """Load image ``i`` from disk and return it converted to RGB."""
        # convert() returns a fully loaded copy, so the file handle can be
        # released as soon as the conversion is done.
        with Image.open(self.images[i]) as img:
            return img.convert('RGB')


class FilesDataset2(Dataset):
    """Dataset of RGB PIL images found recursively under a folder (JPEG default).

    Same contract as ``FilesDataset``; only the default glob pattern differs.

    Args:
        path: folder to search (anything accepted by ``pathlib.Path``).
        extension: glob pattern passed to ``Path.rglob``; defaults to JPG files.

    Raises:
        AssertionError: if ``path`` does not exist or no file matches
            ``extension``.
    """

    def __init__(self, path, extension="*.jpg"):
        self.path = Path(path)
        assert self.path.exists(), "Path '{}' is not found".format(path)
        # Sort so that index -> image does not depend on filesystem order.
        self.images = sorted(self.path.rglob(extension))
        assert len(self.images) > 0, "No images with extension {} found at '{}'".format(extension, path)

    def __len__(self):
        """Number of image files found."""
        return len(self.images)

    def __getitem__(self, i):
        """Load image ``i`` from disk and return it converted to RGB."""
        # convert() returns a loaded copy; close the source file right away.
        with Image.open(self.images[i]) as img:
            return img.convert('RGB')

In [None]:
from pathlib import Path

# Folder on the mounted Drive that all four datasets below are read from.
root = Path("/content/drive/MyDrive/ML_Datasets/Task_4/testing/dataset/")
# NOTE(review): test_path is not referenced again in this notebook section — confirm it is still needed.
test_path=Path('/content/drive/MyDrive/ML_Datasets/Task_4/pizza_test')

assert root.exists(), "Path '{}' is not found".format(root)
# Earlier variant that read each domain from its own sub-folder:
#train_A = FilesDataset(root / "synthetic test")
#train_B = FilesDataset2(root / "real test")

# NOTE(review): train/test and A/B are all built from the same folder and the
# same "*.png" pattern, so the four datasets list identical files — confirm
# this is intentional for the inference-only run.
train_A = FilesDataset(root)
train_B = FilesDataset(root)
test_A = FilesDataset(root)
test_B = FilesDataset(root)
#test_A = FilesDataset(root / "synthetic test") 
#test_B = FilesDataset2(root / "real test")

Some details on the datasets:

In [None]:
# Report how many image files each of the four datasets found.
print("Dataset sizes: \ntrain A: {} | B: {}\ntest A: {} | B: {}\n\t".format(len(train_A), len(train_B), len(test_A), len(test_B)))

In [None]:
# PIL (width, height) of four fixed samples (indices 0, 1, 10 and the last); needs at least 11 images.
print("Train random image sizes (A): {}, {}, {}, {}".format(train_A[0].size, train_A[1].size, train_A[10].size, train_A[-1].size))

In [None]:
# Same four fixed indices (0, 1, 10, last) for dataset B.
print("Train random image sizes (B): {}, {}, {}, {}".format(train_B[0].size, train_B[1].size, train_B[10].size, train_B[-1].size))

In [None]:
import matplotlib.pylab as plt
%matplotlib inline

In [None]:
# Side-by-side preview of the first image of train_A (left) and train_B (right).
plt.figure(figsize=(10, 5))
plt.subplot(121)
plt.title("Train dataset 'Real Pizza'")
plt.imshow(train_A[0])
plt.subplot(122)
plt.title("Train dataset 'Synthetic Pizza'")
plt.imshow(train_B[0])

In [None]:
# Side-by-side preview of the first image of test_A (left) and test_B (right).
plt.figure(figsize=(10, 5))
plt.subplot(121)
plt.title("Test dataset 'Real Pizza'")
plt.imshow(test_A[0])
plt.subplot(122)
plt.title("Test dataset 'Synthetic Pizza'")
plt.imshow(test_B[0])

Here we create a dataset composed of random image pairs of datasets A and B

In [None]:
import random


class Image2ImageDataset(Dataset):
    """Pairs each item of dataset A with a randomly drawn item of dataset B.

    The length is that of the larger dataset. Indices beyond the end of A wrap
    around; the B partner is re-drawn with ``random.randint`` on every access,
    so the same index can return a different B item each time.
    """

    def __init__(self, ds_a, ds_b):
        self.dataset_a, self.dataset_b = ds_a, ds_b

    def __len__(self):
        size_a = len(self.dataset_a)
        size_b = len(self.dataset_b)
        return size_a if size_a >= size_b else size_b

    def __getitem__(self, i):
        source_a = self.dataset_a
        source_b = self.dataset_b
        # A is fetched first, then the random partner from B is drawn.
        sample_a = source_a[i % len(source_a)]
        sample_b = source_b[random.randint(0, len(source_b) - 1)]
        return dict(A=sample_a, B=sample_b)


class TransformedDataset(Dataset):
    """Wraps a dataset of dict samples and applies ``transform`` to every value.

    ``ds[i]`` must return a mapping; the result keeps the same keys with each
    value replaced by ``transform(value)``.
    """

    def __init__(self, ds, transform):
        self.dataset, self.transform = ds, transform

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, i):
        converted = {}
        for key, value in self.dataset[i].items():
            converted[key] = self.transform(value)
        return converted

In [None]:
# Unaligned (A, random B) pair datasets for training and for testing.
train_ab_ds = Image2ImageDataset(train_A, train_B)
test_ab_ds = Image2ImageDataset(test_A, test_B)

In [None]:
# Show one raw sample: a dict with PIL images under the keys 'A' and 'B'.
test_ab_ds[0]

In [None]:
# Preview one training pair (A item 20 and its randomly drawn B partner).
dp = train_ab_ds[20]

plt.figure(figsize=(10, 5))
plt.subplot(121)
plt.title("Train dataset 'Pizza'")
plt.imshow(dp['A'])
plt.subplot(122)
plt.title("Train dataset 'Synthetic'")
plt.imshow(dp['B'])

In [None]:
# Preview one test pair (A item 20 and its randomly drawn B partner).
dp = test_ab_ds[20]

plt.figure(figsize=(10, 5))
plt.subplot(121)
plt.title("Test dataset 'Real Pizza'")
plt.imshow(dp['A'])
plt.subplot(122)
plt.title("Test dataset 'Synthetic Pizza'")
plt.imshow(dp['B'])

In [None]:
from torchvision.transforms import Compose, ColorJitter, RandomHorizontalFlip, ToTensor, Normalize, RandomCrop, Resize, CenterCrop

# Training pipeline. Resize(256) first rescales images that are not already
# 256x256 so that the 256x256 crop that follows always fits.
train_transform = Compose([
    Resize(256),
    RandomCrop(256),
    RandomHorizontalFlip(),
    ColorJitter(),
    ToTensor(),
    # Per-channel (v - 0.5) / 0.5.
    Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
])
transformed_train_ab_ds = TransformedDataset(train_ab_ds, transform=train_transform)

batch_size = 1
train_ab_loader = DataLoader(transformed_train_ab_ds, batch_size=batch_size, shuffle=True, drop_last=True, pin_memory=True, num_workers=1)

# Evaluation pipeline: no augmentation. CenterCrop replaces the RandomCrop
# used before so that the same test image always yields the same tensor
# (identical result for images that are square after the resize).
test_transform = Compose([
    Resize(256),
    CenterCrop(256),
    ToTensor(),
    Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
])
transformed_test_ab_ds = TransformedDataset(test_ab_ds, transform=test_transform)
test_ab_loader = DataLoader(transformed_test_ab_ds, batch_size=batch_size, shuffle=False, drop_last=False, pin_memory=True, num_workers=1)

In [None]:

import torchvision.utils as vutils

# Plot some training images
# One batch is pulled from the training loader; at most the first 64 images of
# each domain are arranged into a grid (the loader above uses batch_size = 1).
real_batch = next(iter(train_ab_loader))

plt.figure(figsize=(16, 16))
plt.axis("off")
plt.title("Training Images from A")
plt.imshow( 
    vutils.make_grid(real_batch['A'][:64], padding=2, normalize=True).cpu().numpy().transpose((1, 2, 0))
)

plt.figure(figsize=(16, 16))
plt.axis("off")
plt.title("Training Images from B")
plt.imshow(
    vutils.make_grid(real_batch['B'][:64], padding=2, normalize=True).cpu().numpy().transpose((1, 2, 0))
)
# Drop the reference to the batch and release cached CUDA memory.
real_batch = None
torch.cuda.empty_cache()

# Generator and Activations  


In [None]:
# Select TensorFlow 2.x where the Colab-only magic exists; any failure of the
# magic is deliberately ignored so the cell still runs.
try:
  # %tensorflow_version only exists in Colab.
  %tensorflow_version 2.x
except Exception:
  pass

# Load the TensorBoard notebook extension.
%load_ext tensorboard
import tensorflow as tf
from datetime import datetime

In [None]:
def normalize(x):
    """Min-max scale a tensor to approximately [0, 1].

    Args:
        x: tensor to rescale; it is left untouched.

    Returns:
        A new tensor ``(x - x.min()) / (x.max() - x.min() + 1e-5)``. The small
        constant avoids a division by zero for constant inputs.
    """
    vmin = x.min()
    vmax = x.max()
    # Computed out of place: callers pass views of layer activations, and an
    # in-place update would also rewrite the tensor that is fed to the next
    # layer whenever the view shares memory with it (e.g. when running on CPU).
    return (x - vmin) / (vmax - vmin + 1e-5)

In [None]:
def plot_graphs(c,plot_title, save_file,file_writer):
    """Show a 2-D array as an image, log it to TensorBoard and save it as a file.

    Args:
        c: 2-D array; it is reshaped to (1, rows, cols, 1) for the summary.
        plot_title: figure title, also used as the TensorBoard image tag.
        save_file: path the figure is written to with ``plt.savefig``.
        file_writer: TensorFlow summary writer that receives the image at step 0.
    """
    fig = plt.figure(figsize=(20,5))
    plt.title(plot_title)
    plt.imshow(c)
    with file_writer.as_default():
        tf.summary.image(plot_title, np.reshape(c,(1,c.shape[0],c.shape[1],1)), step=0)
    plt.savefig(save_file)
    plt.show()
    # Close (not just clear) the figure so figures do not pile up when this
    # is called once per layer and per checkpoint.
    plt.close(fig)

In [None]:
import torch
import torch.nn as nn
import os

def get_conv_inorm_relu(in_planes, out_planes, kernel_size, stride, reflection_pad=True, with_relu=True):
    """Build ``[ReflectionPad2d] -> Conv2d -> InstanceNorm2d -> [ReLU]``.

    Args:
        in_planes: input channels of the convolution.
        out_planes: output channels of the convolution.
        kernel_size: convolution kernel size; padding is ``(kernel_size - 1) // 2``.
        stride: convolution stride.
        reflection_pad: pad with a separate ``ReflectionPad2d`` layer (the
            convolution then uses padding 0) instead of the convolution's
            own zero padding.
        with_relu: append an in-place ``ReLU`` as the last module.

    Returns:
        ``nn.Sequential`` holding the layers in the order above.
    """
    half_kernel = (kernel_size - 1) // 2
    conv_padding = 0 if reflection_pad else half_kernel

    modules = [nn.ReflectionPad2d(padding=half_kernel)] if reflection_pad else []
    modules.append(
        nn.Conv2d(in_planes, out_planes, kernel_size=kernel_size, stride=stride, padding=conv_padding)
    )
    modules.append(nn.InstanceNorm2d(out_planes, affine=False, track_running_stats=False))
    if with_relu:
        modules.append(nn.ReLU(inplace=True))
    return nn.Sequential(*modules)


def get_conv_transposed_inorm_relu(in_planes, out_planes, kernel_size, stride):
    """Build ``ConvTranspose2d -> InstanceNorm2d -> ReLU`` as an ``nn.Sequential``.

    The transposed convolution always uses ``padding=1`` and
    ``output_padding=1``; the instance norm has no affine parameters and no
    running statistics; the ReLU works in place.
    """
    upsample = nn.ConvTranspose2d(
        in_planes,
        out_planes,
        kernel_size=kernel_size,
        stride=stride,
        padding=1,
        output_padding=1,
    )
    norm = nn.InstanceNorm2d(out_planes, affine=False, track_running_stats=False)
    activation = nn.ReLU(inplace=True)
    return nn.Sequential(upsample, norm, activation)


class ResidualBlock(nn.Module):
    """Two 3x3 conv/instance-norm stages with an identity skip connection.

    ``conv1`` ends in ReLU, ``conv2`` does not; the block input is added to the
    output of ``conv2``. The channel count is unchanged.
    """

    def __init__(self, in_planes):
        super().__init__()
        self.conv1 = get_conv_inorm_relu(in_planes, in_planes, kernel_size=3, stride=1)
        self.conv2 = get_conv_inorm_relu(in_planes, in_planes, kernel_size=3, stride=1, with_relu=False)

    def forward(self, x):
        # Transformed branch first, then the skip connection is added to it.
        return self.conv2(self.conv1(x)) + x


class Generator(nn.Module):
    """ResNet-style image-to-image generator.

    Layer order: ``c7s1_64`` -> ``d128`` -> ``d256`` -> ``resnet9`` (nine
    ``ResidualBlock(256)``) -> ``u128`` -> ``u64`` -> ``c7s1_3`` (ending in
    ``Tanh``). The attribute names are also the state-dict keys used when
    checkpoints are loaded, so they must not be renamed.
    """

    def __init__(self):
        super(Generator, self).__init__()

        self.c7s1_64 = get_conv_inorm_relu(3, 64, kernel_size=7, stride=1)
        self.d128 = get_conv_inorm_relu(64, 128, kernel_size=3, stride=2, reflection_pad=False)
        self.d256 = get_conv_inorm_relu(128, 256, kernel_size=3, stride=2, reflection_pad=False)

        self.resnet9 = nn.Sequential(*[ResidualBlock(256) for _ in range(9)])

        self.u128 = get_conv_transposed_inorm_relu(256, 128, kernel_size=3, stride=2)
        self.u64 = get_conv_transposed_inorm_relu(128, 64, kernel_size=3, stride=2)
        self.c7s1_3 = get_conv_inorm_relu(64, 3, kernel_size=7, stride=1, with_relu=False)
        # with_relu=False leaves the instance norm as the last module of the
        # Sequential; replace it by a tanh activation.
        self.c7s1_3[-1] = nn.Tanh()

    def forward(self, x):
        """Translate a batch of 3-channel images; returns the tanh output of ``c7s1_3``."""
        # Encoding
        x = self.c7s1_64(x)
        x = self.d128(x)
        x = self.d256(x)
        # 9 residual blocks
        x = self.resnet9(x)
        # Decoding
        x = self.u128(x)
        x = self.u64(x)
        return self.c7s1_3(x)

    @staticmethod
    def normalize(x):
        """Return a min-max scaled copy of ``x`` (approximately in [0, 1]).

        Declared static so that it can be called on the class and on
        instances; ``x`` itself is not modified.
        """
        vmin = x.min()
        vmax = x.max()
        return (x - vmin) / (vmax - vmin + 1e-5)

    @staticmethod
    def _tile_channels(feature_maps, rows):
        """Arrange the channels of one sample into a single 2-D float64 mosaic.

        Args:
            feature_maps: CPU tensor indexed as (channel, height, width).
            rows: number of channels stacked vertically per mosaic column.

        Returns:
            Array of shape (rows * height, n_columns * width). Every channel is
            min-max scaled on its own. Within a column the highest channel
            index is on top; columns run left to right in channel order.
            Channels that do not fill a complete column are left out.
        """
        n_channels = feature_maps.shape[0]
        complete = (n_channels // rows) * rows
        columns = []
        for start in range(0, complete, rows):
            newest_first = [
                Generator.normalize(feature_maps[k]).numpy()
                for k in range(start + rows - 1, start - 1, -1)
            ]
            columns.append(np.concatenate(newest_first, axis=0))
        if not columns:
            return np.empty((rows * feature_maps.shape[1], 0))
        return np.concatenate(columns, axis=1).astype(np.float64)

    def showActivations(self,x,epoch,image_number):
        """Run the encoder and residual stages on ``x`` and visualise their activations.

        The first channel of the first sample of ``x`` is always plotted and
        saved as ``Input_Image.png``. For each of ``c7s1_64``, ``d128``,
        ``d256`` and ``resnet9`` a channel mosaic of the first sample is built;
        the mosaics are plotted, saved and logged through ``plot_graphs`` only
        when ``image_number == 0``.

        Args:
            x: input batch, already on the same device as the model.
            epoch: label used in titles, file names and the output folder names.
            image_number: index of the image being processed by the caller.

        Returns:
            The mosaic (2-D float64 array) of the ``resnet9`` output.
        """
        save_files ='/content/drive/MyDrive/ML_Datasets/Task_4/Inference/mdf/new_activations'
        log_path='/content/drive/MyDrive/ML_Datasets/Task_4/Inference/mdf/logs'+'/epoch_'+str(epoch)
        file_writer = tf.summary.create_file_writer(log_path)
        path=str(save_files)+'/epoch_'+str(epoch)
        # makedirs also creates missing parent folders and tolerates reruns.
        os.makedirs(path, exist_ok=True)
        print('Epoch ',epoch)

        # Input image (first channel of the first sample)
        plt.figure(figsize=(20,5))
        plt.title(f"Input Image Epoch: {epoch}")
        plt.imshow(x[0][0].detach().cpu().numpy())
        plt.savefig(f'{save_files}/epoch_{epoch}/Input_Image.png')
        plt.show()
        # Close rather than clear: this method runs once per image, so
        # figures would otherwise accumulate.
        plt.close()

        # (label, layer, channels stacked per mosaic column)
        stages = (
            ("c7s1_64", self.c7s1_64, 4),
            ("d128", self.d128, 8),
            ("d256", self.d256, 8),
            ("resnet", self.resnet9, 8),
        )
        out = x
        mosaic = None
        for label, layer, rows in stages:
            out = layer(out)
            # The mosaic is built from scaled copies, so `out` reaches the
            # next layer unchanged on every device.
            mosaic = self._tile_channels(out.detach().cpu()[0], rows)
            if(image_number==0):
                plot_graphs(mosaic, f"{label} Epoch: {epoch}", f'{save_files}/epoch_{epoch}/{label}_epoch_{epoch}.png',file_writer)

        return mosaic

#### generator check

In [None]:
# Random batch of four 3x256x256 inputs and a freshly constructed generator on the CPU.
x = torch.rand(4, 3, 256, 256)
g = Generator()
#y = g(x)




In [None]:
# Smoke-test the activation plots with epoch label 200000; image_number 0 makes the layer
# mosaics get plotted and written under the Drive paths hard-coded in showActivations.
c=g.showActivations(x,200000,0)

# Discriminator

In [None]:
def get_conv_inorm_lrelu(in_planes, out_planes, stride=2, negative_slope=0.2):
    """Build ``Conv2d(k=4, padding=1) -> InstanceNorm2d -> LeakyReLU``.

    Args:
        in_planes: input channels of the convolution.
        out_planes: output channels of the convolution.
        stride: convolution stride (default 2).
        negative_slope: slope of the in-place ``LeakyReLU`` for negative inputs.

    Returns:
        ``nn.Sequential`` holding the three layers.
    """
    conv = nn.Conv2d(in_planes, out_planes, kernel_size=4, stride=stride, padding=1)
    norm = nn.InstanceNorm2d(out_planes, affine=False, track_running_stats=False)
    activation = nn.LeakyReLU(negative_slope=negative_slope, inplace=True)
    return nn.Sequential(conv, norm, activation)


class Discriminator(nn.Module):
    """Convolutional discriminator that outputs a one-channel score map.

    Stages, in order: ``c64`` (plain convolution) -> ``relu`` (LeakyReLU 0.2)
    -> ``c128`` -> ``c256`` -> ``c512`` (stride 1) -> ``last_conv`` (one
    output channel, no activation).
    """

    def __init__(self):
        super().__init__()
        # First stage has no instance norm; the leaky ReLU is applied separately.
        self.c64 = nn.Conv2d(3, 64, kernel_size=4, stride=2, padding=1)
        self.relu = nn.LeakyReLU(0.2, inplace=True)
        self.c128 = get_conv_inorm_lrelu(64, 128)
        self.c256 = get_conv_inorm_lrelu(128, 256)
        self.c512 = get_conv_inorm_lrelu(256, 512, stride=1)
        self.last_conv = nn.Conv2d(512, 1, kernel_size=4, stride=1, padding=1)

    def forward(self, x):
        feature_stages = (self.c64, self.relu, self.c128, self.c256, self.c512)
        for stage in feature_stages:
            x = stage(x)
        return self.last_conv(x)


### Sanity check the Discriminator network:

In [None]:
# Push a random batch of four 3x256x256 inputs through a fresh discriminator
# and display the shape of the resulting score map.
x = torch.rand(4, 3, 256, 256)
d = Discriminator()
y = d(x)
y.shape

# Model Definition

In [None]:
def init_weights(module):
    """Recursively re-initialise a module tree in place.

    Every module that has a non-``None`` ``weight`` gets it drawn from
    N(0, 0.02); every non-``None`` ``bias`` is set to 0. The module itself is
    handled first, then each child in order.

    Raises:
        AssertionError: if ``module`` is not an ``nn.Module``.
    """
    assert isinstance(module, nn.Module)

    weight = getattr(module, "weight", None)
    if weight is not None:
        torch.nn.init.normal_(weight, mean=0.0, std=0.02)

    bias = getattr(module, "bias", None)
    if bias is not None:
        torch.nn.init.constant_(bias, 0.0)

    for child in module.children():
        init_weights(child)

In [None]:
# Drop the references to the sanity-check networks created above.
g = None; d = None

In [None]:
# Stop here if cuDNN is not enabled, then turn on cuDNN benchmark mode.
assert torch.backends.cudnn.enabled
torch.backends.cudnn.benchmark = True

In [None]:
# device = "cuda"
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# Two generators (A->B, B->A) and one discriminator per domain; each network is
# moved to `device` and re-initialised with init_weights.
generator_A2B = Generator().to(device)
init_weights(generator_A2B)

discriminator_B = Discriminator().to(device)
init_weights(discriminator_B)

generator_B2A = Generator().to(device)
init_weights(generator_B2A)
discriminator_A = Discriminator().to(device)
init_weights(discriminator_A)

## Train the networks with a learning rate of `0.0002`.

In [None]:
from itertools import chain
import torch.optim as optim

# Adam hyper-parameters shared by both optimizers.
lr = 0.0002
beta1 = 0.5

# One optimizer updates both generators together, the other both discriminators.
optimizer_G = optim.Adam(chain(generator_A2B.parameters(), generator_B2A.parameters()), lr=lr, betas=(beta1, 0.999))
optimizer_D = optim.Adam(chain(discriminator_A.parameters(), discriminator_B.parameters()), lr=lr, betas=(beta1, 0.999))

# Inference with trained generator

In [None]:
import os
import re

# Folder holding the saved checkpoints. The inference cell below reads this
# path through the name `d`.
d = "/content/drive/MyDrive/ML_Datasets/Task_4/Inference/mse/models"

# Collect the epoch numbers of files named "epoch__checkpoint_<n>.pt" — the
# exact name the inference loop rebuilds from each number. Anything else in
# the folder is skipped instead of breaking the integer parsing.
checkpoint_pattern = re.compile(r"epoch__checkpoint_(\d+)\.pt")
epochs = sorted(
    int(found.group(1))
    for found in map(checkpoint_pattern.fullmatch, os.listdir(d))
    if found is not None
)

In [None]:

# Base folder in which Generator.showActivations creates one "epoch_<n>"
# sub-folder per call; the generated image is saved next to those plots. The
# name was previously only defined inside showActivations.
save_files = '/content/drive/MyDrive/ML_Datasets/Task_4/Inference/mdf/new_activations'

for epoch in epochs:
    file='epoch__checkpoint_'+str(epoch)+'.pt'
    full_path = os.path.join(d, file)
    # NOTE(review): 2664 is an unexplained constant converting the checkpoint
    # number into the label used for folders and titles — confirm its meaning.
    counter = int(epoch/2664*2)
    # Load the checkpoint onto whatever device the models live on, so the
    # CPU fallback chosen for `device` keeps working.
    checkpoint_state_dict = torch.load(full_path, map_location=device)
    generator_A2B.load_state_dict(checkpoint_state_dict["generator_A2B"])

    # Rows are gathered in lists and joined once per checkpoint; growing an
    # array with np.concatenate on every image copies all earlier rows again.
    image_rows = []
    activation_rows = []

    for i in range(100):
        img = test_ab_ds[i]['A']
        x = test_transform(img)
        image_rows.append(x.cpu().numpy().reshape(1, -1))

        x = x.unsqueeze(0).to(device)

        # Generate the fake image and the activation mosaic without tracking gradients.
        with torch.no_grad():
            g = generator_A2B
            y_pred = g(x)
            c = g.showActivations(x, counter, i)
        activation_rows.append(np.reshape(c, (1, -1)))

        # Rescale the prediction to 0..255 and move channels last for plotting.
        img_pred = (255 * normalize(y_pred[0, ...])).cpu().numpy().transpose((1, 2, 0)).astype('uint8')

        plt.figure(figsize=(20, 5))
        plt.title(f"Generated Image epoch{counter}")
        plt.imshow(img_pred)
        plt.savefig(f'{save_files}/epoch_{counter}/Ouput_epoch_{counter}.png')
        plt.show()
        plt.close()

    # As before, these hold the rows of the most recent checkpoint only
    # (one flattened float64 row per image).
    total_image = np.concatenate(image_rows, axis=0).astype(np.float64)
    total_c = np.concatenate(activation_rows, axis=0).astype(np.float64)


# t-SNE

In [None]:
import os 
import math
import pandas as pd
import numpy as np 
from sklearn.manifold import TSNE

In [None]:
pip install --upgrade Pillow

In [None]:
# Enter code here
import matplotlib.pyplot as plt
def tsne_results(features, new_features, perplexity=10, n_components=2, n_iter=100, n_classes=10, color_palette = "hls"):
    """Embed two feature sets with t-SNE and scatter-plot both embeddings.

    Each set is embedded independently with its own ``TSNE`` fit, so the two
    point clouds share axes in the plot but not a common coordinate system.

    Args:
        features: 2-D array-like, one row per sample (plotted as 'Input').
        new_features: 2-D array-like, one row per sample (plotted as
            'Latent Space').
        perplexity: t-SNE perplexity; lowered with a warning when it is not
            smaller than the number of samples.
        n_components: dimensionality of the embedding (columns 0 and 1 are plotted).
        n_iter: number of optimisation iterations; raised to 250 when smaller.
        n_classes: unused; kept so existing calls keep working.
        color_palette: unused; kept so existing calls keep working.

    Returns:
        Tuple ``(tsne_input, tsne_latent)`` with the two embeddings.
    """
    import inspect
    import warnings

    # Newer scikit-learn releases call the iteration argument `max_iter`
    # instead of `n_iter`; use whichever name this TSNE accepts.
    iter_keyword = "max_iter" if "max_iter" in inspect.signature(TSNE).parameters else "n_iter"
    iterations = max(n_iter, 250)

    def _embed(samples):
        """Fit one t-SNE model on `samples` and return its embedding."""
        samples = np.asarray(samples)
        usable_perplexity = perplexity
        if len(samples) <= perplexity:
            usable_perplexity = max(len(samples) - 1, 1)
            warnings.warn(
                "perplexity {} is not smaller than the number of samples ({}); using {} instead".format(
                    perplexity, len(samples), usable_perplexity
                )
            )
        model = TSNE(n_components=n_components, perplexity=usable_perplexity, **{iter_keyword: iterations})
        return model.fit_transform(samples)

    tsne_input = _embed(features)
    tsne_latent = _embed(new_features)

    plt.scatter(tsne_input[:,0], tsne_input[:,1], label='Input')
    plt.scatter(tsne_latent[:,0], tsne_latent[:,1], label='Latent Space')
    plt.legend()
    plt.show()
    return tsne_input, tsne_latent
# Embed the flattened test inputs and the matching activation mosaics gathered by the inference loop.
# NOTE(review): perplexity=300 is larger than the 100 rows the inference loop collects per
# checkpoint; scikit-learn's TSNE presumably needs perplexity < n_samples — confirm the intended value.
res = tsne_results(total_image,total_c, perplexity=300, n_components=2, n_iter=250, n_classes=10, color_palette = "hls")
