## MLOps with Azure ML Pipelines

ML Pipeline - Write Scripts to folder

In [19]:
# Notebook-wide configuration values.

# Environment name; the conda spec written further down uses the same name.
registered_env_name = 'experiment_env'
# Local folder that the pipeline step scripts in this notebook are written to.
experiment_folder = 'devOps_train_pipeline'
# Dataset name prefix -- not referenced in the cells visible here (presumably
# used when the pipeline itself is built; verify against the later cells).
dataset_prefix_name = 'exp'
# Name of the AML compute cluster that is looked up (or created) below.
cluster_name = 'mm-cluster'

Import required packages

In [20]:
# Import required packages
from azureml.core import Workspace, Experiment, Datastore, Environment, Dataset
from azureml.core.compute import ComputeTarget, AmlCompute, DataFactoryCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_CPU_IMAGE
from azureml.pipeline.core import Pipeline, PipelineParameter, PipelineData
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import PipelineParameter, PipelineData
from azureml.data.output_dataset_config import OutputTabularDatasetConfig, OutputDatasetConfig, OutputFileDatasetConfig
from azureml.data.datapath import DataPath
from azureml.data.data_reference import DataReference
from azureml.data.sql_data_reference import SqlDataReference
from azureml.pipeline.steps import DataTransferStep
import logging

In [21]:
# Connect to the AML workspace described by the local config file
ws = Workspace.from_config()

# Get the default datastore of that workspace
default_ds = ws.get_default_datastore()

# Select the AML compute cluster: reuse it when it already exists, otherwise
# provision a new one. (ComputeTarget, AmlCompute and ComputeTargetException
# come from the notebook's import cell.)
try:
    # Check for existing compute target
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
    try:
        pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        pipeline_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        # Show the provisioning error, then re-raise it. Swallowing it here
        # would leave `pipeline_cluster` unbound (or half-provisioned), and
        # later cells would fail with a much less helpful error.
        print(ex)
        raise

Found existing cluster, use it.


In [22]:
import os

# Make sure the folder that receives the generated pipeline step files exists
# (an already existing folder is not an error), then echo its name.
os.makedirs(name=experiment_folder, exist_ok=True)
print(experiment_folder)

devOps_train_pipeline


In [23]:
# Path of the conda specification, placed inside the experiment folder.
# (Alternative location that was used before: '../configuration/environment.yml')
conda_yml_file = f'./{experiment_folder}/environment.yml'

In [24]:
%%writefile $conda_yml_file
# Conda specification for the environment named "experiment_env".
name: experiment_env
dependencies:
# Conda-installed packages.
# NOTE(review): the interpreter is pinned to 3.6.2 while no other package is
# pinned -- confirm this combination still resolves with the SDK in use.
- python=3.6.2
- scikit-learn
- ipykernel
- matplotlib
- pandas
- pip
# Packages below are installed through pip inside the conda environment.
- pip:
  - azureml-defaults
  - pyarrow
  - azureml-monitoring
  - azureml-interpret
  - inference-schema
  - joblib
  - azure-ml-api-sdk

Overwriting ./devOps_train_pipeline/environment.yml


In [25]:
%%writefile ./$experiment_folder/get_data.py

from azureml.core import Run, Workspace, Datastore, Dataset
from azureml.data.datapath import DataPath
import pandas as pd
import os
import argparse
from sklearn import preprocessing
import numpy as np

# Parse input arguments.
# The explanatory text is passed as `description=`: the first positional
# parameter of ArgumentParser is `prog`, so passing the text positionally would
# replace the program name in the usage line instead of describing the script.
parser = argparse.ArgumentParser(description="Get data from and register in AML workspace")
parser.add_argument('--exp_raw_data', dest='exp_raw_data', required=True)

# parse_known_args tolerates extra command-line arguments this script does not declare
args, _ = parser.parse_known_args()
exp_raw_dataset = args.exp_raw_data

# Get current run
current_run = Run.get_context()

# Get associated AML workspace
ws = current_run.experiment.workspace

# Connect to default data store
ds = ws.get_default_datastore()

# Build a tabular dataset over every CSV under 'diabetes-data/' in that datastore
tab_data_set = Dataset.Tabular.from_delimited_files(path=(ds, 'diabetes-data/*.csv'))

raw_df = tab_data_set.to_pandas_dataframe()

# Make the output directory (the path handed in via --exp_raw_data)
os.makedirs(exp_raw_dataset, exist_ok=True)

