In [1]:
import numpy as np
import sys
import scipy.stats

In [2]:
%load_ext autoreload
%autoreload 2

In [3]:
sys.path.append('../codes')

In [4]:
%pprint

Pretty printing has been turned OFF


# Chromatogram

In [5]:
class Chromatogram(object):
    
    def get_relative_intensity(self, query_rt):
        raise NotImplementedError()
        
    def get_relative_mz(self, query_rt):
        raise NotImplementedError()
        
    def _rt_match(self, rt):
        raise NotImplementedError()

# Empirical Chromatogram

In [6]:
def chromatogramDensityNormalisation(rts, intensities):
    """
    Definition to standardise the area under a chromatogram to 1. Returns updated intensities
    """
    area = 0.0
    for rt_index in range(len(rts)-1):
        area += ((intensities[rt_index] + intensities[rt_index + 1])/2) / (rts[rt_index+1] - rts[rt_index])
    new_intensities = [x * (1 / area) for x in intensities]
    return new_intensities

In [7]:
chromatogramDensityNormalisation([2,3,4],[0.3,0.3,0.3])

[0.5, 0.5, 0.5]

In [8]:
class EmpiricalChromatogram(Chromatogram):
    """
    Empirical Chromatograms to be used within Chemicals
    """
    def __init__(self, rts, mzs, intensities):
        self.rts = [x - min(rts) for x in rts]
        self.mzs = [x - sum(mzs)/len(mzs) for x in rts] # may want to just set this to 0 and remove from input
        self.intensities = chromatogramDensityNormalisation(rts, intensities)

    def get_relative_intensity(self, query_rt):
        if self._rt_match(query_rt) == False:
            return None
        else:
            return((self.intensities[self._get_rt_neighbours_which(query_rt)[0]] + 
                    (self.intensities[self._get_rt_neighbours_which(query_rt)[1]]
                     -self.intensities[self._get_rt_neighbours_which(query_rt)[0]]) * self._get_distance(query_rt)))
        
    def get_relative_mz(self, query_rt):
        if self._rt_match(query_rt) == False:
            return None
        else:
            return((self.mzs[self._get_rt_neighbours_which(query_rt)[0]] + 
                    (self.mzs[self._get_rt_neighbours_which(query_rt)[1]]
                     -self.mzs[self._get_rt_neighbours_which(query_rt)[0]]) * self._get_distance(query_rt)))
        
    def _get_rt_neighbours(self, query_rt):
        rt_below = max(x for x in self.rts if x <= query_rt)
        rt_above = min(x for x in self.rts if x >= query_rt)
        return([rt_below, rt_above])
    
    def _get_rt_neighbours_which(self, query_rt):
        which_rt_below = self.rts.index(self._get_rt_neighbours(query_rt)[0])
        which_rt_above = self.rts.index(self._get_rt_neighbours(query_rt)[1])
        return([which_rt_below, which_rt_above])
        
    def _get_distance(self, query_rt):
        return((query_rt - self._get_rt_neighbours(query_rt)[0]) / 
               (self._get_rt_neighbours(query_rt)[0] - self._get_rt_neighbours(query_rt)[1]))
    
    def _rt_match(self, query_rt):
        if query_rt < min(self.rts) or query_rt > max(self.rts):
            return False
        else:
            return True

In [9]:
m = EmpiricalChromatogram([0,2,4,6,8],[1,2,3,4,5],[100,200,300,400,500])

In [10]:
print(m.get_relative_intensity(3))
print(m.get_relative_intensity(10))

0.25000000000000006
None


In [11]:
print(m.get_relative_mz(3))
print(m.get_relative_mz(10))

-2.0
None


# Functional Chromatogram

In [12]:
class FunctionalChromatogram(Chromatogram):
    """
    Functional Chromatograms to be used within Chemicals
    """
    def __init__(self, distribution, parameters, cutoff = 0.01):
        self.cutoff = cutoff
        self.mz = 0
        if distribution == "normal":
            self.distrib = scipy.stats.norm(parameters[0],parameters[1])
        elif distribution == "gamma":
            self.distrib = scipy.stats.gamma(parameters[0],parameters[1],parameters[2])
        elif distribution == "uniform":
            self.distrib = scipy.stats.uniform(parameters[0],parameters[1])
        else:
            raise NotImplementedError("distribution not implemented")
            
    def get_relative_intensity(self, query_rt):
        if self._rt_match(query_rt) == False:
            return None
        else:
            return(self.distrib.pdf(query_rt + self.distrib.ppf(self.cutoff/2)) * ( 1 / (1 - self.cutoff)))
        
    def get_relative_mz(self, query_rt):
        if self._rt_match(query_rt) == False:
            return None
        else:
            return self.mz

    def _rt_match(self, query_rt):
        if query_rt < 0 or query_rt > self.distrib.ppf(1-(self.cutoff/2)) - self.distrib.ppf(self.cutoff/2):
            return False
        else:
            return True

