# Azure Real Time Diabetes Predictions Pipeline

Developed a real-time diabetes prediction solution using Azure ML. Steps include connecting to the workspace, creating a compute cluster, preparing data, training a logistic regression model, deploying it as a web service, orchestrating an inference pipeline, and conducting real-time predictions.

In [None]:
import os
import shutil

import azureml.core
import pandas as pd
# Confirm the SDK imported correctly and report the installed version.
print(f"Ready to use Azure ML {azureml.core.VERSION}")

#### Connecting to Workspace

In [None]:
from azureml.core import Workspace

# Load the workspace from its saved configuration (see Workspace.from_config)
# and confirm which workspace is in use.
ws = Workspace.from_config()
print(f"{ws.name} loaded")

#### Create a compute cluster

In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# Name of the AmlCompute cluster; the training run below is submitted to it by name.
cluster_name = "your-compute-cluster"

try:
    # Reuse the cluster if one with this name already exists in the workspace.
    training_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # Not found: provision a new cluster (STANDARD_DS11_V2, at most 2 nodes).
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        training_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        training_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        # Report the failure, then re-raise: later cells submit work to
        # `cluster_name`, so continuing without a cluster only defers the error.
        print(ex)
        raise

#### View Azure Machine Learning resources in the workspace


In [None]:
# Print each compute target attached to the workspace along with its type.
print("Compute Resources:")
for target in ws.compute_targets.values():
    print("\t", target.name, ':', target.type)

#### Upload Data to the Default Datastore and Create a Tabular Dataset

In [None]:
from azureml.core import Dataset
from azureml.data.datapath import DataPath

default_ds = ws.get_default_datastore()

# Upload the local CSV files to the workspace's default datastore.
# overwrite=True replaces files left over from a previous upload.
Dataset.File.upload_directory(src_dir='./diabetes-data/',
                              target=DataPath(default_ds, 'diabetes-data/'),
                              overwrite=True)

# Build a tabular dataset over the uploaded CSVs; the glob uses the same
# datastore folder the files were uploaded to above.
tab_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'diabetes-data/*.csv'))

# Register the dataset under the name the training cells look up with
# ws.datasets.get("diabetes dataset"); unless it was registered earlier,
# that lookup finds nothing.
tab_data_set = tab_data_set.register(workspace=ws,
                                     name='diabetes dataset',
                                     description='diabetes data',
                                     create_new_version=True)

# Preview the first 20 rows.
tab_data_set.take(20).to_pandas_dataframe()

## Define an environment


In [None]:
from azureml.core import Environment

# Create the training environment from the conda specification file.
experiment_env = Environment.from_conda_specification(
    name="experiment_env",
    file_path="./experiment_env.yml",
)

# Have Azure ML build the environment from the spec rather than relying on
# packages the user installs.
experiment_env.python.user_managed_dependencies = False

# Show what was defined, including the serialized conda dependencies.
print(experiment_env.name, 'defined.')
conda_spec = experiment_env.python.conda_dependencies.serialize_to_string()
print(conda_spec)

## Register the environment


In [None]:
# Register the environment in the workspace, then fetch the registered copy
# that the training run will use.
experiment_env.register(workspace=ws)
registered_env = Environment.get(workspace=ws, name='experiment_env')

# Look up the training dataset by its registered name.
diabetes_ds = ws.datasets.get("diabetes dataset")

### Train the Model Using a Script

In [None]:
from azureml.core import Experiment, ScriptRunConfig, Environment
from azureml.core.runconfig import DockerConfiguration
from azureml.widgets import RunDetails

diabetes_ds = ws.datasets.get("diabetes dataset")

# Script arguments: the regularization rate, plus the training data passed
# as a named input called 'training_data'.
script_args = [
    '--regularization', 0.1,
    '--input-data', diabetes_ds.as_named_input('training_data'),
]

# Run lg_training_script.py from ./training_folder/ inside Docker, using the
# registered environment, on the compute cluster named above.
script_config = ScriptRunConfig(
    source_directory='./training_folder/',
    script='lg_training_script.py',
    arguments=script_args,
    environment=registered_env,
    docker_runtime_config=DockerConfiguration(use_docker=True),
    compute_target=cluster_name,
)

