# 脚本说明：
1.脚本用于对输入的PDB文件（蛋白）进行Cartesian搜索，并进行饱和突变。

2.脚本会生成一个文件，里面是所有突变的PDB文件和所有突变对应的得分。

# 1. 初始化与环境配置 (Cartesian Mode)

In [2]:
import pyrosetta
from pyrosetta import *
from pyrosetta.rosetta.core.scoring import ScoreType
from pyrosetta.rosetta.core.pack.task import TaskFactory
from pyrosetta.rosetta.core.pack.task.operation import (
    IncludeCurrent, InitializeFromCommandline, RestrictToRepacking, PreventRepacking
)
from pyrosetta.rosetta.core.select.residue_selector import (
    ResidueIndexSelector, NeighborhoodResidueSelector, NotResidueSelector, AndResidueSelector
)
from pyrosetta.rosetta.protocols.relax import FastRelax
from pyrosetta.rosetta.core.pack.task.operation import (
    OperateOnResidueSubset, 
    PreventRepackingRLT, 
    RestrictAbsentCanonicalAASRLT,
    RestrictToRepackingRLT 
)
import pandas as pd
import py3Dmol

pyrosetta.init("-ex1 -ex2 -use_input_sc -nstruct 1 -beta_cart")

print("PyRosetta Initialized in CARTESIAN Mode.")

┌───────────────────────────────────────────────────────────────────────────────┐
│                                  PyRosetta-4                                  │
│               Created in JHU by Sergey Lyskov and PyRosetta Team              │
│               (C) Copyright Rosetta Commons Member Institutions               │
│                                                                               │
│ NOTE: USE OF PyRosetta FOR COMMERCIAL PURPOSES REQUIRES PURCHASE OF A LICENSE │
│          See LICENSE.PyRosetta.md or email license@uw.edu for details         │
└───────────────────────────────────────────────────────────────────────────────┘
PyRosetta-4 2025 [Rosetta PyRosetta4.Release.python310.m1 2025.47+release.8bb54e8a6dc3e1c027e4f028bdace6bd4691c823 2025-11-20T15:59:05] retrieved from: http://www.pyrosetta.org
core.init: Checking for fconfig files in pwd and ./rosetta/flags
core.init: Rosetta version: PyRosetta4.Release.python310.m1 r417 2025.47+release.8bb54e8a6d 8bb54e8a6d

# 2. 辅助工具类

In [3]:
def get_pose_index(pose, chain, pdb_resnum):
    """将 PDB 编号 (e.g., Chain A, Residue 120) 转换为 Pose 索引"""
    pdb_info = pose.pdb_info()
    pose_index = pdb_info.pdb2pose(chain, int(pdb_resnum))
    if pose_index == 0:
        raise ValueError(f"Residue {chain}:{pdb_resnum} not found in PDB.")
    return pose_index

def get_pdb_label(pose, pose_index):
    """将 Pose 索引反向转换为 PDB 标签"""
    pdb_info = pose.pdb_info()
    chain = pdb_info.chain(pose_index)
    resnum = pdb_info.number(pose_index)
    aa = pose.residue(pose_index).name1()
    return f"{aa}{resnum} (Chain {chain})"

# 辅助函数：处理二硫键
from pyrosetta.rosetta.core.chemical import DISULFIDE
from pyrosetta.rosetta.core.conformation import break_disulfide

def find_disulfide_partner(pose, res_index):
    res = pose.residue(res_index)
    for i in range(1, res.n_current_residue_connections() + 1):
        partner_idx = res.connected_residue_at_resconn(i)
        if partner_idx == 0 or partner_idx == res_index: continue
        if pose.residue(partner_idx).has_variant_type(DISULFIDE):
            return partner_idx
    return None

# 3. 加载与Cartesian Pre-Relax

In [5]:
# 1. 加载 PDB
input_pdb = "bhpmeh_ns.pdb" 
# 如果没有文件，解开下面注释下载测试
# pyrosetta.toolbox.rcsb.download("1UBQ")
# input_pdb = "1UBQ_clean.pdb"

