## Orchestrate RecSys workflow with Vertex AI Pipelines

In [27]:
PROJECT_ID = 'hybrid-vertex'  # <--- TODO: CHANGE THIS
LOCATION = 'us-central1'

!gcloud config set project {PROJECT_ID}

Updated property [core/project].


### pip & package

In [None]:
# !pip install google-cloud-aiplatform==1.17.0 --upgrade
# !pip install google-cloud-pipeline-components==1.0.19 --upgrade
# !pip install kfp==1.8.13 --upgrade

In [2]:
# Print the installed versions of the three SDKs this notebook uses
# (kfp, google_cloud_pipeline_components, google.cloud.aiplatform).
# Each line starts its own python3 process through the shell.
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"
! python3 -c "import google.cloud.aiplatform; print('aiplatform SDK version: {}'.format(google.cloud.aiplatform.__version__))"

KFP SDK version: 1.8.13
google_cloud_pipeline_components version: 1.0.19
aiplatform SDK version: 1.17.0


In [3]:
import json
import os
import time
import pandas as pd
import numpy as np
import sys

# Pipelines
from typing import Any, Callable, Dict, NamedTuple, Optional, List
from google_cloud_pipeline_components import aiplatform as gcc_aip
from google_cloud_pipeline_components.types import artifact_types

# Kubeflow SDK
import kfp
# from kfp.v2 import dsl
# import kfp.v2.dsl
from kfp.v2 import compiler
from kfp.v2.google.client import AIPlatformClient
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component, Metrics)

# GCP
from google.cloud import aiplatform as vertex_ai
from google.cloud.aiplatform import pipeline_jobs
# from google.cloud import bigquery
# from google.cloud import storage

In [20]:
# Version tag; used in the pipeline name below.
VERSION = 'pipev1'

# Local folder (relative to the notebook's working directory) that holds
# the generated pipeline component sources.
REPO_DOCKER_PATH_PREFIX = 'src'

PIPE_USER = 'jtott'
BUCKET = 'jt-tfrs-test'
BUCKET_URI = f'gs://{BUCKET}'

DOCKERNAME_TRAIN = 'Dockerfile.tfrs'

# Per-user pipeline root inside the bucket.
PIPELINE_ROOT = f'{BUCKET_URI}/pipeline_root/{PIPE_USER}'

# Use the LOCATION constant defined in the first cell instead of repeating
# the region literal, so the region only has to be changed in one place.
vertex_ai.init(project=PROJECT_ID, location=LOCATION)

## Create Pipeline Components

In [6]:
# Make folder for Python training script
! rm -rf {REPO_DOCKER_PATH_PREFIX}/pipelines
! mkdir {REPO_DOCKER_PATH_PREFIX}/pipelines

### TODO: Build custom train image

In [None]:
!export PWD=pwd
!export DOCKERNAME_TRAIN=DOCKERNAME_TRAIN
!export BUCKET_URI=BUCKET_URI
!export VERSION=VERSION
! echo $PWD
! echo $DOCKERNAME_TRAIN
! echo $BUCKET_URI
! echo $VERSION

/home/jupyter/spotify-tfrs
Dockerfile.tfrs
gs://jt-tfrs-test
pipev1


In [None]:
! echo $REPO_DOCKER_PATH_PREFIX

src


In [29]:
# # # copy training Dockerfile

# !gsutil cp $PWD/src/Dockerfile.tfrs $BUCKET_URI/$VERSION/src/

# # # copy training application code

# !gsutil cp -r $PWD/src/trainer/* $BUCKET_URI/$VERSION/src/trainer/

# # # list copied files from GCS location
# !gsutil ls -Rl $BUCKET_URI/$VERSION/src/

# print(f"Copied training application code and Dockerfile to {BUCKET_URI}/{VERSION}/src/")

In [30]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/pipelines/build_custom_train_image.py

