In [25]:
# on s'est inspirés de https://github.com/HannesStark/GNN-primer/blob/main/GNN-primer_HIV_classification.ipynb

In [26]:
import pandas as pd
import numpy as np
import torch
from torch.nn import Linear
from torch.nn.functional import relu, dropout
from torch_geometric.data import Data, DataLoader
from torch_geometric.nn import GCNConv, global_mean_pool
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, roc_curve, auc
import matplotlib.pyplot as plt
from rdkit import Chem

In [27]:
def smiles_to_graph(smiles, label=None, additional_features=None):
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return None

    atom_features = torch.tensor([atom.GetAtomicNum() for atom in mol.GetAtoms()], dtype=torch.float).view(-1, 1)
    if additional_features is not None:
        additional_features = torch.tensor(additional_features, dtype=torch.float32).view(1, -1)
        atom_features = torch.cat((atom_features, additional_features.repeat(atom_features.size(0), 1)), dim=1)

    # Edge indices and edge attributes
    edge_index = []
    edge_attr = []

    for bond in mol.GetBonds():
        i = bond.GetBeginAtomIdx()
        j = bond.GetEndAtomIdx()
        bond_type = bond.GetBondTypeAsDouble()

        
        edge_index.append([i, j])
        edge_index.append([j, i])  
        edge_attr.append(bond_type)
        edge_attr.append(bond_type)

    edge_index = torch.tensor(edge_index, dtype=torch.long).t().contiguous()  # Shape [2, num_edges]
    edge_attr = torch.tensor(edge_attr, dtype=torch.float).view(-1, 1)  

    
    data = Data(
        x=atom_features,  
        edge_index=edge_index, 
        edge_attr=edge_attr, 
        y=torch.tensor([label], dtype=torch.float32) if label is not None else None  # Label
    )

    return data

In [28]:

# Load dataset
train_df = pd.read_csv('/Users/mario/Desktop/data/train.csv') # à adapter

# Split into training and validation sets
train_data, val_data = train_test_split(train_df, test_size=0.2)


train_graphs = [
    smiles_to_graph(row['smiles'], label=row['class'], additional_features=row.iloc[1:-1].tolist())
    for _, row in train_data.iterrows()
]
val_graphs = [
    smiles_to_graph(row['smiles'], label=row['class'], additional_features=row.iloc[1:-1].tolist())
    for _, row in val_data.iterrows()
]

train_graphs = [g for g in train_graphs if g is not None]
val_graphs = [g for g in val_graphs if g is not None]

# Data loaders
train_loader = DataLoader(train_graphs, batch_size=32, shuffle=True)
val_loader = DataLoader(val_graphs, batch_size=32, shuffle=False)


KeyboardInterrupt: 

In [None]:
print(train_graphs[0])

In [None]:
class GNN(torch.nn.Module):
    def __init__(self, num_layers=4):
        super(GNN, self).__init__()
        self.conv1 = GCNConv(4296, 128)  # 4296 input features (atomic + molecular descriptors)
        self.conv2 = GCNConv(128, 64)    # 128 features from previous layer
        self.fc1 = Linear(64, 32)        # 64 features from previous layer
        self.fc2 = Linear(32, 1)         # Output 1 feature (logit for binary classification) À changer plus tard pour meiux 

    def forward(self, data):
        x, edge_index, batch = data.x, data.edge_index, data.batch
        
        x = self.conv1(x, edge_index)
        x = relu(x)  # ReLU activation
        x = self.conv2(x, edge_index)
        x = relu(x)  # ReLU activation
        x = global_mean_pool(x, batch) 
        
        x = self.fc1(x)  
        x = relu(x)  
        x = self.fc2(x)  
        
        return torch.sigmoid(x).view(-1)  


In [None]:
def train_model(model, train_loader, val_loader, epochs=17, lr=0.0001):
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = torch.nn.BCEWithLogitsLoss()

    train_auc = []
    val_auc = []

    for epoch in range(epochs):
       
        model.train()
        train_preds, train_labels = [], []
        for batch in train_loader:
            optimizer.zero_grad()
            pred = model(batch)
            loss = criterion(pred, batch.y)
            loss.backward()
            optimizer.step()

            train_preds.append(pred.detach().numpy())
            train_labels.append(batch.y.numpy())

        model.eval()
        val_preds, val_labels = [], []
        with torch.no_grad():
            for batch in val_loader:
                pred = model(batch)
                val_preds.append(pred.numpy())
                val_labels.append(batch.y.numpy())

        
        train_auc.append(roc_auc_score(np.concatenate(train_labels), np.concatenate(train_preds)))
        val_auc.append(roc_auc_score(np.concatenate(val_labels), np.concatenate(val_preds)))

        print(f"Epoch {epoch+1}/{epochs} - Train AUC: {train_auc[-1]:.4f}, Val AUC: {val_auc[-1]:.4f}")

    return train_auc, val_auc


