In [1]:
from functools import partial
from multiprocessing import Pool
from pathlib import Path
from typing import Callable, Dict, List, Optional, Tuple, Type
from uuid import uuid4

import keras
from keras import layers
import mlflow
import numpy as np
import optuna
import pandas as pd
import pmdarima as pm
from tqdm import tqdm

from src import metrics, models
from src.datasets import DATASET_FACTORY_LOOKUP
from src.splitters import AnchoredSplitter, Splitter
from src.ts_models import ModelClassLookupCallback
from src.ts_models import SARIMAModel, RNNModel
from src.hybrid_model import HybridModel

In [None]:
class SARIMAModel:
    """Seasonal ARIMA model selected by ``pmdarima.auto_arima`` with a one-step-ahead API.

    NOTE(review): this class shadows the ``SARIMAModel`` imported from
    ``src.ts_models`` in the first cell.
    """

    def __init__(self, m: int):
        # Seasonal period handed to auto_arima (this notebook uses m=12).
        self.m = m
        # Set by fit(); None until the model has been fitted.
        self.model = None

    def fit(self, y: pd.Series) -> None:
        """Search for and fit a seasonal ARIMA model on ``y`` with ``pm.auto_arima``."""
        self.model = pm.auto_arima(y, seasonal=True, m=self.m)

    def predict_one_ahead(self) -> float:
        """Forecast the single value following the end of the fitted series."""
        forecast = self.model.predict(n_periods=1)
        # Newer pmdarima versions may return a pd.Series indexed after the training
        # data, where ``forecast[0]`` would be a label lookup (KeyError). Convert to
        # an array so the first forecast is always taken positionally.
        return np.asarray(forecast)[0]

    @staticmethod
    def suggest_params(trial: "optuna.Trial") -> Dict[str, int]:
        """Return no tunable hyperparameters; ``m`` is supplied by the caller."""
        return {}


class RNNModel:
    """Single SimpleRNN layer + Dense head regressing the next value from a window of lags.

    ``X`` passed to ``fit``/``predict_one_ahead`` must have ``timesteps`` columns per
    row, since each row is reshaped to ``(timesteps, 1)``.
    """

    def __init__(self,
                 timesteps: int,
                 hidden_units: int,
                 epochs: int,
                 m: Optional[int] = None):
        # `m` is accepted but unused, presumably so the class can be constructed with
        # the same kwargs as the seasonal models — verify against callers.
        self.timesteps = timesteps
        self.hidden_units = hidden_units
        self.epochs = epochs
        # Set by fit(); None means "not fitted yet".
        self.model = None

    def _create_model(self) -> "keras.Sequential":
        """Build and compile a fresh SimpleRNN(relu) -> Dense(1) model with Adam/MSE."""
        model = keras.Sequential()
        model.add(
            layers.SimpleRNN(
                units=self.hidden_units,
                activation="relu",
                input_shape=(self.timesteps, 1),
            ))
        model.add(layers.Dense(1))
        model.compile(optimizer="adam", loss="mse")
        return model

    def fit(self, y: pd.Series, X: pd.DataFrame) -> None:
        """Train a newly built network on windows ``X`` and targets ``y``."""
        targets = y.values
        windows = X.values.reshape(X.shape[0], self.timesteps, 1)

        self.model = self._create_model()
        self.model.fit(windows, targets, epochs=self.epochs, verbose=0)

    def predict_one_ahead(self, X: pd.DataFrame) -> Optional[float]:
        """Predict the next value from a single window ``X``.

        Returns:
            The scalar prediction, or ``None`` if ``fit`` has not been called yet.
        """
        if self.model is None:
            return None
        window = X.values.reshape(1, self.timesteps, 1)
        # verbose=0 matches fit(); otherwise Keras prints a progress bar on every call
        # of the walk-forward loop.
        return self.model.predict(window, verbose=0)[0][0]

    @staticmethod
    def suggest_params(trial: "optuna.Trial") -> dict:
        """Sample window length, hidden size and epoch count for an Optuna trial."""
        timesteps = trial.suggest_categorical("timesteps",
                                              [2, 4, 6, 12, 24, 36])
        hidden_units = trial.suggest_int("hidden_units", 5, 25, 5)
        epochs = trial.suggest_int("epochs", 300, 800, 100)

        return {
            "timesteps": timesteps,
            "hidden_units": hidden_units,
            "epochs": epochs
        }

