In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
import numpy as np
import pandas as pd
from sklearn.metrics import cohen_kappa_score,accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

In [3]:
from rdkit import Chem, DataStructs
from rdkit.Chem import AllChem
from PyBioMed.PyMolecule.cats2d import CATS2D
from PyBioMed.PyMolecule import cats2d


In [4]:
def convert_to_graph(smiles_list, max_num_atoms=100):
    """Convert SMILES strings into zero-padded node-feature and adjacency arrays.

    Every molecule is parsed with RDKit, each atom is described by
    ``atom_feature`` and the adjacency matrix gets self-loops added
    (``A + I``). Both arrays are zero-padded up to ``max_num_atoms`` so all
    molecules share one shape.

    Args:
        smiles_list: Sequence of SMILES strings (surrounding whitespace is
            stripped before parsing).
        max_num_atoms: Padding size, i.e. the largest molecule (in atoms)
            that can be represented. Defaults to 100.

    Returns:
        A tuple ``(features, adj, new_smiles_list)`` where ``features`` has
        shape ``(len(smiles_list), max_num_atoms, 65)``, ``adj`` has shape
        ``(len(smiles_list), max_num_atoms, max_num_atoms)`` and
        ``new_smiles_list`` holds the input strings in their original order.

    Raises:
        ValueError: If a SMILES string cannot be parsed, or if a molecule has
            more than ``max_num_atoms`` atoms. Rows are never dropped
            silently because callers pair the output row-by-row with labels.
    """
    num_atom_features = 65  # length of the vector produced by atom_feature
    num_molecules = len(smiles_list)

    # Preallocating keeps the output correctly shaped even for empty input.
    features = np.zeros((num_molecules, max_num_atoms, num_atom_features))
    adj = np.zeros((num_molecules, max_num_atoms, max_num_atoms))
    new_smiles_list = []

    for index, smiles in enumerate(smiles_list):
        mol = Chem.MolFromSmiles(smiles.strip())
        if mol is None:
            raise ValueError(
                "Could not parse SMILES at index {0}: {1!r}".format(index, smiles)
            )

        adjacency = Chem.rdmolops.GetAdjacencyMatrix(mol)
        num_atoms = adjacency.shape[0]
        if num_atoms > max_num_atoms:
            raise ValueError(
                "Molecule is too big: SMILES at index {0} has {1} atoms "
                "(max_num_atoms={2})".format(index, num_atoms, max_num_atoms)
            )

        if num_atoms:
            # Real atoms occupy the top-left corner; the rest stays zero padding.
            features[index, :num_atoms, :] = [
                atom_feature(atom) for atom in mol.GetAtoms()
            ]
            adj[index, :num_atoms, :num_atoms] = adjacency + np.eye(num_atoms)
        new_smiles_list.append(smiles)

    return features, adj, new_smiles_list

def one_of_k_encoding_unk(x, allowable_set):
    """One-hot encode ``x`` over ``allowable_set``.

    A value that is not in ``allowable_set`` is treated as if it were the
    last element of the set, so the result always has exactly one match
    per occurrence of the (possibly substituted) value.

    Returns:
        A list of booleans, one per element of ``allowable_set``.
    """
    target = x if x in allowable_set else allowable_set[-1]
    return [target == candidate for candidate in allowable_set]

def one_of_k_encoding(x, allowable_set):
    """One-hot encode ``x`` over ``allowable_set``, rejecting unknown values.

    Returns:
        A list of booleans, one per element of ``allowable_set``.

    Raises:
        Exception: If ``x`` is not a member of ``allowable_set``.
    """
    if x in allowable_set:
        return [x == candidate for candidate in allowable_set]
    raise Exception("input {0} not in allowable set{1}:".format(x, allowable_set))