import kfp
from typing import Any, Callable, Dict, NamedTuple, Optional, List
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component, Metrics)
@component(
    base_image="gcr.io/google.com/cloudsdktool/cloud-sdk:latest",
    packages_to_install=["google-cloud-build"],
    output_component_file="./pipelines/build_custom_train_image.yaml",
)
def build_custom_train_image(
    project: str,
    gcs_train_script_path: str,
    training_image_uri: str,
) -> NamedTuple("Outputs", [("training_image_uri", str)]):
    """Build and push the custom training image with Cloud Build.

    The build copies ``<gcs_train_script_path>/trainer/`` and
    ``<gcs_train_script_path>/Dockerfile`` into the build workspace, runs
    ``docker build`` tagged with ``training_image_uri`` and pushes the image.

    Args:
        project: Project id the Cloud Build job is created in.
        gcs_train_script_path: GCS folder that contains ``Dockerfile`` and a
            ``trainer/`` sub-folder, e.g.
            ``f"{BUCKET_URI}/{APP_NAME}/{VERSION}/vertex_train/"``.
        training_image_uri: Image URI to build and push, e.g.
            ``f"gcr.io/{PROJECT_ID}/multiworker:2tower-pipe-{VERSION}"``.

    Returns:
        A one-element tuple holding ``training_image_uri`` unchanged.
    """
    # TODO: make output Artifact for image_uri

    import logging
    import os

    from google.cloud.devtools import cloudbuild_v1 as cloudbuild
    from google.protobuf.duration_pb2 import Duration

    # initialize client for cloud build
    logging.getLogger().setLevel(logging.INFO)
    build_client = cloudbuild.services.cloud_build.CloudBuildClient()

    # parse step inputs to get path to Dockerfile and training application code
    gs_dockerfile_path = os.path.join(gcs_train_script_path, "Dockerfile")
    _gcs_train_script_path = os.path.join(gcs_train_script_path, "trainer/")

    logging.info(f"training_image_uri: {training_image_uri}")

    # define build steps to pull the training code and Dockerfile
    # and build/push the custom training container image
    build = cloudbuild.Build()
    build.steps = [
        {
            "name": "gcr.io/cloud-builders/gsutil",
            "args": ["cp", "-r", _gcs_train_script_path, "."],
        },
        {
            "name": "gcr.io/cloud-builders/gsutil",
            "args": ["cp", gs_dockerfile_path, "Dockerfile"],
        },
        # Alternative (disabled): a single Kaniko step that caches
        # intermediate layers and pushes the image automatically.
        # https://cloud.google.com/build/docs/kaniko-cache
        # {
        #     "name": "gcr.io/kaniko-project/executor:latest",
        #     # "name": "gcr.io/kaniko-project/executor:v1.8.0",        # TODO; downgraded to avoid error in build
        #     # "args": [f"--destination={training_image_uri}", "--cache=true"],
        #     "args": [f"--destination={training_image_uri}", "--cache=false"],
        # },
        {
            "name": "gcr.io/cloud-builders/docker",
            "args": ["build", "-t", f"{training_image_uri}", "."],
        },
        {
            "name": "gcr.io/cloud-builders/docker",
            "args": ["push", f"{training_image_uri}"],
        },
    ]
    # override default timeout of 10min
    timeout = Duration()
    timeout.seconds = 7200
    build.timeout = timeout

    # create build
    operation = build_client.create_build(project_id=project, build=build)
    logging.info("IN PROGRESS:")
    logging.info(operation.metadata)

    # get build status (blocks until the build operation completes)
    result = operation.result()
    # Extra positional args to logging need a matching placeholder;
    # without "%s" the status is never printed and logging reports a
    # formatting error instead.
    logging.info("RESULT: %s", result.status)

    logging.info(f"training_image_uri: {training_image_uri}")

    # return step outputs
    return (
        training_image_uri,
    )

Writing src/pipelines/build_custom_train_image.py


### Create Managed TensorBoard resource

In [31]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/pipelines/create_tensorboard.py

import kfp
from typing import Any, Callable, Dict, NamedTuple, Optional, List
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component, Metrics)

