# Factor analysis on chemical abundances spaces

Authors: Yangda Bei, Yuan-Sen Ting

The following notebook acts as an interactive companion to research project (ASC) completed during the Summer session 2022 and contains the code for the latent factor model created for the ASC. Although such a model exists within the ``FactorAnalyzer`` package created by ``scikit-learn``, a factor model was created from scratch following ``Algorithm 21.1`` from Barber's *Bayesian Reasoning and Machine Learning* as an exercise in data analysis using Python and introduction to machine learning models. The notebook provides better readability of the individual steps performed using the algorithm.

Before we begin, let's import all the necessary packages.

In [None]:
%matplotlib inline

# import packages
import numpy as np
import pandas as pd
import seaborn as sns
import csv

from sklearn.decomposition import FactorAnalysis

import matplotlib
import matplotlib.pyplot as plt
import matplotlib.cm as cm

from cycler import cycler

#------------------------------------------------------------------------------------------------------------
# define plot properties

from matplotlib import rcParams
from matplotlib import rc
from matplotlib.colors import hsv_to_rgb
from mpl_toolkits.axes_grid1 import make_axes_locatable

def rgb(r,g,b):
    return (float(r)/256.,float(g)/256.,float(b)/256.)

cb2 = [rgb(0,0,0), rgb(255,127,0), rgb(31,120,180), rgb(51,160,44), rgb(227,26,28), \
       rgb(166,206,227), rgb(253,191,111), rgb(178,223,138), rgb(251,154,153)]

rcParams['figure.figsize'] = (9,8)
rcParams['figure.dpi'] = 300

rcParams['lines.linewidth'] = 1

rcParams['axes.prop_cycle'] = cycler('color', cb2)
rcParams['axes.facecolor'] = 'white'
rcParams['axes.grid'] = False

rcParams['patch.facecolor'] = cb2[0]
rcParams['patch.edgecolor'] = 'white'

rcParams['font.size'] = 10
rcParams['font.weight'] = 300

***

## 1. Creating mock factors and dataset

First, we set our latent factor vectors, $\mathbf{v}\in\mathbb{R}^{H\times D}$. We wish to compare how well these latent factors can be recovered by using the ``FactorAnalyzer`` package and by using our own factor model. The data used will be drawn from ``latentfactors.csv``. It consists of $H=5$ latent factors with $D=25$ dimensions drawn from random for the sake of consistency however there is the option to generate random factors with different values for ``num_latent`` and ``num_dimension``.

Run the first code block for randomly generated latent factors.

Run the second code block to get latent factors from ``latentfactors.csv``.

In [None]:
#----------------------------------------------------
# run for randomly generated latent factors
#----------------------------------------------------

# number of latent factors
num_latent = 5

# number of observed dimensions
num_dimension = 25

# make latent vectors
v = np.random.normal(size=(num_latent, num_dimension))
print(v)
print(v.shape)

In [None]:
#-------------------------------------------------------
# run for latent factors used in report
#-------------------------------------------------------

num_latent = 5

num_dimension = 25

with open("latentfactors.csv") as latent_factors:
    v = np.loadtxt("latentfactors.csv", delimiter = ",")
print(v)
print(v.shape)

Next, we create a mock dataset with $D$ dimensions (features) and $N$ samples of random values drawn from a Gaussian. To do so, we first create our coefficients for our factor loading matrix $\mathbf{\alpha}\in\mathbb{R}^{N\times H}$, then multiply with the $\mathbf{v}$ factors. 

The mock dataset used will be drawn from ``mockdata.csv`` with $D=25$ and $N=10\,000$. Note that for samples of order >$10^4$, memory allocation will become an issue. Future improvements include improving scalability of SVD algorithm (``dask.array.linalg.svd`` is a promising solution).

Run the first code block for randomly generated samples.

Run the second code block to get dataset from ``mockdata.csv``.

In [None]:
#-------------------------------------------------------
# run for randomly generated samples
#-------------------------------------------------------

# number of mock samples
num_sample = 10**4

# make the factor loading matrix (N x H)
alpha = np.random.normal(size=(num_sample, num_latent))

# make mock samples (N x D)
V = np.dot(alpha,v)
print(V.shape)

In [None]:
#-------------------------------------------------------
# run for dataset used in analysis in report
#-------------------------------------------------------

num_sample = 10**4

with open("mockdata.csv") as mock_data:
    V = np.loadtxt("mockdata.csv", delimiter = ",")
print(V.shape)

Now that we have made our dataset with underlying factors, we can visualise the correlation between the $D$ features. 

In [None]:
# reads the dataset into a Pandas dataframe for convenience in
# creating heatmap
data = pd.DataFrame(V)
data.rename(columns={i:i+1 for i in range(num_dimension)}, inplace=True)

In [None]:
# creates correlation heatmap
plt.title("Correlation Heatmap", fontsize = 10)

plt.imshow(data.corr(), cmap = "cividis", interpolation = "none")
plt.xticks(range(len(data.columns)), data.columns)
plt.yticks(range(len(data.columns)), data.columns)

