Initial Imports:

In [175]:
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torchvision.transforms import Normalize
from torchvision.datasets import MNIST
from torch.utils.data import DataLoader, Subset
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from sklearn.cluster import SpectralClustering
from sklearn.metrics import silhouette_score
from sklearn.decomposition import NMF
from sklearn.preprocessing import StandardScaler
import random
from sklearn.metrics import accuracy_score

# Loading the data
For MNIST:

In [176]:
# Transformations to apply to the dataset
transform = transforms.Compose([
    transforms.ToTensor(),  # Convert images to PyTorch tensors
    transforms.Normalize((0.5,), (0.5,))  # Normalize the data
])

# Load the MNIST dataset
mnist_trainset = datasets.MNIST(root='./data', train=True, download=True, transform=transforms.ToTensor())
mnist_testset = datasets.MNIST(root='./data', train=False, download=True, transform=transforms.ToTensor())

desired_length = 1000

# Create indices for the subsampled dataset (randomly sampled)
indices = list(range(len(mnist_trainset)))
subset_indices = random.sample(indices, desired_length)

# Create a Subset using the indices
mnist_trainset = Subset(mnist_trainset, subset_indices)

# Create a DataLoader for the subsampled dataset




Paper results:

In [177]:
file_path = 'paper_results.csv'

# Read the CSV file into a Pandas DataFrame
data = pd.read_csv(file_path)
print("Results obtained in the paper:")
data

Results obtained in the paper:


Unnamed: 0,Method,MNIST ACC,MNIST NMI,USPS ACC,USPS NMI,CIFAR-10 ACC,CIFAR-10 NMI
0,K-means,0.58,0.49,0.48,0.42,0.14,0.12
1,Deep Cluster,0.86,0.83,0.67,0.69,,
2,Deep K-means,0.84,0.8,0.76,0.78,,
3,CSC No Flatten,0.85,0.79,0.83,0.78,0.12,0.08
4,CSC No Filter,0.83,0.76,0.84,0.79,0.14,0.1
5,CSC No Voting,0.82,0.77,0.82,0.76,0.14,0.1
6,CSC,0.86,0.81,0.83,0.79,0.15,0.11


### A - 1) Normalization

In [178]:
# Data normalization with min-max method
def min_max_scale_dataset(dataset, batch_size=64):
    
    min_pixel_value = float('inf')
    max_pixel_value = float('-inf')

    for images, _ in DataLoader(dataset, batch_size=batch_size, shuffle=True):
        min_pixel_value = min(min_pixel_value, images.min())
        max_pixel_value = max(max_pixel_value, images.max())

    
    min_max_scaler = Normalize(min_pixel_value, max_pixel_value)
    # Apply the min-max scaling to the dataset
    transform = transforms.Compose([transforms.ToTensor(), min_max_scaler])
    normalized_dataset = MNIST(root='./data', train=True, download=True, transform=transform)

    #select only 1000 datapoints
    desired_length = 1000
    indices = list(range(len(normalized_dataset)))
    subset_indices = random.sample(indices, desired_length)
    subset = Subset(normalized_dataset, subset_indices)

    normalized_loader = DataLoader(subset, batch_size=batch_size, shuffle=True)
    
    return normalized_loader

In [180]:
mnist_train_loader = min_max_scale_dataset(mnist_trainset)
mnist_test_loader = min_max_scale_dataset(mnist_testset)


### A - 2) Autoencoder

The autoencoder architecture

