# Train a classifier to determine product seasonality


First, verify that XGBoost is properly installed in the Spark environment (the expected version is 1.0.2).


In [None]:
import pip


def list_installed_packages():
    """Return a sorted list of 'name==version' strings for installed distributions.

    `pip.get_installed_distributions()` was removed from pip's public surface
    in pip 10, so calling it unconditionally raises AttributeError on modern
    environments. The legacy helper is still used when it exists; otherwise
    the standard library (`importlib.metadata`, Python 3.8+) or setuptools'
    `pkg_resources` is used instead.
    """
    legacy_lister = getattr(pip, "get_installed_distributions", None)
    if legacy_lister is not None:
        # pip < 10 still ships the helper.
        return sorted(f"{dist.project_name}=={dist.version}" for dist in legacy_lister())

    try:
        from importlib import metadata  # stdlib, Python 3.8+
    except ImportError:
        import pkg_resources  # setuptools fallback for older interpreters
        return sorted(
            f"{dist.project_name}=={dist.version}" for dist in pkg_resources.working_set
        )

    return sorted(
        f"{dist.metadata['Name']}=={dist.version}" for dist in metadata.distributions()
    )


# Last expression of the cell: shows the package list (look for xgboost).
list_installed_packages()

Import all necessary libraries.


In [None]:
from sklearn.preprocessing import StandardScaler, MinMaxScaler, Normalizer
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.externals import joblib

from xgboost import XGBClassifier

from onnxmltools.convert import convert_xgboost
from onnxmltools.convert.common.data_types import FloatTensorType

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from azureml.core.experiment import Experiment
from azureml.core.workspace import Workspace
from azureml.train.automl.run import AutoMLRun
from azureml.train.automl import AutoMLConfig
from azureml.automl.runtime.onnx_convert import OnnxConverter
from azureml.core.model import Model
from azureml.core import Environment
from azureml.core.model import InferenceConfig
from azureml.core.webservice import AciWebservice
from azureml.core.webservice import Webservice

## Exploratory data analysis (basic stats)

Create Spark temporary views for sales and products.

In [None]:
%%spark
// Read the SaleSmall table from the SQL pool and expose it to Spark SQL as "sale".
val saleDf = spark.read.sqlanalytics("SQLPool01.wwi_mcw.SaleSmall")
saleDf.createOrReplaceTempView("sale")

// Read the Product table, expose it as "product", and render it in the cell output.
val productDf = spark.read.sqlanalytics("SQLPool01.wwi_mcw.Product")
productDf.createOrReplaceTempView("product")
display(productDf)

Load daily product sales from the SQL pool.


In [None]:
# Spark SQL over the "sale" and "product" temp views: for every
# (ProductId, Seasonality, TransactionDateId) combination, count the joined
# sale rows and expose the count as TransactionItemsCount.
sqlQuery = """
SELECT
    P.ProductId
    ,P.Seasonality
    ,S.TransactionDateId
    ,COUNT(*) as TransactionItemsCount
FROM
    sale S
    JOIN product P ON
        S.ProductId = P.ProductId
GROUP BY
    P.ProductId
    ,P.Seasonality
    ,S.TransactionDateId
"""

# Run the query and ask Spark to cache the result, since several later cells
# (count, describe, pivot) read prod_df again.
prod_df = spark.sql(sqlQuery)
prod_df.cache()

Check the number of records in the data frame (should be around 2.6 million rows).

In [None]:
# Number of rows in the aggregated product/date frame.
prod_df.count()

Display some statistics about the data frame.


In [None]:
# Render the summary statistics that describe() computes for prod_df.
display(prod_df.describe())

Pivot the data frame so that each transaction date becomes a column holding the number of items sold on that day.


