In [52]:
from rdkit import Chem
from sklearn.metrics import roc_auc_score
from torch_geometric.data import Data
from torch_geometric.loader import DataLoader
from torch_geometric.nn import (
    GCNConv,
    global_mean_pool,
)
import matplotlib.pyplot as plt
import pandas as pd
import torch
import torch.nn.functional as F

from torch_geometric.nn import GCNConv, global_mean_pool
from sklearn.model_selection import KFold
import optuna
from torch.utils.data import Subset


In [25]:
# Select the compute device: CUDA when PyTorch reports it available, otherwise CPU
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print(f"Using device: {device}")

Using device: cuda


# Preprocessing SMILES Data into Graphs

In [44]:
# Per-atom descriptor used as the node feature vector
def atom_features(atom):
    """Build the 5-element float feature vector for a single atom.

    The entries, in order, are the values returned by the atom's
    ``GetAtomicNum``, ``GetDegree``, ``GetImplicitValence``,
    ``GetFormalCharge`` and ``GetIsAromatic`` methods.

    Args:
        atom: Object exposing the five getter methods listed above
            (``smiles_to_graph`` passes the atoms of a parsed molecule).

    Returns:
        torch.Tensor: 1-D tensor of dtype ``torch.float`` with 5 entries.
    """
    descriptors = [
        atom.GetAtomicNum(),
        atom.GetDegree(),
        atom.GetImplicitValence(),
        atom.GetFormalCharge(),
        atom.GetIsAromatic(),  # boolean; becomes 0.0 / 1.0 through the float cast
    ]
    return torch.tensor(descriptors, dtype=torch.float)


# Per-bond descriptor used as the edge feature vector
def bond_features(bond):
    """Build the 8-element float feature vector for a single bond.

    Layout of the returned vector:
        [0]    bond type as a float (``GetBondTypeAsDouble``)
        [1]    aromatic flag
        [2]    conjugated flag
        [3]    ring-membership flag
        [4:8]  one-hot stereo slot: STEREONONE, STEREOE, STEREOZ, anything else

    Args:
        bond: Object exposing ``GetBondTypeAsDouble``, ``GetIsAromatic``,
            ``GetIsConjugated``, ``IsInRing`` and ``GetStereo``.

    Returns:
        torch.Tensor: 1-D tensor of dtype ``torch.float`` with 8 entries.
    """
    scalar_part = [
        bond.GetBondTypeAsDouble(),
        float(bond.GetIsAromatic()),
        float(bond.GetIsConjugated()),
        float(bond.IsInRing()),
    ]

    stereo = bond.GetStereo()
    named_stereo = (
        Chem.BondStereo.STEREONONE,
        Chem.BondStereo.STEREOE,
        Chem.BondStereo.STEREOZ,
    )

    # Default to the trailing "other" slot unless one of the named values matches
    slot = len(named_stereo)
    for position, candidate in enumerate(named_stereo):
        if stereo == candidate:
            slot = position
            break

    stereo_one_hot = [1 if position == slot else 0 for position in range(len(named_stereo) + 1)]
    return torch.tensor(scalar_part + stereo_one_hot, dtype=torch.float)


