MoNA MSP -> mzVault curator with decimal-precision filter (Jupyter-ready) ---

In [None]:
import re
import pandas as pd
from typing import List, Dict, Tuple, Optional
from collections import Counter

# ========= Parameters you can tune =========
# NOTE: min_peaks, require_ms2 and mz_decimals_format are captured as default
# argument values of curate_block() at the moment that function is defined, so
# re-run this whole cell after changing them. min_decimal_places and
# min_fraction_with_decimals are looked up as globals each time a block is
# curated.
min_peaks = 3                  # require at least this many fragment peaks
require_ms2 = True             # require Spectrum_type to be MS2 (or enough peaks if field missing)
mz_decimals_format = 4         # number of decimals used when writing m/z (cosmetic). Set to 0 to keep original formatting
min_decimal_places = 4         # a peak "qualifies" when its m/z text has at least this many digits after the decimal point
min_fraction_with_decimals = 0.90  # a spectrum is dropped unless at least this fraction of its peaks qualify (e.g., 0.90, or 1.0 to be strict)
# ===========================================

# Captures the text between each pair of double quotes (one or more non-quote
# characters); used to pull the individual tokens out of a Comments value.
QUOTED_TOKEN_RE = re.compile(r'"([^"]+)"')

def parse_comments_tokens(comment_line: str) -> Dict[str, str]:
    """Extract selected metadata from the quoted tokens of a Comments line.

    Everything after the first ":" (or the whole string when it contains no
    ":") is scanned for double-quoted tokens. A token is split on its first
    "=" into key and value; a token without "=" is treated as a key with an
    empty value. Keys are compared lower-cased, and when several tokens map
    to the same output field the last one wins.

    Returns:
        A dict that may contain "CAS#", "PubChemID", "SMILES", "InChI",
        "InChIKey", "RT", "Precursor_type" and "PrecursorMZ".
    """
    _, colon, tail = comment_line.partition(":")
    body = tail if colon else comment_line

    found: Dict[str, str] = {}
    for token in QUOTED_TOKEN_RE.findall(body):
        raw_key, equals, raw_val = token.partition("=")
        name = raw_key.strip().lower()
        value = raw_val.strip().strip('"').strip() if equals else ""

        if name.startswith("cas"):
            found["CAS#"] = value
        elif "pubchem" in name or name == "cid":
            found["PubChemID"] = value
        elif name.startswith("smiles"):
            found["SMILES"] = value
        elif name.startswith("inchikey"):
            # Tested before the plain "inchi" prefix, which it also matches.
            found["InChIKey"] = value
        elif name.startswith("inchi"):
            # Non-empty values are normalised to carry the "InChI=" prefix.
            if value and not value.startswith("InChI="):
                value = "InChI=" + value
            found["InChI"] = value
        elif "retention time" in name or name.startswith("rt"):
            # Keep the first number found and label it as minutes; if the
            # value holds no number it is stored unchanged.
            hit = re.search(r"[-+]?\d*\.\d+|\d+", value)
            found["RT"] = (hit.group(0) + " min") if hit else value
        elif name.startswith("computed [m+h]"):
            found["Precursor_type"] = "[M+H]+"
            found["PrecursorMZ"] = value
        elif name.startswith("computed [m-h]"):
            found["Precursor_type"] = "[M-H]-"
            found["PrecursorMZ"] = value
    return found

def format_mz(mz: str, decimals: Optional[int]) -> str:
    """Return *mz* rendered with a fixed number of decimal places.

    A falsy *decimals* (``None`` or ``0``) leaves *mz* untouched, as does any
    value that cannot be converted to a float or formatted.
    """
    if decimals:
        try:
            return format(float(mz), f".{decimals}f")
        except Exception:
            # Best effort: fall through and hand back the text as it was read.
            pass
    return mz

def parse_peak_line(line: str) -> Optional[Tuple[str, str]]:
    """Split a peak line into its first two whitespace-separated fields.

    Returns ``(mz, intensity)`` as the original text of those fields, or
    ``None`` when the line has fewer than two fields. Any further fields on
    the line are ignored.
    """
    fields = line.split()
    if len(fields) < 2:
        return None
    mz_text, intensity_text = fields[:2]
    return mz_text, intensity_text

