In [1]:
# IGNORE THIS CELL
import os

# Mount the dataset storage at `basepath` unless something is already mounted
# there, so this cell can be re-run safely.
basepath = "/mnt/azureblob"

already_mounted = any(mount.mountPoint == basepath for mount in dbutils.fs.mounts())
if not already_mounted:
  # The storage source and the extra config key/value pair all come from
  # environment variables, so nothing sensitive is written in the notebook.
  dbutils.fs.mount(
    source=os.environ['AML_WORKSPACE_DATASETS_SOURCE'],
    mount_point=basepath,
    extra_configs={
      os.environ['AML_WORKSPACE_DATASETS_SCOPE']: os.environ['AML_WORKSPACE_DATASETS_KEY'],
    },
  )

### Reading the training data

In this example, I'm using the "Gas sensor array under dynamic gas mixtures" dataset from Kaggle: https://www.kaggle.com/uciml/gas-sensor-array-under-dynamic-gas-mixtures

This data set contains the acquired time series from 16 chemical sensors exposed to gas mixtures at varying concentration levels. In particular, we generated two gas mixtures: Ethylene and Methane in air, and Ethylene and CO in air. Each measurement was constructed by the continuous acquisition of the 16-sensor array signals for a duration of about 12 hours without interruption. 
The data set was collected in a gas delivery platform facility at the ChemoSignals Laboratory in the BioCircuits Institute, University of California San Diego. The measurement system platform provides versatility for obtaining the desired concentrations of the chemical substances of interest with high accuracy and in a highly reproducible manner.

In [3]:
# In my case, the data is stored as parquet part files under the mounted storage;
# the glob below picks up every part file of the ethylene/methane dataset.
ethylene_methane_glob = basepath + "/gas-sensor-mixture/processed/ethylene_methane.parquet/*.parquet"
mixture_df = spark.read.parquet(ethylene_methane_glob)

Create some folders to store artifacts

In [5]:
import os, shutil

# Name reused for the model folder and for the archive created from it.
model_name = "gbt_methaneconc_ppm"

# Folder that will receive the saved pipeline: /FileStore/<model_name>.
model_dbfs = os.path.join("/FileStore", model_name)

# Create the folder (through the Databricks file system utilities) before saving into it.
dbutils.fs.mkdirs(model_dbfs)

#### Split the dataset into train and test sets
As usual, we need to split the dataset into a training set and a test set. In this case we keep 75% of the dataset for training and 25% for testing.

In [7]:
# Split 75% / 25% into training and test sets.
# A fixed seed keeps the split repeatable across notebook re-runs (for the same
# input data and partitioning), so the trained model and its R2 are comparable
# from one run to the next instead of changing with every execution.
train, test = mixture_df.randomSplit([0.75, 0.25], seed=42)

#### Build the model to predict methane concentration

Tree-based ensemble models are powerful estimators which tend to work, on average, pretty well. Random Forest Regression is a fairly simple ensemble model that uses a modified tree learning algorithm that selects, at each candidate split in the learning process, a random subset of the features. This process is sometimes called "feature bagging". Another tree-based ensemble model is Gradient Boosted Trees, which is fitted with a different approach called boosting. Boosting reduces error mainly by reducing bias (and also, to some extent, variance, by aggregating the output from many models).

In this case we are going to build a model that first creates a feature vector from all the sensor channels we have data for, and then feeds that vector to a Gradient Boosted Trees regressor. Both steps are combined in a single Pipeline object.

In [9]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

# Define a Gradient Boosted Trees (GBT) pipeline:
#   1. pack the 16 sensor channel columns (SensorCH1 ... SensorCH16) into a
#      single "features" vector column,
#   2. regress the Methaneconc_ppm label on that vector.
sensor_columns = [f"SensorCH{channel}" for channel in range(1, 17)]
assembler = VectorAssembler(inputCols=sensor_columns, outputCol="features")
gbt = GBTRegressor(labelCol="Methaneconc_ppm")
gbtPipeline = Pipeline(stages=[assembler, gbt])

#### Train the model

We train the pipeline by calling its `fit` method on the training set.

In [11]:
# Fit every pipeline stage on the training split. The timestamp and the ethylene
# concentration are removed first: the pipeline only reads the SensorCH* columns
# and the Methaneconc_ppm label.
training_input = train.drop('Time_seconds', 'Ethyleneconc_ppm')
gbtModel = gbtPipeline.fit(training_input)

#### Evaluate the results for the regression model

In this case we use the coefficient of determination (R2) as the evaluation metric.

In [13]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out test split with the fitted pipeline.
predictions = gbtModel.transform(test)

# Evaluate the predictions against the Methaneconc_ppm label using R2.
# The label column and metric are passed as a per-call param map, which leaves
# the evaluator instance itself at its default configuration.
evaluator = RegressionEvaluator()
r2_params = {
    evaluator.labelCol: "Methaneconc_ppm",
    evaluator.metricName: "r2",
}
methaneconc_r2 = evaluator.evaluate(predictions, r2_params)

#### Save the model in the MLlib format

In MLlib, a Pipeline object is stored as a folder structure, where each step of the pipeline is stored in its own subfolder. Inside each subfolder we will find all the information about that step. Therefore, in order to register and save the model as a single file, we must save it and then zip the entire directory.

In [15]:
# Persist the fitted pipeline in Spark ML's native format (a directory tree),
# replacing any model previously saved at the same location.
gbtModel.write().overwrite().save(model_dbfs)

# Zip the saved model directory, reading it from the driver through the /dbfs path.
# shutil.make_archive expects base_dir to be relative to root_dir. Passing the
# absolute path as base_dir alone stores every entry under
# "dbfs/FileStore/<model_name>/..." inside the archive, i.e. it embeds the
# driver's path in the artifact. Splitting the path into root_dir + base_dir
# makes the archive contain only the "<model_name>/" folder.
model_local_dir = "/dbfs" + model_dbfs
model_path_drv = shutil.make_archive(
    base_name="/tmp/" + model_name,
    format='zip',
    root_dir=os.path.dirname(model_local_dir),
    base_dir=os.path.basename(model_local_dir),
)

You can now upload the archive whose path is stored in `model_path_drv` to your repository. The file is located on the driver node.

#### Save the model in the ONNX format

In [18]:
import tempfile
# Workaround: force the tempfile module to use /tmp. I had to apply this fix
# to get convert_sparkml to work correctly.
tempfile.tempdir = "/tmp"

In [19]:
from onnxmltools import convert_sparkml
from onnxmltools.convert.sparkml.utils import buildInitialTypesSimple

# Drop the columns the model does not take as input (timestamp and both
# concentration columns); presumably only the SensorCH* columns remain.
sample_input = test.drop('Time_seconds', 'Ethyleneconc_ppm', 'Methaneconc_ppm')

# Build the ONNX initial type declarations from that sample DataFrame.
initial_types = buildInitialTypesSimple(sample_input)

# Convert the fitted Spark ML pipeline to an ONNX model.
onnx_model = convert_sparkml(
    gbtModel,
    'My Sparkml Pipeline',
    initial_types,
    spark_session=spark,
)

In [20]:
# Write the serialized ONNX model to /tmp on the local file system.
onnx_path = os.path.join("/tmp/", "gbt_methaneconc_ppm.onnx")
with open(onnx_path, "wb") as onnx_file:
    onnx_file.write(onnx_model.SerializeToString())

You can now upload the file /tmp/gbt_methaneconc_ppm.onnx. The file is located on the driver node.