# Running on a Dask Cluster (with Coiled)

Previously, we expanded each row to the full timeseries to use with the `forecast` function. In practice, we want to be minimizing network data transfer when it comes to distributed computing. 

The setup shown here is suboptimal because we have a file on a local machine that we are chunking and sending to the Dask workers. All the data is passing through the scheduler. Ideally, the workers should load the partition of data they need. For example, if the data is saved partitioned by `unique_id` already, it will be natural for the workers to load in the partition that is needs to process.

## Setup

Because we are passing everything through the scheduler in this scenario, it's a good opportunity to show a technique used to minimize the data transfer. We can send a compressed version. We already have the `combined.parquet` created in the preprocessing. We will just minimize the data footprint before bringing it to the cluster.

This means we also need to write logic to handle the decompressing the data passed to the workers. This gives us a good chance to explore how to add custom logic alongside our large scale forecasting. This same approach would be used to persist intermediate artifacts or metrics.

In [7]:
import pandas as pd
import os

download_path = os.path.abspath(os.path.join(".","..","data","m5-forecasting-accuracy.zip"))
unzipped_path = os.path.abspath(os.path.join(".","..","data","m5-forecasting-accuracy-unzipped"))

# Read in the data
INPUT_DIR = unzipped_path
WORKING_DIR = os.path.join(unzipped_path, "..", "working")
combined = pd.read_parquet(f'{WORKING_DIR}/combined.parquet')

## Minimizing Data Footprint

To minimize the data footprint, we don't need to repeat the string data like `store_id` and `item_id` 2000 times (once for each day). These are also especially big because they are strings. Instead, we'll pack all of the information into one row so only that row gets sent through the scheduler. 

The function below will put all of the prices and sales volume as a list. No need to worry about the fact that the output is a `List[Dict[str,Any]]`. Similar to what was shown in the previous sections, Fugue will handle the conversion for us later when we apply this function on a Pandas DataFrame by reading the type hints.

We'll just review what the `combined` DataFrame looks like.

In [8]:
combined.head()

Unnamed: 0,unique_id,item_id,dept_id,cat_id,store_id,state_id,ds,y,wm_yr_wk,sell_price
0,HOBBIES_1_001_CA_1_evaluation,HOBBIES_1_001,HOBBIES_1,HOBBIES,CA_1,CA,2013-10-01,2.0,11336,8.26
1,HOBBIES_1_001_CA_1_evaluation,HOBBIES_1_001,HOBBIES_1,HOBBIES,CA_1,CA,2013-10-04,0.01,11336,8.26
2,HOBBIES_1_001_CA_1_evaluation,HOBBIES_1_001,HOBBIES_1,HOBBIES,CA_1,CA,2013-09-29,0.01,11336,8.26
3,HOBBIES_1_001_CA_1_evaluation,HOBBIES_1_001,HOBBIES_1,HOBBIES,CA_1,CA,2013-10-02,0.01,11336,8.26
4,HOBBIES_1_001_CA_1_evaluation,HOBBIES_1_001,HOBBIES_1,HOBBIES,CA_1,CA,2013-09-28,0.01,11336,8.26


Recall we just kept the hierarchical columns for the previous section. We actually don't need them for forecasting so we'll drop them so that they don't get sent to the cluster workers.

In [16]:
from typing import List, Dict, Any, Iterable
from datetime import date

def compress_data(df:pd.DataFrame) -> List[Dict[str,Any]]:
    return [dict(unique_id=df.iloc[0]["unique_id"],
                 store_id=df.iloc[0]["store_id"],
                 item_id=df.iloc[0]['item_id'],
                 start_date=df.iloc[0]['ds'], 
                 y=df["y"].tolist(),
                 prices=df["sell_price"].tolist())]

# Testing on sample data
df = pd.DataFrame([["id1","store1","item1",date(2020,1,2),1,2.2], 
                   ["id1","store1","item1",date(2020,1,3),2,3.3],
                   ["id1","store1","item1", date(2020,1,4),3,4.4]], 
                   columns=["unique_id","store_id", "item_id", "ds","y","sell_price"])
print(compress_data(df))

[{'unique_id': 'id1', 'store_id': 'store1', 'item_id': 'item1', 'start_date': datetime.date(2020, 1, 2), 'y': [1, 2, 3], 'prices': [2.2, 3.3, 4.4]}]


The above functions will operate on each `store_id` and `item_id` combination. This means we have to use the `partition` argument of the `transform()` function to defining the grouping. This will also work on the distributed backends.

The `compress_data()` function assumed the data is sorted. We can apply a `presort` to the `partition` strategy to make sure the data is sorted.

This cell can take about 4 minutes to run.

In [20]:
from fugue import transform

compressed = transform(combined, 
                compress_data, 
                schema="unique_id:str,store_id:str,item_id:str,start_date:datetime,y:[float],prices:[float]",
                partition={"by": ["store_id", "item_id"], "presort": "ds asc"})
compressed.head()

