In [1]:
import numpy as np
import os, re, glob
import pandas as pd
from PyPDF2 import PdfReader
top = os.getcwd()  # directory the notebook is run from; data_dir is built from it
data_dir = os.path.join(top, 'data')  # Specify data directory here


# Parse pdf File into csv

- Extract the peak table from each PDF report and save it as `data.csv` inside the corresponding run folder

In [2]:
def parse_table(text):
    """
    Parse the peak table from the text of a GC report pdf page

    The table body is taken to be everything between the dashed
    column-separator line and the word "Totals". Each body line is split on
    whitespace; a line with 7 tokens maps directly onto the 7 columns, and a
    line with 8 tokens is treated as having its peak type split in two
    (the two tokens are concatenated). Lines with any other token count are
    skipped.

    Parameters
    ----------
    text: string
        string from reading report

    Returns
    -------
    pd.DataFrame
        One row per parsed peak with columns "#", "RetTime [min]", "Type",
        "Width [min]", "Area [pA*s]", "Height [pA]", "Area %". The frame
        always carries these columns, even when it has no rows (text says
        "No peaks found", the table is not found, or no line could be parsed),
        so callers can safely call ``set_index("#")`` on the result.

    Raises
    ------
    ValueError
        If a line with 7 or 8 tokens contains a field that cannot be
        converted to int/float.
    """
    columns = ["#", "RetTime [min]", "Type", "Width [min]", "Area [pA*s]", "Height [pA]", "Area %"]

    if "No peaks found" in text:
        return pd.DataFrame(columns=columns)

    pattern = r"----\|-------\|----\|-------\|----------\|----------\|--------\|\n(.*?)Totals"
    match = re.search(pattern, text, re.DOTALL)
    if not match:
        return pd.DataFrame(columns=columns)

    rows = []
    for line in match.group(1).splitlines():
        parts = line.split()
        if len(parts) == 8:
            # The peak type was split into two tokens; glue them back together
            parts[2:4] = [parts[2] + parts[3]]
        if len(parts) != 7:
            continue  # blank line or something that is not a peak row

        number, ret_time, peak_type, width, area, height, area_pct = parts
        rows.append({
            "#": int(number),
            "RetTime [min]": float(ret_time),
            "Type": peak_type,
            "Width [min]": float(width),
            "Area [pA*s]": float(area),  # float() accepts both "e" and "E" exponents
            "Height [pA]": float(height),
            "Area %": float(area_pct),
        })

    # Passing columns explicitly keeps the schema stable when rows is empty
    return pd.DataFrame(rows, columns=columns)

- `pdf_unmatch` lists all data PDF files in which no "Seq. Line" entry was found (e.g. a file in the wrong data folder or with the wrong file name)
- `seq_unmatch` lists all data PDF files whose sequence number does not match their folder (sanity check)
- The number printed by the cell below shows how many folders raised an error while being parsed

In [3]:
pdf_unmatch = []  # PDFs where no "Seq. Line" entry was found on the scanned pages
seq_unmatch = []  # PDFs whose "Seq. Line" number differs from the number in the folder name
errs = []         # (folder name, exception) for every folder that raised while parsing

for path in glob.glob(os.path.join(data_dir, '*')):
    folder = os.path.basename(path)
    if folder[:2] != 'F-':
        continue

    try:
        # Characters 2-4 of the folder name hold the sequence number
        sequence = int(folder[2:5])

        for pdf in glob.glob(os.path.join(path, '*.PDF')):
            reader = PdfReader(pdf)
            # Zero-based page indices to search; extend this list if the
            # "Seq. Line" entry is on a different page of the report
            possible_pages = [1, 2]

            text = ""
            for p in possible_pages:
                if p < len(reader.pages):
                    t = reader.pages[p].extract_text() or ""
                    if "Seq. Line" in t:
                        text = t
                        break

            match = re.search(r"Seq\. Line\s*:\s*(\d+)", text)
            if not match:
                pdf_unmatch.append(pdf)
            elif sequence != int(match.group(1)):
                seq_unmatch.append(pdf)

            # The table is parsed and written even for unmatched PDFs, so
            # every folder containing a PDF ends up with a data.csv
            df = parse_table(text).set_index("#")
            df.to_csv(os.path.join(path, 'data.csv'))

    except Exception as e:
        # Record the folder name rather than `sequence`: if int() above is
        # what failed, `sequence` is unbound (or stale from a previous folder)
        errs.append((folder, e))