plt.gcf().set_size_inches(6,6)
plt.tick_params(labelsize=7)
cbar = plt.colorbar(shrink = 0.7)
cbar.ax.tick_params(labelsize=7)

# show correlation value of each square
labels = data.corr().values
for y in range(labels.shape[0]):
    for x in range(labels.shape[1]):
        plt.text(x,y, "{:.2f}".format(labels[y,x]), ha = "center", 
                 va="center", color = "white", fontsize = 3)

***

## 2. Factor analysis using ``FactorAnalyzer`` by ``scikit-learn``

The following code blocks generate an exploratory factor observation of the underlying latent factors of $\mathbf{V}$ using the ``FactorAnalyzer`` package after specifying the number of components wished to be extracted. 

First, we must add some noise in order to properly define the likelihood of our data. We map two dimensions of the mockdata after noise has been added.

In [None]:
# add noise 
V_noise = V+0.01*np.random.normal(size=V.shape)

# show two of the dimensions
plt.hexbin(V_noise[:,0], V_noise[:,1])

For *confirmatory* factor analysis, we already know the number of latent factors and are just *confirming* that they are there in the dataset. For *exploratory* factor analysis on the other hand, we need to determine the number of compenents that are hidden before we can call ``.fit_transform()``. Since we made up the number of latent factors, we will just determine the number of factors as a check.

To find out how many factors we can extract, we look to the eigenvalues of the correlation matrix. These eigenvalues tell us how much variance of the variables a factor explains. Therefore, an eigenvalue that is $>1$ means the factor that corresponds to it explains the variance of more than one variable.

In [None]:
# convert correlation DataFrame made in section 1 to an
# ndarray
correlation = data.corr()
correlation.to_numpy

# extract eigenvalues and sort them in descending order
evalues,_ = np.linalg.eig(correlation)
evalues[::-1].sort()

#----------------------------------------------------------

# scree plot
plt.plot(range(1, correlation.shape[1] + 1),evalues, 'o-')

plt.ylabel("Eigenvalue")
plt.xlabel("Factor")

plt.show()

We can see that the number of factors matches the value of ``num_latent``! Now, we can call ``.fit_transform()`` on the dataset to transform all the features using its mean and variance.

In [None]:
# perform latent decomposition using sklearn package
fa = FactorAnalysis(n_components=num_latent, noise_variance_init=0.01*np.ones(num_dimension))
v_transformed = fa.fit_transform(V_noise)

print(v_transformed)

The factors are then extracted using ``.components`` and reordered by how close they are to the true latent factors. The components are then compared to the true latent factors.

In [None]:
# extracts components
fa_components = fa.components_
print(fa_components)

In [None]:
# reorders the latent vectors
ind_best = np.zeros(num_latent)

for i in range(num_latent):
    ind_temp = -999
    diff_temp = np.inf
    
    for j in range(num_latent):
        diff = np.sum((v[i,:] - fa_components[j,:])**2)
        if diff < diff_temp:
            diff_temp = diff
            ind_temp = j
            
    ind_best[i] = ind_temp

print(ind_best)

In [None]:
# creates plot comparing true latent factors and package fit
fig, axs = plt.subplots(nrows=5, ncols=1, sharex=True, sharey=True, figsize=(10, 6))

fig.text(0.5, 0.06, r"Index of $v_i$", ha='center')
fig.text(0.06, 0.5, r"Value of $v_i$", va='center', rotation='vertical')

fig.suptitle("Comparison of true factors and package fit", y = 0.92)

# creates subplots
for i in range(num_latent):

    axs[i].plot(v[i,:], label = "Latent factor", ls = "--", )
    axs[i].plot(fa_components[int(ind_best[i]),:], label = r"$\mathtt{FactorAnalyzer}$ fit")
    
plt.legend()
plt.legend(loc='center left', bbox_to_anchor=(1, 2.9), prop={'size': 8})

plt.show()

Note that some factors may be deemed to be the best fit for more than one extracted component.

Lastly, the noise estimated by ``FactorAnalyzer`` is evaluated.

In [None]:
# estimated noise
print(fa.noise_variance_)

***

## 3. Factor analysis from scratch

Next, we build our factor model based on ``Algorithm 21.1`` from Barber's *Bayesian Reasoning and Machine Learning* (2020).

The factor analysis model generates an observation $\mathbf{v}$ of a given dataset $\mathbf{V} = \{\mathbf{v}^1,\dots,\mathbf{v}^N\}$ according to 

$$\mathbf{V} = \mathbf{Fh} + \mathbf{M} + \mathbf{\varepsilon}.$$

Here, we use slightly different notation from Section 2. $\mathbf{V}\in \mathbb{R}^{D\times N}$ is our output observation. $\mathbf{F}\in\mathbb{R}^{D\times H}$ is our factor matrix that contains the "weights" of each factor. $\mathbf{h}\in\mathbb{R}^{H\times N}$ is our factor loading matrix. $\mathbf{M}\in \mathbb{R}^{D\times N}$ is the constant bias which is the mean in this instance.

**Algorithm 21.1**

