In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

def explore_and_visualize_digits(image_dataset_path, num_samples=25, subset_size=1000):
    """
    Explore and visualize an unlabelled handwritten digits dataset.
    
    Parameters:
    - image_dataset_path (str): Path to the .npy file containing the dataset.
    - num_samples (int): Number of sample images to display in the grid.
    - subset_size (int): Size of subset for t-SNE visualization to reduce computation.
    
    Returns:
    - None: Displays plots and prints analysis results.
    """
    
    # Load the dataset
    Xtrain_img = np.load(image_dataset_path)
    print("Dataset shape:", Xtrain_img.shape)
    print("Data type:", Xtrain_img.dtype)
    print(f"Pixel value range: [{Xtrain_img.min():.2f}, {Xtrain_img.max():.2f}]")
    
    # Reshape for visualization (remove channel dimension)
    Xtrain_2d = Xtrain_img.reshape(-1, 28, 28)
    print("Reshaped for visualization:", Xtrain_2d.shape)
    
    # Compute basic statistics
    pixel_mean = Xtrain_img.mean()
    pixel_std = Xtrain_img.std()
    print(f"Mean pixel value: {pixel_mean:.4f}, Std: {pixel_std:.4f}")
    
    # Plot pixel intensity distribution
    plt.figure(figsize=(8, 6))
    sns.histplot(Xtrain_img.flatten(), bins=50, kde=True)
    plt.title("Pixel Intensity Distribution")
    plt.xlabel("Pixel Value")
    plt.ylabel("Frequency")
    plt.show()
    
    # Visualize sample images
    plt.figure(figsize=(10, 10))
    sample_indices = np.random.choice(Xtrain_2d.shape[0], num_samples, replace=False)
    for i, idx in enumerate(sample_indices):
        plt.subplot(5, 5, i + 1)
        plt.imshow(Xtrain_2d[idx], cmap='gray')
        plt.axis('off')
    plt.suptitle("Sample Handwritten Digits")
    plt.tight_layout()
    plt.show()
    
    # Reshape for dimensionality reduction
    Xtrain_flat = Xtrain_img.reshape(Xtrain_img.shape[0], -1)
    print("Flattened shape for PCA/t-SNE:", Xtrain_flat.shape)
    
    # Apply PCA for 2D visualization
    pca = PCA(n_components=2)
    X_pca = pca.fit_transform(Xtrain_flat)
    explained_variance_ratio = pca.explained_variance_ratio_.sum()
    print(f"PCA: Explained variance ratio (2 components): {explained_variance_ratio:.4f}")
    
    # Plot PCA results
    plt.figure(figsize=(8, 6))
    plt.scatter(X_pca[:, 0], X_pca[:, 1], alpha=0.5, s=10)
    plt.title("PCA Projection of Handwritten Digits")
    plt.xlabel("Principal Component 1")
    plt.ylabel("Principal Component 2")
    plt.show()
    
    # Apply t-SNE for 2D visualization (use a subset for speed)
    subset_indices = np.random.choice(Xtrain_flat.shape[0], subset_size, replace=False)
    X_subset = Xtrain_flat[subset_indices]
    tsne = TSNE(n_components=2, random_state=42)
    X_tsne = tsne.fit_transform(X_subset)
    
    # Plot t-SNE results
    plt.figure(figsize=(8, 6))
    plt.scatter(X_tsne[:, 0], X_tsne[:, 1], alpha=0.5, s=10)
    plt.title(f"t-SNE Projection of Handwritten Digits (Subset of {subset_size})")
    plt.xlabel("t-SNE Component 1")
    plt.ylabel("t-SNE Component 2")
    plt.show()

import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.mixture import GaussianMixture
from sklearn.metrics import silhouette_score, davies_bouldin_score
from sklearn.preprocessing import StandardScaler
import hdbscan
import tensorflow as tf
from tensorflow.keras import layers, models
import pandas as pd

