# AutoEncoding & Self-Supervision

### Loading Libraries

In [None]:
# Numerical Computing
import numpy as np

# Data Manipulation
import pandas as pd

# Data Visualization
import seaborn as sns
import matplotlib
import matplotlib_inline
import matplotlib.pyplot as plt
from matplotlib.pyplot import imshow

# Progress Bars for Iteration
from tqdm import tqdm

# Time
import time

# Standard Library Utilities & Downloading
import sys
import string
import unicodedata
import requests, zipfile, io

# SciPy
from scipy.signal import convolve

# PyTorch
import torch
import torchvision
import torch.nn as nn
from torch.utils.data import *
import torch.nn.functional as F
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader


# IDLMAM Libraries
from idlmam import moveTo, run_epoch, set_seed
from idlmam import train_simple_network, set_seed, Flatten, weight_reset, train_network
from idlmam import LanguageNameDataset, pad_and_pack, EmbeddingPackable, LastTimeStep, LambdaLayer


# Scikit-Learn
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score

#  IPython Display
from IPython.display import Latex
from IPython.display import display_pdf
from IPython.display import set_matplotlib_formats

In [None]:
# IPython magic: render matplotlib figures inline in the notebook output.
%matplotlib inline

# Ask the inline backend to emit figures in both PNG and PDF formats.
matplotlib_inline.backend_inline.set_matplotlib_formats('png', 'pdf')

In [None]:
# Restrict cuDNN to deterministic algorithms so GPU runs are repeatable.
torch.backends.cudnn.deterministic=True

# Seed through the idlmam helper.
# NOTE(review): presumably seeds the torch/numpy RNGs — confirm in idlmam.
set_seed(42)

In [None]:
# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Flattened input size (one 28x28 image)
D = 28 * 28

# Width of the hidden (bottleneck) layer
n = 2

# Number of image channels
C = 1

# Number of classes
classes = 10

In [None]:
class TransposeLinear(nn.Module):
    """Linear layer that applies another linear layer's weight, transposed.

    The weight is shared with ``linearLayer`` (same ``Parameter`` object), so
    training either layer updates both.

    Args:
        linearLayer: Layer whose ``weight`` (shape ``(out, in)``) is re-used.
        bias: If True, add a learnable bias of size ``in`` (the output size of
            the transposed layer). If False, no bias is used.
    """

    def __init__(self, linearLayer, bias=True):
        super().__init__()

        # Share, do not copy: this is the very same Parameter object.
        self.weight = linearLayer.weight
        if bias:
            # Start from zeros on the weight's dtype/device. ``torch.Tensor(n)``
            # would return uninitialized memory (arbitrary values, possibly
            # NaN/inf) as the starting bias.
            self.bias = nn.Parameter(
                self.weight.new_zeros(linearLayer.weight.shape[1])
            )
        else:
            # Register the name so ``self.bias`` exists and is None.
            self.register_parameter('bias', None)

    def forward(self, x):
        """Return ``x @ weight + bias`` (``F.linear`` with the transposed weight)."""
        return F.linear(x, self.weight.t(), self.bias)

In [None]:
# Encoder
linearLayer = nn.Linear(D, n, bias=False)
# The encoder just flattens the image and then applies the linear layer
pca_encoder = nn.Sequential(
    nn.Flatten(),
    linearLayer,
)

# Decoder
# Applies the encoder's weight transposed (shared parameter), then restores
# the image shape. nn.Unflatten is used for the reshape because no `View`
# layer is imported or defined in this notebook.
pca_decoder = nn.Sequential(
    TransposeLinear(linearLayer, bias=False),
    nn.Unflatten(1, (1, 28, 28)),  # (B, 784) -> (B, 1, 28, 28)
)

# PCA Model
pca_model = nn.Sequential(
    pca_encoder,
    pca_decoder
)

In [None]:
# Orthogonal Random Set
# Re-initialise the shared encoder weight in place with a random
# (semi-)orthogonal matrix, i.e. start where W @ W.T is the identity.
nn.init.orthogonal_(linearLayer.weight)

In [None]:
# Loss Function
mse_loss = nn.MSELoss()


# PCA Loss Function
def mseWithOrthoLoss(x, y):
    """Return MSE between ``x`` and ``y`` plus an orthogonality penalty.

    The penalty is ``0.1 * MSE(W @ W.T, I)`` with ``W = linearLayer.weight``,
    which pushes the rows of the encoder weight towards being orthonormal.

    Args:
        x: First tensor handed to the loss by the training loop
            (presumably the reconstruction — confirm against train_network).
        y: Second tensor handed to the loss (presumably the target).

    Returns:
        Scalar tensor: reconstruction loss + regularization loss.
    """
    W = linearLayer.weight
    # Build the identity on W's own device/dtype rather than on the global
    # `device`: the encoder may have been moved (e.g. to the CPU) since then.
    I = torch.eye(W.shape[0], device=W.device, dtype=W.dtype)
    normal_loss = mse_loss(x, y)
    regularization_loss = 0.1 * mse_loss(torch.mm(W, W.t()), I)
    return normal_loss + regularization_loss