def iter_msp_blocks(lines: List[str]) -> List[List[str]]:
    """Split raw MSP lines into one list of lines per record.

    A record ends at a blank (whitespace-only) line or just before the next
    line whose stripped text starts with "Name:". Blank lines are separators
    only: they are never stored in a block, so leading blank lines or several
    blank lines in a row do not produce blank-only blocks (which would
    otherwise be counted as records and reported as dropped).

    Args:
        lines: Lines of the MSP file, with or without trailing newlines.

    Returns:
        The non-empty blocks in file order; each block keeps its lines
        exactly as given.
    """
    blocks: List[List[str]] = []
    current: List[str] = []
    for ln in lines:
        stripped = ln.strip()
        if not stripped:
            # Separator: close the open record, if any, and store nothing.
            if current:
                blocks.append(current)
                current = []
            continue
        if stripped.startswith("Name:") and current:
            # A new record starts without a blank line before it.
            blocks.append(current)
            current = []
        current.append(ln)
    if current:
        blocks.append(current)
    return blocks

def parse_msp_block(block: List[str]) -> Tuple[Dict[str, str], List[Tuple[str, str]], List[str]]:
    """Parse one MSP record into headers, peak pairs and Comments tokens.

    Lines before the "Num Peaks" line are read as ``key: value`` headers
    (lines without ":" are ignored; a repeated key keeps the last value).
    Every line after it is handed to ``parse_peak_line``. The value of a
    "Comments" header (any letter case) is stored under "Comments" and its
    double-quoted tokens are returned separately.

    Returns:
        ``(headers, peaks, comment_items)``. ``headers["Num Peaks"]`` holds
        the first run of digits found on the "Num Peaks" line ("" if there
        are none); when the record has no such line no peaks are read and
        the entry is ``str(len(peaks))``.
    """
    header_map: Dict[str, str] = {}
    peak_list: List[Tuple[str, str]] = []
    quoted_tokens: List[str] = []
    reading_peaks = False

    for raw_line in block:
        text = raw_line.rstrip("\n")

        if reading_peaks:
            pair = parse_peak_line(text)
            if pair:
                peak_list.append(pair)
            continue

        if text.lower().startswith("num peaks"):
            # Everything after this line is peak data. The declared count is
            # kept only for reference.
            reading_peaks = True
            count_match = re.search(r"\d+", text)
            header_map["Num Peaks"] = count_match.group(0) if count_match else ""
            continue

        if ":" not in text:
            continue

        name, _, value = text.partition(":")
        name = name.strip()
        value = value.strip()
        if name.lower() == "comments":
            quoted_tokens = QUOTED_TOKEN_RE.findall(value)
            header_map["Comments"] = value
        else:
            header_map[name] = value

    header_map.setdefault("Num Peaks", str(len(peak_list)))
    return header_map, peak_list, quoted_tokens

def count_decimals(mz_str: str) -> int:
    """Count digits after the decimal point in a numeric string.

    The count is taken from the text as written (trailing zeros included),
    e.g. '356.29293486800003' -> 14 and '100' -> 0. Surrounding whitespace
    and trailing "," / ";" are removed first.

    A value containing "e"/"E" is treated as scientific notation: it is
    expanded to fixed-point with 10 decimals, trailing zeros are removed, and
    the remaining decimals are counted. If that conversion fails, the count
    falls back to the raw text.

    Args:
        mz_str: The m/z field exactly as read from the peak line.

    Returns:
        Number of characters after the first "." (0 when there is none).
    """
    s = mz_str.strip().rstrip(",;")
    if "e" in s.lower():  # scientific notation is unlikely in MoNA peaks, but guard anyway
        try:
            s = f"{float(s):.10f}".rstrip("0").rstrip(".")
        except ValueError:
            # Not a parsable float: keep the raw text (best effort). Only
            # ValueError is caught so that KeyboardInterrupt/SystemExit and
            # genuine programming errors are not swallowed.
            pass
    _, _, fraction = s.partition(".")
    return len(fraction)

def passes_decimal_precision(peaks: List[Tuple[str, str]],
                             min_places: int,
                             min_fraction: float) -> bool:
    """Tell whether enough peaks report their m/z with enough decimals.

    Every peak is counted (intensity is not looked at). A peak qualifies when
    ``count_decimals`` of its m/z text is at least *min_places*; the spectrum
    passes when the qualifying share is at least *min_fraction*. An empty
    peak list never passes.
    """
    if not peaks:
        return False
    verdicts = [count_decimals(mz_text) >= min_places for mz_text, _ in peaks]
    share = sum(verdicts) / len(verdicts) if verdicts else 0.0
    return share >= min_fraction

