## Imports

In [7]:
from typing import Tuple
from torch import nn
from torch import Tensor
import torch
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import matplotlib.pyplot as plt

rs = 1234
np.random.seed(rs)

# Check if GPU is available

In [8]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [9]:
# Should print type='cuda' if GPU is available, otherwise 'cpu'
device

device(type='cuda')

## Defining the Auto-Encoder Model

In [10]:
class Encoder(nn.Module):
    """
    The Encoder network. 
    A deep neural network that learns a lower-dimensional representation of the input data by mapping it into an embedding.
    """
    def __init__(self, input_size: int, 
                hidden_sizes: Tuple[int],
                 dropout_rate: float=0.2,
                 activation=nn.ReLU()
                ):
        super().__init__()
        
        # First layer, the input layer
        self.input_layer = torch.nn.Linear(input_size, hidden_sizes[0])
        self.n_layers = 0

        ######################################################
        # Usually we could specify the layers in this way:
        # self.dense_0 = torch.nn.Linear(input_size, hidden_sizes[0])
        # self.dense_1 = torch.nn.Linear(hidden_sizes[0], hidden_sizes[1])
        # ....
        # 
        # However, instead of hardcoding this, we can do it automatically based on the hidden_sizes
        #######################################################
        for i in range(0, len(hidden_sizes) -1):
            setattr(self, f"dense_{i}", torch.nn.Linear(hidden_sizes[i],
                                                        hidden_sizes[i+1])
                   )
            self.n_layers += 1
        
        self.activation = activation
        self.hidden_sizes = hidden_sizes
        
        # Add dropout to prevent overfitting
        self.dropout  = nn.Dropout(dropout_rate)
        self.dropout_rate = dropout_rate
        self.input_size = input_size

    def forward(self, x: Tensor) -> Tensor:
        # Special Treatment for input layer
        x = self.activation(self.input_layer(x))
        
        #################################################
        # forward pass through the dense layers
        # We could have written each dense layer explicitly:
        # x = self.activation(self.dense_0(x))
        # x = self.dropout(x)
        # x = self.activation(self.dense_1(x))
        # .....
        # 
        # But we do it automatically:
        ##################################################
        for i in range(0, self.n_layers -1):
            x = self.activation(getattr(self, f"dense_{i}")(x))
            # dropout to prevent overfitting
            x = self.dropout(x)
            
        # Use layer without activation function to output embedding
        output_layer = getattr(self, f"dense_{self.n_layers-1}")
        return output_layer(x)

In [11]:
class Decoder(nn.Module):
    """
    Same as the encoder, but the layers are in reverse order. 
    So, we pass the encoder as input and use its hidden_sizes to specify the decoder network.
    """
    def __init__(self,
                 encoder,
                 activation=nn.ReLU()
                ):
        super().__init__()
        self.hidden_sizes = encoder.hidden_sizes
        n_layers = encoder.n_layers
        self.hidden_sizes = self.hidden_sizes[::-1]
        
        # Reversed order -> dense_0 will be the first to apply here
        for i in range(0, n_layers):
            setattr(self, f"dense_{i}", torch.nn.Linear(self.hidden_sizes[i],
                                                        self.hidden_sizes[i+1])
                   )
        self.output_layer = torch.nn.Linear(self.hidden_sizes[-1],
                                                        encoder.input_size)
        self.n_layers = n_layers
        self.activation = activation
        self.dropout  = nn.Dropout(encoder.dropout_rate)

        
    def forward(self, x:Tensor) -> Tensor:
        for i in range(0, self.n_layers):
            dense_i = getattr(self, f"dense_{i}")
            x = dense_i(x)
            x = self.activation(x)
            x = self.dropout(x)
        return self.output_layer(x)

In [12]:
class AutoEncoder(nn.Module):
    """
    The complete AutoEncoder that consists of the encoder and the decoder network. 
    We need this for training, but for applying the autoencoder, we will only need the encoder to map input data to an embedding.
    """
    def __init__(self, input_size: int, 
                hidden_sizes: Tuple[int],
                 dropout_rate: float=0.2,
                activation=nn.ReLU()):
        super().__init__()
        self.encoder = Encoder(input_size, hidden_sizes, dropout_rate)
        self.decoder = Decoder(self.encoder)
        self.hidden_sizes = hidden_sizes

    def forward(self, x: Tensor) -> Tuple[Tensor]:
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return encoded, decoded
        

