In [1]:
# 数値計算に使うライブラリ
import numpy as np
import pandas as pd
from scipy import stats
from scipy.optimize import minimize

# 統計モデルを推定するライブラリ
import statsmodels.api as sm
import statsmodels.tsa.api as tsa

# グラフを描画するライブラリ
from matplotlib import pylab as plt
import seaborn as sns

sns.set()

In [2]:
# 表示設定
np.set_printoptions(linewidth=60)
pd.set_option("display.width", 80)

from matplotlib.pylab import rcParams

rcParams["figure.figsize"] = 8, 4

In [3]:
# 乱数の種
np.random.seed(1)

# 正規分布に従う乱数の累積和を作成し、ランダムウォーク系列を作る
sim_size = 100
mu = np.cumsum(stats.norm.rvs(loc=0, scale=1, size=sim_size).round(1)) + 30

# 観測値の作成
y = mu + stats.norm.rvs(loc=0, scale=5, size=sim_size).round(1)

# 時系列インデックスの付与
y_ts = pd.Series(y, index=pd.date_range(start="2020-01-01", periods=sim_size, freq="D"))

In [4]:
class LocalLevel:
    # データを格納(pd.Seriesで、日付インデックスがついている想定)
    def __init__(self, ts_data):
        self.ts_data = ts_data
        self.a = pd.Series(np.zeros(len(ts_data)), index=ts_data.index)
        self.P = pd.Series(np.zeros(len(ts_data)), index=ts_data.index)
        self.v = pd.Series(np.zeros(len(ts_data)), index=ts_data.index)
        self.F = pd.Series(np.zeros(len(ts_data)), index=ts_data.index)
        self.K = pd.Series(np.zeros(len(ts_data)), index=ts_data.index)
        self.s_level = None  # 過程誤差の分散
        self.s_irregular = None  # 観測誤差の分散

    # 状態の初期値を設定する
    def initialize(self, initial_a, initial_P):
        self.initial_a = initial_a
        self.initial_P = initial_P

    # 1時点先の予測値を計算する
    def _forecast_step(self, a_pre, P_pre, s_irregular, s_level, first=False):
        if first:
            a_forecast = self.initial_a  # 初回に限り、初期値を代入
            P_forecast = self.initial_P  # 初回に限り、初期値を代入
        else:
            a_forecast = a_pre  # 状態の予測値
            P_forecast = P_pre + s_level  # 状態の予測値の分散

        y_forecast = a_forecast  # 観測値の予測値
        F = P_forecast + s_irregular  # 観測値の予測値の残差の分散

        return pd.Series(
            [a_forecast, P_forecast, y_forecast, F], index=["a", "P", "y", "F"]
        )

    # 1時点のフィルタリングをする
    def _filter_step(self, forecasted, y, s_irregular):
        v = y - forecasted.y  # 観測値の1時点先予測値の残差
        K = forecasted.P / forecasted.F  # カルマンゲイン
        a_filter = forecasted.a + K * v  # フィルタ化推定量（更新後の状態推定値）
        P_filter = (1 - K) * forecasted.P  # フィルタ化推定量の分散（更新後の不確実性）

        return pd.Series([a_filter, P_filter, v, K], index=["a", "P", "v", "K"])

    # フィルタリングを行う
    def filter(self, s_irregular, s_level):
        for i in range(0, len(self.ts_data)):
            if i == 0:
                # 初回のみ、初期値の値を利用して予測する
                forecast_loop = self._forecast_step(
                    a_pre=None,
                    P_pre=None,
                    s_irregular=s_irregular,
                    s_level=s_level,
                    first=True,
                )
            else:
                # 2時点目以降は、1時点前の推定結果を参照して予測する
                forecast_loop = self._forecast_step(
                    a_pre=self.a.iloc[i - 1],
                    P_pre=self.P.iloc[i - 1],
                    s_irregular=s_irregular,
                    s_level=s_level,
                )

            # フィルタリングの実行（予測と観測の融合）
            filter_loop = self._filter_step(
                forecasted=forecast_loop,
                y=self.ts_data.iloc[i],
                s_irregular=s_irregular,
            )

            # 結果の保存
            self.a.iloc[i] = filter_loop.a  # 更新後の状態推定値
            self.P.iloc[i] = filter_loop.P  # 更新後の分散
            self.F.iloc[i] = forecast_loop.F  # 残差分散
            self.K.iloc[i] = filter_loop.K  # カルマンゲイン
            self.v.iloc[i] = filter_loop.v  # 観測誤差（残差）

    # 対数尤度の計算
    def llf(self):
        # 正規分布を仮定した尤度の対数和
        return np.sum(np.log(stats.norm.pdf(x=self.v, loc=0, scale=np.sqrt(self.F))))

    # パラメータの推定と状態の再当てはめ
    def fit(self, start_params):
        # 内部関数：パラメータを指定して対数尤度の-1倍を計算
        def calc_llf(params):
            self.filter(
                np.exp(params[0]), np.exp(params[1])
            )  # 対数パラメータを指数変換
            return self.llf() * -1  # 最適化では最小化を行うため、負の対数尤度を返す

        # Nelder-Mead法による最適化
        opt_res = minimize(
            calc_llf,
            start_params,
            method="Nelder-Mead",
            tol=1e-6,
            options={"maxiter": 2000},
        )

        # 推定されたパラメータの保存
        self.s_irregular = np.exp(opt_res.x[0])  # 観測誤差の分散
        self.s_level = np.exp(opt_res.x[1])  # 過程誤差の分散

        # 最適パラメータで再度フィルタリングを実行
        self.filter(self.s_irregular, self.s_level)

    # 推定された状態の可視化
    def plot_level(self):
        # 観測値とフィルタ化推定値を並べてプロット
        plot_df = pd.concat([self.a, self.ts_data], axis=1)
        plot_df.columns = ["filtered", "y"]
        plot_df.plot(title="状態推定値（filtered）と観測値（y）")