# 02_ml_pipeline

This notebook covers the following tasks:

1. Construct the Pipeline
2. Compile the Pipeline
3. Execute the Pipeline on Vertex AI
4. View the Pipeline results

## Setup

### Import libraries

In [1]:
# Load the `autotime` IPython extension; the "time: ..." line under each cell output comes from it.
%load_ext autotime

time: 318 µs (started: 2022-11-27 20:44:21 +00:00)


In [2]:
import os
import datetime
import json
import kfp
from kfp.v2.dsl import component
from kfp.v2 import dsl, compiler
from google.cloud import aiplatform as aip
from google_cloud_pipeline_components import aiplatform as gcc_aip
import gcsfs
import importlib

time: 3 s (started: 2022-11-27 20:44:21 +00:00)


### Set configurations and constants

In [3]:
# Notebook-wide settings referenced by the cells below.
NOTEBOOK = "02_ml_pipeline"
APPNAME = "bw-prediction"
PROJECT = "babyweight-prediction"
REGION = "us-west1"
BUCKET = "b_w_bucket"
BQ_DATASET = "bw_dataset"

# Mirror REGION, PROJECT and BUCKET into the process environment.
os.environ.update({"REGION": REGION, "PROJECT": PROJECT, "BUCKET": BUCKET})

time: 1.09 ms (started: 2022-11-27 20:44:24 +00:00)


### Set service account access for Vertex AI Pipeline

In [4]:
# Service account e-mail used further down: it is the default of the pipeline's
# `service_account` parameter and is passed to `pipeline_job.run(...)`.
SERVICE_ACCOUNT = "69318036822-compute@developer.gserviceaccount.com"

time: 1.03 ms (started: 2022-11-27 20:44:24 +00:00)


In [5]:
# Show the active gcloud configuration (account and project) for a quick sanity check.
!gcloud config list

[core]
account = 69318036822-compute@developer.gserviceaccount.com
disable_usage_reporting = True
project = babyweight-prediction

Your active configuration is: [default]
time: 933 ms (started: 2022-11-27 20:44:24 +00:00)


In [6]:
# List the credentialed gcloud accounts; the active one is marked with `*` in the output.
!gcloud auth list

                  Credentialed Accounts
ACTIVE  ACCOUNT
*       69318036822-compute@developer.gserviceaccount.com

To set the active account, run:
    $ gcloud config set account `ACCOUNT`

time: 938 ms (started: 2022-11-27 20:44:25 +00:00)


### Initialize the Vertex AI SDK & Tensorboard Instance

In [7]:
# Set the SDK-wide defaults (project, location, staging bucket) for later `aip.*` calls.
# `staging_bucket` is documented in the form "gs://...", so the bare bucket name is prefixed
# here, matching how PIPELINE_URI is built from the same BUCKET.
aip.init(project=PROJECT, location=REGION, staging_bucket=f"gs://{BUCKET}")

time: 649 µs (started: 2022-11-27 20:44:26 +00:00)


In [8]:
## Create a Tensorboard instance (optional; currently disabled — uncomment the two lines below to enable)
#tb = aip.Tensorboard.create(display_name=f"{APPNAME}_{NOTEBOOK}", description=NOTEBOOK)
#tb.resource_name

time: 1.15 ms (started: 2022-11-27 20:44:26 +00:00)


## 1. Construct the Pipeline

### 1.1. Define pipeline constants

In [9]:
# --- Pipeline root -----------------------------------------------------------
# Cloud Storage prefix handed to the pipeline as its root.
# (Disabled: per-run timestamp.)
#TIMESTAMP = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
PIPELINE_URI = f"gs://{BUCKET}/{APPNAME}"

# --- Data: extract_source_data -----------------------------------------------
YEAR = 2003
SOURCE_BQ_TABLE_ID = "bigquery-public-data.samples.natality"
EXTRACTED_BQ_TABLE_ID = f"{PROJECT}.{BQ_DATASET}.{APPNAME}_extracted_{YEAR}"

# --- Data: prepped_table_creating_task ---------------------------------------
LIMIT = 500_000  # changed to 500,000
# Same name as the extracted table with a `_limit_<LIMIT>` suffix.
PREPPED_BQ_TABLE_ID = f"{EXTRACTED_BQ_TABLE_ID}_limit_{LIMIT}"
VERTEX_DATASET_DISPLAY_NAME = f"{APPNAME}_extracted_{YEAR}_limit_{LIMIT}_dataset"