pose = pose_from_pdb(input_pdb)

# --- 关键变更 2: 使用 Cartesian 打分函数 ---
# ref2015_cart 包含了对键长、键角偏差的惩罚项
scorefxn_cart = create_score_function("ref2015_cart")

print(f"Original Energy (ref2015_cart): {scorefxn_cart(pose):.2f} REU")

# 检测是否存在WT_cartesian_relaxed.pdb,如果存在则使用WT_cartesian_relaxed.pdb作为后续突变的基准，如果不存在则进行FastRelax
import os
if os.path.exists("WT_cartesian_relaxed.pdb"):
    print("WT_cartesian_relaxed.pdb found, using it as the base pose for Cartesian mutations.")
    pose = pose_from_pdb("WT_cartesian_relaxed.pdb")
else:
    print("WT_cartesian_relaxed.pdb not found, running FastRelax to generate it.")
    relax = FastRelax()
    relax.set_scorefxn(scorefxn_cart)
    relax.cartesian(True) # <--- 开启笛卡尔模式的灵魂开关
    relax.apply(pose)
    
# 2. 执行 Cartesian FastRelax
# 传统的 Relax 只动二面角，这里我们开启 cartesian 模式，允许键长键角微调
# 注意：这通常比普通 Relax 慢 2-3 倍，但对于 ddG 计算至关重要
print("Running Cartesian Pre-Relax (This allows backbone flexing)...")



wt_score = scorefxn_cart(pose)
print(f"Relaxed WT Energy: {wt_score:.2f} REU")

pose.dump_pdb("WT_cartesian_relaxed.pdb")

core.import_pose.import_pose: File 'bhpmeh_ns.pdb' automatically determined to be of type PDB from contents.
core.conformation.Conformation: Found disulfide between residues 54 89
core.conformation.Conformation: Found disulfide between residues 239 250
core.conformation.Conformation: Found disulfide between residues 363 446
core.energy_methods.CartesianBondedEnergy: Creating new peptide-bonded energy container (459)
Original Energy (ref2015_cart): 195.90 REU
WT_cartesian_relaxed.pdb found, using it as the base pose for Cartesian mutations.
core.import_pose.import_pose: File 'WT_cartesian_relaxed.pdb' automatically determined to be of type PDB from contents.
core.conformation.Conformation: Found disulfide between residues 54 89
core.conformation.Conformation: Found disulfide between residues 239 250
core.conformation.Conformation: Found disulfide between residues 363 446
Running Cartesian Pre-Relax (This allows backbone flexing)...
core.energy_methods.CartesianBondedEnergy: Creating new

True

# 4. 核心算法: Cartesian ddG (Mutate - FlexBB - Minimize)

