In [1]:
import numpy as np
import pandas as pd
import os, fnmatch
import shutil
import zipfile
import glob
from sklearn.preprocessing import MinMaxScaler
import matplotlib
matplotlib.use('Agg')  # non-GUI backend for script-only plotting
import matplotlib.pyplot as plt
from scipy.signal import savgol_filter
import ruptures as rpt
from scipy.signal import argrelextrema
from scipy.interpolate import splrep, PPoly
from datetime import datetime, timedelta
from concurrent.futures import ProcessPoolExecutor, as_completed
from tqdm import tqdm
import xarray as xr
import warnings

warnings.filterwarnings("ignore")

In [2]:
### setting
dir_flux = "xxx"
fn_fluxlist = "fluxsite_info_metalist.xlsx"
dir_csv_out = "xxx"
dir_savefig = "xxx"

window_size = 13
poly_deg1 = 2
threshold_30 = 0.5 # 0.3 
penalty = 0.5

# columns in FLUXNET, AmeriFlux, ICOS
pick_cols = ["TIMESTAMP_START", "TIMESTAMP_END", "P_F", "P_F_QC", "SW_IN_F", "SW_IN_F_QC",
             "GPP_DT_VUT_REF", "NEE_VUT_REF_QC", "TA_F", "TA_F_QC", "USTAR"]
dic_QC = {"P_F":"P_F_QC", "TA_F":"TA_F_QC", "SW_IN_F":"SW_IN_F_QC", 
          "GPP_DT_VUT_REF":"NEE_VUT_REF_QC"}
dic_QC_flux = {"GPP_DT_VUT_REF":"NEE_VUT_REF_QC"}
var_to_QC = ["P_F", "TA_F", "SW_IN_F", "GPP_DT_VUT_REF"]

# columns in Ozflux
pick_cols_Ozflux = ["TIMESTAMP_START", "TIMESTAMP_END", "P_F", "P_F_QC", "SW_IN_F", "SW_IN_F_QC", 
                    "TA_F", "TA_F_QC", "GPP", "GPP_QC", "USTAR"]
dic_QC_Ozflux = {"P_F":"P_F_QC", "TA_F":"TA_F_QC", "SW_IN_F":"SW_IN_F_QC",
                 "USTAR":"USTAR_QC", "GPP":"GPP_QC"}
dic_QC_Ozflux_flux = {"GPP":"GPP_QC"}
var_to_QC_Ozflux = ["P_F", "TA_F", "SW_IN_F", "USTAR", "GPP"]

# columns in LBA-ECO
pick_cols_LBAECO = ["TIMESTAMP_END", "USTAR", "GPP"]


In [3]:
### filter flux site
df_sitelist = pd.read_excel(dir_flux+fn_fluxlist)
forest_site_mask = (((df_sitelist["IGBP"]=="EBF") | (df_sitelist["IGBP"]=="ENF") | 
                     (df_sitelist["IGBP"]=="DBF") | (df_sitelist["IGBP"]=="DNF") | 
                     (df_sitelist["IGBP"]=="MF")) & 
                    (df_sitelist["year_end"]-df_sitelist["year_start"]+1 >= 3))
df_forest_sitelist = df_sitelist[forest_site_mask].reindex()
total_site = len(df_forest_sitelist["site_ID"])
print("total site = ", total_site)
print(df_forest_sitelist.columns)

total site =  177
Index(['site_ID', 'SITE_NAME', 'LOCATION_LAT', 'LOCATION_LONG',
       'LOCATION_ELEV', 'IGBP', 'year_start', 'year_end', 'measure_h',
       'measure_h_year', 'canopy_h', 'canopy_h_year', 'source', 'Unnamed: 13'],
      dtype='object')


In [4]:
def smooth_savgol(dataframe, column_name):
    x = dataframe[column_name].values
    x_smooth = savgol_filter(x, window_size*2, poly_deg1)
    x_diff = x/x_smooth
    x_diff_cri1 = np.nanmean(x_diff)+np.nanstd(x_diff)
    x_diff_cri2 = np.nanmean(x_diff)-np.nanstd(x_diff)
    x_out = np.where(((x_diff>x_diff_cri1) | (x_diff<x_diff_cri2)), x_smooth, x)
    dataframe[column_name] = x_out
    return dataframe
    