In [None]:
# One row per (ProductId, Seasonality); one column per TransactionDateId whose
# value is the summed TransactionItemsCount. The result is pulled to pandas.
prod_prep_df = (
    prod_df
    .groupBy('ProductId', 'Seasonality')
    .pivot('TransactionDateId')
    .sum('TransactionItemsCount')
    .toPandas()
)

Clean up the nulls and take a look at the result.


In [None]:
# Replace missing values with 0 — presumably product/date combinations that
# had no sales and therefore no cell in the pivot (TODO confirm).
prod_prep_df = prod_prep_df.fillna(0)
# Preview the first 10 rows.
prod_prep_df.head(10)

Isolate features and prediction classes.

Standardize features by removing the mean and scaling to unit variance.


In [None]:
# Every column after the first two (ProductId, Seasonality) is a feature.
feature_frame = prod_prep_df.iloc[:, 2:]
X = feature_frame.values

# The class label to predict.
y = prod_prep_df.loc[:, 'Seasonality'].values

# Zero-mean / unit-variance scaling of each feature column.
feature_scaler = StandardScaler()
X_scale = feature_scaler.fit_transform(X)

## Use PCA for dimensionality reduction

Perform dimensionality reduction using Principal Components Analysis and two target components.


In [None]:
# Project the scaled features onto two principal components, then rescale
# each component with MinMaxScaler (default range) for plotting.
pca = PCA(n_components=2)
principal_components = MinMaxScaler().fit_transform(pca.fit_transform(X_scale))

# Put the two components next to the Seasonality label of each product.
pca_df = pd.concat(
    [
        pd.DataFrame(data=principal_components, columns=['pc1', 'pc2']),
        prod_prep_df[['Seasonality']],
    ],
    axis=1,
)

Display the products data frame in two dimensions (mapped to the two principal components).

Note the clear separation of clusters.


In [None]:
# Scatter the products in principal-component space, one colour per
# Seasonality value.
fig, ax = plt.subplots(figsize=(6, 6))
ax.set_xlabel('Principal Component 1', fontsize=15)
ax.set_ylabel('Principal Component 2', fontsize=15)
ax.set_title('2 component PCA', fontsize=20)

targets = [1, 2, 3]
colors = ['r', 'g', 'b']
for target, color in zip(targets, colors):
    season_points = pca_df[pca_df['Seasonality'] == target]
    ax.scatter(season_points['pc1'], season_points['pc2'], c=color, s=1)

# Legend entries follow the order in which the scatter series were drawn.
ax.legend(['All Season Products', 'Summer Products', 'Winter Products'])

# Two dotted guide lines drawn between the visible clusters.
for guide_y in ([0.77, 1.0], [0.37, 0.6]):
    ax.plot([-0.05, 1.05], guide_y, linestyle=':', linewidth=1, color='y')
ax.grid()

plt.show()
plt.close()

Redo the Principal Components Analysis, this time with twenty dimensions.


In [None]:
def col_name(x):
    """Build a feature column name: 'f' followed by x zero-padded to two digits."""
    return 'f' + format(x, '02')

# Number of principal components kept as model features, and the number of
# leading rows used for training.
N_COMPONENTS = 20
TRAINING_ROW_LIMIT = 4500

# Same PCA + MinMax rescaling as before, now with N_COMPONENTS components.
pca = PCA(n_components=N_COMPONENTS)
principal_components = MinMaxScaler().fit_transform(pca.fit_transform(X_scale))

# Feature columns are named f00 .. f19.
feature_names = [col_name(component) for component in range(N_COMPONENTS)]
X = pd.DataFrame(data=principal_components, columns=feature_names)

# pca_df: features + ProductId (saved to the SQL pool later).
# pca_automl_df: features + Seasonality label (training data for AutoML).
pca_df = pd.concat([X, prod_prep_df[['ProductId']]], axis=1)
pca_automl_df = pd.concat([X, prod_prep_df[['Seasonality']]], axis=1)