def atom_feature(atom):
    """Build the per-atom feature vector used as node input.

    The vector concatenates, in this order:
      * element symbol one-hot over 40 symbols (symbols outside the list are
        mapped onto the last entry, 'Pb'),
      * degree one-hot over 0..5,
      * total hydrogen count one-hot over 0..4,
      * total valence one-hot over 0..6,
      * the aromaticity flag,
      * six ring-membership flags from ``get_ring_info``.

    That is 40 + 6 + 5 + 7 + 1 + 6 = 65 entries.

    Raises:
        Exception: Propagated from ``one_of_k_encoding`` when the degree,
            hydrogen count or valence falls outside its allowed range.
    """
    element_symbols = [
        'C', 'N', 'O', 'S', 'F', 'H', 'Si', 'P', 'Cl', 'Br',
        'Li', 'Na', 'K', 'Mg', 'Ca', 'Fe', 'As', 'Al', 'I', 'B',
        'V', 'Tl', 'Sb', 'Sn', 'Ag', 'Pd', 'Co', 'Se', 'Ti', 'Zn',
        'Ge', 'Cu', 'Au', 'Ni', 'Cd', 'Mn', 'Cr', 'Pt', 'Hg', 'Pb',
    ]

    encoding = one_of_k_encoding_unk(atom.GetSymbol(), element_symbols)
    encoding += one_of_k_encoding(atom.GetDegree(), [0, 1, 2, 3, 4, 5])
    encoding += one_of_k_encoding(atom.GetTotalNumHs(), [0, 1, 2, 3, 4])
    encoding += one_of_k_encoding(atom.GetTotalValence(), [0, 1, 2, 3, 4, 5, 6])
    encoding.append(atom.GetIsAromatic())
    encoding += get_ring_info(atom)
    return np.array(encoding)

def get_ring_info(atom):
    """Return six 0/1 flags telling whether ``atom`` sits in a ring of size 3..8.

    The flag at position ``k`` corresponds to ring size ``k + 3`` and is
    taken from ``atom.IsInRingSize``.
    """
    return [1 if atom.IsInRingSize(ring_size) else 0 for ring_size in range(3, 9)]

GNN model

In [5]:

class GraphNeuralNetwork(nn.Module):
    """Graph convolution stack followed by a dense head.

    Inputs are a batch of node-feature matrices ``X`` with shape
    ``(batch, nodes, features)`` and adjacency matrices ``A`` with shape
    ``(batch, nodes, nodes)``. Node embeddings are averaged over the node
    axis and fed through fully connected blocks to produce ``num_classes``
    raw outputs (logits) per graph.
    """

    def __init__(self, num_features, hidden_channels, num_gcn_layers,
                 dnn_hidden_nodes, num_dnn_layers, dropout_rate, l2_lambda, num_classes=1):
        """Create the layers.

        Args:
            num_features: Size of each node's input feature vector.
            hidden_channels: Width of every graph convolution block.
            num_gcn_layers: Number of graph convolution blocks; at least one
                block is always created.
            dnn_hidden_nodes: Width of every dense block.
            num_dnn_layers: Number of dense blocks after pooling.
            dropout_rate: Dropout probability used in every block.
            l2_lambda: Stored on the instance; not used by this class itself.
            num_classes: Number of outputs of the final linear layer.
        """
        super().__init__()

        # Regularization settings kept as plain attributes.
        self.dropout_rate = dropout_rate
        self.l2_lambda = l2_lambda

        # Graph convolution blocks: one input block plus (num_gcn_layers - 1)
        # hidden-to-hidden blocks (a non-positive count yields none).
        gcn_widths = [num_features, hidden_channels]
        gcn_widths += [hidden_channels] * (num_gcn_layers - 1)
        self.gcn_layers = nn.ModuleList([
            self._make_block(fan_in, fan_out, dropout_rate)
            for fan_in, fan_out in zip(gcn_widths[:-1], gcn_widths[1:])
        ])

        # Dense blocks applied to the pooled graph embedding.
        dnn_widths = [hidden_channels] + [dnn_hidden_nodes] * num_dnn_layers
        self.dnn_layers = nn.ModuleList([
            self._make_block(fan_in, fan_out, dropout_rate)
            for fan_in, fan_out in zip(dnn_widths[:-1], dnn_widths[1:])
        ])

        # Final projection from the last dense width to the outputs.
        self.output_layer = nn.Linear(dnn_widths[-1], num_classes)

    @staticmethod
    def _make_block(in_features, out_features, dropout_rate):
        """Return a Linear -> ReLU -> BatchNorm1d -> Dropout block."""
        return nn.Sequential(
            nn.Linear(in_features, out_features),
            nn.ReLU(),
            nn.BatchNorm1d(out_features, momentum=0.1),
            nn.Dropout(dropout_rate)
        )

    def graph_convolution(self, X, A):
        """Run every graph convolution block on ``X`` using adjacency ``A``.

        The adjacency is normalized symmetrically, D^-1/2 * A * D^-1/2, with
        a small epsilon added to the degrees so that all-zero rows do not
        divide by zero.
        """
        node_degrees = A.sum(dim=2)
        inv_sqrt_degree = (node_degrees + 1e-7).pow(-0.5)
        A_norm = A * inv_sqrt_degree.unsqueeze(-1) * inv_sqrt_degree.unsqueeze(-2)

        for linear, activation, batch_norm, dropout in self.gcn_layers:
            # Per-node linear transform, then neighbourhood aggregation.
            X = torch.bmm(A_norm, linear(X))
            X = activation(X)
            # BatchNorm1d normalizes dim 1, so channels are moved there and back.
            X = batch_norm(X.transpose(1, 2)).transpose(1, 2)
            X = dropout(X)

        return X

    def forward(self, X, A):
        """Return the raw outputs for a batch of graphs."""
        node_embeddings = self.graph_convolution(X, A)

        # Global mean pooling over the node axis (every row takes part).
        hidden = node_embeddings.mean(dim=1)

        # Each dense block applies Linear, ReLU, BatchNorm and Dropout in order.
        for block in self.dnn_layers:
            hidden = block(hidden)

        return self.output_layer(hidden)


