The MLOps pipeline in Azure DevOps will reuse the same pipeline steps that were created in the previous example.

In [5]:
# Import required packages and create the local folder that the %%writefile
# cells below write the pipeline step scripts into.
# `os` is imported here so this cell does not depend on an earlier cell's state.
import os

os.makedirs('./pipeline_step_scripts', exist_ok=True)

In [6]:
%%writefile ./pipeline_step_scripts/get_data.py

from azureml.core import Run, Workspace, Datastore, Dataset
from azureml.data.datapath import DataPath
import pandas as pd
import os
import argparse
from sklearn import preprocessing
import numpy as np

# --- Command-line arguments --------------------------------------------------
# --exp_raw_data: directory that receives the exported raw CSV.
# Unrecognized arguments are ignored (parse_known_args).
arg_parser = argparse.ArgumentParser("Get data from and register in AML workspace")
arg_parser.add_argument('--exp_raw_data', dest='exp_raw_data', required=True)
known_args, _unknown = arg_parser.parse_known_args()
output_dir = known_args.exp_raw_data

# --- Resolve the default datastore of the workspace that owns this run -------
active_run = Run.get_context()
workspace = active_run.experiment.workspace
default_store = workspace.get_default_datastore()

# --- Read every CSV matching diabetes-data/*.csv into a pandas dataframe -----
csv_location = (default_store, 'diabetes-data/*.csv')
diabetes_df = Dataset.Tabular.from_delimited_files(path=csv_location).to_pandas_dataframe()

# --- Write the dataframe as-is to a single CSV in the output directory -------
os.makedirs(output_dir, exist_ok=True)
output_csv = os.path.join(output_dir, 'exp_raw_data.csv')
diabetes_df.to_csv(output_csv, index=False)

Writing ./pipeline_step_scripts/get_data.py


In [7]:
%%writefile ./pipeline_step_scripts/split.py

from azureml.core import Run, Workspace, Datastore, Dataset
from azureml.data.datapath import DataPath
import os
import argparse

import pandas as pd
import numpy as np
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import joblib
from numpy.random import seed

# --- Command-line arguments --------------------------------------------------
# --exp_training_data / --exp_testing_data: output directories for the two splits.
# Unrecognized arguments are ignored (parse_known_args).
arg_parser = argparse.ArgumentParser("Split raw data into train/test and scale appropriately")
for option_name in ('exp_training_data', 'exp_testing_data'):
    arg_parser.add_argument('--' + option_name, dest=option_name, required=True)

known_args, _unknown = arg_parser.parse_known_args()
train_dir = known_args.exp_training_data
test_dir = known_args.exp_testing_data

# --- Run context and its workspace -------------------------------------------
active_run = Run.get_context()
workspace = active_run.experiment.workspace

# --- Load the named input 'Exp_Raw_Data' into a pandas dataframe -------------
raw_df = active_run.input_datasets['Exp_Raw_Data'].to_pandas_dataframe()

# --- Flag missing values ------------------------------------------------------
# For every column that contains at least one null, append a boolean indicator
# column named quality_<column>_ismissing. The nulls themselves are left in place.
for column_name in raw_df.columns:
    null_mask = raw_df[column_name].isnull()
    if null_mask.any():
        raw_df['quality_{}_ismissing'.format(column_name)] = null_mask

print(raw_df.columns)

# --- 70/30 train/test split with a fixed seed --------------------------------
train_df, test_df = train_test_split(raw_df, test_size=0.3, random_state=0)

# --- Persist each split as a CSV in its own output directory -----------------
for out_dir in (train_dir, test_dir):
    os.makedirs(out_dir, exist_ok=True)

train_df.to_csv(os.path.join(train_dir, 'exp_training_data.csv'), index=False)
test_df.to_csv(os.path.join(test_dir, 'exp_testing_data.csv'), index=False)



Writing ./pipeline_step_scripts/split.py


In [8]:
%%writefile ./pipeline_step_scripts/train.py

from azureml.core import Run, Workspace, Datastore, Dataset
from azureml.data.datapath import DataPath
import os
import argparse
import shutil

import pandas as pd
import numpy as np
from sklearn import preprocessing
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve


import matplotlib.pyplot as plt
import joblib
from numpy.random import seed


# Parse input arguments.
# --exp_trained_model_pipeline_data: directory that receives a copy of the trained model.
# Unrecognized arguments are ignored (parse_known_args).
parser = argparse.ArgumentParser("Train Logistic Regression model")
parser.add_argument('--exp_trained_model_pipeline_data', dest='exp_trained_model_pipeline_data', required=True)

args, _ = parser.parse_known_args()
exp_trained_model_pipeline_data = args.exp_trained_model_pipeline_data

# Get current run
run = Run.get_context()

# Get associated AML workspace
ws = run.experiment.workspace

# Read the named input datasets into pandas dataframes
train_df = run.input_datasets['Exp_Training_Data'].to_pandas_dataframe()
test_df = run.input_datasets['Exp_Testing_Data'].to_pandas_dataframe()

print(type(train_df))

