# 03c - BQML + Vertex AI > Pipelines - automated pipelines for updating models

As time goes on, change occurs:
- inputs to our models may shift in distribution compared to when the model was trained - called training-serving skew
- inputs to our models may shift over time - called prediction drift
- new inputs/features may become available
- a better model may be created
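
To make skew and drift concrete, here is a minimal sketch that scores how far a serving-time feature distribution has moved from the training distribution. It uses the Jensen-Shannon divergence over histograms, which is one common choice and not necessarily the statistic a monitoring service computes. The bin edges and the sample `Amount` values are made up for illustration.

```python
import math

def histogram(values, edges):
    """Share of values in each bin [edges[i], edges[i+1]); values outside the edges are ignored."""
    counts = [0] * (len(edges) - 1)
    for v in values:
        for i in range(len(counts)):
            if edges[i] <= v < edges[i + 1]:
                counts[i] += 1
                break
    total = sum(counts)
    return [c / total for c in counts]

def js_divergence(p, q):
    """Jensen-Shannon divergence in bits: 0 for identical distributions, 1 for disjoint ones."""
    def kl(a, b):
        return sum(x * math.log2(x / y) for x, y in zip(a, b) if x > 0)
    m = [(x + y) / 2 for x, y in zip(p, q)]
    return 0.5 * kl(p, m) + 0.5 * kl(q, m)

# Illustrative values only (not taken from the fraud dataset)
edges = [0, 50, 100, 150, 200]
train_amounts = [10, 20, 60, 70, 120, 130, 30, 80]
serve_amounts = [110, 120, 160, 170, 180, 190, 140, 155]

train_hist = histogram(train_amounts, edges)
serve_hist = histogram(serve_amounts, edges)
drift = js_divergence(train_hist, serve_hist)
print(f"JS divergence between training and serving: {drift:.3f}")
```

Comparing serving data against the training data measures skew; comparing two serving windows against each other measures drift.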

In the `03b` notebook we deployed the model built with BQML in the `03a` notebook to a Vertex AI Endpoint for online prediction.  In this notebook we will build a challenger model with the same training data, also using BQML but with a different model type: a deep neural network similar to what we build in the `05` series of notebooks.  We will construct a Vertex AI Pipeline to orchestrate the process of building the new model, comparing it to the deployed model, and conditionally replacing the deployed model with the new one.  

This process could be triggered based on time elapsed, amount of new data, detected training-serving skew or even prediction drift by using Vertex AI Monitoring.  
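
As a rough sketch of how those triggers could be combined, the function below collects the reasons for starting a pipeline run. The `RetrainSignals` class, the function name, and both thresholds are hypothetical; in practice the skew and drift flags would come from a monitoring job.

```python
from dataclasses import dataclass

@dataclass
class RetrainSignals:
    """Hypothetical container for the trigger inputs described above."""
    days_since_training: int
    new_rows: int
    skew_detected: bool = False
    drift_detected: bool = False

def reasons_to_run(signals, max_days=30, min_new_rows=100_000):
    """Return the list of reasons to start the pipeline; an empty list means do nothing."""
    reasons = []
    if signals.days_since_training >= max_days:   # assumed threshold
        reasons.append("time elapsed")
    if signals.new_rows >= min_new_rows:          # assumed threshold
        reasons.append("new data")
    if signals.skew_detected:
        reasons.append("training-serving skew")
    if signals.drift_detected:
        reasons.append("prediction drift")
    return reasons

print(reasons_to_run(RetrainSignals(days_since_training=45, new_rows=2_000)))
print(reasons_to_run(RetrainSignals(days_since_training=3, new_rows=2_000)))
```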

### Video Walkthrough of this notebook:
Includes a conversational walkthrough and more explanation than the notebook:

<p><center><a href="https://youtu.be/kzDd94KucBQ" target="_blank" rel="noopener noreferrer"><img src="architectures/thumbnails/playbutton/03c.png" width="50%"></a></center></p>

### Prerequisites:
-  03a - BigQuery Machine Learning (BQML) - Machine Learning with SQL
-  03b - Vertex AI + BQML - Online Predictions with BQML Models

### Overview:
- Build Custom Pipeline Components
    - Use BigQuery ML to Get Predictions and Scikit-Learn to calculate model metrics
    - Use BigQuery ML to train a new model - A Deep Neural Network
    - Compare model metrics for baseline and challenger model
    - Export BigQuery ML model to Google Cloud Storage
    - Replace a model deployed to an endpoint (`03b`) with the challenger model, undeploy previous model
- Define the Pipeline Flow
- Compile the Pipeline
- Run the Pipeline in Vertex AI
- Get Predictions from the updated Endpoint

### Resources:
-  [Export formats for BigQuery ML models](https://cloud.google.com/bigquery-ml/docs/exporting-models)
-  [Python Client for Vertex AI](https://googleapis.dev/python/aiplatform/latest/aiplatform.html)
-  Codelab: [Vertex AI Pipelines Introduction](https://codelabs.developers.google.com/vertex-mlmd-pipelines#0)
-  [Vertex AI Prebuilt KFP Components](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-0.2.0/)


---
## Vertex AI - Conceptual Flow

<img src="architectures/slides/03c_arch.png">

---
## Vertex AI - Workflow

<img src="architectures/slides/03c_console.png">

---
## Setup

inputs:

In [1]:
REGION = 'us-central1'
PROJECT_ID = 'ma-mx-presales-lab'
DATANAME = 'fraud'
NOTEBOOK = '03c'

# Resources
DEPLOY_IMAGE='us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-3:latest'
DEPLOY_COMPUTE = 'n1-standard-4'

# Model Training
VAR_TARGET = 'Class'
VAR_OMIT = 'transaction_id' # add more variables to the string with space delimiters

packages:

In [2]:
from google.cloud import aiplatform
from datetime import datetime
from typing import NamedTuple
import kfp # used for dsl.pipeline
import kfp.v2.dsl as dsl # used for dsl.component, dsl.Output, dsl.Input, dsl.Artifact, dsl.Model, ...
from google_cloud_pipeline_components import aiplatform as gcc_aip

from google.cloud import bigquery
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value
import json
import numpy as np

clients:

In [3]:
aiplatform.init(project=PROJECT_ID, location=REGION)
bigquery = bigquery.Client()

parameters:

In [4]:
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
BUCKET = "vertex-ai-mlops-bucket"
URI = f"gs://{BUCKET}/{DATANAME}/models/{NOTEBOOK}"
DIR = f"temp/{NOTEBOOK}"
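
For reference, a small sketch of how these parameters expand into the paths used later for the pipeline root. The fixed date is an assumption for illustration (the notebook itself calls `datetime.now()`); it is chosen to match the timestamp visible in the pipeline run names further down.

```python
from datetime import datetime

BUCKET = "vertex-ai-mlops-bucket"
DATANAME = "fraud"
NOTEBOOK = "03c"

# Fixed moment for illustration; the notebook uses datetime.now()
TIMESTAMP = datetime(2022, 3, 30, 18, 23, 42).strftime("%Y%m%d%H%M%S")
URI = f"gs://{BUCKET}/{DATANAME}/models/{NOTEBOOK}"
pipeline_root = f"{URI}/{TIMESTAMP}/kfp/"

print(TIMESTAMP)
print(pipeline_root)
```

Because `TIMESTAMP` is part of the path, each notebook session writes its compiled pipeline and artifacts under a separate folder.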

In [5]:
# Give the service account the roles/storage.objectAdmin role
# Console > IAM > Select Account <projectnumber>-compute@developer.gserviceaccount.com > edit - give role
SERVICE_ACCOUNT = !gcloud config list --format='value(core.account)' 
SERVICE_ACCOUNT = SERVICE_ACCOUNT[0]
SERVICE_ACCOUNT

'825075454589-compute@developer.gserviceaccount.com'

environment:

In [6]:
!rm -rf {DIR}
!mkdir -p {DIR}

---
## Custom Components (KFP)

Vertex AI Pipelines are made up of components that run independently, with inputs and outputs that connect to form a graph: the pipeline.  In this notebook the following custom components orchestrate training a challenger model, evaluating the challenger and the existing model, and comparing them on a model metric.  If the challenger is better, it replaces the model already deployed on an existing endpoint.  These custom components are constructed as Python functions!
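
Before looking at the real components, the control flow can be sketched in plain Python with stub steps. This is only a dry run of the logic, not KFP code: `run_flow` and the stub lambdas are hypothetical, the metric values are invented, and the `example-bucket` path is a placeholder.

```python
def run_flow(evaluate, train_challenger, export_model, update_endpoint,
             base_model="fraud_lr", challenger_model="fraud_dnn"):
    """Mirror the pipeline graph: evaluate, train, evaluate, compare, conditionally deploy."""
    base_metric = evaluate(base_model)
    train_challenger(challenger_model)
    challenger_metric = evaluate(challenger_model)
    replace = base_metric < challenger_metric
    if replace:
        artifact_uri = export_model(challenger_model)
        update_endpoint(artifact_uri)
    return replace

# Stub steps with invented metric values
metrics = {"fraud_lr": 0.70, "fraud_dnn": 0.75}
calls = []
replaced = run_flow(
    evaluate=lambda model: metrics[model],
    train_challenger=lambda model: calls.append(("train", model)),
    export_model=lambda model: f"gs://example-bucket/{model}",
    update_endpoint=lambda uri: calls.append(("deploy", uri)),
)
print(replaced, calls)
```

In the pipeline each of these steps runs in its own container, so values pass between them as component outputs rather than as local variables.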

### Model Metrics
- Get Predictions for Test data from BigQuery Model
- Calculate [average precision for the precision-recall curve](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.average_precision_score.html#sklearn.metrics.average_precision_score)
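
As a sketch of what the average precision metric measures, the pure-Python function below sums precision at each score threshold, weighted by the increase in recall. It is meant to illustrate the definition, not to replace `average_precision_score`; the four-row example is invented.

```python
def average_precision(y_true, scores):
    """AP = sum over thresholds of (recall_n - recall_{n-1}) * precision_n."""
    total_pos = sum(y_true)
    ranked = sorted(zip(scores, y_true), key=lambda pair: -pair[0])
    ap, tp, fp, prev_recall = 0.0, 0, 0, 0.0
    i = 0
    while i < len(ranked):
        threshold = ranked[i][0]
        # Consume every example tied at this score before measuring
        while i < len(ranked) and ranked[i][0] == threshold:
            if ranked[i][1] == 1:
                tp += 1
            else:
                fp += 1
            i += 1
        precision = tp / (tp + fp)
        recall = tp / total_pos
        ap += (recall - prev_recall) * precision
        prev_recall = recall
    return ap

# Two positives ranked 1st and 3rd: 0.5 * 1.0 + 0.5 * (2/3)
print(average_precision([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))
```

Average precision suits this dataset because fraud is rare: it focuses on how well the positive class is ranked rather than on overall accuracy.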

In [7]:
@dsl.component(
    base_image = 'python:3.9',
    packages_to_install = ['pandas','pyarrow','scikit-learn','google-cloud-bigquery']
)
def bqml_eval(
    project: str,
    var_target: str,
    model: str,
    dataname: str,
    metrics: dsl.Output[dsl.Metrics],
    metricsc: dsl.Output[dsl.ClassificationMetrics]
) -> NamedTuple("model_eval", [("metric", float)]):

    from collections import namedtuple
    from sklearn.metrics import average_precision_score, confusion_matrix
    from google.cloud import bigquery
    bigquery = bigquery.Client(project = project)

    query = f"""
    SELECT {var_target}, predicted_{var_target}, prob, splits 
    FROM ML.PREDICT (MODEL `{project}.{dataname}.{model}`,(
        SELECT *
        FROM `{project}.{dataname}.{dataname}_prepped`
        WHERE splits = 'TEST')
      ), UNNEST(predicted_{var_target}_probs)
    WHERE label=1
    """
    pred = bigquery.query(query = query).to_dataframe()

    auPRC = average_precision_score(pred[var_target], pred['prob'], average='micro')    
    metrics.log_metric('auPRC', auPRC)
    metricsc.log_confusion_matrix(['Not Fraud', 'Fraud'], confusion_matrix(pred[var_target], pred[f'predicted_{var_target}']).tolist())
    
    model_eval = namedtuple("model_eval", ["metric"])
    return model_eval(metric = float(auPRC))

### BigQuery - Train DNN

In [8]:
@dsl.component(
    base_image = 'python:3.9',
    packages_to_install = ['google-cloud-bigquery']
)
def bqml_dnn(
    project: str,
    var_target: str,
    var_omit: str,
    model: str,
    dataname: str,
    bqml_model: dsl.Output[dsl.Artifact]
) -> NamedTuple("bqml_training", [("query", str)]):
    
    from collections import namedtuple
    from google.cloud import bigquery
    bigquery = bigquery.Client(project = project)
    
    query = f"""
    CREATE OR REPLACE MODEL `{project}.{dataname}.{model}`
    OPTIONS
        (model_type = 'DNN_CLASSIFIER',
            auto_class_weights = FALSE,
            input_label_cols = ['{var_target}'],
            data_split_col = 'custom_splits',
            data_split_method = 'CUSTOM',
            EARLY_STOP = FALSE,
            OPTIMIZER = 'SGD',
            HIDDEN_UNITS = [2],
            LEARN_RATE = 0.001,
            BATCH_SIZE = 100,
            DROPOUT = 0.25,
            ACTIVATION_FN = 'SIGMOID',
            MAX_ITERATIONS = 10
        ) AS
    SELECT * EXCEPT({','.join(var_omit.split())}, splits),
        CASE
            WHEN splits = 'TRAIN' THEN FALSE
            ELSE TRUE
        END AS custom_splits
    FROM `{project}.{dataname}.{dataname}_prepped`
    WHERE splits != 'TEST'
    """
    job = bigquery.query(query = query)
    job.result()
    bqml_model.uri = f"bq://{project}.{dataname}.{model}"
    
    result = namedtuple("bqml_training", ["query"])
                
    return result(query = str(query))
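
The `EXCEPT` list in the query above is built from the space-delimited `var_omit` string. A small sketch of that expansion follows; the helper name `except_clause` and the second column name are hypothetical. Unlike the inline expression in the component, this variant also yields valid SQL when `var_omit` is empty.

```python
def except_clause(var_omit: str) -> str:
    """Build the SELECT * EXCEPT(...) prefix from a space-delimited column string."""
    columns = var_omit.split() + ["splits"]
    return f"SELECT * EXCEPT({', '.join(columns)})"

print(except_clause("transaction_id"))
print(except_clause("transaction_id another_column"))
print(except_clause(""))
```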

### Compare Models

In [9]:
@dsl.component
def model_compare(
    base_metric: float,
    challenger_metric: float,
) -> bool: 
    
    if base_metric < challenger_metric:
        replace = True
    else:
        replace = False
    
    return replace

### Export BQML Model

In [10]:
@dsl.component(
    base_image = 'python:3.9',
    packages_to_install = ['google-cloud-bigquery']
)
def bqml_export(
    project: str,
    export_location: str,
    bqml_model: dsl.Input[dsl.Model],
    tf_model: dsl.Output[dsl.Artifact],  
):
    
    from google.cloud import bigquery
    bigquery = bigquery.Client(project = project)
    
    bqml_model_name = bqml_model.uri.split("/")[-1]
    export = bigquery.query(query = f"EXPORT MODEL `{bqml_model_name}` OPTIONS(URI = '{export_location}')")
    export.result()
    
    tf_model.uri = export_location
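
The export component recovers the model reference from the `bq://` URI that the training component stored on its output artifact. A minimal sketch of that string handling, using a made-up project name and export path:

```python
# Hypothetical artifact URI in the form written by the training component
artifact_uri = "bq://my-project.fraud.fraud_dnn"

# Splitting on "/" leaves ['bq:', '', 'my-project.fraud.fraud_dnn']
model_ref = artifact_uri.split("/")[-1]
project, dataset, model = model_ref.split(".")

# Placeholder export location
statement = f"EXPORT MODEL `{model_ref}` OPTIONS(URI = 'gs://example-bucket/fraud/models/03c')"
print(model_ref)
print(statement)
```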

### Replace Model On Endpoint

In [11]:
@dsl.component(
    packages_to_install = ['google-cloud-aiplatform'],
    base_image = 'python:3.9'
)
def endpoint_update(
    project: str,
    region: str,
    endpoint_prefix: str,
    newmodel: dsl.Input[dsl.Model],
    display_name: str,
    deploy_machine: str,
    deploy_container: str,
    label: str
):
    
    from google.cloud import aiplatform
    aiplatform.init(project = project, location = region)

    # upload new model (03c)
    model = aiplatform.Model.upload(
        display_name = display_name,
        serving_container_image_uri = deploy_container,
        artifact_uri = newmodel.uri,
        labels = {'notebook':f'{label}'}
    )
    
    # find endpoint from notebook 03b
    for e in aiplatform.Endpoint.list():
        if e.display_name.startswith(endpoint_prefix): endpoint = e
    print(endpoint.display_name)

    # list model(s) on 03b endpoint
    models = endpoint.list_models()
    if len(models) == 1:
        oldmodel = models[0]
    print(oldmodel)
    
    # deploy 03c model to endpoint with traffic_split = 100
    endpoint.deploy(
        model = model,
        deployed_model_display_name = display_name,
        traffic_percentage = 100,
        machine_type = deploy_machine,
        min_replica_count = 1,
        max_replica_count = 1
    )
    
    # undeploy 03b model
    endpoint.undeploy(
        deployed_model_id = oldmodel.id
    )
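
The endpoint lookup in the component above leaves `endpoint` undefined when no display name matches, and `oldmodel` undefined unless exactly one model is deployed. One possible guard, sketched with a hypothetical helper and stand-in objects instead of real `aiplatform.Endpoint` instances:

```python
from types import SimpleNamespace

def find_one_by_prefix(resources, prefix):
    """Return the single resource whose display_name starts with prefix, else raise."""
    matches = [r for r in resources if r.display_name.startswith(prefix)]
    if len(matches) != 1:
        raise LookupError(
            f"expected 1 resource starting with {prefix!r}, found {len(matches)}"
        )
    return matches[0]

# Stand-ins for Endpoint objects; the second name is invented
endpoints = [
    SimpleNamespace(display_name="03b_fraud_20220330162620"),
    SimpleNamespace(display_name="05_fraud_example"),
]
print(find_one_by_prefix(endpoints, "03b").display_name)
```

Failing early with a clear message keeps the component from deploying a new model and then crashing before the old one is undeployed.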

---
## Pipeline (KFP) Definition

In [25]:
@dsl.pipeline(
    name = f'kfp-{NOTEBOOK}-{DATANAME}-{TIMESTAMP}', 
    pipeline_root = URI+'/'+str(TIMESTAMP)+'/kfp/'
)
def pipeline(
    endpoint_prefix: str,
    project: str = PROJECT_ID,
    region: str = REGION,
    dataname: str = DATANAME,
    display_name: str = f'{NOTEBOOK}_{DATANAME}_{TIMESTAMP}',
    deploy_machine: str = DEPLOY_COMPUTE,
    deploy_container: str = DEPLOY_IMAGE,
    bq_source: str = f'bq://{PROJECT_ID}.{DATANAME}.{DATANAME}_prepped',
    var_target: str = VAR_TARGET,
    var_omit: str = VAR_OMIT,
    uri: str = URI,
    label: str = NOTEBOOK 
):
        
    # get auPRC (average precision) for current model
    base_model_eval = bqml_eval(
        project = project,
        var_target = var_target,
        model = f'{dataname}_lr',
        dataname = dataname
    )
    
    # train challenger model with BQML
    challenger_model = bqml_dnn(
        project = project,
        var_target = var_target,
        var_omit = var_omit,
        model = f'{dataname}_dnn',
        dataname = dataname,
    )
    
    # get auPRC (average precision) for challenger model
    challenger_model_eval = bqml_eval(
        project = project,
        var_target = var_target,
        model = f'{dataname}_dnn',
        dataname = dataname
    )
    challenger_model_eval.after(challenger_model).execution_options.caching_strategy.max_cache_staleness = "P0D"
    
    # compare models
    compare = model_compare(
        base_metric = base_model_eval.outputs["metric"],
        challenger_metric = challenger_model_eval.outputs["metric"]
    )
    
    # conditional deployment
    with dsl.Condition(
        compare.output == 'false',  # was 'true'; modified so that it works...
        name = "replace_model"
    ):
        # export BQML model
        export = bqml_export(
            project = project,
            export_location = uri,
            bqml_model = challenger_model.outputs["bqml_model"]
        )
        
        # replace model on endpoint (03b)
        replace = endpoint_update(
            project = project,
            region = region,
            endpoint_prefix = endpoint_prefix,
            newmodel = export.outputs["tf_model"],
            display_name = display_name,
            deploy_machine = deploy_machine,
            deploy_container = deploy_container,
            label = label
        )
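
The condition in the pipeline compares the boolean output of `model_compare` with a lowercase string. A minimal illustration of why the literal is lowercase, assuming the boolean reaches the condition as its JSON text rather than as Python's `str()` form (the helper name is hypothetical):

```python
import json

def condition_literal(value: bool) -> str:
    """JSON text of a boolean, which differs from Python's str() form."""
    return json.dumps(value)

print(condition_literal(True), str(True))
print(condition_literal(False), str(False))
```

Under that assumption, comparing against `'true'` selects the branch where the challenger is better, and `'false'` selects the opposite case.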

---
## Compile Pipeline

In [26]:
kfp.v2.compiler.Compiler().compile(
    pipeline_func = pipeline,
    package_path = f"{DIR}/{NOTEBOOK}.json"
)



Copy the compiled pipeline file to the GCS bucket

In [27]:
!gsutil cp {DIR}/{NOTEBOOK}.json {URI}/{TIMESTAMP}/kfp/

Copying file://temp/03c/03c.json [Content-Type=application/json]...
/ [1 files][ 31.0 KiB/ 31.0 KiB]                                                
Operation completed over 1 objects/31.0 KiB.                                     


---
## Create Vertex AI Pipeline Job

In [28]:
pipeline = aiplatform.PipelineJob(
    display_name = f'{NOTEBOOK}_{DATANAME}_{TIMESTAMP}',
    template_path = f"{URI}/{TIMESTAMP}/kfp/{NOTEBOOK}.json",
    pipeline_root = f"{URI}/{TIMESTAMP}/kfp/",
    parameter_values = {"endpoint_prefix": "03b"},
    enable_caching = False,
    labels = {'notebook':f'{NOTEBOOK}'}
)

In [29]:
response = pipeline.run(service_account = SERVICE_ACCOUNT)

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/825075454589/locations/us-central1/pipelineJobs/kfp-03c-fraud-20220330182342-20220330185900
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/825075454589/locations/us-central1/pipelineJobs/kfp-03c-fraud-20220330182342-20220330185900')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/kfp-03c-fraud-20220330182342-20220330185900?project=825075454589
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/825075454589/locations/us-central1/pipelineJobs/kfp-03c-fraud-20220330182342-20220330185900 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/8

In [30]:
aiplatform.get_pipeline_df(pipeline = f'kfp-{NOTEBOOK}-{DATANAME}-{TIMESTAMP}')

| | pipeline_name | run_name | param.input:var_omit | param.input:bq_source | param.input:deploy_container | param.input:var_target | param.input:project | param.input:endpoint_prefix | param.input:display_name | param.input:region | param.input:uri | param.input:deploy_machine | param.input:dataname | param.input:label | metric.auPRC | metric.confusionMatrix |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | kfp-03c-fraud-20220330182342 | kfp-03c-fraud-20220330182342-20220330185900 | transaction_id | bq://ma-mx-presales-lab.fraud.fraud_prepped | us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu... | Class | ma-mx-presales-lab | 03b | 03c_fraud_20220330182342 | us-central1 | gs://vertex-ai-mlops-bucket/fraud/models/03c | n1-standard-4 | fraud | 03c | 0.747496 | {'annotationSpecs': [{'displayName': 'Not Frau... |
| 1 | kfp-03c-fraud-20220330182342 | kfp-03c-fraud-20220330182342-20220330182348 | transaction_id | bq://ma-mx-presales-lab.fraud.fraud_prepped | us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu... | Class | ma-mx-presales-lab | 03b | 03c_fraud_20220330182342 | us-central1 | gs://vertex-ai-mlops-bucket/fraud/models/03c | n1-standard-4 | fraud | 03c | 0.747496 | {'annotationSpecs': [{'displayName': 'Not Frau... |


## Review Pipeline Run

<img src="architectures/notebooks/03c_screenshots/kfp_pipeline.png">

---
## Prediction

### Prepare a record for prediction: instance and parameters lists

In [31]:
pred = bigquery.query(query = f"SELECT * FROM {DATANAME}.{DATANAME}_prepped WHERE splits='TEST' LIMIT 10").to_dataframe()

In [32]:
pred.head(4)

| | Time | V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | ... | V23 | V24 | V25 | V26 | V27 | V28 | Amount | Class | transaction_id | splits |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6126 | -0.130702 | 1.026568 | 2.692793 | 2.659141 | 0.764928 | 1.80033 | 0.317986 | -0.392976 | 0.85146 | ... | -0.370958 | -1.058635 | -0.498915 | 0.054906 | -0.590204 | -0.537678 | 0.0 | 0 | 70526fff-bdf3-459f-bbc6-a18e8133bb35 | TEST |
| 1 | 32799 | 1.153477 | -0.047859 | 1.358363 | 1.48062 | -1.222598 | -0.48169 | -0.654461 | 0.128115 | 0.907095 | ... | -0.025964 | 0.701843 | 0.417245 | -0.257691 | 0.060115 | 0.035332 | 0.0 | 0 | e8b3c609-62a9-4ec3-9ff8-fd0974b07602 | TEST |
| 2 | 35599 | 1.168909 | -0.139981 | -0.095518 | 1.735426 | 1.699903 | 4.646212 | -1.191502 | 1.2057 | 0.333882 | ... | -0.163863 | 1.045593 | 0.775343 | 0.228745 | 0.050846 | 0.020019 | 0.0 | 0 | c5ac2c63-c26b-492e-bc8e-5f98e92ec866 | TEST |
| 3 | 49561 | 1.333331 | -0.845997 | 1.161578 | -0.610965 | -1.635783 | -0.198304 | -1.331531 | 0.212857 | -0.208834 | ... | -0.095332 | 0.367347 | 0.398274 | -0.05981 | 0.041674 | 0.011653 | 0.0 | 0 | aca44b9b-e816-4767-b636-0356637c286b | TEST |


In [33]:
newob = pred[pred.columns[~pred.columns.isin(VAR_OMIT.split()+[VAR_TARGET, 'splits'])]].to_dict(orient='records')[0]
newob

{'Time': 6126,
 'V1': -0.130701942091627,
 'V2': 1.02656791747592,
 'V3': 2.6927927876854105,
 'V4': 2.6591407914726,
 'V5': 0.764928419239713,
 'V6': 1.80033037792473,
 'V7': 0.317985938878672,
 'V8': -0.39297551350890597,
 'V9': 0.8514596679861879,
 'V10': 1.1235743576742099,
 'V11': 0.912208762333391,
 'V12': -2.2333240196845696,
 'V13': 2.83347629334728,
 'V14': 0.260704557164804,
 'V15': -0.9541492143642322,
 'V16': 0.113724257119746,
 'V17': -0.187482921734084,
 'V18': 1.16909174898634,
 'V19': 1.5565622630036002,
 'V20': 0.37593560285606703,
 'V21': -0.26452459186183397,
 'V22': 0.116818154455445,
 'V23': -0.370958347793089,
 'V24': -1.05863543925301,
 'V25': -0.49891473125213603,
 'V26': 0.0549064105967339,
 'V27': -0.5902038164690239,
 'V28': -0.537678424176011,
 'Amount': 0.0}
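
The column filter above keeps only the model's input features. The same selection can be sketched on a plain dictionary, without pandas; the row below is an abbreviated copy of the first test record, keeping only a few of its columns.

```python
VAR_TARGET = "Class"
VAR_OMIT = "transaction_id"  # space-delimited list of columns to omit

# Abbreviated version of the first TEST row shown above
row = {
    "Time": 6126,
    "V1": -0.130702,
    "Amount": 0.0,
    "Class": 0,
    "transaction_id": "70526fff-bdf3-459f-bbc6-a18e8133bb35",
    "splits": "TEST",
}

# Drop the omitted columns, the label, and the split marker
excluded = set(VAR_OMIT.split() + [VAR_TARGET, "splits"])
instance = {name: value for name, value in row.items() if name not in excluded}
print(instance)
```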

### Get Predictions: Python Client
The exported model does not use `serving_default`, the default signature name that a Vertex AI endpoint expects.  To get around this, we include the correct signature name in the JSON input and use the [raw_predict method with the PredictionService documented here](https://googleapis.dev/python/aiplatform/latest/aiplatform_v1/prediction_service.html#google.cloud.aiplatform_v1.services.prediction_service.PredictionServiceClient.raw_predict).

In [34]:
for e in aiplatform.Endpoint.list():
    if e.display_name.startswith('03b'): endpoint = e
print(endpoint.display_name)
print(endpoint.resource_name)

03b_fraud_20220330162620
projects/825075454589/locations/us-central1/endpoints/5318865509133844480


In [35]:
client_options = {"api_endpoint": f"{REGION}-aiplatform.googleapis.com"}
predictor = aiplatform.gapic.PredictionServiceClient(client_options = client_options)

In [36]:
from google.api import httpbody_pb2

instances = {"instances": [newob], "signature_name": "predict"}
http_body = httpbody_pb2.HttpBody(data = json.dumps(instances).encode("utf-8"), content_type = "application/json")

In [37]:
pred = predictor.raw_predict(endpoint = endpoint.resource_name, http_body = http_body)

In [38]:
json.loads(pred.data)

{'predictions': [{'class_ids': [1],
   'classes': ['0'],
   'all_class_ids': [0, 1],
   'all_classes': ['1', '0'],
   'logistic': [0.999893188],
   'probabilities': [0.000106808191, 0.999893188],
   'logits': [9.14436913]}]}
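
The fields in this response are related to each other. As a worked check, assuming the usual binary-classifier head where `logistic` is the sigmoid of `logits` (which is consistent with the numbers returned), the values can be reproduced as follows:

```python
import math

# Values copied from the response above
logit = 9.14436913
all_classes = ["1", "0"]
class_id = 1

# Sigmoid of the logit gives the probability at index 1
p_index_1 = 1 / (1 + math.exp(-logit))
p_index_0 = 1 - p_index_1

# The predicted label is the entry of all_classes at the predicted class id
predicted_label = all_classes[class_id]

print(round(p_index_1, 6), predicted_label)
```

Note that the order of `all_classes` is `['1', '0']`, so the probability at index 1 belongs to label `'0'` (not fraud) for this record.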

### Get Predictions: REST
The exported model does not use `serving_default`, the default signature name that a Vertex AI endpoint expects.  To get around this, we include the correct signature name in the JSON input and use the [raw predict method documented here](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/projects.locations.endpoints/rawPredict).

In [39]:
with open(f'{DIR}/request.json','w') as file:
    file.write(json.dumps({"signature_name": "predict", "instances": [newob]}))

In [40]:
!curl -X POST \
-H "Authorization: Bearer "$(gcloud auth application-default print-access-token) \
-H "Content-Type: application/json; charset=utf-8" \
-d @{DIR}/request.json \
https://{REGION}-aiplatform.googleapis.com/v1/{endpoint.resource_name}:rawPredict

{
    "predictions": [
        {
            "all_class_ids": [0, 1],
            "all_classes": ["1", "0"],
            "logistic": [0.999893188],
            "probabilities": [0.000106808191, 0.999893188],
            "logits": [9.14436913],
            "class_ids": [1],
            "classes": ["0"]
        }
    ]
}
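
For clarity, this is how the request URL in the `curl` call is assembled from the region and the endpoint's resource name. The resource name is the one printed earlier in this notebook; the trailing ID is the part the `gcloud` command takes as its endpoint argument.

```python
REGION = "us-central1"
resource_name = "projects/825075454589/locations/us-central1/endpoints/5318865509133844480"

# Regional API host + API version + resource name + custom method suffix
url = f"https://{REGION}-aiplatform.googleapis.com/v1/{resource_name}:rawPredict"

# Numeric endpoint ID is the last path segment of the resource name
endpoint_id = resource_name.rsplit("/", 1)[-1]

print(url)
print(endpoint_id)
```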

### Get Predictions: gcloud (CLI)
The exported model does not use `serving_default`, the default signature name that a Vertex AI endpoint expects.  To get around this, we include the correct signature name in the JSON input and use the [raw predict method documented here](https://cloud.google.com/sdk/gcloud/reference/ai/endpoints/raw-predict).

In [41]:
with open(f'{DIR}/request.json','w') as file:
    file.write(json.dumps({"signature_name": "predict", "instances": [newob]}))

In [42]:
!gcloud ai endpoints raw-predict {endpoint.name.rsplit('/',1)[-1]} --region={REGION} --request=@{DIR}/request.json

Using endpoint [https://us-central1-aiplatform.googleapis.com/]
{
    "predictions": [
        {
            "classes": ["0"],
            "all_class_ids": [0, 1],
            "all_classes": ["1", "0"],
            "logistic": [0.999893188],
            "probabilities": [0.000106808191, 0.999893188],
            "logits": [9.14436913],
            "class_ids": [1]
        }
    ]
}

---
## Remove Resources
See notebook "99 - Cleanup".