# Calculate Mass Uncertainty Function definition

In [None]:
import pandas as pd
import numpy as np
from os import listdir
from os.path import isfile, join
import seaborn as sns
import matplotlib.pyplot as plt
import random
import scipy.signal as ss
import math

In [None]:
# Load the example input that is passed to calculateMassUncertainty further down
df = pd.read_csv("Example_Inputs/UncertaintyDf.csv")

The calculateMassUncertainty function takes a dataframe containing a processed spectrum with the following columns: formula, mz, observed.  
It returns the uncertainty for each element, calculated as the average of all the uncertainties for which the element was present, as a DataFrame (or as a dictionary if dfOutput is set to False).  
By default it performs a weighted average based on the number of atoms of each element in the compounds/ions.  
If weighted is set to False, it averages the uncertainty based solely on the presence of the element.

In [None]:
from pyteomics import mass

def calculateMassUncertainty(processedSpectrum, weighted=True, dfOutput=True, show=False):
    """Average the mass error of identified peaks per chemical element.

    Parameters
    ----------
    processedSpectrum : pandas.DataFrame
        Must provide the columns ``formula``, ``mz`` and ``observed``.
        The error of a row is ``mz - observed``.
    weighted : bool, default True
        When True, each error is multiplied by the element's share of the
        atoms in the formula (count / total count) before averaging.
        When False, every formula containing the element contributes its
        full error.
    dfOutput : bool, default True
        Return a DataFrame with the columns ``Element`` and ``Uncertainty``
        instead of a ``{element: uncertainty}`` dictionary.
    show : bool, default False
        Draw the result as a bar chart on matplotlib figure 1.

    Returns
    -------
    pandas.DataFrame or dict
        One averaged value per element found in the formulas.
    """
    errors = processedSpectrum["mz"] - processedSpectrum["observed"]

    contributions = {}
    for ion, v in zip(processedSpectrum["formula"], errors):
        # Rows without a usable formula (empty string, missing value) cannot
        # be attributed to any element, so skip them instead of crashing.
        if not isinstance(ion, str) or not ion:
            continue
        # Drop a single trailing '-' (presumably the negative-charge marker)
        # before the formula is parsed.
        if ion.endswith('-'):
            ion = ion[:-1]
        composition = mass.Composition(formula=ion)
        total = sum(composition.values())
        for e, count in composition.items():
            f = count / total if weighted else 1
            contributions.setdefault(e, []).append(v * f)

    # NOTE(review): the weighted variant is the plain mean of the scaled
    # errors; it is not normalised by the sum of the weights. Confirm that
    # this is the intended definition.
    elements = {e: sum(vals) / len(vals) for e, vals in contributions.items()}

    if show:
        plt.figure(1)
        # Pass real lists so this does not rely on dict-view support.
        plt.bar(list(elements.keys()), list(elements.values()))
    if dfOutput:
        return pd.DataFrame(list(elements.items()), columns=['Element', 'Uncertainty'])
    return elements


In [None]:
# IPython magic: select matplotlib's interactive notebook backend
%matplotlib notebook
# Alternative call using the defaults (weighted=True, dfOutput=True, show=False):
#calculateMassUncertainty(df)
# Unweighted averages (weighted=False), returned as a DataFrame, with the bar chart drawn
calculateMassUncertainty(df, False, True,True)

# Experiments with the function above
Here I try to graph the uncertainty for each element from all the spectra in the first small deviation file.  
If you want to try, just change the folder variable below.

**Important** Since this function requires processed spectra, I am using Koli's code for that part. It is copy-pasted at the end of this notebook, so run those cells before running mine.

In [None]:
# Locations of the raw spectra and of the reference peak list
spectrum_data_directory = "C:/Users/Antonio/Google Drive/Orbitrap project/Data/First Large Deviation file/2 mins"
folder = spectrum_data_directory
peak_list_file = "C:/Users/Antonio/Desktop/Esercizi prog/Data science project/peak list/peaklist_1e5_background.csv"

# Full paths of the regular files directly inside the folder that end in ".csv"
spectrum_data_files = [
    join(folder, name)
    for name in listdir(folder)
    if isfile(join(folder, name)) and name.endswith(".csv")
]

