# Variational Auto Encoder Notebook
Generative Algorithm that has been trained using FIRST Radio Sources. Makes use of the FRDEEP dataset of FIRST Radio Sources.

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.autograd import Variable
from torchvision.utils import save_image
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from torchsummary import summary
from MiraBest import MiraBest
from PIL import Image

In [2]:
import torch.nn as nn
import torch.nn.functional as F

In [3]:
import torch.optim as optim

Imports the images from the FRDEEP, we make use of the FIRST radio sources

In [4]:
from FRDEEP import FRDEEPF

In [32]:
def dataloader_first():
    transform = transforms.Compose([transforms.ToTensor(),transforms.Normalize([0.5],[0.5])])
    
    trainset = FRDEEPF(root='./FIRST_data', train=True, download=True, transform=transform)  
    trainloader = torch.utils.data.DataLoader(trainset, shuffle=True, num_workers=2, batch_size=len(trainset))
    
    testset = FRDEEPF(root='./FIRST_data', train=False, download=True, transform=transform) 
    testloader = torch.utils.data.DataLoader(testset, shuffle=True, num_workers=2,batch_size = len(trainset))
    
    classes = ('FRI', 'FRII')
    
    array_train= next(iter(trainloader))[0].numpy() # Training Datasets is loaded in numpy array
    
    array_label= next(iter(trainloader))[1].numpy() # Training Datasets labels is loaded in seperate numpy array
    
    augmented_data=np.zeros((19800,1,100,100))
    
    count=0
    
    for j in range(0,550):
        image_object=Image.fromarray(array_train[j,0,:,:])
        for i in range(0,36):
            rotated=image_object.rotate(i*10)
            imgarr = np.array(rotated)
            temp_img_array=imgarr[25:125,25:125]
            augmented_data[count,0,:,:]=temp_img_array
            count+=1
           
    augmented_data=(augmented_data-np.min(augmented_data))/(np.max(augmented_data)-np.min(augmented_data))
    
    X=augmented_data
    
    X_random_mix=np.take(X,np.random.permutation(X.shape[0]),axis=0,out=X);
    
    array_test = next(iter(testloader))[0].numpy()
    
    augmented_data_test=np.zeros((50,1,100,100))
    
    count=0
    
    for j in range(0,50):
        image_object=Image.fromarray(array_test[j,0,:,:])
        for i in range(0,1):
            rotated=image_object.rotate(i*10)
            imgarr = np.array(rotated)
            temp_img_array=imgarr[25:125,25:125]
            augmented_data[count,0,:,:]=temp_img_array
            count+=1
           
    augmented_data_test=(augmented_data_test-np.min(augmented_data_test))/(np.max(augmented_data_test)-np.min(augmented_data_test))
    
    X_test=augmented_data_test
    
    X_random_mix_test=np.take(X,np.random.permutation(X_test.shape[0]),axis=0,out=X_test);
    
    
    
    
    return X_random_mix,X_random_mix_test

# Variational Auto Encoder Neural Network 
The Variational Auto Encoder consists of two neural network: i. An encoder and ii.a decoder.  
- The Encoder\
The enncoder consists of 5 layers and an input layer. The 100 by 100 pizels are reshaped to 10000 input features that are then reduced to 4096 in the first connected layer, then to 2048 features, then to 1024 features, to 512 features, 256 features and finally to the final 2 dimensional latent space z.

