In [1]:
import os
import random
import numpy
import pandas
import seaborn as sns

from matplotlib import pyplot

def set_seeds(seed: int):
    """Seed the pseudo-random generators used by this notebook.

    Seeds Python's ``random`` module and NumPy's legacy global generator, and
    writes the seed to the ``PYTHONHASHSEED`` environment variable.

    Args:
        seed: Strictly positive integer seed.

    Raises:
        AssertionError: If ``seed`` is not greater than zero.
    """
    assert seed > 0
    # NOTE(review): PYTHONHASHSEED is read at interpreter start-up, so this
    # assignment presumably only influences child processes -- confirm.
    os.environ['PYTHONHASHSEED'] = str(seed)
    for seeder in (random.seed, numpy.random.seed):
        seeder(seed)


# Fix the random seeds once, before any other cell runs.
set_seeds(seed=42)

# seaborn theme with the IPAexGothic font and font_scale=2.
# NOTE(review): IPAexGothic is presumably chosen so Japanese labels render; the
# font has to be installed on the machine running the notebook -- confirm.
sns.set_theme(font="IPAexGothic", font_scale=2)

# Default matplotlib figure size as (width, height).
pyplot.rcParams["figure.figsize"] = (20, 10)


In [2]:
import optuna

from prophet import Prophet
from prophet.diagnostics import cross_validation
from prophet.diagnostics import performance_metrics


In [3]:
def build_prophet_model(is_longterm=True, **params) -> Prophet:
    """Create an unfitted Prophet model with this notebook's custom seasonalities.

    Args:
        is_longterm: When true, the 5-, 9- and 10-year seasonalities are
            registered in addition to the 3-year and 40-month ones.
        **params: Keyword arguments forwarded to the ``Prophet`` constructor.
            ``yearly_seasonality`` is fixed to 4 here, so it must not be
            passed in ``params``.

    Returns:
        The configured model; it has not been fitted.
    """
    print(f"build_prophet_model: {is_longterm=} {params}")

    days_per_year = 365.25
    # (component name, period in days); every component uses fourier_order=1.
    seasonalities = [
        ('triennial', days_per_year*3),
        ('kitchen', days_per_year/12*40),  # 40 months
    ]
    if is_longterm:
        seasonalities.extend([
            ('quinquennial', days_per_year*5),
            ('decennial_09', days_per_year*9),
            ('decennial_10', days_per_year*10),
        ])

    model = Prophet(**params, yearly_seasonality=4)
    for component_name, period_days in seasonalities:
        model.add_seasonality(name=component_name, period=period_days, fourier_order=1)
    return model


In [4]:
class Evaluator(object):
    """Optuna objective that scores Prophet hyper-parameters by cross-validation.

    ``objective_value`` samples a parameter set from the trial, builds and fits
    a model on ``df`` and turns the cross-validation metrics into one score.
    ``run_cross_validation`` can also be called directly with a fitted model.

    Attributes:
        df: Training frame handed to ``Prophet.fit``; its ``ds`` column is used
            to place the cutoffs.
        n_horizon: Cross-validation forecast horizon, in days.
        freq: pandas frequency string used to space the cutoffs.
        horizon_scaler: The cutoff window starts ``n_horizon * horizon_scaler``
            days before the last ``ds`` and ends ``n_horizon`` days before it.
        max_changepoints: Upper bound for the sampled ``n_changepoints``
            (the number of rows in ``df``).
        date_start, date_end, cutoffs: Context of the most recent successful
            ``run_cross_validation`` call; ``None`` before the first one.
    """

    def __init__(self, df: pandas.DataFrame, n_horizon: float = 365.25, freq="3MS", horizon_scaler: float = 3) -> None:
        self.df: pandas.DataFrame = df
        self.n_horizon: float = n_horizon  # cv prediction range, in days
        self.freq = freq  # cutoff freq
        self.horizon_scaler = horizon_scaler
        self.max_changepoints = self.df.shape[0]

        # Filled in by run_cross_validation(); declared here so that reading
        # them before the first run yields None instead of AttributeError.
        self.date_start = None
        self.date_end = None
        self.cutoffs = None

    def objective_value(self, trial: optuna.Trial) -> float:
        """Sample hyper-parameters from ``trial`` and return the score to optimise.

        Args:
            trial: Optuna trial used to draw the hyper-parameters.

        Returns:
            Sum of the mean reverse-cumulative RMSE and the mean cumulative MAE
            over the horizon rows, divided by the number of rows.
        """
        # NOTE(review): Prophet's logistic growth needs a `cap` column in
        # self.df -- presumably prepared by the caller; confirm.
        params = {
            "growth":
                trial.suggest_categorical("growth", ["linear", "logistic"]),
            "changepoint_range":
                trial.suggest_float("changepoint_range", 0.8, 1.0),
            "n_changepoints":
                trial.suggest_int("n_changepoints", 1, self.max_changepoints),
            "changepoint_prior_scale":
                trial.suggest_float("changepoint_prior_scale", 0.001, 5),
            "seasonality_prior_scale":
                trial.suggest_float("seasonality_prior_scale", 0.01, 10),
            "seasonality_mode":
                trial.suggest_categorical("seasonality_mode", ["additive", "multiplicative"]),
        }

        model: Prophet = build_prophet_model(**params)
        model.fit(self.df)
        _, df_pm = self.run_cross_validation(model=model)
        n = df_pm.shape[0]
        # NOTE:
        #     - rmse: the error grows as the horizon gets longer (later index),
        #             so rmse, which is sensitive to large differences, is
        #             accumulated in reverse order to weight the later part of
        #             the horizon more heavily.
        #     - mae : accuracy also matters while the horizon is still short,
        #             so mae is accumulated in forward order to weight the
        #             earlier part of the horizon more heavily.
        rmse_term = numpy.cumsum(df_pm['rmse'].values[::-1]).mean()
        mae_term = numpy.cumsum(df_pm['mae'].values).mean()
        # Divide by n for interpretability of `score` in optuna.visualization.
        score = (rmse_term + mae_term) / n

        return score

    def run_cross_validation(self, model: Prophet) -> tuple[pandas.DataFrame, pandas.DataFrame]:
        """Cross-validate a fitted model over the configured cutoff window.

        Args:
            model: Model passed to ``cross_validation``.

        Returns:
            ``(df_cv, df_pm)``: the cross-validation frame and the result of
            ``performance_metrics`` on it.

        Raises:
            ValueError: If ``freq`` yields no cutoff inside the window.
        """
        n_horizon = self.n_horizon
        last_date = self.df.ds.max()
        date_start = last_date - pandas.Timedelta(days=n_horizon * self.horizon_scaler)
        date_end = last_date - pandas.Timedelta(days=n_horizon)
        cutoffs = pandas.date_range(start=date_start, end=date_end, freq=self.freq)

        # Fail early with an actionable message instead of an error raised
        # from inside cross_validation.
        if len(cutoffs) == 0:
            raise ValueError(
                f"No cross-validation cutoffs between {date_start} and {date_end} "
                f"for freq={self.freq!r}"
            )

        # run cv and metrics
        df_cv = cross_validation(model, cutoffs=cutoffs, horizon=f"{n_horizon} days", parallel="processes")
        df_pm = performance_metrics(df_cv)

        # store context
        self.date_start = date_start
        self.date_end = date_end
        self.cutoffs = cutoffs

        return df_cv, df_pm


