In [None]:
import numpy as np
import torch
from torch import nn
import matplotlib.pyplot as plt

# Autoencoder Implementation

In [None]:
class Autoencoder(nn.Module):
  def __init__(self, input_size, latent_size, p_drop = 0.2):
    super(Autoencoder, self).__init__()
    
    self.drop = nn.Dropout(p = p_drop)

    #Define Encoder Layers
    self.fc_e1 = nn.Linear(in_features = input_size, out_features = input_size//8)
    self.fc_e2 = nn.Linear(in_features = input_size//8, out_features = latent_size)

    #Define Decoder Layers
    self.fc_d1 = nn.Linear(in_features = latent_size, out_features = input_size//4)
    self.fc_d2 = nn.Linear(in_features = input_size//4, out_features = input_size)

  def forward(self, input):
    enc = self.drop(nn.Tanh()(self.fc_e1(input)))
    latent = self.fc_e2(enc)
    dec = self.drop(nn.Tanh()(self.fc_d1(latent)))
    output = self.fc_d2(dec)
    return output, latent

In [None]:
X = torch.rand(64, 256)
ae = Autoencoder(input_size= 256, latent_size = 16)

# Example: MNIST AutoEncoder

In [None]:
import torchvision
from math import floor, ceil
batch_size_train = 64 #Define train batch size
batch_size_test  = 256 #Define test batch size (can be larger than train batch size)


# Use the following code to load and normalize the dataset
train_set = torchvision.datasets.MNIST('/files/', train=True, download=True,
                              transform=torchvision.transforms.Compose([
                                torchvision.transforms.ToTensor(),
                                torchvision.transforms.Normalize(
                                  (0.1307,), (0.3081,))
                              ]))


test_set = torchvision.datasets.MNIST('/files/', train=False, download=True,
                             transform=torchvision.transforms.Compose([
                               torchvision.transforms.ToTensor(),
                               torchvision.transforms.Normalize(
                                 (0.1307,), (0.3081,))
                             ]))

In [None]:
train_set[0][0].shape

In [None]:
train_set, val_set = torch.utils.data.random_split(train_set, [floor(len(train_set)*0.8), ceil(len(train_set)*0.2)])
train_dataloader = torch.utils.data.DataLoader(train_set, batch_size = batch_size_train, shuffle = True)
val_dataloader  = torch.utils.data.DataLoader(val_set, batch_size = batch_size_test, shuffle = False)
test_dataloader  = torch.utils.data.DataLoader(test_set, batch_size = batch_size_test, shuffle = False)


In [None]:
max_epochs = 30
learning_rate = 0.001
input_size = 28*28
latent_size = 32
model = Autoencoder(input_size, latent_size)
optimizer = torch.optim.Adam(model.parameters(), lr = learning_rate)
criterion = torch.nn.MSELoss()

In [None]:
from tqdm import tqdm
train_losses = []
val_losses = []
for epoch in tqdm(range(max_epochs)):
  model.train()
  for i, sample in enumerate(train_dataloader):
    optimizer.zero_grad()
    data, label = sample
    data_in = data.view(data.shape[0], input_size)
    output = model(data_in)[0]
    loss = criterion(data_in, output)
    loss.backward()
    train_losses.append(loss.item())
    optimizer.step()
  model.eval()
  vl = 0
  with torch.no_grad():
    for sample in val_dataloader:
      data, label = sample
      data_in = data.view(data.shape[0], input_size)
      output = model(data_in)[0]
      loss = criterion(data_in, output)
      vl += loss.item()
  val_losses.append(vl)

In [None]:
plt.plot(val_losses)

In [None]:
inds = torch.randint(low = 0, size = (8,), high = len(test_set))
samples = 5
f, ax = plt.subplots(samples, 2, figsize = (3, 8))
for i, idx in enumerate(inds[:samples]):
  ax[i,1].imshow(model(test_set[idx][0].flatten().unsqueeze(0))[0].view(28, 28).detach())
  ax[i,0].imshow(test_set[idx][0][0])
f.tight_layout()

In [None]:
test_dataloader = torch.utils.data.DataLoader(test_set, 
                              batch_size = 10000, shuffle = False)
labels = torch.zeros((0,))
for sample in test_dataloader:
  data = sample[0]
  data_in = data.view(data.shape[0], -1)
  latents = model(data_in)[1]
  labels = torch.hstack([labels, sample[1]])
U, S, V = torch.pca_lowrank(latents)

In [None]:
plt.figure(figsize = (8,6))
for i in range(10):
  mask = labels == i
  Y = (U[mask]@torch.diag(S)).detach()
  plt.scatter(Y[:,0], Y[:,1], label = i, alpha = 0.4, s = 10)
  plt.legend()

In [None]:
full_test = torch.zeros((0, 784))
labels = torch.zeros((0,))
for sample in test_dataloader:
  full_test = torch.vstack([full_test, sample[0].view(-1, 784)])
  labels = torch.hstack([labels, sample[1]])
U, S, V = torch.svd(full_test)

In [None]:
for i in range(10):
  mask = labels.long() == i
  Y = (U[mask]@torch.diag(S)).detach()
  plt.scatter(Y[:,0], Y[:,1], label = i, alpha = 0.4, s = 7)
  plt.legend()

# Assignment: Enhanced Autoencoder

In [None]:
class EnhancedAutoencoder(nn.Module):
  def __init__(self, input_size, latent_size, p_drop = 0.2):
    super(EnhancedAutoencoder, self).__init__()
    
    self.drop = nn.Dropout(p = p_drop)

    #Define Encoder Layers


    #Define Decoder Layers


  def forward(self, input):
    #Define forward
    return output, latent

In [None]:
# If implementing VAE:
def kld_loss(mus, vars):
  # Define KLD loss with standard normal prior
  return loss

## Sample VAE Implementation (Change this if using for for Assignment)

In [None]:
class ExampleVAE(nn.Module):
  def __init__(self, input_size, latent_size, p_drop = 0.2):
    super(ExampleVAE, self).__init__()
    
    self.drop = nn.Dropout(p = p_drop)

    #Define Encoder Layers
    self.fc_e1 = nn.Linear(in_features = input_size, out_features = input_size//8)
    self.fc_e2_1 = nn.Linear(in_features = input_size//8, out_features = latent_size)
    self.fc_e2_2 = nn.Linear(in_features = input_size//8, out_features = latent_size)

    #Define Decoder Layers
    self.fc_d1 = nn.Linear(in_features = latent_size, out_features = input_size//4)
    self.fc_d2 = nn.Linear(in_features = input_size//4, out_features = input_size)

  def forward(self, input):
    enc = self.drop(nn.Tanh()(self.fc_e1(input)))
    mus = self.fc_e2_1(enc) #means of latent Gaussian
    var =self.fc_e2_2(enc)  #(log) var of latent Gaussian
    sample = self.reparameterize(mus, var)

    dec = self.drop(nn.Tanh()(self.fc_d1(sample)))
    output = self.fc_d2(dec)
    return output, (mus, var)

  def reparameterize(self, mu, logvar):
    '''
    Used to generate sample from latent Gaussian
    '''
    std = torch.exp(0.5 * logvar)
    eps = torch.randn_like(std)
    return eps * std + mu