# Convert SMILES to PyTorch Geometric Data object
def smiles_to_graph(smiles, label):
    """Convert one SMILES string and its label into a graph ``Data`` object.

    Atoms become nodes (features from ``atom_features``) and every bond
    becomes two directed edges, one per direction, both carrying the same
    ``bond_features`` vector.

    Args:
        smiles: SMILES string describing the molecule.
        label: Integer class label stored in ``y``.

    Returns:
        Data: with ``x`` of shape (num_atoms, 5), ``edge_index`` of shape
        (2, 2 * num_bonds), ``edge_attr`` of shape (2 * num_bonds, 8) and
        ``y`` of shape (1,) with dtype ``torch.long``.

    Raises:
        ValueError: if the SMILES string cannot be parsed or the parsed
            molecule contains no atoms.
    """
    # Width of the vector produced by bond_features (4 scalars + 4 stereo slots)
    num_bond_features = 8

    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        # MolFromSmiles signals a parse failure by returning None; fail with a
        # clear message instead of an AttributeError further down.
        raise ValueError(f"Could not parse SMILES string: {smiles!r}")
    if mol.GetNumAtoms() == 0:
        # torch.stack cannot build a node matrix from an empty list
        raise ValueError(f"SMILES string yields a molecule with no atoms: {smiles!r}")

    # Nodes (atoms)
    x = torch.stack([atom_features(atom) for atom in mol.GetAtoms()])

    # Edges (bonds): each undirected bond is stored as two directed edges
    edge_pairs = []
    edge_feature_rows = []
    for bond in mol.GetBonds():
        i = bond.GetBeginAtomIdx()
        j = bond.GetEndAtomIdx()
        features = bond_features(bond)  # computed once, shared by both directions

        edge_pairs.append([i, j])
        edge_pairs.append([j, i])
        edge_feature_rows.append(features)
        edge_feature_rows.append(features)

    if edge_pairs:
        edge_index = torch.tensor(edge_pairs, dtype=torch.long).t().contiguous()
        edge_attr = torch.stack(edge_feature_rows)
    else:
        # Molecules without bonds (single atoms / ions): keep the feature
        # dimension so edge_attr is always 2-D, matching bonded molecules.
        edge_index = torch.empty((2, 0), dtype=torch.long)
        edge_attr = torch.empty((0, num_bond_features), dtype=torch.float)

    # Label (target)
    y = torch.tensor([label], dtype=torch.long)

    return Data(x=x, edge_index=edge_index, edge_attr=edge_attr, y=y)



# Load a CSV of labelled molecules and convert every row into a graph
def load_data_from_csv(file_path):
    """Read a CSV file and turn each row into a graph via ``smiles_to_graph``.

    The file must provide a ``Smiles`` column and a ``Liver`` column. A row is
    labelled 1 when its ``Liver`` value equals ``"Hepatotoxicity"`` exactly and
    0 otherwise. The SMILES strings are used as given; no augmentation or
    enumeration is performed here.

    Args:
        file_path: Anything ``pandas.read_csv`` accepts.

    Returns:
        list: one ``smiles_to_graph`` result per row, in file order.
    """
    frame = pd.read_csv(file_path)
    smiles_column = frame["Smiles"].values
    binary_labels = frame["Liver"].apply(
        lambda value: 1 if value == "Hepatotoxicity" else 0
    ).values

    return [
        smiles_to_graph(smiles, label)
        for smiles, label in zip(smiles_column, binary_labels)
    ]

In [45]:
# Parse both CSV splits into lists of graph objects
training_data = load_data_from_csv("data_smiles/Training_Group.csv")
testing_data = load_data_from_csv("data_smiles/Testing_Group.csv")

# Wrap each split in a mini-batch loader; only the training split is shuffled
loader_batch_size = 32
train_loader = DataLoader(training_data, batch_size=loader_batch_size, shuffle=True)
test_loader = DataLoader(testing_data, batch_size=loader_batch_size, shuffle=False)

In [6]:
# for i, data in enumerate(test_loader):
#     print(data)

#     if i == 2:
#         break

DataBatch(x=[40, 5], edge_index=[2, 88], edge_attr=[88, 8], y=[1], batch=[40], ptr=[2])
DataBatch(x=[15, 5], edge_index=[2, 30], edge_attr=[30, 8], y=[1], batch=[15], ptr=[2])
DataBatch(x=[27, 5], edge_index=[2, 58], edge_attr=[58, 8], y=[1], batch=[27], ptr=[2])


# GCN approach

## 2. Define GCN Model

