Import libraries

In [1]:
import os 
import collections
import numpy as np
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt
from scipy.stats import chi2_contingency
from utils import load_pickle, pickle_data

Settings

In [2]:
data_folder = "/data/rbg/users/klingmin/projects/MS_processing/data/"
results_folder = "/data/rbg/users/klingmin/projects/ML_MS_analysis/FP_prediction/mist/best_ls_results"
analysis_folder = "./cache/ls_plots"
if not os.path.exists(analysis_folder): os.makedirs(analysis_folder)

dataset_mapping = {"canopus": "Canopus", 
                   "massspecgym": "MassSpecGym",
                   "nist2023": "NIST2023"}

model_mapping = {"binned": "Binned + MLP",
                 "MS" : "MS + Transformer",
                 "formula": "Formula + Transformer",
                 "MIST": "MIST"}

splits_mapping = {"scaffold_vanilla": "scaffold split",
                  "inchikey_vanilla": "inchikey split",
                  "random": "random split"}

NIST_instrument_mapping = {"Agilent QTOF 6530": "Agilent QTOF \n 6530",
                           "Thermo Finnigan Elite Orbitrap": "Thermo \n Finnigan \n Elite Orbitrap",
                           "Orbitrap Fusion Lumos": "Orbitrap \n Fusion Lumos",
                           "Thermo Finnigan Velos Orbitrap": "Thermo \n Finnigan \n Velos Orbitrap"}

train_color_code, test_color_code = "#92898A", "#C95D63"

Helper Functions

In [3]:
def get_train_test_ratio(train_data, test_data):

    ratio = round(len(test_data) / (len(train_data) + len(test_data)) * 100, 3)

    return ratio

def get_percentage_mol_overlap(train_data, test_data):

    train_mols = set([r["inchikey_original"][:14] for r in train_data])
    test_mols = set([r["inchikey_original"][:14] for r in test_data])

    percent_overlap = round(len(train_mols.intersection(test_mols)) / len(test_data) * 100, 3)

    return percent_overlap

def get_oov_rate(train_data, test_data):
 
    train_formula = set([p["comment"]["f_pred"] for f in train_data for p in f["peaks"] if p["comment"]["f_pred"] != ""])
    test_formula = set([p["comment"]["f_pred"] for f in test_data for p in f["peaks"] if p["comment"]["f_pred"] != ""])

    oov_rate = len(test_formula - train_formula) / len(test_formula) * 100
    
    return oov_rate

def get_instrument_breakdown(train_data, test_data, key = "instrument_type"):
        
    train_instruments = collections.Counter([r[key] for r in train_data])
    test_instruments = collections.Counter([r[key] for r in test_data])

    unique_instruments = set(list(train_instruments.keys()) + list(test_instruments.keys()))

    train_instrument_counts = [train_instruments[i] if i in train_instruments else 0 for i in unique_instruments]
    test_instrument_counts = [test_instruments[i] if i in test_instruments else 0 for i in unique_instruments]

    return (train_instruments,train_instrument_counts), (test_instruments, test_instrument_counts), unique_instruments

def get_adduct_breakdown(train_data, test_data):
                
    train_adducts = collections.Counter([r["precursor_type"] for r in train_data])
    test_adducts = collections.Counter([r["precursor_type"] for r in test_data])

    unique_adducts = set(list(train_adducts.keys()) + list(test_adducts.keys()))

    train_adduct_counts = [train_adducts[i] if i in train_adducts else 0 for i in unique_adducts]
    test_adduct_counts = [test_adducts[i] if i in test_adducts else 0 for i in unique_adducts]

    return (train_adducts, train_adduct_counts), (test_adducts, test_adduct_counts), unique_adducts

