In [None]:
import os
import re
import sys

sys.path.append(os.path.join('..'))
sys.path.append(os.path.join('..', 'src'))
sys.path.append(os.path.join('..', 'src', 'libs'))

from dotenv import load_dotenv
import pandas as pd

from mfethuls import parse as pa

### Test current parser implementation

In [None]:
# Run the packaged parser (mfethuls.parse, imported as `pa`) for the 'uv' keyword,
# passing 'LUB038' as the folder/file filter, and display whatever it returns.
kw = 'uv'
df = pa.get_data(pa.path_constructor(kw, 'LUB038'), kw)
# data = pa.get_data(pa.path_constructor('uv', *exps_reps), 'uv')
df

### General functions - parser interface

In [None]:
def path_constructor(instrmnt_kw, *args):
    """Collect the data files of one instrument, grouped by the folder they live in.

    The instrument directory is ``$PATH_TO_DATA/$<INSTRMNT_KW>_FOLDER_NAME``; both
    variables are read from the environment after loading the local ``.env`` file.

    Parameters
    ----------
    instrmnt_kw : str
        Instrument keyword, e.g. ``'uv'``. It is upper-cased to build the name of the
        environment variable that holds the instrument folder name.
    *args : str
        Optional filters. A directory is kept when at least one filter is a substring
        of its full path, so a filter also matches the sub-directories of a matching
        folder. Without any filter every directory under the instrument folder is kept
        (previously an empty dict was returned in that case).

    Returns
    -------
    dict
        Maps the last path component of each kept directory to the sorted list of
        file paths directly inside it. Directories sharing the same name overwrite
        each other (the last one walked wins).

    Raises
    ------
    KeyError
        If a required environment variable is not set, or if the instrument
        directory does not exist.
    """
    # Load .env into environment variables
    load_dotenv()

    env_suffix = f'{instrmnt_kw.upper()}_FOLDER_NAME'
    data_root = os.environ.get('PATH_TO_DATA')
    folder_name = os.environ.get(env_suffix)

    # Fail with a readable message instead of a TypeError inside os.path.join
    missing = [var for var, value in (('PATH_TO_DATA', data_root), (env_suffix, folder_name))
               if value is None]
    if missing:
        raise KeyError(f'environment variable(s) not set: {", ".join(missing)}')

    # Path to folder containing instrument data
    path = os.path.join(data_root, folder_name)
    if not os.path.exists(path):
        raise KeyError(f'path: {path} does not exist')

    # Create dictionary of folders in accordance with the filters and folders present
    dict_paths = {}
    for root, _dirs, files in os.walk(path):
        if not args or any(wanted in root for wanted in args):
            folder = os.path.basename(os.path.normpath(root))
            dict_paths[folder] = [os.path.join(root, f) for f in sorted(files)]

    return dict_paths


# Construct dataframe from different instruments via walk through paths
def get_data(dict_paths, instrmnt_kw):
    """Parse every file listed in ``dict_paths`` with the parser of one instrument.

    Parameters
    ----------
    dict_paths : dict
        Maps a name to a list of file paths, as returned by ``path_constructor``.
    instrmnt_kw : str
        Instrument keyword, case-insensitive: ``'uv'``, ``'ftir'``, ``'tga'``,
        ``'dsc'``, or ``'rheo'`` / ``'rheology'``.

    Returns
    -------
    dict
        Maps each name to the DataFrame accumulated over its files. Names whose
        accumulated frame is empty are left out.

    Raises
    ------
    KeyError
        If ``instrmnt_kw`` is not a supported keyword (raised when the first file
        is about to be parsed).
    """
    instrmnt_kw_lwr = instrmnt_kw.lower()

    def parse(df, path):
        # The parse_* functions are looked up only when called, so the cells that
        # define them may be executed in any order before get_data is used.
        # TODO: develop method to read type of file - user decides on path
        if instrmnt_kw_lwr == 'uv':
            return parse_uvvis(df, path)
        if instrmnt_kw_lwr == 'ftir':
            return parse_ftir(df, path)
        if instrmnt_kw_lwr == 'tga':
            return parse_tga(df, path)
        if instrmnt_kw_lwr == 'dsc':
            return parse_dsc(df, path)
        if instrmnt_kw_lwr in ('rheo', 'rheology'):
            return parse_rheo(df, path)
        raise KeyError(f'The instrument keyword {instrmnt_kw} is not found')

    dict_df = {}
    for name, paths in dict_paths.items():
        df = pd.DataFrame()
        for path in paths:
            # Progress output: one line per file handed to the parser
            print(path)
            df = parse(df, path)

        if not df.empty:
            dict_df[name] = df

    return dict_df