# Write the raw data as a single CSV into the output directory.
# NOTE(review): the original comment says this "will allow us to register the
# dataset on completion" -- the registration itself is not visible in this
# script; presumably it is configured on the step's output. Confirm.
raw_df.to_csv(os.path.join(exp_raw_dataset, 'exp_raw_data.csv'), index=False)

Overwriting ./devOps_train_pipeline/get_data.py


In [26]:
%%writefile ./$experiment_folder/split.py

from azureml.core import Run, Workspace, Datastore, Dataset
from azureml.data.datapath import DataPath
import os
import argparse

import pandas as pd
import numpy as np
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import joblib
from numpy.random import seed

# Parse input arguments.
# The text is passed as `description=`; the first positional parameter of
# ArgumentParser is `prog`, which would otherwise replace the program name.
parser = argparse.ArgumentParser(description="Split raw data into train/test and scale appropriately")

parser.add_argument('--exp_training_data', dest='exp_training_data', required=True)
parser.add_argument('--exp_testing_data', dest='exp_testing_data', required=True)

# parse_known_args tolerates extra command-line arguments this script does not declare
args, _ = parser.parse_known_args()
exp_training_data = args.exp_training_data
exp_testing_data = args.exp_testing_data

# Get current run
current_run = Run.get_context()

# Get associated AML workspace
ws = current_run.experiment.workspace

# Read the named input dataset of this run into a pandas dataframe
raw_dataset = current_run.input_datasets['Exp_Raw_Data']
raw_df = raw_dataset.to_pandas_dataframe()

# For every column that contains at least one missing value, add a boolean
# indicator column 'quality_<col>_ismissing' flagging the affected rows.
for col in raw_df.columns:
    missing = raw_df[col].isnull()
    num_missing = np.sum(missing)
    if num_missing > 0:
        raw_df['quality_{}_ismissing'.format(col)] = missing

print(raw_df.columns)

# Split data into training set (70%) and test set (30%); the fixed
# random_state keeps the split reproducible between runs.
df_train, df_test = train_test_split(raw_df, test_size=0.3, random_state=0)

# Save the two distinct splits: the training rows go to the training output
# folder and the held-out rows go to the testing output folder.
os.makedirs(exp_training_data, exist_ok=True)
os.makedirs(exp_testing_data, exist_ok=True)

df_train.to_csv(os.path.join(exp_training_data, 'exp_training_data.csv'), index=False)
df_test.to_csv(os.path.join(exp_testing_data, 'exp_testing_data.csv'), index=False)


Overwriting ./devOps_train_pipeline/split.py


In [27]:
%%writefile ./$experiment_folder/train.py

from azureml.core import Run, Workspace, Datastore, Dataset
from azureml.data.datapath import DataPath
import os
import argparse
import shutil

import pandas as pd
import numpy as np
from sklearn import preprocessing
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve


import matplotlib.pyplot as plt
import joblib
from numpy.random import seed


# Parse input arguments.
# The text is passed as `description=`; the first positional parameter of
# ArgumentParser is `prog`, which would otherwise replace the program name in
# the usage/help output.
parser = argparse.ArgumentParser(description="Train Logistic Regression model")
parser.add_argument('--model_file_output', dest='model_file_output', required=True)

# parse_known_args tolerates extra command-line arguments this script does not declare
args, _ = parser.parse_known_args()
model_file_output = args.model_file_output

def converttypes(df):
    """Coerce every column of ``df`` to a numeric dtype, in place.

    Values that cannot be parsed as numbers become NaN (``errors='coerce'``).
    The resulting dtypes are printed, and the same (mutated) dataframe is
    returned so the call can be used in an assignment.
    """
    for column_name in df.columns:
        numeric_values = pd.to_numeric(df[column_name], errors='coerce')
        df[column_name] = numeric_values

    print('data types')
    print(df.dtypes)
    return df


# Get current run
run = Run.get_context()

# Get associated AML workspace
ws = run.experiment.workspace

# Feature / label columns, defined once so the train and test selections below
# cannot drift apart.
feature_columns = ['Pregnancies', 'PlasmaGlucose', 'DiastolicBloodPressure', 'TricepsThickness',
                   'SerumInsulin', 'BMI', 'DiabetesPedigree', 'Age']
label_column = 'Diabetic'

# Read the named input datasets of this run into pandas dataframes
X_train_dataset = run.input_datasets['Exp_Training_Data'].to_pandas_dataframe()
X_test_dataset = run.input_datasets['Exp_Testing_Data'].to_pandas_dataframe()

# Coerce all columns to numeric (unparseable values become NaN)
X_train_dataset = converttypes(X_train_dataset)
X_test_dataset = converttypes(X_test_dataset)

