<a href="https://colab.research.google.com/github/mengqinghu123/LearnPytorch/blob/main/AutoEncoder_KMeans_MNIST.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Imports

In [1]:
from typing import Tuple
from torch import nn
from torch import Tensor
import torch
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import matplotlib.pyplot as plt

rs = 1234
np.random.seed(rs)

## Check if GPU is available

In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [3]:
# Should print type='cuda' if GPU is available, otherwise 'cpu'
device

device(type='cpu')

## Defining the Auto-Encoder Model

In [4]:
class Encoder(nn.Module):
    """
    The Encoder network.
    A deep neural network that learns a lower-dimensional representation of the input data by mapping it into an embedding.
    """
    def __init__(self, input_size: int,
                hidden_layers: Tuple[int],
                 dropout_rate: float=0.2,
                 activation=nn.ReLU()
                ):
        super().__init__()

        # First layer, the input layer
        self.input_layer = torch.nn.Linear(input_size, hidden_layers[0])
        self.n_layers = 0

        ######################################################
        # Usually we could specify the layers in this way:
        # self.dense_0 = torch.nn.Linear(input_size, hidden_layers[0])
        # self.dense_1 = torch.nn.Linear(hidden_layers[0], hidden_layers[1])
        # ....
        #
        # However, instead of hardcoding this, we can do it automatically based on the hidden_layers
        # The output of one hidden_layer will always be the input for the next hidden_layet
        #######################################################
        for i in range(0, len(hidden_layers) -1):
            setattr(self, f"dense_{i}", torch.nn.Linear(hidden_layers[i],
                                                        hidden_layers[i+1])
                   )
            self.n_layers += 1

        self.activation = activation
        self.hidden_layers = hidden_layers

        # Add dropout to prevent overfitting
        self.dropout  = nn.Dropout(dropout_rate)
        self.dropout_rate = dropout_rate
        self.input_size = input_size

    def forward(self, x: Tensor) -> Tensor:
        # Special Treatment for input layer
        x = self.activation(self.input_layer(x))

        #################################################
        # forward pass through the dense layers
        # We could have written each dense layer explicitly:
        # x = self.activation(self.dense_0(x))
        # x = self.dropout(x)
        # x = self.activation(self.dense_1(x))
        # .....
        #
        # But we do it automatically:
        ##################################################
        for i in range(0, self.n_layers -1):
            x = self.activation(getattr(self, f"dense_{i}")(x))
            # dropout to prevent overfitting
            x = self.dropout(x)

        # Use layer without activation function to output embedding
        output_layer = getattr(self, f"dense_{self.n_layers-1}")
        return output_layer(x)

In [5]:
class Decoder(nn.Module):
    """
    Same as the encoder, but the layers are in reverse order.
    So, we pass the encoder as input and use its hidden_sizes to specify the decoder network.
    """
    def __init__(self,
                 encoder,
                 activation=nn.ReLU()
                ):
        super().__init__()
        self.hidden_layers = encoder.hidden_layers
        n_layers = encoder.n_layers
        self.hidden_layers = self.hidden_layers[::-1]

        # Reversed order -> dense_0 will be the first to apply here
        for i in range(0, n_layers):
            setattr(self, f"dense_{i}", torch.nn.Linear(self.hidden_layers[i],
                                                        self.hidden_layers[i+1])
                   )
        self.output_layer = torch.nn.Linear(self.hidden_layers[-1],
                                                        encoder.input_size)
        self.n_layers = n_layers
        self.activation = activation
        self.dropout  = nn.Dropout(encoder.dropout_rate)


    def forward(self, x:Tensor) -> Tensor:
        for i in range(0, self.n_layers):
            dense_i = getattr(self, f"dense_{i}")
            x = dense_i(x)
            x = self.activation(x)
            x = self.dropout(x)
        return self.output_layer(x)

In [6]:
class AutoEncoder(nn.Module):
    """
    The complete AutoEncoder that consists of the encoder and the decoder network.
    We need this for training, but for applying the autoencoder, we will only need the encoder to map input data to an embedding.
    """
    def __init__(self, input_size: int,
                hidden_layers: Tuple[int],
                 dropout_rate: float=0.2,
                activation=nn.ReLU()):
        super().__init__()
        self.encoder = Encoder(input_size, hidden_layers, dropout_rate)
        self.decoder = Decoder(self.encoder)
        self.hidden_layers = hidden_layers

    def forward(self, x: Tensor) -> Tuple[Tensor]:
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return encoded, decoded


### Loading MNIST Dataset

In [7]:
from torchvision.datasets import MNIST
from torch.utils.data import ConcatDataset
from torchvision import transforms
transform = transforms.Compose([transforms.ToTensor(),
                              transforms.Normalize((0.5,), (0.5,)),
                              ])
trainset = MNIST('./', download=True,
                 train=True,
                 transform=transform)
