In [None]:
import os
from pathlib import Path
import shutil
from multiprocessing import Pool, cpu_count
from openbabel import openbabel as ob
import numpy as np
from mofdiff.common.constants import COVALENT_RADII

def has_no_overlapping_atoms(cif_path, threshold=0.85):
    """
    Check whether the structure in a CIF file is free of overlapping atoms.

    Two atoms are considered overlapping when the distance between them is
    smaller than ``threshold`` times the smaller of their two covalent radii
    (taken from ``COVALENT_RADII``). Pairs are only compared within the same
    fragment returned by ``mol.Separate()``, using the Cartesian coordinates
    reported by Open Babel; periodic images are not considered here.

    :param cif_path: Path of the CIF file to inspect.
    :param threshold: Scale factor applied to the smaller covalent radius of
        a pair to obtain the overlap distance. Defaults to 0.85.
    :return: True when no overlapping pair is found; False when an overlap
        is found or when the file cannot be read.
    """
    conversion = ob.OBConversion()
    conversion.SetInFormat("cif")
    mol = ob.OBMol()

    if not conversion.ReadFile(mol, str(cif_path)):
        print(f"Failed to read {cif_path} file.")
        return False

    fragments = mol.Separate()

    for frag in fragments:
        frag_mol = ob.OBMol(frag)
        # (position, covalent radius) of the atoms already visited in this
        # fragment; each new atom is compared against all of them.
        visited = []

        for atom in ob.OBMolAtomIter(frag_mol):
            # Look the element up through the atomic number. GetType() returns
            # Open Babel atom-type labels (e.g. "Car" for aromatic carbon),
            # which are not always an element symbol plus digits; such atoms
            # were previously missing from the radius table and were silently
            # excluded from the overlap check.
            symbol = ob.GetSymbol(atom.GetAtomicNum())
            try:
                radius = COVALENT_RADII[symbol]
            except KeyError:
                # No reference radius for this element: it cannot take part
                # in any comparison, so skip it.
                continue

            pos = np.array([atom.GetX(), atom.GetY(), atom.GetZ()])

            for other_pos, other_radius in visited:
                min_threshold = min(radius, other_radius)
                if np.linalg.norm(pos - other_pos) < threshold * min_threshold:
                    return False

            visited.append((pos, radius))

    return True

def process_cif_file(args):
    """Copy a single CIF file to the target directory if it has no overlapping atoms.

    :param args: ``(cif_path, target_dir, threshold)`` tuple. The values are
        packed into one argument so the function can be mapped over a list
        of tasks by a process pool.
    """
    cif_path, target_dir, threshold = args

    # Guard clause: files that fail the overlap check are only reported.
    if not has_no_overlapping_atoms(cif_path, threshold):
        print(f"Skipped: {cif_path}")
        return

    target_path = target_dir / cif_path.name
    shutil.copy(cif_path, target_path)
    print(f"Copied: {cif_path} -> {target_path}")

def main(source_dir, target_dir, threshold=0.85):
    """Filter every CIF file under ``source_dir`` and copy the accepted ones.

    :param source_dir: Directory searched recursively for ``*.cif`` files.
    :param target_dir: Directory receiving the accepted files; created
        (including parents) when it does not exist.
    :param threshold: Overlap threshold forwarded to each worker task.
    """
    src_root = Path(source_dir)
    dst_root = Path(target_dir)

    # Make sure the destination exists before any worker tries to copy.
    dst_root.mkdir(parents=True, exist_ok=True)

    # One (path, destination, threshold) task per CIF file found.
    jobs = [(path, dst_root, threshold) for path in src_root.glob("**/*.cif")]

    # Spread the tasks over one worker process per CPU.
    with Pool(cpu_count()) as workers:
        workers.map(process_cif_file, jobs)

# Script entry point for the overlap filter: copy the CIF files without
# overlapping atoms from the source directory into the target directory.
if __name__ == "__main__":
    source_directory = "/data/user2/wty/HOF/MOFDiff/mofdiff/data/mof_models/mof_models/bwdb_hoff/temp_all"
    target_directory = "/data/user2/wty/HOF/MOFDiff/mofdiff/data/mof_models/mof_models/bwdb_hoff/temp_no_overlap"
    # Overrides the default threshold of main() (0.85).
    threshold_value = 0.9

    main(source_directory, target_directory, threshold_value)

In [None]:
import os
import shutil
import torch
import numpy as np
import re


