Copyright (c) Microsoft. All rights reserved.

Licensed under the MIT license.

In [1]:
%run /01_Authenticate_to_Purview_AML

In [2]:
%run /02_Create_ML_Lineage_Types

In [3]:
%run /03_Create_ML_Lineage_Functions

In [2]:
import json

In [3]:
# Storage configuration. Fill in the ADLS Gen2 account name that backs the Synapse
# workspace, and the container (filesystem) holding the creditriskdata folder.
data_lake_account_name, file_system_name = "", "data"

In [4]:
import pandas as pd
import numpy as np

# Root of the Synapse-attached data lake container, in the abfss:// form Spark reads.
synapse_base_path = 'abfss://{}@{}.dfs.core.windows.net'.format(file_system_name, data_lake_account_name)


def _load_csv(relative_path):
    # Read one header-bearing CSV from the lake and hand it back as a pandas frame.
    # inferSchema is not requested, so columns presumably arrive as strings.
    return (spark.read
            .load(synapse_base_path + relative_path, format='csv', header=True)
            .toPandas())


df_borrower = _load_csv('/creditriskdata/borrower.csv')
df_loan = _load_csv('/creditriskdata/loan.csv')

# Inner-join borrowers to their loans on the shared member key.
df_data = pd.merge(df_borrower, df_loan, how='inner', on='memberId')

# Export a label-free copy of the joined rows; the batch-inference cell reads it
# back as its test set. Note this export happens BEFORE the null handling below.
df_sp = spark.createDataFrame(df_data).drop('loanStatus')
(df_sp.write
    .option('header', 'true')
    .mode('overwrite')
    .csv(synapse_base_path + '/creditriskdata/testdata/'))

# Literal 'nan' strings and real nulls are both filled with 0 in these columns.
for fill_col in ('homeOwnership', 'isJointApplication'):
    df_data[fill_col] = df_data[fill_col].replace('nan', np.nan).fillna(0)

# Columns excluded from the training features.
drop_cols = ['memberId', 'loanId', 'date', 'grade']
df_data = df_data.drop(columns=drop_cols)
#df_data.dtypes

In [6]:
# Azure ML experiment name; also passed to the Purview helpers and used in entity qualified names.
experimentname = 'CreditRiskExperiment'

In [7]:
# Register the prepared (joined and cleaned) training frame as a Purview dataset entity.
data_ent_name = 'creditriskdata'
create_data_entity_with_schema(df_data, data_ent_name, 'custom_dataset')

# Preprocess lineage: the two raw CSVs in the lake feed the prepared dataset entity.
syn_basepath = ('https://' + data_lake_account_name + '.dfs.core.windows.net/'
                + file_system_name + '/creditriskdata')
purview_basepath = 'pyapacheatlas://'

in_ent_qns = {
    syn_basepath + '/borrower.csv': 'azure_datalake_gen2_path',
    syn_basepath + '/loan.csv': 'azure_datalake_gen2_path',
}
out_ent_qns = {
    purview_basepath + data_ent_name: 'custom_dataset',
}

processname = '-preprocess'
create_lineage_for_entities(experimentname, processname, in_ent_qns, out_ent_qns, ColumnMapping=True)

In [8]:
from azureml.core.experiment import Experiment
from azureml.train.automl.run import AutoMLRun
from azureml.train.automl import AutoMLConfig

# NOTE: every execution of this cell submits a NEW AutoML run - run it only once.
experiment = Experiment(ws, experimentname)

# AutoML classification settings; the label column is predicted from all other columns of df_data.
automl_settings = dict(
    task='classification',
    primary_metric='AUC_weighted',
    training_data=df_data,
    label_column_name='loanStatus',
    # compute = 'local',
    n_cross_validations=5,
    iterations=2,
    experiment_timeout_minutes=15,
    enable_early_stopping=True,
    model_explainability=True,
    enable_onnx_compatible_models=True,
    enable_voting_ensemble=False,
    enable_stack_ensemble=False,
)
automl_classifier_config = AutoMLConfig(**automl_settings)
local_run = experiment.submit(automl_classifier_config, show_output=True)

In [10]:
# get experiment run, get the best model and register

from azureml.core.experiment import Experiment
from azureml.core.workspace import Workspace
from azureml.train.automl.run import AutoMLRun
from azureml.train.automl import AutoMLConfig
from azureml.core.model import Model
import joblib

# Pick a run of the experiment, extract its best child run and fitted model,
# save the model locally and register it in the Azure ML workspace.
experimentname = "CreditRiskExperiment"

