In [1]:
import numpy as np
from scipy import linalg, stats
from sklearn import linear_model
from sklearn import decomposition
import time
import itertools

## Simulating Additive Effects and Higher Rank Matrices

In [2]:
class tools:
    def outer(mat, weights, skip = None):
        (m, k) = mat.shape
        ret = np.zeros(shape = (m, m))
        
        for i in range(k):
            for j in range(k):
                if i == j:
                    continue
                    
                if i == skip or j == skip:
                    continue
                    
                u = mat[:, i]
                v = mat[:, j]
                ret += weights[i, j] * np.outer(u, v)
                
        return ret

In [3]:
class dataset:
    def __init__(self, n, m, k, h2, sbeta, somega):
        self.n = n 
        self.m = m 
        self.k = k 
        self.h2 = h2
        self.sbeta = sbeta
        self.somega = somega

        self.simGeno()
        self.simEffects()
        self.simPheno()

    def simGeno(self):
        # genotypes are iid binom(2, p) where p normal
        # genotypes are scaled and centered
        
        geno = np.zeros([self.n, self.m])
        for i in range(self.m):
            p = np.random.beta(2, 2)
            snps = np.random.binomial(2, p, self.n)
            geno.T[i] = (snps - (2*p))/np.sqrt(2*p*(1-p))
        
        # interaction effects as khatri rao
        inter = linalg.khatri_rao(geno.T, geno.T).T
        self.geno, self.inter = geno, inter

    def simEffects(self):
        m = self.m
        k = self.k
        
        # simulated latent pathways
        pathways = np.random.normal(0, 1, m * k).reshape(m, -1) 
        
        # main effects as sum of pathways
        beta = np.sum(pathways, axis=1, keepdims=True)
                
        # currently generate weights from normal (0, 1)
        #weights = np.random.normal(0, 1, k * k).reshape(k, -1) 
        
        weights = np.ones(k*k).reshape(k, -1)
        weights = np.tril(weights, -1) + np.tril(weights, -1).T
        
        # simulate interaction matrix by summing over weighted outerproducts
        omega = tools.outer(pathways, weights)
                
        # adding gaussian noise to interaction effects
        eomega = np.random.normal(0, self.somega, m * m).reshape(m, -1) 
        eomega = np.tril(eomega) + np.tril(eomega, -1).T
        
        # adding gaussian noise to main effects
        ebeta = np.random.normal(0, self.sbeta, m).reshape(-1, 1)
        
        # instance variables
        self.pathways = pathways
        self.beta = beta + ebeta
        self.omegaMat = omega + eomega
        self.omega = self.omegaMat.reshape(-1, 1)
        self.omegaWeight = weights
    
    def simPheno(self):
        # model with main effects and interactions
        mean = self.inter @ self.omega + self.geno @ self.beta
        
        # add noise to simulate heritability
        var = np.var(mean) * (1 - self.h2)/self.h2
        sd = np.sqrt(var)
        noise = np.random.normal(0, sd, self.n).reshape(-1, 1)
        self.pheno = mean + noise

In [8]:
class decomp:
    def __init__(self):
        self.data = None
    def simData(self, n, m, k = 2, h2 = 0.9, sbeta = 0, somega = 0):
        self.data = dataset(n, m, k, h2, sbeta, somega)
        self.k = k
    
    def directDecomp(self):
        m = self.data.m
        k = self.k
        
        G = self.data.geno
        inter = self.data.inter
        Y = self.data.pheno#.reshape(-1, 1)
        
        thresh = 0.0001
                
        #weights = np.random.normal(0, 1, k * k).reshape(k, -1) 
        
        # initialize pathways and weights
        weights = np.ones(k*k).reshape(k, -1)
        weights = np.tril(weights, -1) + np.tril(weights, -1).T
        pathways = np.random.normal(0, 1, m * k).reshape(m, k)
        
        iterations = 0
        
        while(True):
            pathwaysPrev = np.copy(pathways)
            for i in range(k):
                # compute constants
                C1 = tools.outer(pathways, weights, skip=i).reshape(-1, 1)
                C1 = inter @ C1
                C2 = np.sum(G @ pathways @ np.diagflat(weights[i]), axis=1, keepdims=True) 
                C = C1 + C2
                A = np.sum(G @ pathways @ np.diagflat(weights[i]), axis=1, keepdims=True)
                
                # equations from the first order conditions
                u1 = (4 * G.T @ (A*A*G)) + (4 * G.T @ (A*G)) + (G.T @ G)
                u2 = (2 * (A*G).T @ Y) + (G.T @ Y) - (2 * (A*G).T @ C) - (G.T @ C)
                u = linalg.inv(u1) @ u2
                pathways[:, i] = u.reshape(-1,)
                
            # monitor convergence
            iterations += 1
            if iterations % 100 == 0: 
                print(iterations,
                      self.getLoss(pathways, weights),
                      linalg.norm(pathways -  pathwaysPrev))
            if linalg.norm(pathways -  pathwaysPrev) < thresh: break
            
                
                
        self.weights = weights
        self.pathways = pathways                
    
    def getLoss(self, pathways, weights):
        G = self.data.geno
        Y = self.data.pheno
        inter = self.data.inter
        
        interEffect = tools.outer(pathways, weights).reshape(-1, 1)
        interEffect = inter @ interEffect
        mainEffect = np.sum(G @ pathways, axis=1, keepdims=True)
        loss = linalg.norm(Y - mainEffect - interEffect)/linalg.norm(Y)
        return(loss)
    
    def evalAcc(self):
        maxCorr = 0
        
        groundTruth = self.data.pathways.reshape(-1,)
        for perm in itertools.permutations(range(self.k)):
            prediction = self.pathways[:,perm].reshape(-1,)
            r = stats.pearsonr(prediction, groundTruth)[0]
            rsquared = r ** 2
            if rsquared > maxCorr: maxCorr = rsquared
        
        return(maxCorr)
            
                

