# Intermediate Processing
- replace the -9999 sentinel values with NaN
- downsample half-hourly data to hourly (only for sites whose source data is half-hourly)
- merge each site's overlapping dataframes into a single dataframe
- runtime: under 30 minutes

In [1]:
import os
import numpy as np
import pandas as pd

In [3]:
# Value columns carried through the pipeline; any that a source file lacks
# are created as all-NaN columns during processing.
COLS_VA = [
    'TA_F', 'SW_IN_F', 'LW_IN_F', 'VPD_F', 'PA_F', 'P_F', 'WS_F', 'WD', 'RH',
    'USTAR', 'NETRAD', 'PPFD_IN', 'PPFD_DIF', 'PPFD_OUT', 'SW_DIF', 'SW_OUT',
    'LW_OUT', 'CO2_F_MDS', 'G_F_MDS', 'LE_F_MDS', 'H_F_MDS', 'NEE_VUT_REF',
    'RECO_NT_VUT_REF', 'RECO_DT_VUT_REF', 'GPP_NT_VUT_REF', 'GPP_DT_VUT_REF',
]
# Each value column has a companion quality-flag column named '<col>_QC'.
COLS_QC = [name + '_QC' for name in COLS_VA]
COLS_TS = ['TIMESTAMP_START']

# Raw inputs are laid out as <BASE_DIR>/<source>/unzipped/<site>/<files>.
BASE_DIR = 'data/raw'
ameriflux_dir, fluxnet_dir, icos_ww_dir, icos_2023_dir = (
    os.path.join(BASE_DIR, source, 'unzipped')
    for source in ('ameriflux', 'fluxnet', 'icos-ww', 'icos-2023')
)

# Stage-1 output (one CSV per site and source) and stage-2 output (merged subsets).
INTERMEDIATE_DIR = 'data/intermediate/int_1'
OUTPUT_DIR = 'data/intermediate/int_2'

In [3]:
def process_site_dataframe(df, downsample=True):
    """Clean one site's raw table and return it in the common column layout.

    Steps:
    1. Replace the ``-9999.0`` missing-value sentinel with NaN.
    2. Add any of ``COLS_VA`` / ``COLS_QC`` the table lacks, filled with NaN,
       so every site ends up with the same columns.
    3. If ``downsample`` is True, collapse consecutive row pairs into one row.
       Value columns are averaged (pandas ``mean`` skips NaN). Timestamp and
       QC columns are taken from the first row of each pair. ``P_F`` is then
       doubled, so the pair's total is kept instead of its mean. If one row of
       a pair is NaN, this doubles the remaining value.

    Args:
        df: raw site table; must contain ``TIMESTAMP_START``.
            NOTE(review): pairing is purely positional, so this assumes the
            rows are consecutive half-hourly records with no gaps. Confirm
            this for each source.
        downsample: whether to pair up rows as described in step 3. When
            False the data is left at its original resolution and ``P_F`` is
            not touched.

    Returns:
        DataFrame with columns ``COLS_TS + COLS_VA + COLS_QC``. It is indexed
        0..n-1 when downsampled; otherwise it keeps ``df``'s index.
    """
    df = df.replace(-9999.0, np.nan)

    missing = [c for c in COLS_VA + COLS_QC if c not in df.columns]
    if missing:
        # Add all absent columns at once instead of one insert per column,
        # which fragments the frame.
        df = df.assign(**{c: np.nan for c in missing})

    df_ts = df[COLS_TS]
    df_va = df[COLS_VA]
    df_qc = df[COLS_QC]

    if downsample:
        # Average each pair of rows into one hourly row.
        grouping_key = np.arange(len(df_va)) // 2
        df_va = df_va.groupby(grouping_key).mean().reset_index(drop=True)
        df_ts = df_ts.iloc[::2, :].reset_index(drop=True)
        df_qc = df_qc.iloc[::2, :].reset_index(drop=True)
        # Precipitation should be totalled, not averaged: doubling the pair
        # mean restores the pair total. This only applies to averaged data,
        # so data that was not downsampled is left as-is.
        df_va['P_F'] = df_va['P_F'] * 2.0

    return pd.concat([df_ts, df_va, df_qc], axis=1)

In [4]:
# Build a list of (site, file path, source, needs_downsampling) tuples, one per
# site and source. Sites without a usable file are reported and skipped.
data = []


def _find_unique(files, marker):
    """Return the single non-VARINFO file whose name contains *marker*, else None."""
    matches = [f for f in files if marker in f and 'VARINFO' not in f]
    return matches[0] if len(matches) == 1 else None


