######  NB! InteractiveLoginAuthentication is only needed the 1st time. Once ws_config is written, use the later CELL in this notebook, which just reads that file
import repackage
repackage.add("../azure-enterprise-scale-ml/esml/common/")
from azureml.core import Workspace
from azureml.core.authentication import InteractiveLoginAuthentication
from esml import ESMLDataset, ESMLProject

p = ESMLProject()
p.dev_test_prod="dev"
auth = InteractiveLoginAuthentication(tenant_id = p.tenant)
ws, config_name = p.authenticate_workspace_and_write_config(auth)
######  NB!

# `ESML - BATCH PIPELINE "Classic Creation via notebook"`
Note: `"Classic Creation via notebook"`: This can be done with 0% ESML SDK involved. 
- Note: This full notebook (15 pages of code, 17-32h of work) can be replaced by a 1-liner via the ESML SDK. See the other notebook example for that.

## WHAT
- Get correct environment and Azure ML workspace (dev,test,prod) via ESML
- Get compute via ESML
- Get AutoML environment and model.pickle via ESML
- Create without ESML, Azure ML dependency only
    - Steps: In -> Silver
        - Get indata "to score" dynamically. Azure Data Factory sends 2 parameters to control this (model version, scoring date)
    - Step: Silver2Gold
    - Step: ScoreGold
- `Scoring & Writeback orchestration` Supports scenario:  `"daily scoring & writeback"`
    - This notebook gives Azure Data Factory the "meta-data" needed to call this Azure ML Pipeline, and to know which scoring to write back
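
A minimal sketch of the "meta-data" idea, under stated assumptions: the file name `last_scored_gold.csv` is the one used later in this notebook, but the helper `write_run_metadata`, the column names and the example path are hypothetical, and the file is written to a local temp folder here rather than to ADLS GEN 2.

```python
import csv
import os
import tempfile


def write_run_metadata(folder, scored_gold_path, date_folder, model_version):
    """Write a one-row CSV describing where the latest scored data landed."""
    os.makedirs(folder, exist_ok=True)
    file_path = os.path.join(folder, "last_scored_gold.csv")
    with open(file_path, "w", newline="") as f:
        # Column names are hypothetical; pick whatever the orchestrator expects.
        writer = csv.DictWriter(
            f, fieldnames=["scored_gold_path", "date_folder", "model_version"]
        )
        writer.writeheader()
        writer.writerow(
            {
                "scored_gold_path": scored_gold_path,
                "date_folder": date_folder,
                "model_version": model_version,
            }
        )
    return file_path


# Example values only; the real path comes from the ESML lake design.
demo_folder = tempfile.mkdtemp()
meta_file = write_run_metadata(
    demo_folder,
    scored_gold_path="inference/1/scored/dev/2021/05/03/",
    date_folder="2021/05/03",
    model_version="1",
)

# The orchestrator side: read the single row back to find the scored data.
with open(meta_file, newline="") as f:
    rows = list(csv.DictReader(f))
print(rows[0]["scored_gold_path"])
```

The point of the design is that the orchestrator only needs to know one fixed file location; everything run-specific is inside the file.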

## EFFORT: "headache" avoided for you : ) 
- Writing & debugging this notebook took me ~100 pipeline runs at 5-10min each, in total about `20h work`.
- Why? To support the scenario `"daily scoring & writeback"`, keeping the scored data historically in date_time folders, there was...
    -  ...a lot of headache `integrating` Azure ML Pipeline with Azure Data Factory using dynamic datasets for ADLS GEN 2 (and the lack of support for dynamic Dataset paths)
        - Example: `A jungle of choices to "pass data"` between pipeline steps 
            - 4 alternatives were tested, including this: `PipelineData VS PipelineParameter + DataPath + DataPathComputeBinding VS Dataset + DatasetConsumptionConfig and OutputFileDatasetConfig`
                - ...DataReference was not tested, since it is very legacy. PipelineData is also sort of going away, hence avoided; it did not work for ADLS GEN 2 datasets anyway...
        - Example: `Lack of support to control path` for Dataset IN or OUT data 
            - ADLS GEN 2 does not support DataPath, which Azure Data Factory can set (for Blob storage)
            - Azure Data Factory cannot read or write Azure ML Datasets (which would be great, to use a version number to get a registered dataset)
            - OutputFileDatasetConfig only supports {run-id} and {output-name} for output, which Azure Data Factory cannot receive from the activity...we sort of need a "date_folder" concept
        - `ESML Solution:`
            - `IN Dynamic path`: A custom "template" path that supports date_folder and model_version, besides {run-id}. This is set at runtime, from the Azure Data Factory parameter `"Scoring_Date"`
                - How: Set "dummy" paths, or a "default" dataset with a default path, then reset the path during runtime.
                - Cons: The path that is set during pipeline runtime is not reflected in the graphical representation in the Azure ML Studio UI, and we cannot "update the path" to be reflected (but it works)
                    - The path that is set when the pipeline is CREATED/PUBLISHED is `"static"` in the UI. 
                    - Example: If we have `"mydata/2020/01/01"` when creating the pipeline, this will still show even if we run it on `"mydata/2021/05/03"` with PipelineParameters "2021/05/03"
            - `OUT Dynamic path`: Write "run-meta-data" to a file in ADLS GEN 2 that Azure Data Factory can read after the run is done, so it is able to "WriteBack" data from a specific "date_folder"
                - `last_scored_gold.csv` is then read by Azure Data Factory after the AML pipeline activity, to fetch the "path" of the `correct` scored gold, with `2021/05/03` in its path, not `2020/01/01`
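
The `IN Dynamic path` idea can be sketched in plain Python. This is an illustration under assumptions, not the ESML implementation: the placeholder names (`inference_model_version`, `dev_test_prod`, `scoring_folder_date`) are the ones used by the step scripts in this notebook, while the template string itself and the helper `build_in_path` are hypothetical.

```python
import datetime

# Hypothetical template; only the placeholder names are taken from this notebook.
IN_PATH_TEMPLATE = (
    "inference/{inference_model_version}/ds01/in/{dev_test_prod}/{scoring_folder_date}/"
)


def build_in_path(template, scoring_date, model_version, env):
    """Turn a 'Scoring_Date' timestamp string into a yyyy/mm/dd lake folder path."""
    parsed = datetime.datetime.strptime(scoring_date, "%Y-%m-%d %H:%M:%S.%f")
    date_folder = parsed.strftime("%Y/%m/%d")
    return template.format(
        inference_model_version=model_version,
        dev_test_prod=env,
        scoring_folder_date=date_folder,
    )


in_path = build_in_path(IN_PATH_TEMPLATE, "2021-05-03 15:35:01.243860", "1", "dev")
print(in_path)  # inference/1/ds01/in/dev/2021/05/03/
```

Because the path is computed inside the step at runtime, the same published pipeline can read a different date folder on every run.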
# TIP:
You can use ESMLPipeline factory instead of running/managing a notebook like this.

`ESMLPipeline factory will build the pipeline automatically`, with all steps based on the dataset array in `model_settings.json` and on the `ESML Datamodel: Bronze->Silver->Gold` 

    - p_factory = ESMLPipelineFactory(p, "Y") 
    - batch_pipeline = p_factory.create_batch_scoring_pipe()

In [1]:
import repackage
repackage.add("../azure-enterprise-scale-ml/esml/common/")
from esml import ESMLProject

p = ESMLProject() # Will search in ROOT for your copied SETTINGS folder '../../../settings', you should copy template settings from '../settings'
p.ws = p.get_workspace_from_config() #2) Load DEV or TEST or PROD Azure ML Studio workspace
p.inference_mode = True

# `01_Get COMPUTE & ENVIRONMENT, Connect to LAKE`

import repackage
repackage.add("../azure-enterprise-scale-ml/esml/common/")
from esml import ESMLProject

param_esml_env = "dev"
param_inference_model_version = "4"
param_scoring_folder_date = "2021-06-22 15:35:01.243860" # will become both IN and GOLD path:  
param_train_in_folder_date = "2021-01-22 15:35:01.243860" # 
optional_param_my_unique_scoring_folder = "88483b56c3f8450a86f2eeac000bb834"

p = ESMLProject(param_esml_env,param_inference_model_version,param_scoring_folder_date,param_train_in_folder_date)
p.ws = p.get_workspace_from_config()
p.describe()

In [2]:
aml_compute = p.get_training_aml_compute(p.ws) # Get compute, for active environment, either use same as training, or create a separate INFERENCE compute
datastore = p.connect_to_lake()

Note: OVERRIDING enterprise performance settings with project specifics. (to change, set flag in 'dev_test_prod_settings.json' -> override_enterprise_settings_with_model_specific=False)
Using a model specific cluster, per configuration in project specific settings, (the integer of 'model_number' is the base for the name)
Note: OVERRIDING enterprise performance settings with project specifics. (to change, set flag in 'dev_test_prod_settings.json' -> override_enterprise_settings_with_model_specific=False)
Found existing cluster prj02-m11-dev for project and environment, using it.
Succeeded
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned
image_build_compute = prj02-m11-dev
Using GEN2 as Datastore


## 02 `Create a local ScriptFolder`
- This folder later becomes a snapshot that belongs to the pipeline


In [3]:
import os
# python scripts folder
script_folder = './common/pipeline/scripts'
os.makedirs(script_folder, exist_ok=True)
print('Script is in {}.'.format(os.path.realpath(script_folder)))

Script is in c:\Users\jostrm\OneDrive - Microsoft\0_GIT\2_My\github\azure-enterprise-scale-ml\notebook_demos\common\pipeline\scripts.


## 03 `Download a model to use to score with`

In [4]:
#2a - Download from Azure
inference_config_to_override_and_inject, model, best_run = p.get_active_model_inference_config() # 1) You can override this scoring_script - get a baseline, then modify...
inference_env = best_run.get_environment() # Batch scoring Environment(), for ScriptRunConfig
best_run.download_file(
    "outputs/model.pkl", os.path.join(script_folder, "model.pkl") # Download model.pkl for SNAPSHOT
)

Loading AutoML config settings from: dev
Loading AutoML config settings from: dev


Package:azureml-automl-runtime, training version:1.30.0, current version:1.26.0
Package:azureml-core, training version:1.30.0, current version:1.26.0
Package:azureml-dataprep, training version:2.15.1, current version:2.13.2
Package:azureml-dataprep-native, training version:33.0.0, current version:32.0.0
Package:azureml-dataprep-rslex, training version:1.13.0, current version:1.11.2
Package:azureml-dataset-runtime, training version:1.30.0, current version:1.26.0
Package:azureml-defaults, training version:1.30.0, current version:1.26.0
Package:azureml-interpret, training version:1.30.0, current version:1.26.0
Package:azureml-pipeline-core, training version:1.30.0, current version:1.26.0
Package:azureml-telemetry, training version:1.30.0, current version:1.26.0
Package:azureml-train-automl-client, training version:1.30.0, current version:1.26.0
Package:azureml-train-automl-runtime, training version:1.30.0, current version:1.26.0


## 04 `Create parameters, and set defaults`

## Who gives input parameters, and why? 
### Q: Why? 
- A: To use the same DEV scoring pipeline, with either different data to be scored (`daily scoring`), or a `different model-version SAME day` to score with.
- A: With "environment parameters" (dev,test,prod) we can instantiate an ESMLProject that knows the lake and workspace, which makes it easy to create 3 pipelines for dev,test,prod
    - And data: whether 1 LAKE or 3 LAKES (dev,test,prod), they all have data-folders "dev,test,prod"

### Who gives input? 
- A) Azure Devops (CI/CD) will trigger TRAIN pipeline, that will end with creating this BATCH SCORING, with 
    - 2 parameters (`esml_environment, esml_inference_model_version`), to CREATE/UPDATE the BATCH pipeline with newly trained model
    - 1 dummy (`esml_scoring_folder_date`) to test BATCH SCORING after creation.
- B) Azure Datafactory will trigger BATCH SCORING, with 
    - 2 PIPELINE parameters (`esml_inference_model_version, esml_scoring_folder_date`), to read IN-DATA to be scored. Usually "today's" esml_scoring_folder_date
    - 1 Optional PIPELINE parameter: `esml_optional_unique_scoring_folder`. ESML supports multiple data writes/scorings per day. If not all "guid-folders" below the date should be merged and scored, 1 folder can be passed as a filter.
    - Note: `esml_environment` is not really needed post creation, since we already created the pipeline in DEV, `locked and loaded`
    
Note: Prerequisite: Before the Azure ML pipeline is called, AZURE DATA FACTORY has written either a .csv, .parquet or .jpg to the IN folder, and passed the "scoring_date" and "model_version" 
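
As a rough sketch of what a caller sends, the request body below uses the three pipeline parameter names from this notebook. The `ExperimentName` / `ParameterAssignments` layout is, to my knowledge, what the Azure ML SDK v1 documentation shows for published pipeline REST endpoints; treat it as an assumption to verify for your SDK version. The helper `build_scoring_request` and the experiment name are hypothetical, and nothing is sent over the network here.

```python
import datetime
import json


def build_scoring_request(model_version, scoring_date, unique_folder="*"):
    """Build a JSON body for triggering the published batch scoring pipeline."""
    return {
        "ExperimentName": "batch_scoring_demo",  # hypothetical experiment name
        "ParameterAssignments": {
            "esml_inference_model_version": model_version,
            # Same timestamp format that the step scripts parse with strptime
            "esml_scoring_folder_date": scoring_date.strftime("%Y-%m-%d %H:%M:%S.%f"),
            # "*" means: merge and score all guid-folders below the date
            "esml_optional_unique_scoring_folder": unique_folder,
        },
    }


payload = build_scoring_request(1, datetime.datetime(2021, 6, 22, 15, 35, 1, 243860))
print(json.dumps(payload, indent=2))
```

Keeping the date in one fixed string format matters, since the step scripts fail on any timestamp that does not match `'%Y-%m-%d %H:%M:%S.%f'`.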

In [5]:
from azureml.pipeline.core import PipelineParameter

# Must be a REAL dataset at Pipeline creation, even though "dummy"
par_esml_model_version = PipelineParameter(name="esml_inference_model_version", default_value=p.inferenceModelVersion) 
par_esml_scoring_date = PipelineParameter(name="esml_scoring_folder_date", default_value=str(p.date_scoring_folder))
par_esml_guid_folder = PipelineParameter(name="esml_optional_unique_scoring_folder", default_value="*") 

### Below is just an easy way to use ESML, without having to remember the Datalake and Azure ML dataset design
- The cell also removes ALL dependency on the ESML SDK in further cells


In [6]:
from azureml.core.dataset import Dataset

esml_environment = p.dev_test_prod # PIPELINE "locked and loaded", e.g. to a TEST Azure ML workspace, and TEST datalake
ds01 = p.DatasetByName("ds01_diabetes")
ds01_in_name = ds01.AzureName_IN # 'M11_ds01_diabetes_inference_IN'
ds01_silver_name = ds01.AzureName_Silver # 'M11_ds01_diabetes_inference_SILVER'
ds01_silver_path = ds01.SilverPath
ds01_template_path = ds01.InPathTemplate

ds02 = p.DatasetByName("ds02_other")
ds02_in_name = ds02.AzureName_IN # 'M11_ds02_other_inference_IN'
ds02_silver_name = ds02.AzureName_Silver # 'M11_ds02_other_inference_SILVER'
ds02_silver_path = ds02.SilverPath

gold_to_score_name = p.dataset_gold_to_score_name_azure # 'M11_GOLD_TO_SCORE'
gold_to_score_path = p.GoldPathToScoreBatch
target_column_name = "Y"

dataset_to_score = p.GoldToScore # Dataset.get_by_name(workspace = p.ws, name = gold_to_score_name)

gold_scored_name = p.dataset_gold_scored_name_azure
to_score_folder_batch, scored_folder, date_folder = p.get_gold_scored_unique_path(p.date_scoring_folder)

## 04 `Bronze2Gold Environment: RunConfig`
You can have separate environments for each pipeline step. For ease, we use the same one for all steps here.
- We will just use a curated environment here: AzureML-Tutorial (USE_CURATED_ENV)

#### Example: 
- `Pipeline 1: Dataprep & Batch scoring`: For In->Bronze->Silver->GOLD_TO_SCORE we will also use pandas, scikit-learn, automl and pyarrow for the pipeline steps. 
    - We define the runconfig for that.
- `Pipeline 2: AutoML train pipeline`:  For a pipeline with In->Bronze->Silver->GOLD_TO_TRAIN->Training we will also need automl.
    - We don't need that here, since we are only scoring.

In [7]:
from azureml.core import Environment
from azureml.core.runconfig import DEFAULT_CPU_IMAGE
from azureml.core.runconfig import DockerConfiguration

from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies

aml_run_config = RunConfiguration()
aml_run_config.target = aml_compute # `compute_target` as defined in "Azure Machine Learning compute" section above

USE_CURATED_ENV = True
if USE_CURATED_ENV :
    curated_environment = Environment.get(workspace=p.ws, name="AzureML-Tutorial") # "AzureML-AutoMLTutorial" https://docs.microsoft.com/en-us/azure/machine-learning/resource-curated-environments
    aml_run_config.environment = curated_environment
else:
    aml_run_config.environment.python.user_managed_dependencies = False
    
    # Add some packages relied on by data prep step
    aml_run_config.environment.python.conda_dependencies = CondaDependencies.create(
        conda_packages=['pandas==0.25.1','scikit-learn==0.22.1', 'numpy==1.18.5'], 
        pip_packages=['azureml-defaults', 'azureml-dataprep[fuse,pandas]'], # azureml-sdk
        pin_sdk_version=False)
    
docker_config = DockerConfiguration(use_docker=True)
aml_run_config.docker = docker_config

## 05a Pipeline step: `01_In2Silver`
- Used both for TRAIN and INFERENCE
- Depends on data exists in IN folder, by ADF pipeline System -> IN

### 05b script file for step: `01_In2Silver` 

In [8]:
%%writefile $script_folder/ds01_bronze2silver.py
from azureml.core import Run
from azureml.core import Dataset
import pandas as pd 
import argparse
import datetime
import os  # needed for os.makedirs() when writing the silver output below

parser = argparse.ArgumentParser()
# REGULAR PARAMETERS 
parser.add_argument('--par_esml_env', dest='par_esml_env',type=str, required=True)
parser.add_argument('--esml_input_lake_template', dest='esml_input_lake_template', required=True)

# PIPELINE PARAMETERS 
parser.add_argument('--par_esml_model_version', dest='par_esml_model_version',type=str, required=True)
parser.add_argument('--par_esml_scoring_date', dest='par_esml_scoring_date',type=str, required=True)
args = parser.parse_args()

# CREATE IN FOLDER from parameters
date_infolder = datetime.datetime.strptime(args.par_esml_scoring_date, '%Y-%m-%d %H:%M:%S.%f') 
esml_scoring_date_in = date_infolder.strftime('%Y/%m/%d') #  String to folder structure 2020/01/01
esml_model_version = args.par_esml_model_version
esml_env = args.par_esml_env
print("Scoring date IN folder: {}".format(esml_scoring_date_in))

input_path = args.esml_input_lake_template.format(inference_model_version = esml_model_version, dev_test_prod = esml_env, scoring_folder_date=esml_scoring_date_in)
input_path_csv = input_path + '*.csv'
input_path_parquet = input_path + '*.parquet'
print("Scoring full path, input_path: {}".format(input_path_csv))

run = Run.get_context()
ws = run.experiment.workspace
datastore = ws.get_default_datastore()
M11_ds01_diabetes_inference_IN_csv = None

try:
    M11_ds01_diabetes_inference_IN_csv = Dataset.Tabular.from_delimited_files(path = [(datastore, input_path_csv)])
except Exception as e:
    print("Could not load .CSV files from IN dataset. Now trying .PARQUET instead:  {}".format(input_path_parquet))
    # Parquet files need the parquet reader, not the delimited-files reader
    M11_ds01_diabetes_inference_IN_csv = Dataset.Tabular.from_parquet_files(path = [(datastore, input_path_parquet)])

print("ESML Alt2 did ALSO work: Dataset.Tabular.from_delimited_files(path = [(datastore, input_path_csv)]) = {}".format(M11_ds01_diabetes_inference_IN_csv.name))

try:
    # Test to SET input : ) 
    name_dummy_ds = M11_ds01_diabetes_inference_IN_csv.name
    run.input_datasets[name_dummy_ds] = M11_ds01_diabetes_inference_IN_csv.as_named_input(name_dummy_ds) # Get Dataset
    print("ESML set input_datasets did work : ) ")
except Exception as e:
    # Python 3 exceptions have no .message attribute; format the exception itself
    print("Error: ESML Alt3 next(iter(run.input_datasets)) -> run.input_datasets[dataset_name] , did not work {}".format(e))

################################### EDIT BELOW - feature engineering ########################

# DROP Y...for DEMO purpose. Real world has no Y set.
target_column_name = "Y"
df = M11_ds01_diabetes_inference_IN_csv.to_pandas_dataframe().drop(target_column_name, axis=1) # Simulate feature engineering...source system might not know column name for Y, and certainly not values
df.at[0,'AGE'] = 0.099
df.at[0,'BMI'] = 0.099
df.at[0,'BP'] = 0.099


################################### EDIT ABOVE - feature engineering ########################
# Save as Dataset
df.reset_index(inplace=True, drop=True)
output_silver_dataset_key1 =  next(iter(run.output_datasets)) 
output_silver_dataset = run.output_datasets[output_silver_dataset_key1]

if not (output_silver_dataset is None):
    os.makedirs(output_silver_dataset, exist_ok=True)
    print("%s created" % output_silver_dataset)
    path = output_silver_dataset + "/silver.parquet"
    write_df = df.to_parquet(path,engine='pyarrow', index=False,use_deprecated_int96_timestamps=True,allow_truncated_timestamps=False)

print(f"Wrote prepped data to {output_silver_dataset}/silver.parquet")

Overwriting ./common/pipeline/scripts/ds01_bronze2silver.py


In [9]:
import os
import datetime
from azureml.pipeline.steps import PythonScriptStep
from azureml.core.dataset import Dataset
from azureml.data import OutputFileDatasetConfig

## 06a Pipeline step: `02_In2Silver`
- Used both for TRAIN and INFERENCE
- Depends on data exists in IN folder, by ADF pipeline System -> IN

In [10]:
# IN DATASET: CREATE "Template Dummy dataset, but with LATEST path"
date_infolder = datetime.datetime.strptime(par_esml_scoring_date.default_value, '%Y-%m-%d %H:%M:%S.%f')
esml_scoring_date_in = date_infolder.strftime('%Y/%m/%d') #  String 2020/01/01
input_path = ds01_template_path.format(inference_model_version = par_esml_model_version.default_value, dev_test_prod = esml_environment, scoring_folder_date=esml_scoring_date_in)
input_path_csv = input_path + "*.csv"

# Must be a real dataset, even though "dummy/template". Or else: # DatasetValidationError.  Cannot load any data from the specified path. Make sure the path is accessible and contains data.
M11_ds01_diabetes_inference_IN_csv = Dataset.Tabular.from_delimited_files(path = [(datastore, input_path_csv)], validate=False) 
ds_dataset_consumption_config = M11_ds01_diabetes_inference_IN_csv.as_named_input(ds01_in_name)

# OUT DATASET: 
path = ds01_silver_path + "{run-id}/"
stepdata10_ds01_diabetes_silver = (
    OutputFileDatasetConfig(
            name=ds01_silver_name,
            destination=(datastore, path))
            .as_upload(overwrite=True)
            .read_parquet_files()
            .register_on_complete(name=ds01_silver_name)
)

step_diabetes2silver = PythonScriptStep( 
    runconfig=aml_run_config,
    script_name="ds01_bronze2silver.py",
    name="BRONZE to SILVER",
    arguments=["--esml_input_lake_template", ds01.InPathTemplate,
    "--par_esml_env",esml_environment, "--par_esml_model_version",par_esml_model_version,"--par_esml_scoring_date",par_esml_scoring_date], 
    inputs=[ds_dataset_consumption_config], # IN dataset
    outputs=[stepdata10_ds01_diabetes_silver], # OUT dataset
    source_directory=script_folder, 
    compute_target=aml_compute,
    allow_reuse=False
)

In [35]:
esml_step_types.BRONZE_2_SILVER

'BRONZE_2_SILVER'

### 06b script file for step: `02_In2Silver`

In [11]:
%%writefile $script_folder/ds02_bronze2silver.py
from azureml.core import Run
from azureml.core import Dataset
import pandas as pd 
import argparse
import os
import datetime

# IN/OUT folder paths
parser = argparse.ArgumentParser()
parser.add_argument('--esml_input_lake_template', dest='esml_input_lake_template', required=True)
parser.add_argument('--par_esml_env', dest='par_esml_env', required=True)
parser.add_argument('--par_esml_model_version', dest='par_esml_model_version', required=True)
parser.add_argument('--par_esml_scoring_date', dest='par_esml_scoring_date', required=True)
args = parser.parse_args()

# GENERATE PATH to data
date_infolder = datetime.datetime.strptime(args.par_esml_scoring_date, '%Y-%m-%d %H:%M:%S.%f')
esml_scoring_date_in = date_infolder.strftime('%Y/%m/%d') #  String to folder structure 2020/01/01
esml_model_version = args.par_esml_model_version
esml_env = args.par_esml_env
print("Scoring date IN folder: {}".format(esml_scoring_date_in))

input_path = args.esml_input_lake_template.format(inference_model_version = esml_model_version, dev_test_prod = esml_env, scoring_folder_date=esml_scoring_date_in)
input_path_csv = input_path + '*.csv'
input_path_parquet = input_path + '*.parquet'
print("IN Dataset. INPUT full path: {}".format(input_path_csv))

# 1) GET INPUT - Get .CSV or .PARQUET Dataset
run = Run.get_context()
ws = run.experiment.workspace
datastore = ws.get_default_datastore()

try:
    M11_ds02_diabetes_inference_IN_csv = Dataset.Tabular.from_delimited_files(path = [(datastore, input_path_csv)]) 
except Exception as e:
    print("Could not load .CSV files from IN dataset. Now trying .PARQUET instead:  {}".format(input_path_parquet))
    M11_ds02_diabetes_inference_IN_csv = Dataset.Tabular.from_parquet_files(path = [(datastore, input_path_parquet)])

# 2) UPDATE INPUT Dataset
#name_ds = M11_ds02_diabetes_inference_IN_csv.name
#run.input_datasets[name_ds] = M11_ds02_diabetes_inference_IN_csv.as_named_input(name_ds) # Get Dataset

################################### 3) EDIT BELOW - feature engineering ########################

target_column_name = "Y" # for DEMO purpose, to drop label data
# DROP Y, for DEMO purpose
df = M11_ds02_diabetes_inference_IN_csv.to_pandas_dataframe().drop(target_column_name, axis=1) # Simulate feature engineering...source system might not know column name for Y, and certainly not values


################################### EDIT ABOVE - feature engineering ########################

# Save as Dataset
df.reset_index(inplace=True, drop=True)
output_silver_dataset_key1 =  next(iter(run.output_datasets)) # Get 1st key in dictionary
output_silver_dataset = run.output_datasets[output_silver_dataset_key1] # args.output_silver_dataset

if not (output_silver_dataset is None):
    os.makedirs(output_silver_dataset, exist_ok=True)
    print("%s created" % output_silver_dataset)
    path = output_silver_dataset + "/silver.parquet"
    write_df = df.to_parquet(path, engine='pyarrow', index=False,use_deprecated_int96_timestamps=True,allow_truncated_timestamps=False)

print(f"Wrote prepped data to {output_silver_dataset}/silver.parquet")

Overwriting ./common/pipeline/scripts/ds02_bronze2silver.py


In [12]:
par_esml_model_version.default_value

1

In [13]:
# CREATE "Template Dummy dataset, but with LATEST path"
date_infolder = datetime.datetime.strptime(par_esml_scoring_date.default_value, '%Y-%m-%d %H:%M:%S.%f') # DateTime 
esml_scoring_date_in = date_infolder.strftime('%Y/%m/%d') #  String 2020/01/01
input_path_ds02 = ds02.InPathTemplate.format(inference_model_version = par_esml_model_version.default_value, dev_test_prod = esml_environment, scoring_folder_date=esml_scoring_date_in)
input_path_csv_ds02 = input_path_ds02 + "*.csv"

# IN: 
M11_ds02_other_inference_IN_csv = Dataset.Tabular.from_delimited_files(path = [(datastore, input_path_csv_ds02)])
# OUT:
path_out02 = ds02_silver_path + "{run-id}/" # + "{run-id}/{output-name}"

stepdata11_ds02_other_silver = (
    OutputFileDatasetConfig(
            name=ds02_silver_name,
            destination=(datastore, path_out02)) # partition_format='/{PipelineUniqueRunId}/silver.parquet'
            .as_mount()
            .read_parquet_files()
            .register_on_complete(name=ds02_silver_name)
)

step_other2silver = PythonScriptStep(
    runconfig=aml_run_config,
    script_name='ds02_bronze2silver.py',
    name="BRONZE to SILVER",
    arguments=["--esml_input_lake_template", ds02.InPathTemplate,
    "--par_esml_env",esml_environment, "--par_esml_model_version",par_esml_model_version,"--par_esml_scoring_date",par_esml_scoring_date ],
    inputs=[M11_ds02_other_inference_IN_csv.as_named_input(ds02_in_name)],
    outputs=[stepdata11_ds02_other_silver],
    source_directory=script_folder,
    compute_target=aml_compute,
    allow_reuse=False
)

## 07 Pipeline step: `Merge Silvers 2 GOLD_TO_SCORE` 
- Used for INFERENCE. Merges all silver datasets to a GOLD_TO_SCORE dataset
- Depends on data exists in SILVER datasets. Default takes latest version.

In [14]:
p.path_gold_to_score_template()

'projects/project002/11_diabetes_model_reg/inference/{model_version}/gold/dev/'

In [15]:
p.path_gold_to_score_template().format(model_version = 0) # 0 = Latest

'projects/project002/11_diabetes_model_reg/inference/0/gold/dev/'

In [16]:
p.path_gold_to_score_template(True,True)

'projects/project002/11_diabetes_model_reg/inference/{model_version}/gold/dev/{date_folder}/{id_folder}/'
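
To illustrate how the two templates are used together, here is a small sketch with the template strings copied from the cell outputs above. The "latest" folder is overwritten on each run, while the historic folder is filled with runtime values; the run id below is a made-up example value.

```python
# Template strings copied from the cell outputs above.
LATEST_TEMPLATE = "projects/project002/11_diabetes_model_reg/inference/{model_version}/gold/dev/"
HISTORY_TEMPLATE = LATEST_TEMPLATE + "{date_folder}/{id_folder}/"

# "Latest" folder: model_version 0 is this notebook's convention for "latest".
latest_path = LATEST_TEMPLATE.format(model_version=0)

# Historic folder: filled at run time; the run id is a made-up example value.
history_path = HISTORY_TEMPLATE.format(
    model_version=1, date_folder="2021/06/22", id_folder="example-run-id"
)

print(latest_path)
print(history_path)
```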

In [17]:
# OUT:
#gold_to_score_folder = gold_to_score_path+'{run-id}'

path_gold_to_score_template_latest = p.path_gold_to_score_template()
path_gold_to_score_template_pars = p.path_gold_to_score_template(True,True)
gold_to_score_folder = path_gold_to_score_template_latest.format(model_version = 0) # 0 means "latest" par_esml_model_version.default_value

gold_to_score = (
    OutputFileDatasetConfig(name=gold_to_score_name,destination=(datastore,gold_to_score_folder))
    .as_upload(overwrite=True) # as_mount() also works
    .read_parquet_files()  # To promote File to Tabular Dataset. This, or .read_delimited_files()  will return/converts to an "OutputTabularDatasetConfig"
    .register_on_complete(name=gold_to_score_name)
)

step_gold_merged = PythonScriptStep(
    runconfig=aml_run_config,
    script_name='silvers2gold.py',
    name="SILVER's merged to GOLD_TO_SCORE",
    arguments=["--input_10_silver_name",stepdata10_ds01_diabetes_silver.name,"--input_11_silver_name",stepdata11_ds02_other_silver.name,"--output_20_merged_gold",gold_to_score, "--target_column_name",target_column_name,
    "--par_esml_scoring_date",par_esml_scoring_date, "--par_esml_model_version",par_esml_model_version,"--esml_output_lake_template",path_gold_to_score_template_pars,
    "--param1","ParameterValueTest"],
    inputs=[stepdata10_ds01_diabetes_silver.as_input(stepdata10_ds01_diabetes_silver.name) , stepdata11_ds02_other_silver.as_input(stepdata11_ds02_other_silver.name)], # needed to be able to fetch dataset from 'run.input_datasets' array
    outputs=[gold_to_score], # Do not use. Maybe it looks nice in UI graph? 
    source_directory=script_folder,
    compute_target=aml_compute,
    allow_reuse=False
)

print("step_gold_merged created.")

step_gold_merged created.


### 07 script file for step: `Silvers 2 GOLD_TO_SCORE` 

In [18]:
%%writefile $script_folder/silvers2gold.py
import argparse
import os
import datetime
import numpy as np
from azureml.core import Run
from azureml.core import Dataset
from azureml.data.dataset_factory import FileDatasetFactory

parser = argparse.ArgumentParser("gold")
parser.add_argument('--input_10_silver_name', dest='input_10_silver_name', help="10,11,12,.. can be processed in parallel", required=True)
parser.add_argument('--input_11_silver_name', dest='input_11_silver_name',help="10,11,12,.. can be processed in parallel", required=True)
parser.add_argument('--target_column_name', dest='target_column_name',type=str, help="Target Label - column to add", required=True)
parser.add_argument('--output_20_merged_gold', dest='output_20_merged_gold',help='Path to write GOLD_TO_SCORE',required=True)

parser.add_argument('--par_esml_scoring_date', dest='par_esml_scoring_date',help='Date_folder in lake  to score',required=True)
parser.add_argument('--par_esml_model_version', dest='par_esml_model_version',help='Model version to score with 1,2,3',required=True)
parser.add_argument('--esml_output_lake_template', dest='esml_output_lake_template',help='Template path with placeholders to write GOLD_TO_SCORE',required=True)


# OPTIONAL - 
parser.add_argument('--param1', dest='param1',help='My test parameter', required=False)
args = parser.parse_args()

print("Argument (output merge data path): %s" % args.output_20_merged_gold)
print("Argument OutputFileDatasetConfig datatype: %s" % type(args.output_20_merged_gold))

print("Merge SILVERs data to GOLD")
run = Run.get_context()

# INPUTS
for indata in run.input_datasets:
    print("input_datasets is not empty")
    print(str(type(indata)))
    print(indata)

ws = run.experiment.workspace
datastore = ws.get_default_datastore()

M11_ds01_diabetes_inference_IN_csv = run.input_datasets[args.input_10_silver_name] # Alt 1) Get via input_datasets array (just for demo purposes; using the same way for all inputs makes more sense)
df1 = M11_ds01_diabetes_inference_IN_csv.to_pandas_dataframe()

M11_ds02_other_inference_SILVER = Dataset.get_by_name(workspace=ws, name=args.input_11_silver_name,  version='latest') # Alt 2) Get via workspace (just demo purpose, use this for all)
df2 = M11_ds02_other_inference_SILVER.to_pandas_dataframe() 

################################### EDIT BELOW if NEEDED - end up with a dataframe called 'combined_df'  ########################
# Merge data, and sample 10%, simulate a "filter" for junk
import pandas as pd  # DataFrame.append was removed in pandas 2.0; pd.concat works on old and new versions
combined_df = pd.concat([df1, df2], ignore_index=True)
combined_df.reset_index(inplace=True, drop=True)
df_to_score = combined_df.sample(frac=0.1, replace=True, random_state=1)

# Add LABEL column, to score
df_to_score['Y'] = np.nan

################################### EDIT ABOVE if NEEDED - end up with a dataframe called 'combined_df' ########################

output_to_score_gold_name =  next(iter(run.output_datasets)) # Get 1st key in dictionary
output_to_score_gold = run.output_datasets[output_to_score_gold_name]


# SET PATH to "parameters": model-version, date-folder



# 1) UPLOAD Files

# 2) Create a DATASET from files. and register them
#...the pipeline has "no output" then...but LAKE looks right.


# Save GOLD as .parquet
# OUTPUT - M11_GOLD_TO_SCORE
if not (output_to_score_gold is None):
    os.makedirs(output_to_score_gold, exist_ok=True)
    print("%s created" % output_to_score_gold)
    path = output_to_score_gold + "/gold_to_score.parquet"
    
    # 1) Save/Overwrite "latest" data: 'projects/project002/11_diabetes_model_reg/inference/0/gold/dev/', for SCORE_GOLD step to read
    write_df = combined_df.to_parquet(path,engine='pyarrow', index=False,use_deprecated_int96_timestamps=True,allow_truncated_timestamps=False)
    
    # Copy also to a "date_folder", for history in the lake
    date_infolder = datetime.datetime.strptime(args.par_esml_scoring_date, '%Y-%m-%d %H:%M:%S.%f')
    esml_scoring_date_out = date_infolder.strftime('%Y/%m/%d') #  Save scoring same date as IN-data 'in/2020/01/01' for 'gold_scored/2020/01/01'

    # 2) Save historic data, with runtime parameters 'projects/project002/11_diabetes_model_reg/inference/{model_version}/gold/dev/{date_folder}/{id_folder}/'
    print("run.run_id {}".format(run.id))
    
    new_path = args.esml_output_lake_template.format(model_version = args.par_esml_model_version, date_folder = esml_scoring_date_out,id_folder= run.id)
    FileDatasetFactory.upload_directory(src_dir=output_to_score_gold, target=(datastore, new_path), pattern=None, overwrite=True, show_progress=False)
'''
if not (args.output_20_merged_gold is None):
    os.makedirs(args.output_20_merged_gold, exist_ok=True)
    print("%s created" % args.output_20_merged_gold)
    path = args.output_20_merged_gold + "/gold_to_score.parquet"
    write_df = combined_df.to_parquet(path,engine='pyarrow', index=False,use_deprecated_int96_timestamps=True,allow_truncated_timestamps=False)
'''

Overwriting ./common/pipeline/scripts/silvers2gold.py
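
The two-stage path handling in the script above (parse the scoring-date parameter, then fill the lake template) can be tried without Azure ML. A minimal sketch, assuming a template with the three placeholders the script passes to `.format()`. The template string is copied from the comment in the script, `build_historic_path` is a hypothetical helper, and the run id is a made-up value.

```python
import datetime

# Template copied from the comment in the script above; in the pipeline it
# arrives via the --esml_output_lake_template argument.
template = "projects/project002/11_diabetes_model_reg/inference/{model_version}/gold/dev/{date_folder}/{id_folder}/"

def build_historic_path(template, scoring_date, model_version, run_id):
    """Turn a scoring-date string into a 'YYYY/MM/DD' folder and fill the template."""
    parsed = datetime.datetime.strptime(scoring_date, "%Y-%m-%d %H:%M:%S.%f")
    date_folder = parsed.strftime("%Y/%m/%d")
    return template.format(model_version=model_version, date_folder=date_folder, id_folder=run_id)

# 'example-run-id' is a hypothetical placeholder for run.id
historic_path = build_historic_path(template, "2021-06-23 10:35:01.243860", 1, "example-run-id")
print(historic_path)
# projects/project002/11_diabetes_model_reg/inference/1/gold/dev/2021/06/23/example-run-id/
```

Because the run id is the last folder, repeated runs for the same date land side by side instead of overwriting each other.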


# 08 PIPELINE STEP: `SCORE GOLD STEP`

In [19]:
p.path_gold_scored_template().format(model_version=par_esml_model_version.default_value)

'projects/project002/11_diabetes_model_reg/inference/1/scored/dev/'

In [20]:
# Get paths and names in whatever way you like for ESML AutoLake design. Here we use ESMLProject.properties and template methods
latest_scored_folder = p.path_gold_scored_template().format(model_version=0) # 0= latest scored
latest_gold_scored_path = latest_scored_folder + "{run-id}"
scored_folder_template = p.path_gold_scored_template(True,True)

# OUT:
scored_gold = (
    OutputFileDatasetConfig(name= p.dataset_gold_scored_name_azure,destination=(datastore,latest_gold_scored_path))
    .as_upload(overwrite=True) # as_mount() also works
    .read_parquet_files()  # Promotes the File dataset to a Tabular dataset. This, or .read_delimited_files(), returns an "OutputTabularDatasetConfig"
    .register_on_complete(name= p.dataset_gold_scored_name_azure)
)

last_gold_run = (
    OutputFileDatasetConfig(name=p.dataset_gold_scored_runinfo_name_azure,destination=(datastore,p.path_inference_gold_scored_runinfo))
    .as_upload(overwrite=True) # as_mount() also works
    .read_delimited_files()  # Promotes the File dataset to a Tabular dataset. This, or .read_parquet_files(), returns an "OutputTabularDatasetConfig"
    .register_on_complete(name=p.dataset_gold_scored_runinfo_name_azure)
)

active_folder = (
    OutputFileDatasetConfig(name=p.dataset_active_name_azure,destination=(datastore, p.path_inference_active))
    .as_upload(overwrite=True) # as_mount() also works
    #.read_delimited_files()  # To promote File to Tabular Dataset. This, or .read_delimited_files()  will return/converts to an "OutputTabularDatasetConfig"
    #.register_on_complete(name=p.dataset_active_name_azure)
)

step_score_gold = PythonScriptStep(
    runconfig=aml_run_config,
    script_name='score_gold.py',
    name="SCORING GOLD",
    arguments=["--input_gold_name",gold_to_score.name, "--par_esml_scoring_date",par_esml_scoring_date, "--par_esml_model_version",par_esml_model_version
    , "--target_column_name",target_column_name, "--esml_output_lake_template",scored_folder_template],
    inputs=[gold_to_score.as_input(gold_to_score.name)], 
    outputs=[scored_gold,last_gold_run,active_folder], 
    source_directory=script_folder,
    compute_target=aml_compute,
    allow_reuse=False
)

print("SCORING step created.")

SCORING step created.


In [21]:
%%writefile $script_folder/score_gold.py

import logging
import os
import pickle
import numpy as np
import pandas as pd
import joblib
import azureml.automl.core
from azureml.automl.core.shared import logging_utilities, log_server
from azureml.telemetry import INSTRUMENTATION_KEY
import argparse
import datetime  # Needed for strptime()/now() in init() and run()
from azureml.core import Dataset
from azureml.core import Run
from azureml.data.dataset_factory import FileDatasetFactory

# Not available in the CURATED environment "AzureML-Tutorial", only in "AzureML-AutoML"
#from inference_schema.schema_decorators import input_schema, output_schema
#from inference_schema.parameter_types.numpy_parameter_type import NumpyParameterType
#from inference_schema.parameter_types.pandas_parameter_type import PandasParameterType

input_sample = pd.DataFrame({"AGE": pd.Series([0.0], dtype="float64"), "SEX": pd.Series([0.0], dtype="float64"), "BMI": pd.Series([0.0], dtype="float64"), "BP": pd.Series([0.0], dtype="float64"), "S1": pd.Series([0.0], dtype="float64"), "S2": pd.Series([0.0], dtype="float64"), "S3": pd.Series([0.0], dtype="float64"), "S4": pd.Series([0.0], dtype="float64"), "S5": pd.Series([0.0], dtype="float64"), "S6": pd.Series([0.0], dtype="float64")})
output_sample = np.array([0])
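# Create the logger first, so it exists even if telemetry setup fails
logger = logging.getLogger('azureml.automl.core.scoring_script')
try:
    log_server.enable_telemetry(INSTRUMENTATION_KEY)
    log_server.set_verbosity('INFO')
except Exception:
    pass  # Telemetry is optional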

def init():
    global model, probabilities, gold_to_score_df, output_scored_gold,datastore,historic_path,last_gold_run,run_id,active_folder,date_in,model_version_in

    parser = argparse.ArgumentParser("Scoring the model")
    parser.add_argument('--input_gold_name', dest="input_gold_name", type=str, required=True)
    parser.add_argument('--target_column_name', dest="target_column_name", type=str, required=True)
    parser.add_argument('--par_esml_scoring_date', dest="par_esml_scoring_date", required=True)
    parser.add_argument('--par_esml_model_version', dest="par_esml_model_version", required=False)
    parser.add_argument('--esml_output_lake_template', dest="esml_output_lake_template", required=False)
    
    args = parser.parse_args()

    try:
        logger.info("Loading model from path: model.pkl")
        model = joblib.load("model.pkl")

        logger.info("Loading data to score")
        run = Run.get_context()
        ws = run.experiment.workspace
        datastore = ws.get_default_datastore()

        gold_to_score = Dataset.get_by_name(workspace=ws, name=args.input_gold_name) #Latest should be correct version
        gold_to_score_df = gold_to_score.to_pandas_dataframe().reset_index(drop=True)

        if args.target_column_name in gold_to_score_df: # REMOVE TARGET column, if exists (for demo, sometimes TESTSET are used to score)
            gold_to_score_df.drop(columns=[args.target_column_name], inplace=True)
            print("Dropped target column: {}".format(args.target_column_name))
        
        logger.info("Dataset GOLD to score, loaded successfully")
        logger.info("Loading SCORED_GOLD path via OutputFileDatasetConfig, from run.output_datasets[next(iter(run.output_datasets))]")

        ### PATHS - save in 2 places
        # 1) Save LATEST GOLD_SCORED - the PATH is static over time, so Azure Data Factory always knows where to find the scored data for "WriteBack"
        it = iter(run.output_datasets)
        output_scored_gold_name =  next(it) # Get 1st key in dictionary
        output_scored_gold = run.output_datasets[output_scored_gold_name]

        # Also save META data: scored_gold path, run_id, pipeline_id etc.
        last_gold_run_name =  next(it) # Save meta as dataset also, for visibility in Azure ML Studio
        last_gold_run = run.output_datasets[last_gold_run_name]

        active_folder_name =  next(it) # 3rd item. Also good for showing Azure Data Factory where the files are LOCATED in the lake
        active_folder = run.output_datasets[active_folder_name]

        # 2) Save HISTORIC scoring - with runtime parameters DATE_FOLDER and MODEL_VERSION from the calling application (Data Factory)
        date_in = args.par_esml_scoring_date
        date_infolder = datetime.datetime.strptime(date_in, '%Y-%m-%d %H:%M:%S.%f') # UTC string to DateTime object
        esml_scoring_date_out = date_infolder.strftime('%Y/%m/%d') #  Save scoring same date as IN data 'in/2020/01/01' and 'gold_scored/2020/01/01' (but can be different, depends on choice of meta)
        run_id = run.id
        model_version_in = args.par_esml_model_version
        historic_path = args.esml_output_lake_template.format(model_version = model_version_in, date_folder = esml_scoring_date_out,id_folder= run_id)
        # Example: 'projects/project002/11_diabetes_model_reg/inference/{model_version}/gold/[dev]/{date_folder}/{id_folder}/'  ...where [dev] is set during [CREATION] not {RUNTIME} parameter.

    except Exception as e:
        logging_utilities.log_traceback(e, logger)
        raise


#@input_schema('gold_to_score_df', PandasParameterType(input_sample))
#@output_schema(NumpyParameterType(output_sample))
def run(gold_to_score_df):
    try:
        logger.info("model.predict with gold_to_score")
        X_test_df = gold_to_score_df.reset_index(drop=True)
        result = model.predict(X_test_df)

        # Format result to a dataframe, join SCORING with its FEATURES
        df_res  = pd.DataFrame(result, columns=['prediction'])
        df_out = gold_to_score_df.join(df_res[['prediction']],how = 'left')
        
        logger.info("Saving prediction to GOLD_SCORED dataset")
        if not (output_scored_gold is None):
            os.makedirs(output_scored_gold, exist_ok=True)
            print("%s created" % output_scored_gold)
            path = output_scored_gold + "/gold_scored.parquet"
            logger.info("Saving result as PARQUET at: {}".format(path))
            written_df = df_out.to_parquet(path,engine='pyarrow', index=False,use_deprecated_int96_timestamps=True,allow_truncated_timestamps=False)

            # Alt 2) Note: This can also be done by Azure Data Factory instead of this CPU cluster node: in an ADF Copy activity after this pipeline, using the 'latest_batch_score_run.csv' file that has the 'historic_path'
            print("Also save to HISTORIC path, output_scored_gold is {}".format(output_scored_gold))
            FileDatasetFactory.upload_directory(src_dir=output_scored_gold, target=(datastore, historic_path), pattern=None, overwrite=True, show_progress=False)

        last_gold_run_filename = "last_gold_run.csv"

        # Create the pandas dataframe with metadata, saved as .csv for the "Azure Data Factory WriteBack pipeline/step" to use.
        # Built before the checks below, since both the run-info output and the active folder write it.
        date_now_str = str(datetime.datetime.now())
        last_gold_run_data = [[run_id, historic_path,date_in,date_now_str,model_version_in]]
        df2 = pd.DataFrame(last_gold_run_data, columns = ['pipeline_run_id', 'scored_gold_path', 'date_in_parameter', 'date_at_pipeline_run','model_version'])

        if not (last_gold_run is None):
            os.makedirs(last_gold_run, exist_ok=True)
            print("%s created" % last_gold_run)
            path_last_gold_run = last_gold_run + "/"+last_gold_run_filename
            logger.info("Saving last_gold_run.csv at: {}".format(path_last_gold_run))
            written_df2 = df2.to_csv(path_last_gold_run, encoding='utf-8',index=False)

        # Also save the run info to the ACTIVE folder
        if not (active_folder is None):
            os.makedirs(active_folder, exist_ok=True)
            path_active_folder = active_folder + "/"+last_gold_run_filename
            written_df3 = df2.to_csv(path_active_folder, encoding='utf-8',index=False) # DUMMY 2nd Write needed?

    except Exception as e:
        logging_utilities.log_traceback(e, logger)
        raise

if __name__ == "__main__":
    init()
    run(gold_to_score_df)

Overwriting ./common/pipeline/scripts/score_gold.py
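
The `last_gold_run.csv` file is the hand-off to the write-back step: one row telling the caller where the scored data landed. A minimal sketch of that contract using only the standard library, assuming the column names from the script above. The helper names, the row values and the temporary folder are illustrative and not taken from a real run.

```python
import csv
import os
import tempfile

COLUMNS = ["pipeline_run_id", "scored_gold_path", "date_in_parameter",
           "date_at_pipeline_run", "model_version"]

def write_run_info(folder, row):
    """Write a single-row CSV with the same header the scoring script uses."""
    os.makedirs(folder, exist_ok=True)
    path = os.path.join(folder, "last_gold_run.csv")
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=COLUMNS)
        writer.writeheader()
        writer.writerow(row)
    return path

def read_scored_path(path):
    """What a downstream consumer needs: the folder holding the scored data."""
    with open(path, newline="", encoding="utf-8") as f:
        first_row = next(csv.DictReader(f))
    return first_row["scored_gold_path"]

# Illustrative values only
row = {
    "pipeline_run_id": "example-run-id",
    "scored_gold_path": "inference/1/scored/dev/2021/06/23/example-run-id/",
    "date_in_parameter": "2021-06-23 10:35:01.243860",
    "date_at_pipeline_run": "2021-08-20 15:20:26.287695",
    "model_version": "1",
}
csv_path = write_run_info(tempfile.mkdtemp(), row)
print(read_scored_path(csv_path))
```

The file name and location stay fixed between runs; only the row content changes, which is what lets a caller with a static configuration find dynamically named folders.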


In [22]:
str(datetime.datetime.now())

'2021-08-20 15:20:26.287695'
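
The string above matches the `'%Y-%m-%d %H:%M:%S.%f'` format that the scripts parse. One edge case worth knowing: when the microsecond part is exactly zero, `str()` on a datetime omits the fraction, and that format no longer matches. The sketch below shows the behaviour together with a tolerant parser; `parse_scoring_date` is a hypothetical helper, not part of ESML.

```python
import datetime

FORMAT_WITH_FRACTION = "%Y-%m-%d %H:%M:%S.%f"
FORMAT_NO_FRACTION = "%Y-%m-%d %H:%M:%S"

def parse_scoring_date(text):
    """Parse a scoring date with or without fractional seconds (hypothetical helper)."""
    try:
        return datetime.datetime.strptime(text, FORMAT_WITH_FRACTION)
    except ValueError:
        return datetime.datetime.strptime(text, FORMAT_NO_FRACTION)

with_fraction = str(datetime.datetime(2021, 8, 20, 15, 20, 26, 287695))
without_fraction = str(datetime.datetime(2021, 8, 20, 15, 20, 26))  # microsecond == 0

print(with_fraction)     # 2021-08-20 15:20:26.287695
print(without_fraction)  # 2021-08-20 15:20:26
print(parse_scoring_date(without_fraction).strftime("%Y/%m/%d"))  # 2021/08/20
```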

# 10 - `GOLD PIPELINE`: Put it together and RUN

In [23]:
par_esml_scoring_date.default_value

'2021-06-22 15:35:01.243860'

In [24]:
par_esml_model_version.default_value

1

In [25]:
from azureml.pipeline.core import Pipeline
from azureml.pipeline.core import StepSequence
from azureml.widgets import RunDetails
from azureml.core import Experiment

pipeline = Pipeline(workspace = p.ws, steps= [step_diabetes2silver,step_other2silver, step_score_gold]) # This also works, and looks better: it does not "double up" on dependency arrows in Azure ML Studio
print("Pipeline is built.")

# Create Experiment
experiment_name = p.experiment_name + "_batch_scoring_pipe" if (p.experiment_name is not None) else "11_diabetes_batch_scoring_pipe"
experiment = Experiment(p.ws,experiment_name)

# PARAMETERS: scoring_date
scoring_date = '2021-06-23 10:35:01.243860'
par_dic = {par_esml_model_version.name: 1, par_esml_scoring_date.name: scoring_date}
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True,pipeline_parameters=par_dic,
    tags={
            "training_run_id": best_run.id,
            "run_algorithm": best_run.properties["run_algorithm"],
            "valid_score": best_run.properties["score"],
            "primary_metric": best_run.properties["primary_metric"],
        })

print("Pipeline submitted for execution.")
pipeline_run.log("run_algorithm", best_run.properties["run_algorithm"])

# ERROR: Can't resolve parameters, top level graph doesn't contain parameter with name data_path_pipeline_param


Pipeline is built.
Created step BRONZE to SILVER [4af08a8f][18de75d6-237b-49cf-be0d-0c2d966bf156], (This step will run and generate new outputs)
Created step BRONZE to SILVER [4b8d688b][8ea13e25-727d-4733-a4ec-63fcb110e729], (This step will run and generate new outputs)
Created step SCORING GOLD [fa5d2bf3][8d6e178d-42bc-4eb9-8251-8843c495f0f1], (This step will run and generate new outputs)
Created step SILVER's merged to GOLD_TO_SCORE [99abcffd][65e3c334-d57a-406c-a558-a94635161afb], (This step will run and generate new outputs)
Submitted PipelineRun 06465ecb-2d1b-460d-b3b4-0e2bb268b22d
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/06465ecb-2d1b-460d-b3b4-0e2bb268b22d?wsid=/subscriptions/ca0a8c40-b06a-4e4e-8434-63c03a1dee34/resourcegroups/MSFT-WEU-EAP_PROJECT02_AI-DEV-RG/workspaces/msft-weu-DEV-eap-proj02_ai-amls&tid=72f988bf-86f1-41af-91ab-2d7cd011db47
Pipeline submitted for execution.


In [26]:
# Before we proceed we need to wait for the run to complete.
pipeline_run.wait_for_completion(show_output=False)

PipelineRunId: 06465ecb-2d1b-460d-b3b4-0e2bb268b22d
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/06465ecb-2d1b-460d-b3b4-0e2bb268b22d?wsid=/subscriptions/ca0a8c40-b06a-4e4e-8434-63c03a1dee34/resourcegroups/MSFT-WEU-EAP_PROJECT02_AI-DEV-RG/workspaces/msft-weu-DEV-eap-proj02_ai-amls&tid=72f988bf-86f1-41af-91ab-2d7cd011db47
{'runId': '06465ecb-2d1b-460d-b3b4-0e2bb268b22d', 'status': 'Completed', 'startTimeUtc': '2021-08-20T13:20:32.995606Z', 'endTimeUtc': '2021-08-20T13:32:03.903101Z', 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'SDK', 'runType': 'SDK', 'azureml.parameters': '{"esml_inference_model_version":"1","esml_scoring_folder_date":"2021-06-23 10:35:01.243860"}'}, 'inputDatasets': [], 'outputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://sajxvzyuylcu5jc.blob.core.windows.net/azureml/ExperimentRun/dcid.06465ecb-2d1b-460d-b3b4-0e2bb268b22d/logs/azureml/executionlogs.txt?sv=2019-07-07&sr=b&sig=hek%2Fuwxy9LilRLCD3bo

'Finished'

## Re-run pipeline, with NEW date folder (`23rd` instead of `22nd`), and new model version `1` instead of `4`
- ADF ingest https://docs.microsoft.com/en-us/azure/machine-learning/how-to-data-ingest-adf

In [31]:
new_scoring_date = '2021-06-23 15:35:01.243860'
par_dic = {par_esml_model_version.name: 1, par_esml_scoring_date.name: new_scoring_date}
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True,pipeline_parameters=par_dic)

Created step BRONZE to SILVER [03db1c1d][a0e1f4ce-f858-4a78-8072-3fadbfe75280], (This step will run and generate new outputs)
Created step BRONZE to SILVER [7d68defe][0ac3a0de-a38d-4d66-9c23-bd8c98333244], (This step will run and generate new outputs)
Created step SCORING GOLD [a5fa1d3c][f0d5204b-2214-4ab3-8732-d65e8f6d117a], (This step will run and generate new outputs)
Created step SILVER's merged to GOLD_TO_SCORE [f61e6a46][ff273512-e681-4a10-9a38-80893b5a2a9e], (This step will run and generate new outputs)
Submitted PipelineRun 7c893dfd-6a9f-4594-a4b0-99efa43c625d
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/7c893dfd-6a9f-4594-a4b0-99efa43c625d?wsid=/subscriptions/ca0a8c40-b06a-4e4e-8434-63c03a1dee34/resourcegroups/MSFT-WEU-EAP_PROJECT02_AI-DEV-RG/workspaces/msft-weu-DEV-eap-proj02_ai-amls&tid=72f988bf-86f1-41af-91ab-2d7cd011db47


In [32]:
# Before we proceed we need to wait for the run to complete.
pipeline_run.wait_for_completion(show_output=False)

PipelineRunId: 7c893dfd-6a9f-4594-a4b0-99efa43c625d
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/7c893dfd-6a9f-4594-a4b0-99efa43c625d?wsid=/subscriptions/ca0a8c40-b06a-4e4e-8434-63c03a1dee34/resourcegroups/MSFT-WEU-EAP_PROJECT02_AI-DEV-RG/workspaces/msft-weu-DEV-eap-proj02_ai-amls&tid=72f988bf-86f1-41af-91ab-2d7cd011db47
{'runId': '7c893dfd-6a9f-4594-a4b0-99efa43c625d', 'status': 'Completed', 'startTimeUtc': '2021-08-20T10:47:55.562889Z', 'endTimeUtc': '2021-08-20T11:00:38.274031Z', 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'SDK', 'runType': 'SDK', 'azureml.parameters': '{"esml_inference_model_version":"1","esml_scoring_folder_date":"2021-06-23 15:35:01.243860"}'}, 'inputDatasets': [], 'outputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://sajxvzyuylcu5jc.blob.core.windows.net/azureml/ExperimentRun/dcid.7c893dfd-6a9f-4594-a4b0-99efa43c625d/logs/azureml/executionlogs.txt?sv=2019-07-07&sr=b&sig=%2B55PIhIEohZAdSkDD5U

'Finished'

In [33]:
# Get Steps
for step in pipeline_run.get_steps():
    print("Outputs of step " + step.name)
    
    # Get a dictionary of StepRunOutputs with the output name as the key 
    output_dict = step.get_outputs()
    
    for name, output in output_dict.items():
        
        output_reference = output.get_port_data_reference() # Get output port data reference
        print("\tname: " + name)
        print("\tdatastore: " + output_reference.datastore_name)
        print("\tpath on datastore: " + output_reference.path_on_datastore)

Outputs of step SCORING GOLD
	name: M11_GOLD_SCORED_RUNINFO
	datastore: project002
	path on datastore: last_gold_scored
	name: M11_GOLD_SCORED
	datastore: project002
	path on datastore: projects/project002/11_diabetes_model_reg/inference/{model_version}/scored/dev/0bda3163-b237-410a-8165-33ae9e87fa71
	name: M11_active_folder
	datastore: project002
	path on datastore: projects/project002/11_diabetes_model_reg/inference/active
Outputs of step SILVER's merged to GOLD_TO_SCORE
	name: M11_GOLD_TO_SCORE
	datastore: project002
	path on datastore: projects/project002/11_diabetes_model_reg/inference/0/gold/dev/
Outputs of step BRONZE to SILVER
	name: M11_ds01_diabetes_inference_SILVER
	datastore: project002
	path on datastore: projects/project002/11_diabetes_model_reg/inference/1/ds01_diabetes/out/silver/dev/835da2f9-9bef-48ea-a932-a731e18ed1c1/
Outputs of step BRONZE to SILVER
	name: M11_ds02_other_inference_SILVER
	datastore: project002
	path on datastore: projects/project002/11_diabetes_mode
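
Note that the listing above prints the SCORING GOLD outputs in a different order than they were declared in `outputs=[scored_gold,last_gold_run,active_folder]`, while `score_gold.py` picks its outputs by position with `next(it)`. Whether `run.output_datasets` iterates in declaration order is not confirmed by anything in this notebook, so a lookup by name may be the safer choice. A minimal sketch, using a plain dict as a stand-in for `run.output_datasets` and assuming its keys are the names given to `OutputFileDatasetConfig`. `get_output` is a hypothetical helper and the `/tmp/...` values are made up.

```python
def get_output(output_datasets, name):
    """Return the output registered under `name`, failing loudly if it is missing."""
    try:
        return output_datasets[name]
    except KeyError:
        available = ", ".join(sorted(output_datasets))
        raise KeyError("Output '{}' not found. Available: {}".format(name, available)) from None

# Stand-in for run.output_datasets, deliberately in the order printed above
fake_outputs = {
    "M11_GOLD_SCORED_RUNINFO": "/tmp/runinfo",
    "M11_GOLD_SCORED": "/tmp/scored",
    "M11_active_folder": "/tmp/active",
}

first_by_position = fake_outputs[next(iter(fake_outputs))]
scored_by_name = get_output(fake_outputs, "M11_GOLD_SCORED")

print(first_by_position)  # /tmp/runinfo
print(scored_by_name)     # /tmp/scored
```

With positional access, a reordering silently writes the parquet file into the run-info folder; with access by name, a mismatch raises an error that lists the available outputs.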

In [None]:
# functions to download output to local and fetch as dataframe
def get_download_path(download_path, output_name):
    output_folder = os.listdir(download_path + '/azureml')[0]
    path =  download_path + '/azureml/' + output_folder + '/' + output_name
    return path

def fetch_df(current_step, output_name):
    output_data = current_step.get_output_data(output_name)
    print(type(output_data)) # <class 'azureml.pipeline.core.graph.PortDataReference'>
    download_path = './outputs/' + output_name
    output_data.download(download_path, overwrite=True) # AttributeError: 'AzureDataLakeGen2Datastore' object has no attribute 'download'
    df_path = get_download_path(download_path, output_name) + '/silver.parquet'
    return pd.read_parquet(df_path)

### View SILVER diabetes, other

In [None]:
diabetes_silver_step = pipeline_run.find_step_run(step_diabetes2silver.name)[0]
other_silver_step = pipeline_run.find_step_run(step_other2silver.name)[0]

# <class 'azureml.pipeline.core.graph.PortDataReference'>
# AttributeError: 'AzureDataLakeGen2Datastore' object has no attribute 'download'
diabetes_df = fetch_df(diabetes_silver_step, stepdata10_ds01_diabetes_silver.name)
other_df = fetch_df(other_silver_step, stepdata11_ds02_other_silver.name)

display(diabetes_df.head(5))
display(other_df.head(5))

### View `GOLD` dataset

In [None]:
merged_gold_step = pipeline_run.find_step_run(step_gold_merged.name)[0]
gold_df = fetch_df(merged_gold_step, gold_to_score.name)
display(gold_df.describe())

# `05b_BatchScoring pipeline`
- Used both for INFERENCE. 
- Depends on `02_Bronze2Gold`

In [None]:
import joblib
from azureml.core.model import Model
model_path = script_folder + "/model.pkl"
model = joblib.load(model_path)

# versus...
#my_model_path = Model.get_model_path(p.model_folder_name) # Model 11_diabetes_model_reg not found in cache at azureml-models or in current working directory 
#model2 = joblib.load(my_model_path)

In [None]:
import pandas as pd
df_res  = pd.DataFrame(result, columns=['prediction'])
df_out = pd.merge(gold_to_score_df,df_res[['prediction']],how = 'left',left_index = True, right_index = True)
df_out.head()

In [None]:
import pandas as pd
df_res2  = pd.DataFrame(result, columns=['prediction'])
df_out2 = gold_to_score_df.join(df_res2[['prediction']],how = 'left')
df_out2.head()
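
The two cells above attach the predictions in two ways, `pd.merge` on the index and `DataFrame.join`. Both align rows by index label, so they only pair each prediction with the right feature row when `gold_to_score_df` carries a default 0..n-1 index, which `score_gold.py` ensures with `reset_index(drop=True)`. A small sketch with made-up data, assuming pandas is installed:

```python
import pandas as pd

features = pd.DataFrame({"AGE": [50.0, 61.0, 43.0]}, index=[10, 11, 12])  # non-default index
predictions = pd.DataFrame([0.1, 0.2, 0.3], columns=["prediction"])       # index 0..2

# Index labels do not overlap, so a left join finds no matching predictions
misaligned = features.join(predictions[["prediction"]], how="left")

# After resetting the index, join and merge-on-index attach the same values
aligned = features.reset_index(drop=True)
joined = aligned.join(predictions[["prediction"]], how="left")
merged = pd.merge(aligned, predictions[["prediction"]], how="left",
                  left_index=True, right_index=True)

print(misaligned["prediction"].isna().all())  # True
print(joined["prediction"].tolist())          # [0.1, 0.2, 0.3]
print(merged["prediction"].tolist())          # [0.1, 0.2, 0.3]
```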