**重要：delta_GPA绝对不允许对本文件做任何改动**

In [None]:
'''
这个Jupyter Notebook包含了基于分子贡献法（Molecular Contribution，MC）的用于预测自由能（Gibbs Free Energy，ΔG）的函数示例。
如果用户请求使用MC方法，对于简单的情形下的生化反应热力学分析，可以直接调用这些函数进行预测。

分为两大类：
- 生成自由能预测（Formation Energy，$Δ_f G$）
- 反应自由能预测（Reaction Energy，$Δ_r G$）
这些函数可以帮助用户在不同条件下预测自由能，从而支持生化反应的热力学分析
'''

In [None]:
'''
必要库，在调用本文档的函数之前请确保import这些库
'''
import sys
import os
import json
import re
from typing import Optional, Tuple, Union, List, Dict

import numpy as np
import numpy.typing as npt
import pandas as pd
import joblib
from rdkit import Chem, RDLogger

from equilibrator_api import ComponentContribution, Q_

In [None]:
# 抑制报错输出
RDLogger.DisableLog('rdApp.*')
import warnings
warnings.filterwarnings('ignore', category=DeprecationWarning)

In [None]:
'''
初始化CC类，以便在后续函数中调用（用于环境修正的Legendre变换）
'''
cc = ComponentContribution()  # 初始化组分贡献法对象放在全局，避免重复初始化

## 1. 加载必要数据和模型

In [None]:
# 设置 dGPredictor 路径
DGPREDICTOR_PATH = os.path.abspath('./dGPredictor')
DATA_PATH = os.path.join(DGPREDICTOR_PATH, 'data')
MODEL_PATH = os.path.join(DGPREDICTOR_PATH, 'model')

# 化合物 SMILES 数据库
db = pd.read_csv(os.path.join(DATA_PATH, 'cache_compounds_20160818.csv'), index_col='compound_id')
db_smiles = db['smiles_pH7'].to_dict()
print(f"已加载 {len(db_smiles)} 个化合物的 SMILES")

# 分子签名 (radius=1 和 radius=2)
molsig_r1 = json.load(open(os.path.join(DATA_PATH, 'decompose_vector_ac.json')))
molsig_r2 = json.load(open(os.path.join(DATA_PATH, 'decompose_vector_ac_r2_py3_indent_modified_manual.json')))
print(f"分子签名: radius=1 ({len(molsig_r1)}个), radius=2 ({len(molsig_r2)}个)")

# 基团名称
with open(os.path.join(DATA_PATH, 'group_names_r1.txt')) as f:
    moie_r1 = f.read().splitlines()
with open(os.path.join(DATA_PATH, 'group_names_r2_py3_modified_manual.txt')) as f:
    moie_r2 = f.read().splitlines()

# 加载预训练模型
model = joblib.load(os.path.join(MODEL_PATH, 'M12_model_BR.pkl'))
print("模型加载成功")

## 2. 核心辅助函数