def get_energy_breakdown(train_data, test_data):

    train_energies = collections.Counter([r["collision_energy"] for r in train_data])
    test_energies = collections.Counter([r["collision_energy"] for r in test_data])

    energy_bins = ["-", "0-20", "20-40", "40-60", "60-80", "80-100", "100-120", "120-150", ">150"]
    train_energies_binned = {"-": 0, "0-20": 0, "20-40": 0, "40-60": 0, "60-80": 0, "80-100": 0, "100-120":0, "120-150": 0, ">150": 0}
    test_energies_binned = {"-": 0, "0-20": 0, "20-40": 0, "40-60": 0, "60-80": 0, "80-100": 0, "100-120":0, "120-150": 0, ">150": 0}

    for e,c in train_energies.items():

        if e == "-": train_energies_binned["-"] += c 
        elif e is None: train_energies_binned["-"] += c
        elif e < 20: train_energies_binned["0-20"] += c 
        elif e < 40: train_energies_binned["20-40"] += c 
        elif e < 60: train_energies_binned["40-60"] += c 
        elif e < 80: train_energies_binned["60-80"] += c 
        elif e < 100: train_energies_binned["80-100"] += c 
        elif e < 120: train_energies_binned["100-120"] += c 
        elif e < 150: train_energies_binned["120-150"] += c 
        else: train_energies_binned[">150"] += c 

    for e,c in test_energies.items():

        if e == "-": test_energies_binned["-"] += c 
        elif e is None: test_energies_binned["-"] += c
        elif e < 20: test_energies_binned["0-20"] += c 
        elif e < 40: test_energies_binned["20-40"] += c 
        elif e < 60: test_energies_binned["40-60"] += c 
        elif e < 80: test_energies_binned["60-80"] += c 
        elif e < 100: test_energies_binned["80-100"] += c 
        elif e < 120: test_energies_binned["100-120"] += c 
        elif e < 150: test_energies_binned["120-150"] += c 
        else: test_energies_binned[">150"] += c 

    train_binned_energy_counts = [train_energies_binned[i] for i in test_energies_binned.keys()]
    test_binned_energy_counts = [test_energies_binned[i] for i in test_energies_binned.keys()]

    return (train_energies_binned, train_binned_energy_counts), (test_energies_binned, test_binned_energy_counts), energy_bins

def get_molecule_class(train_data, test_data):
    print()

Iterate through the various LS results to obtain the charts

In [4]:
for dataset in ["canopus", "massspecgym", "nist2023"]:

    current_data_folder = os.path.join(data_folder, dataset, "frags_preds")

    ls_results_folder = [os.path.join(results_folder, f) for f in os.listdir(results_folder) if dataset in f and "sieved" not in f]
    assert len(ls_results_folder) == 1
    ls_results_folder = ls_results_folder[0]

    stats_path = os.path.join(analysis_folder, f"{dataset}_ls_stats.pkl")
    if os.path.exists(stats_path): continue

    split = load_pickle(os.path.join(ls_results_folder, "best_split.pkl"))
    data_ids = load_pickle(os.path.join(ls_results_folder, "data_ids.pkl"))

    # Get the train and test data now
    train_ids = [data_ids[i] for i in split["train_indices"]]
    test_ids = [data_ids[i] for i in split["test_indices"]]

    train_data = [load_pickle(os.path.join(current_data_folder, f"{i}.pkl")) for i in train_ids]
    test_data = [load_pickle(os.path.join(current_data_folder, f"{i}.pkl")) for i in test_ids]

    # # Get the train test ratio
    # if dataset == "nist2023": instrument_key = "instrument"
    # else: instrument_key = "instrument_type"
    # stats = {} 
    # stats["train_test_ratio"] = get_train_test_ratio(train_data, test_data)
    # stats["percent_overlap"] = get_percentage_mol_overlap(train_data, test_data)
    # stats["oov_rate"] = get_oov_rate(train_data, test_data) 
    # stats["instrument_breakdown"]  = get_instrument_breakdown(train_data, test_data, key = instrument_key)
    # stats["adduct_breakdown"] = get_adduct_breakdown(train_data, test_data)
    # stats["energy_breakdown"] = get_energy_breakdown(train_data, test_data)

    # # Add to the data 
    # pickle_data(stats, stats_path)

Iterate through the various LS results to obtain OOV rate for sieved results

