# Line up precipitation and streamflow historical tabular data

Before lining up the tabular data, the following steps are taken to clean and prepare it:

1. Convert streamflow from cubic feet per second to cubic meters per second.
2. Fill in missing values for precipitation stations with zeros (the original source file omits dates without precipitation, so every missing date is assumed to have zero precipitation).
3. Convert precipitation height from feet to meters.
4. Separate quickflow from baseflow.

In [None]:
from pathlib import Path
import geopandas as gpd
import pandas as pd
import sys
from tqdm.notebook import tqdm


In [2]:
# Custom Modules
# The project root is taken to be two directory levels above the current
# working directory, so this cell depends on where the notebook is run from.
project_root_path = Path.cwd().parent.parent
# Extend the import search path with <project root>/src. This must happen
# before the import below (presumably `preprocessing` lives under src/ - verify).
sys.path.append(str(project_root_path / 'src'))

from preprocessing.lyne_hollick_filter  import lyne_hollick_filter

In [3]:
# Define helper function
import re
import ast
from typing import Any

def parse_np_float64_string(s: str) -> Any:
    """
    Turn the text form of a Python literal that embeds ``np.float64(...)``
    wrappers into the corresponding Python object.

    Every ``np.float64(X)`` occurrence is reduced to the bare ``X`` so that
    the remaining text can be handed to ``ast.literal_eval``.

    Parameters
    ----------
    s : str
        Text to convert, e.g. ``"[{'a': np.float64(0.5)}]"``.

    Returns
    -------
    Any
        Whatever literal the cleaned text describes (list, dict, ...).
    """
    # Strip the wrapper and keep only what sits between the parentheses
    wrapper_pattern = re.compile(r'np\.float64\(([^)]+)\)')
    literal_text = wrapper_pattern.sub(r'\1', s)

    # literal_eval accepts literals only, so no arbitrary code is executed
    parsed = ast.literal_eval(literal_text)
    return parsed

## Load Data

In [4]:
# Input locations, both resolved against the project root:
# the precipitation-station layer and the folder of per-watershed GeoPackages
vector_ppt_path = project_root_path.joinpath('data/silver/geo/gpkg/study_area_ppt_stn.gpkg')
wsdir = project_root_path.joinpath('data/gold/geo/gpkg/watersheds_with_thiessen_info')

In [5]:
# Read every watershed GeoPackage found in wsdir into its own GeoDataFrame
# NOTE(review): glob() does not promise a particular file order, yet a later
# cell picks ws_gdf_list[2] by position - confirm this is intended.
ws_list_path_list = wsdir.glob('*.gpkg')
ws_gdf_list = [gpd.read_file(gpkg_file) for gpkg_file in ws_list_path_list]

# Read the precipitation-station layer
ppt_gdf = gpd.read_file(vector_ppt_path)


## Line up historical data

In [10]:
# Function to clean streamflow data
def clean_streamflow_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Build a two-column streamflow table with discharge in cubic meters per second.

    The input frame is left untouched; a new frame is returned.

    Parameters
    ----------
    df : pd.DataFrame
        Raw streamflow table. Must contain a ``dateTime`` column and a
        ``discharge_cfs`` column (discharge in cubic feet per second).

    Returns
    -------
    pd.DataFrame
        Frame with columns ``date`` (copied from ``dateTime``) and
        ``discharge`` (``discharge_cfs`` converted to cubic meters per
        second), sharing the index of ``df``.

    Raises
    ------
    KeyError
        If ``dateTime`` or ``discharge_cfs`` is missing from ``df``.
    """
    # One foot is 0.3048 m, so one cubic foot is 0.3048**3 cubic meters
    cubic_feet_to_cubic_meters = 0.3048**3

    # Assemble a fresh frame instead of adding columns to the caller's one
    return pd.DataFrame({
        'date': df['dateTime'],
        'discharge': df['discharge_cfs'] * cubic_feet_to_cubic_meters,
    })

# Function to clean precipitation data
def clean_ppt_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Build a two-column precipitation table with UTC dates and heights scaled by 0.3048.

    The input frame is left untouched; a new frame is returned.

    Parameters
    ----------
    df : pd.DataFrame
        Raw precipitation table. Must contain a datetime ``date`` column and
        a numeric ``height`` column (assumed to be in feet - confirm against
        the data source).

    Returns
    -------
    pd.DataFrame
        Frame with columns ``date`` (timezone-aware, UTC) and ``height``
        (input height multiplied by 0.3048, missing values replaced by 0.0),
        sharing the index of ``df``.

    Raises
    ------
    KeyError
        If ``date`` or ``height`` is missing from ``df``.
    """
    dates = df['date']
    if dates.dt.tz is None:
        # Naive timestamps are declared to be UTC
        dates = dates.dt.tz_localize('UTC')
    else:
        # tz_localize raises on aware data; convert it to UTC instead
        dates = dates.dt.tz_convert('UTC')

    # Scale by the foot-to-meter factor, then treat missing readings as no precipitation
    heights = (df['height'] * 0.3048).fillna(0.0)

    # Assemble a fresh frame instead of overwriting the caller's columns
    return pd.DataFrame({'date': dates, 'height': heights})