# Columns used as model inputs and the column holding the label.
# Defined once so the train and test selections cannot drift apart.
FEATURE_COLUMNS = ['Pregnancies', 'PlasmaGlucose', 'DiastolicBloodPressure', 'TricepsThickness',
                   'SerumInsulin', 'BMI', 'DiabetesPedigree', 'Age']
LABEL_COLUMN = 'Diabetic'

# Separate features and labels
X_train, y_train = train_df[FEATURE_COLUMNS].values, train_df[LABEL_COLUMN].values
X_test, y_test = test_df[FEATURE_COLUMNS].values, test_df[LABEL_COLUMN].values

# Set regularization hyperparameter
reg = 0.01

# Train a logistic regression model (C is passed as the inverse of the regularization rate).
# Metrics are converted with the built-in float(): the np.float alias was deprecated in
# NumPy 1.20 and removed in NumPy 1.24, where it raises AttributeError.
print('Training a logistic regression model with regularization rate of', reg)
run.log('Regularization Rate', float(reg))
model = LogisticRegression(C=1/reg, solver="liblinear").fit(X_train, y_train)

# Calculate accuracy on the test set
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', float(acc))

# Calculate AUC from the predicted probability of the positive class
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test, y_scores[:, 1])
print('AUC: ' + str(auc))
run.log('AUC', float(auc))

# Mirror the metrics onto the parent run as well
run.parent.log(name='AUC', value=float(auc))
run.parent.log(name='Accuracy', value=float(acc))

# Save the trained model in the outputs folder
MODEL_FILE_NAME = 'diabetes_model.pkl'
local_model_path = os.path.join('./outputs', MODEL_FILE_NAME)
os.makedirs('./outputs', exist_ok=True)
joblib.dump(value=model, filename=local_model_path)

# Copy the model file into the directory given by --exp_trained_model_pipeline_data
os.makedirs(exp_trained_model_pipeline_data, exist_ok=True)
shutil.copyfile(local_model_path, os.path.join(exp_trained_model_pipeline_data, MODEL_FILE_NAME))



Writing ./pipeline_step_scripts/train.py


In [9]:
%%writefile ./pipeline_step_scripts/evaluate_and_register.py

from azureml.core import Run, Workspace, Datastore, Dataset
from azureml.core.model import Model
from azureml.data.datapath import DataPath

import os
import argparse
import shutil

# Parse input arguments.
# --exp_trained_model_pipeline_data: directory containing the trained model files to register.
# Unrecognized arguments are ignored (parse_known_args).
parser = argparse.ArgumentParser("Evaluate model and register if more performant")
parser.add_argument('--exp_trained_model_pipeline_data', type=str, required=True)

args, _ = parser.parse_known_args()
exp_trained_model_pipeline_data = args.exp_trained_model_pipeline_data

# Get current run
run = Run.get_context()

# Get associated AML workspace
ws = run.experiment.workspace

# Get default datastore
ds = ws.get_default_datastore()

# Print the metrics logged on the current (step) run
print('current run metrics')
for key, value in run.get_metrics().items():
    print(key, value)
print('\n')

# Print the metrics logged on the parent run; these drive the registration decision
print('parent run metrics')
metrics = run.parent.get_metrics()
for key, value in metrics.items():
    print(key, value)
print('\n')

# NOTE(review): assumes 'AUC' and 'Accuracy' each hold a single scalar on the
# parent run; float() fails if either is missing or is not a scalar - confirm.
current_model_AUC = float(metrics['AUC'])
current_model_accuracy = float(metrics['Accuracy'])

# Look up the latest registered model with this name (empty list if none exists)
model_name = 'diabetes_model'
model_description = 'Diabetes model'
model_list = Model.list(ws, name=model_name, latest=True)
first_registration = len(model_list) == 0

updated_tags = {'AUC': current_model_AUC}

print('updated tags')
print(updated_tags)

# Upload the trained model files to the run under a relative path for registration
relative_model_path = 'model_files'
run.upload_folder(name=relative_model_path, path=exp_trained_model_pipeline_data)


def register_current_model():
    """Register the uploaded model files under `model_name`.

    Every registration records the same metadata: the AUC tag plus both AUC
    and Accuracy as properties, so the first and later versions are consistent.
    Returns whatever run.register_model returns.
    """
    return run.register_model(model_name=model_name,
                              model_path=relative_model_path,
                              description=model_description,
                              tags=updated_tags,
                              properties={'AUC': current_model_AUC,
                                          'Accuracy': current_model_accuracy})


if first_registration:
    # If no model exists register the current model
    print('First model registration.')
    register_current_model()
else:
    # If a model has been registered previously, register the current model only
    # if it performs better. A previous model without an AUC tag offers no
    # baseline to compare against, so the current model is registered instead of
    # failing with a KeyError.
    existing_auc = (model_list[0].tags or {}).get('AUC')
    if existing_auc is None:
        print('Existing model has no AUC tag to compare against. Register new model.')
        register_current_model()
    elif float(existing_auc) < current_model_AUC:
        print('New model performs better than existing model. Register it.')
        register_current_model()
    else:
        print('New model does not perform better than existing model. Cancel run.')
        run.cancel()

Writing ./pipeline_step_scripts/evaluate_and_register.py
