# Vertex AI AutoML with a retraining pipeline
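This notebook creates an MLOps pipeline that reads data from a BigQuery (BQ) table, retrains a Vertex AI AutoML Tabular classification model, and deploys it to an existing endpoint (creating the endpoint on the first run). The model is deployed only if its evaluation metrics meet the configured thresholds.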

### INSTALLATIONS

In [None]:
USER_FLAG = "--user"
!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

In [None]:
import os

# Restart the kernel so that later imports pick up the package versions installed above.
# Automated runs set IS_TESTING (to any non-empty value) to skip the restart.
_skip_restart = bool(os.getenv("IS_TESTING"))

if not _skip_restart:
    import IPython

    # do_shutdown(True) shuts the kernel down and asks for a restart.
    IPython.Application.instance().kernel.do_shutdown(True)

Check that the packages are installed correctly. The KFP SDK version should be >= 1.8:

In [None]:
import kfp
import google_cloud_pipeline_components

# Import inside the kernel itself (not in a `!python3` subprocess, which may resolve to a
# different interpreter) so the versions shown are the ones this notebook will actually use.
print("KFP SDK version: {}".format(kfp.__version__))
print("google_cloud_pipeline_components version: {}".format(google_cloud_pipeline_components.__version__))

### CREATE BQ DATASET (IF REQUIRED)
Create a BigQuery dataset called `beans` (skip this step if the dataset already exists).

In [None]:
# Create the `beans` BigQuery dataset in the us-central1 location. Only needed once;
# presumably `bq mk` reports an error if the dataset already exists, hence "(IF REQUIRED)".
!bq --location=us-central1 mk -d \
--description "Beans Dataset for AUTOML" \
beans

Load `data/beans_data.csv` into a table called `beans_data` in the `beans` dataset:

In [None]:
!bq load \
  --source_format=CSV \
  --autodetect \
  beans.beans_data \
  data/beans_data.csv

In [None]:
# This cell runs before the "SET ENVIRONMENT VARIABLES" cell that defines PROJECT_ID, so on a
# fresh kernel (Restart & Run All) PROJECT_ID would be undefined here. Fall back to the same
# default project instead of raising NameError.
PROJECT_ID = globals().get("PROJECT_ID", "marinadel")
BQ_SOURCE = f"bq://{PROJECT_ID}.beans.beans_data"
BQ_SOURCE

### SET ENVIRONMENT VARIABLES

In [1]:
import time

# Project-specific settings: change these to point at your own project, bucket and region.
PROJECT_ID = "marinadel"
BUCKET_NAME = "gs://marinadel-bucket"
REGION = "us-central1"

# BigQuery source table, in the form bq://{PROJECT_ID}.{DATASET}.{TABLE}. The dataset and table
# names match the ones created by the `bq mk` / `bq load` cells above; derived from PROJECT_ID
# so changing the project only requires editing one line.
BQ_SOURCE = f"bq://{PROJECT_ID}.beans.beans_data"

BUILD_NAME = "automl-beans"
# Timestamped name for this run; used as the display name of the dataset, training job and model.
DISPLAY_NAME = "{}-{}".format(BUILD_NAME, str(int(time.time())))

# GCS location where Vertex AI Pipelines stores pipeline artifacts.
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"

### DEFINE PIPELINE

In [2]:
import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

In [3]:
@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.
    """Log a trained classification model's evaluation metrics and decide whether to deploy it.

    All imports happen inside the function because KFP ships only this function's source
    into the component container.

    Args:
        project: GCP project ID passed to the Vertex AI SDK.
        location: Vertex AI region passed to the Vertex AI SDK.
        api_endpoint: Regional Vertex AI API endpoint used by the ModelServiceClient.
        thresholds_dict_str: JSON object mapping a metric name to its minimum acceptable
            value, e.g. '{"auRoc": 0.95}'. Only "auRoc" and "auPrc" are checked.
        model: Trained model artifact; its metadata must contain "resourceName".
        metrics: Output artifact that receives every metric except the confidence curve,
            each serialized as JSON text.
        metricsc: Output artifact that receives the ROC curve and the confusion matrix.

    Returns:
        A one-element tuple whose ``dep_decision`` is "true" if every checked threshold is
        met and "false" otherwise.

    Raises:
        RuntimeError: If the model has no evaluations to read metrics from.
    """
    import json
    import logging

    from google.cloud import aiplatform as aip
    from google.protobuf.json_format import MessageToDict

    # Threshold keys that are actually compared; for these metrics a higher value is better.
    HIGHER_IS_BETTER = ("auRoc", "auPrc")

    def get_eval_info(client, model_name):
        """Fetch all evaluations of `model_name`.

        Returns (name of the last evaluation, list of metric dicts, list of metric JSON strings).
        """
        evaluations = list(client.list_model_evaluations(parent=model_name))
        if not evaluations:
            # Fail with a clear message instead of an UnboundLocalError / IndexError later on.
            raise RuntimeError(f"No model evaluations found for model {model_name}")

        metrics_list = []
        metrics_string_list = []
        for evaluation in evaluations:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            # Named eval_metrics so it does not shadow the `metrics` output artifact.
            eval_metrics = MessageToDict(evaluation._pb.metrics)
            for metric_name, value in eval_metrics.items():
                logging.info("metric: %s, value: %s", metric_name, value)
            metrics_list.append(eval_metrics)
            metrics_string_list.append(json.dumps(eval_metrics))

        return evaluations[-1].name, metrics_list, metrics_string_list

    def classification_thresholds_check(metrics_dict, thresholds_dict):
        """Return True only if every supported threshold in `thresholds_dict` is met."""
        for name, threshold in thresholds_dict.items():
            logging.info("k {}, v {}".format(name, threshold))
            if name not in HIGHER_IS_BETTER:
                logging.warning("No threshold rule for metric %s; ignoring it.", name)
                continue
            if name not in metrics_dict:
                # Be conservative: a metric we cannot verify blocks deployment instead of
                # crashing the step with a KeyError.
                logging.warning("Metric %s missing from evaluation; returning False", name)
                return False
            if metrics_dict[name] < threshold:  # if under threshold, don't deploy
                logging.info("{} < {}; returning False".format(metrics_dict[name], threshold))
                return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc, metrics):
        """Write the first evaluation's ROC curve and confusion matrix to `metricsc`, and its
        remaining metrics (as JSON strings) to `metrics`."""
        first_eval = metrics_list[0]
        test_confusion_matrix = first_eval["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # ROC curve: one point per confidence threshold reported by the evaluation.
        confidence_metrics = first_eval["confidenceMetrics"]
        fpr = [item.get("falsePositiveRate", 0.0) for item in confidence_metrics]
        tpr = [item.get("recall", 0.0) for item in confidence_metrics]
        thresholds = [item.get("confidenceThreshold", 0.0) for item in confidence_metrics]
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # Confusion matrix, labelled with the class display names.
        annotations = [item["displayName"] for item in test_confusion_matrix["annotationSpecs"]]
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # Textual copy of every other metric.
        for metric_name, value in first_eval.items():
            if metric_name != "confidenceMetrics":
                metrics.log_metric(metric_name, json.dumps(value))

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project, location=location)

    # The training step stores the Vertex AI model resource name in the artifact metadata.
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client = aip.gapic.ModelServiceClient(client_options={"api_endpoint": api_endpoint})
    eval_name, metrics_list, _ = get_eval_info(client, model_resource_path)
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc, metrics)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    # KFP condition parameters are compared as strings downstream.
    dep_decision = "true" if deploy else "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

In [4]:
@component(
    base_image="gcr.io/ml-pipeline/google-cloud-pipeline-components:latest",
    output_component_file="replace_model_on_endpoint.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
# Creates an endpoint if none exists for this pipeline, else reuses the existing one.
def replace_model_on_endpoint(project: str, display_name: str, model_name: str, traffic_percentage: int):
    """Deploy the newest model called `model_name` to the endpoint called `display_name`.

    If no endpoint with that display name exists, one is created and the model is deployed
    without an explicit traffic percentage. Otherwise the newest matching endpoint is reused
    and the new model is deployed with `traffic_percentage`. Models already deployed on a
    reused endpoint are NOT undeployed by this component.

    Args:
        project: GCP project ID that owns the model and endpoint.
        display_name: Display name of the endpoint to reuse or create.
        model_name: Display name of the trained model to deploy.
        traffic_percentage: Traffic share (0-100) for the new model on a reused endpoint.

    Raises:
        RuntimeError: If no model with display name `model_name` exists.
    """
    from google.cloud import aiplatform as aip

    # Vertex AI list filters take string values in double quotes (these names contain
    # hyphens). Order newest-first so the freshest resource wins if names collide.
    models = aip.Model.list(
        filter=f'display_name="{model_name}"',
        order_by="create_time desc",
        project=project,
    )
    if not models:
        raise RuntimeError(f"No model found with display_name={model_name}")
    model = models[0]
    print("model to deploy: ", model._gca_resource.name)

    print("Looking for endpoints")
    endpoints = aip.Endpoint.list(
        filter=f'display_name="{display_name}"',
        order_by="create_time desc",
        project=project,
    )

    # Same machine shape for both deployment paths.
    deploy_kwargs = dict(machine_type="n1-standard-4", min_replica_count=1, max_replica_count=1)

    if not endpoints:
        print(f"No reusable endpoint found, creating new endpoint: {display_name}")
        endpoint = aip.Endpoint.create(display_name, project=project)
        print(f"endpoint_uri = {endpoint._gca_resource.name}")
        print("deploying model")
        model.deploy(endpoint=endpoint, **deploy_kwargs)
    else:
        endpoint = endpoints[0]
        print(f"Reusable endpoint found. Endpoint_id: {endpoint}")
        print(f"endpoint_uri = {endpoint._gca_resource.name}")
        print(f"deploying model with traffic_percentage= {traffic_percentage}")
        model.deploy(endpoint=endpoint, traffic_percentage=traffic_percentage, **deploy_kwargs)

    print("model deployed")



In [5]:
# Use `dsl.pipeline` rather than the bare imported `pipeline` decorator: this function is itself
# named `pipeline`, so after the first run of this cell `@pipeline(...)` would call this
# function instead of the decorator and the cell could not be re-run.
@dsl.pipeline(name=f"{BUILD_NAME}-pipeline", pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = BQ_SOURCE,
    display_name: str = DISPLAY_NAME,
    build_name: str = BUILD_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
    trafic_split: int = 50,  # (sic) parameter name kept for compatibility
):
    """Train an AutoML tabular classifier on a BigQuery table and conditionally deploy it.

    Steps: create a tabular dataset from `bq_source`, train an AutoML classification model
    on it, evaluate the model against `thresholds_dict_str`, and, only if the evaluation
    passes, deploy the model to the endpoint named "{build_name}-endpoint" with
    `trafic_split` percent of the traffic when that endpoint already exists.

    Args:
        bq_source: BigQuery table URI, bq://{project}.{dataset}.{table}.
        display_name: Display name for the dataset, training job and resulting model.
        build_name: Prefix used to name the deployment endpoint.
        project: GCP project ID.
        gcp_region: Vertex AI region for the dataset, training job and evaluation.
        api_endpoint: Regional Vertex AI API endpoint used by the evaluation step.
        thresholds_dict_str: JSON thresholds the model must meet to be deployed.
        trafic_split: Traffic percentage for the new model on an existing endpoint.
    """
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project,
        location=gcp_region,
        display_name=display_name,
        bq_source=bq_source,
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        location=gcp_region,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=100,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            # NOTE(review): "Class" is also the target column; the Vertex AI SDK docs say the
            # target should have no transformation defined — confirm this entry is needed.
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )

    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    # Deploy only when the evaluation step approved the model.
    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):
        replace_model_on_endpoint(
            project, f"{build_name}-endpoint", display_name, trafic_split
        )
        

In [6]:
# Serialize the pipeline function into a pipeline-spec JSON that Vertex AI Pipelines can run.
compiler.Compiler().compile(
    package_path="automl_rep_pipeline.json",
    pipeline_func=pipeline,
)



In [7]:
# Run the job in the same project/region the pipeline parameters target. Without explicit
# `project`/`location` the SDK falls back to the environment's default project and location,
# which may differ from PROJECT_ID/REGION.
ml_pipeline_job = aiplatform.PipelineJob(
    display_name=f"{BUILD_NAME}-training",
    template_path="automl_rep_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True,
    project=PROJECT_ID,
    location=REGION,
)

In [8]:
# Submit the compiled pipeline to Vertex AI Pipelines. The logged output below includes the
# created job's resource name and a console link for following the run.
ml_pipeline_job.submit()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/865822053282/locations/us-central1/pipelineJobs/automl-beans-pipeline-20220317211001
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/865822053282/locations/us-central1/pipelineJobs/automl-beans-pipeline-20220317211001')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/automl-beans-pipeline-20220317211001?project=865822053282
