In [2]:
import dflow
import getpass
import subprocess
from typing import Dict
from typing import List
from pathlib import Path
from dflow.plugins import bohrium
from dflow.plugins.bohrium import TiefblueClient
from dflow import config, s3_config, Step,upload_artifact, Workflow,download_artifact
from dflow.python import OP, OPIO, Artifact, OPIOSign, PythonOPTemplate

config["host"] = "https://workflows.deepmodeling.com"
config["k8s_api_server"] = "https://workflows.deepmodeling.com"
bohrium.config["username"] = getpass.getpass("Bohrium username: ")
bohrium.config["password"] = getpass.getpass("Bohrium password: ")
bohrium.config["project_id"] = getpass.getpass("Bohrium project_id: ")
s3_config["repo_key"] = "oss-bohrium"
s3_config["storage_client"] = TiefblueClient()

Bohrium username: ········
Bohrium password: ········
Bohrium project_id: ········


In [3]:
from dflow.plugins.dispatcher import DispatcherExecutor

dispatcher_executor = DispatcherExecutor(
        machine_dict={
            "batch_type": "Bohrium",
            "context_type": "Bohrium",
            "remote_profile": {
                "input_data": {
                    "job_type": "container",
                    "platform": "ali",
                    "scass_type" : "c8_m16_cpu"
                },
            },
        },
    )

## 定义OP

In [6]:
class grompp(OP):
    # grompp类用于执行GROMACS预处理，创建模拟所需的tpr文件。
    def __init__(self):
        pass

    @classmethod
    def get_input_sign(cls):
        # 定义类方法，返回输入参数签名。输入包括：mdp文件、gro文件、top文件以及附加命令。
        return OPIOSign({"mdp": Artifact(Path),"gro": Artifact(Path),"topol": Artifact(Path),"addition_cmd":str})

    @classmethod
    def get_output_sign(cls):
        # 定义类方法，返回输出参数签名。输出包括：tpr文件。
        return OPIOSign({"tpr": Artifact(Path)})

    @OP.exec_sign_check
    def execute(self, op_in: OPIO) -> OPIO:
        # 根据输入参数，执行grompp命令生成tpr文件。
        
        # 从输入参数中提取信息。
        mdp = str(op_in["mdp"])
        gro = str(op_in["gro"])
        top = str(list(op_in["topol"].glob('**/*.top'))[0].resolve())
        mdp_path = Path(mdp)
        process =  str(mdp_path.parent / mdp_path.stem)
                
        # 构造grompp命令。
        cmd = "gmx grompp -f " + mdp + " -c " + gro + " -r " + gro + " -p " + top + " -o " + process + ".tpr" + " " + op_in["addition_cmd"]
        # 执行grompp命令。
        subprocess.call(cmd, shell=True)
    
        # 生成输出参数。
        op_out = OPIO({
        "tpr":  Path(process + ".tpr")})
        return op_out

In [7]:
class mdrun(OP):
    # mdrun类用于执行GROMACS分子动力学模拟，根据提供的tpr文件生成模拟输出文件。
    def __init__(self):
        pass

    @classmethod
    def get_input_sign(cls):
        # 定义类方法，返回输入参数签名。输入包括：tpr文件以及附加命令。
        return OPIOSign({"mdrun_file": Artifact(Path),"addition_cmd":str})

    @classmethod
    def get_output_sign(cls):
        # 定义类方法，返回输出参数签名。输出包括：各种输出文件，取决于生成tpr时的mdp。
        return OPIOSign({"output_files": Artifact(Dict[str, Path]),"out_info":Artifact(Path)})

    @OP.exec_sign_check
    def execute(self, op_in: OPIO) -> OPIO:
        # 根据输入参数，执行mdrun命令生成模拟输出文件。
        
        # 从输入参数中提取信息。
        tpr_path = op_in["mdrun_file"]
        
        # 获取相对路径。
        cwd = Path.cwd()
        rel_path = tpr_path.relative_to(cwd)
        process  = str(tpr_path.parent / tpr_path.stem)
        
        # 构造mdrun命令。
        cmd = "gmx mdrun -deffnm " + process + " " + op_in['addition_cmd']
        # 执行mdrun命令。
        subprocess.call(cmd, shell=True)
        
        # 定义一个辅助函数，用于生成包含所有输出文件路径的字典。
        def generate_output_files_dict(filename):
            """
            生成一个字典，其中包含与提供的文件名位于同一目录的所有文件(包括子文件夹里的文件）的路径。
            字典的键是文件扩展名（不带文件分隔符），值是文件的路径。

            :param filename: 提供的文件名（可以包括路径）
            :return: 一个包含文件扩展名和相应路径的字典
            """

            # 获取输入文件的父目录。
            tpr_path = Path(filename).parent

            # 获取父目录下的所有文件，并将它们的路径添加到 files_list 中。
            files_list = []
            for path in tpr_path.rglob('*'):
                if path.is_file():
                    files_list.append(path)

            # 生成以文件后缀名为键的字典。
            output_files = {}
            for file_path in files_list:
                output_files[file_path.suffix[1:]] = file_path

            # 将字典写入一个文件以备检查正确性。
            output_file_path = "output_info.txt"
            with open(output_file_path, "w") as output_file:
                output_file.write(f"Output files: {output_files}\n")

            # 返回包含文件后缀名和路径的字典。
            return output_files

        # 注意，提供的需是相对于当前目录的相对路径。使用绝对路径会出现错误。
        output_files = generate_output_files_dict(rel_path)
        
        # 生成输出参数。
        op_out = OPIO({
        "output_files": output_files,"out_info":Path("output_info.txt")})
        return op_out

