# VAE

In [None]:
import os

# Restrict CUDA to the device with index 1. This is set before torch is
# imported in the following cell.
os.environ["CUDA_VISIBLE_DEVICES"] = "1"

In [None]:
import timm, torch, random, datasets, math, fastcore.all as fc, numpy as np, matplotlib as mpl, matplotlib.pyplot as plt

# import k_diffusion as K,
import torchvision.transforms as T
import torchvision.transforms.functional as TF, torch.nn.functional as F

from torch.utils.data import DataLoader, default_collate
from pathlib import Path
from torch.nn import init
from fastcore.foundation import L
from torch import nn, tensor
import datasets as ds
from operator import itemgetter
import pandas as pd
from torcheval.metrics import MulticlassAccuracy, Mean, Metric
from functools import partial
from torch.optim import lr_scheduler
from torch import optim
from einops import rearrange

from miniai.datasets import *
from miniai.conv import *
from miniai.learner import *
from miniai.activations import *
from miniai.training import *
from miniai.init import *
from miniai.sgd import *
from miniai.resnet import *
from miniai.augment import *
from miniai.accel import *

In [None]:
# Tensor printing: 4 decimals, wide lines, no scientific notation.
torch.set_printoptions(precision=4, linewidth=140, sci_mode=False)
torch.manual_seed(1)
# Matplotlib defaults: inverted-gray colormap for images, small figures.
mpl.rcParams["image.cmap"] = "gray_r"
mpl.rcParams["figure.dpi"] = 70

import logging

# Suppress log records at WARNING level and below.
logging.disable(logging.WARNING)

# NOTE(review): set_seed comes from a star import; presumably it also reseeds
# torch, which would make the manual_seed(1) call above redundant - confirm.
set_seed(42)
# Cap fastcore's default CPU count at 8.
if fc.defaults.cpus > 8:
    fc.defaults.cpus = 8

In [None]:
# Weight initialiser with leaky=0.2 pre-bound.
# NOTE(review): at this point init_weights resolves to whatever the star
# imports above provide; the same binding is repeated further down, right
# after a local init_weights definition - confirm which one is intended.
iw = partial(init_weights, leaky=0.2)

In [None]:
# Synthetic tabular data: two independently drawn N(0, 1) features plus a
# third column that is a noisy linear combination of the two.
n_samples = 1000
# Build the two base features as columns of a pandas DataFrame
df = pd.DataFrame(
    {
        "athletic_ability": np.random.normal(0, 1, n_samples),
        "academic_performance": np.random.normal(0, 1, n_samples),
    }
)

# Add scholarship (collider): 0.7 * each feature plus N(0, 0.1) noise
df["scholarship"] = (
    0.7 * df.athletic_ability
    + 0.7 * df.academic_performance
    + np.random.normal(0, 0.1, n_samples)
)

# Convert to HuggingFace Dataset
dataset = ds.Dataset.from_pandas(df)

# Split 80/20 into "train" and "test" (the latter is used for validation below)
dataset = dataset.train_test_split(test_size=0.2, seed=42)


# Transform function to convert to tensors
@inplace
def transformi(b):
    """Attach float32 feature tensors to batch ``b`` under ``xl`` and ``yl``.

    The three feature columns are read from ``b`` in a fixed order, converted
    to float32 tensors, stacked, and transposed. Both keys receive the same
    tensor, so input and target are identical.
    """
    column_names = ("athletic_ability", "academic_performance", "scholarship")
    per_column = [torch.tensor(b[name], dtype=torch.float32) for name in column_names]
    # Stacking puts features on the first axis; transpose so that (for
    # list-valued columns) the result is [batch_size, features].
    stacked = torch.stack(per_column).T
    b["xl"] = stacked
    b["yl"] = stacked


def collate(batch):
    """Stack the ``xl`` and ``yl`` tensors of every item into two batch tensors.

    Args:
        batch: sequence of mappings, each holding ``"xl"`` and ``"yl"`` tensors.

    Returns:
        A ``(xl, yl)`` tuple of tensors stacked along a new leading dimension.
    """
    return tuple(
        torch.stack([sample[key] for sample in batch]) for key in ("xl", "yl")
    )