In [6]:
class VAE(nn.Module):
    def __init__(self, x_dim, h_dim1, h_dim2, h_dim3, h_dim4, h_dim5, z_dim):
        super(VAE, self).__init__()
        
        # encoder part
        self.fc1 = nn.Linear(x_dim, h_dim1) #x_dim=10000 to h_dim1=4096 
        self.fc2 = nn.Linear(h_dim1, h_dim2) #h_dim1=4096 to h_dim2=2048
        self.fc3 = nn.Linear(h_dim2, h_dim3) #h_dim2=2048 to h_dim3=1024
        self.fc4 = nn.Linear(h_dim3, h_dim4) #h_dim3=1024 to h_dim4=512
        self.fc5 = nn.Linear(h_dim4, h_dim5) #h_dim4=512 to h_dim5=256
        self.fc61 = nn.Linear(h_dim5, z_dim) #h_dim5=256 to z_dim=2
        self.fc62 = nn.Linear(h_dim5, z_dim) #h_dim5=256 to z_dim=2
        self.softplus = nn.Softplus()
        
        # decoder part
        self.fc7 = nn.Linear(z_dim, h_dim5) #z_dim=2 to h_dim5=256
        self.fc8 = nn.Linear(h_dim5, h_dim4) #h_dim5=256 to h_dim4=512
        self.fc9 = nn.Linear(h_dim4, h_dim3) #h_dim4=512 to h_dim3=1024
        self.fc10 = nn.Linear(h_dim3, h_dim2) #h_dim3=1024 to h_dim2=2048
        self.fc11 = nn.Linear(h_dim2, h_dim1) #h_dim2=2048 to h_dim1=4096
        self.fc12 = nn.Linear(h_dim1, x_dim)  #h_dim1=4096 to x_dim=10000
        self.softplus = nn.Softplus()
        self.sigmoid = nn.Sigmoid()
        
    def encoder(self, x):
        
        slope_param = 0.0001
        
        h = self.softplus(self.fc1(x))
        h = F.leaky_relu(self.fc2(h),slope_param)
        h = F.leaky_relu(self.fc3(h),slope_param)
        h = F.leaky_relu(self.fc4(h),slope_param)
        h = F.leaky_relu(self.fc5(h),slope_param)
        return self.fc61(h), self.fc62(h) # mu, log_var
    
    def sampling(self, mu, log_var):
        std = torch.exp(0.5*log_var)
        eps = torch.randn_like(std)
        return eps.mul(std).add_(mu) # return z sample
        
    def decoder(self, z):
        slope_param = 0.0001
        h = F.leaky_relu(self.fc7(z),slope_param)
        h = F.leaky_relu(self.fc8(h),slope_param)
        h = F.leaky_relu(self.fc9(h),slope_param)
        h = F.leaky_relu(self.fc10(h),slope_param)
        h = F.leaky_relu(self.fc11(h),slope_param)
        return F.sigmoid(self.fc12(h)) # Try to see what happens when we put a Softplus Here
    
    def forward(self, x):
        mu, log_var = self.encoder(x.view(-1, 10000))
        z = self.sampling(mu, log_var)
        return self.decoder(z), mu, log_var


In [7]:
# build model
vae = VAE(x_dim=10000, h_dim1= 4096, h_dim2=2048, h_dim3=1024, h_dim4=512, h_dim5=256, z_dim=2)

if torch.cuda.is_available():
    vae.cuda()

In [8]:
optimizer = optim.Adagrad(vae.parameters(), lr=0.3e-3)

# return reconstruction error + KL divergence losses
def loss_function(recon_x, x, mu, log_var):
    BCE = F.binary_cross_entropy(recon_x, x.view(-1, 10000), reduction='sum')
    KLD = -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())
    return BCE + KLD


In [33]:
X,X_test= dataloader_first()

Files already downloaded and verified
Files already downloaded and verified




In [34]:
X_test.shape

(50, 1, 100, 100)

In [37]:
def train(epoch):
    vae.train()
    train_loss = 0   
    data=torch.zeros(100, 1, 100, 100).cpu
    epoch_loss = 0
    for j in range(0,198):
        data = torch.from_numpy(X[j*100:(j+1)*100,:,:])
        data = data.cuda()
        optimizer.zero_grad()
        
        recon_batch, mu, log_var = vae(data.float())
        loss = loss_function(recon_batch.float(), data.float(), mu, log_var)
        
        loss.backward()
        train_loss += loss.item()
        optimizer.step()
        epoch_loss+=loss.item() / len(data)
        
        #Testing Part:
        data_test = torch.from_numpy(X_test)
        data_test = data_test.cuda()
        recon_batch_test,mu_test,log_var_test = vae(data_test.float())
        test_loss = loss_function(recon_batch_testfloat(), data_test, mu_test, log_var_test).item()
        
        
    print("Training Epoch:"+str(epoch)+' Training Loss: '+str(epoch_loss/198.0))

In [38]:
for epoch in range(1, 100):
    train(epoch)

RuntimeError: reduce failed to synchronize: device-side assert triggered

Sample the images from the latent space z, sample range between $-4.0 \leqslant z_1 \leqslant 4.0$ and $-4.0 \leqslant z_2 \leqslant 4.0$. Image are sampled at steps of 0.2 in both directions.

In [None]:
with torch.no_grad():
    z = torch.randn(1600, 2).cuda()
    count_x=-4.0
    count_y=-4.0
    x=0
    y=0
    for i in range (0,40):
        for j in range (0,40):
            z[x,0]=count_x
            z[x,1]=count_y
            x=x+1
            count_y=count_y+0.2
        y=y+1
        count_x=count_x+0.2
        count_y=-4.0
        
    sample = vae.decoder(z).cuda()
    
    save_image(sample.view(1600, 1, 100, 100), 'sample_z_space_' +str(epoch)+'.png',nrow=40)

In [33]:
with torch.no_grad():
    z = torch.randn(100, 2).cuda()    
    sample = vae.decoder(z).cuda()
    save_image(sample.view(100, 1, 100, 100), 'sample_random_' +str(epoch)+'.png',nrow=10)