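# Pipeline deployments for scaling experiments

This notebook builds a Vertex AI pipeline with four stages:

- embed the retail product catalog images;
- create an ANN (Tree-AH) Matching Engine index from the embeddings;
- create a brute-force Matching Engine index from the same embeddings;
- deploy each index to its own index endpoint on a VPC network.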

In [27]:
# Uncomment to install the pipeline components package if it is missing.
# !pip install google-cloud-pipeline-components

# Report the installed SDK versions (the outputs below were recorded with these).
# NOTE: `!python3` runs whichever python3 is on the shell PATH, which may not be
# the interpreter backing this kernel.
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"
! python3 -c "import google.cloud.aiplatform; print('aiplatform SDK version: {}'.format(google.cloud.aiplatform.__version__))"

KFP SDK version: 1.8.13
google_cloud_pipeline_components version: 1.0.17
aiplatform SDK version: 1.18.1


In [28]:
# Project configuration shared by every cell below.
PROJECT_ID = 'hybrid-vertex' 
LOCATION = 'us-central1' 
# Service account; its numeric prefix appears to be the project number, which is
# hardcoded again as `project_number` in the pipeline-args cell. Not referenced by
# the cells visible here — presumably used when submitting the pipeline.
VERTEX_SA = '934903580331-compute@developer.gserviceaccount.com'

# Point gcloud/gsutil shell commands at the same project.
!gcloud config set project {PROJECT_ID}

Updated property [core/project].


In [29]:
import json
import os
import time
import pandas as pd
import matplotlib.pylab as plt
import numpy as np
from pprint import pprint

# Display Images
from IPython.display import clear_output, Image
from IPython.core.display import HTML

from google.cloud import aiplatform as vertex_ai
from google.cloud import storage

# Pipelines
from typing import Any, Callable, Dict, NamedTuple, Optional, List
from google_cloud_pipeline_components import aiplatform as gcc_aip
from google_cloud_pipeline_components.types import artifact_types

# Kubeflow SDK
# TODO: fix these
from kfp.v2 import dsl
import kfp
import kfp.v2.dsl
from kfp.v2.google import client as pipelines_client
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component)


# Session-wide clients: Vertex AI SDK defaults, plus a Cloud Storage client
# reused by later cells.
vertex_ai.init(
    project=PROJECT_ID,
    location=LOCATION,
)
storage_client = storage.Client(project=PROJECT_ID)

## Set up the pipeline source directory

In [30]:
# Work from the repo root so the `src/...` paths and `from src.pipes import ...`
# used below resolve. Set the REPO_DIR environment variable to use another checkout.
REPO_DIR = os.environ.get('REPO_DIR', '/home/jupyter/retail-visual-similarity')
os.chdir(REPO_DIR)
os.getcwd()

'/home/jupyter/retail-visual-similarity'

In [31]:
from datetime import datetime
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

VERSION = 'v5'  # component code
RUN = f'run-{TIMESTAMP}'
PIPELINE_ROOT = 'pipelines_root'
BUCKET = 'retail-products-kaggle'
BUCKET_URI = f'gs://{BUCKET}'
DATA_FOLDER = 'data-full'  # use 'dataset' for the smaller train_dir subset

# GCS prefix for this run's artifacts (the compiled pipeline spec is copied here).
PIPELINES_FILEPATH = f'gs://{BUCKET}/{PIPELINE_ROOT}/{VERSION}/{RUN}'
print("PIPELINES_FILEPATH:", PIPELINES_FILEPATH)

# Optional: save and load pipeline definitions as JSON next to the run artifacts.
# `os.path.isfile` / `open` only understand local paths, so the gs:// object must
# be read and written through the Cloud Storage client instead.
PIPELINES_JSON_URI = f'{PIPELINES_FILEPATH}/pipelines.json'
_pipelines_blob = storage.Blob.from_string(PIPELINES_JSON_URI, client=storage_client)
PIPELINES = json.loads(_pipelines_blob.download_as_text()) if _pipelines_blob.exists() else {}


def save_pipelines():
    """Persist the PIPELINES dict as JSON to PIPELINES_JSON_URI in Cloud Storage."""
    blob = storage.Blob.from_string(PIPELINES_JSON_URI, client=storage_client)
    blob.upload_from_string(json.dumps(PIPELINES), content_type='application/json')

PIPELINES_FILEPATH: gs://retail-products-kaggle/pipelines_root/v5/run-20221104003701


In [32]:
# Local source tree populated by the %%writefile cells below: src/pipes/*.py
REPO_DOCKER_PATH_PREFIX, PIPELINES_SUB_DIR = 'src', 'pipes'

In [33]:
! rm -rf {REPO_DOCKER_PATH_PREFIX}
! mkdir {REPO_DOCKER_PATH_PREFIX}

# ! rm -rf {REPO_DOCKER_PATH_PREFIX}/{PIPELINE_SUB_DIR}
! mkdir {REPO_DOCKER_PATH_PREFIX}/{PIPELINES_SUB_DIR}

# Pipeline components

## Generate candidate embeddings

In [34]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{PIPELINES_SUB_DIR}/generate_candidates.py

import kfp
from typing import Any, Callable, Dict, NamedTuple, Optional, List
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component, Metrics)
@kfp.v2.dsl.component(
    base_image="python:3.9",
    packages_to_install=[
        'pandas==1.3.5',
        'gcsfs',
        'fsspec',
        'google-cloud-aiplatform==1.18.1',
        'google-cloud-storage',
        'tensorflow==2.8',
        'tensorflow-hub==0.12.0',
    ],
)
def generate_candidates(
    project: str,
    location: str,
    version: str,
    bucket: str,
    dest_folder: str,
    images_gcs_uri: str,
    emb_index_gcs_uri: str,
    index_json_name: str,
) -> NamedTuple('Outputs', [
    ('embedding_index_file_uri', str),
    ('embedding_index_gcs_dir', str),
]):
    """Embed the catalog images and upload a Matching Engine JSON index file.

    Inputs:
      - Images are read from gs://{bucket}/{dest_folder}/train/train.
      - Categories are read from gs://{bucket}/{dest_folder}/train.csv
        (columns `ImgId` and `categories`).
      - An image's id is its file name up to the first ".".

    Each image is embedded with a TF Hub MobileNet v2 feature-vector model.
    One JSON record per line is written to {emb_index_gcs_uri}/{index_json_name}.
    Each record carries a "category" restrict and an alternating even/odd
    "tag_1" restrict.

    Args:
        project: GCP project ID for the Vertex AI and Cloud Storage clients.
        location: Vertex AI region.
        version: Not used by this component.
        bucket: GCS bucket holding the dataset.
        dest_folder: Dataset folder inside `bucket`.
        images_gcs_uri: Not used; the image directory is derived from
            `bucket` and `dest_folder`.
        emb_index_gcs_uri: GCS directory the JSON index file is uploaded to.
        index_json_name: File name of the JSON index (locally and in GCS).

    Returns:
        embedding_index_file_uri: GCS URI of the uploaded JSON file.
        embedding_index_gcs_dir: `emb_index_gcs_uri`, for the index-creation steps.

    Raises:
        ValueError: if an image id is missing from, or duplicated in, the CSV.
    """
    import json
    import logging
    import os
    import time

    import pandas as pd
    import tensorflow as tf
    import tensorflow_hub as hub
    from google.cloud import aiplatform as vertex_ai
    from google.cloud import storage
    from google.cloud.storage.blob import Blob

    # Python's root logger defaults to WARNING; raise it so the INFO messages
    # below are actually emitted.
    logging.getLogger().setLevel(logging.INFO)

    os.environ['TFHUB_MODEL_LOAD_FORMAT'] = 'COMPRESSED'
    TF_HUB_MODEL_DIR = "https://tfhub.dev/google/tf2-preview/mobilenet_v2/feature_vector/4"

    IMG_HEIGHT = 224
    IMG_WIDTH = 224
    IMG_CHANNELS = 3

    IMG_PATH = f'gs://{bucket}/{dest_folder}/train/train'
    logging.info(f'IMG_PATH: {IMG_PATH}')
    # List the image directory once and reuse the result for embedding.
    image_files = tf.io.gfile.listdir(IMG_PATH)
    logging.info(f'Length of LIST_DIR: {len(image_files)}')

    CSV_URI = f'gs://{bucket}/{dest_folder}/train.csv'
    train_csv = pd.read_csv(CSV_URI)
    logging.info(f'CSV_URI: {CSV_URI}')

    # Build the ImgId -> category mapping once, instead of filtering the whole
    # frame for every image.
    # Ids that occur more than once are dropped. Looking one up therefore fails,
    # just as the ambiguous `.item()` lookup did.
    category_by_id = (
        train_csv.drop_duplicates('ImgId', keep=False)
        .set_index('ImgId')['categories']
        .to_dict()
    )

    vertex_ai.init(
        project=project,
        location=location,
    )

    # ==============================================================================
    # Define helper functions
    # ==============================================================================
    def _upload_blob_gcs(gcs_uri, source_file_name, destination_blob_name):
        """Upload a local file to `{gcs_uri}/{destination_blob_name}`."""
        client = storage.Client(project=project)
        blob = Blob.from_string(os.path.join(gcs_uri, destination_blob_name), client=client)
        blob.upload_from_filename(source_file_name)

    def read_and_decode(filename, reshape_dims=(IMG_HEIGHT, IMG_WIDTH)):
        """Read a JPEG into a 1 x H x W x C float32 tensor with values in [0, 1]."""
        img = tf.io.read_file(filename)
        # Compressed bytes -> 3-D uint8 tensor.
        img = tf.image.decode_jpeg(img, channels=IMG_CHANNELS)
        # uint8 -> float32 in [0, 1], with a leading batch axis added.
        img = tf.image.convert_image_dtype(img, tf.float32)[tf.newaxis, ...]
        # tf.image.resize expects [height, width].
        return tf.image.resize(img, list(reshape_dims))

    def create_embeddings_dataset(embedder, img_path, file_names):
        """Embed each image in `file_names` (relative to `img_path`).

        Returns (embeddings tensor, image ids, categories), in listing order.
        """
        dataset_embeddings = []
        ids_list = []
        id_cat_list = []

        start = time.time()
        for file_name in file_names:
            img_tensor = read_and_decode(img_path + "/" + file_name, [IMG_HEIGHT, IMG_WIDTH])
            embeddings = embedder(img_tensor)

            image_id = file_name.split(".")[0]
            try:
                category = category_by_id[image_id]
            except KeyError:
                raise ValueError(
                    f'ImgId {image_id!r} is missing or duplicated in {CSV_URI}'
                ) from None

            dataset_embeddings.extend(embeddings)
            ids_list.append(image_id)
            id_cat_list.append(category)

        dataset_embeddings = tf.convert_to_tensor(dataset_embeddings)

        elapsed_time = round(time.time() - start, 2)
        logging.info(f'Elapsed time writting embeddings: {elapsed_time} seconds\n')
        return dataset_embeddings, ids_list, id_cat_list

    # ==============================================================================
    # Download TF Hub model
    # ==============================================================================
    model = tf.keras.Sequential(
        [
            hub.KerasLayer(
                TF_HUB_MODEL_DIR,
                input_shape=(IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS),
                trainable=False,
                name='mobilenet_embedding'),
            tf.keras.layers.Flatten(),
        ],
        name='visual_embedding',
    )

    # ==============================================================================
    # create embedding vectors
    # ==============================================================================
    dataset_embeddings, dataset_ids, dataset_id_cats = create_embeddings_dataset(
        model.predict,
        IMG_PATH,
        image_files,
    )

    logging.info(f"Shape of embeddings dataset: {dataset_embeddings.shape}\n")
    logging.info(f"dataset_ids: {dataset_ids[:3]}\n")
    logging.info(f"dataset_id_cats: {dataset_id_cats[:3]}\n")

    cleaned_embs = [x.numpy() for x in dataset_embeddings]

    # ==============================================================================
    # write json index (one record per line)
    # ==============================================================================
    with open(index_json_name, "w") as f:
        for counter, (img_id, vector, img_cat) in enumerate(
            zip(dataset_ids, cleaned_embs, dataset_id_cats)
        ):
            # Numbers need no escaping. Ids and categories go through json.dumps,
            # so quotes or backslashes in them cannot corrupt the record.
            embedding = ",".join(str(x).strip() for x in vector)
            restricts = json.dumps(
                [
                    {"namespace": "category", "allow": [str(img_cat)]},
                    {"namespace": "tag_1", "allow": ["even" if counter % 2 == 0 else "odd"]},
                ],
                separators=(",", ":"),
            )
            f.write(f'{{"id":{json.dumps(str(img_id))},"embedding":[{embedding}],"restricts":{restricts}}}\n')

    _upload_blob_gcs(emb_index_gcs_uri, index_json_name, index_json_name)

    embedding_index_file_uri = f'{emb_index_gcs_uri}/{index_json_name}'
    logging.info(f"embedding_index_file_uri: {embedding_index_file_uri}")

    return (
        embedding_index_file_uri,
        emb_index_gcs_uri,
    )

Writing src/pipes/generate_candidates.py


## Create Matching Engine ANN (Tree-AH) index

In [35]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{PIPELINES_SUB_DIR}/create_ann_index.py

import kfp
from typing import Any, Callable, Dict, NamedTuple, Optional, List
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component, Metrics)

@kfp.v2.dsl.component(
    base_image="python:3.9",
    packages_to_install=[
        'google-cloud-aiplatform==1.18.1',
    ],
)
def create_ann_index(
    project: str,
    location: str,
    version: str,
    staging_bucket: str,
    vpc_network_name: str,
    emb_index_gcs_uri: str,
    dimensions: int,
    ann_index_display_name: str,
    approximate_neighbors_count: int,
    distance_measure_type: str,
    leaf_node_embedding_count: int,
    leaf_nodes_to_search_percent: int,
    ann_index_description: str,
    ann_index_labels: Dict,
) -> NamedTuple('Outputs', [
    ('ann_index_resource_uri', str),
    ('ann_index', Artifact),
]):
    """Create a Tree-AH (approximate nearest neighbor) Matching Engine index.

    Args:
        project: GCP project ID passed to `vertex_ai.init`.
        location: Vertex AI region.
        version: Not used by this component.
        staging_bucket: Not used by this component.
        vpc_network_name: Not used by this component.
        emb_index_gcs_uri: GCS directory holding the JSON embedding file(s).
        dimensions: Length of each embedding vector.
        ann_index_display_name: Display-name prefix; a timestamp is appended.
        approximate_neighbors_count: Tree-AH approximate neighbors count.
        distance_measure_type: Distance measure name, e.g. "DOT_PRODUCT_DISTANCE".
        leaf_node_embedding_count: Tree-AH leaf node embedding count.
        leaf_nodes_to_search_percent: Tree-AH leaf nodes to search percent.
        ann_index_description: Index description.
        ann_index_labels: Labels attached to the index.

    Returns:
        ann_index_resource_uri: Resource name of the created index.
        ann_index: The created `MatchingEngineIndex` object.
    """
    import logging
    import time
    from datetime import datetime

    from google.cloud import aiplatform as vertex_ai

    # Python's root logger defaults to WARNING; raise it so INFO messages show up.
    logging.getLogger().setLevel(logging.INFO)

    vertex_ai.init(
        project=project,
        location=location,
    )

    TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
    ENDPOINT = "{}-aiplatform.googleapis.com".format(location)

    logging.info("ENDPOINT: {}".format(ENDPOINT))
    logging.info("PROJECT_ID: {}".format(project))
    logging.info("REGION: {}".format(location))

    # ==============================================================================
    # Create Index
    # ==============================================================================
    start = time.time()

    tree_ah_index = vertex_ai.MatchingEngineIndex.create_tree_ah_index(
        display_name=f'{ann_index_display_name}-{TIMESTAMP}',
        # contents_delta_uri names the GCS directory containing the embedding files.
        contents_delta_uri=f'{emb_index_gcs_uri}/',
        dimensions=dimensions,
        approximate_neighbors_count=approximate_neighbors_count,
        distance_measure_type=distance_measure_type,
        leaf_node_embedding_count=leaf_node_embedding_count,
        leaf_nodes_to_search_percent=leaf_nodes_to_search_percent,
        description=ann_index_description,
        labels=ann_index_labels,
    )

    elapsed_time = round(time.time() - start, 2)
    logging.info(f'Elapsed time creating index: {elapsed_time} seconds\n')

    ann_index_resource_uri = tree_ah_index.resource_name
    # The original passed the value as a stray %-argument with no placeholder,
    # which makes logging report a formatting error instead of the message.
    logging.info(f"ann_index_resource_uri: {ann_index_resource_uri}")

    return (
        ann_index_resource_uri,
        tree_ah_index,
    )

Writing src/pipes/create_ann_index.py


## Create Brute Force Index

In [36]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{PIPELINES_SUB_DIR}/create_brute_force_index.py

import kfp
from typing import Any, Callable, Dict, NamedTuple, Optional, List
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component, Metrics)

@kfp.v2.dsl.component(
    base_image="python:3.9",
    packages_to_install=[
        'google-cloud-aiplatform==1.18.1',
    ],
)
def create_brute_force_index(
    project: str,
    location: str,
    version: str,
    staging_bucket: str,
    vpc_network_name: str,
    emb_index_gcs_uri: str,
    dimensions: int,
    brute_force_index_display_name: str,
    approximate_neighbors_count: int,
    distance_measure_type: str,
    brute_force_index_description: str,
    brute_force_index_labels: Dict,
) -> NamedTuple('Outputs', [
    ('brute_force_index_resource_uri', str),
    ('brute_force_index', Artifact),
]):
    """Create a brute-force (exact search) Matching Engine index.

    Args:
        project: GCP project ID passed to `vertex_ai.init`.
        location: Vertex AI region.
        version: Not used by this component.
        staging_bucket: Not used by this component.
        vpc_network_name: Not used by this component.
        emb_index_gcs_uri: GCS directory holding the JSON embedding file(s).
        dimensions: Length of each embedding vector.
        brute_force_index_display_name: Display-name prefix; a timestamp is appended.
        approximate_neighbors_count: Accepted for parity with create_ann_index;
            not passed to the brute-force index.
        distance_measure_type: Distance measure name, e.g. "DOT_PRODUCT_DISTANCE".
        brute_force_index_description: Index description.
        brute_force_index_labels: Labels attached to the index.

    Returns:
        brute_force_index_resource_uri: Resource name of the created index.
        brute_force_index: The created `MatchingEngineIndex` object.
    """
    import logging
    import time
    from datetime import datetime

    from google.cloud import aiplatform as vertex_ai

    # Python's root logger defaults to WARNING; raise it so INFO messages show up.
    logging.getLogger().setLevel(logging.INFO)

    vertex_ai.init(
        project=project,
        location=location,
    )

    TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
    ENDPOINT = "{}-aiplatform.googleapis.com".format(location)

    logging.info("ENDPOINT: {}".format(ENDPOINT))
    logging.info("PROJECT_ID: {}".format(project))
    logging.info("REGION: {}".format(location))

    # ==============================================================================
    # Create Index
    # ==============================================================================
    start = time.time()

    brute_force_index = vertex_ai.MatchingEngineIndex.create_brute_force_index(
        display_name=f'{brute_force_index_display_name}-{TIMESTAMP}',
        # contents_delta_uri names the GCS directory containing the embedding files.
        contents_delta_uri=f'{emb_index_gcs_uri}/',
        dimensions=dimensions,
        distance_measure_type=distance_measure_type,
        description=brute_force_index_description,
        labels=brute_force_index_labels,
    )

    # `start` was recorded but never used before; log it like create_ann_index does.
    elapsed_time = round(time.time() - start, 2)
    logging.info(f'Elapsed time creating index: {elapsed_time} seconds\n')

    brute_force_index_resource_uri = brute_force_index.resource_name
    logging.info(f"brute_force_index_resource_uri: {brute_force_index_resource_uri}")

    return (
        brute_force_index_resource_uri,
        brute_force_index,
    )

Writing src/pipes/create_brute_force_index.py


## Create IndexEndpoint with VPC

### Create ANN Index Endpoint

In [37]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{PIPELINES_SUB_DIR}/create_ann_index_endpoint_vpc.py

import kfp
from typing import Any, Callable, Dict, NamedTuple, Optional, List
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component, Metrics)

@kfp.v2.dsl.component(
    base_image="python:3.9",
    packages_to_install=[
        'google-cloud-aiplatform==1.18.1',
    ],
)
def create_ann_index_endpoint_vpc(
    ann_index_artifact: Input[Artifact],
    project: str,
    project_number: str,
    location: str,
    version: str,
    vpc_network_name: str,
    ann_index_endpoint_display_name: str,
    ann_index_endpoint_description: str,
) -> NamedTuple('Outputs', [
    ('vpc_network_resource_uri', str),
    ('ann_index_endpoint_resource_uri', str),
    ('ann_index_endpoint', Artifact),
    ('ann_index_endpoint_display_name', str),
]):
    """Create a Matching Engine index endpoint on a VPC network for the ANN index.

    Args:
        ann_index_artifact: Not read. Taking it as an input makes this step run
            after the step that produced it.
        project: GCP project ID passed to `vertex_ai.init`.
        project_number: Numeric project ID used in the network resource name.
        location: Vertex AI region.
        version: Not used by this component.
        vpc_network_name: Name of the VPC network the endpoint is attached to.
        ann_index_endpoint_display_name: Endpoint display name (used verbatim).
        ann_index_endpoint_description: Endpoint description.

    Returns:
        vpc_network_resource_uri: projects/{number}/global/networks/{name}.
        ann_index_endpoint_resource_uri: Resource name of the created endpoint.
        ann_index_endpoint: The created `MatchingEngineIndexEndpoint` object.
        ann_index_endpoint_display_name: Echo of the display name input.
    """
    import logging

    from google.cloud import aiplatform as vertex_ai

    # Python's root logger defaults to WARNING; raise it so INFO messages show up.
    logging.getLogger().setLevel(logging.INFO)

    vertex_ai.init(
        project=project,
        location=location,
    )

    # The network is passed as a full resource name built from the project number.
    vpc_network_resource_uri = f'projects/{project_number}/global/networks/{vpc_network_name}'
    logging.info(f"vpc_network_resource_uri: {vpc_network_resource_uri}")

    ann_index_endpoint = vertex_ai.MatchingEngineIndexEndpoint.create(
        display_name=ann_index_endpoint_display_name,
        description=ann_index_endpoint_description,
        network=vpc_network_resource_uri,
    )
    ann_index_endpoint_resource_uri = ann_index_endpoint.resource_name
    logging.info(f"ann_index_endpoint_resource_uri: {ann_index_endpoint_resource_uri}")

    return (
        vpc_network_resource_uri,
        ann_index_endpoint_resource_uri,
        ann_index_endpoint,
        ann_index_endpoint_display_name,
    )

Writing src/pipes/create_ann_index_endpoint_vpc.py


### Create BF Index Endpoint

In [38]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{PIPELINES_SUB_DIR}/create_brute_index_endpoint_vpc.py

import kfp
from typing import Any, Callable, Dict, NamedTuple, Optional, List
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component, Metrics)

@kfp.v2.dsl.component(
    base_image="python:3.9",
    packages_to_install=[
        'google-cloud-aiplatform==1.18.1',
    ],
)
def create_brute_index_endpoint_vpc(
    bf_index_artifact: Input[Artifact],
    project: str,
    project_number: str,
    location: str,
    version: str,
    vpc_network_name: str,
    brute_index_endpoint_display_name: str,
    brute_index_endpoint_description: str,
) -> NamedTuple('Outputs', [
    ('vpc_network_resource_uri', str),
    ('brute_index_endpoint_resource_uri', str),
    ('brute_index_endpoint', Artifact),
    ('brute_index_endpoint_display_name', str),
]):
    """Create a Matching Engine index endpoint on a VPC network for the brute-force index.

    Args:
        bf_index_artifact: Not read. Taking it as an input makes this step run
            after the step that produced it.
        project: GCP project ID passed to `vertex_ai.init`.
        project_number: Numeric project ID used in the network resource name.
        location: Vertex AI region.
        version: Not used by this component.
        vpc_network_name: Name of the VPC network the endpoint is attached to.
        brute_index_endpoint_display_name: Endpoint display name (used verbatim).
        brute_index_endpoint_description: Endpoint description.

    Returns:
        vpc_network_resource_uri: projects/{number}/global/networks/{name}.
        brute_index_endpoint_resource_uri: Resource name of the created endpoint.
        brute_index_endpoint: The created `MatchingEngineIndexEndpoint` object.
        brute_index_endpoint_display_name: Echo of the display name input.
    """
    import logging

    from google.cloud import aiplatform as vertex_ai

    # Python's root logger defaults to WARNING; raise it so INFO messages show up.
    logging.getLogger().setLevel(logging.INFO)

    vertex_ai.init(
        project=project,
        location=location,
    )

    # The network is passed as a full resource name built from the project number.
    vpc_network_resource_uri = f'projects/{project_number}/global/networks/{vpc_network_name}'
    logging.info(f"vpc_network_resource_uri: {vpc_network_resource_uri}")

    brute_index_endpoint = vertex_ai.MatchingEngineIndexEndpoint.create(
        display_name=brute_index_endpoint_display_name,
        description=brute_index_endpoint_description,
        network=vpc_network_resource_uri,
    )
    brute_index_endpoint_resource_uri = brute_index_endpoint.resource_name
    logging.info(f"brute_index_endpoint_resource_uri: {brute_index_endpoint_resource_uri}")

    return (
        vpc_network_resource_uri,
        brute_index_endpoint_resource_uri,
        brute_index_endpoint,
        brute_index_endpoint_display_name,
    )

Writing src/pipes/create_brute_index_endpoint_vpc.py


## Deploy Indexes

### Deploy ANN Index

In [39]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{PIPELINES_SUB_DIR}/deploy_ann_index.py

import kfp
from typing import Any, Callable, Dict, NamedTuple, Optional, List
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component, Metrics)

@kfp.v2.dsl.component(
    base_image="python:3.9",
    packages_to_install=[
        'google-cloud-aiplatform==1.18.1'
    ]
)
def deploy_ann_index(
    project: str,
    location: str,
    version: str,
    staging_bucket: str,
    deployed_ann_index_name: str,
    ann_index_resource_uri: str,
    index_endpoint_resource_uri: str,
) -> NamedTuple('Outputs', [
    ('index_endpoint_resource_uri', str),
    ('ann_index_resource_uri', str),
    ('deployed_ann_index_name', str),
    ('deployed_ann_index', Artifact),
]):
    """Deploy the ANN index to an existing Matching Engine index endpoint.

    Args:
        project: GCP project ID passed to `vertex_ai.init`.
        location: Vertex AI region.
        version: Not used by this component.
        staging_bucket: Not used by this component.
        deployed_ann_index_name: Deployed index id (used verbatim).
        ann_index_resource_uri: Resource name of the index to deploy.
        index_endpoint_resource_uri: Resource name of the target endpoint.

    Returns:
        index_endpoint_resource_uri: Echo of the endpoint input.
        ann_index_resource_uri: Resource name reported by the index object.
        deployed_ann_index_name: Echo of the deployed index id.
        deployed_ann_index: The deployed `MatchingEngineIndex` object.
    """
    import logging

    from google.cloud import aiplatform as vertex_ai

    # Python's root logger defaults to WARNING; raise it so INFO messages show up.
    logging.getLogger().setLevel(logging.INFO)

    vertex_ai.init(
        project=project,
        location=location,
    )

    ann_index = vertex_ai.MatchingEngineIndex(index_name=ann_index_resource_uri)
    index_endpoint = vertex_ai.MatchingEngineIndexEndpoint(index_endpoint_resource_uri)

    index_endpoint = index_endpoint.deploy_index(
        index=ann_index,
        deployed_index_id=deployed_ann_index_name,
    )

    # Log through `logging` (as deploy_brute_index does) instead of print.
    logging.info(index_endpoint.deployed_indexes)

    return (
        index_endpoint_resource_uri,
        ann_index.resource_name,
        deployed_ann_index_name,
        ann_index,
    )

Writing src/pipes/deploy_ann_index.py


### Deploy BF Index

In [40]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{PIPELINES_SUB_DIR}/deploy_brute_index.py

import kfp
from typing import Any, Callable, Dict, NamedTuple, Optional, List
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component, Metrics)

@kfp.v2.dsl.component(
    base_image="python:3.9",
    packages_to_install=[
        'google-cloud-aiplatform==1.18.1',
    ],
)
def deploy_brute_index(
    project: str,
    location: str,
    version: str,
    staging_bucket: str,
    deployed_brute_force_index_name: str,
    brute_force_index_resource_uri: str,
    index_endpoint_resource_uri: str,
) -> NamedTuple('Outputs', [
    ('index_endpoint_resource_uri', str),
    ('brute_force_index_resource_uri', str),
    ('deployed_brute_force_index_name', str),
    ('deployed_brute_force_index', Artifact),
]):
    """Deploy the brute-force index to an existing Matching Engine index endpoint.

    Args:
        project: GCP project ID passed to `vertex_ai.init`.
        location: Vertex AI region.
        version: Not used by this component.
        staging_bucket: Not used by this component.
        deployed_brute_force_index_name: Deployed index id (used verbatim).
        brute_force_index_resource_uri: Resource name of the index to deploy.
        index_endpoint_resource_uri: Resource name of the target endpoint.

    Returns:
        index_endpoint_resource_uri: Echo of the endpoint input.
        brute_force_index_resource_uri: Resource name reported by the index object.
        deployed_brute_force_index_name: Echo of the deployed index id.
        deployed_brute_force_index: The deployed `MatchingEngineIndex` object.
    """
    import logging

    from google.cloud import aiplatform as vertex_ai

    # Python's root logger defaults to WARNING; raise it so INFO messages show up.
    logging.getLogger().setLevel(logging.INFO)

    vertex_ai.init(
        project=project,
        location=location,
    )

    brute_index = vertex_ai.MatchingEngineIndex(index_name=brute_force_index_resource_uri)
    index_endpoint = vertex_ai.MatchingEngineIndexEndpoint(index_endpoint_resource_uri)

    index_endpoint = index_endpoint.deploy_index(
        index=brute_index,
        deployed_index_id=deployed_brute_force_index_name,
    )

    logging.info(index_endpoint.deployed_indexes)

    return (
        index_endpoint_resource_uri,
        brute_index.resource_name,
        deployed_brute_force_index_name,
        brute_index,
    )

Writing src/pipes/deploy_brute_index.py


# Build & Compile Pipeline

## Pipeline configuration

In [41]:
# Version of the pipeline (DAG) code, as opposed to VERSION for component code.
PIPELINE_VERSION = 'v1'
PIPELINE_TAG = 'retail-visual-similarity-' + PIPELINE_VERSION
print(f"PIPELINE_TAG: {PIPELINE_TAG}")

PIPELINE_TAG: retail-visual-similarity-v1


In [42]:
from src.pipes import generate_candidates, create_ann_index, \
                      create_brute_force_index, create_ann_index_endpoint_vpc, create_brute_index_endpoint_vpc, \
                      deploy_ann_index, deploy_brute_index
@kfp.v2.dsl.pipeline(
    name=f'{VERSION}-{PIPELINE_TAG}'.replace('_', '-')
)
def pipeline(
    project: str,
    project_number: str,
    location: str,
    version: str,
    staging_bucket: str,
    bucket: str,
    vpc_network_name: str,
    images_gcs_uri: str,
    emb_index_gcs_uri: str,
    dimensions: int,
    ann_index_display_name: str,
    approximate_neighbors_count: int,
    distance_measure_type: str,
    leaf_node_embedding_count: int,
    leaf_nodes_to_search_percent: int,
    ann_index_description: str,
    ann_index_labels: Dict,
    brute_force_index_display_name: str,
    brute_force_index_description: str,
    brute_force_index_labels: Dict,
    ann_index_endpoint_display_name: str,
    ann_index_endpoint_description: str,
    brute_index_endpoint_display_name: str,
    brute_index_endpoint_description: str,
    deployed_ann_index_name: str,
    deployed_brute_force_index_name: str,
    dest_folder: str,
    index_json_name: str,
):
    """Embed the catalog, build ANN and brute-force indexes, and deploy each one.

    DAG:
      1. generate_candidates
      2. create_ann_index and create_brute_force_index
      3. the matching VPC index endpoint
      4. deploy the index to that endpoint

    Each step consumes an output of the previous one, and that data dependency
    is what orders the steps.
    """
    # TODO: add a data-prep step ahead of embedding generation.

    # ========================================================================
    # generate embeddings
    # ========================================================================
    generate_candidates_op = (
        generate_candidates.generate_candidates(
            project=project,
            location=location,
            version=version,
            bucket=bucket,
            images_gcs_uri=images_gcs_uri,
            dest_folder=dest_folder,
            emb_index_gcs_uri=emb_index_gcs_uri,
            index_json_name=index_json_name,
        )
        .set_display_name("Generate Catalog Embeddings")
        .set_caching_options(True)
    )

    # ========================================================================
    # Create Indexes
    # ========================================================================
    create_ann_index_op = (
        create_ann_index.create_ann_index(
            project=project,
            location=location,
            version=version,
            staging_bucket=staging_bucket,
            vpc_network_name=vpc_network_name,
            emb_index_gcs_uri=generate_candidates_op.outputs['embedding_index_gcs_dir'],
            dimensions=dimensions,
            ann_index_display_name=ann_index_display_name,
            approximate_neighbors_count=approximate_neighbors_count,
            distance_measure_type=distance_measure_type,
            leaf_node_embedding_count=leaf_node_embedding_count,
            leaf_nodes_to_search_percent=leaf_nodes_to_search_percent,
            ann_index_description=ann_index_description,
            ann_index_labels=ann_index_labels,
        )
        .set_display_name("Create ANN Index")
        .set_caching_options(True)
    )

    create_brute_force_index_op = (
        create_brute_force_index.create_brute_force_index(
            project=project,
            location=location,
            version=version,
            staging_bucket=staging_bucket,
            vpc_network_name=vpc_network_name,
            emb_index_gcs_uri=generate_candidates_op.outputs['embedding_index_gcs_dir'],
            dimensions=dimensions,
            brute_force_index_display_name=brute_force_index_display_name,
            approximate_neighbors_count=approximate_neighbors_count,
            distance_measure_type=distance_measure_type,
            brute_force_index_description=brute_force_index_description,
            brute_force_index_labels=brute_force_index_labels,
        )
        .set_display_name("Create BF Index")
        .set_caching_options(True)
    )

    # ========================================================================
    # Create Index Endpoints
    # ========================================================================
    # The index artifacts are passed in only to order these steps after index
    # creation, which already runs after generate_candidates.
    create_ann_index_endpoint_vpc_op = (
        create_ann_index_endpoint_vpc.create_ann_index_endpoint_vpc(
            ann_index_artifact=create_ann_index_op.outputs['ann_index'],
            project=project,
            project_number=project_number,
            version=version,
            location=location,
            vpc_network_name=vpc_network_name,
            ann_index_endpoint_display_name=ann_index_endpoint_display_name,
            ann_index_endpoint_description=ann_index_endpoint_description,
        )
        .set_display_name("Create ANN Index Endpoint")
    )

    create_brute_index_endpoint_vpc_op = (
        create_brute_index_endpoint_vpc.create_brute_index_endpoint_vpc(
            bf_index_artifact=create_brute_force_index_op.outputs['brute_force_index'],
            project=project,
            project_number=project_number,
            version=version,
            location=location,
            vpc_network_name=vpc_network_name,
            brute_index_endpoint_display_name=brute_index_endpoint_display_name,
            # Previously wired to brute_force_index_description (the *index*
            # description), which left this pipeline parameter unused.
            brute_index_endpoint_description=brute_index_endpoint_description,
        )
        .set_display_name("Create BF Index Endpoint")
    )

    # ========================================================================
    # Deploy Indexes
    # ========================================================================
    deploy_ann_index_op = (
        deploy_ann_index.deploy_ann_index(
            project=project,
            location=location,
            version=version,
            staging_bucket=staging_bucket,
            deployed_ann_index_name=deployed_ann_index_name,
            ann_index_resource_uri=create_ann_index_op.outputs['ann_index_resource_uri'],
            index_endpoint_resource_uri=create_ann_index_endpoint_vpc_op.outputs['ann_index_endpoint_resource_uri'],
        )
        .set_display_name("Deploy ANN Index")
        .set_caching_options(True)
    )

    deploy_brute_index_op = (
        deploy_brute_index.deploy_brute_index(
            project=project,
            location=location,
            version=version,
            staging_bucket=staging_bucket,
            deployed_brute_force_index_name=deployed_brute_force_index_name,
            brute_force_index_resource_uri=create_brute_force_index_op.outputs['brute_force_index_resource_uri'],
            index_endpoint_resource_uri=create_brute_index_endpoint_vpc_op.outputs['brute_index_endpoint_resource_uri'],
        )
        .set_display_name("Deploy BF Index")
        .set_caching_options(True)
    )

## Compile the pipeline

In [43]:
# Show the GCS run prefix that the compiled pipeline spec is copied to.
PIPELINES_FILEPATH

'gs://retail-products-kaggle/pipelines_root/v5/run-20221104003701'

In [44]:
kfp.v2.compiler.Compiler().compile(
  pipeline_func=pipeline, 
  package_path='custom_container_pipeline_spec.json', #PIPELINES_FILEPATH, # PIPELINE_JSON_SPEC_PATH # 'custom_container_pipeline_spec.json',
)

!gsutil cp custom_container_pipeline_spec.json $PIPELINES_FILEPATH/pipeline_spec.json

Copying file://custom_container_pipeline_spec.json [Content-Type=application/json]...
/ [1 files][ 48.2 KiB/ 48.2 KiB]                                                
Operation completed over 1 objects/48.2 KiB.                                     


### Pipeline arguments

In [45]:
# Reference object counts per dataset folder; the recorded count is the trailing
# comment on each line. Uncomment a line to re-check that count.
# !gsutil du gs://retail-products-kaggle/dataset/train/train | wc -l # 16551
# !gsutil du gs://retail-products-kaggle/data-full/train/train | wc -l # 42001
# !gsutil du gs://retail-products-kaggle/dataset/test/test | wc -l # 6368
# !gsutil du gs://retail-products-kaggle/data-full/test/test | wc -l # 6368

In [46]:
from datetime import datetime
# NOTE(review): re-stamping here does not change PIPELINES_FILEPATH, which was
# built from an earlier TIMESTAMP.
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

PREFIX = 'test-filters'
PROJECT_ID = 'hybrid-vertex'
project_number = '934903580331'
LOCATION = 'us-central1'

PIPE_USER = 'jtott'

# BUCKET / BUCKET_URI / DATA_FOLDER are normally set by an earlier cell.
# Fall back to the project defaults only if they are not declared yet,
# so this cell also runs on a fresh kernel. Values set earlier always win.
BUCKET = globals().get('BUCKET', 'retail-products-kaggle')
BUCKET_URI = globals().get('BUCKET_URI', f'gs://{BUCKET}')
DATA_FOLDER = globals().get('DATA_FOLDER', 'data-full')

# VERSION and PIPELINE_VERSION have no safe default: they name the indexes and
# label every resource below, so fail with a clear message if they are missing.
_missing_settings = [name for name in ('VERSION', 'PIPELINE_VERSION') if name not in globals()]
if _missing_settings:
    raise NameError(
        f"Define {', '.join(_missing_settings)} in an earlier cell before building pipeline args."
    )

staging_bucket = f'gs://{BUCKET}/staging'

emb_index_gcs_uri = f'gs://{BUCKET}/indexes/{VERSION}'
vpc_network_name = 'ucaip-haystack-vpc-network'
images_gcs_uri = f'gs://{BUCKET}/{DATA_FOLDER}/train/train'
index_json_name = "retail_kaggle_embeddings.json"

# Indexes
# NOTE(review): presumably the width of the MobileNet_v2 embeddings; it must
# match the vectors written under emb_index_gcs_uri.
DIMENSIONS = 1280
approximate_neighbors_count = 50
distance_measure_type = "DOT_PRODUCT_DISTANCE"
leaf_node_embedding_count = 500
leaf_nodes_to_search_percent = 7

ann_index_display_name = f'ann_{DIMENSIONS}_index_{VERSION}'
brute_force_index_display_name = f'brute_force_{DIMENSIONS}_index_{VERSION}'

ann_index_description = f'Kaggle Retail Product MobileNet_v2 ANN index {VERSION}-{PIPELINE_VERSION}'
brute_force_index_description = f"Kaggle Retail Product MobileNet_v2 (brute force)-{VERSION}-{PIPELINE_VERSION}"

# Both indexes carry the same labels. The brute-force dict is a copy, so either
# can be changed without affecting the other.
# NOTE(review): Vertex AI label values may contain only lowercase letters,
# digits, '_' and '-'; confirm VERSION / PIPELINE_VERSION comply.
ann_index_labels = {
    'version': f'{VERSION}',
    'pipeline_version': f'{PIPELINE_VERSION}',
}
brute_force_index_labels = dict(ann_index_labels)

index_endpoint_display_name = f'index_endpoint_{VERSION}'
index_endpoint_description = "index endpoint description"

ann_index_endpoint_description = f'ann {index_endpoint_description}'
brute_index_endpoint_description = f'bf {index_endpoint_description}'

ann_index_endpoint_display_name = f'ann {index_endpoint_display_name}'
brute_index_endpoint_display_name = f'bf {index_endpoint_display_name}'

# Deployed-index IDs must start with a letter and contain only letters, digits
# and underscores, hence the underscore-joined names (VERSION must comply too).
deployed_ann_index_name = f'ann_{DIMENSIONS}_deployed_index_{VERSION}'
deployed_brute_force_index_name = f'brute_force_{DIMENSIONS}_deployed_index_{VERSION}'

### Submit the pipeline run

In [47]:
# Set to False to skip submission when a 'train' run was already recorded.
overwrite = True

from kfp.v2.google.client import AIPlatformClient

# NOTE(review): AIPlatformClient emits a deprecation warning in KFP 1.8 in favour
# of google.cloud.aiplatform.PipelineJob. It is kept here so submission
# behaviour does not change.
pipeline_client = AIPlatformClient(
    project_id=PROJECT_ID,
    region=LOCATION,
)

# Run-time values for the compiled pipeline's parameters (keys must match the
# pipeline function's parameter names). Serving-related parameters
# (test_imgs_gcs_dir, model_endpoint_name, model_display_name,
# serving_container_image_uri, traffic_percentage, serving_machine_type,
# serving_min/max_replica_count), index_endpoint_display_name/description and
# run are intentionally not passed in this run.
pipeline_parameter_values = {
    'project': PROJECT_ID,
    'project_number': project_number,
    'location': LOCATION,
    'version': VERSION,
    'staging_bucket': staging_bucket,
    'vpc_network_name': vpc_network_name,
    'images_gcs_uri': images_gcs_uri,
    'emb_index_gcs_uri': emb_index_gcs_uri,
    'bucket': BUCKET,
    'dest_folder': DATA_FOLDER,
    'dimensions': DIMENSIONS,
    'ann_index_display_name': ann_index_display_name,
    'approximate_neighbors_count': approximate_neighbors_count,
    'distance_measure_type': distance_measure_type,
    'leaf_node_embedding_count': leaf_node_embedding_count,
    'leaf_nodes_to_search_percent': leaf_nodes_to_search_percent,
    'ann_index_description': ann_index_description,
    'ann_index_labels': ann_index_labels,
    'brute_force_index_display_name': brute_force_index_display_name,
    'brute_force_index_description': brute_force_index_description,
    'brute_force_index_labels': brute_force_index_labels,
    'ann_index_endpoint_description': ann_index_endpoint_description,
    'brute_index_endpoint_description': brute_index_endpoint_description,
    'deployed_ann_index_name': deployed_ann_index_name,
    'deployed_brute_force_index_name': deployed_brute_force_index_name,
    'ann_index_endpoint_display_name': ann_index_endpoint_display_name,
    'brute_index_endpoint_display_name': brute_index_endpoint_display_name,
    'index_json_name': index_json_name,
}

# PIPELINES maps a pipeline key to the submitted job's name. Create it if no
# earlier cell did, so this cell also works on a fresh kernel.
PIPELINES = globals().setdefault('PIPELINES', {})

if not PIPELINES.get('train') or overwrite:
    response = pipeline_client.create_run_from_job_spec(
        job_spec_path='custom_container_pipeline_spec.json',
        network=f'projects/{project_number}/global/networks/{vpc_network_name}',  # same VPC as the index endpoints
        # service_account=SERVICE_ACCOUNT,  # TODO: uncomment if needed
        parameter_values=pipeline_parameter_values,
        pipeline_root=PIPELINES_FILEPATH,
    )
    PIPELINES['train'] = response['name']