代码用途：用VAE模型从分子数据生成符合类药特性的分子（含5维度评估+结构图展示）
核心流程：加载数据→预处理→训练模型→生成分子→评估质量→展示结构
    适用环境：PyCharm Jupyter Notebook（必须）+ GPU/CPU
    关键注意事项：
    1. 运行后会自动生成3个文件：临时生成结果、最终有效分子、评估报告
    2. 第一次运行会训练模型，后续运行会加载已保存的模型

#   1、初始化环境+部分函数构建

In [10]:
# 一、核心库导入与基础函数定义（修复 PyTorch 优化器导入）
import selfies as sf
import pandas as pd
import numpy as np
from tqdm import tqdm
from collections import defaultdict
from functools import lru_cache
from multiprocessing.pool import ThreadPool
from multiprocessing import Value
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
# 修复：替换 from torch.optim import optim → import torch.optim as optim
import torch.optim as optim
from rdkit import Chem
from rdkit.Chem import Descriptors, Lipinski, Draw
from rdkit.Chem import AllChem
from rdkit.Chem.rdForceFieldHelpers import UFFOptimizeMolecule
import rdkit
import os
import pickle
from IPython.display import display  # Jupyter展示图片必需

# 二、核心工具函数定义（数据处理+性质计算）
### 1. SA分数计算函数
def calculate_SA_score(mol):
    if mol is None:
        return 10.0
    ring_info = mol.GetRingInfo()
    num_rings = ring_info.NumRings()
    num_heavy_atoms = mol.GetNumHeavyAtoms()
    num_bonds = mol.GetNumBonds()
    sa_score = 1.0 + 0.15 * num_rings + 0.05 * num_heavy_atoms - 0.02 * num_bonds
    return min(max(sa_score, 1.0), 10.0)

### 2. 超价原子约束函数
def set_universal_bond_constraints():
    custom_constraints = sf.get_semantic_constraints()
    supervalent_constraints = {
        "S": 4, "P": 5, "Cl": 3, "Br": 5, "I": 6,
        "Se": 4, "Te": 6, "As": 5, "Si": 6, "B": 4
    }
    custom_constraints.update(supervalent_constraints)
    sf.set_semantic_constraints(custom_constraints)
set_universal_bond_constraints()

### 3. 数据过滤函数（有效性基础检查）
def filter_unreasonable_smiles(smiles):
    if not isinstance(smiles, str) or len(smiles) < 3 or len(smiles) > 100:
        return False
    mol = Chem.MolFromSmiles(smiles)
    if not mol:
        return False
    atom_count = len(mol.GetAtoms())
    if atom_count < 3 or atom_count > 50:
        return False
    total_charge = sum(atom.GetFormalCharge() for atom in mol.GetAtoms())
    if abs(total_charge) > 2:
        return False
    return True

### 4. 化学性质计算缓存函数
counter = Value('i', 0)
@lru_cache(maxsize=30000)
def calculate_chemical_properties_cached(smiles):
    global counter
    with counter.get_lock():
        counter.value += 1
        if counter.value % 100000 == 0:
            print(f"已预处理 {counter.value} 个分子...")
    mol = Chem.MolFromSmiles(smiles)
    if not mol:
        return None
    mw = Descriptors.MolWt(mol)
    logp = Descriptors.MolLogP(mol)
    h_donors = Lipinski.NumHDonors(mol)
    h_acceptors = Lipinski.NumHAcceptors(mol)
    sa_score = calculate_SA_score(mol)
    energy = 300
    try:
        mol_h = Chem.AddHs(mol)
        if AllChem.EmbedMolecule(mol_h, maxAttempts=100) == 0:
            mmff_props = AllChem.MMFFGetMoleculeProperties(mol_h)
            if mmff_props:
                ff = AllChem.MMFFGetMoleculeForceField(mol_h, mmff_props)
                ff.Minimize(maxIts=200)
                energy = ff.CalcEnergy()
    except:
        energy = 500
    try:
        selfies = sf.encoder(smiles, strict=False)
        if not selfies:
            return None
    except:
        return None
    return (smiles, selfies, [mw, logp, h_donors, h_acceptors, sa_score, energy])

