In [1]:
import pandas as pd
import numpy as np
import numba
import glob
import os
import re
from functools import partial

import dask
import dask.dataframe as dd
from dask.distributed import Client, progress
#client = Client(n_workers=2, threads_per_worker=2, memory_limit='4GB')
#client

## Script to clean up data inconsistencies in the MATERHORN ES raw data so it can be used by UTESpac or other tools (e.g. Python)

In [2]:
# Root directories for the raw input files and the cleaned output.
# The defaults are the original author's machine; override them with the
# MATERHORN_ES_RAW / MATERHORN_ES_OUT environment variables elsewhere.
in_path = os.environ.get("MATERHORN_ES_RAW", "C:/Users/moo90/Box/data/materhorn/raw_data/ES")

out_path = os.environ.get("MATERHORN_ES_OUT", "C:/Users/moo90/Box/data/materhorn/ES")


# UofU : var_hgt (w/ decimals)
# Split on underscore, index by first and last
# first -> use rename dict
# last -> convert to float, then centimeters, then centimeters to int

# UND : varhgt_?? (w/o decimals)
# Split on underscore, use first, then split by numerical vs. alphabetic
# alpha -> use rename dict
# num -> convert to centimeters, then centimeters to int


def find_time_lims(fpath, dt_str='%Y%m%d%H%M%S'):
    """
    Parse the start and end timestamps encoded at the end of a materhorn file name.

    The file name (without directory and extension) is split on underscores and
    the last two tokens are parsed as dates, e.g.
    ``UND_ES3_2590_20Hz_FluxTower_20120924184437_20120925162426.txt``.

    Args:
        fpath (str) : full file name or path to dataset
        dt_str (str) : strptime format of the dates embedded in the file name

    Returns:
        list of pd.Timestamp : [start, end] of the timeseries in the file
    """
    stem = os.path.splitext(os.path.basename(fpath))[0]
    date_tokens = stem.split('_')[-2:]

    return list(map(lambda token: pd.to_datetime(token, format=dt_str), date_tokens))

def asdf(file_list, rename_func, in_path, parq_path, csv_path, station_pairs=None, station=None):
    """
    Work-in-progress helper: index input files by start/end date and create the raw output directories.

    NOTE(review): this is an unfinished stub -- ``rename_func``, ``in_path`` and ``station_pairs``
    are accepted but not used yet, and nothing is written apart from the output directories.

    Args:
        file_list (list of str) : input file paths whose names end in ``_<start>_<end>``
        rename_func (callable) : column renaming function (currently unused)
        in_path (str) : input data directory (currently unused)
        parq_path (str) : root directory for parquet output; ``<parq_path>/raw/<station>`` is created
        csv_path (str) : root directory for csv output; ``<csv_path>/raw/<station>`` is created
        station_pairs (dict, optional) : currently unused
        station (str, optional) : station name for the output sub-directories. When omitted, the
            module-level ``station`` variable is used (the previous, implicit behaviour).

    Raises:
        ValueError : if no station is passed and no module-level ``station`` exists
    """
    # Previously the function silently read the notebook-global ``station``; keep that as a fallback
    if station is None:
        station = globals().get('station')
    if station is None:
        raise ValueError("asdf: a station name is required (pass station=...)")

    # Parse start/end dates for each file (also fails early on badly named files); not used further yet
    file_dict = {k: {'start_end_dates': find_time_lims(k)} for k in file_list}

    # Make sure output directories are there
    os.makedirs(os.path.join(parq_path, "raw", station), exist_ok=True)
    os.makedirs(os.path.join(csv_path, "raw", station), exist_ok=True)

In [4]:
# ES1 - UofU (easiest) - consistent heights in both seasons

