# 01_Graph_neural_network_Amphiphilic_Molecule_Analysis.ipynb
- Amphiphilic_Molecule_Analysis
- A study of solubility prediction for amphiphilic molecules based on molecular descriptors and graph neural networks

# 1. 安装必要的库

In [None]:
# Install TDC, RDKit and the PyTorch Geometric packages; the -f URL is an extra
# wheel index for PyG builds matching torch 1.13.0 (CPU).
!pip install PyTDC rdkit-pypi torch-geometric torch-scatter torch-sparse -f https://data.pyg.org/whl/torch-1.13.0+cpu.html

# 2. 导入必要的库

In [10]:
# Import PyTorch and print the installed version.
import torch
print(torch.__version__)

1.13.0


# 3. 导入数据集和工具

In [11]:
from tdc.single_pred import ADME
from rdkit import Chem, RDLogger
from rdkit.Chem import AllChem, Descriptors
import numpy as np
import pandas as pd
import tqdm
import matplotlib.pyplot as plt
import seaborn as sns

ValueError: numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject

# 关闭 rdkit 日志

In [12]:
# Disable RDKit logging for every channel matching 'rdApp.*'.
RDLogger.DisableLog('rdApp.*')

NameError: name 'RDLogger' is not defined

In [13]:
# Load the solubility dataset ('Solubility_AqSolDB') through TDC's ADME loader,
# split it with the 'random' method, and print the dataset statistics.
# `split` is indexed below with the keys 'train', 'valid' and 'test'.
data = ADME(name='Solubility_AqSolDB')
split = data.get_split(method='random')
data.print_stats()

# Molecular descriptor calculation
def calculate_descriptors(smiles):
    """Return five RDKit descriptors for the molecule described by ``smiles``.

    Args:
        smiles: SMILES string of the molecule.

    Returns:
        list: ``[logp, tpsa, mol_weight, num_h_donors, num_h_acceptors]``,
        in exactly that order.

    Raises:
        ValueError: If RDKit cannot parse ``smiles``.
    """
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        # MolFromSmiles reports a parse failure by returning None; raise a
        # clear error here instead of failing inside the descriptor calls.
        raise ValueError(f"Could not parse SMILES string: {smiles!r}")
    logp = Descriptors.MolLogP(mol)
    tpsa = Descriptors.TPSA(mol)
    mol_weight = Descriptors.MolWt(mol)
    num_h_donors = Descriptors.NumHDonors(mol)
    num_h_acceptors = Descriptors.NumHAcceptors(mol)
    return [logp, tpsa, mol_weight, num_h_donors, num_h_acceptors]

# Morgan fingerprint calculation
from rdkit.Chem import rdFingerprintGenerator
# Shared generator: Morgan fingerprints with radius 2 and 1024 entries.
mfpgen = rdFingerprintGenerator.GetMorganGenerator(radius=2, fpSize=1024)


def smiles2fp(smiles):
    """Return the Morgan fingerprint of ``smiles`` as a NumPy array.

    Args:
        smiles: SMILES string of the molecule.

    Returns:
        The array produced by ``mfpgen.GetFingerprintAsNumPy`` for the parsed
        molecule (size set by ``fpSize`` on the generator).

    Raises:
        ValueError: If RDKit cannot parse ``smiles``.
    """
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        # A None molecule would otherwise fail inside the generator with an
        # unhelpful binding-level error.
        raise ValueError(f"Could not parse SMILES string: {smiles!r}")
    return mfpgen.GetFingerprintAsNumPy(mol)

# Combine the fingerprint and the descriptors into one feature vector
def smiles_to_features(smiles):
    """Build the feature vector for one molecule.

    The result is the fingerprint from ``smiles2fp`` followed by the values
    from ``calculate_descriptors``, joined into a single 1-D array.
    """
    return np.concatenate([smiles2fp(smiles), calculate_descriptors(smiles)])

# Featurize one data split (used for the train, validation and test sets)
def process_dataset(split):
    """Turn one split into a feature matrix and a target array.

    Args:
        split: Table with a 'Drug' column of SMILES strings and a 'Y' column
            of targets.

    Returns:
        tuple: ``(X, y)`` where ``X`` stacks one feature row per molecule and
        ``y`` is ``split['Y'].values``.
    """
    # tqdm wraps the SMILES column so featurization shows a progress bar.
    feature_rows = [smiles_to_features(smi) for smi in tqdm.tqdm(split['Drug'])]
    features = np.stack(feature_rows)
    targets = split['Y'].values
    return features, targets

