In [1]:
import sys
sys.path.append('./src')

from chatgpt_util_imports import generate_dataset
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import torch
import torch.nn as nn
from torch.nn import functional as F

from torch.utils.data import Dataset
from torch.utils.data.dataloader import DataLoader
from torch.utils.data import random_split

In [2]:
device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')

In [3]:
# Load +31k patches as compiled by `bwhitman` for `learnfm`.
fpath = 'data/compact.bin'

df = generate_dataset(fpath)
df = pd.DataFrame(df)

print(f"Num patches: {len(df)}")
print(f"Num features: {len(df.keys())}")

# df.hist(figsize=(50, 50))
# plt.show()

In [76]:
class PatchDataset(Dataset):
    def __init__(self, df):

        if 'VOICE NAME' in df.keys():
            df = df.drop(columns=['VOICE NAME'])
        
        self.parameter_names = df.keys()
        self.means = df.mean()
        self.stds = df.std()

        df = df - df.mean()
        df = df / df.std()

        self.df = torch.tensor(df.values, dtype=torch.float32)

    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        return self.df[idx]

In [77]:
dataset = PatchDataset(df)

p_train = 0.8
batch_size = 32

n_train = int(p_train * len(dataset))
n_val = len(dataset) - n_train

train_dataset, val_dataset = random_split(dataset, [n_train, n_val])

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)

In [78]:
class VAE(nn.Module):
    """ 
    Variational autoencoder class, as per _Auto-Encoding Variational Bayes_. 
        https://arxiv.org/pdf/1312.6114.pdf

    Code written with reference to:
        https://github.com/AntixK/PyTorch-VAE/

    """

    def __init__(self, n_features, n_hidden, n_latent):
        super(VAE, self).__init__()

        self.encoder = nn.Sequential(
            nn.Linear(n_features, n_hidden),
            nn.ReLU(),
            nn.Linear(n_hidden, n_latent),
            nn.ReLU(),
            )
        
        self.decoder = nn.Sequential(
            nn.Linear(n_latent, n_latent),
            nn.ReLU(),
            nn.Linear(n_latent, n_hidden),
            nn.ReLU(),
            nn.Linear(n_hidden, n_features),
        )

        self.fc_mu = nn.Linear(n_latent, n_latent)
        self.fc_var = nn.Linear(n_latent, n_latent)

    def encode(self, x):
        x = self.encoder(x)

        # From latent embedding, generate mean and standard deviation.
        mu = self.fc_mu(x)
        log_var = self.fc_var(x)

        return [mu, log_var]
    
    def decode(self, z):
        return self.decoder(z)
    
    def reparameterize(self, mu, log_var):
        # Render as a random sample from Gaussian distribution.
        std = torch.exp(0.5 * log_var)
        eps = torch.randn_like(std)
        return eps * std + mu

    def forward(self, x):
        mu, log_var = self.encode(x)

        # Latent representation.
        z = self.reparameterize(mu, log_var)

        y = self.decode(z)
        return [y, mu, log_var]

def vae_loss(x, y, mu, log_var, kld_weight=1.0):

    reconstruction_loss = F.mse_loss(x, y)
    kullback_liebler_loss = torch.mean(
        -0.5 * torch.sum(1 + log_var - mu ** 2 - log_var.exp(), dim=1), dim=0
    )

    loss = reconstruction_loss + kld_weight * kullback_liebler_loss
    return {'loss': loss, 'mse': reconstruction_loss, 'kld': kullback_liebler_loss}

In [79]:
n_features = len(dataset.parameter_names)
n_hidden = 64
n_latent = 16

model = VAE(n_features, n_hidden, n_latent)
model = model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-1)

In [None]:
# Training.
n_epochs = 10

for i in range(n_epochs):

    model.train()
    epoch_train_loss = {key: 0.0 for key in ['loss', 'mse', 'kld']}
    epoch_val_loss = {key: 0.0 for key in ['loss', 'mse', 'kld']}

    for x in train_loader:
        x = x.to(device)
        optimizer.zero_grad()

        y, mu, log_var = model(x)
        loss = vae_loss(x, y, mu, log_var)
        loss['loss'].backward()
        optimizer.step()

        for key in loss.keys():
            epoch_train_loss[key] += loss[key].item()

    # Report average loss.
    for key in loss.keys():
        epoch_train_loss[key] /= len(train_dataset)


    model.eval()

    with torch.no_grad():
        for x in val_loader:
            x = x.to(device)

            y, mu, log_var = model(x)
            loss = vae_loss(x, y, mu, log_var)

            for key in loss.keys():
                epoch_val_loss[key] += loss[key].item()

        for key in loss.keys():
                epoch_val_loss[key] /= len(val_dataset)

    print(f"Epoch {i}, Train loss: {epoch_train_loss['loss']:.8f}, (MSE: {epoch_train_loss['mse']:.8f}, KLD: {epoch_train_loss['kld']:.8f})")
    print(f"Epoch {i}, Val loss: {epoch_val_loss['loss']:.8f}, (MSE: {epoch_val_loss['mse']:.8f}, KLD: {epoch_val_loss['kld']:.8f})")

Epoch 0, Train loss: 0.03139978, (MSE: 0.03139829, KLD: 0.00000150)
Epoch 0, Val loss: 0.03211069, (MSE: 0.03210773, KLD: 0.00000296)
Epoch 1, Train loss: 0.03139569, (MSE: 0.03139399, KLD: 0.00000170)
Epoch 1, Val loss: 0.03206557, (MSE: 0.03206410, KLD: 0.00000146)
Epoch 2, Train loss: 0.03140579, (MSE: 0.03140438, KLD: 0.00000141)
Epoch 2, Val loss: 0.03207097, (MSE: 0.03206869, KLD: 0.00000228)
Epoch 3, Train loss: 0.03140994, (MSE: 0.03140814, KLD: 0.00000181)
Epoch 3, Val loss: 0.03205190, (MSE: 0.03204911, KLD: 0.00000279)