# --- BQML ----------------------------------------------------------------------
VAR_TARGET = "weight_pounds"
BQML_MODEL_NAME = f"bqml_v2_{APPNAME}"
# The version alias is simply the model name.
BQML_MODEL_VERSION_ALIASES = BQML_MODEL_NAME

# --- AutoML --------------------------------------------------------------------
AUTOML_MODEL_NAME = f"automl_{APPNAME}"
# Feature column -> type, passed as `column_specs` to the AutoML training op.
COLUMN_SPECS = {
    "is_male": "categorical",
    "mother_age": "numeric",
    "plurality": "categorical",
    "gestation_weeks": "numeric",
    "cigarette_use": "categorical",
    "alcohol_use": "categorical",
}

# --- Model validation ----------------------------------------------------------
REFERENCE_METRIC_NAME = "mae"
THRESHOLDS_DICT = {"mae": 50.0}

# --- Deployment ----------------------------------------------------------------
ENDPOINT_DISPLAY_NAME = f"{APPNAME}_endpoint"
DEPLOY_IMAGE = "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-7:latest"
DEPLOY_MACHINE_TYPE = "n1-highmem-4"
#DEPLOY_MACHINE_TYPE = "n1-standard-1"
DEPLOY_MIN_REPLICA_COUNT = 1
DEPLOY_MAX_REPLICA_COUNT = 1

# (Disabled: explanation settings.)
#EXPLANATION_PARAMATERS = {"sampledShapleyAttribution": {"pathCount": 10}}
#EXPLANATION_METADATA_JSON_PATH = f"{CUSTOM_TRAININGS_URI[5:]}/explanation_metadata.json"


time: 3.43 ms (started: 2022-11-27 20:44:26 +00:00)


### 1.3. Load pipeline components and build the BQML query

In [10]:
# Project-local pipeline component modules (package `src.pipeline`).
from src.pipeline import data_comp
from src.pipeline import bqml_comp
from src.pipeline import automl_comp
from src.pipeline import model_validation_comp
from src.pipeline import serving_validation_comp
#
# Reload every module so edits made to the files on disk are picked up without restarting
# the kernel. The value of the last call is what the cell displays as its output.
importlib.reload(data_comp)
importlib.reload(bqml_comp)
importlib.reload(automl_comp)
importlib.reload(model_validation_comp)
importlib.reload(serving_validation_comp)

<module 'src.pipeline.serving_validation_comp' from '/home/jupyter/babyweight-vertex-ai/src/pipeline/serving_validation_comp.py'>

time: 82.7 ms (started: 2022-11-27 20:44:26 +00:00)


In [11]:
# Render the BigQuery ML statement once, up front; the pipeline later submits this exact string.
bqml_query_args = dict(
    project=PROJECT,
    bq_dataset=BQ_DATASET,
    bq_model_name=BQML_MODEL_NAME,
    bq_version_aliases=BQML_MODEL_VERSION_ALIASES,
    var_target=VAR_TARGET,
    bq_train_table_id=PREPPED_BQ_TABLE_ID,
)
QUERY_BUILDING_BQML_MODEL = bqml_comp.create_query_build_bqml_model(**bqml_query_args)

# Echo the generated SQL for inspection.
print(f"QUERY_BUILDING_BQML_MODEL: {QUERY_BUILDING_BQML_MODEL}")

QUERY_BUILDING_BQML_MODEL: 
    CREATE MODEL IF NOT EXISTS `babyweight-prediction.bw_dataset.bqml_v2_bw-prediction`
    OPTIONS(
        model_type = 'DNN_LINEAR_COMBINED_REGRESSOR',
        model_registry = 'vertex_ai', 
        vertex_ai_model_version_aliases = ['bqml_v2_bw-prediction'],
        input_label_cols = ['weight_pounds'],
        data_split_col = 'custom_splits',
        data_split_method = 'CUSTOM',
        HIDDEN_UNITS = [256, 128, 64],
        OPTIMIZER = 'adagrad',
        BATCH_SIZE = HPARAM_CANDIDATES([16, 32, 64]),
        DROPOUT =  HPARAM_CANDIDATES([0, 0.1, 0.2]),
        MAX_ITERATIONS = 5,
        MAX_PARALLEL_TRIALS = 4,
        NUM_TRIALS = 24
        ) AS
    SELECT * EXCEPT(splits),
        CASE
            WHEN splits = 'VALIDATE' THEN 'EVAL'
            ELSE splits
        END AS custom_splits
    FROM `babyweight-prediction.bw_dataset.bw-prediction_extracted_2003_limit_500000`
    
