In [4]:
import matplotlib.pyplot as plt
import numpy as np
import tqdm

In [5]:
from torchvision import datasets #to load datasets
from torchvision.transforms import ToTensor #to make useful for torch

In [6]:
import torch #for various utility functions

import torch.nn as nn #for model library
import torch.nn.functional as F #for additional optimised routines

from torch.utils.data import DataLoader #for an easy dataloader


In [7]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

In [8]:
train_dataset = datasets.MNIST(root = 'datasets', 
                               train = True, 
                               transform = ToTensor(),
                               download = True) 

test_dataset = datasets.MNIST(root = 'datasets', 
                              train = False, 
                              transform = ToTensor(),
                              download = True) 


In [9]:
batch_size = 128
loaders = {
    'train' : torch.utils.data.DataLoader(train_dataset, 
                                          batch_size=batch_size, 
                                          shuffle=True, 
                                          num_workers=2),
    
    'test'  : torch.utils.data.DataLoader(test_dataset,                                         
                                          shuffle=True, 
                                          num_workers=2),
}


In [13]:
class VariationalEncoder(nn.Module):
    def __init__(self, input_size, layers, latent_dims):
        super().__init__()
        self.layer0 = nn.Linear(input_size, layers[0])
        self.layer1 = nn.Linear(layers[0], layers[1])
        self.layer2 = nn.Linear(layers[1], latent_dims*2)
                
    def forward(self, X): 
        X = torch.flatten(X, start_dim=1)
        X = F.relu(self.layer0(X))
        X = F.relu(self.layer1(X))
        X = self.layer2(X).view(-1, 2, latent_dims)
        
        mu = X[:, 0, :] # the first feature values as mean
        logvar = X[:, 1, :] # the other feature values as log variance
        return mu, logvar
        

In [14]:
class Decoder(nn.Module):
    def __init__(self, latent_dims, layers, output_size):
        super().__init__()
        self.layer0 = nn.Linear(latent_dims, layers[1])
        self.layer1 = nn.Linear(layers[1], layers[0])
        self.layer2 = nn.Linear(layers[0], output_size)
        
    def forward(self, z): 
        z = F.relu(self.layer0(z))
        z = F.relu(self.layer1(z))
        return torch.sigmoid(self.layer2(z))

In [15]:
class VariationalAutoEncoder(nn.Module):
    def __init__(self, input_size, layers, latent_dims):
        super().__init__()
        self.encoder = VariationalEncoder(input_size, layers, latent_dims)
        self.decoder = Decoder(latent_dims, layers, input_size)

    def reparametrize(self, mu, logvar):
        # Reparametrization trick allows gradients from  stochastic part of the model
        # to backpropagate from the
        if self.training:
            sigma = torch.exp(0.5*logvar)
            z = torch.randn_like(sigma)
            return z.mul(sigma).add_(mu)
        else:
            return mu
        
    def forward(self, x):
        mu, logvar = self.encoder(x)
        z = self.reparametrize(mu, logvar)
        return self.decoder(z), mu, logvar

In [17]:
class VAE(nn.Module):
    def __init__(self):
        super(VAE, self).__init__()

        self.fc1 = nn.Linear(784, 400)
        self.fc21 = nn.Linear(400, 20)
        self.fc22 = nn.Linear(400, 20)
        self.fc3 = nn.Linear(20, 400)
        self.fc4 = nn.Linear(400, 784)

    def encode(self, x):
        h1 = F.relu(self.fc1(x))
        return self.fc21(h1), self.fc22(h1)

    def reparameterize(self, mu, logvar):
        if self.training:
            std = torch.exp(0.5*logvar)
            eps = torch.randn_like(std)
            return eps.mul(std).add_(mu)
        else:
            return mu

    def decode(self, z):
        h3 = F.relu(self.fc3(z))
        return torch.sigmoid(self.fc4(h3))

    def forward(self, x):
        mu, logvar = self.encode(x.view(-1, 784))
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar



In [18]:
input_size, latent_dims = 28*28, 2
layers = [512, 256]

model = VariationalAutoEncoder(input_size, layers, latent_dims)

