# Convert input data (Jonathan D. Müller)

Loads the raw data of the branch or soil chambers and splits it into daily files, organised in monthly folders.

In [None]:
import os
import glob
import re
import pandas as pd
import numpy as np

In [None]:
# Initialization parameters

# Root folder of the project; every other path below is derived from it
project_path = './'

# Raw data input folders, one per data source
project_path_laser = f'{project_path}01_rawdata/laser computer/'
project_path_tc = f'{project_path}01_rawdata/thermocouples/'
project_path_irga = f'{project_path}01_rawdata/irga/'
project_path_par = f'{project_path}01_rawdata/PAR/'
project_path_flow = f'{project_path}01_rawdata/flow/'

# Output folder for the daily files
project_path_output = f'{project_path}02_preprocessed_data/'

# Months to process, given as 'year-month' strings, e.g. ['2018-06'] or ['2017-03', '2018-06']
# - An empty list means that all available data is processed
month_list = ['2022-01']

# Minimum IRGA pump flow
# - H2O and CO2 values of the IRGA recorded at or below this flow are removed
#   (compared against the 'pump.flow.irga.lpm' column)
min_irga_flow = 4

# Functions

In [None]:
# Main functions
#---------------

# Delete the files of the selected months in a folder.
# - Used to remove 1h and 1min monthly files before running to prevent appending to existing files
# - directory: folder path; it is concatenated with '*' as-is, so it has to end with a path separator
# - The month of a file is taken from fixed positions at the end of its path
#   (characters -10 to -5 are read as YYYYMM)
# - If the global month_list is empty, every entry matched in the folder is removed
def empty_dir(directory):
    remove_everything = not month_list
    for path in glob.glob(directory + '*'):
        month_id = path[-10:-6] + '-' + path[-6:-4]
        if remove_everything or (month_id in month_list):
            print('Remove ' + path)
            os.remove(path)

# Read an Aerodyne laser input file, full file
# - input_fn: path of the file; its first line is skipped
# - Returns a DataFrame with the 9 laser columns plus 'dayid' (YYYYMMDD as a number)
# - Only 'timestamp' is converted; the other columns keep the text read from the file
def read_laser_file(input_fn):
    columns = ['timestamp','OCS.1','CO2.1','CO2.2','H2O.1','CO2.3','CO.1','OCS.2','CO2.4']

    # Split every line after the first one on whitespace characters
    with open(input_fn) as handle:
        next(handle)
        rows = [re.split(r'\s', raw.strip(), 9) for raw in handle]

    # A row with fewer than 9 fields gets one empty field appended
    for row in rows:
        if len(row) < 9:
            row.append('')

    df = pd.DataFrame(rows, columns=columns)

    # The timestamp is stored as seconds since 1904-01-01 (IGOR-time, i.e. Excel 1904);
    # one second is subtracted after the conversion
    seconds = pd.to_numeric(df['timestamp'])
    igor_epoch = pd.Timestamp('1904-01-01')
    df['timestamp'] = pd.to_datetime(seconds, unit='s', origin=igor_epoch) - pd.DateOffset(seconds=1)

    # Day identifier used to group the data by day
    def to_dayid(ts):
        return ts.year*10000 + ts.month*100 + ts.day

    df['dayid'] = df['timestamp'].apply(to_dayid)
    return df

