In [1]:
from datasets import load_dataset

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
msg = load_dataset(
    'roman-bushuiev/MassSpecGym',
    data_files='data/MassSpecGym.tsv'
)

In [3]:
df = msg['train'].shuffle(seed=42).select(range(1000)).to_pandas()

In [4]:
df = df.dropna(subset=['smiles'])

In [5]:
from rdkit import Chem

def as_formula(smiles):
    mol = Chem.MolFromSmiles(smiles)
    formula = {}
    for atom in mol.GetAtoms():
        element = atom.GetSymbol()
        if element in formula:
            formula[element] += 1
        else:
            formula[element] = 1
    return formula

df['formula'] = df['smiles'].apply(as_formula)

In [6]:
df

Unnamed: 0,identifier,mzs,intensities,smiles,inchikey,formula,precursor_formula,parent_mass,precursor_mz,adduct,instrument_type,collision_energy,fold,simulation_challenge
0,MassSpecGymID0052279,"114.0915,149.0709,176.1072,191.0815,192.0768,1...","0.0013355700000000001,0.00890994,0.00139608,0....",CC(C)C1(C(=O)NC(=N1)C2=C(C=C(C=N2)COC)C(=O)O)C,NUPJIGQFXCQJBK,"{'C': 15, 'O': 4, 'N': 3}",C15H20N3O4,305.137524,306.14480,[M+H]+,Orbitrap,21.430136,train,True
1,MassSpecGymID0167344,"75.023003,77.038002,89.038002,91.054001,101.03...","0.059,0.006,1.0,0.046,0.052,0.038,0.427,0.126,...",C1=C2C3=C(C(=C1O)O)OC(=O)C4=CC(=C(C(=C43)OC2=O...,AFSDNFLWKVMVRB,"{'C': 14, 'O': 8}",C14H7O8,302.002724,303.01000,[M+H]+,Orbitrap,,train,False
2,MassSpecGymID0400927,"91.054169,93.069809,95.049164,95.085434,97.064...","0.14086688273294626,0.11596669766096218,0.0075...",CCCCCCCCCCCCCCCC/C=C\OC[C@H](COP(=O)(O)OCCN)OC...,URPXXNCTXCOATD,"{'C': 43, 'O': 7, 'P': 1, 'N': 1}",C43H79NO7P,751.551724,752.55900,[M+H]+,Orbitrap,,train,False
3,MassSpecGymID0055045,"42.0338,44.0495,58.0651,69.0448,71.0604,83.060...","1.0,0.3083083083083083,0.3383383383383383,0.02...",C1N2CN3CN1CN(C2)C3,VKYKSIONXSXAKP,"{'C': 6, 'N': 4}",C6H13N4,140.106224,141.11350,[M+H]+,Orbitrap,21.167025,train,True
4,MassSpecGymID0227317,"41.038471,41.270458,43.054085,44.049362,71.085...","0.00196,0.00248,0.00226,0.041749999999999995,0...",CC(C)(C)CNCCN1C2=C(C(=NC=C2)N)N=C1SC3=CC4=C(C=...,RVJIQAYFTOPTKK,"{'C': 22, 'N': 6, 'S': 1, 'O': 2}",C22H31N6O2S,442.215094,443.22237,[M+H]+,Orbitrap,60.000000,train,True
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,MassSpecGymID0199548,"107.049797,111.038628,121.027344,123.007416,12...","0.00220512,0.00213621,0.005742509999999999,0.0...",COC1=CC(=CC(=C1C(=O)C2=C(C=C(C=C2O)CO)O)C(=O)OC)O,WSRCOMOWYVKWBE,"{'C': 17, 'O': 8}",C17H17O8,348.082724,349.09000,[M+H]+,,,train,False
996,MassSpecGymID0231288,"58.065174,68.012993,70.065186,99.091698,101.10...","0.07695,0.010820000000000001,0.018680000000000...",CC(=O)NC1=CC(=NC=N1)OC2=CC3=C(C=C2)C(=CC=C3)C(...,MZZJNOOADWVFPD,"{'C': 30, 'O': 3, 'N': 6, 'F': 3}",C30H30F3N6O3,578.225324,579.23260,[M+H]+,Orbitrap,60.000000,test,True
997,MassSpecGymID0073570,"71.0604,78.0338,83.024,93.0083,96.0443,100.039...","0.07907907907907907,0.06906906906906907,1.0,0....",CCS(=O)(=O)C1=C(N=CC=C1)S(=O)(=O)NC(=O)NC2=NC(...,MEFOUWRMVYJCQC,"{'C': 14, 'S': 2, 'O': 7, 'N': 5}",C14H18N5O7S2,431.056924,432.06420,[M+H]+,Orbitrap,77.771556,test,True
998,MassSpecGymID0061856,"68.997002,74.014999,75.023003,77.038002,79.017...","1.0,0.01,0.075,0.577,0.032,0.01,0.548,0.172,0....",C1=CC(=C(C=C1C2=C(C(=O)C3=C(C=C(C=C3O2)O)O)O)O)O,REFJWTPEDVJJIY,"{'C': 15, 'O': 7}",C15H11O7,302.042724,303.05000,[M+H]+,Orbitrap,,train,False


In [7]:
from msbuddy import Msbuddy, MsbuddyConfig

# instantiate a MsbuddyConfig object
msb_config = MsbuddyConfig(
            ms_instr='orbitrap', # supported: "qtof", "orbitrap", "fticr" or None
                                    # custom MS1 and MS2 tolerance will be used if None
            ppm=True,  # use ppm for mass tolerance
            ms1_tol=5,  # MS1 tolerance in ppm or Da
            ms2_tol=10,  # MS2 tolerance in ppm or Da
            halogen=False)

# instantiate a Msbuddy object
msb_engine = Msbuddy(msb_config)

msbuddy: molecular formula annotation for MS-based small molecule analysis.


In [8]:
import matchms as mms
import numpy as np

def as_spectrum(row):
    mzs = np.array(tuple(float(e) for e in row['mzs'].split(',')))
    intensities = np.array(tuple(float(e) for e in row['intensities'].split(',')))  
    spectrum = mms.Spectrum(mz=mzs, intensities=intensities,
                            metadata={'smiles': row['smiles'],
                                      'precursor_mz': row['precursor_mz'],
                                      'formula': row['formula']})
    return spectrum
    
specs = df.apply(as_spectrum, axis=1)

In [9]:
specs

0      Spectrum(precursor m/z=306.14, 24 fragments be...
1      Spectrum(precursor m/z=303.01, 17 fragments be...
2      Spectrum(precursor m/z=752.56, 105 fragments b...
3      Spectrum(precursor m/z=141.11, 10 fragments be...
4      Spectrum(precursor m/z=443.22, 34 fragments be...
                             ...                        
995    Spectrum(precursor m/z=349.09, 51 fragments be...
996    Spectrum(precursor m/z=579.23, 33 fragments be...
997    Spectrum(precursor m/z=432.06, 10 fragments be...
998    Spectrum(precursor m/z=303.05, 42 fragments be...
999    Spectrum(precursor m/z=780.59, 42 fragments be...
Length: 1000, dtype: object

In [10]:
import matchms as mms
!rm tmp.mgf
mms.exporting.save_as_mgf(specs.tolist(), 'tmp.mgf')
msb_engine.load_mgf('tmp.mgf')

In [11]:
msb_engine._preprocess_and_generate_candidate_formula(0, 100)

Candidate space generation:   0%|[32m          [0m| 0/100 [00:00<?, ?it/s]

Candidate space generation: 100%|[32m██████████[0m| 100/100 [00:06<00:00, 15.46it/s]


In [12]:
msb_engine.config.parallel

False

In [13]:
from msbuddy.cand import MetaFeature, CandidateFormula, CandidateSpace, Formula, \
    FragExplanation
from numba import njit

@njit
def enumerate_subformula(pre_charged_arr: np.array) -> np.array:
    """
    Enumerate all subformulas of a candidate formula. (Numba version)
    :param pre_charged_arr: precursor charged array
    :return: 2D array, each row is a subformula array
    """
    n = len(pre_charged_arr)
    total_subform_cnt = np.prod(pre_charged_arr + 1)

    subform_arr = np.zeros((total_subform_cnt, n), dtype=np.int16)
    tempSize = 1

    for i in range(n):
        count = pre_charged_arr[i]
        repeatSize = tempSize
        tempSize *= (count + 1)

        pattern = np.arange(count + 1)

        repeated_pattern = np.empty(repeatSize * len(pattern), dtype=np.int16)
        for j in range(len(pattern)):
            repeated_pattern[j * repeatSize: (j + 1) * repeatSize] = pattern[j]

        full_repeats = total_subform_cnt // len(repeated_pattern)

        for j in range(full_repeats):
            start_idx = j * len(repeated_pattern)
            end_idx = (j + 1) * len(repeated_pattern)
            subform_arr[start_idx:end_idx, i] = repeated_pattern

    return subform_arr

@njit
def _calc_subform_mass(subform_arr: np.array, adduct_charge: int) -> np.array:
    """
    Calculate mass of each subformula.
    :param subform_arr: 2D array, each row is a subformula array
    :param adduct_charge: adduct charge
    :return: 1D array, mass of each subformula
    """
    mass_arr = np.empty(subform_arr.shape[0], dtype=np.float32)
    ele_mass_arr = np.array([12.000000, 1.007825, 78.918336, 34.968853, 18.998403, 126.904473, 38.963707, 14.003074,
                             22.989769, 15.994915, 30.973762, 31.972071], dtype=np.float32)
    for i in range(subform_arr.shape[0]):
        # element wise multiplication
        mass_arr[i] = np.sum(subform_arr[i, :] * ele_mass_arr, dtype=np.float32) - np.float32(adduct_charge * 0.0005486)
    return mass_arr


@njit
def _senior_subform_filter(subform_arr: np.array) -> np.array:
    """
    Filter subformulas by SENIOR rules.
    :param subform_arr: 2D array, each row is a subformula array
    :return: boolean array
    """
    senior_1_1_arr = 6 * subform_arr[:, 11] + 5 * subform_arr[:, 10] + 4 * subform_arr[:, 0] + \
                     3 * subform_arr[:, 7] + 2 * subform_arr[:, 9] + subform_arr[:, 1] + subform_arr[:, 4] + \
                     subform_arr[:, 3] + subform_arr[:, 2] + subform_arr[:, 5] + subform_arr[:, 8] + subform_arr[:, 6]
    senior_2_arr = np.sum(subform_arr, axis=1)
    senior_bool_arr = (senior_1_1_arr >= 2 * (senior_2_arr - 1))

    return senior_bool_arr
@njit
def _dbe_subform_filter(subform_arr: np.array, cutoff: float) -> np.array:
    """
    Filter subformulas by DBE.
    :param subform_arr: 2D array, each row is a subformula array
    :return: boolean array
    """
    dbe_arr = subform_arr[:, 0] + 1 - (subform_arr[:, 1] + subform_arr[:, 4] + subform_arr[:, 3] + subform_arr[:, 2]
                                       + subform_arr[:, 5] + subform_arr[:, 8] + subform_arr[:, 6]) / 2 + \
              (subform_arr[:, 7] + subform_arr[:, 10]) / 2
    dbe_bool_arr = dbe_arr >= cutoff
    return dbe_bool_arr

@njit
def _valid_subform_check(subform_arr: np.array, pre_charged_arr: np.array) -> np.array:
    """
    Check whether a subformula (frag and loss) is valid. e.g., 'C2', 'N4', 'P2'; O >= 2*P
    :param subform_arr: 2D array, each row is a subformula array
    :return: boolean array
    """
    # for frag or loss
    frag_atom_sum = np.sum(subform_arr, axis=1)
    loss_form_arr = pre_charged_arr - subform_arr
    loss_atom_sum = np.sum(loss_form_arr, axis=1)
    invalid_bool_arr = (frag_atom_sum == subform_arr[:, 0]) | (frag_atom_sum == subform_arr[:, 7]) | \
                       (frag_atom_sum == subform_arr[:, 10]) | (loss_atom_sum == loss_form_arr[:, 0]) | \
                       (loss_atom_sum == loss_form_arr[:, 7]) | (loss_atom_sum == loss_form_arr[:, 10])

    # O >= 2*P if P > 0
    invalid_o_p_frag_bool_arr = (subform_arr[:, 9] < 2 * subform_arr[:, 10]) & (subform_arr[:, 10] > 0)
    invalid_o_p_loss_bool_arr = (loss_form_arr[:, 9] < 2 * loss_form_arr[:, 10]) & (loss_form_arr[:, 10] > 0)

    invalid_bool_arr = invalid_bool_arr | invalid_o_p_frag_bool_arr | invalid_o_p_loss_bool_arr

    return ~invalid_bool_arr


def _assign_ms2_explanation(mf: MetaFeature, cf: CandidateFormula, pre_charged_arr: np.array,
                            subform_arr: np.array, mass_arr: np.array,
                            ppm: bool, ms2_tol: float) -> CandidateFormula:
    """
    Assign MS2 explanation to a candidate formula.
    :param mf: MetaFeature object
    :param cf: CandidateFormula object
    :param pre_charged_arr: precursor charged array
    :param subform_arr: 2D array, each row is a subformula array
    :param mass_arr: 1D array, mass of each subformula
    :param ppm: whether to use ppm as the unit of tolerance
    :param ms2_tol: mz tolerance for fragment ions / neutral losses
    :return: CandidateFormula object
    """
    candidate_space = None
    ion_mode_int = 1 if mf.adduct.pos_mode else -1
    for i in range(len(mf.ms2_processed.mz_array)):
        # retrieve all indices of mass within tolerance
        this_ms2_tol = ms2_tol if not ppm else ms2_tol * mf.ms2_processed.mz_array[i] * 1e-6
        idx_list = np.where(abs(mf.ms2_processed.mz_array[i] - mass_arr) <= this_ms2_tol)[0]

        if len(idx_list) == 0:
            continue

        # retrieve all subformulas within tolerance
        this_subform_arr = subform_arr[idx_list, :]
        this_mass = mass_arr[idx_list]

        # dbe filter (DBE >= 0)
        bool_arr_1 = _dbe_subform_filter(this_subform_arr, 0)

        # SENIOR rules filter, a soft version
        bool_arr_2 = _senior_subform_filter(this_subform_arr)

        # valid subformula check
        bool_arr_3 = _valid_subform_check(this_subform_arr, pre_charged_arr)

        # combine filters
        bool_arr = bool_arr_1 & bool_arr_2 & bool_arr_3
        this_subform_arr = this_subform_arr[bool_arr, :]
        this_mass = this_mass[bool_arr]

        # if no valid subformula, skip
        if this_subform_arr.shape[0] == 0:
            continue

        frag_exp = FragExplanation(mf.ms2_processed.idx_array[i],
                                   Formula(this_subform_arr[0, :], ion_mode_int, this_mass[0]),
                                   Formula(pre_charged_arr - this_subform_arr[0, :], 0))
        # add all subformulas
        if len(this_mass) > 1:
            for j in range(1, len(this_mass)):
                frag_exp.add_frag_nl(Formula(this_subform_arr[j, :], ion_mode_int, this_mass[j]),
                                     Formula(pre_charged_arr - this_subform_arr[j, :], 0))

        if candidate_space is None:
            # create CandidateSpace object
            candidate_space = CandidateSpace(cf.formula.array, pre_charged_arr, frag_exp_ls=[frag_exp])
        else:
            candidate_space.add_frag_exp(frag_exp)

    # if no MS2 explanation, return original candidate formula
    if not candidate_space:
        return cf

    # refine MS2 explanation
    ms2_iso_tol = ms2_tol if not ppm else ms2_tol * mf.mz * 1e-6
    ms2_iso_tol = max(ms2_iso_tol, 0.02)
    candidate_form = candidate_space.refine_explanation(mf, ms2_iso_tol)

    # copy other attributes
    candidate_form.ms1_isotope_similarity = cf.ms1_isotope_similarity
    candidate_form.formula_feature_array = cf.formula_feature_array

    return candidate_form

def assign_subformula_cand_form(mf: MetaFeature, ppm: bool, ms2_tol: float) -> MetaFeature:
    for k, cf in enumerate(mf.candidate_formula_list):
        # enumerate all subformulas
        subform_arr = enumerate_subformula(cf.charged_formula.array)

        # mono mass
        mass_arr = _calc_subform_mass(subform_arr, mf.adduct.charge)
        
        # assign ms2 explanation
        mf.candidate_formula_list[k] = _assign_ms2_explanation(mf, cf, cf.charged_formula.array, subform_arr, mass_arr,
                                                               ppm, ms2_tol)

    return mf

def _gen_subformula(mf: MetaFeature, ps: MsbuddyConfig) -> MetaFeature:
    if not mf.ms2_processed:
        return mf

    if not mf.candidate_formula_list:
        return mf

    # assign subformula annotation
    mf = assign_subformula_cand_form(mf, ps.ppm, ps.ms2_tol)

    # retain candidate formula with subformula annotations if there is any candidate formula with annotations
    cand_form_exp_ms2_peak_list = []
    for cf in mf.candidate_formula_list:
        if cf.ms2_raw_explanation:
            cand_form_exp_ms2_peak_list.append(len(cf.ms2_raw_explanation))
        else:
            cand_form_exp_ms2_peak_list.append(0)
    if min(cand_form_exp_ms2_peak_list) > 0:
        mf.candidate_formula_list = [cf for cf in mf.candidate_formula_list if len(cf.ms2_raw_explanation) > 0]

    return mf

In [14]:
from tqdm import tqdm
batch_data = msb_engine.data[0: 100]
for mf in tqdm(batch_data):
    modified_mf = _gen_subformula(mf, msb_engine.config)
    # modified_mf_ls.append(modified_mf)

 52%|█████▏    | 52/100 [00:14<00:13,  3.67it/s]


KeyboardInterrupt: 