# 3-Step Pipeline Example (Train, Evaluate, Register)
## `MlflowClient.search_runs` version

In [None]:
%%writefile search_scripts/train.py
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license.

import argparse
import mlflow
import os
import json
import numpy as np
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
import mlflow.sklearn

def parse_args():
    """Read the training step's command-line options.

    Returns:
        argparse.Namespace with
        - input_data: path of the training data file (None if omitted);
        - model_output: directory the model is saved to ("./outputs" if omitted).
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--input_data", type=str, help="input data")
    cli.add_argument("--model_output", type=str, default="./outputs", help="model output")
    return cli.parse_args()

# --- Training step -----------------------------------------------------------
# Fits a Ridge regression on the CSV given by --input_data, logs sample counts,
# the alpha hyper-parameter and the test-set MSE to an MLflow run, and saves the
# fitted model in MLflow sklearn format to --model_output.

# Regularisation strength of the Ridge model (logged as a run parameter).
ALPHA = 0.03

args = parse_args()
print(f"Training data path: {args.input_data}")
print(f"Model output path: {args.model_output}")

with mlflow.start_run():
    # Record which data file this run was trained on.
    # mlflow.log_text takes (text, artifact_file).
    mlflow.log_text(args.input_data, "input_data.txt")

    # Comma-separated file with one header row: every column except the last
    # is a feature, the last column is the regression target.
    diabetes_data = np.loadtxt(args.input_data, delimiter=',', skiprows=1)
    X = diabetes_data[:, :-1]
    y = diabetes_data[:, -1]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=0
    )

    mlflow.log_metric("Training samples", len(X_train))
    mlflow.log_metric("Test samples", len(X_test))

    # alpha is a hyper-parameter, so it is recorded as a param, not a metric.
    mlflow.log_param('alpha', ALPHA)

    # Create, fit, and test the scikit-learn Ridge regression model.
    regression_model = Ridge(alpha=ALPHA)
    regression_model.fit(X_train, y_train)
    preds = regression_model.predict(X_test)

    # Test-set MSE; the evaluate step searches for runs with metrics.mse > 0.
    mse = mean_squared_error(y_test, preds)
    print('Mean Squared Error is', mse)
    mlflow.log_metric('mse', mse)

    # Save the model to the output directory and attach it to the run.
    mlflow.sklearn.save_model(regression_model, args.model_output)
    mlflow.log_artifact(args.model_output)


In [None]:
%%writefile search_scripts/evaluate.py

import argparse
import json
import mlflow
from mlflow.pyfunc import load_model
from mlflow.tracking import MlflowClient
import time

def parse_args():
    """Parse the evaluate step's options, ignoring any unrecognised ones.

    Returns:
        argparse.Namespace with experiment_name, model_name, model_path
        (None when omitted) and output_path ("./outputs" when omitted).
        The parsed namespace is also printed to stdout.
    """
    cli = argparse.ArgumentParser()
    option_specs = [
        ('--experiment_name', 'Experiment Name', None),
        ('--model_name', 'Name under which model will be registered', None),
        ('--model_path', 'Model directory', None),
        ('--output_path', "eval output", './outputs'),
    ]
    for flag, help_text, default_value in option_specs:
        cli.add_argument(flag, type=str, help=help_text, default=default_value)

    known, _extras = cli.parse_known_args()
    print(f'Arguments: {known}')
    return known

def _latest_run(client, experiment_id, filter_string):
    """Return the most recently started run in *experiment_id* matching *filter_string*.

    Raises:
        RuntimeError: if no run matches the filter.
    """
    # Order explicitly (newest first) and fetch a single run, so neither an
    # older run nor the search page-size limit can decide which run is picked.
    runs = client.search_runs(
        experiment_ids=[experiment_id],
        filter_string=filter_string,
        order_by=["attributes.start_time DESC"],
        max_results=1,
    )
    if not runs:
        raise RuntimeError(
            f"No run in experiment {experiment_id} matches {filter_string!r}"
        )
    return runs[0]


def _print_all_runs(client, experiment_id):
    """[debug] Print run_id, mse and run_mse of the runs in the experiment.

    Runs are printed in the reverse of the order search_runs returns them
    (its documented default is start_time DESC, i.e. this prints oldest first).
    """
    runs = client.search_runs(experiment_ids=[experiment_id], filter_string="")
    for run in reversed(runs):
        print(run.info.run_id, run.data.metrics.get('mse', "N/A"), run.data.metrics.get('run_mse', "N/A"))


def _registered_model_mse(client, model_name):
    """Return the float ``mse`` tag of a registered version of *model_name*, or None.

    The version read is ``latest_versions[0]`` (latest_versions lists the latest
    version per stage; with several stages in use this may not be the newest
    version overall — NOTE(review): confirm this matches the register step).
    None means "no usable baseline" (model not registered yet, no versions,
    missing or non-numeric tag); the reason is printed.
    """
    try:
        model_info = client.get_registered_model(model_name)
        model_tags = model_info.latest_versions[0].tags
        return float(model_tags["mse"])
    except Exception as err:  # best effort: any lookup failure means "no baseline"
        print(f"Could not read mse of registered model {model_name!r}: {err!r}")
        return None


def main():
    """Decide whether the newly trained model should be registered.

    Compares the ``mse`` metric of the most recent training run in the
    experiment against the ``mse`` tag of the registered model. Sets the run
    tag ``deploy_flag`` to 1 when the new MSE is lower or no comparable
    registered model exists, otherwise 0. Also logs ``run_mse``/``model_mse``,
    writes a placeholder file into ``--output_path`` and logs that directory
    as an artifact.

    Raises:
        RuntimeError: if the experiment does not exist or has no training run
            with ``metrics.mse > 0``.
    """
    import os

    args = parse_args()

    model_name = args.model_name
    model_path = args.model_path
    output_path = args.output_path

    mlflow.set_tag("model_name", model_name)
    mlflow.set_tag("model_path", model_path)

    # Fixed wait, presumably so the training step's metrics are searchable
    # before querying — TODO confirm / consider polling instead.
    time.sleep(60)

    # Find the latest training run in the experiment, looked up by name.
    experiment = mlflow.get_experiment_by_name(args.experiment_name)
    if experiment is None:
        raise RuntimeError(f"Experiment not found: {args.experiment_name!r}")

    client = MlflowClient()

    # [debug] dump every run in the experiment to stdout.
    _print_all_runs(client, experiment.experiment_id)

    # MSE of the newest run that logged an 'mse' metric (the training step).
    # NOTE(review): the search spans the whole experiment, so concurrent
    # pipeline jobs in the same experiment could pick each other's run.
    latest_run = _latest_run(client, experiment.experiment_id, "metrics.mse > 0")
    run_mse = latest_run.data.metrics.get('mse', 0)
    run_id = latest_run.info.run_id
    print("run_mse: " + str(run_mse))
    print("run_id: " + str(run_id))

    model_mse = _registered_model_mse(client, model_name)
    if model_mse is None:
        # No registered baseline: register the new model as-is.
        print("モデルがないよ→そのまま登録させる")
        deploy_flag = 1
    else:
        mlflow.log_metric('model_mse', model_mse)
        print("model_mse: " + str(model_mse))
        # Lower MSE is better: deploy only if the new run beats the registered model.
        if run_mse < model_mse:
            print("精度が上回りました")
            deploy_flag = 1
        else:
            print("精度が上回りませんでした")
            deploy_flag = 0

    mlflow.log_metric('run_mse', run_mse)
    mlflow.set_tag("deploy_flag", deploy_flag)

    # Placeholder output; the pipeline feeds this directory to the register
    # step (eval_path), which appears to exist only to order the steps.
    os.makedirs(output_path, exist_ok=True)
    with open(os.path.join(output_path, "output_evaluate.txt"), "w") as f:
        f.write("hello")
    mlflow.log_artifact(output_path)


if __name__ == "__main__":
    main()


In [None]:
%%writefile search_scripts/register.py

import argparse
import json
import mlflow
from mlflow.pyfunc import load_model
from mlflow.tracking import MlflowClient
import time

def parse_args():
    """Parse the register step's options, ignoring any unrecognised ones.

    Returns:
        argparse.Namespace with experiment_name, model_name, model_path and
        eval_path (each None when omitted). The parsed namespace is also
        printed to stdout.
    """
    cli = argparse.ArgumentParser()
    for flag, help_text in (
        ('--experiment_name', 'Experiment Name'),
        ('--model_name', 'Name under which model will be registered'),
        ('--model_path', 'Model directory'),
        ('--eval_path', 'eval directory'),
    ):
        cli.add_argument(flag, type=str, help=help_text)

    known, _extras = cli.parse_known_args()
    print(f'Arguments: {known}')
    return known


def _latest_run(client, experiment_id, filter_string):
    """Return the most recently started run in *experiment_id* matching *filter_string*.

    Raises:
        RuntimeError: if no run matches the filter.
    """
    # Order explicitly (newest first) and fetch a single run, so neither an
    # older run nor the search page-size limit can decide which run is picked.
    runs = client.search_runs(
        experiment_ids=[experiment_id],
        filter_string=filter_string,
        order_by=["attributes.start_time DESC"],
        max_results=1,
    )
    if not runs:
        raise RuntimeError(
            f"No run in experiment {experiment_id} matches {filter_string!r}"
        )
    return runs[0]


def main():
    """Register the trained model if the evaluate step set ``deploy_flag`` to 1.

    Reads ``run_mse`` and the ``deploy_flag`` tag from the newest run in the
    experiment that logged ``run_mse`` (expected to be the evaluate step),
    copies both onto the current run, and when the flag is 1 logs the
    scikit-learn model from ``--model_path`` under ``--model_name``, registers
    it and tags the registered version with ``mse``.

    Raises:
        RuntimeError: if the experiment does not exist or has no run with
            ``metrics.run_mse > 0``.
    """
    # Explicit submodule import: mlflow.sklearn is used below. Kept as the
    # first statement because it binds the name ``mlflow`` locally.
    import mlflow.sklearn

    args = parse_args()

    model_name = args.model_name
    model_path = args.model_path
    # args.eval_path is not read; the pipeline passes it only to wire the
    # evaluate step's output into this step.

    mlflow.set_tag("model_name", model_name)
    mlflow.set_tag("model_path", model_path)

    # Find the latest evaluate run in the experiment, looked up by name.
    experiment = mlflow.get_experiment_by_name(args.experiment_name)
    if experiment is None:
        raise RuntimeError(f"Experiment not found: {args.experiment_name!r}")

    # Fixed wait, presumably so the evaluate step's metrics/tags are searchable
    # before querying — TODO confirm / consider polling instead.
    time.sleep(60)

    client = MlflowClient()

    # NOTE(review): earlier register runs also log run_mse (below), but the
    # current run has not logged it yet, so the newest match should be this
    # pipeline's evaluate run unless pipeline jobs overlap in the experiment.
    latest_run = _latest_run(client, experiment.experiment_id, "metrics.run_mse > 0")

    # Accuracy (MSE) and deploy decision from the preceding evaluate run.
    run_mse = latest_run.data.metrics.get('run_mse', 0)
    deploy_flag = int(latest_run.data.tags.get('deploy_flag', 0))
    run_id = latest_run.info.run_id
    print("run_mse: " + str(run_mse))
    print("deploy_flag: " + str(deploy_flag))
    print("run_id: " + str(run_id))

    mlflow.log_metric("run_mse", run_mse)
    mlflow.set_tag("deploy_flag", deploy_flag)

    if deploy_flag != 1:
        print("Model will not be registered!")
        return

    # Load the native scikit-learn estimator (not a pyfunc wrapper), since
    # mlflow.sklearn.log_model expects an sklearn model as sk_model.
    model = mlflow.sklearn.load_model(model_path)
    modelreg = mlflow.sklearn.log_model(
        sk_model=model,
        artifact_path=model_name,
        registered_model_name=model_name,
    )
    print(modelreg)

    # log_model cannot attach tags to the model version, so the mse tag is set
    # afterwards on latest_versions[0] (latest version per stage; may not be
    # the version just created if several stages are in use — TODO confirm).
    model_info = client.get_registered_model(model_name)
    model_version = model_info.latest_versions[0].version
    client.set_model_version_tag(model_name, str(model_version), "mse", run_mse)
    print("Model registered!")


if __name__ == "__main__":
    main()




In [None]:
%%writefile 06_pipeline_job_3step_client.yml
$schema: https://azuremlschemas.azureedge.net/latest/pipelineJob.schema.json
type: pipeline
experiment_name: 06_pipeline_oh4ml_3step_client
description: 3Step Pipeline Example (Train, Evaluate, Register)

# <inputs_and_outputs>
inputs:
  input: 
    type: uri_file
    #path: azureml:diabetes_data_oh4ml@latest
    path: azureml:diabetes_data_oh4ml_350records@latest
  experiment_name: 06_pipeline_oh4ml_3step_client
  model_name: diabetes_model_oh4ml_client2
outputs: 
  trained_model:

settings:
  default_datastore: azureml:workspaceblobstore
  default_compute: azureml:eightcorecluster
  continue_on_step_failure: false

jobs:
  train_model:
    name: train_model
    display_name: train-model
    code: ./search_scripts
    command: >-
      python train.py 
      --input_data ${{inputs.diabetes_data}} 
      --model_output ${{outputs.model_output}}
    environment: azureml:diabetes-env-02@latest
    inputs:
      diabetes_data: ${{parent.inputs.input}}
    outputs:
      model_output: ${{parent.outputs.trained_model}}

  evaluate_model:
    name: evaluate_model
    display_name: evaluate-model
    code: ./search_scripts
    command: >-
      python evaluate.py 
      --experiment_name ${{inputs.experiment_name}} 
      --model_name ${{inputs.model_name}} 
      --model_path ${{inputs.model_path}} 
      --output_path ${{outputs.evaluate_output}}
    environment: azureml:diabetes-env-02@latest
    inputs:
      model_name: ${{parent.inputs.model_name}}
      model_path: ${{parent.jobs.train_model.outputs.model_output}}
      experiment_name: ${{parent.inputs.experiment_name}}
    outputs:
      evaluate_output: 

  register_model:
    name: register_model
    display_name: register-model
    code: ./search_scripts
    command: >-
      python register.py 
      --experiment_name ${{inputs.experiment_name}} 
      --model_name ${{inputs.model_name}} 
      --model_path ${{inputs.model_path}} 
      --eval_path ${{inputs.eval_path}}
    environment: azureml:diabetes-env-02@latest
    inputs:
      model_name: ${{parent.inputs.model_name}}
      model_path: ${{parent.jobs.train_model.outputs.model_output}}
      eval_path: ${{parent.jobs.evaluate_model.outputs.evaluate_output}}
      experiment_name: ${{parent.inputs.experiment_name}}

In [None]:
!az login

In [None]:
!az ml job create -n 04_test_pipeline-client-06 -f 06_pipeline_job_3step_client.yml