### Loading MNIST Dataset

In [13]:
from torchvision.datasets import MNIST
from torch.utils.data import ConcatDataset
from torchvision import transforms
transform = transforms.Compose([transforms.ToTensor(),
                              transforms.Normalize((0.5,), (0.5,)),
                              ])
trainset = MNIST('./', download=True,
                 train=True,
                 transform=transform)
testset = MNIST('./', download=True,
                 train=False,
                 transform=transform)
dataset = ConcatDataset([trainset, testset])
dataloader = torch.utils.data.DataLoader(dataset,
                                         batch_size=256, 
                                         shuffle=True,
                                         num_workers=10)

In [14]:
X_train = trainset.data.numpy().reshape(60000, 784)
X_test = testset.data.numpy().reshape(10000, 784)
X_test.shape

(10000, 784)

In [15]:
y_train = np.array(trainset.targets)
y_test = np.array(testset.targets)

In [16]:
y = np.concatenate([y_train, y_test])
X = np.concatenate([X_train, X_test])
X.shape

(70000, 784)

### Create Model and Specify Training Parameters

In [17]:
import torch.optim.lr_scheduler as lr_scheduler

loss_ = nn.MSELoss()
n_input_features = X.shape[1]
# Initialize architecture of our Auto-Encoder
model = AutoEncoder(input_size=n_input_features, 
                    hidden_sizes=[500, 500, 2000, 
                                 10 # This is the dimension of the embedding
                                 ],
                   # Prevent overfitting by deactivating 20% of the neurons during training
                    dropout_rate=0.2 
                   ).to(device) # use GPU if available

# Activate training mode
model.train()

# We could restore a model to continue training from a checkpoint
#model = torch.load("./torch_models/autoencoder")

# Learning Rate
lr = 0.1

# Use Stochastic Gradient Descent as optimizer with momentum 0.9
optimizer = torch.optim.SGD(lr=lr, 
                            momentum=0.9,
                            params=model.parameters())

# reduce learning rate as training continues
scheduler = lr_scheduler.StepLR(optimizer, 
                                  step_size=100,
                                  gamma=0.1)


### Pre-train AutoEncoder

In [18]:
n_epochs = 300
eval_every = 10
best_loss = np.infty

for epoch in range(n_epochs):
    losses = []
    # Iterate over data in batches
    for x_batch, y_batch in dataloader:
        # Transform input batch data
        x_batch = x_batch.to(device)
        x_batch = x_batch.view(x_batch.shape[0], -1)

        # Apply AutoEncoder model
        output = model(x_batch)[1]

        # Calculate the reconstruction loss
        loss = loss_(output, x_batch)
        losses.append(loss.item())
        optimizer.zero_grad()
        
        loss.backward()
        
        optimizer.step()

    # Update learning rate
    scheduler.step()
    mean_loss = np.round(np.mean(losses),
                         5)
    if (epoch+1) % eval_every == 0:   
        print(f"Loss at epoch [{epoch+1} / {n_epochs}]: {mean_loss}")

    if mean_loss < best_loss:
        best_loss = loss
        # Store the model
        torch.save(model, "./torch_models/autoencoder")

Loss at epoch [10 / 300]: 0.15144
Loss at epoch [20 / 300]: 0.11768
Loss at epoch [30 / 300]: 0.10562
Loss at epoch [40 / 300]: 0.09888
Loss at epoch [50 / 300]: 0.09435
Loss at epoch [60 / 300]: 0.09095
Loss at epoch [70 / 300]: 0.08822
Loss at epoch [80 / 300]: 0.08609
Loss at epoch [90 / 300]: 0.08431
Loss at epoch [100 / 300]: 0.08283
Loss at epoch [110 / 300]: 0.08216
Loss at epoch [120 / 300]: 0.08196
Loss at epoch [130 / 300]: 0.08188
Loss at epoch [140 / 300]: 0.08165
Loss at epoch [150 / 300]: 0.08155
Loss at epoch [160 / 300]: 0.08142
Loss at epoch [170 / 300]: 0.08128
Loss at epoch [180 / 300]: 0.08116
Loss at epoch [190 / 300]: 0.08103
Loss at epoch [200 / 300]: 0.08083
Loss at epoch [210 / 300]: 0.08084
Loss at epoch [220 / 300]: 0.08085
Loss at epoch [230 / 300]: 0.08083
Loss at epoch [240 / 300]: 0.0808
Loss at epoch [250 / 300]: 0.08074
Loss at epoch [260 / 300]: 0.0807
Loss at epoch [270 / 300]: 0.0808
Loss at epoch [280 / 300]: 0.08078
Loss at epoch [290 / 300]: 0.080