In [None]:
test = decomp()
test.simData(100000, 10, k=3, h2=0.999)
test.directDecomp()

test.evalAcc()



100 0.028852380644061325 0.00869082638427985


## Scratch work

In [9]:
n =2
m = 5
k = 3

pathways = np.random.normal(0, 1, m * k).reshape(m, -1) 
beta = np.sum(pathways, axis=1, keepdims=True)

omega = np.zeros(shape = (m, m))

weights = np.random.normal(0, 1, k * k).reshape(k, -1) 
weights = np.tril(weights, -1) + np.tril(weights, -1).T

eomega = np.random.normal(0, 1, m * m).reshape(m, -1) 
eomega = np.tril(eomega) + np.tril(eomega, -1).T

ebeta = np.random.normal(0, 1, m).reshape(-1, 1)


for i in range(k):
    for j in range(k):
        if i == j:
            continue
        
        u = pathways[:, i]
        v = pathways[:, j]
        omega += weights[i, j] * np.outer(u, v)

#print(omega)

#print(tools.outer(pathways, weights, skip=0))
#print(tools.outer(pathways, weights))

        
geno = np.random.normal(0, 1, n*m).reshape(n, -1)    
inter = linalg.khatri_rao(geno.T, geno.T).T

c = tools.outer(pathways, weights, skip = 0).reshape(-1, 1)
        
(inter @ c)

pathways[:,i] = np.zeros(shape = 5)
pathways

array([[-0.71441183,  1.31916775,  0.        ],
       [-0.90898988, -1.02426831,  0.        ],
       [ 0.08962613, -0.15611624,  0.        ],
       [ 0.24102687,  1.26765401,  0.        ],
       [-0.853715  ,  0.97542689,  0.        ]])

## Rank two factorization without additive effects

In [87]:
class dataset:
    def __init__(self, n, m, k = 1, h2 = 0.9, noise = 0):
        self.n = n 
        self.m = m 
        self.k = k 
        self.h2 = h2
        self.noise = noise

        self.simGeno()
        self.simEffects()
        self.simPheno()

    def simGeno(self):
        geno = np.zeros([self.n, self.m])
        for i in range(self.m):
            p = np.random.beta(2, 2)
            snps = np.random.binomial(2, p, self.n)
            geno.T[i] = (snps - (2*p))/np.sqrt(2*p*(1-p))

        inter = linalg.khatri_rao(geno.T, geno.T).T
        self.geno, self.inter = geno, inter

    def simEffects(self):
        u = np.random.normal(0, 1, self.m * self.k).reshape(self.m, -1) 
        v = np.random.normal(0, 1, self.m * self.k).reshape(self.m, -1) 
        noise = np.random.normal(0, self.noise, self.m ** 2).reshape(self.m, -1) 
        noise = np.tril(noise) + np.tril(noise, -1).T
        omega = noise + u @ v.T + v @ u.T 

        self.uv = np.column_stack((u, v)) 
        self.vu = np.column_stack((v, u)) 

        self.u = u 
        self.v = v 
        self.omegaMat = omega
        self.omega = omega.reshape(1, -1)[0]
        
    
    def simPheno(self):
        mean = np.matmul(self.inter, self.omega)
        var = np.var(mean) * (1 - self.h2)/self.h2
        sd = np.sqrt(var)
        noise = np.random.normal(0, sd, self.n)
        self.pheno = mean + noise
        