# Read an Aerodyne laser input file, only some initial lines
# - input_fn: path of the file; its first line is skipped
# - lines: maximum number of data lines to read after the skipped first line
# - Returns a DataFrame with the 9 laser columns plus 'dayid' (YYYYMMDD as a number)
# - Raises ValueError if no data line could be read
def read_laser_file_by_lines(input_fn, lines):
    # Local import: itertools is not imported at the top of the file
    from itertools import islice

    columns = ['timestamp','OCS.1','CO2.1','CO2.2','H2O.1','CO2.3','CO.1','OCS.2','CO2.4']

    # Read at most `lines` data lines (fewer if the file is shorter)
    with open(input_fn) as f:
        next(f) # Skip the first line
        data = [re.split(r'\s', line.strip(), 9) for line in islice(f, lines)]
    if not data:
        raise ValueError('No data lines found in ' + str(input_fn))

    # A row with fewer than 9 fields gets one empty field appended
    for row in data:
        if len(row) < 9:
            row.append('')

    # build the data frame
    df = pd.DataFrame(data, columns=columns)
    df['timestamp'] = pd.to_numeric(df['timestamp'])
    # To convert IGOR-time (i.e. Excel 1904); one second is subtracted after the conversion
    df['timestamp'] = pd.to_datetime(df.timestamp, unit='s', origin=pd.Timestamp('1904-01-01')) - pd.DateOffset(seconds=1)
    # Day identifier used to group the data by day
    df['dayid'] = df['timestamp'].apply(lambda x: (x.year*10000 + x.month*100 + x.day))
    return df

# OLD Read an Aerodyne laser input file, full file
# - Parses the space-separated file with pandas.read_csv
# - File line 0 is skipped and the following line is read as a header row whose
#   labels are replaced by the fixed column names below
# - Returns a DataFrame with the 9 laser columns plus 'dayid' (YYYYMMDD as a number)
def read_laser_file_old(input_fn):
    laser_columns = ['timestamp','OCS.1','CO2.1','CO2.2','H2O.1','CO2.3','CO.1','OCS.2','CO2.4']
    df = pd.read_csv(
        input_fn,
        sep=' ',
        skiprows=[0],
        header=0,
        names=laser_columns,
        index_col=False,
    )

    # To convert IGOR-time (i.e. Excel 1904); one second is subtracted after the conversion
    igor_epoch = pd.Timestamp('1904-01-01')
    converted = pd.to_datetime(df.timestamp, unit='s', origin=igor_epoch)
    df['timestamp'] = converted - pd.DateOffset(seconds=1)

    # Day identifier used to group the data by day
    def to_dayid(ts):
        return ts.year*10000 + ts.month*100 + ts.day

    df['dayid'] = df['timestamp'].apply(to_dayid)
    return df

# OLD Read an Aerodyne laser input file, only some initial lines
# - lines: number of data rows to read (passed to pandas.read_csv as nrows)
# - File line 0 is skipped and the following line is read as a header row whose
#   labels are replaced by the fixed column names below
# - Returns a DataFrame with the 9 laser columns plus 'dayid' (YYYYMMDD as a number)
def read_laser_file_by_lines_old(input_fn, lines):
    laser_columns = ['timestamp','OCS.1','CO2.1','CO2.2','H2O.1','CO2.3','CO.1','OCS.2','CO2.4']
    df = pd.read_csv(
        input_fn,
        sep=' ',
        skiprows=[0],
        nrows=lines,
        header=0,
        names=laser_columns,
        index_col=False,
    )

    # To convert IGOR-time (i.e. Excel 1904); one second is subtracted after the conversion
    igor_epoch = pd.Timestamp('1904-01-01')
    converted = pd.to_datetime(df.timestamp, unit='s', origin=igor_epoch)
    df['timestamp'] = converted - pd.DateOffset(seconds=1)

    # Day identifier used to group the data by day
    def to_dayid(ts):
        return ts.year*10000 + ts.month*100 + ts.day

    df['dayid'] = df['timestamp'].apply(to_dayid)
    return df

# Read a CR1000 input file, full file
# - File line 1 is used as the header; lines 0 and 2-5 are skipped
# - If the first column is then not 'TIMESTAMP', the file is read again with
#   line 2 as the header (lines 0, 1 and 3-6 skipped)
# - "NAN" entries are read as missing values
# - Returns a DataFrame with a UTC 'timestamp' column plus 'dayid' (YYYYMMDD as a number)
def read_cr1000_file(input_fn):
    def load(rows_to_skip):
        return pd.read_csv(input_fn, skiprows=rows_to_skip, na_values=["NAN"])

    def to_dayid(ts):
        return ts.year*10000 + ts.month*100 + ts.day

    df = load([0,2,3,4,5])
    if df.columns[0] != 'TIMESTAMP':
        df = load([0,1,3,4,5,6])

    df = df.rename(columns={'TIMESTAMP':'timestamp'})
    # Strict parsing: a timestamp that does not match the format raises an error
    df['timestamp'] = pd.to_datetime(df.timestamp, format='%Y-%m-%d %H:%M:%S', utc=True, errors="raise")
    df['dayid'] = df['timestamp'].apply(to_dayid)
    return df