In [5]:
def find_SOS_EOS(dataframe, column_name):
    try:
        SOS_EOS_DOY = np.empty([4])
        SOS_EOS_DOY[:] = np.nan # SOS1, EOS1, SOS2, EOS2
        SOS_EOS_intersect = np.empty([4])
        SOS_EOS_intersect[:] = np.nan # SOS1, EOS1, SOS2, EOS2
        LOS = np.empty([1])
        LOS[:] = np.nan
        
        # detect change points
        x1 = dataframe[column_name].values
        algo = rpt.Pelt(model="l2", min_size=window_size)
        algo.fit(x1)
        x1_result = algo.predict(pen=penalty)
        
        # take avg between change points
        x1_avg = np.empty(np.shape(x1_result))
        x1_avg[:] = np.nan
        x1_bkpt = np.append([0],x1_result)
        for it in range(len(x1_avg)):
            x1_avg[it] = np.nanmean(x1[x1_bkpt[it]:x1_bkpt[it+1]+1])
        x1_lmax = argrelextrema(x1_avg, np.greater) # get the position of local max
        peak_count = np.nansum(np.where((np.array(x1_result)[x1_lmax[0].astype(int)]>=365) & (np.array(x1_result)[x1_lmax[0].astype(int)]<365*2-1), 1, 0))
        x1_lmin = argrelextrema(x1_avg, np.less) # get the position of local min
        trough_count = np.nansum(np.where((np.array(x1_result)[x1_lmin[0].astype(int)]>=365) & (np.array(x1_result)[x1_lmin[0].astype(int)]<365*2-1), 1, 0))
    
        if ((peak_count==1) & (trough_count==1)):
            # find local min/max values and SOS/EOS between local min/max
            x1_lmax_valid = np.array(x1_lmax[0])[((np.array(x1_result)[x1_lmax[0].astype(int)]>=365) & (np.array(x1_result)[x1_lmax[0].astype(int)]<365*2-1))]
            x1_lmax_value1 = np.nanmax(x1[x1_result[x1_lmax_valid[0]-1]:x1_result[x1_lmax_valid[0]]])
            x1_lmax_pos1 = np.nanargmax(x1[x1_result[x1_lmax_valid[0]-1]:x1_result[x1_lmax_valid[0]]])+x1_result[x1_lmax_valid[0]-1]
            
            x1_lmin_valid = np.array(x1_lmin[0])[((np.array(x1_result)[x1_lmin[0].astype(int)]>=365) & (np.array(x1_result)[x1_lmin[0].astype(int)]<365*2-1))]
            x1_lmin_value1 = np.nanmin(x1[x1_result[x1_lmin_valid[0]-1]:x1_result[x1_lmin_valid[0]]])
            x1_lmin_pos1 = np.nanargmin(x1[x1_result[x1_lmin_valid[0]-1]:x1_result[x1_lmin_valid[0]]])+x1_result[x1_lmin_valid[0]-1]
    
            # determine the amplitude and the intersection for SOS and EOS
            if (x1_lmin_valid[0]<x1_lmax_valid[0]):
                SOS_intercect1 = x1_lmin_value1+abs(x1_lmax_value1-x1_lmin_value1)*threshold_30
                EOS_intercect1 = x1_lmax_value1-abs(x1_lmax_value1-x1_lmin_value1)*(1-threshold_30)
                SOS_EOS_intersect[0] = SOS_intercect1
                SOS_EOS_intersect[1] = EOS_intercect1
                SOS_EOS_intersect[2] = np.nan
                SOS_EOS_intersect[3] = np.nan
            
                # find the root for SOS and EOS (should lie between min and max index)
                tck_x = dataframe.index.values[x1_lmin_pos1:x1_lmax_pos1+1]+1
                tck_y = x1[x1_lmin_pos1:x1_lmax_pos1+1]
                tck1 = splrep(tck_x, tck_y-SOS_intercect1, s=0)
                ppoly1 = PPoly.from_spline(tck1)
                ppoly_r1 = ppoly1.roots(extrapolate=False)
                if (len(ppoly_r1)>1):
                    SOS_EOS_DOY[0] = ppoly_r1[-1]
                else:
                    SOS_EOS_DOY[0] = ppoly_r1
                
                tck_x = dataframe.index.values[x1_lmax_pos1:x1_lmin_pos1+365+1]+1
                tck_y = x1[x1_lmax_pos1:x1_lmin_pos1+365+1]
                tck2 = splrep(tck_x, tck_y-EOS_intercect1, s=0)
                ppoly2 = PPoly.from_spline(tck2)
                ppoly_r2 = ppoly2.roots(extrapolate=False)
                if (len(ppoly_r2)>1):
                    SOS_EOS_DOY[1] = ppoly_r2[0]
                else:
                    SOS_EOS_DOY[1] = ppoly_r2
                
                SOS_EOS_DOY[2] = np.nan
                SOS_EOS_DOY[3] = np.nan
                # LOS[0] = SOS_EOS_DOY[1]-SOS_EOS_DOY[0]
                
            else:
                EOS_intercect1 = x1_lmax_value1-abs(x1_lmax_value1-x1_lmin_value1)*(1-threshold_30)
                SOS_intercect1 = x1_lmin_value1+abs(x1_lmax_value1-x1_lmin_value1)*threshold_30
                SOS_EOS_intersect[0] = SOS_intercect1
                SOS_EOS_intersect[1] = EOS_intercect1
                SOS_EOS_intersect[2] = np.nan
                SOS_EOS_intersect[3] = np.nan
            
                # find the root for SOS and EOS (should lie between min and max index)
                tck_x = dataframe.index.values[x1_lmax_pos1:x1_lmin_pos1+1]+1
                tck_y = x1[x1_lmax_pos1:x1_lmin_pos1+1]
                tck1 = splrep(tck_x, tck_y-EOS_intercect1, s=0)
                ppoly1 = PPoly.from_spline(tck1)
                ppoly_r1 = ppoly1.roots(extrapolate=False)
                if (len(ppoly_r1)>1):
                    SOS_EOS_DOY[1] = ppoly_r1[0]
                else:
                    SOS_EOS_DOY[1] = ppoly_r1
            
                tck_x = dataframe.index.values[x1_lmin_pos1:x1_lmax_pos1+365+1]+1
                tck_y = x1[x1_lmin_pos1:x1_lmax_pos1+365+1]
                tck2 = splrep(tck_x, tck_y-SOS_intercect1, s=0)
                ppoly2 = PPoly.from_spline(tck2)
                ppoly_r2 = ppoly2.roots(extrapolate=False)
                if (len(ppoly_r2)>1):
                    SOS_EOS_DOY[0] = ppoly_r2[-1]
                else:
                    SOS_EOS_DOY[0] = ppoly_r2
    
                SOS_EOS_DOY[2] = np.nan
                SOS_EOS_DOY[3] = np.nan
                # LOS[0] = (SOS_EOS_DOY[1]-(SOS_EOS_DOY[0]-365))

        elif ((peak_count==2) & (trough_count==2)):
            # find local min/max values and SOS/EOS between local min/max
            x1_lmax_valid = np.array(x1_lmax[0])[((np.array(x1_result)[x1_lmax[0].astype(int)]>=365) & (np.array(x1_result)[x1_lmax[0].astype(int)]<365*2-1))]
            x1_lmax_value1 = np.nanmax(x1[x1_result[x1_lmax_valid[0]-1]:x1_result[x1_lmax_valid[0]]])
            x1_lmax_pos1 = np.nanargmax(x1[x1_result[x1_lmax_valid[0]-1]:x1_result[x1_lmax_valid[0]]])+x1_result[x1_lmax_valid[0]-1]
            x1_lmax_value2 = np.nanmax(x1[x1_result[x1_lmax_valid[1]-1]:x1_result[x1_lmax_valid[1]]])
            x1_lmax_pos2 = np.nanargmax(x1[x1_result[x1_lmax_valid[1]-1]:x1_result[x1_lmax_valid[1]]])+x1_result[x1_lmax_valid[1]-1]
            
            x1_lmin_valid = np.array(x1_lmin[0])[((np.array(x1_result)[x1_lmin[0].astype(int)]>=365) & (np.array(x1_result)[x1_lmin[0].astype(int)]<365*2-1))]
            x1_lmin_value1 = np.nanmin(x1[x1_result[x1_lmin_valid[0]-1]:x1_result[x1_lmin_valid[0]]])
            x1_lmin_pos1 = np.nanargmin(x1[x1_result[x1_lmin_valid[0]-1]:x1_result[x1_lmin_valid[0]]])+x1_result[x1_lmin_valid[0]-1]
            x1_lmin_value2 = np.nanmin(x1[x1_result[x1_lmin_valid[1]-1]:x1_result[x1_lmin_valid[1]]])
            x1_lmin_pos2 = np.nanargmin(x1[x1_result[x1_lmin_valid[1]-1]:x1_result[x1_lmin_valid[1]]])+x1_result[x1_lmin_valid[1]-1]
            
            # determine the amplitude and the intersection for SOS and EOS
            if (x1_lmin_valid[0]<x1_lmax_valid[0]):
                SOS_intercect1 = x1_lmin_value1+abs(x1_lmax_value1-x1_lmin_value1)*threshold_30
                EOS_intercect1 = x1_lmax_value1-abs(x1_lmax_value1-x1_lmin_value2)*(1-threshold_30)
                SOS_intercect2 = x1_lmin_value2+abs(x1_lmax_value2-x1_lmin_value2)*threshold_30
                EOS_intercect2 = x1_lmax_value2-abs(x1_lmax_value2-x1_lmin_value1)*(1-threshold_30)
                SOS_EOS_intersect[0] = SOS_intercect1
                SOS_EOS_intersect[1] = EOS_intercect1
                SOS_EOS_intersect[2] = SOS_intercect2
                SOS_EOS_intersect[3] = EOS_intercect2
                
                # find the root for SOS and EOS (should lie between min and max index)
                tck_x = dataframe.index.values[x1_lmin_pos1:x1_lmax_pos1+1]+1
                tck_y = x1[x1_lmin_pos1:x1_lmax_pos1+1]
                tck1 = splrep(tck_x, tck_y-SOS_intercect1, s=0)
                ppoly1 = PPoly.from_spline(tck1)
                ppoly_r1 = ppoly1.roots(extrapolate=False)
                if (len(ppoly_r1)>1):
                    SOS_EOS_DOY[0] = ppoly_r1[0]
                else:
                    SOS_EOS_DOY[0] = ppoly_r1
                
                tck_x = dataframe.index.values[x1_lmax_pos1:x1_lmin_pos2+1]+1
                tck_y = x1[x1_lmax_pos1:x1_lmin_pos2+1]
                tck2 = splrep(tck_x, tck_y-EOS_intercect1, s=0)
                ppoly2 = PPoly.from_spline(tck2)
                ppoly_r2 = ppoly2.roots(extrapolate=False)
                if (len(ppoly_r2)>1):
                    SOS_EOS_DOY[1] = ppoly_r2[-1]
                else:
                    SOS_EOS_DOY[1] = ppoly_r2
    
                tck_x = dataframe.index.values[x1_lmin_pos2:x1_lmax_pos2+1]+1
                tck_y = x1[x1_lmin_pos2:x1_lmax_pos2+1]
                tck3 = splrep(tck_x, tck_y-SOS_intercect2, s=0)
                ppoly3 = PPoly.from_spline(tck3)
                ppoly_r3 = ppoly3.roots(extrapolate=False)
                if (len(ppoly_r3)>1):
                    SOS_EOS_DOY[2] = ppoly_r3[0]
                else:
                    SOS_EOS_DOY[2] = ppoly_r3
            
                tck_x = dataframe.index.values[x1_lmax_pos2:x1_lmin_pos1+365+1]+1
                tck_y = x1[x1_lmax_pos2:x1_lmin_pos1+365+1]
                tck4 = splrep(tck_x, tck_y-EOS_intercect2, s=0)
                ppoly4 = PPoly.from_spline(tck4)
                ppoly_r4 = ppoly4.roots(extrapolate=False)
                if (len(ppoly_r4)>1):
                    SOS_EOS_DOY[3] = ppoly_r4[-1]
                else:
                    SOS_EOS_DOY[3] = ppoly_r4
                # LOS[0] = (SOS_EOS_DOY[1]-SOS_EOS_DOY[0])+(SOS_EOS_DOY[3]-SOS_EOS_DOY[2])
                
            else:
                EOS_intercect1 = x1_lmax_value1-abs(x1_lmax_value1-x1_lmin_value1)*(1-threshold_30)
                SOS_intercect1 = x1_lmin_value1+abs(x1_lmax_value2-x1_lmin_value1)*threshold_30
                EOS_intercect2 = x1_lmax_value2-abs(x1_lmax_value2-x1_lmin_value2)*(1-threshold_30)
                SOS_intercect2 = x1_lmin_value2+abs(x1_lmax_value1-x1_lmin_value2)*threshold_30
                SOS_EOS_intersect[0] = SOS_intercect2
                SOS_EOS_intersect[1] = EOS_intercect1
                SOS_EOS_intersect[2] = SOS_intercect1
                SOS_EOS_intersect[3] = EOS_intercect2
    
                # find the root for SOS and EOS (should lie between min and max index)
                tck_x = dataframe.index.values[x1_lmax_pos1:x1_lmin_pos1+1]+1
                tck_y = x1[x1_lmax_pos1:x1_lmin_pos1+1]
                tck1 = splrep(tck_x, tck_y-EOS_intercect1, s=0)
                ppoly1 = PPoly.from_spline(tck1)
                ppoly_r1 = ppoly1.roots(extrapolate=False)
                if (len(ppoly_r1)>1):
                    SOS_EOS_DOY[1] = ppoly_r1[-1]
                else:
                    SOS_EOS_DOY[1] = ppoly_r1
            
                tck_x = dataframe.index.values[x1_lmin_pos1:x1_lmax_pos2+1]+1
                tck_y = x1[x1_lmin_pos1:x1_lmax_pos2+1]
                tck2 = splrep(tck_x, tck_y-SOS_intercect1, s=0)
                ppoly2 = PPoly.from_spline(tck2)
                ppoly_r2 = ppoly2.roots(extrapolate=False)
                if (len(ppoly_r2)>1):
                    SOS_EOS_DOY[2] = ppoly_r2[0]
                else:
                    SOS_EOS_DOY[2] = ppoly_r2
            
                tck_x = dataframe.index.values[x1_lmax_pos2:x1_lmin_pos2+1]+1
                tck_y = x1[x1_lmax_pos2:x1_lmin_pos2+1]
                tck3 = splrep(tck_x, tck_y-EOS_intercect2, s=0)
                ppoly3 = PPoly.from_spline(tck3)
                ppoly_r3 = ppoly3.roots(extrapolate=False)
                if (len(ppoly_r3)>1):
                    SOS_EOS_DOY[3] = ppoly_r3[-1]
                else:
                    SOS_EOS_DOY[3] = ppoly_r3
            
                tck_x = dataframe.index.values[x1_lmin_pos2:x1_lmax_pos1+365+1]+1
                tck_y = x1[x1_lmin_pos2:x1_lmax_pos1+365+1]
                tck4 = splrep(tck_x, tck_y-SOS_intercect2, s=0)
                ppoly4 = PPoly.from_spline(tck4)
                ppoly_r4 = ppoly4.roots(extrapolate=False)
                if (len(ppoly_r4)>1):
                    SOS_EOS_DOY[0] = ppoly_r4[0]
                else:
                    SOS_EOS_DOY[0] = ppoly_r4
                # LOS[0] = (SOS_EOS_DOY[1]-(SOS_EOS_DOY[0]-365))+(SOS_EOS_DOY[3]-SOS_EOS_DOY[2])
        else:
            pass
    
        # return SOS_EOS_DOY, SOS_EOS_intersect, x1_result, LOS
        return SOS_EOS_DOY, SOS_EOS_intersect, x1_result
    
    except Exception as e:
        # Handle the exception
        print(f"An error occurred: {e}")
        

In [11]:
def remove_outlier(df, var):
    df_no_outlier = df.copy()
    df_no_outlier = df_no_outlier.reset_index()
    outlier_mask = np.full(len(df_no_outlier), False)

    df_no_outlier['hour_start'] = df_no_outlier['TIMESTAMP_START'].dt.hour
    
    for idx, row in df_no_outlier.iterrows():
        center_time = row['TIMESTAMP_END']
        window_start = center_time - pd.Timedelta(days=7)
        window_end = center_time + pd.Timedelta(days=7)

        hour_diff = (df_no_outlier['hour_start'] - row['hour_start']) % 24
        same_time_mask = (
            (df_no_outlier['TIMESTAMP_END'] >= window_start) &
            (df_no_outlier['TIMESTAMP_END'] <= window_end) &
            (hour_diff <= 2) &
            (~df_no_outlier[var].isna())
        )

        window_values = df_no_outlier.loc[same_time_mask, var]

            
        if len(window_values) < 5:  # Not enough data to judge
            continue

        if not window_values.empty:
            mean_val = window_values.mean()
            std_val = window_values.std()

            # allow noiser fluxes if the temperature is low (set 5-day mean air temperature lower than 5 degree C as criteria)
            # when 5-day mean air temperature is low, use mean+-5*std to filter the outlier
            # remove precipitation data when the temperature is low (found this problem because wrong and extreme high annual precipitation in US-NGC and US-xWD)
            temp_mask = ((df_no_outlier['TIMESTAMP_END'] >= row['TIMESTAMP_END'] - pd.Timedelta(days=2)) & 
                        (df_no_outlier['TIMESTAMP_END'] <= row['TIMESTAMP_END'] + pd.Timedelta(days=2)))
            if 'TA_low' in df_no_outlier.columns:
                TA_5day_mean = df_no_outlier.loc[temp_mask, 'TA_low'].mean()
            else:
                TA_5day_mean = 10 # Assume above freezing if no TA_freezing data

            if (TA_5day_mean < 5) :
                upper_bound = mean_val + 5*std_val
                lower_bound = mean_val - 5*std_val
                low_TA = 1 # True
            else:
                upper_bound = mean_val + 3*std_val
                lower_bound = mean_val - 3*std_val
                low_TA = 0 # False

        if var in ['GPP_DT_VUT_REF', 'GPP']:
            if ((row[var] > upper_bound) | (row[var] < lower_bound)):
                outlier_mask[idx] = True
        else:
            if (low_TA == 1):
                outlier_mask[idx] = True

    df_no_outlier.loc[outlier_mask, var] = np.nan
    # df_no_outlier.drop(columns=['hour_start'], inplace=True)
    # .drop(columns=['hour_start', 'TIMESTAMP_START'], inplace=True)

    return df_no_outlier

    