In [None]:
class AutoEncodeDataset(Dataset):
    """Wrap a dataset of ``(x, y)`` pairs so each item becomes ``(x, x)``.

    The length is that of the wrapped dataset; the original label is dropped
    and the input is returned as its own target.
    """

    def __init__(self, dataset):
        self.dataset = dataset

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        # The label is discarded: the input doubles as the target.
        features, _label = self.dataset[idx]
        return features, features

### Loading Data

In [None]:
# Train Set: MNIST training split, wrapped so that the image is its own target
train_data = AutoEncodeDataset(
    torchvision.datasets.MNIST(
        "./",
        train=True,
        transform=transforms.ToTensor(),
        download=True,
    )
)

# Test Set: kept both with labels (xy) and in autoencoding form (xx)
test_data_xy = torchvision.datasets.MNIST(
    "./",
    train=False,
    transform=transforms.ToTensor(),
    download=True,
)
test_data_xx = AutoEncodeDataset(test_data_xy)

# Data Loaders: only the training batches are shuffled
train_loader = DataLoader(train_data, batch_size=128, shuffle=True)
test_loader = DataLoader(test_data_xx, batch_size=128)

In [None]:
# Train the linear autoencoder with the MSE + orthogonality loss for 10 epochs.
train_network(
    pca_model,
    mseWithOrthoLoss,
    train_loader,
    test_loader=test_loader,
    epochs=10,
    device=device,
)

In [None]:
def encode_batch(encoder, dataset_to_encode):
    """Encode every item of a labelled dataset on the CPU.

    The encoder is switched to eval mode and moved to the CPU (in place), then
    applied batch by batch (128 items per batch) without tracking gradients.

    Args:
        encoder: Module mapping a batch of inputs to their encodings.
        dataset_to_encode: Dataset yielding ``(x, y)`` pairs.

    Returns:
        Tuple ``(projected, labels)`` of NumPy arrays: the stacked encodings
        and the flattened, concatenated labels.
    """
    cpu_encoder = encoder.eval().cpu()
    codes, targets = [], []
    with torch.no_grad():
        for inputs, target in DataLoader(dataset_to_encode, batch_size=128):
            codes.append(cpu_encoder(inputs.cpu()).numpy())
            targets.append(target.cpu().numpy().ravel())
    return np.vstack(codes), np.hstack(targets)


# Encode the labelled test set with the linear (PCA) encoder.
projected, labels = encode_batch(pca_encoder, test_data_xy)

In [None]:
# Scatter the two encoded dimensions, coloured by digit label (as strings so
# seaborn treats the hue as categorical, ordered 0-9).
sns.scatterplot(
    x=projected[:, 0],
    y=projected[:, 1],
    hue=list(map(str, labels)),
    hue_order=list(map(str, range(10))),
    legend="full",
)

plt.grid(True)
plt.show()

In [None]:
def showEncodeDecode(encode_decode, x):
    """Plot an input next to its reconstruction.

    The model is switched to eval mode and moved to the CPU (in place), then
    applied to ``x`` without tracking gradients.

    Args:
        encode_decode: Module that maps ``x`` to its reconstruction.
        x: Input tensor; its first slice ``x[0]`` is drawn on the left.
            The output's ``[0, 0]`` slice is drawn on the right.
    """
    model = encode_decode.eval().cpu()
    with torch.no_grad():
        x_recon = model(x.cpu())
    _, (ax_input, ax_recon) = plt.subplots(1, 2)
    ax_input.imshow(x.numpy()[0, :])
    ax_recon.imshow(x_recon.numpy()[0, 0, :])

In [None]:
# Show input vs. reconstruction for a few test images.
for sample_idx in (0, 2, 10):
    showEncodeDecode(pca_model, test_data_xy[sample_idx][0])

### A simpler non-linear PCA

In [None]:
# Encoder: flatten, project down to n dimensions, then squash with tanh
pca_nonlinear_encode = nn.Sequential(
    nn.Flatten(),
    nn.Linear(D, n),
    nn.Tanh(),
)

# Decoder: its own (unshared) linear layer back to D values, then restore the
# image shape. nn.Unflatten is used for the reshape because no `View` layer
# is imported or defined in this notebook.
pca_nonlinear_decode = nn.Sequential(
    nn.Linear(n, D),
    nn.Unflatten(1, (1, 28, 28)),  # (B, 784) -> (B, 1, 28, 28)
)

# Full non-linear autoencoder
pca_nonlinear = nn.Sequential(
    pca_nonlinear_encode,
    pca_nonlinear_decode
)

In [None]:
# Train the non-linear autoencoder with plain MSE for 10 epochs.
train_network(
    pca_nonlinear,
    mse_loss,
    train_loader,
    test_loader=test_loader,
    epochs=10,
    device=device,
)

In [None]:
# Encode the labelled test set with the non-linear encoder.
projected, labels = encode_batch(pca_nonlinear_encode, test_data_xy)

# Scatter the two encoded dimensions, coloured by digit label (as strings so
# seaborn treats the hue as categorical, ordered 0-9).
sns.scatterplot(
    x=projected[:, 0],
    y=projected[:, 1],
    hue=list(map(str, labels)),
    hue_order=list(map(str, range(10))),
    legend="full",
)

plt.grid(True)
plt.show()

In [None]:
# Show input vs. reconstruction for a few test images.
for sample_idx in (0, 2, 10):
    showEncodeDecode(pca_nonlinear, test_data_xy[sample_idx][0])