Simple pipeline to preprocess data and train a model to predict diabetes in patients. Uses Azure Machine Learning.

In [21]:
# Connect to workspace
import azureml.core
from azureml.core import Workspace 

ws = Workspace.from_config()
print(f"Using Azure ML {azureml.core.VERSION, ws.name}")

Using Azure ML ('1.47.0', 'mguo-projs')


In [23]:
# Prepare dataset of diabetes patients
from azureml.core import Dataset

default_ds = ws.get_default_datastore() 

if 'diabetes dataset' not in ws.datasets:
    default_ds.upload_files(
        files=['./data/diabetes.csv', './data/diabetes2.csv'],
        target_path = 'diabetes-data',
        overwrite=True,
        show_progress=True)

    # Create tabular dataset from datastore path
    tabular_dataset = Dataset.Tabular.from_delimited_files(path=(default_ds, 'diabetes-data/*.csv'))

    # Register dataset
    try: 
        tabular_dataset = tabular_dataset.register(workspace=ws, 
                                        name='diabetes dataset',
                                        description='diabetes data',
                                        tags = {'format': 'CSV'},
                                        create_new_version=True)
        print('Dataset registered.')
    except Exception as ex: 
        print(ex)
else: 
    print('Dataset already registered.')

Dataset already registered.


In [24]:
# Create scripts for pipeline
# Create folder for pipeline step files
import os 
experiment_folder = 'diabetes_pipeline'
os.makedirs(experiment_folder, exist_ok=True)
print(experiment_folder)

diabetes_pipeline


In [25]:
%%writefile $experiment_folder/prep_diabetes.py

# Script 1 reads from the diabetes dataset and preprocesses it
#Import libraries
import os 
import argparse
import pandas as pd 
from azureml.core import Run 
from sklearn.preprocessing import MinMaxScaler

# Get params
parser = argparse.ArgumentParser() 
parser.add_argument('--input-data', type=str, dest='raw_dataset_id', help='Raw dataset')
parser.add_argument('--prepped-data', type=str, dest='prepped_data', default='prepped_data', help='Folder for results')
args = parser.parse_args()
save_folder = args.prepped_data

# Get experiment run context
run = Run.get_context()

# Load the data that was passed as an input dataset
print("Loading...")
diabetes = run.input_datasets['raw_data'].to_pandas_dataframe()

# Log initial row count
row_count = len(diabetes)
run.log('raw_rows', row_count)

# Remove null values
diabetes = diabetes.dropna() 

# Normalize numeric columns
scaler = MinMaxScaler()
num_cols = ['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree']
diabetes[num_cols] = scaler.fit_transform(diabetes[num_cols])

# Log newly processed rows 
row_count = len(diabetes)
run.log('processed_rows', row_count)

# Save data
print('Saving...')
os.makedirs(save_folder, exist_ok=True)
save_path = os.path.join(save_folder, 'data.csv')
diabetes.to_csv(save_path, index=False, header=True)

# End run
run.complete()

Overwriting diabetes_pipeline/prep_diabetes.py


In [26]:
%%writefile $experiment_folder/train_diabetes.py

# Script Step 2 trains the model
# Import libraries
from azureml.core import Run, Model 
import argparse
import pandas as pd 
import numpy as np
import os
import joblib
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score, roc_curve 
import matplotlib.pyplot as plt

# Get params
parser = argparse.ArgumenetParser()
# --traning-folder references the folder where the prepped data was saved
parser.add_argument('--training-folder', type=str, dest='training_folder', help='training data folder')
args = parser.parse_args()
training_folder = args.training_folder 

# Get experiment run context
run = Run.get_context() 

# Load the prepped data file in the training folder
file_path = os.path.join(training_folder, 'data.csv')
diabetes = pd.read_csv(file_path)

# Separate labels from features
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# Split data into training and validation sets 
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=0)

# Train decision tree model
model = DecisionTreeClassifier().fit(X_train, y_train)

# Determine metrics
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy: ', acc)
run.log('Accuracy', np.float(acc)) # accuracy

y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test, y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', np.float(auc)) # AUC

fpr, tpr, thresholds = roc_curve(y_test, y_scores[:,1])
fig = plt.figure(figsize=(6,4))
plt.plot([0,1], [0,1], 'k--')
plt.plot(fpr, tpr)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.log_image(name='ROC', plot=fig)
plt.show() 