def rename_es1(col_str):
    """
    Map a raw ES1 (UofU) column name ``<variable>_<height>`` to the output naming convention.

    The variable part is renamed (e.g. ``T_Sonic`` -> ``T_sonic``, ``diagnostic`` -> ``diag_sonic``)
    and the height suffix is kept verbatim. Columns whose variable part is not a known sonic
    variable (e.g. a record counter) are returned unchanged instead of raising ``KeyError``.

    Args:
        col_str (str) : raw column name

    Returns:
        str : renamed column name
    """
    var_rename = {'Ux' : 'Ux',
                  'Uy' : 'Uy',
                  'Uz' : 'Uz',
                  'T_Sonic' : 'T_sonic',
                  'diagnostic' : 'diag_sonic'
                  }

    # Split on the LAST underscore so multi-part names like "T_Sonic_2" keep their variable part intact
    (vname, _, hgt) = col_str.rpartition('_')

    # Pass through anything that is not a known sonic variable
    if vname not in var_rename:
        return col_str

    return f"{var_rename[vname]}_{hgt}"

station = 'ES1'

# First find files and start/end dates of each file
file_list = sorted(glob.glob(os.path.join(in_path, station, f"DPG-UoU_{station}_*_20Hz_*.txt")))

file_dict = {k:{'start_end_dates':find_time_lims(k)} for k in file_list}
n_files = len(file_dict)

# Dates already written out; their rows are dropped from the running dataframe
date_list = []

# Make sure output directories are there
parq_dir = os.path.join(out_path, station, "raw_20hz", "parq")
csv_dir = os.path.join(out_path, station, "raw_20hz", "csv")
os.makedirs(parq_dir, exist_ok=True)
os.makedirs(csv_dir, exist_ok=True)

for i, f in enumerate(file_dict):

    # Read in file (row 1 holds the column names; rows 0, 2 and 3 are skipped)
    in_df = pd.read_csv(f, skiprows=[0,2,3], header=0, index_col=0, parse_dates=[0], na_values=["NAN"])

    # Drop duplicates and shift back one 20 Hz step (50 ms) so that a record stamped exactly at
    # midnight is grouped with the preceding day; the shift is undone before writing
    in_df = in_df[~in_df.index.duplicated(keep='first')].shift(-1, freq='50ms')

    # Start the running dataframe, or append this file to the data not yet written out
    if i == 0:
        running_df = in_df.copy()
    else:
        running_df = pd.concat([running_df, in_df])

    # Drop timestamps repeated across files (keeping the earliest file's record, since asfreq
    # below requires a unique index) and sort so date selection works on a monotonic index
    running_df = running_df[~running_df.index.duplicated(keep='first')].sort_index()

    # Drop any dates already written out
    # https://stackoverflow.com/questions/37307796/fastest-way-to-eliminate-specific-dates-from-pandas-dataframe
    running_df = running_df.loc[~np.isin(running_df.index.date, pd.to_datetime(date_list).date), :]

    # Dates currently held in the running dataframe
    out_dates = np.unique(running_df.index.date)
    is_last_file = (i == n_files - 1)

    for j, date in enumerate(out_dates):
        # The last date may continue in the next file, so hold it back unless this is the final file
        if (j == len(out_dates) - 1) and not is_last_file:
            continue

        # Select date
        temp_df = running_df.loc[date.strftime('%Y-%m-%d')]

        # Shift back to correct datetimes, sort index
        # Also force to 20hz between first and last value
        temp_df = temp_df.shift(1, freq='50ms').sort_index().asfreq('50ms')

        # Rename columns
        temp_df = temp_df.rename(columns=rename_es1)

        # Write out to parquet and csv files
        file_date = date.strftime('%Y_%m_%d')
        temp_df.to_parquet(os.path.join(parq_dir, f"{station}_EC_20hz_{file_date}.parquet"), engine='pyarrow', index=True)
        temp_df.to_csv(os.path.join(csv_dir, f"{station}_EC_20hz_{file_date}.csv"), float_format='%g')

        date_list.append(date)

In [5]:
# ES2 - UND