time: 1.01 ms (started: 2022-11-27 20:44:26 +00:00)


### 1.4. Define the pipeline

In [12]:
@dsl.pipeline(name = APPNAME, pipeline_root = PIPELINE_URI)
def vertex_ai_pipeline(
    service_account: str=SERVICE_ACCOUNT,
    project: str=PROJECT,
    region: str=REGION,
):
    """Train a BQML and an AutoML model, compare them, and deploy AutoML if it is selected.

    Tasks are wired in this order:
      1. ``data_comp.extract_source_data`` writes ``EXTRACTED_BQ_TABLE_ID`` from
         ``SOURCE_BQ_TABLE_ID`` for ``YEAR``.
      2. ``data_comp.preprocess_data`` writes ``PREPPED_BQ_TABLE_ID`` (with ``LIMIT``), which is
         then registered as a Vertex AI tabular dataset.
      3. A BigQuery ML model is created from ``QUERY_BUILDING_BQML_MODEL`` and evaluated.
      4. An AutoML tabular regression model is trained on the dataset and its metrics collected.
      5. ``model_validation_comp.select_best_model`` compares both models using
         ``REFERENCE_METRIC_NAME`` and ``THRESHOLDS_DICT``.
      6. An endpoint is created unconditionally; the AutoML model is deployed to it only when
         ``deploy_decision == "true"`` and ``best_model_name == "automl"``.

    Args:
        service_account: Declared pipeline parameter that no task in this function reads.
            NOTE(review): confirm whether it is only kept so it shows up in the run parameters.
        project: Google Cloud project id forwarded to every task that takes ``project``.
        region: Region forwarded to the two data-preparation tasks only.
            NOTE(review): the AutoML metrics task and the endpoint use a hard-coded
            ``'us-central1'`` and the BigQuery jobs use ``"US"`` — confirm this split is intended.
    """
    from google_cloud_pipeline_components.v1.bigquery import BigqueryCreateModelJobOp
    from google_cloud_pipeline_components.v1.bigquery import BigqueryEvaluateModelJobOp

    ##################################
    # Data: extract from the original natality dataset.
    extracted_source_data_task = data_comp.extract_source_data(
        project=project,
        region=region,
        year=YEAR,
        in_bq_table_id=SOURCE_BQ_TABLE_ID,
        out_bq_table_id=EXTRACTED_BQ_TABLE_ID,
    ).set_display_name('create_extracted_table')

    ##################################
    # Data: build the prepped table. It consumes no output of the extract task (the table id is
    # a constant), so the ordering has to be stated explicitly with `.after(...)`.
    prepped_table_creating_task = data_comp.preprocess_data(
        project=project,
        region=region,
        limit=LIMIT,
        in_bq_table_id=EXTRACTED_BQ_TABLE_ID,
        out_bq_table_id=PREPPED_BQ_TABLE_ID,
    ).set_display_name('create_prepped_table').after(extracted_source_data_task)

    # Register the prepped table as a Vertex AI tabular dataset (input of the AutoML job).
    vertex_dataset_creating_task = gcc_aip.TabularDatasetCreateOp(
        project=project,
        display_name=VERTEX_DATASET_DISPLAY_NAME,
        bq_source=prepped_table_creating_task.outputs["out_bq_table_uri"],
    ).set_display_name('create_vertex_dataset')

    ##################################
    # BQML: the query is a pre-rendered string that names the prepped table, so this task is
    # also ordered explicitly after the prepped-table task.
    bqml_creating_task = BigqueryCreateModelJobOp(
        project=project,
        location="US",
        query=QUERY_BUILDING_BQML_MODEL,
    ).set_display_name('create_bqml_model').after(prepped_table_creating_task)

    bqml_evaluating_task = BigqueryEvaluateModelJobOp(
        project=project,
        location="US",
        model=bqml_creating_task.outputs["model"]
    ).set_display_name('evaluate_bqml_model')

    bqml_evaluation_metrics_collecting_task = bqml_comp.collect_eval_metrics_bqml(
        eval_metrics_artifact=bqml_evaluating_task.outputs["evaluation_metrics"]
    )

    ###################################
    # AutoML: tabular regression on the Vertex AI dataset.
    automl_creating_task = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=f"{AUTOML_MODEL_NAME}_TrainingJob",
        optimization_prediction_type="regression",
        optimization_objective="minimize-rmse",
        budget_milli_node_hours=3000, #changed from 1,000
        model_display_name=AUTOML_MODEL_NAME,
        dataset=vertex_dataset_creating_task.outputs["dataset"],
        column_specs=COLUMN_SPECS,
        target_column=VAR_TARGET,
    ).set_display_name('create_automl_model')

    automl_evaluation_metrics_collecting_task = automl_comp.collect_eval_metrics_automl(
        region='us-central1',
        model=automl_creating_task.outputs["model"]
    )

    ###################################
    # Model validation: compare both candidates and decide whether anything should be deployed.
    best_model_task = model_validation_comp.select_best_model(
        automl_metrics=automl_evaluation_metrics_collecting_task.outputs["metrics"],
        automl_model=automl_creating_task.outputs["model"],
        bqml_metrics=bqml_evaluation_metrics_collecting_task.outputs["metrics"],
        # Type warning noted by the author for this argument:
        # Argument type "Model" is incompatible with the input type "google.VertexModel"
        bqml_model=bqml_creating_task.outputs["model"],
        reference_metric_name=REFERENCE_METRIC_NAME,
        thresholds_dict=THRESHOLDS_DICT,
    )

    ##### DEPLOYMENT ###############################
    # The endpoint is created outside the conditions, i.e. on every run.
    endpoint_creating_task = gcc_aip.EndpointCreateOp(
        project=project,
        location='us-central1',
        #location=region,
        display_name=ENDPOINT_DISPLAY_NAME,
    ).set_display_name('create_endpoint')

    #===============================#
    # Deploy only when the validation step says so AND the selected model is the AutoML one.
    # There is no branch for a selected BQML model: in that case nothing is deployed.
    with dsl.Condition(
        best_model_task.outputs["deploy_decision"] == "true",
        name="deploy_decision",
    ):

        #===============================#
        with dsl.Condition(
            best_model_task.outputs["best_model_name"] == "automl",
            name="deploy_automl",
        ):

            # Deploy the AutoML model to the endpoint, sending it all traffic.
            automl_model_deploy_task = gcc_aip.ModelDeployOp(
                #model=best_model_task.outputs["best_model"],
                model=automl_creating_task.outputs["model"],
                endpoint=endpoint_creating_task.outputs["endpoint"],
                dedicated_resources_machine_type=DEPLOY_MACHINE_TYPE,
                dedicated_resources_min_replica_count=DEPLOY_MIN_REPLICA_COUNT,
                dedicated_resources_max_replica_count=DEPLOY_MAX_REPLICA_COUNT,
                traffic_split={"0": 100},
            ).set_display_name('deploy_model_to_endpoint')
            
    
       
    

