In [None]:
from sklearn.preprocessing import LabelEncoder
import numpy as np

# Example data: these three lists stand for the cell-localization, disease and
# cell-phenotype labels found across all samples. The trailing `...` (Ellipsis) is a
# placeholder for the remaining entries and should be replaced with real labels.
cell_locations = ['Nucleus', 'Cytoplasm', 'Mitochondria', 'Nucleus', 'Cytoplasm', 'Mitochondria', ...]
diseases = ['Disease_A', 'Disease_B', 'Disease_C', 'Disease_A', 'Disease_B', 'Disease_C', ...]
phenotypes = ['Phenotype_A', 'Phenotype_B', 'Phenotype_C', 'Phenotype_A', 'Phenotype_B', 'Phenotype_C', ...]

# Create one LabelEncoder per categorical field, so each field has its own ID space.
location_encoder = LabelEncoder()
disease_encoder = LabelEncoder()
phenotype_encoder = LabelEncoder()

# Fit each encoder on its label list and convert the labels to integer IDs.
location_ids = location_encoder.fit_transform(cell_locations)
disease_ids = disease_encoder.fit_transform(diseases)
phenotype_ids = phenotype_encoder.fit_transform(phenotypes)


In [None]:
# Assume there are several samples, each with 10 cell localizations, 10 diseases and 10 cell phenotypes.
# Example data for one sample (`...` is a placeholder for the remaining entries).
sample_1_locations = ['Nucleus', 'Cytoplasm', 'Mitochondria', 'Nucleus', 'Cytoplasm', 'Mitochondria', ...]
sample_1_diseases = ['Disease_A', 'Disease_B', 'Disease_C', 'Disease_A', 'Disease_B', 'Disease_C', ...]
sample_1_phenotypes = ['Phenotype_A', 'Phenotype_B', 'Phenotype_C', 'Phenotype_A', 'Phenotype_B', 'Phenotype_C', ...]

# Encode the sample with the already-fitted encoders (transform only, no re-fitting).
# NOTE: LabelEncoder.transform rejects labels that were not present when the encoder was fitted.
sample_1_location_ids = location_encoder.transform(sample_1_locations)
sample_1_disease_ids = disease_encoder.transform(sample_1_diseases)
sample_1_phenotype_ids = phenotype_encoder.transform(sample_1_phenotypes)

# Build the final feature vector by concatenating the three ID arrays
# in the order: locations, diseases, phenotypes.
sample_1_feature_vector = np.concatenate([sample_1_location_ids, sample_1_disease_ids, sample_1_phenotype_ids])

print("Sample 1 feature vector:", sample_1_feature_vector)


In [None]:
# Assume there are several samples; each one is a dict of raw category labels
# (`...` is a placeholder for the remaining entries).
samples = [
    {
        'locations': ['Nucleus', 'Cytoplasm', 'Mitochondria', ...],
        'diseases': ['Disease_A', 'Disease_B', 'Disease_C', ...],
        'phenotypes': ['Phenotype_A', 'Phenotype_B', 'Phenotype_C', ...]
    },
    # more samples...
]


def encode_sample(sample):
    """Encode one sample dict into a flat 1-D array of integer category IDs.

    Args:
        sample: dict with the keys 'locations', 'diseases' and 'phenotypes',
            each holding a list of labels known to the matching fitted encoder.

    Returns:
        The concatenation of the location, disease and phenotype IDs, in that order.
    """
    # Keeping the intermediate ID arrays local avoids overwriting the module-level
    # `location_ids` / `disease_ids` / `phenotype_ids` produced when the encoders were
    # fitted, so this cell can be re-run without changing earlier results.
    return np.concatenate([
        location_encoder.transform(sample['locations']),
        disease_encoder.transform(sample['diseases']),
        phenotype_encoder.transform(sample['phenotypes']),
    ])


# Build the feature vectors for all samples
all_feature_vectors = [encode_sample(sample) for sample in samples]

print("All feature vectors:", all_feature_vectors)