In [None]:
class LinearModelInteractor:
    """Placeholder class with no attributes or methods yet; not referenced elsewhere in this notebook."""

In [None]:
def split_trains_test(
    y: pd.Series,
    splitter: Splitter,
) -> Tuple[List[pd.Series], pd.Series]:
    """Build per-fold training windows and one target point per fold.

    Args:
        y: series (or frame) to split. Folds are selected with ``.loc``, so the
            indices produced by ``splitter`` are used as labels of ``y``.
        splitter: object whose ``split(y)`` yields indexable pairs
            ``(train_index, test_index)``.

    Returns:
        ``(trains, test)``: ``trains[i]`` is ``y.loc[train_index_i]`` and ``test``
        holds, for each fold, the row at the *first* label of that fold's test
        index, so ``len(trains) == len(test)``.
    """
    # Materialise the folds once: they are traversed twice below, and a
    # generator-returning ``split`` would otherwise be exhausted after the first
    # pass, silently leaving ``test`` empty.
    folds = list(splitter.split(y))

    trains = [y.loc[fold[0]] for fold in folds]
    # Only the first test label of each fold is kept: one target per training window.
    test = y.loc[[fold[1][0] for fold in folds]]

    return trains, test

In [None]:
# Scratch check: split a seeded random 4-column frame with the same splitter settings
# as the dataset. The dataset is loaded here so this cell does not depend on the
# next cell having been run first (it previously failed on Restart & Run All).
dataset = DATASET_FACTORY_LOOKUP["AIR_PASSENGERS"]()
rng = np.random.default_rng(0)
X = pd.DataFrame(rng.random(size=(len(dataset), 4)), columns=list("ABCD"))
X_trains, X_test = split_trains_test(
    X, AnchoredSplitter(min_train_points=dataset.train_size))
# Show a compact summary instead of dumping every training window.
len(X_trains), X_test.head()

In [None]:
dataset_name = "AIR_PASSENGERS"
dataset = DATASET_FACTORY_LOOKUP[dataset_name]()

# Per-fold training windows plus one target point per fold; the splitter's
# min_train_points is tied to the dataset's own train_size.
trains, test = split_trains_test(
    dataset,
    AnchoredSplitter(min_train_points=dataset.train_size),
)

In [None]:
# NOTE(review): this cell fails on Restart & Run All: `fit_trains_and_predict_next`
# is defined in a later cell, and `model_class` / `params` are only assigned in the
# final cell. The final cell repeats this same evaluation, so this looks like leftover
# scratch — move it below those cells or delete it.
preds = fit_trains_and_predict_next(trains, test, model_class, params)

results = metrics.generate_all_metrics(preds, test)

In [None]:
def objective(
    trial: optuna.Trial,
    model_class: Type[models.OneAheadModel],
    suggest_params: Callable[[optuna.Trial], dict],
    train: pd.Series,
    min_train_points: int = 40,
) -> float:
    """Optuna objective: one-step-ahead error of ``model_class`` over folds of ``train``.

    Args:
        trial: the Optuna trial being evaluated.
        model_class: class instantiated with the suggested parameters on every fold.
        suggest_params: maps ``trial`` to constructor kwargs for ``model_class``.
        train: series split into folds with ``AnchoredSplitter``.
        min_train_points: ``min_train_points`` passed to ``AnchoredSplitter``
            (defaults to the previously hard-coded 40).

    Returns:
        ``metrics.rmse`` of the per-fold predictions. The full metric dict from
        ``metrics.generate_all_metrics`` is stored as the trial's ``"metrics"``
        user attribute.
    """
    parameters = suggest_params(trial)

    trains, test = split_trains_test(
        train, AnchoredSplitter(min_train_points=min_train_points))
    preds = fit_trains_and_predict_next(trains, test, model_class, parameters)

    trial.set_user_attr("metrics", metrics.generate_all_metrics(preds, test))

    return metrics.rmse(preds, test)