In [5]:
from dataclasses import dataclass


In [6]:
@dataclass
class Limitter:
    """Floor and cap bounds derived from the ``y`` column of a frame.

    After construction ``floor`` is ``min(y) - 3 * std(y)`` and ``cap`` is
    ``max(y) + 3 * std(y)``.
    """

    df: pandas.DataFrame  # frame whose ``y`` column supplies the statistics

    def __post_init__(self):
        y = self.df.y
        margin = 3 * y.std()  # same margin is applied below min and above max
        self.floor = y.min() - margin
        self.cap = y.max() + margin


def setup_limit(df: pandas.DataFrame, lmt: Limitter, inplace=False) -> pandas.DataFrame:
    """Attach constant ``floor`` and ``cap`` columns taken from ``lmt``.

    Args:
        df: Frame to annotate.
        lmt: Object whose ``floor`` and ``cap`` attributes supply the values.
        inplace: When true, ``df`` itself is modified; otherwise a copy is.

    Returns:
        The frame carrying the two columns (``df`` itself when ``inplace``).
    """
    target = df if inplace else df.copy()
    for column in ("floor", "cap"):
        target[column] = getattr(lmt, column)
    return target


In [7]:
@dataclass
class BestEstimator:
    """Container grouping a data frame, a Prophet model and its tuning objects.

    ``future`` and ``forecast`` are optional and default to ``None``.
    """

    # Data, model, and the Evaluator / Optuna study associated with them.
    df: pandas.DataFrame
    model: Prophet
    evaluator: Evaluator
    study: optuna.Study
    # NOTE(review): annotated as dict although the names suggest the
    # cross-validation and metrics frames -- confirm the intended type.
    df_cv: dict
    df_pm: dict
    # Optional frames; None until assigned.
    future: pandas.DataFrame | None = None
    forecast: pandas.DataFrame | None = None


In [8]:
def setup_df_index(df: pandas.DataFrame, col="ds", do_drop: bool=True):
    """Use column ``col`` as the index of ``df``, modifying ``df`` in place.

    The call is safe to repeat: if ``col`` is no longer a column but the index
    is already named ``col`` (the state left by an earlier call with
    ``do_drop=True``), the frame is returned untouched. This keeps a notebook
    cell re-runnable without a ``KeyError``.

    Args:
        df: Frame to modify in place.
        col: Name of the column that becomes the index.
        do_drop: When true, ``col`` is removed from the columns afterwards.

    Returns:
        The same ``df`` object that was passed in.

    Raises:
        KeyError: If ``col`` is neither a column nor the name of the index.
    """
    # Already processed by a previous call: nothing left to do.
    if col not in df.columns and df.index.name == col:
        return df

    df.index = df[col]
    if do_drop:
        df.drop(col, axis=1, inplace=True)
    return df


In [None]:
def convert_wareki_to_seireki(wareki_date):
    """
    Convert a Japanese-era (wareki) date string to a Gregorian date string.

    The input is one era letter followed by ``year.month.day``, for example
    ``"H1.1.8"``. Supported era letters are M (Meiji), T (Taisho), S (Showa),
    H (Heisei) and R (Reiwa). The month and day are not checked against the
    calendar.

    Args:
        wareki_date: String such as ``"S50.1.2"``.

    Returns:
        The date formatted as ``"YYYY-MM-DD"`` with zero-padded month and day.

    Raises:
        ValueError: If the era letter is unknown, or the part after it is not
            three dot-separated integers.
    """
    # Gregorian year immediately preceding year 1 of each era.
    era_base_year = {
        "M": 1867,  # Meiji
        "T": 1911,  # Taisho
        "S": 1925,  # Showa
        "H": 1988,  # Heisei
        "R": 2018,  # Reiwa
    }

    era = wareki_date[0]
    year, month, day = map(int, wareki_date[1:].split('.'))

    if era not in era_base_year:
        raise ValueError(f"Unknown era: {era}")
    seireki_year = era_base_year[era] + year

    return f"{seireki_year}-{month:02}-{day:02}"