In [None]:
def get_compound(identifier: str, cc) -> Optional[object]:
    """
    根据标识符获取化合物对象，按优先级尝试多种策略

    注意：调用本函数必须全局初始化cc = ComponentContribution()
    
    Args:
        identifier: 化合物标识符(支持InChI、KEGG、BIGG、Metacyc、SMILES等格式)
        - 格式1: InChI (以 "InChI=" 开头)
        - 格式2: KEGG (以 "C" 开头，后跟5位数字)
        - 格式3: BIGG (直接使用BIGG ID)
        - 格式4: Metacyc (直接使用Metacyc ID)
        - 格式5: SMILES (有效的SMILES字符串，最后尝试)
        cc: 已初始化的 ComponentContribution 对象
    
    Returns:
        成功时返回化合物对象，失败时返回 None
    """

    RDLogger.DisableLog('rdApp.error')  # 禁用RDKit错误日志输出

    def try_get_compound(query: str) -> Optional[object]:
        """尝试获取化合物，失败或返回None时返回None"""
        try:
            result = cc.get_compound(query)
            return result if result is not None else None
        except Exception:
            return None
    
    def is_smiles(s: str) -> bool:
        """判断字符串是否为有效的SMILES格式"""
        try:
            mol = Chem.MolFromSmiles(s)
            return mol is not None
        except Exception:
            return False
    
    def smiles_to_inchi(smiles: str) -> Optional[str]:
        """将SMILES转换为InChI"""
        try:
            mol = Chem.MolFromSmiles(smiles)
            if mol is not None:
                inchi = Chem.MolToInchi(mol)
                return inchi
            return None
        except Exception:
            return None
    
    # 策略0: 直接查询（原始标识符）
    compound = try_get_compound(identifier)
    if compound is not None:
        return compound
    
    # 策略0.5: 通用搜索
    compound = cc.search_compound(identifier)
    if compound is not None:
        return compound

    
    # 策略1: InChI格式
    compound = cc.get_compound_by_inchi(identifier)
    if compound is not None:
        return compound


    # 策略2: KEGG格式 (C + 5位数字)
    compound = try_get_compound(f"kegg:{identifier}")
    if compound is not None:
        return compound
    
    # 策略3: BIGG数据库
    compound = try_get_compound(f"bigg.metabolite:{identifier}")
    if compound is not None:
        return compound
    
    # 策略4: MetaCyc数据库
    compound = try_get_compound(f"metacyc.compound:{identifier}")
    if compound is not None:
        return compound
    
    # 策略5: SMILES格式（最后尝试，因为检测成本较高）
    if is_smiles(identifier):
        try:
            inchi = smiles_to_inchi(identifier)
            if inchi:
                compound = cc.get_compound_by_inchi(inchi)
                if compound is not None:
                    return compound
        except Exception:
            pass
    
    # 所有策略均失败
    return None

In [None]:
def convert_to_inchi(input_string: str) -> str:
    """
    输入字符串，如果是SMILES就转换成InChI，如果是InChI就直接保留
    
    参数:
        input_string: 输入的化学结构字符串
    
    返回:
        InChI格式的字符串
    """
    input_string = input_string.strip()
    
    # 判断是否为InChI格式（InChI以"InChI="开头）
    if input_string.startswith("InChI="):
        return input_string
    else:
        # 假设是SMILES格式，尝试转换为InChI
        try:
            mol = Chem.MolFromSmiles(input_string, sanitize=False)
            if mol is not None:
                inchi = Chem.MolToInchi(mol)
                return inchi
            else:
                return "错误：无效的SMILES字符串"
        except Exception as e:
            return f"错误：转换失败 - {str(e)}"

## 3. 分子签名分解函数

In [None]:
def count_substructures(radius: int, mol) -> dict:
    """统计分子中指定半径的子结构数量
    
    Args:
        radius: 子结构半径 (1 或 2)
        mol: RDKit 分子对象
    
    Returns:
        dict: {子结构SMILES: 数量}
    """
    smi_count = {}
    for i in range(mol.GetNumAtoms()):
        env = Chem.FindAtomEnvironmentOfRadiusN(mol, radius, i)
        atoms = set()
        for bidx in env:
            bond = mol.GetBondWithIdx(bidx)
            atoms.add(bond.GetBeginAtomIdx())
            atoms.add(bond.GetEndAtomIdx())
        
        if len(atoms) == 0:
            atoms = {i}  # 孤立原子
        
        smi = Chem.MolFragmentToSmiles(mol, atomsToUse=list(atoms),
                                       bondsToUse=env, canonical=True)
        smi_count[smi] = smi_count.get(smi, 0) + 1
    return smi_count


def decompose_smiles(smiles: str, radius: int = 1) -> dict:
    """将 SMILES 分解为分子签名向量"""
    mol = Chem.MolFromSmiles(smiles)
    mol = Chem.RemoveHs(mol)
    return count_substructures(radius, mol)