# Apply transformi on access so items expose the "xl"/"yl" tensors
tds = dataset.with_transform(transformi)

# Create DataLoaders from the train/test splits, batching with the custom
# collate function (batch size 64, 8 worker processes)
dls = DataLoaders(
    *get_dls(tds["train"], tds["test"], bs=64, num_workers=8, collate_fn=collate)
)

In [None]:

# Sanity check: pull one training batch and inspect its shape
dl = dls.train
xb, yb = b = next(iter(dl))
print("Shape of xb:", xb.shape)  # Should print something like torch.Size([64, 3])

# Layer sizes read by the VAE class below

ni = 3  # input dimensions
nh = 32  # hidden dimensions
nl = 2  # latent dimensions

## VAE

In [None]:
# sd vae is 3 down, 1 no-down, mid, conv, sampling, conv, mid, 3 up, 1 no-up

In [None]:
class VAE(nn.Module):
    """Fully connected variational autoencoder.

    Layer widths come from the module-level ``ni`` (input), ``nh`` (hidden)
    and ``nl`` (latent) values, read when the model is constructed.
    """

    def __init__(self):
        super().__init__()
        self.enc = nn.Sequential(lin(ni, nh), lin(nh, nh))
        # Two heads on top of the encoder: latent mean and latent log-variance.
        self.mu, self.lv = lin(nh, nl, act=None), lin(nh, nl, act=None)
        self.dec = nn.Sequential(lin(nl, nh), lin(nh, nh), lin(nh, ni, act=None))
        # The initialiser defined in this file only acts on modules that are
        # themselves conv/linear layers, so calling it on this container
        # directly would initialise nothing. ``apply`` visits every submodule.
        self.apply(iw)

    def forward(self, x):
        """Encode ``x``, sample a latent, and decode it.

        Returns:
            A ``(reconstruction, mu, lv)`` tuple, where ``mu`` and ``lv`` are
            the outputs of the mean and log-variance heads.
        """
        x = self.enc(x)
        mu, lv = self.mu(x), self.lv(x)
        # Reparameterisation: exp(0.5 * lv) is the standard deviation, scaled
        # by fresh standard-normal noise of the same shape as lv.
        z = mu + (0.5 * lv).exp() * torch.randn_like(lv)
        return self.dec(z), mu, lv

In [None]:
def kld_loss(inp, x):
    """KL-divergence term of the VAE loss, averaged over all elements.

    Args:
        inp: ``(x_hat, mu, lv)`` tuple; only ``mu`` and ``lv`` (mean and
            log-variance) are used.
        x: target batch; unused, accepted so the signature matches the other
            loss functions.

    Returns:
        Scalar tensor ``-0.5 * mean(1 + lv - mu**2 - exp(lv))``.
    """
    _, mu, lv = inp
    per_element = 1 + lv - mu.pow(2) - lv.exp()
    return -0.5 * per_element.mean()


def bce_loss(inp, x):
    """Reconstruction loss between the decoder output ``inp[0]`` and ``x``.

    The targets in this notebook are unbounded real-valued features drawn
    from normal distributions, not probabilities in [0, 1]. Binary
    cross-entropy on logits is unbounded below for such targets (pushing the
    logit towards +/- infinity drives the loss towards -infinity), so mean
    squared error is used instead. The ``bce_loss`` name is kept so existing
    callers and metric wiring continue to work.

    Args:
        inp: ``(x_hat, mu, lv)`` tuple; only ``x_hat`` is used.
        x: target batch with the same shape as ``x_hat``.

    Returns:
        Scalar tensor with the mean squared reconstruction error.
    """
    return F.mse_loss(inp[0], x)


def vae_loss(inp, x):
    """Total VAE objective: ``kld_loss`` plus ``bce_loss``, equally weighted.

    Args:
        inp: ``(x_hat, mu, lv)`` tuple as returned by the model.
        x: target batch.
    """
    kl_term = kld_loss(inp, x)
    recon_term = bce_loss(inp, x)
    return kl_term + recon_term

