# a Pipeline

### Prepare a compute environment for the pipeline steps

The compute will require a Python environment with the necessary package dependencies installed, so you'll need to create a run configuration.

#### Call the pipeline endpoint

To use the endpoint, client applications need to make a REST call over HTTP. This request must be authenticated, so an authorization header is required.

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication
import requests

# Build the authorization header from an interactive login; the endpoint
# requires every request to be authenticated.
interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()

# POST to the published pipeline's REST endpoint, naming the experiment the
# run should be recorded under.
rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, headers=auth_header, json={"ExperimentName": "Run_pipeline"})

# Surface HTTP failures (e.g. rejected credentials) as an HTTPError here,
# instead of as a misleading KeyError when "Id" is missing from the body.
response.raise_for_status()
run_id = response.json()["Id"]

In [None]:
from azureml.pipeline.core.run import PipelineRun

# Wrap the run started through the REST endpoint so it can be monitored here.
# NOTE(review): the REST request submits the run under the experiment name
# "Run_pipeline"; confirm experiment_name holds that same value.
published_pipeline_run = PipelineRun(ws.experiments[experiment_name], run_id)
# Wait on the REST-triggered run itself, not on an earlier `pipeline_run`.
published_pipeline_run.wait_for_completion(show_output=True)



### Schedule the Pipeline

In [None]:
from azureml.pipeline.core import ScheduleRecurrence, Schedule

# Submit the Pipeline every Monday at 00:00 UTC
recurrence = ScheduleRecurrence(
    frequency="Week",
    interval=1,
    week_days=["Monday"],
    time_of_day="00:00",
)

# Attach the recurrence to the published pipeline as a named schedule.
weekly_schedule = Schedule.create(
    ws,
    name="weekly-diabetes-training",
    description="Based on time",
    pipeline_id=published_pipeline.id,
    experiment_name=experiment_name,
    recurrence=recurrence,
)

# Schedules defined in the workspace.
schedules = Schedule.list(ws)

# Look up the experiment's runs and show the details of the first one returned.
pipeline_experiment = ws.experiments.get(experiment_name)
latest_run = list(pipeline_experiment.get_runs())[0]
latest_run.get_details()



# Create a real-time inferencing service

We'll deploy the container as a service named diabetes-service. The deployment process includes the following steps:

    Define an inference configuration, which includes the scoring and environment files required to load and use the model.
    Define a deployment configuration that defines the execution environment in which the service will be hosted. In this case, an Azure Container Instance.
    Deploy the model as a web service.
    Verify the status of the deployed service.


In [None]:
from azureml.core.webservice import AciWebservice
from azureml.core.model import InferenceConfig

# Inference configuration: the scoring script and the conda environment file
# needed to load and use the model.
inference_config = InferenceConfig(runtime="python", entry_script=script_file, conda_file=env_file)
# Deployment configuration: Azure Container Instance with 1 CPU core and 1 GB of memory.
deployment_config = AciWebservice.deploy_configuration(cpu_cores=1, memory_gb=1)

# Deploy the model as a web service and block until deployment finishes.
service_name = "diabetes-service"
service = Model.deploy(ws, service_name, [model], inference_config, deployment_config)
service.wait_for_deployment(True)

# Verify the status of the deployed service; the logs help diagnose a failed deployment.
print(service.state)
print(service.get_logs())

# List the web services in the workspace (a plain loop: printing is a side
# effect, so there is no list worth building).
for webservice_name in ws.webservices:
    print(webservice_name)



# Create a Batch Inferencing Service

## Generate and upload batch data

In [None]:
# Outline of the preparation steps (their code is not included in this cell):
# Set default data store
# Enumerate all datastores, indicating which is the default
# Load the diabetes data
# Get a 100-item sample of the feature columns (not the diabetic label)
# Create a folder
# Save each sample as a separate file

# Upload the files to the default datastore
default_ds.upload(src_dir="batch-data", target_path="batch-data", overwrite=True, show_progress=True)

# Register a dataset for the input data
batch_data_set = Dataset.File.from_files(path=(default_ds, 'batch-data/'), validate=False)
# (This statement must start at column 0; a stray leading space is an IndentationError.)
batch_data_set = batch_data_set.register(workspace=ws, name='batch-data',
                                         description='batch data', create_new_version=True)
## Create compute


## Create a pipeline for batch inferencing

