<a href="https://colab.research.google.com/github/jundev1l2l/Machine-Learning/blob/master/mfa_tutorial.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Mixture of Factor Analysis - Tutorial

# utils.py

## dataset

In [None]:
from google.colab import drive

drive.mount("/content/drive")

In [None]:
import numpy as np
import pandas as pd


class Leukemia():
    """
    Leukemia Dataset

    attributes:


    methods:

    """
    def __init__(self):
        df = pd.read_excel("./drive/My Drive/Colab Notebooks/mfa-tutorial/leukemia.xlsx")[1:]  # [7129, 73]
        self.data = np.array(df.loc[2:, df.columns != "leukemia"]).transpose()  # [72, 7128]
        self.cluster = np.array(df.loc[1][1:] == "AML").astype(float)  # [72,]
    
    def get_dataset(self):
        return (self.data, self.cluster)

# model.py

- MFA algorithm implementation

In [None]:
def inv(mtx):
    return np.linalg.inv(mtx)

In [None]:
import numpy as np
import matplotlib.pyplot as plt

class MFA():
    """
    Mixture of Factor Analysis

    attributes:
    X, S, SS, R, M, A, V, P

    methods:
    train, _estep, _mstep, _plot, _plot_latent, _evaluation
    """

    def __init__(self, data, label, latent_dim, num_mix):
        self.X = data
        self.Y = label
        self.N, self.D = data.shape

        self.L, self.K = latent_dim, num_mix

        self.S = np.empty((self.N, self.K, self.L))
        self.SS = np.empty((self.N, self.K, self.L, self.L))
        self.R = np.empty((self.N, self.K))
        
        self.M = np.random.uniform(low=np.min(data), high=np.max(data), size=(self.K, self.D))  # initialize with N(0,1)
        self.A = np.random.randn(self.K, self.D, self.L)  # initialize with N(0,1)
        self.V = np.eye(self.D)  # initialize with I
        self.P = np.ones((self.K)) / self.K  # initialize with 1/K

    def train(self, epsilon, epochs, alpha=0.3, plot=True, eval=False):
        """
        repeat _estep() and _mstep() to optimize parameters
        """
        for epoch in range(epochs):
            self._estep()
            print(f"estep {epoch+1} done")
            deltaM = self._mstep()
            print(f"mstep {epoch+1} done with deltaM = {deltaM:0.4f}")
            if deltaM < epsilon:
                print(f"finished with deltaM = {deltaM:0.4f}")
                print()
                break
        if plot==True:
            self._plot(True, True, alpha)
        print()
        self._plot_latent()
        if eval==True:
            print()
            self._evaluation()

    def _estep(self):
        """
        update R, S, SS
        """
        # R
        for t in range(self.N):
            for k in range(self.K):
                exp_term = - 0.5 * (self.X[t] - self.M[k]).transpose() @ inv(self.A[k] @ self.A[k].transpose() + self.V) @ (self.X[t] - self.M[k])
                exp_term = np.clip(exp_term, -1.0e2, 1.0e2)
                exp_term = np.nan_to_num(exp_term, nan = 0.0)
                self.R[t,k] = self.P[k] * np.exp(exp_term)
        
        self.R = self.R / np.sum(self.R, axis=1).reshape(-1,1)
        
        # S, SS
        phi = np.empty((self.K, self.L, self.D))
        for k in range(self.K):
            phi[k] = self.A[k].transpose() @ (self.A[k] @ self.A[k].transpose() + self.V)
        for t in range(self.N):
            for k in range(self.K):
                self.S[t,k] = phi[k] @ (self.X[t] - self.M[k])
                self.SS[t,k] = np.eye(self.L) - phi[k] @ self.A[k] + phi[k] @ (self.X[t] - self.M[k]).reshape(-1,1) @ (self.X[t] - self.M[k]).reshape(-1,1).transpose() @ phi[k].transpose()

    def _mstep(self):
        """
        update A, M, V, P

        return deltaM = Frobenius_Norm(M_new - M_old)
        """
        # A
        for k in range(self.K):
            a, b = 0, 0
            for t in range(self.N):
                a += self.R[t,k] * (self.X[t] - self.M[k]).reshape(-1,1) @ self.S[t,k].reshape(-1,1).transpose()
                b += self.R[t,k] * self.SS[t,k]
            self.A[k] = a @ inv(b)
        self.A = np.clip(self.A, 1e-10, 1e10)
        self.A = np.nan_to_num(self.A, nan = 0.0)

        # M
        sum = np.sum(self.R, axis=0)
        M_old = np.copy(self.M)
        for k in range(self.K):
            a = 0
            for t in range(self.N):
                a += self.R[t,k] * (self.X[t] - self.A[k] @ self.S[t,k]) / sum[k]
            self.M[k] = a
        deltaM = np.sum((self.M - M_old)**2)

        # V
        a = 0
        for t in range(self.N):
            for k in range(self.K):
                a += self.R[t,k] * ((self.X[t] - self.M[k]).reshape(-1,1) @ (self.X[t] - self.M[k]).reshape(-1,1).transpose() - self.A[k] @ self.S[t,k].reshape(-1,1) @ (self.X[t] - self.M[k]).reshape(-1,1).transpose()) / self.N
        self.V = np.diag(np.diag(a)).astype(np.float64)

        # P
        P = np.sum(self.R, axis=0) / self.N

        return deltaM

    def _plot(self, cluster, regression, alpha=0.3):
        """
        plot clusters and regression lines
        """
        print(f"num_mix: {self.K}")

        if cluster:
            # clustering
            cluster_pred = np.array([1 if self.R[t][0] > 0.5 else 0 if self.R[t][0] == 0.5 else 2 for t in range(self.N)]) # [N,]

            # plot data
            data0 = self.X[cluster_pred == 0]  # not-clustered points
            data1 = self.X[cluster_pred == 1]  # cluster1
            data2 = self.X[cluster_pred == 2]  # cluster2

            fig = plt.figure()
            ax = fig.add_subplot(1,1,1)
            ax.plot(data0[:,0], data0[:,1], "k.", alpha=alpha)
            ax.plot(data1[:,0], data1[:,1], "b.", alpha=alpha)
            ax.plot(data2[:,0], data2[:,1], "r.", alpha=alpha)
            plt.plot()

        if regression:
            s1 = self.S[:,0,:]
            s2 = self.S[:,1,:]
            p1 = (self.A[0] @ s1.transpose()).transpose() + self.M[0]
            p2 = (self.A[1] @ s2.transpose()).transpose() + self.M[1]
            # [D,L] @ [L,N] = [D,N] -> transpose -> [N,D]

            plt.plot(p1[:,0], p1[:,1], "b-")
            plt.plot(p2[:,0], p2[:,1], "r-")
        
        plt.show()

    def _plot_latent(self):
        """
        plot latent points of data
        
        if latent_dim = 1 or 2 or 3
        """
        print(f"latent_dim: {self.L}")

        # clustering
        cluster_pred = np.array([1 if self.R[t][0] > 0.5 else 0 if self.R[t][0] == 0.5 else 2 for t in range(self.N)]) # [N,]

        # plot data
        latent0 = self.S[cluster_pred == 0]  # not-clustered points
        latent1 = self.S[cluster_pred == 1]  # cluster1
        latent2 = self.S[cluster_pred == 2]  # cluster2

        fig = plt.figure()
        ax = fig.add_subplot(1,1,1)

        if self.L == 1:
            ax.plot(latent0[:,0], [0]*len(latent0), "k.")
            ax.plot(latent1[:,0], [0]*len(latent1), "b.")
            ax.plot(latent2[:,0], [0]*len(latent2), "r.")
        elif self.L == 2:
            ax.plot(latent0[:,0], latent0[:,1], "k.")
            ax.plot(latent1[:,0], latent1[:,1], "b.")
            ax.plot(latent2[:,0], latent2[:,1], "r.")

        plt.show()

    def _evaluation(self):
        p = (np.argmax(mfa.R) == mfa.Y).astype(float).mean()
        print(f"clustering accuracy: {max(p,1-p)}")