def curate_block(headers,
                 peaks,
                 comment_items,
                 min_peaks_: int = min_peaks,
                 require_ms2_: bool = require_ms2,
                 mz_decimals_: Optional[int] = mz_decimals_format) -> Optional[str]:
    """Filter one parsed MSP record and render it as a curated text block.

    Filters, applied in order (the record is rejected on the first failure):

    1. Peak count: at least *min_peaks_* peaks, whatever the MS level.
    2. MS level, only when *require_ms2_* is true: if the record has a
       non-empty ``Spectrum_type`` (or ``SPECTRUM_TYPE``) it must contain
       "MS2"; if the field is missing or empty, the record needs at least
       ``max(min_peaks_, 2)`` peaks instead.
    3. Decimal precision of the fragment m/z values, using the module-level
       ``min_decimal_places`` and ``min_fraction_with_decimals``.

    Args:
        headers: Header dict from ``parse_msp_block``; it is not modified.
        peaks: ``(mz, intensity)`` string pairs.
        comment_items: Quoted tokens taken from the Comments header.
        min_peaks_: Minimum number of peaks required.
        require_ms2_: Whether to enforce the MS-level filter.
        mz_decimals_: Decimals used when writing m/z; falsy keeps the text.

    Returns:
        The curated record terminated by a blank line, or ``None`` when the
        record is rejected.
    """
    # Peak-count filter. It must hold regardless of the MS-level setting;
    # previously an MS2 record skipped it whenever require_ms2_ was true.
    if len(peaks) < min_peaks_:
        return None

    # MS-level filter. A record explicitly tagged with another level is
    # rejected; the peak-count fallback is only for records lacking the field.
    spectrum_type = headers.get("Spectrum_type", headers.get("SPECTRUM_TYPE", "")).strip().upper()
    if require_ms2_:
        if spectrum_type:
            if "MS2" not in spectrum_type:
                return None
        elif len(peaks) < max(min_peaks_, 2):
            return None

    # Decimal-precision filter on fragment m/z
    if not passes_decimal_precision(peaks, min_decimal_places, min_fraction_with_decimals):
        return None

    # Work on a copy so promoting metadata does not mutate the caller's dict.
    headers = dict(headers)

    # Promote selected metadata from Comments; non-empty values override the
    # header of the same name. The "Comments: " prefix guarantees the parser
    # splits on that colon rather than on one inside a token.
    meta = parse_comments_tokens("Comments: " + " ".join(f"\"{x}\"" for x in comment_items)) if comment_items else {}
    for k in ("CAS#", "PubChemID", "SMILES", "InChI", "InChIKey", "RT"):
        if meta.get(k):
            headers[k] = meta[k]

    # Build output block: known headers in a fixed order, skipping blanks.
    out = []
    for k in ("Name", "Synon", "DB#", "InChIKey", "Spectrum_type", "Instrument_type", "Instrument",
              "Ion_mode", "Formula", "MW", "ExactMass", "CAS#", "PubChemID", "SMILES", "InChI", "RT"):
        v = headers.get(k)
        if v is not None and str(v).strip():
            out.append(f"{k}: {v}")

    # Num Peaks is recomputed from the peaks actually parsed.
    out.append(f"Num Peaks: {len(peaks)}")
    out.extend(f"{format_mz(mz, mz_decimals_)} {inten}" for mz, inten in peaks)

    return "\n".join(out) + "\n\n"

def curate_msp(in_path, out_path):
    """Curate every record of the MSP file *in_path* into *out_path*.

    The input is read completely (UTF-8, undecodable bytes ignored), split
    into records, and each record that survives ``curate_block`` is written
    to the output. A one-line summary of total/kept/dropped records is
    printed at the end.
    """
    with open(in_path, "r", encoding="utf-8", errors="ignore") as src:
        blocks = iter_msp_blocks(src.readlines())

    kept = 0
    dropped = 0
    with open(out_path, "w", encoding="utf-8") as dst:
        for headers, peaks, comment_items in map(parse_msp_block, blocks):
            text = curate_block(headers, peaks, comment_items)
            if text:
                dst.write(text)
                kept += 1
            else:
                dropped += 1

    print(f"Total: {len(blocks)}, Kept: {kept}, Dropped: {dropped}")

