In [None]:
# Copyright © 2022 LVCS. All Rights Reserved
import numpy as np
import pandas as pd
import scipy.stats as stats

num1 = str(1)
num2 = str(4)


In [None]:
# FDR False Discovery Rate
def correct_pvalues_for_multiple_testing(pvalues, correction_type="Benjamini-Hochberg"):
    from numpy import array, empty

    pvalues = array(pvalues)
    n = int(pvalues.shape[0])
    new_pvalues = empty(n)
    if correction_type == "Bonferroni":
        new_pvalues = n * pvalues
    elif correction_type == "Bonferroni-Holm":
        values = [(pvalue, i) for i, pvalue in enumerate(pvalues)]
        values.sort()
        for rank, vals in enumerate(values):
            pvalue, i = vals
            new_pvalues[i] = (n - rank) * pvalue
    elif correction_type == "Benjamini-Hochberg":
        values = [(pvalue, i) for i, pvalue in enumerate(pvalues)]
        values.sort()
        values.reverse()
        new_values = []
        for i, vals in enumerate(values):
            rank = n - i
            pvalue, index = vals
            new_values.append((n / rank) * pvalue)
        for i in range(0, int(n) - 1):
            if new_values[i] < new_values[i + 1]:
                new_values[i + 1] = new_values[i]
        for i, vals in enumerate(values):
            pvalue, index = vals
            new_pvalues[index] = new_values[i]
    return new_pvalues


In [None]:
# 数据导入
frame1 = pd.read_csv(num1 + ".csv")
frame2 = pd.read_csv(num2 + ".csv")
# frame2 = copy.deepcopy(frame1)
# 删除多余数据
del frame1["peak"]
del frame2["peak"]
frame1 = frame1.to_numpy()
frame2 = frame2.to_numpy()

ttest = []
ttest_pass = []


In [None]:
# t test 利用levene检验，根据p-value>0.05*10判断两总体是否具有方差齐性，并提取出ttest p-value小于0.05的样本
for i in range(len(frame1)):
    if (stats.levene(frame1[i], frame2[i])[1]) > 0.5:
        x = stats.ttest_ind(frame1[i], frame2[i], equal_var=True)[1]
        ttest.append(x)
        if x < 0.05:
            ttest_pass.append(i + 1)
    else:
        x = stats.ttest_ind(frame1[i], frame2[i], equal_var=False)[1]
        ttest.append(x)
        if x < 0.05:
            ttest_pass.append(i + 1)

# FDR修正p-value
correct_pvalues = correct_pvalues_for_multiple_testing(
    pvalues=ttest, correction_type="Benjamini-Hochberg")


In [None]:
frame_origin = pd.read_csv(num1 + ".csv")
frame_origin = frame_origin['peak'].to_numpy()
frame_output = []


In [None]:
# FDR校准数据
for i in range(len(correct_pvalues)):
    if (correct_pvalues[i]) < 0.05:
        frame_output.append(frame_origin[i])
frame_output = pd.DataFrame(frame_output)


In [None]:
# 原始数据
for i in range(len(ttest)):
    if (ttest[i]) < 0.05:
        frame_output.append(frame_origin[i])
frame_output = pd.DataFrame(frame_output)


In [None]:
# 差异表达结果保存
frame_output.to_csv(num1 + "-" + num2 + "-fine.csv", header=0, index=0)


In [None]:
# 读取ChIPseeker转化数据，并构建字典
dict_origin = pd.read_csv("peak.annotation.5000.csv")
d = {dict_origin["V4"][i]: str(dict_origin["SYMBOL"][i]) for i in range(len(dict_origin))}


In [None]:
# 目标peak转换为symbol
symbol_output = []
for i in range(len(frame_output)):
    symbol_output.append(d[frame_output.values[i][0]])


In [None]:
# kegg_pathway文件symbol读取
kegg = (pd.read_csv("kegg_symbol.csv"))["symbol"].values

# 数据取交集，提取相关基因
overlap = np.intersect1d(symbol_output, kegg)


In [None]:
# overlap结果输出
np.savetxt(num1 + "-" + num2 + "-intersect1d.csv",
           overlap, fmt="%s", delimiter=',')