class GraphDataset(Dataset):
    """In-memory dataset of (node features, adjacency, label) triples.

    All three inputs are copied into float32 tensors when the dataset is
    built; labels are stored as a column of shape ``(N, 1)``.
    """

    def __init__(self, X_data, A_data, y_data):
        """Convert the given array-likes to float32 tensors."""
        float32 = torch.float32
        self.X = torch.tensor(X_data, dtype=float32)
        self.A = torch.tensor(A_data, dtype=float32)
        labels = torch.tensor(y_data, dtype=float32)
        self.y = labels.view(-1, 1)

    def __len__(self):
        """Number of samples, taken from the first axis of ``X``."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(X, A, y)`` entries at position ``idx``."""
        return tuple(tensor[idx] for tensor in (self.X, self.A, self.y))


def train_graph_neural_network(
    model, train_loader,test_loader, val_loader=None, 
    epochs=100, 
    learning_rate=0.001, 
    weight_decay=1e-5, 
    patience=10, 
    save_path="gnn_model.pth"
):
    """Train ``model`` with AdamW and binary cross-entropy on logits.

    Args:
        model: Network called as ``model(X_batch, A_batch)`` and returning
            one logit per sample.
        train_loader: Iterable of ``(X_batch, A_batch, y_batch)`` used for
            the gradient updates.
        test_loader: Loader passed to ``evaluate_graph_neural_network``
            after every epoch; that call prints Cohen's kappa.
        val_loader: Optional validation loader. Only when it is given are
            the validation loss computed, the learning-rate scheduler
            stepped, the best checkpoint written and early stopping applied.
        epochs: Maximum number of epochs.
        learning_rate: Initial learning rate for AdamW.
        weight_decay: Weight decay for AdamW.
        patience: Number of consecutive epochs without a validation-loss
            improvement after which training stops.
        save_path: File that receives the ``state_dict`` of the best
            validation epoch.

    Returns:
        The same ``model`` object, holding the weights of the last epoch
        that ran (not necessarily the checkpoint stored at ``save_path``).
    """
    optimizer = optim.AdamW(
        model.parameters(),
        lr=learning_rate,
        weight_decay=weight_decay
    )

    # Halve the learning rate when the validation loss stops improving.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.5, patience=5
    )

    criterion = nn.BCEWithLogitsLoss()

    # Early-stopping bookkeeping.
    best_val_loss = float('inf')
    patience_counter = 0

    for epoch in range(epochs):
        # Training phase
        model.train()
        for X_batch, A_batch, y_batch in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
            optimizer.zero_grad()
            outputs = model(X_batch, A_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()

        # Validation phase
        stop_early = False
        if val_loader:
            model.eval()
            val_losses = []

            with torch.no_grad():
                for X_batch, A_batch, y_batch in val_loader:
                    outputs = model(X_batch, A_batch)
                    val_losses.append(criterion(outputs, y_batch).item())

            val_loss = np.mean(val_losses)
            print(f"Epoch {epoch+1}: Val Loss {val_loss:.4f}")

            if val_loss < best_val_loss:
                # New best epoch: checkpoint it and reset the patience budget.
                best_val_loss = val_loss
                patience_counter = 0
                torch.save(model.state_dict(), save_path)
            else:
                patience_counter += 1

            scheduler.step(val_loss)
            stop_early = patience_counter >= patience

        # Per-epoch evaluation; it switches the model to eval mode, which
        # model.train() undoes at the start of the next epoch.
        evaluate_graph_neural_network(model, test_loader)

        if stop_early:
            print(f"Early stopping at epoch {epoch+1}")
            break

    return model


def evaluate_graph_neural_network(model, dataloader):
    """Compute binary classification metrics for ``model`` on ``dataloader``.

    The model is put in eval mode, the sigmoid of its outputs is
    thresholded at 0.5, and the predictions are compared with the labels
    yielded by the loader. Cohen's kappa is also printed.

    Args:
        model: Network called as ``model(X_batch, A_batch)``; expected to
            return one logit per sample.
        dataloader: Iterable of ``(X_batch, A_batch, y_batch)``.

    Returns:
        dict with the keys ``'accuracy'``, ``'precision'``, ``'recall'``,
        ``'f1_score'`` and ``'Cohens_kappa'``.
    """
    model.eval()
    all_preds = []
    all_true = []

    with torch.no_grad():
        for X_batch, A_batch, y_batch in dataloader:
            outputs = model(X_batch, A_batch)
            # reshape(-1) keeps a 1-D array even when the batch holds a single
            # sample; squeeze() would give a 0-d array that extend() rejects.
            probabilities = torch.sigmoid(outputs).numpy().reshape(-1)
            all_preds.extend((probabilities > 0.5).astype(int))
            all_true.extend(y_batch.numpy().reshape(-1))

    # Compute metrics
    accuracy = accuracy_score(all_true, all_preds)
    precision = precision_score(all_true, all_preds)
    recall = recall_score(all_true, all_preds)
    f1 = f1_score(all_true, all_preds)
    kappa = cohen_kappa_score(all_true, all_preds)

    print(f"Cohen's Kappa: {kappa:.4f}")
    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'Cohens_kappa': kappa
    }


