# Imports

In [None]:
# Install the necessary Python packages
!pip install numpy
!pip install tqdm
!pip install torch
!pip install torchvision
!pip install matplotlib
!pip install Pillow

In [6]:
# prerequisites
import torch
import torch.utils.data
from torch.utils.data import DataLoader
from torch import nn
import torch.nn.functional as F
from torch import optim

import torchvision
from torchvision import datasets, transforms
from torchvision.utils import save_image

from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
from typing import Callable

# Download Dataset

In [None]:
# Mini-batch size used by both loaders below.
bs = 100

# MNIST Dataset: images are converted to tensors by transforms.ToTensor().
train_dataset = datasets.MNIST(
    root='./mnist_data/',
    train=True,
    transform=transforms.ToTensor(),
    download=True,
)
# download=False: expects the files to already exist under the same root
# (presumably fetched by the call above -- confirm on a fresh checkout).
test_dataset = datasets.MNIST(
    root='./mnist_data/',
    train=False,
    transform=transforms.ToTensor(),
    download=False,
)

# Data Loaders: only the training split is shuffled.
train_loader = DataLoader(dataset=train_dataset, batch_size=bs, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=bs, shuffle=False)

# Section 1: Autoencoder

In [None]:
class Autoencoder(nn.Module):
  """Fully connected autoencoder.

  The encoder flattens every input sample and compresses it to `latent_dim`
  features; the decoder maps that code back to `input_dim` values in [0, 1].

  Args:
    input_dim: Number of features per sample after flattening.
    hidden_dim: Width of the hidden fully connected layers.
    device: Device name stored on the module as `self.device`.
    latent_dim: Size of the bottleneck code produced by the encoder.
  """

  def __init__(
      self,
      input_dim: int = 784,
      hidden_dim: int = 256,
      device: str = "cuda",
      latent_dim: int = 32
  ):
    super(Autoencoder, self).__init__()
    self.device = device

    # The sizes may arrive as NumPy integers (e.g. from np.prod); use plain ints.
    input_dim = int(input_dim)
    hidden_dim = int(hidden_dim)
    latent_dim = int(latent_dim)

    ######### Your code starts here #########
    # Encoder architecture:
    # 2 fully connected layers with ReLU activations

    # Decoder architecture:
    # 3 fully connected layers with ReLU activations

    # Choose the correct final layer output activation

    # nn.Flatten lets forward() accept image batches of shape (B, C, H, W)
    # as well as already flattened (B, input_dim) batches.
    self.encoder = nn.Sequential(
        nn.Flatten(),
        nn.Linear(input_dim, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, latent_dim),
        nn.ReLU(),
    )

    # Sigmoid keeps the reconstruction in [0, 1].
    self.decoder = nn.Sequential(
        nn.Linear(latent_dim, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, input_dim),
        nn.Sigmoid(),
    )
    ######### Your code ends here #########

  def forward(self, x):
    """Encode `x` and return its flattened reconstruction of shape (B, input_dim)."""
    latent = self.encoder(x)
    reconstruction = self.decoder(latent)
    return reconstruction

# Section 2: Variational Autoencoder

