# Use Synapse Spark 3.x to train multiple models and perform scalable inferencing

## 1.Preperation 

### Environment preperation
1. Prepare a Synapse Spark Pool 3.x Medium
2. Prepare a Azure ML workspace 
3. Prepare a service principal with secret key registered in keyvault. The service principal should have contributor access to your Azure ML workspace

### Load util classes and models


In [1]:
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline


class ColumnDropper(TransformerMixin, BaseEstimator):
    """
    Transformer for dropping columns from a dataframe.
    """
    def __init__(self, drop_columns):
        assert isinstance(drop_columns, list), "Expected drop_columns input to be a list"
        self.drop_columns = drop_columns

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return X.drop(columns=self.drop_columns, errors='ignore')


class SimpleCalendarFeaturizer(TransformerMixin, BaseEstimator):
    """
    Transformer for adding a simple calendar feature derived from the input time index.
    For demonstration purposes, the transform creates a feature for week of the year.
    """
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return X.assign(Week_Year=X.index.isocalendar().week.values)


class SimpleLagger(TransformerMixin, BaseEstimator):
    """
    Simple lagging transform that creates lagged values of the target column.
    This transform uses information known at fit time to create lags at transform time
    to maintain lag feature continuity across train/test splits.
    """

    def __init__(self, target_column_name, lag_orders=None):
        my_lag_orders = lag_orders if lag_orders is not None else [1]
        assert isinstance(my_lag_orders, list) and min(my_lag_orders) > 0, \
            'Expected lag_orders to be a list of integers all greater than zero'
        self.target_column_name = target_column_name
        self.lag_orders = my_lag_orders

    def fit(self, X, y=None):
        """
        Fit the lagger transform.
        This transform caches the tail of the training data up to the maximum lag order
        so that lag features can be created on test data.
        """
        assert self.target_column_name in X.columns, \
            "Target column is missing from the input dataframe."

        X_fit = X.sort_index(ascending=True)
        max_lag_order = max(self.lag_orders)
        self._train_tail = X_fit.iloc[-max_lag_order:]
        self._column_order = self._train_tail.columns

        return self

    def transform(self, X):
        """
        Create lag features of the target for the input data.
        The transform uses data cached at fit time, if necessary, to provide
        continuity of lag features.
        """
        X_trans = X.copy()
        added_target = False
        if self.target_column_name not in X_trans.columns:
            X_trans[self.target_column_name] = np.nan
            added_target = True

        # decide if we need to use the training cache i.e. are we in a test scenario?
        train_latest = self._train_tail.index.max()
        X_earliest = X_trans.index.min()
        if train_latest < X_earliest:
            # X data is later than the training period - append the cached tail of training data
            X_trans = pd.concat((self._train_tail, X_trans[self._column_order]))

        # Ensure data is sorted by time before making lags
        X_trans.sort_index(ascending=True, inplace=True)

        # Make the lag features
        for lag_order in self.lag_orders:
            X_trans['lag_' + str(lag_order)] = X_trans[self.target_column_name].shift(lag_order)

        # Return transformed dataframe with the same time range as X
        if added_target:
            X_trans.drop(columns=[self.target_column_name], inplace=True)
        return X_trans.loc[X.index]


