# Tutorial

https://www.kaggle.com/code/frankmollard/a-story-about-unsupervised-learning

In [66]:
import pandas as pd
import os
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

from sklearn.preprocessing import Normalizer
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import torch
from torch import nn

from tqdm import tqdm
from collections import defaultdict
import torchvision
from torchvision.transforms import v2
from torch.utils.data import DataLoader
from torch import functional as F

In [3]:
ROOT = os.getcwd()
DATASET_PATH = os.path.join(ROOT, 'dataset')
if os.path.exists(DATASET_PATH) == False: 
    os.makedirs(DATASET_PATH)


# Import MNIST

In [76]:
compose = v2.Compose([
    v2.ToTensor(),
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    v2.Grayscale(),

])
mnist = torchvision.datasets.MNIST(root=DATASET_PATH, download = True, transform=compose)

train_dataloader = DataLoader(mnist, batch_size = 32)

In [77]:
sampleX, sampleY = next(iter(train_dataloader))
print(sampleX.shape)
print(sampleY.shape)


torch.Size([32, 1, 28, 28])
torch.Size([32])


# Autoencoder

In [109]:
class AutoEncoder(nn.Module):

    def __init__(self, input_size, embedding_size = 32) -> None:
        super().__init__()
        assert len(input_size) == 3, 'input shape must be int format of (1,x,x)'
        self.input_size = input_size
        flatten_dim = input_size[0] * input_size[1] * input_size[2] 

        self.encoder = nn.Sequential(
            nn.Linear(flatten_dim, 512),
            nn.LeakyReLU(),
            nn.Dropout(0.5),
            nn.BatchNorm1d(512),
            nn.Linear(512, 256),
            nn.LeakyReLU(),
            nn.Dropout(0.5),
            nn.BatchNorm1d(256),
            nn.Linear(256, 64),
            nn.LeakyReLU(),
            nn.Dropout(0.5),
            nn.BatchNorm1d(64),
            nn.Linear(64,embedding_size),
            nn.LeakyReLU(),
        ) 

        self.decoder = nn.Sequential(
            nn.Linear(embedding_size, 64),
            nn.LeakyReLU(),
            nn.Dropout(0.5),
            nn.BatchNorm1d(64),
            nn.Linear(64, 256),
            nn.LeakyReLU(),
            nn.Dropout(0.5),
            nn.BatchNorm1d(256),
            nn.Linear(256, 512),
            nn.LeakyReLU(),
            nn.Dropout(0.5),
            nn.BatchNorm1d(512),
            nn.Linear(512, flatten_dim),
            nn.Sigmoid(),
        ) 

    
    def forward(self, X):
        X = torch.flatten(X, start_dim = 1)
        X = self.encoder(X)
        X = self.decoder(X)
        w,h = self.input_size[1:]
        X = X.reshape(-1, w,h)
        return X

        
        
    
class Trainer:
    def __init__(self, model: nn.Module, optimizer_fn, criterion_fn, lr = 0.001, epochs = 1000) -> None:
        self.model = model
        self.epochs = epochs
        self.lr = lr

        self.optimizer = optimizer_fn(self.model.parameters(), lr = lr)
        self.criterion = criterion_fn()
        self.history = defaultdict(list)

    def optimize(self, input, target):
        self.optimizer.zero_grad()
        loss = self.criterion(input, target)
        loss.backward()
        self.optimizer.step()

    def train(self, train_dataloader):
        self.model.train()
        self.history.clear()

        loop = tqdm(range(self.epochs))
        val_loss = None
        running_loss = 0.0

        for e in loop:

            for X, y in train_dataloader:
                X_pred = self.model(X)
                
                loss = self.criterion(X_pred, X)
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                self.history['train loss'].append(loss.item())
                running_loss += loss.item()
        
            loop.set_description(f'train loss = {running_loss:.3f}')
        self.plot(self.history)
        
    # def evaluate(self, X_valid):
    #     self.model.eval()
    #     with torch.no_grad():
    #         X_valid_pred = self.model(X_valid)
    #         val_loss = self.criterion(X_valid_pred, X_valid)
    #     self.model.train()

    #     return val_loss
    
    def plot(self, history):        
        indices = list(range(len(history['train loss'])))
        sns.lineplot(y = history['train loss'], x =indices, label = 'train')
        if len(history['valid loss']) > 0:
            sns.lineplot(y = history['valid loss'], x =indices, label = 'valid')
        plt.title("Loss")
        plt.show()


In [85]:
print(sampleX[0].shape)
print(len(sampleX[0].shape))

torch.Size([1, 28, 28])
3


In [103]:
ae = AutoEncoder(sampleX[0].shape)
sample_pred = ae(sampleX)
print(sample_pred.shape)

torch.Size([32, 784])
torch.Size([32, 32])
torch.Size([32, 28, 28])


In [110]:
sampleX, sampleY = next(iter(train_dataloader))

trainer = Trainer(
    model = AutoEncoder(sampleX[0].shape),
    optimizer_fn= torch.optim.Adam,
    criterion_fn= nn.MSELoss,
    lr = 1e-4,
    epochs = 100,
)

trainer.train(train_dataloader)




  return F.mse_loss(input, target, reduction=self.reduction)
train loss = 8299.500:   1%|          | 1/100 [00:27<45:29, 27.57s/it]


KeyboardInterrupt: 