In [None]:
class VAE(nn.Module):
  """Fully connected variational autoencoder.

  The encoder produces hidden features from which two linear heads predict the
  mean and log-variance of the latent distribution. A latent sample drawn with
  the reparameterization trick is decoded to `input_dim` values in [0, 1].

  Args:
    input_dim: Number of features per sample after flattening.
    hidden_dim: Width of the hidden fully connected layers.
    latent_dim: Dimensionality of the latent space.
    device: Device on which `sample` creates the latent noise.
  """

  def __init__(self, input_dim: int = 784, hidden_dim: int = 256, latent_dim: int = 2, device: str = "cuda"):
    super(VAE, self).__init__()
    self.latent_dim = latent_dim
    self.device = device

    # The sizes may arrive as NumPy integers (e.g. from np.prod); use plain ints.
    input_dim = int(input_dim)
    hidden_dim = int(hidden_dim)
    latent_dim = int(latent_dim)

    ######### Your code starts here #########
    # Encoder architecture:
    # 2 fully connected layers with ReLU activations

    # Decoder architecture:
    # 3 fully connected layers with ReLU activations

    # Separate linear layers to predict the latent mean and logvar

    # Choose the correct final layer output activation

    # nn.Flatten lets forward() accept image batches of shape (B, C, H, W).
    self.encoder = nn.Sequential(
        nn.Flatten(),
        nn.Linear(input_dim, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, hidden_dim),
        nn.ReLU(),
    )

    # Sigmoid keeps the reconstruction in [0, 1].
    self.decoder = nn.Sequential(
        nn.Linear(latent_dim, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, input_dim),
        nn.Sigmoid(),
    )

    self.mean = nn.Linear(hidden_dim, latent_dim)

    self.log_var = nn.Linear(hidden_dim, latent_dim)
    ######### Your code ends here #########

  def reparameterize(self, mu, log_var):
    """Return mu + eps * std with eps ~ N(0, I) and std = exp(0.5 * log_var)."""
    std = torch.exp(0.5 * log_var)
    eps = torch.randn_like(std)
    return eps.mul(std).add_(mu)

  def sample(self, n):
    """Decode `n` latent vectors drawn from a standard normal distribution."""
    # Create the noise directly on the target device instead of copying it over.
    sample = torch.randn(n, self.latent_dim, device=self.device)
    return self.decoder(sample)

  def forward(self, x):
    """Return (reconstruction, mu, log_var) for the input batch `x`."""
    ######### Your code starts here #########
    hidden = self.encoder(x)
    mu = self.mean(hidden)
    log_var = self.log_var(hidden)
    z = self.reparameterize(mu, log_var)
    reconstruction = self.decoder(z)
    ######### Your code ends here #########
    return reconstruction, mu, log_var

In [7]:
# Define training variables, feel free to modify these for the problem
log_interval = 100  # print the running loss every `log_interval` batches
num_epochs = 10
image, cls = train_dataset[0]
# np.product was removed in NumPy 2.0; np.prod is the supported spelling.
# int() converts the NumPy scalar to a plain Python int for the layer sizes.
input_dim = int(np.prod(image.shape))
hidden_dim = 256
latent_dim = 2
# Read the batch size from the loader so the two values cannot drift apart.
batch_size = train_loader.batch_size
num_examples = len(train_dataset)
device = "cuda" if torch.cuda.is_available() else "cpu"

def train_single_epoch(
    model_cls: str,
    model: nn.Module,
    loss_fn: Callable,
    data_loader: DataLoader,
    optimizer,
    epoch: int
):
  """Train `model` for one pass over `data_loader` and plot sample outputs.

  Reads the notebook-level globals `device` and `log_interval`.

  Args:
    model_cls: One of "ae", "vae" or "cvae". Selects how the model is called
      and which arguments are forwarded to `loss_fn`.
    model: Network to optimise. For "vae" and "cvae" it must also provide
      `sample(n=...)`, which is used for the plot.
    loss_fn: Called as `loss_fn(recon, data)` for "ae" and as
      `loss_fn(recon, data, mu, log_var)` otherwise. The logged values are
      divided by the number of samples, so a loss summed over the batch
      gives per-sample numbers.
    data_loader: Yields `(data, class_label)` batches.
    optimizer: Optimizer updating the parameters of `model`.
    epoch: Epoch number, used only in the printed messages.

  Raises:
    NotImplementedError: If `model_cls` is not a supported model type.
  """
  # Fail before touching the model or the optimizer on an unknown model type.
  if model_cls not in ("ae", "vae", "cvae"):
    raise NotImplementedError("Unsupported model_cls: {!r}".format(model_cls))

  # set model to training mode
  model.train()
  train_loss = 0
  num_seen = 0
  data = recon_batch = None
  for batch_idx, (data, class_label) in enumerate(data_loader):
    data = data.to(device)
    class_label = class_label.to(device)

    optimizer.zero_grad()

    if model_cls == "ae":
      recon_batch = model(data)
      # compute loss
      loss = loss_fn(recon_batch, data)
    elif model_cls == "vae":
      recon_batch, mu, log_var = model(data)
      # compute loss
      loss = loss_fn(recon_batch, data, mu, log_var)
    else:  # "cvae": the model is additionally conditioned on the class label
      recon_batch, mu, log_var = model(data, class_label)
      # compute loss
      loss = loss_fn(recon_batch, data, mu, log_var)

    loss.backward()
    train_loss += loss.item()
    num_seen += len(data)
    optimizer.step()

    if batch_idx % log_interval == 0:
      print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
            epoch, batch_idx * len(data), len(data_loader.dataset),
            100. * batch_idx / len(data_loader), loss.item() / len(data)))

  # An empty loader leaves nothing to average or plot.
  if recon_batch is None:
    print('====> Epoch: {} no batches to train on'.format(epoch))
    return

  # Average over the samples actually processed in this epoch.
  print('====> Epoch: {} Average loss: {:.4f}'.format(epoch, train_loss / num_seen))

  # Plot reconstructions of the last batch. Shapes are taken from the batch
  # itself, so a smaller final batch or a different image size still works.
  n = min(len(data), 8)
  with torch.no_grad():
    imgs = data[:n]
    reconstructed_imgs = recon_batch.reshape(data.shape)[:n]

    comparisons = torch.cat([imgs, reconstructed_imgs])

    # Decode random latent samples (the cvae uses the default class label of `sample`).
    if model_cls in ["vae", "cvae"]:
      latent_samples = model.sample(n=n)
      latent_samples = latent_samples.reshape(n, *data.shape[1:])
      comparisons = torch.cat([comparisons, latent_samples])

    # One grid row each for inputs, reconstructions and (optionally) samples.
    comparisons = torchvision.utils.make_grid(comparisons, nrow=n)
  comparisons = comparisons.detach().cpu().numpy()

  print("Reconstructions: ")
  plt.imshow(comparisons.transpose(1,2,0))
  plt.axis('off')
  plt.show()