In [6]:
def calculate_ddg_cartesian(wt_pose, chain_id, pdb_resnum, target_aa_one_letter, packing_radius=8.0):
    """
    Cartesian ddG 版本：
    允许突变位点周围的骨架 (Backbone) 和侧链 (Sidechain) 在 XYZ 空间自由移动，
    以缓解空间位阻。
    """
    # 1. 准备工作
    mut_pose = wt_pose.clone()
    res_index = get_pose_index(mut_pose, chain_id, pdb_resnum)
    wt_aa_name = mut_pose.residue(res_index).name1()
    
    print(f"--- Processing {wt_aa_name}{pdb_resnum} -> {target_aa_one_letter} (Cartesian Protocol) ---")

    # 2. 处理二硫键 (同原代码)
    if mut_pose.residue(res_index).has_variant_type(DISULFIDE):
        partner_index = find_disulfide_partner(mut_pose, res_index)
        if partner_index:
            print(f"Action: Breaking disulfide bond with {partner_index}...")
            break_disulfide(mut_pose.conformation(), res_index, partner_index)

    # 3. 突变操作 (使用 PackerTask 进行强制突变)
    # 这一步先仅仅把氨基酸换掉，侧链放进去，不做深度优化
    tf = TaskFactory()
    tf.push_back(InitializeFromCommandline())
    tf.push_back(IncludeCurrent())

    # A. 目标位点：强制突变
    target_selector = ResidueIndexSelector(res_index)
    rlt = RestrictAbsentCanonicalAASRLT()
    rlt.aas_to_keep(target_aa_one_letter)
    tf.push_back(OperateOnResidueSubset(rlt, target_selector))

    # B. 邻居位点：暂时只允许 Repack (稍后在 Minimization 阶段动骨架)
    nbr_selector = NeighborhoodResidueSelector(target_selector, packing_radius, True)
    nbr_only_selector = AndResidueSelector(nbr_selector, NotResidueSelector(target_selector))
    tf.push_back(OperateOnResidueSubset(RestrictToRepackingRLT(), nbr_only_selector))
    
    # C. 其他位点：冻结
    not_nbr_selector = NotResidueSelector(nbr_selector)
    tf.push_back(OperateOnResidueSubset(PreventRepackingRLT(), not_nbr_selector))

    # 执行突变和初步侧链摆放
    print(f"Action: Mutating to {target_aa_one_letter}...")
    packer = pyrosetta.rosetta.protocols.minimization_packing.PackRotamersMover(scorefxn_cart)
    packer.task_factory(tf)
    packer.apply(mut_pose)

    # 4. Cartesian Minimization (核心差异)
    # 我们需要构建一个 MoveMap，允许邻居范围内的原子(包括骨架)在 XYZ 空间移动
    
    mm = MoveMap()
    mm.set_bb(False)   # 默认关闭所有骨架
    mm.set_chi(False)  # 默认关闭所有侧链
    
    # 获取邻居的布尔向量
    selection_vector = nbr_selector.apply(mut_pose)
    
    # 遍历开启邻居的自由度
    for i in range(1, mut_pose.total_residue() + 1):
        if selection_vector[i]:
            mm.set_chi(i, True) # 允许侧链动
            mm.set_bb(i, True)  # <--- 关键：允许骨架动 (Backbone Flexibility)
    
    # 设置 MinMover
    min_mover = pyrosetta.rosetta.protocols.minimization_packing.MinMover()
    min_mover.movemap(mm)
    min_mover.score_function(scorefxn_cart)
    min_mover.min_type("lbfgs_armijo_nonmonotone")
    
    # --- 开启 Cartesian 模式 ---
    min_mover.cartesian(True) 
    
    print("Action: Running Cartesian Minimization (Relaxing strain)...")
    min_mover.apply(mut_pose)
    
    # 5. 计算最终分数
    # 再次确认突变是否成功
    actual_new_aa = mut_pose.residue(res_index).name1()
    if actual_new_aa != target_aa_one_letter:
        print("CRITICAL ERROR: Mutation failed.")
        return None

    mut_score = scorefxn_cart(mut_pose)
    ddg = mut_score - wt_score 
    
    return {
        "Mutation": f"{wt_aa_name}{pdb_resnum}{target_aa_one_letter}",
        "ddG (REU)": ddg,
        "Mutant_Pose": mut_pose
    }

# 5. 运行计算 (Cartesian)

In [19]:
# 示例：将 A链 第 20 号残基突变为 丙氨酸 (A)
target_chain = "A"
target_resnum = 239 
target_amino_acid = "C" 

# 注意：Cartesian 计算通常比普通计算慢一点
result = calculate_ddg_cartesian(pose, target_chain, target_resnum, target_amino_acid)

# 打印报表
print("-" * 40)
print(f"Cartesian ddG Result for {result['Mutation']}")
print(f"ddG: {result['ddG (REU)']:.3f} REU")
print("-" * 40)

if result['ddG (REU)'] < -1.0:
    print("结论: Stabilizing (显著稳定)")
elif result['ddG (REU)'] > 1.0:
    print("结论: Destabilizing (显著破坏)")
