# Build Machine Learning Dataset


The data retrieval process loops through a range of dates, retrieves and joins RAWS, HRRR, and other data sources, and saves the result to a local directory.

This notebook describes the process of reading that data, applying the final set of quality control filters, and formatting it for the various models used in this project.

## Setup

In [None]:
import os.path as osp
from datetime import datetime, timezone
from dateutil.relativedelta import relativedelta
import synoptic
import json
import sys
import numpy as np
import polars as pl
import pandas as pd
sys.path.append('../src')
from utils import Dict, read_yml, read_pkl, str2time, print_dict_summary, time_range, rename_dict
import models.moisture_models as mm
import ingest.RAWS as rr
import ingest.HRRR as ih
import data_funcs 

In [None]:
start = "2023-01-01T00:00:00Z"
end = "2023-01-06T23:00:00Z"

In [None]:
params_data = Dict(read_yml("../etc/params_data.yaml"))
print_dict_summary(params_data)

## Retrieve Data

The retrieved data is a nested dictionary. Each top-level key corresponds to a single RAWS station, with subkeys for RAWS observations, atmospheric data (HRRR), geographic info, etc.

This format is used because the FMC models in this project require different data formats. The ODE+KF physics-based model runs pointwise and does not incorporate information from other locations. The static ML models have the least restrictive input structure: all observations can be pooled into one tabular dataset. The RNN models require input structured as (batch_size, timesteps, features). It is therefore simpler to keep the data separated by location and recombine it in different ways at the modeling step. Data filters for suspect RAWS sensors are applied in the next step rather than here, because raw data retrieval should not depend on hyperparameter choices related to those filters. It is easier to collect everything and filter later.
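
As a rough illustration of these shape requirements, the sketch below keeps one array per location and then recombines the arrays in two ways. The sizes, the station names, and the variables `per_station`, `tabular`, and `rnn_input` are made up for this example and are not taken from the project code.

```python
import numpy as np

# Hypothetical sizes: 4 locations, 72 hourly timesteps each, 3 features
n_stations, timesteps, n_features = 4, 72, 3

rng = np.random.default_rng(0)
# Data kept separate by location: one (timesteps, features) array per station
per_station = {
    f"st{i}": rng.normal(size=(timesteps, n_features)) for i in range(n_stations)
}

# Static ML models: pool every hour from every station into one table
tabular = np.vstack(list(per_station.values()))

# RNN models: stack stations along a new leading axis,
# giving (batch_size, timesteps, features)
rnn_input = np.stack(list(per_station.values()), axis=0)

print(tabular.shape)
print(rnn_input.shape)
```

Keeping the per-location arrays as the source of truth means either layout can be rebuilt at the modeling step without going back to the raw files.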

In [None]:
paths = ["../data/rocky_fmda/202301/fmda_20230101.pkl", 
         "../data/rocky_fmda/202301/fmda_20230102.pkl",
         "../data/rocky_fmda/202301/fmda_20230103.pkl",
         "../data/rocky_fmda/202301/fmda_20230104.pkl",
         "../data/rocky_fmda/202301/fmda_20230105.pkl",
         "../data/rocky_fmda/202301/fmda_20230106.pkl"
        ]

In [None]:
import importlib
import data_funcs
importlib.reload(data_funcs)
from data_funcs import combine_fmda_files

In [None]:
raws_dict = data_funcs.combine_fmda_files(paths, save_path="../data/test_data/test_fmda_combined.pkl")

## Build ML Dataset

Filter data and merge RAWS and HRRR and other sources. The file `etc/params_data.yaml` has hyperparameters related to filtering data. The steps include:

- Determine the atmospheric data source. This is intended to be "HRRR" for production, but "RAWS" is used for research purposes.
- Combine atmospheric data predictors with FMC
- Break timeseries into 72-hour periods, adding a column "st_period" starting at 0 (see README for info on why 72)
- Apply data filters to the RAWS data within each 72-hour period and remove the flagged periods from the samples. HRRR data should already be QC'ed, so it is not filtered.
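
The period-splitting step can be sketched roughly as follows. This is not the implementation in `data_funcs.build_ml_data`. It assumes a gap-free hourly series for one station, and the column names `date_time` and `fm` and the toy values are assumptions for illustration.

```python
import pandas as pd

hours = 72  # period length, matching the 72-hour periods described above

# Hypothetical hourly series for a single station (values are illustrative only)
df = pd.DataFrame({
    "date_time": pd.date_range("2023-01-01", periods=150, freq=pd.Timedelta(hours=1)),
    "fm": range(150),
})

# Integer-divide the row position by the period length so the first
# 72 rows get st_period 0, the next 72 rows get 1, and so on
df["st_period"] = [i // hours for i in range(len(df))]

# Count rows per period; a trailing period shorter than 72 hours is incomplete
period_sizes = df.groupby("st_period").size()
complete = period_sizes[period_sizes == hours].index.tolist()

print(period_sizes.to_dict())
print(complete)
```

A filter can then be applied per `st_period` group, which is one way whole periods could be dropped rather than individual hours.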

In [None]:
params_data

In [None]:
ml_dict = data_funcs.build_ml_data(raws_dict, hours=params_data.hours, 
                                   max_linear_time = params_data.max_linear_time, 
                                   save_path = "../data/test_data/test_ml_dat.pkl")

In [None]:
len(raws_dict.keys())

In [None]:
len(ml_dict.keys())

## Setup CV

In [None]:
train_times, val_times, test_times = data_funcs.cv_time_setup("2023-01-05T00:00:00Z", 
                                                train_hours=48*2, forecast_hours=48)

In [None]:
stids = [*ml_dict.keys()]

tr_sts, val_sts, te_sts = data_funcs.cv_space_setup(stids, random_state=42)

In [None]:
print(val_sts)

In [None]:
print(te_sts)

In [None]:
train = data_funcs.get_sts_and_times(ml_dict, tr_sts, train_times)

In [None]:
val = data_funcs.get_sts_and_times(ml_dict, val_sts, val_times)

In [None]:
test = data_funcs.get_sts_and_times(ml_dict, te_sts, test_times)

## ODE+KF Data

* Run on 72 hour stretches (24 spinup, 48 val)
* Get test station list used by other models
* For those test stations, use `get_sts_and_times` accounting for the spinup period
    * So adjust test times by subtracting 24 hours to account for spinup
 
Function `get_ode_data` wraps the `get_sts_and_times` function... 
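
The time adjustment in the list above can be sketched with the standard library. This only illustrates shifting the window back by the spinup length. The start date and the list-of-datetimes representation of `test_times` are assumptions, since the real values come from `cv_time_setup`.

```python
from datetime import datetime, timedelta, timezone

spinup_hours = 24    # spinup length from the list above
forecast_hours = 48  # validation length from the list above

# Hypothetical start of the test period
test_start = datetime(2023, 1, 5, 0, tzinfo=timezone.utc)
test_times = [test_start + timedelta(hours=h) for h in range(forecast_hours)]

# Shift the start back by the spinup length to get the full ODE+KF window
ode_start = test_times[0] - timedelta(hours=spinup_hours)
ode_times = [
    ode_start + timedelta(hours=h) for h in range(spinup_hours + forecast_hours)
]

print(ode_times[0].isoformat(), len(ode_times))
```

The last 48 entries of `ode_times` coincide with `test_times`, so errors computed after the spinup cover the same hours the other models are evaluated on.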

In [None]:
ode_data = data_funcs.get_ode_data(ml_dict, te_sts, test_times)

In [None]:
ode = mm.ODE_FMC()
m, errs = ode.run_model(ode_data, hours=72, h2=24)

In [None]:
print(f"RMSE Over Test Period: {errs}")

## Static ML Data

Pool the data within each of the train/val/test sets without tracking timesteps or samples. In other words, the rows can be shuffled into any order because observations are treated as independent in time.

Data is stored as a custom class `StaticMLData` defined in `models/moisture_models.py`. A custom class is used to organize data scaling and inverse scaling. A scaler should be fit using only the training data, and then applied to the val and test data to avoid data leakage. This is done internally in the `StaticMLData` class. Additionally, the class has methods to print hashes for reproducibility checks.
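
The leakage point can be shown with a small NumPy-only sketch of standard scaling. It is a stand-in for the scikit-learn scalers the class uses, not the class itself, and the arrays are randomly generated placeholders for real features.

```python
import numpy as np

rng = np.random.default_rng(42)
# Hypothetical feature matrices; columns stand in for features like Ed, Ew, rain
X_train = rng.normal(loc=10.0, scale=2.0, size=(100, 3))
X_val = rng.normal(loc=12.0, scale=2.0, size=(20, 3))

# Fit: the statistics come from the training set only
mu = X_train.mean(axis=0)
sigma = X_train.std(axis=0)

# Transform: reuse the training statistics for every split
X_train_scaled = (X_train - mu) / sigma
X_val_scaled = (X_val - mu) / sigma

# Inverse transform recovers the original values
X_val_restored = X_val_scaled * sigma + mu

print(X_train_scaled.mean(axis=0).round(6))
print(np.allclose(X_val_restored, X_val))
```

Only the training columns come out with zero mean and unit variance. The validation columns are shifted by whatever offset they have relative to the training data, which is the intended behavior: the model sees new data in the same coordinates it was trained in.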

In [None]:
from sklearn.preprocessing import MinMaxScaler, StandardScaler

# Dictionary of scalers, used to avoid multiple object creation and to avoid multiple if statements
scalers = {
    'minmax': MinMaxScaler(),
    'standard': StandardScaler() 
}

In [None]:
from abc import ABC, abstractmethod
from utils import hash_ndarray

In [None]:
class MLData(ABC):
    """
    Abstract base class for ML Data, providing support for scaling. 
    Scaling performed on training data and applied to val and test.
    """    
    def __init__(self, train, val=None, test=None, scaler="standard", features_list=None):
        # Validates input types and the scaler name
        self._run_checks(train, val, test, scaler)

        self.scaler = StandardScaler() if scaler == "standard" else MinMaxScaler()
        self.features_list = features_list if features_list is not None else ["Ed", "Ew", "rain"]

        # Set up data fields, e.g. X_train and y_train
        self._setup_data(train, val, test)
        # Assuming that units are all the same as it was checked in a previous step
        self.units = next(iter(train.values()))["units"]
    
    def _run_checks(self, train, val, test, scaler):
        """Validates input types for train, val, test, and scaler."""
        if not isinstance(train, dict):
            raise ValueError("train must be a dictionary")
        if val is not None and not isinstance(val, dict):
            raise ValueError("val must be a dictionary or None")
        if test is not None and not isinstance(test, dict):
            raise ValueError("test must be a dictionary or None")
        if scaler not in {"standard", "minmax"}:
            raise ValueError("scaler must be 'standard' or 'minmax'")
    
    @abstractmethod
    def _setup_data(self, train, val, test):
        """Abstract method to initialize X_train, y_train, X_val, y_val, X_test, y_test"""
        pass

    def scale_data(self, verbose=True):
        """
        Fits the scaler on the training data, then transforms the train, val, and test data.
        NOTE: this converts pandas dataframes into numpy ndarrays.
        Tensorflow requires numpy ndarrays so this is intended behavior.

        Parameters:
        -----------
        verbose : bool, optional
            If True, prints status messages. Default is True.

        Returns:
        ---------
        Nothing, modifies in place
        """        

        if not hasattr(self, "X_train"):
            raise AttributeError("No X_train within object. Run train_test_split first. This is to avoid fitting the scaler with prediction data.")
        if verbose:
            print(f"Scaling training data with scaler {self.scaler}, fitting on X_train")

        # Fit scaler on row-joined training data
        self.scaler.fit(self.X_train)
        # Transform data using fitted scaler
        self.X_train = self.scaler.transform(self.X_train)
        # Val and test sets are optional, so only transform them if present
        if getattr(self, "X_val", None) is not None:
            self.X_val = self.scaler.transform(self.X_val)
        if getattr(self, "X_test", None) is not None:
            self.X_test = self.scaler.transform(self.X_test)

    def inverse_scale(self, save_changes=False, verbose=True):
        """
        Inversely scales the data to its original form. Either save changes internally,
        or return tuple X_train, X_val, X_test

        Parameters:
        -----------
        save_changes : bool, optional
            If True, updates the internal data with the inversely scaled values. Default is False.
        verbose : bool, optional
            If True, prints status messages. Default is True.
        """        
        if verbose:
            print("Inverse scaling data...")
        X_train = self.scaler.inverse_transform(self.X_train)
        # Val and test sets are optional, so pass None through unchanged
        X_val = self.scaler.inverse_transform(self.X_val) if self.X_val is not None else None
        X_test = self.scaler.inverse_transform(self.X_test) if self.X_test is not None else None

        if save_changes:
            if verbose:
                print("Inverse transformed data saved")
            self.X_train = X_train
            self.X_val = X_val
            self.X_test = X_test
        else:
            if verbose:
                print("Inverse scaled, but internal data not changed.")
            return X_train, X_val, X_test    
    
    def print_hashes(self, attrs_to_check=("X_train", "y_train", "X_val", "y_val", "X_test", "y_test")):
        """
        Prints the hash of specified data attributes.

        Parameters:
        -----------
        attrs_to_check : iterable of str, optional
            Attribute names to hash and print. Default is the train, val, and test X and y arrays.
        """
        
        for attr in attrs_to_check:
            # Skip attributes that are missing or unset (e.g. no val set supplied)
            value = getattr(self, attr, None)
            if value is not None:
                print(f"Hash of {attr}: {hash_ndarray(value)}") 

In [None]:
class StaticMLData(MLData):
    """
    Custom class to handle data scaling and extracting from dictionaries. 
    Static combines all data in train/val/test as independent observations in time. 
    So timeseries are not maintained and a single "sample" is one hour of data
    Inherits from MLData class.
    """    
    def _setup_data(self, train, val, test, y_col="fm", verbose=True):
        """
        Combines all DataFrames under 'data' keys for train, val, and test. 
        Static data does not keep track of timeseries, and throws all instantaneous samples into the same pool.
        If val and test are None, the corresponding attributes are still created and set to None.

        Creates numpy ndarrays X_train, y_train, X_val, y_val, X_test, y_test
        """
        if verbose:
            print(f"Subsetting input data to {self.features_list}")

        
        X_train = self._combine_data(train)
        self.y_train = X_train[y_col].to_numpy()
        self.X_train = X_train[self.features_list].to_numpy()

        self.X_val, self.y_val = (None, None)
        if val:
            X_val = self._combine_data(val)
            self.y_val = X_val[y_col].to_numpy()
            self.X_val = X_val[self.features_list].to_numpy()
    
        self.X_test, self.y_test = (None, None)
        if test:
            X_test = self._combine_data(test)
            self.y_test = X_test[y_col].to_numpy()
            self.X_test = X_test[self.features_list].to_numpy()

        if verbose:
            print(f"X_train shape: {self.X_train.shape}, y_train shape: {self.y_train.shape}")
            if self.X_val is not None:
                print(f"X_val shape: {self.X_val.shape}, y_val shape: {self.y_val.shape}")
            if self.X_test is not None:
                print(f"X_test shape: {self.X_test.shape}, y_test shape: {self.y_test.shape}")
            
    def _combine_data(self, data_dict):
        """Combines all DataFrames under 'data' keys into a single DataFrame."""
        return pd.concat([v["data"] for v in data_dict.values()], ignore_index=True)    
 

In [None]:
x = StaticMLData(train, val, test)

In [None]:
x.X_train.shape

In [None]:
x.X_train[:, 0].mean()

In [None]:
x.scale_data()

In [None]:
x.X_train[:, 0].mean()

In [None]:
x.print_hashes()
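
The hashes printed above are meant for reproducibility checks. How `hash_ndarray` in `utils` computes them is not shown in this notebook, so the sketch below uses a hypothetical helper, `array_digest`, to illustrate the general idea: identical arrays give identical digests, and any change to the values gives a different one.

```python
import hashlib

import numpy as np


def array_digest(arr):
    """Hypothetical helper: SHA-256 hex digest of an array's dtype, shape, and bytes."""
    arr = np.ascontiguousarray(arr)
    h = hashlib.sha256()
    # Include dtype and shape so arrays with the same bytes but a
    # different layout do not collide
    h.update(str(arr.dtype).encode())
    h.update(str(arr.shape).encode())
    h.update(arr.tobytes())
    return h.hexdigest()


a = np.arange(6, dtype=np.float64).reshape(2, 3)
b = a.copy()
c = a.copy()
c[0, 0] = 1e-12  # tiny change to a single value

print(array_digest(a) == array_digest(b))
print(array_digest(a) == array_digest(c))
```

Comparing digests before and after a code change is a cheap way to confirm that the train/val/test arrays were built identically.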