def cluster_digits_with_autoencoder(image_dataset_path, num_clusters=10, subset_size=20000):
    """
    Cluster unlabelled handwritten digits using a convolutional autoencoder and compare with other algorithms.
    
    Parameters:
    - image_dataset_path (str): Path to the .npy file containing the dataset.
    - num_clusters (int): Number of clusters (default 10 for digits 0-9).
    - subset_size (int): Size of subset for faster computation (default 20,000).
    
    Returns:
    - results (dict): Dictionary with algorithm names and their results (labels, metrics).
    """
    
    # Load and preprocess the dataset
    Xtrain_img = np.load(image_dataset_path)
    print("Dataset shape:", Xtrain_img.shape)
    print("Data type:", Xtrain_img.dtype)
    print(f"Pixel value range: [{Xtrain_img.min():.2f}, {Xtrain_img.max():.2f}]")
    
    # Use a subset for faster computation
    indices = np.random.choice(Xtrain_img.shape[0], subset_size, replace=False)
    X_subset = Xtrain_img[indices]
    X_flat = X_subset.reshape(subset_size, -1)
    print("Subset shape (flattened):", X_flat.shape)
    
    # Standardize features for clustering (raw data)
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X_flat)
    
    # Initialize results dictionary
    results = {}
    
    # Autoencoder + K-means
    print("Training Convolutional Autoencoder...")
    X_images = X_subset.reshape(-1, 28, 28, 1)
    
    # Build autoencoder
    input_img = layers.Input(shape=(28, 28, 1))
    x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(input_img)
    x = layers.MaxPooling2D((2, 2), padding='same')(x)
    x = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(x)
    encoded = layers.MaxPooling2D((2, 2), padding='same')(x)
    
    x = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(encoded)
    x = layers.UpSampling2D((2, 2))(x)
    x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
    x = layers.UpSampling2D((2, 2))(x)
    decoded = layers.Conv2D(1, (3, 3), activation='sigmoid', padding='same')(x)
    
    autoencoder = models.Model(input_img, decoded)
    autoencoder.compile(optimizer='adam', loss='mse')
    
    # Define encoder and decoder
    encoder = models.Model(input_img, encoded)
    encoded_input = layers.Input(shape=(7, 7, 16))
    x = autoencoder.layers[-5](encoded_input)
    for layer in autoencoder.layers[-4:]:
        x = layer(x)
    decoder = models.Model(encoded_input, x)
    
    # Train autoencoder
    autoencoder.fit(X_images, X_images, epochs=10, batch_size=128, verbose=1)
    
    # Extract encoded features
    encoded_features = encoder.predict(X_images)
    encoded_features_flat = encoded_features.reshape(subset_size, -1)
    
    # Standardize encoded features
    encoded_scaled = scaler.fit_transform(encoded_features_flat)
    
    # Apply K-means on encoded features
    print("Running K-means on Autoencoder features...")
    kmeans_ae = KMeans(n_clusters=num_clusters, random_state=42)
    ae_labels = kmeans_ae.fit_predict(encoded_scaled)
    
    # Compute metrics
    sil_score_ae = silhouette_score(encoded_scaled, ae_labels, sample_size=1000)
    db_score_ae = davies_bouldin_score(encoded_scaled, ae_labels)
    
    # Decode centroids to image space
    centroids_encoded = kmeans_ae.cluster_centers_.reshape(num_clusters, 7, 7, 16)
    centroids_decoded = decoder.predict(centroids_encoded).reshape(num_clusters, 28, 28)
    
    # Store results
    results['Autoencoder_KMeans'] = {
        'labels': ae_labels,
        'silhouette_score': sil_score_ae,
        'davies_bouldin_score': db_score_ae,
        'num_clusters': num_clusters,
        'centroids': centroids_decoded,
        'reconstruction_loss': autoencoder.evaluate(X_images, X_images, verbose=0)
    }
    print(f"Autoencoder + K-means: {num_clusters} clusters, Silhouette: {sil_score_ae:.4f}, "
          f"DB: {db_score_ae:.4f}, Reconstruction Loss: {results['Autoencoder_KMeans']['reconstruction_loss']:.4f}")
    
    # Compare with other algorithms
    algorithms = {
        'KMeans': KMeans(n_clusters=num_clusters, random_state=42),
        'GMM': GaussianMixture(n_components=num_clusters, random_state=42),
        'HDBSCAN': hdbscan.HDBSCAN(min_cluster_size=50)
    }
    
    for name, algo in algorithms.items():
        print(f"Running {name}...")
        try:
            if name == 'GMM':
                cluster_labels = algo.fit_predict(X_scaled)
            else:
                cluster_labels = algo.fit_predict(X_scaled)
            
            num_clusters_found = len(set(cluster_labels)) - (1 if -1 in cluster_labels else 0)
            
            if num_clusters_found > 1:
                sil_score = silhouette_score(X_scaled, cluster_labels, sample_size=1000)
                db_score = davies_bouldin_score(X_scaled, cluster_labels)
            else:
                sil_score = np.nan
                db_score = np.nan
            
            results[name] = {
                'labels': cluster_labels,
                'silhouette_score': sil_score,
                'davies_bouldin_score': db_score,
                'num_clusters': num_clusters_found,
                'centroids': (algo.cluster_centers_ if name == 'KMeans' else 
                             algo.means_ if name == 'GMM' else None)
            }
            print(f"{name}: {num_clusters_found} clusters, Silhouette: {sil_score:.4f}, DB: {db_score:.4f}")
        
        except Exception as e:
            print(f"Error in {name}: {str(e)}")
            results[name] = {
                'labels': None,
                'silhouette_score': np.nan,
                'davies_bouldin_score': np.nan,
                'num_clusters': 0,
                'centroids': None
            }
    
    # Compare results
    comparison = {
        'Algorithm': [],
        'Silhouette Score': [],
        'Davies-Bouldin Score': [],
        'Number of Clusters': []
    }
    for name, result in results.items():
        comparison['Algorithm'].append(name)
        comparison['Silhouette Score'].append(result['silhouette_score'])
        comparison['Davies-Bouldin Score'].append(result['davies_bouldin_score'])
        comparison['Number of Clusters'].append(result['num_clusters'])
    
    comparison_df = pd.DataFrame(comparison)
    print("\nComparison of Clustering Algorithms:")
    print(comparison_df)
    
    # Visualize centroids
    for name in ['KMeans', 'GMM', 'Autoencoder_KMeans']:
        if name in results and results[name]['labels'] is not None and results[name]['centroids'] is not None:
            centroids = results[name]['centroids'].reshape(num_clusters, 28, 28)
            plt.figure(figsize=(10, 4))
            for i in range(num_clusters):
                plt.subplot(2, 5, i + 1)
                plt.imshow(centroids[i], cmap='gray')
                plt.title(f"Cluster {i}")
                plt.axis('off')
            plt.suptitle(f"{name} Cluster Centroids")
            plt.tight_layout()
            plt.show()
    
    return results



