In [308]:
# python
import numpy as np
import matplotlib.pyplot as plt
from typing import Any, List, Dict
from enum import Enum
from icecream import ic
import copy

import time

import a4_lib

In [309]:
# Loading dataset:
train_dataset = a4_lib.A4_EX1_CNN_HELPER.load_mnist_data(
    n_workers    = 1,
    augmentation = ["FLATTEN", "NORMALIZE"],
    shuffle      = True,
    train_set    = True,
)
test_dataset  = a4_lib.A4_EX1_CNN_HELPER.load_mnist_data(
    n_workers    = 1,
    augmentation = ["FLATTEN", "NORMALIZE"],
    shuffle      = False,
    train_set    = False,
)

train_X = next(iter(train_dataset))[0].numpy()
train_y = next(iter(train_dataset))[1].numpy()
test_X = next(iter(test_dataset))[0].numpy()
test_y = next(iter(test_dataset))[1].numpy()

=== Loading Data ... 
> Augmentation: ['FLATTEN', 'NORMALIZE']
=== Data Loaded [Testing] ===
=== Loading Data ... 
> Augmentation: ['FLATTEN', 'NORMALIZE']
=== Data Loaded [Training] ===


In [310]:
# PCA Feature Reduction
from sklearn.decomposition import PCA 

pca = PCA(20)
train_X_reduced = pca.fit_transform(train_X) 
test_X_reduced = pca.transform(test_X) 



In [311]:
ic(np.shape(train_X))
ic(np.shape(train_y))
ic(np.shape(test_X))
ic(np.shape(test_y))

TRAINING_SET = {}
TESTING_SET = {}
# Split training dataset based on labels:
for i in range(10):
    TRAINING_SET[i] = train_X_reduced[train_y == i]
    TESTING_SET[i] = test_X_reduced[test_y == i]


ic| np.shape(train_X): (60000, 784)
ic| np.shape(train_y): (60000,)
ic| np.shape(test_X): (10000, 784)
ic| np.shape(test_y): (10000,)


In [312]:
for i in range(10):
    ic(np.shape(TRAINING_SET[i]))

ic| np.shape(TRAINING_SET[i]): (5923, 20)
ic| np.shape(TRAINING_SET[i]): (6742, 20)
ic| np.shape(TRAINING_SET[i]): (5958, 20)
ic| np.shape(TRAINING_SET[i]): (6131, 20)
ic| np.shape(TRAINING_SET[i]): (5842, 20)
ic| np.shape(TRAINING_SET[i]): (5421, 20)
ic| np.shape(TRAINING_SET[i]): (5918, 20)
ic| np.shape(TRAINING_SET[i]): (6265, 20)
ic| np.shape(TRAINING_SET[i]): (5851, 20)
ic| np.shape(TRAINING_SET[i]): (5949, 20)