In [None]:
# Plot the per-element KL term from kld_loss as a function of the
# log-variance, with the mean contribution left out:
# -0.5 * (1 + lv - exp(lv)) for lv in [-3, 3].
x = torch.linspace(-3, 3, 100)
plt.figure(figsize=(4, 3))
plt.plot(x, -0.5 * (1 + x - x.exp()))

**Question**: What would happen if the variance of the latents were very low? What if they were very high?

**Bing**: If the variance of the latents were very low, then the encoder distribution would be very peaked and concentrated around the mean. This would make the latent space less diverse and expressive, and limit the ability of the decoder to reconstruct the data accurately. It would also make it harder to generate new data that are different from the training data.

If the variance of the latents were very high, then the encoder distribution would be very spread out and diffuse. This would make the latent space more noisy and random, and reduce the correlation between the latent codes and the data. It would also make it easier to generate new data that are unrealistic or nonsensical.

In [None]:
class FuncMetric(Mean):
    """``Mean`` metric that averages the value of ``fn(inp, targets)``.

    Every ``update`` call contributes the function's result with weight 1,
    so the metric is the plain average over update calls.
    """

    def __init__(self, fn, device=None):
        """Store ``fn`` and initialise the underlying ``Mean`` on ``device``."""
        super().__init__(device=device)
        self.fn = fn

    def update(self, inp, targets):
        """Evaluate ``fn`` on one batch and fold it into the running sums."""
        batch_value = self.fn(inp, targets)
        self.weighted_sum += batch_value
        self.weights += 1


def init_weights(m, leaky=0.0):
    """Kaiming-normal initialise ``m.weight`` if ``m`` is a conv or linear layer.

    Args:
        m: module to initialise; any other module type is left untouched.
        leaky: negative slope passed to ``kaiming_normal_`` as ``a``.
    """
    weighted_layers = (nn.Conv1d, nn.Conv2d, nn.Conv3d, nn.Linear)
    if not isinstance(m, weighted_layers):
        return
    init.kaiming_normal_(m.weight, a=leaky)


# Rebind iw to the init_weights defined just above, with leaky=0.2 pre-bound.
iw = partial(init_weights, leaky=0.2)


def lin(ni, nf, act=nn.SiLU, norm=nn.BatchNorm1d, bias=True):
    """Build a linear block: ``Linear`` -> optional activation -> optional norm.

    Args:
        ni: number of input features.
        nf: number of output features.
        act: activation class, instantiated with no arguments; falsy to skip.
        norm: normalisation class, instantiated with ``nf``; falsy to skip.
        bias: whether the linear layer has a bias term.

    Returns:
        An ``nn.Sequential`` holding the layers in that order.
    """
    parts = [nn.Linear(ni, nf, bias=bias)]
    if act:
        parts.append(act())
    if norm:
        parts.append(norm(nf))
    return nn.Sequential(*parts)

In [None]:
# Track the two loss components separately, each averaged via FuncMetric.
metrics = MetricsCB(kld=FuncMetric(kld_loss), bce=FuncMetric(bce_loss))
# NOTE(review): this selects GeneralRelu modules, while lin() defaults to
# nn.SiLU; astats is also commented out of the callback list below - confirm
# whether it is still needed.
astats = ActivationStats(fc.risinstance(GeneralRelu))
# Adam with eps=1e-5.
opt_func = partial(optim.Adam, eps=1e-5)

In [None]:
# Training configuration: one-cycle schedule stepped once per batch.
lr = 3e-2
epochs = 20
# Total number of scheduler steps: batches per epoch times epochs.
tmax = epochs * len(dls.train)
sched = partial(lr_scheduler.OneCycleLR, max_lr=lr, total_steps=tmax)
cbs = [
    # DeviceCB(device="mps"),
    ProgressCB(plot=True),
    metrics,
    BatchSchedCB(sched),
    # astats
    TrainCB(),
    # MixedPrecision(),
]
model = VAE()
# vae_loss receives the model's (x_hat, mu, lv) output and the target batch.
learn = Learner(model, dls, vae_loss, lr=lr, cbs=cbs, opt_func=opt_func)

In [None]:
# Train for the configured number of epochs.
learn.fit(epochs)

## VAE Sample

## Linear Regression