### 5. 批量数据处理函数
def batch_process_smiles(smiles_list, n_workers=12):
    print(f"使用 {n_workers} 个线程处理 {len(smiles_list)} 条数据...")
    with ThreadPool(processes=n_workers) as pool:
        processed_results = list(tqdm(
            pool.imap_unordered(calculate_chemical_properties_cached, smiles_list),
            total=len(smiles_list),
            desc="预处理SMILES→SELFIES+性质计算",
            mininterval=0.5
        ))
    processed_data = []
    for res in processed_results:
        if res is not None:
            processed_data.append({"smiles": res[0], "selfies": res[1], "properties": res[2]})
    print(f"预处理完成：{len(processed_data)}/{len(smiles_list)} 条数据有效")
    return processed_data


#   2、超参数设定与模型构建

In [11]:
# 三、超参数配置（固定不变）
os.environ["RDKit_SILENT"] = "1"  # 屏蔽RDKit的冗余日志
rdkit.RDLogger.DisableLog('rdApp.*')  # 屏蔽RDKit的警告
TRAIN_SIZE = 100000  # 训练集数据量
VAL_SIZE = 50000  # 验证集数据量
BATCH_SIZE = 32  # 每次喂给模型的样本数（固定32，防内存溢出）
MAX_SELFIES_LEN = 100  # 分子序列最大长度（超过会截断）
FINGERPRINT_NBITS = 256  # 分子指纹维度（评估多样性用，不用改）
FINGERPRINT_RADIUS = 1  # 分子指纹半径（固定1）
N_WORKERS = 12  # 处理数据的线程数（12线程提速）
CACHE_SIZE = 30000  # 性质计算缓存容量（3万条）
PREPROCESS_SAVE_PATH = "preprocess_results.pkl"  # 预处理结果保存路径（下次不用重复算）
model_path = "best_mol_vae.pth"  # 最优模型保存路径
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # 自动选GPU（快）/CPU（慢）


# 3、数据预处理与加载

In [12]:
## 1. 加载/保存预处理结果
print("="*50)
print("加载预处理结果（避免重复计算）...")
print("="*50)
if os.path.exists(PREPROCESS_SAVE_PATH):
    with open(PREPROCESS_SAVE_PATH, 'rb') as f:
        (train_processed, val_processed, train_smiles_raw, val_smiles_raw) = pickle.load(f)
    print(f" 成功加载本地预处理结果！")
    print(f"训练集有效数据：{len(train_processed)} 条")
    print(f"验证集有效数据：{len(val_processed)} 条")
else:
    print("本地无预处理缓存，执行第一次预处理...")
    df = pd.read_csv("dataset3/dataset3.csv")
    df.rename(columns={"0": "smiles", "1": "label"}, inplace=True)
    original_smiles = df["smiles"].tolist()
    np.random.seed(42)
    np.random.shuffle(original_smiles)
    train_smiles_raw = original_smiles[:TRAIN_SIZE]
    val_smiles_raw = original_smiles[TRAIN_SIZE:TRAIN_SIZE+VAL_SIZE]

    print("\n预处理训练集...")
    counter.value = 0
    train_processed = batch_process_smiles(train_smiles_raw)
    print("\n预处理验证集...")
    counter.value = 0
    val_processed = batch_process_smiles(val_smiles_raw)

    with open(PREPROCESS_SAVE_PATH, 'wb') as f:
        pickle.dump((train_processed, val_processed, train_smiles_raw, val_smiles_raw), f)
    print(f"\n 预处理完成并保存到本地！下次运行无需重复计算")

# 提取核心数据
train_selfies = [d["selfies"] for d in train_processed]
train_properties = [d["properties"] for d in train_processed]
train_smiles_valid = [d["smiles"] for d in train_processed]
val_selfies = [d["selfies"] for d in val_processed]
val_properties = [d["properties"] for d in val_processed]
val_smiles_valid = [d["smiles"] for d in val_processed]
print(f"\n有效训练数据：{len(train_selfies)} 条")
print(f"有效验证数据：{len(val_selfies)} 条")

### 2. 词汇表构建
def build_selfies_vocab(selfies_list):
    vocab = defaultdict(int)
    special_tokens = ["<PAD>", "<SOS>", "<EOS>"]
    for token in special_tokens:
        vocab[token] = len(vocab)
    for s in tqdm(selfies_list, desc="构建SELFIES词汇表"):
        for char in sf.split_selfies(s):
            if char not in vocab:
                vocab[char] = len(vocab)
    print(f"词汇表大小：{len(vocab)}")
    return vocab

vocab = build_selfies_vocab(train_selfies)
vocab_size = len(vocab)