# Take the first run yielded by get_runs() (documented by the Azure ML SDK as
# reverse chronological order, i.e. the latest run). Change this selection if you
# want to register a model from a different run.
automl_run = next(iter(ws.experiments[experimentname].get_runs()), None)
if automl_run is None:
    # Fail fast with a clear message; otherwise best_run/fitted_model would be
    # undefined and the failure would surface later as a NameError.
    raise RuntimeError(
        "Experiment '{}' has no runs yet; run the AutoML training cell first.".format(experimentname)
    )
best_run, fitted_model = automl_run.get_output()

# Save the model to a local file so Model.register can upload it.
model_path = 'creditrisk_model'
joblib.dump(fitted_model, model_path)

model_name = "creditrisk_model"
registered_model = Model.register(
    model_path=model_path,  # the local file written just above
    model_name=model_name,  # name the model is registered under
    tags={'type': "classification"},
    description="Credit Risk Classifier",
    workspace=ws,
)


In [11]:
# Record the Python packages (with version caveats) this experiment depends on
# as Purview package entities.
# Row layout: [programming_language, package_name, version, notes]
packageslist = [
    ['python', 'mmlspark', 'v0.0.11', 'older versions before 0.0.10 give error'],
    ['python', 'scikit-learn', '0.22rc2.post1',
     'latest version 0.24.x gives error if you call the model from Azure Function'],
]
create_package_entities(experimentname, packageslist)

In [12]:
# Training lineage: prepared data + experiment config + packages -> model + model metrics.
create_experiment_config_entity(ws, experimentname, automl_run)
create_model_entity(ws, experimentname, model_name)
create_model_metrics_entity(experimentname, best_run)

pbasepath = 'pyapacheatlas://'
exp_prefix = pbasepath + experimentname

in_ent_qns = {
    pbasepath + data_ent_name: 'custom_dataset',
    exp_prefix + "-config": 'custom_ml_exp_config',
    exp_prefix + '-packages': 'custom_ml_packages',
}
out_ent_qns = {
    pbasepath + model_name: 'custom_ml_model',
    exp_prefix + "-modelmetrics": 'custom_ml_model_metrics',
}

processname = '-train'
create_lineage_for_entities(experimentname, processname, in_ent_qns, out_ent_qns, ColumnMapping=False)

In [13]:
# Entry script for the Azure ML web service (Azure ML entry-script convention:
# init() at start-up, run() per scoring request). The same text is exec'd below so
# the script can be smoke-tested in this kernel before deployment.
scoring_script_file_name = 'scoring_script.py'

scoring_script = """
import json
import numpy as np
import pandas as pd
import azureml.train.automl  # presumably needed so the AutoML model classes can be unpickled
from azureml.core.model import Model

try:
    import joblib
except ImportError:
    # scikit-learn < 0.23 also exposed joblib as sklearn.externals.joblib (removed in 0.23)
    from sklearn.externals import joblib

def init():
    global model
    # Resolve the local path of the registered 'creditrisk_model' and deserialize it
    model_path = Model.get_model_path(model_name = 'creditrisk_model')
    model = joblib.load(model_path)

def run(input_json):
    try:
        data_df = pd.read_json(input_json)
        # Get the predictions...
        prediction = model.predict(data_df)
        prediction = json.dumps(prediction.tolist())
    except Exception as e:
        # Errors are returned to the caller as the response text rather than raised
        prediction = str(e)
    return prediction
"""

# Define init()/run() in this kernel for the local smoke test, then persist the
# script for InferenceConfig to pick up.
exec(scoring_script)
with open(scoring_script_file_name, "w", encoding="utf-8") as script_file:
    script_file.write(scoring_script)

# Local smoke test: one row of the prepared features with the label removed.
drop_cols = ['loanStatus']
X_test = df_data.drop(columns=drop_cols).head(1)
json_test_data = X_test.to_json(orient='records')
print(json_test_data)
init()
run(json_test_data)

In [14]:
from azureml.core import Environment
from azureml.core.model import InferenceConfig
from azureml.core.webservice import AciWebservice, Webservice

# Reuse the conda dependency file AutoML saved for the best run, so the service
# environment is built from the same specification.
environment_config_file = 'creditrisk_conda_env.yml'
best_run.download_file('outputs/conda_env_v_1_0_0.yml', environment_config_file)

myenv = Environment.from_conda_specification(name="creditriskenv", file_path=environment_config_file)
myenv.register(workspace=ws)

# Deploy the registered model to Azure Container Instances (1 CPU core, 2 GB memory).
inference_config = InferenceConfig(environment=myenv, entry_script=scoring_script_file_name)
aci_config = AciWebservice.deploy_configuration(
    cpu_cores=1,
    memory_gb=2,
    tags={'type': 'automl-classification'},
    description='AutoML Credit Risk Classifier Service',
)
aci_service_name = 'creditrisk-automl-service'
aci_service = Model.deploy(ws, aci_service_name, [registered_model], inference_config, aci_config)
aci_service.wait_for_deployment(show_output=True)
print(aci_service.state)

