### Configure Workspace

In [1]:
from pyspark.sql import SparkSession
from azureml.core import Workspace, Experiment
import mlflow

# Load the Azure ML workspace and declare the training experiment
ws = Workspace.from_config()
experiment_name = 'leads-pyspark-train'
experiment = Experiment(name=experiment_name, workspace=ws)

# Point MLflow at the workspace tracking server and open a run
# under the same experiment name
mlflow.set_tracking_uri(
    ws.get_mlflow_tracking_uri()
)
mlflow.set_experiment(experiment_name)
run = mlflow.start_run()

# Default datastore of the workspace (used later for the batch files)
default_ds = ws.get_default_datastore()

# Reuse the running Spark session, or create one if none exists
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

### Generate and Upload Batch Data

In [2]:
from azureml.core import Dataset
import pandas as pd
import os

# Read the semicolon-delimited source CSV with a header row and inferred column types
df = spark.read.csv(
    path='data/bank-additional-full.csv',
    sep=";",
    inferSchema="true",
    header="true",
)

# Seeded three-way split: training, test, and a small slice kept for batch scoring
trainDF, testDF, batchDF = df.randomSplit(weights=[.7, .29, .01], seed=999)

# Collect the batch slice to the driver as a pandas DataFrame
batchData = batchDF.toPandas()

# Create a folder
batch_folder = './batch-data'
os.makedirs(batch_folder, exist_ok=True)
print("Folder created!")

# Save the batch sample as numbered CSV files (1.csv, 2.csv, ...) holding at
# most `rows_per_file` rows each.
# The chunk boundaries are derived from the pandas frame that is actually
# written, so a trailing partial chunk is saved as well instead of being
# dropped, and no extra Spark count job is triggered.
# The pandas index is intentionally kept in the output: it becomes the unnamed
# first CSV column that the scoring script drops as '_c0'.
print("Saving files...")
rows_per_file = 10
for file_num, start in enumerate(range(0, len(batchData), rows_per_file), start=1):
    writeData = batchData[start:start + rows_per_file]
    writeData.to_csv(os.path.join(batch_folder, str(file_num) + '.csv'), sep=",")

print("files saved!")

# Push the local CSV folder to the workspace's default datastore
print("Uploading files to datastore...")
default_ds = ws.get_default_datastore()
default_ds.upload(
    src_dir="batch-data",
    target_path="batch-data",
    overwrite=True,
    show_progress=True,
)

# Build a file dataset over the uploaded folder, then try to register it
# as a new version of 'leads-batch-data'
batch_data_set = Dataset.File.from_files(
    path=(default_ds, 'batch-data/'),
    validate=False,
)
try:
    batch_data_set = batch_data_set.register(
        workspace=ws,
        name='leads-batch-data',
        description='batch data for Marketing Leads UCI',
        create_new_version=True,
    )
except Exception as register_error:
    # Registration is best-effort: the error is printed and the
    # unregistered dataset object is kept in batch_data_set
    print(register_error)

print("Done!")


Folder created!
Saving files...
files saved!
Uploading files to datastore...
Uploading an estimated of 40 files
Uploading batch-data/1.csv
Uploaded batch-data/1.csv, 1 files out of an estimated total of 40
Uploading batch-data/10.csv
Uploaded batch-data/10.csv, 2 files out of an estimated total of 40
Uploading batch-data/11.csv
Uploaded batch-data/11.csv, 3 files out of an estimated total of 40
Uploading batch-data/12.csv
Uploaded batch-data/12.csv, 4 files out of an estimated total of 40
Uploading batch-data/13.csv
Uploaded batch-data/13.csv, 5 files out of an estimated total of 40
Uploading batch-data/14.csv
Uploaded batch-data/14.csv, 6 files out of an estimated total of 40
Uploading batch-data/15.csv
Uploaded batch-data/15.csv, 7 files out of an estimated total of 40
Uploading batch-data/16.csv
Uploaded batch-data/16.csv, 8 files out of an estimated total of 40
Uploading batch-data/17.csv
Uploaded batch-data/17.csv, 9 files out of an estimated total of 40
Uploading batch-data/18.cs

### Train Model

In [3]:
# A Transformer used in pipelines for renaming columns 
from pyspark.ml import Transformer
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable  