# Section 3: Train Autoencoder

In [None]:
# build model
ae = Autoencoder(
  input_dim=input_dim,
  hidden_dim=hidden_dim,
  device=device
)

# put model on the device selected in the configuration cell
ae.to(device)

# define optimizer
optimizer = optim.Adam(ae.parameters())

######### Your code starts here #########
# Define the loss function for a vanilla Autoencoder.
def ae_loss_fn(recon_x, x):
  """Squared reconstruction error summed over the batch.

  `x` is reshaped to the shape of `recon_x` so image batches can be compared
  with flattened reconstructions. The sum (not the mean) is returned because
  train_single_epoch divides the loss by the number of samples itself.
  """
  return F.mse_loss(recon_x, x.reshape(recon_x.shape), reduction="sum")

loss_fn = ae_loss_fn
######### Your code ends here #########

# train (epochs are numbered 1..num_epochs, inclusive)
for epoch in range(1, num_epochs + 1):
  train_single_epoch(
    model_cls="ae",
    model=ae,
    loss_fn=loss_fn,
    data_loader=train_loader,
    optimizer=optimizer,
    epoch=epoch
  )

# Section 4: Train Variational Autoencoder

In [None]:
# build model
vae = VAE(
  input_dim=input_dim,
  hidden_dim=hidden_dim,
  latent_dim=latent_dim,
  device=device
)

# put model on the device selected in the configuration cell
vae.to(device)

# define optimizer
vae_optimizer = optim.Adam(vae.parameters())

######### Your code starts here #########
# Define the loss function for a Variational Autoencoder.
def vae_loss_fn(recon_x, x, mu, log_var):
  """Negative ELBO summed over the batch: reconstruction term plus KL term.

  The reconstruction term is a binary cross-entropy, which requires
  `recon_x` to lie in [0, 1] (e.g. a sigmoid output layer). The KL term is the
  closed-form divergence between N(mu, exp(log_var)) and N(0, I).
  """
  recon_loss = F.binary_cross_entropy(recon_x, x.reshape(recon_x.shape), reduction="sum")
  kl_div = -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())
  return recon_loss + kl_div
######### Your code ends here #########

# train (epochs are numbered 1..num_epochs, inclusive)
for epoch in range(1, num_epochs + 1):
  train_single_epoch(
    model_cls="vae",
    model=vae,
    loss_fn=vae_loss_fn,
    data_loader=train_loader,
    optimizer=vae_optimizer,
    epoch=epoch
  )

# Bonus task: Conditional VAE