In [8]:
class gmxtrjconv(OP):
    # gmxtrjconv类用于处理GROMACS分子动力学模拟生成的轨迹文件，例如去除周期性边界条件带来的跳跃现象。
    def __init__(self):
        pass

    @classmethod
    def get_input_sign(cls):
        # 定义类方法，返回输入参数签名。输入包括：xtc轨迹文件。
        return OPIOSign({"in_xtc":Artifact(Path)})

    @classmethod
    def get_output_sign(cls):
        # 定义类方法，返回输出参数签名。输出包括：处理后的xtc轨迹文件。
        return OPIOSign({"out_xtc":Artifact(Path)})

    @OP.exec_sign_check
    def execute(self, op_in: OPIO) -> OPIO:
        # 根据输入参数，执行gmx trjconv命令处理轨迹文件。
        
        # 从输入参数中提取信息。
        xtc_path = op_in["in_xtc"]
              
        # 获取相对路径。
        cwd = Path.cwd()
        rel_path = xtc_path.relative_to(cwd)
        process  = str(xtc_path.parent / xtc_path.stem)
        
        # 构造gmx trjconv命令。
        cmd = "gmx trjconv -f " + process + ".xtc" + " -pbc nojump -o " + process + "_out.xtc"
        # 执行gmx trjconv命令。
        subprocess.call(cmd, shell=True)

        # 生成输出参数。
        op_out = OPIO({
        "out_xtc": Path(process + "_out.xtc")})
        return op_out

In [9]:
class MSD(OP):
    # MSD类用于计算分子动力学模拟轨迹中粒子的均方位移（Mean Squared Displacement，MSD）。
    def __init__(self):
        pass

    @classmethod
    def get_input_sign(cls):
        # 定义类方法，返回输入参数签名。输入包括：tpr文件、xtc轨迹文件、所选原子的选择表达式、计算起始帧和终止帧。
        return OPIOSign({"tpr": Artifact(Path),"xtc": Artifact(Path),"atoms":str,"start":int,"stop":int})

    @classmethod
    def get_output_sign(cls):
        # 定义类方法，返回输出参数签名。输出包括：包含MSD结果的npy文件。
        return OPIOSign({"MSD_npy": Artifact(Path)})

    @OP.exec_sign_check
    def execute(self, op_in: OPIO) -> OPIO:
        # 根据输入参数，使用MDAnalysis库计算MSD。

        # 导入所需库。
        import MDAnalysis as mda
        import MDAnalysis.analysis.msd as msd
        import numpy as np
        
        # 从输入参数中提取信息，并创建Universe对象。
        u = mda.Universe(str(op_in["tpr"]),str(op_in["xtc"]))
        atom_select = op_in["atoms"]

        # 计算MSD。
        MSD = msd.EinsteinMSD(u, select=atom_select, msd_type='xyz', fft=True)
        MSD.run(start=op_in["start"],stop=op_in["stop"])
        msd_result =  MSD.results.timeseries

        # 保存MSD结果为npy文件。
        np.save("MSD.npy",msd_result)
                        
        # 生成输出参数。
        op_out = OPIO({
        "MSD_npy":  Path("MSD.npy")})
        return op_out

