In [20]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import struct

In [21]:
train_images_path = 'dataset/train-images.idx3-ubyte'
train_labels_path = 'dataset/train-labels.idx1-ubyte'
test_images_path = 'dataset/t10k-images.idx3-ubyte'
test_labels_path = 'dataset/t10k-labels.idx1-ubyte'

def load_idx(filename):
    with open(filename, 'rb') as f:
        zero, data_type, dims = struct.unpack('>HBB', f.read(4))
        shape = tuple(struct.unpack('>I', f.read(4))[0] for d in range(dims))
        return np.frombuffer(f.read(), dtype=np.uint8).reshape(shape)

In [22]:
from sklearn.decomposition import PCA
pca = PCA(n_components=20)
X_train = load_idx(train_images_path).reshape(60000, 784)
y_train = load_idx(train_labels_path)
X_test = load_idx(test_images_path).reshape(10000, 784)
y_test = load_idx(test_labels_path)

X_train_pca = pca.fit_transform(X_train)
X_test_pca = pca.transform(X_test)


from sklearn.preprocessing import StandardScaler
sc = StandardScaler()
X_train_pca = sc.fit_transform(X_train_pca)
X_test_pca = sc.transform(X_test_pca)

In [23]:
print('X_train shape:', X_train_pca.shape)
print('X_test shape:', X_test_pca.shape)
print('y_train shape:', y_train.shape)
print('y_test shape:', y_test.shape)

X_train shape: (60000, 20)
X_test shape: (10000, 20)
y_train shape: (60000,)
y_test shape: (10000,)


In [24]:
X_train_pca

array([[ 0.21485558, -0.63392669, -0.05330357, ...,  1.4788542 ,
         1.38552912,  0.81848814],
       [ 1.75396433, -0.59780343,  1.296685  , ...,  0.1904717 ,
         0.26409796, -0.52068984],
       [-0.08988901,  0.7951055 , -0.40989698, ..., -0.58292729,
        -0.04552189, -2.19985469],
       ...,
       [-0.30868215,  0.32454813, -0.56015577, ..., -0.17654919,
         0.52106816,  1.17603901],
       [ 0.22642506, -0.01133729,  1.11733798, ..., -0.675643  ,
        -0.38153217,  0.98985009],
       [-0.30067703, -0.05011575,  1.20901145, ...,  1.28963031,
         0.48043359, -1.03866323]])

In [25]:
def initialize_parameters(X, K):
    n, d = X.shape
    responsibilities = np.random.rand(n, K)
    responsibilities = responsibilities / responsibilities.sum(axis=1, keepdims=True)
    
    means = X[np.random.choice(n, K, replace=False)]
    weights = np.ones(K) / K
    
    # diagonal covariances as identity matrices
    covariances = np.array([np.eye(d) for _ in range(K)])
    
    return responsibilities, means, weights, covariances

def e_step(X, means, weights, covariances, K):
    n, d = X.shape
    responsibilities = np.zeros((n, K))
    
    for k in range(K):
        diff = X - means[k]
        inv_cov_diag = 1 / np.diag(covariances[k])
        exp_term = np.exp(-0.5 * np.sum(diff ** 2 * inv_cov_diag, axis=1))
        coef = weights[k] / np.sqrt(np.prod(np.diag(covariances[k])))
        responsibilities[:, k] = coef * exp_term
    
    responsibilities /= responsibilities.sum(axis=1, keepdims=True)
    return responsibilities

def m_step(X, responsibilities, K):
    n, d = X.shape
    weights = responsibilities.sum(axis=0) / n
    means = np.dot(responsibilities.T, X) / responsibilities.sum(axis=0)[:, np.newaxis]
    
    covariances = np.zeros((K, d, d))
    for k in range(K):
        diff = X - means[k]
        weighted_diff = responsibilities[:, k, np.newaxis] * diff
        covariances[k] = np.diag(np.dot(weighted_diff.T, diff) / responsibilities[:, k].sum())
    
    return means, weights, covariances

def log_likelihood(X, means, weights, covariances, K):
    n, d = X.shape
    log_likelihoods = np.zeros(n)
    
    for k in range(K):
        diff = X - means[k]
        inv_cov_diag = 1 / np.diag(covariances[k])
        exp_term = np.exp(-0.5 * np.sum(diff ** 2 * inv_cov_diag, axis=1))
        coef = weights[k] / np.sqrt(np.prod(np.diag(covariances[k])))
        log_likelihoods += coef * exp_term
    
    return np.sum(np.log(log_likelihoods))

