In [None]:
from MS2LDA.motif_parser import load_m2m_folder
from MS2LDA.Add_On.MassQL.MassQL4MotifDB import load_motifDB, motifDB2motifs
from MS2LDA.utils import retrieve_spec4doc

from MS2LDA.Add_On.Fingerprints.FP_annotation import annotate_motifs as calc_fingerprints

import pickle
import tomotopy as tp
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from adjustText import adjust_text

from rdkit.Chem import RDKFingerprint
from rdkit.DataStructs import TanimotoSimilarity
import numpy as np
from tqdm import tqdm
from rdkit import DataStructs
from rdkit.DataStructs.cDataStructs import ExplicitBitVect
from rdkit.Chem.Draw import MolsToGridImage
from rdkit.Chem import MolFromSmiles
from rdkit.Chem import MolFromSmarts


from typing import Dict, List, Optional
from rdkit.Chem import MolFromSmiles, rdFMCS, RDKFingerprint
from rdkit.DataStructs import TanimotoSimilarity
import numpy as np
from tqdm import tqdm

from collections import defaultdict
from matchms import calculate_scores
import numpy as np
import pandas as pd
from matchms import Spectrum
from matchms.similarity import CosineGreedy


The same procedure as showed below is followed for the rest of the MS2LDA runs as well: (600, 800, 1000). In order not to run the main function "process_motifs" everytime, the results can be saved in a pickle file.

In [3]:
motifDB_1, motifDB_2 = load_motifDB("/home/ioannis/thesis_data/positive_600/motifset_optimized.json")
motifs = motifDB2motifs(motifDB_2)
with open('/home/ioannis/thesis_data/positive_600/doc2spec_map.pkl', 'rb') as f:
    doc2spec_map = pickle.load(f)
lda_model = tp.LDAModel.load('/home/ioannis/thesis_data/positive_600/ms2lda.bin')

In [4]:
def safe_mol(smiles: Optional[str]):
    """
    Safely convert a SMILES string into an RDKit Mol object.

    Parameters
    ----------
    smiles : str or None
        A SMILES representation of a molecule. May be None or invalid.

    Returns
    -------
    Mol or None
        An RDKit Mol object if parsing succeeds, otherwise None.

    Notes
    -----
    This wrapper prevents RDKit parsing errors from interrupting
    downstream processing when annotations or candidate molecules
    contain invalid SMILES strings.
    """
    if smiles is None:
        return None
    try:
        return MolFromSmiles(smiles)
    except:
        return None


def calculate_sos(fp1, fp2):
    """
    Compute the Subset Overlap Score (SOS) between two binary fingerprints.

    Parameters
    ----------
    fp1, fp2 : list[int] or iterable of {0,1}
        Two molecular fingerprints represented as binary vectors.

    Returns
    -------
    float
        The fraction of 'on' bits in the smaller fingerprint that are
        also present in the larger fingerprint. Ranges from 0 to 1.

    Notes
    -----
    SOS is an asymmetric similarity measure. It quantifies whether the
    structural features of the smaller fingerprint are contained within
    the larger one, making it useful for motif–molecule consistency
    checks where subset relationships matter.
    """
    if sum(fp1) < sum(fp2):
        smaller_fp = fp1
        bigger_fp = fp2
    else:
        smaller_fp = fp2
        bigger_fp = fp1
    
    smaller_fp_sum = sum(smaller_fp)
    fp_intersection = 0
    for bit1, bit2 in zip(smaller_fp, bigger_fp):
        if bit1 == 1 and bit2 == 1:
            fp_intersection += 1

    if fp_intersection == 0:
        return 0
    else:
        return fp_intersection / smaller_fp_sum


def compute_pairwise_similarity(mols: List, threshold: float = 0.5) -> float:
    """
    Compute the fraction of molecule pairs with Tanimoto similarity above a threshold.

    Parameters
    ----------
    mols : list of RDKit Mol
        A list of molecules for which pairwise similarity is computed.
    threshold : float, optional
        Minimum Tanimoto similarity required for a pair to be counted
        as a match. Default is 0.5.

    Returns
    -------
    float
        The proportion of molecule pairs whose Tanimoto similarity
        meets or exceeds the threshold. Returns 0.0 if fewer than two
        molecules are provided.

    Notes
    -----
    This metric is used to quantify intra‑motif molecular homogeneity,
    i.e., whether molecules associated with a Mass2Motif tend to be
    structurally similar.
    """
    if len(mols) < 2:
        return 0.0
    fps = [RDKFingerprint(m) for m in mols]
    total = 0
    matches = 0
    n = len(fps)
    for i in range(n):
        for j in range(i + 1, n):
            total += 1
            if TanimotoSimilarity(fps[i], fps[j]) >= threshold:
                matches += 1
    return matches / total if total > 0 else 0.0