In [None]:
# Load the selected precursor type mapping
# The CSV must provide the columns "Precursor_type" and
# "Computed_key_in_comments"; pd.read_csv raises if the file is missing.
mapping_path = "Precursor_Type_Mapping.csv"
precursor_map_df = pd.read_csv(mapping_path)

# Convert to dictionary for quick lookup
# Keys are Precursor_type values, values are the matching Computed_key_in_comments
# entries; if a Precursor_type occurs more than once, the last row wins.
precursor_type_to_computed_key = dict(zip(precursor_map_df["Precursor_type"], precursor_map_df["Computed_key_in_comments"]))

precursor_type_to_computed_key  # Show mapping dictionary for use in next step

In [None]:
# === Reuse precursor mapping from previous cell ===
# Alias read by curate_block_with_precursor() to turn a Precursor_type header
# value into the key text that follows "computed " in a Comments token.
precursor_key_map = precursor_type_to_computed_key

# === Helper regex to extract computed precursor mz from comments ===
# Matches a quoted token of the form "computed [<anything>]=<number>" and
# captures the number.
# NOTE(review): PRECURSOR_REGEX is not referenced by any code visible in this
# notebook; extract_precursor_mz() uses plain prefix matching instead.
PRECURSOR_REGEX = re.compile(r'"computed\s+\[.*?\]=([\d\.Ee+-]+)"')

def extract_precursor_mz(comment_items: List[str], computed_key: str) -> Optional[str]:
    """Return the value of the first ``computed <computed_key>=...`` token.

    Each item is compared, after stripping surrounding whitespace, against
    the prefix ``"computed " + computed_key + "="`` (case-sensitive). For the
    first match, the text after the item's first "=" is returned stripped.
    ``None`` is returned when no item matches.
    """
    wanted = f"computed {computed_key}="
    values = (
        entry.partition("=")[2].strip()
        for entry in comment_items
        if entry.strip().startswith(wanted)
    )
    return next(values, None)

def curate_block_with_precursor(headers,
                                 peaks,
                                 comment_items,
                                 min_peaks_: int = 3,
                                 require_ms2_: bool = True,
                                 mz_decimals_: Optional[int] = 4) -> Optional[str]:
    """Filter one parsed MSP record and render it with precursor information.

    Filters, applied in order (the record is rejected on the first failure):

    1. Peak count: at least *min_peaks_* peaks, whatever the MS level.
    2. MS level, only when *require_ms2_* is true: if the record has a
       non-empty ``Spectrum_type`` (or ``SPECTRUM_TYPE``) it must contain
       "MS2"; if the field is missing or empty, the record needs at least
       ``max(min_peaks_, 2)`` peaks instead.
    3. Decimal precision of the fragment m/z values, using the module-level
       ``min_decimal_places`` and ``min_fraction_with_decimals``.

    If the record has a ``Precursor_type`` header that is a key of the
    module-level ``precursor_key_map``, and the Comments tokens contain the
    matching ``computed <key>=<value>`` entry with a non-empty value, that
    value is written as ``PrecursorMZ`` (replacing any value from the file).

    Args:
        headers: Header dict from ``parse_msp_block``; it is not modified.
        peaks: ``(mz, intensity)`` string pairs.
        comment_items: Quoted tokens taken from the Comments header.
        min_peaks_: Minimum number of peaks required.
        require_ms2_: Whether to enforce the MS-level filter.
        mz_decimals_: Decimals used when writing m/z; falsy keeps the text.

    Returns:
        The curated record terminated by a blank line, or ``None`` when the
        record is rejected.
    """
    # === Spectrum quality filters ===
    # Peak-count filter. It must hold regardless of the MS-level setting;
    # previously an MS2 record skipped it whenever require_ms2_ was true.
    if len(peaks) < min_peaks_:
        return None

    # MS-level filter. A record explicitly tagged with another level is
    # rejected; the peak-count fallback is only for records lacking the field.
    spectrum_type = headers.get("Spectrum_type", headers.get("SPECTRUM_TYPE", "")).strip().upper()
    if require_ms2_:
        if spectrum_type:
            if "MS2" not in spectrum_type:
                return None
        elif len(peaks) < max(min_peaks_, 2):
            return None

    if not passes_decimal_precision(peaks, min_decimal_places, min_fraction_with_decimals):
        return None

    # Work on a copy so the edits below do not mutate the caller's dict.
    headers = dict(headers)

    # === Promote metadata ===
    # Non-empty values parsed from Comments override the header of the same
    # name. The "Comments: " prefix guarantees the parser splits on that
    # colon rather than on one inside a token.
    meta = parse_comments_tokens("Comments: " + " ".join(f"\"{x}\"" for x in comment_items)) if comment_items else {}
    for k in ("CAS#", "PubChemID", "SMILES", "InChI", "InChIKey", "RT"):
        if meta.get(k):
            headers[k] = meta[k]

    # === Precursor extraction logic ===
    precursor_type_raw = headers.get("Precursor_type")
    if precursor_type_raw:
        precursor_key = precursor_key_map.get(precursor_type_raw)
        if precursor_key:
            precursor_mz = extract_precursor_mz(comment_items, precursor_key)
            if precursor_mz:
                headers["PrecursorMZ"] = precursor_mz

    # === Format output ===
    # Known headers in a fixed order, skipping missing or blank values.
    out = []
    for k in ("Name", "Synon", "DB#", "InChIKey", "Spectrum_type", "Instrument_type", "Instrument",
              "Ion_mode", "Formula", "MW", "ExactMass", "CAS#", "PubChemID", "SMILES", "InChI",
              "Precursor_type", "PrecursorMZ", "RT"):
        v = headers.get(k)
        if v is not None and str(v).strip():
            out.append(f"{k}: {v}")

    # Num Peaks is recomputed from the peaks actually parsed.
    out.append(f"Num Peaks: {len(peaks)}")
    out.extend(f"{format_mz(mz, mz_decimals_)} {inten}" for mz, inten in peaks)

    return "\n".join(out) + "\n\n"  # <- block end