# Keep only the first TRAINING_ROW_LIMIT rows for model training.
X = X.iloc[:TRAINING_ROW_LIMIT]
y = prod_prep_df['Seasonality'].iloc[:TRAINING_ROW_LIMIT]
pca_automl_df = pca_automl_df.iloc[:TRAINING_ROW_LIMIT]

Save the PCA components to the SQL pool (you may ignore any warnings).


In [None]:
# Convert the pandas frame pca_df into a Spark DataFrame and register it as
# the temp view "productpca" so the Scala cell below can query it.
pca_sdf = spark.createDataFrame(pca_df)
pca_sdf.createOrReplaceTempView("productpca")

In [None]:
%%spark
// Read every row of the "productpca" temp view and write it to the
// SQLPool01.wwi_mcw.ProductPCA table in the SQL pool.
val productPcaDf = spark.sqlContext.sql("select * from productpca")
productPcaDf.write.sqlanalytics("SQLPool01.wwi_mcw.ProductPCA", Constants.INTERNAL)

## Train ensemble of trees classifier (using XGBoost)

Split into test and training data sets.


In [None]:
# Hold out 25% of the rows for testing; the fixed random_state makes the
# split repeatable across runs.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=123)

Train the ensemble classifier using XGBoost.


In [None]:
# Fit an XGBoost classifier, constructed with no explicit hyperparameters,
# on the training split.
model = XGBClassifier()
model.fit(X_train, y_train)

Perform predictions with the newly trained model.


In [None]:
# Predict the Seasonality class for each held-out test row.
y_pred = model.predict(X_test)

Calculate the accuracy of the model using test data.


In [None]:
# Fraction of test rows whose predicted class matches the true class.
accuracy = accuracy_score(y_test, y_pred)

# Report the accuracy as a percentage with two decimals.
accuracy_percent = accuracy * 100.0
print("Accuracy: %.2f%%" % accuracy_percent)

Convert trained model to ONNX format.


In [None]:
# Declare the ONNX graph input: a float tensor named 'input' with shape
# [1, 20], matching the 20 PCA feature columns.
input_tensor_type = FloatTensorType([1, 20])
initial_types = [('input', input_tensor_type)]

# Convert the trained XGBoost classifier to an ONNX model.
onnx_model = convert_xgboost(model, initial_types=initial_types)

## Train classifier using Auto ML


Configure the connection to the Azure Machine Learning workspace. The Azure portal provides all the values below.


In [None]:
# Placeholders: replace each '#...#' token with the values of your Azure
# subscription, resource group and Azure Machine Learning workspace.
subscription_id= '#SUBSCRIPTION_ID#'
resource_group= '#RESOURCE_GROUP_NAME#'
workspace_name= '#AML_WORKSPACE_NAME#'
# Connect to the workspace and save its connection settings via write_config().
ws = Workspace(subscription_id = subscription_id, resource_group = resource_group, workspace_name = workspace_name)
ws.write_config()

# Experiment under which the AutoML run below is submitted.
experiment = Experiment(ws, "ASAMCW_Product_Seasonality")

Configure the Automated Machine Learning experiment and start it (will run on local compute resources).


In [None]:
# AutoML settings: a classification task on the PCA features with
# 'Seasonality' as the label, 5-fold cross-validation, a 15-minute time
# budget, ONNX-compatible models only, and both ensemble types disabled.
automl_settings = {
    'task': 'classification',
    'experiment_timeout_minutes': 15,
    'enable_onnx_compatible_models': True,
    'training_data': pca_automl_df,
    'label_column_name': 'Seasonality',
    'n_cross_validations': 5,
    'enable_voting_ensemble': False,
    'enable_stack_ensemble': False,
}
automl_classifier_config = AutoMLConfig(**automl_settings)

# Submit the experiment and stream its progress into the cell output.
local_run = experiment.submit(automl_classifier_config, show_output=True)

Retrieve and persist the best model.