# Read a CR1000 input file by lines, only some initial lines
# - lines: number of data rows to read (passed to pandas.read_csv as nrows)
# - File line 1 is used as the header; lines 0 and 2-4 are skipped
# - If the first column is then not 'TIMESTAMP', the file is read again with
#   line 2 as the header (lines 0, 1 and 3-5 skipped)
# - NOTE(review): read_cr1000_file skips one more line after the header than this
#   function does - confirm that the difference is intended
# - Returns a DataFrame with a UTC 'timestamp' column plus 'dayid' (YYYYMMDD as a number)
def read_cr1000_file_by_lines(input_fn, lines):
    read_options = {'na_values': ["NAN"], 'nrows': lines}

    df = pd.read_csv(input_fn, skiprows=[0,2,3,4], **read_options)
    header_found = (df.columns[0] == 'TIMESTAMP')
    if not header_found:
        df = pd.read_csv(input_fn, skiprows=[0,1,3,4,5], **read_options)

    df = df.rename(columns={'TIMESTAMP':'timestamp'})
    # Strict parsing: a timestamp that does not match the format raises an error
    parsed = pd.to_datetime(df.timestamp, format='%Y-%m-%d %H:%M:%S', utc=True, errors="raise")
    df['timestamp'] = parsed
    df['dayid'] = df['timestamp'].apply(lambda ts: ts.year*10000 + ts.month*100 + ts.day)
    return df

# Read an IRGA input file (same layout handling as the CR1000 files), full file
# - File line 1 is used as the header; lines 0 and 2-5 are skipped
# - If the first column is then not 'TIMESTAMP', the file is read again with
#   line 2 as the header (lines 0, 1 and 3-6 skipped)
# - "NAN" entries are read as missing values
# - Returns a DataFrame with a UTC 'timestamp' column plus 'dayid' (YYYYMMDD as a number)
def read_irga_file(input_fn):
    primary_skip = [0,2,3,4,5]
    fallback_skip = [0,1,3,4,5,6]

    df = pd.read_csv(input_fn, skiprows=primary_skip, na_values=["NAN"])
    if df.columns[0] != 'TIMESTAMP':
        df = pd.read_csv(input_fn, skiprows=fallback_skip, na_values=["NAN"])

    df = df.rename(columns={'TIMESTAMP':'timestamp'})
    # Strict parsing: a timestamp that does not match the format raises an error
    df['timestamp'] = pd.to_datetime(df.timestamp, format='%Y-%m-%d %H:%M:%S', utc=True, errors="raise")

    # Day identifier used to group the data by day
    def to_dayid(ts):
        return ts.year*10000 + ts.month*100 + ts.day

    df['dayid'] = df['timestamp'].apply(to_dayid)
    return df

# Read an IRGA input file (same layout handling as the CR1000 files), only some initial lines
# - lines: number of data rows to read (passed to pandas.read_csv as nrows)
# - File line 1 is used as the header; lines 0 and 2-4 are skipped
# - If the first column is then not 'TIMESTAMP', the file is read again with
#   line 2 as the header (lines 0, 1 and 3-5 skipped)
# - Returns a DataFrame with a UTC 'timestamp' column plus 'dayid' (YYYYMMDD as a number)
def read_irga_file_by_lines(input_fn, lines):
    primary_skip = [0,2,3,4]
    fallback_skip = [0,1,3,4,5]

    df = pd.read_csv(input_fn, skiprows=primary_skip, na_values=["NAN"], nrows=lines)
    if df.columns[0] != 'TIMESTAMP':
        df = pd.read_csv(input_fn, skiprows=fallback_skip, na_values=["NAN"], nrows=lines)

    df = df.rename(columns={'TIMESTAMP':'timestamp'})
    # Strict parsing: a timestamp that does not match the format raises an error
    df['timestamp'] = pd.to_datetime(df.timestamp, format='%Y-%m-%d %H:%M:%S', utc=True, errors="raise")

    # Day identifier used to group the data by day
    def to_dayid(ts):
        return ts.year*10000 + ts.month*100 + ts.day

    df['dayid'] = df['timestamp'].apply(to_dayid)
    return df