In [16]:
def process_and_plot(args):

    site_ID, site_name, site_year_start, site_year_end, site_IGBP, site_src = args
    
    print(f"in process_and_plot {site_ID} and source from {site_src}")
    
    if (site_src == 'ICOS'):
        # site setting
        zip_dir = '/burg/glab/users/rg3390/data/FLUXNET2015/ICOS/warm_winter_2020_release_2022/'
        pwd_dir = os.listdir('/burg/glab/users/rg3390/data/FLUXNET2015/ICOS/warm_winter_2020_release_2022/.')
        pattern = 'FLX_'+site_ID+'_*_FULLSET_*.zip'
        unzip_dir = '/burg/glab/users/rg3390/data/FLUXNET2015/ICOS/warm_winter_2020_release_2022/tmp_unzip/'
        pos_sitestr_start = 5-1
        pos_sitestr_end = 10
        pos_yearstart_start = 32-1
        pos_yearstart_end = 35
        pos_yearend_start = 37-1
        pos_yearend_end = 40

    elif (site_src == 'AmeriFlux'):
        zip_dir = '/burg/glab/users/rg3390/data/FLUXNET2015/AmeriFlux/'
        pwd_dir = os.listdir('/burg/glab/users/rg3390/data/FLUXNET2015/AmeriFlux/.')
        pattern = 'AMF_'+site_ID+'_*_FULLSET_*.zip'
        unzip_dir = '/burg/glab/users/rg3390/data/FLUXNET2015/AmeriFlux/tmp_unzip/'
        pos_sitestr_start = 5-1
        pos_sitestr_end = 10
        pos_yearstart_start = 28-1
        pos_yearstart_end = 31
        pos_yearend_start = 33-1
        pos_yearend_end = 36
    
    elif (site_src == 'FLUXNET'):
        zip_dir = '/burg/glab/users/rg3390/data/FLUXNET2015/FLUXNET2015_zipped/'
        pwd_dir = os.listdir('/burg/glab/users/rg3390/data/FLUXNET2015/FLUXNET2015_zipped/.')
        pattern = 'FLX_'+site_ID+'_*_FULLSET_*.zip'
        unzip_dir = '/burg/glab/users/rg3390/data/FLUXNET2015/FLUXNET2015_zipped/tmp_unzip/'
        pos_sitestr_start = 5-1
        pos_sitestr_end = 10
        pos_yearstart_start = 32-1
        pos_yearstart_end = 35
        pos_yearend_start = 37-1
        pos_yearend_end = 40

    elif (site_src == 'Ozflux'): # Ozflux
        nc_dir = '/burg/glab/users/rg3390/data/FLUXNET2015/OzFlux/'
        pwd_dir = os.listdir('/burg/glab/users/rg3390/data/FLUXNET2015/OzFlux/.')
        pattern = site_name+'_L6.nc'

    elif (site_src == 'LBA-ECO'):
        xlsx_dir = '/burg/glab/users/rg3390/data/FLUXNET2015/LBA-ECO/'
        pattern = site_ID+'_CfluxBF.xlsx'

    else:
        pass

    print(f"choosing directory for {site_ID} and source from {site_src}")

    # processing flux data
    if ((site_src == 'ICOS') | (site_src == 'AmeriFlux') | (site_src == 'FLUXNET')):
        for entry in pwd_dir:
            out_columns = []
            if fnmatch.fnmatch(entry, pattern):
                a1 = entry[pos_sitestr_start:pos_sitestr_end]
                a2 = str(int(entry[pos_yearstart_start:pos_yearstart_end]))
                a3 = str(int(entry[pos_yearend_start:pos_yearend_end]))

                # unzip the zip files
                with zipfile.ZipFile(zip_dir+entry,"r") as zip_ref:
                    zip_ref.extractall(unzip_dir)

                # Import hourly or half-hourly fluxes data
                len_HH = len(glob.glob(unzip_dir + f"*{site_ID}*FULLSET_HH*.csv"))
                print(glob.glob(unzip_dir + f"*{site_ID}*FULLSET_HH*.csv"))
                if  (len_HH == 1):
                    flux_df = pd.read_csv(glob.glob(unzip_dir + f"*{site_ID}*FULLSET_HH*.csv")[0], usecols=lambda x: x in pick_cols) # Use half hourly data
                    day_timestep = 48 # 48 timestep in a day
                    rain_QC_timestep = 24 # rain within 12 hours
                else:
                    flux_df = pd.read_csv(glob.glob(unzip_dir + f"*{site_ID}*FULLSET_HR*.csv")[0], usecols=lambda x: x in pick_cols) # Use hourly data
                    day_timestep = 24 # 24 timestep in a day
                    rain_QC_timestep = 12 # rain within 12 hours
                    
                flux_df["TIMESTAMP_START"] = pd.to_datetime(flux_df["TIMESTAMP_START"], format="%Y%m%d%H%M")
                flux_df["TIMESTAMP_END"] = pd.to_datetime(flux_df["TIMESTAMP_END"], format="%Y%m%d%H%M")
                flux_df["time_year"] = flux_df["TIMESTAMP_START"].dt.year
                print("site = ", a1, ", HH = ", len_HH) # HH=1 is half-hourly data

                # ## convert unit
                # flux_df["PA_F"] = flux_df["PA_F"]*10 # kPa -> hPa
                # # VPD unit already hPa

                # create a dataframe to save good data
                site_data = pd.DataFrame({'TIMESTAMP_START': flux_df['TIMESTAMP_START'], 'TIMESTAMP_END': flux_df['TIMESTAMP_END']})

                ## Quality control 1
                flux_df = flux_df.replace(-9999,np.nan)
                for it, (var, qc) in enumerate(dic_QC.items()):
                    if qc in flux_df.columns:
                        # flux_df = flux_df[flux_df[qc].isin([0, 1])]
                        flux_df.loc[(flux_df[qc] >= 2), var] = np.nan
                print(f"complete {a1} QC 1 (from QC flags)")

                # remove rain in previous 12 hours
                flux_df.loc[len(flux_df)] = {col: np.nan for col in flux_df.columns} # add a nan row because df["var"][a:b] does not count in df["var"][b]
                for pt in range(rain_QC_timestep*1, flux_df.shape[0]-1):
                    if ((np.nansum(flux_df.loc[pt-rain_QC_timestep:pt+1, "P_F"])==0) | (np.isnan(np.nansum(flux_df.loc[pt-rain_QC_timestep:pt+1, "P_F"])))):
                        pass
                    else:
                        flux_df.loc[pt, "USTAR"] = np.nan
                        flux_df.loc[pt, "GPP_DT_VUT_REF"] = np.nan

                ## Quality control 2
                # keep the flux data when u* >= 0.2 and < upper bound (mean+2*std)
                # Get u* upper bound -> for Quality control 2
                USTAR_mean = flux_df["USTAR"].mean(axis=0)
                USTAR_std = flux_df["USTAR"].std(axis=0)
                USTAR_upper = USTAR_mean+2*USTAR_std
                flux_df.loc[((flux_df["USTAR"]<=0.2) | (flux_df["USTAR"]>USTAR_upper)), 'GPP_DT_VUT_REF'] = np.nan
                print(f"complete {a1} QC 2 (exclude too low and too high USTAR for fluxes)")

                ## Quality Control 3 - remove outliers
                site_data['time_year'] = flux_df["TIMESTAMP_START"].dt.year
                site_data['SW_IN_F'] = flux_df["SW_IN_F"]
                for it, (var, qc) in enumerate(dic_QC_flux.items()):
                    # pick variable column and keep good or no qc flags data
                    df_var = flux_df[['TIMESTAMP_START', 'TIMESTAMP_END', 'SW_IN_F', var]].copy()
                    # copy Tair data for filtering freezing conditions
                    df_var['TA_low'] = flux_df['TA_F']
                    df_var['TA_low_QC'] = flux_df['TA_F_QC']
                    if df_var['TA_low'].notna().any():
                        df_var['TA_low'] = df_var['TA_low'].where(df_var['TA_low_QC'].isin([0,1]), np.nan)
                    else:
                        pass
                    df_var.drop('TA_low_QC', axis=1, inplace=True)
    
                    if var in ['GPP_DT_VUT_REF']:
                        print(f"start outlier removing {site_ID} - {var}")
                        df_var_no_outlier = remove_outlier(df_var, var)
                        df_var_no_outlier.drop(columns=['TA_low', 'hour_start'], inplace=True)
                        print(f"finish outlier removing {site_ID} - {var}")
                    else:
                        df_var_no_outlier = df_var.copy()
                        df_var_no_outlier.drop(columns=['TA_low'], inplace=True)
    
                    # merge good data into dataframe
                    site_data = site_data.merge(df_var_no_outlier[['TIMESTAMP_START', 'TIMESTAMP_END', var]], on=['TIMESTAMP_START', 'TIMESTAMP_END'], how='left')
                    del df_var, df_var_no_outlier
                print(f"complete {a1} QC 3 (remove outlier for fluxes)")

                ## select time period: daytime (SW>10 W/m2)
                flux_time_mask = ((site_data["time_year"] >= site_year_start) & 
                        (site_data["time_year"] <= site_year_end) & 
                        (site_data["SW_IN_F"]>=50))
                site_data = site_data[flux_time_mask]

                print(f"complete {a1} selecting site_data")

                ## daily mean
                site_data = site_data.set_index('TIMESTAMP_START')
                site_data_daily_tmp = site_data['GPP_DT_VUT_REF'].resample('D').mean()
                site_data_daily_tmp = site_data_daily_tmp.reset_index()
                site_data_daily_tmp = site_data_daily_tmp.rename(columns={"TIMESTAMP_START": "date"})
                site_data_daily = pd.DataFrame({'date': pd.date_range(start=f"{int(site_year_start)}-01-01", end=f"{int(site_year_end)}-12-31", freq='D')})
                site_data_daily = pd.merge(site_data_daily, site_data_daily_tmp, on='date', how='left')
                

                ## interpolation
                site_data_daily["GPP_DT_VUT_REF"] = site_data_daily["GPP_DT_VUT_REF"].interpolate(method='linear')
                site_data_daily["GPP_DT_VUT_REF"] = site_data_daily["GPP_DT_VUT_REF"].rolling(window_size, min_periods=5, center=True).apply(lambda x : np.nanmedian(x))
                site_data_daily.loc[np.isnan(site_data_daily["GPP_DT_VUT_REF"]), "GPP_DT_VUT_REF"] = 0
                print(f"complete {a1} interpolation and rolling mean")

                ### --------------- get SOS, EOS every year ---------------
                site_data_daily["DOY"] = site_data_daily["date"].dt.dayofyear
                ## exclude all Feb 29 rows
                site_data_daily = site_data_daily[~((site_data_daily['date'].dt.month == 2) & (site_data_daily['date'].dt.day == 29))]
            
                ## select year
                site_data_daily['Year'] = site_data_daily['date'].dt.year
            
                ## create dataframe for SOS and EOS for each site
                df_SOS_EOS = pd.DataFrame()
                
                ## smooth and scale data for each year
                site_years = site_data_daily.loc[len(site_data_daily)-1, 'Year'] - site_data_daily.loc[0, 'Year'] + 1
                site_year_start = int(site_data_daily.loc[0, 'Year'])
                site_year_end = int(site_data_daily.loc[len(site_data_daily)-1, 'Year'])
                count = 0
                fig, axs = plt.subplots(site_years, 1, figsize=(8,3*site_years))
                # for year, group in grouped_year:
                for yt in range(site_year_start, site_year_end+1, 1):
                    df_year_tmp = site_data_daily.copy()
                    df_year = df_year_tmp[df_year_tmp['Year'] == yt]
                    
                    ## smoothing grouped_year (use the middle concat result to avoid unsmoothed tail)
                    grouped_df_concat3 = pd.concat([df_year, df_year, df_year], axis=0)
                    grouped_df_concat3 = grouped_df_concat3.reset_index()
            
                    smoothed_df_concat3 = grouped_df_concat3.copy()
                    for it in range(50):
                        smoothed_df_concat3 = smooth_savgol(smoothed_df_concat3, "GPP_DT_VUT_REF")
                    print(f"complete {site_ID} smoothing year = {yt}")
                    
                    # ## plot: smoothed vs original
                    # axs[count].plot(grouped_df_concat3.index.values, grouped_df_concat3["GPP_DT_VUT_REF"])
                    # axs[count].plot(smoothed_df_concat3.index.values, smoothed_df_concat3["GPP_DT_VUT_REF"])
                    # axs[count].set_title(f"{year}")
                    # axs[count].set_xlim((365,365*2-1))

                    ## scale to [0,1]
                    smoothed_df = smoothed_df_concat3.copy()
                    columns_to_normalize = "GPP_DT_VUT_REF"
                    scaled_df = smoothed_df.copy()
                    scaler = MinMaxScaler()
                    scaled_df[columns_to_normalize] = scaler.fit_transform(smoothed_df[[columns_to_normalize]])
                    print(f"complete {site_ID} scaling year = {yt}")
                    print(scaled_df)

                    ## get SOS, EOS
                    SOS_EOS_DOY_GPP, SOS_EOS_intersect_GPP, x_result_GPP = find_SOS_EOS(scaled_df, "GPP_DT_VUT_REF")
                    print(f"get {site_ID} SOS, EOS in year = {yt}")

                    ## plot SOS, EOS
                    axs[count].plot(grouped_df_concat3.index.values, scaled_df["GPP_DT_VUT_REF"])
                    print(f"pass first line of plot {site_ID} in year = {yt}")
                    for it in range(len(x_result_GPP)):
                        axs[count].axvline(x=x_result_GPP[it], color='black', linestyle='--')
                    for it in range(len(SOS_EOS_DOY_GPP)):
                        if (~np.isnan(SOS_EOS_DOY_GPP[it])):
                            axs[count].plot(SOS_EOS_DOY_GPP[it], SOS_EOS_intersect_GPP[it], 'o')
                        else:
                            pass
                    print(f"pass second line of plot {site_ID} in year = {yt}")
                    axs[count].set_title(f"{yt}")
                    axs[count].set_xlim((365,365*2-1))
                    axs[count].set_xticks(np.arange(365,365*2,60), np.arange(0,365,60)+1)
                    axs[count].set_ylabel('GPP')
                    
                    ## fill in SOS, EOS
                    df_SOS_EOS.loc[count, "site_ID"] = site_ID
                    df_SOS_EOS.loc[count, "IGBP"] = site_IGBP
                    df_SOS_EOS.loc[count, "year"] = int(yt)
                    df_SOS_EOS.loc[count, "SOS_1_GPP"] = SOS_EOS_DOY_GPP[0]-365
                    df_SOS_EOS.loc[count, "EOS_1_GPP"] = SOS_EOS_DOY_GPP[1]-365
                    df_SOS_EOS.loc[count, "SOS_2_GPP"] = SOS_EOS_DOY_GPP[2]-365
                    df_SOS_EOS.loc[count, "EOS_2_GPP"] = SOS_EOS_DOY_GPP[3]-365
                    
                    count = count + 1

                    del grouped_df_concat3, smoothed_df_concat3, smoothed_df, scaled_df
                
                del site_data_daily, flux_df
                # print(f"get {a1} SOS and EOS")
                

                ## save figure
                filename = f"scaled_SOS_EOS_{site_ID}.png"
                # filename = f"smooth_SOS_EOS_{site_ID}.png"
                fig.savefig(dir_savefig+filename, dpi=400, bbox_inches='tight')
                print(f"Saved png: {filename}")
                # plt.close()
            
                # --- Save filtered site data ---
                fn_csv_out = f"{site_ID}_SOS_EOS.csv"
                df_SOS_EOS.fillna(-9999).to_csv(dir_csv_out + fn_csv_out, index=False)
                print(f"Saved csv: {fn_csv_out}")

                if len_HH == 1:
                    pattern = os.path.join(unzip_dir, f"*{site_ID}*.csv")
                else:
                    pattern = os.path.join(unzip_dir, f"*{site_ID}*.csv")
                
                for file_path in glob.glob(pattern):
                    try:
                        os.remove(file_path)
                        print(f"Deleted: {file_path}")
                    except FileNotFoundError:
                        print(f"File not found: {file_path}")
                    except Exception as e:
                        print(f"Error deleting {file_path}: {e}")

            else:
                pass

            

        # ### delete the unzip folder
        # shutil.rmtree(unzip_dir)
        
            

    elif (site_src == 'Ozflux'):
        try:
            ## create a dataframe in filtering negative sensible heat flux
            flux_df = pd.DataFrame(columns=pick_cols_Ozflux)
    
            ## read site data from netcdf files
            flux_ds = xr.open_dataset(nc_dir+pattern)
            print(f"-- read OzFlux data from {nc_dir+pattern}")
            flux_df["TIMESTAMP_END"] = flux_ds["time"]
            flux_df["P_F"] = np.squeeze(flux_ds["Precip"])
            flux_df["P_F_QC"] = np.squeeze(flux_ds["Precip_QCFlag"])
            # flux_df["LE_flux"] = np.squeeze(flux_ds["Fe"])
            # flux_df["LE_flux_QC"] = np.squeeze(flux_ds["Fe_QCFlag"])
            # flux_df["H_flux"] = np.squeeze(flux_ds["Fh"])
            # flux_df["H_flux_QC"] = np.squeeze(flux_ds["Fh_QCFlag"])
            flux_df["TA_F"] = np.squeeze(flux_ds["Ta"]) # degree C
            flux_df["TA_F_QC"] = np.squeeze(flux_ds["Ta_QCFlag"])
            # flux_df["PA_F"] = np.squeeze(flux_ds["ps"]) # kPa
            # flux_df["PA_F_QC"] = np.squeeze(flux_ds["ps_QCFlag"])
            if "GPP_LL" in flux_ds:
                ## select GPP product
                flux_df["GPP"] = np.squeeze(flux_ds["GPP_LL"])
                flux_df["GPP_QC"] = np.squeeze(flux_ds["GPP_LL_QCFlag"]) # same as GPP_DT_VUT_REF
            else:
                flux_df["GPP"] = np.squeeze(flux_ds["GPP_SOLO"])
                flux_df["GPP_QC"] = np.squeeze(flux_ds["GPP_SOLO_QCFlag"])
            # flux_df["VPD_F"] = np.squeeze(flux_ds["VPD"]) # kPa
            # flux_df["VPD_F_QC"] = np.squeeze(flux_ds["VPD_QCFlag"])
            flux_df["SW_IN_F"] = np.squeeze(flux_ds["Fsd"])
            flux_df["SW_IN_F_QC"] = np.squeeze(flux_ds["Fsd_QCFlag"])
            # flux_df["WS_F"] = np.squeeze(flux_ds["Ws"])
            # flux_df["WS_F_QC"] = np.squeeze(flux_ds["Ws_QCFlag"])
            # flux_df["LW_IN_F"] = np.squeeze(flux_ds["Fld"])
            # flux_df["LW_IN_F_QC"] = np.squeeze(flux_ds["Fld_QCFlag"])
            # flux_df["NETRAD"] = np.squeeze(flux_ds["Fn"])
            # flux_df["NETRAD_QC"] = np.squeeze(flux_ds["Fn_QCFlag"])
            # flux_df["G_flux"] = np.squeeze(flux_ds["Fg"])
            # flux_df["G_flux_QC"] = np.squeeze(flux_ds["Fg_QCFlag"])
            flux_df["USTAR"] = np.squeeze(flux_ds["ustar"])
            flux_df["USTAR_QC"] = np.squeeze(flux_ds["ustar_QCFlag"])
            flux_ds.close()
            del flux_ds
    
            # ## convert unit
            # flux_df["PA_F"] = flux_df["PA_F"]*10 # kPa -> hPa
            # flux_df["VPD_F"] = flux_df["VPD_F"]*10 # kPa -> hPa
    
            ## check whether data is half-hourly or hourly
            if (flux_df["TIMESTAMP_END"].iloc[1]-flux_df["TIMESTAMP_END"].iloc[0] == pd.Timedelta(minutes=30)):
                len_HH = 1
                flux_df["TIMESTAMP_START"] = flux_df["TIMESTAMP_END"] - pd.Timedelta(minutes=30)
            else:
                len_HH = 0
                flux_df["TIMESTAMP_START"] = flux_df["TIMESTAMP_END"] - pd.Timedelta(minutes=60)
            print(f"site = {site_ID} , HH = {len_HH}") # HH=1 is half-hourly data
            if  (len_HH == 1):
                day_timestep = 48 # 48 timestep in a day
                rain_QC_timestep = 24 # rain within 12 hours
            else:
                day_timestep = 24 # 24 timestep in a day
                rain_QC_timestep = 12 # rain within 12 hours
    
            ## Quality control 1
            flux_df = flux_df.replace(-9999,np.nan)
            for var in var_to_QC_Ozflux:
                # Data that has been corrected is recommended for process-based studies.
                # Data that has been gap filled is not recommended for process-based studies
                # but may be used for budget-style studies.
                # source: https://github.com/OzFlux/PyFluxPro/wiki/QC-Flag-Definitions
                # where the condition is true, keep the original values, otherwise, default replace with nan
                flux_df[var] = flux_df[var].where(((np.all(np.isnan(flux_df[dic_QC_Ozflux[var]]))) |
                                                   (flux_df[dic_QC_Ozflux[var]]==0) |
                                                   (flux_df[dic_QC_Ozflux[var]]==10) |
                                                   (flux_df[dic_QC_Ozflux[var]]==20)))
            print(f"complete {site_ID} QC 1 (from QC flags)")
            
            # # 0<=RH<=100
            # flux_df["e_sat"] = es0*np.exp((Lv/Rv)*((1/T0)-(1/(flux_df["TA_F"]+273.15))))
            # flux_df["RH"] = 100*(flux_df["e_sat"]-flux_df["VPD_F"])/flux_df["e_sat"]
            # flux_df["RH"] = flux_df["RH"].where((flux_df["RH"]>=0) & (flux_df["RH"]<=100))
    
            # remove rainy event (rain within previous 12 hours) from flux data
            flux_df.loc[len(flux_df)] = {col: np.nan for col in flux_df.columns} # add a nan row because df["var"][a:b] does not count in df["var"][b]
            for pt in range(rain_QC_timestep*1, flux_df.shape[0]):
                if ((np.all(~np.isnan(flux_df["P_F"].iloc[pt-rain_QC_timestep:pt+1]))) & (np.sum(flux_df["P_F"].iloc[pt-rain_QC_timestep:pt+1])==0)):
                    # if only np.sum, still count in nan
                    pass
                else:
                    flux_df["USTAR"].iloc[pt] = np.nan
                    # flux_df["H_flux"].iloc[pt] = np.nan
                    # flux_df["LE_flux"].iloc[pt] = np.nan
                    flux_df["GPP"].iloc[pt] = np.nan
                    # flux_df["G_flux"].iloc[pt] = np.nan

            ## Get u* upper bound -> for Quality control 2
            USTAR_mean = flux_df["USTAR"].mean(axis=0)
            USTAR_std = flux_df["USTAR"].std(axis=0)
            USTAR_upper = USTAR_mean+2*USTAR_std
            flux_df.loc[((flux_df["USTAR"]<=0.2) | (flux_df["USTAR"]>USTAR_upper)), 'GPP'] = np.nan
            print(f"complete {site_ID} QC 2 (exclude too low and too high USTAR for fluxes)")
            # print(flux_df.loc[5760:5790, "GPP_DT_VUT_REF"])
            # print(f"{site_ID} {flux_df}")
    
            # ## Quality Control 3 - remove outliers
            # site_data['time_year'] = flux_df["TIMESTAMP_START"].dt.year
            # site_data['SW_IN_F'] = flux_df["SW_IN_F"]
            # for it, (var, qc) in enumerate(dic_QC_Ozflux_flux.items()):
            #     # pick variable column and keep good or no qc flags data
            #     df_var = flux_df[['TIMESTAMP_START', 'TIMESTAMP_END', 'SW_IN_F', var]].copy()
            #     print(f"{site_ID} {df_var}")
            #     # copy Tair data for filtering freezing conditions
            #     df_var['TA_low'] = flux_df['TA_F']
            #     df_var['TA_low_QC'] = flux_df['TA_F_QC']
            #     if df_var['TA_low'].notna().any():
            #         df_var['TA_low'] = df_var['TA_low'].where(df_var['TA_low_QC'].isin([0,10,20]), np.nan)
            #     else:
            #         pass
            #     df_var.drop('TA_low_QC', axis=1, inplace=True)
    
            #     if var in ['GPP']:
            #         print(f"start outlier removing {site_ID} - {var}")
            #         df_var_no_outlier = remove_outlier(df_var, var)
            #         df_var_no_outlier.drop(columns=['TA_low', 'hour_start'], inplace=True)
            #         print(f"finish outlier removing {site_ID} - {var}")
            #     else:
            #         df_var_no_outlier = df_var.copy()
            #         df_var_no_outlier.drop(columns=['TA_low'], inplace=True)
    
            #     # merge good data into dataframe
            #     site_data = site_data.merge(df_var_no_outlier[['TIMESTAMP_START', 'TIMESTAMP_END', var]], on=['TIMESTAMP_START', 'TIMESTAMP_END'], how='left')
            #     del df_var, df_var_no_outlier
            # print(f"complete {site_ID} QC 3 (remove outlier for fluxes)")

            # --- Quality Control 3 - remove outliers (robust) ---
            required_base = ["TIMESTAMP_START", "TIMESTAMP_END", "SW_IN_F"]
            missing_base = [c for c in required_base if c not in flux_df.columns]
            if missing_base:
                raise KeyError(f"[{site_ID}] Missing base columns before QC3: {missing_base}")
            
            # ensure site_data exists and aligns
            site_data = pd.DataFrame({
                "TIMESTAMP_START": flux_df["TIMESTAMP_START"],
                "TIMESTAMP_END": flux_df["TIMESTAMP_END"],
            })
            site_data["time_year"] = flux_df["TIMESTAMP_START"].dt.year
            site_data["SW_IN_F"]   = flux_df["SW_IN_F"]
            
            # bring the dict into scope robustly (works even in child processes)
            try:
                _qc_map = dic_QC_Ozflux_flux
            except NameError:
                # fallback: minimal map for GPP
                _qc_map = {"GPP": "GPP_QC"}
                print(f"[{site_ID}] dic_QC_Ozflux_flux not found in worker; using fallback: {_qc_map}")
            
            if not isinstance(_qc_map, dict) or not _qc_map:
                print(f"[{site_ID}] QC map empty or not a dict: {_qc_map} — skipping QC3 loop.")
            else:
                # keep only variables that exist
                vars_present = [v for v in _qc_map.keys() if v in flux_df.columns]
                if not vars_present:
                    print(f"[{site_ID}] None of {_qc_map.keys()} present in flux_df; skipping QC3 loop.")
                else:
                    print(f"[{site_ID}] QC3 will process variables: {vars_present}")
            
                for it, var in enumerate(vars_present):
                    qc_col = _qc_map[var]
                    need_cols = ["TIMESTAMP_START", "TIMESTAMP_END", "SW_IN_F", var]
                    missing_now = [c for c in need_cols if c not in flux_df.columns]
                    if missing_now:
                        print(f"[{site_ID}] Skipping {var}: missing columns {missing_now}")
                        continue
                    if qc_col not in flux_df.columns:
                        print(f"[{site_ID}] Skipping {var}: QC column '{qc_col}' not found.")
                        continue
            
                    df_var = flux_df[need_cols].copy()
                    # Optional Tair screen if present
                    if ("TA_F" in flux_df.columns) and ("TA_F_QC" in flux_df.columns):
                        df_var["TA_low"] = flux_df["TA_F"].where(flux_df["TA_F_QC"].isin([0, 10, 20]), np.nan)
            
                    if var in ["GPP"]:
                        print(f"[{site_ID}] start outlier removing - {var} (rows={len(df_var)})")
                        df_var_no_outlier = remove_outlier(df_var, var)
                        df_var_no_outlier.drop(columns=["TA_low", "hour_start"], inplace=True, errors="ignore")
                        print(f"[{site_ID}] finish outlier removing - {var}")
                    else:
                        df_var_no_outlier = df_var.copy()
                        df_var_no_outlier.drop(columns=["TA_low"], inplace=True, errors="ignore")
            
                    # Merge back
                    site_data = site_data.merge(
                        df_var_no_outlier[["TIMESTAMP_START", "TIMESTAMP_END", var]],
                        on=["TIMESTAMP_START", "TIMESTAMP_END"], how="left"
                    )
                    del df_var, df_var_no_outlier
            
                print(f"[{site_ID}] complete QC 3 (remove outlier for fluxes)")

            ## select time period: daytime (SW>10 W/m2)
            flux_time_mask = ((site_data["time_year"] >= site_year_start) & 
                    (site_data["time_year"] <= site_year_end) & 
                    (site_data["SW_IN_F"]>=50))
            site_data = site_data[flux_time_mask]
    
            print(f"complete {site_ID} selecting site_data")
    
            ## daily mean
            site_data = site_data.set_index('TIMESTAMP_START')
            site_data_daily_tmp = site_data['GPP'].resample('D').mean()
            site_data_daily_tmp = site_data_daily_tmp.reset_index()
            site_data_daily_tmp = site_data_daily_tmp.rename(columns={"TIMESTAMP_START": "date"})
            site_data_daily = pd.DataFrame({'date': pd.date_range(start=f"{int(site_year_start)}-01-01", end=f"{int(site_year_end)}-12-31", freq='D')})
            site_data_daily = pd.merge(site_data_daily, site_data_daily_tmp, on='date', how='left')
            
    
            ## interpolation
            site_data_daily["GPP"] = site_data_daily["GPP"].interpolate(method='linear')
            site_data_daily["GPP"] = site_data_daily["GPP"].rolling(window_size, min_periods=5, center=True).apply(lambda x : np.nanmedian(x))
            site_data_daily.loc[np.isnan(site_data_daily["GPP"]), "GPP"] = 0
            print(f"complete {site_ID} interpolation and rolling mean")
    
            ### --------------- get SOS, EOS every year ---------------
            site_data_daily["DOY"] = site_data_daily["date"].dt.dayofyear
            ## exclude all Feb 29 rows
            site_data_daily = site_data_daily[~((site_data_daily['date'].dt.month == 2) & (site_data_daily['date'].dt.day == 29))]
        
            ## select year
            site_data_daily['Year'] = site_data_daily['date'].dt.year
        
            ## create dataframe for SOS and EOS for each site
            df_SOS_EOS = pd.DataFrame()

            ## smooth and scale data for each year
            site_years = site_data_daily.loc[len(site_data_daily)-1, 'Year'] - site_data_daily.loc[0, 'Year'] + 1
            site_year_start = int(site_data_daily.loc[0, 'Year'])
            site_year_end = int(site_data_daily.loc[len(site_data_daily)-1, 'Year'])
            count = 0
            fig, axs = plt.subplots(site_years, 1, figsize=(8,3*site_years))
            # for year, group in grouped_year:
            for yt in range(site_year_start, site_year_end+1, 1):
                df_year_tmp = site_data_daily.copy()
                df_year = df_year_tmp[df_year_tmp['Year'] == yt]
                
                ## smoothing grouped_year (use the middle concat result to avoid unsmoothed tail)
                grouped_df_concat3 = pd.concat([df_year, df_year, df_year], axis=0)
                grouped_df_concat3 = grouped_df_concat3.reset_index()
        
                smoothed_df_concat3 = grouped_df_concat3.copy()
                for it in range(50):
                    smoothed_df_concat3 = smooth_savgol(smoothed_df_concat3, "GPP")
                print(f"complete {site_ID} smoothing year = {yt}")
                
                # ## plot: smoothed vs original
                # axs[count].plot(grouped_df_concat3.index.values, grouped_df_concat3["GPP_DT_VUT_REF"])
                # axs[count].plot(smoothed_df_concat3.index.values, smoothed_df_concat3["GPP_DT_VUT_REF"])
                # axs[count].set_title(f"{year}")
                # axs[count].set_xlim((365,365*2-1))
    
                ## scale to [0,1]
                smoothed_df = smoothed_df_concat3.copy()
                columns_to_normalize = "GPP"
                scaled_df = smoothed_df.copy()
                scaler = MinMaxScaler()
                scaled_df[columns_to_normalize] = scaler.fit_transform(smoothed_df[[columns_to_normalize]])
                print(f"complete {site_ID} scaling year = {yt}")
                # print(scaled_df)
    
                ## get SOS, EOS
                SOS_EOS_DOY_GPP, SOS_EOS_intersect_GPP, x_result_GPP = find_SOS_EOS(scaled_df, "GPP")
                print(f"get {site_ID} SOS, EOS in year = {yt}")
    
                ## plot SOS, EOS
                axs[count].plot(grouped_df_concat3.index.values, scaled_df["GPP"])
                print(f"pass first line of plot {site_ID} in year = {yt}")
                for it in range(len(x_result_GPP)):
                    axs[count].axvline(x=x_result_GPP[it], color='black', linestyle='--')
                for it in range(len(SOS_EOS_DOY_GPP)):
                    if (~np.isnan(SOS_EOS_DOY_GPP[it])):
                        axs[count].plot(SOS_EOS_DOY_GPP[it], SOS_EOS_intersect_GPP[it], 'o')
                    else:
                        pass
                print(f"pass second line of plot {site_ID} in year = {yt}")
                axs[count].set_title(f"{yt}")
                axs[count].set_xlim((365,365*2-1))
                axs[count].set_xticks(np.arange(365,365*2,60), np.arange(0,365,60)+1)
                axs[count].set_ylabel('GPP')
                
                ## fill in SOS, EOS
                df_SOS_EOS.loc[count, "site_ID"] = site_ID
                df_SOS_EOS.loc[count, "IGBP"] = site_IGBP
                df_SOS_EOS.loc[count, "year"] = int(yt)
                df_SOS_EOS.loc[count, "SOS_1_GPP"] = SOS_EOS_DOY_GPP[0]-365
                df_SOS_EOS.loc[count, "EOS_1_GPP"] = SOS_EOS_DOY_GPP[1]-365
                df_SOS_EOS.loc[count, "SOS_2_GPP"] = SOS_EOS_DOY_GPP[2]-365
                df_SOS_EOS.loc[count, "EOS_2_GPP"] = SOS_EOS_DOY_GPP[3]-365
                
                count = count + 1
    
                del grouped_df_concat3, smoothed_df_concat3, smoothed_df, scaled_df
            
            del site_data_daily, flux_df
            # print(f"get {a1} SOS and EOS")
            
    
            ## save figure
            filename = f"scaled_SOS_EOS_{site_ID}.png"
            # filename = f"smooth_SOS_EOS_{site_ID}.png"
            fig.savefig(dir_savefig+filename, dpi=400, bbox_inches='tight')
            print(f"Saved png: {filename}")
            # plt.close()
    
            # --- Save filtered site data ---
            fn_csv_out = f"{site_ID}_SOS_EOS.csv"
            df_SOS_EOS.fillna(-9999).to_csv(dir_csv_out + fn_csv_out, index=False)
            print(f"Saved csv: {fn_csv_out}")

        except Exception as e:
            print(f"Error processing {sel_site}: {e}")
            return None
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        # ## Quality Control 3 - remove outliers
        # site_data['time_year'] = flux_df["TIMESTAMP_START"].dt.year
        # site_data['SW_IN_F'] = flux_df["SW_IN_F"]
        # for it, (var, qc) in enumerate(dic_QC_flux.items()):
        #     # pick variable column and keep good or no qc flags data
        #     df_var = flux_df[['TIMESTAMP_START', 'TIMESTAMP_END', 'SW_IN_F', var]].copy()
        #     # copy Tair data for filtering freezing conditions
        #     df_var['TA_low'] = flux_df['TA_F']
        #     df_var['TA_low_QC'] = flux_df['TA_F_QC']
        #     if df_var['TA_low'].notna().any():
        #         df_var['TA_low'] = df_var['TA_low'].where(df_var['TA_low_QC'].isin([0,1]), np.nan)
        #     else:
        #         pass
        #     df_var.drop('TA_low_QC', axis=1, inplace=True)

        #     if var in ['GPP']:
        #         print(f"start outlier removing {site_ID} - {var}")
        #         # print(df_var.iloc[5760:5790, :])
        #         df_var_no_outlier = remove_outlier(df_var, var)
        #         df_var_no_outlier.drop(columns=['TA_low', 'hour_start'], inplace=True)
        #         print(f"finish outlier removing {site_ID} - {var}")
        #         # print(df_var_no_outlier.iloc[5760:5790, :])
        #     else:
        #         df_var_no_outlier = df_var.copy()
        #         df_var_no_outlier.drop(columns=['TA_low'], inplace=True)

        #     print(f"complete {a1} QC 3 (remove outlier for fluxes)")

        #     # merge good data into dataframe
        #     site_data = site_data.merge(df_var_no_outlier[['TIMESTAMP_START', 'TIMESTAMP_END', var]], on=['TIMESTAMP_START', 'TIMESTAMP_END'], how='left')
        #     del df_var, df_var_no_outlier
        #     # print(site_data.iloc[5760:5790, :])

        # # print(site_data.columns)

        # ## select time period: daytime (SW>10 W/m2)
        # flux_time_mask = ((site_data["time_year"] >= site_year_start) & 
        #         (site_data["time_year"] <= site_year_end) & 
        #         (site_data["SW_IN_F"]>=10))
        # site_data = site_data[flux_time_mask]

        # print(f"complete {a1} selecting site_data")

        # ## daily mean
        # site_data = site_data.set_index('TIMESTAMP_START')
        # site_data_daily_tmp = site_data['GPP'].resample('D').mean()
        # site_data_daily_tmp = site_data_daily_tmp.reset_index()
        # site_data_daily_tmp = site_data_daily_tmp.rename(columns={"TIMESTAMP_START": "date"})
        # site_data_daily = pd.DataFrame({'date': pd.date_range(start=f"{int(site_year_start)}-01-01", end=f"{int(site_year_end)}-12-31", freq='D')})
        # site_data_daily = pd.merge(site_data_daily, site_data_daily_tmp, on='date', how='left')

        # ## interpolation
        # site_data_daily["GPP"] = site_data_daily["GPP"].interpolate(method='linear')
        # site_data_daily["GPP"] = site_data_daily["GPP"].rolling(window_size, min_periods=5, center=True).apply(lambda x : np.nanmedian(x))
        # site_data_daily.loc[np.isnan(site_data_daily["GPP"]), "GPP"] = 0
        # print(f"complete {a1} interpolation and rolling mean")

        # ### --------------- get SOS, EOS every year ---------------
        # site_data_daily["DOY"] = site_data_daily["date"].dt.dayofyear
        # ## exclude all Feb 29 rows
        # site_data_daily = site_data_daily[~((site_data_daily['date'].dt.month == 2) & (site_data_daily['date'].dt.day == 29))]
    
        # ## select year
        # site_data_daily['Year'] = site_data_daily['date'].dt.year
    
        # ## create dataframe for SOS and EOS for each site
        # df_SOS_EOS = pd.DataFrame()
        
        # ## smooth and scale data for each year
        # site_years = site_data_daily.loc[len(site_data_daily)-1, 'Year'] - site_data_daily.loc[0, 'Year'] + 1
        # site_year_start = int(site_data_daily.loc[0, 'Year'])
        # site_year_end = int(site_data_daily.loc[len(site_data_daily)-1, 'Year'])
        # count = 0
        # fig, axs = plt.subplots(site_years, 1, figsize=(8,3*site_years))
        # # for year, group in grouped_year:
        # for yt in range(site_year_start, site_year_end+1, 1):
        #     df_year_tmp = site_data_daily.copy()
        #     df_year = df_year_tmp[df_year_tmp['Year'] == yt]
            
        #     ## smoothing grouped_year (use the middle concat result to avoid unsmoothed tail)
        #     grouped_df_concat3 = pd.concat([df_year, df_year, df_year], axis=0)
        #     grouped_df_concat3 = grouped_df_concat3.reset_index()
    
        #     smoothed_df_concat3 = grouped_df_concat3.copy()
        #     for it in range(50):
        #         smoothed_df_concat3 = smooth_savgol(smoothed_df_concat3, "GPP_DT_VUT_REF")
        #     print(f"complete {site_ID} smoothing year = {yt}")

        #     ## scale to [0,1]
        #     smoothed_df = smoothed_df_concat3.copy()
        #     columns_to_normalize = "GPP"
        #     scaled_df = smoothed_df.copy()
        #     scaler = MinMaxScaler()
        #     scaled_df[columns_to_normalize] = scaler.fit_transform(smoothed_df[[columns_to_normalize]])
        #     print(f"complete {site_ID} scaling {yt}")

        #     ## get SOS, EOS
        #     SOS_EOS_DOY_GPP, SOS_EOS_intersect_GPP, x_result_GPP = find_SOS_EOS(scaled_df, "GPP")

        #     ## plot SOS, EOS
        #     axs[count].plot(grouped_df_concat3.index.values, scaled_df["GPP"])
        #     print(f"pass first line of figure {site_ID} - {yt}")
        #     for it in range(len(x_result_GPP)):
        #         axs[count].axvline(x=x_result_GPP[it], color='black', linestyle='--')
        #     for it in range(len(SOS_EOS_DOY_GPP)):
        #         if (~np.isnan(SOS_EOS_DOY_GPP[it])):
        #             axs[count].plot(SOS_EOS_DOY_GPP[it], SOS_EOS_intersect_GPP[it], 'o')
        #         else:
        #             pass
        #     print(f"pass second line of figure {site_ID} - {yt}")
        #     axs[count].set_title(f"{year}")
        #     axs[count].set_xlim((365,365*2-1))
        #     axs[count].set_xticks(np.arange(365,365*2,60), np.arange(0,365,60)+1)
        #     axs[count].set_ylabel('GPP')
            
        #     ## fill in SOS, EOS
        #     df_SOS_EOS.loc[count, "site_ID"] = site_ID
        #     df_SOS_EOS.loc[count, "IGBP"] = site_IGBP
        #     df_SOS_EOS.loc[count, "year"] = int(year)
        #     df_SOS_EOS.loc[count, "SOS_1_GPP"] = SOS_EOS_DOY_GPP[0]-365
        #     df_SOS_EOS.loc[count, "EOS_1_GPP"] = SOS_EOS_DOY_GPP[1]-365
        #     df_SOS_EOS.loc[count, "SOS_2_GPP"] = SOS_EOS_DOY_GPP[2]-365
        #     df_SOS_EOS.loc[count, "EOS_2_GPP"] = SOS_EOS_DOY_GPP[3]-365
            
        #     count = count + 1

        #     del grouped_df_concat3, smoothed_df_concat3, smoothed_df, scaled_df
            
        # print(f"get {a1} SOS and EOS")

        # ## save figure
        # filename = f"scaled_SOS_EOS_{site_ID}.png"
        # # filename = f"smooth_SOS_EOS_{site_ID}.png"
        # fig.savefig(dir_savefig+filename, dpi=400, bbox_inches='tight')
        # print(f"Saved png: {filename}")
        # plt.close()
    
        # # --- Save filtered site data ---
        # fn_csv_out = f"{site_ID}_SOS_EOS.csv"
        # df_SOS_EOS.fillna(-9999).to_csv(dir_csv_out + fn_csv_out, index=False)
        # print(f"Saved csv: {fn_csv_out}")


    
    
    
    
    elif (site_src == 'LBA-ECO'):
        ## read site data
        LBAECO_df = pd.read_excel(xlsx_dir+pattern, skiprows=[1])
        print(f"read site xlsx {site_ID}")

        # get date
        LBAECO_df["combined"] = LBAECO_df["Year_LBAMIP"].astype(int)*1000 + LBAECO_df["DoY_LBAMIP"].astype(int)
        LBAECO_df["date"] = pd.to_datetime(LBAECO_df["combined"], format = "%Y%j")
        LBAECO_df["datetime"] = LBAECO_df["date"] + pd.to_timedelta(LBAECO_df["Hour_LBAMIP"], unit='h')

        ## create a dataframe in filtering negative sensible heat flux
        flux_df = pd.DataFrame(columns=pick_cols_LBAECO)

        ## add site description and fill in data
        flux_df["site_ID"] = site_ID
        flux_df["site_name"] = site_name
        flux_df["IGBP"] = site_IGBP
        flux_df["site_src"] = site_src

        flux_df["TIMESTAMP_END"] = LBAECO_df["datetime"]
        flux_df["P_F"] = LBAECO_df["prec"] # mm
        # flux_df["LE_flux"] = LBAECO_df["LE"]
        # flux_df["H_flux"] = LBAECO_df["H"]
        flux_df["TA_F"] = LBAECO_df["ta"] # C
        # flux_df["PA_F"] = LBAECO_df["press"] # kPa
        # flux_df["GPP_NT_VUT_REF"] = LBAECO_df["GEP_model"]
        flux_df["GPP"] = LBAECO_df["GEP_model"]
        # flux_df["VPD_F"] = LBAECO_df["VPD"] # kPa
        flux_df["SW_IN_F"] = LBAECO_df["rgs"]
        # flux_df["WS_F"] = LBAECO_df["ws"]
        # flux_df["LW_IN_F"] = LBAECO_df["rgl"]
        # flux_df["NETRAD"] = LBAECO_df["Rn"]
        # flux_df["G_flux"] = LBAECO_df["FG"]
        flux_df["USTAR"] = LBAECO_df["ust"]
        del LBAECO_df

        # ## convert unit
        # flux_df["PA_F"] = flux_df["PA_F"]*10 # kPa -> hPa
        # flux_df["VPD_F"] = flux_df["VPD_F"]*10 # kPa -> hPa

        ## check whether data is half-hourly or hourly
        if (flux_df["TIMESTAMP_END"].iloc[1]-flux_df["TIMESTAMP_END"].iloc[0] == pd.Timedelta(minutes=30)):
            len_HH = 1
            flux_df["TIMESTAMP_START"] = flux_df["TIMESTAMP_END"]-pd.Timedelta(minutes=30)
        else:
            len_HH = 0
            flux_df["TIMESTAMP_START"] = flux_df["TIMESTAMP_END"]-pd.Timedelta(minutes=60)
        print("site = ", site_ID, ", HH = ", len_HH) # HH=1 is half-hourly data
        if  (len_HH == 1):
            day_timestep = 48 # 48 timestep in a day
            rain_QC_timestep = 24 # rain within 12 hours
        else:
            day_timestep = 24 # 24 timestep in a day
            rain_QC_timestep = 12 # rain within 12 hours

        ## Quality control 1
        flux_df = flux_df.replace(-9999, np.nan)

        # # 0<=RH<=100
        # flux_df["e_sat"] = es0*np.exp((Lv/Rv)*((1/T0)-(1/(flux_df["TA_F"]+273.15))))
        # flux_df["RH"] = 100*(flux_df["e_sat"]-flux_df["VPD_F"])/flux_df["e_sat"]
        # flux_df["RH"] = flux_df["RH"].where((flux_df["RH"]>=0) & (flux_df["RH"]<=100))

        # remove rain in previous 12 hours
        flux_df.loc[len(flux_df)] = {col: np.nan for col in flux_df.columns} # add a nan row because df["var"][a:b] does not count in df["var"][b]
        for pt in range(rain_QC_timestep*1, flux_df.shape[0]-1):
            if ((np.nansum(flux_df.loc[pt-rain_QC_timestep:pt+1, "P_F"])==0) | (np.isnan(np.nansum(flux_df.loc[pt-rain_QC_timestep:pt+1, "P_F"])))):
                pass
            else:
                flux_df.loc[pt, "USTAR"] = np.nan
                flux_df.loc[pt, "GPP"] = np.nan

        ## Get u* upper bound -> for Quality control 2
        USTAR_mean = flux_df["USTAR"].mean(axis=0)
        USTAR_std = flux_df["USTAR"].std(axis=0)
        USTAR_upper = USTAR_mean+2*USTAR_std
        flux_df.loc[((flux_df["USTAR"]<=0.2) | (flux_df["USTAR"]>USTAR_upper)), 'GPP'] = np.nan
        print(f"complete {site_ID} QC 2 (exclude too low and too high USTAR for fluxes)")
        # print(flux_df.loc[5760:5790, "GPP_DT_VUT_REF"])

        # ## Quality Control 3 - remove outliers
        # site_data['time_year'] = flux_df["TIMESTAMP_START"].dt.year
        # site_data['SW_IN_F'] = flux_df["SW_IN_F"]
        # for it, (var, qc) in enumerate(dic_QC_flux.items()):
        #     # pick variable column and keep good or no qc flags data
        #     df_var = flux_df[['TIMESTAMP_START', 'TIMESTAMP_END', 'SW_IN_F', var]].copy()
        #     # copy Tair data for filtering freezing conditions
        #     df_var['TA_low'] = flux_df['TA_F']
        #     df_var['TA_low_QC'] = flux_df['TA_F_QC']
        #     if df_var['TA_low'].notna().any():
        #         df_var['TA_low'] = df_var['TA_low'].where(df_var['TA_low_QC'].isin([0,1]), np.nan)
        #     else:
        #         pass
        #     df_var.drop('TA_low_QC', axis=1, inplace=True)

        #     if var in ['GPP']:
        #         print(f"start outlier removing {site_ID} - {var}")
        #         # print(df_var.iloc[5760:5790, :])
        #         df_var_no_outlier = remove_outlier(df_var, var)
        #         df_var_no_outlier.drop(columns=['TA_low', 'hour_start'], inplace=True)
        #         print(f"finish outlier removing {site_ID} - {var}")
        #         # print(df_var_no_outlier.iloc[5760:5790, :])
        #     else:
        #         df_var_no_outlier = df_var.copy()
        #         df_var_no_outlier.drop(columns=['TA_low'], inplace=True)

        #     print(f"complete {a1} QC 3 (remove outlier for fluxes)")

        #     # merge good data into dataframe
        #     site_data = site_data.merge(df_var_no_outlier[['TIMESTAMP_START', 'TIMESTAMP_END', var]], on=['TIMESTAMP_START', 'TIMESTAMP_END'], how='left')
        #     del df_var, df_var_no_outlier

        # # --- Quality Control 3 - remove outliers (robust) ---
        # required_base = ["TIMESTAMP_START", "TIMESTAMP_END", "SW_IN_F"]
        # missing_base = [c for c in required_base if c not in flux_df.columns]
        # if missing_base:
        #     raise KeyError(f"[{site_ID}] Missing base columns before QC3: {missing_base}")
        
        # # ensure site_data exists and aligns
        # site_data = pd.DataFrame({
        #     "TIMESTAMP_START": flux_df["TIMESTAMP_START"],
        #     "TIMESTAMP_END": flux_df["TIMESTAMP_END"],
        # })
        # site_data["time_year"] = flux_df["TIMESTAMP_START"].dt.year
        # site_data["SW_IN_F"]   = flux_df["SW_IN_F"]
        
        # # bring the dict into scope robustly (works even in child processes)
        # try:
        #     _qc_map = dic_QC_Ozflux_flux
        # except NameError:
        #     # fallback: minimal map for GPP
        #     _qc_map = {"GPP": "GPP_QC"}
        #     print(f"[{site_ID}] dic_QC_Ozflux_flux not found in worker; using fallback: {_qc_map}")
        
        # if not isinstance(_qc_map, dict) or not _qc_map:
        #     print(f"[{site_ID}] QC map empty or not a dict: {_qc_map} — skipping QC3 loop.")
        # else:
        #     # keep only variables that exist
        #     vars_present = [v for v in _qc_map.keys() if v in flux_df.columns]
        #     if not vars_present:
        #         print(f"[{site_ID}] None of {_qc_map.keys()} present in flux_df; skipping QC3 loop.")
        #     else:
        #         print(f"[{site_ID}] QC3 will process variables: {vars_present}")

        #     for it, var in enumerate(vars_present):
        #         qc_col = _qc_map[var]
        #         need_cols = ["TIMESTAMP_START", "TIMESTAMP_END", "SW_IN_F", var]
        #         missing_now = [c for c in need_cols if c not in flux_df.columns]
        #         if missing_now:
        #             print(f"[{site_ID}] Skipping {var}: missing columns {missing_now}")
        #             continue
        #         if qc_col not in flux_df.columns:
        #             print(f"[{site_ID}] Skipping {var}: QC column '{qc_col}' not found.")
        #             continue
        
        #         df_var = flux_df[need_cols].copy()
        #         # Optional Tair screen if present
        #         if ("TA_F" in flux_df.columns) and ("TA_F_QC" in flux_df.columns):
        #             df_var["TA_low"] = flux_df["TA_F"].where(flux_df["TA_F_QC"].isin([0, 10, 20]), np.nan)
        
        #         if var in ["GPP"]:
        #             print(f"[{site_ID}] start outlier removing - {var} (rows={len(df_var)})")
        #             df_var_no_outlier = remove_outlier(df_var, var)
        #             df_var_no_outlier.drop(columns=["TA_low", "hour_start"], inplace=True, errors="ignore")
        #             print(f"[{site_ID}] finish outlier removing - {var}")
        #         else:
        #             df_var_no_outlier = df_var.copy()
        #             df_var_no_outlier.drop(columns=["TA_low"], inplace=True, errors="ignore")
        
        #         # Merge back
        #         site_data = site_data.merge(
        #             df_var_no_outlier[["TIMESTAMP_START", "TIMESTAMP_END", var]],
        #             on=["TIMESTAMP_START", "TIMESTAMP_END"], how="left"
        #         )
        #         del df_var, df_var_no_outlier
        
        #     print(f"[{site_ID}] complete QC 3 (remove outlier for fluxes)")

        # -------------------------
        # QC3: Outlier removal (QC-free)
        # -------------------------
        required_base = ["TIMESTAMP_START", "TIMESTAMP_END", "SW_IN_F"]
        missing_base  = [c for c in required_base if c not in flux_df.columns]
        if missing_base:
            raise KeyError(f"[{site_ID}] Missing base columns before QC3: {missing_base}")
        
        # Build site_data aligned with flux_df (no QC dict; just work on GPP if present)
        site_data = pd.DataFrame({
            "TIMESTAMP_START": flux_df["TIMESTAMP_START"],
            "TIMESTAMP_END"  : flux_df["TIMESTAMP_END"],
        })
        site_data["time_year"] = flux_df["TIMESTAMP_START"].dt.year
        site_data["SW_IN_F"]   = flux_df["SW_IN_F"]
        
        # Choose variables to outlier-filter (no QC map). Here: GPP only if available.
        vars_present = [v for v in ["GPP"] if v in flux_df.columns]
        
        for var in vars_present:
            need_cols  = ["TIMESTAMP_START", "TIMESTAMP_END", "SW_IN_F", var]
            missing_now = [c for c in need_cols if c not in flux_df.columns]
            if missing_now:
                print(f"[{site_ID}] Skipping {var}: missing columns {missing_now}")
                continue
        
            df_var = flux_df[need_cols].copy()
        
            # Run your outlier remover (assumes it can work without QC flags)
            print(f"[{site_ID}] start outlier removing - {var} (rows={len(df_var)})")
            df_var_no_outlier = remove_outlier(df_var, var)
            # Some versions add 'hour_start'; drop if present. No TA_low used here.
            df_var_no_outlier.drop(columns=["hour_start"], inplace=True, errors="ignore")
            print(f"[{site_ID}] finish outlier removing - {var}")
        
            # Merge back
            site_data = site_data.merge(
                df_var_no_outlier[["TIMESTAMP_START", "TIMESTAMP_END", var]],
                on=["TIMESTAMP_START", "TIMESTAMP_END"],
                how="left"
            )
            del df_var, df_var_no_outlier
        
        print(f"[{site_ID}] complete QC 3 (remove outlier for fluxes)")

        ## select time period: daytime (SW>10 W/m2)
        flux_time_mask = ((site_data["time_year"] >= site_year_start) & 
                (site_data["time_year"] <= site_year_end) & 
                (site_data["SW_IN_F"]>=50))
        site_data = site_data[flux_time_mask]

        print(f"complete {site_ID} selecting site_data")

        ## daily mean
        site_data = site_data.set_index('TIMESTAMP_START')
        site_data_daily_tmp = site_data['GPP'].resample('D').mean()
        site_data_daily_tmp = site_data_daily_tmp.reset_index()
        site_data_daily_tmp = site_data_daily_tmp.rename(columns={"TIMESTAMP_START": "date"})
        site_data_daily = pd.DataFrame({'date': pd.date_range(start=f"{int(site_year_start)}-01-01", end=f"{int(site_year_end)}-12-31", freq='D')})
        site_data_daily = pd.merge(site_data_daily, site_data_daily_tmp, on='date', how='left')

        ## interpolation
        site_data_daily["GPP"] = site_data_daily["GPP"].interpolate(method='linear')
        site_data_daily["GPP"] = site_data_daily["GPP"].rolling(window_size, min_periods=5, center=True).apply(lambda x : np.nanmedian(x))
        site_data_daily.loc[np.isnan(site_data_daily["GPP"]), "GPP"] = 0
        print(f"complete {site_ID} interpolation and rolling mean")

                ### --------------- get SOS, EOS every year ---------------
        site_data_daily["DOY"] = site_data_daily["date"].dt.dayofyear
        ## exclude all Feb 29 rows
        site_data_daily = site_data_daily[~((site_data_daily['date'].dt.month == 2) & (site_data_daily['date'].dt.day == 29))]
    
        ## select year
        site_data_daily['Year'] = site_data_daily['date'].dt.year
    
        ## create dataframe for SOS and EOS for each site
        df_SOS_EOS = pd.DataFrame()
        
        ## smooth and scale data for each year
        site_years = site_data_daily.loc[len(site_data_daily)-1, 'Year'] - site_data_daily.loc[0, 'Year'] + 1
        site_year_start = int(site_data_daily.loc[0, 'Year'])
        site_year_end = int(site_data_daily.loc[len(site_data_daily)-1, 'Year'])
        count = 0
        fig, axs = plt.subplots(site_years, 1, figsize=(8,3*site_years))
        # for year, group in grouped_year:
        for yt in range(site_year_start, site_year_end+1, 1):
            df_year_tmp = site_data_daily.copy()
            df_year = df_year_tmp[df_year_tmp['Year'] == yt]
            
            ## smoothing grouped_year (use the middle concat result to avoid unsmoothed tail)
            grouped_df_concat3 = pd.concat([df_year, df_year, df_year], axis=0)
            grouped_df_concat3 = grouped_df_concat3.reset_index()
    
            smoothed_df_concat3 = grouped_df_concat3.copy()
            for it in range(50):
                smoothed_df_concat3 = smooth_savgol(smoothed_df_concat3, "GPP")
            print(f"complete {site_ID} smoothing year = {yt}")

            ## scale to [0,1]
            smoothed_df = smoothed_df_concat3.copy()
            columns_to_normalize = "GPP"
            scaled_df = smoothed_df.copy()
            scaler = MinMaxScaler()
            scaled_df[columns_to_normalize] = scaler.fit_transform(smoothed_df[[columns_to_normalize]])
            print(f"complete {site_ID} scaling {yt}")

            ## get SOS, EOS
            SOS_EOS_DOY_GPP, SOS_EOS_intersect_GPP, x_result_GPP = find_SOS_EOS(scaled_df, "GPP")

            ## plot SOS, EOS
            axs[count].plot(grouped_df_concat3.index.values, scaled_df["GPP"])
            print(f"pass first line of figure {site_ID} - {yt}")
            for it in range(len(x_result_GPP)):
                axs[count].axvline(x=x_result_GPP[it], color='black', linestyle='--')
            for it in range(len(SOS_EOS_DOY_GPP)):
                if (~np.isnan(SOS_EOS_DOY_GPP[it])):
                    axs[count].plot(SOS_EOS_DOY_GPP[it], SOS_EOS_intersect_GPP[it], 'o')
                else:
                    pass
            print(f"pass second line of figure {site_ID} - {yt}")
            axs[count].set_title(f"{yt}")
            axs[count].set_xlim((365,365*2-1))
            axs[count].set_xticks(np.arange(365,365*2,60), np.arange(0,365,60)+1)
            axs[count].set_ylabel('GPP')
            
            ## fill in SOS, EOS
            df_SOS_EOS.loc[count, "site_ID"] = site_ID
            df_SOS_EOS.loc[count, "IGBP"] = site_IGBP
            df_SOS_EOS.loc[count, "year"] = int(yt)
            df_SOS_EOS.loc[count, "SOS_1_GPP"] = SOS_EOS_DOY_GPP[0]-365
            df_SOS_EOS.loc[count, "EOS_1_GPP"] = SOS_EOS_DOY_GPP[1]-365
            df_SOS_EOS.loc[count, "SOS_2_GPP"] = SOS_EOS_DOY_GPP[2]-365
            df_SOS_EOS.loc[count, "EOS_2_GPP"] = SOS_EOS_DOY_GPP[3]-365
            
            count = count + 1

            del grouped_df_concat3, smoothed_df_concat3, smoothed_df, scaled_df
            
        print(f"get {site_ID} SOS and EOS")

        ## save figure
        filename = f"scaled_SOS_EOS_{site_ID}.png"
        # filename = f"smooth_SOS_EOS_{site_ID}.png"
        fig.savefig(dir_savefig+filename, dpi=400, bbox_inches='tight')
        print(f"Saved png: {filename}")
        plt.close()
    
        # --- Save filtered site data ---
        fn_csv_out = f"{site_ID}_SOS_EOS.csv"
        df_SOS_EOS.fillna(-9999).to_csv(dir_csv_out + fn_csv_out, index=False)
        print(f"Saved csv: {fn_csv_out}")
    

        
    
    else:
        pass

    # shutil.rmtree("/burg/glab/users/rg3390/data/FLUXNET2015/ICOS/warm_winter_2020_release_2022/tmp_unzip/")
    # shutil.rmtree("/burg/glab/users/rg3390/data/FLUXNET2015/AmeriFlux/tmp_unzip/")
    # shutil.rmtree("/burg/glab/users/rg3390/data/FLUXNET2015/FLUXNET2015_zipped/tmp_unzip/")


    return None
        
            
    