TRAINING

In [6]:
# Load the two splits; note that `data_test` is read from the valid_20 file.
data_train, data_test = (
    pd.read_csv(csv_path)
    for csv_path in ("train/train_80.csv", "train/valid_20.csv")
)

In [7]:

# SMILES strings and class labels for both splits.
smiles_list, y_train = data_train['smiles'].tolist(), data_train['class'].values
smiles_list_test, y_test = data_test['smiles'].tolist(), data_test['class'].values

# Convert the SMILES strings to padded feature / adjacency arrays.
X_train, A_train, smiles_list = convert_to_graph(smiles_list)
X_test, A_test, smiles_list_test = convert_to_graph(smiles_list_test)

# Model hyperparameters; the input width is read off the feature array.
num_features = X_train.shape[2]
gnn_params = dict(
    num_features=num_features,
    hidden_channels=128,
    num_gcn_layers=4,
    dnn_hidden_nodes=512,
    num_dnn_layers=2,
    dropout_rate=0.33356257977269954,
    l2_lambda=0.0007517360053320633,
)

# Datasets and loaders (only the training loader is shuffled).
train_dataset = GraphDataset(X_train, A_train, y_train)
test_dataset = GraphDataset(X_test, A_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64)

# Train the model.
# NOTE(review): val_loader is None here, so the validation branch of
# train_graph_neural_network (checkpointing to save_path, patience) never runs.
model = GraphNeuralNetwork(**gnn_params)
model = train_graph_neural_network(
    model,
    train_loader,
    test_loader,
    val_loader=None,
    epochs=200,
    learning_rate=0.0001,
    weight_decay=1e-5,
    patience=2,
    save_path="gnn_model.pth",
)

# Final evaluation on the held-out loader.
evaluate_graph_neural_network(model, test_loader)


KeyboardInterrupt: 

In [None]:
# Write the state_dict of the in-memory `model` to "gnn_model_final.pth".
# NOTE(review): this cell has no execution count in the saved notebook, yet
# later cells load this file - make sure it is run before them.
torch.save(model.state_dict(), "gnn_model_final.pth")

In [8]:
# 1. Instantiate the model with its hyperparameters and load the saved weights.
model = GraphNeuralNetwork(**gnn_params)
model.load_state_dict(torch.load("gnn_model_final.pth"))
model.eval()

# 2. Read the SMILES to score and convert them to graph arrays.
data1 = pd.read_csv("train/valid_20.csv")
new_smiles_list = data1["smiles"].tolist()

colonne1 = data1['smiles']

X_new, A_new, _ = convert_to_graph(new_smiles_list)

new_dataset = GraphDataset(X_new, A_new, np.zeros(len(X_new)))  # dummy labels
new_loader = DataLoader(new_dataset, batch_size=64, shuffle=False)

# 3. Predict
all_preds = []
with torch.no_grad():
    for X_batch, A_batch, _ in new_loader:
        outputs = model(X_batch, A_batch)
        # reshape(-1) stays 1-D even for a final batch of one sample, where
        # squeeze() would produce a 0-d array that extend() cannot iterate.
        preds = torch.sigmoid(outputs).reshape(-1).numpy()
        all_preds.extend(preds)

