In [1]:
import itertools
from scipy import stats
import numpy as np

def get_rss(y_np, fixedEff_np):
    """
    compute residual sum of squares of H0: marker has no effect on phenotype
    
    :param y: phenotype vector
    :param fixedEff: vector or matrix of fixed effects
    
    :return: residual sum of squares
    """
    
    # Compute the transpose of fixedEff
    fixedT = fixedEff_np.T   
    # Compute beta, the coefficients for the fixed effects
    beta = np.linalg.lstsq(fixedT @ fixedEff_np, fixedT @ y_np, rcond=None)[0]
    # Compute the difference between the actual y and the predicted values
    dif = y_np - fixedEff_np @ beta
    # Compute the residual sum of squares
    rss = dif.T @ dif
    
    return rss

def get_f_score(rss0, rss1, n, freedom_deg):
    """
    compute test statistics in batches
    
    :param rss0: residual sum of squares of H0: marker has no effect on phenotype
    :param rss1: residual sum of squares of H1: marker has effect on phenotype
    :param n: number of samples
    :param freedom_deg: degrees of freedom
    
    :return: F1 score
    """
    return (n-freedom_deg)*(rss0-rss1)/rss1

def get_p_value(f1, n: int, freedom_deg: int):
    """
    compute p-value using survival function of f distribution
    :param f1: F1 score
    :param n: number of samples
    :param freedom_deg: degrees of freedom
    :return: p-value
    """
    return stats.f.sf(f1, 1, n-freedom_deg)


def FeatureEncoder(np_genotype_rsid, np_genotype, int_dim):
    """
    Implementation of the two-element combinatorial encoding.

    :param np_genotype_rsid (ndarray): 1D array containing rsid of genotype data with `str` type
    :param np_genotype (ndarray): 2D array containing genotype data with `int8` type
    :param int_dim (int): The dimension of a variant (default: 3. AA, AB and BB)

    :return: list_interaction_rsid (ndarray): 1D array containing rsid of genotype data with `str` type
    :return: np_interaction (ndarray): 2D array containing genotype data with `int8` type
    
    """
    
    np_interaction = np_genotype
    list_interaction_rsid = np_genotype_rsid
    list_combs = list(itertools.combinations(range(int(np_interaction.shape[1]/int_dim)), 2))[0]

    

    np_this_interaction = np.zeros([np_genotype.shape[0], int_dim**2], dtype='int8')
    list_this_interaction_id = []
    for idx_x in range(int_dim):
            for idx_y in range(int_dim):
                np_this_interaction_term = (np_genotype[:, list_combs[0] * int_dim + idx_x] * np_genotype[:, list_combs[1] * int_dim + idx_y]).astype(np.int8)
                if not(np.array_equal(np_this_interaction_term, np_genotype[:, list_combs[0] * int_dim + idx_x])) and not(np.array_equal(np_this_interaction_term, np_genotype[:, list_combs[1] * int_dim + idx_y])):
                        np_this_interaction[:, idx_x * int_dim + idx_y] = np_this_interaction_term
                list_this_interaction_id.append(np_genotype_rsid[list_combs[0] * int_dim + idx_x] + "*" + np_genotype_rsid[list_combs[1] * int_dim + idx_y])
            
    
    np_this_interaction_id = np.array(list_this_interaction_id)
    np_interaction_append = np.empty((np_interaction.shape[0], np_interaction.shape[1] + np_this_interaction.shape[1]), dtype='int')
    np_interaction_append[:,:-(np_this_interaction.shape[1])] = np_interaction
    np_interaction_append[:,-(np_this_interaction.shape[1]):] = np_this_interaction
    np_interaction = np_interaction_append
    list_interaction_rsid.extend(list(np_this_interaction_id))
        
    return list_interaction_rsid, np_interaction 

In [4]:
import sys
import yaml
from pathlib import Path
import sqlite3
import pandas as pd
from select_parameter_utils import group_shuffle_split
import numpy as np



configure_file = "/exeh_4/yuping/Epistasis_Interaction/02_Explore_interaction_term/model_configure/IRF_RF_test.yaml"
try:
    with open(configure_file) as infile:
        load_configure = yaml.safe_load(infile)