In [5]:
for dataset in ["massspecgym", "nist2023"]:

    current_data_folder = os.path.join(data_folder, dataset, "frags_preds")

    ls_results_folder = [os.path.join(results_folder, f) for f in os.listdir(results_folder) if dataset in f and "sieved" in f]
    assert len(ls_results_folder) == 1
    ls_results_folder = ls_results_folder[0]

    stats_path = os.path.join(analysis_folder, f"{dataset}_sieved_ls_stats.pkl")
    if os.path.exists(stats_path): continue

    split = load_pickle(os.path.join(ls_results_folder, "best_split.pkl"))
    data_ids = load_pickle(os.path.join(ls_results_folder, "data_ids.pkl"))

    # Get the train and test data now
    train_ids = [data_ids[i] for i in split["train_indices"]]
    test_ids = [data_ids[i] for i in split["test_indices"]]

    train_data = [load_pickle(os.path.join(current_data_folder, f"{i}.pkl")) for i in train_ids]
    test_data = [load_pickle(os.path.join(current_data_folder, f"{i}.pkl")) for i in test_ids]

    # Get the stats
    stats = {} 
    stats["train_test_ratio"] = get_train_test_ratio(train_data, test_data)
    stats["percent_overlap"] = get_percentage_mol_overlap(train_data, test_data)
    stats["oov_rate"] = get_oov_rate(train_data, test_data) 

    # Add to the data 
    pickle_data(stats, stats_path)

``Let us plot the histograms and compute the chi square test scores``

1. Plot the instrument split

In [None]:
# 1. Get the instrument split
for dataset in ["canopus", "massspecgym", "nist2023"]:

    instrument_output_path = os.path.join(analysis_folder, f"{dataset}_instrument_split.jpg")
    stats = load_pickle(os.path.join(analysis_folder, f"{dataset}_ls_stats.pkl"))

    plt.figure(figsize=(8, 6))
    bar_width = 0.23

    (train_instruments_breakdown, _), (test_instruments_breakdown, _), unique_instruments = stats["instrument_breakdown"]
    unique_instruments = list(set(list(train_instruments_breakdown.keys()) + list(test_instruments_breakdown.keys())))
    train_instruments_counts = [train_instruments_breakdown[u] for u in unique_instruments]
    test_instruments_counts = [test_instruments_breakdown[u] for u in unique_instruments]

    _, p_instrument, _, _ = chi2_contingency(np.array([train_instruments_counts, test_instruments_counts]))

    train_bars = np.arange(len(unique_instruments))
    test_bars = [x + bar_width + 0.01 for x in train_bars]
    n_train, n_test = sum(train_instruments_counts), sum(test_instruments_counts)

    unique_instruments = [str(u) for u in unique_instruments]
    
    if dataset == "massspecgym":

        idx = unique_instruments.index("None")
        train_instruments_counts = [c for i, c in enumerate(train_instruments_counts) if i != idx]
        test_instruments_counts = [c for i, c in enumerate(test_instruments_counts) if i != idx]
        unique_instruments = [u for u in unique_instruments if u != "None"]

    train_bars = np.arange(len(unique_instruments))
    test_bars = [x + bar_width + 0.01 for x in train_bars]

    plt.bar(train_bars, [x / n_train for x in train_instruments_counts], width = bar_width, label ='train', color = train_color_code) 
    plt.bar(test_bars, [x / n_test for x in test_instruments_counts], width = bar_width, label ='test', color =  test_color_code) 

    plt.ylabel('Percentage of samples (%)', fontsize = 12)
    
    if dataset == "massspecgym": unique_instruments = [u for u in unique_instruments if u != "None"]
    if dataset == "canopus": unique_instruments = [u.replace("(LCMS)", "").strip() for u in unique_instruments]
    if dataset == "nist2023": unique_instruments = [NIST_instrument_mapping[u] for u in unique_instruments]
    plt.xticks([r + bar_width - 0.1 for r in range(len(unique_instruments))], unique_instruments, rotation = 90)

    plt.legend()
    dataset_ = dataset_mapping[dataset]
    plt.title(f"Breakdown of measurement instrument ({dataset_}) - p-value: {p_instrument: .2e}")
    plt.savefig(instrument_output_path, bbox_inches = "tight")
    plt.show()

2. Plot the adduct split