In [None]:
# Fetch the best run of the AutoML experiment together with its fitted model.
best_run, fitted_model = local_run.get_output()
# Serialize the fitted model to a local file with joblib; this file is what
# gets registered in the workspace later.
model_path = 'product_seasonality.pkl'
joblib.dump(fitted_model, model_path)

## Operationalize
Operationalization means getting the model into the cloud so that others can run it after you close the notebook. We will create a Docker container running on Azure Container Instances (ACI) to host our model.


### Register the model


In [None]:
# Register the locally saved model file in the workspace. model_path points
# to a local file; model_name is the name the model is registered as.
model_name = "ProductSeasonalityClassifier"
registration_args = dict(
    workspace=ws,
    model_path=model_path,
    model_name=model_name,
    tags={'type': "classification"},
    description="Product Seasonality Classifier",
)
registered_model = Model.register(**registration_args)

## Develop the scoring script
For deployment, we need a function that will exercise the model with a sampling of data. This has been created for us and is available as part of the model's output artifacts.


In [None]:
# File the scoring script is written to; the deployment cell passes this name
# as the entry script.
scoring_script_file_name = 'scoring_script.py'

# Source of the scoring script. init() loads the registered model by name;
# run() parses the JSON payload into a DataFrame and returns the predictions
# as a JSON list (or the error text if anything raises).
scoring_script = """
import json
import pickle
import numpy as np
import pandas as pd
import azureml.train.automl
from sklearn.externals import joblib
from azureml.core.model import Model

def init():
    global model
    # This name is model.id of model that we want to deploy deserialize the model file back
    # into a sklearn model
    model_path = Model.get_model_path(model_name = 'ProductSeasonalityClassifier')
    model = joblib.load(model_path)

def run(input_json):     
    try:
        data_df = pd.read_json(input_json)       
        # Get the predictions...
        prediction = model.predict(data_df)
        prediction = json.dumps(prediction.tolist())
    except Exception as e:
        prediction = str(e)
    return prediction
"""

# Execute the script in the notebook namespace so init() and run() can be
# called locally before deployment.
exec(scoring_script)

# Persist the same source to disk for the deployment step.
with open(scoring_script_file_name, "w") as script_file:
    script_file.write(scoring_script)

In [None]:
# Test the scoring functions locally before deploying: serialize the test
# features to JSON, load the registered model with init(), then score the
# payload with run().
json_test_data = X_test.to_json()
init()
run(json_test_data)

## Deploy the model as a Web Service on Azure Container Instances (ACI)


In [None]:
# obtain conda dependencies from the automl run and save the file locally
environment_config_file = 'conda_env.yml'
best_run.download_file('outputs/conda_env_v_1_0_0.yml', environment_config_file)
with open('conda_env.yml', 'r') as f:
    print(f.read())

# create the environment based on the saved conda dependencies file
myenv = Environment.from_conda_specification(name="myenv", file_path=environment_config_file)
myenv.register(workspace=ws)

In [None]:
# Inference configuration: the registered environment plus the scoring script
# as the entry point.
inference_config = InferenceConfig(
    environment=myenv,
    entry_script=scoring_script_file_name,
)

# Azure Container Instances sizing and metadata for the web service.
aci_config = AciWebservice.deploy_configuration(
    cpu_cores=1,
    memory_gb=2,
    tags={'type': 'automl-classification'},
    description='AutoML Product Seasonality Classifier Service',
)

# Deploy the registered model as a web service, wait for the deployment to
# finish while streaming its output, then print the final service state.
aci_service_name = 'automl-product-classifier-01'
aci_service = Model.deploy(
    ws,
    aci_service_name,
    [registered_model],
    inference_config,
    aci_config,
)
aci_service.wait_for_deployment(show_output=True)
print(aci_service.state)

## Call the Web Service


In [None]:
# Send the JSON-serialized test features to the deployed web service for scoring.
aci_service.run(json_test_data)