<a href="https://colab.research.google.com/github/shyuwang/Time-Series-Forecasting/blob/main/3_ModelBuilding_Scale.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Install pyspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m14.1 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=88673f30d0a5bbb37558dc2849aefb98ab8f2e64c832d6f4b4d789751a7ea421
  Stored in directory: /root/.cache/pip/wheels/b1/59/a0/a1a0624b5e865fd389919c1a10f53aec9b12195d6747710baf
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [2]:
# Import SparkSession
from pyspark.sql import SparkSession

# Get the active Spark session, or create one, with the master set to local[*]
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

# To check the session information, evaluate `spark` on its own in a cell

In [3]:
from pyspark.sql.types import *
from google.colab import drive

# Make Google Drive reachable under /content/drive
drive.mount('/content/drive')

# Location of the training CSV inside the mounted drive
train_path = '/content/drive/MyDrive/demand-forecasting-at-scale-data/train.csv'

# Column layout of the training file, declared field by field
train_schema = (
    StructType()
    .add('Date', DateType())
    .add('Store', IntegerType())
    .add('Item', IntegerType())
    .add('Sales', IntegerType())
)

# Load the CSV (its first row is a header) with the explicit schema
train = spark.read.csv(train_path, header=True, schema=train_schema)

# Register the dataframe so it can be queried from Spark SQL as `train`
train.createOrReplaceTempView('train')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Step 1:
Retrieve data for all available store-item combinations at day level.

In [28]:
# Spark SQL over the `train` view: renames the columns to store / item / ds / y
# (the names the forecasting function reads) and sorts by store, item and date.
query = """
select
  Store as store
  ,Item as item
  ,cast(Date as date) as ds
  ,Sales as y
from train
order by 1,2,3
"""

# Run the query; the result holds the day-level history of every store-item pair
store_item_history = spark.sql(query)

In [29]:
# Preview the first five rows of the history dataframe
store_item_history.show(5)

+-----+----+----------+---+
|store|item|        ds|  y|
+-----+----+----------+---+
|    1|   1|2013-01-01| 13|
|    1|   1|2013-01-02| 11|
|    1|   1|2013-01-03| 14|
|    1|   1|2013-01-04| 13|
|    1|   1|2013-01-05| 10|
+-----+----+----------+---+
only showing top 5 rows



## Step 2:
Define schema for forecast output.

The goal is to build a model for each store-item combination and get the forecast back for that store-item subset. We therefore expect the returned forecast to contain the following fields:
- store
- item
- ds
- y (for historical data only)
- yhat, yhat_lower, yhat_upper

In [15]:
from pyspark.sql.types import *

# Schema of one forecast row; the field order matches the column order
# returned by the forecasting function.
result_schema = StructType([
    StructField(column, dtype)
    for column, dtype in (
        ('ds', DateType()),
        ('store', IntegerType()),
        ('item', IntegerType()),
        ('y', FloatType()),
        ('yhat', FloatType()),
        ('yhat_upper', FloatType()),
        ('yhat_lower', FloatType()),
    )
])

## Step 3:
Define a function that trains a model and produces a forecast.

The function receives the history of a single store-item combination and returns a forecast in the previously defined result schema.

In [7]:
from prophet import Prophet

In [23]:
def forecast_store_item(history_pd):
  """
  Fit a Prophet model on one store-item history and forecast 90 days ahead.

  Parameters
  ----------
  history_pd : pandas.DataFrame
      History of a single store-item combination. The columns read here are
      'ds' (date), 'store', 'item' and 'y' (observed value).

  Returns
  -------
  pandas.DataFrame
      One row per date in the history plus 90 future daily periods, with the
      columns ['ds', 'store', 'item', 'y', 'yhat', 'yhat_upper', 'yhat_lower']
      in that order. 'y' is missing on rows that have no matching history date.
  """
  # Imported locally so the function does not depend on another cell having
  # imported pandas before it is called.
  import pandas as pd

  # TRAIN MODEL
  # --------------------------------------
  # Configure
  model = Prophet(
      interval_width=0.95,
      growth='linear',
      daily_seasonality=False,
      weekly_seasonality=True,
      yearly_seasonality=True,
      seasonality_mode='multiplicative'
  )

  # Train
  model.fit(history_pd)

  # BUILD FORECAST
  # -----------------------------------
  # History dates plus 90 future daily periods ('D' is the canonical daily alias)
  future_pd = model.make_future_dataframe(
      periods=90,
      freq='D',
      include_history=True
  )
  forecast_pd = model.predict(future_pd)

  # ASSEMBLE RESULTS
  #-------------------------------------
  # Get relevant fields from forecast
  f_pd = forecast_pd[['ds', 'yhat', 'yhat_upper', 'yhat_lower']].set_index('ds')

  # Only the actuals are needed from history; store and item are constant for
  # the group and are filled in below. The dates are converted to timestamps
  # explicitly so both sides of the join use the same key type (the history
  # dates may arrive as plain date objects).
  h_pd = (
      history_pd[['ds', 'y']]
      .assign(ds=lambda frame: pd.to_datetime(frame['ds']))
      .set_index('ds')
  )

  # Left join keeps every forecast row; future rows get a missing 'y'
  result_pd = f_pd.join(h_pd, how='left').reset_index()

  # Get store and item for this subset
  result_pd['store'] = history_pd['store'].iloc[0]
  result_pd['item'] = history_pd['item'].iloc[0]

  # Adjust the sequence and return
  return result_pd[['ds', 'store', 'item', 'y', 'yhat', 'yhat_upper', 'yhat_lower']]

## Step 4:
Apply the pandas function to each store-item combination.

1. Group the historical dataset by store, item
2. Apply the function to each group

