In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch_geometric
from torch_geometric.nn.acts import swish

from tqdm import tqdm
from copy import deepcopy
import json

from model.params_interpreter import string_to_object 
from model.alpha_encoder import Encoder

from model.gnn_3D.schnet import SchNet
from model.gnn_3D.dimenet_pp import DimeNetPlusPlus
from model.gnn_3D.spherenet import SphereNet

from model.train_functions import binary_ranking_regression_loop_alpha
from model.train_functions import evaluate_binary_ranking_regression_loop_alpha
from model.gnn_3D.train_functions import binary_ranking_regression_loop
from model.gnn_3D.train_functions import evaluate_binary_ranking_regression_loop

from model.datasets_samplers import Dataset_3D_GNN, MaskedGraphDataset, StereoBatchSampler, SiameseBatchSampler, Sample_Map_To_Positives, Sample_Map_To_Negatives, NegativeBatchSampler, SingleConformerBatchSampler


In [None]:
# Notebook-wide compute device: the GPU when PyTorch reports CUDA as available, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
def get_ranking_accuracies(results_df, mode = '<='):
    """Compute top-1 ranking accuracy within each SMILES_nostereo group over a sweep of margins.

    Rows of ``results_df`` are first averaged per ('ID', 'SMILES_nostereo') pair, giving one
    mean target and one mean prediction per ID. For every 'SMILES_nostereo' group the
    "difference" is the spread (max - min) of those per-ID targets. For each margin in
    ``np.arange(0.3, 2.1, 0.1)`` the groups whose difference (rounded to one decimal)
    satisfies ``difference <mode> margin`` are selected, and a group counts as correct when
    the ID with the lowest target is also the ID with the lowest mean prediction
    (ties resolve to the first occurrence, as with ``np.argmin``).

    Args:
        results_df: DataFrame with columns 'ID', 'SMILES_nostereo', 'targets' and 'outputs'.
        mode: how a group's difference is compared with each margin; one of '<=', '>=', '=='.

    Returns:
        Tuple ``(margins, accuracies, random_baseline_means, random_baseline_stds)`` of
        equal-length 1-D numpy arrays. ``random_baseline_means`` is constant 0.5 and
        ``random_baseline_stds`` is ``sqrt(n * 0.5 * 0.5) / n`` for the ``n`` groups selected
        at that margin (the baseline treats each group as a fair coin flip -- presumably
        because groups are enantiomer pairs; confirm against the dataset). Margins that
        select no group yield NaN for both the accuracy and the baseline std.

    Raises:
        ValueError: if ``mode`` is not one of '<=', '>=', '=='.
    """
    valid_modes = ('<=', '>=', '==')
    if mode not in valid_modes:
        raise ValueError("mode must be one of {}, got {!r}".format(valid_modes, mode))

    # Select the columns with a list: selecting several groupby columns with a bare tuple
    # (``[...]['targets', 'outputs']``) was removed in pandas 2.0.
    stereoisomers_df = (
        results_df.groupby(['ID', 'SMILES_nostereo'])[['targets', 'outputs']]
        .mean()
        .reset_index()
        .rename(columns = {'outputs': 'mean_predicted_score'})
    )

    # Spread of the per-ID targets inside each SMILES_nostereo group, broadcast back to its rows.
    targets_by_smiles = stereoisomers_df.groupby('SMILES_nostereo')['targets']
    stereoisomers_df['difference'] = targets_by_smiles.transform('max') - targets_by_smiles.transform('min')

    # Round once up front; both sides are rounded to one decimal so float noise in the
    # arange-generated margins does not flip a comparison.
    rounded_difference = np.round(stereoisomers_df['difference'], 1)
    compare = {
        '<=': rounded_difference.le,
        '>=': rounded_difference.ge,
        '==': rounded_difference.eq,
    }[mode]

    margins = np.arange(0.3, 2.1, 0.1)
    random_baseline_means = np.ones(len(margins)) * 0.5
    top_1_margins = []
    random_baseline_stds = []

    for margin in margins:
        subset = stereoisomers_df[compare(np.round(margin, 1))]

        # One boolean per SMILES_nostereo group: did the prediction pick the lowest-target ID?
        top_1 = np.array([
            np.argmin(np.asarray(group['targets'])) == np.argmin(np.asarray(group['mean_predicted_score']))
            for _, group in subset.groupby('SMILES_nostereo')
        ], dtype = bool)

        n_groups = len(top_1)
        if n_groups == 0:
            # Nothing to score at this margin: report "undefined" instead of dividing by zero.
            top_1_margins.append(np.nan)
            random_baseline_stds.append(np.nan)
            continue

        # sqrt(npq) is the std of the number of correct random guesses; divide by n for a fraction.
        random_baseline_stds.append(np.sqrt(n_groups * 0.5 * 0.5) / n_groups)
        top_1_margins.append(top_1.mean())

    return margins, np.array(top_1_margins), random_baseline_means, np.array(random_baseline_stds)

