In [None]:
import time
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision as tv
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, datasets
from torch.autograd import Variable
# from torchsummary import summary
from numpy import genfromtxt
import matplotlib.pyplot as plt
from sklearn.datasets import make_circles
from sklearn.model_selection import train_test_split
import os
from datetime import datetime
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, TensorDataset

In [None]:
# Load the images into a numpy array
def load_images(folder):
    """Read every regular file in ``folder`` as an image and stack the results.

    Parameters
    ----------
    folder : str
        Directory whose direct children are read with ``matplotlib.image.imread``.

    Returns
    -------
    numpy.ndarray
        ``np.array`` built from the list of decoded images, in sorted filename
        order. An empty directory yields an empty array.

    Notes
    -----
    * Entries are visited in sorted order so the result is reproducible;
      ``os.listdir`` alone gives no ordering guarantee.
    * Sub-directories are skipped instead of being handed to ``imread``.
    * ``imread`` raises on an unreadable file rather than returning ``None``,
      so no ``None`` check is performed.
    * NOTE(review): stacking assumes all images share one shape -- confirm
      against the dataset.
    """
    images_list = []
    for filename in sorted(os.listdir(folder)):
        path = os.path.join(folder, filename)
        if not os.path.isfile(path):
            # Directories (or other non-file entries) cannot be decoded as images.
            continue
        images_list.append(mpimg.imread(path))

    return np.array(images_list)
# Location of the face images (Windows-style relative path, kept verbatim).
ANIME_FACES_DIR = r'data\anime_faces_data'
images_list = load_images(ANIME_FACES_DIR)
# Report the shape of the stacked image array as a quick sanity check.
print(images_list.shape)

In [None]:
# Wrap the loaded images in tensors so they can back a PyTorch dataset.
my_images = images_list
tensor_images = torch.Tensor(my_images)
# Move the last axis to position 1 (presumably NHWC -> NCHW; confirm against the loaded array).
channels_first_images = tensor_images.permute(0, 3, 1, 2)
# One placeholder label (1.0) per image; the training loop ignores the labels.
placeholder_labels = torch.tensor(np.ones(len(images_list)))
my_dataset = TensorDataset(channels_first_images, placeholder_labels)
my_dataloader = DataLoader(my_dataset)

In [None]:
# Pick the GPU when CUDA is available, otherwise fall back to the CPU.
cuda_available = torch.cuda.is_available()
dev = 'cuda' if cuda_available else 'cpu'
if cuda_available:
    print("ok")
device = torch.device(dev)


In [None]:
# Convolutional encoder: compresses an image batch into a 128-dimensional code.
class Encoder(nn.Module):
    """Three convolutions followed by a linear projection to 128 features.

    The final ``nn.Linear`` expects 1048576 = 256 * 64 * 64 inputs, which
    corresponds to 3-channel 64x64 images given the layer arithmetic noted
    below (stride 1 throughout, so the spatial size is not reduced).
    """

    def __init__(self):
        super().__init__()
        layers = [
            nn.Conv2d(3, 64, 4, stride=1, padding=2),    # 64x64 -> N, 64, 65, 65
            nn.BatchNorm2d(64),
            nn.LeakyReLU(0.05, inplace=True),
            nn.Conv2d(64, 128, 4, stride=1, padding=2),  # -> N, 128, 66, 66
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.05, inplace=True),
            nn.Conv2d(128, 256, 3),                      # -> N, 256, 64, 64
            nn.BatchNorm2d(256),
            nn.Flatten(),                                # -> N, 1048576
            nn.Linear(1048576, 128),                     # -> N, 128
        ]
        self.encoder = nn.Sequential(*layers)

    def forward(self, x):
        """Return the latent code produced by the layer stack for batch ``x``."""
        return self.encoder(x)
    
# Input [-1, +1] -> use nn.Tanh

In [None]:
class Unflatten(nn.Module):
    """Reshape a flat ``(N, C*H*W)`` batch into ``(N, C, H, W)``.

    ``shape`` supplies the three trailing dimensions; only its first three
    entries are read.
    """

    def __init__(self, shape):
        super().__init__()
        self.shape = shape

    def forward(self, input):
        """View ``input`` as ``(len(input), shape[0], shape[1], shape[2])``."""
        batch = len(input)
        channels, height, width = self.shape[0], self.shape[1], self.shape[2]
        return input.view(batch, channels, height, width)
