# Guided Final Project



For your final project, you will be implementing a variational autoencoder (VAE) and applying it to neural data.



### Copying this Colab Notebook to your Google Drive

Since the course staff is the author of this notebook, you cannot make any lasting changes to it. You should make a copy of it to your Google Drive.

Alternatively, you may download a copy and work locally.

### Compute

You should not need to use the GPU for this problem. If you do choose to accelerate your training runs with the GPU, keep in mind that Google Colab will limit your GPU usage. You can change your runtime by going to **Runtime -> Change runtime type**.

If Google Colab restricts your access to a free GPU, you will regain access to the GPU after an indeterminate amount of time (anecdotally, anywhere from a few hours to a day). In the meantime, you can switch your runtime back to CPU.

### Code: Setup

We start by importing some packages that we'll need to complete this problem and some basic scaffolding.

In [45]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.distributions as dist

import numpy as np
import matplotlib.pyplot as plt

In [2]:
def get_device():
    """
    Helper function to set the device. CUDA if available, else MPS if available (Apple Silicon,
    if you are working locally on a newer Apple device), CPU otherwise.

    Args:
        None.

    Returns:
        Device string ("cuda", "mps" or "cpu").
    """
    if torch.backends.cuda.is_built() and torch.cuda.is_available():
        device = "cuda"
        print("CUDA GPU enabled.")
    elif torch.backends.mps.is_built() and torch.backends.mps.is_available():
        device = "mps"
        print("Apple Silicon GPU enabled.")
    else:
        device = "cpu"
        print("No GPU found. Running on CPU.")

    return device

We will be using the neural data from HW2, Question 4, where we consider the delayed reach task that we've discussed in depth now. To load the data, first download the ```data.zip``` file from Canvas (available under the "Final Project" module) and place into the "Files" tab on the left hand side of this Colab notebook. Then, run the following cell to load in the data.

*Note: Once your Colab runtime has disconnected (when you close out of the notebook or change your runtime type), you will have to reload the ```data.zip``` file into your new runtime. To avoid this, you can save the ```data.zip``` file to your Google Drive and mount your Google Drive to your Colab notebook, using the following two lines:*
```
from google.colab import drive
drive.mount('/content/drive')
```
*If you choose to mount your Google Drive, update the following cells and data paths to make sure you are loading in the correct files.*

In [3]:
train_data_loaded = np.load("data/ps2a_train.npy").astype("float32")
test_data_loaded = np.load("data/ps2a_test.npy").astype("float32")
num_class, num_train_trials, num_features = train_data_loaded.shape
_, num_test_trials, _ = test_data_loaded.shape

# reshape raw data into the format that we want
train_data = train_data_loaded.reshape((-1, num_features))
test_data = test_data_loaded.reshape((-1, num_features))
# include labels for each class
train_labels = np.repeat(np.arange(8), num_train_trials)
test_labels = np.repeat(np.arange(8), num_test_trials)

num_observations, _ = train_data.shape



Now, we define a simple dataset class to format the data to utilize the PyTorch library:

In [4]:
class SimpleDataset(Dataset):
    def __init__(self, data_bxd, labels, device='cpu'):
        super().__init__()
        self.device = device
        self.data_bxd = data_bxd
        self.labels = labels

    def __len__(self):
        return self.data_bxd.shape[0]

    def __getitem__(self, idx):
        return torch.tensor(self.data_bxd[idx], device=self.device), torch.tensor(self.labels[idx])

## 1. Building a VAE

Now that you have your notebook and data set up, complete the following cells to implement your VAE.

### 1a: The Encoder/Decoder Model

First, implement a basic multilayer perceptron model that can be used for the encoder and decoder of your VAE.

You can implement any basic MLP, but as a starting point, we recommend one with two linear layers with a ReLU activation function applied between the two layers.

