**Notebook still in development**
* Search `TODO` 
* Current example uses a small image dataset; plan to adapt for ~ImageNet
* Need a custom serving container or prediction routine to convert query images to model input in JSON format

# Image Similarity Matching / Retrieval

1. Use a pretrained deep learning model to extract feature (embedding) vectors for each image in a product catalog

2. Store embedding vectors in a scalable approximate nearest neighbor (ANN) service, [Vertex Matching Engine](https://cloud.google.com/vertex-ai/docs/matching-engine/overview), where each image's embedding vectors are indexed by product ID

3. To retrieve the images most similar to a query image, use the same pretrained model from (1) to extract the embedding vectors from the query image, then query the index with these vectors
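
The three steps above can be illustrated without any cloud service. The sketch below is a minimal brute-force version in plain Python. The catalog, product IDs, and three-dimensional vectors are made up for illustration, and cosine similarity is an assumed choice of metric rather than something this notebook prescribes.

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def top_k(query, catalog, k=2):
    """Return the k product IDs whose embeddings are most similar to query."""
    scored = [(cosine_similarity(query, emb), pid) for pid, emb in catalog.items()]
    scored.sort(reverse=True)
    return [pid for _, pid in scored[:k]]

# Hypothetical catalog: product ID -> embedding (real embeddings are much longer)
catalog = {
    "shoe-001": [0.9, 0.1, 0.0],
    "shoe-002": [0.8, 0.2, 0.1],
    "lamp-001": [0.0, 0.1, 0.9],
}

# Hypothetical embedding of the query image
query_embedding = [1.0, 0.0, 0.0]
nearest = top_k(query_embedding, catalog, k=2)
print(nearest)
```

An exhaustive scan like this grows linearly with catalog size, which is the reason step 2 hands the lookup to an ANN service.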

#### This notebook orchestrates the pipeline steps below with [Vertex Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction)

In [57]:
# ![End-to-End Vertex Pipeline](https://github.com/tottenjordan/retail-visual-similarity/blob/master/img/pipeline-v1.png?raw=true)

### References

* Matching Engine [SDK source code](https://github.com/googleapis/python-aiplatform/tree/main/google/cloud/aiplatform/_matching_engine)
* TODO: remove when complete: Original [Colab](https://colab.sandbox.google.com/drive/1ysjjGTv7EKkBD90dsdVD3OZvW4Oka1aC#scrollTo=AIaRc1NI3M6P)

## Setup

In [3]:
PROJECT_ID = 'hybrid-vertex'  # <--- TODO: CHANGE THIS
LOCATION = 'us-central1' 
!gcloud config set project {PROJECT_ID}

Updated property [core/project].


If using Google Colab

In [4]:

import sys

# if 'google.colab' in sys.modules:
#     from google.colab import auth
#     auth.authenticate_user()
    
    
# if 'google.colab' in sys.modules:
#     USER_FLAG = ''
# else:
#     USER_FLAG = '--user'

## pip & package

In [1]:
# Use these
# ! pip3 install -U google-cloud-storage $USER_FLAG
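# `$USER` expands to the login name, not a pip flag, so it is dropped here;
# add `--user` manually if your environment needs it
! pip3 install --upgrade kfp google-cloud-pipeline-components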
# !pip install google-cloud-aiplatform==1.16.1

# (Optional) OSS Scann for testing
# !pip install scann

# TODO: Not Needed (?)
# !git clone https://github.com/kubeflow/pipelines.git
# !pip install pipelines/components/google-cloud/.


# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

In [2]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"
! python3 -c "import google.cloud.aiplatform; print('aiplatform SDK version: {}'.format(google.cloud.aiplatform.__version__))"

KFP SDK version: 1.8.13
google_cloud_pipeline_components version: 1.0.17
aiplatform SDK version: 1.16.1


In [3]:
import json
import os
import time
import pandas as pd
import matplotlib.pylab as plt
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
import sys

# Display Images
from IPython.display import clear_output, Image
from IPython.core.display import HTML

# GCP
from google.cloud import aiplatform
# from google.cloud import bigquery
from google.cloud import storage

# Pipelines
from typing import Any, Callable, Dict, NamedTuple, Optional, List
from google_cloud_pipeline_components import aiplatform as gcc_aip
from google_cloud_pipeline_components.types import artifact_types

# Kubeflow SDK
# TODO: fix these
from kfp.v2 import dsl
import kfp
import kfp.v2.dsl
from kfp.v2.google import client as pipelines_client
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component)

### Setup Clients

In [38]:
# Required Pipeline Parameters
PIPE_USER = 'jtott'  #  {type: 'string'} <--- TODO: CHANGE THIS
BUCKET = 'retail-products-kaggle'
BUCKET_URI = f'gs://{BUCKET}'
DATA_FOLDER = 'data-full'

GS_PIPELINE_ROOT_PATH = 'gs://{}/pipeline_root/{}'.format(BUCKET, PIPE_USER)

LOCATION = 'us-central1'
PROJECT_ID = 'hybrid-vertex'
REGION = 'us-central1'

# !gcloud config set project {PROJECT_ID}

print('GS_PIPELINE_ROOT_PATH: {}'.format(GS_PIPELINE_ROOT_PATH))
print('BUCKET_URI: {}'.format(BUCKET_URI))

GS_PIPELINE_ROOT_PATH: gs://retail-products-kaggle/pipeline_root/jtott
BUCKET_URI: gs://retail-products-kaggle


#### Create the GCS bucket if it does not exist

In [6]:
# ! gsutil mb -l $REGION $BUCKET_URI

In [7]:
! gsutil ls -al $BUCKET_URI

 271604965  2022-08-17T01:48:43Z  gs://retail-products-kaggle/retail-products-classification.zip#1660700923141049  metageneration=1
                                 gs://retail-products-kaggle/data-full/
                                 gs://retail-products-kaggle/dataset/
TOTAL: 1 objects, 271604965 bytes (259.02 MiB)


In [8]:
# Setup Clients
os.environ['GOOGLE_CLOUD_PROJECT']=PROJECT_ID

# colab_auth.authenticate_user() # if using colab

storage_client = storage.Client(
    project=PROJECT_ID
)
# PipelineJob
pipeline_client = pipelines_client.AIPlatformClient(
  project_id=PROJECT_ID,
  region=LOCATION,
)

# staging_bucket is expected in the gs://... form, so pass the URI
aiplatform.init(
    project=PROJECT_ID, location=LOCATION, staging_bucket=BUCKET_URI
)



### Saving Pipelines

In [10]:
# Pipeline Stuff
PIPELINES = {}

PIPELINES_FILEPATH = f'gs://{BUCKET}/pipelines/pipelines.json' # <--- TODO: CHANGE THIS; can be blank json file
print("PIPELINES_FILEPATH:", PIPELINES_FILEPATH)

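# os.path and open() only see the local filesystem, so use tf.io.gfile
# to read and write the gs:// path
if tf.io.gfile.exists(PIPELINES_FILEPATH):
  with tf.io.gfile.GFile(PIPELINES_FILEPATH, 'r') as f:
    PIPELINES = json.load(f)
else:
  PIPELINES = {}

def save_pipelines():
  with tf.io.gfile.GFile(PIPELINES_FILEPATH, 'w') as f:
    json.dump(PIPELINES, f)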

PIPELINES_FILEPATH: gs://retail-products-kaggle/pipelines/pipelines.json
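
The registry above is just a dictionary serialized as JSON. As a local illustration of the load-or-initialize and save pattern, the sketch below uses a temporary file instead of a GCS path; the function names and the example entry are hypothetical. Note that `os.path.isfile` and the built-in `open` only work on local paths, so a `gs://` URI needs a GCS-aware layer such as `tf.io.gfile` or the `google-cloud-storage` client.

```python
import json
import os
import tempfile

def load_registry(path):
    """Load the registry from path, or start empty if the file is missing."""
    if os.path.isfile(path):
        with open(path) as f:
            return json.load(f)
    return {}

def save_registry(registry, path):
    """Write the registry to path as JSON."""
    with open(path, "w") as f:
        json.dump(registry, f)

# Local stand-in for the GCS location used in the notebook
registry_path = os.path.join(tempfile.mkdtemp(), "pipelines.json")

registry = load_registry(registry_path)  # file does not exist yet, so this is {}
registry["similarity-v1"] = {"spec": "pipeline.json"}  # hypothetical entry
save_registry(registry, registry_path)

reloaded = load_registry(registry_path)
print(reloaded)
```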


# Create Pipeline

## Pipeline Components

In [23]:
! rm -rf pipelines
!mkdir -p ./pipelines

### Find Model Endpoint

In [39]:
@kfp.v2.dsl.component(
  base_image='python:3.9',
  packages_to_install=['google-cloud-aiplatform==1.16.1'],
  output_component_file="./pipelines/find_model_endpoint.yaml",
)
def find_model_endpoint_test(
    project: str,
    location: str,
    endpoint_name: str,
) -> NamedTuple('Outputs', [
                            ('create_new_endpoint', str),
                            ('existing_endpoint_uri', str),
                            ('deployed_models_count', int),
                            ('undeploy_model_needed', str),
                            ('deployed_model_list', list),
                            ('endpoint_traffic_split', str),
]):

  from google.cloud import aiplatform
  import json
  import logging

  aiplatform.init(
      project=project,
      location=location,
  )

  deployed_model_list = []

  logging.info(f"Searching for model endpoint: {endpoint_name}")

  # List once and reuse the result instead of calling the API twice
  endpoints = aiplatform.Endpoint.list(
      filter=f'display_name="{endpoint_name}"')

  if endpoints:
    # An existing endpoint was found, so:
    #   (1) do not create a new one
    #   (2) return the endpoint URI
    #   (3) return the list of models deployed on this endpoint
    #   (4) if more than 1 model is deployed, trigger the subsequent conditional
    #       step to undeploy all but 1 (the latest) model
    logging.info(f"Model endpoint, {endpoint_name}, already exists")
    create_new_endpoint="False"
    
    # use the first endpoint that matches the display name
    _endpoint = endpoints[0]
    logging.info(f"Parsing details for _endpoint: {_endpoint}")
    
    # retrieve endpoint uri
    existing_endpoint_uri = _endpoint.resource_name
    logging.info(f"existing_endpoint_uri: {existing_endpoint_uri}")
    _traffic_split = _endpoint.traffic_split

    # retrieve deployed model IDs
    deployed_models = _endpoint.gca_resource.deployed_models
    deployed_models_count = len(deployed_models)
    logging.info(f"deployed_models_count: {deployed_models_count}")

    # collect the IDs of all deployed models (stays empty if none are deployed)
    deployed_model_list = [model.id for model in deployed_models]

    # undeploy is only needed when more than one model shares the endpoint
    # (arbitrary assumption: no more than 2 or 3 models per model endpoint)
    undeploy_model_needed = "True" if deployed_models_count > 1 else "False"

    logging.info(f"Currently deployed_model_list {deployed_model_list}")

  else:
    logging.info(f"Model endpoint, {endpoint_name}, does not exist")
    
    create_new_endpoint="True"
    deployed_models_count=0
    existing_endpoint_uri="N/A"
    undeploy_model_needed = "N/A"
    _traffic_split = "N/A"
    # deployed_model_list = []

  logging.info(f"create_new_endpoint {create_new_endpoint}")
  logging.info(f"existing_endpoint_uri {existing_endpoint_uri}")
  logging.info(f"deployed_models_count {deployed_models_count}")
  logging.info(f"undeploy_model_needed {undeploy_model_needed}")
  logging.info(f"deployed_model_list {deployed_model_list}")
  logging.info(f"_traffic_split {_traffic_split}")


  return (
      f'{create_new_endpoint}',
      f'{existing_endpoint_uri}',
      deployed_models_count,
      f'{undeploy_model_needed}',
      deployed_model_list,
      f'{_traffic_split}',
  )
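
The branching in this component can be checked without calling Vertex AI. The sketch below restates it as a pure function; `summarize_endpoint` and the model IDs are hypothetical names for illustration, and the flags are kept as strings to match the component's outputs.

```python
def summarize_endpoint(endpoint_found, deployed_model_ids):
    """Mirror the component's branching as a pure function."""
    if not endpoint_found:
        # No endpoint with this display name: a new one has to be created
        return {
            "create_new_endpoint": "True",
            "deployed_models_count": 0,
            "undeploy_model_needed": "N/A",
            "deployed_model_list": [],
        }
    count = len(deployed_model_ids)
    return {
        "create_new_endpoint": "False",
        "deployed_models_count": count,
        # Only endpoints with more than one deployed model need cleanup
        "undeploy_model_needed": "True" if count > 1 else "False",
        "deployed_model_list": list(deployed_model_ids),
    }

no_endpoint = summarize_endpoint(False, [])
one_model = summarize_endpoint(True, ["111"])
two_models = summarize_endpoint(True, ["111", "222"])
print(two_models["undeploy_model_needed"])
```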

### TODO: Create Serving Function for Pretrained Model 
* see [this example](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/vertex_endpoints/tf_hub_obj_detection/deploy_tfhub_object_detection_on_vertex_endpoints.ipynb)

### TODO: Upload Model

In [40]:
# %%writefile vertex_train/trainer/task.py

# @kfp.v2.dsl.component(
#     base_image="python:3.9",
#     packages_to_install=['google-cloud-aiplatform==1.16.1',
#                          'google-cloud-storage',
#                          'tensorflow==2.8',
#                          'tensorflow-hub==0.12.0',
#                          'tensorflow-estimator==2.6.0',
#                          'keras==2.6'],
#     output_component_file="./pipelines/upload_pretrained_model.yaml",
# )
# def upload_pretrained_model(
#     project: str,
#     location: str,  
#     saved_model_gcs_bucket: str,
#     model_display_name: str,
#     serving_container_image_uri: str,
# ) -> NamedTuple('Outputs', [('vertex_model', Artifact),
#                             ('vertex_model_uri', str),
#                             ('vertex_model_display_name', str),
#                             ('vertex_model_gcs_dir', str)]):
  
#   from google.cloud import aiplatform
#   import json
#   import logging
#   import os
#   import tensorflow as tf
#   import tensorflow_hub as hub
#   from datetime import datetime

#   aiplatform.init(project=project,location=location)
  
#   # Load compressed models from tensorflow_hub
#   os.environ['TFHUB_MODEL_LOAD_FORMAT'] = 'COMPRESSED'

#   # TODO: parameterize
#   IMG_HEIGHT = 224
#   IMG_WIDTH = 224
#   IMG_CHANNELS = 3
#   BATCH_SIZE = 32
#   NUM_IMAGES = 510
#   NUM_NEIGH = 3 # top 3

#   # ==============================================================================
#   # Load pretrained model from TF Hub
#   # ==============================================================================
#   layers = [
#       hub.KerasLayer(
#           "https://tfhub.dev/google/tf2-preview/mobilenet_v2/feature_vector/4", # TODO: paramaterize
#           input_shape=(IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS),
#           trainable=False,
#           name='mobilenet_embedding'),
#       tf.keras.layers.Flatten()
#   ]

#   model = tf.keras.Sequential(layers, name='img_embedding') # TODO: parameterize
#   print("model summary:", model.summary())
  
#   TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
#   MODEL_NAME = f'{model_display_name}-{TIMESTAMP}'
#   # print("MODEL_NAME", MODEL_NAME)
  
#   save_path = os.path.join("gs://", saved_model_gcs_bucket, MODEL_NAME)
#   print("model save_path", save_path)
  
#   model.save(save_path)

#   # ==============================================================================
#   # Load pretrained model from TF Hub
#   # ==============================================================================
  
#   # TODO: parametrize
#   SERVING_CONTAINER_IMAGE = "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest"

#   # Upload model to Vertex
#   model = aiplatform.Model.upload(
#       display_name=f'{MODEL_NAME}',
#       artifact_uri=save_path,
#       serving_container_image_uri=SERVING_CONTAINER_IMAGE,
#       sync=True,
#   )
#   print("Model uploaded")
#   print("model.resource_name:", model.resource_name)
#   print("model.display_name:", model.display_name)

#   vertex_model_uri=model.resource_name
#   vertex_model_display_name=model.display_name

#   return (
#       model, 
#       f'{vertex_model_uri}',
#       f'{vertex_model_display_name}',
#       f'{save_path}',
#   )

### TODO: Deploy Model
Preceded by the pre-built component for creating an endpoint

In [41]:
# @kfp.v2.dsl.component(
#     base_image="python:3.9",
#     packages_to_install=['google-cloud-aiplatform==1.16.1'], # 'tensorflow==2.6'
      # output_component_file="./pipelines/find_model_endpoint.yaml",
# )
# def deploy_pretrained_model(
#     project: str,
#     location: str,
#     model_endpoint_name: str,
#     model_resource_path: str,
#     model_display_name: str,
#     traffic_percentage: int,
#     serving_machine_type: str,
#     serving_min_replica_count: int,
#     serving_max_replica_count: int,
# ) -> NamedTuple('Outputs', [('vertex_endpoint', Artifact),
#                             ('vertex_model', Artifact),
#                             ('vertex_endpoint_uri', str),
#                             ('vertex_model_uri', str),]):
  
#   from google.cloud import aiplatform
#   import json
#   import logging
#   from datetime import datetime

#   aiplatform.init(
#       project=project,location=location,
#   )

#   # Find Vertex Endpoint
#   endpoint_id = aiplatform.Endpoint.list(filter='display_name={}'.format(model_endpoint_name))[0]
#   logging.info("endpoint_id: %s", endpoint_id)

#   endpoint_uri = endpoint_id.resource_name
#   logging.info("endpoint_uri: %s", endpoint_uri)
#   logging.info("endpoint_uri[54:]: %s", endpoint_uri[54:])

#   endpoint = aiplatform.Endpoint(endpoint_name=endpoint_uri[54:])
#   logging.info("endpoint: %s", endpoint)

#   # Initialize Vertex Model
#   logging.info("model path: %s", model_resource_path)

#   model_resource = aiplatform.Model(model_resource_path)
  
#   deployed_model = model_resource.deploy(
#       endpoint=endpoint,
#       deployed_model_display_name=f'deployed-{model_display_name}',
#       traffic_percentage=100,                                     # should be 100 since no other models on endpoint yet
#       machine_type=serving_machine_type,
#       min_replica_count=serving_min_replica_count,
#       max_replica_count=serving_max_replica_count,
#       sync=True,
#   )
#   # deployed_model.wait()
#   print("deployed_model.display_name:", deployed_model.display_name)
#   print("deployed_model.resource_name:", deployed_model.resource_name)

#   vertex_endpoint_uri=endpoint.resource_name
#   vertex_model_uri=deployed_model.resource_name
  
#   return (endpoint, 
#           deployed_model, 
#           f'{vertex_endpoint_uri}', 
#           f'{vertex_model_uri}')

### Feature Extraction

**TODO**
* logic for writing catalog embedding vectors to `xx.json` is specific to the current image GCS file pattern

In [42]:
@kfp.v2.dsl.component(
    base_image="python:3.9",
    packages_to_install=['google-cloud-aiplatform==1.16.1',
                         'google-cloud-storage',
                         'tensorflow==2.8',
                         'tensorflow-hub==0.12.0',
                         'tensorflow-estimator==2.8.0',
                         'keras==2.8.0'],
    output_component_file="./pipelines/feature_extraction.yaml",
)
def feature_extraction(
    project: str,
    location: str,
    images_gcs_uri: str,
    emb_index_gcs_uri: str,
    saved_model_gcs_bucket: str,
    index_json_name: str,
    # model_resource_path: str,
    # vertex_model_gcs_dir: str,
) -> NamedTuple('Outputs', [('embedding_index_file_uri', str),
                            ('saved_pretrained_model_gcs_location', str),
                            # ('img_bottleneck_model', Artifact),
                            ]):
  import os
  from google.cloud import storage
  from google.cloud.storage.bucket import Bucket
  from google.cloud.storage.blob import Blob
  from datetime import datetime
  import tensorflow as tf
  import tensorflow_hub as hub


  # Load compressed models from tensorflow_hub
  os.environ['TFHUB_MODEL_LOAD_FORMAT'] = 'COMPRESSED'

  # TODO: parameterize
  IMG_HEIGHT = 224
  IMG_WIDTH = 224
  IMG_CHANNELS = 3
  BATCH_SIZE = 32
  NUM_IMAGES = 510
  NUM_NEIGH = 3 # top 3

  # ==============================================================================
  # Define helper functions
  # ==============================================================================
  
  def _upload_blob_gcs(gcs_uri, source_file_name, destination_blob_name):
    """Uploads a local file to `{gcs_uri}/{destination_blob_name}`."""
    client = storage.Client(project=project)
    # Pass the client explicitly instead of patching the private `_client` attribute
    blob = Blob.from_string(os.path.join(gcs_uri, destination_blob_name), client=client)
    blob.upload_from_filename(source_file_name)

  def read_and_decode(filename, reshape_dims=[IMG_HEIGHT, IMG_WIDTH]):
    # Read the file
    img = tf.io.read_file(filename)
  
    # Convert the compressed string to a 3D uint8 tensor.
    img = tf.image.decode_jpeg(img, channels=IMG_CHANNELS)
  
    # Use `convert_image_dtype` to convert to floats in the [0,1] range, then
    # add a leading batch axis: 1 x height x width x 3 tensor of dtype float32
    img = tf.image.convert_image_dtype(img, tf.float32)[tf.newaxis, ...]
  
    # Resize to the desired size (1 x 224 x 224 x 3 with the default dims).
    return tf.image.resize(img, reshape_dims)

  def create_embeddings_dataset(embedder, img_path):
    dataset_filenames = []
    dataset_embeddings = []

    list_dir = tf.io.gfile.listdir(img_path)

    for file in list_dir:
      # tf.image.resize expects [height, width]
      img_tensor = read_and_decode(img_path + "/" + file, [IMG_HEIGHT, IMG_WIDTH])
      embeddings = embedder(img_tensor)
      dataset_filenames.append(img_path + "/" + file)
      dataset_embeddings.extend(embeddings)
  
    dataset_embeddings = tf.convert_to_tensor(dataset_embeddings)
  
    return dataset_filenames, dataset_embeddings

  # ==============================================================================
  # Download pre-trained model
  # ==============================================================================
  layers = [
      hub.KerasLayer(
          "https://tfhub.dev/google/tf2-preview/mobilenet_v2/feature_vector/4", # TODO: parameterize
          input_shape=(IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS),
          trainable=False,
          name='mobilenet_embedding'),
      tf.keras.layers.Flatten()
  ]

  model = tf.keras.Sequential(layers, name='pretrained_mobilenet') # TODO: parameterize
  # loaded_model = tf.keras.models.load_model(vertex_model_gcs_dir)
  # print("model summary:", loaded_model.summary())
  
  TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
  MODEL_NAME = f'pipe-mobilenet_v2-{TIMESTAMP}'
  print("MODEL_NAME", MODEL_NAME)
  
  save_path = os.path.join(saved_model_gcs_bucket, MODEL_NAME) # "gs://", 
  print("model save_path", save_path)
  
  model.save(save_path)

  # ==============================================================================
  # Create embedding dataset
  # ==============================================================================
  dataset_filenames, dataset_embeddings = create_embeddings_dataset(
    lambda x: model.predict(x),
    images_gcs_uri
  )

  print("sample dataset_filenames", dataset_filenames[:3])
  print("dataset_embeddings shape:", dataset_embeddings.shape)

  # ==============================================================================
  # Write Embeddings and IDs to json
  # ==============================================================================

  # TODO: this code will only work with the file pattern created from zipped file
  #       adjust this to expected GCS file patterns

  with open(index_json_name, "w") as f:
    for gcs_uri, vector in zip(dataset_filenames, dataset_embeddings):
      # The record ID is the file name up to its first "."
      file_name = gcs_uri.split("/")[-1]
      product_id = file_name.split(".")[0]  # avoids shadowing the built-in `id`
      vector = vector.numpy()
      f.write('{"id":"' + str(product_id) + '",')
      f.write('"embedding":[' + ",".join(str(v) for v in vector) + "]}")
      f.write("\n")

  _upload_blob_gcs(emb_index_gcs_uri, f"{index_json_name}", f"{index_json_name}") 

  embedding_index_file_uri = f'{emb_index_gcs_uri}/{index_json_name}'
  print("embedding_index_file_uri:", embedding_index_file_uri)

  return(
      f'{embedding_index_file_uri}',
      f'{save_path}',
  )
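
The index file written by this component holds one JSON object per line, each with an `id` and an `embedding`. The sketch below produces the same layout with `json.dumps` rather than string concatenation, which is an alternative approach and not what the component does; the bucket name, file names, and vectors are hypothetical.

```python
import json

def to_index_record(gcs_uri, vector):
    """Build one index line; the ID is the file name up to its first dot."""
    file_name = gcs_uri.split("/")[-1]
    product_id = file_name.split(".")[0]
    return json.dumps({"id": product_id, "embedding": [float(v) for v in vector]})

# Hypothetical inputs standing in for dataset_filenames / dataset_embeddings
filenames = ["gs://my-bucket/images/1234.jpg", "gs://my-bucket/images/5678.jpg"]
embeddings = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]

lines = [to_index_record(uri, vec) for uri, vec in zip(filenames, embeddings)]
print("\n".join(lines))

# Each line parses back into a standalone record
parsed = [json.loads(line) for line in lines]
```

Letting `json.dumps` handle the quoting avoids malformed lines if a file name ever contains a quote or backslash.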

In [43]:
# with open("candidate_embeddings.json", "w") as f:
#     for prod, emb in zip(ivm_s,candidate_embeddings):
#         f.write('{"id":"' + str(id) + '",')
#         f.write('"embedding":[' + ",".join(str(x) for x in vector) + "]}")
#         f.write("\n")

# with open(f"{index_json_name}", "w") as f:
#   for gcs_uri, vector in zip(dataset_filenames,dataset_embeddings):
#     f.write('{"id":"' + str(id) + '",')
#     f.write('"embedding":[' + ",".join(str(x) for x in vector) + "]}")
#     f.write("\n")

### Create ME Index

In [44]:
@kfp.v2.dsl.component(
    base_image="python:3.9",
    packages_to_install=['google-cloud-aiplatform==1.16.1',], # TODO: update once merged
    output_component_file="./pipelines/create_ann_index.yaml",
)
def create_ann_index(
    project: str,
    location: str,
    staging_bucket: str,
    vpc_network_name: str,
    emb_index_gcs_uri: str,
    dimensions: int,
    ann_index_display_name: str,
    approximate_neighbors_count: int,
    distance_measure_type: str,
    leaf_node_embedding_count: int,
    leaf_nodes_to_search_percent: int, 
    ann_index_description: str,
    ann_index_labels: Dict, 
) -> NamedTuple('Outputs', [('ann_index_resource_uri', str),
                            ('ann_index', Artifact),]):


  from google.cloud import aiplatform
  from datetime import datetime

  aiplatform.init(project=project, location=location, staging_bucket=staging_bucket)
  TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")


  ENDPOINT = "{}-aiplatform.googleapis.com".format(location)
  NETWORK_NAME = vpc_network_name
  INDEX_DIR_GCS = emb_index_gcs_uri
  PARENT = "projects/{}/locations/{}".format(project, location)

  print("ENDPOINT: {}".format(ENDPOINT))
  print("PROJECT_ID: {}".format(project))
  print("REGION: {}".format(location))

  ann_index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
      display_name=f'{ann_index_display_name}-{TIMESTAMP}',
      contents_delta_uri=emb_index_gcs_uri,
      dimensions=dimensions,
      approximate_neighbors_count=approximate_neighbors_count,
      distance_measure_type=distance_measure_type,
      leaf_node_embedding_count=leaf_node_embedding_count,
      leaf_nodes_to_search_percent=leaf_nodes_to_search_percent,
      description=ann_index_description,
      labels=ann_index_labels,
  )

  ann_index_resource_uri = ann_index.resource_name
  print("ann_index_resource_uri:", ann_index_resource_uri) 

  return (
      f'{ann_index_resource_uri}',
      ann_index,
  )
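
To build intuition for `leaf_node_embedding_count` and `leaf_nodes_to_search_percent`, the sketch below does some back-of-the-envelope arithmetic. It assumes embeddings are spread evenly across leaves and is only a rough reading of the two parameters, not a description of how Matching Engine actually partitions or searches the index; the function name and catalog size are hypothetical.

```python
import math

def estimate_scan(num_vectors, leaf_node_embedding_count, leaf_nodes_to_search_percent):
    """Rough estimate of leaves and candidate vectors visited per query."""
    num_leaves = math.ceil(num_vectors / leaf_node_embedding_count)
    leaves_searched = max(1, math.ceil(num_leaves * leaf_nodes_to_search_percent / 100))
    candidates = min(num_vectors, leaves_searched * leaf_node_embedding_count)
    return num_leaves, leaves_searched, candidates

# Hypothetical catalog of 100,000 embeddings
num_leaves, leaves_searched, candidates = estimate_scan(
    num_vectors=100_000,
    leaf_node_embedding_count=500,
    leaf_nodes_to_search_percent=10,
)
print(num_leaves, leaves_searched, candidates)
```

Under these assumptions, raising the search percentage scans more candidates per query, which trades latency for recall.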

### Create Brute Force Index

In [45]:
@kfp.v2.dsl.component(
    base_image="python:3.9",
    packages_to_install=['google-cloud-aiplatform==1.16.1',], # TODO: update once merged
    output_component_file="./pipelines/create_brute_force_index.yaml",
)
def create_brute_force_index(
    project: str,
    location: str,
    staging_bucket: str,
    vpc_network_name: str,
    emb_index_gcs_uri: str,
    dimensions: int,
    brute_force_index_display_name: str,
    approximate_neighbors_count: int,
    distance_measure_type: str,
    brute_force_index_description: str,
    brute_force_index_labels: Dict,
) -> NamedTuple('Outputs', [('brute_force_index_resource_uri', str),
                            ('brute_force_index', Artifact),]):


  from google.cloud import aiplatform
  from datetime import datetime

  aiplatform.init(project=project, location=location, staging_bucket=staging_bucket)
  TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")


  ENDPOINT = "{}-aiplatform.googleapis.com".format(location)
  NETWORK_NAME = vpc_network_name
  INDEX_DIR_GCS = emb_index_gcs_uri
  PARENT = "projects/{}/locations/{}".format(project, location)

  print("ENDPOINT: {}".format(ENDPOINT))
  print("PROJECT_ID: {}".format(project))
  print("REGION: {}".format(location))

  brute_force_index = aiplatform.MatchingEngineIndex.create_brute_force_index(
      display_name=f'{brute_force_index_display_name}-{TIMESTAMP}',
      contents_delta_uri=emb_index_gcs_uri,
      dimensions=dimensions,
      # approximate_neighbors_count=approximate_neighbors_count,
      distance_measure_type=distance_measure_type,
      description=brute_force_index_description,
      labels=brute_force_index_labels,
  )
  brute_force_index_resource_uri = brute_force_index.resource_name
  print("brute_force_index_resource_uri:",brute_force_index_resource_uri) 

  return (
      f'{brute_force_index_resource_uri}',
      brute_force_index,
  )

### Create IndexEndpoint with VPC

In [46]:
@kfp.v2.dsl.component(
    base_image="python:3.9",
    packages_to_install=['google-cloud-aiplatform==1.16.1',], # TODO: update once merged
    output_component_file="./pipelines/create_index_endpoint_vpc.yaml",
)
def create_index_endpoint_vpc(
    project: str,
    project_number: str,
    location: str,
    staging_bucket: str,
    vpc_network_name: str,
    index_endpoint_display_name: str,
    index_endpoint_description: str,
) -> NamedTuple('Outputs', [
                            ('vpc_network_resource_uri', str),
                            ('index_endpoint_resource_uri', str),
                            ('index_endpoint', Artifact),
                            ('index_endpoint_display_name', str),
                            ]):

  from google.cloud import aiplatform
  from datetime import datetime

  aiplatform.init(project=project, location=location, staging_bucket=staging_bucket)
  TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

  vpc_network_resource_uri = f'projects/{project_number}/global/networks/{vpc_network_name}'
  print("vpc_network_resource_uri:", vpc_network_resource_uri)

  index_endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
      display_name=f'{index_endpoint_display_name}-{TIMESTAMP}',
      description=index_endpoint_description,
      network=vpc_network_resource_uri,
  )
  index_endpoint_resource_uri = index_endpoint.resource_name
  print("index_endpoint_resource_uri:", index_endpoint_resource_uri)

  return (
      f'{vpc_network_resource_uri}',
      f'{index_endpoint_resource_uri}',
      index_endpoint,
      f'{index_endpoint_display_name}-{TIMESTAMP}'
  )
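
The network resource name above is built from the project number, not the project ID, and mixing the two up is easy. The sketch below is a small guard for that; the helper name and the example values are hypothetical.

```python
def vpc_network_resource_name(project_number, vpc_network_name):
    """Build the full VPC network resource name from a numeric project number."""
    if not str(project_number).isdigit():
        raise ValueError(
            f"expected a numeric project number, got {project_number!r}"
        )
    return f"projects/{project_number}/global/networks/{vpc_network_name}"

# Hypothetical values
network = vpc_network_resource_name("123456789012", "my-vpc")
print(network)
```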

### Deploy Indexes

In [47]:
@kfp.v2.dsl.component(
    base_image="python:3.9",
    packages_to_install=['google-cloud-aiplatform==1.16.1',], # TODO: update once merged
    output_component_file="./pipelines/deploy_ann_index.yaml",
)
def deploy_ann_index(
    project: str,
    location: str,
    staging_bucket: str,
    deployed_ann_index_name: str,
    ann_index_resource_uri: str,
    index_endpoint_resource_uri: str,
) -> NamedTuple('Outputs', [
                            ('index_endpoint_resource_uri', str),
                            ('ann_index_resource_uri', str),
                            ('deployed_ann_index_name', str),
                            ('deployed_ann_index', Artifact),
                            ]):
  
  from google.cloud import aiplatform
  from datetime import datetime
  TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

  aiplatform.init(project=project, location=location, staging_bucket=staging_bucket)

  ann_index = aiplatform.MatchingEngineIndex(
      index_name=ann_index_resource_uri
  )
  ann_index_resource_uri = ann_index.resource_name

  index_endpoint = aiplatform.MatchingEngineIndexEndpoint(
      index_endpoint_resource_uri
  )

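  # Deployed index IDs must start with a letter and contain only letters,
  # numbers, and underscores, so join the timestamp with "_" rather than "-"
  deployed_index_id = f'{deployed_ann_index_name}_{TIMESTAMP}'

  index_endpoint = index_endpoint.deploy_index(
      index=ann_index, 
      deployed_index_id=deployed_index_id
  )

  print(index_endpoint.deployed_indexes)

  return (
      f'{index_endpoint_resource_uri}',
      f'{ann_index_resource_uri}',
      deployed_index_id,
      ann_index,
  )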

In [48]:
@kfp.v2.dsl.component(
    base_image="python:3.9",
    packages_to_install=['google-cloud-aiplatform==1.16.1',], # TODO: update once merged
    output_component_file="./pipelines/deploy_brute_index.yaml",
)
def deploy_brute_index(
    project: str,
    location: str,
    staging_bucket: str,
    deployed_brute_force_index_name: str,
    brute_force_index_resource_uri: str,
    index_endpoint_resource_uri: str,
) -> NamedTuple('Outputs', [
                            ('index_endpoint_resource_uri', str),
                            ('brute_force_index_resource_uri', str),
                            ('deployed_brute_force_index_name', str),
                            ('deployed_brute_force_index', Artifact),
                            ]):
  
  from google.cloud import aiplatform
  from datetime import datetime
  TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

  aiplatform.init(project=project, location=location, staging_bucket=staging_bucket)

  brute_index = aiplatform.MatchingEngineIndex(
      index_name=brute_force_index_resource_uri
  )
  brute_force_index_resource_uri = brute_index.resource_name

  index_endpoint = aiplatform.MatchingEngineIndexEndpoint(index_endpoint_resource_uri)

  # Deployed index IDs must start with a letter and contain only letters,
  # numbers, and underscores, so join the timestamp with "_" rather than "-"
  deployed_index_id = f'{deployed_brute_force_index_name}_{TIMESTAMP}'

  index_endpoint = index_endpoint.deploy_index(
      index=brute_index, 
      deployed_index_id=deployed_index_id
  )

  print(index_endpoint.deployed_indexes)

  return (
      f'{index_endpoint_resource_uri}',
      f'{brute_force_index_resource_uri}',
      deployed_index_id,
      brute_index,
  )
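
Deployed index IDs are expected to start with a letter and contain only letters, numbers, and underscores, so a display-style name with hyphens may be rejected. The sketch below shows one way to derive a conforming ID; `make_deployed_index_id`, the `idx_` prefix, and the example name are hypothetical and not part of the SDK.

```python
import re
from datetime import datetime

def make_deployed_index_id(name, now=None):
    """Build an ID from letters, digits, and underscores that starts with a letter."""
    timestamp = (now or datetime.now()).strftime("%Y%m%d%H%M%S")
    # Replace every character outside [A-Za-z0-9_] with an underscore
    cleaned = re.sub(r"[^A-Za-z0-9_]", "_", name)
    if not cleaned or not cleaned[0].isalpha():
        cleaned = f"idx_{cleaned}"
    return f"{cleaned}_{timestamp}"

# Fixed timestamp so the result is reproducible
deployed_id = make_deployed_index_id(
    "deployed-ann-index", now=datetime(2022, 8, 17, 1, 48, 43)
)
print(deployed_id)
```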

### Query indexes

In [49]:
@kfp.v2.dsl.component(
    base_image="python:3.9",
    packages_to_install=['google-cloud-aiplatform==1.16.1',
                         'google-cloud-storage',
                         'tensorflow==2.8',
                         'tensorflow-hub==0.12.0',
                         'tensorflow-estimator==2.8.0',
                         'keras==2.8'], # TODO: update once merged
    output_component_file="./pipelines/query_deployed_indexes.yaml",
)
def query_deployed_indexes(
    project: str,
    location: str,
    staging_bucket: str,
    num_neighbors: int,
    index_endpoint_resource_uri: str,
    deployed_brute_force_index_name: str,
    deployed_ann_index_name: str,
    test_imgs_gcs_dir: str,
    num_test_samples: int,
    vertex_model_gcs_dir: str,):
  
  import os
  import numpy
  import tensorflow as tf
  import tensorflow_hub as hub
  from google.cloud import aiplatform
  from datetime import datetime
  
  aiplatform.init(project=project, location=location, staging_bucket=staging_bucket)

  os.environ['TFHUB_MODEL_LOAD_FORMAT'] = 'COMPRESSED'

  IMG_HEIGHT = 224
  IMG_WIDTH = 224
  IMG_CHANNELS = 3
  
  ##############################################################################
  # Helper Functions
  ##############################################################################

  def read_and_decode(filename, reshape_dims=[IMG_HEIGHT, IMG_WIDTH]):
    # Read the file
    img = tf.io.read_file(filename)
    
    # Convert the compressed string to a 3D uint8 tensor.
    img = tf.image.decode_jpeg(img, channels=IMG_CHANNELS)
    
    # Use `convert_image_dtype` to convert to floats in the [0,1] range, then
    # add a leading batch axis: 1 x height x width x 3 tensor of dtype float32
    img = tf.image.convert_image_dtype(img, tf.float32)[tf.newaxis, ...]
    
    # Resize to the desired size (1 x 224 x 224 x 3 with the default dims).
    return tf.image.resize(img, reshape_dims)

  def create_query_embeddings(embedder, img_path):
    dataset_filenames = []
    dataset_embeddings = []
    list_dir = tf.io.gfile.listdir(img_path)

    for file in list_dir[:num_test_samples]:
      # tf.image.resize expects the target size as [height, width]
      img_tensor = read_and_decode(img_path + "/" + file, [IMG_HEIGHT, IMG_WIDTH])
      embeddings = embedder(img_tensor)
      dataset_filenames.append(img_path + "/" + file)
      dataset_embeddings.extend(embeddings)
  
    dataset_embeddings = tf.convert_to_tensor(dataset_embeddings)
    return dataset_filenames, dataset_embeddings

  ##############################################################################
  # Init IndexEndpoint, Load Model, Create Query embeddings
  ##############################################################################

  index_endpoint = aiplatform.MatchingEngineIndexEndpoint(index_endpoint_resource_uri)

  loaded_model = tf.keras.models.load_model(vertex_model_gcs_dir)
  # summary() prints the layer table itself and returns None
  loaded_model.summary()

  query_filenames, query_embeddings = create_query_embeddings(
      lambda x: loaded_model.predict(x),
      test_imgs_gcs_dir
  )
  print("query_embeddings shape:", query_embeddings.shape)
  print("query_filenames:", query_filenames)
  
  # match() takes a list of float lists, so convert each embedding tensor
  vector_list = [q_vector.numpy().tolist() for q_vector in query_embeddings]

  ann_response = index_endpoint.match(
      deployed_index_id=deployed_ann_index_name, 
      queries=vector_list, 
      num_neighbors=num_neighbors
  )
  print("ann_response:", ann_response)

  brute_force_response = index_endpoint.match(
      deployed_index_id=deployed_brute_force_index_name, 
      queries=vector_list, 
      num_neighbors=num_neighbors
  )

  print("brute_force_response:", brute_force_response)
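
The component sends the same query vectors to both deployed indexes. A common use of the brute-force results is as ground truth for estimating the recall of the ANN index. The sketch below is not part of the pipeline. It assumes each response is a list with one entry per query, each entry being a list of neighbor objects that expose an `id` attribute. `Neighbor` and `recall_at_k` are hypothetical names, introduced so the example runs without a deployed index.

```python
from collections import namedtuple

# Hypothetical stand-in for the neighbor objects returned by match();
# only an `id` attribute is assumed here.
Neighbor = namedtuple("Neighbor", ["id", "distance"])


def recall_at_k(ann_response, brute_force_response):
    """Mean fraction of exact (brute-force) neighbors that the ANN index also returned."""
    per_query = []
    for ann_neighbors, exact_neighbors in zip(ann_response, brute_force_response):
        exact_ids = {n.id for n in exact_neighbors}
        if not exact_ids:
            continue  # no ground truth for this query
        ann_ids = {n.id for n in ann_neighbors}
        per_query.append(len(ann_ids & exact_ids) / len(exact_ids))
    return sum(per_query) / len(per_query) if per_query else 0.0


# Toy data: two queries, three neighbors each (made-up IDs and distances).
exact = [
    [Neighbor("a", 0.9), Neighbor("b", 0.8), Neighbor("c", 0.7)],
    [Neighbor("d", 0.9), Neighbor("e", 0.8), Neighbor("f", 0.7)],
]
approx = [
    [Neighbor("a", 0.9), Neighbor("b", 0.8), Neighbor("x", 0.6)],
    [Neighbor("d", 0.9), Neighbor("e", 0.8), Neighbor("f", 0.7)],
]
recall = recall_at_k(approx, exact)
print(f"recall@3: {recall:.3f}")
```

In the component, the same function could be applied to `ann_response` and `brute_force_response`, provided the returned objects match the assumption above.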

## Build & Compile Pipeline

### Pipeline Configs

In [50]:
PROJECT_ID = 'hybrid-vertex'
LOCATION = 'us-central1'

# Set these only if they were not already declared earlier in the notebook
PIPE_USER = 'jtott'
BUCKET = 'retail-products-kaggle'
BUCKET_URI = f'gs://{BUCKET}'

PIPELINE_VERSION = 'v1' # pipeline code
PIPELINE_TAG = f'retail-visual-similarity-{PIPELINE_VERSION}'
print("PIPELINE_TAG:", PIPELINE_TAG)

VERSION = 'v1' # component code

PIPELINE_TAG: retail-visual-similarity-v1


In [51]:
@kfp.v2.dsl.pipeline(
  name=f'{VERSION}-{PIPELINE_TAG}'.replace('_', '-')
)
def pipeline(
    project: str,
    project_number: str,
    location: str,
    staging_bucket: str,
    vpc_network_name: str,
    images_gcs_uri: str,
    emb_index_gcs_uri: str,
    saved_model_gcs_bucket: str,
    dimensions: int,
    ann_index_display_name: str,
    approximate_neighbors_count: int,
    distance_measure_type: str,
    leaf_node_embedding_count: int,
    leaf_nodes_to_search_percent: int, 
    ann_index_description: str,
    ann_index_labels: Dict,
    brute_force_index_display_name: str,
    brute_force_index_description: str,
    brute_force_index_labels: Dict,
    index_endpoint_display_name: str,
    index_endpoint_description: str,
    deployed_ann_index_name: str,
    deployed_brute_force_index_name: str,
    num_neighbors: int,
    test_imgs_gcs_dir: str,
    num_test_samples: int,
    model_endpoint_name: str,
    model_display_name: str,
    serving_container_image_uri: str,
    traffic_percentage: int,
    serving_machine_type: str,
    serving_min_replica_count: int,
    serving_max_replica_count: int,
    index_json_name: str,
):
    
    # ========================================================================
    # TODO: configure logic for model deployment / rollback / traffic split
    # TODO: fix: upload pre-trained model to Vertex Registry
    # ========================================================================
    
#     find_model_endpoint_op = (
#         find_model_endpoint_test(
#             project=project,
#             location=location,
#             endpoint_name=model_endpoint_name,
#         )
#         .set_display_name("Find Model Endpoint")
#         .set_caching_options(True)
#     )

#     create_endpoint_op = (
#         gcc_aip.EndpointCreateOp(
#             project=project,
#             location=location,
#             display_name=model_endpoint_name,
#         )
#         .set_display_name("Create Model Endpoint")
#         .set_caching_options(True)
#     )

#     upload_pretrained_model_op = (
#         upload_pretrained_model(
#             project=project,
#             location=location,
#             saved_model_gcs_bucket=saved_model_gcs_bucket,
#             model_display_name=model_display_name,
#             serving_container_image_uri=serving_container_image_uri,
#         )
#         .set_display_name("Upload Pretrained Model")
#         .after(create_endpoint_op)
#         .set_caching_options(True)
#     )

#     deploy_pretrained_model_op = (
#         deploy_pretrained_model(
#             project=project,
#             location=location,
#             model_endpoint_name=model_endpoint_name,
#             model_resource_path=upload_pretrained_model_op.outputs['vertex_model_uri'],
#             model_display_name=upload_pretrained_model_op.outputs['vertex_model_display_name'],
#             traffic_percentage=traffic_percentage,
#             serving_machine_type=serving_machine_type,
#             serving_min_replica_count=serving_min_replica_count,
#             serving_max_replica_count=serving_max_replica_count,
#         )
#         .set_display_name("Deploy Pretrained Model")
#         .after(upload_pretrained_model_op)
#         .set_caching_options(True)
#     )

    # ========================================================================
    # Feature Extraction
    # ========================================================================
    feature_extraction_op = (
        feature_extraction(
            project=project,
            location=location,
            images_gcs_uri=images_gcs_uri,
            emb_index_gcs_uri=emb_index_gcs_uri,
            saved_model_gcs_bucket=saved_model_gcs_bucket,
            # model_resource_path=upload_pretrained_model_op.outputs['vertex_model_uri'],
            # vertex_model_gcs_dir=upload_pretrained_model_op.outputs['vertex_model_gcs_dir'],
            index_json_name=index_json_name,
        )
        .set_display_name("Feature Extraction")
        # .after(deploy_pretrained_model_op)
        .set_caching_options(True)
    )

    create_ann_index_op = (
        create_ann_index(
            project=project,
            location=location,
            staging_bucket=staging_bucket,
            vpc_network_name=vpc_network_name,
            emb_index_gcs_uri=emb_index_gcs_uri,
            dimensions=dimensions,
            ann_index_display_name=ann_index_display_name,
            approximate_neighbors_count=approximate_neighbors_count,
            distance_measure_type=distance_measure_type,
            leaf_node_embedding_count=leaf_node_embedding_count,
            leaf_nodes_to_search_percent=leaf_nodes_to_search_percent, 
            ann_index_description=ann_index_description,
            ann_index_labels=ann_index_labels,
        )
        .set_display_name("Create ANN Index")
        .after(feature_extraction_op)
        .set_caching_options(True)
    )

    create_brute_force_index_op = (
        create_brute_force_index(
            project=project,
            location=location,
            staging_bucket=staging_bucket,
            vpc_network_name=vpc_network_name,
            emb_index_gcs_uri=emb_index_gcs_uri,
            dimensions=dimensions,
            brute_force_index_display_name=brute_force_index_display_name,
            approximate_neighbors_count=approximate_neighbors_count,
            distance_measure_type=distance_measure_type,
            brute_force_index_description=brute_force_index_description,
            brute_force_index_labels=brute_force_index_labels,
        )
        .set_display_name("Create Brute Force Index")
        .after(feature_extraction_op)
        .set_caching_options(True)
    )

    # ========================================================================
    # Create Index Endpoint
    # ========================================================================

    create_index_endpoint_vpc_op = (
        create_index_endpoint_vpc(
            project=project,
            project_number=project_number,
            location=location,
            staging_bucket=staging_bucket,
            vpc_network_name=vpc_network_name,
            index_endpoint_display_name=index_endpoint_display_name,
            index_endpoint_description=index_endpoint_description,
        )
        .set_display_name("Create Index Endpoint")
        .after(feature_extraction_op)
        .set_caching_options(True)
    )

    # ========================================================================
    # Deploy Indexes
    # ========================================================================

    deploy_ann_index_op = (
        deploy_ann_index(
            project=project,
            location=location,
            staging_bucket=staging_bucket,
            deployed_ann_index_name=deployed_ann_index_name,
            ann_index_resource_uri=create_ann_index_op.outputs['ann_index_resource_uri'],
            index_endpoint_resource_uri=create_index_endpoint_vpc_op.outputs['index_endpoint_resource_uri'],
        )
        .set_display_name("Deploy ANN Index")
        .set_caching_options(True)
    )

    deploy_brute_index_op = (
        deploy_brute_index(
            project=project,
            location=location,
            staging_bucket=staging_bucket,
            deployed_brute_force_index_name=deployed_brute_force_index_name,
            brute_force_index_resource_uri=create_brute_force_index_op.outputs['brute_force_index_resource_uri'],
            index_endpoint_resource_uri=create_index_endpoint_vpc_op.outputs['index_endpoint_resource_uri'],
        )
        .set_display_name("Deploy Brute Force Index")
        .set_caching_options(True)
    )

    query_deployed_indexes_op = (
        query_deployed_indexes(
            project=project,
            location=location,
            staging_bucket=staging_bucket,
            num_neighbors=num_neighbors,
            index_endpoint_resource_uri=create_index_endpoint_vpc_op.outputs['index_endpoint_resource_uri'],
            deployed_brute_force_index_name=deploy_brute_index_op.outputs['deployed_brute_force_index_name'],
            deployed_ann_index_name=deploy_ann_index_op.outputs['deployed_ann_index_name'],
            test_imgs_gcs_dir=test_imgs_gcs_dir,
            num_test_samples=num_test_samples,
            vertex_model_gcs_dir=feature_extraction_op.outputs['saved_pretrained_model_gcs_location'],
        )
        .set_display_name("Query Deployed Indexes")
        .set_caching_options(True)
    )
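
The execution order of the pipeline follows from the `.after(...)` calls and the `.outputs[...]` references between steps. As a rough aid for reading the definition above, the sketch below transcribes those dependencies by hand and groups the steps into stages with the standard-library `graphlib` module (Python 3.9+). It only reflects the declared dependencies. How Vertex Pipelines actually schedules tasks may differ, and the `dependencies` dictionary is an illustration, not something KFP produces.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each step maps to the set of steps it depends on, transcribed by hand from
# the `.after(...)` calls and `.outputs[...]` references in the pipeline.
dependencies = {
    "feature_extraction": set(),
    "create_ann_index": {"feature_extraction"},
    "create_brute_force_index": {"feature_extraction"},
    "create_index_endpoint_vpc": {"feature_extraction"},
    "deploy_ann_index": {"create_ann_index", "create_index_endpoint_vpc"},
    "deploy_brute_index": {"create_brute_force_index", "create_index_endpoint_vpc"},
    "query_deployed_indexes": {
        "feature_extraction",
        "create_index_endpoint_vpc",
        "deploy_ann_index",
        "deploy_brute_index",
    },
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()

stages = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # steps whose dependencies are all done
    stages.append(ready)
    sorter.done(*ready)

for i, stage in enumerate(stages, start=1):
    print(f"stage {i}: {', '.join(stage)}")
```

Steps that land in the same stage have no dependency on each other, so nothing in the definition prevents them from running in parallel.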

In [52]:
kfp.v2.compiler.Compiler().compile(
  pipeline_func=pipeline, 
  package_path='custom_container_pipeline_spec.json',
)

In [53]:
from datetime import datetime
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
# MODEL_NAME = f'pipe-mobilenet_v2-{TIMESTAMP}'

PREFIX = PIPE_USER    # 'v1'
PROJECT_ID = 'hybrid-vertex'
project_number='934903580331'
LOCATION = 'us-central1'
BUCKET = 'retail-products-kaggle'

staging_bucket=f'gs://{BUCKET}/staging'

emb_index_gcs_uri = f'gs://{BUCKET}/indexes/{VERSION}'
saved_model_gcs_bucket = f'gs://{BUCKET}/saved-models/{VERSION}'
vpc_network_name='ucaip-haystack-vpc-network'
images_gcs_uri=f'gs://{BUCKET}/{DATA_FOLDER}/train/train'
index_json_name = "retail_kaggle_catalog.json"

# Model
model_endpoint_name = f'pipe-mobilenet_v2_endpoint_{VERSION}'
model_display_name = f'{PREFIX}-pipe-mobilenet-v2'
serving_container_image_uri = "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest"
traffic_percentage=100
serving_machine_type="n1-highcpu-32"
serving_min_replica_count=1
serving_max_replica_count=1

# Indexes
DIMENSIONS = 1280
approximate_neighbors_count=5
distance_measure_type="DOT_PRODUCT_DISTANCE"
leaf_node_embedding_count=500
leaf_nodes_to_search_percent=7

ann_index_display_name = f'ann_{DIMENSIONS}_index_{PREFIX}'
brute_force_index_display_name = f'brute_force_{DIMENSIONS}_index_{PREFIX}'

ann_index_description=f'Kaggle Retail Product MobileNet_v2 ANN index {VERSION}-{PIPELINE_VERSION}'
brute_force_index_description=f"Kaggle Retail Product MobileNet_v2 (brute force)-{VERSION}-{PIPELINE_VERSION}"


ann_index_labels={'version': f'{VERSION}',
                  'pipeline_version': f'{PIPELINE_VERSION}',}

brute_force_index_labels={'version': f'{VERSION}',
                          'pipeline_version': f'{PIPELINE_VERSION}',}

index_endpoint_display_name=f'index_endpoint_{PREFIX}'
index_endpoint_description="index endpoint description"

deployed_ann_index_name=f'ann_{DIMENSIONS}_deployed_index_{PREFIX}'
deployed_brute_force_index_name=f'brute_force_{DIMENSIONS}_deployed_index_{PREFIX}'

num_neighbors=3
test_imgs_gcs_dir=f'gs://{BUCKET}/{DATA_FOLDER}/test/test'
num_test_samples=20
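
The indexes above are configured with `distance_measure_type="DOT_PRODUCT_DISTANCE"`. Ranking by dot product is sensitive to vector length, so whether the embeddings are unit-normalized can change which neighbor ranks first. The notebook does not state whether the MobileNet embeddings are normalized. The sketch below uses made-up 2-D vectors purely to show the effect, and the helper names `dot` and `l2_normalize` are hypothetical.

```python
import math


def dot(u, v):
    """Plain dot product of two equal-length vectors."""
    return sum(a * b for a, b in zip(u, v))


def l2_normalize(v):
    """Scale a vector to unit length."""
    norm = math.sqrt(dot(v, v))
    return [a / norm for a in v]


# Made-up vectors for illustration; not real image embeddings.
query = [1.0, 0.0]
catalog = [
    [0.9, 0.1],  # almost the same direction as the query, but short
    [3.0, 3.0],  # 45 degrees away from the query, but long
]

raw_scores = [dot(item, query) for item in catalog]
normalized_scores = [dot(l2_normalize(item), l2_normalize(query)) for item in catalog]

best_raw = raw_scores.index(max(raw_scores))
best_normalized = normalized_scores.index(max(normalized_scores))

print("best match by raw dot product:", best_raw)
print("best match after L2 normalization:", best_normalized)
```

With unit-length vectors the dot product equals cosine similarity, so the two rankings agree only when every embedding is normalized before indexing and before querying.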

In [54]:
overwrite = True
# overwrite = False

In [55]:
if not PIPELINES.get('train') or overwrite:
    response = pipeline_client.create_run_from_job_spec(
        job_spec_path='custom_container_pipeline_spec.json',
        network=f'projects/{project_number}/global/networks/{vpc_network_name}', # set to same VPC as index
        # service_account=SERVICE_ACCOUNT, # <--- TODO: Uncomment if needed
        parameter_values={
            'project': PROJECT_ID,
            'project_number': project_number,
            'location': LOCATION,
            'staging_bucket': staging_bucket,
            'vpc_network_name': vpc_network_name,
            'images_gcs_uri': images_gcs_uri,
            'emb_index_gcs_uri': emb_index_gcs_uri,
            'saved_model_gcs_bucket': saved_model_gcs_bucket,
            'dimensions': DIMENSIONS,
            'ann_index_display_name': ann_index_display_name,
            'approximate_neighbors_count': approximate_neighbors_count,
            'distance_measure_type': distance_measure_type,
            'leaf_node_embedding_count': leaf_node_embedding_count,
            'leaf_nodes_to_search_percent': leaf_nodes_to_search_percent, 
            'ann_index_description': ann_index_description,
            'ann_index_labels': ann_index_labels,
            'brute_force_index_display_name': brute_force_index_display_name,
            'brute_force_index_description': brute_force_index_description,
            'brute_force_index_labels': brute_force_index_labels,
            'index_endpoint_display_name': index_endpoint_display_name,
            'index_endpoint_description': index_endpoint_description,
            'deployed_ann_index_name': deployed_ann_index_name,
            'deployed_brute_force_index_name': deployed_brute_force_index_name,
            'num_neighbors': num_neighbors,
            'test_imgs_gcs_dir': test_imgs_gcs_dir,
            'num_test_samples': num_test_samples,
            'model_endpoint_name': model_endpoint_name,
            'model_display_name': model_display_name,
            'serving_container_image_uri': serving_container_image_uri,
            'traffic_percentage': traffic_percentage,
            'serving_machine_type': serving_machine_type,
            'serving_min_replica_count': serving_min_replica_count,
            'serving_max_replica_count': serving_max_replica_count,
            'index_json_name': index_json_name,
        },
        pipeline_root=f'{GS_PIPELINE_ROOT_PATH}/{VERSION}',
    )
    PIPELINES['train'] = response['name']