def compute_mcs_num_atoms(mols):
    """
    Compute the Maximum Common Substructure (MCS) across a set of molecules.

    Parameters
    ----------
    mols : list of RDKit Mol
        Molecules for which the MCS is computed.

    Returns
    -------
    tuple
        (num_atoms, smarts_string)
        num_atoms : int
            Number of atoms in the MCS. Returns 0 if no MCS is found.
        smarts_string : str
            SMARTS representation of the MCS. Empty string if MCS fails.

    Notes
    -----
    Uses RDKit's FMCS algorithm with constraints that enforce chemically
    meaningful matches (e.g., ring‑only matching). This is used to assess
    structural coherence of motif annotations.
    """
    if not mols:
        return 0, ""
    try:
        mcs = rdFMCS.FindMCS(
            mols,
            bondCompare=rdFMCS.BondCompare.CompareAny,
            completeRingsOnly=True,
            ringMatchesRingOnly=True,
            timeout=30,
        )
        return int(mcs.numAtoms), mcs.smartsString
    except:
        return 0, ""


In [None]:

def build_motif_to_docs(lda_model, prob_threshold: float = 0.5):
    """
    Construct a mapping from motif IDs to the spectra (documents) in which
    they appear above a given probability threshold.

    Parameters
    ----------
    lda_model : tomotopy.LDAModel
        The MS2LDA model containing per-document motif probabilities.
        Each document corresponds to one MS/MS spectrum.
    prob_threshold : float, optional
        Minimum motif probability required for a motif to be considered
        present in a document. Default is 0.5.

    Returns
    -------
    dict
        A dictionary mapping:
            motif_id (int) → list of document IDs (ints)
        where each document ID corresponds to a spectrum in which the
        motif is active above the specified threshold.

    Notes
    -----
    This mapping is fundamental for downstream motif evaluation. It
    determines which spectra contribute to:
        - spectral reproducibility checks
        - MCS support calculations
        - intra/inter molecular similarity
        - motif prevalence (degree)
    """
    motif_to_docs = defaultdict(list)
    for doc_id, doc in enumerate(lda_model.docs):
        for motif_id, prob in doc.get_topics():
            if prob >= prob_threshold:
                motif_to_docs[motif_id].append(doc_id)
    return motif_to_docs

