# Parallel demand forecasting at scale using Ray Tune and Pyspark UDF

Batch training and tuning are common tasks in machine learning use-cases. They require training simple models on data batches, typically corresponding to different locations, products, etc. Batch training can take less time to process all the data at once, but only if those batches can run in parallel!

This notebook showcases how to conduct batch forecasting with NeuralProphet. NeuralProphet is a popular open-source library built on PyTorch and inspired by Facebook Prophet, designed for automatic forecasting of univariate time series data. 
<br></br>
<div style="text-align: center; line-height: 5; padding-top: 20px;  padding-bottom: 20px;">
  <img src="https://docs.ray.io/en/master/_images/batch-training.svg" alt='Push compute' height="300" width="400">
</div>

For the data, we will use the M5 Walmart dataset. This popular tabular dataset contains historical sales of products for different locations and regions in the USA.

### Import the libraries

In [None]:
import random
import multiprocessing
import numpy as np
from datetime import timedelta, date
import traceback
import math
import timeit
import torch
import mlflow
from mlflow.tracking import MlflowClient

import pandas as pd
import neuralprophet
from neuralprophet import NeuralProphet, set_log_level


# importing hyperopt and ray
from hyperopt import hp
import ray
from ray import tune
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.search.hyperopt import HyperOptSearch
from ray.tune.stopper import TimeoutStopper
from ray.tune.search.concurrency_limiter import ConcurrencyLimiter
from ray.runtime_env import RuntimeEnv

from hyperopt.pyll import scope
from itertools import product
from ray.util.multiprocessing import Pool

import multiprocessing
num_cpus = multiprocessing.cpu_count()
print(num_cpus)

import logging

from delta.tables import DeltaTable
from pyspark.sql.functions import col
from pyspark.sql import Window
from sklearn.metrics import mean_squared_error
from datetime import datetime
import json
import time, os
from ast import literal_eval
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
import pyspark.sql.functions as F





8


## Ray Cluster Setup :

There are 2 options for setting up a Ray cluster on Databricks:
- **Using the setup_ray_cluster API**
  ```
    setup_ray_cluster(
      num_worker_nodes=MAX_NUM_WORKER_NODES,
      num_cpus_per_node=int(num_cpu_cores_per_worker),
      num_gpus_per_node=int(num_gpu_per_worker),
      collect_log_to_path=RAY_LOG_DIR #ray_collected_logs
      )

    ray.init(address='auto')

    num_workers = (ray.cluster_resources()["CPU"]/num_cpu_cores_per_worker)
  ```
- **Using init scripts**
  ```
  #!/bin/bash
  # Cluster init script: installs Ray and the forecasting dependencies on the
  # node, then starts a Ray head (on the Spark driver) or a Ray worker that
  # joins that head (on every other node).

  #RAY PORT
  RAY_PORT=9339

  # install ray
  # Install additional ray libraries
  # The requirement is quoted so the shell never treats [tune,default] as a glob
  /databricks/python/bin/pip install "ray[tune,default]==2.5.1"
  /databricks/python/bin/pip install neuralprophet==0.6.0
  /databricks/python/bin/pip install protobuf==3.20.0

  #Create the location if not exists
  mkdir -p /local_disk0/tmp/ray/job

  # If starting on the Spark driver node, initialize the Ray head node
  # If starting on the Spark worker node, connect to the head Ray node
  # (an unset or empty DB_IS_DRIVER falls through to the worker branch)
  if [ "${DB_IS_DRIVER:-}" = "TRUE" ]; then
    echo "Starting the head node"
    ulimit -n 1000000
    # --num-cpus=0 --num-gpus=0: the head advertises no compute resources
    ray start  --head --min-worker-port=20000 --max-worker-port=25000 --temp-dir="/local_disk0/tmp/ray/job"  --port=$RAY_PORT --dashboard-port=8501 --dashboard-host="0.0.0.0" --include-dashboard=true --num-cpus=0 --num-gpus=0
  else
    # presumably gives the head node time to come up before connecting - TODO confirm
    sleep 40
    ulimit -n 1000000
    echo "Starting the non-head node - connecting to $DB_DRIVER_IP:$RAY_PORT"
    ray start  --min-worker-port=20000 --max-worker-port=25000 --temp-dir="/local_disk0/tmp/ray/job" --address="$DB_DRIVER_IP:$RAY_PORT"
  fi
  ```

