# Exploratory analysis of batch-prediction embeddings with TensorBoard and Text Gecko

In [None]:
import pandas as pd
from typing import Dict
from datasets import load_dataset
from typing import Dict, List
import tensorflow as tf
import numpy as np
import os
from tensorboard.plugins import projector

# Notebook-wide settings. PROJECT_ID and BUCKET_NAME may be overridden through environment
# variables so the notebook can target another GCP project without editing this cell.
# NOTE: the %%bash tuning cell further down repeats PROJECT_ID and BUCKET; keep them in sync.
LIMIT = 3000  # number of train rows get_input_dataframe turns into batch-prediction inputs
PROJECT_ID = os.environ.get("PROJECT_ID", "wortz-project-352116")
DATASET = "ecomm-embedding"
BUCKET_NAME = os.environ.get("BUCKET_NAME", "ecomm-query-product-pairs")
BUCKET = f"gs://{BUCKET_NAME}"
USER_PROMPT = "User query: "  # prefix prepended to each query text
PRODUCT_PROMPT = "Product title: "  # prefix prepended to each product title

#### Load the data from Hugging Face

In [63]:
# Load every split of the `tasksource/esci` dataset from the Hugging Face Hub.
raw_data = load_dataset(path="tasksource/esci")

#### Quick examination of the data

In [None]:
# Column names of the train split; reused later as `remove_columns` when building the tuning files.
all_features = raw_data.column_names["train"]
print("Total features: ", len(all_features))
all_features

#### Guide for fine-tuning text embeddings

https://cloud.google.com/vertex-ai/generative-ai/docs/embeddings/get-text-embeddings

In [103]:
# Work on a 10% slice of the train and test splits to keep the tuning files small.
# The result is a plain list of two splits, so bind it to its own name instead of
# overwriting `raw_data` (the full dataset indexed by split name elsewhere).
raw_splits = load_dataset("tasksource/esci", split=["train[:10%]", "test[:10%]"])
train_split, test_split = raw_splits

In [110]:
def _esci_split_to_frame(split, all_features) -> pd.DataFrame:
    """Flatten one dataset split into a frame of query/product text, string ids and a score."""
    return split.map(
        lambda example: {
            "query": str(example["query"]),
            "product_text": str(example["product_text"]),
            "query-id": str(example["query_id"]),
            "corpus-id": str(example["product_id"]),
            # NOTE(review): every pair gets score 1 regardless of any relevance
            # label in the split — confirm this is intended.
            "score": 1,
        },
        remove_columns=all_features,
    ).to_pandas()


def create_jsonl_from_raw_data(
    query_path: str,
    corpus_path: str,
    training_path: str,
    test_path: str,
    train_split=train_split,
    test_split=test_split,
    all_features=all_features,
    **kwargs,
) -> None:
    """Write the four input files for Vertex AI text-embedding tuning.

    Args:
        query_path: Destination JSONL of unique queries (``_id``, ``text``).
        corpus_path: Destination JSONL of unique products (``_id``, ``text``).
        training_path: Destination TSV of ``query-id`` / ``corpus-id`` / ``score``
            labels built from ``train_split``.
        test_path: Destination TSV with the same columns built from ``test_split``.
        train_split: Dataset split used for the training labels.
        test_split: Dataset split used for the test labels.
        all_features: Source columns removed after mapping.
        **kwargs: Accepted for backward compatibility and ignored.

    The query and corpus files are built from both splits, so every id referenced
    by either label file has a matching text entry.
    """
    train_pairs = _esci_split_to_frame(train_split, all_features)
    test_pairs = _esci_split_to_frame(test_split, all_features)
    all_pairs = pd.concat([train_pairs, test_pairs], ignore_index=True)

    # Unique corpus (products) and queries, keyed by "_id" as the tuning job expects.
    unique_corpus = (
        all_pairs[["corpus-id", "product_text"]]
        .rename(columns={"corpus-id": "_id", "product_text": "text"})
        .drop_duplicates(subset="_id")
    )
    unique_queries = (
        all_pairs[["query-id", "query"]]
        .rename(columns={"query-id": "_id", "query": "text"})
        .drop_duplicates(subset="_id")
    )

    with open(query_path, "w", encoding="utf-8") as f:
        f.write(unique_queries.to_json(lines=True, orient="records"))
    with open(corpus_path, "w", encoding="utf-8") as f:
        f.write(unique_corpus.to_json(lines=True, orient="records"))

    # index=False: the label files hold exactly the three label columns, without
    # pandas' row index as an extra unnamed leading column.
    label_columns = ["query-id", "corpus-id", "score"]
    train_pairs[label_columns].to_csv(training_path, sep="\t", index=False)
    test_pairs[label_columns].to_csv(test_path, sep="\t", index=False)