# Decoder: expands a 128-dimensional code back into an image and tries to reconstruct the input.
class Decoder(nn.Module):
    """Linear expansion, two transposed convolutions and a final convolution.

    Layer arithmetic (stride 1 everywhere): the linear layer produces
    524288 = 128 * 64 * 64 features, reshaped to (128, 64, 64); the transposed
    convolutions give 63x63 then 62x62 maps, and the last convolution with
    padding 2 returns a 3-channel 64x64 output squashed by ``tanh``.
    """

    def __init__(self):
        super().__init__()
        latent_features = 128
        expanded_features = 128 * 64 * 64  # same value as 16*16*256*8
        layers = [
            nn.Linear(in_features=latent_features, out_features=expanded_features),
            nn.BatchNorm1d(num_features=expanded_features),
            nn.LeakyReLU(0.05, inplace=True),

            # nn.Sequential needs a module to go from the flat vector to feature maps.
            Unflatten((128, 64, 64)),

            nn.ConvTranspose2d(128, 64, 4, stride=1, padding=2),  # -> N, 64, 63, 63
            nn.BatchNorm2d(64),
            nn.LeakyReLU(0.05, inplace=True),

            nn.ConvTranspose2d(64, 32, 4, stride=1, padding=2),   # -> N, 32, 62, 62
            nn.BatchNorm2d(32),
            nn.LeakyReLU(0.05, inplace=True),

            nn.Conv2d(32, 3, 3, stride=1, padding=2),             # -> N, 3, 64, 64
            nn.Tanh(),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, input):
        """Decode the latent batch ``input`` into images."""
        return self.net(input)


    

In [None]:
# Discriminator: scores 128-dimensional latent codes (not images).
class Discriminator(nn.Module):
    """Three-layer MLP mapping a 128-d code to a single sigmoid probability."""

    def __init__(self):
        super().__init__()
        layers = [
            nn.Linear(128, 256),
            nn.LeakyReLU(0.1, inplace=True),

            nn.Linear(256, 512),
            nn.LeakyReLU(0.1, inplace=True),

            nn.Linear(512, 1),
            nn.Sigmoid(),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, input):
        """Return a ``(N, 1)`` tensor of values in (0, 1) for the codes in ``input``."""
        return self.net(input)

In [None]:
# Instantiate the three networks (in this order) and move each one to the selected device.
encoder, decoder, discriminator = (
    network_class().to(device)
    for network_class in (Encoder, Decoder, Discriminator)
)

In [None]:
# One Adam optimizer per network; the discriminator uses a larger learning rate.
# NOTE: each optimizer captures the parameters of the module that exists when
# this cell runs. If a module variable is rebound afterwards (e.g. replaced by
# torch.load), the optimizer has to be rebuilt to update the new module.
encoder_optimization = optim.Adam(encoder.parameters(), lr=3e-4)
decoder_optimization = optim.Adam(decoder.parameters(), lr=3e-4)
discriminator_optimization = optim.Adam(discriminator.parameters(), lr=8e-4)

In [None]:


batch_size = 128
# image_size = 32
# dataroot = '/raid/artem/tmp/celeba'
# transform = tv.transforms.Compose([tv.transforms.Resize(image_size),
#                                 tv.transforms.CenterCrop(image_size),
#                                 tv.transforms.ToTensor(),
#                                 tv.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
# dataset = tv.datasets.ImageFolder(root=dataroot, transform = transform)

# Shuffled training loader; incomplete final batches are dropped so every
# batch holds exactly `batch_size` samples.
dataloader = DataLoader(
    dataset=my_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=4,
    drop_last=True,
)
# Number of batches per epoch.
print(len(dataloader))



In [None]:


# Loss functions: binary cross-entropy for the adversarial terms, mean-reduced
# MSE, and L1 (the L1 loss is the one used for reconstruction in the training loop).
criterion_gan, criterion_rec, criterion_l1 = (
    nn.BCELoss(),
    nn.MSELoss(reduction='mean'),
    nn.L1Loss(),
)