In [13]:
m = FunctionalChromatogram("normal",[0,1])

In [14]:
print(m.get_relative_intensity(0))
print(m.get_relative_intensity(6))

0.014605801037290308
None


In [17]:
print(m.get_relative_mz(0))
print(m.get_relative_mz(6))

0
None


# Chemicals

In [18]:
class Chemical(object):
    
    def __repr__(self):
        raise NotImplementedError()

    def get_mz_peaks(self, rt, ms_level, isolation_windows):
        raise NotImplementedError()
        
    def _rt_match(self, query_rt): # could remove this if we wanted to link chemicals and MSNs
        raise NotImplementedError()

# Unknown Chemicals

In [82]:
class UnknownChemical(Chemical):

    def __init__(self, mz, rt, max_intensity, chromatogram, children):
        self.mz = mz
        self.rt = rt
        self.max_intensity = max_intensity
        self.chromatogram = chromatogram
        self.children = children
        
    def __repr__(self):
        return 'Peak mz=%.4f rt=%.2f intensity=%.2f' % (self.mz, self.rt, self.max_intensity)

    def get_mz_peaks(self, query_rt, ms_level, isolation_windows):
        if not self._rt_match(query_rt):
            return None
        if not self._isolation_match(isolation_windows[0]):
            return None
        if ms_level == 1:
            intensity = self._get_intensity(query_rt)
            mz = self.mz + self.chromatogram.get_relative_mz(query_rt - self.rt)
            return [(mz, intensity)]
        else:
            mz_peaks = []
            for i in range(len(self.children)):
                mz_peaks.append(self.children[i].get_mz_peaks(query_rt,ms_level,isolation_windows)) #check whether append is right
            return mz_peaks
        
    def _get_intensity(self, query_rt):
        return(self.max_intensity * self.chromatogram.get_relative_intensity(query_rt - self.rt))
    
    def _rt_match(self, query_rt):
        if self.chromatogram._rt_match(query_rt - self.rt) == True:
            return True
        else:
            return False

    def _isolation_match(self, isolation_windows):                        
        # assumes list is formated like:
        # [[(ms1_min_1,ms1_max_1),(ms1_min_2,ms1_max_2),...],...,[(msn_min_1,msn_max_1),(msn_min_2,msn_max_2),...]]
        for window in isolation_windows:
            if (self.mz > window[0] and self.mz <= window[1]):
                return True
        return False

# Known Chemicals

# MSN

In [83]:
class MSN(object):
    def __init__(self, mz,
                 ms_level,                          # the ms-level: 1, 2, ...
                 parent_mass_prop,                  # proportion of parents mass
                 children=None,                     # other MSN objects which are children
                 parent= None):
        self.mz = mz
        self.ms_level = ms_level
        self.parent_mass_prop = parent_mass_prop
        self.children = children
        self.parent = parent

    def __repr__(self):
        return 'Peak mz=%.4f ms_level=%d' % (self.mz, self.ms_level) # may need to update naming convention

    def get_mz_peaks(self, query_rt, ms_level, isolation_windows):
        if not self._isolation_match(isolation_windows[ms_level-1]):
            return None
        if ms_level == self.ms_level:
            intensity = self._get_intensity(query_rt)
            mz = self.mz
            return (mz, intensity)
        else:
            mz_peaks = []
            for i in range(len(self.children)):
                mz_peaks.append(self.children[i].get_mz_peaks(ms_level,isolation_windows)) #check whether append is right
            return mz_peaks
        
    def _get_intensity(self, query_rt):
        return self.parent._get_intensity(query_rt) * self.parent_mass_prop
    
    def _isolation_match(self, isolation_windows):
        # assumes list is formated like:
        # [(min_1,max_1),(min_2,max_2),...],
        for window in isolation_windows:
            if (self.mz > window[0] and self.mz <= window[1]):
                return True
        return False

# Test

In [84]:
chrom = FunctionalChromatogram("normal", [0,1])
frag1 = MSN(50,2,0.5)
frag2 = MSN(50,2,0.7)
chem = UnknownChemical(100, 100, 10000, chrom, [frag1, frag2])
frag1.parent = chem
frag2.parent = chem

In [85]:
chem.get_mz_peaks(101,2,[[(0,300)],[(0,100)]])

[(50, 582.1254632557357), (50, 814.9756485580299)]

In [40]:
print(frag.children)

None
