# 安装环境

In [3]:
# !pip install torchtext
!pip install pandas
!pip install scikit-optimize
!git clone https://github.com/xzxg001/AI-chemistry.git

fatal: destination path 'AI-chemistry' already exists and is not an empty directory.


# 对数据处理进行初步尝试

In [4]:
import pandas as pd
from torch.utils.data import Dataset, DataLoader, Subset
from typing import List, Tuple
import re
import torch
import torch.nn as nn
import time
import torch.optim as optim

In [5]:
# tokenizer，鉴于SMILES的特性，这里需要自己定义tokenizer和vocab
# 这里直接将smiles str按字符拆分，并替换为词汇表中的序号
class Smiles_tokenizer():
    def __init__(self, pad_token, regex, vocab_file, max_length):
        self.pad_token = pad_token
        self.regex = regex
        self.vocab_file = vocab_file
        self.max_length = max_length

        with open(self.vocab_file, "r") as f:
            lines = f.readlines()
        lines = [line.strip("\n") for line in lines]
        vocab_dic = {}
        for index, token in enumerate(lines):
            vocab_dic[token] = index
        self.vocab_dic = vocab_dic

    def _regex_match(self, smiles):
        regex_string = r"(" + self.regex + r"|"
        regex_string += r".)"
        prog = re.compile(regex_string)

        tokenised = []
        for smi in smiles:
            tokens = prog.findall(smi)
            if len(tokens) > self.max_length:
                tokens = tokens[:self.max_length]
            tokenised.append(tokens) # 返回一个所有的字符串列表
        return tokenised

    def tokenize(self, smiles):
        tokens = self._regex_match(smiles)
        # 添加上表示开始和结束的token：<cls>, <end>
        tokens = [["<CLS>"] + token + ["<SEP>"] for token in tokens]
        tokens = self._pad_seqs(tokens, self.pad_token)
        token_idx = self._pad_token_to_idx(tokens)
        return tokens, token_idx

    def _pad_seqs(self, seqs, pad_token):
        pad_length = max([len(seq) for seq in seqs])
        padded = [seq + ([pad_token] * (pad_length - len(seq))) for seq in seqs]
        return padded

    def _pad_token_to_idx(self, tokens):
        idx_list = []
        new_vocab = []
        for token in tokens:
            tokens_idx = []
            for i in token:
                if i in self.vocab_dic.keys():
                    tokens_idx.append(self.vocab_dic[i])
                else:
                    new_vocab.append(i)
                    self.vocab_dic[i] = max(self.vocab_dic.values()) + 1
                    tokens_idx.append(self.vocab_dic[i])
            idx_list.append(tokens_idx)

        with open("../new_vocab_list.txt", "a") as f:
            for i in new_vocab:
                f.write(i)
                f.write("\n")

        return idx_list

    def _save_vocab(self, vocab_path):
        with open(vocab_path, "w") as f:
            for i in self.vocab_dic.keys():
                f.write(i)
                f.write("\n")
        print("update new vocab!")

In [6]:
# 处理数据

def read_data(file_path, train=True):
    df = pd.read_csv(file_path)
    reactant1 = df["Reactant1"].tolist()
    reactant2 = df["Reactant2"].tolist()
    product = df["Product"].tolist()
    additive = df["Additive"].tolist()
    solvent = df["Solvent"].tolist()
    if train:
        react_yield = df["Yield"].tolist()
    else:
        react_yield = [0 for i in range(len(reactant1))]

    # 将reactant\additive\solvent拼到一起，之间用.分开。product也拼到一起，用>>分开
    input_data_list = []
    for react1, react2, prod, addi, sol in zip(reactant1, reactant2, product, additive, solvent):
        # input_info = ".".join([react1, react2, addi, sol])
        input_info = ".".join([react1, react2])
        input_info = ">".join([input_info, prod])
        input_data_list.append(input_info)
    output = [(react, y) for react, y in zip(input_data_list, react_yield)]

    return output


In [7]:
# 定义数据集
class ReactionDataset(Dataset):
    def __init__(self, data: List[Tuple[List[str], float]]):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

def collate_fn(batch):
    REGEX = r"\[[^\]]+]|Br?|Cl?|N|O|S|P|F|I|b|c|n|o|s|p|\(|\)|\.|=|#|-|\+|\\\\|\/|:|~|@|\?|>|\*|\$|\%[0-9]{2}|[0-9]"
    tokenizer = Smiles_tokenizer("<PAD>", REGEX, "/content/AI-chemistry/mp/vocab_full.txt", 300)
    smi_list = []
    yield_list = []
    for i in batch:
        smi_list.append(i[0])
        yield_list.append(i[1])
    tokenizer_batch = torch.tensor(tokenizer.tokenize(smi_list)[1])
    yield_list = torch.tensor(yield_list)
    return tokenizer_batch, yield_list