In [None]:
# Replace the freshly initialised networks with previously saved ones.
#
# * Raw strings are used so the backslashes are taken literally: '\p' and '\A'
#   are invalid escape sequences in a normal string literal. The path values
#   are unchanged.
# * map_location=device lets checkpoints saved on a GPU be loaded on a
#   CPU-only machine.
# * These files hold whole pickled modules, so only load checkpoints you
#   trust. NOTE(review): recent PyTorch releases may require
#   weights_only=False to unpickle whole modules -- confirm against the
#   installed version.
# * Optimizers created before this cell still reference the parameters of the
#   modules being replaced here; they must be rebuilt to train the loaded ones.
discriminator = torch.load(r'data\pretrained\AAEdiscriminatorlast', map_location=device)
encoder = torch.load(r'data\pretrained\AAEencoderlast', map_location=device)
decoder = torch.load(r'data\pretrained\AAEdecoderlast', map_location=device)

In [None]:
# Per-iteration loss histories filled in by the training loop.
encoder_losses, decoder_losses, discriminator_losses = [], [], []

In [None]:
# Batches captured late in training: raw inputs and their reconstructions.
raw_images_batch_list, reconstructed_images_batch_list = [], []

In [None]:

def _rebind_if_stale(optimizer, model):
    """Return an optimizer that actually updates ``model``.

    If ``optimizer`` already holds every parameter of ``model`` it is returned
    unchanged. Otherwise (e.g. the model variable was rebound by ``torch.load``
    after the optimizer was created) a new Adam optimizer is built over
    ``model.parameters()`` with the old optimizer's learning rate.
    """
    tracked = {id(p) for group in optimizer.param_groups for p in group['params']}
    if all(id(p) in tracked for p in model.parameters()):
        return optimizer
    return torch.optim.Adam(model.parameters(), lr=optimizer.param_groups[0]['lr'])


# Make sure each optimizer steps the network that is being trained below.
encoder_optimization = _rebind_if_stale(encoder_optimization, encoder)
decoder_optimization = _rebind_if_stale(decoder_optimization, decoder)
discriminator_optimization = _rebind_if_stale(discriminator_optimization, discriminator)

LATENT_DIM = 128          # size of the code produced by the encoder
RECORD_AFTER_BATCH = 165  # batches after this iteration count are kept for evaluation

counter = 0
epochs = 1
for epoch in range(1, epochs + 1):
    for index, (data, _) in enumerate(dataloader):
        counter = counter + 1
        batch_size = data.shape[0]

        image = data.to(device)
        encoded_image = encoder(image)

        # Targets for the adversarial losses.
        ones_dis = torch.ones(batch_size).to(device)
        zeros_dis = torch.zeros(batch_size).to(device)

        # ---- Discriminator step: prior samples are "real", encodings are "fake".
        # The encoding is detached: only the discriminator is updated here, so
        # there is no need to back-propagate into the encoder.
        encoded_image_real = torch.randn(batch_size, LATENT_DIM).to(device)
        discriminator_real_loss = criterion_gan(discriminator(encoded_image_real).view(-1), ones_dis)
        discriminator_fake_loss = criterion_gan(discriminator(encoded_image.detach()).view(-1), zeros_dis)

        discriminator.zero_grad()
        discriminator_loss = discriminator_fake_loss + discriminator_real_loss
        discriminator_loss.backward()
        discriminator_optimization.step()
        discriminator_losses.append(discriminator_loss.item())

        # ---- Encoder step: try to make the discriminator label encodings as real.
        encoder_gan_loss = criterion_gan(discriminator(encoded_image).view(-1), ones_dis)
        encoder_loss = encoder_gan_loss

        encoder.zero_grad()
        encoder_loss.backward()
        encoder_optimization.step()
        encoder_losses.append(encoder_loss.item())

        # ---- Decoder step: L1 reconstruction of the input from the code.
        # The code is detached because the encoder weights were just updated in
        # place; back-propagating through the old encoder graph would be invalid,
        # and only the decoder is stepped with this loss anyway.
        reconstructed_image = decoder(encoded_image.detach())
        if counter > RECORD_AFTER_BATCH:
            raw_images_batch_list.append(data)
            # Store a graph-free CPU copy so recorded batches do not pin device memory.
            reconstructed_images_batch_list.append(reconstructed_image.detach().cpu())
        decoder_reconstructed_loss = criterion_l1(image, reconstructed_image)

        decoder_loss = decoder_reconstructed_loss
        decoder.zero_grad()
        decoder_loss.backward()
        decoder_optimization.step()
        decoder_losses.append(decoder_loss.item())

        print(counter)
        