time: 3.33 ms (started: 2022-11-27 20:44:26 +00:00)


## 2. Compile the Pipeline

In [13]:
# Create a fresh (local) directory to store the compiled file.
# `rm -rf` first removes whatever a previous run left there, then `mkdir -p` recreates it.
DIR = f"temp/{NOTEBOOK}"
!rm -rf {DIR}
!mkdir -p {DIR}

time: 242 ms (started: 2022-11-27 20:44:26 +00:00)


In [14]:
# Path of the compiled pipeline definition inside the temp directory.
compiled_package = f"{DIR}/compiled_package.json"

# Compile `vertex_ai_pipeline` into that JSON file.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    package_path=compiled_package,
    pipeline_func=vertex_ai_pipeline,
)



time: 674 ms (started: 2022-11-27 20:44:26 +00:00)


## 3. Execute the Pipeline on Vertex AI

### 3.1. Create Vertex AI Pipeline Job

In [15]:
# Runtime parameter values; `service_account` is not listed, so it keeps the pipeline's default.
run_parameters = {"project": PROJECT, "region": REGION}

# Labels attached to the job.
run_labels = {"notebook": NOTEBOOK, "app": APPNAME}

# Build the job from the compiled package (nothing is submitted until `.run()` is called).
pipeline_job = aip.PipelineJob(
    display_name=APPNAME,
    template_path=compiled_package,
    pipeline_root=PIPELINE_URI,
    parameter_values=run_parameters,
    #enable_caching = False,        #//TRUE, by default//
    labels=run_labels,
)

