## Time

> The objective of this notebook is to implement utilities that make time handling easier.

In [None]:
#| default_exp utils.time

In [None]:
#| hide
from nbdev.showdoc import *
import numpy as np

In [None]:
#| export
import pandas as pd
import xarray as xr

## Forecast time handling

Time series forecasting often involves multiple time indices (e.g., run time and forecast time) which can lead to alignment errors if not handled carefully. This class provides a unified way to manage these indices and prevent common mistakes in forecast data manipulation. The class provides methods to convert between two indexing ways:

- Forecast horizons as columns (e.g., t+1, t+2 columns)
- Forecast horizons and times as row indices

The first format is convenient for saving data, while the second is better suited for scoring and plotting since it explicitly tracks the actual forecast times.

In [None]:
#| export
class ForecastTimeHandler:
    """
    A utility class for handling forecast time transformations.

    This class provides functionality to manipulate forecast time between different formats,
    specifically handling the conversion between columnar forecast horizons and stacked time series formats.
    It manages forecast horizons (e.g., 't+1', 't+2') and their corresponding timestamps.
    A horizon label 't+N' is interpreted as N days after the run time.
    """
    def __init__(
            self,
            run_time_col_name: str = "run_time", # Name of the time index that represent the time from which the forecast is made
            stack_col_name: str = "pred" # Name of the column when columns are stacked
            ):
        self.stack_col_name = stack_col_name
        self.run_time_col_name = run_time_col_name

    def stack(self, df: pd.DataFrame) -> pd.DataFrame:
        """Stack the forecast horizon as index and add forecast time as index

        The column index is always labelled "forecast_horizon" before stacking so that the
        resulting index level can be looked up by name; the input frame is left untouched.
        """
        # rename_axis returns a new frame, so the caller's column index is never renamed in place.
        df = df.rename_axis(columns="forecast_horizon")
        df = self.transpose_forecast_horizon_as_index(df)
        df = self.add_forecast_time_as_index(df)
        return df

    def transpose_forecast_horizon_as_index(self, df: pd.DataFrame) -> pd.DataFrame:
        """Move the horizon columns into an index level, leaving a single column named `stack_col_name`"""
        return df.stack().to_frame(self.stack_col_name)

    @staticmethod
    def _horizon_to_timedelta(horizon) -> pd.Timedelta:
        """Convert one 't+<days>' label into a day offset; raise ValueError for any other label"""
        if isinstance(horizon, str) and horizon.startswith("t+"):
            try:
                return pd.Timedelta(days=int(horizon[2:]))
            except ValueError:
                pass  # not an integer number of days: report it with the uniform message below
        raise ValueError(
            f"Forecast horizons must be labelled 't+<days>' (e.g. 't+1'), got {horizon!r}"
        )

    def add_forecast_time_as_index(self, df: pd.DataFrame) -> pd.DataFrame:
        """Append a "forecast_time" index level computed as run time plus the horizon's day offset

        Expects `df` to be indexed by the run time level (`run_time_col_name`) and a
        "forecast_horizon" level. Raises ValueError if a horizon label is not of the form 't+<days>'.
        """
        horizons = df.index.get_level_values("forecast_horizon")
        run_times = df.index.get_level_values(self.run_time_col_name)
        # Parse every distinct label once: a stacked frame repeats the same few horizons for each run.
        # Invalid labels raise here instead of being skipped, which would misalign offsets and rows.
        offset_by_horizon = {h: self._horizon_to_timedelta(h) for h in horizons.unique()}
        offsets = pd.TimedeltaIndex([offset_by_horizon[h] for h in horizons])
        df = df.copy()
        df["forecast_time"] = run_times + offsets
        return df.set_index("forecast_time", append=True)

    def _unstack_column(self, df: pd.DataFrame, column: str) -> pd.DataFrame:
        """Pivot one value column of a stacked frame so that forecast horizons become columns"""
        # forecast_time is dropped because it is fully determined by run time and horizon.
        return df.reset_index("forecast_time", drop=True)[column].unstack("forecast_horizon")

    def unstack(self, df: pd.DataFrame) -> pd.DataFrame:
        """Convert stacked forecast horizon index back to horizon-as-columns format"""
        return self._unstack_column(df, self.stack_col_name)

    def align(self,
            pred: pd.DataFrame, # Predictions
            obs: pd.DataFrame, # Observations
            stack_pred: bool = True, # Set to false if predictions already have forecast time as index
            how: str = "left", # How to align the data
            **kwargs
        ) -> tuple[pd.DataFrame, pd.DataFrame]:
        """Align the predictions and observations by forecast time

        The observation index is treated as the forecast time. Extra keyword arguments are
        forwarded to `DataFrame.align`. Neither input frame is modified.
        """
        obs = obs.copy()  # private copy: its index is renamed below
        if stack_pred:
            pred = self.stack(pred)
        obs_index_name = obs.index.name
        obs.index.name = "forecast_time"
        pred, obs = pred.align(obs, join=how, axis=0, **kwargs)
        # Only a flat aligned index can carry the original observation index name back.
        # rename_axis builds a new index rather than renaming one the aligned predictions may share.
        if not isinstance(obs.index, pd.MultiIndex):
            obs = obs.rename_axis(obs_index_name)
        return pred, obs

    def align_as_xarray(self,
            pred: pd.DataFrame, # Predictions
            obs: pd.DataFrame, # Observations
            stack_pred: bool = True, # Set to false if predictions already have forecast time as index
            how: str = "left", # How to align the data
            **kwargs
        )-> tuple[xr.DataArray, xr.DataArray]:
        """Align the predictions and observations by forecast horizon and return as xarray

        Raises ValueError if `obs` does not have exactly one column. Each returned array has a
        "forecast_horizon" dimension and is named after its source column.
        """
        if 1 != len(obs.columns):
            raise ValueError("Observations must have only one column")
        obs_col = obs.columns[0]
        pred_col = self.stack_col_name

        pred, obs = self.align(pred, obs, stack_pred=stack_pred, how=how, **kwargs)

        # The column to pivot is passed explicitly, so the handler's own configuration is never
        # rewritten, not even temporarily; a failure here cannot leave the instance altered.
        obs = self._unstack_column(obs, obs_col).to_xarray().to_array("forecast_horizon", name=obs_col)
        pred = self._unstack_column(pred, pred_col).to_xarray().to_array("forecast_horizon", name=pred_col)

        return pred, obs

    def join(self,
            pred: pd.DataFrame, # Predictions
            obs: pd.DataFrame, # Observations
            stack_pred: bool = True, # Set to false if predictions already have forecast time as index
            **kwargs
        ) -> pd.DataFrame:
        """Join the predictions and observations by forecast time

        The observation index is treated as the forecast time. Extra keyword arguments are
        forwarded to `DataFrame.join`. Neither input frame is modified.
        """
        obs = obs.copy()  # private copy: its index is renamed below
        if stack_pred:
            pred = self.stack(pred)
        obs.index.name = "forecast_time"
        return pred.join(obs, on="forecast_time", **kwargs)

    def join_as_xarray(self,
            pred: pd.DataFrame, # Predictions
            obs: pd.DataFrame, # Observations
            stack_pred: bool = True, # Set to false if predictions already have forecast time as index
            **kwargs
        ) -> xr.DataArray:
        """Align the predictions and observations by forecast horizon and join them as xarray

        NOTE(review): `unstack` keeps only the `stack_col_name` column, so the observation
        column of the joined frame is not part of the returned array. Confirm whether callers
        expect the observations here; use `align_as_xarray` to get both as arrays.
        """
        joint = self.join(pred, obs, stack_pred=stack_pred, **kwargs)
        joint = self.unstack(joint)
        return joint.to_xarray().to_array("forecast_horizon")