In [39]:
class MLP(nn.Module):
    def __init__(self,
        in_size,
        out_size,
        hidden_size,
        act=nn.ReLU,
        device='cpu'):
        """
        Multi-layer Perceptron.

        Args:
            in_size (positive integer): dimensionality of the MLP inputs
            out_size (positive integer): dimensionality of the MLP outputs
            hidden_size (list): list of sizes for each hidden layer.
                The length of this list corresponds to the number of hidden
                layers in the MLP.
            act (PyTorch function): activation function.
            device (string): 'cpu', 'mps', or 'cuda', indicating the hardware
                device on which to place all associated parameters and
                variables.

        Returns:
            None.
        """
        super().__init__()

        net = []

        prev_dim = torch.tensor(in_size)

        for hidden in hidden_size:
            layer = nn.Linear(prev_dim, hidden)
            net.append(layer)
            net.append(act())
            prev_dim = hidden
        
        layer = nn.Linear(prev_dim, out_size)

        net.append(layer)
        self.net = nn.Sequential(*net)

    def forward(self, x):
        return self.net(x)

    def predict(self, x):
        output = self.forward(x)
        return torch.argmax(output, dim = 1)


### 1b: Implementing a VAE class

Here, we provide the basic scaffolding for a VAE model. To complete the implementation, fill in each section with ```"""YOUR CODE HERE"""```.