In [181]:
#Defining the model
class ConvAutoencoder(nn.Module):
    def __init__(self, input_channels):
        super(ConvAutoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(input_channels, 16, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(16, input_channels, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.Sigmoid(),
        )

        self.bottleneck = nn.Linear(16 * 14 * 14, 500)

    def forward(self, x):
        x = self.encoder(x)
        flatten_x = x.view(x.size(0), -1)  # Flatten the output from the encoder
        bottleneck_features = self.bottleneck(flatten_x)
        reconstructed = self.decoder(x)
        return bottleneck_features, reconstructed

In [182]:
input_channels_mnist = 1  # MNIST images are grayscale
input_channels_cifar = 3  # CIFAR-10 images have 3 channels (RGB)

mnist_autoencoder = ConvAutoencoder(input_channels_mnist)
cifar_autoencoder = ConvAutoencoder(input_channels_cifar)

In [183]:
criterion = nn.MSELoss()
optimizer_mnist = torch.optim.Adam(mnist_autoencoder.parameters(), lr=0.001)
optimizer_cifar = torch.optim.Adam(cifar_autoencoder.parameters(), lr=0.001)

The loop for training the model

In [184]:
NUM_EPOCH = 10
def training_loop (model, loader, optimizer):
    model.train()

    for epoch in range(NUM_EPOCH):
        
        for data in loader:
            img, _ = data
            optimizer.zero_grad()
            _, output = model(img)
            loss = criterion(output, img)
            loss.backward()
            optimizer.step()

        print(f'Epoch [{epoch + 1}/{NUM_EPOCH}], Loss: {loss.item():.4f}')

    return model

In [185]:
mnist_autoencoder = training_loop(mnist_autoencoder,mnist_train_loader,optimizer_mnist)
#cifar_autoencoder = training_loop(cifar_autoencoder,cifar_train_loader,optimizer_cifar)

Epoch [1/10], Loss: 0.2540
Epoch [2/10], Loss: 0.2367
Epoch [3/10], Loss: 0.2174
Epoch [4/10], Loss: 0.1961
Epoch [5/10], Loss: 0.1737
Epoch [6/10], Loss: 0.1511
Epoch [7/10], Loss: 0.1312
Epoch [8/10], Loss: 0.1126
Epoch [9/10], Loss: 0.0960
Epoch [10/10], Loss: 0.0816


The evaluation loop to extract 500 features from the bottleneck layer for each input image

In [186]:
def evaluation (model, loader):
    '''
    output shape : bottelneck_feature (nb img,500)
    '''
    model.eval()

    bottleneck_features_array = []
    outputs_array = []
    total_loss = 0
 
    for data in loader:
        img, _ = data
       
        bottleneck_features, output = model(img)

        for features in bottleneck_features:
            bottleneck_features_array.append(features.detach().numpy())
        outputs_array.append(output.detach().numpy())

        loss = criterion(output, img)
        total_loss += loss.item()

    average_loss = total_loss / len(loader)
    print("Evaluation loss : ",average_loss)
    
    return np.array(bottleneck_features_array), outputs_array


### B - Best features selection

Do the Non negativ Matrix Factorization (NMF) to estimate a decomposition W*H from V \
Calcul the error rate E from W*H+E = V \
Sort feature by error rate and keep the 50% best

In [187]:
def feature_filtering_nmf(features, k=1, removal_percentage=50):
    '''
    output shape : (nb img,250)
    '''

    #it doesn't work because of negative value so we remove the negative here
    min_value = np.min(features)
    features_shifted = features - min_value + 1e-10 

    # NMF : find the decomposition V=W*H
    nmf_model = NMF(n_components=k, init='random', random_state=None)
    nmf_features = nmf_model.fit_transform(features_shifted)

    #reconstruc W*H to evaluate with V to find E
    reconstructed_features = np.dot(nmf_features, nmf_model.components_)
    reconstruction_error = np.abs(features - reconstructed_features)

    # Sort features by their E
    sorted_indices = np.argsort(reconstruction_error, axis=1)
    num_features_to_keep = int((100 - removal_percentage) / 100 * features.shape[1])
    selected_features = features[np.arange(features.shape[0])[:, None], sorted_indices[:, :num_features_to_keep]]
    scaler = StandardScaler()
    standardized_features = scaler.fit_transform(selected_features)

    return np.array(standardized_features)

### Loop over A and B

do a loop over steps A and B  to increase number of features

In [188]:
features_matrix = None

for i in range(10):
    features,_ = evaluation(mnist_autoencoder,mnist_test_loader)
    sorted_features = feature_filtering_nmf(features)
    
    if i == 0 :
        features_matrix = sorted_features
    else :
        features_matrix = np.concatenate((features_matrix,sorted_features),axis=1)
        
features_matrix

Evaluation loss :  0.08111747447401285
Evaluation loss :  0.08110800059512258
Evaluation loss :  0.08110654586926103
Evaluation loss :  0.08110963786020875
Evaluation loss :  0.08112103212624788
Evaluation loss :  0.08110666135326028
Evaluation loss :  0.08111035078763962
Evaluation loss :  0.0811111694201827
Evaluation loss :  0.08110742270946503
Evaluation loss :  0.08110571559518576


array([[ 1.5090107 ,  0.23449323,  0.08312765, ...,  1.6877185 ,
         0.5099983 , -1.462617  ],
       [-2.2245822 , -0.59771365, -1.9275753 , ...,  1.442345  ,
         0.05739015,  0.79237   ],
       [ 0.05494425,  0.50627655, -1.3250059 , ..., -0.29777125,
         0.00958227,  1.9062787 ],
       ...,
       [-2.855108  , -0.34227055, -1.0463021 , ...,  1.2188302 ,
        -1.3655035 , -1.5777268 ],
       [ 0.63287157, -1.5598602 ,  0.99546474, ...,  0.08805925,
        -2.3163583 ,  1.2161319 ],
       [-1.2884805 , -0.4293022 , -2.388851  , ..., -0.9516373 ,
         0.13769329,  1.4528776 ]], dtype=float32)

### C - Variational Autoencoder

# Define the dimensions

# Define the Encoder

In [189]:
class Encoder(nn.Module):
    def __init__(self, latent_dim, hidden_dim, input_dim):
        super(Encoder, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc_mu = nn.Linear(hidden_dim, latent_dim)
        self.fc_logvar = nn.Linear(hidden_dim, latent_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        mu = self.fc_mu(x)
        logvar = self.fc_logvar(x)
        return mu, logvar

# Define the Decoder

In [190]:
class Decoder(nn.Module):
    def __init__(self, latent_dim, hidden_dim, input_dim):
        super(Decoder, self).__init__()
        self.fc2 = nn.Linear(latent_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, input_dim)
        self.sigmoid = nn.Sigmoid()  # Adding a sigmoid activation function

    def forward(self, z):
        z = F.relu(self.fc2(z))
        reconstruction = self.sigmoid(self.fc3(z))  # Applying sigmoid activation
        return reconstruction

In [191]:
class SecondDecoder(nn.Module):
    def __init__(self, latent_dim, hidden_dim, input_dim):
        super(SecondDecoder, self).__init__()
        self.fc1 = nn.Linear(latent_dim, 2 * hidden_dim)
        self.fc2 = nn.Linear(2 * hidden_dim, 2 * hidden_dim)
        self.fc3 = nn.Linear(2 * hidden_dim, input_dim)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, z):
        z = self.relu(self.fc1(z))
        z = self.relu(self.fc2(z))
        reconstruction = self.sigmoid(self.fc3(z))
        return reconstruction

In [192]:
class ThirdDecoder(nn.Module):
    def __init__(self, latent_dim, hidden_dim, input_dim):
        super(ThirdDecoder, self).__init__()
        self.fc1 = nn.Linear(latent_dim, 4 * hidden_dim)
        self.fc2 = nn.Linear(4 * hidden_dim, 2 * hidden_dim)
        self.fc3 = nn.Linear(2 * hidden_dim, hidden_dim)
        self.fc4 = nn.Linear(hidden_dim, input_dim)
        self.leaky_relu = nn.LeakyReLU()
        self.tanh = nn.Tanh()

    def forward(self, z):
        z = self.leaky_relu(self.fc1(z))
        z = self.leaky_relu(self.fc2(z))
        z = self.leaky_relu(self.fc3(z))
        reconstruction = self.tanh(self.fc4(z))
        return reconstruction

# Define the VAE

# Create instances of each encoder

In [193]:
input_dim = 2500
hidden_dim = 500
latent_dim = 100

In [194]:
decoder = Decoder(latent_dim, hidden_dim, input_dim)
secondDecoder = SecondDecoder(latent_dim, hidden_dim,input_dim)
thirdDecoder = ThirdDecoder(latent_dim, hidden_dim,input_dim)

# Put all encoders into a list
all_decoders = [decoder, secondDecoder, thirdDecoder]
all_decoders

[Decoder(
   (fc2): Linear(in_features=100, out_features=500, bias=True)
   (fc3): Linear(in_features=500, out_features=2500, bias=True)
   (sigmoid): Sigmoid()
 ),
 SecondDecoder(
   (fc1): Linear(in_features=100, out_features=1000, bias=True)
   (fc2): Linear(in_features=1000, out_features=1000, bias=True)
   (fc3): Linear(in_features=1000, out_features=2500, bias=True)
   (relu): ReLU()
   (sigmoid): Sigmoid()
 ),
 ThirdDecoder(
   (fc1): Linear(in_features=100, out_features=2000, bias=True)
   (fc2): Linear(in_features=2000, out_features=1000, bias=True)
   (fc3): Linear(in_features=1000, out_features=500, bias=True)
   (fc4): Linear(in_features=500, out_features=2500, bias=True)
   (leaky_relu): LeakyReLU(negative_slope=0.01)
   (tanh): Tanh()
 )]

In [195]:
class VAE(nn.Module):
    def __init__(self, all_decoders):
        super(VAE, self).__init__()
        self.encoder = Encoder(latent_dim, hidden_dim, input_dim)
        self.decoders = nn.ModuleList(all_decoders)

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x):
        mu, logvar = self.encoder(x)
        z = self.reparameterize(mu, logvar)
        
        reconstructions = []
        for decoder in self.decoders:
            reconstruction = decoder(z)
            reconstructions.append(reconstruction)

        return reconstructions, mu, logvar

# Initialize the VAE model

In [197]:
model = VAE(all_decoders)
model

VAE(
  (encoder): Encoder(
    (fc1): Linear(in_features=2500, out_features=500, bias=True)
    (fc_mu): Linear(in_features=500, out_features=100, bias=True)
    (fc_logvar): Linear(in_features=500, out_features=100, bias=True)
  )
  (decoders): ModuleList(
    (0): Decoder(
      (fc2): Linear(in_features=100, out_features=500, bias=True)
      (fc3): Linear(in_features=500, out_features=2500, bias=True)
      (sigmoid): Sigmoid()
    )
    (1): SecondDecoder(
      (fc1): Linear(in_features=100, out_features=1000, bias=True)
      (fc2): Linear(in_features=1000, out_features=1000, bias=True)
      (fc3): Linear(in_features=1000, out_features=2500, bias=True)
      (relu): ReLU()
      (sigmoid): Sigmoid()
    )
    (2): ThirdDecoder(
      (fc1): Linear(in_features=100, out_features=2000, bias=True)
      (fc2): Linear(in_features=2000, out_features=1000, bias=True)
      (fc3): Linear(in_features=1000, out_features=500, bias=True)
      (fc4): Linear(in_features=500, out_features=25

# Create a DataLoader

In [203]:
# Normalize the data to be in the [0, 1] range
max_val = features_matrix.max()
min_val = features_matrix.min()
normalized_features_matrix = (features_matrix - min_val) / (max_val - min_val)

features_tensor = torch.Tensor(normalized_features_matrix)
batch_size = 64
data_loader = DataLoader(features_tensor, batch_size=batch_size, shuffle=True)

#Dimension of data_loader : [938,1,64,2500]

##### Define the loss function (using the reconstruction loss and the KL divergence)

In [204]:
def loss_function(recon_x, x, mu, logvar):
    #print(f"recon_x shape: {recon_x.shape}, x shape: {x.shape}")
    recon_loss = F.binary_cross_entropy(recon_x, x, reduction='sum')
    kld_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return recon_loss + kld_loss

##### Define the optimizer

In [205]:
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
optimizer

Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    capturable: False
    differentiable: False
    eps: 1e-08
    foreach: None
    fused: None
    lr: 0.001
    maximize: False
    weight_decay: 0
)

##### Training loop

In [206]:
num_epochs = 10

for epoch in range(num_epochs):
    total_loss = 0.0

    for batch in data_loader:
        optimizer.zero_grad()

        # Forward pass
        reconstructions, mu, logvar = model(batch)

        # Calculate the loss using your custom loss function
        loss = loss_function(reconstructions[0], batch, mu, logvar)  # Assuming using the first decoder for loss calculation

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Print average loss for the epoch
    average_loss = total_loss / len(data_loader.dataset)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {average_loss:.4f}')

Epoch [1/10], Loss: 1743.2358
Epoch [2/10], Loss: 1736.9198
Epoch [3/10], Loss: 1734.8615
Epoch [4/10], Loss: 1733.8452
Epoch [5/10], Loss: 1733.0767
Epoch [6/10], Loss: 1732.5852
Epoch [7/10], Loss: 1732.2618
Epoch [8/10], Loss: 1731.9439
Epoch [9/10], Loss: 1731.6566
Epoch [10/10], Loss: 1731.5085


In [207]:
model.eval()

final_representations = []


for batch in data_loader:
    with torch.no_grad():
        outputs, _, _ = model(batch)

    representations = torch.stack(outputs, dim=1)

    final_representations.append(representations)


final_representations = torch.cat(final_representations, dim=0)


### D- Basic Subspace Clustering

##### Compute the similarity matrix using cosine similarity

In [208]:
similarity_list = []
for i in range(3):
    mat = final_representations[:,i,:]
    similarity_matrix = cosine_similarity(mat)
    print(similarity_matrix.shape)
    similarity_list.append(similarity_matrix)

similarity_list = np.array(similarity_list)
print(similarity_list.shape)


(1000, 1000)
(1000, 1000)
(1000, 1000)
(3, 1000, 1000)


##### Generate the graph Laplacian

In [209]:
laplacian_list = []
for similarity_matrix in similarity_list :
    degree_matrix = np.diag(np.sum(similarity_matrix, axis=1))
    laplacian_matrix = degree_matrix - similarity_matrix
    normalized_laplacian = np.dot(np.dot(np.sqrt(np.linalg.inv(degree_matrix)), laplacian_matrix), np.sqrt(np.linalg.inv(degree_matrix)))
    laplacian_list.append(normalized_laplacian)

laplacian_list = np.array(laplacian_list)
laplacian_list.shape
    

(3, 1000, 1000)

##### Compute the eigenvalues and eigenvectors

In [210]:
k_eigen_vector_list = []
for i in range(len(laplacian_list)):
    eigenvalues, eigenvectors = np.linalg.eigh(laplacian_list[i])
    k = 10
    list_k = []
    for j in range(k): 
        k_eigenvectors = eigenvectors[:, j] / np.linalg.norm(eigenvectors[:, j])
        list_k.append(k_eigenvectors)
    k_eigen_vector_list.append(list_k)
k_eigen_vector_list = np.array(k_eigen_vector_list)
print(k_eigen_vector_list.shape)
k_eigenvectors_matrix = np.reshape(k_eigen_vector_list,(30,1000))
k_eigenvectors_matrix.shape

(3, 10, 1000)


(30, 1000)

##### Perform K-Means clustering on selected eigenvectors

In [211]:


def kmean():
    max_r_ratio = float('-inf')
    best_r_ratio_clusters = None

    for i in range(2, 30):  # Starting from 2 clusters as a minimum
        spectral_model = SpectralClustering(n_clusters=i, affinity='nearest_neighbors')
        pseudo_labels = spectral_model.fit_predict(k_eigenvectors_matrix)
        # Calculate silhouette score as an example metric
        silhouette = silhouette_score(k_eigenvectors_matrix, pseudo_labels)
        if silhouette > max_r_ratio:
            max_r_ratio = silhouette
            best_r_ratio_clusters = i
            

    print(f"Maximum R ratio: {max_r_ratio} achieved with {best_r_ratio_clusters} clusters")
    return i

nb_clusters = kmean()

Maximum R ratio: 0.08098910003900528 achieved with 8 clusters


#### Accuracy 

In [212]:
spectral_model = SpectralClustering(n_clusters=nb_clusters, affinity='nearest_neighbors')
pseudo_labels = spectral_model.fit_predict(features_matrix)

#get all original labels
all_targets =  []
for _,targets in mnist_train_loader:
     all_targets= np.concatenate((all_targets,targets.tolist()))
all_targets = np.array(all_targets)

accuracy = accuracy_score(all_targets, pseudo_labels)
print(accuracy)
 
# VERY BAD xd

0.025
