In [13]:
from __future__ import annotations

import pandas as pd
import numpy as np
from pingouin import ancova
from statsmodels.formula.api import ols
from statsmodels.stats.weightstats import ttest_ind
from pathlib import Path
from dataclasses import dataclass


@dataclass
class AncovaResult:
    """Per-sample outcome of the covariate check and the two one-sided t-tests."""

    # Identifier of the analysed sample.
    sample_id: str
    # Whether the covariate check judged pH to be significant.
    ph_covar: bool
    # P value of the one-sided test for treated > untreated (enhancer).
    p_enhancer: float
    # P value of the one-sided test for treated < untreated (suppressor).
    p_suppressor: float
    
class CovarProcessor:
    """
    Main class for processing activity data.

    The input table is loaded on construction and must provide the columns the
    methods below read: ``activity`` (dependent variable), ``treatment``
    (between-subjects factor, queried as 0 / 1) and ``ph`` (covariate).

    sample_id: identifier copied into the returned AncovaResult
    input_path: Input tsv file that contains the three columns listed above
    max_covar_p_value: maximal allowed P value for rejecting no significant correlation hypothesis (default 0.05)
    """

    def __init__(
        self,
        sample_id: str,
        input_path: Path,
        max_covar_p_value: float = 0.05,
    ):
        self.sample_id = sample_id
        self.input_path = input_path
        self.max_covar_p_value = max_covar_p_value

        self.open_tsv()

    def open_tsv(self):
        """Load the tab-separated input file into ``self.df_raw``."""
        self.df_raw = pd.read_table(self.input_path)

    @staticmethod
    def _covariate_is_significant(covar_stats, max_p_value) -> bool:
        """Return True when the "ph" row of an ANCOVA table has p-unc below *max_p_value*.

        The comparison yields a ``numpy.bool_``; it is converted to a builtin
        ``bool`` so that the value matches the annotation and is handled like a
        normal boolean by consumers such as sqlite3 (which would otherwise bind
        the numpy scalar as a BLOB).
        """
        ph_p_values = covar_stats.loc[covar_stats["Source"] == "ph", "p-unc"]
        return bool((ph_p_values < max_p_value).values[0])

    def ancova_analysis(self) -> bool:
        """Run ANCOVA and determine if pH is a significant covariate."""
        covar_stats = ancova(data=self.df_raw, dv="activity", between="treatment", covar="ph")
        return self._covariate_is_significant(covar_stats, self.max_covar_p_value)

    def t_test(self) -> AncovaResult:
        """Run both one-sided t-tests on the activity residuals and return the result.

        Residuals are taken against a linear fit ``activity ~ ph`` when pH is a
        significant covariate, otherwise against the overall mean activity.
        The residuals are stored in ``self.df_raw["residual"]``.

        Returns an AncovaResult holding the sample id, the covariate flag and
        the P values for "treated larger" (enhancer) and "treated smaller"
        (suppressor).
        """
        significant_ph = self.ancova_analysis()
        if significant_ph:
            # pH matters: remove its linear contribution before comparing groups.
            lm = ols("activity ~ ph", self.df_raw).fit()
            baseline = lm.predict(self.df_raw["ph"])
        else:
            # pH does not matter: centre on the overall mean instead.
            baseline = np.mean(self.df_raw["activity"])
        self.df_raw["residual"] = self.df_raw["activity"] - baseline

        # Select each group once; both tests compare the same two samples.
        treated = self.df_raw.query("treatment==1")["residual"]
        untreated = self.df_raw.query("treatment==0")["residual"]

        # Index 1 of the ttest_ind result is the P value.
        p_enhancer = ttest_ind(x1=treated, x2=untreated, alternative="larger")[1]
        p_suppressor = ttest_ind(x1=treated, x2=untreated, alternative="smaller")[1]

        return AncovaResult(self.sample_id, significant_ph, float(p_enhancer), float(p_suppressor))
    
        

In [14]:
# Load the first sample's table and run the covariate check plus both t-tests.
a1 = CovarProcessor(
    sample_id="smaple1",
    input_path="samples/smaple1.tsv",
    max_covar_p_value=0.05,
).t_test()

In [15]:
# Same analysis for the second sample.
a2 = CovarProcessor(
    sample_id="smaple2",
    input_path="samples/smaple2.tsv",
    max_covar_p_value=0.05,
).t_test()

In [16]:
# Show whether pH was flagged as a significant covariate for the first sample.
a1.ph_covar

False

In [66]:
def merger(*results):
    """Print one ``[sample_id, ph_covar, p_enhancer, p_suppressor]`` row per result.

    Accepts any number of result objects exposing those four attributes.
    The rows are printed as a single list; nothing is returned.
    """
    rows = []
    for result in results:
        row = [
            result.sample_id,
            result.ph_covar,
            result.p_enhancer,
            result.p_suppressor,
        ]
        rows.append(row)
    print(rows)

In [67]:
# Print the collected statistics of both samples side by side.
merger(a1,a2)