else:
    print("结论: Neutral (中性)")

--- Processing C239 -> C (Cartesian Protocol) ---
Action: Breaking disulfide bond with 250...
Action: Mutating to C...
core.pack.task: Packer task: initialize from command line()
core.energy_methods.CartesianBondedEnergy: Creating new peptide-bonded energy container (459)
core.pack.pack_rotamers: built 373 rotamers at 11 positions.
core.pack.interaction_graph.interaction_graph_factory: Instantiating DensePDInteractionGraph
Action: Running Cartesian Minimization (Relaxing strain)...
----------------------------------------
Cartesian ddG Result for C239C
ddG: 5.836 REU
----------------------------------------
结论: Destabilizing (显著破坏)


# 6. 交互式可视化 (前后对比)

In [20]:
import py3Dmol
from pyrosetta.rosetta.std import ostringstream

def get_pdb_string(pose):
    oss = ostringstream()
    pose.dump_pdb(oss) 
    return oss.str()

view = py3Dmol.view(width=800, height=400)

# 加载 WT (灰色)
wt_pdb_str = get_pdb_string(pose) 
view.addModel(wt_pdb_str, "pdb")
view.setStyle({'model': 0}, {'cartoon': {'color': 'lightgray', 'opacity': 0.6}})

# 加载 Mutant (蓝色)
if 'Mutant_Pose' in result:
    mutant_pose = result['Mutant_Pose']
    mut_pdb_str = get_pdb_string(mutant_pose)
    view.addModel(mut_pdb_str, "pdb")
    view.setStyle({'model': 1}, {'cartoon': {'color': 'blue'}})

    # 聚焦突变位点
    idx = get_pose_index(pose, target_chain, target_resnum)
    pdb_res_label = str(pose.pdb_info().number(idx))

    # 显示 WT 侧链 (红)
    view.addStyle({'model': 0, 'resi': pdb_res_label, 'chain': target_chain}, 
                  {'stick': {'colorscheme': 'redCarbon'}})

    # 显示 Mutant 侧链 (绿)
    view.addStyle({'model': 1, 'resi': pdb_res_label, 'chain': target_chain}, 
                  {'stick': {'colorscheme': 'greenCarbon'}})

    view.zoomTo({'resi': pdb_res_label, 'chain': target_chain})

    label_text = f"Cartesian Mode: WT:{pose.residue(idx).name1()} -> Mut:{mutant_pose.residue(idx).name1()}"
    view.addLabel(label_text, 
                  {'position': {'x':0, 'y':0, 'z':0}, 'useScreen': True, 'fontColor':'black', 'backgroundColor':'white'})

view.show()

# 饱和突变全自动工作流 (Saturation Mutagenesis Pipeline)

1. 继承 Cartesian ddG 高精度协议
2. 多进程并行计算 (自动调用所有 CPU 核心)
3. 自动生成所有突变的 PDB 结构文件
4. 汇总结果为 CSV 报表

In [31]:
%%writefile my_worker.py
import os
import pyrosetta
from pyrosetta import *
from pyrosetta.rosetta.core.pack.task import TaskFactory
from pyrosetta.rosetta.core.pack.task.operation import (
    IncludeCurrent, InitializeFromCommandline, RestrictToRepacking, 
    PreventRepacking, OperateOnResidueSubset, RestrictAbsentCanonicalAASRLT, 
    RestrictToRepackingRLT, PreventRepackingRLT
)
from pyrosetta.rosetta.core.select.residue_selector import (
    ResidueIndexSelector, NeighborhoodResidueSelector, 
    NotResidueSelector, AndResidueSelector
)
from pyrosetta.rosetta.protocols.minimization_packing import MinMover, PackRotamersMover
from pyrosetta.rosetta.core.chemical import DISULFIDE
from pyrosetta.rosetta.core.conformation import break_disulfide