In [40]:
class VAE(nn.Module):
    def __init__(self, in_size,
        enc_size=[32,],
        hidden_size=4,
        dec_size=[32,],
        act=nn.ReLU,
        kl_beta=1.0,
        prior_mu=0.0,
        prior_sigma=1.0,
        device='cpu'):
        """
        Variational Autoencoder.

        Args:
            in_size (positive integer): dimensionality of the input data. For an
                autoencoder, this also corresponds to the dimensionality of the
                outputs.
            enc_size (list): list of sizes for each hidden layer of the encoder
                MLP.

            hidden_size (positive integer): dimensionlity of the VAE's latent
                variables.

            dec_size (list): list of sizes for each hidden layer of the decoder
                MLP.
            act (Pytorch function): activation function.
            device (string): 'cpu', 'mps', or 'cuda', indicating the hardware
                device on which to place all associated parameters and
                variables.

        Returns:
            None.
        """
        super().__init__()

        self.in_size = in_size
        self.enc_size = enc_size
        self.hidden_size = hidden_size
        self.dec_size = dec_size
        self.act = act
        self.kl_beta = kl_beta
        self.device = device

        # ----- YOUR CODE HERE ----- #
        
        self.prior = dist.Normal(loc = prior_mu, scale= prior_sigma)

        # ----- YOUR CODE HERE ----- #
        self.encoder = MLP(in_size= in_size, out_size= hidden_size * 2, hidden_size= enc_size, act = act, device = device)


        # ----- YOUR CODE HERE ----- #
        self.decoder = MLP(in_size= in_size, out_size= hidden_size * 2, hidden_size= dec_size, act = act, device = device)

    # input dimension to latent dimension- encoder
    # decoder = gets out the output size 

    def encode(self, data_bxd):
        """
        Encodes the input data into the latent space.

        Parameters:
            data_bxd (torch.Tensor): Input data to be encoded. Should be of shape
                (batch_size, num_features).

        Returns:
            tuple: A tuple (mu_bxd, sigma_bxd) containing the means and standard
                deviations of the approximate posterior distribution over the
                latent space. Both tensors should be of shape
                (batch_size, hidden_size).
        """
        # ----- YOUR CODE HERE ----- #
        
        params = self.encoder(data_bxd)
        dist_size = 2
        mu, sigma = torch.split(params, dist_size, dim = 1)
        log_sigma = sigma.exp()

        return mu, log_sigma
    
        # return NotImplemented

    def decode(self, latents_bxd):
        """
        Decodes the latent variables into the output space.

        Parameters:
            latents_bxd (torch.Tensor): Latent variables to be decoded. Should be of shape
                (batch_size, hidden_size).

        Returns:
            tuple: A tuple containing the means and standard deviations of the
                decoded distribution over the output space. Both tensors should be of shape
                (batch_size, num_features).
        """
        # ----- YOUR CODE HERE ----- #

        params = self.decoder(latents_bxd)
        dist_size = 2
        mu, sigma = torch.split(params, dist_size, dim = 1)
        log_sigma = sigma.exp()

        return mu, log_sigma

    def forward(self, data_bxd):
        """
        Performs a forward pass through the VAE model.

        Parameters:
            data_bxd (torch.Tensor): Input data. Should be of shape (batch_size, num_features).

        Returns:
            dict: A dictionary containing the output tensors for the mean and
                standard deviation of the approximate posterior distribution, mean and
                standard deviation of the decoded distribution, log likelihood, KL divergence,
                and the total loss.
        """
        # ----- YOUR CODE HERE ----- #

        # Generate parameters of the approximate posterior q(z|x) using the encode() function
        # Build approximate posterior distribution from parameters (hint: look at torch.distributions)
        # Randomly sample from approximate posterior

        mu_bxd, logvar_bxd = self.encode(data_bxd)

        std_bxd = torch.exp(0.5 * logvar_bxd)

        q_z = dist.Independent(dist.Normal(mu_bxd, std_bxd), 1)

        z_bxd = q_z.rsample()

        # Decode samples
        recon_mu_bxd, recon_logvar_bxd = self.decode(z_bxd)

        recon_std_bxd = torch.exp(0.5 * recon_logvar_bxd)

        log_likelihood = self.log_like(recon_mu_bxd, recon_std_bxd, data_bxd)
        
        kl_div = self.kl_divergence(q_z)

        # Compute loss
        total_loss = -torch.mean(log_likelihood - self.kl_beta * kl_div)

        return {
            'mu': mu_bxd,
            'logvar': logvar_bxd,
            'recon_mu': recon_mu_bxd,
            'recon_logvar': recon_logvar_bxd,
            'log_likelihood': log_likelihood,
            'kl_divergence': kl_div,
            'loss': total_loss
        }

    def log_like(self, mu_bxd, sigma_bxd, data_bxd):

      """
      Computes the log likelihood of the input data given the latent variables.

      Parameters:
          mu_bxd (torch.Tensor): Mean of the decoded distribution for each data point. Should be of shape
              (batch_size, num_features).
          sigma_bxd (torch.Tensor): Standard deviation of the decoded distribution for each data point.
              Should be of shape (batch_size, num_features).
          data_bxd (torch.Tensor): Input data points. Should be of shape (batch_size, num_features).

      Returns:
          torch.Tensor: The log likelihood of each data point. Should be of shape (batch_size,1).
      """

      p = dist.Independent(dist.Normal(loc = mu_bxd, scale = sigma_bxd), 1)
      log_likelihood = p.log_prob(data_bxd).sum(dim = 1).mean()

      return log_likelihood

    def kl_divergence(self, approx_posterior):

      """
      Computes the KL divergence between the approximate posterior distribution
      (given by 'approx_posterior') and the prior distribution defined during
      VAE initialization.

      Parameters:
          approx_posterior (torch.distributions.Distribution): The approximate
              posterior distribution q(z|x), typically obtained from the encoder
              of the VAE.

      Returns:
          torch.Tensor: KL divergence for each sample in the batch.
      """

      prior = self.prior    
      kl_div = dist.kl_divergence(approx_posterior, prior).sum(dim = 1).mean()

      return kl_div

### 1c: Implement the Main Training Loop for your VAE

Implement a function which iterates through a dataset and trains the model over a specified number of epochs. Clip the gradient norm to value of 1, like was done for HW4.

