In [1]:
# Connect to the Azure ML workspace described by the local workspace config file.
from azureml.core import Workspace

ws = Workspace.from_config()

# Uploading data

In [2]:
from azureml.core import Dataset
from azureml.data.datapath import DataPath

# Name under which the tabular dataset is registered; later cells look it up by this name.
DATASET_NAME = 'diabetes dataset'

default_ds = ws.get_default_datastore()

if DATASET_NAME not in ws.datasets:
    # Copy the local ./data folder into the default datastore under diabetes-data/
    Dataset.File.upload_directory(src_dir='data', target=DataPath(default_ds, 'diabetes-data/'))

    # Build a tabular dataset from the CSV files in that datastore folder
    # (the *.csv glob skips non-CSV files that were uploaded alongside them)
    tab_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'diabetes-data/*.csv'))

    # Register the tabular dataset. Errors are deliberately NOT swallowed: the pipeline
    # cells depend on this dataset existing, so a failure must stop the notebook here.
    tab_data_set = tab_data_set.register(workspace=ws, name=DATASET_NAME, description='Diabetes Dataset',
                                         tags={'format': 'CSV'}, create_new_version=True)
    print('Dataset is registered.')

else:
    print("Dataset is already registered.")

Validating arguments.
Arguments validated.
Uploading file to diabetes-data/
Uploading an estimated of 4 files
Uploading data/.amlignore
Uploaded data/.amlignore, 1 files out of an estimated total of 4
Uploading data/.amlignore.amltmp
Uploaded data/.amlignore.amltmp, 2 files out of an estimated total of 4
Uploading data/diabetes.csv
Uploaded data/diabetes.csv, 3 files out of an estimated total of 4
Uploading data/diabetes2.csv
Uploaded data/diabetes2.csv, 4 files out of an estimated total of 4
Uploaded 4 files
Creating new dataset
Dataset is registered.


# Creating scripts for Pipeline steps

In [3]:
import os

# Separate folder that will hold the step scripts and the environment spec for the pipeline.
experiment_folder = 'diabetes_pipeline'
os.makedirs(name=experiment_folder, exist_ok=True)  # no error if it already exists

print(experiment_folder)

diabetes_pipeline


# Pipeline Starts

**Pipeline steps can cover stages such as Feature Engineering, Feature Selection, Model Training, and Hyperparameter Tuning.<br>Each Python script should contain the code for a single step (for example, Feature Engineering).**

## Step 1: Feature Engineering

In [4]:
%%writefile $experiment_folder/prep_diabetes.py

# Import libraries
import os
import argparse
import pandas as pd
from azureml.core import Run
from sklearn.preprocessing import MinMaxScaler

# Get Parameters
parser = argparse.ArgumentParser()
parser.add_argument('--input-data', type = str, dest = 'raw_dataset_id', help = 'Raw Dataset')
parser.add_argument('--prepped-data', type = str, dest = 'prepped_data', help = 'Folder for results')
args = parser.parse_args()

save_folder = args.prepped_data

# Get the Experiment Run Context
run = Run.get_context()

# Load the data
diabetes = run.input_datasets['raw_data'].to_pandas_dataframe()

# Log raw row count
row_count = (len(diabetes))
run.log('raw_rows', row_count)

# Remove missing data from the dataset
diabetes = diabetes.dropna()

# Applying MinMaxScaler
scaler = MinMaxScaler()
num_cols = ['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree']
diabetes[num_cols] = scaler.fit_transform(diabetes[num_cols])

# Log processed rows
row_count = (len(diabetes))
run.log('processed_rows', row_count)

# Save the prepped data
print("Saving Data...")

os.makedirs(save_folder, exist_ok=True)

save_path = os.path.join(save_folder,'data.csv')
diabetes.to_csv(save_path, index=False, header=True)

# End the run
run.complete()

Writing diabetes_pipeline/prep_diabetes.py


## Step 2: Model Training and Model Registering

In [5]:
%%writefile $experiment_folder/train_diabetes.py

