In [1]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.optim.lr_scheduler import CosineAnnealingLR
from torch.utils.data import DataLoader, TensorDataset
import warnings
warnings.filterwarnings("ignore")


# ---------------------- 1. 回归神经网络定义（与文档S12一致，复用PLD网络结构） ----------------------
class PropertyRegressionNet(nn.Module):
    def __init__(self):
        super(PropertyRegressionNet, self).__init__()
        # 特征提取器：1维（SSA数值）→16维特征（文档S12要求维度）
        self.feature_extractor = nn.Sequential(
            nn.Linear(in_features=1, out_features=16),
            nn.LayerNorm(normalized_shape=16),  # 稳定训练，避免梯度消失
            nn.ReLU()  # 引入非线性，捕捉SSA排序的复杂关系
        )
        # 回归头：16维→1维（适配[0,1]排序标签）
        self.regressor = nn.Sequential(
            nn.Linear(in_features=16, out_features=1),
            nn.Sigmoid()
        )
    
    def forward(self, x):
        feature = self.feature_extractor(x)  # [batch_size, 16]，SSA的16维初步表征
        output = self.regressor(feature)     # [batch_size, 1]，SSA排序标签预测值
        return output, feature


# ---------------------- 2. 数据预处理（SSA专项适配：m²/g单位+数据清洗） ----------------------
def preprocess_ssa_data(excel_path, ssa_col_name="SSA", ssa_unit="m²/g"):
    """
    输入：
        excel_path: SSA数据的Excel文件路径
        ssa_col_name: Excel中SSA列的标题（默认"SSA"）
        ssa_unit: SSA单位（固定为m²/g，文档S11标准）
    输出：训练/验证/测试集DataLoader、全量SSA输入Tensor（用于提取表征）
    """
    # 步骤1：读取Excel文件（支持.xlsx格式，需安装openpyxl）
    try:
        df = pd.read_excel(excel_path, engine="openpyxl")
    except FileNotFoundError:
        raise ValueError(f"Excel文件未找到，请检查路径：{excel_path}")
    except Exception as e:
        raise RuntimeError(f"Excel读取失败：{str(e)}（建议检查文件是否损坏或格式是否为.xlsx）")
    
    # 步骤2：提取SSA列并校验列名
    if ssa_col_name not in df.columns:
        raise ValueError(f"Excel中未找到'{ssa_col_name}'列，请确认列标题与数据匹配（例：'SSA(m²/g)'需对应修改参数）")
    raw_ssa = df[ssa_col_name].values  # 原始SSA数据（单位：m²/g）
    initial_samples = len(raw_ssa)
    print(f"读取Excel完成：初始样本数={initial_samples}，SSA单位={ssa_unit}")

    # 步骤3：SSA专项数据清洗（严格遵循文档S11要求）
    # 3.1 剔除空值（NaN）
    raw_ssa = raw_ssa[~np.isnan(raw_ssa)]
    # 3.2 剔除0值（文档S11：SSA=0的样本无效，无实际比表面积）
    raw_ssa = raw_ssa[raw_ssa != 0]
    # 3.3 剔除异常值（参考CoRE MOF数据库：SSA通常在10~6000 m²/g，超出视为异常）
    raw_ssa = raw_ssa[(raw_ssa >= 10) & (raw_ssa <= 6000)]
    valid_samples = len(raw_ssa)

    # 校验有效样本数
    if valid_samples == 0:
        raise ValueError(f"清洗后无有效SSA样本！请检查Excel数据：\n- 单位是否为{ssa_unit}\n- 数值是否在10~6000{ssa_unit}范围内（正常MOF的SSA范围）")
    print(f"数据清洗完成：有效样本数={valid_samples}（剔除空值/0值/异常值）")

    # 步骤4：生成排序标签（文档S12核心：学习SSA的相对排序而非绝对数值）
    # 排名从1开始，归一化到[0,1]（适配Sigmoid输出范围）
    rank = np.argsort(np.argsort(raw_ssa)) + 1  # 排序后获取每个样本的排名（1~valid_samples）
    rank_norm = (rank - 1) / (valid_samples - 1)  # 归一化公式，确保标签在[0,1]内

    # 步骤5：转换为Tensor并扩展维度（模型输入需为[样本数, 1]）
    ssa_input = torch.tensor(raw_ssa, dtype=torch.float32).unsqueeze(1)  # [valid_samples, 1]
    ssa_label = torch.tensor(rank_norm, dtype=torch.float32).unsqueeze(1)  # [valid_samples, 1]

    # 步骤6：划分训练/验证/测试集（7:2:1，分层抽样确保数据分布一致）
    train_size = int(0.7 * valid_samples)
    val_size = int(0.2 * valid_samples)
    test_size = valid_samples - train_size - val_size

    train_x, val_x, test_x = torch.split(ssa_input, [train_size, val_size, test_size])
    train_y, val_y, test_y = torch.split(ssa_label, [train_size, val_size, test_size])

    # 步骤7：构建DataLoader（文档S12指定batch_size=64）
    train_loader = DataLoader(TensorDataset(train_x, train_y), batch_size=64, shuffle=True)  # 训练集打乱
    val_loader = DataLoader(TensorDataset(val_x, val_y), batch_size=64, shuffle=False)     # 验证集不打乱
    test_loader = DataLoader(TensorDataset(test_x, test_y), batch_size=64, shuffle=False)   # 测试集不打乱

    print(f"数据集划分：训练集={train_size} | 验证集={val_size} | 测试集={test_size}")
    return train_loader, val_loader, test_loader, ssa_input