In [None]:
# Show one batch of raw images as a 16-column grid.
index = 0
data, _ = next(iter(dataloader))
raw_images = data.to(device)
raw_grid = tv.utils.make_grid(raw_images.detach().cpu(), nrow=16, normalize=True)
plt.figure(figsize=(20,20))
# make_grid returns channels-first data; imshow wants channels last.
plt.imshow(raw_grid.permute(1, 2, 0))
plt.show()

In [None]:
# Show the reconstructions of the raw batch as a 16-column grid.
# Inference only: no_grad avoids building an autograd graph for the forward pass.
# NOTE(review): the networks are not switched to eval() here, so BatchNorm
# layers still use (and update) batch statistics -- confirm this is intended.
with torch.no_grad():
    raw_images_encoded = encoder(raw_images)
    reconstructed_images = decoder(raw_images_encoded)
plt.figure(figsize=(20,20))
plt.imshow(tv.utils.make_grid(reconstructed_images.detach().cpu(),  nrow=16, normalize=True).permute(1, 2, 0))
plt.show()

In [None]:
# Decode random latent codes and show the generated images as a 16-column grid.
# Inference only: no_grad avoids building an autograd graph for the forward pass.
noise = torch.randn(batch_size, 128).to(device)
with torch.no_grad():
    reconstructed_1 = decoder(noise)
plt.figure(figsize=(20,20))
plt.imshow(tv.utils.make_grid(reconstructed_1.detach().cpu(),  nrow=16, normalize=True).permute(1, 2, 0))
plt.show()

In [None]:
import io

In [None]:
# Persist the third recorded batch of reconstructions to disk.
reconstructed_batch_to_save = reconstructed_images_batch_list[2]
torch.save(reconstructed_batch_to_save, 'AAEreconstructed_images_tensorlast.t')
# Also serialise the same tensor into an in-memory buffer.
buffer = io.BytesIO()
torch.save(reconstructed_batch_to_save, buffer)

In [None]:
# Persist the third recorded batch of raw images to disk.
raw_batch_to_save = raw_images_batch_list[2]
torch.save(raw_batch_to_save, 'AAEraw_images_tensorlast.t')
# Also serialise the same tensor into an in-memory buffer.
buffer = io.BytesIO()
torch.save(raw_batch_to_save, buffer)

In [None]:
#Preperation of the Inception Model for both score calculations
import h5py 
from keras.applications.inception_v3 import InceptionV3
from keras.applications.inception_v3 import preprocess_input

# InceptionV3 without its classification head; global average pooling makes
# predict() return one pooled feature vector per 128x128 RGB image.
INCEPTION_INPUT_SHAPE = (128, 128, 3)
InceptionModel = InceptionV3(
    include_top=False,
    pooling='avg',
    input_shape=INCEPTION_INPUT_SHAPE,
)


In [None]:
#Required libraries for the score calculations
from numpy import cov
from numpy import trace
from numpy import iscomplexobj
from numpy import asarray
from numpy.random import randint
from scipy.linalg import sqrtm

from math import floor
from numpy import ones
from numpy import expand_dims
from numpy import log
from numpy import mean
from numpy import std
from numpy import exp

In [None]:
import tensorflow as tf
import cv2

In [None]:
# Load tensors saved earlier so they can be compared with the batches
# recorded while training the network in this notebook.
RAW_START_TENSOR_PATH = 'AAEraw_images_tensorstart.t'
RECONSTRUCTED_START_TENSOR_PATH = 'AAEreconstructed_images_tensorstart.t'
tf_list_raw_Last_start = torch.load(RAW_START_TENSOR_PATH)
tf_list_generated_last_start = torch.load(RECONSTRUCTED_START_TENSOR_PATH)


In [None]:
# Convert the batch recorded during this run to TensorFlow tensors for FID and IS.
# NOTE(review): the "_last" list names and "_start" tensor names in this cell
# are kept as they were; the naming looks swapped relative to the next cell --
# confirm which run each name is meant to describe.
def _upsample_batch_to_tf(batch, size=(128, 128)):
    """Resize each channels-first torch image in ``batch`` and convert to TensorFlow.

    Every image is moved to channels-last, resized to ``size`` with
    nearest-neighbour interpolation and converted to a TF tensor.

    Returns a ``(frames, stacked)`` pair: the list of per-image tensors and the
    single tensor obtained by converting that list.
    """
    frames = [
        tf.convert_to_tensor(
            cv2.resize(
                img.permute(1, 2, 0).detach().cpu().numpy(),
                dsize=size,
                interpolation=cv2.INTER_NEAREST,
            )
        )
        for img in batch  # iterate over the whole batch instead of a hard-coded 128
    ]
    return frames, tf.convert_to_tensor(frames)