In [None]:
# Assume a label array aligned with the samples (`...` is a placeholder).
labels = np.array([1, 0, 1, ...])

# Convert to PyTorch tensors
import torch

# Collapse the list of per-sample arrays into one ndarray before handing it to torch:
# building a tensor directly from a list of ndarrays is the slow path PyTorch warns about.
features_tensor = torch.as_tensor(np.asarray(all_feature_vectors), dtype=torch.float32)
labels_tensor = torch.tensor(labels, dtype=torch.float32)

# Wrap features and labels into a dataset
from torch.utils.data import TensorDataset, DataLoader

dataset = TensorDataset(features_tensor, labels_tensor)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class AttentionMechanism(nn.Module):
    """Small two-layer gate that maps each feature row to one weight in (0, 1).

    Args:
        feature_dim: size of the last dimension of the incoming features.
        hidden_dim: width of the intermediate projection.
    """

    def __init__(self, feature_dim, hidden_dim):
        super().__init__()
        self.fc1 = nn.Linear(feature_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, 1)

    def forward(self, features):
        """Return sigmoid(fc2(relu(fc1(features)))); the last dimension becomes 1."""
        hidden = torch.relu(self.fc1(features))
        return torch.sigmoid(self.fc2(hidden))

class InteractionPredictionModel(nn.Module):
    """Score a metabolite/protein pair with two linear encoders and a gated protein branch.

    Args:
        input_dim: feature size of both the metabolite and the protein input.
        embedding_dim: output size of each linear encoder.
        hidden_dim: width of the attention gate and of the classifier's hidden layer.
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding1 = nn.Linear(input_dim, embedding_dim)
        self.embedding2 = nn.Linear(input_dim, embedding_dim)
        self.attention = AttentionMechanism(embedding_dim, hidden_dim)
        classifier_layers = [
            nn.Linear(embedding_dim * 2, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(hidden_dim, 1),
        ]
        self.fc = nn.Sequential(*classifier_layers)

    def forward(self, metabolite_features, protein_features):
        """Return the sigmoid interaction score, one value per row of the batch."""
        # Encode both inputs
        met_vec = torch.relu(self.embedding1(metabolite_features))
        prot_vec = torch.relu(self.embedding2(protein_features))

        # Scale the protein encoding by its attention gate
        gated_prot_vec = self.attention(prot_vec) * prot_vec

        # Join both branches along the feature axis and classify
        joint = torch.cat([met_vec, gated_prot_vec], dim=1)
        return torch.sigmoid(self.fc(joint))

# Example usage
input_dim = 100  # Example input dimension for metabolite/protein features
embedding_dim = 64  # Embedding dimension for both metabolite and protein features
hidden_dim = 128  # Hidden dimension for the attention mechanism and fully connected layers

model = InteractionPredictionModel(input_dim, embedding_dim, hidden_dim)

# Dummy inputs
metabolite_features = torch.randn((1, input_dim))  # Batch size of 1
protein_features = torch.randn((1, input_dim))

# Forward pass
# NOTE: model.eval() has not been called, so the model is still in training mode and its
# Dropout(0.5) layer is active; the printed score can differ between runs on the same input.
interaction_score = model(metabolite_features, protein_features)
print(interaction_score)


In [None]:
import torch
from torch.utils.data import DataLoader, Dataset

class InteractionDataset(Dataset):
    """Dataset of aligned (metabolite features, protein features, label) triples.

    Args:
        metabolite_features: indexable collection with one entry per example.
        protein_features: indexable collection with one entry per example.
        labels: indexable collection with one label per example.

    Raises:
        ValueError: if the three collections do not have the same length.
    """

    def __init__(self, metabolite_features, protein_features, labels):
        # Fail fast on misaligned inputs; otherwise the mismatch would only surface
        # later as an IndexError inside a DataLoader worker or as silently ignored rows.
        n_examples = len(labels)
        if len(metabolite_features) != n_examples or len(protein_features) != n_examples:
            raise ValueError(
                "metabolite_features, protein_features and labels must have the same length, "
                f"got {len(metabolite_features)}, {len(protein_features)} and {n_examples}"
            )
        self.metabolite_features = metabolite_features
        self.protein_features = protein_features
        self.labels = labels

    def __len__(self):
        """Number of examples (the length of ``labels``)."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return the (metabolite, protein, label) triple stored at position ``idx``."""
        return self.metabolite_features[idx], self.protein_features[idx], self.labels[idx]

