In [1]:
%reload_kedro
# `%reload_kedro` (re)defines `context`, `session`, `catalog` and `pipelines` in the
# kernel (see the log output below); the loads that follow read this notebook's inputs.
train_data = catalog.load("train_data")
# NOTE(review): `test_data` and `best_estimators` are not used by the cells shown
# here — confirm they are still needed before keeping these loads.
test_data = catalog.load("eval_data")
best_estimators = catalog.load("best_estimators")
# Column names taken from the pipeline parameters; the parameter-setup cell further
# down loads these same keys again.
serie_id = catalog.load("params:series_level.columns")
serie_target = catalog.load("params:serie_target")
date_col = catalog.load("params:serie_period")

2022-02-19 21:41:39,190 - kedro.framework.hooks.manager - INFO - Registered hooks from 1 installed plugin(s): kedro-mlflow-0.7.6
2022-02-19 21:41:39,272 - kedro.framework.session.store - INFO - `read()` not implemented for `BaseSessionStore`. Assuming empty store.
2022-02-19 21:41:39,312 - kedro.config.config - INFO - Config from path `/home/matheus/projects/time_series_kedro/conf/local` will override the following existing top-level config keys: fr_horizon, initial, models, n_jobs, sampling, stride, use_exog
2022-02-19 21:41:39,315 - root - INFO - ** Kedro project time_series_kedro
2022-02-19 21:41:39,316 - root - INFO - Defined global variable `context`, `session`, `catalog` and `pipelines`
2022-02-19 21:41:39,333 - root - INFO - Registered line magic `run_viz`
2022-02-19 21:41:39,334 - root - INFO - Registered line magic `reload_kedro_mlflow`
2022-02-19 21:41:39,335 - kedro.io.data_catalog - INFO - Loading data from `train_data` (CSVDataSet)...
2022-02-19 21:41:39,352 - kedro.io.dat

  data_set = class_obj(**config)  # type: ignore
  catalog._data_sets[name] = MlflowMetricsDataSet(prefix=name)


In [3]:

from typing import Any, Dict, List, Optional, Union
from xmlrpc.client import boolean

import numpy as np
from time_series_kedro.pipelines.training.nodes._params_search import build_params_search
from time_series_kedro.pipelines.training.nodes._search import TSModelSearchCV
from time_series_kedro.extras.utils import model_from_string
from pmdarima.model_selection import RollingForecastCV
from sklearn.base import clone, BaseEstimator
import pandas as pd
from tqdm import tqdm
from pandas.tseries.offsets import DateOffset
import warnings

def _search(
    serie_data: pd.DataFrame,
    estimator_base: BaseEstimator,
    model_groups_params: Dict[str, Any],
    serie_target: str,
    date_col: str,
    stride: int,
    fr_horizon: int,
    initial: Union[float, int],
    n_jobs: int,
    score: str,
    use_exog: bool,
    exog_columns: Optional[List[str]]):
    """
    This node train and search the best estimators for a serie.
    Args:
        series_data: DataFrame with train time series.
        serie_id: Column or list of columns that identify series.
        serie_data: DataFrame with time series.
        serie_target: Target column name.
        date_col: Period column name.
        model: Dict with model definition
        stride: Stride of Cross Validation
        fr_horizon: Forecast horizon of Cross Validation
        initial: initial size of train dataset
        n_jobs: number of jobs in Cross Validation process
        score: used metric
    Returns:
        Best Estimator
    """

    serie_group = serie_data.group.iloc[0]
    
    if serie_group in model_groups_params:
        model_group  = serie_group
    elif "all" in model_groups_params:
        model_group = "all"
    else:
        model_group = None
    if model_group is not None:
        params_search = build_params_search(model_groups_params[model_group]["params_search"])
        estimator = clone(estimator_base)
        ts = serie_data.set_index(date_col)[serie_target]
        start_point = int(initial) if initial > 1 else int(initial*ts.shape[0])
        cv = RollingForecastCV(step=stride, h=fr_horizon, initial=start_point)
        search = TSModelSearchCV(clone(estimator), params_search, cv_split=cv, n_jobs=n_jobs, verbose=0, score=score)
        if use_exog:
            X = serie_data[exog_columns + [date_col]].set_index(date_col) if len(exog_columns) else None
        else:
            X = None
        search.fit(np.log1p(ts), X=X)
        result = pd.Series({"estimator": search._best_estimator, "metric": search._best_score})
        
    else:
        result = pd.Series({"estimator": None, "metric": np.nan})
    return result   