In [17]:
# ------------ Run processing in parallel ------------ 
if __name__ == "__main__":
    # site_info_df = pd.DataFrame(columns = ['site_ID', 'site_year_start', 'site_year_end', 'filepath', 'site_IGBP', 'dic_varQC_first', 'dic_varQC', 'dir_save_fig', 'dir_csv_out', 'pick_variables_rename', 'pick_variables_unit_precip_hh', 'pick_variables_unit_precip_hr'])
    site_info_list = []
    for st in range(152, 158): # total_site
        site_src = df_forest_sitelist["source"].iloc[st] # site source
        site_ID = df_forest_sitelist["site_ID"].iloc[st]
        site_name = df_forest_sitelist["SITE_NAME"].iloc[st]
        site_IGBP = df_forest_sitelist["IGBP"].iloc[st]
        site_year_start = df_forest_sitelist["year_start"].iloc[st]
        site_year_end = df_forest_sitelist["year_end"].iloc[st]
        # print("start listing", site_ID)

        site_info_list.append((
                            site_ID,
                            site_name,
                            int(site_year_start),
                            int(site_year_end),
                            site_IGBP,
                            site_src
                        ))
                        
    print(f"append all {st+1} sites!")

    with ProcessPoolExecutor() as executor:
        futures = {executor.submit(process_and_plot, args): args[0] for args in site_info_list}
        for _ in tqdm(as_completed(futures), total=len(futures), desc="Processing sites"):
            pass
            