### UV-Vis

In [None]:
# Read in .txt files from UV-Vis Shimadzu
def get_uvvis_df(path):
    """Read one UV-Vis ``.txt`` export into a single-column DataFrame.

    The file is read as tab-separated text with its first and third lines skipped,
    so the second line supplies the column names. The frame is indexed by the
    ``'Wavelength nm.'`` column and the ``'Abs.'`` column is renamed to a label
    taken from the file name: the part after the last underscore, without the
    extension and without leading zeros (``'0'`` if nothing is left).

    Parameters
    ----------
    path : str
        Path of the export file.

    Returns
    -------
    pd.DataFrame
        Float-valued frame indexed by ``'Wavelength nm.'``.
    """
    # splitext removes the extension as a suffix; str.rstrip('.txt') would strip
    # any trailing '.', 't' or 'x' characters and corrupt labels such as '5x'.
    stem = os.path.splitext(os.path.basename(os.path.normpath(path)))[0]
    label = stem.split('_')[-1].lstrip('0') or '0'

    return pd.read_csv(path, skiprows=lambda row: row in (0, 2), sep='\t') \
        .set_index('Wavelength nm.') \
        .rename(columns={'Abs.': label}) \
        .astype(float)

# Concat each read and indicate files not read
def parse_uvvis(df: pd.DataFrame, path: str) -> pd.DataFrame:
    """Add the UV-Vis file at ``path`` to ``df`` as extra column(s).

    Only paths ending in ``.txt`` are read (via ``get_uvvis_df``); the new frame is
    joined column-wise and columns that are entirely NaN are dropped. Any other
    path is reported on stdout and ``df`` is returned unchanged.
    """
    if not path.endswith('.txt'):
        print(f'Not reading: {path}')
        return df

    spectrum = get_uvvis_df(path)
    combined = pd.concat([df, spectrum], axis=1)
    return combined.dropna(how='all', axis=1)

### FTIR

In [None]:
# Needs to be improved on.
def get_ftir_df(path):
    """Read one FTIR ``.csv`` export into a single-column DataFrame.

    The file is read as comma-separated text with its first line skipped, so the
    second line supplies the column names. The frame is indexed by the ``'cm-1'``
    column and the ``'%T'`` column is renamed to a label taken from the file name:
    the part after the last underscore, without the extension and without leading
    zeros (``'0'`` if nothing is left, matching ``get_uvvis_df``).

    Parameters
    ----------
    path : str
        Path of the export file.

    Returns
    -------
    pd.DataFrame
        Float-valued frame indexed by ``'cm-1'``.
    """
    # splitext removes the extension as a suffix; str.rstrip('.csv') would strip
    # any trailing '.', 'c', 's' or 'v' characters and corrupt labels such as '3cs'.
    stem = os.path.splitext(os.path.basename(os.path.normpath(path)))[0]
    label = stem.split('_')[-1].lstrip('0') or '0'

    return pd.read_csv(path, skiprows=lambda row: row == 0, sep=',') \
        .set_index('cm-1') \
        .rename(columns={'%T': label}) \
        .astype(float)

def parse_ftir(df: pd.DataFrame, path: str) -> pd.DataFrame:
    """Join the FTIR file at ``path`` onto ``df`` column-wise.

    Files whose path ends in ``.csv`` are parsed with ``get_ftir_df`` and appended
    as new column(s); columns containing only NaN are removed afterwards. Every
    other path is announced on stdout and skipped, leaving ``df`` as it was.
    """
    readable = path.endswith('.csv')

    if readable:
        frames = [df, get_ftir_df(path)]
        df = pd.concat(frames, axis=1).dropna(how='all', axis=1)
    else:
        print(f'Not reading: {path}')

    return df

