In [5]:
import os
import urllib
import pprint
import numpy as np
import time

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

StatementMeta(testspark, 56, 7, Finished, Available)



In [6]:
#%%spark
# SQLDW inside Synapse analytics
#val df = spark.read.sqlanalytics("sqldb.dbo.AdultCensusIncome4") 
#df.createOrReplaceTempView("AdultCensusIncome4")

StatementMeta(testspark, 56, 8, Finished, Available)



In [7]:
%%spark
// SQLDW inside Synapse analytics. In this example, the SQLDW table data originates from
// the csv in this folder: AdultCensusIncome_withheader.csv
// Constants provides the SERVER option key used below; it must be imported as its own statement.
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

// Read the table from the external SQLDW logical server. The trailing dots keep the
// chained call together as one statement across lines.
val df = spark.read.
option(Constants.SERVER, "<<your external SQLDW logical server>>.database.windows.net").
sqlanalytics("<<your database>>.dbo.AdultCensusIncome4")
// Register the DataFrame as a temp view so it can be queried by name with Spark SQL.
df.createOrReplaceTempView("AdultCensusIncome4")

StatementMeta(testspark, 56, 9, Finished, Available)

import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
df: org.apache.spark.sql.DataFrame = [age: int, workclass: string ... 13 more fields]


In [8]:
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

StatementMeta(testspark, 56, 10, Finished, Available)



In [9]:
# Upper bound on the number of rows read from the temp view, to keep the lab's runtime bounded.
# NOTE(review): the original comment said "1,500k rows" while the code used 15,000,000;
# the code's value is kept here -- lower it if 1.5 million was the intent.
MAX_ROWS = 15000000
data_all = spark.sql("SELECT * FROM AdultCensusIncome4").limit(MAX_ROWS)

# Remove hyphens from the column names (a name such as "a-b" becomes "ab").
columns_new = [name.replace("-", "") for name in data_all.columns]
data_all = data_all.toDF(*columns_new)

StatementMeta(testspark, 56, 11, Finished, Available)



In [10]:
# Print column names, types and nullability of data_all to check the load and the renamed columns.
data_all.printSchema()

StatementMeta(testspark, 56, 12, Finished, Available)

root
 |-- age: integer (nullable = false)
 |-- workclass: string (nullable = false)
 |-- fnlwgt: integer (nullable = false)
 |-- education: string (nullable = false)
 |-- educationnum: integer (nullable = false)
 |-- maritalstatus: string (nullable = false)
 |-- occupation: string (nullable = false)
 |-- relationship: string (nullable = false)
 |-- race: string (nullable = false)
 |-- sex: string (nullable = false)
 |-- capitalgain: integer (nullable = false)
 |-- capitalloss: integer (nullable = false)
 |-- hoursperweek: integer (nullable = false)
 |-- nativecountry: string (nullable = false)
 |-- income: string (nullable = false)

In [11]:
# Render only the first 10 rows as a quick preview instead of the whole DataFrame.
display(data_all.limit(10))

StatementMeta(testspark, 56, 13, Finished, Available)

SynapseWidget(Synapse.DataFrame, 9b62157b-dcc0-4262-b768-25e5644cf848)

In [12]:
import datetime
import azureml.core
from azureml.core import Workspace
from azureml.core.run import Run
from azureml.core.experiment import Experiment

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

# Placeholders: replace with the identifiers of your own Azure ML workspace.
workspace="<<your AzureML workspace>>"
subscription_id="<<your subscription>>"
resource_grp="<<your resource group>>"

experiment_name = "experiment_model_release_synapse"
# Timestamped model name (one unique name per execution of this cell).
model_name_run = datetime.datetime.now().strftime("%Y%m%d%H%M%S")+ "_dbrmod_synapse.mml" # in case you want to change the name, keep the .mml extension
# Fixed model name.
model_name = "databricksmodelsynapse.mml" # in case you want to change the name, keep the .mml extension

#
# Step 1: Connect to the Azure ML workspace that will hold the experiment runs
#

ws = Workspace(workspace_name = workspace,
               subscription_id = subscription_id,
               resource_group = resource_grp)

# The returned details are not used; presumably this call serves as an early check
# that the workspace is reachable -- TODO confirm.
ws.get_details()

# Echo the resolved workspace coordinates, one per line.
print('Workspace name: ' + ws.name, 
      'Azure region: ' + ws.location, 
      'Subscription id: ' + ws.subscription_id, 
      'Resource group: ' + ws.resource_group, sep = '\n')

StatementMeta(testspark, 56, 14, Finished, Available)

SDK version: 1.10.0
Workspace name: blog-devaisec-amls
Azure region: westeurope
Subscription id: 513a7987-b0d9-4106-a24d-4b3f49136ea8
Resource group: blog-devaisec-rg