In [None]:
#| hide
def create_test_data(seed: int = 42):
    """Create sample forecast data for testing

    Returns a `(predictions, observations)` pair: daily predictions for horizons t+1..t+3 indexed
    by "run_time", and a single-column ("obs") observation frame indexed by "time".
    `seed` feeds the random generator so that repeated calls yield identical values.
    """
    rng = np.random.default_rng(seed)

    run_times = pd.date_range(start='2023-01-01', end='2023-01-10', freq='D', name="run_time")
    horizons = [f't+{i}' for i in range(1, 4)] # Forecast horizons (t+1, t+2, t+3)
    predictions = pd.DataFrame(index=run_times, columns=horizons, data=rng.random((len(run_times), len(horizons))) * 100)

    # Observations start on 2023-01-03, one day after the first forecast time (2023-01-02),
    # so that first forecast has no matching observation.
    obs_times = pd.date_range(start='2023-01-03', end='2023-01-15', freq='D', name="time")
    observations = pd.DataFrame(index=obs_times, columns=['obs'], data=rng.random(len(obs_times)) * 100)

    return predictions, observations

We will first create some synthetic data

In [None]:
forecast, observations = create_test_data()
forecast.head(3)

Unnamed: 0_level_0,t+1,t+2,t+3
run_time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2023-01-01,79.208846,81.737806,36.44788
2023-01-02,11.969587,87.031815,52.113039
2023-01-03,17.593355,62.932074,91.113724


