# Install KFP and db_dtypes

In [None]:
USER_FLAG = "--user"
!pip3 install {USER_FLAG} kfp==1.8.9 > /dev/null
!pip3 install {USER_FLAG} db_dtypes > /dev/null

# Restart notebook kernel to load new modules

In [None]:
import os

# Restart the kernel so the packages installed above become importable.
# The restart is skipped whenever the IS_TESTING environment variable is set.
if not os.getenv("IS_TESTING"):
    import IPython

    # do_shutdown(True) requests a shutdown followed by a restart
    IPython.Application.instance().kernel.do_shutdown(True)

## Validate KFP Installation
Wait for restart modal before continuing

In [1]:
# Import kfp in a fresh python3 subprocess and print the SDK version it reports.
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"

KFP SDK version: 1.8.9


# Set environment variables for PROJECT_ID and BUCKET_NAME

In [2]:
import os  # Reimport due to kernel restart

PROJECT_ID = ""
# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print(f"Project ID: {PROJECT_ID}")
    
BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"
print(f"Bucket Name: {BUCKET_NAME}")

Project ID: qwiklabs-gcp-00-6a332b3e9684
Bucket Name: gs://qwiklabs-gcp-00-6a332b3e9684-bucket


# Set environment variables for PATH, REGION & PIPELINE_ROOT

In [3]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

env: PATH=/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin


'gs://qwiklabs-gcp-00-6a332b3e9684-bucket/pipeline_root/'

# LAB BEGINS HERE
Import required modules

In [4]:
import matplotlib.pyplot as plt
import pandas as pd
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import pipeline, component, Artifact, Dataset, Input, Metrics, Model, Output, InputPath, OutputPath
from google.cloud import aiplatform
from google.cloud import aiplatform_v1

from datetime import datetime

# Pipeline Step 1:  [Component] Load data from BQ
- Extracts training data from BQ table referenced as input to component
- Loads data into Dataframe
- Outputs data from component as CSV data

In [5]:
@component(
    packages_to_install=["google-cloud-bigquery", "pandas", "pyarrow", "db_dtypes"],
    base_image="python:3.9",
    output_component_file="create_dataset.yaml"
    )
def get_dataframe(
    bq_table: str,
    output_data_path: OutputPath("Dataset")
    ):
    """Exports a BigQuery table to a CSV file.

    Args:
        bq_table: Full name of the table to read, in the form
            <project>.<dataset>.<table>.
        output_data_path: Path the CSV is written to (provided by KFP).
    """
    from google.cloud import bigquery
    import pandas as pd

    # Run the query in the project that owns the table instead of a
    # hard-coded project ID, so the component works outside one lab project.
    # rsplit keeps any dots inside the project part intact.
    # NOTE(review): assumes the query should run in the table's own
    # project — confirm for tables owned by another project.
    table_parts = bq_table.rsplit(".", 2)
    query_project = table_parts[0] if len(table_parts) == 3 else None
    # project=None lets the client fall back to its environment default.
    bq_client = bigquery.Client(project=query_project)

    def get_query(bq_input_table: str) -> str:
        """Generates BQ Query to read data.

        Args:
        bq_input_table: The full name of the bq input table to be read into
        the dataframe (e.g. <project>.<dataset>.<table>)
        Returns: A BQ query string.
        """
        return f"""
        SELECT *
        FROM `{bq_input_table}`
        """

    def load_bq_data(query: str, client: bigquery.Client) -> pd.DataFrame:
        """Loads data from bq into a Pandas Dataframe for EDA.
        Args:
        query: BQ Query to generate data.
        client: BQ Client used to execute query.
        Returns:
        pd.DataFrame: A dataframe with the requested data.
        """
        return client.query(query).to_dataframe()

    dataframe = load_bq_data(get_query(bq_table), bq_client)
    # index=False: otherwise the row index is written as an extra unnamed
    # column that any consumer reading the CSV would treat as a feature.
    dataframe.to_csv(output_data_path, index=False)

In [6]:
from google.cloud import bigquery
import pandas as pd

# Local check of the same query logic used inside the get_dataframe component.
# Uses the PROJECT_ID resolved earlier in the notebook instead of a
# hard-coded project so the cell works in any project.
bq_client = bigquery.Client(project=PROJECT_ID)