class HydrogenBondChecker:
    """Label the atoms of a CIF file by their role in hydrogen bonds.

    For an identifier ``cif_id`` the checker reads ``<cif_id>.lis`` (a
    listing that contains a hydrogen-bond table headed by
    ``Nr Typ Res Donor`` — presumably PLATON output, confirm) and
    ``<cif_id>.cif`` from ``cifs_path``.
    """

    # Ordered clean-up substitutions applied to every table row before it is
    # split into whitespace-separated columns. The order matters: the more
    # specific "_x" patterns must run before the bare "_" replacement.
    _ROW_SUBSTITUTIONS = (
        (r'Intra', ' '),
        (r'\d\*', '1 '),
        (r'_[a-z*]', ' '),
        (r'_[0-9*]', ' '),
        (r'_', ' '),
        (r'>', ' '),
        (r'<', ' '),
    )

    # Atom label (letters, digits, optional upper-case suffix) at the end of
    # a donor / hydrogen / acceptor column, e.g. "N1" inside "--N1".
    _ATOM_LABEL = r'[A-Za-z]+\d+[A-Z]*$'

    def __init__(self, cifs_path, verbose=False):
        """
        :param cifs_path: Directory holding the ``.lis`` and ``.cif`` files.
        :param verbose: When True, print the identifier and the full content
            of every ``.lis`` file that is parsed (debugging aid).
        """
        self.cifs_path = cifs_path
        self.verbose = verbose

    def get_Hbond_lists(self, cif_id):
        """Parse the hydrogen-bond table of ``<cif_id>.lis``.

        Rows containing ``?`` and rows whose donor label starts with ``C``
        are ignored. Rows with fewer than five columns are skipped instead
        of raising ``IndexError``.

        :param cif_id: File name stem of the ``.lis`` file.
        :return: ``(donors, hs, acceptors)`` lists of atom labels; all three
            are empty when the file or the table is missing.
        """
        donors, hs, acceptors = [], [], []
        lis_path = os.path.join(self.cifs_path, f"{cif_id}.lis")
        if not os.path.exists(lis_path):
            print(f"No LIS file found for CIF ID {cif_id}.")
            return donors, hs, acceptors

        with open(lis_path, 'r') as file:
            content = file.read()
        if self.verbose:
            print("cid", cif_id)
            print("content:", content)

        # The table runs from its header up to the next line that starts
        # with an upper-case letter.
        data_block_match = re.search(
            r"(Nr Typ Res Donor.*?)(?=\n[A-Z])", content, re.DOTALL | re.MULTILINE
        )
        if not data_block_match:
            return donors, hs, acceptors

        for line in data_block_match.group(0).splitlines():
            if "?" in line:
                continue
            for pattern, replacement in self._ROW_SUBSTITUTIONS:
                line = re.sub(pattern, replacement, line)
            columns = line.split()

            # Donor, hydrogen and acceptor live in columns 2-4, so a data row
            # needs at least five columns.
            if len(columns) < 5:
                continue
            is_data_row = (
                (columns[0].isdigit() or columns[0].startswith('**'))
                and columns[1].isdigit()
            )
            if not is_data_row:
                continue

            donor = re.search(self._ATOM_LABEL, columns[2])
            h = re.search(self._ATOM_LABEL, columns[3])
            acceptor = re.search(self._ATOM_LABEL, columns[4])
            # Rows whose donor label starts with "C" are dropped entirely.
            if donor and not donor.group().startswith('C'):
                donors.append(donor.group())
                if h:
                    hs.append(h.group())
                if acceptor:
                    acceptors.append(acceptor.group())

        return donors, hs, acceptors

    @staticmethod
    def read_cif_extract_block(file_path):
        """Return the lines that follow the ``_atom_site_occupancy`` tag.

        :param file_path: Path of the CIF file.
        :return: ``(lines, len(lines))`` with every line after the tag line
            up to the end of the file, or ``(None, 0)`` when the tag is
            absent.
        """
        with open(file_path, 'r') as file:
            content = file.read()
        start = content.find('_atom_site_occupancy')
        if start == -1:
            return None, 0
        # Drop the first element: it is the tag line itself.
        data_block = content[start:].split('\n')[1:]
        return data_block, len(data_block)

    @staticmethod
    def extract_atom_labels(data_block):
        """Collect the second whitespace-separated field of each line.

        Lines with fewer than two fields are skipped.

        :param data_block: Iterable of text lines of the atom-site table.
        :return: List of atom labels in line order.
        """
        atom_labels = []
        for line in data_block:
            parts = line.strip().split()
            if len(parts) < 2:
                continue
            atom_labels.append(parts[1])
        return atom_labels

    @staticmethod
    def classify_atoms(atom_labels, donors, hs, acceptors):
        """Map every atom label to a hydrogen-bond role code.

        Codes: 1 = donor, 2 = hydrogen, 3 = acceptor, 0 = none. When a label
        appears in several collections the first match in that order wins.

        :param atom_labels: Atom labels to classify.
        :param donors: Labels of donor atoms.
        :param hs: Labels of hydrogen atoms.
        :param acceptors: Labels of acceptor atoms.
        :return: List of integer codes, one per entry of ``atom_labels``.
        """
        # Sets give constant-time membership tests for long label lists.
        donor_set, h_set, acceptor_set = set(donors), set(hs), set(acceptors)
        atom_classification = []
        for label in atom_labels:
            if label in donor_set:
                atom_classification.append(1)
            elif label in h_set:
                atom_classification.append(2)
            elif label in acceptor_set:
                atom_classification.append(3)
            else:
                atom_classification.append(0)
        return atom_classification

    def get_Hbond(self, cif_id):
        """Build the per-atom hydrogen-bond role tensor for ``cif_id``.

        :param cif_id: File name stem shared by the ``.lis`` and ``.cif``
            files.
        :return: ``torch.LongTensor`` with one role code per atom label read
            from the CIF, or None when the CIF has no (or an empty) block
            after ``_atom_site_occupancy``.
        :raises FileNotFoundError: When ``<cif_id>.cif`` does not exist.
        """
        donors, hs, acceptors = self.get_Hbond_lists(cif_id)
        file_path = os.path.join(self.cifs_path, f"{cif_id}.cif")
        data_block, _ = self.read_cif_extract_block(file_path)
        if not data_block:
            return None
        atom_labels = self.extract_atom_labels(data_block)
        atom_classification = self.classify_atoms(atom_labels, donors, hs, acceptors)
        return torch.LongTensor(np.array(atom_classification, dtype=np.int8))