In [313]:
class EM_GMM:
    class VERSION(Enum):
        DIAGNOAL = "Diagnoal"
        SPHERICAL = "Spherical"

    PDF_MODELS = {
        VERSION.DIAGNOAL: # O(K + ND)
            lambda pi, mu, S, k, X:  \
                pi[k] * ( (np.product(S[k]) * (2 * np.pi)) ** (-1/2)) * np.exp(- 0.5 * np.sum(np.divide((X - mu[k]) ** 2, S[k]), axis=1)  ),
        VERSION.SPHERICAL: # O(ND)
            lambda pi, mu, S, k, X:  \
                pi[k] * ((S[k] ** len(X[0]) * (2 * np.pi)) ** (-1/2)) * np.exp( - 0.5 / S[k] * np.diag((X - mu[k]) @ (X - mu[k]).T )),
    }
    S_UPDATE = {
        VERSION.DIAGNOAL: # O(KND + K)
            lambda R_norm, X, mu:  [ R_norm[:,k].dot((X - mu[k]) ** 2) for k in range(len(mu)) ],
        VERSION.SPHERICAL: # O(KND)
            lambda R_norm, X, mu:  np.sum([ R_norm[:,k].dot((X - mu[k]) ** 2) for k in range(len(mu)) ]) / len(mu),
    }

    @staticmethod
    def fit(
        X,
        K: int,
        MAX_ITR: int,
        VER: VERSION,
        EARLY_STOPPING_LOSS_TOL: float = 0.001,
        verbose_level: a4_lib.VerboseLevel = a4_lib.VerboseLevel.HIGH
    ) -> Dict[str, Any]:
        print("=== Fitting [{}] :".format(VER))
        # const:                                (static memory):
        N, D = np.shape(X)

        # params      (run-time static memory, long-term cache):
        pi = np.full(K, 1/K)                                    # O(K)
        mu = np.random.rand(K, D)                               # O(K * D)
        if VER == EM_GMM.VERSION.DIAGNOAL:
            S = np.ones((K, D))                                 # O(K * D)
        elif VER == EM_GMM.VERSION.SPHERICAL:
            S = np.ones((K))                                    # O(1)
        else:
            raise ValueError("Invalid Version for EM_GMM fitting!")
        
        # placeholder  (run-time heap memory, short-term cache):
        R  = np.zeros((N, K))                                    # O(N * K)
        R_sum_n  = np.zeros((N))                                  # O(N)
        R_sum_k  = np.zeros((K))                                  # O(K)
        loss = np.zeros(MAX_ITR)
        iter_count = 0 
        # -----> [Space Complexity]: O(N x D + K x D + N x K)

        # select model:
        # Responsibility:
        pdf_ = EM_GMM.PDF_MODELS[VER]
        # S update:
        s_update_ = EM_GMM.S_UPDATE[VER]

        # train:
        for iter in range(MAX_ITR):                                     # O(MAX_ITR * (...))
            # [Step 2] Evaluate - Expectation:
            # Expectation:
            for k in range(K):
                R[:, k] = pdf_(pi=pi, mu=mu, S=S, k=k, X=X) # O(K * (O(__pdf_models__))) = O(KND)

            # Normalization:
            R_sum_n = np.sum(R, axis=1)                                 # O(NK)
            R = np.divide(R.T, R_sum_n).T                               # O(NK)

            # Negative log-likelihood (with correction):
            loss[iter] = - np.sum(np.log(R_sum_n))                      # O(2 * N)

            if verbose_level >= a4_lib.VerboseLevel.HIGH:
                print(" - itr:{:3d}> pi:{} mu:{} S:{} => Loss: {}".format(iter, pi, mu, S, loss[iter]))
            if verbose_level == a4_lib.VerboseLevel.MEDIUM:
                print(" - itr:{:3d}> pi:{} => Loss: {}".format(iter, pi, loss[iter]))

            # early stopping condition:
            if (iter > 1) and (np.abs(loss[iter] - loss[iter-1]) <= np.abs(loss[iter]) * EARLY_STOPPING_LOSS_TOL):
                break

            # [Step 1] Update - Maximization:
            # Normalization:
            R_sum_k = np.sum(R, axis=0)                                 # O(NK)
            R = np.divide(R, R_sum_k)                                   # O(NK)
            # Update:
            pi = R_sum_k / N                                            # O(K)
            mu = np.dot(R.T, X)                                         # O(<(KN), (ND)>) = O(KND)
            S  = s_update_(R_norm=R, X=X, mu=mu)                        # O(__s_update__)

            # print("     itr:{:3d}> Updated: pi:{} mu:{} S:{} ".format(iter, pi, mu, S))
            iter_count += 1
        # -----> [Time Complexity]: O(MAX_ITR x K x N x D)

        # pack parameters into a dictionary => model:
        return {
            "Ver."  : VER,
            # fit params:
            "pi"    : pi,
            "mu"    : mu,
            "S"     : S,
            # fit model status:
            "K"             : K,
            "iter_count"    : iter_count,
            "MAX_ITR"       : EARLY_STOPPING_LOSS_TOL,
            "ESTOP_LOSS_TOL": EARLY_STOPPING_LOSS_TOL,
        }
    
    def predict(
        X,
        model
    ):
        pdf_ = EM_GMM.PDF_MODELS[model["Ver."]]
        N, D = np.shape(X)
        R  = np.zeros((N, model["K"]))
        for k in range(model["K"]):
            R[:, k] = pdf_(pi=model["pi"],mu=model["mu"],S= model["S"], k=k, X=X) 
        Probability = np.dot(R, model["pi"])
        return Probability