print(pdf_unmatch)
print(seq_unmatch)
print(len(errs))

[]
[]
0


# Sample Peak Area Retention Time Collection

- Create a complete peak area dataframe for each solution
    - each row is a sample injection
    - two columns for each unique peak retention time: 
        1. peak area, labelled by the common retention time (truncated to one decimal place) 
        2. actual retention time value
    - peaks whose truncated retention times are 0.1 apart are merged

## Collect Unique RT

In [4]:
unique_RT = set()

for path in glob.glob(os.path.join(data_dir, '*')):
    if os.path.basename(path)[:2] != 'F-':
        continue

    df_iter = pd.read_csv(os.path.join(path, 'data.csv'))

    # Truncate (not round) each retention time to one decimal place so that
    # peaks from different injections fall into a common bin
    unique_RT.update(int(rt * 10) / 10 for rt in df_iter['RetTime [min]'])

unique_RT = sorted(unique_RT)
print(len(unique_RT))
print(unique_RT)

# Two columns per retention-time bin: the peak area and the untruncated RT
columns = []
for rt in unique_RT:
    columns.append(f"{rt}_Peak_Area")
    columns.append(f"{rt}_actual-RT")

df = pd.DataFrame(columns=columns)
df.insert(0, 'name', None)
df


23
[5.5, 5.6, 5.9, 6.3, 6.4, 6.5, 7.6, 7.7, 8.1, 8.2, 8.8, 8.9, 11.1, 11.2, 13.7, 13.8, 14.1, 16.0, 16.1, 18.7, 19.5, 19.7, 19.8]


Unnamed: 0,name,5.5_Peak_Area,5.5_actual-RT,5.6_Peak_Area,5.6_actual-RT,5.9_Peak_Area,5.9_actual-RT,6.3_Peak_Area,6.3_actual-RT,6.4_Peak_Area,...,16.1_Peak_Area,16.1_actual-RT,18.7_Peak_Area,18.7_actual-RT,19.5_Peak_Area,19.5_actual-RT,19.7_Peak_Area,19.7_actual-RT,19.8_Peak_Area,19.8_actual-RT


# Merge Retention Times that are 0.1 apart
- These are the same peak, split across neighbouring columns only because the retention times are truncated to one decimal place

In [5]:
def _copy_peak(df, rows, target, source):
    """Copy source RT's peak area and actual RT into target RT's columns on the rows flagged in `rows`."""
    for suffix in ("Peak_Area", "actual-RT"):
        df.loc[rows, f"{target}_{suffix}"] = df.loc[rows, f"{source}_{suffix}"]