### 3. SELFIES转序列工具（外部独立函数）
def selfies_to_sequence(selfies, vocab, max_len=MAX_SELFIES_LEN):
    sequence = [vocab["<SOS>"]]
    for char in sf.split_selfies(selfies):
        if char in vocab:
            sequence.append(vocab[char])
    sequence.append(vocab["<EOS>"])
    if len(sequence) < max_len:
        sequence += [vocab["<PAD>"]] * (max_len - len(sequence))
    else:
        sequence = sequence[:max_len]
    return torch.tensor(sequence, dtype=torch.long)

### 4. 数据集类定义（修复 __getitem__ 方法）
class SELFIESDataset(Dataset):
    def __init__(self, selfies_list, properties_list, vocab):
        self.selfies_list = selfies_list
        self.properties_list = properties_list
        self.vocab = vocab
        self.props_mean = torch.tensor(np.mean(properties_list, axis=0), dtype=torch.float32)
        self.props_std = torch.tensor(np.std(properties_list, axis=0) + 1e-6, dtype=torch.float32)

    def __len__(self):
        return len(self.selfies_list)

    def __getitem__(self, idx):
        selfies = self.selfies_list[idx]
        # 修复：去掉 self. ，直接调用外部函数 selfies_to_sequence
        sequence = selfies_to_sequence(selfies, self.vocab)
        input_seq = sequence[:-1]
        target_seq = sequence[1:]
        props = torch.tensor(self.properties_list[idx], dtype=torch.float32)
        props_norm = (props - self.props_mean) / self.props_std
        return input_seq, target_seq, props_norm

### 5. DataLoader构建（避免卡住）
# 构建数据集
train_dataset = SELFIESDataset(train_selfies, train_properties, vocab)
val_dataset = SELFIESDataset(val_selfies, val_properties, vocab)

# 加载指纹（若已保存）
try:
    train_fingerprints = torch.load("train_fingerprints_50k.pt")
    print(f"\n✅ 指纹加载完成：形状={train_fingerprints.shape}")
except:
    print("\n⚠️  未找到指纹文件，跳过指纹加载")

# 修复DataLoader（单进程）
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=0,
    drop_last=False
)
val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=0,
    drop_last=False
)
print(f"训练集批次数量：{len(train_loader)}")
print(f"验证集批次数量：{len(val_loader)}")

### 6. 批次加载测试
print("\n" + "="*50)
print("测试加载第一个批次...")
print("="*50)
for batch in train_loader:
    input_seq, target_seq, props_norm = batch
    print(f"✅ 批次加载成功！")
    print(f"  - 输入序列形状：{input_seq.shape}")
    print(f"  - 目标序列形状：{target_seq.shape}")
    print(f"  - 性质标签形状：{props_norm.shape}")
    break

# 输出关键信息
print("\n" + "="*50)
print("Baseline数据准备完成！关键信息：")
print(f"- 词汇表大小：{vocab_size}")
print(f"- 训练集批次：{len(train_loader)}（batch_size={BATCH_SIZE}）")
print(f"- 分子性质维度：{len(train_properties[0])}")
print(f"- 分子指纹维度：{FINGERPRINT_NBITS}")
print("="*50)


加载预处理结果（避免重复计算）...
本地无预处理缓存，执行第一次预处理...

预处理训练集...
使用 12 个线程处理 100000 条数据...


预处理SMILES→SELFIES+性质计算:   0%|          | 486/100000 [00:08<23:28, 70.63it/s][16:16:57] UFFTYPER: Unrecognized atom type: Se2+2 (20)
预处理SMILES→SELFIES+性质计算:   4%|▍         | 3789/100000 [01:12<30:38, 52.34it/s]


KeyboardInterrupt: 

# 4、VAE模型定义
由编码器+解码器组成，负责“学习分子规律+生成新分子”

In [None]:
### 1. 编码器
class VAEEncoder(nn.Module):
    def __init__(self, vocab_size, embedding_dim=128, hidden_dim=256, latent_dim=64, max_len=MAX_SELFIES_LEN):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=vocab["<PAD>"])
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.fc_mu = nn.Linear(hidden_dim * 2, latent_dim)
        self.fc_logvar = nn.Linear(hidden_dim * 2, latent_dim)

    def forward(self, x):
        embed = self.embedding(x)
        lstm_out, (hidden, _) = self.lstm(embed)
        hidden = torch.cat([hidden[0], hidden[1]], dim=1)
        mu = self.fc_mu(hidden)
        logvar = self.fc_logvar(hidden)
        return mu, logvar

