# 3b. Modélisation Machine Learning

Azure ML & Azure Databricks notebooks by Parashar Shah.
Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT License.

## 1. Paramétrage

In [3]:
import os
import pprint
import numpy as np

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [4]:
import azureml.core

# Record which Azure ML core SDK version this notebook ran against.
print(f"Version Azure ML service : {azureml.core.VERSION}")

In [5]:

from azureml.core import Workspace

# Load the Azure ML workspace from its config file (Workspace.from_config) and
# echo the identifying fields, one per line.
ws = Workspace.from_config()
workspace_summary = [
    'Nom du Workspace : ' + ws.name,
    'Région Azure : ' + ws.location,
    'Subscription ID : ' + ws.subscription_id,
    'Nom du Ressource Group : ' + ws.resource_group,
]
print(*workspace_summary, sep='\n')

## 2. Chargement des données (déjà partitionnées en entraînement et test)

In [7]:
# Pre-split train/test datasets, stored as Parquet and read through Spark.
train_data_path = "AdultCensusIncomeTrain"
test_data_path = "AdultCensusIncomeTest"

train = spark.read.parquet(train_data_path)
test = spark.read.parquet(test_data_path)

# Report (row count, column count) for each split, then the training schema.
for split_label, split_df in (("Training", train), ("Test", test)):
    print("{} : ({}, {})".format(split_label, split_df.count(), len(split_df.columns)))

print()
train.printSchema()

## 3. Pipeline ML

In [9]:
# Target column; every remaining column of `train` is a candidate feature.
label = "income"
dtypes = dict(train.dtypes)
dtypes.pop(label)

# String columns are string-indexed then one-hot encoded; all other columns are
# passed to the VectorAssembler unchanged.
si_xvars = []
ohe_xvars = []
featureCols = []
for key, dtype in dtypes.items():
    if dtype == "string":
        featureCol = "-".join([key, "encoded"])
        featureCols.append(featureCol)

        tmpCol = "-".join([key, "tmp"])
        # StringIndexer handleInvalid, see
        # https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/ml/feature/StringIndexer.html
        # 'skip' filters out rows with invalid data (unseen labels or NULL values),
        # 'error' throws an error, 'keep' puts invalid data in an extra bucket at
        # index numLabels. Default: 'error'.
        # NOTE(review): with 'skip', test rows carrying a category unseen during
        # training are dropped by transform(), so evaluation metrics cover fewer
        # rows than `test` contains. Consider 'keep' if that matters.
        si_xvars.append(StringIndexer(inputCol=key, outputCol=tmpCol, handleInvalid="skip"))
        ohe_xvars.append(OneHotEncoder(inputCol=tmpCol, outputCol=featureCol))
    else:
        featureCols.append(key)

# Index the string label into a 'label' column; presumably this matches
# LogisticRegression's default labelCol, since no labelCol is passed later — confirm.
si_label = StringIndexer(inputCol=label, outputCol='label')

# Gather all (encoded and numeric) feature columns into one 'features' vector.
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")

In [10]:
from azureml.core.run import Run
from azureml.core.experiment import Experiment
import numpy as np
import os
import shutil

model_name = "AdultCensus_runHistory.mml"
# /dbfs view of the location Spark writes with save(model_name); presumably
# dbfs:/AdultCensus_runHistory.mml on Databricks — confirm on the target workspace.
model_dbfs = os.path.join("/dbfs", model_name)
run_history_name = 'spark-ml-notebook'

# Azure ML experiment grouping one child run per regularization value.
myexperiment = Experiment(ws, "Azure_Databricks_Experiment")
root_run = myexperiment.start_logging()

# Regularization values to sweep (0.0, 0.2, ..., 0.8). Rounded to plain Python
# floats so run names and logged values read "0.6" instead of the arange
# floating-point artifact 0.6000000000000001.
regs = [round(float(r), 2) for r in np.arange(0.0, 1.0, 0.2)]

# One evaluator reused for both metrics; setMetricName switches the metric.
bce = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')

# Zip archive name derived from the model name without its extension;
# splitext keeps working if the stem itself contains dots.
mdl = os.path.splitext(model_name)[0]
model_zip = mdl + ".zip"

try:
    for reg in regs:
        print("Taux de régularisation : {}".format(reg))
        with root_run.child_run("reg-" + str(reg)) as run:
            # Fit the full preprocessing + logistic regression pipeline.
            lr = LogisticRegression(regParam=reg)
            pipe = Pipeline(stages=[*si_xvars, *ohe_xvars, si_label, assembler, lr])
            model_pipeline = pipe.fit(train)

            # NOTE(review): these metrics are computed on `test`, and the best run is
            # later chosen from them, so the final test AUC is optimistically biased.
            # Consider selecting on a validation split of `train` instead.
            pred = model_pipeline.transform(test)
            au_roc = bce.setMetricName('areaUnderROC').evaluate(pred)
            au_prc = bce.setMetricName('areaUnderPR').evaluate(pred)

            print("Area under ROC : {}".format(au_roc))
            print("Area Under PR : {}".format(au_prc))
            print()

            # Log the hyperparameter, both metrics, and every column of `train`
            # (note: this list includes the label column).
            run.log("reg", reg)
            run.log("au_roc", au_roc)
            run.log("au_prc", au_prc)
            run.log_list("columns", train.columns)

            # Save the fitted pipeline (a directory), zip its /dbfs view, and attach
            # the zip to this child run's outputs.
            model_pipeline.write().overwrite().save(model_name)
            try:
                shutil.make_archive(mdl, 'zip', model_dbfs)
                run.upload_file("outputs/" + model_name, model_zip)
            finally:
                # Remove the local copies even if archiving/upload failed, so the
                # next iteration or a re-run starts clean. ignore_errors avoids
                # masking the original exception if the folder is already gone.
                shutil.rmtree(model_dbfs, ignore_errors=True)
                if os.path.exists(model_zip):
                    os.remove(model_zip)