Unnamed: 0,unique_id,store_id,item_id,start_date,y,prices
0,FOODS_1_001_CA_1_evaluation,CA_1,FOODS_1_001,2011-01-29,"[3.0, 0.009999999776482582, 0.0099999997764825...","[2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, ..."
1,FOODS_1_002_CA_1_evaluation,CA_1,FOODS_1_002,2011-01-29,"[0.009999999776482582, 1.0, 0.0099999997764825...","[7.88, 7.88, 7.88, 7.88, 7.88, 7.88, 7.88, 7.8..."
2,FOODS_1_003_CA_1_evaluation,CA_1,FOODS_1_003,2011-01-29,"[0.009999999776482582, 0.009999999776482582, 0...","[2.88, 2.88, 2.88, 2.88, 2.88, 2.88, 2.88, 2.8..."
3,FOODS_1_004_CA_1_evaluation,CA_1,FOODS_1_004,2012-03-03,"[0.009999999776482582, 0.009999999776482582, 0...","[1.78, 1.78, 1.78, 1.78, 1.78, 1.78, 1.78, 1.7..."
4,FOODS_1_005_CA_1_evaluation,CA_1,FOODS_1_005,2011-01-29,"[3.0, 9.0, 3.0, 3.0, 0.009999999776482582, 2.0...","[2.94, 2.94, 2.94, 2.94, 2.94, 2.94, 2.94, 2.9..."


## Defining Logic for Each Timeseries

Now that the data is compressed, we need to bring it back out to the uncompressed form in order to use `StatsForecast`. This code will execute on the Dask workers.

In [30]:
def format_series(df:List[Dict[str,Any]]) -> pd.DataFrame:
    row = df[0]
    dr = pd.date_range(row["start_date"],periods=len(row["y"]), freq="d")
    df = pd.DataFrame({"y":row["y"]},index = dr)
    df["price"] = pd.Series(row["prices"],index = dr)
    df = df.reset_index()
    df.columns=["ds", "y", "price"]
    df['unique_id'] = row['unique_id'] 
    return df

In [31]:
test = format_series(compressed.iloc[0:1].to_dict("records"))
test.head()

Unnamed: 0,ds,y,price,unique_id
0,2011-01-29,3.0,2.0,FOODS_1_001_CA_1_evaluation
1,2011-01-30,0.01,2.0,FOODS_1_001_CA_1_evaluation
2,2011-01-31,0.01,2.0,FOODS_1_001_CA_1_evaluation
3,2011-02-01,1.0,2.0,FOODS_1_001_CA_1_evaluation
4,2011-02-02,4.0,2.0,FOODS_1_001_CA_1_evaluation


## Time Series Cross Validation

For timeseries cross validations, we perform the modelling with a sliding window of test sets. This is so we don't predict past data points with future information. Below is a visual representation of this.

