In [2]:
import pandas as pd
import numpy as np
import subprocess
import os

In [3]:
# Root directory of the omics analysis code. Set the OMIC_WORKDIR environment
# variable to point the notebook at your own checkout without editing this
# cell; when it is unset, the original author's local path is used and has to
# be adapted to your machine.
# os.path.join(..., '') guarantees a trailing path separator, which the plain
# string concatenations that build output paths from this value depend on.
workdir_omic = os.path.join(
    os.environ.get(
        'OMIC_WORKDIR',
        '/Users/dongjiacheng/Desktop/coder/mtd/code/omic_analysis/',
    ),
    '',
)

#### 调用R脚本，对输入的gene list进行富集分析，并将结果保存到tsv表中

In [4]:
def run_enrich(input_path, output_path, species, pvalue, enrich_type, raise_on_error=False):
    """Run the GO or KEGG enrichment R script as a subprocess.

    The script is looked up under ``<workdir_omic>/enrichment_analysis/`` and
    started through ``Rscript``.

    Args:
        input_path (str): Path handed to the script as ``--input``.
        output_path (str): Path handed to the script as ``--output``.
        species (str): Species name handed to the script as ``--species``.
        pvalue (float): P-value threshold handed to the script as ``--pvalue``.
        enrich_type (str): Analysis type, either "GO" or "KEGG".
        raise_on_error (bool): When True, a non-zero exit status of the R
            script is re-raised as ``subprocess.CalledProcessError`` so the
            caller cannot mistake an error message for regular output.
            Defaults to False, which keeps the best-effort behaviour of
            returning the captured stderr text.

    Returns:
        str: Captured stdout of the R script on success. If the script exits
        with a non-zero status and ``raise_on_error`` is False, the captured
        stderr is returned instead.

    Raises:
        ValueError: If ``enrich_type`` is neither "GO" nor "KEGG".
        subprocess.CalledProcessError: If the script fails and
            ``raise_on_error`` is True.
    """
    # Map the analysis type to the R script that implements it.
    scripts = {'GO': 'go_enrich.R', 'KEGG': 'kegg_enrich.R'}
    try:
        script_name = scripts[enrich_type]
    except (KeyError, TypeError):
        raise ValueError("enrich_type must be 'GO' or 'KEGG'") from None

    script_path = os.path.join(workdir_omic, 'enrichment_analysis', script_name)

    # Argument list (no shell involved), so values containing spaces such as
    # the species name are passed through unchanged.
    cmd = [
        'Rscript', script_path,
        '--input', input_path,
        '--output', output_path,
        '--species', species,
        '--pvalue', str(pvalue),
    ]

    # Run the script and capture both output streams as text.
    try:
        result = subprocess.run(
            cmd,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except subprocess.CalledProcessError as err:
        if raise_on_error:
            raise
        return err.stderr
    return result.stdout

# Example calls
# output_go = run_enrich("input_file/gene_list.txt", "output_file/go.tsv", "Myceliophthora thermophila", 0.05, "GO")
output_kegg = run_enrich(
    input_path="input_file/gene_list3.txt",
    output_path="output_file/kegg.tsv",
    species="Myceliophthora thermophila",
    pvalue=0.05,
    enrich_type="KEGG",
)

#### 基于Python的Plotly包，对富集分析结果进行可视化

In [5]:
# Load the enrichment result tables (tab-separated files, paths relative to
# the notebook's working directory).
# NOTE(review): the run_enrich example above writes 'output_file/kegg.tsv',
# while the active line below reads 'output_file/kegg2.tsv' — confirm which
# file is intended.
# df_kegg = pd.read_csv('output_file/kegg.tsv', sep='\t')
df_kegg = pd.read_csv('output_file/kegg2.tsv', sep='\t')
df_go = pd.read_csv('output_file/go.tsv', sep='\t')

In [6]:
import plotly.io as pio
import plotly.express as px
import plotly.graph_objects as go

def plot_kegg_chart(df_kegg, width=1280, height=720, p_adjust=0.05, font_size=15, chart_num=30, chart_size=30, color='rdbu_r', pic_type='bubble', funciton_type='All', output_path=None):
    """Draw a bubble or bar chart from a KEGG enrichment result table.

    Rows whose adjusted p-value is below ``p_adjust`` are kept, sorted by
    ``Count`` in descending order and truncated to ``chart_num`` rows. The
    figure is written to a PNG file and returned.

    Args:
        df_kegg (pd.DataFrame): KEGG enrichment result with at least the
            columns 'ID', 'Description', 'GeneRatio', 'p.adjust' and 'Count'.
            'GeneRatio' values are parsed with ``fractions.Fraction``, so
            strings such as '5/100' as well as plain numbers are accepted.
            The input frame is not modified.
        width (int): Figure width.
        height (int): Figure height.
        p_adjust (float): Upper bound (exclusive) on the adjusted p-value.
        font_size (int): Font size of the figure.
        chart_num (int): Maximum number of pathways shown.
        chart_size (int): Maximum bubble size (``size_max``); only used when
            ``pic_type`` is 'bubble'.
        color (str): Continuous colour scale used for the adjusted p-value.
        pic_type (str): Chart type, 'bubble' or 'bar'.
        funciton_type (str): Currently unused; accepted so existing callers
            that pass it keep working.
        output_path (str | None): Destination of the PNG export. ``None``
            (default) keeps the original location, "kegg.png" in the current
            working directory.

    Returns:
        The Plotly figure created by ``px.scatter`` or ``px.bar``.

    Raises:
        ValueError: If ``pic_type`` is neither 'bubble' nor 'bar'.
    """
    from fractions import Fraction

    # Fail early with a clear message instead of hitting an unbound `fig`.
    if pic_type not in ('bubble', 'bar'):
        raise ValueError("pic_type must be 'bubble' or 'bar'")

    # Pre-processing: copy AFTER selecting the columns so the assignments
    # below operate on an independent frame (no SettingWithCopyWarning and no
    # write-through to the caller's data).
    table = df_kegg[['ID', 'Description', 'GeneRatio', 'p.adjust', 'Count']].copy()
    table.columns = ["ID", "Pathway", "GeneRatio", "P.adjust", 'Count']
    # Convert the ratio to a float without eval(): the values come from a
    # file, and Fraction only parses numbers, it never executes code.
    table["GeneRatio"] = table["GeneRatio"].map(
        lambda ratio: round(float(Fraction(ratio)), 3)
    )
    table['P.adjust'] = table['P.adjust'].round(6)  # limit decimals of P.adjust

    # Filtering: significance threshold, then the chart_num largest counts.
    table = table[table['P.adjust'] < p_adjust]
    table = table.sort_values(by='Count', ascending=False).iloc[:chart_num]

    # Colour-bar ticks: with at most two distinct values show each of them
    # (an empty table yields no ticks rather than NaN ticks); otherwise show
    # minimum, median and maximum.
    p_values = table['P.adjust']
    distinct_p = p_values.unique()
    if len(distinct_p) <= 2:
        tickvals = [float(value) for value in distinct_p]
        ticktext = [f'{value:.3f}' for value in tickvals]
    else:
        min_p = float(p_values.min())
        median_p = float(p_values.median())
        max_p = float(p_values.max())
        tickvals = [min_p, median_p, max_p]
        ticktext = [
            f'Min: {min_p:.3f}',
            f'Median: {median_p:.3f}',
            f'Max: {max_p:.3f}'
        ]

    # Layout shared by both chart types.
    layout_args = {
        'title': "KEGG Enrichment Analysis",
        'yaxis_title': "Pathway",
        'yaxis': dict(autorange="reversed"),  # first row of the table on top
        'font': dict(family="Arial", size=font_size),
        'template': "plotly_white",
        'width': width,
        'height': height
    }

    # Custom colour axis.
    color_axis_args = {
        'colorbar_title': "P.adjust",
        'colorbar_tickformat': ".3f",
        'colorbar': dict(
            tickvals=tickvals,
            ticktext=ticktext
        )
    }

    if pic_type == 'bubble':
        fig = px.scatter(
            table,
            x='GeneRatio',
            y='Pathway',
            size='Count',
            color='P.adjust',
            color_continuous_scale=color,
            opacity=0.85,
            hover_data=["ID", 'P.adjust', 'Count'],
            size_max=chart_size
        )
    else:  # 'bar' (validated above)
        fig = px.bar(
            table,
            x='Count',
            y='Pathway',
            color='P.adjust',
            color_continuous_scale=color,
            opacity=0.85,
            hover_data=['ID', 'P.adjust', 'Count']
        )

    fig.update_layout(**layout_args)
    fig.update_coloraxes(**color_axis_args)

    # Export as PNG at 4x scale.
    fig.write_image("kegg.png" if output_path is None else output_path, scale=4)

    # Alternative return formats kept for reference:
    #   1. JSON string:  return pio.to_json(fig)
    #   2. HTML <div>:   return pio.to_html(fig, full_html=False, include_plotlyjs=False)
    #   3. HTML file:    fig.write_html("kegg.html"); return "kegg.html"

    # Returning the figure itself is convenient for interactive testing.
    return fig


# Example: bar chart limited to at most 10 pathways. The call is the last
# expression of the cell, so the returned figure is displayed by the notebook.
plot_kegg_chart(
    df_kegg,
    pic_type='bar',
    chart_num=10,
    chart_size=30,
    font_size=15,
    p_adjust=0.05,
    color='rdbu_r',
    height=720,
    width=1280,
)

In [10]:
def plot_go_chart(df_go, width=1280, height=720, p_value=0.05, font_size=15, chart_num=30, chart_size=30, pic_type='bubble', color='rdbu_r', funciton_type='All', output_path=None):
    """Draw a bubble or bar chart from a GO enrichment result table.

    Rows are sorted by ``Count`` in descending order, restricted to adjusted
    p-values below ``p_value`` and, if requested, to one GO class; only then
    is the result truncated to ``chart_num`` rows, so a class selection still
    yields up to ``chart_num`` terms. The figure is written to a PNG file and
    returned.

    Args:
        df_go (pd.DataFrame): GO enrichment result with at least the columns
            'category', 'ID', 'Description', 'Count', 'GeneRatio' and
            'p.adjust'. 'GeneRatio' values are parsed with
            ``fractions.Fraction``, so strings such as '5/100' as well as
            plain numbers are accepted. The input frame is not modified.
        width (int): Figure width.
        height (int): Figure height.
        p_value (float): Upper bound (exclusive) on the adjusted p-value.
        font_size (int): Font size of the figure.
        chart_num (int): Maximum number of GO terms shown.
        chart_size (int): Maximum bubble size (``size_max``); only used when
            ``pic_type`` is 'bubble'.
        pic_type (str): Chart type, 'bubble' or 'bar'.
        color (str): Continuous colour scale used for the adjusted p-value.
            Defaults to 'rdbu_r'.
        funciton_type (str): GO class to show: 'BP', 'CC' or 'MF'. Any other
            value (e.g. 'All') disables the class filter.
        output_path (str | None): Destination of the PNG export. ``None``
            (default) keeps the original location,
            ``workdir_omic + "enrichment_analysis/output_file/go.png"``.

    Returns:
        The Plotly figure created by ``px.scatter`` or ``px.bar``.

    Raises:
        ValueError: If ``pic_type`` is neither 'bubble' nor 'bar'.
    """
    from fractions import Fraction

    # Fail early with a clear message instead of hitting an unbound `fig`.
    if pic_type not in ("bubble", "bar"):
        raise ValueError("pic_type must be 'bubble' or 'bar'")

    # Copy AFTER selecting the columns so the assignments below operate on an
    # independent frame (no SettingWithCopyWarning, caller's data untouched).
    table = df_go[["category", "ID", "Description", "Count", 'GeneRatio', "p.adjust"]].copy()
    table.columns = ["Class", "ID", "Description", "Count", "GeneRatio", "P.adjust"]

    # Convert the ratio to a float without eval(): the values come from a
    # file, and Fraction only parses numbers, it never executes code.
    table["GeneRatio"] = table["GeneRatio"].map(
        lambda ratio: round(float(Fraction(ratio)), 3)
    )
    table['P.adjust'] = table['P.adjust'].round(6)

    table = table.sort_values(by='Count', ascending=False)
    table = table[table["P.adjust"] < p_value]

    # Apply the GO class filter BEFORE truncating; truncating first could
    # drop every term of the requested class from the chart.
    if funciton_type in ["BP", "CC", "MF"]:
        table = table[table["Class"].str.contains(funciton_type, regex=False, na=False)]
    table = table.iloc[:chart_num]

    # Layout shared by both chart types.
    layout_args = {
        'title': "GO Enrichment Analysis",
        'yaxis_title': "Description",
        'yaxis': dict(autorange="reversed"),  # first row of the table on top
        # 'yaxis': dict(autorange="reversed", showgrid=False),  # reversed y axis without grid lines
        # 'xaxis': dict(showgrid=False),  # no grid lines
        'font': dict(family="Arial", size=font_size),
        'template': "plotly_white",  # theme
        'width': width,
        'height': height
    }

    # Colour axis settings.
    color_axis_args = {
        'colorbar_title': "P.adjust",
        'colorbar_tickformat': ".3f",
        'colorbar': dict(dtick=0.005)
    }

    if pic_type == "bubble":
        fig = px.scatter(
            table,
            x="GeneRatio",
            y="Description",
            size="Count",
            color="P.adjust",
            color_continuous_scale=color,
            opacity=0.85,
            hover_name="Class",
            hover_data=["ID", "Description", "Count", "GeneRatio", "P.adjust"],
            size_max=chart_size,
        )
    else:  # "bar" (validated above)
        fig = px.bar(
            table,
            x='Count',
            y='Description',
            color='P.adjust',
            color_continuous_scale=color,
            opacity=0.85,
            hover_data=["Class", "ID", "GeneRatio", "P.adjust"],
        )

    fig.update_layout(**layout_args)
    fig.update_coloraxes(**color_axis_args)

    # Export as PNG at 4x scale.
    if output_path is None:
        output_path = workdir_omic + "enrichment_analysis/output_file/go.png"
    fig.write_image(output_path, scale=4)

    # Alternative return formats kept for reference:
    #   1. JSON string:  return pio.to_json(fig)
    #   2. HTML <div>:   return pio.to_html(fig, full_html=False, include_plotlyjs=False)
    #   3. HTML file:    fig.write_html("go.html"); return "go.html"

    # Returning the figure itself is convenient for interactive testing.
    return fig


# Example call: bubble chart over all GO classes, at most 50 terms. The call
# is the last expression of the cell, so the returned figure is displayed.
plot_go_chart(
    df_go,
    funciton_type='All',
    pic_type='bubble',
    chart_num=50,
    chart_size=40,
    font_size=15,
    p_value=0.05,
    color='rdbu_r',
    height=720,
    width=1280,
)