# AutoML Pipeline

## Install the Package

```
Create a Dataset resource.
Train an AutoML tabular classification Model resource.
Create an Endpoint resource.
Deploy the Model resource to the Endpoint resource.
Compile the KFP pipeline.
Execute the KFP pipeline using Vertex AI Pipelines.
```

In [None]:
# Install or upgrade the packages imported by the cells below: the Vertex AI
# SDK, the Cloud Storage client, the KFP SDK and the Google Cloud pipeline
# components.
! pip3 install --upgrade --quiet google-cloud-aiplatform \
                                 google-cloud-storage \
                                 kfp \
                                 google-cloud-pipeline-components

## Set the Project ID, Region and Auth

In [None]:
import os
# Export the path of the credentials file through the environment.
# NOTE(review): '../../.json' looks like a redacted placeholder path — confirm
# it points at a real key file before running.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '../../.json'

In [None]:
# Google Cloud project ID used to build BUCKET_URI, initialise the SDK and
# parameterise the pipeline. NOTE(review): '...' appears to be a placeholder.
PROJECT_ID = '...'

In [None]:
# Region handed to aiplatform.init() and passed to the pipeline as `gcp_region`.
REGION = "asia-southeast2"

In [None]:
# Cloud Storage bucket used as the SDK staging bucket and as the parent of
# PIPELINE_ROOT.
BUCKET_URI = f"gs://{PROJECT_ID}-demo-dataset-3"

# Uncomment to create the bucket if it does not exist yet:
# ! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}

In [None]:
from typing import NamedTuple

import kfp
from google.cloud import aiplatform
from kfp import compiler, dsl
from kfp.dsl import (Artifact, ClassificationMetrics, Input, Metrics, Output,
                     component)

In [None]:
# Name given to the KFP pipeline definition, and the folder under BUCKET_URI
# that is used as the pipeline root.
PIPELINE_NAME = "automl-tabular-demo-dataset-3-training"
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root/demo-dataset-3"

In [None]:
# Initialise the Vertex AI SDK with the project, region and staging bucket
# defined above.
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

## Preprocess

In [None]:
@component(
    packages_to_install=["google-cloud-bigquery", "db-dtypes"]
)
def preprocess_dataset_in_bq(
    project: str,
    location: str,
    dataset: str,
    table: str,
) -> NamedTuple("Outputs", [("preprocess_result", str)]):
    """Create `<dataset>.<table>_preprocessed` in BigQuery with encoded features.

    The new table contains every column of the source table plus
    `purpose_encoded` (label-encoded `purpose`) and `fico_encoded` (`fico`
    bucketed into the values 1-5). The raw `purpose` and `fico` columns are
    then dropped from the new table.

    Args:
        project: Project the BigQuery client is created for.
        location: Location the BigQuery client is created for.
        dataset: BigQuery dataset holding the source table.
        table: Name of the source table inside `dataset`.

    Returns:
        A one-element tuple whose `preprocess_result` is 'true' when the
        script ran successfully and 'false' when any exception was raised.
    """
    from google.cloud import bigquery

    client = bigquery.Client(project=project, location=location)

    source_table = f"{dataset}.{table}"
    target_table = f"{dataset}.{table}_preprocessed"

    # The ALTER statements must target the table created just above; they
    # were previously hard-coded to one specific dataset/table and therefore
    # ignored the `dataset` and `table` arguments.
    query = f'''CREATE OR REPLACE TABLE `{target_table}` AS
SELECT
    *,
    ML.LABEL_ENCODER(purpose, 7, 5) OVER () AS purpose_encoded,
    CASE 
      WHEN fico < 580 THEN 1
      WHEN fico <= 669 THEN 2
      WHEN fico <= 739 THEN 3
      WHEN fico <= 799 THEN 4
      ELSE 5
    END AS fico_encoded
FROM
    `{source_table}`;

ALTER TABLE `{target_table}` DROP COLUMN IF EXISTS purpose;
ALTER TABLE `{target_table}` DROP COLUMN IF EXISTS fico;
'''
    try:
        query_job = client.query(query)
        query_job.result()  # block until the whole script has finished
        preprocess_result = 'true'
    except Exception as e:
        # Failures are reported through the output value (rather than raised)
        # so the caller can branch on 'true' / 'false'.
        print(e)
        preprocess_result = 'false'
    return (preprocess_result, )