class ColumnRenamer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    '''
    Pipeline stage that swaps the dotted indicator column names for
    underscore-separated ones:
    emp.var.rate   -> emp_var_rate
    cons.price.idx -> cons_price_idx
    cons.conf.idx  -> cons_conf_idx
    nr.employed    -> nr_employed
    '''
    def __init__(self):
        super().__init__()
        # Mapping of original column name -> replacement column name
        self.columnsToBeRenamed = {
            'emp.var.rate': 'emp_var_rate',
            'cons.price.idx': 'cons_price_idx',
            'cons.conf.idx': 'cons_conf_idx',
            'nr.employed': 'nr_employed',
        }

    def _transform(self, df):
        # Apply each rename in turn to the evolving DataFrame
        renamed = df
        for old_name, new_name in self.columnsToBeRenamed.items():
            renamed = renamed.withColumnRenamed(old_name, new_name)
        return renamed


rename_columns = ColumnRenamer()

# Non-tree models: RFormula builds the "features" and "label" columns from
# every column other than y, skipping rows with invalid values
from pyspark.ml.feature import RFormula
rFormula = RFormula(
    formula="y ~ .",
    featuresCol="features",
    labelCol="label",
    handleInvalid="skip",
)

# Tree-based models: string columns are indexed and assembled together with
# the numeric columns
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
example_df = rename_columns.transform(trainDF)

# String-typed columns other than the target y
categorialColumns = [
    colname
    for (colname, dataType) in example_df.dtypes
    if dataType == "string" and colname != "y"
]
stringIndexer = StringIndexer(
    inputCols=categorialColumns,
    outputCols=[name + "Index" for name in categorialColumns],
)
oheEncoder = OneHotEncoder(
    inputCols=stringIndexer.getOutputCols(),
    outputCols=[name + "ohe" for name in categorialColumns],
)
label_stringIdx = StringIndexer(inputCol="y", outputCol="label")

# int / float / double columns
numericColumns = [
    colname
    for (colname, dataType) in example_df.dtypes
    if dataType in ("int", "float", "double")
]

# The assembler consumes the numeric columns plus the "...Index" columns
assembledInputs = numericColumns + [name + "Index" for name in categorialColumns]
vecAssembler = VectorAssembler(inputCols=assembledInputs, outputCol="features")

In [4]:
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier, LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
import mlflow.spark
import pandas as pd

# For Tracking Models
model_num = 1
pipelineModel = None

# Evaluators for performance metrics
bevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label")
mevaluator = MulticlassClassificationEvaluator()


def fit_evaluate_log(model, feature_stages, model_num):
    """Fit one classifier behind the given feature stages, score it, and log it to MLflow.

    Args:
        model: Unfitted Spark ML classifier appended as the last pipeline stage.
        feature_stages: List of pipeline stages placed before the classifier.
        model_num: Sequence number used in the logged model name and logged
            as the "modelNum" metric, so the two always agree.

    Returns:
        The PipelineModel fitted on trainDF.
    """
    fitted = Pipeline(stages=list(feature_stages) + [model]).fit(trainDF)
    predDF = fitted.transform(testDF)

    modelName = str(model_num) + '-' + model.__class__.__name__
    metrics = {
        "modelNum": model_num,
        "accuracy": mevaluator.setMetricName("accuracy").evaluate(predDF),
        "areaUnderROC": bevaluator.setMetricName("areaUnderROC").evaluate(predDF),
        "areaUnderPR": bevaluator.setMetricName("areaUnderPR").evaluate(predDF),
    }

    # Log metrics and model
    mlflow.spark.log_model(fitted, modelName)
    mlflow.log_metrics(metrics)
    print("Training complete:", modelName)
    return fitted


# Non Tree Based Models
non_tree_models = [LogisticRegression(), LinearSVC()]
for model in non_tree_models:
    pipelineModel = fit_evaluate_log(model, [rename_columns, rFormula], model_num)
    # Advance the counter only after logging, so "modelNum" matches the model name
    model_num += 1

# Tree Based Models
tree_models = [DecisionTreeClassifier(), RandomForestClassifier(), GBTClassifier()]
tree_stages = [rename_columns, stringIndexer, oheEncoder, label_stringIdx, vecAssembler]
for model in tree_models:
    pipelineModel = fit_evaluate_log(model, tree_stages, model_num)
    model_num += 1

# NOTE: pipelineModel now holds the model trained last (GBTClassifier),
# not necessarily the best-scoring one.

Training complete: 1-LogisticRegression
Training complete: 2-LinearSVC
Training complete: 3-DecisionTreeClassifier
Training complete: 4-RandomForestClassifier
Training complete: 5-GBTClassifier