# Reduce an IRGA data frame to the ambient data needed later
# - Renames the logger columns (PAR, temperature, H2O, CO2, pump flow, pressure)
# - Sets H2O and CO2 to NaN where the pump flow is at or below the global min_irga_flow
# - Returns a new DataFrame with only the relevant columns; the input is not modified
def remove_obsolete_irga_data(temp):
    new_names = {
        'RadKipZonen': 'par.ambient.umol_m2_s1',
        'Tc(8)': 'temp.air.ambient.c',
        'H2o_6262_mmol_mol': 'h2o.irga.ambient.mmol_mol',
        'Co2_6262_micmol_mol': 'co2.irga.ambient.umol_mol',
        'AirFlow_Amb': 'pump.flow.irga.lpm',
        'Prees_7000': 'P.irga.kPa',
    }
    temp = temp.copy().rename(columns=new_names)

    # Remove bad data: gas readings taken while the pump flow was too low
    low_flow = temp['pump.flow.irga.lpm'] <= min_irga_flow
    temp.loc[low_flow, 'h2o.irga.ambient.mmol_mol'] = np.nan
    temp.loc[low_flow, 'co2.irga.ambient.umol_mol'] = np.nan

    # Keep only relevant columns (the pump flow itself is not kept)
    relevant = [
        'timestamp',
        'temp.air.ambient.c',
        'par.ambient.umol_m2_s1',
        'h2o.irga.ambient.mmol_mol',
        'co2.irga.ambient.umol_mol',
        'P.irga.kPa',
        'dayid',
    ]
    return temp[relevant]

# Writes one daily output file in full temporal resolution
# - out_df:    data of one day; needs the columns 'timestamp' and 'dayid' (the caller's frame is not modified)
# - date_idx:  day identifier (YYYYMMDD); its first 6 digits give the monthly folder 'YYYY-MM'
# - out_dir:   output folder, concatenated as-is (has to end with a path separator)
# - output_fn: file name prefix; the file is written to <out_dir><YYYY-MM>/<output_fn>_<date_idx>.csv
def write_output_file(out_df, date_idx, out_dir, output_fn):
    # Work on a copy: one row per timestamp (first occurrence kept), in chronological order
    temp_df = out_df.copy()
    temp_df.drop_duplicates(subset = 'timestamp', inplace=True)
    temp_df.sort_values('timestamp', inplace=True)

    # Monthly folder; created if it doesn't exist
    day_label = str(date_idx)
    month_dir = day_label[0:4] + "-" + day_label[4:6]
    target_dir = out_dir + month_dir
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)
    out_fn = target_dir + "/" + output_fn + "_" + day_label + ".csv"

    # The day id is only needed for grouping and is not written
    temp_df.drop('dayid', axis=1, inplace=True)
    # Move timestamp column to the front
    stamps = temp_df.pop('timestamp')
    temp_df.insert(0, stamps.name, stamps, allow_duplicates=True)
    # Remove timezone information
    temp_df['timestamp'] = temp_df['timestamp'].dt.tz_localize(None)

    # Write data
    temp_df.to_csv(out_fn, index=False, encoding='utf-8', date_format='%Y-%m-%d %H:%M:%S')
    