class SklearnWrapper(BaseEstimator):
    """
    Wrapper class around an sklearn model.
    This wrapper formats DataFrame input for scikit-learn regression estimators.
    """
    def __init__(self, sklearn_model, target_column_name):
        self.sklearn_model = sklearn_model
        self.target_column_name = target_column_name

    def fit(self, X, y=None):
        """
        Fit the sklearn model on the input dataframe.
        """
        assert self.target_column_name in X.columns, \
            "Target column is missing from the input dataframe."

        # Drop rows with missing values and check that we still have data left
        X_fit = X.dropna()
        assert len(X_fit) > 0, 'Training dataframe is empty after dropping NA values'

        # Check that data is all numeric type
        # This simple pipeline does not handle categoricals or other non-numeric types
        full_col_set = set(X_fit.columns)
        numeric_col_set = set(X_fit.select_dtypes(include=[np.number]).columns)
        assert full_col_set == numeric_col_set, \
            ('Found non-numeric columns {} in the input dataframe. Please drop them prior to modeling.'
             .format(full_col_set - numeric_col_set))

        # Fit the scikit model
        y_fit = X_fit.pop(self.target_column_name)
        self._column_order = X_fit.columns
        self.sklearn_model.fit(X_fit.values, y_fit.values)
        return self

    def transform(self, X):
        """
        Identity transform for fit_transform pipelines.
        """
        return X

    def predict(self, X):
        """
        Predict on the input dataframe.
        Return a Pandas Series with time in the index
        """
        # Check the column set in input is compatible with fitted model
        input_col_set = set(X.columns) - set([self.target_column_name])
        assert input_col_set == set(self._column_order), \
            'Input columns {} do not match expected columns {}'.format(input_col_set, self._column_order)

        X_pred = X.drop(columns=[self.target_column_name], errors='ignore')[self._column_order]
        X_pred.dropna(inplace=True)
        assert len(X_pred) > 0, 'Prediction dataframe is empty after dropping NA values'
        y_raw = self.sklearn_model.predict(X_pred.values)
        return pd.Series(data=y_raw, index=X_pred.index)


class SimpleForecaster(TransformerMixin):
    """
    Forecasting class for a simple, 1-step ahead forecaster.
    This class encapsulates fitting a transform pipeline with an sklearn regression estimator
    and producing in-sample and out-of-sample forecasts.
    Out-of-sample forecasts apply the model recursively over the prediction set to produce forecasts
    at any horizon.
    The forecaster assumes that the time-series data is regularly sampled on a contiguous interval;
    it does not handle missing values.
    """

    def __init__(self, transform_steps, estimator, target_column_name, time_column_name):
        assert estimator is not None, "Estimator cannot be None."
        assert transform_steps is None or isinstance(transform_steps, list), \
            "transform_steps should be a list"
        estimator_step = ('estimator', SklearnWrapper(estimator, target_column_name))
        steps = transform_steps + [estimator_step] if transform_steps is not None else [estimator_step]
        self.pipeline = Pipeline(steps=steps)

        self.target_column_name = target_column_name
        self.time_column_name = time_column_name

    def _recursive_forecast(self, X):
        """
        Apply the trained model resursively for out-of-sample predictions.
        """
        X_fcst = X.sort_index(ascending=True)
        if self.target_column_name not in X_fcst.columns:
            X_fcst[self.target_column_name] = np.nan

        forecasts = pd.Series(np.nan, index=X_fcst.index)
        for fcst_date in X_fcst.index.get_level_values(self.time_column_name):
            # Get predictions on an expanding window ending on the current forecast date
            y_fcst = self.pipeline.predict(X_fcst[X_fcst.index <= fcst_date])

            # Set the current forecast
            forecasts.loc[fcst_date] = y_fcst.loc[fcst_date]

            # Set the actual value to the forecast value so that lag features can be made on next iteration
            X_fcst.loc[fcst_date, self.target_column_name] = y_fcst.loc[fcst_date]

        return forecasts

    def fit(self, X):
        """
        Fit the forecasting pipeline.
        This method assumes the target is a column in the input, X.
        """
        assert list(X.index.names) == [self.time_column_name], \
            "Expected time column to comprise input dataframe index."
        self._latest_training_date = X.index.max()
        self.pipeline.fit(X)
        return self

    def transform(self, X):
        """
        Transform the data through the pipeline.
        """
        return self.pipeline.transform(X)

    def forecast(self, X):
        """
        Make forecasts over the prediction frame, X.
        X can contain in-sample and out-of-sample data.
        For out-of-sample data, the 1-step-ahead model is recursively applied.
        Returns forecasts for the target in a pd.Series object with the same time index as X.
        np.nan values will be returned for dates where a forecast could not be found.
        """
        assert list(X.index.names) == [self.time_column_name], \
            "Expected time column to comprise input dataframe index."
        # Get in-sample forecasts if requested
        X_insamp = X[X.index <= self._latest_training_date]
        forecasts_insamp = pd.Series()
        if len(X_insamp) > 0:
            forecasts_insamp = self.pipeline.predict(X_insamp)

        # Get out-of-sample forecasts
        X_fcst = X[X.index > self._latest_training_date]
        forecasts = pd.Series()
        if len(X_fcst) > 0:
            # Need to iterate/recurse 1-step forecasts here
            forecasts = self._recursive_forecast(X_fcst)
        forecasts = pd.concat((forecasts_insamp, forecasts))

        return forecasts.reindex(X.index)