tf_list_reconstructed_list_last, torhc_list_tensor_generated_start = _upsample_batch_to_tf(
    reconstructed_images_batch_list[2]
)
tf_list_raw_list_last, torhc_list_tensor_raw_start = _upsample_batch_to_tf(
    raw_images_batch_list[2]
)

# Print the tensors built in this cell. The "_last" tensors printed here
# before are only defined in a later cell and raised NameError on a fresh run.
print(torhc_list_tensor_generated_start.shape)
print(torhc_list_tensor_raw_start.shape)

In [None]:
# Convert the previously saved (loaded) batches to TensorFlow tensors for FID and IS.
# Each of the first 128 images is moved to channels-last, resized to 128x128
# with nearest-neighbour interpolation and converted to a TF tensor.
tf_list_reconstructed_list_start = [
    tf.convert_to_tensor(
        cv2.resize(
            tf_list_generated_last_start[i].permute(1,2,0).detach().cpu().numpy(),
            dsize=(128, 128),
            interpolation=cv2.INTER_NEAREST,
        )
    )
    for i in range(128)
]
torhc_list_tensor_generated_last = tf.convert_to_tensor(tf_list_reconstructed_list_start)

tf_list_raw_list_start = [
    tf.convert_to_tensor(
        cv2.resize(
            tf_list_raw_Last_start[i].permute(1,2,0).detach().cpu().numpy(),
            dsize=(128, 128),
            interpolation=cv2.INTER_NEAREST,
        )
    )
    for i in range(128)
]
torhc_list_tensor_raw_last = tf.convert_to_tensor(tf_list_raw_list_start)

print(torhc_list_tensor_generated_last.shape)
print(torhc_list_tensor_raw_last.shape)

In [None]:
# The inception score is computed from the per-image outputs of a pre-trained Inception v3 model.
def InceptionScoreCalculation(images, number_of_splits):
    """Compute the mean and standard deviation of the inception score over splits.

    ``images`` is passed to the module-level ``InceptionModel.predict``; the
    predictions are cut into ``number_of_splits`` consecutive groups of
    ``floor(N / number_of_splits)`` rows (trailing rows that do not fill a
    group are ignored). For each group the score is
    ``exp(mean_x(sum_y p(y|x) * (log(p(y|x) + 1e-3) - log(p(y) + 1e-3))))``,
    where ``p(y)`` is the group's mean prediction.

    Parameters
    ----------
    images : array-like with a ``shape`` attribute
        Batch accepted by ``InceptionModel.predict``; ``shape[0]`` is the image count.
    number_of_splits : int
        Number of groups; must be between 1 and the number of images.

    Returns
    -------
    (float, float)
        Mean and standard deviation of the per-group scores.

    Raises
    ------
    ValueError
        If ``number_of_splits`` is below 1 or exceeds the number of images.

    Notes
    -----
    NOTE(review): this formula expects class probabilities, but
    ``InceptionModel`` in this notebook is built with ``include_top=False``
    and average pooling, so ``predict`` returns pooled features -- verify
    that the model matches the metric. A group of a single image always
    scores exactly 1.
    """
    if number_of_splits < 1:
        raise ValueError("number_of_splits must be at least 1")
    split_size = floor(images.shape[0] / number_of_splits)
    if split_size < 1:
        raise ValueError("number_of_splits must not exceed the number of images")

    predicted_class_probabilites = InceptionModel.predict(images)

    inception_scores = []
    for index in range(number_of_splits):
        # Rows belonging to this split.
        indexStart = index * split_size
        indexEnd = indexStart + split_size
        conditional_set = predicted_class_probabilites[indexStart:indexEnd]
        # Marginal distribution of the split, kept 2-D for broadcasting.
        classProbability = expand_dims(conditional_set.mean(axis=0), 0)
        # Per-element KL terms; 1e-3 keeps log() finite.
        KLDivergence = conditional_set * (log(conditional_set + 1e-3) - log(classProbability + 1e-3))
        # Sum over classes, average over images, then undo the log.
        sumKLDivergence = KLDivergence.sum(axis=1)
        averageKLDivergence = mean(sumKLDivergence)
        inception_scores.append(exp(averageKLDivergence))

    # Aggregate once, after all splits have been scored.
    inceptionScoreAverage, inceptionScoreStandarDeviation = mean(inception_scores), std(inception_scores)
    return inceptionScoreAverage, inceptionScoreStandarDeviation
 