### TGA

In [None]:
def get_tga_df(path):
    """Read the numeric table of one TGA text export.

    The table starts after a line containing ``'Index'`` (whose whitespace-separated
    fields become the column names) and stops at a line containing ``'Results'``.
    Rows that are not fully numeric (units line, blank lines, ...) are discarded.

    Parameters
    ----------
    path : str
        Path of the export file.

    Returns
    -------
    pd.DataFrame
        Frame indexed by the ``'Tr'`` column with a single data column named
        ``'Value_<file name without extension>'``; the ``'Index'``, ``'t'`` and
        ``'Ts'`` columns are dropped.

    Raises
    ------
    ValueError
        If the file has no header line containing ``'Index'``.
    """
    cols = None
    rows = []
    with open(path) as f:
        in_table = False
        for line in f:
            if 'Index' in line:
                cols = re.split(r'\s+', line.strip(), maxsplit=5)
                in_table = True
            elif 'Results' in line:
                in_table = False
            elif in_table:
                rows.append(re.split(r'\s+', line.strip(), maxsplit=5))

    if cols is None:
        raise ValueError(f'no header line containing "Index" found in {path}')

    # splitext removes the extension as a suffix; str.rstrip(".txt") would also eat
    # trailing 't', 'x' and '.' characters belonging to the sample name itself.
    stem = os.path.splitext(os.path.basename(os.path.normpath(path)))[0]

    # Non-numeric cells become NaN and dropna() then removes those rows entirely
    return pd.DataFrame(rows, columns=cols).apply(pd.to_numeric, errors='coerce').dropna() \
             .rename(columns={'Value': f'Value_{stem}'}) \
             .set_index('Tr') \
             .drop(columns=['Index', 't', 'Ts'])

def parse_tga(df: pd.DataFrame, path: str) -> pd.DataFrame:
    """Extend ``df`` with the TGA curve stored at ``path``.

    A path ending in ``.txt`` is parsed with ``get_tga_df`` and concatenated along
    the column axis, after which all-NaN columns are dropped. For any other path a
    'Not reading' message is printed and the input frame is handed back as is.
    """
    suffix = path[-4:]
    if suffix != '.txt':
        print(f'Not reading: {path}')
        return df

    return pd.concat([df, get_tga_df(path)], axis=1).dropna(how='all', axis=1)

### DSC