StatementMeta(spark31, 4, 1, Finished, Available)

## Load data 

https://azure.microsoft.com/en-us/services/open-datasets/catalog/sample-oj-sales-simulated

In [1]:
data =spark.read.format("csv").option("header", True).load("wasbs://ojsales-simulatedcontainer@azureopendatastorage.blob.core.windows.net/oj_sales_data/Store10*.csv")

StatementMeta(spark31, 1, 1, Finished, Available)

In [2]:
#Write to local delta for fast reading
data.write.format("delta").mode("overwrite").saveAsTable("OJ_Sales_Data")

StatementMeta(spark31, 4, 2, Finished, Available)

NameError: name 'data' is not defined

In [4]:
%%sql 
select * from OJ_Sales_Data limit 10

StatementMeta(spark31, 1, 4, Finished, Available)

<Spark SQL result set with 10 rows and 7 fields>

In [10]:
%sql select count (distinct store, brand) from OJ_Sales_Data 

"count(DISTINCT store, brand)"
300


In [11]:
%sql select distinct brand from OJ_Sales_Data 

brand
dominicks
tropicana
minute.maid


## Warm-up exersize

1. Read about Pandas Function APIs: https://docs.microsoft.com/en-us/azure/databricks/spark/latest/spark-sql/pandas-function-apis
2. Answer following questions:
  - What is the advantage of this technology vs. regular Python UDF?
  - What is the role of Apache Arrow in this?
  - What is the use of iterator and yield vs. regular list and return?

Using the OJ sales dataset above, use Pandas Function APIs, pick out for each store and brand the best selling week in the form of week_number-yyyy.
The result set look like this:

In [5]:
import pandas as pd
result_sample= pd.DataFrame({"store": [1066, 1067, 1068],'Brand':['dominicks', 'tropicana','tropicana'],"Best_Selling_Week": ['23-1992', '24-1991','24-1991']})
display(result_sample)

StatementMeta(spark31, 1, 5, Finished, Available)

SynapseWidget(Synapse.DataFrame, a62d323b-99f4-4ba8-b6b8-140fa393b8e0)

In [6]:
#Solution
#The Pandas function
import pandas as pd
def best_selling_week(inputdf):
  store =inputdf['Store'][0]
  brand = inputdf['Brand'][0]
  best_week_row = inputdf.iloc[inputdf['Quantity'].argmax()]
  best_week =str(best_week_row['WeekStarting'].isocalendar()[1]) +"-"+ str(best_week_row['WeekStarting'].isocalendar()[0])
  qty = best_week_row['Quantity']

  return pd.DataFrame({"Store":[store], "Brand":[brand], "Best_Selling_Week":best_week, "Qty":[qty]})
  

df = spark.sql("select to_timestamp(WeekStarting) WeekStarting, float(Quantity), Brand,Revenue, Store from OJ_Sales_Data")
df = df.repartition(200) #to increase parallelism
#Use the pandas function in group by

result = df.groupby(["Brand","Store"]).applyInPandas(best_selling_week, schema="Store string, Brand string, Best_Selling_Week string, Qty float")
display(result.head(10))

StatementMeta(spark31, 1, 6, Finished, Available)

SynapseWidget(Synapse.DataFrame, f917d030-68aa-4170-b080-c81bcc3c7c82)

## 2. Useful knowledge

### The Map function

You perform map operations with pandas instances by DataFrame.mapInPandas() in order to transform an iterator of pandas.DataFrame to another iterator of pandas.DataFrame that represents the current PySpark DataFrame and returns the result as a PySpark DataFrame.