print(type(X_train_dataset))

# Separate features and labels
X_train, y_train = X_train_dataset[feature_columns].values, X_train_dataset[label_column].values
X_test, y_test = X_test_dataset[feature_columns].values, X_test_dataset[label_column].values

print('***********')
print(type(X_train))
print(type(X_test))
print(type(y_train[0]))
print(type(y_test[0]))
print('**********')

# Set regularization hyperparameter
reg = 0.01

# Train a logistic regression model.
# Metrics are logged with the builtin float(): `np.float` was only an alias for
# it, was deprecated in NumPy 1.20 and removed in NumPy 1.24.
print('Training a logistic regression model with regularization rate of', reg)
run.log('Regularization Rate', float(reg))
model = LogisticRegression(C=1/reg, solver="liblinear").fit(X_train, y_train)

# calculate accuracy (fraction of test rows predicted correctly)
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', float(acc))

print('y_hat[0] is if type=:' + str(type(y_hat[0])))

# calculate AUC from the predicted probability of the positive class (column 1)
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test, y_scores[:, 1])
print('AUC: ' + str(auc))
run.log('AUC', float(auc))

# Also log both metrics on the parent run so that other steps can read them
# through run.parent.get_metrics().
run.parent.log(name='AUC', value=float(auc))
run.parent.log(name='Accuracy', value=float(acc))

# Save the trained model in the outputs folder
os.makedirs('./outputs', exist_ok=True)
joblib.dump(value=model, filename='./outputs/diabetes_model_remote.pkl')

# Copy the model file into the location given by --model_file_output
os.makedirs(model_file_output, exist_ok=True)

shutil.copyfile('./outputs/diabetes_model_remote.pkl', os.path.join(model_file_output, 'diabetes_model_remote.pkl'))


Overwriting ./devOps_train_pipeline/train.py


In [28]:
%%writefile ./$experiment_folder/evaluate_and_register.py

from azureml.core import Run, Workspace, Datastore, Dataset
from azureml.core.model import Model
from azureml.data.datapath import DataPath

import joblib
import os
import argparse
import shutil
import pandas as pd

from interpret.ext.blackbox import TabularExplainer
from azureml.interpret import ExplanationClient
from azureml.interpret.scoring.scoring_explainer import LinearScoringExplainer, save

from azureml.core.model import InferenceConfig
from azureml.core.compute import ComputeTarget, AksCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.webservice import Webservice, AksWebservice


# Parse input arguments.
# The text is passed as `description=`; the first positional parameter of
# ArgumentParser is `prog`, which would otherwise replace the program name in
# the usage/help output.
parser = argparse.ArgumentParser(description="Evaluate model and register if more performant")

parser.add_argument('--model_file', type=str, required=True)
parser.add_argument('--deploy_file_output', type=str, help='File passing in pipeline to deploy')

# parse_known_args tolerates extra command-line arguments this script does not declare
args, _ = parser.parse_known_args()

# --deploy_file_output is optional, so deploy_file is None when it is not supplied
deploy_file = args.deploy_file_output
model_file = args.model_file

def converttypes(df):
    """Convert all columns of ``df`` to numeric values and return ``df``.

    The dataframe is modified in place; entries that are not valid numbers are
    replaced by NaN (``errors='coerce'``). The dtypes after conversion are
    printed for inspection in the run log.
    """
    for name in df.columns:
        converted = pd.to_numeric(df[name], errors='coerce')
        df[name] = converted

    print('data types')
    print(df.dtypes)
    return df