# Compile the preprocessing component into a standalone YAML definition.
compiler.Compiler().compile(
    preprocess_dataset_in_bq, "preprocess_dataset_in_bq.yaml"
)

In [None]:
@component(
    packages_to_install=["google-cloud-bigquery", "db-dtypes", "pandas", "pandas-gbq", "scikit-learn"]
)
def smote_dataset_store_in_bq(
    project: str,
    location: str,
    dataset: str,
    table: str,
) -> NamedTuple("Outputs", [("preprocess_result", str)]):
    """Balance the `not_fully_paid` classes of a BigQuery table in place.

    The table is loaded into a DataFrame, the rows with `not_fully_paid == 1`
    are resampled with replacement until there are as many of them as rows
    with `not_fully_paid == 0`, and the balanced result replaces the table.

    NOTE: despite the component name this is random oversampling
    (`sklearn.utils.resample`), not SMOTE; no synthetic rows are generated.

    Args:
        project: Project used for the BigQuery client and for the upload.
        location: Location used for the BigQuery client and for the upload.
        dataset: BigQuery dataset holding the table.
        table: Table to read from and to overwrite with the balanced rows.

    Returns:
        A one-element tuple whose `preprocess_result` is 'true' on success
        and 'false' when any exception was raised.
    """
    from google.cloud import bigquery
    from sklearn.utils import resample
    import pandas as pd

    client = bigquery.Client(project=project, location=location)
    table_id = f'{dataset}.{table}'
    query = f'''SELECT * FROM `{table_id}`'''
    try:
        df = client.query(query).to_dataframe()
        df_majority = df[df['not_fully_paid'] == 0]
        df_minority = df[df['not_fully_paid'] == 1]

        # Fixed random_state keeps the resampled rows reproducible.
        df_minority_upsampled = resample(
            df_minority,
            replace=True,
            n_samples=len(df_majority),
            random_state=123,
        )

        df_upsampled = pd.concat([df_majority, df_minority_upsampled])

        # Write the balanced frame back. The original frame `df` used to be
        # written here, which silently discarded the oversampling.
        df_upsampled.to_gbq(
            destination_table=table_id,
            project_id=project,
            if_exists='replace',
            location=location,
        )
        preprocess_result = 'true'
    except Exception as e:
        # Failures are reported through the output value (rather than raised)
        # so the caller can branch on 'true' / 'false'.
        print(e)
        preprocess_result = 'false'
    return (preprocess_result, )

# Compile the oversampling component into its own YAML definition. This cell
# previously recompiled preprocess_dataset_in_bq, so the component defined
# directly above was never exported.
compiler.Compiler().compile(
    smote_dataset_store_in_bq, "smote_dataset_store_in_bq.yaml"
)