# Submit the run, display the progress widget, and block until it finishes.
experiment_name = 'mslearn-train-diabetes'
experiment = Experiment(workspace=ws, name=experiment_name)
run = experiment.submit(config=script_config)
RunDetails(run).show()
run.wait_for_completion()

### Register the Trained Model

In [None]:
# Register the model file produced by the training run, recording the run's
# AUC and Accuracy metrics as model properties. The metrics are fetched once
# rather than making a separate get_metrics() call per property.
metrics = run.get_metrics()

# model_path is relative to the run's artifacts (documented form "outputs/<file>").
run.register_model(model_path='outputs/diabetes_model.pkl',
                   model_name='diabetes_model',
                   tags={'Training context': 'Inline Training'},
                   properties={'AUC': metrics['AUC'], 'Accuracy': metrics['Accuracy']})

print('Model trained and registered.')

## Deploy the model as a web service

In [None]:
from azureml.core import Model

# Print every registered model with its version, tags and properties.
for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name, tag in model.tags.items():
        print('\t', tag_name, ':', tag)
    for prop_name, prop in model.properties.items():
        print('\t', prop_name, ':', prop)
    print('\n')

In [None]:
# Fetch the registered 'diabetes_model' and report which version was retrieved.
model = ws.models['diabetes_model']
print(f"{model.name} version {model.version}")

In [None]:
# Local folder for the web-service deployment artifacts.
deployment_folder = './diabetes_service'
os.makedirs(deployment_folder, exist_ok=True)
print(deployment_folder, 'folder created.')

# Name of the scoring script, and its path inside the deployment folder.
# script_file must be defined here, otherwise building script_path raises NameError.
script_file = 'score_diabetes.py'
script_path = os.path.join(deployment_folder, script_file)

In [None]:
from azureml.core import Environment
from azureml.core.model import InferenceConfig
from azureml.core.webservice import AciWebservice

# Scoring environment: the curated scikit-learn CPU inference environment.
service_env = Environment.get(workspace=ws, name="AzureML-sklearn-0.24.1-ubuntu18.04-py37-cpu-inference")
service_env.inferencing_stack_version = "latest"

# entry_script is interpreted relative to source_directory, so source_directory
# must be the folder that holds score_diabetes.py. Per the path this notebook
# uses for it, that is ./training_folder (the same folder as the training script).
inference_config = InferenceConfig(source_directory='./training_folder',
                                   entry_script='score_diabetes.py',
                                   environment=service_env)

# Size of the Azure Container Instances container hosting the service.
deployment_config = AciWebservice.deploy_configuration(cpu_cores=1, memory_gb=1)

# Deploy (replacing any existing service with the same name) and wait for it.
print('Deploying model...')
service_name = "diabetes-service"
service = Model.deploy(ws, service_name, [model], inference_config, deployment_config, overwrite=True)
service.wait_for_deployment(show_output=True)
print(service.state)

In [None]:
# Fetch and print the deployed service's logs (helpful when deployment fails).
service_logs = service.get_logs()
print(service_logs)

### Use Web Service

In [None]:
# URI that the next cell POSTs scoring requests to.
endpoint = service.scoring_uri
print(endpoint)

In [None]:
import requests
import json

# Two patients to score, eight numeric feature values each.
x_new = [[2,180,74,24,21,23.9091702,1.488172308,22],
         [0,148,58,11,179,39.19207553,0.160829008,45]]

# Serialize the rows into a JSON document under the "data" key.
input_json = json.dumps({"data": x_new})

# Set the content type
headers = {'Content-Type': 'application/json'}

predictions = requests.post(endpoint, input_json, headers=headers)
# Stop on an HTTP error instead of trying to decode an error body as predictions.
predictions.raise_for_status()

# predictions.json() is expected to yield a JSON *string* (the scoring script
# presumably json.dumps its result), hence the second decode.
predicted_classes = json.loads(predictions.json())

for patient, predicted_class in zip(x_new, predicted_classes):
    print("Patient {}".format(patient), predicted_class)

## Create a Batch Inference Pipeline

In [None]:
# Local working folder for the batch inference pipeline (created if missing).
experiment_folder = './batch_pipeline'
os.makedirs(name=experiment_folder, exist_ok=True)
print(experiment_folder)

A ParallelRunStep speeds up batch predictions by scoring mini-batches of the input data in parallel across the cluster nodes. The results are then appended into a single output file named `parallel_run_step.txt`.