### 2. 解码器
class VAEDecoder(nn.Module):
    def __init__(self, vocab_size, embedding_dim=128, hidden_dim=256, latent_dim=64, max_len=MAX_SELFIES_LEN):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=vocab["<PAD>"])
        self.lstm = nn.LSTM(embedding_dim + latent_dim, hidden_dim, batch_first=True)
        self.fc_out = nn.Linear(hidden_dim, vocab_size)
        self.max_len = max_len
        self.vocab_size = vocab_size
        self.SOS_idx = vocab["<SOS>"]
        self.EOS_idx = vocab["<EOS>"]
        self.PAD_idx = vocab["<PAD>"]

    def forward(self, z, target_seq=None):
        if target_seq is not None:
            batch_size = target_seq.shape[0]
            embed = self.embedding(target_seq[:, :-1])
            z_repeat = z.unsqueeze(1).repeat(1, embed.shape[1], 1)
            lstm_in = torch.cat([embed, z_repeat], dim=-1)
            lstm_out, _ = self.lstm(lstm_in)
            logits = self.fc_out(lstm_out)
            return logits

        batch_size = z.shape[0]
        generated_seq = torch.full((batch_size, 1), self.SOS_idx, dtype=torch.long, device=z.device)
        hidden = None
        for _ in range(self.max_len - 1):
            embed = self.embedding(generated_seq)
            z_repeat = z.unsqueeze(1).repeat(1, embed.shape[1], 1)
            lstm_in = torch.cat([embed, z_repeat], dim=-1)
            lstm_out, hidden = self.lstm(lstm_in, hidden)
            logits = self.fc_out(lstm_out[:, -1:, :])
            next_token = torch.argmax(logits, dim=-1)
            generated_seq = torch.cat([generated_seq, next_token], dim=1)
            if (next_token == self.EOS_idx).all():
                break
        return generated_seq

### 3. 完整VAE模型（含生成方法）
# 兜底词汇表（防止未定义）
if 'vocab' not in locals():
    vocab = defaultdict(int, {"<PAD>": 0, "<SOS>": 1, "<EOS>": 2})
    print(" 兜底构建基础词汇表，建议使用预处理生成的完整 vocab！")

class MolVAE(nn.Module):
    def __init__(self, vocab_size, embedding_dim=128, hidden_dim=256, latent_dim=64, max_seq_len=99):
        super(MolVAE, self).__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.latent_dim = latent_dim
        self.max_seq_len = max_seq_len
        self.vocab = vocab

        # 嵌入层
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=vocab["<PAD>"])
        # 编码器（双向LSTM）
        self.encoder_lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=2,
            bidirectional=True,
            batch_first=True,
            dropout=0.1
        )
        self.fc_mu = nn.Linear(2 * hidden_dim, latent_dim)
        self.fc_logvar = nn.Linear(2 * hidden_dim, latent_dim)
        # 解码器（单向LSTM）
        self.fc_latent = nn.Linear(latent_dim, hidden_dim)
        self.decoder_lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=2,
            bidirectional=False,
            batch_first=True,
            dropout=0.1
        )
        self.fc_out = nn.Linear(hidden_dim, vocab_size)

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def encode(self, x):
        embed = self.embedding(x)
        lstm_out, (hidden, _) = self.encoder_lstm(embed)
        hidden = torch.cat([hidden[-2], hidden[-1]], dim=1)
        mu = self.fc_mu(hidden)
        logvar = self.fc_logvar(hidden)
        return mu, logvar

    def decode(self, z):
        batch_size = z.shape[0]
        hidden = self.fc_latent(z)
        hidden = torch.stack([hidden, hidden], dim=0)
        cell = torch.zeros_like(hidden)
        sos_token = torch.tensor([self.vocab["<SOS>"]] * batch_size, device=z.device).unsqueeze(1)
        current_embed = self.embedding(sos_token)
        outputs = []
        for _ in range(self.max_seq_len):
            lstm_out, (hidden, cell) = self.decoder_lstm(current_embed, (hidden, cell))
            logit = self.fc_out(lstm_out)
            outputs.append(logit)
            next_token = logit.argmax(dim=-1)
            current_embed = self.embedding(next_token)
        logits = torch.cat(outputs, dim=1)
        return logits

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        logits = self.decode(z)
        return logits, mu, logvar

    def generate(self, num_samples, device="cuda"):
        self.eval()
        with torch.no_grad():
            z = torch.randn(num_samples, self.latent_dim, device=device)
            logits = self.decode(z)
            generated_seqs = logits.argmax(dim=-1)
        return generated_seqs


