In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

from PIL import Image
import pandas as pd
import os

In [None]:
# run on the GPU when torch can see one, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Device: {device}")

# Dataset Class

In [None]:
def Create_dataset(portion_train, path_rollouts_csv="./DATASET_ROLLOUTS/rollouts.csv", random_state=None):
    """Build the train/test datasets of observation images used by the VAE.

    If "./DATASET_ROLLOUTS/dataset_vae.csv" already exists it is loaded as is.
    Otherwise it is built from the rollouts csv: every path found in its
    "Path csv" column is read, the "Path image observation" column of each of
    those files is collected, and the result is saved to dataset_vae.csv.

    Args:
        portion_train: fraction of the observations assigned to the train set.
        path_rollouts_csv: csv (";"-separated) listing one observations csv
            per rollout in its "Path csv" column.
        random_state: forwarded to DataFrame.sample to make the split
            reproducible; None (default) keeps the split random.

    Returns:
        (trainset, testset); testset is None unless portion_train < 1.
    """
    path_dataset_csv = "./DATASET_ROLLOUTS/dataset_vae.csv"
    column = "Path image observation"

    # if dataset for VAE already exists, just load it
    if os.path.exists(path_dataset_csv):
        df_obs = pd.read_csv(path_dataset_csv, sep=";", skipinitialspace=True)
        print("Dataset loaded from csv")
    else:
        # if not, create it from the main dataset
        df_rollouts = pd.read_csv(path_rollouts_csv, sep=";", skipinitialspace=True)
        n_rollouts = len(df_rollouts)

        # collect the per-rollout frames and concatenate once at the end
        # (concatenating inside the loop copies the whole frame every iteration)
        frames = []
        for i, path_obs_csv in enumerate(df_rollouts["Path csv"], start=1):
            new_df = pd.read_csv(path_obs_csv, sep=";", skipinitialspace=True)
            frames.append(new_df[[column]])

            if i % 1000 == 0:
                print(f"Observation {i}/{n_rollouts}")

        if frames:
            # ignore_index gives every observation a unique label; keeping the
            # per-rollout labels would make the drop() below remove every row
            # that shares a label with a train row
            df_obs = pd.concat(frames, axis=0, ignore_index=True)
        else:
            df_obs = pd.DataFrame(columns=[column])

        print("Creating new csv")
        # save the resulting dataframe into csv
        df_obs.to_csv(path_dataset_csv, sep=";", index=False)
        print("Csv created")

    # divide the observations into trainset and testset
    n_train = int(len(df_obs) * portion_train)
    df_train = df_obs.sample(n_train, random_state=random_state)
    trainset = Trainset(df_train)
    print("Train set created")
    testset = None
    if portion_train < 1:
        # the test set is whatever was not sampled for training
        testset = Testset(df_obs.drop(df_train.index))
        print("Test set created")
    return trainset, testset

In [None]:
class Trainset(Dataset):
    """Training split: one image per row of the wrapped dataframe.

    The first column of the dataframe holds the path of the image.
    """

    def __init__(self, df):
        # dataframe of image paths and the PIL -> tensor converter
        self.df = df
        self.transform = transforms.PILToTensor()

    def __len__(self):
        # one sample per dataframe row
        return len(self.df)

    def __getitem__(self, idx):
        """Open the idx-th image and return it as a float tensor divided by 255."""
        path_img = self.df.iloc[idx, 0]
        pixels = self.transform(Image.open(path_img))
        return pixels.float() / 255
    
class Testset(Dataset):
    """Test split: one image per row of the wrapped dataframe.

    The first column of the dataframe holds the path of the image.
    """

    def __init__(self, df):
        # dataframe of image paths and the PIL -> tensor converter
        self.df = df
        self.transform = transforms.PILToTensor()

    def __len__(self):
        # one sample per dataframe row
        return len(self.df)

    def __getitem__(self, idx):
        """Open the idx-th image and return it as a float tensor divided by 255."""
        path_img = self.df.iloc[idx, 0]
        pixels = self.transform(Image.open(path_img))
        return pixels.float() / 255

# Encoder

In [None]:
class Encoder(nn.Module):
    """Convolutional encoder producing the mean and log-variance of the latent."""

    def __init__(self, latent_dim):
        super().__init__()
        # four kernel-4, stride-2 convolutions: 3 -> 32 -> 64 -> 128 -> 256 channels
        self.conv1 = nn.Conv2d(3, 32, kernel_size=4, stride=2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=4, stride=2)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=4, stride=2)

        # both heads read the flattened conv output: 256 channels on a 2x2 map,
        # which is what the stack above yields for 64x64 inputs
        n_features = 2 * 2 * 256
        self.fc_mu = nn.Linear(n_features, latent_dim)
        self.fc_logvar = nn.Linear(n_features, latent_dim)

        self.activation = nn.ReLU()

    def forward(self, x):
        """Return (mu, logvar) for the image batch x."""
        h = x
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            h = self.activation(conv(h))
        # flatten everything but the batch dimension
        h = h.view(h.shape[0], -1)
        return self.fc_mu(h), self.fc_logvar(h)
        
        

# Decoder