## Visualize the ray dashboard

In [None]:
%run ./ray_dashboard

In [None]:
# Attach this notebook to the Ray cluster at address 'auto'; a repeated
# ray.init() on cell re-runs is tolerated instead of raising.
ray.init('auto', ignore_reinit_error=True)
# Cluster resource totals; the 'CPU' and 'GPU' entries are read later to size
# the per-trial resource requests.
ray_resources = ray.cluster_resources()
# ray.shutdown()

# Reading walmart data

In [None]:
# Load the cleaned and filtered Walmart sales data from its Delta location
sdf_walmart = spark.read.format('delta').load('dbfs:/walmart/data/clean_data/final_cleaned_filtered/')

In [None]:
# Preview the loaded sales data in the notebook
display(sdf_walmart)

## Get The cluster information

In [None]:
import requests

# Look up this cluster's configuration through the Databricks Clusters API,
# using the host, API token and cluster id exposed by the notebook context.
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
host_name = ctx.tags().get("browserHostName").get()
host_token = ctx.apiToken().get()
cluster_id = ctx.tags().get("clusterId").get()

http_response = requests.get(
    f'https://{host_name}/api/2.0/clusters/get',
    params={'cluster_id': cluster_id},  # let requests build/escape the query string
    headers={'Authorization': f'Bearer {host_token}'},
    timeout=60,  # never hang the notebook on an unresponsive endpoint
)
# Surface an HTTP failure as such instead of a confusing KeyError below
http_response.raise_for_status()
response = http_response.json()

# Keep only the fields that describe the hardware the experiment ran on
cluster_info = {
  'driver_type': response['driver_node_type_id'],
  'worker_type': response['node_type_id'],
  'num_workers': response['num_workers'],
  'spark_ver': response['spark_version'],
  'cluster_memory_mb': response['cluster_memory_mb']
}
# From here on cluster_info is the JSON string, not the dict
cluster_info=json.dumps(cluster_info)
print(cluster_info)

{"driver_type": "Standard_NC8as_T4_v3", "worker_type": "Standard_NC8as_T4_v3", "num_workers": 2, "spark_ver": "12.2.x-gpu-ml-scala2.12", "cluster_memory_mb": 172032}


### Create Random time-series per model for the Demo.
Make sure `num_items` is evenly divisible by `items_per_model`, so that every model gets the same number of item time series.

In [None]:
# Experiment setup - num_items should be an exact multiple of items_per_model,
# because items are assigned to models as ceil(item_num / items_per_model)
num_items = 200  # Max number of item time series to load, full dataset has 30490 which is overkill
items_per_model = 100  # Number of item time series per model
num_batches = 1  # num trials = max_concurrent_trials * num_batches

In [None]:

# Global ordering over the item hierarchy; rows sharing all five keys receive
# the same dense rank, which serves as the item number.
window_spec = Window.orderBy('state_id', 'store_id', 'cat_id', 'dept_id', 'item_id')

sdf_walmart_with_model_num = (
    sdf_walmart
    # number the items 1..N following the ordering above
    .withColumn("item_num", F.dense_rank().over(window_spec))
    # keep only the first num_items items
    .filter(F.col("item_num") <= num_items)
    # bucket the items into models of items_per_model items each
    .withColumn("model_num", F.ceil(F.col("item_num") / items_per_model))
    # forecasting target: price times quantity sold
    .withColumn('y', F.col('sell_price') * F.col('sale_quantity'))
    .cache()
)
# NOTE(review): this counts the unfiltered source frame, not the cached one
# built above - confirm which count is intended.
print(sdf_walmart.count())
sdf_walmart_with_model_num.display()

# Many model Forecasting with Ray Tune and Pyspark UDF
Ray Tune is a powerful library for hyperparameter tuning, designed to simplify the development of distributed applications. It allows you to efficiently sample hyperparameters and get optimized results on your objective function. Ray Tune provides a variety of state-of-the-art hyperparameter tuning algorithms for optimizing model performance. 

To use Ray Tune for hyperparameter tuning, you can follow these steps:
- Define your training function and objective function.
- Specify the hyperparameters and their search space.
- Define the PySpark UDF which runs Ray Tune for each hierarchical model with the chosen search algorithm and scheduler.
- Run the pyspark job and get the result

## Step 1 : Define the training and objective function