[['smaple1', False, 0.009925472776460904, 0.9900745272235391], ['smaple2', False, 0.029194715909952037, 0.970805284090048]]


In [80]:
import sqlite3
from sqlite3 import Error

In [81]:
def create_connection(db_file):
    """Open the SQLite database stored at ``db_file``.

    Best-effort: a failure to connect is printed rather than raised.

    :param db_file: database file
    :return: Connection object, or None when connecting raised an sqlite3 Error
    """
    try:
        return sqlite3.connect(db_file)
    except Error as exc:
        # Report the problem and signal failure to the caller with None.
        print(exc)
        return None

In [83]:
# Open (or create) the on-disk database; conn is None if connecting failed.
conn = create_connection("sqlite.db")

In [84]:
def create_db(conn):
    """Create the ``pep_stats`` results table if it does not exist yet.

    ``sample_id`` is the primary key, so a second row with the same id is a
    conflict (which is what ``INSERT OR REPLACE`` relies on). The previous
    spelling ``PRIMARYKEY`` was read by SQLite as part of the column's type
    name and declared no key at all. ``ph_covar`` is an INTEGER flag (0 / 1).

    Because of ``IF NOT EXISTS``, a table created earlier keeps its old schema.

    :param conn: open sqlite3 connection
    """
    sql_create = """CREATE TABLE IF NOT EXISTS pep_stats (sample_id TEXT PRIMARY KEY, ph_covar INTEGER, p_enhancer FLOAT, p_suppressor FLOAT) """
    # The connection context manager commits on success and rolls back on error.
    with conn:
        conn.execute(sql_create)

In [85]:
def insert_db(conn, ancova_results):
    """Insert one result into ``pep_stats``, replacing any row with the same sample_id.

    Values are converted to plain Python types before binding. A
    ``numpy.bool_`` flag would otherwise be bound by sqlite3 as a one-byte
    BLOB instead of a 0 / 1 number.

    :param conn: open sqlite3 connection
    :param ancova_results: object with sample_id, ph_covar, p_enhancer and
        p_suppressor attributes (e.g. an AncovaResult)
    """
    sql_insert = """ INSERT OR REPLACE INTO pep_stats (sample_id, ph_covar, p_enhancer, p_suppressor) VALUES (?,?,?,?)"""
    row = (
        ancova_results.sample_id,
        int(bool(ancova_results.ph_covar)),
        float(ancova_results.p_enhancer),
        float(ancova_results.p_suppressor),
    )
    # The connection context manager commits on success and rolls back on error.
    with conn:
        conn.execute(sql_insert, row)


In [86]:
# Make sure the pep_stats table exists before inserting anything.
create_db(conn)

In [91]:
# Store the first sample's result.
insert_db(conn, a1)

In [92]:
# Store the second sample's result.
insert_db(conn, a2)

In [93]:
def _decode_ph_covar(value):
    """Convert one stored ``ph_covar`` cell back into a Python bool.

    Handles the representations SQLite may hand back for the flag: a BLOB
    (how a numpy boolean gets stored), a number, or text such as ``'0'`` /
    ``'1'``. Missing values are reported as False.
    """
    if isinstance(value, (bytes, bytearray)):
        # True when any byte is non-zero, e.g. b'\x01'.
        return any(value)
    if isinstance(value, str):
        return value.strip("\x00 ").lower() not in ("", "0", "false")
    if pd.isna(value):
        return False
    return bool(value)


def get_db(conn):
    """Return the whole ``pep_stats`` table as a DataFrame.

    The ``ph_covar`` column is decoded into booleans from the stored value.
    The previous lambda tested the constant ``'\\x00'`` instead of the cell,
    so it returned False for every row.

    :param conn: open sqlite3 connection
    :return: DataFrame with one row per stored sample
    """
    sql_get = """SELECT * from pep_stats """
    with conn:
        df_query = pd.read_sql_query(sql_get, conn)
    df_query['ph_covar'] = df_query['ph_covar'].apply(_decode_ph_covar)
    return df_query

In [94]:
# Read the stored results back as a DataFrame.
get_db(conn)

Unnamed: 0,sample_id,ph_covar,p_enhancer,p_suppressor
0,smaple1,False,0.009925,0.990075
1,smaple2,False,0.029195,0.970805


In [95]:
# NOTE(review): sample_id "smaple1" is reused here with a different input file
# (smaple3.tsv) - presumably to exercise the replace-on-conflict insert; confirm
# this is intentional and not a typo.
a1 = CovarProcessor(
    sample_id="smaple1",
    input_path="samples/smaple3.tsv",
    max_covar_p_value=0.05,
).t_test()

In [96]:
# Insert under an id that is already stored; INSERT OR REPLACE overwrites that row.
insert_db(conn, a1)

In [97]:
# Read the table again to see the replaced row.
get_db(conn)

Unnamed: 0,sample_id,ph_covar,p_enhancer,p_suppressor
0,smaple2,False,0.029195,0.970805
1,smaple1,False,0.051893,0.948107