criterion = nn.MSELoss(reduction='sum')
optimizer = torch.optim.Adam(model.parameters(), lr= 0.001)

In [19]:
def train(model, iterator, criterion, optimizer):
    #initial train step
    model.train()
    
    #zero the loss and accuracy
    epoch_loss = 0
    
    for x,_ in tqdm.tqdm(iterator, desc="Evaluating", leave=False):
        #reset weights in optimizer
        optimizer.zero_grad()

        #predict using the model
        xbar, mu, logvar  = model(x)
        
        #reshape to images
        xbar = xbar.reshape((-1, 1, 28, 28))
        
        #calculate the loss        
        kl_loss = (-0.5*(1+logvar - mu**2 -
                 torch.exp(logvar)).sum(dim=1)).mean(dim=0)
    
        loss = criterion(xbar, x) + kl_loss

        #propagate the loss backwards
        loss.backward()
        
        #update the weights
        optimizer.step()
        
        #update the loss
        epoch_loss += loss.item()

    print(f"training loss: {epoch_loss /len(iterator):3.7}")        
        
        

In [None]:
epochs = 10

for e in range(1,epochs):
    print(f"Epoch: {e}")
    train(model, loaders['train'], criterion, optimizer)
    

Evaluating:   0%|          | 0/469 [00:00<?, ?it/s]

Epoch: 1


Evaluating:   0%|          | 0/469 [00:00<?, ?it/s]          

training loss: 5537.852
Epoch: 2


Evaluating:   0%|          | 0/469 [00:00<?, ?it/s]          

training loss: 4399.939
Epoch: 3


Evaluating:   0%|          | 0/469 [00:00<?, ?it/s]          

training loss: 4163.838
Epoch: 4


Evaluating:  61%|██████    | 285/469 [00:10<00:04, 41.04it/s]

In [None]:
@torch.no_grad()
def evaluate(model, iterator, criterion, optimizer):
    model.eval()
    
    #zero the loss
    epoch_loss = 0
    
    for x,label in tqdm.tqdm(iterator, desc="Evaluating", leave=False):
        #predict using the model
        xbar, mu, logvar = model(x)
        
        #reshape to images
        xbar = xbar.reshape((-1, 1, 28, 28))
 
        #calculate the loss
        kl_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
        loss = criterion(xbar, x) + kl_loss

        #update the loss
        epoch_loss += loss.item()
    
    print(f"test loss: {epoch_loss /len(iterator):3.7}")        
        
        

In [None]:
evaluate(model, loaders['test'], criterion, optimizer)
    

In [None]:
# creating a file
with open('VAE_MNIST.p', 'wb') as f:
    torch.save(model, f)

In [None]:
# model(x[0])[-1]

In [None]:
xbar = model(x[0])[0].detach().numpy()
xbar = xbar.reshape((-1, 1, 28, 28))
fig, ax = plt.subplots(2,4)
for n in range(4):
    ax[0][n].imshow(x[0][n][0])
    ax[1][n].imshow(xbar[n][0])
plt.show()

In [None]:
latent_space = model.encoder(train_dataset.data.reshape(-1,28*28).float())

In [None]:
labels = train_dataset.targets.numpy()
latent_space = latent_space[0].detach().numpy()

In [None]:
latent_space.shape

In [None]:
colors = plt.get_cmap('tab10')

plt.figure(dpi=200)
for i in range(10):
    index = labels == i
    plt.scatter(latent_space[index,0], latent_space[index,1], s=0.1, c=colors(labels[index]), label=i)
plt.legend(markerscale=10)

plt.xlabel('1st Dimension')
plt.ylabel('2nd Dimension')

In [None]:
X_train = model.encoder(train_dataset.data.reshape(-1,28*28).float())[0].detach().numpy()
y_train = train_dataset.targets.numpy()

X_test = model.encoder(test_dataset.data.reshape(-1,28*28).float())[0].detach().numpy()
y_test = test_dataset.targets.numpy()

clf = RandomForestClassifier(max_depth=4, random_state=0)
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)

print(accuracy_score(y_test, y_pred))