![img](https://raw.githubusercontent.com/Nixtla/statsforecast/main/nbs/imgs/ChainedWindows.gif)

The `StatsForecast()` class also as a `cross_valdation()` method we can use to execute the cross validation. 

In [32]:
from statsforecast import StatsForecast
from statsforecast.models import Naive, CrostonClassic, IMAPA, ADIDA, AutoARIMA

def run_model_cv(df: pd.DataFrame):
  sf = StatsForecast(df=df, 
      models=[Naive(),
        CrostonClassic(),
        IMAPA(),
        ADIDA(),
        AutoARIMA()
    ], 
      freq="D",
      n_jobs=1)

  return sf.cross_validation(h=28, n_windows=2)

  from tqdm.autonotebook import tqdm


We can test this function on our previous `test` DataFrame. The output will be the out-of-fold predictions that we can then use to calculate evaluation metrics.

In [33]:
test2 = run_model_cv(test)
test2.head()

Unnamed: 0_level_0,ds,cutoff,y,Naive,CrostonClassic,IMAPA,ADIDA,AutoARIMA
unique_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
FOODS_1_001_CA_1_evaluation,2016-04-24,2016-04-23,0.01,1.0,1.105067,1.105067,1.105067,1.144669
FOODS_1_001_CA_1_evaluation,2016-04-25,2016-04-23,2.0,1.0,1.105067,1.105067,1.105067,0.941412
FOODS_1_001_CA_1_evaluation,2016-04-26,2016-04-23,0.01,1.0,1.105067,1.105067,1.105067,1.180909
FOODS_1_001_CA_1_evaluation,2016-04-27,2016-04-23,0.01,1.0,1.105067,1.105067,1.105067,0.925206
FOODS_1_001_CA_1_evaluation,2016-04-28,2016-04-23,0.01,1.0,1.105067,1.105067,1.105067,1.16762


## Caclulating Error

The last part of our pipeline is returning all of the metrics for each model.

In [34]:
from sklearn.metrics import mean_absolute_error

def calculate_metrics(cv_df: pd.DataFrame) -> pd.DataFrame:
    models = []
    metrics = []
    for model in ["Naive", "CrostonClassic", "IMAPA", "ADIDA", "AutoARIMA"]:
        models.append(model)
        metrics.append(mean_absolute_error(cv_df['y'], cv_df[model]))
    out = pd.DataFrame({"models": models, "metric": metrics})
    out['unique_id'] = cv_df.index[0]
    return out


We can test this on the out-of-fold predictions from the `test2` DataFrame above.

In [35]:
calculate_metrics(test2)

Unnamed: 0,models,metric,unique_id
0,Naive,0.923571,FOODS_1_001_CA_1_evaluation
1,CrostonClassic,1.059186,FOODS_1_001_CA_1_evaluation
2,IMAPA,1.059186,FOODS_1_001_CA_1_evaluation
3,ADIDA,1.059186,FOODS_1_001_CA_1_evaluation
4,AutoARIMA,1.045886,FOODS_1_001_CA_1_evaluation


## Full Pipeline

For each timeseries, we will unpack it,run cross-validation, and then calculate metrics. We can wrap this in one function.

In [36]:
def process(df: pd.DataFrame) -> pd.DataFrame:
    timeseries = format_series(df.to_dict("records"))
    model_cv = run_model_cv(timeseries)
    metrics = calculate_metrics(model_cv).reset_index(drop=True)
    return metrics

Now we can test it on the first two rows of the `compressed` data using the default Pandas-based engine.

In [37]:
transform(compressed.iloc[0:2], 
          process, 
          schema="models:str,metric:float,unique_id:str", 
          partition={"by": "unique_id"},)

Unnamed: 0,models,metric,unique_id
0,Naive,0.923571,FOODS_1_001_CA_1_evaluation
1,CrostonClassic,1.059186,FOODS_1_001_CA_1_evaluation
2,IMAPA,1.059186,FOODS_1_001_CA_1_evaluation
3,ADIDA,1.059186,FOODS_1_001_CA_1_evaluation
4,AutoARIMA,1.045886,FOODS_1_001_CA_1_evaluation
5,Naive,0.673393,FOODS_1_002_CA_1_evaluation
6,CrostonClassic,0.65446,FOODS_1_002_CA_1_evaluation
7,IMAPA,0.65446,FOODS_1_002_CA_1_evaluation
8,ADIDA,0.65446,FOODS_1_002_CA_1_evaluation
9,AutoARIMA,0.656686,FOODS_1_002_CA_1_evaluation


Since this works smoothly, we can now bring the execution cluster.

## Running on a Coiled Cluster

[Coiled](https://www.coiled.io/) is one of the easiest ways to get a Dask cluster. You need to be logged in to create a Dask cluster. If you don't want to follow along here, you can still run the code on a `LocalCluster` similar to the previous sections.

Coiled requires a software environment which it will use to spin up cluster workers. Below is how to create a software environment for this tutorial.

```python
import coiled

coiled.create_software_environment(
    name="pydata",
    conda=["python=3.8.13"],
    pip=["fugue[dask]", "statsforecast", "scikit-learn", "numpy==1.22.4"],
)
```

To create a cluster with Coiled after logging in and creating the software environment, you can use the following code insead of the LocalCluster.

```python
from dask.distributed import Client
from coiled import Cluster

cluster = Cluster(name="pydata", software="pydata", n_workers=10)
client = Client(cluster)
```

The Coiled cluster will give us enough resources to run all of the models for each timeseries. For the sake of demo purposes on LocalCluster, we will just run the full workflow for the first 100 timeseries. Whether you use a LocalCluster or the Coiled cluster, this will return a local Pandas DataFrame.

On the local machine, the first 100 timeseries can take roughly 5 minutes.

In [41]:
%%capture
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(threads_per_worker=1)
client = Client(cluster)

results = transform(compressed.iloc[0:100], 
                    process, 
                    schema="models:str,metric:float,unique_id:str", 
                    engine=client, 
                    partition={"by": "unique_id"})
results = results.compute()

Perhaps you already have a cluster running?
Hosting the HTTP server on port 50489 instead
  x -= np.dot(xreg, par[narma + np.arange(ncxreg)])
  x -= np.dot(xreg, par[narma + np.arange(ncxreg)])


Now that we have computed all of the metrics, we can sort them and get the best model for each timeseries.

In [42]:
best_models = results.sort_values('metric', ascending=True).groupby("unique_id").first()
best_models['models'].value_counts()

AutoARIMA         40
Naive             34
CrostonClassic    12
ADIDA             10
IMAPA              4
Name: models, dtype: int64

These results show the benefit of trying multiple models for each timeseries. Some of them may be better modelled by `AutoArima` while some of them may be better modelled by other things like `CrostonClassic`. Though the `Naive` model did surprisingly well, it might be due to a lack of tuning with the other models.

## Conclusion

That wraps up the large scale time series forecasting tutorial. In this tutorial you have seen:

1. How to use StatsForecast and scale it to Spark, Dask, or Ray
2. How to train multiple models for each individual timeseries
3. How to preprocess and iterate on large scale data with Fugue
4. Principals of Hierarchical forecasting
5. Some practices around distributed computing.

Please feel free to reach out for any comments/questions!

[Fugue Slack](http://slack.fugue.ai/)

[Nixtla Slack](https://join.slack.com/t/nixtlaworkspace/shared_invite/zt-135dssye9-fWTzMpv2WBthq8NK0Yvu6A)

My email:
kdykho@gmail.com