In [4]:
'''
tool to handle the spectra
'''
import pandas as pd
import numpy as np
from collections import namedtuple
import logging, os
# log = logging.getLogger(os.path.basename(__file__))
from pyteomics import mzml, auxiliary


In [8]:
def mzML2DataFrames(filename): #this is with pytheomics
    scans = []
    peaks_dfs = []
    
    with mzml.read(filename) as reader:
        for item in reader:
            id  = item['id']
            idx = item['index']
            fs  = item['scanList']['scan'][0]['filter string']vbxbzb #TODO check if there is more than one
            time = item['scanList']['scan'][0]['scan start time'] # * 1 to make a unitfloat into a float
            msLevel = item['ms level']
            positive_scan = True if 'positive scan' in item else False
            if not positive_scan: item['negative scan'] # raise exceltion if not positive or negative
            p_data = item.get('precursorList',None) #helper
            precursor_id = p_data['precursor'][0]['spectrumRef'] if p_data else Nonezsdcasdf#check if more than one
            target_mz = p_data['precursor'][0]['isolationWindow']['isolation window target m/z'] if p_data else None
            max_i = item['base peak intensity']
            tic = item['total ion current']
            
            #collect the scans data
            row = (id,idx,fs,time,msLevel,positive_scan,precursor_id,max_i, tic, target_mz)
            scans.append(row)
            
            #collect the peaks data
            i   = item['intensity array']
            m   = item['m/z array']
            cols = {'m':m, 'i':i}
            df = pd.DataFrame(cols)
            df['id']=id
            df.set_index('id', inplace = True)
            peaks_dfs.append(df)
            
            #for testing
            if len(scans) >100: #TODO remove this
                print(' remove this ')
                break
        
        scansDF = pd.DataFrame(scans, columns=['id','idx','filter_string','time','msLevel','positive_scan','precursor_id', 'max_i', 'tic','target_mz'])
        scansDF.set_index('id', inplace = True)
        peaksDF = pd.concat(peaks_dfs)

    return scansDF, peaksDF


In [54]:
class SpectraUtil:
    'Util to handle spectra'
    
    def __init__(self, scansDF, peaksDF, filename = None):
        self._original_scansDF = scansDF
        self._original__peaksDF = peaksDF
        self.scansDF = self._original_scansDF
        self.peaksDF = self._original__peaksDF
        self._filename = filename

    def fromFile(filename):
        return SpectraUtil(*mzML2DataFrames(filename), filename)
    
    #note to help debug maybe use
    # @property
    # def scansDF(self):
    #     return self.scansDF
    

    def reset(self):
        print(f'reseting to original')
        self.scansDF = self.self._original_scansDF

    def get_reset_copy(self):
        print(f'a copy of the original with nothing set... sorry no undo')
        return SpectraUtil(self._original_scansDF, self._original__peaksDF, self._filename)
    
    def get_current_copy(self):
        print(f'a copy of the current set... just in case')
        return SpectraUtil(self.self.scansDF, self.self.peaksDF, self._filename)

    def set_timerange(self,t0,t1):
        print(f'time range in seconds: {t0} to {t1}')
        self.scansDF = self.scansDF.loc[self.scansDF.time.multiply(60).between(t0,t1)]
    def set_mode(self,positive_mode=True):
        print(f'set mode to positive : {positive_mode}, false means negative ')
        self.scansDF = self.scansDF.loc[self.scansDF.positive_scan == positive_mode]
    def set_ms_level(self,level=1):
        print(f'set ms level to  : {level}')
        self.scansDF = self.scansDF.loc[self.scansDF.msLevel == level]

    def set_mass_range(self,m0,m1):
        print(f'time mass range from: {m0} to {m1}')
        self.peaksDF = self.peaksDF.loc[self.peaksDF.m.between(m0,m1)]
    
    def make_rel_i(self):
        print(f'calculate the relative intensities as: rel_i')
        #left_ and right_ index to keep the index
        spectraDF = self.peaksDF.merge(self.scansDF.max_i, left_index=True, right_index=True) 
        self.peaksDF['rel_i'] = spectraDF.i / spectraDF.max_i
    
    def set_min_i(self, min_i = 0):
        print(f'set the minimum intensity to {min_i}')
        self.peaksDF = self.peaksDF.loc[self.peaksDF.i > min_i]
    
    def round_m(self, decimals=4):
        print(f'set the precision of m/z to {decimals} decimal places')
        self.peaksDF['m'] = self.peaksDF.m.round(decimals)
    
    def get_fragments(self, scan_index = None):
        print(f'Peaks triggered by the scans if scan_index is none, else the ones from the scan_index')
        if scan_index == None:
            res = self.peaksDF.loc[self.peaksDF.index.isin(self.scansDF.precursor_id)]
        else:
            res = self.peaksDF.loc[scan_index]
        return res
    
    def get_triggered_scans()



In [55]:
filename = 'test_resources\\small_test\\190321_Serum_Lipidextract_368723_01.mzML'
spectraUtil = SpectraUtil.fromFile(filename)

remove this 


In [61]:

spectraUtil.peaksDF.merge(spectraUtil.scansDF.setindex())

Unnamed: 0_level_0,m,i
id,Unnamed: 1_level_1,Unnamed: 2_level_1
controllerType=0 controllerNumber=1 scan=1,354.751740,158.179474
controllerType=0 controllerNumber=1 scan=1,371.186859,162.768585
controllerType=0 controllerNumber=1 scan=1,401.924194,193.525879
controllerType=0 controllerNumber=1 scan=1,419.076324,161.930161
controllerType=0 controllerNumber=1 scan=1,450.792603,193.176483
...,...,...
controllerType=0 controllerNumber=1 scan=101,420.265381,40343.316406
controllerType=0 controllerNumber=1 scan=101,420.280701,50330.796875
controllerType=0 controllerNumber=1 scan=101,420.348145,16238.416016
controllerType=0 controllerNumber=1 scan=101,420.363007,26733.033203