@kfp.v2.dsl.component(
    base_image='python:3.9',
    packages_to_install=[
        'google-cloud-aiplatform==1.17.0',
    ],
    output_component_file="./pipelines/create_tensorboard.yaml",
)
def create_tensorboard(
    project: str,
    location: str,
    version: str,
    gcs_bucket_name: str,
    model_display_name: str,
    create_tb_resource: bool,
) -> NamedTuple('Outputs', [
                            ('tensorboard', Artifact),
                            ('tensorboard_resource_name', str),
]):
    """Create a managed Vertex AI TensorBoard, or look up an existing one.

    The TensorBoard display name is ``tb-{model_display_name}-{version}``.

    Args:
        project: Project id passed to ``aiplatform.init``.
        location: Region passed to ``aiplatform.init``.
        version: Version tag used in the display name.
        gcs_bucket_name: Currently unused by this component.
        model_display_name: Model name used in the display name.
        create_tb_resource: When true a new TensorBoard is created; otherwise
            the first existing TensorBoard with the display name is reused.

    Returns:
        ``(tensorboard, tensorboard_resource_name)`` where the resource name
        has the form
        ``projects/<number>/locations/<region>/tensorboards/<id>``.

    Raises:
        RuntimeError: If ``create_tb_resource`` is false and no TensorBoard
            with the expected display name exists.
    """
    import logging

    import google.cloud.aiplatform as aiplatform

    aiplatform.init(
        project=project,
        location=location,
    )

    TENSORBOARD_DISPLAY_NAME = f"tb-{model_display_name}-{version}"

    if create_tb_resource:
        logging.info(f"TENSORBOARD_DISPLAY_NAME: {TENSORBOARD_DISPLAY_NAME}")

        tensorboard = aiplatform.Tensorboard.create(display_name=TENSORBOARD_DISPLAY_NAME)
        tensorboard_resource_name = tensorboard.resource_name

        logging.info(f"Created tensorboard_resource_name: {tensorboard_resource_name}")

    else:
        logging.info(f"Searching for Existing TB: {TENSORBOARD_DISPLAY_NAME}")

        # List Tensorboard instances, not TensorboardExperiment: the
        # experiment listing is scoped to one parent TensorBoard and needs
        # its name, while the display name built above identifies the
        # TensorBoard instance itself.
        matches = aiplatform.Tensorboard.list(
            filter=f'display_name="{TENSORBOARD_DISPLAY_NAME}"'
        )
        if not matches:
            # Fail with an actionable message instead of a bare IndexError.
            raise RuntimeError(
                f"No existing TensorBoard named '{TENSORBOARD_DISPLAY_NAME}' "
                f"found in {project}/{location}; "
                "set create_tb_resource=True to create one."
            )

        tensorboard = matches[0]
        tensorboard_resource_name = tensorboard.resource_name
        logging.info(f"Found existing TB resource: {tensorboard_resource_name}")

    return (
        tensorboard,
        f'{tensorboard_resource_name}',
    )

Writing src/pipelines/create_tensorboard.py


### Train Custom Two-Tower model

In [None]:
import kfp
from typing import Any, Callable, Dict, NamedTuple, Optional, List
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component, Metrics)
@component(
    base_image='python:3.9',
    packages_to_install=[
        'google-cloud-aiplatform==1.17.0',
        'tensorflow==2.9.2',
        'tensorflow-recommenders==0.7.0',
        'numpy',
        'google-cloud-storage',
    ],
)
def train_custom_model(
    project: str,
    version: str,
    model_name: str,
    worker_pool_specs: dict,
    # vocab_dict_uri: str,
    model_dir: str,
    training_image_uri: str,
    tensorboard_resource_name: str,
    service_account: str,
    experiment_name: str,
    experiment_run: str,
) -> NamedTuple('Outputs', [
    ('query_tower_dir_uri', str),
    ('candidate_tower_dir_uri', str),
    ('candidate_index_dir_uri', str),
]):
    """Run the two-tower training as a Vertex AI CustomJob.

    Submits a ``CustomJob`` named ``train-{model_name}`` built from
    ``worker_pool_specs``, runs it with the given TensorBoard and service
    account, and returns the GCS folders the outputs are reported under.

    Args:
        project: Project id passed to ``vertex_ai.init``.
        version: Version tag; second path segment of the returned URIs.
        model_name: Used to build the job display name.
        worker_pool_specs: Worker pool specification handed to ``CustomJob``
            unchanged.
        model_dir: GCS location WITHOUT the ``gs://`` prefix (the prefix is
            added here).
        training_image_uri: Not read by this function; presumably already
            embedded in ``worker_pool_specs`` -- TODO confirm.
        tensorboard_resource_name: TensorBoard resource passed to ``job.run``.
        service_account: Service account passed to ``job.run``.
        experiment_name: Not read by this function.
        experiment_run: Third path segment of the returned URIs.

    Returns:
        ``(query_tower_dir_uri, candidate_tower_dir_uri,
        candidate_index_dir_uri)``, all under
        ``gs://{model_dir}/{version}/{experiment_run}/``.
    """
    from google.cloud import aiplatform as vertex_ai
    import logging

    vertex_ai.init(
        project=project,
        location='us-central1',
    )

    JOB_NAME = f'train-{model_name}'
    logging.info(f'JOB_NAME: {JOB_NAME}')
    logging.info(f'tensorboard_resource_name: {tensorboard_resource_name}')
    logging.info(f'service_account: {service_account}')
    logging.info(f'worker_pool_specs: {worker_pool_specs}')

    # NOTE(review): this function has no dedicated staging-location
    # parameter, so job artifacts are staged under the same GCS location
    # the model outputs use -- confirm this is the desired place.
    staging_bucket = f'gs://{model_dir}'

    job = vertex_ai.CustomJob(
        display_name=JOB_NAME,
        worker_pool_specs=worker_pool_specs,
        staging_bucket=staging_bucket,
    )

    logging.info('Submitting train job to Vertex AI...')

    job.run(
        tensorboard=tensorboard_resource_name,
        service_account=f'{service_account}',
        restart_job_on_worker_restart=False,
        enable_web_access=True,
    )

    # NOTE(review): these URIs are composed here, not read back from the
    # job, so the training code must write to exactly these folders.
    query_tower_dir_uri = f"gs://{model_dir}/{version}/{experiment_run}/query_tower/"
    candidate_tower_dir_uri = f"gs://{model_dir}/{version}/{experiment_run}/candidate_tower/"
    candidate_index_dir_uri = f"gs://{model_dir}/{version}/{experiment_run}/candidate-index/"

    return (
        f'{query_tower_dir_uri}',
        f'{candidate_tower_dir_uri}',
        f'{candidate_index_dir_uri}',
    )