#### Fine-Tune Auto-Encoder

In [19]:
# Load the model
model = torch.load("./torch_models/autoencoder")

# Inference Mode for fine-tuning
model.eval()

lr = 0.1
optimizer = torch.optim.SGD(lr=lr, 
                            momentum=0.9,
                            params=model.parameters()
                           )
n_epochs = 300
eval_every = 10
best_loss = np.infty

for epoch in range(n_epochs):
    for x_batch, y_batch in dataloader:
        # Use GPU
        x_batch = x_batch.to(device)
        
        # Image has shape 28 x 28 -> Transform to 784 features
        x_batch = x_batch.view(x_batch.shape[0], -1)
        
        # Apply the model
        output = model(x_batch)[1]

        # Calculate the loss
        loss = loss_(output, x_batch)
        losses.append(loss.item())

        # Reset gradients --> Specific for PyTorch
        optimizer.zero_grad()
        
        # Backpropagate the loss
        loss.backward()

        # update weights
        optimizer.step()
        
    mean_loss = np.round(np.mean(losses),5)
    if (epoch+1) % eval_every == 0:   
        print(f"Loss at epoch [{epoch+1} / {n_epochs}]: {mean_loss}")
    torch.save(model, "./torch_models/autoencoder-finetuned")

Loss at epoch [1 / 300]: 0.07079
Loss at epoch [2 / 300]: 0.06713
Loss at epoch [3 / 300]: 0.06512
Loss at epoch [4 / 300]: 0.0638
Loss at epoch [5 / 300]: 0.06284
Loss at epoch [6 / 300]: 0.06208
Loss at epoch [7 / 300]: 0.06146
Loss at epoch [8 / 300]: 0.06093
Loss at epoch [9 / 300]: 0.06046
Loss at epoch [10 / 300]: 0.06005
Loss at epoch [11 / 300]: 0.05968
Loss at epoch [12 / 300]: 0.05934
Loss at epoch [13 / 300]: 0.05903
Loss at epoch [14 / 300]: 0.05874
Loss at epoch [15 / 300]: 0.05846
Loss at epoch [16 / 300]: 0.0582
Loss at epoch [17 / 300]: 0.05796
Loss at epoch [18 / 300]: 0.05773
Loss at epoch [19 / 300]: 0.0575
Loss at epoch [20 / 300]: 0.05729
Loss at epoch [21 / 300]: 0.05708
Loss at epoch [22 / 300]: 0.05689
Loss at epoch [23 / 300]: 0.0567
Loss at epoch [24 / 300]: 0.05651
Loss at epoch [25 / 300]: 0.05634
Loss at epoch [26 / 300]: 0.05616
Loss at epoch [27 / 300]: 0.056
Loss at epoch [28 / 300]: 0.05583
Loss at epoch [29 / 300]: 0.05568
Loss at epoch [30 / 300]: 0.0

## Baseline KMeans Clustering

In [20]:
from sklearn.cluster import KMeans
import numpy as np
# Use the actual number of clusters as parameter
n_clusters = len(np.unique(y))

# Apply kmeans using sklearn
kmeans = KMeans(n_clusters=n_clusters, random_state=rs)

# Get training predictions
y_pred_kmeans = kmeans.fit_predict(X)

In [53]:
from sklearn.metrics import adjusted_mutual_info_score, adjusted_rand_score
print("Accuracy of k-Means Clustering:")
ami_kmeans = adjusted_mutual_info_score(y, y_pred_kmeans)
ari_kmeans = adjusted_rand_score(y, y_pred_kmeans)
print(f"AMI: {np.round(ami_kmeans, 3)}")
print(f"ARI: {np.round(ari_kmeans, 3)}")

Accuracy of k-Means Clustering:
AMI: 0.5
ARI: 0.367


## Apply Auto-Encoder

### Evaluate Pre-trained Auto-Encoder

In [40]:
model = torch.load("./torch_models/autoencoder")
X_embedded_pretrained = model(Tensor(X).to(device))[0]