except Exception:
        sys.stderr.write("Please specify valid yaml file.")
        sys.exit(1)
        
        
# generate group label
group_raw_df = pd.read_csv(Path(load_configure['dataset']['group_dir']))

weight_tissue = "Brain_Amygdala"     
phen_dir =  Path(load_configure['dataset']['data_dir']) / "2024-03-27T11:03:04.174838_phenotype_residualed.db"
data_dir =  Path(load_configure['dataset']['data_dir']) / weight_tissue / "2024-03-25T11:01:20.810692_predictor_feature.csv"
# Connect to the SQLite database
conn = sqlite3.connect(phen_dir)
# Read data from the "Phenotype" table into a DataFrame
query = "SELECT * FROM Phenotype"
df = pd.read_sql_query(query, conn, index_col=None)
except_column = ["MotherEducation", "FatherEducation"]
data = pd.read_csv(data_dir, sep="\t")
data = data.drop(except_column, axis=1)



X_train_raw_df, X_test_raw_df, y_train_tran_df, y_test_tran_df, train_index, test_index = group_shuffle_split(data.values, 
                                                                                                              df.values, 
                                                                                                              seed=load_configure['default_seed'],
                                                                                                              test_size=0.2,
                                                                                                              groups=group_raw_df.values.flatten())     

genotype_df = pd.DataFrame(X_train_raw_df, columns=data.columns.to_list())


# Create example dataframe for interaction
data = {'Pattern Value': ['Gender_ENSG00000001561.6', 'Ageyr_ENSG00000002933.3', "ENSG00000002933.3_ENSG00000001561.6"]}
df = pd.DataFrame(data)

P_value_list = []
F_score_list = []
Stability_term_list = []
for i in range(df.shape[0]):
    # analysis feature term
    select_column = df['Pattern Value'][i].split("_")
    # analysis genotype
    test_genotype = np.array(genotype_df[select_column])
    # get interaction term
    np_genotype_rsid, np_genotype = FeatureEncoder(select_column, test_genotype, int_dim=1)
    if "Gender" in select_column and "Ageyr" not in select_column:
        covariate = np.array(genotype_df["Ageyr"])
        # covaraite and analysis feature term
        reduce_genotype = np.hstack((covariate[:,np.newaxis], test_genotype))
        # covaraite, analysis feature term and interaction term
        full_genotype = np.hstack((covariate[:,np.newaxis], np_genotype))
    if "Ageyr" in select_column and "Gender" not in select_column:
        covariate = np.array(genotype_df["Gender"])
        # covaraite and analysis feature term
        reduce_genotype = np.hstack((covariate[:,np.newaxis], test_genotype))
        # covaraite, analysis feature term and interaction term
        full_genotype = np.hstack((covariate[:,np.newaxis], np_genotype))
    
    if "Gender" not in select_column and "Ageyr" not in select_column:
        covariate = np.array(genotype_df[["Gender","Ageyr"]])
        # covaraite and analysis feature term
        reduce_genotype = np.hstack((covariate,test_genotype))
        full_genotype = np.hstack((covariate, np_genotype))
    else:
        pass
    
    # number of sample
    n = len(y_train_tran_df)
    # degrees of freedom for full_genotype
    freedom_deg = full_genotype.shape[1]
    # get error sum of squares for reduce and full model
    get_rss_ho = get_rss(y_train_tran_df, reduce_genotype)
    get_rss_h1 = get_rss(y_train_tran_df, full_genotype)
    # get f-score and p-value
    f_score = get_f_score(get_rss_ho, get_rss_h1, n, freedom_deg)[0][0]
    p_value = get_p_value(f_score, n, freedom_deg)
    # append cacluated result to list
    P_value_list.append(p_value)
    F_score_list.append(f_score)
    Stability_term_list.append(np_genotype_rsid)
  


output = pd.DataFrame(Stability_term_list, columns=["Feature_1", "Feature_2", "Interaction_term"])
output['F_Score'] = F_score_list
output["P_value"] = P_value_list

                         Pattern Value
0             Gender_ENSG00000001561.6
1              Ageyr_ENSG00000002933.3
2  ENSG00000002933.3_ENSG00000001561.6
