In [1]:
#conda install -c conda-forge lightgbm

In [2]:
#conda install azure-common azure-ai-ml==0.1.0b6 mltable==0.1.0b3 azureml_dataprep azureml_dataprep_rslex responsibleai~=0.18.0 raiwidgets~=0.18.0 pandas pyarrow shap

In [3]:
import warnings
warnings.filterwarnings("ignore")

In [4]:
import sklearn
import zipfile
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestRegressor
import pandas as pd

from raiwidgets import ResponsibleAIDashboard
from responsibleai import RAIInsights
from urllib.request import urlretrieve
import zipfile



In [5]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential
from azureml.mlflow import register_model
import mlflow

# Connect to the Azure ML workspace: authenticate with the default Azure
# credential chain, then build a client from the local workspace config file.
credential = DefaultAzureCredential()
ml_client = MLClient.from_config(credential=credential)

Found the config file in: ./config.json


In [6]:
# Name of the Azure ML compute cluster that every pipeline below runs on.
compute_name = "trainingcompute"

In [7]:
from azure.ai.ml.entities import AmlCompute

# Collect the names of all compute targets already present in the workspace.
all_compute_names = [target.name for target in ml_client.compute.list()]

if compute_name not in all_compute_names:
    # Cluster that can scale down to zero nodes and out to four nodes.
    my_compute = AmlCompute(
        name=compute_name,
        size="Standard_DS2_v2",
        min_instances=0,
        max_instances=4,
        idle_time_before_scale_down=3600,
    )
    # Long-running operation; its poller is not awaited here.
    ml_client.compute.begin_create_or_update(my_compute)
    print("Initiated compute creation")
else:
    print(f"Found existing compute: {compute_name}")

Found existing compute: trainingcompute


In [8]:
# Version of the built-in components (register_model, rai_insights_*) and of
# the curated sklearn environment referenced below.
version = '1'
# Version under which the custom training component is registered.
rai_emp_attrition_classifier_version_string = '46'

In [9]:
# Load the raw HR employee-attrition CSV (path is relative to the notebook).
data_df = pd.read_csv(filepath_or_buffer='data/WA_Fn-UseC_-HR-Employee-Attrition.csv')

In [10]:
# Remove columns that cannot help predict attrition:
#   EmployeeCount  - every value is 1
#   EmployeeNumber - merely a row identifier
#   Over18         - NOTE(review): presumably constant for all rows; verify
#   StandardHours  - every value is 80
data_df = data_df.drop(
    columns=['EmployeeCount', 'EmployeeNumber', 'Over18', 'StandardHours']
)

# Name of the label column used throughout the rest of the notebook.
target_column = "Attrition_numerical"

# Encode the Yes/No attrition label as the strings '1'/'0'. An unexpected
# label raises KeyError instead of silently becoming NaN.
target_map = {'Yes': '1', 'No': '0'}
data_df[target_column] = data_df['Attrition'].map(lambda label: target_map[label])
data_df = data_df.drop(columns=['Attrition'])

In [11]:
# Hold out 30% of the rows as the RAI test set.
# - random_state pins the split, so re-running the notebook reproduces the same
#   train/test files and therefore the same downstream metrics.
# - stratify keeps the Yes/No attrition ratio identical in both sets, which
#   matters because the label is imbalanced.
RANDOM_STATE = 42
train, test = train_test_split(
    data_df,
    test_size=0.3,
    random_state=RANDOM_STATE,
    stratify=data_df[target_column],
)

In [12]:
source_data_path = 'data/all_emp_dataset.parquet'

# DataFrame.to_parquet writes the file and returns None, so nothing is bound.
# index=False: after train_test_split the row index is a shuffled subset of the
# original row numbers. Writing it would store an extra `__index_level_0__`
# column in the files, which is not a feature.
train.to_parquet('data/train_dataset.parquet', index=False)
test.to_parquet('data/test_dataset.parquet', index=False)
data_df.to_parquet(source_data_path, index=False)