In [None]:
# Make sure the experiment folder exists before the scoring script is written into it.
# NOTE(review): the %%writefile cell magic that follows only works as the first line
# of a notebook cell, so this statement presumably belongs in a cell of its own.
os.makedirs(experiment_folder, exist_ok=True)
%%writefile $experiment_folder/batch_diabetes.py
import os
import numpy as np
from azureml.core import Model
import joblib


def init():
    """Load the registered model when the pipeline step is initialized.

    The loaded model is stored in the module-level ``model`` variable so that
    ``run`` can reuse it for every mini-batch.
    """
    global model
    model = joblib.load(Model.get_model_path('diabetes_model'))

def run(mini_batch):
    """Score each file in a mini-batch; called once per batch.

    Args:
        mini_batch: iterable of paths to comma-delimited data files.

    Returns:
        A list with one ``"<file name>: <prediction>"`` string per input file,
        in input order.
    """
    def score(path):
        # Read the comma-delimited data into an array.
        features = np.genfromtxt(path, delimiter=',')
        # Reshape into a 2-dimensional array for prediction
        # (the model expects multiple items).
        prediction = model.predict(features.reshape(1, -1))
        return "{}: {}".format(os.path.basename(path), prediction[0])

    return [score(path) for path in mini_batch]



You're going to use a pipeline to run the batch prediction script, generate predictions from the input data, and save the results as a text file in the output folder. To do this, you can use a ParallelRunStep, which processes the input data in parallel mini-batches.

In [None]:
from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep
from azureml.pipeline.core import PipelineData
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

default_ds = ws.get_default_datastore()

# Pipeline output location for the predictions, stored in the default datastore.
output_dir = PipelineData(
    name='inferences',
    datastore=default_ds,
    output_path_on_compute='diabetes/results',
)

# How the scoring script is run: which script, in which environment, on which
# compute, and across how many nodes.
# NOTE(review): mini_batch_size="5" presumably means five files per run() call - confirm.
parallel_run_config = ParallelRunConfig(
    source_directory=experiment_folder,
    entry_script="batch_diabetes.py",
    mini_batch_size="5",
    error_threshold=10,
    output_action="append_row",
    environment=batch_env,
    compute_target=inference_cluster,
    node_count=2,
)

# The single pipeline step: feed the registered batch dataset to the script
# and write its results to output_dir.
parallelrun_step = ParallelRunStep(
    name='batch-score-diabetes',
    parallel_run_config=parallel_run_config,
    inputs=[batch_data_set.as_named_input('diabetes_batch')],
    output=output_dir,
    arguments=[],
    allow_reuse=True,
)

# Build the pipeline, submit it as an experiment run, and wait for it to finish.
pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
pipeline_run = Experiment(ws, 'batch_prediction_pipeline').submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)

In [None]:
import pandas as pd
import shutil

# Remove the local results folder if left over from a previous run
# (ignore_errors=True makes this a no-op when the folder does not exist)
shutil.rmtree('diabetes-results', ignore_errors=True)

# Get the run for the first step and download its output.
# 'inferences' is the name given to the PipelineData output of the step.
prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='diabetes-results')



In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication

# Publish the pipeline behind the completed run so it can be called over REST.
published_pipeline = pipeline_run.publish_pipeline(
    name='Diabetes_Parallel_Batch_Pipeline', description='Batch scoring of diabetes data', version='1.0')
rest_endpoint = published_pipeline.endpoint

# Build the authorization header from an interactive login.
interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()

# Start a run of the published pipeline under its own experiment name.
response = requests.post(rest_endpoint, headers=auth_header, json={"ExperimentName": "Batch_Pipeline_via_REST"})
# Surface HTTP failures as an HTTPError here, instead of as a misleading
# KeyError when "Id" is missing from the body.
response.raise_for_status()
run_id = response.json()["Id"]

# Wrap the REST-triggered run and wait for it to finish.
published_pipeline_run = PipelineRun(ws.experiments["Batch_Pipeline_via_REST"], run_id)
published_pipeline_run.wait_for_completion(show_output=True)

# The results are in the output of the first pipeline step.
# Get the run for the first step of the REST-triggered run (not of the earlier
# `pipeline_run`, whose output was already downloaded) and download its output.
prediction_run = next(published_pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='diabetes-results')


