In [1]:
# !pip install -r lib/requirements.txt

In [2]:
import warnings
warnings.filterwarnings("ignore")

In [3]:
import pandas as pd
import sklearn
import zipfile
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestRegressor

from raiwidgets import ResponsibleAIDashboard
from responsibleai import RAIInsights
from urllib.request import urlretrieve
import zipfile

In [4]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential
from azure.ai.ml.entities import Environment, BuildContext
from azureml.mlflow import register_model
import mlflow
import pandas as pd

subscription_id = "<SUBSCRIPTION_ID>"
resource_group = "<RESOURCE_GROUP>"
workspace = "<AML_WORKSPACE_NAME>"

# Registry that hosts the built-in Responsible AI pipeline components.
registry_name = "azureml"

# Reuse a single credential object for both clients instead of building two.
credential = DefaultAzureCredential()

# Workspace-scoped client: used below for compute, models and jobs.
ml_client = MLClient(
    credential=credential,
    subscription_id=subscription_id,
    resource_group_name=resource_group,
    workspace_name=workspace,
)

# Registry-scoped client: used only to fetch the RAI components from `azureml`.
ml_client_registry = MLClient(
    credential=credential,
    subscription_id=subscription_id,
    resource_group_name=resource_group,
    registry_name=registry_name,
)

In [5]:
# AmlCompute cluster that runs every step of the RAI pipeline.
compute_name = 'trainingcompute'

In [6]:
from azure.ai.ml.entities import AmlCompute

# Reuse the cluster if it already exists; otherwise create it and wait for
# provisioning to finish so later cells never submit to a missing target.
all_compute_names = {compute.name for compute in ml_client.compute.list()}

if compute_name in all_compute_names:
    print(f"Found existing compute: {compute_name}")
else:
    my_compute = AmlCompute(
        name=compute_name,
        size="Standard_DS2_v2",
        min_instances=0,  # scale down to zero nodes when idle
        max_instances=4,
        idle_time_before_scale_down=3600,  # idle seconds before nodes are released
    )
    # begin_create_or_update returns a poller; .result() blocks until the
    # cluster exists (the original fire-and-forget call did not wait).
    ml_client.compute.begin_create_or_update(my_compute).result()
    print(f"Created compute: {compute_name}")

Found existing compute: trainingcompute


In [7]:
# Version tag intended as the suffix of the RAI experiment name.
# NOTE: `version` is only a placeholder here — cell 15 rebinds it to the
# version of the RAI constructor component.
rai_hospital_version_string = version = "1"

In [8]:
# Registered MLflow model that the RAI dashboard analyses.
model_name = "rai_hospital_model"

In [9]:

# Resolve the newest registered version of the model. Requesting the "latest"
# label avoids relying on the order in which models.list() yields versions,
# which is not guaranteed to be newest-first.
model = ml_client.models.get(name=model_name, label="latest")
latest_model_version = model.version

# Build the ids from model_name instead of repeating the literal model name.
expected_model_id = f'{model_name}:{latest_model_version}'
azureml_model_id = f'azureml:{expected_model_id}'

In [10]:
def get_categorical_numerical_data(dataset, target_column_name=None):
    """Split the feature columns of ``dataset`` into categorical and numerical names.

    The label column is dropped first. A remaining column is categorical when
    its dtype is object, string, boolean (including pandas' nullable boolean)
    or ``category``; every other column is reported as numerical.

    Args:
        dataset: DataFrame containing the feature columns and the label column.
        target_column_name: label column to exclude. When omitted, falls back to
            the notebook-level ``target_column`` (the original behavior).

    Returns:
        A tuple ``(categorical, numerical)``: a list of categorical column names
        in DataFrame column order, and a sorted ``pd.Index`` of the other names.

    Raises:
        KeyError: if the label column is not present in ``dataset``.
    """
    if target_column_name is None:
        # Backward-compatible default: the global is defined in a later cell,
        # so it is only looked up at call time.
        target_column_name = target_column
    features = dataset.drop(columns=[target_column_name])

    def _is_categorical(dtype):
        # DataFrame.iteritems() no longer exists in pandas 2.x; these dtype
        # predicates also catch nullable boolean / string / category dtypes that
        # a plain `dtype == 'object'` comparison misses.
        return (
            pd.api.types.is_object_dtype(dtype)
            or pd.api.types.is_string_dtype(dtype)
            or pd.api.types.is_bool_dtype(dtype)
            or isinstance(dtype, pd.CategoricalDtype)
        )

    categorical = [col for col, dtype in features.dtypes.items() if _is_categorical(dtype)]
    numerical = features.columns.difference(categorical)
    return categorical, numerical

