# BQML - LinRegression with Vizier HyperParam-Tuning + XAI on VertexAI Pipelines
**Training pipeline for BQML model using Vizier HyperParameter tuning for optimal model selection and explainability AI (XAI)**

The following creates a pipeline that trains a BigQuery ML model using the hyperparameter tuning feature, evaluates the trials, selects the optimal model, and deploys it to an endpoint if it meets the minimum performance threshold. The model is trained with the new XAI feature, which gives us explainability at both the global and the prediction level.


**Objective** 

Predict the tip amount for a New York taxi ride using a linear regression model.

## Prerequisites
* Create or use a [Google Cloud Storage](https://console.cloud.google.com/storage) bucket to export the model to. <strong>Make sure to create the bucket in the same region where you will create the Vertex AI endpoint that hosts your model.</strong>
* Create a BigQuery dataset to create the model in. <strong>Make sure to create the dataset in the same region where you will create the Vertex AI endpoint and the bucket.</strong>
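
Both prerequisites can also be scripted. The following is a minimal sketch, assuming the `gsutil` and `bq` command-line tools are installed and authenticated. The helper name `prerequisite_commands` and the sample values are hypothetical. The pipeline components later in this notebook pass `'US'` as the BigQuery location, so the dataset location is kept as its own argument here. The helper only builds the command lines, so they can be reviewed before anything is created.

```python
import shlex


def prerequisite_commands(project_id, bucket_location, dataset_location,
                          bucket_name, dataset_name):
    """Build the CLI commands that create the bucket and the dataset."""
    make_bucket = ["gsutil", "mb", "-p", project_id, "-l", bucket_location,
                   f"gs://{bucket_name}"]
    make_dataset = ["bq", f"--location={dataset_location}", "mk", "--dataset",
                    f"{project_id}:{dataset_name}"]
    return make_bucket, make_dataset


# Hypothetical values: replace them with your own project, locations and names.
bucket_cmd, dataset_cmd = prerequisite_commands(
    "my-project", "us-central1", "US", "my-bucket", "taxi")
print(shlex.join(bucket_cmd))
print(shlex.join(dataset_cmd))
```

Each printed line can be pasted into a terminal, or the argument lists can be handed to `subprocess.run(..., check=True)`.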

In [None]:
# Build and push a docker image using Dockerfile as the base image for the Kubeflow pipeline components
!./dockerbuild.sh
# If you get a permission error, run: chmod +x dockerbuild.sh

In [1]:
# Check the KFP version. It should be >= 1.6. If it is lower, run !pip3 install --user kfp --upgrade, then restart the kernel
!python3 -c "import kfp; print('KFP version: {}'.format(kfp.__version__))"

KFP version: 1.8.11


## Set Variables

In [12]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

# CHANGE the following settings
PROJECT_ID="hivedemoindia" #This is your GCP project ID
REGION='us-central1' # Vertex AI endpoint deployment region must match bucket region
USER = 'sourabhsjain'
BUCKET_NAME = 'hive-demos' 
BQ_DATASET_NAME="taxi" #This is the name of the target dataset where your model and predictions will be stored
MODEL_NAME='hp_xai_taxi_tip_model'

# Required Parameters for Vertex AI
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(BUCKET_NAME, USER) #Cloud Storage URI that your pipelines service account can access.
BASE_IMAGE='gcr.io/{}/bq_vertexai_container:latest'.format(PROJECT_ID)  #This is the image built from the Dockerfile in the same folder

env: PATH=/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin:/home/jupyter/.local/bin


## Set Queries

In [13]:
# numpy provides the int64 type used for trial_id in EVAL_OUTPUT_TUPLE
import numpy

MODEL_QUERY = f"""
CREATE or REPLACE MODEL {BQ_DATASET_NAME}.{MODEL_NAME}
OPTIONS
  (model_type='linear_reg',
    num_trials=10,
    max_parallel_trials=2,
    enable_global_explain = TRUE) AS
SELECT
  * 
FROM(
    SELECT
 * EXCEPT(tip_amount,airport_fee,pickup_datetime,dropoff_datetime),
 tip_amount AS label
FROM
 `bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2018`
WHERE
 tip_amount IS NOT NULL
LIMIT
 100000
)
"""

EVAL_QUERY= f"""
SELECT *
FROM
  ML.EVALUATE(MODEL {BQ_DATASET_NAME}.{MODEL_NAME})
ORDER BY  r2_score desc 
LIMIT 1"""

# Specify the NamedTuple output format of the evaluation component, based on the columns returned by EVAL_QUERY.
# The EVAL_QUERY above returns a table with a number of evaluation metrics:
# ['trial_id', 'mean_absolute_error'... 'r2_score']. The evaluation component should only output trial_id and r2_score,
# so the output tuple format is defined here.

EVAL_OUTPUT_TUPLE = [("trial_id", numpy.int64), ("r2_score", float)] 

EVAL_THRESHOLD = ("r2_score", 0.5) # specify the metric and the threshold value required to deploy the model. Ensure this metric is defined in EVAL_OUTPUT_TUPLE 
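
The comment on `EVAL_THRESHOLD` asks that the threshold metric also appears in `EVAL_OUTPUT_TUPLE`. A minimal sketch of how that could be checked before compiling the pipeline is shown below. The helper names `metric_names` and `check_threshold_metric` are hypothetical, and plain `int` stands in for `numpy.int64` so the snippet has no dependencies.

```python
# Plain-Python stand-ins for the notebook values (int replaces numpy.int64 here).
EVAL_OUTPUT_TUPLE = [("trial_id", int), ("r2_score", float)]
EVAL_THRESHOLD = ("r2_score", 0.5)


def metric_names(output_tuple):
    """Return the metric names as one comma-separated string."""
    return ",".join(name for name, _ in output_tuple)


def check_threshold_metric(output_tuple, threshold):
    """Raise early if the threshold metric is not one of the declared outputs."""
    metric, _ = threshold
    declared = [name for name, _ in output_tuple]
    if metric not in declared:
        raise ValueError(f"{metric!r} is not in EVAL_OUTPUT_TUPLE: {declared}")
    return metric


print(metric_names(EVAL_OUTPUT_TUPLE))
print(check_threshold_metric(EVAL_OUTPUT_TUPLE, EVAL_THRESHOLD))
```

The comma-separated string is the same form in which the metric names are later handed to the evaluation component.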

## Create Custom Component Functions

* **BigQuery function** - A generic function that runs a BigQuery query and returns the table or model it created. It is reused for every BigQuery step in the Kubeflow pipeline: later in the tutorial it is passed as a parameter (`ddlop`) to other functions that perform a specific BigQuery operation.
* **Training helper function** - Runs the model-creation query through the BigQuery function.
* **Evaluate function** - Outputs the result of the EVAL_QUERY in the format of the EVAL_OUTPUT_TUPLE.
* **Export Model function** - Exports the BQML model to a GCS bucket.
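
The output shaping done by the Evaluate function can be sketched in plain Python. This is an illustration only: the helper name `select_outputs` is hypothetical, a dictionary stands in for one row of the query result, and the row values are placeholders rather than real evaluation results.

```python
from collections import namedtuple


def select_outputs(row: dict, target_metrics: str):
    """Keep only the requested metrics from one result row, in the requested order."""
    names = target_metrics.split(",")
    Outputs = namedtuple("Outputs", names)
    return Outputs(*(row[name] for name in names))


# Placeholder row: the values are illustrative, not real evaluation results.
example_row = {"trial_id": 1, "mean_absolute_error": 0.0, "r2_score": 0.0}
outputs = select_outputs(example_row, "trial_id,r2_score")
print(outputs._fields)
```

Returning a named tuple lets later pipeline steps refer to each metric by name, for example `outputs.r2_score`.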

In [14]:
import kfp.dsl as dsl
from typing import NamedTuple
import json
import os

from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, ClassificationMetrics, Metrics, component)
from kfp.v2.google.client import AIPlatformClient
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip

def run_bigquery(project_id: str, query_string: str, location: str) -> NamedTuple(
    'DDLOutput', [('created_asset', str), ('query', str)]):
    """
    Runs BigQuery query and returns a table/model name
    """
    print(query_string)
        
    from google.cloud import bigquery
    from google.api_core.future import polling
    from google.cloud.bigquery import retry as bq_retry
    
    bqclient = bigquery.Client(project=project_id, location=location)
    job = bqclient.query(query_string, retry=bq_retry.DEFAULT_RETRY)
    job._retry = polling.DEFAULT_RETRY
    
    from time import sleep

    # Poll until the query job has finished
    while job.running():
        sleep(0.1)
        print('Running ...')
        
    tblname = job.ddl_target_table
    tblname = '{}.{}'.format(tblname.dataset_id, tblname.table_id)
    print('{} created in {}'.format(tblname, job.ended - job.started))
    
    from collections import namedtuple
    result_tuple = namedtuple('DDLOutput', ['created_asset', 'query'])
    return result_tuple(tblname, query_string)

def train_bq_model(ddlop, project_id, model_query):
    """Run the model-creation query through the BigQuery component (ddlop) in the US location."""
    print(model_query)
    return ddlop(project_id, model_query, 'US')

def evaluate(project_id: str, model_name: str, eval_query: str, target_metrics: str) -> NamedTuple("Outputs", EVAL_OUTPUT_TUPLE):
    query = eval_query
    print(query)
    from google.cloud import bigquery
    bqclient = bigquery.Client(project=project_id, location='US')
    results = bqclient.query(query).result().to_dataframe()
    
    target_metrics= list(target_metrics.split(",")) # converts target metrics back to list
    keyresults= results[target_metrics]
    return list(keyresults.itertuples(name='Outputs', index=False))[0]

def export_bqml_model(project_id:str, model:str, bucket_name:str) -> NamedTuple('ModelExport', [('model_name', str),('destination', str)]):
    import subprocess
    import shutil
    #bq extract -m {PROJECT_ID}:{DATASET_NAME}.{MODEL_NAME} gs://{BUCKET_NAME}/{MODEL_NAME}
    model_name = '{}:{}'.format(project_id, model)
    destination = 'gs://{}/{}'.format(bucket_name, model)
    print (model_name)

    subprocess.run(
        (
            shutil.which("bq"),
            "extract",
            "--project_id=" + project_id,
            "-m",
            model_name,
            destination
        ),
        stderr=subprocess.PIPE,
        check=True)
    return (model, destination)
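
As an alternative to the polling loop in `run_bigquery`, the job can be awaited with `QueryJob.result()`, which blocks until the job finishes and raises if it failed. The sketch below is a variation on the notebook code, not a replacement for it. The helper name `created_asset_name` is hypothetical, and a stand-in object replaces the real query job so that the snippet runs without BigQuery access.

```python
from types import SimpleNamespace


def created_asset_name(job) -> str:
    """Wait for the query job, then return 'dataset.asset' for its DDL target."""
    job.result()  # blocks until the job is done and raises if the job failed
    target = job.ddl_target_table
    return f"{target.dataset_id}.{target.table_id}"


# Stand-in job object (hypothetical) so the sketch runs without BigQuery access.
fake_job = SimpleNamespace(
    result=lambda: None,
    ddl_target_table=SimpleNamespace(dataset_id="taxi",
                                     table_id="hp_xai_taxi_tip_model"),
)
print(created_asset_name(fake_job))
```

With a real job, a failed query surfaces as an exception from `result()` instead of a missing `ddl_target_table` later on.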

## Create Pipeline

In [18]:
import kfp.v2.dsl as dsl
import kfp.components as comp
import time

@dsl.pipeline(
    name="bq-vizier-xai-pipeline",
    description='training pipeline for BQML model using Vizier HyperParameter tuning for optimal model selection')

def training_pipeline(
    project_id: str = PROJECT_ID,
    model_query: str= MODEL_QUERY,
    eval_query: str= EVAL_QUERY,
    bucket_name: str= BUCKET_NAME,
    model_dispay_name: str = f'{MODEL_NAME}_vai',
    eval_threshold_metric: str = EVAL_THRESHOLD[0],
    eval_threshold_val: float = EVAL_THRESHOLD[1]):
    
    ddlop = comp.func_to_container_op(run_bigquery, packages_to_install=['google-cloud-bigquery'])
        
    #Create Model
    bq_model_output = train_bq_model(ddlop, project_id, model_query).set_display_name('create BQ model')
    bq_model_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'
    bq_model_dest = bq_model_output.outputs['created_asset']
    #bq_model_dest = "taxi.hp_xai_taxi_tip_model"
    
    #Evaluate Model
    evalop = comp.func_to_container_op(evaluate, packages_to_install=['google-cloud-bigquery', 'pandas','pyarrow','db-dtypes'])
    target_metrics=[i[0] for i in EVAL_OUTPUT_TUPLE] # extracts key metrics as list
    target_metrics = ','.join(map(str, target_metrics)) # converts key metrics to string for kfp component to ingest
    error = evalop(project_id, bq_model_dest, eval_query, target_metrics)
    
    with dsl.Condition(error.outputs["r2_score"] > eval_threshold_val, name="deploy_decision"):
        #Export Model
        export_bqml_model_op = comp.func_to_container_op(export_bqml_model, base_image=BASE_IMAGE, output_component_file='export_bqml.yaml')   
        export_destination_output = export_bqml_model_op(project_id, bq_model_dest, bucket_name).set_display_name('export BQ model')
        export_destination_output.execution_options.caching_strategy.max_cache_staleness = 'P0D' 
        export_destination = export_destination_output.outputs['destination']

        #Upload Model 
        model_upload_op = gcc_aip.ModelUploadOp(
            project=project_id,
            display_name=model_dispay_name,
            artifact_uri=export_destination,
            serving_container_image_uri='us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-3:latest',
        )
        #Create End Point
        endpoint_create_op = gcc_aip.EndpointCreateOp(
            project=project_id,
            display_name="pipelines-created-endpoint-lr",
        )        
        #Deploy Model
        model_deploy_op = gcc_aip.ModelDeployOp(
            endpoint=endpoint_create_op.outputs["endpoint"],            
            model=model_upload_op.outputs["model"],
            deployed_model_display_name=model_dispay_name,
            dedicated_resources_machine_type="n1-standard-16",
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1)
            

In [19]:
import kfp.v2 as kfp
from kfp.v2 import compiler

pipeline_func = training_pipeline
compiler.Compiler().compile(pipeline_func=pipeline_func, 
                            package_path='bq_pipeline_job_lr.json')

{{pipelineparam:op=;name=model_query}}


## Run Pipeline
This launches the pipeline in Vertex AI Pipelines. Training takes about 30 minutes to complete. You can follow the training of the BQML model in the BigQuery console: the training view of the model shows each iteration and its metrics.

In [20]:
from kfp.v2.google.client import AIPlatformClient

api_client = AIPlatformClient(project_id=PROJECT_ID, region=REGION)

response = api_client.create_run_from_job_spec(
    job_spec_path='bq_pipeline_job_lr.json', 
    enable_caching=False,
    pipeline_root=PIPELINE_ROOT 
)
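
The same compiled job spec can also be submitted with the Vertex AI SDK (`google-cloud-aiplatform`), which this notebook already imports. The following is a sketch under that assumption. The helper name `submit_pipeline` and its default arguments are illustrative, and the SDK is imported lazily so that defining the helper does not require it.

```python
def submit_pipeline(project_id: str, region: str, pipeline_root: str,
                    template_path: str = "bq_pipeline_job_lr.json",
                    display_name: str = "bq-vizier-xai-pipeline"):
    """Submit the compiled pipeline spec with the Vertex AI SDK and return the job."""
    # Imported lazily so that defining this helper does not require the SDK.
    from google.cloud import aiplatform

    job = aiplatform.PipelineJob(
        display_name=display_name,
        template_path=template_path,
        pipeline_root=pipeline_root,
        enable_caching=False,
        project=project_id,
        location=region,
    )
    job.submit()  # returns without waiting; job.run() would block until completion
    return job
```

In this notebook the call would be `submit_pipeline(PROJECT_ID, REGION, PIPELINE_ROOT)`.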

In [None]:
# BQ console link to model
f'https://console.cloud.google.com/bigquery?p={PROJECT_ID}&d={BQ_DATASET_NAME}&page=model&m={MODEL_NAME}'

## Explainability AI 
Because the model creation statement included the option `enable_global_explain`, we can analyze the global explainability of the model with a simple query. We use `ML.GLOBAL_EXPLAIN` with the linear_reg model. See the [documentation](https://cloud.google.com/bigquery-ml/docs/reference/standard-sql/bigqueryml-syntax-xai-overview#explainable_ai_offerings_in_bigquery_ml) for the syntax for other model types.

In [None]:
from google.cloud import bigquery

EXPLAIN_QUERY=f'SELECT * FROM ML.GLOBAL_EXPLAIN(MODEL {BQ_DATASET_NAME}.{MODEL_NAME})'

client = bigquery.Client()
query_job = client.query(EXPLAIN_QUERY)
print(query_job.result().to_dataframe())

## Predict at EndPoint

In [None]:
%%writefile taxi-pred.json
{"instances":[{
    "vendor_id": "2",
    "pickup_datetime": "2018-05-15T11:15:12",
    "dropoff_datetime": "2018-05-15T12:13:40",
    "passenger_count": 1,
    "trip_distance": 11.1,
    "rate_code": "1",
    "store_and_fwd_flag": "N",
    "payment_type": "1",
    "fare_amount": 44,
    "extra": 0,
    "mta_tax": 0.5,
    "tolls_amount": 0,
    "imp_surcharge": 0.3,
    "total_amount": 53.76,
    "pickup_location_id": "138",
    "dropoff_location_id": "68"}]}

In [None]:
ENDPOINT_ID="3222589416174256128" # CHANGE to the ID of the endpoint created by the pipeline
INPUT_DATA_FILE="taxi-pred.json"

In [None]:
!curl \
-X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://us-central1-aiplatform.googleapis.com/v1/projects/$PROJECT_ID/locations/us-central1/endpoints/$ENDPOINT_ID:predict \
-d "@taxi-pred.json"
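
The same request can be sent from Python with the Vertex AI SDK instead of `curl`. This is a minimal sketch: the helper names `load_instances` and `predict_at_endpoint` are hypothetical, and the SDK is imported lazily so that the file-loading part works without it.

```python
import json


def load_instances(path: str) -> list:
    """Read the 'instances' list from a prediction request file."""
    with open(path) as request_file:
        return json.load(request_file)["instances"]


def predict_at_endpoint(project_id: str, region: str, endpoint_id: str,
                        instances: list):
    """Send instances to a deployed Vertex AI endpoint and return the predictions."""
    # Lazy import: the SDK is only needed when the endpoint is actually called.
    from google.cloud import aiplatform

    aiplatform.init(project=project_id, location=region)
    endpoint = aiplatform.Endpoint(endpoint_name=endpoint_id)
    return endpoint.predict(instances=instances).predictions
```

With the variables defined above, the call would be `predict_at_endpoint(PROJECT_ID, REGION, ENDPOINT_ID, load_instances(INPUT_DATA_FILE))`.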

### Predict at BQ for Explainability

In [None]:
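from google.cloud import bigquery

# Assumption: the original query was left empty. This sketch explains predictions for a
# few rows of the training source table; the row limit and top_k_features are illustrative.
PREDICT_EXPLAIN_QUERY = f"""
SELECT *
FROM ML.EXPLAIN_PREDICT(
  MODEL {BQ_DATASET_NAME}.{MODEL_NAME},
  (SELECT * EXCEPT(tip_amount, airport_fee, pickup_datetime, dropoff_datetime)
   FROM `bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2018`
   LIMIT 10),
  STRUCT(3 AS top_k_features))
"""

client = bigquery.Client()
query_job = client.query(PREDICT_EXPLAIN_QUERY)
print(query_job.result().to_dataframe())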