# Running on Spark (with Databricks)

## Setup

In [None]:
import pandas as pd
import os

# Read in the data
INPUT_DIR = os.path.abspath('data')
WORKING_DIR = os.path.abspath("data/working")
calendar = pd.read_csv(f'{INPUT_DIR}/calendar.csv')
sales = pd.read_csv(f'{INPUT_DIR}/sales_train_evaluation.csv')
sell_prices = pd.read_csv(f'{INPUT_DIR}/sell_prices.csv')

## Minimizing Data Footprint

In [None]:
from typing import List, Dict, Any, Iterable
from datetime import date
import pickle

def prices_to_series(df:pd.DataFrame) -> List[Dict[str,Any]]:
    # Assert each date has a price entry
    assert df.shape[0] == (df.date.iloc[-1]-df.date.iloc[0]).days + 1
    return [dict(store_id=df.iloc[0]["store_id"],
                 item_id=df.iloc[0]['item_id'],
                 price_start=df.iloc[0]['date'], 
                 prices=df["sell_price"].tolist())]


df = pd.DataFrame([["store1","item1",date(2020,1,2),2.2], 
                   ["store1","item1",date(2020,1,3),3.3],
                   ["store1","item1", date(2020,1,4),4.4]], 
                   columns=["store_id", "item_id", "date","sell_price"])
print(prices_to_series(df))

In [None]:
joined = sell_prices.merge(calendar[["date","wm_yr_wk"]], how="inner", on="wm_yr_wk")
joined['date'] = pd.to_datetime(joined['date'])
joined.head()

In [None]:
from fugue import transform

sell_prices = transform(joined, 
                prices_to_series, 
                schema="store_id:str,item_id:str,price_start:date,prices:[float]",
                partition={"by": ["store_id", "item_id"], "presort": "date asc"})
sell_prices.head()

In [None]:
sales.head()

In [None]:
# schema: unique_id:str,item_id:str,store_id:str,sales_start:date,sales:[float]
def sales_to_series(df:Iterable[List[Any]], start) -> Iterable[List[Any]]:
    for row in df:
        yield row[:2] + [row[4]] + [start, row[6:]]

sales = transform(sales, sales_to_series, params={"start": calendar['date'].min()})

In [None]:
sales.head()

In [None]:
combined = sales.merge(sell_prices, on=["item_id", "store_id"])

## Defining Logic for Each Timeseries

In [None]:
combined.iloc[0:1]

In [None]:
def format_series(df:List[Dict[str,Any]]) -> pd.DataFrame:
    row = df[0]
    dr1 = pd.date_range(row["sales_start"],periods=len(row["sales"]), freq="d")
    df = pd.DataFrame({"quantity":row["sales"]},index = dr1)
    dr2 = pd.date_range(row["price_start"],periods=len(row["prices"]), freq="d")
    df["price"] = pd.Series(row["prices"],index = dr2)
    df=df.dropna().reset_index()
    df.columns=["ds", "quantity", "price"]
    df['unique_id'] = row['unique_id'] 
    return df

In [None]:
test = format_series(combined.iloc[0:1].to_dict("records"))
test.head()

## Time Series Cross Validation

For timeseries cross validations, we perform the modelling with a sliding window of test sets. This is so we don't predict past data points with future information.

![img](https://miro.medium.com/max/1204/1*qvdnPF8ETV9mFdMT0Y_BBA.webp)

In [None]:
from statsforecast import StatsForecast
from statsforecast.models import Naive, CrostonClassic, IMAPA, ADIDA, AutoARIMA

def run_model_cv(df: pd.DataFrame):
  sf = StatsForecast(df=df, 
      models=[CrostonClassic(),
        IMAPA(),
        AutoARIMA()
    ], 
      freq="D",
      n_jobs=1)
  
  return sf.cross_validation(h=28, n_windows=2)

In [None]:
test2 = run_model_cv(test)
test2.head()

In [None]:
from sklearn.metrics import mean_absolute_error

def calculate_metrics(cv_df: pd.DataFrame) -> pd.DataFrame:
    models = []
    metrics = []
    for model in ["CrostonClassic", "IMAPA", "AutoARIMA"]:
        models.append(model)
        metrics.append(mean_absolute_error(cv_df['y'], cv_df[model]))
    out = pd.DataFrame({"models": models, "metric": metrics})
    out['unique_id'] = cv_df.index[0]
    return out


In [None]:
calculate_metrics(test2)

In [None]:
combined.head()

In [None]:
def process(df: pd.DataFrame) -> pd.DataFrame:
    timeseries = format_series(df.to_dict("records"))
    model_cv = run_model_cv(timeseries)
    metrics = calculate_metrics(model_cv).reset_index(drop=True)
    return metrics

In [None]:
transform(combined.iloc[0:2], 
          process, 
          schema="models:str,metric:float,unique_id:str", 
          partition={"by": "unique_id"},)

## Running on Spark Cluster

You can either use `databricks-connect` to connect to a Spark cluster or you can run this on Databricks.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [None]:
results = transform(combined.iloc[0:50], 
                    process, 
                    schema="models:str,metric:float,unique_id:str", 
                    engine=spark, 
                    partition={"by": "unique_id"}).toPandas()

results.to_parquet(f'{WORKING_DIR}/model_search.parquet')

In [None]:
results = pd.read_parquet(f'{WORKING_DIR}/model_search.parquet')
best_models = results.sort_values('metric', ascending=True).groupby("unique_id").first()
best_models['models'].value_counts()