append all 158 sites!
in process_and_plot K67 and source from LBA-ECOin process_and_plot K34 and source from LBA-ECO

Processing sites:   0%|                                                                                          | 0/6 [00:00<?, ?it/s]

in process_and_plot BAN and source from LBA-ECOin process_and_plot CAX and source from LBA-ECOin process_and_plot RJA and source from LBA-ECOin process_and_plot K83 and source from LBA-ECO





choosing directory for K34 and source from LBA-ECOchoosing directory for BAN and source from LBA-ECOchoosing directory for K67 and source from LBA-ECOchoosing directory for CAX and source from LBA-ECOchoosing directory for RJA and source from LBA-ECOchoosing directory for K83 and source from LBA-ECO





read site xlsx K67
read site xlsx RJA
site = site =   K67RJA  , HH = , HH =   00

read site xlsx BAN
site =  BAN , HH =  0
complete BAN QC 2 (exclude too low and too high USTAR for fluxes)
[BAN] start outlier removing - GPP (rows=35041)
complete K67 QC 2 (exclude too low and too high USTAR for fluxes)complete RJA QC 2 (exclude too low and too high USTAR for fluxes)

[K67] start outlier removing - GPP (rows=35041)[RJA] start outlier removing - GPP (rows=35041)

read site xlsx K83
site =  K83 , HH

