# Prophetを活用したモデル構築

Prophetを活用したモデルトレーニングおよびMLflowトラッキングを行います

**要件**
クラスタ Runtime 15.4 ML LTS以上を使用してください。

<!-- %md
### Model Building Using Prophet
We will perform model training using Prophet and track the results with MLflow. -->


<img src='https://github.com/komae5519pv/komae_dbdemos/blob/main/fine_grain_forecast_20241013/Customized_e2e_demand_forecasting/_image_for_notebook/model_train.png?raw=true' width='1200'/>

## Step 1: 事前準備

需要予測で人気の高いライブラリ、[prophet](https://facebook.github.io/prophet/) を使用します。

In [0]:
%pip install prophet
dbutils.library.restartPython()

In [0]:
%run ./00_config

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import current_date, col
import mlflow
import mlflow.sklearn
import mlflow.prophet
from prophet import Prophet
from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error
from sklearn.ensemble import RandomForestRegressor
from math import sqrt
from datetime import datetime, date
import pandas as pd
import matplotlib.pyplot as plt
import logging
import re
import os

## Step 2: 単一の予測をしてみる

自動販売機とアイテムの組み合わせごとに予測を生成する前に、
Prophetの使い方に慣れるために特定の自動販売機と商品の組み合わせを選択します。

<!-- %md
## Step 2: Build a Single Forecast

Before generating forecasts for each store-item combination, select a specific store and item combination to get familiar with how Prophet works: -->

In [0]:
sql_statement = '''
  SELECT
    vm,
    item,
    CAST(ds as date) as ds,
    y
  FROM silver_train
  WHERE vm=1 AND item=1    -- 単一店舗x単一商品
  ORDER BY ds
  '''

# assemble dataset in Pandas dataframe
history_pd = spark.sql(sql_statement).toPandas()

# drop any missing records
history_pd = history_pd.dropna()

display(history_pd)

次に、prophetライブラリをインポートしますが、使用時にやや冗長になるため、環境のログ設定を調整する必要があります。

<!-- %md
Now, we will import the prophet library, but because it can be a bit verbose when in use, we will need to fine-tune the logging settings in our environment: -->

In [0]:
# disable informational messages from prophet
logging.getLogger('py4j').setLevel(logging.ERROR)

データを確認した結果、成長パターンは線形に設定し、週と年の季節性パターンを有効にします。売上が増えるにつれて季節性も強くなるようなので、季節性モードは乗法に設定します。

<!-- %md
Based on our review of the data, it looks like we should set our overall growth pattern to linear and enable the evaluation of weekly and yearly seasonal patterns. We might also wish to set our seasonality mode to multiplicative as the seasonal pattern seems to grow with overall growth in sales: -->

In [0]:
# set model parameters
model = Prophet(
  interval_width=0.95,                # 信頼区間の幅を95%に設定（予測の不確実性範囲）
  growth='linear',                    # トレンド成長モデルを線形（linear）に設定
  daily_seasonality=False,            # 日次の季節性を無効化
  weekly_seasonality=True,            # 週次の季節性を有効化
  yearly_seasonality=True,            # 年次の季節性を有効化
  seasonality_mode='multiplicative'   # 季節性の影響を「乗法的」に設定（トレンドが大きいほど季節性の変動が大きくなる）
  )

# fit the model to historical data
model.fit(history_pd)

モデルのトレーニングが完了したので、それを使って90日間の予測を作成しましょう。

<!-- %md
Now that we have a trained model, let's use it to build a 90-day forecast: -->

In [0]:
# define a dataset including both historical dates & 90-days beyond the last available date
# 将来予測用の日付データを生成
future_pd = model.make_future_dataframe(
  periods=90,             # 過去データの終了日から90日先までの日付を生成
  freq='d',               # 日単位の頻度でデータを生成
  include_history=True    # 過去データも含める（予測と過去データを比較可能にするため）
  )

# predict over the dataset
forecast_pd = model.predict(future_pd)

display(forecast_pd)

モデルの結果はどうでしょうか？ここでは、モデルの全体的なトレンドと季節的なトレンドがグラフとして表示されています。

<!-- %md
How did our model perform? Here we can see the general and seasonal trends in our model presented as graphs: -->

In [0]:
trends_fig = model.plot_components(forecast_pd)
display(trends_fig)

ここでは、実際のデータと予測データの比較、および将来の予測を見ることができます。  
ただし、グラフが読みやすいように、過去1年分のデータに限定しています。

<!-- %md
And here, we can see how our actual and predicted data line up as well as a forecast for the future, though we will limit our graph to the last year of historical data just to keep it readable: -->

In [0]:
# Plot the forecast results
predict_fig = model.plot(forecast_pd, xlabel='date', ylabel='sales')

# Adjust the graph: show the past year and 90-day forecast
xlim = predict_fig.axes[0].get_xlim()                   # Get the current range of the X-axis
new_xlim = (xlim[1] - (180.0 + 365.0), xlim[1] - 90.0)  # Set the new range for the X-axis
predict_fig.axes[0].set_xlim(new_xlim)                  # Apply the new X-axis range

**補足:** 黒い点は実際のデータを表し、濃い青の線は予測、薄い青の帯は95%の予測区間を示しています。

<!-- %md
**NOTE** The black dots represent our actuals with the darker blue line representing our predictions and the lighter blue band representing our (95%) uncertainty interval. -->

視覚的な確認も有用ですが、予測モデルを適切に評価するために、  
実績に対する予測値の平均絶対誤差（MAE）、平均二乗誤差（MSE）、および二乗平均平方根誤差（RMSE）の値を計算します。

<!-- %md
- Visual inspection is useful, but to properly evaluate the forecast model, we will calculate the Mean Absolute Error (MAE), Mean Squared Error (MSE), and Root Mean Squared Error (RMSE) values for the predicted values compared to the actuals. -->

In [0]:
# get historical actuals & predictions for comparison
actuals_pd = history_pd[ history_pd['ds'] < date(2018, 1, 1) ]['y']
predicted_pd = forecast_pd[ forecast_pd['ds'] < pd.to_datetime('2018-01-01') ]['yhat']

# calculate evaluation metrics
mae = mean_absolute_error(actuals_pd, predicted_pd)   # 実績値と予測値の絶対誤差の平均 (数値が小さいほど良い予測 / 外れ値の影響を受けにくい)
mse = mean_squared_error(actuals_pd, predicted_pd)    # 実績値と予測値の差を二乗して平均したもの (外れ値の影響を強く受ける)
rmse = sqrt(mse)                                      # MSEの平方根を取ったもの(実績値と予測値の差の「標準的な大きさ」を示す)

# print metrics to the screen
print( '\n'.join(['MAE: {0}', 'MSE: {1}', 'RMSE: {2}']).format(mae, mse, rmse) )

prophetは、予測が時間経過とともにどのように維持されるかを評価するための[追加の手段](https://facebook.github.io/prophet/docs/diagnostics.html)を提供しています。  
予測モデルを構築する際には、これらの手法の使用を強くお勧めしますが、今回はスケーリングの課題に焦点を当てるため、それらについては省略します。

<!-- %md
prophet provides [additional means](https://facebook.github.io/prophet/docs/diagnostics.html) for evaluating how your forecasts hold up over time. You're strongly encouraged to consider using these and those additional techniques when building your forecast models but we'll skip this here to focus on the scaling challenge. -->

## Step 3: 予測生成のスケールアップ

基本的な手順を理解したので、主目的である自動販売機とアイテムの組み合わせごとに多数の詳細なモデルと予測を構築してます。まず、自動販売機・アイテム・日付単位で販売データを集計します。

**注意**: このデータセットのデータはすでにこの粒度で集計されているはずですが、期待通りのデータ構造を持っていることを確認するために、改めて明示的に集計します。


<!-- %md
## Step 3: Scale Forecast Generation

With the mechanics under our belt, let's now tackle our original goal of building numerous, fine-grain models & forecasts for individual store and item combinations.  We will start by assembling sales data at the store-item-date level of granularity:

**NOTE**: The data in this data set should already be aggregated at this level of granularity but we are explicitly aggregating to ensure we have the expected data structure. -->


### 3-1. トレーニングデータ準備

In [0]:
sql_statement = '''
  SELECT
    vm,
    item,
    CAST(ds as date) as ds,
    SUM(y) as y
  FROM silver_train
  GROUP BY vm, item, ds
  ORDER BY vm, item, ds
  '''

vm_item_history = (
  spark
    .sql( sql_statement )
    # .repartition(sc.defaultParallelism, ['vm', 'item'])
    .repartition(sc.defaultParallelism, 'vm', 'item')
  ).cache()

# # 特徴量テーブルとして保存
# vm_item_history.write.format('delta').mode('overwrite').saveAsTable('silver_train')

display(vm_item_history)

In [0]:
display(vm_item_history.count())

自動販売機・アイテム・日付単位でデータを集計したので、これをどのようにprophetに渡すかを考えます。各自動販売機とアイテムの組み合わせごとにモデルを作るためには、データから自動販売機とアイテムのサブセットを渡し、そのサブセットでモデルを学習させ、予測を得ます。予測結果には、自動販売機とアイテムの識別子を保持し、Prophetモデルが生成した必要なフィールドだけが含まれます。

<!-- %md
With our data aggregated at the store-item-date level, we need to consider how we will pass our data to prophet. If our goal is to build a model for each store and item combination, we will need to pass in a store-item subset from the dataset we just assembled, train a model on that subset, and receive a store-item forecast back. We'd expect that forecast to be returned as a dataset with a structure like this where we retain the store and item identifiers for which the forecast was assembled and we limit the output to just the relevant subset of fields generated by the Prophet model: -->


### 3-2. MLflow トラッキングのロジック定義

指定したカタログ、スキーマ配下のモデルを全て削除

In [0]:
# from databricks.sdk import WorkspaceClient
# from mlflow.tracking import MlflowClient

# def delete_specific_models(catalog_name, schema_name, prefix):
#     workspace_client = WorkspaceClient()
#     mlflow_client = MlflowClient()
    
#     models = workspace_client.registered_models.list(
#         catalog_name=catalog_name,
#         schema_name=schema_name
#     )
    
#     for model in models:
#         full_name = model.full_name
#         if full_name.startswith(prefix):
#             print(f"Deleting model: {full_name}")
#             mlflow_client.delete_registered_model(name=full_name)
#         else:
#             pass
#             # print(f"Skipping model: {full_name}")
    
#     print("Deletion process completed.")

# # 使用例
# delete_specific_models(MY_CATALOG, MY_SCHEMA, f"{MY_CATALOG}.{MY_SCHEMA}.demand_forecast_vm_")

Experimentを設定します

In [0]:
# UC配下のモデルとして登録
mlflow.set_registry_uri("databricks-uc")

In [0]:
# MLflowのシステムメトリクスロギングを無効にします
mlflow.disable_system_metrics_logging()

In [0]:
from datetime import datetime

# 現在の日付をyyyymmdd形式で取得
current_datetime = datetime.now().strftime("%Y%m%d_%H%M%S")

# Set MLflow Experiment
experiment_name = f"/Shared/{MY_CATALOG}_demand_forecast_{current_datetime}"
# experiment_name = f"/Shared/komae_demand_forecast_{current_datetime}"
experiment_id = mlflow.set_experiment(experiment_name).experiment_id

print(f'Experiment Name: {experiment_name}')
print(f'Experiment ID: {experiment_id}')

モデルをトレーニングして予測を作成するために、Pandas関数を使います。この関数は、自動販売機とアイテムごとに整理されたデータを受け取り、予測結果を返します。

この関数では、モデルのトレーニングや予測の部分は以前と同じで、新しいのは予測結果をPandasでまとめる標準的な処理だけです。

<!-- %md
To train the model and generate a forecast we will leverage a Pandas function.  We will define this function to receive a subset of data organized around a store and item combination.  It will return a forecast in the format identified in the previous cell:

**UPDATE** With Spark 3.0, pandas functions replace the functionality found in pandas UDFs.  The deprecated pandas UDF syntax is still supported but will be phased out over time.  For more information on the new, streamlined pandas functions API, please refer to [this document](https://databricks.com/blog/2020/05/20/new-pandas-udfs-and-python-type-hints-in-the-upcoming-release-of-apache-spark-3-0.html).
There's a lot taking place within our function, but if you compare the first two blocks of code within which the model is being trained and a forecast is being built to the cells in the previous portion of this notebook, you'll see the code is pretty much the same as before. It's only in the assembly of the required result set that truly new code is being introduced and it consists of fairly standard Pandas dataframe manipulations. -->

In [0]:
from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error
from math import sqrt
import numpy as np
import mlflow
from mlflow.models import infer_signature

# median_absolute_percentage_error関数の定義
def median_absolute_percentage_error(y_true, y_pred):
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    return np.median(np.abs((y_true - y_pred) / y_true)) * 100

# train_modelの戻り値のスキーマ定義
result_schema = StructType([
    StructField('ds', DateType()),
    StructField('vm', IntegerType()),
    StructField('item', IntegerType()),
    StructField('y', FloatType()),
    StructField('yhat', FloatType()),
    StructField('yhat_upper', FloatType()),
    StructField('yhat_lower', FloatType()),
    StructField('mae', FloatType()),
    StructField('mse', FloatType()),
    StructField('rmse', FloatType()),
    StructField('mape', FloatType()),
    StructField('mdape', FloatType())
])

# Prophetモデルのトレーニングと予測、MLflowのロギング
def train_model(history_pd: pd.DataFrame) -> pd.DataFrame:
    '''
    グループごとにPrphetモデルトレーニング＆予測結果を戻す
    '''
    # ------------------------------
    # GET METADATA
    # ------------------------------
    vm = history_pd['vm'].iloc[0]
    item = history_pd['item'].iloc[0]
    run_id = history_pd["run_id"].iloc[0]  # Pulls run ID to do a nested run

    # ------------------------------
    # TRAIN THE MODEL
    # ------------------------------
    # ハイパーパラメータの設定
    interval_width = 0.95
    growth = 'linear'
    daily_seasonality = False
    weekly_seasonality = True
    yearly_seasonality = True
    seasonality_mode = 'multiplicative'

    # Prophetモデルの設定と学習
    model = Prophet(
        interval_width=interval_width,
        growth=growth,
        daily_seasonality=daily_seasonality,
        weekly_seasonality=weekly_seasonality,
        yearly_seasonality=yearly_seasonality,
        seasonality_mode=seasonality_mode
    )
    model.fit(history_pd)

    # --------------------------
    # BUILD FORECAST AS BEFORE
    # --------------------------
    # make predictions
    future_pd = model.make_future_dataframe(
        periods=90,
        freq='d',
        include_history=True
    )
    forecast_pd = model.predict(future_pd)

    # --------------------------------------
    # ASSEMBLE EXPECTED RESULT SET
    # --------------------------------------
    # get relevant fields from forecast
    f_pd = forecast_pd[ ['ds', 'yhat', 'yhat_upper', 'yhat_lower'] ].set_index('ds')

    # get relevant fields from history
    h_pd = history_pd[ ['ds', 'vm', 'item', 'y'] ].set_index('ds')

    # join history and forecast
    results_pd = f_pd.join( h_pd, how='left' )
    results_pd.reset_index(level=0, inplace=True)
  
    # get vm & item from incoming data set
    results_pd['vm'] = h_pd['vm'].iloc[0]
    results_pd['item'] = h_pd['item'].iloc[0]

    # メトリクス計算
    actuals_pd = history_pd[history_pd['ds'] < pd.to_datetime('2018-01-01')]['y']
    predicted_pd = forecast_pd[forecast_pd['ds'] < pd.to_datetime('2018-01-01')]['yhat']

    if len(actuals_pd) > 0 and len(predicted_pd) > 0:
        mae = mean_absolute_error(actuals_pd, predicted_pd)
        mse = mean_squared_error(actuals_pd, predicted_pd)
        rmse = sqrt(mse)
        mape = mean_absolute_percentage_error(actuals_pd, predicted_pd)
        mdape = median_absolute_percentage_error(actuals_pd, predicted_pd)

        results_pd['mae'] = mae
        results_pd['mse'] = mse
        results_pd['rmse'] = rmse
        results_pd['mape'] = mape
        results_pd['mdape'] = mdape
    else:
        print(f"Warning: No data available for metrics calculation for vm {vm} and item {item}")
        results_pd['mae'] = results_pd['mse'] = results_pd['rmse'] = results_pd['mape'] = results_pd['mdape'] = None

    # ------------------------------
    # RESUME THE TOP-LEVEL TRAINING
    # ------------------------------
    with mlflow.start_run(run_id=run_id) as outer_run:
        # Small hack for running as a job
        experiment_id = outer_run.info.experiment_id
        print(f"Current experiment_id = {experiment_id}")

        # Create a nested run for the vm and item
        with mlflow.start_run(run_name=f"vm_{vm}_item_{item}", nested=True, experiment_id=experiment_id) as run:

            # # ----------------------
            # # LOG MODEL
            # # ----------------------
            # # シグネチャの推論
            # signature = infer_signature(history_pd[['ds']], forecast_pd[['yhat', 'yhat_upper', 'yhat_lower']])
            
            # # モデルのロギング
            # registered_model_name = f"{MY_CATALOG}.{MY_SCHEMA}.demand_forecast_vm_{vm}_item_{item}"
            # mlflow.prophet.log_model(
            #     model, 
            #     f"prophet_model_vm_{vm}_item_{item}",
            #     signature=signature,
            #     registered_model_name=registered_model_name
            # )
            # # エイリアスの設定
            # client = mlflow.tracking.MlflowClient()

            # def get_latest_model_version(model_name):
            #     model_version_infos = client.search_model_versions(f"name = '{model_name}'")
            #     return max([model_version_info.version for model_version_info in model_version_infos])

            # latest_version = get_latest_model_version(registered_model_name)
            # client.set_registered_model_alias(registered_model_name, "Champion", latest_version)

            # print(f"Set 'Champion' alias to version {latest_version} of model {registered_model_name}")

            # ----------------------
            # LOG TAG
            # ----------------------
            mlflow.set_tag("vm", vm)
            mlflow.set_tag("item", item)
            mlflow.set_tag("mlflow.note.content", f"Demand forecast model for VM {vm} and Item {item}")

            # ----------------------
            # LOG HYPER PARAMETER
            # ----------------------
            mlflow.log_params({
                "interval_width": model.interval_width,
                "growth": model.growth,
                "daily_seasonality": model.daily_seasonality,
                "weekly_seasonality": model.weekly_seasonality,
                "yearly_seasonality": model.yearly_seasonality,
                "seasonality_mode": model.seasonality_mode
            })

            # ----------------------
            # LOG METRICS
            # ----------------------
            # メトリクスのロギング
            mlflow.log_metrics({
                "mae": mae,
                "mse": mse,
                "rmse": rmse,
                "mape": mape,
                "mdape": mdape
            })

            # Create a return pandas DataFrame that matches the schema above
            return_df = results_pd[['ds', 'vm', 'item', 'y', 'yhat', 'yhat_upper', 'yhat_lower', 'mae', 'mse', 'rmse', 'mape', 'mdape']]

    return return_df


### 3-3. Pandas関数を使って予測スタート

では、Pandas関数を使って予測を作成しましょう。  
自動販売機とアイテムごとにデータをグループ化し、関数を適用します。そして、データ管理用に今日の日付を *training_date* として追加します。Pandas UDFではなく `applyInPandas()` を使います。


<!-- %md
Now let's call our pandas function to build our forecasts.  We do this by grouping our historical dataset around store and item.  We then apply our function to each group and tack on today's date as our *training_date* for data management purposes:

**UPDATE** Per the previous update note, we are now using applyInPandas() to call a pandas function instead of a pandas UDF. -->

In [0]:
with mlflow.start_run(run_name="demand_forecast") as run:
    run_id = run.info.run_id

    model_directories_df = (
        vm_item_history
            .withColumn("run_id", F.lit(run_id)) # add current run_id
            .groupBy('vm', 'item')
            .applyInPandas(train_model, schema=result_schema)
            .withColumn('training_date', F.current_date())
            .cache()
    )

vm_item_history = vm_item_history.drop(col('y'))
combined_df = model_directories_df.join(vm_item_history, on=["ds", "vm", "item"], how="left")

combined_df.createOrReplaceTempView('new_forecasts')

# display(combined_df.count())
# display(combined_df)


### 3-4. 予測結果をテーブル保存

多くの場合、予測結果をレポートで活用したいと考えるので、クエリー可能なテーブル構造として保存します。

以下では、他のユーザーとデータベースが競合しないようにユーザー名を埋め込んだデータベースを作成しています。

In [0]:
# # Username を取得
# username_raw = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')

# # Username の英数字以外を除去し、全て小文字化
# username = re.sub('[^A-Za-z0-9]+', '', username_raw).lower()

# # ユーザー固有のデータベース名を生成します
# db_name = f"forecasts_{username}"

# # データベースの準備
# spark.sql(f"DROP DATABASE IF EXISTS {db_name} CASCADE")
# spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_name}")
# spark.sql(f"USE {db_name}")

# # データベースを表示
# print(f"database_name: {db_name}")

In [0]:
%sql

-- 予測結果テーブルの作成
CREATE TABLE IF NOT EXISTS silver_forecasts (
  order_date DATE,
  vending_machine_id INTEGER,
  item_id INTEGER,
  actual_sales_quantity FLOAT,
  forecast_sales_quantity FLOAT,
  forecast_sales_quantity_upper FLOAT,
  forecast_sales_quantity_lower FLOAT,
  sales_inference_date DATE
  )
USING DELTA
PARTITIONED BY (order_date);

-- 予測結果テーブルにデータをマージ
MERGE INTO silver_forecasts f
USING new_forecasts n 
ON f.order_date = n.ds AND f.vending_machine_id = n.vm AND f.item_id = n.item
WHEN MATCHED THEN UPDATE SET
  f.order_date = n.ds,
  f.vending_machine_id = n.vm,
  f.item_id = n.item,
  f.actual_sales_quantity = n.y,
  f.forecast_sales_quantity = n.yhat,
  f.forecast_sales_quantity_upper = n.yhat_upper,
  f.forecast_sales_quantity_lower = n.yhat_lower,
  f.sales_inference_date = current_date()
WHEN NOT MATCHED THEN INSERT (
  order_date,
  vending_machine_id,
  item_id,
  actual_sales_quantity,
  forecast_sales_quantity,
  forecast_sales_quantity_upper,
  forecast_sales_quantity_lower,
  sales_inference_date)
VALUES (
  n.ds,
  n.vm,
  n.item,
  n.y,
  n.yhat,
  n.yhat_upper,
  n.yhat_lower,
  current_date());

In [0]:
# テーブル名
table_name = f'{MY_CATALOG}.{MY_SCHEMA}.silver_forecasts'

# テーブルコメント
comment = """
`silver_forecasts`テーブルは、自動販売機の需要予測結果データを管理します。
"""
spark.sql(f'COMMENT ON TABLE {table_name} IS "{comment}"')

# カラムコメント
column_comments = {
    "order_date": "受注日（主キー、外部キー）、YYYY-MM-DDフォーマット",
    "vending_machine_id": "自動販売機ID（主キー、外部キー）、例: 10",
    "item_id": "商品ID（主キー、外部キー）、例: 10",
    "actual_sales_quantity": "実績販売数、例: 50",
    "forecast_sales_quantity": "予測販売数、例: 50",
    "forecast_sales_quantity_upper": "予測販売数（上限）、例: 60",
    "forecast_sales_quantity_lower": "予測販売数（下限）、例: 40",
    "sales_inference_date": "販売数予測日、YYYY-MM-DDフォーマット"
}

for column, comment in column_comments.items():
    # シングルクォートをエスケープ
    escaped_comment = comment.replace("'", "\\'")
    sql_query = f"ALTER TABLE {table_name} ALTER COLUMN {column} COMMENT '{escaped_comment}'"
    spark.sql(sql_query)


### 3-5. 予測結果の可視化

ここまでで、それぞれの店舗・商品の組み合わせに対する予測結果を生成し、それぞれの基本的な評価メトリクスを計算しました。  
この予測データを確認するために、シンプルなクエリー(ここでは店舗1から店舗3における商品1に限定しています)を実行できます

<!-- %md

### 3-5. Visualization of Forecast Results

At this point, we have generated forecast results for each store-item combination and calculated the basic evaluation metrics for each.  
To review this forecast data, you can run a simple query (here limited to product 1 across stores 1 to 3). -->

In [0]:
%sql

SELECT
  vending_machine_id,
  order_date,
  forecast_sales_quantity,
  forecast_sales_quantity_upper,
  forecast_sales_quantity_lower
FROM silver_forecasts a
WHERE item_id = 1 AND
      vending_machine_id IN (1, 2, 3) AND
      order_date >= '2018-01-01' AND
      sales_inference_date=current_date()
ORDER BY vending_machine_id;

Databricks visualization. Run in Databricks to view.


### 3-6. モデル予測結果の評価

<!-- %md

### 3-6. Evaluation of Model Forecast Results -->

しかし、各予測の精度はどれくらい良い（または悪い）のでしょうか？Pandas関数を使うことで、各店舗・アイテムの予測に対する評価指標を以下のように生成できます。

<!-- %md
But how good (or bad) is each forecast?  Using the pandas function technique, we can generate evaluation metrics for each store-item forecast as follows -->

In [0]:
# 予測結果のスキーマを定義
eval_schema = StructType([
  StructField('training_date', DateType()),  # トレーニング日時
  StructField('vm', IntegerType()),          # 店舗
  StructField('item', IntegerType()),        # アイテム
  StructField('mae', FloatType()),           # 平均絶対誤差 (MAE)
  StructField('mse', FloatType()),           # 平均二乗誤差 (MSE)
  StructField('rmse', FloatType()),          # 二乗平均平方根誤差 (RMSE)
  StructField('mape', FloatType()),          # 平均絶対パーセント誤差 (MAPE)
  StructField('mdape', FloatType())          # 中央絶対パーセント誤差 (MdAPE)
])

import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error
from math import sqrt

def mape(y_true, y_pred):
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

def mdape(y_true, y_pred):
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    return np.median(np.abs((y_true - y_pred) / y_true)) * 100

# メトリクスを計算する関数を定義
def evaluate_forecast(evaluation_pd: pd.DataFrame) -> pd.DataFrame:
  
  # 入力データセットから店舗と商品を取得
  training_date = evaluation_pd['training_date'].iloc[0]
  vm = evaluation_pd['vm'].iloc[0]
  item = evaluation_pd['item'].iloc[0]

  # 評価メトリクスを計算
  mae = mean_absolute_error(evaluation_pd['y'], evaluation_pd['yhat'])
  mse = mean_squared_error(evaluation_pd['y'], evaluation_pd['yhat'])
  rmse = sqrt(mse)
  mape_value = mape(evaluation_pd['y'], evaluation_pd['yhat'])
  mdape_value = mdape(evaluation_pd['y'], evaluation_pd['yhat'])

  # 結果セットを組み立てる
  results = {
    'training_date': [training_date], 
    'vm': [vm], 
    'item': [item], 
    'mae': [mae], 
    'mse': [mse], 
    'rmse': [rmse],
    'mape': [mape_value],
    'mdape': [mdape_value]
  }
  return pd.DataFrame.from_dict(results)

# メトリクスを計算
results = (
  spark
    .table('new_forecasts')
    .filter('ds < \'2018-01-01\'') # 過去のデータがある期間に絞って評価を実施
    .select('training_date', 'vm', 'item', 'y', 'yhat')
    .groupBy('training_date', 'vm', 'item')
    .applyInPandas(evaluate_forecast, schema=eval_schema)
    )

results.createOrReplaceTempView('new_forecast_evals')
display(results)

再び、各予測の評価指標をクエリ可能なテーブルとして保存します。

<!-- %md
Once again, we will likely want to report the metrics for each forecast, so we persist these to a queryable table: -->

In [0]:
%sql

CREATE TABLE IF NOT EXISTS silver_forecast_evals (
  vending_machine_id INTEGER,
  item_id INTEGER,
  mae FLOAT,
  mse FLOAT,
  rmse FLOAT,
  mape FLOAT,
  mdape FLOAT,
  training_date date
  )
USING DELTA
PARTITIONED BY (training_date);

INSERT INTO silver_forecast_evals
SELECT
  vm as vending_machine_id,
  item as item_id,
  mae,
  mse,
  rmse,
  mape,
  mdape,
  training_date
FROM new_forecast_evals;

In [0]:
# テーブル名
table_name = f'{MY_CATALOG}.{MY_SCHEMA}.silver_forecast_evals'

# テーブルコメント
comment = """
`silver_forecast_evals`テーブルは、自動販売機の需要予測結果データを管理します。
"""
spark.sql(f'COMMENT ON TABLE {table_name} IS "{comment}"')

# カラムコメント
column_comments = {
    "vending_machine_id": "自動販売機ID（主キー、外部キー）、例: 10",
    "item_id": "商品ID（主キー、外部キー）、例: 10",
    "mae": "MAE、例: 6539.265137",
    "mse": "MSE、例: 67675384.33",
    "rmse": "RMSE、例: 8226.504883",
    "mape": "MAPE、例: 10.95132351",
    "mdape": "MDAPE、例: 8.96886158",
    "training_date": "トレーニング実行日、YYYY-MM-DDフォーマット"
}

for column, comment in column_comments.items():
    # シングルクォートをエスケープ
    escaped_comment = comment.replace("'", "\\'")
    sql_query = f"ALTER TABLE {table_name} ALTER COLUMN {column} COMMENT '{escaped_comment}'"
    spark.sql(sql_query)

これらの予測について、それぞれの信頼性を評価するための指標を取得できます。

<!-- %md
And for each of these, we can retrieve a measure of help us assess the reliability of each forecast: -->

In [0]:
%sql

SELECT
  vending_machine_id,
  mae,
  mse,
  rmse,
  mape,
  mdape
FROM silver_forecast_evals a
WHERE item_id = 1 AND
      training_date=current_date()
ORDER BY vending_machine_id