# 5、模型训练与加载

In [None]:
# 检查必要组件
if 'train_loader' not in locals() or 'val_loader' not in locals():
    raise ValueError("请先运行「预处理+DataLoader构建」的代码，确保 train_loader/val_loader 已定义！")

# 加载或训练模型
if os.path.exists(model_path):
    print(f" 加载已保存的最优模型：{model_path}")
    vae_model = MolVAE(vocab_size=len(vocab), max_seq_len=99)
    vae_model.load_state_dict(torch.load(model_path, map_location=device))
    vae_model.to(device)
    trained_model = vae_model
    print("模型加载完成，可直接生成分子！")
else:
    print(f" 未找到保存的模型，开始训练...")
    def train_vae(model, train_loader, val_loader, epochs=2, lr=1e-3, device="cuda"):
        model.to(device)
        criterion = nn.CrossEntropyLoss(ignore_index=vocab["<PAD>"])
        # 优化器调用适配修复后的导入（无需修改）
        optimizer = optim.Adam(model.parameters(), lr=lr)
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=3, factor=0.5)
        best_val_loss = float("inf")
        early_stop_patience = 5
        early_stop_counter = 0

        for epoch in range(epochs):
            # 训练阶段
            model.train()
            train_loss = 0.0
            for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs} (Train)"):
                input_seq, target_seq, _ = batch
                input_seq, target_seq = input_seq.to(device), target_seq.to(device)
                optimizer.zero_grad()
                logits, mu, logvar = model(input_seq)
                assert logits.shape[1] == target_seq.shape[1], f"长度不匹配：{logits.shape[1]} vs {target_seq.shape[1]}"
                recon_loss = criterion(logits.reshape(-1, logits.shape[-1]), target_seq.reshape(-1))
                kl_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp()) / input_seq.shape[0]
                total_loss = recon_loss + 0.01 * kl_loss
                total_loss.backward()
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                optimizer.step()
                train_loss += total_loss.item()
            train_loss /= len(train_loader)

            # 验证阶段
            model.eval()
            val_loss = 0.0
            with torch.no_grad():
                for batch in tqdm(val_loader, desc=f"Epoch {epoch+1}/{epochs} (Val)"):
                    input_seq, target_seq, _ = batch
                    input_seq, target_seq = input_seq.to(device), target_seq.to(device)
                    logits, mu, logvar = model(input_seq)
                    recon_loss = criterion(logits.reshape(-1, logits.shape[-1]), target_seq.reshape(-1))
                    kl_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp()) / input_seq.shape[0]
                    total_loss = recon_loss + 0.01 * kl_loss
                    val_loss += total_loss.item()
            val_loss /= len(val_loader)
            scheduler.step(val_loss)

            # 日志输出
            print(f"\nEpoch {epoch+1} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")

            # 早停和模型保存
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                early_stop_counter = 0
                torch.save(model.state_dict(), model_path)
                print(f"  → 保存最优模型（Val Loss: {best_val_loss:.4f}）")
            else:
                early_stop_counter += 1
                print(f"  → 早停计数器：{early_stop_counter}/{early_stop_patience}")
                if early_stop_counter >= early_stop_patience:
                    print("  → 早停触发，训练结束！")
                    break

        # 加载最优模型
        model.load_state_dict(torch.load(model_path, map_location=device))
        return model

    # 启动训练
    vae_model = MolVAE(vocab_size=len(vocab), max_seq_len=99)
    trained_model = train_vae(vae_model, train_loader, val_loader, device=device)
    print(f" 训练完成！最优模型已保存为 {model_path}")


# 6、分子生成函数（去重以及生成变量供后续评估使用）