In [6]:
def process_motifs(
    motifs,
    lda_model,
    doc2spec_map,
    motif_to_docs,
    prob_threshold: float = 0.5,
    sim_threshold: float = 0.5,
    sos_cal: bool = True
) -> dict:
    """
    Compute structural and similarity-based quality metrics for Mass2Motifs.

    Parameters
    ----------
    motifs : list
        Motif objects from a motifDB, each containing peaks and auto-annotations.
    lda_model : tomotopy.LDAModel
        MS2LDA model used to determine motif usage across documents.
    doc2spec_map : dict
        Mapping from document IDs to spectrum metadata (including SMILES).
    motif_to_docs : dict
        Mapping of motif_id → list of documents where the motif probability
        exceeds `prob_threshold`.
    prob_threshold : float, optional
        Minimum motif probability for including a document.
    sim_threshold : float, optional
        Tanimoto cutoff for candidate–candidate similarity.
    sos_cal : bool, optional
        If True, compute inter-similarity using SOS; otherwise use Tanimoto.

    Returns
    -------
    dict
        {
            "motif_ids": list of processed motif IDs,
            "num_atoms": MCS atom counts from annotation molecules,
            "len_frag_loss": number of MS2 features per motif,
            "mcs_smarts": SMARTS strings of MCS results,
            "molecules_by_motif": candidate RDKit molecules per motif,
            "intra_sims": similarity among candidate molecules,
            "inter_sims": similarity between representative fingerprint and candidates
        }

    Notes
    -----
    Motifs with ≤1 annotation or no qualifying documents are skipped.
    The function evaluates motif coherence by combining annotation chemistry,
    MS2LDA usage patterns, and molecular similarity metrics.
    """

    motif_ids = []
    num_atoms = []
    len_frag_loss = []
    mcs_smarts = []
    intra_sims = []
    inter_sims = []
    molecules_by_motif = {}

    for motif in tqdm(motifs):
        annotation = motif.get("auto_annotation", [])
        if len(annotation) <= 1:
            continue

        try:
            motif_id = int(motif.get("id").split("_")[1])
        except Exception:
            continue

        # if motif never appears in any doc above prob_threshold, skip
        if motif_id not in motif_to_docs:
            continue

        motif_ids.append(motif_id)
        len_frag_loss.append(len(getattr(motif.peaks, "mz", [])))

        # ---- MCS on annotation molecules ----
        ann_mols = [safe_mol(s) for s in annotation]
        ann_mols = [m for m in ann_mols if m is not None]
        n_atoms, smarts = compute_mcs_num_atoms(ann_mols)
        num_atoms.append(n_atoms)
        mcs_smarts.append(smarts)

        # ---- representative fingerprint from annotations ----
        rep_fp_ = calc_fingerprints([annotation], fp_type="maccs", threshold=0.9)[0]

        rep_fp = ExplicitBitVect(len(rep_fp_))
        for i, bit in enumerate(rep_fp_):
            if bit:
                rep_fp.SetBit(i)

        # ---- Collect candidate molecules using motif_to_docs ----
        candidate_mols = []
        for doc_id in motif_to_docs[motif_id]:
            spec = retrieve_spec4doc(doc2spec_map, lda_model, doc_id)
            mol = safe_mol(spec.get("smiles"))
            if mol is not None:
                candidate_mols.append(mol)

        molecules_by_motif[motif_id] = candidate_mols

        # ---- Intra similarity (candidates vs candidates) ----
        if len(candidate_mols) < 2:
            intra_sims.append(0.0)
        else:
            fps = [RDKFingerprint(m) for m in candidate_mols]
            total = 0
            matches = 0
            n = len(fps)
            for i in range(n):
                for j in range(i + 1, n):
                    total += 1
                    if TanimotoSimilarity(fps[i], fps[j]) >= sim_threshold:
                        matches += 1
            intra_sims.append(matches / total if total > 0 else 0.0)

        # ---- Inter similarity (motif representative vs candidates) ----
        if not candidate_mols:
            inter_sims.append(0.0)
        elif sos_cal:
            fps = [RDKFingerprint(m) for m in candidate_mols]
            rep_fp_list = list(rep_fp)

            sims = []
            for fp in fps:
                fp_list = list(fp)
                sim = calculate_sos(rep_fp_list, fp_list)
                sims.append(sim)

            inter_sims.append(float(np.mean(sims)) if sims else 0.0)
        else:
            fps = [RDKFingerprint(m) for m in candidate_mols]
            sims = [TanimotoSimilarity(rep_fp, fp) for fp in fps]
            inter_sims.append(float(np.mean(sims)) if sims else 0.0)

    return {
        "motif_ids": motif_ids,
        "num_atoms": num_atoms,
        "len_frag_loss": len_frag_loss,
        "mcs_smarts": mcs_smarts,
        "molecules_by_motif": molecules_by_motif,
        "intra_sims": intra_sims,
        "inter_sims": inter_sims
    }



In [7]:
prob_threshold = 0.5
motif_to_docs = build_motif_to_docs(lda_model, prob_threshold=prob_threshold)

results_sos = process_motifs(
    motifs,
    lda_model,
    doc2spec_map,
    motif_to_docs,
    prob_threshold=prob_threshold,
    sim_threshold=0.5,
    sos_cal=True
)


100%|██████████| 282/282 [03:35<00:00,  1.31it/s]


The computing time for the function above is around 4-5 minutes, so the results from each run are saved in pkl files.

After running the same workflow for all 4 runs of MS2LDA, the Mass2Motifs that remain after the process_motifs function are:
- From the 400 run: 173 Mass2Motifs
- From the 600 run: 244 Mass2Motifs
- From the 800 run: 295 Mass2Motifs
- From the 1000 run: 337 Mass2Motifs