In [8]:
# 模型
'''
直接采用一个transformer encoder model就好了
'''
class TransformerEncoderModel(nn.Module):
    def __init__(self, input_dim, d_model, num_heads, fnn_dim, num_layers, dropout):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, d_model)
        self.layerNorm = nn.LayerNorm(d_model)
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=d_model,
                                                        nhead=num_heads,
                                                        dim_feedforward=fnn_dim,
                                                        dropout=dropout,
                                                        batch_first=True,
                                                        norm_first=True # pre-layernorm
                                                        )
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer,
                                                         num_layers=num_layers,
                                                         norm=self.layerNorm)
        self.dropout = nn.Dropout(dropout)
        self.lc = nn.Sequential(nn.Linear(d_model, 256),
                                nn.Sigmoid(),
                                nn.Linear(256, 96),
                                nn.Sigmoid(),
                                nn.Linear(96, 1))

    def forward(self, src):
        # src shape: [batch_size, src_len]
        embedded = self.dropout(self.embedding(src))
        # embedded shape: [batch_size, src_len, d_model]
        outputs = self.transformer_encoder(embedded)
        # outputs shape: [batch_size, src_len, d_model]

        # fisrt
        z = outputs[:,0,:]
        # z = torch.sum(outputs, dim=1)
        # print(z)
        # z shape: [bs, d_model]
        outputs = self.lc(z)
        # print(outputs)
        # outputs shape: [bs, 1]
        return outputs.squeeze(-1)

In [9]:
def adjust_learning_rate(optimizer, epoch, start_lr):
    """Sets the learning rate to the initial LR decayed by 10 every 30 epochs"""
    lr = start_lr * (0.1 ** (epoch // 3))
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr


In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from sklearn.preprocessing import StandardScaler
from skopt import gp_minimize
from skopt.space import Real, Integer
from skopt.utils import use_named_args

# 假设 read_data, ReactionDataset, collate_fn, TransformerEncoderModel 等函数和类已经定义

def prepare_data(data_path, batch_size, test_size=0.1, valid_size=0.1, scaler=None):
    data = read_data(data_path)
    dataset = ReactionDataset(data)

    # 应用归一化
    if scaler:
        dataset.data = scaler.fit_transform(dataset.data)

    test_size = int(len(dataset) * test_size)
    valid_size = int(len(dataset) * valid_size)
    train_size = len(dataset) - test_size - valid_size

    train_dataset, valid_test_dataset = random_split(dataset, [train_size, len(dataset) - train_size])
    valid_dataset, test_dataset = random_split(valid_test_dataset, [valid_size, test_size])

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
    valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

    return train_loader, valid_loader, test_loader

class TransformerEncoderModel(nn.Module):
    def __init__(self, input_dim, d_model, num_heads, fnn_dim, num_layers, dropout):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, d_model)
        self.layerNorm = nn.LayerNorm(d_model)
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=d_model,
                                                        nhead=num_heads,
                                                        dim_feedforward=fnn_dim,
                                                        dropout=dropout,
                                                        batch_first=True,
                                                        norm_first=True)
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers, norm=self.layerNorm)
        self.dropout = nn.Dropout(dropout)
        self.lc = nn.Sequential(nn.Linear(d_model, 256),
                                nn.Sigmoid(),
                                nn.Linear(256, 96),
                                nn.Sigmoid(),
                                nn.Linear(96, 1))

    def forward(self, src):
        embedded = self.dropout(self.embedding(src))
        outputs = self.transformer_encoder(embedded)
        z = outputs[:,0,:]
        outputs = self.lc(z)
        return outputs.squeeze(-1)

def train(**params):
    INPUT_DIM = 292
    D_MODEL = int(params['D_MODEL'])
    NUM_HEADS = int(params['NUM_HEADS'])
    FNN_DIM = int(params['FNN_DIM'])
    NUM_LAYERS = int(params['NUM_LAYERS'])
    DROPOUT = params['DROPOUT']
    LR = params['LR']
    CLIP = 1
    N_EPOCHS = 40
    model_file = "/content/AI-chemistry/mp/model/transformer.pth"

    train_loader, valid_loader, _ = prepare_data(
        "/content/AI-chemistry/mp/dataset/round1_train_data.csv",
        batch_size=128,
        scaler=StandardScaler()
    )

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = TransformerEncoderModel(INPUT_DIM, D_MODEL, NUM_HEADS, FNN_DIM, NUM_LAYERS, DROPOUT).to(device)
    model.train()

    optimizer = optim.AdamW(model.parameters(), lr=LR, weight_decay=0.01)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10)
    criterion = nn.MSELoss()

    best_valid_loss = float('inf')
    for epoch in range(N_EPOCHS):
        epoch_loss = 0
        for i, (src, y) in enumerate(train_loader):
            src, y = src.to(device), y.to(device)
            optimizer.zero_grad()
            output = model(src)
            loss = criterion(output, y)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), CLIP)
            optimizer.step()
            epoch_loss += loss.detach().item()

            if i % 50 == 0:
                print(f'Step: {i} | Train Loss: {epoch_loss:.4f}')

        scheduler.step(epoch_loss / len(train_loader))
        print(f'Epoch: {epoch+1:02} | Train Loss: {epoch_loss / len(train_loader):.3f}')

        valid_loss = 0
        model.eval()
        with torch.no_grad():
            for src, y in valid_loader:
                src, y = src.to(device), y.to(device)
                output = model(src)
                valid_loss += criterion(output, y).item()

        valid_loss /= len(valid_loader)
        print(f'Epoch: {epoch+1:02} | Valid Loss: {valid_loss:.3f}')
        model.train()

        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            torch.save(model.state_dict(), model_file)

    return {'loss': best_valid_loss, 'status': 'ok'}