In [None]:
def ray_trial(config, df):
  """
  Run one Ray Tune trial: fit NeuralProphet with `config` and report its RMSE.

  Rows from the last 7 days of `df` (by 'ds') are held out for evaluation;
  the model is trained on everything before that cutoff.

  Args:
    config: keyword arguments forwarded unchanged to the NeuralProphet
      constructor (the hyperparameters of this trial).
    df: pandas DataFrame with a 'ds' timestamp column plus the columns
      NeuralProphet needs (the caller in this notebook passes ID, ds, y).

  Reads the notebook global `cpu_resources_per_trial` to size the torch
  thread pool.

  Reports through `tune.report`:
    RMSE: root mean squared error on the held-out rows.
    Loss: the 'Loss' value of the training epoch with the lowest 'RMSE'.
    checkpoint: the fitted NeuralProphet model object.
  """
  print(f"cpu_resources_per_trial : {cpu_resources_per_trial}")
  print(f"num_threads : {torch.get_num_threads()}")
  torch.set_num_threads(int(cpu_resources_per_trial))

  # Hold out the last 7 days for evaluation
  test_cutoff = df['ds'].max() - pd.Timedelta(days=7)
  df_train = df[df['ds'] < test_cutoff]

  trainer_config = {}
  # Define the Model (it can be any model in our case we use NeuralProphet)
  model = NeuralProphet(
      accelerator='auto',
      trainer_config=trainer_config,
      **config
  )
  start = timeit.default_timer()
  # Train model
  progress = model.fit(
      df=df_train,
      checkpointing =True,
      freq="D",
      metrics=['RMSE'],
      progress='bar'
    )
  total_time = timeit.default_timer()-start
  print("duration of model fit: ", total_time)
  print(f"df length: {df.shape}")

  # Validate the model and get the RMSE Score.
  # Predict on the trailing 360 days (presumably so the lagged inputs of the
  # held-out week are available - TODO confirm), then keep the held-out rows.
  history_start = df['ds'].max() - pd.Timedelta(days=360)
  forecast_week = model.predict(df[df['ds'] >= history_start])
  forecast_week = forecast_week[forecast_week['ds'] >= test_cutoff]

  # Build filled series instead of calling fillna(inplace=True) on a column of
  # a filtered slice, which is chained assignment and may not update the frame.
  y_true = forecast_week['y'].fillna(0)
  y_pred = forecast_week['yhat1'].fillna(0)
  # Take the root explicitly: the `squared=False` flag is deprecated in newer
  # scikit-learn releases, while sqrt(MSE) works on every version.
  test_rmse = math.sqrt(mean_squared_error(y_true, y_pred))

  # Training-history rows of the epoch(s) with the lowest training RMSE
  best_epochs = progress.loc[progress['RMSE'] == progress['RMSE'].min()].to_dict(orient='records')
  tune.report(RMSE=test_rmse,
              Loss = best_epochs[0]['Loss'],
              checkpoint = model)

## Step 2 : Define the search space

In [None]:
# Hyperparameter search space, kept as Python source text. It is turned into a
# dict with eval() inside udf_parallel_hpt_tune, so `tune` must be importable
# where that function runs. Each trial's sampled config is passed to
# NeuralProphet(**config) in ray_trial.
space_str = """
{
  "learning_rate": tune.uniform(0.001, 1),
  "n_changepoints": 10,
  "n_lags": 3, 
  'drop_missing': True,
  'impute_rolling': 1000,
  'newer_samples_weight': tune.uniform(1, 7),
  'batch_size': 128,
  "ar_layers": tune.choice([[64,64,64],[128,128,128],[256,256,256]]),
  'epochs': 10
}
"""

## Step 3 : Define the PySpark UDF which runs Ray Tune for each hierarchical model with the chosen search algorithm and scheduler.