In [None]:
@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-6:latest",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.
    """Log a trained model's evaluation metrics and decide whether to deploy it.

    The model's evaluations are fetched from Vertex AI. The ROC curve and the
    confusion matrix of the first evaluation are logged to `metricsc`, its
    remaining metrics are logged to `metrics`, and the first evaluation is
    compared with the supplied thresholds.

    Args:
        project: Project used to initialise the Vertex AI SDK.
        location: Region used to initialise the Vertex AI SDK.
        thresholds_dict_str: JSON object mapping metric names to minimum
            values. Only the keys "auRoc" and "auPrc" are checked; any other
            key is ignored.
        model: Model artifact; its `metadata["resourceName"]` is used to load
            the model.
        metrics: Output artifact receiving the JSON-encoded metric values.
        metricsc: Output artifact receiving the ROC curve and the confusion
            matrix.

    Returns:
        A one-element tuple whose `dep_decision` is "true" when every checked
        threshold is met and "false" otherwise.

    Raises:
        RuntimeError: If the model has no evaluations.
    """
    import json
    import logging

    from google.cloud import aiplatform

    # Use the region supplied by the caller; `location` used to be accepted
    # but never passed on.
    aiplatform.init(project=project, location=location)

    # Fetch model eval info
    def get_eval_info(vertex_model):
        """Return (name of the last evaluation, metrics dicts, metrics JSON strings)."""
        metrics_list = []
        metrics_string_list = []
        evaluation_name = None
        for evaluation in vertex_model.list_model_evaluations():
            evaluation = evaluation.to_dict()
            evaluation_name = evaluation["name"]
            print("model_evaluation")
            print(" name:", evaluation_name)
            print(" metrics_schema_uri:", evaluation["metricsSchemaUri"])
            eval_metrics = evaluation["metrics"]
            for metric_name, metric_value in eval_metrics.items():
                logging.info("metric: %s, value: %s", metric_name, metric_value)
            metrics_list.append(eval_metrics)
            metrics_string_list.append(json.dumps(eval_metrics))

        if evaluation_name is None:
            # Fail with a clear message instead of an unbound-variable error
            # here (or an IndexError further down).
            raise RuntimeError("No model evaluations found for the trained model.")

        return (
            evaluation_name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        """Return False as soon as a checked metric is below its threshold."""
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, scalar_metrics, classification_metrics):
        """Log the first evaluation's curves and values to the output artifacts."""
        first_eval = metrics_list[0]
        test_confusion_matrix = first_eval["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in first_eval["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        classification_metrics.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = [
            item["displayName"] for item in test_confusion_matrix["annotationSpecs"]
        ]
        logging.info("confusion matrix annotations: %s", annotations)
        classification_metrics.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric_name, metric_value in first_eval.items():
            if metric_name != "confidenceMetrics":
                scalar_metrics.log_metric(metric_name, json.dumps(metric_value))

    logging.getLogger().setLevel(logging.INFO)

    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    # Get the trained model resource
    trained_model = aiplatform.Model(model_resource_path)

    # Get model evaluation metrics from the trained model
    eval_name, metrics_list, metrics_str_list = get_eval_info(trained_model)
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metrics, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    dep_decision = "true" if deploy else "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)


# Compile the evaluation component into a standalone YAML definition.
compiler.Compiler().compile(
    classification_model_eval_metrics, "tabular_eval_component.yaml"
)

In [None]:
@kfp.dsl.pipeline(name=PIPELINE_NAME, pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str,
    DATASET_DISPLAY_NAME: str,
    TRAINING_DISPLAY_NAME: str,
    MODEL_DISPLAY_NAME: str,
    ENDPOINT_DISPLAY_NAME: str,
    MACHINE_TYPE: str,
    project: str,
    gcp_region: str,
    thresholds_dict_str: str,
):
    """Preprocess the loan data, train an AutoML model and conditionally deploy it.

    Steps, each gated on the success of the previous one:
      1. Build the preprocessed BigQuery table.
      2. Oversample the minority class of the preprocessed table.
      3. Create a tabular dataset from `bq_source` and train an AutoML
         classification model on it.
      4. Evaluate the model against `thresholds_dict_str`.
      5. If the evaluation passes, create an endpoint and deploy the model.

    Args:
        bq_source: BigQuery source passed to the tabular dataset creation.
        DATASET_DISPLAY_NAME: Display name of the created dataset.
        TRAINING_DISPLAY_NAME: Display name of the AutoML training job.
        MODEL_DISPLAY_NAME: Display name of the trained model.
        ENDPOINT_DISPLAY_NAME: Display name of the created endpoint.
        MACHINE_TYPE: Machine type of the dedicated deployment resources.
        project: Project passed to every step.
        gcp_region: Region passed to the Vertex AI steps.
        thresholds_dict_str: JSON string of metric thresholds passed to the
            evaluation step.
    """
    from google_cloud_pipeline_components.v1.automl.training_job import \
        AutoMLTabularTrainingJobRunOp
    from google_cloud_pipeline_components.v1.dataset.create_tabular_dataset.component import \
        tabular_dataset_create as TabularDatasetCreateOp
    from google_cloud_pipeline_components.v1.endpoint.create_endpoint.component import \
        endpoint_create as EndpointCreateOp
    from google_cloud_pipeline_components.v1.endpoint.deploy_model.component import \
        model_deploy as ModelDeployOp

    # BigQuery coordinates of the raw table, defined once so the two
    # preprocessing steps cannot drift apart.
    bq_location = 'asia-southeast2'
    bq_dataset = 'demo_dataset_3'
    bq_table = 'loan_data'
    # preprocess_dataset_in_bq writes its result to `<table>_preprocessed`.
    bq_preprocessed_table = f'{bq_table}_preprocessed'

    preprocess_dataset = preprocess_dataset_in_bq(
        project=project,
        location=bq_location,
        dataset=bq_dataset,
        table=bq_table,
    )
    with dsl.If(
        preprocess_dataset.outputs['preprocess_result'] == 'true',
        name='clean_dataset_decision'
    ):
        smote_dataset = smote_dataset_store_in_bq(
            project=project,
            location=bq_location,
            dataset=bq_dataset,
            table=bq_preprocessed_table,
        )

        # Distinct group name: this gate used to reuse
        # 'clean_dataset_decision', making the two conditions
        # indistinguishable by name.
        with dsl.If(
            smote_dataset.outputs['preprocess_result'] == 'true',
            name='smote_dataset_decision'
        ):

            dataset_create_op = TabularDatasetCreateOp(
                project=project,
                location=gcp_region,
                display_name=DATASET_DISPLAY_NAME,
                bq_source=bq_source,
            )

            # NOTE(review): the target column `not_fully_paid` is listed
            # below as "numeric" although the job is a classification —
            # confirm this is the intended transformation.
            training_op = AutoMLTabularTrainingJobRunOp(
                project=project,
                location=gcp_region,
                display_name=TRAINING_DISPLAY_NAME,
                optimization_prediction_type="classification",
                optimization_objective="minimize-log-loss",
                budget_milli_node_hours=1000,
                model_display_name=MODEL_DISPLAY_NAME,
                column_specs={
                    "credit_policy": "categorical",
                    "int_rate": "numeric",
                    "installment": "numeric",
                    "log_annual_inc": "numeric",
                    "dti": "numeric",
                    "fico_encoded": "numeric",
                    "days_with_cr_line": "numeric",
                    "revol_bal": "numeric",
                    "revol_util": "numeric",
                    "inq_last_6mths": "numeric",
                    "delinq_2yrs": "numeric",
                    "pub_rec": "numeric",
                    "not_fully_paid": "numeric",
                    "purpose_encoded": "numeric",
                },
                dataset=dataset_create_op.outputs["dataset"],
                target_column="not_fully_paid",
            )

            model_eval_task = classification_model_eval_metrics(
                project=project,
                location=gcp_region,
                thresholds_dict_str=thresholds_dict_str,
                model=training_op.outputs["model"],
            )

            # Deploy only when the evaluation step approved the model.
            with dsl.If(
                model_eval_task.outputs["dep_decision"] == "true",
                name="deploy_decision",
            ):

                endpoint_op = EndpointCreateOp(
                    project=project,
                    location=gcp_region,
                    display_name=ENDPOINT_DISPLAY_NAME,
                )

                ModelDeployOp(
                    model=training_op.outputs["model"],
                    endpoint=endpoint_op.outputs["endpoint"],
                    dedicated_resources_min_replica_count=1,
                    dedicated_resources_max_replica_count=1,
                    dedicated_resources_machine_type=MACHINE_TYPE,
                )
# Compile the pipeline into the YAML template that is later passed to
# aiplatform.PipelineJob as `template_path`.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="tabular_classification_pipeline.yaml",
)