In [11]:
# Load the train and test splits from their parquet files, in that order.
train_data, test_data = (
    pd.read_parquet(parquet_path)
    for parquet_path in ('data/training_data.parquet', 'data/testing_data.parquet')
)

In [12]:
from azure.ai.ml import dsl, Input


# Pipeline inputs pointing at the same train/test parquet files, downloaded
# onto the compute node when each step runs.
hospital_train_parquet, hospital_test_parquet = (
    Input(type="uri_file", path=parquet_path, mode="download")
    for parquet_path in ("data/training_data.parquet", "data/testing_data.parquet")
)

In [13]:
# Label column the classifier predicts.
target_column = 'readmit_status'

In [14]:
# get categorical and numerical fields from training data
# Classify the training-set feature columns and print both groups.
categorical, numerical = get_categorical_numerical_data(dataset=train_data)
for _caption, _columns in (
    ("categorical columns: ", categorical),
    ("numerical field: ", numerical),
):
    print(_caption, _columns)

categorical columns:  ['race', 'gender', 'age', 'discharge_destination', 'admission_source', 'primary_diagnosis', 'max_glu_serum', 'A1Cresult', 'insulin', 'diabetes_Med_prescribe', 'medicare', 'medicaid']
numerical field:  Index(['num_lab_procedures', 'num_medications', 'num_procedures',
       'number_diagnoses', 'prior_emergency', 'prior_inpatient',
       'prior_outpatient', 'time_in_hospital'],
      dtype='object')


In [15]:
label = "latest"

# The constructor is resolved via the "latest" label; its version then pins
# every other RAI component so all four come from the same release.
rai_constructor_component = ml_client_registry.components.get(
    name="microsoft_azureml_rai_tabular_insight_constructor", label=label
)
version = rai_constructor_component.version

rai_explanation_component, rai_erroranalysis_component, rai_gather_component = (
    ml_client_registry.components.get(name=component_name, version=version)
    for component_name in (
        "microsoft_azureml_rai_tabular_explanation",
        "microsoft_azureml_rai_tabular_erroranalysis",
        "microsoft_azureml_rai_tabular_insight_gather",
    )
)

In [16]:
import json
from azure.ai.ml import dsl, Input

