In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
path = "/content/drive/MyDrive/時系列分析/"
file_name = "sample_sales_data.csv"
file_path = path + file_name
file_path

In [None]:
import pandas as pd
import numpy as np
import os # サンプルデータ作成のためにインポート

def transform_time_series_data_auto_columns_indexed(file_path):
    """
    時系列データを読み込み、日付と数値カラムを自動判別して
    月別、週別、日別に集計し、結果のDataFrameのインデックスに日付データを配置します。

    Args:
        file_path (str): 読み込むCSVファイルのパス。

    Returns:
        tuple: (monthly_data, weekly_data, daily_data)
               - monthly_data (pd.DataFrame): 月別に集計されたデータ（日付がインデックス）。
               - weekly_data (pd.DataFrame): 週別に集計されたデータ（日付がインデックス）。
               - daily_data (pd.DataFrame): 日別に集計されたデータ（日付がインデックス）。
               いずれかのデータフレームの生成に失敗した場合はNoneを返します。
    """
    try:
        df = pd.read_csv(file_path)

        # 1. 日付カラムの自動特定
        date_column = None
        for col in df.columns:
            try:
                temp_series = pd.to_datetime(df[col], errors='coerce')
                # 80%以上の非NaN値があり、かつ50%以上のユニーク値があるカラムを日付として認識
                if temp_series.notna().sum() > len(df) * 0.8 and temp_series.nunique() > len(df) * 0.5:
                    date_column = col
                    break
            except Exception:
                continue

        if date_column is None:
            print("エラー: データ内で日付として認識できるカラムが見つかりませんでした。")
            return None, None, None

        df[date_column] = pd.to_datetime(df[date_column])
        df = df.set_index(date_column) # ここで日付をインデックスに設定

        # 2. 数値カラムの自動特定
        numeric_columns = [col for col in df.columns if pd.api.types.is_numeric_dtype(df[col])]

        if not numeric_columns:
            print("エラー: 集計対象となる数値カラムが見つかりませんでした。")
            return None, None, None

        print(f"日付カラムとして '{date_column}' を特定しました。")
        print(f"数値カラムとして {numeric_columns} を特定しました。")

        # 3. 期間別の集計
        # ここで .reset_index() を削除することで、日付がインデックスとして残ります
        monthly_data = df[numeric_columns].resample('MS').sum()
        weekly_data = df[numeric_columns].resample('W').sum()
        daily_data = df[numeric_columns].resample('D').sum()

        # インデックスが日付となるため、列名の調整は不要に(indexに日付が入らないならば、コメント削除)
        # monthly_data.rename(columns={date_column: '年月'}, inplace=True) # 不要
        # weekly_data.rename(columns={date_column: '週'}, inplace=True)     # 不要
        # daily_data.rename(columns={date_column: '日付'}, inplace=True)   # 不要

        return monthly_data, weekly_data, daily_data

    except FileNotFoundError:
        print(f"エラー: ファイル '{file_path}' が見つかりません。パスが正しいか確認してください。")
        return None, None, None
    except Exception as e:
        print(f"予期せぬエラーが発生しました: {e}")
        return None, None, None

In [None]:
monthly_data, weekly_data, daily_data = transform_time_series_data_auto_columns_indexed(file_path)
print("月別")
print(monthly_data.head())
print("週別")
print(weekly_data.head())
print("日別")
print(daily_data.head())

In [None]:
pip install -q pmdarima

In [None]:
pip install -q numpy==1.26.4

In [None]:
pip install -q sktime

In [None]:
# 数値計算に使うライブラリ
import numpy as np
import pandas as pd
from scipy import stats

# グラフを描画するライブラリ
from matplotlib import pyplot as plt
import seaborn as sns
sns.set()

# 統計モデルを推定するライブラリ
import statsmodels.api as sm
import statsmodels.tsa.api as tsa
import statsmodels.formula.api as smf
import pmdarima as pm

# 予測
from sktime.forecasting.arima import AutoARIMA