# One array per spectrum file, in the same order as spectrum_data_files
spectrum_data = [readFile(path) for path in spectrum_data_files]

peak_list = pd.read_csv(peak_list_file)

In [None]:
uncertaintydf = pd.DataFrame({'Element' : [], 'Uncertainty' : []})
identified_spectra = []
dfs = []
# Spectra are handled one after another: match the fitted peaks against the
# peak list (tolerance 0.002), then derive the per-element uncertainties.
for spectrum in spectrum_data:
    identified = identifyPeaks(spectrum, peak_list, 0.002)
    identified_spectra.append(identified)
    dfs.append(calculateMassUncertainty(identified))

In [None]:
# Stack the per-spectrum results into one long table. pd.concat is used
# because DataFrame.append no longer exists in pandas 2.0, and every entry
# of dfs is included, starting with the first spectrum.
if dfs:
    uncertaintydf = pd.concat(dfs, ignore_index=True)
else:
    # Nothing was processed: keep an empty table with the expected columns.
    uncertaintydf = pd.DataFrame({'Element' : [], 'Uncertainty' : []})

In [None]:
# Scatter plot of every per-spectrum uncertainty value, grouped by element.
names = uncertaintydf["Element"]
values = uncertaintydf["Uncertainty"]
# Draw on a separate figure (number 2)
plt.figure(2)
# Apply seaborn's default theme before the axes are created
sns.set()
plt.xlabel("Elements")
plt.ylabel("Uncertainty")
plt.title("Uncertainty by element")
plt.scatter(names, values)