# Import libraries
from azureml.core import Run, Model
import argparse
import pandas as pd
import numpy as np
import joblib
import os
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument("--training-data", type=str, dest='training_data', help='training data')
args = parser.parse_args()
training_data = args.training_data

# Get the experiment run context
run = Run.get_context()

# load the prepared data file in the training folder
print("Loading Data...")
file_path = os.path.join(training_data,'data.csv')
diabetes = pd.read_csv(file_path)

# Separate features and labels
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# Split data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# Training decision tree classifier
print('Training a decision tree model...')
model = DecisionTreeClassifier().fit(X_train, y_train)

# Calculating accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)

print('Accuracy:', acc)
run.log('Accuracy', np.float(acc))

# Calculate Area Under the Curve
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])

print('AUC: ' + str(auc))
run.log('AUC', np.float(auc))

# Plot ROC curve
fpr, tpr, thresholds = roc_curve(y_test, y_scores[:,1])
fig = plt.figure(figsize = (6, 4))

# Plot the diagonal 50% line
plt.plot([0, 1], [0, 1], 'k--')

# Plot the FPR and TPR achieved by our model
plt.plot(fpr, tpr)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')

run.log_image(name = "ROC", plot = fig)
plt.show()

# Save the trained model in the outputs folder
print("Saving model...")
os.makedirs('outputs', exist_ok=True)

model_file = os.path.join('outputs', 'diabetes_model.pkl')
joblib.dump(value = model, filename = model_file)

# Register the model
print('Registering model...')

Model.register(workspace = run.experiment.workspace,
               model_path = model_file,
               model_name = 'diabetes_model',
               tags={'Training context':'Pipeline'},
               properties={'AUC': np.float(auc), 'Accuracy': np.float(acc)})


run.complete()

Writing diabetes_pipeline/train_diabetes.py


# Creating compute target

In [6]:
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException

# Placeholder: set this to the name of the cluster to use or create.
cluster_name = 'your-cluster-name'

try:
    # Reuse the cluster if one with this name already exists in the workspace
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print("Found an existing cluster, using it.")

except ComputeTargetException:
    # No cluster with that name yet: provision a new one. Provisioning errors are not
    # swallowed, because every later cell needs `pipeline_cluster` to be defined.
    compute_config = AmlCompute.provisioning_configuration(vm_size='Standard_D2as_v4', max_nodes=2)

    pipeline_cluster = ComputeTarget.create(workspace=ws, name=cluster_name, provisioning_configuration=compute_config)
    pipeline_cluster.wait_for_completion(show_output=True)



InProgress.......
SucceededProvisioning operation finished, operation "Succeeded"
Succeeded
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned


## The compute target needs a Python environment with the required package dependencies installed

In [7]:
%%writefile $experiment_folder/experiment_env.yml
# Conda environment for the pipeline step scripts.
# NOTE(review): only Python is pinned; other packages resolve to whatever is current
# when the image is built, so rebuilds may pull newer (possibly incompatible) releases.
name: experiment_env
dependencies:
- python=3.6.2
- scikit-learn
- ipykernel
- matplotlib
- pandas
- pip
- pip:
  # presumably provides the azureml.core package imported by the step scripts — confirm
  - azureml-defaults
  # presumably required to materialize tabular datasets into pandas — confirm
  - pyarrow

Writing diabetes_pipeline/experiment_env.yml


### Creating an environment and using it in the pipeline's run configuration

In [12]:
from azureml.core import Environment
from azureml.core.runconfig import RunConfiguration

# Build an environment from the conda spec written earlier, register it in the
# workspace, then fetch it back by name so the run configuration uses the registered copy.
env_spec_path = experiment_folder + '/experiment_env.yml'
experiment_env = Environment.from_conda_specification(name='experiment_env', file_path=env_spec_path)
experiment_env.register(workspace=ws)
registered_env = Environment.get(workspace=ws, name='experiment_env')

# One run configuration shared by the pipeline steps: target cluster + environment.
pipeline_run_config = RunConfiguration()
pipeline_run_config.target = pipeline_cluster
pipeline_run_config.environment = registered_env

print("Run configuration is completed.")

Run configuration is completed.


# Creating a Pipeline

In [13]:
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep

# The tabular dataset registered at the start of the notebook
diabetes_ds = ws.datasets.get('diabetes dataset')

# Intermediate output: the prep step writes into it (--prepped-data) and the
# training step reads it back via as_input() (--training-data).
prepped_data = OutputFileDatasetConfig(name='prepped_data')

# Settings common to both steps
common_step_kwargs = dict(source_directory=experiment_folder,
                          compute_target=pipeline_cluster,
                          runconfig=pipeline_run_config,
                          allow_reuse=True)

# Step 1: feature engineering
prep_step = PythonScriptStep(name='Prepare Data',
                             script_name='prep_diabetes.py',
                             arguments=['--input-data', diabetes_ds.as_named_input('raw_data'),
                                        '--prepped-data', prepped_data],
                             **common_step_kwargs)

# Step 2: training and model registration
train_step = PythonScriptStep(name="Train and Register Model",
                              script_name='train_diabetes.py',
                              arguments=['--training-data', prepped_data.as_input()],
                              **common_step_kwargs)


# Defining and Running an Experiment

In [15]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Assemble the two steps into a pipeline
pipeline_steps = [prep_step, train_step]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
print("Pipeline is Built...!")

# Submit it under an experiment; regenerate_outputs=True makes every step run again
# rather than reusing outputs from a previous run.
experiment = Experiment(workspace=ws, name='mslearn-diabetes-pipeline')
pipeline_run = experiment.submit(config=pipeline, regenerate_outputs=True)
print('Pipeline submitted for execution.')

# Live widget in the notebook, then block until the run finishes
RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion(show_output=True)

Pipeline is Built...!
Created step Prepare Data [edb80074][c9c79908-4aac-41bd-b2b9-f8d06eedbef3], (This step will run and generate new outputs)Created step Train and Register Model [04be6bf8][f06a00e7-0747-4c6e-9d3b-942d7b2867d5], (This step will run and generate new outputs)

Submitted PipelineRun eea52228-ade9-4b93-82b4-22fe7f33df3e
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/eea52228-ade9-4b93-82b4-22fe7f33df3e?wsid=/subscriptions/3571f8dc-3527-4993-9d2b-ac0812d807fd/resourcegroups/aml-resources/workspaces/aml-workspace&tid=78c76086-2fb7-4f6a-b684-c129ba0ea713
Pipeline submitted for execution.


_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …

PipelineRunId: eea52228-ade9-4b93-82b4-22fe7f33df3e
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/eea52228-ade9-4b93-82b4-22fe7f33df3e?wsid=/subscriptions/3571f8dc-3527-4993-9d2b-ac0812d807fd/resourcegroups/aml-resources/workspaces/aml-workspace&tid=78c76086-2fb7-4f6a-b684-c129ba0ea713
PipelineRun Status: Running


StepRunId: 46646279-6df6-42e0-b489-06b3096c2c61
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/46646279-6df6-42e0-b489-06b3096c2c61?wsid=/subscriptions/3571f8dc-3527-4993-9d2b-ac0812d807fd/resourcegroups/aml-resources/workspaces/aml-workspace&tid=78c76086-2fb7-4f6a-b684-c129ba0ea713
StepRun( Prepare Data ) Status: Running

Streaming azureml-logs/20_image_build_log.txt
2022/08/09 13:17:52 Downloading source code...
2022/08/09 13:17:53 Finished downloading source code
2022/08/09 13:17:53 Creating Docker network: acb_default_network, driver: 'bridge'
2022/08/09 13:17:54 Successfully set up Docker network: acb_default_network
2022/08/09 13:17

'Finished'

## Metrics logged by the child (step) runs

In [17]:
# Print every metric logged by each step (child run) of the pipeline run.
for run in pipeline_run.get_children():
    print(run.name, ':')
    metrics = run.get_metrics()
    for metric_name, metric_value in metrics.items():
        print('\t', metric_name, ':', metric_value)

Train and Register Model :
	 Accuracy : 0.8993333333333333
	 AUC : 0.8834452898201293
	 ROC : aml://artifactId/ExperimentRun/dcid.82029ea6-f4e9-4773-9995-a5eb951515cf/ROC_1660051944.png