# Wrap up the full runner
def curate_msp_with_precursors(in_path: str, out_path: str):
    """Curate every record of *in_path* into *out_path*, adding precursor data.

    The input is read completely (UTF-8, undecodable bytes ignored), split
    into records, and each record that survives
    ``curate_block_with_precursor`` is written to the output. A one-line
    summary of total/kept/dropped records is printed at the end.
    """
    with open(in_path, "r", encoding="utf-8", errors="ignore") as src:
        blocks = iter_msp_blocks(src.readlines())

    kept = 0
    dropped = 0
    with open(out_path, "w", encoding="utf-8") as dst:
        for headers, peaks, comment_items in map(parse_msp_block, blocks):
            text = curate_block_with_precursor(headers, peaks, comment_items)
            if not text:
                dropped += 1
                continue
            dst.write(text)
            kept += 1

    print(f"✅ Total: {len(blocks)}, Kept: {kept}, Dropped: {dropped}")

In [None]:
# Run the precursor-aware curation; both paths are relative to the notebook's
# working directory, and the output file is overwritten if it already exists.
curate_msp_with_precursors("MoNA-export-LC-MS-MS_Spectra.msp", "MoNA_curated_with_precursors.msp")

In [None]:
# Survey which Precursor_type values occur in the raw MSP file, and how often.
msp_file = "MoNA-export-LC-MS-MS_Spectra.msp"
precursor_types = []
current_type = None

with open(msp_file, "r", encoding="utf-8", errors="ignore") as f:
    for line in f:
        stripped = line.strip()
        if not stripped.lower().startswith("precursor_type:"):
            continue
        # Keep the text after the first colon, without surrounding whitespace.
        current_type = stripped.partition(":")[2].strip()
        precursor_types.append(current_type)

# Tally how many records carry each value
type_counts = Counter(precursor_types)

# Report, most frequent first
print(f"Found {len(type_counts)} unique Precursor_type values.\n")
for ptype, count in type_counts.most_common():
    print(f"{ptype:15} → {count} entries")


In [None]:
# ---------- Set your file paths and run ----------
# Runs the basic curation (curate_msp), which does not write the
# Precursor_type / PrecursorMZ fields; the output file is overwritten.
input_msp  = r"./MoNA-export-LC-MS-MS_Spectra.msp"            # change to your file
output_msp = r"./MoNA_curated.msp"    # desired output

curate_msp(input_msp, output_msp)
print("Saved to:", output_msp)