def parse_reaction(formula: str, arrow='<=>') -> dict:
    """解析反应式，返回 {compound_id: stoichiometry}
    
    Args:
        formula: 反应式，如 "2 C00001 + C00002 <=> C00003"
        arrow: 箭头符号
    
    Returns:
        dict: {化合物ID: 化学计量数} (反应物为负，产物为正)
    """
    def parse_side(s):
        result = {}
        for member in re.split(r'\s*\+\s*', s.strip()):
            tokens = member.split(None, 1)
            if len(tokens) == 0:
                continue
            if len(tokens) == 1:
                amount, key = 1, tokens[0]
            else:
                amount, key = float(tokens[0]), tokens[1]
            result[key] = result.get(key, 0) + amount
        return result
    
    left, right = formula.split(arrow)
    rxn_dict = {}
    
    for cid, count in parse_side(left).items():
        rxn_dict[cid] = rxn_dict.get(cid, 0) - count
    for cid, count in parse_side(right).items():
        rxn_dict[cid] = rxn_dict.get(cid, 0) + count
    
    return rxn_dict


def get_reaction_rule(rxn_dict: dict, molsig1: dict, molsig2: dict) -> np.ndarray:
    """计算反应规则向量（分子签名变化）
    
    Args:
        rxn_dict: 反应字典 {cid: stoic}
        molsig1: radius=1 分子签名
        molsig2: radius=2 分子签名
    
    Returns:
        np.ndarray: 组合的反应规则向量
    """
    # 创建 DataFrame
    df1 = pd.DataFrame.from_dict(molsig1).fillna(0).reindex(moie_r1).fillna(0)
    df2 = pd.DataFrame.from_dict(molsig2).fillna(0).reindex(moie_r2).fillna(0)
    
    # 计算反应前后的分子签名变化
    rule1 = pd.Series(0.0, index=df1.index)
    rule2 = pd.Series(0.0, index=df2.index)
    
    for cid, stoic in rxn_dict.items():
        if cid in ['C00080', 'C00282']:  # 忽略 H+ 和 H2
            continue
        if cid in df1.columns:
            rule1 += df1[cid] * stoic
        if cid in df2.columns:
            rule2 += df2[cid] * stoic
    
    # 组合向量 (添加零填充以匹配模型输入维度)
    vec1 = np.concatenate([rule1.values, np.zeros(44)])
    vec2 = np.concatenate([rule2.values, np.zeros(44)])
    
    return np.concatenate([vec1, vec2]).reshape(1, -1)

## 生成自由能预测（Formation Energy，$Δ_f G'$）

这部分包含了用于计算**化合物生成自由能**的函数

**注意**：MC方法本身不直接预测生成自由能，但我们可以通过虚拟反应（0 -> 1 化合物）来近似计算，并使用eQuilibrator的Legendre变换进行环境修正。