Prepare Data :
	 raw_rows : 15000
	 processed_rows : 15000


**A new model should be registered with a Training context tag indicating it was trained in a pipeline.**

In [20]:
from azureml.core import Model

# List every registered model in the workspace with its tags and properties.
for model in Model.list(ws):
    print(model.name, 'version: ', model.version)

    print("Tags: ")
    for tag_name, tag in model.tags.items():
        print('\t', tag_name, ':', tag)
    print()

    print("Properties: ")
    for prop_name, prop in model.properties.items():
        print('\t', prop_name, ':', prop)
    print('\n')


diabetes_model version:  1
Tags: 
	 Training context : Pipeline

Properties: 
	 AUC : 0.8834452898201293
	 Accuracy : 0.8993333333333333




# Publishing the Pipeline

In [21]:
# Publish the completed pipeline run as a reusable pipeline; the last expression displays it.
published_pipeline = pipeline_run.publish_pipeline(
    name='diabetes-training-pipeline',
    description="Trains diabetes model",
    version="1.0",
)
published_pipeline

Name,Id,Status,Endpoint
diabetes-training-pipeline,f66990fd-9408-41c0-ae69-11e993976b83,Active,REST Endpoint


In [22]:
# REST URL of the published pipeline (displayed as the cell output)
rest_endpoint = published_pipeline.endpoint
rest_endpoint

'https://centralindia.api.azureml.ms/pipelines/v1.0/subscriptions/3571f8dc-3527-4993-9d2b-ac0812d807fd/resourceGroups/aml-resources/providers/Microsoft.MachineLearningServices/workspaces/aml-workspace/PipelineRuns/PipelineSubmit/f66990fd-9408-41c0-ae69-11e993976b83'

## Calling the Pipeline endpoint

In [23]:
from azureml.core.authentication import InteractiveLoginAuthentication

# Interactive login; the returned header is sent with requests to the pipeline endpoint.
auth_header = InteractiveLoginAuthentication().get_authentication_header()
print('Authentication header is ready...!')

Authentication header is ready...!


In [24]:
import requests

experiment_name = 'mslearn-diabetes-pipeline'

# Start a new run of the published pipeline over REST. The request body field
# documented for the target experiment is "ExperimentName" (no space).
rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, headers=auth_header, json={'ExperimentName': experiment_name})

# Fail with the HTTP status (e.g. expired token, bad request) rather than an
# opaque KeyError when the error payload has no "Id".
response.raise_for_status()

run_id = response.json()["Id"]
run_id

'6bef7d7d-3b97-4be7-8f5e-ee75390361ee'

In [26]:
from azureml.pipeline.core.run import PipelineRun

# With the run ID returned by the REST call, attach to that pipeline run in its
# experiment and block until it completes.
target_experiment = ws.experiments[experiment_name]
published_pipeline_run = PipelineRun(target_experiment, run_id)
published_pipeline_run.wait_for_completion(show_output=True)

PipelineRunId: 6bef7d7d-3b97-4be7-8f5e-ee75390361ee
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/6bef7d7d-3b97-4be7-8f5e-ee75390361ee?wsid=/subscriptions/3571f8dc-3527-4993-9d2b-ac0812d807fd/resourcegroups/aml-resources/workspaces/aml-workspace&tid=78c76086-2fb7-4f6a-b684-c129ba0ea713

PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': '6bef7d7d-3b97-4be7-8f5e-ee75390361ee', 'status': 'Completed', 'startTimeUtc': '2022-08-09T14:04:14.168293Z', 'endTimeUtc': '2022-08-09T14:04:15.990043Z', 'services': {}, 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'Unavailable', 'runType': 'HTTP', 'azureml.parameters': '{}', 'azureml.continue_on_step_failure': 'False', 'azureml.continue_on_failed_optional_input': 'True', 'azureml.pipelineid': 'f66990fd-9408-41c0-ae69-11e993976b83', 'azureml.pipelineComponent': 'pipelinerun'}, 'inputDatasets': [], 'outputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://amlworkspace584

'Finished'