In [None]:
import pandas as pd
import gzip
from sklearn.neural_network import MLPClassifier
import numpy as np
from sklearn.ensemble import VotingClassifier
import xgboost as xgb
from sklearn.model_selection import cross_validate, StratifiedKFold
from sklearn.model_selection import RepeatedStratifiedKFold
from statistics import mean
import time
from sklearn.model_selection import train_test_split, cross_val_score, KFold, GridSearchCV
from sklearn.preprocessing import StandardScaler,label_binarize, MinMaxScaler
from sklearn.preprocessing import LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, make_scorer,roc_curve, auc
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import Lasso,LinearRegression, LassoCV
import matplotlib.ticker as ticker
import seaborn as sns
from tqdm import tqdm
from xgboost import XGBClassifier
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.linear_model import Ridge
import matplotlib.pyplot as plt
from sklearn.svm import SVC
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, precision_score, recall_score
from sklearn.neighbors import KNeighborsClassifier
import shap
import warnings
from sklearn.feature_selection import SelectKBest, RFE
from sklearn.feature_selection import f_regression
from sklearn.decomposition import PCA
from rdkit import Chem
from rdkit.Chem import MACCSkeys
from PyFingerprint.fingerprint import get_fingerprint, get_fingerprints
# Suppress RDKit deprecation warnings
from rdkit import RDLogger
RDLogger.DisableLog('rdApp.warning')
import glob
import re

In [None]:
def calculate_fingerprints(df, smiles_col, fp_type):
    """
    Calculates fingerprints of a given type and adds them to the DataFrame.

    Each SMILES string is passed to ``get_fingerprint`` (imported from
    PyFingerprint). The resulting vector is expanded into one column per
    position, named ``"<fp_type>_fp_<i>"``. The SMILES column is removed and
    the 'IC50' column is moved to the last position so callers can treat the
    last column as the target. The input DataFrame is not modified.

    Parameters:
    - df (pd.DataFrame): The input DataFrame with a SMILES column and an
      'IC50' column.
    - smiles_col (str): Name of the column containing SMILES strings.
    - fp_type (str): The fingerprint type to calculate (from predefined lists).

    Returns:
    - pd.DataFrame: A copy of the input with the SMILES column dropped, the
      fingerprint columns added, and 'IC50' as the last column. Rows whose
      fingerprint could not be computed hold NaN in every fingerprint column;
      if no fingerprint could be computed at all, no fingerprint columns are
      added.

    Raises:
    - ValueError: If ``fp_type`` is not one of the supported fingerprint types.
    - KeyError: If ``smiles_col`` or 'IC50' is not a column of ``df``.
    """
    # Checking if the fingerprint type is valid
    valid_types = ['standard', 'extended', 'graph', 'maccs', 'pubchem', 'estate',
                   'hybridization', 'lingo', 'klekota-roth', 'shortestpath',
                   'cdk-substructure', 'rdkit', 'morgan', 'rdk-maccs',
                   'topological-torsion', 'avalon', 'atom-pair', 'mol2vec']

    if fp_type not in valid_types:
        raise ValueError(f"Invalid fingerprint type '{fp_type}'. Choose from {valid_types}.")

    # a helper function for fingerprint calculation
    def calculate_fp(smi):
        # Best-effort: a molecule that cannot be fingerprinted is reported and
        # yields None instead of aborting the whole batch.
        try:
            fp = get_fingerprint(smi, fp_type)
            # Convert to list for easier addition to DataFrame
            return fp.to_numpy().tolist() if hasattr(fp, 'to_numpy') else None
        except Exception as e:
            print(f"Error generating fingerprint for SMILES '{smi}': {e}")
            return None

    # Calculates one fingerprint (or None on failure) per input row
    rows = [calculate_fp(smi) for smi in df[smiles_col]]

    # Width is taken from the first successful fingerprint; failed rows are
    # padded with NaN so the DataFrame constructor never sees a bare None
    # mixed in with the lists.
    width = next((len(row) for row in rows if row is not None), 0)
    if width:
        missing_row = [float("nan")] * width
        filled_rows = [row if row is not None else missing_row for row in rows]
        # Splits fingerprint lists into separate columns
        fingerprint_df = pd.DataFrame(filled_rows, index=df.index)
        fingerprint_df.columns = [f"{fp_type}_fp_{i}" for i in range(fingerprint_df.shape[1])]
    else:
        # Nothing could be computed (or df is empty): add no fingerprint columns
        fingerprint_df = pd.DataFrame(index=df.index)

    # Combines with the original DataFrame
    return_df = pd.concat([df, fingerprint_df], axis=1)

    # Moves the target to the last position
    ic50_column = return_df.pop('IC50')
    return_df['IC50'] = ic50_column

    # Removes the SMILES column (the one named by the caller, not a fixed name)
    return_df = return_df.drop(columns=[smiles_col])
    return return_df

