In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data

import torchvision.transforms as transforms
import torchvision.datasets as datasets

from sklearn import metrics
from sklearn import decomposition
from sklearn import manifold
import matplotlib.pyplot as plt

import copy
import random

In [None]:
# Seed every random number generator used in this notebook (Python, NumPy,
# PyTorch CPU and CUDA) with the same value.
SEED = 1

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
# Request deterministic cuDNN kernels.
torch.backends.cudnn.deterministic = True

In [None]:


# Load the MNIST training split with no transform attached; `download = True`
# asks torchvision to fetch the files into ROOT if needed.
ROOT = '.data'
train_data = datasets.MNIST(root = ROOT, 
                            train = True, 
                            download = True)




In [None]:
# Mean and standard deviation over all values of `train_data.data`, i.e. the
# stored images before any transform. These are only printed: the Normalize
# transforms below use the fixed constants 0.5 / 0.5 instead.
mean = train_data.data.float().mean()
std = train_data.data.float().std()

print(mean)
print(std)

In [None]:
# Same two statistics as in the previous cell, printed with labels.
print(f'Calculated mean: {mean}')
print(f'Calculated std: {std}')

In [None]:
# Both pipelines turn an image into a tensor and then apply (x - 0.5) / 0.5.
# `to_img` later inverts exactly this scaling with 0.5 * (x + 1).
train_transforms = transforms.Compose([
    # Augmentations currently disabled:
    #transforms.RandomRotation(5, fill=(0,)),
    #transforms.RandomCrop(28, padding = 2),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5]),
])

test_transforms = transforms.Compose([
    transforms.ToTensor(),
    # One-element sequences, same as train_transforms. The previous `(0.5)`
    # was a parenthesised float, not a tuple.
    transforms.Normalize(mean=[0.5], std=[0.5]),
])

In [None]:
# Re-create the training split, this time with the training transform
# attached, and create the test split with the test transform.
train_data = datasets.MNIST(root = ROOT, 
                            train = True, 
                            download = True, 
                            transform = train_transforms)

test_data = datasets.MNIST(root = ROOT, 
                           train = False, 
                           download = True, 
                           transform = test_transforms)

In [None]:
# Mean of the stored images of the reloaded dataset (same expression as the
# earlier statistics cell, read from `.data` rather than through indexing).
train_data.data.float().mean()

In [None]:
def plot_images(images):
    """Draw images on a square grid of subplots with the axes hidden.

    The grid side is int(sqrt(len(images))), so only the first side * side
    entries are drawn. Each entry must be a tensor that can be viewed as
    28 x 28.
    """
    side = int(np.sqrt(len(images)))

    figure = plt.figure()
    for index in range(side * side):
        axis = figure.add_subplot(side, side, index + 1)
        pixels = images[index].view(28, 28).cpu().numpy()
        axis.imshow(pixels)
        axis.axis('off')

In [None]:
# NOTE: despite its name, VALID_RATIO is the fraction of examples kept for
# TRAINING; the remainder becomes the validation set.
VALID_RATIO = 0.9

n_train_examples = int(len(train_data) * VALID_RATIO)
n_valid_examples = len(train_data) - n_train_examples

In [None]:
# Randomly partition the training set into train / validation subsets of the
# sizes computed above. `train_data` is rebound to the training subset.
train_data, valid_data = data.random_split(train_data, 
                                           [n_train_examples, n_valid_examples])

In [None]:
N_IMAGES = 25

# Keep only the image (first element) of the first N_IMAGES training pairs.
images = [train_data[index][0] for index in range(N_IMAGES)]

plot_images(images)

In [None]:
# Report the size of each split.
print(f'Number of training examples: {len(train_data)}')
print(f'Number of validation examples: {len(valid_data)}')
print(f'Number of testing examples: {len(test_data)}')

In [None]:
# Deep-copy the validation subset first, so that the transform assignment
# below is made on the copy's wrapped dataset rather than on the object that
# `valid_data` referred to before.
valid_data = copy.deepcopy(valid_data)
valid_data.dataset.transform = test_transforms

In [None]:
N_IMAGES = 25