In [None]:
def udf_parallel_hpt_tune(keys, df):
  """
  Grouped pandas UDF: run a Ray Tune hyperparameter search for one model group.

  Prepares the group's rows for NeuralProphet, runs `ray_trial` under Ray Tune
  (HyperOpt search with an async HyperBand scheduler), logs the best trial to
  MLflow as a child of the parent run, and returns a one-row summary.

  Args:
    keys: grouping key tuple; keys[0] is the model number.
    df: pandas DataFrame holding the rows of that group; must contain the
      columns 'date_time', 'item_num' and 'y'. It is not modified.

  Reads the notebook globals space_str, cpu_resources_per_trial,
  gpu_resources_per_trial, max_concurrent_trials, num_batches,
  experiment_location and run_id.

  Returns:
    pandas DataFrame with a single row and the columns model_num,
    model_HPT_time, num_datapoints, RMSE and space.

  Raises:
    RuntimeError: if the MLflow experiment cannot be found or Ray Tune
      yields no best trial.
  """
  start = timeit.default_timer()
  model_num = keys[0]

  # Build the NeuralProphet input (ID, ds, y) on a new frame rather than
  # writing the parsed dates back into the caller's DataFrame.
  df = (
    df.assign(date_time=pd.to_datetime(df['date_time'], format='%Y-%m-%d'))
      .sort_values(by='date_time', ascending=True)
      .rename(columns={'date_time': 'ds', 'item_num': 'ID'})
  )[['ID', 'ds', 'y']]

  # NOTE(security): eval() executes arbitrary code. That is acceptable only
  # because space_str is a constant defined in this notebook; never feed it
  # externally supplied text.
  space = eval(space_str)

  ray.init(
    address="auto",
    ignore_reinit_error=True,
    include_dashboard=False,
    log_to_driver=True,
  )

  # Only request a GPU share when the cluster actually has GPUs
  if gpu_resources_per_trial == 0:
    tune_resources = {"CPU": cpu_resources_per_trial}
  else:
    tune_resources = {"CPU": cpu_resources_per_trial,
                      "GPU": gpu_resources_per_trial}

  algo = HyperOptSearch(
    space,
    metric='RMSE',
    mode="min",
  )
  scheduler = AsyncHyperBandScheduler()
  analysis = tune.run(
    tune.with_parameters(ray_trial, df=df),
    search_alg=algo,
    scheduler=scheduler,
    metric="RMSE",
    mode="min",
    max_concurrent_trials=max_concurrent_trials,
    num_samples=max_concurrent_trials * num_batches,
    reuse_actors=True,
    resources_per_trial=tune.PlacementGroupFactory(
      [tune_resources],
      strategy="PACK"
    ),
  )

  best_trial = analysis.get_best_trial(scope='last-5-avg')
  if best_trial is None:
    # Fail with a clear message instead of an AttributeError on None
    raise RuntimeError(f"Ray Tune returned no best trial for model {model_num}")
  best_result = best_trial.last_result
  print(f'aaa:{best_result["config"]}')

  experiment = mlflow.get_experiment_by_name(experiment_location)
  if experiment is None:
    raise RuntimeError(f"MLflow experiment not found: {experiment_location}")

  # Log the winning configuration as a child of the parent run (run_id)
  with mlflow.start_run(
    run_name=f"model_{str(model_num)}",
    experiment_id=experiment.experiment_id,
    tags={"mlflow.parentRunId": run_id},
    description="run inside udf",
  ):
    for key, value in best_result['config'].items():
      mlflow.log_param(key=key, value=str(value))
    mlflow.log_metric(key='rmse', value=best_result['RMSE'])
    mlflow.log_metric(key='Loss', value=best_result['Loss'])
    # mlflow.pyfunc.log_model(best_trial.last_result['checkpoint'], "model")
  print(f'aaa:{best_result}')

  return pd.DataFrame([{
    'model_num': model_num,
    'model_HPT_time': str(timeit.default_timer()-start),
    'num_datapoints': df['y'].count(),
    'RMSE': best_result['RMSE'],
    'space': str(best_result['config'])
    }])

When using a PySpark pandas UDF (`applyInPandas`), we have to define the schema of the output generated by the UDF.

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# Schema of the one-row-per-model summary returned by udf_parallel_hpt_tune;
# every field is nullable.
results_schema = (
  StructType()
  .add('model_num', IntegerType(), True)
  .add('model_HPT_time', StringType(), True)
  .add('num_datapoints', IntegerType(), True)
  .add('RMSE', DoubleType(), True)
  .add('space', StringType(), True)
)

## Step 4 : Run the pyspark job and get the results.

Here we dynamically specify the resources to be used, depending on whether the chosen cluster is CPU- or GPU-based.