In [16]:
# Register the deployed endpoint in Purview and link it to the model it serves.
aci_service_name = 'creditrisk-automl-service'
create_model_service_entity(ws, experimentname, aci_service_name, json_test_data)

pbasepath = 'pyapacheatlas://'
model_qn = pbasepath + model_name
endpoint_qn = pbasepath + experimentname + "-model_endpoint"

in_ent_qns = {model_qn: 'custom_ml_model'}
out_ent_qns = {endpoint_qn: 'custom_ml_model_endpoint'}

processname = '-deploymodel'
create_lineage_for_entities(experimentname, processname, in_ent_qns, out_ent_qns, ColumnMapping=False)

In [17]:
# Batch inference: score the label-free test rows exported during data prep and
# publish the scored rows back to the lake and to a Spark table.
import numpy as np

df_test = spark.read.load(synapse_base_path + '/creditriskdata/testdata', format='csv', header=True).toPandas()

# Build model inputs the same way the training features were built: drop the
# excluded columns and apply the same null handling used on the training frame
# (the test CSV was exported before that handling ran).
drop_cols = ['memberId', 'loanId', 'date', 'grade']
df_test1 = df_test.drop(columns=drop_cols)
for fill_col in ('homeOwnership', 'isJointApplication'):
    df_test1[fill_col] = df_test1[fill_col].replace('nan', np.nan).fillna(0)

model_path = Model.get_model_path(model_name='creditrisk_model')
model = joblib.load(model_path)
prediction = model.predict(df_test1)

# Copy so attaching predictions does not also mutate df_test, which is later
# registered in Purview as the (prediction-free) test dataset.
df_result = df_test.copy()
df_result['prediction'] = prediction

# Write to the storage configured at the top of the notebook (synapse_base_path).
df_sp = spark.createDataFrame(df_result)
df_sp.write.option('header', 'true').mode('overwrite').csv(synapse_base_path + '/creditriskdata/batchpredictions/')

df_sp.write.mode("overwrite").saveAsTable("default.creditrisk_predictions")

In [22]:
# Register the test rows and the scored output as Purview dataset entities.
test_data_ent_name = 'creditrisktestdata'
# The test dataset is the exported CSV, which has no 'prediction' column; drop it
# in case df_test picked one up through an alias when predictions were attached.
create_data_entity_with_schema(
    df_test.drop(columns=['prediction'], errors='ignore'),
    test_data_ent_name,
    entitytype='custom_dataset',
)

batchpred_data_ent_name = 'creditriskbatchpredictions'
create_data_entity_with_schema(df_result, batchpred_data_ent_name, entitytype='custom_dataset')

# Batch-inference lineage: test data + model -> batch predictions.
# syn_basepath mirrors the https form used for the preprocess lineage; it is not
# used by the lineage call below.
syn_basepath = 'https://' + data_lake_account_name + '.dfs.core.windows.net/' + file_system_name + '/'
pbasepath = 'pyapacheatlas://'

in_ent_qns = {pbasepath + test_data_ent_name: 'custom_dataset', pbasepath + model_name: 'custom_ml_model'}
out_ent_qns = {pbasepath + batchpred_data_ent_name: 'custom_dataset'}

processname = '-batchinference'
create_lineage_for_entities(experimentname, processname, in_ent_qns, out_ent_qns, ColumnMapping=True)

In [None]:
## uncomment below code to link PowerBI Dataset and Report in lineage if you have access to a PBI workspace 

In [23]:
# #The PowerBI entities will populate with more details if you set up a scan for PBI workspaces in Purview
# #We are only creating placeholders and links for lineage below

# #create PowerBI dataset entity and lineage 
# pbi_workspace = '<YOUR PBIWORKSPACE URL>' #'https://xxx.powerbi.com/groups/7c555287-f9b8-45ff-be6c-9909afe9df40'
# pbi_datasetid = '<YOUR PBI Dataset ID>' #'c4a30c22-466d-4a30-a1ac-8736ed6567cc' 

# pbidata_ent_name = 'creditriskpbidataset' 

# create_powerbi_dataset_and_lineage(experimentname,pbi_workspace,pbi_datasetid,pbidata_ent_name,batchpred_data_ent_name,'custom_dataset')

In [24]:
# #create PowerBI report entity and lineage
# pbi_reportid = '<YOUR PBI Report ID>' #'e495453d-6c0c-4fb9-bdc4-556319f6a57b'
# pbi_ent_name = 'creditriskpbireport'
 
# create_powerbi_report_and_lineage(experimentname,pbi_workspace,pbi_reportid,pbi_ent_name,pbi_datasetid)