def split_trains_test(
    y: pd.Series,
    splitter: Splitter,
) -> Tuple[List[pd.Series], pd.Series]:
    """Build per-fold training windows and one target point per fold.

    NOTE(review): an identical ``split_trains_test`` is defined in an earlier cell;
    keep a single definition.

    Args:
        y: series (or frame) to split. Folds are selected with ``.loc``, so the
            indices produced by ``splitter`` are used as labels of ``y``.
        splitter: object whose ``split(y)`` yields indexable pairs
            ``(train_index, test_index)``.

    Returns:
        ``(trains, test)``: ``trains[i]`` is ``y.loc[train_index_i]`` and ``test``
        holds, for each fold, the row at the *first* label of that fold's test
        index, so ``len(trains) == len(test)``.
    """
    # Materialise the folds once: they are traversed twice below, and a
    # generator-returning ``split`` would otherwise be exhausted after the first
    # pass, silently leaving ``test`` empty.
    folds = list(splitter.split(y))

    trains = [y.loc[fold[0]] for fold in folds]
    # Only the first test label of each fold is kept: one target per training window.
    test = y.loc[[fold[1][0] for fold in folds]]

    return trains, test


def _predict(
    model_class: Type[models.OneAheadModel],
    parameters: dict,
    train: pd.Series,
) -> float:
    """Fit a fresh model on one training window and forecast the next point.

    Args:
        model_class: class constructed as ``model_class(**parameters)``.
        parameters: constructor kwargs for ``model_class``.
        train: one training window (an element of the ``trains`` list built by
            ``split_trains_test``; previously mis-annotated as ``dict``).

    Returns:
        Whatever ``model.predict_one_ahead()`` returns for that window.

    NOTE(review): the model must support ``fit(train)`` and a no-argument
    ``predict_one_ahead()``; the ``RNNModel`` classes defined in this notebook need
    extra arguments and so cannot be used here directly.
    """
    model = model_class(**parameters)
    model.fit(train)
    return model.predict_one_ahead()


def fit_trains_and_predict_next(
    trains: List[pd.Series],
    test: pd.Series,
    model_class: Type[models.OneAheadModel],
    parameters: dict,
) -> pd.Series:
    """Fit one new model per training window and collect its one-step-ahead forecast.

    Args:
        trains: training windows, one per fold.
        test: one target per fold; only its index and name are used here.
        model_class: class instantiated with ``parameters`` for every window.
        parameters: constructor kwargs for ``model_class``.

    Returns:
        The forecasts as a Series aligned to ``test.index`` and named ``test.name``.
    """
    forecast_window = partial(_predict, model_class, parameters)
    forecasts = [forecast_window(window) for window in tqdm(trains)]
    return pd.Series(forecasts, index=test.index, name=test.name)


def tune_hyperparameters(
    study_name: str,
    model_class: Type[models.OneAheadModel],
    train: pd.Series,
    n_trials: int = 5,
    storage: str = "sqlite:///optuna.db",
) -> Tuple[float, optuna.trial.FrozenTrial]:
    """Run (or resume) an Optuna study minimising ``objective`` for ``model_class``.

    Args:
        study_name: name of the study in ``storage``; an existing study with this
            name is resumed (``load_if_exists=True``).
        model_class: model to tune; its ``suggest_params`` defines the search space.
        train: series the objective evaluates on.
        n_trials: number of additional trials to run.
        storage: Optuna storage URL (defaults to the previously hard-coded SQLite file).

    Returns:
        ``(study.best_value, study.best_trial)`` — the previous annotation claimed a
        single trial, but a tuple has always been returned.
    """
    study = optuna.create_study(
        study_name=study_name,
        direction="minimize",
        storage=storage,
        load_if_exists=True,
    )

    partial_objective = partial(
        objective,
        model_class=model_class,
        suggest_params=model_class.suggest_params,
        train=train,
    )
    study.optimize(partial_objective, n_trials=n_trials)

    return study.best_value, study.best_trial