def model_selection(serie_id, *best_estimators):
    """
    Pick, for each serie, the estimator with the lowest metric across several searches.

    Args:
        serie_id: Column name or list of column names that identify a serie.
        *best_estimators: DataFrames with ``estimator`` and ``metric`` columns.
            The serie id may be stored as columns or as the index (the index is
            moved into columns with ``reset_index``).
    Returns:
        DataFrame with the serie id column(s) and a ``best_estimator`` column,
        one row per serie, sorted by serie id. Ties keep the first row in input
        order. Series whose metrics are all NaN get ``None``.
    Raises:
        ValueError: if no frame is given (raised by ``pd.concat``).
    """
    id_columns = [serie_id] if isinstance(serie_id, str) else list(serie_id)

    estimators = pd.concat(best_estimators).reset_index()
    # Rows with a missing serie id are dropped, as groupby does by default.
    estimators = estimators.dropna(subset=id_columns)
    estimators = estimators.assign(metric=pd.to_numeric(estimators["metric"]))

    # Stable sort with NaN last: the first row kept per serie is its lowest
    # metric (first occurrence on ties, like idxmin). Unlike idxmin, this also
    # works when every metric of a serie is NaN.
    ranked = estimators.sort_values("metric", na_position="last", kind="mergesort")
    best = ranked.drop_duplicates(subset=id_columns, keep="first")
    best = best.sort_values(id_columns, kind="mergesort").reset_index(drop=True)

    result = best[id_columns].copy()
    result["best_estimator"] = [
        estimator if pd.notna(metric) else None
        for estimator, metric in zip(best["estimator"], best["metric"])
    ]
    return result

In [16]:


# Install a global "ignore" filter: every warning raised afterwards in this kernel
# is suppressed, not only those from the training functions below.
# NOTE(review): presumably meant to hide noisy model-fitting warnings during the
# search; a blanket filter also hides pandas deprecations and real problems.
# Consider narrowing by category/module — TODO confirm which warnings were targeted.
warnings.filterwarnings("ignore")


def train_model_apply(
    series_data: pd.DataFrame,
    serie_id: Union[str, List],
    serie_target: str,
    date_col: str,
    model: Dict[str, Any],
    stride: int,
    fr_horizon: int,
    initial: Union[float, int],
    train_start: Optional[Dict],
    use_exog: bool,
    exog_info: Optional[Dict],
    n_jobs: int = -1,
    score: str = "rmse",
    ) -> pd.DataFrame:
    """
    Search the best estimator for each serie using ``groupby(...).apply``.

    Args:
        series_data: DataFrame with the training time series.
        serie_id: Column or list of columns that identify series.
        serie_target: Target column name.
        date_col: Period column name. Also used by the ``train_start`` filter.
        model: Dict with ``"model_class"``, ``"default_args"`` and ``"params"``
            (per-group search spaces) entries.
        stride: Stride of Cross Validation.
        fr_horizon: Forecast horizon of Cross Validation.
        initial: Initial size of the train window (count if > 1, else fraction).
        train_start: Optional filter on ``date_col``. It is either
            ``{"by": "offset", "date": <DateOffset kwargs>}``, which keeps rows
            within that offset of the latest date, or
            ``{"by": "date", "date": <start date>}``.
        use_exog: Whether exogenous columns are passed to the search.
        exog_info: Optional mapping name -> dict with a ``"target_columns"`` list.
            All lists are concatenated into the exogenous column list.
        n_jobs: Number of jobs in the Cross Validation process.
        score: Metric name used by the search.
    Returns:
        DataFrame indexed by the serie id(s) with one row per serie, holding the
        fields returned by ``_search`` (``estimator``, ``metric``).
    Raises:
        ValueError: if ``train_start["by"]`` is neither ``"offset"`` nor ``"date"``.
    """
    if train_start is not None:
        # Filter on the configured period column rather than a hard-coded "date".
        if train_start["by"] == "offset":
            train_start_date = series_data[date_col].max() - DateOffset(**train_start["date"])
        elif train_start["by"] == "date":
            train_start_date = train_start["date"]
        else:
            raise ValueError(f"Filter by {train_start['by']} was not implemented")
        series_data = series_data[series_data[date_col] >= train_start_date]
    model_groups_params = model["params"]
    exog_list = []
    if exog_info is not None:
        for exog_name in exog_info:
            exog_list += exog_info[exog_name]["target_columns"]

    estimator = model_from_string(model["model_class"], model["default_args"])
    best_estimators = series_data.groupby(serie_id).apply(
        lambda serie_data: _search(
            serie_data,
            estimator,
            model_groups_params,
            serie_target,
            date_col,
            stride,
            fr_horizon,
            initial,
            n_jobs,
            score,
            use_exog,
            exog_list,
        )
    )
    return best_estimators