# Function to line up the data
def line_up_data(
    ws_gdf: gpd.GeoDataFrame,
    ppt_gdf: gpd.GeoDataFrame,
    streamflow_data_dir: Path,
    ppt_data_dir: Path,
) -> tuple[pd.DataFrame, Any]:
    """
    Join one watershed's streamflow record with its weighted precipitation.

    Parameters
    ----------
    ws_gdf : gpd.GeoDataFrame
        Watershed layer; only its first row is read. ``stnid`` identifies the
        streamflow file, and ``intersecting_ppt_info`` holds a string that
        parses to a list of dicts with ``ppt_stnid`` and ``relative_area`` keys.
    ppt_gdf : gpd.GeoDataFrame
        Not used by this function; kept so existing callers keep working.
    streamflow_data_dir : Path
        Folder containing ``streamflow_<stnid>.parquet`` files.
    ppt_data_dir : Path
        Folder containing ``<ppt_stnid>.parquet`` files.

    Returns
    -------
    tuple
        ``(lined_up_df, ws_id)``. ``lined_up_df`` holds ``date``,
        ``discharge``, one ``height_<ppt_stnid>`` column per station and
        ``height``, the sum of each station height times its
        ``relative_area``. Rows where ``height`` is missing are dropped.
        ``ws_id`` is the watershed's ``stnid`` value.

    Raises
    ------
    FileNotFoundError
        If the streamflow file or any station's precipitation file is missing.
    """
    # Get metadata for the watershed
    ws_id = ws_gdf['stnid'].iloc[0]
    ppt_info = parse_np_float64_string(ws_gdf['intersecting_ppt_info'].iloc[0])
    ppt_ids = [entry['ppt_stnid'] for entry in ppt_info]
    stn_weights = [entry['relative_area'] for entry in ppt_info]

    # Load and clean tabular streamflow data for the watershed
    streamflow_path = streamflow_data_dir / f'streamflow_{ws_id}.parquet'
    if not streamflow_path.exists():
        raise FileNotFoundError(f"Streamflow data for watershed {ws_id} not found at {streamflow_path}")
    streamflow_df = pd.read_parquet(streamflow_path)
    streamflow_df = clean_streamflow_data(streamflow_df)

    # Load and clean precipitation data for each ppt station in the watershed
    for stnid, weight in zip(ppt_ids, stn_weights):
        ppt_path = ppt_data_dir / f'{stnid}.parquet'
        if not ppt_path.exists():
            raise FileNotFoundError(f"Precipitation data for station {stnid} not found at {ppt_path}")

        ppt_df = pd.read_parquet(ppt_path)
        ppt_df = clean_ppt_data(ppt_df)
        ppt_df = ppt_df.rename(columns={'height': f'height_{stnid}'})

        # Left join keeps every streamflow timestamp; timestamps without a
        # matching precipitation row get NaN for this station
        streamflow_df = streamflow_df.merge(
            ppt_df,
            how='left',
            on='date'
        )

        # Accumulate the weighted height; the first station creates the column
        if 'height' not in streamflow_df.columns:
            streamflow_df['height'] = streamflow_df[f'height_{stnid}'] * weight
        else:
            streamflow_df['height'] += streamflow_df[f'height_{stnid}'] * weight

    # Clip date range to the available precipitation data: a NaN from any
    # station propagates into 'height', so those rows are removed here
    return streamflow_df.dropna(subset=['height']), ws_id

    