The underlying function takes and outputs an iterator of pandas.DataFrame. It can return the output of arbitrary length in contrast to some pandas UDFs such as Series to Series pandas UDF.

In [2]:
spark.conf.set(' spark.sql.execution.arrow.maxRecordsPerBatch', 100)
#Default is 10000 which in some cases may defeat the purpose of parallelism

StatementMeta(spark31, 2, 2, Finished, Available)

In [3]:
def parallel_transform(df_iterator):
  for df in df_iterator:
    df['Week'] = df['WeekStarting'].map(lambda x: str(x.isocalendar()[1]) +"-"+ str(x.isocalendar()[0]))
    df.drop("WeekStarting", inplace=True, axis=1)
    yield df
df = spark.sql("select to_timestamp(WeekStarting) WeekStarting, float(Quantity), Brand,Revenue, Store from OJ_Sales_Data")

result = df.mapInPandas(parallel_transform, schema="Store string, Brand string, Week string, Quantity float, Revenue string")

display(result.head(10))

StatementMeta(spark31, 2, 3, Finished, Available)

SynapseWidget(Synapse.DataFrame, 21fbd9d0-3984-4d57-92cf-93cc1dbeb7ea)

### Many Model Training

In [4]:
#prepare values to broadcast
tenant_id ='' 
service_principal_id=''
service_principal_password=''
subscription_id = ''
# Azure Machine Learning resource group NOT the managed resource group
resource_group = '' 

#Azure Machine Learning workspace name, NOT Azure Databricks workspace
workspace_name = ''  

StatementMeta(spark31, 2, 4, Finished, Available)

### Test with a single store & brand combination (single time series)

In [27]:
%run ./timeseries_utilities


In [7]:
#Getting data
import pandas as pd
train_data_df = spark.sql("select to_timestamp(WeekStarting) WeekStarting, float(Quantity), Brand,Revenue, Store from OJ_Sales_Data where Store = '1066' and Brand ='tropicana'").toPandas()


StatementMeta(spark31, 2, 7, Finished, Available)

In [10]:
display(train_data_df.head(10))

StatementMeta(spark31, 1, 10, Finished, Available)

SynapseWidget(Synapse.DataFrame, c660445e-5186-42a7-ab1e-a42e7cfaca4d)

In [8]:

#Getting data for one table to test the utility function
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, mean_absolute_error
import joblib
import os
target_column= 'Quantity'
timestamp_column= 'WeekStarting'
timeseries_id_columns= [ 'Store', 'Brand']
drop_columns=['Revenue', 'Store', 'Brand']
model_type= 'lr'
model_name=train_data_df['Store'][0]+"_"+train_data_df['Brand'][0]
test_size=20
# 1.0 Read the data from CSV - parse timestamps as datetime type and put the time in the index
data = train_data_df \
        .set_index('WeekStarting') \
        .sort_index(ascending=True)

# 2.0 Split the data into train and test sets
train = data[:-test_size]
test = data[-test_size:]

# 3.0 Create and fit the forecasting pipeline
# The pipeline will drop unhelpful features, make a calendar feature, and make lag features
lagger = SimpleLagger(target_column, lag_orders=[1, 2, 3, 4])
transform_steps = [('column_dropper', ColumnDropper(drop_columns)),
                   ('calendar_featurizer', SimpleCalendarFeaturizer()), ('lagger', lagger)]
forecaster = SimpleForecaster(transform_steps, LinearRegression(), target_column, timestamp_column)
forecaster.fit(train)
print('Featurized data example:')
display(forecaster.transform(train).head())


StatementMeta(spark31, 2, 8, Finished, Available)

Featurized data example:

  Unsupported type in conversion from Arrow: uint32
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.

SynapseWidget(Synapse.DataFrame, 7b9064d4-0adb-4162-a978-f60bec951379)

In [13]:
from azureml.core.authentication import ServicePrincipalAuthentication
from azureml.core import Workspace
from azureml.core import Model

import cloudpickle 
import joblib
sp_auth = ServicePrincipalAuthentication(tenant_id =tenant_id,
                                         service_principal_id=service_principal_id,
                                         service_principal_password=service_principal_password)