In [None]:
def generate_molecules(model, num_samples=100, device="cuda"):
    model.eval()
    generated_smiles_before_dedup = []  # 去重前的结果（用于唯一性计算）
    generated_smiles_after_dedup = []
    idx_to_char = {v: k for k, v in vocab.items()}

    with torch.no_grad():
        batch_size = 64
        for i in range(0, num_samples, batch_size):
            curr_batch_size = min(batch_size, num_samples - i)
            generated_seqs = model.generate(curr_batch_size, device)

            for seq in generated_seqs.cpu().numpy():
                valid_idx = (seq != vocab["<SOS>"]) & (seq != vocab["<EOS>"]) & (seq != vocab["<PAD>"])
                seq = seq[valid_idx]
                if len(seq) == 0 or len(seq) > 100:
                    continue
                try:
                    selfies_chars = [idx_to_char[idx] for idx in seq]
                    selfies = "".join(selfies_chars)
                    smiles = sf.decoder(selfies)
                    if smiles and filter_unreasonable_smiles(smiles):
                        generated_smiles_before_dedup.append(smiles)  # 去重前保存
                except:
                    continue

    # 去重
    generated_smiles_after_dedup = list(set(generated_smiles_before_dedup))
    print(f"生成分子完成：")
    print(f"  - 目标生成数量：{num_samples}")
    print(f"  - 去重前有效分子：{len(generated_smiles_before_dedup)} 条")
    print(f"  - 去重后有效分子：{len(generated_smiles_after_dedup)} 条")
    print(f"  - 初始有效率：{len(generated_smiles_before_dedup)/num_samples*100:.1f}%")

    # 保存结果
    temp_df = pd.DataFrame({"Generated_SMILES": generated_smiles_after_dedup})
    temp_df.to_csv("temp_generated_smiles.csv", index=False)
    print(f"  - 生成结果已保存至：temp_generated_smiles.csv")

    # 返回去重前后结果（关键：给后续评估提供变量）
    return generated_smiles_before_dedup, generated_smiles_after_dedup

# 执行生成（必须先运行这一步，才能生成评估所需的变量）
generated_smiles_before_dedup, generated_smiles_after_dedup = generate_molecules(trained_model, num_samples=1000, device=device)


# 7、分子评估函数（合理性+新颖性+多样性+唯一性+有效性）

In [None]:
### 1. 指纹计算函数
def calculate_fingerprint(smiles):
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return None
    sparse_fp = AllChem.GetHashedMorganFingerprint(
        mol, radius=FINGERPRINT_RADIUS, nBits=FINGERPRINT_NBITS
    )
    dense_fp = np.zeros(FINGERPRINT_NBITS, dtype=np.int32)
    for idx, count in sparse_fp.GetNonzeroElements().items():
        dense_fp[idx] = count
    return dense_fp

### 2. 化学合理性评估（有效性核心）
def evaluate_chemical_validity(smiles_list):
    valid_count = 0
    valid_smiles = []
    for smiles in tqdm(smiles_list, desc="评估化学合理性"):
        mol = Chem.MolFromSmiles(smiles)
        if mol is None:
            continue
        # 验证无超价原子
        has_supervalent = False
        atom_constraints = {
            "S": 4, "P": 5, "Cl": 3, "Br": 5, "I": 6,
            "Se": 4, "Te": 6, "As": 5, "Si": 6, "B": 4
        }
        for atom in mol.GetAtoms():
            symbol = atom.GetSymbol()
            valence = atom.GetExplicitValence()
            if symbol in atom_constraints and valence > atom_constraints[symbol]:
                has_supervalent = True
                break
        if not has_supervalent:
            valid_count += 1
            valid_smiles.append(smiles)
    validity = valid_count / len(smiles_list) * 100 if len(smiles_list) > 0 else 0.0
    print(f"\n1. 化学合理性：{validity:.1f}%（{valid_count}/{len(smiles_list)} 个有效）")
    return validity, valid_smiles

### 3. 新颖性评估
def evaluate_novelty(generated_smiles, train_smiles, val_smiles, threshold=0.8):
    all_known_smiles = train_smiles + val_smiles
    known_fps = []
    for smiles in tqdm(all_known_smiles[:10000], desc="计算已知分子指纹（抽样1万条加速）"):
        fp = calculate_fingerprint(smiles)
        if fp is not None:
            known_fps.append(fp)
    known_fps = np.array(known_fps)
    if len(known_fps) == 0:
        print("警告：已知分子无有效指纹，新颖性默认为100%")
        return 100.0, generated_smiles

    novel_count = 0
    novel_smiles = []
    for smiles in tqdm(generated_smiles, desc="评估新颖性"):
        fp = calculate_fingerprint(smiles)
        if fp is None:
            continue
        max_similarity = 0.0
        for known_fp in known_fps:
            intersection = np.sum(np.logical_and(fp, known_fp))
            union = np.sum(np.logical_or(fp, known_fp))
            similarity = intersection / union if union != 0 else 0.0
            if similarity > max_similarity:
                max_similarity = similarity
                if max_similarity >= threshold:
                    break
        if max_similarity < threshold:
            novel_count += 1
            novel_smiles.append(smiles)

    novelty = novel_count / len(generated_smiles) * 100 if len(generated_smiles) > 0 else 0.0
    print(f"2. 新颖性：{novelty:.1f}%（{novel_count}/{len(generated_smiles)} 个新颖）")
    return novelty, novel_smiles

