name: 241114_data_analysis \
date: 11/14/2024 \
version: 2.0 \
github root: #11 \
author: Justin Sankey, Johanna Ganglbauer \

**description**: Takes raw liquid chromatography mass spectrometry (LCMS) data (table exported from the SCIEX Analyst software), computes recovery rates, method detection limits, and ratios of the default channel to the TOF MS channel. Generates plots, writes results to Excel, and creates a long-format table (.csv) for data publications.

**changes in comparison to previous version**:
*   inputs through files: when the code is run for the first time, it creates an empty .csv file containing all your sample names as rows. There you indicate the sample volume/weight, the unit, and whether the sample is considered for the MDL calculation.
The same holds for the thresholds of the recovery rates per extracted standard.
*   order/mass tables: to facilitate the selection of samples for the MDL evaluation, the table of detected PFAS masses and a plot of the PFAS concentrations per sample are created at the beginning.
*   final concentration per ml or g is evaluated for each sample separately based on the new inputs.
*   recovery rates: each extracted standard (IDA) has individual limits for the allowed recovery range. If both core and extended method are available, you can now choose to use (i) only the recovery rates of the core method, (ii) only the recovery rates of the extended method, or (iii) the average of the recovery rates of core and extended method.
*   channel ratios: are also evaluated for standards. They are set to NaN if one of the channels (TOF MS or MS/MS) is not included in either method.

**what needs to be implemented**:
*   JB: read in different IDL files depending on sample matrix
*   ...
*   ...
*   **feel free to add your thoughts!**

**contact/help/complaints:** johanna.ganglbauer@uri.edu

# Specifying Inputs - Loading files and packages
The block below will load all python packages needed for the following analysis.

**Uncomment the first code block if you are using google colab and want to connect to the cloud**

In [None]:
# connect google colab with your google drive
# from google.colab import drive
# drive.mount('/content/drive')

# import all needed packages
import os
import numpy as np
import seaborn as sns
from openpyxl import load_workbook
from openpyxl.drawing.image import Image
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import pathlib as pali

# import pandas and suppress warnings
import pandas as pd
pd.options.mode.chained_assignment = None

**Enter in your input filepaths:**

(1) raw_filepaths is a list of file paths pointing to the tables exported from the Sciex Analyst software that you want to analyze. The name of each file with results from the core method must end with '_core', the name of each file with results from the extended method must end with '_extended'. \
(2) idl_filepath points to a .csv file containing instrumentation detection limits (IDLs).

You can copy the filepath of each file on windows by right clicking on the file in your file explorer and choosing the option *Copy as Path*. \
The same works on Mac OS: Control-click or right-click on the file in Finder. Press the Option (Alt) key. Choose *Copy [filename] as Pathname*.

**Enter in your output filepaths:** \
When calling the script for the first time, two csv tables will be created which assist you in providing all the input parameters: \
(1) sample_input_parameter_filepath: contains a list of all samples in your input data and empty fields for you to provide the original sample volume or weight and allows you to decide if the sample should be considered for the evaluation of method detection limits. In the variable below you indicate where the sample_input_file should be saved to. Once the input file was created it will not be changed by the script. \
(2) threshold_input_parameter_filepath: contains a list of all extracted standards (IDA) and allows you to provide the valid range of recovery rates. In the variable below you indicate where the threshold input file should be saved to. Once the input file has been created, it will not be changed by the script.

Finally, the script will create a .csv file containing the most relevant data, as well as an excel file containing the most relevant results of the evaluation.\
(1) The variable processed_filepath indicates location and file prefix where your results should be saved to. \
(2) The variable plot_directory indicates the location where your plots should be saved to.

**Make sure all your input and output filepaths are enclosed in apostrophes and start with an r (r'...').** \
The apostrophes tell Python that the value is a string (text) and not a number.\
The r in front of the string (raw string) makes Python keep the backslashes in the file path literally instead of interpreting them as escape sequences.
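
The effect of the r prefix can be checked with a small sketch. The path below is hypothetical and chosen only because it contains the sequence `\t`, which Python reads as a tab character in an ordinary string.

```python
from pathlib import PureWindowsPath

# Without the r prefix, Python treats "\t" as a tab character.
plain_path = 'example_data\test_core.txt'
# With the r prefix, every backslash is kept literally.
raw_path = r'example_data\test_core.txt'

print('\t' in plain_path)  # True: the path was silently altered
print('\t' in raw_path)    # False: the path is intact

# PureWindowsPath splits the raw string at the backslashes on any operating system.
print(PureWindowsPath(raw_path).name)  # test_core.txt
```

Forward slashes, as used for some of the paths below, do not need the r prefix, but adding it does no harm.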

In [None]:
### INPUT FILEPATHS
# raw data upload file path
raw_filepaths = [
    r'example_data_raw\20240920_PFAS_Standard_Check_Updated_core.txt',
]

# file paths for IDL and IQL data - not meant to be adopted
idl_filepath = r'example_data_raw/IDL_2024.csv'

### OUTPUT FILEPATHS
# input parameter filepath
sample_input_parameter_filepath = r'sample_input_parameter.csv'
threshold_input_parameter_filepath = r'threshold_input_parameter.csv'

# processed data output excel/csv file path - without .xlsx/.csv ending
processed_filepath = r'example_data_processed/test_data_justin_check'

# directory to save plots to
plot_directory = r'example_figures'

**Enter in your parameters:**

(1) Indicate the maximal allowed percentage deviation between MS TOF channel and MS/MS channel. \
(2) Provide a default value for the instrumentation detection limit (IDL) in case the component is not listed in the csv. \
(3) Indicate which data you want to use to determine recovery rates: \
    (i) 'core': only data from the core method is used to determine recovery rates. This means peak integration of non-extracted standards (IPS) must be done for the core method only. \
    (ii) 'extended': only data from the extended method is used to determine recovery rates. This means peak integration of non-extracted standards (IPS) must be done for the extended method only. \
    (iii) 'average': the average peak areas from core method and extended methods are used to determine recovery rates. This means peak integration of IPS peaks must be done for both core method and extended method.

In [None]:
# threshold for acceptance of absolute percentage difference between default channel and TOF MS channel
allowed_channel_deviation = 30

# idl default value
idl_unknown = 1e-3

# data to determine recovery rates:
# available options: 'core', 'extended' and 'average'
recovery_data = 'core'

# sanity check - will produce an error if you indicated a variable recovery_data which is not allowed.
if recovery_data not in ['core', 'extended', 'average']:
    raise Exception("""
        Invalid input for variable recovery_data, use either 'core', 'extended', or 'average'.
                    """)

# will be used to identify standards by the column 'Component Name'
standard_identifiers = 'IDA|IPS|13C|d-|d3-|d5-|18O'
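
As a minimal sketch of how this identifier string is used further down: pandas `str.contains` treats the pattern as a regular expression, so the `|` characters act as "or". The component names below are hypothetical and serve only as an illustration.

```python
import pandas as pd

standard_identifiers = 'IDA|IPS|13C|d-|d3-|d5-|18O'

# hypothetical component names, for illustration only
component_names = pd.Series([
    'PFOA', 'IDA-13C4_PFOA', 'IPS-13C2_PFOA', 'PFOS_TOF MS', 'd3-NMeFOSAA',
])

# a name matching any of the identifiers is flagged as a standard
is_standard = component_names.str.contains(standard_identifiers)
standards = component_names[is_standard].to_list()
analytes = component_names[~is_standard].to_list()
print(standards)
print(analytes)
```

Because the match is a plain substring search, a native compound whose name happens to contain one of the identifiers would also be classified as a standard, so it is worth checking the resulting lists once for your own component table.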

The following code block makes sure that the output folders exist and reads in data. Moreover, it sets the display options to show data tables in your console.

In [None]:
# Ensure file path and folder path exist to write outputs to and create folders, if they do not exist
folder_path = os.path.dirname(processed_filepath)
if folder_path and not os.path.exists(folder_path):
    os.makedirs(folder_path)
processed_filepath_xlsx = processed_filepath + '.xlsx'
processed_filepath_csv = processed_filepath + '.csv'

if not os.path.exists(plot_directory):
    os.makedirs(plot_directory)

# Define columns of input which are needed for further processes:
columns_considered = [
    'Sample Name', 'Sample Index', 'Sample Comment', 'Sample Type',
    'Component Name',  'Component Group Name', 'Component Comment', 'IS Name',
    'Acquisition Date & Time', 'Injection Volume', 'Used',
    'Calculated Concentration', 'Actual Concentration',
    'Reported Recovery', 'IDA Average Response Factor',
    'Area',
]

# Load input data files and put them all in one dataframe
data = pd.DataFrame()  # initialize empty data frame
sample_index = 0  # offset which keeps the Sample Index unique across files
for file in raw_filepaths:
    # read in file
    if file[-4:] == '.csv':
        this_data = pd.read_csv(file, delimiter=',', encoding='utf-8', low_memory=False, header=0,)
    elif file[-4:] == '.txt':
        this_data = pd.read_csv(file, delimiter='\t', encoding='utf-8', low_memory=False, header=0,)
    else:
        raise ImportError('Raw input file paths must either be .csv or .txt files.')

    # shift the sample index so that it remains unique across multiple files
    this_data['Sample Index'] = this_data['Sample Index'] + sample_index
    sample_index += max(this_data['Sample Index'])
    check = this_data['Sample Index'].value_counts()

    # make sure each sample name ends with Ext for extended method and with Core for core method
    if file[-12:-4] == 'extended':
        mask_names = this_data['Sample Name'].str.endswith('Ext')
        # use .loc instead of chained indexing so that the assignment reliably modifies this_data
        this_data.loc[~mask_names, 'Sample Name'] = this_data.loc[~mask_names, 'Sample Name'] + ' Ext'

        # checks if each sample of the extended method has 108 channels
        for index, elem in check[(check != 108)].items():
            print(f'The sample with Index {index} has {elem} channels.')

    elif file[-8:-4] == 'core':
        mask_names = this_data['Sample Name'].str.endswith('Core')
        # use .loc instead of chained indexing so that the assignment reliably modifies this_data
        this_data.loc[~mask_names, 'Sample Name'] = this_data.loc[~mask_names, 'Sample Name'] + ' Core'

        # checks if each sample of the core method has 142 channels
        for index, elem in check[(check != 142)].items():
            print(f'The sample with Index {index} has {elem} channels.')
    else:
        print('If you combine core method and extended method make sure your input file names end with _core and _extended respectively.')

    # append actual dataframe in list (this_data) to huge dataframe (data)
    if data.empty:
        data = this_data[columns_considered]  # initialize data in first step (when data is empty)
    else:
        data = pd.concat([data, this_data[columns_considered]], ignore_index=True)  # append to data

# display settings
pd.set_option('display.max_rows', None)  # Show all rows
pd.set_option('display.max_columns', None)  # Show all columns
pd.set_option('display.max_colwidth', None)  # Show full width of columns
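
The index shift used while loading several files can be illustrated with a small, self-contained sketch. The two data frames are hypothetical stand-ins for a core and an extended export that both number their samples from 1. Unlike the cell above, this sketch sets the offset to the largest index used so far, which gives consecutive indices; it is one possible variant, not the only valid one.

```python
import pandas as pd

# two hypothetical exports, each numbering its samples from 1
core = pd.DataFrame({'Sample Index': [1, 1, 2, 2], 'Sample Name': ['A', 'A', 'B', 'B']})
extended = pd.DataFrame({'Sample Index': [1, 1, 2, 2], 'Sample Name': ['A', 'A', 'B', 'B']})

frames = []
offset = 0
for frame in (core, extended):
    frame = frame.copy()
    frame['Sample Index'] = frame['Sample Index'] + offset
    offset = frame['Sample Index'].max()  # the next file starts above this value
    frames.append(frame)

combined = pd.concat(frames, ignore_index=True)
print(combined['Sample Index'].unique())
```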

The following code block cleans up the data: \
 (i) replaces non-numeric strings with NaN, \
 (ii) makes sure the component IPS-13C2_PFOA is not used as non-extracted standard and replaces it with IPS-13C4_PFOA, \
 (iii) makes sure all _TOF MS channel names are labelled consistently.

In [None]:
# Clean up 'Calculated Concentration' column - set all strange strings to NaN
data['Calculated Concentration'] = data['Calculated Concentration'].replace(
    {'<1 points': np.nan, '< 0': np.nan, 'no root': np.nan, 'NaN': np.nan}
    ).astype('float')

# Replace 'IPS-13C2_PFOA' values: optional, only when component name occurs
if any(data['Component Name'].isin(['IPS-13C4_PFOA'])):
    data['Component Group Name'] = data['Component Group Name'].replace('IPS-13C2_PFOA', 'IPS-13C4_PFOA')

    # Find rows where 'Component Group Name' is 'IPS-13C4_PFOA' (after replacement)
    mask = data['Component Group Name'] == 'IPS-13C4_PFOA'

    # Iterate through each of these rows and replace area in column
    for idx, row in data[mask].iterrows():
        sample_name = row['Sample Name']

        # Find the corresponding row with 'Component Name' == 'IPS-13C4_PFOA' and the same 'Sample Name'
        matching_row = data[(data['Component Name'] == 'IPS-13C4_PFOA') & (data['Sample Name'] == sample_name)]

        if not matching_row.empty:
            # Update the 'Area IPS' with the value from 'Area' in the matching row
            data.at[idx, 'Area IPS'] = matching_row['Area'].values[0]

# Correct channel names in original data (most TOF channels are labelled _TOF MS, only 2 of them are labelled _TOF)
mask_names = data['Component Name'].str.endswith('_TOF')
data.loc[mask_names, 'Component Name'] = data.loc[mask_names, 'Component Name'] + ' MS'

# some have an underscore between TOF and MS, this is replaced by a space
mask_names = data['Component Name'].str.endswith('_TOF_MS')
data.loc[mask_names, 'Component Name'] = data.loc[mask_names, 'Component Name'].str[:-3] + ' MS'
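
The two clean-up steps can also be written more compactly. The sketch below is an alternative formulation on hypothetical rows, not the notebook's own method: it uses `pd.to_numeric` with `errors='coerce'` instead of an explicit replacement dictionary, and a single regular expression for both channel name variants.

```python
import numpy as np
import pandas as pd

# hypothetical rows, for illustration only
demo = pd.DataFrame({
    'Component Name': ['PFOA', 'PFOA_TOF', 'PFOS_TOF_MS', 'PFOS_TOF MS'],
    'Calculated Concentration': ['1.5', '<1 points', 'no root', '0.2'],
})

# every entry that cannot be parsed as a number becomes NaN
demo['Calculated Concentration'] = pd.to_numeric(demo['Calculated Concentration'], errors='coerce')

# '_TOF' or '_TOF_MS' at the end of a name is rewritten to '_TOF MS'
demo['Component Name'] = demo['Component Name'].str.replace(r'_TOF(_MS)?$', '_TOF MS', regex=True)
print(demo)
```

Note that coercion hides any unexpected string, whereas the explicit dictionary in the cell above raises an error for strings it does not know, which can be the safer behaviour.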

The following code block combines samples from the extended method and the core method and gives them the same sample index.

In [None]:
# make sample index equal if one sample exist as both "Core Method" and "Extended Method"
sample_names = data['Sample Name'].unique()

# combine indices of Core Method and Extended Method if both are available
detect = 0
index_mapper = {}
one_method_only_index = []
# loop over unique sample names
for sample_name in sample_names:
    # if sample name exists for the related core method:
    # search related core samples
    # reset index of the core samples
    # make sure the (index: sample name tuple is saved to index mapper)
    # account for exceptions: more or less samples of core method than of extended one.
    if sample_name[-3:] == 'Ext':
        if sample_name[:-3] + 'Core' in sample_names:
            sample_indices_core = data.loc[data['Sample Name'].isin([sample_name[:-3] + 'Core']), 'Sample Index'].value_counts().index
            sample_indices_extended = data.loc[data['Sample Name'].isin([sample_name]), 'Sample Index'].value_counts().index
            for index_core, index_extended in zip(sample_indices_core, sample_indices_extended):
                data.loc[data['Sample Index'].isin([index_core, index_extended]), 'Sample Index'] = index_core
                index_mapper[index_core] = sample_name[:-3]
                detect+=1

            if len(sample_indices_core) < len(sample_indices_extended):
                print(f'At least one of the samples {sample_name} has no related Core Method. Find out if this is a problem.')
                for index_extended in sample_indices_extended[len(sample_indices_core):]:
                    index_mapper[index_extended] = sample_name
                    one_method_only_index.append(index_extended)
                    detect+=1

            elif len(sample_indices_core) > len(sample_indices_extended):
                print(f'At least one of the samples {sample_name[:-3]}Core has no related Extended Method. Find out if this is a problem.')
                for index_core in sample_indices_core[len(sample_indices_extended):]:
                    # label the unpaired core sample with its own (Core) name
                    index_mapper[index_core] = sample_name[:-3] + 'Core'
                    one_method_only_index.append(index_core)
                    detect+=1
        else:
            print(f'The sample {sample_name} has no related Core Method. Find out if this is a problem.')
            sample_indices_extended = data.loc[data['Sample Name'].isin([sample_name]), 'Sample Index'].value_counts().index
            for index_extended in sample_indices_extended:
                index_mapper[index_extended] = sample_name
                one_method_only_index.append(index_extended)
                detect+=1

    # catch all samples from the methods, which do not have an extended one.
    elif sample_name[-4:] == 'Core':
        if not sample_name[:-4] + 'Ext' in sample_names:
            print(f'The sample {sample_name} has no related Extended Method. Find out if this is a problem.')
            sample_indices_core = data.loc[data['Sample Name'].isin([sample_name]), 'Sample Index'].value_counts().index
            for index_core in sample_indices_core:
                index_mapper[index_core] = sample_name
                one_method_only_index.append(index_core)
                detect+=1
    else:
        print(f'Make sure your input data name ends either with _core or with _extended.')
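
The idea behind the pairing can be shown with a simplified sketch on hypothetical sample names. It assumes that every sample name occurs only once per method; the loop above additionally handles repeated injections of the same sample, which this sketch does not.

```python
import pandas as pd

# hypothetical sample table: one row per sample index
samples = pd.DataFrame({
    'Sample Index': [1, 2, 3],
    'Sample Name': ['River A Core', 'River B Core', 'River A Ext'],
})

# remove the method suffix to obtain a name shared by both methods
samples['Base Name'] = samples['Sample Name'].str.replace(r' (Core|Ext)$', '', regex=True)

# every sample takes the smallest index found for its base name
samples['Merged Index'] = samples.groupby('Base Name')['Sample Index'].transform('min')

# base names which were measured with one method only
method_counts = samples.groupby('Base Name')['Sample Name'].nunique()
one_method_only = method_counts[method_counts < 2].index.to_list()
print(samples)
print(one_method_only)
```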

In the following code block, the order of components is preserved and the components are split into the default channel (MS/MS), which screens fragmented masses, and the TOF MS channel, which screens precursor masses.
If either channel is not available, it is set to NaN.

In [None]:
# extract channel names from first sample and separate them into standards and non-standards
first_sample_id = data['Sample Index'].value_counts().index[0]  # index of first sample
components_filtered = data.loc[data['Sample Index'] == first_sample_id, 'Component Name']  # channel names of first sample
components_sorted = components_filtered[~components_filtered.str.contains(standard_identifiers)].to_list()  # channel names excluding IPS and IDA
ida_ips_sorted = components_filtered[components_filtered.str.contains(standard_identifiers)].to_list()  # channel names of IPS and IDA only

# initialize and fill lists of sorted components
components_fragmented = []
components_precursor = []
skip_components = []
for component in components_sorted:
    if component in skip_components:
        continue
    if '_TOF MS' in component:
        components_precursor.append(component)
        skip_components.append(component)
        if component[:-7] in components_sorted:
            components_fragmented.append(component[:-7])
        else:
            components_fragmented.append(np.nan)
        skip_components.append(component[:-7])
    else:
        components_fragmented.append(component)
        skip_components.append(component)
        if component + '_TOF MS' in components_sorted:
            components_precursor.append(component + '_TOF MS')
        else:
            components_precursor.append(np.nan)
        skip_components.append(component + '_TOF MS')

# initialize and fill lists of sorted internal standards
ida_ips_fragmented = []
ida_ips_precursor = []
skip_standards = []
for standard in ida_ips_sorted:
    if standard in skip_standards:
        continue
    if '_TOF MS' not in standard:
        ida_ips_fragmented.append(standard)
        skip_standards.append(standard)
        if standard[4:] + '_TOF MS' in ida_ips_sorted:
            ida_ips_precursor.append(standard[4:] + '_TOF MS')
            skip_standards.append(standard[4:] + '_TOF MS')
        else:
            ida_ips_precursor.append(np.nan)
    else:
        # the TOF MS channel is listed before its default (MS/MS) channel:
        # look for the matching IDA or IPS channel and register both channels at once
        if 'IDA-' + standard[:-7] in ida_ips_sorted:
            ida_ips_precursor.append(standard)
            skip_standards.append(standard)
            ida_ips_fragmented.append('IDA-' + standard[:-7])
            skip_standards.append('IDA-' + standard[:-7])
        elif 'IPS-' + standard[:-7] in ida_ips_sorted:
            ida_ips_precursor.append(standard)
            skip_standards.append(standard)
            ida_ips_fragmented.append('IPS-' + standard[:-7])
            skip_standards.append('IPS-' + standard[:-7])
        else:
            print(f'The standard: {standard} has no corresponding IDA or IPS in the default MS channel. It is ignored in the following calculations.')
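
For the non-standard components, the pairing logic can be condensed into a small helper. The function name `pair_channels` and the component names are hypothetical; the sketch only assumes the `_TOF MS` suffix convention used in this notebook.

```python
import numpy as np

def pair_channels(component_names):
    """Return (default, TOF MS) name pairs in order of first appearance; a missing partner is NaN."""
    suffix = '_TOF MS'
    available = set(component_names)
    pairs, seen = [], set()
    for name in component_names:
        # strip the suffix to get the name shared by both channels
        base = name[:-len(suffix)] if name.endswith(suffix) else name
        if base in seen:
            continue
        seen.add(base)
        default = base if base in available else np.nan
        precursor = base + suffix if base + suffix in available else np.nan
        pairs.append((default, precursor))
    return pairs

pairs = pair_channels(['PFOA', 'PFOA_TOF MS', 'PFBA', 'PFHxS_TOF MS'])
print(pairs)
```

Keeping both names in one tuple guarantees that the two lists stay aligned, which is what the later channel-ratio calculation relies on.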

The following code block separates the data into calibration data and quantification data.

In [None]:
# Split data into calibration data and quantification data (the latter still includes the blanks for the mdl calculation)
calibration_only = data[(data['Sample Type'] == 'Standard')]
quantification_blank = data[(data['Sample Type'] != 'Standard')]

In the following section a table with concentrations (masses) of PFAS for all samples and all analyzed PFAS components will be created and shown as well as a plot which shows the concentrations (masses).

Note: The LCMS quantifies ng/sample which is commonly referred to as concentration.

In [None]:
# get rid of np.nan component names when used as index to create tables
pfas_components = [component for component in components_fragmented if str(component) != 'nan']

pfas_mass = quantification_blank.pivot_table(
    index=('Sample Index'), columns='Component Name', values='Calculated Concentration', aggfunc='first', dropna=False,
)
pfas_mass.rename(index=index_mapper, inplace=True)
pfas_mass = pfas_mass[pfas_components]

# write initial concentrations to excel file
with pd.ExcelWriter(processed_filepath_xlsx, engine='openpyxl') as writer:
    pfas_mass.to_excel(writer, sheet_name='Calculated Concentrations in ng')

cmap = plt.cm.YlGnBu(np.linspace(0, 1, len(pfas_components)))
fig, ax = plt.subplots()
pfas_mass.plot.bar(stacked=True, ax=ax, color=cmap)
plt.xlabel('Sample')
plt.ylabel('PFAS [ng/sample]')
plt.legend(ncol=3, loc='upper center', bbox_to_anchor=(0.5, 2.1))
fig.savefig(
    os.path.join(plot_directory, 'concentrations_iteration0.png'), bbox_inches='tight'
)
plt.show()

# save concentration plot to excel file
workbook = load_workbook(processed_filepath_xlsx)
plot_sheet = workbook.create_sheet('Details Concentrations')

img = Image(os.path.join(plot_directory, 'concentrations_iteration0.png'))

cell_position = plot_sheet.cell(row=1, column=1).coordinate
plot_sheet.add_image(img, cell_position)

workbook.save(processed_filepath_xlsx)
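
The reshaping step from the long export format to one row per sample can be tried on a tiny example. The values and sample labels below are hypothetical; the `pivot_table` arguments mirror those used in the cell above.

```python
import pandas as pd

# hypothetical long-format data: one row per sample and component
long_table = pd.DataFrame({
    'Sample Index': [1, 1, 2, 2],
    'Component Name': ['PFOA', 'PFOS', 'PFOA', 'PFOS'],
    'Calculated Concentration': [0.5, 1.2, 0.7, None],
})

# one row per sample, one column per component
wide_table = long_table.pivot_table(
    index='Sample Index', columns='Component Name',
    values='Calculated Concentration', aggfunc='first', dropna=False,
)

# replace the numeric index by readable sample names
wide_table = wide_table.rename(index={1: 'Sample A', 2: 'Sample B'})
print(wide_table)
```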

In the following code block an empty csv table with a list of samples is created for you to enter the sample volume/weight and the extracted volume, and to select the samples considered for your MDL calculation.\
In addition, a csv table with a list of extracted standards is created for you to enter the accepted range for the recovery rates.

**After the code has been run for the first time, enter all your parameters in the two files, save them, and run the code again.**
Once the input files are created, they won't be changed by the script anymore.

In [None]:
if not pali.Path(sample_input_parameter_filepath).exists():
    sample_input_data = pd.DataFrame(columns=[
        'sample name', 'volume/weight', 'unit [ml or g]', 'extracted sample volume [ml]', 'used for mdl calculation'
        ],
        index=quantification_blank['Sample Index'].unique()
        )
    sample_input_data['sample name'] = [index_mapper[sample_index] for sample_index in sample_input_data.index]
    sample_input_data['used for mdl calculation'] = False
    sample_input_data['extracted sample volume [ml]'] = 0.5
    sample_input_data.to_csv(sample_input_parameter_filepath)
    del sample_input_data
    raise ImportError(
        f'The file {sample_input_parameter_filepath} was created for the first time. ' + \
        'Adapt your sample quantities and units and select the samples you want to use for the evaluation of the method detection limit.'
    )

if not pali.Path(threshold_input_parameter_filepath).exists():
    threshold_input_data = pd.DataFrame(columns=[
        'lower threshold for recoveries [%]', 'upper threshold for recoveries [%]'
        ],
        index=[ida for ida in ida_ips_fragmented if isinstance(ida, str) and 'IDA' in ida]  # skip NaN placeholders
        )
    threshold_input_data['lower threshold for recoveries [%]'] = 50
    threshold_input_data['upper threshold for recoveries [%]'] = 150
    threshold_input_data.to_csv(threshold_input_parameter_filepath)
    del threshold_input_data
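
The "create once, never overwrite" behaviour can be sketched in isolation. The helper `load_or_create_template` is hypothetical and not part of the notebook; the example writes into a temporary folder so that no real input file is touched.

```python
import tempfile
from pathlib import Path

import pandas as pd

def load_or_create_template(path, template):
    """Write the template on the first call only, then read the file from disk."""
    path = Path(path)
    created = not path.exists()
    if created:
        template.to_csv(path)
    return pd.read_csv(path, index_col=0), created

with tempfile.TemporaryDirectory() as folder:
    target = Path(folder) / 'threshold_input_parameter.csv'
    template = pd.DataFrame(
        {'lower threshold for recoveries [%]': 50, 'upper threshold for recoveries [%]': 150},
        index=['IDA-13C4_PFOA'],  # hypothetical standard name
    )
    first, created_first = load_or_create_template(target, template)

    # simulate a user edit of the file, then call the helper again
    first.loc['IDA-13C4_PFOA', 'lower threshold for recoveries [%]'] = 70
    first.to_csv(target)
    second, created_second = load_or_create_template(target, template)

print(created_first, created_second)
print(second)
```

The second call returns the edited value, which shows that user input survives repeated runs.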


The following code block loads your input parameters and input thresholds. \
**Make sure you input all the information in the provided input files.**

In [None]:
sample_input_data = pd.read_csv(sample_input_parameter_filepath, index_col=[0])
recovery_thresholds = pd.read_csv(threshold_input_parameter_filepath, index_col=[0])
recovery_thresholds_dict = recovery_thresholds.to_dict()

In the following code block the area of IPS in the calibration data is compared to the area of IPS in the batch data.

**Double-check that the information you provided in the "Actual Concentration" column about the amount of IPS and IDA you added to your samples is correct! This will affect your recovery rates.**

Note: the "Actual Concentration" column states how many ng of a component you added per sample.\
In the calibration, 4 ng of non-extracted standard (IPS) are added per ml. The lab samples commonly have a volume of 0.5 ml.
If you pipetted 4 ng of non-extracted standard (IPS) into your samples, the IPS areas should be twice as large as in the calibration.

In [None]:
extracted_sample_volume = sample_input_data['extracted sample volume [ml]'].mean()

# Calculate IPS average area per compound in calibration data and plot it
calibration_only_ips = calibration_only[calibration_only['Component Name'].str.contains('IPS')]
quantification_blank_only_ips = quantification_blank[quantification_blank['Component Name'].str.contains('IPS')]

# evaluate mean per component
calibration_ips_area_averages = calibration_only_ips.groupby('Component Name')['Area'].mean()
quantification_blank_ips_area_averages = quantification_blank_only_ips.groupby('Component Name')['Area'].mean()

# evaluate IPS concentration in quantification
ips_concentration = quantification_blank_only_ips['Actual Concentration'].mean() / extracted_sample_volume

# create plot for comparison
image_path = os.path.join(plot_directory, 'ips_areas.png')
fig, ax = plt.subplots(ncols=2, figsize=(8, 8), sharey=True)
calibration_only_ips.boxplot(column='Area', by='Component Name', ax=ax[0])
ax[0].plot([np.nan] + calibration_ips_area_averages.to_list(), color='red', linestyle='', marker="o", label='calibration average')
quantification_blank_only_ips.boxplot(column='Area', by='Component Name', ax=ax[1])
ax[1].plot([np.nan] + quantification_blank_ips_area_averages.to_list(), color='red', linestyle='', marker="o", label='quantification average')
fig.suptitle('')
ax[0].set_title('Calibration: 4 ng/ml')
ax[1].set_title(f'Quantification: {ips_concentration * extracted_sample_volume} ng per {extracted_sample_volume} ml (?)')
ax[0].set_xticks(
    ticks=range(len(calibration_ips_area_averages) + 1),
    labels=[''] + calibration_ips_area_averages.index.to_list(), rotation=90
    )
ax[1].set_xticks(
    ticks=range(len(quantification_blank_ips_area_averages) + 1),
    labels=[''] + quantification_blank_ips_area_averages.index.to_list(), rotation=90
    )
ax[0].set_ylabel('IPS area')
[this_ax.set_xlabel('') for this_ax in ax]
plt.savefig(image_path, bbox_inches='tight')
plt.show()

# save IPS area comparison plot to excel file
workbook = load_workbook(processed_filepath_xlsx)
plot_sheet = workbook.create_sheet('Details IPS concentration')

img = Image(os.path.join(plot_directory, 'ips_areas.png'))

cell_position = plot_sheet.cell(row=1, column=1).coordinate
plot_sheet.add_image(img, cell_position)

workbook.save(processed_filepath_xlsx)
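
The expected factor of two between sample and calibration IPS areas follows from a short calculation. The sketch uses the numbers from the note above and assumes that the peak area is proportional to the IPS concentration in the vial.

```python
calibration_ips_concentration = 4.0  # ng per ml in the calibration standards
ips_mass_added = 4.0                 # ng pipetted into each sample
extracted_sample_volume = 0.5        # ml

# concentration of IPS in the final sample extract
sample_ips_concentration = ips_mass_added / extracted_sample_volume  # ng per ml

# with area proportional to concentration, this is the expected area ratio
expected_area_ratio = sample_ips_concentration / calibration_ips_concentration
print(expected_area_ratio)  # 2.0
```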

In the following code block the data (calibration not included) is separated into the subset used for the MDL calculation
and the remaining data (where the PFAS are quantified).

In [None]:
indices_for_mdl_evaluation = sample_input_data[sample_input_data['used for mdl calculation']].index

if len(indices_for_mdl_evaluation) == 0:
    mdl_only = None
    quantification_only = quantification_blank
    print(f'Be careful, no samples have been collected for the MDL calculation.')
else:
    mdl_only = quantification_blank[quantification_blank['Sample Index'].isin(indices_for_mdl_evaluation)]
    quantification_only = quantification_blank[~quantification_blank['Sample Index'].isin(indices_for_mdl_evaluation)]

# Method Detection Limits
The following block computes method detection limits (MDL) based on the average and standard deviation of selected samples (process blanks, etc.). \
The code uses instrument detection limits (IDL) for the PFAS compounds which could not be detected in the samples considered for the evaluation of MDLs. \
For some compounds the IDL may not be included in the input file; in that case the default value is used.

Moreover, a new column 'Below Detection Threshold' is introduced, which flags all Calculated Concentration values of the PFAS quantification that are below the determined detection limits.

In [None]:
# Make empty dataframe if blanks for MDL calculations are not available
mdl = pd.DataFrame(index=pfas_components)
mdl['Mean Concentration'] = [np.nan] * len(mdl)
mdl['Std Concentration'] = [np.nan] * len(mdl)
mdl['MDL'] = [np.nan] * len(mdl)
mdl['IDL'] = [np.nan] * len(mdl)

if mdl_only is not None:
    blank_only_default = mdl_only[mdl_only['Component Name'].isin(pfas_components)]
    # create data frame with average and standard deviation values for MDL calculation and calculate MDL
    mdl_mean = blank_only_default.groupby('Component Name')['Calculated Concentration'].mean()
    mdl_std = blank_only_default.groupby('Component Name')['Calculated Concentration'].std()
    for (index, value) in mdl_mean.items():
        mdl.loc[index, 'Mean Concentration'] = value
    for (index, value) in mdl_std.items():
        mdl.loc[index, 'Std Concentration'] = value
    mdl['MDL'] = mdl['Mean Concentration'] + 3 * mdl['Std Concentration']

# Load idl values from idl input file
idl = pd.read_csv(idl_filepath, index_col=0, low_memory=False, nrows=1)

# Write each idl value in new column of mdl dataframe
for row_index in mdl.index:
    if row_index in idl.columns:
        mdl.loc[row_index, 'IDL'] = idl[f'{row_index}'].to_list()[0]
    else:
        mdl.loc[row_index, 'IDL'] = idl_unknown
        print(f'No IDL available for {row_index}, default value of {idl_unknown} is used.')

# use the MDL as detection threshold and fall back to the IDL where no MDL is available
mdl['Detection Threshold'] = mdl['MDL'].fillna(mdl['IDL'])

# write detection threshold to excel file
with pd.ExcelWriter(processed_filepath_xlsx, engine='openpyxl', mode='a') as writer:
    sample_input_data.to_excel(writer, sheet_name='Sample Input Data')
    mdl.to_excel(writer, sheet_name='Detection Threshold')
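
The threshold rule used above (MDL = mean + 3 × standard deviation of the blanks, with the IDL as fallback) can be checked on a small example. The blank values and IDLs are hypothetical.

```python
import numpy as np
import pandas as pd

# hypothetical blank results in ng per sample; PFBA was never detected
blanks = pd.DataFrame({
    'PFOA': [0.10, 0.12, 0.14],
    'PFBA': [np.nan, np.nan, np.nan],
})
idl = pd.Series({'PFOA': 0.05, 'PFBA': 0.02})  # hypothetical instrument detection limits

# pandas uses the sample standard deviation (ddof=1) by default
mdl = blanks.mean() + 3 * blanks.std()

# fall back to the IDL where no MDL could be computed
detection_threshold = mdl.fillna(idl)
print(detection_threshold)
```

For PFOA the mean is 0.12 and the standard deviation 0.02, so the threshold is 0.18; for PFBA the IDL of 0.02 is used.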

# Recovery Rates
To avoid misunderstandings: the following two abbreviations are used extensively in this section:
- IDA: isotope dilution analysis, also known as SS=surrogate standard or EIS=extracted internal standard
- IPS: isotope performance standard, also known as IS=injection standard or NIS=non-extracted internal standard

**The following two blocks calculate response factors from calibration data:**\
the ratio of (i) area of IDA * **actual concentration of IPS** to (ii) area of IPS * actual concentration of IDA. \
The data is saved to an excel file and a boxplot of response factors is created.

Note: The response factor calculation in the Sciex software uses the ratio of (i) area of IDA to (ii) area of IPS * actual concentration of IDA. \
As the concentration of IPS is missing in that calculation, the response factors deviate by a factor of 4, which is the actual concentration of IPS in the calibration data.

Note: The column 'Actual Concentration' provides information on ng of internal standard added per sample.

In [None]:
# define function which calculates the IDA IPS ratio.
# challenge - find the right IPS row, which is indicated in the Component Group Name of the IDA.
def calculate_ida_ips_ratio(data: pd.DataFrame, column_name: str) -> pd.DataFrame:
    """Calculates IDA area times IPS concentration divided by IPS area times IDA concentration and saves the results in the indicated column.

    :param data: Entire data set (including all rows and the following columns:
    Component Name, Sample Index, Sample Name, Component Group Name, Actual Concentration, Area, and IDA Average Response Factor)
    :type data: pd.DataFrame
    :param column_name: name of the column the calculated ratio should be saved to
    :type column_name: str
    :return: Data set only containing IDA rows with the corresponding ratio saved to the new column
    :rtype: pd.DataFrame
    """
    # select only ida rows from input data; copy so that the input data frame is not modified
    data_only_ida = data[data['Component Name'].str.contains('IDA')].copy()
    # initialize new columns
    data_only_ida[[f'{column_name}', 'IPS Area', 'IPS Concentration']] = np.nan

    # calculate the ratio for every component and every sample
    for row_index in data_only_ida.index:
        sample_index = data_only_ida.loc[row_index, 'Sample Index']
        ips_channel_name = data_only_ida.loc[row_index, 'Component Group Name']

        sample_name = data_only_ida.loc[row_index, 'Sample Name']
        corresponding_ips_area_row = data[(
            (data['Sample Index'] == sample_index) &
            (data['Component Name'] == ips_channel_name) &
            (data['Sample Name'] == sample_name)
            )]
        data_only_ida.loc[row_index, 'IPS Area'] = corresponding_ips_area_row['Area'].iloc[0]
        data_only_ida.loc[row_index, 'IPS Concentration'] = corresponding_ips_area_row['Actual Concentration'].iloc[0]
        data_only_ida.loc[row_index, f'{column_name}'] = \
            (data_only_ida.loc[row_index, 'Area'] * corresponding_ips_area_row['Actual Concentration'].iloc[0]) \
            / (corresponding_ips_area_row['Area'].iloc[0] * data_only_ida.loc[row_index, 'Actual Concentration'])
    return data_only_ida

In [None]:
# Extract values of IDAs and IPS
# Save basic sample information as well as areas of intensity peak, actual concentration and Component Group Name.
# The 'Component Group Name' is useful to associate the right IPS to each IDA.

# calculate IDA IPS ratio to compute response factors with calibration data
calibration_only_ida = calculate_ida_ips_ratio(
    data=calibration_only, column_name='Response Factor Mean',
    )

# create data frame with this response factor calculation (from scratch), the standard deviation and the original values evaluated by Sciex
response_factor = calibration_only_ida.groupby('Component Name', as_index=False)['Response Factor Mean'].mean()
response_factor['Response Factor Std'] = calibration_only_ida.groupby('Component Name')['Response Factor Mean'].std().to_list()
response_factor['Response Factor Sciex'] = calibration_only_ida.groupby('Component Name')['IDA Average Response Factor'].mean().to_list()
response_factor.index = response_factor['Component Name']
response_factor.drop(columns=['Component Name'], inplace=True)

# write response factor to excel file
with pd.ExcelWriter(processed_filepath_xlsx, engine='openpyxl', mode='a') as writer:
    response_factor.to_excel(writer, sheet_name='Response Factor')

# create and save response factor box plots
image_path = os.path.join(plot_directory, 'response_factors.png')
fig, ax = plt.subplots(figsize=(8, 8))
calibration_only_ida.boxplot(column='Response Factor Mean', by='Component Name', ax=ax,)
ax.plot([np.nan] + (response_factor['Response Factor Sciex'] * 4).to_list(), color='red', linestyle='', marker="o",)
ax.plot([np.nan] + (response_factor['Response Factor Sciex']).to_list(), color='blue', linestyle='', marker="o",)
fig.suptitle('')
ax.set_title('')
plt.ylabel('Response Factor (IDA area/IPS area)')
plt.xticks(rotation=90)
box_patch = mpatches.Patch(color='blue', fill=False, label='data')
blue_patch = mpatches.Patch(color='blue', label='RF Sciex')
red_patch = mpatches.Patch(color='red', label='4 * RF Sciex')
plt.legend(handles=[box_patch, red_patch, blue_patch])
plt.savefig(image_path, bbox_inches='tight')
plt.show()

# save response factor box plot to excel file
workbook = load_workbook(processed_filepath_xlsx)
plot_sheet = workbook.create_sheet('Details RF')

img = Image(os.path.join(plot_directory, 'response_factors.png'))

cell_position = plot_sheet.cell(row=1, column=1).coordinate
plot_sheet.add_image(img, cell_position)

workbook.save(processed_filepath_xlsx)

The following block calculates recovery rates for each IDA compound in each sample.
$$
recovery~rate = \frac{\frac{area_{IDA~sample}~\cdot~concentration_{IPS~sample}}{area_{IPS~sample}~\cdot~concentration_{IDA~sample}}}{average(\frac{area_{IDA~calibration}~\cdot~concentration_{IPS~calibration}}{area_{IPS~calibration}~\cdot~concentration_{IDA~calibration}})} = \frac{ratio}{response~factor}
$$

Recovery rate computed within the SCIEX software is not normalized by concentration. So the recovery rate calculated by SCIEX reads:
$$
recovery~rate = \frac{\frac{area_{IDA~sample}}{area_{IPS~sample}}}{average(\frac{area_{IDA~calibration}}{area_{IPS~calibration}})}
$$
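
To make the difference between the two definitions concrete, here is a small worked sketch with hypothetical numbers (the areas, concentrations and calibration averages are invented for illustration). When the spiked concentrations in the sample match those of the calibration, both definitions agree; when they differ, only the concentration-normalized version accounts for it.

```python
def ida_ips_ratio(area_ida, conc_ida, area_ips, conc_ips):
    """Concentration-normalized IDA-IPS ratio of one sample."""
    return (area_ida * conc_ips) / (area_ips * conc_ida)


# Hypothetical calibration averages (IDA conc 2, IPS conc 4 in the calibration):
response_factor_mean = 2.0     # average of the concentration-normalized ratios
calibration_area_ratio = 1.0   # average of area_IDA / area_IPS only

# Sample spiked like the calibration standards.
ratio_same = ida_ips_ratio(area_ida=1800.0, conc_ida=2.0, area_ips=2000.0, conc_ips=4.0)
recovery_same = 100 * ratio_same / response_factor_mean
recovery_same_sciex = 100 * (1800.0 / 2000.0) / calibration_area_ratio

# Sample with half the IDA amount but the same measured areas.
ratio_half = ida_ips_ratio(area_ida=1800.0, conc_ida=1.0, area_ips=2000.0, conc_ips=4.0)
recovery_half = 100 * ratio_half / response_factor_mean
recovery_half_sciex = 100 * (1800.0 / 2000.0) / calibration_area_ratio

print(recovery_same, recovery_same_sciex)   # both about 90 %
print(recovery_half, recovery_half_sciex)   # about 180 % versus about 90 %
```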

The uncertainty of the recovery rate is estimated with a maximum error method. For now, you can ignore this part.
$$
\Delta recovery~rate = \Delta response~factor \cdot \frac{ratio}{response~factor^{2}} + \Delta ratio \cdot \frac{1}{response~factor}
$$
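
The maximum error formula above can be sketched as a small function. The input values are hypothetical; the 10 % ratio uncertainty mirrors what the notebook assumes when only a single measurement is available, and the factor 100 converts the result to percent as in the recovery rate itself.

```python
def recovery_rate_uncertainty(ratio, ratio_std, rf_mean, rf_std):
    """Maximum error of recovery rate = ratio / response factor, in percent."""
    return 100 * (rf_std * ratio / rf_mean ** 2 + ratio_std / rf_mean)


# Hypothetical inputs
ratio = 1.8                 # IDA-IPS ratio of the sample
ratio_std = 0.1 * ratio     # assumed 10 % uncertainty for a single measurement
rf_mean = 2.0               # response factor mean from calibration
rf_std = 0.1                # response factor standard deviation from calibration

recovery_rate = 100 * ratio / rf_mean
uncertainty = recovery_rate_uncertainty(ratio, ratio_std, rf_mean, rf_std)
print(f'{recovery_rate:.1f} % +/- {uncertainty:.1f} %')
```

Both error terms are added as absolute values, so the result is an upper bound rather than a statistical standard deviation.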

In [None]:
# function to color excel according to threshold values / mask
def highlight(data, flag):
    '''Sets all data elements to red background, when flag is True. '''
    if data.ndim == 1:  # Series from .apply(axis=0) or axis=1
        return ['background-color: red' if v else '' for v in flag]
    else:  # from .apply(axis=None)
        return pd.DataFrame(np.where(flag, 'background-color: red', ''),
                            index=data.index, columns=data.columns)
    
# function to determine if recovery rates are within given limit
def flag_poor_recovery(data: pd.DataFrame, checked_column_name: str, new_column_name: str) -> tuple[pd.DataFrame, 'pd.io.formats.style.Styler']:
    """Checks if recovery rates are in defined threshold limits and appends check results as boolean column to data.
    Additionally formats table for recovery rate.

    :param data: Entire data chunk including all rows and the following columns:
    'Component Name', 'Sample Index', f'{checked_column_name}'
    :type data: pd.DataFrame
    :param checked_column_name: name of column which is used to check if values are in the given limits.
    :type checked_column_name: str
    :param new_column_name: name of column, the check results should be saved to.
    :type new_column_name: str
    :return: Original data chunk with the new column (check results) appended + formatted pivot table containing recovery rates.
    :rtype: tuple[pd.DataFrame, pd.io.formats.style.Styler]
    """
    data[f'{new_column_name}'] = np.nan
    for component_ida in data['Component Name'].unique():
        lower_threshold_recovery = recovery_thresholds_dict['lower threshold for recoveries [%]'][component_ida]
        upper_threshold_recovery = recovery_thresholds_dict['upper threshold for recoveries [%]'][component_ida]
        data.loc[data['Component Name'] == component_ida, f'{new_column_name}'] = True
        data.loc[
            (data['Component Name'] == component_ida) &
            (data[f'{checked_column_name}'] > lower_threshold_recovery) &
            (data[f'{checked_column_name}'] < upper_threshold_recovery)
        , f'{new_column_name}'] = False

    recovery_pivot = data.pivot_table(
        index=('Sample Index',), columns='Component Name', values=f'{checked_column_name}', aggfunc='mean', dropna=False,
    )

    poor_recovery_pivot = data.pivot_table(
        index=('Sample Index',), columns='Component Name', values=f'{new_column_name}', aggfunc='first', dropna=False,
        )

    # color cells based on recovery values
    recovery_styled = recovery_pivot.style.apply(highlight, flag=poor_recovery_pivot, axis=None)

    return data, recovery_styled

def reindex_in_excel(filename: str, sheetname: str) -> None:
    """Function to rename index columns in excel."""

    # load excel file
    workbook = load_workbook(filename=filename)
    # get right sheet
    sheet = workbook[sheetname]
    # read in index column
    sample_index = [column.value for column in sheet['A']]
  
    # change index column values
    if sample_index[0] == 'Sample Index':
        for row_index in range(1, len(sample_index)):
            sheet.cell(row=row_index + 1, column=1).value = index_mapper[sample_index[row_index]]
    
    #save the file
    workbook.save(filename=filename)
    workbook.close()

In [None]:
# Select ida rows from quantification data and calculate ida ips ratio
# function is defined in previous block
quantification_ida = calculate_ida_ips_ratio(
    data=quantification_blank, column_name="IDA-IPS Ratio",
    )

recovery_table = pd.DataFrame(columns=[
    'Sample Index', 'Component Name', 'IDA-IPS Ratio', 'IDA-IPS Ratio Std',
    'Response Factor Mean', 'Response Factor Std', 'Recovery Rate', 'Recovery Rate Uncertainty',
    'Recovery Rate Sciex'
    ])

# delete IDA and IPS values from one method if 'core' or 'extended' is selected,
# and use the average of both methods in the alternative case.
index = 0

for sample_index in quantification_ida['Sample Index'].unique():
    for component_ida in quantification_ida['Component Name'].unique():
        selected_sample_ida = quantification_ida.loc[(
            (quantification_ida['Sample Index'] == sample_index) & 
            (quantification_ida['Component Name'] == component_ida)
        ), :]
        response_factor_mean = response_factor.loc[component_ida, 'Response Factor Mean']
        response_factor_std = response_factor.loc[component_ida, 'Response Factor Std']
        if len(selected_sample_ida.index) == 0:
            print(f'Component {component_ida} is not available for sample {sample_index}')
            ida_ips_ratio_mean = np.nan
            ida_ips_ratio_std = np.nan
            recovery_rate_sciex = np.nan
        elif len(selected_sample_ida.index) == 1:
            ida_ips_ratio_mean = selected_sample_ida['IDA-IPS Ratio'].values[0]
            recovery_rate_sciex = selected_sample_ida['Reported Recovery'].values[0]
            ida_ips_ratio_std = 0.1 * ida_ips_ratio_mean
        else:
            if recovery_data == 'average':
                ida_ips_ratio_mean = selected_sample_ida['IDA-IPS Ratio'].mean()
                ida_ips_ratio_std = selected_sample_ida['IDA-IPS Ratio'].std()
                recovery_rate_sciex = selected_sample_ida['Reported Recovery'].mean()
            elif recovery_data == 'core':
                ida_ips_ratio_mean = selected_sample_ida.loc[
                    selected_sample_ida['Sample Name'].str.contains('Core'), 'IDA-IPS Ratio'
                    ].values[0]
                ida_ips_ratio_std = 0.1 * ida_ips_ratio_mean
                recovery_rate_sciex = selected_sample_ida.loc[
                    selected_sample_ida['Sample Name'].str.contains('Core'), 'Reported Recovery'
                    ].values[0]
            elif recovery_data == 'extended':
                ida_ips_ratio_mean = selected_sample_ida.loc[
                    selected_sample_ida['Sample Name'].str.contains('Ext'), 'IDA-IPS Ratio'
                    ].values[0]
                ida_ips_ratio_std = 0.1 * ida_ips_ratio_mean
                recovery_rate_sciex = selected_sample_ida.loc[
                    selected_sample_ida['Sample Name'].str.contains('Ext'), 'Reported Recovery'
                    ].values[0]
            else:
                raise ValueError(
                    "Invalid input for variable recovery_data, use either 'core', 'extended', or 'average'."
                )

        recovery_rate = 100 * ida_ips_ratio_mean / response_factor_mean
        recovery_rate_uncertainty = 100 * (
            response_factor_std * ida_ips_ratio_mean / response_factor_mean ** 2 + \
                ida_ips_ratio_std / response_factor_mean
                )
        
        recovery_table.loc[index] = [
            sample_index, component_ida, ida_ips_ratio_mean, ida_ips_ratio_std,
            response_factor_mean, response_factor_std, recovery_rate, recovery_rate_uncertainty,
            recovery_rate_sciex
            ]
        index +=1

# check if recovery rate is within indicated limit for each ida and save check results to 'Poor Recovery' column
recovery_table, recovery_styled = flag_poor_recovery(data=recovery_table, checked_column_name='Recovery Rate', new_column_name='Poor Recovery')
recovery_table, recovery_sciex_styled = flag_poor_recovery(data=recovery_table, checked_column_name='Recovery Rate Sciex', new_column_name='Poor Recovery Sciex')

# Write recovery rate to excel file
with pd.ExcelWriter(processed_filepath_xlsx, engine='openpyxl', mode='a') as writer:
    recovery_thresholds.to_excel(writer, sheet_name='Recovery Thresholds')
    recovery_styled.to_excel(writer, sheet_name='Recovery Rate')
    recovery_sciex_styled.to_excel(writer, sheet_name='Sciex Recovery Rate')
    
reindex_in_excel(filename=processed_filepath_xlsx, sheetname='Recovery Rate')
reindex_in_excel(filename=processed_filepath_xlsx, sheetname='Sciex Recovery Rate')

The following code cells generate plots of recovery rates over all samples.

In [None]:
# Box plot for recovery rates
image_path = os.path.join(plot_directory, 'recovery_rates_box.png')
fig, ax = plt.subplots(figsize=(8, 8))
recovery_table.boxplot(column='Recovery Rate', by='Component Name', ax=ax,)
ax.set_ylim([0,500])
fig.suptitle('')
ax.set_title('')
plt.ylabel('Recovery Rate')
plt.xticks(rotation=90)
plt.savefig(image_path, bbox_inches='tight')
plt.show()

# plot data as points for recovery rates:
plot_data = recovery_table.groupby('Sample Index')  # group data for plotting
cmap = plt.get_cmap('tab20', len(plot_data)) # initialize colours (plt.cm.get_cmap was removed in recent matplotlib versions)
image_path = os.path.join(plot_directory, 'recovery_rates.png')  # set path for figure

fig, ax = plt.subplots(figsize=(8, 8))
for index, (title, group) in enumerate(plot_data):
    group.set_index(group['Component Name'], inplace=True)
    group.sort_index(inplace=True)
    group.drop_duplicates(keep='first', inplace=True)
    group.plot(
        y='Recovery Rate', ax=ax, marker='.', linestyle='None', label=index_mapper[title],
        grid=True, color = cmap(index),
    )
ax.set_ylim([0,500])
fig.suptitle('')
ax.set_xticks(range(len(group)))
ax.set_xticklabels(group['Component Name'], rotation=90)
ax.set_title('')
plt.xticks(rotation=90)
plt.ylabel('Recovery Rate')
plt.legend(loc='center right', bbox_to_anchor=(1.4, 0.5))
plt.savefig(image_path, bbox_inches='tight')
plt.show()

# save recovery rate plot to excel file
workbook = load_workbook(processed_filepath_xlsx)
plot_sheet = workbook.create_sheet('Details Recovery Rates')

img1 = Image(os.path.join(plot_directory, 'recovery_rates_box.png'))
img1.anchor = 'A1'
plot_sheet.column_dimensions['A'].width = img1.width / 6
plot_sheet.row_dimensions[1].height = img1.height
plot_sheet.add_image(img1)

img2 = Image(os.path.join(plot_directory, 'recovery_rates.png'))
img2.anchor = 'B1'
plot_sheet.column_dimensions['B'].width = img2.width / 6
plot_sheet.add_image(img2)

workbook.save(processed_filepath_xlsx)

# Mass Channel Ratios
The following block evaluates the ratio of the calculated concentrations from the default channel and the _TOF MS channel for all PFAS components and standards in all samples. The ratio is expressed as the percentage deviation between the two channels relative to their mean.

Moreover, a new column 'Channel Ratio > x' is introduced, which flags all calculated concentration values that deviate by more than x % between channels.

x is the variable 'allowed_channel_deviation' you set in the third code block on top.
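
As a small worked sketch of the deviation measure used in the next block (200 times the difference divided by the sum, which is the difference relative to the mean of both channels in percent), with hypothetical concentrations and a hypothetical threshold:

```python
def channel_deviation_percent(conc_default, conc_tof):
    """Percentage deviation between channels, relative to their mean, rounded to one decimal."""
    return round(200 * (conc_default - conc_tof) / (conc_default + conc_tof), 1)


allowed_channel_deviation = 30  # hypothetical threshold in percent

deviation_large = channel_deviation_percent(1.5, 1.0)    # 200 * 0.5 / 2.5
deviation_swapped = channel_deviation_percent(1.0, 1.5)  # same magnitude, opposite sign
deviation_small = channel_deviation_percent(1.1, 1.0)

for deviation in (deviation_large, deviation_swapped, deviation_small):
    flagged = abs(deviation) > allowed_channel_deviation
    print(deviation, flagged)
```

Because the measure is symmetric in the two channels, swapping them only changes the sign, which is why the flag is based on the absolute value.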

In [None]:
# Assign TOF channel to each PFAS and save calculated concentration to new column

# select columns
selected_columns = [
    'Sample Name', 'Sample Index', 'Acquisition Date & Time', 'Component Name', 'Calculated Concentration',
    ]
mass_channel_data = quantification_blank[selected_columns]

# select only channel names of fragmented masses for descriptive data of channel ratios
# (filter the column selection from above and copy it, so that new columns can be added safely)
mass_channel_data = mass_channel_data[mass_channel_data['Component Name'].isin(components_fragmented)].copy()
# initialize calculated concentration TOF column in descriptive data of channel ratios
mass_channel_data['Calculated Concentration TOF'] = np.nan
for row_index in mass_channel_data.index:
    # get sample index and name of corresponding tof channel
    sample_index = mass_channel_data.loc[row_index, 'Sample Index']
    tof_channel_index = components_fragmented.index(mass_channel_data.loc[row_index, 'Component Name'])
    tof_channel_name = components_precursor[tof_channel_index]
    # select data of corresponding tof channel
    tof_channel_row = quantification_blank[(
        (quantification_blank['Component Name'] == tof_channel_name) &
        (quantification_blank['Sample Index'] == sample_index)
    )]
    # write TOF channel concentration to new data frame (stays NaN if the TOF channel is missing)
    if not tof_channel_row.empty:
        mass_channel_data.loc[row_index, 'Calculated Concentration TOF'] = \
            tof_channel_row['Calculated Concentration'].to_list()[0]

# Calculate percentage deviation of the channels
mass_channel_data['Channel Ratio'] = (
    200 * (mass_channel_data['Calculated Concentration'] - mass_channel_data['Calculated Concentration TOF']) \
        / (mass_channel_data['Calculated Concentration'] + mass_channel_data['Calculated Concentration TOF'])
).round(decimals=1)

# Introduce new column which marks every channel deviation above the allowed limit
mass_channel_data[f'Channel Ratio > {allowed_channel_deviation}'] = abs(mass_channel_data['Channel Ratio']) > allowed_channel_deviation

# Put channel ratio in pivot table
channel_ratio = mass_channel_data.pivot_table(
    index=('Sample Index',), columns='Component Name', values='Channel Ratio', aggfunc='mean', dropna=False,
)[pfas_components]
channel_ratio_flag = mass_channel_data.pivot_table(
    index=('Sample Index',), columns='Component Name', values=f'Channel Ratio > {allowed_channel_deviation}', aggfunc='first', dropna=False,
)[pfas_components]

# color cells based on threshold values
channel_ratio_styled = channel_ratio.style.apply(highlight, flag=channel_ratio_flag, axis=None).relabel_index(
    labels=[index_mapper[sample_index] for sample_index in channel_ratio.index], axis=0, level=0,
)

# Write to excel file
with pd.ExcelWriter(processed_filepath_xlsx, engine='openpyxl', mode='a') as writer:
    channel_ratio_styled.to_excel(writer, sheet_name='Channel Ratio')
reindex_in_excel(filename=processed_filepath_xlsx, sheetname='Channel Ratio')

The following block evaluates the ratio of areas from the default channel and the _TOF MS channel for all standards.
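
The channel pairing for standards can be sketched with a mapping and a left join, which leaves the ratio as NaN whenever the TOF channel is missing. The mapping `fragmented_to_precursor`, the channel names and the areas are hypothetical; the notebook keeps the pairing in two parallel lists and fills the values in a loop.

```python
import pandas as pd

# Hypothetical pairing of default (fragmented) channels with their TOF MS channels.
fragmented_to_precursor = {'IDA_A': 'IDA_A_TOF MS', 'IDA_B': 'IDA_B_TOF MS'}

areas = pd.DataFrame({
    'Sample Name': ['S1', 'S1', 'S1'],
    'Component Name': ['IDA_A', 'IDA_A_TOF MS', 'IDA_B'],  # no TOF channel for IDA_B
    'Area': [1500.0, 1000.0, 800.0],
})

default = areas[areas['Component Name'].isin(list(fragmented_to_precursor))].copy()
default['TOF Channel'] = default['Component Name'].map(fragmented_to_precursor)

# Lookup table of all areas, keyed by channel name.
tof = areas.rename(columns={'Component Name': 'TOF Channel', 'Area': 'Area TOF'})
paired = default.merge(tof, on=['Sample Name', 'TOF Channel'], how='left')

paired['Channel Ratio'] = (
    200 * (paired['Area'] - paired['Area TOF']) / (paired['Area'] + paired['Area TOF'])
).round(1)
print(paired[['Component Name', 'Area', 'Area TOF', 'Channel Ratio']])
```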

In [None]:
# select columns
selected_columns = [
    'Sample Name', 'Sample Index', 'Acquisition Date & Time', 'Component Name', 'Area',
    ]
quantification_standards = quantification_blank[selected_columns]

# Get only default channels of standards
quantification_standards_default = quantification_standards[
    quantification_standards['Component Name'].isin(ida_ips_fragmented)
].copy()

# initialize respective TOF area column and get respective values
quantification_standards_default['Area TOF'] = np.nan
for row_index in quantification_standards_default.index:
    sample_name = quantification_standards_default.loc[row_index, 'Sample Name']
    tof_channel_index = ida_ips_fragmented.index(quantification_standards_default.loc[row_index, 'Component Name'])
    tof_channel_name = ida_ips_precursor[tof_channel_index]
    tof_channel_row = quantification_standards[(
        (quantification_standards['Component Name'] == tof_channel_name) &
        (quantification_standards['Sample Name'] == sample_name)
    )]
    if tof_channel_row.empty:
        quantification_standards_default.loc[row_index, 'Area TOF'] = np.nan
    else:
        quantification_standards_default.loc[row_index, 'Area TOF'] = \
            tof_channel_row.loc[:,'Area'].values[0]

# Calculate percentage deviation of the channels
quantification_standards_default['Channel Ratio'] = (
    200 * (quantification_standards_default['Area'] - quantification_standards_default['Area TOF']) \
        / (quantification_standards_default['Area'] + quantification_standards_default['Area TOF'])
).round(decimals=1)

# Introduce new column which marks every channel deviation above the allowed limit
quantification_standards_default[f'Channel Ratio > {allowed_channel_deviation}'] = abs(quantification_standards_default['Channel Ratio']) > allowed_channel_deviation

# Put channel ratio in pivot table
channel_ratio = quantification_standards_default.pivot_table(
    index=('Sample Name',), columns='Component Name', values='Channel Ratio', aggfunc='first', dropna=False,
)[ida_ips_fragmented]
channel_ratio_flag = quantification_standards_default.pivot_table(
    index=('Sample Name',), columns='Component Name', values=f'Channel Ratio > {allowed_channel_deviation}', aggfunc='first', dropna=False,
)[ida_ips_fragmented]

# color cells based on threshold values
channel_ratio_styled = channel_ratio.style.apply(highlight, flag=channel_ratio_flag, axis=None)

# Write to excel file
with pd.ExcelWriter(processed_filepath_xlsx, engine='openpyxl', mode='a') as writer:
    channel_ratio_styled.to_excel(writer, sheet_name='Standards Channel Ratio')

# Outputs and data merging

In the following code block, all relevant data and parameters are combined in one common table, quantification_pfas_default. The table has one row for each PFAS component in each sample: \
(1) The right recovery rate is assigned to each PFAS. Each PFAS component has a corresponding IDA, and its recovery rate is taken from that IDA. The assignment is made per sample via the 'IS Name' column, which indicates which IDA standard is associated with which PFAS. \
(2) Detection thresholds are appended to the table, together with a boolean column indicating whether the detected value is below the threshold. \
(3) Basic information about the sample is extracted from the input table and included. \
(4) Mass channel ratios are assigned to each PFAS.
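
Steps (1) and (2) can be sketched as a join on sample and IDA name followed by a threshold lookup. The tables, component names and threshold values below are hypothetical, and the notebook itself fills the columns row by row instead of using `merge`.

```python
import pandas as pd

# Hypothetical PFAS rows; 'IDA Name' corresponds to the renamed 'IS Name' column.
pfas = pd.DataFrame({
    'Sample Index': [1, 1, 2],
    'Component Name': ['PFOA', 'PFOS', 'PFOA'],
    'IDA Name': ['IDA_PFOA', 'IDA_PFOS', 'IDA_PFOA'],
    'Calculated Concentration': [0.5, 0.02, 1.2],
})
# Hypothetical recovery table with one row per sample and IDA.
recovery = pd.DataFrame({
    'Sample Index': [1, 1, 2],
    'Component Name': ['IDA_PFOA', 'IDA_PFOS', 'IDA_PFOA'],
    'Recovery Rate': [95.0, 40.0, 110.0],
    'Poor Recovery': [False, True, False],
})
# Hypothetical detection thresholds per PFAS component.
thresholds = pd.Series({'PFOA': 0.1, 'PFOS': 0.05}, name='Detection Threshold')

# (1) attach the recovery rate of the associated IDA to each PFAS row
merged = pfas.merge(
    recovery.rename(columns={'Component Name': 'IDA Name'}),
    on=['Sample Index', 'IDA Name'], how='left',
)
# (2) attach the detection threshold and flag values below it
merged['Detection Threshold'] = merged['Component Name'].map(thresholds)
merged['Below Detection Threshold'] = (
    merged['Calculated Concentration'] < merged['Detection Threshold']
)
print(merged)
```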

In [None]:
# Initialize data frame for following assignments, select only needed columns and use reasonable naming.
selected_columns = [
    'Sample Name', 'Sample Index', 'Acquisition Date & Time', 'Component Name', 'Calculated Concentration',
    'Area', 'IS Name',
    ]
quantification_pfas_default = quantification_only[selected_columns].rename(columns={'IS Name': 'IDA Name'})

# Get only PFAS default channels
quantification_pfas_default = quantification_pfas_default[
    quantification_pfas_default['Component Name'].isin(pfas_components)
    ].copy()

# Assign right recovery rate and uncertainty to each PFAS compound
# initialize new columns
quantification_pfas_default[[
    'IDA Area', 'IDA Concentration', 'IPS Name', 'IPS Area', 'IPS Concentration',
    'Recovery Rate', 'Recovery Rate Uncertainty', 'Poor Recovery', 'Recovery Rate Sciex', 
    'Detection Threshold', 'Sample Quantity', 'Sample Unit',
    'Channel Ratio', f'Channel Ratio > {allowed_channel_deviation}',
]] = np.nan
quantification_pfas_default['Below Detection Threshold'] = False

for row_index in quantification_pfas_default.index:
    sample_index = quantification_pfas_default.loc[row_index, 'Sample Index']
    sample_name = quantification_pfas_default.loc[row_index, 'Sample Name']
    ida_channel_name = quantification_pfas_default.loc[row_index, 'IDA Name']
    component_name = quantification_pfas_default.loc[row_index, 'Component Name']

    recovery_row = recovery_table[(
        (recovery_table['Component Name'] == ida_channel_name) &
        (recovery_table['Sample Index'] == sample_index)
    )]

    ida_row = quantification_ida[(
            (quantification_ida['Component Name'] == ida_channel_name) &
            (quantification_ida['Sample Index'] == sample_index) &
            (quantification_ida['Sample Name'] == sample_name)
    )]

    channel_row = mass_channel_data[(
            (mass_channel_data['Component Name'] == component_name) &
            (mass_channel_data['Sample Index'] == sample_index)
    )]

    detection_threshold = mdl.loc[component_name, 'Detection Threshold']
    
    if ida_channel_name in quantification_ida['Component Name'].to_list():
        quantification_pfas_default.loc[row_index,'IDA Area'] = ida_row.loc[:,'Area'].values[0]
        quantification_pfas_default.loc[row_index,'IDA Concentration'] = ida_row.loc[:,'Actual Concentration'].values[0]
        quantification_pfas_default.loc[row_index,'IPS Area'] = ida_row.loc[:,'IPS Area'].values[0]
        quantification_pfas_default.loc[row_index,'IPS Concentration'] = ida_row.loc[:,'IPS Concentration'].values[0]
        quantification_pfas_default.loc[row_index, 'IPS Name'] = \
            ida_row.loc[:,'Component Group Name'].values[0]
        quantification_pfas_default.loc[row_index, 'Recovery Rate'] = \
            recovery_row.loc[:,'Recovery Rate'].values[0]
        quantification_pfas_default.loc[row_index, 'Recovery Rate Uncertainty'] = \
            recovery_row.loc[:,'Recovery Rate Uncertainty'].values[0]
        quantification_pfas_default.loc[row_index, 'Poor Recovery'] = \
            recovery_row.loc[:,'Poor Recovery'].values[0]
        quantification_pfas_default.loc[row_index, 'Recovery Rate Sciex'] = \
            recovery_row.loc[:,'Recovery Rate Sciex'].values[0]

    quantification_pfas_default.loc[row_index, 'Detection Threshold'] = detection_threshold
    if not np.isnan(quantification_pfas_default.loc[row_index, 'Calculated Concentration']):
        if quantification_pfas_default.loc[row_index, 'Calculated Concentration'] < detection_threshold:
            quantification_pfas_default.loc[row_index, 'Below Detection Threshold'] = True
    quantification_pfas_default.loc[row_index, 'Sample Quantity'] = \
        sample_input_data.loc[sample_index, 'volume/weight']
    quantification_pfas_default.loc[row_index, 'Sample Unit'] = \
        sample_input_data.loc[sample_index, 'unit [ml or g]']

    quantification_pfas_default.loc[row_index, 'Channel Ratio'] = \
        channel_row.loc[:, 'Channel Ratio'].values[0]
    quantification_pfas_default.loc[row_index, f'Channel Ratio > {allowed_channel_deviation}'] = \
        channel_row.loc[:, f'Channel Ratio > {allowed_channel_deviation}'].values[0]

for column in ['Recovery Rate', 'Recovery Rate Uncertainty', 'Recovery Rate Sciex']:
    quantification_pfas_default[f'{column}'] = quantification_pfas_default[f'{column}'].round(decimals=1)

# Put computed recovery rate in pivot table
recovery_extended_pivot = quantification_pfas_default.pivot_table(
    index=('Sample Index',), columns='Component Name', values='Recovery Rate', aggfunc='mean', dropna=False,
)[pfas_components]
poor_recovery_extended_pivot = quantification_pfas_default.pivot_table(
    index=('Sample Index',), columns='Component Name', values=f'Poor Recovery', aggfunc='first', dropna=False,
)[pfas_components]

# color cells based on recovery values
recovery_extended_styled = recovery_extended_pivot.style.apply(
    highlight, flag=poor_recovery_extended_pivot, axis=None
    ).relabel_index(
        labels=[index_mapper[sample_index] for sample_index in recovery_extended_pivot.index],
        axis=0, level=0,
    )

# Write to excel file
with pd.ExcelWriter(processed_filepath_xlsx, engine='openpyxl', mode='a') as writer:
    recovery_extended_styled.to_excel(writer, sheet_name='Recovery Rate Extended')
reindex_in_excel(filename=processed_filepath_xlsx, sheetname='Recovery Rate Extended')

# Outputs
The following code block writes the final concentration table with all information to excel. It uses concentration values and marks all values below the detection threshold with '< MDL', all channel deviations above x % with '> CR', and all recovery rates below or above the indicated threshold values with 'Poor Recovery'.
In addition, concentration values are converted to ng per sample unit (g or ml, as specified in the sample input file).
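
The flagging order and the unit conversion can be illustrated with a minimal sketch. The function names and numbers are hypothetical; the precedence shown (a later flag overwrites an earlier one, so '< MDL' wins) follows the order in which the notebook's flagging function assigns the labels.

```python
def flag_concentration(value, channel_ratio_exceeded, poor_recovery, below_threshold):
    """Returns the value itself or the flag that replaces it in the final table."""
    flagged = value
    if channel_ratio_exceeded:
        flagged = '> CR'
    if poor_recovery:
        flagged = 'Poor Recovery'
    if below_threshold:
        flagged = '< MDL'
    return flagged


def concentration_per_unit(calculated_concentration, sample_quantity):
    """Converts an absolute amount (ng) to ng per sample unit (ml or g)."""
    return calculated_concentration / sample_quantity


# Hypothetical sample: 10 ng found in 250 ml of sample
per_unit = concentration_per_unit(10.0, 250.0)

print(flag_concentration(per_unit, False, False, False))  # unflagged value
print(flag_concentration(per_unit, True, True, False))    # 'Poor Recovery' overrides '> CR'
print(flag_concentration(per_unit, True, True, True))     # '< MDL' overrides both
```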

In [None]:
def flag_values(data: pd.DataFrame, column: str) -> pd.DataFrame:
    """Flags column values (most probably concentrations) with channel ratio, recovery rates and MDLs.

    :param data: data frame containing the columns 'Channel Ratio > x', 'Poor Recovery',
    and 'Below Detection Threshold', as well as the column you indicated.
    :type data: pd.DataFrame
    :param column: Column name of data frame to be filtered or flagged.
    :type column: str
    :return: Data frame, where the column data is flagged.
    :rtype: pd.DataFrame
    """
    final_table = data[['Sample Name', 'Sample Index', 'Component Name', column]].copy()
    # cast to object so that text flags can be stored next to numeric values
    final_table[column] = final_table[column].astype(object)
    # use the flags of the passed data frame; later flags overwrite earlier ones
    final_table.loc[data[f'Channel Ratio > {allowed_channel_deviation}'].eq(True), column] = '> CR'
    final_table.loc[data['Poor Recovery'].eq(True), column] = 'Poor Recovery'
    final_table.loc[data['Below Detection Threshold'].eq(True), column] = '< MDL'
    return final_table

# Transform concentration to ng per sample unit (g or ml), depending on your sample unit
quantification_pfas_default[f'Concentration in ng per Unit'] = quantification_pfas_default['Calculated Concentration'] \
    / quantification_pfas_default['Sample Quantity']

# Flag concentration values with channel ratio, recovery rates and detection threshold.
calculated_concentration = flag_values(data=quantification_pfas_default, column='Calculated Concentration')
calculated_concentration_II = flag_values(data=quantification_pfas_default, column=f'Concentration in ng per Unit')

# Pivot concentration tables.
calculated_concentration = calculated_concentration.pivot_table(
    index=('Sample Index',), columns='Component Name', values='Calculated Concentration', aggfunc='first', dropna=False,
)
calculated_concentration.rename(index=index_mapper, inplace=True)
calculated_concentration = calculated_concentration[pfas_components]

calculated_concentration_II = calculated_concentration_II.pivot_table(
    index=('Sample Index',), columns='Component Name', values=f'Concentration in ng per Unit', aggfunc='first', dropna=False,
)
calculated_concentration_II.rename(index=index_mapper, inplace=True)
calculated_concentration_II = calculated_concentration_II[pfas_components]

# Write pivot tables to existing excel file
with pd.ExcelWriter(processed_filepath_xlsx, engine='openpyxl', mode='a') as writer:
    calculated_concentration.to_excel(writer, sheet_name='Concentration Table')
    calculated_concentration_II.to_excel(writer, sheet_name=f'Concentration (ng per Unit)')

# Write long format data to csv
quantification_pfas_default = quantification_pfas_default[[
    'Sample Name', 'Sample Index', 'Acquisition Date & Time','Component Name',
    'Area', 'Calculated Concentration', 'Concentration in ng per Unit', 'Sample Quantity', 'Sample Unit',
    'IDA Name', 'IDA Area', 'IDA Concentration', 'IPS Name', 'IPS Area', 'IPS Concentration',
    'Recovery Rate', 'Recovery Rate Uncertainty', 'Recovery Rate Sciex', 'Poor Recovery',
    'Detection Threshold', 'Below Detection Threshold',
    'Channel Ratio', f'Channel Ratio > {allowed_channel_deviation}', 
]]
quantification_pfas_default.sort_values(by=['Sample Index', 'Component Name'], inplace=True)

quantification_pfas_default.to_csv(processed_filepath_csv)

# Print output
display(calculated_concentration)