## Run the Pipeline

In [None]:
import random
import string

def generate_uuid(length: int = 8) -> str:
    """Return a random identifier made of lowercase letters and digits.

    Args:
        length: Number of characters to generate (default 8).

    Returns:
        A string of exactly `length` characters drawn, with replacement, from
        `a-z0-9`.
    """
    alphabet = string.ascii_lowercase + string.digits
    picked = random.choices(alphabet, k=length)
    return "".join(picked)


# Random suffix generated once for this notebook session.
UUID = generate_uuid()

In [None]:
# Set the display-names for Vertex AI resources.
# The values below are placeholders; edit any of them to choose a custom name.
PIPELINE_DISPLAY_NAME = "demo-dataset-3-loan-data-pipeline"  # @param {type:"string"}
DATASET_DISPLAY_NAME = "demo-dataset-3-loan-data-dataset"  # @param {type:"string"}
MODEL_DISPLAY_NAME = "demo-dataset-3-loan-data-model"  # @param {type:"string"}
TRAINING_DISPLAY_NAME = "demo-dataset-3-loan-data-training"  # @param {type:"string"}
ENDPOINT_DISPLAY_NAME = "demo-dataset-3-loan-data-endpoint"  # @param {type:"string"}

# Any name still equal to its placeholder above is replaced by a default that
# carries the UUID suffix; names edited above are kept as they are.
if PIPELINE_DISPLAY_NAME == "demo-dataset-3-loan-data-pipeline":
    PIPELINE_DISPLAY_NAME = f"pipeline_demo3_{UUID}"

