# Setup Info

In [1]:
USER_FLAG = "--user"

In [2]:
!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0



In [3]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

In [1]:
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"


KFP SDK version: 1.8.9
google_cloud_pipeline_components version: 0.2.0


In [2]:
import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Project ID:  rossmansales


In [3]:
BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"
BUCKET_NAME

'gs://rossmansales-bucket'

## Imports

In [4]:
import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform as aip
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

## Defining Constants

In [5]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root"
PIPELINE_ROOT

env: PATH=/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin


'gs://rossmansales-bucket/pipeline_root'

In [6]:
aip.init(project=PROJECT_ID, staging_bucket=BUCKET_NAME)

In [7]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
TIMESTAMP

'20211202142846'

# Creating Pipeline

In [44]:
@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def regression_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def regression_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k == "rootMeanSquaredError":
                if metrics_dict[k] > v:  # if over threshold, don't deploy
                    logging.info("{} > {}; returning False".format(metrics_dict[k], v))
                    return False
                logging.info("threshold checks passed.")
                return True

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = regression_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

In [45]:
TRAIN_FILE_NAME = "train.csv"
#! gsutil cp gs://aju-dev-demos-codelabs/sample_data/california_housing_train.csv {PIPELINE_ROOT}/data/

gcs_csv_path = f"{PIPELINE_ROOT}/data/{TRAIN_FILE_NAME}"


@kfp.dsl.pipeline(name="automl-tab-training-v2")
def pipeline(project: str = PROJECT_ID, 
             region: str = "us-central1",
             api_endpoint: str = "us-central1-aiplatform.googleapis.com",
             thresholds_dict_str: str = '{"rootMeanSquaredError": 2000}',
            ):

    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name="sales", gcs_source=gcs_csv_path
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name="train-automl-rossman-sales",
        optimization_prediction_type="regression",
        optimization_objective="minimize-rmse",
        column_transformations=[
            {"numeric": {"column_name": "Store"}},
            {"categorical": {"column_name": "DayOfWeek"}},
            {"numeric": {"column_name": "Sales"}},
            {"categorical": {"column_name": "Promo"}},
            {"categorical": {"column_name": "StoreType"}},
            {"categorical": {"column_name": "Assortment"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Sales",
        timestamp_split_column_name="Date",
    )
    model_eval_task = regression_model_eval_metrics(
        project,
        region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )
        
    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):
        
        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=region,
            display_name="train-automl-sales",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_machine_type="n1-standard-4",
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
        )

## Compiling the Pipeline

In [46]:
from kfp.v2 import compiler  # noqa: F811

compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="tabular regression_pipeline.json".replace(" ", "_"),
)

## Submitting the Job

In [47]:
DISPLAY_NAME = "rossman_sales_" + TIMESTAMP

job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="tabular regression_pipeline.json".replace(" ", "_"),
    pipeline_root=PIPELINE_ROOT,
)

job.run()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/301952515477/locations/us-central1/pipelineJobs/automl-tab-training-v2-20211202150922
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/301952515477/locations/us-central1/pipelineJobs/automl-tab-training-v2-20211202150922')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/automl-tab-training-v2-20211202150922?project=301952515477
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/301952515477/locations/us-central1/pipelineJobs/automl-tab-training-v2-20211202150922 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/301952515477/locations/us