def find_and_copy_cif_files(src_folder, dest_folder):
    """Copy the CIF files that contain at least one hydrogen-bond donor.

    Every ``<id>.lis`` file in ``src_folder`` is paired with ``<id>.cif``.
    The CIF is copied to ``dest_folder`` when its role tensor contains the
    donor code 1.

    :param src_folder: Directory holding the ``.lis`` and ``.cif`` files.
    :param dest_folder: Directory receiving the selected CIF files; created
        when it does not exist.
    """
    os.makedirs(dest_folder, exist_ok=True)

    checker = HydrogenBondChecker(src_folder)
    for file in os.listdir(src_folder):
        if not file.endswith(".lis"):
            continue
        cif_id = os.path.splitext(file)[0]

        # Check for the CIF before asking the checker: get_Hbond opens the
        # CIF file itself, so a missing file would otherwise abort the whole
        # run before this message could be printed.
        cif_path = os.path.join(src_folder, f"{cif_id}.cif")
        if not os.path.exists(cif_path):
            print(f"No CIF file found for LIS ID {cif_id}.")
            continue

        hbond_tensor = checker.get_Hbond(cif_id)
        # Code 1 marks a donor atom, i.e. a hydrogen bond is present.
        if hbond_tensor is not None and torch.any(hbond_tensor == 1):
            shutil.copy(cif_path, dest_folder)
            print(f"Copied {cif_id}.cif to {dest_folder}")


# Example usage: keep only the overlap-free CIF files that contain hydrogen bonds.
src_folder = "/data/user2/wty/HOF/MOFDiff/mofdiff/data/mof_models/mof_models/bwdb_hoff/temp_no_overlap"  # replace with the actual source folder path
dest_folder = "/data/user2/wty/HOF/MOFDiff/mofdiff/data/mof_models/mof_models/bwdb_hoff/temp_hbond"  # replace with the actual destination folder path

find_and_copy_cif_files(src_folder, dest_folder)


In [None]:
import os
import shutil
from pathlib import Path
from mofchecker import MOFChecker

def check_mof_validity(cif_path):
    """
    Decide whether a structure passes all descriptor checks.

    The structure is valid only when every flag listed below is falsy in the
    descriptors returned by ``MOFChecker``.

    Args:
        cif_path (str): Path of the CIF file to check.
    Returns:
        bool: True when the structure is valid; False when any flag is set
        or when the check raises an exception.
    """
    # Flags that must all be falsy, evaluated in this order.
    rejecting_flags = (
        "has_atomic_overlaps",
        "has_lone_molecule",
        "has_overcoordinated_c",
        "has_overcoordinated_n",
        "has_overcoordinated_h",
        "has_undercoordinated_c",
        "has_undercoordinated_n",
        "has_metal",
    )
    try:
        descriptors = MOFChecker.from_cif(cif_path).get_mof_descriptors()
        return not any(descriptors[flag] for flag in rejecting_flags)
    except Exception as e:
        # Any failure while loading or analysing counts as invalid.
        print(f"Error checking MOF {cif_path}: {e}")
        return False