def rename_es2(col_str, file_id):
    """
    Map a raw ES2 (UND) column name to the output naming convention.

    UND sonic names look like ``<var><hgt>_<suffix>`` with no decimal point in the
    height (``05`` stands for 0.5 m). Recognised sonic variables at recognised heights
    become ``<Var>_<hgt>``; every other column is lower-cased and tagged with the
    logger file id so that columns from different loggers do not collide.

    Args:
        col_str (str) : raw column name
        file_id (int) : logger/file id appended to unrecognised columns

    Returns:
        str : renamed column name
    """
    sonic_names = dict(u='Ux', v='Uy', w='Uz', ts='T_sonic')

    # Only '05' changes; the other heights are already in metres
    heights = {'05': '0.5',
               **{h: h for h in ('2', '4', '5', '10', '16', '20', '25', '28', '32')}}

    try:
        var_part, hgt_part = re.findall(r"[^\W\d_]+|\d+", col_str.rpartition('_')[0])
        var_out = sonic_names[var_part]
        hgt_out = heights[hgt_part]
    except Exception:
        # Not a "<var><hgt>" sonic column (wrong token count, unknown variable or height)
        return f"{col_str.lower()}_{file_id}"

    return f"{var_out}_{hgt_out}"

# 1407 - Fall 0.5, 4, and 10 m
# 2590 - Spring 16, 20, 25, and 32 m
# 2717 - Fall 16, 20, 25, and 28 m and Spring 0.5, 2, 5, and 10 m
# 2720 - Fall and Spring temperature profiles

# Fall: the two loggers' file lists should line up (zip) one-to-one
# Spring: the file lists are off by one between loggers, so all files for a year are loaded into memory instead
station_pairs = {2012 : [1407, 2717],
                 2013 : [2717, 2590]
                 }

station = 'ES2'

# Create output directories
parq_dir = os.path.join(out_path, station, "raw_20hz", "parq")
csv_dir = os.path.join(out_path, station, "raw_20hz", "csv")
os.makedirs(parq_dir, exist_ok=True)
os.makedirs(csv_dir, exist_ok=True)

for yr, fids in station_pairs.items():

    yr_list = []

    for fid in fids:

        # Get file list for certain file id and year (sorted so duplicate handling is deterministic)
        file_list = sorted(glob.glob(os.path.join(in_path, station, 'raw_20hz', f"UND_{station}_{fid}*_{yr}*.txt")))

        if not file_list:
            print(f"{station}: no files found for logger {fid} in {yr}, skipping")
            continue

        temp_df_list = []

        for f in file_list:

            # Read in data from file, remove any duplicates, shift -1 for date grouping, and rename columns
            temp_df = pd.read_csv(f, skiprows=[0,2,3], header=0, na_values=["NAN"], parse_dates=[0], index_col=[0])
            temp_df = temp_df[~temp_df.index.duplicated(keep='first')].shift(-1, freq='50ms').rename(columns=partial(rename_es2, file_id=fid))

            # Append to running list
            temp_df_list.append(temp_df)

        # Concat all dataframes with the same file id and year, then drop timestamps repeated
        # across files (the column-wise concat below requires a unique index)
        fid_df = pd.concat(temp_df_list)
        yr_list.append(fid_df[~fid_df.index.duplicated(keep='first')])

    if not yr_list:
        continue

    # Concat all dataframes for the year, sort and pad any missing values with nans
    yr_df = pd.concat(yr_list, axis=1)
    yr_list = []
    yr_df = yr_df.sort_index().asfreq('50ms')

    # Get dates to loop through, taken from the un-shifted (original) timestamps
    out_dates = np.unique(yr_df.shift(1, freq='50ms').index.date)

    for date in out_dates:

        # Subset by day and shift back to the original timestamps.
        # If the last record is exactly at midnight, its date only exists after shifting back and has
        # no rows in the shifted index (that record was grouped with the previous day). pandas raises
        # KeyError for a partial-date string outside a monotonic index, so skip that date.
        try:
            daily_df = yr_df.loc[date.strftime('%Y-%m-%d')].shift(1, freq='50ms')
        except KeyError:
            continue

        # Write out to parquet and csv files
        file_date = date.strftime('%Y_%m_%d')
        daily_df.to_parquet(os.path.join(parq_dir, f"{station}_EC_20hz_{file_date}.parquet"), engine='pyarrow', index=True)
        daily_df.to_csv(os.path.join(csv_dir, f"{station}_EC_20hz_{file_date}.csv"), float_format='%g')

    yr_df = None
    