def model_pipeline(df_smiles, smiles_col, perform_feature_selection=False, perform_pca=False):
    """
    Performs the full model development and evaluation pipeline for all fingerprints.

    For every fingerprint type the function: computes the fingerprints with
    ``calculate_fingerprints``; label-encodes the last column ('IC50') as the
    target; makes an 80/20 train-test split (random_state=42); min-max scales
    the features using statistics from the training split only; optionally
    applies Lasso feature selection and/or PCA (both fitted on the training
    split only); then trains each model and evaluates it on the test split.

    Parameters:
    - df_smiles (pd.DataFrame): The DataFrame containing the SMILES strings. (data resulting from data preparation process in
    previous code file)
    - smiles_col (str): The column containing SMILES strings.
    - perform_feature_selection (bool): Whether to perform feature selection using Lasso. Default is False.
    - perform_pca (bool): Whether to project the features onto the principal components that
    explain 99% of the training variance. Default is False.

    Returns:
    - None: This function prints the evaluation results for each fingerprint and model, and writes
    them to an Excel file in the working directory: "results_features.xlsx" when feature selection
    is enabled, otherwise "results_pca.xlsx" when PCA is enabled, otherwise "results_80.xlsx".

    Raises:
    - ValueError: If the target column does not contain exactly two classes (all metrics
    computed here are binary-classification metrics).
    """

    results = []

    # Recorded in every result row so the output tells which preprocessing was used
    feature_flag = bool(perform_feature_selection)
    pca_flag = bool(perform_pca)

    # Fraction of training variance the retained principal components must explain
    pca_variance_threshold = 0.99

    # List of fingerprint types
    fingerprint_types = ['standard', 'extended', 'graph', 'maccs', 'pubchem', 'estate',
                   'hybridization', 'lingo', 'klekota-roth', 'shortestpath',
                   'cdk-substructure', 'rdkit', 'morgan', 'rdk-maccs',
                   'topological-torsion', 'avalon', 'atom-pair', 'mol2vec']

    # Defines parameter grids for each model
    # NOTE: the "SVM" grid is currently not searched; SVM is trained with a fixed kernel below.
    param_grids = {
        "RandomForest": {
             "n_estimators": [150]
        },
         "DecisionTree": {
             "max_depth": [5, 10, 15, 20, None],
             "min_samples_split": [2, 5, 10],
             "min_samples_leaf": [1, 2, 4],
             "max_features": ['sqrt', 'log2', None]
         },
         "SVM": {
             "C": [0.1, 1, 10, 100],
             "kernel": ["linear", "rbf", "poly", "sigmoid"],
             "gamma": ["scale", "auto"]
         },
         "XGBoost": {
             "n_estimators": [150]
         },
         "KNN": {
             "n_neighbors": [3, 5, 7, 10],
             "weights": ['uniform', 'distance'],
             "metric": ['euclidean', 'manhattan', 'chebyshev']
         },
         "MLP": {
         "hidden_layer_sizes": [(100, 100)],
         "activation": ["tanh"],
         "solver": ["sgd"],
         "alpha": [0.0001],
         "learning_rate": ["adaptive"]
         }
    }

    # Defines models for each algorithm
    models = {
        "RandomForest": RandomForestClassifier(random_state=42),
        "DecisionTree": DecisionTreeClassifier(random_state=42),
        "SVM": SVC(probability=True, random_state=42),
        "XGBoost": XGBClassifier(eval_metric='logloss', random_state=42),
        "KNN": KNeighborsClassifier(),
        "MLP": MLPClassifier(random_state=42, max_iter=10000)
    }

    # Function to calculate and print metrics
    def evaluate_and_print_results(model_name, y_test, y_pred, y_probs, fing_type, feature_selection, pca, results, hyperparams=None):
        # labels=[0, 1] forces a 2x2 matrix even when the test split or the
        # predictions happen to contain a single class, so the 4-way unpack is safe.
        tn, fp, fn, tp = confusion_matrix(y_test, y_pred, labels=[0, 1]).ravel()
        accuracy = (tp + tn) / (tp + tn + fp + fn)
        precision = tp / (tp + fp) if (tp + fp) != 0 else 0
        sensitivity = tp / (tp + fn) if (tp + fn) != 0 else 0
        specificity = tn / (tn + fp) if (tn + fp) != 0 else 0

        # Compute ROC and AUC
        y_test_bin = label_binarize(y_test, classes=[0, 1]).ravel()
        fpr, tpr, _ = roc_curve(y_test_bin, y_probs)
        auc_score = auc(fpr, tpr)

        # Append results to the list
        results.append({
            "Model": model_name,
            "fingerprint type": fing_type,
            "Best Parameters": hyperparams,
            "feature_selection": feature_selection,
            "PCA": pca,
            "Accuracy": accuracy,
            "Sensitivity (Recall)": sensitivity,
            "Specificity": specificity,
            "Precision": precision,
            "AUC score": auc_score,
            "Confusion Matrix": f"TP={tp}, TN={tn}, FP={fp}, FN={fn}"
        })

        print(f"Results for {model_name}")
        if hyperparams:
            print(f"Hyperparameters: {hyperparams}")
        print(f"Accuracy: {accuracy:.4f}")
        print(f"Sensitivity (Recall): {sensitivity:.4f}")
        print(f"Specificity: {specificity:.4f}")
        print(f"Precision: {precision:.4f}")
        print(f"AUC: {auc_score:.4f}")
        print(f"Confusion Matrix: TP={tp}, TN={tn}, FP={fp}, FN={fn}")
        print("-" * 40)

        return results

    # Loop through all fingerprints
    for fp_type in fingerprint_types:
        print(f"\nProcessing fingerprint: {fp_type}")

        # Step 1: Extract fingerprints for the current type
        updated_df = calculate_fingerprints(df_smiles, smiles_col, fp_type)

        # Checking for missing values (total number of missing cells)
        print("Missing values: ", updated_df.isna().sum().sum())
        print(updated_df['IC50'].value_counts())

        # The target is the last column; everything before it is a feature
        X = updated_df.iloc[:, :-1]
        y = updated_df.iloc[:, -1]

        # Encode the class labels as integers 0..n_classes-1
        label_encoder = LabelEncoder()
        y = label_encoder.fit_transform(y)

        # Fail fast: every metric below assumes the encoded classes are exactly {0, 1}
        if len(label_encoder.classes_) != 2:
            raise ValueError(
                f"Expected a binary target in 'IC50', found {len(label_encoder.classes_)} classes."
            )

        # Step 2: Train-test split
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

        # Scale the data (fit on the training split only to avoid leakage)
        scaler = MinMaxScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        # Performing Lasso feature selection if requested
        if perform_feature_selection:
            print("Performing feature selection with Lasso...")
            lasso = LassoCV(cv=5, random_state=42, max_iter=100000).fit(X_train_scaled, y_train)
            selected_features_mask = lasso.coef_ != 0
            selected_feature_names = X_train.columns[selected_features_mask]
            print("Number of selected features:", len(selected_feature_names))
            print("Selected features:", list(selected_feature_names))

            if selected_features_mask.any():
                X_train_scaled = X_train_scaled[:, selected_features_mask]
                X_test_scaled = X_test_scaled[:, selected_features_mask]
            else:
                # A zero-column matrix would make every model fail; keep the full feature set
                print("Lasso selected no features; keeping all features.")

        if perform_pca:
            # Fit a full PCA on the training set to inspect the variance spectrum
            pca = PCA()
            pca.fit(X_train_scaled)

            # Cumulative explained variance ratio
            explained_variance_ratio = np.cumsum(pca.explained_variance_ratio_)

            # Smallest number of principal components reaching the variance threshold (99%)
            num_components = np.argmax(explained_variance_ratio >= pca_variance_threshold) + 1

            # Project training and test data onto the selected components
            pca = PCA(n_components=num_components)
            X_train_scaled = pca.fit_transform(X_train_scaled)
            X_test_scaled = pca.transform(X_test_scaled)

        # Step 3: Model training and evaluation
        for model_name, model in models.items():
            print(f"\nTraining {model_name} model")

            if model_name == "SVM":
                # Train SVM with fixed hyperparameters
                print("Using fixed hyperparameters for SVM...")
                fixed_params = {"kernel": "rbf"}
                model.set_params(**fixed_params)
                model.fit(X_train_scaled, y_train)
                y_pred = model.predict(X_test_scaled)
                y_probs = model.predict_proba(X_test_scaled)[:, 1]
                results = evaluate_and_print_results(model_name, y_test, y_pred, y_probs, fp_type, feature_flag, pca_flag, results, fixed_params)
            else:
                # Set up GridSearchCV for the current model
                grid_search = GridSearchCV(model, param_grid=param_grids[model_name], cv=5, scoring='accuracy', n_jobs=-1, verbose=1)
                grid_search.fit(X_train_scaled, y_train)

                # Get the best model and best parameters from GridSearchCV
                best_model = grid_search.best_estimator_
                best_params = grid_search.best_params_

                # Evaluate the model on the test set
                y_pred = best_model.predict(X_test_scaled)
                y_probs = best_model.predict_proba(X_test_scaled)[:, 1]
                results = evaluate_and_print_results(model_name, y_test, y_pred, y_probs, fp_type, feature_flag, pca_flag, results, best_params)

    # Persist all collected metrics; the file name reflects the preprocessing used
    results_df = pd.DataFrame(results)
    if perform_feature_selection:
        results_df.to_excel("results_features.xlsx", index=False)
    elif perform_pca:
        results_df.to_excel("results_pca.xlsx", index=False)
    else:
        results_df.to_excel("results_80.xlsx", index=False)