## Koli's code for identifying peaks
Koli's code for identifying peaks, copied here. Run these cells before the cells above.  
I tried using 
```python
from ipynb.fs.full.<notebook_name> import <function_name>
```
but I wasn't able to make it work, so I just copy-pasted the cells.  
[An alternative](https://stackoverflow.com/questions/54317381/selectively-import-from-another-jupyter-notebook) that I didn't look into much is to import specific cells, but as I said, I haven't really tried it.

In [None]:
import numpy as np
import pandas as pd

def readFile(fileName):
    """Load a spectrum CSV as a two-column float64 array.

    The file is parsed with ``pandas.read_csv``; the first parsed row is
    discarded and only the first two columns are kept.

    Parameters
    ----------
    fileName : str
        Path of the CSV file.

    Returns
    -------
    numpy.ndarray
        Array of dtype float64 with two columns.
    """
    table = pd.read_csv(fileName)
    # Skip the first parsed row and keep the two leading columns
    # NOTE(review): why the first row is skipped is not documented here.
    leading = table.iloc[1:, :2]
    values = leading.to_numpy()
    return values.astype("float64")

In [None]:
import numpy as np

def findPeakIndices(data):
    """Split a spectrum into segments at pairs of consecutive zero samples.

    Parameters
    ----------
    data : numpy.ndarray
        2-D array; column 1 is scanned for zeros.
        (Presumably column 0 is m/z and column 1 intensity - confirm.)

    Returns
    -------
    numpy.ndarray
        ``uint32`` array with one ``[first_row, last_row]`` pair per segment.
        The last returned pair always ends at the final row of ``data``.
    """
    signal = data[:, 1]

    # Half the number of zero samples (rounded) bounds the number of cuts
    n = int(round(int(np.count_nonzero(signal == 0)) / 2))
    indices = np.zeros((n + 1, 2), dtype="uint32")

    row = 0
    previous_was_zero = False
    # The final sample is deliberately left out of the scan
    for i in range(len(signal) - 1):
        current_is_zero = signal[i] == 0
        if current_is_zero and previous_was_zero:
            # Second zero in a row: close the running segment just before it
            indices[row, 1] = i - 1
            # ... and start the next one here, if a row is still available
            if row != n:
                indices[row + 1, 0] = i
            row += 1
        previous_was_zero = current_is_zero

    # Rows whose start is not below their end (unused rows and the segment
    # that was opened but never closed) are discarded from the tail.
    not_closed = int(np.count_nonzero(indices[:, 0] >= indices[:, 1]))
    last = n - not_closed
    indices = indices[: last + 1, :]
    # The last remaining segment is stretched to the end of the data
    indices[last, 1] = len(data[:, 0]) - 1
    return indices

In [None]:
from scipy.optimize import leastsq
import numpy as np

def getMean(peak):
    """Fit a single fixed-width Gaussian to one peak.

    The width is not fitted: it is derived from the intensity-weighted mean
    position divided by ``280000 * 2*sqrt(2*ln 2)`` (the second factor is
    the FWHM-to-sigma conversion of a Gaussian).

    Parameters
    ----------
    peak : numpy.ndarray
        2-D array; column 0 holds the positions, column 1 the weights /
        observed heights.

    Returns
    -------
    numpy.ndarray
        ``[amplitude, centre]`` found by ``scipy.optimize.leastsq``.
    """
    position = peak[:, 0]
    height = peak[:, 1]

    # Starting centre: weighted mean of the positions
    mu = np.average(position, weights=height)
    sigma = mu / (280000 * 2 * np.sqrt(2 * np.log(2)))

    def gaussian(p, x):
        return p[0] * np.exp(-0.5 * ((x - p[1]) / sigma) ** 2)

    def residual(p, x, y):
        return y - gaussian(p, x)

    start = [1.0, mu]
    solution = leastsq(residual, start, args=(position, height))
    # leastsq returns (parameters, status flag); only the parameters are used
    return solution[0]

In [None]:
from scipy.optimize import minimize
import numpy as np
from scipy.signal import find_peaks

# Calculate mean square error for the model
def MSE(x, y, a, mu, sigma):
    """Mean squared error of a sum-of-Gaussians model.

    Parameters
    ----------
    x, y : sequence
        Sample positions and observed values; ``y`` defines the number of
        samples that are evaluated.
    a, mu, sigma : array-like
        Amplitude, centre and width of each Gaussian component.

    Returns
    -------
    float
        Sum of squared residuals divided by ``len(y)``.
    """
    def model(position):
        # Contribution of every component at one position, added together
        return np.sum(a * np.exp(-0.5 * ((position - mu) / sigma) ** 2))

    squared_sum = 0
    for k, observed in enumerate(y):
        squared_sum = squared_sum + (observed - model(x[k])) ** 2
    return squared_sum / len(y)


def errorFunction(x, y, sigma):
    """Build the objective handed to the optimiser.

    The returned callable takes a flat parameter vector laid out as
    ``[a0, mu0, a1, mu1, ...]`` and returns ``MSE(x, y, a, mu, sigma)``.
    """
    def objective(params):
        amplitudes = params[::2]   # even slots
        centres = params[1::2]     # odd slots
        return MSE(x, y, amplitudes, centres, sigma)

    return objective

# Function used to group values that are too close (abs(mu[n]-mu[n+1])<th) to each other together
def group(mu, th):
    """Partition the indices of ``mu`` into groups of nearby values.

    The first index that is still unassigned seeds a group; every other
    unassigned index whose value lies closer than ``th`` to the seed's value
    joins it. This repeats until every index is assigned.

    Parameters
    ----------
    mu : array-like
        1-D sequence of values (a list is accepted as well as an array).
    th : float
        Distance below which a value is merged into the seed's group.

    Returns
    -------
    list of numpy.ndarray
        Integer index arrays, one per group, in order of their seeds.
    """
    mu = np.asarray(mu)
    groups = []
    unassigned = np.arange(len(mu))
    while len(unassigned) > 0:
        dist = np.abs(mu[unassigned] - mu[unassigned[0]])
        close = dist < th
        # The seed always belongs to its own group. Without this, th <= 0
        # (or a NaN seed) would never shrink `unassigned` and the loop
        # would not terminate.
        close[0] = True
        groups.append(unassigned[close])
        unassigned = unassigned[~close]
    return groups

# Get one or more means depending on if there are multiple peaks in the data
# by fitting one or more Gaussian distributions to the data using resolution of 280000 
def getMeans(peaks):
    """Fit one Gaussian per local maximum of a segment and merge near-duplicates.

    Local maxima are located with ``scipy.signal.find_peaks`` (minimum
    height max/20, minimum prominence max/10). With more than one maximum,
    all amplitudes and centres are fitted together with BFGS; otherwise the
    single-Gaussian fit ``getMean`` is used. Fits whose centres lie within
    1e-5 of each other are then combined.

    Parameters
    ----------
    peaks : numpy.ndarray
        2-D array; column 0 holds the positions, column 1 the heights.

    Returns
    -------
    tuple of numpy.ndarray
        ``(mu2, a2, sigma2)``: mean centre, summed amplitude and mean width
        of each group of fits.
    """
    height = peaks[:, 1]
    tallest = max(height)
    ind = find_peaks(height, tallest / 20, prominence=tallest / 10)[0]
    mu = peaks[ind, 0]
    n = len(mu)
    # Width at resolution 280000; 2*sqrt(2*ln 2) converts FWHM to sigma
    width_factor = 280000 * 2 * np.sqrt(2 * np.log(2))
    sigma = mu / width_factor

    if n > 1:
        # Start every component at the mean height and its detected centre
        init = []
        for centre in mu:
            init += [np.mean(height), centre]
        f = errorFunction(peaks[:, 0], height, sigma)
        params = minimize(f, init, method='BFGS').x
    else:
        params = getMean(peaks)
        if n == 0:
            # find_peaks found no local maximum, so `sigma` is empty while
            # the fallback fit still returns one Gaussian. Derive the width
            # from the fitted centre so the grouping below has a matching
            # entry instead of raising IndexError.
            sigma = params[1::2] / width_factor

    a = params[::2]
    mu = params[1::2]

    # Combine fitted distributions whose centres are (almost) identical
    groups = group(mu, 1e-5)
    a2 = np.zeros(len(groups))
    mu2 = np.zeros(len(groups))
    sigma2 = np.zeros(len(groups))
    for i, members in enumerate(groups):
        mu2[i] = np.mean(mu[members])
        a2[i] = np.sum(a[members])
        sigma2[i] = np.mean(sigma[members])
    return mu2, a2, sigma2


In [None]:
import numpy as np

# Get all peak means for the data
def getAllMeans(peaks):
    """Fit every segment of a spectrum and collect the results in flat lists.

    The spectrum is split with ``findPeakIndices`` and each segment is
    fitted with ``getMeans``.

    Parameters
    ----------
    peaks : numpy.ndarray
        Spectrum as a 2-D array, passed on unchanged to the helpers.

    Returns
    -------
    tuple of list
        ``(means, aa, sigmas, peaki)``: fitted centres, amplitudes, widths,
        and for each fit the number of the segment it came from.
    """
    bounds = findPeakIndices(peaks)
    means, aa, sigmas, peaki = [], [], [], []
    for segment_no in range(len(bounds[:, 0])):
        first, last = bounds[segment_no, 0], bounds[segment_no, 1]
        # Segment bounds are inclusive, hence the +1 on the slice end
        mu, a, sigma = getMeans(peaks[first:last + 1, :])
        for k in range(len(a)):
            means.append(mu[k])
            aa.append(a[k])
            sigmas.append(sigma[k])
            peaki.append(segment_no)
    return means, aa, sigmas, peaki

In [None]:
import pandas as pd
import numpy as np

# Identify peaks and return a data frame of peaks and their properties
def identifyPeaks(data, peaklist, th):
    """Match the fitted peaks of a spectrum against a reference peak list.

    For every row of ``peaklist`` the fitted peak closest to its ``mz`` is
    looked up; the row counts as identified when that distance is below
    ``th``.

    Parameters
    ----------
    data : numpy.ndarray
        Spectrum, passed on to ``getAllMeans``.
    peaklist : pandas.DataFrame
        Reference peaks; must contain an ``mz`` column. It is not modified:
        the function works on a copy.
    th : float
        Maximum allowed distance between a reference ``mz`` and a fitted
        centre.

    Returns
    -------
    pandas.DataFrame
        The identified rows of the peak list with four columns set:
        ``observed`` (fitted centre), ``a`` (amplitude), ``sigma`` (width)
        and ``peak`` (number of the segment the fit came from).
    """
    result = peaklist.copy()
    trueValue = result['mz'].to_numpy()
    n_rows = len(trueValue)

    observed = np.zeros(n_rows)
    amplitude = np.zeros(n_rows)
    width = np.zeros(n_rows)
    segment = np.zeros(n_rows)
    matched = np.zeros(n_rows, dtype=bool)

    peakMeans, a, sigmas, ind = getAllMeans(data)
    peakMeans = np.asarray(peakMeans, dtype=float)

    # Without any fitted peak nothing can match; skip the search instead of
    # taking the minimum of an empty sequence.
    if peakMeans.size:
        for i, target in enumerate(trueValue):
            dist = np.abs(peakMeans - target)
            j = np.argmin(dist)
            if dist[j] < th:
                observed[i] = peakMeans[j]
                amplitude[i] = a[j]
                width[i] = sigmas[j]
                segment[i] = ind[j]
                matched[i] = True

    # Columns are addressed by name, so the layout of the incoming peak list
    # (number and order of its columns, index labels) does not matter.
    result['observed'] = observed
    result['a'] = amplitude
    result['sigma'] = width
    result['peak'] = segment
    return result[matched]

In [None]:
def selectBestPeaks(data, n=10, test_n=0, randomTestSamples=True, threshold=0, forceThreshold=False, debug=False):
    """Split peaks into the best ``n`` and the remaining ones.

    Rows are ranked by ``uncertainty = |formula_mz - observed_mz|`` in
    ascending order. The input frame is not modified.

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain the columns ``formula_mz`` and ``observed_mz``.
    n : int, default 10
        Maximum number of best rows to return. ``n <= 0`` returns ``None``.
    test_n : int, default 0
        If non-zero, the remainder is cut down to at most ``abs(test_n)``
        rows; ``0`` returns the whole remainder.
    randomTestSamples : bool, default True
        Draw the ``test_n`` rows at random; otherwise take the best-ranked
        rows of the remainder.
    threshold : float, default 0
        Upper limit on the uncertainty (absolute value is used); ``0``
        disables it. It only changes the selection together with
        ``forceThreshold``: rows above the limit are still used to fill up
        to ``n`` otherwise.
    forceThreshold : bool, default False
        Never select rows above ``threshold``, even if fewer than ``n``
        rows remain.
    debug : bool, default False
        Print the ranked table.

    Returns
    -------
    tuple of pandas.DataFrame or None
        ``(best, rest)``, both including the ``uncertainty`` column, or
        ``None`` when ``n <= 0``.
    """
    if n <= 0:
        return None

    # assign() builds a new frame, so the caller's DataFrame is left untouched
    ranked = data.assign(
        uncertainty=abs(data["formula_mz"] - data["observed_mz"])
    ).sort_values(by=['uncertainty'], ascending=True)
    test_n = abs(test_n)

    if debug:
        print(ranked)

    # Rows within the threshold form a prefix of the ranked table, so every
    # possible selection is simply "the first `keep` rows".
    if threshold and forceThreshold:
        within = int((ranked['uncertainty'] <= abs(threshold)).sum())
        keep = min(within, n)
    else:
        keep = min(n, len(ranked))

    result = ranked.iloc[:keep]
    # Positional split: rows that are exact duplicates of each other are
    # kept, and the index does not have to be unique.
    diff = ranked.iloc[keep:]

    if test_n != 0:
        test_n = min(test_n, len(diff))
        if randomTestSamples:
            diff = diff.sample(test_n)
        else:
            diff = diff.iloc[:test_n]
    return result, diff

In [None]:
# Reload the example input for the selectBestPeaks demonstration below
df = pd.read_csv("Example_Inputs/UncertaintyDf.csv")

In [None]:
# n=5 best peaks, up to 10 randomly sampled remaining rows,
# threshold 1.397722e-06 without forceThreshold
a, b = selectBestPeaks(df, 5, 10, True, 1.397722e-06, False)

In [None]:
# First value returned by selectBestPeaks: the selected best peaks
a

In [None]:
# Second value returned by selectBestPeaks: the (sampled) remaining peaks
b