In [None]:
def standard_dgf_prime_MC(
    cpd: str, 
    cc: ComponentContribution,
    p_h: float = 7.0, 
    p_mg: float = 3.0, 
    I: float = 0.25, 
    T: float = 298.15,
    physiological: bool = False  # 是否转换为1mM生理浓度
) -> Tuple[np.floating, np.floating]:
    '''
    使用分子贡献法(Molecular Contribution)计算化合物的变换生成自由能 (Δ_fG'°或Δ_fG'm)

    注意：MC方法本身设计用于反应自由能预测，生成自由能需要通过虚拟反应间接估算。
    环境修正使用eQuilibrator的Legendre变换。
    
    参数:
    cpd: 化合物KEGG ID (如 "C00002")
    cc: 已初始化的 ComponentContribution 对象
    p_h: 溶液的pH值 (默认值: 7.0)
    p_mg: 溶液的pMg值 (默认值: 3.0)
    I: 离子强度，单位为M (默认值: 0.25M)
    T: 温度，单位为K (默认值: 298.15K)
    physiological: 如果为 True，返回 1mM 浓度下的结果；
                   如果为 False (默认)，返回 1M 标准态结果。
    
    返回:
    (能量值, 误差值) 单位: kJ/mol
    '''
    try:
        # 获取化合物对象（用于环境修正）
        compound = get_compound(cpd, cc)
        if compound is None:
            raise ValueError(f"无法找到化合物: {cpd}")
        
        # 检查化合物是否在 MC 数据库中
        cid = cpd.replace('kegg:', '').replace('KEGG:', '')
        if cid not in molsig_r1:
            raise ValueError(f"化合物 {cid} 不在 MC 数据库中")
        
        # 创建虚拟反应: 0 -> 1 化合物 (使用参考化合物作为"基准")
        # 这里我们使用 H2O (C00001) 作为参考，计算相对生成自由能
        # 注意：这只是一个近似方法
        reaction_formula = f"C00001 <=> {cid}"
        
        rxn_dict = parse_reaction(reaction_formula)
        X = get_reaction_rule(rxn_dict, molsig_r1, molsig_r2)
        
        # 预测
        y_mean, y_std = model.predict(X, return_std=True)
        
        # 加上 H2O 的标准生成自由能 (Δ_fG°(H2O) ≈ -237.1 kJ/mol at pH 7)
        dGf_H2O = -157.6  # Δ_fG'° of H2O at pH 7, I=0.25M (from eQuilibrator)
        val = y_mean[0] + dGf_H2O
        err = 1.96 * y_std[0] / np.sqrt(4001)  # 95% 置信区间
        
        # 环境修正 (使用 eQuilibrator 的 transform 函数)
        # 先从标准条件 (pH 7, pMg 14, I=0.25M, T=298.15K) 变换到目标条件
        delta_std = compound.transform(
            p_h=Q_(7.0), p_mg=Q_(14.0), 
            ionic_strength=Q_('0.25M'), temperature=Q_('298.15K')
        )
        delta_user = compound.transform(
            p_h=Q_(p_h), p_mg=Q_(p_mg), 
            ionic_strength=Q_(f'{I}M'), temperature=Q_(f'{T}K')
        )
        
        # 应用环境修正
        val = val - delta_std.m_as("kJ/mol") + delta_user.m_as("kJ/mol")
        
        # 如果需要生理浓度 (1mM)，加上修正项
        if physiological:
            from scipy.constants import R as R_const  # J/(K·mol)
            R = R_const * 1e-3  # kJ/(K·mol)
            correction = R * T * np.log(1e-3)
            val += correction
        
        return np.float64(val), np.float64(err)
    
    except Exception as e:
        print(f"处理化合物 {cpd} 时出错: {str(e)}")
        return np.nan, np.nan

In [None]:
# 示例调用
dGf, std = standard_dgf_prime_MC('C00031', cc)  # Glucose
print(f"Glucose Δ_fG'° (Standard conditions): {dGf:.2f} ± {std:.2f} kJ/mol")

In [None]:
# 示例调用（不同条件）
dGf, std = standard_dgf_prime_MC('C00002', cc, p_h=8, p_mg=3, I=0.3, physiological=True)
print(f"ATP Δ_fG'^m (pH=8, pMg=3, I=0.3, physiological): {dGf:.2f} ± {std:.2f} kJ/mol")

## 反应自由能预测（Reaction Energy，$Δ_r G'$）

这部分包含了用于计算**生化反应自由能**的函数

MC (dGPredictor) 方法的核心功能就是预测反应自由能。