In [41]:
# Apply kmeans using sklearn
kmeans = KMeans(n_clusters=n_clusters, random_state=rs)

# Convert Data to CPU and apply kmeans to get the cluster predictions
y_pred_AE_pretrained = kmeans.fit_predict(X_embedded_pretrained.detach().cpu())

In [60]:
print("Accuracy for Auto-Encoder:")
ami_AE_pretrained = adjusted_mutual_info_score(y, y_pred_AE_pretrained)
ari_AE_pretrained = adjusted_rand_score(y, y_pred_AE_pretrained)
print(f"AMI: {np.round(ami_AE_pretrained * 100, 1)}")
print(f"ARI: {np.round(ari_AE_pretrained * 100, 1)}")

Accuracy for Auto-Encoder:
AMI: 55.2
ARI: 47.2


### Evaluate Fine-tuned Auto-Encoder

In [43]:
model = torch.load("./torch_models/autoencoder-finetuned")
X_embedded = model(Tensor(X).to(device))[0]

In [45]:
# Apply kmeans using sklearn
kmeans = KMeans(n_clusters=n_clusters, random_state=rs)

# Get training predictions
y_pred_AE_finetuned = kmeans.fit_predict(X_embedded.detach().cpu())

In [61]:
print("Accuracy for Auto-Encoder:")
ami_AE_finetuned = adjusted_mutual_info_score(y, y_pred_AE_finetuned)
ari_AE_finetuned = adjusted_rand_score(y, y_pred_AE_finetuned)
print(f"AMI: {np.round(ami_AE_finetuned*100, 1)}")
print(f"ARI: {np.round(ari_AE_finetuned*100, 1)}")

Accuracy for Auto-Encoder:
AMI: 70.7
ARI: 63.6


## Overall Evaluation Result

In [71]:
import pandas as pd
df = pd.DataFrame({"Clustering Approach": ["k-Means", "Auto-Encoder (pre-trained)", "Auto-Encoder (fine-tuned)"],
                   "AMI": [ami_kmeans, ami_AE_pretrained, ami_AE_finetuned],
                  "ARI": [ari_kmeans, ari_AE_pretrained, ari_AE_finetuned]})
df["AMI"] *= 100
df["ARI"] *= 100
df["AMI"] = df["AMI"].round(1)
df["ARI"] = df["ARI"].round(1)

In [72]:
df

Unnamed: 0,Clustering Approach,AMI,ARI
0,k-Means,50.0,36.7
1,Auto-Encoder (pre-trained),55.2,47.2
2,Auto-Encoder (fine-tuned),70.7,63.6


In [None]:
print("Accuracy for Auto-Encoder:")
print(f"AMI: {adjusted_mutual_info_score(y, y_pred_train_AE)}")
print(f"ARI: {adjusted_rand_score(y, y_pred_train_AE)}")
print(f"Accuracy: {clustering_accuracy(y, y_pred_train_AE)}")

In [None]:
decoded_cluster_centers = model.decoder(Tensor(kmeans.cluster_centers_).cuda())

In [None]:
fig, axes = plt.subplots(1, n_clusters)
n_clusters = len(decoded_cluster_centers)

for c_i in range(n_clusters):

    # get center for cluster c_i and reshape it for plotting
    image = decoded_cluster_centers[c_i].reshape(28,28)

    # plot the cluster centers
    axes[c_i].imshow(image.detach().cpu(), 
              cmap='grey')

    # Styling: Turn off x/y ticks
    axes[c_i].set_yticks([])
    axes[c_i].set_xticks([])

Using Auto-Encoders, we are able to increase the accuracy of the simple k-Means clustering algorithm by more than 20%-points!

In [None]:
from sklearn.manifold import TSNE
X_viz = TSNE(n_components=2, n_iter=1000).fit_transform(X_train_embedded.detach().cpu()[0:10000])

In [None]:
kmeans_viz = KMeans(n_clusters=10)
y_viz = kmeans_viz.fit_predict(X_viz)
ax = plt.scatter(X_viz[0:1000, 0],
                 X_viz[0:1000, 1], 
                 c=y_viz[0:1000])
plt.xticks([])
plt.yticks([])

In [None]:
ax = plt.scatter(X_viz[0:1000, 0], X_viz[0:1000, 1], c="grey")
plt.xticks([])
plt.yticks([])