def get_query(bq_input_table: str) -> str:
    """Generates BQ Query to read data.
    Args:
    bq_input_table: The full name of the bq input table to be read into
    the dataframe (e.g. <project>.<dataset>.<table>)
    Returns: A BQ query string.
    """
    return f"""
        SELECT *
        FROM `{bq_input_table}`
        """

def load_bq_data(query: str, client: bigquery.Client) -> pd.DataFrame:
    """Loads data from bq into a Pandas Dataframe for EDA.
    Args:
    query: BQ Query to generate data.
    client: BQ Client used to execute query.
    Returns:
    pd.DataFrame: A dataframe with the requested data.
    """
    df = client.query(query).to_dataframe()
    return df

# output_data_path only exists as a parameter of the component, so it must be
# defined here explicitly; without it this cell raised NameError.
output_data_path = "data.csv"
dataframe = load_bq_data(get_query(f"{PROJECT_ID}.beans.dry_beans_tbl"), bq_client)
dataframe.to_csv(output_data_path)

Forbidden: 403 Access Denied: Table qwiklabs-gcp-00-6a332b3e9684:beans.dry_beans_tbl: User does not have permission to query table qwiklabs-gcp-00-6a332b3e9684:beans.dry_beans_tbl, or perhaps it does not exist in location US.

Location: US
Job ID: a3ec7498-f8c4-45d7-a2be-1f159f927662


# Pipeline Step 2:  [Component] Train Scikit-learn model
- Takes CSV data from step 1 as input
- Trains a Scikit-learn decision tree model
- Outputs the trained model

In [7]:
@component(
    # "scikit-learn" is the real distribution name; the "sklearn" alias on
    # PyPI is deprecated.
    packages_to_install=["scikit-learn", "pandas", "joblib"],
    base_image="python:3.9",
    output_component_file="beans_model_component.yaml",
)
def sklearn_train(
    dataset: Input[Dataset],
    metrics: Output[Metrics],
    model: Output[Model]
):
    """Trains a decision tree classifier on the CSV dataset and saves it.

    Args:
        dataset: Input CSV artifact; must contain a "Class" label column.
        metrics: Output artifact receiving accuracy (in percent), framework
            name and dataset size.
        model: Output artifact; the fitted model is dumped to
            "<model.path>.joblib".
    """
    from sklearn.tree import DecisionTreeClassifier
    from sklearn.model_selection import train_test_split
    from joblib import dump
    import pandas as pd

    df = pd.read_csv(dataset.path)
    # pop() removes the label column, so every remaining column is a feature
    labels = df.pop("Class").tolist()
    data = df.values.tolist()
    x_train, x_test, y_train, y_test = train_test_split(data, labels)

    skmodel = DecisionTreeClassifier()
    skmodel.fit(x_train, y_train)
    score = skmodel.score(x_test, y_test)
    print('accuracy is:', score)

    metrics.log_metric("accuracy", (score * 100.0))
    metrics.log_metric("framework", "Scikit Learn")
    metrics.log_metric("dataset_size", len(df))
    dump(skmodel, model.path + ".joblib")

# Pipeline Step 3:  [Component] Upload & Deploy model to Vertex AI
- Takes model from step 2 as input
- Uploads the model to Vertex AI
- Deploys the model to a Vertex AI endpoint