# Creates a daily file for each type
# - project_path: input folder of one data type; has to end with '/' and contain one
#                 sub-folder per month (named 'YYYY-MM') holding the raw files
# - output_path:  base output folder; the data is written to a sub-folder named like the
#                 input folder ('ambient/' for the IRGA data)
# - filetype:     'laser' reads '*.str' files with read_laser_file, everything else is read
#                 with read_cr1000_file; 'irga' data is additionally reduced with
#                 remove_obsolete_irga_data. It is also used as the output file name prefix
# - Only months in the global month_list are processed (all months if it is empty);
#   IRGA months before 2020-07 are always skipped
# - A day that continues in the next file is held back and merged with the first day of
#   that file before it is written
def organise_files(project_path, output_path, filetype):
    # List all files in the monthly sub-folders
    pattern = '*/*.str' if filetype == 'laser' else '*/*'
    fn_list = sorted(glob.glob(project_path + pattern, recursive=True))
    last_index = len(fn_list) - 1
    saved = []

    # Create output path name
    project_path_output = output_path + project_path.split('/')[-2] + '/'

    # For all files in the directory
    for fn_i, fn in enumerate(fn_list):
        # Only run data in the month list
        current_month = fn.replace(project_path[:-1], "")[1:8]
        if not ((current_month in month_list) or (len(month_list) == 0)):
            continue
        # 'and' short-circuits, so the month is only converted to a number for IRGA files
        if (filetype == 'irga') and (int(current_month.replace('-','')) < 202007):
            continue

        # Status message: progress and '<month folder>/<file name>'
        # os.path is used so that this works with both '/' and '\' path separators
        progress = '{:<07}'.format(str(round(fn_i * 100 / len(fn_list), 4)))
        print(progress + "%\t\t" + os.path.basename(os.path.dirname(fn)) + '/' + os.path.basename(fn))

        # Load the current file
        if filetype == 'laser':
            df = read_laser_file(fn)
        else:
            df = read_cr1000_file(fn)
            if filetype == 'irga':
                df = remove_obsolete_irga_data(df)
                project_path_output = output_path + 'ambient/'

        # Look at the first row of the next file to find out on which day it starts
        # NOTE(review): the next file is inspected even if its month is filtered out above;
        # a day held back for such a file is never written - confirm this cannot occur
        if fn_i != last_index:
            if filetype == 'laser':
                df_next = read_laser_file_by_lines(fn_list[fn_i+1], lines=1)
            else:
                df_next = read_cr1000_file_by_lines(fn_list[fn_i+1], lines=1)
            next_day = df_next['dayid'].tolist()[0]
            final_file = False
        else:
            next_day = 0
            final_file = True

        # Group by per-day id
        # The column name is passed as a plain string: with a one-element list, pandas 2.0
        # and later return the group keys as 1-tuples, which would neither compare equal
        # to a day id nor give a valid folder/file name
        grouped = df.groupby('dayid')
        last_group = len(grouped) - 1

        # For each day in this file
        for group_i, (this_day, day) in enumerate(grouped):

            # If we have saved data and the day matches, prepend it to the current day
            if (len(saved) > 0) and (this_day == saved['dayid'].tolist()[0]):
                day = pd.concat([saved, day], axis=0, ignore_index=True)

            # If this is the final group in the file, this is not the final file and the
            # next file starts on the same day: hold the data back instead of writing it
            if (group_i == last_group) and (not final_file) and (this_day == next_day):
                saved = day
                continue

            write_output_file(day, this_day, project_path_output, filetype)

# Convert files to daily output

In [None]:
# Run calculations
#-----------------

# Split the laser files ('*.str') of the selected months into daily files
print('Laser files')
organise_files(project_path_laser, project_path_output, 'laser')

print("Done...")

In [None]:
# Split the PAR files of the selected months into daily files
print('PAR files')
organise_files(project_path_par, project_path_output, 'par')

print("Done...")

In [None]:
# Split the thermocouple files of the selected months into daily files
print('Thermocouple files')
organise_files(project_path_tc, project_path_output, 'tc')

print("Done...")

In [None]:
# Split the flow rate files of the selected months into daily files
print('Flow rate files')
organise_files(project_path_flow, project_path_output, 'flow')

print("Done...")