In [None]:
def get_dsc_df(path):
    """Read the numeric table of one DSC text export and label its thermal segments.

    The table starts after a line containing ``'Index'`` (whose whitespace-separated
    fields become the column names) and stops at a line containing ``'Results'``.
    Rows that are not fully numeric are discarded and the ``'Index'``, ``'t'`` and
    ``'Ts'`` columns are dropped.

    Two columns are added:

    * ``'name'``  - the file name without its extension.
    * ``'cycle'`` - a label derived from the row-to-row change of ``'Tr'``
      (presumably a temperature - confirm): ``'Heating'`` when it rises,
      ``'Cooling'`` when it falls, ``'Isothermal'`` otherwise. Rows where the change
      itself jumps by more than 0.1 are marked ``'..._start'`` / ``'..._end'``, and
      heating/cooling rows receive a ``_<n>`` suffix counting the segments seen so far.

    Parameters
    ----------
    path : str
        Path of the export file.

    Returns
    -------
    pd.DataFrame
        The remaining table columns plus ``'name'`` and ``'cycle'``.

    Raises
    ------
    ValueError
        If the file has no header line containing ``'Index'``.
    """
    cols = None
    lines = []
    with open(path) as f:
        take = False
        for line in f:
            if 'Index' in line:
                cols = re.split(r'\s+', line.strip(), maxsplit=5)
                take = True
            elif 'Results' in line:
                take = False
            elif take:
                lines.append(re.split(r'\s+', line.strip(), maxsplit=5))

    if cols is None:
        raise ValueError(f'no header line containing "Index" found in {path}')

    # Non-numeric cells become NaN and dropna() then removes those rows entirely
    df = pd.DataFrame(lines, columns=cols).apply(pd.to_numeric, errors='coerce').dropna() \
           .drop(columns=['Index', 't', 'Ts'])

    # splitext removes the extension as a suffix; str.rstrip(".txt") would also eat
    # trailing 't', 'x' and '.' characters belonging to the sample name itself.
    df['name'] = os.path.splitext(os.path.basename(os.path.normpath(path)))[0]

    # TODO: Make more elegant >:
    # Cut heating, cooling and isothermal cycles - label accordingly
    df['cycle'] = 'Isothermal'
    df['differ'] = df.Tr.diff()        # first difference: sign gives heating/cooling
    df['differ_1'] = df.differ.diff()  # second difference: jumps mark segment boundaries

    df.loc[df.differ > 0, 'cycle'] = 'Heating'
    df.loc[df.differ < 0, 'cycle'] = 'Cooling'
    df.loc[(df.differ_1 < -0.1) & (df.cycle != 'Isothermal'), 'cycle'] = 'Cooling_start'
    df.loc[(df.differ_1 < -0.1) & (df.cycle == 'Isothermal'), 'cycle'] = 'Heating_end'
    df.loc[(df.differ_1 > 0.1) & (df.cycle != 'Isothermal'), 'cycle'] = 'Heating_start'
    df.loc[(df.differ_1 > 0.1) & (df.cycle == 'Isothermal'), 'cycle'] = 'Cooling_end'

    # Number the segments: every '..._end' row closes the current heating/cooling
    # segment, all other heating/cooling rows get the running counter appended.
    heating_cycle_num = 0
    cooling_cycle_num = 0
    for index, row in df.iterrows():
        if row.differ > 0.0:
            if 'Heating_end' not in row.cycle:
                df.loc[index, 'cycle'] = df.loc[index, 'cycle'] + f'_{heating_cycle_num}'
            else:
                heating_cycle_num += 1

        elif row.differ < 0.0:
            if 'Cooling_end' not in row.cycle:
                df.loc[index, 'cycle'] = df.loc[index, 'cycle'] + f'_{cooling_cycle_num}'
            else:
                cooling_cycle_num += 1

        else:
            # differ is 0 or NaN (first row): only the counters can change here
            if 'Heating_end' in row.cycle:
                heating_cycle_num += 1
            elif 'Cooling_end' in row.cycle:
                cooling_cycle_num += 1

    return df.drop(columns=['differ', 'differ_1'])

def parse_dsc(df: pd.DataFrame, path: str) -> pd.DataFrame:
    """Stack the DSC run stored at ``path`` underneath ``df``.

    Paths ending in ``.txt`` are parsed with ``get_dsc_df`` and appended row-wise;
    columns that end up entirely NaN are dropped. Other paths only trigger a
    'Not reading' message and ``df`` is returned untouched.
    """
    if path.endswith('.txt'):
        stacked = pd.concat([df, get_dsc_df(path)], axis=0)
        return stacked.dropna(how='all', axis=1)

    print(f'Not reading: {path}')
    return df

### RHEOMETER

In [None]:
# Development scratch cell: prototype of the rheometer parse on one hard-coded file.
# df = pa.get_data(pa.path_constructor('Rheology', 'LUB080'), 'rheo')
# pa.path_constructor('Rheology', 'LUB080')
# NOTE(review): dev_path ends in '.csv' while read_csv below opens the '.txt' sibling;
# dev_path is only used to derive the 'name' / 'test_type' labels - confirm which file is intended.
dev_path = 'C:\\Users\\BertossL\\Documents\\Rheology\\LUB080\\lub080_uv0.csv'

# with open(dev_path, 'r') as f:
#     contents = f.read()

# Tab-separated read with a two-level header (rows 4 and 6); unparsable lines are skipped,
# all-NaN rows removed, columns sorted and the two bookkeeping columns dropped.
df = pd.read_csv('C:\\Users\\BertossL\\Documents\\Rheology\\LUB080\\lub080_uv0.txt', engine='python',
            encoding='utf-8', on_bad_lines='skip', skip_blank_lines=True, header=[4, 6], sep='\t') \
            .dropna(how='all') \
            .reset_index(drop=True) \
            .sort_index(axis=1) \
            .drop(columns=['Interval data:', 'Point No.'])