### 4. 多样性评估
def evaluate_diversity(generated_smiles, threshold=0.7):
    generated_fps = []
    for smiles in tqdm(generated_smiles, desc="计算生成分子指纹"):
        fp = calculate_fingerprint(smiles)
        if fp is not None:
            generated_fps.append(fp)
    generated_fps = np.array(generated_fps)

    if len(generated_fps) < 2:
        print("3. 多样性：0.0%（生成分子数量不足或无有效指纹）")
        return 0.0

    similarities = []
    n = len(generated_fps)
    for i in range(n):
        for j in range(i+1, n):
            intersection = np.sum(np.logical_and(generated_fps[i], generated_fps[j]))
            union = np.sum(np.logical_or(generated_fps[i], generated_fps[j]))
            sim = intersection / union if union != 0 else 0.0
            similarities.append(sim)

    avg_similarity = np.mean(similarities)
    diversity = (1 - avg_similarity) * 100
    print(f"3. 多样性：{diversity:.1f}%（平均相似度：{avg_similarity:.3f}）")
    return diversity
### 唯一性评判（新增）
def evaluate_uniqueness(generated_smiles_before_dedup, generated_smiles_after_dedup):
    total_valid_before_dedup = len(generated_smiles_before_dedup)
    total_unique_after_dedup = len(generated_smiles_after_dedup)

    if total_valid_before_dedup == 0:
        uniqueness = 0.0
    else:
        uniqueness = (total_unique_after_dedup / total_valid_before_dedup) * 100

    print(f"4. 唯一性：{uniqueness:.1f}%")
    print(f"  - 去重前有效分子数：{total_valid_before_dedup}")
    print(f"  - 去重后唯一分子数：{total_unique_after_dedup}")
    print(f"  - 重复分子数：{total_valid_before_dedup - total_unique_after_dedup}")
    return uniqueness

### 有效性补充评估（Lipinski规则符合性，新增：修复非法字符）
def evaluate_lipinski_compliance(smiles_list):
    """评估分子是否符合Lipinski四规则（类药性质有效性）"""
    compliant_count = 0
    compliant_smiles = []
    for smiles in tqdm(smiles_list, desc="评估Lipinski规则符合性"):
        mol = Chem.MolFromSmiles(smiles)
        if not mol:
            continue
        # Lipinski四规则
        mw = Descriptors.MolWt(mol)
        logp = Descriptors.MolLogP(mol)
        h_donors = Lipinski.NumHDonors(mol)
        h_acceptors = Lipinski.NumHAcceptors(mol)

        # 满足≥3条规则即视为符合（修复：≤→<=，≥→>=）
        rules_met = 0
        if mw < 500: rules_met += 1
        if logp < 5: rules_met += 1
        if h_donors <= 5: rules_met += 1  # 已修复：≤→<=
        if h_acceptors <= 10: rules_met += 1  # 已修复：≤→<=

        if rules_met >= 3:  # 已修复：≥→>=
            compliant_count += 1
            compliant_smiles.append(smiles)

    compliance_rate = compliant_count / len(smiles_list) * 100 if len(smiles_list) > 0 else 0.0
    print(f"5. Lipinski规则符合性（类药有效性）：{compliance_rate:.1f}%（{compliant_count}/{len(smiles_list)} 个符合）")
    return compliance_rate, compliant_smiles


# 8、执行完整评估

In [None]:
# 加载生成分子（双重保险：从文件加载，避免变量丢失）
temp_df = pd.read_csv("temp_generated_smiles.csv")
generated_smiles = temp_df["Generated_SMILES"].tolist()
print(f"加载生成分子：共 {len(generated_smiles)} 条（去重后）")