In [13]:
import os
import urllib
import pprint
import numpy as np
import shutil
import time

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import OneHotEncoder, OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# String-valued columns: each one is indexed and then one-hot encoded.
categoricalColumns = [
    "workclass", "education", "maritalstatus", "occupation",
    "relationship", "race", "sex", "nativecountry",
]
# Numeric columns that enter the feature vector as they are.
numericCols = ["age", "fnlwgt", "educationnum", "capitalgain", "capitalloss", "hoursperweek"]

# The stages are only declared here; they all execute together when the pipeline is fitted.
stages = []
for column in categoricalColumns:
    # <column> -> <column>Index (category index)
    indexer = StringIndexer(inputCol=column, outputCol=column + "Index")
    # <column>Index -> <column>classVec (one-hot vector)
    one_hot = OneHotEncoderEstimator(
        inputCols=[indexer.getOutputCol()],
        outputCols=[column + "classVec"],
    )
    stages.extend([indexer, one_hot])

# The string target "income" is indexed into the numeric "label" column.
label_stringIdx = StringIndexer(inputCol="income", outputCol="label")
stages.append(label_stringIdx)

# Assemble the one-hot vectors followed by the numeric columns into a single "features" vector.
assemblerInputs = [name + "classVec" for name in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

# Fit the preprocessing pipeline on data_all and apply it to the same data.
partialPipeline = Pipeline(stages=stages)
pipelineModel = partialPipeline.fit(data_all)
preppedDataDF = pipelineModel.transform(data_all)

# Keep the model inputs plus the original columns they were derived from.
selectedcols = ["label", "features", "income"] + categoricalColumns + numericCols
dataset = preppedDataDF.select(selectedcols)

# 70/30 train/test split with a fixed seed.
trainingData, testData = dataset.randomSplit([0.7, 0.3], seed=122423)

StatementMeta(testspark, 56, 15, Finished, Available)



In [14]:
# Open the experiment in the workspace and start the parent run for this session.
myexperiment = Experiment(workspace=ws, name=experiment_name)
root_run = myexperiment.start_logging()

StatementMeta(testspark, 56, 16, Finished, Available)



In [16]:
import os

from pyspark.ml.classification import LogisticRegression
# Regularization strengths to try for the logistic regression model.
# A wider sweep would be e.g. [0, 0.01, 0.5, 2.0].
regParam = [0]
for reg in regParam:
    print("Regularization rate: {}".format(reg))
    # One child run per regularization value, so each value's metrics are logged separately.
    with root_run.child_run("reg-" + str(reg)) as run:
        # create a new Logistic Regression model.
        lr = LogisticRegression(regParam=reg)

        # put together the pipeline
        pipe = Pipeline(stages=[lr])

        # train on the training split, score the held-out test split
        model_pipeline = pipe.fit(trainingData)
        predictions = model_pipeline.transform(testData)

        # evaluate. note only 2 metrics are supported out of the box by Spark ML.
        bce = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
        au_roc = bce.setMetricName('areaUnderROC').evaluate(predictions)
        au_prc = bce.setMetricName('areaUnderPR').evaluate(predictions)

        # Confusion matrix from a single aggregation instead of four separate
        # filter+count jobs. Keys are (label, prediction); a combination that
        # never occurs is absent from the result and therefore counted as 0.
        confusion = {
            (int(row["label"]), int(row["prediction"])): row["count"]
            for row in predictions.groupBy("label", "prediction").count().collect()
        }
        truePositive = confusion.get((1, 1), 0)
        falsePositive = confusion.get((0, 1), 0)
        trueNegative = confusion.get((0, 0), 0)
        falseNegative = confusion.get((1, 0), 0)

        # log reg, au_roc, au_prc and feature names in run history
        run.log("reg", reg)
        run.log("au_roc", au_roc)
        run.log("au_prc", au_prc)

        print("Area under ROC: {}".format(au_roc))
        print("Area Under PR: {}".format(au_prc))

        run.log("truePositive", truePositive)
        run.log("falsePositive", falsePositive)
        run.log("trueNegative", trueNegative)
        run.log("falseNegative", falseNegative)

        print("TP: " + str(truePositive) + ", FP: " + str(falsePositive) + ", TN: " + str(trueNegative) + ", FN: " + str(falseNegative))

        run.log_list("columns", trainingData.columns)

        # save model
        # NOTE: the target path does not depend on `reg` and overwrite() is used, so
        # with more than one value in regParam only the last trained model is kept.
        model_pipeline.write().overwrite().save("prod/" + model_name)

        # Todo: upload the serialized model into AzureML, fix code below
        #mdl, ext = model_name.split(".")
        #shutil.make_archive(mdl, 'zip')

        #model_zip = mdl + ".zip"
        #run.upload_file("outputs/" + model_name, model_zip)        
        #run.upload_file("outputs/" + model_name, path_or_stream = model_dbfs) #cannot deal with folders


StatementMeta(testspark, 56, 18, Finished, Available)

Regularization rate: 0
Area under ROC: 0.9043354488998249
Area Under PR: 0.761097345573136
TP: 1416, FP: 500, TN: 6895, FN: 942