# ---------------------- 3. 模型训练（参数严格匹配文档S12，SSA与PLD共用训练逻辑） ----------------------
def train_ssa_model(train_loader, val_loader, device, save_model_path):
    # 初始化模型、优化器、损失函数（文档S12参数）
    model = PropertyRegressionNet().to(device)
    criterion = nn.MSELoss()  # 文档指定MSE损失（回归任务，适配排序标签）
    optimizer = Adam(model.parameters(), lr=1e-3)  # 初始学习率1e-3
    scheduler = CosineAnnealingLR(optimizer, T_max=100)  # 余弦退火，100轮训练（S12要求）

    best_val_loss = float('inf')  # 保存验证损失最小的模型（避免过拟合）
    print(f"\n开始SSA模型训练（共100轮，设备={device}，SSA单位=m²/g）")

    # 训练循环
    for epoch in range(100):
        # 训练阶段
        model.train()
        train_loss = 0.0
        for batch_x, batch_y in train_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            optimizer.zero_grad()  # 清零梯度
            pred_y, _ = model(batch_x)  # 仅用预测值计算损失
            loss = criterion(pred_y, batch_y)
            loss.backward()  # 反向传播
            optimizer.step()  # 更新参数
            train_loss += loss.item() * batch_x.size(0)  # 累计批次损失
        avg_train_loss = train_loss / len(train_loader.dataset)  # 平均训练损失

        # 验证阶段（无梯度更新）
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for batch_x, batch_y in val_loader:
                batch_x, batch_y = batch_x.to(device), batch_y.to(device)
                pred_y, _ = model(batch_x)
                loss = criterion(pred_y, batch_y)
                val_loss += loss.item() * batch_x.size(0)
        avg_val_loss = val_loss / len(val_loader.dataset)  # 平均验证损失

        # 更新学习率（余弦退火调度）
        scheduler.step()

        # 保存最优模型（基于验证损失）
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), save_model_path)
            # 每20轮打印进度与模型保存信息
            if (epoch + 1) % 20 == 0:
                print(f"Epoch {epoch+1:3d}/100 | 训练损失：{avg_train_loss:.6f} | 验证损失：{avg_val_loss:.6f} | 保存最优模型")

    # 训练完成后加载最优模型
    model.load_state_dict(torch.load(save_model_path))
    print(f"\nSSA模型训练完成：最优验证损失={best_val_loss:.6f} | 模型保存路径={save_model_path}")
    return model