ameriflux_sites = os.listdir(ameriflux_dir)
for site in ameriflux_sites:
    files = os.listdir(os.path.join(ameriflux_dir, site))
    # Prefer the half-hourly product (downsampled later). Otherwise fall back
    # to the hourly product, which is used as-is.
    hh_file = _find_unique(files, 'FLUXNET_SUBSET_HH')
    if hh_file is not None:
        data.append((site, os.path.join(ameriflux_dir, site, hh_file), 'ameriflux', True))
        continue
    hr_file = _find_unique(files, 'FLUXNET_SUBSET_HR')
    if hr_file is None:
        print(f'No valid file found for {site}')
        continue
    data.append((site, os.path.join(ameriflux_dir, site, hr_file), 'ameriflux', False))

fluxnet_sites = os.listdir(fluxnet_dir)
for site in fluxnet_sites:
    # os.listdir order is arbitrary, so sort to make the choice reproducible.
    # VARINFO files are excluded, as for the other sources.
    files = sorted(f for f in os.listdir(os.path.join(fluxnet_dir, site)) if 'VARINFO' not in f)
    if not files:
        print(f'No valid fluxnet file found for {site}')
        continue
    if len(files) > 1:
        print(f'Multiple fluxnet files found for {site}; using {files[0]}')
    data.append((site, os.path.join(fluxnet_dir, site, files[0]), 'fluxnet', True))

icos_ww_sites = os.listdir(icos_ww_dir)
for site in icos_ww_sites:
    file = _find_unique(os.listdir(os.path.join(icos_ww_dir, site)), 'FLUXNET2015_FULLSET_HH')
    if file is None:
        print(f'No valid icos-ww file found for {site}')
        continue
    data.append((site, os.path.join(icos_ww_dir, site, file), 'icos-ww', True))

icos_2023_sites = os.listdir(icos_2023_dir)
for site in icos_2023_sites:
    file = _find_unique(os.listdir(os.path.join(icos_2023_dir, site)), 'FLUXNET_HH_L2')
    if file is None:
        print(f'No valid icos-2023 file found for {site}')
        continue
    data.append((site, os.path.join(icos_2023_dir, site, file), 'icos-2023', True))

In [None]:
# Clean every discovered file and write it to
# INTERMEDIATE_DIR/<site>/<first timestamp>_<last timestamp>_<source>.csv.
os.makedirs(INTERMEDIATE_DIR, exist_ok=True)

for site, file, source, downsample in data:
    print(f'Processing {file}...')
    site_dir = os.path.join(INTERMEDIATE_DIR, site)
    os.makedirs(site_dir, exist_ok=True)

    processed_df = process_site_dataframe(pd.read_csv(file), downsample=downsample)
    # The file name records the covered time span; Part 2 parses it back out.
    timestamps = processed_df['TIMESTAMP_START']
    outfile = os.path.join(site_dir, f'{timestamps.min()}_{timestamps.max()}_{source}.csv')
    processed_df.to_csv(outfile, index=False)

## Part 2: Combining observations from different datasets

In [6]:
# Source ranking: a lower number means a more preferred source. A site's files
# are sorted by this value before merging, and earlier entries win QC ties.
PRIORITIES = {
    source: rank
    for rank, sources in (
        (1, ('ameriflux', 'icos-2023')),
        (2, ('icos-ww',)),
        (3, ('fluxnet',)),
    )
    for source in sources
}

def add_hour(timestamp):
    """Return the 'YYYYMMDDHHMM' string one hour after *timestamp*.

    ``has_overlap`` applies this to each span's last timestamp. As a result,
    two spans that merely abut are treated as touching and get merged, e.g. on
    hourly data one ending at hour h and the next starting at h+1.

    Args:
        timestamp: a 'YYYYMMDDHHMM' string.

    Raises:
        ValueError: if *timestamp* does not match that format.
    """
    # Calendar arithmetic handles day/month/year roll-over and leap days.
    shifted = pd.to_datetime(timestamp, format='%Y%m%d%H%M') + pd.Timedelta(hours=1)
    return shifted.strftime('%Y%m%d%H%M')


def has_overlap(data_1, data_2):
    """Return True when the time spans of two ``(df, start, end, source)`` tuples meet.

    Each span runs from ``str(start)`` to ``add_hour(str(end))``. Spans that
    share an endpoint count as overlapping. The comparison is on strings,
    which assumes every timestamp has the same number of digits.
    """
    first_start, first_stop = str(data_1[1]), add_hour(str(data_1[2]))
    second_start, second_stop = str(data_2[1]), add_hour(str(data_2[2]))
    return first_start <= second_stop and second_start <= first_stop