In [11]:
# Folders holding the tabular inputs
streamflow_data_dir = project_root_path.joinpath('data/bronze/tabular/streamflow')
ppt_dir = project_root_path.joinpath('data/silver/tabular/precipitation')

# Trial run on a single watershed (third entry of the list), then preview the top rows
lined_up_data, ws_id = line_up_data(
    ws_gdf=ws_gdf_list[2],
    ppt_gdf=ppt_gdf,
    streamflow_data_dir=streamflow_data_dir,
    ppt_data_dir=ppt_dir,
)
lined_up_data.head()

Unnamed: 0,date,discharge,height_USC00361726,height,height_USC00366111
1996,2014-02-25 00:00:00+00:00,240.976364,0.0,0.0,0.0
1997,2014-02-25 00:15:00+00:00,240.126859,0.0,0.0,0.0
1998,2014-02-25 00:30:00+00:00,239.560522,0.0,0.0,0.0
1999,2014-02-25 00:45:00+00:00,238.711017,0.0,0.0,0.0
2000,2014-02-25 01:00:00+00:00,238.711017,0.0,0.0,0.0


#### Baseflow separation

In [13]:
# Run the project's Lyne-Hollick filter on the discharge series and keep
# its output next to the inputs in a new 'baseflow' column
discharge_series = lined_up_data['discharge']
lined_up_data['baseflow'] = lyne_hollick_filter(discharge_series)
lined_up_data

Unnamed: 0,date,discharge,height_USC00361726,height,height_USC00366111,baseflow
1996,2014-02-25 00:00:00+00:00,240.976364,0.0,0.0,0.0,122.628753
1997,2014-02-25 00:15:00+00:00,240.126859,0.0,0.0,0.0,104.082016
1998,2014-02-25 00:30:00+00:00,239.560522,0.0,0.0,0.0,87.860143
1999,2014-02-25 00:45:00+00:00,238.711017,0.0,0.0,0.0,73.191665
2000,2014-02-25 01:00:00+00:00,238.711017,0.0,0.0,0.0,61.002805
...,...,...,...,...,...,...
374874,2025-04-01 22:45:00+00:00,60.598052,0.0,0.0,0.0,1.044401
374875,2025-04-01 23:00:00+00:00,60.598052,0.0,0.0,0.0,0.915148
374876,2025-04-01 23:15:00+00:00,60.881220,0.0,0.0,0.0,1.081438
374877,2025-04-01 23:30:00+00:00,61.164389,0.0,0.0,0.0,1.249573


### Batch Process

In [None]:

# Define paths

output_dir = project_root_path / 'data/gold/tabular/lined_up_streamflow'
output_dir.mkdir(parents=True, exist_ok=True)

# One (watershed id, message) entry per watershed skipped because an input
# file was missing, so failures can be inspected after the loop finishes
err_log = []
for ws_gdf in tqdm(ws_gdf_list, desc="Lining up data for watersheds"):

    # Line up the data for each watershed
    try:
        lined_up_data, ws_id = line_up_data(ws_gdf, ppt_gdf, streamflow_data_dir, ppt_dir)

        # Save the lined up data to a file
        output_path = output_dir / f'lined_up_streamflow_{ws_id}.parquet'
        lined_up_data.to_parquet(output_path)

    except FileNotFoundError as e:
        # A missing input only skips this watershed; the batch keeps going
        failed_ws_id = ws_gdf['stnid'].iloc[0]
        err_log.append((failed_ws_id, str(e)))
        print(f"Error processing watershed {failed_ws_id}: {e}")
# NOTE(review): this call runs once after the loop and only displays its
# result - it looks like a leftover debugging line; confirm and remove.
line_up_data(ws_gdf_list[2], ppt_gdf, streamflow_data_dir, ppt_dir)

Lining up data for watersheds:   0%|          | 0/508 [00:00<?, ?it/s]

ArrowKeyError: Attempted to register factory for scheme 'file' but that scheme is already registered.