In [6]:
# ES3 - UND

# 2590 - Fall temperature profiles
# 5254 - Spring temperature profiles
# 2667 - Fall and Spring 5, 10, and 20 m 
# 2712 - Fall and Spring 0.5 and 2 m (NOTE: 2m has different naming conventions (yay...))
    # Fall "TIMESTAMP","RECORD","u05_EM","v05_EM","w05_EM","ts05_EM","T2_EM","Ux2_EM","Uy2_EM","Uz2_EM","Ts2_EM","rho_w","kh_mV","PTemp","VWC","Period"
    # Spring "TIMESTAMP","RECORD","u05_ES3","v05_ES3","w05_ES3","Ts05_ES3","T2_ES3","ux2_ES3","Uy2_ES3","Uz2_ES3","Ts2_ES3","CO2_ES3","H2O_ES3","CO2abs_ES3","H2Oabs_ES3","P7500_ES3","T7500_ES3","Cool7500_ES3","Diag7500_ES3","PTemp"

# NOTE: this only extracts the EC data, might do temperature data later on

def rename_es3(col_str, file_id):
    """
    Map a raw ES3 (UND) column name to the output naming convention.

    Sonic columns look like ``<var><hgt>_<suffix>`` with no decimal point in the height
    (``05`` stands for 0.5 m). The name is lower-cased first so the inconsistent Fall/Spring
    spellings (``Ts``/``ts``, ``Ux``/``ux``) map to the same output. KH2O and LI-7500 columns
    have no height in their name and are mapped to fixed names ending in ``_2``. Any other
    column is lower-cased and tagged with the logger file id.

    Args:
        col_str (str) : raw column name
        file_id (int) : logger/file id appended to unrecognised columns

    Returns:
        str : renamed column name
    """
    var_rename_hgt = {'u' : 'ux',
                      'v' : 'uy',
                      'w' : 'uz',
                      'ux' : 'ux',
                      'uy' : 'uy',
                      'uz' : 'uz',
                      'ts' : 't_sonic'
                     }

    hgt_rename = {'05':'0.5',
                  '2':'2',
                  '5':'5',
                  '10':'10',
                  '20':'20'
                 }

    var_rename_nohgt = {'rho_w' : 'kh2o_h2o_2',
                        'kh_mV' : 'kh2o_mV_2',
                        'CO2_ES3' : 'li_co2_2',
                        'H2O_ES3' : 'li_h2o_2',
                        'CO2abs_ES3' : 'li_co2_abs_2',
                        'H2Oabs_ES3' : 'li_h2o_abs_2',
                        'P7500_ES3' : 'li_pres_2',
                        'T7500_ES3' : 'li_t_2',
                        'Cool7500_ES3' : 'li_cool_2',
                        'Diag7500_ES3' : 'li_diag_2'
                       }

    # Sonic variables: expect exactly one alphabetic and one numeric token before the last underscore.
    # ValueError -> not exactly two tokens; KeyError -> unknown variable or height.
    try:
        [vname, hgt] = re.findall(r"[^\W\d_]+|\d+", col_str.rpartition('_')[0].lower())

        new_vname = var_rename_hgt[vname]
        new_hgt = hgt_rename[hgt]

        return f"{new_vname}_{new_hgt}"

    except (ValueError, KeyError):
        pass

    # KH2O and LI-7500 variables
    if col_str in var_rename_nohgt:
        return var_rename_nohgt[col_str]

    # Any others: keep them, but tag with the logger id so loggers do not collide
    return f"{col_str.lower()}_{file_id}"


# Force lowercase for Ts/ts comparison


# UND_ES3_2590_20Hz_FluxTower_20120924184437_20120925162426.txt

station_pairs = {2012: [2712, 2667],
                 2013: [2712, 2667]
                 }

station = 'ES3'