In [None]:
def get_docking_predictions_ChIRo(path_to_results = None, path_to_params_file = None, path_to_model_dict = None, mode = '<=', batch_size = 1000):
    """Return ChIRo test-set docking predictions together with their ranking accuracies.

    If a results CSV can be read from ``path_to_results`` it is used as-is. Otherwise the
    Encoder described by the params JSON is rebuilt, the weights at ``path_to_model_dict``
    are loaded, and the test set named by ``params['test_datafile']`` is evaluated.
    Freshly computed results are NOT written back to ``path_to_results``.

    Relies on the notebook-level ``device`` for model placement and evaluation.

    Args:
        path_to_results: path to a cached results CSV; ``None`` or an unreadable path
            triggers re-evaluation.
        path_to_params_file: path to the params JSON (only read when re-evaluating).
        path_to_model_dict: path to the saved model state dict (only read when re-evaluating).
        mode: comparison mode forwarded to ``get_ranking_accuracies``; the default '<='
            is the value this function previously hard-coded.
        batch_size: batch size for the test DataLoader and the evaluation loop; the
            default 1000 is the value this function previously hard-coded.

    Returns:
        Tuple ``(results_df, margins, ranking_accuracy, random_baseline_means,
        random_baseline_stds)``; the last four come from ``get_ranking_accuracies``.
    """
    try:
        results_df = pd.read_csv(str(path_to_results))
    except (OSError, ValueError):
        # Cache missing or unreadable (pandas' empty-data/parser errors derive from ValueError):
        # fall back to re-running the model. Anything else (e.g. KeyboardInterrupt) propagates.
        print('creating model...')
        best_state_dict = str(path_to_model_dict)

        with open(path_to_params_file) as f: # should contain path to params.json file
            params = json.load(f)

        layers_dict = deepcopy(params['layers_dict'])

        # convert strings to actual python objects/functions using pre-defined mapping
        activation_dict = {key: string_to_object[value] for key, value in params['activation_dict'].items()}

        num_node_features = 52
        num_edge_features = 14

        model = Encoder(
            F_z_list = params['F_z_list'], # dimension of latent space
            F_H = params['F_H'], # dimension of final node embeddings, after EConv and GAT layers
            F_H_embed = num_node_features, # dimension of initial node feature vector
            F_E_embed = num_edge_features, # dimension of initial edge feature vector
            F_H_EConv = params['F_H_EConv'], # dimension of node embedding after EConv layer
            layers_dict = layers_dict,
            activation_dict = activation_dict,
            GAT_N_heads = params['GAT_N_heads'],
            chiral_message_passing = params['chiral_message_passing'],
            CMP_EConv_MLP_hidden_sizes = params['CMP_EConv_MLP_hidden_sizes'],
            CMP_GAT_N_layers = params['CMP_GAT_N_layers'],
            CMP_GAT_N_heads = params['CMP_GAT_N_heads'],
            c_coefficient_normalization = params['c_coefficient_normalization'], # None, or one of ['softmax']
            encoder_reduction = params['encoder_reduction'], #mean or sum
            output_concatenation_mode = params['output_concatenation_mode'], # none (if contrastive), conformer, molecule, or z_alpha (if regression)
            EConv_bias = params['EConv_bias'],
            GAT_bias = params['GAT_bias'],
            encoder_biases = params['encoder_biases'],
            dropout = params['dropout'], # applied to hidden layers (not input/output layer) of Encoder MLPs, hidden layers (not input/output layer) of EConv MLP, and all GAT layers (using their dropout parameter)
            )

        # Load the weights onto whatever device the freshly built model lives on, then move it.
        model.load_state_dict(torch.load(best_state_dict, map_location=next(model.parameters()).device), strict=True)
        model.to(device)
        # Inference only: switch off dropout and other train-time behaviour. Harmless if the
        # evaluation loop already does this itself.
        model.eval()

        # NOTE: unpickling can execute arbitrary code -- only point 'test_datafile' at trusted files.
        test_dataframe = pd.read_pickle(params['test_datafile'])
        test_dataset = MaskedGraphDataset(test_dataframe,
                                            regression = 'top_score', # top_score, RS_label_binary, sign_rotation
                                            stereoMask = params['stereoMask'],
                                            mask_coordinates = params['mask_coordinates'],
                                            )

        # shuffle = False; the column assignments below presumably rely on outputs coming back
        # in dataframe row order -- confirm against the evaluation loop.
        test_loader = torch_geometric.data.DataLoader(test_dataset, num_workers = 0, batch_size = batch_size, shuffle = False)

        with torch.no_grad():
            targets, outputs = evaluate_binary_ranking_regression_loop_alpha(model, test_loader, device, batch_size = batch_size, dataset_size = len(test_dataset))

        results_df = test_dataframe[['ID', 'SMILES_nostereo', 'top_score']].copy()
        results_df['targets'] = targets
        results_df['outputs'] = outputs

    margins, ranking_accuracy, random_baseline_means, random_baseline_stds = get_ranking_accuracies(results_df, mode = mode)

    return results_df, margins, ranking_accuracy, random_baseline_means, random_baseline_stds