# Build the training and validation dataset instances.
# NOTE(review): the train_* / val_* feature and label variables are not defined in the
# cells shown above; they presumably come from elsewhere and must exist before this runs.
train_dataset = InteractionDataset(train_metabolite_features, train_protein_features, train_labels)
val_dataset = InteractionDataset(val_metabolite_features, val_protein_features, val_labels)

# Create the data loaders: only the training data is shuffled; validation keeps its order.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)


In [None]:
import torch.optim as optim

criterion = nn.BCELoss()  # binary cross-entropy loss; takes probabilities (the model already applies sigmoid)
optimizer = optim.Adam(model.parameters(), lr=0.001)  # Adam optimizer


In [None]:
num_epochs = 20  # number of training epochs
for epoch in range(num_epochs):
    model.train()  # training mode (dropout enabled)
    running_loss = 0.0

    for metabolite_features, protein_features, labels in train_loader:
        optimizer.zero_grad()  # clear gradients left over from the previous step

        # Forward pass. Flatten with reshape(-1) instead of squeeze(): squeeze() turns a
        # batch of one example into a 0-d tensor, which no longer matches the (1,)-shaped
        # labels and makes BCELoss raise on a final short batch.
        outputs = model(metabolite_features, protein_features).reshape(-1)
        # Flatten the labels the same way so both (N,) and (N, 1) label layouts work.
        targets = labels.float().reshape(-1)

        # Compute the loss
        loss = criterion(outputs, targets)

        # Backward pass and parameter update
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')

    # Validation phase
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for metabolite_features, protein_features, labels in val_loader:
            outputs = model(metabolite_features, protein_features).reshape(-1)
            targets = labels.float().reshape(-1)
            loss = criterion(outputs, targets)
            val_loss += loss.item()

            # Accuracy: threshold the predicted probability at 0.5
            predicted = (outputs > 0.5).float()
            correct += (predicted == targets).sum().item()
            total += targets.size(0)

    print(f'Validation Loss: {val_loss/len(val_loader):.4f}, Accuracy: {100 * correct / total:.2f}%')


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
import numpy as np

# Assume the following data structures:
# proteins_data = [("protein1", diseases, localization, phenotypes), ...]
# compounds_data = [("compound1", diseases, localization, phenotypes), ...]

# Number of categories per feature type (the size of each embedding table)
num_disease_categories = 1000
num_phenotype_categories = 1000
num_localization_categories = 30

# Each feature type is handled by its own Embedding layer; these are the embedding widths
disease_embedding_dim = 64
phenotype_embedding_dim = 64
localization_embedding_dim = 16

class FeatureEmbedding(nn.Module):
    """Thin wrapper around ``nn.Embedding`` for one categorical feature type.

    Args:
        num_categories: number of distinct category IDs (rows of the table).
        embedding_dim: length of each embedding vector.
    """

    def __init__(self, num_categories, embedding_dim):
        super().__init__()
        self.embedding = nn.Embedding(num_categories, embedding_dim)

    def forward(self, x):
        """Look up the embedding vector for every integer ID in ``x``."""
        vectors = self.embedding(x)
        return vectors

# Embedding layers for proteins and compounds, one per feature type
# (the same three instances are passed to encode_features for both entity kinds).
disease_embedding = FeatureEmbedding(num_disease_categories, disease_embedding_dim)
phenotype_embedding = FeatureEmbedding(num_phenotype_categories, phenotype_embedding_dim)
localization_embedding = FeatureEmbedding(num_localization_categories, localization_embedding_dim)