Processing sites:  17%|█████████████▌                                                                   | 1/6 [01:55<09:37, 115.50s/it]

[RJA] finish outlier removing - GPP
[RJA] complete QC 3 (remove outlier for fluxes)
complete RJA selecting site_data
complete RJA interpolation and rolling mean
complete RJA smoothing year = 1999
complete RJA scaling 1999
pass first line of figure RJA - 1999
pass second line of figure RJA - 1999
complete RJA smoothing year = 2000
complete RJA scaling 2000
pass first line of figure RJA - 2000
pass second line of figure RJA - 2000
complete RJA smoothing year = 2001
complete RJA scaling 2001
pass first line of figure RJA - 2001
pass second line of figure RJA - 2001
complete RJA smoothing year = 2002
complete RJA scaling 2002
pass first line of figure RJA - 2002
pass second line of figure RJA - 2002
get RJA SOS and EOS
Saved png: scaled_SOS_EOS_RJA.png
Saved csv: RJA_SOS_EOS.csv


Processing sites:  33%|███████████████████████████▎                                                      | 2/6 [01:58<03:16, 49.07s/it]

[K67] finish outlier removing - GPP
[K67] complete QC 3 (remove outlier for fluxes)
complete K67 selecting site_data
complete K67 interpolation and rolling mean
complete K67 smoothing year = 2002
complete K67 scaling 2002
pass first line of figure K67 - 2002
pass second line of figure K67 - 2002
complete K67 smoothing year = 2003
complete K67 scaling 2003
pass first line of figure K67 - 2003
pass second line of figure K67 - 2003
complete K67 smoothing year = 2004
complete K67 scaling 2004
pass first line of figure K67 - 2004
pass second line of figure K67 - 2004
complete K67 smoothing year = 2005
complete K67 scaling 2005
pass first line of figure K67 - 2005
pass second line of figure K67 - 2005
complete K67 smoothing year = 2006
complete K67 scaling 2006
pass first line of figure K67 - 2006
pass second line of figure K67 - 2006
get K67 SOS and EOS
Saved png: scaled_SOS_EOS_K67.png
Saved csv: K67_SOS_EOS.csv


