In [1]:
import numpy as np
from collections import namedtuple

from pathlib import Path

In [2]:
Spec = namedtuple("Spec", ["mz", "int_"]) # Used for represent a spectra for convenience

def range_sel_max(mz:np.ndarray, int_:np.ndarray, sels:list[tuple[float, float]]):
    """Use for selecting the highest intensity value among a range of m/z

    Parameters
    ----------
    mz : np.ndarray
        m/z
    int_ : np.ndarray
        the corresponding intensity
    sels : list[tuple[float, float]]
        a list of selecting range. Each range is represented by a tuple of (min_mz, max_mz)
    """
    def max_sel(_mz, _int_, _min, _max):
        v = _int_[(_mz > _min) & (mz < _max)]
        if v.size == 0:
            return 0
        else:
            return v.max()
    return np.asarray([max_sel(mz, int_, min, max) for (min, max) in sels])

class QuanDA:
    def __init__(self, chem1:tuple[np.ndarray, np.ndarray], chem2:tuple[np.ndarray, np.ndarray], sel:list[tuple[float, float]]) -> None:
        """The QuanDA algorithm

        Parameters
        ----------
        chem1 : tuple[np.ndarray, np.ndarray]
            The spectra of one pure chemical (e.g. the Pure Pep-D) (m/z, intense)
        chem2 : tuple[np.ndarray, np.ndarray]
            The spectra of another pure chemical (e.g. the Pure Pep-N) (m/z, intense)
        sel : list[tuple[float, float]]
            The ranges of selected m/z regions. They will be used for extracting the highest intensity in each interval.
            Each interval is represented by a tuple of (min_mz, max_mz)
        """
        from copy import deepcopy
        self.sel = deepcopy(sel)
        self.before = range_sel_max(chem1[0], chem1[1], self.sel)
        self.after = range_sel_max(chem2[0], chem2[1], self.sel)
        self.before /= self.before.max()
        self.after /= self.after.max()

        self.coef = np.column_stack((self.before, self.after))

    def peak_sel(self, data:list[Spec]):
        """extract the isotopic peaks from the spectra.

        Parameters
        ----------
        data : list[Spec]
            list of spectra

        Returns
        -------
        np.ndarray
            The extracted intensity array
        """
        peaks = []
        for spec in data:
            _peak_int = range_sel_max(spec.mz, spec.int_, self.sel)
            peaks.append(_peak_int)
        return np.stack(peaks, 0)

    def calc(self, B, n_jobs=None):
        """Compute the percentage of `chem1` and `chem2` (refering to the parameters in `__init__`)

        Parameters
        ----------
        B : np.ndarray
            The matrix composed of the extracted isotopic peaks from samples.
        n_jobs : int, optional
            number of parallel computing workers, by default None

        Returns
        -------
        np.ndarray
            The estimated percentage of two chemicals.
        """
        from scipy.optimize import nnls
        import joblib
        @joblib.delayed
        def _nnls(A, b):
            return nnls(A, b)[0]
        results = joblib.Parallel(n_jobs=n_jobs)(
        [_nnls(self.coef, B[i, :]) for i in range(B.shape[0])])
        results = np.stack(results, 0)
        results /= results.sum(1, keepdims=True)
        return results        

In [3]:
def read_data(fpath):
    """Utilty function for loading data"""
    return np.loadtxt(fpath, unpack=True, dtype=float, skiprows=2)

In [4]:
range_sel = [(2007.4 + i, 2008.4 + i) for i in range(7)] # The interval that the istopic peaks would be
D_x, D_y = np.loadtxt(r"data\D.txt", unpack=True, dtype=float) # load the raw spectra, x is m/z, y is intensity
N_x, N_y = np.loadtxt(r"data\N.txt", unpack=True, dtype=float)
regressor = QuanDA((D_x, D_y), (N_x, N_y), range_sel) # construct the QuanDA class

In [5]:
# Load data
N1D9 = []
for f in Path(r"data\Asn2Asp18").iterdir():
    x, y = read_data(f)
    N1D9.append(Spec(x, y))
N9D1 = []
for f in Path(r"data\Asn18Asp2").iterdir():
    x, y = read_data(f)
    N9D1.append(Spec(x, y))

In [6]:
# extrac the isotopic peaks from spectra and form them to matrix
N1D9 = regressor.peak_sel(N1D9)
N9D1 = regressor.peak_sel(N9D1)

In [7]:
# calculate and print results
print("N1D9")
print("D%: {obj[0]:%} \t N%: {obj[1]:%}".format(obj=regressor.calc(N1D9).mean(0)))
print("N9D1")
print("D%: {obj[0]:%} \t N%: {obj[1]:%}".format(obj=regressor.calc(N9D1).mean(0)))

N1D9
D%: 86.284993% 	 N%: 13.715007%
N9D1
D%: 4.136729% 	 N%: 95.863271%