In [None]:
# IRGA files needed for the ambient data. There is a lot, so it's slow...
# - organise_files writes this data to the 'ambient/' output folder and skips months before 2020-07
print('IRGA files')
organise_files(project_path_irga, project_path_output, 'irga')

print("Done...")

# Missing data replacement

There were gaps in the measurements due to device failure. The data was considered to be stable during these periods, so each missing day was replaced with a copy of a nearby day, as follows:

- FR gap:    from 11th July 2021 to 7th Aug 2021; the data of 10th July 2021 is used for the July days and the data of 9th Aug 2021 for the Aug days  
- Tc gap:    from 13th July 2021 to 23rd July 2021; the data of 11th July 2021 is used for the July days
- PAR gap:   from 11th July 2021 to 7th Aug 2021; the data of 10th July 2021 is used for the July days and the data of 8th Aug 2021 for the Aug days

In [None]:
# Replace bad Tc, July 2021
# - The daily file of 11 July 2021 is used as a template and written once per missing day

# Read input file
input_fn = project_path_output + 'thermocouples/' + '2021-07/tc_20210711.csv'
print(input_fn)
input_df = pd.read_csv(input_fn)
input_df['timestamp'] = pd.to_datetime(input_df.timestamp, format='%Y-%m-%d %H:%M:%S')

# Calculate number of days offset
# - The template is shifted back by (day of its first row - 1) days, i.e. to the 1st of
#   its month, so that adding (i-1) days below moves it to day i
days_offset = str(input_df['timestamp'].dt.day.values[0] - 1) + 'days'
current_month = input_df['timestamp'].dt.month.values[0]
input_df['timestamp'] = input_df['timestamp'] - pd.Timedelta(days_offset)

# July: days 14 to 22 (np.arange excludes the end value)
# NOTE(review): the gap is documented as 13th to 23rd July - confirm that days 13 and 23 need no replacement
for i in np.arange(14, 23):
    out_df = input_df.copy()
    out_df['timestamp'] = out_df['timestamp'] + pd.Timedelta(str(i-1) + 'day')
    # write_output_file drops the 'dayid' column, so it has to exist
    out_df['dayid'] = 2021*10000 + current_month*100 + i
    date_idx = 2021*10000 + current_month*100 + i
    print(date_idx)
    write_output_file(out_df, date_idx, project_path_output + 'thermocouples/', 'tc')
    pass

In [None]:
# Replace bad flow, July 2021
# - The daily file of 10 July 2021 is used as a template and written once per missing day

# Read input file
input_fn = project_path_output + 'flow/' + '2021-07/flow_20210710.csv'
print(input_fn)
input_df = pd.read_csv(input_fn)
input_df['timestamp'] = pd.to_datetime(input_df.timestamp, format='%Y-%m-%d %H:%M:%S')

# Calculate number of days offset
# - The template is shifted back by (day of its first row - 1) days, i.e. to the 1st of
#   its month, so that adding (i-1) days below moves it to day i
days_offset = str(input_df['timestamp'].dt.day.values[0] - 1) + 'days'
current_month = input_df['timestamp'].dt.month.values[0]
input_df['timestamp'] = input_df['timestamp'] - pd.Timedelta(days_offset)

# July: days 11 to 31 (np.arange excludes the end value)
for i in np.arange(11, 32):
    out_df = input_df.copy()
    out_df['timestamp'] = out_df['timestamp'] + pd.Timedelta(str(i-1) + 'day')
    # write_output_file drops the 'dayid' column, so it has to exist
    out_df['dayid'] = 2021*10000 + current_month*100 + i
    date_idx = 2021*10000 + current_month*100 + i
    print(date_idx)
    write_output_file(out_df, date_idx, project_path_output + 'flow/', 'flow')
    pass

# Replace bad flow, Aug 2021
# - The daily file of 9 Aug 2021 is used as a template and written once per missing day

# Read input file
input_fn = project_path_output + 'flow/' + '2021-08/flow_20210809.csv'
print(input_fn)
input_df = pd.read_csv(input_fn)
input_df['timestamp'] = pd.to_datetime(input_df.timestamp, format='%Y-%m-%d %H:%M:%S')