def _worker_calculate_mutation(args):
    """
    工作进程：执行单个突变计算
    """
    target_chain, target_resnum, target_aa, config, input_pdb_path = args
    
    # ================= 修正核心 =================
    # 强制初始化逻辑：
    # 不要使用复杂的 options 判断，直接尝试 init。
    # 即使之前初始化过，PyRosetta 通常也只是报个 Warning，或者是抛出 RuntimeError。
    # 我们捕获这个错误即可，确保这一步“尽力而为”地通过。
    try:
        # 这里的参数必须和主程序保持一致
        pyrosetta.init("-ex1 -ex2 -use_input_sc -beta_cart -nstruct 1 -mute all")
    except RuntimeError:
        # 如果报 "already initialized"，说明环境是好的，直接跳过
        pass
    except Exception as e:
        # 其他未知错误，打印出来方便调试
        print(f"Worker Init Warning: {e}")
    # ===========================================

    # 准备数据结构
    result_entry = {
        "Chain": target_chain, "ResNum": target_resnum, "Mutant_AA": target_aa,
        "WT_AA": "", "ddG": 0.0, "Status": "Success", "PDB_File": ""
    }

    # 加载 PDB
    # 如果 init 失败，这里依然会爆 RuntimeError，但现在我们大大增加了 init 成功的概率
    try:
        pose = pose_from_pdb(input_pdb_path)
    except Exception as e:
        result_entry["Status"] = f"Error loading PDB: {e}"
        return result_entry

    scorefxn = create_score_function("ref2015_cart")
    wt_score = scorefxn(pose)
    
    try:
        # --- 1. 定位与预处理 ---
        pdb_info = pose.pdb_info()
        res_index = pdb_info.pdb2pose(target_chain, int(target_resnum))
        if res_index == 0:
            raise ValueError(f"Residue {target_chain}:{target_resnum} not found")
            
        wt_aa_name = pose.residue(res_index).name1()
        result_entry["WT_AA"] = wt_aa_name
        
        if wt_aa_name == target_aa:
            result_entry["Status"] = "Skipped (WT)"
            return result_entry

        # 处理二硫键
        if pose.residue(res_index).has_variant_type(DISULFIDE):
            res = pose.residue(res_index)
            curr_conns = res.n_current_residue_connections()
            for i in range(1, curr_conns + 1):
                partner = res.connected_residue_at_resconn(i)
                if partner != 0 and partner != res_index:
                    if pose.residue(partner).has_variant_type(DISULFIDE):
                        break_disulfide(pose.conformation(), res_index, partner)
                        break

        # --- 2. 突变操作 ---
        tf = TaskFactory()
        tf.push_back(InitializeFromCommandline())
        tf.push_back(IncludeCurrent())
        
        target_selector = ResidueIndexSelector(res_index)
        rlt = RestrictAbsentCanonicalAASRLT()
        rlt.aas_to_keep(target_aa)
        tf.push_back(OperateOnResidueSubset(rlt, target_selector))
        
        nbr_selector = NeighborhoodResidueSelector(target_selector, 8.0, True)
        nbr_only = AndResidueSelector(nbr_selector, NotResidueSelector(target_selector))
        tf.push_back(OperateOnResidueSubset(RestrictToRepackingRLT(), nbr_only))
        
        not_nbr = NotResidueSelector(nbr_selector)
        tf.push_back(OperateOnResidueSubset(PreventRepackingRLT(), not_nbr))
        
        packer = PackRotamersMover(scorefxn)
        packer.task_factory(tf)
        packer.apply(pose)
        
        # --- 3. Cartesian Minimization ---
        mm = MoveMap()
        mm.set_bb(False)
        mm.set_chi(False)
        
        selection = nbr_selector.apply(pose)
        for i in range(1, pose.total_residue() + 1):
            if selection[i]:
                mm.set_chi(i, True)
                mm.set_bb(i, True)
        
        min_mover = MinMover()
        min_mover.movemap(mm)
        min_mover.score_function(scorefxn)
        min_mover.min_type("lbfgs_armijo_nonmonotone")
        min_mover.cartesian(True)
        min_mover.apply(pose)
        
        # --- 4. 结算 ---
        mut_score = scorefxn(pose)
        ddg = mut_score - wt_score
        
        result_entry["ddG"] = ddg
        
        if config["SAVE_PDB"]:
            filename = f"{target_chain}_{target_resnum}_{target_aa}.pdb"
            filepath = os.path.join(config["OUTPUT_DIR"], filename)
            pose.dump_pdb(filepath)
            result_entry["PDB_File"] = filepath
            
    except Exception as e:
        result_entry["Status"] = f"Error: {str(e)}"

    return result_entry