# The concat below pairs rows by position, so the lengths must agree.
assert len(all_preds) == len(colonne1), "prediction count does not match input rows"

serie = pd.Series(all_preds, name='predictions')
merged_df = pd.concat([colonne1, serie], axis=1)
print(merged_df.head(30))

# 4. Save the results
merged_df.to_csv("predictions_GNN.csv", index=False)

new_data = pd.read_csv("data/test_1.csv")


                                               smiles  predictions
0   O=C(O)C(Cc1ccccc1)N1CCC(CN2CCC(Oc3ccc(CO)c(Cl)...     0.005069
1   CN1CCN(CCCn2nc(C3=C(c4cn(-c5ccc6ccccc6c5)c5ccc...     0.978255
2   OCC1(N2CCN(C3CCc4ccc(OCc5noc(-c6ccc(Cl)cc6)n5)...     0.978666
3   CCn1c(=O)oc2ccc(-c3ccc(CC(C#N)NC(=O)C4CNCCCO4)...     0.004499
4   O=C(C1CC1c1ccc(C(F)(F)F)cc1)N1CCN(S(=O)(=O)c2c...     0.998825
5   O=C(O)CCCCOc1ccc2ncc(F)c(CCC34CCC(NCc5ccc6c(n5...     0.287143
6   COc1nc(N2CC3C(=O)N(C)C(=N)NC3(c3ccc(F)cc3F)C2)...     0.645876
7     CNC(=O)c1ccc(-c2ccc3c(c2)CCN(CCN2CCCC2)C3=O)cc1     0.341835
8     Nc1nc(-c2ccc(F)cc2)cn1CC(O)c1ccc(C(F)(F)F)cc1Cl     0.581977
9         CCCNC(=O)C1c2ccccc2C(=O)N1CC(C)c1ccc(Cl)cc1     0.215037
10         Cn1cc(C2CC3CSC(N)=NC3(c3ccc(F)cc3F)CO2)cn1     0.370659
11    CCCCCN(CCC12CC3CC(CC(C3)C1)C2)C(=O)NCCCc1ccncc1     0.908153
12  O=[N+]([O-])c1ccc(CCN2CCN(CCc3ccc([N+](=O)[O-]...     0.999863
13  Cn1c(SCCCN2CCC3(CC3c3ccc(C(F)(F)F)cc3)C2)nnc1-...     0.99

In [14]:
import torch
from torch.utils.data import DataLoader
import pandas as pd
import numpy as np

# 1. Load the trained model
model = GraphNeuralNetwork(**gnn_params)
model.load_state_dict(torch.load("gnn_model_final.pth"))  # load the weights
model.eval()

# 2. Load the new SMILES to predict
# NOTE(review): absolute, machine-specific path - replace with a path relative
# to the project once the intended location of test_1.csv is confirmed.
new_data = pd.read_csv("/Users/rayanedakhlaoui/Desktop/VivaAfricAI/Final/test/test_1.csv")
new_smiles_list = new_data["smiles"].tolist()  # the column must be named "smiles"

# 3. Convert the SMILES to graphs
X_new, A_new, _ = convert_to_graph(new_smiles_list)
new_dataset = GraphDataset(X_new, A_new, np.zeros(len(X_new)))  # dummy labels
new_loader = DataLoader(new_dataset, batch_size=64, shuffle=False)

# 4. Predict
all_preds = []
with torch.no_grad():
    for X_batch, A_batch, _ in new_loader:
        outputs = model(X_batch, A_batch)
        # reshape(-1) stays 1-D even for a final batch of one sample, where
        # squeeze() would produce a 0-d array that extend() cannot iterate.
        preds = torch.sigmoid(outputs).reshape(-1).numpy()
        all_preds.extend(preds)

# The concat below pairs rows by position, so the lengths must agree.
assert len(all_preds) == len(new_data), "prediction count does not match input rows"

# 5. Merge and save (this overwrites any existing predictions_GNN.csv)
serie = pd.Series(all_preds, name='predictions')
merged_df = pd.concat([new_data["smiles"], serie], axis=1)
merged_df.to_csv("predictions_GNN.csv", index=False)

print("Fichier predictions_GNN.csv généré avec succès.")


Fichier predictions_GNN.csv généré avec succès.


In [12]:
# Sanity check: (rows, columns) of the table currently bound to `new_data`.
new_data.shape

(750, 4296)

In [13]:
# Sanity check: re-read the exported predictions file and show its (rows, columns).
pd.read_csv("predictions_GNN.csv").shape

(750, 2)