Processing sites:  50%|█████████████████████████████████████████                                         | 3/6 [02:03<01:27, 29.30s/it]

complete K34 QC 2 (exclude too low and too high USTAR for fluxes)
[K34] start outlier removing - GPP (rows=70081)
[K83] finish outlier removing - GPP
[CAX] finish outlier removing - GPP
[K83] complete QC 3 (remove outlier for fluxes)
complete K83 selecting site_data
[CAX] complete QC 3 (remove outlier for fluxes)
complete CAX selecting site_data
complete K83 interpolation and rolling mean
complete CAX interpolation and rolling mean
complete K83 smoothing year = 2000
complete K83 scaling 2000
complete CAX smoothing year = 1999
complete CAX scaling 1999
pass first line of figure CAX - 1999
pass first line of figure K83 - 2000
pass second line of figure CAX - 1999
pass second line of figure K83 - 2000
complete CAX smoothing year = 2000
complete K83 smoothing year = 2001
complete CAX scaling 2000
complete K83 scaling 2001
pass first line of figure CAX - 2000
pass second line of figure CAX - 2000
pass first line of figure K83 - 2001
pass second line of figure K83 - 2001
complete CAX smoothi

Processing sites:  67%|██████████████████████████████████████████████████████▋                           | 4/6 [02:37<01:01, 30.83s/it]