# 予測の評価指標
from sktime.performance_metrics.forecasting import (
    mean_absolute_scaled_error, MeanAbsoluteError,
    mean_absolute_percentage_error, mean_absolute_error
)

# グラフの日本語表記
from matplotlib import rcParams
rcParams['font.family'] = 'sans-serif'
rcParams['font.sans-serif'] = 'Meiryo'

In [None]:
# 表示設定
np.set_printoptions(linewidth=60)
pd.set_option('display.width', 80)

from matplotlib.pylab import rcParams
rcParams['figure.figsize'] = 8, 4

In [None]:
mod_sarimax_best = pm.arima.auto_arima(
    y = np.log(monthly_data['Sales']),                  # データ
    X = monthly_data[['Customers']],  # 外生変数
    test='kpss',              # KPSS検定で、差分をとる階数を決める
    seasonal_test='ocsb',     # OCSB検定で、季節差分をとる階数を決める
    criterion='AIC',          # AICで変数選択
    m=3,                     # 周期は12
    max_p=2, max_q=2, max_P=2, max_Q=2,              # 最大次数
    start_p=0, start_q=0, start_Q=0, start_P=0,      # 開始次数
    stepwise=False,          # 総当たりでAICを比較
    n_jobs=-1,               # 使える限りのコアを使って並列化 
    maxiter=5000,            # パラメータ推定のときの設定(反復回数を5000回に増やす)
    with_intercept=False,    # 切片なしのモデルにする
    solver='nm',              # パラメータ推定のときの設定(最適化の手法を変更)
    error_action='warn',
    suppress_warnings=True
)

In [None]:
import pmdarima as pm
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

def train_and_visualize_sarimax(data, y_variable, exogenous_variables=None, test_size=0.2, m=3):
    """
    SARIMAXモデルを訓練し、結果を可視化する関数。

    Args:
        data (pd.DataFrame): 時系列データを含むDataFrame。
        y_variable (str): 目的変数として使用する列名。
        exogenous_variables (list, optional): 外生変数として使用する列名のリスト。デフォルトはNone。
        test_size (float, optional): テストデータの割合。デフォルトは0.2 (20%)。
        m (int, optional): 季節周期。デフォルトは3。

    Returns:
        tuple: (訓練済みモデル, 予測結果, 実際のテストデータ)
    """

    # データを訓練用とテスト用に分割
    train_size = int(len(data) * (1 - test_size))
    train_data = data.iloc[:train_size]
    test_data = data.iloc[train_size:]

    y_train = np.log(train_data[y_variable])
    y_test = np.log(test_data[y_variable])

    X_train = train_data[exogenous_variables] if exogenous_variables else None
    X_test = test_data[exogenous_variables] if exogenous_variables else None

    print(f"訓練データの期間: {train_data.index.min()} から {train_data.index.max()}")
    print(f"テストデータの期間: {test_data.index.min()} から {test_data.index.max()}")
    print(f"訓練データ数: {len(train_data)}")
    print(f"テストデータ数: {len(test_data)}")

    # SARIMAXモデルの自動選択と訓練
    print("\nSARIMAXモデルの訓練を開始します...")
    mod_sarimax_best = pm.arima.auto_arima(
        y = y_train,                  # 訓練データ
        X = X_train,                  # 訓練用の外生変数
        test='kpss',                  # KPSS検定で、差分をとる階数を決める
        seasonal_test='ocsb',         # OCSB検定で、季節差分をとる階数を決める
        criterion='AIC',              # AICで変数選択
        m=m,                          # 周期
        max_p=2, max_q=2, max_P=2, max_Q=2, # 最大次数
        start_p=0, start_q=0, start_Q=0, start_P=0, # 開始次数
        stepwise=False,               # 総当たりでAICを比較
        n_jobs=-1,                    # 使える限りのコアを使って並列化
        maxiter=5000,                 # パラメータ推定のときの設定(反復回数を5000回に増やす)
        with_intercept=False,         # 切片なしのモデルにする
        solver='nm',                  # パラメータ推定のときの設定(最適化の手法を変更)
        error_action='warn',
        suppress_warnings=True
    )
    print("SARIMAXモデルの訓練が完了しました。")
    print(mod_sarimax_best.summary())

    # テストデータでの予測
    print("\nテストデータでの予測を実行します...")
    # n_periodsは予測する期間の長さ、Xは予測時の外生変数
    forecast, conf_int = mod_sarimax_best.predict(
        n_periods=len(y_test),
        X=X_test,
        return_conf_int=True
    )

    # 対数変換を元に戻す (指数関数を適用)
    forecast_exp = np.exp(forecast)
    y_test_exp = np.exp(y_test)
    lower_bound = np.exp(conf_int[:, 0])
    upper_bound = np.exp(conf_int[:, 1])

    # 予測結果の可視化
    print("\n予測結果を可視化します...")
    plt.figure(figsize=(14, 7))
    plt.plot(train_data.index, np.exp(y_train), label='訓練データ (実測値)', color='blue')
    plt.plot(test_data.index, y_test_exp, label='テストデータ (実測値)', color='green')
    plt.plot(test_data.index, forecast_exp, label='予測値', color='red', linestyle='--')
    plt.fill_between(test_data.index, lower_bound, upper_bound, color='red', alpha=0.1, label='信頼区間')
    plt.title(f'{y_variable} のSARIMAXモデルによる予測と実測値の比較')
    plt.xlabel('日付')
    plt.ylabel(y_variable)
    plt.legend()
    plt.grid(True)
    plt.show()

    return mod_sarimax_best, forecast_exp, y_test_exp