In [41]:
def train(model, dataloader, optimizer,
          num_epochs=100,
          disp_every=10,
          max_norm=1.,
          ):
    """
    Trains the Variational Autoencoder (VAE) model.
    
# Poisson log liklihood 

    Parameters:
        model (nn.Module): The VAE model to be trained.
        dataloader (DataLoader): DataLoader object containing the training data.
        optimizer (torch.optim.Optimizer): Optimizer to be used for training.
        num_epochs (int): Number of training epochs (default is 1).
        disp_every (int): Interval for displaying training progress (default is 100).
        max_norm (float): Maximum norm value for gradient clipping (default is 1.0).

    Returns:
        dict: A dictionary containing training metrics including losses, log likelihood (LL),
            KL divergence (KL), and gradient norms.
            # Per Epoch (should be a list)
    """
    loss_fn = nn.MSELoss()
    losses = []
    log_likelihood = []
    KL = []
    grad_norm = []

    for epoch in range(num_epochs):
        sum_losses = 0
        sum_grad_mags = 0
        sum_KL = 0
        sum_LL = 0 

        for batch in enumerate(dataloader):
            input, target = batch
            optimizer.zero_grad()
            output, _ = model(input)
            loss = loss_fn(output, target)
            sum_losses += loss
            loss.backward()

            nn.utils.clip_grad_norm_(model.parameters(), max_norm)

            sum_grad_mags += np.mean([p.grad.norm() for p in model.parameters()])
            optimizer.step()

        loss_fn.zero_grad()
        avg_loss = (sum_losses / len(dataloader)).item()
        losses.append(avg_loss)

        avg_grad = (sum_grad_mags / len(dataloader)).item()
        grad_norm.append(avg_grad)

        if epoch % disp_every == 0:
            print('Epoch: ', epoch)
            print('Average Loss: ', avg_loss)
            print('Average Gradient Norm: ', avg_grad)
            # print('Log Likelihood: ', avg_ll)
            # print('KL Divergence: ', avg_kl)

### 1d: Putting it all together

First, create an instance of a VAE using your class. Then create an optimizer (use Adam) and pass the parameters of the model as an argument. Finally, create a dataset using the provided dataset class and data. Wrap this dataset in a `DataLoader`, which will handle batching of the data.

The specific architecture details of the VAE are up to you. A good starting point would be encoder and decoder sizes (```enc_size```, ```dec_size```) of [32,], a hidden size of 4, batch size of 32, and learning rate of 1e-3. *Note: You should **not** modify ```kl_beta``` = 1.0.*

In [42]:
# ----- YOUR CODE HERE ----- #

model = VAE(in_size = 32)

optimizer = optim.Adam(model.parameters(), lr = 1e-3)

loss_fn = nn.MSELoss()

dataset = SimpleDataset(train_data, train_labels)
loader = DataLoader(dataset, batch_size = 32)

### 1e: Train the VAE

Train your model using your training loop. We suggest starting with ```num_epochs```=100.

In [43]:
# ----- YOUR CODE HERE ----- #
train(model= model, dataloader= loader, optimizer= optimizer)



TypeError: linear(): argument 'input' (position 1) must be Tensor, not int

### 1f: Generate Plots

Plot the loss, log likelihood, KL divergence, and gradient norm throughout training. Create four plots with "Epoch" on the x-axis and  "Loss", "LL", "KL", and "Gradient Norm" on the y-axes, respectively. For the loss, use a log scale for the y-axis.

In [None]:
# ----- YOUR CODE HERE ----- #


### 1g: Visualizing VAE encodings

Visualize the encoded datapoints. Use the trained VAE's ```encode()``` function to get the means of the approximate posteriors.  If your VAE has a latent dimensionality (```hidden_size```) greater than 2 (we recommended 4 above), apply PCA to further reduce the dimensionality down to 2. Visualize these 2-dimensional embeddings on a scatter plot. Color each dot appropriately according to reaching angle (there should be a total of 728 dots).

In [None]:
# ----- YOUR CODE HERE ----- #


## 2. Building a Simple Classifier

Now, we would like to build a simple classifier to compare how it performs when trained on the raw data versus when trained on the low-dimensional embeddings encoded by your trained VAE. While it's typical for VAE model encoders to output both the mean and log standard deviation of the approximate posterior distribution, it is common to use the mean only as the encoded features for downstream tasks. We will only use the mean features to train the simple classifier.

Define a SimpleClassifier class (PyTorch ```nn.Module```) that is a (relatively) small neural network. One starting point would be two linear layers with a ReLU activation applied between the layers. Then, complete the training loop and evaluation code.

In [24]:
# ----- YOUR CODE HERE ----- #