time: 184 ms (started: 2022-11-27 20:44:27 +00:00)


### 3.2. Run the job

In [16]:
# Submit the job so it runs as SERVICE_ACCOUNT.
# `sync` is left at its default (the explicit setting below is commented out); the logged
# output of this cell shows the job state being polled while it runs.
response = pipeline_job.run(
    service_account = SERVICE_ACCOUNT,
    #sync = True
)

Creating PipelineJob
PipelineJob created. Resource name: projects/69318036822/locations/us-west1/pipelineJobs/bw-prediction-20221127204427
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/69318036822/locations/us-west1/pipelineJobs/bw-prediction-20221127204427')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-west1/pipelines/runs/bw-prediction-20221127204427?project=69318036822
PipelineJob projects/69318036822/locations/us-west1/pipelineJobs/bw-prediction-20221127204427 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/69318036822/locations/us-west1/pipelineJobs/bw-prediction-20221127204427 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/69318036822/locations/us-west1/pipelineJobs/bw-prediction-20221127204427 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/69318036822/locations/us-west1/pipelineJobs/bw-prediction-20221127204427 current state

## 4. View the Pipeline results

In [17]:
# Fetch the runs recorded for this pipeline and keep only rows whose name matches exactly.
pipeline_df = (
    aip.get_pipeline_df(APPNAME)
    .loc[lambda runs: runs["pipeline_name"] == APPNAME]
)
pipeline_df

Unnamed: 0,pipeline_name,run_name,param.input:project,param.input:service_account,param.input:region,metric.rSquared,metric.rootMeanSquaredError,metric.meanAbsoluteError,metric.meanAbsolutePercentageError,metric.rootMeanSquaredLogError,metric.framework,metric.r2_score,metric.mean_squared_log_error,metric.median_absolute_error,metric.mean_absolute_error,metric.mean_squared_error,metric.root_mean_squared_error,metric.explained_variance,metric.trial_id
0,bw-prediction,bw-prediction-20221127204427,babyweight-prediction,69318036822-compute@developer.gserviceaccount.com,us-west1,0.386401,1.010693,0.782775,12.299472,0.136865,AutoML,0.347389,0.022611,0.660123,0.812205,1.093847,1.045872,0.357209,9.0
1,bw-prediction,bw-prediction-20221127202315,babyweight-prediction,69318036822-compute@developer.gserviceaccount.com,us-west1,,,,,,BQML,0.443393,0.021446,0.675051,0.843462,1.173432,1.08325,0.445383,2.0
2,bw-prediction,bw-prediction-20221127200235,babyweight-prediction,69318036822-compute@developer.gserviceaccount.com,us-west1,,,,,,BQML,0.443393,0.021446,0.675051,0.843462,1.173432,1.08325,0.445383,2.0
3,bw-prediction,bw-prediction-20221124191247,babyweight-prediction,69318036822-compute@developer.gserviceaccount.com,us-west1,0.483891,1.071977,0.823724,13.171675,0.145592,BQML,0.443393,0.021446,0.675051,0.843462,1.173432,1.08325,0.445383,2.0
4,bw-prediction,bw-prediction-20221124180611,babyweight-prediction,69318036822-compute@developer.gserviceaccount.com,us-west1,0.483891,1.071977,0.823724,13.171675,0.145592,BQML,0.443393,0.021446,0.675051,0.843462,1.173432,1.08325,0.445383,2.0
5,bw-prediction,bw-prediction-20221124180033,babyweight-prediction,69318036822-compute@developer.gserviceaccount.com,us-west1,0.483891,1.071977,0.823724,13.171675,0.145592,AutoML,0.443393,0.021446,0.675051,0.843462,1.173432,1.08325,0.445383,2.0
6,bw-prediction,bw-prediction-20221124175128,babyweight-prediction,69318036822-compute@developer.gserviceaccount.com,us-west1,0.483891,1.071977,0.823724,13.171675,0.145592,BQML,0.443393,0.021446,0.675051,0.843462,1.173432,1.08325,0.445383,2.0
7,bw-prediction,bw-prediction-20221124174620,babyweight-prediction,69318036822-compute@developer.gserviceaccount.com,us-west1,,,,,,BQML,0.443393,0.021446,0.675051,0.843462,1.173432,1.08325,0.445383,2.0
8,bw-prediction,bw-prediction-20221124045408,babyweight-prediction,69318036822-compute@developer.gserviceaccount.com,us-west1,,,,,,BQML,0.443393,0.021446,0.675051,0.843462,1.173432,1.08325,0.445383,2.0
9,bw-prediction,bw-prediction-20221124042211,babyweight-prediction,69318036822-compute@developer.gserviceaccount.com,us-west1,,,,,,BQML,0.443393,0.021446,0.675051,0.843462,1.173432,1.08325,0.445383,2.0