In [None]:
observations.head(3)

Unnamed: 0_level_0,obs
time,Unnamed: 1_level_1
2023-01-03,29.9769
2023-01-04,7.623766
2023-01-05,36.682855


We will now create an instance of the ForecastTimeHandler class

In [None]:
frcst_time_handler = ForecastTimeHandler(run_time_col_name="run_time", stack_col_name="pred")

In [None]:
show_doc(ForecastTimeHandler.stack)

---

### ForecastTimeHandler.stack

>      ForecastTimeHandler.stack (df:pandas.core.frame.DataFrame)

*Stack the forecast horizon as index and add forecast time as index*

We can stack the data to convert from forecast horizons as columns to having forecast time as an index. This format is better suited for scoring and plotting.

In [None]:
stacked_forecast = frcst_time_handler.stack(forecast)
stacked_forecast.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,pred
run_time,forecast_horizon,forecast_time,Unnamed: 3_level_1
2023-01-01,t+1,2023-01-02,79.208846
2023-01-01,t+2,2023-01-03,81.737806
2023-01-01,t+3,2023-01-04,36.44788
2023-01-02,t+1,2023-01-03,11.969587
2023-01-02,t+2,2023-01-04,87.031815


In [None]:
def test_stack():
    """Check that stacking returns a DataFrame with the expected index levels and row count"""
    pred, _ = create_test_data()
    stacked = ForecastTimeHandler().stack(pred)

    if not isinstance(stacked, pd.DataFrame):
        raise ValueError(f"Expected DataFrame but got {type(stacked)}")

    required_levels = ('run_time', 'forecast_horizon', 'forecast_time')
    if any(level not in stacked.index.names for level in required_levels):
        raise ValueError(f"Missing expected index levels. Got {stacked.index.names}")

    # One stacked row per (run time, horizon) cell of the original frame.
    if stacked.shape[0] != pred.shape[0] * pred.shape[1]:
        raise ValueError(f"Wrong number of rows after stacking. Expected {pred.shape[0] * pred.shape[1]}, got {stacked.shape[0]}")

test_stack()

In [None]:
show_doc(ForecastTimeHandler.unstack)

---

### ForecastTimeHandler.unstack

>      ForecastTimeHandler.unstack (df:pandas.core.frame.DataFrame)

*Convert stacked forecast horizon index back to horizon-as-columns format*

We can simply revert this operation as follows

In [None]:
unstacked_forecast = frcst_time_handler.unstack(stacked_forecast)
unstacked_forecast.head()

forecast_horizon,t+1,t+2,t+3
run_time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2023-01-01,79.208846,81.737806,36.44788
2023-01-02,11.969587,87.031815,52.113039
2023-01-03,17.593355,62.932074,91.113724
2023-01-04,43.615642,88.209958,89.260476
2023-01-05,21.990947,61.462943,89.639517