# 定义超参数搜索空间
search_spaces = {
    'D_MODEL': Integer(128, 512),
    'NUM_HEADS': Integer(2, 8),
    'FNN_DIM': Integer(256, 2048),
    'NUM_LAYERS': Integer(2, 6),
    'DROPOUT': Real(0.1, 0.5),
    'LR': Real(1e-5, 1e-1, prior='log-uniform'),
}

# 使用贝叶斯优化
@use_named_args(search_spaces)
def objective(D_MODEL, NUM_HEADS, FNN_DIM, NUM_LAYERS, DROPOUT, LR):
    return train(D_MODEL=D_MODEL, NUM_HEADS=NUM_HEADS, FNN_DIM=FNN_DIM, NUM_LAYERS=NUM_LAYERS, DROPOUT=DROPOUT, LR=LR)

res = gp_minimize(objective, search_spaces, n_calls=50, random_state=0)

print("最佳超参数: ", res.x)

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from sklearn.metrics import r2_score

def test(model, test_loader, device):
    model.eval()
    total_mse_loss = 0
    total_mae_loss = 0
    total_r2 = 0
    total_count = 0
    all_predictions = []
    all_targets = []

    with torch.no_grad():
        for src, y in test_loader:
            src, y = src.to(device), y.to(device)
            output = model(src)
            mse_loss = nn.MSELoss()(output, y)
            mae_loss = nn.L1Loss()(output, y)
            r2 = r2_score(y.cpu().numpy(), output.cpu().numpy())  # 计算 R-squared
            total_mse_loss += mse_loss.item() * len(y)
            total_mae_loss += mae_loss.item() * len(y)
            total_r2 += r2 * len(y)
            total_count += len(y)

            # 保存所有预测和目标值用于整体 R-squared 计算
            all_predictions.extend(output.cpu().numpy().flatten())
            all_targets.extend(y.cpu().numpy().flatten())

    avg_mse_loss = total_mse_loss / total_count if total_count > 0 else 0
    avg_mae_loss = total_mae_loss / total_count if total_count > 0 else 0
    avg_r2 = total_r2 / total_count if total_count > 0 else 0

    print(f'Test MSE Loss: {avg_mse_loss:.3f}')
    print(f'Test MAE Loss: {avg_mae_loss:.3f}')
    print(f'Test R-squared: {avg_r2:.3f}')

    return avg_mse_loss, avg_mae_loss, avg_r2

# 加载模型并测试
def load_and_test(model_file, test_loader, INPUT_DIM, D_MODEL, NUM_HEADS, FNN_DIM, NUM_LAYERS, DROPOUT):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = TransformerEncoderModel(INPUT_DIM, D_MODEL, NUM_HEADS, FNN_DIM, NUM_LAYERS, DROPOUT).to(device)
    if os.path.exists(model_file):
        model.load_state_dict(torch.load(model_file, map_location=device))
        model.eval()
        print(f"Model loaded from {model_file}")
        return test(model, test_loader, device)
    else:
        print("No existing model found. Please train the model first.")
        return None

# 调用测试函数
# 确保在调用之前已经定义了 model_file, test_loader, INPUT_DIM, D_MODEL, NUM_HEADS, FNN_DIM, NUM_LAYERS, DROPOUT
mse_loss, mae_loss, r2_score = load_and_test(model_file, test_loader, INPUT_DIM, D_MODEL, NUM_HEADS, FNN_DIM, NUM_LAYERS, DROPOUT)

  model.load_state_dict(torch.load(model_file, map_location=device))


Model loaded from /content/AI-chemistry/mp/model/transformer.pth
Test MSE Loss: 0.048
Test MAE Loss: 0.180
Test R-squared: 0.168