# Keep only the image (first element) of the first N_IMAGES validation pairs.
images = [valid_data[index][0] for index in range(N_IMAGES)]

plot_images(images)

In [None]:
# Mini-batch loaders for the three splits; only the training loader shuffles.
BATCH_SIZE = 256

train_iterator = data.DataLoader(train_data, 
                                 shuffle = True, 
                                 batch_size = BATCH_SIZE)

valid_iterator = data.DataLoader(valid_data, 
                                 batch_size = BATCH_SIZE)

test_iterator = data.DataLoader(test_data, 
                                batch_size = BATCH_SIZE)

In [None]:
# Pull one batch to inspect the value range after normalisation.
# Use the built-in next(): DataLoader iterators implement __next__, and the
# Python-2 style `.next()` method is not available on current PyTorch.
images, labels = next(iter(train_iterator))
print(images)
print(images.max())
print(images.min())

In [None]:
# Layer widths read by AE and Sparse_AE. 784 matches the 28 x 28 images once
# flattened; the hidden sizes shrink layer by layer.
config = {
    "embedding_size":784,
    "hidden_size":500,
    "hidden_size1":256,
    "hidden_size2":128,
    "hidden_size3":64,
    "hidden_size4":32  # not read by any model visible in this notebook
}

# Layer widths read by VAE. VAE.forward reshapes the encoder output
# (hidden_size1 values) into a mean and a log-variance of hidden_size2 values
# each, so hidden_size1 must be 2 * hidden_size2.
configVAE = {
    "embedding_size":784,
    "hidden_size":400,
    "hidden_size1":4,
    "hidden_size2":2
}

In [None]:
def to_img(x, size_x=28):
    """Undo the (x - 0.5) / 0.5 scaling and reshape rows into square images.

    Args:
        x: tensor whose first dimension is the batch and whose remaining
            elements per row number size_x * size_x.
        size_x: side length of each output image.

    Returns:
        Tensor of shape (batch, size_x, size_x) holding 0.5 * (x + 1).
    """
    rescaled = (x + 1) * 0.5
    return rescaled.view(rescaled.size(0), size_x, size_x)

In [None]:
def display_images(in_, out, n=1, size_x=28):
    """Show `n` rows of four images from `out`, optionally preceded by inputs.

    For every row index r in range(n) a new 18 x 6 figure is created showing
    images 4*r .. 4*r+3. When `in_` is given, the matching input row is drawn
    in its own figure just before the output row.

    Args:
        in_: batch of inputs, or None to draw outputs only.
        out: batch of outputs; must contain at least 4 * n rows.
        n: number of four-image rows to draw.
        size_x: side length of the square images, forwarded to `to_img`
            (previously accepted but ignored).
    """
    def _show_row(pics, row):
        # One figure with pictures 4*row .. 4*row+3 side by side, axes hidden.
        plt.figure(figsize=(18, 6))
        for i in range(4):
            plt.subplot(1, 4, i + 1)
            plt.imshow(pics[i + 4 * row])
            plt.axis('off')

    # The conversion does not depend on the row, so do it once up front.
    in_pic = to_img(in_.cpu().data, size_x) if in_ is not None else None
    out_pic = to_img(out.cpu().data, size_x)

    for row in range(n):
        if in_pic is not None:
            _show_row(in_pic, row)
        _show_row(out_pic, row)

# DENOISING AUTOENCODER

In [None]:
class AE(nn.Module):
    """Autoencoder with a single hidden layer.

    `config` must provide 'embedding_size' (input and output width) and
    'hidden_size' (width of the code). The encoder is Linear + ReLU, the
    decoder is Linear + LeakyReLU with negative slope 0.8.
    """

    def __init__(self, config):
        super().__init__()
        n_features = config['embedding_size']
        n_code = config['hidden_size']
        self.Encoder = nn.Sequential(
            nn.Linear(n_features, n_code),
            nn.ReLU(inplace=True),
        )
        self.Decoder = nn.Sequential(
            nn.Linear(n_code, n_features),
            nn.LeakyReLU(0.8),
        )

    def forward(self, x):
        """Encode `x` and decode the code back to the input width."""
        return self.Decoder(self.Encoder(x))