# Featurize the three splits into (feature matrix, target array) pairs.
X_train, y_train = process_dataset(split['train'])
X_valid, y_valid = process_dataset(split['valid'])
X_test, y_test = process_dataset(split['test'])

# Show the number of features per molecule
print("Feature dimension:", X_train.shape[1])

# Standardize the descriptor columns (optional).
# The last five columns are the descriptors that smiles_to_features appends
# after the fingerprint. The scaler is fitted on the training split only and
# then applied unchanged to the validation and test splits.
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
X_train[:, -5:] = scaler.fit_transform(X_train[:, -5:])
X_valid[:, -5:] = scaler.transform(X_valid[:, -5:])
X_test[:, -5:] = scaler.transform(X_test[:, -5:])

# ---------------------------
# Random forest regression model
# ---------------------------

from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error

# 100 trees; random_state is fixed so repeated runs build the same forest.
rf_model = RandomForestRegressor(n_estimators=100, random_state=0)
rf_model.fit(X_train, y_train)

# Predict on the held-out test split.
y_pred_rf = rf_model.predict(X_test)

# Report mean squared error and mean absolute error on the test split.
mse_rf = mean_squared_error(y_test, y_pred_rf)
mae_rf = mean_absolute_error(y_test, y_pred_rf)
print(f"Random Forest Test MSE: {mse_rf:.4f}, MAE: {mae_rf:.4f}")

# Visualize the predictions: predicted vs. true values on the test split.
plt.figure(figsize=(6,6))
plt.scatter(y_test, y_pred_rf, alpha=0.6)
# Dashed red reference line y = x, drawn from -10 to 2 on both axes.
plt.plot([-10, 2], [-10, 2], 'r--')
plt.xlabel('True Solubility')
plt.ylabel('Predicted Solubility')
plt.title('Random Forest Predictions')
plt.show()

# Feature-importance analysis
importances = rf_model.feature_importances_
# Feature indices sorted from most to least important.
indices = np.argsort(importances)[::-1]

# Descriptor columns, in the order they are appended after the fingerprint.
descriptor_names = ['LogP', 'TPSA', 'MolWt', 'NumHDonors', 'NumHAcceptors']
# Every column before the descriptors is a fingerprint entry (1024 with the
# current featurizer); deriving the boundary keeps it in sync with fpSize.
num_fingerprint_bits = len(importances) - len(descriptor_names)

# Print the ten most important features (fewer if the model has fewer).
print("Top 10 feature importances:")
for rank, feature_idx in enumerate(indices[:10], start=1):
    if feature_idx < num_fingerprint_bits:
        feature_name = f"Fingerprint {feature_idx}"
    else:
        feature_name = descriptor_names[feature_idx - num_fingerprint_bits]
    print(f"{rank}. {feature_name} ({importances[feature_idx]:.4f})")

# ---------------------------
# 多层感知器（MLP）
# ---------------------------

import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim

class SolubilityDataset(Dataset):
    """Map-style dataset pairing feature rows with their targets.

    Both inputs are copied into float32 tensors when the dataset is built.
    """

    def __init__(self, X, y):
        # Convert once up front so indexing later is a plain tensor lookup.
        float32 = torch.float32
        self.features = torch.tensor(X, dtype=float32)
        self.labels = torch.tensor(y, dtype=float32)

    def __len__(self):
        # One sample per label.
        return len(self.labels)

    def __getitem__(self, idx):
        # Return the (features, label) pair stored at position ``idx``.
        sample_features = self.features[idx]
        sample_label = self.labels[idx]
        return sample_features, sample_label

# Wrap each split in a SolubilityDataset.
train_dataset = SolubilityDataset(X_train, y_train)
valid_dataset = SolubilityDataset(X_valid, y_valid)
test_dataset = SolubilityDataset(X_test, y_test)

# Batches of 32; only the training loader shuffles.
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
valid_dataloader = DataLoader(valid_dataset, batch_size=32)
test_dataloader = DataLoader(test_dataset, batch_size=32)

# MLP model definition
class SolubilityMLP(nn.Module):
    """Three-layer perceptron mapping a feature vector to one output value.

    Layer widths are ``input_dim -> 512 -> 256 -> 1`` with ReLU after the
    first two layers and dropout (p=0.2) after the first one.
    """

    def __init__(self, input_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 1)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.2)

    def forward(self, x):
        # First hidden layer, followed by dropout.
        hidden = self.dropout(self.relu(self.fc1(x)))
        # Second hidden layer (no dropout here).
        hidden = self.relu(self.fc2(hidden))
        # Linear output head; the trailing dimension of size 1 is kept.
        return self.fc3(hidden)