In [8]:
@component(
    packages_to_install=["google-cloud-aiplatform"],
    base_image="python:3.9",
    output_component_file="beans_deploy_component.yaml",
)
def deploy_model(
    model: Input[Model],
    project: str,
    region: str,
    vertex_endpoint: Output[Artifact],
    vertex_model: Output[Model]
):
    """Uploads a trained model to Vertex AI and deploys it to an endpoint.

    Args:
        model: Input model artifact produced by the training step.
        project: Google Cloud project ID used to initialise the SDK.
        region: Vertex AI location used to initialise the SDK.
        vertex_endpoint: Output artifact whose uri is set to the endpoint
            resource name.
        vertex_model: Output artifact whose uri is set to the uploaded
            model's resource name.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=region)

    # Point artifact_uri at the directory containing the model file by
    # dropping only the final path segment. The previous
    # uri.replace("model", "") removed every occurrence of "model" and
    # corrupted URIs whose bucket or folder names contain that word.
    artifact_dir = model.uri.rsplit("/", 1)[0] + "/"

    uploaded_model = aiplatform.Model.upload(
        display_name="beans-model-pipeline",
        artifact_uri=artifact_dir,
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest"
    )
    endpoint = uploaded_model.deploy(machine_type="n1-standard-4")

    # Save data to the output params
    vertex_endpoint.uri = endpoint.resource_name
    vertex_model.uri = uploaded_model.resource_name

# Pipeline Creation
## Define a pipeline from the 3 components

In [9]:
# Use dsl.pipeline rather than the bare imported `pipeline` decorator: the
# function below is itself named `pipeline`, so after the first run the bare
# name refers to this function and re-running the cell would fail.
@dsl.pipeline(
    name="mlmd-pipeline"
)
def pipeline(
    bq_table: str = "",
    output_data_path: str = "data.csv",
    project: str = PROJECT_ID,
    region: str = REGION
):
    """Chains the three components: load data, train model, deploy model.

    Args:
        bq_table: Full BigQuery table name passed to get_dataframe.
        output_data_path: Not used by any step in this body; kept so the
            pipeline's parameter list is unchanged.
        project: Project ID forwarded to deploy_model.
        region: Region forwarded to deploy_model.
    """
    # NOTE(review): PIPELINE_ROOT is defined earlier in the notebook but is not
    # passed to this pipeline or to the jobs — confirm that is intentional.
    dataset_task = get_dataframe(bq_table)
    model_task = sklearn_train(
        dataset_task.output
    )
    deploy_model(
        model=model_task.outputs["model"],
        project=project,
        region=region
    )

## Compile pipeline to JSON

In [10]:
# Compile the pipeline function into the JSON spec consumed by the PipelineJobs below.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path="mlmd_pipeline.json",
)



# Execute Pipeline Runs

## Create Pipeline Job 1: Small Dataset Table

In [11]:
# Job 1 reads the small table; the timestamp suffix makes the job ID unique per run.
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
small_job_id = "mlmd-pipeline-small-{0}".format(timestamp)
small_table = "{0}.beans.dry_beans_tbl_small".format(PROJECT_ID)

run1 = aiplatform.PipelineJob(
    display_name="mlmd-pipeline",
    template_path="mlmd_pipeline.json",
    job_id=small_job_id,
    parameter_values={"bq_table": small_table},
    enable_caching=True,
)

## Create Pipeline Job 2:  Full Dataset Table

In [12]:
# Job 2 reads the full table; the timestamp suffix makes the job ID unique per run.
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
large_job_id = "mlmd-pipeline-large-{0}".format(timestamp)
full_table = "{0}.beans.dry_beans_tbl".format(PROJECT_ID)

run2 = aiplatform.PipelineJob(
    display_name="mlmd-pipeline",
    template_path="mlmd_pipeline.json",
    job_id=large_job_id,
    parameter_values={"bq_table": full_table},
    enable_caching=True,
)

## Execute Jobs

In [13]:
# Submit the small-table job under the mlops-svc service account of this project.
small_run_service_account = f"mlops-svc@{PROJECT_ID}.iam.gserviceaccount.com"
run1.submit(service_account=small_run_service_account)

Creating PipelineJob
PipelineJob created. Resource name: projects/670026345621/locations/us-central1/pipelineJobs/mlmd-pipeline-small-20221007143057
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/670026345621/locations/us-central1/pipelineJobs/mlmd-pipeline-small-20221007143057')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/mlmd-pipeline-small-20221007143057?project=670026345621


In [14]:
# Submit the full-table job under the mlops-svc service account of this project.
large_run_service_account = f"mlops-svc@{PROJECT_ID}.iam.gserviceaccount.com"
run2.submit(service_account=large_run_service_account)

Creating PipelineJob
PipelineJob created. Resource name: projects/670026345621/locations/us-central1/pipelineJobs/mlmd-pipeline-large-20221007143058
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/670026345621/locations/us-central1/pipelineJobs/mlmd-pipeline-large-20221007143058')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/mlmd-pipeline-large-20221007143058?project=670026345621


# Comparing pipeline runs with the Vertex AI SDK

In [16]:
# Fetch one row per run of the named pipeline, with its parameters, as a DataFrame.
pipeline_name = "mlmd-pipeline"
df = aiplatform.get_pipeline_df(pipeline=pipeline_name)
df

Unnamed: 0,pipeline_name,run_name,param.input:region,param.input:bq_table,param.input:project,param.input:output_data_path
0,mlmd-pipeline,mlmd-pipeline-large-20221007143058,us-central1,qwiklabs-gcp-00-6a332b3e9684.beans.dry_beans_tbl,qwiklabs-gcp-00-6a332b3e9684,data.csv
1,mlmd-pipeline,mlmd-pipeline-small-20221007143057,us-central1,qwiklabs-gcp-00-6a332b3e9684.beans.dry_beans_t...,qwiklabs-gcp-00-6a332b3e9684,data.csv
2,mlmd-pipeline,mlmd-pipeline-small-20221007135654,us-central1,qwiklabs-gcp-00-6a332b3e9684.beans.dry_beans_t...,qwiklabs-gcp-00-6a332b3e9684,data.csv
3,mlmd-pipeline,mlmd-pipeline-small-20221007134812,us-central1,qwiklabs-gcp-00-6a332b3e9684.beans.dry_beans_t...,qwiklabs-gcp-00-6a332b3e9684,data.csv
4,mlmd-pipeline,mlmd-pipeline-small-20221007134040,us-central1,qwiklabs-gcp-00-6a332b3e9684.beans.dry_beans_t...,qwiklabs-gcp-00-6a332b3e9684,data.csv
5,mlmd-pipeline,mlmd-pipeline-small-20221007133345,us-central1,qwiklabs-gcp-00-6a332b3e9684.beans.dry_bean_tb...,qwiklabs-gcp-00-6a332b3e9684,data.csv


In [None]:
# Plot accuracy against dataset size for runs that have logged both metrics.
# Rows are sorted by dataset size so the line runs left to right instead of
# zig-zagging in DataFrame row order; runs without metrics are dropped.
METRIC_COLUMNS = ["metric.dataset_size", "metric.accuracy"]
runs_with_metrics = df.dropna(subset=METRIC_COLUMNS).sort_values("metric.dataset_size")

fig, ax = plt.subplots()
ax.plot(
    runs_with_metrics["metric.dataset_size"],
    runs_with_metrics["metric.accuracy"],
    marker="o",
    label="Accuracy",
)
ax.set_title("Accuracy and dataset size")
ax.set_xlabel("Dataset size (rows)")
ax.set_ylabel("Accuracy (%)")
ax.legend(loc=4)
plt.show()

## Querying Pipeline Metrics
Getting all Model artifacts

In [None]:
# Build a Metadata Service client bound to the regional API endpoint.
API_ENDPOINT = "{}-aiplatform.googleapis.com".format(REGION)
metadata_client_options = {"api_endpoint": API_ENDPOINT}
metadata_client = aiplatform_v1.MetadataServiceClient(
    client_options=metadata_client_options
)

In [None]:
# List every artifact in the default metadata store whose schema is system.Model.
MODEL_FILTER = "schema_title = \"system.Model\""
model_store_parent = "projects/{0}/locations/{1}/metadataStores/default".format(PROJECT_ID, REGION)
artifact_request = aiplatform_v1.ListArtifactsRequest(
    parent=model_store_parent,
    filter=MODEL_FILTER,
)
model_artifacts = metadata_client.list_artifacts(artifact_request)

## Filtering objects and displaying in a Dataframe

In [None]:
# Request artifacts created after the cutoff date that are in the LIVE state.
LIVE_FILTER = "create_time > \"2021-08-10T00:00:00-00:00\" AND state = LIVE"
artifact_req = dict(
    parent="projects/{0}/locations/{1}/metadataStores/default".format(PROJECT_ID, REGION),
    filter=LIVE_FILTER,
)
live_artifacts = metadata_client.list_artifacts(artifact_req)

# Display data in Dataframe
# Walk the result exactly once, then split the rows into per-column lists.
artifact_rows = [
    (artifact.uri, artifact.create_time, artifact.schema_title)
    for artifact in live_artifacts
]
uris, create_times, schema_titles = zip(*artifact_rows) if artifact_rows else ((), (), ())
data = {
    'uri': list(uris),
    'createTime': list(create_times),
    'type': list(schema_titles),
}
# Note: this rebinds `df`, which previously held the pipeline-runs frame.
df = pd.DataFrame.from_dict(data)
df