In [None]:
import json
import intervaltree
import csv
import pandas as pd
import numpy as np

In [None]:
def read_CD(path):
    """Load annotations from a CSV export into a flat list of records.

    The table is read with ``pd.read_csv`` and must contain the columns
    "Name", "m/z", "RT [min]" and "Annot. Source: mzVault Search".

    Row handling:
      * rows whose "Name" is "No results" or "Tags" are skipped entirely;
      * rows with a string "Name" are emitted with their own m/z and RT and
        become the current "parent" row;
      * rows without a string "Name" (e.g. empty cells, read as NaN) whose
        "Annot. Source: mzVault Search" value is a string other than
        "No results" are emitted under that value, reusing the m/z and RT of
        the most recent parent row (None if there has been none yet).

    Args:
        path: anything accepted by ``pd.read_csv`` (file path or buffer).

    Returns:
        list of dicts with the keys "Name", "m/z" and "RT [min]".
    """
    table = pd.read_csv(path)
    ignored_names = ("No results", "Tags")
    records = []
    parent_mz = parent_rt = None
    rows = zip(
        table["Name"],
        table["m/z"],
        table["RT [min]"],
        table["Annot. Source: mzVault Search"],
    )
    for compound, mass, rt_min, vault_hit in rows:
        if compound in ignored_names:
            # Placeholder rows carry no annotation and do not reset the parent.
            continue
        if isinstance(compound, str):
            # Named row: remember its coordinates for the sub-rows that follow.
            parent_mz, parent_rt = mass, rt_min
            records.append({"Name": compound, "m/z": mass, "RT [min]": rt_min})
            continue
        if isinstance(vault_hit, str) and vault_hit != "No results":
            # Sub-row: takes its name from the mzVault column and inherits
            # the coordinates of the last named row.
            records.append({"Name": vault_hit, "m/z": parent_mz, "RT [min]": parent_rt})
    return records

In [None]:
def read_pcpfm(path):
    """Load a pcpfm annotation file and return the parsed JSON content.

    Args:
        path: path of the JSON file to read.

    Returns:
        The deserialized JSON document (whatever ``json.load`` produces for
        the file's top-level value).

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file does not contain valid JSON.
    """
    # Use a context manager so the handle is closed deterministically instead
    # of being left for the garbage collector.
    with open(path) as handle:
        return json.load(handle)

In [None]:
def map_CD_annots(cd_annots, pcpfm_annots, mz_ppm_tol=10, rt_err=30):
    """Attach matching pcpfm feature ids to each CD annotation.

    Every peak in each khipu's "MS1_pseudo_Spectra" list is indexed in two
    interval trees: one spanning mz +/- ``mz_ppm_tol`` ppm of the peak's m/z,
    the other spanning rtime +/- ``rt_err``. A CD annotation matches a peak
    when its "m/z" and its "RT [min]" value (multiplied by 60 before the
    lookup, presumably converting minutes to seconds) are found in both of
    that peak's intervals. For every matched peak, all feature ids belonging
    to the same khipu are added to the annotation's "mapped_to" set.

    Args:
        cd_annots: list of dicts with the keys "Name", "m/z" and "RT [min]".
        pcpfm_annots: dict of khipu id -> khipu dict; each khipu holds a
            "MS1_pseudo_Spectra" list of peaks with "id_number", "mz" and
            "rtime".
        mz_ppm_tol: m/z tolerance in parts per million.
        rt_err: retention-time tolerance, in the same unit as the peaks'
            "rtime".

    Returns:
        The same ``cd_annots`` list; each dict is mutated in place and gains
        a "mapped_to" set (empty when "Name" is falsy or nothing matches).
    """
    mz_index = intervaltree.IntervalTree()
    rt_index = intervaltree.IntervalTree()
    members_by_khipu = {}
    khipu_by_feature = {}
    for khipu_id, khipu in pcpfm_annots.items():
        for peak in khipu["MS1_pseudo_Spectra"]:
            feature_id = peak['id_number']
            khipu_by_feature[feature_id] = khipu_id
            members_by_khipu.setdefault(khipu_id, []).append(feature_id)
            peak_mz = peak['mz']
            peak_rt = peak['rtime']
            # ppm tolerance scales with the peak's own m/z.
            mz_window = peak_mz / 1e6 * mz_ppm_tol
            mz_index.addi(peak_mz - mz_window, peak_mz + mz_window, feature_id)
            rt_index.addi(peak_rt - rt_err, peak_rt + rt_err, feature_id)

    for cd_annot in cd_annots:
        matched = cd_annot['mapped_to'] = set()
        if not cd_annot['Name']:
            continue
        query_mz = float(cd_annot['m/z'])
        query_rt = float(cd_annot['RT [min]']) * 60
        mz_hits = {interval.data for interval in mz_index.at(query_mz)}
        rt_hits = {interval.data for interval in rt_index.at(query_rt)}
        # A feature must agree on both axes; its whole khipu is then mapped.
        for feature_id in mz_hits & rt_hits:
            matched.update(members_by_khipu[khipu_by_feature[feature_id]])
    return cd_annots