@dsl.pipeline(
    compute=compute_name,
    description="RAI computation on hospital readmit classification data",
    # f-string: without the `f` prefix the placeholder was sent literally.
    experiment_name=f"RAI_hospital_Classification_RAIInsights_Computation_{rai_hospital_version_string}",
)
def rai_classification_pipeline(
    target_column_name,
    training_data,
    testing_data,
):
    """Build the RAI insights pipeline for the hospital readmission classifier.

    Steps: construct an RAIInsights object for the registered MLflow model,
    add an explanation and an error analysis, then gather everything into a
    dashboard.

    Args:
        target_column_name: name of the label column in both datasets.
        training_data: pipeline input for the training parquet file.
        testing_data: pipeline input for the test parquet file.

    Returns:
        Dict with the gathered ``dashboard`` and ``ux_json`` outputs.
    """
    # Local import so this function does not depend on an import that the
    # notebook only performs in a later cell.
    from azure.ai.ml.constants import AssetTypes

    # Initiate the RAIInsights
    create_rai_job = rai_constructor_component(
        title="RAI Dashboard",
        task_type="classification",
        model_info=expected_model_id,
        model_input=Input(type=AssetTypes.MLFLOW_MODEL, path=azureml_model_id),
        train_dataset=training_data,
        test_dataset=testing_data,
        target_column_name=target_column_name,
        categorical_column_names=json.dumps(categorical),
    )
    create_rai_job.set_limits(timeout=120)

    # Add an explanation
    explain_job = rai_explanation_component(
        comment="Explanation for hospital remitted less than 30days  classification",
        rai_insights_dashboard=create_rai_job.outputs.rai_insights_dashboard,
    )
    explain_job.set_limits(timeout=120)

    # Add error analysis
    erroranalysis_job = rai_erroranalysis_component(
        rai_insights_dashboard=create_rai_job.outputs.rai_insights_dashboard,
    )
    erroranalysis_job.set_limits(timeout=120)

    # Combine everything
    rai_gather_job = rai_gather_component(
        constructor=create_rai_job.outputs.rai_insights_dashboard,
        insight_1=explain_job.outputs.explanation,
        insight_4=erroranalysis_job.outputs.error_analysis,
    )
    rai_gather_job.set_limits(timeout=120)

    rai_gather_job.outputs.dashboard.mode = "upload"
    rai_gather_job.outputs.ux_json.mode = "upload"

    return {
        "dashboard": rai_gather_job.outputs.dashboard,
        "ux_json": rai_gather_job.outputs.ux_json,
    }

In [17]:
from azure.ai.ml.entities import PipelineJob
import webbrowser
import time

def submit_and_wait(ml_client, pipeline_job, poll_interval=30, open_browser=True) -> "PipelineJob":
    """Submit ``pipeline_job`` and poll until it reaches a terminal status.

    Args:
        ml_client: workspace-scoped client used to create and poll the job.
        pipeline_job: job definition to submit.
        poll_interval: seconds to sleep between status checks.
        open_browser: when True, open the job's Studio page once polling ends.
            Skipped silently if the job exposes no "Studio" service endpoint.

    Returns:
        The last job object fetched from the service. The final status is not
        asserted; callers should inspect ``.status`` themselves.
    """
    terminal_statuses = {'Completed', 'Failed', 'Canceled', 'NotResponding'}

    created_job = ml_client.jobs.create_or_update(pipeline_job)
    assert created_job is not None

    while created_job.status not in terminal_statuses:
        time.sleep(poll_interval)
        created_job = ml_client.jobs.get(created_job.name)
        print("Latest status : {0}".format(created_job.status))

    if open_browser:
        # Guard the lookup: a missing services map or "Studio" entry must not
        # crash after a long wait and lose the returned job handle.
        studio_service = (created_job.services or {}).get("Studio")
        if studio_service is not None and studio_service.endpoint:
            webbrowser.open(studio_service.endpoint)

    return created_job

In [18]:
import uuid
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml import Output

# Build the RAI insights pipeline from the hospital train/test parquet inputs.
insights_pipeline_job = rai_classification_pipeline(
    target_column_name=target_column,
    training_data=hospital_train_parquet,
    testing_data=hospital_test_parquet,
)

# Workaround to enable the download: route both gathered outputs to a unique
# folder under the `workspaceblobstore` datastore.
rand_path = str(uuid.uuid4())
for output_name in ("dashboard", "ux_json"):
    setattr(
        insights_pipeline_job.outputs,
        output_name,
        Output(
            path=f"azureml://datastores/workspaceblobstore/paths/{rand_path}/{output_name}/",
            mode="upload",
            type="uri_folder",
        ),
    )

# Submit the pipeline and block until it reaches a terminal status.
insights_job = submit_and_wait(ml_client, insights_pipeline_job)

Latest status : Running
Latest status : Running
Latest status : Running
Latest status : Running
Latest status : Running
Latest status : Running
Latest status : Completed