# Flatten the two header levels into '<level 0> <level 1>', omitting placeholder 'Unnamed' entries.
df.columns = df.columns.get_level_values(0) + [f' {col}' if 'Unnamed' not in col else f'' for col in df.columns.get_level_values(1)]
# Labels are taken from the file name by splitting on '$' (get_rheo_df below splits on '_').
# NOTE(review): rstrip('.csv') strips a set of characters, not the suffix - verify the resulting labels.
df.loc[:, 'name'] = os.path.basename(os.path.normpath(dev_path)).split('$')[0]
df.loc[:, 'test_type'] = os.path.basename(os.path.normpath(dev_path)).split('$')[-1].rstrip('.csv').strip('0')
df

In [None]:
# Can try and improve on.
def get_rheo_df(path):
    """Read one rheometer export into a flat DataFrame.

    The file is read as tab-separated text with a two-level header taken from rows
    4 and 6; lines that cannot be parsed are skipped. Rows that are entirely NaN
    and the ``'Interval data:'`` / ``'Point No.'`` columns are removed, and the two
    header levels are merged into ``'<level 0> <level 1>'`` (placeholder
    ``'Unnamed'`` entries of the second level are omitted).

    Two label columns are derived from the file name of ``path`` (without
    extension): ``'name'`` is the part before the first underscore and
    ``'test_type'`` the part after the last underscore with leading and trailing
    zeros stripped.

    Parameters
    ----------
    path : str
        Path of the export file.

    Returns
    -------
    pd.DataFrame
        The measurement columns plus ``'name'`` and ``'test_type'``.
    """
    df = pd.read_csv(path, engine='python', encoding='utf-8', on_bad_lines='skip',
                     skip_blank_lines=True, header=[4, 6], sep='\t') \
           .dropna(how='all') \
           .reset_index(drop=True) \
           .sort_index(axis=1) \
           .drop(columns=['Interval data:', 'Point No.'])

    # Rename columns: flatten the two header levels into one string per column
    df.columns = [
        top + (f' {sub}' if 'Unnamed' not in sub else '')
        for top, sub in zip(df.columns.get_level_values(0), df.columns.get_level_values(1))
    ]

    # Labels come from the file actually parsed (`path`), not from a notebook global.
    # splitext removes the extension as a suffix; str.rstrip('.csv') would also eat
    # trailing 'c', 's', 'v' and '.' characters of the label itself.
    stem = os.path.splitext(os.path.basename(os.path.normpath(path)))[0]
    df['name'] = stem.split('_')[0]
    df['test_type'] = stem.split('_')[-1].strip('0')

    return df

def parse_rheo(df: pd.DataFrame, path: str) -> pd.DataFrame:
    """Stack the rheometer export stored at ``path`` underneath ``df``.

    Paths ending in ``.csv`` are parsed with ``get_rheo_df`` and appended row-wise,
    because every parsed frame carries its own ``'name'`` / ``'test_type'`` label
    columns; rows that are entirely NaN are dropped. Other paths are reported on
    stdout and ``df`` is returned unchanged.

    Parameters
    ----------
    df : pd.DataFrame
        Frame accumulated so far (may be empty).
    path : str
        Path of the file to add.

    Returns
    -------
    pd.DataFrame
        ``df`` extended by the rows of the parsed file.
    """
    if path.endswith('.csv'):
        # Use the rheometer parser (the FTIR parser was called here by mistake)
        df = pd.concat([df, get_rheo_df(path)], axis=0).dropna(how='all', axis=0)

    else:
        print(f'Not reading: {path}')

    return df

### Test parser

In [None]:
# Run the notebook-local parser for the 'dsc' keyword (no folder filter) and
# display the frame stored under the 'DSC' key (None if that key is absent).
kw = 'dsc'
df = get_data(path_constructor(kw), kw).get('DSC')
df

In [None]:
print(df.name.unique())

import matplotlib.pyplot as plt
%matplotlib qt

