# Techniques in Spark for training multiple models and performing scalable inference

## 1. Preparation before class

### Environment preparation
1. Prepare a Databricks instance with Ls8s_v2
2. Prepare an Azure ML workspace
3. Prepare a service principal whose secret key is registered in Key Vault. The service principal should have Contributor access to your Azure ML workspace

### Download data from Microsoft Open Dataset

https://azure.microsoft.com/en-us/services/open-datasets/catalog/sample-oj-sales-simulated

In [6]:
data =spark.read.format("csv").option("header", True).load("wasbs://ojsales-simulatedcontainer@azureopendatastorage.blob.core.windows.net/oj_sales_data/Store10*.csv")

In [7]:
#Write to local delta for fast reading
data.write.format("delta").saveAsTable("OJ_Sales_Data")

In [8]:
%sql optimize OJ_Sales_Data zorder by store, brand

path,metrics
,"List(1, 16, List(456483, 456483, 456483.0, 1, 456483), List(29570, 37082, 36254.5625, 16, 580073), 0, List(minCubeSize(107374182400), List(0, 0), List(16, 580073), 0, List(16, 580073), 1, null), 1, 16, 0, false)"


In [9]:
%sql select * from OJ_Sales_Data limit 10

WeekStarting,Store,Brand,Quantity,Advert,Price,Revenue
1990-06-14,1094,minute.maid,17892,1,2.09,37394.28
1990-06-21,1094,minute.maid,14053,1,2.45,34429.850000000006
1990-06-28,1094,minute.maid,17341,1,2.47,42832.27
1990-07-05,1094,minute.maid,17194,1,2.42,41609.48
1990-07-12,1094,minute.maid,17945,1,2.39,42888.55
1990-07-19,1094,minute.maid,17371,1,2.3,39953.3
1990-07-26,1094,minute.maid,9825,1,2.36,23187.0
1990-08-02,1094,minute.maid,10849,1,2.58,27990.42
1990-08-09,1094,minute.maid,12084,1,2.0,24168.0
1990-08-16,1094,minute.maid,10484,1,2.32,24322.88


In [10]:
%sql select count (distinct store, brand) from OJ_Sales_Data 

"count(DISTINCT store, brand)"
300


In [11]:
%sql select distinct brand from OJ_Sales_Data 

brand
dominicks
tropicana
minute.maid


## Pre-training exercise

1. Read about pandas function APIs: https://docs.microsoft.com/en-us/azure/databricks/spark/latest/spark-sql/pandas-function-apis
2. Answer the following questions:
  - What is the advantage of this technology over a regular Python UDF?
  - What is the role of Apache Arrow in this?
  - What is the benefit of using an iterator with yield over a regular list with return?
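
As a hint for the last question, the difference can be seen without Spark. This is a minimal sketch in plain Python; `load_batches_list`, `load_batches_iter`, and the batch sizes are hypothetical names and values chosen only for illustration. A function that returns a list has to build every batch before the caller sees the first one, while a generator hands over one batch at a time, so only the current batch needs to stay in memory.

```python
from typing import Iterator, List

def load_batches_list(n_batches: int, batch_size: int) -> List[List[int]]:
    # Builds and keeps every batch in memory before returning.
    return [list(range(i * batch_size, (i + 1) * batch_size)) for i in range(n_batches)]

def load_batches_iter(n_batches: int, batch_size: int) -> Iterator[List[int]]:
    # Produces one batch at a time; earlier batches can be garbage collected.
    for i in range(n_batches):
        yield list(range(i * batch_size, (i + 1) * batch_size))

def batch_sums(batches) -> List[int]:
    # Works with either a list or an iterator of batches.
    return [sum(batch) for batch in batches]

eager = batch_sums(load_batches_list(3, 4))
lazy = batch_sums(load_batches_iter(3, 4))
print(eager, lazy)
```

Both calls give the same result; only the peak memory use differs.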

Using the OJ sales dataset above and the pandas function APIs, find the best-selling week for each store and brand, formatted as week_number-yyyy.
The result set should look like this:

In [15]:
import pandas as pd
result_sample= pd.DataFrame({"store": [1066, 1067, 1068],'Brand':['dominicks', 'tropicana','tropicana'],"Best_Selling_Week": ['23-1992', '24-1991','24-1991']})
display(result_sample)

store,Brand,Best_Selling_Week
1066,dominicks,23-1992
1067,tropicana,24-1991
1068,tropicana,24-1991
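
The week_number-yyyy label can be derived from the ISO calendar. The sketch below uses only the standard library; `week_label` is a hypothetical helper name, and the choice of ISO week numbering is an assumption that matches the `isocalendar()` calls used in the solution.

```python
from datetime import date

def week_label(day: date) -> str:
    # isocalendar() returns (ISO year, ISO week number, ISO weekday)
    iso_year, iso_week, _ = day.isocalendar()
    return f"{iso_week}-{iso_year}"

print(week_label(date(1990, 6, 14)))   # a week from the sample data
print(week_label(date(1991, 12, 30)))  # a date close to the year boundary
```

Note that the ISO year can differ from the calendar year near year boundaries: 30 December 1991 belongs to ISO week 1 of 1992.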


In [16]:
#Solution
#The Pandas function
import pandas as pd
def best_selling_week(inputdf):
  #Store and Brand are constant within a group, so the first value is sufficient
  store = inputdf['Store'].iloc[0]
  brand = inputdf['Brand'].iloc[0]
  best_week_row = inputdf.iloc[inputdf['Quantity'].argmax()]
  #isocalendar() returns (ISO year, ISO week number, weekday)
  iso_year, iso_week, _ = best_week_row['WeekStarting'].isocalendar()
  best_week = str(iso_week) + "-" + str(iso_year)
  qty = best_week_row['Quantity']

  return pd.DataFrame({"Store":[store], "Brand":[brand], "Best_Selling_Week":[best_week], "Qty":[qty]})
  

df = spark.sql("select to_timestamp(WeekStarting) WeekStarting, float(Quantity), Brand,Revenue, Store from OJ_Sales_Data")
df = df.repartition(200) #to increase parallelism
#Use the pandas function in group by

result = df.groupby(["Brand","Store"]).applyInPandas(best_selling_week, schema="Store string, Brand string, Best_Selling_Week string, Qty float")
display(result.head(10))

Store,Brand,Best_Selling_Week,Qty
1031,tropicana,48-1991,19916.0
1021,minute.maid,11-1991,19947.0
1074,tropicana,38-1991,19932.0
1077,minute.maid,15-1992,19934.0
1078,minute.maid,44-1991,19978.0
1019,minute.maid,41-1991,19685.0
1090,tropicana,44-1990,19997.0
1099,tropicana,30-1990,19576.0
1014,minute.maid,32-1991,19995.0
1020,minute.maid,43-1991,19996.0


### Optional reading: on the training day we will use the forecasting models and utilities from the Many Models repo (AML PRS method) for comparison. To prepare, it is useful to get familiar with the classes and libraries there.

In [18]:
https://github.com/microsoft/solution-accelerator-many-models/blob/master/Custom_Script/scripts/timeseries_utilities.py
https://github.com/microsoft/solution-accelerator-many-models/blob/master/Custom_Script/scripts/train.py
https://github.com/microsoft/solution-accelerator-many-models/blob/master/Custom_Script/scripts/forecast.py

## 2. Training content

### The Map function

DataFrame.mapInPandas() performs map operations on pandas instances. It transforms an iterator of pandas.DataFrame, whose batches together represent the current PySpark DataFrame, into another iterator of pandas.DataFrame and returns the result as a PySpark DataFrame.

The underlying function takes and returns an iterator of pandas.DataFrame. Its output can have arbitrary length, in contrast to some pandas UDFs such as the Series to Series pandas UDF.
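
The iterator-in, iterator-out contract can be tried locally with plain pandas before running it on Spark. This is a minimal sketch: `keep_advertised` and the two small batches are hypothetical and only stand in for the batches Spark would supply. Because the function filters rows, its output is shorter than its input, which illustrates the "arbitrary length" point.

```python
from typing import Iterator
import pandas as pd

def keep_advertised(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # Same shape as a mapInPandas function: iterator of DataFrames in, iterator out.
    for batch in batches:
        # Filtering changes the number of rows, which a Series to Series UDF cannot do.
        yield batch[batch["Advert"] == 1]

# Two hypothetical batches standing in for the batches Spark would pass in.
batch_1 = pd.DataFrame({"Store": ["1094", "1094"], "Advert": [1, 0], "Quantity": [17892.0, 14053.0]})
batch_2 = pd.DataFrame({"Store": ["1066"], "Advert": [1], "Quantity": [13198.0]})

filtered = pd.concat(list(keep_advertised(iter([batch_1, batch_2]))), ignore_index=True)
print(filtered)
```

On a cluster, a function of this shape would be passed to `df.mapInPandas(keep_advertised, schema=df.schema)`, assuming `df` contains an `Advert` column.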

In [22]:
spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', 100)
#The default is 10000 records per batch, which in some cases may defeat the purpose of parallelism
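
What the batch size limit means for the function can be pictured without Spark. The sketch below is only a rough imitation, not Spark's actual implementation: `split_into_batches` is a hypothetical helper that cuts one partition into slices of at most `max_records` rows, the way the iterator passed to a map function delivers them.

```python
from typing import Iterator
import pandas as pd

def split_into_batches(partition: pd.DataFrame, max_records: int) -> Iterator[pd.DataFrame]:
    # Cut one partition into consecutive slices of at most max_records rows.
    for start in range(0, len(partition), max_records):
        yield partition.iloc[start:start + max_records]

partition = pd.DataFrame({"Quantity": range(250)})
batch_sizes = [len(batch) for batch in split_into_batches(partition, max_records=100)]
print(batch_sizes)
```

With 250 rows and a limit of 100, the function sees three batches; with the default limit of 10000 it would see a single one.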

In [23]:
def parallel_transform(df_iterator):
  for df in df_iterator:
    df['Week'] = df['WeekStarting'].map(lambda x: str(x.isocalendar()[1]) +"-"+ str(x.isocalendar()[0]))
    df.drop("WeekStarting", inplace=True, axis=1)
    yield df
df = spark.sql("select to_timestamp(WeekStarting) WeekStarting, float(Quantity), Brand,Revenue, Store from OJ_Sales_Data")

result = df.mapInPandas(parallel_transform, schema="Store string, Brand string, Week string, Quantity float, Revenue string")

display(result.head(10))

Store,Brand,Week,Quantity,Revenue
1094,minute.maid,24-1990,17892.0,37394.28
1094,minute.maid,25-1990,14053.0,34429.850000000006
1094,minute.maid,26-1990,17341.0,42832.27
1094,minute.maid,27-1990,17194.0,41609.48
1094,minute.maid,28-1990,17945.0,42888.55
1094,minute.maid,29-1990,17371.0,39953.3
1094,minute.maid,30-1990,9825.0,23187.0
1094,minute.maid,31-1990,10849.0,27990.42
1094,minute.maid,32-1990,12084.0,24168.0
1094,minute.maid,33-1990,10484.0,24322.88


### Many Model Training

In [25]:
#prepare values to broadcast
tenant_id ='72f988bf-86f1-41af-91ab-2d7cd011db47' 
service_principal_id='af883abf-89dd-4889-bdb3-1ee84f68465e'
service_principal_password=dbutils.secrets.get('scope1','app01-pass')
subscription_id = '0e9bace8-7a81-4922-83b5-d995ff706507'
# Azure Machine Learning resource group NOT the managed resource group
resource_group = 'azureml' 

#Azure Machine Learning workspace name, NOT Azure Databricks workspace
workspace_name = 'ws01ent'  

### Test with a single store & brand combination (single time series)

In [27]:
%run ./timeseries_utilities


In [28]:
#Getting data
import pandas as pd
train_data_df = spark.sql("select to_timestamp(WeekStarting) WeekStarting, float(Quantity), Brand,Revenue, Store from OJ_Sales_Data where Store = '1066' and Brand ='tropicana'").toPandas()


In [29]:
display(train_data_df.head(10))

WeekStarting,Quantity,Brand,Revenue,Store
1990-06-14T00:00:00.000+0000,13198.0,tropicana,29695.5,1066
1990-06-21T00:00:00.000+0000,12188.0,tropicana,27179.24,1066
1990-06-28T00:00:00.000+0000,10453.0,tropicana,25505.32,1066
1990-07-05T00:00:00.000+0000,13390.0,tropicana,35349.6,1066
1990-07-12T00:00:00.000+0000,12798.0,tropicana,29691.36,1066
1990-07-19T00:00:00.000+0000,18476.0,tropicana,49146.16,1066
1990-07-26T00:00:00.000+0000,16244.0,tropicana,35087.04,1066
1990-08-02T00:00:00.000+0000,16057.0,tropicana,35807.11,1066
1990-08-09T00:00:00.000+0000,16888.0,tropicana,35127.04,1066
1990-08-16T00:00:00.000+0000,14045.0,tropicana,30056.300000000003,1066


In [30]:

#Use the single time series loaded above to test the utility functions
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, mean_absolute_error
import joblib
import os
target_column= 'Quantity'
timestamp_column= 'WeekStarting'
timeseries_id_columns= [ 'Store', 'Brand']
drop_columns=['Revenue', 'Store', 'Brand']
model_type= 'lr'
model_name=train_data_df['Store'][0]+"_"+train_data_df['Brand'][0]
test_size=20
# 1.0 Format the input data - put the time in the index
data = train_data_df \
        .set_index('WeekStarting') \
        .sort_index(ascending=True)

# 2.0 Split the data into train and test sets
train = data[:-test_size]
test = data[-test_size:]

# 3.0 Create and fit the forecasting pipeline
# The pipeline will drop unhelpful features, make a calendar feature, and make lag features
lagger = SimpleLagger(target_column, lag_orders=[1, 2, 3, 4])
transform_steps = [('column_dropper', ColumnDropper(drop_columns)),
                   ('calendar_featurizer', SimpleCalendarFeaturizer()), ('lagger', lagger)]
forecaster = SimpleForecaster(transform_steps, LinearRegression(), target_column, timestamp_column)
forecaster.fit(train)
print('Featurized data example:')
display(forecaster.transform(train).head())


Quantity,Week_Year,lag_1,lag_2,lag_3,lag_4
13198.0,24,,,,
12188.0,25,13198.0,,,
10453.0,26,12188.0,13198.0,,
13390.0,27,10453.0,12188.0,13198.0,
12798.0,28,13390.0,10453.0,12188.0,13198.0
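
The lag columns in the table above can be reproduced with pandas `shift`. This is a minimal sketch under the assumption that a lag of order k is simply the target value from k weeks earlier; `add_lags` is a hypothetical helper and not the `SimpleLagger` implementation from the Many Models repo.

```python
import pandas as pd

def add_lags(df: pd.DataFrame, target: str, lag_orders) -> pd.DataFrame:
    # Add one column per lag order, holding the target shifted down by that many rows.
    out = df.copy()
    for lag in lag_orders:
        out[f"lag_{lag}"] = out[target].shift(lag)
    return out

# First five weeks of the Store 1066 / tropicana series shown above
weekly = pd.DataFrame(
    {"Quantity": [13198.0, 12188.0, 10453.0, 13390.0, 12798.0]},
    index=pd.to_datetime(["1990-06-14", "1990-06-21", "1990-06-28", "1990-07-05", "1990-07-12"]),
)
lagged = add_lags(weekly, "Quantity", [1, 2, 3, 4])
print(lagged)
```

With lag orders 1 to 4, the first four rows contain at least one missing lag, so only the fifth row has a complete feature vector.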


In [31]:
from azureml.core.authentication import ServicePrincipalAuthentication
from azureml.core import Workspace
from azureml.core import Model

import cloudpickle 

sp_auth = ServicePrincipalAuthentication(tenant_id =tenant_id,
                                         service_principal_id=service_principal_id,
                                         service_principal_password=service_principal_password)
# Instantiate Azure Machine Learning workspace
ws = Workspace.get(name=workspace_name,
                   subscription_id=subscription_id,
                   resource_group=resource_group,auth= sp_auth)


# 4.0 Get predictions on test set
forecasts = forecaster.forecast(test)
compare_data = test.assign(forecasts=forecasts).dropna()

# 5.0 Calculate accuracy metrics for the fit
mse = mean_squared_error(compare_data[target_column], compare_data['forecasts'])
rmse = np.sqrt(mse)
mae = mean_absolute_error(compare_data[target_column], compare_data['forecasts'])
actuals = compare_data[target_column].values
preds = compare_data['forecasts'].values
mape = np.mean(np.abs((actuals - preds) / actuals) * 100)

# 7.0 Train model with full dataset
forecaster.fit(data)

# 8.0 Save the forecasting pipeline
with open(model_name, mode='wb') as file:
   cloudpickle.dump(forecaster, file)

model = Model.register(workspace=ws, model_name=model_name, model_path=model_name, tags={'mse':str(mse), 'mape': str(mape), 'rmse': str(rmse)})
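
The accuracy metrics computed in step 5.0 can be checked on a toy example. This is a minimal sketch with NumPy only; `forecast_metrics` is a hypothetical helper and the three actual and predicted values are made up for illustration.

```python
import numpy as np

def forecast_metrics(actuals, preds):
    actuals = np.asarray(actuals, dtype=float)
    preds = np.asarray(preds, dtype=float)
    errors = actuals - preds
    mse = np.mean(errors ** 2)
    return {
        "mse": mse,
        "rmse": np.sqrt(mse),
        "mae": np.mean(np.abs(errors)),
        # MAPE is undefined when an actual value is zero
        "mape": np.mean(np.abs(errors / actuals) * 100),
    }

metrics = forecast_metrics([100.0, 200.0, 400.0], [110.0, 180.0, 400.0])
print(metrics)
```

RMSE and MAE are in the units of the target (quantity sold), while MAPE is a percentage, which makes it easier to compare across stores with different sales volumes.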



#### Scale up to many-model training with the pandas function API

In [33]:
#Prepare the core training function

from azureml.core.authentication import ServicePrincipalAuthentication
from azureml.core import Workspace
from azureml.core import Model
import cloudpickle
#Do not use joblib to dump the model because it has issues with multi-level objects
def many_model_train(train_data_df):
  sp_auth = ServicePrincipalAuthentication(tenant_id =tenant_id,
                                         service_principal_id=service_principal_id,
                                         service_principal_password=service_principal_password)
  # Instantiate Azure Machine Learning workspace
  ws = Workspace.get(name=workspace_name,
                     subscription_id=subscription_id,
                     resource_group=resource_group,auth= sp_auth)


  target_column= 'Quantity'
  timestamp_column= 'WeekStarting'
  timeseries_id_columns= [ 'Store', 'Brand']
  drop_columns=['Revenue', 'Store', 'Brand']
  model_type= 'lr'
  #Get the store and brand. They are unique from the group so just the first value is sufficient
  store = train_data_df['Store'][0]
  brand = train_data_df['Brand'][0]

  model_name=store+"_"+brand
  test_size=20
  # 1.0 Format the input data from group by, put the time in the index
  data = train_data_df \
          .set_index('WeekStarting') \
          .sort_index(ascending=True)

  # 2.0 Split the data into train and test sets
  train = data[:-test_size]
  test = data[-test_size:]

  # 3.0 Create and fit the forecasting pipeline
  # The pipeline will drop unhelpful features, make a calendar feature, and make lag features
  lagger = SimpleLagger(target_column, lag_orders=[1, 2, 3, 4])
  transform_steps = [('column_dropper', ColumnDropper(drop_columns)),
                     ('calendar_featurizer', SimpleCalendarFeaturizer()), ('lagger', lagger)]
  forecaster = SimpleForecaster(transform_steps, LinearRegression(), target_column, timestamp_column)
  forecaster.fit(train)

  # 4.0 Get predictions on test set
  forecasts = forecaster.forecast(test)
  compare_data = test.assign(forecasts=forecasts).dropna()

  # 5.0 Calculate accuracy metrics for the fit
  mse = mean_squared_error(compare_data[target_column], compare_data['forecasts'])
  rmse = np.sqrt(mse)
  mae = mean_absolute_error(compare_data[target_column], compare_data['forecasts'])
  actuals = compare_data[target_column].values
  preds = compare_data['forecasts'].values
  mape = np.mean(np.abs((actuals - preds) / actuals) * 100)

  # 7.0 Train model with full dataset
  forecaster.fit(data)

  # 8.0 Save the pipeline and register model to AML
  with open(model_name, mode='wb') as file:
     cloudpickle.dump(forecaster, file)
  model = Model.register(workspace=ws, model_name=model_name, model_path=model_name, tags={'mse':str(mse), 'mape': str(mape), 'rmse': str(rmse)})
  
  return pd.DataFrame({'Store':store,'Brand':brand, 'mse':[mse], 'mape': [mape], 'rmse': [rmse], 'model_name':[model_name]})


In [34]:
df = spark.sql("select to_timestamp(WeekStarting) WeekStarting, float(Quantity), Brand,Revenue, Store from OJ_Sales_Data")
df = df.repartition(200) #to increase parallelism
result = df.groupby(["Brand","Store"]).applyInPandas(many_model_train, schema="Store string, Brand string, mse float, mape float, rmse float, model_name string ")


In [35]:
display(result.head(10))

Store,Brand,mse,mape,rmse,model_name
1031,tropicana,10501595.0,20.475067138671875,3240.616455078125,1031_tropicana
1021,minute.maid,8323296.5,17.98264503479004,2885.012451171875,1021_minute.maid
1074,tropicana,8422692.0,18.610090255737305,2902.1875,1074_tropicana
1077,minute.maid,12016312.0,22.2121524810791,3466.455322265625,1077_minute.maid
1078,minute.maid,6714000.0,13.475154876708984,2591.138671875,1078_minute.maid
1019,minute.maid,10599569.0,23.259544372558597,3255.69775390625,1019_minute.maid
1090,tropicana,5647451.5,15.766434669494627,2376.436767578125,1090_tropicana
1099,tropicana,8836949.0,19.13409805297852,2972.70068359375,1099_tropicana
1014,minute.maid,7331310.0,14.901358604431152,2707.63916015625,1014_minute.maid
1020,minute.maid,10048782.0,20.763259887695312,3169.9814453125,1020_minute.maid
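
A grouped training function can be debugged locally by emulating `applyInPandas` with a plain pandas group-by: one call per group, with the one-row results concatenated. This is a minimal sketch; `train_one_group` is hypothetical, the straight-line fit from `np.polyfit` only stands in for the real forecasting pipeline, and the six-row dataset is made up.

```python
import numpy as np
import pandas as pd

def train_one_group(group_df: pd.DataFrame) -> pd.DataFrame:
    # Store and Brand are constant within a group
    store = group_df["Store"].iloc[0]
    brand = group_df["Brand"].iloc[0]
    # Stand-in "model": a straight line fitted to the quantities over time
    x = np.arange(len(group_df))
    y = group_df["Quantity"].to_numpy()
    slope, intercept = np.polyfit(x, y, deg=1)
    mse = float(np.mean((y - (slope * x + intercept)) ** 2))
    return pd.DataFrame({"Store": [store], "Brand": [brand], "mse": [mse],
                         "model_name": [store + "_" + brand]})

sales = pd.DataFrame({
    "Store": ["1066"] * 3 + ["1067"] * 3,
    "Brand": ["tropicana"] * 3 + ["dominicks"] * 3,
    "Quantity": [10.0, 20.0, 30.0, 5.0, 5.0, 5.0],
})

# Emulate groupby(...).applyInPandas(...): one call per group, results concatenated
local_result = pd.concat(
    [train_one_group(group) for _, group in sales.groupby(["Brand", "Store"])],
    ignore_index=True,
)
print(local_result)
```

In a plain pandas group-by each group keeps its original row index, so the sketch reads the group keys with `.iloc[0]` rather than `[0]`.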


### Many Model Inferencing: can you score using multiple models in parallel?

#### Homework: please prepare a function using the pandas function API that produces forecasts for multiple stores and brands given the test data

### Solution

#### Quick test of the forecast function in utils with just one time series

In [40]:
#Test the forecast for one time series; commands 27-30 need to be run first
ts_id_dict = {id_col: str(data[id_col].iloc[0]) for id_col in timeseries_id_columns}
forecasts=forecaster.forecast(data)
prediction_df = forecasts.to_frame(name='Prediction')
prediction_df =prediction_df.reset_index().assign(**ts_id_dict)
display(prediction_df.head(10))

WeekStarting,Prediction,Store,Brand
1990-06-14T00:00:00.000+0000,,1066,tropicana
1990-06-21T00:00:00.000+0000,,1066,tropicana
1990-06-28T00:00:00.000+0000,,1066,tropicana
1990-07-05T00:00:00.000+0000,,1066,tropicana
1990-07-12T00:00:00.000+0000,14410.436058105348,1066,tropicana
1990-07-19T00:00:00.000+0000,14569.927951313914,1066,tropicana
1990-07-26T00:00:00.000+0000,14580.11668534655,1066,tropicana
1990-08-02T00:00:00.000+0000,14873.030648117356,1066,tropicana
1990-08-09T00:00:00.000+0000,15168.46838892596,1066,tropicana
1990-08-16T00:00:00.000+0000,14837.636937709074,1066,tropicana


#### Main solution using map in pandas & loading models from AML workspace

In [42]:
#Prepare the core forecast function in pandas function API  

from azureml.core.authentication import ServicePrincipalAuthentication
from azureml.core import Workspace
from azureml.core import Model
import cloudpickle
#Load with cloudpickle because the models were dumped with cloudpickle (joblib has issues with multi-level objects)
def many_model_forecast(input_data_df):
  sp_auth = ServicePrincipalAuthentication(tenant_id =tenant_id,
                                         service_principal_id=service_principal_id,
                                         service_principal_password=service_principal_password)
  # Instantiate Azure Machine Learning workspace
  ws = Workspace.get(name=workspace_name,
                     subscription_id=subscription_id,
                     resource_group=resource_group,auth= sp_auth)


  target_column= 'Quantity'
  timestamp_column= 'WeekStarting'
  timeseries_id_columns= [ 'Store', 'Brand']
  drop_columns=['Revenue', 'Store', 'Brand']
  data = input_data_df \
        .set_index(timestamp_column) \
        .sort_index(ascending=True)
  #Prepare loading the model from Azure ML; the latest version is retrieved by default
  #Use .iloc[0] because data is indexed by timestamp, not by position
  model_name=data['Store'].iloc[0]+"_"+data['Brand'].iloc[0]
  model = Model(ws, model_name)
  model.download(exist_ok =True)
  with open(model_name, 'rb') as f:
    forecaster = cloudpickle.load(f)

  #Get predictions
  #This is to append the store and brand columns to the result
  ts_id_dict = {id_col: str(data[id_col].iloc[0]) for id_col in timeseries_id_columns}
  forecasts=forecaster.forecast(data)
  prediction_df = forecasts.to_frame(name='Prediction')
  prediction_df =prediction_df.reset_index().assign(**ts_id_dict)
  
  return prediction_df


In [43]:
#Load the data to score. For now it is the same as the training data, but in reality it should be different.
df = spark.sql("select to_timestamp(WeekStarting) WeekStarting, float(Quantity), Brand,Revenue, Store from OJ_Sales_Data")
df = df.repartition(200) #to increase parallelism
prediction_result = df.groupby(["Brand","Store"]).applyInPandas(many_model_forecast, schema="WeekStarting date, Store string, Brand string, Prediction float")

In [44]:
display(prediction_result.head(10))

WeekStarting,Store,Brand,Prediction
1990-06-14,1031,tropicana,
1990-06-21,1031,tropicana,
1990-06-28,1031,tropicana,
1990-07-05,1031,tropicana,
1990-07-12,1031,tropicana,13858.3623046875
1990-07-19,1031,tropicana,13675.5673828125
1990-07-26,1031,tropicana,14244.9736328125
1990-08-02,1031,tropicana,14784.28515625
1990-08-09,1031,tropicana,14830.6640625
1990-08-16,1031,tropicana,14164.19140625


##### Small ask: can you add an actual qty column to the result if the data to score has it?
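
One possible approach, sketched under stated assumptions: the forecasts are a pandas Series indexed by timestamp (as the `to_frame` and `reset_index` calls in the solution suggest), and the data to score shares that index. `attach_actuals` and the column name `Actual_Qty` are hypothetical; the values are taken from the sample outputs above and rounded.

```python
import pandas as pd

def attach_actuals(forecasts: pd.Series, data: pd.DataFrame, target_column: str = "Quantity") -> pd.DataFrame:
    prediction_df = forecasts.to_frame(name="Prediction")
    if target_column in data.columns:
        # Both objects share the timestamp index, so values align by week
        prediction_df["Actual_Qty"] = data[target_column]
    return prediction_df.reset_index()

index = pd.to_datetime(["1990-07-12", "1990-07-19"]).rename("WeekStarting")
data = pd.DataFrame({"Quantity": [12798.0, 18476.0]}, index=index)
forecasts = pd.Series([14410.4, 14569.9], index=index)

with_actuals = attach_actuals(forecasts, data)
without_actuals = attach_actuals(forecasts, data.drop(columns="Quantity"))
print(with_actuals)
print(without_actuals)
```

When used inside `applyInPandas`, the output schema has to be fixed in advance, so the extra column would need to be declared there and filled with nulls when the actual quantity is missing.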