def process_mofs(input_dir, output_dir):
    """
    Check every CIF file of a folder and copy the valid ones.

    Args:
        input_dir (str): Folder containing the CIF files (not searched
            recursively).
        output_dir (str): Folder receiving the valid CIF files; created
            (including parents) when it does not exist.
    """
    source = Path(input_dir)
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    cif_files = [*source.glob("*.cif")]
    print(f"Found {len(cif_files)} CIF files to process.")

    valid_count = 0
    for cif_file in cif_files:
        print(f"Checking {cif_file}...")
        # Invalid files are skipped without any message.
        if not check_mof_validity(cif_file):
            continue
        shutil.copy(cif_file, output_dir / cif_file.name)
        print(f"Copied valid CIF: {cif_file.name}")
        valid_count += 1

    print(f"Processing complete. {valid_count} valid CIF files were copied to {output_dir}.")

# Script entry point: run the sequential validity filter.
if __name__ == "__main__":
    # Replace with the actual paths.
    input_directory = "/data/user2/wty/HOF/MOFDiff/mofdiff/data/mof_models/mof_models/bwdb_hoff/test_relax"  # source folder containing the CIF files
    output_directory = "/data/user2/wty/HOF/MOFDiff/mofdiff/data/mof_models/mof_models/bwdb_hoff/hofchecker"  # destination folder for the valid CIF files

    process_mofs(input_directory, output_directory)


In [None]:
import os
import shutil
from pathlib import Path
from mofchecker import MOFChecker
from concurrent.futures import ProcessPoolExecutor
from tqdm import tqdm


def check_mof_validity(cif_path):
    """
    Check a structure for atomic overlaps and metal atoms.

    Args:
        cif_path (str): Path of the CIF file to check.
    Returns:
        tuple: ``(cif_path, is_valid)``. ``is_valid`` is True only when the
        descriptors report neither atomic overlaps nor a metal; it is False
        when the check raises an exception.
    """
    is_valid = False
    try:
        descriptors = MOFChecker.from_cif(cif_path).get_mof_descriptors()
        # Valid means: no overlapping atoms and no metal.
        is_valid = not (descriptors["has_atomic_overlaps"] or descriptors["has_metal"])
    except Exception as e:
        print(f"Error checking HOF {cif_path}: {e}")
    return cif_path, is_valid


def process_mofs(input_dir, output_dir, n_workers=4):
    """
    Check every CIF file of a folder in parallel and copy the valid ones.

    Args:
        input_dir (str): Folder containing the CIF files (not searched
            recursively).
        output_dir (str): Folder receiving the valid CIF files; created
            (including parents) when it does not exist.
        n_workers (int): Number of worker processes used for the checks.
    """
    input_dir = Path(input_dir)
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    cif_files = list(input_dir.glob("*.cif"))
    print(f"Found {len(cif_files)} CIF files to process.")

    # Run the checks in worker processes; tqdm wraps the result stream so a
    # progress bar advances as results arrive.
    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        outcomes = executor.map(check_mof_validity, cif_files)
        progress = tqdm(outcomes, total=len(cif_files), desc="Processing CIF files")
        results = list(progress)

    # Copying happens in the parent process once all checks are done.
    valid_count = 0
    for cif_path, is_valid in results:
        if not is_valid:
            continue
        shutil.copy(cif_path, output_dir / cif_path.name)
        valid_count += 1

    print(f"Processing complete. {valid_count} valid CIF files were copied to {output_dir}.")


# Script entry point: run the parallel validity filter.
if __name__ == "__main__":
    # Replace with the actual paths.
    input_directory = "/data/user2/wty/HOF/MOFDiff/mofdiff/data/mof_models/mof_models/bwdb_hoff/test_relax"  # source folder containing the CIF files
    output_directory = "/data/user2/wty/HOF/MOFDiff/mofdiff/data/mof_models/mof_models/bwdb_hoff/hofchecker"  # destination folder for the valid CIF files

    # Adjust n_workers to control the number of parallel worker processes.
    # NOTE(review): 100 workers may exceed the available CPU cores — confirm.
    process_mofs(input_directory, output_directory, n_workers=100)