In [None]:
def criterion(model, images, y_pred, reg_param):
    """Mean squared error between the reconstruction and the target.

    `model` and `reg_param` are not used; they are accepted so this function
    can be called with the same four arguments the training loops pass.
    """
    return F.mse_loss(y_pred, images)

In [None]:
def train_func(optimizer, criterion, train_data, model, device, reg_param=0.001):
    """Run one denoising training epoch and return the per-batch losses.

    Each batch is flattened to (batch, -1), multiplied by a dropout mask and
    fed to the model; the loss compares the output with the clean input.

    Args:
        optimizer: optimizer stepping the parameters of `model`.
        criterion: callable(model, target, prediction, reg_param) -> loss.
        train_data: iterable of (inputs, labels) batches; labels are ignored.
        model: module to train; moved to `device` and set to train mode.
        device: device on which the batch is processed.
        reg_param: forwarded unchanged to `criterion`.

    Returns:
        List of float losses, one per batch.
    """
    model.train()
    model.to(device)
    # A freshly built Dropout module is in training mode, so applying it to a
    # tensor of ones yields a random mask of zeros and rescaled ones.
    corrupt = nn.Dropout()
    batch_losses = []

    for clean, _ in train_data:
        clean = clean.to(device)
        clean = clean.view(clean.shape[0], -1)

        mask = corrupt(torch.ones(clean.shape)).to(device)
        reconstruction = model(clean * mask)

        loss = criterion(model, clean, reconstruction, reg_param)
        loss.backward()
        optimizer.step()
        # Gradients are cleared after the step, ready for the next batch.
        optimizer.zero_grad()

        batch_losses.append(loss.item())

    return batch_losses
        

In [None]:
def valid_func(criterion, valid_data, model, device, reg_param=0.001):
    """Evaluate the denoising objective and return the per-batch losses.

    The model is put in eval mode and no gradients are recorded. Inputs are
    still corrupted with a dropout mask, as in training, because the Dropout
    module used for the mask is independent of the model's mode.

    Args:
        criterion: callable(model, target, prediction, reg_param) -> loss.
        valid_data: iterable of (inputs, labels) batches; labels are ignored.
        model: module to evaluate; moved to `device`.
        device: device on which the batch is processed.
        reg_param: forwarded unchanged to `criterion`.

    Returns:
        List of float losses, one per batch.
    """
    model.eval()
    model.to(device)
    corrupt = nn.Dropout()
    batch_losses = []

    with torch.no_grad():
        for clean, _ in valid_data:
            clean = clean.to(device)
            clean = clean.view(clean.shape[0], -1)

            mask = corrupt(torch.ones(clean.shape)).to(device)
            reconstruction = model(clean * mask)

            batch_losses.append(
                criterion(model, clean, reconstruction, reg_param).item()
            )

    return batch_losses
        

In [None]:
# Train the denoising autoencoder. After every epoch, print the mean of the
# per-batch losses on the training loader and on the validation loader.
epochs = 100
learning_rate = 1e-3

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = AE(config)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
model_children = list(model.children())  # not read again in the visible notebook

for epoch in range(epochs):
    loss_train = train_func(optimizer, criterion, train_iterator, model, device)
    print("Epoch: {} || Loss: {}".format(epoch, (np.sum(loss_train)/len(loss_train))))
    valid_loss = valid_func(criterion, valid_iterator, model, device)
    print("Epoch: {} || Validation Loss: {}".format(epoch, (np.sum(valid_loss)/len(valid_loss))))

In [None]:
def pred_fn(model, test_iterator, device):
    """Reconstruct the first batch of `test_iterator` and display the result.

    Only the first batch is used; it is flattened to (batch, -1), passed
    through `model` without gradient tracking, and five rows of four outputs
    are drawn with `display_images`. Nothing happens for an empty iterator.
    """
    with torch.no_grad():
        first_batch = next(iter(test_iterator), None)
        if first_batch is not None:
            inputs, _ = first_batch
            flat = inputs.view(inputs.shape[0], -1).to(device)
            display_images(None, model(flat), 5)

In [None]:
# Show reconstructions of the first test batch from the model trained above.
pred_fn(model, test_iterator, device)