# Monitor a Model

In [None]:
# Enable Application Insights on the deployed web service
aci_service.update(enable_app_insights=True)
# Keep the service's scoring URI for sending prediction requests
endpoint = aci_service.scoring_uri
# Check for predictions now

On the Overview page, click the link for the associated Application Insights resource.

On the Application Insights blade, click Logs.

    Note: If this is the first time you've opened log analytics, you may need to click Get Started to open the query editor. If a tip explaining how to write a query is displayed, close it.

Paste the following query into the query editor and click Run

**
 traces
 |where  message == "STDOUT"
   and customDimensions.["Service Name"] == "diabetes-service-app-insights"
 |project timestamp, customDimensions.Content
**

View the results. At first there may be none, because an ACI web service can take up to five minutes to send the telemetry to Application Insights. Wait a few minutes and re-run the query until you see the logged data and predictions.

# Detect and Mitigate Unfairness in Models
Once you've trained a model, you can use the Fairlearn package to compare its behavior for different sensitive feature values.


    Use the fairlearn selection_rate function to return the selection rate (percentage of positive predictions) for the overall population.
    Use scikit-learn metric functions to calculate overall accuracy, recall, and precision metrics.
    
    Use a MetricFrame to calculate selection rate, accuracy, recall, and precision for each age group in the Age sensitive feature. Note that a mix of fairlearn and scikit-learn metric functions are used to calculate the performance values.


In [None]:
from fairlearn.metrics import selection_rate, MetricFrame
from sklearn.metrics import accuracy_score, recall_score, precision_score

# Get predictions for the withheld test data
y_hat = diabetes_model.predict(X_test)
# Overall metrics for the whole test population, computed from the true labels and predictions
overall_selection_rate = selection_rate(y_test, y_hat)
overall_accuracy = accuracy_score(y_test, y_hat)
overall_precision = precision_score(y_test, y_hat)
overall_recall = recall_score(y_test, y_hat)

In [None]:
# Metric functions to evaluate, keyed by the name each one is reported under.
metrics = dict(
    selection_rate=selection_rate,
    accuracy=accuracy_score,
    recall=recall_score,
    precision=precision_score,
)

# Evaluate every metric separately for each value of the Age sensitive feature.
group_metrics = MetricFrame(metrics, y_test, y_hat, sensitive_features=S_test['Age'])

print(group_metrics.by_group)

View the dashboard visualization, which shows:

    Disparity in performance - how the selected performance metric compares for the subpopulations, including underprediction (false negatives) and overprediction (false positives).
    Disparity in predictions - A comparison of the number of positive cases per subpopulation.


In [None]:
Simply removing the Age feature slightly reduces the disparity in recall, but increases the disparity in precision and accuracy. This underlines one of the key difficulties in applying fairness to machine learning models - you must be clear about what fairness means in a particular context, and optimize for that.

In [None]:
from fairlearn.widget import FairlearnDashboard

# View this model in Fairlearn's fairness dashboard, and see the disparities which appear:
# predictions are passed as a dict keyed by model name.
FairlearnDashboard(
    sensitive_features=S_test,
    sensitive_feature_names=['Age'],
    y_true=y_test,
    y_pred={"diabetes_model": diabetes_model.predict(X_test)},
)

In [None]:

# Start an interactive run on the experiment to attach the dashboard to.
# NOTE(review): exp, dash_dict, upload_dashboard_dictionary and
# download_dashboard_by_upload_id are not defined in this section; they are
# presumably created/imported in an earlier cell - confirm.
run = exp.start_logging()

# Upload the dashboard to Azure Machine Learning
try:
    dashboard_title = "Fairness insights of Diabetes Classifier"
    upload_id = upload_dashboard_dictionary(run,
                                            dash_dict,
                                            dashboard_name=dashboard_title)
    print("\nUploaded to id: {0}\n".format(upload_id))

    # To test the dashboard, you can download it
    downloaded_dict = download_dashboard_by_upload_id(run, upload_id)
    print(downloaded_dict)
finally:
    # Always mark the run complete, even if the upload or download raised.
    run.complete()

Now that you've analyzed the model for fairness, you can use any of the mitigation techniques supported by the FairLearn package to find a model that achieves the best balance of predictive performance and fairness.

