<a href="https://colab.research.google.com/github/prasadshreyas/CS7290-Causal-ML-PyTorch/blob/main/TMNIST_MNIST_demo_markdown.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Importing the libraries
```python

import torch
import pandas as pd
import numpy as np
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import os
from torchvision.transforms import ToTensor, Lambda

from pyro.contrib.examples.util import MNIST
import torch.nn as nn
import torchvision.transforms as transforms

import pyro
import pyro.distributions as dist
import pyro.contrib.examples.util  
from pyro.infer import SVI, Trace_ELBO
from pyro.optim import Adam
```


```python
# Fail fast unless the installed Pyro is a 1.8.1 release.
assert pyro.__version__.startswith('1.8.1') # make sure we have Pyro 1.8.1
# Turn off Pyro's distribution validation checks.
pyro.distributions.enable_validation(False) 
pyro.set_rng_seed(0) # make results reproducible
# smoke_test is True whenever a `CI` environment variable is set (any value),
# i.e. when the notebook cells are executed on CI.
smoke_test = 'CI' in os.environ 

```

A custom PyTorch `Dataset` for the combined T-MNIST/MNIST data. The CSV file it reads is available [here](https://raw.githubusercontent.com/prasadshreyas/CS7290-Causal-ML-PyTorch/main/data/TMNIST/TMNIST_Data.csv).



```python

class TMNIST_MNIST_Dataset(Dataset):
    '''
    Dataset containing both MNIST and T-MNIST data.

    Input: 'path/to/csv/file' (Required)
    Output: torch.utils.data.Dataset

    csv file should have the following columns:
    - label: 0-9
    - image: flattened image (28x28)
    - is_handwritten: 0 or 1

    Columns are read by position, not by name: column 1 is the label,
    the last column is is_handwritten, and every column in between
    (2 .. -2) is one pixel of the flattened image. Column 0 is not read.

    Each item is a tuple ``(digits, label, is_handwritten)``.
    '''

    def __init__(self, csv_file, root_dir='', transform=None, target_transform = Lambda(lambda y: torch.zeros(
    10, dtype=torch.float).scatter_(dim=0, index=torch.tensor(y), value=1))):
        '''
        Args:
            csv_file (string): Path (or URL) of the csv file with annotations;
                it is handed straight to ``pandas.read_csv``.
            root_dir (string): Stored on the instance; not used by this class.
            transform (callable, optional): Applied to the 28x28 float32
                image array before it is returned.
            target_transform (callable, optional): Stored on the instance.
                The default builds a one-hot vector of size 10.
                NOTE(review): ``__getitem__`` returns the plain integer label
                and does not apply this transform - confirm whether one-hot
                labels are wanted before wiring it in.
        '''
        self.tmnist_frame = pd.read_csv(csv_file)  # whole csv is loaded eagerly
        self.transform = transform
        self.root_dir = root_dir
        self.target_transform = target_transform

    def __len__(self):
        '''Return the number of rows (samples) in the csv file.'''
        return len(self.tmnist_frame)

    def __getitem__(self, idx):
        '''
        Return the sample stored in row ``idx``.

        Returns:
            tuple: ``(digits, label, is_handwritten)`` where ``digits`` is the
            28x28 float32 image divided by 255 (after ``transform``, if one
            was given), ``label`` is an int and ``is_handwritten`` is an int.
        '''
        if torch.is_tensor(idx):
            idx = idx.tolist()  # iloc expects plain Python indices

        # Pixel columns sit between the label column and the trailing flag.
        pixels = self.tmnist_frame.iloc[idx, 2:-1]
        digits = np.asarray(pixels, dtype='float64') / 255  # scale by 1/255
        digits = digits.reshape(28, 28).astype('float32')

        label = int(self.tmnist_frame.iloc[idx, 1])

        # 1 if the digit is handwritten, 0 otherwise (per the class docstring).
        is_handwritten = int(self.tmnist_frame.iloc[idx, -1])

        if self.transform:
            digits = self.transform(digits)

        # Three values per sample, matching the `for x, label, hand in loader`
        # unpacking used by the training and evaluation loops.
        return digits, label, is_handwritten

```

```python

# for loading and batching TMNIST-MNIST dataset
def setup_data_loaders(batch_size=64, use_cuda=False):
    """Build train and test data loaders for the TMNIST-MNIST csv.

    The csv is read from the project's GitHub URL, wrapped in
    ``TMNIST_MNIST_Dataset`` and split 70% / 30% into train and test subsets
    with a fixed generator seed (42), so the split is the same on every call.

    Args:
        batch_size (int): Number of samples per mini-batch for both loaders.
        use_cuda (bool): Passed to the loaders as ``pin_memory``.

    Returns:
        tuple: ``(train_loader, test_loader)``. The train loader shuffles,
        the test loader does not.
    """
    csv_url = "https://raw.githubusercontent.com/prasadshreyas/CS7290-Causal-ML-PyTorch/main/data/TMNIST/TMNIST_Data.csv"
    trans = transforms.ToTensor()

    tmnist_dataset = TMNIST_MNIST_Dataset(csv_url, transform=trans)
    n = len(tmnist_dataset)  # number of samples in the dataset

    # Split the dataset into train and test; the test part takes the
    # remainder so the two lengths always add up to n.
    train_length = int(n * 0.7)
    test_length = n - train_length
    train_set, test_set = torch.utils.data.random_split(
        dataset=tmnist_dataset,
        lengths=[train_length, test_length],
        generator=torch.Generator().manual_seed(42),
    )

    # Create data loaders for train and test
    kwargs = {'num_workers': 1, 'pin_memory': use_cuda}
    train_loader = torch.utils.data.DataLoader(
        dataset=train_set, batch_size=batch_size, shuffle=True, **kwargs)
    test_loader = torch.utils.data.DataLoader(
        dataset=test_set, batch_size=batch_size, shuffle=False, **kwargs)
    return train_loader, test_loader

```

```python
class VAE(nn.Module):
    """Variational auto-encoder with two latent codes, `latent` and `hidden`.

    The Pyro model and guide are methods of this class so they can be handed
    to ``SVI`` as ``vae.model`` and ``vae.guide``.

    NOTE(review): ``Encoder`` and ``Decoder`` are not defined in this section
    of the notebook; they must exist before a ``VAE`` is instantiated.
    ``encoder(x)`` is expected to return a ``(loc, scale)`` pair and
    ``decoder(z)`` a tensor of per-pixel Bernoulli means of width 784 -
    confirm against their definitions.
    """

    # by default our latent space is 50-dimensional
    # and we use 400 hidden units
    def __init__(self, z_dim=50, hidden_dim=400, use_cuda=False):
        super().__init__()
        # create the encoder and decoder networks
        self.encoder = Encoder(z_dim, hidden_dim)
        self.decoder = Decoder(z_dim, hidden_dim)

        if use_cuda:
            # calling cuda() here will put all the parameters of
            # the encoder and decoder networks into gpu memory
            self.cuda()
        self.use_cuda = use_cuda
        self.z_dim = z_dim

    # define the model p(x|z)p(z) and p(x|h)p(h) for handwritten digits
    def model(self, x):
        """Generative model: score the batch `x` under both latent codes.

        Args:
            x: mini-batch of images; flattened to ``(-1, 784)`` for scoring.
        """
        # register PyTorch module `decoder` with Pyro
        pyro.module("decoder", self.decoder)
        with pyro.plate("data", x.shape[0]):
            # setup hyperparameters for prior p(z)
            z_loc = x.new_zeros(torch.Size((x.shape[0], self.z_dim)))
            z_scale = x.new_ones(torch.Size((x.shape[0], self.z_dim)))
            # sample from prior (value will be sampled by guide when computing the ELBO)
            z = pyro.sample("latent", dist.Normal(z_loc, z_scale).to_event(1))

            # setup hyperparameters for prior p(h)
            h_loc = x.new_zeros(torch.Size((x.shape[0], self.z_dim)))
            h_scale = x.new_ones(torch.Size((x.shape[0], self.z_dim)))
            # sample from prior (value will be sampled by guide when computing the ELBO)
            h = pyro.sample("hidden", dist.Normal(h_loc, h_scale).to_event(1))

            flat_x = x.reshape(-1, 784)

            # decode the latent code z and score against actual images
            loc_img_z = self.decoder(z)
            pyro.sample("obs", dist.Bernoulli(loc_img_z).to_event(1), obs=flat_x)

            # decode the latent code h and score against actual images.
            # Pyro requires every sample site in a trace to have a unique
            # name, so this site cannot reuse "obs".
            loc_img_h = self.decoder(h)
            pyro.sample("obs_hidden", dist.Bernoulli(loc_img_h).to_event(1), obs=flat_x)

    # define the guide (i.e. variational distribution) q(z|x) and q(h|x) for handwritten digits
    def guide(self, x):
        """Variational distribution for the `latent` and `hidden` sites.

        Args:
            x: mini-batch of images, passed to the encoder as-is.
        """
        # register PyTorch module `encoder` with Pyro
        pyro.module("encoder", self.encoder)
        with pyro.plate("data", x.shape[0]):
            # use the encoder to get the parameters used to define q(z|x)
            z_loc, z_scale = self.encoder(x)
            # sample the latent code z
            pyro.sample("latent", dist.Normal(z_loc, z_scale).to_event(1))
            # use the encoder to get the parameters used to define q(h|x)
            # (the same encoder network is used for both codes)
            h_loc, h_scale = self.encoder(x)
            # sample the latent code h
            pyro.sample("hidden", dist.Normal(h_loc, h_scale).to_event(1))

    # define a helper function for reconstructing images
    def reconstruct_img(self, x):
        """Encode `x`, sample a latent code and return the decoded image means."""
        # encode image x
        z_loc, z_scale = self.encoder(x)
        # sample in latent space
        z = dist.Normal(z_loc, z_scale).sample()
        # decode the image (note we don't sample in image space)
        loc_img = self.decoder(z)
        return loc_img
```

```python

def train(svi, train_loader, use_cuda=False):
    """Run one training epoch and return the loss averaged per sample.

    Args:
        svi: object whose ``step(batch)`` performs one ELBO gradient step and
            returns that mini-batch's loss.
        train_loader: iterable of ``(images, label, hand)`` mini-batches that
            also exposes the underlying ``dataset``.
        use_cuda (bool): when True, each image batch is moved to the GPU first.

    Returns:
        Sum of the per-batch losses divided by ``len(train_loader.dataset)``.
    """
    running_loss = 0.
    # Labels and handwritten flags come along with every batch, but only the
    # images are fed to the inference step.
    for images, _labels, _handwritten in train_loader:
        batch = images.cuda() if use_cuda else images
        running_loss += svi.step(batch)

    # Normalise by the number of samples, not the number of batches.
    return running_loss / len(train_loader.dataset)

def evaluate(svi, test_loader, use_cuda=False):
    """Compute the loss over the whole test set, averaged per sample.

    Args:
        svi: object whose ``evaluate_loss(batch)`` returns the ELBO loss
            estimate for a mini-batch.
        test_loader: iterable of ``(images, label, hand)`` mini-batches that
            also exposes the underlying ``dataset``.
        use_cuda (bool): when True, each image batch is moved to the GPU first.

    Returns:
        Sum of the per-batch losses divided by ``len(test_loader.dataset)``.
    """
    accumulated = 0.
    # Only the images take part in the loss; labels and flags are ignored.
    for images, _labels, _handwritten in test_loader:
        batch = images.cuda() if use_cuda else images
        accumulated += svi.evaluate_loss(batch)

    # Normalise by the number of samples, not the number of batches.
    return accumulated / len(test_loader.dataset)

```

```python

# Run options
LEARNING_RATE = 1.0e-3  # learning rate for the optimizer
USE_CUDA = False  # set to True to run the model and mini-batches on the GPU

# Number of training epochs.
# NOTE(review): both branches of the conditional are 1, so `smoke_test` has no
# effect here - confirm the epoch count intended for a full (non-CI) run.
NUM_EPOCHS = 1 if smoke_test else 1
# The test set is evaluated on every epoch that is a multiple of this value.
TEST_FREQUENCY = 5

```

```python

# Build the train and test loaders with mini-batches of 256 samples.
train_loader, test_loader = setup_data_loaders(batch_size=256, use_cuda=USE_CUDA)

```

```python

# clear param store so parameters from a previous run of this cell are discarded
pyro.clear_param_store()

# setup the VAE (default latent and hidden sizes)
vae = VAE(use_cuda=USE_CUDA)

# setup the optimizer
adam_args = {"lr": LEARNING_RATE}
optimizer = Adam(adam_args)

# setup the inference algorithm: stochastic variational inference over the
# VAE's model/guide pair with the Trace_ELBO loss
svi = SVI(vae.model, vae.guide, optimizer, loss=Trace_ELBO())
```

```python
# Per-epoch histories. Each entry is the negated average loss,
# so larger values are better.
train_elbo = []
test_elbo = []
# training loop
for epoch in range(NUM_EPOCHS):
    # one full pass over the training set
    total_epoch_loss_train = train(svi, train_loader, use_cuda=USE_CUDA)
    train_elbo.append(-total_epoch_loss_train)
    print("[epoch %03d]  average training loss: %.4f" % (epoch, total_epoch_loss_train))

    # evaluate on epochs 0, TEST_FREQUENCY, 2 * TEST_FREQUENCY, ...
    if epoch % TEST_FREQUENCY == 0:
        # report test diagnostics
        total_epoch_loss_test = evaluate(svi, test_loader, use_cuda=USE_CUDA)
        test_elbo.append(-total_epoch_loss_test)
        print("[epoch %03d] average test loss: %.4f" % (epoch, total_epoch_loss_test))

```