Overwriting my_worker.py


In [33]:
import multiprocessing
import pandas as pd
import os
import time
import pyrosetta
# === 关键修改：从文件导入 worker 函数 ===
import my_worker 
from importlib import reload
reload(my_worker) # 防止你修改了文件但 notebook 没更新
# ====================================

# 配置保持不变
CONFIG = {
    "INPUT_PDB": "WT_cartesian_relaxed.pdb", 
    "OUTPUT_DIR": "saturation_output",       
    "TARGETS": [
        {"chain": "A", "resnum": 235},
        {"chain": "A", "resnum": 239}       
    ],
    "N_STRUCT": 1,                           
    "CPU_CORES": max(1, multiprocessing.cpu_count() - 2),
    "SAVE_PDB": True                          
}

def run_saturation_pipeline():
    # ... (前面的检查代码保持不变) ...
    
    # 1. 环境检查
    if not os.path.exists(CONFIG["INPUT_PDB"]):
        print(f"Error: Input file {CONFIG['INPUT_PDB']} not found!")
        return

    if not os.path.exists(CONFIG["OUTPUT_DIR"]):
        os.makedirs(CONFIG["OUTPUT_DIR"])

    # 2. 生成任务
    amino_acids = list("ACDEFGHIKLMNPQRSTVWY")
    tasks = []
    
    for target in CONFIG["TARGETS"]:
        chain = target["chain"]
        resnum = target["resnum"]
        for aa in amino_acids:
            tasks.append((chain, resnum, aa, CONFIG, CONFIG["INPUT_PDB"]))
            
    print(f"Using {CONFIG['CPU_CORES']} CPU cores...")
    
    # 3. 并行执行
    # === 关键修改：调用 my_worker._worker_calculate_mutation ===
    with multiprocessing.Pool(processes=CONFIG["CPU_CORES"]) as pool:
        results = pool.map(my_worker._worker_calculate_mutation, tasks)
    # ========================================================
    
    # 4. 结果汇总
    df = pd.DataFrame(results)
    
    # 简单清洗数据
    df = df[df["Status"] == "Success"]
    csv_filename = os.path.join(CONFIG["OUTPUT_DIR"], "saturation_results.csv")
    df.to_csv(csv_filename, index=False)
    
    print("Done!")
    return df

if __name__ == "__main__":
    if os.path.exists(CONFIG["INPUT_PDB"]):
        df = run_saturation_pipeline()
        print(df.head())

Using 8 CPU cores...
┌───────────────────────────────────────────────────────────────────────────────┐
│                                  PyRosetta-4                                  │
│               Created in JHU by Sergey Lyskov and PyRosetta Team              │
│               (C) Copyright Rosetta Commons Member Institutions               │
│                                                                               │
│ NOTE: USE OF PyRosetta FOR COMMERCIAL PURPOSES REQUIRES PURCHASE OF A LICENSE │
│          See LICENSE.PyRosetta.md or email license@uw.edu for details         │
└───────────────────────────────────────────────────────────────────────────────┘
PyRosetta-4 2025 [Rosetta PyRosetta4.Release.python310.m1 2025.47+release.8bb54e8a6dc3e1c027e4f028bdace6bd4691c823 2025-11-20T15:59:05] retrieved from: http://www.pyrosetta.org
┌───────────────────────────────────────────────────────────────────────────────┐
│                                  PyRosetta-4                  