In [None]:

model = GNN()
train_auc, val_auc = train_model(model, train_loader, val_loader)

# Plot AUC-ROC
plt.figure(figsize=(8, 6))
plt.plot(train_auc, label="Train AUC")
plt.plot(val_auc, label="Validation AUC")
plt.xlabel("Epochs")
plt.ylabel("AUC-ROC")
plt.title("Training and Validation AUC-ROC")
plt.legend()
plt.show()

In [None]:
import torch
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, roc_auc_score, precision_recall_curve, cohen_kappa_score
import numpy as np
from torch_geometric.data import DataLoader


def predict_probabilities(model, val_loader, device):
    model.eval()  # Set model to evaluation mode
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for batch in val_loader:
            batch = batch.to(device)  #
            output = model(batch)  # Forward pass
            probs = torch.sigmoid(output)  # Convert logits to probabilities
            all_preds.extend(probs.cpu().numpy())  # Store predicted probabilities
            all_labels.extend(batch.y.cpu().numpy())  # Store true labels

    return np.array(all_labels), np.array(all_preds)

def calculate_kappa(model, val_loader, device, best_threshold):
    model.eval()  # Set model to evaluation mode
    true_labels, predicted_labels = [], []
    
    with torch.no_grad():
        for batch in val_loader:
            batch = batch.to(device)  # Send batch to device (GPU or CPU)
            logits = model(batch)  # Forward pass
            probs = torch.sigmoid(logits)  # Get probabilities from logits
            preds = (probs > best_threshold).float()  # 

            true_labels.extend(batch.y.cpu().numpy())  # Store true labels
            predicted_labels.extend(preds.cpu().numpy())  # Store predicted labels

    # Calculate Cohen's Kappa score
    kappa = cohen_kappa_score(true_labels, predicted_labels)
    print(f"Cohen's Kappa score: {kappa:.4f}")



# Function to calculate and plot the ROC curve
def plot_roc_curve(model, val_loader, device):
    true_labels, predicted_probs = predict_probabilities(model, val_loader, device)
    
    # Compute ROC curve and ROC AUC
    fpr, tpr, thresholds = roc_curve(true_labels, predicted_probs)
    auc = roc_auc_score(true_labels, predicted_probs)
    
    # Plot ROC curve
    plt.figure(figsize=(8, 6))
    plt.plot(fpr, tpr, label=f'ROC curve (AUC = {auc:.2f})')
    plt.plot([0, 1], [0, 1], color='gray', linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) Curve')
    plt.legend(loc="lower right")
    plt.show()

# Assuming you have a validation DataLoader, e.g., val_loader
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)  # Send model to device (e.g., GPU)

# Call the function to plot ROC curve
plot_roc_curve(model, val_loader, device)

true_labels, pred_probs = predict_probabilities(model, val_loader, device)
    
precision, recall, thresholds = precision_recall_curve(true_labels, pred_probs)
f1_scores = 2 * recall * precision / (recall + precision)
best_threshold = thresholds[f1_scores.argmax()]
print(best_threshold)


calculate_kappa(model, val_loader, device, best_threshold)



In [None]:
test_data = pd.read_csv('/Users/mario/Desktop/data/test_1.csv')

test_data.iloc[0]

In [None]:

# Load the test dataset
test_data = pd.read_csv('/Users/mario/Desktop/data/test_1.csv')

# Prepare test graphs with additional features
#test_graphs = prepare_test_data(test1_df, smiles_to_graph)

test_graphs = [
    smiles_to_graph(row['smiles'], label=None, additional_features=row.iloc[1:].tolist())
    for _, row in test_data.iterrows()
]

# Create DataLoader for test set (batch size of 32)
test_loader = DataLoader(test_graphs, batch_size=32, shuffle=False)

# Function to predict classes using the trained model
def predict_classes(model, test_loader, device):
    model.eval()  # Set model to evaluation mode
    predictions = []

    with torch.no_grad():
        for batch in test_loader:
            batch = batch.to(device)  # Send batch to device (e.g., GPU)
            logits = model(batch)  
            probs = torch.sigmoid(logits)  
            #print(probs)
            preds = (probs > best_threshold).float()  # Threshold at 0.5 for binary classification
            predictions.extend(preds.cpu().numpy())  # Convert to CPU and numpy

    return predictions