# 确保训练/验证集有效分子已加载
try:
    print(f"训练集有效分子：{len(train_smiles_valid)} 条 | 验证集有效分子：{len(val_smiles_valid)} 条")
except NameError:
    df = pd.read_csv("dataset3/dataset3.csv")
    df.rename(columns={"0": "smiles", "1": "label"}, inplace=True)
    np.random.seed(42)
    original_smiles = df["smiles"].tolist()
    np.random.shuffle(original_smiles)

    train_smiles_raw = original_smiles[:TRAIN_SIZE]
    val_smiles_raw = original_smiles[TRAIN_SIZE:TRAIN_SIZE+VAL_SIZE]
    train_smiles_valid = [s for s in train_smiles_raw if filter_unreasonable_smiles(s)]
    val_smiles_valid = [s for s in val_smiles_raw if filter_unreasonable_smiles(s)]
    print(f"重新加载有效分子：训练集 {len(train_smiles_valid)} 条 | 验证集 {len(val_smiles_valid)} 条")

# 执行评估（此时 generated_smiles_before_dedup 已在第七部分定义）
print("\n" + "="*60)
print("开始五维度评估...")
print("="*60)

# 1. 化学合理性（基础有效性）
validity, valid_generated = evaluate_chemical_validity(generated_smiles)

# 2. 新颖性
novelty, novel_generated = evaluate_novelty(valid_generated, train_smiles_valid, val_smiles_valid)

# 3. 多样性
diversity = evaluate_diversity(novel_generated)

# 4. 唯一性（新增：变量已定义）
uniqueness = evaluate_uniqueness(generated_smiles_before_dedup, generated_smiles_after_dedup)

# 5. Lipinski规则符合性（类药有效性，新增）
lipinski_compliance, lipinski_compliant_smiles = evaluate_lipinski_compliance(novel_generated)

# 输出最终报告
print("\n" + "="*60)
print("生成分子五维度评估报告")
print("="*60)
print(f"1. 化学合理性（基础有效性）：{validity:.1f}%（合格线≥60%，理想线≥75%）")
print(f"2. 新颖性：{novelty:.1f}%（合格线≥50%，理想线≥70%）")
print(f"3. 多样性：{diversity:.1f}%（合格线≥40%，理想线≥60%）")
print(f"4. 唯一性：{uniqueness:.1f}%（合格线≥80%，理想线≥95%）")
print(f"5. Lipinski规则符合性（类药有效性）：{lipinski_compliance:.1f}%（合格线≥60%，理想线≥80%）")
print("="*60)

# 保存最终有效+新颖+类药分子
final_df = pd.DataFrame({
    "Generated_SMILES": lipinski_compliant_smiles,
    "Evaluation_Result": "Valid_Novel_Lipinski_Compliant"
})
final_df.to_csv("final_generated_molecules.csv", index=False)
print(f"\n评估完成！最终有效分子保存为 final_generated_molecules.csv（共 {len(lipinski_compliant_smiles)} 条）")


# 9、分子绘制（PyCharm Jupyter Notebook分子结构图展示）

In [None]:
def display_molecule_structures(smiles_list, num_display=5):
    """
    在PyCharm Jupyter Notebook中展示分子结构图
    :param smiles_list: 有效SMILES列表
    :param num_display: 展示的分子数量
    """
    # 筛选有效SMILES（防止绘制失败）
    valid_smiles_for_display = []
    for smiles in smiles_list[:num_display]:
        mol = Chem.MolFromSmiles(smiles)
        if mol:
            valid_smiles_for_display.append(smiles)

    if not valid_smiles_for_display:
        print("无有效分子可展示")
        return

    # 绘制分子结构（支持批量展示）
    mols = [Chem.MolFromSmiles(smiles) for smiles in valid_smiles_for_display]
    img = Draw.MolsToGridImage(
        mols,
        molsPerRow=2,  # 每行展示2个
        subImgSize=(300, 300),  # 每个分子图大小
        legends=valid_smiles_for_display  # 显示SMILES作为图例
    )

    # 在Jupyter中显示图片
    display(img)
    print(f"\n展示 {len(valid_smiles_for_display)} 个生成分子的结构图（共筛选前 {num_display} 个有效分子）")

# 执行展示（需在PyCharm Jupyter Notebook中运行）
print("\n" + "="*60)
print("PyCharm Jupyter Notebook分子结构图展示")
print("="*60)
display_molecule_structures(lipinski_compliant_smiles, num_display=5)