In [None]:
def eval_overlap(cd_path, pcpfm_path):
    """Print how many annotation/feature pairs CD and pcpfm have in common.

    The CD table is loaded with ``read_CD``, the pcpfm JSON with
    ``read_pcpfm``, and CD annotations are mapped onto pcpfm features with
    ``map_CD_annots`` (10 ppm m/z tolerance, default RT tolerance).

    Two sets of "<label>_<feature id>" strings are then built:
      * CD side: the annotation "Name" paired with every mapped feature id;
      * pcpfm side: for each MS2 spectrum of each khipu, the "reference_id"
        of its 10 highest-"msms_score" annotations paired with the
        "id_number" of every MS1 peak of that khipu.

    The sizes of both sets, of their intersection and of the two remainders
    are printed, followed by the intersection itself.

    Args:
        cd_path: path of the CD CSV export.
        pcpfm_path: path of the pcpfm annotation JSON file.

    Returns:
        None; the results are only printed.
    """
    cd_records = read_CD(cd_path)
    khipus = read_pcpfm(pcpfm_path)
    cd_records = map_CD_annots(cd_records, khipus, mz_ppm_tol=10)

    cd_pairs = {
        record["Name"] + "_" + feature_id
        for record in cd_records
        for feature_id in record["mapped_to"]
    }

    pcpfm_pairs = set()
    for khipu in khipus.values():
        if "MS2_Spectra" not in khipu:
            continue
        for spectrum in khipu["MS2_Spectra"]:
            # Highest score first; only the ten best hits per spectrum count.
            ranked = sorted(spectrum["annotations"], key=lambda hit: -hit["msms_score"])
            for hit in ranked[:10]:
                for peak in khipu["MS1_pseudo_Spectra"]:
                    pcpfm_pairs.add(hit['reference_id'] + "_" + peak['id_number'])

    print("CD All:", len(cd_pairs), "PCPFM All:", len(pcpfm_pairs))
    shared = cd_pairs & pcpfm_pairs
    n_shared = len(shared)
    print("Shared Annotations: ", n_shared)
    print("CD Only:", len(cd_pairs) - n_shared)
    print("PCPFM Only:", len(pcpfm_pairs) - n_shared)
    print(shared)

In [None]:
# RP NEG: compare the CD export with the pcpfm MoNA annotations for this
# dataset; eval_overlap prints the overlap counts and the shared set.
eval_overlap("./CD_MoNA_RP_neg.csv", "./HZV029_RP_neg_MoNA_annotations.json") # original note: "30 sec" (presumably the approximate runtime - TODO confirm)


In [None]:
# HILIC POS: compare the CD export with the pcpfm MoNA annotations for this
# dataset; eval_overlap prints the overlap counts and the shared set.
eval_overlap("./CD_MoNA_HILIC_pos.csv", "./HZV029_HILIC_pos_MoNA_annotations.json")