In [None]:
# Size the per-trial Ray resources so that all models, each running
# max_concurrent_trials trials, fit on the cluster at the same time.
max_concurrent_trials = 1
num_models = sdf_walmart_with_model_num.select(F.max('model_num')).collect()[0][0]
print(f"num models : {num_models}")
total_concurrent_trials = num_models*max_concurrent_trials
print(f"Total concurrent Trials: {total_concurrent_trials}")
# Split the GPUs evenly across all concurrent trials (0 on CPU-only clusters)
gpu_resources_per_trial = ray_resources['GPU']/total_concurrent_trials if 'GPU' in ray_resources else 0
# Round DOWN: rounding up could request more CPUs in total than the cluster
# has, leaving some trials pending. At least 1 CPU, at most 16 per trial.
cpu_resources_per_trial = min(max(int(math.floor(ray_resources['CPU']/total_concurrent_trials)), 1), 16)
print(f'gpu_resources_per_trial:{gpu_resources_per_trial}\ncpu_resources_per_trial:{cpu_resources_per_trial}')
print(f"***Creating DF to ray tune on {num_models} models with {items_per_model} item time seies per model***")
# sdf_walmart_with_model_num.groupBy('state_id', 'store_id', 'cat_id', 'dept_id', 'item_id').agg(F.first('item_num').alias('item_num'), F.first('model_num').alias('model_num')).display()

num models : 2
Total concurrent Trials: 2
gpu_resources_per_trial:1.0
cpu_resources_per_trial:8
***Creating DF to ray tune on 2 models with 100 item time seies per model***


In [None]:
# Per-model summary: the distinct stores and items that feed each model.
# The DataFrame is assigned first and displayed afterwards; assigning the
# result of .display() would leave the variable holding None.
sdf_walmart_with_model_set = (
    sdf_walmart_with_model_num
    .select('model_num', 'store_id', 'item_id')
    .groupBy('model_num')
    .agg(
        F.collect_set('store_id').alias('store_id'),
        F.collect_set('item_id').alias('item_id'),
    )
)
sdf_walmart_with_model_set.display()

In [None]:
# Disable adaptive query execution and fix the shuffle partition count to the
# number of models (presumably so each model group gets its own task - TODO
# confirm).
for conf_key, conf_value in (
    ('spark.sql.adaptive.coalescePartitions.enabled', 'false'),
    ('spark.sql.adaptive.enabled', 'false'),
    ('spark.databricks.optimizer.adaptive.enabled', 'false'),
    ('spark.sql.shuffle.partitions', str(num_models)),
):
  spark.conf.set(conf_key, conf_value)
num_models

Out[17]: 2

In [None]:
# MLflow experiment under which the parent run and the per-model child runs
# are recorded (looked up by name inside udf_parallel_hpt_tune)
experiment_location =  '/Users/puneet.jain@databricks.com/Dais_many_model_forecasting_demo'
# Prefix of the parent run name; the run cells append the date to it
experiment_name = 'hpt'

In [None]:
# NOTE(review): get_or_create_experiment and get_new_run are not defined in
# the visible part of this notebook - presumably they come from the
# `%run ./ray_dashboard` include; confirm.
get_or_create_experiment(experiment_location)
# The returned value is used as the MLflow run id of the parent run
run_id = get_new_run(experiment_location, experiment_name+"_"+str(date.today()))

In [None]:
start = timeit.default_timer()
with mlflow.start_run(run_id=run_id):
  results = sdf_walmart_with_model_num.repartition(num_models).\
            groupBy(['model_num']).applyInPandas(func=udf_parallel_hpt_tune, schema=results_schema)
  # applyInPandas is lazy: the tuning job only runs when toPandas() collects
  # the result. Collect inside the `with` block so the parent run stays open
  # while the child runs log, and is marked failed if the job raises.
  results_pdf = results.toPandas()
display(results_pdf)
total_time = timeit.default_timer()-start
print("duration of execution: ", total_time)

model_num,model_HPT_time,num_datapoints,RMSE,space
2,106.74395973799994,160913,6.752816227323234,"{  ""learning_rate"": tune.uniform(0.001, 1),  ""n_changepoints"": 10,  ""n_lags"": 3, 'drop_missing': True,  'impute_rolling': 300,  'newer_samples_weight': tune.uniform(1, 7),  'batch_size': 128,  ""ar_layers"": tune.choice([[64,64,64],[128,128,128],[256,256,256]]),  'epochs': 10 }"
1,110.89095562900002,168711,16.790971167565026,"{  ""learning_rate"": tune.uniform(0.001, 1),  ""n_changepoints"": 10,  ""n_lags"": 3, 'drop_missing': True,  'impute_rolling': 300,  'newer_samples_weight': tune.uniform(1, 7),  'batch_size': 128,  ""ar_layers"": tune.choice([[64,64,64],[128,128,128],[256,256,256]]),  'epochs': 10 }"