In [None]:
class Decoder(nn.Module):
    """Transposed-convolution decoder turning a latent vector back into an image."""

    def __init__(self, latent_dim):
        super().__init__()
        # project the latent to the 1024 channels fed to the first deconvolution
        self.fc = nn.Linear(latent_dim, 1024)

        # stride-2 transposed convolutions: 1024 -> 128 -> 64 -> 32 -> 3 channels
        self.conv4 = nn.ConvTranspose2d(1024, 128, kernel_size=5, stride=2)
        self.conv3 = nn.ConvTranspose2d(128, 64, kernel_size=5, stride=2)
        self.conv2 = nn.ConvTranspose2d(64, 32, kernel_size=6, stride=2)
        self.conv1 = nn.ConvTranspose2d(32, 3, kernel_size=6, stride=2)

        self.ReLU_activation = nn.ReLU()

    def forward(self, x):
        """Decode the latent batch x; the sigmoid keeps the output in [0, 1]."""
        h = self.fc(x)
        # treat the projection as a 1x1 feature map with 1024 channels
        h = h.view(h.shape[0], 1024, 1, 1)
        for deconv in (self.conv4, self.conv3, self.conv2):
            h = self.ReLU_activation(deconv(h))
        return torch.sigmoid(self.conv1(h))
        

# VAE

In [None]:
class VariationalAutoencoder(nn.Module):
    """VAE chaining the Encoder and the Decoder through a sampled latent."""

    def __init__(self, latent_dim):
        super().__init__()
        self.encoder = Encoder(latent_dim)
        self.decoder = Decoder(latent_dim)

    def forward(self, x):
        """Encode x, draw a latent and decode it; returns (reconstruction, mu, logvar)."""
        mu, logvar = self.encoder(x)
        z = self.latent_sample(mu, logvar)
        return self.decoder(z), mu, logvar

    def latent_sample(self, mu, logvar):
        """Reparameterized draw from N(mu, exp(logvar)) in training mode, mu itself otherwise."""
        if not self.training:
            return mu
        # logvar is the log of the variance, so the standard deviation is exp(logvar / 2)
        sigma = torch.exp(0.5 * logvar)
        return torch.distributions.Normal(loc=mu, scale=sigma).rsample()

# Training

In [None]:
# loss of the vae with mse
def vae_loss(recon_x, x, mu, logvar, variational_beta):
    """Loss of the VAE: MSE reconstruction term plus a beta-weighted KL term.

    Args:
        recon_x: reconstructed images.
        x: target images, with the same number of elements as recon_x.
        mu: latent means returned by the encoder.
        logvar: latent log-variances returned by the encoder.
        variational_beta: weight of the KL term.

    Returns:
        Scalar tensor recon_loss + variational_beta * kldivergence.
    """
    # Flatten both tensors completely instead of reshaping to a hard-coded
    # 12288 (= 3*64*64) columns: the mean over all elements is the same, and
    # the loss now works for any image size and for non-contiguous tensors.
    recon_loss = F.mse_loss(recon_x.reshape(-1), x.reshape(-1))
    # KL divergence between N(mu, exp(logvar)) and N(0, 1), summed over every
    # element of the batch.
    # NOTE(review): the reconstruction term is a mean while the KL term is a
    # sum, so the two live on different scales; left untouched because the
    # value of variational_beta is tuned against this definition.
    kldivergence = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return recon_loss + variational_beta * kldivergence

In [None]:
# obtain trainset and testset
# (with portion_train=1 every observation goes to the trainset and testset is None)
trainset, testset = Create_dataset(portion_train=1)

In [None]:
# Hyper-parameters of the training run.
lr = 1e-3          # learning rate given to the optimizer
latent_dim = 512   # size of the latent vector
# weight of the KL term: keep it at 0 until the reconstruction is decent,
# then increase it incrementally until 1
var_beta = 0

In [None]:
# Make sure the backup directory exists before training starts, so the save at
# the end of the first epoch cannot fail after a full epoch of work.
os.makedirs("./BACKUP_MODELS", exist_ok=True)

vae = VariationalAutoencoder(latent_dim=latent_dim).to(device)

vae.train()
optimizer = optim.Adam(params=vae.parameters(), lr=lr)

# The loader does not change between epochs, so it is built once; with
# shuffle=True a new order is drawn every time it is iterated.
train_loader = DataLoader(trainset, batch_size=100, shuffle=True)

i = 0
# Trains until the kernel is interrupted; the model is backed up after every epoch.
while True:
    losses = []
    for img_batch in train_loader:
        img_batch = img_batch.to(device)

        img_batch_recon, latent_mu, latent_logvar = vae(img_batch)

        loss = vae_loss(img_batch_recon, img_batch, latent_mu, latent_logvar, var_beta)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        losses.append(loss.item())
    print(f"EPOCH: {i+1} MEAN LOSSES EPOCH: {sum(losses)/len(losses)}")
    # loss of the last batch of the epoch
    print(f"Loss: {loss.item()}")
    i += 1
    # NOTE(review): this pickles the whole module, so loading the backup needs
    # the same class definitions to be importable; kept because code that
    # reads this file may rely on the format.
    torch.save(vae, "./BACKUP_MODELS/vae")