def model_explain():
    """Compute a global explanation for the latest registered model and upload it.

    Takes no arguments and returns nothing. It relies on module-level names
    that this script assigns outside the function: ``run``, ``ws``,
    ``model_name`` and ``model_reg`` (the latter only exists after a model has
    been registered, so call this function only after a registration).
    """
    # Single definition of the feature columns, shared by the data selection
    # and by the explainer so the two cannot get out of sync.
    feature_names = ['Pregnancies', 'PlasmaGlucose', 'DiastolicBloodPressure', 'TricepsThickness',
                     'SerumInsulin', 'BMI', 'DiabetesPedigree', 'Age']

    # load training and test data
    X_train_dataset = run.input_datasets['Exp_Training_Data'].to_pandas_dataframe()
    X_test_dataset = run.input_datasets['Exp_Testing_Data'].to_pandas_dataframe()

    X_test_dataset = converttypes(X_test_dataset)
    X_train_dataset = converttypes(X_train_dataset)

    # Only the feature matrices are needed for the explanation; the labels are not used.
    X_train = X_train_dataset[feature_names].values
    X_test = X_test_dataset[feature_names].values

    # load the latest registered version of the model from the workspace
    model_list = Model.list(ws, name=model_name, latest=True)
    model_path = model_list[0].download(exist_ok=True)
    model = joblib.load(model_path)

    #https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/explain-model/azure-integration/scoring-time/train_explain.py
    # create an explanation client to store the explanation (contrib API)
    client = ExplanationClient.from_run(run)

    # create an explainer to validate or debug the model
    tabular_explainer = TabularExplainer(model,
                                         initialization_examples=X_train,
                                         features=feature_names,
                                         classes=[0, 1])

    # explain overall model predictions (global explanation)
    # passing in test dataset for evaluation examples - note it must be a representative sample of the original data
    # more data (e.g. x_train) will likely lead to higher accuracy, but at a time cost
    global_explanation = tabular_explainer.explain_global(X_test)

    # uploading model explanation data for storage or visualization
    comment = 'Global explanation on classification model trained'
    client.upload_model_explanation(global_explanation, comment=comment, model_id=model_reg.id)



def _write_deploy_flag(path, flag):
    """Write the deployment decision ``flag`` to ``path`` for the next step.

    ``path`` comes from the optional --deploy_file_output argument; when it was
    not supplied (None) the flag is only reported in the log.
    """
    if path is None:
        print('No deploy file path supplied; deployment decision: ' + flag)
        return
    with open(path, 'w+') as f:
        f.write(flag)


# Get current run
run = Run.get_context()

# Get associated AML workspace
ws = run.experiment.workspace

# Get default datastore
ds = ws.get_default_datastore()

# Get metrics associated with the current (step) run
metrics = run.get_metrics()

print('current run metrics')
for key in metrics.keys():
        print(key, metrics.get(key))
print('\n')

print('parent run metrics')
# Get metrics associated with the parent run; the comparison below reads
# 'AUC' and 'Accuracy' from these.
metrics = run.parent.get_metrics()

for key in metrics.keys():
        print(key, metrics.get(key))
print('\n')

current_model_AUC = float(metrics['AUC'])
current_model_accuracy = float(metrics['Accuracy'])

# Get current model from workspace
model_name = 'diabetes_model_remote'
model_description = 'Diabetes model remote'
model_list = Model.list(ws, name=model_name, latest=True)
first_registration = len(model_list)==0

updated_tags = {'AUC': current_model_AUC}
# Same properties for every registration, so a first registration also records Accuracy
model_properties = {'AUC': current_model_AUC, 'Accuracy': current_model_accuracy}

print('updated tags')
print(updated_tags)

# Upload the training outputs to the run under 'outputs' so that the model
# file can be registered from a run-relative path.
relative_model_path = 'outputs'
run.upload_folder(name=relative_model_path, path=model_file)

registered_model_path = 'outputs/diabetes_model_remote.pkl'

# If no model exists register the current model
if first_registration:
    print('First model registration.')
    model_reg = run.register_model(model_path=registered_model_path, model_name=model_name,
                   tags=updated_tags,
                   properties=model_properties)

    #model_explain()

    # A first model is flagged for deployment exactly like an improved model;
    # without this the deploy flag file would never be written on the first run.
    _write_deploy_flag(deploy_file, 'deploy')
else:
    # If a model has been registered previously, check to see if current model
    # performs better. If so, register it.
    if float(model_list[0].tags['AUC']) < current_model_AUC:
        print('New model performs better than existing model. Register it.')

        model_reg = run.register_model(model_path=registered_model_path, model_name=model_name,
                   tags=updated_tags,
                   properties=model_properties)

        #model_explain()

        # Record the deployment decision for the next step
        _write_deploy_flag(deploy_file, 'deploy')

    else:
        print('New model does not perform better than existing model. Cancel run.')

        _write_deploy_flag(deploy_file, 'no deployment')

        # Cancel this run because the new model is not an improvement
        run.cancel()

Overwriting ./devOps_train_pipeline/evaluate_and_register.py


In [29]:
%%writefile ./$experiment_folder/deployACI.py

import argparse
from azureml.core import Workspace, Environment
from azureml.core.model import Model
from azureml.core.run import Run
from azureml.core.model import InferenceConfig
from azureml.core.webservice import Webservice, AciWebservice
from azureml.exceptions import WebserviceException

