In [17]:
import numpy as np
import pandas as pd

from sklearn.preprocessing import StandardScaler

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sas7bdat import SAS7BDAT
import os
import os.path as osp
import sys
import seaborn as sns
from sklearn.decomposition import PCA
from scipy.stats import multivariate_normal, chi2
from scipy.integrate import quad

# Define EM, Likelihood, TPR, etc
def lrt(full_ll, reduced_ll, dof):
    
    """
    Calculates likelihood ratio test statistic and p value for the two input loglikes
    
    Args:
    - full_ll: float, full model log-likelihood
    - reduced_ll: float, reduced model loglikelihood
    - dof: degrees of freedom (n_full_params - n_reduced_params)
    
    
    Returns:
    - lrt_stat: float, LRT test statistic
    - p_value: float, unadjusted p-value for test-statistic
    """
    
    lrt_stat = -2 * (reduced_ll - full_ll)
    p_value = chi2.sf(lrt_stat, df=dof)
    
    return lrt_stat, p_value

def calc_tpr_fpr(lrt_results, threshold=0.05, solutions = solutions, FDR_BH=False):
    """
    Takes in Likelihood Ratio Test result tuples, and provides FPR and TPR based on threshold and FDR control
    
    """
    FP = 0
    TP = 0
    TN = 0
    FN = 0
    
    #Benjamini-Hochberg Procedure for FDR control
    if FDR_BH:
        
        pvalues = np.array([t[2] for t in lrt_results])
        genes = np.array([t[0] for t in lrt_results])
        
        m = len(pvalues)
        
        sorted_indices = np.argsort(pvalues)
        sorted_pvalues = pvalues[sorted_indices]
        sorted_genes = genes[sorted_indices]
        
        for i in range(m):
            corrected_alpha = threshold * ((i + 1) / m)
            
            if sorted_genes[i] in solutions and sorted_pvalues[i] < corrected_alpha:
                TP += 1
            elif sorted_genes[i] in solutions and sorted_pvalues[i] >= corrected_alpha:
                FN += 1
            elif sorted_genes[i] not in solutions and sorted_pvalues[i] < corrected_alpha:
                FP += 1
            elif sorted_genes[i] not in solutions and sorted_pvalues[i] >= corrected_alpha:
                TN += 1
    
    #Just compare against set alpha
    else:
        for i, result in enumerate(lrt_results):

            if result[0] in solutions and result[2] < threshold:
                TP += 1
            elif result[0] in solutions and result[2] >= threshold:
                FN += 1
            elif result[0] not in solutions and result[2] < threshold:
                FP += 1
            elif result[0] not in solutions and result[2] >= threshold:
                TN += 1

    FPR = (FP)/(FP+TN)
    TPR = (TP)/(TP+FN)
    
    
    return TPR, FPR

def likelihood_function_fix(y, X_fixed, beta, sigma2, alpha=2):
    """
    Calculates the log-likelihood of a linear fixed effects model.
    
    Args:
    - y: numpy array of shape (n,) (response variable)
    - X_fixed: numpy array of shape (n,p), the design matrix for fixed effects
    - 
    - beta: nump array of shape (p,), the fixed-effect coefficients
    - sigma2: float, error variance
    
    Returns:
    - loglik: float, the log-likelihood of the model given the input parameters
    """
    #num obs
    n = y.shape[0]
    
    
    #fixed effects (matrix mult of coeffs across fixed effects)
    mu = X_fixed.dot(beta)
    
    
    #total linear predictor
    eta = mu 
    
    #Expected value of complete data log-likelihood
    
    loglik = multivariate_normal.logpdf(y - eta, mean=np.zeros(n), cov=(sigma2*np.eye(n) + 1e-8*np.eye(n))).sum()
    
    #regularization penalty
    reg_pen = alpha * np.sum(beta**2)
    
    loglik -= reg_pen
    
    
    return loglik


def EM_algorithm_fix(y, X_fixed, init_beta, init_sigma2, max_iter=200, tol=1e-7):
    """
    Runs the EM algorithm to estimate the parameters of a linear mixed effects model
    
    Args:
    - y: numpy array of shape (n,) (response variable)
    - X_fixed: numpy array of shape (n,p), the design matrix for fixed effects
    - 
    - init_beta: nump array of shape (p,), initial_estimate of the fixed-effect coefficients
    - init_sigma2: float, error variance initial estimate
    - max_iter: int, maximum iterations allowed before deciding the model won't converge
    - tol: float, tolerance for convergence (if diff in improvement is too small, consider it converged)
    
    Returns:
    - beta_hat: numpy array of shape (p,), estimate of fixed-effect coeffs
    - sigma2_hat: float, the estimate of the error variance
    """
    #init params
    beta = init_beta
    sigma2 = init_sigma2
    
    #Init loglike and convergence criteria
    loglik = -np.inf
    loglik_old = -np.inf
    conv = False
    iter_count = 0
    
    
    #Begin the E step -> M step loop
    while not(conv) and (iter_count < max_iter):
        
        # Big E Step
        eta = X_fixed.dot(beta)
        
        loglik = likelihood_function_fix(y=y, X_fixed=X_fixed, beta=beta, sigma2=sigma2)
        
        
        #And for the M step
        beta_new = np.linalg.inv(X_fixed.T.dot(X_fixed)).dot(X_fixed.T).dot(y)
        
        sigma2_new = np.mean((y - eta)**2)
        
        #update params
        beta = beta_new
        
        sigma2 = sigma2_new
        
        #Check convergence
        if np.abs(loglik - loglik_old) < tol:
            conv = True
            print(f"                          Model converged on iteration {iter_count}", end = "\r")
        
        loglik_old = loglik
        iter_count += 1
        
    
    return beta, sigma2