def merge(df):
    """
    Merge Columns with Retention Time difference of 0.1

    Retention times are read back from the column names. Two neighbouring
    retention times 0.1 apart form a pair; two pairs sharing their middle
    value form a triple (and are then no longer treated as pairs).

    - Pair: the retention time whose ``_Peak_Area`` column has more non-missing
      values is kept (the higher one on a tie). Rows where it is missing take
      the area and actual RT of the other, whose columns are then dropped.
    - Triple: the middle retention time is kept. Rows where it is missing take
      the low one's values if present, otherwise the high one's. The low and
      high columns are then dropped.

    If a row has a value in both the kept and a dropped column, the dropped
    value is discarded.

    The detected pairs and triples are printed.

    NOTE(review): a run of four or more consecutive retention times yields
    groups that share a retention time; a later group can then reference a
    column that an earlier group already dropped (KeyError). This case is
    not handled here.

    Parameters
    ----------
    df: pd.DataFrame
        dataframe of peak area and actual retention times, with columns named
        ``"<rt>_Peak_Area"`` and ``"<rt>_actual-RT"``

    Returns
    -------
    pd.DataFrame
        A new dataframe with merged columns; the input is left untouched.
    """
    # Work on a copy so filling values never writes into the caller's frame
    df = df.copy()

    RTs = sorted({
        float(m)
        for col in df.columns
        for m in re.findall(r'\d+\.\d+', col)
    })

    # detect pairs: neighbours in sorted order that are 0.1 apart
    pairs = [[lo, hi] for lo, hi in zip(RTs, RTs[1:]) if round(hi - lo, 1) == 0.1]

    # detect triples: two consecutive pairs that share their middle value
    triples = []
    i = 0
    while i < len(pairs) - 1:
        a1, a2 = pairs[i]
        b1, b2 = pairs[i + 1]

        if a2 == b1:
            triples.append([a1, a2, b2])
            i += 2
        else:
            i += 1

    # pairs absorbed into a triple must not be collapsed a second time
    pairs_used = set()
    for low, mid, high in triples:
        pairs_used.add((low, mid))
        pairs_used.add((mid, high))
    pairs = [p for p in pairs if tuple(p) not in pairs_used]

    print(pairs)
    print(triples)

    # collapse pairs
    for low, high in pairs:
        low_n = df[f"{low}_Peak_Area"].count()
        high_n = df[f"{high}_Peak_Area"].count()
        keep, discard = (low, high) if low_n > high_n else (high, low)

        _copy_peak(df, df[f"{keep}_Peak_Area"].isna(), keep, discard)
        df = df.drop(columns=[f"{discard}_Peak_Area", f"{discard}_actual-RT"])

    # collapse triples
    for low, mid, high in triples:
        # Decide both masks before writing anything so the low fill cannot
        # influence which rows are taken from high
        mid_missing = df[f"{mid}_Peak_Area"].isna()
        low_present = df[f"{low}_Peak_Area"].notna()
        from_low = mid_missing & low_present
        from_high = mid_missing & ~low_present & df[f"{high}_Peak_Area"].notna()

        _copy_peak(df, from_low, mid, low)
        _copy_peak(df, from_high, mid, high)
        df = df.drop(
            columns=[
                f"{low}_Peak_Area", f"{low}_actual-RT",
                f"{high}_Peak_Area", f"{high}_actual-RT",
            ]
        )
    return df
            

## Collect all samples into total.csv in peak-area/

In [6]:
for path in glob.glob(os.path.join(data_dir, '*')):
    folder = os.path.basename(path)
    if folder[:2] != 'F-':
        continue

    df_iter = pd.read_csv(os.path.join(path, 'data.csv'))

    # Set the name before looping over peaks so that an injection with an
    # empty peak table still gets a row labelled with its solution name
    row_insert = {'name': folder[9:]}
    for rt, area in zip(df_iter['RetTime [min]'], df_iter['Area [pA*s]']):
        # Same one-decimal truncation that was used to build the column names;
        # a later peak in the same bin overwrites an earlier one
        RT_trunc = int(rt * 10) / 10
        row_insert[f"{RT_trunc}_Peak_Area"] = area
        row_insert[f"{RT_trunc}_actual-RT"] = rt

    # One row per injection, indexed by the full folder name
    df.loc[folder] = row_insert

peakdir = os.path.join(top, 'peak-area')
os.makedirs(peakdir, exist_ok=True)
df = merge(df)
df.to_csv(os.path.join(peakdir, 'total.csv'))


[[5.5, 5.6], [7.6, 7.7], [8.1, 8.2], [8.8, 8.9], [11.1, 11.2], [13.7, 13.8], [16.0, 16.1], [19.7, 19.8]]
[[6.3, 6.4, 6.5]]


## Isolate Each Sample

In [7]:
names = ['CL_1.D', 'CL_2.D', 'CL_3.D', 'CL_4.D', 'CL_whiskey_08.D', 'CL_ROH_07.D', 'CL_Lismore.D', 'CL_Clarlyle.D', 'CL_Evan.D'] # change solution names to whatever

# Write one CSV per solution: only that solution's injections, and only the
# retention-time columns in which at least one of them has a value.
for name in names:
    stem = os.path.splitext(name)[0]
    sample_rows = df[df['name'] == name].dropna(axis=1, how='all')
    sample_rows.to_csv(os.path.join(peakdir, f'{stem}.csv'))

# Compute Mean and Standard Deviations

- Create a new DataFrame with mean and standard deviations and confidence intervals
    - Rows are samples
    - Columns are the number of samples, average peak area, standard deviation of the peak area, and average retention time for each RT
- saved in peak_area.csv

In [8]:
peakdir = os.path.join(top, 'peak-area')

df_total = pd.read_csv(os.path.join(peakdir, 'total.csv'))