In [11]:
def train_model(
    series_data: pd.DataFrame,
    serie_id: Union[str, List],
    serie_target: str,
    date_col: str,
    model: Dict[str, Any],
    stride: int,
    fr_horizon: int,
    initial: Union[float, int],
    train_start: Optional[Dict],
    use_exog: bool,
    exog_info: Optional[Dict],
    n_jobs: int = -1,
    score: str = "rmse",
    ) -> pd.DataFrame:
    """
    Search the best estimator for each serie, iterating over the groups explicitly.

    Args:
        series_data: DataFrame with the training time series.
        serie_id: Column or list of columns that identify series.
        serie_target: Target column name.
        date_col: Period column name. Also used by the ``train_start`` filter.
        model: Dict with ``"model_class"``, ``"default_args"`` and ``"params"``
            (per-group search spaces) entries.
        stride: Stride of Cross Validation.
        fr_horizon: Forecast horizon of Cross Validation.
        initial: Initial size of the train window (count if > 1, else fraction).
        train_start: Optional filter on ``date_col``. It is either
            ``{"by": "offset", "date": <DateOffset kwargs>}``, which keeps rows
            within that offset of the latest date, or
            ``{"by": "date", "date": <start date>}``.
        use_exog: Whether exogenous columns are passed to the search.
        exog_info: Optional mapping name -> dict with a ``"target_columns"`` list.
            All lists are concatenated into the exogenous column list.
        n_jobs: Number of jobs in the Cross Validation process.
        score: Metric name used by the search.
    Returns:
        DataFrame with one row per serie: the fields returned by ``_search``
        (``estimator``, ``metric``) followed by the serie id column(s).
        Empty when no rows remain after filtering.
    Raises:
        ValueError: if ``train_start["by"]`` is neither ``"offset"`` nor ``"date"``.
    """
    if train_start is not None:
        # Filter on the configured period column rather than a hard-coded "date".
        if train_start["by"] == "offset":
            train_start_date = series_data[date_col].max() - DateOffset(**train_start["date"])
        elif train_start["by"] == "date":
            train_start_date = train_start["date"]
        else:
            raise ValueError(f"Filter by {train_start['by']} was not implemented")
        series_data = series_data[series_data[date_col] >= train_start_date]
    model_groups_params = model["params"]
    exog_list = []
    if exog_info is not None:
        for exog_name in exog_info:
            exog_list += exog_info[exog_name]["target_columns"]

    estimator = model_from_string(model["model_class"], model["default_args"])
    # A bare column name must not be zipped character by character.
    id_columns = [serie_id] if isinstance(serie_id, str) else list(serie_id)
    records = []
    for serie_idx, serie_data in series_data.groupby(serie_id):
        serie_result = _search(serie_data, estimator, model_groups_params,
                               serie_target, date_col, stride,
                               fr_horizon, initial, n_jobs, score,
                               use_exog, exog_list)
        # groupby yields a scalar key for one column and a tuple for several
        # (for a one-element list the key type depends on the pandas version).
        id_values = serie_idx if isinstance(serie_idx, tuple) else (serie_idx,)
        record = serie_result.to_dict()
        record.update(zip(id_columns, id_values))
        records.append(record)

    # Build the frame once. Concatenating a Series onto a DataFrame stacked its
    # values into a single column instead of adding one row per serie, and was
    # quadratic in the number of series.
    return pd.DataFrame(records)


In [12]:
train_data = catalog.load("train_data")

# Pipeline parameters, loaded one after another in this fixed order.
(
    serie_id,
    serie_target,
    date_col,
    models,
    initial,
    stride,
    fr_horizon,
) = (
    catalog.load(param_key)
    for param_key in (
        "params:series_level.columns",
        "params:serie_target",
        "params:serie_period",
        "params:models",
        "params:initial",
        "params:stride",
        "params:fr_horizon",
    )
)
model = models["exponential_smoothing"]

# Local overrides for this benchmark; these are not read from the catalog.
n_jobs, score = 4, "rmsle"
exog_info, train_start, use_exog = None, None, False

2022-02-19 21:55:27,195 - kedro.io.data_catalog - INFO - Loading data from `train_data` (CSVDataSet)...
2022-02-19 21:55:27,214 - kedro.io.data_catalog - INFO - Loading data from `params:series_level.columns` (MemoryDataSet)...
2022-02-19 21:55:27,215 - kedro.io.data_catalog - INFO - Loading data from `params:serie_target` (MemoryDataSet)...
2022-02-19 21:55:27,217 - kedro.io.data_catalog - INFO - Loading data from `params:serie_period` (MemoryDataSet)...
2022-02-19 21:55:27,217 - kedro.io.data_catalog - INFO - Loading data from `params:models` (MemoryDataSet)...
2022-02-19 21:55:27,218 - kedro.io.data_catalog - INFO - Loading data from `params:initial` (MemoryDataSet)...
2022-02-19 21:55:27,219 - kedro.io.data_catalog - INFO - Loading data from `params:stride` (MemoryDataSet)...
2022-02-19 21:55:27,221 - kedro.io.data_catalog - INFO - Loading data from `params:fr_horizon` (MemoryDataSet)...


In [20]:
%timeit best_estimators = train_model_for(train_data, serie_id, serie_target, \
                                  date_col, model, stride, fr_horizon, \
                                  initial, train_start, use_exog, exog_info, \
                                  n_jobs, score)

6.44 s ± 42.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [21]:
%timeit best_estimators = train_model_apply(train_data, serie_id, serie_target, \
                                  date_col, model, stride, fr_horizon, \
                                  initial, train_start, use_exog, exog_info, \
                                  n_jobs, score)

6.5 s ± 204 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