def merge_sites(data_1, data_2, value_cols=None):
    """Merge two subsets of one site's data into a single subset.

    Args:
        data_1, data_2: ``(df, start, end, source)`` tuples. Each ``df`` is
            indexed by ``TIMESTAMP_START`` and holds the columns ``c`` and
            ``c_QC`` for every value column ``c``. ``data_1`` wins ties.
        value_cols: value columns to merge; defaults to ``COLS_VA``.

    For each timestamp and column, the value comes from ``data_2`` only when
    ``data_2`` has a non-missing value and either ``data_1`` has none or
    ``data_2``'s QC flag is strictly lower (lower is treated as better). A
    missing flag counts as 5, so it loses to any flag that is present. The
    output ``c_QC`` is the flag of whichever source supplied the value, with
    the 5 placeholder turned back into NaN.

    Returns:
        ``(merged_df, start, end, sources)``: the merged frame sorted by
        timestamp, its first and last timestamps, and the two source labels
        joined by a comma.
    """
    if value_cols is None:
        value_cols = COLS_VA

    merged_df = pd.merge(data_1[0], data_2[0], on='TIMESTAMP_START', how='outer', suffixes=('_df1', '_df2'))

    for col in value_cols:
        c1, c2 = f'{col}_df1', f'{col}_df2'
        qc1, qc2 = f'{col}_QC_df1', f'{col}_QC_df2'

        v1, v2 = merged_df[c1], merged_df[c2]
        # 5 stands in for a missing flag, so it loses to any real flag.
        q1 = merged_df[qc1].fillna(5)
        q2 = merged_df[qc2].fillna(5)

        # Switch to data_2 only when it actually has a value. A better flag on
        # a missing value must not discard data_1's valid value.
        take_2 = v2.notna() & (v1.isna() | (q2 < q1))

        merged_df[col] = v1.mask(take_2, v2)
        qc = q1.mask(take_2, q2)
        merged_df[f'{col}_QC'] = qc.mask(qc == 5)
        merged_df = merged_df.drop(columns=[c1, c2, qc1, qc2])

    # Sort once, after all columns are merged.
    merged_df = merged_df.sort_values(by='TIMESTAMP_START')
    return (merged_df, merged_df.index[0], merged_df.index.max(), f'{data_1[-1]},{data_2[-1]}')


def merge_site_data(site_data):
    """Merge every group of time-overlapping subsets of one site's data.

    Args:
        site_data: list of ``(df, start, end, source)`` tuples, ordered from
            highest to lowest priority.

    Two entries fall in the same group when ``has_overlap`` reports that they
    touch, either directly or through a chain of other entries. Every pair is
    checked, not just neighbours in the list. Each group is folded in priority
    order with ``merge_sites``, which prefers its first argument on QC ties, so
    higher-priority data is always the first argument.

    Returns:
        A list of tuples in the same format, one per group, ordered by each
        group's highest-priority member.
    """
    pending = list(enumerate(site_data))
    merged = []
    while pending:
        # Seed a group with the highest-priority entry not yet grouped, then
        # keep absorbing any pending entry that overlaps a group member.
        group = [pending.pop(0)]
        grew = True
        while grew:
            grew = False
            for pos, (_, candidate) in enumerate(pending):
                if any(has_overlap(member, candidate) for _, member in group):
                    group.append(pending.pop(pos))
                    grew = True
                    break

        group.sort(key=lambda item: item[0])
        combined = group[0][1]
        for _, entry in group[1:]:
            combined = merge_sites(combined, entry)
        merged.append(combined)
    return merged