except Exception:
    # Mark the parent run as failed rather than leaving it in a running state.
    root_run.fail()
    raise

root_run.complete()
root_run_id = root_run.id
print("run ID:", root_run_id)

In [11]:
# Index each child run of the sweep by run id, plus the metrics it logged.
child_runs = {child.id: child for child in root_run.get_children()}
child_run_metrics = {run_id: child.get_metrics() for run_id, child in child_runs.items()}

In [12]:
# Pick the child run with the highest logged AUC-ROC (on ties, the first one
# encountered wins, as with max()).
# NOTE(review): au_roc was computed on `test`, so this selection uses test data.
best_run_id, best_metrics = max(
    child_run_metrics.items(), key=lambda item: item[1]['au_roc']
)
best_run = child_runs[best_run_id]
print('Meilleur modèle :', best_run_id)
print()
print('Metriques :', best_metrics)

In [13]:
import pandas as pd

# Metrics of the selected best child run.
best_reg = child_run_metrics[best_run_id]['reg']
max_auc = child_run_metrics[best_run_id]['au_roc']

# One (reg, au_roc) row per child run, sorted by regularization value.
reg_auc = np.array([(m['reg'], m['au_roc']) for m in child_run_metrics.values()])
reg_auc_sorted = reg_auc[reg_auc[:, 0].argsort()]

# Name the columns so the displayed table is self-describing instead of "0" / "1".
df = pd.DataFrame(reg_auc_sorted, columns=['reg', 'au_roc'])
spdf = spark.createDataFrame(df)

display(spdf)

0,1
0.0,0.8983381958673242
0.2,0.8865991419251571
0.4,0.8843149801344333
0.6000000000000001,0.8830050006983922
0.8,0.8821250887390275


In [14]:
# Download the best child run's zipped pipeline from its Azure ML run-history
# outputs into a local zip file (relative to the current working directory).
best_model_file_name = "best_model.zip"
best_run.download_file(
    name='outputs/{}'.format(model_name),
    output_file_path=best_model_file_name,
)

## 4. Évaluation du modèle

In [16]:
# Clear whatever currently sits at model_dbfs, then extract the downloaded archive
# there. shutil.rmtree raises on a plain file, so files are removed with os.remove.
if os.path.isdir(model_dbfs):
    shutil.rmtree(model_dbfs)
elif os.path.isfile(model_dbfs):
    os.remove(model_dbfs)
shutil.unpack_archive(best_model_file_name, model_dbfs)

# Load through Spark using model_name; presumably this resolves to the same location
# as the /dbfs path just populated (the sweep saves with model_name and archives
# model_dbfs) — confirm on the target workspace.
model_pipeline_best = PipelineModel.load(model_name)

In [17]:
# Predictions: score the test set with the reloaded best pipeline and preview a
# few input columns next to the true label and the predicted class.
pred = model_pipeline_best.transform(test)
preview_cols = ['hours_per_week', 'age', 'workclass', 'marital_status', 'income', 'prediction']
output = pred.select(*preview_cols)

display(output)

hours_per_week,age,workclass,marital_status,income,prediction
1,21,Private,Never-married,<=50K,0.0
1,23,Private,Never-married,<=50K,0.0
1,27,Private,Never-married,<=50K,0.0
1,72,?,Married-civ-spouse,<=50K,0.0
2,32,?,Never-married,<=50K,0.0
2,32,Private,Married-civ-spouse,<=50K,0.0
2,47,Private,Married-civ-spouse,>50K,1.0
2,61,?,Married-civ-spouse,>50K,0.0
2,67,?,Married-civ-spouse,<=50K,0.0
2,67,Self-emp-not-inc,Widowed,>50K,1.0


## Métriques

In [19]:
# Evaluate the best pipeline's test predictions with AUC-ROC and AUC-PR.
bce = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
au_roc, au_prc = (
    bce.setMetricName(metric_name).evaluate(pred)
    for metric_name in ('areaUnderROC', 'areaUnderPR')
)

print("Métriques :")
print()
for report_line in ("Area under ROC = {}".format(au_roc), "Area Under PR = {}".format(au_prc)):
    print(report_line)

## 5. Persistance du modèle

In [21]:
# Persist the best pipeline to model_name, overwriting any existing copy.
best_model_writer = model_pipeline_best.write().overwrite()
best_model_writer.save(model_name)
print(f"Modèle sauvegardé ici : {model_dbfs}")

In [22]:
%sh

# List the contents of the saved pipeline directory through the /dbfs mount.
# NOTE(review): this path is hard-coded and duplicates model_dbfs from the
# training cell; keep the two in sync if the model name changes.
ls -la /dbfs/AdultCensus_runHistory.mml/*

In [23]:
# Exit the notebook with the value "success" (presumably read by whatever invoked
# this notebook, e.g. a job or parent notebook — confirm).
dbutils.notebook.exit("success")

success