In [1]:
import numpy as np

In [2]:
def mySweep(A, m):
    """
    Perform a SWEEP operation on A with the pivot element A[m,m].

    :param A: a square matrix.
    :param m: the pivot element is A[m, m].
    :returns a swept matrix. Original matrix is unchanged.
    """

    ## No need to change anything here
    B = np.copy(A)
    n = B.shape[0]
    for k in range(m):
        for i in range(n):
            for j in range(n):
                if i!=k and j!=k:
                    B[i,j] = B[i,j] - B[i,k]*B[k,j] / B[k,k]
        for i in range(n):
            if i!=k:
                 B[i,k] = B[i,k] / B[k,k]
        for j in range(n):
            if j!=k:
                B[k,j] = B[k,j] / B[k,k]
        B[k,k] = -1/B[k,k]

    return(B)

In [16]:
def factorAnalysis(n = 10, p = 5, d = 2, sigma = 1, nIter = 1000):

    """
    Perform factor analysis on simulated data.
    Simulate data X from the factor analysis model, e.g.
    X = Z_true * W.T + epsilon
    where W_true is p * d loading matrix (numpy array), Z_true is a n * d matrix
    (numpy array) of latent factors (assumed normal(0, I)), and epsilon is iid
    normal(0, sigma^2) noise. You can assume that W_true is normal(0, I)

    :param n: Sample size.
    :param p: Number of variables
    :param d: Number of latent factors
    :param sigma: Standard deviation of noise
    :param nIter: Number of iterations
    """

    ## FILL CODE HERE
    W_true = np.random.standard_normal(size=(p, d))
    Z_true = np.random.standard_normal(size=(d, n))
    epsilon = np.random.standard_normal(size=(p, n)) * sigma
    X = W_true.dot(Z_true) + epsilon

    sq = 1.
    XX = X.dot(X.T)
    w = np.random.standard_normal(size=(p, d)) * 0.1

    for it in range(nIter):
        A = np.vstack((
            np.hstack((w.T.dot(w) / sq + np.identity(d), w.T / sq)),
            np.hstack((w / sq, np.identity(p)))
        ))
        AS = mySweep(A, d)
        alpha = AS[:d, d:(d + p + 1)]
        D = -AS[:d, :d]
        Zh = alpha.dot(X)
        ZZ = Zh.dot(Zh.T) + D * n

        B = np.vstack((
            np.hstack((ZZ, Zh.dot(X.T))),
            np.hstack((X.dot(Zh.T), XX))
        ))
        BS = mySweep(B, d)
        w = BS[:d, d : (d + p + 1)].T
        sq = np.mean(np.diagonal(BS[d : (d + p + 1), d : (d + p + 1)])) / n
        
    ## Return the p * d np.array w, the estimate of the loading matrix
    return(w)


In [17]:
factorAnalysis()

array([[ 0.0600283 , -0.93915474],
       [ 1.44209374,  0.95141248],
       [-0.29164922, -0.4898029 ],
       [-0.65040592,  0.54617001],
       [-0.28447444,  0.1911135 ]])

In [20]:
def matrixCompletion(n = 200, p = 100, d = 3, sigma = 0.1, nIter = 100,
                     prob = 0.2, lam = 0.1):
   
    """
    Perform matrix completion on simulated data.
    Simulate data X from the factor analysis model, e.g. 
    X = Z_true * W.T + epsilon
    where W_true is p * d loading matrix (numpy array), Z_true is a n * d matrix 
    (numpy array) of latent factors (assumed normal(0, I)), and epsilon is iid 
    normal(0, sigma^2) noise. You can assume that W_true is normal(0, I)
    
    :param n: Sample size.
    :param p: Number of variables
    :param d: Number of latent factors
    :param sigma: Standard deviation of noise
    :param nIter: Number of iterations
    :param prob: Probability that an entry of the matrix X is not missing
    :param lam: Regularization parameter
    """

    ## FILL CODE HERE
    W_true = np.random.standard_normal(size=(p,d))
    Z_true = np.random.standard_normal(size=(d,n))
    epsilon = np.random.standard_normal(size=(p,n)) * sigma
    
    X = W_true.dot(Z_true) + epsilon
    R = np.random.uniform(size=(p,n)) < prob

    W = np.random.standard_normal(size=(p,d)) * 0.1
    Z = np.random.standard_normal(size=(d,n)) * 0.1
    
    for it in range(nIter):
        for i in range(n):
            WW = (W.T).dot(np.diag(R[:,i])).dot(W) + lam * np.identity(d)
            WX = (W.T).dot(np.diag(R[:,i])).dot(X[:,i])

            A = np.vstack((
                np.column_stack((WW, WX)),
                np.hstack((WX.T, 0)),
            ))
            AS = mySweep(A, d)
            Z[:,i] = AS[:d, d]

        for j in range(p):
            ZZ = Z.dot(np.diag(R[j,:])).dot(Z.T) + lam * np.identity(d)
            ZX = Z.dot(np.diag(R[j,:])).dot(X[j,:])
            
            B = np.vstack((
                np.column_stack((ZZ, ZX)),
                np.hstack((ZX.T, 0))
            ))
            BS = mySweep(B, d)
            W[j,:] = BS[:d, d]

        sd1 = np.sqrt(np.sum(R * np.square(X-W.dot(Z))) / np.sum(R))
        sd0 = np.sqrt(np.sum((1.0 - R) * np.square(X - W.dot(Z))) / np.sum(1.0 - R))
        
    ## Return estimates of Z and W (both numpy arrays)
    return Z, W

In [21]:
matrixCompletion()
print("END")

(1.482478171819805, 1.9041148376323171)
(0.85556718689560662, 1.2357489242460962)
(0.37857168408568348, 0.53104312509357898)
(0.13812118370968229, 0.23028491211213875)
(0.10808313321376473, 0.16766206053321639)
(0.10073237298522733, 0.14924710197802107)
(0.098067079261694459, 0.14208638162699008)
(0.09666664255189629, 0.13824998540117825)
(0.095729041555431049, 0.13564741294800733)
(0.095018466492189368, 0.1336381269227423)
(0.094446254566525348, 0.13198539787618771)
(0.09397037919556657, 0.13058197084599316)
(0.093566891123140455, 0.1293689704285943)
(0.093220274015829996, 0.12830875961708796)
(0.092919597484214786, 0.12737463681482672)
(0.092656734251972711, 0.12654638479348465)
(0.092425421655903922, 0.12580807803141211)
(0.092220713746949207, 0.12514685844960302)
(0.092038634371276809, 0.12455217390607295)
(0.0918759434560079, 0.12401526322858868)
(0.091729972061455264, 0.12352878661480879)
(0.091598501857243539, 0.12308654941484953)
(0.091479674735217828, 0.12268328998935592)
(0.0