> 1. Initialise diagonal noise $\mathbf{\Psi}$.

We initialise our noise by setting $\mathbf{\Psi}=\text{diag}(\psi_1,\dots,\psi_D)$.

In [None]:
# 1. Initialise diagonal noise
noise = 0.01*np.random.normal(size=(num_dimension,num_dimension))
psi = np.diag(np.diag(np.cov(noise)))

> 2. Find the mean $\mathbf{\bar v}$ of the data $\mathbf{v}^1,\dots, \mathbf{v}^N$.

This is our constant bias that we use to "shift" our coordinate system to better fit the data. The mean matrix is set to be ``M`` in the code.

In [None]:
# 2. Find the mean of the data
result = []
for vector in V:
    result.append([sum(vector)/len(vector)])

M = np.array(result)
print(M)

> 3. Find the variance $\sigma_i^2$ for each component $i$ of the data $v_i^1,\dots, v_i^N$

In [None]:
#3. Find the variance
var = np.var(V, axis=0)
print(var)

> 4. Compute the centred matrix $\mathbf{X} = \mathbf{V} - \mathbf{M}$.

In [None]:
# 4. Compute the centred matrix
X = V - M
print(X)

> 5. **while** Likelihood not converged or termination criterion not reached **do**

In [None]:
# initial constants and variables
L_old = -np.infty
likelihoods = []
    
# error tolerance
tol = 1e-4

# max iterations before termination
max_iter = 100 

In [None]:
# 5. While loop runs if error is greater than tolerance or until max_iter
for i in range(max_iter):
    
    #----------------------------------------------------------------------------
    # 6. Form the scaled data matrix
    X_tilde = np.dot(np.linalg.inv(np.sqrt(psi)), X.T / (num_sample ** 0.5))
    #----------------------------------------------------------------------------
    # 7. Perform SVD for the scaled data matrix
    U,Lambda_tilde,WT = np.linalg.svd(X_tilde)
    Lambda_tilde = np.diag(Lambda_tilde)
    Lambda = Lambda_tilde ** 2
    #----------------------------------------------------------------------------
    # 8. Set U_H to the first H columns of U and set Lambda_H to contain the 
    #    first H diagonal entries of Lambda
    H = num_latent
    U_H = U[:,:H]
    Lambda_H = Lambda_tilde[:H,:H]
    #----------------------------------------------------------------------------
    # 9. Factor update
    #if i == 0:
        #F = np.copy(v.T)
    #else:
    F = np.sqrt(psi) @ U_H @ np.sqrt(Lambda_H - np.identity(H))
    #----------------------------------------------------------------------------
    # 10. Log likelihood
    a = 0
    for j in range(H):
        a += np.log(Lambda[j,j])
    b = 0
    for k in range(H+1,num_dimension):
        b += Lambda[k,k]
    
    L_new = (num_dimension/2)*(a + H + b + np.log(np.linalg.det(2*np.pi*psi)))
    likelihoods.append(L_new)
    #----------------------------------------------------------------------------
    # 11. Psi update
    psi = np.maximum(np.diag(var)-np.diag(F@F.T),1e-12)
    #----------------------------------------------------------------------------
    print("Iteration: %d" % (i+1))
    print("Loglikehood: ", L_new)
        
    if np.abs(L_new-L_old)<tol:
        break
    
    # log likelihood update
    L_old = L_new

> 12. **end while**

The following plot shows how well the model fits:

In [None]:
# creates plot comparing true latent factors and model fit
fig, axs = plt.subplots(nrows=5, ncols=1, sharex=True, sharey=True, figsize=(10, 6))

fig.text(0.5, 0.06, r"Index of $v_i$", ha='center')
fig.text(0.06, 0.5, r"Value of $v_i$", va='center', rotation='vertical')

fig.suptitle("Comparison of true factors and model fit", y = 0.92)

for i in range(num_latent):

    # creates subplots
    axs[i].plot(v[i,:], label = "Latent factor", ls = "--", )
    axs[i].plot(F.T[int(ind_best[i]),:], label = "Model fit")
    
plt.legend()
plt.legend(loc='center left', bbox_to_anchor=(1, 2.9), prop={'size': 8})

plt.show()

The following plot shows the comparison between ``FactorAnalyzer`` and the model:

In [None]:
# creates plot comparing package fit and model fit
fig, axs = plt.subplots(nrows=5, ncols=1, sharex=True, sharey=True, figsize=(10, 6))

fig.text(0.5, 0.06, r"Index of $v_i$", ha='center')
fig.text(0.06, 0.5, r"Value of $v_i$", va='center', rotation='vertical')

fig.suptitle("Comparison of package fit and model fit", y = 0.92)

# creates subplots
for i in range(num_latent):

    axs[i].plot(fa_components[int(ind_best[i]),:], label = r"$\mathtt{FactorAnalyzer}$ fit")
    axs[i].plot(F.T[i,:], label = "Model fit")
    
plt.legend()
plt.legend(loc='center left', bbox_to_anchor=(1, 2.9), prop={'size': 8})

plt.show()