class SimpleClassifier(nn.Module):
    def __init__(self,
        in_size,
        out_size,
        hidden_size,
        act=nn.ReLU,
        device='cpu'):
        
        super().__init__()

        net = []

        prev_dim = in_size
        for hidden in hidden_size:
            layer = nn.Linear(prev_dim, hidden)
            net.append(layer)
            net.append(act())
            prev_dim = hidden
        
        layer = nn.Linear(prev_dim, out_size)
        net.append(layer)
        self.net = nn.Sequential(net)

    def forward(self, x):
        return self.net(x)

    def predict(self, x):
        output = self.forward(x)
        return torch.argmax(output, dim = 1)

Implement a training loop for the classifier.

In [None]:
def train_classifier(model, dataloader, criterion, optimizer,
          num_epochs=100,
          disp_every=10):
    """
    Trains the classifier model.

    Parameters:
        model (nn.Module): The classifier model to be trained.
        dataloader (DataLoader): DataLoader object containing the training data.
        criterion (torch.nn.modules.loss._Loss): Loss function to be used.
        optimizer (torch.optim.Optimizer): Optimizer to be used for training.
        num_epochs (int): Number of training epochs (default is 1).
        disp_every (int): Interval for displaying training progress (default is 100).

    Returns:
        list: A list containing the training losses for each epoch.
    """
    losses = []
    # ----- YOUR CODE HERE ----- #


    return losses

In [None]:
def evaluate_classifier(model, dataloader):
    """
    Evaluates the classifier model on the test data.

    Parameters:
        model (nn.Module): The trained classifier model to be evaluated.
        dataloader (DataLoader): DataLoader object containing the test data.
    """
    # ----- YOUR CODE HERE ----- #


## 3. Comparing Training Data

### 3a: Dataset preparation

First, we will need to set up the datasets and DataLoaders for the comparison between the raw data and encoded embeddings. You can reuse the provided ```SimpleDataset``` class for PyTorch Datasets.

To get the encoded embeddings, use the ```encode()``` function of your trained VAE. As a reminder, we will be using the mean only as the encoded features.

In [None]:
# Raw data datasets and DataLoaders
# ----- YOUR CODE HERE ----- #


In [None]:
# Encoded data datasets and DataLoaders
# ----- YOUR CODE HERE ----- #


### 3b: Train a SimpleClassifier model using the raw data

Using the defined model from (2) and the datasets/DataLoaders created above, train an instance of the simple classifier using the raw data.

We recommend using cross entropy loss and the Adam optimizer, training for 1000 epochs. However, you may change any of the hyperparameters.

In [None]:
# ----- YOUR CODE HERE ----- #


Plot the training loss curve with epochs on the x-axis and loss on the y-axis.


In [None]:
# ----- YOUR CODE HERE ----- #


### 3c: Train a SimpleClassifier model using the encoded data

Uisng the defined model from (2) and the datasets/DataLoaders created above, train an instance of the simple classifier using the encoded data.

We recommend using cross entropy loss and the Adam optimizer, training for 100 epochs. However, you may change any of the hyperparameters.


In [None]:
# ----- YOUR CODE HERE ----- #


Plot the training loss curve with epochs on the x-axis and loss on the y-axis.

In [None]:
# ----- YOUR CODE HERE ----- #


### 3d: Evaluate and compare classifiers

Using your defined evaluation code, evaluate the two trained classifiers.

1. How many parameters do each of your trained models include?
2. Which of the models perform better? Why might this be the case?



In [None]:
# ----- YOUR CODE HERE ----- #


Your answers to the written questions here:

1.

2.

## 4. Compare with a classifer from HW2

Now, try applying one of the classifiers you implemented from HW2, Question 4, to the encoded data.

1. How does training the classifier on top of the VAE compare to training it on just the raw data?

You may choose any of the classifier(s) from HW2, and may change the VAE hyperparameters above. Please list out any important hyperparameters or training details when comparing the classifier trained on top of the VAE to the classifier trained on just the raw data.

In [None]:
# ----- YOUR CODE HERE ----- #


Your answers to the written question here:

1.