In [None]:
class ConditionalVAE(nn.Module):
  """Variational autoencoder conditioned on a class label.

  The label is turned into an embedding vector that is concatenated to the
  flattened input before encoding and to the latent sample before decoding.

  Args:
    input_dim: Number of features per sample after flattening.
    hidden_dim: Width of the hidden fully connected layers.
    latent_dim: Dimensionality of the latent space.
    embedding_dim: Size of the label embedding vector.
    num_classes: Number of distinct class labels.
    device: Device on which `sample` creates its tensors.
  """

  def __init__(
      self,
      input_dim: int,
      hidden_dim: int = 256,
      latent_dim: int= 2,
      embedding_dim: int = 64,
      num_classes: int = 10,
      device: str = "cuda"
  ):
    super(ConditionalVAE, self).__init__()

    self.latent_dim = latent_dim
    self.device = device

    # The sizes may arrive as NumPy integers (e.g. from np.prod); use plain ints.
    input_dim = int(input_dim)
    hidden_dim = int(hidden_dim)
    latent_dim = int(latent_dim)
    embedding_dim = int(embedding_dim)

    ######### Your code starts here #########
    # Encoder architecture:
    # 2 fully connected layers with ReLU activations

    # Decoder architecture:
    # 3 fully connected layers with ReLU activations

    # Separate linear layers to predict the latent mean and logvar

    # Use an embedding layer to encode the class label.
    # See https://pytorch.org/docs/stable/generated/torch.nn.Embedding.html.

    # Choose the correct final layer output activation

    self.label_embed = nn.Embedding(num_classes, embedding_dim)

    # encode() concatenates the flattened input with the label embedding.
    self.encoder = nn.Sequential(
        nn.Linear(input_dim + embedding_dim, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, hidden_dim),
        nn.ReLU(),
    )

    # decode() concatenates the latent sample with the label embedding.
    # Sigmoid keeps the reconstruction in [0, 1].
    self.decoder = nn.Sequential(
        nn.Linear(latent_dim + embedding_dim, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, input_dim),
        nn.Sigmoid(),
    )

    self.mean = nn.Linear(hidden_dim, latent_dim)

    self.log_var = nn.Linear(hidden_dim, latent_dim)
    ######### Your code ends here #########

  def reparameterize(self, mu, log_var):
    """Return mu + eps * std with eps ~ N(0, I) and std = exp(0.5 * log_var)."""
    std = torch.exp(0.5 * log_var)
    eps = torch.randn_like(std)
    return eps.mul(std).add_(mu)  # return z sample

  def sample(self, n, class_label=0):
    """Decode `n` standard-normal latent vectors, all conditioned on `class_label`."""
    # Create noise and labels directly on the target device.
    sample = torch.randn(n, self.latent_dim, device=self.device)
    class_label = torch.full((n,), int(class_label), dtype=torch.long, device=self.device)
    return self.decode(sample, class_label)

  def decode(self, x, class_label):
    """Decode the latent vectors `x` conditioned on `class_label`."""
    # embed the class label
    label_embed = self.label_embed(class_label)
    decoder_input = torch.cat([x, label_embed], dim=-1)
    reconstruction = self.decoder(decoder_input)
    return reconstruction

  def encode(self, x, class_label):
    """Return (mu, log_var) of the latent distribution for `x` given `class_label`."""
    # embed the class label
    label_embed = self.label_embed(class_label)

    # flatten each sample before appending the label embedding
    encoder_input = torch.cat([x.view(x.shape[0], -1), label_embed], dim=-1)
    encode_output = self.encoder(encoder_input)

    mu = self.mean(encode_output)
    log_var = self.log_var(encode_output)
    return mu, log_var

  def forward(self, x, class_label):
    """Return (reconstruction, mu, log_var) for the batch `x` and its labels."""
    mu, log_var = self.encode(x, class_label)
    z = self.reparameterize(mu, log_var)
    return self.decode(z, class_label), mu, log_var

# Bonus task: Train CVAE

In [None]:
# build model
cvae = ConditionalVAE(
  input_dim=input_dim,
  hidden_dim=hidden_dim,
  latent_dim=latent_dim,
  num_classes=10,
  device=device
)

# put model on the device selected in the configuration cell
cvae.to(device)

# define optimizer
cvae_optimizer = optim.Adam(cvae.parameters())

# train (epochs are numbered 1..num_epochs, inclusive);
# the loss is the `vae_loss_fn` defined in the VAE training cell
for epoch in range(1, num_epochs + 1):
  train_single_epoch(
    model_cls="cvae",
    model=cvae,
    loss_fn=vae_loss_fn,
    data_loader=train_loader,
    optimizer=cvae_optimizer,
    epoch=epoch
  )