So far the motifs have already passed 2 filtering steps. The optimization step from MS2LDA, and now also the filtering from the process_motifs function.

In [3]:
def load_run(path):
    with open(path, "rb") as f:
        cache = pickle.load(f)
    return cache["results_sos"], cache["motif_to_docs"]

In [27]:
results_sos, motif_to_docs = load_run("results_sos_1000.pkl")

In [28]:
print('The number of motifs after the processing is:', len(results_sos['motif_ids']))

The number of motifs after the processing is: 337


In [29]:
empty_mcs_count = sum(1 for mcs in results_sos["mcs_smarts"] if mcs == "")
print(empty_mcs_count)

8


In [30]:
print(results_sos)

{'motif_ids': [416, 518, 711, 760, 262, 343, 373, 911, 621, 178, 847, 154, 712, 992, 207, 575, 837, 378, 432, 747, 80, 351, 77, 280, 172, 785, 795, 688, 737, 681, 545, 204, 652, 87, 415, 322, 740, 50, 762, 977, 218, 478, 752, 519, 842, 145, 123, 976, 970, 850, 38, 531, 555, 138, 904, 812, 53, 45, 146, 70, 800, 609, 285, 566, 707, 628, 825, 868, 299, 605, 508, 139, 696, 215, 260, 775, 793, 700, 898, 65, 275, 28, 641, 325, 543, 880, 346, 339, 29, 468, 564, 502, 227, 969, 382, 449, 444, 809, 446, 542, 122, 851, 254, 930, 421, 901, 175, 403, 926, 52, 579, 606, 308, 156, 952, 283, 758, 500, 943, 756, 491, 481, 547, 59, 687, 972, 570, 480, 451, 475, 541, 355, 836, 101, 234, 465, 698, 528, 433, 600, 180, 199, 380, 498, 224, 521, 945, 209, 393, 829, 284, 354, 368, 167, 112, 932, 782, 678, 392, 83, 108, 424, 885, 327, 43, 777, 294, 922, 535, 190, 854, 200, 82, 786, 761, 794, 708, 660, 703, 993, 840, 151, 476, 844, 648, 549, 961, 120, 195, 239, 85, 363, 862, 613, 438, 41, 361, 335, 347, 619, 289

The dataframes below are provided just for a quick lookup of motifs based on their intra and inter similarity, and also the Maximum Common Substructure (mcs)

In [None]:
df = pd.DataFrame({
    "motif_id": results_sos["motif_ids"],
    "intra": results_sos["intra_sims"],
    "inter": results_sos["inter_sims"],
    "mcs_size": results_sos["num_atoms"],
    "n_features": results_sos["len_frag_loss"],
})

# Filter out motifs with low scores of intra and inter similarity
df_good = df[(df["intra"] => 0.1) &
    (df["inter"] => 0.3)].copy()

# Quality score used for sorting
df_good["quality_score"] = (df_good["intra"] + df_good["inter"])

df_good = df_good.sort_values("quality_score", ascending=False)

In [32]:
print(len(df_good))

5


In [33]:
df_good

Unnamed: 0,motif_id,intra,inter,mcs_size,n_features,quality_score
82,641,0.094862,0.0,1,1,0.094862
133,101,0.054579,0.0,0,9,0.054579
147,209,0.02509,0.0,0,6,0.02509
59,70,0.023164,0.0,0,12,0.023164
227,938,0.0,0.0,1,1,0.0


In [18]:
df_filtered = df[(df["inter"] > 0.1) & (df["mcs_size"] > 11) & (df["intra"] > 0.1)]
df_filtered

Unnamed: 0,motif_id,intra,inter,mcs_size,n_features
17,96,0.224599,0.484314,25,19
38,460,0.350649,0.559961,19,1
57,696,0.232288,0.43315,13,1
62,456,0.286232,0.297348,12,1
63,230,0.419966,0.443362,20,4
81,8,0.333333,0.449612,12,1
83,729,0.49887,0.237778,23,1
98,706,0.147059,0.41033,25,1
108,713,0.274775,0.286318,24,6
110,404,0.105051,0.429365,15,1
