## Read GaiaFormsXML files

In [27]:
# Imports
import xmltodict as xtd
import pandas as pd
import os

# Custom functions

## Parse gfxml file and get general info

In [28]:
def parse_gfxml(gfxml_path):
    """Read a gfxml file and return its content as a dictionary.

    The file at ``gfxml_path`` is opened as UTF-8 text and its full content
    is converted with ``xmltodict`` (imported in this file as ``xtd``).
    """
    with open(gfxml_path, encoding='utf-8') as handle:
        xml_text = handle.read()
        return xtd.parse(xml_text)

In [29]:
def get_location_id(gfdict):
    """Return the location ID from a parsed gfxml dictionary.

    The value is read from ``gfdict['MarineSampleDescription']['Location']``;
    a missing key raises ``KeyError``.
    """
    description = gfdict['MarineSampleDescription']
    return description['Location']

In [30]:
def get_sample_list(gfdict):
    """Return the samples of a parsed gfxml dictionary as a list.

    The samples are read from
    ``gfdict['MarineSampleDescription']['Pages']['MarineSampleDescriptionPage']``.
    """
    pages = gfdict['MarineSampleDescription']['Pages']
    samples = pages['MarineSampleDescriptionPage']

    # Several samples are parsed into a list of dictionaries, whereas a lone
    # sample comes back as a bare dictionary. Wrapping the lone sample in a
    # list lets callers iterate over both cases the same way.
    return [samples] if isinstance(samples, dict) else samples

## Get HCl tests

In [31]:
# helper that is called once per sample while iterating over the sample list; it collects all HCl tests of that sample

def get_tests(sample):
    """Collect the ID, depth and HCl tests of one sample.

    Parameters:
        sample: mapping for a single sample; must contain ``'SampleNo'``.

    Returns:
        dict with the keys ``'Sample ID'``, ``'Depth'`` (float, or ``None``
        when the depth is missing or cannot be converted) and ``'Tests'``
        (the content of ``sample['HClTests']['MarineHClTest']``, or ``None``
        when the sample has no HCl tests).

    Raises:
        KeyError: if ``'SampleNo'`` is missing.
    """
    sample_id = sample['SampleNo']

    # The depth sometimes holds a "nil" marker instead of a number. Only the
    # errors a failed lookup or conversion can raise are caught, so unrelated
    # problems (e.g. KeyboardInterrupt) are no longer silently swallowed.
    try:
        sample_depth_m = float(sample['Depth'])
    except (KeyError, TypeError, ValueError):
        sample_depth_m = None

    # A missing 'HClTests' entry raises KeyError, an empty one (None) raises
    # TypeError on the second lookup; both mean "no tests".
    # To keep samples without HCl tests as empty rows instead, a placeholder
    # test with all fields set to None could be substituted here (optional).
    try:
        tests = sample['HClTests']['MarineHClTest']
    except (KeyError, TypeError):
        tests = None

    return {
        'Sample ID': sample_id,
        'Depth': sample_depth_m,
        'Tests': tests,
    }

In [32]:
def create_tests_dataframe(location_id, sample_info):
    """Build a dataframe with one row per HCl test of a sample.

    Parameters:
        location_id: value written to the ``'Location'`` column of every row.
        sample_info: dict with the keys ``'Sample ID'``, ``'Depth'`` and
            ``'Tests'`` as returned by ``get_tests``.

    Returns:
        ``None`` if the sample has no tests, otherwise a dataframe with the
        columns ``Location``, ``Sample ID``, ``Depth``, ``Reaction`` and
        ``Residue``. ``Depth`` is the sample depth plus ``TestOffset / 100``
        (the offset is presumably given in centimetres and the depth in
        metres; verify against the data). It is NaN when the sample depth is
        unknown.

    Raises:
        KeyError: if a test lacks ``TestOffset``, ``Reaction`` or ``Residue``.
    """
    tests = sample_info['Tests']
    if tests is None:
        return None

    # Several tests arrive as a list of dictionaries, a single test as a bare
    # dictionary; wrap the latter so both cases build the frame the same way.
    if isinstance(tests, dict):
        tests = [tests]
    df = pd.DataFrame(tests)

    df['Location'] = location_id
    df['Sample ID'] = sample_info['Sample ID']

    # An unknown sample depth (None) cannot take part in arithmetic; use NaN
    # so the tests are kept and their depth is marked as missing.
    sample_depth = sample_info['Depth']
    if sample_depth is None:
        sample_depth = float('nan')
    df['Depth'] = sample_depth + df['TestOffset'].astype(float) / 100

    # Selecting the output columns also discards the unused ones, without
    # failing when an optional column such as 'SubSampleRef' is absent.
    return df[['Location', 'Sample ID', 'Depth', 'Reaction', 'Residue']]