In [None]:
def standard_dgr_prime_MC(
    rxn: str,
    cc: ComponentContribution,
    p_h: float = 7.0, 
    p_mg: float = 3.0, 
    I: float = 0.25, 
    T: float = 298.15,
    physiological: bool = False  # 是否转换为1mM生理浓度
) -> Tuple[np.floating, np.floating, np.floating]:
    """
    使用分子贡献法(Molecular Contribution)计算反应的变换生成自由能 (Δ_rG'°或Δ_rG'^m)和平衡常数 (K')

    注意：调用本函数必须提前全局初始化cc = ComponentContribution()
    环境修正使用eQuilibrator的Legendre变换。
    
    参数:
    rxn: 反应式字符串 (KEGG ID 格式，如 "C00002 + C00001 <=> C00008 + C00009")
    cc: 已初始化的 ComponentContribution 对象
    p_h: 溶液的pH值 (默认值: 7.0)
    p_mg: 溶液的pMg值 (默认值: 3.0)
    I: 离子强度，单位为M (默认值: 0.25M)
    T: 温度，单位为K (默认值: 298.15K)
    physiological: 如果为 True，返回 1mM 浓度下的结果；
                   如果为 False (默认)，返回 1M 标准态结果。
    
    返回:
    (能量值, 误差值, 平衡常数) 单位: kJ/mol    
    """
    try:
        # 解析反应
        rxn_dict = parse_reaction(rxn)
        
        # 检查所有化合物是否在数据库中
        missing = [cid for cid in rxn_dict.keys() 
                   if cid not in ['C00080', 'C00282'] and cid not in molsig_r1]
        if missing:
            raise ValueError(f"化合物不在数据库中: {missing}")
        
        # 获取反应规则向量
        X = get_reaction_rule(rxn_dict, molsig_r1, molsig_r2)
        
        # 预测
        y_mean, y_std = model.predict(X, return_std=True)
        val = y_mean[0]
        err = 1.96 * y_std[0] / np.sqrt(4001)  # 95% 置信区间
        
        # 环境修正 (使用 eQuilibrator 的 transform 函数)
        # 计算每个化合物的 transform 贡献
        transform_correction = 0.0
        for cid, stoic in rxn_dict.items():
            if cid in ['C00080', 'C00282']:  # 忽略 H+ 和 H2
                continue
            
            compound = get_compound(f"kegg:{cid}", cc)
            if compound is not None:
                # 从标准条件变换到目标条件
                delta_std = compound.transform(
                    p_h=Q_(7.0), p_mg=Q_(14.0),
                    ionic_strength=Q_('0.25M'), temperature=Q_('298.15K')
                )
                delta_user = compound.transform(
                    p_h=Q_(p_h), p_mg=Q_(p_mg),
                    ionic_strength=Q_(f'{I}M'), temperature=Q_(f'{T}K')
                )
                transform_correction += stoic * (delta_user.m_as("kJ/mol") - delta_std.m_as("kJ/mol"))
        
        val += transform_correction
        
        from scipy.constants import R as R_const  # J/(K·mol)
        R = R_const * 1e-3  # kJ/(K·mol)
        
        # 如果需要生理浓度 (1mM)
        if physiological:
            lnQ = 0
            for cid, stoic in rxn_dict.items():
                if cid not in ['C00080', 'C00282']:  # 忽略 H+ 和 H2
                    lnQ += stoic * np.log(1e-3)
            val = val + R * T * lnQ
        
        # 计算平衡常数
        RT = R * T
        K_prime = np.exp(-val / RT)
        
        return np.float64(val), np.float64(err), np.float64(K_prime)
    
    except Exception as e:
        print(f"处理反应 {rxn} 时出错: {str(e)}")
        return np.nan, np.nan, np.nan

In [None]:
# 示例调用: ATP 水解反应
# ATP + H2O <=> ADP + Pi
dGr, std, K_prime = standard_dgr_prime_MC('C00002 + C00001 <=> C00008 + C00009', cc)
print(f"ATP 水解反应 Δ_rG'° (Standard conditions): {dGr:.2f} ± {std:.2f} kJ/mol, K': {K_prime:.2e}")

In [None]:
# 示例调用: 己糖激酶反应（不同条件）
# Glucose + ATP <=> Glucose-6-phosphate + ADP
dGr, std, K_prime = standard_dgr_prime_MC(
    'C00031 + C00002 <=> C00092 + C00008',
    cc,
    p_h=7.4, 
    p_mg=2.5, 
    I=0.15, 
    physiological=True
)
print(f"己糖激酶反应 Δ_rG'^m (pH=7.4, pMg=2.5, I=0.15, physiological): {dGr:.2f} ± {std:.2f} kJ/mol, K': {K_prime:.2e}")

## 高级用法：自定义 SMILES 反应

