In [9]:
USER_FLAG = "--user"
!pip3 install {USER_FLAG} google-cloud-aiplatform kfp google-cloud-pipeline-components --upgrade



In [10]:
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 1.8.12
google_cloud_pipeline_components version: 1.0.2


### Import the necessary libraries

In [31]:
import getopt, sys
import time
from typing import NamedTuple

import kfp
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from google_cloud_pipeline_components.v1 import bigquery as gcc_bq

from kfp.v2 import compiler
from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact, ClassificationMetrics, Input, Metrics,
                        Output, component)

### Set some constants

In [39]:
# GCS location under which Vertex AI Pipelines stores run artifacts.
pipeline_root_path = "gs://my-sweet-data-bkt-b7eb4dc0"
project_id   = "skynet-dev-335306"
gcp_region   = 'us-central1'
# Derived from gcp_region so the regional API endpoint can never disagree with it.
api_endpoint = f"{gcp_region}-aiplatform.googleapis.com"

# BigQuery source table: bq://{project_id}.{dataset_name}.{table_name}
dataset_name = "iot_smart_home"
table_name   = "hh106_features"
# Suffixed with the current Unix time (seconds) so each notebook run gets a distinct name.
DISPLAY_NAME = 'hh106_automl-activity-{}'.format(str(int(time.time())))

# JSON thresholds handed to the evaluation component, which gates deployment on them.
thresholds_dict_str = '{"auRoc": 0.95}'

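### Create a custom evaluation component

This component reads the trained model's Vertex AI evaluation metrics and returns `dep_decision = "true"` only when they meet the `auRoc`/`auPrc` thresholds in `thresholds_dict_str`; the pipeline deploys the model only in that case.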

In [44]:
@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # e.g. "us-central1"
    api_endpoint: str,  # e.g. "us-central1-aiplatform.googleapis.com"
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.
    """Gate deployment of a trained model on its Vertex AI evaluation metrics.

    Lists the model evaluations attached to the model's resource name, checks
    the first evaluation's metrics against the thresholds in
    ``thresholds_dict_str`` and reports whether the model may be deployed.

    Args:
        project: GCP project ID used to initialise the Vertex AI SDK.
        location: Vertex AI region used to initialise the Vertex AI SDK.
        api_endpoint: Regional Vertex AI API endpoint for the ModelServiceClient.
        thresholds_dict_str: JSON object mapping metric name to minimum value,
            e.g. '{"auRoc": 0.95}'. Only "auRoc" and "auPrc" are checked;
            other keys are logged and ignored.
        model: Model artifact; ``metadata["resourceName"]`` identifies the model.
        metrics: Metrics output artifact (not populated yet, see TODO below).
        metricsc: ClassificationMetrics output artifact (not populated yet).

    Returns:
        NamedTuple with a single ``dep_decision`` field: "true" or "false".

    Raises:
        RuntimeError: if the model has no evaluations to check.
    """
    # Imports live inside the function because KFP runs only this function's
    # source inside the component container.
    import json
    import logging

    from google.cloud import aiplatform as aip

    def get_eval_info(client, model_name):
        """Return (last evaluation name, metric dicts, metric JSON strings) for a model."""
        from google.protobuf.json_format import MessageToDict

        eval_name = None
        metrics_list = []
        metrics_string_list = []
        for evaluation in client.list_model_evaluations(parent=model_name):
            logging.info("model_evaluation name: %s", evaluation.name)
            logging.info("metrics_schema_uri: %s", evaluation.metrics_schema_uri)
            # Named eval_metrics so it does not read like the `metrics` output artifact.
            eval_metrics = MessageToDict(evaluation._pb.metrics)
            for metric_name, value in eval_metrics.items():
                logging.info("metric: %s, value: %s", metric_name, value)
            eval_name = evaluation.name
            metrics_list.append(eval_metrics)
            metrics_string_list.append(json.dumps(eval_metrics))

        # Fail with a clear message rather than an UnboundLocalError/IndexError later on.
        if eval_name is None:
            raise RuntimeError(
                "No model evaluations found for model {}".format(model_name)
            )
        return eval_name, metrics_list, metrics_string_list

    def classification_thresholds_check(metrics_dict, thresholds_dict):
        """Return True only if every supported (higher-is-better) threshold is met.

        A threshold whose metric is absent from the evaluation fails the check,
        so the model is never deployed without its gate being verified.
        """
        higher_is_better = ("auRoc", "auPrc")
        for name, threshold in thresholds_dict.items():
            logging.info("k {}, v {}".format(name, threshold))
            if name not in higher_is_better:
                logging.warning("Unsupported threshold metric %s; ignoring it.", name)
                continue
            if name not in metrics_dict:
                logging.warning(
                    "Metric %s not present in the evaluation; not deploying.", name
                )
                return False
            if metrics_dict[name] < threshold:  # under threshold -> don't deploy
                logging.info(
                    "{} < {}; returning False".format(metrics_dict[name], threshold)
                )
                return False
        logging.info("threshold checks passed.")
        return True

    logging.getLogger().setLevel(logging.INFO)
    # Pass location too, so SDK calls target the same region as api_endpoint.
    aip.init(project=project, location=location)

    # Extract the model resource name from the input Model artifact.
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client = aip.gapic.ModelServiceClient(client_options={"api_endpoint": api_endpoint})
    eval_name, metrics_list, _ = get_eval_info(client, model_resource_path)
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    dep_decision = "true" if deploy else "false"
    logging.info("deployment decision is %s", dep_decision)

    # TODO: log the ROC curve / confusion matrix to `metricsc` and scalar metrics to
    # `metrics` so the evaluation is visible in the Vertex AI Pipelines UI.
    return (dep_decision,)

### Define the pipeline

In [45]:
@kfp.dsl.pipeline(
    name="automl-activity-pipeline",
    pipeline_root=pipeline_root_path,
)
def pipeline():
    """Train, evaluate and conditionally deploy an AutoML activity classifier.

    Steps:
      1. Create a Vertex AI tabular dataset from the BigQuery feature table.
      2. Train an AutoML tabular classification model on the ``activity``
         column with an 80/10/10 train/validation/test split.
      3. Evaluate it with ``classification_model_eval_metrics``.
      4. Only if that step returns ``"true"``, create an endpoint and deploy
         the model to it.

    Every step that accepts a location is pinned to ``gcp_region`` so the
    dataset, model, evaluation and endpoint are created in the same region.
    """
    ds_op = gcc_aip.TabularDatasetCreateOp(
        project=project_id,
        location=gcp_region,
        display_name=DISPLAY_NAME,
        bq_source=f"bq://{project_id}.{dataset_name}.{table_name}",
    )

    train_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project_id,
        location=gcp_region,
        display_name=DISPLAY_NAME,
        model_display_name=DISPLAY_NAME,
        dataset=ds_op.outputs["dataset"],
        target_column="activity",
        training_fraction_split=0.8,
        validation_fraction_split=0.1,
        test_fraction_split=0.1,
        optimization_prediction_type="classification",
    )

    # Keyword arguments make the mapping onto the component's parameters explicit.
    model_eval_task = classification_model_eval_metrics(
        project=project_id,
        location=gcp_region,
        api_endpoint=api_endpoint,
        thresholds_dict_str=thresholds_dict_str,
        model=train_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):
        create_endpoint_op = gcc_aip.EndpointCreateOp(
            project=project_id,
            location=gcp_region,
            display_name=DISPLAY_NAME,
        )

        model_deploy_op = gcc_aip.ModelDeployOp(
            model=train_op.outputs["model"],
            endpoint=create_endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
            traffic_split={"0": 100},  # newly deployed model gets 100% of the traffic
        )


### Compile

In [46]:
# Serialize the `pipeline` function into a pipeline-spec JSON file that
# Vertex AI Pipelines can execute (it is the template the PipelineJob cell submits).
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="automl_pipeline.json",
)

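### Create and submit the pipeline job

`submit()` returns as soon as the run is created; follow its progress through the console link printed below.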

In [47]:
# Submit the compiled pipeline to Vertex AI Pipelines. project/location are passed
# explicitly so the run lands in project_id / gcp_region (where the pipeline creates
# its resources) rather than in the SDK's default project and region.
ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-activity-training",
    template_path="automl_pipeline.json",
    pipeline_root=pipeline_root_path,
    enable_caching=True,
    project=project_id,
    location=gcp_region,
)

# submit() returns once the run has been created; call ml_pipeline_job.wait()
# to block until it finishes.
ml_pipeline_job.submit()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/566630855060/locations/us-central1/pipelineJobs/automl-activity-pipeline-20220411032705
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/566630855060/locations/us-central1/pipelineJobs/automl-activity-pipeline-20220411032705')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/automl-activity-pipeline-20220411032705?project=566630855060