### For large file operations, we use Cloud Storage FUSE (gcsfuse) to write directly to the bucket

`gcsfuse $BUCKET_NAME $data_path` (gcsfuse takes the bare bucket name, not the `gs://` URL)

In [111]:
#establish mount point
data_path = 'tuning_data'
if not os.path.exists(data_path):
    os.mkdir(data_path)
!gcsfuse $BUCKET_NAME $data_path  

{"time":"23/03/2024 10:37:47.705614","severity":"INFO","message":"Start gcsfuse/2.0.0 (Go version go1.22.1) for app \"\" using mount point: /home/user/embedding_tensorboard_text_gecko_amazon_shopping/notebooks/tuning_data\n"}
{"time":"23/03/2024 10:37:47.706134","severity":"INFO","message":"GCSFuse mount command flags: {\"AppName\":\"\",\"Foreground\":false,\"ConfigFile\":\"\",\"MountOptions\":{},\"DirMode\":493,\"FileMode\":420,\"Uid\":-1,\"Gid\":-1,\"ImplicitDirs\":false,\"OnlyDir\":\"\",\"RenameDirLimit\":0,\"CustomEndpoint\":null,\"BillingProject\":\"\",\"KeyFile\":\"\",\"TokenUrl\":\"\",\"ReuseTokenFromUrl\":true,\"EgressBandwidthLimitBytesPerSecond\":-1,\"OpRateLimitHz\":-1,\"SequentialReadSizeMb\":200,\"MaxRetrySleep\":30000000000,\"StatCacheCapacity\":20460,\"StatCacheTTL\":60000000000,\"TypeCacheTTL\":60000000000,\"HttpClientTimeout\":0,\"MaxRetryDuration\":-1000000000,\"RetryMultiplier\":2,\"LocalFileCache\":false,\"TempDir\":\"\",\"ClientProtocol\":\"http1\",\"MaxConnsPerHos

In [112]:
# Create a tuning_data/ sub-folder inside the mounted bucket, so the tuning artifacts land
# under gs://<bucket>/tuning_data/ instead of cluttering the bucket root.
os.makedirs(os.path.join(data_path, data_path), exist_ok=True)

#### Process the data and write the tuning files to the expected paths
The files are written to Cloud Storage through the GCSFuse mount.

In [113]:
# Write the tuning files into the bucket sub-folder (gs://<bucket>/tuning_data/ via the
# gcsfuse mount). The file names must match the *_PATH values in the %%bash tuning cell.
# File formats: https://cloud.google.com/vertex-ai/generative-ai/docs/models/tune-embeddings#prepare-tuning
tuning_dir = os.path.join(data_path, data_path)
create_jsonl_from_raw_data(
    query_path=os.path.join(tuning_dir, "query.jsonl"),
    corpus_path=os.path.join(tuning_dir, "corpus.jsonl"),
    training_path=os.path.join(tuning_dir, "corpus-train.TSV"),
    test_path=os.path.join(tuning_dir, "corpus-test.TSV"),
)

#### Set the tuning parameters in the cell below

Many of these duplicate the settings in the first cell (project ID, bucket); keep the two in sync.

In [114]:
%%bash
# Launch a Vertex AI pipeline job that tunes the base embedding model on the files
# written to gs://<bucket>/tuning_data/ above.
# NOTE: PROJECT_ID and BUCKET repeat the values from the Python settings cell, and the
# *_PATH values must match the file names used when the tuning files were written;
# keep them in sync by hand.
BUCKET='gs://ecomm-query-product-pairs'
PROJECT_ID=wortz-project-352116
BASE_MODEL_VERSION_ID='textembedding-gecko@003'
TASK_TYPE=SEMANTIC_SIMILARITY
# Pipeline artifacts are written under the bucket root (sent as gcsOutputDirectory).
PIPELINE_SCRATCH_PATH=${BUCKET}
QUERIES_PATH=${BUCKET}/tuning_data/query.jsonl
CORPUS_PATH=${BUCKET}/tuning_data/corpus.jsonl
TRAIN_LABEL_PATH=${BUCKET}/tuning_data/corpus-train.TSV
TEST_LABEL_PATH=${BUCKET}/tuning_data/corpus-test.TSV
BATCH_SIZE=100
ITERATIONS=1000


# Submit the pipeline job. The job ID carries a timestamp so repeated submissions get
# distinct IDs; BATCH_SIZE and ITERATIONS are sent as strings inside parameterValues.
curl -X POST  \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json; charset=utf-8" \
"https://us-central1-aiplatform.googleapis.com/v1/projects/${PROJECT_ID}/locations/us-central1/pipelineJobs?pipelineJobId=tune-text-embedding-$(date +%Y%m%d%H%M%S)" \
-d '{
  "displayName": "tune-text-embedding-model",
  "runtimeConfig": {
    "gcsOutputDirectory": "'${PIPELINE_SCRATCH_PATH}'",
    "parameterValues": {
      "project":  "'${PROJECT_ID}'",
      "base_model_version_id":  "'${BASE_MODEL_VERSION_ID}'",
      "task_type": "'${TASK_TYPE}'",
      "location": "us-central1",
      "queries_path":  "'${QUERIES_PATH}'",
      "corpus_path":  "'${CORPUS_PATH}'",
      "train_label_path":  "'${TRAIN_LABEL_PATH}'",
      "test_label_path":  "'${TEST_LABEL_PATH}'",
      "batch_size":  "'${BATCH_SIZE}'",
      "iterations":  "'${ITERATIONS}'"
    }
  },
  "templateUri": "https://us-kfp.pkg.dev/ml-pipeline/llm-text-embedding/tune-text-embedding-model/v1.1.1"
}'

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed


{
  "name": "projects/679926387543/locations/us-central1/pipelineJobs/tune-text-embedding-20240323223758",
  "displayName": "tune-text-embedding-model",
  "createTime": "2024-03-23T22:37:58.266808Z",
  "updateTime": "2024-03-23T22:37:58.266808Z",
  "pipelineSpec": {
    "deploymentConfig": {
      "@type": "type.googleapis.com/ml_pipelines.PipelineDeploymentConfig",
      "executors": {
        "exec-text-embedding-autosplitter": {
          "container": {
            "image": "gcr.io/ml-pipeline/google-cloud-pipeline-components:2.3.1",
            "command": [
              "sh",
              "-ec",
              "program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" \u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         \"$program_path/ephemeral_component.py\"                         \"$@\"\n",
              "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom 

100 78570    0 77726  100   844   268k   2982 --:--:-- --:--:-- --:--:--  271k


In [None]:
# def get_fine_tuning_dataframe(raw_data: datasets.dataset_dict.DatasetDict, unique_queries: Dict, unique_products: Dict, split: str = 'train') -> Tuple[pd.Dataframe, pd.Dataframe]:
#     ### https://cloud.google.com/vertex-ai/generative-ai/docs/models/tune-embeddings#generative-ai-tune-embedding-drest
#     data_dict_corpus = {"_id": [], "text": []}
#     data_dict_query = data_dict_corpus.copy()
#     raw_data = raw_data[split]
#     for row in raw_data:

In [None]:
def get_input_dataframe(
    raw_data: Dict,
    user_prompt: str = USER_PROMPT,
    product_prompt: str = PRODUCT_PROMPT,
    limit: int = LIMIT,
) -> pd.DataFrame:
    """Build the batch-prediction input frame for the embedding model.

    For each of the first ``limit`` rows of ``raw_data["train"]``, two records are
    emitted in order: the prefixed query text, then the prefixed product title.

    Args:
        raw_data: Mapping with a ``"train"`` entry whose rows expose ``query``,
            ``query_id``, ``product_title`` and ``product_id``.
        user_prompt: Prefix prepended to each query.
        product_prompt: Prefix prepended to each product title.
        limit: Number of train rows to use; ``None`` uses every row.

    Returns:
        DataFrame with columns ``content``, ``type`` and ``id`` (two rows per input
        row, fresh RangeIndex). It is empty, with the same columns, when there are no rows.
    """
    import itertools

    records = []
    # islice stops after exactly `limit` rows (the previous loop broke one row early).
    for row in itertools.islice(raw_data["train"], limit):
        records.append(
            {"content": f"{user_prompt}{row['query']}", "type": "query", "id": row["query_id"]}
        )
        records.append(
            {
                "content": f"{product_prompt}{row['product_title']}",
                "type": "product_title",
                "id": row["product_id"],
            }
        )
    # Build the frame once instead of concatenating inside the loop (which was quadratic).
    return pd.DataFrame(records, columns=["content", "type", "id"])

In [None]:
# Pass the 10% train slice loaded above explicitly, wrapped in the {"train": ...} mapping
# the function expects, rather than relying on whatever `raw_data` currently holds.
query_prod_pairs = get_input_dataframe({"train": train_split})

In [None]:
# Normalise to a clean 0..n-1 index. drop=True keeps this cell idempotent: re-running
# plain reset_index() would add "index", then "level_0" columns, and finally raise.
query_prod_pairs = query_prod_pairs.reset_index(drop=True)
query_prod_pairs.head()

In [None]:
#### Get unique product ids

# Batch prediction

https://cloud.google.com/vertex-ai/generative-ai/docs/embeddings/batch-prediction-genai-embeddings#request_a_batch_response

____

In [None]:
# Create the bucket used for the tuning files and the batch-prediction inputs/outputs.
# NOTE(review): the gcsfuse mount cell earlier already needs this bucket, so on a fresh
# project run this first. If the bucket already exists, `gsutil mb` reports an error;
# a `!` shell line prints it without stopping the notebook.
! gsutil mb $BUCKET

In [None]:
output_file = "batch_prediction_inputs.jsonl"

# One JSON object per line, each holding only the "content" field, as the batch job below reads it.
query_prod_pairs[["content"]].to_json(output_file, orient="records", lines=True)

In [None]:
# Upload the input JSONL to the bucket root; the batch-prediction cell reads it from {BUCKET}/{output_file}.
! gsutil cp $output_file $BUCKET

In [None]:
from datetime import datetime

# Timestamp tag that gives this run's batch-prediction output folder a unique name.
now = datetime.now()
now_string_tag = f"{now:%Y-%m-%d-%H-%M-%S}"
print("Tag for this run: ", now_string_tag)

In [None]:
from vertexai.preview.language_models import TextEmbeddingModel

# Submit a batch-prediction job over the uploaded inputs; results are written under a
# run-specific prefix tagged with `now_string_tag`.
# NOTE(review): this uses the unversioned "textembedding-gecko", whereas the tuning cell
# pins textembedding-gecko@003 — confirm which version is intended.
textembedding_model = TextEmbeddingModel.from_pretrained("textembedding-gecko")
batch_prediction_job = textembedding_model.batch_predict(
    destination_uri_prefix=f"{BUCKET}/batch-predict-{now_string_tag}",
    dataset=[f"{BUCKET}/{output_file}"],
)
print(
    batch_prediction_job.display_name,
    batch_prediction_job.resource_name,
    batch_prediction_job.state,
    sep="\n",
)

#### When the job completes, you should see something like this:

<img src='../img/bp-job.png' width=600px />

<img src='../img/output-data.png' width=600px />

### Visualize the embeddings with Tensorboard

Following this guide https://www.tensorflow.org/tensorboard/tensorboard_projector_plugin

In [None]:
# GCS folder the batch-prediction job wrote its results to.
bp_output_gcs_folder = batch_prediction_job.output_info.gcs_output_directory

# Copy every file in that folder into the current working directory.
! gsutil cp $bp_output_gcs_folder/* .

In [None]:
import glob

# The job's output may be split across several numbered shards (000000000000.jsonl, ...).
# Read all of them in sorted order. The local inputs file (batch_prediction_inputs.jsonl)
# does not match this pattern.
prediction_files = sorted(glob.glob("[0-9]*.jsonl"))
if not prediction_files:
    raise FileNotFoundError(
        "No batch-prediction output shards found in the working directory; "
        "run the gsutil cp cell first."
    )
predictions = pd.concat(
    (pd.read_json(path_or_buf=path, lines=True) for path in prediction_files),
    ignore_index=True,
)
predictions.head()

In [None]:
def get_predictions(df: pd.DataFrame) -> List[List[float]]:
    """Return the embedding vector of every batch-prediction row, in row order.

    Each row's ``predictions`` cell is indexed as a list whose first element holds
    ``{"embeddings": {"values": [...]}}``; those ``values`` are collected.
    """
    return [
        row["predictions"][0]["embeddings"]["values"]
        for _, row in df.iterrows()
    ]


embedding_list = get_predictions(df=predictions)
len(embedding_list)  # one vector per row of `predictions`

In [None]:
# Set up a logs directory so TensorBoard knows where to look for files.
log_dir = "logs/ecomm-example/"
os.makedirs(log_dir, exist_ok=True)


def _tsv_field(text: str) -> str:
    """Replace tabs and line breaks so a value cannot break the TSV row/column layout."""
    return text.replace("\t", " ").replace("\r", " ").replace("\n", " ")


# Save the labels one line per embedding. Row i describes embedding i because both
# are derived, in order, from the rows of `predictions`.
with open(os.path.join(log_dir, "metadata.tsv"), "w", encoding="utf-8") as f:
    # Header row naming the two metadata columns.
    f.write("data_type\tdata\n")
    for instance in predictions.instance:
        # Split only on the first ": " so the prompt prefix ("User query" /
        # "Product title") becomes the type and any ": " inside the text is preserved.
        data_type, _sep, data = instance["content"].partition(": ")
        f.write(f"{_tsv_field(data_type)}\t{_tsv_field(data)}\n")


# Save the embeddings to analyze as a variable.
weights = tf.Variable(embedding_list)
# Create a checkpoint from the embedding; the filename and key are the
# name of the tensor.
checkpoint = tf.train.Checkpoint(embedding=weights)
checkpoint.save(os.path.join(log_dir, "embedding.ckpt"))

# Set up the projector config.
config = projector.ProjectorConfig()
embedding = config.embeddings.add()
# The name of the tensor will be suffixed by `/.ATTRIBUTES/VARIABLE_VALUE`.
embedding.tensor_name = "embedding/.ATTRIBUTES/VARIABLE_VALUE"
embedding.metadata_path = "metadata.tsv"
projector.visualize_embeddings(log_dir, config)

In [None]:
# Load the TensorBoard notebook extension, which provides the %tensorboard magic used in the next cell.
%load_ext tensorboard

In [None]:
# Now run tensorboard against on log data we just saved.
%tensorboard --logdir logs/ecomm-example/

#### The TensorBoard server above keeps running until you stop it

You can explore the embedding space with PCA. Check the total variance explained by the displayed components to judge how much of the information the 3-D view captures.

<img src="../img/tensorboard.png" width=600px />


#### Another great way to understand performance is to select a point of interest and inspect its top-k nearest neighbors

Below, we see the "natural hair dye" query and its nearest product description in the embedding space:


<img src="../img/knn-analysis.png" width=900px />


#### Lastly, you can analyze and color by data type to get a feel for how well the queries relate to the products



<img src="../img/analysis-by-type.png" width=900px />