In [33]:
def test_data_from_gfxml(gfxml_path):
    """Return all HCl tests of one gfxml file as a single dataframe.

    Chains the helper functions of this file: the file is parsed, its
    location ID and sample list are extracted, and the HCl tests of every
    sample are turned into a dataframe.

    Parameters:
        gfxml_path: path of the gfxml file to read.

    Returns:
        Dataframe with the tests of all samples, or an empty dataframe when
        no sample contains HCl tests.
    """
    gfdict = parse_gfxml(gfxml_path)
    location_id = get_location_id(gfdict)

    # Collect the per-sample frames and concatenate once at the end; growing
    # a dataframe with concat inside the loop copies all previous rows on
    # every iteration.
    sample_frames = []
    for sample in get_sample_list(gfdict):
        sample_df = create_tests_dataframe(location_id, get_tests(sample))
        # samples without HCl tests yield None and contribute no rows
        if sample_df is not None:
            sample_frames.append(sample_df)

    if not sample_frames:
        return pd.DataFrame()
    return pd.concat(sample_frames)

## Get layer descriptions and filter for contents

In [34]:
# helper that is called once per sample while iterating over the sample list; it collects all layers described in that sample

def get_layers(sample):
    """Collect the ID, depth and layer descriptions of one sample.

    Parameters:
        sample: mapping for a single sample; must contain ``'SampleNo'``.

    Returns:
        dict with the keys ``'Sample ID'``, ``'Depth'`` (float, or ``None``
        when the depth is missing or cannot be converted) and ``'Layers'``
        (the content of ``sample['Layers']``, or ``None`` when the sample
        has no such entry).

    Raises:
        KeyError: if ``'SampleNo'`` is missing.
    """
    sample_id = sample['SampleNo']

    # The depth sometimes holds a "nil" marker instead of a number. Only the
    # errors a failed lookup or conversion can raise are caught, so unrelated
    # problems (e.g. KeyboardInterrupt) are no longer silently swallowed.
    try:
        sample_depth_m = float(sample['Depth'])
    except (KeyError, TypeError, ValueError):
        sample_depth_m = None

    # samples without a 'Layers' entry get layers = None
    try:
        layers = sample['Layers']
    except (KeyError, TypeError):
        layers = None

    return {
        'Sample ID': sample_id,
        'Depth': sample_depth_m,
        'Layers': layers,
    }