In [28]:
class GCN(torch.nn.Module):
    """Graph-level classifier: two GCNConv layers, mean pooling, linear head.

    Args:
        num_node_features: Width of each node feature vector.
        num_classes: Number of output classes.
        hidden_channels: Width of both graph-convolution layers.
    """

    def __init__(self, num_node_features, num_classes, hidden_channels):
        super().__init__()
        self.conv1 = GCNConv(num_node_features, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.fc = torch.nn.Linear(hidden_channels, num_classes)

    def forward(self, data):
        """Return per-graph log-probabilities of shape (num_graphs, num_classes).

        ``data`` must expose ``x``, ``edge_index`` and ``batch``.
        """
        node_state = data.x
        # Each convolution is followed by a ReLU
        for conv in (self.conv1, self.conv2):
            node_state = F.relu(conv(node_state, data.edge_index))

        # Collapse node embeddings into one vector per graph
        graph_state = global_mean_pool(node_state, data.batch)
        logits = self.fc(graph_state)
        return F.log_softmax(logits, dim=1)


## 3. Hyperparameter Search with Optuna and 5-Fold Cross-Validation

In [53]:
# Define the objective function for Optuna
def objective(trial):
    """Optuna objective: mean 5-fold cross-validated accuracy of a GCN.

    Samples ``hidden_channels``, ``learning_rate`` and ``weight_decay`` from
    the trial, trains a fresh ``GCN`` for 50 epochs on each training fold of
    ``data_smiles/Training_Group.csv`` and scores it on the held-out fold.

    The model and every batch are placed on the notebook-level ``device`` so
    the search runs on the same hardware as ``evaluate_best_model``.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        float: accuracy averaged over the folds (higher is better).
    """
    # Hyperparameters to tune
    hidden_channels = trial.suggest_int('hidden_channels', 16, 128)
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)
    weight_decay = trial.suggest_float('weight_decay', 1e-5, 1e-2, log=True)

    # Set up 5-fold cross-validation
    num_epochs = 50
    k_folds = 5
    seed = 42
    kf = KFold(n_splits=k_folds, shuffle=True, random_state=seed)

    # Load dataset (NOTE: the CSV is re-parsed on every trial)
    dataset = load_data_from_csv("data_smiles/Training_Group.csv")
    # Read the feature width from the data instead of hard-coding it
    num_node_features = dataset[0].x.size(1)
    num_classes = 2

    fold_accuracies = []

    for fold, (train_idx, val_idx) in enumerate(kf.split(dataset)):
        # Seed per fold so weight initialisation and batch shuffling are
        # repeatable and trials are compared under identical conditions.
        torch.manual_seed(seed + fold)

        # Use Subset to split the dataset based on the fold indices
        train_dataset = Subset(dataset, train_idx)
        val_dataset = Subset(dataset, val_idx)

        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

        # Fresh model and optimizer for every fold
        model = GCN(num_node_features, num_classes, hidden_channels).to(device)
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

        # Training loop
        for epoch in range(num_epochs):
            model.train()
            for data in train_loader:
                data = data.to(device)
                optimizer.zero_grad()
                out = model(data)
                # The model emits log-probabilities, hence NLL loss
                loss = F.nll_loss(out, data.y)
                loss.backward()
                optimizer.step()

        # Evaluation on the held-out fold
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for data in val_loader:
                data = data.to(device)
                out = model(data)
                pred = out.argmax(dim=1)
                correct += (pred == data.y).sum().item()
                total += data.y.size(0)

        fold_accuracies.append(correct / total)

    # Return the average accuracy across all folds
    return sum(fold_accuracies) / len(fold_accuracies)