testset = MNIST('./', download=True,
                 train=False,
                 transform=transform)
dataset = ConcatDataset([trainset, testset])
dataloader = torch.utils.data.DataLoader(dataset,
                                         batch_size=256,
                                         shuffle=True,
                                         num_workers=10)

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 404: Not Found

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz to ./MNIST/raw/train-images-idx3-ubyte.gz


100%|██████████| 9.91M/9.91M [00:00<00:00, 37.8MB/s]


Extracting ./MNIST/raw/train-images-idx3-ubyte.gz to ./MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 404: Not Found

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz to ./MNIST/raw/train-labels-idx1-ubyte.gz


100%|██████████| 28.9k/28.9k [00:00<00:00, 18.4MB/s]


Extracting ./MNIST/raw/train-labels-idx1-ubyte.gz to ./MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 404: Not Found

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz to ./MNIST/raw/t10k-images-idx3-ubyte.gz


100%|██████████| 1.65M/1.65M [00:00<00:00, 37.3MB/s]


Extracting ./MNIST/raw/t10k-images-idx3-ubyte.gz to ./MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 404: Not Found

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz to ./MNIST/raw/t10k-labels-idx1-ubyte.gz


100%|██████████| 4.54k/4.54k [00:00<00:00, 8.29MB/s]


Extracting ./MNIST/raw/t10k-labels-idx1-ubyte.gz to ./MNIST/raw





In [8]:
X_train = trainset.data.numpy().reshape(60000, 784)
X_test = testset.data.numpy().reshape(10000, 784)
X_test.shape

(10000, 784)

In [9]:
y_train = np.array(trainset.targets)
y_test = np.array(testset.targets)

In [10]:
y = np.concatenate([y_train, y_test])
X = np.concatenate([X_train, X_test])
X.shape

(70000, 784)

### Create Model and Specify Training Parameters

In [11]:
import torch.optim.lr_scheduler as lr_scheduler

loss_ = nn.MSELoss()
n_input_features = X.shape[1]
# Initialize architecture of our Auto-Encoder
model = AutoEncoder(input_size=n_input_features,
                    hidden_layers=[500, 500, 2000,
                                 10 # This is the dimension of the embedding
                                 ],
                   # Prevent overfitting by deactivating 20% of the neurons during training
                    dropout_rate=0.2
                   ).to(device) # use GPU if available

# Activate training mode
model.train()

# We could restore a model to continue training from a checkpoint
#model = torch.load("./torch_models/autoencoder")

# Learning Rate
lr = 0.1

# Use Stochastic Gradient Descent as optimizer with momentum 0.9
optimizer = torch.optim.SGD(lr=lr,
                            momentum=0.9,
                            params=model.parameters())

# reduce learning rate as training continues
scheduler = lr_scheduler.StepLR(optimizer,
                                  step_size=100,
                                  gamma=0.1)


### Pre-train AutoEncoder

In [None]:
n_epochs = 300
eval_every = 10
best_loss = np.infty

for epoch in range(n_epochs):
    losses = []
    # Iterate over data in batches
    for x_batch, y_batch in dataloader:
        # PyTorch specific; We need to reset all gradients
        optimizer.zero_grad()

        # 0. Transform input batch data from 28 X 28 to 784 features
        #   Note that our encoder maps the data into just 10 features!
        x_batch = x_batch.to(device)
        x_batch = x_batch.view(x_batch.shape[0], -1)

        # 1. Apply AutoEncoder model (forward pass).
        #    We use the output of the decoder for training.
        output = model(x_batch)[1]

        # 2. Calculate the reconstruction loss
        loss = loss_(output, x_batch)
        losses.append(loss.item())

        # 3. Backpropagate less
        loss.backward()

        # 4. Update the weights
        optimizer.step()


    mean_loss = np.round(np.mean(losses),
                         5)
    if (epoch+1) % eval_every == 0:
        print(f"Loss at epoch [{epoch+1} / {n_epochs}]: {mean_loss}")

    # Update learning rate as training continues
    scheduler.step()

    if mean_loss < best_loss:
        best_loss = loss
        # Store the model
        torch.save(model, "./torch_models/autoencoder")

#### Fine-Tune Auto-Encoder

In [None]:
# Load the model
model = torch.load("./torch_models/autoencoder")

# Inference Mode for fine-tuning
model.eval()

lr = 0.1
optimizer = torch.optim.SGD(lr=lr,
                            momentum=0.9,
                            params=model.parameters()
                           )
n_epochs = 100
eval_every = 10
best_loss = np.infty