time: 1.84 s (started: 2022-11-28 01:01:11 +00:00)


In [18]:
# The resource name has the form "projects/<number>/locations/<region>/pipelineJobs/<id>",
# so the second path segment is the project number.
PIPELINE_JOB_ID = pipeline_job.name
PROJECT_NUMBER = pipeline_job.resource_name.split("/")[1]
PIPELINE_TASKS_URI = "/".join([PIPELINE_URI, PROJECT_NUMBER, PIPELINE_JOB_ID])

# Echo the three identifiers, one per line.
for label, value in (
    ("PROJECT_NUMBER", PROJECT_NUMBER),
    ("PIPELINE_JOB_ID", PIPELINE_JOB_ID),
    ("PIPELINE_TASKS_URI", PIPELINE_TASKS_URI),
):
    print(f"{label}: {value}")

PROJECT_NUMBER: 69318036822
PIPELINE_JOB_ID: bw-prediction-20221127204427
PIPELINE_TASKS_URI: gs://b_w_bucket/bw-prediction/69318036822/bw-prediction-20221127204427
time: 1.32 ms (started: 2022-11-28 01:01:13 +00:00)


In [19]:
# Task records of this pipeline run.
all_pipeline_tasks = pipeline_job.gca_resource.job_detail.task_details

# Find the position of the `endpoint-create` task while listing every task.
# The index starts as None (not 0) so that a run without that task is reported explicitly
# instead of silently pointing at whichever task happens to be listed first.
endpoint_create_task_index = None
print("All pipeline tasks:")
for i, task in enumerate(all_pipeline_tasks):
    print(f" {i}: {task.task_name}")
    if task.task_name == "endpoint-create":
        endpoint_create_task_index = i

if endpoint_create_task_index is None:
    raise LookupError("No `endpoint-create` task found in this pipeline run.")

print("--- Index of `endpoint-create` task:", endpoint_create_task_index)

All pipeline tasks:
 0: tabular-dataset-create
 1: bigquery-evaluate-model-job
 2: bw-prediction-20221127204427
 3: endpoint-create
 4: collect-eval-metrics-bqml
 5: automl-tabular-training-job
 6: model-deploy
 7: preprocess-data
 8: select-best-model
 9: bigquery-create-model-job
 10: condition-deploy-decision-1
 11: condition-deploy-automl-2
 12: collect-eval-metrics-automl
 13: extract-source-data
--- Index of `endpoint-create` task: 3
time: 3.89 ms (started: 2022-11-28 01:01:13 +00:00)


In [20]:
# Walk from the `endpoint-create` task to its first "endpoint" output artifact and read the
# resource name stored in that artifact's metadata.
endpoint_task = all_pipeline_tasks[endpoint_create_task_index]
endpoint_artifact = endpoint_task.outputs["endpoint"].artifacts[0]
endpoint_resource_name = endpoint_artifact.metadata["resourceName"]
endpoint_resource_name

'projects/69318036822/locations/us-central1/endpoints/4074389870305345536'

time: 5.63 ms (started: 2022-11-28 01:01:13 +00:00)


***Click here to verify:***  
https://console.cloud.google.com/vertex-ai/locations/us-central1/endpoints/4074389870305345536?project=babyweight-prediction