def train_and_test_model(
    dataset_name: str = "AIR_PASSENGERS",
    model_name: str = "SARIMA_SVR",
    params: Optional[dict] = None,
) -> float:
    """Evaluate one model one step ahead over the test folds and log the run to MLflow.

    The dataset is split with ``AnchoredSplitter(min_train_points=dataset.train_size)``,
    a fresh model is fitted per training window, and the predictions are scored,
    logged as MLflow metrics/params, and saved to a CSV attached as an artifact.

    Args:
        dataset_name: key into ``DATASET_FACTORY_LOOKUP``.
        model_name: name resolved by ``ModelClassLookupCallback``.
        params: constructor parameters for the model. When ``None``, the
            configuration previously used in this notebook for ``model_name`` is
            taken from ``known_params`` below.

    Returns:
        ``metrics.rmse`` of the predictions against the test points.

    Raises:
        ValueError: if ``params`` is ``None`` and no stored configuration exists
            for ``model_name``.
    """
    # Configurations that were previously switched by (un)commenting lines here.
    known_params = {
        "RNN": {"epochs": 700, "hidden_units": 25, "timesteps": 12},
        "SARIMA": {"m": 12},
        "NAIVE": {"constant": 0},
        "ARIMA": {},
        "SARIMA_SVR": {"svr_C": 0.011103136450426, "svr_kernel": "linear"},
    }
    if params is None:
        if model_name not in known_params:
            raise ValueError(
                f"No stored params for model {model_name!r}; pass `params` explicitly."
            )
        params = known_params[model_name]

    results_dir = Path("src/data/results")
    filepath = results_dir / f"{dataset_name}_{model_name}_{uuid4().hex}.csv"

    dataset = DATASET_FACTORY_LOOKUP[dataset_name]()
    model_class = ModelClassLookupCallback(model_name, dataset.period)

    with mlflow.start_run():
        trains, test = split_trains_test(
            dataset, AnchoredSplitter(min_train_points=dataset.train_size))
        preds = fit_trains_and_predict_next(trains, test, model_class, params)

        results = metrics.generate_all_metrics(preds, test)

        for metric_name, value in results.items():
            mlflow.log_metric(metric_name, value)

        mlflow.log_param("dataset_name", dataset_name)
        mlflow.log_param("model_name", model_name)
        mlflow.log_param("params", params)
        mlflow.log_param("n_train_points", dataset.train_size)

        # to_csv does not create missing directories, so ensure the target exists.
        results_dir.mkdir(parents=True, exist_ok=True)
        preds.to_csv(filepath)
        mlflow.log_artifact(str(filepath))

    return metrics.rmse(preds, test)


def get_best_study_params(
    dataset_name: str = "AIR_PASSENGERS",
    model_name: str = "SARIMA_SVR",
    version: str = "v2",
    storage: str = "sqlite:///optuna.db",
) -> dict:
    """Print and return the best hyperparameters of an existing Optuna study.

    The study is looked up as ``f"{dataset_name}/{model_name}/{version}"`` in
    ``storage``. ``optuna.load_study`` is used rather than
    ``create_study(load_if_exists=True)`` so that asking for a study that does not
    exist fails immediately instead of first writing an empty study to the storage.

    Raises:
        An Optuna lookup error (KeyError in current Optuna versions — verify) if the
        study does not exist; ValueError if it has no completed trials.
    """
    study = optuna.load_study(
        study_name=f"{dataset_name}/{model_name}/{version}",
        storage=storage,
    )
    best_params = study.best_params
    print(best_params)
    return best_params


def tune_hyperparameters_with_optuna(
    dataset_name: str = "AIR_PASSENGERS",
    model_name: str = "SARIMA_SVR",
    version: str = "v2",
    n_trials: int = 100,
) -> None:
    """Tune ``model_name`` on the training part of ``dataset_name`` and print the best result.

    Only the first ``dataset.train_size`` points are used, so the test segment is not
    seen during tuning. Trials are stored in the study
    ``f"{dataset_name}/{model_name}/{version}"``.

    Args:
        dataset_name: key into ``DATASET_FACTORY_LOOKUP``.
        model_name: name resolved by ``ModelClassLookupCallback``.
        version: suffix of the study name, used to separate tuning campaigns.
        n_trials: number of Optuna trials to run.
    """
    study_name = f"{dataset_name}/{model_name}/{version}"

    dataset = DATASET_FACTORY_LOOKUP[dataset_name]()
    train = dataset.iloc[:dataset.train_size]

    best_value, best_trial = tune_hyperparameters(
        study_name=study_name,
        model_class=ModelClassLookupCallback(model_name, dataset.period),
        train=train,
        n_trials=n_trials,
    )

    print(best_value, best_trial.params)