In [None]:
def get_docking_predictions_spherenet(path_to_results = None, path_to_params_file = None, path_to_model_dict = None, mode = '>=', batch_size = 1000):
    """Return SphereNet test-set docking predictions together with their ranking accuracies.

    If a results CSV can be read from ``path_to_results`` it is used as-is. Otherwise the
    SphereNet described by the params JSON is rebuilt, the weights at ``path_to_model_dict``
    are loaded, and the test set named by ``params['test_datafile']`` is evaluated.
    Freshly computed results are NOT written back to ``path_to_results``.

    Relies on the notebook-level ``device`` for model placement and evaluation.

    Args:
        path_to_results: path to a cached results CSV; ``None`` or an unreadable path
            triggers re-evaluation.
        path_to_params_file: path to the params JSON (only read when re-evaluating).
        path_to_model_dict: path to the saved model state dict (only read when re-evaluating).
        mode: comparison mode forwarded to ``get_ranking_accuracies``; the default '>='
            is the value this function previously hard-coded.
            NOTE(review): get_docking_predictions_ChIRo uses '<=' -- confirm which mode is
            intended before comparing the two models' accuracy curves.
        batch_size: batch size for the test DataLoader and the evaluation loop; the
            default 1000 is the value this function previously hard-coded.

    Returns:
        Tuple ``(results_df, margins, ranking_accuracy, random_baseline_means,
        random_baseline_stds)``; the last four come from ``get_ranking_accuracies``.
    """
    try:
        results_df = pd.read_csv(str(path_to_results))
    except (OSError, ValueError):
        # Cache missing or unreadable (pandas' empty-data/parser errors derive from ValueError):
        # fall back to re-running the model. Anything else (e.g. KeyboardInterrupt) propagates.
        print('creating model...')
        best_state_dict = str(path_to_model_dict)

        with open(path_to_params_file) as f: # should contain path to params.json file
            params = json.load(f)

        model = SphereNet(
                    energy_and_force = False,
                    cutoff = params['cutoff'],
                    num_layers = params['num_layers'],
                    hidden_channels = params['hidden_channels'],
                    out_channels = params['out_channels'],
                    int_emb_size = params['int_emb_size'],
                    basis_emb_size_dist = params['basis_emb_size_dist'],
                    basis_emb_size_angle = params['basis_emb_size_angle'],
                    basis_emb_size_torsion = params['basis_emb_size_torsion'],
                    out_emb_channels = params['out_emb_channels'],
                    num_spherical = params['num_spherical'],
                    num_radial = params['num_radial'],
                    envelope_exponent = params['envelope_exponent'],
                    num_before_skip = params['num_before_skip'],
                    num_after_skip = params['num_after_skip'],
                    num_output_layers = params['num_output_layers'],
                    act=swish,
                    output_init='GlorotOrthogonal',
                    use_node_features = True,
                    MLP_hidden_sizes = params['MLP_hidden_sizes'], # [] for contrastive
            )

        # Load the weights onto whatever device the freshly built model lives on, then move it.
        model.load_state_dict(torch.load(best_state_dict, map_location=next(model.parameters()).device), strict=True)
        model.to(device)
        # Inference only: switch off any train-time behaviour. Harmless if the evaluation
        # loop already does this itself.
        model.eval()

        # NOTE: unpickling can execute arbitrary code -- only point 'test_datafile' at trusted files.
        test_dataframe = pd.read_pickle(params['test_datafile'])
        test_dataset = MaskedGraphDataset(test_dataframe,
                                            regression = 'top_score', # top_score, RS_label_binary, sign_rotation
                                            stereoMask = params['stereoMask'],
                                            mask_coordinates = params['mask_coordinates'],
                                            )

        # shuffle = False; the column assignments below presumably rely on outputs coming back
        # in dataframe row order -- confirm against the evaluation loop.
        test_loader = torch_geometric.data.DataLoader(test_dataset, num_workers = 0, batch_size = batch_size, shuffle = False)

        with torch.no_grad():
            targets, outputs = evaluate_binary_ranking_regression_loop(model, test_loader, device, batch_size = batch_size, dataset_size = len(test_dataset))

        results_df = test_dataframe[['ID', 'SMILES_nostereo', 'top_score']].copy()
        results_df['targets'] = targets
        results_df['outputs'] = outputs

    margins, ranking_accuracy, random_baseline_means, random_baseline_stds = get_ranking_accuracies(results_df, mode = mode)

    return results_df, margins, ranking_accuracy, random_baseline_means, random_baseline_stds