duration of execution:  119.70532675099992


We see that for each trial the cluster utilizes only 25% of its compute capacity.
<br></br>
<div style="text-align: center; line-height: 5; padding-top: 20px;  padding-bottom: 20px;">
  <img src="https://raw.githubusercontent.com/puneet-jain159/Image_dump/test/ray_25_dash_utilization.png" alt='Push compute' height="1000" width="1600">
</div>

In [None]:
# Here we see that only 25% of the GPU is being utilized when running the trials

# Improving the cluster utilization

Since we are consuming only 25%, let us increase the number of concurrent trials to 4 to utilize the GPU cluster properly.

In [None]:
# Re-size the per-trial Ray resources for 4 concurrent trials per model so
# that all trials of all models fit on the cluster at the same time.
max_concurrent_trials = 4
num_models = sdf_walmart_with_model_num.select(F.max('model_num')).collect()[0][0]
print(f"num models : {num_models}")
total_concurrent_trials = num_models*max_concurrent_trials
print(f"Total concurrent Trials: {total_concurrent_trials}")
# Split the GPUs evenly across all concurrent trials (0 on CPU-only clusters)
gpu_resources_per_trial = ray_resources['GPU']/total_concurrent_trials if 'GPU' in ray_resources else 0
# Round DOWN: rounding up could request more CPUs in total than the cluster
# has, leaving some trials pending. At least 1 CPU, at most 16 per trial.
cpu_resources_per_trial = min(max(int(math.floor(ray_resources['CPU']/total_concurrent_trials)), 1), 16)
print(f'gpu_resources_per_trial:{gpu_resources_per_trial}\ncpu_resources_per_trial:{cpu_resources_per_trial}')
print(f"***Creating DF to ray tune on {num_models} models with {items_per_model} item time seies per model***")

num models : 2
Total concurrent Trials: 8
gpu_resources_per_trial:0.25
cpu_resources_per_trial:2
***Creating DF to ray tune on 2 models with 100 item time seies per model***


In [None]:
# New parent run whose name records the date and the concurrency setting
run_id = get_new_run(experiment_location, experiment_name+"_"+str(date.today())+"_max_trials_" +str(max_concurrent_trials))

start = timeit.default_timer()
with mlflow.start_run(run_id=run_id):
  results = sdf_walmart_with_model_num.repartition(num_models).\
            groupBy(['model_num']).applyInPandas(func=udf_parallel_hpt_tune, schema=results_schema)
  # applyInPandas is lazy: the tuning job only runs when toPandas() collects
  # the result. Collect inside the `with` block so the parent run stays open
  # while the child runs log, and is marked failed if the job raises.
  results_pdf = results.toPandas()
display(results_pdf)
total_time = timeit.default_timer()-start
print("duration of execution: ", total_time)

model_num,model_HPT_time,num_datapoints,RMSE,space
2,134.20414228000004,160913,6.752816227323234,"{  ""learning_rate"": tune.uniform(0.001, 1),  ""n_changepoints"": 10,  ""n_lags"": 3, 'drop_missing': True,  'impute_rolling': 300,  'newer_samples_weight': tune.uniform(1, 7),  'batch_size': 128,  ""ar_layers"": tune.choice([[64,64,64],[128,128,128],[256,256,256]]),  'epochs': 10 }"
1,139.08439712500012,168711,16.790971167565026,"{  ""learning_rate"": tune.uniform(0.001, 1),  ""n_changepoints"": 10,  ""n_lags"": 3, 'drop_missing': True,  'impute_rolling': 300,  'newer_samples_weight': tune.uniform(1, 7),  'batch_size': 128,  ""ar_layers"": tune.choice([[64,64,64],[128,128,128],[256,256,256]]),  'epochs': 10 }"


duration of execution:  147.90069438499995


We now see full utilization of the cluster
<br></br>
<div style="text-align: center; line-height: 5; padding-top: 20px;  padding-bottom: 20px;">
  <img src="https://raw.githubusercontent.com/puneet-jain159/Image_dump/test/ray_100_dash_utlization_max.png" alt='Push compute' height="1000" width="1600">
</div>