In [54]:
# Set up the Optuna study and run optimization.
# The sampler is seeded so the sequence of suggested hyperparameters is
# repeatable across kernel restarts.
study = optuna.create_study(
    direction='maximize',
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(objective, n_trials=50)


[I 2024-10-15 20:30:25,447] Trial 1 finished with value: 0.6018979142375956 and parameters: {'hidden_channels': 56, 'learning_rate': 0.00040116355330898517, 'weight_decay': 3.284779124451802e-05}. Best is trial 1 with value: 0.6018979142375956.
[I 2024-10-15 20:31:08,393] Trial 2 finished with value: 0.6163848944163752 and parameters: {'hidden_channels': 72, 'learning_rate': 0.004960221156461526, 'weight_decay': 0.00020930099487375363}. Best is trial 2 with value: 0.6163848944163752.
[I 2024-10-15 20:31:54,454] Trial 3 finished with value: 0.5793205078378028 and parameters: {'hidden_channels': 112, 'learning_rate': 7.942332796144281e-05, 'weight_decay': 0.0008252519892737347}. Best is trial 2 with value: 0.6163848944163752.
[I 2024-10-15 20:32:41,235] Trial 4 finished with value: 0.6107624044565358 and parameters: {'hidden_channels': 117, 'learning_rate': 0.0003041727049453514, 'weight_decay': 1.9375522348387526e-05}. Best is trial 2 with value: 0.6163848944163752.
[I 2024-10-15 20:33:

In [55]:
# Report the best trial found by the search
print("Best trial:")
trial = study.best_trial
# objective() returns the mean cross-validated accuracy, so label it as such (not AUC)
print(f"Mean CV accuracy: {trial.value}")
print(f"Best hyperparameters: {trial.params}")

Best trial:
AUC: 0.6357332556030574
Best hyperparameters: {'hidden_channels': 80, 'learning_rate': 0.003477833496936736, 'weight_decay': 1.8809161976703587e-05}


## 4. Evaluation function

In [56]:
import numpy as np
from sklearn.metrics import roc_auc_score, accuracy_score, precision_score, recall_score


# Define the function to train and evaluate the model based on the best parameters
def evaluate_best_model(best_params, train_loader, test_loader):
    """Train a GCN with the given hyperparameters and score it on a test set.

    The model is trained for 50 epochs on ``train_loader`` and then evaluated
    on ``test_loader``. Both the model and every batch are moved to the
    notebook-level ``device``; without moving the batches a CUDA device raises
    a device-mismatch error on the first forward pass.

    Args:
        best_params: Mapping with the keys ``'hidden_channels'``,
            ``'learning_rate'`` and ``'weight_decay'``.
        train_loader: Iterable of training batches.
        test_loader: Iterable of evaluation batches.

    Returns:
        dict: ``accuracy``, ``auc``, ``precision`` and ``recall`` on the test
        set. The same values are also printed.
    """
    num_node_features = 5  # width of the vector built by atom_features
    num_classes = 2

    # Initialize the model using the best hyperparameters
    model = GCN(num_node_features, num_classes, hidden_channels=best_params['hidden_channels']).to(device)
    optimizer = torch.optim.Adam(model.parameters(),
                                 lr=best_params['learning_rate'],
                                 weight_decay=best_params['weight_decay'])

    # Train the model on the full training data
    num_epochs = 50
    for epoch in range(num_epochs):
        model.train()
        for data in train_loader:
            data = data.to(device)  # keep inputs on the same device as the model
            optimizer.zero_grad()
            out = model(data)
            loss = F.nll_loss(out, data.y)
            loss.backward()
            optimizer.step()

    # Evaluate the model on the test set
    model.eval()
    y_true = []
    y_pred_proba = []
    with torch.no_grad():
        for data in test_loader:
            data = data.to(device)
            out = model(data)
            # The model returns log-probabilities; exp() recovers the
            # probabilities, column 1 being the positive class.
            y_pred_proba.extend(out.exp()[:, 1].cpu().numpy())
            y_true.extend(data.y.cpu().numpy())

    # Convert true labels and predictions to numpy arrays
    y_true = np.array(y_true)
    y_pred_proba = np.array(y_pred_proba)

    # Decision threshold for turning probabilities into class predictions
    optimal_threshold = 0.5
    y_pred = (y_pred_proba >= optimal_threshold).astype(int)

    # Calculate the metrics (AUC uses the raw probabilities, the rest use y_pred)
    accuracy = accuracy_score(y_true, y_pred)
    auc_score = roc_auc_score(y_true, y_pred_proba)
    precision = precision_score(y_true, y_pred)
    recall = recall_score(y_true, y_pred)

    print(f"Accuracy: {accuracy}")
    print(f"AUC: {auc_score}")
    print(f"Precision: {precision}")
    print(f"Recall: {recall}")

    return {
        "accuracy": accuracy,
        "auc": auc_score,
        "precision": precision,
        "recall": recall,
    }


In [57]:
# Retrain with the best trial's hyperparameters and report held-out test metrics
evaluate_best_model(
    best_params=trial.params,
    train_loader=train_loader,
    test_loader=test_loader,
)

Accuracy: 0.7797202797202797
AUC: 0.736094674556213
Precision: 0.8590909090909091
Recall: 0.8552036199095022