# Calculate number of days offset
# - The template is shifted back by (day of its first row - 1) days, i.e. to the 1st of
#   its month, so that adding (i-1) days below moves it to day i
days_offset = str(input_df['timestamp'].dt.day.values[0] - 1) + 'days'
current_month = input_df['timestamp'].dt.month.values[0]
input_df['timestamp'] = input_df['timestamp'] - pd.Timedelta(days_offset)

# August: days 1 to 7 (np.arange excludes the end value)
for i in np.arange(1, 8):
    out_df = input_df.copy()
    out_df['timestamp'] = out_df['timestamp'] + pd.Timedelta(str(i-1) + 'day')
    # write_output_file drops the 'dayid' column, so it has to exist
    out_df['dayid'] = 2021*10000 + current_month*100 + i
    date_idx = 2021*10000 + current_month*100 + i
    print(date_idx)
    write_output_file(out_df, date_idx, project_path_output + 'flow/', 'flow')
    pass

In [None]:
# Replace bad PAR, July 2021
# - The daily file of 10 July 2021 is used as a template and written once per missing day

# Read input file
input_fn = project_path_output + 'PAR/' + '2021-07/par_20210710.csv'
print(input_fn)
input_df = pd.read_csv(input_fn)
input_df['timestamp'] = pd.to_datetime(input_df.timestamp, format='%Y-%m-%d %H:%M:%S')

# Calculate number of days offset
# - The template is shifted back by (day of its first row - 1) days, i.e. to the 1st of
#   its month, so that adding (i-1) days below moves it to day i
days_offset = str(input_df['timestamp'].dt.day.values[0] - 1) + 'days'
current_month = input_df['timestamp'].dt.month.values[0]
input_df['timestamp'] = input_df['timestamp'] - pd.Timedelta(days_offset)

# July: days 12 to 31 (np.arange excludes the end value)
# NOTE(review): the gap is documented as starting on 11th July (and the flow data is replaced
# from day 11) - confirm that day 11 needs no replacement here
for i in np.arange(12, 32):
    out_df = input_df.copy()
    out_df['timestamp'] = out_df['timestamp'] + pd.Timedelta(str(i-1) + 'day')
    # write_output_file drops the 'dayid' column, so it has to exist
    out_df['dayid'] = 2021*10000 + current_month*100 + i
    date_idx = 2021*10000 + current_month*100 + i
    print(date_idx)
    write_output_file(out_df, date_idx, project_path_output + 'PAR/', 'par')
    pass

# Replace bad PAR, Aug 2021
# - The daily file of 8 Aug 2021 is used as a template and written once per missing day

# Read input file
input_fn = project_path_output + 'PAR/' + '2021-08/par_20210808.csv'
print(input_fn)
input_df = pd.read_csv(input_fn)
input_df['timestamp'] = pd.to_datetime(input_df.timestamp, format='%Y-%m-%d %H:%M:%S')

# Calculate number of days offset
# - The template is shifted back by (day of its first row - 1) days, i.e. to the 1st of
#   its month, so that adding (i-1) days below moves it to day i
days_offset = str(input_df['timestamp'].dt.day.values[0] - 1) + 'days'
current_month = input_df['timestamp'].dt.month.values[0]
input_df['timestamp'] = input_df['timestamp'] - pd.Timedelta(days_offset)

# August: days 1 to 7 (np.arange excludes the end value)
for i in np.arange(1, 8):
    out_df = input_df.copy()
    out_df['timestamp'] = out_df['timestamp'] + pd.Timedelta(str(i-1) + 'day')
    # write_output_file drops the 'dayid' column, so it has to exist
    out_df['dayid'] = 2021*10000 + current_month*100 + i
    date_idx = 2021*10000 + current_month*100 + i
    print(date_idx)
    #display(out_df)
    write_output_file(out_df, date_idx, project_path_output + 'PAR/', 'par')
    pass