In [10]:
class EC(OP):
    # EC类用于计算离子电导率（电解质的电导率）。
    def __init__(self):
        pass

    @classmethod
    def get_input_sign(cls):
        # 定义类方法，返回输入参数签名。输入包括：包含MSD数据的npy文件、时间步长(ns)、温度(K)和离子电荷数。
        return OPIOSign({"npy": Artifact(Path),"timestep":float,"temperature":float,"charge":int})

    @classmethod
    def get_output_sign(cls):
        # 定义类方法，返回输出参数签名。输出包括：离子电导率值。
        return OPIOSign({"EC":float})

    @OP.exec_sign_check
    def execute(self, op_in: OPIO) -> OPIO:
        # 根据输入参数，计算离子电导率。

        # 导入所需库。
        import numpy as np
        from scipy.stats import linregress
        
        # 从输入参数中提取信息，并读取MSD数据。
        MSD = np.load(str(op_in["npy"]))
        temperature = op_in["temperature"]
        charge = op_in['charge']
        
        # 使用线性回归计算MSD与时间的关系。
        linear_model = linregress(np.arange(len(MSD))*op_in["timestep"],MSD)
        slope = linear_model.slope
        error = linear_model.rvalue
        
        # 计算扩散系数。
        diffusion_coefficient = slope * 1/6
        # 计算F*F/RT常数。
        FF_RT = (9.6485*10**4)**2/8.314/temperature  # F:法拉第常数，R:气体常数，T:绝对温度

        # 计算离子电导率。S/m
        conductivity = (charge**2)*(diffusion_coefficient/100*10**(-9))*FF_RT*100
                            
        # 生成输出参数。
        op_out = OPIO({
        "EC":  float(conductivity)})
        return op_out

## workflow

In [13]:
# Step1：执行grompp-mini，获取能量最小化所需的tpr文件。
step1 = Step(
    "grompp-mini",
    PythonOPTemplate(grompp,image="registry.dp.tech/dptech/prod-14962/gromacs-dflow-mda:gromacs-dflow-mda"),
    parameters={"addition_cmd": ""}, 
    artifacts={"mdp": upload_artifact("mini.mdp"),
               "gro": upload_artifact("step3_input.gro"),
               "topol": upload_artifact(["topol.top","toppar"])},
    executor=dispatcher_executor,
)

# Step2：执行模拟mini过程。
step2 = Step(
    "mini",
    PythonOPTemplate(mdrun,image="registry.dp.tech/dptech/prod-14962/gromacs-dflow-mda:gromacs-dflow-mda"),
    parameters={"addition_cmd": ""}, 
    artifacts={"mdrun_file": step1.outputs.artifacts["tpr"]},
    executor=dispatcher_executor,
)

# Step3：执行grompp-npt，获取NPT模拟所需的tpr文件。
step3 = Step(
    "grompp-npt",
    PythonOPTemplate(grompp,image="registry.dp.tech/dptech/prod-14962/gromacs-dflow-mda:gromacs-dflow-mda"),
    parameters={"addition_cmd": "-maxwarn 1"}, 
    artifacts={"mdp": upload_artifact("npt.mdp"),
               "gro": step2.outputs.artifacts["output_files"]["gro"],
               "topol": upload_artifact(["topol.top","toppar"])},
    executor=dispatcher_executor,
)

# Step4：执行NPT模拟。
step4 = Step(
    "npt",
    PythonOPTemplate(mdrun,image="registry.dp.tech/dptech/prod-14962/gromacs-dflow-mda:gromacs-dflow-mda"),
    parameters={"addition_cmd": ""}, 
    artifacts={"mdrun_file":  step3.outputs.artifacts["tpr"]},
    executor=dispatcher_executor,
)

# Step5：执行trjconv操作，去除周期性边界条件带来的跳跃现象。
step5 = Step(
    "trjconv",
    PythonOPTemplate(gmxtrjconv,image="registry.dp.tech/dptech/prod-14962/gromacs-dflow-mda:gromacs-dflow-mda"),
    artifacts={"in_xtc": step4.outputs.artifacts["output_files"]["xtc"]},
    executor=dispatcher_executor,
)

# Step6：计算平均平方位移（MSD）。
step6 = Step(
    "msd",
    PythonOPTemplate(MSD,image="registry.dp.tech/dptech/prod-14962/gromacs-dflow-mda:gromacs-dflow-mda"),
    parameters={"atoms": "name CLA","start":0,"stop":200},
    artifacts={"tpr": step3.outputs.artifacts["tpr"],"xtc": step5.outputs.artifacts["out_xtc"]},
)

# Step7：计算离子电导率。
step7 = Step(
    "ec",
    PythonOPTemplate(EC,image="registry.dp.tech/dptech/prod-14962/gromacs-dflow-mda:gromacs-dflow-mda"),
    parameters={"timestep": 0.004,"temperature":300,"charge":1},
    artifacts={"npy": step6.outputs.artifacts["MSD_npy"]},
)

In [14]:
wf = Workflow(name="md")
wf.add(step1)
wf.add(step2)
wf.add(step3)
wf.add(step4)
wf.add(step5)
wf.add(step6)
wf.add(step7)
wf.submit();

Workflow has been submitted (ID: md-g5zrv, UID: 74b691ea-7312-4b99-ad17-2abf2c83f5d5)