In [314]:
# PART 3 : d-GMM -------:
# Param:
USER_PARAM_N_SAMPLES                = 100
USER_PARAM_VERSION                  = EM_GMM.VERSION.DIAGNOAL#DIAGNOAL
USER_PARAM_K                        = 5
USER_PARAM_MAX_ITR                  = 100
USER_PARAM_EARLY_STOPPING_LOSS_TOL  = 1e-30
USER_PAEAM_VERBOSE                  = a4_lib.VerboseLevel.MEDIUM
# Placeholder:
D_GMM_MODELS = []

# TRAIN:
for d in range(10):
    t = time.time()
    model = EM_GMM.fit(
        X       = TRAINING_SET[d][0:USER_PARAM_N_SAMPLES, :],
        K       = USER_PARAM_K,
        VER     = USER_PARAM_VERSION,
        MAX_ITR = USER_PARAM_MAX_ITR,
        EARLY_STOPPING_LOSS_TOL = USER_PARAM_EARLY_STOPPING_LOSS_TOL,
        verbose_level = USER_PAEAM_VERBOSE,
    )
    D_GMM_MODELS.append(model)
    # report:
    if USER_PAEAM_VERBOSE >= a4_lib.VerboseLevel.HIGH:
        print(" Summary: [{}] n={} > Elappsed: {} s | MODEL: \n {} \n".format(d, USER_PARAM_N_SAMPLES, time.time() - t, model))
    elif USER_PAEAM_VERBOSE >= a4_lib.VerboseLevel.LOW:
        print(" Summary: [{}] n={} > Elappsed: {} s  \n".format(d, USER_PARAM_N_SAMPLES, time.time() - t))


# PREDICT:
dGMM_predict_ = lambda X, models: np.argmax([ EM_GMM.fit(X=x, model=m_) for m_ in models])

= Fitting [VERSION.DIAGNOAL] :
 - itr:  0> pi:[0.2 0.2 0.2 0.2 0.2] => Loss: 17631.32903879596
 - itr:  1> pi:[0.22217994 0.22586526 0.175752   0.13083922 0.24536357] => Loss: 3707.5333827660693
 - itr:  2> pi:[0.22854145 0.23028609 0.17119499 0.1311431  0.23883438] => Loss: 3682.7066394727594
 - itr:  3> pi:[0.24028672 0.22992826 0.16838285 0.12829961 0.23310255] => Loss: 3662.065630036023
 - itr:  4> pi:[0.25369746 0.23105428 0.16993218 0.12765408 0.21766199] => Loss: 3646.1307756301335
 - itr:  5> pi:[0.26489997 0.23118074 0.17095323 0.12660068 0.20636539] => Loss: 3640.956321800594
 - itr:  6> pi:[0.27185966 0.2279557  0.17098427 0.12913576 0.2000646 ] => Loss: 3637.039741769289
 - itr:  7> pi:[0.27993847 0.2239817  0.16892868 0.13008876 0.19706239] => Loss: 3634.3003610443866
 - itr:  8> pi:[0.28835825 0.2206566  0.16257035 0.13123071 0.19718409] => Loss: 3631.8032877371224
 - itr:  9> pi:[0.29416535 0.21749894 0.15279393 0.13626842 0.19927336] => Loss: 3628.8950187803384
 - itr: 

In [315]:
# TEST
for d in range(10):
    y_predx = []
    for i in range(10):
        y_pred = EM_GMM.predict(
            X     = TESTING_SET[d][0:USER_PARAM_N_SAMPLES, :],
            model = D_GMM_MODELS[i],
        )
        y_predx.append(y_pred)
    accuracy = np.sum(np.argmax(y_predx, axis=0) == d)/USER_PARAM_N_SAMPLES
    print("[{}]> Accuracy: {:0.2f} %".format(d, accuracy*100))

[0]> Accuracy: 4.00 %
[1]> Accuracy: 8.00 %
[2]> Accuracy: 8.00 %
[3]> Accuracy: 12.00 %
[4]> Accuracy: 17.00 %
[5]> Accuracy: 19.00 %
[6]> Accuracy: 8.00 %
[7]> Accuracy: 20.00 %
[8]> Accuracy: 12.00 %
[9]> Accuracy: 18.00 %


In [316]:
np.diag([[0,1],[1,2]])

array([0, 2])