In [None]:
from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep
from azureml.data import OutputFileDatasetConfig

# Step output, later retrieved by the name 'inferences'.
output_dir = OutputFileDatasetConfig(name='inferences')

# entry_script is interpreted relative to source_directory, so source_directory
# must be the folder that contains batch_score_diabetes.py. Per the path this
# notebook uses for it, that is ./training_folder.
# The step runs on the same cluster as training, referenced by name.
parallel_run_config = ParallelRunConfig(
    source_directory='./training_folder',
    entry_script="batch_score_diabetes.py",
    mini_batch_size="5",
    error_threshold=10,
    output_action="append_row",
    environment=registered_env,
    compute_target=cluster_name,
    node_count=2)

# NOTE(review): `batch_data_set` is not defined anywhere in this notebook.
# It must be a dataset of batch files to score, created (and presumably
# registered) before this cell runs. TODO confirm its source.
parallelrun_step = ParallelRunStep(
    name='batch-score-diabetes',
    parallel_run_config=parallel_run_config,
    inputs=[batch_data_set.as_named_input('diabetes_batch')],
    output=output_dir,
    arguments=[],
    allow_reuse=True
)


In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

# Wrap the single scoring step in a pipeline, submit it as an experiment run,
# and stream its output until it finishes.
pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
batch_experiment = Experiment(ws, 'mslearn-diabetes-batch')
pipeline_run = batch_experiment.submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)

### Retrieve the Output Results

In [None]:
# Start from an empty local results folder.
shutil.rmtree('./diabetes-results', ignore_errors=True)

# Get the run for the pipeline's first (and only) step and download its output.
prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='diabetes-results')

# Find the combined results file. Reset result_file first so a value left
# over from an earlier cell can never be read by mistake.
result_file = None
for root, dirs, files in os.walk('./diabetes-results'):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root, file)
if result_file is None:
    raise FileNotFoundError("parallel_run_step.txt not found under ./diabetes-results")

# Lines are presumably "<file>:<prediction>"; split on ':' into two columns.
df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]
df.head(20)

### Publish the Pipeline and Use It as a Service

In [None]:
# Publish the completed pipeline so it can be triggered through its REST endpoint.
published_pipeline = pipeline_run.publish_pipeline(
    name='diabetes-batch-pipeline',
    description='Batch scoring of diabetes data',
    version='1.0',
)

# Display the published pipeline's details.
published_pipeline

In [None]:
# REST endpoint that triggers new runs of the published pipeline.
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

Authenticate using interactive login, then start a pipeline run with a REST API call and capture the returned run ID to track its execution.

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication
import requests

# Obtain an authorization header through interactive login.
interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print('Authentication header ready.')

# Start a run of the published pipeline under the 'mslearn-diabetes-batch' experiment.
rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint,
                         headers=auth_header,
                         json={"ExperimentName": "mslearn-diabetes-batch"})
# Surface HTTP errors (e.g. an auth failure) instead of a confusing KeyError on "Id".
response.raise_for_status()
run_id = response.json()["Id"]
run_id

Since we have the run ID, we can use the **RunDetails** widget to view the experiment as it runs:

In [None]:
from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails

# Attach to the run started by the REST call, using the returned run ID.
published_pipeline_run = PipelineRun(ws.experiments['mslearn-diabetes-batch'], run_id)

# Show the run's progress in the RunDetails widget.
RunDetails(published_pipeline_run).show()

# Block until the run completes
published_pipeline_run.wait_for_completion(show_output=True)

Download the output of the pipeline's first step, and display the first 20 results from the 'parallel_run_step.txt' file in a formatted DataFrame.

In [None]:
# Remove the local results folder if left over from a previous run
shutil.rmtree('./diabetes-results', ignore_errors=True)

# Download the first step's output from the run started via the published
# pipeline's endpoint (not from the earlier interactive `pipeline_run`).
prediction_run = next(published_pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='diabetes-results')

# Traverse the folder hierarchy and find the results file. Reset result_file
# first so a path found by an earlier cell cannot be reused silently.
result_file = None
for root, dirs, files in os.walk('diabetes-results'):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root, file)
if result_file is None:
    raise FileNotFoundError("parallel_run_step.txt not found under diabetes-results")

# Lines are presumably "<file>:<prediction>"; split on ':' into two columns.
df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]

# Display the first 20 results
df.head(20)