# ---------------------- 4. 提取SSA的16维初步表征（文档S12核心产出） ----------------------
def extract_ssa_16d_repr(model, full_ssa_input, device, save_repr_path):
    model.eval()  # 切换到评估模式，关闭梯度计算
    full_ssa_input = full_ssa_input.to(device)
    valid_samples = len(full_ssa_input)
    print(f"\n开始提取SSA的16维表征（单位：m²/g，样本数={valid_samples}）")

    # 分批提取（避免GPU内存不足，批次大小=64）
    batch_size = 64
    all_16d_repr = []
    with torch.no_grad():
        for i in range(0, valid_samples, batch_size):
            batch_x = full_ssa_input[i:i+batch_size]
            _, batch_16d = model(batch_x)  # 仅获取feature_extractor输出的16维特征
            all_16d_repr.append(batch_16d.cpu().numpy())  # 转CPU存储，避免GPU内存溢出

    # 合并所有批次的表征
    full_16d_repr = np.concatenate(all_16d_repr, axis=0)
    # 步骤：min-max归一化（文档S12要求，将16维特征标准化到[0,1]，适配后续PGDAE模型）
    min_vals = full_16d_repr.min(axis=0)
    max_vals = full_16d_repr.max(axis=0)
    full_16d_repr_norm = (full_16d_repr - min_vals) / (max_vals - min_vals + 1e-8)  # +1e-8避免除零

    # 保存16维表征（用于S15的PGDAE统一表征拼接）
    np.save(save_repr_path, full_16d_repr_norm)
    print(f"SSA 16维表征提取完成！\n- 表征形状：{full_16d_repr_norm.shape}（样本数×16维）\n- 保存路径：{save_repr_path}\n- 单位：m²/g（已归一化到[0,1]）")
    return full_16d_repr_norm


# ---------------------- 5. 主函数（整合全流程，一键运行SSA表征构建） ----------------------
def main_ssa(excel_path, save_model_path, save_repr_path, ssa_col_name="SSA"):
    # 步骤1：设置计算设备（优先GPU，无GPU则用CPU）
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"使用计算设备：{device}")

    # 步骤2：SSA数据预处理（Excel读取+清洗）
    train_loader, val_loader, test_loader, full_ssa_input = preprocess_ssa_data(
        excel_path=excel_path,
        ssa_col_name=ssa_col_name,
        ssa_unit="m²/g"  # 固定为文档标准单位，无需修改
    )

    # 步骤3：训练SSA回归模型
    trained_model = train_ssa_model(train_loader, val_loader, device, save_model_path)

    # 步骤4：提取并保存SSA的16维初步表征
    ssa_16d_repr = extract_ssa_16d_repr(trained_model, full_ssa_input, device, save_repr_path)

    return ssa_16d_repr


# ---------------------- 6. 运行入口（需修改为你的实际文件路径） ----------------------
if __name__ == "__main__":
    # 配置文件路径（请根据你的数据存储位置修改！）
    EXCEL_SSA_PATH = "mof_candidate_ssa.xlsx"          # 你的SSA Excel文件路径（单位：m²/g）
    SAVE_MODEL_PATH = "ssa_regression_best.pth"        # SSA模型权重保存路径
    SAVE_REPR_PATH = "ssa_16d_preliminary_repr.npy"    # SSA 16维表征保存路径
    SSA_COL_NAME = "SSA"                               # Excel中SSA列的标题（如为"比表面积(m²/g)"，需修改此参数）

    # 一键运行SSA的16维表征构建全流程
    ssa_16d_repr = main_ssa(
        excel_path=EXCEL_SSA_PATH,
        save_model_path=SAVE_MODEL_PATH,
        save_repr_path=SAVE_REPR_PATH,
        ssa_col_name=SSA_COL_NAME
    )

使用计算设备：cuda
读取Excel完成：初始样本数=11393，SSA单位=m²/g
数据清洗完成：有效样本数=11218（剔除空值/0值/异常值）
数据集划分：训练集=7852 | 验证集=2243 | 测试集=1123

开始SSA模型训练（共100轮，设备=cuda，SSA单位=m²/g）
Epoch  20/100 | 训练损失：0.000379 | 验证损失：0.000438 | 保存最优模型
Epoch  80/100 | 训练损失：0.000093 | 验证损失：0.000097 | 保存最优模型
Epoch 100/100 | 训练损失：0.000084 | 验证损失：0.000093 | 保存最优模型

SSA模型训练完成：最优验证损失=0.000093 | 模型保存路径=ssa_regression_best.pth

开始提取SSA的16维表征（单位：m²/g，样本数=11218）
SSA 16维表征提取完成！
- 表征形状：(11218, 16)（样本数×16维）
- 保存路径：ssa_16d_preliminary_repr.npy
- 单位：m²/g（已归一化到[0,1]）