# Save trained model to outputs folder
os.makedirs('outputs', exist_ok=True)
model_file = os.path.join('outputs', 'diabetes_model.pkl')
joblib.dump(value=model, filename=model_file)

# Register model 
Model.register(workspace=run.experiment.workspace, 
                model_path = model_file, 
                model_name = 'diabetes_model',
                tags={'Training context' : 'Pipeline'}, 
                properties={'AUC' : np.float(auc), 'Accuracy' : np.float(acc)})

# End run
run.complete()

Overwriting diabetes_pipeline/train_diabetes.py


In [27]:
# Prepare compute environment for pipeline steps 

from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException 

cluster_name = 'mguo1'

try: 
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster')
except ComputeTargetException: 
    try: 
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        pipeline_cluster.wait_for_completion(show_output=True)
    except Exception as ex: 
        print(ex)

Found existing cluster


In [28]:
# Create run configuration 

from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration

# Set up python env 
diabetes_env = Environment('diabetes-pipeline-env')
diabetes_env.python.user_managed_dependencies = False; 
diabetes_env.docker.enabled = True

# Create package dependencies
diabetes_packages = CondaDependencies.create(conda_packages=['scikit-learn','ipykernel','matplotlib','pandas','pip'],
                                             pip_packages=['azureml-defaults','azureml-dataprep[pandas]','pyarrow'])

diabetes_env.python.conda_dependencies = diabetes_packages

# Register python environment 
diabetes_env.register(workspace=ws)
registered_env = Environment.get(ws, 'diabetes-pipeline-env')

# New runconfig object created 
pipeline_run_config = RunConfiguration()
pipeline_run_config.target = pipeline_cluster

# Link environment to run configuration
pipeline_run_config.environment = registered_env

print('Run config created.')

'enabled' is deprecated. Please use the azureml.core.runconfig.DockerConfiguration object with the 'use_docker' param instead.


Run config created.


In [33]:
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep

# Get training data
diabetes_ds = ws.datasets.get("diabetes dataset")

# Create a PipelineData for model folder
prepped_data_folder = PipelineData("prepped_data_folder", datastore=ws.get_default_datastore())