You'll use the GridSearch feature, which trains multiple models in an attempt to minimize the disparity of predictive performance for the sensitive features in the dataset (in this case, the age groups). You'll optimize the models by applying the EqualizedOdds parity constraint, which tries to ensure that models exhibit similar true and false positive rates for each sensitive feature grouping.
 
 
    Register the models found by the GridSearch process.
    Compute the performance and disparity metrics for the models.
    Upload the metrics in an Azure Machine Learning experiment.


In [None]:
from fairlearn.reductions import GridSearch, EqualizedOdds
import joblib
import os

print('Finding mitigated models...')

# Train multiple models under the EqualizedOdds constraint, using Age as the
# sensitive feature.
sweep = GridSearch(DecisionTreeClassifier(),
                   constraints=EqualizedOdds(),
                   grid_size=20)

sweep.fit(X_train, y_train, sensitive_features=S_train.Age)
models = sweep.predictors_

# Save the models and get predictions from them (plus the original unmitigated one for comparison)
model_dir = 'mitigated_models'
os.makedirs(model_dir, exist_ok=True)
model_name = 'diabetes_unmitigated'
print(model_name)
joblib.dump(value=diabetes_model, filename=os.path.join(model_dir, '{0}.pkl'.format(model_name)))
predictions = {model_name: diabetes_model.predict(X_test)}

# enumerate(..., start=1) numbers the mitigated models from 1. The loop
# variable is deliberately not called `model`, so it does not overwrite a
# notebook-level `model` that other cells use.
for i, mitigated_model in enumerate(models, start=1):
    model_name = 'diabetes_mitigated_{0}'.format(i)
    print(model_name)
    joblib.dump(value=mitigated_model, filename=os.path.join(model_dir, '{0}.pkl'.format(model_name)))
    predictions[model_name] = mitigated_model.predict(X_test)

# Now you can use the FairLearn dashboard to compare the mitigated models:
FairlearnDashboard(sensitive_features=S_test,
                   sensitive_feature_names=['Age'],
                   y_true=y_test,
                   y_pred=predictions)



# Interpret Models

There are many kinds of explainer. Here you'll use a Tabular Explainer, which is a "black box" explainer that can be used to explain many kinds of model by invoking an appropriate SHAP model explainer.

In [None]:
from interpret.ext.blackbox import TabularExplainer
# Build a tabular explainer around the model, initialized with the training data.
tab_explainer = TabularExplainer(
    model,
    X_train,
    features=features,  # optional
    classes=labels,     # optional
)


evaluating the overall feature importance --Get global feature importance

explaining individual observations --Get local feature importance

In [None]:
# Get the top features by importance
global_tab_explanation = tab_explainer.explain_global(X_train)
global_tab_feature_importance = global_tab_explanation.get_feature_importance_dict()

# Print one "feature : importance" line per feature (a plain loop: printing is
# a side effect, so there is no list worth building).
for feature, importance in global_tab_feature_importance.items():
    print(feature, ":", importance)

In [None]:
# Get local explanations for the observations in X_explain
# Get feature names and importance for each possible label
local_tab_explanation = tab_explainer.explain_local(X_explain)
# Ranked feature names, and the importance values that go with them
local_tab_features = local_tab_explanation.get_ranked_local_names()
local_tab_importance = local_tab_explanation.get_ranked_local_values()

In [None]:
# Walk the ranked local explanations: the outer level has one entry per class
# label, the next level one entry per explained observation, and the innermost
# level the ranked features with their importance values.
for label_index, label_features in enumerate(local_tab_features):
    print('Support for', labels[label_index])
    for obs_index, feature_list in enumerate(label_features):
        print("\tObservation", obs_index + 1)
        total_support = 0
        importances = local_tab_importance[label_index][obs_index]
        for feature_name, importance in zip(feature_list, importances):
            print("\t\t", feature_name, ':', importance)
            total_support += importance
        # NOTE(review): `predictions` must hold the predicted class index for
        # each explained observation (e.g. model.predict(X_explain)); confirm
        # it has not been rebound to something else by another cell.
        print("\t\t ----------\n\t\t Total:", total_support, "Prediction:", labels[predictions[obs_index]])

you can also click the View run details link in the Run Details widget to see the run in Azure Machine Learning studio, and view the Explanations tab. Then:

    Select the Tabular Explanation explainer.
    View the Global Importance chart, which shows the overall global feature importance.
    View the Summary Importance chart, which shows each data point from the test data in a swarm, violin, or box plot.
    Select an individual point to see the Local Feature Importance for the individual prediction for the selected data point.