for epoch in range(n_epochs):
    for x_batch, y_batch in dataloader:
        # Reset gradients --> Specific for PyTorch
        optimizer.zero_grad()

        # Use GPU
        x_batch = x_batch.to(device)

        # Image has shape 28 x 28 -> Transform to 784 features using flattening
        x_batch = x_batch.view(x_batch.shape[0], -1)

        # Apply the model
        output = model(x_batch)[1]

        # Calculate the loss
        loss = loss_(output, x_batch)
        losses.append(loss.item())

        # Backpropagate the loss
        loss.backward()

        # update weights
        optimizer.step()

    mean_loss = np.round(np.mean(losses),5)
    if (epoch+1) % eval_every == 0:
        print(f"Loss at epoch [{epoch+1} / {n_epochs}]: {mean_loss}")
    torch.save(model, "./torch_models/autoencoder-finetuned")

Loss at epoch [10 / 100]: 0.04415
Loss at epoch [20 / 100]: 0.0443
Loss at epoch [30 / 100]: 0.04442
Loss at epoch [40 / 100]: 0.0445
Loss at epoch [50 / 100]: 0.04457
Loss at epoch [60 / 100]: 0.04461
Loss at epoch [70 / 100]: 0.04464
Loss at epoch [80 / 100]: 0.04466
Loss at epoch [90 / 100]: 0.04466
Loss at epoch [100 / 100]: 0.04466


## Baseline KMeans Clustering

In [None]:
from sklearn.cluster import KMeans
import numpy as np
# Use the actual number of clusters as parameter
n_clusters = len(np.unique(y))

# Apply kmeans using sklearn
kmeans = KMeans(n_clusters=n_clusters, random_state=rs)

# Get training predictions
y_pred_kmeans = kmeans.fit_predict(X)

In [None]:
from sklearn.metrics import adjusted_mutual_info_score, adjusted_rand_score
print("Accuracy of k-Means Clustering:")
ami_kmeans = adjusted_mutual_info_score(y, y_pred_kmeans)
ari_kmeans = adjusted_rand_score(y, y_pred_kmeans)
print(f"AMI: {np.round(ami_kmeans, 3)}")
print(f"ARI: {np.round(ari_kmeans, 3)}")

Accuracy of k-Means Clustering:
AMI: 0.5
ARI: 0.367


## Apply Auto-Encoder

### Evaluate Pre-trained Auto-Encoder

In [None]:
model = torch.load("./torch_models/autoencoder")
X_embedded_pretrained = model(Tensor(X).to(device))[0]

In [None]:
# Apply kmeans using sklearn
kmeans = KMeans(n_clusters=n_clusters, random_state=rs)

# Convert Data to CPU and apply kmeans to get the cluster predictions
y_pred_AE_pretrained = kmeans.fit_predict(X_embedded_pretrained.detach().cpu())

In [None]:
print("Accuracy for Auto-Encoder:")
ami_AE_pretrained = adjusted_mutual_info_score(y, y_pred_AE_pretrained)
ari_AE_pretrained = adjusted_rand_score(y, y_pred_AE_pretrained)
print(f"AMI: {np.round(ami_AE_pretrained * 100, 1)}")
print(f"ARI: {np.round(ari_AE_pretrained * 100, 1)}")

Accuracy for Auto-Encoder:
AMI: 55.4
ARI: 47.1


### Evaluate Fine-tuned Auto-Encoder

In [None]:
model = torch.load("./torch_models/autoencoder-finetuned-old")
X_embedded = model(Tensor(X).to(device))[0]

In [None]:
# Apply kmeans using sklearn
kmeans = KMeans(n_clusters=n_clusters, random_state=rs)

# Get training predictions
y_pred_AE_finetuned = kmeans.fit_predict(X_embedded.detach().cpu())

In [None]:
print("Accuracy for Auto-Encoder:")
ami_AE_finetuned = adjusted_mutual_info_score(y, y_pred_AE_finetuned)
ari_AE_finetuned = adjusted_rand_score(y, y_pred_AE_finetuned)
print(f"AMI: {np.round(ami_AE_finetuned*100, 1)}")
print(f"ARI: {np.round(ari_AE_finetuned*100, 1)}")

Accuracy for Auto-Encoder:
AMI: 72.8
ARI: 66.3


## Overall Evaluation Result

In [None]:
import pandas as pd
df = pd.DataFrame({"Clustering Approach": ["k-Means", "Auto-Encoder (pre-trained)", "Auto-Encoder (fine-tuned)"],
                   "AMI": [ami_kmeans, ami_AE_pretrained, ami_AE_finetuned],
                  "ARI": [ari_kmeans, ari_AE_pretrained, ari_AE_finetuned]})
df["AMI"] *= 100
df["ARI"] *= 100
df["AMI"] = df["AMI"].round(1)
df["ARI"] = df["ARI"].round(1)

In [None]:
df

Unnamed: 0,Clustering Approach,AMI,ARI
0,k-Means,50.0,36.7
1,Auto-Encoder (pre-trained),55.4,47.1
2,Auto-Encoder (fine-tuned),72.8,66.3