In [35]:
def create_layer_dataframe(location_id, layer_info):
    """Build a dataframe with one row per described layer of a sample.

    Parameters:
        location_id: value written to the ``'Location'`` column of every row.
        layer_info: dict with the keys ``'Sample ID'``, ``'Depth'`` and
            ``'Layers'`` as returned by ``get_layers``.

    Returns:
        ``None`` if the sample has no layers or none of its layers has a
        ``Description``; otherwise a dataframe with the columns ``Location``,
        ``Sample ID``, ``DepthFrom``, ``DepthTo`` and ``Description``. The
        depths are the sample depth plus the layer boundary divided by 100
        (boundaries are presumably given in centimetres and the depth in
        metres; verify against the data). They are NaN when the sample depth
        is unknown.

    Raises:
        KeyError: if ``'StandardLayer'`` or a boundary column is missing.
    """
    layers_node = layer_info['Layers']
    if layers_node is None:
        return None

    # Several layers arrive as a list of dictionaries, a single layer as a
    # bare dictionary; wrap the latter so it becomes exactly one row instead
    # of failing or being expanded along its nested values.
    layers = layers_node['StandardLayer']
    if isinstance(layers, dict):
        layers = [layers]
    df = pd.DataFrame(layers)

    # without any layer description there is nothing to report
    if 'Description' not in df.columns:
        return None

    df['Location'] = location_id
    df['Sample ID'] = layer_info['Sample ID']

    # An unknown sample depth (None) cannot take part in arithmetic; use NaN
    # so the layers are kept and their depths are marked as missing.
    sample_depth = layer_info['Depth']
    if sample_depth is None:
        sample_depth = float('nan')
    df['DepthFrom'] = sample_depth + df['UpperBoundary'].astype(float) / 100
    df['DepthTo'] = sample_depth + df['LowerBoundary'].astype(float) / 100

    # Selecting the output columns also discards the unused ones, without
    # failing when an optional column such as 'Properties' is absent.
    return df[['Location', 'Sample ID', 'DepthFrom', 'DepthTo', 'Description']]

In [36]:
# functions to filter for keywords (carbonate content, marl)

def filter_carb_content(df):
    """Keep only the rows whose description mentions carbonate content.

    A row matches when its ``Description`` contains ``'calc'`` or
    ``'carbonate'``, compared case-insensitively. Rows with a missing
    description never match.

    Parameters:
        df: dataframe of layers.

    Returns:
        The filtered dataframe, or ``df`` itself (unchanged) when it has no
        ``Description`` column.
    """
    if 'Description' not in df.columns:
        return df

    descriptions = df['Description'].str.lower()
    # na=False: a missing description would otherwise put NA into the mask,
    # which cannot be used for boolean indexing.
    mentions_calc = descriptions.str.contains('calc', na=False)
    mentions_carbonate = descriptions.str.contains('carbonate', na=False)

    return df[mentions_calc | mentions_carbonate]

def filter_marl(df):
    """Keep only the rows whose description mentions marl.

    A row matches when its ``Description`` contains ``'marl'``, compared
    case-insensitively. Rows with a missing description never match.

    Parameters:
        df: dataframe of layers.

    Returns:
        The filtered dataframe, or ``df`` itself (unchanged) when it has no
        ``Description`` column.
    """
    if 'Description' not in df.columns:
        return df

    # na=False: a missing description would otherwise put NA into the mask,
    # which cannot be used for boolean indexing.
    mentions_marl = df['Description'].str.lower().str.contains('marl', na=False)

    return df[mentions_marl]

In [37]:
def data_from_gfxml(gfxml_path, filter):
    """Return the (optionally filtered) layer descriptions of one gfxml file.

    Chains the helper functions of this file: the file is parsed, its
    location ID and sample list are extracted, and the layers of every
    sample are turned into a dataframe.

    Parameters:
        gfxml_path: path of the gfxml file to read.
        filter: ``'carb'`` keeps layers mentioning carbonate content,
            ``'marl'`` keeps layers mentioning marl; any other value leaves
            the layers unfiltered. (The name shadows the builtin ``filter``
            but is kept because callers pass it by keyword.)

    Returns:
        Dataframe of layers with a fresh 0..n-1 index; empty when no sample
        has described layers or nothing matches the filter.
    """
    gfdict = parse_gfxml(gfxml_path)
    location_id = get_location_id(gfdict)

    # Collect the per-sample frames and concatenate once at the end; growing
    # a dataframe with concat inside the loop copies all previous rows on
    # every iteration.
    layer_frames = []
    for sample in get_sample_list(gfdict):
        layer_df = create_layer_dataframe(location_id, get_layers(sample))
        # samples without described layers yield None and contribute no rows
        if layer_df is not None:
            layer_frames.append(layer_df)

    df = pd.concat(layer_frames) if layer_frames else pd.DataFrame()

    # apply the keyword filter requested by the caller
    if filter == 'carb':
        df = filter_carb_content(df)
    elif filter == 'marl':
        df = filter_marl(df)

    # discard the per-sample row labels in favour of a continuous index
    return df.reset_index(drop=True)