def encode_features(data, disease_embedding, phenotype_embedding, localization_embedding):
    """Turn (name, diseases, localization, phenotypes) records into one feature matrix.

    Each record's ID lists are embedded and summed over the per-record ID axis (dim 1);
    the three pooled blocks are then joined column-wise in the order
    disease, localization, phenotype.

    Args:
        data: iterable of 4-tuples ``(name, disease_ids, localization_ids, phenotype_ids)``;
            the name is ignored.
        disease_embedding: callable mapping a long tensor of disease IDs to vectors.
        phenotype_embedding: callable mapping a long tensor of phenotype IDs to vectors.
        localization_embedding: callable mapping a long tensor of localization IDs to vectors.

    Returns:
        Tensor with one row per record.
    """
    disease_ids, localization_ids, phenotype_ids = [], [], []
    for record in data:
        _, record_diseases, record_localization, record_phenotypes = record
        disease_ids.append(record_diseases)
        localization_ids.append(record_localization)
        phenotype_ids.append(record_phenotypes)

    # Convert all three ID collections to long tensors before any lookup happens.
    index_tensors = [
        torch.tensor(ids, dtype=torch.long)
        for ids in (disease_ids, localization_ids, phenotype_ids)
    ]
    layers = (disease_embedding, localization_embedding, phenotype_embedding)

    # Embed, then sum over the per-record ID axis.
    pooled = [layer(indices).sum(dim=1) for layer, indices in zip(layers, index_tensors)]

    return torch.cat(pooled, dim=1)

# Encode the protein and compound features.
# NOTE(review): proteins_data / compounds_data are not defined in the cells shown above;
# they must exist before this cell runs.
protein_features = encode_features(proteins_data, disease_embedding, phenotype_embedding, localization_embedding)
compound_features = encode_features(compounds_data, disease_embedding, phenotype_embedding, localization_embedding)

# Concatenate protein and compound features column-wise.
# NOTE(review): this pairs row i of proteins_data with row i of compounds_data, so both
# lists presumably have the same length and ordering — confirm against the data source.
features = torch.cat([protein_features, compound_features], dim=1)

# Labels are assumed to be 0 or 1 (`...` is a placeholder); unsqueeze(1) adds a trailing axis of size 1.
labels = torch.tensor([0, 1, 0, 1, ...], dtype=torch.float32).unsqueeze(1)  # labels aligned with the data

# Split the data into training and test sets (20% held out, fixed random_state)
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, random_state=42)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.metrics import roc_auc_score, roc_curve
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

# Define the model
class AttentionLayer(nn.Module):
    """Attention pooling over dimension 1 of the input.

    A small MLP scores every position along dim 1; the scores are softmax-normalised
    over that dimension and used to form a weighted sum of the input.

    Args:
        input_dim: size of the last dimension of the input.
        hidden_dim: width of the scoring MLP's hidden layer.
    """

    def __init__(self, input_dim, hidden_dim):
        super().__init__()
        self.attention = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1)
        )

    def forward(self, x):
        """Return the attention-weighted sum of ``x`` over dim 1.

        For a 3-D input ``(batch, seq, input_dim)`` the result is ``(batch, input_dim)``.
        """
        attn_weights = self.attention(x)
        attn_weights = torch.softmax(attn_weights, dim=1)
        weighted_input = x * attn_weights
        return weighted_input.sum(dim=1)


