Author: Paul Inkenbrandt, modified by Diane Menuz for testing  
Date: October 1, 2025  
Goal: Test modifications to micromet to see about fixing problems, using Escalante as a test case. Clean up  
processing steps.

# Initialization

## Import Libraries

In [1]:
import pandas as pd
import numpy as np
import os
import geopandas as gpd
import sys
import pathlib
from pathlib import Path


import matplotlib
import matplotlib.pyplot as plt
import plotly.express as px

import pandas as pd
import numpy as np
from pandas.tseries.frequencies import to_offset
import plotly.graph_objects as go

sys.path.append("../../src/")
import micromet

%matplotlib inline

In [2]:
loggerloader_path= "C:/Users/dmenuz/Documents/scripts/loggerloader"

import sys

sys.path.append(loggerloader_path)
from loggerloader import plotlystuff

## Initialize Logger

In [3]:
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setFormatter(
    logging.Formatter(
        fmt="%(levelname)s [%(asctime)s] %(name)s – %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
)
logger.addHandler(ch)

# Functions

## Fill NA Drop Dups

In [4]:
def fill_na_drop_dups(df: pd.DataFrame) -> pd.DataFrame:
    for col in df.columns:
        if col.endswith(".1"):
            col1 = col[:-2]
            col2 = col
            # Treat -9999 as missing
            s1 = df[col1].replace(-9999, np.nan)
            s2 = df[col2].replace(-9999, np.nan)
            df[col1] = s1.combine_first(s2).fillna(-9999)

            df = df.drop([col2], axis=1)
        elif col.endswith(".2"):
            col1 = col[:-2]
            col2 = col
            # Treat -9999 as missing
            s1 = df[col1].replace(-9999, np.nan)
            s2 = df[col2].replace(-9999, np.nan)
            df[col1] = s1.combine_first(s2).fillna(-9999)

            df = df.drop([col2], axis=1)
    return df


## Summarize Gaps

In [5]:
def summarize_gaps(
    df: pd.DataFrame,
    station_level: str = "STATIONID",
    time_level: str = "DATETIME_END",
    expected_freq: str = "30min",
    columns: list | None = None,
) -> pd.DataFrame:
    """
    Summarize runs of missing data (NaNs) per column for each station in a
    MultiIndex DataFrame indexed by (station, datetime).

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame with a MultiIndex (station_level, time_level).
    station_level : str, default "STATIONID"
        Name of the station level in the index.
    time_level : str, default "DATETIME_END"
        Name of the datetime level in the index.
    expected_freq : str, default "30min"
        The expected sampling frequency. Used to build a complete timeline per station
        so that missing timestamps become explicit NaNs.
    columns : list[str] | None
        Subset of columns to analyze. Defaults to all columns.

    Returns
    -------
    pd.DataFrame
        Columns:
            - STATIONID
            - COLUMN
            - GAP_START
            - GAP_END
            - N_STEPS_MISSING
            - HOURS_MISSING
            - GAP_KIND  ("MissingTimestamp", "NaN", or "Mixed")
    """
    if not isinstance(df.index, pd.MultiIndex):
        raise TypeError("df must have a MultiIndex (station, datetime).")

    if station_level not in df.index.names or time_level not in df.index.names:
        raise KeyError("MultiIndex must contain the specified station_level and time_level.")

    # Work on a sorted copy
    df = df.copy()
    df = df.sort_index()

    if columns is None:
        columns = list(df.columns)

    # Frequency as a Timedelta (e.g., 30 minutes)
    freq_td = pd.Timedelta(to_offset(expected_freq))
    hours_per_step = freq_td / pd.Timedelta(hours=1)

    records = []

    # Iterate station by station
    stations = df.index.get_level_values(station_level).unique()
    for stn in stations:
        # Slice one station: index becomes time_level
        dfx = df.xs(stn, level=station_level)

        # Ensure the time index is datetime and sorted
        time_idx = pd.to_datetime(dfx.index)
        dfx = dfx.set_index(time_idx).sort_index()
        original_idx = dfx.index

        # Build a complete timeline so *missing timestamps* are turned into NaNs
        full_idx = pd.date_range(start=original_idx.min(), end=original_idx.max(), freq=expected_freq)
        # Mask telling which timestamps were missing in the original index
        missing_row_mask = pd.Series(~pd.Index(full_idx).isin(original_idx), index=full_idx)

        # Reindex to full timeline
        dfr = dfx.reindex(full_idx)

        for col in columns:
            col_na = dfr[col].isna()
            if not col_na.any():
                continue  # no gaps for this column

            # Label contiguous runs (True/False) and keep only True-runs (gaps)
            run_id = (col_na != col_na.shift(1)).cumsum()
            for rid, run_mask in col_na.groupby(run_id):
                if not run_mask.iloc[0]:
                    continue  # this run is of non-NaNs

                run_times = run_mask.index
                gap_start = run_times[0]
                gap_end = run_times[-1]
                n_steps = int(run_mask.sum())

                # Determine the kind of gap: missing timestamps vs NaNs vs mixed
                row_missing_in_run = missing_row_mask.loc[run_times]
                if row_missing_in_run.all():
                    kind = "MissingTimestamp"
                elif not row_missing_in_run.any():
                    kind = "NaN"
                else:
                    kind = "Mixed"

                records.append(
                    {
                        "STATIONID": stn,
                        "COLUMN": col,
                        "GAP_START": gap_start,
                        "GAP_END": gap_end,
                        "N_STEPS_MISSING": n_steps,
                        "HOURS_MISSING": n_steps * hours_per_step,
                        "GAP_KIND": kind,
                    }
                )

    out = pd.DataFrame.from_records(records)
    if not out.empty:
        out = out.sort_values(["STATIONID", "COLUMN", "GAP_START"]).reset_index(drop=True)
    else:
        # Ensure expected columns even when no gaps
        out = pd.DataFrame(
            columns=["STATIONID", "COLUMN", "GAP_START", "GAP_END",
                     "N_STEPS_MISSING", "HOURS_MISSING", "GAP_KIND"]
        )
    return out


## Compare Gap Summaries

In [6]:
def compare_gap_summaries(
    gaps_a: pd.DataFrame,
    gaps_b: pd.DataFrame,
    expected_freq: str = "30min",
    min_steps: int = 1,
) -> pd.DataFrame:
    """
    Compare two gap-summary DataFrames (from `summarize_gaps`) and highlight
    where one dataset has coverage that could fill the other's gaps.

    Parameters
    ----------
    gaps_a, gaps_b : pd.DataFrame
        DataFrames returned by `summarize_gaps`. Must include the columns:
        ['STATIONID','COLUMN','GAP_START','GAP_END','N_STEPS_MISSING','HOURS_MISSING','GAP_KIND'].
    expected_freq : str, default "30min"
        Sampling frequency. Used to compute discrete step counts and to
        treat intervals on the expected time grid.
    min_steps : int, default 1
        Only report fillable segments with at least this many steps.

    Returns
    -------
    pd.DataFrame
        One row per *fillable segment*.
        Columns:
            - TARGET_DATASET   ("A" or "B")
            - SOURCE_DATASET   ("B" or "A")
            - STATIONID
            - COLUMN
            - TARGET_GAP_START
            - TARGET_GAP_END
            - FILLABLE_START
            - FILLABLE_END
            - N_STEPS_FILLABLE
            - HOURS_FILLABLE
            - TARGET_N_STEPS_MISSING
            - COVERAGE_RATIO    (steps_fillable / TARGET_N_STEPS_MISSING)
            - TARGET_GAP_KIND
    """
    req = {"STATIONID","COLUMN","GAP_START","GAP_END","N_STEPS_MISSING"}
    for name, g in [("gaps_a", gaps_a), ("gaps_b", gaps_b)]:
        missing = req - set(g.columns)
        if missing:
            raise KeyError(f"{name} missing required columns: {missing}")

    # Normalize dtypes and sort
    def _prep(g):
        g = g.copy()
        g["GAP_START"] = pd.to_datetime(g["GAP_START"])
        g["GAP_END"] = pd.to_datetime(g["GAP_END"])
        if "GAP_KIND" not in g.columns:
            g["GAP_KIND"] = "Unknown"
        return g.sort_values(["STATIONID","COLUMN","GAP_START","GAP_END"]).reset_index(drop=True)

    gaps_a = _prep(gaps_a)
    gaps_b = _prep(gaps_b)

    freq_td = to_offset(expected_freq).delta
    hours_per_step = freq_td / pd.Timedelta(hours=1)

    # Build a quick lookup: for each (station, column), list of (start, end) gaps
    def _build_lookup(g):
        d = {}
        for (stn, col), sub in g.groupby(["STATIONID","COLUMN"], sort=False):
            d[(stn, col)] = list(zip(sub["GAP_START"], sub["GAP_END"]))
        return d

    gapsB_lookup = _build_lookup(gaps_b)
    gapsA_lookup = _build_lookup(gaps_a)

    def _steps_inclusive(s, e):
        # number of discrete samples on the regular grid from s..e inclusive
        return int(((e - s) // freq_td) + 1)

    def _subtract_interval(base, subtracts):
        """Given a base [a0,a1] (inclusive, on grid) and a list of
        subtract intervals (inclusive), return list of remaining
        inclusive intervals on the same grid."""
        a0, a1 = base
        if a0 > a1:
            return []
        # Clip subtracts to base
        cl = []
        for s, e in subtracts:
            s1 = max(s, a0)
            e1 = min(e, a1)
            if s1 <= e1:
                cl.append((s1, e1))
        if not cl:
            return [(a0, a1)]
        cl.sort(key=lambda x: x[0])

        segs = []
        cur = a0
        for s, e in cl:
            # segment before s (subtract is inclusive)
            before_end = s - freq_td
            if cur <= before_end:
                segs.append((cur, before_end))
            # skip the subtracted run
            cur = e + freq_td
            if cur > a1:
                break
        if cur <= a1:
            segs.append((cur, a1))
        return segs

    def _direction_fill(target_gaps, source_lookup, target_label, source_label):
        """Compute fillable segments where `source` can fill `target`."""
        out_rows = []
        for _, r in target_gaps.iterrows():
            key = (r["STATIONID"], r["COLUMN"])
            base = (r["GAP_START"], r["GAP_END"])
            subtracts = source_lookup.get(key, [])
            fill_segments = _subtract_interval(base, subtracts)
            for fs, fe in fill_segments:
                steps = _steps_inclusive(fs, fe)
                if steps < min_steps:
                    continue
                out_rows.append({
                    "TARGET_DATASET": target_label,
                    "SOURCE_DATASET": source_label,
                    "STATIONID": r["STATIONID"],
                    "COLUMN": r["COLUMN"],
                    "TARGET_GAP_START": r["GAP_START"],
                    "TARGET_GAP_END": r["GAP_END"],
                    "FILLABLE_START": fs,
                    "FILLABLE_END": fe,
                    "N_STEPS_FILLABLE": steps,
                    "HOURS_FILLABLE": steps * hours_per_step,
                    "TARGET_N_STEPS_MISSING": int(r["N_STEPS_MISSING"]),
                    "COVERAGE_RATIO": steps / int(r["N_STEPS_MISSING"]),
                    "TARGET_GAP_KIND": r.get("GAP_KIND", "Unknown"),
                })
        if not out_rows:
            return pd.DataFrame(columns=[
                "TARGET_DATASET","SOURCE_DATASET","STATIONID","COLUMN",
                "TARGET_GAP_START","TARGET_GAP_END","FILLABLE_START","FILLABLE_END",
                "N_STEPS_FILLABLE","HOURS_FILLABLE","TARGET_N_STEPS_MISSING",
                "COVERAGE_RATIO","TARGET_GAP_KIND"
            ])
        return pd.DataFrame(out_rows).sort_values(
            ["STATIONID","COLUMN","TARGET_GAP_START","FILLABLE_START"]
        ).reset_index(drop=True)

    # B can fill A (subtract A's gaps by B's gaps)
    fill_B_to_A = _direction_fill(gaps_a, gapsB_lookup, target_label="A", source_label="B")
    # A can fill B
    fill_A_to_B = _direction_fill(gaps_b, gapsA_lookup, target_label="B", source_label="A")

    # Combine
    combined = pd.concat([fill_B_to_A, fill_A_to_B], ignore_index=True)
    return combined.sort_values(
        ["STATIONID","COLUMN","TARGET_DATASET","TARGET_GAP_START","FILLABLE_START"]
    ).reset_index(drop=True)


## New Functions

In [7]:
# check column names vs. ameriflux
import pandas as pd
from typing import List, Dict, Union

def process_and_match_columns(
    df_full: pd.DataFrame,
    amflux: Union[pd.DataFrame, pd.Series]
) -> pd.DataFrame:
    """
    Cleans column names of df_full by removing '_1', '_2', '_3', and '_4' 
    suffixes, compares the cleaned names against an 'amflux' variable list, 
    and returns a DataFrame of the results, along with printing the unmatched columns.

    Args:
        df_full: The DataFrame whose columns need to be cleaned and matched.
        amflux: A DataFrame or Series that contains the 'Variable' column 
                or is the Series of variables to match against.

    Returns:
        A DataFrame containing the original columns, the cleaned columns, 
        and a boolean indicating if the cleaned column is in the amflux list.
    """
    
    # 1. Column Cleaning Logic
    clean_columns = list(df_full.columns)
    
    # Iteratively remove suffixes: '_1', '_2', '_3', '_4'
    # This loop is a condensed way to achieve the same result as the four 
    # separate list comprehensions in the original code.
    suffixes_to_remove = ['_1', '_2', '_3', '_4']
    
    for suffix in suffixes_to_remove:
        clean_columns = [item.split(suffix)[0] for item in clean_columns]

    clean_columns_series = pd.Series(clean_columns)
    
    # 2. Determine the AMERIFLUX Variable List for Matching
    # Handle both Series and DataFrame inputs for amflux
    if isinstance(amflux, pd.DataFrame) and 'Variable' in amflux.columns:
        amflux_variables = amflux['Variable']
    elif isinstance(amflux, pd.Series):
        amflux_variables = amflux
    else:
        raise ValueError("The 'amflux' argument must be a pandas Series or a DataFrame with a 'Variable' column.")

    # 3. Matching
    is_in_amflux = clean_columns_series.isin(amflux_variables)
    
    # 4. Create Results DataFrame
    results_df = pd.DataFrame({
        'all_columns': df_full.columns,
        'clean_columns': clean_columns,
        'is_in_amflux': is_in_amflux
    })

    # 5. Print and Return
    unmatched_df = results_df[results_df.is_in_amflux == False].sort_values('clean_columns')
    
    print('COLUMNS NOT IN AMERIFLUX VARIABLE LIST\n')
    print(unmatched_df)
    
    return results_df
    

In [8]:
# validate test variables to equal 0, 1, 2

import pandas as pd
from typing import List, Dict

def validate_flags(df: pd.DataFrame, 
                   flag_columns: List[str] = ['FC_SSITC_TEST', 'LE_SSITC_TEST', 'ET_SSITC_TEST', 'H_SSITC_TEST',
       'TAU_SSITC_TEST'], 
                   allowed_values: List[int] = [0, 1, 2]) -> Dict[str, List]:
    """
    Checks specified DataFrame columns for values outside of the allowed set,
    including checking for NaN (missing) values.

    This is typically used for quality control (QC) flag columns which should 
    only contain specific integer values (like 0, 1, 2).

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame containing the flag columns.
    flag_columns : List[str]
        A list of column names to check.
    allowed_values : List[int]
        The list of values considered valid (defaults to [0, 1, 2]).

    Returns
    -------
    Dict[str, List]
        A dictionary where keys are the column names that failed validation,
        and values are a list of the unique, invalid values found in that column,
        including the string "NaN" if missing values are present.
    """
    
    # Convert allowed_values to a set for faster lookup
    allowed_set = set(allowed_values)
    
    # Dictionary to store results for columns that fail the validation
    invalid_columns = {}

    print(f"--- Starting Validation ---")
    print(f"Checking columns: {flag_columns}")
    print(f"Allowed values: {allowed_set}")

    for col in flag_columns:
        if col not in df.columns:
            print(f"Warning: Column '{col}' not found in DataFrame.")
            continue

        # 1. Find all unique values in the series, including NaNs
        unique_values = df[col].unique()

        # 2. Separate NaNs, valid flags, and invalid numeric flags
        invalid_numeric_flags = []
        nan_present = False
        
        for val in unique_values:
            if pd.isna(val):
                nan_present = True
            elif val not in allowed_set:
                invalid_numeric_flags.append(val)

        # 3. Construct the final report list (numeric values first, then "NaN" indicator)
        final_report_list = sorted(invalid_numeric_flags)
        
        if nan_present:
            final_report_list.append("NaN")
            
        if final_report_list:
            invalid_columns[col] = final_report_list
            print(f"FAIL: Column '{col}' contains unexpected values: {final_report_list}")
        else:
            print(f"PASS: Column '{col}' contains only valid values.")

    print(f"--- Validation Complete ---")
    return invalid_columns


In [9]:
# compare alignment between two files (one raw that is read in and one from micromet)
def compare_to_raw(raw_file_path, micromet_df, test_var = 'NETRAD', threshold=0.1):
    '''Compares a specific variable between a raw data file and a micromet DataFrame.

    The function reads a 'raw' DAT or CSV file from the provided path, merges it with the 
    'micromet' DataFrame based on TIMESTAMP to DATETIME_END fields, and calculates the absolute
    difference for a specified variable (`test_var`) between the two sources. It 
    returns only the rows where this absolute difference is greater than the given 
    `threshold`.

    Args:
        raw_file_path (str): The file path to the raw data CSV file. This file is 
                             assumed to have a specific format (header on row 1, with 
                             rows 2 and 3 skipped).
        micromet_df (pd.DataFrame): DataFrame containing the micrometeorological data.
        test_var (str, optional): The variable to compare (e.g., 'LE' for Latent Energy). 
                                  Defaults to 'LE'. The function assumes the raw 
                                  column is named '{test_var}_1_1_1' and the micromet 
                                  column is named '{test_var}'.
        threshold (float, optional): The absolute difference threshold. Rows where 
                                     |raw_value - micromet_value| > threshold are returned. 
                                     Defaults to 0.1.

    Returns:
        pd.DataFrame: A DataFrame containing the 'DATETIME_END' and the values of the 
                      `test_var` from both sources ('{test_var}_1_1_1' and '{test_var}') 
                      for all rows where the absolute difference exceeds the `threshold`.
    '''
    raw = pd.read_csv(raw_file_path, skiprows=[2,3], header=1, low_memory=False)
    raw['TIMESTAMP'] = pd.to_datetime(raw['TIMESTAMP'])

    combo = raw.merge(micromet_df, how='inner', left_on='TIMESTAMP', right_on='DATETIME_END',
                      suffixes=['_raw', '_micromet'])

    le_diff = combo[f'{test_var}_1_1_1'] -combo[f'{test_var}'].astype('float')
    value_differences = combo.loc[(le_diff.abs()>threshold), ['DATETIME_END',f'{test_var}_1_1_1', f'{test_var}']]
    return(value_differences)

In [29]:
def validate_timestamp_consistency(df: pd.DataFrame) -> pd.DataFrame:
    """
    Checks for consistency between a standardized datetime column (DATETIME_END)
    and a string/integer timestamp column (TIMESTAMP_START) formatted as YYYYMMDDHHMM.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame containing the columns to check.

    Returns
    -------
    pd.DataFrame
        A DataFrame containing only the rows where the DATETIME_END and
        the converted TIMESTAMP_END columns do not match, along with both columns
        for inspection. Returns an empty DataFrame if all rows match.
    """
    df = df.copy()
    
    REQUIRED_COLS = ['DATETIME_END', 'TIMESTAMP_END']

    if not all(col in df.columns for col in REQUIRED_COLS):
        print(f"Error: DataFrame must contain both {REQUIRED_COLS} columns.")
        return pd.DataFrame()

    print("\n--- Starting Timestamp Consistency Validation ---")

    # Ensure DATETIME_END is properly parsed datetime object
    df['DATETIME_END_DT'] = pd.to_datetime(df['DATETIME_END'], errors='coerce')

    # Convert TIMESTAMP_END (e.g., 202406241430) to a datetime object
    # We convert to string first to handle both int and string inputs
    df['TIMESTAMP_END_DT'] = pd.to_datetime(
        df['TIMESTAMP_END'].astype(str), 
        format='%Y%m%d%H%M', 
        errors='coerce'
    )

    # Compare the two generated datetime columns
    # We use .notna() to ignore rows where either conversion failed (coerced to NaT)
    mismatch_mask = (df['DATETIME_END_DT'] != df['TIMESTAMP_END_DT']) & \
                    (df['DATETIME_END_DT'].notna()) & \
                    (df['TIMESTAMP_END_DT'].notna())

    # Filter for mismatches and report
    mismatch_report = df.loc[mismatch_mask, REQUIRED_COLS + ['DATETIME_END_DT', 'TIMESTAMP_END_DT']].copy()
    
    if mismatch_report.empty:
        print("PASS: DATETIME_END and TIMESTAMP_END are perfectly consistent (where both are valid).")
    else:
        print(f"FAIL: Found {len(mismatch_report)} inconsistent rows.")
        
    print("--- Timestamp Consistency Validation Complete ---")


    
    return mismatch_report

# Define the root folder for the data

In [11]:
raw_fold = pathlib.Path(f'M:/Shared drives/UGS_Flux/Data_Downloads/compiled')

#amflux column data
amflux = pd.read_csv(r'M:\Shared drives\UGS_Flux\Data_Downloads\compiled\flux-met_processing_variables_20250818.csv')

# Review Some Raw Files for Escalante

In [None]:
raw_folder = r'M:\My Drive\projects\eddy_covariance\site_specific_data_review'
rawcs = pd.read_csv(Path(raw_folder, 'Escalante_Flux_CSFormat.dat'),
                    skiprows=[2,3], header=1)
rawaf = pd.read_csv(Path(raw_folder, 'Escalante_Flux_AmeriFluxFormat.dat'),
                    skiprows=[2,3], header=1)

In [None]:
print(rawcs.columns.sort_values())
print(rawaf.columns.sort_values())

In [None]:
# look at soil heat flux = 0 records
rawcs.loc[rawcs.G_plate_1_1_2==0, ['TIMESTAMP', 'G_plate_1_1_2', 'SG_1_1_2', 'G_1_1_2']].head(3)

In [None]:
# look at datetime alignment- looks aligned!
# note that cs only has a timestamp column and af has timestamp, timestamp_end and timestamp_start columns!!
combo = rawcs.merge(rawaf, how='inner', left_on='TIMESTAMP',right_on='TIMESTAMP', suffixes=['_cs', '_af'])
combo[['TIMESTAMP', 'TIMESTAMP_END', 'LE_cs', 'LE_af']].head(3)

In [None]:
rawaf[['TIMESTAMP', 'TIMESTAMP_START', 'TIMESTAMP_END']].head(3)

# Run Compilation

Define the site folders and stations

In [None]:
site_folders = {'US-UTD':'Dugout_Ranch',
                'US-UTB':'BSF',
                'US-UTJ':'Bluff',
                'US-UTW':'Wellington',
                'US-UTE':'Escalante',
                'US-UTM':'Matheson',
                'US-UTP':'Phrag',
                'US-CdM':'Cedar_mesa',
                'US-UTV':'Desert_View_Myton',
                'US-UTN':'Juab',
                'US-UTG':'Green_River',
                'US-UTL':'Pelican_Lake',
                }

loggerids = {
    "eddy": {
        "US-UTD": 21314,
        "US-UTB": 27736,
        "US-UTJ": 21020,
        "US-UTW": 21025,
        "US-UTE": 21021,
        "US-UTM": 21029,
        "US-UTP": 8442,
        "US-CdM": 21313,
        "US-UTV": 21027,
        "US-UTN": 8441,
        "US-UTG": 25415,
        "US-UTL": 21215,
    },
    "met": {
        "US-UTD": 21031,
        "US-UTB": 27736,
        "US-UTJ": 21030,
        "US-UTW": 21026,
        "US-UTE": 21032,
        "US-UTM": 21023,
        "US-UTP": 8441,
        "US-CdM": 21029,
        "US-UTV": 21311,
        "US-UTG": 25414,
        "US-UTL": 21028,
    },
}

# Met

### Compile Met Statistics Tables

In [None]:

stats = {}
for key, value in site_folders.items():
    print(f"Processing site: {key} - {value}")
    parent_fold = raw_fold / f"{key}" / "Statistics"
    am_df = {}
    i=0
    #raw_data = micromet.raw_file_compile(raw_fold, parent_fold, search_str = "TOA5*Statistics*.dat")    
    for file_name in parent_fold.glob("TOA5*Statistics*.dat"):
        i += 1
        #print(f"Processing file: {file_name}")
        sts = pd.read_csv(file_name, skiprows = [0,2,3])
        for col in sts.columns:
            if col.endswith("_Avg"):
                sts.rename(columns={col: col[:-4]}, inplace=True)
            elif col.endswith("_Tot"):
                sts.rename(columns={col: col[:-4]}, inplace=True)
        sts['TIMESTAMP'] = pd.to_datetime(sts['TIMESTAMP'])
        sts['DATETIME_END'] = sts['TIMESTAMP']
        sts["TIMESTAMP_START"] = sts["TIMESTAMP"].apply(lambda x: f"{x:%Y%m%d%H%M}")
        am_data = micromet.Reformatter(drop_soil=False, logger=logger,)
        #raw_data = raw_data.drop([0], axis=0)
        df, report = am_data.prepare(sts, data_type="met")
        am_df[file_name.stem] = df
    if i > 0:
        stats[key] = pd.concat(am_df)



In [None]:
stats_met = pd.concat(stats)
stats_met = stats_met.reset_index().rename(columns={'level_0':'STATIONID'})
stats_met = stats_met.drop(['level_1'],axis=1)
stats_met = stats_met.drop_duplicates(subset=['STATIONID','DATETIME_END'])
stats_met = stats_met.set_index(['STATIONID','DATETIME_END'])
stats_met = stats_met.mask(stats_met < -5000)
stats_met['DATALOGGER_NO'] = stats_met.index.get_level_values(0).map(loggerids['met'])
stats_met.to_parquet(raw_fold / "comp_met_stat.parquet")
gaps_metstat = summarize_gaps(stats_met)

gaps_metstat.to_parquet(raw_fold / "gaps_metstat.parquet")
print(stats_met.index.get_level_values(0).unique())
for col in sorted(stats_met.columns):
    print(col)


bal = stats_met.loc['US-UTE'].sort_index()

fig = go.Figure()
fig.add_trace(go.Scatter(x=bal.index, y=bal['NETRAD_1_1_2'].shift(-1)*2, mode="lines", name="NETRAD",
                         hovertemplate="Date: %{x|%Y-%m-%d %H:%M}<br>B: %{y:.2f}<extra></extra>"))
fig.update_layout(
    title="Two Time Series (single plot)",
    xaxis=dict(title="Time", rangeslider=dict(visible=True), type="date"),
    yaxis_title="Value",
    legend=dict(orientation="h", y=1.02, x=1, xanchor="right", yanchor="bottom"),
    height=500, margin=dict(l=60, r=30, t=60, b=40),
)
fig.show()

### Compile Statistics Ameriflux .dat Tables

In [None]:
comp_met_df = {}
outlier_report = {}

am = micromet.AmerifluxDataProcessor(logger=logger)

for key, value in site_folders.items():

    parent_fold = raw_fold / f"{key}" / "Statistics_Ameriflux"
    raw_data = am.raw_file_compile(raw_fold, parent_fold, search_str = "*Statistics_AmeriFlux*.dat")
    if raw_data is not None:
        am_data = micromet.Reformatter(drop_soil=False,
                                       logger=logger,
                                       )
        #raw_data = raw_data.drop([0], axis=0)
        am_df, report = am_data.prepare(raw_data, data_type="met")
        comp_met_df[key] = am_df
        outlier_report[key] = report

        timestart = am_df['TIMESTAMP_START'].values[0]
        timeend = am_df['TIMESTAMP_END'].values[-1]

        am_df.to_csv(raw_fold / f"{key}" / f"{key}-met_HH_{timestart:}_{timeend:}.csv")


In [None]:
comp_met = pd.concat(comp_met_df)

comp_met = comp_met.reset_index().rename(columns={'level_0':'STATIONID'})
#comp_met = comp_met.drop(['level_1'],axis=1)
comp_met = comp_met.drop_duplicates(subset=['STATIONID','DATETIME_END'])
comp_met = comp_met.set_index(['STATIONID','DATETIME_END'])
comp_met = comp_met.mask(comp_met < -5000)
comp_met['DATALOGGER_NO'] = comp_met.index.get_level_values(0).map(loggerids['met'])
gaps_met = summarize_gaps(comp_met)

gaps_met.to_parquet(raw_fold / "gaps_met.parquet")
comp_met.to_parquet(raw_fold / "comp_met.parquet")

out_report_met = pd.concat(outlier_report)
out_report_met.to_csv(raw_fold / "outlier_report_met.csv")

print(comp_met.index.get_level_values(0).unique())
for col in sorted(comp_met.columns):
    print(col)

In [None]:
bal = comp_met.loc['US-UTV']

fig = go.Figure()
fig.add_trace(go.Scatter(x=bal.index, y=bal['NETRAD_1_1_2'].shift(-1)*2, mode="lines", name="NETRAD",
                         hovertemplate="Date: %{x|%Y-%m-%d %H:%M}<br>B: %{y:.2f}<extra></extra>"))
fig.update_layout(
    title="Two Time Series (single plot)",
    xaxis=dict(title="Time", rangeslider=dict(visible=True), type="date"),
    yaxis_title="Value",
    legend=dict(orientation="h", y=1.02, x=1, xanchor="right", yanchor="bottom"),
    height=500, margin=dict(l=60, r=30, t=60, b=40),
)
fig.show()

# Eddy

In [12]:
site_folders = {#'US-UTD':'Dugout_Ranch',
                #'US-UTB':'BSF',
                #'US-UTJ':'Bluff',
                #'US-UTW':'Wellington',
                'US-UTE':'Escalante',
                #'US-UTM':'Matheson',
                #'US-UTP':'Phrag',
                #'US-CdM':'Cedar_mesa',
                #'US-UTV':'Desert_View_Myton',
                #'US-UTN':'Juab',
                #'US-UTG':'Green_River',
                #'US-UTL':'Pelican_Lake',
                }

## Compile data from an individual Dat file

In [None]:
easyfluxdf = {}
ef_reports = {}

file = r'M:\My Drive\projects\eddy_covariance\site_specific_data_review\Escalante_Flux_AmeriFluxFormat.dat'


am_data = micromet.Reformatter(drop_soil=True,
                                    logger=logger,
                                    )
df = pd.read_csv(file,skiprows=[0,2,3],
                na_values=[-9999,"NAN","NaN","nan"])

df['TIMESTAMP'] = pd.to_datetime(df['TIMESTAMP'])

am_df, report = am_data.prepare(df, data_type="eddy")


In [None]:
# compare TEST vs. raw
rawaf = pd.read_csv(Path(raw_folder, 'Escalante_Flux_AmeriFluxFormat.dat'),
                    skiprows=[2,3], header=1,
                    na_values=[-9999,"NAN","NaN","nan"])

category_mapping = {1:0, 2:0, 3:0, 4:1, 5:1, 6:1, 7:2, 8:2, 9:2}
rawaf['reclass_fc_test'] = rawaf['FC_SSITC_TEST'].replace(category_mapping)

print(rawaf['reclass_fc_test'].value_counts())
print(am_df['FC_SSITC_TEST'].value_counts())
print(rawaf.FC_SSITC_TEST.isna().sum())


In [None]:
print(rawaf.columns.sort_values())
rawaf.loc[(rawaf.TAU_SSITC_TEST.isna()) & (~rawaf.TAU.isna()),
          [ 'TAU', 'TAU_SSITC_TEST']]

In [None]:
rawaf['converted_FC_TEST'] = rawaf['FC_SSITC_TEST'].astype('float', errors='ignore')

# 2. Apply the reclassification function to the integer series.
rawaf['converted_FC_TEST'] = rawaf['converted_FC_TEST'].apply(rating)

# The final result will be the value counts of the reclassified categories (0, 1, 2).
rawaf.loc[rawaf.FC_SSITC_TEST.isna(),['converted_FC_TEST', 'FC_SSITC_TEST']]


## Compile Downloaded Eddy Data from EasyFluxWeb

In [None]:
# processing one .dat file per station that is in station folder with "*_Flux_AmeriFluxFormat.dat"
easyfluxdf = {}
ef_reports = {}

for key, value in site_folders.items():
    site_dir = raw_fold / key
    print(site_dir)
    for file in site_dir.glob("*_Flux_AmeriFluxFormat.dat"):
        print(file)
        am_data = micromet.Reformatter(drop_soil=True,
                                            logger=logger,
                                            )
        df = pd.read_csv(file,skiprows=[0,2,3],
                        na_values=[-9999,"NAN","NaN","nan"])
        
        am_df, report = am_data.prepare(df, data_type="eddy")
        easyfluxdf[key] = am_df
        ef_reports[key] = report

ef_report = pd.concat(ef_reports, axis=1)
easyflux = pd.concat(easyfluxdf)

ef_report.to_csv(raw_fold / "easyflux_report_diane.csv")

INFO [2025-10-02 13:41:41] __main__ – Starting reformat (20267 rows)
DEBUG [2025-10-02 13:41:41] __main__ – TS col TIMESTAMP_END
DEBUG [2025-10-02 13:41:41] __main__ – TIMESTAMP_END col 202406241500


M:\Shared drives\UGS_Flux\Data_Downloads\compiled\US-UTE
M:\Shared drives\UGS_Flux\Data_Downloads\compiled\US-UTE\Escalante_Flux_AmeriFluxFormat.dat


DEBUG [2025-10-02 13:41:41] __main__ – Len of unfixed timestamps 20267
DEBUG [2025-10-02 13:41:41] __main__ – Len of fixed timestamps 20267
DEBUG [2025-10-02 13:41:41] __main__ – Renaming columns from Index(['RECORD', 'TIMESTAMP_START', 'TIMESTAMP_END', 'CO2', 'CO2_SIGMA', 'H2O',
       'H2O_SIGMA', 'FC', 'FC_SSITC_TEST', 'LE', 'LE_SSITC_TEST', 'ET',
       'ET_SSITC_TEST', 'H', 'H_SSITC_TEST', 'G', 'SG', 'FETCH_MAX',
       'FETCH_90', 'FETCH_55', 'FETCH_40', 'WD', 'WS', 'WS_MAX', 'USTAR', 'ZL',
       'TAU', 'TAU_SSITC_TEST', 'MO_LENGTH', 'U', 'U_SIGMA', 'V', 'V_SIGMA',
       'W', 'W_SIGMA', 'PA', 'TA_1_1_1', 'RH_1_1_1', 'T_DP_1_1_1', 'TA_1_1_2',
       'RH_1_1_2', 'T_DP_1_1_2', 'TA_1_1_3', 'RH_1_1_3', 'T_DP_1_1_3',
       'TA_1_1_4', 'VPD', 'T_SONIC', 'T_SONIC_SIGMA', 'PBLH', 'TS_1_1_1',
       'TS_1_1_2', 'SWC_1_1_1', 'SWC_1_1_2', 'ALB', 'NETRAD', 'SW_IN',
       'SW_OUT', 'LW_IN', 'LW_OUT', 'P', 'DATETIME_END'],
      dtype='object') to {'ET': 'ET_1_1_1', 'LE': 'LE_1_1_1', 'H': '

In [121]:
# review variables with a lot of dropped values based on the report
report_stacked = ef_report.stack(level=0)

report_final = report_stacked.reset_index(level=1)
report_final = report_final.rename(columns={'level_1': 'STATIONID'})

report_final[report_final.pct_flagged>=10]





Unnamed: 0,STATIONID,column,matched_key,min,max,n_below,n_above,n_flagged,pct_flagged
24,US-UTE,RECORD,RECO,-20.0,50.0,0,20015,20015,94.830854
21,US-UTE,SW_IN_1_1_1,SW_IN,0.0,1300.0,9573,8,9581,45.394675
50,US-UTE,G_1_1_A,G,-250.0,400.0,96,6357,6453,30.574244
43,US-UTE,ET_1_1_1,ET,0.0,20.0,2322,0,2322,11.001611


In [122]:
# run various tests on data
raw_file = r'M:\My Drive\projects\eddy_covariance\site_specific_data_review\Escalante_Flux_AmeriFluxFormat.dat'

easyflux = easyflux.reset_index().rename(columns={'level_0':'STATIONID'})

validate_flags(easyflux)
print('\n')

results = process_and_match_columns(easyflux, amflux)
print('\n')

validate_timestamp_consistency(easyflux)
print('\n')

differences_from_raw = compare_to_raw(raw_file, easyflux, test_var='NETRAD', threshold=0.1)
print('\n')
print(f'Differences between raw and micromet files')
print(differences_from_raw)

--- Starting Validation ---
Checking columns: ['FC_SSITC_TEST', 'LE_SSITC_TEST', 'ET_SSITC_TEST', 'H_SSITC_TEST', 'TAU_SSITC_TEST']
Allowed values: {0, 1, 2}
PASS: Column 'FC_SSITC_TEST' contains only valid values.
PASS: Column 'LE_SSITC_TEST' contains only valid values.
PASS: Column 'ET_SSITC_TEST' contains only valid values.
PASS: Column 'H_SSITC_TEST' contains only valid values.
PASS: Column 'TAU_SSITC_TEST' contains only valid values.
--- Validation Complete ---


COLUMNS NOT IN AMERIFLUX VARIABLE LIST

      all_columns  clean_columns  is_in_amflux
1    DATETIME_END   DATETIME_END         False
12       ET_1_1_1             ET         False
13  ET_SSITC_TEST  ET_SSITC_TEST         False
21       FETCH_40          FETCH         False
20       FETCH_55       FETCH_55         False
50         PBLH_F         PBLH_F         False
0       STATIONID      STATIONID         False
39     T_DP_1_1_1           T_DP         False
42     T_DP_1_2_1           T_DP         False
45     T_DP_1_3_1

In [126]:
# check for any duplicates and export parquette file
easyflux_final = easyflux.reset_index().rename(columns={'level_0':'STATIONID'})
if len (easyflux_final[easyflux_final.duplicated(subset=['STATIONID','DATETIME_END'])])>0:
    print('FAIL: STATIONID AND DATETIME_END DUPLICATES PRESENT')
    print('DROPPING DUPLICATES')
    easyflux_final = easyflux_final.drop_duplicates(subset=['STATIONID','DATETIME_END'])

else:
    print("PASS: NO STATIONID AND DATETIME_END DUPLICATES")

easyflux_final = easyflux_final.set_index(['STATIONID','DATETIME_END'])
easyflux_final = easyflux_final.mask(easyflux_final < -5000)
easyflux_final.to_parquet(raw_fold / "easyflux_diane.parquet")


PASS: NO STATIONID AND DATETIME_END DUPLICATES


In [127]:
# summarize and view data gaps (view for just one station)

gaps_easyflux = summarize_gaps(easyflux_final)

bal = easyflux_final.loc['US-UTE']
plotlystuff([bal, bal], ['LE_1_1_1', 'NETRAD_1_1_1'])

## Compile Ameriflux Format dat files from Dataloggers

In [32]:
# this script is using raw_file_compile for each site
# which I assume is helping fix header issues, etc.
# then exporting data to a csv file that will be read in to create a file with all of the data
comp_edd_df = {}
outlier_reports = {}

am = micromet.AmerifluxDataProcessor(logger=logger)

for key, value in site_folders.items():

    parent_fold = raw_fold / f"{key}" / "AmeriFluxFormat"
    #ahp.scan(parent_fold, min_sim=0.3, backup=False)
    #pths = micromet.fix_all_in_parent(parent_fold)
    print(raw_fold)
    raw_data = am.raw_file_compile(raw_fold, parent_fold, search_str = "*Flux_AmeriFluxFormat*.dat")
    if raw_data is not None:
        am_data = micromet.Reformatter(drop_soil=False,
                                       logger=logger,
                                       )
        #raw_data = raw_data.drop([0], axis=0)
        am_df, report = am_data.prepare(raw_data, data_type="eddy")
        comp_edd_df[key] = am_df
        outlier_reports[key] = report

        timestart = am_df['TIMESTAMP_START'].values[0]
        timeend = am_df['TIMESTAMP_END'].values[-1]
        am_df.to_csv(raw_fold / f"{key}" / f"{key}_HH_{timestart:}_{timeend:}_diane.csv")

outlier_report = pd.concat(outlier_reports, axis=1)
datalogger_dat = pd.concat(comp_edd_df)



INFO [2025-10-02 11:14:17] __main__ – Compiling data from M:\Shared drives\UGS_Flux\Data_Downloads\compiled\US-UTE\AmeriFluxFormat
INFO [2025-10-02 11:14:17] __main__ – Processing file: M:\Shared drives\UGS_Flux\Data_Downloads\compiled\US-UTE\AmeriFluxFormat\21021_Flux_AmeriFluxFormat_5_2.dat
DEBUG [2025-10-02 11:14:17] __main__ – 2 -> 21021
DEBUG [2025-10-02 11:14:18] __main__ – Header row detected: ['TIMESTAMP_START', 'TIMESTAMP_END', 'CO2', 'CO2_SIGMA', 'H2O', 'H2O_SIGMA', 'FC', 'FC_SSITC_TEST', 'LE', 'LE_SSITC_TEST', 'ET', 'ET_SSITC_TEST', 'H', 'H_SSITC_TEST', 'G', 'SG', 'FETCH_MAX', 'FETCH_90', 'FETCH_55', 'FETCH_40', 'WD', 'WS', 'WS_MAX', 'USTAR', 'ZL', 'TAU', 'TAU_SSITC_TEST', 'MO_LENGTH', 'U', 'U_SIGMA', 'V', 'V_SIGMA', 'W', 'W_SIGMA', 'PA', 'TA_1_1_1', 'RH_1_1_1', 'T_DP_1_1_1', 'TA_1_1_2', 'RH_1_1_2', 'T_DP_1_1_2', 'TA_1_1_3', 'RH_1_1_3', 'T_DP_1_1_3', 'TA_1_1_4', 'VPD', 'T_SONIC', 'T_SONIC_SIGMA', 'PBLH', 'TS_1_1_1', 'TS_1_1_2', 'SWC_1_1_1', 'SWC_1_1_2', 'ALB', 'NETRAD', 'SW_

M:\Shared drives\UGS_Flux\Data_Downloads\compiled


DEBUG [2025-10-02 11:14:18] __main__ – 1 -> 21021
DEBUG [2025-10-02 11:14:18] __main__ – Header row detected: ['TIMESTAMP_START', 'TIMESTAMP_END', 'CO2', 'CO2_SIGMA', 'H2O', 'H2O_SIGMA', 'FC', 'FC_SSITC_TEST', 'LE', 'LE_SSITC_TEST', 'ET', 'ET_SSITC_TEST', 'H', 'H_SSITC_TEST', 'G', 'SG', 'FETCH_MAX', 'FETCH_90', 'FETCH_55', 'FETCH_40', 'WD', 'WS', 'WS_MAX', 'USTAR', 'ZL', 'TAU', 'TAU_SSITC_TEST', 'MO_LENGTH', 'U', 'U_SIGMA', 'V', 'V_SIGMA', 'W', 'W_SIGMA', 'PA', 'TA_1_1_1', 'RH_1_1_1', 'T_DP_1_1_1', 'TA_1_1_2', 'RH_1_1_2', 'T_DP_1_1_2', 'TA_1_1_3', 'RH_1_1_3', 'T_DP_1_1_3', 'TA_1_1_4', 'VPD', 'T_SONIC', 'T_SONIC_SIGMA', 'PBLH', 'TS_1_1_1', 'TS_1_1_2', 'SWC_1_1_1', 'SWC_1_1_2', 'ALB', 'NETRAD', 'SW_IN', 'SW_OUT', 'LW_IN', 'LW_OUT', 'P']
DEBUG [2025-10-02 11:14:18] __main__ – Skip rows for set to 1
DEBUG [2025-10-02 11:14:18] __main__ – Reading M:\Shared drives\UGS_Flux\Data_Downloads\compiled\US-UTE\AmeriFluxFormat\21021_Flux_AmeriFluxFormat_5_1.dat
INFO [2025-10-02 11:14:18] __main__ – 

In [20]:
# review variables with a lot of dropped values based on the report
report_stacked = outlier_report.stack(level=0)

report_final = report_stacked.reset_index(level=1)
report_final = report_final.rename(columns={'level_1': 'STATIONID'})

report_final[report_final.pct_flagged>=10]





Unnamed: 0,STATIONID,column,matched_key,min,max,n_below,n_above,n_flagged,pct_flagged
21,US-UTE,SW_IN_1_1_1,SW_IN,0.0,1300.0,9039,13,9052,31.982475
51,US-UTE,G_1_1_A,G,-250.0,400.0,96,6580,6676,23.587606


In [33]:
# run various tests on data
raw_file = r'M:\My Drive\projects\eddy_covariance\site_specific_data_review\Escalante_Flux_AmeriFluxFormat.dat'

dataloggerdf = datalogger_dat.reset_index().rename(columns={'level_0':'STATIONID'})

validate_flags(dataloggerdf)
print('\n')

results = process_and_match_columns(dataloggerdf, amflux)
print('\n')

validate_timestamp_consistency(dataloggerdf)
print('\n')

differences_from_raw = compare_to_raw(raw_file, dataloggerdf, test_var='NETRAD', threshold=0.1)
print('\n')
print(f'Differences between micromet (left) and raw (right) files')
print(differences_from_raw)

--- Starting Validation ---
Checking columns: ['FC_SSITC_TEST', 'LE_SSITC_TEST', 'ET_SSITC_TEST', 'H_SSITC_TEST', 'TAU_SSITC_TEST']
Allowed values: {0, 1, 2}
PASS: Column 'FC_SSITC_TEST' contains only valid values.
PASS: Column 'LE_SSITC_TEST' contains only valid values.
PASS: Column 'ET_SSITC_TEST' contains only valid values.
PASS: Column 'H_SSITC_TEST' contains only valid values.
PASS: Column 'TAU_SSITC_TEST' contains only valid values.
--- Validation Complete ---


COLUMNS NOT IN AMERIFLUX VARIABLE LIST

           all_columns       clean_columns  is_in_amflux
64     BATTERY_VOLTAGE     BATTERY_VOLTAGE         False
65  CO2_SIG_STRGTH_MIN  CO2_SIG_STRGTH_MIN         False
63       DATALOGGER_NO       DATALOGGER_NO         False
1         DATETIME_END        DATETIME_END         False
12            ET_1_1_1                  ET         False
13       ET_SSITC_TEST       ET_SSITC_TEST         False
21            FETCH_40               FETCH         False
20            FETCH_55         

In [34]:
# check for any duplicates and export parquette file
datalogger_final = dataloggerdf.rename(columns={'BATTERY_VOLTAGE':'V_BATT'}) # tried to fix this with the refromatter_vars but I must have done something wrong...

datalogger_final = datalogger_final.reset_index().rename(columns={'level_0':'STATIONID'})
if len (datalogger_final[datalogger_final.duplicated(subset=['STATIONID','DATETIME_END'])])>0:
    print('FAIL: STATIONID AND DATETIME_END DUPLICATES PRESENT')
    print('DROPPING DUPLICATES')
    datalogger_final = datalogger_final.drop_duplicates(subset=['STATIONID','DATETIME_END'])

else:
    print("PASS: NO STATIONID AND DATETIME_END DUPLICATES")

datalogger_final = datalogger_final.set_index(['STATIONID','DATETIME_END'])
datalogger_final = datalogger_final.mask(datalogger_final < -5000)
datalogger_final.to_parquet(raw_fold / "datalogger_diane.parquet")

PASS: NO STATIONID AND DATETIME_END DUPLICATES


In [43]:
# summarize and view data gaps (view for just one station)

gaps_datalogger = summarize_gaps(datalogger_final)

bal = datalogger_final.loc['US-UTE']
plotlystuff([bal, bal], ['LE_1_1_1', 'NETRAD_1_1_1'])

In [None]:
# this is just a check on the pre-install Escalante data
# looks like it is almost all just the TEST variables and a few other things
# can definitely drop all these data
check_date = pd.to_datetime('2024-06-01')
bal_sub = bal[bal.index<check_date]
bal_sub.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 6280 entries, 2024-01-22 04:00:00 to 2024-05-31 23:30:00
Data columns (total 66 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   index               6280 non-null   int64  
 1   TIMESTAMP_START     6280 non-null   int64  
 2   TIMESTAMP_END       6280 non-null   int64  
 3   CO2_1_1_1           0 non-null      float64
 4   CO2_SIGMA           0 non-null      float64
 5   H2O_1_1_1           0 non-null      float64
 6   H2O_SIGMA           0 non-null      float64
 7   FC_1_1_1            0 non-null      float64
 8   FC_SSITC_TEST       6280 non-null   int64  
 9   LE_1_1_1            0 non-null      float64
 10  LE_SSITC_TEST       6280 non-null   int64  
 11  ET_1_1_1            0 non-null      float64
 12  ET_SSITC_TEST       6280 non-null   int64  
 13  H_1_1_1             0 non-null      float64
 14  H_SSITC_TEST        6280 non-null   int64  
 15  G_1_1_A            

In [35]:
# Paul read in files after they were exported, above, and then put together into a  
# parqeutte file

# cmp_edd_df = {}

# for key, value in site_folders.items():
#     for file in (raw_fold / f"{key}").glob(f"{key}_HH_*.csv"):
#         print(file) 
#         df = pd.read_csv(file, index_col=0)
#         df.index = pd.to_datetime(df.index)
#         df = df.sort_index()
#         df = df.drop_duplicates(subset=['TIMESTAMP_START','TIMESTAMP_END'])
#         cmp_edd_df[key] = df

## Compile CSFormat Files

In [None]:
# processing one .dat file per station that is in station folder with "*_Flux_AmeriFluxFormat.dat"
csdf = {}
cs_reports = {}

for key, value in site_folders.items():
    file_pattern = raw_fold / key / '*_Flux_CSFormat.dat'
    file_to_read = next(raw_fold.glob(str(file_pattern.relative_to(raw_fold))))
    df = pd.read_csv(file_to_read,skiprows=[0,2,3],
                    na_values=[-9999,"NAN","NaN","nan"])
    am_data = micromet.Reformatter(drop_soil=True,
                                        logger=logger,
                                        )
    df = pd.read_csv(file,skiprows=[0,2,3],
                    na_values=[-9999,"NAN","NaN","nan"])
    
    csflux_temp, report = am_data.prepare(df, data_type="eddy")
    csdf[key] = csflux_temp
    cs_reports[key] = report

ef_report = pd.concat(ef_reports, axis=1)
cs_ = pd.concat(easyfluxdf)

# ef_report.to_csv(raw_fold / "easyflux_report_diane.csv")

In [167]:
# this outputs each site file into that sites folder

cs_df = {}
outlier_reports = {}

am = micromet.AmerifluxDataProcessor(logger=logger)

for key, value in site_folders.items():
    sitedf = {}
    sitereport = {}
    parent_fold = raw_fold / f"{key}" / "Flux_CSFormat"
    #ahp.scan(parent_fold, min_sim=0.3, backup=False)
    #pths = micromet.fix_all_in_parent(parent_fold)
    for file in parent_fold.glob("*_Flux_CSFormat*.dat"):
        am_data = micromet.Reformatter(drop_soil=False,
                                            logger=logger,
                                            )
        df = pd.read_csv(file,skiprows=[0,2,3],
                        na_values=[-9999,"NAN","NaN","nan"])
        # must create a timestamp_end column to feed into prepare
        # b/c otherwise no data will be returned
        df['TIMESTAMP'] = pd.to_datetime(df['TIMESTAMP'])
        df["TIMESTAMP_END"] = df.TIMESTAMP.dt.strftime("%Y%m%d%H%M").astype(int)
        
        csprep, report = am_data.prepare(df, data_type="eddy")
        sitedf[file] = csprep
        sitereport[file] = report
    cs_df[key] = pd.concat(sitedf)
    outlier_reports[key] = pd.concat(sitereport)



outlier_report = pd.concat(outlier_reports, axis=1).droplevel(level=0, axis=0)
cs_dat = pd.concat(cs_df).droplevel(level=1, axis=0)

INFO [2025-10-02 13:35:23] __main__ – Starting reformat (1081 rows)
DEBUG [2025-10-02 13:35:23] __main__ – TS col TIMESTAMP_END
DEBUG [2025-10-02 13:35:23] __main__ – TIMESTAMP_END col 202504270030
DEBUG [2025-10-02 13:35:23] __main__ – Len of unfixed timestamps 1081
DEBUG [2025-10-02 13:35:23] __main__ – Len of fixed timestamps 1081
DEBUG [2025-10-02 13:35:23] __main__ – Renaming columns from Index(['RECORD', 'FC_mass', 'FC_QC', 'FC_samples', 'LE', 'LE_QC', 'LE_samples',
       'H', 'H_QC', 'H_samples', 'NETRAD', 'G', 'SG', 'energy_closure',
       'poor_enrg_clsur', 'Bowen_ratio', 'TAU', 'TAU_QC', 'USTAR', 'TSTAR',
       'TKE', 'TA_1_1_1', 'RH_1_1_1', 'T_DP_1_1_1', 'e_amb', 'e_sat_amb',
       'TA_1_1_2', 'RH_1_1_2', 'T_DP_1_1_2', 'e', 'e_sat', 'TA_1_1_3',
       'RH_1_1_3', 'T_DP_1_1_3', 'e_probe', 'e_sat_probe', 'H2O_density_probe',
       'PA', 'VPD', 'TA_1_1_4', 'Ux', 'Ux_SIGMA', 'Uy', 'Uy_SIGMA', 'Uz',
       'Uz_SIGMA', 'T_SONIC', 'T_SONIC_SIGMA', 'sonic_azimuth', 'WS',
      

In [174]:
len(outlier_report)

1856

In [169]:
# review variables with a lot of dropped values based on the report
report_stacked = outlier_report.stack(level=0)

report_final = report_stacked.reset_index(level=1)
report_final = report_final.rename(columns={'level_1': 'STATIONID'})

report_final.to_csv(raw_fold / "csflux_report_diane.csv")

report_final[report_final.pct_flagged>=10]





Unnamed: 0,STATIONID,column,matched_key,min,max,n_below,n_above,n_flagged,pct_flagged
17,US-UTE,CO2_SIG_STRGTH_MIN,CO2,150.0,1200.0,1081,0,1081,100.000000
14,US-UTE,RECORD,RECO,-20.0,50.0,0,1081,1081,100.000000
16,US-UTE,CO2_DENSITY_SIGMA,CO2,150.0,1200.0,1080,0,1080,99.907493
30,US-UTE,FC_SAMPLES,FC,-100.0,100.0,0,1077,1077,99.629972
60,US-UTE,H_SAMPLES,H,-450.0,900.0,0,1076,1076,99.537465
...,...,...,...,...,...,...,...,...,...
60,US-UTE,H_SAMPLES,H,-450.0,900.0,0,419,419,99.761905
33,US-UTE,LE_SAMPLES,LE,-450.0,900.0,0,419,419,99.761905
27,US-UTE,TAU_QC,TAU,-10.0,2.0,0,211,211,50.238095
12,US-UTE,SW_IN_1_1_1,SW_IN,0.0,1300.0,187,0,187,44.523810


In [175]:
# drop and/or rename fields we don't want in the file

csflux = cs_dat.reset_index().rename(columns={'level_0':'STATIONID'})

drop_fields = [
    "TS_CS65X_2_1_1",
    "WS_RSLT",
    "_229_DEL_TMPR(1)",
    "_229_DEL_TMPR(2)",
    "_229_TMPR_T0_1",
    "_229_TMPR_T0_2",
    "_229_TMPR_T1_1",
    "_229_TMPR_T1_2",
    "_229_TMPR_T30_1",
    "_229_TMPR_T30_2",
    "_PANEL_TMPR_T0",
    "_PANEL_TMPR_T1",
    "_PANEL_TMPR_T30",
    "WND_DIR_STD",
    "WND_DIR_UNIT_VEC",
    "WND_SPD_AVG",
    "U_HEATMAX",
    "U_SEN0",
    "U_SENAMP",
    "U_SENMAX",
    "SONIC_AZIMUTH",
    "CS65X_EC_2_1_1"
    "SUN_AZIMUTH",
    "SUN_DECLINATION",
    "SUN_ELEVATION",
    "HEIGHT_AGL",
    "HOUR_ANGLE",
    "CS65X_PERM_1_1_1",
    "DAYTIME",
    "E",
    "E1_Q",
    "ANONYMOUS1",
    "ANONYMOUS2",
    "TD_TP01",
    "AIR_MASS_COEFF",
    "ROCP_TP01",
    "Q"
]

for field in drop_fields:
    if field in csflux.columns:
        csflux = csflux.drop(columns=[field],axis=1)

rename_fields = {
    "CS65X_EC_1_1_1":"EC_1_1_1",
    "CS65X_EC_1_1_2":"EC_1_1_2",
    "LI7700_AMB_TMPR":"TA_1_1_5",
    "T_SONIC":"T_SONIC_1_1_1",
    'CO2_SIGMA':'CO2_SIGMA_1_1_1', 
    'H2O_SIGMA':'H2O_SIGMA_1_1_1',
    }


csflux = csflux.rename(columns=rename_fields)

In [176]:
# run various tests on data; file needs to be downloaded file from easyflux website
# skip flags- not in dataframe
raw_file = r'M:\My Drive\projects\eddy_covariance\site_specific_data_review\Escalante_Flux_CSFormat.dat'

results = process_and_match_columns(csflux, amflux)
print('\n')

validate_timestamp_consistency(csflux)
print('\n')

differences_from_raw = compare_to_raw(raw_file, csflux, test_var='NETRAD', threshold=0.1)
print('\n')
print(f'Differences between micromet (left) and raw (right) files')
print(differences_from_raw)

COLUMNS NOT IN AMERIFLUX VARIABLE LIST

           all_columns       clean_columns  is_in_amflux
18         BOWEN_RATIO         BOWEN_RATIO         False
55         CO2_DENSITY         CO2_DENSITY         False
56   CO2_DENSITY_SIGMA   CO2_DENSITY_SIGMA         False
59  CO2_SIG_STRGTH_MIN  CO2_SIG_STRGTH_MIN         False
1         DATETIME_END        DATETIME_END         False
76            EC_1_1_1                  EC         False
77            EC_1_1_2                  EC         False
16      ENERGY_CLOSURE      ENERGY_CLOSURE         False
27               E_AMB               E_AMB         False
36             E_PROBE             E_PROBE         False
32               E_SAT               E_SAT         False
28           E_SAT_AMB           E_SAT_AMB         False
37         E_SAT_PROBE         E_SAT_PROBE         False
4              FC_MASS             FC_MASS         False
5                FC_QC               FC_QC         False
6           FC_SAMPLES          FC_SAMPLES      

In [177]:
# based on Paul's merge code; I found that most of the columns didn't exist in my data but I just looked at Escalante
# may want to drop the value columns when I am done

mergefields = {
    "TA_1_1_4": ["AMB_AIR_TMPR"],
    "E_AMB": ["AMB_E"],
    "E_SAT_AMB": ["AMB_E_SAT"],
    "TS_1_1_1": ["TS_CS65X_1_1_1", "TS_CS65X_1_1_2"], 
    "TS110_T_AVG": ["T_CANOPY"]
}

for key, values_list in mergefields.items():
    
    if key not in csflux.columns:
        print(f"Skipping target '{key}': not found in DataFrame.")
        continue # Skip to the next key
    
    s_target = csflux[key].replace(-9999, np.nan)
    
    for value in values_list:
        if value in csflux.columns:
            print(f"Merging '{value}' into '{key}'...")
            
            s_source = csflux[value].replace(-9999, np.nan)
            
            s_target = s_target.combine_first(s_source)
        else:
            print(f"Source column '{value}' not found, skipping merge into '{key}'.")
            
    # Save the final result back to the DataFrame
    csflux[key] = s_target.fillna(-9999)

Skipping target 'TA_1_1_4': not found in DataFrame.
Source column 'AMB_E' not found, skipping merge into 'E_AMB'.
Source column 'AMB_E_SAT' not found, skipping merge into 'E_SAT_AMB'.
Source column 'TS_CS65X_1_1_1' not found, skipping merge into 'TS_1_1_1'.
Source column 'TS_CS65X_1_1_2' not found, skipping merge into 'TS_1_1_1'.
Skipping target 'TS110_T_AVG': not found in DataFrame.


In [184]:
# check for any duplicates and export parquette file
# # # Identify duplicate column names
duplicate_columns = csflux.columns[csflux.columns.duplicated()]
print("Duplicate column names:", duplicate_columns)


csflux_final = csflux.reset_index().rename(columns={'level_0':'STATIONID'})
if len (csflux_final[csflux_final.duplicated(subset=['STATIONID','DATETIME_END'])])>0:
    print('FAIL: STATIONID AND DATETIME_END DUPLICATES PRESENT')
    print('DROPPING DUPLICATES')
    csflux_final = csflux_final.drop_duplicates(subset=['STATIONID','DATETIME_END'])

else:
    print("PASS: NO STATIONID AND DATETIME_END DUPLICATES")

csflux_final = csflux_final.set_index(['STATIONID','DATETIME_END'])
csflux_final = csflux_final.mask(csflux_final < -5000)

csflux_final.to_parquet(raw_fold / "csflux_diane.parquet")

Duplicate column names: Index([], dtype='object')
PASS: NO STATIONID AND DATETIME_END DUPLICATES


In [185]:
# summarize and view data gaps (view for just one station)

gaps_csflux = summarize_gaps(csflux_final)

bal = csflux_final.loc['US-UTE']
bal = bal.sort_index()
plotlystuff([bal, bal], ['LE_1_1_1', 'NETRAD_1_1_1'])

# Bring together the datasets - NOT REVIEWED!

In [None]:
gaps_amfluxfmt

In [None]:
site_vs_files = compare_gap_summaries(gaps_easyflux, gaps_amfluxfmt)

In [None]:
site_vs_files

In [None]:
import pandas as pd
from pandas.tseries.frequencies import to_offset

def fill_missing_from_other(
    df_target: pd.DataFrame,
    df_source: pd.DataFrame,
    expected_freq: str = "30min",
    add_missing_timestamps: bool = True,
    min_steps: int = 1,
    columns: list | None = None,
    station_level: str = "STATIONID",
    time_level: str = "DATETIME_END",
    return_plan: bool = False,
):
    """
    Fill missing values in `df_target` using `df_source` guided by gap/coverage analysis.

    It:
      1) runs `summarize_gaps` on target and source
      2) runs `compare_gap_summaries` to find fillable segments where SOURCE can fill TARGET
      3) (optionally) reindexes target to include any missing timestamps in those segments
      4) copies values from source -> target ONLY for the targeted column(s), station, and times
         where target is missing (NaN or newly added rows)

    Parameters
    ----------
    df_target : pd.DataFrame
        MultiIndex (station, datetime) with data to be filled (we call this "A" internally).
    df_source : pd.DataFrame
        MultiIndex (station, datetime) with data to copy from (we call this "B").
    expected_freq : str, default "30min"
        Grid frequency (must match both datasets).
    add_missing_timestamps : bool, default True
        If True, adds missing rows in the target during fillable segments before copying.
        If False, only fills NaN cells at timestamps that already exist in target.
    min_steps : int, default 1
        Only consider fillable segments of at least this many samples.
    columns : list[str] | None
        Optional subset of columns to fill. By default uses the intersection of
        df_target.columns and df_source.columns.
    station_level : str, default "STATIONID"
        Name of station level in MultiIndex.
    time_level : str, default "DATETIME_END"
        Name of time level in MultiIndex.
    return_plan : bool, default False
        If True, also returns the computed fill plan (B→A only).

    Returns
    -------
    filled : pd.DataFrame
        A copy of `df_target` with values filled from `df_source`.
    audit : pd.DataFrame
        Row-by-row audit of realized fills with columns:
          ['STATIONID','COLUMN','FILLABLE_START','FILLABLE_END',
           'N_STEPS_PLANNED','N_STEPS_FILLED','HOURS_FILLED']
    plan (optional) : pd.DataFrame
        The B→A portion of the compare plan (only if return_plan=True).

    Notes
    -----
    - Requires the helper functions `summarize_gaps` and `compare_gap_summaries` to be defined.
    - Only copies the specified column indicated by each plan row (no cross-column filling).
    - Never overwrites non-missing target values.
    """
    # --- basic checks ---
    if not isinstance(df_target.index, pd.MultiIndex) or not isinstance(df_source.index, pd.MultiIndex):
        raise TypeError("Both df_target and df_source must have a MultiIndex (station, datetime).")
    if station_level not in df_target.index.names or time_level not in df_target.index.names:
        raise KeyError("df_target index must include levels: station and time.")
    if station_level not in df_source.index.names or time_level not in df_source.index.names:
        raise KeyError("df_source index must include levels: station and time.")

    # Decide which columns to work on
    if columns is None:
        columns = list(set(df_target.columns).intersection(set(df_source.columns)))
        if not columns:
            raise ValueError("No overlapping columns between target and source to fill.")

    # Frequency helpers
    freq_td = to_offset(expected_freq).delta
    hours_per_step = freq_td / pd.Timedelta(hours=1)

    # --- Build plan: B fills A ---
    gaps_a = summarize_gaps(df_target, station_level=station_level, time_level=time_level,
                            expected_freq=expected_freq, columns=columns)
    gaps_b = summarize_gaps(df_source, station_level=station_level, time_level=time_level,
                            expected_freq=expected_freq, columns=columns)
    plan_all = compare_gap_summaries(gaps_a, gaps_b, expected_freq=expected_freq, min_steps=min_steps)
    plan = plan_all[plan_all["TARGET_DATASET"] == "A"].copy()

    if plan.empty:
        # Nothing to do
        audit = pd.DataFrame(columns=[
            "STATIONID","COLUMN","FILLABLE_START","FILLABLE_END",
            "N_STEPS_PLANNED","N_STEPS_FILLED","HOURS_FILLED"
        ])
        return (df_target.copy(), audit, plan) if return_plan else (df_target.copy(), audit)

    # Optional column filter
    plan = plan[plan["COLUMN"].isin(columns)].copy()
    if plan.empty:
        audit = pd.DataFrame(columns=[
            "STATIONID","COLUMN","FILLABLE_START","FILLABLE_END",
            "N_STEPS_PLANNED","N_STEPS_FILLED","HOURS_FILLED"
        ])
        return (df_target.copy(), audit, plan) if return_plan else (df_target.copy(), audit)

    # --- Prepare a working copy of target ---
    target = df_target.copy()

    # If we need to add missing timestamps, compute per-station union of times from plan
    if add_missing_timestamps:
        add_times_by_station = {}
        for _, r in plan.iterrows():
            stn = r["STATIONID"]
            times = pd.date_range(r["FILLABLE_START"], r["FILLABLE_END"], freq=expected_freq)
            add_times_by_station.setdefault(stn, set()).update(times.to_pydatetime().tolist())

        # Reindex per station once with the union of needed times
        rebuilt = []
        stations = target.index.get_level_values(station_level).unique()
        stations_in_plan = set(plan["STATIONID"].unique())
        for stn in stations.union(stations_in_plan):
            # Slice existing station data if present, else empty
            if stn in stations:
                sub = target.xs(stn, level=station_level)
            else:
                # Create empty subframe with all columns if station absent
                sub = pd.DataFrame(columns=target.columns, index=pd.DatetimeIndex([], name=time_level))

            need_times = pd.DatetimeIndex(sorted(add_times_by_station.get(stn, [])))
            if len(need_times) > 0:
                new_index = sub.index.union(need_times)
                sub = sub.reindex(new_index)

            # Return to MultiIndex
            sub = sub.copy()
            sub[station_level] = stn
            sub[time_level] = sub.index
            sub = sub.set_index([station_level, time_level]).sort_index()
            rebuilt.append(sub)

        target = pd.concat(rebuilt).sort_index()

    # --- Perform the fill per plan row ---
    audit_rows = []
    idx = pd.IndexSlice
    for _, r in plan.iterrows():
        stn = r["STATIONID"]
        col = r["COLUMN"]
        times = pd.date_range(r["FILLABLE_START"], r["FILLABLE_END"], freq=expected_freq)

        # Intersect with indices present in both frames (after optional reindex, target has them;
        # still be safe if add_missing_timestamps=False)
        try:
            t_vals = target.loc[idx[stn, times], col]
        except KeyError:
            # If none of the times exist in target and we didn't reindex them in, skip
            continue

        # Source values for those times (skip if missing in source for any reason)
        try:
            s_vals = df_source.loc[idx[stn, times], col]
        except KeyError:
            # If source lacks all those times (shouldn't happen per plan), skip
            continue

        # Only fill where target is NA and source is not NA
        to_fill_mask = t_vals.isna() & s_vals.notna()
        if not to_fill_mask.any():
            # Nothing filled for this segment
            audit_rows.append({
                "STATIONID": stn,
                "COLUMN": col,
                "FILLABLE_START": r["FILLABLE_START"],
                "FILLABLE_END": r["FILLABLE_END"],
                "N_STEPS_PLANNED": int(r["N_STEPS_FILLABLE"]),
                "N_STEPS_FILLED": 0,
                "HOURS_FILLED": 0.0,
            })
            continue

        # Assign
        fill_index = to_fill_mask.index[to_fill_mask]
        target.loc[idx[stn, fill_index], col] = s_vals.loc[fill_index]

        n_filled = int(to_fill_mask.sum())
        audit_rows.append({
            "STATIONID": stn,
            "COLUMN": col,
            "FILLABLE_START": r["FILLABLE_START"],
            "FILLABLE_END": r["FILLABLE_END"],
            "N_STEPS_PLANNED": int(r["N_STEPS_FILLABLE"]),
            "N_STEPS_FILLED": n_filled,
            "HOURS_FILLED": n_filled * hours_per_step,
        })

    audit = pd.DataFrame(audit_rows, columns=[
        "STATIONID","COLUMN","FILLABLE_START","FILLABLE_END",
        "N_STEPS_PLANNED","N_STEPS_FILLED","HOURS_FILLED"
    ]).sort_values(["STATIONID","COLUMN","FILLABLE_START"]).reset_index(drop=True)

    # Done
    target = target.sort_index()
    if return_plan:
        return target, audit, plan
    return target, audit


In [None]:
# Assuming summarize_gaps() and compare_gap_summaries() are defined (from earlier),
# and df_a (target) and df_b (source) are your MultiIndex DataFrames.

df_a = pd.read_parquet(raw_fold / "easyflux.parquet")
df_b = pd.read_parquet(raw_fold / "comp_edd.parquet")
filled_a, audit = fill_missing_from_other(
    df_target=df_a,
    df_source=df_b,
    expected_freq="30min",
    add_missing_timestamps=True,     # add structurally-missing rows before filling
    min_steps=1,                     # ignore super-short segments if you want, e.g., min_steps=2
    columns=["LE_1_1_1","H_1_1_1","NETRAD_1_1_1",
             "LW_IN_1_1_1","SW_IN_1_1_1","SW_OUT_1_1_1","LW_OUT_1_1_1"],         # or None to auto-use shared columns
    station_level="STATIONID",
    time_level="DATETIME_END",
)

print(audit.head())
# filled_a now contains values copied from df_b wherever plan said B could fill A.


## Eddy Data

In [None]:
df_edd = pd.read_parquet(raw_fold /  "comp_edd.parquet",).replace(-9999,np.nan)
df_edd.index.names = ['STATIONID','DATETIME_END']
df_edd['PRIORITY'] = 1

df = pd.read_parquet(raw_fold /  "easyflux.parquet",).replace(-9999,np.nan)
df.index.names = ['STATIONID','DATETIME_END']
df['PRIORITY'] = 2

In [None]:
df_merged = pd.read_parquet(raw_fold / "comp_cs_flux.parquet")
df_merged.index.names = ['STATIONID','DATETIME_END']
df_merged['PRIORITY'] = 3

In [None]:
dfdb = pd.read_parquet(raw_fold /  "old_database_eddy.parquet",).replace(-9999,np.nan)
dfdb.columns = dfdb.columns.str.upper()
dfdb['DATETIME_END'] = pd.to_datetime(dfdb['DATETIME_END'])
#dfdb["TIMESTAMP_START"] = dfdb['DATETIME_END'].apply(lambda x: f"{x:%Y%m%d%H%M}")
dfdb = dfdb.set_index(['STATIONID','DATETIME_END'])
#df.index.names = ['station','datetime']

dfdb.columns = dfdb.columns.str.upper()
rename_dict = {'CO2':'CO2_1_1_1', 
               'CO2_SIGMA':'CO2_SIGMA_1_1_1', 
               'H2O':'H2O_1_1_1', 
               'H2O_SIGMA':'H2O_SIGMA_1_1_1',
               'FC':'FC_1_1_1', 
               'FC_SSITC_TEST':'FC_SSITC_TEST_1_1_1', 
               'LE':'LE_1_1_1',
               'LE_SSITC_TEST':'LE_SSITC_TEST_1_1_1', 
               'ET':'ET_1_1_1',
               'ET_SSITC_TEST':'ET_SSITC_TEST_1_1_1', 
               'H':'H_1_1_1',
               'H_SSITC_TEST':'H_SSITC_TEST_1_1_1', 
               'G':'G_1_1_A',
               'G_SSITC_TEST':'G_SSITC_TEST_1_1_1',
               'SG':'SG_1_1_1', 
               'WD':'WD_1_1_1', 
               'WS':'WS_1_1_1', 
               'WS_MAX':'WS_MAX_1_1_1',
               'PA':'PA_1_1_1', 
               'VPD':'VPD_1_1_1', 
               'ALB':'ALB_1_1_1', 
               'NETRAD':'NETRAD_1_1_1', 
               'SW_IN':'SW_IN_1_1_1',
               'SW_OUT':'SW_OUT_1_1_1', 
               'LW_IN':'LW_IN_1_1_1', 
               'LW_OUT':'LW_OUT_1_1_1', 
               'P':'P_1_1_1', 
               }

dfdb = dfdb.rename(columns=rename_dict)
dfdb['ET_1_1_1'].where(dfdb['ET_1_1_1'].between(0,1.1),np.nan)
dfdb['PRIORITY'] = 4


In [None]:
import numpy as np
import pandas as pd

def coalesce_by_priority_multiindex(
    df_or_dfs,
    priority_col="priority",
    ascending=True,
    invalid_values=(-9999,),
    keep_index=True,
):
    """
    Column-wise coalesce: for each MultiIndex group (all index levels except `priority_col`),
    take the first non-null value per column after sorting by priority.

    Parameters
    ----------
    df_or_dfs : DataFrame or list/tuple of DataFrames
        Concatenated DataFrame (or list to be concatenated) with a MultiIndex.
    priority_col : str
        Column name (or index level name) indicating priority. Lower/greater is better
        depending on `ascending`.
    ascending : bool
        Sort so that smaller (True) or larger (False) priority wins.
    invalid_values : tuple
        Treat these values as missing.
    keep_index : bool
        Keep the MultiIndex in the result. If False, returns a reset_index frame.
    """
    # 0) Accept list of dfs or a single df
    if isinstance(df_or_dfs, (list, tuple)):
        df = pd.concat(df_or_dfs, axis=0)
    else:
        df = df_or_dfs.copy()

    # 1) If priority is an index level, move it to a column (so we don't group by it)
    if isinstance(df.index, pd.MultiIndex) and priority_col in df.index.names:
        df = df.reset_index(level=priority_col)

    # 2) Define group levels = all current index levels (MultiIndex) → the "keys"
    if not isinstance(df.index, pd.MultiIndex):
        raise ValueError("Expected a MultiIndex index. Set your keys as the DataFrame index first.")
    group_levels = list(df.index.names)

    # 3) Value columns = all columns except the priority column
    if priority_col not in df.columns:
        raise ValueError(f"'{priority_col}' must be a column or an index level.")
    value_cols = [c for c in df.columns if c != priority_col]

    # 4) Treat sentinels as NaN
    if invalid_values:
        for v in invalid_values:
            df[value_cols] = df[value_cols].mask(df[value_cols].eq(v))
    df[value_cols] = df[value_cols].where(df[value_cols].notna(), np.nan)

    # 5) Sort by priority (best first)
    df = df.sort_values(priority_col, ascending=ascending)

    # 6) Per group & per column, take the first non-null
    def _first_valid(s):
        s = s.dropna()
        return s.iloc[0] if len(s) else np.nan

    out = (
        df.groupby(level=group_levels, sort=False)[value_cols]
          .agg(_first_valid)
    )

    return out if keep_index else out.reset_index()



result = coalesce_by_priority_multiindex([df,df_edd,df_merged],  
                              priority_col="PRIORITY", 
                              ascending=True, 
                              invalid_values=(-9999,np.nan,"NAN",None))
result.to_parquet(raw_fold / "combined_eddy_dataset_20250905.parquet")

In [None]:
import numpy as np
import pandas as pd

def coalesce_by_priority_multiindex_fast(
    df_or_dfs,
    priority_col="PRIORITY",
    ascending=True,
    invalid_values=(-9999,),
    keep_index=True,
):
    # 1) Combine frames
    if isinstance(df_or_dfs, (list, tuple)):
        df = pd.concat(df_or_dfs, axis=0)
    else:
        df = df_or_dfs.copy()

    # 2) Ensure PRIORITY is an index level (last)
    if priority_col in df.columns:
        df = df.set_index(priority_col, append=True)
    elif not (isinstance(df.index, pd.MultiIndex) and priority_col in df.index.names):
        raise ValueError(f"'{priority_col}' must be a column or an index level.")
    levels = list(df.index.names)
    if levels[-1] != priority_col:
        levels.remove(priority_col)
        levels.append(priority_col)
        df = df.reorder_levels(levels).sort_index()

    value_cols = list(df.columns)  # all non-index columns

    # 3) Normalize invalids → NaN
    if invalid_values:
        for v in invalid_values:
            # Skip np.nan because .eq(np.nan) is always False
            if isinstance(v, float) and np.isnan(v):
                continue
            df[value_cols] = df[value_cols].mask(df[value_cols].eq(v))
    df[value_cols] = df[value_cols].where(df[value_cols].notna(), np.nan)

    # 4) DEDUP step: collapse duplicates per (keys..., PRIORITY)
    #    For each group & column, take the first non-null.
    def _first_valid(s):
        s = s.dropna()
        return s.iloc[0] if len(s) else np.nan

    df = (
        df.groupby(level=list(df.index.names), sort=False)[value_cols]
          .agg(_first_valid)
    )

    # 5) Unstack PRIORITY and fill across priority dimension (best → worse)
    wide = df.unstack(priority_col)  # columns: (value_col, priority)
    wide = wide.sort_index(axis=1, level=1, ascending=ascending)
    filled = wide.bfill(axis=1)

    # 6) Take the first (best) priority slice for each value column
    best_priority_label = filled.columns.levels[1][0]
    out = filled.xs(best_priority_label, level=1, axis=1)

    return out if keep_index else out.reset_index()

result2 = coalesce_by_priority_multiindex_fast([df,df_edd,df_merged, dfdb],  
                              priority_col="PRIORITY", 
                              ascending=True, 
                              invalid_values=(-9999,np.nan,"NAN",None))
result2.to_parquet(raw_fold / "combined_eddy_dataset_20250905_v2.parquet")

In [None]:
result2.loc['US-UTV','NETRAD_1_1_1'].sort_index().plot()
plt.ylim(0,800)

In [None]:
def filter_static_outliers(
    df: pd.DataFrame,
    thresh: float = 4.0,
) -> pd.DataFrame:
    """
    Replace values that deviate more than `thresh` standard deviations
    from the *station-wide* mean (no moving window).

    Outlier detection is performed separately for each station (level-0
    of the MultiIndex).  Only floating-point columns are filtered.

    Parameters
    ----------
    df : pandas.DataFrame
        MultiIndex DataFrame with outer index = stationid and inner
        index = datetime (half-hourly).
    thresh : float, default 3.0
        Number of σ from the mean that defines an outlier.

    Returns
    -------
    pandas.DataFrame
        Copy of `df` with outliers in float columns replaced by NaN.
    """
    # Work on a copy to avoid mutating the caller’s DataFrame
    df = df.copy()

    # Select only float columns (ignore integers, objects, etc.)
    float_cols = df.select_dtypes(include=[np.floating]).columns
    if float_cols.empty:
        return df                        # nothing to do

    # Compute station-specific mean and std, broadcast back with transform
    grp = df[float_cols].groupby(level=0)
    mean  = grp.transform("mean")
    std   = grp.transform("std")         # sample std (ddof=1) like pandas default

    # Identify outliers and replace with NaN
    mask = (df[float_cols] - mean).abs() > thresh * std
    df.loc[:, float_cols] = df[float_cols].mask(mask)

    return df

In [None]:
combo = pd.concat([df,df_edd,df_merged],axis=0)
# Remove duplicate station datetime values, keeping the non-na values
combo = combo.sort_values(['LE_1_1_1','NETRAD_1_1_1','priority']).sort_index()
combo = combo.reset_index().drop_duplicates(subset=['stationid','DATETIME_END'],keep='first')
combo = combo.set_index(['stationid','DATETIME_END'])

In [None]:
# can't run this- drops most precip values
# clean_df = filter_static_outliers(combo, thresh=4)  # custom

In [None]:
combo.to_parquet(raw_fold / "combined_eddy_dataset.parquet")

## Met Compile

In [None]:
df_met = pd.read_parquet(raw_fold /  "comp_met.parquet",).replace(-9999,np.nan)
df_met.index.names = ['stationid','DATETIME_END']
df_met['priority'] = 1

In [None]:
stmet = pd.read_parquet(raw_fold / "comp_met_stat.parquet")
stmet['DATETIME_END'] = pd.to_datetime(stmet['TIMESTAMP_START'],format="%Y%m%d%H%M")
stmet = stmet.reset_index()
stmet = stmet.rename(columns = {'level_0':'stationid'})
stmet = stmet.set_index(['stationid','DATETIME_END'])

In [None]:
dfdbm = pd.read_parquet(raw_fold /  "old_database_met.parquet",).replace(-9999,np.nan)
dfdbm['DATETIME_END'] = pd.to_datetime(dfdbm['DATETIME_END'])
dfdbm = dfdbm.set_index(['stationid','DATETIME_END'])
#df.index.names = ['station','datetime']

dfdbm.columns = dfdbm.columns.str.upper()
rename_dict_m = {'CO2':'CO2_1_1_2', 
               'CO2_SIGMA':'CO2_SIGMA_1_1_2', 
               'H2O':'H2O_1_1_2', 
               'H2O_SIGMA':'H2O_SIGMA_1_1_2',
               'FC':'FC_1_1_2', 
               'FC_SSITC_TEST':'FC_SSITC_TEST_1_1_2', 
               'LE':'LE_1_1_2',
               'LE_SSITC_TEST':'LE_SSITC_TEST_1_1_2', 
               'ET':'ET_1_1_2',
               'ET_SSITC_TEST':'ET_SSITC_TEST_1_1_2', 
               'H':'H_1_1_2',
               'H_SSITC_TEST':'H_SSITC_TEST_1_1_2', 
               'G':'G_1_1_A',
               'G_SSITC_TEST':'G_SSITC_TEST_1_1_2',
               'SG':'SG_1_1_2', 
               'WD':'WD_1_1_2', 
               'WS':'WS_1_1_2', 
               'WS_MAX':'WS_MAX_1_1_2',
               'PA':'PA_1_1_2', 
               'VPD':'VPD_1_1_2', 
               'ALB':'ALB_1_1_2', 
               'NETRAD':'NETRAD_1_1_2', 
               'SW_IN':'SW_IN_1_1_2',
               'SW_OUT':'SW_OUT_1_1_2', 
               'LW_IN':'LW_IN_1_1_2', 
               'LW_OUT':'LW_OUT_1_1_2', 
               'P':'P_1_1_2', 
               }

dfdbm = dfdbm.rename(columns=rename_dict_m)
#dfdb['ET_1_1_1'].where(dfdb['ET_1_1_1'].between(0,1.1),np.nan)
dfdbm['priority'] = 3


In [None]:
combo_met = pd.concat([df_met,dfdbm,stmet],axis=0)
combo_met

In [None]:
# Remove duplicate station datetime values, keeping the non-na values
combo_met = combo_met.sort_values(['NETRAD_1_1_2','priority']).sort_index()
combo_met = combo_met.reset_index().drop_duplicates(subset=['stationid','DATETIME_END'],keep='first')
combo_met = combo_met.set_index(['stationid','DATETIME_END'])

In [None]:
# may want to revisit whether to run this- caused issues with precip data for the eddy stations
# clean_df_met = filter_static_outliers(combo_met, thresh=4)  # custom

In [None]:
clean_df_met.to_parquet(raw_fold / "combined_met_dataset.parquet")

In [None]:
met  = pd.read_parquet(raw_fold / "combined_met_dataset.parquet")
eddy = pd.read_parquet(raw_fold / "combined_eddy_dataset.parquet")
#met.to_csv(raw_fold / "combined_met_dataset.csv")
#eddy.to_csv(raw_fold / "combined_eddy_dataset.csv")

In [None]:
combined = pd.merge(met, eddy, how='outer', left_index=True, right_index=True,
         suffixes=('_met', '_eddy'))

combined

In [None]:
combined.loc['US-UTD']

In [None]:
combined.loc['US-UTD', ['WS','WS_1_1_1']].dropna().plot(kind='scatter',x='WS',y='WS_1_1_1',)

In [None]:
compare_cols = ["WS", 
                "TA_", 
                "RH_", 
                "LE_", 
                "H_", 
                "VPD", 
                "PA", 
                "WD", 
                "NETRAD", 
                "SW_IN_", 
                "SW_OUT_", 
                "LW_IN_", 
                "LW_OUT_", 
                "ALB"]

matches = {}
for i in compare_cols:
    values = []
    met_col = []
    eddy_col = []

    for col in met.columns:
        if 'MAX' not in col and 'SSITC' not in col:
            if col.startswith(i):
                values.append(col)
                met_col.append(col)

    for col in eddy.columns:
        if 'MAX' not in col and 'SSITC' not in col:
            if col.startswith(i):
                values.append(col)
                eddy_col.append(col)

    matches[i] = values
    if len(values) > 1:
        fig, ax = plt.subplots(figsize=(12, 6))
        plt.title(f"Comparison of {i} for US-UTD")
        for j in met_col:
            met.loc['US-UTD',j].replace(-9999,np.nan).plot(label=j,ax=ax)
        for k in eddy_col:
            eddy.loc['US-UTD',k].replace(-9999,np.nan).plot(label=k,ax=ax)
        plt.legend()





In [None]:
import pandas as pd
import numpy as np
from scipy import stats
from sklearn.ensemble import IsolationForest
from collections import defaultdict

# --------------------------------------------------
# 1. LOAD  (needs pyarrow or fastparquet installed)
# --------------------------------------------------
met  = pd.read_parquet(raw_fold / "combined_met_dataset.parquet")
eddy = pd.read_parquet(raw_fold / "combined_eddy_dataset.parquet")

# If not already multi-indexed by (station, timestamp):
# met  = met.set_index(["station_id", "timestamp"]).sort_index()
# eddy = eddy.set_index(["station_id", "timestamp"]).sort_index()

# Keep only overlapping station–time rows
common_idx = met.index.intersection(eddy.index)
met, eddy  = met.loc[common_idx], eddy.loc[common_idx]

# --------------------------------------------------
# 2. DEFINE THE PREFIXES YOU WANT TO COMPARE
#    (fill this list in with your own)
# --------------------------------------------------
prefixes = ["WS", "TA", "RH", "LE", "H", "VPD", "PA", "WD", "NETRAD", "SW_IN", "SW_OUT", "LW_IN", "LW_OUT", "ALB"]

# --------------------------------------------------
# 3. BUILD A MATCH TABLE  {prefix -> [(met_col, eddy_col), …]}
# --------------------------------------------------
matches = defaultdict(list)

for p in prefixes:
    # columns that begin with that prefix
    met_cols  = [c for c in met.columns  if c.startswith(p)]
    eddy_cols = [c for c in eddy.columns if c.startswith(p)]

    # simplest strategy: look for *exact* column-name matches
    common = set(met_cols).intersection(eddy_cols)
    for col in common:
        matches[p].append((col, col))

    # fallback: if names differ after the prefix, pair by the suffix
    if not common:
        met_suffix  = {c[len(p):]: c for c in met_cols}
        eddy_suffix = {c[len(p):]: c for c in eddy_cols}
        for suf in met_suffix.keys() & eddy_suffix.keys():
            matches[p].append((met_suffix[suf], eddy_suffix[suf]))

# sanity check
if not any(matches.values()):
    raise ValueError("No columns matched with the given prefixes!")
else:
    print(f"Found {len(matches)} prefixes with matches:")
    for p, pairs in matches.items():
        print(f"  {p}: {len(pairs)} pairs")
        for mcol, ecol in pairs:
            print(f"    {mcol} ↔ {ecol}")

# --------------------------------------------------
# 4. COLLECT ALL DIFFERENCES INTO ONE DATAFRAME
#    (column names => "<prefix><suffix>_diff")
# --------------------------------------------------
diff_frames = []
for p, pairs in matches.items():
    for mcol, ecol in pairs:
        name = f"{mcol}_diff"          # keeps original met name for clarity
        diff_frames.append(
            (name, met[mcol] - eddy[ecol])
        )

# combine into a single MultiIndex-friendly DataFrame
diff = pd.concat(
    {name: series for name, series in diff_frames}, axis=1
)

abs_diff = diff.abs()

# --------------------------------------------------
# 5. OUTLIER METHODS
# --------------------------------------------------
# 5A. Z-score (3σ)
z_scores = abs_diff.groupby(level=0).transform(
    lambda g: (g - g.mean()) / g.std(ddof=0)
)
flags_z = z_scores > 3

# 5B. MAD (3.5× MAD)
def mad_flags(s, k=3.5):
    med = s.median()
    mad = np.median(np.abs(s - med))
    return np.abs(s - med) / (1.4826 * mad + 1e-9) > k

flags_mad = abs_diff.groupby(level=0).transform(mad_flags)

# 5C. Isolation Forest (multivariate, per station)
flags_if = pd.DataFrame(False, index=abs_diff.index, columns=abs_diff.columns)

for stn, g in abs_diff.groupby(level=0):
    X   = g.values
    ok  = np.any(~np.isnan(X), axis=1)
    if ok.sum() < 20:                # need enough rows to fit
        continue

    clf = IsolationForest(
        n_estimators=300,
        contamination=0.01,
        random_state=42,
    ).fit(X[ok])

    row_out = clf.predict(X[ok]) == -1   # → Boolean vector
    # broadcast to all columns
    flags_if.loc[g.index[ok], :] = np.repeat(
        row_out[:, None], g.shape[1], axis=1
    )

# --------------------------------------------------
# 6. QUICK SUMMARY  (how many flags per variable)
# --------------------------------------------------
summary = (
    pd.DataFrame({
        "Zscore": flags_z.sum(),
        "MAD":    flags_mad.sum(),
        "IsoF":   flags_if.sum(),
    })
    .sort_index()
)
print(summary.head())

# --------------------------------------------------
# 7. OPTIONAL:  EXPORT OR APPLY MASK
# --------------------------------------------------
# Example: mask out any value flagged by *any* method
combined_flags = flags_z | flags_mad | flags_if
clean_met  = met.where(~combined_flags)  # replaces flagged cells with NaN


In [None]:
met_cols

In [None]:
import pandas as pd
import numpy as np
from scipy import stats
from sklearn.ensemble import IsolationForest

# ---------- 1. LOAD ----------
met   = pd.read_parquet(raw_fold /"combined_met_dataset.parquet")   # needs pyarrow or fastparquet
eddy  = pd.read_parquet(raw_fold /"combined_eddy_dataset.parquet")

# If your indices aren’t yet a MultiIndex (station, time) do this once:
# met  = met.set_index(["station_id","timestamp"]).sort_index()
# eddy = eddy.set_index(["station_id","timestamp"]).sort_index()

# Keep only the overlapping stations & times
common_idx = met.index.intersection(eddy.index)
met  = met.loc[common_idx]
eddy = eddy.loc[common_idx]

# ---------- 2. IDENTIFY MATCHING VARIABLES ----------
common_cols = met.columns.intersection(eddy.columns)
if common_cols.empty:
    raise ValueError("No shared measurement names between the two datasets!")

# Optionally drop columns that are integer-typed (often flags / counters)
keep_float = [c for c in common_cols if np.issubdtype(met[c].dtype, np.floating)]
met  = met[keep_float]
eddy = eddy[keep_float]

# ---------- 3. STACK THE TWO SOURCES FOR EZ COMPARISON ----------
diff = met - eddy              # sign tells you which source is higher
abs_diff = diff.abs()

# ---------- 4A. Z-SCORE BASED OUTLIERS ----------
z_scores = abs_diff.groupby(level=0).transform(  # compute σ station-by-station
    lambda g: (g - g.mean()) / g.std(ddof=0)
)
outliers_z = z_scores > 3        # boolean DF same shape as diff

# ---------- 4B. MAD BASED OUTLIERS ----------
def mad_based_flags(series, k=3.5):
    med = series.median()
    mad = np.median(np.abs(series - med))
    # 1.4826 converts MAD to σ for a normal dist.
    return np.abs(series - med) / (1.4826 * mad + 1e-9) > k

outliers_mad = abs_diff.groupby(level=0).transform(mad_based_flags)

# ---------- 4C. ISOLATION FOREST (multivariate) ----------
iso_out = {}
for stn, g in abs_diff.groupby(level=0):

    X = g.values
    mask = np.any(~np.isnan(X), axis=1)          # rows with ≥1 real number
    flags = pd.DataFrame(False, index=g.index, columns=g.columns)

    if mask.sum() >= 20:                         # enough samples to train
        clf = IsolationForest(
            contamination=0.01,
            n_estimators=300,
            random_state=42,
        ).fit(X[mask])

        row_flags = clf.predict(X[mask]) == -1   # 1-D Boolean (outlier rows)

        # --- broadcast row_flags to full (n_rows_selected × n_columns) matrix
        flags.iloc[mask, :] = np.repeat(
            row_flags[:, None], g.shape[1], axis=1
        )

    iso_out[stn] = flags

outliers_iso = pd.concat(iso_out)

# ---------- 5. SUMMARIZE ----------
summary = (
    pd.DataFrame({
        "z_score":  outliers_z.sum(),
        "MAD":      outliers_mad.sum(),
        "iForest":  outliers_iso.sum()
    })
    .rename_axis("variable")
)
print(summary.head())


In [None]:
summary

Compile files from each station into a a single dataframe.

In [None]:
cdf = pd.concat(comp_edd_df, axis=0)
cdf.index.set_names(['stationid','DATETIME_END'],inplace=True)
#cdf.rename(columns={'level_0':'stationid'},inplace=True)
#cdf.to_parquet('../station_data/all_data.parquet')
for col in cdf.columns:
    cdf.rename(columns={col:col.lower()},inplace=True)

Save to Parquet

In [None]:
cdf.to_parquet('../../station_data/all_eddy_data.parquet')

In [None]:

comp_met_df = {}
root_dir = "C:/Users/paulinkenbrandt/Documents/GitHub/MicroMet/src/micromet/data/"
config_path = root_dir + "reformatter_vars.yml"
var_limits_csv = root_dir + "extreme_values.csv"
am = micromet.AmerifluxDataProcessor(config_path, logger)


for key, value in site_folders.items():

    print(key)
    raw_fold = pathlib.Path('G:/Shared drives/UGS_Flux/Data_Downloads/')
    raw_data = am.raw_file_compile(raw_fold, value, search_str = "*Statistics_AmeriFlux*.dat")
    if raw_data is not None:
        am_data = micromet.Reformatter(
                                       config_path=config_path,
                                       var_limits_csv= var_limits_csv,
                                       drop_soil=False,
                                       logger=logger,
                                       )
        am_df = am_data.prepare(raw_data, data_type="met")
        #am_df = am_data.et_data
        comp_met_df[key] = am_df

        #am_df.to_csv(f"../../station_data/{key}_HH_{am_df['TIMESTAMP_START'].values[0]:}_{am_df['TIMESTAMP_END'].values[-1]:}.csv")

        



In [None]:
ddf.columns = ddf.columns.str.lower()

In [None]:
soildfs

for old_col, new_col in mapping.items():
    if str(old_col).lower() in soildfs.columns.str.lower():
        if str(new_col).lower() in soildfs.columns.str.lower():
            soildfs[new_col.lower()] = soildfs[[old_col.lower(), new_col.lower()]].max(axis=1)
            soildfs = soildfs.drop(old_col.lower(), axis=1)
        else:
            soildfs = soildfs.rename(columns={old_col.lower(): new_col.lower()})
    elif str(old_col).lower()+"_eddy" in soildfs.columns.str.lower():
        print(f"Found {old_col} eddy column")
        if str(new_col).lower()+"_eddy" in soildfs.columns.str.lower():
            soildfs[new_col.lower()] = soildfs[[old_col.lower()+"_eddy", new_col.lower()+"_eddy"]].max(axis=1)
            soildfs = soildfs.drop(old_col.lower()+"_eddy", axis=1)
        else:
            soildfs = soildfs.rename(columns={old_col.lower()+"_eddy": new_col.lower()})
    elif str(new_col).lower()+"_eddy" in soildfs.columns.str.lower():
        if str(new_col).lower() in soildfs.columns.str.lower():
            soildfs[new_col.lower()] = soildfs[[new_col.lower()+"_eddy", new_col.lower()+"_eddy"]].max(axis=1)
            soildfs = soildfs.drop(new_col.lower()+"_eddy", axis=1)
            print(f"Found {new_col} eddy column")
        else:
            print(f"Found {new_col} eddy column")
            soildfs = soildfs.rename(columns={new_col.lower()+"_eddy": new_col.lower()})
        


In [None]:
ddf = pd.concat(comp_met_df, axis=0)
ddf.index.set_names(['stationid','DATETIME_END'],inplace=True)
#cdf.rename(columns={'level_0':'stationid'},inplace=True)
#cdf.to_parquet('../station_data/all_data.parquet')
for col in ddf.columns:
    ddf.rename(columns={col:col.lower()},inplace=True)

In [None]:
ddf[~ddf['vwc_2_7_1'].isna()]

In [None]:
ddf.iloc[0:1,:].to_clipboard()

In [None]:
import re

soilcols = [col.lower() for col in am_data.MATH_SOILS_V2]
pattern = re.compile(r"2_1_1|1_2_1|1_1_2")
# Print matching columns
matching_cols = [col for col in soilcols if pattern.search(col)]
# Remove them from the original list
soilcols = [col for col in soilcols if not pattern.search(col)]

        
soildfs = pd.merge(ddf,cdf[soilcols],how='left',on=['stationid','DATETIME_END'],suffixes=(None,'_eddy'))
soildfs

for col in cdf.columns:
    if col in soilcols:
        cdf.drop(columns=col,inplace=True)  # drop the soil columns from the main dataframe

cdf.to_parquet('../../station_data/all_eddy_data.parquet')

soildfs.to_parquet('../../station_data/all_soil_data.parquet')

ddf.to_parquet('../../station_data/all_met_data.parquet')

In [None]:
cdf = pd.read_parquet('../../station_data/all_eddy_data.parquet')


In [None]:
cdf.columns

In [None]:
soildfs = pd.read_parquet('../../station_data/all_soil_data.parquet')
utd_soilt = soildfs.loc['US-UTD'][['ts_3_1_1','ts_3_2_1','ts_3_3_1']].replace(-9999,np.nan)
utd_soilt = utd_soilt[utd_soilt.index >= '2024-07-01']#.resample('30T').mean()
utd_soilt['ts_3_1_1'].plot()
utd_soilt['ts_3_2_1'].shift(-1).plot()
utd_soilt['ts_3_3_1'].shift(-5).plot()
plt.axvline('2024-07-04 15:00',color='r')
#plt.xlim('2024-07-01','2024-07-08')
#plt.ylim(10,35)
plt.grid(True, which='minor')

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.seasonal import seasonal_decompose
from scipy.signal import correlate

# Function to decompose the seasonal component
def extract_seasonal(ts, period):
    decomposition = seasonal_decompose(ts, model='additive', period=period)
    return decomposition.seasonal

# Function to calculate lag between two seasonal series using cross-correlation
def calculate_lag(seasonal1, seasonal2):
    n = len(seasonal1)
    correlation = correlate(seasonal1 - np.mean(seasonal1), seasonal2 - np.mean(seasonal2), mode='full')
    lags = np.arange(-n + 1, n)
    lag = lags[np.argmax(correlation)]
    return lag, correlation, lags

ts1 = utd_soilt['ts_3_2_1']
ts2 = utd_soilt['ts_3_3_1']
#utd_soilt['ts_3_3_1'].shift(-5).plot()


# Extract seasonal components
seasonal1 = extract_seasonal(ts1, period=48)
seasonal2 = extract_seasonal(ts2, period=48)

# Calculate lag
lag, correlation, lags = calculate_lag(seasonal1.dropna(), seasonal2.dropna())

# Output
print(f"Calculated lag: {lag/2} hours")

# Plot seasonal components and correlation
fig, ax = plt.subplots(3, 1, figsize=(10, 8))

seasonal1.plot(ax=ax[0], label='Seasonal Component 1')
seasonal2.plot(ax=ax[0], label='Seasonal Component 2')
ax[0].legend()
ax[0].set_title('Seasonal Components')
ax[0].set_xlim(pd.to_datetime('2024-07-01'),pd.to_datetime('2024-07-08'))
ax[0].grid(True)

ax[1].plot(lags, correlation)
ax[1].set_title('Cross-Correlation')
ax[1].set_xlabel('Lag (hours)')
ax[1].set_ylabel('Correlation')
ax[1].set_xlim(-10, 10)
ax[1].grid(True)

ax[2].plot(seasonal1.index, seasonal1, label='Series 1')
ax[2].plot(seasonal2.index + pd.Timedelta(hours=lag/2), seasonal2, label='Series 2 (Shifted)')
ax[2].legend()
ax[2].set_title(f'Series alignment (Lag: {lag/2} hours)')
ax[2].set_xlim(pd.to_datetime('2024-07-01'),pd.to_datetime('2024-07-08'))
ax[2].grid(True)
plt.tight_layout()
plt.show()



In [None]:
cdf = pd.read_parquet('../../station_data/all_eddy_data.parquet')
ddf = pd.read_parquet('../../station_data/all_met_data.parquet')

for col in cdf.columns:
    if col in ddf.columns:
        print(col)


In [None]:
ddf.head(10).to_clipboard()

In [None]:
series = ddf.loc['US-UTD','t_si111_body'].replace(-9999,np.nan)
series.plot()
series.diff().plot()
new_series = series[series.diff()<2].diff().cumsum()
new_series.plot()

In [None]:
config = configparser.ConfigParser()

config.read('../../secrets/config.ini')

from sqlalchemy import create_engine
import urllib.parse
host = config['DEFAULT']['ip']
pw = config['DEFAULT']['pw']
user = config['DEFAULT']['login']

encoded_password = urllib.parse.quote_plus(pw)

def postconn_et(encoded_password, host='localhost',user='postgres',port='5432',db='groundwater', schema = 'groundwater'):
    connection_text = "postgresql+psycopg2://{:}:{:}@{:}:{:}/{:}?gssencmode=disable".format(user,encoded_password,host,port,db)
    return create_engine(connection_text, connect_args={'options': '-csearch_path={}'.format(schema)})


engine = postconn_et(encoded_password, host=host, user=user)

In [None]:
cdf.to_sql(name = 'amfluxeddy',
           schema='groundwater',
           con=engine,
           if_exists='replace',
           chunksize=2000)

In [None]:
for col in soildfs.columns:
    print(f"amfluxmet.{col},")

In [None]:
soildfs.to_sql(name = 'amfluxmet',
           schema='groundwater',
           con=engine,
           if_exists='replace',
           chunksize=2000)