# Size the model from the feature matrix, then set up an MSE loss and an
# Adam optimizer with learning rate 0.001.
input_dim = X_train.shape[1]
model = SolubilityMLP(input_dim)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training and validation helpers
def train(model, dataloader, criterion, optimizer):
    """Run one training epoch and return the average loss per sample.

    Args:
        model: Module called as ``model(features)``.
        dataloader: Iterable of ``(features, labels)`` batches that exposes a
            ``dataset`` attribute supporting ``len()``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer updating the model's parameters.

    Returns:
        float: Sum of batch losses weighted by batch size, divided by the
        dataset size.
    """
    model.train()
    total_loss = 0
    for features, labels in dataloader:
        optimizer.zero_grad()
        # Drop only the trailing output dimension. A bare squeeze() would also
        # collapse a batch of size one to a 0-d tensor whose shape no longer
        # matches ``labels``.
        outputs = model(features).squeeze(-1)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        # Weight by batch size (assumes the criterion averages over the
        # batch) so a short final batch is not over-counted.
        total_loss += loss.item() * len(labels)
    return total_loss / len(dataloader.dataset)

def evaluate(model, dataloader, criterion):
    """Compute the average loss per sample without updating the model.

    Args:
        model: Module called as ``model(features)``.
        dataloader: Iterable of ``(features, labels)`` batches that exposes a
            ``dataset`` attribute supporting ``len()``.
        criterion: Loss called as ``criterion(outputs, labels)``.

    Returns:
        float: Sum of batch losses weighted by batch size, divided by the
        dataset size.
    """
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for features, labels in dataloader:
            # Drop only the trailing output dimension. A bare squeeze() would
            # also collapse a batch of size one to a 0-d tensor whose shape
            # no longer matches ``labels``.
            outputs = model(features).squeeze(-1)
            loss = criterion(outputs, labels)
            # Weight by batch size (assumes the criterion averages over the
            # batch) so a short final batch is not over-counted.
            total_loss += loss.item() * len(labels)
    return total_loss / len(dataloader.dataset)

# Training: 50 epochs, each followed by a validation pass.
num_epochs = 50
for epoch in range(num_epochs):
    train_loss = train(model, train_dataloader, criterion, optimizer)
    valid_loss = evaluate(model, valid_dataloader, criterion)
    # Log the first epoch and then every fifth epoch.
    if (epoch+1) % 5 == 0 or epoch == 0:
        print(f'Epoch: {epoch + 1:03d}, Train Loss: {train_loss:.4f}, Validation Loss: {valid_loss:.4f}')

# Test-set evaluation
test_loss = evaluate(model, test_dataloader, criterion)
print(f'Test Loss (MSE): {test_loss:.4f}')

# Predict on the whole test set in one pass and visualize the result
model.eval()
with torch.no_grad():
    # squeeze(-1) removes only the trailing output dimension, so the result
    # stays 1-D even when the test set holds a single sample (a bare
    # squeeze() would produce a 0-d array that the metric call rejects).
    y_pred_mlp = model(torch.tensor(X_test, dtype=torch.float32)).squeeze(-1).numpy()

mae_mlp = mean_absolute_error(y_test, y_pred_mlp)
print(f"MLP Test MAE: {mae_mlp:.4f}")

plt.figure(figsize=(6,6))
plt.scatter(y_test, y_pred_mlp, alpha=0.6)
# Dashed red reference line y = x, drawn from -10 to 2 on both axes.
plt.plot([-10, 2], [-10, 2], 'r--')
plt.xlabel('True Solubility')
plt.ylabel('Predicted Solubility')
plt.title('MLP Predictions')
plt.show()

# ---------------------------
# 图神经网络（GNN）
# ---------------------------

from torch_geometric.utils import from_smiles
from torch_geometric.loader import DataLoader as GeoDataLoader
from torch_geometric.nn import global_mean_pool, GINConv
from torch.nn import Linear, Sequential, ReLU
import torch.nn.functional as F