In [None]:
# 2. Get the adduct split 
for dataset in ["canopus", "massspecgym", "nist2023"]:

    adduct_output_path = os.path.join(analysis_folder, f"{dataset}_adduct_split.jpg")
    stats = load_pickle(os.path.join(analysis_folder, f"{dataset}_ls_stats.pkl"))

    plt.figure(figsize=(8, 6))
    bar_width = 0.23

    (train_adducts_breakdown, _), (test_adducts_breakdown, _), unique_adducts = stats["adduct_breakdown"]

    unique_adducts = list(set(list(train_adducts_breakdown.keys()) + list(test_adducts_breakdown.keys())))
    train_adducts_counts = [train_adducts_breakdown[u] for u in unique_adducts]
    test_adducts_counts = [test_adducts_breakdown[u] for u in unique_adducts]

    n_train, n_test = sum(train_adducts_counts), sum(test_adducts_counts)
    _, p_adduct, _, _ = chi2_contingency(np.array([train_adducts_counts, test_adducts_counts]))
    
    train_bars = np.arange(len(unique_adducts))
    test_bars = [x + bar_width + 0.01 for x in train_bars]

    plt.bar(train_bars, [x / n_train for x in train_adducts_counts], width = bar_width, label ='train', color = train_color_code) 
    plt.bar(test_bars, [x / n_test for x in test_adducts_counts], width = bar_width, label ='test', color  = test_color_code) 

    plt.ylabel('Percentage of samples (%)', fontsize = 12)
    plt.xticks([r + bar_width - 0.1 for r in range(len(unique_adducts))], unique_adducts, rotation = 90)

    plt.legend()
    dataset_ = dataset_mapping[dataset]
    plt.title(f"Breakdown of adduct ({dataset_}) - p-value: {p_adduct: .2e}")
    plt.savefig(adduct_output_path, bbox_inches = "tight")
    plt.show()
    

In [None]:
# 3. Energy split 

for dataset in ["canopus", "massspecgym", "nist2023"]:

    energy_output_path = os.path.join(analysis_folder, f"{dataset}_energy_split.jpg")
    stats = load_pickle(os.path.join(analysis_folder, f"{dataset}_ls_stats.pkl"))

    plt.figure(figsize=(8, 6))
    bar_width = 0.23

    (train_binned_energy_breakdown,_), (test_binned_energy_breakdown, _), _ = stats["energy_breakdown"]
    energy_bins = list(train_binned_energy_breakdown) 

    train_binned_energy_counts = [train_binned_energy_breakdown[e] for e in energy_bins]
    test_binned_energy_counts = [test_binned_energy_breakdown[e] for e in energy_bins]
    
    n_train, n_test = sum(train_binned_energy_counts), sum(test_binned_energy_counts)

    if dataset == "massspecgym":

        train_binned_energy_counts = train_binned_energy_counts[1:]
        test_binned_energy_counts = test_binned_energy_counts[1:]
        energy_bins = energy_bins[1:]

    _, p_energy, _, _ = chi2_contingency(np.array([train_binned_energy_counts, test_binned_energy_counts]))

    train_bars = np.arange(len(train_binned_energy_counts))
    test_bars = [x + bar_width for x in train_bars]

    plt.bar(train_bars, [x / n_train for x in train_binned_energy_counts], width = bar_width, label ='train', color = train_color_code) 
    plt.bar(test_bars, [x / n_test for x in test_binned_energy_counts], width = bar_width, label ='test', color = test_color_code) 

    plt.ylabel('Percentage of samples (%)', fontsize = 12)
    plt.xticks([r + bar_width - 0.05 for r in range(len(energy_bins))], energy_bins)

    plt.legend()
    dataset_ = dataset_mapping[dataset]
    plt.title(f"Breakdown of energy levels ({dataset_}) - p-value: {p_energy:.2e}")
    plt.savefig(energy_output_path, bbox_inches = "tight")
    plt.show()
    

Print the other statistics

In [7]:

for dataset in ["canopus", "massspecgym", "nist2023"]:

    stats = load_pickle(os.path.join(analysis_folder, f"{dataset}_ls_stats.pkl"))

    train_test_ratio = stats["train_test_ratio"]
    percent_overlap = stats["percent_overlap"]
    oov_rate = stats["oov_rate"]
    print(dataset, train_test_ratio, percent_overlap, oov_rate)
    print()

canopus 29.698 2.884 38.128515106784505

massspecgym 17.501 10.884 36.797562626946515

nist2023 4.881 9.513 9.520076160609285



In [6]:

for dataset in ["massspecgym", "nist2023"]:

    stats = load_pickle(os.path.join(analysis_folder, f"{dataset}_sieved_ls_stats.pkl"))

    train_test_ratio = stats["train_test_ratio"]
    percent_overlap = stats["percent_overlap"]
    oov_rate = stats["oov_rate"]
    print(dataset, train_test_ratio, percent_overlap, oov_rate)
    print()

massspecgym 38.361 4.805 51.54240301160724

nist2023 15.103 13.346 22.29890643985419