# Export to Excel

In [38]:
# custom function to export dataframes to excel files

def save_excel_if_not_empty(df, path):
    """Write ``df`` to an Excel file unless it is empty and update the tracker.

    The module-level dictionary ``excels`` serves as the tracker: every call
    increments ``'Files checked'``, and every call that writes a file also
    increments ``'Excels created'``. The tracker is only mutated, never
    rebound, so no ``global`` declaration is needed.

    Returns the tracker dictionary.
    """
    has_rows = len(df) > 0

    # the export happens before any counter changes, so a failed write
    # leaves the tracker untouched
    if has_rows:
        df.to_excel(path, index=False)

    excels['Files checked'] += 1
    if has_rows:
        excels['Excels created'] += 1

    return excels

# Iterate over folder with gfxml files

## HCl test data

In [39]:
# Export the HCl tests of every gfxml file in a folder to one Excel file each.

# set directory with gfxml files
directory = r'C:\Users\user\example_path\gfxml'

# create list of gfxml files in directory
file_list = [file for file in os.listdir(directory) if file.endswith('.gfxml')]

# set folder for excel exports (the drive prefix previously read 'CC:', which is not a valid drive)
excel_path = r'C:\Users\user\example_path\excels\hcl'

# set tracker to 0
excels = {'Files checked' : 0, 'Excels created' : 0}

# bind stats up front so the summary below also works when no gfxml file is found
stats = excels

# iterate over all files
for file in file_list:

    # filename = location name + xlsx
    filename = file.split('_MSD')[0] + '.xlsx'

    file_directory = os.path.join(directory, file)

    # call function with all single custom functions
    df = test_data_from_gfxml(file_directory)

    # set path for excel files
    save_path = os.path.join(excel_path, filename)

    # call excel export function (writes nothing if there are no HCl tests)
    stats = save_excel_if_not_empty(df, save_path)

# print statistics
print(stats)

{'Files checked': 495, 'Excels created': 33}


## Layer descriptions with filters applied

In [41]:
# works analogously to the cell above (HCl data), but exports layer
# descriptions filtered for carbonate content

directory = r'C:\Users\user\example_path\gfxml'

# sorted: os.listdir returns names in arbitrary order, but combining files of
# the same location below relies on those files being processed back to back
file_list = sorted(file for file in os.listdir(directory) if file.endswith('.gfxml'))

excel_path = r'C:\Users\user\example_path\excels\carb'

excels = {'Files checked' : 0, 'Excels created' : 0}

# bind stats up front so the summary below also works when no gfxml file is found
stats = excels

# sometimes there are multiple gfxml files for one location
# to combine samples in one excel file:
# save last dataframe and compare next one to it and combine if they are the same location

# first set last_df to a placeholder whose location matches no real location
last_df = pd.DataFrame({'Location' : ''}, index=[0])

for file in file_list:

    filename = file.split('_MSD')[0] + '.xlsx'

    file_directory = os.path.join(directory, file)

    df = data_from_gfxml(file_directory, filter='carb')

    # if df is not empty, compare to last dataframe
    if len(df) > 0:
        if df['Location'].iloc[0] == last_df['Location'].iloc[0]:
            # append current samples to last_df if they are the same location
            df = pd.concat([last_df, df])

        # remember the (possibly combined) dataframe for the next comparison;
        # empty results deliberately leave last_df unchanged
        last_df = df

    save_path = os.path.join(excel_path, filename)

    stats = save_excel_if_not_empty(df, save_path)


print(stats)

{'Files checked': 495, 'Excels created': 311}