Saved png: scaled_SOS_EOS_K83.png
Saved csv: K83_SOS_EOS.csv
[K34] finish outlier removing - GPP
[K34] complete QC 3 (remove outlier for fluxes)
complete K34 selecting site_data
complete K34 interpolation and rolling mean
complete K34 smoothing year = 1999
complete K34 scaling 1999
pass first line of figure K34 - 1999
pass second line of figure K34 - 1999
complete K34 smoothing year = 2000
complete K34 scaling 2000
pass first line of figure K34 - 2000
pass second line of figure K34 - 2000
complete K34 smoothing year = 2001
complete K34 scaling 2001
pass first line of figure K34 - 2001
pass second line of figure K34 - 2001
complete K34 smoothing year = 2002
complete K34 scaling 2002
pass first line of figure K34 - 2002
pass second line of figure K34 - 2002
complete K34 smoothing year = 2003
complete K34 scaling 2003
pass first line of figure K34 - 2003
pass second line of figure K34 - 2003
complete K34 smoothing year = 2004
complete K34 scaling 2004
pass first line of figure K34 - 2004


Processing sites: 100%|██████████████████████████████████████████████████████████████████████████████████| 6/6 [04:43<00:00, 47.27s/it]


In [11]:
site_info_list

[('AU-Wrr', 'Warra', 2013, 2021, 'MF', 'Ozflux'),
 ('BE-Bra', 'Brasschaat', 1996, 2020, 'MF', 'ICOS'),
 ('BE-Vie', 'Vielsalm', 1996, 2020, 'MF', 'ICOS'),
 ('BR-Sa1', 'Santarem-Km67-Primary Forest', 2002, 2011, 'EBF', 'FLUXNET'),
 ('BR-Sa3', 'Santarem-Km83-Logged Forest', 2000, 2004, 'EBF', 'FLUXNET')]

In [14]:
df_SOS_EOS

NameError: name 'df_SOS_EOS' is not defined