# 使用例
if __name__ == "__main__":
    # サンプルデータの作成 (実際のデータに置き換えてください)
    # ここでは月次データであることを想定し、PandasのDatetimeIndexを使用
    index = pd.date_range(start='2018-01-01', periods=100, freq='MS')
    monthly_data = pd.DataFrame({
        'Sales': np.random.randint(100, 500, size=100) + np.sin(np.linspace(0, 3*np.pi, 100))*50 + np.arange(100)*2,
        'Customers': np.random.randint(50, 300, size=100) + np.cos(np.linspace(0, 2*np.pi, 100))*30 + np.arange(100)*1.5,
        'Cxxx': np.random.randint(5, 300, size=100) + np.cos(np.linspace(0, 2*np.pi, 100))*30 + np.arange(100)*1.8
    }, index=index)

    # 目的変数を 'Sales'、外生変数を 'Customers' として関数を実行
    # 季節周期を12（年周期）と仮定して試します。データが月次であればm=12が適切です。
    # ユーザーの元のコードではm=3となっていましたので、例としてm=3も試せるようにしています。
    # ここでは例としてm=12とします。
    trained_model, predictions, actuals = train_and_visualize_sarimax(
        data=monthly_data,
        y_variable='Sales',
        exogenous_variables=['Customers','Cxxx'],
        test_size=0.2, # 例として20%をテストデータとする
        m=12 # 月次データなので季節周期を12に設定
    )

    print("\n--- 訓練結果の概要 ---")
    print("モデルのAIC:", trained_model.aic())

    # 外生変数なしの場合の例
    # print("\n--- 外生変数なしのモデル訓練例 ---")
    # trained_model_no_exog, predictions_no_exog, actuals_no_exog = train_and_visualize_sarimax(
    #     data=monthly_data,
    #     y_variable='Sales',
    #     test_size=0.2,
    #     m=12
    # )

In [None]:
# 残差診断
_ = trained_model.plot_diagnostics(lags=30, figsize=(15, 8))

In [None]:
# 残差の正規性の検定(Jarque-Bera検定)
# 1つ目がJB統計量
# 2つ目がp値
# 3つ、4つ目が歪度と尖度
trained_model.arima_res_.test_normality(method='jarquebera')