In [1]:
import torch
import os
from pathlib import Path
from torchvision.io import read_image
from torchvision.datasets.folder import default_loader
import torchvision.transforms as T
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch import nn
from torch.nn import functional as F
from torchsummary import summary
import matplotlib.pyplot as plt
import torch.optim as optim
import wandb

  warn(


In [2]:
wandb.login()

Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: Currently logged in as: [33mhrishi23[0m. Use [1m`wandb login --relogin`[0m to force relogin


True

In [3]:
class animalFaceDataset(Dataset):
    def __init__(self, dataPath) -> None:
        self.dataPath = dataPath

        catPath = Path(os.path.join(self.dataPath,'cat'))
        dogPath = Path(os.path.join(self.dataPath,'dog'))
        wildPath = Path(os.path.join(self.dataPath,'wild'))

        self.images = []
        self.labels = []
        
        PathsToTraverse = [catPath,dogPath,wildPath]

        for labelIndex in range(3):
            for eachImg in PathsToTraverse[labelIndex].iterdir():
                self.images.append(eachImg)
                self.labels.append(labelIndex)

        self.transforms = T.Compose([T.Resize((128,128)),T.ToTensor()])
        
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, index):
        originalImg = default_loader(self.images[index])
        resized_img = self.transforms(originalImg)
        return resized_img
        
        

In [4]:
traindataPath = r"D:\MTech_IISc\OneDrive - Indian Institute of Science\Third Semester\ADRL\Assignment1\Dataset\train"
train_dataset = animalFaceDataset(traindataPath)

evaldataPath = r"D:\MTech_IISc\OneDrive - Indian Institute of Science\Third Semester\ADRL\Assignment1\Dataset\val"
eval_dataset = animalFaceDataset(evaldataPath)

In [5]:
#Hyperparameters
batchSize = 16
latent_dim = 150
cnn_hidden_channels = [32, 64, 128, 256, 512,1024]
reversedChannels = cnn_hidden_channels.copy()
reversedChannels.reverse()
learning_rate = 0.001
number_of_epochs = 10

In [6]:
train_loader = DataLoader(dataset=train_dataset, batch_size=batchSize, shuffle=True)
eval_loader =  DataLoader(dataset=eval_dataset, batch_size= batchSize, shuffle=True)

In [7]:
class Encoder(nn.Module):
    def __init__(self, latent_dim, cnn_hidden_channels) -> None:
        super(Encoder,self).__init__()
        self.latent_dim = latent_dim
        self.cnn_hidden_channels = cnn_hidden_channels

        modules = []
        in_channels = 3 

        for h_dim in self.cnn_hidden_channels:
            modules.append(
                nn.Sequential(
                    nn.Conv2d(in_channels, out_channels=h_dim,
                                kernel_size= 3, stride= 2, padding  = 1),
                    nn.BatchNorm2d(h_dim),
                    nn.LeakyReLU())
            )
            in_channels = h_dim

        self.allCNNLayers = nn.Sequential(*modules)

        self.fcForMu = nn.Linear(self.cnn_hidden_channels[-1]*4, latent_dim)
        self.fcForVar = nn.Linear(self.cnn_hidden_channels[-1]*4, latent_dim)

    def forward(self,sample):
        #sample : [bs,Channels, Height, Width]

        sample = self.allCNNLayers(sample)
        #sample : [bs, 1024, 2, 2]

        sample = torch.flatten(sample,start_dim=1)
        #sample : [bs, 1024*4]

        mu = self.fcForMu(sample)
        logvar = self.fcForVar(sample)

        return mu,logvar

In [8]:
class SampleLatentVars(nn.Module):
    def __init__(self) -> None:
        super(SampleLatentVars,self).__init__()
    
    def forward(self, mu, logvar):
        #mu : [bs, latent_dim], logvar : [bs, latent_dim]

        std = torch.exp(0.5 * logvar)
        stdGauss = torch.rand_like(std)
        #stdGauss : [bs, latent_dim]

        sampledZs = stdGauss*std + mu
        #samplesZs : [bs, latent_dim]

        return sampledZs

In [9]:
class Decoder(nn.Module):
    def __init__(self, latent_dim, cnn_hidden_channels) -> None:
        super(Decoder,self).__init__()
        self.latent_dim = latent_dim
        self.cnn_hidden_channels = cnn_hidden_channels #[1024,512,256,128,64,32]

        modules = []

        self.InputToConvTranspose = nn.Linear(latent_dim, cnn_hidden_channels[0] * 4)

        for i in range(len(cnn_hidden_channels) - 1):
            modules.append(
                nn.Sequential(
                    nn.ConvTranspose2d(cnn_hidden_channels[i],
                                       cnn_hidden_channels[i + 1],
                                       kernel_size=3,
                                       stride = 2,
                                       padding=1,
                                       output_padding=1),
                    nn.BatchNorm2d(cnn_hidden_channels[i + 1]),
                    nn.LeakyReLU())
            )

        self.deconvolve = nn.Sequential(*modules)

        self.final_layer = nn.Sequential(
                            nn.ConvTranspose2d(cnn_hidden_channels[-1],
                                               cnn_hidden_channels[-1],
                                               kernel_size=3,
                                               stride=2,
                                               padding=1,
                                               output_padding=1),
                            nn.BatchNorm2d(cnn_hidden_channels[-1]),
                            nn.LeakyReLU(),
                            nn.Conv2d(cnn_hidden_channels[-1], out_channels= 3,
                                      kernel_size= 3, padding= 1),
                            nn.Tanh())
        
    def forward(self, sampledZs):
        #sampledZs : [bs, latent_dim]

        inputToDeconvolve = self.InputToConvTranspose(sampledZs).view(-1,1024,2,2)
        #inputToDeconvolve : [bs, 1024, 2, 2]

        deconvolved = self.deconvolve(inputToDeconvolve)
        #deconvolved : [bs, 32, 64,64]

        img = self.final_layer(deconvolved)
        #img : [bs, 3, 128,128]

        return img

In [10]:
class vaeModel(nn.Module):
    def __init__(self, encoder, sampler, decoder) -> None:
        super(vaeModel,self).__init__()
        self.encoder = encoder
        self.sampler = sampler
        self.decoder = decoder

    def forward(self, sample):
        #sample : [bs, channels, H, W]

        mu, logvar = self.encoder(sample)
        #mu : [bs, latent_dim]
        #logVar : [bs,latent_dim]
        
        sampledZs = self.sampler(mu,logvar)
        #sampledZs : [bs, latent_dim]

        reconstructed = self.decoder(sampledZs)
        #reconstructed: [bs, 3, H, W]

        return reconstructed, mu, logvar # We need to return mu and logvar because it is needed further to calculate loss

In [11]:
encoder = Encoder(latent_dim=latent_dim, cnn_hidden_channels=cnn_hidden_channels)
sampler = SampleLatentVars()
decoder = Decoder(latent_dim=latent_dim, cnn_hidden_channels=reversedChannels)

model = vaeModel(encoder=encoder, sampler=sampler, decoder=decoder)

In [12]:
def calculate_loss(org_image, reconstructed_image, mu, logvar):
    #org_image : [bs, channels, H, W]
    #reconstructed_image : [bs, channels, H, W]
    #mu : [bs, latent_dim]
    #logvar : [bs, logvar]

    mse = F.mse_loss(org_image,reconstructed_image)
    kld_loss = torch.mean(-0.5 * torch.sum(1 + logvar - mu ** 2 - logvar.exp(), dim = 1), dim = 0)

    loss = mse + kld_loss

    return loss, mse, kld_loss

In [13]:
optimizer = optim.Adam(model.parameters(),lr = learning_rate)

In [14]:
def calcValidationLoss(model,valid_loader):
    model.eval()
    running_loss = 0.0
    for eachBatch in valid_loader:
        reconstructed,mu,logvar = model(eachBatch)
        loss,_,_ = calculate_loss(eachBatch,reconstructed,mu,logvar)
        running_loss += loss

    avg_loss = running_loss/len(valid_loader)
    return avg_loss

In [15]:
#train loop
wandb.init(project="ADRLAssignment1VAE",
        config={
            "epochs": number_of_epochs,
            "batch_size": batchSize,
            "lr": learning_rate
            })

config = wandb.config
model.train()

step = 0
for epochs in range(config.epochs):
    for eachBatch in train_loader:
        optimizer.zero_grad()
        reconstructed,mu,logvar = model(eachBatch)
        loss,_,_ = calculate_loss(eachBatch,reconstructed,mu,logvar)

        loss.backward()
        optimizer.step()

        if step%50 == 49:
            train_metrics = {
                "train_loss":loss,
                "train_step":step,
            }
            wandb.log(train_metrics)
            val_loss = calcValidationLoss(model,eval_loader)
            val_metrics = {
                "val_loss":val_loss,
                "train_step":step
            }
            wandb.log(val_metrics)
            print("train Loss : ",loss)
            print("Validation loss : ", val_loss)
        step += 1

wandb.finish()

train Loss :  tensor(1.4483, grad_fn=<AddBackward0>)
Validation loss :  tensor(0.9316, grad_fn=<DivBackward0>)


RuntimeError: [enforce fail at ..\c10\core\impl\alloc_cpu.cpp:72] data. DefaultCPUAllocator: not enough memory: you tried to allocate 8388608 bytes.