# Convert dataset rows into molecular graphs
def process_smiles(row):
    """Build a graph object for one dataset row.

    Args:
        row: Record exposing ``Drug`` (a SMILES string) and ``Y`` (the
            target value) as attributes.

    Returns:
        The graph from ``from_smiles`` with float node features, a
        ``descriptors`` tensor of shape (1, 5) and a one-element 1-D ``y``.
    """
    graph = from_smiles(row.Drug)
    graph.x = graph.x.float()
    # Attach the descriptors as a graph-level feature. The leading dimension
    # of size one is presumably there so batching keeps one row per graph
    # (TODO confirm against the PyG collation rules).
    graph.descriptors = torch.tensor([calculate_descriptors(row.Drug)], dtype=torch.float)
    # Target stored as a 1-D tensor holding a single value.
    graph.y = torch.tensor([row.Y], dtype=torch.float)
    return graph

# Convert every row of each split into a graph object.
train_graphs = list(map(process_smiles, split['train'].itertuples()))
valid_graphs = list(map(process_smiles, split['valid'].itertuples()))
test_graphs = list(map(process_smiles, split['test'].itertuples()))

# Graph loaders with batches of 32; only the training loader shuffles.
# NOTE: these rebind the loader names used earlier for the MLP datasets.
train_dataloader = GeoDataLoader(train_graphs, batch_size=32, shuffle=True)
valid_dataloader = GeoDataLoader(valid_graphs, batch_size=32)
test_dataloader = GeoDataLoader(test_graphs, batch_size=32)

# GIN model that also consumes the molecular descriptors
class GINWithDescriptors(torch.nn.Module):
    """Three GINConv layers, mean pooling, and a descriptor-aware MLP head.

    Args:
        dim_h: Hidden width used by every layer.
        num_node_features: Width of the node feature matrix ``data.x``.
            Defaults to 9, the value previously hard-coded here (presumably
            the node feature count produced by ``from_smiles`` - confirm).
        num_descriptors: Number of graph-level descriptor values per
            molecule. Defaults to 5, the value previously hard-coded here.
    """

    def __init__(self, dim_h, num_node_features=9, num_descriptors=5):
        super().__init__()

        self.conv1 = GINConv(Sequential(Linear(num_node_features, dim_h), ReLU(), Linear(dim_h, dim_h), ReLU()))
        self.conv2 = GINConv(Sequential(Linear(dim_h, dim_h), ReLU(), Linear(dim_h, dim_h), ReLU()))
        self.conv3 = GINConv(Sequential(Linear(dim_h, dim_h), ReLU(), Linear(dim_h, dim_h), ReLU()))

        # The head sees the pooled graph embedding plus the descriptors.
        self.lin1 = Linear(dim_h + num_descriptors, dim_h)
        self.lin2 = Linear(dim_h, 1)

    def forward(self, data):
        """Return one prediction per graph as a 1-D tensor.

        Args:
            data: Batch exposing ``x``, ``edge_index``, ``batch``,
                ``descriptors`` and ``num_graphs``.
        """
        x, edge_index, batch = data.x, data.edge_index, data.batch

        # Node embeddings
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = self.conv2(x, edge_index)
        x = F.relu(x)
        x = self.conv3(x, edge_index)

        # Graph-level readout
        x = global_mean_pool(x, batch)

        # Append the descriptors, reshaped to one row per graph
        descriptors = data.descriptors.view(data.num_graphs, -1)

        x = torch.cat([x, descriptors], dim=1)

        x = self.lin1(x)
        x = F.relu(x)
        x = F.dropout(x, p=0.2, training=self.training)
        x = self.lin2(x)

        # Drop only the trailing output dimension. A bare squeeze() would
        # return a 0-d tensor for a batch holding a single graph, which
        # mismatches the target shape and cannot be iterated.
        return x.squeeze(-1)

# Model, optimizer and loss function.
# NOTE: `model`, `optimizer` and `criterion` are rebound here, replacing the
# MLP objects created earlier.
model = GINWithDescriptors(dim_h=64)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()

# Training and validation helpers for graph batches
def train(model, dataloader, criterion, optimizer):
    """Run one training epoch over graph batches; return the mean loss per graph.

    Each batch must expose ``y`` and ``num_graphs``; ``dataloader.dataset``
    must support ``len()``.
    """
    model.train()
    running_loss = 0
    for batch in dataloader:
        optimizer.zero_grad()
        predictions = model(batch)
        targets = batch.y.view(-1)  # flatten targets to one value per graph
        batch_loss = criterion(predictions, targets)
        batch_loss.backward()
        optimizer.step()
        # Weight each batch loss by the number of graphs it covers.
        running_loss += batch_loss.item() * batch.num_graphs
    dataset_size = len(dataloader.dataset)
    return running_loss / dataset_size

