# Model training and registration
This notebook shows how to train the model, convert it to ONNX, and upload the ONNX model to Azure Storage.

## Explore the training data
The following cells load the source CSV file into a Spark DataFrame and create a temporary view that can be used to query the data with Spark SQL.


In [35]:
df = spark.read.load(
    'abfss://dev@<primary_storage>.dfs.core.windows.net/bronze/wwi-factsale.csv',
    format="csv",
    # The source file is pipe-delimited and has a header row; remove header=True if yours has none
    header=True,
    sep="|",
)


In [36]:
df.createOrReplaceTempView("facts")


In [37]:
display(spark.sql("SELECT * FROM facts WHERE `Customer Key` == '11' ORDER BY `Stock Item Key`"))


## Predict Quantity given Customer Key and Stock Item Key
In the following cell, we load a subset of the data that contains only the fields needed for training, cast each field to double, and drop rows with no quantity.



In [38]:
from pyspark.sql.functions import col
df3 = spark.sql("SELECT double(`Customer Key`) as customerkey, double(`Stock Item Key`) as stockitemkey, double(`Quantity`) as quantity FROM facts").where(col("quantity").isNotNull())
df3.cache()
print("Number of records:", df3.count())
df3.show(10)

Number of records: 12228005
+-----------+------------+--------+
|customerkey|stockitemkey|quantity|
+-----------+------------+--------+
|        0.0|       156.0|     4.0|
|        0.0|        52.0|    90.0|
|        0.0|        54.0|    10.0|
|        0.0|       141.0|    72.0|
|        0.0|       185.0|     7.0|
|        0.0|       148.0|     6.0|
|      141.0|        51.0|    80.0|
|      141.0|       216.0|     5.0|
|      141.0|        94.0|    10.0|
|      141.0|       206.0|     5.0|
+-----------+------------+--------+
only showing top 10 rows

Now, we split our DataFrame into training and testing DataFrames.


In [39]:
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 42

# Split the dataframe into test and training dataframes
df_train, df_test = df3.randomSplit([trainingFraction, testingFraction], seed=seed)
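
To make the split concrete, here is a minimal pure-Python sketch of the idea behind a weighted random split: the weights are normalized, each row receives one uniform random draw, and the draw decides which bucket the row lands in. It is an illustration only, not Spark's implementation; `weighted_split` and the sample rows are hypothetical. Normalizing the weights is also why the slightly inexact result of `1 - 0.7` is harmless.

```python
import random
from itertools import accumulate

def weighted_split(rows, weights, seed):
    """Assign each row to one bucket with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds in [0, 1], e.g. roughly [0.7, 1.0] for weights [0.7, 0.3]
    bounds = [w / total for w in accumulate(weights)]
    bounds[-1] = 1.0  # guard against floating-point drift in the last bound
    rng = random.Random(seed)
    buckets = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # one uniform draw in [0, 1) per row
        for i, upper in enumerate(bounds):
            if u < upper:
                buckets[i].append(row)
                break
    return buckets

rows = list(range(10_000))  # hypothetical stand-in for DataFrame rows
train, test = weighted_split(rows, [0.7, 1 - 0.7], seed=42)
print(len(train), len(test))
```

With a fixed seed the assignment is repeatable, and the bucket sizes are only approximately 70% and 30% because each row is drawn independently.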


Next, we package the data into the format expected by Spark ML's LinearRegression. It requires a DataFrame with two columns: a `features` column holding the input vector, and a column with the labels to predict (`quantity` in this case).


In [40]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = ['customerkey', 'stockitemkey'], outputCol = 'features')
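
Conceptually, the assembler concatenates the listed input columns, in order, into one vector per row. The plain-Python sketch below mimics that on two rows taken from the sample output shown earlier; `assemble` is a hypothetical helper, and real Spark produces a vector column rather than a Python list.

```python
def assemble(row, input_cols, output_col="features"):
    """Return a copy of `row` with the input columns packed into one list."""
    assembled = dict(row)
    assembled[output_col] = [row[c] for c in input_cols]
    return assembled

rows = [
    {"customerkey": 0.0, "stockitemkey": 156.0, "quantity": 4.0},
    {"customerkey": 141.0, "stockitemkey": 51.0, "quantity": 80.0},
]
assembled_rows = [assemble(r, ["customerkey", "stockitemkey"]) for r in rows]
for r in assembled_rows:
    print(r["features"], r["quantity"])
```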


In the following cell, we create the LinearRegression model.

In [41]:
from pyspark.ml.regression import LinearRegression

lin_reg = LinearRegression(featuresCol = 'features', labelCol='quantity', maxIter = 10, regParam=0.3)
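
For reference, with the default squared-error loss the estimator minimizes an elastic-net regularized objective of roughly the following form, as summarized in the Spark MLlib documentation. Here $\lambda$ is `regParam` (0.3 in the cell above) and $\alpha$ is `elasticNetParam`, which is not set above and so keeps its default of 0, leaving a pure L2 (ridge) penalty. Treat this as a summary rather than a full specification; for example, features are standardized before fitting by default, which affects how the penalty acts.

$$
\min_{w,\,b}\ \frac{1}{2n}\sum_{i=1}^{n}\left(w^{\top}x_i + b - y_i\right)^2 + \lambda\left(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right)
$$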


Next, we build the training pipeline.

In [42]:
from pyspark.ml import Pipeline

# Assemble the feature vector first, then fit the regression on it
partialPipeline = Pipeline(stages=[vectorAssembler, lin_reg])


In the following cell, we train our LinearRegression model.


In [43]:
pipelineModel = partialPipeline.fit(df_train)

print("Coefficients: " + str(pipelineModel.stages[-1].coefficients))
print("Intercept: " + str(pipelineModel.stages[-1].intercept))

Coefficients: [0.002176554432299778,-0.36032287277976727]
Intercept: 80.74728569909968
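
The printed coefficients and intercept fully determine the model: a prediction is the intercept plus the dot product of the coefficients with the feature vector. The sketch below reproduces that arithmetic in plain Python for one row of the sample output shown earlier (`customerkey` 141, `stockitemkey` 51); `predict` is a hypothetical helper, not part of Spark.

```python
coefficients = [0.002176554432299778, -0.36032287277976727]  # from the output above
intercept = 80.74728569909968

def predict(features):
    """Linear model: intercept + sum(coefficient * feature)."""
    return intercept + sum(c * x for c, x in zip(coefficients, features))

prediction = predict([141.0, 51.0])
print(round(prediction, 2))  # about 62.68, versus an actual quantity of 80.0
```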

With a trained model in hand, we can use it to make predictions against the test DataFrame.


In [44]:
from pyspark.sql.functions import rand

df_pred = pipelineModel.transform(df_test)

# Show five randomly chosen rows to compare actual and predicted quantities
display(df_pred.select(["quantity", "prediction"]).orderBy(rand()).limit(5))


## Convert model to ONNX
In the cells that follow, we convert the model to ONNX and display how ONNX represents the Spark ML model.


In [53]:
from onnxmltools import convert_sparkml
from onnxmltools.convert.common.data_types import FloatTensorType

initial_types = [ 
    ("customerkey", FloatTensorType([1, 1])),
    ("stockitemkey", FloatTensorType([1, 1]))
]


In [54]:
model_onnx = convert_sparkml(pipelineModel, 'sparkml GeneralizedLinearRegression', initial_types)
model_onnx

ir_version: 6
producer_name: "OnnxMLTools"
producer_version: "1.6.0"
domain: "onnxconverter-common"
model_version: 0
doc_string: ""
graph {
  node {
    input: "customerkey"
    input: "stockitemkey"
    output: "features"
    name: "Concat"
    op_type: "Concat"
    attribute {
      name: "axis"
      i: 1
      type: INT
    }
    domain: ""
  }
  node {
    input: "features"
    output: "prediction"
    name: "LinearRegressor"
    op_type: "LinearRegressor"
    attribute {
      name: "coefficients"
      floats: 0.0021765544079244137
      floats: -0.36032286286354065
      type: FLOATS
    }
    attribute {
      name: "intercepts"
      floats: 80.74728393554688
      type: FLOATS
    }
    domain: "ai.onnx.ml"
  }
  name: "sparkml GeneralizedLinearRegression"
  input {
    name: "customerkey"
    type {
      tensor_type {
        elem_type: 1
        shape {
          dim {
            dim_value: 1
          }
          dim {
            dim_value: 1
          }
        }
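
Note that the coefficients and intercept in the ONNX graph differ slightly from the values Spark printed after training (for example, 80.74728393554688 versus 80.74728569909968). That is consistent with the double-precision values being stored as 32-bit floats in the ONNX model. The sketch below reproduces the rounding with the standard library; `to_float32` is a hypothetical helper.

```python
import struct

def to_float32(value):
    """Round a Python float (64-bit) to the nearest 32-bit float."""
    return struct.unpack("<f", struct.pack("<f", value))[0]

spark_values = {
    "customerkey coefficient": 0.002176554432299778,
    "stockitemkey coefficient": -0.36032287277976727,
    "intercept": 80.74728569909968,
}
for name, value in spark_values.items():
    print(f"{name}: {value!r} -> {to_float32(value)!r}")
```

Differences of this size are far smaller than the model's prediction error, so they should not matter for scoring.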
    

## Upload the model to Azure Storage
In the cells that follow, we temporarily save the ONNX model to the local storage of the Spark driver node. Then we use the Azure Storage Python SDK to upload it to Azure Storage.


In [47]:
with open("model.onnx", "wb") as f:
    f.write(model_onnx.SerializeToString())

394

In [48]:
connection_string = "DefaultEndpointsProtocol=https;AccountName=<blob_storage>;AccountKey=<blob_storage_account_key>;EndpointSuffix=core.windows.net"

from azure.storage.blob import BlobClient

blob = BlobClient.from_connection_string(conn_str=connection_string, container_name="models", blob_name="onnx/model.onnx")

with open("./model.onnx", "rb") as data:
    blob.upload_blob(data, overwrite=True)

{'etag': '"0x8D91C875F3A7E85"', 'last_modified': datetime.datetime(2021, 5, 21, 18, 36, 39, tzinfo=datetime.timezone.utc), 'content_md5': bytearray(b'\x07\xa9\x844M\x80\xa6\x0e\xf1^\xca\xcd<\x87\xa2\xfd'), 'client_request_id': '7afcec5e-ba63-11eb-98a7-000d3ac62d85', 'request_id': '62cfeccf-001e-0033-7a70-4e858f000000', 'version': '2019-12-12', 'version_id': None, 'date': datetime.datetime(2021, 5, 21, 18, 36, 38, tzinfo=datetime.timezone.utc), 'request_server_encrypted': True, 'encryption_key_sha256': None, 'encryption_scope': None, 'error_code': None}
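
The response above includes a `content_md5` field. One way to sanity-check an upload is to compare that value with an MD5 digest of the local file. The sketch below shows the local half using only the standard library; `file_md5` and `response` are hypothetical names, and the comparison assumes the service reports the MD5 of the uploaded bytes, which may not hold for every upload path (for example, large uploads split into blocks).

```python
import hashlib
import os
import tempfile

def file_md5(path, chunk_size=1 << 20):
    """Return the raw 16-byte MD5 digest of a file, read in chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.digest()

# Demonstrate on a throwaway file; in the notebook the path would be "model.onnx"
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"example bytes standing in for a serialized model")
    demo_path = tmp.name

local_md5 = file_md5(demo_path)
os.remove(demo_path)
print(local_md5.hex())
# Assumed comparison, with `response` holding the value returned by upload_blob:
#   bytes(response["content_md5"]) == file_md5("model.onnx")
```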