# Calculate the inception score for the given image set.
# The number of splits must leave several images in each split: with one image
# per split (e.g. 128 splits for 128 images) each split's marginal equals its
# single prediction, so every split scores exactly 1 with zero deviation.
NUMBER_OF_SPLITS = 8  # 16 images per split for a 128-image batch
inceptionScoreAverage, inceptionScoreStandarDeviation = InceptionScoreCalculation(torhc_list_tensor_generated_last, NUMBER_OF_SPLITS)
print('Average Inception Score', inceptionScoreAverage)
print('Standard Deviation of the Inception Score', inceptionScoreStandarDeviation)

In [None]:
############################################# PART FOR THE GAN EVALUATION METRICS ########################################################
#This part corresponds to the objective GAN evaluation metrics, which are IS (Inception Score) and FID (Frechet Inception Distance)
##########################################################################################################################################
# example of calculating the frechet inception distance in Keras

# calculate frechet inception distance
def FIDScoreCalculation(inceptionModel, image1, image2):
    """Compute the Frechet Inception Distance between two image sets.

    Both sets are passed through ``inceptionModel.predict``; the distance is
    ``||mu1 - mu2||^2 + trace(C1 + C2 - 2 * sqrt(C1 . C2))``, where ``mu`` and
    ``C`` are the mean vector and covariance matrix of each set's predictions.

    Parameters
    ----------
    inceptionModel : object with a ``predict`` method
        Maps an image batch to a 2-D array with one row per image.
    image1, image2 : array-like
        Image batches accepted by ``inceptionModel.predict``.

    Returns
    -------
    float
        The FID score (close to 0 for identical sets).
    """
    # Model outputs for each image set.
    featuresFirstImage = inceptionModel.predict(image1)
    featuresSecondImage = inceptionModel.predict(image2)

    # Mean vector and covariance matrix (columns are variables) of each set.
    meanFirstImage, covarianceFirstImage = featuresFirstImage.mean(axis=0), cov(featuresFirstImage, rowvar=False)
    meanSecondImage, covarianceSecondImage = featuresSecondImage.mean(axis=0), cov(featuresSecondImage, rowvar=False)

    # Squared Euclidean distance between the means. `np` is the alias this
    # file imports; the bare name `numpy` is not bound.
    sumOfSquaredDistanceMean = np.sum((meanFirstImage - meanSecondImage) ** 2.0)

    # Matrix square root of the product of the covariances.
    covarianceSquareRoot = sqrtm(covarianceFirstImage.dot(covarianceSecondImage))

    # sqrtm can return a complex result from numerical error; keep the real part.
    if iscomplexobj(covarianceSquareRoot):
        covarianceSquareRoot = covarianceSquareRoot.real

    FIDScore = sumOfSquaredDistanceMean + trace(covarianceFirstImage + covarianceSecondImage - 2.0 * covarianceSquareRoot)
    return FIDScore
 

# NOTE(review): the names torhc_list_tensor1 / torhc_list_tensor2 used here
# before are not defined anywhere in this notebook. The raw and generated
# tensors built in the conversion cells are used instead, matching the tensor
# passed to the inception-score call -- confirm this is the intended pairing.
fid_raw_images = torhc_list_tensor_raw_last.numpy()
fid_generated_images = torhc_list_tensor_generated_last.numpy()

# fid between the raw images and themselves
fid = FIDScoreCalculation(InceptionModel, fid_raw_images, fid_raw_images)
print('If the image sets are same FID score will be', fid)
# fid between the raw images and the generated images
fid = FIDScoreCalculation(InceptionModel, fid_raw_images, fid_generated_images)
print('If the image sets are different FID score will be', fid)

In [None]:
# Save the trained networks as whole pickled modules.
# Raw strings keep the backslashes literal: '\p' and '\A' are invalid escape
# sequences in a normal string literal. The path values are unchanged.
torch.save(discriminator, r'data\pretrained\AAEdiscriminatorlast')
torch.save(encoder, r'data\pretrained\AAEencoderlast')
torch.save(decoder, r'data\pretrained\AAEdecoderlast')