def evaluate(model, dataloader, criterion):
    """Return the mean loss per graph over ``dataloader`` without training.

    Each batch must expose ``y`` and ``num_graphs``; ``dataloader.dataset``
    must support ``len()``.
    """
    model.eval()
    running_loss = 0
    with torch.no_grad():
        for batch in dataloader:
            predictions = model(batch)
            targets = batch.y.view(-1)  # flatten targets to one value per graph
            batch_loss = criterion(predictions, targets)
            # Weight each batch loss by the number of graphs it covers.
            running_loss += batch_loss.item() * batch.num_graphs
    dataset_size = len(dataloader.dataset)
    return running_loss / dataset_size

# Training: 50 epochs, each followed by a validation pass.
num_epochs = 50
for epoch in range(num_epochs):
    train_loss = train(model, train_dataloader, criterion, optimizer)
    valid_loss = evaluate(model, valid_dataloader, criterion)
    # Log the first epoch and then every fifth epoch.
    if (epoch+1) % 5 == 0 or epoch == 0:
        print(f'Epoch: {epoch + 1:03d}, Train Loss: {train_loss:.4f}, Validation Loss: {valid_loss:.4f}')

# Test-set evaluation
test_loss = evaluate(model, test_dataloader, criterion)
print(f'Test Loss (MSE): {test_loss:.4f}')

# Collect test-set predictions batch by batch and visualize them
model.eval()
predictions = []
ground_truth = []
with torch.no_grad():
    for data in test_dataloader:
        outputs = model(data)
        # Flatten before extending: a batch holding a single graph can yield
        # a 0-d model output, and a 0-d array cannot be iterated by extend().
        predictions.extend(outputs.reshape(-1).cpu().numpy())
        ground_truth.extend(data.y.reshape(-1).cpu().numpy())

mae_gnn = mean_absolute_error(ground_truth, predictions)
print(f"GNN Test MAE: {mae_gnn:.4f}")

plt.figure(figsize=(6,6))
plt.scatter(ground_truth, predictions, alpha=0.6)
# Dashed red reference line y = x, drawn from -10 to 2 on both axes.
plt.plot([-10, 2], [-10, 2], 'r--')
plt.xlabel('True Solubility')
plt.ylabel('Predicted Solubility')
plt.title('GNN Predictions')
plt.show()

# ---------------------------
# 分析两亲性分子
# ---------------------------

# Add LogP and TPSA columns to the test split, plus an example combined
# indicator defined as their ratio TPSA / LogP.
# NOTE(review): the ratio is negative for molecules with negative LogP and
# grows very large as LogP approaches zero - confirm this is intended.
split['test']['LogP'] = split['test']['Drug'].apply(lambda x: Descriptors.MolLogP(Chem.MolFromSmiles(x)))
split['test']['TPSA'] = split['test']['Drug'].apply(lambda x: Descriptors.TPSA(Chem.MolFromSmiles(x)))
split['test']['Hydrophilic_Lipophilic_Balance'] = split['test']['TPSA'] / (split['test']['LogP'] + 1e-6)  # epsilon avoids dividing by an exactly-zero LogP

# Scatter plot of the Hydrophilic-Lipophilic Balance column against the
# target column 'Y' for the test split.
plt.figure(figsize=(8,6))
sns.scatterplot(data=split['test'], x='Hydrophilic_Lipophilic_Balance', y='Y')
plt.xlabel('Hydrophilic-Lipophilic Balance (TPSA / LogP)')
plt.ylabel('Solubility')
plt.title('Hydrophilic-Lipophilic Balance vs Solubility')
plt.show()

# Select candidate amphiphilic molecules from the test split:
# LogP strictly between 0 and 5 and TPSA strictly between 20 and 100.
test_df = split['test']
logp_in_window = (test_df['LogP'] > 0) & (test_df['LogP'] < 5)
tpsa_in_window = (test_df['TPSA'] > 20) & (test_df['TPSA'] < 100)
amphiphilic_molecules = test_df[logp_in_window & tpsa_in_window]

print(f"Potential amphiphilic molecules: {len(amphiphilic_molecules)}")

# Show some of the candidate amphiphilic molecules
from rdkit.Chem import Draw

# Parse the first ten candidates and draw them in a grid, five per row,
# with 200x200 sub-images. The grid call is the last expression so the
# notebook displays its result.
sample_mols = [Chem.MolFromSmiles(smi) for smi in amphiphilic_molecules['Drug'].head(10)]
Draw.MolsToGridImage(sample_mols, molsPerRow=5, subImgSize=(200, 200))

NameError: name 'ADME' is not defined