In [None]:
def test_unstack():
    """Check that a stack/unstack round trip reproduces the original predictions"""
    pred, _ = create_test_data()
    handler = ForecastTimeHandler()

    unstacked = handler.unstack(handler.stack(pred))

    if unstacked.shape != pred.shape:
        raise ValueError(f"Shape mismatch after unstacking: {unstacked.shape} vs {pred.shape}")

    values_match = np.allclose(unstacked.values, pred.values)
    if not values_match:
        raise ValueError("Values don't match after stack/unstack cycle")

test_unstack()

In [None]:
show_doc(ForecastTimeHandler.align)

---

### ForecastTimeHandler.align

>      ForecastTimeHandler.align (pred:pandas.core.frame.DataFrame,
>                                 obs:pandas.core.frame.DataFrame,
>                                 stack_pred:bool=True, how:str='left',
>                                 **kwargs)

*Align the predictions and observations by forecast time*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| pred | DataFrame |  | Predictions |
| obs | DataFrame |  | Observations |
| stack_pred | bool | True | Set to false if predictions already have forecast time as index |
| how | str | left | How to align the data |
| kwargs | VAR_KEYWORD |  |  |
| **Returns** | **tuple** |  |  |

Note that stacking stores the forecast horizons in an index level named "forecast_horizon". Let's try a few more things we can do. We can align the indexes of predictions and observations:

In [None]:
aligned_frcst, aligned_obs = frcst_time_handler.align(forecast, observations, stack_pred=True)
aligned_frcst.head(3)


Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,pred
run_time,forecast_horizon,forecast_time,Unnamed: 3_level_1
2023-01-01,t+1,2023-01-02,79.208846
2023-01-01,t+2,2023-01-03,81.737806
2023-01-01,t+3,2023-01-04,36.44788


In [None]:
aligned_obs.head(3)

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,obs
run_time,forecast_horizon,forecast_time,Unnamed: 3_level_1
2023-01-01,t+1,2023-01-02,
2023-01-01,t+2,2023-01-03,29.9769
2023-01-01,t+3,2023-01-04,7.623766


In [None]:
def test_align():
    """Check the default alignment and an inner alignment of predictions and observations"""
    pred, obs = create_test_data()
    handler = ForecastTimeHandler()
    stacked_index = handler.stack(pred).index

    # Default alignment: the observations must end up on the stacked prediction index.
    aligned_pred, aligned_obs = handler.align(pred, obs, stack_pred=True)

    pred_times = set(aligned_pred.index.get_level_values('forecast_time'))
    obs_times = set(aligned_obs.index.get_level_values('forecast_time'))

    if len(pred_times & obs_times) == 0:
        raise ValueError("No common forecast times between predictions and observations")
    if stacked_index.size != aligned_obs.index.size:
        raise ValueError("Default alignment method was changed and this might break expected behavior")

    same_entries = all(forecast_idx == obs_idx for forecast_idx, obs_idx in zip(stacked_index, aligned_obs.index))
    if not same_entries:
        raise ValueError("Index missmatch when aligning")

    # Inner alignment: both outputs must cover exactly the same forecast times.
    aligned_pred, aligned_obs = handler.align(pred, obs, stack_pred=True, how="inner")
    pred_times = sorted(set(aligned_pred.index.get_level_values('forecast_time')))
    obs_times = sorted(set(aligned_obs.index.get_level_values('forecast_time')))
    if pred_times != obs_times:
        raise ValueError("Custom alignment method didn't align correctly")

test_align()

In [None]:
show_doc(ForecastTimeHandler.align_as_xarray)

---

### ForecastTimeHandler.align_as_xarray

>      ForecastTimeHandler.align_as_xarray (pred:pandas.core.frame.DataFrame,
>                                           obs:pandas.core.frame.DataFrame,
>                                           stack_pred:bool=True,
>                                           how:str='left', **kwargs)

*Align the predictions and observations by forecast horizon and return as xarray*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| pred | DataFrame |  | Predictions |
| obs | DataFrame |  | Observations |
| stack_pred | bool | True | Set to false if predictions already have forecast time as index |
| how | str | left | How to align the data |
| kwargs | VAR_KEYWORD |  |  |
| **Returns** | **tuple** |  |  |