# Create output directories
parq_dir = os.path.join(out_path, station, "raw_20hz", "parq")
csv_dir = os.path.join(out_path, station, "raw_20hz", "csv")
os.makedirs(parq_dir, exist_ok=True)
os.makedirs(csv_dir, exist_ok=True)

# Follow same processing steps as ES2 (because again, the file lists are not the same length)
for yr, fids in station_pairs.items():

    yr_list = []

    for fid in fids:

        # Get file list for certain file id and year (sorted so duplicate handling is deterministic)
        file_list = sorted(glob.glob(os.path.join(in_path, station, 'raw_20hz', f"UND_{station}_{fid}*_{yr}*.txt")))

        if not file_list:
            print(f"{station}: no files found for logger {fid} in {yr}, skipping")
            continue

        temp_df_list = []

        for f in file_list:

            # Read in data from file, remove any duplicates, shift -1 for date grouping, and rename columns
            temp_df = pd.read_csv(f, skiprows=[0,2,3], header=0, na_values=["NAN"], parse_dates=[0], index_col=[0])
            temp_df = temp_df.loc[~temp_df.index.duplicated(keep='first')].shift(-1, freq='50ms').rename(columns=partial(rename_es3, file_id=fid))

            # Append to running list
            temp_df_list.append(temp_df)

        # Concat all dataframes with same year, drop timestamps repeated across files
        # (the column-wise concat below requires a unique index), and append to year list
        temp_yr_concat = pd.concat(temp_df_list)
        temp_yr_concat = temp_yr_concat.loc[~temp_yr_concat.index.duplicated(keep='first')]
        yr_list.append(temp_yr_concat)

    if not yr_list:
        continue

    # Concat all dataframes for the year, sort and pad any missing values with nans
    yr_df = pd.concat(yr_list, axis=1)
    yr_list = []
    yr_df = yr_df.sort_index().asfreq('50ms')

    # Get dates to loop through, taken from the un-shifted (original) timestamps
    out_dates = np.unique(yr_df.shift(1, freq='50ms').index.date)

    for date in out_dates:

        # Subset by day and shift back to the original timestamps.
        # If the last record is exactly at midnight, its date only exists after shifting back and has
        # no rows in the shifted index (that record was grouped with the previous day). pandas raises
        # KeyError for a partial-date string outside a monotonic index, so skip that date.
        try:
            daily_df = yr_df.loc[date.strftime('%Y-%m-%d')].shift(1, freq='50ms')
        except KeyError:
            continue

        # Write out to parquet and csv files
        file_date = date.strftime('%Y_%m_%d')
        daily_df.to_parquet(os.path.join(parq_dir, f"{station}_EC_20hz_{file_date}.parquet"), engine='pyarrow', index=True)
        daily_df.to_csv(os.path.join(csv_dir, f"{station}_EC_20hz_{file_date}.csv"), float_format='%g')

    yr_df = None

# Clear out variables
yr_df = None
yr_list = None
temp_df_list = None

In [None]:
# ES4 - UofU

# NOTE: tower fell down at one point
# 3063 - Fall 0.5, 2, 5, 10, 20, and 32 m
# 4004 - Spring 0.5, 2, 5, 10, 20, and 32 m (pt 1)
# 5252 - Spring 0.5, 2, 5, 10, 20, and 32 m (pt 2)

def rename_es4(col_str):
    """
    Map a raw ES4 (UofU) column name ``<variable>_<height>`` to the output naming convention.

    The variable part is renamed to lowercase output names and the height (metres, with
    decimals) is converted to integer centimetres, e.g. ``Ux_0.5`` -> ``ux_50``.
    Columns whose variable part is not a known sonic variable are returned unchanged.

    Args:
        col_str (str) : raw column name

    Returns:
        str : renamed column name
    """
    var_rename = {'Ux' : 'ux',
                  'Uy' : 'uy',
                  'Uz' : 'uz',
                  'T_Sonic' : 't_sonic',
                  'diagnostic' : 'diag_sonic'
                  }

    # Split on the LAST underscore so multi-part names like "T_Sonic_2" keep their variable part intact
    (vname, _, hgt) = col_str.rpartition('_')

    if vname not in var_rename:
        return col_str

    # round() rather than int() truncation: e.g. float('0.57') * 100 == 56.99999999999999
    new_hgt = round(float(hgt) * 100)

    return f"{var_rename[vname]}_{new_hgt}"