In [None]:
# ChIRo: read the cached test-set results if the CSV is readable, otherwise re-evaluate the
# saved model, then compute ranking accuracy per margin.
# NOTE: the SphereNet cell further down assigns to these same five names, so the ChIRo
# values are only available until that cell runs.
(
    results_df,
    margins,
    ranking_accuracy,
    random_baseline_means,
    random_baseline_stds,
) = get_docking_predictions_ChIRo(
    path_to_results='paper_results/docking_experiment/ChIRo/best_model_test_results.csv',
    path_to_params_file='paper_results/docking_experiment/ChIRo/params_binary_ranking_ChIRo.json',
    path_to_model_dict='paper_results/docking_experiment/ChIRo/best_model.pt',
)

In [None]:
# SphereNet: read the cached test-set results if the CSV is readable, otherwise re-evaluate
# the saved model, then compute ranking accuracy per margin.
# NOTE: this rebinds the same five names the ChIRo cell above assigned.
(
    results_df,
    margins,
    ranking_accuracy,
    random_baseline_means,
    random_baseline_stds,
) = get_docking_predictions_spherenet(
    path_to_results='paper_results/docking_experiment/spherenet/best_model_test_results.csv',
    path_to_params_file='paper_results/docking_experiment/spherenet/params_binary_ranking_spherenet.json',
    path_to_model_dict='paper_results/docking_experiment/spherenet/best_model.pt',
)