class decomp:
    def __init__(self):
        self.data = None
    def simData(self, n, m, k = 1, h2 = 0.9, noise = 0):
        self.data = dataset(n, m, k = k, h2 = h2, noise = noise)
    def fitMarginal(self):
        lm = linear_model.LinearRegression(fit_intercept=True)
        omegaHat = list()
        for i in range(self.data.m):
            for j in range(self.data.m):
                interIndex = i * self.data.m + j
                maini = self.data.geno.T[i]
                mainj = self.data.geno.T[j]
                interij = self.data.inter.T[interIndex]
                
                regress = np.vstack((interij, maini, mainj)).T
                
                lm.fit(regress, self.data.pheno)
                omegaHat.append(lm.coef_[0])

        omegaHat = np.array(omegaHat)
        omegaHatMat = omegaHat.reshape(self.data.m, -1)

        self.omegaHat, self.omegaHatMat = omegaHat, omegaHatMat

    def fitRidge(self):
        lm = linear_model.Ridge()
        lm.fit(self.data.inter, self.data.pheno)
        omegaHat = lm.coef_
        omegaHatMat = omegaHat.reshape(self.data.m, -1)

        self.omegaHat, self.omegaHatMat = omegaHat, omegaHatMat



    def fitSVD(self):
        rank = self.data.k * 2
        lm = linear_model.LinearRegression(fit_intercept=True)

        A, singular, B = linalg.svd(self.omegaHatMat)

        A = A[:, :rank]
        singular = singular[:rank]
        B = B[:, :rank]


        lm.fit(self.data.uv, A)
        transform = lm.coef_.T
        
        self.u = (A @ linalg.inv(transform))[:,:self.data.k]
        self.v = (A @ linalg.inv(transform))[:,self.data.k:]

    def symmetricDecomp(self):
        m = self.data.m
        
        thresh = 0.0001
                
        u = np.random.rand(m, 1)
        v = np.random.rand(m, 1)
        
        uprev = np.copy(u)
        vprev = np.copy(v)
        
        i = 0
        
        while(True):
            if i % 2 == 0:
                u = linalg.inv(v @ v.T + (v.T @ v)*np.eye(m)) @ self.omegaHatMat @ v
            else:
                v = linalg.inv(u @ u.T + (u.T @ u)*np.eye(m)) @ self.omegaHatMat @ u
            
            udiff = linalg.norm(u - uprev)
            vdiff = linalg.norm(v - vprev)
            
            if udiff < thresh and vdiff < thresh:
                break
            else:
                uprev = np.copy(u)
                vprev = np.copy(v)
            i += 1

        self.u = u
        self.v = v   
        
    def directDecomp(self):
        m = self.data.m
        
        G = self.data.geno
        Y = self.data.pheno.reshape(-1, 1)
        
        thresh = 0.0001
                
        u = np.random.rand(m, 1)
        v = np.random.rand(m, 1)
        
        uprev = np.copy(u)
        vprev = np.copy(v)
        
        i = 0
        
        while(True):
            if i % 2 == 0:
                A = (G @ v)
                u = 1/2 * linalg.inv((G.T) @ (A * A * G)) @ G.T @ (A * Y)
            else:
                A = (G @ u)
                v = 1/2 * linalg.inv((G.T) @ (A * A * G)) @ G.T @ (A * Y)
            
            udiff = linalg.norm(u - uprev)
            vdiff = linalg.norm(v - vprev)
            
            if udiff < thresh and vdiff < thresh:
                break
            else:
                uprev = np.copy(u)
                vprev = np.copy(v)
            i += 1
            om = (u @ v.T + v @ u.T).reshape(-1, 1)
            loss = linalg.norm(Y - self.data.inter @ om)
            print(i, loss)

        self.u = u
        self.v = v   

    def evalNorm(self):
        
        trueU = self.data.u.reshape(1, -1)[0]
        trueV = self.data.v.reshape(1, -1)[0]
        
        estU = self.u.reshape(1, -1)[0]
        estV = self.v.reshape(1, -1)[0]
        
        original = ("original", 
                    stats.pearsonr(estU, trueU)[0] ** 2,
                    stats.pearsonr(estV, trueV)[0] ** 2)
        switched = ("switched", 
                    stats.pearsonr(estV, trueU)[0] ** 2, 
                    stats.pearsonr(estU, trueV)[0] ** 2)
        
        
        if original[1] + original[2] > switched[1] + switched[2]:
            return original
        else: 
            return switched


In [93]:
test = decomp()
test.simData(100000, 10)
test.directDecomp()
test.evalNorm()

1 6727.048615743148
2 4826.95509297564
3 3707.44605657285
4 2974.4999576065143
5 2330.520507013332
6 2126.3751703435196
7 2095.119766302487
8 2092.102691820174
9 2091.8328371521593
10 2091.808915209902
11 2091.806789220403
12 2091.806599234845


('original', 0.9999924332681807, 0.9999925693263748)