In [13]:
import os
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes


# Workspace data-asset names for the train / test parquet files.
training_dataset_filename = 'emp_attr_train_parquet'
testing_dataset_filename = 'emp_attr_test_parquet'


def _parquet_data_asset(asset_name, local_path, description):
    """Describe a local parquet file as a single-file (URI_FILE) data asset."""
    return Data(
        name=asset_name,
        path=local_path,
        type=AssetTypes.URI_FILE,
        description=description,
    )


training_data = _parquet_data_asset(
    training_dataset_filename,
    'data/train_dataset.parquet',
    "RAI employee attrition train data",
)
tr_data = ml_client.data.create_or_update(training_data)

testing_data = _parquet_data_asset(
    testing_dataset_filename,
    'data/test_dataset.parquet',
    "RAI employee attrition test data",
)
te_data = ml_client.data.create_or_update(testing_data)


[32mUploading train_dataset.parquet[32m (< 1 MB): 100%|██████████| 57.5k/57.5k [00:00<00:00, 4.21MB/s]
[39m

[32mUploading test_dataset.parquet[32m (< 1 MB): 100%|██████████| 39.2k/39.2k [00:00<00:00, 2.85MB/s]
[39m



In [14]:
import os

# Folder that holds the training script uploaded as the component's code.
os.makedirs(name='component', exist_ok=True)

In [15]:
%%writefile component/training.py


from pathlib import Path
import sys
import os

parent_dir =  os.path.dirname(os.getcwd())
 
# setting path
sys.path.append(parent_dir)

import argparse
import os
import shutil
import tempfile
from prep_data import split_label, transform_data


from azureml.core import Run

import mlflow
import mlflow.sklearn

import pandas as pd
import numpy as np
from sklearn.compose import make_column_selector as selector
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
#from lightgbm import LGBMClassifier


from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error


def parse_args():
    """Parse the command-line flags passed in by the Azure ML component.

    Returns:
        argparse.Namespace with string attributes ``training_data``,
        ``target_column_name`` and ``model_output`` (None for omitted flags).
    """
    arg_parser = argparse.ArgumentParser()
    flag_specs = (
        ("--training_data", "Path to training data"),
        ("--target_column_name", "Name of target column"),
        ("--model_output", "Path of output model"),
    )
    for flag, help_text in flag_specs:
        arg_parser.add_argument(flag, type=str, help=help_text)
    return arg_parser.parse_args()


def main(args):
    """Train the employee-attrition classifier and save it as an MLflow model.

    Steps:
      1. Point MLflow at the tracking URI / experiment of the current Azure ML run.
      2. Read the parquet file at ``args.training_data`` and separate the
         ``args.target_column_name`` label from the feature columns.
      3. Fit a pipeline on all training rows. The pipeline one-hot encodes
         object columns, standard-scales numeric columns, then applies
         LogisticRegression.
      4. Save the fitted pipeline with ``mlflow.sklearn.save_model`` and copy the
         resulting files into ``args.model_output``.

    Args:
        args: argparse.Namespace from ``parse_args`` providing ``training_data``,
            ``target_column_name`` and ``model_output``.
    """
    current_experiment = Run.get_context().experiment
    tracking_uri = current_experiment.workspace.get_mlflow_tracking_uri()
    print("tracking_uri: {0}".format(tracking_uri))
    mlflow.set_tracking_uri(tracking_uri)
    mlflow.set_experiment(current_experiment.name)

    # Read in data
    print("Reading data")
    all_training_data = pd.read_parquet(args.training_data)
    target = all_training_data[args.target_column_name]
    features = all_training_data.drop([args.target_column_name], axis=1)

    # Route columns by dtype: object (string) columns are one-hot encoded,
    # numeric columns are standardised.
    numerical_selector = selector(dtype_exclude=object, dtype_include=np.number)
    categorical_selector = selector(dtype_include=object)

    numerical_columns = numerical_selector(features)
    categorical_columns = categorical_selector(features)

    categorical_encoder = OneHotEncoder(handle_unknown="ignore")
    numerical_encoder = StandardScaler()

    # NOTE: the step is named "ordinal-encoder" although it is a OneHotEncoder;
    # the name is kept so the saved pipeline's structure stays unchanged.
    preprocessor = ColumnTransformer([
        ('ordinal-encoder', categorical_encoder, categorical_columns),
        ('standard_scaler', numerical_encoder, numerical_columns)])

    clf = make_pipeline(preprocessor, LogisticRegression())

    # Fit on every training row. Evaluation happens on the separate test
    # dataset that the RAI pipeline scores. A second train/test split here
    # would only throw away 30% of the training data, because the held-out
    # part was never used.
    print("Training model...")
    model = clf.fit(features, target)

    # Track model metrics
    print("Training accuracy: {0:.4f}".format(model.score(features, target)))

    # Saving model with mlflow - leave this section unchanged
    model_dir =  "./model_output"
    with tempfile.TemporaryDirectory() as td:
        print("Saving model with MLFlow to temporary directory")
        tmp_output_dir = os.path.join(td, model_dir)
        mlflow.sklearn.save_model(sk_model=model, path=tmp_output_dir)

        print("Copying MLFlow model to output path")
        for file_name in os.listdir(tmp_output_dir):
            print("  Copying: ", file_name)
            # As of Python 3.8, copytree will acquire dirs_exist_ok as
            # an option, removing the need for listdir
            shutil.copy2(src=os.path.join(tmp_output_dir, file_name), dst=os.path.join(args.model_output, file_name))


# Script entry point: parse the component's flags and train/save the model,
# framing the run's log output with separator lines.
if __name__ == "__main__":
    separator = "*" * 60

    # add space in logs
    print(separator)
    print("\n\n")

    args = parse_args()
    main(args)

    # add space in logs
    print(separator)
    print("\n\n")

Overwriting component/training.py


In [16]:
from azure.ai.ml import load_component

# Build and register the command component that runs component/training.py.
#
# Header: an f-string, so the component version and the curated environment
# version are interpolated.
# NOTE(review): "Atrition" in display_name is a typo. It is left unchanged
# because re-registering an existing component version with different
# content may be rejected; fix it together with a version bump.
component_header = f"""
$schema: http://azureml/sdk-2-0/CommandComponent.json
name: rai_employee_attrition_training_component
display_name: Employee Atrition classification training component for RAI example
version: {rai_emp_attrition_classifier_version_string}
type: command
inputs:
  training_data:
    type: path
  target_column_name:
    type: string
outputs:
  model_output:
    type: path
code: ./component/
environment: azureml:AzureML-sklearn-1.0-ubuntu20.04-py38-cpu:{str(version)}
"""

# Command: a plain (non-format) string. The `${{...}}` placeholders, which
# Azure ML resolves at job time, are therefore written verbatim, with no brace
# escaping and no second formatting pass.
component_command = """
command: >-
  python training.py
  --training_data ${{inputs.training_data}}
  --target_column_name ${{inputs.target_column_name}}
  --model_output ${{outputs.model_output}}
"""

yaml_contents = component_header + component_command

yaml_filename = "RAIEmployeeAttritionClassificationTrainingComponent.yaml"

with open(yaml_filename, 'w') as f:
    f.write(yaml_contents)

train_component_definition = load_component(
    path=yaml_filename
)

ml_client.components.create_or_update(train_component_definition)

CommandComponent({'auto_increment_version': False, 'source': 'REMOTE.WORKSPACE.COMPONENT', 'is_anonymous': False, 'name': 'rai_employee_attrition_training_component', 'description': None, 'tags': {}, 'properties': {}, 'id': '/subscriptions/8a0f6419-1f4c-45b3-8d92-ee53be1ea443/resourceGroups/testRG/providers/Microsoft.MachineLearningServices/workspaces/rai-ws/components/rai_employee_attrition_training_component/versions/46', 'Resource__source_path': None, 'base_path': './', 'creation_context': <azure.ai.ml._restclient.v2022_05_01.models._models_py3.SystemData object at 0x7f3be24f3e50>, 'serialize': <msrest.serialization.Serializer object at 0x7f3be2544070>, 'command': 'python training.py --training_data ${{inputs.training_data}} --target_column_name ${{inputs.target_column_name}} --model_output ${{outputs.model_output}}', 'code': '/subscriptions/8a0f6419-1f4c-45b3-8d92-ee53be1ea443/resourceGroups/testRG/providers/Microsoft.MachineLearningServices/workspaces/rai-ws/codes/164610f2-914e-41

In [77]:
import time

# The registered model is named "<base>_<suffix>". The Unix-timestamp suffix
# (also used in the experiment names below) makes each run's name unique.
model_base_name = 'rai_employee_attrition_model'
model_name_suffix = int(time.time())

In [78]:
from azure.ai.ml import dsl, Input

# Components used by the model-registration pipeline: the built-in
# `register_model` component, and the training component registered above,
# pinned to the version string it was created with.
register_component = ml_client.components.get(
    name="register_model", version=version
)

train_model_component = ml_client.components.get(
    name="rai_employee_attrition_training_component",
    version=rai_emp_attrition_classifier_version_string,
)

# Local parquet files written earlier; downloaded onto the compute at run time.
# (The `emppayrate_` prefix is historical; these hold employee-attrition data.)
emppayrate_train_parquet = Input(
    type="uri_file", path="data/train_dataset.parquet", mode="download"
)

emppayrate_test_parquet = Input(
    type="uri_file", path="data/test_dataset.parquet", mode="download"
)


# Pipeline: train the classifier on `training_data`, then register the
# resulting MLflow model as "<model_base_name>_<model_name_suffix>".
@dsl.pipeline(
    compute=compute_name,
    description="Register Model for RAI Employee Attrition",
    experiment_name=f"RAI_Employee_Attrition_Model_Training_{model_name_suffix}",
)
def my_training_pipeline(target_column_name, training_data):
    # Use the version-pinned workspace component fetched above, so the
    # pipeline runs exactly the registered training component.
    trained_model = train_model_component(
        target_column_name=target_column_name,
        training_data=training_data,
    )
    trained_model.set_limits(timeout=120)

    _ = register_component(
        model_input_path=trained_model.outputs.model_output,
        model_base_name=model_base_name,
        model_name_suffix=model_name_suffix,
    )

    return {}


model_registration_pipeline_job = my_training_pipeline(target_column, emppayrate_train_parquet)

In [79]:
from azure.ai.ml.entities import PipelineJob
import webbrowser

def submit_and_wait(ml_client, pipeline_job, poll_interval_seconds=30, timeout_seconds=None) -> "PipelineJob":
    """Submit a pipeline job and block until it reaches a terminal state.

    The job is polled every ``poll_interval_seconds`` seconds and each status is
    printed. Afterwards the job's Azure ML Studio page is opened in a web
    browser, when the job exposes a "Studio" service.

    Args:
        ml_client: Workspace client used to create and poll the job.
        pipeline_job: The pipeline job to submit.
        poll_interval_seconds: Seconds to sleep between status checks.
        timeout_seconds: Optional upper bound on the total wait, in seconds;
            ``None`` (the default) waits indefinitely.

    Returns:
        The last fetched job object. Its status may be 'Failed', 'Canceled' or
        'NotResponding'; a warning is printed in that case, but callers must
        check ``.status`` before using the job's outputs.

    Raises:
        TimeoutError: if ``timeout_seconds`` elapses before the job finishes.
    """
    # Imported here so the helper does not depend on another cell's imports.
    import time
    import webbrowser

    terminal_states = {'Completed', 'Failed', 'Canceled', 'NotResponding'}

    created_job = ml_client.jobs.create_or_update(pipeline_job)
    assert created_job is not None

    started_at = time.monotonic()
    while created_job.status not in terminal_states:
        if timeout_seconds is not None and time.monotonic() - started_at >= timeout_seconds:
            raise TimeoutError(
                f"Job {created_job.name} still '{created_job.status}' after {timeout_seconds} s"
            )
        time.sleep(poll_interval_seconds)
        created_job = ml_client.jobs.get(created_job.name)
        print("Latest status : {0}".format(created_job.status))

    if created_job.status != 'Completed':
        print(f"WARNING: job {created_job.name} finished with status {created_job.status}")

    # Open the pipeline in a web browser, but only when a Studio link exists.
    studio_service = (created_job.services or {}).get("Studio")
    if studio_service is not None:
        webbrowser.open(studio_service.endpoint)

    return created_job

# Submit the model-registration pipeline and block until it finishes.
training_job = submit_and_wait(
    ml_client=ml_client,
    pipeline_job=model_registration_pipeline_job,
)

Latest status : Completed


In [80]:
# "<name>:<version>" id of the model registered by the training pipeline;
# version 1 because the timestamped name is new on every run.
expected_model_id = "{}_{}:1".format(model_base_name, model_name_suffix)

In [81]:
def get_categorical_numerical_data(all_data, target_column='Attrition_numerical'):
    """Split a dataset's feature columns into categorical and numerical names.

    Args:
        all_data: Path to a parquet file (read with ``pd.read_parquet``) or an
            already-loaded ``pd.DataFrame``.
        target_column: Label column excluded from both results. Defaults to the
            attrition label created earlier in this notebook. It need not be
            present in the data, and it may be string- or number-typed.

    Returns:
        A ``(categorical, numerical)`` tuple. ``categorical`` is a list of the
        object-dtype feature columns in frame order. ``numerical`` is a sorted
        ``pd.Index`` of all remaining feature columns.
    """
    frame = all_data if isinstance(all_data, pd.DataFrame) else pd.read_parquet(all_data)
    # DataFrame.iteritems was removed in pandas 2.0; iterate the dtypes instead.
    categorical = [
        col for col, dtype in frame.dtypes.items()
        if dtype == 'object' and col != target_column
    ]
    # Index.difference returns a sorted Index; the label is excluded here too,
    # so a numeric label never shows up as a numerical feature.
    numerical = frame.columns.difference(categorical + [target_column])
    return categorical, numerical

In [82]:
# get categorical and numerical fields from training data
# Derive categorical / numerical feature names from the full exported dataset
# and show them.
categorical, numerical = get_categorical_numerical_data(source_data_path)
for heading, column_names in (
    ("categorical columns: ", categorical),
    ("numerical field: ", numerical),
):
    print(heading, column_names)

categorical columns:  ['BusinessTravel', 'Department', 'EducationField', 'Gender', 'JobRole', 'MaritalStatus', 'OverTime']
numerical field:  Index(['Age', 'DailyRate', 'DistanceFromHome', 'Education',
       'EnvironmentSatisfaction', 'HourlyRate', 'JobInvolvement', 'JobLevel',
       'JobSatisfaction', 'MonthlyIncome', 'MonthlyRate', 'NumCompaniesWorked',
       'PercentSalaryHike', 'PerformanceRating', 'RelationshipSatisfaction',
       'StockOptionLevel', 'TotalWorkingYears', 'TrainingTimesLastYear',
       'WorkLifeBalance', 'YearsAtCompany', 'YearsInCurrentRole',
       'YearsSinceLastPromotion', 'YearsWithCurrManager'],
      dtype='object')


In [83]:
def _get_rai_component(component_name):
    """Fetch a workspace component pinned to the shared `version` string."""
    return ml_client.components.get(name=component_name, version=version)


# Components used by the RAI insights pipeline.
fetch_model_component = _get_rai_component('fetch_registered_model')
rai_constructor_component = _get_rai_component("rai_insights_constructor")
rai_counterfactual_component = _get_rai_component("rai_insights_counterfactual")
rai_causal_component = _get_rai_component("rai_insights_causal")
rai_explanation_component = _get_rai_component("rai_insights_explanation")
rai_erroranalysis_component = _get_rai_component("rai_insights_erroranalysis")
rai_gather_component = _get_rai_component("rai_insights_gather")
rai_scorecard_component = _get_rai_component("rai_score_card")

In [84]:
import json

# Configuration for the PDF score card: model metadata plus the metric
# thresholds the report checks.
score_card_config_filename = "rai_employee_attrition_score_card_config.json"

score_card_config_dict = dict(
    Model=dict(
        ModelName="Employee churn measure",
        ModelType="Classification",
        ModelSummary="This model provides a measure of employee leaving or staying with a company",
    ),
    Metrics=dict(
        accuracy_score=dict(threshold=">=0.85"),
    ),
)

with open(score_card_config_filename, 'w') as config_file:
    config_file.write(json.dumps(score_card_config_dict))

In [85]:
import json

# Ship the score-card JSON written above to the pipeline as a downloaded file.
score_card_config_path = Input(type="uri_file", path=score_card_config_filename, mode="download")

# Pipeline: fetch the registered model, build RAIInsights on the train/test
# data, then add explanations, error analysis, causal analysis and
# counterfactuals. It gathers everything into one dashboard and renders a PDF
# score card from it.
@dsl.pipeline(
    compute=compute_name,
    description="RAI computation on emp attrition classification data",
    experiment_name=f"RAI_Employee_Attrition_Classification_RAIInsights_Computation_{model_name_suffix}",
)
def rai_classification_pipeline(
    target_column_name,
    training_data,
    testing_data,
    score_card_config_path,
):
    # Fetch the model
    fetch_job = fetch_model_component(
        model_id=expected_model_id
    )

    # Initiate the RAIInsights
    create_rai_job = rai_constructor_component(
        title="RAI Dashboard",
        task_type="classification",
        model_info_path=fetch_job.outputs.model_info_output_path,
        train_dataset=training_data,
        test_dataset=testing_data,
        target_column_name=target_column_name,
        # The class list must follow the model's predict_proba column order.
        # scikit-learn orders that as the sorted `classes_`, i.e. '0' then '1'.
        # A reversed list swaps the class labels in the dashboard.
        classes=json.dumps(['0', '1']),
        categorical_column_names=json.dumps(categorical),
    )
    create_rai_job.set_limits(timeout=120)

    # Add an explanation
    explain_job = rai_explanation_component(
        comment="Explanation for employee attrition classification",
        rai_insights_dashboard=create_rai_job.outputs.rai_insights_dashboard,
    )
    explain_job.set_limits(timeout=120)

    # Add error analysis
    erroranalysis_job = rai_erroranalysis_component(
        rai_insights_dashboard=create_rai_job.outputs.rai_insights_dashboard,
    )
    erroranalysis_job.set_limits(timeout=120)

    # Add causal analysis
    causal_job = rai_causal_component(
        treatment_features=json.dumps(['Age', 'DailyRate', 'YearsAtCompany']),
        rai_insights_dashboard=create_rai_job.outputs.rai_insights_dashboard,
    )
    causal_job.set_limits(timeout=120)

    # Add counterfactual analysis. `desired_range` applies only to regression
    # and is rejected for classification, so only `desired_class` is set.
    counterfactual_job = rai_counterfactual_component(
        rai_insights_dashboard=create_rai_job.outputs.rai_insights_dashboard,
        total_cfs=10,
        desired_class='opposite',
    )
    counterfactual_job.set_limits(timeout=600)

    # Combine everything
    rai_gather_job = rai_gather_component(
        constructor=create_rai_job.outputs.rai_insights_dashboard,
        insight_1=explain_job.outputs.explanation,
        insight_2=causal_job.outputs.causal,
        insight_3=counterfactual_job.outputs.counterfactual,
        insight_4=erroranalysis_job.outputs.error_analysis,
    )
    rai_gather_job.set_limits(timeout=120)

    rai_gather_job.outputs.dashboard.mode = "upload"
    rai_gather_job.outputs.ux_json.mode = "upload"

    # Generate score card in pdf format for a summary report on model performance,
    # and observe distribution of error between prediction vs ground truth.
    rai_scorecard_job = rai_scorecard_component(
        dashboard=rai_gather_job.outputs.dashboard,
        pdf_generation_config=score_card_config_path
    )

    return {
        "dashboard": rai_gather_job.outputs.dashboard,
        "ux_json": rai_gather_job.outputs.ux_json,
        "scorecard": rai_scorecard_job.outputs.scorecard
    }

In [86]:
import uuid
from azure.ai.ml import Output

# Pipeline to construct the RAI Insights
insights_pipeline_job = rai_classification_pipeline(
    target_column_name=target_column,
    training_data=emppayrate_train_parquet,
    testing_data=emppayrate_test_parquet,
    score_card_config_path=score_card_config_path,
)

# Workaround to enable the download: write each pipeline output to an explicit
# folder under a fresh random prefix in the workspace blob datastore.
rand_path = str(uuid.uuid4())
for output_name in ("dashboard", "ux_json", "scorecard"):
    setattr(
        insights_pipeline_job.outputs,
        output_name,
        Output(
            path=f"azureml://datastores/workspaceblobstore/paths/{rand_path}/{output_name}/",
            mode="upload",
            type="uri_folder",
        ),
    )

# submit pipeline
insights_job = submit_and_wait(ml_client, insights_pipeline_job)

Latest status : Running
Latest status : Running
Latest status : Running
Latest status : Running
Latest status : Running
Latest status : Running
Latest status : Running
Latest status : Running
Latest status : Running
Latest status : Failed


In [87]:
# Build the Azure ML Studio link to the model's RAI analysis page.
# NOTE(review): `_operation_scope` is a private MLClient attribute.
operation_scope = ml_client._operation_scope
sub_id, rg_name = operation_scope.subscription_id, operation_scope.resource_group_name
ws_name = ml_client.workspace_name

workspace_id = f"/subscriptions/{sub_id}/resourcegroups/{rg_name}/workspaces/{ws_name}"
expected_uri = f"https://ml.azure.com/model/{expected_model_id}/model_analysis?wsid={workspace_id}"

print(f"Please visit {expected_uri} to see your analysis")

Please visit https://ml.azure.com/model/rai_employee_attrition_model_1662727362:1/model_analysis?wsid=/subscriptions/8a0f6419-1f4c-45b3-8d92-ee53be1ea443/resourcegroups/testRG/workspaces/rai-ws to see your analysis


In [88]:
# Download only the "scorecard" named output of the RAI job into the
# current directory.
target_directory = "."
ml_client.jobs.download(insights_job.name, output_name="scorecard", download_path=target_directory)

ServiceResponseError: HTTPSConnectionPool(host='eastus.api.azureml.ms', port=443): Read timed out. (read timeout=300)

In [None]:
import tempfile
import pathlib
from responsibleai import RAIInsights
from raiwidgets import ResponsibleAIDashboard
# Download the gathered dashboard into a scratch folder, load it back as
# RAIInsights and start the interactive dashboard widget.
with tempfile.TemporaryDirectory() as dashboard_path:
    ml_client.jobs.download(insights_job.name, download_path=dashboard_path, output_name="dashboard")
    # This code expects the named output under <download>/named-outputs/dashboard.
    expected_path = pathlib.Path(dashboard_path, 'named-outputs', 'dashboard')
    # This load is very fragile with respect to Python version and conda environment
    rai_i = RAIInsights.load(expected_path)
    ResponsibleAIDashboard(rai_i)