# Command-line arguments of the deployment step
parser = argparse.ArgumentParser(description='Deploy arg parser')
parser.add_argument('--scoring_file_output', type=str, help='File storing the scoring url')
parser.add_argument('--deploy_file', type=str, help='File storing if model should be deployed')
parser.add_argument('--environment_name', type=str,help='Environment name')
parser.add_argument('--service_name', type=str,help='service name')
parser.add_argument('--model_name', type=str,help='model name')

args = parser.parse_args()
scoring_url_file = args.scoring_file_output
# NOTE(review): deploy_file is parsed but never read below, so the
# 'deploy' / 'no deployment' content written by the evaluation step does not
# influence this script -- confirm that this is intended.
deploy_file      = args.deploy_file
environment_name = args.environment_name
service_name     = args.service_name
model_name       = args.model_name

run = Run.get_context()

# Get associated AML workspace
ws = run.experiment.workspace

# Look up the registered model and environment by name and point the
# inference configuration at the scoring script.
model = Model(ws, model_name)
env = Environment.get(ws, environment_name)
inference_config = InferenceConfig(entry_script='score.py', environment=env)

# ACI deployment settings
aci_config = AciWebservice.deploy_configuration(
            cpu_cores = 1,
            memory_gb = 2,
            tags = {'model': 'diabetes remote training'},
            auth_enabled=True,
            enable_app_insights=True,
            collect_model_data=True)

# Remove a previously deployed service with the same name, if there is one.
# A WebserviceException here is treated as "nothing to delete": it is reported
# (instead of printing a blank line) and the deployment continues.
try:
    Webservice(ws, name=service_name).delete()
    print('Deleted existing service: {}'.format(service_name))
except WebserviceException as ex:
    print('No existing service removed for {}: {}'.format(service_name, ex))

# Deploy model and wait for the deployment to finish (showing output)
service = Model.deploy(ws, service_name, [model], inference_config, aci_config)
service.wait_for_deployment(True)

# Output scoring url
print(service.scoring_uri)
with open(scoring_url_file, 'w+') as f:
    f.write(service.scoring_uri)



Overwriting ./devOps_train_pipeline/deployACI.py


In [30]:
%%writefile ./$experiment_folder/score.py

import json
import joblib
import numpy as np
from azureml.core.model import Model
from azureml.monitoring import ModelDataCollector
import time
import os
import pandas as pd


#version 2
# Called when the service is loaded
def init():
    """Load the model and create the data collectors when the service starts.

    Sets three module-level globals that ``run`` relies on: ``model`` (the
    deserialized estimator), ``inputs_dc`` and ``prediction_dc`` (collectors
    for the request inputs and for the returned predictions).
    """
    global model, inputs_dc, prediction_dc

    # Print statement for appinsights custom traces:
    print ("model initialized" + time.strftime("%H:%M:%S"))

    # Resolve the location of the deployed model file and load it
    model_file_path = Model.get_model_path('diabetes_model_remote')
    print(model_file_path)
    model = joblib.load(model_file_path)

    input_feature_names = ['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness',
                           'SerumInsulin','BMI','DiabetesPedigree','Age']
    inputs_dc = ModelDataCollector("best_model", designation="inputs", feature_names=input_feature_names)
    prediction_dc = ModelDataCollector("best_model", designation="predictions", feature_names=["Diabetic"])



# Called when a request is received
def run(raw_data):
    """Score one request and return the predicted class names as a JSON string.

    ``raw_data`` is a JSON document whose ``'data'`` entry is passed to
    ``model.predict``. Each prediction is converted to ``int`` and mapped to
    'not-diabetic' (0) or 'diabetic' (1). Inputs and predicted class names are
    handed to the module-level collectors created in ``init``.

    Any exception is caught: its message is printed (followed by the current
    time) and returned as a plain string instead of the JSON result.
    """
    # Index = predicted class (0 or 1)
    class_labels = ['not-diabetic', 'diabetic']
    try:
        request = json.loads(raw_data)

        # Get a prediction from the model
        raw_predictions = model.predict(request['data'])
        print ("Prediction created" + time.strftime("%H:%M:%S"))

        # Get the corresponding classname for each prediction
        predicted_classes = [class_labels[int(value)] for value in raw_predictions]

        # Log the input and output data to appinsights:
        print(json.dumps({"input": raw_data, "output": predicted_classes}))

        # Hand inputs and predictions to the data collectors
        inputs_dc.collect(request['data'])
        prediction_dc.collect(predicted_classes)

        # Return the predictions as JSON
        return json.dumps(predicted_classes)
    except Exception as e:
        message = str(e)
        print (message + time.strftime("%H:%M:%S"))
        return message

Overwriting ./devOps_train_pipeline/score.py