def em_algorithm(X, K, tol=1e-5, max_iter=500):
    responsibilities, means, weights, covariances = initialize_parameters(X, K)
    log_likelihoods = []
    
    for iteration in range(max_iter):
        # e step, update responsibilities
        responsibilities = e_step(X, means, weights, covariances, K)
        
        # m step, update parameters (means, weights, covariances)
        means, weights, covariances = m_step(X, responsibilities, K)
        
        # Compute the log-likelihood
        log_likelihood_curr = log_likelihood(X, means, weights, covariances, K)
        log_likelihoods.append(log_likelihood_curr)
        
        # Check for convergence
        if iteration > 0 and abs(log_likelihood_curr - log_likelihoods[-2]) < tol * abs(log_likelihood_curr):
            break
    
    return means, weights, covariances, responsibilities, log_likelihoods

# K = 10
# means, weights, covariances, responsibilities, log_likelihoods = em_algorithm(X_train_pca, K)
# print(means, weights, covariances, responsibilities, log_likelihoods)


[[ 2.02437606 -0.49148246  0.14507152 -0.21892737 -1.99115848  0.54067845
   0.26848735 -0.78103489  0.32132791  0.74703492 -0.30804126  0.51787238
  -0.28139649  0.17177089  0.44335001  0.01631751  0.28366148 -0.11129851
   0.23263761 -0.31363938]
 [ 0.94034654 -0.40714981 -0.19704464 -0.18651383 -0.04433035 -0.3694063
  -0.09271826  0.07401276  0.26528453 -0.54436722  0.15430353 -0.43640322
  -0.28380789 -0.33495419 -0.06661999 -0.08351695 -0.06578946  0.02744067
  -0.49125219  0.49683461]
 [-0.77331498  0.94010884 -0.05878871 -0.47965368 -0.0409193   0.15337332
   0.24957371 -0.12728275 -0.26043702 -0.28752336 -0.53537827  0.10265759
  -0.58524666 -0.24768054  0.59051776 -0.27507232  0.12821122  0.05479359
   0.13330785  0.15129254]
 [ 0.36096793 -0.58948847  0.7406867   1.17462259  0.88813236  0.9644507
   0.33173513  0.74662143 -0.84649634  0.55517848  0.30719582  0.04344685
  -0.70681113 -0.30147136  0.65986934  0.05799599 -0.40952841 -0.04957824
   0.01290013 -0.30615894]
 [ 0.6

In [26]:

def bayes_classifier(X_test, means_list, covariances_list, weights_list, priors, K):
    n_test = X_test.shape[0]
    num_classes = len(means_list)
    
    log_probs = np.zeros((n_test, num_classes))
    
    for c in range(num_classes):
        for k in range(K):
            diff = X_test - means_list[c][k]
            inv_cov_diag = 1 / np.diag(covariances_list[c][k])
            exp_term = np.exp(-0.5 * np.sum(diff ** 2 * inv_cov_diag, axis=1))
            coef = weights_list[c][k] / np.sqrt(np.prod(np.diag(covariances_list[c][k])))
            log_probs[:, c] += coef * exp_term
        
        log_probs[:, c] = np.log(log_probs[:, c] + 1e-8)  # Log to avoid underflow
        log_probs[:, c] += np.log(priors[c]) 
    
    return np.argmax(log_probs, axis=1)

def classify_and_evaluate(X_train, y_train, X_test, y_test, K_values=[5, 10, 20, 30]):
    num_classes = 10
    error_rates = {}

    for K in K_values:
        print(f"Training GMM with K = {K}")
        means_list, covariances_list, weights_list, priors = [], [], [], []

        for c in range(num_classes):
            X_class = X_train[y_train == c]  # get training data for class c
            priors.append(len(X_class) / len(X_train))  # prior
            
            # Fit GMM for this currentt class
            means, weights, covariances, responsibilities, _ = em_algorithm(X_class, K)
            means_list.append(means)
            covariances_list.append(covariances)
            weights_list.append(weights)

        # Classify test data
        y_pred = bayes_classifier(X_test, means_list, covariances_list, weights_list, priors, K)
        
        # Compute error rate
        error_rate = np.mean(y_pred != y_test)
        error_rates[K] = error_rate
        print(f"Error rate for K = {K}: {error_rate:.4f}")
    
    return error_rates

error_rates = classify_and_evaluate(X_train_pca, y_train, X_test_pca, y_test)
# print(error_rates)


Training GMM with K = 5
Error rate for K = 5: 0.0908
Training GMM with K = 10
Error rate for K = 10: 0.0737
Training GMM with K = 20
Error rate for K = 20: 0.0589
Training GMM with K = 30
Error rate for K = 30: 0.0566
{5: 0.0908, 10: 0.0737, 20: 0.0589, 30: 0.0566}