class InteractionModel(nn.Module):
    """Binary interaction classifier: linear layer -> attention pooling -> MLP -> sigmoid.

    Args:
        input_dim: number of input features.
        hidden_dim: width of the first hidden layer (the second uses ``hidden_dim // 2``).
    """

    def __init__(self, input_dim, hidden_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.attention = AttentionLayer(hidden_dim, hidden_dim // 2)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim // 2)
        self.fc3 = nn.Linear(hidden_dim // 2, 1)
        self.dropout = nn.Dropout(0.3)
        self.relu = nn.LeakyReLU()

    def forward(self, x):
        """Return interaction probabilities of shape ``(batch, 1)``.

        Accepts ``(batch, input_dim)`` or ``(batch, seq, input_dim)`` input.
        """
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        if x.dim() == 2:
            # The attention layer pools over dim 1. Fed a plain (batch, hidden) matrix it
            # would sum away the feature axis and hand fc2 a (batch,) vector, which fails.
            # Treat each row as a length-1 sequence so the output stays (batch, hidden).
            # NOTE: with a single position the softmax weight is 1, so attention is an
            # identity here; it only re-weights when a real sequence axis is supplied.
            x = x.unsqueeze(1)
        x = self.attention(x)
        x = self.relu(self.fc2(x))
        x = self.fc3(x)
        return torch.sigmoid(x)

# Compute the AUC-ROC score
def calculate_auc_roc(model, X, y):
    """Evaluate ``model`` on ``X`` and return its AUC-ROC against ``y``.

    Args:
        model: module mapping a float tensor of features to one probability per row.
        X: features (array-like or tensor) convertible to a float32 tensor.
        y: binary labels, shaped ``(N,)`` or ``(N, 1)``.

    Returns:
        Tuple ``(auc_roc, outputs)`` where ``outputs`` is the model output as a NumPy array.

    Side effects:
        Puts ``model`` into evaluation mode.
    """
    model.eval()
    with torch.no_grad():
        # as_tensor avoids the copy-construct warning when X / y are already tensors;
        # detach() drops any autograd history the inputs may carry.
        inputs = torch.as_tensor(X, dtype=torch.float32).detach()
        # Flatten the labels instead of unsqueezing: labels that already have shape (N, 1)
        # would otherwise become (N, 1, 1), which roc_auc_score cannot interpret.
        labels = torch.as_tensor(y, dtype=torch.float32).detach().reshape(-1)
        outputs = model(inputs)
        scores = outputs.numpy()
        auc_roc = roc_auc_score(labels.numpy(), scores.reshape(-1))
        return auc_roc, scores

# Train the model, reporting loss and AUC-ROC on the training and validation sets after every epoch
def train_model(model, X_train, y_train, X_test, y_test, criterion, optimizer, scheduler, epochs=20, patience=5):
    """Full-batch training with early stopping on validation AUC-ROC.

    Each epoch performs a single optimizer step on the whole training set, evaluates on
    the validation set, steps ``scheduler`` with the validation loss, and tracks the
    weights that achieved the best validation AUC-ROC. On exit the model holds those
    best weights.

    Args:
        model: module mapping float features to probabilities of shape ``(N, 1)``.
        X_train, X_test: features (array-like or tensor).
        y_train, y_test: binary labels, shaped ``(N,)`` or ``(N, 1)``.
        criterion: loss taking ``(outputs, labels)``.
        optimizer: optimizer over ``model``'s parameters.
        scheduler: scheduler whose ``step`` accepts the validation loss.
        epochs: maximum number of epochs.
        patience: stop after this many consecutive epochs without a better validation AUC-ROC.
    """
    import copy  # local import keeps this function self-contained

    def _as_float(data):
        # as_tensor avoids re-wrapping existing tensors; detach() drops any autograd
        # history so repeated backward passes never reach into an upstream graph.
        return torch.as_tensor(data, dtype=torch.float32).detach()

    best_auc_roc = 0.0
    best_epoch = 0
    # state_dict() returns references to the live parameter tensors, which the optimizer
    # updates in place. A deep copy is required for this to be a real snapshot.
    best_model_wts = copy.deepcopy(model.state_dict())
    epochs_no_improve = 0

    # The data does not change between epochs, so convert it once up front.
    # reshape(-1, 1) yields (N, 1) labels whether y arrives as (N,) or (N, 1).
    inputs = _as_float(X_train)
    labels = _as_float(y_train).reshape(-1, 1)
    val_inputs = _as_float(X_test)
    val_labels = _as_float(y_test).reshape(-1, 1)

    for epoch in range(epochs):
        model.train()

        optimizer.zero_grad()
        outputs = model(inputs)
        train_loss = criterion(outputs, labels)
        train_loss.backward()
        optimizer.step()

        # AUC-ROC on the training set (calculate_auc_roc switches the model to eval mode)
        train_auc_roc, _ = calculate_auc_roc(model, X_train, y_train)

        # Validation phase
        model.eval()
        with torch.no_grad():
            val_outputs = model(val_inputs)
            val_loss = criterion(val_outputs, val_labels)
            val_auc_roc, val_predictions = calculate_auc_roc(model, X_test, y_test)

        # Update the learning-rate scheduler with the validation loss
        scheduler.step(val_loss)

        # Track the best validation AUC-ROC and snapshot the corresponding weights
        if val_auc_roc > best_auc_roc:
            best_auc_roc = val_auc_roc
            best_epoch = epoch
            best_model_wts = copy.deepcopy(model.state_dict())
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1

        # Report this epoch's training and validation performance
        print(f'Epoch [{epoch+1}/{epochs}]')
        print(f'Train Loss: {train_loss.item():.4f}, Train AUC-ROC: {train_auc_roc:.4f}')
        print(f'Val Loss: {val_loss.item():.4f}, Val AUC-ROC: {val_auc_roc:.4f}')

        # Plot the validation ROC curve every 5 epochs.
        # NOTE(review): plot_roc_curve is not defined in the cells visible here; it must
        # be defined before training reaches epoch 5, otherwise this raises NameError.
        if (epoch + 1) % 5 == 0:
            plot_roc_curve(val_labels.numpy(), val_predictions)

        # Early stopping
        if epochs_no_improve >= patience:
            print(f'Early stopping triggered. Best Val AUC-ROC: {best_auc_roc:.4f} at epoch {best_epoch+1}.')
            break

    # Restore the best weights
    model.load_state_dict(best_model_wts)

# Data preparation (see the data-processing steps above)
# X_train, X_test, y_train, y_test = ...

# Initialize the model
input_dim = X_train.shape[1]  # input dimension = number of features
hidden_dim = 256  # hidden layer width; can be tuned

model = InteractionModel(input_dim, hidden_dim)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Multiply the learning rate by 0.1 when the monitored value (train_model passes the
# validation loss) has stopped decreasing for more than `patience=3` epochs.
# NOTE(review): `verbose` is deprecated in recent PyTorch releases — confirm the installed version still accepts it.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3, verbose=True)

# Start training
train_model(model, X_train, y_train, X_test, y_test, criterion, optimizer, scheduler, epochs=20, patience=5)


In [None]:
# Assume the new dataset has the same structure as the earlier one
# new_proteins_data = ...
# new_compounds_data = ...

# Encode the new dataset with the same embedding layers
new_protein_features = encode_features(new_proteins_data, disease_embedding, phenotype_embedding, localization_embedding)
new_compound_features = encode_features(new_compounds_data, disease_embedding, phenotype_embedding, localization_embedding)

# Concatenate the protein and compound features
new_features = torch.cat([new_protein_features, new_compound_features], dim=1)

# Assumes the trained model has already been saved
# torch.save(model.state_dict(), 'model.pth')

# Load the model. The model is built on the CPU, so map the checkpoint to the CPU as
# well; this keeps loading working for a checkpoint that was saved from a GPU.
# NOTE(review): torch.load unpickles the file — only load checkpoints from a trusted source.
model = InteractionModel(input_dim, hidden_dim)
model.load_state_dict(torch.load('model.pth', map_location='cpu'))

# Evaluation mode: disables dropout for prediction
model.eval()

# Run the prediction without tracking gradients
with torch.no_grad():
    new_inputs = new_features.float()  # make sure the inputs are a float tensor
    predictions = model(new_inputs)

# Convert the predictions to binary labels and to raw probabilities
binary_predictions = (predictions > 0.5).int()  # 0 or 1
probability_predictions = predictions.numpy()  # probability values

# Save the binary predictions
np.savetxt("predictions.csv", binary_predictions.numpy(), delimiter=",")