## Build Pipeline

In [None]:
# from pipelines import ....

# NOTE(review): PIPELINE_TAG is not assigned in any cell shown before this
# one; it must exist in the notebook namespace when this cell runs.
@kfp.v2.dsl.pipeline(
  name=f'{VERSION}-{PIPELINE_TAG}'.replace('_', '-')
)
def pipeline(
    project: str,
    project_number: str,
    location: str,
    version: str,
    pipeline_tag: str,
    train_image_uri: str,
    model_dir: str,
    output_dir_gcs_bucket_name: str,
    create_tb_resource: bool,
    model_name: str,
    experiment_name: str = 'tfrs-pipeline',
    experiment_run: str = 'run-1',
):
    """Build the train image, prepare TensorBoard, then run training.

    Args:
        project: Project id forwarded to every component.
        project_number: Not used by the steps defined here.
        location: Region forwarded to the TensorBoard component.
        version: Version tag forwarded to the TensorBoard and train steps.
        pipeline_tag: Not used by the steps defined here.
        train_image_uri: Image URI the build step builds and pushes.
        model_dir: GCS location (without ``gs://``) forwarded to training.
        output_dir_gcs_bucket_name: Forwarded to the TensorBoard component.
        create_tb_resource: Create a new TensorBoard (true) or reuse one.
        model_name: Used for the TensorBoard display name and the job name.
        experiment_name: Forwarded to the train step. The default is only a
            placeholder; pass a real value in ``parameter_values``.
        experiment_run: Forwarded to the train step, where it becomes part
            of the returned model folders. The default is only a placeholder.
    """

    from kfp.v2.components import importer_node
    from google_cloud_pipeline_components.types import artifact_types

    # NOTE(review): the component modules (build_custom_train_image,
    # create_tensorboard, train_custom_model) and the names
    # gcs_train_script_path, service_account and WORKER_POOL_SPECS are not
    # defined in the cells shown above; they must be imported/assigned in
    # the notebook before this pipeline is compiled.

    # ========================================================================
    # Build Custom Train Image
    # ========================================================================

    build_custom_train_image_op = (
        build_custom_train_image.build_custom_train_image(
            project=project,
            gcs_train_script_path=gcs_train_script_path,
            training_image_uri=train_image_uri,
        )
        .set_display_name("Build custom train image")
        .set_caching_options(True)
    )

    # ========================================================================
    # Managed TensorBoard
    # ========================================================================

    create_tensorboard_op = (
        create_tensorboard.create_tensorboard(
            project=project,
            location=location,
            version=version,
            gcs_bucket_name=output_dir_gcs_bucket_name,
            # required by the component; it builds the display name from it
            model_display_name=model_name,
            create_tb_resource=create_tb_resource,
        )
        .set_display_name("Tensorboard Instance")
        .set_caching_options(True)
    )

    # ========================================================================
    # Train
    # ========================================================================

    run_train_task_op = (
        train_custom_model.train_custom_model(
            project=project,
            version=version,
            model_name=model_name,
            worker_pool_specs=WORKER_POOL_SPECS,
            model_dir=model_dir,
            # the build component's only declared output is 'training_image_uri'
            training_image_uri=build_custom_train_image_op.outputs['training_image_uri'],
            tensorboard_resource_name=create_tensorboard_op.outputs['tensorboard_resource_name'],
            service_account=service_account,
            experiment_name=experiment_name,
            experiment_run=experiment_run,
        )
        .set_display_name("Multiworker Training")
        .set_caching_options(True)  # TODO
        .after(build_custom_train_image_op)
    )
    