The objective is to compare experimental spectra with 2b matching to referense spectra 
Level 2a confirmation is pursued by matching experimental MS/MS spectra against experimental reference spectra from the MassBank of North America and report cosine matches

In [None]:
import numpy as np
import pandas as pd
from matchms.importing import load_from_msp
from matchms import Spectrum

In [None]:
# === Load 2b annotated features ===
annotated_df = pd.read_csv("NIAS_2b_Confirmed_Annotated_With_areas.csv", dtype=str)
annotated_df = annotated_df[["Feature_ID", "Canonical_SMILES"]].dropna()
feature_to_smiles = dict(zip(annotated_df["Feature_ID"].astype(str), annotated_df["Canonical_SMILES"]))
print(f"🔍 Loaded {len(feature_to_smiles)} annotated features with SMILES.")

In [None]:
# Step 1: Load 2b annotations
annotations = pd.read_csv("NIAS_2b_Confirmed_Annotated_With_areas.csv")
annotations["Feature_ID"] = annotations["Feature_ID"].astype(str)
feature_to_smiles = dict(zip(annotations["Feature_ID"], annotations["Canonical_SMILES"]))
valid_feature_ids = set(feature_to_smiles.keys())

# Step 2: Load experimental MSP and filter only those with matching Feature_IDs
experimental_all = list(load_from_msp("Msp_2025_04_18_18_27_22_NIAS_allsamples_PI_03.msp"))
filtered_spectra = []

for spec in experimental_all:
    comment = spec.get("comment")
    if comment and "|PEAKID=" in comment:
        # Extract Feature_ID from comment (after PEAKID=)
        try:
            for part in comment.split("|"):
                if part.startswith("PEAKID="):
                    fid = part.replace("PEAKID=", "").strip()
                    break
            if fid in valid_feature_ids:
                spec.set("feature_id", fid)
                spec.set("target_smiles", feature_to_smiles[fid])
                filtered_spectra.append(spec)
        except Exception as e:
            print(f"⚠️ Failed parsing PEAKID in comment: {comment}")
            continue

print(f"✅ Loaded {len(filtered_spectra)} experimental spectra linked to 2b-annotated features.")


In [None]:
# === Load MassBank ===
massbank = list(load_from_msp("MoNA-export-LC-MS-MS_Spectra.msp"))
print(f"📚 Loaded {len(massbank)} MassBank reference spectra.")

In [None]:
# === Filter MassBank to only relevant SMILES ===
smiles_set = set(feature_to_smiles.values())
massbank_filtered = [s for s in massbank if s.get("smiles") in smiles_set]
print(f"🧬 Retained {len(massbank_filtered)} MassBank entries matching annotated SMILES.")


In [None]:
# === PPM cosine similarity ===
def cosine_similarity_ppm(spec1, spec2, ppm_tol=5, ppm_tol_low=10, mz_cutoff=100, precursor_tol_ppm=10):
    mz1, intens1 = np.array(spec1.mz), np.array(spec1.intensities)
    mz2, intens2 = np.array(spec2.mz), np.array(spec2.intensities)
    precursor1 = spec1.get("precursor_mz")

    intens1 = intens1 / intens1.max() if intens1.max() > 0 else intens1
    intens2 = intens2 / intens2.max() if intens2.max() > 0 else intens2

    i, j = 0, 0
    matched1, matched2 = [], []

    while i < len(mz1) and j < len(mz2):
        mz_val = mz1[i]
        current_tol = ppm_tol_low if mz_val < mz_cutoff else ppm_tol
        ppm_diff = abs(mz_val - mz2[j]) / mz_val * 1e6

        if ppm_diff <= current_tol:
            precursor_hit1 = abs(mz1[i] - precursor1) / precursor1 * 1e6 <= precursor_tol_ppm if precursor1 else False
            precursor_hit2 = abs(mz2[j] - precursor1) / precursor1 * 1e6 <= precursor_tol_ppm if precursor1 else False
            if precursor_hit1 or precursor_hit2:
                i += 1
                j += 1
                continue
            matched1.append(intens1[i])
            matched2.append(intens2[j])
            i += 1
            j += 1
        elif mz1[i] < mz2[j]:
            i += 1
        else:
            j += 1

    if len(matched1) == 0:
        return 0.0, 0

    dot = np.dot(matched1, matched2)
    norm1 = np.linalg.norm(matched1)
    norm2 = np.linalg.norm(matched2)
    cosine_score = dot / (norm1 * norm2) if norm1 > 0 and norm2 > 0 else 0.0

    return cosine_score, len(matched1)


In [None]:
# === Match only if SMILES match ===
results = []
for exp in experimental_spectra:
    fid = exp.get("feature_id")
    smiles = exp.get("target_smiles")
    mz_exp = exp.get("precursor_mz")
    rt = exp.get("retention_time")

    for ref in massbank_filtered:
        if ref.get("smiles") != smiles:
            continue

        mz_ref = ref.get("precursor_mz")
        if not mz_exp or not mz_ref:
            continue

        ppm_prec = abs(mz_exp - mz_ref) / mz_exp * 1e6
        if ppm_prec > 5:
            continue

        score, n_matches = cosine_similarity_ppm(exp, ref)

        if score >= 0.1 and n_matches >= 1:
            results.append({
                "Feature_ID": fid,
                "Experimental_mz": mz_exp,
                "RT_min": rt,
                "SMILES": smiles,
                "MassBank_Name": ref.get("compound_name"),
                "MassBank_InChIKey": ref.get("inchikey"),
                "Cosine_Score": round(score, 4),
                "Num_Matching_Peaks": n_matches
            })

df = pd.DataFrame(results)
print(df.head())  # See what actually got collected

if not df.empty:
    df.sort_values("Cosine_Score", ascending=False).to_csv("2a_MassBank_Confirmed_By_SMILES.csv", index=False)
    print(f"✅ Saved {len(df)} SMILES-matched spectral confirmations to '2a_MassBank_Confirmed_By_SMILES.csv'")
else:
    print("⚠️ No matches found. Check filtering criteria or input data.")

# === Save ===
pd.DataFrame(results).sort_values("Cosine_Score", ascending=False).to_csv("2a_MassBank_Confirmed_By_SMILES.csv", index=False)
print(f"✅ Saved {len(results)} SMILES-matched spectral confirmations to '2a_MassBank_Confirmed_By_SMILES.csv'")