# Instantiate Azure Machine Learning workspace
ws = Workspace.get(name=workspace_name,
                   subscription_id=subscription_id,
                   resource_group=resource_group,auth= sp_auth)


# 4.0 Get predictions on test set
forecasts = forecaster.forecast(test)
compare_data = test.assign(forecasts=forecasts).dropna()

# 5.0 Calculate accuracy metrics for the fit
mse = mean_squared_error(compare_data[target_column], compare_data['forecasts'])
rmse = np.sqrt(mse)
mae = mean_absolute_error(compare_data[target_column], compare_data['forecasts'])
actuals = compare_data[target_column].values
preds = compare_data['forecasts'].values
mape = np.mean(np.abs((actuals - preds) / actuals) * 100)

# 7.0 Train model with full dataset
forecaster.fit(data)

# 8.0 Save the forecasting pipeline
with open(model_name, mode='wb') as file:
   joblib.dump(forecaster, file)

model = Model.register(workspace=ws, model_name=model_name, model_path=model_name, tags={'mse':str(mse), 'mape': str(mape), 'rmse': str(rmse)})



StatementMeta(spark31, 2, 13, Finished, Available)

Registering model 1066_tropicana

####Scale it up with many model training with function Pandas API

In [None]:
#Prepare the core training function

from azureml.core.authentication import ServicePrincipalAuthentication
from azureml.core import Workspace
from azureml.core import Model
import cloudpickle
#do not use joblib to dump because it will have issue with multi-level object
def many_model_train(train_data_df):
  sp_auth = ServicePrincipalAuthentication(tenant_id =tenant_id,
                                         service_principal_id=service_principal_id,
                                         service_principal_password=service_principal_password)
  # Instantiate Azure Machine Learning workspace
  ws = Workspace.get(name=workspace_name,
                     subscription_id=subscription_id,
                     resource_group=resource_group,auth= sp_auth)


  target_column= 'Quantity'
  timestamp_column= 'WeekStarting'
  timeseries_id_columns= [ 'Store', 'Brand']
  drop_columns=['Revenue', 'Store', 'Brand']
  model_type= 'lr'
  #Get the store and brand. They are unique from the group so just the first value is sufficient
  store = train_data_df['Store'][0]
  brand = train_data_df['Brand'][0]

  model_name=store+"_"+brand
  test_size=20
  # 1.0 Format the input data from group by, put the time in the index
  data = train_data_df \
          .set_index('WeekStarting') \
          .sort_index(ascending=True)

  # 2.0 Split the data into train and test sets
  train = data[:-test_size]
  test = data[-test_size:]

  # 3.0 Create and fit the forecasting pipeline
  # The pipeline will drop unhelpful features, make a calendar feature, and make lag features
  lagger = SimpleLagger(target_column, lag_orders=[1, 2, 3, 4])
  transform_steps = [('column_dropper', ColumnDropper(drop_columns)),
                     ('calendar_featurizer', SimpleCalendarFeaturizer()), ('lagger', lagger)]
  forecaster = SimpleForecaster(transform_steps, LinearRegression(), target_column, timestamp_column)
  forecaster.fit(train)

  # 4.0 Get predictions on test set
  forecasts = forecaster.forecast(test)
  compare_data = test.assign(forecasts=forecasts).dropna()

  # 5.0 Calculate accuracy metrics for the fit
  mse = mean_squared_error(compare_data[target_column], compare_data['forecasts'])
  rmse = np.sqrt(mse)
  mae = mean_absolute_error(compare_data[target_column], compare_data['forecasts'])
  actuals = compare_data[target_column].values
  preds = compare_data['forecasts'].values
  mape = np.mean(np.abs((actuals - preds) / actuals) * 100)

  # 7.0 Train model with full dataset
  forecaster.fit(data)

  # 8.0 Save the pipeline and register model to AML
  with open(model_name, mode='wb') as file:
     cloudpickle.dump(forecaster, file)#   
  model = Model.register(workspace=ws, model_name=model_name, model_path=model_name, tags={'mse':str(mse), 'mape': str(mape), 'rmse': str(rmse)})
  
  return pd.DataFrame({'Store':store,'Brand':brand, 'mse':[mse], 'mape': [mape], 'rmse': [rmse], 'model_name':[model_name]})