station = 'ES4'

# File numbers are sequential
file_list = sorted(glob.glob(os.path.join(in_path, station, 'raw_20hz', f"DPG-UoU_{station}_*_20Hz_*.txt")))

file_dict = {k:{'start_end_dates':find_time_lims(k)} for k in file_list}
n_files = len(file_dict)

# Dates already written out; their rows are dropped from the running dataframe
date_list = []

# Make sure output directories are there
parq_dir = os.path.join(out_path, station, "raw_20hz", "parq")
csv_dir = os.path.join(out_path, station, "raw_20hz", "csv")
os.makedirs(parq_dir, exist_ok=True)
os.makedirs(csv_dir, exist_ok=True)

for i, f in enumerate(file_dict):

    # Read in file (row 1 holds the column names; rows 0, 2 and 3 are skipped)
    in_df = pd.read_csv(f, skiprows=[0,2,3], header=0, index_col=0, parse_dates=[0], na_values=["NAN"])

    # Drop duplicates and shift back one 20 Hz step (50 ms) so that a record stamped exactly at
    # midnight is grouped with the preceding day; the shift is undone before writing
    in_df = in_df[~in_df.index.duplicated(keep='first')].shift(-1, freq='50ms')

    # Start the running dataframe, or append this file to the data not yet written out
    if i == 0:
        running_df = in_df.copy()
    else:
        running_df = pd.concat([running_df, in_df])

    # Drop timestamps repeated across files (keeping the earliest file's record, since asfreq
    # below requires a unique index) and sort so date selection works on a monotonic index
    running_df = running_df[~running_df.index.duplicated(keep='first')].sort_index()

    # Drop any dates already written out
    # https://stackoverflow.com/questions/37307796/fastest-way-to-eliminate-specific-dates-from-pandas-dataframe
    running_df = running_df.loc[~np.isin(running_df.index.date, pd.to_datetime(date_list).date), :]

    # Dates currently held in the running dataframe
    out_dates = np.unique(running_df.index.date)
    is_last_file = (i == n_files - 1)

    for j, date in enumerate(out_dates):
        # The last date may continue in the next file, so hold it back unless this is the final file
        if (j == len(out_dates) - 1) and not is_last_file:
            continue

        # Select date
        temp_df = running_df.loc[date.strftime('%Y-%m-%d')]

        # Shift back to correct datetimes, sort index
        # Also force to 20hz between first and last value
        temp_df = temp_df.shift(1, freq='50ms').sort_index().asfreq('50ms')

        # Rename columns
        temp_df = temp_df.rename(columns=rename_es4)

        # Write out to parquet and csv files
        file_date = date.strftime('%Y_%m_%d')
        temp_df.to_parquet(os.path.join(parq_dir, f"{station}_EC_20hz_{file_date}.parquet"), engine='pyarrow', index=True)
        temp_df.to_csv(os.path.join(csv_dir, f"{station}_EC_20hz_{file_date}.csv"), float_format='%g')

        date_list.append(date)

In [4]:
# ES5 - UofU

# Follow same procedure as ES1, but with kh2o and FW 