In [None]:
# tune_hyperparameters_with_optuna()
# get_best_study_params()
# train_and_test_model()


In [None]:
class RNNModel:
    """Single SimpleRNN layer + Dense head trained on pre-shaped arrays.

    Unlike the earlier ``RNNModel`` in this notebook, ``fit`` takes ``(X, y)`` arrays
    and ``predict_one_ahead`` reads its input window from ``self.x_input``, which this
    class never sets beyond ``None`` — presumably assigned by ``HybridModel``; verify.
    """

    def __init__(self,
                 timesteps: int,
                 hidden_units: int,
                 epochs: int,
                 m: Optional[int] = None):
        # `m` is accepted but unused, presumably for signature compatibility with the
        # seasonal models — verify against callers.
        self.model = None
        self.timesteps = timesteps
        self.hidden_units = hidden_units
        self.epochs = epochs
        # Input window for predict_one_ahead(); must be set externally before predicting.
        self.x_input = None

    def _create_model(self) -> "keras.Sequential":
        """Build and compile a fresh SimpleRNN(relu) -> Dense(1) model with Adam/MSE."""
        model = keras.Sequential()
        model.add(
            layers.SimpleRNN(
                units=self.hidden_units,
                activation="relu",
                input_shape=(self.timesteps, 1),
            ))
        model.add(layers.Dense(1))
        model.compile(optimizer="adam", loss="mse")
        return model

    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        """Train a newly built network; ``X`` is assumed to be shaped (n, timesteps, 1)."""
        self.model = self._create_model()
        self.model.fit(X, y, epochs=self.epochs, verbose=0)

    def predict_one_ahead(self) -> float:
        """Predict from ``self.x_input`` and return the first value as a scalar.

        Raises:
            ValueError: if ``x_input`` has not been set.
        """
        if self.x_input is None:
            raise ValueError("x_input must be set before calling predict_one_ahead()")
        # verbose=0 matches fit(); otherwise Keras prints a progress bar per call.
        return self.model.predict(self.x_input, verbose=0).reshape(-1)[0]

    @staticmethod
    def suggest_params(trial: "optuna.Trial") -> dict:
        """Sample window length, hidden size and epoch count for an Optuna trial."""
        timesteps = trial.suggest_categorical("timesteps",
                                              [2, 4, 6, 12, 24, 36])
        hidden_units = trial.suggest_int("hidden_units", 5, 25, 5)
        epochs = trial.suggest_int("epochs", 300, 800, 100)

        return {
            "timesteps": timesteps,
            "hidden_units": hidden_units,
            "epochs": epochs
        }


In [None]:
dataset_name = "AIR_PASSENGERS"

dataset = DATASET_FACTORY_LOOKUP[dataset_name]()

# The RNN stage is stochastic (weight init, Adam): seed Python, NumPy and the Keras
# backend so repeated runs of this cell are comparable.
SEED = 42
keras.utils.set_random_seed(SEED)

# NOTE(review): SARIMAModel / RNNModel here are the classes most recently defined in
# this notebook (they shadow the src.ts_models imports) — confirm that is intended.
model_class = HybridModel
params = {
    'first_model': SARIMAModel,
    'second_model': RNNModel,
    'first_model_params': {
        "m": 12
    },
    'second_model_params': {
        "epochs": 700,
        "hidden_units": 25,
        "timesteps": 12
    },
}

trains, test = split_trains_test(
    dataset, AnchoredSplitter(min_train_points=dataset.train_size))
preds = fit_trains_and_predict_next(trains, test, model_class, params)

results = metrics.generate_all_metrics(preds, test)
results