We can do the same thing but get xarray `DataArray`s as output. Remember that, by default (`stack_pred=True`), the forecast data must be given with forecast horizons as columns.

In [None]:
forcast_ds, obs_ds = frcst_time_handler.align_as_xarray(forecast, observations)
forcast_ds

In [None]:
obs_ds

In [None]:
def test_align_as_xarray():
    """Test alignment and conversion to xarray"""
    pred, obs = create_test_data()
    handler = ForecastTimeHandler()

    try:
        pred_xr, obs_xr = handler.align_as_xarray(pred, obs)
    except Exception as e:
        # Chain the original exception so its traceback is still shown.
        raise ValueError(f"Failed to convert to xarray: {str(e)}") from e

    # Both returned arrays are built with a forecast_horizon dimension.
    for array in (pred_xr, obs_xr):
        if 'forecast_horizon' not in array.dims:
            raise ValueError("Missing forecast_horizon dimension in xarray output")

test_align_as_xarray()

In [None]:
show_doc(ForecastTimeHandler.join)

---

### ForecastTimeHandler.join

>      ForecastTimeHandler.join (pred:pandas.core.frame.DataFrame,
>                                obs:pandas.core.frame.DataFrame,
>                                stack_pred:bool=True, **kwargs)

*Join the predictions and observations by forecast time*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| pred | DataFrame |  | Predictions |
| obs | DataFrame |  | Observations |
| stack_pred | bool | True | Set to false if predictions already have forecast time as index |
| kwargs | VAR_KEYWORD |  |  |
| **Returns** | **DataFrame** |  |  |

Finally we can also join the data by forecast time index

In [None]:
joint = frcst_time_handler.join(forecast, observations, stack_pred=True)
joint.head(3)

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,pred,obs
run_time,forecast_horizon,forecast_time,Unnamed: 3_level_1,Unnamed: 4_level_1
2023-01-01,t+1,2023-01-02,79.208846,
2023-01-01,t+2,2023-01-03,81.737806,29.9769
2023-01-01,t+3,2023-01-04,36.44788,7.623766


In [None]:
def test_join():
    """Test joining predictions with observations"""
    pred, obs = create_test_data()
    handler = ForecastTimeHandler()

    joined = handler.join(pred, obs, stack_pred=True)

    # The join must keep one row per stacked prediction.
    if joined.shape[0] != handler.stack(pred).shape[0]:
        raise ValueError("Wrong number of rows in joined data")

    # The result must carry the prediction column and every observation column.
    missing = {handler.stack_col_name, *obs.columns} - set(joined.columns)
    if missing:
        raise ValueError(f"Joined data is missing columns: {sorted(missing)}")

test_join()

In [None]:
show_doc(ForecastTimeHandler.join_as_xarray)

---

### ForecastTimeHandler.join_as_xarray

>      ForecastTimeHandler.join_as_xarray (pred:pandas.core.frame.DataFrame,
>                                          obs:pandas.core.frame.DataFrame,
>                                          stack_pred:bool=True, **kwargs)

*Align the predictions and observations by forecast horizon and join them as xarray*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| pred | DataFrame |  | Predictions |
| obs | DataFrame |  | Observations |
| stack_pred | bool | True | Set to false if predictions already have forecast time as index |
| kwargs | VAR_KEYWORD |  |  |
| **Returns** | **DataArray** |  |  |

We can also do the same thing and get an xarray `DataArray` as output

In [None]:
frcst_time_handler.join_as_xarray(forecast, observations)

In [None]:
def test_multiple_columns_obs():
    """Test that error is raised for multiple observation columns"""
    pred, obs = create_test_data()
    obs['second_col'] = obs['obs']
    handler = ForecastTimeHandler()

    try:
        handler.align_as_xarray(pred, obs)
    except ValueError as e:
        if str(e) != "Observations must have only one column":
            raise ValueError(f"Wrong error message: {str(e)}") from e
    else:
        # Raised outside the try body so the handler above cannot catch it and
        # misreport it as a wrong error message.
        raise ValueError("Expected ValueError for multiple observation columns but none was raised")

test_multiple_columns_obs()

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()