heatings_bkb = df.loc[(df.name == '!$BKBZn_DSC') & (df.cycle.str.contains('Heating')), :]
heatings_bkbmbtt = df.loc[(df.name == '!$BKBMBTTZn_DSC') & (df.cycle.str.contains('Heating')), :]
heatings_bkbmbttex = df.loc[(df.name == '!$LUB026_BKBZn_MBTT_UVex_DSC') & (df.cycle.str.contains('Heating')), :]
heatings_bkbex = df.loc[(df.name == '!$LUB_BKBZn_noMBTT_UVex_DSC') & (df.cycle.str.contains('Heating')), :]


# plt.plot(heatings.Tr, heatings.Value, '.',)
# plt.plot(heatings[(heatings.cycle.str.contains('1'))].Tr, heatings[(heatings.cycle.str.contains('1'))].Value, 'r.')

# plt.plot(heatings_bkbmbtt.Tr, heatings_bkbmbtt.Value, '.', color='steelblue')
plt.plot(heatings_bkbmbtt[(heatings_bkbmbtt.cycle.str.contains('1'))].Tr, heatings_bkbmbtt[(heatings_bkbmbtt.cycle.str.contains('1'))].Value, '.', color='deepskyblue')

# plt.plot(heatings_bkbmbttex.Tr, heatings_bkbmbttex.Value, '.', color='forestgreen')
plt.plot(heatings_bkbmbttex[(heatings_bkbmbttex.cycle.str.contains('1'))].Tr, heatings_bkbmbttex[(heatings_bkbmbttex.cycle.str.contains('1'))].Value, '.', color='limegreen')

# plt.plot(heatings_bkbex.Tr, heatings_bkbex.Value, 'm.')
# plt.plot(heatings_bkbex[(heatings_bkbex.cycle.str.contains('1'))].Tr, heatings_bkbex[(heatings_bkbex.cycle.str.contains('1'))].Value, 'g.')

In [None]:
# Rows of any cooling segment, selected per sample and plotted as Value against Tr
is_cooling = df.cycle.str.contains('Cooling')

coolings = df.loc[(df.name == '!$BKBMBTTZn_DSC') & is_cooling, :]
coolings_1 = df.loc[(df.name == '!$LUB026_BKBZn_MBTT_UVex_DSC') & is_cooling, :]

plt.plot(coolings.Tr, coolings.Value, '.', color='deepskyblue')
plt.plot(coolings_1.Tr, coolings_1.Value, '.', color='limegreen')

In [None]:
# TODO: this cell started with an unfinished `for name` statement (a SyntaxError);
# presumably a loop over df.name.unique() was intended - confirm and complete.
# Until then, show the heating rows of the '!$BKB1_DSC' run only.
df.loc[(df.name == '!$BKB1_DSC') & (df.cycle.str.contains('Heating')), :]

In [None]:
# Parse the TGA files found by path_constructor (no folder filter); get_data returns
# a dict mapping each folder name to its DataFrame.
kw = 'tga'
dict_df = get_data(path_constructor(kw), kw)

In [None]:
# Pick the frame stored under the 'TGA' key (None if that key is absent)
tga = dict_df.get('TGA')

In [None]:
# List the available column names, to choose curves for the plots
tga.columns

In [None]:
# Plot the two selected curves against the frame's index
tga.loc[:, ['Value_LUB026_BKBZN_noMBTT_UVex_TGA', 'Value_LUB_BKB1']].plot()

In [None]:
# Plot the two selected curves against the frame's index
tga.loc[:, ['Value_LUB026_BKBZN_MBTT_UVex_TGA', 'Value_LUB_BKBMBTT']].plot()

In [None]:
# Run the packaged parser (`pa`) with the 'Rheology' keyword, passing 'LUB081' as
# the folder/file filter, and display the result.
# NOTE(review): the cells below use `df` like a DataFrame (df.test_type, df.name) - confirm what pa.get_data returns here.
kw = 'Rheology'
df = pa.get_data(pa.path_constructor(kw, 'LUB081'), kw)
df

In [None]:
# Keep only rows whose test_type is 'freqSweep' (other rows are masked to NaN by
# where() and then dropped), and drop columns that are entirely NaN for those rows.
df.where(df.test_type == 'freqSweep').dropna(how='all', axis=0).dropna(how='all', axis=1)

In [None]:
# Distinct values of the 'name' column
df.name.unique()