In [None]:
from azureml.core.run import Run

# Import libraries for model explanation
from azureml.interpret import ExplanationClient
from interpret.ext.blackbox import TabularExplainer
# Get the run context this script is executing in
run = Run.get_context()
###############CODE#################
# (training code omitted here; it is expected to define model, X_train,
#  X_test, features and labels, which are used below)
# note file saved in the outputs folder is automatically uploaded into experiment record
joblib.dump(value=model, filename='outputs/diabetes.pkl')

# Get explanation: explainer initialized with the training data, global
# explanation computed on the test data
explainer = TabularExplainer(model, X_train, features=features, classes=labels)
explanation = explainer.explain_global(X_test)
# Get an Explanation Client and upload the explanation
explain_client = ExplanationClient.from_run(run)
explain_client.upload_model_explanation(explanation, comment='Tabular Explanation')

# Complete the run
run.complete()

### Retrieve the feature importance values

With the experiment run completed, you can use the ExplanationClient class to retrieve the feature importance from the explanation registered for the run.

In [None]:
# Package dependencies for the training environment; azureml-interpret is
# included so the script can generate and upload explanations.
packages = CondaDependencies.create(conda_packages=['scikit-learn','pandas','pip'],
                                    pip_packages=['azureml-defaults','azureml-interpret'])
explain_env.python.conda_dependencies = packages

# Create a script config
# NOTE(review): script_config is built here but not submitted in this cell;
# `run` below is presumably the completed run of this script - confirm.
script_config = ScriptRunConfig(source_directory=experiment_folder,
                      script='diabetes_training.py',
                      environment=explain_env)

from azureml.interpret import ExplanationClient

# Get the feature explanations
client = ExplanationClient.from_run(run)
engineered_explanations = client.download_model_explanation()
feature_importances = engineered_explanations.get_feature_importance_dict()

# Print one tab-separated "feature  importance" line per feature (a plain
# loop: printing is a side effect, so there is no list worth building).
for key, value in feature_importances.items():
    print(key, '\t', value)
    

# Monitoring Data Drift

Over time, models can become less effective at predicting accurately due to changing trends in feature data. This phenomenon is known as data drift.

The data drift monitor will run periodically or on-demand to compare the baseline dataset with the target dataset, to which new data will be added over time. To run the data drift monitor, you'll need a compute target.

Use a DataDriftDetector class to define the data drift monitor for your data. You can specify the features you want to monitor for data drift, the name of the compute target to be used to run the monitoring process, the frequency at which the data should be compared, the data drift threshold above which an alert should be triggered, and the latency (in hours) to allow for data collection.

In [None]:
from azureml.datadrift import DataDriftDetector
# set up feature list
features = ['Pregnancies', 'Age', 'BMI']

# Define the monitor that compares the baseline dataset with the target dataset.
monitor = DataDriftDetector.create_from_datasets(
    ws,
    'diabetes-drift-detector',
    baseline_data_set,
    target_data_set,
    compute_target=cluster_name,
    frequency='Week',
    feature_list=features,
    drift_threshold=.3,
    latency=24,
)

## Backfill the data drift monitor

You have a baseline dataset and a target dataset that includes simulated weekly data collection for six weeks. You can use this to backfill the monitor so that it can analyze data drift between the original baseline and the target data.

    Note This may take some time to run, as the compute target must be started to run the backfill analysis. 

In [None]:
import datetime as dt  # needed for dt.datetime / dt.timedelta below

from azureml.widgets import RunDetails

# Backfill the monitor over the last six weeks. The end of the window is taken
# once, so that both bounds are derived from the same instant.
backfill_end = dt.datetime.now()
backfill_start = backfill_end - dt.timedelta(weeks=6)
backfill = monitor.backfill(backfill_start, backfill_end)
RunDetails(backfill).show()
backfill.wait_for_completion()

# Examine data drift for the points in time collected in the backfill run
# (a plain loop: printing is a side effect, so there is no list worth building).
drift_metrics = backfill.get_metrics()
for metric, value in drift_metrics.items():
    print(metric, value)

On the Datasets page, view Dataset monitors tab.
Click the data drift monitor you want to view.
Select date range over which you want to view data drift metrics