In [None]:
image_dataset_path = "../datasets/unlabelled_train_data_images.npy"
explore_and_visualize_digits(image_dataset_path, num_samples=25, subset_size=20000)

In [None]:
image_dataset_path = "../datasets/unlabelled_train_data_images.npy"
results = cluster_digits_with_autoencoder(image_dataset_path, num_clusters=10, subset_size=20000)
print(results)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.mixture import GaussianMixture
from sklearn.metrics import silhouette_score, davies_bouldin_score
from sklearn.preprocessing import StandardScaler
import hdbscan
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import pandas as pd
from scipy.ndimage import center_of_mass, rotate

def preprocess_images_for_rotation(X_images):
    X_aligned = np.zeros_like(X_images)
    for i in range(X_images.shape[0]):
        img = X_images[i].squeeze()
        cy, cx = center_of_mass(img)
        if np.isnan(cy) or np.isnan(cx):
            X_aligned[i] = X_images[i]
            continue
        m = np.zeros((3, 3))
        for y in range(img.shape[0]):
            for x in range(img.shape[1]):
                m[0, 0] += img[y, x]
                m[1, 0] += (x - cx) * img[y, x]
                m[0, 1] += (y - cy) * img[y, x]
                m[1, 1] += (x - cx) * (y - cy) * img[y, x]
                m[2, 0] += (x - cx)**2 * img[y, x]
                m[0, 2] += (y - cy)**2 * img[y, x]
        if m[0, 0] == 0:
            X_aligned[i] = X_images[i]
            continue
        angle = 0.5 * np.arctan2(2 * m[1, 1], m[2, 0] - m[0, 2]) * 180 / np.pi
        img_rotated = rotate(img, -angle, reshape=False, mode='nearest')
        X_aligned[i] = img_rotated[np.newaxis, :, :] if X_images.shape[1] == 1 else img_rotated[:, :, np.newaxis]
    return X_aligned

