In [19]:
import torch
import torch.nn as nn
import torch_optimizer as optim_look  # Using RAdam
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
import torchvision.datasets as datasets

import numpy as np
from sklearn.cluster import MiniBatchKMeans
from sklearn.metrics import silhouette_score, davies_bouldin_score, calinski_harabasz_score
from sklearn.preprocessing import normalize
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt

In [20]:
class Autoencoder(nn.Module):
    def __init__(self, input_dim=784, latent_dim=64):
        super(Autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Linear(256, latent_dim),
            nn.Tanh()
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Linear(256, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Linear(512, input_dim),
            nn.Sigmoid()
        )

    def forward(self, x):
        z = self.encoder(x)
        return self.decoder(z)

    def embed(self, x):
        return self.encoder(x)


In [21]:
input_size = 784
latent_size = 64
learning_rate = 0.001
batch_size = 256
epochs = 20

In [22]:
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Lambda(lambda x: x.view(-1))
])

train_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

In [23]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Autoencoder(input_size, latent_size).to(device)
criterion = nn.SmoothL1Loss()  # Changed from MSELoss
optimizer = optim_look.RAdam(model.parameters(), lr=learning_rate, weight_decay=1e-5)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)

In [24]:
patience = 3
best_loss = float('inf')
no_improvement_epochs = 0
model.train()
for epoch in range(epochs):
    total_loss = 0
    for data, _ in train_loader:
        data = data.to(device)
        recon = model(data)
        loss = criterion(recon, data)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    scheduler.step()
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch [{epoch + 1}/{epochs}] - Loss: {avg_loss:.4f}")
    if avg_loss < best_loss:
        best_loss = avg_loss
        torch.save(model.state_dict(), "best_autoencoder.pth")
        no_improvement_epochs = 0  # Reset counter
    else:
        no_improvement_epochs += 1

    if no_improvement_epochs >= patience:
        print("Early stopping triggered.")
        break

Epoch [1/20] - Loss: 0.0391
Epoch [2/20] - Loss: 0.0106
Epoch [3/20] - Loss: 0.0066
Epoch [4/20] - Loss: 0.0049
Epoch [5/20] - Loss: 0.0041
Epoch [6/20] - Loss: 0.0035
Epoch [7/20] - Loss: 0.0032
Epoch [8/20] - Loss: 0.0029
Epoch [9/20] - Loss: 0.0027
Epoch [10/20] - Loss: 0.0026
Epoch [11/20] - Loss: 0.0024
Epoch [12/20] - Loss: 0.0023
Epoch [13/20] - Loss: 0.0023
Epoch [14/20] - Loss: 0.0022
Epoch [15/20] - Loss: 0.0021
Epoch [16/20] - Loss: 0.0021
Epoch [17/20] - Loss: 0.0020
Epoch [18/20] - Loss: 0.0020
Epoch [19/20] - Loss: 0.0020
Epoch [20/20] - Loss: 0.0020


In [None]:
model.load_state_dict(torch.load("best_autoencoder.pth"))
model.eval()

all_embeddings = []
all_labels = []
with torch.no_grad():
    for data, labels in train_loader:
        data = data.to(device)
        embeddings = model.embed(data)
        all_embeddings.append(embeddings.cpu().numpy())
        all_labels.append(labels.numpy())

X = np.vstack(all_embeddings)
y = np.hstack(all_labels)

In [None]:
X_norm = normalize(X)

In [None]:
kmeans = MiniBatchKMeans(n_clusters=10, batch_size=1024, random_state=42)
clusters = kmeans.fit_predict(X_norm)

In [None]:
# 1. Silhouette Score
sil_score = silhouette_score(X_norm, clusters)
print(f"\nSilhouette Score: {sil_score:.4f}")

# 2. Davies-Bouldin Index
db_index = davies_bouldin_score(X_norm, clusters)
print(f"Davies-Bouldin Index: {db_index:.4f}")

# 3. Calinski-Harabasz Index
ch_index = calinski_harabasz_score(X_norm, clusters)
print(f"Calinski-Harabasz Index: {ch_index:.4f}")

# 4. Visual clustering in 2D using PCA
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_norm)

In [None]:
plt.figure(figsize=(8, 6))
plt.scatter(X_pca[:, 0], X_pca[:, 1], c=clusters, cmap='tab10', s=10)
plt.title("Cluster Visualization (PCA Projection)")
plt.xlabel("Principal Component 1")
plt.ylabel("Principal Component 2")
plt.colorbar()
plt.show()

# 5. Evaluate with Actual Labels (not as a metric, but visualization aid)
plt.figure(figsize=(8, 6))
plt.scatter(X_pca[:, 0], X_pca[:, 1], c=y, cmap='tab10', s=10)
plt.title("Ground Truth Labels (PCA Projection)")
plt.xlabel("Principal Component 1")
plt.ylabel("Principal Component 2")
plt.colorba