In [None]:
# Render rows of the first encoder layer's weight matrix as 28 x 28 images
# (20 rows of four images each).
display_images(None, model.Encoder[0].weight, 20)

# **SPARSE AUTOENCODER**

In [None]:
class Sparse_AE(nn.Module):
    """Deep autoencoder with four Linear layers on each side.

    The encoder narrows embedding_size -> hidden_size -> hidden_size1 ->
    hidden_size2 -> hidden_size3, with an in-place ReLU after every Linear.
    The decoder mirrors those widths; its last activation is LeakyReLU(0.1)
    instead of ReLU.
    """

    _WIDTH_KEYS = ('embedding_size', 'hidden_size', 'hidden_size1',
                   'hidden_size2', 'hidden_size3')

    def __init__(self, config):
        super().__init__()
        widths = [config[key] for key in self._WIDTH_KEYS]

        # Encoder: consecutive width pairs, each followed by ReLU.
        encoder_layers = []
        for n_in, n_out in zip(widths[:-1], widths[1:]):
            encoder_layers.append(nn.Linear(n_in, n_out))
            encoder_layers.append(nn.ReLU(inplace=True))
        self.Encoder = nn.Sequential(*encoder_layers)

        # Decoder: same widths in reverse; the final activation differs.
        reversed_widths = widths[::-1]
        n_decoder = len(reversed_widths) - 1
        decoder_layers = []
        for position, (n_in, n_out) in enumerate(zip(reversed_widths[:-1], reversed_widths[1:])):
            decoder_layers.append(nn.Linear(n_in, n_out))
            if position == n_decoder - 1:
                decoder_layers.append(nn.LeakyReLU(0.1))
            else:
                decoder_layers.append(nn.ReLU(inplace=True))
        self.Decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Encode `x` and decode the code back to the input width."""
        return self.Decoder(self.Encoder(x))

In [None]:
def sparse_loss(model, images):
    """Sum of mean absolute activations through the network's hidden layers.

    `images` is pushed through every (layer, activation) pair of
    `model.Encoder` and through every pair of `model.Decoder` except the last
    one (the output layer). After each pair the mean absolute activation is
    added to the penalty.

    The number of pairs is read from the model instead of being fixed at
    four, so encoders/decoders of any depth are supported; for `Sparse_AE`
    the result is unchanged.

    Args:
        model: module exposing `Encoder` and `Decoder`, whose children
            alternate layer, activation, layer, activation, ...
        images: input batch accepted by the first encoder layer.

    Returns:
        Scalar tensor (or the int 0 when there are no hidden pairs).
    """
    def _pairs(block):
        # Children come as [layer, activation, layer, activation, ...].
        children = list(block.children())
        return list(zip(children[0::2], children[1::2]))

    # Skip the decoder's final pair: it produces the reconstruction, not a
    # hidden activation.
    stages = _pairs(model.Encoder) + _pairs(model.Decoder)[:-1]

    loss = 0
    values = images
    for fc_layer, activation in stages:
        values = activation(fc_layer(values))
        loss = loss + torch.mean(torch.abs(values))
    return loss

In [None]:
def sparse_criterion(model, images, y_pred, reg_param):
    """Reconstruction MSE plus `reg_param` times the sparsity penalty.

    The penalty is `sparse_loss(model, images)`, which runs `images` through
    the model's layers again to measure their activations.
    """
    reconstruction_error = F.mse_loss(y_pred, images)
    sparsity_penalty = sparse_loss(model, images)
    return reconstruction_error + reg_param * sparsity_penalty

In [None]:
def train_func(optimizer, criterion, train_data, model, device, reg_param=0.001):
    """Run one training epoch on clean inputs and return per-batch losses.

    This definition replaces the earlier denoising `train_func`: inputs are
    no longer corrupted before being fed to the model.

    Args:
        optimizer: optimizer stepping the parameters of `model`.
        criterion: callable(model, target, prediction, reg_param) -> loss.
        train_data: iterable of (inputs, labels) batches; labels are ignored.
        model: module to train; moved to `device` and set to train mode.
        device: device on which the batch is processed.
        reg_param: forwarded unchanged to `criterion`.

    Returns:
        List of float losses, one per batch.
    """
    model.train()
    model.to(device)

    def _step(batch):
        # Flatten, forward, backward, update, then clear the gradients.
        flat = batch.to(device)
        flat = flat.view(flat.shape[0], -1)
        loss = criterion(model, flat, model(flat), reg_param)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        return loss.item()

    return [_step(batch) for batch, _ in train_data]

In [None]:
def valid_func(criterion, valid_data, model, device, reg_param=0.001):
    """Evaluate `model` on clean inputs and return per-batch losses.

    The model is put in eval mode and no gradients are recorded.

    Args:
        criterion: callable(model, target, prediction, reg_param) -> loss.
        valid_data: iterable of (inputs, labels) batches; labels are ignored.
        model: module to evaluate; moved to `device`.
        device: device on which the batch is processed.
        reg_param: forwarded unchanged to `criterion`.

    Returns:
        List of float losses, one per batch.
    """
    model.eval()
    model.to(device)

    batch_losses = []
    with torch.no_grad():
        for batch, _ in valid_data:
            flat = batch.to(device)
            flat = flat.view(flat.shape[0], -1)
            batch_losses.append(
                criterion(model, flat, model(flat), reg_param).item()
            )
    return batch_losses
        

In [None]:
# Train the sparse autoencoder with the MSE + activation-penalty criterion.
# After every epoch, print the mean of the per-batch losses on the training
# loader and on the validation loader. `model` and `optimizer` are rebound.
epochs = 100
learning_rate = 1e-3

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = Sparse_AE(config)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
model_children = list(model.children())  # not read again in the visible notebook

for epoch in range(epochs):
    loss_train = train_func(optimizer, sparse_criterion, train_iterator, model, device)
    print("Epoch: {} || Loss: {}".format(epoch, (np.sum(loss_train)/len(loss_train))))
    valid_loss = valid_func(sparse_criterion, valid_iterator, model, device)
    print("Epoch: {} || Validation Loss: {}".format(epoch, (np.sum(valid_loss)/len(valid_loss))))

In [None]:
# Shape of the weight of the third Linear layer in the encoder (index 4 of
# the Sequential, since Linear and ReLU modules alternate).
model.Encoder[4].weight.shape

In [None]:
# Render rows of the sparse autoencoder's first-layer weight matrix as
# 28 x 28 images (20 rows of four images each).
display_images(None, model.Encoder[0].weight, 20)

In [None]:
# Show reconstructions of the first test batch from the sparse autoencoder.
pred_fn(model, test_iterator, device)

# VARIATIONAL-AUTOENCODER

In [None]:
class VAE(nn.Module):
    """Fully connected variational autoencoder.

    Expected `config` keys:
        embedding_size: width of the flattened input and of the output.
        hidden_size:    width of the single hidden layer on each side.
        hidden_size1:   width of the encoder output; it is split into a mean
                        and a log-variance, so it must be 2 * hidden_size2.
        hidden_size2:   width of the latent code.

    Raises:
        ValueError: if hidden_size1 != 2 * hidden_size2. Without this check a
            mismatch would either fail inside `forward` or silently regroup
            values across samples when reshaping.
    """

    def __init__(self, config):
        super(VAE, self).__init__()
        if config["hidden_size1"] != 2 * config["hidden_size2"]:
            raise ValueError(
                "hidden_size1 must equal 2 * hidden_size2 "
                "(mean and log-variance per latent dimension), got "
                f"{config['hidden_size1']} and {config['hidden_size2']}"
            )

        self.encoder = nn.Sequential(
            # embedding_size -> hidden_size
            nn.Linear(config["embedding_size"], config["hidden_size"]),
            nn.ReLU(),
            # hidden_size -> hidden_size1 (mean and log-variance together)
            nn.Linear(config["hidden_size"], config["hidden_size1"]),
        )

        self.decoder = nn.Sequential(
            # hidden_size2 (latent) -> hidden_size
            nn.Linear(config['hidden_size2'], config["hidden_size"]),
            nn.ReLU(),
            # hidden_size -> embedding_size, squashed to [-1, 1]
            nn.Linear(config["hidden_size"], config["embedding_size"]),
            nn.Tanh()
        )

        self.config = config

    def reparameterise(self, mu, logvar):
        """Return a latent sample in training mode, or `mu` in eval mode.

        In training mode the result is mu + eps * std with eps ~ N(0, I) and
        std = exp(0.5 * logvar), which keeps the sample differentiable with
        respect to `mu` and `logvar`.
        """
        if self.training:
            std = torch.exp(0.5 * logvar)
            eps = torch.randn_like(std)
            return eps * std + mu
        return mu

    def forward(self, x):
        """Encode `x`, sample a latent code and decode it.

        Returns:
            (reconstruction, mean, logvar); the reconstruction has
            embedding_size columns, mean and logvar have hidden_size2.
        """
        # Use the instance's own config: the module-level `config` belongs to
        # the other models and need not agree with this one.
        flat = x.view(-1, self.config['embedding_size'])
        stats = self.encoder(flat).view(-1, 2, self.config['hidden_size2'])
        mean = stats[:, 0, :]
        logvar = stats[:, 1, :]
        z = self.reparameterise(mean, logvar)
        return self.decoder(z), mean, logvar

In [None]:
def loss_function(x_hat, x, mu, logvar):
    """VAE objective: summed squared reconstruction error plus KL divergence.

    Args:
        x_hat: reconstruction of shape (batch, features).
        x: target with the same number of elements; it is reshaped to
            x_hat's feature width, so no global config is needed.
        mu: latent means.
        logvar: latent log-variances.

    Returns:
        Scalar tensor MSE + KLD, both summed (not averaged) over the batch.
    """
    MSE = F.mse_loss(
        x_hat, x.view(-1, x_hat.size(-1)), reduction='sum'
    )
    # KL( N(mu, sigma^2) || N(0, 1) ) summed over all latent dimensions;
    # equivalently -0.5 * sum(1 + logvar - mu^2 - exp(logvar)).
    KLD = 0.5 * torch.sum(logvar.exp() - logvar - 1 + mu.pow(2))

    return MSE + KLD

In [None]:
def train_func(optimizer, criterion, train_data, model, device, reg_param=0.001):
    """Run one VAE training epoch and return the per-batch losses.

    This definition replaces the earlier autoencoder `train_func`: the model
    is expected to return (reconstruction, mean, logvar).

    Args:
        optimizer: optimizer stepping the parameters of `model`.
        criterion: callable(x_hat, x, mean, logvar) -> loss. It is now
            actually used; previously the global `loss_function` was called
            regardless of what was passed.
        train_data: iterable of (inputs, labels) batches; labels are ignored.
        model: module to train; moved to `device` and set to train mode.
        device: device on which the batch is processed.
        reg_param: unused; kept so existing calls keep working.

    Returns:
        List of float losses, one per batch.
    """
    model.train()

    loss_train = []
    model.to(device)
    for (x, _) in train_data:
        x = x.to(device)
        x = x.view(x.shape[0], -1)

        out, mean, logvar = model(x)

        loss = criterion(out, x, mean, logvar)

        loss.backward()
        optimizer.step()
        # Gradients are cleared after the step, ready for the next batch.
        optimizer.zero_grad()

        loss_train.append(loss.item())

    return loss_train

In [None]:
def valid_func(criterion, valid_data, model, device, reg_param=0.001):
    """Evaluate a VAE and collect its latent statistics.

    This definition replaces the earlier autoencoder `valid_func`: the model
    is expected to return (reconstruction, mean, logvar).

    Args:
        criterion: callable(x_hat, x, mean, logvar) -> loss. It is now
            actually used; previously the global `loss_function` was called
            regardless of what was passed.
        valid_data: iterable of (inputs, labels) batches.
        model: module to evaluate; moved to `device` and set to eval mode.
        device: device on which the batch is processed.
        reg_param: unused; kept so existing calls keep working.

    Returns:
        (valid_loss, means, logvars, labels): a list of float losses and
        three lists with one tensor per batch. `means` and `logvars` stay on
        `device`; `labels` stay wherever the loader produced them.
    """
    model.eval()

    valid_loss = []
    model.to(device)
    means, logvars, labels = list(), list(), list()
    with torch.no_grad():
        for (x, y) in valid_data:
            x = x.to(device)
            x = x.view(x.shape[0], -1)

            out, mean, logvar = model(x)

            loss = criterion(out, x, mean, logvar)

            valid_loss.append(loss.item())

            means.append(mean.detach())
            logvars.append(logvar.detach())
            labels.append(y.detach())

    return valid_loss, means, logvars, labels
        

In [None]:
# Number of training examples; used below to turn summed losses into
# per-example averages.
len(train_iterator.dataset)

In [None]:
# Train the VAE. `loss_function` sums over the batch, so the epoch totals are
# divided by the dataset size to print a per-example loss.
# `codes` stores, for every epoch, the validation means, log-variances and
# labels concatenated over all batches.
epochs = 100
learning_rate = 1e-3

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = VAE(configVAE)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
model_children = list(model.children())  # not read again in the visible notebook
codes = {'means':list(), 'logvars':list(), 'y':list()}
for epoch in range(epochs):
    loss_train = train_func(optimizer, loss_function, train_iterator, model, device)
    print("Epoch: {} || Loss: {}".format(epoch, (np.sum(loss_train)/len(train_iterator.dataset))))
    valid_loss, means, logvars, labels = valid_func(loss_function, valid_iterator, model, device)
    codes['means'].append(torch.cat(means))
    codes['logvars'].append(torch.cat(logvars))
    codes['y'].append(torch.cat(labels))
    print("Epoch: {} || Validation Loss: {}".format(epoch, (np.sum(valid_loss)/len(valid_iterator.dataset))))

In [None]:
# Render rows of the VAE's first encoder layer weight matrix as 28 x 28
# images (20 rows of four images each).
display_images(None, model.encoder[0].weight, 20)

In [None]:
# Decode N latent vectors drawn from a standard normal distribution and show
# the generated images, four per row.
N = 16
z = torch.randn((N, configVAE['hidden_size2'])).to(device)
sample = model.decoder(z)
display_images(None, sample, N // 4)

In [None]:

# Perform an interpolation between validation samples A and B, in N steps

N = 16
# Indices of the two validation samples to interpolate between. They were
# never defined before, so the cell failed on a fresh kernel; the pair below
# is an arbitrary example - change it to compare other digits.
A, B = 0, 1

# Use the codes from the last epoch: they come from the same weights as the
# decoder that renders them (epoch-3 codes belong to a much earlier encoder).
latent_means = codes['means'][-1]
z_a = latent_means[A].to(device)
z_b = latent_means[B].to(device)

# Weights going from 0 (pure A) to 1 (pure B), shaped (N, 1) to broadcast
# over the latent dimensions.
t = torch.linspace(0, 1, N, device=device).unsqueeze(1)
code = (1 - t) * z_a + t * z_b

with torch.no_grad():
    sample = model.decoder(code)
display_images(None, sample, N // 4)

In [None]:
from sklearn.manifold import TSNE

In [None]:
X, Y, E = list(), list(), list()  # latent means, classes, t-SNE embeddings
N = 1000  # samples per epoch
epochs = (0, 50, 99)
for epoch in epochs:
    # The codes live on `device`; scikit-learn and matplotlib need host-side
    # NumPy arrays, so move them to the CPU before converting.
    X.append(codes['means'][epoch][:N].cpu().numpy())
    E.append(TSNE(n_components=2).fit_transform(X[-1]))
    Y.append(codes['y'][epoch][:N].cpu().numpy())

In [None]:
# Default size for the figures created from here on.
plt.rcParams['figure.figsize'] = (20, 10)

In [None]:
# One scatter plot per selected epoch: 2-D embedding coloured by digit label.
f, a = plt.subplots(ncols=3)
for i, e in enumerate(epochs):
    s = a[i].scatter(E[i][:,0], E[i][:,1], c=Y[i], cmap='tab10')
    a[i].grid(False)
    a[i].set_title(f'Epoch {e}')
    a[i].axis('equal')
# Shared colour bar with one tick per digit; boundaries sit halfway between
# consecutive integers so each digit gets its own band.
f.colorbar(s, ax=a[:], ticks=np.arange(10), boundaries=np.arange(11) - .5)