In [5]:
# Disabled cross-validation experiment, kept as an inert string literal.
'''
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
gbt = GBTClassifier()
gbt_pipeline = Pipeline(stages=[rename_columns, stringIndexer, oheEncoder, label_stringIdx, vecAssembler, gbt])
paramGrid = ParamGridBuilder().addGrid(gbt.maxDepth,[5,10]).build()
    
cv = CrossValidator(estimator=gbt_pipeline, estimatorParamMaps=paramGrid, evaluator=mevaluator, numFolds=5)
cvModel = cv.fit(trainDF)
predictions = cvModel.transform(testDF)
bevaluator.evaluate(predictions)
'''

# Persist the most recently fitted pipeline model to the 'model' path.
# write().overwrite() replaces output left by a previous run of this cell;
# a plain save() raises when the path already exists, which breaks re-runs.
pipelineModel.write().overwrite().save('model')

# Register the saved model directory in the workspace under the name the
# scoring script looks up.
from azureml.core import Model
Model.register(
    workspace=ws,
    model_path='model/',
    model_name='pyspark-batch-leads-model',
)



Registering model pyspark-batch-leads-model


### Create Compute

In [21]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# Compute params
compute_name = 'rohan-vm-cluster'
inference_cluster = None

# Reuse the named compute target when the workspace already has it;
# otherwise try to provision a new cluster with at most two nodes.
if compute_name in ws.compute_targets:
    inference_cluster = ComputeTarget(ws, compute_name)
    print("Using existing cluster.")
else:
    try:
        compute_config = AmlCompute.provisioning_configuration(
            vm_size='STANDARD_DS11_V2',
            max_nodes=2)
        inference_cluster = ComputeTarget.create(ws, compute_name, compute_config)
        inference_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        # Provisioning stays best-effort (the error is printed, not raised),
        # but success is no longer reported when it failed.
        print(ex)
    else:
        # Reached only when creation and the wait both finished without raising
        print("Cluster created.")

Using existing cluster.


### Scoring Script

In [37]:
%%writefile 'batch-pipeline/score.py'
import os
import numpy as np
from azureml.core import Model
from pyspark.ml import PipelineModel
#from pyspark.sql import SparkSession


def init():
    """Load the registered 'pyspark-batch-leads-model' pipeline into the global `model`."""
    # NOTE(review): the saved pipeline presumably contains the notebook's custom
    # ColumnRenamer stage; confirm that class can be resolved in this script's
    # environment when the pipeline is loaded.
    global model
    model = PipelineModel.load(
        Model.get_model_path('pyspark-batch-leads-model')
    )

def run(mini_batch):
    """Score every CSV file of one mini-batch with the loaded pipeline model.

    Args:
        mini_batch: Iterable of file paths; each file is read as a
            comma-separated CSV with a header row.

    Returns:
        A list with one "<file name>: <predictions>" string per input file,
        where predictions are mapped from 0.0/1.0 to "no"/"yes".
    """
    # This script never defined `spark`, so reading the first file raised a
    # NameError. The import is local so this function works without relying
    # on a module-level import; getOrCreate() reuses an existing session.
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()

    # This runs for each batch
    resultList = []
    # process each file in the batch
    for f in mini_batch:
        # '_c0' is the unnamed index column of the input CSV
        df = spark.read.csv(path=f, header="true", inferSchema="true", sep=",").drop('_c0')
        prediction = (
            model.transform(df)
            .select('prediction')
            .toPandas()
            .prediction.map({0.0: "no", 1.0: "yes"})
            .to_numpy()
        )
        resultList.append("{}: {}".format(os.path.basename(f), prediction))
    return resultList


Overwriting batch-pipeline/score.py


### Create Pipeline

In [38]:
from azureml.core import Environment
from azureml.core.runconfig import DEFAULT_CPU_IMAGE

# Build the scoring environment from the conda specification file and
# set its Docker base image to the default CPU image
batch_env = Environment.from_conda_specification(
    name="experiment_env",
    file_path="batch-pipeline/batch_environment.yml",
)
batch_env.docker.base_image = DEFAULT_CPU_IMAGE
print('Configuration ready.')

Configuration ready.


In [39]:
from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep
from azureml.data import OutputFileDatasetConfig
from azureml.core.runconfig import DockerConfiguration

# Destination for the scoring results
output_dir = OutputFileDatasetConfig(name='inferences')

