# Vertex pipeline

Requirements: See [notebook 4b](04b_setup_pipeline_resources.ipynb)

This notebook takes the BQML PMI matrix factorization and ANN deployment work defined in notebooks 01 and 02 and creates a queryable ScaNN index, as seen in notebook 05. If the Keras model needs to be converted, the steps below should explain how to productionize it. Many of these steps are explained in this [Vertex AI samples notebook](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/master/notebooks/official/pipelines/pipelines_intro_kfp.ipynb).

The goals of this notebook are:
1. Define the tasks that will be encapsulated in pipeline components
2. Define the pipeline
3. Run and monitor the pipeline

In [5]:
# ### Install packages - restart kernel if installed

# %%capture --no-stderr

# !pip3 install kfp google-cloud-aiplatform --upgrade

In [6]:
from datetime import datetime
import os

PROJECT_ID = "rec-ai-demo-326116"  # @param {type:"string"}
REGION = "us-central1"  # @param {type: "string"}
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
BUCKET = 'rec_bq_jsw' # Change to the bucket you created.
BUCKET_NAME = f'gs://{BUCKET}'
DATASET_NAME="css_retail"
embeddings_table_name = 'item_embeddings'
output_dir = f'gs://{BUCKET}/bqml/item_embeddings'
temp_location = f'gs://{BUCKET}/dataflow_tmp'
dataflow_job_code = f'gs://{BUCKET}/embeddings_exporter/beam_kfp2.py'
PROJECT_NUMBER = '733956866731'
DEPLOYED_INDEX_ID = "ann_prod50_deployed3"
SA_NAME = "sa-pipeline@rec-ai-demo-326116.iam.gserviceaccount.com"


In [7]:
# Get the active service account from gcloud
shell_output = !gcloud auth list 2>/dev/null
SERVICE_ACCOUNT = shell_output[2].strip()
print("Service Account:", SERVICE_ACCOUNT)

Service Account: 733956866731-compute@developer.gserviceaccount.com
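
Indexing `shell_output[2]` depends on the exact line layout of `gcloud auth list`. A slightly more defensive sketch follows, assuming the default table output in which the active account is the row marked with `*`. The helper name `parse_active_account` and the sample text are illustrative only.

```python
from typing import List, Optional


def parse_active_account(lines: List[str]) -> Optional[str]:
    """Return the account on the row marked with '*', or None if absent."""
    for line in lines:
        stripped = line.strip()
        if stripped.startswith("*"):
            # Drop the marker and surrounding whitespace, keep the account.
            return stripped.lstrip("*").strip()
    return None


# Sample text shaped like the default `gcloud auth list` table (assumed layout).
sample_output = [
    "                  Credentialed Accounts",
    "ACTIVE  ACCOUNT",
    "*       733956866731-compute@developer.gserviceaccount.com",
]

print(parse_active_account(sample_output))
```

In the notebook, the list captured by `!gcloud auth list 2>/dev/null` could be passed to the helper in place of `sample_output`.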


### Set service account access for Vertex Pipelines
Run the following commands to grant your service account access to read and write pipeline artifacts in the bucket that you created in the previous step -- you only need to run these once per service account.

In [8]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_NAME

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_NAME

No changes made to gs://rec_bq_jsw/
No changes made to gs://rec_bq_jsw/
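
The two commands above differ only in the role being granted. A small sketch that builds the same `gsutil iam ch` argument lists is shown below; `build_iam_commands` is a hypothetical helper, and it only constructs the commands rather than running them.

```python
from typing import List

# Roles granted in the cell above.
PIPELINE_ROLES = ["roles/storage.objectCreator", "roles/storage.objectViewer"]


def build_iam_commands(service_account: str, bucket_uri: str) -> List[List[str]]:
    """Build one `gsutil iam ch` argument list per role."""
    return [
        ["gsutil", "iam", "ch", f"serviceAccount:{service_account}:{role}", bucket_uri]
        for role in PIPELINE_ROLES
    ]


commands = build_iam_commands(
    "733956866731-compute@developer.gserviceaccount.com", "gs://rec_bq_jsw"
)
for cmd in commands:
    print(" ".join(cmd))
```

Each list could then be handed to `subprocess.run(cmd, check=True)` on a machine where `gsutil` is installed and authenticated.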


In [9]:
import google.cloud.aiplatform as aip
# API service endpoint
API_ENDPOINT = "{}-aiplatform.googleapis.com".format(REGION)

PIPELINE_ROOT = "{}/pipeline_root/intro".format(BUCKET_NAME)

In [10]:
from kfp import dsl
from kfp import compiler
from kfp.v2.dsl import component

## Initialize Vertex SDK for Python
Initialize the Vertex SDK for Python for your project and corresponding bucket.

In [11]:
aip.init(project=PROJECT_ID, staging_bucket=BUCKET_NAME)

### Set up BigQuery DDLs for the pipelines
These functions make BigQuery (BQ) calls and train the BQML models. They follow [this guide](https://medium.com/google-cloud/using-bigquery-and-bigquery-ml-from-kubeflow-pipelines-991a2fa4bea8). The result is a reusable component that interfaces with BQ via query strings.

### First component: run a BQ query (used later to call stored procs such as `sp_ComputePMI`)

In [12]:
from typing import NamedTuple, List

In [13]:
@component(output_component_file="bqml_scann_pipeline.yaml", #just an example of how you can create a component artifact
           base_image="python:3.9", 
           packages_to_install=['google-cloud-bigquery==2.18.0']
          )
def run_a_bq_call(
  project: str, query: str) -> NamedTuple('Outputs', [('RESULT', str)]):
    from google.cloud import bigquery
    bq_client = bigquery.Client(project=project)
    j = bq_client.query(query).result()

    return (
    str(j),
    )
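
The body of a component like `run_a_bq_call` can be checked locally before it is wrapped in `@component`. The sketch below swaps BigQuery for a stub. `FakeBigQueryClient`, `FakeQueryJob`, and `run_query` are hypothetical names; only the `query(...).result()` call shape and the `NamedTuple` output are taken from the cell above.

```python
from typing import NamedTuple

Outputs = NamedTuple("Outputs", [("RESULT", str)])


class FakeQueryJob:
    """Stand-in for a query job; returns canned rows from result()."""

    def __init__(self, rows):
        self._rows = rows

    def result(self):
        return self._rows


class FakeBigQueryClient:
    """Stand-in for a BigQuery client; records the queries it receives."""

    def __init__(self):
        self.queries = []

    def query(self, sql):
        self.queries.append(sql)
        return FakeQueryJob(rows=[("ok",)])


def run_query(client, query: str) -> Outputs:
    """Same logic as the component body, with the client passed in."""
    rows = client.query(query).result()
    return Outputs(RESULT=str(rows))


client = FakeBigQueryClient()
output = run_query(client, "SELECT 1")
print(output.RESULT)
```

Passing the client in as an argument is a choice made for this sketch only; the component itself has to create its client inside the function body, because the function runs in its own container.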

#### Download the stored procedures for the models to local storage and load them into BQ

In [14]:
#this loads the sprocs found in {BUCKET}/sql_scripts - loaded up in 00_prep_bq...

@component(base_image="python:3.9", packages_to_install=['google-cloud-bigquery', 
                                'google-cloud-storage']
          )
def load_sprocs(
  bucket: str, dataset_name: str, project: str, bucket_name: str) -> NamedTuple('Outputs', [('COMPLETE_CODE', str)]):
    import os
    from google.cloud import storage
    from google.cloud import bigquery
    
    client = bigquery.Client(project=project)
    
    sql_scripts = dict()
    BQ_DATASET_NAME = dataset_name
    # Local directory that the stored-procedure scripts are downloaded into
    SQL_SCRIPTS_DIR = "downloads_/"

    # exist_ok avoids a failure if the directory is already present
    os.makedirs(SQL_SCRIPTS_DIR, exist_ok=True)
    files = ['sp_ExractEmbeddings.sql', 'sp_ComputePMI.sql', 'sp_TrainItemMatchingModel.sql']
    
    storage_client = storage.Client(project)

    # Use a separate name so the `bucket` string parameter is not shadowed
    gcs_bucket = storage_client.bucket(bucket)
    
    for file in files:
        blob = gcs_bucket.blob(f"sql_scripts/{file}")
        blob.download_to_filename(os.path.join(SQL_SCRIPTS_DIR, file))

    # Read each script and substitute the dataset name placeholder
    for script_file in [file for file in os.listdir(SQL_SCRIPTS_DIR) if file.endswith('.sql')]:
        script_file_path = os.path.join(SQL_SCRIPTS_DIR, script_file)
        with open(script_file_path, 'r') as f:
            sql_script = f.read()
        sql_script = sql_script.replace('@DATASET_NAME', BQ_DATASET_NAME)
        sql_scripts[script_file] = sql_script

    # Execute each script to create the stored procedure in BQ
    for script_file in sql_scripts:
        print(f'Executing {script_file} script...')
        query = sql_scripts[script_file]
        query_job = client.query(query)
        result = query_job.result()

    return (
    str(result),
    )
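
The placeholder substitution step in `load_sprocs` can be tried without GCS or BigQuery. The sketch below writes a made-up script to a temporary directory and fills in `@DATASET_NAME`. `render_sql_scripts` and `sp_Example.sql` are hypothetical names used for illustration.

```python
import os
import tempfile
from typing import Dict


def render_sql_scripts(scripts_dir: str, dataset_name: str) -> Dict[str, str]:
    """Read every .sql file in scripts_dir and fill in the dataset placeholder."""
    rendered = {}
    for name in sorted(os.listdir(scripts_dir)):
        if not name.endswith(".sql"):
            continue
        with open(os.path.join(scripts_dir, name), "r") as f:
            rendered[name] = f.read().replace("@DATASET_NAME", dataset_name)
    return rendered


# Write a made-up script to a temporary directory and render it.
with tempfile.TemporaryDirectory() as tmp_dir:
    with open(os.path.join(tmp_dir, "sp_Example.sql"), "w") as f:
        f.write("CREATE OR REPLACE PROCEDURE @DATASET_NAME.sp_Example() BEGIN SELECT 1; END")
    with open(os.path.join(tmp_dir, "notes.txt"), "w") as f:
        f.write("not a SQL script")
    scripts = render_sql_scripts(tmp_dir, "css_retail")

print(scripts["sp_Example.sql"])
```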


### Import and use the Dataflow component - used to extract embeddings from BQ -> GCS

In [15]:
import kfp.components as comp

dataflow_python_op = comp.load_component_from_file('dataflow-launch_python-component.yaml')

Copying file://embeddings_exporter/setup.py [Content-Type=text/x-python]...
Copying file://embeddings_exporter/pipeline_kfp.py [Content-Type=text/x-python]...
Copying file://embeddings_exporter/__init__.py [Content-Type=text/x-python]...  
Copying file://embeddings_exporter/pipeline.py [Content-Type=text/x-python]...  
/ [4 files][  5.5 KiB/  5.5 KiB]                                                
==> NOTE: You are performing a sequence of gsutil operations that may
run significantly faster if you instead use gsutil -m cp ... Please
see the -m section under "gsutil help options" for further information
about when gsutil -m can be advantageous.

Copying file://embeddings_exporter/beam_kfp2.py [Content-Type=text/x-python]...
Copying file://embeddings_exporter/runner.py [Content-Type=text/x-python]...    
Copying file://embeddings_exporter/embedding_exporter.egg-info/SOURCES.txt [Content-Type=text/plain]...
Copying file://embeddings_exporter/embedding_exporter.egg-info/top_level.txt [Con

#### Below are the BQ query strings to load the data and produce the embeddings for the model

In [16]:

create_item_view = f"""
CREATE or REPLACE VIEW `{PROJECT_ID}.{DATASET_NAME}.vw_item_groups`
AS
SELECT 
  userInfo.userID as group_id, 
  pd.id as item_id 
FROM 
  `{PROJECT_ID}.{DATASET_NAME}.purchase_complete`,
  UNNEST(productEventDetail.productDetails) as pd
"""

create_cooc_matrix_query = f"""
CREATE TABLE IF NOT EXISTS {DATASET_NAME}.item_cooc
AS SELECT 0 AS item1_Id, 0 AS item2_Id, 0 AS cooc, 0 AS pmi;
"""

create_bqml_model_query = f"""
CREATE MODEL IF NOT EXISTS {DATASET_NAME}.item_matching_model
OPTIONS(
    MODEL_TYPE='matrix_factorization', 
    USER_COL='item1_Id', 
    ITEM_COL='item2_Id',
    RATING_COL='score'
)
AS
SELECT 0 AS item1_Id, 0 AS item2_Id, 0 AS score;
"""

compute_PMI_query = f"""
DECLARE min_item_frequency INT64;
DECLARE max_group_size INT64;

SET min_item_frequency = 15;
SET max_group_size = 100;

CALL {DATASET_NAME}.sp_ComputePMI(min_item_frequency, max_group_size);
"""

train_item_matching_query = f"""
DECLARE dimensions INT64 DEFAULT 50;
CALL {DATASET_NAME}.sp_TrainItemMatchingModel(dimensions)
"""

extract_embeddings_query = f"""
CALL {DATASET_NAME}.sp_ExractEmbeddings() 
"""

export_embeddings_query = f"""
CREATE TEMP FUNCTION array_int_to_string(int_array ARRAY<FLOAT64>) 
  RETURNS ARRAY<STRING> LANGUAGE js as "return int_array.map(x => x+'')";
  
EXPORT DATA
OPTIONS (uri='{BUCKET_NAME}/bqml/item_embeddings/*.csv',
  format='CSV',
  overwrite=true) AS
select item_id, array_to_string(array_int_to_string(embedding), ',')  as embedding_string 
from `{DATASET_NAME}.item_embeddings`
"""
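
The export query turns each embedding array into a single comma-separated string column. Below is a Python sketch of that row format and of how a downstream reader might parse it back. The function names are hypothetical, and Python's float formatting can differ slightly from the JavaScript UDF's (for example `3.0` versus `3`).

```python
from typing import List, Tuple


def to_export_row(item_id: int, embedding: List[float]) -> Tuple[int, str]:
    """Mimic the query's (item_id, embedding_string) output row."""
    return item_id, ",".join(str(x) for x in embedding)


def parse_embedding_string(embedding_string: str) -> List[float]:
    """Recover the float vector from the comma-separated string."""
    return [float(x) for x in embedding_string.split(",")]


row = to_export_row(42, [0.25, -1.5, 3.0])
print(row)
print(parse_embedding_string(row[1]))
```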

#### Create and deploy the index (ScaNN only)

##### Another likely use case is updating an existing ANN (ScaNN) index

In [17]:
@component(base_image="python:3.9", 
           packages_to_install=['grpcio-tools', 'google-cloud-storage', 'protobuf', 'google-cloud-aiplatform']
          )
def create_index(
  project_id: str, bucket: str, region: str,
project_number: str, deployed_index_id: str,
sa_name: str) -> NamedTuple('Outputs', [('COMPLETE_CODE', str)]):
    import os
    import time
    import subprocess
    from google.cloud import storage

    import grpc
    from google.cloud import aiplatform_v1beta1
    from google.protobuf import struct_pb2
    
    NETWORK_NAME = 'default'
    PEERING_RANGE_NAME = 'google-reserved-range'
    PROJECT_ID = project_id
    BUCKET = bucket
    
    BUCKET_NAME = f"gs://{BUCKET}/bqml/item_embeddings"  # @param {type:"string"}
    REGION = region  # @param {type:"string"}
    PARENT = "projects/{}/locations/{}".format(PROJECT_ID, REGION)
    ENDPOINT = "{}-aiplatform.googleapis.com".format(REGION)
    DEPLOYED_INDEX_ID = deployed_index_id
    #AUTH_TOKEN = subprocess.check_output("gcloud auth print-access-token", shell=True)
    #AUTH_TOKEN = [str(AUTH_TOKEN2,'utf-8').replace("\n", "")]
    PROJECT_NUMBER = project_number
    
    index_client = aiplatform_v1beta1.IndexServiceClient(
    client_options=dict(api_endpoint=ENDPOINT)
    )
    DIMENSIONS = 50
    DISPLAY_NAME = "retail_demo_matching_engine"

    treeAhConfig = struct_pb2.Struct(
        fields={
            "leafNodeEmbeddingCount": struct_pb2.Value(number_value=500),
            "leafNodesToSearchPercent": struct_pb2.Value(number_value=7),
        }
    )

    algorithmConfig = struct_pb2.Struct(
        fields={"treeAhConfig": struct_pb2.Value(struct_value=treeAhConfig)}
    )

    config = struct_pb2.Struct(
        fields={
            "dimensions": struct_pb2.Value(number_value=DIMENSIONS),
            "approximateNeighborsCount": struct_pb2.Value(number_value=150),
            "distanceMeasureType": struct_pb2.Value(string_value="DOT_PRODUCT_DISTANCE"),
            "algorithmConfig": struct_pb2.Value(struct_value=algorithmConfig),
        }
    )

    metadata = struct_pb2.Struct(
        fields={
            "config": struct_pb2.Value(struct_value=config),
            "contentsDeltaUri": struct_pb2.Value(string_value=BUCKET_NAME), # 'tmp/' | BUCKET_NAME
        }
    )

    ann_index = {
        "display_name": DISPLAY_NAME,
        "description": "Retail 50 Index",
        "metadata": struct_pb2.Value(struct_value=metadata),
    }
    
    ann_index = index_client.create_index(parent=PARENT, index=ann_index)

    # Poll the operation until it has completed successfully.
    # This will take ~50 min.
    
    print("starting index creation")
    
    while not ann_index.done():
        print("Poll the operation to create index...")
        time.sleep(60)
        
    print("index created")    
    #create an indexEndpoint
    index_endpoint_client = aiplatform_v1beta1.IndexEndpointServiceClient(client_options=dict(api_endpoint=ENDPOINT))

    VPC_NETWORK_NAME = "projects/{}/global/networks/{}".format(PROJECT_NUMBER, NETWORK_NAME)
    
    index_endpoint = {
    "display_name": "index_endpoint_for_demo",
    "network": VPC_NETWORK_NAME,}
    
    r = index_endpoint_client.create_index_endpoint(parent=PARENT, index_endpoint=index_endpoint)
    
    INDEX_ENDPOINT_NAME = r.result().name
    INDEX_RESOURCE_NAME = ann_index.result().name
    
    deploy_ann_index = {
    "id": DEPLOYED_INDEX_ID,
    "display_name": DEPLOYED_INDEX_ID,
    "index": INDEX_RESOURCE_NAME,
    }
    
    res = index_endpoint_client.deploy_index(index_endpoint=INDEX_ENDPOINT_NAME, deployed_index=deploy_ann_index)
    print(f"Starting endpoint deployment for {INDEX_ENDPOINT_NAME}")
    
    while not res.done():
        print("Poll the operation to deploy index...")
        time.sleep(60)
    print("success")
    
    return (
    str(200),
    )
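
The nested `struct_pb2.Struct` values in `create_index` describe a plain JSON-like document. For readability, the same metadata can be sketched as a Python dict. The values are copied from the component above, while `build_index_metadata` is a hypothetical helper and nothing is sent to any API here.

```python
import json
from typing import Any, Dict


def build_index_metadata(contents_uri: str, dimensions: int = 50) -> Dict[str, Any]:
    """Mirror the index metadata that create_index assembles from Structs."""
    return {
        "config": {
            "dimensions": dimensions,
            "approximateNeighborsCount": 150,
            "distanceMeasureType": "DOT_PRODUCT_DISTANCE",
            "algorithmConfig": {
                "treeAhConfig": {
                    "leafNodeEmbeddingCount": 500,
                    "leafNodesToSearchPercent": 7,
                }
            },
        },
        "contentsDeltaUri": contents_uri,
    }


metadata = build_index_metadata("gs://rec_bq_jsw/bqml/item_embeddings")
print(json.dumps(metadata, indent=2))
```

protobuf's `Struct` also provides an `update()` method that accepts a dict, which may be a shorter way to build the same message. Treat that as a suggestion to verify against the installed protobuf version.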
    

## Declare the pipeline

In [19]:
from kfp.dsl.types import GCPProjectID, GCSPath
import json


@dsl.pipeline(name="bqml-scann-demo",
             description="a bqml matching engine demo",
             pipeline_root=PIPELINE_ROOT)
def pipeline(project:str = PROJECT_ID,
             temp_location:str = temp_location,
             dataflow_job_code:str  = dataflow_job_code,
             create_item_view:str = create_item_view,
             compute_PMI_query: str = compute_PMI_query,
             train_item_matching_query: str=train_item_matching_query,
             extract_embeddings_query: str=extract_embeddings_query,
             dataset_name: str = DATASET_NAME, 
             embeddings_table_name: str = embeddings_table_name,
             output_dir: str = output_dir, 
             region: str = REGION,
             create_cooc_matrix_query: str = create_cooc_matrix_query,
             create_bqml_model_query: str = create_bqml_model_query,
             bucket_name: str = BUCKET_NAME,
             export_embeddings_query: str = export_embeddings_query,
             bucket: str = BUCKET, project_number: str=PROJECT_NUMBER,
             deployed_index_id: str = DEPLOYED_INDEX_ID,
             sa_name: str = SA_NAME,
             args: List = json.dumps([
                 '--bq_dataset_name', DATASET_NAME, 
                 '--embeddings_table_name',  embeddings_table_name, 
                 '--output_dir', output_dir,
                 '--project_id', PROJECT_ID,
                 '--output', output_dir
             ]),
            
            ):
    
   
    ###### NOTEBOOK 0 TASKS
    item_view_created = run_a_bq_call(project, create_item_view)
    
    
    create_cooc_matrix = run_a_bq_call(project, create_cooc_matrix_query).after(item_view_created)
    
    create_bqml_model = run_a_bq_call(project, create_bqml_model_query).after(create_cooc_matrix)
    
    loaded_sprocs = load_sprocs(bucket_name=bucket_name, bucket = bucket, 
                                dataset_name=dataset_name, project=project).after(create_bqml_model)
    
    ###### NOTEBOOK 1 TASKS
    compute_bq_pmi_task = run_a_bq_call(project, compute_PMI_query).after(loaded_sprocs)
    
    # Train only after the PMI step has finished, since training is expected to use its output
    train_bq_item_match_task = run_a_bq_call(project, train_item_matching_query).after(compute_bq_pmi_task)
    ###### NOTEBOOK 2 TASKS
    create_bq_embeddings_task = run_a_bq_call(project, extract_embeddings_query).after(train_bq_item_match_task)
    
    export_emb_gcs_df = dataflow_python_op(project=project,
                                          python_file_path=dataflow_job_code,
                                          location = region,
                                          staging_dir = temp_location, args=args).after(create_bq_embeddings_task)
    
    ##### NOTEBOOK 5 TASKS - SET UP THE INDEX
    created_index = create_index(project_id=project, bucket=bucket, region=region, 
                                 project_number=project_number, deployed_index_id=deployed_index_id,
                                sa_name=sa_name).after(export_emb_gcs_df)
    
    return (
    str(200),
    )
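
Each `.after(...)` call adds an ordering edge between tasks that otherwise share no data dependency. As a rough illustration, the first few edges of the pipeline above can be written as a dependency map and ordered with the standard library's `graphlib` (Python 3.9 or later). This mirrors only the ordering, not how Vertex Pipelines actually schedules work.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# task -> set of tasks it must run after (a subset of the pipeline above)
dependencies = {
    "item_view_created": set(),
    "create_cooc_matrix": {"item_view_created"},
    "create_bqml_model": {"create_cooc_matrix"},
    "loaded_sprocs": {"create_bqml_model"},
    "compute_bq_pmi_task": {"loaded_sprocs"},
}

# static_order() yields each task only after all of its predecessors.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Tasks with no path between them in such a map are free to run concurrently, which is why every ordering that matters has to be stated explicitly with `.after(...)`.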


### Now that the reusable components are wired into a pipeline, compile it into a JSON pipeline spec

In [20]:
from kfp.v2 import compiler  # noqa: F811

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="intro_pipeline.json"
)

#### Execute the pipeline on Vertex

In [None]:
from google.cloud.aiplatform import pipeline_jobs

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

DISPLAY_NAME = "matching-engine-deployment" + TIMESTAMP

pipeline_job = pipeline_jobs.PipelineJob(
    display_name=DISPLAY_NAME,
    job_id=DISPLAY_NAME,
    template_path="intro_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    # parameter_values=pipeline_params,
    enable_caching=True
)

pipeline_job.run()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/733956866731/locations/us-central1/pipelineJobs/matching-engine-deployment20210927210600
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/733956866731/locations/us-central1/pipelineJobs/matching-engine-deployment20210927210600')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/matching-engine-deployment20210927210600?project=733956866731
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/733956866731/locations/us-central1/pipelineJobs/matching-engine-deployment20210927210600 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/733956866731/
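
The job ID in the run above is a fixed prefix plus a timestamp. The sketch below builds such an ID and checks it before submission. The naming rule encoded in `JOB_ID_PATTERN` is an assumption (a lowercase letter first, then lowercase letters, digits, or hyphens, 128 characters at most) and should be checked against the current Vertex AI documentation; `make_job_id` is a hypothetical helper.

```python
import re
from datetime import datetime

# Assumed naming rule for job IDs: lowercase letter first, then lowercase
# letters, digits, or hyphens, 128 characters at most.
JOB_ID_PATTERN = re.compile(r"^[a-z][-a-z0-9]{0,127}$")


def make_job_id(prefix: str, now: datetime) -> str:
    """Append a sortable timestamp to the prefix and validate the result."""
    job_id = prefix + now.strftime("%Y%m%d%H%M%S")
    if not JOB_ID_PATTERN.match(job_id):
        raise ValueError(f"Invalid job id: {job_id!r}")
    return job_id


job_id = make_job_id("matching-engine-deployment", datetime(2021, 9, 27, 21, 6, 0))
print(job_id)
```

Validating locally gives a clearer error than waiting for the service to reject a name that contains, for example, underscores or uppercase letters.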

# Results - a deployed, queryable index

[<img src="figures/Screen Shot 2021-09-25 at 11.09.49 PM.png">]