StatementMeta(, , , Cancelled, )

In [None]:
df = spark.sql("select to_timestamp(WeekStarting) WeekStarting, float(Quantity), Brand,Revenue, Store from OJ_Sales_Data")
df = df.repartition(200) #to increase parallelism
result = df.groupby(["Brand","Store"]).applyInPandas(many_model_train, schema="Store string, Brand string, mse float, mape float, rmse float, model_name string ")


StatementMeta(, , , Cancelled, )

### Many Model Inferencing: Can you score using multiple models in parallel?

#### Exersize : please prepare a function pandas UDF to produce forecast for mutliple store and brand given the test data

### Solution

#### Quick test the forecast function in utils with just one time series

In [18]:
#Test forecast for one time series, need to run command 27-30 first
ts_id_dict = {id_col: str(data[id_col].iloc[0]) for id_col in timeseries_id_columns}
forecasts=forecaster.forecast(data)
prediction_df = forecasts.to_frame(name='Prediction')
prediction_df =prediction_df.reset_index().assign(**ts_id_dict)
display(prediction_df.head(10))

StatementMeta(spark31, 2, 18, Finished, Available)

SynapseWidget(Synapse.DataFrame, 5f0fbd4c-1d80-4598-858a-1f7dbd58e536)



#### Main solution using map in pandas & loading models from AML workspace

In [19]:
#Prepare the core forecast function in pandas function API  

from azureml.core.authentication import ServicePrincipalAuthentication
from azureml.core import Workspace
from azureml.core import Model
import cloudpickle
#do not use joblib to dump because it will have issue with multi-level object
def many_model_forecast(input_data_df):
  sp_auth = ServicePrincipalAuthentication(tenant_id =tenant_id,
                                         service_principal_id=service_principal_id,
                                         service_principal_password=service_principal_password)
  # Instantiate Azure Machine Learning workspace
  ws = Workspace.get(name=workspace_name,
                     subscription_id=subscription_id,
                     resource_group=resource_group,auth= sp_auth)


  target_column= 'Quantity'
  timestamp_column= 'WeekStarting'
  timeseries_id_columns= [ 'Store', 'Brand']
  drop_columns=['Revenue', 'Store', 'Brand']
  data = input_data_df \
        .set_index(timestamp_column) \
        .sort_index(ascending=True)
  #Prepare loading model from Azure ML, get the latest model by default
  model_name=data['Store'][0]+"_"+data['Brand'][0]
  model = Model(ws, model_name)
  model.download(exist_ok =True)
  with open(model_name, 'rb') as f:
    forecaster = cloudpickle.load(f)

#   Get predictions 
  #This is to append the store and brand column to the result
  ts_id_dict = {id_col: str(data[id_col].iloc[0]) for id_col in timeseries_id_columns}
  forecasts=forecaster.forecast(data)
  prediction_df = forecasts.to_frame(name='Prediction')
  prediction_df =prediction_df.reset_index().assign(**ts_id_dict)
  
  return prediction_df


StatementMeta(spark31, 2, 19, Finished, Available)

In [20]:
#Load data to score, for now, it's same train data but in reality, it should be different.
df = spark.sql("select to_timestamp(WeekStarting) WeekStarting, float(Quantity), Brand,Revenue, Store from OJ_Sales_Data")
df = df.repartition(200) #to increase parallelism
prediction_result = df.groupby(["Brand","Store"]).applyInPandas(many_model_forecast, schema="WeekStarting date, Store string, Brand string, Prediction float")

StatementMeta(spark31, 2, 20, Finished, Available)

In [21]:
display(prediction_result.head(10))

StatementMeta(spark31, 2, 21, Finished, Available)

SynapseWidget(Synapse.DataFrame, c9c325ea-4958-46a3-b170-a16000a7416f)

##### Small ask: can you add a actual qty column to the result if the data to score has it?