### The aim of this demo is to show the end to end ML workflow in a simple way in Vertex AI using some MLOps components such as Model Monitoring.

### Prepare environment, install libraries, set up environment variables.

In [None]:
print("Installing libraries")
USER_FLAG = "--user"
! pip3 install {USER_FLAG} --quiet --upgrade google-cloud-pipeline-components kfp
! pip3 install {USER_FLAG} --quiet --upgrade google-cloud-aiplatform google-cloud-bigquery
! pip3 install {USER_FLAG} --quiet db-dtypes

Installing libraries


In [None]:
#restart kernel after the installation of libraries
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

{'status': 'ok', 'restart': True}

In [1]:
shell_output = !gcloud config list --format 'value(core.project)' 
PROJECT_ID = shell_output[0]
print("Project ID: ", PROJECT_ID)


Project ID:  tadelle-372416


In [2]:
# timestamp - refresh when resubmitting pipeline runs
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
print(TIMESTAMP)

20230124125520


In [3]:
BUCKET_NAME = "tadelle-bucket" 
BUCKET_URI = f"gs://{BUCKET_NAME}"
! gsutil ls -al $BUCKET_URI
REGION = "us-central1"
print(BUCKET_NAME+" in "+REGION)

                                 gs://tadelle-bucket/finance/
                                 gs://tadelle-bucket/hcls/
                                 gs://tadelle-bucket/telco/
tadelle-bucket in us-central1


In [None]:
SERVICE_ACCOUNT = "vertex-pipelines-sa@" + PROJECT_ID + ".iam.gserviceaccount.com"
print(SERVICE_ACCOUNT)

vertex-pipelines-sa@tadelle-372416.iam.gserviceaccount.com


### Import packages

In [4]:
import sys
from typing import NamedTuple
import os

from google.cloud import aiplatform as vertex
from google.cloud import bigquery
from google_cloud_pipeline_components import aiplatform as vertex_pipeline_components
from google_cloud_pipeline_components.experimental import bigquery as bq_components

from kfp.v2 import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Input, Metrics, Output, component, ClassificationMetrics)

#import kfp
#from kfp.v2 import dsl

In [7]:
PIPELINE_JSON_PKG_PATH = "churn-pipeline.json"
PIPELINE_ROOT = f"gs://{BUCKET_NAME}/pipeline_root"
DATA_FOLDER = f"{BUCKET_NAME}/data"

BQ_DATASET = "demo"  
BQ_DATASET_TABLE = "churn"
BQ_LOCATION = "us-central1"  
BQ_LOCATION = BQ_LOCATION.upper()
BQML_EXPORT_LOCATION = f"gs://{BUCKET_NAME}/artifacts/bqml"

DISPLAY_NAME = "churn"
ENDPOINT_DISPLAY_NAME = f"{DISPLAY_NAME}_endpoint"

image_prefix = REGION.split("-")[0]
BQML_SERVING_CONTAINER_IMAGE_URI = (    
    f"{image_prefix}-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest")


### Pipeline components
### Split the data for Train/Test using BigQuery

In [8]:
@component(
    base_image="python:3.9",
    packages_to_install=["google-cloud-bigquery","pandas","pyarrow","fsspec"],
)  # pandas, pyarrow and fsspec required to export bq data to csv

def split_datasets(
    project: str,
    bq_location: str,
    bq_dataset: str,
    bq_dataset_table: str
) -> NamedTuple( "Outputs", [("dataset_uri", str), ("dataset_bq_uri", str)]):

    from collections import namedtuple
    from google.cloud import bigquery

    client = bigquery.Client(project=project, location=bq_location)

    def split_dataset():
        
        train_dataset_table = f"{project}.{bq_dataset}.{bq_dataset_table}_train"
        test_dataset_table = f"{project}.{bq_dataset}.{bq_dataset_table}_test"
        
        split_query = f"""
        CREATE OR REPLACE TABLE `{test_dataset_table}` AS
        SELECT *
        FROM `{project}.{bq_dataset}.{bq_dataset_table}`
        where RAND() <= 20/100;
        
        CREATE OR REPLACE TABLE `{train_dataset_table}` AS
        SELECT *
        FROM
         (
         SELECT *
         FROM `{project}.{bq_dataset}.{bq_dataset_table}` 
         EXCEPT distinct select * from `{test_dataset_table}`
         );
        """
        print("Splitting the dataset")
        
        query_job = client.query(split_query)  
        query_job.result()
        
        return (train_dataset_table, test_dataset_table) 

    dataset_uri = split_dataset()
    
    print("splitting dataset as training and test...")
    dataset_bq_uri = "bq://" + dataset_uri

    print(f"dataset: {dataset_uri}")

    result_tuple = namedtuple(
        "Outputs",
        ["dataset_uri", "dataset_bq_uri"],
    )
    return result_tuple(
        dataset_uri=str(dataset_uri),
        dataset_bq_uri=str(dataset_bq_uri),
    )

In [13]:
pipeline_params = {
    "project": PROJECT_ID, 
    "location": BQ_LOCATION,
    "bq_dataset": BQ_DATASET,
    "bq_dataset_table": BQ_DATASET_TABLE
}

In [14]:
@dsl.pipeline(name=DISPLAY_NAME, description="Simple pipeline")
def train_pipeline(
    project: str,
    location: str,
    bq_dataset: str,
    bq_dataset_table: str
):

    # Splits the BQ dataset using a custom component.
    split_datasets_op = split_datasets(project=project, bq_location=location, bq_dataset=bq_dataset, bq_dataset_table=bq_dataset_table)

In [15]:
compiler.Compiler().compile(
    pipeline_func=train_pipeline,
    package_path=PIPELINE_JSON_PKG_PATH,
)

vertex.init(project=PROJECT_ID, location=BQ_LOCATION)

In [16]:
pipeline_job = vertex.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path=PIPELINE_JSON_PKG_PATH,
    pipeline_root=PIPELINE_ROOT,
    job_id="churn-pipeline-{0}".format(TIMESTAMP),
    parameter_values=pipeline_params,
    enable_caching=True
)

response = pipeline_job.submit()

Creating PipelineJob


PermissionDenied: 403 Permission 'aiplatform.pipelineJobs.create' denied on resource '//aiplatform.googleapis.com/projects/tadelle-372416/locations/US-CENTRAL1' (or it may not exist). [reason: "IAM_PERMISSION_DENIED"
domain: "aiplatform.googleapis.com"
metadata {
  key: "permission"
  value: "aiplatform.pipelineJobs.create"
}
metadata {
  key: "resource"
  value: "projects/tadelle-372416/locations/US-CENTRAL1"
}
]