In [None]:
def standard_dgr_prime_MC_from_smiles(
    reactants: Dict[str, float],
    products: Dict[str, float],
    p_h: float = 7.0,
    I: float = 0.1
) -> Tuple[np.floating, np.floating]:
    """从 SMILES 预测反应自由能 (无环境修正)
    
    注意：此函数不进行 Legendre 变换环境修正，因为自定义分子缺少 pKa 信息。
    返回的是近似的标准条件 (pH 7, I=0.1M) 下的反应自由能。
    
    Args:
        reactants: 反应物 {SMILES: 化学计量数}
        products: 产物 {SMILES: 化学计量数}
        p_h: pH 值 (仅作参考，不进行实际修正)
        I: 离子强度 [M] (仅作参考，不进行实际修正)
    
    Returns:
        tuple: (ΔrG'° [kJ/mol], 标准误差 [kJ/mol])
    """
    # 分解所有分子
    custom_sig1 = {}
    custom_sig2 = {}
    rxn_dict = {}
    
    for i, (smiles, stoic) in enumerate(reactants.items()):
        cid = f"R{i:04d}"
        custom_sig1[cid] = decompose_smiles(smiles, radius=1)
        custom_sig2[cid] = decompose_smiles(smiles, radius=2)
        rxn_dict[cid] = -stoic
    
    for i, (smiles, stoic) in enumerate(products.items()):
        cid = f"P{i:04d}"
        custom_sig1[cid] = decompose_smiles(smiles, radius=1)
        custom_sig2[cid] = decompose_smiles(smiles, radius=2)
        rxn_dict[cid] = stoic
    
    # 合并分子签名
    combined_sig1 = {**molsig_r1, **custom_sig1}
    combined_sig2 = {**molsig_r2, **custom_sig2}
    
    # 获取反应规则向量
    X = get_reaction_rule(rxn_dict, combined_sig1, combined_sig2)
    
    # 预测
    y_mean, y_std = model.predict(X, return_std=True)
    dG0 = y_mean[0]
    conf_int = 1.96 * y_std[0] / np.sqrt(4001)
    
    return np.float64(dG0), np.float64(conf_int)

In [None]:
# 使用 SMILES 预测 (示例：乙醇氧化为乙醛)
# CH3CH2OH + NAD+ <=> CH3CHO + NADH + H+
reactants = {
    "CCO": 1,      # 乙醇
}
products = {
    "CC=O": 1,     # 乙醛
    "O": 1         # 水
}

try:
    dG, std = standard_dgr_prime_MC_from_smiles(reactants, products)
    print(f"乙醇氧化反应 (SMILES):")
    print(f"  ΔrG'° ≈ {dG:.2f} ± {std:.2f} kJ/mol")
    print(f"  注意: 此值未经环境修正")
except Exception as e:
    print(f"预测失败: {e}")

## 批量预测

In [None]:
def calculate_standard_dgr_batch(
    reactions: List[Tuple[str, str]],
    cc: ComponentContribution,
    p_h: float = 7.0,
    p_mg: float = 3.0,
    I: float = 0.25,
    T: float = 298.15
) -> pd.DataFrame:
    """
    批量计算多个反应的标准反应自由能
    
    Args:
        reactions: 反应列表 [(反应式, 反应名称), ...]
        cc: ComponentContribution 对象
        p_h: pH值，默认7.0
        p_mg: pMg值，默认3.0
        I: 离子强度，默认0.25 M
        T: 温度，默认298.15 K
    
    Returns:
        pd.DataFrame: 包含所有计算结果的数据框
    """
    results = []
    
    print("批量预测结果:")
    print("-" * 70)
    
    for rxn, name in reactions:
        try:
            dG, std, K = standard_dgr_prime_MC(rxn, cc, p_h, p_mg, I, T)
            results.append({
                'reaction': rxn,
                'name': name,
                'dGr': float(dG),
                'uncertainty': float(std),
                'K_prime': float(K),
                'status': 'success'
            })
            print(f"{name:30s} | ΔrG'° = {dG:8.2f} ± {std:.2f} kJ/mol")
        except Exception as e:
            results.append({
                'reaction': rxn,
                'name': name,
                'dGr': np.nan,
                'uncertainty': np.nan,
                'K_prime': np.nan,
                'status': str(e)
            })
            print(f"{name:30s} | Error: {e}")
    
    return pd.DataFrame(results)

In [None]:
# 示例：批量预测糖酵解反应
reactions = [
    ("C00002 + C00001 <=> C00008 + C00009", "ATP hydrolysis"),
    ("C00031 + C00002 <=> C00092 + C00008", "Hexokinase"),
    ("C00092 <=> C00085", "Glucose-6-P isomerase"),
    ("C00085 + C00002 <=> C00354 + C00008", "Phosphofructokinase"),
    ("C00354 <=> C00111 + C00118", "Aldolase"),
]

results_df = calculate_standard_dgr_batch(reactions, cc)
print("\n完整结果:")
print(results_df.to_string())