# How the scoring script is run: which script, on which compute and
# environment, and how the input files are batched
parallel_run_config = ParallelRunConfig(
    source_directory='batch-pipeline/',
    entry_script="score.py",
    environment=batch_env,
    compute_target=inference_cluster,
    node_count=2,
    mini_batch_size="5",
    error_threshold=10,
    output_action="append_row",
)

# The single pipeline step: score the registered batch dataset
parallelrun_step = ParallelRunStep(
    name='batch-score-leads',
    parallel_run_config=parallel_run_config,
    inputs=[
        batch_data_set.as_named_input('leads_batch'),
    ],
    output=output_dir,
    arguments=[],
    allow_reuse=True,
)

print('Steps defined')

Steps defined


In [40]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

# Wrap the scoring step in a pipeline, submit it under the
# 'leads-batch-pipeline' experiment, and block until the run finishes
pipeline = Pipeline(
    workspace=ws,
    steps=[parallelrun_step],
)
pipeline_run = (
    Experiment(workspace=ws, name='leads-batch-pipeline')
    .submit(pipeline)
)
pipeline_run.wait_for_completion(show_output=True)



Created step batch-score-leads [00aced3a][a2719f31-7cef-4639-b528-3b338977fd20], (This step will run and generate new outputs)
Submitted PipelineRun 6277edd5-3538-4ea1-94c3-8df36492360a
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/6277edd5-3538-4ea1-94c3-8df36492360a?wsid=/subscriptions/23416925-66df-470c-b651-f378856d8ad7/resourcegroups/rohan-rg/workspaces/rohan-ws&tid=13715ad3-e049-4909-899b-f9e22f99b1a5
PipelineRunId: 6277edd5-3538-4ea1-94c3-8df36492360a
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/6277edd5-3538-4ea1-94c3-8df36492360a?wsid=/subscriptions/23416925-66df-470c-b651-f378856d8ad7/resourcegroups/rohan-rg/workspaces/rohan-ws&tid=13715ad3-e049-4909-899b-f9e22f99b1a5
PipelineRun Status: NotStarted
PipelineRun Status: Running


StepRunId: dfdb9062-eceb-4862-a2a3-519526f975e5
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/dfdb9062-eceb-4862-a2a3-519526f975e5?wsid=/subscriptions/23416925-66df-470c-b651-f378856d8ad7/resour

ActivityFailedException: ActivityFailedException:
	Message: Activity Failed:
{
    "error": {
        "code": "UserError",
        "message": "User program failed with Exception: Run failed, please check logs for details. You can check logs/readme.txt for the layout of logs.",
        "messageParameters": {},
        "detailsUri": "https://aka.ms/azureml-run-troubleshooting",
        "details": []
    },
    "time": "0001-01-01T00:00:00.000Z"
}
	InnerException None
	ErrorResponse 
{
    "error": {
        "message": "Activity Failed:\n{\n    \"error\": {\n        \"code\": \"UserError\",\n        \"message\": \"User program failed with Exception: Run failed, please check logs for details. You can check logs/readme.txt for the layout of logs.\",\n        \"messageParameters\": {},\n        \"detailsUri\": \"https://aka.ms/azureml-run-troubleshooting\",\n        \"details\": []\n    },\n    \"time\": \"0001-01-01T00:00:00.000Z\"\n}"
    }
}

In [None]:
# Publish the pipeline behind the submitted run and show its REST endpoint
published_pipeline = pipeline_run.publish_pipeline(
    name='leads-batch-pipeline',
    description='Batch scoring of leads data from UCI',
    version='1.0',
)
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

### Test the Pipeline

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails
import requests

# Obtain an authentication header through interactive login
interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()

# Trigger the published pipeline through its REST endpoint
rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, headers=auth_header, json={"ExperimentName": "leads-batch-pipeline"})
# Surface a rejected request as an HTTP error here, rather than as an opaque
# KeyError / JSON decoding error when the run id is read from the body
response.raise_for_status()
run_id = response.json()["Id"]

# Attach to the run that was just started and wait for it to finish
published_pipeline_run = PipelineRun(ws.experiments['leads-batch-pipeline'], run_id)
published_pipeline_run.wait_for_completion(show_output=True)

In [None]:
# Scratch notes kept as an inert string literal (nothing here executes).
# NOTE(review): these look like candidate .addGrid(...) entries for a GBT
# ParamGridBuilder -- confirm whether they are still needed or can be removed.
'''
.addGrid(gbt.maxDepth,[5,10])
.addGrid(gbt.maxIter,[10,50])
.addGrid(gbt.maxBins,[16,32])
.addGrid(gbt.stepSize,[0.05,0.1])
'''