def rename_es5(col_str):
    """
    Map a raw ES5 (UofU) column name to the output naming convention.

    Sonic and FW columns ``<variable>_<height>`` get their variable renamed and the height
    (metres, with decimals) converted to integer centimetres, e.g. ``FW_0.5`` -> ``fw_50``.
    KH2O columns (no height in the name) map to fixed names ending in ``_2``. Anything else
    is returned unchanged.

    Args:
        col_str (str) : raw column name

    Returns:
        str : renamed column name

    Raises:
        KeyError : for a ``KH2O_*`` column other than ``KH2O_H2O`` / ``KH2O_mV``
    """
    var_hgt_rename = {'Ux' : 'Ux',
                      'Uy' : 'Uy',
                      'Uz' : 'Uz',
                      'T_Sonic' : 't_sonic',
                      'diagnostic' : 'diag_sonic',
                      'FW' : 'fw'
                      }

    var_nohgt_rename = {'KH2O_H2O' : 'kh2o_h2o_2',
                        'KH2O_mV' : 'kh2o_mv_2'
                        }

    # Split on the LAST underscore so multi-part names like "T_Sonic_2" keep their variable part intact
    (vname, _, hgt) = col_str.rpartition('_')

    if vname in var_hgt_rename:

        new_vname = var_hgt_rename[vname]
        # round() rather than int() truncation: e.g. float('0.57') * 100 == 56.99999999999999
        new_hgt = round(float(hgt) * 100)

        return f"{new_vname}_{new_hgt}"

    elif vname=='KH2O':

        return var_nohgt_rename[col_str]

    else:

        return col_str

station = 'ES5'

# First find files and start/end dates of each file
# NOTE: Make sure that 2013_05 is switched to *
file_list = sorted(glob.glob(os.path.join(in_path, station, 'raw_20hz', f"UoU_{station}_*.txt")))

file_dict = {k:{'start_end_dates':find_time_lims(k)} for k in file_list}
n_files = len(file_dict)

# Dates already written out; their rows are dropped from the running dataframe
date_list = []

# Make sure output directories are there
parq_dir = os.path.join(out_path, station, "raw_20hz", "parq")
csv_dir = os.path.join(out_path, station, "raw_20hz", "csv")
os.makedirs(parq_dir, exist_ok=True)
os.makedirs(csv_dir, exist_ok=True)

for i, f in enumerate(file_dict):

    # Read in file (row 1 holds the column names; rows 0, 2 and 3 are skipped)
    in_df = pd.read_csv(f, skiprows=[0,2,3], header=0, index_col=0, parse_dates=[0], na_values=["NAN"])

    # Drop duplicates and shift back one 20 Hz step (50 ms) so that a record stamped exactly at
    # midnight is grouped with the preceding day; the shift is undone before writing
    in_df = in_df[~in_df.index.duplicated(keep='first')].shift(-1, freq='50ms')

    # Start the running dataframe, or append this file to the data not yet written out
    if i == 0:
        running_df = in_df.copy()
    else:
        running_df = pd.concat([running_df, in_df])

    # Drop timestamps repeated across files (keeping the earliest file's record, since asfreq
    # below requires a unique index) and sort so date selection works on a monotonic index
    running_df = running_df[~running_df.index.duplicated(keep='first')].sort_index()

    # Drop any dates already written out
    # https://stackoverflow.com/questions/37307796/fastest-way-to-eliminate-specific-dates-from-pandas-dataframe
    running_df = running_df.loc[~np.isin(running_df.index.date, pd.to_datetime(date_list).date), :]

    # Dates currently held in the running dataframe
    out_dates = np.unique(running_df.index.date)
    is_last_file = (i == n_files - 1)

    for j, date in enumerate(out_dates):
        # The last date may continue in the next file, so hold it back unless this is the final file
        if (j == len(out_dates) - 1) and not is_last_file:
            continue

        # Select date
        temp_df = running_df.loc[date.strftime('%Y-%m-%d')]

        # Shift back to correct datetimes, sort index
        # Also force to 20hz between first and last value
        temp_df = temp_df.shift(1, freq='50ms').sort_index().asfreq('50ms')

        # Rename columns
        temp_df = temp_df.rename(columns=rename_es5)

        # Write out to parquet and csv files
        file_date = date.strftime('%Y_%m_%d')
        temp_df.to_parquet(os.path.join(parq_dir, f"{station}_EC_20hz_{file_date}.parquet"), engine='pyarrow', index=True)
        temp_df.to_csv(os.path.join(csv_dir, f"{station}_EC_20hz_{file_date}.csv"), float_format='%g')

        date_list.append(date)