if DATASET_DISPLAY_NAME == "demo-dataset-3-loan-data-dataset":
    DATASET_DISPLAY_NAME = f"dataset_demo3_{UUID}"

if MODEL_DISPLAY_NAME == "demo-dataset-3-loan-data-model":
    MODEL_DISPLAY_NAME = f"model_demo3_{UUID}"

if TRAINING_DISPLAY_NAME == "demo-dataset-3-loan-data-training":
    TRAINING_DISPLAY_NAME = f"automl_training_demo3_{UUID}"

if ENDPOINT_DISPLAY_NAME == "demo-dataset-3-loan-data-endpoint":
    ENDPOINT_DISPLAY_NAME = f"endpoint_demo3_{UUID}"

# Set machine type (passed to the pipeline as MACHINE_TYPE for the deployment)
MACHINE_TYPE = "n1-standard-4"

In [None]:
# Validate region of the given source (BigQuery) against region of the pipeline
import os

from google.cloud import bigquery

# NOTE(review): '...' looks like a placeholder — set the real credentials path.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '...'

# Fully-qualified table (project.dataset.table) used as the training source.
bq_source = "dla-ml-specialization.demo_dataset_3.loan_data_preprocessed"

client = bigquery.Client()
bq_region = client.get_table(bq_source).location.lower()

# Plain conditional instead of `assert`: assertions are stripped when Python
# runs with -O, which would make the check always report success.
# This is a substring test, so a BigQuery location that is a prefix of REGION
# is also accepted.
if bq_region in REGION:
    print(f"Region validated: {REGION}")
else:
    print(
        "Please make sure the region of BigQuery (source) and that of the pipeline are the same."
    )

In [None]:
# Runtime arguments for the compiled pipeline; the keys are the parameter
# names passed to the pipeline template.
pipeline_parameters = {
    "project": PROJECT_ID,
    "gcp_region": REGION,
    "bq_source": f"bq://{bq_source}",
    "thresholds_dict_str": '{"auRoc": 0.80}',
    "DATASET_DISPLAY_NAME": DATASET_DISPLAY_NAME,
    "TRAINING_DISPLAY_NAME": TRAINING_DISPLAY_NAME,
    "MODEL_DISPLAY_NAME": MODEL_DISPLAY_NAME,
    "ENDPOINT_DISPLAY_NAME": ENDPOINT_DISPLAY_NAME,
    "MACHINE_TYPE": MACHINE_TYPE,
}

# Configure the pipeline job from the compiled template, with caching disabled.
job = aiplatform.PipelineJob(
    template_path="tabular_classification_pipeline.yaml",
    display_name=PIPELINE_DISPLAY_NAME,
    pipeline_root=PIPELINE_ROOT,
    parameter_values=pipeline_parameters,
    enable_caching=False,
)

In [None]:
# Run the job under the given service account.
# NOTE(review): '...' looks like a placeholder — supply the real
# service-account e-mail before running.
job.run(service_account='...')

In [None]:
# Look up the results recorded for PIPELINE_NAME and print the first two rows.
pipeline_df = aiplatform.get_pipeline_df(pipeline=PIPELINE_NAME)
print(pipeline_df.head(2))