# Predict on test data
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)  # Send model to device (e.g., GPU)

predicted_classes = predict_classes(model, test_loader, device)

# Create a DataFrame with only smiles and predicted class (0 or 1)
test1_df_predictions = pd.DataFrame({
    'smiles': test1_df['smiles'],
    'class': predicted_classes
})

test1_df_predictions.to_csv('test1_predictions.csv', index=False)

print("Predictions saved to test1_predictions.csv")

In [None]:
class_counts = train_df['class'].value_counts()
print("Class distribution in training data:")
print(class_counts)

In [None]:


class_counts = test1_df_predictions['class'].value_counts()
print("Class distribution in test data:")
print(class_counts)

In [None]:

# Load the test dataset
test_data2 = pd.read_csv('/Users/mario/Desktop/data/test_2.csv')

# Prepare test graphs with additional features
#test_graphs = prepare_test_data(test1_df, smiles_to_graph)

test_graphs2 = [
    smiles_to_graph(row['smiles'], label=None, additional_features=row.iloc[2:].tolist())
    for _, row in test_data2.iterrows()
]

# Create DataLoader for test set (batch size of 32)
test_loader2 = DataLoader(test_graphs2, batch_size=32, shuffle=False)

# Function to predict classes using the trained model
def predict_classes(model, test_loader, device):
    model.eval()  # Set model to evaluation mode
    predictions = []

    with torch.no_grad():
        for batch in test_loader:
            batch = batch.to(device)  # Send batch to device (e.g., GPU)
            logits = model(batch)  # Forward pass
            probs = torch.sigmoid(logits)  # Convert logits to probabilities (sigmoid for binary classification)
            print(probs)
            preds = (probs > best_threshold).float()  # Threshold at 0.5 for binary classification
            predictions.extend(preds.cpu().numpy())  # Convert to CPU and numpy

    return predictions

# Predict on test data
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)  # Send model to device (e.g., GPU)

predicted_classes2 = predict_classes(model, test_loader2, device)

# Create a DataFrame with only smiles and predicted class (0 or 1)
test2_df_predictions = pd.DataFrame({
    'smiles': test_data2['smiles'],
    'class': predicted_classes2
})

# Save the predictions to a new CSV file with only smiles and predicted class
test2_df_predictions.to_csv('test2_predictions.csv', index=False)

print("Predictions saved to test2_predictions.csv")

In [24]:
class_counts = test2_df_predictions['class'].value_counts()
print("Class distribution in test2 data:")
print(class_counts)

Class distribution in test2 data:
class
0.0    279
1.0    199
Name: count, dtype: int64


In [204]:
import scipy.stats as stats

# Standard normal distribution CDF values for 0.05 and -0.05
probability = stats.norm.cdf(0.05) - stats.norm.cdf(-0.05)
print(probability)

0.03987761167674497


In [205]:
import pandas as pd
import torch
from torch_geometric.data import DataLoader

# Assuming model, device, and test_loader are defined
def predict_probabilities(model, test_loader, device):
    model.eval()  # Set model to evaluation mode
    probabilities = []

    with torch.no_grad():
        for batch in test_loader:
            batch = batch.to(device)  # Send batch to device (e.g., GPU)
            logits = model(batch)  # Forward pass
            probs = torch.sigmoid(logits)  # Convert logits to probabilities (sigmoid for binary classification)
            probabilities.extend(probs.cpu().numpy())  # Convert to CPU and numpy

    return probabilities

# Predict probabilities on test data
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)  # Send model to device (e.g., GPU)

probabilities = predict_probabilities(model, test_loader, device)

# Assuming you have the smiles in the test1_df (test set)
test1_df['probability'] = probabilities  # Add the probabilities to the DataFrame

# Sort by probabilities in descending order and get top 200 molecules
top_200_molecules = test1_df.sort_values(by='probability', ascending=False).head(200)

# Save the top 200 molecules with their smiles and probabilities to a CSV
top_200_molecules = top_200_molecules[['smiles', 'probability']]
top_200_molecules.to_csv('top_200_molecules_predictions.csv', index=False)

print("Top 200 molecules with highest probabilities saved to 'top_200_molecules_predictions.csv'")


Top 200 molecules with highest probabilities saved to 'top_200_molecules_predictions.csv'