# 1) Run the data prep script
train_step = PythonScriptStep(name = "Prepare Data",
                                source_directory = experiment_folder,
                                script_name = "prep_diabetes.py",
                                arguments = ['--input-data', diabetes_ds.as_named_input('raw_data'),
                                             '--prepped-data', prepped_data_folder],
                                outputs=[prepped_data_folder],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

# 2) run the training script
register_step = PythonScriptStep(name = "Train and Register Model",
                                source_directory = experiment_folder,
                                script_name = "train_diabetes.py",
                                arguments = ['--training-folder', prepped_data_folder],
                                inputs=[prepped_data_folder],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

print("Pipeline steps defined")

Pipeline steps defined


OK, you're ready build the pipeline from the steps you've defined and run it as an experiment.

In [34]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Construct pipeline
pipeline_steps = [train_step, register_step]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
print("Pipeline is built.")

# Create experiment and run pipeline
experiment = Experiment(workspace=ws, name = 'diabetes-training-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")
RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion(show_output=True)

Pipeline is built.
Created step Prepare Data [c5256e7b][f4909614-5260-4936-943f-dcac3148c132], (This step will run and generate new outputs)
Created step Train and Register Model [73b520a9][c6e2fd37-a4a3-4331-92f6-f5abe9b60d54], (This step will run and generate new outputs)
Submitted PipelineRun b7cac11d-3e54-490d-bbd3-9d0663835dad
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/b7cac11d-3e54-490d-bbd3-9d0663835dad?wsid=/subscriptions/5c021c60-0721-4640-b99d-eb46315540a5/resourcegroups/mguo-learn/workspaces/mguo-projs&tid=723a5a87-f39a-4a22-9247-3fc240c01396
Pipeline submitted for execution.


_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …

PipelineRunId: b7cac11d-3e54-490d-bbd3-9d0663835dad
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/b7cac11d-3e54-490d-bbd3-9d0663835dad?wsid=/subscriptions/5c021c60-0721-4640-b99d-eb46315540a5/resourcegroups/mguo-learn/workspaces/mguo-projs&tid=723a5a87-f39a-4a22-9247-3fc240c01396
PipelineRun Status: NotStarted
PipelineRun Status: Running


StepRunId: 54603402-c702-4db9-ac7b-3cb70d6fce23
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/54603402-c702-4db9-ac7b-3cb70d6fce23?wsid=/subscriptions/5c021c60-0721-4640-b99d-eb46315540a5/resourcegroups/mguo-learn/workspaces/mguo-projs&tid=723a5a87-f39a-4a22-9247-3fc240c01396
StepRun( Prepare Data ) Status: NotStarted
StepRun( Prepare Data ) Status: Running


In [12]:
from azureml.core import Model

for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name in model.tags:
        tag = model.tags[tag_name]
        print ('\t',tag_name, ':', tag)
    for prop_name in model.properties:
        prop = model.properties[prop_name]
        print ('\t',prop_name, ':', prop)
    print('\n')

diabetes_model version: 1
	 Training context : Pipeline
	 AUC : 0.8832685513461764
	 Accuracy : 0.8986666666666666




In [13]:
# Publish pipeline from run
published_pipeline = pipeline_run.publish_pipeline(
    name="Diabetes_Training_Pipeline", description="Trains diabetes model", version="1.0")

published_pipeline

Name,Id,Status,Endpoint
Diabetes_Training_Pipeline,2f5f69d4-5a0b-4e0f-bbff-c6a6b3e03175,Active,REST Endpoint


In [14]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

https://canadaeast.api.azureml.ms/pipelines/v1.0/subscriptions/5c021c60-0721-4640-b99d-eb46315540a5/resourceGroups/mguo-learn/providers/Microsoft.MachineLearningServices/workspaces/mguo-projs/PipelineRuns/PipelineSubmit/2f5f69d4-5a0b-4e0f-bbff-c6a6b3e03175


In [15]:
# Call pipeline endpoint
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print("Authentication header ready.")

Authentication header ready.


In [16]:
# Track pipeline 
import requests

experiment_name = 'Run_pipeline'

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": experiment_name})
run_id = response.json()["Id"]
run_id

'4c955a1a-9350-45b6-860c-26b3baf638ec'

In [17]:
from azureml.pipeline.core.run import PipelineRun

published_pipeline_run = PipelineRun(ws.experiments[experiment_name], run_id)
pipeline_run.wait_for_completion(show_output=True)

PipelineRunId: 0177fabc-125f-4c84-afb7-4854730caabb
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/0177fabc-125f-4c84-afb7-4854730caabb?wsid=/subscriptions/5c021c60-0721-4640-b99d-eb46315540a5/resourcegroups/mguo-learn/workspaces/mguo-projs&tid=723a5a87-f39a-4a22-9247-3fc240c01396

PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': '0177fabc-125f-4c84-afb7-4854730caabb', 'status': 'Completed', 'startTimeUtc': '2022-12-27T21:19:13.253055Z', 'endTimeUtc': '2022-12-27T21:44:35.89324Z', 'services': {}, 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'SDK', 'runType': 'SDK', 'azureml.parameters': '{}', 'azureml.continue_on_step_failure': 'False', 'azureml.continue_on_failed_optional_input': 'True', 'azureml.pipelineComponent': 'pipelinerun', 'azureml.pipelines.stages': '{"Initialization":null,"Execution":{"StartTime":"2022-12-27T21:19:13.799903+00:00","EndTime":"2022-12-27T21:44:35.7764668+00:00","Status":"Finished"}}'}, 'inputData

'Finished'

In [18]:
# Retrain model periodically using newly generated data 

from azureml.pipeline.core import ScheduleRecurrence, Schedule

# Submit the Pipeline every Monday at 00:00 UTC
recurrence = ScheduleRecurrence(frequency="Week", interval=1, week_days=["Monday"], time_of_day="00:00")
weekly_schedule = Schedule.create(ws, name="weekly-diabetes-training", 
                                  description="Based on time",
                                  pipeline_id=published_pipeline.id, 
                                  experiment_name=experiment_name, 
                                  recurrence=recurrence)
print('Pipeline scheduled.')

Pipeline scheduled.