# Every distinct retention time that appears in a total.csv column header
RTs = sorted({
    float(token)
    for header in df_total.columns
    for token in re.findall(r'\d+\.\d+', header)
})
print(RTs)

# Four summary columns per retention time
columns = []
for rt in RTs:
    columns.extend([
        f"{rt}_n_samples",
        f"{rt}_avg_Area",
        f"{rt}_std_Area",
        f"{rt}_avg_RT",
    ])

df = pd.DataFrame(columns=columns)
df.insert(0, 'n_samples', None)

# Splits a per-solution column header into its retention time and its kind
pattern = r'(\d+\.\d+)_(Peak_Area|actual-RT)'

for name in names:
    csv = os.path.join(peakdir, f'{os.path.splitext(name)[0]}.csv')
    row_id = os.path.splitext(os.path.basename(csv))[0]

    df_iter = pd.read_csv(csv)
    row_insert = {'n_samples': len(df_iter)}  # total injections for this solution

    for col in df_iter.columns:
        hit = re.match(pattern, col)
        if hit is None:
            continue  # index / name columns carry no statistics

        rt = float(hit.group(1))
        values = df_iter[col].dropna().to_numpy()

        if hit.group(2) == 'Peak_Area':
            row_insert[f"{rt}_n_samples"] = len(values)  # injections where this peak was found
            row_insert[f"{rt}_avg_Area"] = np.mean(values)
            # np.std uses ddof=0 by default, i.e. the population standard deviation
            row_insert[f"{rt}_std_Area"] = np.std(values)
        else:
            row_insert[f"{rt}_avg_RT"] = np.mean(values)

    df.loc[row_id] = row_insert

df.to_csv(os.path.join(peakdir, 'peak_area.csv'))
df

[5.6, 5.9, 6.4, 7.6, 8.1, 8.9, 11.1, 13.8, 14.1, 16.0, 18.7, 19.5, 19.7]


Unnamed: 0,n_samples,5.6_n_samples,5.6_avg_Area,5.6_std_Area,5.6_avg_RT,5.9_n_samples,5.9_avg_Area,5.9_std_Area,5.9_avg_RT,6.4_n_samples,...,18.7_std_Area,18.7_avg_RT,19.5_n_samples,19.5_avg_Area,19.5_std_Area,19.5_avg_RT,19.7_n_samples,19.7_avg_Area,19.7_std_Area,19.7_avg_RT
CL_1,10,10.0,2.722588,0.436106,5.6264,,,,,,...,,,,,,,10,257.989241,7.098318,19.792
CL_2,10,10.0,3.468092,0.189681,5.6106,,,,,10.0,...,,,,,,,10,526.351981,8.239846,19.7933
CL_3,10,10.0,2.980861,0.198321,5.6106,,,,,,...,,,,,,,10,4.900047,0.181932,19.8175
CL_4,10,10.0,3.732605,0.21164,5.6058,,,,,10.0,...,,,,,,,10,60.634962,1.624316,19.787
CL_whiskey_08,9,,,,,,,,,,...,,,,,,,9,7.834658,0.126155,19.785111
CL_ROH_07,9,,,,,,,,,9.0,...,0.0,18.785,9.0,34.402772,0.6825,19.573222,9,14.742714,0.234583,19.783667
CL_Lismore,9,,,,,1.0,3.84904,0.0,5.937,8.0,...,,,9.0,37.887647,0.264937,19.576,9,18.654211,0.283424,19.785222
CL_Clarlyle,9,,,,,,,,,5.0,...,,,9.0,11.188203,0.063288,19.583222,9,5.867513,0.161361,19.793111
CL_Evan,3,,,,,,,,,3.0,...,,,3.0,42.507867,0.030428,19.561333,3,13.58524,0.127081,19.773667


## Separate Standards and Samples

In [9]:
standards = ['CL_1', 'CL_2', 'CL_3', 'CL_4']
samples = ['CL_whiskey_08', 'CL_ROH_07', 'CL_Lismore', 'CL_Clarlyle', 'CL_Evan']

# Select each group's rows from the statistics frame and drop the columns in
# which that whole group has no value.
df_standard = df.loc[standards].dropna(axis=1, how='all')
df_samples = df.loc[samples].dropna(axis=1, how='all')

df_standard.to_csv(os.path.join(peakdir,'standards.csv'))
df_samples.to_csv(os.path.join(peakdir, 'samples.csv'))