In [27]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

# Set random seed for reproducibility
np.random.seed(123)

# Define number of observations and fixed effect variables
n_obs = 1000
n_vars = 5

# Generate fixed effect variables
x = np.random.normal(size=(n_obs, n_vars))

# Generate random coefficients for fixed effect variables
beta = np.random.normal(size=n_vars)

# Randomly select some variables to have coefficient of 0
zero_vars = np.random.choice(n_vars, size=int(n_vars/2), replace=False)
beta[zero_vars] = 0

# Generate error term
error = np.random.normal(scale=0.5, size=n_obs)

# Generate response variable
y = np.dot(x, beta) + error

# Permute response variable
y_perm = np.random.permutation(y)

# Normalize fixed effect variables
scaler = StandardScaler()
x_norm = scaler.fit_transform(x)

# Combine into a dataframe
df = pd.DataFrame(np.hstack((y_perm.reshape(-1, 1), x_norm)), columns=['y'] + [f'x{i+1}' for i in range(n_vars)])

# Get solution indices of true model
solutions = np.where(beta != 0)

varlist = np.array(df.drop(['y'], axis=1).columns)
varlist
solutions = set(varlist[solutions])
solutions

{'x2', 'x4', 'x5'}

In [None]:

nperms = 10
first_pass = True
sigma2 = 0
roc_data = [[0],[0]]
lrt_results_lst = []

for perm in range(nperms):
    y_perm = np.random.permutation(y)
    
    df = pd.DataFrame(np.hstack((y_perm.reshape(-1, 1), x_norm)), columns=['y'] + [f'x{i+1}' for i in range(n_vars)])
    

    fixed_ = df.drop(['y'], axis = 1).to_numpy()
    y = df['y'].to_numpy()
    
    p = fixed_.shape[1]
    
    #Recycle any params? probably no
    if first_pass:
        sigma2 = np.var(y)
        beta = np.zeros(p)
        first_pass = False
        
    else:
        sigma2 = np.var(y)
        beta = np.zeros(p)

    beta, sigma2 = EM_algorithm_fix(y, fixed_, beta, sigma2, max_iter=200, tol=1e-7)
    
    print(f"Permuation {perm+1}:")
    
    full_ll = likelihood_function_fix(y, fixed_, beta, sigma2)
    
    lrt_results = []

    for j, var in enumerate(varlist):
        
        reduced_fixed = df.drop([var], axis =1).to_numpy()
        
        p_r = reduced_fixed.shape[1]
        
        reduced_beta = np.zeros(p_r)
        
        r_sigma2 = np.var(y)
        
        r_beta, r_sigma2 = EM_algorithm_fix(y, reduced_fixed, reduced_beta, sigma2, max_iter=200, tol=1e-7)
        
        print(f"Model (-var {var}):", end="\r")
        
        reduced_ll = likelihood_function_fix(y, reduced_fixed, reduced_beta, r_sigma2)
        
        lrt_result = lrt(full_ll, reduced_ll, dof=1)
        
        lrt_results.append((var, lrt_result[0], lrt_result[1]))
    
    
    TPR, FPR = calc_tpr_fpr(lrt_results,threshold=(0.05/(fixed_.shape[1]*nperms)), FDR_BH=False, solutions=solutions)
    roc_data[0].append(TPR)
    roc_data[1].append(FPR)
    lrt_results_lst.append(lrt_results)
    
print("\r\ndone")

fpr = np.array(roc_data[1])
tpr = np.array(roc_data[0])

sorted_indices = np.argsort(fpr)
fpr = fpr[sorted_indices]
tpr = tpr[sorted_indices]

plt.plot(fpr, tpr, label='ROC curve')
plt.plot([0,1],[0,1], linestyle='--', label='Random guess')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend()
plt.show()

Permuation 1:             Model converged on iteration 3
Model (-var x5):          Model converged on iteration 3

In [29]:
lrt_results_lst

[[('x1', 414058294097.2899, 0.0),
  ('x2', 414058294097.2899, 0.0),
  ('x3', 414058294097.2899, 0.0),
  ('x4', 414058294097.2899, 0.0),
  ('x5', 414058294097.2899, 0.0)],
 [('x1', 414058294095.55444, 0.0),
  ('x2', 414058294095.55444, 0.0),
  ('x3', 414058294095.55444, 0.0),
  ('x4', 414058294095.55444, 0.0),
  ('x5', 414058294095.55444, 0.0)],
 [('x1', 414058294099.8011, 0.0),
  ('x2', 414058294099.8011, 0.0),
  ('x3', 414058294099.8011, 0.0),
  ('x4', 414058294099.8011, 0.0),
  ('x5', 414058294099.8011, 0.0)],
 [('x1', 414058294105.6701, 0.0),
  ('x2', 414058294105.6701, 0.0),
  ('x3', 414058294105.6701, 0.0),
  ('x4', 414058294105.6701, 0.0),
  ('x5', 414058294105.6701, 0.0)],
 [('x1', 414058294099.3615, 0.0),
  ('x2', 414058294099.3615, 0.0),
  ('x3', 414058294099.3615, 0.0),
  ('x4', 414058294099.3615, 0.0),
  ('x5', 414058294099.3615, 0.0)],
 [('x1', 414058294094.9897, 0.0),
  ('x2', 414058294094.9897, 0.0),
  ('x3', 414058294094.9897, 0.0),
  ('x4', 414058294094.9897, 0.0),
  ('