def cluster_digits_improved(image_dataset_path, num_clusters=10, subset_size=20000, use_augmentation=True):
    # Load and preprocess dataset
    Xtrain_img = np.load(image_dataset_path)
    print("Dataset shape:", Xtrain_img.shape)
    print(f"Pixel value range: [{Xtrain_img.min():.2f}, {Xtrain_img.max():.2f}]")
    
    indices = np.random.choice(Xtrain_img.shape[0], subset_size, replace=False)
    X_subset = Xtrain_img[indices]
    print("Subset shape:", X_subset.shape)
    
    print("Aligning images...")
    X_aligned = preprocess_images_for_rotation(X_subset)
    X_flat = X_aligned.reshape(subset_size, -1)
    
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X_flat)
    
    results = {}
    
    # Autoencoder + K-means
    print("Training Convolutional Autoencoder...")
    X_images = X_aligned.reshape(-1, 28, 28, 1)
    
    # Build autoencoder
    input_img = layers.Input(shape=(28, 28, 1))
    x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(input_img)
    x = layers.MaxPooling2D((2, 2), padding='same')(x)
    x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
    x = layers.MaxPooling2D((2, 2), padding='same')(x)
    x = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(x)
    encoded = layers.MaxPooling2D((2, 2), padding='same')(x)
    
    x = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(encoded)
    x = layers.UpSampling2D((2, 2))(x)
    x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
    x = layers.UpSampling2D((2, 2))(x)
    x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    x = layers.UpSampling2D((2, 2))(x)
    x = layers.Cropping2D(cropping=((2, 2), (2, 2)))(x)
    decoded = layers.Conv2D(1, (3, 3), activation='sigmoid', padding='same')(x)
    
    autoencoder = models.Model(input_img, decoded)
    autoencoder.compile(optimizer='adam', loss='mse')
    
    # Define encoder
    encoder = models.Model(input_img, encoded)
    
    # Define decoder explicitly
    encoded_input = layers.Input(shape=(4, 4, 16))
    x = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(encoded_input)
    x = layers.UpSampling2D((2, 2))(x)
    x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
    x = layers.UpSampling2D((2, 2))(x)
    x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    x = layers.UpSampling2D((2, 2))(x)
    x = layers.Cropping2D(cropping=((2, 2), (2, 2)))(x)
    x = layers.Conv2D(1, (3, 3), activation='sigmoid', padding='same')(x)
    decoder = models.Model(encoded_input, x)
    
    # Copy weights from autoencoder to decoder
    for l1, l2 in zip(decoder.layers[1:], autoencoder.layers[7:]):
        l1.set_weights(l2.get_weights())
    
    # Train autoencoder
    if use_augmentation:
        datagen = ImageDataGenerator(rotation_range=30, fill_mode='nearest')
        datagen.fit(X_images)
        autoencoder.fit(datagen.flow(X_images, X_images, batch_size=128), 
                        steps_per_epoch=len(X_images) // 128, epochs=20, verbose=1)
    else:
        autoencoder.fit(X_images, X_images, epochs=20, batch_size=128, verbose=1)
    
    # Extract encoded features
    encoded_features = encoder.predict(X_images)
    encoded_features_flat = encoded_features.reshape(subset_size, -1)
    encoded_scaled = scaler.fit_transform(encoded_features_flat)
    
    # K-means on encoded features
    print("Running K-means on Autoencoder features...")
    kmeans_ae = KMeans(n_clusters=num_clusters, random_state=42)
    ae_labels = kmeans_ae.fit_predict(encoded_scaled)
    
    sil_score_ae = silhouette_score(encoded_scaled, ae_labels, sample_size=1000)
    db_score_ae = davies_bouldin_score(encoded_scaled, ae_labels)
    
    # Decode centroids
    centroids_encoded = kmeans_ae.cluster_centers_.reshape(num_clusters, 4, 4, 16)
    centroids_decoded = decoder.predict(centroids_encoded).reshape(num_clusters, 28, 28)
    
    results['Autoencoder_KMeans'] = {
        'labels': ae_labels,
        'silhouette_score': sil_score_ae,
        'davies_bouldin_score': db_score_ae,
        'num_clusters': num_clusters,
        'centroids': centroids_decoded,
        'reconstruction_loss': autoencoder.evaluate(X_images, X_images, verbose=0)
    }
    print(f"Autoencoder + K-means: {num_clusters} clusters, Silhouette: {sil_score_ae:.4f}, "
          f"DB: {db_score_ae:.4f}, Reconstruction Loss: {results['Autoencoder_KMeans']['reconstruction_loss']:.4f}")
    
    # Other algorithms
    algorithms = {
        'KMeans': KMeans(n_clusters=num_clusters, random_state=42),
        'GMM': GaussianMixture(n_components=num_clusters, random_state=42),
        'HDBSCAN': hdbscan.HDBSCAN(min_cluster_size=20)
    }
    
    for name, algo in algorithms.items():
        print(f"Running {name}...")
        try:
            if name == 'GMM':
                cluster_labels = algo.fit_predict(X_scaled)
            else:
                cluster_labels = algo.fit_predict(X_scaled)
            
            num_clusters_found = len(set(cluster_labels)) - (1 if -1 in cluster_labels else 0)
            sil_score = silhouette_score(X_scaled, cluster_labels, sample_size=1000) if num_clusters_found > 1 else np.nan
            db_score = davies_bouldin_score(X_scaled, cluster_labels) if num_clusters_found > 1 else np.nan
            
            results[name] = {
                'labels': cluster_labels,
                'silhouette_score': sil_score,
                'davies_bouldin_score': db_score,
                'num_clusters': num_clusters_found,
                'centroids': (algo.cluster_centers_ if name == 'KMeans' else algo.means_ if name == 'GMM' else None)
            }
            print(f"{name}: {num_clusters_found} clusters, Silhouette: {sil_score:.4f}, DB: {db_score:.4f}")
        except Exception as e:
            print(f"Error in {name}: {str(e)}")
            results[name] = {'labels': None, 'silhouette_score': np.nan, 'davies_bouldin_score': np.nan, 'num_clusters': 0, 'centroids': None}
    
    # Comparison
    comparison = {
        'Algorithm': [],
        'Silhouette Score': [],
        'Davies-Bouldin Score': [],
        'Number of Clusters': []
    }
    for name, result in results.items():
        comparison['Algorithm'].append(name)
        comparison['Silhouette Score'].append(result['silhouette_score'])
        comparison['Davies-Bouldin Score'].append(result['davies_bouldin_score'])
        comparison['Number of Clusters'].append(result['num_clusters'])
    
    comparison_df = pd.DataFrame(comparison)
    print("\nComparison of Clustering Algorithms:")
    print(comparison_df)
    
    # Visualize centroids
    for name in ['KMeans', 'GMM', 'Autoencoder_KMeans']:
        if name in results and results[name]['centroids'] is not None:
            centroids = results[name]['centroids'].reshape(num_clusters, 28, 28)
            plt.figure(figsize=(10, 4))
            for i in range(num_clusters):
                plt.subplot(2, 5, i + 1)
                plt.imshow(centroids[i], cmap='gray')
                plt.title(f"Cluster {i}")
                plt.axis('off')
            plt.suptitle(f"{name} Cluster Centroids")
            plt.tight_layout()
            plt.show()
    
    return results

if __name__ == "__main__":
    image_dataset_path = "../datasets/unlabelled_train_data_images.npy"
    results = cluster_digits_improved(image_dataset_path, num_clusters=10, subset_size=20000, use_augmentation=True)
    print(results)