このデモは、技術的な詳細というより、ビジネス上の成果に関心があるビジネスアラインドのペルソナを対象としています。 そのため、このノートブックのコードの多くは隠され、ロジック自体も合理化されており、彼らが関心を持つポイントにすぐに到達できるようになっています。 より技術的な説明を必要とされる方は、[the solution accelerator](https://databricks.com/blog/2021/04/06/fine-grained-time-series-forecasting-at-scale-with-facebook-prophet-and-apache-spark-updated-for-spark-3.html)に関連するノートブックの利用をご検討ください。

このノートブックは、**Databricks 10.4 LTS**ランタイムで動作するように開発されています。 実行する前に、必ず[このデータセット](https://www.kaggle.com/c/demand-forecasting-kernels-only/data)をダウンロード、解凍し、Databricksにアップロードしてください。 データセットはCSVで */FileStore/demand_forecast/train* に保存してください。

最後に、*View.Results Only*を必ず選択する。最後に、このノートブックを顧客に提示する前に、*View: Results Only* を選択してください。

In [0]:
from pyspark.sql.types import *
import pyspark.sql.functions as f
from pyspark.sql.window import Window

from prophet import Prophet
import logging

# disable informational messages from fbprophet
logging.getLogger('py4j').setLevel(logging.ERROR)

import pandas as pd

from sklearn.metrics import mean_squared_error, mean_absolute_error
from math import sqrt

## Step 1: データを検証する

予測を作成するデータセットは、5年間にわたる10店舗の50商品の日次売上データから構成されています。

In [0]:
# structure of the training data set
train_schema = StructType([
  StructField('date', DateType()),
  StructField('store', IntegerType()),
  StructField('item', IntegerType()),
  StructField('sales', IntegerType())
  ])

# read the training file into a dataframe
train = spark.read.csv(
  'dbfs:/FileStore/demand_forecast/train/train.csv', 
  header=True, 
  schema=train_schema
  )

# make the dataframe queriable as a temporary view
train.createOrReplaceTempView('train')

# show data
display(train)

date,store,item,sales
2013-01-01,1,1,13
2013-01-02,1,1,11
2013-01-03,1,1,14
2013-01-04,1,1,13
2013-01-05,1,1,10
2013-01-06,1,1,12
2013-01-07,1,1,10
2013-01-08,1,1,9
2013-01-09,1,1,12
2013-01-10,1,1,9


予測を行う際の典型的な例として、年単位と週単位の両方で、データにトレンドと季節性があるかどうかを調べたいと思います。

In [0]:
%sql

SELECT
  year(date) as year, 
  sum(sales) as sales
FROM train
GROUP BY year(date)
ORDER BY year;

year,sales
2013,7941243
2014,9135482
2015,9536887
2016,10357160
2017,10733740


In [0]:
%sql

SELECT 
  TRUNC(date, 'MM') as month,
  SUM(sales) as sales
FROM train
GROUP BY TRUNC(date, 'MM')
ORDER BY month;

month,sales
2013-01-01,454904
2013-02-01,459417
2013-03-01,617382
2013-04-01,682274
2013-05-01,763242
2013-06-01,795597
2013-07-01,855922
2013-08-01,766761
2013-09-01,689907
2013-10-01,656587


In [0]:
%sql

SELECT
  YEAR(date) as year,
  (
    CASE
      WHEN DATE_FORMAT(date, 'E') = 'Sun' THEN 0
      WHEN DATE_FORMAT(date, 'E') = 'Mon' THEN 1
      WHEN DATE_FORMAT(date, 'E') = 'Tue' THEN 2
      WHEN DATE_FORMAT(date, 'E') = 'Wed' THEN 3
      WHEN DATE_FORMAT(date, 'E') = 'Thu' THEN 4
      WHEN DATE_FORMAT(date, 'E') = 'Fri' THEN 5
      WHEN DATE_FORMAT(date, 'E') = 'Sat' THEN 6
    END
  ) % 7 as weekday,
  AVG(sales) as sales
FROM (
  SELECT 
    date,
    SUM(sales) as sales
  FROM train
  GROUP BY date
 ) x
GROUP BY year, weekday
ORDER BY year, weekday;

year,weekday,sales
2013,0,25788.44230769231
2013,1,17269.69230769231
2013,2,20015.81132075472
2013,3,20150.153846153848
2013,4,21503.19230769231
2013,5,23071.096153846152
2013,6,24532.903846153848
2014,0,29901.0
2014,1,19791.98076923077
2014,2,23179.346153846152


## Step 2: 需要予測を実施する

時系列予測を構築するための強力なパターンがあるように見えます。 しかし、10店舗と50アイテムでは、500店舗とアイテムの予測を行うために500のモデルをトレーニングする必要があります。

In [0]:
%sql -- get dataset metrics

SELECT 
  COUNT(DISTINCT store) as stores,
  COUNT(DISTINCT item) as items,
  COUNT(DISTINCT year(date)) as years,
  COUNT(*) as records
FROM train;

stores,items,years,records
10,50,5,913000


従来のアプローチは、データを集計し、集計されたデータセットから予測を生成するものでした。 もし10店舗すべてのデータを集約すれば、必要なモデル数は50に減り、これは歴史的に我々のビジネスにとってより現実的なものでした。 これを店舗レベルに戻すには、販売台数に基づく割り当てを使用します。

In [0]:
# allocation ratios
ratios = (
  spark.sql('''
    SELECT
      store,
      item,
      sales / SUM(sales) OVER(PARTITION BY item) as ratio
    FROM (
      SELECT 
        store,
        item,
        SUM(sales) as sales
      FROM train
      GROUP BY
        store, item
        ) 
    ''')
    )

# define forecasting function
result_schema =StructType([
  StructField('ds',DateType()),
  StructField('item',IntegerType()),
  StructField('y',FloatType()),
  StructField('yhat',FloatType()),
  StructField('yhat_upper',FloatType()),
  StructField('yhat_lower',FloatType())
  ])

def forecast_item( history_pd: pd.DataFrame ) -> pd.DataFrame:
  
  # TRAIN MODEL AS BEFORE
  # --------------------------------------
  # remove missing values (more likely at day-store-item level)
  history_pd = history_pd.dropna()
  
  # configure the model
  model = Prophet(
    interval_width=0.95,
    growth='linear',
    daily_seasonality=False,
    weekly_seasonality=True,
    yearly_seasonality=True,
    seasonality_mode='multiplicative'
    )
  
  # train the model
  model.fit( history_pd )
  # --------------------------------------
  
  # BUILD FORECAST AS BEFORE
  # --------------------------------------
  # make predictions
  future_pd = model.make_future_dataframe(
    periods=90, 
    freq='d', 
    include_history=True
    )
  forecast_pd = model.predict( future_pd )  
  # --------------------------------------
  
  # ASSEMBLE EXPECTED RESULT SET
  # --------------------------------------
  # get relevant fields from forecast
  f_pd = forecast_pd[ ['ds','yhat', 'yhat_upper', 'yhat_lower'] ].set_index('ds')
  
  # get relevant fields from history
  h_pd = history_pd[['ds','item','y']].set_index('ds')
  
  # join history and forecast
  results_pd = f_pd.join( h_pd, how='left' )
  results_pd.reset_index(level=0, inplace=True)
  
  # get store & item from incoming data set
  results_pd['item'] = history_pd['item'].iloc[0]
  # --------------------------------------
  
  # return expected dataset
  return results_pd[ ['ds', 'item', 'y', 'yhat', 'yhat_upper', 'yhat_lower'] ]  

forecast = (
  spark
    .table('train')
    .withColumnRenamed('date','ds')
    .groupBy('item','ds')
      .agg(f.sum('sales').alias('y'))
    .orderBy('item','ds')
    .repartition('item')
    .groupBy('item')
      .applyInPandas(forecast_item, schema=result_schema)
    .withColumn('training_date', f.current_date())
  )

results = (
  forecast
    .join(ratios, on='item')
    .withColumn('yhat',f.expr('yhat * ratio'))
    .selectExpr('ds as date','store','item','y as sales', 'yhat as forecast', 'training_date') 
    )

_ = (
  results
    .write
    .format('delta')
    .mode('overwrite')
    .option('overwriteSchema','true')
    .saveAsTable('allocated_forecasts')
  )

display(spark.table('allocated_forecasts'))

date,store,item,sales,forecast,training_date
2013-01-01,4,1,133.0,11.643899878484651,2022-05-02
2013-01-01,5,1,133.0,8.497050176330085,2022-05-02
2013-01-01,2,1,133.0,14.301208739164467,2022-05-02
2013-01-01,8,1,133.0,13.697135912063596,2022-05-02
2013-01-01,10,1,133.0,12.556263899904378,2022-05-02
2013-01-01,1,1,133.0,10.137748669449897,2022-05-02
2013-01-01,9,1,133.0,11.840438989741124,2022-05-02
2013-01-01,7,1,133.0,7.695048286690866,2022-05-02
2013-01-01,3,1,133.0,12.725837956292564,2022-05-02
2013-01-01,6,1,133.0,8.486208556331496,2022-05-02


Databricksでは、クラウドを活用して、実際に必要な500個のモデルを提供する可能です!

In [0]:
# retrieve historical data
sql_statement = '''
  SELECT
    store,
    item,
    CAST(date as date) as ds,
    SUM(sales) as y
  FROM train
  GROUP BY store, item, ds
  ORDER BY store, item, ds
  '''

store_item_history = (
  spark
    .sql( sql_statement )
    .repartition(sc.defaultParallelism, ['store', 'item'])
  ).cache()

# define forecasting function
result_schema =StructType([
  StructField('ds',DateType()),
  StructField('store',IntegerType()),
  StructField('item',IntegerType()),
  StructField('y',FloatType()),
  StructField('yhat',FloatType()),
  StructField('yhat_upper',FloatType()),
  StructField('yhat_lower',FloatType())
  ])

def forecast_store_item( history_pd: pd.DataFrame ) -> pd.DataFrame:
  
  # TRAIN MODEL AS BEFORE
  # --------------------------------------
  # remove missing values (more likely at day-store-item level)
  history_pd = history_pd.dropna()
  
  # configure the model
  model = Prophet(
    interval_width=0.95,
    growth='linear',
    daily_seasonality=False,
    weekly_seasonality=True,
    yearly_seasonality=True,
    seasonality_mode='multiplicative'
    )
  
  # train the model
  model.fit( history_pd )
  # --------------------------------------
  
  # BUILD FORECAST AS BEFORE
  # --------------------------------------
  # make predictions
  future_pd = model.make_future_dataframe(
    periods=90, 
    freq='d', 
    include_history=True
    )
  forecast_pd = model.predict( future_pd )  
  # --------------------------------------
  
  # ASSEMBLE EXPECTED RESULT SET
  # --------------------------------------
  # get relevant fields from forecast
  f_pd = forecast_pd[ ['ds','yhat', 'yhat_upper', 'yhat_lower'] ].set_index('ds')
  
  # get relevant fields from history
  h_pd = history_pd[['ds','store','item','y']].set_index('ds')
  
  # join history and forecast
  results_pd = f_pd.join( h_pd, how='left' )
  results_pd.reset_index(level=0, inplace=True)
  
  # get store & item from incoming data set
  results_pd['store'] = history_pd['store'].iloc[0]
  results_pd['item'] = history_pd['item'].iloc[0]
  # --------------------------------------
  
  # return expected dataset
  return results_pd[ ['ds', 'store', 'item', 'y', 'yhat', 'yhat_upper', 'yhat_lower'] ]  

# generate forecast
results = (
  store_item_history
    .groupBy('store', 'item')
      .applyInPandas(forecast_store_item, schema=result_schema)
    .withColumn('training_date', f.current_date() )
    .withColumnRenamed('ds','date')
    .withColumnRenamed('y','sales')
    .withColumnRenamed('yhat','forecast')
    .withColumnRenamed('yhat_upper','forecast_upper')
    .withColumnRenamed('yhat_lower','forecast_lower')
    )

_ = (
  results
    .write
    .format('delta')
    .mode('overwrite')
    .option('overwriteSchema','true')
    .saveAsTable('forecasts')
  )

display(spark.table('forecasts').drop('forecast_upper','forecast_lower'))

date,store,item,sales,forecast,training_date
2013-01-01,1,42,21.0,15.504805,2022-03-30
2013-01-02,1,42,24.0,16.079311,2022-03-30
2013-01-03,1,42,14.0,17.32605,2022-03-30
2013-01-04,1,42,22.0,18.914005,2022-03-30
2013-01-05,1,42,18.0,20.546606,2022-03-30
2013-01-06,1,42,26.0,22.339357,2022-03-30
2013-01-07,1,42,14.0,12.545781,2022-03-30
2013-01-08,1,42,14.0,15.60132,2022-03-30
2013-01-09,1,42,12.0,16.238214,2022-03-30
2013-01-10,1,42,17.0,17.547174,2022-03-30


それぞれの結果を比較すると、きめ細かい予測は、私たちが捕らえたいと思っていた局所的な変動をもたらすのに対し、配分法は単に店舗間予測のスケーリングされた変動を返すだけであることがわかります。 これらの変動は、地域の需要の違いを表しており、利益を最大化するためには、オペレーションを微調整する必要があります。

In [0]:
%sql

SELECT 
  store,
  date,
  forecast
FROM allocated_forecasts
WHERE item = 1 AND 
      date >= '2018-01-01' AND 
      training_date=current_date()
ORDER BY date, store

store,date,forecast
1,2018-01-01,11.886370370128242
2,2018-01-01,16.76796982810265
3,2018-01-01,14.920869332119413
4,2018-01-01,13.652311871317089
5,2018-01-01,9.962674035684431
6,2018-01-01,9.94996238589736
7,2018-01-01,9.022337891179111
8,2018-01-01,16.059702777145958
9,2018-01-01,13.882751266175063
10,2018-01-01,14.722046091860054


In [0]:
%sql

SELECT
  store,
  date,
  forecast
FROM forecasts a
WHERE item = 1 AND
      date >= '2018-01-01' AND
      training_date=current_date()
ORDER BY date, store

store,date,forecast
1,2018-01-01,11.556276
2,2018-01-01,17.964453
3,2018-01-01,14.400181
4,2018-01-01,14.163125
5,2018-01-01,10.320256
6,2018-01-01,10.057019
7,2018-01-01,8.648001
8,2018-01-01,15.725753
9,2018-01-01,14.140076
10,2018-01-01,14.7101145


このDatabricksを使ったアプローチは、問題に割り当てるリソースの数を決めることができ、それがそのままオペレーションを完了するための時間になるという点で有効です。 ここでは、500のモデルを実行したときの処理時間について、環境の大きさの違いを見ていますが、当社の最大の顧客の多くは、これと同じパターンを使って毎日1～2時間以内に数百万件の予測を完了していることに留意してください。

In [0]:
# Test runs on Azure F4s_v2 - 4 cores, 8 GB RAM
tests_pd = pd.DataFrame(
    [ (1, 20.77 * 60),
      (2, 11.14 * 60),
      (3, 7.46 * 60),
      (4, 6.11 * 60),
      (5, 5.01 * 60),
      (6, 4.90 * 60),
      (8, 3.81 * 60),
      (10, 3.20 * 60),
      (12, 2.80 * 60),
      (15, 2.26 * 60) ],
    columns = ['workers', 'seconds']
    )

tests_pd['cores'] = tests_pd['workers'] * 4 # 4-cores per worker VM

display(tests_pd)

workers,seconds,cores
1,1246.2,4
2,668.4000000000001,8
3,447.6,12
4,366.6,16
5,300.6,20
6,294.0,24
8,228.6,32
10,192.0,40
12,168.0,48
15,135.6,60


## Step 3: アナリストへのデータ提供/共有

Databricksはこれらの予測を作成するのに必要な時間を短縮するのに優れていますが、アナリストはどのようにそれを利用するでしょうか。このノートブックにあるネイティブな可視化機能はすでにご覧の通りです。 これは、データサイエンティストの作業を支援するための機能です。

アナリストの場合は、[Databricks' SQL Dashboard](https://adb-2704554918254528.8.azuredatabricks.net/sql/dashboards/fafa7c3f-35e0-4b4c-925e-04e305155678?o=2704554918254528)を活用して、結果を提示することもできます。

<img src='https://brysmiwasb.blob.core.windows.net/demos/images/forecasting_dashboard.PNG' width=800>

また、TableauやPower BIなどのツールでデータを表示したいと思うこともあります。

<img src='https://brysmiwasb.blob.core.windows.net/demos/images/forecasting_powerbi.PNG' width=800>

これらのデータをExcelで提示することも可能です。

<img src='https://brysmiwasb.blob.core.windows.net/demos/images/forecasting_excel.PNG' width=800>