# main.py

## toy example

### data

In [None]:
# transformation
T1 = np.array([[3.0,0],[0,0.5]])
T2 = np.array([[0.5,0],[0,3.0]])

In [None]:
data_toy = np.concatenate([np.random.randn(100,2) @ T1 / 2, np.random.randn(100,2)  @ T2 / 2 + np.array([5,5])], axis=0)
cluster_toy = np.concatenate([np.zeros((100,)), np.ones((100,))], axis=0)

In [None]:
plt.plot(data_toy[:,0], data_toy[:,1], "k.")

### training, plotting

In [None]:
mfa_toy = MFA(data_toy, cluster_toy, latent_dim = 1, num_mix = 2)

In [None]:
mfa_toy.N, mfa_toy.D, mfa_toy.L, mfa_toy.K

In [None]:
mfa_toy.train(epsilon=1e-3, epochs=10)

## Leukemia analysis

### data

In [None]:
leukemia = Leukemia()

In [None]:
data, cluster = leukemia.get_dataset()

In [None]:
data.shape, cluster.shape

### t-test

In [None]:
def t_test(data0, data1):
    t = (data0.mean() - data1.mean()) / (data0.sum()**2/len(data0) + data1.sum()**2/len(data1))**0.5
    return t

In [None]:
data0 = data[cluster == 0]
data1 = data[cluster == 1]

t_stats = np.array([t_test(data0[:,i], data1[:,i]) for i in range(data.shape[1])])  # t-statistic values of features
feat_index =[np.where(t_stats==t)[0][0] for t in np.sort(t_stats)[-100:]]  # index of 100 features of largest t-stats

data = data[:,feat_index]  # only use selected 100 features

### training, plotting

- latent dimesion = 1, 2

In [None]:
mfa = MFA(data, cluster, latent_dim=1, num_mix=2)

In [None]:
mfa.N, mfa.D, mfa.L, mfa.K

In [None]:
mfa.train(epsilon=1e-1, epochs=10, alpha=0.5, plot=False, eval=True)

In [None]:
mfa = MFA(data, cluster, latent_dim=2, num_mix=2)

In [None]:
mfa.N, mfa.D, mfa.L, mfa.K

In [None]:
mfa.train(epsilon=1e-1, epochs=10, alpha=0.5, plot=False, eval=True)