In [19]:
from pyspark.sql.functions import current_date

In [32]:
# Split the history into one group per store-item combination
store_item_groups = store_item_history.groupBy('store', 'item')

# Hand each group to forecast_store_item as a pandas dataframe
forecasts = store_item_groups.applyInPandas(forecast_store_item, schema=result_schema)

# add training date for data management
results = forecasts.withColumn('training_date', current_date())

# Register the forecasts so they can be queried as `new_forecasts`
results.createOrReplaceTempView('new_forecasts')

In [33]:
# Glimpse of the forecast: print the first five rows
results.show(5)

+----------+-----+----+----+---------+----------+----------+-------------+
|        ds|store|item|   y|     yhat|yhat_upper|yhat_lower|training_date|
+----------+-----+----+----+---------+----------+----------+-------------+
|2013-01-01|    1|   1|13.0|10.048304| 18.312273| 1.1078302|   2023-02-17|
|2013-01-02|    1|   1|11.0|10.525652| 18.830145| 2.6071098|   2023-02-17|
|2013-01-03|    1|   1|14.0|11.050407|  19.78737| 2.5539734|   2023-02-17|
|2013-01-04|    1|   1|13.0|12.241633| 20.948008|   4.42211|   2023-02-17|
|2013-01-05|    1|   1|10.0|13.777667| 23.248283|  5.464106|   2023-02-17|
+----------+-----+----+----+---------+----------+----------+-------------+
only showing top 5 rows



Persist forecast output to delta table if in Databricks environment.

## Step 5:
Evaluate model for each store-item combination.

In [36]:
from sklearn.metrics import mean_squared_error, mean_absolute_error
import pandas as pd

In [34]:
# Schema of one evaluation row: the group keys followed by the error metrics
eval_schema = (
    StructType()
    .add('training_date', DateType())
    .add('store', IntegerType())
    .add('item', IntegerType())
    .add('mae', FloatType())
    .add('mse', FloatType())
    .add('rmse', FloatType())
)

In [44]:
from pandas.core.aggregation import maybe_mangle_lambdas
def evaluate_forecast(evaluation_pd):
  """
  Compute error metrics for the forecast of one (training_date, store, item) group.

  Parameters
  ----------
  evaluation_pd : pandas.DataFrame
      Rows of a single group. The columns read here are 'training_date',
      'store', 'item', 'y' (actual) and 'yhat' (prediction).

  Returns
  -------
  pandas.DataFrame
      A single row with the columns
      ['training_date', 'store', 'item', 'mae', 'mse', 'rmse'].
      The three metrics are NaN when no row has both 'y' and 'yhat'.
  """
  # Retrieve information: taken from the first row of the group
  training_date = evaluation_pd['training_date'].iloc[0]
  store = evaluation_pd['store'].iloc[0]
  item = evaluation_pd['item'].iloc[0]

  # Only rows carrying both an actual and a prediction can be scored; rows
  # without an actual (e.g. the forecast horizon) would otherwise make the
  # metric functions fail on missing values.
  scored_pd = evaluation_pd.dropna(subset=['y', 'yhat'])

  # Calculate metrics
  if scored_pd.empty:
    mae = mse = rmse = float('nan')
  else:
    mae = mean_absolute_error(scored_pd['y'], scored_pd['yhat'])
    mse = mean_squared_error(scored_pd['y'], scored_pd['yhat'])
    rmse = mse**0.5

  # Assemble results
  result = pd.DataFrame({
      "training_date": [training_date],
      "store": [store],
      "item": [item],
      "mae": [mae],
      "mse": [mse],
      "rmse": [rmse]
  })
  return result

In [45]:
# Forecast rows to score: limit to periods where historical data is available
# and keep only the columns the evaluation function reads
history_forecasts = (
    spark.table('new_forecasts')
    .filter("ds<'2018-01-01'")
    .select('training_date', 'store', 'item', 'y', 'yhat')
)

# One row of error metrics per (training_date, store, item) group
results = (
    history_forecasts
    .groupBy('training_date', 'store', 'item')
    .applyInPandas(evaluate_forecast, schema=eval_schema)
)

# Register the metrics for SQL access, then print them
results.createOrReplaceTempView('new_forecast_evals')
results.show()

+-------------+-----+----+---------+---------+---------+
|training_date|store|item|      mae|      mse|     rmse|
+-------------+-----+----+---------+---------+---------+
|   2023-02-17|    1|   1|3.4865103|19.388674|4.4032574|
|   2023-02-17|    1|   2|6.0576615| 58.63383|7.6572733|
|   2023-02-17|    1|   5|3.1745038|16.208797|4.0260153|
|   2023-02-17|    1|  10|6.9078736|74.809814| 8.649266|
|   2023-02-17|    1|  11|6.5393095| 67.67937|8.2267475|
|   2023-02-17|    1|  13|7.2633123|82.972534| 9.108926|
|   2023-02-17|    1|  14|  6.04398| 59.03582| 7.683477|
|   2023-02-17|    1|  15|7.6815734|93.604546| 9.674944|
|   2023-02-17|    1|  18|7.4220057| 86.89475| 9.321735|
|   2023-02-17|    1|  20|5.2182527|43.944183|6.6290407|
|   2023-02-17|    1|  26|5.3457084| 45.03836|6.7110624|
|   2023-02-17|    1|  28| 7.646754|93.070175| 9.647288|
|   2023-02-17|    1|  29|6.4875183| 65.63709| 8.101672|
|   2023-02-17|    1|  30|4.8911767|  37.9981|  6.16426|
|   2023-02-17|    1|  31| 5.89

Persist forecast evaluation to delta table if in Databricks environment.