def process_unmerged_site_data(site, in_dir, out_dir):
    """Merge one site's intermediate CSVs and write the result under *out_dir*.

    Each file in ``in_dir/site`` is expected to be named
    ``{start}_{end}_{source}.csv``. The files are loaded with
    ``TIMESTAMP_START`` as the index and ordered by ``PRIORITIES[source]``.
    When there is more than one, overlapping files are combined with
    ``merge_site_data``. Each resulting subset is written to
    ``out_dir/site/{start}_{end}_{source}/data.csv``.
    """
    print(f'Processing {site}...')
    site_out_dir = os.path.join(out_dir, site)
    os.makedirs(site_out_dir, exist_ok=True)

    site_in_dir = os.path.join(in_dir, site)
    entries = []
    for name in os.listdir(site_in_dir):
        frame = pd.read_csv(os.path.join(site_in_dir, name)).set_index('TIMESTAMP_START')
        # The file name encodes the covered span and the source dataset.
        start, end, source = name.split('.')[0].split('_')
        entries.append((frame, int(start), int(end), source))
    entries.sort(key=lambda entry: PRIORITIES[entry[-1]])

    if len(entries) > 1:
        entries = merge_site_data(entries)

    for frame, start, end, source in entries:
        subset_dir = os.path.join(site_out_dir, f'{start}_{end}_{source}')
        os.makedirs(subset_dir, exist_ok=True)
        frame.to_csv(os.path.join(subset_dir, 'data.csv'))



In [8]:
sites = os.listdir(INTERMEDIATE_DIR)

for site in sites:
    process_unmerged_site_data(site, INTERMEDIATE_DIR, OUTPUT_DIR)

Processing US-HB3...
Processing CA-Gro...
Processing AR-TF1...
Processing DE-Akm...
Processing US-Mo1...
Processing US-NR1...
Processing BE-Vie...
Processing US-UM3...
Processing CG-Tch...
Processing IT-MBo...
Processing AU-Dry...
Processing US-MOz...
Processing US-NC3...
Processing CH-Aws...
Processing FR-FBn...
Processing US-CS1...
Processing AT-Neu...
Processing US-KS2...
Processing US-StJ...
Processing AR-Vir...
Processing SE-Ros...
Processing US-xDL...
Processing IT-Isp...
Processing CA-SF2...
Processing US-GBT...
Processing ZM-Mon...
Processing US-Jo1...
Processing US-CS3...
Processing CA-Ca2...
Processing US-Tw4...
Processing BE-Maa...
Processing DE-SfN...
Processing US-Rwf...
Processing US-UMB...
Processing US-Ro6...
Processing CH-Cha...
Processing CZ-KrP...
Processing DK-Sor...
Processing CH-Oe2...
Processing BR-Sa3...
Processing AU-Wac...
Processing CA-ARF...
Processing DE-Lnf...
Processing CA-Ca1...
Processing CN-Dan...
Processing DE-Lkb...
Processing CH-Dav...
Processing US

In [4]:
# Get new metadata: each raw source directory ships a site_data.csv.
ameriflux_meta_file, fluxnet_meta_file, icos_ww_meta_file, icos_2023_meta_file = (
    os.path.join(BASE_DIR, source, 'site_data.csv')
    for source in ('ameriflux', 'fluxnet', 'icos-ww', 'icos-2023')
)

# The order matters: drop_duplicates('SITE_ID') later keeps the first row per site.
meta_dfs = [
    pd.read_csv(path)
    for path in (ameriflux_meta_file, fluxnet_meta_file, icos_ww_meta_file, icos_2023_meta_file)
]

def convert_time(timestep_string):
    """Turn a 'YYYYMMDDHHMM...' string into 'YYYY_MM_DD', dropping the time of day."""
    year, month, day = timestep_string[:4], timestep_string[4:6], timestep_string[6:8]
    return '_'.join((year, month, day))

meta_df = pd.concat(meta_dfs, axis=0)[['SITE_ID', 'LOCATION_LAT', 'LOCATION_LON', 'LOCATION_ELEV', 'IGBP']].drop_duplicates('SITE_ID')
sites = os.listdir(OUTPUT_DIR)
# Take an explicit copy: a column is added below, and assigning into a
# boolean-filtered frame triggers pandas' chained-assignment warning.
meta_df = meta_df[meta_df['SITE_ID'].isin(sites)].copy()

missing_meta = sorted(set(sites) - set(meta_df['SITE_ID']))
if missing_meta:
    print(f'No metadata found for: {", ".join(missing_meta)}')

# TIME_INFO maps each source label of a site's merged subsets to its
# [start, end] dates, parsed from the '{start}_{end}_{source}' subset directory names.
time_infos = {}
for site in sites:
    info = {}
    for subset in os.listdir(os.path.join(OUTPUT_DIR, site)):
        parts = subset.split('_')
        info[parts[2]] = [convert_time(parts[0]), convert_time(parts[1])]
    time_infos[site] = info

meta_df['TIME_INFO'] = meta_df['SITE_ID'].map(lambda site_id: time_infos[site_id])

meta_df.to_csv('processed_site_meta.csv', index=False)